lvqr-ingest 1.1.0

RTMP/WHIP/SRT ingest translated to MoQ tracks for LVQR
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
/// Bridge between RTMP ingest and MoQ Origin.
///
/// When an RTMP publisher connects, this module creates a MoQ broadcast
/// with CMAF-formatted tracks. Video and audio data from RTMP is remuxed
/// from FLV to fMP4 (CMAF) segments, compatible with MSE browser playback
/// and the MoQ ecosystem (moq-js).
use bytes::Bytes;
use dashmap::DashMap;
use lvqr_auth::{NoopAuthProvider, SharedAuth, extract};
use lvqr_core::{EventBus, RelayEvent, now_unix_ms};
use lvqr_fragment::{
    Fragment, FragmentBroadcasterRegistry, FragmentFlags, FragmentMeta, MoqTimingTrackSink, MoqTrackSink,
    TIMING_TRACK_NAME,
};
use lvqr_moq::Track;
use std::sync::Arc;
use tracing::{debug, info, warn};

use crate::dispatch::{publish_fragment, publish_init};
use crate::observer::SharedRawSampleObserver;
use crate::remux::{
    AudioConfig, FlvAudioTag, FlvVideoTag, VideoConfig, audio_init_segment, audio_segment, extract_resolution,
    generate_catalog, parse_audio_tag, parse_video_tag, video_init_segment_with_size,
};
use crate::rtmp::{AuthCallback, MediaCallback, RtmpConfig, RtmpServer, Scte35Callback, StreamCallback};

/// State for a single active RTMP stream being bridged to MoQ.
///
/// The track writes go through [`MoqTrackSink`] so this module is a
/// `Fragment`-shaped producer: every branch below constructs a `Fragment`
/// and calls `sink.push(..)`. This is the Tier 2.1 migration of the RTMP
/// bridge to the Unified Fragment Model.
struct ActiveStream {
    _broadcast: lvqr_moq::BroadcastProducer,
    video_sink: MoqTrackSink,
    audio_sink: MoqTrackSink,
    catalog_track: lvqr_moq::TrackProducer,
    /// Sibling `<broadcast>/0.timing` track sink (Phase A v1.1 #5,
    /// session 159). Emits one 16-byte LE anchor
    /// `(group_id_u64_le || ingest_time_ms_u64_le)` per video keyframe
    /// so pure-MoQ subscribers can compute glass-to-glass latency.
    /// `None` when track creation failed at broadcast-start; the
    /// video path keeps running unaffected.
    timing_sink: Option<MoqTimingTrackSink>,
    // Codec configuration (set when sequence headers arrive)
    video_config: Option<VideoConfig>,
    audio_config: Option<AudioConfig>,
    video_init: Option<Bytes>,
    audio_init: Option<Bytes>,
    // Segment sequencing
    video_seq: u32,
    audio_seq: u32,
    // DTS tracking (90kHz for video, sample_rate for audio)
    last_video_ts: Option<u32>,
}

/// Bridges RTMP ingest to a MoQ OriginProducer.
///
/// Creates MoQ broadcasts for each RTMP stream and remuxes video/audio
/// data from FLV to CMAF/fMP4 segments.
pub struct RtmpMoqBridge {
    origin: lvqr_moq::OriginProducer,
    streams: Arc<DashMap<String, ActiveStream>>,
    auth: SharedAuth,
    events: Option<EventBus>,
    raw_observer: Option<SharedRawSampleObserver>,
    /// Broadcaster registry every emitted fragment is published on.
    /// Consumers (archive, LL-HLS, DASH) subscribe through
    /// `on_entry_created` callbacks and drain fragments out of the
    /// per-broadcaster streams.
    registry: FragmentBroadcasterRegistry,
}

impl RtmpMoqBridge {
    pub fn new(origin: lvqr_moq::OriginProducer) -> Self {
        Self {
            origin,
            streams: Arc::new(DashMap::new()),
            auth: Arc::new(NoopAuthProvider),
            events: None,
            raw_observer: None,
            registry: FragmentBroadcasterRegistry::new(),
        }
    }

    /// Construct with a specific auth provider.
    pub fn with_auth(origin: lvqr_moq::OriginProducer, auth: SharedAuth) -> Self {
        Self {
            origin,
            streams: Arc::new(DashMap::new()),
            auth,
            events: None,
            raw_observer: None,
            registry: FragmentBroadcasterRegistry::new(),
        }
    }

