arcly_stream/packager/
mod.rs1mod playlist;
30
31#[cfg(feature = "mpegts")]
32#[cfg_attr(docsrs, doc(cfg(feature = "mpegts")))]
33mod mpegts;
34
35#[cfg(feature = "mpegts")]
36#[cfg_attr(docsrs, doc(cfg(feature = "mpegts")))]
37pub use mpegts::MpegTsMuxer;
38
39#[cfg(feature = "fmp4")]
40#[cfg_attr(docsrs, doc(cfg(feature = "fmp4")))]
41mod fmp4;
42
43#[cfg(feature = "fmp4")]
44#[cfg_attr(docsrs, doc(cfg(feature = "fmp4")))]
45pub use fmp4::Fmp4Muxer;
46
47pub use playlist::{HlsPlaylist, Segment};
48
49use crate::traits::StorageBackend;
50use crate::{CodecId, FrameFlags, MediaFrame, Result};
51use async_trait::async_trait;
52use bytes::{BufMut, Bytes, BytesMut};
53
54pub trait Muxer: Send {
60 fn extension(&self) -> &'static str;
62
63 fn start_segment(&mut self) -> Result<()>;
65
66 fn write(&mut self, frame: &MediaFrame) -> Result<()>;
68
69 fn finish_segment(&mut self) -> Result<Bytes>;
71
72 fn init_segment(&mut self, _codec: CodecId, _config_record: &[u8]) -> Result<Option<Bytes>> {
79 Ok(None)
80 }
81
82 fn codec_string(&self) -> Option<String> {
85 None
86 }
87}
88
89pub struct PassthroughMuxer {
94 ext: &'static str,
95 buf: BytesMut,
96}
97
98impl PassthroughMuxer {
99 pub fn new(ext: &'static str) -> Self {
101 Self {
102 ext,
103 buf: BytesMut::new(),
104 }
105 }
106}
107
108impl Muxer for PassthroughMuxer {
109 fn extension(&self) -> &'static str {
110 self.ext
111 }
112 fn start_segment(&mut self) -> Result<()> {
113 self.buf.clear();
114 Ok(())
115 }
116 fn write(&mut self, frame: &MediaFrame) -> Result<()> {
117 self.buf.put_slice(&frame.data);
118 Ok(())
119 }
120 fn finish_segment(&mut self) -> Result<Bytes> {
121 Ok(std::mem::take(&mut self.buf).freeze())
122 }
123}
124
125#[async_trait]
127pub trait Packager: Send {
128 async fn push(&mut self, frame: &MediaFrame) -> Result<()>;
130 async fn finish(&mut self) -> Result<()>;
132}
133
134pub struct HlsSegmenter<M: Muxer, S: StorageBackend> {
140 muxer: M,
141 storage: S,
142 prefix: String,
143 playlist: HlsPlaylist,
144 clock: crate::segment::SegmentClock,
145 seq: u64,
146 init_written: bool,
147}
148
149impl<M: Muxer, S: StorageBackend> HlsSegmenter<M, S> {
150 pub fn new(
153 muxer: M,
154 storage: S,
155 prefix: impl Into<String>,
156 target_duration: u64,
157 window: usize,
158 ) -> Self {
159 Self {
160 muxer,
161 storage,
162 prefix: prefix.into(),
163 playlist: HlsPlaylist::new(target_duration, window),
164 clock: crate::segment::SegmentClock::new(target_duration),
165 seq: 0,
166 init_written: false,
167 }
168 }
169
170 pub fn codec_string(&self) -> Option<String> {
172 self.muxer.codec_string()
173 }
174
175 async fn ensure_init_segment(&mut self, frame: &MediaFrame) -> Result<()> {
178 if self.init_written || !frame.flags.contains(FrameFlags::CONFIG) {
179 return Ok(());
180 }
181 if let Some(init) = self.muxer.init_segment(frame.codec, &frame.data)? {
182 let uri = format!("init.{}", self.muxer.extension());
183 let key = format!("{}/{}", self.prefix, uri);
184 self.storage.put(&key, init).await?;
185 self.playlist.set_map(uri);
186 }
187 self.init_written = true;
188 Ok(())
189 }
190
191 pub fn low_latency(mut self, part_target: f64) -> Self {
193 self.playlist = self.playlist.low_latency(part_target);
194 self
195 }
196
197 pub fn playlist_key(&self) -> String {
199 format!("{}/index.m3u8", self.prefix)
200 }
201
202 fn segment_uri(&self, seq: u64) -> String {
203 format!("seg{}.{}", seq, self.muxer.extension())
204 }
205
206 async fn cut(&mut self, duration: f64) -> Result<()> {
207 let bytes = self.muxer.finish_segment()?;
208 let uri = self.segment_uri(self.seq);
209 let key = format!("{}/{}", self.prefix, uri);
210 self.storage.put(&key, bytes).await?;
211 self.playlist.push(Segment {
212 seq: self.seq,
213 duration,
214 uri,
215 discontinuity: false,
216 });
217 self.write_playlist().await?;
218 self.seq += 1;
219 Ok(())
220 }
221
222 async fn write_playlist(&mut self) -> Result<()> {
223 let body = self.playlist.render();
224 let key = self.playlist_key();
225 self.storage.put(&key, Bytes::from(body.into_bytes())).await
226 }
227}
228
229#[async_trait]
230impl<M: Muxer, S: StorageBackend> Packager for HlsSegmenter<M, S> {
231 async fn push(&mut self, frame: &MediaFrame) -> Result<()> {
232 self.ensure_init_segment(frame).await?;
233 let decision = self.clock.observe(frame);
234 if decision.skip {
235 return Ok(());
236 }
237 if let Some(duration) = decision.cut_previous {
238 self.cut(duration).await?;
239 }
240 if decision.open_new {
241 self.muxer.start_segment()?;
242 }
243 self.muxer.write(frame)
244 }
245
246 async fn finish(&mut self) -> Result<()> {
247 if let Some(duration) = self.clock.flush() {
248 self.cut(duration).await?;
249 }
250 self.playlist.finish();
251 self.write_playlist().await
252 }
253}
254
#[cfg(test)]
mod tests {
    use super::*;
    use crate::testing::{video_frame, InMemoryStorage};

