Skip to main content

arcly_stream/packager/
mod.rs

1//! Egress packaging: turn a live frame stream into HLS/LL-HLS segments.
2//!
3//! Gated behind `hls`. Provides the [`Muxer`] and [`Packager`] contracts, a
4//! sliding-window [`HlsPlaylist`] generator, and an [`HlsSegmenter`] that cuts
5//! segments on keyframe boundaries and writes both media segments and the
6//! `.m3u8` playlist through any [`StorageBackend`].
7//!
8//! The container byte-format is pluggable via [`Muxer`]: a [`PassthroughMuxer`]
9//! (elementary-stream concatenation) ships here; production MPEG-TS or fMP4/CMAF
10//! muxers implement the same trait and drop in unchanged.
11//!
12//! ```no_run
13//! # #[cfg(feature = "storage-fs")]
14//! # async fn demo(handle: arcly_stream::StreamHandle) -> arcly_stream::Result<()> {
15//! use arcly_stream::packager::{HlsSegmenter, Packager, PassthroughMuxer};
16//! use arcly_stream::storage::FsStorage;
17//!
18//! let storage = FsStorage::new("/var/hls");
19//! let mut seg = HlsSegmenter::new(PassthroughMuxer::new("ts"), storage, "live/cam", 6, 5);
20//! let mut sub = handle.subscribe_resilient();
21//! while let Some(frame) = sub.recv().await {
22//!     seg.push(&frame).await?;
23//! }
24//! seg.finish().await?;
25//! # Ok(())
26//! # }
27//! ```
28
29mod playlist;
30
31#[cfg(feature = "mpegts")]
32#[cfg_attr(docsrs, doc(cfg(feature = "mpegts")))]
33mod mpegts;
34
35#[cfg(feature = "mpegts")]
36#[cfg_attr(docsrs, doc(cfg(feature = "mpegts")))]
37pub use mpegts::MpegTsMuxer;
38
39#[cfg(feature = "fmp4")]
40#[cfg_attr(docsrs, doc(cfg(feature = "fmp4")))]
41mod fmp4;
42
43#[cfg(feature = "fmp4")]
44#[cfg_attr(docsrs, doc(cfg(feature = "fmp4")))]
45pub use fmp4::Fmp4Muxer;
46
47pub use playlist::{HlsPlaylist, Segment};
48
49use crate::traits::StorageBackend;
50use crate::{CodecId, FrameFlags, MediaFrame, Result};
51use async_trait::async_trait;
52use bytes::{BufMut, Bytes, BytesMut};
53
54/// Container muxer: accumulates frames into one segment's bytes.
55///
56/// Implement this for MPEG-TS, fMP4/CMAF, etc. The segmenter calls
57/// [`start_segment`](Self::start_segment) at each boundary, [`write`](Self::write)
58/// per frame, and [`finish_segment`](Self::finish_segment) to take the bytes.
59pub trait Muxer: Send {
60    /// File extension for produced segments (e.g. `"ts"`, `"m4s"`).
61    fn extension(&self) -> &'static str;
62
63    /// Begin a fresh segment, discarding any partial state.
64    fn start_segment(&mut self) -> Result<()>;
65
66    /// Append one frame to the current segment.
67    fn write(&mut self, frame: &MediaFrame) -> Result<()>;
68
69    /// Take the finished segment's bytes, resetting for the next one.
70    fn finish_segment(&mut self) -> Result<Bytes>;
71
72    /// Build an fMP4 initialization segment (`ftyp`+`moov` carrying the
73    /// `avcC`/`hvcC`/`av1C`/`vvcC` decoder config) from a CONFIG access unit.
74    ///
75    /// Returning `Some` makes [`HlsSegmenter`] write it once and reference it via
76    /// `#EXT-X-MAP`. The default returns `None` — correct for self-initializing
77    /// containers like MPEG-TS and the [`PassthroughMuxer`].
78    fn init_segment(&mut self, _codec: CodecId, _config_record: &[u8]) -> Result<Option<Bytes>> {
79        Ok(None)
80    }
81
82    /// The HLS `CODECS` attribute for this rendition's master-playlist entry
83    /// (e.g. `"hvc1.1.6.L120.B0"`), once known.
84    fn codec_string(&self) -> Option<String> {
85        None
86    }
87}
88
89/// A trivial muxer that concatenates frame payloads (elementary stream).
90///
91/// Useful for testing and raw recording; not a real container. Swap in a TS or
92/// fMP4 muxer for player-compatible output.
93pub struct PassthroughMuxer {
94    ext: &'static str,
95    buf: BytesMut,
96}
97
98impl PassthroughMuxer {
99    /// New passthrough muxer producing segments with the given extension.
100    pub fn new(ext: &'static str) -> Self {
101        Self {
102            ext,
103            buf: BytesMut::new(),
104        }
105    }
106}
107
108impl Muxer for PassthroughMuxer {
109    fn extension(&self) -> &'static str {
110        self.ext
111    }
112    fn start_segment(&mut self) -> Result<()> {
113        self.buf.clear();
114        Ok(())
115    }
116    fn write(&mut self, frame: &MediaFrame) -> Result<()> {
117        self.buf.put_slice(&frame.data);
118        Ok(())
119    }
120    fn finish_segment(&mut self) -> Result<Bytes> {
121        Ok(std::mem::take(&mut self.buf).freeze())
122    }
123}
124
125/// Consumes a live frame stream and produces a packaged rendition.
126#[async_trait]
127pub trait Packager: Send {
128    /// Feed one frame; may finalize and emit a segment as a side effect.
129    async fn push(&mut self, frame: &MediaFrame) -> Result<()>;
130    /// Flush the final segment and close the rendition.
131    async fn finish(&mut self) -> Result<()>;
132}
133
134/// Keyframe-boundary HLS segmenter writing through a [`StorageBackend`].
135///
136/// Starts at the first keyframe, cuts a new segment at the first keyframe at or
137/// after `target_duration` seconds, and after each cut writes the media segment
138/// and the regenerated `index.m3u8` to storage under `prefix`.
139pub struct HlsSegmenter<M: Muxer, S: StorageBackend> {
140    muxer: M,
141    storage: S,
142    prefix: String,
143    playlist: HlsPlaylist,
144    clock: crate::segment::SegmentClock,
145    seq: u64,
146    init_written: bool,
147}
148
149impl<M: Muxer, S: StorageBackend> HlsSegmenter<M, S> {
150    /// New segmenter writing under `prefix`, targeting `target_duration`-second
151    /// segments and a `window`-segment live playlist.
152    pub fn new(
153        muxer: M,
154        storage: S,
155        prefix: impl Into<String>,
156        target_duration: u64,
157        window: usize,
158    ) -> Self {
159        Self {
160            muxer,
161            storage,
162            prefix: prefix.into(),
163            playlist: HlsPlaylist::new(target_duration, window),
164            clock: crate::segment::SegmentClock::new(target_duration),
165            seq: 0,
166            init_written: false,
167        }
168    }
169
170    /// The HLS `CODECS` attribute reported by the muxer, for a master playlist.
171    pub fn codec_string(&self) -> Option<String> {
172        self.muxer.codec_string()
173    }
174
175    /// On the first CONFIG access unit, ask the muxer for an fMP4 init segment;
176    /// if produced, write it and reference it from the playlist via `#EXT-X-MAP`.
177    async fn ensure_init_segment(&mut self, frame: &MediaFrame) -> Result<()> {
178        if self.init_written || !frame.flags.contains(FrameFlags::CONFIG) {
179            return Ok(());
180        }
181        if let Some(init) = self.muxer.init_segment(frame.codec, &frame.data)? {
182            let uri = format!("init.{}", self.muxer.extension());
183            let key = format!("{}/{}", self.prefix, uri);
184            self.storage.put(&key, init).await?;
185            self.playlist.set_map(uri);
186        }
187        self.init_written = true;
188        Ok(())
189    }
190
191    /// Enable LL-HLS playlist output with the given part target duration.
192    pub fn low_latency(mut self, part_target: f64) -> Self {
193        self.playlist = self.playlist.low_latency(part_target);
194        self
195    }
196
197    /// The storage key of the media playlist.
198    pub fn playlist_key(&self) -> String {
199        format!("{}/index.m3u8", self.prefix)
200    }
201
202    fn segment_uri(&self, seq: u64) -> String {
203        format!("seg{}.{}", seq, self.muxer.extension())
204    }
205
206    async fn cut(&mut self, duration: f64) -> Result<()> {
207        let bytes = self.muxer.finish_segment()?;
208        let uri = self.segment_uri(self.seq);
209        let key = format!("{}/{}", self.prefix, uri);
210        self.storage.put(&key, bytes).await?;
211        self.playlist.push(Segment {
212            seq: self.seq,
213            duration,
214            uri,
215            discontinuity: false,
216        });
217        self.write_playlist().await?;
218        self.seq += 1;
219        Ok(())
220    }
221
222    async fn write_playlist(&mut self) -> Result<()> {
223        let body = self.playlist.render();
224        let key = self.playlist_key();
225        self.storage.put(&key, Bytes::from(body.into_bytes())).await
226    }
227}
228
229#[async_trait]
230impl<M: Muxer, S: StorageBackend> Packager for HlsSegmenter<M, S> {
231    async fn push(&mut self, frame: &MediaFrame) -> Result<()> {
232        self.ensure_init_segment(frame).await?;
233        let decision = self.clock.observe(frame);
234        if decision.skip {
235            return Ok(());
236        }
237        if let Some(duration) = decision.cut_previous {
238            self.cut(duration).await?;
239        }
240        if decision.open_new {
241            self.muxer.start_segment()?;
242        }
243        self.muxer.write(frame)
244    }
245
246    async fn finish(&mut self) -> Result<()> {
247        if let Some(duration) = self.clock.flush() {
248            self.cut(duration).await?;
249        }
250        self.playlist.finish();
251        self.write_playlist().await
252    }
253}
254
#[cfg(test)]
mod tests {
    use super::*;
    use crate::testing::{video_frame, InMemoryStorage};

