// kithara_audio/pipeline/track_fsm.rs

use kithara_decode::{DecodeError, Decoder};
use kithara_platform::time::Duration;
use kithara_stream::{MediaInfo, SourcePhase, SourceSeekAnchor};

use crate::pipeline::fetch::Fetch;

7/// Explicit state machine for a single audio track in the worker thread.
8///
9/// Each variant carries exactly the context needed for that phase.
10/// Transitions happen inside `step_track()` — one transition per call.
11pub(crate) enum TrackState {
12    /// Normal decoding — produce PCM chunks.
13    Decoding,
14
15    /// Consumer requested a seek; not yet applied.
16    SeekRequested(SeekRequest),
17
18    /// Waiting for the underlying source to become ready.
19    WaitingForSource {
20        context: WaitContext,
21        reason: WaitingReason,
22    },
23
24    /// Actively applying a seek to the decoder.
25    ApplyingSeek(ApplySeekState),
26
27    /// Recreating the decoder (format boundary, codec change, seek recovery).
28    RecreatingDecoder(RecreateState),
29
30    /// Decoder recreated / seek applied; waiting for first valid chunk.
31    AwaitingResume(ResumeState),
32
33    /// End of stream reached.
34    AtEof,
35
36    /// Terminal failure.
37    Failed(TrackFailure),
38}
39
/// Context for a pending seek, carried through multiple states.
///
/// Small and `Copy`, so it can be embedded by value in every state that
/// needs to remember which seek it belongs to.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub(crate) struct SeekContext {
    /// Seek target, expressed as a `Duration`.
    pub(crate) target: Duration,
    /// Seek epoch this context belongs to.
    pub(crate) epoch: u64,
}

47/// Stateful seek request tracked across retries and waits.
48#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
49pub(crate) struct SeekRequest {
50    pub(crate) seek: SeekContext,
51    pub(crate) attempt: u8,
52}
53
54/// Seek application mode resolved before touching the decoder.
55#[derive(Debug, Clone, Copy, PartialEq, Eq)]
56pub(crate) struct ApplySeekState {
57    pub(crate) mode: SeekMode,
58    pub(crate) request: SeekRequest,
59}
60
61/// Resume state after a seek has been applied to the decoder.
62#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
63pub(crate) struct ResumeState {
64    /// Anchor byte offset from the seek — used for readiness checks and demand
65    /// when the decoder's stream position differs from the `StreamIndex` layout.
66    pub(crate) anchor_offset: Option<u64>,
67    pub(crate) skip: Option<Duration>,
68    pub(crate) seek: SeekContext,
69    pub(crate) recover_attempts: u8,
70}
71
72/// What to do once decoder recreation succeeds.
73#[derive(Debug, Clone, PartialEq, Eq)]
74pub(crate) enum RecreateNext {
75    /// Continue plain decoding from the new decoder.
76    Decode,
77    /// Re-run seek resolution on the recreated decoder.
78    Seek(SeekRequest),
79    /// Finish seek application by seeking the recreated decoder.
80    ApplySeek(SeekRequest),
81}
82
83/// Decoder recreation task tracked by the FSM.
84#[derive(Debug, Clone, PartialEq, Eq)]
85pub(crate) struct RecreateState {
86    pub(crate) media_info: MediaInfo,
87    pub(crate) cause: RecreateCause,
88    pub(crate) next: RecreateNext,
89    pub(crate) offset: u64,
90    pub(crate) attempt: u8,
91}
92
/// Outcome of one `execute_recreation` call.
///
/// `NeedsSourceWait` exists for the post-VariantChange WAV-ABR / fMP4
/// case where the decoder factory's probe reads `[0..PROBE)` of the
/// freshly-switched variant *before* the HLS scheduler has buffered
/// those bytes — the probe surfaces an `ErrorClass::Interrupted`
/// (`StreamPending(WaitBudgetExhausted)`). Treating that as a hard
/// `RecreateFailed` deadlocks the audio worker (Cluster C/D/E/F in
/// `pure-dancing-porcupine.md`, Wave 2.A); routing back through
/// `wait_for_source_on_recreate` lets the source catch up.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum RecreateOutcome {
    /// Recreation finished.
    Done,
    /// Recreation failed softly.
    ///
    /// NOTE(review): the recovery path taken for a soft failure is decided
    /// by the caller, which is not part of this file.
    SoftFailed,
    /// The source has not buffered the bytes the probe needs yet; wait for
    /// the source instead of failing the track.
    NeedsSourceWait,
}

