1use std::panic::AssertUnwindSafe;
12use std::sync::Arc;
13use std::sync::atomic::{AtomicU64, Ordering};
14
15use dashmap::DashMap;
16use dashmap::mapref::entry::Entry;
17use lvqr_fragment::{BroadcasterStream, FragmentBroadcaster, FragmentBroadcasterRegistry, FragmentStream};
18use parking_lot::RwLock;
19use tokio::runtime::Handle;
20use tokio::task::JoinHandle;
21use tracing::{info, warn};
22
23use crate::transcoder::{Transcoder, TranscoderContext, TranscoderFactory};
24
/// Counters for a single drain task, shared between the task and the
/// [`TranscodeRunnerHandle`] accessors through an `Arc`.
///
/// Both fields are atomics; the code in this file reads and writes them with
/// `Ordering::Relaxed`.
#[derive(Debug, Default)]
pub struct TranscoderStats {
    /// Fragments pulled from the subscription by the drain loop. Incremented
    /// before `Transcoder::on_fragment` runs, so a fragment whose handler
    /// panicked is still counted.
    pub fragments_seen: AtomicU64,

    /// Panics caught from the transcoder's `on_start`, `on_fragment` and
    /// `on_stop` callbacks.
    pub panics: AtomicU64,
}
37
/// Key identifying one drain task and its counters, in the order
/// `(transcoder name, rendition name, broadcast, track)`.
type StatsKey = (String, String, String, String);
42
/// State shared by the registry callback installed in
/// `TranscodeRunner::install` and every `TranscodeRunnerHandle` clone.
struct RunnerInner {
    /// Clone of the registry the runner was installed on; `add_rendition`
    /// walks it to find broadcasts that already exist.
    registry: FragmentBroadcasterRegistry,
    /// Factories currently in the ladder. Mutated at runtime by
    /// `add_rendition` / `remove_rendition`, hence the lock.
    factories: RwLock<Vec<Arc<dyn TranscoderFactory>>>,
    /// Join handle of the drain task spawned for each key. Used to avoid
    /// spawning twice for the same key and to abort drains on removal.
    tasks: DashMap<StatsKey, JoinHandle<()>>,
    /// Counters for each key. Nothing in the visible code removes entries, so
    /// counters stay readable after the corresponding task has ended.
    stats: DashMap<StatsKey, Arc<TranscoderStats>>,
}
56
57impl RunnerInner {
58 fn spawn_for(
65 &self,
66 broadcast: &str,
67 track: &str,
68 bc: &Arc<FragmentBroadcaster>,
69 factory: &Arc<dyn TranscoderFactory>,
70 ) -> bool {
71 let rendition = factory.rendition().clone();
72 let key: StatsKey = (
73 factory.name().to_string(),
74 rendition.name.clone(),
75 broadcast.to_string(),
76 track.to_string(),
77 );
78 if self.tasks.contains_key(&key) {
80 return false;
81 }
82 let last_seg = broadcast.rsplit('/').next().unwrap_or(broadcast);
90 if self.factories.read().iter().any(|f| f.rendition().name == last_seg) {
91 return false;
92 }
93 let ctx = TranscoderContext {
94 broadcast: broadcast.to_string(),
95 track: track.to_string(),
96 meta: bc.meta(),
97 rendition: rendition.clone(),
98 };
99 let Some(transcoder) = factory.build(&ctx) else {
100 return false;
101 };
102 let handle = match Handle::try_current() {
103 Ok(h) => h,
104 Err(_) => {
105 warn!(
106 broadcast = %broadcast,
107 track = %track,
108 "TranscodeRunner: no tokio runtime; no drain spawned",
109 );
110 return false;
111 }
112 };
113 match self.tasks.entry(key.clone()) {
117 Entry::Occupied(_) => false,
118 Entry::Vacant(slot) => {
119 let sub = bc.subscribe();
120 let stat = Arc::clone(
121 self.stats
122 .entry(key.clone())
123 .or_insert_with(|| Arc::new(TranscoderStats::default()))
124 .value(),
125 );
126 let task = handle.spawn(drive(transcoder, key.0.clone(), ctx, sub, stat));
127 slot.insert(task);
128 true
129 }
130 }
131 }
132}
133
/// Handle returned by `TranscodeRunner::install`.
///
/// Cloning is cheap: every clone points at the same `RunnerInner` through an
/// `Arc`, so all clones observe the same factories, tasks and counters.
#[derive(Clone)]
pub struct TranscodeRunnerHandle {
    inner: Arc<RunnerInner>,
}
151
152impl std::fmt::Debug for TranscodeRunnerHandle {
153 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
154 f.debug_struct("TranscodeRunnerHandle")
155 .field("tracked_keys", &self.inner.stats.len())
156 .field("renditions", &self.inner.factories.read().len())
157 .finish()
158 }
159}
160
161impl TranscodeRunnerHandle {
162 pub fn fragments_seen(&self, transcoder: &str, rendition: &str, broadcast: &str, track: &str) -> u64 {
166 self.stat(transcoder, rendition, broadcast, track)
167 .map(|s| s.fragments_seen.load(Ordering::Relaxed))
168 .unwrap_or(0)
169 }
170
171 pub fn panics(&self, transcoder: &str, rendition: &str, broadcast: &str, track: &str) -> u64 {
175 self.stat(transcoder, rendition, broadcast, track)
176 .map(|s| s.panics.load(Ordering::Relaxed))
177 .unwrap_or(0)
178 }
179
180 pub fn tracked(&self) -> Vec<StatsKey> {
183 self.inner.stats.iter().map(|e| e.key().clone()).collect()
184 }
185
186 pub fn renditions(&self) -> Vec<crate::RenditionSpec> {
190 self.inner
191 .factories
192 .read()
193 .iter()
194 .map(|f| f.rendition().clone())
195 .collect()
196 }
197
198 pub fn add_rendition(&self, factory: Arc<dyn TranscoderFactory>) -> bool {
208 {
209 let mut factories = self.inner.factories.write();
210 if factories
211 .iter()
212 .any(|f| f.name() == factory.name() && f.rendition().name == factory.rendition().name)
213 {
214 return false;
215 }
216 factories.push(Arc::clone(&factory));
217 }
218 for (broadcast, track) in self.inner.registry.keys() {
221 if let Some(bc) = self.inner.registry.get(&broadcast, &track) {
222 self.inner.spawn_for(&broadcast, &track, &bc, &factory);
223 }
224 }
225 true
226 }
227
228 pub fn remove_rendition(&self, rendition: &str) -> usize {
233 self.inner.factories.write().retain(|f| f.rendition().name != rendition);
234 let mut aborted = 0usize;
235 self.inner.tasks.retain(|key, task| {
236 if key.1 == rendition {
237 task.abort();
238 aborted += 1;
239 false
240 } else {
241 true
242 }
243 });
244 aborted
245 }
246
247 fn stat(&self, transcoder: &str, rendition: &str, broadcast: &str, track: &str) -> Option<Arc<TranscoderStats>> {
248 self.inner
249 .stats
250 .get(&(
251 transcoder.to_string(),
252 rendition.to_string(),
253 broadcast.to_string(),
254 track.to_string(),
255 ))
256 .map(|e| Arc::clone(e.value()))
257 }
258}
259
/// Builder that collects transcoder factories before the runner is installed
/// on a `FragmentBroadcasterRegistry`.
///
/// The default value holds no factories.
#[derive(Default)]
pub struct TranscodeRunner {
    /// Factories in the order they were added.
    factories: Vec<Arc<dyn TranscoderFactory>>,
}
279
280impl TranscodeRunner {
281 pub fn new() -> Self {
283 Self::default()
284 }
285
286 pub fn with_factory<F: TranscoderFactory>(mut self, factory: F) -> Self {
288 self.factories.push(Arc::new(factory));
289 self
290 }
291
292 pub fn with_factory_arc(mut self, factory: Arc<dyn TranscoderFactory>) -> Self {
296 self.factories.push(factory);
297 self
298 }
299
300 pub fn with_ladder<F, Fn_>(mut self, ladder: Vec<crate::RenditionSpec>, build: Fn_) -> Self
306 where
307 F: TranscoderFactory,
308 Fn_: Fn(crate::RenditionSpec) -> F,
309 {
310 for spec in ladder {
311 self.factories.push(Arc::new(build(spec)));
312 }
313 self
314 }
315
316 pub fn factory_count(&self) -> usize {
320 self.factories.len()
321 }
322
323 pub fn install(self, registry: &FragmentBroadcasterRegistry) -> TranscodeRunnerHandle {
337 let inner = Arc::new(RunnerInner {
338 registry: registry.clone(),
339 factories: RwLock::new(self.factories),
340 tasks: DashMap::new(),
341 stats: DashMap::new(),
342 });
343
344 let inner_cb = Arc::clone(&inner);
345 registry.on_entry_created(move |broadcast, track, bc| {
346 let factories: Vec<Arc<dyn TranscoderFactory>> = inner_cb.factories.read().clone();
349 for factory in &factories {
350 inner_cb.spawn_for(broadcast, track, bc, factory);
351 }
352 });
353
354 info!(
355 renditions = inner.factories.read().len(),
356 "TranscodeRunner installed on FragmentBroadcasterRegistry",
357 );
358
359 TranscodeRunnerHandle { inner }
360 }
361}
362
363async fn drive(
368 mut transcoder: Box<dyn Transcoder>,
369 transcoder_name: String,
370 ctx: TranscoderContext,
371 mut sub: BroadcasterStream,
372 stats: Arc<TranscoderStats>,
373) {
374 let rendition_name = ctx.rendition.name.clone();
375
376 sub.refresh_meta();
386 let ctx = TranscoderContext {
387 broadcast: ctx.broadcast,
388 track: ctx.track,
389 meta: sub.meta().clone(),
390 rendition: ctx.rendition,
391 };
392
393 let started = std::panic::catch_unwind(AssertUnwindSafe(|| transcoder.on_start(&ctx)));
397 if started.is_err() {
398 stats.panics.fetch_add(1, Ordering::Relaxed);
399 metrics::counter!(
400 "lvqr_transcode_panics_total",
401 "transcoder" => transcoder_name.clone(),
402 "rendition" => rendition_name.clone(),
403 "phase" => "start",
404 )
405 .increment(1);
406 warn!(
407 transcoder = %transcoder_name,
408 rendition = %rendition_name,
409 broadcast = %ctx.broadcast,
410 track = %ctx.track,
411 "Transcoder::on_start panicked; skipping drain loop",
412 );
413 return;
414 }
415
416 while let Some(frag) = sub.next_fragment().await {
417 stats.fragments_seen.fetch_add(1, Ordering::Relaxed);
418 metrics::counter!(
419 "lvqr_transcode_fragments_total",
420 "transcoder" => transcoder_name.clone(),
421 "rendition" => rendition_name.clone(),
422 )
423 .increment(1);
424 let result = std::panic::catch_unwind(AssertUnwindSafe(|| transcoder.on_fragment(&frag)));
425 if result.is_err() {
426 stats.panics.fetch_add(1, Ordering::Relaxed);
427 metrics::counter!(
428 "lvqr_transcode_panics_total",
429 "transcoder" => transcoder_name.clone(),
430 "rendition" => rendition_name.clone(),
431 "phase" => "fragment",
432 )
433 .increment(1);
434 warn!(
435 transcoder = %transcoder_name,
436 rendition = %rendition_name,
437 broadcast = %ctx.broadcast,
438 track = %ctx.track,
439 group_id = frag.group_id,
440 object_id = frag.object_id,
441 "Transcoder::on_fragment panicked; skipping fragment and continuing",
442 );
443 }
444 }
445
446 let stopped = std::panic::catch_unwind(AssertUnwindSafe(|| transcoder.on_stop()));
447 if stopped.is_err() {
448 stats.panics.fetch_add(1, Ordering::Relaxed);
449 metrics::counter!(
450 "lvqr_transcode_panics_total",
451 "transcoder" => transcoder_name.clone(),
452 "rendition" => rendition_name.clone(),
453 "phase" => "stop",
454 )
455 .increment(1);
456 warn!(
457 transcoder = %transcoder_name,
458 rendition = %rendition_name,
459 broadcast = %ctx.broadcast,
460 track = %ctx.track,
461 "Transcoder::on_stop panicked",
462 );
463 }
464
465 info!(
466 transcoder = %transcoder_name,
467 rendition = %rendition_name,
468 broadcast = %ctx.broadcast,
469 track = %ctx.track,
470 seen = stats.fragments_seen.load(Ordering::Relaxed),
471 panics = stats.panics.load(Ordering::Relaxed),
472 "TranscodeRunner: drain terminated",
473 );
474}
475
#[cfg(test)]
mod tests {
    use super::*;
    use crate::passthrough::PassthroughTranscoderFactory;
    use crate::rendition::RenditionSpec;
    use bytes::Bytes;
    use lvqr_fragment::{Fragment, FragmentFlags, FragmentMeta};
    use parking_lot::Mutex as PMutex;
    use std::time::Duration;

