arcly-stream 0.1.7

An open-extensible live-media streaming kernel: lock-free zero-copy frame fan-out, instant-start GOP cache, a pluggable multi-protocol ingestion layer (RTMP, RTSP, SRT, WHIP/WHEP shipped), and a feature-gated pure-Rust media plane (MPEG-TS/HLS/fMP4) — runtime, config, and metrics free.
Documentation
//! HLS / LL-HLS media-playlist (`.m3u8`) generation.

use std::collections::VecDeque;
use std::fmt::Write as _;

/// One LL-HLS partial segment (`#EXT-X-PART`).
#[derive(Debug, Clone)]
pub struct Part {
    /// Part URI (relative path under the stream's storage prefix).
    pub uri: String,
    /// Part duration in seconds.
    pub duration: f64,
    /// Whether the part begins an independent (keyframe-led) access unit, set as
    /// `INDEPENDENT=YES` so a player can start decoding from it.
    pub independent: bool,
}

/// One segment entry in a media playlist.
#[derive(Debug, Clone)]
pub struct Segment {
    /// Media sequence number (monotonic, never reused).
    pub seq: u64,
    /// Segment duration in seconds.
    pub duration: f64,
    /// Segment URI (relative path written under the stream's storage prefix).
    pub uri: String,
    /// Whether a discontinuity precedes this segment.
    pub discontinuity: bool,
    /// The LL-HLS partial segments that make up this segment (empty for plain
    /// HLS). Rendered as `#EXT-X-PART` lines before the segment's `#EXTINF`.
    pub parts: Vec<Part>,
}

/// A sliding-window HLS media playlist.
///
/// Holds the most recent `window` segments and renders a spec-compliant
/// `#EXTM3U` document. Set `low_latency` to emit `#EXT-X-PART` hints and the
/// LL-HLS preload/blocking tags (the segmenter supplies parts).
#[derive(Debug, Clone)]
pub struct HlsPlaylist {
    target_duration: u64,
    window: usize,
    low_latency: bool,
    part_target: f64,
    media_sequence: u64,
    discontinuity_sequence: u64,
    segments: VecDeque<Segment>,
    map_uri: Option<String>,
    finished: bool,
    /// Parts of the in-progress (not-yet-complete) segment, rendered at the live
    /// edge after the last complete segment.
    pending_parts: Vec<Part>,
    /// The URI of the next part not yet available, advertised via
    /// `#EXT-X-PRELOAD-HINT` so players can issue a blocking request for it.
    preload_hint: Option<String>,
}

impl HlsPlaylist {
    /// A live playlist holding `window` segments of up to `target_duration` secs.
    pub fn new(target_duration: u64, window: usize) -> Self {
        Self {
            target_duration,
            window: window.max(1),
            low_latency: false,
            part_target: 0.0,
            media_sequence: 0,
            discontinuity_sequence: 0,
            segments: VecDeque::new(),
            map_uri: None,
            finished: false,
            pending_parts: Vec::new(),
            preload_hint: None,
        }
    }

    /// Enable LL-HLS output with the given partial-segment target duration.
    pub fn low_latency(mut self, part_target: f64) -> Self {
        self.low_latency = true;
        self.part_target = part_target;
        self
    }

    /// Set the fMP4 initialization-segment URI, emitted as `#EXT-X-MAP`.
    /// Required for fragmented-MP4 renditions (HEVC, AV1, VVC).
    pub fn set_map(&mut self, uri: impl Into<String>) {
        self.map_uri = Some(uri.into());
    }

    /// Append a segment, evicting the oldest if the window is full.
    pub fn push(&mut self, seg: Segment) {
        if self.segments.is_empty() {
            self.media_sequence = seg.seq;
        }
        self.segments.push_back(seg);
        while self.segments.len() > self.window {
            if let Some(old) = self.segments.pop_front() {
                if old.discontinuity {
                    self.discontinuity_sequence += 1;
                }
                self.media_sequence = self.segments.front().map(|s| s.seq).unwrap_or(old.seq + 1);
            }
        }
    }

    /// Append a partial segment to the in-progress (live-edge) segment.
    pub fn add_pending_part(&mut self, part: Part) {
        self.pending_parts.push(part);
    }

