// alimentar/streaming.rs
1//! Streaming dataset for lazy/chunked data loading.
2//!
3//! Provides [`StreamingDataset`] for working with datasets that are too large
4//! to fit in memory, or when you want to start processing before the full
5//! dataset is loaded.
6
7use std::{collections::VecDeque, path::Path, sync::Arc};
8
9use arrow::{array::RecordBatch, datatypes::SchemaRef};
10use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
11
12use crate::error::{Error, Result};
13
/// A data source that can produce RecordBatches on demand.
///
/// Implementors feed [`StreamingDataset`]. The trait is `Send` so boxed
/// sources can be moved across threads.
pub trait DataSource: Send {
    /// Returns the schema of the data.
    ///
    /// Callers (e.g. [`StreamingDataset::new`]) capture this once at
    /// construction, so it should be stable across the source's lifetime.
    fn schema(&self) -> SchemaRef;

    /// Returns the next batch of data, or None if exhausted.
    ///
    /// # Errors
    ///
    /// Returns an error if reading the next batch fails.
    fn next_batch(&mut self) -> Result<Option<RecordBatch>>;

    /// Returns an estimate of total rows, if known.
    ///
    /// The default implementation reports `None` (size unknown).
    fn size_hint(&self) -> Option<usize> {
        None
    }

