1use crate::correction::ChunkCorrection;
52use crate::versioned::{ChunkId, VersionedChunk, VersionedFileEntry};
53use crate::versioned_embrfs::{
54 EmbrFSError, VersionedEmbrFS, DEFAULT_CHUNK_SIZE, ENCODING_FORMAT_REVERSIBLE_VSA,
55};
56use embeddenator_io::{wrap_or_legacy, BinaryWriteOptions, CompressionCodec, PayloadKind};
57use embeddenator_vsa::SparseVec;
58use sha2::{Digest, Sha256};
59use std::io::{BufRead, Read};
60use std::sync::atomic::{AtomicU64, Ordering};
61
/// Summary returned by a successful streaming ingestion (`finalize`).
#[derive(Debug, Clone)]
pub struct StreamingResult {
    /// Path the file was stored under.
    pub path: String,
    /// Total raw bytes ingested.
    pub total_bytes: usize,
    /// Number of chunks the file was split into.
    pub chunk_count: usize,
    /// Version assigned to the file (0 for a newly created file).
    pub version: u64,
    /// Total bytes of correction records stored alongside the chunks.
    // NOTE(review): this value is the correction *overhead* accumulated during
    // ingestion, but the field is named "savings" — confirm intended meaning.
    pub correction_savings: usize,
}
76
/// A chunk that has been encoded but not yet committed to the chunk store.
struct PendingChunk {
    /// Identifier allocated for this chunk.
    chunk_id: ChunkId,
    /// The (possibly compressed) chunk payload.
    data: Vec<u8>,
    /// VSA encoding of `data`.
    vector: SparseVec,
    /// Correction record repairing the lossy decode of `vector` back to `data`.
    correction: ChunkCorrection,
}
84
/// Builder for [`StreamingIngester`]; `path` is the only required setting.
pub struct StreamingIngesterBuilder<'a> {
    /// Target filesystem.
    fs: &'a VersionedEmbrFS,
    /// Chunk size in bytes (clamped by `with_chunk_size`).
    chunk_size: usize,
    /// Destination path; must be set before `build`.
    path: Option<String>,
    /// Optimistic-concurrency version expected for an update, if any.
    expected_version: Option<u64>,
    /// Optional per-chunk compression codec.
    compression: Option<CompressionCodec>,
    /// When true, warn on chunks whose correction ratio exceeds the threshold.
    adaptive_chunking: bool,
    /// Correction-ratio warning threshold, clamped to [0, 1].
    correction_threshold: f64,
}
95
96impl<'a> StreamingIngesterBuilder<'a> {
97 pub fn new(fs: &'a VersionedEmbrFS) -> Self {
99 Self {
100 fs,
101 chunk_size: DEFAULT_CHUNK_SIZE,
102 path: None,
103 expected_version: None,
104 compression: None,
105 adaptive_chunking: false,
106 correction_threshold: 0.1,
107 }
108 }
109
110 pub fn with_chunk_size(mut self, size: usize) -> Self {
115 self.chunk_size = size.clamp(512, 1024 * 1024); self
117 }
118
119 pub fn with_path(mut self, path: impl Into<String>) -> Self {
121 self.path = Some(path.into());
122 self
123 }
124
125 pub fn with_expected_version(mut self, version: u64) -> Self {
127 self.expected_version = Some(version);
128 self
129 }
130
131 pub fn with_compression(mut self, codec: CompressionCodec) -> Self {
133 self.compression = Some(codec);
134 self
135 }
136
137 pub fn with_adaptive_chunking(mut self, enabled: bool) -> Self {
142 self.adaptive_chunking = enabled;
143 self
144 }
145
146 pub fn with_correction_threshold(mut self, threshold: f64) -> Self {
151 self.correction_threshold = threshold.clamp(0.0, 1.0);
152 self
153 }
154
155 pub fn build(self) -> Result<StreamingIngester<'a>, EmbrFSError> {
157 let path = self.path.ok_or_else(|| {
158 EmbrFSError::InvalidOperation("Path must be specified for streaming ingestion".into())
159 })?;
160
161 Ok(StreamingIngester {
162 fs: self.fs,
163 path,
164 chunk_size: self.chunk_size,
165 expected_version: self.expected_version,
166 compression: self.compression,
167 adaptive_chunking: self.adaptive_chunking,
168 correction_threshold: self.correction_threshold,
169 buffer: Vec::with_capacity(self.chunk_size),
171 pending_chunks: Vec::new(),
172 chunk_ids: Vec::new(),
173 total_bytes: AtomicU64::new(0),
174 hasher: Sha256::new(),
175 correction_bytes: AtomicU64::new(0),
176 })
177 }
178}
179
/// Incremental, bounded-memory file ingester.
///
/// Bytes are fed in via `ingest_bytes`/`ingest_reader`; complete chunks are
/// encoded as they fill and `finalize` commits the file.
pub struct StreamingIngester<'a> {
    /// Target filesystem.
    fs: &'a VersionedEmbrFS,
    /// Destination path for the file being ingested.
    path: String,
    /// Fixed chunk size in bytes (clamped by the builder).
    chunk_size: usize,
    /// Optimistic-concurrency version expected for an update, if any.
    expected_version: Option<u64>,
    /// Optional per-chunk compression codec.
    compression: Option<CompressionCodec>,
    /// Emit warnings when a chunk's correction ratio exceeds the threshold.
    adaptive_chunking: bool,
    /// Ratio above which a chunk's correction overhead triggers a warning.
    correction_threshold: f64,

    /// Bytes awaiting a full chunk's worth of data.
    buffer: Vec<u8>,
    /// Encoded chunks not yet committed to the store.
    pending_chunks: Vec<PendingChunk>,
    /// IDs of all chunks produced so far, in file order.
    chunk_ids: Vec<ChunkId>,
    /// Running count of raw bytes ingested.
    // NOTE(review): atomics are unnecessary here — all mutating methods take
    // `&mut self`, so plain u64 fields would suffice. Kept as-is.
    total_bytes: AtomicU64,
    /// Whole-file SHA-256 accumulator (digest currently never read).
    hasher: Sha256,
    /// Running total of correction-record bytes.
    correction_bytes: AtomicU64,
}
202
203impl<'a> StreamingIngester<'a> {
204 pub fn builder(fs: &'a VersionedEmbrFS) -> StreamingIngesterBuilder<'a> {
206 StreamingIngesterBuilder::new(fs)
207 }
208
209 pub fn ingest_reader<R: Read>(&mut self, mut reader: R) -> Result<(), EmbrFSError> {
214 let mut read_buf = vec![0u8; self.chunk_size];
215
216 loop {
217 let bytes_read = reader
218 .read(&mut read_buf)
219 .map_err(|e| EmbrFSError::IoError(e.to_string()))?;
220
221 if bytes_read == 0 {
222 break; }
224
225 self.ingest_bytes(&read_buf[..bytes_read])?;
226 }
227
228 Ok(())
229 }
230
231 pub fn ingest_buffered<R: BufRead>(&mut self, mut reader: R) -> Result<(), EmbrFSError> {
233 loop {
234 let buf = reader
235 .fill_buf()
236 .map_err(|e| EmbrFSError::IoError(e.to_string()))?;
237
238 if buf.is_empty() {
239 break; }
241
242 let len = buf.len();
243 self.ingest_bytes(buf)?;
244 reader.consume(len);
245 }
246
247 Ok(())
248 }
249
250 pub fn ingest_bytes(&mut self, data: &[u8]) -> Result<(), EmbrFSError> {
255 self.total_bytes
256 .fetch_add(data.len() as u64, Ordering::Relaxed);
257 self.hasher.update(data);
258
259 self.buffer.extend_from_slice(data);
261
262 while self.buffer.len() >= self.chunk_size {
264 let chunk_data: Vec<u8> = self.buffer.drain(..self.chunk_size).collect();
265 self.process_chunk(chunk_data)?;
266 }
267
268 Ok(())
269 }
270
271 pub fn finalize(mut self) -> Result<StreamingResult, EmbrFSError> {
276 if !self.buffer.is_empty() {
278 let remaining = std::mem::take(&mut self.buffer);
279 self.process_chunk(remaining)?;
280 }
281
282 let total_bytes = self.total_bytes.load(Ordering::Relaxed) as usize;
283 let chunk_count = self.chunk_ids.len();
284 let correction_bytes = self.correction_bytes.load(Ordering::Relaxed) as usize;
285
286 let existing = self.fs.manifest.get_file(&self.path);
288
289 match (&existing, self.expected_version) {
290 (Some((entry, _)), Some(expected_ver)) => {
291 if entry.version != expected_ver {
292 return Err(EmbrFSError::VersionMismatch {
293 expected: expected_ver,
294 actual: entry.version,
295 });
296 }
297 }
298 (Some(_), None) => {
299 return Err(EmbrFSError::FileExists(self.path.clone()));
300 }
301 (None, Some(_)) => {
302 return Err(EmbrFSError::FileNotFound(self.path.clone()));
303 }
304 (None, None) => {}
305 }
306
307 let chunk_updates: Vec<_> = self
309 .pending_chunks
310 .iter()
311 .map(|pc| {
312 let mut hash_bytes = [0u8; 8];
313 let mut hasher = Sha256::new();
314 hasher.update(&pc.data);
315 let hash = hasher.finalize();
316 hash_bytes.copy_from_slice(&hash[0..8]);
317 (
318 pc.chunk_id,
319 VersionedChunk::new(pc.vector.clone(), pc.data.len(), hash_bytes),
320 )
321 })
322 .collect();
323
324 self.fs.chunk_store.batch_insert_new(chunk_updates)?;
326
327 let corrections: Vec<_> = self
329 .pending_chunks
330 .into_iter()
331 .map(|pc| (pc.chunk_id as u64, pc.correction))
332 .collect();
333 self.fs.corrections.batch_insert_new(corrections)?;
334
335 let is_text = total_bytes > 0 && is_likely_text(total_bytes);
337 let mut file_entry = if let Some(codec) = self.compression {
338 let codec_byte = match codec {
339 CompressionCodec::None => 0,
340 CompressionCodec::Zstd => 1,
341 CompressionCodec::Lz4 => 2,
342 };
343 VersionedFileEntry::new_compressed(
344 self.path.clone(),
345 is_text,
346 total_bytes, total_bytes, codec_byte,
349 self.chunk_ids.clone(),
350 )
351 } else {
352 VersionedFileEntry::new(
353 self.path.clone(),
354 is_text,
355 total_bytes,
356 self.chunk_ids.clone(),
357 )
358 };
359
360 if self.fs.is_holographic() {
362 file_entry.encoding_format = Some(ENCODING_FORMAT_REVERSIBLE_VSA);
363 }
364
365 let version = if let Some((entry, _)) = existing {
366 self.fs
367 .manifest
368 .update_file(&self.path, file_entry, entry.version)?;
369 entry.version + 1
370 } else {
371 self.fs.manifest.add_file(file_entry)?;
372 0
373 };
374
375 self.fs.bundle_chunks_to_root_streaming(&self.chunk_ids)?;
377
378 Ok(StreamingResult {
379 path: self.path,
380 total_bytes,
381 chunk_count,
382 version,
383 correction_savings: correction_bytes,
384 })
385 }
386
387 fn process_chunk(&mut self, data: Vec<u8>) -> Result<(), EmbrFSError> {
389 let chunk_id = self.fs.allocate_chunk_id();
390
391 let encoded_data = if let Some(codec) = self.compression {
393 if codec != CompressionCodec::None {
394 let write_opts = BinaryWriteOptions { codec, level: None };
395 wrap_or_legacy(PayloadKind::EngramBincode, write_opts, &data)
396 .map_err(|e| EmbrFSError::IoError(format!("Compression failed: {}", e)))?
397 } else {
398 data.clone()
399 }
400 } else {
401 data.clone()
402 };
403
404 let chunk_vec = if self.fs.is_holographic() {
406 self.fs
408 .reversible_encoder()
409 .write()
410 .unwrap()
411 .encode(&encoded_data)
412 } else {
413 SparseVec::encode_data(&encoded_data, self.fs.config(), Some(&self.path))
415 };
416
417 let decoded = if self.fs.is_holographic() {
419 self.fs
420 .reversible_encoder()
421 .read()
422 .unwrap()
423 .decode(&chunk_vec, encoded_data.len())
424 } else {
425 chunk_vec.decode_data(self.fs.config(), Some(&self.path), encoded_data.len())
426 };
427 let correction = ChunkCorrection::new(chunk_id as u64, &encoded_data, &decoded);
428
429 let corr_size = correction.storage_size();
431 self.correction_bytes
432 .fetch_add(corr_size as u64, Ordering::Relaxed);
433
434 if self.adaptive_chunking {
436 let correction_ratio = corr_size as f64 / encoded_data.len() as f64;
437 if correction_ratio > self.correction_threshold {
438 eprintln!(
440 "Warning: High correction ratio {:.2}% for chunk {} - consider adjusting parameters",
441 correction_ratio * 100.0,
442 chunk_id
443 );
444 }
445 }
446
447 self.pending_chunks.push(PendingChunk {
449 chunk_id,
450 data: encoded_data,
451 vector: chunk_vec,
452 correction,
453 });
454 self.chunk_ids.push(chunk_id);
455
456 Ok(())
457 }
458
459 pub fn progress(&self) -> StreamingProgress {
461 StreamingProgress {
462 bytes_processed: self.total_bytes.load(Ordering::Relaxed) as usize,
463 chunks_created: self.chunk_ids.len(),
464 buffer_usage: self.buffer.len(),
465 correction_overhead: self.correction_bytes.load(Ordering::Relaxed) as usize,
466 }
467 }
468}
469
/// Point-in-time snapshot of an in-flight ingestion.
#[derive(Debug, Clone)]
pub struct StreamingProgress {
    /// Raw bytes accepted so far.
    pub bytes_processed: usize,
    /// Chunks encoded so far.
    pub chunks_created: usize,
    /// Bytes currently waiting in the partial-chunk buffer.
    pub buffer_usage: usize,
    /// Bytes of correction records accumulated so far.
    pub correction_overhead: usize,
}
482
/// Heuristic: treat files smaller than 1 MiB as likely text.
///
// NOTE(review): this is a pure size heuristic — file content is never
// inspected.
fn is_likely_text(size: usize) -> bool {
    const TEXT_SIZE_CUTOFF: usize = 1024 * 1024;
    size < TEXT_SIZE_CUTOFF
}
488
/// Async wrapper over [`StreamingIngester`] for tokio readers.
///
/// Only the read side is async; chunk encoding runs synchronously between
/// reads.
#[cfg(feature = "tokio")]
pub struct AsyncStreamingIngester<'a> {
    /// Underlying synchronous ingester that does all the work.
    inner: StreamingIngester<'a>,
}
522
#[cfg(feature = "tokio")]
impl<'a> AsyncStreamingIngester<'a> {
    /// Start configuring an async streaming ingestion.
    pub fn builder(fs: &'a VersionedEmbrFS) -> AsyncStreamingIngesterBuilder<'a> {
        AsyncStreamingIngesterBuilder {
            inner: StreamingIngesterBuilder::new(fs),
        }
    }

    /// Drain an async reader to EOF, feeding all bytes into the ingester.
    pub async fn ingest_async_reader<R>(&mut self, mut reader: R) -> Result<(), EmbrFSError>
    where
        R: tokio::io::AsyncRead + Unpin,
    {
        use tokio::io::AsyncReadExt;

        let mut scratch = vec![0u8; self.inner.chunk_size];

        loop {
            let filled = reader
                .read(&mut scratch)
                .await
                .map_err(|e| EmbrFSError::IoError(e.to_string()))?;

            // A zero-length read signals EOF.
            if filled == 0 {
                return Ok(());
            }

            self.inner.ingest_bytes(&scratch[..filled])?;
        }
    }

    /// Drain an async buffered reader to EOF, borrowing its internal buffer
    /// to avoid an extra copy per read.
    pub async fn ingest_async_buffered<R>(&mut self, mut reader: R) -> Result<(), EmbrFSError>
    where
        R: tokio::io::AsyncBufRead + Unpin,
    {
        use tokio::io::AsyncBufReadExt;

        loop {
            let available = reader
                .fill_buf()
                .await
                .map_err(|e| EmbrFSError::IoError(e.to_string()))?;

            // An empty buffer after fill_buf signals EOF.
            if available.is_empty() {
                return Ok(());
            }

            let consumed = available.len();
            self.inner.ingest_bytes(available)?;
            reader.consume(consumed);
        }
    }

    /// Feed a byte slice synchronously (delegates to the inner ingester).
    pub fn ingest_bytes(&mut self, data: &[u8]) -> Result<(), EmbrFSError> {
        self.inner.ingest_bytes(data)
    }

    /// Snapshot of ingestion progress (delegates to the inner ingester).
    pub fn progress(&self) -> StreamingProgress {
        self.inner.progress()
    }

    /// Commit all chunks and the file entry (delegates to the inner ingester).
    pub fn finalize(self) -> Result<StreamingResult, EmbrFSError> {
        self.inner.finalize()
    }
}
602
/// Builder for [`AsyncStreamingIngester`]; thin wrapper over the sync builder.
#[cfg(feature = "tokio")]
pub struct AsyncStreamingIngesterBuilder<'a> {
    /// Wrapped synchronous builder that holds all configuration.
    inner: StreamingIngesterBuilder<'a>,
}
608
#[cfg(feature = "tokio")]
impl<'a> AsyncStreamingIngesterBuilder<'a> {
    /// Set the destination path (required). Delegates to the sync builder.
    pub fn with_path(self, path: impl Into<String>) -> Self {
        Self {
            inner: self.inner.with_path(path),
        }
    }