    /// Advertise the next not-yet-available part via `#EXT-X-PRELOAD-HINT`.
    pub fn set_preload_hint(&mut self, uri: impl Into<String>) {
        self.preload_hint = Some(uri.into());
    }

    /// Complete the in-progress segment: push it carrying the parts accumulated
    /// via [`add_pending_part`](Self::add_pending_part), then clear the pending
    /// state for the next segment.
    pub fn commit_segment(&mut self, mut seg: Segment) {
        seg.parts = std::mem::take(&mut self.pending_parts);
        self.preload_hint = None;
        self.push(seg);
    }

    /// Mark the stream complete (appends `#EXT-X-ENDLIST`).
    pub fn finish(&mut self) {
        self.finished = true;
        self.pending_parts.clear();
        self.preload_hint = None;
    }

    /// Segments currently in the window.
    pub fn segments(&self) -> &VecDeque<Segment> {
        &self.segments
    }

    /// Render the playlist to an `.m3u8` string.
    pub fn render(&self) -> String {
        let mut s = String::with_capacity(256 + self.segments.len() * 64);
        s.push_str("#EXTM3U\n");
        s.push_str("#EXT-X-VERSION:");
        s.push_str(if self.low_latency { "9\n" } else { "3\n" });
        // `write!` formats directly into the buffer, avoiding the throwaway
        // String each `format!` would allocate (this runs on every segment cut).
        let _ = writeln!(s, "#EXT-X-TARGETDURATION:{}", self.target_duration);
        let _ = writeln!(s, "#EXT-X-MEDIA-SEQUENCE:{}", self.media_sequence);
        if self.discontinuity_sequence > 0 {
            let _ = writeln!(
                s,
                "#EXT-X-DISCONTINUITY-SEQUENCE:{}",
                self.discontinuity_sequence
            );
        }
        if self.low_latency {
            let _ = writeln!(s, "#EXT-X-PART-INF:PART-TARGET={:.3}", self.part_target);
            // PART-HOLD-BACK is REQUIRED by the LL-HLS spec when parts are present
            // (≥ 3× PART-TARGET); players such as hls.js refuse the stream without
            // it. Pair it with CAN-BLOCK-RELOAD for blocking playlist reload.
            let _ = writeln!(
                s,
                "#EXT-X-SERVER-CONTROL:CAN-BLOCK-RELOAD=YES,PART-HOLD-BACK={:.3}",
                self.part_target * 3.0
            );
        }
        if let Some(uri) = &self.map_uri {
            let _ = writeln!(s, "#EXT-X-MAP:URI=\"{uri}\"");
        }
        for seg in &self.segments {
            if seg.discontinuity {
                s.push_str("#EXT-X-DISCONTINUITY\n");
            }
            // LL-HLS: a completed segment's parts precede its #EXTINF.
            for part in &seg.parts {
                Self::render_part(&mut s, part);
            }
            let _ = writeln!(s, "#EXTINF:{:.3},", seg.duration);
            s.push_str(&seg.uri);
            s.push('\n');
        }
        // Live edge: parts of the segment still being produced, then a hint for
        // the next part so players can issue a blocking preload request.
        if self.low_latency && !self.finished {
            for part in &self.pending_parts {
                Self::render_part(&mut s, part);
            }
            if let Some(hint) = &self.preload_hint {
                let _ = writeln!(s, "#EXT-X-PRELOAD-HINT:TYPE=PART,URI=\"{hint}\"");
            }
        }
        if self.finished {
            s.push_str("#EXT-X-ENDLIST\n");
        }
        s
    }

    /// Render one `#EXT-X-PART` line.
    fn render_part(s: &mut String, part: &Part) {
        let _ = write!(
            s,
            "#EXT-X-PART:DURATION={:.3},URI=\"{}\"",
            part.duration, part.uri
        );
        if part.independent {
            s.push_str(",INDEPENDENT=YES");
        }
        s.push('\n');
    }
}

