Skip to main content

arcly_stream/
segment.rs

1//! Keyframe-boundary segmentation timing, shared by the HLS packager and the
2//! recorder.
3//!
4//! Both [`packager`](crate::packager) and [`record`](crate::record) need the
5//! same decision: hold frames until the first keyframe, then cut a new segment
6//! at the first keyframe at or after a target duration. [`SegmentClock`]
7//! captures exactly that state machine so the two consumers share one tested
8//! implementation rather than re-deriving it.
9
10use crate::MediaFrame;
11
12/// What a consumer should do with a frame, per [`SegmentClock::observe`].
13#[derive(Debug, Clone, Copy, PartialEq, Default)]
14pub struct SegmentDecision {
15    /// Drop this frame: no keyframe has anchored a segment yet.
16    pub skip: bool,
17    /// Finalize the currently-open segment with this duration (seconds) *before*
18    /// writing the current frame.
19    pub cut_previous: Option<f64>,
20    /// A new segment begins at this frame (the consumer should reset its muxer).
21    pub open_new: bool,
22}
23
24/// Tracks segment boundaries from a frame stream's keyframes and timestamps.
25///
26/// ```
27/// use arcly_stream::segment::SegmentClock;
28/// use arcly_stream::{CodecId, MediaFrame};
29/// use bytes::Bytes;
30///
31/// let key = |pts| MediaFrame::new_video(pts, pts, Bytes::new(), CodecId::H264, true);
32/// let mut clock = SegmentClock::new(2); // 2-second target
33///
34/// // First keyframe opens segment 0.
35/// let d = clock.observe(&key(0));
36/// assert!(d.open_new && !d.skip && d.cut_previous.is_none());
37///
38/// // A keyframe before the target does not cut.
39/// assert_eq!(clock.observe(&key(1000)).cut_previous, None);
40///
41/// // A keyframe at/after the target cuts the previous segment.
42/// let d = clock.observe(&key(2000));
43/// assert_eq!(d.cut_previous, Some(2.0));
44/// assert!(d.open_new);
45/// ```
46#[derive(Debug, Clone)]
47pub struct SegmentClock {
48    target_ms: i64,
49    started: bool,
50    seg_start_pts: i64,
51    last_pts: i64,
52}
53
54impl SegmentClock {
55    /// A clock targeting `target_secs`-long segments (minimum 1 second).
56    pub fn new(target_secs: u64) -> Self {
57        Self {
58            target_ms: (target_secs.max(1) * 1000) as i64,
59            started: false,
60            seg_start_pts: 0,
61            last_pts: 0,
62        }
63    }
64
65    /// Whether a segment is currently open.
66    pub fn is_open(&self) -> bool {
67        self.started
68    }
69
70    /// Feed one frame and learn what to do with it (call before writing it).
71    pub fn observe(&mut self, frame: &MediaFrame) -> SegmentDecision {
72        if !self.started {
73            // Anchor the first segment on the first keyframe; drop anything
74            // before it so every segment is independently decodable.
75            if !frame.is_keyframe() {
76                return SegmentDecision {
77                    skip: true,
78                    ..SegmentDecision::default()
79                };
80            }
81            self.started = true;
82            self.seg_start_pts = frame.pts;
83            self.last_pts = frame.pts;
84            return SegmentDecision {
85                skip: false,
86                cut_previous: None,
87                open_new: true,
88            };
89        }
90
91        let mut decision = SegmentDecision::default();
92        if frame.is_keyframe() {
93            let elapsed_ms = (frame.pts - self.seg_start_pts).max(0);
94            if elapsed_ms >= self.target_ms {
95                decision.cut_previous = Some(elapsed_ms as f64 / 1000.0);
96                decision.open_new = true;
97                self.seg_start_pts = frame.pts;
98            }
99        }
100        self.last_pts = frame.pts;
101        decision
102    }
103
104    /// Close any open segment at end-of-stream, returning its duration in
105    /// seconds (or `None` if no segment is open).
106    pub fn flush(&mut self) -> Option<f64> {
107        if !self.started {
108            return None;
109        }
110        self.started = false;
111        Some((self.last_pts - self.seg_start_pts).max(0) as f64 / 1000.0)
112    }
113}
114
115#[cfg(test)]
116mod tests {
117    use super::*;
118    use crate::CodecId;
119    use bytes::Bytes;
120
121    fn frame(pts: i64, key: bool) -> MediaFrame {
122        MediaFrame::new_video(pts, pts, Bytes::from_static(b"x"), CodecId::H264, key)
123    }
124
125    #[test]
126    fn skips_until_first_keyframe() {
127        let mut c = SegmentClock::new(2);
128        assert!(c.observe(&frame(0, false)).skip);
129        let d = c.observe(&frame(10, true));
130        assert!(!d.skip && d.open_new && d.cut_previous.is_none());
131        assert!(c.is_open());
132    }
133
134    #[test]
135    fn cuts_on_keyframe_after_target_and_flushes_tail() {
136        let mut c = SegmentClock::new(2);
137        c.observe(&frame(0, true)); // open seg 0
138        assert_eq!(c.observe(&frame(1000, true)).cut_previous, None); // under target
139        let d = c.observe(&frame(2000, true)); // >= target
140        assert_eq!(d.cut_previous, Some(2.0));
141        assert!(d.open_new);
142        c.observe(&frame(2500, false));
143        assert_eq!(c.flush(), Some(0.5)); // tail of seg 1
144        assert_eq!(c.flush(), None); // idempotent once closed
145    }
146}