mod playlist;
#[cfg(feature = "mpegts")]
#[cfg_attr(docsrs, doc(cfg(feature = "mpegts")))]
mod mpegts;
#[cfg(feature = "mpegts")]
#[cfg_attr(docsrs, doc(cfg(feature = "mpegts")))]
pub use mpegts::MpegTsMuxer;
pub use playlist::{HlsPlaylist, Segment};
use crate::traits::StorageBackend;
use crate::{CodecId, FrameFlags, MediaFrame, Result};
use async_trait::async_trait;
use bytes::{BufMut, Bytes, BytesMut};
/// Container-format writer that turns [`MediaFrame`]s into segment payloads.
///
/// A segment is built by calling [`Muxer::start_segment`], then
/// [`Muxer::write`] once per frame, and finally [`Muxer::finish_segment`]
/// to obtain the accumulated bytes.
pub trait Muxer: Send {
    /// File extension, without a leading dot, used when naming the objects
    /// produced by this muxer (for example `"ts"`).
    fn extension(&self) -> &'static str;

    /// Begins a new segment.
    fn start_segment(&mut self) -> Result<()>;

    /// Adds one frame to the segment currently being built.
    fn write(&mut self, frame: &MediaFrame) -> Result<()>;

    /// Ends the current segment and returns its bytes.
    fn finish_segment(&mut self) -> Result<Bytes>;

    /// Builds an initialization segment from a codec configuration record.
    ///
    /// The default implementation ignores both arguments and returns
    /// `Ok(None)`, i.e. the format has no initialization segment.
    fn init_segment(&mut self, _codec: CodecId, _config_record: &[u8]) -> Result<Option<Bytes>> {
        Ok(None)
    }

    /// Codec string describing the muxed stream, if the muxer can provide one.
    ///
    /// The default implementation returns `None`.
    fn codec_string(&self) -> Option<String> {
        None
    }
}
/// [`Muxer`] that performs no container framing: frame payloads are
/// concatenated verbatim into the segment.
pub struct PassthroughMuxer {
    /// Extension reported by [`Muxer::extension`].
    ext: &'static str,
    /// Bytes accumulated for the segment currently being built.
    buf: BytesMut,
}
impl PassthroughMuxer {
pub fn new(ext: &'static str) -> Self {
Self {
ext,
buf: BytesMut::new(),
}
}
}
impl Muxer for PassthroughMuxer {
    /// Returns the extension supplied at construction time.
    fn extension(&self) -> &'static str {
        self.ext
    }

    /// Discards anything still buffered so the new segment starts empty.
    fn start_segment(&mut self) -> Result<()> {
        self.buf.clear();
        Ok(())
    }

    /// Appends the frame payload to the segment buffer unchanged.
    fn write(&mut self, frame: &MediaFrame) -> Result<()> {
        let payload = &frame.data;
        self.buf.put_slice(payload);
        Ok(())
    }

    /// Hands back everything buffered so far, leaving a fresh empty buffer
    /// in its place.
    fn finish_segment(&mut self) -> Result<Bytes> {
        let segment = std::mem::take(&mut self.buf);
        Ok(segment.freeze())
    }
}
/// Asynchronous sink for a stream of [`MediaFrame`]s.
#[async_trait]
pub trait Packager: Send {
    /// Feeds one frame into the packager.
    async fn push(&mut self, frame: &MediaFrame) -> Result<()>;

