arcly-stream 0.1.7

An open-extensible live-media streaming kernel: lock-free zero-copy frame fan-out, instant-start GOP cache, a pluggable multi-protocol ingestion layer (RTMP, RTSP, SRT, WHIP/WHEP shipped), and a feature-gated pure-Rust media plane (MPEG-TS/HLS/fMP4) — runtime, config, and metrics free.
Documentation
//! Shared mechanics for keyframe-boundary segmenters.
//!
//! [`HlsSegmenter`](super::HlsSegmenter) and [`DashPackager`](super::DashPackager)
//! differ only in their playlist/manifest format and (for HLS) LL-HLS part
//! handling. The keyframe-driven skeleton they share — owning the muxer, cutting
//! on the [`SegmentClock`](crate::segment::SegmentClock), building the init
//! segment from the first CONFIG access unit, naming segment/part files, and
//! writing them under a storage prefix — lives here so it is implemented once.

use super::Muxer;
use crate::segment::{SegmentClock, SegmentDecision};
use crate::traits::StorageBackend;
use crate::{CodecId, FrameFlags, MediaFrame, Result};
use bytes::Bytes;

/// A nominal per-rendition bitrate (bits/sec) for playlist/manifest `BANDWIDTH`
/// attributes. A real deployment measures actual bitrate per rendition; with a
/// single rendition the exact value does not affect player selection.
pub(crate) const BANDWIDTH_HINT: u32 = 2_000_000;

/// The muxer + storage + clock core shared by every keyframe-boundary segmenter.
pub(crate) struct SegmentEngine<M: Muxer, S: StorageBackend> {
    /// The container muxer. Public to the packager so it can drive
    /// `write`/`finish_segment`/`take_partial` directly.
    pub muxer: M,
    pub storage: S,
    pub prefix: String,
    pub clock: SegmentClock,
    /// Monotonic media-sequence number of the next segment to cut.
    pub seq: u64,
    init_written: bool,
    /// CONFIG access units accumulated before the first media sample (e.g. video
    /// `avcC` + audio `asc`), used to build a possibly-multi-track init segment.
    pending_configs: Vec<(CodecId, Bytes)>,
}

impl<M: Muxer, S: StorageBackend> SegmentEngine<M, S> {
    pub fn new(muxer: M, storage: S, prefix: impl Into<String>, target_duration: u64) -> Self {
        Self {
            muxer,
            storage,
            prefix: prefix.into(),
            clock: SegmentClock::new(target_duration),
            seq: 0,
            init_written: false,
            pending_configs: Vec::new(),
        }
    }

    /// `segN.<ext>` — the full-segment file name for sequence `seq`.
    pub fn segment_uri(&self, seq: u64) -> String {
        format!("seg{}.{}", seq, self.muxer.extension())
    }

    /// `segN.P.<ext>` — the LL-HLS partial-segment file name.
    pub fn part_uri(&self, seq: u64, part: u64) -> String {
        format!("seg{}.{}.{}", seq, part, self.muxer.extension())
    }

    /// The absolute storage key for a relative `uri` under this engine's prefix.
    ///
    /// A `&self` method (not an `async put`) so callers can `self.storage.put`
    /// directly: a future borrowing all of `&SegmentEngine` would not be `Send`
    /// (the muxer `M` is `Send` but not `Sync`), whereas `&S` alone is.
    pub fn key(&self, uri: &str) -> String {
        format!("{}/{}", self.prefix, uri)
    }

    /// Feed a frame to the segment clock (cut/open/skip decision).
    pub fn observe(&mut self, frame: &MediaFrame) -> SegmentDecision {
        self.clock.observe(frame)
    }

    /// Flush a trailing partial segment's duration at end-of-stream, if any.
    pub fn flush(&mut self) -> Option<f64> {
        self.clock.flush()
    }

    /// Accumulate CONFIG access units, then on the **first media sample** ask the
    /// muxer to build an init segment from all of them (so a multi-track muxer can
    /// include audio). If one is produced, write it under `init.<ext>` and return
    /// its relative URI for the caller to reference (HLS `#EXT-X-MAP`, DASH
    /// `initialization`). Builds at most once; later calls are no-ops.
    ///
    /// Deferring the build to the first sample is what lets the audio config —
    /// which arrives *after* the video config in the cached-config replay order —
    /// be part of the init segment.
    pub async fn ensure_init(&mut self, frame: &MediaFrame) -> Result<Option<String>> {
        if self.init_written {
            return Ok(None);
        }
        if frame.flags.contains(FrameFlags::CONFIG) {
            // Buffer this track's config; the init is built once media starts.
            self.pending_configs.push((frame.codec, frame.data.clone()));
            return Ok(None);
        }
        // First media sample: build the init from every config seen so far.
        self.init_written = true;
        if self.pending_configs.is_empty() {
            return Ok(None);
        }
        let configs = std::mem::take(&mut self.pending_configs);
        let Some(init) = self.muxer.build_init_from(&configs)? else {
            return Ok(None);
        };
        let uri = format!("init.{}", self.muxer.extension());
        self.storage.put(&self.key(&uri), init).await?;
        Ok(Some(uri))
    }
}