use super::chunk::ChunkHeader;
use super::{StreamingConfig, StreamingProgress};
use crate::enc::{Encode, EncoderImpl, VecWriter};
use crate::{config, Result};
#[cfg(feature = "std")]
use super::ProgressCallback;
#[cfg(feature = "std")]
use crate::Error;
#[cfg(feature = "alloc")]
extern crate alloc;
#[cfg(feature = "std")]
use std::io::Write;
/// Chunked streaming encoder that emits encoded items to a [`Write`] sink.
///
/// Encoded items accumulate in an in-memory buffer; when a chunk is flushed,
/// a `ChunkHeader` followed by the buffered bytes is written to the sink.
/// Only available with the `std` feature.
#[cfg(feature = "std")]
pub struct StreamingEncoder<W: Write> {
/// Destination that receives chunk headers and payloads.
writer: W,
/// Streaming settings; this file reads `chunk_size` and `flush_per_item`.
config: StreamingConfig,
/// Encoded bytes of the items that belong to the chunk being assembled.
buffer: alloc::vec::Vec<u8>,
/// Number of items appended to `buffer` since the last flush.
items_in_buffer: u32,
/// Running totals (items, bytes, chunks) updated on every flushed chunk.
progress: StreamingProgress,
/// Optional callback invoked with the current progress after each flushed chunk.
progress_callback: Option<ProgressCallback>,
}
#[cfg(feature = "std")]
impl<W: Write> StreamingEncoder<W> {
    /// Creates an encoder writing to `writer` with `StreamingConfig::default()`.
    pub fn new(writer: W) -> Self {
        Self::with_config(writer, StreamingConfig::default())
    }

    /// Creates an encoder writing to `writer` with an explicit `config`.
    ///
    /// The encoder starts with an empty chunk buffer, default progress
    /// counters and no progress callback.
    pub fn with_config(writer: W, config: StreamingConfig) -> Self {
        Self {
            writer,
            config,
            buffer: alloc::vec::Vec::new(),
            items_in_buffer: 0,
            progress: StreamingProgress::default(),
            progress_callback: None,
        }
    }

    /// Installs `callback`, which is invoked with the current progress after
    /// every chunk that is written to the sink. Builder-style: returns `self`.
    pub fn with_progress_callback(mut self, callback: ProgressCallback) -> Self {
        self.progress_callback = Some(callback);
        self
    }

    /// Records `total` as the estimated total in the progress report.
    pub fn set_estimated_total(&mut self, total: u64) {
        self.progress.estimated_total = Some(total);
    }

    /// Encodes `item` with `config::standard()` and appends it to the current chunk.
    ///
    /// If the chunk already holds data and adding the item would push it past
    /// `config.chunk_size`, the chunk is flushed first. An item that is larger
    /// than `chunk_size` on its own is therefore still emitted as one chunk.
    /// When `config.flush_per_item` is set, the chunk is flushed right after
    /// the item has been appended.
    ///
    /// # Errors
    ///
    /// Returns the encoding error of `item`, or any error raised while
    /// flushing a chunk to the sink.
    pub fn write_item<T: Encode>(&mut self, item: &T) -> Result<()> {
        let item_writer = VecWriter::new();
        let mut encoder = EncoderImpl::new(item_writer, config::standard());
        item.encode(&mut encoder)?;
        let item_bytes = encoder.into_writer().into_vec();

        let would_overflow =
            !self.buffer.is_empty() && self.buffer.len() + item_bytes.len() > self.config.chunk_size;
        if would_overflow {
            self.flush_chunk()?;
        }

        self.buffer.extend_from_slice(&item_bytes);
        self.items_in_buffer += 1;

        if self.config.flush_per_item {
            self.flush_chunk()?;
        }
        Ok(())
    }

    /// Writes every item yielded by `items`, stopping at the first error.
    ///
    /// # Errors
    ///
    /// Propagates the first error returned by [`Self::write_item`].
    pub fn write_all<T: Encode, I: IntoIterator<Item = T>>(&mut self, items: I) -> Result<()> {
        items
            .into_iter()
            .try_for_each(|item| self.write_item(&item))
    }