    /// Pushes five keyframe/non-keyframe pairs through a passthrough segmenter and checks
    /// that a finished playlist and a non-empty first segment end up in storage.
    #[tokio::test]
    async fn segments_on_keyframe_after_target_and_writes_playlist() {
        let storage = InMemoryStorage::new();
        let muxer = PassthroughMuxer::new("ts");
        let mut segmenter = HlsSegmenter::new(muxer, storage.clone(), "live/cam", 2, 5);

        for i in 0..5 {
            let pts = i * 1000;
            let key = video_frame(pts, true);
            segmenter.push(&key).await.unwrap();
            let delta = video_frame(pts + 500, false);
            segmenter.push(&delta).await.unwrap();
        }
        segmenter.finish().await.unwrap();

        let playlist = storage.get("live/cam/index.m3u8").await.unwrap();
        let rendered = String::from_utf8(playlist.to_vec()).unwrap();
        assert!(rendered.contains("#EXTM3U"));
        assert!(rendered.contains("#EXT-X-ENDLIST"));
        assert!(storage.get("live/cam/seg0.ts").await.is_ok());
        assert!(!storage.get("live/cam/seg0.ts").await.unwrap().is_empty());
    }

    /// Test muxer that echoes the config record back as its init segment.
    struct InitMuxer {
        buf: BytesMut,
    }

    impl Muxer for InitMuxer {
        fn extension(&self) -> &'static str {
            "m4s"
        }

        fn start_segment(&mut self) -> Result<()> {
            self.buf.clear();
            Ok(())
        }

        fn write(&mut self, frame: &MediaFrame) -> Result<()> {
            self.buf.put_slice(&frame.data);
            Ok(())
        }

        fn finish_segment(&mut self) -> Result<Bytes> {
            let segment = std::mem::take(&mut self.buf);
            Ok(segment.freeze())
        }

        fn init_segment(&mut self, _codec: CodecId, config_record: &[u8]) -> Result<Option<Bytes>> {
            let init = Bytes::copy_from_slice(config_record);
            Ok(Some(init))
        }

        fn codec_string(&self) -> Option<String> {
            Some("hvc1.1.6.L120.B0".into())
        }
    }

    /// A `CONFIG` frame must produce a stored init segment and an `EXT-X-MAP` playlist tag.
    #[tokio::test]
    async fn fmp4_muxer_writes_init_segment_and_ext_x_map() {
        use crate::FrameFlags;
        let storage = InMemoryStorage::new();
        let muxer = InitMuxer {
            buf: BytesMut::new(),
        };
        let mut segmenter = HlsSegmenter::new(muxer, storage.clone(), "live/hevc", 2, 5);
        assert_eq!(
            segmenter.codec_string().as_deref(),
            Some("hvc1.1.6.L120.B0")
        );

        let mut cfg = video_frame(0, true);
        cfg.codec = CodecId::H265;
        cfg.flags |= FrameFlags::CONFIG;
        segmenter.push(&cfg).await.unwrap();
        for i in 1..6 {
            let key = video_frame(i * 1000, true);
            segmenter.push(&key).await.unwrap();
        }
        segmenter.finish().await.unwrap();

        assert!(storage.get("live/hevc/init.m4s").await.is_ok());
        let raw = storage.get("live/hevc/index.m3u8").await.unwrap();
        let pl = String::from_utf8(raw.to_vec()).unwrap();
        assert!(pl.contains("#EXT-X-MAP:URI=\"init.m4s\""));
    }
}