//! parquet_record/lib.rs — batched Parquet writing and (parallel) reading helpers.

1use arrow::array::{Array, RecordBatch};
2use arrow::datatypes::ArrowPrimitiveType;
3use arrow::datatypes::Schema;
4use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
5use parquet::arrow::ArrowWriter;
6use parquet::arrow::ProjectionMask;
7use rayon::prelude::*;
8use rayon::iter::ParallelBridge;
9use std::fs::File;
10use std::{fs, io};
11use std::io::BufWriter;
12use std::path::Path;
13use std::sync::{Arc, Mutex, RwLock};
14
/// Configuration shared by the reader/writer helpers in this module.
#[derive(Debug, Clone)]
pub struct ParquetRecordConfig {
    /// When true, progress and final statistics are printed to stdout.
    pub verbose: bool,
}
19
20impl ParquetRecordConfig {
21    pub fn with_verbose(verbose: bool) -> Self {
22        Self { verbose }
23    }
24
25    pub fn silent() -> Self {
26        Self { verbose: false }
27    }
28}
29
30impl Default for ParquetRecordConfig {
31    fn default() -> Self {
32        Self { verbose: true }
33    }
34}
35
/// A trait for types that can be serialized to and from Parquet format.
///
/// `Send + Sync` is required so implementors can be used with the parallel
/// (Rayon-based) readers in this module.
pub trait ParquetRecord: Send + Sync {
    /// Returns the Arrow schema describing one record of this type.
    fn schema() -> Arc<Schema>;

    /// Converts a slice of items to a RecordBatch.
    ///
    /// The batch is expected to conform to `schema` (the writer passes the
    /// value returned by `Self::schema()`).
    fn items_to_records(schema: Arc<Schema>, items: &[Self]) -> RecordBatch
    where
        Self: Sized;

    /// Converts a RecordBatch back to a vector of items.
    ///
    /// Implementations return an `io::Error` when the batch cannot be decoded.
    fn records_to_items(record_batch: &RecordBatch) -> io::Result<Vec<Self>>
    where
        Self: Sized;
}
51
/// A growable in-memory staging area holding items that await a Parquet write.
#[derive(Debug)]
pub struct BatchBuffer<T: ParquetRecord> {
    pub items: Vec<T>,
}
56
57impl<T: ParquetRecord> Default for BatchBuffer<T> {
58    fn default() -> Self {
59        Self {
60            items: Vec::new(),
61        }
62    }
63}
64
65impl<T: ParquetRecord> BatchBuffer<T> {
66    pub fn new(capacity: usize) -> Self {
67        Self {
68            items: Vec::with_capacity(capacity),
69        }
70    }
71
72    pub fn clear(&mut self) {
73        self.items.clear();
74    }
75}
76
/// Statistics about the writing process.
///
/// `total_bytes_written` is an approximation: it accumulates the in-memory
/// Arrow size of each batch (see `calculate_batch_size`), not the compressed
/// on-disk Parquet size.
#[derive(Debug, Default, Clone)]
pub struct WriteStats {
    pub total_items_written: usize,
    pub total_batches_written: usize,
    pub total_bytes_written: usize,
}
84
/// A thread-safe, buffered Parquet writer.
///
/// Items accumulate in an in-memory buffer; when the buffer reaches
/// `buffer_size` items it is swapped out and written as one record batch on
/// the calling thread. The output file is created lazily on the first
/// non-empty write.
#[derive(Debug)]
pub struct ParquetBatchWriter<T: ParquetRecord + Clone> {
    // Pending items not yet written to disk.
    buffer: Mutex<BatchBuffer<T>>,
    // Flush threshold; `usize::MAX` is the sentinel for "unbounded"
    // (data is only written on flush()/close()).
    buffer_size: usize,
    file_writer: Mutex<Option<Box<ArrowWriter<BufWriter<File>>>>>, // Created lazily on first write
    // Arrow schema obtained once from `T::schema()`.
    schema: Arc<Schema>,
    // Destination path; parent directories are created on first write.
    output_file: String,
    config: ParquetRecordConfig,
    // Running totals; bytes are in-memory Arrow sizes, not on-disk sizes.
    stats: Mutex<WriteStats>,
    // Set once by close()/close_no_consume(); guards against use-after-close.
    closed: RwLock<bool>,
}
96
impl<T: ParquetRecord + Clone> ParquetBatchWriter<T> {
    /// Creates a writer with the default (verbose) configuration.
    ///
    /// `buffer_size = None` selects unbounded buffering: items accumulate in
    /// memory and are only written on `flush()`/`close()`.
    pub fn new(output_file: String, buffer_size: Option<usize>) -> Self {
        Self::with_config(output_file, buffer_size, ParquetRecordConfig::default())
    }