    /// End-to-end check with the passthrough muxer: segments and a finished
    /// playlist land in storage.
    #[tokio::test]
    async fn segments_on_keyframe_after_target_and_writes_playlist() {
        let store = InMemoryStorage::new();
        let muxer = PassthroughMuxer::new("ts");
        let mut seg = HlsSegmenter::new(muxer, store.clone(), "live/cam", 2, 5);

        // One keyframe per second (pts in ms) plus a delta frame half a second
        // later; with a 2s target the cut lands on the keyframe at/after 2s.
        for second in 0..5 {
            let key_pts = second * 1000;
            let keyframe = video_frame(key_pts, true);
            seg.push(&keyframe).await.unwrap();
            let delta = video_frame(key_pts + 500, false);
            seg.push(&delta).await.unwrap();
        }
        seg.finish().await.unwrap();

        let raw = store.get("live/cam/index.m3u8").await.unwrap();
        let text = String::from_utf8(raw.to_vec()).unwrap();
        assert!(text.contains("#EXTM3U"));
        assert!(text.contains("#EXT-X-ENDLIST"));

        // The first full segment must exist in storage...
        let first = store.get("live/cam/seg0.ts").await;
        assert!(first.is_ok());
        // ...and, being the concatenation of its frames' bytes, must not be empty.
        assert!(!first.unwrap().is_empty());
    }