    /// Install an externally-owned broadcaster registry. Used when
    /// multiple ingest protocols share one registry.
    pub fn with_registry(mut self, registry: FragmentBroadcasterRegistry) -> Self {
        self.registry = registry;
        self
    }

    /// Handle to the broadcaster registry.
    pub fn registry(&self) -> FragmentBroadcasterRegistry {
        self.registry.clone()
    }

    /// Replace the auth provider after construction.
    pub fn set_auth(&mut self, auth: SharedAuth) {
        self.auth = auth;
    }

    /// Attach an `EventBus` so the bridge emits `BroadcastStarted` and
    /// `BroadcastStopped` events whenever an RTMP publisher connects or
    /// disconnects. Subscribers on the bus (e.g. the recorder) can react
    /// without polling `stream_names()`.
    pub fn with_events(mut self, events: EventBus) -> Self {
        self.events = Some(events);
        self
    }

    /// Attach or replace the event bus after construction.
    pub fn set_event_bus(&mut self, events: EventBus) {
        self.events = Some(events);
    }

    /// Attach a [`crate::RawSampleObserver`] so the bridge fans every
    /// per-NAL video sample and every raw AAC audio access unit out
    /// to a consumer that needs pre-mux access (notably the future
    /// WHEP RTP packetizer). Read-only tap; does not alter the
    /// fragment / MoQ / HLS paths.
    pub fn with_raw_sample_observer(mut self, observer: SharedRawSampleObserver) -> Self {
        self.raw_observer = Some(observer);
        self
    }

    /// Attach or replace the [`crate::RawSampleObserver`] after
    /// construction.
    pub fn set_raw_sample_observer(&mut self, observer: SharedRawSampleObserver) {
        self.raw_observer = Some(observer);
    }