    /// Resets the source to the beginning, if supported.
    ///
    /// The default implementation always fails; rewindable sources
    /// override this.
    ///
    /// # Errors
    ///
    /// Returns an error if this data source does not support reset.
    fn reset(&mut self) -> Result<()> {
        Err(Error::storage("This data source does not support reset"))
    }
}
40
/// A streaming dataset that loads data lazily in chunks.
///
/// Unlike [`ArrowDataset`](crate::ArrowDataset) which loads all data into
/// memory, `StreamingDataset` fetches data on-demand, making it suitable for:
/// - Large datasets that don't fit in memory
/// - Network-based data sources where you want to start processing early
/// - Infinite or very large data streams
///
/// # Example
///
/// ```no_run
/// use alimentar::streaming::StreamingDataset;
///
/// let dataset = StreamingDataset::from_parquet("large_data.parquet", 1024).unwrap();
///
/// for batch in dataset {
///     println!("Processing batch with {} rows", batch.num_rows());
/// }
/// ```
pub struct StreamingDataset {
    // Underlying producer of batches; boxed so any `DataSource` works.
    source: Box<dyn DataSource>,
    // Batches fetched ahead of consumption; front is the next to yield.
    buffer: VecDeque<RecordBatch>,
    // Configured buffer limit (also used as the initial capacity hint).
    buffer_size: usize,
    // Number of batches to fetch ahead on each refill (clamped to >= 1).
    prefetch: usize,
    // Schema captured from the source at construction time.
    schema: SchemaRef,
    // True once the source returned `None`; avoids re-polling a drained source.
    exhausted: bool,
}
68
69impl StreamingDataset {
70    /// Creates a new streaming dataset from a data source.
71    ///
72    /// # Arguments
73    ///
74    /// * `source` - The data source to stream from
75    /// * `buffer_size` - Maximum number of batches to buffer
76    ///
77    /// #[requires(buffer_size > 0)]
78    /// #[ensures(result.buffer_size >= 1)]
79    /// #[ensures(result.exhausted == false)]
80    /// #[invariant(self.buffer.len() <= self.buffer_size)]
81    pub fn new(source: Box<dyn DataSource>, buffer_size: usize) -> Self {
82        let schema = source.schema();
83        Self {
84            source,
85            buffer: VecDeque::with_capacity(buffer_size),
86            buffer_size: buffer_size.max(1),
87            prefetch: 1,
88            schema,
89            exhausted: false,
90        }
91    }
92
93    /// Creates a streaming dataset from a Parquet file.
94    ///
95    /// # Arguments
96    ///
97    /// * `path` - Path to the Parquet file
98    /// * `batch_size` - Number of rows per batch
99    ///
100    /// # Errors
101    ///
102    /// Returns an error if the file cannot be opened or is invalid.
103    pub fn from_parquet(path: impl AsRef<Path>, batch_size: usize) -> Result<Self> {
104        let source = ParquetSource::new(path, batch_size)?;
105        Ok(Self::new(Box::new(source), 4))
106    }
107
108    /// Sets the number of batches to prefetch.
109    ///
110    /// Higher values reduce latency but increase memory usage.
111    #[must_use]
112    pub fn prefetch(mut self, count: usize) -> Self {
113        self.prefetch = count.max(1);
114        self
115    }
116
117    /// Returns the schema of the dataset.
118    pub fn schema(&self) -> SchemaRef {
119        Arc::clone(&self.schema)
120    }
121
122    /// Returns an estimate of total rows, if known.
123    pub fn size_hint(&self) -> Option<usize> {
124        self.source.size_hint()
125    }
126
127    /// Fills the buffer up to the prefetch limit.
128    fn fill_buffer(&mut self) -> Result<()> {
129        while !self.exhausted && self.buffer.len() < self.prefetch {
130            if let Some(batch) = self.source.next_batch()? {
131                self.buffer.push_back(batch);
132            } else {
133                self.exhausted = true;
134                break;
135            }
136        }
137        Ok(())
138    }
139
140    /// Resets the dataset to the beginning, if the source supports it.
141    ///
142    /// # Errors
143    ///
144    /// Returns an error if the source does not support reset.
145    pub fn reset(&mut self) -> Result<()> {
146        self.source.reset()?;
147        self.buffer.clear();
148        self.exhausted = false;
149        Ok(())
150    }
151}
152
153impl Iterator for StreamingDataset {
154    type Item = RecordBatch;
155
156    fn next(&mut self) -> Option<Self::Item> {
157        // Try to fill buffer first
158        if let Err(_e) = self.fill_buffer() {
159            return None;
160        }
161
162        self.buffer.pop_front()
163    }
164}
165
166impl std::fmt::Debug for StreamingDataset {
167    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
168        f.debug_struct("StreamingDataset")
169            .field("buffer_size", &self.buffer_size)
170            .field("prefetch", &self.prefetch)
171            .field("buffered", &self.buffer.len())
172            .field("exhausted", &self.exhausted)
173            .finish_non_exhaustive()
174    }
175}
176
/// A data source that reads from a Parquet file.
pub struct ParquetSource {
    // Active Arrow reader over the file; rebuilt wholesale by `reset`.
    reader: parquet::arrow::arrow_reader::ParquetRecordBatchReader,
    // Schema taken from the reader builder at construction.
    schema: SchemaRef,
    // Original file path, retained so `reset` can re-open the file.
    path: std::path::PathBuf,
    // Rows per batch, reused when rebuilding the reader on `reset`.
    batch_size: usize,
}
184
185impl ParquetSource {
186    /// Creates a new Parquet source.
187    ///
188    /// # Errors
189    ///
190    /// Returns an error if the file cannot be opened.
191    pub fn new(path: impl AsRef<Path>, batch_size: usize) -> Result<Self> {
192        let path = path.as_ref().to_path_buf();
193        let file = std::fs::File::open(&path).map_err(|e| Error::io(e, &path))?;
194
195        let builder = ParquetRecordBatchReaderBuilder::try_new(file)
196            .map_err(Error::Parquet)?
197            .with_batch_size(batch_size);
198
199        let schema = builder.schema().clone();
200        let reader = builder.build().map_err(Error::Parquet)?;
201
202        Ok(Self {
203            reader,
204            schema,
205            path,
206            batch_size,
207        })
208    }
209}
210
211impl DataSource for ParquetSource {
212    fn schema(&self) -> SchemaRef {
213        Arc::clone(&self.schema)
214    }
215
216    fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
217        match self.reader.next() {
218            Some(Ok(batch)) => Ok(Some(batch)),
219            Some(Err(e)) => Err(Error::Arrow(e)),
220            None => Ok(None),
221        }
222    }
223
224    fn reset(&mut self) -> Result<()> {
225        // Re-open the file
226        let file = std::fs::File::open(&self.path).map_err(|e| Error::io(e, &self.path))?;
227
228        let builder = ParquetRecordBatchReaderBuilder::try_new(file)
229            .map_err(Error::Parquet)?
230            .with_batch_size(self.batch_size);
231
232        self.reader = builder.build().map_err(Error::Parquet)?;
233        Ok(())
234    }
235}
236
237impl std::fmt::Debug for ParquetSource {
238    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
239        f.debug_struct("ParquetSource")
240            .field("path", &self.path)
241            .field("batch_size", &self.batch_size)
242            .finish_non_exhaustive()
243    }
244}
245
/// A data source backed by in-memory RecordBatches.
///
/// Useful for testing or when you have data already in memory
/// but want to use the streaming interface.
#[derive(Debug)]
pub struct MemorySource {
    // Owned batches, yielded in order.
    batches: Vec<RecordBatch>,
    // Schema of the first batch; assumed shared by all batches —
    // NOTE(review): not validated at construction, confirm callers agree.
    schema: SchemaRef,
    // Index of the next batch to yield; `reset` sets this back to 0.
    position: usize,
}
256
257impl MemorySource {
258    /// Creates a new memory source from a vector of batches.
259    ///
260    /// # Errors
261    ///
262    /// Returns an error if the batches vector is empty.
263    pub fn new(batches: Vec<RecordBatch>) -> Result<Self> {
264        if batches.is_empty() {
265            return Err(Error::EmptyDataset);
266        }
267
268        let schema = batches[0].schema();
269        Ok(Self {
270            batches,
271            schema,
272            position: 0,
273        })
274    }
275}
276
277impl DataSource for MemorySource {
278    fn schema(&self) -> SchemaRef {
279        Arc::clone(&self.schema)
280    }
281
282    fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
283        if self.position >= self.batches.len() {
284            return Ok(None);
285        }
286
287        let batch = self.batches[self.position].clone();
288        self.position += 1;
289        Ok(Some(batch))
290    }
291
292    fn size_hint(&self) -> Option<usize> {
293        Some(self.batches.iter().map(|b| b.num_rows()).sum())
294    }
295
296    fn reset(&mut self) -> Result<()> {
297        self.position = 0;
298        Ok(())
299    }
300}
301
/// A data source that chains multiple sources together.
///
/// Sources are drained front-to-back. The reported schema is taken from
/// the first source — NOTE(review): later sources' schemas are not
/// validated against it; confirm callers pass matching sources.
pub struct ChainedSource {
    // Sources consumed in order.
    sources: Vec<Box<dyn DataSource>>,
    // Index of the source currently being drained.
    current: usize,
    // Schema of the first source, reported for the whole chain.
    schema: SchemaRef,
}
308
309impl ChainedSource {
310    /// Creates a new chained source.
311    ///
312    /// # Errors
313    ///
314    /// Returns an error if the sources vector is empty.
315    pub fn new(sources: Vec<Box<dyn DataSource>>) -> Result<Self> {
316        if sources.is_empty() {
317            return Err(Error::invalid_config(
318                "ChainedSource requires at least one source",
319            ));
320        }
321
322        let schema = sources[0].schema();
323        Ok(Self {
324            sources,
325            current: 0,
326            schema,
327        })
328    }
329}
330
331impl DataSource for ChainedSource {
332    fn schema(&self) -> SchemaRef {
333        Arc::clone(&self.schema)
334    }
335
336    fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
337        while self.current < self.sources.len() {
338            match self.sources[self.current].next_batch()? {
339                Some(batch) => return Ok(Some(batch)),
340                None => self.current += 1,
341            }
342        }
343        Ok(None)
344    }
345
346    fn size_hint(&self) -> Option<usize> {
347        let mut total = 0;
348        for source in &self.sources {
349            match source.size_hint() {
350                Some(hint) => total += hint,
351                None => return None,
352            }
353        }
354        Some(total)
355    }
356
357    fn reset(&mut self) -> Result<()> {
358        for source in &mut self.sources {
359            source.reset()?;
360        }
361        self.current = 0;
362        Ok(())
363    }
364}
365
366impl std::fmt::Debug for ChainedSource {
367    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
368        f.debug_struct("ChainedSource")
369            .field("num_sources", &self.sources.len())
370            .field("current", &self.current)
371            .finish_non_exhaustive()
372    }
373}
374
375#[cfg(test)]
376#[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
377mod tests {
378    use arrow::{
379        array::{Int32Array, StringArray},
380        datatypes::{DataType, Field, Schema},
381    };
382
383    use super::*;
384
385    fn create_test_batch(start: i32, count: usize) -> RecordBatch {
386        let schema = Arc::new(Schema::new(vec![
387            Field::new("id", DataType::Int32, false),
388            Field::new("name", DataType::Utf8, false),
389        ]));
390
391        let ids: Vec<i32> = (start..start + count as i32).collect();
392        let names: Vec<String> = ids.iter().map(|i| format!("item_{}", i)).collect();
393
394        RecordBatch::try_new(
395            schema,
396            vec![
397                Arc::new(Int32Array::from(ids)),
398                Arc::new(StringArray::from(names)),
399            ],
400        )
401        .ok()
402        .unwrap_or_else(|| panic!("Should create batch"))
403    }
404
405    #[test]
406    fn test_memory_source() {
407        let batches = vec![
408            create_test_batch(0, 5),
409            create_test_batch(5, 5),
410            create_test_batch(10, 5),
411        ];
412
413        let mut source = MemorySource::new(batches)
414            .ok()
415            .unwrap_or_else(|| panic!("Should create source"));
416
417        assert_eq!(source.size_hint(), Some(15));
418
419        let mut count = 0;
420        while let Ok(Some(batch)) = source.next_batch() {
421            count += batch.num_rows();
422        }
423        assert_eq!(count, 15);
424    }
425
426    #[test]
427    fn test_memory_source_reset() {
428        let batches = vec![create_test_batch(0, 5)];
429
430        let mut source = MemorySource::new(batches)
431            .ok()
432            .unwrap_or_else(|| panic!("Should create source"));
433
434        // Consume
435        let _ = source.next_batch();
436        assert!(source.next_batch().ok().flatten().is_none());
437
438        // Reset and consume again
439        source
440            .reset()
441            .ok()
442            .unwrap_or_else(|| panic!("Should reset"));
443        assert!(source.next_batch().ok().flatten().is_some());
444    }
445
446    #[test]
447    fn test_streaming_dataset_from_memory() {
448        let batches = vec![create_test_batch(0, 10), create_test_batch(10, 10)];
449
450        let source = MemorySource::new(batches)
451            .ok()
452            .unwrap_or_else(|| panic!("Should create source"));
453
454        let dataset = StreamingDataset::new(Box::new(source), 4);
455
456        let mut total = 0;
457        for batch in dataset {
458            total += batch.num_rows();
459        }
460        assert_eq!(total, 20);
461    }
462
463    #[test]
464    fn test_streaming_dataset_prefetch() {
465        let batches = vec![
466            create_test_batch(0, 5),
467            create_test_batch(5, 5),
468            create_test_batch(10, 5),
469        ];
470
471        let source = MemorySource::new(batches)
472            .ok()
473            .unwrap_or_else(|| panic!("Should create source"));
474
475        let dataset = StreamingDataset::new(Box::new(source), 4).prefetch(2);
476
477        let collected: Vec<RecordBatch> = dataset.collect();
478        assert_eq!(collected.len(), 3);
479    }
480
481    #[test]
482    fn test_streaming_dataset_schema() {
483        let batches = vec![create_test_batch(0, 5)];
484
485        let source = MemorySource::new(batches)
486            .ok()
487            .unwrap_or_else(|| panic!("Should create source"));
488
489        let dataset = StreamingDataset::new(Box::new(source), 4);
490
491        assert_eq!(dataset.schema().fields().len(), 2);
492        assert_eq!(dataset.schema().field(0).name(), "id");
493    }
494
495    #[test]
496    fn test_streaming_dataset_reset() {
497        let batches = vec![create_test_batch(0, 5)];
498
499        let source = MemorySource::new(batches)
500            .ok()
501            .unwrap_or_else(|| panic!("Should create source"));
502
503        let mut dataset = StreamingDataset::new(Box::new(source), 4);
504
505        // Consume
506        let first: Vec<RecordBatch> = dataset.by_ref().collect();
507        assert_eq!(first.len(), 1);
508
509        // Reset
510        dataset
511            .reset()
512            .ok()
513            .unwrap_or_else(|| panic!("Should reset"));
514
515        // Consume again
516        let second: Vec<RecordBatch> = dataset.collect();
517        assert_eq!(second.len(), 1);
518    }
519
520    #[test]
521    fn test_chained_source() {
522        let source1 = MemorySource::new(vec![create_test_batch(0, 5)])
523            .ok()
524            .unwrap_or_else(|| panic!("Should create source"));
525        let source2 = MemorySource::new(vec![create_test_batch(5, 5)])
526            .ok()
527            .unwrap_or_else(|| panic!("Should create source"));
528
529        let mut chained = ChainedSource::new(vec![Box::new(source1), Box::new(source2)])
530            .ok()
531            .unwrap_or_else(|| panic!("Should create chained"));
532
533        assert_eq!(chained.size_hint(), Some(10));
534
535        let mut total = 0;
536        while let Ok(Some(batch)) = chained.next_batch() {
537            total += batch.num_rows();
538        }
539        assert_eq!(total, 10);
540    }
541
542    #[test]
543    fn test_empty_memory_source_error() {
544        let result = MemorySource::new(vec![]);
545        assert!(result.is_err());
546    }
547
548    #[test]
549    fn test_empty_chained_source_error() {
550        let result = ChainedSource::new(vec![]);
551        assert!(result.is_err());
552    }
553
554    #[test]
555    fn test_parquet_source_roundtrip() {
556        // Create test data
557        let batch = create_test_batch(0, 100);
558        let dataset = crate::ArrowDataset::from_batch(batch)
559            .ok()
560            .unwrap_or_else(|| panic!("Should create dataset"));
561
562        // Write to temp file
563        let temp_dir = tempfile::tempdir()
564            .ok()
565            .unwrap_or_else(|| panic!("Should create temp dir"));
566        let path = temp_dir.path().join("test.parquet");
567        dataset
568            .to_parquet(&path)
569            .ok()
570            .unwrap_or_else(|| panic!("Should write parquet"));
571
572        // Read back via streaming
573        let streaming = StreamingDataset::from_parquet(&path, 25)
574            .ok()
575            .unwrap_or_else(|| panic!("Should create streaming"));
576
577        let total: usize = streaming.map(|b| b.num_rows()).sum();
578        assert_eq!(total, 100);
579    }
580
581    #[test]
582    fn test_streaming_dataset_debug() {
583        let batches = vec![create_test_batch(0, 5)];
584        let source = MemorySource::new(batches)
585            .ok()
586            .unwrap_or_else(|| panic!("Should create source"));
587        let dataset = StreamingDataset::new(Box::new(source), 4);
588
589        let debug_str = format!("{:?}", dataset);
590        assert!(debug_str.contains("StreamingDataset"));
591    }
592
593    #[test]
594    fn test_parquet_source_debug() {
595        // Create test data
596        let batch = create_test_batch(0, 10);
597        let dataset = crate::ArrowDataset::from_batch(batch)
598            .ok()
599            .unwrap_or_else(|| panic!("Should create dataset"));
600
601        let temp_dir = tempfile::tempdir()
602            .ok()
603            .unwrap_or_else(|| panic!("Should create temp dir"));
604        let path = temp_dir.path().join("debug_test.parquet");
605        dataset
606            .to_parquet(&path)
607            .ok()
608            .unwrap_or_else(|| panic!("Should write parquet"));
609
610        let source = ParquetSource::new(&path, 10)
611            .ok()
612            .unwrap_or_else(|| panic!("Should create source"));
613
614        let debug_str = format!("{:?}", source);
615        assert!(debug_str.contains("ParquetSource"));
616        assert!(debug_str.contains("debug_test.parquet"));
617    }
618
619    #[test]
620    fn test_chained_source_debug() {
621        let source1 = MemorySource::new(vec![create_test_batch(0, 5)])
622            .ok()
623            .unwrap_or_else(|| panic!("Should create source"));
624
625        let chained = ChainedSource::new(vec![Box::new(source1)])
626            .ok()
627            .unwrap_or_else(|| panic!("Should create chained"));
628
629        let debug_str = format!("{:?}", chained);
630        assert!(debug_str.contains("ChainedSource"));
631        assert!(debug_str.contains("num_sources"));
632    }
633
634    #[test]
635    fn test_streaming_dataset_size_hint() {
636        let batches = vec![create_test_batch(0, 10), create_test_batch(10, 15)];
637        let source = MemorySource::new(batches)
638            .ok()
639            .unwrap_or_else(|| panic!("Should create source"));
640
641        let dataset = StreamingDataset::new(Box::new(source), 4);
642        assert_eq!(dataset.size_hint(), Some(25));
643    }
644
645    #[test]
646    fn test_parquet_source_reset() {
647        // Create test data
648        let batch = create_test_batch(0, 50);
649        let dataset = crate::ArrowDataset::from_batch(batch)
650            .ok()
651            .unwrap_or_else(|| panic!("Should create dataset"));
652
653        let temp_dir = tempfile::tempdir()
654            .ok()
655            .unwrap_or_else(|| panic!("Should create temp dir"));
656        let path = temp_dir.path().join("reset_test.parquet");
657        dataset
658            .to_parquet(&path)
659            .ok()
660            .unwrap_or_else(|| panic!("Should write parquet"));
661
662        let mut source = ParquetSource::new(&path, 10)
663            .ok()
664            .unwrap_or_else(|| panic!("Should create source"));
665
666        // Read all batches
667        let mut count = 0;
668        while let Ok(Some(_)) = source.next_batch() {
669            count += 1;
670        }
671        assert!(count > 0);
672
673        // Reset and read again
674        source
675            .reset()
676            .ok()
677            .unwrap_or_else(|| panic!("Should reset"));
678
679        let mut count2 = 0;
680        while let Ok(Some(_)) = source.next_batch() {
681            count2 += 1;
682        }
683        assert_eq!(count, count2);
684    }
685
686    #[test]
687    fn test_chained_source_reset() {
688        let source1 = MemorySource::new(vec![create_test_batch(0, 5)])
689            .ok()
690            .unwrap_or_else(|| panic!("Should create source"));
691        let source2 = MemorySource::new(vec![create_test_batch(5, 5)])
692            .ok()
693            .unwrap_or_else(|| panic!("Should create source"));
694
695        let mut chained = ChainedSource::new(vec![Box::new(source1), Box::new(source2)])
696            .ok()
697            .unwrap_or_else(|| panic!("Should create chained"));
698
699        // Exhaust
700        let mut count = 0;
701        while let Ok(Some(_)) = chained.next_batch() {
702            count += 1;
703        }
704        assert_eq!(count, 2);
705
706        // Reset
707        chained
708            .reset()
709            .ok()
710            .unwrap_or_else(|| panic!("Should reset"));
711
712        // Read again
713        let mut count2 = 0;
714        while let Ok(Some(_)) = chained.next_batch() {
715            count2 += 1;
716        }
717        assert_eq!(count2, 2);
718    }
719
720    #[test]
721    fn test_chained_source_size_hint_unknown() {
722        // Create a source that doesn't know its size
723        struct UnknownSizeSource {
724            schema: SchemaRef,
725            count: usize,
726        }
727
728        impl DataSource for UnknownSizeSource {
729            fn schema(&self) -> SchemaRef {
730                Arc::clone(&self.schema)
731            }
732
733            fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
734                if self.count > 0 {
735                    self.count -= 1;
736                    Ok(Some(create_test_batch(0, 1)))
737                } else {
738                    Ok(None)
739                }
740            }
741
742            // Returns None, so chained source can't calculate total
743        }
744
745        let schema = Arc::new(Schema::new(vec![
746            Field::new("id", DataType::Int32, false),
747            Field::new("name", DataType::Utf8, false),
748        ]));
749
750        let memory_source = MemorySource::new(vec![create_test_batch(0, 5)])
751            .ok()
752            .unwrap_or_else(|| panic!("Should create source"));
753
754        let unknown_source = UnknownSizeSource { schema, count: 1 };
755
756        let chained = ChainedSource::new(vec![Box::new(memory_source), Box::new(unknown_source)])
757            .ok()
758            .unwrap_or_else(|| panic!("Should create chained"));
759
760        // One source has unknown size, so total is None
761        assert_eq!(chained.size_hint(), None);
762    }
763
764    #[test]
765    fn test_streaming_dataset_buffer_size_minimum() {
766        let batches = vec![create_test_batch(0, 5)];
767        let source = MemorySource::new(batches)
768            .ok()
769            .unwrap_or_else(|| panic!("Should create source"));
770
771        // Buffer size 0 should be treated as 1
772        let dataset = StreamingDataset::new(Box::new(source), 0);
773        let collected: Vec<RecordBatch> = dataset.collect();
774        assert_eq!(collected.len(), 1);
775    }
776
777    #[test]
778    fn test_streaming_dataset_prefetch_minimum() {
779        let batches = vec![create_test_batch(0, 5), create_test_batch(5, 5)];
780        let source = MemorySource::new(batches)
781            .ok()
782            .unwrap_or_else(|| panic!("Should create source"));
783
784        // Prefetch 0 should be treated as 1
785        let dataset = StreamingDataset::new(Box::new(source), 4).prefetch(0);
786        let collected: Vec<RecordBatch> = dataset.collect();
787        assert_eq!(collected.len(), 2);
788    }
789
790    #[test]
791    fn test_memory_source_debug() {
792        let batches = vec![create_test_batch(0, 5)];
793        let source = MemorySource::new(batches)
794            .ok()
795            .unwrap_or_else(|| panic!("Should create source"));
796
797        let debug_str = format!("{:?}", source);
798        assert!(debug_str.contains("MemorySource"));
799    }
800
801    #[test]
802    fn test_parquet_source_schema() {
803        let batch = create_test_batch(0, 10);
804        let dataset = crate::ArrowDataset::from_batch(batch)
805            .ok()
806            .unwrap_or_else(|| panic!("Should create dataset"));
807
808        let temp_dir = tempfile::tempdir()
809            .ok()
810            .unwrap_or_else(|| panic!("Should create temp dir"));
811        let path = temp_dir.path().join("schema_test.parquet");
812        dataset
813            .to_parquet(&path)
814            .ok()
815            .unwrap_or_else(|| panic!("Should write parquet"));
816
817        let source = ParquetSource::new(&path, 10)
818            .ok()
819            .unwrap_or_else(|| panic!("Should create source"));
820
821        let schema = source.schema();
822        assert_eq!(schema.fields().len(), 2);
823        assert_eq!(schema.field(0).name(), "id");
824    }
825
826    #[test]
827    fn test_chained_source_schema() {
828        let source = MemorySource::new(vec![create_test_batch(0, 5)])
829            .ok()
830            .unwrap_or_else(|| panic!("Should create source"));
831
832        let chained = ChainedSource::new(vec![Box::new(source)])
833            .ok()
834            .unwrap_or_else(|| panic!("Should create chained"));
835
836        let schema = chained.schema();
837        assert_eq!(schema.fields().len(), 2);
838    }
839
840    #[test]
841    fn test_parquet_source_file_not_found() {
842        let result = ParquetSource::new("/nonexistent/path/to/file.parquet", 100);
843        assert!(result.is_err());
844    }
845
846    #[test]
847    fn test_streaming_dataset_from_parquet_not_found() {
848        let result = StreamingDataset::from_parquet("/nonexistent/file.parquet", 100);
849        assert!(result.is_err());
850    }
851
852    #[test]
853    fn test_data_source_default_reset_error() {
854        // Test that the default DataSource::reset returns error
855        struct NoResetSource {
856            schema: SchemaRef,
857        }
858
859        impl DataSource for NoResetSource {
860            fn schema(&self) -> SchemaRef {
861                Arc::clone(&self.schema)
862            }
863
864            fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
865                Ok(None)
866            }
867            // Don't override reset() - uses default impl that returns error
868        }
869
870        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
871
872        let mut source = NoResetSource { schema };
873        let result = source.reset();
874        assert!(result.is_err());
875    }
876
877    #[test]
878    fn test_streaming_dataset_reset_unsupported() {
879        // Test that reset fails when source doesn't support it
880        struct NoResetSource {
881            schema: SchemaRef,
882            done: bool,
883        }
884
885        impl DataSource for NoResetSource {
886            fn schema(&self) -> SchemaRef {
887                Arc::clone(&self.schema)
888            }
889
890            fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
891                if self.done {
892                    Ok(None)
893                } else {
894                    self.done = true;
895                    Ok(Some(create_test_batch(0, 5)))
896                }
897            }
898            // Uses default reset which fails
899        }
900
901        let schema = Arc::new(Schema::new(vec![
902            Field::new("id", DataType::Int32, false),
903            Field::new("name", DataType::Utf8, false),
904        ]));
905
906        let source = NoResetSource {
907            schema,
908            done: false,
909        };
910        let mut dataset = StreamingDataset::new(Box::new(source), 4);
911
912        // Consume
913        let _: Vec<_> = dataset.by_ref().collect();
914
915        // Reset should fail
916        let result = dataset.reset();
917        assert!(result.is_err());
918    }
919
920    #[test]
921    fn test_streaming_dataset_fill_buffer_error() {
922        // Test that iterator returns None on fill_buffer error
923        struct ErrorSource {
924            schema: SchemaRef,
925            error_on_call: usize,
926            call_count: usize,
927        }
928
929        impl DataSource for ErrorSource {
930            fn schema(&self) -> SchemaRef {
931                Arc::clone(&self.schema)
932            }
933
934            fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
935                self.call_count += 1;
936                if self.call_count >= self.error_on_call {
937                    Err(crate::Error::storage("Simulated error"))
938                } else {
939                    Ok(Some(create_test_batch(0, 5)))
940                }
941            }
942        }
943
944        let schema = Arc::new(Schema::new(vec![
945            Field::new("id", DataType::Int32, false),
946            Field::new("name", DataType::Utf8, false),
947        ]));
948
949        let source = ErrorSource {
950            schema,
951            error_on_call: 3, // Error on 3rd call (after 2 successful batches)
952            call_count: 0,
953        };
954
955        // Use prefetch 1 so we get 1 batch at a time
956        let mut dataset = StreamingDataset::new(Box::new(source), 4).prefetch(1);
957
958        // First two calls should succeed
959        let first = dataset.next();
960        assert!(first.is_some());
961        let second = dataset.next();
962        assert!(second.is_some());
963
964        // Third call triggers error in fill_buffer, returns None
965        let third = dataset.next();
966        assert!(third.is_none());
967    }
968
969    #[test]
970    fn test_streaming_dataset_large_prefetch() {
971        let batches = vec![
972            create_test_batch(0, 10),
973            create_test_batch(10, 10),
974            create_test_batch(20, 10),
975            create_test_batch(30, 10),
976            create_test_batch(40, 10),
977        ];
978
979        let source = MemorySource::new(batches)
980            .ok()
981            .unwrap_or_else(|| panic!("Should create source"));
982
983        // Prefetch more than available batches
984        let dataset = StreamingDataset::new(Box::new(source), 10).prefetch(100);
985
986        let collected: Vec<RecordBatch> = dataset.collect();
987        assert_eq!(collected.len(), 5);
988    }
989
990    #[test]
991    fn test_memory_source_multiple_iterations() {
992        let batches = vec![create_test_batch(0, 5), create_test_batch(5, 5)];
993
994        let mut source = MemorySource::new(batches)
995            .ok()
996            .unwrap_or_else(|| panic!("Should create source"));
997
998        // First iteration
999        let mut count1 = 0;
1000        while let Ok(Some(_)) = source.next_batch() {
1001            count1 += 1;
1002        }
1003        assert_eq!(count1, 2);
1004
1005        // Reset and iterate again
1006        source
1007            .reset()
1008            .ok()
1009            .unwrap_or_else(|| panic!("Should reset"));
1010
1011        let mut count2 = 0;
1012        while let Ok(Some(_)) = source.next_batch() {
1013            count2 += 1;
1014        }
1015        assert_eq!(count2, 2);
1016    }
1017
1018    #[test]
1019    fn test_chained_source_exhaustion() {
1020        let source1 = MemorySource::new(vec![create_test_batch(0, 3)])
1021            .ok()
1022            .unwrap_or_else(|| panic!("Should create source"));
1023        let source2 = MemorySource::new(vec![create_test_batch(3, 2)])
1024            .ok()
1025            .unwrap_or_else(|| panic!("Should create source"));
1026        let source3 = MemorySource::new(vec![create_test_batch(5, 1)])
1027            .ok()
1028            .unwrap_or_else(|| panic!("Should create source"));
1029
1030        let mut chained = ChainedSource::new(vec![
1031            Box::new(source1),
1032            Box::new(source2),
1033            Box::new(source3),
1034        ])
1035        .ok()
1036        .unwrap_or_else(|| panic!("Should create chained"));
1037
1038        let mut batches = Vec::new();
1039        while let Ok(Some(batch)) = chained.next_batch() {
1040            batches.push(batch);
1041        }
1042
1043        assert_eq!(batches.len(), 3);
1044        assert_eq!(batches[0].num_rows(), 3);
1045        assert_eq!(batches[1].num_rows(), 2);
1046        assert_eq!(batches[2].num_rows(), 1);
1047    }
1048
1049    #[test]
1050    fn test_streaming_dataset_empty_iteration() {
1051        struct EmptySource {
1052            schema: SchemaRef,
1053        }
1054
1055        impl DataSource for EmptySource {
1056            fn schema(&self) -> SchemaRef {
1057                Arc::clone(&self.schema)
1058            }
1059
1060            fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
1061                Ok(None)
1062            }
1063        }
1064
1065        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
1066
1067        let source = EmptySource { schema };
1068        let dataset = StreamingDataset::new(Box::new(source), 4);
1069
1070        let collected: Vec<RecordBatch> = dataset.collect();
1071        assert!(collected.is_empty());
1072    }
1073
1074    #[test]
1075    fn test_streaming_dataset_single_batch() {
1076        let batches = vec![create_test_batch(0, 100)];
1077
1078        let source = MemorySource::new(batches)
1079            .ok()
1080            .unwrap_or_else(|| panic!("Should create source"));
1081
1082        let dataset = StreamingDataset::new(Box::new(source), 1);
1083
1084        let collected: Vec<RecordBatch> = dataset.collect();
1085        assert_eq!(collected.len(), 1);
1086        assert_eq!(collected[0].num_rows(), 100);
1087    }
1088
1089    #[test]
1090    fn test_parquet_source_batch_size_variation() {
1091        // Create test data
1092        let batch = create_test_batch(0, 100);
1093        let dataset = crate::ArrowDataset::from_batch(batch)
1094            .ok()
1095            .unwrap_or_else(|| panic!("Should create dataset"));
1096
1097        let temp_dir = tempfile::tempdir()
1098            .ok()
1099            .unwrap_or_else(|| panic!("Should create temp dir"));
1100        let path = temp_dir.path().join("batch_size_test.parquet");
1101        dataset
1102            .to_parquet(&path)
1103            .ok()
1104            .unwrap_or_else(|| panic!("Should write parquet"));
1105
1106        // Test with different batch sizes
1107        for batch_size in [1, 10, 50, 100, 200] {
1108            let source = ParquetSource::new(&path, batch_size)
1109                .ok()
1110                .unwrap_or_else(|| panic!("Should create source"));
1111
1112            let streaming = StreamingDataset::new(Box::new(source), 4);
1113            let total: usize = streaming.map(|b| b.num_rows()).sum();
1114            assert_eq!(
1115                total, 100,
1116                "Batch size {} should read all 100 rows",
1117                batch_size
1118            );
1119        }
1120    }
1121
1122    #[test]
1123    fn test_chained_source_single_source() {
1124        let batches = vec![create_test_batch(0, 50)];
1125        let source = MemorySource::new(batches)
1126            .ok()
1127            .unwrap_or_else(|| panic!("source"));
1128
1129        let chained = ChainedSource::new(vec![Box::new(source)])
1130            .ok()
1131            .unwrap_or_else(|| panic!("chained"));
1132        let dataset = StreamingDataset::new(Box::new(chained), 4);
1133
1134        let collected: Vec<RecordBatch> = dataset.collect();
1135        assert_eq!(collected.len(), 1);
1136        assert_eq!(collected[0].num_rows(), 50);
1137    }
1138
1139    #[test]
1140    fn test_chained_source_multiple_sources() {
1141        let source1 = MemorySource::new(vec![create_test_batch(0, 30)])
1142            .ok()
1143            .unwrap_or_else(|| panic!("source1"));
1144        let source2 = MemorySource::new(vec![create_test_batch(30, 20)])
1145            .ok()
1146            .unwrap_or_else(|| panic!("source2"));
1147
1148        let chained = ChainedSource::new(vec![Box::new(source1), Box::new(source2)])
1149            .ok()
1150            .unwrap_or_else(|| panic!("chained"));
1151        let dataset = StreamingDataset::new(Box::new(chained), 4);
1152
1153        let collected: Vec<RecordBatch> = dataset.collect();
1154        assert_eq!(collected.len(), 2);
1155
1156        let total: usize = collected.iter().map(|b| b.num_rows()).sum();
1157        assert_eq!(total, 50);
1158    }
1159
1160    #[test]
1161    fn test_chained_source_empty_sources_vec() {
1162        // Empty sources vec should return an error
1163        let result: std::result::Result<ChainedSource, Error> = ChainedSource::new(vec![]);
1164        assert!(result.is_err());
1165    }
1166
1167    #[test]
1168    fn test_chained_source_with_empty_yielding_sources() {
1169        struct EmptyYieldSource {
1170            schema: SchemaRef,
1171        }
1172        impl DataSource for EmptyYieldSource {
1173            fn schema(&self) -> SchemaRef {
1174                Arc::clone(&self.schema)
1175            }
1176            fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
1177                Ok(None)
1178            }
1179        }
1180
1181        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
1182
1183        let source1 = EmptyYieldSource {
1184            schema: Arc::clone(&schema),
1185        };
1186        let source2 = EmptyYieldSource { schema };
1187
1188        let chained = ChainedSource::new(vec![Box::new(source1), Box::new(source2)])
1189            .ok()
1190            .unwrap_or_else(|| panic!("chained"));
1191        let dataset = StreamingDataset::new(Box::new(chained), 4);
1192
1193        let collected: Vec<RecordBatch> = dataset.collect();
1194        assert!(collected.is_empty());
1195    }
1196
1197    #[test]
1198    fn test_streaming_dataset_prefetch_config() {
1199        let batches = vec![create_test_batch(0, 100)];
1200        let source = MemorySource::new(batches)
1201            .ok()
1202            .unwrap_or_else(|| panic!("source"));
1203
1204        let dataset = StreamingDataset::new(Box::new(source), 4).prefetch(2);
1205
1206        let collected: Vec<RecordBatch> = dataset.collect();
1207        assert_eq!(collected.len(), 1);
1208    }
1209
1210    #[test]
1211    fn test_chained_source_size_hint() {
1212        let source1 = MemorySource::new(vec![create_test_batch(0, 50)])
1213            .ok()
1214            .unwrap_or_else(|| panic!("source1"));
1215        let source2 = MemorySource::new(vec![create_test_batch(50, 50)])
1216            .ok()
1217            .unwrap_or_else(|| panic!("source2"));
1218
1219        let chained = ChainedSource::new(vec![Box::new(source1), Box::new(source2)])
1220            .ok()
1221            .unwrap_or_else(|| panic!("chained"));
1222
1223        // Should sum size hints from all sources
1224        assert_eq!(chained.size_hint(), Some(100));
1225    }
1226
1227    #[test]
1228    fn test_streaming_dataset_buffer_size_one() {
1229        let batches = vec![
1230            create_test_batch(0, 25),
1231            create_test_batch(25, 25),
1232            create_test_batch(50, 25),
1233            create_test_batch(75, 25),
1234        ];
1235        let source = MemorySource::new(batches)
1236            .ok()
1237            .unwrap_or_else(|| panic!("source"));
1238
1239        // Minimal buffer
1240        let dataset = StreamingDataset::new(Box::new(source), 1);
1241
1242        let collected: Vec<RecordBatch> = dataset.collect();
1243        assert_eq!(collected.len(), 4);
1244
1245        let total: usize = collected.iter().map(|b| b.num_rows()).sum();
1246        assert_eq!(total, 100);
1247    }
1248
1249    #[test]
1250    fn test_parquet_source_invalid_path() {
1251        let result = ParquetSource::new("/nonexistent/path/to/file.parquet", 100);
1252        assert!(result.is_err());
1253    }
1254
1255    #[test]
1256    fn test_memory_source_schema_consistency() {
1257        let batches = vec![create_test_batch(0, 50), create_test_batch(50, 50)];
1258        let source = MemorySource::new(batches)
1259            .ok()
1260            .unwrap_or_else(|| panic!("source"));
1261
1262        let schema = source.schema();
1263        assert_eq!(schema.fields().len(), 2); // id and name columns
1264    }
1265
1266    // === Additional coverage tests for streaming module ===
1267
1268    #[test]
1269    fn test_parquet_source_next_batch_error_handling() {
1270        // Create a valid parquet file first
1271        let batch = create_test_batch(0, 10);
1272        let dataset = crate::ArrowDataset::from_batch(batch)
1273            .ok()
1274            .unwrap_or_else(|| panic!("dataset"));
1275
1276        let temp_dir = tempfile::tempdir()
1277            .ok()
1278            .unwrap_or_else(|| panic!("temp dir"));
1279        let path = temp_dir.path().join("next_batch_test.parquet");
1280        dataset
1281            .to_parquet(&path)
1282            .ok()
1283            .unwrap_or_else(|| panic!("parquet"));
1284
1285        let mut source = ParquetSource::new(&path, 5)
1286            .ok()
1287            .unwrap_or_else(|| panic!("source"));
1288
1289        // Read all batches
1290        let mut batches = Vec::new();
1291        while let Ok(Some(batch)) = source.next_batch() {
1292            batches.push(batch);
1293        }
1294        assert!(!batches.is_empty());
1295
1296        // After exhaustion, should return None
1297        let next = source.next_batch();
1298        assert!(next.is_ok());
1299        assert!(next.ok().flatten().is_none());
1300    }
1301
1302    #[test]
1303    fn test_memory_source_position_tracking() {
1304        let batches = vec![
1305            create_test_batch(0, 3),
1306            create_test_batch(3, 3),
1307            create_test_batch(6, 4),
1308        ];
1309        let mut source = MemorySource::new(batches)
1310            .ok()
1311            .unwrap_or_else(|| panic!("source"));
1312
1313        // Read first batch
1314        let b1 = source.next_batch().ok().flatten();
1315        assert!(b1.is_some());
1316        assert_eq!(b1.as_ref().map(|b| b.num_rows()), Some(3));
1317
1318        // Read second batch
1319        let b2 = source.next_batch().ok().flatten();
1320        assert!(b2.is_some());
1321
1322        // Read third batch
1323        let b3 = source.next_batch().ok().flatten();
1324        assert!(b3.is_some());
1325
1326        // No more batches
1327        let b4 = source.next_batch().ok().flatten();
1328        assert!(b4.is_none());
1329    }
1330
1331    #[test]
1332    fn test_chained_source_transitions_between_sources() {
1333        let source1 = MemorySource::new(vec![create_test_batch(0, 2), create_test_batch(2, 2)])
1334            .ok()
1335            .unwrap_or_else(|| panic!("source1"));
1336
1337        let source2 = MemorySource::new(vec![create_test_batch(4, 3)])
1338            .ok()
1339            .unwrap_or_else(|| panic!("source2"));
1340
1341        let mut chained = ChainedSource::new(vec![Box::new(source1), Box::new(source2)])
1342            .ok()
1343            .unwrap_or_else(|| panic!("chained"));
1344
1345        let mut batches = Vec::new();
1346        while let Ok(Some(batch)) = chained.next_batch() {
1347            batches.push(batch.num_rows());
1348        }
1349
1350        // Should get 3 batches total (2 from source1, 1 from source2)
1351        assert_eq!(batches.len(), 3);
1352        assert_eq!(batches, vec![2, 2, 3]);
1353    }
1354
1355    #[test]
1356    fn test_streaming_dataset_exhaustion() {
1357        let batches = vec![create_test_batch(0, 5)];
1358        let source = MemorySource::new(batches)
1359            .ok()
1360            .unwrap_or_else(|| panic!("source"));
1361
1362        let mut dataset = StreamingDataset::new(Box::new(source), 2);
1363
1364        // Consume all
1365        let _: Vec<_> = dataset.by_ref().collect();
1366
1367        // After exhaustion, next should return None
1368        assert!(dataset.next().is_none());
1369    }
1370
1371    #[test]
1372    fn test_streaming_dataset_schema_preserved() {
1373        let batches = vec![create_test_batch(0, 10)];
1374        let source = MemorySource::new(batches)
1375            .ok()
1376            .unwrap_or_else(|| panic!("source"));
1377
1378        let dataset = StreamingDataset::new(Box::new(source), 4);
1379        let schema = dataset.schema();
1380
1381        assert_eq!(schema.fields().len(), 2);
1382        assert_eq!(schema.field(0).name(), "id");
1383        assert_eq!(schema.field(1).name(), "name");
1384    }
1385
1386    #[test]
1387    fn test_chained_source_reset_restores_all() {
1388        let source1 = MemorySource::new(vec![create_test_batch(0, 10)])
1389            .ok()
1390            .unwrap_or_else(|| panic!("source1"));
1391        let source2 = MemorySource::new(vec![create_test_batch(10, 10)])
1392            .ok()
1393            .unwrap_or_else(|| panic!("source2"));
1394
1395        let mut chained = ChainedSource::new(vec![Box::new(source1), Box::new(source2)])
1396            .ok()
1397            .unwrap_or_else(|| panic!("chained"));
1398
1399        // First iteration
1400        let count1: usize = std::iter::from_fn(|| chained.next_batch().ok().flatten())
1401            .map(|b| b.num_rows())
1402            .sum();
1403        assert_eq!(count1, 20);
1404
1405        // Reset
1406        chained.reset().ok().unwrap_or_else(|| panic!("reset"));
1407
1408        // Second iteration
1409        let count2: usize = std::iter::from_fn(|| chained.next_batch().ok().flatten())
1410            .map(|b| b.num_rows())
1411            .sum();
1412        assert_eq!(count2, 20);
1413    }
1414
1415    #[test]
1416    fn test_data_source_default_size_hint() {
1417        // Test the default size_hint returns None
1418        struct NoHintSource {
1419            schema: SchemaRef,
1420        }
1421
1422        impl DataSource for NoHintSource {
1423            fn schema(&self) -> SchemaRef {
1424                Arc::clone(&self.schema)
1425            }
1426
1427            fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
1428                Ok(None)
1429            }
1430            // Uses default size_hint which returns None
1431        }
1432
1433        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
1434        let source = NoHintSource { schema };
1435        assert!(source.size_hint().is_none());
1436    }
1437
1438    #[test]
1439    fn test_streaming_dataset_large_buffer() {
1440        let batches = vec![create_test_batch(0, 10), create_test_batch(10, 10)];
1441        let source = MemorySource::new(batches)
1442            .ok()
1443            .unwrap_or_else(|| panic!("source"));
1444
1445        // Buffer larger than number of batches
1446        let dataset = StreamingDataset::new(Box::new(source), 100);
1447        let collected: Vec<RecordBatch> = dataset.collect();
1448        assert_eq!(collected.len(), 2);
1449    }
1450
1451    #[test]
1452    fn test_parquet_source_small_batch_size() {
1453        let batch = create_test_batch(0, 100);
1454        let dataset = crate::ArrowDataset::from_batch(batch)
1455            .ok()
1456            .unwrap_or_else(|| panic!("dataset"));
1457
1458        let temp_dir = tempfile::tempdir()
1459            .ok()
1460            .unwrap_or_else(|| panic!("temp dir"));
1461        let path = temp_dir.path().join("small_batch.parquet");
1462        dataset
1463            .to_parquet(&path)
1464            .ok()
1465            .unwrap_or_else(|| panic!("parquet"));
1466
1467        // Very small batch size
1468        let source = ParquetSource::new(&path, 1)
1469            .ok()
1470            .unwrap_or_else(|| panic!("source"));
1471
1472        let dataset = StreamingDataset::new(Box::new(source), 10);
1473        let total: usize = dataset.map(|b| b.num_rows()).sum();
1474        assert_eq!(total, 100);
1475    }
1476
1477    #[test]
1478    fn test_chained_source_first_source_empty() {
1479        struct EmptySource {
1480            schema: SchemaRef,
1481        }
1482
1483        impl DataSource for EmptySource {
1484            fn schema(&self) -> SchemaRef {
1485                Arc::clone(&self.schema)
1486            }
1487            fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
1488                Ok(None)
1489            }
1490            fn reset(&mut self) -> Result<()> {
1491                Ok(())
1492            }
1493        }
1494
1495        let schema = Arc::new(Schema::new(vec![
1496            Field::new("id", DataType::Int32, false),
1497            Field::new("name", DataType::Utf8, false),
1498        ]));
1499
1500        let empty = EmptySource { schema };
1501        let memory = MemorySource::new(vec![create_test_batch(0, 5)])
1502            .ok()
1503            .unwrap_or_else(|| panic!("memory"));
1504
1505        let mut chained = ChainedSource::new(vec![Box::new(empty), Box::new(memory)])
1506            .ok()
1507            .unwrap_or_else(|| panic!("chained"));
1508
1509        // Should skip empty source and get batch from memory source
1510        let batch = chained.next_batch().ok().flatten();
1511        assert!(batch.is_some());
1512        assert_eq!(batch.map(|b| b.num_rows()), Some(5));
1513    }
1514
1515    #[test]
1516    fn test_streaming_dataset_new_initializes_correctly() {
1517        let batches = vec![create_test_batch(0, 50)];
1518        let source = MemorySource::new(batches)
1519            .ok()
1520            .unwrap_or_else(|| panic!("source"));
1521
1522        let dataset = StreamingDataset::new(Box::new(source), 4);
1523
1524        // Verify initial state
1525        assert_eq!(dataset.size_hint(), Some(50));
1526        assert_eq!(dataset.schema().fields().len(), 2);
1527    }
1528
1529    #[test]
1530    fn test_memory_source_single_batch() {
1531        let batches = vec![create_test_batch(0, 100)];
1532        let mut source = MemorySource::new(batches)
1533            .ok()
1534            .unwrap_or_else(|| panic!("source"));
1535
1536        assert_eq!(source.size_hint(), Some(100));
1537
1538        let batch = source.next_batch().ok().flatten();
1539        assert!(batch.is_some());
1540        assert_eq!(batch.map(|b| b.num_rows()), Some(100));
1541
1542        // No more batches
1543        assert!(source.next_batch().ok().flatten().is_none());
1544    }
1545
1546    #[test]
1547    fn test_chained_source_all_sources_empty() {
1548        struct EmptySource {
1549            schema: SchemaRef,
1550        }
1551
1552        impl DataSource for EmptySource {
1553            fn schema(&self) -> SchemaRef {
1554                Arc::clone(&self.schema)
1555            }
1556            fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
1557                Ok(None)
1558            }
1559        }
1560
1561        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
1562
1563        let empty1 = EmptySource {
1564            schema: Arc::clone(&schema),
1565        };
1566        let empty2 = EmptySource { schema };
1567
1568        let mut chained = ChainedSource::new(vec![Box::new(empty1), Box::new(empty2)])
1569            .ok()
1570            .unwrap_or_else(|| panic!("chained"));
1571
1572        // Should return None when all sources are empty
1573        assert!(chained.next_batch().ok().flatten().is_none());
1574    }
1575
1576    #[test]
1577    fn test_streaming_dataset_prefetch_larger_than_available() {
1578        let batches = vec![create_test_batch(0, 10)];
1579        let source = MemorySource::new(batches)
1580            .ok()
1581            .unwrap_or_else(|| panic!("source"));
1582
1583        // Prefetch more batches than exist
1584        let dataset = StreamingDataset::new(Box::new(source), 4).prefetch(10);
1585        let collected: Vec<RecordBatch> = dataset.collect();
1586
1587        // Should still work correctly
1588        assert_eq!(collected.len(), 1);
1589    }
1590
1591    #[test]
1592    fn test_parquet_source_reset_and_reread() {
1593        let batch = create_test_batch(0, 50);
1594        let dataset = crate::ArrowDataset::from_batch(batch)
1595            .ok()
1596            .unwrap_or_else(|| panic!("dataset"));
1597
1598        let temp_dir = tempfile::tempdir()
1599            .ok()
1600            .unwrap_or_else(|| panic!("temp dir"));
1601        let path = temp_dir.path().join("reset_reread.parquet");
1602        dataset
1603            .to_parquet(&path)
1604            .ok()
1605            .unwrap_or_else(|| panic!("parquet"));
1606
1607        let mut source = ParquetSource::new(&path, 20)
1608            .ok()
1609            .unwrap_or_else(|| panic!("source"));
1610
1611        // Read once
1612        let first_pass: Vec<RecordBatch> =
1613            std::iter::from_fn(|| source.next_batch().ok().flatten()).collect();
1614        let first_count: usize = first_pass.iter().map(|b| b.num_rows()).sum();
1615
1616        // Reset
1617        source.reset().ok().unwrap_or_else(|| panic!("reset"));
1618
1619        // Read again
1620        let second_pass: Vec<RecordBatch> =
1621            std::iter::from_fn(|| source.next_batch().ok().flatten()).collect();
1622        let second_count: usize = second_pass.iter().map(|b| b.num_rows()).sum();
1623
1624        assert_eq!(first_count, second_count);
1625        assert_eq!(first_count, 50);
1626    }
1627
1628    #[test]
1629    fn test_chained_source_multiple_batches_per_source() {
1630        let source1 = MemorySource::new(vec![
1631            create_test_batch(0, 10),
1632            create_test_batch(10, 10),
1633            create_test_batch(20, 10),
1634        ])
1635        .ok()
1636        .unwrap_or_else(|| panic!("source1"));
1637
1638        let source2 = MemorySource::new(vec![create_test_batch(30, 15), create_test_batch(45, 15)])
1639            .ok()
1640            .unwrap_or_else(|| panic!("source2"));
1641
1642        let mut chained = ChainedSource::new(vec![Box::new(source1), Box::new(source2)])
1643            .ok()
1644            .unwrap_or_else(|| panic!("chained"));
1645
1646        let batches: Vec<usize> = std::iter::from_fn(|| chained.next_batch().ok().flatten())
1647            .map(|b| b.num_rows())
1648            .collect();
1649
1650        assert_eq!(batches, vec![10, 10, 10, 15, 15]);
1651    }
1652
1653    // === Additional tests for streaming edge cases ===
1654
1655    #[test]
1656    fn test_streaming_dataset_default_buffer() {
1657        let batches = vec![create_test_batch(0, 10)];
1658        let source = MemorySource::new(batches)
1659            .ok()
1660            .unwrap_or_else(|| panic!("source"));
1661
1662        // Test default streaming behavior
1663        let dataset = StreamingDataset::new(Box::new(source), 4);
1664        assert!(!dataset.exhausted);
1665        let collected: Vec<RecordBatch> = dataset.collect();
1666        assert_eq!(collected.len(), 1);
1667    }
1668
1669    #[test]
1670    fn test_memory_source_empty_error_message() {
1671        let result = MemorySource::new(vec![]);
1672        assert!(result.is_err());
1673        if let Err(e) = result {
1674            let msg = format!("{:?}", e);
1675            assert!(msg.contains("EmptyDataset") || msg.len() > 0);
1676        }
1677    }
1678
1679    #[test]
1680    fn test_chained_source_empty_error_message() {
1681        let result: std::result::Result<ChainedSource, Error> = ChainedSource::new(vec![]);
1682        assert!(result.is_err());
1683        if let Err(e) = result {
1684            let msg = format!("{:?}", e);
1685            assert!(msg.len() > 0);
1686        }
1687    }
1688
1689    #[test]
1690    fn test_streaming_dataset_collect_all() {
1691        let batches = vec![
1692            create_test_batch(0, 5),
1693            create_test_batch(5, 5),
1694            create_test_batch(10, 5),
1695            create_test_batch(15, 5),
1696        ];
1697        let source = MemorySource::new(batches)
1698            .ok()
1699            .unwrap_or_else(|| panic!("source"));
1700
1701        let dataset = StreamingDataset::new(Box::new(source), 2).prefetch(2);
1702        let collected: Vec<RecordBatch> = dataset.collect();
1703
1704        let total_rows: usize = collected.iter().map(|b| b.num_rows()).sum();
1705        assert_eq!(total_rows, 20);
1706    }
1707
1708    #[test]
1709    fn test_parquet_source_schema_matches() {
1710        let batch = create_test_batch(0, 10);
1711        let dataset = crate::ArrowDataset::from_batch(batch)
1712            .ok()
1713            .unwrap_or_else(|| panic!("dataset"));
1714
1715        let temp_dir = tempfile::tempdir()
1716            .ok()
1717            .unwrap_or_else(|| panic!("temp dir"));
1718        let path = temp_dir.path().join("schema_match.parquet");
1719        dataset
1720            .to_parquet(&path)
1721            .ok()
1722            .unwrap_or_else(|| panic!("parquet"));
1723
1724        let source = ParquetSource::new(&path, 5)
1725            .ok()
1726            .unwrap_or_else(|| panic!("source"));
1727
1728        // Schema should have 2 fields
1729        assert_eq!(source.schema().fields().len(), 2);
1730    }
1731
1732    #[test]
1733    fn test_streaming_dataset_from_parquet_with_prefetch() {
1734        let batch = create_test_batch(0, 50);
1735        let dataset = crate::ArrowDataset::from_batch(batch)
1736            .ok()
1737            .unwrap_or_else(|| panic!("dataset"));
1738
1739        let temp_dir = tempfile::tempdir()
1740            .ok()
1741            .unwrap_or_else(|| panic!("temp dir"));
1742        let path = temp_dir.path().join("prefetch_test.parquet");
1743        dataset
1744            .to_parquet(&path)
1745            .ok()
1746            .unwrap_or_else(|| panic!("parquet"));
1747
1748        let streaming = StreamingDataset::from_parquet(&path, 10)
1749            .ok()
1750            .unwrap_or_else(|| panic!("streaming"))
1751            .prefetch(3);
1752
1753        let total: usize = streaming.map(|b| b.num_rows()).sum();
1754        assert_eq!(total, 50);
1755    }
1756
1757    #[test]
1758    fn test_chained_source_with_different_batch_counts() {
1759        // First source has 1 batch, second has 3
1760        let source1 = MemorySource::new(vec![create_test_batch(0, 100)])
1761            .ok()
1762            .unwrap_or_else(|| panic!("source1"));
1763        let source2 = MemorySource::new(vec![
1764            create_test_batch(100, 10),
1765            create_test_batch(110, 10),
1766            create_test_batch(120, 10),
1767        ])
1768        .ok()
1769        .unwrap_or_else(|| panic!("source2"));
1770
1771        let mut chained = ChainedSource::new(vec![Box::new(source1), Box::new(source2)])
1772            .ok()
1773            .unwrap_or_else(|| panic!("chained"));
1774
1775        let mut total = 0;
1776        while let Ok(Some(batch)) = chained.next_batch() {
1777            total += batch.num_rows();
1778        }
1779        assert_eq!(total, 130);
1780    }
1781
1782    #[test]
1783    fn test_streaming_dataset_next_after_exhaustion() {
1784        let batches = vec![create_test_batch(0, 5)];
1785        let source = MemorySource::new(batches)
1786            .ok()
1787            .unwrap_or_else(|| panic!("source"));
1788
1789        let mut dataset = StreamingDataset::new(Box::new(source), 1);
1790
1791        // Exhaust the dataset
1792        while dataset.next().is_some() {}
1793
1794        // Additional next calls should return None
1795        assert!(dataset.next().is_none());
1796        assert!(dataset.next().is_none());
1797    }
1798
1799    #[test]
1800    fn test_memory_source_size_hint_calculation() {
1801        let batches = vec![
1802            create_test_batch(0, 7),
1803            create_test_batch(7, 13),
1804            create_test_batch(20, 3),
1805        ];
1806        let source = MemorySource::new(batches)
1807            .ok()
1808            .unwrap_or_else(|| panic!("source"));
1809
1810        // 7 + 13 + 3 = 23
1811        assert_eq!(source.size_hint(), Some(23));
1812    }
1813
1814    #[test]
1815    fn test_chained_source_partial_size_hint() {
1816        // One source with known size, one without
1817        struct UnknownSource {
1818            schema: SchemaRef,
1819            batches: Vec<RecordBatch>,
1820            pos: usize,
1821        }
1822
1823        impl DataSource for UnknownSource {
1824            fn schema(&self) -> SchemaRef {
1825                Arc::clone(&self.schema)
1826            }
1827
1828            fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
1829                if self.pos < self.batches.len() {
1830                    let b = self.batches[self.pos].clone();
1831                    self.pos += 1;
1832                    Ok(Some(b))
1833                } else {
1834                    Ok(None)
1835                }
1836            }
1837
1838            // No size_hint override - returns None
1839        }
1840
1841        let memory = MemorySource::new(vec![create_test_batch(0, 10)])
1842            .ok()
1843            .unwrap_or_else(|| panic!("memory"));
1844
1845        let unknown = UnknownSource {
1846            schema: create_test_batch(0, 1).schema(),
1847            batches: vec![create_test_batch(10, 5)],
1848            pos: 0,
1849        };
1850
1851        let chained = ChainedSource::new(vec![Box::new(memory), Box::new(unknown)])
1852            .ok()
1853            .unwrap_or_else(|| panic!("chained"));
1854
1855        // Size hint should be None since one source doesn't know
1856        assert!(chained.size_hint().is_none());
1857    }
1858
1859    #[test]
1860    fn test_streaming_dataset_buffer_boundary() {
1861        // Create exactly as many batches as buffer size
1862        let batches: Vec<RecordBatch> = (0..4).map(|i| create_test_batch(i * 10, 10)).collect();
1863
1864        let source = MemorySource::new(batches)
1865            .ok()
1866            .unwrap_or_else(|| panic!("source"));
1867
1868        // Buffer size equals batch count
1869        let dataset = StreamingDataset::new(Box::new(source), 4);
1870        let collected: Vec<RecordBatch> = dataset.collect();
1871
1872        assert_eq!(collected.len(), 4);
1873    }
1874
1875    #[test]
1876    fn test_parquet_source_read_all_batches() {
1877        let batch = create_test_batch(0, 1000);
1878        let dataset = crate::ArrowDataset::from_batch(batch)
1879            .ok()
1880            .unwrap_or_else(|| panic!("dataset"));
1881
1882        let temp_dir = tempfile::tempdir()
1883            .ok()
1884            .unwrap_or_else(|| panic!("temp dir"));
1885        let path = temp_dir.path().join("all_batches.parquet");
1886        dataset
1887            .to_parquet(&path)
1888            .ok()
1889            .unwrap_or_else(|| panic!("parquet"));
1890
1891        // Small batch size means many batches
1892        let mut source = ParquetSource::new(&path, 100)
1893            .ok()
1894            .unwrap_or_else(|| panic!("source"));
1895
1896        let mut batch_count = 0;
1897        let mut total_rows = 0;
1898        while let Ok(Some(batch)) = source.next_batch() {
1899            batch_count += 1;
1900            total_rows += batch.num_rows();
1901        }
1902
1903        assert!(batch_count >= 1);
1904        assert_eq!(total_rows, 1000);
1905    }
1906}