    /// Test muxer that returns an fMP4-style init segment so the EXT-X-MAP
    /// path is exercised.
    struct InitMuxer {
        buf: BytesMut,
    }

    impl Muxer for InitMuxer {
        fn extension(&self) -> &'static str {
            "m4s"
        }

        fn start_segment(&mut self) -> Result<()> {
            self.buf.clear();
            Ok(())
        }

        fn write(&mut self, frame: &MediaFrame) -> Result<()> {
            self.buf.put_slice(&frame.data);
            Ok(())
        }

        fn finish_segment(&mut self) -> Result<Bytes> {
            let segment = std::mem::take(&mut self.buf);
            Ok(segment.freeze())
        }

        fn init_segment(&mut self, _codec: CodecId, config_record: &[u8]) -> Result<Option<Bytes>> {
            // Echo the config record; a real muxer would wrap it in ftyp+moov.
            let init = Bytes::copy_from_slice(config_record);
            Ok(Some(init))
        }

        fn codec_string(&self) -> Option<String> {
            Some("hvc1.1.6.L120.B0".into())
        }
    }

    /// A CONFIG frame must produce `init.m4s` and an `#EXT-X-MAP` playlist tag.
    #[tokio::test]
    async fn fmp4_muxer_writes_init_segment_and_ext_x_map() {
        let store = InMemoryStorage::new();
        let muxer = InitMuxer {
            buf: BytesMut::new(),
        };
        let mut seg = HlsSegmenter::new(muxer, store.clone(), "live/hevc", 2, 5);
        assert_eq!(seg.codec_string().as_deref(), Some("hvc1.1.6.L120.B0"));

        // The opening frame is flagged CONFIG, which triggers the init segment.
        let mut cfg = video_frame(0, true);
        cfg.codec = CodecId::H265;
        cfg.flags |= FrameFlags::CONFIG;
        seg.push(&cfg).await.unwrap();

        for second in 1..6 {
            let keyframe = video_frame(second * 1000, true);
            seg.push(&keyframe).await.unwrap();
        }
        seg.finish().await.unwrap();

        assert!(store.get("live/hevc/init.m4s").await.is_ok());
        let raw = store.get("live/hevc/index.m3u8").await.unwrap();
        let pl = String::from_utf8(raw.to_vec()).unwrap();
        assert!(pl.contains("#EXT-X-MAP:URI=\"init.m4s\""));
    }
}