    /// Creates a writer with an explicit configuration.
    ///
    /// The output file is NOT created here; `write_buffer_to_disk` creates it
    /// lazily on the first non-empty write.
    pub fn with_config(
        output_file: String,
        buffer_size: Option<usize>,
        config: ParquetRecordConfig,
    ) -> Self {
        Self {
            // Initial Vec capacity: the configured size, or 1024 as a modest
            // starting allocation in the unbounded case (it grows as needed).
            buffer: Mutex::new(BatchBuffer::new(buffer_size.unwrap_or(1024))),
            // `usize::MAX` is the "unbounded" sentinel used throughout.
            buffer_size: buffer_size.unwrap_or(usize::MAX),
            file_writer: Mutex::new(None),
            schema: T::schema(),
            output_file,
            config,
            stats: Mutex::new(WriteStats::default()),
            closed: RwLock::new(false),
        }
    }

    /// Adds items to the buffer. If the buffer exceeds the specified size,
    /// it will be swapped with a secondary buffer and written to disk on the same thread.
    ///
    /// Errors if the writer was already closed or a lock is poisoned.
    pub fn add_items(&self, items: &[T]) -> Result<(), io::Error> {
        // The read guard is held for the entire call, so close_no_consume()
        // (which takes the write lock) cannot interleave with an add.
        let closed_guard = self.closed.read().unwrap();
        if *closed_guard {
            return Err(io::Error::other("already closed"));
        }
        if self.buffer_size == usize::MAX {
            // Unbounded mode: just accumulate; writes happen on flush/close.
            let mut buffer_guard = self
                .buffer
                .lock()
                .map_err(|_| io::Error::other("Buffer lock poisoned"))?;
            buffer_guard.items.extend_from_slice(items);
            Ok(())
        } else {
            // Bounded mode: copy in chunks, writing out a full buffer each
            // time the threshold is reached.
            let mut remaining_items = items;
            while !remaining_items.is_empty() {
                let mut buffer_guard = self
                    .buffer
                    .lock()
                    .map_err(|_| io::Error::other("Buffer lock poisoned"))?;

                // NOTE(review): this subtraction assumes items.len() never
                // exceeds buffer_size. add_item's post-flush push (below) can
                // transiently break that invariant under contention — verify.
                let available_space = self.buffer_size - buffer_guard.items.len();

                if available_space == 0 {
                    // Swap the full buffer out so the mutex is released while
                    // the (potentially slow) disk write happens.
                    let mut secondary_buffer = BatchBuffer::new(self.buffer_size);
                    std::mem::swap(&mut *buffer_guard, &mut secondary_buffer);
                    drop(buffer_guard);

                    self.write_buffer_to_disk(secondary_buffer)?;
                    continue;
                }

                // Copy at most the remaining capacity, then loop for the rest.
                let take_count = std::cmp::min(available_space, remaining_items.len());
                let (items_to_add, remaining) = remaining_items.split_at(take_count);
                buffer_guard.items.extend_from_slice(items_to_add);
                remaining_items = remaining;

                if buffer_guard.items.len() >= self.buffer_size {
                    let mut secondary_buffer = BatchBuffer::new(self.buffer_size);
                    std::mem::swap(&mut *buffer_guard, &mut secondary_buffer);
                    drop(buffer_guard);

                    self.write_buffer_to_disk(secondary_buffer)?;
                }
            }
            Ok(())
        }
    }

