kithara_audio/runtime/node.rs
1use std::sync::atomic::{AtomicU8, Ordering};
2
/// Scheduling priority assigned to a node.
///
/// Variants are declared from lowest to highest priority, so the derived
/// ordering satisfies `Idle < Warm < Audible`. When the scheduler selects
/// which node to process next, nodes with a higher class are served first.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum ServiceClass {
    /// Lowest priority: not playing and not needed soon.
    #[default]
    Idle = 0,
    /// Medium priority: preloading or about to play.
    Warm = 1,
    /// Highest priority: currently audible.
    Audible = 2,
}
17
18impl From<ServiceClass> for u8 {
19 fn from(class: ServiceClass) -> Self {
20 match class {
21 ServiceClass::Idle => 0,
22 ServiceClass::Warm => 1,
23 ServiceClass::Audible => 2,
24 }
25 }
26}
27
28impl From<u8> for ServiceClass {
29 fn from(value: u8) -> Self {
30 match value {
31 2 => Self::Audible,
32 1 => Self::Warm,
33 _ => Self::Idle,
34 }
35 }
36}
37
38/// Lock-free shared `ServiceClass`, written wait-free by the real-time
39/// consumer (`Audio::set_service_class` during fade transitions) and read
40/// by the worker scheduler each pass. Avoids the scheduler command channel
41/// — and its periodic allocation — on the real-time audio thread.
42pub(crate) struct AtomicServiceClass(AtomicU8);
43
44impl AtomicServiceClass {
45 pub(crate) fn new(class: ServiceClass) -> Self {
46 Self(AtomicU8::new(class.into()))
47 }
48
49 pub(crate) fn load(&self) -> ServiceClass {
50 self.0.load(Ordering::Relaxed).into()
51 }
52
53 pub(crate) fn store(&self, class: ServiceClass) {
54 self.0.store(class.into(), Ordering::Relaxed);
55 }
56}
57
/// Outcome reported by one call to a node's `tick`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum TickResult {
    /// The node moved forward: it produced or consumed data, or applied an
    /// internal state change.
    Progress,
    /// The node is alive but stalled on its upstream source
    /// (`step_track()` returned `Blocked`).
    ///
    /// The scheduler reads this as "progress is *expected* but not
    /// happening", so the hang watchdog ticks here. A source that stays
    /// blocked forever therefore surfaces as a panic rather than an
    /// indefinite park.
    Waiting,
    /// The node is alive but its downstream consumer is not pulling
    /// (PCM ring full / outlet overflow).
    ///
    /// The scheduler treats this as a paused/idle player: progress is
    /// *not expected* until the consumer drains the ring, so the hang
    /// watchdog must NOT tick. Keeping this separate from `Waiting` is what
    /// stops an idle `Audio` handle from panicking once the watchdog budget
    /// expires (the symptom that prompted bug #1).
    Backpressured,
    /// The node has no more work to do (EOF, failed, terminal).
    Done,
}
80
81/// A component that can be executed by the scheduler.
82pub(crate) trait Node: Send + 'static {
83 /// Called when the scheduler is cancelled or the node is unregistered.
84 fn on_cancel(&mut self) {}
85
86 /// Return the current service class (priority) of this node.
87 fn service_class(&self) -> ServiceClass {
88 ServiceClass::Audible
89 }
90
91 /// Perform one quantum of work.
92 fn tick(&mut self) -> TickResult;
93}
94
95impl Node for Box<dyn Node> {
96 fn on_cancel(&mut self) {
97 (**self).on_cancel();
98 }
99
100 fn service_class(&self) -> ServiceClass {
101 (**self).service_class()
102 }
103
104 fn tick(&mut self) -> TickResult {
105 (**self).tick()
106 }
107}
108
#[cfg(test)]
mod tests {
    use kithara_test_utils::kithara;

    use super::*;

    // The derived ordering must rank the classes Idle < Warm < Audible.
    #[kithara::test]
    fn service_class_ordering() {
        assert!(ServiceClass::Warm > ServiceClass::Idle);
        assert!(ServiceClass::Audible > ServiceClass::Warm);
    }

    // The default class must be the lowest-priority one.
    #[kithara::test]
    fn service_class_default_is_idle() {
        assert!(matches!(ServiceClass::default(), ServiceClass::Idle));
    }
}
125}