1#![allow(clippy::cast_possible_truncation)]
23
24use std::{
25 fs::File,
26 io::{BufReader, Read, Seek, SeekFrom},
27 path::{Path, PathBuf},
28 sync::Arc,
29};
30
31use arrow::{array::RecordBatch, datatypes::SchemaRef};
32use serde::{Deserialize, Serialize};
33
34use crate::{
35 error::{Error, Result},
36 format::{flags, Compression, HEADER_SIZE, MAGIC},
37};
38
/// Default number of rows per chunk when the caller does not specify one.
pub const DEFAULT_CHUNK_SIZE: usize = 65536;
/// 100 MiB — presumably the file-size cutoff above which callers prefer the
/// streaming layout over a full in-memory load. NOTE(review): not referenced
/// in this file; confirm semantics at the call sites.
pub const STREAMING_THRESHOLD: u64 = 100 * 1024 * 1024;
44
/// Index record describing one compressed chunk: where its rows sit in the
/// dataset and where its bytes sit in the payload section.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ChunkEntry {
    /// First dataset-global row covered by this chunk.
    pub row_offset: u64,
    /// Number of rows stored in this chunk.
    pub num_rows: u32,
    /// Byte offset of the chunk, relative to the start of the payload section.
    pub byte_offset: u64,
    /// On-disk (compressed) size of the chunk in bytes.
    pub compressed_size: u32,
    /// Size of the chunk after decompression, in bytes.
    pub uncompressed_size: u32,
}
59
60impl ChunkEntry {
61 pub fn new(
63 row_offset: u64,
64 num_rows: u32,
65 byte_offset: u64,
66 compressed_size: u32,
67 uncompressed_size: u32,
68 ) -> Self {
69 Self {
70 row_offset,
71 num_rows,
72 byte_offset,
73 compressed_size,
74 uncompressed_size,
75 }
76 }
77
78 pub fn contains_row(&self, row: u64) -> bool {
80 row >= self.row_offset && row < self.row_offset + u64::from(self.num_rows)
81 }
82
83 pub fn end_row(&self) -> u64 {
85 self.row_offset + u64::from(self.num_rows)
86 }
87}
88
/// Ordered collection of [`ChunkEntry`] records for one file, plus the total
/// row count they cover.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ChunkIndex {
    // Entries sorted by row_offset; assumed contiguous by the search code.
    entries: Vec<ChunkEntry>,
    // Cached end_row() of the last entry (0 when empty).
    total_rows: u64,
}
97
98impl ChunkIndex {
99 pub fn new() -> Self {
101 Self::default()
102 }
103
104 pub fn from_entries(entries: Vec<ChunkEntry>) -> Self {
106 let total_rows = entries.last().map_or(0, ChunkEntry::end_row);
107 Self {
108 entries,
109 total_rows,
110 }
111 }
112
113 pub fn push(&mut self, entry: ChunkEntry) {
115 self.total_rows = entry.end_row();
116 self.entries.push(entry);
117 }
118
119 pub fn len(&self) -> usize {
121 self.entries.len()
122 }
123
124 pub fn is_empty(&self) -> bool {
126 self.entries.is_empty()
127 }
128
129 pub fn total_rows(&self) -> u64 {
131 self.total_rows
132 }
133
134 pub fn get(&self, index: usize) -> Option<&ChunkEntry> {
136 self.entries.get(index)
137 }
138
139 pub fn find_chunk_for_row(&self, row: u64) -> Option<usize> {
141 if row >= self.total_rows {
142 return None;
143 }
144 self.entries
146 .binary_search_by(|entry| {
147 if row < entry.row_offset {
148 std::cmp::Ordering::Greater
149 } else if row >= entry.end_row() {
150 std::cmp::Ordering::Less
151 } else {
152 std::cmp::Ordering::Equal
153 }
154 })
155 .ok()
156 }
157
158 pub fn iter(&self) -> impl Iterator<Item = &ChunkEntry> {
160 self.entries.iter()
161 }
162
163 pub fn to_bytes(&self) -> Result<Vec<u8>> {
165 rmp_serde::to_vec(self)
166 .map_err(|e| Error::Format(format!("Failed to serialize chunk index: {e}")))
167 }
168
169 pub fn from_bytes(bytes: &[u8]) -> Result<Self> {
171 rmp_serde::from_slice(bytes)
172 .map_err(|e| Error::Format(format!("Failed to deserialize chunk index: {e}")))
173 }
174}
175
/// Random-access reader for a chunked streaming file: the header, schema, and
/// chunk index are held in memory; chunk payloads are read from disk on demand.
pub struct StreamingDataset {
    // Path of the backing file; reopened for every chunk read.
    path: PathBuf,
    // In-memory chunk index (row ranges -> byte ranges).
    index: ChunkIndex,
    // Arrow schema shared by all chunks.
    schema: SchemaRef,
    // Compression applied to every chunk payload.
    compression: Compression,
    // Absolute file offset where the chunk payload section begins.
    payload_offset: u64,
}
191
192impl StreamingDataset {
193 pub fn open(path: impl AsRef<Path>) -> Result<Self> {
203 let path = path.as_ref();
204 let file = File::open(path).map_err(|e| Error::io(e, path))?;
205 let mut reader = BufReader::new(file);
206
207 let mut header = [0u8; HEADER_SIZE];
209 reader
210 .read_exact(&mut header)
211 .map_err(|e| Error::io(e, path))?;
212
213 if header[0..4] != MAGIC {
215 return Err(Error::Format("Invalid magic bytes".into()));
216 }
217
218 let header_flags = header[6];
220 if header_flags & flags::STREAMING == 0 {
221 return Err(Error::Format(
222 "File does not have STREAMING flag set. Use regular load() instead.".into(),
223 ));
224 }
225
226 let compression = Compression::from_u8(header[7])
228 .ok_or_else(|| Error::Format(format!("Unknown compression type: {}", header[7])))?;
229
230 let metadata_size = u64::from(u32::from_le_bytes([
232 header[12], header[13], header[14], header[15],
233 ]));
234 let schema_size = u64::from(u32::from_le_bytes([
235 header[16], header[17], header[18], header[19],
236 ]));
237 let index_size = u64::from(u32::from_le_bytes([
238 header[20], header[21], header[22], header[23],
239 ]));
240
241 let schema_offset = u64::from(HEADER_SIZE as u32) + metadata_size;
243 let index_offset = schema_offset + schema_size;
244 let payload_offset = index_offset + index_size;
245
246 reader
248 .seek(SeekFrom::Start(schema_offset))
249 .map_err(|e| Error::io(e, path))?;
250 let mut schema_bytes = vec![0u8; schema_size as usize];
251 reader
252 .read_exact(&mut schema_bytes)
253 .map_err(|e| Error::io(e, path))?;
254 let schema = Self::deserialize_schema(&schema_bytes)?;
255
256 let mut index_bytes = vec![0u8; index_size as usize];
258 reader
259 .read_exact(&mut index_bytes)
260 .map_err(|e| Error::io(e, path))?;
261 let index = ChunkIndex::from_bytes(&index_bytes)?;
262
263 Ok(Self {
264 path: path.to_path_buf(),
265 index,
266 schema,
267 compression,
268 payload_offset,
269 })
270 }
271
272 pub fn num_rows(&self) -> u64 {
274 self.index.total_rows()
275 }
276
277 pub fn num_chunks(&self) -> usize {
279 self.index.len()
280 }
281
282 pub fn schema(&self) -> SchemaRef {
284 Arc::clone(&self.schema)
285 }
286
287 pub fn get_chunk(&self, chunk_idx: usize) -> Result<RecordBatch> {
293 let entry = self
294 .index
295 .get(chunk_idx)
296 .ok_or_else(|| Error::IndexOutOfBounds {
297 index: chunk_idx,
298 len: self.index.len(),
299 })?;
300
301 self.load_chunk(entry)
302 }
303
304 pub fn get_rows(&self, start: u64, count: u64) -> Result<RecordBatch> {
310 if start >= self.num_rows() {
311 return Err(Error::IndexOutOfBounds {
312 index: start as usize,
313 len: self.num_rows() as usize,
314 });
315 }
316
317 let end = (start + count).min(self.num_rows());
318 let actual_count = end - start;
319
320 let start_chunk =
322 self.index
323 .find_chunk_for_row(start)
324 .ok_or_else(|| Error::IndexOutOfBounds {
325 index: start as usize,
326 len: self.num_rows() as usize,
327 })?;
328
329 let end_chunk = self
330 .index
331 .find_chunk_for_row(end.saturating_sub(1))
332 .unwrap_or(start_chunk);
333
334 let mut batches = Vec::new();
336 let mut remaining = actual_count;
337 let mut current_row = start;
338
339 for chunk_idx in start_chunk..=end_chunk {
340 let entry = self
341 .index
342 .get(chunk_idx)
343 .ok_or_else(|| Error::IndexOutOfBounds {
344 index: chunk_idx,
345 len: self.index.len(),
346 })?;
347
348 let batch = self.load_chunk(entry)?;
349
350 let chunk_start = if current_row > entry.row_offset {
352 (current_row - entry.row_offset) as usize
353 } else {
354 0
355 };
356
357 let chunk_take = remaining.min(u64::from(entry.num_rows) - chunk_start as u64) as usize;
358
359 let sliced = batch.slice(chunk_start, chunk_take);
360 batches.push(sliced);
361
362 remaining -= chunk_take as u64;
363 current_row += chunk_take as u64;
364 }
365
366 if batches.len() == 1 {
368 Ok(batches
369 .into_iter()
370 .next()
371 .ok_or_else(|| Error::Format("No batches loaded".into()))?)
372 } else {
373 use arrow::compute::concat_batches;
374 concat_batches(&self.schema, &batches).map_err(Error::Arrow)
375 }
376 }
377
378 pub fn chunks(&self) -> ChunkIterator<'_> {
380 ChunkIterator {
381 dataset: self,
382 current: 0,
383 }
384 }
385
386 fn load_chunk(&self, entry: &ChunkEntry) -> Result<RecordBatch> {
388 let file = File::open(&self.path).map_err(|e| Error::io(e, &self.path))?;
389 let mut reader = BufReader::new(file);
390
391 let offset = self.payload_offset + entry.byte_offset;
392 reader
393 .seek(SeekFrom::Start(offset))
394 .map_err(|e| Error::io(e, &self.path))?;
395
396 let mut compressed_data = vec![0u8; entry.compressed_size as usize];
397 reader
398 .read_exact(&mut compressed_data)
399 .map_err(|e| Error::io(e, &self.path))?;
400
401 let decompressed = self.decompress(&compressed_data, entry.uncompressed_size as usize)?;
403
404 Self::deserialize_batch(&decompressed)
406 }
407
408 fn decompress(&self, data: &[u8], expected_size: usize) -> Result<Vec<u8>> {
410 match self.compression {
411 Compression::None => Ok(data.to_vec()),
412 Compression::ZstdL3 | Compression::ZstdL19 => {
413 let mut output = Vec::with_capacity(expected_size);
414 zstd::stream::copy_decode(data, &mut output)
415 .map_err(|e| Error::Format(format!("Zstd decompression failed: {e}")))?;
416 Ok(output)
417 }
418 Compression::Lz4 => lz4_flex::decompress(data, expected_size)
419 .map_err(|e| Error::Format(format!("LZ4 decompression failed: {e}"))),
420 }
421 }
422
423 fn deserialize_schema(bytes: &[u8]) -> Result<SchemaRef> {
425 use std::io::Cursor;
426
427 use arrow::ipc::reader::StreamReader;
428
429 let cursor = Cursor::new(bytes);
430 let reader = StreamReader::try_new(cursor, None).map_err(Error::Arrow)?;
431 Ok(reader.schema())
432 }
433
434 fn deserialize_batch(bytes: &[u8]) -> Result<RecordBatch> {
436 use std::io::Cursor;
437
438 use arrow::ipc::reader::StreamReader;
439
440 let cursor = Cursor::new(bytes);
441 let mut reader = StreamReader::try_new(cursor, None).map_err(Error::Arrow)?;
442
443 reader
444 .next()
445 .ok_or_else(|| Error::Format("No batch in IPC data".into()))?
446 .map_err(Error::Arrow)
447 }
448}
449
450impl std::fmt::Debug for StreamingDataset {
451 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
452 f.debug_struct("StreamingDataset")
453 .field("path", &self.path)
454 .field("num_rows", &self.num_rows())
455 .field("num_chunks", &self.num_chunks())
456 .field("compression", &self.compression)
457 .finish_non_exhaustive()
458 }
459}
460
/// Iterator over a dataset's chunks; each `next()` performs a disk read.
pub struct ChunkIterator<'a> {
    // Dataset being iterated.
    dataset: &'a StreamingDataset,
    // Index of the next chunk to load.
    current: usize,
}
466
467impl Iterator for ChunkIterator<'_> {
468 type Item = Result<RecordBatch>;
469
470 fn next(&mut self) -> Option<Self::Item> {
471 if self.current >= self.dataset.num_chunks() {
472 return None;
473 }
474
475 let result = self.dataset.get_chunk(self.current);
476 self.current += 1;
477 Some(result)
478 }
479
480 fn size_hint(&self) -> (usize, Option<usize>) {
481 let remaining = self.dataset.num_chunks() - self.current;
482 (remaining, Some(remaining))
483 }
484}
485
// Sound because `size_hint` returns the exact remaining chunk count.
impl ExactSizeIterator for ChunkIterator<'_> {}
487
/// Writes `batches` to `path` in the chunked streaming layout.
///
/// Rows are re-chunked to `chunk_size` rows per chunk (default
/// [`DEFAULT_CHUNK_SIZE`]), each chunk is compressed independently with
/// `compression`, and a chunk index is stored before the payload so readers
/// can seek to individual chunks.
///
/// # Errors
///
/// Returns an error when serialization, compression, or any file write fails.
pub fn save_streaming<P, I>(
    batches: I,
    schema: &SchemaRef,
    path: P,
    chunk_size: Option<usize>,
    compression: Compression,
) -> Result<()>
where
    P: AsRef<Path>,
    I: Iterator<Item = RecordBatch>,
{
    use std::io::{BufWriter, Write};

    let path = path.as_ref();
    let chunk_size = chunk_size.unwrap_or(DEFAULT_CHUNK_SIZE);

    let file = File::create(path).map_err(|e| Error::io(e, path))?;
    let mut writer = BufWriter::new(file);

    // Chunking/compression happens up front because the header needs the
    // index and payload sizes before the first payload byte is written.
    let chunks = collect_chunks(batches, schema, chunk_size, compression)?;

    let index = ChunkIndex::from_entries(chunks.iter().map(|(e, _)| e.clone()).collect());
    let index_bytes = index.to_bytes()?;

    let schema_bytes = serialize_schema(schema)?;

    let metadata = crate::format::Metadata::default();
    let metadata_bytes = rmp_serde::to_vec(&metadata)
        .map_err(|e| Error::Format(format!("Failed to serialize metadata: {e}")))?;

    // Fixed-size header layout (matches `StreamingDataset::open`):
    //   [0..4]   magic            [4..6]   format version (major, minor)
    //   [6]      flags            [7]      compression id
    //   [8..10]  written as 1u16  -- NOTE(review): meaning not visible in
    //            this file; confirm against the format module
    //   [10..12] reserved (zero)  [12..16] metadata size (LE u32)
    //   [16..20] schema size      [20..24] index size
    //   [24..32] payload size (LE u64)
    let mut header = [0u8; HEADER_SIZE];
    header[0..4].copy_from_slice(&MAGIC);
    header[4] = crate::format::FORMAT_VERSION_MAJOR;
    header[5] = crate::format::FORMAT_VERSION_MINOR;
    header[6] = flags::STREAMING;
    header[7] = compression.as_u8();
    header[8..10].copy_from_slice(&1u16.to_le_bytes());
    header[10..12].copy_from_slice(&[0, 0]);
    header[12..16].copy_from_slice(&(metadata_bytes.len() as u32).to_le_bytes());
    header[16..20].copy_from_slice(&(schema_bytes.len() as u32).to_le_bytes());
    header[20..24].copy_from_slice(&(index_bytes.len() as u32).to_le_bytes());
    let payload_size: u64 = chunks
        .iter()
        .map(|(e, _)| u64::from(e.compressed_size))
        .sum();
    header[24..32].copy_from_slice(&payload_size.to_le_bytes());

    // Sections written in the same order the reader expects:
    // header, metadata, schema, index, then chunk payloads.
    writer.write_all(&header).map_err(|e| Error::io(e, path))?;
    writer
        .write_all(&metadata_bytes)
        .map_err(|e| Error::io(e, path))?;
    writer
        .write_all(&schema_bytes)
        .map_err(|e| Error::io(e, path))?;
    writer
        .write_all(&index_bytes)
        .map_err(|e| Error::io(e, path))?;

    for (_, data) in &chunks {
        writer.write_all(data).map_err(|e| Error::io(e, path))?;
    }

    // BufWriter's Drop swallows flush errors; flush explicitly.
    writer.flush().map_err(|e| Error::io(e, path))?;

    Ok(())
}
575
576fn collect_chunks<I>(
578 batches: I,
579 schema: &SchemaRef,
580 chunk_size: usize,
581 compression: Compression,
582) -> Result<Vec<(ChunkEntry, Vec<u8>)>>
583where
584 I: Iterator<Item = RecordBatch>,
585{
586 let mut chunks: Vec<(ChunkEntry, Vec<u8>)> = Vec::new();
587 let mut current_rows: Vec<RecordBatch> = Vec::new();
588 let mut current_row_count = 0usize;
589 let mut total_row_offset = 0u64;
590 let mut byte_offset = 0u64;
591
592 for batch in batches {
593 current_rows.push(batch.clone());
594 current_row_count += batch.num_rows();
595
596 while current_row_count >= chunk_size {
597 let (chunk_batch, remaining) = split_batches(¤t_rows, chunk_size, schema)?;
598 let (entry, data) =
599 build_chunk(&chunk_batch, total_row_offset, byte_offset, compression)?;
600
601 total_row_offset += u64::from(entry.num_rows);
602 byte_offset += u64::from(entry.compressed_size);
603
604 chunks.push((entry, data));
605
606 current_rows = remaining;
607 current_row_count = current_rows.iter().map(RecordBatch::num_rows).sum();
608 }
609 }
610
611 if !current_rows.is_empty() {
612 let chunk_batch = concat_batches_vec(¤t_rows, schema)?;
613 let (entry, data) = build_chunk(&chunk_batch, total_row_offset, byte_offset, compression)?;
614 chunks.push((entry, data));
615 }
616
617 Ok(chunks)
618}
619
620fn build_chunk(
622 batch: &RecordBatch,
623 row_offset: u64,
624 byte_offset: u64,
625 compression: Compression,
626) -> Result<(ChunkEntry, Vec<u8>)> {
627 let uncompressed = serialize_batch(batch)?;
629 let uncompressed_size = uncompressed.len();
630
631 let compressed = match compression {
633 Compression::None => uncompressed,
634 Compression::ZstdL3 => zstd::encode_all(uncompressed.as_slice(), 3)
635 .map_err(|e| Error::Format(format!("Zstd compression failed: {e}")))?,
636 Compression::ZstdL19 => zstd::encode_all(uncompressed.as_slice(), 19)
637 .map_err(|e| Error::Format(format!("Zstd compression failed: {e}")))?,
638 Compression::Lz4 => lz4_flex::compress_prepend_size(&uncompressed),
639 };
640
641 let entry = ChunkEntry::new(
642 row_offset,
643 batch.num_rows() as u32,
644 byte_offset,
645 compressed.len() as u32,
646 uncompressed_size as u32,
647 );
648
649 Ok((entry, compressed))
650}
651
652fn serialize_schema(schema: &SchemaRef) -> Result<Vec<u8>> {
654 use arrow::ipc::writer::StreamWriter;
655
656 let mut buf = Vec::new();
657 {
658 let mut writer = StreamWriter::try_new(&mut buf, schema).map_err(Error::Arrow)?;
659 writer.finish().map_err(Error::Arrow)?;
660 }
661 Ok(buf)
662}
663
664fn serialize_batch(batch: &RecordBatch) -> Result<Vec<u8>> {
666 use arrow::ipc::writer::StreamWriter;
667
668 let mut buf = Vec::new();
669 {
670 let mut writer = StreamWriter::try_new(&mut buf, &batch.schema()).map_err(Error::Arrow)?;
671 writer.write(batch).map_err(Error::Arrow)?;
672 writer.finish().map_err(Error::Arrow)?;
673 }
674 Ok(buf)
675}
676
677fn split_batches(
679 batches: &[RecordBatch],
680 chunk_size: usize,
681 schema: &SchemaRef,
682) -> Result<(RecordBatch, Vec<RecordBatch>)> {
683 use arrow::compute::concat_batches;
684
685 let combined = concat_batches(schema, batches).map_err(Error::Arrow)?;
687
688 if combined.num_rows() <= chunk_size {
689 return Ok((combined, Vec::new()));
690 }
691
692 let chunk = combined.slice(0, chunk_size);
694 let remaining = combined.slice(chunk_size, combined.num_rows() - chunk_size);
695
696 Ok((chunk, vec![remaining]))
697}
698
699fn concat_batches_vec(batches: &[RecordBatch], schema: &SchemaRef) -> Result<RecordBatch> {
701 use arrow::compute::concat_batches;
702 concat_batches(schema, batches).map_err(Error::Arrow)
703}
704
705#[cfg(test)]
706mod tests {
707 use std::sync::Arc;
708
709 use arrow::{
710 array::{Float64Array, Int32Array},
711 datatypes::{DataType, Field, Schema},
712 };
713 use tempfile::NamedTempFile;
714
715 use super::*;
716
    /// Builds a two-column (`id: Int32`, `value: Float64`) batch of `n` rows
    /// whose ids start at `offset`, so row identity survives re-chunking.
    fn make_test_batch(n: usize, offset: usize) -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("value", DataType::Float64, false),
        ]));

        let ids: Vec<i32> = (offset..offset + n).map(|i| i as i32).collect();
        let values: Vec<f64> = (offset..offset + n).map(|i| i as f64 * 1.5).collect();

        RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Int32Array::from(ids)),
                Arc::new(Float64Array::from(values)),
            ],
        )
        .expect("batch creation")
    }
