1use std::sync::Arc;
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::time::{Duration, Instant};
10
/// Internal state machine backing a [`PlaybackClock`].
#[derive(Debug, Clone, Copy)]
enum ClockState {
    /// Not playing; `current_time()` reports zero in this state.
    Stopped,
    /// Advancing: the position is `base` plus the rate-scaled wall-clock time
    /// elapsed since `started_at`.
    Running { started_at: Instant, base: Duration },
    /// Frozen at `frozen_at` until the clock is resumed or started again.
    Paused { frozen_at: Duration },
}

/// A pausable, seekable, rate-adjustable clock driven by [`Instant`].
///
/// All position arithmetic saturates at [`Duration::MAX`]; no method panics on
/// extreme rates or positions.
#[derive(Debug)]
pub struct PlaybackClock {
    /// Current state of the clock (stopped, running or paused).
    state: ClockState,
    /// Multiplier applied to wall-clock time while running; always finite and > 0.
    rate: f64,
    /// Position reported by `current_pts()` while stopped and used as the
    /// starting position by the next `start()`.
    seek_offset: Duration,
}

impl PlaybackClock {
    /// Creates a stopped clock at position zero with a rate of `1.0`.
    #[must_use]
    pub fn new() -> Self {
        Self {
            state: ClockState::Stopped,
            rate: 1.0,
            seek_offset: Duration::ZERO,
        }
    }

    /// Returns the wall-clock time between `started_at` and `now`, scaled by
    /// the current rate.
    ///
    /// Performs the same float computation as `Duration::mul_f64`, but yields
    /// [`Duration::MAX`] instead of panicking when the product does not fit.
    fn scaled_elapsed(&self, started_at: Instant, now: Instant) -> Duration {
        let wall = now.saturating_duration_since(started_at);
        Duration::try_from_secs_f64(wall.as_secs_f64() * self.rate).unwrap_or(Duration::MAX)
    }

    /// Starts the clock.
    ///
    /// From `Stopped` the clock begins at the last position given to
    /// [`set_position`](Self::set_position); from `Paused` it continues from the
    /// frozen position. Calling this while already running does nothing.
    pub fn start(&mut self) {
        let base = match self.state {
            ClockState::Running { .. } => return,
            ClockState::Stopped => self.seek_offset,
            ClockState::Paused { frozen_at } => frozen_at,
        };
        self.state = ClockState::Running {
            started_at: Instant::now(),
            base,
        };
    }

    /// Stops the clock and resets the seek position to zero.
    pub fn stop(&mut self) {
        self.state = ClockState::Stopped;
        self.seek_offset = Duration::ZERO;
    }

    /// Freezes a running clock at its current position.
    ///
    /// Does nothing when the clock is stopped or already paused.
    pub fn pause(&mut self) {
        if let ClockState::Running { started_at, base } = self.state {
            let elapsed = self.scaled_elapsed(started_at, Instant::now());
            self.state = ClockState::Paused {
                frozen_at: base.saturating_add(elapsed),
            };
        }
    }

    /// Resumes a paused clock from its frozen position.
    ///
    /// Does nothing when the clock is stopped or already running.
    pub fn resume(&mut self) {
        if let ClockState::Paused { frozen_at } = self.state {
            self.state = ClockState::Running {
                started_at: Instant::now(),
                base: frozen_at,
            };
        }
    }

    /// Returns the current position.
    ///
    /// This is zero while stopped, the frozen position while paused, and the
    /// base position plus rate-scaled elapsed time while running.
    #[must_use]
    pub fn current_time(&self) -> Duration {
        match self.state {
            ClockState::Stopped => Duration::ZERO,
            ClockState::Paused { frozen_at } => frozen_at,
            ClockState::Running { started_at, base } => {
                base.saturating_add(self.scaled_elapsed(started_at, Instant::now()))
            }
        }
    }

    /// Returns the current presentation timestamp.
    ///
    /// Identical to [`current_time`](Self::current_time) except while stopped,
    /// where it reports the pending seek position instead of zero.
    #[must_use]
    pub fn current_pts(&self) -> Duration {
        match self.state {
            ClockState::Stopped => self.seek_offset,
            ClockState::Running { .. } | ClockState::Paused { .. } => self.current_time(),
        }
    }

