#[cfg(feature = "alloc")]
use alloc::boxed::Box;
mod chunk;
mod decoder;
mod encoder;
#[cfg(feature = "async-tokio")]
mod async_io;
pub use chunk::{ChunkHeader, CHUNK_MAGIC};
pub use decoder::BufferStreamingDecoder;
pub use encoder::BufferStreamingEncoder;
#[cfg(feature = "std")]
pub use decoder::StreamingDecoder;
#[cfg(feature = "std")]
pub use encoder::StreamingEncoder;
#[cfg(feature = "async-tokio")]
pub use async_io::{
AsyncStreamingDecoder, AsyncStreamingEncoder, CancellableAsyncDecoder, CancellableAsyncEncoder,
CancellationToken,
};
#[cfg(feature = "async-tokio")]
pub type AsyncEncoder<W> = AsyncStreamingEncoder<W>;
#[cfg(feature = "async-tokio")]
pub type AsyncDecoder<R> = AsyncStreamingDecoder<R>;
pub const DEFAULT_CHUNK_SIZE: usize = 64 * 1024;
pub const MAX_CHUNK_SIZE: usize = 16 * 1024 * 1024;
#[derive(Debug, Clone)]
pub struct StreamingConfig {
pub chunk_size: usize,
pub max_buffer_size: usize,
pub flush_per_item: bool,
}
impl Default for StreamingConfig {
fn default() -> Self {
Self {
chunk_size: DEFAULT_CHUNK_SIZE,
max_buffer_size: 4 * 1024 * 1024, flush_per_item: false,
}
}
}
impl StreamingConfig {
pub fn new() -> Self {
Self::default()
}
#[inline]
pub fn with_chunk_size(mut self, size: usize) -> Self {
self.chunk_size = size.clamp(1024, MAX_CHUNK_SIZE);
self
}
#[inline]
pub fn with_max_buffer(mut self, size: usize) -> Self {
self.max_buffer_size = size;
self
}
#[inline]
pub fn with_flush_per_item(mut self, flush: bool) -> Self {
self.flush_per_item = flush;
self
}
}
#[derive(Debug, Clone, Default)]
pub struct StreamingProgress {
pub items_processed: u64,
pub bytes_processed: u64,
pub chunks_processed: u64,
pub estimated_total: Option<u64>,
}
impl StreamingProgress {
pub fn percentage(&self) -> Option<f64> {
self.estimated_total.map(|total| {
if total == 0 {
100.0
} else {
100.0 * (self.items_processed as f64 / total as f64)
}
})
}
}
#[cfg(feature = "alloc")]
pub type ProgressCallback = Box<dyn FnMut(&StreamingProgress) + Send>;
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_config_defaults() {
let config = StreamingConfig::default();
assert_eq!(config.chunk_size, DEFAULT_CHUNK_SIZE);
assert!(!config.flush_per_item);
}
#[test]
fn test_config_builder() {
let config = StreamingConfig::new()
.with_chunk_size(1024 * 1024)
.with_max_buffer(8 * 1024 * 1024)
.with_flush_per_item(true);
assert_eq!(config.chunk_size, 1024 * 1024);
assert_eq!(config.max_buffer_size, 8 * 1024 * 1024);
assert!(config.flush_per_item);
}
#[test]
fn test_chunk_size_clamping() {
let config = StreamingConfig::new().with_chunk_size(100);
assert_eq!(config.chunk_size, 1024);
let config = StreamingConfig::new().with_chunk_size(100 * 1024 * 1024);
assert_eq!(config.chunk_size, MAX_CHUNK_SIZE);
}
#[test]
fn test_progress() {
let progress = StreamingProgress {
items_processed: 50,
bytes_processed: 5000,
chunks_processed: 5,
estimated_total: Some(100),
};
assert_eq!(progress.percentage(), Some(50.0));
}
}