Skip to main content

lvqr_transcode/
runner.rs

//! [`TranscodeRunner`] + [`TranscodeRunnerHandle`] + [`TranscoderStats`].
//!
//! Wires registered [`crate::TranscoderFactory`] instances into a
//! shared [`lvqr_fragment::FragmentBroadcasterRegistry`] and drives
//! one tokio drain task per `(transcoder, rendition, broadcast,
//! track)` instance. Mirrors [`lvqr_agent::AgentRunner`]
//! one-for-one, with `(factory_name, rendition_name, broadcast, track)`
//! as the four-tuple stats key so metrics distinguish renditions of
//! the same factory.
10
11use std::panic::AssertUnwindSafe;
12use std::sync::Arc;
13use std::sync::atomic::{AtomicU64, Ordering};
14
15use dashmap::DashMap;
16use lvqr_fragment::{BroadcasterStream, FragmentBroadcasterRegistry, FragmentStream};
17use parking_lot::Mutex;
18use tokio::runtime::Handle;
19use tokio::task::JoinHandle;
20use tracing::{info, warn};
21
22use crate::transcoder::{Transcoder, TranscoderContext, TranscoderFactory};
23
/// Outcome counters kept for each
/// `(transcoder, rendition, broadcast, track)` drain task.
#[derive(Debug, Default)]
pub struct TranscoderStats {
    /// Number of fragments passed to [`Transcoder::on_fragment`],
    /// counted whether or not that call panicked.
    pub fragments_seen: AtomicU64,

    /// Number of panics caught for this key, summed over the
    /// `on_start`, `on_fragment`, and `on_stop` phases.
    pub panics: AtomicU64,
}