    /// Set the chunk size (clamped by the sync builder).
    pub fn with_chunk_size(self, size: usize) -> Self {
        Self {
            inner: self.inner.with_chunk_size(size),
        }
    }

    /// Require the file to currently be at `version` (update mode).
    pub fn with_expected_version(self, version: u64) -> Self {
        Self {
            inner: self.inner.with_expected_version(version),
        }
    }

    /// Compress each chunk with `codec` before encoding.
    pub fn with_compression(self, codec: CompressionCodec) -> Self {
        Self {
            inner: self.inner.with_compression(codec),
        }
    }

    /// Enable/disable high-correction-ratio warnings.
    pub fn with_adaptive_chunking(self, enabled: bool) -> Self {
        Self {
            inner: self.inner.with_adaptive_chunking(enabled),
        }
    }

    /// Set the correction-ratio warning threshold (clamped to [0, 1]).
    pub fn with_correction_threshold(self, threshold: f64) -> Self {
        Self {
            inner: self.inner.with_correction_threshold(threshold),
        }
    }

    /// Finish configuration and construct the async ingester.
    pub fn build(self) -> Result<AsyncStreamingIngester<'a>, EmbrFSError> {
        let inner = self.inner.build()?;
        Ok(AsyncStreamingIngester { inner })
    }
}
654
/// Chunk-at-a-time file decoder with bounded memory use.
///
/// Supports sequential reads (`read_n_bytes`, `std::io::Read`), chunk
/// iteration (`Iterator`), and absolute seeks.
pub struct StreamingDecoder<'a> {
    /// Source filesystem.
    fs: &'a VersionedEmbrFS,
    /// Path the decoder was opened on (also used as decode context).
    path: String,
    /// Chunk IDs in file order.
    chunks: Vec<ChunkId>,
    /// Decoded file size in bytes.
    file_size: usize,
    /// File version captured at open time.
    version: u64,
    /// Index of the next chunk to decode.
    current_chunk_idx: usize,
    /// Decoded bytes of the chunk currently being consumed.
    current_chunk_data: Vec<u8>,
    /// Read offset within `current_chunk_data`.
    position_in_chunk: usize,
    /// Absolute bytes read so far.
    total_bytes_read: usize,
    /// Whether the file entry declares a non-`None` compression codec.
    #[allow(dead_code)]
    is_compressed: bool,
    /// Raw codec byte from the file entry, if any.
    #[allow(dead_code)]
    compression_codec: Option<u8>,
    /// Encoding-format tag from the file entry (e.g. reversible VSA).
    encoding_format: Option<u8>,
}
715
716impl<'a> StreamingDecoder<'a> {
717 pub fn new(fs: &'a VersionedEmbrFS, path: &str) -> Result<Self, EmbrFSError> {
719 let (file_entry, _) = fs
720 .manifest
721 .get_file(path)
722 .ok_or_else(|| EmbrFSError::FileNotFound(path.to_string()))?;
723
724 if file_entry.deleted {
725 return Err(EmbrFSError::FileNotFound(path.to_string()));
726 }
727
728 Ok(Self {
729 fs,
730 path: path.to_string(),
731 chunks: file_entry.chunks.clone(),
732 file_size: file_entry.size,
733 version: file_entry.version,
734 current_chunk_idx: 0,
735 current_chunk_data: Vec::new(),
736 position_in_chunk: 0,
737 total_bytes_read: 0,
738 is_compressed: file_entry
739 .compression_codec
740 .map(|c| c != 0)
741 .unwrap_or(false),
742 compression_codec: file_entry.compression_codec,
743 encoding_format: file_entry.encoding_format,
744 })
745 }
746
747 pub fn version(&self) -> u64 {
749 self.version
750 }
751
752 pub fn file_size(&self) -> usize {
754 self.file_size
755 }
756
757 pub fn chunk_count(&self) -> usize {
759 self.chunks.len()
760 }
761
762 pub fn position(&self) -> usize {
764 self.total_bytes_read
765 }
766
767 pub fn is_exhausted(&self) -> bool {
769 self.total_bytes_read >= self.file_size
770 }
771
772 pub fn progress(&self) -> StreamingDecodeProgress {
774 StreamingDecodeProgress {
775 bytes_read: self.total_bytes_read,
776 total_bytes: self.file_size,
777 chunks_decoded: self.current_chunk_idx,
778 total_chunks: self.chunks.len(),
779 }
780 }
781
782 fn decode_next_chunk(&mut self) -> Result<Option<Vec<u8>>, EmbrFSError> {
786 if self.current_chunk_idx >= self.chunks.len() {
787 return Ok(None);
788 }
789
790 let chunk_id = self.chunks[self.current_chunk_idx];
791
792 let (chunk, _) = self
794 .fs
795 .chunk_store
796 .get(chunk_id)
797 .ok_or(EmbrFSError::ChunkNotFound(chunk_id))?;
798
799 let decoded = if self.encoding_format == Some(ENCODING_FORMAT_REVERSIBLE_VSA) {
801 self.fs
802 .reversible_encoder()
803 .read()
804 .unwrap()
805 .decode(&chunk.vector, chunk.original_size)
806 } else {
807 chunk
808 .vector
809 .decode_data(self.fs.config(), Some(&self.path), chunk.original_size)
810 };
811
812 let corrected = self
814 .fs
815 .corrections
816 .get(chunk_id as u64)
817 .map(|(corr, _)| corr.apply(&decoded))
818 .unwrap_or(decoded);
819
820 self.current_chunk_idx += 1;
821
822 Ok(Some(corrected))
823 }
824
825 pub fn seek_to(&mut self, position: usize) -> Result<(), EmbrFSError> {
829 if position >= self.file_size {
830 self.current_chunk_idx = self.chunks.len();
831 self.current_chunk_data.clear();
832 self.position_in_chunk = 0;
833 self.total_bytes_read = self.file_size;
834 return Ok(());
835 }
836
837 let chunk_size = DEFAULT_CHUNK_SIZE;
839 let target_chunk = position / chunk_size;
840 let offset_in_chunk = position % chunk_size;
841
842 self.current_chunk_idx = target_chunk;
844 self.current_chunk_data.clear();
845 self.position_in_chunk = offset_in_chunk;
846 self.total_bytes_read = position;
847
848 if offset_in_chunk > 0 {
850 if let Some(data) = self.decode_next_chunk()? {
851 self.current_chunk_data = data;
852 self.current_chunk_idx -= 1; }
854 }
855
856 Ok(())
857 }
858
859 pub fn read_n_bytes(&mut self, n: usize) -> Result<Vec<u8>, EmbrFSError> {
863 let mut result = Vec::with_capacity(n);
864 let remaining_in_file = self.file_size.saturating_sub(self.total_bytes_read);
865 let to_read = n.min(remaining_in_file);
866
867 while result.len() < to_read {
868 if self.position_in_chunk >= self.current_chunk_data.len() {
870 match self.decode_next_chunk()? {
872 Some(data) => {
873 self.current_chunk_data = data;
874 self.position_in_chunk = 0;
875 }
876 None => break, }
878 }
879
880 let available = self.current_chunk_data.len() - self.position_in_chunk;
882 let needed = to_read - result.len();
883 let copy_len = available.min(needed);
884
885 result.extend_from_slice(
886 &self.current_chunk_data[self.position_in_chunk..self.position_in_chunk + copy_len],
887 );
888 self.position_in_chunk += copy_len;
889 self.total_bytes_read += copy_len;
890 }
891
892 let max_len = self
894 .file_size
895 .saturating_sub(self.total_bytes_read - result.len());
896 if result.len() > max_len {
897 result.truncate(max_len);
898 }
899
900 Ok(result)
901 }
902}
903
904impl std::io::Read for StreamingDecoder<'_> {
905 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
906 if self.is_exhausted() {
907 return Ok(0);
908 }
909
910 let data = self
911 .read_n_bytes(buf.len())
912 .map_err(|e| std::io::Error::other(e.to_string()))?;
913
914 let len = data.len();
915 buf[..len].copy_from_slice(&data);
916 Ok(len)
917 }
918}
919
920impl<'a> Iterator for StreamingDecoder<'a> {
922 type Item = Result<Vec<u8>, EmbrFSError>;
923
924 fn next(&mut self) -> Option<Self::Item> {
925 if self.is_exhausted() {
926 return None;
927 }
928
929 match self.decode_next_chunk() {
930 Ok(Some(mut data)) => {
931 let remaining = self.file_size.saturating_sub(self.total_bytes_read);
933 if data.len() > remaining {
934 data.truncate(remaining);
935 }
936 self.total_bytes_read += data.len();
937 self.current_chunk_data.clear();
938 self.position_in_chunk = 0;
939 Some(Ok(data))
940 }
941 Ok(None) => None,
942 Err(e) => Some(Err(e)),
943 }
944 }
945}
946
/// Point-in-time snapshot of a streaming decode.
#[derive(Debug, Clone, Copy)]
pub struct StreamingDecodeProgress {
    /// Bytes delivered to the caller so far.
    pub bytes_read: usize,
    /// Decoded size of the whole file.
    pub total_bytes: usize,
    /// Chunks decoded so far.
    pub chunks_decoded: usize,
    /// Total chunks backing the file.
    pub total_chunks: usize,
}
959
960impl StreamingDecodeProgress {
961 pub fn percentage(&self) -> f64 {
963 if self.total_bytes == 0 {
964 1.0
965 } else {
966 self.bytes_read as f64 / self.total_bytes as f64
967 }
968 }
969}
970
/// Builder for [`StreamingDecoder`] with an optional start offset and limit.
pub struct StreamingDecoderBuilder<'a> {
    /// Source filesystem.
    fs: &'a VersionedEmbrFS,
    /// File to decode.
    path: String,
    /// Absolute byte offset to seek to before the first read, if any.
    start_offset: Option<usize>,
    /// Requested read limit in bytes.
    // NOTE(review): stored but never applied by `build()` — see note there.
    max_bytes: Option<usize>,
}
978
979impl<'a> StreamingDecoderBuilder<'a> {
980 pub fn new(fs: &'a VersionedEmbrFS, path: impl Into<String>) -> Self {
982 Self {
983 fs,
984 path: path.into(),
985 start_offset: None,
986 max_bytes: None,
987 }
988 }
989
990 pub fn with_offset(mut self, offset: usize) -> Self {
992 self.start_offset = Some(offset);
993 self
994 }
995
996 pub fn with_max_bytes(mut self, max: usize) -> Self {
998 self.max_bytes = Some(max);
999 self
1000 }
1001
1002 pub fn build(self) -> Result<StreamingDecoder<'a>, EmbrFSError> {
1004 let mut decoder = StreamingDecoder::new(self.fs, &self.path)?;
1005
1006 if let Some(offset) = self.start_offset {
1007 decoder.seek_to(offset)?;
1008 }
1009
1010 Ok(decoder)
1013 }
1014}
1015
#[cfg(test)]
mod tests {
    use super::*;

