Skip to main content

alimentar/format/
streaming.rs

1//! Streaming dataset format with lazy chunk loading
2//!
3//! Enables lazy loading of large datasets (>100MB) by reading only
4//! the header and chunk index initially, then loading chunks on demand.
5//!
6//! # Example
7//!
8//! ```ignore
9//! use alimentar::format::streaming::StreamingDataset;
10//!
11//! // Open dataset (only reads header + index)
12//! let dataset = StreamingDataset::open("large_data.ald")?;
13//!
14//! // Access chunks lazily
15//! println!("Total rows: {}", dataset.num_rows());
16//! for chunk in dataset.chunks() {
17//!     println!("Chunk with {} rows", chunk?.num_rows());
18//! }
19//! ```
20
21// Allow casts where truncation is intentional for file format sizes
22#![allow(clippy::cast_possible_truncation)]
23
24use std::{
25    fs::File,
26    io::{BufReader, Read, Seek, SeekFrom},
27    path::{Path, PathBuf},
28    sync::Arc,
29};
30
31use arrow::{array::RecordBatch, datatypes::SchemaRef};
32use serde::{Deserialize, Serialize};
33
34use crate::{
35    error::{Error, Result},
36    format::{flags, Compression, HEADER_SIZE, MAGIC},
37};
38
/// Number of rows placed in each chunk when the caller does not choose a size
pub const DEFAULT_CHUNK_SIZE: usize = 64 * 1024; // 65,536 rows

