arcly-stream 0.1.7

An open-extensible live-media streaming kernel: lock-free zero-copy frame fan-out, instant-start GOP cache, a pluggable multi-protocol ingestion layer (RTMP, RTSP, SRT, WHIP/WHEP shipped), and a feature-gated pure-Rust media plane (MPEG-TS/HLS/fMP4) — runtime-, config-, and metrics-free.
Documentation
//! Keyframe-boundary segmentation timing, shared by the HLS packager and the
//! recorder.
//!
//! Both [`packager`](crate::packager) and [`record`](crate::record) need the
//! same decision: hold frames until the first keyframe, then cut a new segment
//! at the first keyframe at or after a target duration. [`SegmentClock`]
//! captures exactly that state machine so the two consumers share one tested
//! implementation rather than re-deriving it.

use crate::MediaFrame;

/// What a consumer should do with a frame, per [`SegmentClock::observe`].
///
/// The fields are independent flags/values rather than mutually exclusive
/// states. `observe` produces three combinations:
///
/// * `skip == true` — no segment is open yet and this frame is not a keyframe.
/// * `open_new == true`, `cut_previous == None` — the very first segment opens.
/// * `open_new == true`, `cut_previous == Some(_)` — the open segment is
///   closed and a new one starts at this frame.
///
/// The [`Default`] value (`false` / `None` / `false`) is returned for every
/// other frame observed while a segment is open.
#[derive(Debug, Clone, Copy, PartialEq, Default)]
pub struct SegmentDecision {
    /// Drop this frame: no keyframe has anchored a segment yet.
    pub skip: bool,
    /// Finalize the currently-open segment with this duration (seconds) *before*
    /// writing the current frame.
    ///
    /// The duration is the PTS distance between the keyframe that opened the
    /// segment and the keyframe that triggers the cut, never negative.
    pub cut_previous: Option<f64>,
    /// A new segment begins at this frame (the consumer should reset its muxer).
    pub open_new: bool,
}

/// Tracks segment boundaries from a frame stream's keyframes and timestamps.
///
/// Frame `pts` values are treated as milliseconds: they are compared against a
/// millisecond target and divided by 1000 to report durations in seconds.
///
/// ```
/// use arcly_stream::segment::SegmentClock;
/// use arcly_stream::{CodecId, MediaFrame};
/// use bytes::Bytes;
///
/// let key = |pts| MediaFrame::new_video(pts, pts, Bytes::new(), CodecId::H264, true);
/// let mut clock = SegmentClock::new(2); // 2-second target
///
/// // First keyframe opens segment 0.
/// let d = clock.observe(&key(0));
/// assert!(d.open_new && !d.skip && d.cut_previous.is_none());
///
/// // A keyframe before the target does not cut.
/// assert_eq!(clock.observe(&key(1000)).cut_previous, None);
///
/// // A keyframe at/after the target cuts the previous segment.
/// let d = clock.observe(&key(2000));
/// assert_eq!(d.cut_previous, Some(2.0));
/// assert!(d.open_new);
/// ```
#[derive(Debug, Clone)]
pub struct SegmentClock {
    /// Target segment duration in milliseconds (at least 1000).
    target_ms: i64,
    /// `true` once a keyframe has opened a segment and until `flush` closes it.
    started: bool,
    /// PTS of the keyframe that opened the currently-open segment.
    seg_start_pts: i64,
    /// PTS of the most recently observed frame that was not skipped; used by
    /// `flush` to compute the duration of the trailing segment.
    last_pts: i64,
}

impl SegmentClock {
    /// A clock targeting `target_secs`-long segments (minimum 1 second).
    ///
    /// The target is stored in milliseconds. A value too large to represent
    /// saturates at `i64::MAX` instead of overflowing, so an absurdly large
    /// target means "effectively never cut" rather than a panic (with overflow
    /// checks enabled) or a wrapped, negative target that would cut on every
    /// keyframe (without them).
    pub fn new(target_secs: u64) -> Self {
        let target_ms = target_secs
            .max(1)
            .saturating_mul(1000)
            // Clamp into the positive `i64` range so the cast below is lossless.
            .min(i64::MAX as u64);
        Self {
            target_ms: target_ms as i64,
            started: false,
            seg_start_pts: 0,
            last_pts: 0,
        }
    }

