1use std::panic::AssertUnwindSafe;
12use std::sync::Arc;
13use std::sync::atomic::{AtomicU64, Ordering};
14
15use dashmap::DashMap;
16use lvqr_fragment::{BroadcasterStream, FragmentBroadcasterRegistry, FragmentStream};
17use parking_lot::Mutex;
18use tokio::runtime::Handle;
19use tokio::task::JoinHandle;
20use tracing::{info, warn};
21
22use crate::transcoder::{Transcoder, TranscoderContext, TranscoderFactory};
23
/// Counters for a single drain task, shared between the task that updates
/// them and any [`TranscodeRunnerHandle`] that reads them.
///
/// Every access in this module uses `Ordering::Relaxed`: the two counters are
/// independent tallies and do not synchronise anything else.
#[derive(Default, Debug)]
pub struct TranscoderStats {
    /// Fragments pulled off the subscription. Incremented *before*
    /// `Transcoder::on_fragment` runs, so fragments whose handler panicked
    /// are still counted.
    pub fragments_seen: AtomicU64,

    /// Panics caught in `on_start`, `on_fragment`, or `on_stop`.
    pub panics: AtomicU64,
}
36
/// Identity of one drain task, used as the key of the per-drain stats map.
type StatsKey = (
    String, // transcoder (factory) name
    String, // rendition name
    String, // broadcast name
    String, // track name
);
41
42#[derive(Clone)]
52pub struct TranscodeRunnerHandle {
53 stats: Arc<DashMap<StatsKey, Arc<TranscoderStats>>>,
54 _tasks: Arc<Mutex<Vec<JoinHandle<()>>>>,
55}
56
57impl std::fmt::Debug for TranscodeRunnerHandle {
58 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
59 f.debug_struct("TranscodeRunnerHandle")
60 .field("tracked_keys", &self.stats.len())
61 .finish()
62 }
63}
64
65impl TranscodeRunnerHandle {
66 pub fn fragments_seen(&self, transcoder: &str, rendition: &str, broadcast: &str, track: &str) -> u64 {
70 self.stat(transcoder, rendition, broadcast, track)
71 .map(|s| s.fragments_seen.load(Ordering::Relaxed))
72 .unwrap_or(0)
73 }
74
75 pub fn panics(&self, transcoder: &str, rendition: &str, broadcast: &str, track: &str) -> u64 {
79 self.stat(transcoder, rendition, broadcast, track)
80 .map(|s| s.panics.load(Ordering::Relaxed))
81 .unwrap_or(0)
82 }
83
84 pub fn tracked(&self) -> Vec<StatsKey> {
87 self.stats.iter().map(|e| e.key().clone()).collect()
88 }
89
90 fn stat(&self, transcoder: &str, rendition: &str, broadcast: &str, track: &str) -> Option<Arc<TranscoderStats>> {
91 self.stats
92 .get(&(
93 transcoder.to_string(),
94 rendition.to_string(),
95 broadcast.to_string(),
96 track.to_string(),
97 ))
98 .map(|e| Arc::clone(e.value()))
99 }
100}
101
102#[derive(Default)]
118pub struct TranscodeRunner {
119 factories: Vec<Arc<dyn TranscoderFactory>>,
120}
121
122impl TranscodeRunner {
123 pub fn new() -> Self {
125 Self::default()
126 }
127
128 pub fn with_factory<F: TranscoderFactory>(mut self, factory: F) -> Self {
130 self.factories.push(Arc::new(factory));
131 self
132 }
133
134 pub fn with_factory_arc(mut self, factory: Arc<dyn TranscoderFactory>) -> Self {
138 self.factories.push(factory);
139 self
140 }
141
142 pub fn with_ladder<F, Fn_>(mut self, ladder: Vec<crate::RenditionSpec>, build: Fn_) -> Self
148 where
149 F: TranscoderFactory,
150 Fn_: Fn(crate::RenditionSpec) -> F,
151 {
152 for spec in ladder {
153 self.factories.push(Arc::new(build(spec)));
154 }
155 self
156 }
157
158 pub fn factory_count(&self) -> usize {
162 self.factories.len()
163 }
164
165 pub fn install(self, registry: &FragmentBroadcasterRegistry) -> TranscodeRunnerHandle {
179 let stats: Arc<DashMap<StatsKey, Arc<TranscoderStats>>> = Arc::new(DashMap::new());
180 let tasks: Arc<Mutex<Vec<JoinHandle<()>>>> = Arc::new(Mutex::new(Vec::new()));
181
182 let factories = self.factories;
183 let stats_cb = Arc::clone(&stats);
184 let tasks_cb = Arc::clone(&tasks);
185
186 registry.on_entry_created(move |broadcast, track, bc| {
187 let handle = match Handle::try_current() {
188 Ok(h) => h,
189 Err(_) => {
190 warn!(
191 broadcast = %broadcast,
192 track = %track,
193 "TranscodeRunner: registry callback fired outside tokio runtime; no drain spawned",
194 );
195 return;
196 }
197 };
198
199 for factory in &factories {
200 let rendition = factory.rendition().clone();
201 let ctx = TranscoderContext {
202 broadcast: broadcast.to_string(),
203 track: track.to_string(),
204 meta: bc.meta(),
205 rendition: rendition.clone(),
206 };
207 let Some(transcoder) = factory.build(&ctx) else {
208 continue;
209 };
210
211 let sub = bc.subscribe();
212 let key: StatsKey = (
213 factory.name().to_string(),
214 rendition.name.clone(),
215 broadcast.to_string(),
216 track.to_string(),
217 );
218 let stat = Arc::clone(
219 stats_cb
220 .entry(key.clone())
221 .or_insert_with(|| Arc::new(TranscoderStats::default()))
222 .value(),
223 );
224 let factory_name = factory.name().to_string();
225 let ctx_for_task = ctx.clone();
226 let task = handle.spawn(drive(transcoder, factory_name, ctx_for_task, sub, stat));
227 tasks_cb.lock().push(task);
228 }
229 });
230
231 info!(
232 tracked = stats.len(),
233 "TranscodeRunner installed on FragmentBroadcasterRegistry",
234 );
235
236 TranscodeRunnerHandle { stats, _tasks: tasks }
237 }
238}
239
240async fn drive(
245 mut transcoder: Box<dyn Transcoder>,
246 transcoder_name: String,
247 ctx: TranscoderContext,
248 mut sub: BroadcasterStream,
249 stats: Arc<TranscoderStats>,
250) {
251 let rendition_name = ctx.rendition.name.clone();
252
253 sub.refresh_meta();
263 let ctx = TranscoderContext {
264 broadcast: ctx.broadcast,
265 track: ctx.track,
266 meta: sub.meta().clone(),
267 rendition: ctx.rendition,
268 };
269
270 let started = std::panic::catch_unwind(AssertUnwindSafe(|| transcoder.on_start(&ctx)));
274 if started.is_err() {
275 stats.panics.fetch_add(1, Ordering::Relaxed);
276 metrics::counter!(
277 "lvqr_transcode_panics_total",
278 "transcoder" => transcoder_name.clone(),
279 "rendition" => rendition_name.clone(),
280 "phase" => "start",
281 )
282 .increment(1);
283 warn!(
284 transcoder = %transcoder_name,
285 rendition = %rendition_name,
286 broadcast = %ctx.broadcast,
287 track = %ctx.track,
288 "Transcoder::on_start panicked; skipping drain loop",
289 );
290 return;
291 }
292
293 while let Some(frag) = sub.next_fragment().await {
294 stats.fragments_seen.fetch_add(1, Ordering::Relaxed);
295 metrics::counter!(
296 "lvqr_transcode_fragments_total",
297 "transcoder" => transcoder_name.clone(),
298 "rendition" => rendition_name.clone(),
299 )
300 .increment(1);
301 let result = std::panic::catch_unwind(AssertUnwindSafe(|| transcoder.on_fragment(&frag)));
302 if result.is_err() {
303 stats.panics.fetch_add(1, Ordering::Relaxed);
304 metrics::counter!(
305 "lvqr_transcode_panics_total",
306 "transcoder" => transcoder_name.clone(),
307 "rendition" => rendition_name.clone(),
308 "phase" => "fragment",
309 )
310 .increment(1);
311 warn!(
312 transcoder = %transcoder_name,
313 rendition = %rendition_name,
314 broadcast = %ctx.broadcast,
315 track = %ctx.track,
316 group_id = frag.group_id,
317 object_id = frag.object_id,
318 "Transcoder::on_fragment panicked; skipping fragment and continuing",
319 );
320 }
321 }
322
323 let stopped = std::panic::catch_unwind(AssertUnwindSafe(|| transcoder.on_stop()));
324 if stopped.is_err() {
325 stats.panics.fetch_add(1, Ordering::Relaxed);
326 metrics::counter!(
327 "lvqr_transcode_panics_total",
328 "transcoder" => transcoder_name.clone(),
329 "rendition" => rendition_name.clone(),
330 "phase" => "stop",
331 )
332 .increment(1);
333 warn!(
334 transcoder = %transcoder_name,
335 rendition = %rendition_name,
336 broadcast = %ctx.broadcast,
337 track = %ctx.track,
338 "Transcoder::on_stop panicked",
339 );
340 }
341
342 info!(
343 transcoder = %transcoder_name,
344 rendition = %rendition_name,
345 broadcast = %ctx.broadcast,
346 track = %ctx.track,
347 seen = stats.fragments_seen.load(Ordering::Relaxed),
348 panics = stats.panics.load(Ordering::Relaxed),
349 "TranscodeRunner: drain terminated",
350 );
351}
352
#[cfg(test)]
mod tests {
    use super::*;
    use crate::passthrough::PassthroughTranscoderFactory;
    use crate::rendition::RenditionSpec;
    use bytes::Bytes;
    use lvqr_fragment::{Fragment, FragmentFlags, FragmentMeta};
    use std::time::{Duration, Instant};

