use std::{
io::{Seek, SeekFrom},
ops::Range,
};
use kithara_stream::{PendingReason, SegmentLayout, StreamReadError};
use crate::{
error::{DecodeError, DecodeResult},
traits::{BoxedSource, InputReadOutcome},
};
fn map_stream_err(err: StreamReadError) -> DecodeError {
match err {
StreamReadError::Source(io_err) => DecodeError::from(io_err),
_ => DecodeError::InvalidData(format!("unknown stream read error: {err:?}")),
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum FillStatus {
Ready,
Pending(PendingReason),
}
#[derive(Debug)]
pub(crate) struct SegmentReadState {
pub(crate) range: Range<u64>,
pub(crate) buffer: Vec<u8>,
pub(crate) filled: usize,
}
impl SegmentReadState {
pub(crate) fn new(range: Range<u64>) -> Self {
Self {
range,
buffer: Vec::new(),
filled: 0,
}
}
pub(crate) fn total(&self) -> usize {
usize::try_from(self.range.end - self.range.start)
.expect("BUG: segment range fits usize on supported targets")
}
}
#[derive(Clone, Copy)]
pub(crate) enum LiveRange<'a> {
Init(&'a dyn SegmentLayout),
Segment(&'a dyn SegmentLayout, u32),
}
impl<'a> LiveRange<'a> {
fn resolve(self) -> Option<Range<u64>> {
match self {
LiveRange::Init(layout) => {
let range = layout.init_segment_range();
if range.is_empty() { None } else { Some(range) }
}
LiveRange::Segment(layout, idx) => layout.segment_at_index(idx).map(|d| d.byte_range),
}
}
}
pub(crate) fn fill_segment_buffer(
source: &mut BoxedSource,
state: &mut SegmentReadState,
live: LiveRange<'_>,
) -> DecodeResult<FillStatus> {
loop {
refresh_range(state, live);
let total = state.total();
if state.buffer.len() != total {
state.buffer.resize(total, 0);
}
if state.filled >= total {
return Ok(FillStatus::Ready);
}
let abs_offset = state.range.start + state.filled as u64;
source.seek(SeekFrom::Start(abs_offset))?;
if refresh_range(state, live) {
let total_after = state.total();
if state.buffer.len() != total_after {
state.buffer.clear();
state.buffer.resize(total_after, 0);
}
if state.filled >= total_after {
return Ok(FillStatus::Ready);
}
let corrected = state.range.start + state.filled as u64;
source.seek(SeekFrom::Start(corrected))?;
}
match source
.try_read(&mut state.buffer[state.filled..])
.map_err(map_stream_err)?
{
InputReadOutcome::Bytes(n) => state.filled += n.get(),
InputReadOutcome::Pending(reason) => return Ok(FillStatus::Pending(reason)),
InputReadOutcome::Eof => {
if state.filled == state.total() {
return Ok(FillStatus::Ready);
}
if refresh_range(state, live) {
let new_total = state.total();
if state.buffer.len() != new_total {
state.buffer.resize(new_total, 0);
}
if state.filled >= new_total {
return Ok(FillStatus::Ready);
}
}
return Err(DecodeError::InvalidData(format!(
"unexpected EOF before segment buffer filled: {} / {}",
state.filled,
state.total()
)));
}
}
}
}
fn refresh_range(state: &mut SegmentReadState, live: LiveRange<'_>) -> bool {
let Some(new_range) = live.resolve() else {
return false;
};
if new_range == state.range {
return false;
}
let start_changed = new_range.start != state.range.start;
state.range = new_range;
if start_changed {
state.filled = 0;
} else {
let new_total = state.total();
if state.filled > new_total {
state.filled = new_total;
}
}
true
}