    /// Whether a segment is currently open.
    pub fn is_open(&self) -> bool {
        self.started
    }

    /// Feed one frame and learn what to do with it (call before writing it).
    ///
    /// * Before the first keyframe, every frame is reported as `skip`.
    /// * The first keyframe opens a segment (`open_new`, no `cut_previous`).
    /// * Afterwards, a keyframe whose PTS is at least the target duration past
    ///   the segment start closes that segment (`cut_previous` carries its
    ///   duration in seconds) and opens the next one at this frame.
    /// * Any other frame yields the default decision.
    pub fn observe(&mut self, frame: &MediaFrame) -> SegmentDecision {
        if !self.started {
            // Anchor the first segment on the first keyframe; drop anything
            // before it so every segment is independently decodable.
            if !frame.is_keyframe() {
                return SegmentDecision {
                    skip: true,
                    ..SegmentDecision::default()
                };
            }
            self.started = true;
            self.seg_start_pts = frame.pts;
            self.last_pts = frame.pts;
            return SegmentDecision {
                skip: false,
                cut_previous: None,
                open_new: true,
            };
        }

        let mut decision = SegmentDecision::default();
        if frame.is_keyframe() {
            let elapsed_ms = self.elapsed_ms_since_start(frame.pts);
            if elapsed_ms >= self.target_ms {
                decision.cut_previous = Some(elapsed_ms as f64 / 1000.0);
                decision.open_new = true;
                // The cutting keyframe belongs to the new segment.
                self.seg_start_pts = frame.pts;
            }
        }
        self.last_pts = frame.pts;
        decision
    }

    /// Close any open segment at end-of-stream, returning its duration in
    /// seconds (or `None` if no segment is open).
    ///
    /// The duration spans from the segment's opening keyframe to the last
    /// observed frame's PTS. After this call the clock is closed again and
    /// waits for a keyframe before opening another segment.
    pub fn flush(&mut self) -> Option<f64> {
        if !self.started {
            return None;
        }
        self.started = false;
        Some(self.elapsed_ms_since_start(self.last_pts) as f64 / 1000.0)
    }

    /// Milliseconds from the open segment's start to `pts`, clamped to
    /// `0..=i64::MAX`.
    ///
    /// Timestamps that run backwards clamp to zero; the subtraction saturates
    /// so extreme or discontinuous timestamps cannot overflow.
    fn elapsed_ms_since_start(&self, pts: i64) -> i64 {
        pts.saturating_sub(self.seg_start_pts).max(0)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::CodecId;
    use bytes::Bytes;

    /// Builds a one-byte H.264 video frame, passing `ts` for both timestamp
    /// arguments of `MediaFrame::new_video`.
    fn video(ts: i64, is_key: bool) -> MediaFrame {
        let payload = Bytes::from_static(b"x");
        MediaFrame::new_video(ts, ts, payload, CodecId::H264, is_key)
    }

    #[test]
    fn skips_until_first_keyframe() {
        let mut clock = SegmentClock::new(2);

        // A leading non-keyframe has nothing to anchor to and is dropped.
        let leading = clock.observe(&video(0, false));
        assert!(leading.skip);

        // The first keyframe opens a segment without closing anything.
        let opened = clock.observe(&video(10, true));
        assert_eq!(
            opened,
            SegmentDecision {
                skip: false,
                cut_previous: None,
                open_new: true,
            }
        );
        assert!(clock.is_open());
    }

    #[test]
    fn cuts_on_keyframe_after_target_and_flushes_tail() {
        let mut clock = SegmentClock::new(2);

        // Segment 0 opens at pts 0.
        let _ = clock.observe(&video(0, true));

        // One second in: still under the two-second target, so no cut.
        let early = clock.observe(&video(1000, true));
        assert!(early.cut_previous.is_none());

        // Two seconds in: the target is reached and segment 0 is closed.
        let boundary = clock.observe(&video(2000, true));
        assert_eq!(boundary.cut_previous, Some(2.0));
        assert!(boundary.open_new);

        // Segment 1 receives one more frame, then the stream ends.
        let _ = clock.observe(&video(2500, false));
        let tail = clock.flush();
        assert_eq!(tail, Some(0.5));

        // A second flush finds nothing open.
        let again = clock.flush();
        assert_eq!(again, None);
    }
}