    /// Adds a single item, writing the buffer to disk when it becomes full.
    ///
    /// Errors if the writer was already closed or a lock is poisoned.
    pub fn add_item(&self, item: T) -> Result<(), io::Error> {
        let closed_guard = self.closed.read().unwrap();
        if *closed_guard {
            return Err(io::Error::other("already closed"));
        }
        if self.buffer_size == usize::MAX {
            // Unbounded mode: accumulate only.
            let mut buffer_guard = self
                .buffer
                .lock()
                .map_err(|_| io::Error::other("Buffer lock poisoned"))?;
            buffer_guard.items.push(item);
            Ok(())
        } else {
            let mut buffer_guard = self
                .buffer
                .lock()
                .map_err(|_| io::Error::other("Buffer lock poisoned"))?;

            if buffer_guard.items.len() >= self.buffer_size {
                // Buffer already full on entry (defensive): write it out first.
                let mut secondary_buffer = BatchBuffer::new(self.buffer_size);
                std::mem::swap(&mut *buffer_guard, &mut secondary_buffer);
                drop(buffer_guard);

                self.write_buffer_to_disk(secondary_buffer)?;

                // NOTE(review): another thread may have refilled the buffer
                // between the write above and this re-lock; the push is done
                // without re-checking fullness — confirm intended.
                let mut buffer_guard = self
                    .buffer
                    .lock()
                    .map_err(|_| io::Error::other("Buffer lock poisoned"))?;
                buffer_guard.items.push(item);
            } else {
                buffer_guard.items.push(item);

                // Write out immediately once the push filled the buffer.
                if buffer_guard.items.len() >= self.buffer_size {
                    let mut secondary_buffer = BatchBuffer::new(self.buffer_size);
                    std::mem::swap(&mut *buffer_guard, &mut secondary_buffer);
                    drop(buffer_guard);

                    self.write_buffer_to_disk(secondary_buffer)?;
                }
            }
            Ok(())
        }
    }

    /// Write the buffer to disk (this happens on the same thread as the caller)
    /// This function also handles lazy initialization of file and writer on first use
    ///
    /// Lock order: `file_writer` then `stats`; the buffer mutex is never held
    /// here (callers swap the buffer out before calling).
    fn write_buffer_to_disk(&self, buffer: BatchBuffer<T>) -> Result<(), io::Error> {
        if buffer.items.is_empty() {
            return Ok(());
        }

        let mut writer_guard = self
            .file_writer
            .lock()
            .map_err(|_| io::Error::other("Writer lock poisoned"))?;

        let record_batch = T::items_to_records(self.schema.clone(), &buffer.items);

        // Approximate size: in-memory Arrow bytes, not on-disk Parquet bytes.
        let batch_size_bytes = self.calculate_batch_size(&record_batch);

        if writer_guard.is_none() {
            // First write: create parent directories, the file, and the
            // ArrowWriter, then write the initial batch.
            let path = Path::new(&self.output_file);
            if let Some(parent) = path.parent() {
                fs::create_dir_all(parent)?;
            }
            let file = File::create(path).map_err(|e| {
                io::Error::other(format!("File creation error: {}", e))
            })?;
            let buf_writer = BufWriter::new(file);

            let writer = ArrowWriter::try_new(buf_writer, record_batch.schema(), None)
                .map_err(|e| {
                    io::Error::other(
                        format!("ArrowWriter creation error: {}", e),
                    )
                })?;
            let mut boxed_writer = Box::new(writer);

            boxed_writer.write(&record_batch).map_err(|e| {
                io::Error::other(format!("Initial write error: {}", e))
            })?;

            *writer_guard = Some(boxed_writer);
        } else if let Some(ref mut writer) = *writer_guard {
            writer.write(&record_batch).map_err(|e| {
                io::Error::other(format!("Parquet write error: {}", e))
            })?;
        }

        // Update running totals under the stats lock.
        let mut stats_guard = self
            .stats
            .lock()
            .map_err(|_| io::Error::other("Stats lock poisoned"))?;

        stats_guard.total_items_written += buffer.items.len();
        stats_guard.total_batches_written += 1;
        stats_guard.total_bytes_written += batch_size_bytes;

        if self.config.verbose {
            // MB figures are rounded up for display.
            let mb_written = (batch_size_bytes as f64 / (1024.0 * 1024.0)).ceil() as usize;
            let total_mb_written =
                (stats_guard.total_bytes_written as f64 / (1024.0 * 1024.0)).ceil() as usize;
            println!(
                "[ParquetWriter {}] Wrote batch of {} items ({} MB) (batch #{}) Total: {} records / {} MB",
                self.output_file,
                buffer.items.len(),
                mb_written,
                stats_guard.total_batches_written,
                stats_guard.total_items_written,
                total_mb_written
            );
        }

        Ok(())
    }

