mod engine;
mod playlist;
use engine::{SegmentEngine, BANDWIDTH_HINT};
#[cfg(feature = "mpegts")]
#[cfg_attr(docsrs, doc(cfg(feature = "mpegts")))]
mod mpegts;
#[cfg(feature = "mpegts")]
#[cfg_attr(docsrs, doc(cfg(feature = "mpegts")))]
pub use mpegts::MpegTsMuxer;
#[cfg(feature = "fmp4")]
#[cfg_attr(docsrs, doc(cfg(feature = "fmp4")))]
mod fmp4;
#[cfg(feature = "fmp4")]
#[cfg_attr(docsrs, doc(cfg(feature = "fmp4")))]
pub use fmp4::Fmp4Muxer;
pub use playlist::{render_master, HlsPlaylist, Part, Segment};
#[cfg(feature = "dash")]
#[cfg_attr(docsrs, doc(cfg(feature = "dash")))]
mod dash;
#[cfg(feature = "dash")]
#[cfg_attr(docsrs, doc(cfg(feature = "dash")))]
pub use dash::{DashManifest, DashPackager};
use crate::traits::StorageBackend;
use crate::{CodecId, FrameFlags, MediaFrame, Result};
use async_trait::async_trait;
use bytes::{BufMut, Bytes, BytesMut};
/// Container-format backend driven by the segmenters in this module.
///
/// A muxer collects the frames of the segment currently being built and hands
/// the serialized bytes back on request. Only the first four methods are
/// required; the remaining ones default to "not supported" (`Ok(None)` /
/// `None`).
pub trait Muxer: Send {
    /// File extension of the media this muxer produces (the tests in this
    /// file use `"ts"` and `"m4s"`).
    fn extension(&self) -> &'static str;

    /// Begins a new segment.
    fn start_segment(&mut self) -> Result<()>;

    /// Adds `frame` to the segment currently being built.
    fn write(&mut self, frame: &MediaFrame) -> Result<()>;

    /// Ends the current segment and returns its bytes.
    fn finish_segment(&mut self) -> Result<Bytes>;

    /// Returns the bytes of a partial segment, if the muxer can produce one.
    ///
    /// The default implementation never produces parts and returns `Ok(None)`.
    fn take_partial(&mut self) -> Result<Option<Bytes>> {
        Ok(None)
    }

    /// Builds an initialization segment from one codec configuration record.
    ///
    /// The default implementation has no init segment and returns `Ok(None)`.
    fn init_segment(&mut self, _codec: CodecId, _config_record: &[u8]) -> Result<Option<Bytes>> {
        Ok(None)
    }

    /// Builds an initialization segment from a list of codec configurations.
    ///
    /// The default implementation forwards only the first entry of `configs`
    /// to [`Muxer::init_segment`] and returns `Ok(None)` when the list is
    /// empty; any further entries are ignored.
    fn build_init_from(&mut self, configs: &[(CodecId, Bytes)]) -> Result<Option<Bytes>> {
        if let Some((codec, record)) = configs.first() {
            self.init_segment(*codec, record)
        } else {
            Ok(None)
        }
    }