    /// Signals the end of the stream so the packager can finalize its output.
    async fn finish(&mut self) -> Result<()>;
}
/// Splits a frame stream into segments, stores them through a
/// [`StorageBackend`], and keeps an [`HlsPlaylist`] describing them.
pub struct HlsSegmenter<M, S>
where
    M: Muxer,
    S: StorageBackend,
{
    /// Produces segment bytes (and, optionally, an initialization segment).
    muxer: M,
    /// Destination for segments, the init segment and the rendered playlist.
    storage: S,
    /// Key prefix; object names are appended to it after a `/`.
    prefix: String,
    /// In-memory playlist that is rendered and stored after every cut.
    playlist: HlsPlaylist,
    /// Decides, per frame, whether to skip it, cut the previous segment
    /// and/or open a new one.
    clock: crate::segment::SegmentClock,
    /// Sequence number given to the next segment that is cut; starts at 0.
    seq: u64,
    /// Set once the first `CONFIG` frame has been handled, so the init
    /// segment is produced at most once.
    init_written: bool,
}
impl<M, S> HlsSegmenter<M, S>
where
    M: Muxer,
    S: StorageBackend,
{
    /// Creates a segmenter that writes everything under `prefix`.
    ///
    /// `target_duration` is handed to both [`HlsPlaylist::new`] and the
    /// segment clock; `window` is handed to [`HlsPlaylist::new`]. Sequence
    /// numbering starts at 0 and no init segment has been written yet.
    pub fn new(
        muxer: M,
        storage: S,
        prefix: impl Into<String>,
        target_duration: u64,
        window: usize,
    ) -> Self {
        Self {
            muxer,
            storage,
            prefix: prefix.into(),
            playlist: HlsPlaylist::new(target_duration, window),
            clock: crate::segment::SegmentClock::new(target_duration),
            seq: 0,
            init_written: false,
        }
    }

    /// Codec string reported by the underlying muxer, if any.
    pub fn codec_string(&self) -> Option<String> {
        self.muxer.codec_string()
    }

    /// Storage key for an object called `name` under this segmenter's prefix.
    fn storage_key(&self, name: &str) -> String {
        format!("{}/{}", self.prefix, name)
    }

    /// Handles the first frame flagged `CONFIG`.
    ///
    /// The muxer is asked for an init segment built from that frame's codec
    /// and payload. If it returns one, it is stored as `init.<ext>` under the
    /// prefix and registered on the playlist via `set_map`. Either way the
    /// check is not repeated for later frames. Frames without the `CONFIG`
    /// flag are ignored here.
    async fn ensure_init_segment(&mut self, frame: &MediaFrame) -> Result<()> {
        if self.init_written || !frame.flags.contains(FrameFlags::CONFIG) {
            return Ok(());
        }
        if let Some(init) = self.muxer.init_segment(frame.codec, &frame.data)? {
            let uri = format!("init.{}", self.muxer.extension());
            let key = self.storage_key(&uri);
            self.storage.put(&key, init).await?;
            self.playlist.set_map(uri);
        }
        self.init_written = true;
        Ok(())
    }

    /// Builder-style switch: replaces the playlist with the result of
    /// `HlsPlaylist::low_latency(part_target)` and returns the segmenter.
    pub fn low_latency(mut self, part_target: f64) -> Self {
        self.playlist = self.playlist.low_latency(part_target);
        self
    }

    /// Storage key under which the rendered playlist is written.
    pub fn playlist_key(&self) -> String {
        self.storage_key("index.m3u8")
    }

    /// Playlist-relative URI of the segment with sequence number `seq`.
    fn segment_uri(&self, seq: u64) -> String {
        format!("seg{}.{}", seq, self.muxer.extension())
    }

    /// Finishes the current segment, stores it, appends it to the playlist
    /// with the given `duration`, and stores the re-rendered playlist.
    ///
    /// # Errors
    ///
    /// Propagates errors from the muxer and from the storage backend. If
    /// storing the segment fails, neither the playlist nor the sequence
    /// counter is touched.
    async fn cut(&mut self, duration: f64) -> Result<()> {
        let bytes = self.muxer.finish_segment()?;
        let seq = self.seq;
        let uri = self.segment_uri(seq);
        let key = self.storage_key(&uri);
        self.storage.put(&key, bytes).await?;
        self.playlist.push(Segment {
            seq,
            duration,
            uri,
            discontinuity: false,
        });
        // The segment is now stored and listed, so its number is consumed.
        // Advance before uploading the playlist: if that upload fails, a later
        // cut must not reuse this number (which would overwrite the stored
        // object and add a duplicate playlist entry).
        self.seq += 1;
        self.write_playlist().await
    }

    /// Renders the playlist and stores it under [`Self::playlist_key`].
    async fn write_playlist(&mut self) -> Result<()> {
        let body = self.playlist.render();
        let key = self.playlist_key();
        self.storage.put(&key, Bytes::from(body.into_bytes())).await
    }
}
#[async_trait]
impl<M, S> Packager for HlsSegmenter<M, S>
where
    M: Muxer,
    S: StorageBackend,
{
    /// Processes one frame.
    ///
    /// The init segment is handled first. The segment clock then decides
    /// whether the frame is skipped, whether the previous segment must be cut
    /// (and with what duration), and whether a new segment must be opened.
    /// Unless skipped, the frame is finally written to the muxer.
    async fn push(&mut self, frame: &MediaFrame) -> Result<()> {
        self.ensure_init_segment(frame).await?;

        let verdict = self.clock.observe(frame);
        if !verdict.skip {
            if let Some(elapsed) = verdict.cut_previous {
                self.cut(elapsed).await?;
            }
            if verdict.open_new {
                self.muxer.start_segment()?;
            }
            self.muxer.write(frame)?;
        }
        Ok(())
    }

    /// Ends the stream: cuts whatever segment the clock still reports as
    /// pending, marks the playlist as finished, and stores it one last time.
    async fn finish(&mut self) -> Result<()> {
        let pending = self.clock.flush();
        if let Some(tail) = pending {
            self.cut(tail).await?;
        }
        self.playlist.finish();
        self.write_playlist().await
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::testing::{video_frame, InMemoryStorage};

    /// Feeds five pairs of frames one second apart through a passthrough
    /// muxer, then checks that a finished playlist and a non-empty first
    /// segment were stored.
    #[tokio::test]
    async fn segments_on_keyframe_after_target_and_writes_playlist() {
        let storage = InMemoryStorage::new();
        let muxer = PassthroughMuxer::new("ts");
        let mut segmenter = HlsSegmenter::new(muxer, storage.clone(), "live/cam", 2, 5);

        // NOTE(review): the boolean passed to `video_frame` presumably marks a
        // keyframe - confirm against `crate::testing`.
        for pts in (0..5).map(|i| i * 1000) {
            segmenter.push(&video_frame(pts, true)).await.unwrap();
            segmenter.push(&video_frame(pts + 500, false)).await.unwrap();
        }
        segmenter.finish().await.unwrap();

        let raw = storage.get("live/cam/index.m3u8").await.unwrap();
        let rendered = String::from_utf8(raw.to_vec()).unwrap();
        assert!(rendered.contains("#EXTM3U"));
        assert!(rendered.contains("#EXT-X-ENDLIST"));

        assert!(storage.get("live/cam/seg0.ts").await.is_ok());
        assert!(!storage.get("live/cam/seg0.ts").await.unwrap().is_empty());
    }

    /// Test muxer that behaves like a passthrough muxer for media data but
    /// additionally returns the config record as its init segment and
    /// reports a fixed codec string.
    struct InitMuxer {
        buf: BytesMut,
    }

    impl Muxer for InitMuxer {
        fn extension(&self) -> &'static str {
            "m4s"
        }

        fn start_segment(&mut self) -> Result<()> {
            self.buf.clear();
            Ok(())
        }

        fn write(&mut self, frame: &MediaFrame) -> Result<()> {
            let payload = &frame.data;
            self.buf.put_slice(payload);
            Ok(())
        }

        fn finish_segment(&mut self) -> Result<Bytes> {
            let segment = std::mem::take(&mut self.buf);
            Ok(segment.freeze())
        }

        fn init_segment(&mut self, _codec: CodecId, config_record: &[u8]) -> Result<Option<Bytes>> {
            let init = Bytes::copy_from_slice(config_record);
            Ok(Some(init))
        }

        fn codec_string(&self) -> Option<String> {
            Some("hvc1.1.6.L120.B0".into())
        }
    }

    /// A `CONFIG` frame pushed through a muxer that provides an init segment
    /// must result in a stored `init.m4s` and a matching `#EXT-X-MAP` line.
    #[tokio::test]
    async fn fmp4_muxer_writes_init_segment_and_ext_x_map() {
        use crate::FrameFlags;

        let storage = InMemoryStorage::new();
        let muxer = InitMuxer {
            buf: BytesMut::new(),
        };
        let mut segmenter = HlsSegmenter::new(muxer, storage.clone(), "live/hevc", 2, 5);
        assert_eq!(
            segmenter.codec_string().as_deref(),
            Some("hvc1.1.6.L120.B0")
        );

        let mut config_frame = video_frame(0, true);
        config_frame.codec = CodecId::H265;
        config_frame.flags |= FrameFlags::CONFIG;
        segmenter.push(&config_frame).await.unwrap();

        for pts in (1..6).map(|i| i * 1000) {
            segmenter.push(&video_frame(pts, true)).await.unwrap();
        }
        segmenter.finish().await.unwrap();

        assert!(storage.get("live/hevc/init.m4s").await.is_ok());
        let raw = storage.get("live/hevc/index.m3u8").await.unwrap();
        let rendered = String::from_utf8(raw.to_vec()).unwrap();
        assert!(rendered.contains("#EXT-X-MAP:URI=\"init.m4s\""));
    }
}