use std::sync::Arc;
use kithara_decode::PcmChunk;
use kithara_platform::tokio::sync::Notify;
use tracing::trace;
use super::{AudioWorkerSource, handle::TrackRegistration, types::ServiceClass};
use crate::{
pipeline::{fetch::Fetch, track_fsm::TrackStep},
runtime::{AtomicServiceClass, Inlet, Node, Outlet, TickResult},
};
#[derive(Default)]
#[non_exhaustive]
pub(crate) struct DecoderRuntime {
pub(crate) eof_sent: bool,
pub(crate) preloaded: bool,
pub(crate) seek_epoch: u64,
pub(crate) chunks_sent: usize,
}
pub(crate) struct DecoderNode {
preload_notify: Arc<Notify>,
source: Box<dyn AudioWorkerSource<Chunk = PcmChunk>>,
runtime: DecoderRuntime,
outlet: Outlet<Fetch<PcmChunk>>,
trash_inlet: Inlet<PcmChunk>,
service_class: Arc<AtomicServiceClass>,
preload_chunks: usize,
}
impl DecoderNode {
fn complete_preload(&mut self) {
if !self.runtime.preloaded {
self.preload_notify.notify_one();
self.runtime.preloaded = true;
}
}
fn mark_preload_progress(&mut self) {
if self.runtime.preloaded {
return;
}
self.runtime.chunks_sent += 1;
if self.runtime.chunks_sent >= self.preload_chunks && !self.outlet.has_pending() {
self.complete_preload();
}
}
fn drain_trash(&mut self) {
while self.trash_inlet.try_pop().is_some() {}
}
fn sync_seek_epoch(&mut self) {
if !self.source.timeline().did_take_decoder_node_seek() {
return;
}
let current = self.source.timeline().seek_epoch();
if current == self.runtime.seek_epoch {
return;
}
let _ = self.outlet.take_pending();
self.runtime = DecoderRuntime {
seek_epoch: current,
..Default::default()
};
}
}
impl From<TrackRegistration> for DecoderNode {
fn from(reg: TrackRegistration) -> Self {
let seek_epoch = reg.source.timeline().seek_epoch();
Self {
source: reg.source,
outlet: reg.outlet,
trash_inlet: reg.trash_inlet,
service_class: reg.service_class,
preload_notify: reg.preload_notify,
preload_chunks: reg.preload_chunks,
runtime: DecoderRuntime {
seek_epoch,
..Default::default()
},
}
}
}
impl Node for DecoderNode {
fn on_cancel(&mut self) {
self.complete_preload();
}
fn service_class(&self) -> ServiceClass {
self.service_class.load()
}
fn tick(&mut self) -> TickResult {
self.drain_trash();
self.sync_seek_epoch();
if !self.outlet.flush() {
return TickResult::Backpressured;
}
if self.runtime.chunks_sent >= self.preload_chunks && !self.runtime.preloaded {
self.complete_preload();
}
match self.source.step_track() {
TrackStep::Produced(fetch) => {
self.runtime.eof_sent = false;
let _ = self.outlet.try_push(fetch);
self.mark_preload_progress();
TickResult::Progress
}
TrackStep::StateChanged => {
self.runtime.eof_sent = false;
TickResult::Progress
}
TrackStep::Blocked(reason) => {
trace!(?reason, "track blocked");
TickResult::Waiting
}
TrackStep::Eof if self.runtime.eof_sent => TickResult::Backpressured,
TrackStep::Eof => {
let epoch = self.source.timeline().seek_epoch();
let marker = Fetch::new(PcmChunk::default(), true, epoch);
if let Ok(()) = self.outlet.try_push(marker) {
self.complete_preload();
self.runtime.eof_sent = true;
TickResult::Progress
} else {
debug_assert!(false, "EOF marker rejected — overflow invariant violated");
TickResult::Waiting
}
}
TrackStep::Failed => {
let epoch = self.source.timeline().seek_epoch();
let marker = Fetch::failure(PcmChunk::default(), epoch);
if let Ok(()) = self.outlet.try_push(marker) {
self.complete_preload();
if self.outlet.has_pending() {
TickResult::Progress
} else {
TickResult::Done
}
} else {
debug_assert!(
false,
"Failed marker rejected — overflow invariant violated"
);
TickResult::Waiting
}
}
}
}
}
#[cfg(test)]
mod tests {
use kithara_stream::Timeline;
use kithara_test_utils::kithara;
use unimock::{MockFn, Unimock, matching};
use super::*;
use crate::{
runtime::{Inlet, Outlet, connect},
worker::MockAudioWorkerSource,
};
fn test_node(
source: Box<dyn AudioWorkerSource<Chunk = PcmChunk>>,
outlet: Outlet<Fetch<PcmChunk>>,
preload_notify: Arc<Notify>,
) -> DecoderNode {
let (_trash_outlet, trash_inlet) = connect::<PcmChunk>(4, None);
DecoderNode {
source,
outlet,
trash_inlet,
preload_notify,
service_class: Arc::new(AtomicServiceClass::new(ServiceClass::default())),
preload_chunks: 1,
runtime: DecoderRuntime::default(),
}
}
#[kithara::test]
fn decoder_node_eof_under_backpressure() {
let notify = Arc::new(Notify::new());
let (mut outlet, _inlet) = connect::<Fetch<PcmChunk>>(1, None);
outlet
.try_push(Fetch::new(PcmChunk::default(), false, 0))
.unwrap();
outlet
.try_push(Fetch::new(PcmChunk::default(), false, 0))
.unwrap();
assert!(outlet.has_pending());
let timeline = Timeline::new();
let source = Box::new(Unimock::new((
MockAudioWorkerSource::step_track
.next_call(matching!())
.returns(TrackStep::Eof),
MockAudioWorkerSource::timeline.stub(|each| {
each.call(matching!()).returns(timeline.clone());
}),
)));
let mut node = test_node(source, outlet, notify);
assert_eq!(node.tick(), TickResult::Backpressured);
assert!(!node.runtime.eof_sent);
let _ = node.outlet.take_pending();
assert_eq!(node.tick(), TickResult::Progress);
assert!(node.runtime.eof_sent);
assert!(node.outlet.has_pending());
}
#[kithara::test]
fn decoder_node_distinguishes_failed_from_eof_on_the_wire() {
use std::fmt::Debug;
use crate::pipeline::fetch::FetchKind;
fn drain_marker_kind<T: Debug>(
outlet: &mut Outlet<Fetch<T>>,
inlet: &mut Inlet<Fetch<T>>,
) -> FetchKind {
outlet.flush();
inlet
.try_pop()
.expect("producer pushed a terminal marker")
.kind
}
let notify = Arc::new(Notify::new());
let (eof_outlet, mut eof_inlet) = connect::<Fetch<PcmChunk>>(1, None);
let eof_timeline = Timeline::new();
let eof_source = Box::new(Unimock::new((
MockAudioWorkerSource::step_track
.next_call(matching!())
.returns(TrackStep::Eof),
MockAudioWorkerSource::timeline.stub(|each| {
each.call(matching!()).returns(eof_timeline.clone());
}),
)));
let mut eof_node = test_node(eof_source, eof_outlet, Arc::clone(¬ify));
assert_eq!(eof_node.tick(), TickResult::Progress);
let eof_kind = drain_marker_kind(&mut eof_node.outlet, &mut eof_inlet);
let (failed_outlet, mut failed_inlet) = connect::<Fetch<PcmChunk>>(1, None);
let failed_timeline = Timeline::new();
let failed_source = Box::new(Unimock::new((
MockAudioWorkerSource::step_track
.next_call(matching!())
.returns(TrackStep::Failed),
MockAudioWorkerSource::timeline.stub(|each| {
each.call(matching!()).returns(failed_timeline.clone());
}),
)));
let mut failed_node = test_node(failed_source, failed_outlet, notify);
let _ = failed_node.tick();
let failed_kind = drain_marker_kind(&mut failed_node.outlet, &mut failed_inlet);
assert_ne!(
eof_kind, failed_kind,
"TrackStep::Eof and TrackStep::Failed must not collapse into \
the same wire marker — the consumer has to distinguish \
natural end-of-clip from a transient decoder/source failure, \
otherwise a post-seek failure cascades into PlayerTrack::\
Finished and empties the track arena mid-clip"
);
}
#[kithara::test]
fn decoder_node_preload_notify_waits_for_ring() {
let notify = Arc::new(Notify::new());
let (mut outlet, mut inlet) = connect::<Fetch<PcmChunk>>(1, None);
outlet
.try_push(Fetch::new(PcmChunk::default(), false, 0))
.unwrap();
let timeline = Timeline::new();
let source = Box::new(Unimock::new((
MockAudioWorkerSource::step_track
.next_call(matching!())
.returns(TrackStep::Produced(Fetch::new(
PcmChunk::default(),
false,
0,
))),
MockAudioWorkerSource::step_track
.next_call(matching!())
.returns(TrackStep::Blocked(
crate::pipeline::track_fsm::WaitingReason::Waiting,
)),
MockAudioWorkerSource::timeline.stub(|each| {
each.call(matching!()).returns(timeline.clone());
}),
)));
let mut node = test_node(source, outlet, notify.clone());
assert_eq!(node.tick(), TickResult::Progress);
assert_eq!(node.runtime.chunks_sent, 1);
assert!(!node.runtime.preloaded);
assert_eq!(node.tick(), TickResult::Backpressured);
assert!(!node.runtime.preloaded);
let _ = inlet.try_pop();
assert_eq!(node.tick(), TickResult::Waiting);
assert!(node.runtime.preloaded);
}
}