    /// Codec string this muxer can report on its own, if any.
    ///
    /// The default implementation returns `None`.
    fn codec_string(&self) -> Option<String> {
        None
    }
}
/// [`Muxer`] that appends the raw payload of every frame to a buffer and
/// returns that buffer unchanged as the segment (or part) bytes.
pub struct PassthroughMuxer {
/// Extension reported by `extension()`.
ext: &'static str,
/// Frame payloads accumulated since the buffer was last cleared or drained.
buf: BytesMut,
}
impl PassthroughMuxer {
    /// Creates a muxer with an empty buffer that reports `ext` as its file
    /// extension.
    pub fn new(ext: &'static str) -> Self {
        let buf = BytesMut::new();
        Self { ext, buf }
    }
}
impl Muxer for PassthroughMuxer {
    /// Returns the extension supplied to [`PassthroughMuxer::new`].
    fn extension(&self) -> &'static str {
        self.ext
    }

    /// Discards anything still buffered so the new segment starts empty.
    fn start_segment(&mut self) -> Result<()> {
        self.buf.clear();
        Ok(())
    }

    /// Appends the frame payload verbatim to the buffer.
    fn write(&mut self, frame: &MediaFrame) -> Result<()> {
        self.buf.put_slice(&frame.data);
        Ok(())
    }

    /// Drains the buffer and returns its contents, leaving it empty.
    fn finish_segment(&mut self) -> Result<Bytes> {
        let drained = std::mem::take(&mut self.buf);
        Ok(drained.freeze())
    }

    /// Drains the buffer as a part, or returns `Ok(None)` when nothing has
    /// been written since the last drain.
    fn take_partial(&mut self) -> Result<Option<Bytes>> {
        if self.buf.is_empty() {
            Ok(None)
        } else {
            // Draining for a part is the same operation as finishing a segment.
            self.finish_segment().map(Some)
        }
    }
}
/// Asynchronous sink that is fed media frames one at a time and finalized
/// once at the end of the stream.
#[async_trait]
pub trait Packager: Send {
/// Feeds one frame into the packager.
async fn push(&mut self, frame: &MediaFrame) -> Result<()>;
/// Signals end of stream so the implementation can flush pending output.
async fn finish(&mut self) -> Result<()>;
}
/// HLS packager: muxes frames into segments (and, in low-latency mode, parts),
/// stores them through a [`StorageBackend`], and keeps `index.m3u8` and
/// `master.m3u8` up to date under the configured prefix.
pub struct HlsSegmenter<M: Muxer, S: StorageBackend> {
/// Owns the muxer, the storage backend, the key prefix and the current
/// segment sequence number, and decides where segments are cut.
engine: SegmentEngine<M, S>,
/// Media playlist that is re-rendered to `index.m3u8` after each change.
playlist: HlsPlaylist,
/// Set once `master.m3u8` has been stored, so it is written only once.
master_written: bool,
/// Codec string parsed from the first video frame carrying the CONFIG flag;
/// `None` until such a frame has been seen (or if parsing yields nothing).
codec_string: Option<String>,
/// Part target in milliseconds; `0` means low-latency mode is off.
part_target_ms: i64,
/// Index of the next part within the current segment.
part_idx: u64,
/// PTS at which the part currently being accumulated started.
part_anchor_pts: Option<i64>,
/// PTS of the most recently written frame.
last_pts: i64,
/// Bytes of every part already stored for the current segment; they are
/// concatenated to form the full segment when it is cut.
seg_part_bytes: Vec<Bytes>,
}
impl<M: Muxer, S: StorageBackend> HlsSegmenter<M, S> {
    /// Smallest part target, in seconds, that [`Self::low_latency`] will use.
    const MIN_PART_TARGET_SECS: f64 = 0.05;

    /// Creates a segmenter in regular (non low-latency) mode.
    ///
    /// * `muxer` - container backend that serializes frames.
    /// * `storage` - destination for segments and playlists.
    /// * `prefix` - key prefix; playlists are stored as `<prefix>/index.m3u8`
    ///   and `<prefix>/master.m3u8`.
    /// * `target_duration` - segment target duration, forwarded unchanged to
    ///   both the segment engine and the playlist.
    /// * `window` - playlist window size, forwarded to [`HlsPlaylist::new`].
    pub fn new(
        muxer: M,
        storage: S,
        prefix: impl Into<String>,
        target_duration: u64,
        window: usize,
    ) -> Self {
        Self {
            engine: SegmentEngine::new(muxer, storage, prefix, target_duration),
            playlist: HlsPlaylist::new(target_duration, window),
            master_written: false,
            codec_string: None,
            part_target_ms: 0,
            part_idx: 0,
            part_anchor_pts: None,
            last_pts: 0,
            seg_part_bytes: Vec::new(),
        }
    }