    // Sub-chunk file round-trips through streaming ingest + read_file.
    #[test]
    fn test_streaming_small_file() {
        let fs = VersionedEmbrFS::new();
        let data = b"Hello, streaming world!";

        let mut ingester = StreamingIngester::builder(&fs)
            .with_path("test.txt")
            .build()
            .unwrap();

        ingester.ingest_bytes(data).unwrap();
        let result = ingester.finalize().unwrap();

        assert_eq!(result.path, "test.txt");
        assert_eq!(result.total_bytes, data.len());
        assert!(result.chunk_count >= 1);

        let (content, _) = fs.read_file("test.txt").unwrap();
        assert_eq!(&content[..], data);
    }

    // Multi-chunk file fed in small slices round-trips intact.
    #[test]
    fn test_streaming_large_file() {
        let fs = VersionedEmbrFS::new();

        // 3 full chunks plus a 500-byte tail.
        let data: Vec<u8> = (0..DEFAULT_CHUNK_SIZE * 3 + 500)
            .map(|i| (i % 256) as u8)
            .collect();

        let mut ingester = StreamingIngester::builder(&fs)
            .with_path("large.bin")
            .with_chunk_size(DEFAULT_CHUNK_SIZE)
            .build()
            .unwrap();

        // Feed in 1 KiB slices to exercise buffering across chunk boundaries.
        for chunk in data.chunks(1024) {
            ingester.ingest_bytes(chunk).unwrap();
        }

        let result = ingester.finalize().unwrap();

        assert_eq!(result.total_bytes, data.len());
        assert!(result.chunk_count >= 3); let (content, _) = fs.read_file("large.bin").unwrap();
        assert_eq!(content, data);
    }

