use std::cmp;
use std::io::{self, BufRead, Read, Seek, SeekFrom, Write};
use crate::error::{Result, ZiporaError};
use crate::memory::SecureMemoryPool;
use crate::memory::simd_ops;
use std::sync::Arc;
/// Tuning parameters shared by `StreamBufferedReader` and `StreamBufferedWriter`.
#[derive(Debug, Clone)]
pub struct StreamBufferConfig {
/// Requested size in bytes of the buffer allocated at construction time.
pub initial_capacity: usize,
/// Upper bound in bytes applied when the reader computes a larger buffer size.
pub max_capacity: usize,
/// Multiplier applied to the current capacity when the reader grows its buffer.
pub growth_factor: f64,
/// Granularity in bytes to which buffer sizes are rounded up on allocation.
pub page_alignment: usize,
/// When `true`, the constructors also create a `SecureMemoryPool` and store it
/// next to the buffer.
pub use_secure_pool: bool,
/// Requests of at least this many bytes are served by talking to the inner
/// stream directly instead of going through the buffer. Despite the name the
/// writer uses the same threshold for `Write::write`.
pub bulk_read_threshold: usize,
/// When `true`, reader refills ask the inner reader for more than the
/// immediate request.
pub enable_readahead: bool,
/// Factor by which a refill request is scaled when read-ahead is enabled.
pub readahead_multiplier: usize,
}
impl Default for StreamBufferConfig {
fn default() -> Self {
Self {
initial_capacity: 64 * 1024, max_capacity: 2 * 1024 * 1024, growth_factor: 1.618, page_alignment: 4096, use_secure_pool: true, bulk_read_threshold: 8192, enable_readahead: true, readahead_multiplier: 2, }
}
}
impl StreamBufferConfig {
    /// Preset with a large buffer (128 KiB, up to 4 MiB), doubling growth, a
    /// 4 KiB bulk threshold and a read-ahead factor of four. Every other field
    /// keeps its default value.
    pub fn performance_optimized() -> Self {
        let base = Self::default();
        Self {
            initial_capacity: 128 * 1024,
            max_capacity: 4 * 1024 * 1024,
            growth_factor: 2.0,
            bulk_read_threshold: 4096,
            enable_readahead: true,
            readahead_multiplier: 4,
            ..base
        }
    }

    /// Preset with a small buffer (16 KiB, up to 512 KiB), gentle growth, a
    /// 16 KiB bulk threshold and read-ahead disabled. Every other field keeps
    /// its default value.
    pub fn memory_efficient() -> Self {
        let base = Self::default();
        Self {
            initial_capacity: 16 * 1024,
            max_capacity: 512 * 1024,
            growth_factor: 1.414,
            bulk_read_threshold: 16384,
            enable_readahead: false,
            readahead_multiplier: 1,
            ..base
        }
    }

    /// Preset with the smallest buffer (8 KiB, up to 256 KiB), a 2 KiB bulk
    /// threshold and read-ahead disabled. Every other field keeps its default
    /// value.
    pub fn low_latency() -> Self {
        let base = Self::default();
        Self {
            initial_capacity: 8 * 1024,
            max_capacity: 256 * 1024,
            growth_factor: 1.5,
            bulk_read_threshold: 2048,
            enable_readahead: false,
            readahead_multiplier: 1,
            ..base
        }
    }
}
/// Buffered reader that wraps an inner reader `R` and serves reads from an
/// internal byte buffer whose size is governed by a `StreamBufferConfig`.
pub struct StreamBufferedReader<R> {
/// The wrapped reader that supplies the bytes.
inner: R,
/// Backing storage; the unread bytes live in `buffer[pos..end]`.
buffer: Box<[u8]>,
// `pos` is the index of the next unread byte, `end` is one past the last
// valid byte in `buffer`, and `config` holds the tuning parameters this
// reader was created with.
pos: usize, end: usize, config: StreamBufferConfig,
// `total_read` counts the bytes obtained from `inner` so far. `pool` is the
// secure pool created when `config.use_secure_pool` is set; the code in this
// file only stores it.
total_read: u64, pool: Option<Arc<SecureMemoryPool>>,
}
impl<R: Read> StreamBufferedReader<R> {
    /// Wraps `inner` in a reader configured with `StreamBufferConfig::default()`.
    ///
    /// # Errors
    /// Propagates any error from [`Self::with_config`].
    pub fn new(inner: R) -> Result<Self> {
        Self::with_config(inner, StreamBufferConfig::default())
    }

    /// Wraps `inner` in a reader that uses the supplied `config`.
    ///
    /// When `config.use_secure_pool` is set a `SecureMemoryPool` is created and
    /// stored with the reader. The byte buffer is a zero-initialised heap
    /// allocation of `config.initial_capacity` bytes rounded up to a multiple
    /// of `config.page_alignment`.
    ///
    /// # Errors
    /// Fails if the pool cannot be created or the rounded buffer size does not
    /// fit in `usize`.
    pub fn with_config(inner: R, config: StreamBufferConfig) -> Result<Self> {
        let pool = if config.use_secure_pool {
            Some(SecureMemoryPool::new(
                crate::memory::SecurePoolConfig::small_secure(),
            )?)
        } else {
            None
        };
        let buffer =
            Self::allocate_aligned_buffer(config.initial_capacity, config.page_alignment)?;
        Ok(Self {
            inner,
            buffer,
            pos: 0,
            end: 0,
            config,
            total_read: 0,
            pool,
        })
    }