/// Render a multivariant (master) playlist referencing a single media playlist.
///
/// This is the only place an HLS stream advertises its `CODECS` attribute (e.g.
/// `hvc1.1.6.L120.B0`). It is **required** for HEVC/H.265 (and AV1/VVC):
/// Safari and other players will not even attempt to decode H.265 from a bare
/// media playlist that lacks a `CODECS="hvc1.*"` entry. `BANDWIDTH` is a
/// mandatory `#EXT-X-STREAM-INF` attribute; pass a nominal estimate.
pub fn render_master(media_uri: &str, codecs: Option<&str>, bandwidth: u32) -> String {
    let mut s = String::with_capacity(160);
    s.push_str("#EXTM3U\n");
    s.push_str("#EXT-X-VERSION:7\n");
    let _ = write!(s, "#EXT-X-STREAM-INF:BANDWIDTH={bandwidth}");
    if let Some(c) = codecs {
        let _ = write!(s, ",CODECS=\"{c}\"");
    }
    s.push('\n');
    s.push_str(media_uri);
    s.push('\n');
    s
}

#[cfg(test)]
mod tests {
    use super::*;

    fn seg(seq: u64, dur: f64) -> Segment {
        Segment {
            seq,
            duration: dur,
            uri: format!("seg{seq}.m4s"),
            discontinuity: false,
            parts: Vec::new(),
        }
    }

    #[test]
    fn renders_live_playlist_with_sliding_window() {
        let mut pl = HlsPlaylist::new(6, 3);
        for i in 0..5 {
            pl.push(seg(i, 5.0));
        }
        // Window keeps the last 3 segments; media sequence advances to 2.
        assert_eq!(pl.segments().len(), 3);
        let out = pl.render();
        assert!(out.starts_with("#EXTM3U\n"));
        assert!(out.contains("#EXT-X-TARGETDURATION:6\n"));
        assert!(out.contains("#EXT-X-MEDIA-SEQUENCE:2\n"));
        assert!(out.contains("seg4.m4s"));
        assert!(!out.contains("seg1.m4s")); // evicted
        assert!(!out.contains("#EXT-X-ENDLIST"));
    }

    #[test]
    fn finish_appends_endlist() {
        let mut pl = HlsPlaylist::new(6, 5);
        pl.push(seg(0, 4.0));
        pl.finish();
        assert!(pl.render().trim_end().ends_with("#EXT-X-ENDLIST"));
    }

    #[test]
    fn low_latency_emits_part_tags() {
        let pl = HlsPlaylist::new(4, 5).low_latency(0.33);
        let out = pl.render();
        assert!(out.contains("#EXT-X-VERSION:9"));
        assert!(out.contains("PART-TARGET=0.330"));
        assert!(out.contains("CAN-BLOCK-RELOAD=YES"));
        assert!(
            out.contains("PART-HOLD-BACK=0.990"),
            "LL-HLS requires PART-HOLD-BACK"
        );
    }

    #[test]
    fn renders_pending_parts_and_preload_hint_at_live_edge() {
        let mut pl = HlsPlaylist::new(4, 5).low_latency(0.5);
        pl.add_pending_part(Part {
            uri: "seg0.0.m4s".into(),
            duration: 0.5,
            independent: true,
        });
        pl.add_pending_part(Part {
            uri: "seg0.1.m4s".into(),
            duration: 0.5,
            independent: false,
        });
        pl.set_preload_hint("seg0.2.m4s");
        let out = pl.render();
        assert!(out.contains("#EXT-X-PART:DURATION=0.500,URI=\"seg0.0.m4s\",INDEPENDENT=YES"));
        assert!(out.contains("#EXT-X-PART:DURATION=0.500,URI=\"seg0.1.m4s\"\n"));
        assert!(out.contains("#EXT-X-PRELOAD-HINT:TYPE=PART,URI=\"seg0.2.m4s\""));
    }

    #[test]
    fn commit_segment_moves_pending_parts_into_segment() {
        let mut pl = HlsPlaylist::new(4, 5).low_latency(0.5);
        pl.add_pending_part(Part {
            uri: "seg0.0.m4s".into(),
            duration: 1.0,
            independent: true,
        });
        pl.commit_segment(seg(0, 1.0));
        // Parts now precede the committed segment's #EXTINF, and pending is clear.
        let out = pl.render();
        let part_pos = out.find("#EXT-X-PART").unwrap();
        let inf_pos = out.find("#EXTINF").unwrap();
        assert!(part_pos < inf_pos, "part precedes its segment's EXTINF");
        assert!(
            !out.contains("#EXT-X-PRELOAD-HINT"),
            "pending cleared on commit"
        );
    }
}