    /// Video track metadata used by most tests.
    fn meta() -> FragmentMeta {
        FragmentMeta::new("avc1.640028", 90_000)
    }

    /// Builds a small delta fragment whose group id is `idx`.
    fn frag(idx: u64) -> Fragment {
        Fragment::new(
            "0.mp4",
            idx,
            0,
            0,
            idx * 1000,
            idx * 1000,
            1000,
            FragmentFlags::DELTA,
            Bytes::from(vec![0xAB; 16]),
        )
    }

    // A single passthrough rendition counts every emitted fragment and records
    // no panics once the broadcast is removed.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn passthrough_sees_every_fragment_and_stops() {
        let registry = FragmentBroadcasterRegistry::new();
        let handle = TranscodeRunner::new()
            .with_factory(PassthroughTranscoderFactory::new(RenditionSpec::preset_720p()))
            .install(&registry);

        let bc = registry.get_or_create("live/demo", "0.mp4", meta());
        for i in 0..5 {
            bc.emit(frag(i));
        }
        drop(bc);
        registry.remove("live/demo", "0.mp4");
        tokio::time::sleep(Duration::from_millis(150)).await;

        assert_eq!(handle.fragments_seen("passthrough", "720p", "live/demo", "0.mp4"), 5);
        assert_eq!(handle.panics("passthrough", "720p", "live/demo", "0.mp4"), 0);
    }

    // The default ladder yields one tracked key per rendition, each of which
    // sees both fragments.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn default_ladder_spawns_one_task_per_rendition() {
        let registry = FragmentBroadcasterRegistry::new();
        let handle = TranscodeRunner::new()
            .with_ladder(RenditionSpec::default_ladder(), PassthroughTranscoderFactory::new)
            .install(&registry);

        let bc = registry.get_or_create("live/ladder", "0.mp4", meta());
        bc.emit(frag(0));
        bc.emit(frag(1));
        tokio::time::sleep(Duration::from_millis(100)).await;

        let mut tracked = handle.tracked();
        tracked.sort();
        assert_eq!(tracked.len(), 3, "one drain task per rendition");
        for (_transcoder, rendition, _broadcast, _track) in &tracked {
            let seen = handle.fragments_seen("passthrough", rendition, "live/ladder", "0.mp4");
            assert_eq!(seen, 2, "rendition {rendition} saw both fragments");
        }
    }

    // An audio track produces no tracked key with the passthrough factory.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn factory_opt_out_skips_non_video_tracks() {
        let registry = FragmentBroadcasterRegistry::new();
        let handle = TranscodeRunner::new()
            .with_factory(PassthroughTranscoderFactory::new(RenditionSpec::preset_720p()))
            .install(&registry);

        let bc_audio = registry.get_or_create("live/demo", "1.mp4", FragmentMeta::new("mp4a.40.2", 48_000));
        bc_audio.emit(frag(0));
        tokio::time::sleep(Duration::from_millis(80)).await;

        assert!(handle.tracked().is_empty());
    }

    // A panic in `on_fragment` is counted once and does not stop the drain.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn panic_in_on_fragment_is_caught_and_counted() {
        struct PanicAtTwo;
        impl Transcoder for PanicAtTwo {
            fn on_fragment(&mut self, fragment: &Fragment) {
                if fragment.group_id == 2 {
                    panic!("simulated encoder fault at group 2");
                }
            }
        }
        struct PanicAtTwoFactory {
            rendition: RenditionSpec,
        }
        impl TranscoderFactory for PanicAtTwoFactory {
            fn name(&self) -> &str {
                "panicky"
            }
            fn rendition(&self) -> &RenditionSpec {
                &self.rendition
            }
            fn build(&self, _ctx: &TranscoderContext) -> Option<Box<dyn Transcoder>> {
                Some(Box::new(PanicAtTwo))
            }
        }

        let registry = FragmentBroadcasterRegistry::new();
        let handle = TranscodeRunner::new()
            .with_factory(PanicAtTwoFactory {
                rendition: RenditionSpec::preset_720p(),
            })
            .install(&registry);

        let bc = registry.get_or_create("live/panic", "0.mp4", meta());
        for i in 0..5 {
            bc.emit(frag(i));
        }
        tokio::time::sleep(Duration::from_millis(120)).await;

        assert_eq!(handle.fragments_seen("panicky", "720p", "live/panic", "0.mp4"), 5);
        assert_eq!(handle.panics("panicky", "720p", "live/panic", "0.mp4"), 1);
    }

    // A panic in `on_start` is counted and no fragment is ever processed.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn panic_in_on_start_skips_drain_loop() {
        struct PanicStart;
        impl Transcoder for PanicStart {
            fn on_start(&mut self, _ctx: &TranscoderContext) {
                panic!("simulated start failure");
            }
            fn on_fragment(&mut self, _fragment: &Fragment) {
                unreachable!("on_fragment must not run after on_start panics");
            }
        }
        struct PanicStartFactory {
            rendition: RenditionSpec,
        }
        impl TranscoderFactory for PanicStartFactory {
            fn name(&self) -> &str {
                "bad_start"
            }
            fn rendition(&self) -> &RenditionSpec {
                &self.rendition
            }
            fn build(&self, _ctx: &TranscoderContext) -> Option<Box<dyn Transcoder>> {
                Some(Box::new(PanicStart))
            }
        }

        let registry = FragmentBroadcasterRegistry::new();
        let handle = TranscodeRunner::new()
            .with_factory(PanicStartFactory {
                rendition: RenditionSpec::preset_480p(),
            })
            .install(&registry);

        let bc = registry.get_or_create("live/panic-start", "0.mp4", meta());
        bc.emit(frag(0));
        bc.emit(frag(1));
        tokio::time::sleep(Duration::from_millis(100)).await;

        assert_eq!(
            handle.fragments_seen("bad_start", "480p", "live/panic-start", "0.mp4"),
            0
        );
        assert_eq!(handle.panics("bad_start", "480p", "live/panic-start", "0.mp4"), 1);
    }

    // With no factories the callback is installed but nothing is tracked.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn empty_runner_installs_callback_but_spawns_nothing() {
        let registry = FragmentBroadcasterRegistry::new();
        let handle = TranscodeRunner::new().install(&registry);

        let bc = registry.get_or_create("live/empty", "0.mp4", meta());
        bc.emit(frag(0));
        tokio::time::sleep(Duration::from_millis(50)).await;

        assert!(handle.tracked().is_empty());
    }

    #[test]
    fn runner_default_is_empty() {
        let r = TranscodeRunner::default();
        assert_eq!(r.factory_count(), 0);
    }

    // A rendition added at runtime starts draining a broadcast that already
    // exists.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn add_rendition_spawns_for_existing_live_source() {
        let registry = FragmentBroadcasterRegistry::new();
        let handle = TranscodeRunner::new()
            .with_factory(PassthroughTranscoderFactory::new(RenditionSpec::preset_720p()))
            .install(&registry);

        let bc = registry.get_or_create("live/x", "0.mp4", meta());
        bc.emit(frag(0));
        tokio::time::sleep(Duration::from_millis(60)).await;

        assert!(handle.add_rendition(Arc::new(
            PassthroughTranscoderFactory::new(RenditionSpec::preset_480p())
        )));
        tokio::time::sleep(Duration::from_millis(60)).await;

        bc.emit(frag(1));
        bc.emit(frag(2));
        tokio::time::sleep(Duration::from_millis(120)).await;

        assert_eq!(handle.fragments_seen("passthrough", "720p", "live/x", "0.mp4"), 3);
        let s480 = handle.fragments_seen("passthrough", "480p", "live/x", "0.mp4");
        assert!(s480 >= 2, "runtime-added 480p must see post-add fragments; saw {s480}");
    }

    // Adding the same (factory name, rendition name) pair twice is rejected.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn add_rendition_rejects_duplicate_name() {
        let registry = FragmentBroadcasterRegistry::new();
        let handle = TranscodeRunner::new()
            .with_factory(PassthroughTranscoderFactory::new(RenditionSpec::preset_720p()))
            .install(&registry);

        assert!(!handle.add_rendition(Arc::new(
            PassthroughTranscoderFactory::new(RenditionSpec::preset_720p())
        )));
        assert!(handle.add_rendition(Arc::new(
            PassthroughTranscoderFactory::new(RenditionSpec::preset_240p())
        )));
    }

    // Removing one rendition aborts only its drain; the others keep running.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn remove_rendition_aborts_its_drain_and_leaves_others() {
        let registry = FragmentBroadcasterRegistry::new();
        let handle = TranscodeRunner::new()
            .with_ladder(RenditionSpec::default_ladder(), PassthroughTranscoderFactory::new)
            .install(&registry);

        let bc = registry.get_or_create("live/r", "0.mp4", meta());
        bc.emit(frag(0));
        tokio::time::sleep(Duration::from_millis(80)).await;

        let aborted = handle.remove_rendition("480p");
        assert_eq!(aborted, 1, "exactly the 480p drain task aborts");
        tokio::time::sleep(Duration::from_millis(40)).await;

        bc.emit(frag(1));
        bc.emit(frag(2));
        tokio::time::sleep(Duration::from_millis(120)).await;

        let s720 = handle.fragments_seen("passthrough", "720p", "live/r", "0.mp4");
        let s480 = handle.fragments_seen("passthrough", "480p", "live/r", "0.mp4");
        assert!(s720 >= 3, "surviving 720p keeps draining; saw {s720}");
        assert!(s480 < s720, "removed 480p stopped draining ({s480}) vs 720p ({s720})");

        let names: Vec<String> = handle.renditions().iter().map(|r| r.name.clone()).collect();
        assert!(!names.contains(&"480p".to_string()), "480p gone from ladder: {names:?}");
        assert!(names.contains(&"720p".to_string()));
    }

    // A broadcast whose last path segment is a rendition name is treated as
    // rendition output and is not transcoded again.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn does_not_transcode_a_rendition_output_broadcast() {
        let registry = FragmentBroadcasterRegistry::new();
        let handle = TranscodeRunner::new()
            .with_factory(PassthroughTranscoderFactory::new(RenditionSpec::preset_720p()))
            .install(&registry);

        let src = registry.get_or_create("live/x", "0.mp4", meta());
        let output = registry.get_or_create("live/x/720p", "0.mp4", meta());
        src.emit(frag(0));
        output.emit(frag(0));
        tokio::time::sleep(Duration::from_millis(100)).await;

        assert_eq!(handle.fragments_seen("passthrough", "720p", "live/x", "0.mp4"), 1);
        assert_eq!(
            handle.fragments_seen("passthrough", "720p", "live/x/720p", "0.mp4"),
            0,
            "output-shaped broadcast must not be transcoded"
        );
    }

    // Two differently named factories may share a rendition name; removing
    // that rendition drops both.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn two_factories_share_a_rendition_name() {
        struct AltFactory {
            rendition: RenditionSpec,
        }
        impl TranscoderFactory for AltFactory {
            fn name(&self) -> &str {
                "alt"
            }
            fn rendition(&self) -> &RenditionSpec {
                &self.rendition
            }
            fn build(&self, _ctx: &TranscoderContext) -> Option<Box<dyn Transcoder>> {
                None
            }
        }

        let registry = FragmentBroadcasterRegistry::new();
        let handle = TranscodeRunner::new()
            .with_factory(PassthroughTranscoderFactory::new(RenditionSpec::preset_720p()))
            .install(&registry);

        assert!(handle.add_rendition(Arc::new(AltFactory {
            rendition: RenditionSpec::preset_720p(),
        })));
        assert_eq!(handle.renditions().len(), 2, "two factories, both for 720p");

        handle.remove_rendition("720p");
        assert!(handle.renditions().is_empty());
    }

    // `renditions()` follows runtime additions and removals.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn renditions_reflects_runtime_edits() {
        let registry = FragmentBroadcasterRegistry::new();
        let handle = TranscodeRunner::new()
            .with_factory(PassthroughTranscoderFactory::new(RenditionSpec::preset_720p()))
            .install(&registry);

        let names =
            |h: &TranscodeRunnerHandle| -> Vec<String> { h.renditions().iter().map(|r| r.name.clone()).collect() };
        assert_eq!(names(&handle), vec!["720p".to_string()]);

        assert!(handle.add_rendition(Arc::new(
            PassthroughTranscoderFactory::new(RenditionSpec::preset_480p())
        )));
        let mut after_add = names(&handle);
        after_add.sort();
        assert_eq!(after_add, vec!["480p".to_string(), "720p".to_string()]);

        assert_eq!(
            handle.remove_rendition("720p"),
            0,
            "no live source, so no task to abort"
        );
        assert_eq!(names(&handle), vec!["480p".to_string()]);
    }

    // Installing the runner does not take fragments away from other
    // subscribers of the same broadcaster.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn downstream_subscriber_still_sees_every_fragment() {
        let registry = FragmentBroadcasterRegistry::new();
        let _handle = TranscodeRunner::new()
            .with_factory(PassthroughTranscoderFactory::new(RenditionSpec::preset_240p()))
            .install(&registry);

        let bc = registry.get_or_create("live/fanout", "0.mp4", meta());
        let mut downstream = bc.subscribe();
        let emitted = PMutex::new(Vec::<u64>::new());
        for i in 0..4 {
            bc.emit(frag(i));
            emitted.lock().push(i);
        }
        tokio::time::sleep(Duration::from_millis(100)).await;
        for expected in 0..4u64 {
            let f = downstream.next_fragment().await.expect("downstream frag");
            assert_eq!(f.group_id, expected);
        }
    }
}