    /// Returns the codec string for the stream, if one is known.
    ///
    /// The string parsed from the stream's own configuration frame wins; the
    /// muxer's self-reported codec string is used only as a fallback.
    pub fn codec_string(&self) -> Option<String> {
        self.codec_string
            .clone()
            .or_else(|| self.engine.muxer.codec_string())
    }

    /// Captures the codec string from the first video CONFIG frame and lets
    /// the engine emit the init segment, registering its URI in the playlist
    /// when the engine reports one.
    async fn ensure_init_segment(&mut self, frame: &MediaFrame) -> Result<()> {
        if self.codec_string.is_none()
            && frame.is_video()
            && frame.flags.contains(FrameFlags::CONFIG)
        {
            self.codec_string = crate::codec::dispatch::parse_config(frame.codec, &frame.data)
                .and_then(|p| crate::codec::dispatch::hls_codec_string(frame.codec, &p));
        }
        if let Some(uri) = self.engine.ensure_init(frame).await? {
            self.playlist.set_map(uri);
        }
        Ok(())
    }

    /// Enables low-latency mode with parts of roughly `part_target` seconds.
    ///
    /// Values below 0.05 s (and NaN, which `f64::max` discards) are raised to
    /// 0.05 s. The clamped value is used both for the playlist and for the
    /// part cutter so the advertised part target always matches the threshold
    /// at which parts are actually flushed.
    pub fn low_latency(mut self, part_target: f64) -> Self {
        let part_target = part_target.max(Self::MIN_PART_TARGET_SECS);
        self.playlist = self.playlist.low_latency(part_target);
        // PTS differences are compared against this value, so keep it in ms.
        self.part_target_ms = (part_target * 1000.0) as i64;
        self
    }

    /// Storage key of the media playlist.
    pub fn playlist_key(&self) -> String {
        format!("{}/index.m3u8", self.engine.prefix)
    }

    /// Storage key of the master playlist.
    pub fn master_key(&self) -> String {
        format!("{}/master.m3u8", self.engine.prefix)
    }

    /// Writes `master.m3u8` once, as soon as a codec string is available.
    ///
    /// Does nothing when the master playlist was already written or when no
    /// codec string is known yet; in the latter case it is retried on the
    /// next call.
    async fn ensure_master_playlist(&mut self) -> Result<()> {
        if self.master_written {
            return Ok(());
        }
        let Some(codecs) = self.codec_string() else {
            return Ok(());
        };
        let body = playlist::render_master("index.m3u8", Some(&codecs), BANDWIDTH_HINT);
        self.engine
            .storage
            .put(
                &self.engine.key("master.m3u8"),
                Bytes::from(body.into_bytes()),
            )
            .await?;
        // Only mark as written after the store succeeded so a failure is retried.
        self.master_written = true;
        Ok(())
    }

    /// Publishes whatever the muxer has buffered as the next part of the
    /// current segment.
    ///
    /// `end_pts` is the PTS at which the part ends; the part duration is the
    /// distance from the part anchor to `end_pts`, converted from milliseconds
    /// to seconds and floored at zero. When the muxer has nothing to hand out
    /// this is a no-op and no state changes.
    async fn flush_part(&mut self, end_pts: i64) -> Result<()> {
        let Some(bytes) = self.engine.muxer.take_partial()? else {
            return Ok(());
        };
        let anchor = self.part_anchor_pts.unwrap_or(end_pts);
        let duration = (end_pts - anchor).max(0) as f64 / 1000.0;
        let uri = self.engine.part_uri(self.engine.seq, self.part_idx);
        let key = self.engine.key(&uri);
        self.engine.storage.put(&key, bytes.clone()).await?;
        // Keep a handle to the bytes so the full segment can be assembled at cut time.
        self.seg_part_bytes.push(bytes);
        self.playlist.add_pending_part(Part {
            uri,
            duration,
            // Only the first part of a segment is flagged as independent.
            independent: self.part_idx == 0,
        });
        self.playlist
            .set_preload_hint(self.engine.part_uri(self.engine.seq, self.part_idx + 1));
        self.part_idx += 1;
        self.part_anchor_pts = Some(end_pts);
        self.write_playlist().await
    }

