
1//! Dataset types for alimentar.
2//!
3//! Provides the [`Dataset`] trait and [`ArrowDataset`] implementation
4//! for working with Arrow-based tabular data.
5
6use std::{path::Path, sync::Arc};
7
8use arrow::{array::RecordBatch, datatypes::SchemaRef};
9use parquet::{
10    arrow::{arrow_reader::ParquetRecordBatchReaderBuilder, ArrowWriter},
11    file::properties::WriterProperties,
12};
13
14use crate::{
15    error::{Error, Result},
16    transform::Transform,
17};
18
/// A dataset that can be iterated over.
///
/// Datasets provide access to tabular data stored as Arrow RecordBatches.
/// All implementations must be thread-safe (Send + Sync).
pub trait Dataset: Send + Sync {
    /// Returns the total number of rows in the dataset.
    fn len(&self) -> usize;

    /// Returns true if the dataset contains no rows.
    ///
    /// Default implementation derives the answer from [`Dataset::len`].
    fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Returns a single row as a RecordBatch with one row.
    ///
    /// Returns `None` if the index is out of bounds.
    fn get(&self, index: usize) -> Option<RecordBatch>;

    /// Returns the schema of the dataset.
    fn schema(&self) -> SchemaRef;

    /// Returns an iterator over all RecordBatches in the dataset.
    ///
    /// The boxed iterator borrows the dataset (note the `'_` lifetime).
    fn iter(&self) -> Box<dyn Iterator<Item = RecordBatch> + Send + '_>;

    /// Returns the number of batches in the dataset.
    fn num_batches(&self) -> usize;

