arcly_stream/segment.rs
1//! Keyframe-boundary segmentation timing, shared by the HLS packager and the
2//! recorder.
3//!
4//! Both [`packager`](crate::packager) and [`record`](crate::record) need the
5//! same decision: hold frames until the first keyframe, then cut a new segment
6//! at the first keyframe at or after a target duration. [`SegmentClock`]
7//! captures exactly that state machine so the two consumers share one tested
8//! implementation rather than re-deriving it.
9
10use crate::MediaFrame;
11
/// The verdict [`SegmentClock::observe`] returns for a single frame.
///
/// The [`Default`] value (no flag set, no cut) is what `observe` returns for a
/// frame that simply continues the segment that is already open.
#[derive(Clone, Copy, Debug, Default, PartialEq)]
pub struct SegmentDecision {
    /// `true` while no keyframe has anchored a segment yet; the frame should
    /// be dropped.
    pub skip: bool,
    /// When `Some`, the currently-open segment must be finalized with this
    /// duration (in seconds) *before* the current frame is written.
    pub cut_previous: Option<f64>,
    /// `true` when this frame starts a new segment (the consumer should reset
    /// its muxer).
    pub open_new: bool,
}
23
/// Keyframe-driven state machine that decides where segments begin and end.
///
/// Frames are fed in one at a time through [`SegmentClock::observe`], which
/// reports whether the frame should be skipped, whether the open segment must
/// be finalized first, and whether a new segment starts at this frame.
/// Timestamps are compared against a millisecond target and reported
/// durations are in seconds.
///
/// # Examples
///
/// ```
/// use arcly_stream::segment::SegmentClock;
/// use arcly_stream::{CodecId, MediaFrame};
/// use bytes::Bytes;
///
/// let key = |pts| MediaFrame::new_video(pts, pts, Bytes::new(), CodecId::H264, true);
/// let mut clock = SegmentClock::new(2); // 2-second target
///
/// // The very first keyframe opens segment 0.
/// let d = clock.observe(&key(0));
/// assert!(d.open_new && !d.skip && d.cut_previous.is_none());
///
/// // A keyframe that arrives before the target is reached does not cut.
/// assert_eq!(clock.observe(&key(1000)).cut_previous, None);
///
/// // A keyframe at or past the target closes the previous segment.
/// let d = clock.observe(&key(2000));
/// assert_eq!(d.cut_previous, Some(2.0));
/// assert!(d.open_new);
/// ```
#[derive(Clone, Debug)]
pub struct SegmentClock {
    /// Target segment length, in milliseconds.
    target_ms: i64,
    /// Whether a segment is currently open (a keyframe has anchored one and
    /// it has not been flushed).
    started: bool,
    /// PTS of the keyframe that opened the current segment.
    seg_start_pts: i64,
    /// PTS of the most recently observed frame of the open segment.
    last_pts: i64,
}
53
54impl SegmentClock {
55 /// A clock targeting `target_secs`-long segments (minimum 1 second).
56 pub fn new(target_secs: u64) -> Self {
57 Self {
58 target_ms: (target_secs.max(1) * 1000) as i64,
59 started: false,
60 seg_start_pts: 0,
61 last_pts: 0,
62 }
63 }
64
65 /// Whether a segment is currently open.
66 pub fn is_open(&self) -> bool {
67 self.started
68 }
69
70 /// Feed one frame and learn what to do with it (call before writing it).
71 pub fn observe(&mut self, frame: &MediaFrame) -> SegmentDecision {
72 if !self.started {
73 // Anchor the first segment on the first keyframe; drop anything
74 // before it so every segment is independently decodable.
75 if !frame.is_keyframe() {
76 return SegmentDecision {
77 skip: true,
78 ..SegmentDecision::default()
79 };
80 }
81 self.started = true;
82 self.seg_start_pts = frame.pts;
83 self.last_pts = frame.pts;
84 return SegmentDecision {
85 skip: false,
86 cut_previous: None,
87 open_new: true,
88 };
89 }
90
91 let mut decision = SegmentDecision::default();
92 if frame.is_keyframe() {
93 let elapsed_ms = (frame.pts - self.seg_start_pts).max(0);
94 if elapsed_ms >= self.target_ms {
95 decision.cut_previous = Some(elapsed_ms as f64 / 1000.0);
96 decision.open_new = true;
97 self.seg_start_pts = frame.pts;
98 }
99 }
100 self.last_pts = frame.pts;
101 decision
102 }
103
104 /// Close any open segment at end-of-stream, returning its duration in
105 /// seconds (or `None` if no segment is open).
106 pub fn flush(&mut self) -> Option<f64> {
107 if !self.started {
108 return None;
109 }
110 self.started = false;
111 Some((self.last_pts - self.seg_start_pts).max(0) as f64 / 1000.0)
112 }
113}
114
#[cfg(test)]
mod tests {
    use super::*;
    use crate::CodecId;
    use bytes::Bytes;

    /// Builds a one-byte H.264 video frame, passing `pts` for both timestamp
    /// arguments of `MediaFrame::new_video`.
    fn frame(pts: i64, key: bool) -> MediaFrame {
        let payload = Bytes::from_static(b"x");
        MediaFrame::new_video(pts, pts, payload, CodecId::H264, key)
    }

    #[test]
    fn skips_until_first_keyframe() {
        let mut clock = SegmentClock::new(2);

        // A leading non-keyframe has nothing to anchor on yet.
        let before = clock.observe(&frame(0, false));
        assert!(before.skip);

        // The first keyframe opens a segment without cutting anything.
        let opened = clock.observe(&frame(10, true));
        assert!(!opened.skip);
        assert!(opened.open_new);
        assert!(opened.cut_previous.is_none());
        assert!(clock.is_open());
    }

    #[test]
    fn cuts_on_keyframe_after_target_and_flushes_tail() {
        let mut clock = SegmentClock::new(2);

        // Open segment 0.
        clock.observe(&frame(0, true));

        // Keyframe under the target: no cut.
        let early = clock.observe(&frame(1000, true));
        assert_eq!(early.cut_previous, None);

        // Keyframe at the target: segment 0 is cut and segment 1 opens.
        let boundary = clock.observe(&frame(2000, true));
        assert_eq!(boundary.cut_previous, Some(2.0));
        assert!(boundary.open_new);

        // The tail of segment 1 is reported by the first flush only.
        clock.observe(&frame(2500, false));
        assert_eq!(clock.flush(), Some(0.5));
        assert_eq!(clock.flush(), None);
    }
}
146}