    /// Closes the current segment, stores it, records it in the playlist and
    /// advances the sequence number.
    ///
    /// `duration` is the segment duration reported by the engine. In
    /// low-latency mode the trailing part is flushed first and the full
    /// segment is the concatenation of all parts; if the muxer produced no
    /// part bytes at all, the segment falls back to `finish_segment()`.
    async fn cut(&mut self, duration: f64) -> Result<()> {
        let uri = self.engine.segment_uri(self.engine.seq);
        if self.part_target_ms > 0 {
            self.flush_part(self.last_pts).await?;
            // NOTE(review): the flush above leaves the preload hint pointing at
            // a further part of the segment being closed; confirm that
            // `commit_segment` resets or replaces it.
            let total: usize = self.seg_part_bytes.iter().map(Bytes::len).sum();
            let full = if total == 0 {
                // The muxer produced no part bytes: use its whole-segment output.
                self.engine.muxer.finish_segment()?
            } else {
                // Size the buffer up front so concatenation never reallocates.
                let mut joined = BytesMut::with_capacity(total);
                for part in &self.seg_part_bytes {
                    joined.put_slice(part);
                }
                joined.freeze()
            };
            let key = self.engine.key(&uri);
            self.engine.storage.put(&key, full).await?;
            self.playlist.commit_segment(Segment {
                seq: self.engine.seq,
                duration,
                uri,
                discontinuity: false,
                parts: Vec::new(),
            });
            self.seg_part_bytes.clear();
            self.part_idx = 0;
            self.part_anchor_pts = None;
        } else {
            let bytes = self.engine.muxer.finish_segment()?;
            let key = self.engine.key(&uri);
            self.engine.storage.put(&key, bytes).await?;
            self.playlist.push(Segment {
                seq: self.engine.seq,
                duration,
                uri,
                discontinuity: false,
                parts: Vec::new(),
            });
        }
        self.write_playlist().await?;
        self.engine.seq += 1;
        Ok(())
    }

    /// Renders the media playlist and stores it as `index.m3u8`.
    async fn write_playlist(&mut self) -> Result<()> {
        let body = self.playlist.render();
        self.engine
            .storage
            .put(
                &self.engine.key("index.m3u8"),
                Bytes::from(body.into_bytes()),
            )
            .await
    }
}
#[async_trait]
impl<M: Muxer, S: StorageBackend> Packager for HlsSegmenter<M, S> {
    /// Feeds one frame through the segmenter.
    ///
    /// Init segment and master playlist are attempted first. The engine then
    /// decides whether the frame is skipped, whether the previous segment is
    /// cut and whether a new one is opened. After the frame is written, a part
    /// is flushed in low-latency mode once the part target has elapsed since
    /// the part anchor.
    async fn push(&mut self, frame: &MediaFrame) -> Result<()> {
        self.ensure_init_segment(frame).await?;
        self.ensure_master_playlist().await?;

        let verdict = self.engine.observe(frame);
        if verdict.skip {
            return Ok(());
        }
        if let Some(closed_duration) = verdict.cut_previous {
            self.cut(closed_duration).await?;
        }
        if verdict.open_new {
            self.engine.muxer.start_segment()?;
            // Fresh segment: restart part bookkeeping from this frame.
            self.seg_part_bytes.clear();
            self.part_idx = 0;
            self.part_anchor_pts = Some(frame.pts);
        }

        self.engine.muxer.write(frame)?;
        self.last_pts = frame.pts;

        let target_ms = self.part_target_ms;
        let part_due = target_ms > 0
            && self
                .part_anchor_pts
                .map_or(false, |anchor| frame.pts - anchor >= target_ms);
        if part_due {
            self.flush_part(frame.pts).await?;
        }
        Ok(())
    }

