arcly-stream 0.1.4

An open-extensible live-media streaming kernel: lock-free zero-copy frame fan-out, instant-start GOP cache, a pluggable multi-protocol ingestion layer (RTMP, RTSP, SRT, WHIP/WHEP shipped), and a feature-gated pure-Rust media plane (MPEG-TS/HLS/fMP4) — runtime-, config-, and metrics-free.
Documentation
//! Egress packaging: turn a live frame stream into HLS/LL-HLS segments.
//!
//! Gated behind `hls`. Provides the [`Muxer`] and [`Packager`] contracts, a
//! sliding-window [`HlsPlaylist`] generator, and an [`HlsSegmenter`] that cuts
//! segments on keyframe boundaries and writes both media segments and the
//! `.m3u8` playlist through any [`StorageBackend`].
//!
//! The container byte-format is pluggable via [`Muxer`]: a [`PassthroughMuxer`]
//! (elementary-stream concatenation) ships here; production MPEG-TS or fMP4/CMAF
//! muxers implement the same trait and drop in unchanged.
//!
//! ```no_run
//! # #[cfg(feature = "storage-fs")]
//! # async fn demo(handle: arcly_stream::StreamHandle) -> arcly_stream::Result<()> {
//! use arcly_stream::packager::{HlsSegmenter, Packager, PassthroughMuxer};
//! use arcly_stream::storage::FsStorage;
//!
//! let storage = FsStorage::new("/var/hls");
//! let mut seg = HlsSegmenter::new(PassthroughMuxer::new("ts"), storage, "live/cam", 6, 5);
//! let mut sub = handle.subscribe_resilient();
//! while let Some(frame) = sub.recv().await {
//!     seg.push(&frame).await?;
//! }
//! seg.finish().await?;
//! # Ok(())
//! # }
//! ```

mod playlist;

#[cfg(feature = "mpegts")]
#[cfg_attr(docsrs, doc(cfg(feature = "mpegts")))]
mod mpegts;

#[cfg(feature = "mpegts")]
#[cfg_attr(docsrs, doc(cfg(feature = "mpegts")))]
pub use mpegts::MpegTsMuxer;

#[cfg(feature = "fmp4")]
#[cfg_attr(docsrs, doc(cfg(feature = "fmp4")))]
mod fmp4;

#[cfg(feature = "fmp4")]
#[cfg_attr(docsrs, doc(cfg(feature = "fmp4")))]
pub use fmp4::Fmp4Muxer;

pub use playlist::{HlsPlaylist, Segment};

use crate::traits::StorageBackend;
use crate::{CodecId, FrameFlags, MediaFrame, Result};
use async_trait::async_trait;
use bytes::{BufMut, Bytes, BytesMut};

/// Container muxer: accumulates frames into one segment's bytes.
///
/// Implement this for MPEG-TS, fMP4/CMAF, etc. [`HlsSegmenter`] drives an
/// implementation as follows: [`start_segment`](Self::start_segment) whenever
/// a new segment opens, [`write`](Self::write) for every frame that is not
/// skipped, and [`finish_segment`](Self::finish_segment) at each cut to take
/// the bytes that are then stored as one media segment.
pub trait Muxer: Send {
    /// File extension for produced segments (e.g. `"ts"`, `"m4s"`).
    ///
    /// [`HlsSegmenter`] uses it for media segment names (`seg<N>.<ext>`) and,
    /// when an init segment is produced, for its name (`init.<ext>`).
    fn extension(&self) -> &'static str;

    /// Begin a fresh segment, discarding any partial state.
    ///
    /// # Errors
    /// Returns an error if the muxer cannot start a new segment.
    fn start_segment(&mut self) -> Result<()>;

    /// Append one frame to the current segment.
    ///
    /// # Errors
    /// Returns an error if the frame cannot be muxed.
    fn write(&mut self, frame: &MediaFrame) -> Result<()>;

    /// Take the finished segment's bytes, resetting for the next one.
    ///
    /// # Errors
    /// Returns an error if the segment cannot be finalized.
    fn finish_segment(&mut self) -> Result<Bytes>;

    /// Build an fMP4 initialization segment (`ftyp`+`moov` carrying the
    /// `avcC`/`hvcC`/`av1C`/`vvcC` decoder config) from a CONFIG access unit.
    ///
    /// [`HlsSegmenter`] calls this with the codec and payload of the first
    /// frame flagged `CONFIG`. Returning `Some` makes it write the bytes once
    /// and reference them via `#EXT-X-MAP`. The default returns `None` —
    /// correct for self-initializing containers like MPEG-TS and the
    /// [`PassthroughMuxer`].
    ///
    /// # Errors
    /// Returns an error if the init segment cannot be built from the record.
    fn init_segment(&mut self, _codec: CodecId, _config_record: &[u8]) -> Result<Option<Bytes>> {
        Ok(None)
    }