    /// Returns a specific batch by index (`None` when out of range).
    fn get_batch(&self, index: usize) -> Option<&RecordBatch>;
}
49
/// An in-memory dataset backed by Arrow RecordBatches.
///
/// This is the primary dataset type for alimentar. It stores data as a
/// collection of RecordBatches and provides efficient access patterns
/// for ML training loops.
///
/// # Example
///
/// ```no_run
/// use alimentar::{ArrowDataset, Dataset};
///
/// // Load from parquet
/// let dataset = ArrowDataset::from_parquet("data.parquet").unwrap();
/// println!("Dataset has {} rows", dataset.len());
/// ```
#[derive(Debug, Clone)]
pub struct ArrowDataset {
    // Underlying data in insertion order; all batches share `schema`
    // (validated in `ArrowDataset::new`).
    batches: Vec<RecordBatch>,
    // Schema shared by every batch (taken from batch 0 at construction).
    schema: SchemaRef,
    // Total rows across all batches, cached at construction time.
    row_count: usize,
}
71
72impl ArrowDataset {
73    /// Creates a new ArrowDataset from a vector of RecordBatches.
74    ///
75    /// # Errors
76    ///
77    /// Returns an error if:
78    /// - The batches vector is empty
79    /// - The batches have inconsistent schemas
80    pub fn new(batches: Vec<RecordBatch>) -> Result<Self> {
81        if batches.is_empty() {
82            return Err(Error::EmptyDataset);
83        }
84
85        let schema = batches[0].schema();
86
87        // Verify all batches have the same schema
88        for (i, batch) in batches.iter().enumerate().skip(1) {
89            if batch.schema() != schema {
90                return Err(Error::schema_mismatch(format!(
91                    "Batch {} has different schema than batch 0",
92                    i
93                )));
94            }
95        }
96
97        let row_count = batches.iter().map(|b| b.num_rows()).sum();
98
99        Ok(Self {
100            batches,
101            schema,
102            row_count,
103        })
104    }
105
    /// Creates an ArrowDataset from a single RecordBatch.
    ///
    /// Equivalent to `Self::new(vec![batch])`. Note that with exactly one
    /// batch, `new`'s empty-list and schema-mismatch checks cannot trigger,
    /// so this currently always succeeds; the `Result` is kept for
    /// interface stability.
    ///
    /// # Errors
    ///
    /// None in practice today — `Self::new` only fails for an empty batch
    /// list or mismatched schemas across multiple batches.
    pub fn from_batch(batch: RecordBatch) -> Result<Self> {
        Self::new(vec![batch])
    }
114
115    /// Loads a dataset from a Parquet file.
116    ///
117    /// # Errors
118    ///
119    /// Returns an error if:
120    /// - The file cannot be opened
121    /// - The file is not valid Parquet
122    /// - The file is empty
123    pub fn from_parquet(path: impl AsRef<Path>) -> Result<Self> {
124        let path = path.as_ref();
125        let file = std::fs::File::open(path).map_err(|e| Error::io(e, path))?;
126
127        let builder = ParquetRecordBatchReaderBuilder::try_new(file).map_err(Error::Parquet)?;
128
129        let reader = builder.build().map_err(Error::Parquet)?;
130
131        let batches: Vec<RecordBatch> = reader
132            .collect::<std::result::Result<Vec<_>, _>>()
133            .map_err(Error::Arrow)?;
134
135        if batches.is_empty() {
136            return Err(Error::EmptyDataset);
137        }
138
139        Self::new(batches)
140    }
141
142    /// Saves the dataset to a Parquet file.
143    ///
144    /// # Errors
145    ///
146    /// Returns an error if:
147    /// - The file cannot be created
148    /// - Writing fails
149    pub fn to_parquet(&self, path: impl AsRef<Path>) -> Result<()> {
150        let path = path.as_ref();
151        let file = std::fs::File::create(path).map_err(|e| Error::io(e, path))?;
152
153        let props = WriterProperties::builder().build();
154        let mut writer =
155            ArrowWriter::try_new(file, self.schema.clone(), Some(props)).map_err(Error::Parquet)?;
156
157        for batch in &self.batches {
158            writer.write(batch).map_err(Error::Parquet)?;
159        }
160
161        writer.close().map_err(Error::Parquet)?;
162        Ok(())
163    }
164
165    /// Loads a dataset from an Arrow IPC file (Issue #2: Enhanced Data Loading)
166    ///
167    /// Arrow IPC (Inter-Process Communication) format enables zero-copy data
168    /// sharing. This is the native Arrow file format with optimal read
169    /// performance.
170    ///
171    /// # Arguments
172    ///
173    /// * `path` - Path to the Arrow IPC file (typically .arrow or .ipc
174    ///   extension)
175    ///
176    /// # Errors
177    ///
178    /// Returns an error if:
179    /// - The file cannot be opened
180    /// - The file is not valid Arrow IPC format
181    /// - The file is empty
182    ///
183    /// # Example
184    ///
185    /// ```ignore
186    /// let dataset = ArrowDataset::from_ipc("data.arrow").unwrap();
187    /// ```
188    pub fn from_ipc(path: impl AsRef<Path>) -> Result<Self> {
189        use arrow::ipc::reader::FileReader;
190
191        let path = path.as_ref();
192        let file = std::fs::File::open(path).map_err(|e| Error::io(e, path))?;
193
194        let reader = FileReader::try_new(file, None).map_err(Error::Arrow)?;
195
196        let batches: Vec<RecordBatch> = reader
197            .collect::<std::result::Result<Vec<_>, _>>()
198            .map_err(Error::Arrow)?;
199
200        if batches.is_empty() {
201            return Err(Error::EmptyDataset);
202        }
203
204        Self::new(batches)
205    }
206
207    /// Saves the dataset to an Arrow IPC file (Issue #2: Enhanced Data Loading)
208    ///
209    /// Creates a file in Arrow IPC format, the native Arrow format.
210    /// This format provides optimal read performance for Arrow-based tools.
211    ///
212    /// # Arguments
213    ///
214    /// * `path` - Path for the output file
215    ///
216    /// # Errors
217    ///
218    /// Returns an error if the file cannot be created or writing fails.
219    ///
220    /// # Example
221    ///
222    /// ```ignore
223    /// dataset.to_ipc("output.arrow").unwrap();
224    /// ```
225    pub fn to_ipc(&self, path: impl AsRef<Path>) -> Result<()> {
226        use arrow::ipc::writer::FileWriter;
227
228        let path = path.as_ref();
229        let file = std::fs::File::create(path).map_err(|e| Error::io(e, path))?;
230
231        let mut writer = FileWriter::try_new(file, &self.schema).map_err(Error::Arrow)?;
232
233        for batch in &self.batches {
234            writer.write(batch).map_err(Error::Arrow)?;
235        }
236
237        writer.finish().map_err(Error::Arrow)?;
238        Ok(())
239    }
240
241    /// Loads a dataset from an Arrow IPC stream file (Issue #2: Enhanced Data
242    /// Loading)
243    ///
244    /// Arrow IPC streaming format is designed for streaming scenarios where the
245    /// schema is sent first, followed by record batches. The file extension is
246    /// typically .arrows.
247    ///
248    /// # Arguments
249    ///
250    /// * `path` - Path to the Arrow IPC stream file
251    ///
252    /// # Errors
253    ///
254    /// Returns an error if parsing fails or the file is empty.
255    pub fn from_ipc_stream(path: impl AsRef<Path>) -> Result<Self> {
256        use arrow::ipc::reader::StreamReader;
257
258        let path = path.as_ref();
259        let file = std::fs::File::open(path).map_err(|e| Error::io(e, path))?;
260
261        let reader = StreamReader::try_new(file, None).map_err(Error::Arrow)?;
262
263        let batches: Vec<RecordBatch> = reader
264            .collect::<std::result::Result<Vec<_>, _>>()
265            .map_err(Error::Arrow)?;
266
267        if batches.is_empty() {
268            return Err(Error::EmptyDataset);
269        }
270
271        Self::new(batches)
272    }
273
274    /// Saves the dataset to an Arrow IPC stream file (Issue #2: Enhanced Data
275    /// Loading)
276    ///
277    /// Creates a file in Arrow IPC streaming format. This format is suitable
278    /// for streaming scenarios and produces slightly smaller files than the
279    /// standard IPC file format.
280    ///
281    /// # Arguments
282    ///
283    /// * `path` - Path for the output file (typically .arrows extension)
284    ///
285    /// # Errors
286    ///
287    /// Returns an error if the file cannot be created or writing fails.
288    pub fn to_ipc_stream(&self, path: impl AsRef<Path>) -> Result<()> {
289        use arrow::ipc::writer::StreamWriter;
290
291        let path = path.as_ref();
292        let file = std::fs::File::create(path).map_err(|e| Error::io(e, path))?;
293
294        let mut writer = StreamWriter::try_new(file, &self.schema).map_err(Error::Arrow)?;
295
296        for batch in &self.batches {
297            writer.write(batch).map_err(Error::Arrow)?;
298        }
299
300        writer.finish().map_err(Error::Arrow)?;
301        Ok(())
302    }
303
    /// Loads a dataset from a CSV file.
    ///
    /// Uses [`CsvOptions::default`]: header row expected, comma delimiter,
    /// 8192-row batches, schema inferred from the data.
    ///
    /// # Arguments
    ///
    /// * `path` - Path to the CSV file
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - The file cannot be opened
    /// - The file is not valid CSV
    /// - The file is empty
    pub fn from_csv(path: impl AsRef<Path>) -> Result<Self> {
        Self::from_csv_with_options(path, CsvOptions::default())
    }
319
    /// Loads a dataset from a CSV file with options.
    ///
    /// When no schema is supplied, one is inferred by sampling up to 1000
    /// records, after which the reader is rewound for the decode pass.
    ///
    /// # Arguments
    ///
    /// * `path` - Path to the CSV file
    /// * `options` - CSV parsing options
    ///
    /// # Errors
    ///
    /// Returns an error if parsing fails or the file is empty.
    pub fn from_csv_with_options(path: impl AsRef<Path>, options: CsvOptions) -> Result<Self> {
        use std::io::{BufReader, Seek, SeekFrom};

        use arrow_csv::{reader::Format, ReaderBuilder};

        let path = path.as_ref();
        let file = std::fs::File::open(path).map_err(|e| Error::io(e, path))?;
        let mut buf_reader = BufReader::new(file);

        // Get schema (infer or use provided)
        let schema = if let Some(schema) = options.schema {
            Arc::new(schema)
        } else {
            // Infer schema from the file; the Format must mirror the same
            // header/delimiter settings used by the reader below.
            let mut format = Format::default().with_header(options.has_header);
            if let Some(delim) = options.delimiter {
                format = format.with_delimiter(delim);
            }
            // Inference consumes up to 1000 records from `buf_reader`.
            let (inferred, _) = format
                .infer_schema(&mut buf_reader, Some(1000))
                .map_err(Error::Arrow)?;

            // Reset file position so the decode pass re-reads from the
            // start (inference advanced the reader).
            buf_reader
                .seek(SeekFrom::Start(0))
                .map_err(|e| Error::io(e, path))?;

            Arc::new(inferred)
        };

        let mut builder = ReaderBuilder::new(schema)
            .with_batch_size(options.batch_size)
            .with_header(options.has_header);

        if let Some(delim) = options.delimiter {
            builder = builder.with_delimiter(delim);
        }

        let reader = builder.build(buf_reader).map_err(Error::Arrow)?;

        let batches: Vec<RecordBatch> = reader
            .collect::<std::result::Result<Vec<_>, _>>()
            .map_err(Error::Arrow)?;

        if batches.is_empty() {
            return Err(Error::EmptyDataset);
        }

        Self::new(batches)
    }
380
381    /// Saves the dataset to a CSV file.
382    ///
383    /// # Errors
384    ///
385    /// Returns an error if the file cannot be created or writing fails.
386    pub fn to_csv(&self, path: impl AsRef<Path>) -> Result<()> {
387        use arrow_csv::WriterBuilder;
388
389        let path = path.as_ref();
390        let file = std::fs::File::create(path).map_err(|e| Error::io(e, path))?;
391
392        let mut writer = WriterBuilder::new().with_header(true).build(file);
393
394        for batch in &self.batches {
395            writer.write(batch).map_err(Error::Arrow)?;
396        }
397
398        Ok(())
399    }
400
    /// Loads a dataset from a JSON Lines (JSONL) file.
    ///
    /// Each line in the file should be a valid JSON object representing a row.
    /// Uses [`JsonOptions::default`]: 8192-row batches, schema inferred.
    ///
    /// # Errors
    ///
    /// Returns an error if the file cannot be opened or parsed.
    pub fn from_json(path: impl AsRef<Path>) -> Result<Self> {
        Self::from_json_with_options(path, JsonOptions::default())
    }
411
412    /// Loads a dataset from a JSON Lines file with options.
413    ///
414    /// # Errors
415    ///
416    /// Returns an error if parsing fails or the file is empty.
417    pub fn from_json_with_options(path: impl AsRef<Path>, options: JsonOptions) -> Result<Self> {
418        use std::io::BufReader;
419
420        use arrow_json::ReaderBuilder;
421
422        let path = path.as_ref();
423
424        // Get schema (infer or use provided)
425        let schema = if let Some(schema) = options.schema {
426            Arc::new(schema)
427        } else {
428            // Infer schema from file
429            let infer_file = std::fs::File::open(path).map_err(|e| Error::io(e, path))?;
430            let infer_reader = BufReader::new(infer_file);
431            let (inferred, _) = arrow_json::reader::infer_json_schema(infer_reader, Some(1000))
432                .map_err(Error::Arrow)?;
433            Arc::new(inferred)
434        };
435
436        // Open file for reading
437        let file = std::fs::File::open(path).map_err(|e| Error::io(e, path))?;
438        let buf_reader = BufReader::new(file);
439
440        let builder = ReaderBuilder::new(schema).with_batch_size(options.batch_size);
441        let reader = builder.build(buf_reader).map_err(Error::Arrow)?;
442
443        let batches: Vec<RecordBatch> = reader
444            .collect::<std::result::Result<Vec<_>, _>>()
445            .map_err(Error::Arrow)?;
446
447        if batches.is_empty() {
448            return Err(Error::EmptyDataset);
449        }
450
451        Self::new(batches)
452    }
453
454    /// Saves the dataset to a JSON Lines (JSONL) file.
455    ///
456    /// Each row is written as a single JSON object on its own line.
457    ///
458    /// # Errors
459    ///
460    /// Returns an error if the file cannot be created or writing fails.
461    pub fn to_json(&self, path: impl AsRef<Path>) -> Result<()> {
462        use std::io::BufWriter;
463
464        use arrow_json::LineDelimitedWriter;
465
466        let path = path.as_ref();
467        let file = std::fs::File::create(path).map_err(|e| Error::io(e, path))?;
468        let buf_writer = BufWriter::new(file);
469
470        let mut writer = LineDelimitedWriter::new(buf_writer);
471
472        for batch in &self.batches {
473            writer.write(batch).map_err(Error::Arrow)?;
474        }
475
476        writer.finish().map_err(Error::Arrow)?;
477
478        Ok(())
479    }
480
481    /// Loads a dataset from Parquet bytes in memory.
482    ///
483    /// # Errors
484    ///
485    /// Returns an error if the data is not valid Parquet.
486    pub fn from_parquet_bytes(data: &[u8]) -> Result<Self> {
487        use bytes::Bytes;
488
489        let bytes = Bytes::copy_from_slice(data);
490
491        let builder = ParquetRecordBatchReaderBuilder::try_new(bytes).map_err(Error::Parquet)?;
492
493        let reader = builder.build().map_err(Error::Parquet)?;
494
495        let batches: Vec<RecordBatch> = reader
496            .collect::<std::result::Result<Vec<_>, _>>()
497            .map_err(Error::Arrow)?;
498
499        if batches.is_empty() {
500            return Err(Error::EmptyDataset);
501        }
502
503        Self::new(batches)
504    }
505
506    /// Converts the dataset to Parquet bytes.
507    ///
508    /// # Errors
509    ///
510    /// Returns an error if serialization fails.
511    pub fn to_parquet_bytes(&self) -> Result<Vec<u8>> {
512        let mut buffer = Vec::new();
513        let cursor = std::io::Cursor::new(&mut buffer);
514
515        let props = WriterProperties::builder().build();
516        let mut writer = ArrowWriter::try_new(cursor, self.schema.clone(), Some(props))
517            .map_err(Error::Parquet)?;
518
519        for batch in &self.batches {
520            writer.write(batch).map_err(Error::Parquet)?;
521        }
522
523        writer.close().map_err(Error::Parquet)?;
524        Ok(buffer)
525    }
526
527    /// Loads a dataset from a CSV string.
528    ///
529    /// # Errors
530    ///
531    /// Returns an error if the string is not valid CSV.
532    pub fn from_csv_str(data: &str) -> Result<Self> {
533        use std::io::Cursor;
534
535        use arrow_csv::{reader::Format, ReaderBuilder};
536
537        // Infer schema
538        let mut cursor_for_infer = Cursor::new(data.as_bytes());
539        let format = Format::default().with_header(true);
540        let (inferred, _) = format
541            .infer_schema(&mut cursor_for_infer, Some(1000))
542            .map_err(Error::Arrow)?;
543
544        let schema = Arc::new(inferred);
545        let cursor = Cursor::new(data.as_bytes());
546
547        let builder = ReaderBuilder::new(schema)
548            .with_batch_size(8192)
549            .with_header(true);
550
551        let reader = builder.build(cursor).map_err(Error::Arrow)?;
552
553        let batches: Vec<RecordBatch> = reader
554            .collect::<std::result::Result<Vec<_>, _>>()
555            .map_err(Error::Arrow)?;
556
557        if batches.is_empty() {
558            return Err(Error::EmptyDataset);
559        }
560
561        Self::new(batches)
562    }
563
564    /// Loads a dataset from a JSON string.
565    ///
566    /// # Errors
567    ///
568    /// Returns an error if the string is not valid JSON.
569    pub fn from_json_str(data: &str) -> Result<Self> {
570        use std::io::Cursor;
571
572        use arrow_json::ReaderBuilder;
573
574        // Infer schema
575        let cursor_for_infer = Cursor::new(data.as_bytes());
576        let (inferred, _) = arrow_json::reader::infer_json_schema(cursor_for_infer, Some(1000))
577            .map_err(Error::Arrow)?;
578
579        let schema = Arc::new(inferred);
580        let cursor = Cursor::new(data.as_bytes());
581
582        let builder = ReaderBuilder::new(schema).with_batch_size(8192);
583        let reader = builder.build(cursor).map_err(Error::Arrow)?;
584
585        let batches: Vec<RecordBatch> = reader
586            .collect::<std::result::Result<Vec<_>, _>>()
587            .map_err(Error::Arrow)?;
588
589        if batches.is_empty() {
590            return Err(Error::EmptyDataset);
591        }
592
593        Self::new(batches)
594    }
595
    /// Returns the underlying batches.
    ///
    /// Borrowing accessor; see [`Self::into_batches`] to take ownership.
    pub fn batches(&self) -> &[RecordBatch] {
        &self.batches
    }

    /// Consumes the dataset and returns the underlying batches.
    pub fn into_batches(self) -> Vec<RecordBatch> {
        self.batches
    }
605
606    /// Applies a transform to create a new dataset.
607    ///
608    /// # Errors
609    ///
610    /// Returns an error if the transform fails on any batch.
611    pub fn with_transform<T: Transform>(&self, transform: &T) -> Result<Self> {
612        let new_batches: Vec<RecordBatch> = self
613            .batches
614            .iter()
615            .map(|batch| transform.apply(batch.clone()))
616            .collect::<Result<Vec<_>>>()?;
617
618        Self::new(new_batches)
619    }
620
    /// Returns an iterator over rows as single-row RecordBatches.
    ///
    /// Rows are produced via `RecordBatch::slice` (see [`RowIterator`]).
    pub fn rows(&self) -> RowIterator<'_> {
        RowIterator {
            dataset: self,
            current_batch: 0,
            current_row: 0,
        }
    }
