1use arrow::array::{Array, RecordBatch};
2use arrow::datatypes::ArrowPrimitiveType;
3use arrow::datatypes::Schema;
4use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
5use parquet::arrow::ArrowWriter;
6use parquet::arrow::ProjectionMask;
7use rayon::prelude::*;
8use rayon::iter::ParallelBridge;
9use std::fs::File;
10use std::{fs, io};
11use std::io::BufWriter;
12use std::path::Path;
13use std::sync::{Arc, Mutex, RwLock};
14
/// Controls logging behaviour for the parquet reader/writer helpers.
#[derive(Debug, Clone)]
pub struct ParquetRecordConfig {
    /// When true, progress and summary messages are printed to stdout.
    pub verbose: bool,
}

impl ParquetRecordConfig {
    /// Builds a config with the given verbosity.
    pub fn with_verbose(verbose: bool) -> Self {
        ParquetRecordConfig { verbose }
    }

    /// Convenience constructor for a non-verbose config.
    pub fn silent() -> Self {
        ParquetRecordConfig::with_verbose(false)
    }
}

impl Default for ParquetRecordConfig {
    /// Verbose by default.
    fn default() -> Self {
        ParquetRecordConfig { verbose: true }
    }
}
35
/// Bidirectional mapping between items of `Self` and Arrow `RecordBatch`es,
/// used by the parquet read/write helpers in this module.
pub trait ParquetRecord: Send + Sync {
    /// Arrow schema describing how `Self` is encoded as columns.
    fn schema() -> Arc<Schema>;

    /// Encodes a slice of items into a single `RecordBatch` using `schema`.
    fn items_to_records(schema: Arc<Schema>, items: &[Self]) -> RecordBatch
    where
        Self: Sized;

