use super::utils::IndexableBuffer;
pub struct StreamSeeker<'a> {
upstream_buffer: &'a dyn IndexableBuffer,
files_data_end: u64,
max_chunk_size: u64,
buffer_start: u64,
buffer: Vec<u8>,
}
impl<'a> StreamSeeker<'a> {
pub fn new(
upstream_buffer: &'a dyn IndexableBuffer,
files_data_start: u64,
files_data_end: u64,
max_chunk_size: u64,
) -> Self {
Self {
upstream_buffer,
files_data_end,
max_chunk_size,
buffer_start: files_data_start,
buffer: Vec::new(),
}
}
pub fn seek_and_write<W: std::io::Write>(
&mut self,
file_start: u64,
file_length: u64,
read_ahead_bytes: u64,
writer: &mut W,
) -> crate::Result<()> {
assert!(file_start >= self.buffer_start);
let skip = (file_start - self.buffer_start) as usize;
self.buffer = self.buffer[skip..].to_vec();
self.buffer_start = file_start;
loop {
let file_byte_count_remaining =
file_length as i64 - (self.buffer_start - file_start) as i64;
if file_byte_count_remaining <= 0 {
return Ok(());
}
let file_byte_count_remaining = file_byte_count_remaining as u64;
if !self.buffer.is_empty() {
let take = std::cmp::min(file_byte_count_remaining as usize, self.buffer.len());
writer.write_all(&self.buffer[..take])?;
self.buffer = self.buffer[take..].to_vec();
self.buffer_start += take as u64;
} else {
let start_of_fetch = self.buffer_start + self.buffer.len() as u64;
let bytes_left_in_stream = self.files_data_end as i64 - start_of_fetch as i64;
if bytes_left_in_stream <= 0 {
crate::logging::error(&format!(
"StreamSeeker: no bytes left upstream (start_of_fetch={}, files_data_end={})",
start_of_fetch, self.files_data_end
));
return Ok(());
}
let bytes_left_in_stream = bytes_left_in_stream as u64;
let mut fetch_size = std::cmp::min(
self.max_chunk_size,
file_byte_count_remaining + read_ahead_bytes,
);
if fetch_size > bytes_left_in_stream {
crate::logging::error(&format!(
"StreamSeeker: fetch_size ({}) > bytes_left_in_stream ({}); clamping",
fetch_size, bytes_left_in_stream
));
fetch_size = bytes_left_in_stream;
}
let fetched_data = self.upstream_buffer.get_data(start_of_fetch, fetch_size)?;
self.buffer.extend_from_slice(&fetched_data);
}
}
}
}