    /// Shorthand for [`Self::with_config`] with
    /// `StreamBufferConfig::performance_optimized()`.
    pub fn performance_optimized(inner: R) -> Result<Self> {
        Self::with_config(inner, StreamBufferConfig::performance_optimized())
    }

    /// Shorthand for [`Self::with_config`] with
    /// `StreamBufferConfig::memory_efficient()`.
    pub fn memory_efficient(inner: R) -> Result<Self> {
        Self::with_config(inner, StreamBufferConfig::memory_efficient())
    }

    /// Shorthand for [`Self::with_config`] with
    /// `StreamBufferConfig::low_latency()`.
    pub fn low_latency(inner: R) -> Result<Self> {
        Self::with_config(inner, StreamBufferConfig::low_latency())
    }

    /// Allocates a zeroed buffer of `size` bytes rounded up to the next
    /// multiple of `alignment`.
    ///
    /// An `alignment` of 0 or 1 means "no rounding". Any other value is
    /// honoured whether or not it is a power of two; for powers of two the
    /// result matches the usual `(size + a - 1) & !(a - 1)` formula.
    ///
    /// # Errors
    /// Fails if the rounded size would overflow `usize`.
    fn allocate_aligned_buffer(size: usize, alignment: usize) -> Result<Box<[u8]>> {
        let aligned_size = if alignment <= 1 {
            size
        } else {
            match size % alignment {
                0 => size,
                remainder => size.checked_add(alignment - remainder).ok_or_else(|| {
                    ZiporaError::io_error(format!(
                        "Buffer size {} cannot be rounded up to a multiple of {}",
                        size, alignment
                    ))
                })?,
            }
        };
        Ok(vec![0u8; aligned_size].into_boxed_slice())
    }

    /// Returns a shared reference to the wrapped reader.
    pub fn get_ref(&self) -> &R {
        &self.inner
    }

    /// Returns a mutable reference to the wrapped reader.
    ///
    /// Reading from it directly skips over whatever is still buffered here.
    pub fn get_mut(&mut self) -> &mut R {
        &mut self.inner
    }

    /// Consumes the wrapper and returns the inner reader; buffered but unread
    /// bytes are dropped.
    pub fn into_inner(self) -> R {
        self.inner
    }

    /// Current size of the internal buffer in bytes.
    #[inline]
    pub fn capacity(&self) -> usize {
        self.buffer.len()
    }

    /// Number of buffered bytes that have not been consumed yet.
    pub fn buffer_usage(&self) -> usize {
        self.end - self.pos
    }

    /// Total number of bytes obtained from the inner reader so far.
    pub fn total_read(&self) -> u64 {
        self.total_read
    }

    /// `true` when at least one unread byte is buffered.
    #[inline]
    pub fn has_data_in_buffer(&self) -> bool {
        self.pos < self.end
    }

    /// Makes a single attempt to have at least `needed` unread bytes buffered.
    ///
    /// Returns the number of unread bytes buffered afterwards, which can be
    /// smaller than `needed` when the inner reader delivers less (short read or
    /// end of stream). The inner reader is not touched when enough bytes are
    /// already buffered.
    ///
    /// # Errors
    /// Fails if the inner read fails or the buffer is full and cannot grow.
    pub fn ensure_buffered(&mut self, needed: usize) -> Result<usize> {
        let available = self.end - self.pos;
        if available >= needed {
            return Ok(available);
        }
        self.fill_buffer(needed)
    }

    /// Performs one refill; see [`Self::fill_buffer_simd`].
    fn fill_buffer(&mut self, min_needed: usize) -> Result<usize> {
        self.fill_buffer_simd(min_needed)
    }

    /// Compacts the buffer and issues one read on the inner reader.
    ///
    /// The request is `min_needed` bytes, scaled by `readahead_multiplier`
    /// when read-ahead is enabled, and capped by the free space. If there is
    /// no free space the buffer is grown first. Returns the number of unread
    /// bytes buffered afterwards.
    fn fill_buffer_simd(&mut self, min_needed: usize) -> Result<usize> {
        // Slide the unread tail to the front so the free space is contiguous.
        if self.pos > 0 {
            let remaining = self.end - self.pos;
            if remaining > 0 {
                self.buffer.copy_within(self.pos..self.end, 0);
            }
            self.end = remaining;
            self.pos = 0;
        }

        let free = self.buffer.len() - self.end;
        let wanted = if self.config.enable_readahead {
            // Saturate: both factors come from the caller / configuration.
            let readahead = min_needed.saturating_mul(self.config.readahead_multiplier);
            cmp::max(min_needed, readahead)
        } else {
            min_needed
        };
        let read_size = cmp::min(wanted, free);
        if read_size == 0 {
            return self.grow_buffer_and_retry(min_needed);
        }

        let bytes_read = self
            .inner
            .read(&mut self.buffer[self.end..self.end + read_size])
            .map_err(|e| ZiporaError::io_error(format!("Failed to fill buffer: {}", e)))?;
        self.end += bytes_read;
        self.total_read += bytes_read as u64;
        Ok(self.end - self.pos)
    }