    // progress() tracks bytes, buffer fill, and chunks across a boundary.
    #[test]
    fn test_streaming_progress() {
        let fs = VersionedEmbrFS::new();

        let mut ingester = StreamingIngester::builder(&fs)
            .with_path("progress.txt")
            .with_chunk_size(1024)
            .build()
            .unwrap();

        let p1 = ingester.progress();
        assert_eq!(p1.bytes_processed, 0);
        assert_eq!(p1.chunks_created, 0);

        // 500 bytes: buffered, no chunk yet (chunk size is 1024).
        ingester.ingest_bytes(&[0u8; 500]).unwrap();
        let p2 = ingester.progress();
        assert_eq!(p2.bytes_processed, 500);
        assert_eq!(p2.buffer_usage, 500);
        assert_eq!(p2.chunks_created, 0); ingester.ingest_bytes(&[0u8; 600]).unwrap();
        // 500 + 600 = 1100 crosses the 1024 boundary -> one chunk emitted.
        let p3 = ingester.progress();
        assert_eq!(p3.bytes_processed, 1100);
        assert_eq!(p3.chunks_created, 1);
    }

    // ingest_reader drains a std::io::Read source to EOF.
    #[test]
    fn test_streaming_reader() {
        use std::io::Cursor;

        let fs = VersionedEmbrFS::new();
        let data = b"Data from a reader interface!";
        let reader = Cursor::new(data);

        let mut ingester = StreamingIngester::builder(&fs)
            .with_path("from_reader.txt")
            .build()
            .unwrap();

        ingester.ingest_reader(reader).unwrap();
        let result = ingester.finalize().unwrap();

        assert_eq!(result.total_bytes, data.len());

        let (content, _) = fs.read_file("from_reader.txt").unwrap();
        assert_eq!(&content[..], data);
    }