    /// Create an RTMP server wired to this bridge.
    pub fn create_rtmp_server(&self, config: RtmpConfig) -> RtmpServer {
        let origin = self.origin.clone();
        let streams_publish = self.streams.clone();
        let streams_unpublish = self.streams.clone();
        let streams_video = self.streams.clone();
        let streams_audio = self.streams.clone();
        let events_publish = self.events.clone();
        let events_unpublish = self.events.clone();

        let raw_observer_video = self.raw_observer.clone();
        let raw_observer_audio = self.raw_observer.clone();
        let registry_video = self.registry.clone();
        let registry_audio = self.registry.clone();
        let registry_unpublish = self.registry.clone();

        let on_publish: StreamCallback = Arc::new(move |app: &str, key: &str| {
            let stream_name = format!("{app}/{key}");
            info!(stream = %stream_name, "creating MoQ broadcast for RTMP stream");

            let Some(mut broadcast) = origin.create_broadcast(&stream_name) else {
                warn!(stream = %stream_name, "broadcast not allowed by origin");
                return;
            };

            // Track names compatible with moq-js CMAF convention
            let video_track = match broadcast.create_track(Track::new("0.mp4")) {
                Ok(t) => t,
                Err(e) => {
                    warn!(stream = %stream_name, error = ?e, "failed to create video track");
                    return;
                }
            };

            let audio_track = match broadcast.create_track(Track::new("1.mp4")) {
                Ok(t) => t,
                Err(e) => {
                    warn!(stream = %stream_name, error = ?e, "failed to create audio track");
                    return;
                }
            };

            let catalog_track = match broadcast.create_track(Track::new(".catalog")) {
                Ok(t) => t,
                Err(e) => {
                    warn!(stream = %stream_name, error = ?e, "failed to create catalog track");
                    return;
                }
            };

            // Session 159 PATH-X: sibling `<broadcast>/0.timing` MoQ
            // track for pure-MoQ glass-to-glass SLO sampling. Failure
            // to create it is logged + tolerated -- the video path
            // keeps running, the timing sink stays `None`, and the
            // bin's subscribe blind probably fails silently which
            // matches the "best-effort SLO push" contract.
            let timing_sink = match broadcast.create_track(Track::new(TIMING_TRACK_NAME)) {
                Ok(t) => Some(MoqTimingTrackSink::new(t)),
                Err(e) => {
                    warn!(
                        stream = %stream_name,
                        error = ?e,
                        "failed to create 0.timing track; pure-MoQ SLO sampling disabled for this broadcast"
                    );
                    None
                }
            };

            metrics::gauge!("lvqr_active_streams").increment(1.0);
            if let Some(bus) = &events_publish {
                bus.emit(RelayEvent::BroadcastStarted {
                    name: stream_name.clone(),
                });
            }
            // Build Fragment sinks around the freshly-created TrackProducers.
            // Init segments are not yet known (they arrive with the FLV
            // sequence headers); set_init_segment is called when they do.
            let video_sink = MoqTrackSink::new(video_track, FragmentMeta::new("avc1", 90000));
            let audio_sink = MoqTrackSink::new(audio_track, FragmentMeta::new("mp4a", 0));
            streams_publish.insert(
                stream_name,
                ActiveStream {
                    _broadcast: broadcast,
                    video_sink,
                    audio_sink,
                    catalog_track,
                    timing_sink,
                    video_config: None,
                    audio_config: None,
                    video_init: None,
                    audio_init: None,
                    video_seq: 0,
                    audio_seq: 0,
                    last_video_ts: None,
                },
            );
        });

        let on_unpublish: StreamCallback = Arc::new(move |app: &str, key: &str| {
            let stream_name = format!("{app}/{key}");
            if let Some((_, mut stream)) = streams_unpublish.remove(&stream_name) {
                // Explicitly close any open video group. Dropping the sink
                // would also do this, but being explicit makes the unpublish
                // path obvious to readers.
                stream.video_sink.finish_current_group();
                metrics::gauge!("lvqr_active_streams").decrement(1.0);
                if let Some(bus) = &events_unpublish {
                    bus.emit(RelayEvent::BroadcastStopped {
                        name: stream_name.clone(),
                    });
                }
                // Drop the shared FragmentBroadcaster entries for this
                // stream so consumer-side drain tasks (archive indexer,
                // LL-HLS bridge, etc.) see next_fragment() return None and
                // run their broadcast-end finalize paths. Tier 4 item 4.3
                // session B3 added this to drive C2PA finalize-on-unpublish
                // via the registry's on_entry_removed lifecycle hook. Prior
                // to this, registry entries lived until server shutdown so
                // per-broadcast finalize never fired in production.
                registry_unpublish.remove(&stream_name, "0.mp4");
                registry_unpublish.remove(&stream_name, "1.mp4");
                info!(stream = %stream_name, "removed MoQ broadcast");
            }
        });

        let on_video: MediaCallback = Arc::new(move |app: &str, key: &str, data: Bytes, timestamp: u32| {
            let stream_name = format!("{app}/{key}");
            let Some(mut entry) = streams_video.get_mut(&stream_name) else {
                return;
            };
            let stream = entry.value_mut();

            match parse_video_tag(&data) {
                FlvVideoTag::SequenceHeader(config) => {
                    let (width, height) = config
                        .sps_list
                        .first()
                        .and_then(|sps| extract_resolution(sps))
                        .unwrap_or((0, 0));
                    debug!(
                        stream = %stream_name,
                        codec = %config.codec_string(),
                        width, height,
                        "video sequence header"
                    );
                    let init = video_init_segment_with_size(&config, width as u16, height as u16);
                    // Hand the init segment to the sink so every new MoQ
                    // group starts with it.
                    stream.video_sink.set_init_segment(init.clone());
                    // Deliver the SPS/PPS parameter sets to raw-sample
                    // observers (notably WHEP) as an Annex B blob.
                    // RTMP keyframes carry only the IDR slice; the
                    // parameter sets live here in the sequence header,
                    // and a WebRTC subscriber needs them in-band before
                    // the IDR. Built before `config` is moved into the
                    // stream below.
                    if let Some(obs) = raw_observer_video.as_ref() {
                        let param_sets = annex_b_param_sets(&config);
                        if !param_sets.is_empty() {
                            obs.on_video_config(&stream_name, "0.mp4", crate::MediaCodec::H264, &param_sets);
                        }
                    }
                    stream.video_config = Some(config);
                    stream.video_init = Some(init.clone());
                    // Video init writer is hardcoded to 90 kHz
                    // (see `video_init_segment_with_size` ->
                    // `mvhd.timescale = 90000`), so the bridge
                    // reports the same value here.
                    publish_init(&registry_video, &stream_name, "0.mp4", "avc1", 90_000, init);
                    maybe_write_catalog(stream, &stream_name);
                }
                FlvVideoTag::Nalu {
                    keyframe,
                    cts,
                    data: nalu_data,
                } => {
                    if stream.video_config.is_none() || stream.video_init.is_none() {
                        return; // no sequence header yet
                    };

                    metrics::counter!("lvqr_frames_published_total", "type" => "video").increment(1);
                    metrics::counter!("lvqr_bytes_ingested_total", "type" => "video").increment(nalu_data.len() as u64);

                    // Compute duration from timestamp delta (default 33ms = ~30fps)
                    let duration_ms = match stream.last_video_ts {
                        Some(prev) => timestamp.saturating_sub(prev),
                        None => 33,
                    };
                    stream.last_video_ts = Some(timestamp);
                    let duration_ticks = duration_ms * 90; // 90kHz timescale
                    let base_dts = (timestamp as u64) * 90;

                    let sample = lvqr_cmaf::RawSample {
                        track_id: 1,
                        dts: base_dts,
                        cts_offset: cts * 90, // ms to 90kHz
                        duration: duration_ticks,
                        payload: nalu_data,
                        keyframe,
                    };

                    let ingest_ms = now_unix_ms();
                    if let Some(obs) = raw_observer_video.as_ref() {
                        // RTMP / FLV ingest is AVC-only in LVQR today;
                        // HEVC-over-RTMP (enhanced-RTMP) lives in a
                        // later session. The codec tag is therefore
                        // constant here, unlike the WHIP bridge.
                        // Session 110 B: stamp the same wall-clock
                        // `now_unix_ms()` onto the raw-sample
                        // observer call and the sibling `Fragment`
                        // below, so the WHEP egress SLO sample for
                        // this sample lines up with the HLS + DASH
                        // samples the downstream drains record from
                        // `Fragment::ingest_time_ms`.
                        obs.on_raw_sample(&stream_name, "0.mp4", crate::MediaCodec::H264, &sample, ingest_ms);
                    }

                    stream.video_seq += 1;
                    let seg = lvqr_cmaf::build_moof_mdat(stream.video_seq, 1, base_dts, &[sample]);

                    // Build a Fragment and push it through the sink. On a
                    // keyframe the sink closes the previous group, opens a
                    // new one, and prepends the init segment from
                    // FragmentMeta. On a delta the sink writes into the
                    // open group.
                    let flags = if keyframe {
                        FragmentFlags::KEYFRAME
                    } else {
                        FragmentFlags::DELTA
                    };
                    let frag = Fragment::new(
                        "0.mp4",
                        stream.video_seq as u64,
                        0,
                        0,
                        base_dts,
                        base_dts.saturating_add((cts as u64) * 90),
                        duration_ticks as u64,
                        flags,
                        seg,
                    )
                    .with_ingest_time_ms(ingest_ms);
                    let push_result = stream.video_sink.push(&frag);
                    match push_result {
                        Ok(Some(group_seq)) => {
                            // Session 159: keyframe opened a new MoQ group.
                            // Emit a sibling-track timing anchor so pure-MoQ
                            // subscribers can join (group_seq, ingest_time_ms)
                            // and compute glass-to-glass latency. Skip the
                            // push when ingest_time_ms is unset (zero) -- a
                            // (group_id, 0) anchor would compute 60-year
                            // latency on the subscriber side and corrupt the
                            // SLO histogram.
                            if frag.ingest_time_ms != 0 {
                                if let Some(timing) = stream.timing_sink.as_mut() {
                                    if let Err(e) = timing.push_anchor(group_seq, frag.ingest_time_ms) {
                                        debug!(
                                            error = ?e,
                                            group = group_seq,
                                            "failed to push timing anchor"
                                        );
                                    }
                                }
                            }
                        }
                        Ok(None) => {
                            // Delta or pre-keyframe drop; no timing anchor.
                        }
                        Err(e) => {
                            debug!(error = ?e, "failed to push video fragment through sink");
                        }
                    }
                    publish_fragment(&registry_video, &stream_name, "0.mp4", "avc1", 90_000, frag);
                }
                FlvVideoTag::EndOfSequence => {
                    stream.video_sink.finish_current_group();
                }
                FlvVideoTag::Unknown => {}
            }
        });

        let on_audio: MediaCallback = Arc::new(move |app: &str, key: &str, data: Bytes, timestamp: u32| {
            let stream_name = format!("{app}/{key}");
            let Some(mut entry) = streams_audio.get_mut(&stream_name) else {
                return;
            };
            let stream = entry.value_mut();

            match parse_audio_tag(&data) {
                FlvAudioTag::SequenceHeader(config) => {
                    debug!(stream = %stream_name, codec = %config.codec_string(), "audio sequence header");
                    // Capture the sample rate before moving `config`
                    // into `stream.audio_config`; the observer needs
                    // it as the track timescale so the LL-HLS bridge
                    // can build a `CmafPolicy` that matches the real
                    // audio sample rate (44.1 kHz vs 48 kHz vs other).
                    let audio_timescale = config.sample_rate;
                    let init = audio_init_segment(&config);
                    let asc = config.asc.clone();
                    stream.audio_sink.set_init_segment(init.clone());
                    stream.audio_config = Some(config);
                    stream.audio_init = Some(init.clone());
                    if let Some(obs) = raw_observer_audio.as_ref() {
                        // Session 113: fire the codec-config hook so
                        // the WHEP AAC-to-Opus transcoder has the
                        // AudioSpecificConfig before the first raw
                        // AAC sample arrives.
                        obs.on_audio_config(&stream_name, "1.mp4", crate::MediaCodec::Aac, &asc);
                    }
                    publish_init(
                        &registry_audio,
                        &stream_name,
                        "1.mp4",
                        "mp4a.40.2",
                        audio_timescale,
                        init,
                    );
                    maybe_write_catalog(stream, &stream_name);
                }
                FlvAudioTag::RawAac(aac_data) => {
                    let Some(ref config) = stream.audio_config else {
                        return;
                    };
                    if stream.audio_init.is_none() {
                        return;
                    }

                    metrics::counter!("lvqr_frames_published_total", "type" => "audio").increment(1);
                    metrics::counter!("lvqr_bytes_ingested_total", "type" => "audio").increment(aac_data.len() as u64);

                    stream.audio_seq += 1;
                    // AAC-LC uses 1024 samples per frame at the audio sample rate
                    let duration: u32 = 1024;
                    let base_dts = (timestamp as u64) * (config.sample_rate as u64) / 1000;

                    let ingest_ms = now_unix_ms();
                    if let Some(obs) = raw_observer_audio.as_ref() {
                        let sample = lvqr_cmaf::RawSample {
                            track_id: 2,
                            dts: base_dts,
                            cts_offset: 0,
                            duration,
                            payload: aac_data.clone(),
                            keyframe: true,
                        };
                        // RTMP audio is always AAC in LVQR today.
                        // Session 30 introduced the `Aac` variant
                        // on `MediaCodec` so this call no longer
                        // needs a placeholder value; downstream
                        // consumers can correctly distinguish
                        // RTMP-sourced AAC from WHIP-sourced Opus.
                        obs.on_raw_sample(&stream_name, "1.mp4", crate::MediaCodec::Aac, &sample, ingest_ms);
                    }

                    let seg = audio_segment(stream.audio_seq, base_dts, duration, &aac_data);

                    // Every audio fragment opens its own MoQ group (audio
                    // frames are independently decodable in AAC-LC), so we
                    // tag every one as a keyframe for the sink's purposes.
                    // The sink will close the previous group and open a new
                    // one on each push.
                    let frag = Fragment::new(
                        "1.mp4",
                        stream.audio_seq as u64,
                        0,
                        0,
                        base_dts,
                        base_dts,
                        duration as u64,
                        FragmentFlags::KEYFRAME,
                        seg,
                    )
                    .with_ingest_time_ms(ingest_ms);
                    if let Err(e) = stream.audio_sink.push(&frag) {
                        debug!(error = ?e, "failed to push audio fragment through sink");
                    }
                    let audio_timescale = stream.audio_config.as_ref().map(|c| c.sample_rate).unwrap_or(44100);
                    publish_fragment(
                        &registry_audio,
                        &stream_name,
                        "1.mp4",
                        "mp4a.40.2",
                        audio_timescale,
                        frag,
                    );
                    // Close the group immediately so every audio frame is
                    // its own group on the wire. Subsequent pushes will
                    // open fresh groups.
                    stream.audio_sink.finish_current_group();
                }
                FlvAudioTag::Unknown => {}
            }
        });

        let mut server = RtmpServer::from_callbacks(config, on_video, on_audio, on_publish, on_unpublish);

        // Wire the bridge's auth provider into RTMP publish validation.
        let auth = self.auth.clone();
        let validate: AuthCallback =
            Arc::new(move |app: &str, key: &str| auth.check(&extract::extract_rtmp(app, key)).is_allow());
        server.set_validate_publish(validate);

        // Session 152 RTMP unblock: wire the SCTE-35 onCuePoint
        // scte35-bin64 carriage into the shared
        // FragmentBroadcasterRegistry's reserved `"scte35"` track,
        // mirroring the SRT MPEG-TS PID 0x86 path. The patched
        // rml_rtmp `Amf0DataReceived` event surfaces these messages;
        // rtmp.rs's `parse_oncuepoint_scte35` decodes the AMF0 object
        // and base64; here we re-parse via lvqr-codec for CRC
        // verification + timing extraction, then publish onto the
        // registry. CRC failures and unparseable sections drop with
        // a counter bump (parity with the SRT path).
        let registry_scte35 = self.registry.clone();
        let on_scte35: Scte35Callback = Arc::new(move |app: &str, key: &str, raw: Bytes| {
            let stream_name = format!("{app}/{key}");
            match lvqr_codec::parse_splice_info_section(&raw) {
                Ok(info) => {
                    let pts = info.absolute_pts().unwrap_or(0);
                    let duration = info.duration.unwrap_or(0);
                    let event_id = info.event_id.unwrap_or(0) as u64;
                    crate::publish_scte35(&registry_scte35, &stream_name, event_id, pts, duration, raw);
                    metrics::counter!(
                        "lvqr_scte35_events_total",
                        "ingest" => "rtmp",
                        "command" => format!("{:#04x}", info.command_type),
                    )
                    .increment(1);
                }
                Err(e) => {
                    let reason = match &e {
                        lvqr_codec::CodecError::Scte35BadCrc { .. } => "crc",
                        lvqr_codec::CodecError::Scte35Malformed(_) => "malformed",
                        lvqr_codec::CodecError::EndOfStream { .. } => "truncated",
                        _ => "other",
                    };
                    warn!(stream = %stream_name, error = %e, "RTMP scte35-bin64 parse failure");
                    metrics::counter!(
                        "lvqr_scte35_drops_total",
                        "ingest" => "rtmp",
                        "reason" => reason,
                    )
                    .increment(1);
                }
            }
        });
        server.set_scte35_callback(on_scte35);
        server
    }