    /// Replaces the buffer with a larger one, keeps the unread bytes, then
    /// retries the refill.
    ///
    /// The new size is the larger of `capacity * growth_factor` and
    /// `capacity + min_needed`, capped at `max_capacity`.
    ///
    /// # Errors
    /// Fails when the cap prevents any growth.
    fn grow_buffer_and_retry(&mut self, min_needed: usize) -> Result<usize> {
        let current_capacity = self.buffer.len();
        let scaled = (current_capacity as f64 * self.config.growth_factor) as usize;
        let required = current_capacity.saturating_add(min_needed);
        let new_capacity = cmp::min(cmp::max(scaled, required), self.config.max_capacity);
        if new_capacity <= current_capacity {
            return Err(ZiporaError::io_error(format!(
                "Buffer at maximum capacity ({} bytes), cannot satisfy request for {} bytes",
                current_capacity, min_needed
            )));
        }

        let mut new_buffer =
            Self::allocate_aligned_buffer(new_capacity, self.config.page_alignment)?;
        let existing_data = self.end - self.pos;
        new_buffer[..existing_data].copy_from_slice(&self.buffer[self.pos..self.end]);
        self.buffer = new_buffer;
        self.end = existing_data;
        self.pos = 0;
        self.fill_buffer(min_needed)
    }

    /// Returns the next byte, taking it straight from the buffer when possible.
    ///
    /// # Errors
    /// Fails on an inner read error or when the stream has ended.
    #[inline]
    pub fn read_byte_fast(&mut self) -> Result<u8> {
        if self.pos < self.end {
            let byte = self.buffer[self.pos];
            self.pos += 1;
            Ok(byte)
        } else {
            self.read_byte_slow()
        }
    }

    /// Refill path of [`Self::read_byte_fast`], kept out of line.
    #[cold]
    fn read_byte_slow(&mut self) -> Result<u8> {
        self.ensure_buffered(1)?;
        if self.pos < self.end {
            let byte = self.buffer[self.pos];
            self.pos += 1;
            Ok(byte)
        } else {
            Err(ZiporaError::io_error("Unexpected end of stream"))
        }
    }

    /// Returns the next `len` bytes as one contiguous slice borrowed from the
    /// internal buffer, and consumes them.
    ///
    /// Refills (growing the buffer if necessary) until `len` bytes are
    /// buffered, so a short read from the inner reader no longer produces a
    /// spurious `None`. `Ok(None)` means the stream ended first; in that case
    /// nothing is consumed.
    ///
    /// # Errors
    /// Fails on an inner read error, or when `len` bytes cannot fit because
    /// the buffer has reached `max_capacity`.
    pub fn read_slice(&mut self, len: usize) -> Result<Option<&[u8]>> {
        while self.end - self.pos < len {
            let before = self.end - self.pos;
            let after = self.fill_buffer(len)?;
            if after == before {
                // The inner reader produced nothing: end of stream.
                return Ok(None);
            }
        }
        // The loop guarantees `pos + len <= end`, so this cannot overflow.
        let start = self.pos;
        self.pos += len;
        Ok(Some(&self.buffer[start..start + len]))
    }

    /// Reads into `buf`, bypassing the internal buffer for large requests.
    ///
    /// Requests shorter than `bulk_read_threshold` go through the buffer.
    /// Larger ones first drain whatever is buffered and then issue one read on
    /// the inner reader directly into the rest of `buf`. Returns the number of
    /// bytes stored in `buf`; zero means end of stream for a non-empty `buf`.
    ///
    /// # Errors
    /// Fails if the inner read fails before any byte was stored in `buf`.
    pub fn read_bulk(&mut self, buf: &mut [u8]) -> Result<usize> {
        if buf.len() < self.config.bulk_read_threshold {
            return self.read_buffered(buf);
        }

        let buffered = self.end - self.pos;
        let drained = cmp::min(buffered, buf.len());
        if buffered > 0 {
            buf[..drained].copy_from_slice(&self.buffer[self.pos..self.pos + drained]);
            self.pos += drained;
            if drained == buf.len() {
                return Ok(drained);
            }
        }

        match self.inner.read(&mut buf[drained..]) {
            Ok(fresh) => {
                self.total_read += fresh as u64;
                Ok(drained + fresh)
            }
            // Bytes already handed over must not be lost behind an error;
            // report them now and let a persistent failure resurface on the
            // next call.
            Err(_) if drained > 0 => Ok(drained),
            Err(e) => Err(ZiporaError::io_error(format!("Bulk read failed: {}", e))),
        }
    }

    /// Fills `buf` through the internal buffer using plain slice copies.
    fn read_buffered(&mut self, buf: &mut [u8]) -> Result<usize> {
        self.copy_through_buffer(buf, |src, dst| dst.copy_from_slice(src))
    }

    /// Fills `buf` through the internal buffer, copying with
    /// `simd_ops::fast_copy` and falling back to a plain slice copy if that
    /// reports an error.
    ///
    /// Keeps reading until `buf` is full or the stream ends; returns the number
    /// of bytes stored.
    ///
    /// # Errors
    /// Fails if a refill fails before any byte was stored in `buf`.
    pub fn read_simd_optimized(&mut self, buf: &mut [u8]) -> Result<usize> {
        self.copy_through_buffer(buf, |src, dst| {
            if simd_ops::fast_copy(src, dst).is_err() {
                dst.copy_from_slice(src);
            }
        })
    }