    // Decoder over-read clamps at EOF and reports exhaustion.
    #[test]
    fn test_streaming_decoder_small_file() {
        let fs = VersionedEmbrFS::new();
        let data = b"Hello, streaming decoder!";

        fs.write_file("decode_test.txt", data, None).unwrap();

        let mut decoder = StreamingDecoder::new(&fs, "decode_test.txt").unwrap();

        assert_eq!(decoder.file_size(), data.len());
        assert_eq!(decoder.position(), 0);
        assert!(!decoder.is_exhausted());

        // Ask for more than the file holds: should return exactly the file.
        let read_data = decoder.read_n_bytes(data.len() + 10).unwrap();

        assert_eq!(&read_data[..], data);
        assert!(decoder.is_exhausted());
    }

    // std::io::Read adapter delivers the full file in one call here.
    #[test]
    fn test_streaming_decoder_read_trait() {
        use std::io::Read;

        let fs = VersionedEmbrFS::new();
        let data = b"Read trait test data";

        fs.write_file("read_trait.txt", data, None).unwrap();

        let mut decoder = StreamingDecoder::new(&fs, "read_trait.txt").unwrap();
        let mut buf = vec![0u8; 100];

        let bytes_read = decoder.read(&mut buf).unwrap();

        assert_eq!(bytes_read, data.len());
        assert_eq!(&buf[..bytes_read], data);
    }

