use std::{collections::VecDeque, path::Path, sync::Arc};

use arrow::{array::RecordBatch, datatypes::SchemaRef};
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

use crate::error::{Error, Result};

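/// A source of Arrow record batches that can be pulled one batch at a time.
///
/// Implementors only need `schema` and `next_batch`; `size_hint` and `reset`
/// have conservative defaults. A minimal in-memory implementor might look like
/// this (illustrative sketch; `SingleBatchSource` is a hypothetical type, not
/// part of this module):
///
/// ```ignore
/// struct SingleBatchSource {
///     batch: Option<RecordBatch>,
///     schema: SchemaRef,
/// }
///
/// impl DataSource for SingleBatchSource {
///     fn schema(&self) -> SchemaRef {
///         Arc::clone(&self.schema)
///     }
///
///     fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
///         // Yield the batch once, then report exhaustion.
///         Ok(self.batch.take())
///     }
/// }
/// ```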
pub trait DataSource: Send {
    /// Returns the schema shared by every batch this source yields.
    fn schema(&self) -> SchemaRef;

    /// Returns the next batch, or `Ok(None)` once the source is exhausted.
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying storage cannot be read.
    fn next_batch(&mut self) -> Result<Option<RecordBatch>>;

    /// Total number of rows, if known in advance. Unknown by default.
    fn size_hint(&self) -> Option<usize> {
        None
    }

    /// Restarts the source from the beginning.
    ///
    /// # Errors
    ///
    /// The default implementation always returns an error; sources that cannot
    /// be rewound keep this behaviour.
    fn reset(&mut self) -> Result<()> {
        Err(Error::storage("This data source does not support reset"))
    }
}

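/// A pull-based dataset that wraps a [`DataSource`] and keeps a small buffer
/// of prefetched batches.
///
/// A minimal usage sketch (the Parquet path and sizes are illustrative):
///
/// ```ignore
/// let dataset = StreamingDataset::from_parquet("data/train.parquet", 1024)?
///     .prefetch(4);
///
/// for batch in dataset {
///     println!("read {} rows", batch.num_rows());
/// }
/// ```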
pub struct StreamingDataset {
    source: Box<dyn DataSource>,
    buffer: VecDeque<RecordBatch>,
    buffer_size: usize,
    prefetch: usize,
    schema: SchemaRef,
    exhausted: bool,
}

impl StreamingDataset {
    /// Creates a streaming dataset over `source` with the given buffer capacity.
    pub fn new(source: Box<dyn DataSource>, buffer_size: usize) -> Self {
        let schema = source.schema();
        Self {
            source,
            buffer: VecDeque::with_capacity(buffer_size),
            buffer_size: buffer_size.max(1),
            prefetch: 1,
            schema,
            exhausted: false,
        }
    }

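    /// Opens a Parquet file and streams it in batches of `batch_size` rows.
    ///
    /// # Errors
    ///
    /// Returns an error if the file cannot be opened or is not valid Parquet.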
    pub fn from_parquet(path: impl AsRef<Path>, batch_size: usize) -> Result<Self> {
        let source = ParquetSource::new(path, batch_size)?;
        Ok(Self::new(Box::new(source), 4))
    }

    /// Sets how many batches are prefetched into the buffer (minimum 1).
    #[must_use]
    pub fn prefetch(mut self, count: usize) -> Self {
        self.prefetch = count.max(1);
        self
    }

    /// Returns the schema of the batches this dataset yields.
    pub fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }

    /// Total number of rows, if the underlying source knows it.
    pub fn size_hint(&self) -> Option<usize> {
        self.source.size_hint()
    }

    /// Pulls batches from the source until `prefetch` batches are buffered or
    /// the source is exhausted.
    fn fill_buffer(&mut self) -> Result<()> {
        while !self.exhausted && self.buffer.len() < self.prefetch {
            if let Some(batch) = self.source.next_batch()? {
                self.buffer.push_back(batch);
            } else {
                self.exhausted = true;
                break;
            }
        }
        Ok(())
    }

    /// Resets the dataset to the beginning of the underlying source.
    ///
    /// # Errors
    ///
    /// Returns an error if the source does not support [`DataSource::reset`].
    pub fn reset(&mut self) -> Result<()> {
        self.source.reset()?;
        self.buffer.clear();
        self.exhausted = false;
        Ok(())
    }
}

impl Iterator for StreamingDataset {
    type Item = RecordBatch;

    fn next(&mut self) -> Option<Self::Item> {
        // A read error ends iteration; callers that need the error should call
        // `DataSource::next_batch` directly.
        if let Err(_e) = self.fill_buffer() {
            return None;
        }

        self.buffer.pop_front()
    }
}

impl std::fmt::Debug for StreamingDataset {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("StreamingDataset")
            .field("buffer_size", &self.buffer_size)
            .field("prefetch", &self.prefetch)
            .field("buffered", &self.buffer.len())
            .field("exhausted", &self.exhausted)
            .finish_non_exhaustive()
    }
}

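/// A [`DataSource`] that reads record batches from a Parquet file on disk.
///
/// The file path is kept so the reader can be re-opened by `reset`. A minimal
/// usage sketch (the path is illustrative):
///
/// ```ignore
/// let mut source = ParquetSource::new("data/train.parquet", 1024)?;
/// while let Some(batch) = source.next_batch()? {
///     // process the batch
/// }
/// ```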
pub struct ParquetSource {
    reader: parquet::arrow::arrow_reader::ParquetRecordBatchReader,
    schema: SchemaRef,
    path: std::path::PathBuf,
    batch_size: usize,
}

impl ParquetSource {
    /// Opens `path` and prepares a reader that yields `batch_size` rows per batch.
    ///
    /// # Errors
    ///
    /// Returns an error if the file cannot be opened or is not valid Parquet.
    pub fn new(path: impl AsRef<Path>, batch_size: usize) -> Result<Self> {
        let path = path.as_ref().to_path_buf();
        let file = std::fs::File::open(&path).map_err(|e| Error::io(e, &path))?;

        let builder = ParquetRecordBatchReaderBuilder::try_new(file)
            .map_err(Error::Parquet)?
            .with_batch_size(batch_size);

        let schema = builder.schema().clone();
        let reader = builder.build().map_err(Error::Parquet)?;

        Ok(Self {
            reader,
            schema,
            path,
            batch_size,
        })
    }
}

impl DataSource for ParquetSource {
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }

    fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
        match self.reader.next() {
            Some(Ok(batch)) => Ok(Some(batch)),
            Some(Err(e)) => Err(Error::Arrow(e)),
            None => Ok(None),
        }
    }

    fn reset(&mut self) -> Result<()> {
        // Re-open the file and rebuild the reader from the start.
        let file = std::fs::File::open(&self.path).map_err(|e| Error::io(e, &self.path))?;

        let builder = ParquetRecordBatchReaderBuilder::try_new(file)
            .map_err(Error::Parquet)?
            .with_batch_size(self.batch_size);

        self.reader = builder.build().map_err(Error::Parquet)?;
        Ok(())
    }
}

impl std::fmt::Debug for ParquetSource {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ParquetSource")
            .field("path", &self.path)
            .field("batch_size", &self.batch_size)
            .finish_non_exhaustive()
    }
}

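/// A [`DataSource`] backed by record batches that are already in memory.
///
/// A minimal usage sketch (assumes `batch` is an existing `RecordBatch`):
///
/// ```ignore
/// let mut source = MemorySource::new(vec![batch])?;
/// assert!(source.next_batch()?.is_some());
/// ```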
#[derive(Debug)]
pub struct MemorySource {
    batches: Vec<RecordBatch>,
    schema: SchemaRef,
    position: usize,
}

impl MemorySource {
    /// Creates a source from pre-loaded batches, taking the schema from the first.
    ///
    /// # Errors
    ///
    /// Returns [`Error::EmptyDataset`] if `batches` is empty.
    pub fn new(batches: Vec<RecordBatch>) -> Result<Self> {
        if batches.is_empty() {
            return Err(Error::EmptyDataset);
        }

        let schema = batches[0].schema();
        Ok(Self {
            batches,
            schema,
            position: 0,
        })
    }
}

impl DataSource for MemorySource {
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }

    fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
        if self.position >= self.batches.len() {
            return Ok(None);
        }

        let batch = self.batches[self.position].clone();
        self.position += 1;
        Ok(Some(batch))
    }

    fn size_hint(&self) -> Option<usize> {
        Some(self.batches.iter().map(|b| b.num_rows()).sum())
    }

    fn reset(&mut self) -> Result<()> {
        self.position = 0;
        Ok(())
    }
}

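/// A [`DataSource`] that concatenates several sources, draining them in order.
///
/// All sources are assumed to share the schema of the first one. A minimal
/// sketch chaining two in-memory sources (assumes `a` and `b` are existing
/// `RecordBatch`es):
///
/// ```ignore
/// let chained = ChainedSource::new(vec![
///     Box::new(MemorySource::new(vec![a])?),
///     Box::new(MemorySource::new(vec![b])?),
/// ])?;
/// ```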
pub struct ChainedSource {
    sources: Vec<Box<dyn DataSource>>,
    current: usize,
    schema: SchemaRef,
}

impl ChainedSource {
    /// Creates a chained source; the schema is taken from the first source.
    ///
    /// # Errors
    ///
    /// Returns an error if `sources` is empty.
    pub fn new(sources: Vec<Box<dyn DataSource>>) -> Result<Self> {
        if sources.is_empty() {
            return Err(Error::invalid_config(
                "ChainedSource requires at least one source",
            ));
        }

        let schema = sources[0].schema();
        Ok(Self {
            sources,
            current: 0,
            schema,
        })
    }
}

impl DataSource for ChainedSource {
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }

    fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
        while self.current < self.sources.len() {
            match self.sources[self.current].next_batch()? {
                Some(batch) => return Ok(Some(batch)),
                None => self.current += 1,
            }
        }
        Ok(None)
    }

    fn size_hint(&self) -> Option<usize> {
        let mut total = 0;
        for source in &self.sources {
            match source.size_hint() {
                Some(hint) => total += hint,
                None => return None,
            }
        }
        Some(total)
    }

    fn reset(&mut self) -> Result<()> {
        for source in &mut self.sources {
            source.reset()?;
        }
        self.current = 0;
        Ok(())
    }
}

impl std::fmt::Debug for ChainedSource {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ChainedSource")
            .field("num_sources", &self.sources.len())
            .field("current", &self.current)
            .finish_non_exhaustive()
    }
}

#[cfg(test)]
#[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
mod tests {
    use arrow::{
        array::{Int32Array, StringArray},
        datatypes::{DataType, Field, Schema},
    };

    use super::*;

    fn create_test_batch(start: i32, count: usize) -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, false),
        ]));

        let ids: Vec<i32> = (start..start + count as i32).collect();
        let names: Vec<String> = ids.iter().map(|i| format!("item_{}", i)).collect();

        RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Int32Array::from(ids)),
                Arc::new(StringArray::from(names)),
            ],
        )
        .ok()
        .unwrap_or_else(|| panic!("Should create batch"))
    }

    #[test]
    fn test_memory_source() {
        let batches = vec![
            create_test_batch(0, 5),
            create_test_batch(5, 5),
            create_test_batch(10, 5),
        ];

        let mut source = MemorySource::new(batches)
            .ok()
            .unwrap_or_else(|| panic!("Should create source"));

        assert_eq!(source.size_hint(), Some(15));

        let mut count = 0;
        while let Ok(Some(batch)) = source.next_batch() {
            count += batch.num_rows();
        }
        assert_eq!(count, 15);
    }

    #[test]
    fn test_memory_source_reset() {
        let batches = vec![create_test_batch(0, 5)];

        let mut source = MemorySource::new(batches)
            .ok()
            .unwrap_or_else(|| panic!("Should create source"));

        let _ = source.next_batch();
        assert!(source.next_batch().ok().flatten().is_none());

        source
            .reset()
            .ok()
            .unwrap_or_else(|| panic!("Should reset"));
        assert!(source.next_batch().ok().flatten().is_some());
    }

    #[test]
    fn test_streaming_dataset_from_memory() {
        let batches = vec![create_test_batch(0, 10), create_test_batch(10, 10)];

        let source = MemorySource::new(batches)
            .ok()
            .unwrap_or_else(|| panic!("Should create source"));

        let dataset = StreamingDataset::new(Box::new(source), 4);

        let mut total = 0;
        for batch in dataset {
            total += batch.num_rows();
        }
        assert_eq!(total, 20);
    }

    #[test]
    fn test_streaming_dataset_prefetch() {
        let batches = vec![
            create_test_batch(0, 5),
            create_test_batch(5, 5),
            create_test_batch(10, 5),
        ];

        let source = MemorySource::new(batches)
            .ok()
            .unwrap_or_else(|| panic!("Should create source"));

        let dataset = StreamingDataset::new(Box::new(source), 4).prefetch(2);

        let collected: Vec<RecordBatch> = dataset.collect();
        assert_eq!(collected.len(), 3);
    }

    #[test]
    fn test_streaming_dataset_schema() {
        let batches = vec![create_test_batch(0, 5)];

        let source = MemorySource::new(batches)
            .ok()
            .unwrap_or_else(|| panic!("Should create source"));

        let dataset = StreamingDataset::new(Box::new(source), 4);

        assert_eq!(dataset.schema().fields().len(), 2);
        assert_eq!(dataset.schema().field(0).name(), "id");
    }

    #[test]
    fn test_streaming_dataset_reset() {
        let batches = vec![create_test_batch(0, 5)];

        let source = MemorySource::new(batches)
            .ok()
            .unwrap_or_else(|| panic!("Should create source"));

        let mut dataset = StreamingDataset::new(Box::new(source), 4);

        let first: Vec<RecordBatch> = dataset.by_ref().collect();
        assert_eq!(first.len(), 1);

        dataset
            .reset()
            .ok()
            .unwrap_or_else(|| panic!("Should reset"));

        let second: Vec<RecordBatch> = dataset.collect();
        assert_eq!(second.len(), 1);
    }

    #[test]
    fn test_chained_source() {
        let source1 = MemorySource::new(vec![create_test_batch(0, 5)])
            .ok()
            .unwrap_or_else(|| panic!("Should create source"));
        let source2 = MemorySource::new(vec![create_test_batch(5, 5)])
            .ok()
            .unwrap_or_else(|| panic!("Should create source"));

        let mut chained = ChainedSource::new(vec![Box::new(source1), Box::new(source2)])
            .ok()
            .unwrap_or_else(|| panic!("Should create chained"));

        assert_eq!(chained.size_hint(), Some(10));

        let mut total = 0;
        while let Ok(Some(batch)) = chained.next_batch() {
            total += batch.num_rows();
        }
        assert_eq!(total, 10);
    }

    #[test]
    fn test_empty_memory_source_error() {
        let result = MemorySource::new(vec![]);
        assert!(result.is_err());
    }

    #[test]
    fn test_empty_chained_source_error() {
        let result = ChainedSource::new(vec![]);
        assert!(result.is_err());
    }

    #[test]
    fn test_parquet_source_roundtrip() {
        let batch = create_test_batch(0, 100);
        let dataset = crate::ArrowDataset::from_batch(batch)
            .ok()
            .unwrap_or_else(|| panic!("Should create dataset"));

        let temp_dir = tempfile::tempdir()
            .ok()
            .unwrap_or_else(|| panic!("Should create temp dir"));
        let path = temp_dir.path().join("test.parquet");
        dataset
            .to_parquet(&path)
            .ok()
            .unwrap_or_else(|| panic!("Should write parquet"));

        let streaming = StreamingDataset::from_parquet(&path, 25)
            .ok()
            .unwrap_or_else(|| panic!("Should create streaming"));

        let total: usize = streaming.map(|b| b.num_rows()).sum();
        assert_eq!(total, 100);
    }

    #[test]
    fn test_streaming_dataset_debug() {
        let batches = vec![create_test_batch(0, 5)];
        let source = MemorySource::new(batches)
            .ok()
            .unwrap_or_else(|| panic!("Should create source"));
        let dataset = StreamingDataset::new(Box::new(source), 4);

        let debug_str = format!("{:?}", dataset);
        assert!(debug_str.contains("StreamingDataset"));
    }

    #[test]
    fn test_parquet_source_debug() {
        let batch = create_test_batch(0, 10);
        let dataset = crate::ArrowDataset::from_batch(batch)
            .ok()
            .unwrap_or_else(|| panic!("Should create dataset"));

        let temp_dir = tempfile::tempdir()
            .ok()
            .unwrap_or_else(|| panic!("Should create temp dir"));
        let path = temp_dir.path().join("debug_test.parquet");
        dataset
            .to_parquet(&path)
            .ok()
            .unwrap_or_else(|| panic!("Should write parquet"));

        let source = ParquetSource::new(&path, 10)
            .ok()
            .unwrap_or_else(|| panic!("Should create source"));

        let debug_str = format!("{:?}", source);
        assert!(debug_str.contains("ParquetSource"));
        assert!(debug_str.contains("debug_test.parquet"));
    }

    #[test]
    fn test_chained_source_debug() {
        let source1 = MemorySource::new(vec![create_test_batch(0, 5)])
            .ok()
            .unwrap_or_else(|| panic!("Should create source"));

        let chained = ChainedSource::new(vec![Box::new(source1)])
            .ok()
            .unwrap_or_else(|| panic!("Should create chained"));

        let debug_str = format!("{:?}", chained);
        assert!(debug_str.contains("ChainedSource"));
        assert!(debug_str.contains("num_sources"));
    }

    #[test]
    fn test_streaming_dataset_size_hint() {
        let batches = vec![create_test_batch(0, 10), create_test_batch(10, 15)];
        let source = MemorySource::new(batches)
            .ok()
            .unwrap_or_else(|| panic!("Should create source"));

        let dataset = StreamingDataset::new(Box::new(source), 4);
        assert_eq!(dataset.size_hint(), Some(25));
    }

    #[test]
    fn test_parquet_source_reset() {
        let batch = create_test_batch(0, 50);
        let dataset = crate::ArrowDataset::from_batch(batch)
            .ok()
            .unwrap_or_else(|| panic!("Should create dataset"));

        let temp_dir = tempfile::tempdir()
            .ok()
            .unwrap_or_else(|| panic!("Should create temp dir"));
        let path = temp_dir.path().join("reset_test.parquet");
        dataset
            .to_parquet(&path)
            .ok()
            .unwrap_or_else(|| panic!("Should write parquet"));

        let mut source = ParquetSource::new(&path, 10)
            .ok()
            .unwrap_or_else(|| panic!("Should create source"));

        let mut count = 0;
        while let Ok(Some(_)) = source.next_batch() {
            count += 1;
        }
        assert!(count > 0);

        source
            .reset()
            .ok()
            .unwrap_or_else(|| panic!("Should reset"));

        let mut count2 = 0;
        while let Ok(Some(_)) = source.next_batch() {
            count2 += 1;
        }
        assert_eq!(count, count2);
    }

    #[test]
    fn test_chained_source_reset() {
        let source1 = MemorySource::new(vec![create_test_batch(0, 5)])
            .ok()
            .unwrap_or_else(|| panic!("Should create source"));
        let source2 = MemorySource::new(vec![create_test_batch(5, 5)])
            .ok()
            .unwrap_or_else(|| panic!("Should create source"));

        let mut chained = ChainedSource::new(vec![Box::new(source1), Box::new(source2)])
            .ok()
            .unwrap_or_else(|| panic!("Should create chained"));

        let mut count = 0;
        while let Ok(Some(_)) = chained.next_batch() {
            count += 1;
        }
        assert_eq!(count, 2);

        chained
            .reset()
            .ok()
            .unwrap_or_else(|| panic!("Should reset"));

        let mut count2 = 0;
        while let Ok(Some(_)) = chained.next_batch() {
            count2 += 1;
        }
        assert_eq!(count2, 2);
    }

    #[test]
    fn test_chained_source_size_hint_unknown() {
        struct UnknownSizeSource {
            schema: SchemaRef,
            count: usize,
        }

        impl DataSource for UnknownSizeSource {
            fn schema(&self) -> SchemaRef {
                Arc::clone(&self.schema)
            }

            fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
                if self.count > 0 {
                    self.count -= 1;
                    Ok(Some(create_test_batch(0, 1)))
                } else {
                    Ok(None)
                }
            }
        }

        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, false),
        ]));

        let memory_source = MemorySource::new(vec![create_test_batch(0, 5)])
            .ok()
            .unwrap_or_else(|| panic!("Should create source"));

        let unknown_source = UnknownSizeSource { schema, count: 1 };

        let chained = ChainedSource::new(vec![Box::new(memory_source), Box::new(unknown_source)])
            .ok()
            .unwrap_or_else(|| panic!("Should create chained"));

        assert_eq!(chained.size_hint(), None);
    }

    #[test]
    fn test_streaming_dataset_buffer_size_minimum() {
        let batches = vec![create_test_batch(0, 5)];
        let source = MemorySource::new(batches)
            .ok()
            .unwrap_or_else(|| panic!("Should create source"));

        let dataset = StreamingDataset::new(Box::new(source), 0);
        let collected: Vec<RecordBatch> = dataset.collect();
        assert_eq!(collected.len(), 1);
    }

    #[test]
    fn test_streaming_dataset_prefetch_minimum() {
        let batches = vec![create_test_batch(0, 5), create_test_batch(5, 5)];
        let source = MemorySource::new(batches)
            .ok()
            .unwrap_or_else(|| panic!("Should create source"));

        let dataset = StreamingDataset::new(Box::new(source), 4).prefetch(0);
        let collected: Vec<RecordBatch> = dataset.collect();
        assert_eq!(collected.len(), 2);
    }

    #[test]
    fn test_memory_source_debug() {
        let batches = vec![create_test_batch(0, 5)];
        let source = MemorySource::new(batches)
            .ok()
            .unwrap_or_else(|| panic!("Should create source"));

        let debug_str = format!("{:?}", source);
        assert!(debug_str.contains("MemorySource"));
    }

    #[test]
    fn test_parquet_source_schema() {
        let batch = create_test_batch(0, 10);
        let dataset = crate::ArrowDataset::from_batch(batch)
            .ok()
            .unwrap_or_else(|| panic!("Should create dataset"));

        let temp_dir = tempfile::tempdir()
            .ok()
            .unwrap_or_else(|| panic!("Should create temp dir"));
        let path = temp_dir.path().join("schema_test.parquet");
        dataset
            .to_parquet(&path)
            .ok()
            .unwrap_or_else(|| panic!("Should write parquet"));

        let source = ParquetSource::new(&path, 10)
            .ok()
            .unwrap_or_else(|| panic!("Should create source"));

        let schema = source.schema();
        assert_eq!(schema.fields().len(), 2);
        assert_eq!(schema.field(0).name(), "id");
    }

    #[test]
    fn test_chained_source_schema() {
        let source = MemorySource::new(vec![create_test_batch(0, 5)])
            .ok()
            .unwrap_or_else(|| panic!("Should create source"));

        let chained = ChainedSource::new(vec![Box::new(source)])
            .ok()
            .unwrap_or_else(|| panic!("Should create chained"));

        let schema = chained.schema();
        assert_eq!(schema.fields().len(), 2);
    }

    #[test]
    fn test_parquet_source_file_not_found() {
        let result = ParquetSource::new("/nonexistent/path/to/file.parquet", 100);
        assert!(result.is_err());
    }

    #[test]
    fn test_streaming_dataset_from_parquet_not_found() {
        let result = StreamingDataset::from_parquet("/nonexistent/file.parquet", 100);
        assert!(result.is_err());
    }

    #[test]
    fn test_data_source_default_reset_error() {
        struct NoResetSource {
            schema: SchemaRef,
        }

        impl DataSource for NoResetSource {
            fn schema(&self) -> SchemaRef {
                Arc::clone(&self.schema)
            }

            fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
                Ok(None)
            }
        }

        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));

        let mut source = NoResetSource { schema };
        let result = source.reset();
        assert!(result.is_err());
    }

    #[test]
    fn test_streaming_dataset_reset_unsupported() {
        struct NoResetSource {
            schema: SchemaRef,
            done: bool,
        }

        impl DataSource for NoResetSource {
            fn schema(&self) -> SchemaRef {
                Arc::clone(&self.schema)
            }

            fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
                if self.done {
                    Ok(None)
                } else {
                    self.done = true;
                    Ok(Some(create_test_batch(0, 5)))
                }
            }
        }

        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, false),
        ]));

        let source = NoResetSource {
            schema,
            done: false,
        };
        let mut dataset = StreamingDataset::new(Box::new(source), 4);

        let _: Vec<_> = dataset.by_ref().collect();

        let result = dataset.reset();
        assert!(result.is_err());
    }

    #[test]
    fn test_streaming_dataset_fill_buffer_error() {
        struct ErrorSource {
            schema: SchemaRef,
            error_on_call: usize,
            call_count: usize,
        }

        impl DataSource for ErrorSource {
            fn schema(&self) -> SchemaRef {
                Arc::clone(&self.schema)
            }

            fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
                self.call_count += 1;
                if self.call_count >= self.error_on_call {
                    Err(crate::Error::storage("Simulated error"))
                } else {
                    Ok(Some(create_test_batch(0, 5)))
                }
            }
        }

        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, false),
        ]));

        let source = ErrorSource {
            schema,
            error_on_call: 3,
            call_count: 0,
        };

        let mut dataset = StreamingDataset::new(Box::new(source), 4).prefetch(1);

        let first = dataset.next();
        assert!(first.is_some());
        let second = dataset.next();
        assert!(second.is_some());

        let third = dataset.next();
        assert!(third.is_none());
    }

    #[test]
    fn test_streaming_dataset_large_prefetch() {
        let batches = vec![
            create_test_batch(0, 10),
            create_test_batch(10, 10),
            create_test_batch(20, 10),
            create_test_batch(30, 10),
            create_test_batch(40, 10),
        ];

        let source = MemorySource::new(batches)
            .ok()
            .unwrap_or_else(|| panic!("Should create source"));

        let dataset = StreamingDataset::new(Box::new(source), 10).prefetch(100);

        let collected: Vec<RecordBatch> = dataset.collect();
        assert_eq!(collected.len(), 5);
    }

    #[test]
    fn test_memory_source_multiple_iterations() {
        let batches = vec![create_test_batch(0, 5), create_test_batch(5, 5)];

        let mut source = MemorySource::new(batches)
            .ok()
            .unwrap_or_else(|| panic!("Should create source"));

        let mut count1 = 0;
        while let Ok(Some(_)) = source.next_batch() {
            count1 += 1;
        }
        assert_eq!(count1, 2);

        source
            .reset()
            .ok()
            .unwrap_or_else(|| panic!("Should reset"));

        let mut count2 = 0;
        while let Ok(Some(_)) = source.next_batch() {
            count2 += 1;
        }
        assert_eq!(count2, 2);
    }

    #[test]
    fn test_chained_source_exhaustion() {
        let source1 = MemorySource::new(vec![create_test_batch(0, 3)])
            .ok()
            .unwrap_or_else(|| panic!("Should create source"));
        let source2 = MemorySource::new(vec![create_test_batch(3, 2)])
            .ok()
            .unwrap_or_else(|| panic!("Should create source"));
        let source3 = MemorySource::new(vec![create_test_batch(5, 1)])
            .ok()
            .unwrap_or_else(|| panic!("Should create source"));

        let mut chained = ChainedSource::new(vec![
            Box::new(source1),
            Box::new(source2),
            Box::new(source3),
        ])
        .ok()
        .unwrap_or_else(|| panic!("Should create chained"));

        let mut batches = Vec::new();
        while let Ok(Some(batch)) = chained.next_batch() {
            batches.push(batch);
        }

        assert_eq!(batches.len(), 3);
        assert_eq!(batches[0].num_rows(), 3);
        assert_eq!(batches[1].num_rows(), 2);
        assert_eq!(batches[2].num_rows(), 1);
    }

    #[test]
    fn test_streaming_dataset_empty_iteration() {
        struct EmptySource {
            schema: SchemaRef,
        }

        impl DataSource for EmptySource {
            fn schema(&self) -> SchemaRef {
                Arc::clone(&self.schema)
            }

            fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
                Ok(None)
            }
        }

        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));

        let source = EmptySource { schema };
        let dataset = StreamingDataset::new(Box::new(source), 4);

        let collected: Vec<RecordBatch> = dataset.collect();
        assert!(collected.is_empty());
    }

    #[test]
    fn test_streaming_dataset_single_batch() {
        let batches = vec![create_test_batch(0, 100)];

        let source = MemorySource::new(batches)
            .ok()
            .unwrap_or_else(|| panic!("Should create source"));

        let dataset = StreamingDataset::new(Box::new(source), 1);

        let collected: Vec<RecordBatch> = dataset.collect();
        assert_eq!(collected.len(), 1);
        assert_eq!(collected[0].num_rows(), 100);
    }

    #[test]
    fn test_parquet_source_batch_size_variation() {
        let batch = create_test_batch(0, 100);
        let dataset = crate::ArrowDataset::from_batch(batch)
            .ok()
            .unwrap_or_else(|| panic!("Should create dataset"));

        let temp_dir = tempfile::tempdir()
            .ok()
            .unwrap_or_else(|| panic!("Should create temp dir"));
        let path = temp_dir.path().join("batch_size_test.parquet");
        dataset
            .to_parquet(&path)
            .ok()
            .unwrap_or_else(|| panic!("Should write parquet"));

        for batch_size in [1, 10, 50, 100, 200] {
            let source = ParquetSource::new(&path, batch_size)
                .ok()
                .unwrap_or_else(|| panic!("Should create source"));

            let streaming = StreamingDataset::new(Box::new(source), 4);
            let total: usize = streaming.map(|b| b.num_rows()).sum();
            assert_eq!(
                total, 100,
                "Batch size {} should read all 100 rows",
                batch_size
            );
        }
    }

    #[test]
    fn test_chained_source_single_source() {
        let batches = vec![create_test_batch(0, 50)];
        let source = MemorySource::new(batches)
            .ok()
            .unwrap_or_else(|| panic!("source"));

        let chained = ChainedSource::new(vec![Box::new(source)])
            .ok()
            .unwrap_or_else(|| panic!("chained"));
        let dataset = StreamingDataset::new(Box::new(chained), 4);

        let collected: Vec<RecordBatch> = dataset.collect();
        assert_eq!(collected.len(), 1);
        assert_eq!(collected[0].num_rows(), 50);
    }

    #[test]
    fn test_chained_source_multiple_sources() {
        let source1 = MemorySource::new(vec![create_test_batch(0, 30)])
            .ok()
            .unwrap_or_else(|| panic!("source1"));
        let source2 = MemorySource::new(vec![create_test_batch(30, 20)])
            .ok()
            .unwrap_or_else(|| panic!("source2"));

        let chained = ChainedSource::new(vec![Box::new(source1), Box::new(source2)])
            .ok()
            .unwrap_or_else(|| panic!("chained"));
        let dataset = StreamingDataset::new(Box::new(chained), 4);

        let collected: Vec<RecordBatch> = dataset.collect();
        assert_eq!(collected.len(), 2);

        let total: usize = collected.iter().map(|b| b.num_rows()).sum();
        assert_eq!(total, 50);
    }

    #[test]
    fn test_chained_source_empty_sources_vec() {
        let result: std::result::Result<ChainedSource, Error> = ChainedSource::new(vec![]);
        assert!(result.is_err());
    }

    #[test]
    fn test_chained_source_with_empty_yielding_sources() {
        struct EmptyYieldSource {
            schema: SchemaRef,
        }
        impl DataSource for EmptyYieldSource {
            fn schema(&self) -> SchemaRef {
                Arc::clone(&self.schema)
            }
            fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
                Ok(None)
            }
        }

        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));

        let source1 = EmptyYieldSource {
            schema: Arc::clone(&schema),
        };
        let source2 = EmptyYieldSource { schema };

        let chained = ChainedSource::new(vec![Box::new(source1), Box::new(source2)])
            .ok()
            .unwrap_or_else(|| panic!("chained"));
        let dataset = StreamingDataset::new(Box::new(chained), 4);

        let collected: Vec<RecordBatch> = dataset.collect();
        assert!(collected.is_empty());
    }

    #[test]
    fn test_streaming_dataset_prefetch_config() {
        let batches = vec![create_test_batch(0, 100)];
        let source = MemorySource::new(batches)
            .ok()
            .unwrap_or_else(|| panic!("source"));

        let dataset = StreamingDataset::new(Box::new(source), 4).prefetch(2);

        let collected: Vec<RecordBatch> = dataset.collect();
        assert_eq!(collected.len(), 1);
    }

    #[test]
    fn test_chained_source_size_hint() {
        let source1 = MemorySource::new(vec![create_test_batch(0, 50)])
            .ok()
            .unwrap_or_else(|| panic!("source1"));
        let source2 = MemorySource::new(vec![create_test_batch(50, 50)])
            .ok()
            .unwrap_or_else(|| panic!("source2"));

        let chained = ChainedSource::new(vec![Box::new(source1), Box::new(source2)])
            .ok()
            .unwrap_or_else(|| panic!("chained"));

        assert_eq!(chained.size_hint(), Some(100));
    }

    #[test]
    fn test_streaming_dataset_buffer_size_one() {
        let batches = vec![
            create_test_batch(0, 25),
            create_test_batch(25, 25),
            create_test_batch(50, 25),
            create_test_batch(75, 25),
        ];
        let source = MemorySource::new(batches)
            .ok()
            .unwrap_or_else(|| panic!("source"));

        let dataset = StreamingDataset::new(Box::new(source), 1);

        let collected: Vec<RecordBatch> = dataset.collect();
        assert_eq!(collected.len(), 4);

        let total: usize = collected.iter().map(|b| b.num_rows()).sum();
        assert_eq!(total, 100);
    }

    #[test]
    fn test_parquet_source_invalid_path() {
        let result = ParquetSource::new("/nonexistent/path/to/file.parquet", 100);
        assert!(result.is_err());
    }

    #[test]
    fn test_memory_source_schema_consistency() {
        let batches = vec![create_test_batch(0, 50), create_test_batch(50, 50)];
        let source = MemorySource::new(batches)
            .ok()
            .unwrap_or_else(|| panic!("source"));

        let schema = source.schema();
        assert_eq!(schema.fields().len(), 2);
    }

    #[test]
    fn test_parquet_source_next_batch_error_handling() {
        let batch = create_test_batch(0, 10);
        let dataset = crate::ArrowDataset::from_batch(batch)
            .ok()
            .unwrap_or_else(|| panic!("dataset"));

        let temp_dir = tempfile::tempdir()
            .ok()
            .unwrap_or_else(|| panic!("temp dir"));
        let path = temp_dir.path().join("next_batch_test.parquet");
        dataset
            .to_parquet(&path)
            .ok()
            .unwrap_or_else(|| panic!("parquet"));

        let mut source = ParquetSource::new(&path, 5)
            .ok()
            .unwrap_or_else(|| panic!("source"));

        let mut batches = Vec::new();
        while let Ok(Some(batch)) = source.next_batch() {
            batches.push(batch);
        }
        assert!(!batches.is_empty());

        let next = source.next_batch();
        assert!(next.is_ok());
        assert!(next.ok().flatten().is_none());
    }

    #[test]
    fn test_memory_source_position_tracking() {
        let batches = vec![
            create_test_batch(0, 3),
            create_test_batch(3, 3),
            create_test_batch(6, 4),
        ];
        let mut source = MemorySource::new(batches)
            .ok()
            .unwrap_or_else(|| panic!("source"));

        let b1 = source.next_batch().ok().flatten();
        assert!(b1.is_some());
        assert_eq!(b1.as_ref().map(|b| b.num_rows()), Some(3));

        let b2 = source.next_batch().ok().flatten();
        assert!(b2.is_some());

        let b3 = source.next_batch().ok().flatten();
        assert!(b3.is_some());

        let b4 = source.next_batch().ok().flatten();
        assert!(b4.is_none());
    }

    #[test]
    fn test_chained_source_transitions_between_sources() {
        let source1 = MemorySource::new(vec![create_test_batch(0, 2), create_test_batch(2, 2)])
            .ok()
            .unwrap_or_else(|| panic!("source1"));

        let source2 = MemorySource::new(vec![create_test_batch(4, 3)])
            .ok()
            .unwrap_or_else(|| panic!("source2"));

        let mut chained = ChainedSource::new(vec![Box::new(source1), Box::new(source2)])
            .ok()
            .unwrap_or_else(|| panic!("chained"));

        let mut batches = Vec::new();
        while let Ok(Some(batch)) = chained.next_batch() {
            batches.push(batch.num_rows());
        }

        assert_eq!(batches.len(), 3);
        assert_eq!(batches, vec![2, 2, 3]);
    }

    #[test]
    fn test_streaming_dataset_exhaustion() {
        let batches = vec![create_test_batch(0, 5)];
        let source = MemorySource::new(batches)
            .ok()
            .unwrap_or_else(|| panic!("source"));

        let mut dataset = StreamingDataset::new(Box::new(source), 2);

        let _: Vec<_> = dataset.by_ref().collect();

        assert!(dataset.next().is_none());
    }

    #[test]
    fn test_streaming_dataset_schema_preserved() {
        let batches = vec![create_test_batch(0, 10)];
        let source = MemorySource::new(batches)
            .ok()
            .unwrap_or_else(|| panic!("source"));

        let dataset = StreamingDataset::new(Box::new(source), 4);
        let schema = dataset.schema();

        assert_eq!(schema.fields().len(), 2);
        assert_eq!(schema.field(0).name(), "id");
        assert_eq!(schema.field(1).name(), "name");
    }

    #[test]
    fn test_chained_source_reset_restores_all() {
        let source1 = MemorySource::new(vec![create_test_batch(0, 10)])
            .ok()
            .unwrap_or_else(|| panic!("source1"));
        let source2 = MemorySource::new(vec![create_test_batch(10, 10)])
            .ok()
            .unwrap_or_else(|| panic!("source2"));

        let mut chained = ChainedSource::new(vec![Box::new(source1), Box::new(source2)])
            .ok()
            .unwrap_or_else(|| panic!("chained"));

        let count1: usize = std::iter::from_fn(|| chained.next_batch().ok().flatten())
            .map(|b| b.num_rows())
            .sum();
        assert_eq!(count1, 20);

        chained.reset().ok().unwrap_or_else(|| panic!("reset"));

        let count2: usize = std::iter::from_fn(|| chained.next_batch().ok().flatten())
            .map(|b| b.num_rows())
            .sum();
        assert_eq!(count2, 20);
    }

    #[test]
    fn test_data_source_default_size_hint() {
        struct NoHintSource {
            schema: SchemaRef,
        }

        impl DataSource for NoHintSource {
            fn schema(&self) -> SchemaRef {
                Arc::clone(&self.schema)
            }

            fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
                Ok(None)
            }
        }

        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
        let source = NoHintSource { schema };
        assert!(source.size_hint().is_none());
    }

    #[test]
    fn test_streaming_dataset_large_buffer() {
        let batches = vec![create_test_batch(0, 10), create_test_batch(10, 10)];
        let source = MemorySource::new(batches)
            .ok()
            .unwrap_or_else(|| panic!("source"));

        let dataset = StreamingDataset::new(Box::new(source), 100);
        let collected: Vec<RecordBatch> = dataset.collect();
        assert_eq!(collected.len(), 2);
    }

    #[test]
    fn test_parquet_source_small_batch_size() {
        let batch = create_test_batch(0, 100);
        let dataset = crate::ArrowDataset::from_batch(batch)
            .ok()
            .unwrap_or_else(|| panic!("dataset"));

        let temp_dir = tempfile::tempdir()
            .ok()
            .unwrap_or_else(|| panic!("temp dir"));
        let path = temp_dir.path().join("small_batch.parquet");
        dataset
            .to_parquet(&path)
            .ok()
            .unwrap_or_else(|| panic!("parquet"));

        let source = ParquetSource::new(&path, 1)
            .ok()
            .unwrap_or_else(|| panic!("source"));

        let dataset = StreamingDataset::new(Box::new(source), 10);
        let total: usize = dataset.map(|b| b.num_rows()).sum();
        assert_eq!(total, 100);
    }

    #[test]
    fn test_chained_source_first_source_empty() {
        struct EmptySource {
            schema: SchemaRef,
        }

        impl DataSource for EmptySource {
            fn schema(&self) -> SchemaRef {
                Arc::clone(&self.schema)
            }
            fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
                Ok(None)
            }
            fn reset(&mut self) -> Result<()> {
                Ok(())
            }
        }

        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, false),
        ]));

        let empty = EmptySource { schema };
        let memory = MemorySource::new(vec![create_test_batch(0, 5)])
            .ok()
            .unwrap_or_else(|| panic!("memory"));

        let mut chained = ChainedSource::new(vec![Box::new(empty), Box::new(memory)])
            .ok()
            .unwrap_or_else(|| panic!("chained"));

        let batch = chained.next_batch().ok().flatten();
        assert!(batch.is_some());
        assert_eq!(batch.map(|b| b.num_rows()), Some(5));
    }

    #[test]
    fn test_streaming_dataset_new_initializes_correctly() {
        let batches = vec![create_test_batch(0, 50)];
        let source = MemorySource::new(batches)
            .ok()
            .unwrap_or_else(|| panic!("source"));

        let dataset = StreamingDataset::new(Box::new(source), 4);

        assert_eq!(dataset.size_hint(), Some(50));
        assert_eq!(dataset.schema().fields().len(), 2);
    }

    #[test]
    fn test_memory_source_single_batch() {
        let batches = vec![create_test_batch(0, 100)];
        let mut source = MemorySource::new(batches)
            .ok()
            .unwrap_or_else(|| panic!("source"));

        assert_eq!(source.size_hint(), Some(100));

        let batch = source.next_batch().ok().flatten();
        assert!(batch.is_some());
        assert_eq!(batch.map(|b| b.num_rows()), Some(100));

        assert!(source.next_batch().ok().flatten().is_none());
    }

    #[test]
    fn test_chained_source_all_sources_empty() {
        struct EmptySource {
            schema: SchemaRef,
        }

        impl DataSource for EmptySource {
            fn schema(&self) -> SchemaRef {
                Arc::clone(&self.schema)
            }
            fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
                Ok(None)
            }
        }

        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));

        let empty1 = EmptySource {
            schema: Arc::clone(&schema),
        };
        let empty2 = EmptySource { schema };

        let mut chained = ChainedSource::new(vec![Box::new(empty1), Box::new(empty2)])
            .ok()
            .unwrap_or_else(|| panic!("chained"));

        assert!(chained.next_batch().ok().flatten().is_none());
    }

    #[test]
    fn test_streaming_dataset_prefetch_larger_than_available() {
        let batches = vec![create_test_batch(0, 10)];
        let source = MemorySource::new(batches)
            .ok()
            .unwrap_or_else(|| panic!("source"));

        let dataset = StreamingDataset::new(Box::new(source), 4).prefetch(10);
        let collected: Vec<RecordBatch> = dataset.collect();

        assert_eq!(collected.len(), 1);
    }

    #[test]
    fn test_parquet_source_reset_and_reread() {
        let batch = create_test_batch(0, 50);
        let dataset = crate::ArrowDataset::from_batch(batch)
            .ok()
            .unwrap_or_else(|| panic!("dataset"));

        let temp_dir = tempfile::tempdir()
            .ok()
            .unwrap_or_else(|| panic!("temp dir"));
        let path = temp_dir.path().join("reset_reread.parquet");
        dataset
            .to_parquet(&path)
            .ok()
            .unwrap_or_else(|| panic!("parquet"));

        let mut source = ParquetSource::new(&path, 20)
            .ok()
            .unwrap_or_else(|| panic!("source"));

        let first_pass: Vec<RecordBatch> =
            std::iter::from_fn(|| source.next_batch().ok().flatten()).collect();
        let first_count: usize = first_pass.iter().map(|b| b.num_rows()).sum();

        source.reset().ok().unwrap_or_else(|| panic!("reset"));

        let second_pass: Vec<RecordBatch> =
            std::iter::from_fn(|| source.next_batch().ok().flatten()).collect();
        let second_count: usize = second_pass.iter().map(|b| b.num_rows()).sum();

        assert_eq!(first_count, second_count);
        assert_eq!(first_count, 50);
    }

    #[test]
    fn test_chained_source_multiple_batches_per_source() {
        let source1 = MemorySource::new(vec![
            create_test_batch(0, 10),
            create_test_batch(10, 10),
            create_test_batch(20, 10),
        ])
        .ok()
        .unwrap_or_else(|| panic!("source1"));

        let source2 = MemorySource::new(vec![create_test_batch(30, 15), create_test_batch(45, 15)])
            .ok()
            .unwrap_or_else(|| panic!("source2"));

        let mut chained = ChainedSource::new(vec![Box::new(source1), Box::new(source2)])
            .ok()
            .unwrap_or_else(|| panic!("chained"));

        let batches: Vec<usize> = std::iter::from_fn(|| chained.next_batch().ok().flatten())
            .map(|b| b.num_rows())
            .collect();

        assert_eq!(batches, vec![10, 10, 10, 15, 15]);
    }

    #[test]
    fn test_streaming_dataset_default_buffer() {
        let batches = vec![create_test_batch(0, 10)];
        let source = MemorySource::new(batches)
            .ok()
            .unwrap_or_else(|| panic!("source"));

        let dataset = StreamingDataset::new(Box::new(source), 4);
        assert!(!dataset.exhausted);
        let collected: Vec<RecordBatch> = dataset.collect();
        assert_eq!(collected.len(), 1);
    }

    #[test]
    fn test_memory_source_empty_error_message() {
        let result = MemorySource::new(vec![]);
        assert!(result.is_err());
        if let Err(e) = result {
            let msg = format!("{:?}", e);
            assert!(msg.contains("EmptyDataset") || !msg.is_empty());
        }
    }

    #[test]
    fn test_chained_source_empty_error_message() {
        let result: std::result::Result<ChainedSource, Error> = ChainedSource::new(vec![]);
        assert!(result.is_err());
        if let Err(e) = result {
            let msg = format!("{:?}", e);
            assert!(!msg.is_empty());
        }
    }

    #[test]
    fn test_streaming_dataset_collect_all() {
        let batches = vec![
            create_test_batch(0, 5),
            create_test_batch(5, 5),
            create_test_batch(10, 5),
            create_test_batch(15, 5),
        ];
        let source = MemorySource::new(batches)
            .ok()
            .unwrap_or_else(|| panic!("source"));

        let dataset = StreamingDataset::new(Box::new(source), 2).prefetch(2);
        let collected: Vec<RecordBatch> = dataset.collect();

        let total_rows: usize = collected.iter().map(|b| b.num_rows()).sum();
        assert_eq!(total_rows, 20);
    }

    #[test]
    fn test_parquet_source_schema_matches() {
        let batch = create_test_batch(0, 10);
        let dataset = crate::ArrowDataset::from_batch(batch)
            .ok()
            .unwrap_or_else(|| panic!("dataset"));

        let temp_dir = tempfile::tempdir()
            .ok()
            .unwrap_or_else(|| panic!("temp dir"));
        let path = temp_dir.path().join("schema_match.parquet");
        dataset
            .to_parquet(&path)
            .ok()
            .unwrap_or_else(|| panic!("parquet"));

        let source = ParquetSource::new(&path, 5)
            .ok()
            .unwrap_or_else(|| panic!("source"));

        assert_eq!(source.schema().fields().len(), 2);
    }

    #[test]
    fn test_streaming_dataset_from_parquet_with_prefetch() {
        let batch = create_test_batch(0, 50);
        let dataset = crate::ArrowDataset::from_batch(batch)
            .ok()
            .unwrap_or_else(|| panic!("dataset"));

        let temp_dir = tempfile::tempdir()
            .ok()
            .unwrap_or_else(|| panic!("temp dir"));
        let path = temp_dir.path().join("prefetch_test.parquet");
        dataset
            .to_parquet(&path)
            .ok()
            .unwrap_or_else(|| panic!("parquet"));

        let streaming = StreamingDataset::from_parquet(&path, 10)
            .ok()
            .unwrap_or_else(|| panic!("streaming"))
            .prefetch(3);

        let total: usize = streaming.map(|b| b.num_rows()).sum();
        assert_eq!(total, 50);
    }

    #[test]
    fn test_chained_source_with_different_batch_counts() {
        let source1 = MemorySource::new(vec![create_test_batch(0, 100)])
            .ok()
            .unwrap_or_else(|| panic!("source1"));
        let source2 = MemorySource::new(vec![
            create_test_batch(100, 10),
            create_test_batch(110, 10),
            create_test_batch(120, 10),
        ])
        .ok()
        .unwrap_or_else(|| panic!("source2"));

        let mut chained = ChainedSource::new(vec![Box::new(source1), Box::new(source2)])
            .ok()
            .unwrap_or_else(|| panic!("chained"));

        let mut total = 0;
        while let Ok(Some(batch)) = chained.next_batch() {
            total += batch.num_rows();
        }
        assert_eq!(total, 130);
    }

    #[test]
    fn test_streaming_dataset_next_after_exhaustion() {
        let batches = vec![create_test_batch(0, 5)];
        let source = MemorySource::new(batches)
            .ok()
            .unwrap_or_else(|| panic!("source"));

        let mut dataset = StreamingDataset::new(Box::new(source), 1);

        while dataset.next().is_some() {}

        assert!(dataset.next().is_none());
        assert!(dataset.next().is_none());
    }

    #[test]
    fn test_memory_source_size_hint_calculation() {
        let batches = vec![
            create_test_batch(0, 7),
            create_test_batch(7, 13),
            create_test_batch(20, 3),
        ];
        let source = MemorySource::new(batches)
            .ok()
            .unwrap_or_else(|| panic!("source"));

        assert_eq!(source.size_hint(), Some(23));
    }

    #[test]
    fn test_chained_source_partial_size_hint() {
        struct UnknownSource {
            schema: SchemaRef,
            batches: Vec<RecordBatch>,
            pos: usize,
        }

        impl DataSource for UnknownSource {
            fn schema(&self) -> SchemaRef {
                Arc::clone(&self.schema)
            }

            fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
                if self.pos < self.batches.len() {
                    let b = self.batches[self.pos].clone();
                    self.pos += 1;
                    Ok(Some(b))
                } else {
                    Ok(None)
                }
            }
        }

        let memory = MemorySource::new(vec![create_test_batch(0, 10)])
            .ok()
            .unwrap_or_else(|| panic!("memory"));

        let unknown = UnknownSource {
            schema: create_test_batch(0, 1).schema(),
            batches: vec![create_test_batch(10, 5)],
            pos: 0,
        };

        let chained = ChainedSource::new(vec![Box::new(memory), Box::new(unknown)])
            .ok()
            .unwrap_or_else(|| panic!("chained"));

        assert!(chained.size_hint().is_none());
    }

    #[test]
    fn test_streaming_dataset_buffer_boundary() {
        let batches: Vec<RecordBatch> = (0..4).map(|i| create_test_batch(i * 10, 10)).collect();

        let source = MemorySource::new(batches)
            .ok()
            .unwrap_or_else(|| panic!("source"));

        let dataset = StreamingDataset::new(Box::new(source), 4);
        let collected: Vec<RecordBatch> = dataset.collect();

        assert_eq!(collected.len(), 4);
    }

    #[test]
    fn test_parquet_source_read_all_batches() {
        let batch = create_test_batch(0, 1000);
        let dataset = crate::ArrowDataset::from_batch(batch)
            .ok()
            .unwrap_or_else(|| panic!("dataset"));

        let temp_dir = tempfile::tempdir()
            .ok()
            .unwrap_or_else(|| panic!("temp dir"));
        let path = temp_dir.path().join("all_batches.parquet");
        dataset
            .to_parquet(&path)
            .ok()
            .unwrap_or_else(|| panic!("parquet"));

        let mut source = ParquetSource::new(&path, 100)
            .ok()
            .unwrap_or_else(|| panic!("source"));

        let mut batch_count = 0;
        let mut total_rows = 0;
        while let Ok(Some(batch)) = source.next_batch() {
            batch_count += 1;
            total_rows += batch.num_rows();
        }

        assert!(batch_count >= 1);
        assert_eq!(total_rows, 1000);
    }
}