Skip to main content

kithara_stream/
timeline.rs

1#![forbid(unsafe_code)]
2
3use std::{
4    sync::{
5        Arc,
6        atomic::{AtomicBool, AtomicU8, AtomicU64, Ordering},
7    },
8    time::Duration,
9};
10
11use bitflags::bitflags;
12use kithara_test_utils::kithara;
13
14/// Decoder-reported chunk position used to advance the timeline.
15///
16/// This struct is the kithara-stream-local mirror of the fields
17/// [`Timeline::advance_committed_chunk`] needs from a decoder's
18/// per-chunk metadata. It exists because `PcmMeta` lives in
19/// `kithara-decode` (which depends on `kithara-stream`); a tiny mirror
20/// avoids the circular dep without forcing decoders to fragment their
21/// existing meta type.
22///
23/// Decoder backends fill it from their own meta — see
24/// `From<&PcmMeta> for ChunkPosition` in `kithara-decode`.
25#[derive(Debug, Clone, Copy)]
26pub struct ChunkPosition {
27    /// Absolute byte offset of the chunk's source data when the
28    /// decoder reports it (Apple `mStartOffset`, Android API 28+).
29    pub source_byte_offset: Option<u64>,
30    pub sample_rate: u32,
31    /// Decoder-reported wall-clock position **after** the chunk has
32    /// been emitted (or, for [`Timeline::commit_seek_landed`], the
33    /// landed position). Authoritative — derived from the decoder's
34    /// own frame counter inside its own arithmetic, so the timeline
35    /// never recomputes `frames * 1e9 / sample_rate`. Always strictly
36    /// greater than (or equal to, for seek landings) the chunk start.
37    pub end_position_ns: u64,
38    /// Absolute frame offset of the *first* frame in the chunk.
39    pub frame_offset: u64,
40    /// Number of frames the chunk covers.
41    pub frames: u64,
42    /// Source bytes the chunk decoded from (decoder ground truth).
43    pub source_bytes: u64,
44}
45
46bitflags! {
47    /// Boolean playback-state flags stored in a single `AtomicU8` on [`Timeline`].
48    ///
49    /// Consolidated into one atomic so readers (HLS peer priority, reader
50    /// wait loops, audio FSM) observe a coherent snapshot with a single
51    /// load and writers compose flag updates with `fetch_or` / `fetch_and`.
52    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
53    pub struct TimelineFlags: u8 {
54        /// Reserved for the audio FSM playback-activity writer (Task 4+).
55        const PLAYING      = 1 << 0;
56        /// Pipeline is being flushed (seek in progress); gates `wait_range` I/O.
57        const FLUSHING     = 1 << 1;
58        /// Seek initiated but the decoder has not yet repositioned.
59        const SEEK_PENDING = 1 << 2;
60    }
61}
62
63/// Shared playback timeline used across stream layers.
64///
65/// Stores canonical committed playback position. The byte cursor lives
66/// on the [`Source`](crate::Source) — sources own per-variant or
67/// per-file atomic cursors and expose them through
68/// [`Source::position`](crate::Source::position) /
69/// [`Source::advance`](crate::Source::advance) /
70/// [`Source::set_position`](crate::Source::set_position).
71#[derive(Clone, Debug)]
72pub struct Timeline {
73    /// Frame end (exclusive) of the last consumed slice — the consumer's
74    /// playhead in frame space. Single source of truth for "where is the
75    /// consumer in the stream"; both `committed_position_ns` (UI) and
76    /// the per-chunk consumption offset (`Audio::read`) are derived
77    /// from it. Decoder-driven via [`Self::advance_committed_to`].
78    committed_frame_end: Arc<AtomicU64>,
79    committed_position_ns: Arc<AtomicU64>,
80    /// Independent latch for `DecoderNode::sync_seek_epoch`: the
81    /// preempt latch above is destructively consumed inside
82    /// `StreamAudioSource`, so the wrapping decoder node — which has to
83    /// reset its preload counters / drop parked chunks on each new
84    /// epoch — needs its own one-shot signal. `initiate_seek` arms both.
85    decoder_node_seek_latch: Arc<AtomicBool>,
86    /// Consolidated boolean state: `FLUSHING`, `SEEK_PENDING`, `PLAYING`.
87    flags: Arc<AtomicU8>,
88    /// Sample rate (Hz) of the most recently committed chunk; lets
89    /// readers convert `committed_frame_end` ↔ `committed_position`
90    /// without external state.
91    last_sample_rate: Arc<AtomicU64>,
92
93    pending_seek_epoch: Arc<AtomicU64>,
94
95    seek_epoch: Arc<AtomicU64>,
96    /// Hot-path latch the audio worker reads on every `step_track` to
97    /// skip the multi-condition seek-preempt guard. Set by
98    /// `initiate_seek` once per seek (Release after `seek_epoch`/
99    /// `seek_target_ns` updates), consumed by the worker's
100    /// `swap(false, Acquire)`. A single bool load replaces two
101    /// `Arc<AtomicU64>` Acquire loads on the typical no-seek tick.
102    seek_preempt_latch: Arc<AtomicBool>,
103    seek_target_ns: Arc<AtomicU64>,
104    /// Byte offset at the start of the most recent `Stream::read()` call.
105    /// Used by `StreamContext::segment_index()` to resolve which segment
106    /// the last-read data belongs to — `byte_position` has already advanced
107    /// past the data boundary by the time the decoder queries metadata.
108    segment_position: Arc<AtomicU64>,
109
110    total_duration_ns: Arc<AtomicU64>,
111}
112
113impl Timeline {
114    const NO_DURATION: u64 = u64::MAX;
115    const NO_PENDING_SEEK: u64 = u64::MAX;
116    const NO_SEEK_TARGET: u64 = u64::MAX;
117
118    #[must_use]
119    // ast-grep-ignore: style.prefer-default-derive
120    pub fn new() -> Self {
121        Self {
122            committed_position_ns: Arc::new(AtomicU64::new(0)),
123            committed_frame_end: Arc::new(AtomicU64::new(0)),
124            last_sample_rate: Arc::new(AtomicU64::new(0)),
125            pending_seek_epoch: Arc::new(AtomicU64::new(Self::NO_PENDING_SEEK)),
126            total_duration_ns: Arc::new(AtomicU64::new(Self::NO_DURATION)),
127            segment_position: Arc::new(AtomicU64::new(0)),
128            seek_epoch: Arc::new(AtomicU64::new(0)),
129            seek_target_ns: Arc::new(AtomicU64::new(Self::NO_SEEK_TARGET)),
130            seek_preempt_latch: Arc::new(AtomicBool::new(false)),
131            decoder_node_seek_latch: Arc::new(AtomicBool::new(false)),
132            flags: Arc::new(AtomicU8::new(TimelineFlags::empty().bits())),
133        }
134    }
135
136    /// Advance the consumer's playhead to the end of the consumed
137    /// region described by `pos`. `pos.frame_offset + pos.frames`
138    /// must equal the absolute frame the consumer has now finished
139    /// playing through; the decoder owns these numbers, callers do
140    /// not invent them.
141    ///
142    /// `committed_position_ns` (UI) is derived from the new playhead
143    /// frame divided by `pos.sample_rate`. `byte_position` is set
144    /// from `pos.source_byte_offset + pos.source_bytes` when the
145    /// decoder reports absolute offsets (Apple, Android API 28+);
146    /// otherwise it is left untouched so the producer-side cursor
147    /// (`Stream::try_read` / `Stream::seek`) continues to drive it.
148    ///
149    /// Validates against `total_duration` in dev/test builds: a chunk
150    /// pushing the playhead past the declared duration is a real
151    /// arithmetic bug — the decoder's frame counter disagrees with
152    /// `total_duration`, somebody is wrong.
153    pub fn advance_committed_chunk(&self, pos: &ChunkPosition) {
154        self.write_playhead(
155            pos,
156            pos.frame_offset.saturating_add(pos.frames),
157            pos.source_byte_offset
158                .map(|off| off.saturating_add(pos.source_bytes)),
159        );
160    }
161
162    /// Clear seek-pending flag after the decoder successfully applied the seek.
163    ///
164    /// Only clears if `epoch` matches the current seek epoch, preventing a
165    /// stale completion from clearing a newer seek.
166    pub fn clear_seek_pending(&self, epoch: u64) {
167        if self.seek_epoch.load(Ordering::Acquire) == epoch {
168            self.remove_flags_with(TimelineFlags::SEEK_PENDING, Ordering::Release);
169        }
170    }
171
172    /// Pin the playhead to the decoder's actual landing frame after a
173    /// seek. Called by the worker once `decoder.seek` returns
174    /// [`DecoderSeekOutcome::Landed`] — the only authoritative source
175    /// for "where did we actually end up". `pos.frame_offset` carries
176    /// the landed frame; `pos.frames` should be `0` (we have not yet
177    /// consumed any chunk, just repositioned). `pos.source_byte_offset`
178    /// (if known) is the byte offset the decoder is now reading from.
179    pub fn commit_seek_landed(&self, pos: &ChunkPosition) {
180        self.write_playhead(pos, pos.frame_offset, pos.source_byte_offset);
181    }
182
183    #[must_use]
184    pub fn committed_position(&self) -> Duration {
185        Duration::from_nanos(self.committed_position_ns.load(Ordering::Acquire))
186    }
187
188    /// Complete a seek (`FLUSH_STOP`).
189    ///
190    /// Clears flushing flag only if `epoch` is still current.
191    /// A superseding `initiate_seek` will have incremented the epoch,
192    /// preventing an older completion from clearing the new seek.
193    ///
194    /// Uses a double-check to guard against the race where a new
195    /// `initiate_seek` fires between our epoch load and flushing store.
196    pub fn complete_seek(&self, epoch: u64) {
197        if self.seek_epoch.load(Ordering::SeqCst) != epoch {
198            return;
199        }
200        // NOTE: we do NOT clear seek_target_ns here.
201        self.remove_flags_with(TimelineFlags::FLUSHING, Ordering::SeqCst);
202        if self.seek_epoch.load(Ordering::SeqCst) != epoch {
203            self.insert_flags_with(TimelineFlags::FLUSHING, Ordering::SeqCst);
204        }
205    }
206
207    #[inline]
208    fn contains_flag(&self, flag: TimelineFlags) -> bool {
209        self.flags_snapshot_with(Ordering::Acquire).contains(flag)
210    }
211
212    #[must_use]
213    pub fn did_clear_pending_seek_epoch(&self, seek_epoch: u64) -> bool {
214        self.pending_seek_epoch
215            .compare_exchange(
216                seek_epoch,
217                Self::NO_PENDING_SEEK,
218                Ordering::AcqRel,
219                Ordering::Acquire,
220            )
221            .is_ok()
222    }
223
224    /// Consume the decoder-node seek latch with an Acquire swap.
225    ///
226    /// Independent from `take_seek_preempt`: the inner audio source
227    /// consumes that one inside `step_track`, while `DecoderNode` (the
228    /// wrapping scheduler node) needs its own signal so it can reset
229    /// preload state and drop parked chunks on a new epoch. `true`
230    /// here means `seek_epoch` was just bumped and the node must run
231    /// the cleanup branch; otherwise the tick falls through.
232    #[must_use]
233    pub fn did_take_decoder_node_seek(&self) -> bool {
234        self.decoder_node_seek_latch.swap(false, Ordering::Acquire)
235    }
236
237    /// Consume the seek-preempt latch with an Acquire swap.
238    ///
239    /// Returns `true` exactly once per `initiate_seek` call: the worker
240    /// uses this to short-circuit `step_track`'s preempt guard without
241    /// dereferencing two `Arc<AtomicU64>`s. The Acquire ordering
242    /// synchronises with the Release in `initiate_seek` so observing
243    /// `true` here means the new `seek_epoch` and `seek_target_ns`
244    /// stores are also visible.
245    #[must_use]
246    pub fn did_take_seek_preempt(&self) -> bool {
247        self.seek_preempt_latch.swap(false, Ordering::Acquire)
248    }
249
250    #[inline]
251    fn flags_snapshot_with(&self, order: Ordering) -> TimelineFlags {
252        TimelineFlags::from_bits_truncate(self.flags.load(order))
253    }
254
255    /// Initiate a seek (`FLUSH_START`).
256    ///
257    /// Sets flushing flag, records target position, increments epoch.
258    /// All blocking reads (`wait_range`) will observe `is_flushing()` and abort.
259    ///
260    /// Returns the new seek epoch.
261    ///
262    /// # Panics
263    /// Panics if `target` overflows `u64::MAX` nanoseconds (≈584 years —
264    /// not reachable for any realistic seek target).
265    #[must_use]
266    pub fn initiate_seek(&self, target: Duration) -> u64 {
267        let nanos = u64::try_from(target.as_nanos())
268            .expect("BUG: initiate_seek target.as_nanos() fits in u64 for any realistic Duration");
269        let epoch = self.seek_epoch.fetch_add(1, Ordering::SeqCst) + 1;
270        self.seek_target_ns.store(nanos, Ordering::Release);
271        // NOTE: do NOT pre-set `committed_position` to `target` here.
272        self.insert_flags_with(TimelineFlags::SEEK_PENDING, Ordering::Release);
273        self.insert_flags_with(TimelineFlags::FLUSHING, Ordering::Release);
274        self.seek_preempt_latch.store(true, Ordering::Release);
275        self.decoder_node_seek_latch.store(true, Ordering::Release);
276        epoch
277    }
278
279    #[inline]
280    fn insert_flags_with(&self, flags: TimelineFlags, order: Ordering) {
281        self.flags.fetch_or(flags.bits(), order);
282    }
283
284    /// Check if the pipeline is being flushed (seek pending).
285    #[must_use]
286    pub fn is_flushing(&self) -> bool {
287        self.contains_flag(TimelineFlags::FLUSHING)
288    }
289
290    /// Whether the audio FSM has claimed this Timeline as the currently
291    /// active decode target.
292    ///
293    /// Written by the audio pipeline (`StreamAudioSource`) on entry into
294    /// a decode-producing state and cleared on EOF / failure / unload.
295    /// Read by the Downloader peer implementations to decide whether a
296    /// track's fetches should be routed to the high-priority slot.
297    #[must_use]
298    pub fn is_playing(&self) -> bool {
299        self.contains_flag(TimelineFlags::PLAYING)
300    }
301
302    /// Check if a seek has been initiated but not yet applied by the decoder.
303    ///
304    /// Unlike `is_flushing()` (which gates I/O via `wait_range`), this flag
305    /// stays set until the decoder successfully repositions. Used by the worker
306    /// loop to trigger seek retry.
307    #[must_use]
308    pub fn is_seek_pending(&self) -> bool {
309        self.contains_flag(TimelineFlags::SEEK_PENDING)
310    }
311
312    pub fn mark_pending_seek_epoch(&self, seek_epoch: u64) {
313        self.pending_seek_epoch.store(seek_epoch, Ordering::Release);
314    }
315
316    #[must_use]
317    pub fn pending_seek_epoch(&self) -> Option<u64> {
318        let epoch = self.pending_seek_epoch.load(Ordering::Acquire);
319        if epoch == Self::NO_PENDING_SEEK {
320            None
321        } else {
322            Some(epoch)
323        }
324    }
325
326    #[inline]
327    fn remove_flags_with(&self, flags: TimelineFlags, order: Ordering) {
328        self.flags.fetch_and(!flags.bits(), order);
329    }
330
331    #[inline]
332    fn replace_flags(&self, flags: TimelineFlags, on: bool) {
333        if on {
334            self.insert_flags_with(flags, Ordering::Release);
335        } else {
336            self.remove_flags_with(flags, Ordering::Release);
337        }
338    }
339
340    /// Read the current seek epoch.
341    #[must_use]
342    pub fn seek_epoch(&self) -> u64 {
343        self.seek_epoch.load(Ordering::Acquire)
344    }
345
346    /// Cheap clone of the shared atomic seek epoch — same use case.
347    #[must_use]
348    pub fn seek_epoch_handle(&self) -> Arc<AtomicU64> {
349        Arc::clone(&self.seek_epoch)
350    }
351
352    /// Read the pending seek target position.
353    #[must_use]
354    pub fn seek_target(&self) -> Option<Duration> {
355        let ns = self.seek_target_ns.load(Ordering::Acquire);
356        if ns == Self::NO_SEEK_TARGET {
357            None
358        } else {
359            Some(Duration::from_nanos(ns))
360        }
361    }
362
363    /// # Panics
364    /// Panics if `position` overflows `u64::MAX` nanoseconds (≈584 years);
365    /// no realistic media stream can hit this.
366    pub fn set_committed_position(&self, position: Duration) {
367        let nanos = u64::try_from(position.as_nanos())
368            .expect("BUG: position.as_nanos() fits in u64 for any realistic playback time");
369        self.committed_position_ns.store(nanos, Ordering::Release);
370    }
371
372    /// Report the current download byte position. The value is not
373    /// stored on the timeline — it exists only as a USDT probe point
374    /// (`#[kithara::probe]`) for download-progress observability.
375    #[kithara::probe(position)]
376    pub fn set_download_position(&self, position: u64) {
377        let _ = position;
378    }
379
380    /// Toggle the `PLAYING` flag.
381    ///
382    /// Orthogonal to `FLUSHING` / `SEEK_PENDING`: toggling `PLAYING`
383    /// does not affect the seek state.
384    pub fn set_playing(&self, playing: bool) {
385        self.replace_flags(TimelineFlags::PLAYING, playing);
386    }
387
388    #[kithara::probe(position)]
389    pub fn set_segment_position(&self, position: u64) {
390        self.segment_position.store(position, Ordering::Release);
391    }
392
393    pub fn set_total_duration(&self, duration: Option<Duration>) {
394        let nanos = duration
395            .and_then(|value| u64::try_from(value.as_nanos()).ok())
396            .unwrap_or(Self::NO_DURATION);
397        self.total_duration_ns.store(nanos, Ordering::Release);
398    }
399
400    #[must_use]
401    pub fn total_duration(&self) -> Option<Duration> {
402        let nanos = self.total_duration_ns.load(Ordering::Acquire);
403        if nanos == Self::NO_DURATION {
404            None
405        } else {
406            Some(Duration::from_nanos(nanos))
407        }
408    }
409
410    #[kithara::probe(committed_ns = pos.end_position_ns, end_frame)]
411    fn write_playhead(&self, pos: &ChunkPosition, end_frame: u64, _source_byte_end: Option<u64>) {
412        let sr = u64::from(pos.sample_rate);
413        if sr == 0 {
414            return;
415        }
416        let duration_ns = self.total_duration_ns.load(Ordering::Acquire);
417        let cap = if duration_ns == Self::NO_DURATION {
418            u64::MAX
419        } else {
420            duration_ns
421        };
422        self.committed_position_ns
423            .store(pos.end_position_ns.min(cap), Ordering::Release);
424        self.committed_frame_end.store(end_frame, Ordering::Release);
425        self.last_sample_rate.store(sr, Ordering::Release);
426    }
427}
428
429impl Default for Timeline {
430    fn default() -> Self {
431        Self::new()
432    }
433}
434
435#[cfg(test)]
436mod tests {
437    use std::{
438        sync::{Arc, Barrier},
439        thread,
440    };
441
442    use kithara_test_utils::kithara;
443
444    use super::*;
445
446    #[kithara::test]
447    fn initiate_seek_sets_flushing_and_target() {
448        let tl = Timeline::new();
449        assert!(!tl.is_flushing());
450        assert!(tl.seek_target().is_none());
451        let initial_committed = tl.committed_position();
452
453        let epoch = tl.initiate_seek(Duration::from_secs(10));
454        assert_eq!(epoch, 1);
455        assert!(tl.is_flushing());
456        assert_eq!(tl.seek_target(), Some(Duration::from_secs(10)));
457        assert_eq!(tl.seek_epoch(), 1);
458        assert_eq!(tl.committed_position(), initial_committed);
459    }
460
461    #[kithara::test]
462    fn complete_seek_clears_flushing() {
463        let tl = Timeline::new();
464        let epoch = tl.initiate_seek(Duration::from_secs(5));
465        tl.complete_seek(epoch);
466        assert!(!tl.is_flushing());
467        assert_eq!(tl.seek_target(), Some(Duration::from_secs(5)));
468    }
469
470    #[kithara::test]
471    fn complete_seek_ignores_stale_epoch() {
472        let tl = Timeline::new();
473        let epoch1 = tl.initiate_seek(Duration::from_secs(5));
474        let epoch2 = tl.initiate_seek(Duration::from_secs(10));
475        tl.complete_seek(epoch1);
476        assert!(tl.is_flushing());
477        assert_eq!(tl.seek_target(), Some(Duration::from_secs(10)));
478        tl.complete_seek(epoch2);
479        assert!(!tl.is_flushing());
480    }
481
482    #[kithara::test]
483    fn seek_epoch_monotonically_increases() {
484        let tl = Timeline::new();
485        let e1 = tl.initiate_seek(Duration::from_secs(1));
486        let e2 = tl.initiate_seek(Duration::from_secs(2));
487        let e3 = tl.initiate_seek(Duration::from_secs(3));
488        assert_eq!(e1, 1);
489        assert_eq!(e2, 2);
490        assert_eq!(e3, 3);
491        assert_eq!(tl.seek_target(), Some(Duration::from_secs(3)));
492    }
493
494    #[kithara::test]
495    fn complete_seek_does_not_clobber_concurrent_target() {
496        let tl = Timeline::new();
497        let epoch1 = tl.initiate_seek(Duration::from_secs(5));
498        let _epoch2 = tl.initiate_seek(Duration::from_secs(15));
499        tl.complete_seek(epoch1);
500        assert!(tl.is_flushing());
501        assert_eq!(tl.seek_target(), Some(Duration::from_secs(15)));
502    }
503
504    #[kithara::test]
505    fn initiate_seek_is_visible_across_clones() {
506        let tl = Timeline::new();
507        let clone = tl.clone();
508        let _ = tl.initiate_seek(Duration::from_secs(7));
509        assert!(clone.is_flushing());
510        assert_eq!(clone.seek_target(), Some(Duration::from_secs(7)));
511    }
512
513    #[kithara::test]
514    fn initiate_seek_sets_seek_pending() {
515        let tl = Timeline::new();
516        assert!(!tl.is_seek_pending());
517        let _epoch = tl.initiate_seek(Duration::from_secs(5));
518        assert!(tl.is_seek_pending());
519    }
520
521    #[kithara::test]
522    fn clear_seek_pending_only_clears_matching_epoch() {
523        let tl = Timeline::new();
524        let epoch1 = tl.initiate_seek(Duration::from_secs(5));
525        let epoch2 = tl.initiate_seek(Duration::from_secs(10));
526        tl.clear_seek_pending(epoch1);
527        assert!(tl.is_seek_pending());
528        tl.clear_seek_pending(epoch2);
529        assert!(!tl.is_seek_pending());
530    }
531
532    #[kithara::test]
533    fn new_initiate_seek_resets_seek_pending() {
534        let tl = Timeline::new();
535        let epoch = tl.initiate_seek(Duration::from_secs(5));
536        tl.clear_seek_pending(epoch);
537        assert!(!tl.is_seek_pending());
538        let _epoch2 = tl.initiate_seek(Duration::from_secs(10));
539        assert!(tl.is_seek_pending());
540    }
541
542    #[kithara::test]
543    fn complete_seek_does_not_clear_seek_pending() {
544        let tl = Timeline::new();
545        let epoch = tl.initiate_seek(Duration::from_secs(5));
546        tl.complete_seek(epoch);
547        assert!(!tl.is_flushing());
548        assert!(tl.is_seek_pending());
549    }
550
551    #[kithara::test]
552    fn is_seek_pending_visible_across_clones() {
553        let tl = Timeline::new();
554        let clone = tl.clone();
555        let _epoch = tl.initiate_seek(Duration::from_secs(5));
556        assert!(clone.is_seek_pending());
557    }
558
559    #[kithara::test]
560    fn flag_pair_matrix_matches_bitflags_snapshot() {
561        for mask in 0u8..4 {
562            let tl = Timeline::new();
563            let want_flushing = mask & 1 != 0;
564            let want_seek_pending = mask & 2 != 0;
565
566            if want_flushing || want_seek_pending {
567                let _ = tl.initiate_seek(Duration::from_secs(1));
568                if !want_flushing {
569                    tl.complete_seek(tl.seek_epoch());
570                }
571                if !want_seek_pending {
572                    tl.clear_seek_pending(tl.seek_epoch());
573                }
574            }
575
576            assert_eq!(tl.is_flushing(), want_flushing, "mask {mask:#04b} flushing");
577            assert_eq!(
578                tl.is_seek_pending(),
579                want_seek_pending,
580                "mask {mask:#04b} seek_pending"
581            );
582
583            let snapshot = tl.flags_snapshot_with(Ordering::Acquire);
584            assert_eq!(
585                snapshot.contains(TimelineFlags::FLUSHING),
586                want_flushing,
587                "mask {mask:#04b} snapshot flushing"
588            );
589            assert_eq!(
590                snapshot.contains(TimelineFlags::SEEK_PENDING),
591                want_seek_pending,
592                "mask {mask:#04b} snapshot seek_pending"
593            );
594        }
595    }
596
597    #[kithara::test]
598    fn complete_seek_double_check_re_raises_flushing_when_newer_seek_interleaves() {
599        let tl = Timeline::new();
600        let epoch1 = tl.initiate_seek(Duration::from_secs(1));
601
602        tl.remove_flags_with(TimelineFlags::FLUSHING, Ordering::SeqCst);
603        let _epoch2 = tl.initiate_seek(Duration::from_secs(2));
604        tl.complete_seek(epoch1);
605
606        assert!(
607            tl.is_flushing(),
608            "FLUSHING must be re-raised when a newer seek interleaves mid-complete"
609        );
610    }
611
612    #[kithara::test]
613    fn concurrent_flag_toggles_preserve_independent_semantics() {
614        const ITER: usize = 50_000;
615
616        let tl = Timeline::new();
617        let barrier = Arc::new(Barrier::new(3));
618
619        let tl_a = tl.clone();
620        let barrier_a = Arc::clone(&barrier);
621        let a = thread::spawn(move || {
622            barrier_a.wait();
623            for i in 0..ITER {
624                tl_a.set_playing(i % 2 == 0);
625            }
626        });
627
628        let tl_b = tl.clone();
629        let barrier_b = Arc::clone(&barrier);
630        let b = thread::spawn(move || {
631            barrier_b.wait();
632            for _ in 0..ITER {
633                let epoch = tl_b.initiate_seek(Duration::from_millis(1));
634                tl_b.clear_seek_pending(epoch);
635                tl_b.complete_seek(epoch);
636            }
637        });
638
639        let tl_c = tl.clone();
640        let barrier_c = Arc::clone(&barrier);
641        let c = thread::spawn(move || {
642            barrier_c.wait();
643            let mut observed = 0u64;
644            for _ in 0..ITER {
645                let snap = tl_c.flags_snapshot_with(Ordering::Acquire);
646                observed ^= u64::from(snap.bits());
647            }
648            observed
649        });
650
651        a.join()
652            .expect("BUG: spawned thread A must not panic in this test");
653        b.join()
654            .expect("BUG: spawned thread B must not panic in this test");
655        let _ = c
656            .join()
657            .expect("BUG: spawned thread C must not panic in this test");
658
659        assert!(
660            !tl.is_playing(),
661            "PLAYING must match the last deterministic write"
662        );
663        assert!(!tl.is_flushing(), "FLUSHING must be fully cleared");
664        assert!(
665            !tl.is_seek_pending(),
666            "SEEK_PENDING must be fully cleared after last clear"
667        );
668    }
669
670    #[kithara::test]
671    fn playing_defaults_to_false() {
672        let tl = Timeline::new();
673        assert!(!tl.is_playing());
674    }
675
676    #[kithara::test]
677    fn set_playing_true_is_visible_across_clones() {
678        let tl = Timeline::new();
679        let clone = tl.clone();
680        tl.set_playing(true);
681        assert!(clone.is_playing());
682        clone.set_playing(false);
683        assert!(!tl.is_playing());
684    }
685
686    #[kithara::test]
687    fn set_playing_idempotent() {
688        let tl = Timeline::new();
689        tl.set_playing(true);
690        tl.set_playing(true);
691        assert!(tl.is_playing());
692        tl.set_playing(false);
693        tl.set_playing(false);
694        assert!(!tl.is_playing());
695    }
696
697    #[kithara::test]
698    fn playing_is_orthogonal_to_other_flags() {
699        for mask in 0u8..4 {
700            for &initial_playing in &[false, true] {
701                let tl = Timeline::new();
702                let want_flushing = mask & 1 != 0;
703                let want_seek_pending = mask & 2 != 0;
704
705                if want_flushing || want_seek_pending {
706                    let _ = tl.initiate_seek(Duration::from_secs(1));
707                    if !want_flushing {
708                        tl.complete_seek(tl.seek_epoch());
709                    }
710                    if !want_seek_pending {
711                        tl.clear_seek_pending(tl.seek_epoch());
712                    }
713                }
714                tl.set_playing(initial_playing);
715
716                assert_eq!(tl.is_playing(), initial_playing);
717                assert_eq!(
718                    tl.is_flushing(),
719                    want_flushing,
720                    "mask {mask:#04b} play={initial_playing} flushing"
721                );
722                assert_eq!(
723                    tl.is_seek_pending(),
724                    want_seek_pending,
725                    "mask {mask:#04b} play={initial_playing} seek_pending"
726                );
727
728                tl.set_playing(!initial_playing);
729                assert_eq!(tl.is_playing(), !initial_playing);
730                assert_eq!(tl.is_flushing(), want_flushing);
731                assert_eq!(tl.is_seek_pending(), want_seek_pending);
732            }
733        }
734    }
735
736    #[kithara::test]
737    fn initiate_seek_does_not_touch_playing() {
738        let tl = Timeline::new();
739        tl.set_playing(true);
740        let epoch = tl.initiate_seek(Duration::from_secs(5));
741        assert!(tl.is_playing(), "PLAYING must not be affected by seek");
742        tl.complete_seek(epoch);
743        assert!(tl.is_playing(), "PLAYING must survive complete_seek");
744        tl.clear_seek_pending(epoch);
745        assert!(tl.is_playing(), "PLAYING must survive clear_seek_pending");
746    }
747}