    /// Returns `true` only while the clock is running (not stopped, not paused).
    #[must_use]
    pub fn is_running(&self) -> bool {
        matches!(self.state, ClockState::Running { .. })
    }

    /// Changes the playback rate.
    ///
    /// Rates that are zero, negative, NaN or infinite are ignored and the
    /// previous rate is kept. When the clock is running, the time elapsed so far
    /// is folded into the base position at the old rate, so the reported
    /// position does not jump.
    pub fn set_rate(&mut self, rate: f64) {
        // NaN compares false against everything, so it needs the explicit
        // finiteness check; a non-finite rate would poison every later reading.
        if !rate.is_finite() || rate <= 0.0 {
            return;
        }
        if let ClockState::Running { started_at, base } = self.state {
            // One clock read serves as both the end of the old-rate segment and
            // the start of the new one, so no wall time is lost between them.
            let now = Instant::now();
            let elapsed = self.scaled_elapsed(started_at, now);
            self.state = ClockState::Running {
                started_at: now,
                base: base.saturating_add(elapsed),
            };
        }
        self.rate = rate;
    }

    /// Returns the current playback rate.
    #[must_use]
    pub fn rate(&self) -> f64 {
        self.rate
    }

    /// Moves the clock to `pts`.
    ///
    /// A running clock keeps running from `pts`, a paused clock stays paused at
    /// `pts`, and a stopped clock remembers `pts` for the next
    /// [`start`](Self::start).
    pub fn set_position(&mut self, pts: Duration) {
        self.seek_offset = pts;
        match &mut self.state {
            ClockState::Stopped => {}
            ClockState::Running { started_at, base } => {
                *started_at = Instant::now();
                *base = pts;
            }
            ClockState::Paused { frozen_at } => *frozen_at = pts,
        }
    }
}

impl Default for PlaybackClock {
    /// Equivalent to [`PlaybackClock::new`].
    fn default() -> Self {
        Self::new()
    }
}
211
/// The time source that playback is synchronised against.
#[derive(Debug)]
pub(crate) enum MasterClock {
    /// Time derived from a shared count of consumed audio samples.
    Audio {
        /// Shared counter of consumed samples; only read by this type.
        samples_consumed: Arc<AtomicU64>,
        /// Samples per second used to convert the counter into a duration.
        sample_rate: u32,
        /// Wall-clock stand-in `(started_at, base_pts)` used while no audio
        /// position is available; `None` until armed.
        fallback: Option<(Instant, Duration)>,
    },
    /// Time derived purely from the wall clock.
    System {
        /// Wall-clock instant at which `base_pts` applied.
        started_at: Instant,
        /// Presentation timestamp at `started_at`.
        base_pts: Duration,
    },
}

impl MasterClock {
    /// Converts a sample count into a duration, rounded to the nearest
    /// nanosecond, using exact integer arithmetic.
    ///
    /// `sample_rate` must be non-zero.
    fn samples_to_duration(samples: u64, sample_rate: u32) -> Duration {
        const NANOS_PER_SEC: u64 = 1_000_000_000;
        let rate = u64::from(sample_rate);
        let whole_secs = samples / rate;
        let remainder = samples % rate;
        // remainder < rate <= u32::MAX, so the product stays below 2^62 and
        // cannot overflow a u64 even after adding the rounding term.
        let nanos = (remainder * NANOS_PER_SEC + rate / 2) / rate;
        // `nanos` may round up to a full second; the addition carries it.
        Duration::from_secs(whole_secs).saturating_add(Duration::from_nanos(nanos))
    }