    /// Number of active RTMP streams being bridged.
    pub fn active_stream_count(&self) -> usize {
        self.streams.len()
    }

    /// Names of active RTMP streams (e.g. "live/mystream").
    pub fn stream_names(&self) -> Vec<String> {
        self.streams.iter().map(|e| e.key().clone()).collect()
    }
}

/// Build an Annex B byte stream of the H.264 parameter-set NALUs
/// (every SPS then every PPS, each 4-byte start-code prefixed) from an
/// FLV AVC sequence header.
///
/// WHEP prepends this verbatim before a keyframe IDR (RTMP keyframes
/// carry only the picture NALUs; the parameter sets live in the
/// sequence header). Returns an empty `Vec` when the config carries no
/// parameter sets.
fn annex_b_param_sets(config: &VideoConfig) -> Vec<u8> {
    const START_CODE: [u8; 4] = [0x00, 0x00, 0x00, 0x01];
    let mut out = Vec::new();
    for nal in config.sps_list.iter().chain(config.pps_list.iter()) {
        if nal.is_empty() {
            continue;
        }
        out.extend_from_slice(&START_CODE);
        out.extend_from_slice(nal);
    }
    out
}

/// Write the catalog track whenever codec configuration changes.
///
/// Each call creates a new MoQ group, so late-joining subscribers always
/// get the latest catalog. This handles the case where audio config arrives
/// after video config -- the catalog is rewritten with both tracks.
fn maybe_write_catalog(stream: &mut ActiveStream, stream_name: &str) {
    if stream.video_config.is_none() && stream.audio_config.is_none() {
        return;
    }

    let catalog_json = generate_catalog(stream.video_config.as_ref(), stream.audio_config.as_ref());

    match stream.catalog_track.append_group() {
        Ok(mut group) => {
            if let Err(e) = group.write_frame(Bytes::from(catalog_json)) {
                debug!(error = ?e, "failed to write catalog");
                return;
            }
            let _ = group.finish();
            info!(
                stream = %stream_name,
                has_video = stream.video_config.is_some(),
                has_audio = stream.audio_config.is_some(),
                "catalog published"
            );
        }
        Err(e) => {
            debug!(error = ?e, "failed to append catalog group");
        }
    }
}