    // Iterator yields chunk-sized pieces that concatenate to the file.
    #[test]
    fn test_streaming_decoder_iterator() {
        let fs = VersionedEmbrFS::new();

        let data: Vec<u8> = (0..DEFAULT_CHUNK_SIZE * 2 + 100)
            .map(|i| (i % 256) as u8)
            .collect();

        fs.write_file("iterator_test.bin", &data, None).unwrap();

        let decoder = StreamingDecoder::new(&fs, "iterator_test.bin").unwrap();

        let chunks: Vec<Vec<u8>> = decoder.map(|r| r.unwrap()).collect();

        assert!(chunks.len() >= 2);

        let total: Vec<u8> = chunks.into_iter().flatten().collect();
        assert_eq!(total, data);
    }

    // Sequential partial reads advance the position correctly.
    #[test]
    fn test_streaming_decoder_partial_read() {
        let fs = VersionedEmbrFS::new();
        let data: Vec<u8> = (0..1000).map(|i| (i % 256) as u8).collect();

        fs.write_file("partial.bin", &data, None).unwrap();

        let mut decoder = StreamingDecoder::new(&fs, "partial.bin").unwrap();

        let first_100 = decoder.read_n_bytes(100).unwrap();
        assert_eq!(first_100.len(), 100);
        assert_eq!(&first_100[..], &data[..100]);

        assert_eq!(decoder.position(), 100);

        let next_50 = decoder.read_n_bytes(50).unwrap();
        assert_eq!(next_50.len(), 50);
        assert_eq!(&next_50[..], &data[100..150]);
    }