    /// The HLS `CODECS` attribute for this rendition's master-playlist entry
    /// (e.g. `"hvc1.1.6.L120.B0"`), once known.
    ///
    /// The default returns `None`, meaning no codec string is reported.
    fn codec_string(&self) -> Option<String> {
        None
    }
}

/// A trivial muxer that concatenates frame payloads (elementary stream).
///
/// Useful for testing and raw recording; not a real container. Swap in a TS or
/// fMP4 muxer for player-compatible output.
pub struct PassthroughMuxer {
    /// Extension reported by [`Muxer::extension`]; chosen by the caller of `new`.
    ext: &'static str,
    /// Payload bytes accumulated for the segment currently being built.
    buf: BytesMut,
}

impl PassthroughMuxer {
    /// Creates a passthrough muxer whose segments carry the extension `ext`.
    ///
    /// The muxer starts with an empty segment buffer.
    pub fn new(ext: &'static str) -> Self {
        let buf = BytesMut::new();
        Self { ext, buf }
    }
}

impl Muxer for PassthroughMuxer {
    /// Reports the extension supplied at construction.
    fn extension(&self) -> &'static str {
        self.ext
    }

    /// Drops whatever has been buffered so the next segment starts empty.
    fn start_segment(&mut self) -> Result<()> {
        let pending = &mut self.buf;
        pending.clear();
        Ok(())
    }

    /// Appends the frame's payload verbatim to the segment buffer.
    fn write(&mut self, frame: &MediaFrame) -> Result<()> {
        let payload = &frame.data;
        self.buf.put_slice(payload);
        Ok(())
    }

    /// Hands out the buffered bytes and leaves a fresh, empty buffer behind.
    fn finish_segment(&mut self) -> Result<Bytes> {
        let segment = std::mem::take(&mut self.buf);
        Ok(segment.freeze())
    }
}

/// Consumes a live frame stream and produces a packaged rendition.
///
/// Call [`push`](Self::push) for each frame and [`finish`](Self::finish) once
/// after the last one. [`HlsSegmenter`] is the implementation in this module.
#[async_trait]
pub trait Packager: Send {
    /// Feed one frame; may finalize and emit a segment as a side effect.
    ///
    /// # Errors
    /// Returns an error if packaging the frame or emitting output fails.
    async fn push(&mut self, frame: &MediaFrame) -> Result<()>;
    /// Flush the final segment and close the rendition.
    ///
    /// # Errors
    /// Returns an error if the final output cannot be emitted.
    async fn finish(&mut self) -> Result<()>;
}

/// Keyframe-boundary HLS segmenter writing through a [`StorageBackend`].
///
/// Starts at the first keyframe, cuts a new segment at the first keyframe at or
/// after `target_duration` seconds, and after each cut writes the media segment
/// and the regenerated `index.m3u8` to storage under `prefix`.
pub struct HlsSegmenter<M: Muxer, S: StorageBackend> {
    /// Container muxer that turns frames into segment bytes.
    muxer: M,
    /// Destination for media segments, the init segment, and the playlist.
    storage: S,
    /// Key prefix; every stored object is written as `"{prefix}/{name}"`.
    prefix: String,
    /// In-memory playlist that is re-rendered and stored after each cut.
    playlist: HlsPlaylist,
    /// Per-frame decision maker: skip, cut the previous segment, open a new one.
    clock: crate::segment::SegmentClock,
    /// Sequence number given to the next segment that is cut; starts at 0.
    seq: u64,
    /// Set once the first CONFIG frame has been offered to the muxer for an
    /// init segment, so the muxer is asked at most once.
    init_written: bool,
}

impl<M: Muxer, S: StorageBackend> HlsSegmenter<M, S> {
    /// New segmenter writing under `prefix`, targeting `target_duration`-second
    /// segments and a `window`-segment live playlist.
    ///
    /// * `muxer` — container muxer producing the segment bytes.
    /// * `storage` — backend receiving segments, init segment, and playlist.
    /// * `prefix` — key prefix; every object is stored as `"{prefix}/{name}"`.
    /// * `target_duration` — passed to both the playlist and the segment clock.
    /// * `window` — passed to the playlist as its live window size.
    ///
    /// Segment numbering starts at 0 and no init segment is written yet.
    pub fn new(
        muxer: M,
        storage: S,
        prefix: impl Into<String>,
        target_duration: u64,
        window: usize,
    ) -> Self {
        Self {
            muxer,
            storage,
            prefix: prefix.into(),
            playlist: HlsPlaylist::new(target_duration, window),
            clock: crate::segment::SegmentClock::new(target_duration),
            seq: 0,
            init_written: false,
        }
    }