    /// Converts an I/O failure of the sink into the crate's `Error::Io`.
    fn io_error(err: std::io::Error) -> Error {
        Error::Io {
            kind: err.kind(),
            message: err.to_string(),
        }
    }

    /// Writes the buffered chunk (header, then payload) to the sink, updates
    /// the progress counters, notifies the callback and resets the buffer.
    ///
    /// Does nothing when the buffer holds no bytes.
    ///
    /// # Errors
    ///
    /// Returns `Error::Io` when the sink rejects a write, or when the payload
    /// is too large for the `u32` length passed to the chunk header.
    fn flush_chunk(&mut self) -> Result<()> {
        // NOTE(review): items whose encoding is zero bytes leave `buffer` empty,
        // so a chunk consisting solely of such items is skipped here even though
        // `items_in_buffer` is non-zero — confirm whether that is intended.
        if self.buffer.is_empty() {
            return Ok(());
        }

        // The header takes the length as `u32`; a plain `as` cast would silently
        // truncate an oversized payload and describe the wrong number of bytes.
        let payload_len = <u32 as core::convert::TryFrom<usize>>::try_from(self.buffer.len())
            .map_err(|_| Error::Io {
                kind: std::io::ErrorKind::InvalidInput,
                message: format!(
                    "chunk payload of {} bytes exceeds the u32 length field",
                    self.buffer.len()
                ),
            })?;

        let header = ChunkHeader::data(payload_len, self.items_in_buffer);
        self.writer
            .write_all(&header.to_bytes())
            .map_err(Self::io_error)?;
        self.writer
            .write_all(&self.buffer)
            .map_err(Self::io_error)?;

        self.progress.items_processed += self.items_in_buffer as u64;
        self.progress.bytes_processed += self.buffer.len() as u64;
        self.progress.chunks_processed += 1;
        if let Some(ref mut callback) = self.progress_callback {
            callback(&self.progress);
        }

        self.buffer.clear();
        self.items_in_buffer = 0;
        Ok(())
    }

    /// Flushes any pending chunk, writes the end-of-stream header and returns
    /// the underlying writer.
    ///
    /// The writer itself is not flushed here; callers that wrap a buffered
    /// sink should flush the returned writer.
    ///
    /// # Errors
    ///
    /// Returns `Error::Io` when the final chunk or the end header cannot be written.
    pub fn finish(mut self) -> Result<W> {
        self.flush_chunk()?;
        let end_header = ChunkHeader::end();
        self.writer
            .write_all(&end_header.to_bytes())
            .map_err(Self::io_error)?;
        Ok(self.writer)
    }

    /// Returns the progress counters accumulated over the chunks flushed so far.
    pub fn progress(&self) -> &StreamingProgress {
        &self.progress
    }

    /// Returns a shared reference to the underlying writer.
    pub fn get_ref(&self) -> &W {
        &self.writer
    }
}
/// Chunked streaming encoder that assembles the whole stream in memory.
///
/// Flushed chunks (header plus payload) are appended to an owned byte vector,
/// which `finish` returns. Only available with the `alloc` feature.
#[cfg(feature = "alloc")]
pub struct BufferStreamingEncoder {
/// Output stream: every flushed chunk header and payload, in order.
buffer: alloc::vec::Vec<u8>,
/// Streaming settings; this file reads `chunk_size`.
config: StreamingConfig,
/// Encoded bytes of the items that belong to the chunk being assembled.
chunk_buffer: alloc::vec::Vec<u8>,
/// Number of items appended to `chunk_buffer` since the last flush.
items_in_chunk: u32,
/// Running totals (items, bytes, chunks) updated on every flushed chunk.
progress: StreamingProgress,
}
#[cfg(feature = "alloc")]
impl BufferStreamingEncoder {
    /// Builds an in-memory encoder configured with `StreamingConfig::default()`.
    pub fn new() -> Self {
        Self::with_config(StreamingConfig::default())
    }

    /// Builds an in-memory encoder with the given `config`, starting from
    /// empty output and chunk buffers and default progress counters.
    pub fn with_config(config: StreamingConfig) -> Self {
        let output = alloc::vec::Vec::new();
        let pending_chunk = alloc::vec::Vec::new();
        Self {
            buffer: output,
            config,
            chunk_buffer: pending_chunk,
            items_in_chunk: 0,
            progress: StreamingProgress::default(),
        }
    }

