use std::io::{self, Write};
use crate::Result;
/// Interface for streaming one entry into a caller-supplied [`Write`] sink.
///
/// Only the method signatures are declared here; the actual behavior is
/// defined by the implementors.
///
/// NOTE(review): the type parameter `R` does not appear in either method
/// signature below — presumably it names the implementor's underlying reader
/// type; confirm against the implementations.
pub trait ExtractToSink<R> {
/// Processes the entry identified by `entry_index`, sending output to `sink`.
///
/// Returns the crate-level `Result` wrapping a `u64` — presumably the number
/// of bytes written to `sink`; TODO confirm against the implementations.
fn extract_to<W: Write>(&mut self, entry_index: usize, sink: &mut W) -> Result<u64>;
/// Variant of `extract_to` that additionally receives an `on_progress`
/// callback.
///
/// The callback takes two `u64` arguments — presumably
/// `(bytes_so_far, total_bytes)`; the order and meaning are not established
/// by this declaration, so verify against the implementations.
fn extract_to_with_progress<W, F>(
&mut self,
entry_index: usize,
sink: &mut W,
on_progress: F,
) -> Result<u64>
where
W: Write,
F: FnMut(u64, u64); }
/// An in-memory [`Write`] sink that never stores more than `max_size` bytes.
///
/// Accepted bytes are appended to an internal `Vec<u8>`. Once the limit is
/// reached, any further non-empty write fails with
/// [`io::ErrorKind::WriteZero`].
#[derive(Debug)]
pub struct BoundedVecSink {
    /// Bytes accepted so far; `write` never lets this exceed `max_size`.
    data: Vec<u8>,
    /// Maximum number of bytes `data` may hold.
    max_size: usize,
    /// Total bytes accepted since construction or the last `clear`.
    bytes_written: u64,
}

impl BoundedVecSink {
    /// Creates an empty sink that accepts at most `max_size` bytes.
    pub fn new(max_size: usize) -> Self {
        Self {
            data: Vec::new(),
            max_size,
            bytes_written: 0,
        }
    }

    /// Creates an empty sink with pre-allocated storage.
    ///
    /// The allocation is capped at `max_size`, so an oversized `capacity`
    /// hint cannot reserve more memory than the sink could ever use.
    pub fn with_capacity(capacity: usize, max_size: usize) -> Self {
        Self {
            data: Vec::with_capacity(capacity.min(max_size)),
            max_size,
            bytes_written: 0,
        }
    }

    /// Returns the bytes accepted so far.
    pub fn data(&self) -> &[u8] {
        &self.data
    }

    /// Consumes the sink and returns the accumulated bytes.
    pub fn into_vec(self) -> Vec<u8> {
        self.data
    }

    /// Returns the number of bytes accepted since construction or `clear`.
    pub fn bytes_written(&self) -> u64 {
        self.bytes_written
    }

    /// Returns the configured size limit in bytes.
    pub fn max_size(&self) -> usize {
        self.max_size
    }

    /// Returns how many more bytes can be accepted before the limit is hit.
    pub fn remaining(&self) -> usize {
        self.max_size.saturating_sub(self.data.len())
    }

    /// Discards the stored bytes and resets the byte counter; the limit is
    /// left unchanged.
    pub fn clear(&mut self) {
        self.data.clear();
        self.bytes_written = 0;
    }
}

impl Write for BoundedVecSink {
    /// Appends as much of `buf` as still fits under the limit.
    ///
    /// # Errors
    ///
    /// Returns [`io::ErrorKind::WriteZero`] when `buf` is non-empty and the
    /// sink is already full. A write that only partially fits is reported as
    /// a short write (`Ok(n)` with `n < buf.len()`), because the `Write`
    /// contract requires that an `Err` means no bytes were consumed. Callers
    /// using `write_all` still observe the `WriteZero` error on the follow-up
    /// call, with the fitting prefix stored.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        // Writing nothing always succeeds, even when the sink is full.
        if buf.is_empty() {
            return Ok(0);
        }

        let remaining = self.remaining();
        if remaining == 0 {
            return Err(io::Error::new(
                io::ErrorKind::WriteZero,
                "bounded sink size limit reached",
            ));
        }