736
    /// Schema matching `make_test_batch`; must stay in sync with it.
    fn test_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("value", DataType::Float64, false),
        ]))
    }
743
744 #[test]
747 fn test_chunk_entry_new() {
748 let entry = ChunkEntry::new(100, 50, 1000, 500, 800);
749
750 assert_eq!(entry.row_offset, 100);
751 assert_eq!(entry.num_rows, 50);
752 assert_eq!(entry.byte_offset, 1000);
753 assert_eq!(entry.compressed_size, 500);
754 assert_eq!(entry.uncompressed_size, 800);
755 }
756
757 #[test]
758 fn test_chunk_entry_contains_row() {
759 let entry = ChunkEntry::new(100, 50, 0, 0, 0);
760
761 assert!(!entry.contains_row(99));
762 assert!(entry.contains_row(100));
763 assert!(entry.contains_row(125));
764 assert!(entry.contains_row(149));
765 assert!(!entry.contains_row(150));
766 }
767
768 #[test]
769 fn test_chunk_entry_end_row() {
770 let entry = ChunkEntry::new(100, 50, 0, 0, 0);
771 assert_eq!(entry.end_row(), 150);
772 }
773
774 #[test]
777 fn test_chunk_index_new_empty() {
778 let index = ChunkIndex::new();
779
780 assert!(index.is_empty());
781 assert_eq!(index.len(), 0);
782 assert_eq!(index.total_rows(), 0);
783 }
784
785 #[test]
786 fn test_chunk_index_push() {
787 let mut index = ChunkIndex::new();
788
789 index.push(ChunkEntry::new(0, 100, 0, 500, 800));
790 assert_eq!(index.len(), 1);
791 assert_eq!(index.total_rows(), 100);
792
793 index.push(ChunkEntry::new(100, 100, 500, 500, 800));
794 assert_eq!(index.len(), 2);
795 assert_eq!(index.total_rows(), 200);
796 }
797
798 #[test]
799 fn test_chunk_index_from_entries() {
800 let entries = vec![
801 ChunkEntry::new(0, 100, 0, 500, 800),
802 ChunkEntry::new(100, 100, 500, 500, 800),
803 ChunkEntry::new(200, 50, 1000, 250, 400),
804 ];
805
806 let index = ChunkIndex::from_entries(entries);
807
808 assert_eq!(index.len(), 3);
809 assert_eq!(index.total_rows(), 250);
810 }
811
812 #[test]
813 fn test_chunk_index_get() {
814 let entries = vec![
815 ChunkEntry::new(0, 100, 0, 500, 800),
816 ChunkEntry::new(100, 100, 500, 500, 800),
817 ];
818 let index = ChunkIndex::from_entries(entries);
819
820 assert!(index.get(0).is_some());
821 assert!(index.get(1).is_some());
822 assert!(index.get(2).is_none());
823
824 assert_eq!(index.get(0).map(|e| e.row_offset), Some(0));
825 assert_eq!(index.get(1).map(|e| e.row_offset), Some(100));
826 }
827
828 #[test]
829 fn test_chunk_index_find_chunk_for_row() {
830 let entries = vec![
831 ChunkEntry::new(0, 100, 0, 500, 800),
832 ChunkEntry::new(100, 100, 500, 500, 800),
833 ChunkEntry::new(200, 50, 1000, 250, 400),
834 ];
835 let index = ChunkIndex::from_entries(entries);
836
837 assert_eq!(index.find_chunk_for_row(0), Some(0));
838 assert_eq!(index.find_chunk_for_row(50), Some(0));
839 assert_eq!(index.find_chunk_for_row(99), Some(0));
840 assert_eq!(index.find_chunk_for_row(100), Some(1));
841 assert_eq!(index.find_chunk_for_row(150), Some(1));
842 assert_eq!(index.find_chunk_for_row(200), Some(2));
843 assert_eq!(index.find_chunk_for_row(249), Some(2));
844 assert_eq!(index.find_chunk_for_row(250), None);
845 assert_eq!(index.find_chunk_for_row(1000), None);
846 }
847
848 #[test]
849 fn test_chunk_index_serialization() {
850 let entries = vec![
851 ChunkEntry::new(0, 100, 0, 500, 800),
852 ChunkEntry::new(100, 100, 500, 500, 800),
853 ];
854 let index = ChunkIndex::from_entries(entries);
855
856 let bytes = index.to_bytes().expect("serialize");
857 let restored = ChunkIndex::from_bytes(&bytes).expect("deserialize");
858
859 assert_eq!(restored.len(), index.len());
860 assert_eq!(restored.total_rows(), index.total_rows());
861 assert_eq!(restored.get(0), index.get(0));
862 assert_eq!(restored.get(1), index.get(1));
863 }
864
865 #[test]
868 fn test_save_streaming_creates_file() {
869 let batches = vec![make_test_batch(100, 0), make_test_batch(100, 100)];
870 let schema = test_schema();
871
872 let temp = NamedTempFile::new().expect("temp file");
873 let path = temp.path();
874
875 save_streaming(
876 batches.into_iter(),
877 &schema,
878 path,
879 Some(64),
880 Compression::None,
881 )
882 .expect("save");
883
884 assert!(path.exists());
885 assert!(std::fs::metadata(path).expect("metadata").len() > 0);
886 }
887
888 #[test]
889 fn test_save_streaming_with_compression() {
890 let batches = vec![make_test_batch(1000, 0)];
891 let schema = test_schema();
892
893 let temp_none = NamedTempFile::new().expect("temp");
894 let temp_zstd = NamedTempFile::new().expect("temp");
895
896 save_streaming(
897 batches.clone().into_iter(),
898 &schema,
899 temp_none.path(),
900 Some(500),
901 Compression::None,
902 )
903 .expect("save none");
904
905 save_streaming(
906 batches.into_iter(),
907 &schema,
908 temp_zstd.path(),
909 Some(500),
910 Compression::ZstdL3,
911 )
912 .expect("save zstd");
913
914 let size_none = std::fs::metadata(temp_none.path()).expect("meta").len();
915 let size_zstd = std::fs::metadata(temp_zstd.path()).expect("meta").len();
916
917 assert!(
919 size_zstd < size_none,
920 "Zstd should compress: {size_zstd} >= {size_none}"
921 );
922 }
923
924 #[test]
927 fn test_streaming_dataset_open() {
928 let batches = vec![make_test_batch(100, 0), make_test_batch(100, 100)];
929 let schema = test_schema();
930
931 let temp = NamedTempFile::new().expect("temp");
932 save_streaming(
933 batches.into_iter(),
934 &schema,
935 temp.path(),
936 Some(64),
937 Compression::ZstdL3,
938 )
939 .expect("save");
940
941 let dataset = StreamingDataset::open(temp.path()).expect("open");
942
943 assert_eq!(dataset.num_rows(), 200);
944 assert!(dataset.num_chunks() > 0);
945 }
946
947 #[test]
948 fn test_streaming_dataset_rejects_non_streaming_file() {
949 let result = StreamingDataset::open("/nonexistent/path.ald");
951 assert!(result.is_err());
952 }
953
954 #[test]
957 fn test_get_chunk_returns_correct_data() {
958 let batch1 = make_test_batch(100, 0);
959 let batch2 = make_test_batch(100, 100);
960 let schema = test_schema();
961
962 let temp = NamedTempFile::new().expect("temp");
963 save_streaming(
964 vec![batch1, batch2].into_iter(),
965 &schema,
966 temp.path(),
967 Some(100),
968 Compression::None,
969 )
970 .expect("save");
971
972 let dataset = StreamingDataset::open(temp.path()).expect("open");
973
974 let chunk0 = dataset.get_chunk(0).expect("chunk 0");
975 assert_eq!(chunk0.num_rows(), 100);
976
977 let ids = chunk0
979 .column(0)
980 .as_any()
981 .downcast_ref::<Int32Array>()
982 .expect("downcast");
983 assert_eq!(ids.value(0), 0);
984 }
985
986 #[test]
987 fn test_get_chunk_out_of_bounds() {
988 let batches = vec![make_test_batch(100, 0)];
989 let schema = test_schema();
990
991 let temp = NamedTempFile::new().expect("temp");
992 save_streaming(
993 batches.into_iter(),
994 &schema,
995 temp.path(),
996 Some(50),
997 Compression::None,
998 )
999 .expect("save");
1000
1001 let dataset = StreamingDataset::open(temp.path()).expect("open");
1002 let result = dataset.get_chunk(999);
1003
1004 assert!(result.is_err());
1005 }
1006
1007 #[test]
1010 fn test_get_rows_within_chunk() {
1011 let batches = vec![make_test_batch(100, 0)];
1012 let schema = test_schema();
1013
1014 let temp = NamedTempFile::new().expect("temp");
1015 save_streaming(
1016 batches.into_iter(),
1017 &schema,
1018 temp.path(),
1019 Some(100),
1020 Compression::None,
1021 )
1022 .expect("save");
1023
1024 let dataset = StreamingDataset::open(temp.path()).expect("open");
1025 let rows = dataset.get_rows(10, 20).expect("get_rows");
1026
1027 assert_eq!(rows.num_rows(), 20);
1028
1029 let ids = rows
1030 .column(0)
1031 .as_any()
1032 .downcast_ref::<Int32Array>()
1033 .expect("downcast");
1034 assert_eq!(ids.value(0), 10);
1035 assert_eq!(ids.value(19), 29);
1036 }
1037
1038 #[test]
1039 fn test_get_rows_spanning_chunks() {
1040 let batches = vec![make_test_batch(50, 0), make_test_batch(50, 50)];
1041 let schema = test_schema();
1042
1043 let temp = NamedTempFile::new().expect("temp");
1044 save_streaming(
1045 batches.into_iter(),
1046 &schema,
1047 temp.path(),
1048 Some(50),
1049 Compression::None,
1050 )
1051 .expect("save");
1052
1053 let dataset = StreamingDataset::open(temp.path()).expect("open");
1054
1055 let rows = dataset.get_rows(40, 20).expect("get_rows");
1057
1058 assert_eq!(rows.num_rows(), 20);
1059
1060 let ids = rows
1061 .column(0)
1062 .as_any()
1063 .downcast_ref::<Int32Array>()
1064 .expect("downcast");
1065 assert_eq!(ids.value(0), 40);
1066 assert_eq!(ids.value(9), 49); assert_eq!(ids.value(10), 50); assert_eq!(ids.value(19), 59);
1069 }
1070
1071 #[test]
1072 fn test_get_rows_out_of_bounds() {
1073 let batches = vec![make_test_batch(100, 0)];
1074 let schema = test_schema();
1075
1076 let temp = NamedTempFile::new().expect("temp");
1077 save_streaming(
1078 batches.into_iter(),
1079 &schema,
1080 temp.path(),
1081 Some(50),
1082 Compression::None,
1083 )
1084 .expect("save");
1085
1086 let dataset = StreamingDataset::open(temp.path()).expect("open");
1087
1088 let result = dataset.get_rows(200, 10);
1089 assert!(result.is_err());
1090 }
1091
1092 #[test]
1095 fn test_chunks_iterator() {
1096 let batches = vec![
1097 make_test_batch(100, 0),
1098 make_test_batch(100, 100),
1099 make_test_batch(50, 200),
1100 ];
1101 let schema = test_schema();
1102
1103 let temp = NamedTempFile::new().expect("temp");
1104 save_streaming(
1105 batches.into_iter(),
1106 &schema,
1107 temp.path(),
1108 Some(100),
1109 Compression::None,
1110 )
1111 .expect("save");
1112
1113 let dataset = StreamingDataset::open(temp.path()).expect("open");
1114
1115 let mut total_rows = 0;
1116 for chunk_result in dataset.chunks() {
1117 let chunk = chunk_result.expect("chunk");
1118 total_rows += chunk.num_rows();
1119 }
1120
1121 assert_eq!(total_rows, 250);
1122 }
1123
1124 #[test]
1125 fn test_chunks_iterator_size_hint() {
1126 let batches = vec![make_test_batch(200, 0)];
1127 let schema = test_schema();
1128
1129 let temp = NamedTempFile::new().expect("temp");
1130 save_streaming(
1131 batches.into_iter(),
1132 &schema,
1133 temp.path(),
1134 Some(50),
1135 Compression::None,
1136 )
1137 .expect("save");
1138
1139 let dataset = StreamingDataset::open(temp.path()).expect("open");
1140 let chunks = dataset.chunks();
1141
1142 let (lower, upper) = chunks.size_hint();
1143 assert_eq!(lower, dataset.num_chunks());
1144 assert_eq!(upper, Some(dataset.num_chunks()));
1145 }
1146
1147 #[test]
1148 fn test_chunk_index_iter() {
1149 let entries = vec![
1150 ChunkEntry::new(0, 100, 0, 500, 800),
1151 ChunkEntry::new(100, 100, 500, 500, 800),
1152 ];
1153 let index = ChunkIndex::from_entries(entries);
1154
1155 let collected: Vec<_> = index.iter().collect();
1156 assert_eq!(collected.len(), 2);
1157 assert_eq!(collected[0].row_offset, 0);
1158 assert_eq!(collected[1].row_offset, 100);
1159 }
1160
1161 #[test]
1162 fn test_chunk_index_from_entries_empty() {
1163 let index = ChunkIndex::from_entries(vec![]);
1164 assert!(index.is_empty());
1165 assert_eq!(index.total_rows(), 0);
1166 }
1167
1168 #[test]
1169 fn test_streaming_dataset_schema() {
1170 let batches = vec![make_test_batch(100, 0)];
1171 let schema = test_schema();
1172
1173 let temp = NamedTempFile::new().expect("temp");
1174 save_streaming(
1175 batches.into_iter(),
1176 &schema,
1177 temp.path(),
1178 Some(50),
1179 Compression::None,
1180 )
1181 .expect("save");
1182
1183 let dataset = StreamingDataset::open(temp.path()).expect("open");
1184 let ds_schema = dataset.schema();
1185 assert_eq!(ds_schema.fields().len(), 2);
1186 assert_eq!(ds_schema.field(0).name(), "id");
1187 assert_eq!(ds_schema.field(1).name(), "value");
1188 }
1189
1190 #[test]
1191 fn test_streaming_dataset_debug() {
1192 let batches = vec![make_test_batch(100, 0)];
1193 let schema = test_schema();
1194
1195 let temp = NamedTempFile::new().expect("temp");
1196 save_streaming(
1197 batches.into_iter(),
1198 &schema,
1199 temp.path(),
1200 Some(50),
1201 Compression::None,
1202 )
1203 .expect("save");
1204
1205 let dataset = StreamingDataset::open(temp.path()).expect("open");
1206 let debug = format!("{:?}", dataset);
1207 assert!(debug.contains("StreamingDataset"));
1208 assert!(debug.contains("num_rows"));
1209 assert!(debug.contains("num_chunks"));
1210 }
1211
1212 #[test]
1213 fn test_save_streaming_with_lz4_compression() {
1214 let batches = vec![make_test_batch(1000, 0)];
1215 let schema = test_schema();
1216
1217 let temp = NamedTempFile::new().expect("temp");
1218 save_streaming(
1219 batches.into_iter(),
1220 &schema,
1221 temp.path(),
1222 Some(500),
1223 Compression::Lz4,
1224 )
1225 .expect("save");
1226
1227 let dataset = StreamingDataset::open(temp.path()).expect("open");
1228 assert_eq!(dataset.num_rows(), 1000);
1229 }
1230
1231 #[test]
1232 fn test_save_streaming_with_zstd_l19_compression() {
1233 let batches = vec![make_test_batch(500, 0)];
1234 let schema = test_schema();
1235
1236 let temp = NamedTempFile::new().expect("temp");
1237 save_streaming(
1238 batches.into_iter(),
1239 &schema,
1240 temp.path(),
1241 Some(250),
1242 Compression::ZstdL19,
1243 )
1244 .expect("save");
1245
1246 let dataset = StreamingDataset::open(temp.path()).expect("open");
1247 assert_eq!(dataset.num_rows(), 500);
1248 }
1249
1250 #[test]
1251 fn test_save_streaming_default_chunk_size() {
1252 let batches = vec![make_test_batch(100, 0)];
1253 let schema = test_schema();
1254
1255 let temp = NamedTempFile::new().expect("temp");
1256 save_streaming(
1257 batches.into_iter(),
1258 &schema,
1259 temp.path(),
1260 None, Compression::None,
1262 )
1263 .expect("save");
1264
1265 let dataset = StreamingDataset::open(temp.path()).expect("open");
1266 assert_eq!(dataset.num_rows(), 100);
1267 }
1268
1269 #[test]
1270 fn test_get_rows_clamps_to_end() {
1271 let batches = vec![make_test_batch(100, 0)];
1272 let schema = test_schema();
1273
1274 let temp = NamedTempFile::new().expect("temp");
1275 save_streaming(
1276 batches.into_iter(),
1277 &schema,
1278 temp.path(),
1279 Some(50),
1280 Compression::None,
1281 )
1282 .expect("save");
1283
1284 let dataset = StreamingDataset::open(temp.path()).expect("open");
1285 let rows = dataset.get_rows(90, 50).expect("get_rows");
1287 assert_eq!(rows.num_rows(), 10); }
1289
1290 #[test]
1291 fn test_chunk_entry_clone_and_eq() {
1292 let entry1 = ChunkEntry::new(0, 100, 0, 500, 800);
1293 let entry2 = entry1.clone();
1294 assert_eq!(entry1, entry2);
1295 }
1296
1297 #[test]
1298 fn test_chunk_index_clone() {
1299 let entries = vec![
1300 ChunkEntry::new(0, 100, 0, 500, 800),
1301 ChunkEntry::new(100, 100, 500, 500, 800),
1302 ];
1303 let index = ChunkIndex::from_entries(entries);
1304 let cloned = index.clone();
1305
1306 assert_eq!(cloned.len(), index.len());
1307 assert_eq!(cloned.total_rows(), index.total_rows());
1308 }
1309
1310 #[test]
1311 fn test_chunk_entry_debug() {
1312 let entry = ChunkEntry::new(0, 100, 0, 500, 800);
1313 let debug = format!("{:?}", entry);
1314 assert!(debug.contains("ChunkEntry"));
1315 assert!(debug.contains("row_offset"));
1316 }
1317
1318 #[test]
1319 fn test_chunk_index_debug() {
1320 let index = ChunkIndex::new();
1321 let debug = format!("{:?}", index);
1322 assert!(debug.contains("ChunkIndex"));
1323 }
1324
1325 #[test]
1326 fn test_constants() {
1327 assert_eq!(DEFAULT_CHUNK_SIZE, 65536);
1328 assert_eq!(STREAMING_THRESHOLD, 100 * 1024 * 1024);
1329 }
1330
1331 #[test]
1332 fn test_chunks_iterator_exact_size() {
1333 let batches = vec![make_test_batch(200, 0)];
1334 let schema = test_schema();
1335
1336 let temp = NamedTempFile::new().expect("temp");
1337 save_streaming(
1338 batches.into_iter(),
1339 &schema,
1340 temp.path(),
1341 Some(50),
1342 Compression::None,
1343 )
1344 .expect("save");
1345
1346 let dataset = StreamingDataset::open(temp.path()).expect("open");
1347 let chunks = dataset.chunks();
1348
1349 assert_eq!(chunks.len(), dataset.num_chunks());
1351 }
1352}