    /// The HLS `CODECS` attribute reported by the muxer, for a master playlist.
    ///
    /// Returns `None` while the muxer does not report one.
    pub fn codec_string(&self) -> Option<String> {
        self.muxer.codec_string()
    }

    /// Enable LL-HLS playlist output with the given part target duration.
    ///
    /// Consumes and returns the segmenter, builder style. `part_target` is
    /// forwarded unchanged to the playlist (presumably seconds — confirm
    /// against `HlsPlaylist::low_latency`).
    #[must_use = "`low_latency` consumes the segmenter and returns the configured one"]
    pub fn low_latency(mut self, part_target: f64) -> Self {
        self.playlist = self.playlist.low_latency(part_target);
        self
    }

    /// The storage key of the media playlist (`"{prefix}/index.m3u8"`).
    pub fn playlist_key(&self) -> String {
        self.object_key("index.m3u8")
    }

    /// Storage key for the object called `name` under this rendition's prefix.
    fn object_key(&self, name: &str) -> String {
        format!("{}/{}", self.prefix, name)
    }

    /// Playlist-relative URI of media segment number `seq`.
    fn segment_uri(&self, seq: u64) -> String {
        format!("seg{}.{}", seq, self.muxer.extension())
    }

    /// On the first CONFIG access unit, ask the muxer for an fMP4 init segment;
    /// if produced, write it and reference it from the playlist via `#EXT-X-MAP`.
    ///
    /// Does nothing for frames without the `CONFIG` flag or once the muxer has
    /// been asked. If storing the init segment fails, the flag stays unset so
    /// a later CONFIG frame triggers another attempt.
    async fn ensure_init_segment(&mut self, frame: &MediaFrame) -> Result<()> {
        if self.init_written || !frame.flags.contains(FrameFlags::CONFIG) {
            return Ok(());
        }
        if let Some(init) = self.muxer.init_segment(frame.codec, &frame.data)? {
            let uri = format!("init.{}", self.muxer.extension());
            let key = self.object_key(&uri);
            self.storage.put(&key, init).await?;
            self.playlist.set_map(uri);
        }
        self.init_written = true;
        Ok(())
    }

    /// Finalize the current segment: store its bytes, append it to the
    /// playlist with the given `duration`, and store the re-rendered playlist.
    ///
    /// # Errors
    /// Propagates muxer and storage failures. If storing the segment fails,
    /// nothing is added to the playlist and the sequence number is unchanged.
    async fn cut(&mut self, duration: f64) -> Result<()> {
        let bytes = self.muxer.finish_segment()?;
        let seq = self.seq;
        let uri = self.segment_uri(seq);
        let key = self.object_key(&uri);
        self.storage.put(&key, bytes).await?;
        self.playlist.push(Segment {
            seq,
            duration,
            uri,
            discontinuity: false,
        });
        // Advance as soon as the segment is stored and listed. If the playlist
        // write below fails, the next cut must not reuse this number: that
        // would overwrite the stored segment and list the same URI twice.
        self.seq += 1;
        self.write_playlist().await
    }

    /// Render the current playlist and store it under [`Self::playlist_key`].
    async fn write_playlist(&mut self) -> Result<()> {
        let body = self.playlist.render();
        let key = self.playlist_key();
        self.storage.put(&key, Bytes::from(body.into_bytes())).await
    }
}

#[async_trait]
impl<M: Muxer, S: StorageBackend> Packager for HlsSegmenter<M, S> {
    /// Offers the frame for init-segment generation, then lets the segment
    /// clock decide whether to skip it, cut the previous segment, and/or open
    /// a new one before the frame is handed to the muxer.
    async fn push(&mut self, frame: &MediaFrame) -> Result<()> {
        self.ensure_init_segment(frame).await?;

        let verdict = self.clock.observe(frame);
        if !verdict.skip {
            if let Some(elapsed) = verdict.cut_previous {
                self.cut(elapsed).await?;
            }
            if verdict.open_new {
                self.muxer.start_segment()?;
            }
            self.muxer.write(frame)?;
        }
        Ok(())
    }