    /// Returns the current presentation timestamp of the master clock.
    ///
    /// * `System`: `base_pts` plus the wall time elapsed since `started_at`.
    /// * `Audio`: the consumed sample count converted to time. While no audio
    ///   position can be derived (no samples consumed yet, or a zero sample
    ///   rate) the armed fallback is used instead, or zero when none is armed.
    ///
    /// Never panics: additions saturate at [`Duration::MAX`].
    pub(crate) fn current_pts(&self) -> Duration {
        match self {
            Self::Audio {
                samples_consumed,
                sample_rate,
                fallback,
            } => {
                let consumed = samples_consumed.load(Ordering::Relaxed);
                if consumed > 0 && *sample_rate > 0 {
                    Self::samples_to_duration(consumed, *sample_rate)
                } else if let Some((started_at, base_pts)) = fallback {
                    base_pts.saturating_add(started_at.elapsed())
                } else {
                    Duration::ZERO
                }
            }
            Self::System {
                started_at,
                base_pts,
            } => base_pts.saturating_add(started_at.elapsed()),
        }
    }

    /// Reports whether this clock currently provides a usable time reference.
    ///
    /// A `System` clock always does. An `Audio` clock does once at least one
    /// sample has been consumed or the fallback has been armed.
    pub(crate) fn should_sync(&self) -> bool {
        match self {
            Self::System { .. } => true,
            Self::Audio {
                samples_consumed,
                fallback,
                ..
            } => samples_consumed.load(Ordering::Relaxed) > 0 || fallback.is_some(),
        }
    }

    /// Arms the wall-clock fallback of an `Audio` clock, starting now at
    /// `base_pts`, but only if no samples have been consumed and no fallback is
    /// armed yet.
    ///
    /// Does nothing for a `System` clock; repeated calls leave the first
    /// fallback untouched.
    pub(crate) fn activate_fallback_if_no_audio(&mut self, base_pts: Duration) {
        let Self::Audio {
            samples_consumed,
            fallback,
            ..
        } = self
        else {
            return;
        };
        if samples_consumed.load(Ordering::Relaxed) == 0 && fallback.is_none() {
            *fallback = Some((Instant::now(), base_pts));
        }
    }

