use bytes::Bytes;
use lvqr_core::now_unix_ms;
use lvqr_fragment::{Fragment, FragmentBroadcasterRegistry, FragmentFlags, FragmentMeta, SCTE35_TRACK};
/// Installs `init` as the init segment of the `(broadcast, track)` broadcaster.
///
/// The broadcaster is obtained through
/// [`FragmentBroadcasterRegistry::get_or_create`], passing a [`FragmentMeta`]
/// built from `codec` and `timescale`; the init segment is then set on
/// whatever broadcaster the registry hands back.
pub fn publish_init(
    registry: &FragmentBroadcasterRegistry,
    broadcast: &str,
    track: &str,
    codec: &str,
    timescale: u32,
    init: Bytes,
) {
    let meta = FragmentMeta::new(codec, timescale);
    registry
        .get_or_create(broadcast, track, meta)
        .set_init_segment(init);
}
/// Emits `frag` on the `(broadcast, track)` broadcaster.
///
/// A fragment whose `ingest_time_ms` is `0` is treated as unstamped and
/// receives the value of [`now_unix_ms`] before it is emitted; any non-zero
/// value supplied by the caller is left untouched. The broadcaster is obtained
/// through [`FragmentBroadcasterRegistry::get_or_create`] with a
/// [`FragmentMeta`] built from `codec` and `timescale`.
pub fn publish_fragment(
    registry: &FragmentBroadcasterRegistry,
    broadcast: &str,
    track: &str,
    codec: &str,
    timescale: u32,
    mut frag: Fragment,
) {
    // Stamp first, so the timestamp is taken before the registry lookup.
    let is_unstamped = frag.ingest_time_ms == 0;
    if is_unstamped {
        frag.ingest_time_ms = now_unix_ms();
    }

    let meta = FragmentMeta::new(codec, timescale);
    registry.get_or_create(broadcast, track, meta).emit(frag);
}
/// Emits a SCTE-35 `section` as a fragment on the broadcast's [`SCTE35_TRACK`].
///
/// The broadcaster is obtained with codec `"scte35"` and a timescale of
/// `90_000`. The emitted fragment carries `event_id` as its second constructor
/// argument (the tests read it back as `group_id`), `pts_90k` in both
/// timestamp slots, `duration_90k` as its duration, the
/// [`FragmentFlags::KEYFRAME`] flags, and an ingest time taken from
/// [`now_unix_ms`].
pub fn publish_scte35(
    registry: &FragmentBroadcasterRegistry,
    broadcast: &str,
    event_id: u64,
    pts_90k: u64,
    duration_90k: u64,
    section: Bytes,
) {
    let meta = FragmentMeta::new("scte35", 90_000);
    let broadcaster = registry.get_or_create(broadcast, SCTE35_TRACK, meta);

    // NOTE(review): the two literal zeros are passed through to `Fragment::new`
    // unchanged; their field meaning is not visible here, so confirm against
    // that constructor's signature.
    let unstamped = Fragment::new(
        SCTE35_TRACK,
        event_id,
        0,
        0,
        pts_90k,
        pts_90k,
        duration_90k,
        FragmentFlags::KEYFRAME,
        section,
    );

    broadcaster.emit(unstamped.with_ingest_time_ms(now_unix_ms()));
}
#[cfg(test)]
mod tests {
    use super::*;
    use lvqr_fragment::FragmentStream;

    /// Builds a fragment on track `"0.mp4"` whose two timestamp arguments are
    /// both `seq * 1000`, with a duration of `1000` and keyframe or delta
    /// flags chosen by `is_key`.
    fn mk_frag(seq: u64, is_key: bool, payload: &'static [u8]) -> Fragment {
        Fragment::new(
            "0.mp4",
            seq,
            0,
            0,
            seq * 1000,
            seq * 1000,
            1000,
            if is_key {
                FragmentFlags::KEYFRAME
            } else {
                FragmentFlags::DELTA
            },
            Bytes::from_static(payload),
        )
    }

    /// `publish_init` must create the broadcaster and record both the init
    /// segment and the timescale in its metadata.
    #[tokio::test]
    async fn publish_init_installs_meta_on_registry() {
        let reg = FragmentBroadcasterRegistry::new();
        publish_init(&reg, "bcast", "0.mp4", "avc1", 90_000, Bytes::from_static(b"INIT"));
        let bc = reg.get("bcast", "0.mp4").expect("broadcaster created");
        assert_eq!(bc.meta().init_segment.as_ref().unwrap().as_ref(), b"INIT");
        assert_eq!(bc.meta().timescale, 90_000);
    }

    /// A subscriber attached before publishing receives the fragment intact.
    #[tokio::test]
    async fn publish_fragment_reaches_subscriber() {
        let reg = FragmentBroadcasterRegistry::new();
        let bc = reg.get_or_create("bcast", "0.mp4", FragmentMeta::new("avc1", 90_000));
        let mut sub = bc.subscribe();
        publish_fragment(&reg, "bcast", "0.mp4", "avc1", 90_000, mk_frag(1, true, b"kf"));
        let delivered = sub.next_fragment().await.expect("broadcaster saw fragment");
        assert_eq!(delivered.payload.as_ref(), b"kf");
        assert!(delivered.flags.keyframe);
    }

    /// `publish_scte35` must emit on `SCTE35_TRACK` and map its arguments to
    /// the fragment's group id, pts, duration and payload.
    #[tokio::test]
    async fn publish_scte35_lands_on_reserved_track() {
        let reg = FragmentBroadcasterRegistry::new();
        let bc = reg.get_or_create("live", SCTE35_TRACK, FragmentMeta::new("scte35", 90_000));
        let mut sub = bc.subscribe();
        let section = Bytes::from_static(&[0xFC, 0x30, 0x11, 0x00, 0x00]);
        publish_scte35(&reg, "live", 0xDEADBEEF, 8_100_000, 2_700_000, section.clone());
        let frag = sub.next_fragment().await.expect("scte35 fragment delivered");
        assert_eq!(frag.track_id, SCTE35_TRACK);
        assert_eq!(frag.group_id, 0xDEADBEEF);
        assert_eq!(frag.pts, 8_100_000);
        assert_eq!(frag.duration, 2_700_000);
        assert_eq!(frag.payload, section);
        assert!(frag.flags.keyframe);
    }

    /// A fragment published before anyone subscribes is not replayed: the
    /// first fragment a late subscriber sees is the one published afterwards.
    #[tokio::test]
    async fn publish_fragment_before_subscribe_is_dropped() {
        let reg = FragmentBroadcasterRegistry::new();
        publish_fragment(&reg, "bcast", "0.mp4", "avc1", 90_000, mk_frag(1, true, b"early"));
        let bc = reg.get("bcast", "0.mp4").expect("broadcaster created by publish");
        let mut sub = bc.subscribe();
        publish_fragment(&reg, "bcast", "0.mp4", "avc1", 90_000, mk_frag(2, false, b"late"));
        let delivered = sub.next_fragment().await.expect("late fragment arrives");
        assert_eq!(delivered.payload.as_ref(), b"late");
    }
}