/// Dataset size in bytes from which the streaming format is recommended (100MB)
pub const STREAMING_THRESHOLD: u64 = 100 << 20;
44
45/// Entry in the chunk index describing one chunk's location
46#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
47pub struct ChunkEntry {
48    /// Row offset (first row in this chunk)
49    pub row_offset: u64,
50    /// Number of rows in this chunk
51    pub num_rows: u32,
52    /// Byte offset in the payload section
53    pub byte_offset: u64,
54    /// Compressed size in bytes
55    pub compressed_size: u32,
56    /// Uncompressed size in bytes
57    pub uncompressed_size: u32,
58}
59
60impl ChunkEntry {
61    /// Create a new chunk entry
62    pub fn new(
63        row_offset: u64,
64        num_rows: u32,
65        byte_offset: u64,
66        compressed_size: u32,
67        uncompressed_size: u32,
68    ) -> Self {
69        Self {
70            row_offset,
71            num_rows,
72            byte_offset,
73            compressed_size,
74            uncompressed_size,
75        }
76    }
77
78    /// Check if this chunk contains the given row
79    pub fn contains_row(&self, row: u64) -> bool {
80        row >= self.row_offset && row < self.row_offset + u64::from(self.num_rows)
81    }
82
83    /// Get the last row index in this chunk (exclusive)
84    pub fn end_row(&self) -> u64 {
85        self.row_offset + u64::from(self.num_rows)
86    }
87}
88
89/// Index of all chunks in a streaming dataset
90#[derive(Debug, Clone, Default, Serialize, Deserialize)]
91pub struct ChunkIndex {
92    /// Chunk entries in order
93    entries: Vec<ChunkEntry>,
94    /// Total row count (cached)
95    total_rows: u64,
96}
97
98impl ChunkIndex {
99    /// Create a new empty chunk index
100    pub fn new() -> Self {
101        Self::default()
102    }
103
104    /// Create chunk index from entries
105    pub fn from_entries(entries: Vec<ChunkEntry>) -> Self {
106        let total_rows = entries.last().map_or(0, ChunkEntry::end_row);
107        Self {
108            entries,
109            total_rows,
110        }
111    }
112
113    /// Add a chunk entry
114    pub fn push(&mut self, entry: ChunkEntry) {
115        self.total_rows = entry.end_row();
116        self.entries.push(entry);
117    }
118
119    /// Get number of chunks
120    pub fn len(&self) -> usize {
121        self.entries.len()
122    }
123
124    /// Check if index is empty
125    pub fn is_empty(&self) -> bool {
126        self.entries.is_empty()
127    }
128
129    /// Get total row count
130    pub fn total_rows(&self) -> u64 {
131        self.total_rows
132    }
133
134    /// Get chunk entry by index
135    pub fn get(&self, index: usize) -> Option<&ChunkEntry> {
136        self.entries.get(index)
137    }
138
139    /// Find chunk containing the given row
140    pub fn find_chunk_for_row(&self, row: u64) -> Option<usize> {
141        if row >= self.total_rows {
142            return None;
143        }
144        // Binary search for efficiency
145        self.entries
146            .binary_search_by(|entry| {
147                if row < entry.row_offset {
148                    std::cmp::Ordering::Greater
149                } else if row >= entry.end_row() {
150                    std::cmp::Ordering::Less
151                } else {
152                    std::cmp::Ordering::Equal
153                }
154            })
155            .ok()
156    }
157
158    /// Iterate over entries
159    pub fn iter(&self) -> impl Iterator<Item = &ChunkEntry> {
160        self.entries.iter()
161    }
162
163    /// Serialize to bytes
164    pub fn to_bytes(&self) -> Result<Vec<u8>> {
165        rmp_serde::to_vec(self)
166            .map_err(|e| Error::Format(format!("Failed to serialize chunk index: {e}")))
167    }
168
169    /// Deserialize from bytes
170    pub fn from_bytes(bytes: &[u8]) -> Result<Self> {
171        rmp_serde::from_slice(bytes)
172            .map_err(|e| Error::Format(format!("Failed to deserialize chunk index: {e}")))
173    }
174}
175
176/// Streaming dataset with lazy chunk loading
177///
178/// Uses file seeking to load chunks on demand, minimizing memory usage.
179pub struct StreamingDataset {
180    /// Path to the dataset file
181    path: PathBuf,
182    /// Chunk index
183    index: ChunkIndex,
184    /// Arrow schema
185    schema: SchemaRef,
186    /// Compression type used
187    compression: Compression,
188    /// Offset where payload starts
189    payload_offset: u64,
190}
191
192impl StreamingDataset {
193    /// Open a streaming dataset from a file
194    ///
195    /// Only reads the header, metadata, and chunk index initially.
196    /// Chunks are loaded on demand.
197    ///
198    /// # Errors
199    ///
200    /// Returns error if file cannot be opened, is not a valid .ald file,
201    /// or does not have the STREAMING flag set.
202    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
203        let path = path.as_ref();
204        let file = File::open(path).map_err(|e| Error::io(e, path))?;
205        let mut reader = BufReader::new(file);
206
207        // Read header
208        let mut header = [0u8; HEADER_SIZE];
209        reader
210            .read_exact(&mut header)
211            .map_err(|e| Error::io(e, path))?;
212
213        // Verify magic bytes
214        if header[0..4] != MAGIC {
215            return Err(Error::Format("Invalid magic bytes".into()));
216        }
217
218        // Check streaming flag
219        let header_flags = header[6];
220        if header_flags & flags::STREAMING == 0 {
221            return Err(Error::Format(
222                "File does not have STREAMING flag set. Use regular load() instead.".into(),
223            ));
224        }
225
226        // Read compression type
227        let compression = Compression::from_u8(header[7])
228            .ok_or_else(|| Error::Format(format!("Unknown compression type: {}", header[7])))?;
229
230        // Read section sizes from header
231        let metadata_size = u64::from(u32::from_le_bytes([
232            header[12], header[13], header[14], header[15],
233        ]));
234        let schema_size = u64::from(u32::from_le_bytes([
235            header[16], header[17], header[18], header[19],
236        ]));
237        let index_size = u64::from(u32::from_le_bytes([
238            header[20], header[21], header[22], header[23],
239        ]));
240
241        // Calculate offsets
242        let schema_offset = u64::from(HEADER_SIZE as u32) + metadata_size;
243        let index_offset = schema_offset + schema_size;
244        let payload_offset = index_offset + index_size;
245
246        // Read schema
247        reader
248            .seek(SeekFrom::Start(schema_offset))
249            .map_err(|e| Error::io(e, path))?;
250        let mut schema_bytes = vec![0u8; schema_size as usize];
251        reader
252            .read_exact(&mut schema_bytes)
253            .map_err(|e| Error::io(e, path))?;
254        let schema = Self::deserialize_schema(&schema_bytes)?;
255
256        // Read chunk index
257        let mut index_bytes = vec![0u8; index_size as usize];
258        reader
259            .read_exact(&mut index_bytes)
260            .map_err(|e| Error::io(e, path))?;
261        let index = ChunkIndex::from_bytes(&index_bytes)?;
262
263        Ok(Self {
264            path: path.to_path_buf(),
265            index,
266            schema,
267            compression,
268            payload_offset,
269        })
270    }
271
272    /// Get total row count
273    pub fn num_rows(&self) -> u64 {
274        self.index.total_rows()
275    }
276
277    /// Get number of chunks
278    pub fn num_chunks(&self) -> usize {
279        self.index.len()
280    }
281
282    /// Get the schema
283    pub fn schema(&self) -> SchemaRef {
284        Arc::clone(&self.schema)
285    }
286
287    /// Load a specific chunk by index
288    ///
289    /// # Errors
290    ///
291    /// Returns error if chunk index is out of bounds or decompression fails.
292    pub fn get_chunk(&self, chunk_idx: usize) -> Result<RecordBatch> {
293        let entry = self
294            .index
295            .get(chunk_idx)
296            .ok_or_else(|| Error::IndexOutOfBounds {
297                index: chunk_idx,
298                len: self.index.len(),
299            })?;
300
301        self.load_chunk(entry)
302    }
303
304    /// Get random access to rows by range
305    ///
306    /// # Errors
307    ///
308    /// Returns error if row range is out of bounds.
309    pub fn get_rows(&self, start: u64, count: u64) -> Result<RecordBatch> {
310        if start >= self.num_rows() {
311            return Err(Error::IndexOutOfBounds {
312                index: start as usize,
313                len: self.num_rows() as usize,
314            });
315        }
316
317        let end = (start + count).min(self.num_rows());
318        let actual_count = end - start;
319
320        // Find chunks that contain these rows
321        let start_chunk =
322            self.index
323                .find_chunk_for_row(start)
324                .ok_or_else(|| Error::IndexOutOfBounds {
325                    index: start as usize,
326                    len: self.num_rows() as usize,
327                })?;
328
329        let end_chunk = self
330            .index
331            .find_chunk_for_row(end.saturating_sub(1))
332            .unwrap_or(start_chunk);
333
334        // Load and slice chunks
335        let mut batches = Vec::new();
336        let mut remaining = actual_count;
337        let mut current_row = start;
338
339        for chunk_idx in start_chunk..=end_chunk {
340            let entry = self
341                .index
342                .get(chunk_idx)
343                .ok_or_else(|| Error::IndexOutOfBounds {
344                    index: chunk_idx,
345                    len: self.index.len(),
346                })?;
347
348            let batch = self.load_chunk(entry)?;
349
350            // Calculate slice within this chunk
351            let chunk_start = if current_row > entry.row_offset {
352                (current_row - entry.row_offset) as usize
353            } else {
354                0
355            };
356
357            let chunk_take = remaining.min(u64::from(entry.num_rows) - chunk_start as u64) as usize;
358
359            let sliced = batch.slice(chunk_start, chunk_take);
360            batches.push(sliced);
361
362            remaining -= chunk_take as u64;
363            current_row += chunk_take as u64;
364        }
365
366        // Concatenate batches if needed
367        if batches.len() == 1 {
368            Ok(batches
369                .into_iter()
370                .next()
371                .ok_or_else(|| Error::Format("No batches loaded".into()))?)
372        } else {
373            use arrow::compute::concat_batches;
374            concat_batches(&self.schema, &batches).map_err(Error::Arrow)
375        }
376    }
377
378    /// Iterate over all chunks
379    pub fn chunks(&self) -> ChunkIterator<'_> {
380        ChunkIterator {
381            dataset: self,
382            current: 0,
383        }
384    }
385
386    /// Load a chunk from the file
387    fn load_chunk(&self, entry: &ChunkEntry) -> Result<RecordBatch> {
388        let file = File::open(&self.path).map_err(|e| Error::io(e, &self.path))?;
389        let mut reader = BufReader::new(file);
390
391        let offset = self.payload_offset + entry.byte_offset;
392        reader
393            .seek(SeekFrom::Start(offset))
394            .map_err(|e| Error::io(e, &self.path))?;
395
396        let mut compressed_data = vec![0u8; entry.compressed_size as usize];
397        reader
398            .read_exact(&mut compressed_data)
399            .map_err(|e| Error::io(e, &self.path))?;
400
401        // Decompress
402        let decompressed = self.decompress(&compressed_data, entry.uncompressed_size as usize)?;
403
404        // Deserialize Arrow IPC
405        Self::deserialize_batch(&decompressed)
406    }
407
408    /// Decompress data based on compression type
409    fn decompress(&self, data: &[u8], expected_size: usize) -> Result<Vec<u8>> {
410        match self.compression {
411            Compression::None => Ok(data.to_vec()),
412            Compression::ZstdL3 | Compression::ZstdL19 => {
413                let mut output = Vec::with_capacity(expected_size);
414                zstd::stream::copy_decode(data, &mut output)
415                    .map_err(|e| Error::Format(format!("Zstd decompression failed: {e}")))?;
416                Ok(output)
417            }
418            Compression::Lz4 => lz4_flex::decompress(data, expected_size)
419                .map_err(|e| Error::Format(format!("LZ4 decompression failed: {e}"))),
420        }
421    }
422
423    /// Deserialize Arrow schema from bytes
424    fn deserialize_schema(bytes: &[u8]) -> Result<SchemaRef> {
425        use std::io::Cursor;
426
427        use arrow::ipc::reader::StreamReader;
428
429        let cursor = Cursor::new(bytes);
430        let reader = StreamReader::try_new(cursor, None).map_err(Error::Arrow)?;
431        Ok(reader.schema())
432    }
433
434    /// Deserialize Arrow batch from bytes
435    fn deserialize_batch(bytes: &[u8]) -> Result<RecordBatch> {
436        use std::io::Cursor;
437
438        use arrow::ipc::reader::StreamReader;
439
440        let cursor = Cursor::new(bytes);
441        let mut reader = StreamReader::try_new(cursor, None).map_err(Error::Arrow)?;
442
443        reader
444            .next()
445            .ok_or_else(|| Error::Format("No batch in IPC data".into()))?
446            .map_err(Error::Arrow)
447    }
448}
449
450impl std::fmt::Debug for StreamingDataset {
451    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
452        f.debug_struct("StreamingDataset")
453            .field("path", &self.path)
454            .field("num_rows", &self.num_rows())
455            .field("num_chunks", &self.num_chunks())
456            .field("compression", &self.compression)
457            .finish_non_exhaustive()
458    }
459}
460
461/// Iterator over chunks in a streaming dataset
462pub struct ChunkIterator<'a> {
463    dataset: &'a StreamingDataset,
464    current: usize,
465}
466
467impl Iterator for ChunkIterator<'_> {
468    type Item = Result<RecordBatch>;
469
470    fn next(&mut self) -> Option<Self::Item> {
471        if self.current >= self.dataset.num_chunks() {
472            return None;
473        }
474
475        let result = self.dataset.get_chunk(self.current);
476        self.current += 1;
477        Some(result)
478    }
479
480    fn size_hint(&self) -> (usize, Option<usize>) {
481        let remaining = self.dataset.num_chunks() - self.current;
482        (remaining, Some(remaining))
483    }
484}
485
486impl ExactSizeIterator for ChunkIterator<'_> {}
487
488/// Save a dataset in streaming format
489///
490/// # Arguments
491/// * `batches` - Iterator of record batches to save
492/// * `schema` - Arrow schema
493/// * `path` - Output file path
494/// * `chunk_size` - Rows per chunk (default: 65536)
495/// * `compression` - Compression type to use
496///
497/// # Errors
498///
499/// Returns error if file cannot be written.
500pub fn save_streaming<P, I>(
501    batches: I,
502    schema: &SchemaRef,
503    path: P,
504    chunk_size: Option<usize>,
505    compression: Compression,
506) -> Result<()>
507where
508    P: AsRef<Path>,
509    I: Iterator<Item = RecordBatch>,
510{
511    use std::io::{BufWriter, Write};
512
513    let path = path.as_ref();
514    let chunk_size = chunk_size.unwrap_or(DEFAULT_CHUNK_SIZE);
515
516    let file = File::create(path).map_err(|e| Error::io(e, path))?;
517    let mut writer = BufWriter::new(file);
518
519    let chunks = collect_chunks(batches, schema, chunk_size, compression)?;
520
521    // Build chunk index
522    let index = ChunkIndex::from_entries(chunks.iter().map(|(e, _)| e.clone()).collect());
523    let index_bytes = index.to_bytes()?;
524
525    // Serialize schema
526    let schema_bytes = serialize_schema(schema)?;
527
528    // Build metadata (minimal for streaming)
529    let metadata = crate::format::Metadata::default();
530    let metadata_bytes = rmp_serde::to_vec(&metadata)
531        .map_err(|e| Error::Format(format!("Failed to serialize metadata: {e}")))?;
532
533    // Write header
534    let mut header = [0u8; HEADER_SIZE];
535    header[0..4].copy_from_slice(&MAGIC);
536    header[4] = crate::format::FORMAT_VERSION_MAJOR;
537    header[5] = crate::format::FORMAT_VERSION_MINOR;
538    header[6] = flags::STREAMING;
539    header[7] = compression.as_u8();
540    // Dataset type (Tabular default)
541    header[8..10].copy_from_slice(&1u16.to_le_bytes());
542    // Reserved
543    header[10..12].copy_from_slice(&[0, 0]);
544    // Section sizes
545    header[12..16].copy_from_slice(&(metadata_bytes.len() as u32).to_le_bytes());
546    header[16..20].copy_from_slice(&(schema_bytes.len() as u32).to_le_bytes());
547    header[20..24].copy_from_slice(&(index_bytes.len() as u32).to_le_bytes());
548    // Payload size (sum of all chunks)
549    let payload_size: u64 = chunks
550        .iter()
551        .map(|(e, _)| u64::from(e.compressed_size))
552        .sum();
553    header[24..32].copy_from_slice(&payload_size.to_le_bytes());
554
555    writer.write_all(&header).map_err(|e| Error::io(e, path))?;
556    writer
557        .write_all(&metadata_bytes)
558        .map_err(|e| Error::io(e, path))?;
559    writer
560        .write_all(&schema_bytes)
561        .map_err(|e| Error::io(e, path))?;
562    writer
563        .write_all(&index_bytes)
564        .map_err(|e| Error::io(e, path))?;
565
566    // Write chunk data
567    for (_, data) in &chunks {
568        writer.write_all(data).map_err(|e| Error::io(e, path))?;
569    }
570
571    writer.flush().map_err(|e| Error::io(e, path))?;
572
573    Ok(())
574}
575
576/// Collect all batches into compressed chunks
577fn collect_chunks<I>(
578    batches: I,
579    schema: &SchemaRef,
580    chunk_size: usize,
581    compression: Compression,
582) -> Result<Vec<(ChunkEntry, Vec<u8>)>>
583where
584    I: Iterator<Item = RecordBatch>,
585{
586    let mut chunks: Vec<(ChunkEntry, Vec<u8>)> = Vec::new();
587    let mut current_rows: Vec<RecordBatch> = Vec::new();
588    let mut current_row_count = 0usize;
589    let mut total_row_offset = 0u64;
590    let mut byte_offset = 0u64;
591
592    for batch in batches {
593        current_rows.push(batch.clone());
594        current_row_count += batch.num_rows();
595
596        while current_row_count >= chunk_size {
597            let (chunk_batch, remaining) = split_batches(&current_rows, chunk_size, schema)?;
598            let (entry, data) =
599                build_chunk(&chunk_batch, total_row_offset, byte_offset, compression)?;
600
601            total_row_offset += u64::from(entry.num_rows);
602            byte_offset += u64::from(entry.compressed_size);
603
604            chunks.push((entry, data));
605
606            current_rows = remaining;
607            current_row_count = current_rows.iter().map(RecordBatch::num_rows).sum();
608        }
609    }
610
611    if !current_rows.is_empty() {
612        let chunk_batch = concat_batches_vec(&current_rows, schema)?;
613        let (entry, data) = build_chunk(&chunk_batch, total_row_offset, byte_offset, compression)?;
614        chunks.push((entry, data));
615    }
616
617    Ok(chunks)
618}
619
620/// Build a chunk from a record batch
621fn build_chunk(
622    batch: &RecordBatch,
623    row_offset: u64,
624    byte_offset: u64,
625    compression: Compression,
626) -> Result<(ChunkEntry, Vec<u8>)> {
627    // Serialize batch to Arrow IPC
628    let uncompressed = serialize_batch(batch)?;
629    let uncompressed_size = uncompressed.len();
630
631    // Compress
632    let compressed = match compression {
633        Compression::None => uncompressed,
634        Compression::ZstdL3 => zstd::encode_all(uncompressed.as_slice(), 3)
635            .map_err(|e| Error::Format(format!("Zstd compression failed: {e}")))?,
636        Compression::ZstdL19 => zstd::encode_all(uncompressed.as_slice(), 19)
637            .map_err(|e| Error::Format(format!("Zstd compression failed: {e}")))?,
638        Compression::Lz4 => lz4_flex::compress_prepend_size(&uncompressed),
639    };
640
641    let entry = ChunkEntry::new(
642        row_offset,
643        batch.num_rows() as u32,
644        byte_offset,
645        compressed.len() as u32,
646        uncompressed_size as u32,
647    );
648
649    Ok((entry, compressed))
650}
651
652/// Serialize Arrow schema to bytes
653fn serialize_schema(schema: &SchemaRef) -> Result<Vec<u8>> {
654    use arrow::ipc::writer::StreamWriter;
655
656    let mut buf = Vec::new();
657    {
658        let mut writer = StreamWriter::try_new(&mut buf, schema).map_err(Error::Arrow)?;
659        writer.finish().map_err(Error::Arrow)?;
660    }
661    Ok(buf)
662}
663
664/// Serialize Arrow batch to bytes
665fn serialize_batch(batch: &RecordBatch) -> Result<Vec<u8>> {
666    use arrow::ipc::writer::StreamWriter;
667
668    let mut buf = Vec::new();
669    {
670        let mut writer = StreamWriter::try_new(&mut buf, &batch.schema()).map_err(Error::Arrow)?;
671        writer.write(batch).map_err(Error::Arrow)?;
672        writer.finish().map_err(Error::Arrow)?;
673    }
674    Ok(buf)
675}
676
677/// Split batches to get exactly chunk_size rows
678fn split_batches(
679    batches: &[RecordBatch],
680    chunk_size: usize,
681    schema: &SchemaRef,
682) -> Result<(RecordBatch, Vec<RecordBatch>)> {
683    use arrow::compute::concat_batches;
684
685    // Concatenate all batches
686    let combined = concat_batches(schema, batches).map_err(Error::Arrow)?;
687
688    if combined.num_rows() <= chunk_size {
689        return Ok((combined, Vec::new()));
690    }
691
692    // Split at chunk_size
693    let chunk = combined.slice(0, chunk_size);
694    let remaining = combined.slice(chunk_size, combined.num_rows() - chunk_size);
695
696    Ok((chunk, vec![remaining]))
697}
698
699/// Concatenate batches into one
700fn concat_batches_vec(batches: &[RecordBatch], schema: &SchemaRef) -> Result<RecordBatch> {
701    use arrow::compute::concat_batches;
702    concat_batches(schema, batches).map_err(Error::Arrow)
703}
704
705#[cfg(test)]
706mod tests {
707    use std::sync::Arc;
708
709    use arrow::{
710        array::{Float64Array, Int32Array},
711        datatypes::{DataType, Field, Schema},
712    };
713    use tempfile::NamedTempFile;
714
715    use super::*;
716
717    /// Helper to create a test batch with n rows
718    fn make_test_batch(n: usize, offset: usize) -> RecordBatch {
719        let schema = Arc::new(Schema::new(vec![
720            Field::new("id", DataType::Int32, false),
721            Field::new("value", DataType::Float64, false),
722        ]));
723
724        let ids: Vec<i32> = (offset..offset + n).map(|i| i as i32).collect();
725        let values: Vec<f64> = (offset..offset + n).map(|i| i as f64 * 1.5).collect();
726
727        RecordBatch::try_new(
728            schema,
729            vec![
730                Arc::new(Int32Array::from(ids)),
731                Arc::new(Float64Array::from(values)),
732            ],
733        )
734        .expect("batch creation")
735    }
736
737    fn test_schema() -> SchemaRef {
738        Arc::new(Schema::new(vec![
739            Field::new("id", DataType::Int32, false),
740            Field::new("value", DataType::Float64, false),
741        ]))
742    }
743
744    // ========== ChunkEntry tests ==========
745
746    #[test]
747    fn test_chunk_entry_new() {
748        let entry = ChunkEntry::new(100, 50, 1000, 500, 800);
749
750        assert_eq!(entry.row_offset, 100);
751        assert_eq!(entry.num_rows, 50);
752        assert_eq!(entry.byte_offset, 1000);
753        assert_eq!(entry.compressed_size, 500);
754        assert_eq!(entry.uncompressed_size, 800);
755    }
756
757    #[test]
758    fn test_chunk_entry_contains_row() {
759        let entry = ChunkEntry::new(100, 50, 0, 0, 0);
760
761        assert!(!entry.contains_row(99));
762        assert!(entry.contains_row(100));
763        assert!(entry.contains_row(125));
764        assert!(entry.contains_row(149));
765        assert!(!entry.contains_row(150));
766    }
767
768    #[test]
769    fn test_chunk_entry_end_row() {
770        let entry = ChunkEntry::new(100, 50, 0, 0, 0);
771        assert_eq!(entry.end_row(), 150);
772    }
773
774    // ========== ChunkIndex tests ==========
775
776    #[test]
777    fn test_chunk_index_new_empty() {
778        let index = ChunkIndex::new();
779
780        assert!(index.is_empty());
781        assert_eq!(index.len(), 0);
782        assert_eq!(index.total_rows(), 0);
783    }
784
785    #[test]
786    fn test_chunk_index_push() {
787        let mut index = ChunkIndex::new();
788
789        index.push(ChunkEntry::new(0, 100, 0, 500, 800));
790        assert_eq!(index.len(), 1);
791        assert_eq!(index.total_rows(), 100);
792
793        index.push(ChunkEntry::new(100, 100, 500, 500, 800));
794        assert_eq!(index.len(), 2);
795        assert_eq!(index.total_rows(), 200);
796    }
797
798    #[test]
799    fn test_chunk_index_from_entries() {
800        let entries = vec![
801            ChunkEntry::new(0, 100, 0, 500, 800),
802            ChunkEntry::new(100, 100, 500, 500, 800),
803            ChunkEntry::new(200, 50, 1000, 250, 400),
804        ];
805
806        let index = ChunkIndex::from_entries(entries);
807
808        assert_eq!(index.len(), 3);
809        assert_eq!(index.total_rows(), 250);
810    }
811
812    #[test]
813    fn test_chunk_index_get() {
814        let entries = vec![
815            ChunkEntry::new(0, 100, 0, 500, 800),
816            ChunkEntry::new(100, 100, 500, 500, 800),
817        ];
818        let index = ChunkIndex::from_entries(entries);
819
820        assert!(index.get(0).is_some());
821        assert!(index.get(1).is_some());
822        assert!(index.get(2).is_none());
823
824        assert_eq!(index.get(0).map(|e| e.row_offset), Some(0));
825        assert_eq!(index.get(1).map(|e| e.row_offset), Some(100));
826    }
827
828    #[test]
829    fn test_chunk_index_find_chunk_for_row() {
830        let entries = vec![
831            ChunkEntry::new(0, 100, 0, 500, 800),
832            ChunkEntry::new(100, 100, 500, 500, 800),
833            ChunkEntry::new(200, 50, 1000, 250, 400),
834        ];
835        let index = ChunkIndex::from_entries(entries);
836
837        assert_eq!(index.find_chunk_for_row(0), Some(0));
838        assert_eq!(index.find_chunk_for_row(50), Some(0));
839        assert_eq!(index.find_chunk_for_row(99), Some(0));
840        assert_eq!(index.find_chunk_for_row(100), Some(1));
841        assert_eq!(index.find_chunk_for_row(150), Some(1));
842        assert_eq!(index.find_chunk_for_row(200), Some(2));
843        assert_eq!(index.find_chunk_for_row(249), Some(2));
844        assert_eq!(index.find_chunk_for_row(250), None);
845        assert_eq!(index.find_chunk_for_row(1000), None);
846    }
847
848    #[test]
849    fn test_chunk_index_serialization() {
850        let entries = vec![
851            ChunkEntry::new(0, 100, 0, 500, 800),
852            ChunkEntry::new(100, 100, 500, 500, 800),
853        ];
854        let index = ChunkIndex::from_entries(entries);
855
856        let bytes = index.to_bytes().expect("serialize");
857        let restored = ChunkIndex::from_bytes(&bytes).expect("deserialize");
858
859        assert_eq!(restored.len(), index.len());
860        assert_eq!(restored.total_rows(), index.total_rows());
861        assert_eq!(restored.get(0), index.get(0));
862        assert_eq!(restored.get(1), index.get(1));
863    }
864
865    // ========== save_streaming tests ==========
866
867    #[test]
868    fn test_save_streaming_creates_file() {
869        let batches = vec![make_test_batch(100, 0), make_test_batch(100, 100)];
870        let schema = test_schema();
871
872        let temp = NamedTempFile::new().expect("temp file");
873        let path = temp.path();
874
875        save_streaming(
876            batches.into_iter(),
877            &schema,
878            path,
879            Some(64),
880            Compression::None,
881        )
882        .expect("save");
883
884        assert!(path.exists());
885        assert!(std::fs::metadata(path).expect("metadata").len() > 0);
886    }
887
888    #[test]
889    fn test_save_streaming_with_compression() {
890        let batches = vec![make_test_batch(1000, 0)];
891        let schema = test_schema();
892
893        let temp_none = NamedTempFile::new().expect("temp");
894        let temp_zstd = NamedTempFile::new().expect("temp");
895
896        save_streaming(
897            batches.clone().into_iter(),
898            &schema,
899            temp_none.path(),
900            Some(500),
901            Compression::None,
902        )
903        .expect("save none");
904
905        save_streaming(
906            batches.into_iter(),
907            &schema,
908            temp_zstd.path(),
909            Some(500),
910            Compression::ZstdL3,
911        )
912        .expect("save zstd");
913
914        let size_none = std::fs::metadata(temp_none.path()).expect("meta").len();
915        let size_zstd = std::fs::metadata(temp_zstd.path()).expect("meta").len();
916
917        // Compressed should be smaller
918        assert!(
919            size_zstd < size_none,
920            "Zstd should compress: {size_zstd} >= {size_none}"
921        );
922    }
923
924    // ========== StreamingDataset::open tests ==========
925
926    #[test]
927    fn test_streaming_dataset_open() {
928        let batches = vec![make_test_batch(100, 0), make_test_batch(100, 100)];
929        let schema = test_schema();
930
931        let temp = NamedTempFile::new().expect("temp");
932        save_streaming(
933            batches.into_iter(),
934            &schema,
935            temp.path(),
936            Some(64),
937            Compression::ZstdL3,
938        )
939        .expect("save");
940
941        let dataset = StreamingDataset::open(temp.path()).expect("open");
942
943        assert_eq!(dataset.num_rows(), 200);
944        assert!(dataset.num_chunks() > 0);
945    }
946
947    #[test]
948    fn test_streaming_dataset_rejects_non_streaming_file() {
949        // Test that opening a non-existent file fails
950        let result = StreamingDataset::open("/nonexistent/path.ald");
951        assert!(result.is_err());
952    }
953
954    // ========== get_chunk tests ==========
955
/// The first chunk of a two-batch file holds 100 rows and starts with id 0.
#[test]
fn test_get_chunk_returns_correct_data() {
    let schema = test_schema();
    let input = vec![make_test_batch(100, 0), make_test_batch(100, 100)];

    let file = NamedTempFile::new().expect("temp");
    save_streaming(
        input.into_iter(),
        &schema,
        file.path(),
        Some(100),
        Compression::None,
    )
    .expect("save");

    let dataset = StreamingDataset::open(file.path()).expect("open");

    let first = dataset.get_chunk(0).expect("chunk 0");
    assert_eq!(first.num_rows(), 100);

    // Column 0 is read back as an `Int32Array`; its first value must be 0.
    let id_column = first.column(0);
    let ids = id_column
        .as_any()
        .downcast_ref::<Int32Array>()
        .expect("downcast");
    assert_eq!(ids.value(0), 0);
}
985
/// Asking for a chunk index far past the end of the index yields an error.
#[test]
fn test_get_chunk_out_of_bounds() {
    let schema = test_schema();
    let file = NamedTempFile::new().expect("temp");
    save_streaming(
        vec![make_test_batch(100, 0)].into_iter(),
        &schema,
        file.path(),
        Some(50),
        Compression::None,
    )
    .expect("save");

    let dataset = StreamingDataset::open(file.path()).expect("open");

    // Index 999 cannot exist for 100 rows written in 50-row chunks.
    assert!(dataset.get_chunk(999).is_err());
}
1006
1007    // ========== get_rows tests ==========
1008
/// A range that lies inside a single chunk comes back with exactly the
/// requested rows, in order.
#[test]
fn test_get_rows_within_chunk() {
    let schema = test_schema();
    let file = NamedTempFile::new().expect("temp");
    save_streaming(
        vec![make_test_batch(100, 0)].into_iter(),
        &schema,
        file.path(),
        Some(100),
        Compression::None,
    )
    .expect("save");

    let dataset = StreamingDataset::open(file.path()).expect("open");

    // Start at row 10 and take 20 rows.
    let slice = dataset.get_rows(10, 20).expect("get_rows");
    assert_eq!(slice.num_rows(), 20);

    let id_column = slice.column(0);
    let ids = id_column
        .as_any()
        .downcast_ref::<Int32Array>()
        .expect("downcast");

    // First and last positions of the slice carry ids 10 and 29.
    for (position, expected_id) in [(0, 10), (19, 29)] {
        assert_eq!(ids.value(position), expected_id);
    }
}
1037
/// A range that crosses the boundary between two 50-row chunks is stitched
/// into one contiguous result.
#[test]
fn test_get_rows_spanning_chunks() {
    let schema = test_schema();
    let input = vec![make_test_batch(50, 0), make_test_batch(50, 50)];

    let file = NamedTempFile::new().expect("temp");
    save_streaming(
        input.into_iter(),
        &schema,
        file.path(),
        Some(50),
        Compression::None,
    )
    .expect("save");

    let dataset = StreamingDataset::open(file.path()).expect("open");

    // Rows 40..60 straddle the chunk boundary at row 50.
    let slice = dataset.get_rows(40, 20).expect("get_rows");
    assert_eq!(slice.num_rows(), 20);

    let id_column = slice.column(0);
    let ids = id_column
        .as_any()
        .downcast_ref::<Int32Array>()
        .expect("downcast");

    // (position in result, expected id): the start of the range, the last row
    // of the first chunk, the first row of the second chunk, the end of the range.
    for (position, expected_id) in [(0, 40), (9, 49), (10, 50), (19, 59)] {
        assert_eq!(ids.value(position), expected_id);
    }
}
1070
/// A start row beyond the end of the dataset is reported as an error.
#[test]
fn test_get_rows_out_of_bounds() {
    let schema = test_schema();
    let file = NamedTempFile::new().expect("temp");
    save_streaming(
        vec![make_test_batch(100, 0)].into_iter(),
        &schema,
        file.path(),
        Some(50),
        Compression::None,
    )
    .expect("save");

    let dataset = StreamingDataset::open(file.path()).expect("open");

    // Only 100 rows were written, so starting at row 200 must fail.
    assert!(dataset.get_rows(200, 10).is_err());
}
1091
1092    // ========== chunks iterator tests ==========
1093
/// Iterating over every chunk visits all 250 rows that were written.
#[test]
fn test_chunks_iterator() {
    let schema = test_schema();
    let input = vec![
        make_test_batch(100, 0),
        make_test_batch(100, 100),
        make_test_batch(50, 200),
    ];

    let file = NamedTempFile::new().expect("temp");
    save_streaming(
        input.into_iter(),
        &schema,
        file.path(),
        Some(100),
        Compression::None,
    )
    .expect("save");

    let dataset = StreamingDataset::open(file.path()).expect("open");

    // Every chunk must load successfully; accumulate their row counts.
    let total_rows = dataset
        .chunks()
        .fold(0, |seen, chunk| seen + chunk.expect("chunk").num_rows());

    assert_eq!(total_rows, 250);
}
1123
/// The chunk iterator's `size_hint` is exact: both bounds equal the number
/// of chunks in the dataset.
#[test]
fn test_chunks_iterator_size_hint() {
    let schema = test_schema();
    let file = NamedTempFile::new().expect("temp");
    save_streaming(
        vec![make_test_batch(200, 0)].into_iter(),
        &schema,
        file.path(),
        Some(50),
        Compression::None,
    )
    .expect("save");

    let dataset = StreamingDataset::open(file.path()).expect("open");

    let hint = dataset.chunks().size_hint();
    let expected = dataset.num_chunks();

    assert_eq!(hint.0, expected);
    assert_eq!(hint.1, Some(expected));
}
1146
/// `ChunkIndex::iter` yields the entries in the order they were supplied.
#[test]
fn test_chunk_index_iter() {
    let index = ChunkIndex::from_entries(vec![
        ChunkEntry::new(0, 100, 0, 500, 800),
        ChunkEntry::new(100, 100, 500, 500, 800),
    ]);

    // Project each entry to its row offset; the resulting length and values
    // together pin both the entry count and the iteration order.
    let offsets: Vec<_> = index.iter().map(|entry| entry.row_offset).collect();

    assert_eq!(offsets.len(), 2);
    assert_eq!(offsets[0], 0);
    assert_eq!(offsets[1], 100);
}
1160
/// An index built from no entries is empty and covers zero rows.
#[test]
fn test_chunk_index_from_entries_empty() {
    let empty = ChunkIndex::from_entries(Vec::new());

    assert!(empty.is_empty());
    assert_eq!(empty.total_rows(), 0);
}
1167
/// The schema exposed by an opened dataset has the two test fields, `id`
/// followed by `value`.
#[test]
fn test_streaming_dataset_schema() {
    let schema = test_schema();
    let file = NamedTempFile::new().expect("temp");
    save_streaming(
        vec![make_test_batch(100, 0)].into_iter(),
        &schema,
        file.path(),
        Some(50),
        Compression::None,
    )
    .expect("save");

    let dataset = StreamingDataset::open(file.path()).expect("open");
    let ds_schema = dataset.schema();

    let expected_names = ["id", "value"];
    assert_eq!(ds_schema.fields().len(), 2);
    for (position, expected) in expected_names.iter().enumerate() {
        assert_eq!(ds_schema.field(position).name(), *expected);
    }
}
1189
/// The `Debug` rendering of a dataset names the type and its row/chunk
/// counters.
#[test]
fn test_streaming_dataset_debug() {
    let schema = test_schema();
    let file = NamedTempFile::new().expect("temp");
    save_streaming(
        vec![make_test_batch(100, 0)].into_iter(),
        &schema,
        file.path(),
        Some(50),
        Compression::None,
    )
    .expect("save");

    let dataset = StreamingDataset::open(file.path()).expect("open");
    let rendered = format!("{dataset:?}");

    for needle in ["StreamingDataset", "num_rows", "num_chunks"] {
        assert!(rendered.contains(needle));
    }
}
1211
/// A file written with `Compression::Lz4` can be opened and reports every
/// row that was saved.
#[test]
fn test_save_streaming_with_lz4_compression() {
    let schema = test_schema();
    let file = NamedTempFile::new().expect("temp");

    // 1000 rows written in 500-row chunks.
    save_streaming(
        vec![make_test_batch(1000, 0)].into_iter(),
        &schema,
        file.path(),
        Some(500),
        Compression::Lz4,
    )
    .expect("save");

    let reopened = StreamingDataset::open(file.path()).expect("open");
    assert_eq!(reopened.num_rows(), 1000);
}
1230
/// A file written with `Compression::ZstdL19` can be opened and reports
/// every row that was saved.
#[test]
fn test_save_streaming_with_zstd_l19_compression() {
    let schema = test_schema();
    let file = NamedTempFile::new().expect("temp");

    // 500 rows written in 250-row chunks.
    save_streaming(
        vec![make_test_batch(500, 0)].into_iter(),
        &schema,
        file.path(),
        Some(250),
        Compression::ZstdL19,
    )
    .expect("save");

    let reopened = StreamingDataset::open(file.path()).expect("open");
    assert_eq!(reopened.num_rows(), 500);
}
1249
/// Passing `None` as the chunk size still produces a file that reports all
/// saved rows when reopened.
#[test]
fn test_save_streaming_default_chunk_size() {
    let schema = test_schema();
    let file = NamedTempFile::new().expect("temp");

    // `None` leaves the choice of chunk size to `save_streaming`.
    let chunk_size = None;
    save_streaming(
        vec![make_test_batch(100, 0)].into_iter(),
        &schema,
        file.path(),
        chunk_size,
        Compression::None,
    )
    .expect("save");

    let reopened = StreamingDataset::open(file.path()).expect("open");
    assert_eq!(reopened.num_rows(), 100);
}
1268
/// A request that runs past the last row is truncated to the rows that
/// actually exist instead of failing.
#[test]
fn test_get_rows_clamps_to_end() {
    let schema = test_schema();
    let file = NamedTempFile::new().expect("temp");
    save_streaming(
        vec![make_test_batch(100, 0)].into_iter(),
        &schema,
        file.path(),
        Some(50),
        Compression::None,
    )
    .expect("save");

    let dataset = StreamingDataset::open(file.path()).expect("open");

    // 50 rows are requested from row 90, but only rows 90..100 exist.
    let tail = dataset.get_rows(90, 50).expect("get_rows");
    assert_eq!(tail.num_rows(), 10);
}
1289
/// A cloned `ChunkEntry` compares equal to the entry it was cloned from.
#[test]
fn test_chunk_entry_clone_and_eq() {
    let original = ChunkEntry::new(0, 100, 0, 500, 800);
    let duplicate = Clone::clone(&original);

    assert_eq!(original, duplicate);
}
1296
/// Cloning a `ChunkIndex` preserves its entry count and total row count.
#[test]
fn test_chunk_index_clone() {
    let index = ChunkIndex::from_entries(vec![
        ChunkEntry::new(0, 100, 0, 500, 800),
        ChunkEntry::new(100, 100, 500, 500, 800),
    ]);
    let duplicate = index.clone();

    assert_eq!(duplicate.len(), index.len());
    assert_eq!(duplicate.total_rows(), index.total_rows());
}
1309
/// The `Debug` rendering of a `ChunkEntry` names the type and its
/// `row_offset` field.
#[test]
fn test_chunk_entry_debug() {
    let rendered = format!("{:?}", ChunkEntry::new(0, 100, 0, 500, 800));

    for needle in ["ChunkEntry", "row_offset"] {
        assert!(rendered.contains(needle));
    }
}
1317
/// The `Debug` rendering of a `ChunkIndex` names the type.
#[test]
fn test_chunk_index_debug() {
    let rendered = format!("{:?}", ChunkIndex::new());

    assert!(rendered.contains("ChunkIndex"));
}
1324
/// Pins the public tuning constants so an accidental change is caught.
#[test]
fn test_constants() {
    // 64K rows per chunk.
    let expected_chunk_rows: usize = 65536;
    // 100 MiB expressed in bytes.
    let expected_threshold: u64 = 100 * 1024 * 1024;

    assert_eq!(DEFAULT_CHUNK_SIZE, expected_chunk_rows);
    assert_eq!(STREAMING_THRESHOLD, expected_threshold);
}
1330
/// The chunk iterator's `len()` matches the dataset's chunk count.
#[test]
fn test_chunks_iterator_exact_size() {
    let schema = test_schema();
    let file = NamedTempFile::new().expect("temp");
    save_streaming(
        vec![make_test_batch(200, 0)].into_iter(),
        &schema,
        file.path(),
        Some(50),
        Compression::None,
    )
    .expect("save");

    let dataset = StreamingDataset::open(file.path()).expect("open");

    // `len()` comes from `ExactSizeIterator`.
    let remaining = dataset.chunks().len();
    assert_eq!(remaining, dataset.num_chunks());
}
1352}