    /// Cuts the segment still in progress (if the engine reports one), marks
    /// the playlist as finished and writes it out one last time.
    async fn finish(&mut self) -> Result<()> {
        let trailing = self.engine.flush();
        if let Some(duration) = trailing {
            self.cut(duration).await?;
        }
        self.playlist.finish();
        self.write_playlist().await
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::testing::{video_frame, InMemoryStorage};

    /// Minimal muxer that behaves like a passthrough but also reports an init
    /// segment and a fixed HEVC codec string. It does not override
    /// `take_partial`, so it never produces parts.
    struct InitMuxer {
        buf: BytesMut,
    }

    impl Muxer for InitMuxer {
        fn extension(&self) -> &'static str {
            "m4s"
        }

        fn start_segment(&mut self) -> Result<()> {
            self.buf.clear();
            Ok(())
        }

        fn write(&mut self, frame: &MediaFrame) -> Result<()> {
            self.buf.put_slice(&frame.data);
            Ok(())
        }

        fn finish_segment(&mut self) -> Result<Bytes> {
            let drained = std::mem::take(&mut self.buf);
            Ok(drained.freeze())
        }

        fn init_segment(&mut self, _codec: CodecId, config_record: &[u8]) -> Result<Option<Bytes>> {
            // The configuration record itself is echoed back as the init segment.
            Ok(Some(Bytes::copy_from_slice(config_record)))
        }

        fn codec_string(&self) -> Option<String> {
            Some("hvc1.1.6.L120.B0".into())
        }
    }

    /// Builds an `InitMuxer` with an empty buffer.
    fn init_muxer() -> InitMuxer {
        InitMuxer {
            buf: BytesMut::new(),
        }
    }

    /// Loads `key` from `store` and decodes it as UTF-8, panicking if the key
    /// is missing or the content is not valid UTF-8.
    async fn read_text(store: &InMemoryStorage, key: &str) -> String {
        let raw = store.get(key).await.unwrap();
        String::from_utf8(raw.to_vec()).unwrap()
    }

    #[tokio::test]
    async fn segments_on_keyframe_after_target_and_writes_playlist() {
        let store = InMemoryStorage::new();
        let muxer = PassthroughMuxer::new("ts");
        let mut segmenter = HlsSegmenter::new(muxer, store.clone(), "live/cam", 2, 5);

        // One keyframe per second, each followed by a delta frame 500 ms later.
        for second in 0..5 {
            let key_pts = second * 1000;
            segmenter.push(&video_frame(key_pts, true)).await.unwrap();
            segmenter
                .push(&video_frame(key_pts + 500, false))
                .await
                .unwrap();
        }
        segmenter.finish().await.unwrap();

        let text = read_text(&store, "live/cam/index.m3u8").await;
        assert!(text.contains("#EXTM3U"));
        assert!(text.contains("#EXT-X-ENDLIST"));
        assert!(store.get("live/cam/seg0.ts").await.is_ok());
        assert!(!store.get("live/cam/seg0.ts").await.unwrap().is_empty());
    }

    #[tokio::test]
    async fn ll_hls_emits_part_files_and_ext_x_part() {
        let store = InMemoryStorage::new();
        let muxer = PassthroughMuxer::new("m4s");
        let mut segmenter =
            HlsSegmenter::new(muxer, store.clone(), "live/ll", 2, 5).low_latency(0.5);

        // First segment: a keyframe followed by delta frames every 250 ms.
        segmenter.push(&video_frame(0, true)).await.unwrap();
        for step in 1..8 {
            segmenter
                .push(&video_frame(step * 250, false))
                .await
                .unwrap();
        }
        // The keyframe at 2 s closes segment 0 and opens segment 1.
        segmenter.push(&video_frame(2000, true)).await.unwrap();
        segmenter.push(&video_frame(2250, false)).await.unwrap();
        segmenter.finish().await.unwrap();

        let text = read_text(&store, "live/ll/index.m3u8").await;
        assert!(text.contains("#EXT-X-PART-INF:PART-TARGET=0.500"));
        assert!(
            text.contains("#EXT-X-PART:DURATION="),
            "renders EXT-X-PART lines"
        );
        assert!(
            text.contains("INDEPENDENT=YES"),
            "first part is independent"
        );
        assert!(
            store.get("live/ll/seg0.0.m4s").await.is_ok(),
            "part 0 written"
        );
        assert!(
            store.get("live/ll/seg0.1.m4s").await.is_ok(),
            "part 1 written"
        );
        assert!(
            store.get("live/ll/seg0.m4s").await.is_ok(),
            "full segment 0 written"
        );

        // Walk the stored parts of segment 0 in order until the first miss.
        let mut concat = Vec::new();
        let mut p = 0;
        while let Ok(part) = store.get(&format!("live/ll/seg0.{p}.m4s")).await {
            concat.extend_from_slice(&part);
            p += 1;
        }
        assert!(p >= 2, "segment 0 had multiple parts");
        let full = store.get("live/ll/seg0.m4s").await.unwrap();
        assert_eq!(&concat[..], &full[..], "full segment == concat of parts");
    }