    /// Encodes `item` with `config::standard()` and adds it to the pending chunk.
    ///
    /// A non-empty pending chunk is flushed first when the new item would make
    /// it exceed `config.chunk_size`.
    ///
    /// # Errors
    ///
    /// Returns the error produced while encoding `item`.
    pub fn write_item<T: Encode>(&mut self, item: &T) -> Result<()> {
        let mut item_encoder = EncoderImpl::new(VecWriter::new(), config::standard());
        item.encode(&mut item_encoder)?;
        let encoded = item_encoder.into_writer().into_vec();

        let pending = self.chunk_buffer.len();
        if pending > 0 && pending + encoded.len() > self.config.chunk_size {
            self.flush_chunk();
        }

        self.chunk_buffer.extend_from_slice(encoded.as_slice());
        self.items_in_chunk += 1;
        Ok(())
    }

    /// Moves the pending chunk into the output buffer, prefixed by its header,
    /// and updates the progress counters. A chunk without bytes is skipped.
    fn flush_chunk(&mut self) {
        let payload_len = self.chunk_buffer.len();
        if payload_len == 0 {
            return;
        }

        // NOTE(review): the cast truncates payloads above `u32::MAX` bytes;
        // this method has no error channel, so the behaviour is kept as is.
        let header_bytes = ChunkHeader::data(payload_len as u32, self.items_in_chunk).to_bytes();
        self.buffer.extend_from_slice(&header_bytes);
        self.buffer.extend_from_slice(&self.chunk_buffer);

        self.progress.chunks_processed += 1;
        self.progress.bytes_processed += payload_len as u64;
        self.progress.items_processed += self.items_in_chunk as u64;

        self.items_in_chunk = 0;
        self.chunk_buffer.clear();
    }

    /// Flushes the pending chunk, appends the end-of-stream header and returns
    /// the complete encoded stream.
    pub fn finish(mut self) -> alloc::vec::Vec<u8> {
        self.flush_chunk();
        let mut stream = self.buffer;
        stream.extend_from_slice(&ChunkHeader::end().to_bytes());
        stream
    }

    /// Returns the progress counters accumulated over the chunks flushed so far.
    pub fn progress(&self) -> &StreamingProgress {
        &self.progress
    }
}
#[cfg(feature = "alloc")]
impl Default for BufferStreamingEncoder {
    /// Equivalent to [`BufferStreamingEncoder::new`]: an encoder using the
    /// default streaming configuration.
    fn default() -> Self {
        Self::with_config(StreamingConfig::default())
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Three small items written to the in-memory encoder must yield a stream
    /// longer than two chunk headers.
    #[cfg(feature = "alloc")]
    #[test]
    fn test_buffer_encoder_basic() {
        let mut enc = BufferStreamingEncoder::new();
        for value in &[42u32, 100u32, 255u32] {
            enc.write_item(value).expect("write failed");
        }

        let stream = enc.finish();
        let two_headers = ChunkHeader::SIZE * 2;
        assert!(!stream.is_empty());
        assert!(stream.len() > two_headers);
    }

    /// Writing through the `Write`-backed encoder must leave bytes in the sink.
    #[cfg(feature = "std")]
    #[test]
    fn test_streaming_encoder_io() {
        let mut sink: alloc::vec::Vec<u8> = alloc::vec::Vec::new();

        let mut enc = StreamingEncoder::new(&mut sink);
        (0..100u32).for_each(|value| enc.write_item(&value).expect("write failed"));
        enc.finish().expect("finish failed");

        assert!(!sink.is_empty());
    }

    /// With a 1024-byte chunk size, 1000 items must have flushed at least one
    /// chunk before `finish` is called.
    #[cfg(feature = "alloc")]
    #[test]
    fn test_chunking() {
        let small_chunks = StreamingConfig::new().with_chunk_size(1024);
        let mut enc = BufferStreamingEncoder::with_config(small_chunks);
        (0..1000u32).for_each(|value| enc.write_item(&value).expect("write failed"));

        // Snapshot the counters before `finish` consumes the encoder.
        let snapshot = enc.progress().clone();
        let _stream = enc.finish();
        assert!(snapshot.chunks_processed >= 1);
    }
}