110/// What caused us to enter `WaitingForSource`.
111#[derive(Debug)]
112pub(crate) enum WaitContext {
113    /// Starvation during normal playback.
114    Playback,
115    /// Seek-initiated wait (source not ready for seek).
116    Seek(SeekRequest),
117    /// Anchor/direct seek resolved, waiting for source bytes before `decoder.seek()`.
118    ApplySeek(ApplySeekState),
119    /// Init bytes unavailable for decoder recreation.
120    Recreation(RecreateState),
121    /// `decoder.seek()` already succeeded and the FSM was in
122    /// `AwaitingResume` when the source stopped producing chunks.
123    /// Carries the `ResumeState` so the wait loop can demand the
124    /// anchor byte (instead of the stale pre-seek read head) and
125    /// then transition back to `AwaitingResume` once data arrives.
126    PostSeek(ResumeState),
127}
128
/// Why the source is not ready, mirroring relevant `SourcePhase` variants.
///
/// Produced by `map_source_phase` and surfaced to callers through
/// `TrackStep::Blocked`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WaitingReason {
    /// Generic wait — data not yet available.
    Waiting,
    /// On-demand request already in flight.
    WaitingDemand,
    /// Metadata lookup in progress.
    WaitingMetadata,
}

140/// How the seek should be applied.
141#[derive(Debug, Clone, Copy, PartialEq, Eq)]
142pub(crate) enum SeekMode {
143    /// Direct decoder seek (no anchor). When `target_byte` is `Some`, the FSM
144    /// gates the readiness check on that byte range so `decoder.seek()` only
145    /// runs once the source can answer the read the decoder is about to
146    /// issue. `None` keeps the historical "check current read head" gate for
147    /// callers that can't estimate the target byte.
148    Direct { target_byte: Option<u64> },
149    /// Anchor-based seek with segment alignment.
150    Anchor(SourceSeekAnchor),
151}
152
/// Why the decoder needs to be recreated.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum RecreateCause {
    /// Codec boundary detected during playback.
    FormatBoundary,
    /// ABR switch changed the codec or variant.
    VariantSwitch,
}

162/// Terminal failure reasons.
163#[derive(Debug)]
164pub(crate) enum TrackFailure {
165    /// Decoder produced an error.
166    Decode(DecodeError),
167    /// Decoder recreation failed.
168    RecreateFailed { offset: u64 },
169    /// Source was cancelled.
170    SourceCancelled,
171}
172
173/// Holds the decoder and its associated metadata as an atomic unit.
174///
175/// Created whole — never partially mutated. On recreation failure
176/// the old session remains untouched.
177pub(crate) struct DecoderSession {
178    pub(crate) decoder: Box<dyn Decoder>,
179    pub(crate) media_info: Option<MediaInfo>,
180    pub(crate) base_offset: u64,
181    /// Seek epoch at which this session was installed. Used by
182    /// `detect_format_change` to suppress a redundant recreate when
183    /// the in-flight seek epoch still matches the epoch that produced
184    /// the session — the decoder is already aligned with the seek's
185    /// landing variant.
186    pub(crate) installed_at_seek_epoch: u64,
187}
188
189/// Result of a single `step_track()` call.
190pub enum TrackStep<C> {
191    /// Produced a chunk ready for the consumer.
192    Produced(Fetch<C>),
193    /// Source is not ready — cannot make progress.
194    Blocked(WaitingReason),
195    /// Internal state changed — caller should call `step_track()` again.
196    StateChanged,
197    /// End of stream.
198    Eof,
199    /// Terminal failure — details available via `TrackState::Failed`.
200    Failed,
201}
202
/// Fieldless discriminant of [`TrackState`] for external phase queries.
///
/// One variant per `TrackState` variant, in the same order; obtained via
/// `TrackPhaseTag::from(&state)`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TrackPhaseTag {
    /// Mirrors `TrackState::Decoding`.
    Decoding,
    /// Mirrors `TrackState::SeekRequested`.
    SeekRequested,
    /// Mirrors `TrackState::WaitingForSource`.
    WaitingForSource,
    /// Mirrors `TrackState::ApplyingSeek`.
    ApplyingSeek,
    /// Mirrors `TrackState::RecreatingDecoder`.
    RecreatingDecoder,
    /// Mirrors `TrackState::AwaitingResume`.
    AwaitingResume,
    /// Mirrors `TrackState::AtEof`.
    AtEof,
    /// Mirrors `TrackState::Failed`.
    Failed,
}