        let to_write = buf.len().min(remaining);
        self.data.extend_from_slice(&buf[..to_write]);
        self.bytes_written += to_write as u64;
        Ok(to_write)
    }

    /// No-op: all data already lives in memory.
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}
/// A [`Write`] sink that feeds every byte into a CRC-32 hasher and stores
/// nothing.
#[derive(Debug)]
pub struct Crc32Sink {
    /// Running CRC-32 state.
    hasher: crc32fast::Hasher,
    /// Number of bytes fed into `hasher` since construction or `reset`.
    bytes_processed: u64,
}

impl Crc32Sink {
    /// Creates a sink with a fresh hasher and a zeroed byte counter.
    pub fn new() -> Self {
        let hasher = crc32fast::Hasher::new();
        Self {
            hasher,
            bytes_processed: 0,
        }
    }

    /// Consumes the sink and returns the checksum of everything written.
    pub fn finalize(self) -> u32 {
        let Self { hasher, .. } = self;
        hasher.finalize()
    }

    /// Returns the checksum of the bytes written so far without consuming
    /// the sink; finalizes a clone of the hasher so writing can continue.
    pub fn crc(&self) -> u32 {
        let snapshot = self.hasher.clone();
        snapshot.finalize()
    }

    /// Returns how many bytes have been hashed since construction or `reset`.
    pub fn bytes_processed(&self) -> u64 {
        self.bytes_processed
    }

    /// Replaces the hasher with a fresh one and zeroes the byte counter.
    pub fn reset(&mut self) {
        *self = Self::new();
    }
}

impl Default for Crc32Sink {
    /// Equivalent to [`Crc32Sink::new`].
    fn default() -> Self {
        Self::new()
    }
}

impl Write for Crc32Sink {
    /// Hashes all of `buf` and reports it as fully written; never fails.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let len = buf.len();
        self.hasher.update(buf);
        self.bytes_processed += len as u64;
        Ok(len)
    }

    /// No-op: nothing is buffered.
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}
/// A [`Write`] sink that throws away its input and only counts the bytes.
#[derive(Debug, Default)]
pub struct NullSink {
    /// Number of bytes discarded since construction or `reset`.
    bytes_discarded: u64,
}

impl NullSink {
    /// Creates a sink whose counter starts at zero.
    pub fn new() -> Self {
        Self::default()
    }

    /// Returns how many bytes have been discarded so far.
    pub fn bytes_discarded(&self) -> u64 {
        self.bytes_discarded
    }

    /// Sets the discarded-byte counter back to zero.
    pub fn reset(&mut self) {
        self.bytes_discarded = 0;
    }
}

impl Write for NullSink {
    /// Accepts all of `buf`, adds its length to the counter, and never fails.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let len = buf.len();
        self.bytes_discarded += len as u64;
        Ok(len)
    }

    /// No-op: nothing is buffered.
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}
/// A pass-through [`Write`] adapter that counts the bytes its inner writer
/// reports as written.
#[derive(Debug)]
pub struct CountingSink<W> {
    /// The wrapped writer that receives all data.
    inner: W,
    /// Sum of the byte counts returned by successful inner writes since
    /// construction or the last `reset_counter`.
    bytes_written: u64,
}

impl<W> CountingSink<W>
where
    W: Write,
{
    /// Wraps `inner` with a counter starting at zero.
    pub fn new(inner: W) -> Self {
        Self {
            bytes_written: 0,
            inner,
        }
    }

    /// Returns the number of bytes counted so far.
    pub fn bytes_written(&self) -> u64 {
        self.bytes_written
    }

    /// Borrows the wrapped writer.
    pub fn get_ref(&self) -> &W {
        &self.inner
    }

    /// Mutably borrows the wrapped writer. Bytes written through this
    /// reference bypass the counter.
    pub fn get_mut(&mut self) -> &mut W {
        &mut self.inner
    }

    /// Consumes the adapter and returns the wrapped writer.
    pub fn into_inner(self) -> W {
        self.inner
    }

    /// Zeroes the counter without touching the wrapped writer.
    pub fn reset_counter(&mut self) {
        self.bytes_written = 0;
    }
}

