Skip to main content

embeddenator_fs/fs/
streaming.rs

1//! Streaming Ingester for Memory-Efficient File Encoding
2//!
3//! This module provides a streaming API for ingesting large files without loading
4//! them entirely into memory. It processes data in chunks, encoding each chunk
5//! as it arrives and bundling progressively into the root engram.
6//!
7//! # Why Streaming?
8//!
9//! For large files (>100MB), loading everything into memory before encoding:
10//! - Causes memory pressure and potential OOM for multi-GB files
11//! - Increases latency (must read entire file before any encoding starts)
12//! - Wastes resources (chunks are independent, can encode in parallel)
13//!
14//! Streaming ingestion solves these by:
15//! - Processing fixed-size chunks as they arrive
16//! - Encoding each chunk immediately
17//! - Maintaining bounded memory usage (~chunk_size * pipeline_depth)
18//! - Enabling early error detection
19//!
20//! # Architecture
21//!
22//! ```text
23//! ┌─────────────┐     ┌─────────────────┐     ┌──────────────────┐
24//! │ Data Source │────▶│ StreamingIngester│────▶│ VersionedEmbrFS │
25//! │ (Read/io)   │     │ (chunk + encode) │     │ (store + bundle) │
26//! └─────────────┘     └─────────────────┘     └──────────────────┘
27//!                              │
28//!                              ├── Chunk Buffer (4KB default)
29//!                              ├── Correction Accumulator
30//!                              └── Progressive Hash
31//! ```
32//!
33//! # Example
34//!
35//! ```rust,ignore
36//! use embeddenator_fs::streaming::StreamingIngester;
37//! use std::fs::File;
38//! use std::io::BufReader;
39//!
40//! let file = File::open("large_file.bin")?;
41//! let reader = BufReader::new(file);
42//!
43//! let mut ingester = StreamingIngester::builder(&fs)
44//!     .with_chunk_size(8192)
45//!     .with_path("large_file.bin");
46//!
47//! ingester.ingest_reader(reader)?;
48//! let result = ingester.finalize()?;
49//! ```
50
51use crate::correction::ChunkCorrection;
52use crate::versioned::{ChunkId, VersionedChunk, VersionedFileEntry};
53use crate::versioned_embrfs::{
54    EmbrFSError, VersionedEmbrFS, DEFAULT_CHUNK_SIZE, ENCODING_FORMAT_REVERSIBLE_VSA,
55};
56use embeddenator_io::{wrap_or_legacy, BinaryWriteOptions, CompressionCodec, PayloadKind};
57use embeddenator_vsa::SparseVec;
58use sha2::{Digest, Sha256};
59use std::io::{BufRead, Read};
60use std::sync::atomic::{AtomicU64, Ordering};
61
62/// Result of a completed streaming ingestion
63#[derive(Debug, Clone)]
64pub struct StreamingResult {
65    /// Path of the ingested file
66    pub path: String,
67    /// Total bytes ingested
68    pub total_bytes: usize,
69    /// Number of chunks created
70    pub chunk_count: usize,
71    /// File version in the filesystem
72    pub version: u64,
73    /// Bytes saved by correction optimization (if applicable)
74    pub correction_savings: usize,
75}
76
77/// Pending chunk data for batch insertion
78struct PendingChunk {
79    chunk_id: ChunkId,
80    data: Vec<u8>,
81    vector: SparseVec,
82    correction: ChunkCorrection,
83}
84
85/// Builder for configuring streaming ingestion
86pub struct StreamingIngesterBuilder<'a> {
87    fs: &'a VersionedEmbrFS,
88    chunk_size: usize,
89    path: Option<String>,
90    expected_version: Option<u64>,
91    compression: Option<CompressionCodec>,
92    adaptive_chunking: bool,
93    correction_threshold: f64,
94}
95
96impl<'a> StreamingIngesterBuilder<'a> {
97    /// Create a new builder with default settings
98    pub fn new(fs: &'a VersionedEmbrFS) -> Self {
99        Self {
100            fs,
101            chunk_size: DEFAULT_CHUNK_SIZE,
102            path: None,
103            expected_version: None,
104            compression: None,
105            adaptive_chunking: false,
106            correction_threshold: 0.1,
107        }
108    }
109
110    /// Set the chunk size for encoding
111    ///
112    /// Smaller chunks improve memory efficiency but may reduce encoding quality
113    /// for highly correlated data. Default is 4KB.
114    pub fn with_chunk_size(mut self, size: usize) -> Self {
115        self.chunk_size = size.clamp(512, 1024 * 1024); // Clamp to 512B - 1MB
116        self
117    }
118
119    /// Set the target path for the file
120    pub fn with_path(mut self, path: impl Into<String>) -> Self {
121        self.path = Some(path.into());
122        self
123    }
124
125    /// Set expected version for optimistic locking (update existing file)
126    pub fn with_expected_version(mut self, version: u64) -> Self {
127        self.expected_version = Some(version);
128        self
129    }
130
131    /// Enable compression with specified codec
132    pub fn with_compression(mut self, codec: CompressionCodec) -> Self {
133        self.compression = Some(codec);
134        self
135    }
136
137    /// Enable adaptive chunking based on content similarity
138    ///
139    /// When enabled, the ingester may adjust chunk boundaries to improve
140    /// encoding quality for files with repeated patterns.
141    pub fn with_adaptive_chunking(mut self, enabled: bool) -> Self {
142        self.adaptive_chunking = enabled;
143        self
144    }
145
146    /// Set correction threshold for large file optimization
147    ///
148    /// For large files, corrections beyond this threshold trigger re-encoding
149    /// with adjusted parameters. Range: 0.0 - 1.0 (default 0.1 = 10%)
150    pub fn with_correction_threshold(mut self, threshold: f64) -> Self {
151        self.correction_threshold = threshold.clamp(0.0, 1.0);
152        self
153    }
154
155    /// Build the streaming ingester
156    pub fn build(self) -> Result<StreamingIngester<'a>, EmbrFSError> {
157        let path = self.path.ok_or_else(|| {
158            EmbrFSError::InvalidOperation("Path must be specified for streaming ingestion".into())
159        })?;
160
161        Ok(StreamingIngester {
162            fs: self.fs,
163            path,
164            chunk_size: self.chunk_size,
165            expected_version: self.expected_version,
166            compression: self.compression,
167            adaptive_chunking: self.adaptive_chunking,
168            correction_threshold: self.correction_threshold,
169            // State
170            buffer: Vec::with_capacity(self.chunk_size),
171            pending_chunks: Vec::new(),
172            chunk_ids: Vec::new(),
173            total_bytes: AtomicU64::new(0),
174            hasher: Sha256::new(),
175            correction_bytes: AtomicU64::new(0),
176        })
177    }
178}
179
180/// Streaming ingester for memory-efficient file encoding
181///
182/// Processes data in chunks as it arrives, maintaining bounded memory usage.
183/// Suitable for files of any size including multi-GB datasets.
184pub struct StreamingIngester<'a> {
185    // Configuration
186    fs: &'a VersionedEmbrFS,
187    path: String,
188    chunk_size: usize,
189    expected_version: Option<u64>,
190    compression: Option<CompressionCodec>,
191    adaptive_chunking: bool,
192    correction_threshold: f64,
193
194    // Mutable state
195    buffer: Vec<u8>,
196    pending_chunks: Vec<PendingChunk>,
197    chunk_ids: Vec<ChunkId>,
198    total_bytes: AtomicU64,
199    hasher: Sha256,
200    correction_bytes: AtomicU64,
201}
202
203impl<'a> StreamingIngester<'a> {
204    /// Create a builder for configuring a streaming ingester
205    pub fn builder(fs: &'a VersionedEmbrFS) -> StreamingIngesterBuilder<'a> {
206        StreamingIngesterBuilder::new(fs)
207    }
208
209    /// Ingest data from a reader
210    ///
211    /// Reads data in chunks and encodes progressively. Memory usage is bounded
212    /// by `chunk_size` regardless of total file size.
213    pub fn ingest_reader<R: Read>(&mut self, mut reader: R) -> Result<(), EmbrFSError> {
214        let mut read_buf = vec![0u8; self.chunk_size];
215
216        loop {
217            let bytes_read = reader
218                .read(&mut read_buf)
219                .map_err(|e| EmbrFSError::IoError(e.to_string()))?;
220
221            if bytes_read == 0 {
222                break; // EOF
223            }
224
225            self.ingest_bytes(&read_buf[..bytes_read])?;
226        }
227
228        Ok(())
229    }
230
231    /// Ingest data from a buffered reader (more efficient)
232    pub fn ingest_buffered<R: BufRead>(&mut self, mut reader: R) -> Result<(), EmbrFSError> {
233        loop {
234            let buf = reader
235                .fill_buf()
236                .map_err(|e| EmbrFSError::IoError(e.to_string()))?;
237
238            if buf.is_empty() {
239                break; // EOF
240            }
241
242            let len = buf.len();
243            self.ingest_bytes(buf)?;
244            reader.consume(len);
245        }
246
247        Ok(())
248    }
249
250    /// Ingest a slice of bytes
251    ///
252    /// Can be called multiple times. Data is buffered until a full chunk is
253    /// available, then encoded and stored.
254    pub fn ingest_bytes(&mut self, data: &[u8]) -> Result<(), EmbrFSError> {
255        self.total_bytes
256            .fetch_add(data.len() as u64, Ordering::Relaxed);
257        self.hasher.update(data);
258
259        // Add to buffer
260        self.buffer.extend_from_slice(data);
261
262        // Process complete chunks
263        while self.buffer.len() >= self.chunk_size {
264            let chunk_data: Vec<u8> = self.buffer.drain(..self.chunk_size).collect();
265            self.process_chunk(chunk_data)?;
266        }
267
268        Ok(())
269    }
270
271    /// Finalize the ingestion and commit to filesystem
272    ///
273    /// Processes any remaining buffered data and commits all chunks to the
274    /// versioned filesystem atomically.
275    pub fn finalize(mut self) -> Result<StreamingResult, EmbrFSError> {
276        // Process remaining data in buffer
277        if !self.buffer.is_empty() {
278            let remaining = std::mem::take(&mut self.buffer);
279            self.process_chunk(remaining)?;
280        }
281
282        let total_bytes = self.total_bytes.load(Ordering::Relaxed) as usize;
283        let chunk_count = self.chunk_ids.len();
284        let correction_bytes = self.correction_bytes.load(Ordering::Relaxed) as usize;
285
286        // Check existing file for version verification
287        let existing = self.fs.manifest.get_file(&self.path);
288
289        match (&existing, self.expected_version) {
290            (Some((entry, _)), Some(expected_ver)) => {
291                if entry.version != expected_ver {
292                    return Err(EmbrFSError::VersionMismatch {
293                        expected: expected_ver,
294                        actual: entry.version,
295                    });
296                }
297            }
298            (Some(_), None) => {
299                return Err(EmbrFSError::FileExists(self.path.clone()));
300            }
301            (None, Some(_)) => {
302                return Err(EmbrFSError::FileNotFound(self.path.clone()));
303            }
304            (None, None) => {}
305        }
306
307        // Build chunk updates from pending chunks
308        let chunk_updates: Vec<_> = self
309            .pending_chunks
310            .iter()
311            .map(|pc| {
312                let mut hash_bytes = [0u8; 8];
313                let mut hasher = Sha256::new();
314                hasher.update(&pc.data);
315                let hash = hasher.finalize();
316                hash_bytes.copy_from_slice(&hash[0..8]);
317                (
318                    pc.chunk_id,
319                    VersionedChunk::new(pc.vector.clone(), pc.data.len(), hash_bytes),
320                )
321            })
322            .collect();
323
324        // Insert chunks
325        self.fs.chunk_store.batch_insert_new(chunk_updates)?;
326
327        // Insert corrections
328        let corrections: Vec<_> = self
329            .pending_chunks
330            .into_iter()
331            .map(|pc| (pc.chunk_id as u64, pc.correction))
332            .collect();
333        self.fs.corrections.batch_insert_new(corrections)?;
334
335        // Create manifest entry
336        let is_text = total_bytes > 0 && is_likely_text(total_bytes);
337        let mut file_entry = if let Some(codec) = self.compression {
338            let codec_byte = match codec {
339                CompressionCodec::None => 0,
340                CompressionCodec::Zstd => 1,
341                CompressionCodec::Lz4 => 2,
342            };
343            VersionedFileEntry::new_compressed(
344                self.path.clone(),
345                is_text,
346                total_bytes, // compressed size (actual)
347                total_bytes, // original size
348                codec_byte,
349                self.chunk_ids.clone(),
350            )
351        } else {
352            VersionedFileEntry::new(
353                self.path.clone(),
354                is_text,
355                total_bytes,
356                self.chunk_ids.clone(),
357            )
358        };
359
360        // Set encoding format for holographic mode files
361        if self.fs.is_holographic() {
362            file_entry.encoding_format = Some(ENCODING_FORMAT_REVERSIBLE_VSA);
363        }
364
365        let version = if let Some((entry, _)) = existing {
366            self.fs
367                .manifest
368                .update_file(&self.path, file_entry, entry.version)?;
369            entry.version + 1
370        } else {
371            self.fs.manifest.add_file(file_entry)?;
372            0
373        };
374
375        // Bundle chunks into root
376        self.fs.bundle_chunks_to_root_streaming(&self.chunk_ids)?;
377
378        Ok(StreamingResult {
379            path: self.path,
380            total_bytes,
381            chunk_count,
382            version,
383            correction_savings: correction_bytes,
384        })
385    }
386
387    /// Process a single chunk
388    fn process_chunk(&mut self, data: Vec<u8>) -> Result<(), EmbrFSError> {
389        let chunk_id = self.fs.allocate_chunk_id();
390
391        // Apply compression if configured
392        let encoded_data = if let Some(codec) = self.compression {
393            if codec != CompressionCodec::None {
394                let write_opts = BinaryWriteOptions { codec, level: None };
395                wrap_or_legacy(PayloadKind::EngramBincode, write_opts, &data)
396                    .map_err(|e| EmbrFSError::IoError(format!("Compression failed: {}", e)))?
397            } else {
398                data.clone()
399            }
400        } else {
401            data.clone()
402        };
403
404        // Encode chunk using appropriate encoder based on mode
405        let chunk_vec = if self.fs.is_holographic() {
406            // Use ReversibleVSAEncoder for ~94% uncorrected accuracy
407            self.fs
408                .reversible_encoder()
409                .write()
410                .unwrap()
411                .encode(&encoded_data)
412        } else {
413            // Legacy mode: use SparseVec::encode_data
414            SparseVec::encode_data(&encoded_data, self.fs.config(), Some(&self.path))
415        };
416
417        // Decode and compute correction using matching decoder
418        let decoded = if self.fs.is_holographic() {
419            self.fs
420                .reversible_encoder()
421                .read()
422                .unwrap()
423                .decode(&chunk_vec, encoded_data.len())
424        } else {
425            chunk_vec.decode_data(self.fs.config(), Some(&self.path), encoded_data.len())
426        };
427        let correction = ChunkCorrection::new(chunk_id as u64, &encoded_data, &decoded);
428
429        // Track correction overhead
430        let corr_size = correction.storage_size();
431        self.correction_bytes
432            .fetch_add(corr_size as u64, Ordering::Relaxed);
433
434        // Check if correction exceeds threshold (triggers adaptive strategy)
435        if self.adaptive_chunking {
436            let correction_ratio = corr_size as f64 / encoded_data.len() as f64;
437            if correction_ratio > self.correction_threshold {
438                // For now, just log - future: implement re-encoding with different params
439                eprintln!(
440                    "Warning: High correction ratio {:.2}% for chunk {} - consider adjusting parameters",
441                    correction_ratio * 100.0,
442                    chunk_id
443                );
444            }
445        }
446
447        // Store pending chunk for batch insertion
448        self.pending_chunks.push(PendingChunk {
449            chunk_id,
450            data: encoded_data,
451            vector: chunk_vec,
452            correction,
453        });
454        self.chunk_ids.push(chunk_id);
455
456        Ok(())
457    }
458
459    /// Get current progress
460    pub fn progress(&self) -> StreamingProgress {
461        StreamingProgress {
462            bytes_processed: self.total_bytes.load(Ordering::Relaxed) as usize,
463            chunks_created: self.chunk_ids.len(),
464            buffer_usage: self.buffer.len(),
465            correction_overhead: self.correction_bytes.load(Ordering::Relaxed) as usize,
466        }
467    }
468}
469
470/// Progress information for streaming ingestion
471#[derive(Debug, Clone)]
472pub struct StreamingProgress {
473    /// Total bytes processed so far
474    pub bytes_processed: usize,
475    /// Number of chunks created
476    pub chunks_created: usize,
477    /// Current buffer usage in bytes
478    pub buffer_usage: usize,
479    /// Total correction overhead in bytes
480    pub correction_overhead: usize,
481}
482
483/// Heuristic to detect if data is likely text based on size
484fn is_likely_text(size: usize) -> bool {
485    // Small files are often text, large files are often binary
486    size < 1024 * 1024
487}
488
489// =============================================================================
490// ASYNC STREAMING SUPPORT
491// =============================================================================
492
493/// Async streaming ingester for memory-efficient file encoding
494///
495/// This is the async counterpart to `StreamingIngester`, supporting
496/// `tokio::io::AsyncRead` and `tokio::io::AsyncBufRead` for non-blocking I/O.
497///
498/// # Example
499///
500/// ```rust,ignore
501/// use embeddenator_fs::fs::AsyncStreamingIngester;
502/// use tokio::io::AsyncReadExt;
503///
504/// async fn ingest_async(fs: &VersionedEmbrFS, path: &str) {
505///     let file = tokio::fs::File::open("large_file.bin").await?;
506///     let reader = tokio::io::BufReader::new(file);
507///
508///     let mut ingester = AsyncStreamingIngester::builder(fs)
509///         .with_path(path)
510///         .build()
511///         .await?;
512///
513///     ingester.ingest_async_reader(reader).await?;
514///     let result = ingester.finalize().await?;
515/// }
516/// ```
517#[cfg(feature = "tokio")]
518pub struct AsyncStreamingIngester<'a> {
519    /// Inner sync ingester (we delegate chunk processing to it)
520    inner: StreamingIngester<'a>,
521}
522
523#[cfg(feature = "tokio")]
524impl<'a> AsyncStreamingIngester<'a> {
525    /// Create a new async streaming ingester builder
526    pub fn builder(fs: &'a VersionedEmbrFS) -> AsyncStreamingIngesterBuilder<'a> {
527        AsyncStreamingIngesterBuilder {
528            inner: StreamingIngesterBuilder::new(fs),
529        }
530    }
531
532    /// Ingest data from an async reader
533    ///
534    /// Reads data in chunks and processes asynchronously.
535    pub async fn ingest_async_reader<R>(&mut self, mut reader: R) -> Result<(), EmbrFSError>
536    where
537        R: tokio::io::AsyncRead + Unpin,
538    {
539        use tokio::io::AsyncReadExt;
540
541        let mut buf = vec![0u8; self.inner.chunk_size];
542
543        loop {
544            let n = reader
545                .read(&mut buf)
546                .await
547                .map_err(|e| EmbrFSError::IoError(e.to_string()))?;
548
549            if n == 0 {
550                break;
551            }
552
553            // Delegate to sync ingester's bytes method
554            self.inner.ingest_bytes(&buf[..n])?;
555        }
556
557        Ok(())
558    }
559
560    /// Ingest data from an async buffered reader
561    ///
562    /// More efficient than `ingest_async_reader` as it avoids double-buffering.
563    pub async fn ingest_async_buffered<R>(&mut self, mut reader: R) -> Result<(), EmbrFSError>
564    where
565        R: tokio::io::AsyncBufRead + Unpin,
566    {
567        use tokio::io::AsyncBufReadExt;
568
569        loop {
570            let buf = reader
571                .fill_buf()
572                .await
573                .map_err(|e| EmbrFSError::IoError(e.to_string()))?;
574
575            if buf.is_empty() {
576                break;
577            }
578
579            let len = buf.len();
580            self.inner.ingest_bytes(buf)?;
581            reader.consume(len);
582        }
583
584        Ok(())
585    }
586
587    /// Ingest a slice of bytes (synchronous, for convenience)
588    pub fn ingest_bytes(&mut self, data: &[u8]) -> Result<(), EmbrFSError> {
589        self.inner.ingest_bytes(data)
590    }
591
592    /// Get current ingestion progress
593    pub fn progress(&self) -> StreamingProgress {
594        self.inner.progress()
595    }
596
597    /// Finalize the ingestion and commit to filesystem
598    pub fn finalize(self) -> Result<StreamingResult, EmbrFSError> {
599        self.inner.finalize()
600    }
601}
602
603/// Builder for async streaming ingester
604#[cfg(feature = "tokio")]
605pub struct AsyncStreamingIngesterBuilder<'a> {
606    inner: StreamingIngesterBuilder<'a>,
607}
608
609#[cfg(feature = "tokio")]
610impl<'a> AsyncStreamingIngesterBuilder<'a> {
611    /// Set the target path for the file
612    pub fn with_path(mut self, path: impl Into<String>) -> Self {
613        self.inner = self.inner.with_path(path);
614        self
615    }
616
617    /// Set chunk size for processing
618    pub fn with_chunk_size(mut self, size: usize) -> Self {
619        self.inner = self.inner.with_chunk_size(size);
620        self
621    }
622
623    /// Set expected version for optimistic locking
624    pub fn with_expected_version(mut self, version: u64) -> Self {
625        self.inner = self.inner.with_expected_version(version);
626        self
627    }
628
629    /// Enable compression with specified codec
630    pub fn with_compression(mut self, codec: CompressionCodec) -> Self {
631        self.inner = self.inner.with_compression(codec);
632        self
633    }
634
635    /// Enable adaptive chunking
636    pub fn with_adaptive_chunking(mut self, enabled: bool) -> Self {
637        self.inner = self.inner.with_adaptive_chunking(enabled);
638        self
639    }
640
641    /// Set correction threshold
642    pub fn with_correction_threshold(mut self, threshold: f64) -> Self {
643        self.inner = self.inner.with_correction_threshold(threshold);
644        self
645    }
646
647    /// Build the async streaming ingester
648    pub fn build(self) -> Result<AsyncStreamingIngester<'a>, EmbrFSError> {
649        Ok(AsyncStreamingIngester {
650            inner: self.inner.build()?,
651        })
652    }
653}
654
655// =============================================================================
656// STREAMING DECODE SUPPORT
657// =============================================================================
658
659/// Streaming decoder for memory-efficient file reading
660///
661/// Decodes engram data incrementally, yielding chunks as they become available.
662/// Memory usage is bounded by a single chunk regardless of file size.
663///
664/// # Why Streaming Decode?
665///
666/// For large files, loading everything into memory before processing:
667/// - Causes memory pressure for multi-GB engrams
668/// - Increases latency (must decode all chunks before returning any data)
669/// - Prevents early termination for partial reads
670///
671/// Streaming decode solves these by:
672/// - Decoding chunks on-demand
673/// - Yielding data as it becomes available
674/// - Supporting early termination
675/// - Maintaining bounded memory usage
676///
677/// # Example
678///
679/// ```rust,ignore
680/// use embeddenator_fs::streaming::StreamingDecoder;
681/// use std::io::Read;
682///
683/// let decoder = StreamingDecoder::new(&fs, "large_file.bin")?;
684///
685/// // Read first 1KB without decoding entire file
686/// let mut buf = vec![0u8; 1024];
687/// decoder.read_exact(&mut buf)?;
688///
689/// // Or iterate over chunks
690/// for chunk_result in decoder {
691///     let chunk_data = chunk_result?;
692///     process(chunk_data);
693/// }
694/// ```
695pub struct StreamingDecoder<'a> {
696    fs: &'a VersionedEmbrFS,
697    path: String,
698    chunks: Vec<ChunkId>,
699    file_size: usize,
700    version: u64,
701    // State
702    current_chunk_idx: usize,
703    current_chunk_data: Vec<u8>,
704    position_in_chunk: usize,
705    total_bytes_read: usize,
706    /// Whether the file is stored compressed (for future decompression support)
707    #[allow(dead_code)]
708    is_compressed: bool,
709    /// Compression codec used (for future decompression support)
710    #[allow(dead_code)]
711    compression_codec: Option<u8>,
712    /// Encoding format (0=legacy, 1=reversible VSA)
713    encoding_format: Option<u8>,
714}
715
716impl<'a> StreamingDecoder<'a> {
717    /// Create a new streaming decoder for a file
718    pub fn new(fs: &'a VersionedEmbrFS, path: &str) -> Result<Self, EmbrFSError> {
719        let (file_entry, _) = fs
720            .manifest
721            .get_file(path)
722            .ok_or_else(|| EmbrFSError::FileNotFound(path.to_string()))?;
723
724        if file_entry.deleted {
725            return Err(EmbrFSError::FileNotFound(path.to_string()));
726        }
727
728        Ok(Self {
729            fs,
730            path: path.to_string(),
731            chunks: file_entry.chunks.clone(),
732            file_size: file_entry.size,
733            version: file_entry.version,
734            current_chunk_idx: 0,
735            current_chunk_data: Vec::new(),
736            position_in_chunk: 0,
737            total_bytes_read: 0,
738            is_compressed: file_entry
739                .compression_codec
740                .map(|c| c != 0)
741                .unwrap_or(false),
742            compression_codec: file_entry.compression_codec,
743            encoding_format: file_entry.encoding_format,
744        })
745    }
746
747    /// Get the file version
748    pub fn version(&self) -> u64 {
749        self.version
750    }
751
752    /// Get the total file size
753    pub fn file_size(&self) -> usize {
754        self.file_size
755    }
756
757    /// Get the number of chunks
758    pub fn chunk_count(&self) -> usize {
759        self.chunks.len()
760    }
761
762    /// Get current read position (bytes from start)
763    pub fn position(&self) -> usize {
764        self.total_bytes_read
765    }
766
767    /// Check if all data has been read
768    pub fn is_exhausted(&self) -> bool {
769        self.total_bytes_read >= self.file_size
770    }
771
772    /// Get progress information
773    pub fn progress(&self) -> StreamingDecodeProgress {
774        StreamingDecodeProgress {
775            bytes_read: self.total_bytes_read,
776            total_bytes: self.file_size,
777            chunks_decoded: self.current_chunk_idx,
778            total_chunks: self.chunks.len(),
779        }
780    }
781
782    /// Decode and return the next chunk's data
783    ///
784    /// Returns None when all chunks have been read.
785    fn decode_next_chunk(&mut self) -> Result<Option<Vec<u8>>, EmbrFSError> {
786        if self.current_chunk_idx >= self.chunks.len() {
787            return Ok(None);
788        }
789
790        let chunk_id = self.chunks[self.current_chunk_idx];
791
792        // Get chunk from store
793        let (chunk, _) = self
794            .fs
795            .chunk_store
796            .get(chunk_id)
797            .ok_or(EmbrFSError::ChunkNotFound(chunk_id))?;
798
799        // Decode chunk using matching decoder based on encoding format
800        let decoded = if self.encoding_format == Some(ENCODING_FORMAT_REVERSIBLE_VSA) {
801            self.fs
802                .reversible_encoder()
803                .read()
804                .unwrap()
805                .decode(&chunk.vector, chunk.original_size)
806        } else {
807            chunk
808                .vector
809                .decode_data(self.fs.config(), Some(&self.path), chunk.original_size)
810        };
811
812        // Apply correction
813        let corrected = self
814            .fs
815            .corrections
816            .get(chunk_id as u64)
817            .map(|(corr, _)| corr.apply(&decoded))
818            .unwrap_or(decoded);
819
820        self.current_chunk_idx += 1;
821
822        Ok(Some(corrected))
823    }
824
825    /// Skip to a specific byte position
826    ///
827    /// Efficiently skips chunks that don't need to be decoded.
828    pub fn seek_to(&mut self, position: usize) -> Result<(), EmbrFSError> {
829        if position >= self.file_size {
830            self.current_chunk_idx = self.chunks.len();
831            self.current_chunk_data.clear();
832            self.position_in_chunk = 0;
833            self.total_bytes_read = self.file_size;
834            return Ok(());
835        }
836
837        // Calculate which chunk contains this position
838        let chunk_size = DEFAULT_CHUNK_SIZE;
839        let target_chunk = position / chunk_size;
840        let offset_in_chunk = position % chunk_size;
841
842        // Reset state
843        self.current_chunk_idx = target_chunk;
844        self.current_chunk_data.clear();
845        self.position_in_chunk = offset_in_chunk;
846        self.total_bytes_read = position;
847
848        // Pre-load the target chunk if needed
849        if offset_in_chunk > 0 {
850            if let Some(data) = self.decode_next_chunk()? {
851                self.current_chunk_data = data;
852                self.current_chunk_idx -= 1; // decode_next_chunk incremented it
853            }
854        }
855
856        Ok(())
857    }
858
859    /// Read exactly n bytes, returning them
860    ///
861    /// Returns less than n bytes only at EOF.
862    pub fn read_n_bytes(&mut self, n: usize) -> Result<Vec<u8>, EmbrFSError> {
863        let mut result = Vec::with_capacity(n);
864        let remaining_in_file = self.file_size.saturating_sub(self.total_bytes_read);
865        let to_read = n.min(remaining_in_file);
866
867        while result.len() < to_read {
868            // Check if we need more data from current chunk
869            if self.position_in_chunk >= self.current_chunk_data.len() {
870                // Load next chunk
871                match self.decode_next_chunk()? {
872                    Some(data) => {
873                        self.current_chunk_data = data;
874                        self.position_in_chunk = 0;
875                    }
876                    None => break, // No more chunks
877                }
878            }
879
880            // Copy data from current chunk
881            let available = self.current_chunk_data.len() - self.position_in_chunk;
882            let needed = to_read - result.len();
883            let copy_len = available.min(needed);
884
885            result.extend_from_slice(
886                &self.current_chunk_data[self.position_in_chunk..self.position_in_chunk + copy_len],
887            );
888            self.position_in_chunk += copy_len;
889            self.total_bytes_read += copy_len;
890        }
891
892        // Truncate to file size if needed
893        let max_len = self
894            .file_size
895            .saturating_sub(self.total_bytes_read - result.len());
896        if result.len() > max_len {
897            result.truncate(max_len);
898        }
899
900        Ok(result)
901    }
902}
903
904impl std::io::Read for StreamingDecoder<'_> {
905    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
906        if self.is_exhausted() {
907            return Ok(0);
908        }
909
910        let data = self
911            .read_n_bytes(buf.len())
912            .map_err(|e| std::io::Error::other(e.to_string()))?;
913
914        let len = data.len();
915        buf[..len].copy_from_slice(&data);
916        Ok(len)
917    }
918}
919
920/// Iterator that yields decoded chunks
921impl<'a> Iterator for StreamingDecoder<'a> {
922    type Item = Result<Vec<u8>, EmbrFSError>;
923
924    fn next(&mut self) -> Option<Self::Item> {
925        if self.is_exhausted() {
926            return None;
927        }
928
929        match self.decode_next_chunk() {
930            Ok(Some(mut data)) => {
931                // Truncate last chunk to exact file size
932                let remaining = self.file_size.saturating_sub(self.total_bytes_read);
933                if data.len() > remaining {
934                    data.truncate(remaining);
935                }
936                self.total_bytes_read += data.len();
937                self.current_chunk_data.clear();
938                self.position_in_chunk = 0;
939                Some(Ok(data))
940            }
941            Ok(None) => None,
942            Err(e) => Some(Err(e)),
943        }
944    }
945}
946
947/// Progress information for streaming decode
948#[derive(Debug, Clone, Copy)]
949pub struct StreamingDecodeProgress {
950    /// Bytes read so far
951    pub bytes_read: usize,
952    /// Total bytes in file
953    pub total_bytes: usize,
954    /// Chunks decoded so far
955    pub chunks_decoded: usize,
956    /// Total chunks in file
957    pub total_chunks: usize,
958}
959
960impl StreamingDecodeProgress {
961    /// Get percentage complete (0.0 - 1.0)
962    pub fn percentage(&self) -> f64 {
963        if self.total_bytes == 0 {
964            1.0
965        } else {
966            self.bytes_read as f64 / self.total_bytes as f64
967        }
968    }
969}
970
971/// Builder for configuring streaming decode with options
972pub struct StreamingDecoderBuilder<'a> {
973    fs: &'a VersionedEmbrFS,
974    path: String,
975    start_offset: Option<usize>,
976    max_bytes: Option<usize>,
977}
978
979impl<'a> StreamingDecoderBuilder<'a> {
980    /// Create a new streaming decoder builder
981    pub fn new(fs: &'a VersionedEmbrFS, path: impl Into<String>) -> Self {
982        Self {
983            fs,
984            path: path.into(),
985            start_offset: None,
986            max_bytes: None,
987        }
988    }
989
990    /// Set starting offset for partial reads
991    pub fn with_offset(mut self, offset: usize) -> Self {
992        self.start_offset = Some(offset);
993        self
994    }
995
996    /// Limit maximum bytes to read
997    pub fn with_max_bytes(mut self, max: usize) -> Self {
998        self.max_bytes = Some(max);
999        self
1000    }
1001
1002    /// Build the streaming decoder
1003    pub fn build(self) -> Result<StreamingDecoder<'a>, EmbrFSError> {
1004        let mut decoder = StreamingDecoder::new(self.fs, &self.path)?;
1005
1006        if let Some(offset) = self.start_offset {
1007            decoder.seek_to(offset)?;
1008        }
1009
1010        // max_bytes is handled by the caller limiting reads
1011
1012        Ok(decoder)
1013    }
1014}
1015
1016#[cfg(test)]
1017mod tests {
1018    use super::*;
1019
1020    #[test]
1021    fn test_streaming_small_file() {
1022        let fs = VersionedEmbrFS::new();
1023        let data = b"Hello, streaming world!";
1024
1025        let mut ingester = StreamingIngester::builder(&fs)
1026            .with_path("test.txt")
1027            .build()
1028            .unwrap();
1029
1030        ingester.ingest_bytes(data).unwrap();
1031        let result = ingester.finalize().unwrap();
1032
1033        assert_eq!(result.path, "test.txt");
1034        assert_eq!(result.total_bytes, data.len());
1035        assert!(result.chunk_count >= 1);
1036
1037        // Verify data
1038        let (content, _) = fs.read_file("test.txt").unwrap();
1039        assert_eq!(&content[..], data);
1040    }
1041
1042    #[test]
1043    fn test_streaming_large_file() {
1044        let fs = VersionedEmbrFS::new();
1045
1046        // Create data larger than default chunk size
1047        let data: Vec<u8> = (0..DEFAULT_CHUNK_SIZE * 3 + 500)
1048            .map(|i| (i % 256) as u8)
1049            .collect();
1050
1051        let mut ingester = StreamingIngester::builder(&fs)
1052            .with_path("large.bin")
1053            .with_chunk_size(DEFAULT_CHUNK_SIZE)
1054            .build()
1055            .unwrap();
1056
1057        // Feed in smaller pieces to test buffering
1058        for chunk in data.chunks(1024) {
1059            ingester.ingest_bytes(chunk).unwrap();
1060        }
1061
1062        let result = ingester.finalize().unwrap();
1063
1064        assert_eq!(result.total_bytes, data.len());
1065        assert!(result.chunk_count >= 3); // At least 3 chunks
1066
1067        // Verify data integrity
1068        let (content, _) = fs.read_file("large.bin").unwrap();
1069        assert_eq!(content, data);
1070    }
1071
1072    #[test]
1073    fn test_streaming_progress() {
1074        let fs = VersionedEmbrFS::new();
1075
1076        let mut ingester = StreamingIngester::builder(&fs)
1077            .with_path("progress.txt")
1078            .with_chunk_size(1024)
1079            .build()
1080            .unwrap();
1081
1082        // Check initial progress
1083        let p1 = ingester.progress();
1084        assert_eq!(p1.bytes_processed, 0);
1085        assert_eq!(p1.chunks_created, 0);
1086
1087        // Add data
1088        ingester.ingest_bytes(&[0u8; 500]).unwrap();
1089        let p2 = ingester.progress();
1090        assert_eq!(p2.bytes_processed, 500);
1091        assert_eq!(p2.buffer_usage, 500);
1092        assert_eq!(p2.chunks_created, 0); // Not a full chunk yet
1093
1094        // Add more data to complete a chunk
1095        ingester.ingest_bytes(&[0u8; 600]).unwrap();
1096        let p3 = ingester.progress();
1097        assert_eq!(p3.bytes_processed, 1100);
1098        assert_eq!(p3.chunks_created, 1);
1099    }
1100
1101    #[test]
1102    fn test_streaming_reader() {
1103        use std::io::Cursor;
1104
1105        let fs = VersionedEmbrFS::new();
1106        let data = b"Data from a reader interface!";
1107        let reader = Cursor::new(data);
1108
1109        let mut ingester = StreamingIngester::builder(&fs)
1110            .with_path("from_reader.txt")
1111            .build()
1112            .unwrap();
1113
1114        ingester.ingest_reader(reader).unwrap();
1115        let result = ingester.finalize().unwrap();
1116
1117        assert_eq!(result.total_bytes, data.len());
1118
1119        let (content, _) = fs.read_file("from_reader.txt").unwrap();
1120        assert_eq!(&content[..], data);
1121    }
1122
1123    // =========================================================================
1124    // STREAMING DECODER TESTS
1125    // =========================================================================
1126
1127    #[test]
1128    fn test_streaming_decoder_small_file() {
1129        let fs = VersionedEmbrFS::new();
1130        let data = b"Hello, streaming decoder!";
1131
1132        // Write file first
1133        fs.write_file("decode_test.txt", data, None).unwrap();
1134
1135        // Create streaming decoder
1136        let mut decoder = StreamingDecoder::new(&fs, "decode_test.txt").unwrap();
1137
1138        assert_eq!(decoder.file_size(), data.len());
1139        assert_eq!(decoder.position(), 0);
1140        assert!(!decoder.is_exhausted());
1141
1142        // Read all data
1143        let read_data = decoder.read_n_bytes(data.len() + 10).unwrap();
1144
1145        assert_eq!(&read_data[..], data);
1146        assert!(decoder.is_exhausted());
1147    }
1148
1149    #[test]
1150    fn test_streaming_decoder_read_trait() {
1151        use std::io::Read;
1152
1153        let fs = VersionedEmbrFS::new();
1154        let data = b"Read trait test data";
1155
1156        fs.write_file("read_trait.txt", data, None).unwrap();
1157
1158        let mut decoder = StreamingDecoder::new(&fs, "read_trait.txt").unwrap();
1159        let mut buf = vec![0u8; 100];
1160
1161        let bytes_read = decoder.read(&mut buf).unwrap();
1162
1163        assert_eq!(bytes_read, data.len());
1164        assert_eq!(&buf[..bytes_read], data);
1165    }
1166
1167    #[test]
1168    fn test_streaming_decoder_iterator() {
1169        let fs = VersionedEmbrFS::new();
1170
1171        // Create data larger than chunk size
1172        let data: Vec<u8> = (0..DEFAULT_CHUNK_SIZE * 2 + 100)
1173            .map(|i| (i % 256) as u8)
1174            .collect();
1175
1176        fs.write_file("iterator_test.bin", &data, None).unwrap();
1177
1178        let decoder = StreamingDecoder::new(&fs, "iterator_test.bin").unwrap();
1179
1180        // Collect all chunks via iterator
1181        let chunks: Vec<Vec<u8>> = decoder.map(|r| r.unwrap()).collect();
1182
1183        // Should have at least 2 chunks
1184        assert!(chunks.len() >= 2);
1185
1186        // Verify total data matches
1187        let total: Vec<u8> = chunks.into_iter().flatten().collect();
1188        assert_eq!(total, data);
1189    }
1190
1191    #[test]
1192    fn test_streaming_decoder_partial_read() {
1193        let fs = VersionedEmbrFS::new();
1194        let data: Vec<u8> = (0..1000).map(|i| (i % 256) as u8).collect();
1195
1196        fs.write_file("partial.bin", &data, None).unwrap();
1197
1198        let mut decoder = StreamingDecoder::new(&fs, "partial.bin").unwrap();
1199
1200        // Read only first 100 bytes
1201        let first_100 = decoder.read_n_bytes(100).unwrap();
1202        assert_eq!(first_100.len(), 100);
1203        assert_eq!(&first_100[..], &data[..100]);
1204
1205        // Position should be updated
1206        assert_eq!(decoder.position(), 100);
1207
1208        // Read next 50 bytes
1209        let next_50 = decoder.read_n_bytes(50).unwrap();
1210        assert_eq!(next_50.len(), 50);
1211        assert_eq!(&next_50[..], &data[100..150]);
1212    }
1213
1214    #[test]
1215    fn test_streaming_decoder_seek() {
1216        let fs = VersionedEmbrFS::new();
1217        let data: Vec<u8> = (0..DEFAULT_CHUNK_SIZE * 3)
1218            .map(|i| (i % 256) as u8)
1219            .collect();
1220
1221        fs.write_file("seek_test.bin", &data, None).unwrap();
1222
1223        let mut decoder = StreamingDecoder::new(&fs, "seek_test.bin").unwrap();
1224
1225        // Seek to middle of second chunk
1226        let seek_pos = DEFAULT_CHUNK_SIZE + 500;
1227        decoder.seek_to(seek_pos).unwrap();
1228
1229        assert_eq!(decoder.position(), seek_pos);
1230
1231        // Read some bytes and verify
1232        let read_data = decoder.read_n_bytes(100).unwrap();
1233        assert_eq!(&read_data[..], &data[seek_pos..seek_pos + 100]);
1234    }
1235
1236    #[test]
1237    fn test_streaming_decoder_progress() {
1238        let fs = VersionedEmbrFS::new();
1239        let data: Vec<u8> = (0..1000).map(|i| (i % 256) as u8).collect();
1240
1241        fs.write_file("progress_decode.bin", &data, None).unwrap();
1242
1243        let mut decoder = StreamingDecoder::new(&fs, "progress_decode.bin").unwrap();
1244
1245        let p1 = decoder.progress();
1246        assert_eq!(p1.bytes_read, 0);
1247        assert_eq!(p1.total_bytes, 1000);
1248        assert!((p1.percentage() - 0.0).abs() < 0.001);
1249
1250        // Read half the data
1251        decoder.read_n_bytes(500).unwrap();
1252
1253        let p2 = decoder.progress();
1254        assert_eq!(p2.bytes_read, 500);
1255        assert!((p2.percentage() - 0.5).abs() < 0.001);
1256
1257        // Read remaining
1258        decoder.read_n_bytes(500).unwrap();
1259
1260        let p3 = decoder.progress();
1261        assert_eq!(p3.bytes_read, 1000);
1262        assert!((p3.percentage() - 1.0).abs() < 0.001);
1263    }
1264
1265    #[test]
1266    fn test_streaming_decoder_builder() {
1267        let fs = VersionedEmbrFS::new();
1268        let data: Vec<u8> = (0..500).map(|i| (i % 256) as u8).collect();
1269
1270        fs.write_file("builder_decode.bin", &data, None).unwrap();
1271
1272        let mut decoder = StreamingDecoderBuilder::new(&fs, "builder_decode.bin")
1273            .with_offset(100)
1274            .build()
1275            .unwrap();
1276
1277        assert_eq!(decoder.position(), 100);
1278
1279        let read_data = decoder.read_n_bytes(50).unwrap();
1280        assert_eq!(&read_data[..], &data[100..150]);
1281    }
1282
1283    #[test]
1284    fn test_stream_decode_convenience_method() {
1285        let fs = VersionedEmbrFS::new();
1286        let data = b"Testing stream_decode convenience method";
1287
1288        fs.write_file("convenience.txt", data, None).unwrap();
1289
1290        // Use the convenience method
1291        let mut decoder = fs.stream_decode("convenience.txt").unwrap();
1292
1293        assert_eq!(decoder.file_size(), data.len());
1294        let read_data = decoder.read_n_bytes(data.len()).unwrap();
1295        assert_eq!(&read_data[..], data);
1296    }
1297
1298    #[test]
1299    fn test_stream_decode_range_convenience_method() {
1300        let fs = VersionedEmbrFS::new();
1301        let data: Vec<u8> = (0..1000).map(|i| (i % 256) as u8).collect();
1302
1303        fs.write_file("range_convenience.bin", &data, None).unwrap();
1304
1305        // Use the convenience method with offset
1306        let mut decoder = fs
1307            .stream_decode_range("range_convenience.bin", 500, Some(200))
1308            .unwrap();
1309
1310        assert_eq!(decoder.position(), 500);
1311        let read_data = decoder.read_n_bytes(200).unwrap();
1312        assert_eq!(&read_data[..], &data[500..700]);
1313    }
1314
1315    #[test]
1316    fn test_streaming_decoder_memory_bounded() {
1317        // This test verifies that streaming decode maintains bounded memory usage
1318        // by processing a file much larger than the internal buffer.
1319        //
1320        // The key property is that StreamingDecoder only keeps one chunk in memory
1321        // at a time (current_chunk_data), regardless of total file size.
1322
1323        let fs = VersionedEmbrFS::new();
1324
1325        // Create a file spanning many chunks (e.g., 50 chunks = 200KB with 4KB chunks)
1326        // In production, this would be 1GB+, but we test the algorithm with smaller data
1327        let num_chunks = 50;
1328        let data: Vec<u8> = (0..DEFAULT_CHUNK_SIZE * num_chunks)
1329            .map(|i| (i % 256) as u8)
1330            .collect();
1331
1332        fs.write_file("memory_bounded.bin", &data, None).unwrap();
1333
1334        // Create decoder and verify it only loads one chunk at a time
1335        let mut decoder = StreamingDecoder::new(&fs, "memory_bounded.bin").unwrap();
1336
1337        assert_eq!(decoder.chunk_count(), num_chunks);
1338        assert_eq!(decoder.file_size(), data.len());
1339
1340        // Read chunk by chunk via iterator to verify bounded memory
1341        let mut total_read = 0;
1342        let mut chunk_count = 0;
1343        for chunk_result in &mut decoder {
1344            let chunk_data = chunk_result.unwrap();
1345            // Each chunk should be at most DEFAULT_CHUNK_SIZE
1346            assert!(chunk_data.len() <= DEFAULT_CHUNK_SIZE);
1347            total_read += chunk_data.len();
1348            chunk_count += 1;
1349        }
1350
1351        assert_eq!(total_read, data.len());
1352        assert_eq!(chunk_count, num_chunks);
1353
1354        // Verify full data matches via fresh decoder with random access
1355        let mut decoder2 = StreamingDecoder::new(&fs, "memory_bounded.bin").unwrap();
1356        let full_data = decoder2.read_n_bytes(data.len()).unwrap();
1357        assert_eq!(full_data, data);
1358    }
1359
1360    #[test]
1361    fn test_streaming_decoder_seek_across_chunks() {
1362        let fs = VersionedEmbrFS::new();
1363
1364        // Create multi-chunk file
1365        let data: Vec<u8> = (0..DEFAULT_CHUNK_SIZE * 5)
1366            .map(|i| (i % 256) as u8)
1367            .collect();
1368
1369        fs.write_file("seek_multi.bin", &data, None).unwrap();
1370
1371        let mut decoder = StreamingDecoder::new(&fs, "seek_multi.bin").unwrap();
1372
1373        // Seek to middle of 3rd chunk
1374        let seek_pos = DEFAULT_CHUNK_SIZE * 2 + 100;
1375        decoder.seek_to(seek_pos).unwrap();
1376        assert_eq!(decoder.position(), seek_pos);
1377
1378        // Read and verify
1379        let read_data = decoder.read_n_bytes(500).unwrap();
1380        assert_eq!(&read_data[..], &data[seek_pos..seek_pos + 500]);
1381
1382        // Seek backward to 1st chunk
1383        decoder.seek_to(50).unwrap();
1384        assert_eq!(decoder.position(), 50);
1385
1386        let read_data2 = decoder.read_n_bytes(100).unwrap();
1387        assert_eq!(&read_data2[..], &data[50..150]);
1388
1389        // Seek to end
1390        decoder.seek_to(data.len()).unwrap();
1391        assert!(decoder.is_exhausted());
1392    }
1393}