/// Consumer-side phase for `Audio<S>`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum ConsumerPhase {
    /// Initial state — waiting for first chunk.
    Buffering,
    /// Normal playback.
    Playing,
    /// Seek in progress — waiting for chunks with matching epoch.
    SeekPending {
        /// Epoch that incoming chunks must match.
        epoch: u64,
    },
    /// End of stream reached.
    AtEof,
    /// Unrecoverable failure.
    Failed,
}

231impl TrackState {
232    /// Returns `true` for terminal states that will never transition.
233    ///
234    /// `AtEof` is NOT terminal — seek-after-EOF is a valid transition.
235    /// Only `Failed` is truly terminal (track will be removed).
236    pub(crate) fn is_terminal(&self) -> bool {
237        matches!(self, Self::Failed(_))
238    }
239}
240
241impl From<&TrackState> for TrackPhaseTag {
242    #[inline(always)]
243    fn from(state: &TrackState) -> Self {
244        match state {
245            TrackState::Decoding => Self::Decoding,
246            TrackState::SeekRequested(_) => Self::SeekRequested,
247            TrackState::WaitingForSource { .. } => Self::WaitingForSource,
248            TrackState::ApplyingSeek(_) => Self::ApplyingSeek,
249            TrackState::RecreatingDecoder(_) => Self::RecreatingDecoder,
250            TrackState::AwaitingResume(_) => Self::AwaitingResume,
251            TrackState::AtEof => Self::AtEof,
252            TrackState::Failed(_) => Self::Failed,
253        }
254    }
255}
256
257impl ConsumerPhase {
258    /// Returns `true` for terminal states.
259    pub(crate) fn is_terminal(self) -> bool {
260        matches!(self, Self::AtEof | Self::Failed)
261    }
262}
263
264/// Map a `SourcePhase` to an optional `WaitingReason`.
265///
266/// Returns `Some(reason)` for wait states (`Waiting`, `WaitingDemand`,
267/// `WaitingMetadata`). Returns `None` for non-wait states (`Ready`, `Eof`,
268/// `Seeking`, `Cancelled`) — callers handle those separately.
269pub(crate) fn map_source_phase(phase: SourcePhase) -> Option<WaitingReason> {
270    match phase {
271        SourcePhase::Waiting => Some(WaitingReason::Waiting),
272        SourcePhase::WaitingDemand => Some(WaitingReason::WaitingDemand),
273        SourcePhase::WaitingMetadata => Some(WaitingReason::WaitingMetadata),
274        _ => None,
275    }
276}
277
#[cfg(test)]
mod tests {
    use kithara_test_utils::kithara;

    use super::*;

    /// Every state except `Failed` must report itself as non-terminal.
    #[kithara::test]
    fn is_terminal_for_each_state() {
        let non_terminal = [
            TrackState::Decoding,
            TrackState::SeekRequested(SeekRequest {
                seek: SeekContext {
                    epoch: 1,
                    target: Duration::from_secs(5),
                },
                ..Default::default()
            }),
            TrackState::WaitingForSource {
                context: WaitContext::Playback,
                reason: WaitingReason::Waiting,
            },
            TrackState::ApplyingSeek(ApplySeekState {
                mode: SeekMode::Direct { target_byte: None },
                request: SeekRequest {
                    seek: SeekContext {
                        epoch: 1,
                        target: Duration::from_secs(5),
                    },
                    ..Default::default()
                },
            }),
            TrackState::RecreatingDecoder(RecreateState {
                attempt: 0,
                cause: RecreateCause::FormatBoundary,
                media_info: MediaInfo::default(),
                next: RecreateNext::Decode,
                offset: 0,
            }),
            TrackState::AwaitingResume(ResumeState {
                recover_attempts: 0,
                seek: SeekContext {
                    epoch: 1,
                    target: Duration::from_secs(5),
                },
                anchor_offset: None,
                skip: None,
            }),
            TrackState::AtEof,
        ];
        for state in &non_terminal {
            // `TrackState` has no `Debug` impl, so report the tag instead.
            assert!(
                !state.is_terminal(),
                "expected non-terminal for {:?}",
                TrackPhaseTag::from(state)
            );
        }

        assert!(TrackState::Failed(TrackFailure::SourceCancelled).is_terminal());
    }