    /// Decodes a `RecordBatch` back into owned items; conversion failures are
    /// reported as `io::Error`.
    fn records_to_items(record_batch: &RecordBatch) -> io::Result<Vec<Self>>
    where
        Self: Sized;
}
51
52#[derive(Debug)]
53pub struct BatchBuffer<T: ParquetRecord> {
54 pub items: Vec<T>,
55}
56
57impl<T: ParquetRecord> Default for BatchBuffer<T> {
58 fn default() -> Self {
59 Self {
60 items: Vec::new(),
61 }
62 }
63}
64
65impl<T: ParquetRecord> BatchBuffer<T> {
66 pub fn new(capacity: usize) -> Self {
67 Self {
68 items: Vec::with_capacity(capacity),
69 }
70 }
71
72 pub fn clear(&mut self) {
73 self.items.clear();
74 }
75}
76
/// Cumulative write statistics for a `ParquetBatchWriter`.
#[derive(Debug, Default, Clone)]
pub struct WriteStats {
    /// Total number of items flushed to disk so far.
    pub total_items_written: usize,
    /// Number of record batches written.
    pub total_batches_written: usize,
    /// Approximate bytes written, measured as the Arrow in-memory size of the
    /// batches (not the compressed on-disk parquet size).
    pub total_bytes_written: usize,
}
84
85#[derive(Debug)]
86pub struct ParquetBatchWriter<T: ParquetRecord + Clone> {
87 buffer: Mutex<BatchBuffer<T>>,
88 buffer_size: usize,
89 file_writer: Mutex<Option<Box<ArrowWriter<BufWriter<File>>>>>, schema: Arc<Schema>,
91 output_file: String,
92 config: ParquetRecordConfig,
93 stats: Mutex<WriteStats>,
94 closed: RwLock<bool>,
95}
96
97impl<T: ParquetRecord + Clone> ParquetBatchWriter<T> {
98 pub fn new(output_file: String, buffer_size: Option<usize>) -> Self {
99 Self::with_config(output_file, buffer_size, ParquetRecordConfig::default())
100 }
101
102 pub fn with_config(
103 output_file: String,
104 buffer_size: Option<usize>,
105 config: ParquetRecordConfig,
106 ) -> Self {
107 Self {
108 buffer: Mutex::new(BatchBuffer::new(buffer_size.unwrap_or(1024))),
109 buffer_size: buffer_size.unwrap_or(usize::MAX),
110 file_writer: Mutex::new(None),
111 schema: T::schema(),
112 output_file,
113 config,
114 stats: Mutex::new(WriteStats::default()),
115 closed: RwLock::new(false),
116 }
117 }
118
119 pub fn add_items(&self, items: &[T]) -> Result<(), io::Error> {
122 let closed_guard = self.closed.read().unwrap();
123 if *closed_guard {
124 return Err(io::Error::other("already closed"));
125 }
126 if self.buffer_size == usize::MAX {
127 let mut buffer_guard = self
128 .buffer
129 .lock()
130 .map_err(|_| io::Error::other("Buffer lock poisoned"))?;
131 buffer_guard.items.extend_from_slice(items);
132 Ok(())
133 } else {
134 let mut remaining_items = items;
135 while !remaining_items.is_empty() {
136 let mut buffer_guard = self
137 .buffer
138 .lock()
139 .map_err(|_| io::Error::other("Buffer lock poisoned"))?;
140
141 let available_space = self.buffer_size - buffer_guard.items.len();
142
143 if available_space == 0 {
144 let mut secondary_buffer = BatchBuffer::new(self.buffer_size);
145 std::mem::swap(&mut *buffer_guard, &mut secondary_buffer);
146 drop(buffer_guard);
147
148 self.write_buffer_to_disk(secondary_buffer)?;
149 continue;
150 }
151
152 let take_count = std::cmp::min(available_space, remaining_items.len());
153 let (items_to_add, remaining) = remaining_items.split_at(take_count);
154 buffer_guard.items.extend_from_slice(items_to_add);
155 remaining_items = remaining;
156
157 if buffer_guard.items.len() >= self.buffer_size {
158 let mut secondary_buffer = BatchBuffer::new(self.buffer_size);
159 std::mem::swap(&mut *buffer_guard, &mut secondary_buffer);
160 drop(buffer_guard);
161
162 self.write_buffer_to_disk(secondary_buffer)?;
163 }
164 }
165 Ok(())
166 }
167 }
168
169 pub fn add_item(&self, item: T) -> Result<(), io::Error> {
170 let closed_guard = self.closed.read().unwrap();
171 if *closed_guard {
172 return Err(io::Error::other("already closed"));
173 }
174 if self.buffer_size == usize::MAX {
175 let mut buffer_guard = self
176 .buffer
177 .lock()
178 .map_err(|_| io::Error::other("Buffer lock poisoned"))?;
179 buffer_guard.items.push(item);
180 Ok(())
181 } else {
182 let mut buffer_guard = self
183 .buffer
184 .lock()
185 .map_err(|_| io::Error::other("Buffer lock poisoned"))?;
186
187 if buffer_guard.items.len() >= self.buffer_size {
188 let mut secondary_buffer = BatchBuffer::new(self.buffer_size);
189 std::mem::swap(&mut *buffer_guard, &mut secondary_buffer);
190 drop(buffer_guard);
191
192 self.write_buffer_to_disk(secondary_buffer)?;
193
194 let mut buffer_guard = self
195 .buffer
196 .lock()
197 .map_err(|_| io::Error::other("Buffer lock poisoned"))?;
198 buffer_guard.items.push(item);
199 } else {
200 buffer_guard.items.push(item);
201
202 if buffer_guard.items.len() >= self.buffer_size {
203 let mut secondary_buffer = BatchBuffer::new(self.buffer_size);
204 std::mem::swap(&mut *buffer_guard, &mut secondary_buffer);
205 drop(buffer_guard);
206
207 self.write_buffer_to_disk(secondary_buffer)?;
208 }
209 }
210 Ok(())
211 }
212 }
213
214 fn write_buffer_to_disk(&self, buffer: BatchBuffer<T>) -> Result<(), io::Error> {
217 if buffer.items.is_empty() {
218 return Ok(());
219 }
220
221 let mut writer_guard = self
222 .file_writer
223 .lock()
224 .map_err(|_| io::Error::other("Writer lock poisoned"))?;
225
226 let record_batch = T::items_to_records(self.schema.clone(), &buffer.items);
227
228 let batch_size_bytes = self.calculate_batch_size(&record_batch);
229
230 if writer_guard.is_none() {
231 let path = Path::new(&self.output_file);
232 if let Some(parent) = path.parent() {
233 fs::create_dir_all(parent)?;
234 }
235 let file = File::create(path).map_err(|e| {
236 io::Error::other(format!("File creation error: {}", e))
237 })?;
238 let buf_writer = BufWriter::new(file);
239
240 let writer = ArrowWriter::try_new(buf_writer, record_batch.schema(), None)
241 .map_err(|e| {
242 io::Error::other(
243 format!("ArrowWriter creation error: {}", e),
244 )
245 })?;
246 let mut boxed_writer = Box::new(writer);
247
248 boxed_writer.write(&record_batch).map_err(|e| {
249 io::Error::other(format!("Initial write error: {}", e))
250 })?;
251
252 *writer_guard = Some(boxed_writer);
253 } else if let Some(ref mut writer) = *writer_guard {
254 writer.write(&record_batch).map_err(|e| {
255 io::Error::other(format!("Parquet write error: {}", e))
256 })?;
257 }
258
259 let mut stats_guard = self
260 .stats
261 .lock()
262 .map_err(|_| io::Error::other("Stats lock poisoned"))?;
263
264 stats_guard.total_items_written += buffer.items.len();
265 stats_guard.total_batches_written += 1;
266 stats_guard.total_bytes_written += batch_size_bytes;
267
268 if self.config.verbose {
269 let mb_written = (batch_size_bytes as f64 / (1024.0 * 1024.0)).ceil() as usize;
270 let total_mb_written =
271 (stats_guard.total_bytes_written as f64 / (1024.0 * 1024.0)).ceil() as usize;
272 println!(
273 "[ParquetWriter {}] Wrote batch of {} items ({} MB) (batch #{}) Total: {} records / {} MB",
274 self.output_file,
275 buffer.items.len(),
276 mb_written,
277 stats_guard.total_batches_written,
278 stats_guard.total_items_written,
279 total_mb_written
280 );
281 }
282
283 Ok(())
284 }
285
286 pub fn flush(&self) -> Result<(), io::Error> {
288 let mut buffer_guard = self
289 .buffer
290 .lock()
291 .map_err(|_| io::Error::other("Buffer lock poisoned"))?;
292
293 if buffer_guard.items.is_empty() {
294 return Ok(());
295 }
296
297 let buffer_to_write = std::mem::take(&mut *buffer_guard);
298 drop(buffer_guard);
299
300 self.write_buffer_to_disk(buffer_to_write)
301 }
302
303 pub fn buffer_len(&self) -> usize {
305 match self.buffer.lock() {
306 Ok(guard) => guard.items.len(),
307 Err(_) => 0,
308 }
309 }
310
311 pub fn close(self) -> Result<(), io::Error> {
312 self.close_no_consume()
313 }
314
315 pub fn close_no_consume(&self) -> Result<(), io::Error> {
317 let mut closed_guard = self.closed.write().unwrap();
318 if *closed_guard {
319 return Ok(())
320 }
321 *closed_guard = true;
322 self.flush()?;
324
325 if self.config.verbose {
327 let stats_guard = self
328 .stats
329 .lock()
330 .map_err(|_| io::Error::other("Stats lock poisoned"))?;
331 let total_mb_written =
332 (stats_guard.total_bytes_written as f64 / (1024.0 * 1024.0)).ceil() as usize;
333 println!(
334 "[ParquetWriter {}] Final stats - Total items: {}, Total batches: {}, Total MB: {}",
335 self.output_file,
336 stats_guard.total_items_written,
337 stats_guard.total_batches_written,
338 total_mb_written
339 );
340 }
341
342 let maybe_writer = {
343 let mut writer_guard = self
344 .file_writer
345 .lock()
346 .map_err(|_| io::Error::other("Writer lock poisoned"))?;
347 writer_guard.take()
348 };
349
350 if let Some(writer) = maybe_writer {
351 writer.close().map_err(|e| {
352 io::Error::other(format!("Writer close error: {}", e))
353 })?;
354 }
355
356 Ok(())
357 }
358
359 pub fn get_stats(&self) -> Result<WriteStats, io::Error> {
361 let stats_guard = self
362 .stats
363 .lock()
364 .map_err(|_| io::Error::other("Stats lock poisoned"))?;
365 Ok(stats_guard.clone())
366 }
367
368 fn calculate_batch_size(&self, record_batch: &RecordBatch) -> usize {
370 let mut total_size = 0;
371
372 for column in record_batch.columns() {
373 total_size += column.get_array_memory_size();
374 }
375
376 total_size
377 }
378}
379
380impl<T:ParquetRecord + Clone>Drop for ParquetBatchWriter<T> {
381 fn drop(&mut self) {
382 self.close_no_consume().expect("Could not close ParquetBatchWriter during drop");
383 }
384}
385
386pub fn read_parquet_with_config<T>(
388 _schema: Arc<Schema>,
389 file_path: &str,
390 batch_size: Option<usize>,
391 _config: &ParquetRecordConfig,
392) -> Option<(usize, impl Iterator<Item = Vec<T>>)>
393where
394 T: ParquetRecord,
395{
396 let file = match File::open(file_path) {
397 Ok(f) => f,
398 Err(_e) => {
399 return None;
400 }
401 };
402
403 let builder = match ParquetRecordBatchReaderBuilder::try_new(file) {
404 Ok(b) => b,
405 Err(e) => {
406 eprintln!("[ParquetReader] Cannot create reader: {}", e);
407 return None;
408 }
409 };
410
411 let metadata = builder.metadata();
412 let total_rows = metadata.file_metadata().num_rows() as usize;
413
414 let actual_batch_size = batch_size.unwrap_or_else(|| std::cmp::max(1, total_rows));
415 let reader = match builder.with_batch_size(actual_batch_size).build() {
416 Ok(r) => r,
417 Err(_e) => {
418 eprintln!("[ParquetReader] Cannot build reader: {_e}");
419 return None;
420 }
421 };
422
423 let scanned_iterator = reader.scan(false, |errored, record_batch_result| {
424 if *errored {
425 return None;
426 }
427
428 match record_batch_result {
429 Ok(record_batch) => {
430 match T::records_to_items(&record_batch) {
431 Ok(vec_t) => Some(vec_t),
432 Err(e) => {
433 eprintln!("[ParquetReader] Error converting batch: {}", e);
434 *errored = true;
435 None
436 }
437 }
438 }
439 Err(e) => {
440 eprintln!("[ParquetReader] Parquet read error: {}", e);
441 *errored = true;
442 None
443 }
444 }
445 });
446
447 Some((total_rows, scanned_iterator))
448}
449
450pub fn read_parquet<T>(
452 schema: Arc<Schema>,
453 file_path: &str,
454 batch_size: Option<usize>,
455) -> Option<(usize, impl Iterator<Item = Vec<T>>)>
456where
457 T: ParquetRecord,
458{
459 read_parquet_with_config(
460 schema,
461 file_path,
462 batch_size,
463 &ParquetRecordConfig::default(),
464 )
465}
466
467pub fn read_parquet_columns_with_config<I>(
471 file_path: &str,
472 column_name: &str,
473 batch_size: Option<usize>,
474 _config: &ParquetRecordConfig,
475) -> Option<(usize, impl Iterator<Item = Vec<<I as ArrowPrimitiveType>::Native>>)>
476where
477 I: ArrowPrimitiveType,
478{
479 let file = match File::open(file_path) {
480 Ok(f) => f,
481 Err(e) => {
482 eprintln!("[ParquetReader] Cannot open file {}: {}", file_path, e);
483 return None;
484 }
485 };
486
487 let builder = match ParquetRecordBatchReaderBuilder::try_new(file) {
488 Ok(b) => b,
489 Err(e) => {
490 eprintln!("[ParquetReader] Cannot create reader: {}", e);
491 return None;
492 }
493 };
494
495 let metadata = builder.metadata();
496 let total_rows = metadata.file_metadata().num_rows() as usize;
497
498 let column_indices: Vec<usize> = builder
499 .parquet_schema()
500 .columns()
501 .iter()
502 .enumerate()
503 .filter_map(|(i, col)| {
504 if col.name() == column_name {
505 Some(i)
506 } else {
507 None
508 }
509 })
510 .collect();
511
512 if column_indices.is_empty() {
513 eprintln!("[ParquetReader] Column '{}' not found in parquet file schema", column_name);
514 return None;
515 }
516
517 let mask = ProjectionMask::roots(builder.parquet_schema(), column_indices);
518 let builder = builder.with_projection(mask);
519
520 let actual_batch_size = batch_size.unwrap_or_else(|| std::cmp::max(1, total_rows));
521 let reader = match builder.with_batch_size(actual_batch_size).build() {
522 Ok(r) => r,
523 Err(_e) => {
524 eprintln!("[ParquetReader] Cannot build reader: {_e}");
525 return None;
526 }
527 };
528
529 let col_name = column_name.to_string();
530 let scanned_iterator = reader.scan(false, move |errored, record_batch_result| {
531 if *errored {
532 return None;
533 }
534
535 match record_batch_result {
536 Ok(record_batch) => {
537 let col_array = record_batch.column_by_name(&col_name)?;
538
539 if let Some(values) = col_array
540 .as_any()
541 .downcast_ref::<arrow::array::PrimitiveArray<I>>()
542 {
543 let mut col_vec = Vec::with_capacity(values.len());
544 for i in 0..values.len() {
545 if !values.is_null(i) {
546 col_vec.push(values.value(i));
547 }
548 }
549
550 Some(col_vec)
551 } else {
552 *errored = true;
553 None
554 }
555 }
556 Err(e) => {
557 eprintln!("[ParquetReader] Parquet read error: {}", e);
558 *errored = true;
559 None
560 }
561 }
562 });
563
564 Some((total_rows, scanned_iterator))
565}
566
567pub fn read_parquet_columns<I>(
569 file_path: &str,
570 column_name: &str,
571 batch_size: Option<usize>,
572) -> Option<(usize, impl Iterator<Item = Vec<<I as ArrowPrimitiveType>::Native>>)>
573where
574 I: ArrowPrimitiveType,
575{
576 read_parquet_columns_with_config::<I>(
577 file_path,
578 column_name,
579 batch_size,
580 &ParquetRecordConfig::default(),
581 )
582}
583
584pub fn read_parquet_columns_with_config_par<I>(
586 file_path: &str,
587 column_name: &str,
588 batch_size: Option<usize>,
589 _config: &ParquetRecordConfig,
590) -> Option<(usize, impl ParallelIterator<Item = Vec<<I as ArrowPrimitiveType>::Native>>)>
591where
592 I: ArrowPrimitiveType + Send + Sync,
593{
594 use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
595 use rayon::prelude::*;
596 use std::fs::File;
597
598 let file = File::open(file_path).ok()?;
599 let builder = ParquetRecordBatchReaderBuilder::try_new(file).ok()?;
600 let metadata = builder.metadata();
601 let total_rows = metadata.file_metadata().num_rows() as usize;
602 let num_row_groups = metadata.num_row_groups();
603
604 let file_path = file_path.to_string();
605 let column_name = column_name.to_string();
606
607 let iterator = (0..num_row_groups)
608 .into_par_iter()
609 .filter_map(move |row_group_idx| {
610 let file = File::open(&file_path).ok()?;
611 let builder = ParquetRecordBatchReaderBuilder::try_new(file).ok()?;
612 let actual_batch_size = batch_size.unwrap_or_else(|| {
613 std::cmp::max(1, builder.metadata().file_metadata().num_rows() as usize)
614 });
615
616 let reader = builder
617 .with_batch_size(actual_batch_size)
618 .with_row_groups(vec![row_group_idx])
619 .build()
620 .ok()?;
621
622 let mut all_column_values = Vec::new();
623
624 for batch_result in reader {
625 let batch = batch_result.ok()?;
626 let col_array = batch.column_by_name(&column_name)?;
627 let values = col_array
628 .as_any()
629 .downcast_ref::<arrow::array::PrimitiveArray<I>>()?;
630
631 for i in 0..values.len() {
632 if !values.is_null(i) {
633 all_column_values.push(values.value(i));
634 }
635 }
636 }
637
638 if all_column_values.is_empty() {
639 None
640 } else {
641 Some(all_column_values)
642 }
643 });
644
645 Some((total_rows, iterator))
646}
647
648
649pub fn read_parquet_columns_par<I>(
651 file_path: &str,
652 column_name: &str,
653 batch_size: Option<usize>,
654) -> Option<(usize, impl ParallelIterator<Item = Vec<<I as ArrowPrimitiveType>::Native>>)>
655where
656 I: ArrowPrimitiveType + Sync + Send,
657{
658 read_parquet_columns_with_config_par::<I>(
659 file_path,
660 column_name,
661 batch_size,
662 &ParquetRecordConfig::default(),
663 )
664}
665
666pub fn read_parquet_with_config_par<T>(
668 _schema: Arc<Schema>,
669 file_path: &str,
670 batch_size: Option<usize>,
671 _config: &ParquetRecordConfig,
672) -> Option<(usize, impl ParallelIterator<Item = Vec<T>>)>
673where
674 T: ParquetRecord + 'static,
675{
676 let file = File::open(file_path).ok()?;
677 let builder = ParquetRecordBatchReaderBuilder::try_new(file).ok()?;
678 let metadata = builder.metadata();
679 let total_rows = metadata.file_metadata().num_rows() as usize;
680 let num_row_groups = metadata.num_row_groups();
681
682 let row_group_indices = 0..num_row_groups;
683
684 let file_path = file_path.to_string();
685
686 let iterator = row_group_indices
687 .into_par_iter()
688 .filter_map(move |row_group_idx| RowGroupReader::new(row_group_idx, &file_path, batch_size))
689 .flat_map(|reader| reader);
690
691 Some((total_rows, iterator))
692}
693
/// Sequential Arrow batch reader restricted to a single parquet row group;
/// adapted into a `ParallelIterator` of decoded item batches by the
/// `ParallelIterator` impl in this file.
pub struct RowGroupReader<T: ParquetRecord> {
    /// Underlying sequential record-batch reader.
    reader: ParquetRecordBatchReader,
    /// Marks the decoded item type; no `T` values are stored.
    _phantom: std::marker::PhantomData<T>,
}
698
699impl<T: ParquetRecord> RowGroupReader<T> {
700 pub fn new(row_group_idx: usize, file_path: &str, batch_size: Option<usize>) -> Option<Self> {
701 let file = File::open(file_path).ok()?;
702 let builder = ParquetRecordBatchReaderBuilder::try_new(file).ok()?;
703
704 let actual_batch_size = batch_size.unwrap_or_else(|| {
705 std::cmp::max(1, builder.metadata().file_metadata().num_rows() as usize)
706 });
707
708 let reader = builder
709 .with_batch_size(actual_batch_size)
710 .with_row_groups(vec![row_group_idx])
711 .build()
712 .ok()?;
713
714 Some(Self {
715 reader,
716 _phantom: std::marker::PhantomData,
717 })
718 }
719}
720
721impl<T: ParquetRecord> ParallelIterator for RowGroupReader<T> {
722 type Item = Vec<T>;
723
724 fn drive_unindexed<C>(self, consumer: C) -> C::Result
725 where
726 C: rayon::iter::plumbing::UnindexedConsumer<Self::Item>
727 {
728 let par_iter = self.reader
729 .par_bridge()
730 .filter_map(process_batch_result::<T>);
731
732 par_iter.drive_unindexed(consumer)
733 }
734}
735
736fn process_batch_result<T: ParquetRecord>(
737 batch: Result<arrow::record_batch::RecordBatch, arrow::error::ArrowError>
738) -> Option<Vec<T>>
739{
740 match batch {
741 Ok(batch) => T::records_to_items(&batch).ok(),
742 Err(_) => None,
743 }
744}
745
746
747pub fn read_parquet_par<T>(
749 schema: Arc<Schema>,
750 file_path: &str,
751 batch_size: Option<usize>,
752) -> Option<(usize, impl ParallelIterator<Item = Vec<T>>)>
753where
754 T: ParquetRecord + 'static,
755{
756 read_parquet_with_config_par(
757 schema,
758 file_path,
759 batch_size,
760 &ParquetRecordConfig::default(),
761 )
762}