    #[tokio::test]
    async fn ll_hls_falls_back_to_full_segment_when_muxer_has_no_parts() {
        let store = InMemoryStorage::new();
        let mut segmenter =
            HlsSegmenter::new(init_muxer(), store.clone(), "live/nopart", 2, 5).low_latency(0.5);

        segmenter.push(&video_frame(0, true)).await.unwrap();
        for step in 1..6 {
            // Every second frame is a keyframe.
            let is_key = step % 2 == 0;
            segmenter
                .push(&video_frame(step * 1000, is_key))
                .await
                .unwrap();
        }
        segmenter.finish().await.unwrap();

        let first_segment = store.get("live/nopart/seg0.m4s").await.unwrap();
        assert!(
            !first_segment.is_empty(),
            "LL-HLS segment must not be empty"
        );
    }

    #[tokio::test]
    async fn writes_master_playlist_with_hevc_codecs() {
        use crate::FrameFlags;

        let store = InMemoryStorage::new();
        let mut segmenter = HlsSegmenter::new(init_muxer(), store.clone(), "live/hevc", 2, 5);

        let mut config_frame = video_frame(0, true);
        config_frame.codec = CodecId::H265;
        config_frame.flags |= FrameFlags::CONFIG;
        segmenter.push(&config_frame).await.unwrap();
        for second in 1..4 {
            segmenter
                .push(&video_frame(second * 1000, true))
                .await
                .unwrap();
        }
        segmenter.finish().await.unwrap();

        let master = read_text(&store, "live/hevc/master.m3u8").await;
        assert!(master.contains("#EXT-X-STREAM-INF:"));
        assert!(
            master.contains("CODECS=\"hvc1.1.6.L120.B0\""),
            "master advertises HEVC codec string: {master}"
        );
        assert!(master.contains("index.m3u8"));
    }

    #[tokio::test]
    async fn fmp4_muxer_writes_init_segment_and_ext_x_map() {
        use crate::FrameFlags;

        let store = InMemoryStorage::new();
        let mut segmenter = HlsSegmenter::new(init_muxer(), store.clone(), "live/hevc", 2, 5);
        // Before any frame arrives the codec string comes from the muxer fallback.
        assert_eq!(
            segmenter.codec_string().as_deref(),
            Some("hvc1.1.6.L120.B0")
        );

        let mut config_frame = video_frame(0, true);
        config_frame.codec = CodecId::H265;
        config_frame.flags |= FrameFlags::CONFIG;
        segmenter.push(&config_frame).await.unwrap();
        for second in 1..6 {
            segmenter
                .push(&video_frame(second * 1000, true))
                .await
                .unwrap();
        }
        segmenter.finish().await.unwrap();

        assert!(store.get("live/hevc/init.m4s").await.is_ok());
        let playlist_text = read_text(&store, "live/hevc/index.m3u8").await;
        assert!(playlist_text.contains("#EXT-X-MAP:URI=\"init.m4s\""));
    }
}