    /// Each `TrackState` variant maps to the tag of the same name.
    #[kithara::test]
    fn phase_tag_preserves_discriminant() {
        assert_eq!(
            TrackPhaseTag::from(&TrackState::Decoding),
            TrackPhaseTag::Decoding
        );
        assert_eq!(
            TrackPhaseTag::from(&TrackState::SeekRequested(SeekRequest {
                seek: SeekContext {
                    epoch: 1,
                    target: Duration::ZERO,
                },
                ..Default::default()
            })),
            TrackPhaseTag::SeekRequested
        );
        assert_eq!(
            TrackPhaseTag::from(&TrackState::WaitingForSource {
                context: WaitContext::Playback,
                reason: WaitingReason::WaitingDemand,
            }),
            TrackPhaseTag::WaitingForSource
        );
        assert_eq!(
            TrackPhaseTag::from(&TrackState::ApplyingSeek(ApplySeekState {
                mode: SeekMode::Direct { target_byte: None },
                request: SeekRequest {
                    seek: SeekContext {
                        epoch: 1,
                        target: Duration::ZERO,
                    },
                    ..Default::default()
                },
            })),
            TrackPhaseTag::ApplyingSeek
        );
        assert_eq!(
            TrackPhaseTag::from(&TrackState::RecreatingDecoder(RecreateState {
                attempt: 1,
                cause: RecreateCause::VariantSwitch,
                media_info: MediaInfo::default(),
                next: RecreateNext::ApplySeek(SeekRequest {
                    attempt: 1,
                    seek: SeekContext {
                        epoch: 1,
                        target: Duration::from_secs(10),
                    },
                }),
                offset: 100,
            })),
            TrackPhaseTag::RecreatingDecoder
        );
        assert_eq!(
            TrackPhaseTag::from(&TrackState::AwaitingResume(ResumeState {
                recover_attempts: 0,
                seek: SeekContext {
                    epoch: 1,
                    target: Duration::from_secs(10),
                },
                anchor_offset: None,
                skip: None,
            })),
            TrackPhaseTag::AwaitingResume
        );
        assert_eq!(
            TrackPhaseTag::from(&TrackState::AtEof),
            TrackPhaseTag::AtEof
        );
        assert_eq!(
            TrackPhaseTag::from(&TrackState::Failed(TrackFailure::SourceCancelled)),
            TrackPhaseTag::Failed
        );
    }

    /// Wait phases map to a reason; every other phase maps to `None`.
    #[kithara::test]
    fn map_source_phase_table() {
        assert_eq!(
            map_source_phase(SourcePhase::Waiting),
            Some(WaitingReason::Waiting)
        );
        assert_eq!(
            map_source_phase(SourcePhase::WaitingDemand),
            Some(WaitingReason::WaitingDemand)
        );
        assert_eq!(
            map_source_phase(SourcePhase::WaitingMetadata),
            Some(WaitingReason::WaitingMetadata)
        );

        assert_eq!(map_source_phase(SourcePhase::Ready), None);
        assert_eq!(map_source_phase(SourcePhase::Eof), None);
        assert_eq!(map_source_phase(SourcePhase::Seeking), None);
        assert_eq!(map_source_phase(SourcePhase::Cancelled), None);
    }

    /// Only `AtEof` and `Failed` are terminal on the consumer side.
    #[kithara::test]
    fn consumer_phase_terminal() {
        assert!(!ConsumerPhase::Buffering.is_terminal());
        assert!(!ConsumerPhase::Playing.is_terminal());
        assert!(!ConsumerPhase::SeekPending { epoch: 1 }.is_terminal());
        assert!(ConsumerPhase::AtEof.is_terminal());
        assert!(ConsumerPhase::Failed.is_terminal());
    }

    /// `SeekContext` is `Copy` and compares field-wise.
    #[kithara::test]
    fn seek_context_copy_and_eq() {
        let ctx = SeekContext {
            epoch: 42,
            target: Duration::from_millis(500),
        };
        let copy = ctx;
        assert_eq!(ctx, copy);
        assert_eq!(copy.epoch, 42);
        assert_eq!(copy.target, Duration::from_millis(500));
    }

    /// `AtEof` stays non-terminal so a later seek can leave it.
    #[kithara::test]
    fn at_eof_allows_seek_transition() {
        let state = TrackState::AtEof;
        assert!(!state.is_terminal());
        assert_eq!(TrackPhaseTag::from(&state), TrackPhaseTag::AtEof);
    }
}