use std::panic::AssertUnwindSafe;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use dashmap::DashMap;
use dashmap::mapref::entry::Entry;
use lvqr_fragment::{BroadcasterStream, FragmentBroadcaster, FragmentBroadcasterRegistry, FragmentStream};
use parking_lot::RwLock;
use tokio::runtime::Handle;
use tokio::task::JoinHandle;
use tracing::{info, warn};
use crate::transcoder::{Transcoder, TranscoderContext, TranscoderFactory};
#[derive(Debug, Default)]
pub struct TranscoderStats {
pub fragments_seen: AtomicU64,
pub panics: AtomicU64,
}
type StatsKey = (String, String, String, String);
struct RunnerInner {
registry: FragmentBroadcasterRegistry,
factories: RwLock<Vec<Arc<dyn TranscoderFactory>>>,
tasks: DashMap<StatsKey, JoinHandle<()>>,
stats: DashMap<StatsKey, Arc<TranscoderStats>>,
}
impl RunnerInner {
fn spawn_for(
&self,
broadcast: &str,
track: &str,
bc: &Arc<FragmentBroadcaster>,
factory: &Arc<dyn TranscoderFactory>,
) -> bool {
let rendition = factory.rendition().clone();
let key: StatsKey = (
factory.name().to_string(),
rendition.name.clone(),
broadcast.to_string(),
track.to_string(),
);
if self.tasks.contains_key(&key) {
return false;
}
let last_seg = broadcast.rsplit('/').next().unwrap_or(broadcast);
if self.factories.read().iter().any(|f| f.rendition().name == last_seg) {
return false;
}
let ctx = TranscoderContext {
broadcast: broadcast.to_string(),
track: track.to_string(),
meta: bc.meta(),
rendition: rendition.clone(),
};
let Some(transcoder) = factory.build(&ctx) else {
return false;
};
let handle = match Handle::try_current() {
Ok(h) => h,
Err(_) => {
warn!(
broadcast = %broadcast,
track = %track,
"TranscodeRunner: no tokio runtime; no drain spawned",
);
return false;
}
};
match self.tasks.entry(key.clone()) {
Entry::Occupied(_) => false,
Entry::Vacant(slot) => {
let sub = bc.subscribe();
let stat = Arc::clone(
self.stats
.entry(key.clone())
.or_insert_with(|| Arc::new(TranscoderStats::default()))
.value(),
);
let task = handle.spawn(drive(transcoder, key.0.clone(), ctx, sub, stat));
slot.insert(task);
true
}
}
}
}
#[derive(Clone)]
pub struct TranscodeRunnerHandle {
inner: Arc<RunnerInner>,
}
impl std::fmt::Debug for TranscodeRunnerHandle {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("TranscodeRunnerHandle")
.field("tracked_keys", &self.inner.stats.len())
.field("renditions", &self.inner.factories.read().len())
.finish()
}
}
impl TranscodeRunnerHandle {
pub fn fragments_seen(&self, transcoder: &str, rendition: &str, broadcast: &str, track: &str) -> u64 {
self.stat(transcoder, rendition, broadcast, track)
.map(|s| s.fragments_seen.load(Ordering::Relaxed))
.unwrap_or(0)
}
pub fn panics(&self, transcoder: &str, rendition: &str, broadcast: &str, track: &str) -> u64 {
self.stat(transcoder, rendition, broadcast, track)
.map(|s| s.panics.load(Ordering::Relaxed))
.unwrap_or(0)
}
pub fn tracked(&self) -> Vec<StatsKey> {
self.inner.stats.iter().map(|e| e.key().clone()).collect()
}
pub fn renditions(&self) -> Vec<crate::RenditionSpec> {
self.inner
.factories
.read()
.iter()
.map(|f| f.rendition().clone())
.collect()
}
pub fn add_rendition(&self, factory: Arc<dyn TranscoderFactory>) -> bool {
{
let mut factories = self.inner.factories.write();
if factories
.iter()
.any(|f| f.name() == factory.name() && f.rendition().name == factory.rendition().name)
{
return false;
}
factories.push(Arc::clone(&factory));
}
for (broadcast, track) in self.inner.registry.keys() {
if let Some(bc) = self.inner.registry.get(&broadcast, &track) {
self.inner.spawn_for(&broadcast, &track, &bc, &factory);
}
}
true
}
pub fn remove_rendition(&self, rendition: &str) -> usize {
self.inner.factories.write().retain(|f| f.rendition().name != rendition);
let mut aborted = 0usize;
self.inner.tasks.retain(|key, task| {
if key.1 == rendition {
task.abort();
aborted += 1;
false
} else {
true
}
});
aborted
}
fn stat(&self, transcoder: &str, rendition: &str, broadcast: &str, track: &str) -> Option<Arc<TranscoderStats>> {
self.inner
.stats
.get(&(
transcoder.to_string(),
rendition.to_string(),
broadcast.to_string(),
track.to_string(),
))
.map(|e| Arc::clone(e.value()))
}
}
#[derive(Default)]
pub struct TranscodeRunner {
factories: Vec<Arc<dyn TranscoderFactory>>,
}
impl TranscodeRunner {
pub fn new() -> Self {
Self::default()
}
pub fn with_factory<F: TranscoderFactory>(mut self, factory: F) -> Self {
self.factories.push(Arc::new(factory));
self
}
pub fn with_factory_arc(mut self, factory: Arc<dyn TranscoderFactory>) -> Self {
self.factories.push(factory);
self
}
pub fn with_ladder<F, Fn_>(mut self, ladder: Vec<crate::RenditionSpec>, build: Fn_) -> Self
where
F: TranscoderFactory,
Fn_: Fn(crate::RenditionSpec) -> F,
{
for spec in ladder {
self.factories.push(Arc::new(build(spec)));
}
self
}
pub fn factory_count(&self) -> usize {
self.factories.len()
}
pub fn install(self, registry: &FragmentBroadcasterRegistry) -> TranscodeRunnerHandle {
let inner = Arc::new(RunnerInner {
registry: registry.clone(),
factories: RwLock::new(self.factories),
tasks: DashMap::new(),
stats: DashMap::new(),
});
let inner_cb = Arc::clone(&inner);
registry.on_entry_created(move |broadcast, track, bc| {
let factories: Vec<Arc<dyn TranscoderFactory>> = inner_cb.factories.read().clone();
for factory in &factories {
inner_cb.spawn_for(broadcast, track, bc, factory);
}
});
info!(
renditions = inner.factories.read().len(),
"TranscodeRunner installed on FragmentBroadcasterRegistry",
);
TranscodeRunnerHandle { inner }
}
}
async fn drive(
mut transcoder: Box<dyn Transcoder>,
transcoder_name: String,
ctx: TranscoderContext,
mut sub: BroadcasterStream,
stats: Arc<TranscoderStats>,
) {
let rendition_name = ctx.rendition.name.clone();
sub.refresh_meta();
let ctx = TranscoderContext {
broadcast: ctx.broadcast,
track: ctx.track,
meta: sub.meta().clone(),
rendition: ctx.rendition,
};
let started = std::panic::catch_unwind(AssertUnwindSafe(|| transcoder.on_start(&ctx)));
if started.is_err() {
stats.panics.fetch_add(1, Ordering::Relaxed);
metrics::counter!(
"lvqr_transcode_panics_total",
"transcoder" => transcoder_name.clone(),
"rendition" => rendition_name.clone(),
"phase" => "start",
)
.increment(1);
warn!(
transcoder = %transcoder_name,
rendition = %rendition_name,
broadcast = %ctx.broadcast,
track = %ctx.track,
"Transcoder::on_start panicked; skipping drain loop",
);
return;
}
while let Some(frag) = sub.next_fragment().await {
stats.fragments_seen.fetch_add(1, Ordering::Relaxed);
metrics::counter!(
"lvqr_transcode_fragments_total",
"transcoder" => transcoder_name.clone(),
"rendition" => rendition_name.clone(),
)
.increment(1);
let result = std::panic::catch_unwind(AssertUnwindSafe(|| transcoder.on_fragment(&frag)));
if result.is_err() {
stats.panics.fetch_add(1, Ordering::Relaxed);
metrics::counter!(
"lvqr_transcode_panics_total",
"transcoder" => transcoder_name.clone(),
"rendition" => rendition_name.clone(),
"phase" => "fragment",
)
.increment(1);
warn!(
transcoder = %transcoder_name,
rendition = %rendition_name,
broadcast = %ctx.broadcast,
track = %ctx.track,
group_id = frag.group_id,
object_id = frag.object_id,
"Transcoder::on_fragment panicked; skipping fragment and continuing",
);
}
}
let stopped = std::panic::catch_unwind(AssertUnwindSafe(|| transcoder.on_stop()));
if stopped.is_err() {
stats.panics.fetch_add(1, Ordering::Relaxed);
metrics::counter!(
"lvqr_transcode_panics_total",
"transcoder" => transcoder_name.clone(),
"rendition" => rendition_name.clone(),
"phase" => "stop",
)
.increment(1);
warn!(
transcoder = %transcoder_name,
rendition = %rendition_name,
broadcast = %ctx.broadcast,
track = %ctx.track,
"Transcoder::on_stop panicked",
);
}
info!(
transcoder = %transcoder_name,
rendition = %rendition_name,
broadcast = %ctx.broadcast,
track = %ctx.track,
seen = stats.fragments_seen.load(Ordering::Relaxed),
panics = stats.panics.load(Ordering::Relaxed),
"TranscodeRunner: drain terminated",
);
}
#[cfg(test)]
mod tests {
use super::*;
use crate::passthrough::PassthroughTranscoderFactory;
use crate::rendition::RenditionSpec;
use bytes::Bytes;
use lvqr_fragment::{Fragment, FragmentFlags, FragmentMeta};
use parking_lot::Mutex as PMutex;
use std::time::Duration;
fn meta() -> FragmentMeta {
FragmentMeta::new("avc1.640028", 90_000)
}
fn frag(idx: u64) -> Fragment {
Fragment::new(
"0.mp4",
idx,
0,
0,
idx * 1000,
idx * 1000,
1000,
FragmentFlags::DELTA,
Bytes::from(vec![0xAB; 16]),
)
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn passthrough_sees_every_fragment_and_stops() {
let registry = FragmentBroadcasterRegistry::new();
let handle = TranscodeRunner::new()
.with_factory(PassthroughTranscoderFactory::new(RenditionSpec::preset_720p()))
.install(®istry);
let bc = registry.get_or_create("live/demo", "0.mp4", meta());
for i in 0..5 {
bc.emit(frag(i));
}
drop(bc);
registry.remove("live/demo", "0.mp4");
tokio::time::sleep(Duration::from_millis(150)).await;
assert_eq!(handle.fragments_seen("passthrough", "720p", "live/demo", "0.mp4"), 5);
assert_eq!(handle.panics("passthrough", "720p", "live/demo", "0.mp4"), 0);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn default_ladder_spawns_one_task_per_rendition() {
let registry = FragmentBroadcasterRegistry::new();
let handle = TranscodeRunner::new()
.with_ladder(RenditionSpec::default_ladder(), PassthroughTranscoderFactory::new)
.install(®istry);
let bc = registry.get_or_create("live/ladder", "0.mp4", meta());
bc.emit(frag(0));
bc.emit(frag(1));
tokio::time::sleep(Duration::from_millis(100)).await;
let mut tracked = handle.tracked();
tracked.sort();
assert_eq!(tracked.len(), 3, "one drain task per rendition");
for (_transcoder, rendition, _broadcast, _track) in &tracked {
let seen = handle.fragments_seen("passthrough", rendition, "live/ladder", "0.mp4");
assert_eq!(seen, 2, "rendition {rendition} saw both fragments");
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn factory_opt_out_skips_non_video_tracks() {
let registry = FragmentBroadcasterRegistry::new();
let handle = TranscodeRunner::new()
.with_factory(PassthroughTranscoderFactory::new(RenditionSpec::preset_720p()))
.install(®istry);
let bc_audio = registry.get_or_create("live/demo", "1.mp4", FragmentMeta::new("mp4a.40.2", 48_000));
bc_audio.emit(frag(0));
tokio::time::sleep(Duration::from_millis(80)).await;
assert!(handle.tracked().is_empty());
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn panic_in_on_fragment_is_caught_and_counted() {
struct PanicAtTwo;
impl Transcoder for PanicAtTwo {
fn on_fragment(&mut self, fragment: &Fragment) {
if fragment.group_id == 2 {
panic!("simulated encoder fault at group 2");
}
}
}
struct PanicAtTwoFactory {
rendition: RenditionSpec,
}
impl TranscoderFactory for PanicAtTwoFactory {
fn name(&self) -> &str {
"panicky"
}
fn rendition(&self) -> &RenditionSpec {
&self.rendition
}
fn build(&self, _ctx: &TranscoderContext) -> Option<Box<dyn Transcoder>> {
Some(Box::new(PanicAtTwo))
}
}
let registry = FragmentBroadcasterRegistry::new();
let handle = TranscodeRunner::new()
.with_factory(PanicAtTwoFactory {
rendition: RenditionSpec::preset_720p(),
})
.install(®istry);
let bc = registry.get_or_create("live/panic", "0.mp4", meta());
for i in 0..5 {
bc.emit(frag(i));
}
tokio::time::sleep(Duration::from_millis(120)).await;
assert_eq!(handle.fragments_seen("panicky", "720p", "live/panic", "0.mp4"), 5);
assert_eq!(handle.panics("panicky", "720p", "live/panic", "0.mp4"), 1);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn panic_in_on_start_skips_drain_loop() {
struct PanicStart;
impl Transcoder for PanicStart {
fn on_start(&mut self, _ctx: &TranscoderContext) {
panic!("simulated start failure");
}
fn on_fragment(&mut self, _fragment: &Fragment) {
unreachable!("on_fragment must not run after on_start panics");
}
}
struct PanicStartFactory {
rendition: RenditionSpec,
}
impl TranscoderFactory for PanicStartFactory {
fn name(&self) -> &str {
"bad_start"
}
fn rendition(&self) -> &RenditionSpec {
&self.rendition
}
fn build(&self, _ctx: &TranscoderContext) -> Option<Box<dyn Transcoder>> {
Some(Box::new(PanicStart))
}
}
let registry = FragmentBroadcasterRegistry::new();
let handle = TranscodeRunner::new()
.with_factory(PanicStartFactory {
rendition: RenditionSpec::preset_480p(),
})
.install(®istry);
let bc = registry.get_or_create("live/panic-start", "0.mp4", meta());
bc.emit(frag(0));
bc.emit(frag(1));
tokio::time::sleep(Duration::from_millis(100)).await;
assert_eq!(
handle.fragments_seen("bad_start", "480p", "live/panic-start", "0.mp4"),
0
);
assert_eq!(handle.panics("bad_start", "480p", "live/panic-start", "0.mp4"), 1);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn empty_runner_installs_callback_but_spawns_nothing() {
let registry = FragmentBroadcasterRegistry::new();
let handle = TranscodeRunner::new().install(®istry);
let bc = registry.get_or_create("live/empty", "0.mp4", meta());
bc.emit(frag(0));
tokio::time::sleep(Duration::from_millis(50)).await;
assert!(handle.tracked().is_empty());
}
#[test]
fn runner_default_is_empty() {
let r = TranscodeRunner::default();
assert_eq!(r.factory_count(), 0);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn add_rendition_spawns_for_existing_live_source() {
let registry = FragmentBroadcasterRegistry::new();
let handle = TranscodeRunner::new()
.with_factory(PassthroughTranscoderFactory::new(RenditionSpec::preset_720p()))
.install(®istry);
let bc = registry.get_or_create("live/x", "0.mp4", meta());
bc.emit(frag(0));
tokio::time::sleep(Duration::from_millis(60)).await;
assert!(handle.add_rendition(Arc::new(
PassthroughTranscoderFactory::new(RenditionSpec::preset_480p())
)));
tokio::time::sleep(Duration::from_millis(60)).await;
bc.emit(frag(1));
bc.emit(frag(2));
tokio::time::sleep(Duration::from_millis(120)).await;
assert_eq!(handle.fragments_seen("passthrough", "720p", "live/x", "0.mp4"), 3);
let s480 = handle.fragments_seen("passthrough", "480p", "live/x", "0.mp4");
assert!(s480 >= 2, "runtime-added 480p must see post-add fragments; saw {s480}");
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn add_rendition_rejects_duplicate_name() {
let registry = FragmentBroadcasterRegistry::new();
let handle = TranscodeRunner::new()
.with_factory(PassthroughTranscoderFactory::new(RenditionSpec::preset_720p()))
.install(®istry);
assert!(!handle.add_rendition(Arc::new(
PassthroughTranscoderFactory::new(RenditionSpec::preset_720p())
)));
assert!(handle.add_rendition(Arc::new(
PassthroughTranscoderFactory::new(RenditionSpec::preset_240p())
)));
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn remove_rendition_aborts_its_drain_and_leaves_others() {
let registry = FragmentBroadcasterRegistry::new();
let handle = TranscodeRunner::new()
.with_ladder(RenditionSpec::default_ladder(), PassthroughTranscoderFactory::new)
.install(®istry);
let bc = registry.get_or_create("live/r", "0.mp4", meta());
bc.emit(frag(0));
tokio::time::sleep(Duration::from_millis(80)).await;
let aborted = handle.remove_rendition("480p");
assert_eq!(aborted, 1, "exactly the 480p drain task aborts");
tokio::time::sleep(Duration::from_millis(40)).await;
bc.emit(frag(1));
bc.emit(frag(2));
tokio::time::sleep(Duration::from_millis(120)).await;
let s720 = handle.fragments_seen("passthrough", "720p", "live/r", "0.mp4");
let s480 = handle.fragments_seen("passthrough", "480p", "live/r", "0.mp4");
assert!(s720 >= 3, "surviving 720p keeps draining; saw {s720}");
assert!(s480 < s720, "removed 480p stopped draining ({s480}) vs 720p ({s720})");
let names: Vec<String> = handle.renditions().iter().map(|r| r.name.clone()).collect();
assert!(!names.contains(&"480p".to_string()), "480p gone from ladder: {names:?}");
assert!(names.contains(&"720p".to_string()));
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn does_not_transcode_a_rendition_output_broadcast() {
let registry = FragmentBroadcasterRegistry::new();
let handle = TranscodeRunner::new()
.with_factory(PassthroughTranscoderFactory::new(RenditionSpec::preset_720p()))
.install(®istry);
let src = registry.get_or_create("live/x", "0.mp4", meta());
let output = registry.get_or_create("live/x/720p", "0.mp4", meta());
src.emit(frag(0));
output.emit(frag(0));
tokio::time::sleep(Duration::from_millis(100)).await;
assert_eq!(handle.fragments_seen("passthrough", "720p", "live/x", "0.mp4"), 1);
assert_eq!(
handle.fragments_seen("passthrough", "720p", "live/x/720p", "0.mp4"),
0,
"output-shaped broadcast must not be transcoded"
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn two_factories_share_a_rendition_name() {
struct AltFactory {
rendition: RenditionSpec,
}
impl TranscoderFactory for AltFactory {
fn name(&self) -> &str {
"alt"
}
fn rendition(&self) -> &RenditionSpec {
&self.rendition
}
fn build(&self, _ctx: &TranscoderContext) -> Option<Box<dyn Transcoder>> {
None
}
}
let registry = FragmentBroadcasterRegistry::new();
let handle = TranscodeRunner::new()
.with_factory(PassthroughTranscoderFactory::new(RenditionSpec::preset_720p()))
.install(®istry);
assert!(handle.add_rendition(Arc::new(AltFactory {
rendition: RenditionSpec::preset_720p(),
})));
assert_eq!(handle.renditions().len(), 2, "two factories, both for 720p");
handle.remove_rendition("720p");
assert!(handle.renditions().is_empty());
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn renditions_reflects_runtime_edits() {
let registry = FragmentBroadcasterRegistry::new();
let handle = TranscodeRunner::new()
.with_factory(PassthroughTranscoderFactory::new(RenditionSpec::preset_720p()))
.install(®istry);
let names =
|h: &TranscodeRunnerHandle| -> Vec<String> { h.renditions().iter().map(|r| r.name.clone()).collect() };
assert_eq!(names(&handle), vec!["720p".to_string()]);
assert!(handle.add_rendition(Arc::new(
PassthroughTranscoderFactory::new(RenditionSpec::preset_480p())
)));
let mut after_add = names(&handle);
after_add.sort();
assert_eq!(after_add, vec!["480p".to_string(), "720p".to_string()]);
assert_eq!(
handle.remove_rendition("720p"),
0,
"no live source, so no task to abort"
);
assert_eq!(names(&handle), vec!["480p".to_string()]);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn downstream_subscriber_still_sees_every_fragment() {
let registry = FragmentBroadcasterRegistry::new();
let _handle = TranscodeRunner::new()
.with_factory(PassthroughTranscoderFactory::new(RenditionSpec::preset_240p()))
.install(®istry);
let bc = registry.get_or_create("live/fanout", "0.mp4", meta());
let mut downstream = bc.subscribe();
let emitted = PMutex::new(Vec::<u64>::new());
for i in 0..4 {
bc.emit(frag(i));
emitted.lock().push(i);
}
tokio::time::sleep(Duration::from_millis(100)).await;
for expected in 0..4u64 {
let f = downstream.next_fragment().await.expect("downstream frag");
assert_eq!(f.group_id, expected);
}
}
}