use std::io::{Read, Seek, SeekFrom};
use std::sync::Arc;
use crate::unified::VolumeProvider;
pub struct MultiVolumeReader {
volume_provider: Arc<dyn VolumeProvider>,
current_reader: Option<Box<dyn Read + Send>>,
current_volume: u32,
total_volumes: Option<u32>,
volume_sizes: Vec<u64>,
position: u64,
total_size: u64,
buffer: Vec<u8>,
}
impl std::fmt::Debug for MultiVolumeReader {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("MultiVolumeReader")
.field("current_volume", &self.current_volume)
.field("total_volumes", &self.total_volumes)
.field("volume_sizes", &self.volume_sizes)
.field("position", &self.position)
.field("total_size", &self.total_size)
.finish()
}
}
impl MultiVolumeReader {
pub fn new(volume_provider: Arc<dyn VolumeProvider>, volume_sizes: Vec<u64>) -> Self {
let total_size = volume_sizes.iter().sum();
let total_volumes = Some(volume_sizes.len() as u32);
Self {
volume_provider,
current_reader: None,
current_volume: 0,
total_volumes,
volume_sizes,
position: 0,
total_size,
buffer: Vec::new(),
}
}
fn volume_for_position(&self, pos: u64) -> Option<(u32, u64)> {
let mut cumulative = 0u64;
for (i, &size) in self.volume_sizes.iter().enumerate() {
if pos < cumulative + size {
return Some((i as u32, pos - cumulative));
}
cumulative += size;
}
None
}
fn open_volume_at(&mut self, volume: u32, offset: u64) -> std::io::Result<()> {
let reader = self
.volume_provider
.open_volume(volume)
.ok_or_else(|| std::io::Error::new(std::io::ErrorKind::NotFound, format!("Cannot open volume {}", volume)))?;
self.current_reader = Some(reader);
self.current_volume = volume;
if offset > 0 {
let mut remaining = offset as usize;
self.buffer.resize(8192.min(remaining), 0);
while remaining > 0 {
let to_read = self.buffer.len().min(remaining);
let reader = self.current_reader.as_mut().unwrap();
let n = reader.read(&mut self.buffer[..to_read])?;
if n == 0 {
return Err(std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "Unexpected end of volume while seeking"));
}
remaining -= n;
}
}
Ok(())
}
}
impl Read for MultiVolumeReader {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
if buf.is_empty() || self.position >= self.total_size {
return Ok(0);
}
if self.current_reader.is_none() {
if let Some((vol, offset)) = self.volume_for_position(self.position) {
self.open_volume_at(vol, offset)?;
} else {
return Ok(0);
}
}
let reader = self.current_reader.as_mut().unwrap();
match reader.read(buf) {
Ok(0) => {
let next_volume = self.current_volume + 1;
if let Some(total) = self.total_volumes {
if next_volume >= total {
return Ok(0);
}
}
self.current_reader = None;
self.open_volume_at(next_volume, 0)?;
let reader = self.current_reader.as_mut().unwrap();
let n = reader.read(buf)?;
self.position += n as u64;
Ok(n)
}
Ok(n) => {
self.position += n as u64;
Ok(n)
}
Err(e) => Err(e),
}
}
}
impl Seek for MultiVolumeReader {
fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
let new_pos = match pos {
SeekFrom::Start(offset) => offset,
SeekFrom::End(offset) => {
if offset >= 0 {
self.total_size.saturating_add(offset as u64)
} else {
self.total_size.saturating_sub((-offset) as u64)
}
}
SeekFrom::Current(offset) => {
if offset >= 0 {
self.position.saturating_add(offset as u64)
} else {
self.position.saturating_sub((-offset) as u64)
}
}
};
let new_pos = new_pos.min(self.total_size);
if let Some((vol, offset)) = self.volume_for_position(new_pos) {
let need_reopen = self.current_reader.is_none() || vol != self.current_volume || new_pos < self.position;
if need_reopen {
self.current_reader = None;
self.open_volume_at(vol, offset)?;
} else {
let bytes_to_skip = new_pos - self.position;
if bytes_to_skip > 0 {
let mut remaining = bytes_to_skip as usize;
self.buffer.resize(8192.min(remaining), 0);
while remaining > 0 {
let to_read = self.buffer.len().min(remaining);
let reader = self.current_reader.as_mut().unwrap();
let n = reader.read(&mut self.buffer[..to_read])?;
if n == 0 {
break;
}
remaining -= n;
}
}
}
self.position = new_pos;
} else if new_pos >= self.total_size {
self.position = self.total_size;
self.current_reader = None;
}
Ok(self.position)
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::io::Cursor;
struct TestVolumeProvider {
volumes: Vec<Vec<u8>>,
}
impl VolumeProvider for TestVolumeProvider {
fn open_volume(&self, volume_number: u32) -> Option<Box<dyn Read + Send>> {
self.volumes
.get(volume_number as usize)
.map(|data| Box::new(Cursor::new(data.clone())) as Box<dyn Read + Send>)
}
}
#[test]
fn test_multi_volume_read_sequential() {
let provider = TestVolumeProvider {
volumes: vec![vec![1, 2, 3, 4], vec![5, 6, 7, 8], vec![9, 10]],
};
let sizes = vec![4, 4, 2];
let mut reader = MultiVolumeReader::new(Arc::new(provider), sizes);
let mut buf = vec![0u8; 10];
let n = reader.read(&mut buf).unwrap();
assert_eq!(n, 4);
assert_eq!(&buf[..4], &[1, 2, 3, 4]);
let n = reader.read(&mut buf).unwrap();
assert_eq!(n, 4);
assert_eq!(&buf[..4], &[5, 6, 7, 8]);
let n = reader.read(&mut buf).unwrap();
assert_eq!(n, 2);
assert_eq!(&buf[..2], &[9, 10]);
let n = reader.read(&mut buf).unwrap();
assert_eq!(n, 0);
}
#[test]
fn test_multi_volume_seek() {
let provider = TestVolumeProvider {
volumes: vec![vec![1, 2, 3, 4], vec![5, 6, 7, 8], vec![9, 10]],
};
let sizes = vec![4, 4, 2];
let mut reader = MultiVolumeReader::new(Arc::new(provider), sizes);
reader.seek(SeekFrom::Start(5)).unwrap();
let mut buf = [0u8; 1];
reader.read_exact(&mut buf).unwrap();
assert_eq!(buf[0], 6);
reader.seek(SeekFrom::Start(8)).unwrap();
reader.read_exact(&mut buf).unwrap();
assert_eq!(buf[0], 9);
reader.seek(SeekFrom::End(-2)).unwrap();
reader.read_exact(&mut buf).unwrap();
assert_eq!(buf[0], 9);
}
}