    /// Flushes the current buffer to disk
    pub fn flush(&self) -> Result<(), io::Error> {
        let mut buffer_guard = self
            .buffer
            .lock()
            .map_err(|_| io::Error::other("Buffer lock poisoned"))?;

        if buffer_guard.items.is_empty() {
            return Ok(());
        }

        // Take the buffer and release the lock before the disk write.
        let buffer_to_write = std::mem::take(&mut *buffer_guard);
        drop(buffer_guard);

        self.write_buffer_to_disk(buffer_to_write)
    }

    /// Gets the number of items currently in the buffer
    ///
    /// Returns 0 when the buffer mutex is poisoned (best-effort accessor).
    pub fn buffer_len(&self) -> usize {
        match self.buffer.lock() {
            Ok(guard) => guard.items.len(),
            Err(_) => 0,
        }
    }

    /// Consuming close. The subsequent `Drop` re-entry is a no-op because
    /// `close_no_consume` sets the `closed` flag before doing any work.
    pub fn close(self) -> Result<(), io::Error> {
        self.close_no_consume()
    }

    /// Closes the writer and finalizes the file
    ///
    /// Idempotent: the first call flushes remaining items, prints final stats
    /// (if verbose), and closes the ArrowWriter (writing the Parquet footer);
    /// later calls return `Ok(())` immediately.
    pub fn close_no_consume(&self) -> Result<(), io::Error> {
        // The write lock serializes close against in-flight add_* calls.
        let mut closed_guard = self.closed.write().unwrap();
        if *closed_guard {
            return Ok(())
        }
        *closed_guard = true;
        // Flush any remaining items
        self.flush()?;

        // Print final stats if verbose
        if self.config.verbose {
            let stats_guard = self
                .stats
                .lock()
                .map_err(|_| io::Error::other("Stats lock poisoned"))?;
            let total_mb_written =
                (stats_guard.total_bytes_written as f64 / (1024.0 * 1024.0)).ceil() as usize;
            println!(
                "[ParquetWriter {}] Final stats - Total items: {}, Total batches: {}, Total MB: {}",
                self.output_file,
                stats_guard.total_items_written,
                stats_guard.total_batches_written,
                total_mb_written
            );
        }

        // Take the writer out of the mutex so close() can consume it.
        let maybe_writer = {
            let mut writer_guard = self
                .file_writer
                .lock()
                .map_err(|_| io::Error::other("Writer lock poisoned"))?;
            writer_guard.take()
        };

        if let Some(writer) = maybe_writer {
            writer.close().map_err(|e| {
                io::Error::other(format!("Writer close error: {}", e))
            })?;
        }

        Ok(())
    }

    /// Returns the current write statistics
    pub fn get_stats(&self) -> Result<WriteStats, io::Error> {
        let stats_guard = self
            .stats
            .lock()
            .map_err(|_| io::Error::other("Stats lock poisoned"))?;
        Ok(stats_guard.clone())
    }