    /// Shared loop behind the buffered read paths: drain what is buffered and
    /// refill only when the buffer is empty, until `buf` is full or the stream
    /// ends. `copy` moves one equally-sized chunk from the buffer to `buf`.
    fn copy_through_buffer<F>(&mut self, buf: &mut [u8], mut copy: F) -> Result<usize>
    where
        F: FnMut(&[u8], &mut [u8]),
    {
        let mut copied = 0;
        while copied < buf.len() {
            if self.pos >= self.end {
                match self.fill_buffer(buf.len() - copied) {
                    Ok(0) => break,
                    Ok(_) => {}
                    Err(e) if copied == 0 => return Err(e),
                    // Do not lose bytes that were already delivered.
                    Err(_) => break,
                }
            }
            let chunk = cmp::min(self.end - self.pos, buf.len() - copied);
            copy(
                &self.buffer[self.pos..self.pos + chunk],
                &mut buf[copied..copied + chunk],
            );
            self.pos += chunk;
            copied += chunk;
        }
        Ok(copied)
    }

    /// Reports whether the unread buffered bytes form valid UTF-8 on their own.
    ///
    /// An empty buffer counts as valid. Only the buffered window is inspected,
    /// so a multi-byte character that is cut by either edge of the window makes
    /// this return `false`.
    pub fn validate_utf8_buffered(&self) -> Result<bool> {
        if self.pos >= self.end {
            return Ok(true);
        }
        let buffered_data = &self.buffer[self.pos..self.end];
        Ok(std::str::from_utf8(buffered_data).is_ok())
    }
}
impl<R: Read> Read for StreamBufferedReader<R> {
    /// Delegates to `read_bulk` and wraps any failure in an `io::Error` of
    /// kind `Other`.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        match self.read_bulk(buf) {
            Ok(count) => Ok(count),
            Err(cause) => Err(io::Error::new(io::ErrorKind::Other, cause)),
        }
    }
}
impl<R: Read> BufRead for StreamBufferedReader<R> {
    /// Returns the unread buffered bytes, refilling first when none are left.
    ///
    /// With read-ahead enabled the refill asks for a whole buffer's worth, so
    /// line-oriented consumers do not trigger an inner read every couple of
    /// bytes. With read-ahead disabled the request stays at one byte so the
    /// inner reader is never advanced further than necessary.
    ///
    /// # Errors
    /// Refill failures are wrapped in an `io::Error` of kind `Other`.
    fn fill_buf(&mut self) -> io::Result<&[u8]> {
        if self.pos >= self.end {
            let request = if self.config.enable_readahead {
                cmp::max(self.buffer.len(), 1)
            } else {
                1
            };
            self.fill_buffer(request)
                .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
        }
        Ok(&self.buffer[self.pos..self.end])
    }

    /// Marks `amt` buffered bytes as consumed, clamped to what is buffered.
    fn consume(&mut self, amt: usize) {
        // Saturate so an oversized `amt` clamps instead of overflowing.
        self.pos = cmp::min(self.pos.saturating_add(amt), self.end);
    }
}
impl<R: Read + Seek> Seek for StreamBufferedReader<R> {
    /// Seeks the inner reader and discards the buffered bytes.
    ///
    /// The inner reader is always ahead of the logical position by the number
    /// of unread buffered bytes, so a `SeekFrom::Current` offset is corrected
    /// by that amount before it is forwarded. Absolute seeks are forwarded
    /// unchanged. The buffer is only discarded once the inner seek succeeded.
    fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
        let new_position = match pos {
            SeekFrom::Current(offset) => {
                // A slice never exceeds `isize::MAX` bytes, so this fits in i64.
                let unread = (self.end - self.pos) as i64;
                match offset.checked_sub(unread) {
                    Some(adjusted) => self.inner.seek(SeekFrom::Current(adjusted))?,
                    None => {
                        // The corrected offset does not fit in an i64: rewind
                        // to the logical position first, then apply `offset`.
                        self.inner.seek(SeekFrom::Current(-unread))?;
                        self.pos = 0;
                        self.end = 0;
                        self.inner.seek(SeekFrom::Current(offset))?
                    }
                }
            }
            absolute => self.inner.seek(absolute)?,
        };
        self.pos = 0;
        self.end = 0;
        Ok(new_position)
    }
}
/// Buffered writer that wraps an inner writer `W` and collects small writes in
/// an internal byte buffer before handing them to `W`.
pub struct StreamBufferedWriter<W> {
/// The wrapped writer that receives the bytes.
inner: W,
/// Backing storage; the pending bytes live in `buffer[..pos]`.
buffer: Box<[u8]>,
// `pos` is the number of pending bytes in `buffer`; `config` holds the tuning
// parameters this writer was created with.
pos: usize, config: StreamBufferConfig,
/// Number of bytes handed to `inner` so far (pending bytes are not counted).
total_written: u64,
/// Secure pool created when `config.use_secure_pool` is set; the code in this
/// file only stores it.
pool: Option<Arc<SecureMemoryPool>>,
}
impl<W: Write> StreamBufferedWriter<W> {
    /// Wraps `inner` in a writer configured with `StreamBufferConfig::default()`.
    ///
    /// # Errors
    /// Propagates any error from [`Self::with_config`].
    pub fn new(inner: W) -> Result<Self> {
        Self::with_config(inner, StreamBufferConfig::default())
    }

