Skip to main content

lvqr_cli/
lib.rs

1//! LVQR server library entry point.
2//!
3//! `main.rs` parses CLI args and hands off to [`start`]; tests and embedders
4//! can call [`start`] directly with a pre-built [`ServeConfig`]. Every
5//! listener (MoQ/QUIC, RTMP/TCP, admin/TCP) is bound inside `start` before
6//! it returns, so callers who pass `port: 0` can read the real bound
7//! addresses back off the returned [`ServerHandle`] and point test clients
8//! at them without polling.
9//!
10//! This is the library target used by `lvqr-test-utils::TestServer` to
11//! spin up a full-stack LVQR instance on ephemeral ports inside
12//! integration tests.
13
14mod archive;
15#[cfg(feature = "cluster")]
16pub mod cluster_claim;
17mod hls;
18
19use anyhow::Result;
20use axum::extract::ws::{Message, WebSocket};
21use axum::extract::{Path, Query, State, WebSocketUpgrade};
22use axum::http::{HeaderMap, StatusCode};
23use axum::response::{IntoResponse, Response};
24use axum::routing::get;
25use bytes::Bytes;
26use lvqr_auth::{AuthContext, AuthDecision, NoopAuthProvider, SharedAuth, extract};
27use lvqr_core::{EventBus, RelayEvent};
28use lvqr_dash::{BroadcasterDashBridge, DashConfig};
29use lvqr_fragment::FragmentBroadcasterRegistry;
30use lvqr_hls::{MultiHlsServer, PlaylistBuilderConfig};
31use lvqr_moq::Track;
32use std::collections::HashMap;
33use std::net::SocketAddr;
34use std::path::PathBuf;
35use std::sync::Arc;
36use std::sync::atomic::Ordering;
37use tokio_util::sync::CancellationToken;
38use tower_http::cors::CorsLayer;
39
40#[cfg(feature = "c2pa")]
41use crate::archive::verify_router;
42use crate::archive::{BroadcasterArchiveIndexer, playback_router};
43use crate::hls::BroadcasterHlsBridge;
44
45/// Configuration passed to [`start`] to bring up a full-stack LVQR server.
46///
47/// Every `*_addr` field accepts port `0` for ephemeral port binding; the
48/// real bound address is reported back through [`ServerHandle`].
49#[derive(Clone)]
50pub struct ServeConfig {
51    /// QUIC/MoQ relay bind address.
52    pub relay_addr: SocketAddr,
53    /// RTMP ingest bind address.
54    pub rtmp_addr: SocketAddr,
55    /// Admin HTTP (and WS relay/ingest) bind address.
56    pub admin_addr: SocketAddr,
57    /// Optional LL-HLS HTTP bind address. When `Some`, `start()` spins up a
58    /// dedicated `HlsServer` axum router on this address that observes the
59    /// RTMP bridge's fragment output and serves `/playlist.m3u8`,
60    /// `/init.mp4`, and the per-chunk media URIs the playlist references.
61    /// When `None`, no HLS surface is exposed.
62    pub hls_addr: Option<SocketAddr>,
63    /// DVR window depth in seconds for the LL-HLS sliding-window
64    /// eviction. Translated to `max_segments = dvr_secs /
65    /// target_duration_secs` at server construction. 0 means
66    /// unbounded (no eviction). Default: 120 (60 segments at 2 s).
67    pub hls_dvr_window_secs: u32,
68    /// LL-HLS target segment duration in seconds. Controls both the
69    /// `EXT-X-TARGETDURATION` declaration and the CMAF segmenter's
70    /// segment-close policy. Default: 2.
71    pub hls_target_duration_secs: u32,
72    /// LL-HLS target partial (chunk) duration in milliseconds.
73    /// Controls both `EXT-X-PART-INF:PART-TARGET` and the CMAF
74    /// segmenter's partial-close policy. Default: 200.
75    pub hls_part_target_ms: u32,
76    /// Optional WHEP (WebRTC HTTP Egress Protocol) HTTP bind address.
77    /// When `Some`, `start()` constructs a `Str0mAnswerer` and a
78    /// `WhepServer`, attaches the server as a `RawSampleObserver` on
79    /// the RTMP bridge, and spins up an axum router on this address
80    /// that accepts `POST /whep/{broadcast}` SDP offers, answers
81    /// them via `str0m`, and fans each ingest sample into every
82    /// subscribed session. When `None`, no WHEP surface is exposed
83    /// and no `str0m` state is constructed.
84    pub whep_addr: Option<SocketAddr>,
85    /// Optional WHIP (WebRTC HTTP Ingest Protocol) HTTP bind
86    /// address. When `Some`, `start()` constructs a
87    /// `Str0mIngestAnswerer` and a `WhipMoqBridge`, attaches it
88    /// as an ingest sink, and spins up an axum router on this
89    /// address that accepts `POST /whip/{broadcast}` SDP offers.
90    /// When `None`, no WHIP surface is exposed.
91    pub whip_addr: Option<SocketAddr>,
92    /// Optional MPEG-DASH HTTP bind address. When `Some`, `start()`
93    /// spins up a `MultiDashServer` axum router on this address
94    /// that observes the same fragment stream the LL-HLS bridge
95    /// observes and serves `/dash/{broadcast}/manifest.mpd` plus
96    /// numbered segment URIs. RTMP and WHIP publishers both feed
97    /// the DASH egress without any per-protocol wiring.
98    pub dash_addr: Option<SocketAddr>,
99    /// Optional RTSP ingest bind address. When `Some`, `start()`
100    /// spins up an `RtspServer` on this TCP address that accepts
101    /// RTSP ANNOUNCE/RECORD sessions with interleaved RTP and fans
102    /// depacketized H.264/HEVC through the fragment observer chain.
103    pub rtsp_addr: Option<SocketAddr>,
104    /// Optional SRT ingest bind address. When `Some`, `start()`
105    /// spins up an `SrtIngestServer` on this UDP address that
106    /// accepts MPEG-TS streams and fans them through the fragment
107    /// observer chain.
108    pub srt_addr: Option<SocketAddr>,
109    /// Enable the peer mesh coordinator and `/signal` endpoint.
110    pub mesh_enabled: bool,
111    /// Max children per mesh parent when `mesh_enabled`.
112    pub max_peers: usize,
113    /// Pre-built auth provider. `None` means open access (`NoopAuthProvider`).
114    pub auth: Option<SharedAuth>,
115    /// Recording directory. `None` disables recording.
116    pub record_dir: Option<PathBuf>,
117    /// DVR archive directory. When `Some`, `start()` opens a
118    /// `RedbSegmentIndex` under `<archive_dir>/archive.redb` and
119    /// attaches an archiving fragment observer to the RTMP bridge
120    /// that writes every emitted fragment to
121    /// `<archive_dir>/<broadcast>/<track>/<seq>.m4s` and records a
122    /// `SegmentRef` against the index. The index + segment files
123    /// back the DVR scrub / time-range playback surface (Tier 2.4).
124    pub archive_dir: Option<PathBuf>,
125    /// Optional C2PA provenance configuration. When set, every
126    /// `(broadcast, track)` drained by the archive indexer runs the
127    /// broadcast-end finalize path on drain termination (the moment
128    /// every producer-side clone of the broadcaster drops), which
129    /// writes `finalized.mp4` + `finalized.c2pa` next to the segment
130    /// files. The admin router also mounts `GET /playback/verify/
131    /// {broadcast}` for verifying the resulting manifest. Feature-
132    /// gated: the field is accessible only when `lvqr-cli` is built
133    /// with `--features c2pa` so the `c2pa` transitive closure does
134    /// not leak into deployments that do not need provenance.
135    /// Tier 4 item 4.3 session B3.
136    #[cfg(feature = "c2pa")]
137    pub c2pa: Option<lvqr_archive::provenance::C2paConfig>,
138    /// Optional path to a WASM fragment filter module. When set,
139    /// `start()` loads + compiles the module via
140    /// `lvqr_wasm::WasmFilter::load` and installs a filter tap
141    /// on the shared `FragmentBroadcasterRegistry` before any
142    /// ingest listener starts accepting traffic. The tap
143    /// observes every fragment flowing through every
144    /// broadcaster and drives
145    /// `lvqr_wasm_fragments_total{outcome=keep|drop}` counters;
146    /// in v1 it does NOT modify what downstream subscribers
147    /// receive (session-86 scope narrowing). Omit to disable.
148    pub wasm_filter: Option<PathBuf>,
149    /// Install the global Prometheus recorder. Must be `false` in tests
150    /// because `metrics-exporter-prometheus` panics on the second install
151    /// in a process. `main.rs` sets this to `true`.
152    pub install_prometheus: bool,
153    /// Pre-built OTLP metrics recorder handed off by
154    /// `lvqr_observability::init` when `LVQR_OTLP_ENDPOINT` is
155    /// set. When `Some`, `start()` installs it as the global
156    /// `metrics`-crate recorder -- either on its own or composed
157    /// with the Prometheus recorder via
158    /// `metrics_util::layers::FanoutBuilder` when
159    /// `install_prometheus` is also true. When `None`, only the
160    /// Prometheus path runs (legacy behavior).
161    pub otel_metrics_recorder: Option<lvqr_observability::OtelMetricsRecorder>,
162    /// Path to TLS certificate (PEM). Reserved; not consumed yet. The
163    /// relay auto-generates self-signed certs when unset.
164    pub tls_cert: Option<PathBuf>,
165    /// Path to TLS private key (PEM). Reserved; not consumed yet.
166    pub tls_key: Option<PathBuf>,
167    /// Optional cluster gossip bind address. When `Some`, `start()`
168    /// bootstraps an `lvqr_cluster::Cluster` on this address, wires
169    /// it into the admin router so `/api/v1/cluster/*` answers, and
170    /// installs an `OwnerResolver` on the HLS server so subscribers
171    /// hitting this node for a peer-owned broadcast receive a 302
172    /// pointing at the owner. `None` (default) disables clustering
173    /// and the node behaves as a standalone single-process server.
174    ///
175    /// Feature-gated on `cluster`; the field is present regardless
176    /// so `ServeConfig` stays ABI-stable across feature flips.
177    pub cluster_listen: Option<SocketAddr>,
178    /// Cluster peer seed addresses. Each entry is an `ip:port`
179    /// string the new node gossips to on boot. Ignored when
180    /// [`cluster_listen`](Self::cluster_listen) is `None`.
181    pub cluster_seeds: Vec<String>,
182    /// Cluster-node identifier. `None` auto-generates a random
183    /// `lvqr-<16 alphanumeric>` id at bootstrap.
184    pub cluster_node_id: Option<String>,
185    /// Cluster tag gossipped in every SYN. Chitchat rejects
186    /// cross-cluster gossip so two LVQR deployments on the same
187    /// subnet stay isolated. Empty string falls back to the
188    /// crate-default (`"lvqr"`).
189    pub cluster_id: Option<String>,
190    /// Externally-reachable HLS base URL this node advertises
191    /// (e.g. `"http://a.local:8888"`). When clustering is enabled,
192    /// `start()` writes this URL into the per-node `endpoints` KV
193    /// so peers redirecting subscribers know where to send them.
194    /// `None` skips the publish; peers will then 404 rather than
195    /// redirect for this node's broadcasts.
196    pub cluster_advertise_hls: Option<String>,
197    /// Externally-reachable DASH base URL this node advertises
198    /// (e.g. `"http://a.local:8888"`). Shape matches
199    /// [`cluster_advertise_hls`](Self::cluster_advertise_hls);
200    /// peers use this when composing a 302 `Location` for
201    /// `/dash/...` requests.
202    pub cluster_advertise_dash: Option<String>,
203    /// Externally-reachable RTSP base URL this node advertises
204    /// (e.g. `"rtsp://a.local:8554"`). Used by the RTSP 302
205    /// redirect-to-owner path on DESCRIBE / PLAY for peer-owned
206    /// broadcasts.
207    pub cluster_advertise_rtsp: Option<String>,
208}
209
210impl ServeConfig {
211    /// Minimal loopback config for tests: every listener on `127.0.0.1:0`,
212    /// open access, no recording, no Prometheus install.
213    pub fn loopback_ephemeral() -> Self {
214        let loopback: std::net::IpAddr = std::net::Ipv4Addr::LOCALHOST.into();
215        Self {
216            relay_addr: (loopback, 0).into(),
217            rtmp_addr: (loopback, 0).into(),
218            admin_addr: (loopback, 0).into(),
219            hls_addr: Some((loopback, 0).into()),
220            hls_dvr_window_secs: 120,
221            hls_target_duration_secs: 2,
222            hls_part_target_ms: 200,
223            whep_addr: None,
224            whip_addr: None,
225            dash_addr: None,
226            rtsp_addr: None,
227            srt_addr: None,
228            mesh_enabled: false,
229            max_peers: 3,
230            auth: None,
231            record_dir: None,
232            archive_dir: None,
233            #[cfg(feature = "c2pa")]
234            c2pa: None,
235            wasm_filter: None,
236            install_prometheus: false,
237            otel_metrics_recorder: None,
238            tls_cert: None,
239            tls_key: None,
240            cluster_listen: None,
241            cluster_seeds: Vec::new(),
242            cluster_node_id: None,
243            cluster_id: None,
244            cluster_advertise_hls: None,
245            cluster_advertise_dash: None,
246            cluster_advertise_rtsp: None,
247        }
248    }
249}
250
251/// Handle to a running LVQR server. Dropping the handle cancels the shared
252/// shutdown token but does not block on subsystem drain; call [`shutdown`]
253/// explicitly in tests that need deterministic teardown before the next
254/// fixture starts.
255///
256/// [`shutdown`]: ServerHandle::shutdown
257pub struct ServerHandle {
258    relay_addr: SocketAddr,
259    rtmp_addr: SocketAddr,
260    admin_addr: SocketAddr,
261    hls_addr: Option<SocketAddr>,
262    whep_addr: Option<SocketAddr>,
263    whip_addr: Option<SocketAddr>,
264    dash_addr: Option<SocketAddr>,
265    rtsp_addr: Option<SocketAddr>,
266    srt_addr: Option<SocketAddr>,
267    shutdown: CancellationToken,
268    join: Option<tokio::task::JoinHandle<()>>,
269    /// Cluster handle kept alive for the server's lifetime when
270    /// clustering is configured. Feature-gated; `None` when the
271    /// `cluster` feature is on but `ServeConfig::cluster_listen` is
272    /// `None`, and absent entirely when the feature is off.
273    #[cfg(feature = "cluster")]
274    cluster: Option<std::sync::Arc<lvqr_cluster::Cluster>>,
275    /// WASM fragment-filter tap handle kept alive for the
276    /// server's lifetime when `--wasm-filter` is configured.
277    /// Tests read fragment counters off this handle to assert
278    /// the filter actually saw the ingested broadcasts.
279    wasm_filter: Option<lvqr_wasm::WasmFilterBridgeHandle>,
280    /// WASM filter hot-reload watcher kept alive for the
281    /// server's lifetime when `--wasm-filter` is configured.
282    /// On every debounced change to the module path, the
283    /// reloader recompiles the module and swaps it into the
284    /// shared filter; in-flight `apply` calls complete on the
285    /// old module, subsequent calls see the new one. Unused
286    /// directly; held for its `Drop` side effect of stopping
287    /// the watcher thread on shutdown.
288    _wasm_reloader: Option<lvqr_wasm::WasmFilterReloader>,
289}
290
291impl ServerHandle {
292    /// Bound QUIC/MoQ relay address.
293    pub fn relay_addr(&self) -> SocketAddr {
294        self.relay_addr
295    }
296
297    /// Bound RTMP ingest address.
298    pub fn rtmp_addr(&self) -> SocketAddr {
299        self.rtmp_addr
300    }
301
302    /// Bound admin HTTP (and WS relay/ingest) address.
303    pub fn admin_addr(&self) -> SocketAddr {
304        self.admin_addr
305    }
306
307    /// Bound LL-HLS HTTP address, when HLS composition is enabled.
308    pub fn hls_addr(&self) -> Option<SocketAddr> {
309        self.hls_addr
310    }
311
312    /// Bound WHEP HTTP address, when WHEP egress is enabled.
313    pub fn whep_addr(&self) -> Option<SocketAddr> {
314        self.whep_addr
315    }
316
317    /// Bound WHIP HTTP address, when WHIP ingest is enabled.
318    pub fn whip_addr(&self) -> Option<SocketAddr> {
319        self.whip_addr
320    }
321
322    /// Bound MPEG-DASH HTTP address, when DASH egress is enabled.
323    pub fn dash_addr(&self) -> Option<SocketAddr> {
324        self.dash_addr
325    }
326
327    /// Bound RTSP ingest TCP address, when RTSP ingest is enabled.
328    pub fn rtsp_addr(&self) -> Option<SocketAddr> {
329        self.rtsp_addr
330    }
331
332    /// Bound SRT ingest UDP address, when SRT ingest is enabled.
333    pub fn srt_addr(&self) -> Option<SocketAddr> {
334        self.srt_addr
335    }
336
337    /// Cluster handle backing this server, when
338    /// [`ServeConfig::cluster_listen`] was `Some` at `start()`
339    /// time. Returns `None` for single-node servers. Callers
340    /// typically drive the handle to claim broadcasts or inspect
341    /// membership; the `shutdown()` method on this crate's
342    /// `ServerHandle` already tears the cluster down gracefully.
343    #[cfg(feature = "cluster")]
344    pub fn cluster(&self) -> Option<&std::sync::Arc<lvqr_cluster::Cluster>> {
345        self.cluster.as_ref()
346    }
347
348    /// HTTP URL pointing at a path on the DASH surface, e.g.
349    /// `dash_url("/dash/live/test/manifest.mpd")`. Returns `None`
350    /// when DASH is not enabled.
351    pub fn dash_url(&self, path: &str) -> Option<String> {
352        let addr = self.dash_addr?;
353        let path = if path.starts_with('/') {
354            path.to_string()
355        } else {
356            format!("/{path}")
357        };
358        Some(format!("http://{addr}{path}"))
359    }
360
361    /// HTTP base URL for the admin / WS surface.
362    pub fn http_base(&self) -> String {
363        format!("http://{}", self.admin_addr)
364    }
365
366    /// HTTP URL pointing at a path on the LL-HLS surface, e.g.
367    /// `hls_url("/playlist.m3u8")`. Returns `None` when HLS is not
368    /// enabled.
369    pub fn hls_url(&self, path: &str) -> Option<String> {
370        let addr = self.hls_addr?;
371        let path = if path.starts_with('/') {
372            path.to_string()
373        } else {
374            format!("/{path}")
375        };
376        Some(format!("http://{addr}{path}"))
377    }
378
379    /// Construct the WebSocket subscribe URL for a broadcast.
380    pub fn ws_url(&self, broadcast: &str) -> String {
381        format!("ws://{}/ws/{broadcast}", self.admin_addr)
382    }
383
384    /// Construct the WebSocket ingest URL for a broadcast.
385    pub fn ws_ingest_url(&self, broadcast: &str) -> String {
386        format!("ws://{}/ingest/{broadcast}", self.admin_addr)
387    }
388
389    /// Construct the RTMP publish URL for an app + stream key.
390    pub fn rtmp_url(&self, app: &str, stream_key: &str) -> String {
391        format!("rtmp://{}/{app}/{stream_key}", self.rtmp_addr)
392    }
393
394    /// Snapshot of per-`(broadcast, track)` WASM filter tap
395    /// counters. Returns `None` when `--wasm-filter` is not set.
396    /// Tests read this after an RTMP publish completes to assert
397    /// the filter actually observed the broadcast.
398    pub fn wasm_filter(&self) -> Option<&lvqr_wasm::WasmFilterBridgeHandle> {
399        self.wasm_filter.as_ref()
400    }
401
402    /// Trigger graceful shutdown and wait for every subsystem to drain.
403    pub async fn shutdown(mut self) -> Result<()> {
404        self.shutdown.cancel();
405        if let Some(join) = self.join.take()
406            && let Err(e) = join.await
407            && !e.is_cancelled()
408        {
409            return Err(anyhow::anyhow!("server task panicked: {e}"));
410        }
411        Ok(())
412    }
413}
414
415impl Drop for ServerHandle {
416    fn drop(&mut self) {
417        // Best-effort: signal shutdown so the background tasks wind down
418        // even if the caller forgot to `.shutdown().await`. We cannot block
419        // on the join handle from a sync drop inside an async runtime
420        // without risking a deadlock, so we just cancel and return.
421        self.shutdown.cancel();
422        if let Some(join) = self.join.take() {
423            join.abort();
424        }
425    }
426}
427
428/// Start a full-stack LVQR server. All listeners are bound before the
429/// function returns, so the [`ServerHandle`] immediately reports real
430/// addresses even when the config requested ephemeral ports.
431///
432/// The returned handle owns a background task that runs the relay, RTMP,
433/// and admin subsystems under a shared cancellation token. Use
434/// [`ServerHandle::shutdown`] for deterministic teardown.
435pub async fn start(config: ServeConfig) -> Result<ServerHandle> {
436    tracing::info!(
437        relay = %config.relay_addr,
438        rtmp = %config.rtmp_addr,
439        admin = %config.admin_addr,
440        mesh = config.mesh_enabled,
441        "starting LVQR server"
442    );
443
444    // Metrics recorder install. Process-wide, must be skipped in
445    // tests (all four permutations below call
446    // `metrics::set_global_recorder` which panics or errors on
447    // second install). The four cases are:
448    //   Prom + OTel:   install a FanoutBuilder of both.
449    //   Prom only:     install the Prometheus recorder (legacy).
450    //   OTel only:     install the OTel-forwarding recorder.
451    //   Neither:       install nothing; metrics calls are no-ops.
452    // The `PrometheusRecorder` handle is exposed on
453    // `ServerHandle` for the admin `/metrics` scrape route, so
454    // we always capture it before handing the recorder off to a
455    // Fanout layer.
456    let prom_handle = match (config.install_prometheus, config.otel_metrics_recorder.clone()) {
457        (true, Some(otel_recorder)) => {
458            let prom_recorder = metrics_exporter_prometheus::PrometheusBuilder::new().build_recorder();
459            let handle = prom_recorder.handle();
460            let fanout = metrics_util::layers::FanoutBuilder::default()
461                .add_recorder(prom_recorder)
462                .add_recorder(otel_recorder)
463                .build();
464            metrics::set_global_recorder(fanout)
465                .map_err(|e| anyhow::anyhow!("failed to install metrics fanout recorder: {e}"))?;
466            Some(handle)
467        }
468        (true, None) => Some(
469            metrics_exporter_prometheus::PrometheusBuilder::new()
470                .install_recorder()
471                .map_err(|e| anyhow::anyhow!("failed to install Prometheus recorder: {e}"))?,
472        ),
473        (false, Some(otel_recorder)) => {
474            metrics::set_global_recorder(otel_recorder)
475                .map_err(|e| anyhow::anyhow!("failed to install OTLP metrics recorder: {e}"))?;
476            None
477        }
478        (false, None) => None,
479    };
480
481    let shutdown = CancellationToken::new();
482
483    // Auth provider: caller-provided, or fall back to open access.
484    let auth: SharedAuth = config
485        .auth
486        .clone()
487        .unwrap_or_else(|| Arc::new(NoopAuthProvider) as SharedAuth);
488
489    // Shared lifecycle bus: bridge and WS ingest emit
490    // BroadcastStarted/Stopped here; recorder subscribes to it.
491    let events = EventBus::default();
492
493    // MoQ relay: init_server() binds the QUIC socket and reports the real
494    // bound address, which we surface back through ServerHandle.
495    let relay_config = lvqr_relay::RelayConfig::new(config.relay_addr);
496    let mut relay = lvqr_relay::RelayServer::new(relay_config);
497    relay.set_auth_provider(auth.clone());
498    let (mut moq_server, relay_bound) = relay.init_server()?;
499    tracing::info!(addr = %relay_bound, "MoQ relay bound");
500
501    // Optional cluster bootstrap. Resolver for `MultiHlsServer` is
502    // built up-front so the HLS constructor below can install it in
503    // one shot instead of patching the server after the fact.
504    #[cfg(feature = "cluster")]
505    let cluster = if let Some(listen) = config.cluster_listen {
506        let ccfg = lvqr_cluster::ClusterConfig {
507            listen,
508            seeds: config.cluster_seeds.clone(),
509            node_id: config.cluster_node_id.clone().map(lvqr_cluster::NodeId::new),
510            cluster_id: config
511                .cluster_id
512                .clone()
513                .unwrap_or_else(|| lvqr_cluster::ClusterConfig::default().cluster_id),
514            ..lvqr_cluster::ClusterConfig::default()
515        };
516        let c = lvqr_cluster::Cluster::bootstrap(ccfg)
517            .await
518            .map_err(|e| anyhow::anyhow!("cluster bootstrap failed: {e}"))?;
519        let c = std::sync::Arc::new(c);
520        if config.cluster_advertise_hls.is_some()
521            || config.cluster_advertise_dash.is_some()
522            || config.cluster_advertise_rtsp.is_some()
523        {
524            let endpoints = lvqr_cluster::NodeEndpoints {
525                hls: config.cluster_advertise_hls.clone(),
526                dash: config.cluster_advertise_dash.clone(),
527                rtsp: config.cluster_advertise_rtsp.clone(),
528            };
529            c.set_endpoints(&endpoints)
530                .await
531                .map_err(|e| anyhow::anyhow!("cluster set_endpoints failed: {e}"))?;
532        }
533        tracing::info!(
534            node = %c.self_id(),
535            %listen,
536            advertise_hls = ?config.cluster_advertise_hls,
537            advertise_dash = ?config.cluster_advertise_dash,
538            advertise_rtsp = ?config.cluster_advertise_rtsp,
539            "cluster enabled"
540        );
541        Some(c)
542    } else {
543        None
544    };
545    #[cfg(feature = "cluster")]
546    let hls_owner_resolver: Option<lvqr_hls::OwnerResolver> = cluster.as_ref().map(|c| {
547        let c = c.clone();
548        let resolver: lvqr_hls::OwnerResolver = std::sync::Arc::new(move |broadcast: String| {
549            let c = c.clone();
550            Box::pin(async move {
551                let (_, endpoints) = c.find_owner_endpoints(&broadcast).await?;
552                endpoints.hls
553            })
554        });
555        resolver
556    });
557    #[cfg(not(feature = "cluster"))]
558    let hls_owner_resolver: Option<lvqr_hls::OwnerResolver> = None;
559    #[cfg(feature = "cluster")]
560    let dash_owner_resolver: Option<lvqr_dash::OwnerResolver> = cluster.as_ref().map(|c| {
561        let c = c.clone();
562        let resolver: lvqr_dash::OwnerResolver = std::sync::Arc::new(move |broadcast: String| {
563            let c = c.clone();
564            Box::pin(async move {
565                let (_, endpoints) = c.find_owner_endpoints(&broadcast).await?;
566                endpoints.dash
567            })
568        });
569        resolver
570    });
571    #[cfg(not(feature = "cluster"))]
572    let dash_owner_resolver: Option<lvqr_dash::OwnerResolver> = None;
573    #[cfg(feature = "cluster")]
574    let rtsp_owner_resolver: Option<lvqr_rtsp::OwnerResolver> = cluster.as_ref().map(|c| {
575        let c = c.clone();
576        let resolver: lvqr_rtsp::OwnerResolver = std::sync::Arc::new(move |broadcast: String| {
577            let c = c.clone();
578            Box::pin(async move {
579                let (_, endpoints) = c.find_owner_endpoints(&broadcast).await?;
580                endpoints.rtsp
581            })
582        });
583        resolver
584    });
585    #[cfg(not(feature = "cluster"))]
586    let rtsp_owner_resolver: Option<lvqr_rtsp::OwnerResolver> = None;
587
588    // Optional multi-broadcast LL-HLS server. The broadcaster-native
589    // HLS bridge (installed below) subscribes on the shared registry
590    // and pumps fragments into the shared `MultiHlsServer` state.
591    // Each broadcast gets its own per-broadcast `HlsServer` on first
592    // publish; the axum router demultiplexes requests under
593    // `/hls/{broadcast}/...`.
594    //
595    // When clustering is enabled, an `OwnerResolver` redirects
596    // subscribers of peer-owned broadcasts to the owning node's HLS
597    // URL instead of returning 404.
598    let target_dur = config.hls_target_duration_secs;
599    let part_target_secs = config.hls_part_target_ms as f32 / 1000.0;
600    let max_segments = if config.hls_dvr_window_secs == 0 || target_dur == 0 {
601        None
602    } else {
603        Some((config.hls_dvr_window_secs / target_dur) as usize)
604    };
605    let hls_server = config.hls_addr.map(|_| {
606        let playlist_cfg = PlaylistBuilderConfig {
607            target_duration_secs: target_dur,
608            part_target_secs,
609            max_segments,
610            ..PlaylistBuilderConfig::default()
611        };
612        match hls_owner_resolver.clone() {
613            Some(r) => MultiHlsServer::with_owner_resolver(playlist_cfg, r),
614            None => MultiHlsServer::new(playlist_cfg),
615        }
616    });
617
618    // Optional multi-broadcast MPEG-DASH server. Sibling of the
619    // LL-HLS fan-out above: a single `MultiDashServer` subscribes
620    // on the shared registry and projects fragments onto a
621    // per-broadcast axum router mounted under `/dash/{broadcast}/...`.
622    // Every ingest protocol (RTMP, WHIP, SRT, RTSP) feeds DASH via
623    // the same `BroadcasterDashBridge` install below.
624    let dash_server = config.dash_addr.map(|_| match dash_owner_resolver.clone() {
625        Some(r) => lvqr_dash::MultiDashServer::with_owner_resolver(DashConfig::default(), r),
626        None => lvqr_dash::MultiDashServer::new(DashConfig::default()),
627    });
628
629    // Shared FragmentBroadcasterRegistry used by every ingest crate
630    // and every consumer. Session 60 completed the Tier 2.1 migration:
631    // every ingest protocol publishes to this one registry, and every
632    // consumer (archive, LL-HLS, DASH) installs an on_entry_created
633    // callback against it.
634    let shared_registry = FragmentBroadcasterRegistry::new();
635
636    // Auto-claim every new broadcast against the cluster so peers
637    // redirect correctly without the operator having to call
638    // `Cluster::claim_broadcast` by hand. The bridge holds the
639    // `Claim` alive until every ingest publisher for that
640    // broadcast disconnects. Feature-gated; no-op when
641    // single-node.
642    #[cfg(feature = "cluster")]
643    if let Some(ref c) = cluster {
644        cluster_claim::install_cluster_claim_bridge(c.clone(), cluster_claim::DEFAULT_CLAIM_LEASE, &shared_registry);
645    }
646
647    // Optional WASM fragment filter tap. Installed BEFORE any
648    // ingest listener accepts traffic so the very first fragment
649    // of the first broadcast flows through the filter. The
650    // reloader watches the module path for changes and calls
651    // `SharedFilter::replace` atomically when the file changes;
652    // in-flight fragments finish on the old module and the next
653    // fragment sees the new one.
654    let (wasm_filter_handle, wasm_reloader_handle) = if let Some(ref path) = config.wasm_filter {
655        let filter = lvqr_wasm::WasmFilter::load(path)
656            .map_err(|e| anyhow::anyhow!("WASM filter load at {} failed: {e}", path.display()))?;
657        tracing::info!(path = %path.display(), "WASM fragment filter loaded");
658        let shared = lvqr_wasm::SharedFilter::new(filter);
659        let bridge = lvqr_wasm::install_wasm_filter_bridge(&shared_registry, shared.clone());
660        let reloader = lvqr_wasm::WasmFilterReloader::spawn(path, shared)
661            .map_err(|e| anyhow::anyhow!("WASM filter hot-reload watcher at {} failed: {e}", path.display()))?;
662        (Some(bridge), Some(reloader))
663    } else {
664        (None, None)
665    };
666
667    // RTMP ingest bridged to MoQ. Pre-bind the TCP listener so we can
668    // report the real bound port (for ephemeral-port test setups).
669    let mut bridge_builder = lvqr_ingest::RtmpMoqBridge::with_auth(relay.origin().clone(), auth.clone())
670        .with_events(events.clone())
671        .with_registry(shared_registry.clone());
672
673    // Optional DVR archive index. Opened before the bridge is frozen
674    // so the BroadcasterArchiveIndexer can install its on_entry_created
675    // callback on the shared registry. The index file lives at
676    // `<archive_dir>/archive.redb`; the directory is created on
677    // demand if it does not already exist.
678    let archive_index = if let Some(ref dir) = config.archive_dir {
679        std::fs::create_dir_all(dir)
680            .map_err(|e| anyhow::anyhow!("archive: failed to create {}: {e}", dir.display()))?;
681        let db_path = dir.join("archive.redb");
682        let index = lvqr_archive::RedbSegmentIndex::open(&db_path)
683            .map_err(|e| anyhow::anyhow!("archive: failed to open {}: {e}", db_path.display()))?;
684        tracing::info!(dir = %dir.display(), "DVR archive index enabled");
685        Some((dir.clone(), Arc::new(index)))
686    } else {
687        None
688    };
689
690    // Install the broadcaster-based archive indexer on the shared
691    // registry. Every subsequent ingest-side emit is drained to disk +
692    // redb by a per-broadcaster tokio task the indexer spawns.
693    if let Some((ref dir, ref index)) = archive_index {
694        #[cfg(feature = "c2pa")]
695        BroadcasterArchiveIndexer::install(dir.clone(), Arc::clone(index), &shared_registry, config.c2pa.clone());
696        #[cfg(not(feature = "c2pa"))]
697        BroadcasterArchiveIndexer::install(dir.clone(), Arc::clone(index), &shared_registry);
698    }
699
700    // Install the broadcaster-based LL-HLS composition bridge on the
701    // shared registry. Every ingest crate's first `publish_init` for a
702    // `(broadcast, track)` pair fires the callback; the callback
703    // subscribes and spawns a drain task that projects fragments onto
704    // the shared `MultiHlsServer`. Session 60 consumer-side switchover:
705    // replaces the FragmentObserver path the HLS bridge used through
706    // session 59.
707    if let Some(ref hls) = hls_server {
708        BroadcasterHlsBridge::install(
709            hls.clone(),
710            config.hls_target_duration_secs * 1000,
711            config.hls_part_target_ms,
712            &shared_registry,
713        );
714    }
715
716    // Install the broadcaster-based DASH composition bridge. Same
717    // pattern as LL-HLS: the callback spawns a drain task per
718    // `(broadcast, track)` that stamps a monotonic `$Number$` counter
719    // onto every observed fragment and pushes it into the per-broadcast
720    // `DashServer`. Session 60: completes the consumer-side switchover.
721    if let Some(ref dash) = dash_server {
722        BroadcasterDashBridge::install(dash.clone(), &shared_registry);
723    }
724
725    // Optional WHEP surface. Constructed before the bridge is
726    // frozen into an `Arc` so we can attach the `WhepServer` as a
727    // `RawSampleObserver`; both the observer clone and the axum
728    // router clone share the same underlying session registry, so
729    // a POST on the router is immediately visible to the raw-sample
730    // fanout path.
731    let whep_server = if let Some(addr) = config.whep_addr {
732        let str0m_cfg = lvqr_whep::Str0mConfig { host_ip: addr.ip() };
733        let answerer = Arc::new(lvqr_whep::Str0mAnswerer::new(str0m_cfg)) as Arc<dyn lvqr_whep::SdpAnswerer>;
734        let server = lvqr_whep::WhepServer::new(answerer);
735        let observer: lvqr_ingest::SharedRawSampleObserver = Arc::new(server.clone());
736        bridge_builder = bridge_builder.with_raw_sample_observer(observer);
737        Some(server)
738    } else {
739        None
740    };
741
742    // Optional WHIP ingest surface. The bridge side is a sibling
743    // of `RtmpMoqBridge`: it owns its own `BroadcastProducer` state
744    // but publishes fragments onto the same shared registry, so every
745    // existing egress (MoQ, LL-HLS, DASH, disk record, DVR archive)
746    // picks up WHIP publishers with zero additional wiring.
747    let (whip_server, whip_bridge) = if let Some(addr) = config.whip_addr {
748        let mut whip_bridge = lvqr_whip::WhipMoqBridge::new(relay.origin().clone())
749            .with_events(events.clone())
750            .with_registry(shared_registry.clone());
751        if let Some(ref server) = whep_server {
752            let raw_observer: lvqr_ingest::SharedRawSampleObserver = Arc::new(server.clone());
753            whip_bridge = whip_bridge.with_raw_sample_observer(raw_observer);
754        }
755        let whip_bridge_arc = Arc::new(whip_bridge);
756        let sink = whip_bridge_arc.clone() as Arc<dyn lvqr_whip::IngestSampleSink>;
757        let str0m_cfg = lvqr_whip::Str0mIngestConfig { host_ip: addr.ip() };
758        let answerer =
759            Arc::new(lvqr_whip::Str0mIngestAnswerer::new(str0m_cfg, sink)) as Arc<dyn lvqr_whip::SdpAnswerer>;
760        let server = lvqr_whip::WhipServer::with_auth_provider(answerer, auth.clone());
761        (Some(server), Some(whip_bridge_arc))
762    } else {
763        (None, None)
764    };
765
766    // Optional SRT ingest server. Publishes to the shared registry;
767    // every broadcaster-native consumer picks up SRT publishers
768    // automatically.
769    let (srt_server, srt_bound) = if let Some(addr) = config.srt_addr {
770        let mut server =
771            lvqr_srt::SrtIngestServer::with_registry(addr, shared_registry.clone()).with_auth(auth.clone());
772        let bound = server.bind().await?;
773        tracing::info!(addr = %bound, "SRT ingest bound");
774        (Some(server), Some(bound))
775    } else {
776        (None, None)
777    };
778    let srt_events_clone = events.clone();
779    let srt_shutdown_token = shutdown.clone();
780
781    // Optional RTSP ingest server. Publishes to the shared registry
782    // alongside every other ingest protocol. When clustering is
783    // enabled, the owner resolver redirects DESCRIBE / PLAY for
784    // peer-owned broadcasts with RTSP 302.
785    let (rtsp_server, rtsp_bound) = if let Some(addr) = config.rtsp_addr {
786        let mut server = lvqr_rtsp::RtspServer::with_registry(addr, shared_registry.clone()).with_auth(auth.clone());
787        if let Some(r) = rtsp_owner_resolver.clone() {
788            server = server.with_owner_resolver(r);
789        }
790        let bound = server.bind().await?;
791        tracing::info!(addr = %bound, "RTSP ingest bound");
792        (Some(server), Some(bound))
793    } else {
794        (None, None)
795    };
796    let rtsp_events_clone = events.clone();
797    let rtsp_shutdown_token = shutdown.clone();
798
799    let bridge = Arc::new(bridge_builder);
800    let rtmp_config = lvqr_ingest::RtmpConfig {
801        bind_addr: config.rtmp_addr,
802    };
803    let rtmp_server = bridge.create_rtmp_server(rtmp_config);
804    let rtmp_listener = tokio::net::TcpListener::bind(config.rtmp_addr).await?;
805    let rtmp_bound = rtmp_listener.local_addr()?;
806    tracing::info!(addr = %rtmp_bound, "RTMP ingest bound");
807
808    // Admin listener: pre-bind to capture the real port.
809    let admin_listener = tokio::net::TcpListener::bind(config.admin_addr).await?;
810    let admin_bound = admin_listener.local_addr()?;
811    tracing::info!(addr = %admin_bound, "admin HTTP bound");
812
813    // HLS listener: pre-bind so the test harness can read the
814    // ephemeral port back via `ServerHandle::hls_addr` immediately
815    // after `start()` returns.
816    let (hls_listener, hls_bound) = if let Some(addr) = config.hls_addr {
817        let listener = tokio::net::TcpListener::bind(addr).await?;
818        let bound = listener.local_addr()?;
819        tracing::info!(addr = %bound, "LL-HLS HTTP bound");
820        (Some(listener), Some(bound))
821    } else {
822        (None, None)
823    };
824
825    // WHEP listener: pre-bind the same way so test harnesses can
826    // read the ephemeral port back immediately. `whep_server` was
827    // built earlier and is `None` if `config.whep_addr` is `None`.
828    let (whep_listener, whep_bound) = if let Some(addr) = config.whep_addr {
829        let listener = tokio::net::TcpListener::bind(addr).await?;
830        let bound = listener.local_addr()?;
831        tracing::info!(addr = %bound, "WHEP HTTP bound");
832        (Some(listener), Some(bound))
833    } else {
834        (None, None)
835    };
836
837    // DASH listener: pre-bind so ephemeral-port test harnesses can
838    // read the real port back via `ServerHandle::dash_addr`
839    // immediately after `start()` returns.
840    let (dash_listener, dash_bound) = if let Some(addr) = config.dash_addr {
841        let listener = tokio::net::TcpListener::bind(addr).await?;
842        let bound = listener.local_addr()?;
843        tracing::info!(addr = %bound, "MPEG-DASH HTTP bound");
844        (Some(listener), Some(bound))
845    } else {
846        (None, None)
847    };
848
849    // WHIP listener: pre-bind for the same reason. Keeping the
850    // bridge arc alive for the lifetime of the server task is
851    // important: dropping it would tear down every active MoQ
852    // broadcast produced by a WHIP publisher.
853    let (whip_listener, whip_bound) = if let Some(addr) = config.whip_addr {
854        let listener = tokio::net::TcpListener::bind(addr).await?;
855        let bound = listener.local_addr()?;
856        tracing::info!(addr = %bound, "WHIP HTTP bound");
857        (Some(listener), Some(bound))
858    } else {
859        (None, None)
860    };
861
862    // Optional disk recorder.
863    if let Some(ref dir) = config.record_dir {
864        let recorder = lvqr_record::BroadcastRecorder::new(dir);
865        let origin = relay.origin().clone();
866        let event_rx = events.subscribe();
867        let record_shutdown = shutdown.clone();
868        tracing::info!(dir = %dir.display(), "recording enabled");
869        tokio::spawn(async move {
870            spawn_recordings(recorder, origin, event_rx, record_shutdown).await;
871        });
872    }
873
874    // HLS finalization subscriber: when a broadcaster disconnects
875    // the RTMP bridge emits BroadcastStopped, and this task calls
876    // MultiHlsServer::finalize_broadcast so the playlist gains
877    // EXT-X-ENDLIST and the retained window becomes a VOD surface.
878    if let Some(ref hls) = hls_server {
879        let hls_for_finalize = hls.clone();
880        let mut hls_event_rx = events.subscribe();
881        let hls_finalize_shutdown = shutdown.clone();
882        tokio::spawn(async move {
883            loop {
884                tokio::select! {
885                    _ = hls_finalize_shutdown.cancelled() => break,
886                    msg = hls_event_rx.recv() => {
887                        match msg {
888                            Ok(RelayEvent::BroadcastStopped { name }) => {
889                                tracing::info!(broadcast = %name, "finalizing HLS broadcast");
890                                hls_for_finalize.finalize_broadcast(&name).await;
891                            }
892                            Ok(_) => {}
893                            Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
894                                tracing::warn!(missed = n, "HLS finalize subscriber lagged");
895                            }
896                            Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
897                        }
898                    }
899                }
900            }
901        });
902    }
903
904    // DASH finalization subscriber: same pattern as HLS above.
905    // Switches the MPD from type="dynamic" to type="static" so
906    // DASH clients stop polling for new segments.
907    if let Some(ref dash) = dash_server {
908        let dash_for_finalize = dash.clone();
909        let mut dash_event_rx = events.subscribe();
910        let dash_finalize_shutdown = shutdown.clone();
911        tokio::spawn(async move {
912            loop {
913                tokio::select! {
914                    _ = dash_finalize_shutdown.cancelled() => break,
915                    msg = dash_event_rx.recv() => {
916                        match msg {
917                            Ok(RelayEvent::BroadcastStopped { name }) => {
918                                tracing::info!(broadcast = %name, "finalizing DASH broadcast");
919                                dash_for_finalize.finalize_broadcast(&name);
920                            }
921                            Ok(_) => {}
922                            Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
923                                tracing::warn!(missed = n, "DASH finalize subscriber lagged");
924                            }
925                            Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
926                        }
927                    }
928                }
929            }
930        });
931    }
932
933    // Admin HTTP state and router.
934    let metrics_state = relay.metrics().clone();
935    let bridge_for_stats = bridge.clone();
936    let bridge_for_streams = bridge.clone();
937
938    let admin_state = lvqr_admin::AdminState::new(
939        move || {
940            let active = bridge_for_stats.active_stream_count() as u64;
941            lvqr_core::RelayStats {
942                publishers: active,
943                tracks: active * 2,
944                subscribers: metrics_state.connections_active.load(Ordering::Relaxed),
945                bytes_received: 0,
946                bytes_sent: 0,
947                uptime_secs: 0,
948            }
949        },
950        move || {
951            bridge_for_streams
952                .stream_names()
953                .into_iter()
954                .map(|name| lvqr_admin::StreamInfo { name, subscribers: 0 })
955                .collect()
956        },
957    )
958    .with_auth(auth.clone());
959    let admin_state = if let Some(prom) = prom_handle {
960        admin_state.with_metrics(Arc::new(move || prom.render()))
961    } else {
962        admin_state
963    };
964    // Wire the cluster into `/api/v1/cluster/*`. Without this the
965    // feature-gated routes in `lvqr-admin` reply 500 with a
966    // "cluster not wired" message.
967    #[cfg(feature = "cluster")]
968    let admin_state = match cluster.as_ref() {
969        Some(c) => admin_state.with_cluster(c.clone()),
970        None => admin_state,
971    };
972
973    // WebSocket fMP4 relay + WebSocket ingest state.
974    let ws_state = WsRelayState {
975        origin: relay.origin().clone(),
976        init_segments: Arc::new(dashmap::DashMap::new()),
977        auth: auth.clone(),
978        events: events.clone(),
979    };
980    let ws_router = axum::Router::new()
981        .route("/ws/{*broadcast}", get(ws_relay_handler))
982        .route("/ingest/{*broadcast}", get(ws_ingest_handler))
983        .with_state(ws_state);
984
985    let combined_router = {
986        let admin_router = if config.mesh_enabled {
987            let mesh_config = lvqr_mesh::MeshConfig {
988                max_children: config.max_peers,
989                ..Default::default()
990            };
991            let mesh = Arc::new(lvqr_mesh::MeshCoordinator::new(mesh_config));
992
993            let mesh_for_cb = mesh.clone();
994            relay.set_connection_callback(Arc::new(move |conn_id, connected| {
995                let peer_id = format!("conn-{conn_id}");
996                if connected {
997                    match mesh_for_cb.add_peer(peer_id.clone(), "default".to_string()) {
998                        Ok(a) => {
999                            tracing::info!(peer = %peer_id, role = ?a.role, depth = a.depth, "mesh: peer assigned");
1000                        }
1001                        Err(e) => {
1002                            tracing::warn!(peer = %peer_id, error = ?e, "mesh: assign failed");
1003                        }
1004                    }
1005                } else {
1006                    let orphans = mesh_for_cb.remove_peer(&peer_id);
1007                    for orphan in orphans {
1008                        let _ = mesh_for_cb.reassign_peer(&orphan);
1009                    }
1010                }
1011            }));
1012
1013            let mesh_for_reaper = mesh.clone();
1014            let reaper_shutdown = shutdown.clone();
1015            tokio::spawn(async move {
1016                let mut interval = tokio::time::interval(std::time::Duration::from_secs(5));
1017                loop {
1018                    tokio::select! {
1019                        _ = interval.tick() => {
1020                            let dead = mesh_for_reaper.find_dead_peers();
1021                            for peer_id in dead {
1022                                tracing::info!(peer = %peer_id, "mesh: removing dead peer");
1023                                let orphans = mesh_for_reaper.remove_peer(&peer_id);
1024                                for orphan in orphans {
1025                                    let _ = mesh_for_reaper.reassign_peer(&orphan);
1026                                }
1027                            }
1028                        }
1029                        _ = reaper_shutdown.cancelled() => {
1030                            tracing::debug!("mesh reaper shutting down");
1031                            break;
1032                        }
1033                    }
1034                }
1035            });
1036
1037            let mesh_for_signal = mesh.clone();
1038            let mut signal = lvqr_signal::SignalServer::new();
1039            signal.set_peer_callback(Arc::new(move |peer_id, track, connected| {
1040                if connected {
1041                    match mesh_for_signal.add_peer(peer_id.to_string(), track.to_string()) {
1042                        Ok(a) => {
1043                            tracing::info!(peer = %peer_id, role = ?a.role, depth = a.depth, "mesh: signal peer assigned");
1044                            Some(lvqr_signal::SignalMessage::AssignParent {
1045                                peer_id: peer_id.to_string(),
1046                                role: format!("{:?}", a.role),
1047                                parent_id: a.parent,
1048                                depth: a.depth,
1049                            })
1050                        }
1051                        Err(e) => {
1052                            tracing::warn!(peer = %peer_id, error = ?e, "mesh: signal assign failed");
1053                            None
1054                        }
1055                    }
1056                } else {
1057                    let orphans = mesh_for_signal.remove_peer(peer_id);
1058                    for orphan in orphans {
1059                        let _ = mesh_for_signal.reassign_peer(&orphan);
1060                    }
1061                    None
1062                }
1063            }));
1064
1065            let mesh_for_admin = mesh.clone();
1066            let admin_with_mesh = admin_state.with_mesh(move || lvqr_admin::MeshState {
1067                enabled: true,
1068                peer_count: mesh_for_admin.peer_count(),
1069                offload_percentage: mesh_for_admin.offload_percentage(),
1070            });
1071
1072            tracing::info!(
1073                max_children = config.max_peers,
1074                "peer mesh enabled (/signal endpoint active)"
1075            );
1076
1077            let router = lvqr_admin::build_router(admin_with_mesh);
1078            router.merge(signal.router())
1079        } else {
1080            lvqr_admin::build_router(admin_state)
1081        };
1082
1083        let combined = admin_router.merge(ws_router);
1084        let combined = if let Some((ref dir, ref index)) = archive_index {
1085            combined.merge(playback_router(dir.clone(), Arc::clone(index), auth.clone()))
1086        } else {
1087            combined
1088        };
1089        // Tier 4 item 4.3 session B3: feature-gated `/playback/verify/
1090        // {broadcast}` admin route. Mounted only when the `c2pa`
1091        // feature is on AND an archive directory is configured (the
1092        // verify route reads `<archive>/<broadcast>/<track>/
1093        // finalized.*` off disk, so an archive is a hard prerequisite).
1094        #[cfg(feature = "c2pa")]
1095        let combined = if let Some((ref dir, _)) = archive_index {
1096            combined.merge(verify_router(dir.clone(), auth.clone()))
1097        } else {
1098            combined
1099        };
1100        combined
1101    }
1102    .layer(CorsLayer::permissive());
1103
1104    // Spawn a single background task that joins relay + RTMP + admin and
1105    // signals the shared shutdown token if any subsystem exits early.
1106    let relay_shutdown = shutdown.clone();
1107    let rtmp_shutdown = shutdown.clone();
1108    let admin_shutdown = shutdown.clone();
1109    let hls_shutdown = shutdown.clone();
1110    let dash_shutdown = shutdown.clone();
1111    let whep_shutdown = shutdown.clone();
1112    let whip_shutdown = shutdown.clone();
1113    let bg_shutdown_for_task = shutdown.clone();
1114    let hls_router_pair =
1115        hls_listener.map(|listener| (listener, hls_server.expect("hls_server set when listener is set")));
1116    let dash_router_pair =
1117        dash_listener.map(|listener| (listener, dash_server.expect("dash_server set when listener is set")));
1118    let whep_router_pair =
1119        whep_listener.map(|listener| (listener, whep_server.expect("whep_server set when listener is set")));
1120    let whip_router_pair =
1121        whip_listener.map(|listener| (listener, whip_server.expect("whip_server set when listener is set")));
1122    // Moved into the spawned task below so it lives as long as
1123    // the WHIP poll loops; see `drop(whip_bridge_keepalive)` at
1124    // the end of the join block.
1125    let whip_bridge_keepalive = whip_bridge;
1126
1127    let join = tokio::spawn(async move {
1128        let shutdown_on_exit_relay = bg_shutdown_for_task.clone();
1129        let relay_fut = async move {
1130            if let Err(e) = relay.accept_loop(&mut moq_server, relay_shutdown).await {
1131                tracing::error!(error = %e, "relay server error");
1132            }
1133            shutdown_on_exit_relay.cancel();
1134        };
1135
1136        let shutdown_on_exit_rtmp = bg_shutdown_for_task.clone();
1137        let rtmp_server_task = rtmp_server;
1138        let rtmp_fut = async move {
1139            if let Err(e) = rtmp_server_task.run_with_listener(rtmp_listener, rtmp_shutdown).await {
1140                tracing::error!(error = %e, "RTMP server error");
1141            }
1142            shutdown_on_exit_rtmp.cancel();
1143        };
1144
1145        let shutdown_on_exit_admin = bg_shutdown_for_task.clone();
1146        let admin_fut = async move {
1147            let result = axum::serve(admin_listener, combined_router)
1148                .with_graceful_shutdown(async move { admin_shutdown.cancelled().await })
1149                .await;
1150            if let Err(e) = &result {
1151                tracing::error!(error = %e, "admin server error");
1152            }
1153            shutdown_on_exit_admin.cancel();
1154        };
1155
1156        let shutdown_on_exit_hls = bg_shutdown_for_task.clone();
1157        let hls_fut = async move {
1158            let Some((listener, server)) = hls_router_pair else {
1159                return;
1160            };
1161            let router = server.router().layer(CorsLayer::permissive());
1162            let result = axum::serve(listener, router)
1163                .with_graceful_shutdown(async move { hls_shutdown.cancelled().await })
1164                .await;
1165            if let Err(e) = &result {
1166                tracing::error!(error = %e, "HLS server error");
1167            }
1168            shutdown_on_exit_hls.cancel();
1169        };
1170
1171        let shutdown_on_exit_dash = bg_shutdown_for_task.clone();
1172        let dash_fut = async move {
1173            let Some((listener, server)) = dash_router_pair else {
1174                return;
1175            };
1176            let router = server.router().layer(CorsLayer::permissive());
1177            let result = axum::serve(listener, router)
1178                .with_graceful_shutdown(async move { dash_shutdown.cancelled().await })
1179                .await;
1180            if let Err(e) = &result {
1181                tracing::error!(error = %e, "DASH server error");
1182            }
1183            shutdown_on_exit_dash.cancel();
1184        };
1185
1186        let shutdown_on_exit_whep = bg_shutdown_for_task.clone();
1187        let whep_fut = async move {
1188            let Some((listener, server)) = whep_router_pair else {
1189                return;
1190            };
1191            let router = lvqr_whep::router_for(server);
1192            let result = axum::serve(listener, router)
1193                .with_graceful_shutdown(async move { whep_shutdown.cancelled().await })
1194                .await;
1195            if let Err(e) = &result {
1196                tracing::error!(error = %e, "WHEP server error");
1197            }
1198            shutdown_on_exit_whep.cancel();
1199        };
1200
1201        let shutdown_on_exit_whip = bg_shutdown_for_task.clone();
1202        let whip_fut = async move {
1203            let Some((listener, server)) = whip_router_pair else {
1204                return;
1205            };
1206            let router = lvqr_whip::router_for(server);
1207            let result = axum::serve(listener, router)
1208                .with_graceful_shutdown(async move { whip_shutdown.cancelled().await })
1209                .await;
1210            if let Err(e) = &result {
1211                tracing::error!(error = %e, "WHIP server error");
1212            }
1213            shutdown_on_exit_whip.cancel();
1214        };
1215
1216        let srt_shutdown = bg_shutdown_for_task.clone();
1217        let srt_events = srt_events_clone;
1218        let srt_cancel = srt_shutdown_token;
1219        let srt_fut = async move {
1220            let Some(server) = srt_server else { return };
1221            if let Err(e) = server.run(srt_events, srt_cancel).await {
1222                tracing::error!(error = %e, "SRT server error");
1223            }
1224            srt_shutdown.cancel();
1225        };
1226
1227        let rtsp_shutdown = bg_shutdown_for_task.clone();
1228        let rtsp_events = rtsp_events_clone;
1229        let rtsp_cancel = rtsp_shutdown_token;
1230        let rtsp_fut = async move {
1231            let Some(server) = rtsp_server else { return };
1232            if let Err(e) = server.run(rtsp_events, rtsp_cancel).await {
1233                tracing::error!(error = %e, "RTSP server error");
1234            }
1235            rtsp_shutdown.cancel();
1236        };
1237
1238        let _ = tokio::join!(
1239            relay_fut, rtmp_fut, admin_fut, hls_fut, dash_fut, whep_fut, whip_fut, srt_fut, rtsp_fut
1240        );
1241        drop(whip_bridge_keepalive);
1242        tracing::info!("shutdown complete");
1243    });
1244
1245    Ok(ServerHandle {
1246        relay_addr: relay_bound,
1247        rtmp_addr: rtmp_bound,
1248        admin_addr: admin_bound,
1249        hls_addr: hls_bound,
1250        whep_addr: whep_bound,
1251        whip_addr: whip_bound,
1252        dash_addr: dash_bound,
1253        rtsp_addr: rtsp_bound,
1254        srt_addr: srt_bound,
1255        shutdown,
1256        join: Some(join),
1257        #[cfg(feature = "cluster")]
1258        cluster,
1259        wasm_filter: wasm_filter_handle,
1260        _wasm_reloader: wasm_reloader_handle,
1261    })
1262}
1263
1264// =====================================================================
1265// Internal WS relay + WS ingest handlers
1266// =====================================================================
1267
1268/// Shared state for WebSocket relay and ingest handlers.
1269#[derive(Clone)]
1270struct WsRelayState {
1271    origin: lvqr_moq::OriginProducer,
1272    /// Stored init segments per broadcast, so viewers get them immediately
1273    /// on connect.
1274    init_segments: Arc<dashmap::DashMap<String, Bytes>>,
1275    /// Authentication provider applied to WS subscribe and ingest sessions.
1276    auth: SharedAuth,
1277    /// Lifecycle event bus.
1278    events: EventBus,
1279}
1280
1281async fn ws_relay_handler(
1282    ws: WebSocketUpgrade,
1283    State(state): State<WsRelayState>,
1284    Path(broadcast): Path<String>,
1285    Query(params): Query<HashMap<String, String>>,
1286    headers: HeaderMap,
1287) -> Response {
1288    tracing::info!(broadcast = %broadcast, "WebSocket relay request");
1289    let resolved = resolve_ws_token(&headers, &params, "ws_subscribe");
1290    let decision = state.auth.check(&AuthContext::Subscribe {
1291        token: resolved.token,
1292        broadcast: broadcast.clone(),
1293    });
1294    if let AuthDecision::Deny { reason } = decision {
1295        tracing::warn!(broadcast = %broadcast, reason = %reason, "WS relay denied");
1296        metrics::counter!("lvqr_auth_failures_total", "entry" => "ws").increment(1);
1297        return (StatusCode::UNAUTHORIZED, reason).into_response();
1298    }
1299    metrics::counter!("lvqr_ws_connections_total", "direction" => "subscribe").increment(1);
1300    let ws = match resolved.offered_subprotocol {
1301        Some(ref p) => ws.protocols(std::iter::once(p.clone())),
1302        None => ws,
1303    };
1304    ws.on_upgrade(move |socket| ws_relay_session(socket, state, broadcast))
1305        .into_response()
1306}
1307
1308async fn ws_relay_session(mut socket: WebSocket, state: WsRelayState, broadcast: String) {
1309    let consumer = state.origin.consume();
1310    let Some(bc) = consumer.consume_broadcast(&broadcast) else {
1311        tracing::warn!(broadcast = %broadcast, "broadcast not found for WS relay");
1312        let _ = socket
1313            .send(Message::Close(Some(axum::extract::ws::CloseFrame {
1314                code: 4404,
1315                reason: "broadcast not found".into(),
1316            })))
1317            .await;
1318        return;
1319    };
1320
1321    tracing::info!(broadcast = %broadcast, "WS relay session started");
1322
1323    let video_track = bc.subscribe_track(&Track::new("0.mp4")).ok();
1324    let audio_track = bc.subscribe_track(&Track::new("1.mp4")).ok();
1325
1326    if video_track.is_none() && audio_track.is_none() {
1327        tracing::warn!(broadcast = %broadcast, "no playable tracks for WS relay");
1328        return;
1329    }
1330
1331    let (tx, mut rx) = tokio::sync::mpsc::channel::<(u8, Bytes)>(64);
1332    let cancel = CancellationToken::new();
1333
1334    if let Some(track) = video_track {
1335        let tx = tx.clone();
1336        let cancel = cancel.clone();
1337        tokio::spawn(async move {
1338            relay_track(track, 0u8, tx, cancel).await;
1339        });
1340    }
1341    if let Some(track) = audio_track {
1342        let tx = tx.clone();
1343        let cancel = cancel.clone();
1344        tokio::spawn(async move {
1345            relay_track(track, 1u8, tx, cancel).await;
1346        });
1347    }
1348    drop(tx);
1349
1350    while let Some((track_id, payload)) = rx.recv().await {
1351        let mut framed = Vec::with_capacity(1 + payload.len());
1352        framed.push(track_id);
1353        framed.extend_from_slice(&payload);
1354        let len = framed.len() as u64;
1355        if let Err(e) = socket.send(Message::Binary(framed.into())).await {
1356            tracing::debug!(error = ?e, "WS send error");
1357            break;
1358        }
1359        metrics::counter!("lvqr_frames_relayed_total", "transport" => "ws").increment(1);
1360        metrics::counter!("lvqr_bytes_relayed_total", "transport" => "ws").increment(len);
1361    }
1362
1363    cancel.cancel();
1364    tracing::info!(broadcast = %broadcast, "WS relay session ended");
1365}
1366
1367async fn relay_track(
1368    mut track: lvqr_moq::TrackConsumer,
1369    track_id: u8,
1370    tx: tokio::sync::mpsc::Sender<(u8, Bytes)>,
1371    cancel: CancellationToken,
1372) {
1373    loop {
1374        let group = tokio::select! {
1375            res = track.next_group() => res,
1376            _ = cancel.cancelled() => return,
1377        };
1378        let mut group = match group {
1379            Ok(Some(g)) => g,
1380            Ok(None) => return,
1381            Err(e) => {
1382                tracing::debug!(track_id, error = ?e, "track error");
1383                return;
1384            }
1385        };
1386        loop {
1387            let frame = tokio::select! {
1388                res = group.read_frame() => res,
1389                _ = cancel.cancelled() => return,
1390            };
1391            match frame {
1392                Ok(Some(bytes)) => {
1393                    if tx.send((track_id, bytes)).await.is_err() {
1394                        return;
1395                    }
1396                }
1397                Ok(None) => break,
1398                Err(e) => {
1399                    tracing::debug!(track_id, error = ?e, "group read error");
1400                    return;
1401                }
1402            }
1403        }
1404    }
1405}
1406
1407async fn ws_ingest_handler(
1408    ws: WebSocketUpgrade,
1409    State(state): State<WsRelayState>,
1410    Path(broadcast): Path<String>,
1411    Query(params): Query<HashMap<String, String>>,
1412    headers: HeaderMap,
1413) -> Response {
1414    tracing::info!(broadcast = %broadcast, "WebSocket ingest request");
1415    let resolved = resolve_ws_token(&headers, &params, "ws_ingest");
1416    let decision = state
1417        .auth
1418        .check(&extract::extract_ws_ingest(resolved.token.as_deref(), &broadcast));
1419    if let AuthDecision::Deny { reason } = decision {
1420        tracing::warn!(broadcast = %broadcast, reason = %reason, "WS ingest denied");
1421        metrics::counter!("lvqr_auth_failures_total", "entry" => "ws_ingest").increment(1);
1422        return (StatusCode::UNAUTHORIZED, reason).into_response();
1423    }
1424    metrics::counter!("lvqr_ws_connections_total", "direction" => "publish").increment(1);
1425    let ws = match resolved.offered_subprotocol {
1426        Some(ref p) => ws.protocols(std::iter::once(p.clone())),
1427        None => ws,
1428    };
1429    ws.on_upgrade(move |socket| ws_ingest_session(socket, state, broadcast))
1430        .into_response()
1431}
1432
/// Outcome of looking for a bearer token on a WebSocket upgrade request.
///
/// Clients should send the token in the `Sec-WebSocket-Protocol` header as
/// `lvqr.bearer.<token>`. When they do, the offered subprotocol string is
/// kept so the upgrade can echo it back and the handshake is accepted. The
/// legacy `?token=` query parameter remains a supported fallback, but its
/// use is logged as deprecated.
struct WsTokenResolution {
    /// The bearer token, if one was found in the header or the query string.
    token: Option<String>,
    /// The full `lvqr.bearer.<token>` subprotocol value the client offered;
    /// `None` when the token came from the query string or was absent.
    offered_subprotocol: Option<String>,
}
1444
1445fn resolve_ws_token(headers: &HeaderMap, params: &HashMap<String, String>, entry: &'static str) -> WsTokenResolution {
1446    if let Some(hv) = headers.get("sec-websocket-protocol")
1447        && let Ok(raw) = hv.to_str()
1448    {
1449        for item in raw.split(',') {
1450            let proto = item.trim();
1451            if let Some(tok) = proto.strip_prefix("lvqr.bearer.")
1452                && !tok.is_empty()
1453            {
1454                return WsTokenResolution {
1455                    token: Some(tok.to_string()),
1456                    offered_subprotocol: Some(proto.to_string()),
1457                };
1458            }
1459        }
1460    }
1461
1462    if let Some(tok) = params.get("token").filter(|t| !t.is_empty()) {
1463        tracing::warn!(
1464            entry = entry,
1465            "deprecated: ?token= query parameter; migrate to Sec-WebSocket-Protocol: lvqr.bearer.<token>"
1466        );
1467        return WsTokenResolution {
1468            token: Some(tok.clone()),
1469            offered_subprotocol: None,
1470        };
1471    }
1472
1473    WsTokenResolution {
1474        token: None,
1475        offered_subprotocol: None,
1476    }
1477}
1478
1479async fn ws_ingest_session(mut socket: WebSocket, state: WsRelayState, broadcast: String) {
1480    use lvqr_ingest::remux;
1481
1482    tracing::info!(broadcast = %broadcast, "WS ingest session started");
1483
1484    let Some(mut bc) = state.origin.create_broadcast(&broadcast) else {
1485        tracing::warn!(broadcast = %broadcast, "broadcast creation failed");
1486        let _ = socket
1487            .send(Message::Close(Some(axum::extract::ws::CloseFrame {
1488                code: 4409,
1489                reason: "broadcast already exists".into(),
1490            })))
1491            .await;
1492        return;
1493    };
1494
1495    state.events.emit(RelayEvent::BroadcastStarted {
1496        name: broadcast.clone(),
1497    });
1498
1499    let Ok(mut video_track) = bc.create_track(Track::new("0.mp4")) else {
1500        tracing::warn!("failed to create video track");
1501        return;
1502    };
1503    let Ok(mut catalog_track) = bc.create_track(Track::new(".catalog")) else {
1504        tracing::warn!("failed to create catalog track");
1505        return;
1506    };
1507
1508    let mut _video_config: Option<remux::VideoConfig> = None;
1509    let mut video_init: Option<Bytes> = None;
1510    let mut video_group: Option<lvqr_moq::GroupProducer> = None;
1511    let mut video_seq: u32 = 0;
1512    let _ = socket.send(Message::Text(r#"{"status":"ready"}"#.into())).await;
1513
1514    while let Some(msg) = socket.recv().await {
1515        let data = match msg {
1516            Ok(Message::Binary(data)) => data,
1517            Ok(Message::Close(_)) => break,
1518            Ok(_) => continue,
1519            Err(e) => {
1520                tracing::debug!(error = ?e, "WS ingest recv error");
1521                break;
1522            }
1523        };
1524
1525        if data.len() < 5 {
1526            continue;
1527        }
1528
1529        let msg_type = data[0];
1530        let timestamp = u32::from_be_bytes([data[1], data[2], data[3], data[4]]);
1531        let payload = Bytes::from(data[5..].to_vec());
1532
1533        match msg_type {
1534            0 => {
1535                if payload.len() < 6 {
1536                    continue;
1537                }
1538                let vid_width = u16::from_be_bytes([payload[0], payload[1]]);
1539                let vid_height = u16::from_be_bytes([payload[2], payload[3]]);
1540                let avcc_data = &payload[4..];
1541
1542                match parse_avcc_record(avcc_data) {
1543                    Some(config) => {
1544                        tracing::info!(
1545                            broadcast = %broadcast,
1546                            codec = %config.codec_string(),
1547                            width = vid_width,
1548                            height = vid_height,
1549                            "WS ingest: video config received"
1550                        );
1551                        let init = remux::video_init_segment_with_size(&config, vid_width, vid_height);
1552                        _video_config = Some(config.clone());
1553                        video_init = Some(init.clone());
1554                        state.init_segments.insert(broadcast.clone(), init);
1555
1556                        let json = remux::generate_catalog(Some(&config), None);
1557                        if let Ok(mut group) = catalog_track.append_group() {
1558                            let _ = group.write_frame(Bytes::from(json));
1559                            let _ = group.finish();
1560                        }
1561                    }
1562                    None => {
1563                        tracing::warn!("invalid AVCC record from browser");
1564                    }
1565                }
1566            }
1567            1 => {
1568                let Some(ref init) = video_init else { continue };
1569                if let Some(mut g) = video_group.take() {
1570                    let _ = g.finish();
1571                }
1572                video_seq += 1;
1573                let base_dts = (timestamp as u64) * 90;
1574                let sample = lvqr_cmaf::RawSample {
1575                    track_id: 1,
1576                    dts: base_dts,
1577                    cts_offset: 0,
1578                    duration: 3000,
1579                    payload,
1580                    keyframe: true,
1581                };
1582                if let Ok(mut group) = video_track.append_group() {
1583                    let _ = group.write_frame(init.clone());
1584                    let seg = lvqr_cmaf::build_moof_mdat(video_seq, 1, base_dts, &[sample]);
1585                    let _ = group.write_frame(seg);
1586                    video_group = Some(group);
1587                }
1588            }
1589            2 => {
1590                if video_init.is_none() {
1591                    continue;
1592                }
1593                video_seq += 1;
1594                let base_dts = (timestamp as u64) * 90;
1595                let sample = lvqr_cmaf::RawSample {
1596                    track_id: 1,
1597                    dts: base_dts,
1598                    cts_offset: 0,
1599                    duration: 3000,
1600                    payload,
1601                    keyframe: false,
1602                };
1603                if let Some(ref mut group) = video_group {
1604                    let seg = lvqr_cmaf::build_moof_mdat(video_seq, 1, base_dts, &[sample]);
1605                    let _ = group.write_frame(seg);
1606                }
1607            }
1608            _ => {}
1609        }
1610    }
1611
1612    if let Some(mut g) = video_group.take() {
1613        let _ = g.finish();
1614    }
1615    state.events.emit(RelayEvent::BroadcastStopped {
1616        name: broadcast.clone(),
1617    });
1618    tracing::info!(broadcast = %broadcast, "WS ingest session ended");
1619}
1620
1621/// Background task that listens on the event bus for new broadcasts and
1622/// starts a recorder for each one. Event-driven so WS-ingested broadcasts
1623/// are recorded identically to RTMP-ingested ones.
1624async fn spawn_recordings(
1625    recorder: lvqr_record::BroadcastRecorder,
1626    origin: lvqr_moq::OriginProducer,
1627    mut events: tokio::sync::broadcast::Receiver<RelayEvent>,
1628    shutdown: CancellationToken,
1629) {
1630    let mut active: std::collections::HashSet<String> = std::collections::HashSet::new();
1631    loop {
1632        let event = tokio::select! {
1633            res = events.recv() => res,
1634            _ = shutdown.cancelled() => return,
1635        };
1636        let event = match event {
1637            Ok(e) => e,
1638            Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
1639                tracing::warn!(missed = n, "recorder event stream lagged");
1640                continue;
1641            }
1642            Err(tokio::sync::broadcast::error::RecvError::Closed) => return,
1643        };
1644        match event {
1645            RelayEvent::BroadcastStarted { name } => {
1646                if !active.insert(name.clone()) {
1647                    continue;
1648                }
1649                let consumer = origin.consume();
1650                let Some(broadcast) = consumer.consume_broadcast(&name) else {
1651                    tracing::warn!(broadcast = %name, "recorder: broadcast not resolvable yet");
1652                    active.remove(&name);
1653                    continue;
1654                };
1655                let recorder = recorder.clone();
1656                let cancel = shutdown.clone();
1657                tracing::info!(broadcast = %name, "starting recording");
1658                let name_clone = name.clone();
1659                tokio::spawn(async move {
1660                    let _ = recorder
1661                        .record_broadcast(&name_clone, broadcast, lvqr_record::RecordOptions::default(), cancel)
1662                        .await;
1663                });
1664            }
1665            RelayEvent::BroadcastStopped { name } => {
1666                active.remove(&name);
1667            }
1668            RelayEvent::ViewerJoined { .. } | RelayEvent::ViewerLeft { .. } => {}
1669        }
1670    }
1671}
1672
1673/// Parse an AVCDecoderConfigurationRecord from a WS ingest `type=0` payload.
1674fn parse_avcc_record(data: &[u8]) -> Option<lvqr_ingest::remux::VideoConfig> {
1675    if data.len() < 6 {
1676        return None;
1677    }
1678    let profile = data[1];
1679    let compat = data[2];
1680    let level = data[3];
1681    let nalu_length_size = (data[4] & 0x03) + 1;
1682
1683    let num_sps = (data[5] & 0x1F) as usize;
1684    let mut offset = 6;
1685    let mut sps_list = Vec::with_capacity(num_sps);
1686    for _ in 0..num_sps {
1687        if offset + 2 > data.len() {
1688            return None;
1689        }
1690        let len = u16::from_be_bytes([data[offset], data[offset + 1]]) as usize;
1691        offset += 2;
1692        if offset + len > data.len() {
1693            return None;
1694        }
1695        sps_list.push(data[offset..offset + len].to_vec());
1696        offset += len;
1697    }
1698
1699    if offset >= data.len() {
1700        return None;
1701    }
1702    let num_pps = data[offset] as usize;
1703    offset += 1;
1704    let mut pps_list = Vec::with_capacity(num_pps);
1705    for _ in 0..num_pps {
1706        if offset + 2 > data.len() {
1707            return None;
1708        }
1709        let len = u16::from_be_bytes([data[offset], data[offset + 1]]) as usize;
1710        offset += 2;
1711        if offset + len > data.len() {
1712            return None;
1713        }
1714        pps_list.push(data[offset..offset + len].to_vec());
1715        offset += len;
1716    }
1717
1718    if sps_list.is_empty() || pps_list.is_empty() {
1719        return None;
1720    }
1721
1722    Some(lvqr_ingest::remux::VideoConfig {
1723        sps_list,
1724        pps_list,
1725        profile,
1726        compat,
1727        level,
1728        nalu_length_size,
1729    })
1730}