use kithara_decode::{DecodeError, Decoder};
use kithara_platform::time::Duration;
use kithara_stream::{MediaInfo, SourcePhase, SourceSeekAnchor};
use crate::pipeline::fetch::Fetch;
pub(crate) enum TrackState {
Decoding,
SeekRequested(SeekRequest),
WaitingForSource {
context: WaitContext,
reason: WaitingReason,
},
ApplyingSeek(ApplySeekState),
RecreatingDecoder(RecreateState),
AwaitingResume(ResumeState),
AtEof,
Failed(TrackFailure),
}
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub(crate) struct SeekContext {
pub(crate) target: Duration,
pub(crate) epoch: u64,
}
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub(crate) struct SeekRequest {
pub(crate) seek: SeekContext,
pub(crate) attempt: u8,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) struct ApplySeekState {
pub(crate) mode: SeekMode,
pub(crate) request: SeekRequest,
}
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub(crate) struct ResumeState {
pub(crate) anchor_offset: Option<u64>,
pub(crate) skip: Option<Duration>,
pub(crate) seek: SeekContext,
pub(crate) recover_attempts: u8,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum RecreateNext {
Decode,
Seek(SeekRequest),
ApplySeek(SeekRequest),
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct RecreateState {
pub(crate) media_info: MediaInfo,
pub(crate) cause: RecreateCause,
pub(crate) next: RecreateNext,
pub(crate) offset: u64,
pub(crate) attempt: u8,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum RecreateOutcome {
Done,
SoftFailed,
NeedsSourceWait,
}
#[derive(Debug)]
pub(crate) enum WaitContext {
Playback,
Seek(SeekRequest),
ApplySeek(ApplySeekState),
Recreation(RecreateState),
PostSeek(ResumeState),
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WaitingReason {
Waiting,
WaitingDemand,
WaitingMetadata,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum SeekMode {
Direct { target_byte: Option<u64> },
Anchor(SourceSeekAnchor),
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum RecreateCause {
FormatBoundary,
VariantSwitch,
}
#[derive(Debug)]
pub(crate) enum TrackFailure {
Decode(DecodeError),
RecreateFailed { offset: u64 },
SourceCancelled,
}
pub(crate) struct DecoderSession {
pub(crate) decoder: Box<dyn Decoder>,
pub(crate) media_info: Option<MediaInfo>,
pub(crate) base_offset: u64,
pub(crate) installed_at_seek_epoch: u64,
}
pub enum TrackStep<C> {
Produced(Fetch<C>),
Blocked(WaitingReason),
StateChanged,
Eof,
Failed,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TrackPhaseTag {
Decoding,
SeekRequested,
WaitingForSource,
ApplyingSeek,
RecreatingDecoder,
AwaitingResume,
AtEof,
Failed,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum ConsumerPhase {
Buffering,
Playing,
SeekPending { epoch: u64 },
AtEof,
Failed,
}
impl TrackState {
pub(crate) fn is_terminal(&self) -> bool {
matches!(self, Self::Failed(_))
}
}
impl From<&TrackState> for TrackPhaseTag {
#[inline(always)]
fn from(state: &TrackState) -> Self {
match state {
TrackState::Decoding => Self::Decoding,
TrackState::SeekRequested(_) => Self::SeekRequested,
TrackState::WaitingForSource { .. } => Self::WaitingForSource,
TrackState::ApplyingSeek(_) => Self::ApplyingSeek,
TrackState::RecreatingDecoder(_) => Self::RecreatingDecoder,
TrackState::AwaitingResume(_) => Self::AwaitingResume,
TrackState::AtEof => Self::AtEof,
TrackState::Failed(_) => Self::Failed,
}
}
}
impl ConsumerPhase {
pub(crate) fn is_terminal(self) -> bool {
matches!(self, Self::AtEof | Self::Failed)
}
}
pub(crate) fn map_source_phase(phase: SourcePhase) -> Option<WaitingReason> {
match phase {
SourcePhase::Waiting => Some(WaitingReason::Waiting),
SourcePhase::WaitingDemand => Some(WaitingReason::WaitingDemand),
SourcePhase::WaitingMetadata => Some(WaitingReason::WaitingMetadata),
_ => None,
}
}
#[cfg(test)]
mod tests {
use kithara_test_utils::kithara;
use super::*;
#[kithara::test]
fn is_terminal_for_each_state() {
let non_terminal = [
TrackState::Decoding,
TrackState::SeekRequested(SeekRequest {
seek: SeekContext {
epoch: 1,
target: Duration::from_secs(5),
},
..Default::default()
}),
TrackState::WaitingForSource {
context: WaitContext::Playback,
reason: WaitingReason::Waiting,
},
TrackState::ApplyingSeek(ApplySeekState {
mode: SeekMode::Direct { target_byte: None },
request: SeekRequest {
seek: SeekContext {
epoch: 1,
target: Duration::from_secs(5),
},
..Default::default()
},
}),
TrackState::RecreatingDecoder(RecreateState {
attempt: 0,
cause: RecreateCause::FormatBoundary,
media_info: MediaInfo::default(),
next: RecreateNext::Decode,
offset: 0,
}),
TrackState::AwaitingResume(ResumeState {
recover_attempts: 0,
seek: SeekContext {
epoch: 1,
target: Duration::from_secs(5),
},
anchor_offset: None,
skip: None,
}),
TrackState::AtEof,
];
for state in &non_terminal {
assert!(
!state.is_terminal(),
"expected non-terminal for {:?}",
TrackPhaseTag::from(state)
);
}
assert!(TrackState::Failed(TrackFailure::SourceCancelled).is_terminal());
}
#[kithara::test]
fn phase_tag_preserves_discriminant() {
assert_eq!(
TrackPhaseTag::from(&TrackState::Decoding),
TrackPhaseTag::Decoding
);
assert_eq!(
TrackPhaseTag::from(&TrackState::SeekRequested(SeekRequest {
seek: SeekContext {
epoch: 1,
target: Duration::ZERO,
},
..Default::default()
})),
TrackPhaseTag::SeekRequested
);
assert_eq!(
TrackPhaseTag::from(&TrackState::WaitingForSource {
context: WaitContext::Playback,
reason: WaitingReason::WaitingDemand,
}),
TrackPhaseTag::WaitingForSource
);
assert_eq!(
TrackPhaseTag::from(&TrackState::ApplyingSeek(ApplySeekState {
mode: SeekMode::Direct { target_byte: None },
request: SeekRequest {
seek: SeekContext {
epoch: 1,
target: Duration::ZERO,
},
..Default::default()
},
})),
TrackPhaseTag::ApplyingSeek
);
assert_eq!(
TrackPhaseTag::from(&TrackState::RecreatingDecoder(RecreateState {
attempt: 1,
cause: RecreateCause::VariantSwitch,
media_info: MediaInfo::default(),
next: RecreateNext::ApplySeek(SeekRequest {
attempt: 1,
seek: SeekContext {
epoch: 1,
target: Duration::from_secs(10),
},
}),
offset: 100,
})),
TrackPhaseTag::RecreatingDecoder
);
assert_eq!(
TrackPhaseTag::from(&TrackState::AwaitingResume(ResumeState {
recover_attempts: 0,
seek: SeekContext {
epoch: 1,
target: Duration::from_secs(10),
},
anchor_offset: None,
skip: None,
})),
TrackPhaseTag::AwaitingResume
);
assert_eq!(
TrackPhaseTag::from(&TrackState::AtEof),
TrackPhaseTag::AtEof
);
assert_eq!(
TrackPhaseTag::from(&TrackState::Failed(TrackFailure::SourceCancelled)),
TrackPhaseTag::Failed
);
}
#[kithara::test]
fn map_source_phase_table() {
assert_eq!(
map_source_phase(SourcePhase::Waiting),
Some(WaitingReason::Waiting)
);
assert_eq!(
map_source_phase(SourcePhase::WaitingDemand),
Some(WaitingReason::WaitingDemand)
);
assert_eq!(
map_source_phase(SourcePhase::WaitingMetadata),
Some(WaitingReason::WaitingMetadata)
);
assert_eq!(map_source_phase(SourcePhase::Ready), None);
assert_eq!(map_source_phase(SourcePhase::Eof), None);
assert_eq!(map_source_phase(SourcePhase::Seeking), None);
assert_eq!(map_source_phase(SourcePhase::Cancelled), None);
}
#[kithara::test]
fn consumer_phase_terminal() {
assert!(!ConsumerPhase::Buffering.is_terminal());
assert!(!ConsumerPhase::Playing.is_terminal());
assert!(!ConsumerPhase::SeekPending { epoch: 1 }.is_terminal());
assert!(ConsumerPhase::AtEof.is_terminal());
assert!(ConsumerPhase::Failed.is_terminal());
}
#[kithara::test]
fn seek_context_copy_and_eq() {
let ctx = SeekContext {
epoch: 42,
target: Duration::from_millis(500),
};
let copy = ctx;
assert_eq!(ctx, copy);
assert_eq!(copy.epoch, 42);
assert_eq!(copy.target, Duration::from_millis(500));
}
#[kithara::test]
fn at_eof_allows_seek_transition() {
let state = TrackState::AtEof;
assert!(!state.is_terminal());
assert_eq!(TrackPhaseTag::from(&state), TrackPhaseTag::AtEof);
}
}