    /// Wraps `inner` in a writer that uses the supplied `config`.
    ///
    /// When `config.use_secure_pool` is set a `SecureMemoryPool` is created and
    /// stored with the writer. The byte buffer is allocated with the reader's
    /// `allocate_aligned_buffer` from `config.initial_capacity` and
    /// `config.page_alignment`.
    ///
    /// # Errors
    /// Fails if the pool or the buffer cannot be created.
    pub fn with_config(inner: W, config: StreamBufferConfig) -> Result<Self> {
        let pool = if config.use_secure_pool {
            Some(SecureMemoryPool::new(
                crate::memory::SecurePoolConfig::small_secure(),
            )?)
        } else {
            None
        };
        let buffer = StreamBufferedReader::<std::io::Empty>::allocate_aligned_buffer(
            config.initial_capacity,
            config.page_alignment,
        )?;
        Ok(Self {
            inner,
            buffer,
            pos: 0,
            config,
            total_written: 0,
            pool,
        })
    }

    /// Returns a shared reference to the wrapped writer.
    pub fn get_ref(&self) -> &W {
        &self.inner
    }

    /// Returns a mutable reference to the wrapped writer.
    ///
    /// Writing to it directly can reorder output relative to pending bytes.
    pub fn get_mut(&mut self) -> &mut W {
        &mut self.inner
    }

    /// Flushes pending bytes and returns the inner writer.
    ///
    /// # Errors
    /// Returns the flush error; the writer is dropped in that case.
    pub fn into_inner(mut self) -> io::Result<W> {
        self.flush()?;
        Ok(self.inner)
    }

    /// Size of the internal buffer in bytes.
    #[inline]
    pub fn capacity(&self) -> usize {
        self.buffer.len()
    }

    /// Number of bytes currently pending in the buffer.
    pub fn buffer_usage(&self) -> usize {
        self.pos
    }

    /// Number of bytes handed to the inner writer so far.
    pub fn total_written(&self) -> u64 {
        self.total_written
    }

    /// Appends one byte, storing it straight in the buffer when there is room.
    ///
    /// # Errors
    /// Fails if the buffer is full and flushing it fails.
    #[inline]
    pub fn write_byte_fast(&mut self, byte: u8) -> Result<()> {
        if self.pos < self.buffer.len() {
            self.buffer[self.pos] = byte;
            self.pos += 1;
            Ok(())
        } else {
            self.write_byte_slow(byte)
        }
    }

    /// Flush-then-store path of [`Self::write_byte_fast`], kept out of line.
    ///
    /// A zero-capacity buffer has no slot to store into, so the byte is then
    /// written to the inner writer directly instead of indexing out of bounds.
    #[cold]
    fn write_byte_slow(&mut self, byte: u8) -> Result<()> {
        self.flush_buffer()?;
        match self.buffer.first_mut() {
            Some(slot) => {
                *slot = byte;
                self.pos = 1;
            }
            None => {
                self.inner
                    .write_all(&[byte])
                    .map_err(|e| ZiporaError::io_error(format!("Failed to write byte: {}", e)))?;
                self.total_written += 1;
            }
        }
        Ok(())
    }

    /// Hands all pending bytes to the inner writer (without flushing it).
    ///
    /// Progress is tracked per `write` call. If a write fails part-way, the
    /// bytes that were accepted are removed from the buffer and counted in
    /// `total_written`, so a retry cannot emit them twice.
    ///
    /// # Errors
    /// Fails if the inner writer reports an error other than `Interrupted`, or
    /// accepts zero bytes.
    fn flush_buffer(&mut self) -> Result<()> {
        let mut written = 0;
        let mut outcome: Result<()> = Ok(());
        while written < self.pos {
            match self.inner.write(&self.buffer[written..self.pos]) {
                Ok(0) => {
                    // Same condition and wording `write_all` reports.
                    let stalled =
                        io::Error::new(io::ErrorKind::WriteZero, "failed to write whole buffer");
                    outcome = Err(ZiporaError::io_error(format!(
                        "Failed to flush buffer: {}",
                        stalled
                    )));
                    break;
                }
                Ok(count) => written += count,
                Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
                Err(e) => {
                    outcome = Err(ZiporaError::io_error(format!(
                        "Failed to flush buffer: {}",
                        e
                    )));
                    break;
                }
            }
        }
        if written > 0 {
            // Keep only the bytes the inner writer has not accepted yet.
            self.buffer.copy_within(written..self.pos, 0);
            self.pos -= written;
            self.total_written += written as u64;
        }
        outcome
    }
}
impl<W: Write> Write for StreamBufferedWriter<W> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
if buf.len() >= self.config.bulk_read_threshold {
self.flush_buffer()
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
let written = self.inner.write(buf)?;
self.total_written += written as u64;
Ok(written)
} else {
let mut remaining = buf;
let mut total_written = 0;
while !remaining.is_empty() {
let available = self.buffer.len() - self.pos;
if available == 0 {
self.flush_buffer()
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
continue;
}
let to_copy = cmp::min(available, remaining.len());
self.buffer[self.pos..self.pos + to_copy].copy_from_slice(&remaining[..to_copy]);
self.pos += to_copy;
total_written += to_copy;
remaining = &remaining[to_copy..];
}
Ok(total_written)
}
}
fn flush(&mut self) -> io::Result<()> {
self.flush_buffer()
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
self.inner.flush()
}
}
impl<W: Write + Seek> Seek for StreamBufferedWriter<W> {
    /// Flushes everything that is pending and then repositions the inner
    /// writer. The seek is skipped if the flush fails.
    fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
        match self.flush() {
            Ok(()) => self.inner.seek(pos),
            Err(flush_error) => Err(flush_error),
        }
    }
}
/// Identity function on `bool`, always inlined.
///
/// NOTE(review): presumably a placeholder for a branch-prediction hint; as
/// written it only returns its argument.
#[inline(always)]
fn likely(condition: bool) -> bool {
    let outcome = condition;
    outcome
}
/// Empty function marked `#[cold]` and never inlined.
///
/// NOTE(review): presumably meant to be called on rarely taken branches so the
/// optimizer lays them out as cold; confirm against callers.
#[cold]
#[inline(never)]
fn unlikely() {
    // Intentionally empty: only the attributes matter.
}
#[cfg(test)]
mod tests {
    //! Unit tests for the buffered reader and writer.
    //!
    //! The non-ASCII fixtures are written as real UTF-8 text so the validation
    //! tests cover two-, three- and four-byte sequences.