    /// Calculate the approximate size of a record batch in bytes
    ///
    /// This is the in-memory Arrow representation size, not the compressed
    /// size that ends up on disk.
    fn calculate_batch_size(&self, record_batch: &RecordBatch) -> usize {
        let mut total_size = 0;

        for column in record_batch.columns() {
            total_size += column.get_array_memory_size();
        }

        total_size
    }
}
379
380impl<T:ParquetRecord + Clone>Drop for ParquetBatchWriter<T> {
381    fn drop(&mut self) {
382        self.close_no_consume().expect("Could not close ParquetBatchWriter during drop");
383    }
384}
385
386/// Read parquet file with the provided configuration.
387pub fn read_parquet_with_config<T>(
388    _schema: Arc<Schema>,
389    file_path: &str,
390    batch_size: Option<usize>,
391    _config: &ParquetRecordConfig,
392) -> Option<(usize, impl Iterator<Item = Vec<T>>)>
393where
394    T: ParquetRecord,
395{
396    let file = match File::open(file_path) {
397        Ok(f) => f,
398        Err(_e) => {
399            return None;
400        }
401    };
402
403    let builder = match ParquetRecordBatchReaderBuilder::try_new(file) {
404        Ok(b) => b,
405        Err(e) => {
406            eprintln!("[ParquetReader] Cannot create reader: {}", e);
407            return None;
408        }
409    };
410
411    let metadata = builder.metadata();
412    let total_rows = metadata.file_metadata().num_rows() as usize;
413
414    let actual_batch_size = batch_size.unwrap_or_else(|| std::cmp::max(1, total_rows));
415    let reader = match builder.with_batch_size(actual_batch_size).build() {
416        Ok(r) => r,
417        Err(_e) => {
418            eprintln!("[ParquetReader] Cannot build reader: {_e}");
419            return None;
420        }
421    };
422
423    let scanned_iterator = reader.scan(false, |errored, record_batch_result| {
424        if *errored {
425            return None;
426        }
427
428        match record_batch_result {
429            Ok(record_batch) => {
430                match T::records_to_items(&record_batch) {
431                    Ok(vec_t) => Some(vec_t),
432                    Err(e) => {
433                        eprintln!("[ParquetReader] Error converting batch: {}", e);
434                        *errored = true;
435                        None
436                    }
437                }
438            }
439            Err(e) => {
440                eprintln!("[ParquetReader] Parquet read error: {}", e);
441                *errored = true;
442                None
443            }
444        }
445    });
446
447    Some((total_rows, scanned_iterator))
448}
449
450/// Read parquet file with default configuration (verbose enabled).
451pub fn read_parquet<T>(
452    schema: Arc<Schema>,
453    file_path: &str,
454    batch_size: Option<usize>,
455) -> Option<(usize, impl Iterator<Item = Vec<T>>)>
456where
457    T: ParquetRecord,
458{
459    read_parquet_with_config(
460        schema,
461        file_path,
462        batch_size,
463        &ParquetRecordConfig::default(),
464    )
465}
466
467/// Read only specified column from parquet file with the provided configuration.
468/// This function efficiently reads only the specified column from parquet files, which is faster
469/// because only the specified column needs to be scanned.
470pub fn read_parquet_columns_with_config<I>(
471    file_path: &str,
472    column_name: &str,
473    batch_size: Option<usize>,
474    _config: &ParquetRecordConfig,
475) -> Option<(usize, impl Iterator<Item = Vec<<I as ArrowPrimitiveType>::Native>>)>
476where
477    I: ArrowPrimitiveType,
478{
479    let file = match File::open(file_path) {
480        Ok(f) => f,
481        Err(e) => {
482            eprintln!("[ParquetReader] Cannot open file {}: {}", file_path, e);
483            return None;
484        }
485    };
486
487    let builder = match ParquetRecordBatchReaderBuilder::try_new(file) {
488        Ok(b) => b,
489        Err(e) => {
490            eprintln!("[ParquetReader] Cannot create reader: {}", e);
491            return None;
492        }
493    };
494
495    let metadata = builder.metadata();
496    let total_rows = metadata.file_metadata().num_rows() as usize;
497
498    let column_indices: Vec<usize> = builder
499        .parquet_schema()
500        .columns()
501        .iter()
502        .enumerate()
503        .filter_map(|(i, col)| {
504            if col.name() == column_name {
505                Some(i)
506            } else {
507                None
508            }
509        })
510        .collect();
511
512    if column_indices.is_empty() {
513        eprintln!("[ParquetReader] Column '{}' not found in parquet file schema", column_name);
514        return None;
515    }
516
517    let mask = ProjectionMask::roots(builder.parquet_schema(), column_indices);
518    let builder = builder.with_projection(mask);
519
520    let actual_batch_size = batch_size.unwrap_or_else(|| std::cmp::max(1, total_rows));
521    let reader = match builder.with_batch_size(actual_batch_size).build() {
522        Ok(r) => r,
523        Err(_e) => {
524            eprintln!("[ParquetReader] Cannot build reader: {_e}");
525            return None;
526        }
527    };
528
529    let col_name = column_name.to_string();
530    let scanned_iterator = reader.scan(false, move |errored, record_batch_result| {
531        if *errored {
532            return None;
533        }
534
535        match record_batch_result {
536            Ok(record_batch) => {
537                let col_array = record_batch.column_by_name(&col_name)?;
538
539                if let Some(values) = col_array
540                    .as_any()
541                    .downcast_ref::<arrow::array::PrimitiveArray<I>>()
542                {
543                    let mut col_vec = Vec::with_capacity(values.len());
544                    for i in 0..values.len() {
545                        if !values.is_null(i) {
546                            col_vec.push(values.value(i));
547                        }
548                    }
549
550                    Some(col_vec)
551                } else {
552                    *errored = true;
553                    None
554                }
555            }
556            Err(e) => {
557                eprintln!("[ParquetReader] Parquet read error: {}", e);
558                *errored = true;
559                None
560            }
561        }
562    });
563
564    Some((total_rows, scanned_iterator))
565}
566
567/// Read only specified column from parquet file with default configuration (verbose enabled).
568pub fn read_parquet_columns<I>(
569    file_path: &str,
570    column_name: &str,
571    batch_size: Option<usize>,
572) -> Option<(usize, impl Iterator<Item = Vec<<I as ArrowPrimitiveType>::Native>>)>
573where
574    I: ArrowPrimitiveType,
575{
576    read_parquet_columns_with_config::<I>(
577        file_path,
578        column_name,
579        batch_size,
580        &ParquetRecordConfig::default(),
581    )
582}
583
584/// Read only specified column from parquet in parallel with the provided configuration using Rayon.
585pub fn read_parquet_columns_with_config_par<I>(
586    file_path: &str,
587    column_name: &str,
588    batch_size: Option<usize>,
589    _config: &ParquetRecordConfig,
590) -> Option<(usize, impl ParallelIterator<Item = Vec<<I as ArrowPrimitiveType>::Native>>)>
591where
592    I: ArrowPrimitiveType + Send + Sync,
593{
594    use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
595    use rayon::prelude::*;
596    use std::fs::File;
597
598    let file = File::open(file_path).ok()?;
599    let builder = ParquetRecordBatchReaderBuilder::try_new(file).ok()?;
600    let metadata = builder.metadata();
601    let total_rows = metadata.file_metadata().num_rows() as usize;
602    let num_row_groups = metadata.num_row_groups();
603
604    let file_path = file_path.to_string();
605    let column_name = column_name.to_string();
606
607    let iterator = (0..num_row_groups)
608        .into_par_iter()
609        .filter_map(move |row_group_idx| {
610            let file = File::open(&file_path).ok()?;
611            let builder = ParquetRecordBatchReaderBuilder::try_new(file).ok()?;
612            let actual_batch_size = batch_size.unwrap_or_else(|| {
613                std::cmp::max(1, builder.metadata().file_metadata().num_rows() as usize)
614            });
615
616            let reader = builder
617                .with_batch_size(actual_batch_size)
618                .with_row_groups(vec![row_group_idx])
619                .build()
620                .ok()?;
621
622            let mut all_column_values = Vec::new();
623
624            for batch_result in reader {
625                let batch = batch_result.ok()?;
626                let col_array = batch.column_by_name(&column_name)?;
627                let values = col_array
628                    .as_any()
629                    .downcast_ref::<arrow::array::PrimitiveArray<I>>()?;
630
631                for i in 0..values.len() {
632                    if !values.is_null(i) {
633                        all_column_values.push(values.value(i));
634                    }
635                }
636            }
637
638            if all_column_values.is_empty() {
639                None
640            } else {
641                Some(all_column_values)
642            }
643        });
644
645    Some((total_rows, iterator))
646}
647
648
649/// Read only specified column from parquet in parallel with default configuration (verbose enabled) using Rayon.
650pub fn read_parquet_columns_par<I>(
651    file_path: &str,
652    column_name: &str,
653    batch_size: Option<usize>,
654) -> Option<(usize, impl ParallelIterator<Item = Vec<<I as ArrowPrimitiveType>::Native>>)>
655where
656    I: ArrowPrimitiveType + Sync + Send,
657{
658    read_parquet_columns_with_config_par::<I>(
659        file_path,
660        column_name,
661        batch_size,
662        &ParquetRecordConfig::default(),
663    )
664}
665
666/// Read parquet in parallel with the provided configuration using Rayon.
667pub fn read_parquet_with_config_par<T>(
668    _schema: Arc<Schema>,
669    file_path: &str,
670    batch_size: Option<usize>,
671    _config: &ParquetRecordConfig,
672) -> Option<(usize, impl ParallelIterator<Item = Vec<T>>)>
673where
674    T: ParquetRecord + 'static,
675{
676    let file = File::open(file_path).ok()?;
677    let builder = ParquetRecordBatchReaderBuilder::try_new(file).ok()?;
678    let metadata = builder.metadata();
679    let total_rows = metadata.file_metadata().num_rows() as usize;
680    let num_row_groups = metadata.num_row_groups();
681
682    let row_group_indices = 0..num_row_groups;
683
684    let file_path = file_path.to_string();
685
686    let iterator = row_group_indices
687        .into_par_iter()
688        .filter_map(move |row_group_idx| RowGroupReader::new(row_group_idx, &file_path, batch_size))
689        .flat_map(|reader| reader);
690
691    Some((total_rows, iterator))
692}
693
/// A batch reader restricted to a single row group, usable as a Rayon
/// `ParallelIterator` yielding one `Vec<T>` per successfully decoded batch.
pub struct RowGroupReader<T: ParquetRecord> {
    // Sequential Arrow reader limited to one row group.
    reader: ParquetRecordBatchReader,
    // Marks the target record type without storing a value of it.
    _phantom: std::marker::PhantomData<T>,
}
698
699impl<T: ParquetRecord> RowGroupReader<T> {
700    pub fn new(row_group_idx: usize, file_path: &str, batch_size: Option<usize>) -> Option<Self> {
701        let file = File::open(file_path).ok()?;
702        let builder = ParquetRecordBatchReaderBuilder::try_new(file).ok()?;
703
704        let actual_batch_size = batch_size.unwrap_or_else(|| {
705            std::cmp::max(1, builder.metadata().file_metadata().num_rows() as usize)
706        });
707
708        let reader = builder
709            .with_batch_size(actual_batch_size)
710            .with_row_groups(vec![row_group_idx])
711            .build()
712            .ok()?;
713
714        Some(Self {
715            reader,
716            _phantom: std::marker::PhantomData,
717        })
718    }
719}
720
721impl<T: ParquetRecord> ParallelIterator for RowGroupReader<T> {
722    type Item = Vec<T>;
723
724    fn drive_unindexed<C>(self, consumer: C) -> C::Result
725    where
726        C: rayon::iter::plumbing::UnindexedConsumer<Self::Item>
727    {
728        let par_iter = self.reader
729            .par_bridge()
730            .filter_map(process_batch_result::<T>);
731
732        par_iter.drive_unindexed(consumer)
733    }
734}
735
736fn process_batch_result<T: ParquetRecord>(
737    batch: Result<arrow::record_batch::RecordBatch, arrow::error::ArrowError>
738) -> Option<Vec<T>>
739{
740    match batch {
741        Ok(batch) => T::records_to_items(&batch).ok(),
742        Err(_) => None,
743    }
744}
745
746
747/// Read parquet in parallel with default configuration (verbose enabled) using Rayon.
748pub fn read_parquet_par<T>(
749    schema: Arc<Schema>,
750    file_path: &str,
751    batch_size: Option<usize>,
752) -> Option<(usize, impl ParallelIterator<Item = Vec<T>>)>
753where
754    T: ParquetRecord + 'static,
755{
756    read_parquet_with_config_par(
757        schema,
758        file_path,
759        batch_size,
760        &ParquetRecordConfig::default(),
761    )
762}