    /// Cuts whatever segment the clock still holds open, marks the playlist
    /// as finished, and stores the final playlist.
    async fn finish(&mut self) -> Result<()> {
        let trailing = self.clock.flush();
        if let Some(elapsed) = trailing {
            self.cut(elapsed).await?;
        }
        self.playlist.finish();
        self.write_playlist().await
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::testing::{video_frame, InMemoryStorage};

    /// Pushes five one-second GOPs through a 2-second-target segmenter and
    /// checks that a finished playlist and the first segment reach storage.
    #[tokio::test]
    async fn segments_on_keyframe_after_target_and_writes_playlist() {
        let storage = InMemoryStorage::new();
        let muxer = PassthroughMuxer::new("ts");
        let mut segmenter = HlsSegmenter::new(muxer, storage.clone(), "live/cam", 2, 5);

        // One keyframe per second (pts in ms) followed by a delta frame half a
        // second later; with a 2s target the cut lands on the keyframe at or
        // after the 2s mark.
        for second in 0..5 {
            let key_pts = second * 1000;
            let keyframe = video_frame(key_pts, true);
            segmenter.push(&keyframe).await.unwrap();
            let delta = video_frame(key_pts + 500, false);
            segmenter.push(&delta).await.unwrap();
        }
        segmenter.finish().await.unwrap();

        let raw = storage.get("live/cam/index.m3u8").await.unwrap();
        let rendered = String::from_utf8(raw.to_vec()).unwrap();
        assert!(rendered.contains("#EXTM3U"));
        assert!(rendered.contains("#EXT-X-ENDLIST"));

        // The first complete segment must exist in storage...
        let first_lookup = storage.get("live/cam/seg0.ts").await;
        assert!(first_lookup.is_ok());
        // ...and must carry the concatenated payload of its frames.
        let first_segment = storage.get("live/cam/seg0.ts").await.unwrap();
        assert!(!first_segment.is_empty());
    }

    /// Test muxer that always yields an init segment so the `#EXT-X-MAP`
    /// path of the segmenter gets exercised.
    struct InitMuxer {
        buf: BytesMut,
    }

    impl Muxer for InitMuxer {
        fn extension(&self) -> &'static str {
            "m4s"
        }

        fn start_segment(&mut self) -> Result<()> {
            self.buf.clear();
            Ok(())
        }

        fn write(&mut self, frame: &MediaFrame) -> Result<()> {
            let payload = &frame.data;
            self.buf.put_slice(payload);
            Ok(())
        }

        fn finish_segment(&mut self) -> Result<Bytes> {
            let segment = std::mem::take(&mut self.buf);
            Ok(segment.freeze())
        }

        fn init_segment(&mut self, _codec: CodecId, config_record: &[u8]) -> Result<Option<Bytes>> {
            // No ftyp+moov wrapping as a real muxer would do: the record is
            // simply echoed back as the init segment.
            let echoed = Bytes::copy_from_slice(config_record);
            Ok(Some(echoed))
        }

        fn codec_string(&self) -> Option<String> {
            Some("hvc1.1.6.L120.B0".into())
        }
    }

    /// A CONFIG frame must produce `init.m4s` and an `#EXT-X-MAP` playlist tag.
    #[tokio::test]
    async fn fmp4_muxer_writes_init_segment_and_ext_x_map() {
        use crate::FrameFlags;

        let storage = InMemoryStorage::new();
        let muxer = InitMuxer {
            buf: BytesMut::new(),
        };
        let mut segmenter = HlsSegmenter::new(muxer, storage.clone(), "live/hevc", 2, 5);
        assert_eq!(
            segmenter.codec_string().as_deref(),
            Some("hvc1.1.6.L120.B0")
        );

        // The opening frame is flagged CONFIG, which triggers the init segment.
        let mut config_frame = video_frame(0, true);
        config_frame.codec = CodecId::H265;
        config_frame.flags |= FrameFlags::CONFIG;
        segmenter.push(&config_frame).await.unwrap();

        for second in 1..6 {
            let keyframe = video_frame(second * 1000, true);
            segmenter.push(&keyframe).await.unwrap();
        }
        segmenter.finish().await.unwrap();

        assert!(storage.get("live/hevc/init.m4s").await.is_ok());

        let raw = storage.get("live/hevc/index.m3u8").await.unwrap();
        let rendered = String::from_utf8(raw.to_vec()).unwrap();
        assert!(rendered.contains("#EXT-X-MAP:URI=\"init.m4s\""));
    }
}