    // Seek into the middle of a chunk, then read from there.
    #[test]
    fn test_streaming_decoder_seek() {
        let fs = VersionedEmbrFS::new();
        let data: Vec<u8> = (0..DEFAULT_CHUNK_SIZE * 3)
            .map(|i| (i % 256) as u8)
            .collect();

        fs.write_file("seek_test.bin", &data, None).unwrap();

        let mut decoder = StreamingDecoder::new(&fs, "seek_test.bin").unwrap();

        // Mid-chunk position inside the second chunk.
        let seek_pos = DEFAULT_CHUNK_SIZE + 500;
        decoder.seek_to(seek_pos).unwrap();

        assert_eq!(decoder.position(), seek_pos);

        let read_data = decoder.read_n_bytes(100).unwrap();
        assert_eq!(&read_data[..], &data[seek_pos..seek_pos + 100]);
    }

    // percentage() tracks 0% -> 50% -> 100% across reads.
    #[test]
    fn test_streaming_decoder_progress() {
        let fs = VersionedEmbrFS::new();
        let data: Vec<u8> = (0..1000).map(|i| (i % 256) as u8).collect();

        fs.write_file("progress_decode.bin", &data, None).unwrap();

        let mut decoder = StreamingDecoder::new(&fs, "progress_decode.bin").unwrap();

        let p1 = decoder.progress();
        assert_eq!(p1.bytes_read, 0);
        assert_eq!(p1.total_bytes, 1000);
        assert!((p1.percentage() - 0.0).abs() < 0.001);

        decoder.read_n_bytes(500).unwrap();

        let p2 = decoder.progress();
        assert_eq!(p2.bytes_read, 500);
        assert!((p2.percentage() - 0.5).abs() < 0.001);

        decoder.read_n_bytes(500).unwrap();

        let p3 = decoder.progress();
        assert_eq!(p3.bytes_read, 1000);
        assert!((p3.percentage() - 1.0).abs() < 0.001);
    }