    /// Upper bound on how long a test waits for drain tasks to catch up before
    /// asserting. It is generous so slow CI machines do not flake; passing
    /// tests return as soon as the condition holds.
    const SETTLE_TIMEOUT: Duration = Duration::from_secs(2);

    /// Grace period for "nothing should happen" tests, which have no positive
    /// condition to poll for.
    const QUIET_PERIOD: Duration = Duration::from_millis(80);

    fn meta() -> FragmentMeta {
        FragmentMeta::new("avc1.640028", 90_000)
    }

    fn frag(idx: u64) -> Fragment {
        Fragment::new(
            "0.mp4",
            idx,
            0,
            0,
            idx * 1000,
            idx * 1000,
            1000,
            FragmentFlags::DELTA,
            Bytes::from(vec![0xAB; 16]),
        )
    }

    /// Polls `cond` every 5 ms until it returns `true` or `SETTLE_TIMEOUT`
    /// elapses. Callers assert afterwards, so a timeout surfaces as an ordinary
    /// assertion failure that shows the real observed values.
    async fn wait_until(mut cond: impl FnMut() -> bool) {
        let deadline = Instant::now() + SETTLE_TIMEOUT;
        while !cond() && Instant::now() < deadline {
            tokio::time::sleep(Duration::from_millis(5)).await;
        }
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn passthrough_sees_every_fragment_and_stops() {
        let registry = FragmentBroadcasterRegistry::new();
        let handle = TranscodeRunner::new()
            .with_factory(PassthroughTranscoderFactory::new(RenditionSpec::preset_720p()))
            .install(&registry);

        let bc = registry.get_or_create("live/demo", "0.mp4", meta());
        for i in 0..5 {
            bc.emit(frag(i));
        }
        drop(bc);
        registry.remove("live/demo", "0.mp4");

        let seen = || handle.fragments_seen("passthrough", "720p", "live/demo", "0.mp4");
        wait_until(|| seen() == 5).await;

        assert_eq!(seen(), 5);
        assert_eq!(handle.panics("passthrough", "720p", "live/demo", "0.mp4"), 0);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn default_ladder_spawns_one_task_per_rendition() {
        let registry = FragmentBroadcasterRegistry::new();
        let handle = TranscodeRunner::new()
            .with_ladder(RenditionSpec::default_ladder(), PassthroughTranscoderFactory::new)
            .install(&registry);

        let bc = registry.get_or_create("live/ladder", "0.mp4", meta());
        bc.emit(frag(0));
        bc.emit(frag(1));

        wait_until(|| {
            let tracked = handle.tracked();
            tracked.len() == 3
                && tracked.iter().all(|(transcoder, rendition, broadcast, track)| {
                    handle.fragments_seen(transcoder, rendition, broadcast, track) == 2
                })
        })
        .await;

        let mut tracked = handle.tracked();
        tracked.sort();
        assert_eq!(tracked.len(), 3, "one drain task per rendition");
        for (transcoder, rendition, broadcast, track) in &tracked {
            assert_eq!(transcoder, "passthrough");
            assert_eq!(broadcast, "live/ladder");
            assert_eq!(track, "0.mp4");
            let seen = handle.fragments_seen(transcoder, rendition, broadcast, track);
            assert_eq!(seen, 2, "rendition {rendition} saw both fragments");
        }
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn factory_opt_out_skips_non_video_tracks() {
        let registry = FragmentBroadcasterRegistry::new();
        let handle = TranscodeRunner::new()
            .with_factory(PassthroughTranscoderFactory::new(RenditionSpec::preset_720p()))
            .install(&registry);

        let bc_audio = registry.get_or_create("live/demo", "1.mp4", FragmentMeta::new("mp4a.40.2", 48_000));
        bc_audio.emit(frag(0));
        // Negative assertion: give any (wrongly) spawned drain time to show up.
        tokio::time::sleep(QUIET_PERIOD).await;

        assert!(handle.tracked().is_empty());
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn panic_in_on_fragment_is_caught_and_counted() {
        struct PanicAtTwo;
        impl Transcoder for PanicAtTwo {
            fn on_fragment(&mut self, fragment: &Fragment) {
                if fragment.group_id == 2 {
                    panic!("simulated encoder fault at group 2");
                }
            }
        }
        struct PanicAtTwoFactory {
            rendition: RenditionSpec,
        }
        impl TranscoderFactory for PanicAtTwoFactory {
            fn name(&self) -> &str {
                "panicky"
            }
            fn rendition(&self) -> &RenditionSpec {
                &self.rendition
            }
            fn build(&self, _ctx: &TranscoderContext) -> Option<Box<dyn Transcoder>> {
                Some(Box::new(PanicAtTwo))
            }
        }

        let registry = FragmentBroadcasterRegistry::new();
        let handle = TranscodeRunner::new()
            .with_factory(PanicAtTwoFactory {
                rendition: RenditionSpec::preset_720p(),
            })
            .install(&registry);

        let bc = registry.get_or_create("live/panic", "0.mp4", meta());
        for i in 0..5 {
            bc.emit(frag(i));
        }

        // Fragments are processed in order, so once the 5th is seen the panic
        // at group 2 has already been recorded.
        wait_until(|| handle.fragments_seen("panicky", "720p", "live/panic", "0.mp4") == 5).await;

        assert_eq!(handle.fragments_seen("panicky", "720p", "live/panic", "0.mp4"), 5);
        assert_eq!(handle.panics("panicky", "720p", "live/panic", "0.mp4"), 1);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn panic_in_on_start_skips_drain_loop() {
        struct PanicStart;
        impl Transcoder for PanicStart {
            fn on_start(&mut self, _ctx: &TranscoderContext) {
                panic!("simulated start failure");
            }
            fn on_fragment(&mut self, _fragment: &Fragment) {
                unreachable!("on_fragment must not run after on_start panics");
            }
        }
        struct PanicStartFactory {
            rendition: RenditionSpec,
        }
        impl TranscoderFactory for PanicStartFactory {
            fn name(&self) -> &str {
                "bad_start"
            }
            fn rendition(&self) -> &RenditionSpec {
                &self.rendition
            }
            fn build(&self, _ctx: &TranscoderContext) -> Option<Box<dyn Transcoder>> {
                Some(Box::new(PanicStart))
            }
        }

        let registry = FragmentBroadcasterRegistry::new();
        let handle = TranscodeRunner::new()
            .with_factory(PanicStartFactory {
                rendition: RenditionSpec::preset_480p(),
            })
            .install(&registry);

        let bc = registry.get_or_create("live/panic-start", "0.mp4", meta());
        bc.emit(frag(0));
        bc.emit(frag(1));

        wait_until(|| handle.panics("bad_start", "480p", "live/panic-start", "0.mp4") == 1).await;

        assert_eq!(
            handle.fragments_seen("bad_start", "480p", "live/panic-start", "0.mp4"),
            0
        );
        assert_eq!(handle.panics("bad_start", "480p", "live/panic-start", "0.mp4"), 1);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn empty_runner_installs_callback_but_spawns_nothing() {
        let registry = FragmentBroadcasterRegistry::new();
        let handle = TranscodeRunner::new().install(&registry);

        let bc = registry.get_or_create("live/empty", "0.mp4", meta());
        bc.emit(frag(0));
        // Negative assertion: give any (wrongly) spawned drain time to show up.
        tokio::time::sleep(QUIET_PERIOD).await;

        assert!(handle.tracked().is_empty());
    }

    #[test]
    fn runner_default_is_empty() {
        let r = TranscodeRunner::default();
        assert_eq!(r.factory_count(), 0);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn downstream_subscriber_still_sees_every_fragment() {
        let registry = FragmentBroadcasterRegistry::new();
        let _handle = TranscodeRunner::new()
            .with_factory(PassthroughTranscoderFactory::new(RenditionSpec::preset_240p()))
            .install(&registry);

        let bc = registry.get_or_create("live/fanout", "0.mp4", meta());
        let mut downstream = bc.subscribe();
        for i in 0..4 {
            bc.emit(frag(i));
        }
        // Let the transcoder drain run first, so the loop below checks that its
        // subscription does not take fragments away from other subscribers.
        tokio::time::sleep(Duration::from_millis(100)).await;
        for expected in 0..4u64 {
            let f = tokio::time::timeout(SETTLE_TIMEOUT, downstream.next_fragment())
                .await
                .expect("downstream fragment arrived before timeout")
                .expect("downstream frag");
            assert_eq!(f.group_id, expected);
        }
    }
}