1use std::sync::{
2 Arc,
3 atomic::{AtomicU64, Ordering},
4};
5
6use kithara_decode::PcmChunk;
7use kithara_platform::tokio::sync::Notify;
8use tokio_util::sync::CancellationToken;
9
10use super::{
11 AudioWorkerSource,
12 decoder_node::DecoderNode,
13 hang_observer::HangWatchdogObserver,
14 types::{TrackId, TrackIdGen},
15};
16use crate::{
17 pipeline::fetch::Fetch,
18 runtime::{AtomicServiceClass, Scheduler, SchedulerHandle},
19};
20
21pub(crate) struct TrackRegistration {
23 pub(crate) preload_notify: Arc<Notify>,
24 pub(crate) source: Box<dyn AudioWorkerSource<Chunk = PcmChunk>>,
25 pub(crate) outlet: crate::runtime::Outlet<Fetch<PcmChunk>>,
26 pub(crate) trash_inlet: crate::runtime::Inlet<PcmChunk>,
31 pub(crate) service_class: Arc<AtomicServiceClass>,
34 pub(crate) preload_chunks: usize,
35}
36
37pub struct AudioWorkerHandle {
42 id_gen: Arc<TrackIdGen>,
43 inner: SchedulerHandle<Box<dyn crate::runtime::Node>>,
44}
45
46impl Clone for AudioWorkerHandle {
47 fn clone(&self) -> Self {
48 Self {
49 inner: self.inner.clone(),
50 id_gen: Arc::clone(&self.id_gen),
51 }
52 }
53}
54
55static AUDIO_WORKER_ID: AtomicU64 = AtomicU64::new(0);
57
58impl AudioWorkerHandle {
59 #[must_use]
65 pub fn new() -> Self {
67 Self::with_cancel(CancellationToken::new()) }
69
70 pub(crate) fn register_track(&self, reg: TrackRegistration) -> TrackId {
76 let id = self.id_gen.next();
77 let node: Box<dyn crate::runtime::Node> = Box::new(DecoderNode::from(reg));
78 self.inner.register(id, node);
79 id
80 }
81
82 pub fn shutdown(&self) {
84 self.inner.shutdown();
85 }
86
87 pub(crate) fn unregister_track(&self, track_id: TrackId) {
89 self.inner.unregister(track_id);
90 }
91
92 pub fn wake(&self) {
94 self.inner.wake();
95 }
96
97 #[must_use]
102 pub fn with_cancel(cancel: CancellationToken) -> Self {
103 let id = AUDIO_WORKER_ID.fetch_add(1, Ordering::Relaxed);
104 let inner = Scheduler::<Box<dyn crate::runtime::Node>, HangWatchdogObserver>::start(
105 format!("kithara-audio-worker-{id}"),
106 HangWatchdogObserver::new(),
107 cancel,
108 );
109
110 Self {
111 inner,
112 id_gen: Arc::new(TrackIdGen::new()),
113 }
114 }
115}
116
117impl Default for AudioWorkerHandle {
118 fn default() -> Self {
119 Self::new()
120 }
121}
122
123#[cfg(test)]
124mod tests {
125 use std::{
126 sync::{
127 Arc,
128 atomic::{AtomicBool, Ordering},
129 },
130 time::Duration,
131 };
132
133 use kithara_decode::PcmChunk;
134 use kithara_platform::{
135 thread::sleep as thread_sleep,
136 time::{Instant, timeout as platform_timeout},
137 tokio::sync::Notify,
138 };
139 use kithara_stream::Timeline;
140 use kithara_test_utils::kithara;
141
142 use super::*;
143 use crate::{
144 pipeline::track_fsm::{TrackStep, WaitingReason},
145 runtime::connect,
146 worker::{AudioWorkerSource, thread_wake::ThreadWake, types::ServiceClass},
147 };
148
149 struct MockSource {
150 timeline: Timeline,
151 ready: bool,
152 should_panic: bool,
153 chunks_to_produce: usize,
154 cursor: usize,
155 }
156
157 impl MockSource {
158 fn new(chunks: usize) -> Self {
159 Self {
160 timeline: Timeline::new(),
161 chunks_to_produce: chunks,
162 cursor: 0,
163 ready: true,
164 should_panic: false,
165 }
166 }
167
168 fn not_ready(chunks: usize) -> Self {
169 Self {
170 ready: false,
171 ..Self::new(chunks)
172 }
173 }
174
175 fn panicking() -> Self {
176 Self {
177 should_panic: true,
178 ..Self::new(100)
179 }
180 }
181 }
182
183 impl AudioWorkerSource for MockSource {
184 type Chunk = PcmChunk;
185
186 fn step_track(&mut self) -> TrackStep<PcmChunk> {
187 if self.timeline.is_seek_pending() || self.timeline.is_flushing() {
188 let epoch = self.timeline.seek_epoch();
189 self.timeline.complete_seek(epoch);
190 self.timeline.clear_seek_pending(epoch);
191 return TrackStep::StateChanged;
192 }
193 if !self.ready {
194 return TrackStep::Blocked(WaitingReason::Waiting);
195 }
196 if self.should_panic {
197 panic!("mock panic for testing");
198 }
199 if self.cursor >= self.chunks_to_produce {
200 return TrackStep::Eof;
201 }
202 self.cursor += 1;
203 TrackStep::Produced(Fetch::new(PcmChunk::default(), false, 0))
204 }
205
206 fn timeline(&self) -> &Timeline {
207 &self.timeline
208 }
209 }
210
211 fn make_registration<S>(
212 source: S,
213 ringbuf_capacity: usize,
214 preload_chunks: usize,
215 ) -> (
216 TrackRegistration,
217 crate::runtime::Inlet<Fetch<PcmChunk>>,
218 Arc<Notify>,
219 )
220 where
221 S: AudioWorkerSource<Chunk = PcmChunk> + 'static,
222 {
223 let wake = Arc::new(ThreadWake::default());
224 let (outlet, inlet) = connect::<Fetch<PcmChunk>>(ringbuf_capacity, Some(wake.clone()));
225 let (_trash_outlet, trash_inlet) = connect::<PcmChunk>(ringbuf_capacity + 2, None);
226 let preload_notify = Arc::new(Notify::new());
227
228 let reg = TrackRegistration {
229 outlet,
230 trash_inlet,
231 preload_chunks,
232 source: Box::new(source),
233 preload_notify: Arc::clone(&preload_notify),
234 service_class: Arc::new(AtomicServiceClass::new(ServiceClass::Audible)),
235 };
236 (reg, inlet, preload_notify)
237 }
238
239 fn wait_for_chunks(
240 rx: &mut crate::runtime::Inlet<Fetch<PcmChunk>>,
241 count: usize,
242 timeout: Duration,
243 ) -> usize {
244 let start = Instant::now();
245 let mut received = 0;
246 while received < count && start.elapsed() < timeout {
247 if rx.try_pop().is_some() {
248 received += 1;
249 } else {
250 thread_sleep(Duration::from_millis(1));
251 }
252 }
253 received
254 }
255
256 #[kithara::test]
257 fn worker_creates_and_drops_cleanly() {
258 let handle = AudioWorkerHandle::new();
259 thread_sleep(Duration::from_millis(10));
260 handle.shutdown();
261 thread_sleep(Duration::from_millis(50));
262 }
263
264 #[kithara::test]
265 fn worker_delivers_chunks() {
266 let handle = AudioWorkerHandle::new();
267 let (reg, mut data_rx, _preload_notify) = make_registration(MockSource::new(10), 32, 3);
268
269 let _id = handle.register_track(reg);
270
271 let received = wait_for_chunks(&mut data_rx, 5, Duration::from_secs(5));
272 assert!(received >= 5, "expected >=5 chunks, got {received}");
273
274 handle.shutdown();
275 }
276
277 #[kithara::test]
278 fn worker_multi_track_round_robin() {
279 let handle = AudioWorkerHandle::new();
280
281 let (reg_a, mut rx_a, _) = make_registration(MockSource::new(10), 32, 1);
282 let (reg_b, mut rx_b, _) = make_registration(MockSource::new(10), 32, 1);
283
284 let _id_a = handle.register_track(reg_a);
285 let _id_b = handle.register_track(reg_b);
286
287 let a = wait_for_chunks(&mut rx_a, 3, Duration::from_secs(5));
288 let b = wait_for_chunks(&mut rx_b, 3, Duration::from_secs(5));
289 assert!(a >= 3, "track A: expected >=3 chunks, got {a}");
290 assert!(b >= 3, "track B: expected >=3 chunks, got {b}");
291
292 handle.shutdown();
293 }
294
295 #[kithara::test]
296 fn worker_skips_not_ready_tracks() {
297 let handle = AudioWorkerHandle::new();
298
299 let (reg_a, mut rx_a, _) = make_registration(MockSource::new(10), 32, 1);
300 let (reg_b, mut rx_b, _) = make_registration(MockSource::not_ready(10), 32, 1);
301
302 let _id_a = handle.register_track(reg_a);
303 let _id_b = handle.register_track(reg_b);
304
305 thread_sleep(Duration::from_millis(100));
306
307 let a = wait_for_chunks(&mut rx_a, 1, Duration::from_millis(100));
308 let b = wait_for_chunks(&mut rx_b, 1, Duration::from_millis(50));
309 assert!(a >= 1, "track A should receive chunks");
310 assert_eq!(b, 0, "track B should receive nothing (not ready)");
311
312 handle.shutdown();
313 }
314
315 #[kithara::test]
316 fn worker_overflow_on_full_ringbuf() {
317 let handle = AudioWorkerHandle::new();
318
319 let (reg, mut rx, _) = make_registration(MockSource::new(5), 1, 1);
320
321 let _id = handle.register_track(reg);
322
323 thread_sleep(Duration::from_millis(50));
324
325 let first = rx.try_pop();
326 assert!(first.is_some(), "should have at least one chunk");
327
328 thread_sleep(Duration::from_millis(50));
329
330 let second = rx.try_pop();
331 assert!(second.is_some(), "overflow slot should have been flushed");
332
333 handle.shutdown();
334 }
335
336 #[kithara::test]
337 fn worker_panic_isolation() {
338 let handle = AudioWorkerHandle::new();
339
340 let (reg_a, _, _) = make_registration(MockSource::panicking(), 32, 1);
341 let (reg_b, mut rx_b, _) = make_registration(MockSource::new(10), 32, 1);
342
343 let _id_a = handle.register_track(reg_a);
344 let _id_b = handle.register_track(reg_b);
345
346 let b = wait_for_chunks(&mut rx_b, 3, Duration::from_secs(5));
347 assert!(
348 b >= 3,
349 "track B should keep working after track A panics, got {b}"
350 );
351
352 handle.shutdown();
353 }
354
355 #[kithara::test]
356 fn worker_seek_enters_pending_reset() {
357 let handle = AudioWorkerHandle::new();
358
359 let source = MockSource::new(100);
360 let timeline = source.timeline.clone();
361 let (reg, mut rx, _) = make_registration(source, 32, 1);
362
363 let _id = handle.register_track(reg);
364
365 let got = wait_for_chunks(&mut rx, 2, Duration::from_secs(5));
366 assert!(got >= 2);
367
368 let _ = timeline.initiate_seek(Duration::from_secs(10));
369 handle.wake();
370
371 thread_sleep(Duration::from_millis(100));
372
373 let after_seek = wait_for_chunks(&mut rx, 1, Duration::from_secs(5));
374 assert!(after_seek >= 1, "should resume decoding after seek");
375
376 handle.shutdown();
377 }
378
379 #[kithara::test]
380 fn worker_preload_notify_fires() {
381 let handle = AudioWorkerHandle::new();
382
383 let (reg, _rx, _preload_notify) = make_registration(MockSource::new(10), 32, 3);
384
385 let _id = handle.register_track(reg);
386
387 thread_sleep(Duration::from_millis(200));
388
389 handle.shutdown();
390 }
391
392 #[kithara::test(tokio)]
393 async fn worker_preload_notify_rearms_after_seek() {
394 let handle = AudioWorkerHandle::new();
395
396 let (reg, _rx, preload_notify) = make_registration(MockSource::new(10), 32, 1);
397 let timeline = reg.source.timeline().clone();
398 let _id = handle.register_track(reg);
399
400 platform_timeout(Duration::from_secs(1), preload_notify.notified())
401 .await
402 .expect("initial preload notify must fire");
403
404 let _ = timeline.initiate_seek(Duration::from_secs(1));
405 handle.wake();
406
407 platform_timeout(Duration::from_secs(1), preload_notify.notified())
408 .await
409 .expect("seek must re-arm preload notify");
410
411 handle.shutdown();
412 }
413
414 #[kithara::test]
415 fn worker_unregister_removes_track() {
416 let handle = AudioWorkerHandle::new();
417
418 let (reg, mut rx, _) = make_registration(MockSource::new(100), 32, 1);
419
420 let id = handle.register_track(reg);
421
422 let got = wait_for_chunks(&mut rx, 2, Duration::from_secs(5));
423 assert!(got >= 2);
424
425 handle.unregister_track(id);
426 thread_sleep(Duration::from_millis(50));
427
428 while rx.try_pop().is_some() {}
429
430 thread_sleep(Duration::from_millis(50));
431 assert!(rx.try_pop().is_none(), "no chunks after unregister");
432
433 handle.shutdown();
434 }
435
436 #[kithara::test]
437 fn worker_service_class_prioritises_audible() {
438 let handle = AudioWorkerHandle::new();
439
440 let (reg_a, mut rx_a, _) = make_registration(MockSource::new(100), 4, 0);
441 let class_a = Arc::clone(®_a.service_class);
442 let _id_a = handle.register_track(reg_a);
443
444 let (reg_b, mut rx_b, _) = make_registration(MockSource::new(100), 4, 0);
445 let class_b = Arc::clone(®_b.service_class);
446 let _id_b = handle.register_track(reg_b);
447
448 thread_sleep(Duration::from_millis(30));
449
450 while rx_a.try_pop().is_some() {}
451 while rx_b.try_pop().is_some() {}
452
453 class_a.store(ServiceClass::Idle);
454 class_b.store(ServiceClass::Audible);
455 handle.wake();
456
457 thread_sleep(Duration::from_millis(50));
458
459 let got_a = {
460 let mut n = 0;
461 while rx_a.try_pop().is_some() {
462 n += 1;
463 }
464 n
465 };
466 let got_b = {
467 let mut n = 0;
468 while rx_b.try_pop().is_some() {
469 n += 1;
470 }
471 n
472 };
473 assert!(
474 got_b >= got_a,
475 "Audible track should get at least as many chunks: A={got_a}, B={got_b}"
476 );
477
478 handle.shutdown();
479 }
480
481 #[kithara::test]
492 fn shared_worker_blocking_track_does_not_starve_producing_track() {
493 struct BlockingSource {
494 timeline: Timeline,
495 blocking: Arc<AtomicBool>,
496 }
497
498 impl AudioWorkerSource for BlockingSource {
499 type Chunk = PcmChunk;
500
501 fn step_track(&mut self) -> TrackStep<PcmChunk> {
502 if self.blocking.load(Ordering::Relaxed) {
503 thread_sleep(Duration::from_millis(10));
504 TrackStep::Blocked(WaitingReason::Waiting)
505 } else {
506 TrackStep::Blocked(WaitingReason::Waiting)
507 }
508 }
509
510 fn timeline(&self) -> &Timeline {
511 &self.timeline
512 }
513 }
514
515 let handle = AudioWorkerHandle::new();
516
517 let (reg_a, mut rx_a, _) = make_registration(MockSource::new(100), 32, 0);
518 let _id_a = handle.register_track(reg_a);
519
520 let blocking = Arc::new(AtomicBool::new(true));
521 let blocking_source = BlockingSource {
522 timeline: Timeline::new(),
523 blocking: Arc::clone(&blocking),
524 };
525 let (reg_b, _rx_b, _) = make_registration(blocking_source, 32, 0);
526 let _id_b = handle.register_track(reg_b);
527
528 thread_sleep(Duration::from_millis(500));
529
530 let mut got_a = 0;
531 while rx_a.try_pop().is_some() {
532 got_a += 1;
533 }
534
535 assert!(
536 got_a >= 11,
537 "Producing track must not be starved by blocking track: \
538 got only {got_a} chunks in 1s (expected ≥11 for glitch-free)"
539 );
540
541 blocking.store(false, Ordering::Relaxed);
542 handle.shutdown();
543 }
544
545 #[kithara::test]
555 fn shared_worker_sync_blocking_step_starves_other_tracks() {
556 struct SlowDecodeSource {
557 timeline: Timeline,
558 block_ms: u64,
559 }
560
561 impl AudioWorkerSource for SlowDecodeSource {
562 type Chunk = PcmChunk;
563
564 fn step_track(&mut self) -> TrackStep<PcmChunk> {
565 thread_sleep(Duration::from_millis(self.block_ms));
566 TrackStep::Produced(Fetch::new(PcmChunk::default(), false, 0))
567 }
568
569 fn timeline(&self) -> &Timeline {
570 &self.timeline
571 }
572 }
573
574 let handle = AudioWorkerHandle::new();
575
576 let (reg_a, mut rx_a, _) = make_registration(MockSource::new(1000), 32, 0);
577 let _id_a = handle.register_track(reg_a);
578
579 let slow_source = SlowDecodeSource {
580 timeline: Timeline::new(),
581 block_ms: 10,
582 };
583 let (reg_b, mut rx_b, _) = make_registration(slow_source, 32, 0);
584 let _id_b = handle.register_track(reg_b);
585
586 let mut max_gap = Duration::ZERO;
587 let mut last_chunk_time = Instant::now();
588 let mut total_chunks = 0u32;
589 let deadline = Instant::now() + Duration::from_secs(1);
590
591 while Instant::now() < deadline {
592 if rx_a.try_pop().is_some() {
593 let gap = last_chunk_time.elapsed();
594 if total_chunks > 0 && gap > max_gap {
595 max_gap = gap;
596 }
597 last_chunk_time = Instant::now();
598 total_chunks += 1;
599 }
600 while rx_b.try_pop().is_some() {}
601 thread_sleep(Duration::from_millis(5));
602 }
603
604 assert!(
605 max_gap < Duration::from_millis(46),
606 "Max gap between chunks for fast track: {max_gap:?} (limit 46ms). \
607 Slow track's sync blocking causes starvation. \
608 Total chunks delivered: {total_chunks}"
609 );
610 }
611}