    /// Re-bases the wall-clock component of the clock to `base`, starting now.
    ///
    /// For `System` this replaces both fields. For `Audio` it only re-bases a
    /// fallback that is already armed; it never arms one and never touches the
    /// sample counter.
    pub(crate) fn reset(&mut self, base: Duration) {
        match self {
            Self::System {
                started_at,
                base_pts,
            } => {
                *started_at = Instant::now();
                *base_pts = base;
            }
            Self::Audio { fallback, .. } => {
                if let Some(armed) = fallback {
                    *armed = (Instant::now(), base);
                }
            }
        }
    }
}
340
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;

    /// Sample rate shared by every audio master-clock fixture below.
    const FIXTURE_SAMPLE_RATE: u32 = 48_000;

    /// Blocks the test thread for `ms` milliseconds of wall-clock time.
    fn nap(ms: u64) {
        thread::sleep(Duration::from_millis(ms));
    }

    /// Returns `true` when the clock's rate equals `expected` within `f64::EPSILON`.
    fn rate_is(clock: &PlaybackClock, expected: f64) -> bool {
        (clock.rate() - expected).abs() < f64::EPSILON
    }

    /// Builds an audio master clock over `samples_consumed` with no fallback armed.
    fn audio_clock(samples_consumed: Arc<AtomicU64>) -> MasterClock {
        MasterClock::Audio {
            samples_consumed,
            sample_rate: FIXTURE_SAMPLE_RATE,
            fallback: None,
        }
    }

    /// Builds an audio master clock that has consumed nothing and has no fallback.
    fn idle_audio_clock() -> MasterClock {
        audio_clock(Arc::new(AtomicU64::new(0)))
    }

    #[test]
    fn clock_stopped_should_return_zero() {
        // A clock that has never been started reads zero.
        let untouched = PlaybackClock::new();
        assert_eq!(untouched.current_time(), Duration::ZERO);

        // A clock that ran and was then stopped reads zero again.
        let mut cycled = PlaybackClock::new();
        cycled.start();
        nap(5);
        cycled.stop();
        assert_eq!(
            cycled.current_time(),
            Duration::ZERO,
            "current_time() must be ZERO after stop()"
        );
    }

    #[test]
    fn clock_paused_should_freeze_at_pause_time() {
        let mut playback = PlaybackClock::new();
        playback.start();
        nap(10);
        playback.pause();

        // Two readings separated by real time must agree while paused.
        let first_reading = playback.current_time();
        nap(10);
        let second_reading = playback.current_time();

        assert_eq!(
            first_reading, second_reading,
            "current_time() must not advance while paused"
        );
        assert!(
            !playback.is_running(),
            "clock must not report running while paused"
        );
    }

    #[test]
    fn clock_resumed_should_continue_from_pause() {
        let mut playback = PlaybackClock::new();
        playback.start();
        nap(10);
        playback.pause();
        let t_paused = playback.current_time();

        // Still frozen after waiting.
        nap(10);
        assert_eq!(playback.current_time(), t_paused);

        playback.resume();
        assert!(playback.is_running());
        nap(10);

        let t_after = playback.current_time();
        assert!(
            t_after > t_paused,
            "current_time() must advance after resume(); paused={t_paused:?} after={t_after:?}"
        );
    }

    #[test]
    fn clock_start_should_be_noop_when_already_running() {
        let mut playback = PlaybackClock::new();
        playback.start();
        nap(10);
        let t_before = playback.current_time();

        // A redundant start must leave the running clock untouched.
        playback.start();
        let t_after = playback.current_time();

        assert!(
            t_after >= t_before,
            "second start() must not reset the clock; before={t_before:?} after={t_after:?}"
        );
    }

    #[test]
    fn clock_resume_should_be_noop_when_not_paused() {
        let mut playback = PlaybackClock::new();

        // Resuming a stopped clock changes nothing.
        playback.resume();
        assert!(!playback.is_running());
        assert_eq!(playback.current_time(), Duration::ZERO);

        // Resuming a running clock changes nothing either.
        playback.start();
        nap(5);
        let reading = playback.current_time();
        playback.resume();
        assert!(playback.is_running());
        assert!(playback.current_time() >= reading);
    }

    #[test]
    fn clock_default_should_equal_new() {
        let via_new = PlaybackClock::new();
        let via_default = PlaybackClock::default();
        assert_eq!(via_new.current_time(), via_default.current_time());
        assert_eq!(via_new.is_running(), via_default.is_running());
    }

    #[test]
    fn set_rate_should_reject_non_positive_values() {
        let mut playback = PlaybackClock::new();

        playback.set_rate(0.0);
        assert!(
            rate_is(&playback, 1.0),
            "rate must remain 1.0 after set_rate(0.0)"
        );

        playback.set_rate(-1.0);
        assert!(
            rate_is(&playback, 1.0),
            "rate must remain 1.0 after set_rate(-1.0)"
        );
    }

    #[test]
    fn set_rate_should_update_rate_when_stopped_or_paused() {
        // Stopped clock.
        let mut stopped = PlaybackClock::new();
        stopped.set_rate(0.5);
        assert!(rate_is(&stopped, 0.5));

        // Paused clock.
        let mut paused = PlaybackClock::new();
        paused.start();
        paused.pause();
        paused.set_rate(2.0);
        assert!(rate_is(&paused, 2.0));
        assert!(
            !paused.is_running(),
            "clock must remain paused after set_rate"
        );
    }

    #[test]
    fn set_rate_running_should_not_jump_current_time() {
        let mut playback = PlaybackClock::new();
        playback.start();
        nap(10);
        let before = playback.current_time();
        playback.set_rate(2.0);
        let after = playback.current_time();

        // The monotonic check comes first so the subtraction below cannot underflow.
        assert!(
            after >= before,
            "current_time() must not go backward on set_rate; before={before:?} after={after:?}"
        );
        assert!(
            after - before < Duration::from_millis(20),
            "current_time() must not jump forward on set_rate; before={before:?} after={after:?}"
        );
        assert!(rate_is(&playback, 2.0));
    }

    #[test]
    #[ignore = "performance thresholds are environment-dependent; run explicitly with -- --include-ignored"]
    fn rate_two_x_should_advance_at_double_speed() {
        let mut playback = PlaybackClock::new();
        playback.set_rate(2.0);
        playback.start();
        nap(50);
        let elapsed = playback.current_time();

        assert!(
            elapsed >= Duration::from_millis(80),
            "2× rate: expected ≥80 ms after 50 ms wall time, got {elapsed:?}"
        );
    }

    #[test]
    fn set_position_should_shift_pts_by_seek_offset() {
        let seek_target = Duration::from_secs(30);
        let mut playback = PlaybackClock::new();

        // While stopped, the seek target is reported directly.
        playback.set_position(seek_target);
        assert_eq!(
            playback.current_pts(),
            seek_target,
            "current_pts() must reflect seek_offset when stopped"
        );

        // Once started, playback begins at the seek target.
        playback.start();
        let pts = playback.current_pts();
        assert!(
            pts >= seek_target,
            "current_pts() must be ≥ seek target after start(); target={seek_target:?} pts={pts:?}"
        );
        assert!(
            playback.is_running(),
            "clock must be running after set_position + start()"
        );
    }

    #[test]
    fn set_position_while_paused_should_update_frozen_time() {
        let mut playback = PlaybackClock::new();
        playback.start();
        nap(5);
        playback.pause();

        let seek_target = Duration::from_secs(10);
        playback.set_position(seek_target);

        let pts = playback.current_pts();
        assert_eq!(
            pts, seek_target,
            "frozen time must update to seek target; expected={seek_target:?} got={pts:?}"
        );
        assert!(
            !playback.is_running(),
            "clock must remain paused after set_position"
        );

        // Resuming continues from the new position.
        playback.resume();
        nap(5);
        let pts_after = playback.current_pts();
        assert!(
            pts_after > seek_target,
            "current_pts() must advance past seek target after resume(); target={seek_target:?} after={pts_after:?}"
        );
    }

    #[test]
    fn set_position_while_running_should_continue_from_new_position() {
        let mut playback = PlaybackClock::new();
        playback.start();
        nap(5);

        let seek_target = Duration::from_secs(60);
        playback.set_position(seek_target);

        let pts = playback.current_pts();
        assert!(
            pts >= seek_target,
            "current_pts() must be ≥ seek target immediately after set_position while running; \
             target={seek_target:?} pts={pts:?}"
        );
        assert!(
            playback.is_running(),
            "clock must remain running after set_position"
        );
    }

    #[test]
    fn stop_should_clear_seek_offset() {
        let mut playback = PlaybackClock::new();
        playback.set_position(Duration::from_secs(30));
        playback.stop();

        assert_eq!(
            playback.current_pts(),
            Duration::ZERO,
            "stop() must reset seek_offset to ZERO"
        );
    }

    #[test]
    fn master_clock_system_should_advance_from_base_pts() {
        let master = MasterClock::System {
            started_at: Instant::now(),
            base_pts: Duration::from_secs(5),
        };
        let pts = master.current_pts();
        assert!(
            pts >= Duration::from_secs(5),
            "pts must be >= base_pts; got {pts:?}"
        );
        assert!(
            pts < Duration::from_secs(6),
            "pts must not advance 1 s in a unit test; got {pts:?}"
        );
        assert!(master.should_sync(), "System clock must always sync");
    }

    #[test]
    fn master_clock_system_reset_should_update_base_and_time_reference() {
        // Backdate the start so the clock already shows roughly ten seconds.
        let mut master = MasterClock::System {
            started_at: Instant::now() - Duration::from_secs(10),
            base_pts: Duration::ZERO,
        };
        assert!(
            master.current_pts() >= Duration::from_secs(9),
            "clock should show ~10 s before reset"
        );

        master.reset(Duration::from_secs(5));
        let pts = master.current_pts();
        assert!(
            pts >= Duration::from_secs(5),
            "pts must be >= new base after reset; got {pts:?}"
        );
        assert!(
            pts < Duration::from_secs(6),
            "pts must not advance 1 s in a unit test after reset; got {pts:?}"
        );
    }

    #[test]
    fn master_clock_audio_should_not_sync_before_first_sample() {
        let master = idle_audio_clock();
        assert!(
            !master.should_sync(),
            "audio clock must not sync before any samples are consumed and before fallback is armed"
        );
        assert_eq!(
            master.current_pts(),
            Duration::ZERO,
            "audio clock PTS must be zero before any samples and before fallback is armed"
        );
    }

    #[test]
    fn master_clock_audio_should_sync_and_report_pts_after_samples_consumed() {
        let consumed = Arc::new(AtomicU64::new(48_000));
        let master = audio_clock(Arc::clone(&consumed));
        assert!(
            master.should_sync(),
            "audio clock must sync when samples > 0"
        );
        assert_eq!(
            master.current_pts(),
            Duration::from_secs(1),
            "48000 samples at 48000 Hz must equal 1 second"
        );
    }

    #[test]
    fn master_clock_audio_should_sync_after_fallback_activated() {
        let mut master = idle_audio_clock();
        assert!(
            !master.should_sync(),
            "must not sync before fallback is armed"
        );
        master.activate_fallback_if_no_audio(Duration::from_secs(1));
        assert!(
            master.should_sync(),
            "must sync after fallback is activated even when samples_consumed == 0"
        );
    }

    #[test]
    fn master_clock_audio_fallback_current_pts_should_advance_from_base_pts() {
        let mut master = idle_audio_clock();
        let base = Duration::from_secs(5);
        master.activate_fallback_if_no_audio(base);

        let pts = master.current_pts();
        assert!(
            pts >= base,
            "fallback current_pts must be >= base_pts; got {pts:?}"
        );
        assert!(
            pts < base + Duration::from_secs(1),
            "fallback must not advance 1 s in a unit test; got {pts:?}"
        );
    }

    #[test]
    fn master_clock_audio_should_prefer_samples_over_fallback_when_consumer_starts() {
        let consumed = Arc::new(AtomicU64::new(0));
        let mut master = audio_clock(Arc::clone(&consumed));
        master.activate_fallback_if_no_audio(Duration::from_secs(2));
        assert!(master.should_sync(), "fallback must enable sync");

        // Publish one second of consumed audio through the shared counter.
        consumed.store(48_000, Ordering::Relaxed);
        assert_eq!(
            master.current_pts(),
            Duration::from_secs(1),
            "48000 samples at 48 kHz must report 1 s even when fallback is also armed"
        );
    }

    #[test]
    fn master_clock_audio_activate_fallback_should_be_idempotent() {
        let mut master = idle_audio_clock();
        master.activate_fallback_if_no_audio(Duration::from_secs(1));
        let pts1 = master.current_pts();
        nap(5);

        // A second activation with a different base must be ignored.
        master.activate_fallback_if_no_audio(Duration::from_secs(100));
        let pts2 = master.current_pts();
        assert!(
            pts2 > pts1,
            "clock must keep advancing from the first base after second activate; \
             pts1={pts1:?} pts2={pts2:?}"
        );
        assert!(
            pts2 < Duration::from_secs(5),
            "second activate must not reset clock to base=100 s; pts2={pts2:?}"
        );
    }

    #[test]
    fn master_clock_audio_reset_should_update_fallback_base_pts() {
        let mut master = idle_audio_clock();
        master.activate_fallback_if_no_audio(Duration::from_secs(5));
        master.reset(Duration::from_secs(10));

        let pts = master.current_pts();
        assert!(
            pts >= Duration::from_secs(10),
            "after reset, fallback must advance from the new base_pts; got {pts:?}"
        );
        assert!(
            pts < Duration::from_secs(11),
            "fallback must not advance 1 s in a unit test after reset; got {pts:?}"
        );
    }

    #[test]
    fn master_clock_audio_reset_should_not_arm_fallback_if_not_yet_active() {
        let mut master = idle_audio_clock();
        master.reset(Duration::ZERO);

        assert!(
            !master.should_sync(),
            "reset() before activate_fallback_if_no_audio must not arm the fallback"
        );
        assert_eq!(
            master.current_pts(),
            Duration::ZERO,
            "PTS must remain ZERO when fallback is not yet armed"
        );
    }

    #[test]
    fn master_clock_system_activate_fallback_should_be_noop() {
        let mut master = MasterClock::System {
            started_at: Instant::now(),
            base_pts: Duration::ZERO,
        };
        master.activate_fallback_if_no_audio(Duration::from_secs(99));

        assert!(
            master.should_sync(),
            "System clock must always sync regardless of activate_fallback_if_no_audio"
        );
    }
}