1use std::{path::Path, sync::Arc};
7
8use arrow::{array::RecordBatch, datatypes::SchemaRef};
9use parquet::{
10 arrow::{arrow_reader::ParquetRecordBatchReaderBuilder, ArrowWriter},
11 file::properties::WriterProperties,
12};
13
14use crate::{
15 error::{Error, Result},
16 transform::Transform,
17};
18
/// Read-only, thread-safe access to a dataset stored as Arrow [`RecordBatch`]es.
pub trait Dataset: Send + Sync {
    /// Total number of rows across all batches.
    fn len(&self) -> usize;

    /// Returns `true` when the dataset holds no rows.
    fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Returns the row at `index` as a single-row batch, or `None` when
    /// `index` is out of bounds.
    fn get(&self, index: usize) -> Option<RecordBatch>;

    /// Schema shared by every batch in the dataset.
    fn schema(&self) -> SchemaRef;

    /// Iterates over the underlying batches (by value).
    fn iter(&self) -> Box<dyn Iterator<Item = RecordBatch> + Send + '_>;

    /// Number of underlying record batches.
    fn num_batches(&self) -> usize;

    /// Borrows the batch at `index`, or `None` when out of bounds.
    fn get_batch(&self, index: usize) -> Option<&RecordBatch>;
}
49
/// An in-memory dataset backed by a list of Arrow [`RecordBatch`]es that all
/// share the same schema.
#[derive(Debug, Clone)]
pub struct ArrowDataset {
    // Batches, validated on construction to share `schema`.
    batches: Vec<RecordBatch>,
    // Schema of batch 0; every other batch is checked against it in `new`.
    schema: SchemaRef,
    // Cached total number of rows across all batches.
    row_count: usize,
}
71
72impl ArrowDataset {
73 pub fn new(batches: Vec<RecordBatch>) -> Result<Self> {
81 if batches.is_empty() {
82 return Err(Error::EmptyDataset);
83 }
84
85 let schema = batches[0].schema();
86
87 for (i, batch) in batches.iter().enumerate().skip(1) {
89 if batch.schema() != schema {
90 return Err(Error::schema_mismatch(format!(
91 "Batch {} has different schema than batch 0",
92 i
93 )));
94 }
95 }
96
97 let row_count = batches.iter().map(|b| b.num_rows()).sum();
98
99 Ok(Self {
100 batches,
101 schema,
102 row_count,
103 })
104 }
105
106 pub fn from_batch(batch: RecordBatch) -> Result<Self> {
112 Self::new(vec![batch])
113 }
114
115 pub fn from_parquet(path: impl AsRef<Path>) -> Result<Self> {
124 let path = path.as_ref();
125 let file = std::fs::File::open(path).map_err(|e| Error::io(e, path))?;
126
127 let builder = ParquetRecordBatchReaderBuilder::try_new(file).map_err(Error::Parquet)?;
128
129 let reader = builder.build().map_err(Error::Parquet)?;
130
131 let batches: Vec<RecordBatch> = reader
132 .collect::<std::result::Result<Vec<_>, _>>()
133 .map_err(Error::Arrow)?;
134
135 if batches.is_empty() {
136 return Err(Error::EmptyDataset);
137 }
138
139 Self::new(batches)
140 }
141
142 pub fn to_parquet(&self, path: impl AsRef<Path>) -> Result<()> {
150 let path = path.as_ref();
151 let file = std::fs::File::create(path).map_err(|e| Error::io(e, path))?;
152
153 let props = WriterProperties::builder().build();
154 let mut writer =
155 ArrowWriter::try_new(file, self.schema.clone(), Some(props)).map_err(Error::Parquet)?;
156
157 for batch in &self.batches {
158 writer.write(batch).map_err(Error::Parquet)?;
159 }
160
161 writer.close().map_err(Error::Parquet)?;
162 Ok(())
163 }
164
165 pub fn from_ipc(path: impl AsRef<Path>) -> Result<Self> {
189 use arrow::ipc::reader::FileReader;
190
191 let path = path.as_ref();
192 let file = std::fs::File::open(path).map_err(|e| Error::io(e, path))?;
193
194 let reader = FileReader::try_new(file, None).map_err(Error::Arrow)?;
195
196 let batches: Vec<RecordBatch> = reader
197 .collect::<std::result::Result<Vec<_>, _>>()
198 .map_err(Error::Arrow)?;
199
200 if batches.is_empty() {
201 return Err(Error::EmptyDataset);
202 }
203
204 Self::new(batches)
205 }
206
207 pub fn to_ipc(&self, path: impl AsRef<Path>) -> Result<()> {
226 use arrow::ipc::writer::FileWriter;
227
228 let path = path.as_ref();
229 let file = std::fs::File::create(path).map_err(|e| Error::io(e, path))?;
230
231 let mut writer = FileWriter::try_new(file, &self.schema).map_err(Error::Arrow)?;
232
233 for batch in &self.batches {
234 writer.write(batch).map_err(Error::Arrow)?;
235 }
236
237 writer.finish().map_err(Error::Arrow)?;
238 Ok(())
239 }
240
241 pub fn from_ipc_stream(path: impl AsRef<Path>) -> Result<Self> {
256 use arrow::ipc::reader::StreamReader;
257
258 let path = path.as_ref();
259 let file = std::fs::File::open(path).map_err(|e| Error::io(e, path))?;
260
261 let reader = StreamReader::try_new(file, None).map_err(Error::Arrow)?;
262
263 let batches: Vec<RecordBatch> = reader
264 .collect::<std::result::Result<Vec<_>, _>>()
265 .map_err(Error::Arrow)?;
266
267 if batches.is_empty() {
268 return Err(Error::EmptyDataset);
269 }
270
271 Self::new(batches)
272 }
273
274 pub fn to_ipc_stream(&self, path: impl AsRef<Path>) -> Result<()> {
289 use arrow::ipc::writer::StreamWriter;
290
291 let path = path.as_ref();
292 let file = std::fs::File::create(path).map_err(|e| Error::io(e, path))?;
293
294 let mut writer = StreamWriter::try_new(file, &self.schema).map_err(Error::Arrow)?;
295
296 for batch in &self.batches {
297 writer.write(batch).map_err(Error::Arrow)?;
298 }
299
300 writer.finish().map_err(Error::Arrow)?;
301 Ok(())
302 }
303
304 pub fn from_csv(path: impl AsRef<Path>) -> Result<Self> {
317 Self::from_csv_with_options(path, CsvOptions::default())
318 }
319
320 pub fn from_csv_with_options(path: impl AsRef<Path>, options: CsvOptions) -> Result<Self> {
331 use std::io::{BufReader, Seek, SeekFrom};
332
333 use arrow_csv::{reader::Format, ReaderBuilder};
334
335 let path = path.as_ref();
336 let file = std::fs::File::open(path).map_err(|e| Error::io(e, path))?;
337 let mut buf_reader = BufReader::new(file);
338
339 let schema = if let Some(schema) = options.schema {
341 Arc::new(schema)
342 } else {
343 let mut format = Format::default().with_header(options.has_header);
345 if let Some(delim) = options.delimiter {
346 format = format.with_delimiter(delim);
347 }
348 let (inferred, _) = format
349 .infer_schema(&mut buf_reader, Some(1000))
350 .map_err(Error::Arrow)?;
351
352 buf_reader
354 .seek(SeekFrom::Start(0))
355 .map_err(|e| Error::io(e, path))?;
356
357 Arc::new(inferred)
358 };
359
360 let mut builder = ReaderBuilder::new(schema)
361 .with_batch_size(options.batch_size)
362 .with_header(options.has_header);
363
364 if let Some(delim) = options.delimiter {
365 builder = builder.with_delimiter(delim);
366 }
367
368 let reader = builder.build(buf_reader).map_err(Error::Arrow)?;
369
370 let batches: Vec<RecordBatch> = reader
371 .collect::<std::result::Result<Vec<_>, _>>()
372 .map_err(Error::Arrow)?;
373
374 if batches.is_empty() {
375 return Err(Error::EmptyDataset);
376 }
377
378 Self::new(batches)
379 }
380
381 pub fn to_csv(&self, path: impl AsRef<Path>) -> Result<()> {
387 use arrow_csv::WriterBuilder;
388
389 let path = path.as_ref();
390 let file = std::fs::File::create(path).map_err(|e| Error::io(e, path))?;
391
392 let mut writer = WriterBuilder::new().with_header(true).build(file);
393
394 for batch in &self.batches {
395 writer.write(batch).map_err(Error::Arrow)?;
396 }
397
398 Ok(())
399 }
400
401 pub fn from_json(path: impl AsRef<Path>) -> Result<Self> {
409 Self::from_json_with_options(path, JsonOptions::default())
410 }
411
412 pub fn from_json_with_options(path: impl AsRef<Path>, options: JsonOptions) -> Result<Self> {
418 use std::io::BufReader;
419
420 use arrow_json::ReaderBuilder;
421
422 let path = path.as_ref();
423
424 let schema = if let Some(schema) = options.schema {
426 Arc::new(schema)
427 } else {
428 let infer_file = std::fs::File::open(path).map_err(|e| Error::io(e, path))?;
430 let infer_reader = BufReader::new(infer_file);
431 let (inferred, _) = arrow_json::reader::infer_json_schema(infer_reader, Some(1000))
432 .map_err(Error::Arrow)?;
433 Arc::new(inferred)
434 };
435
436 let file = std::fs::File::open(path).map_err(|e| Error::io(e, path))?;
438 let buf_reader = BufReader::new(file);
439
440 let builder = ReaderBuilder::new(schema).with_batch_size(options.batch_size);
441 let reader = builder.build(buf_reader).map_err(Error::Arrow)?;
442
443 let batches: Vec<RecordBatch> = reader
444 .collect::<std::result::Result<Vec<_>, _>>()
445 .map_err(Error::Arrow)?;
446
447 if batches.is_empty() {
448 return Err(Error::EmptyDataset);
449 }
450
451 Self::new(batches)
452 }
453
454 pub fn to_json(&self, path: impl AsRef<Path>) -> Result<()> {
462 use std::io::BufWriter;
463
464 use arrow_json::LineDelimitedWriter;
465
466 let path = path.as_ref();
467 let file = std::fs::File::create(path).map_err(|e| Error::io(e, path))?;
468 let buf_writer = BufWriter::new(file);
469
470 let mut writer = LineDelimitedWriter::new(buf_writer);
471
472 for batch in &self.batches {
473 writer.write(batch).map_err(Error::Arrow)?;
474 }
475
476 writer.finish().map_err(Error::Arrow)?;
477
478 Ok(())
479 }
480
481 pub fn from_parquet_bytes(data: &[u8]) -> Result<Self> {
487 use bytes::Bytes;
488
489 let bytes = Bytes::copy_from_slice(data);
490
491 let builder = ParquetRecordBatchReaderBuilder::try_new(bytes).map_err(Error::Parquet)?;
492
493 let reader = builder.build().map_err(Error::Parquet)?;
494
495 let batches: Vec<RecordBatch> = reader
496 .collect::<std::result::Result<Vec<_>, _>>()
497 .map_err(Error::Arrow)?;
498
499 if batches.is_empty() {
500 return Err(Error::EmptyDataset);
501 }
502
503 Self::new(batches)
504 }
505
506 pub fn to_parquet_bytes(&self) -> Result<Vec<u8>> {
512 let mut buffer = Vec::new();
513 let cursor = std::io::Cursor::new(&mut buffer);
514
515 let props = WriterProperties::builder().build();
516 let mut writer = ArrowWriter::try_new(cursor, self.schema.clone(), Some(props))
517 .map_err(Error::Parquet)?;
518
519 for batch in &self.batches {
520 writer.write(batch).map_err(Error::Parquet)?;
521 }
522
523 writer.close().map_err(Error::Parquet)?;
524 Ok(buffer)
525 }
526
527 pub fn from_csv_str(data: &str) -> Result<Self> {
533 use std::io::Cursor;
534
535 use arrow_csv::{reader::Format, ReaderBuilder};
536
537 let mut cursor_for_infer = Cursor::new(data.as_bytes());
539 let format = Format::default().with_header(true);
540 let (inferred, _) = format
541 .infer_schema(&mut cursor_for_infer, Some(1000))
542 .map_err(Error::Arrow)?;
543
544 let schema = Arc::new(inferred);
545 let cursor = Cursor::new(data.as_bytes());
546
547 let builder = ReaderBuilder::new(schema)
548 .with_batch_size(8192)
549 .with_header(true);
550
551 let reader = builder.build(cursor).map_err(Error::Arrow)?;
552
553 let batches: Vec<RecordBatch> = reader
554 .collect::<std::result::Result<Vec<_>, _>>()
555 .map_err(Error::Arrow)?;
556
557 if batches.is_empty() {
558 return Err(Error::EmptyDataset);
559 }
560
561 Self::new(batches)
562 }
563
564 pub fn from_json_str(data: &str) -> Result<Self> {
570 use std::io::Cursor;
571
572 use arrow_json::ReaderBuilder;
573
574 let cursor_for_infer = Cursor::new(data.as_bytes());
576 let (inferred, _) = arrow_json::reader::infer_json_schema(cursor_for_infer, Some(1000))
577 .map_err(Error::Arrow)?;
578
579 let schema = Arc::new(inferred);
580 let cursor = Cursor::new(data.as_bytes());
581
582 let builder = ReaderBuilder::new(schema).with_batch_size(8192);
583 let reader = builder.build(cursor).map_err(Error::Arrow)?;
584
585 let batches: Vec<RecordBatch> = reader
586 .collect::<std::result::Result<Vec<_>, _>>()
587 .map_err(Error::Arrow)?;
588
589 if batches.is_empty() {
590 return Err(Error::EmptyDataset);
591 }
592
593 Self::new(batches)
594 }
595
596 pub fn batches(&self) -> &[RecordBatch] {
598 &self.batches
599 }
600
601 pub fn into_batches(self) -> Vec<RecordBatch> {
603 self.batches
604 }
605
606 pub fn with_transform<T: Transform>(&self, transform: &T) -> Result<Self> {
612 let new_batches: Vec<RecordBatch> = self
613 .batches
614 .iter()
615 .map(|batch| transform.apply(batch.clone()))
616 .collect::<Result<Vec<_>>>()?;
617
618 Self::new(new_batches)
619 }
620
621 pub fn rows(&self) -> RowIterator<'_> {
623 RowIterator {
624 dataset: self,
625 current_batch: 0,
626 current_row: 0,
627 }
628 }
629
630 fn find_row(&self, global_index: usize) -> Option<(usize, usize)> {
632 if global_index >= self.row_count {
633 return None;
634 }
635
636 let mut remaining = global_index;
637 for (batch_idx, batch) in self.batches.iter().enumerate() {
638 let batch_rows = batch.num_rows();
639 if remaining < batch_rows {
640 return Some((batch_idx, remaining));
641 }
642 remaining -= batch_rows;
643 }
644
645 None
646 }
647}
648
649impl Dataset for ArrowDataset {
650 fn len(&self) -> usize {
651 self.row_count
652 }
653
654 fn get(&self, index: usize) -> Option<RecordBatch> {
655 let (batch_idx, local_idx) = self.find_row(index)?;
656 let batch = &self.batches[batch_idx];
657 Some(batch.slice(local_idx, 1))
658 }
659
660 fn schema(&self) -> SchemaRef {
661 Arc::clone(&self.schema)
662 }
663
664 fn iter(&self) -> Box<dyn Iterator<Item = RecordBatch> + Send + '_> {
665 Box::new(self.batches.iter().cloned())
666 }
667
668 fn num_batches(&self) -> usize {
669 self.batches.len()
670 }
671
672 fn get_batch(&self, index: usize) -> Option<&RecordBatch> {
673 self.batches.get(index)
674 }
675}
676
/// Iterator over the rows of an [`ArrowDataset`], yielding each row as a
/// single-row [`RecordBatch`] slice.
pub struct RowIterator<'a> {
    // Dataset being traversed.
    dataset: &'a ArrowDataset,
    // Index of the batch currently being read.
    current_batch: usize,
    // Row offset within the current batch.
    current_row: usize,
}
683
684impl Iterator for RowIterator<'_> {
685 type Item = RecordBatch;
686
687 fn next(&mut self) -> Option<Self::Item> {
688 loop {
689 if self.current_batch >= self.dataset.batches.len() {
690 return None;
691 }
692
693 let batch = &self.dataset.batches[self.current_batch];
694 if self.current_row < batch.num_rows() {
695 let row = batch.slice(self.current_row, 1);
696 self.current_row += 1;
697 return Some(row);
698 }
699
700 self.current_batch += 1;
701 self.current_row = 0;
702 }
703 }
704
705 fn size_hint(&self) -> (usize, Option<usize>) {
706 let mut remaining = 0;
707 for batch in self.dataset.batches.iter().skip(self.current_batch) {
708 remaining += batch.num_rows();
709 }
710 if self.current_batch < self.dataset.batches.len() {
711 remaining -= self.current_row;
712 }
713 (remaining, Some(remaining))
714 }
715}
716
717impl ExactSizeIterator for RowIterator<'_> {}
718
/// Options controlling CSV reading (see `ArrowDataset::from_csv_with_options`).
#[derive(Debug, Clone)]
pub struct CsvOptions {
    /// Whether the first line is a header row (default: `true`).
    pub has_header: bool,
    /// Field delimiter byte; `None` uses the reader's default.
    pub delimiter: Option<u8>,
    /// Maximum rows per produced batch (default: 8192).
    pub batch_size: usize,
    /// Explicit schema; when `None` the schema is inferred from the input.
    pub schema: Option<arrow::datatypes::Schema>,
}
731
732impl Default for CsvOptions {
733 fn default() -> Self {
734 Self {
735 has_header: true,
736 delimiter: None, batch_size: 8192,
738 schema: None,
739 }
740 }
741}
742
743impl CsvOptions {
744 pub fn new() -> Self {
746 Self::default()
747 }
748
749 #[must_use]
751 pub fn with_header(mut self, has_header: bool) -> Self {
752 self.has_header = has_header;
753 self
754 }
755
756 #[must_use]
758 pub fn with_delimiter(mut self, delimiter: u8) -> Self {
759 self.delimiter = Some(delimiter);
760 self
761 }
762
763 #[must_use]
765 pub fn with_batch_size(mut self, batch_size: usize) -> Self {
766 self.batch_size = batch_size;
767 self
768 }
769
770 #[must_use]
772 pub fn with_schema(mut self, schema: arrow::datatypes::Schema) -> Self {
773 self.schema = Some(schema);
774 self
775 }
776}
777
/// Options controlling line-delimited JSON reading
/// (see `ArrowDataset::from_json_with_options`).
#[derive(Debug, Clone)]
pub struct JsonOptions {
    /// Maximum rows per produced batch (default: 8192).
    pub batch_size: usize,
    /// Explicit schema; when `None` the schema is inferred from the input.
    pub schema: Option<arrow::datatypes::Schema>,
}
786
787impl Default for JsonOptions {
788 fn default() -> Self {
789 Self {
790 batch_size: 8192,
791 schema: None,
792 }
793 }
794}
795
796impl JsonOptions {
797 pub fn new() -> Self {
799 Self::default()
800 }
801
802 #[must_use]
804 pub fn with_batch_size(mut self, batch_size: usize) -> Self {
805 self.batch_size = batch_size;
806 self
807 }
808
809 #[must_use]
811 pub fn with_schema(mut self, schema: arrow::datatypes::Schema) -> Self {
812 self.schema = Some(schema);
813 self
814 }
815}
816
817#[cfg(test)]
818#[allow(
819 clippy::cast_possible_truncation,
820 clippy::cast_possible_wrap,
821 clippy::uninlined_format_args
822)]
823mod tests {
824 use std::sync::Arc;
825
826 use arrow::{
827 array::{Int32Array, StringArray},
828 datatypes::{DataType, Field, Schema},
829 };
830
831 use super::*;
832
833 fn create_test_batch(start: i32, count: usize) -> RecordBatch {
834 let schema = Arc::new(Schema::new(vec![
835 Field::new("id", DataType::Int32, false),
836 Field::new("name", DataType::Utf8, false),
837 ]));
838
839 #[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
840 let ids: Vec<i32> = (start..start + count as i32).collect();
841 let names: Vec<String> = ids.iter().map(|i| format!("item_{}", i)).collect();
842
843 let id_array = Int32Array::from(ids);
844 let name_array = StringArray::from(names);
845
846 RecordBatch::try_new(schema, vec![Arc::new(id_array), Arc::new(name_array)])
847 .ok()
848 .unwrap_or_else(|| panic!("Failed to create test batch"))
849 }
850
851 #[test]
852 fn test_new_dataset() {
853 let batch = create_test_batch(0, 10);
854 let dataset = ArrowDataset::new(vec![batch]).ok();
855 assert!(dataset.is_some());
856 let dataset = dataset.unwrap_or_else(|| panic!("Dataset should be Some"));
857 assert_eq!(dataset.len(), 10);
858 }
859
860 #[test]
861 fn test_empty_dataset_error() {
862 let result = ArrowDataset::new(vec![]);
863 assert!(result.is_err());
864 if matches!(result, Err(Error::EmptyDataset)) {
865 } else {
867 panic!("Expected EmptyDataset error");
868 }
869 }
870
871 #[test]
872 fn test_from_batch() {
873 let batch = create_test_batch(0, 5);
874 let dataset = ArrowDataset::from_batch(batch).ok();
875 assert!(dataset.is_some());
876 let dataset = dataset.unwrap_or_else(|| panic!("Dataset should be Some"));
877 assert_eq!(dataset.len(), 5);
878 assert_eq!(dataset.num_batches(), 1);
879 }
880
881 #[test]
882 fn test_get_row() {
883 let batch = create_test_batch(0, 10);
884 let dataset = ArrowDataset::from_batch(batch)
885 .ok()
886 .unwrap_or_else(|| panic!("Should create dataset"));
887
888 let row = dataset.get(5);
889 assert!(row.is_some());
890 let row = row.unwrap_or_else(|| panic!("Row should exist"));
891 assert_eq!(row.num_rows(), 1);
892
893 assert!(dataset.get(100).is_none());
895 }
896
897 #[test]
898 fn test_get_row_across_batches() {
899 let batch1 = create_test_batch(0, 5);
900 let batch2 = create_test_batch(5, 5);
901 let dataset = ArrowDataset::new(vec![batch1, batch2])
902 .ok()
903 .unwrap_or_else(|| panic!("Should create dataset"));
904
905 assert_eq!(dataset.len(), 10);
906 assert_eq!(dataset.num_batches(), 2);
907
908 let row = dataset.get(3);
910 assert!(row.is_some());
911
912 let row = dataset.get(7);
914 assert!(row.is_some());
915 }
916
917 #[test]
918 fn test_iter() {
919 let batch = create_test_batch(0, 10);
920 let dataset = ArrowDataset::from_batch(batch)
921 .ok()
922 .unwrap_or_else(|| panic!("Should create dataset"));
923
924 let batches: Vec<RecordBatch> = dataset.iter().collect();
925 assert_eq!(batches.len(), 1);
926 assert_eq!(batches[0].num_rows(), 10);
927 }
928
929 #[test]
930 fn test_row_iterator() {
931 let batch = create_test_batch(0, 5);
932 let dataset = ArrowDataset::from_batch(batch)
933 .ok()
934 .unwrap_or_else(|| panic!("Should create dataset"));
935
936 let rows: Vec<RecordBatch> = dataset.rows().collect();
937 assert_eq!(rows.len(), 5);
938 for row in rows {
939 assert_eq!(row.num_rows(), 1);
940 }
941 }
942
943 #[test]
944 fn test_row_iterator_exact_size() {
945 let batch = create_test_batch(0, 10);
946 let dataset = ArrowDataset::from_batch(batch)
947 .ok()
948 .unwrap_or_else(|| panic!("Should create dataset"));
949
950 let iter = dataset.rows();
951 assert_eq!(iter.len(), 10);
952 }
953
954 #[test]
955 fn test_schema() {
956 let batch = create_test_batch(0, 5);
957 let expected_schema = batch.schema();
958 let dataset = ArrowDataset::from_batch(batch)
959 .ok()
960 .unwrap_or_else(|| panic!("Should create dataset"));
961
962 assert_eq!(dataset.schema(), expected_schema);
963 }
964
965 #[test]
966 fn test_is_empty() {
967 let batch = create_test_batch(0, 5);
968 let dataset = ArrowDataset::from_batch(batch)
969 .ok()
970 .unwrap_or_else(|| panic!("Should create dataset"));
971
972 assert!(!dataset.is_empty());
973 }
974
975 #[test]
976 fn test_get_batch() {
977 let batch1 = create_test_batch(0, 5);
978 let batch2 = create_test_batch(5, 5);
979 let dataset = ArrowDataset::new(vec![batch1, batch2])
980 .ok()
981 .unwrap_or_else(|| panic!("Should create dataset"));
982
983 assert!(dataset.get_batch(0).is_some());
984 assert!(dataset.get_batch(1).is_some());
985 assert!(dataset.get_batch(2).is_none());
986 }
987
988 #[test]
989 fn test_into_batches() {
990 let batch = create_test_batch(0, 5);
991 let dataset = ArrowDataset::from_batch(batch)
992 .ok()
993 .unwrap_or_else(|| panic!("Should create dataset"));
994
995 let batches = dataset.into_batches();
996 assert_eq!(batches.len(), 1);
997 }
998
999 #[test]
1000 fn test_parquet_roundtrip() {
1001 let batch = create_test_batch(0, 10);
1002 let dataset = ArrowDataset::from_batch(batch)
1003 .ok()
1004 .unwrap_or_else(|| panic!("Should create dataset"));
1005
1006 let temp_dir = tempfile::tempdir()
1007 .ok()
1008 .unwrap_or_else(|| panic!("Should create temp dir"));
1009 let path = temp_dir.path().join("test.parquet");
1010
1011 dataset
1012 .to_parquet(&path)
1013 .ok()
1014 .unwrap_or_else(|| panic!("Should write parquet"));
1015
1016 let loaded = ArrowDataset::from_parquet(&path)
1017 .ok()
1018 .unwrap_or_else(|| panic!("Should load parquet"));
1019
1020 assert_eq!(loaded.len(), dataset.len());
1021 assert_eq!(loaded.schema(), dataset.schema());
1022 }
1023
1024 #[test]
1025 fn test_csv_roundtrip() {
1026 let batch = create_test_batch(0, 10);
1027 let dataset = ArrowDataset::from_batch(batch)
1028 .ok()
1029 .unwrap_or_else(|| panic!("Should create dataset"));
1030
1031 let temp_dir = tempfile::tempdir()
1032 .ok()
1033 .unwrap_or_else(|| panic!("Should create temp dir"));
1034 let path = temp_dir.path().join("test.csv");
1035
1036 dataset
1037 .to_csv(&path)
1038 .ok()
1039 .unwrap_or_else(|| panic!("Should write csv"));
1040
1041 let loaded = ArrowDataset::from_csv(&path)
1042 .ok()
1043 .unwrap_or_else(|| panic!("Should load csv"));
1044
1045 assert_eq!(loaded.len(), dataset.len());
1046 }
1047
1048 #[test]
1049 fn test_ipc_roundtrip() {
1050 let batch = create_test_batch(0, 10);
1051 let dataset = ArrowDataset::from_batch(batch)
1052 .ok()
1053 .unwrap_or_else(|| panic!("Should create dataset"));
1054
1055 let temp_dir = tempfile::tempdir()
1056 .ok()
1057 .unwrap_or_else(|| panic!("Should create temp dir"));
1058 let path = temp_dir.path().join("test.arrow");
1059
1060 dataset
1061 .to_ipc(&path)
1062 .ok()
1063 .unwrap_or_else(|| panic!("Should write IPC"));
1064
1065 let loaded = ArrowDataset::from_ipc(&path)
1066 .ok()
1067 .unwrap_or_else(|| panic!("Should load IPC"));
1068
1069 assert_eq!(loaded.len(), dataset.len());
1070 assert_eq!(loaded.schema(), dataset.schema());
1071 }
1072
1073 #[test]
1074 fn test_ipc_stream_roundtrip() {
1075 let batch = create_test_batch(0, 10);
1076 let dataset = ArrowDataset::from_batch(batch)
1077 .ok()
1078 .unwrap_or_else(|| panic!("Should create dataset"));
1079
1080 let temp_dir = tempfile::tempdir()
1081 .ok()
1082 .unwrap_or_else(|| panic!("Should create temp dir"));
1083 let path = temp_dir.path().join("test.arrows");
1084
1085 dataset
1086 .to_ipc_stream(&path)
1087 .ok()
1088 .unwrap_or_else(|| panic!("Should write IPC stream"));
1089
1090 let loaded = ArrowDataset::from_ipc_stream(&path)
1091 .ok()
1092 .unwrap_or_else(|| panic!("Should load IPC stream"));
1093
1094 assert_eq!(loaded.len(), dataset.len());
1095 assert_eq!(loaded.schema(), dataset.schema());
1096 }
1097
1098 #[test]
1099 fn test_ipc_error_nonexistent() {
1100 let result = ArrowDataset::from_ipc("/nonexistent/path/to/file.arrow");
1101 assert!(result.is_err());
1102 }
1103
1104 #[test]
1105 fn test_ipc_stream_error_nonexistent() {
1106 let result = ArrowDataset::from_ipc_stream("/nonexistent/path/to/file.arrows");
1107 assert!(result.is_err());
1108 }
1109
1110 #[test]
1111 fn test_csv_options() {
1112 let options = CsvOptions::new()
1113 .with_header(true)
1114 .with_delimiter(b',')
1115 .with_batch_size(1024);
1116
1117 assert!(options.has_header);
1118 assert_eq!(options.delimiter, Some(b','));
1119 assert_eq!(options.batch_size, 1024);
1120 }
1121
1122 #[test]
1123 fn test_csv_options_default() {
1124 let options = CsvOptions::default();
1125 assert!(options.has_header);
1126 assert!(options.delimiter.is_none());
1127 assert_eq!(options.batch_size, 8192);
1128 assert!(options.schema.is_none());
1129 }
1130
1131 #[test]
1132 fn test_json_roundtrip() {
1133 let batch = create_test_batch(0, 10);
1134 let dataset = ArrowDataset::from_batch(batch)
1135 .ok()
1136 .unwrap_or_else(|| panic!("Should create dataset"));
1137
1138 let temp_dir = tempfile::tempdir()
1139 .ok()
1140 .unwrap_or_else(|| panic!("Should create temp dir"));
1141 let path = temp_dir.path().join("test.jsonl");
1142
1143 dataset
1144 .to_json(&path)
1145 .ok()
1146 .unwrap_or_else(|| panic!("Should write json"));
1147
1148 let loaded = ArrowDataset::from_json(&path)
1149 .ok()
1150 .unwrap_or_else(|| panic!("Should load json"));
1151
1152 assert_eq!(loaded.len(), dataset.len());
1153 }
1154
1155 #[test]
1156 fn test_json_options() {
1157 let options = JsonOptions::new().with_batch_size(1024);
1158
1159 assert_eq!(options.batch_size, 1024);
1160 assert!(options.schema.is_none());
1161 }
1162
1163 #[test]
1164 fn test_json_options_default() {
1165 let options = JsonOptions::default();
1166 assert_eq!(options.batch_size, 8192);
1167 assert!(options.schema.is_none());
1168 }
1169
1170 #[test]
1171 fn test_clone() {
1172 let batch = create_test_batch(0, 5);
1173 let dataset = ArrowDataset::from_batch(batch)
1174 .ok()
1175 .unwrap_or_else(|| panic!("Should create dataset"));
1176
1177 let cloned = dataset.clone();
1178 assert_eq!(cloned.len(), dataset.len());
1179 assert_eq!(cloned.schema(), dataset.schema());
1180 }
1181
1182 #[test]
1183 fn test_debug() {
1184 let batch = create_test_batch(0, 5);
1185 let dataset = ArrowDataset::from_batch(batch)
1186 .ok()
1187 .unwrap_or_else(|| panic!("Should create dataset"));
1188
1189 let debug_str = format!("{:?}", dataset);
1190 assert!(debug_str.contains("ArrowDataset"));
1191 }
1192
1193 #[test]
1194 fn test_csv_with_schema() {
1195 let schema = Schema::new(vec![
1196 Field::new("id", DataType::Int32, false),
1197 Field::new("name", DataType::Utf8, false),
1198 ]);
1199
1200 let options = CsvOptions::new().with_schema(schema);
1201 assert!(options.schema.is_some());
1202 }
1203
1204 #[test]
1205 fn test_json_with_schema() {
1206 let schema = Schema::new(vec![
1207 Field::new("id", DataType::Int32, false),
1208 Field::new("name", DataType::Utf8, false),
1209 ]);
1210
1211 let options = JsonOptions::new().with_schema(schema);
1212 assert!(options.schema.is_some());
1213 }
1214
1215 #[test]
1216 fn test_schema_mismatch_error() {
1217 let schema1 = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
1218 let schema2 = Arc::new(Schema::new(vec![Field::new("name", DataType::Utf8, false)]));
1219
1220 let batch1 = RecordBatch::try_new(schema1, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))])
1221 .ok()
1222 .unwrap_or_else(|| panic!("Should create batch"));
1223
1224 let batch2 = RecordBatch::try_new(
1225 schema2,
1226 vec![Arc::new(StringArray::from(vec!["a", "b", "c"]))],
1227 )
1228 .ok()
1229 .unwrap_or_else(|| panic!("Should create batch"));
1230
1231 let result = ArrowDataset::new(vec![batch1, batch2]);
1232 assert!(result.is_err());
1233 }
1234
1235 #[test]
1236 fn test_from_parquet_error() {
1237 let result = ArrowDataset::from_parquet("/nonexistent/path/to/file.parquet");
1238 assert!(result.is_err());
1239 }
1240
1241 #[test]
1242 fn test_from_csv_error() {
1243 let result = ArrowDataset::from_csv("/nonexistent/path/to/file.csv");
1244 assert!(result.is_err());
1245 }
1246
1247 #[test]
1248 fn test_from_json_error() {
1249 let result = ArrowDataset::from_json("/nonexistent/path/to/file.json");
1250 assert!(result.is_err());
1251 }
1252
1253 #[test]
1254 fn test_with_transform() {
1255 use crate::transform::Select;
1256
1257 let batch = create_test_batch(0, 10);
1258 let dataset = ArrowDataset::from_batch(batch)
1259 .ok()
1260 .unwrap_or_else(|| panic!("Should create dataset"));
1261
1262 let transform = Select::new(vec!["id"]);
1263 let transformed = dataset
1264 .with_transform(&transform)
1265 .ok()
1266 .unwrap_or_else(|| panic!("Should apply transform"));
1267
1268 assert_eq!(transformed.schema().fields().len(), 1);
1269 assert_eq!(transformed.len(), 10);
1270 }
1271
1272 #[test]
1273 fn test_batches_accessor() {
1274 let batch = create_test_batch(0, 10);
1275 let dataset = ArrowDataset::from_batch(batch)
1276 .ok()
1277 .unwrap_or_else(|| panic!("Should create dataset"));
1278
1279 let batches = dataset.batches();
1280 assert_eq!(batches.len(), 1);
1281 assert_eq!(batches[0].num_rows(), 10);
1282 }
1283
1284 #[test]
1285 fn test_rows_size_hint() {
1286 let batch = create_test_batch(0, 10);
1287 let dataset = ArrowDataset::from_batch(batch)
1288 .ok()
1289 .unwrap_or_else(|| panic!("Should create dataset"));
1290
1291 let rows = dataset.rows();
1292 assert_eq!(rows.size_hint(), (10, Some(10)));
1293 }
1294
1295 #[test]
1296 fn test_multiple_batches_iteration() {
1297 let batch1 = create_test_batch(0, 5);
1298 let batch2 = create_test_batch(5, 3);
1299 let batch3 = create_test_batch(8, 2);
1300
1301 let dataset = ArrowDataset::new(vec![batch1, batch2, batch3])
1302 .ok()
1303 .unwrap_or_else(|| panic!("Should create dataset"));
1304
1305 let total_rows: usize = dataset.iter().map(|b| b.num_rows()).sum();
1306 assert_eq!(total_rows, 10);
1307
1308 let row_count = dataset.rows().count();
1310 assert_eq!(row_count, 10);
1311 }
1312
1313 #[test]
1314 fn test_csv_options_debug() {
1315 let options = CsvOptions::default();
1316 let debug_str = format!("{:?}", options);
1317 assert!(debug_str.contains("CsvOptions"));
1318 }
1319
1320 #[test]
1321 fn test_json_options_debug() {
1322 let options = JsonOptions::default();
1323 let debug_str = format!("{:?}", options);
1324 assert!(debug_str.contains("JsonOptions"));
1325 }
1326
1327 #[test]
1330 fn test_from_parquet_bytes_and_to_parquet_bytes() {
1331 let batch = create_test_batch(0, 10);
1332 let dataset = ArrowDataset::from_batch(batch)
1333 .ok()
1334 .unwrap_or_else(|| panic!("Should create dataset"));
1335
1336 let bytes = dataset
1337 .to_parquet_bytes()
1338 .ok()
1339 .unwrap_or_else(|| panic!("Should convert to bytes"));
1340
1341 let loaded = ArrowDataset::from_parquet_bytes(&bytes)
1342 .ok()
1343 .unwrap_or_else(|| panic!("Should load from bytes"));
1344
1345 assert_eq!(loaded.len(), 10);
1346 assert_eq!(loaded.schema(), dataset.schema());
1347 }
1348
1349 #[test]
1350 fn test_from_csv_str() {
1351 let csv = "id,name\n1,Alice\n2,Bob\n3,Charlie";
1352 let dataset = ArrowDataset::from_csv_str(csv)
1353 .ok()
1354 .unwrap_or_else(|| panic!("Should parse CSV string"));
1355
1356 assert_eq!(dataset.len(), 3);
1357 assert_eq!(dataset.schema().fields().len(), 2);
1358 }
1359
1360 #[test]
1361 fn test_from_json_str() {
1362 let json = r#"{"id": 1, "name": "Alice"}
1363{"id": 2, "name": "Bob"}
1364{"id": 3, "name": "Charlie"}"#;
1365 let dataset = ArrowDataset::from_json_str(json)
1366 .ok()
1367 .unwrap_or_else(|| panic!("Should parse JSON string"));
1368
1369 assert_eq!(dataset.len(), 3);
1370 }
1371
1372 #[test]
1373 fn test_csv_with_options_custom_delimiter() {
1374 let temp_dir = tempfile::tempdir()
1375 .ok()
1376 .unwrap_or_else(|| panic!("Should create temp dir"));
1377 let path = temp_dir.path().join("test.tsv");
1378
1379 std::fs::write(&path, "id\tname\n1\tAlice\n2\tBob\n3\tCharlie")
1381 .ok()
1382 .unwrap_or_else(|| panic!("Should write TSV"));
1383
1384 let options = CsvOptions::new().with_delimiter(b'\t');
1385 let dataset = ArrowDataset::from_csv_with_options(&path, options)
1386 .ok()
1387 .unwrap_or_else(|| panic!("Should load TSV"));
1388
1389 assert_eq!(dataset.len(), 3);
1390 }
1391
1392 #[test]
1393 fn test_csv_without_header() {
1394 let temp_dir = tempfile::tempdir()
1395 .ok()
1396 .unwrap_or_else(|| panic!("Should create temp dir"));
1397 let path = temp_dir.path().join("no_header.csv");
1398
1399 std::fs::write(&path, "1,Alice\n2,Bob\n3,Charlie")
1401 .ok()
1402 .unwrap_or_else(|| panic!("Should write CSV"));
1403
1404 let options = CsvOptions::new().with_header(false);
1405 let dataset = ArrowDataset::from_csv_with_options(&path, options)
1406 .ok()
1407 .unwrap_or_else(|| panic!("Should load CSV"));
1408
1409 assert_eq!(dataset.len(), 3);
1410 }
1411
1412 #[test]
1413 fn test_json_with_options_batch_size() {
1414 let temp_dir = tempfile::tempdir()
1415 .ok()
1416 .unwrap_or_else(|| panic!("Should create temp dir"));
1417 let path = temp_dir.path().join("test.jsonl");
1418
1419 let json = r#"{"id": 1, "name": "Alice"}
1420{"id": 2, "name": "Bob"}
1421{"id": 3, "name": "Charlie"}"#;
1422 std::fs::write(&path, json)
1423 .ok()
1424 .unwrap_or_else(|| panic!("Should write JSON"));
1425
1426 let options = JsonOptions::new().with_batch_size(1024);
1427 let dataset = ArrowDataset::from_json_with_options(&path, options)
1428 .ok()
1429 .unwrap_or_else(|| panic!("Should load JSON"));
1430
1431 assert_eq!(dataset.len(), 3);
1432 }
1433
1434 #[test]
1435 fn test_row_iterator_multiple_batches_size_hint() {
1436 let batch1 = create_test_batch(0, 5);
1437 let batch2 = create_test_batch(5, 3);
1438 let batch3 = create_test_batch(8, 2);
1439
1440 let dataset = ArrowDataset::new(vec![batch1, batch2, batch3])
1441 .ok()
1442 .unwrap_or_else(|| panic!("Should create dataset"));
1443
1444 let mut iter = dataset.rows();
1445 assert_eq!(iter.size_hint(), (10, Some(10)));
1446
1447 iter.next();
1449 iter.next();
1450 assert_eq!(iter.size_hint(), (8, Some(8)));
1451
1452 for _ in 0..3 {
1454 iter.next();
1455 }
1456 assert_eq!(iter.size_hint(), (5, Some(5)));
1457 }
1458
1459 #[test]
1460 fn test_find_row_boundary_conditions() {
1461 let batch1 = create_test_batch(0, 3);
1462 let batch2 = create_test_batch(3, 2);
1463
1464 let dataset = ArrowDataset::new(vec![batch1, batch2])
1465 .ok()
1466 .unwrap_or_else(|| panic!("Should create dataset"));
1467
1468 let row0 = dataset.get(0);
1470 assert!(row0.is_some());
1471
1472 let row2 = dataset.get(2);
1474 assert!(row2.is_some());
1475
1476 let row3 = dataset.get(3);
1478 assert!(row3.is_some());
1479
1480 let row4 = dataset.get(4);
1482 assert!(row4.is_some());
1483
1484 let row5 = dataset.get(5);
1486 assert!(row5.is_none());
1487 }
1488
1489 #[test]
1490 fn test_empty_csv_str_error() {
1491 let result = ArrowDataset::from_csv_str("");
1493 assert!(result.is_err());
1494 }
1495
1496 #[test]
1497 fn test_empty_json_str_error() {
1498 let result = ArrowDataset::from_json_str("");
1500 assert!(result.is_err());
1501 }
1502
1503 #[test]
1504 fn test_dataset_trait_is_empty_for_nonempty() {
1505 let batch = create_test_batch(0, 1);
1506 let dataset = ArrowDataset::from_batch(batch)
1507 .ok()
1508 .unwrap_or_else(|| panic!("Should create dataset"));
1509
1510 assert!(!<ArrowDataset as Dataset>::is_empty(&dataset));
1511 }
1512
1513 #[test]
1514 fn test_rows_exact_size_iterator_len_exhaustion() {
1515 let batch = create_test_batch(0, 3);
1516 let dataset = ArrowDataset::from_batch(batch)
1517 .ok()
1518 .unwrap_or_else(|| panic!("Should create dataset"));
1519
1520 let mut iter = dataset.rows();
1521 assert_eq!(iter.len(), 3);
1522
1523 iter.next();
1524 assert_eq!(iter.len(), 2);
1525
1526 iter.next();
1527 assert_eq!(iter.len(), 1);
1528
1529 iter.next();
1530 assert_eq!(iter.len(), 0);
1531
1532 assert!(iter.next().is_none());
1534 assert_eq!(iter.len(), 0);
1535 }
1536
1537 #[test]
1538 fn test_csv_options_clone() {
1539 let options = CsvOptions::new()
1540 .with_header(false)
1541 .with_delimiter(b';')
1542 .with_batch_size(512);
1543 let cloned = options.clone();
1544
1545 assert_eq!(cloned.has_header, options.has_header);
1546 assert_eq!(cloned.delimiter, options.delimiter);
1547 assert_eq!(cloned.batch_size, options.batch_size);
1548 }
1549
1550 #[test]
1551 fn test_json_options_clone() {
1552 let options = JsonOptions::new().with_batch_size(256);
1553 let cloned = options.clone();
1554
1555 assert_eq!(cloned.batch_size, options.batch_size);
1556 }
1557
1558 #[test]
1559 fn test_iter_returns_cloned_batches() {
1560 let batch = create_test_batch(0, 5);
1561 let dataset = ArrowDataset::from_batch(batch)
1562 .ok()
1563 .unwrap_or_else(|| panic!("Should create dataset"));
1564
1565 let mut iter = dataset.iter();
1566 let first = iter.next();
1567 assert!(first.is_some());
1568
1569 assert_eq!(dataset.len(), 5);
1571 }
1572
1573 #[test]
1574 fn test_row_iterator_empty_batch_handling() {
1575 let batch1 = create_test_batch(0, 2);
1577 let batch2 = create_test_batch(2, 1);
1578
1579 let dataset = ArrowDataset::new(vec![batch1, batch2])
1580 .ok()
1581 .unwrap_or_else(|| panic!("Should create dataset"));
1582
1583 let rows: Vec<_> = dataset.rows().collect();
1584 assert_eq!(rows.len(), 3);
1585 }
1586
1587 #[test]
1588 fn test_large_batch_count() {
1589 let batches: Vec<RecordBatch> = (0..20).map(|i| create_test_batch(i * 2, 2)).collect();
1590
1591 let dataset = ArrowDataset::new(batches)
1592 .ok()
1593 .unwrap_or_else(|| panic!("Should create dataset"));
1594
1595 assert_eq!(dataset.len(), 40);
1596 assert_eq!(dataset.num_batches(), 20);
1597
1598 let row = dataset.get(39);
1600 assert!(row.is_some());
1601 }
1602
1603 #[test]
1604 fn test_csv_write_and_verify_content() {
1605 let batch = create_test_batch(0, 3);
1606 let dataset = ArrowDataset::from_batch(batch)
1607 .ok()
1608 .unwrap_or_else(|| panic!("Should create dataset"));
1609
1610 let temp_dir = tempfile::tempdir()
1611 .ok()
1612 .unwrap_or_else(|| panic!("Should create temp dir"));
1613 let path = temp_dir.path().join("verify.csv");
1614
1615 dataset
1616 .to_csv(&path)
1617 .ok()
1618 .unwrap_or_else(|| panic!("Should write CSV"));
1619
1620 let content = std::fs::read_to_string(&path)
1622 .ok()
1623 .unwrap_or_else(|| panic!("Should read file"));
1624 assert!(content.contains("id"));
1625 assert!(content.contains("name"));
1626 }
1627
1628 #[test]
1629 fn test_json_write_and_verify_content() {
1630 let batch = create_test_batch(0, 2);
1631 let dataset = ArrowDataset::from_batch(batch)
1632 .ok()
1633 .unwrap_or_else(|| panic!("Should create dataset"));
1634
1635 let temp_dir = tempfile::tempdir()
1636 .ok()
1637 .unwrap_or_else(|| panic!("Should create temp dir"));
1638 let path = temp_dir.path().join("verify.jsonl");
1639
1640 dataset
1641 .to_json(&path)
1642 .ok()
1643 .unwrap_or_else(|| panic!("Should write JSON"));
1644
1645 let content = std::fs::read_to_string(&path)
1647 .ok()
1648 .unwrap_or_else(|| panic!("Should read file"));
1649 assert!(content.contains("\"id\""));
1650 assert!(content.contains("\"name\""));
1651 }
1652
1653 #[test]
1654 fn test_to_ipc_error_invalid_path() {
1655 let batch = create_test_batch(0, 5);
1656 let dataset = ArrowDataset::from_batch(batch)
1657 .ok()
1658 .unwrap_or_else(|| panic!("Should create dataset"));
1659
1660 let result = dataset.to_ipc("/nonexistent/path/output.arrow");
1661 assert!(result.is_err());
1662 }
1663
1664 #[test]
1665 fn test_to_ipc_stream_error_invalid_path() {
1666 let batch = create_test_batch(0, 5);
1667 let dataset = ArrowDataset::from_batch(batch)
1668 .ok()
1669 .unwrap_or_else(|| panic!("Should create dataset"));
1670
1671 let result = dataset.to_ipc_stream("/nonexistent/path/output.arrows");
1672 assert!(result.is_err());
1673 }
1674
1675 #[test]
1676 fn test_to_parquet_error_invalid_path() {
1677 let batch = create_test_batch(0, 5);
1678 let dataset = ArrowDataset::from_batch(batch)
1679 .ok()
1680 .unwrap_or_else(|| panic!("Should create dataset"));
1681
1682 let result = dataset.to_parquet("/nonexistent/path/output.parquet");
1683 assert!(result.is_err());
1684 }
1685
1686 #[test]
1687 fn test_to_csv_error_invalid_path() {
1688 let batch = create_test_batch(0, 5);
1689 let dataset = ArrowDataset::from_batch(batch)
1690 .ok()
1691 .unwrap_or_else(|| panic!("Should create dataset"));
1692
1693 let result = dataset.to_csv("/nonexistent/path/output.csv");
1694 assert!(result.is_err());
1695 }
1696
1697 #[test]
1698 fn test_to_json_error_invalid_path() {
1699 let batch = create_test_batch(0, 5);
1700 let dataset = ArrowDataset::from_batch(batch)
1701 .ok()
1702 .unwrap_or_else(|| panic!("Should create dataset"));
1703
1704 let result = dataset.to_json("/nonexistent/path/output.jsonl");
1705 assert!(result.is_err());
1706 }
1707}