    use super::*;
    use std::io::Cursor;

    /// Two consecutive small reads return consecutive pieces of the input.
    #[test]
    fn test_stream_buffered_reader_basic() {
        let data = b"Hello, World! This is a test of buffered reading.";
        let cursor = Cursor::new(data);
        let mut reader = StreamBufferedReader::new(cursor).unwrap();

        let mut buf = [0u8; 5];
        assert_eq!(reader.read(&mut buf).unwrap(), 5);
        assert_eq!(&buf, b"Hello");

        let mut buf = [0u8; 7];
        assert_eq!(reader.read(&mut buf).unwrap(), 7);
        assert_eq!(&buf, b", World");
    }

    /// Byte-wise reading yields every byte in order and then fails at the end.
    #[test]
    fn test_stream_buffered_reader_byte_reading() {
        let data = b"ABC";
        let cursor = Cursor::new(data);
        let mut reader = StreamBufferedReader::new(cursor).unwrap();

        assert_eq!(reader.read_byte_fast().unwrap(), b'A');
        assert_eq!(reader.read_byte_fast().unwrap(), b'B');
        assert_eq!(reader.read_byte_fast().unwrap(), b'C');
        assert!(reader.read_byte_fast().is_err());
    }

    /// `read_slice` hands out consecutive borrowed slices.
    #[test]
    fn test_stream_buffered_reader_slice_reading() {
        let data = b"Hello, World!";
        let cursor = Cursor::new(data);
        let mut reader = StreamBufferedReader::new(cursor).unwrap();

        let slice = reader.read_slice(5).unwrap();
        assert_eq!(slice, Some(&b"Hello"[..]));

        let slice = reader.read_slice(2).unwrap();
        assert_eq!(slice, Some(&b", "[..]));
    }

    /// A request above the bulk threshold is served in a single call.
    #[test]
    fn test_stream_buffered_reader_bulk_read() {
        let data = vec![42u8; 100_000];
        let cursor = Cursor::new(data.clone());
        let mut reader = StreamBufferedReader::new(cursor).unwrap();

        let mut buf = vec![0u8; 100_000];
        let bytes_read = reader.read_bulk(&mut buf).unwrap();
        assert_eq!(bytes_read, 100_000);
        assert_eq!(buf, data);
    }

    /// Each preset constructor allocates the capacity its config asks for.
    #[test]
    fn test_stream_buffered_reader_configurations() {
        let data = b"Test data";

        let cursor = Cursor::new(data);
        let reader = StreamBufferedReader::performance_optimized(cursor).unwrap();
        assert!(reader.capacity() >= 128 * 1024);

        let cursor = Cursor::new(data);
        let reader = StreamBufferedReader::memory_efficient(cursor).unwrap();
        assert_eq!(reader.capacity(), 16 * 1024);

        let cursor = Cursor::new(data);
        let reader = StreamBufferedReader::low_latency(cursor).unwrap();
        assert_eq!(reader.capacity(), 8 * 1024);
    }

    /// Several small writes followed by a flush arrive concatenated.
    #[test]
    fn test_stream_buffered_writer_basic() {
        let mut buffer = Vec::new();
        {
            let cursor = Cursor::new(&mut buffer);
            let mut writer = StreamBufferedWriter::new(cursor).unwrap();
            writer.write_all(b"Hello").unwrap();
            writer.write_all(b", ").unwrap();
            writer.write_all(b"World!").unwrap();
            writer.flush().unwrap();
        }
        assert_eq!(buffer, b"Hello, World!");
    }