629
630    /// Finds the batch and local row index for a global row index.
631    fn find_row(&self, global_index: usize) -> Option<(usize, usize)> {
632        if global_index >= self.row_count {
633            return None;
634        }
635
636        let mut remaining = global_index;
637        for (batch_idx, batch) in self.batches.iter().enumerate() {
638            let batch_rows = batch.num_rows();
639            if remaining < batch_rows {
640                return Some((batch_idx, remaining));
641            }
642            remaining -= batch_rows;
643        }
644
645        None
646    }
647}
648
649impl Dataset for ArrowDataset {
650    fn len(&self) -> usize {
651        self.row_count
652    }
653
654    fn get(&self, index: usize) -> Option<RecordBatch> {
655        let (batch_idx, local_idx) = self.find_row(index)?;
656        let batch = &self.batches[batch_idx];
657        Some(batch.slice(local_idx, 1))
658    }
659
660    fn schema(&self) -> SchemaRef {
661        Arc::clone(&self.schema)
662    }
663
664    fn iter(&self) -> Box<dyn Iterator<Item = RecordBatch> + Send + '_> {
665        Box::new(self.batches.iter().cloned())
666    }
667
668    fn num_batches(&self) -> usize {
669        self.batches.len()
670    }
671
672    fn get_batch(&self, index: usize) -> Option<&RecordBatch> {
673        self.batches.get(index)
674    }
675}
676
/// Iterator over individual rows of a dataset.
///
/// Yields each row as a single-row `RecordBatch`. Created by
/// [`ArrowDataset::rows`].
pub struct RowIterator<'a> {
    // Dataset being walked; borrowed for the iterator's lifetime.
    dataset: &'a ArrowDataset,
    // Index of the batch currently being sliced.
    current_batch: usize,
    // Row offset within the current batch.
    current_row: usize,
}
683
684impl Iterator for RowIterator<'_> {
685    type Item = RecordBatch;
686
687    fn next(&mut self) -> Option<Self::Item> {
688        loop {
689            if self.current_batch >= self.dataset.batches.len() {
690                return None;
691            }
692
693            let batch = &self.dataset.batches[self.current_batch];
694            if self.current_row < batch.num_rows() {
695                let row = batch.slice(self.current_row, 1);
696                self.current_row += 1;
697                return Some(row);
698            }
699
700            self.current_batch += 1;
701            self.current_row = 0;
702        }
703    }
704
705    fn size_hint(&self) -> (usize, Option<usize>) {
706        let mut remaining = 0;
707        for batch in self.dataset.batches.iter().skip(self.current_batch) {
708            remaining += batch.num_rows();
709        }
710        if self.current_batch < self.dataset.batches.len() {
711            remaining -= self.current_row;
712        }
713        (remaining, Some(remaining))
714    }
715}
716
717impl ExactSizeIterator for RowIterator<'_> {}
718
/// Options for CSV parsing.
///
/// Construct via [`CsvOptions::new`] / [`CsvOptions::default`] and
/// customize with the `with_*` builder methods.
#[derive(Debug, Clone)]
pub struct CsvOptions {
    /// Whether the CSV file has a header row. Defaults to `true`.
    pub has_header: bool,
    /// Delimiter character; `None` (the default) means comma.
    pub delimiter: Option<u8>,
    /// Batch size for reading. Defaults to 8192 rows.
    pub batch_size: usize,
    /// Optional schema (inferred from the data if not provided).
    pub schema: Option<arrow::datatypes::Schema>,
}
731
732impl Default for CsvOptions {
733    fn default() -> Self {
734        Self {
735            has_header: true,
736            delimiter: None, // Use default comma
737            batch_size: 8192,
738            schema: None,
739        }
740    }
741}
742
743impl CsvOptions {
744    /// Creates new CSV options with default values.
745    pub fn new() -> Self {
746        Self::default()
747    }
748
749    /// Sets whether the file has a header row.
750    #[must_use]
751    pub fn with_header(mut self, has_header: bool) -> Self {
752        self.has_header = has_header;
753        self
754    }
755
756    /// Sets the delimiter character.
757    #[must_use]
758    pub fn with_delimiter(mut self, delimiter: u8) -> Self {
759        self.delimiter = Some(delimiter);
760        self
761    }
762
763    /// Sets the batch size for reading.
764    #[must_use]
765    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
766        self.batch_size = batch_size;
767        self
768    }
769
770    /// Sets the schema for parsing.
771    #[must_use]
772    pub fn with_schema(mut self, schema: arrow::datatypes::Schema) -> Self {
773        self.schema = Some(schema);
774        self
775    }
776}
777
/// Options for JSON/JSONL parsing.
///
/// Construct via [`JsonOptions::new`] / [`JsonOptions::default`] and
/// customize with the `with_*` builder methods.
#[derive(Debug, Clone)]
pub struct JsonOptions {
    /// Batch size for reading. Defaults to 8192 rows.
    pub batch_size: usize,
    /// Optional schema (inferred from the data if not provided).
    pub schema: Option<arrow::datatypes::Schema>,
}
786
787impl Default for JsonOptions {
788    fn default() -> Self {
789        Self {
790            batch_size: 8192,
791            schema: None,
792        }
793    }
794}
795
796impl JsonOptions {
797    /// Creates new JSON options with default values.
798    pub fn new() -> Self {
799        Self::default()
800    }
801
802    /// Sets the batch size for reading.
803    #[must_use]
804    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
805        self.batch_size = batch_size;
806        self
807    }
808
809    /// Sets the schema for parsing.
810    #[must_use]
811    pub fn with_schema(mut self, schema: arrow::datatypes::Schema) -> Self {
812        self.schema = Some(schema);
813        self
814    }
815}
816
817#[cfg(test)]
818#[allow(
819    clippy::cast_possible_truncation,
820    clippy::cast_possible_wrap,
821    clippy::uninlined_format_args
822)]
823mod tests {
824    use std::sync::Arc;
825
826    use arrow::{
827        array::{Int32Array, StringArray},
828        datatypes::{DataType, Field, Schema},
829    };
830
831    use super::*;
832
833    fn create_test_batch(start: i32, count: usize) -> RecordBatch {
834        let schema = Arc::new(Schema::new(vec![
835            Field::new("id", DataType::Int32, false),
836            Field::new("name", DataType::Utf8, false),
837        ]));
838
839        #[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
840        let ids: Vec<i32> = (start..start + count as i32).collect();
841        let names: Vec<String> = ids.iter().map(|i| format!("item_{}", i)).collect();
842
843        let id_array = Int32Array::from(ids);
844        let name_array = StringArray::from(names);
845
846        RecordBatch::try_new(schema, vec![Arc::new(id_array), Arc::new(name_array)])
847            .ok()
848            .unwrap_or_else(|| panic!("Failed to create test batch"))
849    }
850
851    #[test]
852    fn test_new_dataset() {
853        let batch = create_test_batch(0, 10);
854        let dataset = ArrowDataset::new(vec![batch]).ok();
855        assert!(dataset.is_some());
856        let dataset = dataset.unwrap_or_else(|| panic!("Dataset should be Some"));
857        assert_eq!(dataset.len(), 10);
858    }
859
860    #[test]
861    fn test_empty_dataset_error() {
862        let result = ArrowDataset::new(vec![]);
863        assert!(result.is_err());
864        if matches!(result, Err(Error::EmptyDataset)) {
865            // Expected
866        } else {
867            panic!("Expected EmptyDataset error");
868        }
869    }
870
871    #[test]
872    fn test_from_batch() {
873        let batch = create_test_batch(0, 5);
874        let dataset = ArrowDataset::from_batch(batch).ok();
875        assert!(dataset.is_some());
876        let dataset = dataset.unwrap_or_else(|| panic!("Dataset should be Some"));
877        assert_eq!(dataset.len(), 5);
878        assert_eq!(dataset.num_batches(), 1);
879    }
880
881    #[test]
882    fn test_get_row() {
883        let batch = create_test_batch(0, 10);
884        let dataset = ArrowDataset::from_batch(batch)
885            .ok()
886            .unwrap_or_else(|| panic!("Should create dataset"));
887
888        let row = dataset.get(5);
889        assert!(row.is_some());
890        let row = row.unwrap_or_else(|| panic!("Row should exist"));
891        assert_eq!(row.num_rows(), 1);
892
893        // Out of bounds
894        assert!(dataset.get(100).is_none());
895    }
896
897    #[test]
898    fn test_get_row_across_batches() {
899        let batch1 = create_test_batch(0, 5);
900        let batch2 = create_test_batch(5, 5);
901        let dataset = ArrowDataset::new(vec![batch1, batch2])
902            .ok()
903            .unwrap_or_else(|| panic!("Should create dataset"));
904
905        assert_eq!(dataset.len(), 10);
906        assert_eq!(dataset.num_batches(), 2);
907
908        // Row in first batch
909        let row = dataset.get(3);
910        assert!(row.is_some());
911
912        // Row in second batch
913        let row = dataset.get(7);
914        assert!(row.is_some());
915    }
916
917    #[test]
918    fn test_iter() {
919        let batch = create_test_batch(0, 10);
920        let dataset = ArrowDataset::from_batch(batch)
921            .ok()
922            .unwrap_or_else(|| panic!("Should create dataset"));
923
924        let batches: Vec<RecordBatch> = dataset.iter().collect();
925        assert_eq!(batches.len(), 1);
926        assert_eq!(batches[0].num_rows(), 10);
927    }
928
929    #[test]
930    fn test_row_iterator() {
931        let batch = create_test_batch(0, 5);
932        let dataset = ArrowDataset::from_batch(batch)
933            .ok()
934            .unwrap_or_else(|| panic!("Should create dataset"));
935
936        let rows: Vec<RecordBatch> = dataset.rows().collect();
937        assert_eq!(rows.len(), 5);
938        for row in rows {
939            assert_eq!(row.num_rows(), 1);
940        }
941    }
942
943    #[test]
944    fn test_row_iterator_exact_size() {
945        let batch = create_test_batch(0, 10);
946        let dataset = ArrowDataset::from_batch(batch)
947            .ok()
948            .unwrap_or_else(|| panic!("Should create dataset"));
949
950        let iter = dataset.rows();
951        assert_eq!(iter.len(), 10);
952    }
953
954    #[test]
955    fn test_schema() {
956        let batch = create_test_batch(0, 5);
957        let expected_schema = batch.schema();
958        let dataset = ArrowDataset::from_batch(batch)
959            .ok()
960            .unwrap_or_else(|| panic!("Should create dataset"));
961
962        assert_eq!(dataset.schema(), expected_schema);
963    }
964
965    #[test]
966    fn test_is_empty() {
967        let batch = create_test_batch(0, 5);
968        let dataset = ArrowDataset::from_batch(batch)
969            .ok()
970            .unwrap_or_else(|| panic!("Should create dataset"));
971
972        assert!(!dataset.is_empty());
973    }
974
975    #[test]
976    fn test_get_batch() {
977        let batch1 = create_test_batch(0, 5);
978        let batch2 = create_test_batch(5, 5);
979        let dataset = ArrowDataset::new(vec![batch1, batch2])
980            .ok()
981            .unwrap_or_else(|| panic!("Should create dataset"));
982
983        assert!(dataset.get_batch(0).is_some());
984        assert!(dataset.get_batch(1).is_some());
985        assert!(dataset.get_batch(2).is_none());
986    }
987
988    #[test]
989    fn test_into_batches() {
990        let batch = create_test_batch(0, 5);
991        let dataset = ArrowDataset::from_batch(batch)
992            .ok()
993            .unwrap_or_else(|| panic!("Should create dataset"));
994
995        let batches = dataset.into_batches();
996        assert_eq!(batches.len(), 1);
997    }
998
999    #[test]
1000    fn test_parquet_roundtrip() {
1001        let batch = create_test_batch(0, 10);
1002        let dataset = ArrowDataset::from_batch(batch)
1003            .ok()
1004            .unwrap_or_else(|| panic!("Should create dataset"));
1005
1006        let temp_dir = tempfile::tempdir()
1007            .ok()
1008            .unwrap_or_else(|| panic!("Should create temp dir"));
1009        let path = temp_dir.path().join("test.parquet");
1010
1011        dataset
1012            .to_parquet(&path)
1013            .ok()
1014            .unwrap_or_else(|| panic!("Should write parquet"));
1015
1016        let loaded = ArrowDataset::from_parquet(&path)
1017            .ok()
1018            .unwrap_or_else(|| panic!("Should load parquet"));
1019
1020        assert_eq!(loaded.len(), dataset.len());
1021        assert_eq!(loaded.schema(), dataset.schema());
1022    }
1023
1024    #[test]
1025    fn test_csv_roundtrip() {
1026        let batch = create_test_batch(0, 10);
1027        let dataset = ArrowDataset::from_batch(batch)
1028            .ok()
1029            .unwrap_or_else(|| panic!("Should create dataset"));
1030
1031        let temp_dir = tempfile::tempdir()
1032            .ok()
1033            .unwrap_or_else(|| panic!("Should create temp dir"));
1034        let path = temp_dir.path().join("test.csv");
1035
1036        dataset
1037            .to_csv(&path)
1038            .ok()
1039            .unwrap_or_else(|| panic!("Should write csv"));
1040
1041        let loaded = ArrowDataset::from_csv(&path)
1042            .ok()
1043            .unwrap_or_else(|| panic!("Should load csv"));
1044
1045        assert_eq!(loaded.len(), dataset.len());
1046    }
1047
1048    #[test]
1049    fn test_ipc_roundtrip() {
1050        let batch = create_test_batch(0, 10);
1051        let dataset = ArrowDataset::from_batch(batch)
1052            .ok()
1053            .unwrap_or_else(|| panic!("Should create dataset"));
1054
1055        let temp_dir = tempfile::tempdir()
1056            .ok()
1057            .unwrap_or_else(|| panic!("Should create temp dir"));
1058        let path = temp_dir.path().join("test.arrow");
1059
1060        dataset
1061            .to_ipc(&path)
1062            .ok()
1063            .unwrap_or_else(|| panic!("Should write IPC"));
1064
1065        let loaded = ArrowDataset::from_ipc(&path)
1066            .ok()
1067            .unwrap_or_else(|| panic!("Should load IPC"));
1068
1069        assert_eq!(loaded.len(), dataset.len());
1070        assert_eq!(loaded.schema(), dataset.schema());
1071    }
1072
1073    #[test]
1074    fn test_ipc_stream_roundtrip() {
1075        let batch = create_test_batch(0, 10);
1076        let dataset = ArrowDataset::from_batch(batch)
1077            .ok()
1078            .unwrap_or_else(|| panic!("Should create dataset"));
1079
1080        let temp_dir = tempfile::tempdir()
1081            .ok()
1082            .unwrap_or_else(|| panic!("Should create temp dir"));
1083        let path = temp_dir.path().join("test.arrows");
1084
1085        dataset
1086            .to_ipc_stream(&path)
1087            .ok()
1088            .unwrap_or_else(|| panic!("Should write IPC stream"));
1089
1090        let loaded = ArrowDataset::from_ipc_stream(&path)
1091            .ok()
1092            .unwrap_or_else(|| panic!("Should load IPC stream"));
1093
1094        assert_eq!(loaded.len(), dataset.len());
1095        assert_eq!(loaded.schema(), dataset.schema());
1096    }
1097
1098    #[test]
1099    fn test_ipc_error_nonexistent() {
1100        let result = ArrowDataset::from_ipc("/nonexistent/path/to/file.arrow");
1101        assert!(result.is_err());
1102    }
1103
1104    #[test]
1105    fn test_ipc_stream_error_nonexistent() {
1106        let result = ArrowDataset::from_ipc_stream("/nonexistent/path/to/file.arrows");
1107        assert!(result.is_err());
1108    }
1109
1110    #[test]
1111    fn test_csv_options() {
1112        let options = CsvOptions::new()
1113            .with_header(true)
1114            .with_delimiter(b',')
1115            .with_batch_size(1024);
1116
1117        assert!(options.has_header);
1118        assert_eq!(options.delimiter, Some(b','));
1119        assert_eq!(options.batch_size, 1024);
1120    }
1121
1122    #[test]
1123    fn test_csv_options_default() {
1124        let options = CsvOptions::default();
1125        assert!(options.has_header);
1126        assert!(options.delimiter.is_none());
1127        assert_eq!(options.batch_size, 8192);
1128        assert!(options.schema.is_none());
1129    }
1130
1131    #[test]
1132    fn test_json_roundtrip() {
1133        let batch = create_test_batch(0, 10);
1134        let dataset = ArrowDataset::from_batch(batch)
1135            .ok()
1136            .unwrap_or_else(|| panic!("Should create dataset"));
1137
1138        let temp_dir = tempfile::tempdir()
1139            .ok()
1140            .unwrap_or_else(|| panic!("Should create temp dir"));
1141        let path = temp_dir.path().join("test.jsonl");
1142
1143        dataset
1144            .to_json(&path)
1145            .ok()
1146            .unwrap_or_else(|| panic!("Should write json"));
1147
1148        let loaded = ArrowDataset::from_json(&path)
1149            .ok()
1150            .unwrap_or_else(|| panic!("Should load json"));
1151
1152        assert_eq!(loaded.len(), dataset.len());
1153    }
1154
1155    #[test]
1156    fn test_json_options() {
1157        let options = JsonOptions::new().with_batch_size(1024);
1158
1159        assert_eq!(options.batch_size, 1024);
1160        assert!(options.schema.is_none());
1161    }
1162
1163    #[test]
1164    fn test_json_options_default() {
1165        let options = JsonOptions::default();
1166        assert_eq!(options.batch_size, 8192);
1167        assert!(options.schema.is_none());
1168    }
1169
1170    #[test]
1171    fn test_clone() {
1172        let batch = create_test_batch(0, 5);
1173        let dataset = ArrowDataset::from_batch(batch)
1174            .ok()
1175            .unwrap_or_else(|| panic!("Should create dataset"));
1176
1177        let cloned = dataset.clone();
1178        assert_eq!(cloned.len(), dataset.len());
1179        assert_eq!(cloned.schema(), dataset.schema());
1180    }
1181
1182    #[test]
1183    fn test_debug() {
1184        let batch = create_test_batch(0, 5);
1185        let dataset = ArrowDataset::from_batch(batch)
1186            .ok()
1187            .unwrap_or_else(|| panic!("Should create dataset"));
1188
1189        let debug_str = format!("{:?}", dataset);
1190        assert!(debug_str.contains("ArrowDataset"));
1191    }
1192
1193    #[test]
1194    fn test_csv_with_schema() {
1195        let schema = Schema::new(vec![
1196            Field::new("id", DataType::Int32, false),
1197            Field::new("name", DataType::Utf8, false),
1198        ]);
1199
1200        let options = CsvOptions::new().with_schema(schema);
1201        assert!(options.schema.is_some());
1202    }
1203
1204    #[test]
1205    fn test_json_with_schema() {
1206        let schema = Schema::new(vec![
1207            Field::new("id", DataType::Int32, false),
1208            Field::new("name", DataType::Utf8, false),
1209        ]);
1210
1211        let options = JsonOptions::new().with_schema(schema);
1212        assert!(options.schema.is_some());
1213    }
1214
1215    #[test]
1216    fn test_schema_mismatch_error() {
1217        let schema1 = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
1218        let schema2 = Arc::new(Schema::new(vec![Field::new("name", DataType::Utf8, false)]));
1219
1220        let batch1 = RecordBatch::try_new(schema1, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))])
1221            .ok()
1222            .unwrap_or_else(|| panic!("Should create batch"));
1223
1224        let batch2 = RecordBatch::try_new(
1225            schema2,
1226            vec![Arc::new(StringArray::from(vec!["a", "b", "c"]))],
1227        )
1228        .ok()
1229        .unwrap_or_else(|| panic!("Should create batch"));
1230
1231        let result = ArrowDataset::new(vec![batch1, batch2]);
1232        assert!(result.is_err());
1233    }
1234
1235    #[test]
1236    fn test_from_parquet_error() {
1237        let result = ArrowDataset::from_parquet("/nonexistent/path/to/file.parquet");
1238        assert!(result.is_err());
1239    }
1240
1241    #[test]
1242    fn test_from_csv_error() {
1243        let result = ArrowDataset::from_csv("/nonexistent/path/to/file.csv");
1244        assert!(result.is_err());
1245    }
1246
1247    #[test]
1248    fn test_from_json_error() {
1249        let result = ArrowDataset::from_json("/nonexistent/path/to/file.json");
1250        assert!(result.is_err());
1251    }
1252
1253    #[test]
1254    fn test_with_transform() {
1255        use crate::transform::Select;
1256
1257        let batch = create_test_batch(0, 10);
1258        let dataset = ArrowDataset::from_batch(batch)
1259            .ok()
1260            .unwrap_or_else(|| panic!("Should create dataset"));
1261
1262        let transform = Select::new(vec!["id"]);
1263        let transformed = dataset
1264            .with_transform(&transform)
1265            .ok()
1266            .unwrap_or_else(|| panic!("Should apply transform"));
1267
1268        assert_eq!(transformed.schema().fields().len(), 1);
1269        assert_eq!(transformed.len(), 10);
1270    }
1271
1272    #[test]
1273    fn test_batches_accessor() {
1274        let batch = create_test_batch(0, 10);
1275        let dataset = ArrowDataset::from_batch(batch)
1276            .ok()
1277            .unwrap_or_else(|| panic!("Should create dataset"));
1278
1279        let batches = dataset.batches();
1280        assert_eq!(batches.len(), 1);
1281        assert_eq!(batches[0].num_rows(), 10);
1282    }
1283
1284    #[test]
1285    fn test_rows_size_hint() {
1286        let batch = create_test_batch(0, 10);
1287        let dataset = ArrowDataset::from_batch(batch)
1288            .ok()
1289            .unwrap_or_else(|| panic!("Should create dataset"));
1290
1291        let rows = dataset.rows();
1292        assert_eq!(rows.size_hint(), (10, Some(10)));
1293    }
1294
1295    #[test]
1296    fn test_multiple_batches_iteration() {
1297        let batch1 = create_test_batch(0, 5);
1298        let batch2 = create_test_batch(5, 3);
1299        let batch3 = create_test_batch(8, 2);
1300
1301        let dataset = ArrowDataset::new(vec![batch1, batch2, batch3])
1302            .ok()
1303            .unwrap_or_else(|| panic!("Should create dataset"));
1304
1305        let total_rows: usize = dataset.iter().map(|b| b.num_rows()).sum();
1306        assert_eq!(total_rows, 10);
1307
1308        // Test rows iterator across batches
1309        let row_count = dataset.rows().count();
1310        assert_eq!(row_count, 10);
1311    }
1312
1313    #[test]
1314    fn test_csv_options_debug() {
1315        let options = CsvOptions::default();
1316        let debug_str = format!("{:?}", options);
1317        assert!(debug_str.contains("CsvOptions"));
1318    }
1319
1320    #[test]
1321    fn test_json_options_debug() {
1322        let options = JsonOptions::default();
1323        let debug_str = format!("{:?}", options);
1324        assert!(debug_str.contains("JsonOptions"));
1325    }
1326
1327    // === Additional coverage tests ===
1328
1329    #[test]
1330    fn test_from_parquet_bytes_and_to_parquet_bytes() {
1331        let batch = create_test_batch(0, 10);
1332        let dataset = ArrowDataset::from_batch(batch)
1333            .ok()
1334            .unwrap_or_else(|| panic!("Should create dataset"));
1335
1336        let bytes = dataset
1337            .to_parquet_bytes()
1338            .ok()
1339            .unwrap_or_else(|| panic!("Should convert to bytes"));
1340
1341        let loaded = ArrowDataset::from_parquet_bytes(&bytes)
1342            .ok()
1343            .unwrap_or_else(|| panic!("Should load from bytes"));
1344
1345        assert_eq!(loaded.len(), 10);
1346        assert_eq!(loaded.schema(), dataset.schema());
1347    }
1348
1349    #[test]
1350    fn test_from_csv_str() {
1351        let csv = "id,name\n1,Alice\n2,Bob\n3,Charlie";
1352        let dataset = ArrowDataset::from_csv_str(csv)
1353            .ok()
1354            .unwrap_or_else(|| panic!("Should parse CSV string"));
1355
1356        assert_eq!(dataset.len(), 3);
1357        assert_eq!(dataset.schema().fields().len(), 2);
1358    }
1359
1360    #[test]
1361    fn test_from_json_str() {
1362        let json = r#"{"id": 1, "name": "Alice"}
1363{"id": 2, "name": "Bob"}
1364{"id": 3, "name": "Charlie"}"#;
1365        let dataset = ArrowDataset::from_json_str(json)
1366            .ok()
1367            .unwrap_or_else(|| panic!("Should parse JSON string"));
1368
1369        assert_eq!(dataset.len(), 3);
1370    }
1371
1372    #[test]
1373    fn test_csv_with_options_custom_delimiter() {
1374        let temp_dir = tempfile::tempdir()
1375            .ok()
1376            .unwrap_or_else(|| panic!("Should create temp dir"));
1377        let path = temp_dir.path().join("test.tsv");
1378
1379        // Write TSV file
1380        std::fs::write(&path, "id\tname\n1\tAlice\n2\tBob\n3\tCharlie")
1381            .ok()
1382            .unwrap_or_else(|| panic!("Should write TSV"));
1383
1384        let options = CsvOptions::new().with_delimiter(b'\t');
1385        let dataset = ArrowDataset::from_csv_with_options(&path, options)
1386            .ok()
1387            .unwrap_or_else(|| panic!("Should load TSV"));
1388
1389        assert_eq!(dataset.len(), 3);
1390    }
1391
1392    #[test]
1393    fn test_csv_without_header() {
1394        let temp_dir = tempfile::tempdir()
1395            .ok()
1396            .unwrap_or_else(|| panic!("Should create temp dir"));
1397        let path = temp_dir.path().join("no_header.csv");
1398
1399        // Write CSV without header
1400        std::fs::write(&path, "1,Alice\n2,Bob\n3,Charlie")
1401            .ok()
1402            .unwrap_or_else(|| panic!("Should write CSV"));
1403
1404        let options = CsvOptions::new().with_header(false);
1405        let dataset = ArrowDataset::from_csv_with_options(&path, options)
1406            .ok()
1407            .unwrap_or_else(|| panic!("Should load CSV"));
1408
1409        assert_eq!(dataset.len(), 3);
1410    }
1411
1412    #[test]
1413    fn test_json_with_options_batch_size() {
1414        let temp_dir = tempfile::tempdir()
1415            .ok()
1416            .unwrap_or_else(|| panic!("Should create temp dir"));
1417        let path = temp_dir.path().join("test.jsonl");
1418
1419        let json = r#"{"id": 1, "name": "Alice"}
1420{"id": 2, "name": "Bob"}
1421{"id": 3, "name": "Charlie"}"#;
1422        std::fs::write(&path, json)
1423            .ok()
1424            .unwrap_or_else(|| panic!("Should write JSON"));
1425
1426        let options = JsonOptions::new().with_batch_size(1024);
1427        let dataset = ArrowDataset::from_json_with_options(&path, options)
1428            .ok()
1429            .unwrap_or_else(|| panic!("Should load JSON"));
1430
1431        assert_eq!(dataset.len(), 3);
1432    }
1433
1434    #[test]
1435    fn test_row_iterator_multiple_batches_size_hint() {
1436        let batch1 = create_test_batch(0, 5);
1437        let batch2 = create_test_batch(5, 3);
1438        let batch3 = create_test_batch(8, 2);
1439
1440        let dataset = ArrowDataset::new(vec![batch1, batch2, batch3])
1441            .ok()
1442            .unwrap_or_else(|| panic!("Should create dataset"));
1443
1444        let mut iter = dataset.rows();
1445        assert_eq!(iter.size_hint(), (10, Some(10)));
1446
1447        // Consume some elements
1448        iter.next();
1449        iter.next();
1450        assert_eq!(iter.size_hint(), (8, Some(8)));
1451
1452        // Consume more to cross batch boundary
1453        for _ in 0..3 {
1454            iter.next();
1455        }
1456        assert_eq!(iter.size_hint(), (5, Some(5)));
1457    }
1458
1459    #[test]
1460    fn test_find_row_boundary_conditions() {
1461        let batch1 = create_test_batch(0, 3);
1462        let batch2 = create_test_batch(3, 2);
1463
1464        let dataset = ArrowDataset::new(vec![batch1, batch2])
1465            .ok()
1466            .unwrap_or_else(|| panic!("Should create dataset"));
1467
1468        // First row of first batch
1469        let row0 = dataset.get(0);
1470        assert!(row0.is_some());
1471
1472        // Last row of first batch
1473        let row2 = dataset.get(2);
1474        assert!(row2.is_some());
1475
1476        // First row of second batch
1477        let row3 = dataset.get(3);
1478        assert!(row3.is_some());
1479
1480        // Last row of second batch
1481        let row4 = dataset.get(4);
1482        assert!(row4.is_some());
1483
1484        // Out of bounds
1485        let row5 = dataset.get(5);
1486        assert!(row5.is_none());
1487    }
1488
1489    #[test]
1490    fn test_empty_csv_str_error() {
1491        // Empty CSV should error
1492        let result = ArrowDataset::from_csv_str("");
1493        assert!(result.is_err());
1494    }
1495
1496    #[test]
1497    fn test_empty_json_str_error() {
1498        // Empty JSON should error
1499        let result = ArrowDataset::from_json_str("");
1500        assert!(result.is_err());
1501    }
1502
1503    #[test]
1504    fn test_dataset_trait_is_empty_for_nonempty() {
1505        let batch = create_test_batch(0, 1);
1506        let dataset = ArrowDataset::from_batch(batch)
1507            .ok()
1508            .unwrap_or_else(|| panic!("Should create dataset"));
1509
1510        assert!(!<ArrowDataset as Dataset>::is_empty(&dataset));
1511    }
1512
1513    #[test]
1514    fn test_rows_exact_size_iterator_len_exhaustion() {
1515        let batch = create_test_batch(0, 3);
1516        let dataset = ArrowDataset::from_batch(batch)
1517            .ok()
1518            .unwrap_or_else(|| panic!("Should create dataset"));
1519
1520        let mut iter = dataset.rows();
1521        assert_eq!(iter.len(), 3);
1522
1523        iter.next();
1524        assert_eq!(iter.len(), 2);
1525
1526        iter.next();
1527        assert_eq!(iter.len(), 1);
1528
1529        iter.next();
1530        assert_eq!(iter.len(), 0);
1531
1532        // Exhausted
1533        assert!(iter.next().is_none());
1534        assert_eq!(iter.len(), 0);
1535    }
1536
1537    #[test]
1538    fn test_csv_options_clone() {
1539        let options = CsvOptions::new()
1540            .with_header(false)
1541            .with_delimiter(b';')
1542            .with_batch_size(512);
1543        let cloned = options.clone();
1544
1545        assert_eq!(cloned.has_header, options.has_header);
1546        assert_eq!(cloned.delimiter, options.delimiter);
1547        assert_eq!(cloned.batch_size, options.batch_size);
1548    }
1549
1550    #[test]
1551    fn test_json_options_clone() {
1552        let options = JsonOptions::new().with_batch_size(256);
1553        let cloned = options.clone();
1554
1555        assert_eq!(cloned.batch_size, options.batch_size);
1556    }
1557
1558    #[test]
1559    fn test_iter_returns_cloned_batches() {
1560        let batch = create_test_batch(0, 5);
1561        let dataset = ArrowDataset::from_batch(batch)
1562            .ok()
1563            .unwrap_or_else(|| panic!("Should create dataset"));
1564
1565        let mut iter = dataset.iter();
1566        let first = iter.next();
1567        assert!(first.is_some());
1568
1569        // Original dataset should still be usable
1570        assert_eq!(dataset.len(), 5);
1571    }
1572
1573    #[test]
1574    fn test_row_iterator_empty_batch_handling() {
1575        // Create batches where one might be empty-ish
1576        let batch1 = create_test_batch(0, 2);
1577        let batch2 = create_test_batch(2, 1);
1578
1579        let dataset = ArrowDataset::new(vec![batch1, batch2])
1580            .ok()
1581            .unwrap_or_else(|| panic!("Should create dataset"));
1582
1583        let rows: Vec<_> = dataset.rows().collect();
1584        assert_eq!(rows.len(), 3);
1585    }
1586
1587    #[test]
1588    fn test_large_batch_count() {
1589        let batches: Vec<RecordBatch> = (0..20).map(|i| create_test_batch(i * 2, 2)).collect();
1590
1591        let dataset = ArrowDataset::new(batches)
1592            .ok()
1593            .unwrap_or_else(|| panic!("Should create dataset"));
1594
1595        assert_eq!(dataset.len(), 40);
1596        assert_eq!(dataset.num_batches(), 20);
1597
1598        // Access row from last batch
1599        let row = dataset.get(39);
1600        assert!(row.is_some());
1601    }
1602
1603    #[test]
1604    fn test_csv_write_and_verify_content() {
1605        let batch = create_test_batch(0, 3);
1606        let dataset = ArrowDataset::from_batch(batch)
1607            .ok()
1608            .unwrap_or_else(|| panic!("Should create dataset"));
1609
1610        let temp_dir = tempfile::tempdir()
1611            .ok()
1612            .unwrap_or_else(|| panic!("Should create temp dir"));
1613        let path = temp_dir.path().join("verify.csv");
1614
1615        dataset
1616            .to_csv(&path)
1617            .ok()
1618            .unwrap_or_else(|| panic!("Should write CSV"));
1619
1620        // Read back and verify
1621        let content = std::fs::read_to_string(&path)
1622            .ok()
1623            .unwrap_or_else(|| panic!("Should read file"));
1624        assert!(content.contains("id"));
1625        assert!(content.contains("name"));
1626    }
1627
1628    #[test]
1629    fn test_json_write_and_verify_content() {
1630        let batch = create_test_batch(0, 2);
1631        let dataset = ArrowDataset::from_batch(batch)
1632            .ok()
1633            .unwrap_or_else(|| panic!("Should create dataset"));
1634
1635        let temp_dir = tempfile::tempdir()
1636            .ok()
1637            .unwrap_or_else(|| panic!("Should create temp dir"));
1638        let path = temp_dir.path().join("verify.jsonl");
1639
1640        dataset
1641            .to_json(&path)
1642            .ok()
1643            .unwrap_or_else(|| panic!("Should write JSON"));
1644
1645        // Read back and verify
1646        let content = std::fs::read_to_string(&path)
1647            .ok()
1648            .unwrap_or_else(|| panic!("Should read file"));
1649        assert!(content.contains("\"id\""));
1650        assert!(content.contains("\"name\""));
1651    }
1652
1653    #[test]
1654    fn test_to_ipc_error_invalid_path() {
1655        let batch = create_test_batch(0, 5);
1656        let dataset = ArrowDataset::from_batch(batch)
1657            .ok()
1658            .unwrap_or_else(|| panic!("Should create dataset"));
1659
1660        let result = dataset.to_ipc("/nonexistent/path/output.arrow");
1661        assert!(result.is_err());
1662    }
1663
1664    #[test]
1665    fn test_to_ipc_stream_error_invalid_path() {
1666        let batch = create_test_batch(0, 5);
1667        let dataset = ArrowDataset::from_batch(batch)
1668            .ok()
1669            .unwrap_or_else(|| panic!("Should create dataset"));
1670
1671        let result = dataset.to_ipc_stream("/nonexistent/path/output.arrows");
1672        assert!(result.is_err());
1673    }
1674
1675    #[test]
1676    fn test_to_parquet_error_invalid_path() {
1677        let batch = create_test_batch(0, 5);
1678        let dataset = ArrowDataset::from_batch(batch)
1679            .ok()
1680            .unwrap_or_else(|| panic!("Should create dataset"));
1681
1682        let result = dataset.to_parquet("/nonexistent/path/output.parquet");
1683        assert!(result.is_err());
1684    }
1685
1686    #[test]
1687    fn test_to_csv_error_invalid_path() {
1688        let batch = create_test_batch(0, 5);
1689        let dataset = ArrowDataset::from_batch(batch)
1690            .ok()
1691            .unwrap_or_else(|| panic!("Should create dataset"));
1692
1693        let result = dataset.to_csv("/nonexistent/path/output.csv");
1694        assert!(result.is_err());
1695    }
1696
1697    #[test]
1698    fn test_to_json_error_invalid_path() {
1699        let batch = create_test_batch(0, 5);
1700        let dataset = ArrowDataset::from_batch(batch)
1701            .ok()
1702            .unwrap_or_else(|| panic!("Should create dataset"));
1703
1704        let result = dataset.to_json("/nonexistent/path/output.jsonl");
1705        assert!(result.is_err());
1706    }
1707}