/// Map key for [`TranscoderStats`]:
/// `(transcoder_name, rendition_name, broadcast, track)`.
///
/// The rendition is part of the key so that two factories sharing a
/// name but producing different renditions get separate counters.
type StatsKey = (String, String, String, String);
41
42/// Cheaply-cloneable handle returned by
43/// [`TranscodeRunner::install`].
44///
45/// Holds the spawned per-transcoder drain tasks alive for the
46/// server lifetime; tests and admin consumers read per-
47/// `(transcoder, rendition, broadcast, track)` counters off this
48/// handle. Dropping it aborts every spawned task; mid-stride
49/// aborts do NOT call [`Transcoder::on_stop`], matching the
50/// [`lvqr_agent::AgentRunnerHandle`] shutdown shape.
51#[derive(Clone)]
52pub struct TranscodeRunnerHandle {
53    stats: Arc<DashMap<StatsKey, Arc<TranscoderStats>>>,
54    _tasks: Arc<Mutex<Vec<JoinHandle<()>>>>,
55}
56
57impl std::fmt::Debug for TranscodeRunnerHandle {
58    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
59        f.debug_struct("TranscodeRunnerHandle")
60            .field("tracked_keys", &self.stats.len())
61            .finish()
62    }
63}
64
65impl TranscodeRunnerHandle {
66    /// Total fragments observed by `transcoder` producing
67    /// `rendition` from `(broadcast, track)`. Returns 0 if no
68    /// transcoder under that key has fired yet.
69    pub fn fragments_seen(&self, transcoder: &str, rendition: &str, broadcast: &str, track: &str) -> u64 {
70        self.stat(transcoder, rendition, broadcast, track)
71            .map(|s| s.fragments_seen.load(Ordering::Relaxed))
72            .unwrap_or(0)
73    }
74
75    /// Caught-panic count for `transcoder` producing `rendition`
76    /// from `(broadcast, track)`. Aggregates `on_start`,
77    /// `on_fragment`, and `on_stop` panics under one counter.
78    pub fn panics(&self, transcoder: &str, rendition: &str, broadcast: &str, track: &str) -> u64 {
79        self.stat(transcoder, rendition, broadcast, track)
80            .map(|s| s.panics.load(Ordering::Relaxed))
81            .unwrap_or(0)
82    }
83
84    /// Snapshot of every `(transcoder, rendition, broadcast, track)`
85    /// quadruple the runner has spawned a drain task for.
86    pub fn tracked(&self) -> Vec<StatsKey> {
87        self.stats.iter().map(|e| e.key().clone()).collect()
88    }
89
90    fn stat(&self, transcoder: &str, rendition: &str, broadcast: &str, track: &str) -> Option<Arc<TranscoderStats>> {
91        self.stats
92            .get(&(
93                transcoder.to_string(),
94                rendition.to_string(),
95                broadcast.to_string(),
96                track.to_string(),
97            ))
98            .map(|e| Arc::clone(e.value()))
99    }
100}
101
102/// Builder that collects [`TranscoderFactory`] registrations and
103/// installs them onto a [`FragmentBroadcasterRegistry`]. Typical
104/// usage -- three rungs of the default ladder:
105///
106/// ```no_run
107/// # use lvqr_transcode::{PassthroughTranscoderFactory, RenditionSpec, TranscodeRunner};
108/// # use lvqr_fragment::FragmentBroadcasterRegistry;
109/// let registry = FragmentBroadcasterRegistry::new();
110/// let _handle = TranscodeRunner::new()
111///     .with_ladder(RenditionSpec::default_ladder(), |spec| {
112///         PassthroughTranscoderFactory::new(spec)
113///     })
114///     .install(&registry);
115/// // hold _handle for the server lifetime
116/// ```
117#[derive(Default)]
118pub struct TranscodeRunner {
119    factories: Vec<Arc<dyn TranscoderFactory>>,
120}
121
122impl TranscodeRunner {
123    /// Construct an empty runner.
124    pub fn new() -> Self {
125        Self::default()
126    }
127
128    /// Register a transcoder factory by value.
129    pub fn with_factory<F: TranscoderFactory>(mut self, factory: F) -> Self {
130        self.factories.push(Arc::new(factory));
131        self
132    }
133
134    /// Register a pre-arc'd factory. Useful when the caller
135    /// already shares an `Arc<dyn TranscoderFactory>` with other
136    /// server-side state.
137    pub fn with_factory_arc(mut self, factory: Arc<dyn TranscoderFactory>) -> Self {
138        self.factories.push(factory);
139        self
140    }
141
142    /// Convenience: register one factory per rendition in the
143    /// supplied ladder, building each factory from its rendition
144    /// via `build`. Mirrors the `RenditionSpec::default_ladder()`
145    /// -> three `PassthroughTranscoderFactory` pattern without
146    /// forcing the caller to unroll it.
147    pub fn with_ladder<F, Fn_>(mut self, ladder: Vec<crate::RenditionSpec>, build: Fn_) -> Self
148    where
149        F: TranscoderFactory,
150        Fn_: Fn(crate::RenditionSpec) -> F,
151    {
152        for spec in ladder {
153            self.factories.push(Arc::new(build(spec)));
154        }
155        self
156    }
157
158    /// How many factories are currently registered. Useful for
159    /// `Default`-instantiated runners that want to gate their own
160    /// install calls.
161    pub fn factory_count(&self) -> usize {
162        self.factories.len()
163    }
164
165    /// Wire an `on_entry_created` callback on `registry` so every
166    /// new `(broadcast, track)` pair gets one drain task per
167    /// transcoder the registered factories opt into. Returns a
168    /// handle the caller MUST hold for the server lifetime;
169    /// dropping it aborts every spawned task.
170    ///
171    /// Callback semantics mirror [`lvqr_agent::AgentRunner::install`]:
172    /// the callback runs on the thread that wins the
173    /// `get_or_create` insertion race, subscribes synchronously
174    /// so no emit can race ahead of the drain loop, and spawns
175    /// the per-transcoder drain task on the current tokio
176    /// runtime. If no tokio runtime is available the warn logs
177    /// and no task spawns.
178    pub fn install(self, registry: &FragmentBroadcasterRegistry) -> TranscodeRunnerHandle {
179        let stats: Arc<DashMap<StatsKey, Arc<TranscoderStats>>> = Arc::new(DashMap::new());
180        let tasks: Arc<Mutex<Vec<JoinHandle<()>>>> = Arc::new(Mutex::new(Vec::new()));
181
182        let factories = self.factories;
183        let stats_cb = Arc::clone(&stats);
184        let tasks_cb = Arc::clone(&tasks);
185
186        registry.on_entry_created(move |broadcast, track, bc| {
187            let handle = match Handle::try_current() {
188                Ok(h) => h,
189                Err(_) => {
190                    warn!(
191                        broadcast = %broadcast,
192                        track = %track,
193                        "TranscodeRunner: registry callback fired outside tokio runtime; no drain spawned",
194                    );
195                    return;
196                }
197            };
198
199            for factory in &factories {
200                let rendition = factory.rendition().clone();
201                let ctx = TranscoderContext {
202                    broadcast: broadcast.to_string(),
203                    track: track.to_string(),
204                    meta: bc.meta(),
205                    rendition: rendition.clone(),
206                };
207                let Some(transcoder) = factory.build(&ctx) else {
208                    continue;
209                };
210
211                let sub = bc.subscribe();
212                let key: StatsKey = (
213                    factory.name().to_string(),
214                    rendition.name.clone(),
215                    broadcast.to_string(),
216                    track.to_string(),
217                );
218                let stat = Arc::clone(
219                    stats_cb
220                        .entry(key.clone())
221                        .or_insert_with(|| Arc::new(TranscoderStats::default()))
222                        .value(),
223                );
224                let factory_name = factory.name().to_string();
225                let ctx_for_task = ctx.clone();
226                let task = handle.spawn(drive(transcoder, factory_name, ctx_for_task, sub, stat));
227                tasks_cb.lock().push(task);
228            }
229        });
230
231        info!(
232            tracked = stats.len(),
233            "TranscodeRunner installed on FragmentBroadcasterRegistry",
234        );
235
236        TranscodeRunnerHandle { stats, _tasks: tasks }
237    }
238}
239
240/// Per-transcoder drain task. Runs until the broadcaster closes.
241/// All trait dispatch is wrapped in `catch_unwind` so a panic in
242/// any of `on_start` / `on_fragment` / `on_stop` is logged +
243/// counted but does not propagate to the spawning runtime.
244async fn drive(
245    mut transcoder: Box<dyn Transcoder>,
246    transcoder_name: String,
247    ctx: TranscoderContext,
248    mut sub: BroadcasterStream,
249    stats: Arc<TranscoderStats>,
250) {
251    let rendition_name = ctx.rendition.name.clone();
252
253    // Refresh the meta snapshot before `on_start`. The
254    // `on_entry_created` callback fires synchronously inside
255    // `FragmentBroadcasterRegistry::get_or_create`, *before* the
256    // ingest side calls `set_init_segment`. A transcoder that
257    // reads `ctx.meta.init_segment` at on_start time would miss
258    // the header bytes -- which is a silent break for the
259    // software pipeline (qtdemux finds no playable streams). The
260    // refresh below catches the late init without changing the
261    // trait surface. Tier 4 item 4.6 session 106 C fix.
262    sub.refresh_meta();
263    let ctx = TranscoderContext {
264        broadcast: ctx.broadcast,
265        track: ctx.track,
266        meta: sub.meta().clone(),
267        rendition: ctx.rendition,
268    };
269
270    // on_start: a panic here means we abort the drain loop.
271    // Handing fragments to a transcoder whose setup panicked
272    // would amplify the fault, not contain it.
273    let started = std::panic::catch_unwind(AssertUnwindSafe(|| transcoder.on_start(&ctx)));
274    if started.is_err() {
275        stats.panics.fetch_add(1, Ordering::Relaxed);
276        metrics::counter!(
277            "lvqr_transcode_panics_total",
278            "transcoder" => transcoder_name.clone(),
279            "rendition" => rendition_name.clone(),
280            "phase" => "start",
281        )
282        .increment(1);
283        warn!(
284            transcoder = %transcoder_name,
285            rendition = %rendition_name,
286            broadcast = %ctx.broadcast,
287            track = %ctx.track,
288            "Transcoder::on_start panicked; skipping drain loop",
289        );
290        return;
291    }
292
293    while let Some(frag) = sub.next_fragment().await {
294        stats.fragments_seen.fetch_add(1, Ordering::Relaxed);
295        metrics::counter!(
296            "lvqr_transcode_fragments_total",
297            "transcoder" => transcoder_name.clone(),
298            "rendition" => rendition_name.clone(),
299        )
300        .increment(1);
301        let result = std::panic::catch_unwind(AssertUnwindSafe(|| transcoder.on_fragment(&frag)));
302        if result.is_err() {
303            stats.panics.fetch_add(1, Ordering::Relaxed);
304            metrics::counter!(
305                "lvqr_transcode_panics_total",
306                "transcoder" => transcoder_name.clone(),
307                "rendition" => rendition_name.clone(),
308                "phase" => "fragment",
309            )
310            .increment(1);
311            warn!(
312                transcoder = %transcoder_name,
313                rendition = %rendition_name,
314                broadcast = %ctx.broadcast,
315                track = %ctx.track,
316                group_id = frag.group_id,
317                object_id = frag.object_id,
318                "Transcoder::on_fragment panicked; skipping fragment and continuing",
319            );
320        }
321    }
322
323    let stopped = std::panic::catch_unwind(AssertUnwindSafe(|| transcoder.on_stop()));
324    if stopped.is_err() {
325        stats.panics.fetch_add(1, Ordering::Relaxed);
326        metrics::counter!(
327            "lvqr_transcode_panics_total",
328            "transcoder" => transcoder_name.clone(),
329            "rendition" => rendition_name.clone(),
330            "phase" => "stop",
331        )
332        .increment(1);
333        warn!(
334            transcoder = %transcoder_name,
335            rendition = %rendition_name,
336            broadcast = %ctx.broadcast,
337            track = %ctx.track,
338            "Transcoder::on_stop panicked",
339        );
340    }
341
342    info!(
343        transcoder = %transcoder_name,
344        rendition = %rendition_name,
345        broadcast = %ctx.broadcast,
346        track = %ctx.track,
347        seen = stats.fragments_seen.load(Ordering::Relaxed),
348        panics = stats.panics.load(Ordering::Relaxed),
349        "TranscodeRunner: drain terminated",
350    );
351}
352
#[cfg(test)]
mod tests {
    use super::*;
    use crate::passthrough::PassthroughTranscoderFactory;
    use crate::rendition::RenditionSpec;
    use bytes::Bytes;
    use lvqr_fragment::{Fragment, FragmentFlags, FragmentMeta};
    use std::time::Duration;