    /// Byte-wise writes followed by a flush arrive in order.
    #[test]
    fn test_stream_buffered_writer_byte_writing() {
        let mut buffer = Vec::new();
        {
            let cursor = Cursor::new(&mut buffer);
            let mut writer = StreamBufferedWriter::new(cursor).unwrap();
            writer.write_byte_fast(b'A').unwrap();
            writer.write_byte_fast(b'B').unwrap();
            writer.write_byte_fast(b'C').unwrap();
            writer.flush().unwrap();
        }
        assert_eq!(buffer, b"ABC");
    }

    /// A write above the bulk threshold reaches the sink unchanged.
    #[test]
    fn test_stream_buffered_writer_large_write() {
        let data = vec![42u8; 100_000];
        let mut buffer = Vec::new();
        {
            let cursor = Cursor::new(&mut buffer);
            let mut writer = StreamBufferedWriter::new(cursor).unwrap();
            writer.write_all(&data).unwrap();
            writer.flush().unwrap();
        }
        assert_eq!(buffer.len(), 100_000);
        assert_eq!(buffer, data);
    }

    /// Data written through the writer reads back identically via the reader.
    #[test]
    fn test_stream_buffer_round_trip() {
        let original_data = b"The quick brown fox jumps over the lazy dog. ".repeat(1000);
        let mut buffer = Vec::new();
        {
            let cursor = Cursor::new(&mut buffer);
            let mut writer = StreamBufferedWriter::new(cursor).unwrap();
            writer.write_all(&original_data).unwrap();
            writer.flush().unwrap();
        }

        let cursor = Cursor::new(&buffer);
        let mut reader = StreamBufferedReader::new(cursor).unwrap();
        let mut read_data = Vec::new();
        reader.read_to_end(&mut read_data).unwrap();
        assert_eq!(read_data, original_data);
    }

    /// The statistics accessors reflect that a read has happened.
    #[test]
    fn test_buffer_statistics() {
        let data = b"Hello, World!";
        let cursor = Cursor::new(data);
        let mut reader = StreamBufferedReader::new(cursor).unwrap();

        assert_eq!(reader.total_read(), 0);
        assert!(!reader.has_data_in_buffer());

        let mut buf = [0u8; 5];
        reader.read(&mut buf).unwrap();
        assert!(reader.total_read() > 0);
        assert!(reader.has_data_in_buffer() || reader.total_read() == data.len() as u64);
    }

    /// `read_simd_optimized` fills a buffer as large as the input.
    #[test]
    fn test_stream_reader_simd_optimized_read() {
        let data = b"Hello, SIMD World! This tests SIMD-optimized reading.";
        let cursor = Cursor::new(data);
        let mut reader = StreamBufferedReader::new(cursor).unwrap();

        let mut buf = vec![0u8; data.len()];
        let bytes_read = reader.read_simd_optimized(&mut buf).unwrap();
        assert_eq!(bytes_read, data.len());
        assert_eq!(&buf[..], data);
    }

    /// Two partial `read_simd_optimized` calls cover the input back to back.
    #[test]
    fn test_stream_reader_simd_optimized_read_partial() {
        let data = b"Hello, World!";
        let cursor = Cursor::new(data);
        let mut reader = StreamBufferedReader::new(cursor).unwrap();

        let mut buf = vec![0u8; 7];
        let bytes_read = reader.read_simd_optimized(&mut buf).unwrap();
        assert_eq!(bytes_read, 7);
        assert_eq!(&buf, b"Hello, ");

        let mut buf2 = vec![0u8; 6];
        let bytes_read2 = reader.read_simd_optimized(&mut buf2).unwrap();
        assert_eq!(bytes_read2, 6);
        assert_eq!(&buf2, b"World!");
    }

    /// `read_simd_optimized` handles a 10 000-byte patterned input.
    #[test]
    fn test_stream_reader_simd_optimized_read_large() {
        let large_data: Vec<u8> = (0..10000).map(|i| (i % 256) as u8).collect();
        let cursor = Cursor::new(&large_data);
        let mut reader = StreamBufferedReader::new(cursor).unwrap();

        let mut buf = vec![0u8; large_data.len()];
        let bytes_read = reader.read_simd_optimized(&mut buf).unwrap();
        assert_eq!(bytes_read, large_data.len());
        assert_eq!(&buf, &large_data[..]);
    }

    /// Buffered bytes of a valid UTF-8 text validate successfully.
    #[test]
    fn test_stream_reader_utf8_validation_valid() {
        let data = "Hello, World! Valid UTF-8 text with unicode: 世界 🦀";
        let cursor = Cursor::new(data.as_bytes());
        let mut reader = StreamBufferedReader::new(cursor).unwrap();

        let mut buf = vec![0u8; 10];
        let _ = reader.read(&mut buf).unwrap();

        let is_valid = reader.validate_utf8_buffered().unwrap();
        assert!(is_valid, "Valid UTF-8 should pass validation");
    }

    /// A stray 0xFF byte in the buffered window is reported as invalid.
    #[test]
    fn test_stream_reader_utf8_validation_invalid() {
        let mut data = Vec::from(b"Hello, ".as_ref());
        data.push(0xFF);
        data.extend_from_slice(b" World!");
        let cursor = Cursor::new(data);
        let mut reader = StreamBufferedReader::new(cursor).unwrap();

        let mut buf = vec![0u8; 5];
        let _ = reader.read(&mut buf).unwrap();

        let is_valid = reader.validate_utf8_buffered().unwrap();
        assert!(!is_valid, "Invalid UTF-8 should fail validation");
    }