impl<W> Write for CountingSink<W>
where
    W: Write,
{
    /// Forwards `buf` to the inner writer; the counter advances only by the
    /// amount the inner writer accepted, and not at all on error.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        match self.inner.write(buf) {
            Ok(accepted) => {
                self.bytes_written += accepted as u64;
                Ok(accepted)
            }
            Err(err) => Err(err),
        }
    }

    /// Flushes the inner writer.
    fn flush(&mut self) -> io::Result<()> {
        self.inner.flush()
    }
}
/// A pass-through [`Write`] adapter that reports progress through a callback.
///
/// The callback receives `(bytes_written, total_bytes)` after a successful
/// write once the bytes accumulated since the previous report reach
/// `threshold`. With the default threshold of `0` it fires after every
/// successful write.
pub struct ProgressSink<W, F> {
    /// The wrapped writer that receives all data.
    inner: W,
    /// Progress callback, invoked as `callback(bytes_written, total_bytes)`.
    callback: F,
    /// Bytes the inner writer has accepted so far.
    bytes_written: u64,
    /// Expected total, if the caller supplied one via `with_total`.
    total_bytes: Option<u64>,
    /// Bytes accepted since the callback was last invoked.
    bytes_since_callback: u64,
    /// Minimum number of accumulated bytes between two callback invocations.
    threshold: u64,
}

impl<W, F> ProgressSink<W, F>
where
    W: Write,
    F: FnMut(u64, Option<u64>),
{
    /// Wraps `inner`, reporting to `callback`; no total is set and the
    /// threshold is `0`.
    pub fn new(inner: W, callback: F) -> Self {
        Self {
            inner,
            callback,
            total_bytes: None,
            threshold: 0,
            bytes_written: 0,
            bytes_since_callback: 0,
        }
    }

    /// Builder-style setter for the expected total byte count.
    pub fn with_total(self, total: u64) -> Self {
        Self {
            total_bytes: Some(total),
            ..self
        }
    }

    /// Builder-style setter for the reporting threshold in bytes.
    pub fn with_threshold(self, threshold: u64) -> Self {
        Self { threshold, ..self }
    }

    /// Returns the number of bytes the inner writer has accepted so far.
    pub fn bytes_written(&self) -> u64 {
        self.bytes_written
    }

    /// Returns `bytes_written / total` as a fraction, or `None` when no total
    /// was set. A total of zero is reported as `1.0`. The value is not
    /// clamped, so it exceeds `1.0` if more than `total` bytes are written.
    pub fn progress(&self) -> Option<f64> {
        let total = self.total_bytes?;
        let fraction = if total == 0 {
            1.0
        } else {
            self.bytes_written as f64 / total as f64
        };
        Some(fraction)
    }

    /// Consumes the adapter and returns the wrapped writer.
    pub fn into_inner(self) -> W {
        self.inner
    }
}

impl<W, F> Write for ProgressSink<W, F>
where
    W: Write,
    F: FnMut(u64, Option<u64>),
{
    /// Forwards `buf` to the inner writer, updates the counters, and invokes
    /// the callback when the threshold has been reached. Inner errors are
    /// returned before any counter changes.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let accepted = self.inner.write(buf)?;
        let delta = accepted as u64;
        self.bytes_written += delta;
        self.bytes_since_callback += delta;

        // Not enough new data since the last report: stay quiet.
        if self.bytes_since_callback < self.threshold {
            return Ok(accepted);
        }

        (self.callback)(self.bytes_written, self.total_bytes);
        self.bytes_since_callback = 0;
        Ok(accepted)
    }

    /// Flushes the inner writer; does not invoke the callback.
    fn flush(&mut self) -> io::Result<()> {
        self.inner.flush()
    }
}
/// A [`Write`] adapter that duplicates everything it accepts into two sinks.
pub struct TeeSink<W1, W2> {
    /// Primary sink; it decides how many bytes each `write` call accepts.
    sink1: W1,
    /// Secondary sink; it receives exactly the bytes the primary accepted.
    sink2: W2,
    /// Bytes successfully written to both sinks.
    bytes_written: u64,
}

impl<W1, W2> TeeSink<W1, W2>
where
    W1: Write,
    W2: Write,
{
    /// Combines two sinks with the byte counter starting at zero.
    pub fn new(sink1: W1, sink2: W2) -> Self {
        Self {
            bytes_written: 0,
            sink1,
            sink2,
        }
    }

    /// Returns the number of bytes written to both sinks so far.
    pub fn bytes_written(&self) -> u64 {
        self.bytes_written
    }

    /// Consumes the adapter and returns `(sink1, sink2)`.
    pub fn into_inner(self) -> (W1, W2) {
        let Self { sink1, sink2, .. } = self;
        (sink1, sink2)
    }
}