    /// Upper bound for [`eventually`]. It is generous so loaded CI
    /// hosts do not flake; passing runs return as soon as the
    /// condition holds.
    const SETTLE_TIMEOUT: Duration = Duration::from_secs(2);

    fn meta() -> FragmentMeta {
        FragmentMeta::new("avc1.640028", 90_000)
    }

    fn frag(idx: u64) -> Fragment {
        Fragment::new(
            "0.mp4",
            idx,
            0,
            0,
            idx * 1000,
            idx * 1000,
            1000,
            FragmentFlags::DELTA,
            Bytes::from(vec![0xAB; 16]),
        )
    }

    /// Poll `cond` every few milliseconds until it returns `true` or
    /// [`SETTLE_TIMEOUT`] elapses.
    ///
    /// Drain tasks run asynchronously, so a fixed sleep is either
    /// wastefully long or flaky under load. Callers still assert the
    /// final state afterwards, so a timeout surfaces as an ordinary
    /// assertion failure.
    async fn eventually(mut cond: impl FnMut() -> bool) {
        let deadline = tokio::time::Instant::now() + SETTLE_TIMEOUT;
        while !cond() && tokio::time::Instant::now() < deadline {
            tokio::time::sleep(Duration::from_millis(5)).await;
        }
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn passthrough_sees_every_fragment_and_stops() {
        let registry = FragmentBroadcasterRegistry::new();
        let handle = TranscodeRunner::new()
            .with_factory(PassthroughTranscoderFactory::new(RenditionSpec::preset_720p()))
            .install(&registry);

        let bc = registry.get_or_create("live/demo", "0.mp4", meta());
        for i in 0..5 {
            bc.emit(frag(i));
        }
        drop(bc);
        registry.remove("live/demo", "0.mp4");
        eventually(|| handle.fragments_seen("passthrough", "720p", "live/demo", "0.mp4") == 5).await;

        assert_eq!(handle.fragments_seen("passthrough", "720p", "live/demo", "0.mp4"), 5);
        assert_eq!(handle.panics("passthrough", "720p", "live/demo", "0.mp4"), 0);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn default_ladder_spawns_one_task_per_rendition() {
        let registry = FragmentBroadcasterRegistry::new();
        let handle = TranscodeRunner::new()
            .with_ladder(RenditionSpec::default_ladder(), PassthroughTranscoderFactory::new)
            .install(&registry);

        let bc = registry.get_or_create("live/ladder", "0.mp4", meta());
        bc.emit(frag(0));
        bc.emit(frag(1));

        // Three renditions, each observing both fragments.
        let mut tracked = handle.tracked();
        tracked.sort();
        assert_eq!(tracked.len(), 3, "one drain task per rendition");
        eventually(|| {
            tracked
                .iter()
                .all(|(_, rendition, _, _)| handle.fragments_seen("passthrough", rendition, "live/ladder", "0.mp4") == 2)
        })
        .await;
        for (_transcoder, rendition, _broadcast, _track) in &tracked {
            let seen = handle.fragments_seen("passthrough", rendition, "live/ladder", "0.mp4");
            assert_eq!(seen, 2, "rendition {rendition} saw both fragments");
        }
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn factory_opt_out_skips_non_video_tracks() {
        let registry = FragmentBroadcasterRegistry::new();
        let handle = TranscodeRunner::new()
            .with_factory(PassthroughTranscoderFactory::new(RenditionSpec::preset_720p()))
            .install(&registry);

        let bc_audio = registry.get_or_create("live/demo", "1.mp4", FragmentMeta::new("mp4a.40.2", 48_000));
        bc_audio.emit(frag(0));
        // Asserting an absence: nothing to poll for, so give any
        // stray task a moment to show up.
        tokio::time::sleep(Duration::from_millis(80)).await;

        // Passthrough factory opts out of non-video tracks; no
        // drain task spawns for the audio track.
        assert!(handle.tracked().is_empty());
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn panic_in_on_fragment_is_caught_and_counted() {
        struct PanicAtTwo;
        impl Transcoder for PanicAtTwo {
            fn on_fragment(&mut self, fragment: &Fragment) {
                if fragment.group_id == 2 {
                    panic!("simulated encoder fault at group 2");
                }
            }
        }
        struct PanicAtTwoFactory {
            rendition: RenditionSpec,
        }
        impl TranscoderFactory for PanicAtTwoFactory {
            fn name(&self) -> &str {
                "panicky"
            }
            fn rendition(&self) -> &RenditionSpec {
                &self.rendition
            }
            fn build(&self, _ctx: &TranscoderContext) -> Option<Box<dyn Transcoder>> {
                Some(Box::new(PanicAtTwo))
            }
        }

        let registry = FragmentBroadcasterRegistry::new();
        let handle = TranscodeRunner::new()
            .with_factory(PanicAtTwoFactory {
                rendition: RenditionSpec::preset_720p(),
            })
            .install(&registry);

        let bc = registry.get_or_create("live/panic", "0.mp4", meta());
        for i in 0..5 {
            bc.emit(frag(i));
        }
        eventually(|| handle.fragments_seen("panicky", "720p", "live/panic", "0.mp4") == 5).await;

        assert_eq!(handle.fragments_seen("panicky", "720p", "live/panic", "0.mp4"), 5);
        assert_eq!(handle.panics("panicky", "720p", "live/panic", "0.mp4"), 1);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn panic_in_on_start_skips_drain_loop() {
        struct PanicStart;
        impl Transcoder for PanicStart {
            fn on_start(&mut self, _ctx: &TranscoderContext) {
                panic!("simulated start failure");
            }
            fn on_fragment(&mut self, _fragment: &Fragment) {
                unreachable!("on_fragment must not run after on_start panics");
            }
        }
        struct PanicStartFactory {
            rendition: RenditionSpec,
        }
        impl TranscoderFactory for PanicStartFactory {
            fn name(&self) -> &str {
                "bad_start"
            }
            fn rendition(&self) -> &RenditionSpec {
                &self.rendition
            }
            fn build(&self, _ctx: &TranscoderContext) -> Option<Box<dyn Transcoder>> {
                Some(Box::new(PanicStart))
            }
        }

        let registry = FragmentBroadcasterRegistry::new();
        let handle = TranscodeRunner::new()
            .with_factory(PanicStartFactory {
                rendition: RenditionSpec::preset_480p(),
            })
            .install(&registry);

        let bc = registry.get_or_create("live/panic-start", "0.mp4", meta());
        bc.emit(frag(0));
        bc.emit(frag(1));
        eventually(|| handle.panics("bad_start", "480p", "live/panic-start", "0.mp4") == 1).await;

        assert_eq!(
            handle.fragments_seen("bad_start", "480p", "live/panic-start", "0.mp4"),
            0
        );
        assert_eq!(handle.panics("bad_start", "480p", "live/panic-start", "0.mp4"), 1);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn empty_runner_installs_callback_but_spawns_nothing() {
        let registry = FragmentBroadcasterRegistry::new();
        let handle = TranscodeRunner::new().install(&registry);

        let bc = registry.get_or_create("live/empty", "0.mp4", meta());
        bc.emit(frag(0));
        // Asserting an absence: a short settle is the only option.
        tokio::time::sleep(Duration::from_millis(50)).await;

        assert!(handle.tracked().is_empty());
    }

    #[test]
    fn runner_default_is_empty() {
        let r = TranscodeRunner::default();
        assert_eq!(r.factory_count(), 0);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn downstream_subscriber_still_sees_every_fragment() {
        // A downstream consumer of the source broadcaster (e.g.
        // the LL-HLS bridge) must not be perturbed by transcoder
        // drain tasks. Assert the fan-out by subscribing
        // independently and reading every fragment.
        let registry = FragmentBroadcasterRegistry::new();
        let _handle = TranscodeRunner::new()
            .with_factory(PassthroughTranscoderFactory::new(RenditionSpec::preset_240p()))
            .install(&registry);

        let bc = registry.get_or_create("live/fanout", "0.mp4", meta());
        let mut downstream = bc.subscribe();
        for i in 0..4 {
            bc.emit(frag(i));
        }
        // Let the transcoder drain first so the downstream read
        // happens after the sibling subscriber has consumed.
        tokio::time::sleep(Duration::from_millis(100)).await;
        for expected in 0..4u64 {
            let f = downstream.next_fragment().await.expect("downstream frag");
            assert_eq!(f.group_id, expected);
        }
    }
}