    /// With everything consumed the (empty) buffer counts as valid.
    #[test]
    fn test_stream_reader_utf8_validation_empty_buffer() {
        let data = b"Hello, World!";
        let cursor = Cursor::new(data);
        let mut reader = StreamBufferedReader::new(cursor).unwrap();

        let mut buf = vec![0u8; data.len()];
        let _ = reader.read(&mut buf).unwrap();

        let is_valid = reader.validate_utf8_buffered().unwrap();
        assert!(is_valid, "Empty buffer should be valid UTF-8");
    }

    /// Two-, three- and four-byte sequences all validate when fully buffered.
    #[test]
    fn test_stream_reader_utf8_validation_multibyte() {
        let test_cases = vec![
            ("café", true),
            ("日本語", true),
            ("🦀🚀", true),
            ("Hello, 世界! 🦀", true),
        ];
        for (text, expected_valid) in test_cases {
            let cursor = Cursor::new(text.as_bytes());
            let mut reader = StreamBufferedReader::new(cursor).unwrap();
            let _ = reader.ensure_buffered(text.len());

            let is_valid = reader.validate_utf8_buffered().unwrap();
            assert_eq!(is_valid, expected_valid, "UTF-8 validation mismatch for: {}", text);
        }
    }

    /// The SIMD read path and the `Read` impl agree on the same input.
    #[test]
    fn test_stream_reader_simd_vs_standard_read() {
        let data = b"The quick brown fox jumps over the lazy dog";

        let cursor1 = Cursor::new(data);
        let mut reader1 = StreamBufferedReader::new(cursor1).unwrap();
        let mut buf1 = vec![0u8; data.len()];
        let bytes_read1 = reader1.read_simd_optimized(&mut buf1).unwrap();

        let cursor2 = Cursor::new(data);
        let mut reader2 = StreamBufferedReader::new(cursor2).unwrap();
        let mut buf2 = vec![0u8; data.len()];
        let bytes_read2 = reader2.read(&mut buf2).unwrap();

        assert_eq!(bytes_read1, bytes_read2);
        assert_eq!(buf1, buf2);
        assert_eq!(&buf1[..], data);
    }

    /// The SIMD read path works with every preset configuration.
    #[test]
    fn test_stream_reader_simd_with_different_configs() {
        let data = b"Test data for different buffer configurations";

        let cursor = Cursor::new(data);
        let mut reader = StreamBufferedReader::performance_optimized(cursor).unwrap();
        let mut buf = vec![0u8; data.len()];
        let bytes_read = reader.read_simd_optimized(&mut buf).unwrap();
        assert_eq!(bytes_read, data.len());
        assert_eq!(&buf, data);

        let cursor = Cursor::new(data);
        let mut reader = StreamBufferedReader::memory_efficient(cursor).unwrap();
        let mut buf = vec![0u8; data.len()];
        let bytes_read = reader.read_simd_optimized(&mut buf).unwrap();
        assert_eq!(bytes_read, data.len());
        assert_eq!(&buf, data);

        let cursor = Cursor::new(data);
        let mut reader = StreamBufferedReader::low_latency(cursor).unwrap();
        let mut buf = vec![0u8; data.len()];
        let bytes_read = reader.read_simd_optimized(&mut buf).unwrap();
        assert_eq!(bytes_read, data.len());
        assert_eq!(&buf, data);
    }

    /// Reading from an empty source yields zero bytes.
    #[test]
    fn test_stream_reader_simd_read_empty() {
        let data = b"";
        let cursor = Cursor::new(data);
        let mut reader = StreamBufferedReader::new(cursor).unwrap();

        let mut buf = vec![0u8; 10];
        let bytes_read = reader.read_simd_optimized(&mut buf).unwrap();
        assert_eq!(bytes_read, 0);
    }

    /// Validation succeeds on the buffered window of a long multibyte text.
    #[test]
    fn test_stream_reader_utf8_large_data() {
        let large_text = "Hello, World! 世界 🦀 ".repeat(1000);
        let cursor = Cursor::new(large_text.as_bytes());
        let mut reader = StreamBufferedReader::new(cursor).unwrap();

        let mut buf = vec![0u8; 27];
        let _ = reader.read(&mut buf).unwrap();

        let is_valid = reader.validate_utf8_buffered().unwrap();
        assert!(is_valid, "Large valid UTF-8 buffer should pass validation");
    }

    /// Reads that force the buffer to be compacted still make progress.
    #[test]
    fn test_stream_reader_simd_fill_buffer() {
        let data = b"Testing SIMD-optimized buffer compaction";
        let cursor = Cursor::new(data);
        let mut reader = StreamBufferedReader::new(cursor).unwrap();

        for _ in 0..5 {
            let mut buf = vec![0u8; 5];
            let _ = reader.read(&mut buf);
        }

        let mut buf = vec![0u8; 100];
        let bytes_read = reader.read_simd_optimized(&mut buf).unwrap();
        assert!(bytes_read > 0 || reader.total_read() == data.len() as u64);
    }
}