impl<W1, W2> Write for TeeSink<W1, W2>
where
    W1: Write,
    W2: Write,
{
    /// Writes `buf` to the primary sink, then mirrors the accepted prefix to
    /// the secondary sink with `write_all`.
    ///
    /// If the secondary sink fails, the error is returned and the counter is
    /// not advanced, although the primary sink has already taken the bytes.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let accepted = self.sink1.write(buf)?;
        let mirrored = &buf[..accepted];
        self.sink2.write_all(mirrored)?;
        self.bytes_written += accepted as u64;
        Ok(accepted)
    }

    /// Flushes the primary sink, then the secondary; stops at the first error.
    fn flush(&mut self) -> io::Result<()> {
        self.sink1.flush()?;
        self.sink2.flush()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Data under the limit is stored verbatim and the counters follow it.
    #[test]
    fn test_bounded_vec_sink() {
        let payload = [1u8, 2, 3, 4, 5];
        let mut bounded = BoundedVecSink::new(100);
        bounded.write_all(&payload).unwrap();

        assert_eq!(bounded.data(), &payload[..]);
        assert_eq!(bounded.bytes_written(), 5);
        assert_eq!(bounded.remaining(), 95);
    }

    /// Writing past the limit fails but keeps the prefix that fit.
    #[test]
    fn test_bounded_vec_sink_limit() {
        let oversized = [1u8, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        let mut bounded = BoundedVecSink::new(5);

        let outcome = bounded.write_all(&oversized);

        assert!(outcome.is_err());
        assert_eq!(bounded.data().len(), 5);
    }

    /// The sink's checksum matches the one-shot `crc32fast::hash`.
    #[test]
    fn test_crc32_sink() {
        let input = b"hello world";
        let mut hashing = Crc32Sink::new();
        hashing.write_all(input).unwrap();

        assert_eq!(hashing.finalize(), crc32fast::hash(input));
    }

    /// Every discarded byte is counted.
    #[test]
    fn test_null_sink() {
        let zeros = [0u8; 1000];
        let mut null = NullSink::new();
        null.write_all(&zeros).unwrap();

        assert_eq!(null.bytes_discarded(), 1000);
    }

    /// The counter sums several writes and the inner writer keeps the data.
    #[test]
    fn test_counting_sink() {
        let mut counting = CountingSink::new(Vec::new());
        counting.write_all(&[1, 2, 3]).unwrap();
        counting.write_all(&[4, 5, 6, 7]).unwrap();

        assert_eq!(counting.bytes_written(), 7);
        let stored = counting.into_inner();
        assert_eq!(stored, vec![1, 2, 3, 4, 5, 6, 7]);
    }

    /// With the default threshold the callback fires after each write.
    #[test]
    fn test_progress_sink() {
        let mut reported = Vec::new();
        {
            let record = |written: u64, _total: Option<u64>| {
                reported.push(written);
            };
            let mut tracked = ProgressSink::new(Vec::new(), record);
            tracked.write_all(&[1, 2, 3]).unwrap();
            tracked.write_all(&[4, 5, 6]).unwrap();
        }

        assert_eq!(reported, vec![3, 6]);
    }

    /// Both sinks receive identical data.
    #[test]
    fn test_tee_sink() {
        let mut tee = TeeSink::new(Vec::new(), Vec::new());
        tee.write_all(&[1, 2, 3]).unwrap();

        let (left, right) = tee.into_inner();
        assert_eq!(left, vec![1, 2, 3]);
        assert_eq!(right, vec![1, 2, 3]);
    }

    /// After `reset`, hashing the same input yields the same checksum.
    #[test]
    fn test_crc32_sink_reset() {
        let mut hashing = Crc32Sink::new();
        hashing.write_all(b"test").unwrap();
        let before_reset = hashing.crc();

        hashing.reset();
        assert_eq!(hashing.bytes_processed(), 0);

        hashing.write_all(b"test").unwrap();
        let after_reset = hashing.finalize();
        assert_eq!(before_reset, after_reset);
    }
}