Skip to main content

lvqr_transcode/
runner.rs

1//! [`TranscodeRunner`] + [`TranscodeRunnerHandle`] + [`TranscoderStats`].
2//!
3//! Wires registered [`crate::TranscoderFactory`] instances into a
4//! shared [`lvqr_fragment::FragmentBroadcasterRegistry`] and drives
5//! one tokio drain task per `(transcoder, rendition, broadcast,
6//! track)` instance. Mirrors [`lvqr_agent::AgentRunner`] one-for-
7//! one, with `(factory_name, rendition_name, broadcast, track)` as
8//! the four-tuple stats key so metrics distinguish renditions of
9//! the same factory.
10
11use std::panic::AssertUnwindSafe;
12use std::sync::Arc;
13use std::sync::atomic::{AtomicU64, Ordering};
14
15use dashmap::DashMap;
16use dashmap::mapref::entry::Entry;
17use lvqr_fragment::{BroadcasterStream, FragmentBroadcaster, FragmentBroadcasterRegistry, FragmentStream};
18use parking_lot::RwLock;
19use tokio::runtime::Handle;
20use tokio::task::JoinHandle;
21use tracing::{info, warn};
22
23use crate::transcoder::{Transcoder, TranscoderContext, TranscoderFactory};
24
25/// Per-`(transcoder, rendition, broadcast, track)` outcome
26/// counters.
27#[derive(Debug, Default)]
28pub struct TranscoderStats {
29    /// Total fragments handed to [`Transcoder::on_fragment`]
30    /// (regardless of panic outcome).
31    pub fragments_seen: AtomicU64,
32
33    /// Count of caught panics across `on_start`, `on_fragment`,
34    /// and `on_stop` for this key.
35    pub panics: AtomicU64,
36}
37
38/// Stats key: `(transcoder_name, rendition_name, broadcast, track)`.
39/// Two factories of the same name targeting different renditions
40/// live under separate keys so metrics distinguish them.
41type StatsKey = (String, String, String, String);
42
43/// Shared runner state held jointly by the registry `on_entry_created`
44/// callback and the [`TranscodeRunnerHandle`]. The factory set is mutable
45/// (behind an `RwLock`) so renditions can be added / removed at runtime;
46/// `tasks` is keyed per drain instance so a removed rendition can abort just
47/// its tasks; `registry` is a (cheap, shared) clone so a runtime-added
48/// rendition can retroactively spawn drain tasks against already-live
49/// sources.
50struct RunnerInner {
51    registry: FragmentBroadcasterRegistry,
52    factories: RwLock<Vec<Arc<dyn TranscoderFactory>>>,
53    tasks: DashMap<StatsKey, JoinHandle<()>>,
54    stats: DashMap<StatsKey, Arc<TranscoderStats>>,
55}
56
57impl RunnerInner {
58    /// Build + spawn a drain task for `factory` against the source
59    /// broadcaster `bc`, unless an identical instance is already running or
60    /// the factory opts out of this `(broadcast, track)`. Returns true iff a
61    /// task was spawned. Idempotent on the `StatsKey` so retroactive spawning
62    /// (runtime add) cannot race the `on_entry_created` callback into a
63    /// duplicate drain (which would double-produce output fragments).
64    fn spawn_for(
65        &self,
66        broadcast: &str,
67        track: &str,
68        bc: &Arc<FragmentBroadcaster>,
69        factory: &Arc<dyn TranscoderFactory>,
70    ) -> bool {
71        let rendition = factory.rendition().clone();
72        let key: StatsKey = (
73            factory.name().to_string(),
74            rendition.name.clone(),
75            broadcast.to_string(),
76            track.to_string(),
77        );
78        // Cheap pre-check before the (potentially slow) factory build.
79        if self.tasks.contains_key(&key) {
80            return false;
81        }
82        // Never transcode a transcode OUTPUT. Outputs are named
83        // `<source>/<rendition>`, so the broadcast's final path segment is a
84        // rendition name. Checking against the LIVE ladder (not a per-factory
85        // frozen skip list) is what makes runtime `add_rendition` safe: a
86        // newly added rendition must not transcode another rendition's output
87        // (which would recurse), and the pre-existing factories' frozen skip
88        // lists do not know the new rendition's name.
89        let last_seg = broadcast.rsplit('/').next().unwrap_or(broadcast);
90        if self.factories.read().iter().any(|f| f.rendition().name == last_seg) {
91            return false;
92        }
93        let ctx = TranscoderContext {
94            broadcast: broadcast.to_string(),
95            track: track.to_string(),
96            meta: bc.meta(),
97            rendition: rendition.clone(),
98        };
99        let Some(transcoder) = factory.build(&ctx) else {
100            return false;
101        };
102        let handle = match Handle::try_current() {
103            Ok(h) => h,
104            Err(_) => {
105                warn!(
106                    broadcast = %broadcast,
107                    track = %track,
108                    "TranscodeRunner: no tokio runtime; no drain spawned",
109                );
110                return false;
111            }
112        };
113        // Reserve the key under the shard lock so a concurrent caller cannot
114        // also spawn this instance. Build happened above (outside the lock);
115        // if we lost the race the built transcoder is dropped here.
116        match self.tasks.entry(key.clone()) {
117            Entry::Occupied(_) => false,
118            Entry::Vacant(slot) => {
119                let sub = bc.subscribe();
120                let stat = Arc::clone(
121                    self.stats
122                        .entry(key.clone())
123                        .or_insert_with(|| Arc::new(TranscoderStats::default()))
124                        .value(),
125                );
126                let task = handle.spawn(drive(transcoder, key.0.clone(), ctx, sub, stat));
127                slot.insert(task);
128                true
129            }
130        }
131    }
132}
133
134/// Cheaply-cloneable handle returned by
135/// [`TranscodeRunner::install`].
136///
137/// Holds the spawned per-transcoder drain tasks alive for the
138/// server lifetime; tests and admin consumers read per-
139/// `(transcoder, rendition, broadcast, track)` counters off this
140/// handle. Mid-stride aborts (drop / [`Self::remove_rendition`]) do NOT
141/// call [`Transcoder::on_stop`], matching the
142/// [`lvqr_agent::AgentRunnerHandle`] shutdown shape.
143///
144/// The ladder is mutable at runtime via [`Self::add_rendition`] /
145/// [`Self::remove_rendition`]; [`Self::renditions`] reports the current set
146/// so introspection stays consistent with edits.
147#[derive(Clone)]
148pub struct TranscodeRunnerHandle {
149    inner: Arc<RunnerInner>,
150}
151
152impl std::fmt::Debug for TranscodeRunnerHandle {
153    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
154        f.debug_struct("TranscodeRunnerHandle")
155            .field("tracked_keys", &self.inner.stats.len())
156            .field("renditions", &self.inner.factories.read().len())
157            .finish()
158    }
159}
160
161impl TranscodeRunnerHandle {
162    /// Total fragments observed by `transcoder` producing
163    /// `rendition` from `(broadcast, track)`. Returns 0 if no
164    /// transcoder under that key has fired yet.
165    pub fn fragments_seen(&self, transcoder: &str, rendition: &str, broadcast: &str, track: &str) -> u64 {
166        self.stat(transcoder, rendition, broadcast, track)
167            .map(|s| s.fragments_seen.load(Ordering::Relaxed))
168            .unwrap_or(0)
169    }
170
171    /// Caught-panic count for `transcoder` producing `rendition`
172    /// from `(broadcast, track)`. Aggregates `on_start`,
173    /// `on_fragment`, and `on_stop` panics under one counter.
174    pub fn panics(&self, transcoder: &str, rendition: &str, broadcast: &str, track: &str) -> u64 {
175        self.stat(transcoder, rendition, broadcast, track)
176            .map(|s| s.panics.load(Ordering::Relaxed))
177            .unwrap_or(0)
178    }
179
180    /// Snapshot of every `(transcoder, rendition, broadcast, track)`
181    /// quadruple the runner has spawned a drain task for.
182    pub fn tracked(&self) -> Vec<StatsKey> {
183        self.inner.stats.iter().map(|e| e.key().clone()).collect()
184    }
185
186    /// The current ladder's rendition specs, in registration order. Reflects
187    /// runtime [`Self::add_rendition`] / [`Self::remove_rendition`] edits, so
188    /// the admin introspection route stays consistent with the live ladder.
189    pub fn renditions(&self) -> Vec<crate::RenditionSpec> {
190        self.inner
191            .factories
192            .read()
193            .iter()
194            .map(|f| f.rendition().clone())
195            .collect()
196    }
197
198    /// Add a rendition factory to the live ladder. New broadcasts pick it up
199    /// via the `on_entry_created` callback; already-live sources get a drain
200    /// task spawned retroactively. No-op returning `false` when a factory with
201    /// the same `(name, rendition)` is already registered (the caller should
202    /// map that to a 409). Note a single rendition legitimately carries
203    /// multiple factories of different names -- e.g. a `"software"` video
204    /// encoder plus an `"audio-passthrough"` -- so the guard keys on the
205    /// `(factory name, rendition)` pair, not the rendition name alone. Must be
206    /// called from within a tokio runtime.
207    pub fn add_rendition(&self, factory: Arc<dyn TranscoderFactory>) -> bool {
208        {
209            let mut factories = self.inner.factories.write();
210            if factories
211                .iter()
212                .any(|f| f.name() == factory.name() && f.rendition().name == factory.rendition().name)
213            {
214                return false;
215            }
216            factories.push(Arc::clone(&factory));
217        }
218        // Retroactively spawn against every already-live source. The factory
219        // opts out of non-source / wrong-kind tracks via `build()`.
220        for (broadcast, track) in self.inner.registry.keys() {
221            if let Some(bc) = self.inner.registry.get(&broadcast, &track) {
222                self.inner.spawn_for(&broadcast, &track, &bc, &factory);
223            }
224        }
225        true
226    }
227
228    /// Remove every factory + drain task for `rendition`. Returns the number
229    /// of drain tasks aborted. Aborting skips `on_stop`; the rendition's
230    /// already-published output broadcasters drain to their subscribers and
231    /// close when the source ends. Returns 0 when no such rendition exists.
232    pub fn remove_rendition(&self, rendition: &str) -> usize {
233        self.inner.factories.write().retain(|f| f.rendition().name != rendition);
234        let mut aborted = 0usize;
235        self.inner.tasks.retain(|key, task| {
236            if key.1 == rendition {
237                task.abort();
238                aborted += 1;
239                false
240            } else {
241                true
242            }
243        });
244        aborted
245    }
246
247    fn stat(&self, transcoder: &str, rendition: &str, broadcast: &str, track: &str) -> Option<Arc<TranscoderStats>> {
248        self.inner
249            .stats
250            .get(&(
251                transcoder.to_string(),
252                rendition.to_string(),
253                broadcast.to_string(),
254                track.to_string(),
255            ))
256            .map(|e| Arc::clone(e.value()))
257    }
258}
259
260/// Builder that collects [`TranscoderFactory`] registrations and
261/// installs them onto a [`FragmentBroadcasterRegistry`]. Typical
262/// usage -- three rungs of the default ladder:
263///
264/// ```no_run
265/// # use lvqr_transcode::{PassthroughTranscoderFactory, RenditionSpec, TranscodeRunner};
266/// # use lvqr_fragment::FragmentBroadcasterRegistry;
267/// let registry = FragmentBroadcasterRegistry::new();
268/// let _handle = TranscodeRunner::new()
269///     .with_ladder(RenditionSpec::default_ladder(), |spec| {
270///         PassthroughTranscoderFactory::new(spec)
271///     })
272///     .install(&registry);
273/// // hold _handle for the server lifetime
274/// ```
275#[derive(Default)]
276pub struct TranscodeRunner {
277    factories: Vec<Arc<dyn TranscoderFactory>>,
278}
279
280impl TranscodeRunner {
281    /// Construct an empty runner.
282    pub fn new() -> Self {
283        Self::default()
284    }
285
286    /// Register a transcoder factory by value.
287    pub fn with_factory<F: TranscoderFactory>(mut self, factory: F) -> Self {
288        self.factories.push(Arc::new(factory));
289        self
290    }
291
292    /// Register a pre-arc'd factory. Useful when the caller
293    /// already shares an `Arc<dyn TranscoderFactory>` with other
294    /// server-side state.
295    pub fn with_factory_arc(mut self, factory: Arc<dyn TranscoderFactory>) -> Self {
296        self.factories.push(factory);
297        self
298    }
299
300    /// Convenience: register one factory per rendition in the
301    /// supplied ladder, building each factory from its rendition
302    /// via `build`. Mirrors the `RenditionSpec::default_ladder()`
303    /// -> three `PassthroughTranscoderFactory` pattern without
304    /// forcing the caller to unroll it.
305    pub fn with_ladder<F, Fn_>(mut self, ladder: Vec<crate::RenditionSpec>, build: Fn_) -> Self
306    where
307        F: TranscoderFactory,
308        Fn_: Fn(crate::RenditionSpec) -> F,
309    {
310        for spec in ladder {
311            self.factories.push(Arc::new(build(spec)));
312        }
313        self
314    }
315
316    /// How many factories are currently registered. Useful for
317    /// `Default`-instantiated runners that want to gate their own
318    /// install calls.
319    pub fn factory_count(&self) -> usize {
320        self.factories.len()
321    }
322
323    /// Wire an `on_entry_created` callback on `registry` so every
324    /// new `(broadcast, track)` pair gets one drain task per
325    /// transcoder the registered factories opt into. Returns a
326    /// handle the caller MUST hold for the server lifetime;
327    /// dropping it aborts every spawned task.
328    ///
329    /// Callback semantics mirror [`lvqr_agent::AgentRunner::install`]:
330    /// the callback runs on the thread that wins the
331    /// `get_or_create` insertion race, subscribes synchronously
332    /// so no emit can race ahead of the drain loop, and spawns
333    /// the per-transcoder drain task on the current tokio
334    /// runtime. If no tokio runtime is available the warn logs
335    /// and no task spawns.
336    pub fn install(self, registry: &FragmentBroadcasterRegistry) -> TranscodeRunnerHandle {
337        let inner = Arc::new(RunnerInner {
338            registry: registry.clone(),
339            factories: RwLock::new(self.factories),
340            tasks: DashMap::new(),
341            stats: DashMap::new(),
342        });
343
344        let inner_cb = Arc::clone(&inner);
345        registry.on_entry_created(move |broadcast, track, bc| {
346            // Snapshot the current factory Arcs so a concurrent add/remove
347            // does not hold the read lock across the spawn loop.
348            let factories: Vec<Arc<dyn TranscoderFactory>> = inner_cb.factories.read().clone();
349            for factory in &factories {
350                inner_cb.spawn_for(broadcast, track, bc, factory);
351            }
352        });
353
354        info!(
355            renditions = inner.factories.read().len(),
356            "TranscodeRunner installed on FragmentBroadcasterRegistry",
357        );
358
359        TranscodeRunnerHandle { inner }
360    }
361}
362
363/// Per-transcoder drain task. Runs until the broadcaster closes.
364/// All trait dispatch is wrapped in `catch_unwind` so a panic in
365/// any of `on_start` / `on_fragment` / `on_stop` is logged +
366/// counted but does not propagate to the spawning runtime.
367async fn drive(
368    mut transcoder: Box<dyn Transcoder>,
369    transcoder_name: String,
370    ctx: TranscoderContext,
371    mut sub: BroadcasterStream,
372    stats: Arc<TranscoderStats>,
373) {
374    let rendition_name = ctx.rendition.name.clone();
375
376    // Refresh the meta snapshot before `on_start`. The
377    // `on_entry_created` callback fires synchronously inside
378    // `FragmentBroadcasterRegistry::get_or_create`, *before* the
379    // ingest side calls `set_init_segment`. A transcoder that
380    // reads `ctx.meta.init_segment` at on_start time would miss
381    // the header bytes -- which is a silent break for the
382    // software pipeline (qtdemux finds no playable streams). The
383    // refresh below catches the late init without changing the
384    // trait surface. Tier 4 item 4.6 session 106 C fix.
385    sub.refresh_meta();
386    let ctx = TranscoderContext {
387        broadcast: ctx.broadcast,
388        track: ctx.track,
389        meta: sub.meta().clone(),
390        rendition: ctx.rendition,
391    };
392
393    // on_start: a panic here means we abort the drain loop.
394    // Handing fragments to a transcoder whose setup panicked
395    // would amplify the fault, not contain it.
396    let started = std::panic::catch_unwind(AssertUnwindSafe(|| transcoder.on_start(&ctx)));
397    if started.is_err() {
398        stats.panics.fetch_add(1, Ordering::Relaxed);
399        metrics::counter!(
400            "lvqr_transcode_panics_total",
401            "transcoder" => transcoder_name.clone(),
402            "rendition" => rendition_name.clone(),
403            "phase" => "start",
404        )
405        .increment(1);
406        warn!(
407            transcoder = %transcoder_name,
408            rendition = %rendition_name,
409            broadcast = %ctx.broadcast,
410            track = %ctx.track,
411            "Transcoder::on_start panicked; skipping drain loop",
412        );
413        return;
414    }
415
416    while let Some(frag) = sub.next_fragment().await {
417        stats.fragments_seen.fetch_add(1, Ordering::Relaxed);
418        metrics::counter!(
419            "lvqr_transcode_fragments_total",
420            "transcoder" => transcoder_name.clone(),
421            "rendition" => rendition_name.clone(),
422        )
423        .increment(1);
424        let result = std::panic::catch_unwind(AssertUnwindSafe(|| transcoder.on_fragment(&frag)));
425        if result.is_err() {
426            stats.panics.fetch_add(1, Ordering::Relaxed);
427            metrics::counter!(
428                "lvqr_transcode_panics_total",
429                "transcoder" => transcoder_name.clone(),
430                "rendition" => rendition_name.clone(),
431                "phase" => "fragment",
432            )
433            .increment(1);
434            warn!(
435                transcoder = %transcoder_name,
436                rendition = %rendition_name,
437                broadcast = %ctx.broadcast,
438                track = %ctx.track,
439                group_id = frag.group_id,
440                object_id = frag.object_id,
441                "Transcoder::on_fragment panicked; skipping fragment and continuing",
442            );
443        }
444    }
445
446    let stopped = std::panic::catch_unwind(AssertUnwindSafe(|| transcoder.on_stop()));
447    if stopped.is_err() {
448        stats.panics.fetch_add(1, Ordering::Relaxed);
449        metrics::counter!(
450            "lvqr_transcode_panics_total",
451            "transcoder" => transcoder_name.clone(),
452            "rendition" => rendition_name.clone(),
453            "phase" => "stop",
454        )
455        .increment(1);
456        warn!(
457            transcoder = %transcoder_name,
458            rendition = %rendition_name,
459            broadcast = %ctx.broadcast,
460            track = %ctx.track,
461            "Transcoder::on_stop panicked",
462        );
463    }
464
465    info!(
466        transcoder = %transcoder_name,
467        rendition = %rendition_name,
468        broadcast = %ctx.broadcast,
469        track = %ctx.track,
470        seen = stats.fragments_seen.load(Ordering::Relaxed),
471        panics = stats.panics.load(Ordering::Relaxed),
472        "TranscodeRunner: drain terminated",
473    );
474}
475
476#[cfg(test)]
477mod tests {
478    use super::*;
479    use crate::passthrough::PassthroughTranscoderFactory;
480    use crate::rendition::RenditionSpec;
481    use bytes::Bytes;
482    use lvqr_fragment::{Fragment, FragmentFlags, FragmentMeta};
483    use parking_lot::Mutex as PMutex;
484    use std::time::Duration;
485
486    fn meta() -> FragmentMeta {
487        FragmentMeta::new("avc1.640028", 90_000)
488    }
489
490    fn frag(idx: u64) -> Fragment {
491        Fragment::new(
492            "0.mp4",
493            idx,
494            0,
495            0,
496            idx * 1000,
497            idx * 1000,
498            1000,
499            FragmentFlags::DELTA,
500            Bytes::from(vec![0xAB; 16]),
501        )
502    }
503
504    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
505    async fn passthrough_sees_every_fragment_and_stops() {
506        let registry = FragmentBroadcasterRegistry::new();
507        let handle = TranscodeRunner::new()
508            .with_factory(PassthroughTranscoderFactory::new(RenditionSpec::preset_720p()))
509            .install(&registry);
510
511        let bc = registry.get_or_create("live/demo", "0.mp4", meta());
512        for i in 0..5 {
513            bc.emit(frag(i));
514        }
515        drop(bc);
516        registry.remove("live/demo", "0.mp4");
517        tokio::time::sleep(Duration::from_millis(150)).await;
518
519        assert_eq!(handle.fragments_seen("passthrough", "720p", "live/demo", "0.mp4"), 5);
520        assert_eq!(handle.panics("passthrough", "720p", "live/demo", "0.mp4"), 0);
521    }
522
523    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
524    async fn default_ladder_spawns_one_task_per_rendition() {
525        let registry = FragmentBroadcasterRegistry::new();
526        let handle = TranscodeRunner::new()
527            .with_ladder(RenditionSpec::default_ladder(), PassthroughTranscoderFactory::new)
528            .install(&registry);
529
530        let bc = registry.get_or_create("live/ladder", "0.mp4", meta());
531        bc.emit(frag(0));
532        bc.emit(frag(1));
533        tokio::time::sleep(Duration::from_millis(100)).await;
534
535        // Three renditions, each observing both fragments.
536        let mut tracked = handle.tracked();
537        tracked.sort();
538        assert_eq!(tracked.len(), 3, "one drain task per rendition");
539        for (_transcoder, rendition, _broadcast, _track) in &tracked {
540            let seen = handle.fragments_seen("passthrough", rendition, "live/ladder", "0.mp4");
541            assert_eq!(seen, 2, "rendition {rendition} saw both fragments");
542        }
543    }
544
545    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
546    async fn factory_opt_out_skips_non_video_tracks() {
547        let registry = FragmentBroadcasterRegistry::new();
548        let handle = TranscodeRunner::new()
549            .with_factory(PassthroughTranscoderFactory::new(RenditionSpec::preset_720p()))
550            .install(&registry);
551
552        let bc_audio = registry.get_or_create("live/demo", "1.mp4", FragmentMeta::new("mp4a.40.2", 48_000));
553        bc_audio.emit(frag(0));
554        tokio::time::sleep(Duration::from_millis(80)).await;
555
556        // Passthrough factory opts out of non-video tracks; no
557        // drain task spawns for the audio track.
558        assert!(handle.tracked().is_empty());
559    }
560
561    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
562    async fn panic_in_on_fragment_is_caught_and_counted() {
563        struct PanicAtTwo;
564        impl Transcoder for PanicAtTwo {
565            fn on_fragment(&mut self, fragment: &Fragment) {
566                if fragment.group_id == 2 {
567                    panic!("simulated encoder fault at group 2");
568                }
569            }
570        }
571        struct PanicAtTwoFactory {
572            rendition: RenditionSpec,
573        }
574        impl TranscoderFactory for PanicAtTwoFactory {
575            fn name(&self) -> &str {
576                "panicky"
577            }
578            fn rendition(&self) -> &RenditionSpec {
579                &self.rendition
580            }
581            fn build(&self, _ctx: &TranscoderContext) -> Option<Box<dyn Transcoder>> {
582                Some(Box::new(PanicAtTwo))
583            }
584        }
585
586        let registry = FragmentBroadcasterRegistry::new();
587        let handle = TranscodeRunner::new()
588            .with_factory(PanicAtTwoFactory {
589                rendition: RenditionSpec::preset_720p(),
590            })
591            .install(&registry);
592
593        let bc = registry.get_or_create("live/panic", "0.mp4", meta());
594        for i in 0..5 {
595            bc.emit(frag(i));
596        }
597        tokio::time::sleep(Duration::from_millis(120)).await;
598
599        assert_eq!(handle.fragments_seen("panicky", "720p", "live/panic", "0.mp4"), 5);
600        assert_eq!(handle.panics("panicky", "720p", "live/panic", "0.mp4"), 1);
601    }
602
603    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
604    async fn panic_in_on_start_skips_drain_loop() {
605        struct PanicStart;
606        impl Transcoder for PanicStart {
607            fn on_start(&mut self, _ctx: &TranscoderContext) {
608                panic!("simulated start failure");
609            }
610            fn on_fragment(&mut self, _fragment: &Fragment) {
611                unreachable!("on_fragment must not run after on_start panics");
612            }
613        }
614        struct PanicStartFactory {
615            rendition: RenditionSpec,
616        }
617        impl TranscoderFactory for PanicStartFactory {
618            fn name(&self) -> &str {
619                "bad_start"
620            }
621            fn rendition(&self) -> &RenditionSpec {
622                &self.rendition
623            }
624            fn build(&self, _ctx: &TranscoderContext) -> Option<Box<dyn Transcoder>> {
625                Some(Box::new(PanicStart))
626            }
627        }
628
629        let registry = FragmentBroadcasterRegistry::new();
630        let handle = TranscodeRunner::new()
631            .with_factory(PanicStartFactory {
632                rendition: RenditionSpec::preset_480p(),
633            })
634            .install(&registry);
635
636        let bc = registry.get_or_create("live/panic-start", "0.mp4", meta());
637        bc.emit(frag(0));
638        bc.emit(frag(1));
639        tokio::time::sleep(Duration::from_millis(100)).await;
640
641        assert_eq!(
642            handle.fragments_seen("bad_start", "480p", "live/panic-start", "0.mp4"),
643            0
644        );
645        assert_eq!(handle.panics("bad_start", "480p", "live/panic-start", "0.mp4"), 1);
646    }
647
648    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
649    async fn empty_runner_installs_callback_but_spawns_nothing() {
650        let registry = FragmentBroadcasterRegistry::new();
651        let handle = TranscodeRunner::new().install(&registry);
652
653        let bc = registry.get_or_create("live/empty", "0.mp4", meta());
654        bc.emit(frag(0));
655        tokio::time::sleep(Duration::from_millis(50)).await;
656
657        assert!(handle.tracked().is_empty());
658    }
659
660    #[test]
661    fn runner_default_is_empty() {
662        let r = TranscodeRunner::default();
663        assert_eq!(r.factory_count(), 0);
664    }
665
666    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
667    async fn add_rendition_spawns_for_existing_live_source() {
668        let registry = FragmentBroadcasterRegistry::new();
669        let handle = TranscodeRunner::new()
670            .with_factory(PassthroughTranscoderFactory::new(RenditionSpec::preset_720p()))
671            .install(&registry);
672
673        let bc = registry.get_or_create("live/x", "0.mp4", meta());
674        bc.emit(frag(0));
675        tokio::time::sleep(Duration::from_millis(60)).await;
676
677        // Add 480p at runtime; it must spawn a drain task against the
678        // already-live source.
679        assert!(handle.add_rendition(Arc::new(
680            PassthroughTranscoderFactory::new(RenditionSpec::preset_480p())
681        )));
682        tokio::time::sleep(Duration::from_millis(60)).await;
683
684        bc.emit(frag(1));
685        bc.emit(frag(2));
686        tokio::time::sleep(Duration::from_millis(120)).await;
687
688        assert_eq!(handle.fragments_seen("passthrough", "720p", "live/x", "0.mp4"), 3);
689        let s480 = handle.fragments_seen("passthrough", "480p", "live/x", "0.mp4");
690        assert!(s480 >= 2, "runtime-added 480p must see post-add fragments; saw {s480}");
691    }
692
693    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
694    async fn add_rendition_rejects_duplicate_name() {
695        let registry = FragmentBroadcasterRegistry::new();
696        let handle = TranscodeRunner::new()
697            .with_factory(PassthroughTranscoderFactory::new(RenditionSpec::preset_720p()))
698            .install(&registry);
699
700        // Same rendition name -> rejected.
701        assert!(!handle.add_rendition(Arc::new(
702            PassthroughTranscoderFactory::new(RenditionSpec::preset_720p())
703        )));
704        // Distinct name -> accepted.
705        assert!(handle.add_rendition(Arc::new(
706            PassthroughTranscoderFactory::new(RenditionSpec::preset_240p())
707        )));
708    }
709
710    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
711    async fn remove_rendition_aborts_its_drain_and_leaves_others() {
712        let registry = FragmentBroadcasterRegistry::new();
713        let handle = TranscodeRunner::new()
714            .with_ladder(RenditionSpec::default_ladder(), PassthroughTranscoderFactory::new)
715            .install(&registry);
716
717        let bc = registry.get_or_create("live/r", "0.mp4", meta());
718        bc.emit(frag(0));
719        tokio::time::sleep(Duration::from_millis(80)).await;
720
721        let aborted = handle.remove_rendition("480p");
722        assert_eq!(aborted, 1, "exactly the 480p drain task aborts");
723        tokio::time::sleep(Duration::from_millis(40)).await;
724
725        bc.emit(frag(1));
726        bc.emit(frag(2));
727        tokio::time::sleep(Duration::from_millis(120)).await;
728
729        let s720 = handle.fragments_seen("passthrough", "720p", "live/r", "0.mp4");
730        let s480 = handle.fragments_seen("passthrough", "480p", "live/r", "0.mp4");
731        assert!(s720 >= 3, "surviving 720p keeps draining; saw {s720}");
732        assert!(s480 < s720, "removed 480p stopped draining ({s480}) vs 720p ({s720})");
733
734        let names: Vec<String> = handle.renditions().iter().map(|r| r.name.clone()).collect();
735        assert!(!names.contains(&"480p".to_string()), "480p gone from ladder: {names:?}");
736        assert!(names.contains(&"720p".to_string()));
737    }
738
739    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
740    async fn does_not_transcode_a_rendition_output_broadcast() {
741        // A broadcast whose final segment matches a rendition name is a
742        // transcode output and must never be (re-)transcoded, or runtime adds
743        // would recurse. The source "live/x" is transcoded; the output-shaped
744        // "live/x/720p" is skipped.
745        let registry = FragmentBroadcasterRegistry::new();
746        let handle = TranscodeRunner::new()
747            .with_factory(PassthroughTranscoderFactory::new(RenditionSpec::preset_720p()))
748            .install(&registry);
749
750        let src = registry.get_or_create("live/x", "0.mp4", meta());
751        let output = registry.get_or_create("live/x/720p", "0.mp4", meta());
752        src.emit(frag(0));
753        output.emit(frag(0));
754        tokio::time::sleep(Duration::from_millis(100)).await;
755
756        assert_eq!(handle.fragments_seen("passthrough", "720p", "live/x", "0.mp4"), 1);
757        assert_eq!(
758            handle.fragments_seen("passthrough", "720p", "live/x/720p", "0.mp4"),
759            0,
760            "output-shaped broadcast must not be transcoded"
761        );
762    }
763
764    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
765    async fn two_factories_share_a_rendition_name() {
766        // A rendition carries both a video encoder and an audio passthrough
767        // (different factory names, same rendition). The dup guard keys on
768        // (factory name, rendition), so both must register; removing the
769        // rendition drops both.
770        struct AltFactory {
771            rendition: RenditionSpec,
772        }
773        impl TranscoderFactory for AltFactory {
774            fn name(&self) -> &str {
775                "alt"
776            }
777            fn rendition(&self) -> &RenditionSpec {
778                &self.rendition
779            }
780            fn build(&self, _ctx: &TranscoderContext) -> Option<Box<dyn Transcoder>> {
781                None
782            }
783        }
784
785        let registry = FragmentBroadcasterRegistry::new();
786        let handle = TranscodeRunner::new()
787            .with_factory(PassthroughTranscoderFactory::new(RenditionSpec::preset_720p()))
788            .install(&registry);
789
790        // Same rendition "720p", different factory name "alt" -> accepted.
791        assert!(handle.add_rendition(Arc::new(AltFactory {
792            rendition: RenditionSpec::preset_720p(),
793        })));
794        assert_eq!(handle.renditions().len(), 2, "two factories, both for 720p");
795
796        // Removing the rendition drops both factories.
797        handle.remove_rendition("720p");
798        assert!(handle.renditions().is_empty());
799    }
800
801    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
802    async fn renditions_reflects_runtime_edits() {
803        let registry = FragmentBroadcasterRegistry::new();
804        let handle = TranscodeRunner::new()
805            .with_factory(PassthroughTranscoderFactory::new(RenditionSpec::preset_720p()))
806            .install(&registry);
807
808        let names =
809            |h: &TranscodeRunnerHandle| -> Vec<String> { h.renditions().iter().map(|r| r.name.clone()).collect() };
810        assert_eq!(names(&handle), vec!["720p".to_string()]);
811
812        assert!(handle.add_rendition(Arc::new(
813            PassthroughTranscoderFactory::new(RenditionSpec::preset_480p())
814        )));
815        let mut after_add = names(&handle);
816        after_add.sort();
817        assert_eq!(after_add, vec!["480p".to_string(), "720p".to_string()]);
818
819        assert_eq!(
820            handle.remove_rendition("720p"),
821            0,
822            "no live source, so no task to abort"
823        );
824        assert_eq!(names(&handle), vec!["480p".to_string()]);
825    }
826
827    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
828    async fn downstream_subscriber_still_sees_every_fragment() {
829        // A downstream consumer of the source broadcaster (e.g.
830        // the LL-HLS bridge) must not be perturbed by transcoder
831        // drain tasks. Assert the fan-out by subscribing
832        // independently and reading every fragment.
833        let registry = FragmentBroadcasterRegistry::new();
834        let _handle = TranscodeRunner::new()
835            .with_factory(PassthroughTranscoderFactory::new(RenditionSpec::preset_240p()))
836            .install(&registry);
837
838        let bc = registry.get_or_create("live/fanout", "0.mp4", meta());
839        let mut downstream = bc.subscribe();
840        let emitted = PMutex::new(Vec::<u64>::new());
841        for i in 0..4 {
842            bc.emit(frag(i));
843            emitted.lock().push(i);
844        }
845        tokio::time::sleep(Duration::from_millis(100)).await;
846        for expected in 0..4u64 {
847            let f = downstream.next_fragment().await.expect("downstream frag");
848            assert_eq!(f.group_id, expected);
849        }
850    }
851}