Skip to main content

kithara_audio/worker/
handle.rs

1use std::sync::{
2    Arc,
3    atomic::{AtomicU64, Ordering},
4};
5
6use kithara_decode::PcmChunk;
7use kithara_platform::tokio::sync::Notify;
8use tokio_util::sync::CancellationToken;
9
10use super::{
11    AudioWorkerSource,
12    decoder_node::DecoderNode,
13    hang_observer::HangWatchdogObserver,
14    types::{TrackId, TrackIdGen},
15};
16use crate::{
17    pipeline::fetch::Fetch,
18    runtime::{AtomicServiceClass, Scheduler, SchedulerHandle},
19};
20
21/// Everything needed to register a track with the shared worker.
22pub(crate) struct TrackRegistration {
23    pub(crate) preload_notify: Arc<Notify>,
24    pub(crate) source: Box<dyn AudioWorkerSource<Chunk = PcmChunk>>,
25    pub(crate) outlet: crate::runtime::Outlet<Fetch<PcmChunk>>,
26    /// Spent-chunk return ring: the real-time consumer ([`crate::Audio`])
27    /// hands every consumed `PcmChunk` here instead of dropping it, so the
28    /// pooled buffer is freed/recycled on the worker thread rather than on
29    /// the audio thread. See `crates/kithara-audio/README.md`.
30    pub(crate) trash_inlet: crate::runtime::Inlet<PcmChunk>,
31    /// Shared priority hint. The real-time consumer writes it wait-free
32    /// (`Audio::set_service_class`); the worker scheduler reads it each pass.
33    pub(crate) service_class: Arc<AtomicServiceClass>,
34    pub(crate) preload_chunks: usize,
35}
36
37/// Clonable handle to a shared audio worker.
38///
39/// Multiple [`Audio`](crate::Audio) handles can share one worker by cloning
40/// the handle and passing it via [`AudioConfig`](crate::AudioConfig).
41pub struct AudioWorkerHandle {
42    id_gen: Arc<TrackIdGen>,
43    inner: SchedulerHandle<Box<dyn crate::runtime::Node>>,
44}
45
46impl Clone for AudioWorkerHandle {
47    fn clone(&self) -> Self {
48        Self {
49            inner: self.inner.clone(),
50            id_gen: Arc::clone(&self.id_gen),
51        }
52    }
53}
54
/// Process-wide counter that gives each audio-worker thread a distinct name
/// suffix; starts at zero and is only ever incremented.
static AUDIO_WORKER_ID: AtomicU64 = AtomicU64::new(0);
57
58impl AudioWorkerHandle {
59    /// Spawn a new shared worker with a fresh orphan cancel token.
60    ///
61    /// Convenience for tests and standalone usage. Production paths use
62    /// [`AudioWorkerHandle::with_cancel`] with a child of the player
63    /// master — see `kithara-play/README.md` "Cancel Hierarchy".
64    #[must_use]
65    // ast-grep-ignore: style.prefer-default-derive
66    pub fn new() -> Self {
67        Self::with_cancel(CancellationToken::new()) // kithara:cancel:owner
68    }
69
70    /// Register a track. Returns the assigned [`TrackId`].
71    ///
72    /// If the worker thread has already exited (e.g. after shutdown), the
73    /// registration is silently lost and the returned track will produce no
74    /// data. Callers must ensure the worker is alive before registering.
75    pub(crate) fn register_track(&self, reg: TrackRegistration) -> TrackId {
76        let id = self.id_gen.next();
77        let node: Box<dyn crate::runtime::Node> = Box::new(DecoderNode::from(reg));
78        self.inner.register(id, node);
79        id
80    }
81
82    /// Request graceful shutdown and cancel the worker.
83    pub fn shutdown(&self) {
84        self.inner.shutdown();
85    }
86
87    /// Remove a track by ID.
88    pub(crate) fn unregister_track(&self, track_id: TrackId) {
89        self.inner.unregister(track_id);
90    }
91
92    /// Wake the worker (e.g. when new data arrives from downloader).
93    pub fn wake(&self) {
94        self.inner.wake();
95    }
96
97    /// Spawn a new shared worker thread bound to the given cancel token
98    /// and return a handle. Production callers (e.g. `EngineImpl`) pass a
99    /// child of the player master so worker shutdown participates in the
100    /// unified cancel hierarchy.
101    #[must_use]
102    pub fn with_cancel(cancel: CancellationToken) -> Self {
103        let id = AUDIO_WORKER_ID.fetch_add(1, Ordering::Relaxed);
104        let inner = Scheduler::<Box<dyn crate::runtime::Node>, HangWatchdogObserver>::start(
105            format!("kithara-audio-worker-{id}"),
106            HangWatchdogObserver::new(),
107            cancel,
108        );
109
110        Self {
111            inner,
112            id_gen: Arc::new(TrackIdGen::new()),
113        }
114    }
115}
116
117impl Default for AudioWorkerHandle {
118    fn default() -> Self {
119        Self::new()
120    }
121}
122
123#[cfg(test)]
124mod tests {
125    use std::{
126        sync::{
127            Arc,
128            atomic::{AtomicBool, Ordering},
129        },
130        time::Duration,
131    };
132
133    use kithara_decode::PcmChunk;
134    use kithara_platform::{
135        thread::sleep as thread_sleep,
136        time::{Instant, timeout as platform_timeout},
137        tokio::sync::Notify,
138    };
139    use kithara_stream::Timeline;
140    use kithara_test_utils::kithara;
141
142    use super::*;
143    use crate::{
144        pipeline::track_fsm::{TrackStep, WaitingReason},
145        runtime::connect,
146        worker::{AudioWorkerSource, thread_wake::ThreadWake, types::ServiceClass},
147    };
148
149    struct MockSource {
150        timeline: Timeline,
151        ready: bool,
152        should_panic: bool,
153        chunks_to_produce: usize,
154        cursor: usize,
155    }
156
157    impl MockSource {
158        fn new(chunks: usize) -> Self {
159            Self {
160                timeline: Timeline::new(),
161                chunks_to_produce: chunks,
162                cursor: 0,
163                ready: true,
164                should_panic: false,
165            }
166        }
167
168        fn not_ready(chunks: usize) -> Self {
169            Self {
170                ready: false,
171                ..Self::new(chunks)
172            }
173        }
174
175        fn panicking() -> Self {
176            Self {
177                should_panic: true,
178                ..Self::new(100)
179            }
180        }
181    }
182
183    impl AudioWorkerSource for MockSource {
184        type Chunk = PcmChunk;
185
186        fn step_track(&mut self) -> TrackStep<PcmChunk> {
187            if self.timeline.is_seek_pending() || self.timeline.is_flushing() {
188                let epoch = self.timeline.seek_epoch();
189                self.timeline.complete_seek(epoch);
190                self.timeline.clear_seek_pending(epoch);
191                return TrackStep::StateChanged;
192            }
193            if !self.ready {
194                return TrackStep::Blocked(WaitingReason::Waiting);
195            }
196            if self.should_panic {
197                panic!("mock panic for testing");
198            }
199            if self.cursor >= self.chunks_to_produce {
200                return TrackStep::Eof;
201            }
202            self.cursor += 1;
203            TrackStep::Produced(Fetch::new(PcmChunk::default(), false, 0))
204        }
205
206        fn timeline(&self) -> &Timeline {
207            &self.timeline
208        }
209    }
210
211    fn make_registration<S>(
212        source: S,
213        ringbuf_capacity: usize,
214        preload_chunks: usize,
215    ) -> (
216        TrackRegistration,
217        crate::runtime::Inlet<Fetch<PcmChunk>>,
218        Arc<Notify>,
219    )
220    where
221        S: AudioWorkerSource<Chunk = PcmChunk> + 'static,
222    {
223        let wake = Arc::new(ThreadWake::default());
224        let (outlet, inlet) = connect::<Fetch<PcmChunk>>(ringbuf_capacity, Some(wake.clone()));
225        let (_trash_outlet, trash_inlet) = connect::<PcmChunk>(ringbuf_capacity + 2, None);
226        let preload_notify = Arc::new(Notify::new());
227
228        let reg = TrackRegistration {
229            outlet,
230            trash_inlet,
231            preload_chunks,
232            source: Box::new(source),
233            preload_notify: Arc::clone(&preload_notify),
234            service_class: Arc::new(AtomicServiceClass::new(ServiceClass::Audible)),
235        };
236        (reg, inlet, preload_notify)
237    }
238
239    fn wait_for_chunks(
240        rx: &mut crate::runtime::Inlet<Fetch<PcmChunk>>,
241        count: usize,
242        timeout: Duration,
243    ) -> usize {
244        let start = Instant::now();
245        let mut received = 0;
246        while received < count && start.elapsed() < timeout {
247            if rx.try_pop().is_some() {
248                received += 1;
249            } else {
250                thread_sleep(Duration::from_millis(1));
251            }
252        }
253        received
254    }
255
256    #[kithara::test]
257    fn worker_creates_and_drops_cleanly() {
258        let handle = AudioWorkerHandle::new();
259        thread_sleep(Duration::from_millis(10));
260        handle.shutdown();
261        thread_sleep(Duration::from_millis(50));
262    }
263
264    #[kithara::test]
265    fn worker_delivers_chunks() {
266        let handle = AudioWorkerHandle::new();
267        let (reg, mut data_rx, _preload_notify) = make_registration(MockSource::new(10), 32, 3);
268
269        let _id = handle.register_track(reg);
270
271        let received = wait_for_chunks(&mut data_rx, 5, Duration::from_secs(5));
272        assert!(received >= 5, "expected >=5 chunks, got {received}");
273
274        handle.shutdown();
275    }
276
277    #[kithara::test]
278    fn worker_multi_track_round_robin() {
279        let handle = AudioWorkerHandle::new();
280
281        let (reg_a, mut rx_a, _) = make_registration(MockSource::new(10), 32, 1);
282        let (reg_b, mut rx_b, _) = make_registration(MockSource::new(10), 32, 1);
283
284        let _id_a = handle.register_track(reg_a);
285        let _id_b = handle.register_track(reg_b);
286
287        let a = wait_for_chunks(&mut rx_a, 3, Duration::from_secs(5));
288        let b = wait_for_chunks(&mut rx_b, 3, Duration::from_secs(5));
289        assert!(a >= 3, "track A: expected >=3 chunks, got {a}");
290        assert!(b >= 3, "track B: expected >=3 chunks, got {b}");
291
292        handle.shutdown();
293    }
294
295    #[kithara::test]
296    fn worker_skips_not_ready_tracks() {
297        let handle = AudioWorkerHandle::new();
298
299        let (reg_a, mut rx_a, _) = make_registration(MockSource::new(10), 32, 1);
300        let (reg_b, mut rx_b, _) = make_registration(MockSource::not_ready(10), 32, 1);
301
302        let _id_a = handle.register_track(reg_a);
303        let _id_b = handle.register_track(reg_b);
304
305        thread_sleep(Duration::from_millis(100));
306
307        let a = wait_for_chunks(&mut rx_a, 1, Duration::from_millis(100));
308        let b = wait_for_chunks(&mut rx_b, 1, Duration::from_millis(50));
309        assert!(a >= 1, "track A should receive chunks");
310        assert_eq!(b, 0, "track B should receive nothing (not ready)");
311
312        handle.shutdown();
313    }
314
315    #[kithara::test]
316    fn worker_overflow_on_full_ringbuf() {
317        let handle = AudioWorkerHandle::new();
318
319        let (reg, mut rx, _) = make_registration(MockSource::new(5), 1, 1);
320
321        let _id = handle.register_track(reg);
322
323        thread_sleep(Duration::from_millis(50));
324
325        let first = rx.try_pop();
326        assert!(first.is_some(), "should have at least one chunk");
327
328        thread_sleep(Duration::from_millis(50));
329
330        let second = rx.try_pop();
331        assert!(second.is_some(), "overflow slot should have been flushed");
332
333        handle.shutdown();
334    }
335
336    #[kithara::test]
337    fn worker_panic_isolation() {
338        let handle = AudioWorkerHandle::new();
339
340        let (reg_a, _, _) = make_registration(MockSource::panicking(), 32, 1);
341        let (reg_b, mut rx_b, _) = make_registration(MockSource::new(10), 32, 1);
342
343        let _id_a = handle.register_track(reg_a);
344        let _id_b = handle.register_track(reg_b);
345
346        let b = wait_for_chunks(&mut rx_b, 3, Duration::from_secs(5));
347        assert!(
348            b >= 3,
349            "track B should keep working after track A panics, got {b}"
350        );
351
352        handle.shutdown();
353    }
354
355    #[kithara::test]
356    fn worker_seek_enters_pending_reset() {
357        let handle = AudioWorkerHandle::new();
358
359        let source = MockSource::new(100);
360        let timeline = source.timeline.clone();
361        let (reg, mut rx, _) = make_registration(source, 32, 1);
362
363        let _id = handle.register_track(reg);
364
365        let got = wait_for_chunks(&mut rx, 2, Duration::from_secs(5));
366        assert!(got >= 2);
367
368        let _ = timeline.initiate_seek(Duration::from_secs(10));
369        handle.wake();
370
371        thread_sleep(Duration::from_millis(100));
372
373        let after_seek = wait_for_chunks(&mut rx, 1, Duration::from_secs(5));
374        assert!(after_seek >= 1, "should resume decoding after seek");
375
376        handle.shutdown();
377    }
378
379    #[kithara::test]
380    fn worker_preload_notify_fires() {
381        let handle = AudioWorkerHandle::new();
382
383        let (reg, _rx, _preload_notify) = make_registration(MockSource::new(10), 32, 3);
384
385        let _id = handle.register_track(reg);
386
387        thread_sleep(Duration::from_millis(200));
388
389        handle.shutdown();
390    }
391
392    #[kithara::test(tokio)]
393    async fn worker_preload_notify_rearms_after_seek() {
394        let handle = AudioWorkerHandle::new();
395
396        let (reg, _rx, preload_notify) = make_registration(MockSource::new(10), 32, 1);
397        let timeline = reg.source.timeline().clone();
398        let _id = handle.register_track(reg);
399
400        platform_timeout(Duration::from_secs(1), preload_notify.notified())
401            .await
402            .expect("initial preload notify must fire");
403
404        let _ = timeline.initiate_seek(Duration::from_secs(1));
405        handle.wake();
406
407        platform_timeout(Duration::from_secs(1), preload_notify.notified())
408            .await
409            .expect("seek must re-arm preload notify");
410
411        handle.shutdown();
412    }
413
414    #[kithara::test]
415    fn worker_unregister_removes_track() {
416        let handle = AudioWorkerHandle::new();
417
418        let (reg, mut rx, _) = make_registration(MockSource::new(100), 32, 1);
419
420        let id = handle.register_track(reg);
421
422        let got = wait_for_chunks(&mut rx, 2, Duration::from_secs(5));
423        assert!(got >= 2);
424
425        handle.unregister_track(id);
426        thread_sleep(Duration::from_millis(50));
427
428        while rx.try_pop().is_some() {}
429
430        thread_sleep(Duration::from_millis(50));
431        assert!(rx.try_pop().is_none(), "no chunks after unregister");
432
433        handle.shutdown();
434    }
435
436    #[kithara::test]
437    fn worker_service_class_prioritises_audible() {
438        let handle = AudioWorkerHandle::new();
439
440        let (reg_a, mut rx_a, _) = make_registration(MockSource::new(100), 4, 0);
441        let class_a = Arc::clone(&reg_a.service_class);
442        let _id_a = handle.register_track(reg_a);
443
444        let (reg_b, mut rx_b, _) = make_registration(MockSource::new(100), 4, 0);
445        let class_b = Arc::clone(&reg_b.service_class);
446        let _id_b = handle.register_track(reg_b);
447
448        thread_sleep(Duration::from_millis(30));
449
450        while rx_a.try_pop().is_some() {}
451        while rx_b.try_pop().is_some() {}
452
453        class_a.store(ServiceClass::Idle);
454        class_b.store(ServiceClass::Audible);
455        handle.wake();
456
457        thread_sleep(Duration::from_millis(50));
458
459        let got_a = {
460            let mut n = 0;
461            while rx_a.try_pop().is_some() {
462                n += 1;
463            }
464            n
465        };
466        let got_b = {
467            let mut n = 0;
468            while rx_b.try_pop().is_some() {
469                n += 1;
470            }
471            n
472        };
473        assert!(
474            got_b >= got_a,
475            "Audible track should get at least as many chunks: A={got_a}, B={got_b}"
476        );
477
478        handle.shutdown();
479    }
480
481    /// A slow/blocked track must not starve a producing track.
482    ///
483    /// Reproduces the production bug: HLS track waiting for network data
484    /// blocks the shared worker's `step_track()` call, causing MP3 track
485    /// audio to stutter.
486    ///
487    /// The mock simulates a track whose `step_track()` blocks the thread
488    /// for 50ms (like a real `wait_range()` call waiting for network data).
489    /// The worker must still deliver chunks to the ready track at a
490    /// rate sufficient for glitch-free playback.
491    #[kithara::test]
492    fn shared_worker_blocking_track_does_not_starve_producing_track() {
493        struct BlockingSource {
494            timeline: Timeline,
495            blocking: Arc<AtomicBool>,
496        }
497
498        impl AudioWorkerSource for BlockingSource {
499            type Chunk = PcmChunk;
500
501            fn step_track(&mut self) -> TrackStep<PcmChunk> {
502                if self.blocking.load(Ordering::Relaxed) {
503                    thread_sleep(Duration::from_millis(10));
504                    TrackStep::Blocked(WaitingReason::Waiting)
505                } else {
506                    TrackStep::Blocked(WaitingReason::Waiting)
507                }
508            }
509
510            fn timeline(&self) -> &Timeline {
511                &self.timeline
512            }
513        }
514
515        let handle = AudioWorkerHandle::new();
516
517        let (reg_a, mut rx_a, _) = make_registration(MockSource::new(100), 32, 0);
518        let _id_a = handle.register_track(reg_a);
519
520        let blocking = Arc::new(AtomicBool::new(true));
521        let blocking_source = BlockingSource {
522            timeline: Timeline::new(),
523            blocking: Arc::clone(&blocking),
524        };
525        let (reg_b, _rx_b, _) = make_registration(blocking_source, 32, 0);
526        let _id_b = handle.register_track(reg_b);
527
528        thread_sleep(Duration::from_millis(500));
529
530        let mut got_a = 0;
531        while rx_a.try_pop().is_some() {
532            got_a += 1;
533        }
534
535        assert!(
536            got_a >= 11,
537            "Producing track must not be starved by blocking track: \
538             got only {got_a} chunks in 1s (expected ≥11 for glitch-free)"
539        );
540
541        blocking.store(false, Ordering::Relaxed);
542        handle.shutdown();
543    }
544
545    /// A track that blocks inside `step_track()` (simulating Symphonia read
546    /// waiting on network data) must not starve other tracks.
547    ///
548    /// This is the REAL production bug: HLS decode path enters `wait_range()`
549    /// which blocks the entire worker thread. MP3 track's ringbuf drains
550    /// during the block, causing audio underrun.
551    ///
552    /// Target: even with 50ms blocking per HLS step, MP3 track should
553    /// still receive enough chunks for glitch-free playback.
554    #[kithara::test]
555    fn shared_worker_sync_blocking_step_starves_other_tracks() {
556        struct SlowDecodeSource {
557            timeline: Timeline,
558            block_ms: u64,
559        }
560
561        impl AudioWorkerSource for SlowDecodeSource {
562            type Chunk = PcmChunk;
563
564            fn step_track(&mut self) -> TrackStep<PcmChunk> {
565                thread_sleep(Duration::from_millis(self.block_ms));
566                TrackStep::Produced(Fetch::new(PcmChunk::default(), false, 0))
567            }
568
569            fn timeline(&self) -> &Timeline {
570                &self.timeline
571            }
572        }
573
574        let handle = AudioWorkerHandle::new();
575
576        let (reg_a, mut rx_a, _) = make_registration(MockSource::new(1000), 32, 0);
577        let _id_a = handle.register_track(reg_a);
578
579        let slow_source = SlowDecodeSource {
580            timeline: Timeline::new(),
581            block_ms: 10,
582        };
583        let (reg_b, mut rx_b, _) = make_registration(slow_source, 32, 0);
584        let _id_b = handle.register_track(reg_b);
585
586        let mut max_gap = Duration::ZERO;
587        let mut last_chunk_time = Instant::now();
588        let mut total_chunks = 0u32;
589        let deadline = Instant::now() + Duration::from_secs(1);
590
591        while Instant::now() < deadline {
592            if rx_a.try_pop().is_some() {
593                let gap = last_chunk_time.elapsed();
594                if total_chunks > 0 && gap > max_gap {
595                    max_gap = gap;
596                }
597                last_chunk_time = Instant::now();
598                total_chunks += 1;
599            }
600            while rx_b.try_pop().is_some() {}
601            thread_sleep(Duration::from_millis(5));
602        }
603
604        assert!(
605            max_gap < Duration::from_millis(46),
606            "Max gap between chunks for fast track: {max_gap:?} (limit 46ms). \
607             Slow track's sync blocking causes starvation. \
608             Total chunks delivered: {total_chunks}"
609        );
610    }
611}