use crate::correction::ChunkCorrection;
use crate::versioned::{ChunkId, VersionedChunk, VersionedFileEntry};
use crate::versioned_embrfs::{
EmbrFSError, VersionedEmbrFS, DEFAULT_CHUNK_SIZE, ENCODING_FORMAT_REVERSIBLE_VSA,
};
use embeddenator_io::{wrap_or_legacy, BinaryWriteOptions, CompressionCodec, PayloadKind};
use embeddenator_vsa::SparseVec;
use sha2::{Digest, Sha256};
use std::io::{BufRead, Read};
use std::sync::atomic::{AtomicU64, Ordering};
#[derive(Debug, Clone)]
pub struct StreamingResult {
pub path: String,
pub total_bytes: usize,
pub chunk_count: usize,
pub version: u64,
pub correction_savings: usize,
}
struct PendingChunk {
chunk_id: ChunkId,
data: Vec<u8>,
vector: SparseVec,
correction: ChunkCorrection,
}
pub struct StreamingIngesterBuilder<'a> {
fs: &'a VersionedEmbrFS,
chunk_size: usize,
path: Option<String>,
expected_version: Option<u64>,
compression: Option<CompressionCodec>,
adaptive_chunking: bool,
correction_threshold: f64,
}
impl<'a> StreamingIngesterBuilder<'a> {
pub fn new(fs: &'a VersionedEmbrFS) -> Self {
Self {
fs,
chunk_size: DEFAULT_CHUNK_SIZE,
path: None,
expected_version: None,
compression: None,
adaptive_chunking: false,
correction_threshold: 0.1,
}
}
pub fn with_chunk_size(mut self, size: usize) -> Self {
self.chunk_size = size.clamp(512, 1024 * 1024); self
}
pub fn with_path(mut self, path: impl Into<String>) -> Self {
self.path = Some(path.into());
self
}
pub fn with_expected_version(mut self, version: u64) -> Self {
self.expected_version = Some(version);
self
}
pub fn with_compression(mut self, codec: CompressionCodec) -> Self {
self.compression = Some(codec);
self
}
pub fn with_adaptive_chunking(mut self, enabled: bool) -> Self {
self.adaptive_chunking = enabled;
self
}
pub fn with_correction_threshold(mut self, threshold: f64) -> Self {
self.correction_threshold = threshold.clamp(0.0, 1.0);
self
}
pub fn build(self) -> Result<StreamingIngester<'a>, EmbrFSError> {
let path = self.path.ok_or_else(|| {
EmbrFSError::InvalidOperation("Path must be specified for streaming ingestion".into())
})?;
Ok(StreamingIngester {
fs: self.fs,
path,
chunk_size: self.chunk_size,
expected_version: self.expected_version,
compression: self.compression,
adaptive_chunking: self.adaptive_chunking,
correction_threshold: self.correction_threshold,
buffer: Vec::with_capacity(self.chunk_size),
pending_chunks: Vec::new(),
chunk_ids: Vec::new(),
total_bytes: AtomicU64::new(0),
hasher: Sha256::new(),
correction_bytes: AtomicU64::new(0),
})
}
}
pub struct StreamingIngester<'a> {
fs: &'a VersionedEmbrFS,
path: String,
chunk_size: usize,
expected_version: Option<u64>,
compression: Option<CompressionCodec>,
adaptive_chunking: bool,
correction_threshold: f64,
buffer: Vec<u8>,
pending_chunks: Vec<PendingChunk>,
chunk_ids: Vec<ChunkId>,
total_bytes: AtomicU64,
hasher: Sha256,
correction_bytes: AtomicU64,
}
impl<'a> StreamingIngester<'a> {
pub fn builder(fs: &'a VersionedEmbrFS) -> StreamingIngesterBuilder<'a> {
StreamingIngesterBuilder::new(fs)
}
pub fn ingest_reader<R: Read>(&mut self, mut reader: R) -> Result<(), EmbrFSError> {
let mut read_buf = vec![0u8; self.chunk_size];
loop {
let bytes_read = reader
.read(&mut read_buf)
.map_err(|e| EmbrFSError::IoError(e.to_string()))?;
if bytes_read == 0 {
break; }
self.ingest_bytes(&read_buf[..bytes_read])?;
}
Ok(())
}
pub fn ingest_buffered<R: BufRead>(&mut self, mut reader: R) -> Result<(), EmbrFSError> {
loop {
let buf = reader
.fill_buf()
.map_err(|e| EmbrFSError::IoError(e.to_string()))?;
if buf.is_empty() {
break; }
let len = buf.len();
self.ingest_bytes(buf)?;
reader.consume(len);
}
Ok(())
}
pub fn ingest_bytes(&mut self, data: &[u8]) -> Result<(), EmbrFSError> {
self.total_bytes
.fetch_add(data.len() as u64, Ordering::Relaxed);
self.hasher.update(data);
self.buffer.extend_from_slice(data);
while self.buffer.len() >= self.chunk_size {
let chunk_data: Vec<u8> = self.buffer.drain(..self.chunk_size).collect();
self.process_chunk(chunk_data)?;
}
Ok(())
}
pub fn finalize(mut self) -> Result<StreamingResult, EmbrFSError> {
if !self.buffer.is_empty() {
let remaining = std::mem::take(&mut self.buffer);
self.process_chunk(remaining)?;
}
let total_bytes = self.total_bytes.load(Ordering::Relaxed) as usize;
let chunk_count = self.chunk_ids.len();
let correction_bytes = self.correction_bytes.load(Ordering::Relaxed) as usize;
let existing = self.fs.manifest.get_file(&self.path);
match (&existing, self.expected_version) {
(Some((entry, _)), Some(expected_ver)) => {
if entry.version != expected_ver {
return Err(EmbrFSError::VersionMismatch {
expected: expected_ver,
actual: entry.version,
});
}
}
(Some(_), None) => {
return Err(EmbrFSError::FileExists(self.path.clone()));
}
(None, Some(_)) => {
return Err(EmbrFSError::FileNotFound(self.path.clone()));
}
(None, None) => {}
}
let chunk_updates: Vec<_> = self
.pending_chunks
.iter()
.map(|pc| {
let mut hash_bytes = [0u8; 8];
let mut hasher = Sha256::new();
hasher.update(&pc.data);
let hash = hasher.finalize();
hash_bytes.copy_from_slice(&hash[0..8]);
(
pc.chunk_id,
VersionedChunk::new(pc.vector.clone(), pc.data.len(), hash_bytes),
)
})
.collect();
self.fs.chunk_store.batch_insert_new(chunk_updates)?;
let corrections: Vec<_> = self
.pending_chunks
.into_iter()
.map(|pc| (pc.chunk_id as u64, pc.correction))
.collect();
self.fs.corrections.batch_insert_new(corrections)?;
let is_text = total_bytes > 0 && is_likely_text(total_bytes);
let mut file_entry = if let Some(codec) = self.compression {
let codec_byte = match codec {
CompressionCodec::None => 0,
CompressionCodec::Zstd => 1,
CompressionCodec::Lz4 => 2,
};
VersionedFileEntry::new_compressed(
self.path.clone(),
is_text,
total_bytes, total_bytes, codec_byte,
self.chunk_ids.clone(),
)
} else {
VersionedFileEntry::new(
self.path.clone(),
is_text,
total_bytes,
self.chunk_ids.clone(),
)
};
if self.fs.is_holographic() {
file_entry.encoding_format = Some(ENCODING_FORMAT_REVERSIBLE_VSA);
}
let version = if let Some((entry, _)) = existing {
self.fs
.manifest
.update_file(&self.path, file_entry, entry.version)?;
entry.version + 1
} else {
self.fs.manifest.add_file(file_entry)?;
0
};
self.fs.bundle_chunks_to_root_streaming(&self.chunk_ids)?;
Ok(StreamingResult {
path: self.path,
total_bytes,
chunk_count,
version,
correction_savings: correction_bytes,
})
}
fn process_chunk(&mut self, data: Vec<u8>) -> Result<(), EmbrFSError> {
let chunk_id = self.fs.allocate_chunk_id();
let encoded_data = if let Some(codec) = self.compression {
if codec != CompressionCodec::None {
let write_opts = BinaryWriteOptions { codec, level: None };
wrap_or_legacy(PayloadKind::EngramBincode, write_opts, &data)
.map_err(|e| EmbrFSError::IoError(format!("Compression failed: {}", e)))?
} else {
data.clone()
}
} else {
data.clone()
};
let chunk_vec = if self.fs.is_holographic() {
self.fs
.reversible_encoder()
.write()
.unwrap()
.encode(&encoded_data)
} else {
SparseVec::encode_data(&encoded_data, self.fs.config(), Some(&self.path))
};
let decoded = if self.fs.is_holographic() {
self.fs
.reversible_encoder()
.read()
.unwrap()
.decode(&chunk_vec, encoded_data.len())
} else {
chunk_vec.decode_data(self.fs.config(), Some(&self.path), encoded_data.len())
};
let correction = ChunkCorrection::new(chunk_id as u64, &encoded_data, &decoded);
let corr_size = correction.storage_size();
self.correction_bytes
.fetch_add(corr_size as u64, Ordering::Relaxed);
if self.adaptive_chunking {
let correction_ratio = corr_size as f64 / encoded_data.len() as f64;
if correction_ratio > self.correction_threshold {
eprintln!(
"Warning: High correction ratio {:.2}% for chunk {} - consider adjusting parameters",
correction_ratio * 100.0,
chunk_id
);
}
}
self.pending_chunks.push(PendingChunk {
chunk_id,
data: encoded_data,
vector: chunk_vec,
correction,
});
self.chunk_ids.push(chunk_id);
Ok(())
}
pub fn progress(&self) -> StreamingProgress {
StreamingProgress {
bytes_processed: self.total_bytes.load(Ordering::Relaxed) as usize,
chunks_created: self.chunk_ids.len(),
buffer_usage: self.buffer.len(),
correction_overhead: self.correction_bytes.load(Ordering::Relaxed) as usize,
}
}
}
#[derive(Debug, Clone)]
pub struct StreamingProgress {
pub bytes_processed: usize,
pub chunks_created: usize,
pub buffer_usage: usize,
pub correction_overhead: usize,
}
fn is_likely_text(size: usize) -> bool {
size < 1024 * 1024
}
#[cfg(feature = "tokio")]
pub struct AsyncStreamingIngester<'a> {
inner: StreamingIngester<'a>,
}
#[cfg(feature = "tokio")]
impl<'a> AsyncStreamingIngester<'a> {
pub fn builder(fs: &'a VersionedEmbrFS) -> AsyncStreamingIngesterBuilder<'a> {
AsyncStreamingIngesterBuilder {
inner: StreamingIngesterBuilder::new(fs),
}
}
pub async fn ingest_async_reader<R>(&mut self, mut reader: R) -> Result<(), EmbrFSError>
where
R: tokio::io::AsyncRead + Unpin,
{
use tokio::io::AsyncReadExt;
let mut buf = vec![0u8; self.inner.chunk_size];
loop {
let n = reader
.read(&mut buf)
.await
.map_err(|e| EmbrFSError::IoError(e.to_string()))?;
if n == 0 {
break;
}
self.inner.ingest_bytes(&buf[..n])?;
}
Ok(())
}
pub async fn ingest_async_buffered<R>(&mut self, mut reader: R) -> Result<(), EmbrFSError>
where
R: tokio::io::AsyncBufRead + Unpin,
{
use tokio::io::AsyncBufReadExt;
loop {
let buf = reader
.fill_buf()
.await
.map_err(|e| EmbrFSError::IoError(e.to_string()))?;
if buf.is_empty() {
break;
}
let len = buf.len();
self.inner.ingest_bytes(buf)?;
reader.consume(len);
}
Ok(())
}
pub fn ingest_bytes(&mut self, data: &[u8]) -> Result<(), EmbrFSError> {
self.inner.ingest_bytes(data)
}
pub fn progress(&self) -> StreamingProgress {
self.inner.progress()
}
pub fn finalize(self) -> Result<StreamingResult, EmbrFSError> {
self.inner.finalize()
}
}
#[cfg(feature = "tokio")]
pub struct AsyncStreamingIngesterBuilder<'a> {
inner: StreamingIngesterBuilder<'a>,
}
#[cfg(feature = "tokio")]
impl<'a> AsyncStreamingIngesterBuilder<'a> {
pub fn with_path(mut self, path: impl Into<String>) -> Self {
self.inner = self.inner.with_path(path);
self
}
pub fn with_chunk_size(mut self, size: usize) -> Self {
self.inner = self.inner.with_chunk_size(size);
self
}
pub fn with_expected_version(mut self, version: u64) -> Self {
self.inner = self.inner.with_expected_version(version);
self
}
pub fn with_compression(mut self, codec: CompressionCodec) -> Self {
self.inner = self.inner.with_compression(codec);
self
}
pub fn with_adaptive_chunking(mut self, enabled: bool) -> Self {
self.inner = self.inner.with_adaptive_chunking(enabled);
self
}
pub fn with_correction_threshold(mut self, threshold: f64) -> Self {
self.inner = self.inner.with_correction_threshold(threshold);
self
}
pub fn build(self) -> Result<AsyncStreamingIngester<'a>, EmbrFSError> {
Ok(AsyncStreamingIngester {
inner: self.inner.build()?,
})
}
}
pub struct StreamingDecoder<'a> {
fs: &'a VersionedEmbrFS,
path: String,
chunks: Vec<ChunkId>,
file_size: usize,
version: u64,
current_chunk_idx: usize,
current_chunk_data: Vec<u8>,
position_in_chunk: usize,
total_bytes_read: usize,
#[allow(dead_code)]
is_compressed: bool,
#[allow(dead_code)]
compression_codec: Option<u8>,
encoding_format: Option<u8>,
}
impl<'a> StreamingDecoder<'a> {
pub fn new(fs: &'a VersionedEmbrFS, path: &str) -> Result<Self, EmbrFSError> {
let (file_entry, _) = fs
.manifest
.get_file(path)
.ok_or_else(|| EmbrFSError::FileNotFound(path.to_string()))?;
if file_entry.deleted {
return Err(EmbrFSError::FileNotFound(path.to_string()));
}
Ok(Self {
fs,
path: path.to_string(),
chunks: file_entry.chunks.clone(),
file_size: file_entry.size,
version: file_entry.version,
current_chunk_idx: 0,
current_chunk_data: Vec::new(),
position_in_chunk: 0,
total_bytes_read: 0,
is_compressed: file_entry
.compression_codec
.map(|c| c != 0)
.unwrap_or(false),
compression_codec: file_entry.compression_codec,
encoding_format: file_entry.encoding_format,
})
}
pub fn version(&self) -> u64 {
self.version
}
pub fn file_size(&self) -> usize {
self.file_size
}
pub fn chunk_count(&self) -> usize {
self.chunks.len()
}
pub fn position(&self) -> usize {
self.total_bytes_read
}
pub fn is_exhausted(&self) -> bool {
self.total_bytes_read >= self.file_size
}
pub fn progress(&self) -> StreamingDecodeProgress {
StreamingDecodeProgress {
bytes_read: self.total_bytes_read,
total_bytes: self.file_size,
chunks_decoded: self.current_chunk_idx,
total_chunks: self.chunks.len(),
}
}
fn decode_next_chunk(&mut self) -> Result<Option<Vec<u8>>, EmbrFSError> {
if self.current_chunk_idx >= self.chunks.len() {
return Ok(None);
}
let chunk_id = self.chunks[self.current_chunk_idx];
let (chunk, _) = self
.fs
.chunk_store
.get(chunk_id)
.ok_or(EmbrFSError::ChunkNotFound(chunk_id))?;
let decoded = if self.encoding_format == Some(ENCODING_FORMAT_REVERSIBLE_VSA) {
self.fs
.reversible_encoder()
.read()
.unwrap()
.decode(&chunk.vector, chunk.original_size)
} else {
chunk
.vector
.decode_data(self.fs.config(), Some(&self.path), chunk.original_size)
};
let corrected = self
.fs
.corrections
.get(chunk_id as u64)
.map(|(corr, _)| corr.apply(&decoded))
.unwrap_or(decoded);
self.current_chunk_idx += 1;
Ok(Some(corrected))
}
pub fn seek_to(&mut self, position: usize) -> Result<(), EmbrFSError> {
if position >= self.file_size {
self.current_chunk_idx = self.chunks.len();
self.current_chunk_data.clear();
self.position_in_chunk = 0;
self.total_bytes_read = self.file_size;
return Ok(());
}
let chunk_size = DEFAULT_CHUNK_SIZE;
let target_chunk = position / chunk_size;
let offset_in_chunk = position % chunk_size;
self.current_chunk_idx = target_chunk;
self.current_chunk_data.clear();
self.position_in_chunk = offset_in_chunk;
self.total_bytes_read = position;
if offset_in_chunk > 0 {
if let Some(data) = self.decode_next_chunk()? {
self.current_chunk_data = data;
self.current_chunk_idx -= 1; }
}
Ok(())
}
pub fn read_n_bytes(&mut self, n: usize) -> Result<Vec<u8>, EmbrFSError> {
let mut result = Vec::with_capacity(n);
let remaining_in_file = self.file_size.saturating_sub(self.total_bytes_read);
let to_read = n.min(remaining_in_file);
while result.len() < to_read {
if self.position_in_chunk >= self.current_chunk_data.len() {
match self.decode_next_chunk()? {
Some(data) => {
self.current_chunk_data = data;
self.position_in_chunk = 0;
}
None => break, }
}
let available = self.current_chunk_data.len() - self.position_in_chunk;
let needed = to_read - result.len();
let copy_len = available.min(needed);
result.extend_from_slice(
&self.current_chunk_data[self.position_in_chunk..self.position_in_chunk + copy_len],
);
self.position_in_chunk += copy_len;
self.total_bytes_read += copy_len;
}
let max_len = self
.file_size
.saturating_sub(self.total_bytes_read - result.len());
if result.len() > max_len {
result.truncate(max_len);
}
Ok(result)
}
}
impl std::io::Read for StreamingDecoder<'_> {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
if self.is_exhausted() {
return Ok(0);
}
let data = self
.read_n_bytes(buf.len())
.map_err(|e| std::io::Error::other(e.to_string()))?;
let len = data.len();
buf[..len].copy_from_slice(&data);
Ok(len)
}
}
impl<'a> Iterator for StreamingDecoder<'a> {
type Item = Result<Vec<u8>, EmbrFSError>;
fn next(&mut self) -> Option<Self::Item> {
if self.is_exhausted() {
return None;
}
match self.decode_next_chunk() {
Ok(Some(mut data)) => {
let remaining = self.file_size.saturating_sub(self.total_bytes_read);
if data.len() > remaining {
data.truncate(remaining);
}
self.total_bytes_read += data.len();
self.current_chunk_data.clear();
self.position_in_chunk = 0;
Some(Ok(data))
}
Ok(None) => None,
Err(e) => Some(Err(e)),
}
}
}
#[derive(Debug, Clone, Copy)]
pub struct StreamingDecodeProgress {
pub bytes_read: usize,
pub total_bytes: usize,
pub chunks_decoded: usize,
pub total_chunks: usize,
}
impl StreamingDecodeProgress {
pub fn percentage(&self) -> f64 {
if self.total_bytes == 0 {
1.0
} else {
self.bytes_read as f64 / self.total_bytes as f64
}
}
}
pub struct StreamingDecoderBuilder<'a> {
fs: &'a VersionedEmbrFS,
path: String,
start_offset: Option<usize>,
max_bytes: Option<usize>,
}
impl<'a> StreamingDecoderBuilder<'a> {
pub fn new(fs: &'a VersionedEmbrFS, path: impl Into<String>) -> Self {
Self {
fs,
path: path.into(),
start_offset: None,
max_bytes: None,
}
}
pub fn with_offset(mut self, offset: usize) -> Self {
self.start_offset = Some(offset);
self
}
pub fn with_max_bytes(mut self, max: usize) -> Self {
self.max_bytes = Some(max);
self
}
pub fn build(self) -> Result<StreamingDecoder<'a>, EmbrFSError> {
let mut decoder = StreamingDecoder::new(self.fs, &self.path)?;
if let Some(offset) = self.start_offset {
decoder.seek_to(offset)?;
}
Ok(decoder)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_streaming_small_file() {
let fs = VersionedEmbrFS::new();
let data = b"Hello, streaming world!";
let mut ingester = StreamingIngester::builder(&fs)
.with_path("test.txt")
.build()
.unwrap();
ingester.ingest_bytes(data).unwrap();
let result = ingester.finalize().unwrap();
assert_eq!(result.path, "test.txt");
assert_eq!(result.total_bytes, data.len());
assert!(result.chunk_count >= 1);
let (content, _) = fs.read_file("test.txt").unwrap();
assert_eq!(&content[..], data);
}
#[test]
fn test_streaming_large_file() {
let fs = VersionedEmbrFS::new();
let data: Vec<u8> = (0..DEFAULT_CHUNK_SIZE * 3 + 500)
.map(|i| (i % 256) as u8)
.collect();
let mut ingester = StreamingIngester::builder(&fs)
.with_path("large.bin")
.with_chunk_size(DEFAULT_CHUNK_SIZE)
.build()
.unwrap();
for chunk in data.chunks(1024) {
ingester.ingest_bytes(chunk).unwrap();
}
let result = ingester.finalize().unwrap();
assert_eq!(result.total_bytes, data.len());
assert!(result.chunk_count >= 3);
let (content, _) = fs.read_file("large.bin").unwrap();
assert_eq!(content, data);
}
#[test]
fn test_streaming_progress() {
let fs = VersionedEmbrFS::new();
let mut ingester = StreamingIngester::builder(&fs)
.with_path("progress.txt")
.with_chunk_size(1024)
.build()
.unwrap();
let p1 = ingester.progress();
assert_eq!(p1.bytes_processed, 0);
assert_eq!(p1.chunks_created, 0);
ingester.ingest_bytes(&[0u8; 500]).unwrap();
let p2 = ingester.progress();
assert_eq!(p2.bytes_processed, 500);
assert_eq!(p2.buffer_usage, 500);
assert_eq!(p2.chunks_created, 0);
ingester.ingest_bytes(&[0u8; 600]).unwrap();
let p3 = ingester.progress();
assert_eq!(p3.bytes_processed, 1100);
assert_eq!(p3.chunks_created, 1);
}
#[test]
fn test_streaming_reader() {
use std::io::Cursor;
let fs = VersionedEmbrFS::new();
let data = b"Data from a reader interface!";
let reader = Cursor::new(data);
let mut ingester = StreamingIngester::builder(&fs)
.with_path("from_reader.txt")
.build()
.unwrap();
ingester.ingest_reader(reader).unwrap();
let result = ingester.finalize().unwrap();
assert_eq!(result.total_bytes, data.len());
let (content, _) = fs.read_file("from_reader.txt").unwrap();
assert_eq!(&content[..], data);
}
#[test]
fn test_streaming_decoder_small_file() {
let fs = VersionedEmbrFS::new();
let data = b"Hello, streaming decoder!";
fs.write_file("decode_test.txt", data, None).unwrap();
let mut decoder = StreamingDecoder::new(&fs, "decode_test.txt").unwrap();
assert_eq!(decoder.file_size(), data.len());
assert_eq!(decoder.position(), 0);
assert!(!decoder.is_exhausted());
let read_data = decoder.read_n_bytes(data.len() + 10).unwrap();
assert_eq!(&read_data[..], data);
assert!(decoder.is_exhausted());
}
#[test]
fn test_streaming_decoder_read_trait() {
use std::io::Read;
let fs = VersionedEmbrFS::new();
let data = b"Read trait test data";
fs.write_file("read_trait.txt", data, None).unwrap();
let mut decoder = StreamingDecoder::new(&fs, "read_trait.txt").unwrap();
let mut buf = vec![0u8; 100];
let bytes_read = decoder.read(&mut buf).unwrap();
assert_eq!(bytes_read, data.len());
assert_eq!(&buf[..bytes_read], data);
}
#[test]
fn test_streaming_decoder_iterator() {
let fs = VersionedEmbrFS::new();
let data: Vec<u8> = (0..DEFAULT_CHUNK_SIZE * 2 + 100)
.map(|i| (i % 256) as u8)
.collect();
fs.write_file("iterator_test.bin", &data, None).unwrap();
let decoder = StreamingDecoder::new(&fs, "iterator_test.bin").unwrap();
let chunks: Vec<Vec<u8>> = decoder.map(|r| r.unwrap()).collect();
assert!(chunks.len() >= 2);
let total: Vec<u8> = chunks.into_iter().flatten().collect();
assert_eq!(total, data);
}
#[test]
fn test_streaming_decoder_partial_read() {
let fs = VersionedEmbrFS::new();
let data: Vec<u8> = (0..1000).map(|i| (i % 256) as u8).collect();
fs.write_file("partial.bin", &data, None).unwrap();
let mut decoder = StreamingDecoder::new(&fs, "partial.bin").unwrap();
let first_100 = decoder.read_n_bytes(100).unwrap();
assert_eq!(first_100.len(), 100);
assert_eq!(&first_100[..], &data[..100]);
assert_eq!(decoder.position(), 100);
let next_50 = decoder.read_n_bytes(50).unwrap();
assert_eq!(next_50.len(), 50);
assert_eq!(&next_50[..], &data[100..150]);
}
#[test]
fn test_streaming_decoder_seek() {
let fs = VersionedEmbrFS::new();
let data: Vec<u8> = (0..DEFAULT_CHUNK_SIZE * 3)
.map(|i| (i % 256) as u8)
.collect();
fs.write_file("seek_test.bin", &data, None).unwrap();
let mut decoder = StreamingDecoder::new(&fs, "seek_test.bin").unwrap();
let seek_pos = DEFAULT_CHUNK_SIZE + 500;
decoder.seek_to(seek_pos).unwrap();
assert_eq!(decoder.position(), seek_pos);
let read_data = decoder.read_n_bytes(100).unwrap();
assert_eq!(&read_data[..], &data[seek_pos..seek_pos + 100]);
}
#[test]
fn test_streaming_decoder_progress() {
let fs = VersionedEmbrFS::new();
let data: Vec<u8> = (0..1000).map(|i| (i % 256) as u8).collect();
fs.write_file("progress_decode.bin", &data, None).unwrap();
let mut decoder = StreamingDecoder::new(&fs, "progress_decode.bin").unwrap();
let p1 = decoder.progress();
assert_eq!(p1.bytes_read, 0);
assert_eq!(p1.total_bytes, 1000);
assert!((p1.percentage() - 0.0).abs() < 0.001);
decoder.read_n_bytes(500).unwrap();
let p2 = decoder.progress();
assert_eq!(p2.bytes_read, 500);
assert!((p2.percentage() - 0.5).abs() < 0.001);
decoder.read_n_bytes(500).unwrap();
let p3 = decoder.progress();
assert_eq!(p3.bytes_read, 1000);
assert!((p3.percentage() - 1.0).abs() < 0.001);
}
#[test]
fn test_streaming_decoder_builder() {
let fs = VersionedEmbrFS::new();
let data: Vec<u8> = (0..500).map(|i| (i % 256) as u8).collect();
fs.write_file("builder_decode.bin", &data, None).unwrap();
let mut decoder = StreamingDecoderBuilder::new(&fs, "builder_decode.bin")
.with_offset(100)
.build()
.unwrap();
assert_eq!(decoder.position(), 100);
let read_data = decoder.read_n_bytes(50).unwrap();
assert_eq!(&read_data[..], &data[100..150]);
}
#[test]
fn test_stream_decode_convenience_method() {
let fs = VersionedEmbrFS::new();
let data = b"Testing stream_decode convenience method";
fs.write_file("convenience.txt", data, None).unwrap();
let mut decoder = fs.stream_decode("convenience.txt").unwrap();
assert_eq!(decoder.file_size(), data.len());
let read_data = decoder.read_n_bytes(data.len()).unwrap();
assert_eq!(&read_data[..], data);
}
#[test]
fn test_stream_decode_range_convenience_method() {
let fs = VersionedEmbrFS::new();
let data: Vec<u8> = (0..1000).map(|i| (i % 256) as u8).collect();
fs.write_file("range_convenience.bin", &data, None).unwrap();
let mut decoder = fs
.stream_decode_range("range_convenience.bin", 500, Some(200))
.unwrap();
assert_eq!(decoder.position(), 500);
let read_data = decoder.read_n_bytes(200).unwrap();
assert_eq!(&read_data[..], &data[500..700]);
}
#[test]
fn test_streaming_decoder_memory_bounded() {
let fs = VersionedEmbrFS::new();
let num_chunks = 50;
let data: Vec<u8> = (0..DEFAULT_CHUNK_SIZE * num_chunks)
.map(|i| (i % 256) as u8)
.collect();
fs.write_file("memory_bounded.bin", &data, None).unwrap();
let mut decoder = StreamingDecoder::new(&fs, "memory_bounded.bin").unwrap();
assert_eq!(decoder.chunk_count(), num_chunks);
assert_eq!(decoder.file_size(), data.len());
let mut total_read = 0;
let mut chunk_count = 0;
for chunk_result in &mut decoder {
let chunk_data = chunk_result.unwrap();
assert!(chunk_data.len() <= DEFAULT_CHUNK_SIZE);
total_read += chunk_data.len();
chunk_count += 1;
}
assert_eq!(total_read, data.len());
assert_eq!(chunk_count, num_chunks);
let mut decoder2 = StreamingDecoder::new(&fs, "memory_bounded.bin").unwrap();
let full_data = decoder2.read_n_bytes(data.len()).unwrap();
assert_eq!(full_data, data);
}
#[test]
fn test_streaming_decoder_seek_across_chunks() {
let fs = VersionedEmbrFS::new();
let data: Vec<u8> = (0..DEFAULT_CHUNK_SIZE * 5)
.map(|i| (i % 256) as u8)
.collect();
fs.write_file("seek_multi.bin", &data, None).unwrap();
let mut decoder = StreamingDecoder::new(&fs, "seek_multi.bin").unwrap();
let seek_pos = DEFAULT_CHUNK_SIZE * 2 + 100;
decoder.seek_to(seek_pos).unwrap();
assert_eq!(decoder.position(), seek_pos);
let read_data = decoder.read_n_bytes(500).unwrap();
assert_eq!(&read_data[..], &data[seek_pos..seek_pos + 500]);
decoder.seek_to(50).unwrap();
assert_eq!(decoder.position(), 50);
let read_data2 = decoder.read_n_bytes(100).unwrap();
assert_eq!(&read_data2[..], &data[50..150]);
decoder.seek_to(data.len()).unwrap();
assert!(decoder.is_exhausted());
}
}