    // Builder's with_offset seeks before the first read.
    #[test]
    fn test_streaming_decoder_builder() {
        let fs = VersionedEmbrFS::new();
        let data: Vec<u8> = (0..500).map(|i| (i % 256) as u8).collect();

        fs.write_file("builder_decode.bin", &data, None).unwrap();

        let mut decoder = StreamingDecoderBuilder::new(&fs, "builder_decode.bin")
            .with_offset(100)
            .build()
            .unwrap();

        assert_eq!(decoder.position(), 100);

        let read_data = decoder.read_n_bytes(50).unwrap();
        assert_eq!(&read_data[..], &data[100..150]);
    }

    // fs.stream_decode opens a decoder over an existing file.
    #[test]
    fn test_stream_decode_convenience_method() {
        let fs = VersionedEmbrFS::new();
        let data = b"Testing stream_decode convenience method";

        fs.write_file("convenience.txt", data, None).unwrap();

        let mut decoder = fs.stream_decode("convenience.txt").unwrap();

        assert_eq!(decoder.file_size(), data.len());
        let read_data = decoder.read_n_bytes(data.len()).unwrap();
        assert_eq!(&read_data[..], data);
    }

    // fs.stream_decode_range starts at the given offset.
    #[test]
    fn test_stream_decode_range_convenience_method() {
        let fs = VersionedEmbrFS::new();
        let data: Vec<u8> = (0..1000).map(|i| (i % 256) as u8).collect();

        fs.write_file("range_convenience.bin", &data, None).unwrap();

        let mut decoder = fs
            .stream_decode_range("range_convenience.bin", 500, Some(200))
            .unwrap();

        assert_eq!(decoder.position(), 500);
        let read_data = decoder.read_n_bytes(200).unwrap();
        assert_eq!(&read_data[..], &data[500..700]);
    }

    // Iterating a many-chunk file yields chunk-bounded pieces summing to the
    // file; a fresh decoder can also read it all in one call.
    #[test]
    fn test_streaming_decoder_memory_bounded() {
        let fs = VersionedEmbrFS::new();

        let num_chunks = 50;
        let data: Vec<u8> = (0..DEFAULT_CHUNK_SIZE * num_chunks)
            .map(|i| (i % 256) as u8)
            .collect();

        fs.write_file("memory_bounded.bin", &data, None).unwrap();

        let mut decoder = StreamingDecoder::new(&fs, "memory_bounded.bin").unwrap();

        assert_eq!(decoder.chunk_count(), num_chunks);
        assert_eq!(decoder.file_size(), data.len());

        let mut total_read = 0;
        let mut chunk_count = 0;
        for chunk_result in &mut decoder {
            let chunk_data = chunk_result.unwrap();
            // Memory bound: no yielded piece exceeds one chunk.
            assert!(chunk_data.len() <= DEFAULT_CHUNK_SIZE);
            total_read += chunk_data.len();
            chunk_count += 1;
        }

        assert_eq!(total_read, data.len());
        assert_eq!(chunk_count, num_chunks);

        let mut decoder2 = StreamingDecoder::new(&fs, "memory_bounded.bin").unwrap();
        let full_data = decoder2.read_n_bytes(data.len()).unwrap();
        assert_eq!(full_data, data);
    }

    // Multiple seeks (forward, then back) each land on the right bytes.
    #[test]
    fn test_streaming_decoder_seek_across_chunks() {
        let fs = VersionedEmbrFS::new();

        let data: Vec<u8> = (0..DEFAULT_CHUNK_SIZE * 5)
            .map(|i| (i % 256) as u8)
            .collect();

        fs.write_file("seek_multi.bin", &data, None).unwrap();

        let mut decoder = StreamingDecoder::new(&fs, "seek_multi.bin").unwrap();

        let seek_pos = DEFAULT_CHUNK_SIZE * 2 + 100;
        decoder.seek_to(seek_pos).unwrap();
        assert_eq!(decoder.position(), seek_pos);

        let read_data = decoder.read_n_bytes(500).unwrap();
        assert_eq!(&read_data[..], &data[seek_pos..seek_pos + 500]);

        // Seek backwards to near the start.
        decoder.seek_to(50).unwrap();
        assert_eq!(decoder.position(), 50);

        let read_data2 = decoder.read_n_bytes(100).unwrap();
        assert_eq!(&read_data2[..], &data[50..150]);

        // Seek to EOF exhausts the stream.
        decoder.seek_to(data.len()).unwrap();
        assert!(decoder.is_exhausted());
    }
}