use std::fs;
use std::io::Read;
use std::path::Path;
use crate::{ChdCompressor, ChdDataHandler, ChdError, CompressStep, CompressionProgress, Result};
/// Presents a forward-only [`Read`] stream as a data source that is consumed
/// strictly in order.
///
/// The source remembers how far it has been read so that callers asking for
/// anything other than the next contiguous range can be rejected.
pub(crate) struct StreamingSource<R>
where
    R: Read,
{
    /// Underlying byte stream the data is pulled from.
    reader: R,
    /// Offset at which the next read is required to start.
    cursor: u64,
    /// Logical length of the stream in bytes.
    total: u64,
}
impl<R: Read> StreamingSource<R> {
pub fn new(reader: R, total: u64) -> Self {
Self {
reader,
cursor: 0,
total,
}
}
}
impl<R: Read> ChdDataHandler for StreamingSource<R> {
    /// Fills `dest` with the next `dest.len()` bytes of the stream.
    ///
    /// `offset` must equal the position reached by the previous call (zero for
    /// the first call); the underlying reader is never seeked. If the reader
    /// ends, or fails with a non-retryable error, before `dest` is full, the
    /// remaining bytes are set to zero.
    ///
    /// Always returns `dest.len()` as a `u32`, and always advances the internal
    /// position by `dest.len()`.
    ///
    /// # Panics
    ///
    /// Panics if `offset` differs from the current position. In debug builds
    /// it also panics if the requested range extends past the logical length
    /// given at construction.
    fn read_data(&mut self, dest: &mut [u8], offset: u64) -> u32 {
        let length = dest.len();
        assert_eq!(
            offset, self.cursor,
            "StreamingSource: out-of-order read (cursor {}, requested {})",
            self.cursor, offset
        );
        debug_assert!(
            offset.saturating_add(length as u64) <= self.total,
            "compressor requested {} bytes at {} past logical end {}",
            length,
            offset,
            self.total
        );

        // A single `read` may return fewer bytes than requested, so keep
        // pulling until the buffer is full or the stream ends.
        let mut filled = 0usize;
        while filled < length {
            match self.reader.read(&mut dest[filled..]) {
                Ok(0) => break,
                Ok(n) => filled += n,
                // `Interrupted` is documented as non-fatal: retry instead of
                // treating it as end-of-stream and zero-padding real data away.
                Err(e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
                // The `u32` return value has no way to carry an error, so any
                // other failure ends the fill and the tail is zero-padded below.
                Err(_) => break,
            }
        }

        // Zero whatever the reader could not supply.
        dest[filled..].fill(0);

        self.cursor = offset + length as u64;
        // NOTE(review): assumes request buffers never exceed u32::MAX bytes;
        // a larger `dest` would be truncated by this cast — confirm with callers.
        length as u32
    }
}
/// Steps `compressor` until it reports completion or an error, forwarding
/// progress updates and observing cancellation requests.
///
/// * `progress` receives every update produced by the compressor until a
///   cancellation request has been seen; after that, updates are discarded.
/// * `cancel` is polled before each step until it first returns `true`; the
///   request is then latched and `cancel` is not called again.
///
/// A cancellation request does not stop the stepping loop: the compressor is
/// still driven until it finishes or fails.
/// NOTE(review): presumably this lets the compressor wind down in an orderly
/// way before it is dropped — confirm that this is intentional.
///
/// # Errors
///
/// Returns the compressor's error if a step fails, otherwise
/// `ChdError::Cancelled` if cancellation was requested. In both cases the file
/// at `output_path` is removed on a best-effort basis (removal failures are
/// ignored). A step error takes precedence over cancellation.
pub(crate) fn run_compression(
    mut compressor: ChdCompressor,
    output_path: &Path,
    progress: &mut dyn FnMut(CompressionProgress),
    cancel: &dyn Fn() -> bool,
) -> Result<()> {
    compressor.compress_begin();

    let mut stop_requested = false;
    let failure: Option<ChdError> = loop {
        // Short-circuit keeps `cancel` from being polled once latched.
        stop_requested = stop_requested || cancel();

        let (update, finished) = match compressor.compress_continue() {
            Ok(CompressStep::Continue(p)) => (p, false),
            Ok(CompressStep::Done(p)) => (p, true),
            Err(e) => break Some(e),
        };

        if !stop_requested {
            progress(update);
        }
        if finished {
            break None;
        }
    };

    // Release the compressor before touching the output file.
    drop(compressor);

    let outcome = match failure {
        Some(e) => Err(e),
        None if stop_requested => Err(ChdError::Cancelled),
        None => return Ok(()),
    };

    // Only reached on failure or cancellation: discard the partial output.
    let _ = fs::remove_file(output_path);
    outcome
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// When the reader runs dry before the buffer is full, the whole buffer is
    /// still reported as read and the unfilled tail is zeroed.
    #[test]
    fn zero_pads_past_eof() {
        let mut source = StreamingSource::new(Cursor::new(vec![0xAAu8; 100]), 256);
        let mut out = vec![0xFFu8; 256];

        assert_eq!(source.read_data(&mut out, 0), 256);

        let (head, tail) = out.split_at(100);
        assert!(head.iter().all(|&b| b == 0xAA));
        assert!(tail.iter().all(|&b| b == 0));
    }

    /// Two back-to-back reads at contiguous offsets reproduce the input.
    #[test]
    fn sequential_reads_advance() {
        let expected: Vec<u8> = (0..200u32).map(|i| i as u8).collect();
        let mut source = StreamingSource::new(Cursor::new(expected.clone()), 200);

        let mut out = vec![0u8; 200];
        let (first, second) = out.split_at_mut(100);
        source.read_data(first, 0);
        source.read_data(second, 100);

        assert_eq!(out, expected);
    }

    /// Requesting an offset other than the current position must panic.
    #[test]
    #[should_panic(expected = "out-of-order")]
    fn rejects_non_monotonic_offset() {
        let mut source = StreamingSource::new(Cursor::new(vec![0u8; 100]), 100);
        let mut scratch = [0u8; 50];
        source.read_data(&mut scratch, 0);
        // The position is now 50, so offset 0 is out of order.
        source.read_data(&mut scratch, 0);
    }

    /// A reader that hands out one byte per call still fills the buffer.
    #[test]
    fn handles_short_reads_from_underlying_reader() {
        /// Reader yielding at most a single byte per `read` call.
        struct Drip {
            data: Vec<u8>,
            pos: usize,
        }

        impl std::io::Read for Drip {
            fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
                match (buf.first_mut(), self.data.get(self.pos)) {
                    (Some(slot), Some(&byte)) => {
                        *slot = byte;
                        self.pos += 1;
                        Ok(1)
                    }
                    // Either the data is exhausted or the caller's buffer is empty.
                    _ => Ok(0),
                }
            }
        }

        let expected: Vec<u8> = (0..32u8).collect();
        let drip = Drip {
            data: expected.clone(),
            pos: 0,
        };
        let mut source = StreamingSource::new(drip, 32);

        let mut out = vec![0u8; 32];
        source.read_data(&mut out, 0);

        assert_eq!(out, expected);
    }
}