1#![forbid(unsafe_code)]
2
3use std::{
4 sync::{
5 Arc,
6 atomic::{AtomicBool, AtomicU8, AtomicU64, Ordering},
7 },
8 time::Duration,
9};
10
11use bitflags::bitflags;
12use kithara_test_utils::kithara;
13
14#[derive(Debug, Clone, Copy)]
26pub struct ChunkPosition {
27 pub source_byte_offset: Option<u64>,
30 pub sample_rate: u32,
31 pub end_position_ns: u64,
38 pub frame_offset: u64,
40 pub frames: u64,
42 pub source_bytes: u64,
44}
45
46bitflags! {
47 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
53 pub struct TimelineFlags: u8 {
54 const PLAYING = 1 << 0;
56 const FLUSHING = 1 << 1;
58 const SEEK_PENDING = 1 << 2;
60 }
61}
62
63#[derive(Clone, Debug)]
72pub struct Timeline {
73 committed_frame_end: Arc<AtomicU64>,
79 committed_position_ns: Arc<AtomicU64>,
80 decoder_node_seek_latch: Arc<AtomicBool>,
86 flags: Arc<AtomicU8>,
88 last_sample_rate: Arc<AtomicU64>,
92
93 pending_seek_epoch: Arc<AtomicU64>,
94
95 seek_epoch: Arc<AtomicU64>,
96 seek_preempt_latch: Arc<AtomicBool>,
103 seek_target_ns: Arc<AtomicU64>,
104 segment_position: Arc<AtomicU64>,
109
110 total_duration_ns: Arc<AtomicU64>,
111}
112
113impl Timeline {
114 const NO_DURATION: u64 = u64::MAX;
115 const NO_PENDING_SEEK: u64 = u64::MAX;
116 const NO_SEEK_TARGET: u64 = u64::MAX;
117
118 #[must_use]
119 pub fn new() -> Self {
121 Self {
122 committed_position_ns: Arc::new(AtomicU64::new(0)),
123 committed_frame_end: Arc::new(AtomicU64::new(0)),
124 last_sample_rate: Arc::new(AtomicU64::new(0)),
125 pending_seek_epoch: Arc::new(AtomicU64::new(Self::NO_PENDING_SEEK)),
126 total_duration_ns: Arc::new(AtomicU64::new(Self::NO_DURATION)),
127 segment_position: Arc::new(AtomicU64::new(0)),
128 seek_epoch: Arc::new(AtomicU64::new(0)),
129 seek_target_ns: Arc::new(AtomicU64::new(Self::NO_SEEK_TARGET)),
130 seek_preempt_latch: Arc::new(AtomicBool::new(false)),
131 decoder_node_seek_latch: Arc::new(AtomicBool::new(false)),
132 flags: Arc::new(AtomicU8::new(TimelineFlags::empty().bits())),
133 }
134 }
135
136 pub fn advance_committed_chunk(&self, pos: &ChunkPosition) {
154 self.write_playhead(
155 pos,
156 pos.frame_offset.saturating_add(pos.frames),
157 pos.source_byte_offset
158 .map(|off| off.saturating_add(pos.source_bytes)),
159 );
160 }
161
162 pub fn clear_seek_pending(&self, epoch: u64) {
167 if self.seek_epoch.load(Ordering::Acquire) == epoch {
168 self.remove_flags_with(TimelineFlags::SEEK_PENDING, Ordering::Release);
169 }
170 }
171
172 pub fn commit_seek_landed(&self, pos: &ChunkPosition) {
180 self.write_playhead(pos, pos.frame_offset, pos.source_byte_offset);
181 }
182
183 #[must_use]
184 pub fn committed_position(&self) -> Duration {
185 Duration::from_nanos(self.committed_position_ns.load(Ordering::Acquire))
186 }
187
188 pub fn complete_seek(&self, epoch: u64) {
197 if self.seek_epoch.load(Ordering::SeqCst) != epoch {
198 return;
199 }
200 self.remove_flags_with(TimelineFlags::FLUSHING, Ordering::SeqCst);
202 if self.seek_epoch.load(Ordering::SeqCst) != epoch {
203 self.insert_flags_with(TimelineFlags::FLUSHING, Ordering::SeqCst);
204 }
205 }
206
207 #[inline]
208 fn contains_flag(&self, flag: TimelineFlags) -> bool {
209 self.flags_snapshot_with(Ordering::Acquire).contains(flag)
210 }
211
212 #[must_use]
213 pub fn did_clear_pending_seek_epoch(&self, seek_epoch: u64) -> bool {
214 self.pending_seek_epoch
215 .compare_exchange(
216 seek_epoch,
217 Self::NO_PENDING_SEEK,
218 Ordering::AcqRel,
219 Ordering::Acquire,
220 )
221 .is_ok()
222 }
223
224 #[must_use]
233 pub fn did_take_decoder_node_seek(&self) -> bool {
234 self.decoder_node_seek_latch.swap(false, Ordering::Acquire)
235 }
236
237 #[must_use]
246 pub fn did_take_seek_preempt(&self) -> bool {
247 self.seek_preempt_latch.swap(false, Ordering::Acquire)
248 }
249
250 #[inline]
251 fn flags_snapshot_with(&self, order: Ordering) -> TimelineFlags {
252 TimelineFlags::from_bits_truncate(self.flags.load(order))
253 }
254
255 #[must_use]
266 pub fn initiate_seek(&self, target: Duration) -> u64 {
267 let nanos = u64::try_from(target.as_nanos())
268 .expect("BUG: initiate_seek target.as_nanos() fits in u64 for any realistic Duration");
269 let epoch = self.seek_epoch.fetch_add(1, Ordering::SeqCst) + 1;
270 self.seek_target_ns.store(nanos, Ordering::Release);
271 self.insert_flags_with(TimelineFlags::SEEK_PENDING, Ordering::Release);
273 self.insert_flags_with(TimelineFlags::FLUSHING, Ordering::Release);
274 self.seek_preempt_latch.store(true, Ordering::Release);
275 self.decoder_node_seek_latch.store(true, Ordering::Release);
276 epoch
277 }
278
279 #[inline]
280 fn insert_flags_with(&self, flags: TimelineFlags, order: Ordering) {
281 self.flags.fetch_or(flags.bits(), order);
282 }
283
284 #[must_use]
286 pub fn is_flushing(&self) -> bool {
287 self.contains_flag(TimelineFlags::FLUSHING)
288 }
289
290 #[must_use]
298 pub fn is_playing(&self) -> bool {
299 self.contains_flag(TimelineFlags::PLAYING)
300 }
301
302 #[must_use]
308 pub fn is_seek_pending(&self) -> bool {
309 self.contains_flag(TimelineFlags::SEEK_PENDING)
310 }
311
312 pub fn mark_pending_seek_epoch(&self, seek_epoch: u64) {
313 self.pending_seek_epoch.store(seek_epoch, Ordering::Release);
314 }
315
316 #[must_use]
317 pub fn pending_seek_epoch(&self) -> Option<u64> {
318 let epoch = self.pending_seek_epoch.load(Ordering::Acquire);
319 if epoch == Self::NO_PENDING_SEEK {
320 None
321 } else {
322 Some(epoch)
323 }
324 }
325
326 #[inline]
327 fn remove_flags_with(&self, flags: TimelineFlags, order: Ordering) {
328 self.flags.fetch_and(!flags.bits(), order);
329 }
330
331 #[inline]
332 fn replace_flags(&self, flags: TimelineFlags, on: bool) {
333 if on {
334 self.insert_flags_with(flags, Ordering::Release);
335 } else {
336 self.remove_flags_with(flags, Ordering::Release);
337 }
338 }
339
340 #[must_use]
342 pub fn seek_epoch(&self) -> u64 {
343 self.seek_epoch.load(Ordering::Acquire)
344 }
345
346 #[must_use]
348 pub fn seek_epoch_handle(&self) -> Arc<AtomicU64> {
349 Arc::clone(&self.seek_epoch)
350 }
351
352 #[must_use]
354 pub fn seek_target(&self) -> Option<Duration> {
355 let ns = self.seek_target_ns.load(Ordering::Acquire);
356 if ns == Self::NO_SEEK_TARGET {
357 None
358 } else {
359 Some(Duration::from_nanos(ns))
360 }
361 }
362
363 pub fn set_committed_position(&self, position: Duration) {
367 let nanos = u64::try_from(position.as_nanos())
368 .expect("BUG: position.as_nanos() fits in u64 for any realistic playback time");
369 self.committed_position_ns.store(nanos, Ordering::Release);
370 }
371
372 #[kithara::probe(position)]
376 pub fn set_download_position(&self, position: u64) {
377 let _ = position;
378 }
379
380 pub fn set_playing(&self, playing: bool) {
385 self.replace_flags(TimelineFlags::PLAYING, playing);
386 }
387
388 #[kithara::probe(position)]
389 pub fn set_segment_position(&self, position: u64) {
390 self.segment_position.store(position, Ordering::Release);
391 }
392
393 pub fn set_total_duration(&self, duration: Option<Duration>) {
394 let nanos = duration
395 .and_then(|value| u64::try_from(value.as_nanos()).ok())
396 .unwrap_or(Self::NO_DURATION);
397 self.total_duration_ns.store(nanos, Ordering::Release);
398 }
399
400 #[must_use]
401 pub fn total_duration(&self) -> Option<Duration> {
402 let nanos = self.total_duration_ns.load(Ordering::Acquire);
403 if nanos == Self::NO_DURATION {
404 None
405 } else {
406 Some(Duration::from_nanos(nanos))
407 }
408 }
409
410 #[kithara::probe(committed_ns = pos.end_position_ns, end_frame)]
411 fn write_playhead(&self, pos: &ChunkPosition, end_frame: u64, _source_byte_end: Option<u64>) {
412 let sr = u64::from(pos.sample_rate);
413 if sr == 0 {
414 return;
415 }
416 let duration_ns = self.total_duration_ns.load(Ordering::Acquire);
417 let cap = if duration_ns == Self::NO_DURATION {
418 u64::MAX
419 } else {
420 duration_ns
421 };
422 self.committed_position_ns
423 .store(pos.end_position_ns.min(cap), Ordering::Release);
424 self.committed_frame_end.store(end_frame, Ordering::Release);
425 self.last_sample_rate.store(sr, Ordering::Release);
426 }
427}
428
429impl Default for Timeline {
430 fn default() -> Self {
431 Self::new()
432 }
433}
434
435#[cfg(test)]
436mod tests {
437 use std::{
438 sync::{Arc, Barrier},
439 thread,
440 };
441
442 use kithara_test_utils::kithara;
443
444 use super::*;
445
446 #[kithara::test]
447 fn initiate_seek_sets_flushing_and_target() {
448 let tl = Timeline::new();
449 assert!(!tl.is_flushing());
450 assert!(tl.seek_target().is_none());
451 let initial_committed = tl.committed_position();
452
453 let epoch = tl.initiate_seek(Duration::from_secs(10));
454 assert_eq!(epoch, 1);
455 assert!(tl.is_flushing());
456 assert_eq!(tl.seek_target(), Some(Duration::from_secs(10)));
457 assert_eq!(tl.seek_epoch(), 1);
458 assert_eq!(tl.committed_position(), initial_committed);
459 }
460
461 #[kithara::test]
462 fn complete_seek_clears_flushing() {
463 let tl = Timeline::new();
464 let epoch = tl.initiate_seek(Duration::from_secs(5));
465 tl.complete_seek(epoch);
466 assert!(!tl.is_flushing());
467 assert_eq!(tl.seek_target(), Some(Duration::from_secs(5)));
468 }
469
470 #[kithara::test]
471 fn complete_seek_ignores_stale_epoch() {
472 let tl = Timeline::new();
473 let epoch1 = tl.initiate_seek(Duration::from_secs(5));
474 let epoch2 = tl.initiate_seek(Duration::from_secs(10));
475 tl.complete_seek(epoch1);
476 assert!(tl.is_flushing());
477 assert_eq!(tl.seek_target(), Some(Duration::from_secs(10)));
478 tl.complete_seek(epoch2);
479 assert!(!tl.is_flushing());
480 }
481
482 #[kithara::test]
483 fn seek_epoch_monotonically_increases() {
484 let tl = Timeline::new();
485 let e1 = tl.initiate_seek(Duration::from_secs(1));
486 let e2 = tl.initiate_seek(Duration::from_secs(2));
487 let e3 = tl.initiate_seek(Duration::from_secs(3));
488 assert_eq!(e1, 1);
489 assert_eq!(e2, 2);
490 assert_eq!(e3, 3);
491 assert_eq!(tl.seek_target(), Some(Duration::from_secs(3)));
492 }
493
494 #[kithara::test]
495 fn complete_seek_does_not_clobber_concurrent_target() {
496 let tl = Timeline::new();
497 let epoch1 = tl.initiate_seek(Duration::from_secs(5));
498 let _epoch2 = tl.initiate_seek(Duration::from_secs(15));
499 tl.complete_seek(epoch1);
500 assert!(tl.is_flushing());
501 assert_eq!(tl.seek_target(), Some(Duration::from_secs(15)));
502 }
503
504 #[kithara::test]
505 fn initiate_seek_is_visible_across_clones() {
506 let tl = Timeline::new();
507 let clone = tl.clone();
508 let _ = tl.initiate_seek(Duration::from_secs(7));
509 assert!(clone.is_flushing());
510 assert_eq!(clone.seek_target(), Some(Duration::from_secs(7)));
511 }
512
513 #[kithara::test]
514 fn initiate_seek_sets_seek_pending() {
515 let tl = Timeline::new();
516 assert!(!tl.is_seek_pending());
517 let _epoch = tl.initiate_seek(Duration::from_secs(5));
518 assert!(tl.is_seek_pending());
519 }
520
521 #[kithara::test]
522 fn clear_seek_pending_only_clears_matching_epoch() {
523 let tl = Timeline::new();
524 let epoch1 = tl.initiate_seek(Duration::from_secs(5));
525 let epoch2 = tl.initiate_seek(Duration::from_secs(10));
526 tl.clear_seek_pending(epoch1);
527 assert!(tl.is_seek_pending());
528 tl.clear_seek_pending(epoch2);
529 assert!(!tl.is_seek_pending());
530 }
531
532 #[kithara::test]
533 fn new_initiate_seek_resets_seek_pending() {
534 let tl = Timeline::new();
535 let epoch = tl.initiate_seek(Duration::from_secs(5));
536 tl.clear_seek_pending(epoch);
537 assert!(!tl.is_seek_pending());
538 let _epoch2 = tl.initiate_seek(Duration::from_secs(10));
539 assert!(tl.is_seek_pending());
540 }
541
542 #[kithara::test]
543 fn complete_seek_does_not_clear_seek_pending() {
544 let tl = Timeline::new();
545 let epoch = tl.initiate_seek(Duration::from_secs(5));
546 tl.complete_seek(epoch);
547 assert!(!tl.is_flushing());
548 assert!(tl.is_seek_pending());
549 }
550
551 #[kithara::test]
552 fn is_seek_pending_visible_across_clones() {
553 let tl = Timeline::new();
554 let clone = tl.clone();
555 let _epoch = tl.initiate_seek(Duration::from_secs(5));
556 assert!(clone.is_seek_pending());
557 }
558
559 #[kithara::test]
560 fn flag_pair_matrix_matches_bitflags_snapshot() {
561 for mask in 0u8..4 {
562 let tl = Timeline::new();
563 let want_flushing = mask & 1 != 0;
564 let want_seek_pending = mask & 2 != 0;
565
566 if want_flushing || want_seek_pending {
567 let _ = tl.initiate_seek(Duration::from_secs(1));
568 if !want_flushing {
569 tl.complete_seek(tl.seek_epoch());
570 }
571 if !want_seek_pending {
572 tl.clear_seek_pending(tl.seek_epoch());
573 }
574 }
575
576 assert_eq!(tl.is_flushing(), want_flushing, "mask {mask:#04b} flushing");
577 assert_eq!(
578 tl.is_seek_pending(),
579 want_seek_pending,
580 "mask {mask:#04b} seek_pending"
581 );
582
583 let snapshot = tl.flags_snapshot_with(Ordering::Acquire);
584 assert_eq!(
585 snapshot.contains(TimelineFlags::FLUSHING),
586 want_flushing,
587 "mask {mask:#04b} snapshot flushing"
588 );
589 assert_eq!(
590 snapshot.contains(TimelineFlags::SEEK_PENDING),
591 want_seek_pending,
592 "mask {mask:#04b} snapshot seek_pending"
593 );
594 }
595 }
596
597 #[kithara::test]
598 fn complete_seek_double_check_re_raises_flushing_when_newer_seek_interleaves() {
599 let tl = Timeline::new();
600 let epoch1 = tl.initiate_seek(Duration::from_secs(1));
601
602 tl.remove_flags_with(TimelineFlags::FLUSHING, Ordering::SeqCst);
603 let _epoch2 = tl.initiate_seek(Duration::from_secs(2));
604 tl.complete_seek(epoch1);
605
606 assert!(
607 tl.is_flushing(),
608 "FLUSHING must be re-raised when a newer seek interleaves mid-complete"
609 );
610 }
611
612 #[kithara::test]
613 fn concurrent_flag_toggles_preserve_independent_semantics() {
614 const ITER: usize = 50_000;
615
616 let tl = Timeline::new();
617 let barrier = Arc::new(Barrier::new(3));
618
619 let tl_a = tl.clone();
620 let barrier_a = Arc::clone(&barrier);
621 let a = thread::spawn(move || {
622 barrier_a.wait();
623 for i in 0..ITER {
624 tl_a.set_playing(i % 2 == 0);
625 }
626 });
627
628 let tl_b = tl.clone();
629 let barrier_b = Arc::clone(&barrier);
630 let b = thread::spawn(move || {
631 barrier_b.wait();
632 for _ in 0..ITER {
633 let epoch = tl_b.initiate_seek(Duration::from_millis(1));
634 tl_b.clear_seek_pending(epoch);
635 tl_b.complete_seek(epoch);
636 }
637 });
638
639 let tl_c = tl.clone();
640 let barrier_c = Arc::clone(&barrier);
641 let c = thread::spawn(move || {
642 barrier_c.wait();
643 let mut observed = 0u64;
644 for _ in 0..ITER {
645 let snap = tl_c.flags_snapshot_with(Ordering::Acquire);
646 observed ^= u64::from(snap.bits());
647 }
648 observed
649 });
650
651 a.join()
652 .expect("BUG: spawned thread A must not panic in this test");
653 b.join()
654 .expect("BUG: spawned thread B must not panic in this test");
655 let _ = c
656 .join()
657 .expect("BUG: spawned thread C must not panic in this test");
658
659 assert!(
660 !tl.is_playing(),
661 "PLAYING must match the last deterministic write"
662 );
663 assert!(!tl.is_flushing(), "FLUSHING must be fully cleared");
664 assert!(
665 !tl.is_seek_pending(),
666 "SEEK_PENDING must be fully cleared after last clear"
667 );
668 }
669
670 #[kithara::test]
671 fn playing_defaults_to_false() {
672 let tl = Timeline::new();
673 assert!(!tl.is_playing());
674 }
675
676 #[kithara::test]
677 fn set_playing_true_is_visible_across_clones() {
678 let tl = Timeline::new();
679 let clone = tl.clone();
680 tl.set_playing(true);
681 assert!(clone.is_playing());
682 clone.set_playing(false);
683 assert!(!tl.is_playing());
684 }
685
686 #[kithara::test]
687 fn set_playing_idempotent() {
688 let tl = Timeline::new();
689 tl.set_playing(true);
690 tl.set_playing(true);
691 assert!(tl.is_playing());
692 tl.set_playing(false);
693 tl.set_playing(false);
694 assert!(!tl.is_playing());
695 }
696
697 #[kithara::test]
698 fn playing_is_orthogonal_to_other_flags() {
699 for mask in 0u8..4 {
700 for &initial_playing in &[false, true] {
701 let tl = Timeline::new();
702 let want_flushing = mask & 1 != 0;
703 let want_seek_pending = mask & 2 != 0;
704
705 if want_flushing || want_seek_pending {
706 let _ = tl.initiate_seek(Duration::from_secs(1));
707 if !want_flushing {
708 tl.complete_seek(tl.seek_epoch());
709 }
710 if !want_seek_pending {
711 tl.clear_seek_pending(tl.seek_epoch());
712 }
713 }
714 tl.set_playing(initial_playing);
715
716 assert_eq!(tl.is_playing(), initial_playing);
717 assert_eq!(
718 tl.is_flushing(),
719 want_flushing,
720 "mask {mask:#04b} play={initial_playing} flushing"
721 );
722 assert_eq!(
723 tl.is_seek_pending(),
724 want_seek_pending,
725 "mask {mask:#04b} play={initial_playing} seek_pending"
726 );
727
728 tl.set_playing(!initial_playing);
729 assert_eq!(tl.is_playing(), !initial_playing);
730 assert_eq!(tl.is_flushing(), want_flushing);
731 assert_eq!(tl.is_seek_pending(), want_seek_pending);
732 }
733 }
734 }
735
736 #[kithara::test]
737 fn initiate_seek_does_not_touch_playing() {
738 let tl = Timeline::new();
739 tl.set_playing(true);
740 let epoch = tl.initiate_seek(Duration::from_secs(5));
741 assert!(tl.is_playing(), "PLAYING must not be affected by seek");
742 tl.complete_seek(epoch);
743 assert!(tl.is_playing(), "PLAYING must survive complete_seek");
744 tl.clear_seek_pending(epoch);
745 assert!(tl.is_playing(), "PLAYING must survive clear_seek_pending");
746 }
747}