Skip to main content

lvqr_cli/
lib.rs

1//! LVQR server library entry point.
2//!
3//! `main.rs` parses CLI args and hands off to [`start`]; tests and embedders
4//! can call [`start`] directly with a pre-built [`ServeConfig`]. Every
5//! listener (MoQ/QUIC, RTMP/TCP, admin/TCP) is bound inside `start` before
6//! it returns, so callers who pass `port: 0` can read the real bound
7//! addresses back off the returned [`ServerHandle`] and point test clients
8//! at them without polling.
9//!
10//! This is the library target used by `lvqr-test-utils::TestServer` to
11//! spin up a full-stack LVQR instance on ephemeral ports inside
12//! integration tests.
13
14mod archive;
15mod auth_middleware;
16mod captions;
17#[cfg(feature = "cluster")]
18pub mod cluster_claim;
19mod config;
20mod handle;
21mod hls;
22mod signed_url;
23mod ws;
24
25pub use archive::sign_playback_url;
26pub use config::ServeConfig;
27#[cfg(feature = "transcode")]
28pub use config::{parse_one_transcode_rendition, parse_transcode_renditions};
29pub use handle::ServerHandle;
30/// Re-export of [`lvqr_admin::LatencyTracker`] so downstream callers
31/// (`lvqr-test-utils`, integration tests) do not need to pull
32/// `lvqr-admin` in as a direct dep. Tier 4 item 4.7 session A.
33pub use lvqr_admin::{LatencyTracker, SloEntry};
34pub use signed_url::{LiveScheme, sign_live_url};
35
36use anyhow::Result;
37use axum::middleware::from_fn_with_state;
38use axum::routing::get;
39use lvqr_auth::{NoopAuthProvider, SharedAuth};
40use lvqr_core::{EventBus, RelayEvent};
41use lvqr_dash::{BroadcasterDashBridge, DashConfig};
42use lvqr_fragment::FragmentBroadcasterRegistry;
43use lvqr_hls::{MultiHlsServer, PlaylistBuilderConfig};
44use std::sync::Arc;
45use std::sync::atomic::Ordering;
46use tokio_util::sync::CancellationToken;
47use tower_http::cors::CorsLayer;
48
49#[cfg(feature = "c2pa")]
50use crate::archive::verify_router;
51use crate::archive::{BroadcasterArchiveIndexer, playback_router};
52use crate::auth_middleware::{
53    LivePlaybackAuthState, SignalAuthState, live_playback_auth_middleware, signal_auth_middleware,
54};
55use crate::hls::BroadcasterHlsBridge;
56use crate::ws::{WsRelayState, spawn_recordings, ws_ingest_handler, ws_relay_handler};
57
58/// Start a full-stack LVQR server. All listeners are bound before the
59/// function returns, so the [`ServerHandle`] immediately reports real
60/// addresses even when the config requested ephemeral ports.
61///
62/// The returned handle owns a background task that runs the relay, RTMP,
63/// and admin subsystems under a shared cancellation token. Use
64/// [`ServerHandle::shutdown`] for deterministic teardown.
65pub async fn start(config: ServeConfig) -> Result<ServerHandle> {
66    tracing::info!(
67        relay = %config.relay_addr,
68        rtmp = %config.rtmp_addr,
69        admin = %config.admin_addr,
70        mesh = config.mesh_enabled,
71        "starting LVQR server"
72    );
73
74    // Metrics recorder install. Process-wide, must be skipped in
75    // tests (all four permutations below call
76    // `metrics::set_global_recorder` which panics or errors on
77    // second install). The four cases are:
78    //   Prom + OTel:   install a FanoutBuilder of both.
79    //   Prom only:     install the Prometheus recorder (legacy).
80    //   OTel only:     install the OTel-forwarding recorder.
81    //   Neither:       install nothing; metrics calls are no-ops.
82    // The `PrometheusRecorder` handle is exposed on
83    // `ServerHandle` for the admin `/metrics` scrape route, so
84    // we always capture it before handing the recorder off to a
85    // Fanout layer.
86    let prom_handle = match (config.install_prometheus, config.otel_metrics_recorder.clone()) {
87        (true, Some(otel_recorder)) => {
88            let prom_recorder = metrics_exporter_prometheus::PrometheusBuilder::new().build_recorder();
89            let handle = prom_recorder.handle();
90            let fanout = metrics_util::layers::FanoutBuilder::default()
91                .add_recorder(prom_recorder)
92                .add_recorder(otel_recorder)
93                .build();
94            metrics::set_global_recorder(fanout)
95                .map_err(|e| anyhow::anyhow!("failed to install metrics fanout recorder: {e}"))?;
96            Some(handle)
97        }
98        (true, None) => Some(
99            metrics_exporter_prometheus::PrometheusBuilder::new()
100                .install_recorder()
101                .map_err(|e| anyhow::anyhow!("failed to install Prometheus recorder: {e}"))?,
102        ),
103        (false, Some(otel_recorder)) => {
104            metrics::set_global_recorder(otel_recorder)
105                .map_err(|e| anyhow::anyhow!("failed to install OTLP metrics recorder: {e}"))?;
106            None
107        }
108        (false, None) => None,
109    };
110
111    let shutdown = CancellationToken::new();
112
113    // Auth provider: caller-provided, or fall back to open access.
114    let auth: SharedAuth = config
115        .auth
116        .clone()
117        .unwrap_or_else(|| Arc::new(NoopAuthProvider) as SharedAuth);
118
119    // Shared lifecycle bus: bridge and WS ingest emit
120    // BroadcastStarted/Stopped here; recorder subscribes to it.
121    let events = EventBus::default();
122
123    // MoQ relay: init_server() binds the QUIC socket and reports the real
124    // bound address, which we surface back through ServerHandle.
125    let relay_config = lvqr_relay::RelayConfig::new(config.relay_addr);
126    let mut relay = lvqr_relay::RelayServer::new(relay_config);
127    relay.set_auth_provider(auth.clone());
128    let (mut moq_server, relay_bound) = relay.init_server()?;
129    tracing::info!(addr = %relay_bound, "MoQ relay bound");
130
131    // Optional cluster bootstrap. Resolver for `MultiHlsServer` is
132    // built up-front so the HLS constructor below can install it in
133    // one shot instead of patching the server after the fact.
134    #[cfg(feature = "cluster")]
135    let cluster = if let Some(listen) = config.cluster_listen {
136        let ccfg = lvqr_cluster::ClusterConfig {
137            listen,
138            seeds: config.cluster_seeds.clone(),
139            node_id: config.cluster_node_id.clone().map(lvqr_cluster::NodeId::new),
140            cluster_id: config
141                .cluster_id
142                .clone()
143                .unwrap_or_else(|| lvqr_cluster::ClusterConfig::default().cluster_id),
144            ..lvqr_cluster::ClusterConfig::default()
145        };
146        let c = lvqr_cluster::Cluster::bootstrap(ccfg)
147            .await
148            .map_err(|e| anyhow::anyhow!("cluster bootstrap failed: {e}"))?;
149        let c = std::sync::Arc::new(c);
150        if config.cluster_advertise_hls.is_some()
151            || config.cluster_advertise_dash.is_some()
152            || config.cluster_advertise_rtsp.is_some()
153        {
154            let endpoints = lvqr_cluster::NodeEndpoints {
155                hls: config.cluster_advertise_hls.clone(),
156                dash: config.cluster_advertise_dash.clone(),
157                rtsp: config.cluster_advertise_rtsp.clone(),
158            };
159            c.set_endpoints(&endpoints)
160                .await
161                .map_err(|e| anyhow::anyhow!("cluster set_endpoints failed: {e}"))?;
162        }
163        tracing::info!(
164            node = %c.self_id(),
165            %listen,
166            advertise_hls = ?config.cluster_advertise_hls,
167            advertise_dash = ?config.cluster_advertise_dash,
168            advertise_rtsp = ?config.cluster_advertise_rtsp,
169            "cluster enabled"
170        );
171        Some(c)
172    } else {
173        None
174    };
175    #[cfg(feature = "cluster")]
176    let hls_owner_resolver: Option<lvqr_hls::OwnerResolver> = cluster.as_ref().map(|c| {
177        let c = c.clone();
178        let resolver: lvqr_hls::OwnerResolver = std::sync::Arc::new(move |broadcast: String| {
179            let c = c.clone();
180            Box::pin(async move {
181                let (_, endpoints) = c.find_owner_endpoints(&broadcast).await?;
182                endpoints.hls
183            })
184        });
185        resolver
186    });
187    #[cfg(not(feature = "cluster"))]
188    let hls_owner_resolver: Option<lvqr_hls::OwnerResolver> = None;
189    #[cfg(feature = "cluster")]
190    let dash_owner_resolver: Option<lvqr_dash::OwnerResolver> = cluster.as_ref().map(|c| {
191        let c = c.clone();
192        let resolver: lvqr_dash::OwnerResolver = std::sync::Arc::new(move |broadcast: String| {
193            let c = c.clone();
194            Box::pin(async move {
195                let (_, endpoints) = c.find_owner_endpoints(&broadcast).await?;
196                endpoints.dash
197            })
198        });
199        resolver
200    });
201    #[cfg(not(feature = "cluster"))]
202    let dash_owner_resolver: Option<lvqr_dash::OwnerResolver> = None;
203    #[cfg(feature = "cluster")]
204    let rtsp_owner_resolver: Option<lvqr_rtsp::OwnerResolver> = cluster.as_ref().map(|c| {
205        let c = c.clone();
206        let resolver: lvqr_rtsp::OwnerResolver = std::sync::Arc::new(move |broadcast: String| {
207            let c = c.clone();
208            Box::pin(async move {
209                let (_, endpoints) = c.find_owner_endpoints(&broadcast).await?;
210                endpoints.rtsp
211            })
212        });
213        resolver
214    });
215    #[cfg(not(feature = "cluster"))]
216    let rtsp_owner_resolver: Option<lvqr_rtsp::OwnerResolver> = None;
217
218    // Optional multi-broadcast LL-HLS server. The broadcaster-native
219    // HLS bridge (installed below) subscribes on the shared registry
220    // and pumps fragments into the shared `MultiHlsServer` state.
221    // Each broadcast gets its own per-broadcast `HlsServer` on first
222    // publish; the axum router demultiplexes requests under
223    // `/hls/{broadcast}/...`.
224    //
225    // When clustering is enabled, an `OwnerResolver` redirects
226    // subscribers of peer-owned broadcasts to the owning node's HLS
227    // URL instead of returning 404.
228    let target_dur = config.hls_target_duration_secs;
229    let part_target_secs = config.hls_part_target_ms as f32 / 1000.0;
230    let max_segments = if config.hls_dvr_window_secs == 0 || target_dur == 0 {
231        None
232    } else {
233        Some((config.hls_dvr_window_secs / target_dur) as usize)
234    };
235    let hls_server = config.hls_addr.map(|_| {
236        let playlist_cfg = PlaylistBuilderConfig {
237            target_duration_secs: target_dur,
238            part_target_secs,
239            max_segments,
240            ..PlaylistBuilderConfig::default()
241        };
242        match hls_owner_resolver.clone() {
243            Some(r) => MultiHlsServer::with_owner_resolver(playlist_cfg, r),
244            None => MultiHlsServer::new(playlist_cfg),
245        }
246    });
247
248    // Optional multi-broadcast MPEG-DASH server. Sibling of the
249    // LL-HLS fan-out above: a single `MultiDashServer` subscribes
250    // on the shared registry and projects fragments onto a
251    // per-broadcast axum router mounted under `/dash/{broadcast}/...`.
252    // Every ingest protocol (RTMP, WHIP, SRT, RTSP) feeds DASH via
253    // the same `BroadcasterDashBridge` install below.
254    let dash_server = config.dash_addr.map(|_| match dash_owner_resolver.clone() {
255        Some(r) => lvqr_dash::MultiDashServer::with_owner_resolver(DashConfig::default(), r),
256        None => lvqr_dash::MultiDashServer::new(DashConfig::default()),
257    });
258
259    // Shared FragmentBroadcasterRegistry used by every ingest crate
260    // and every consumer. Session 60 completed the Tier 2.1 migration:
261    // every ingest protocol publishes to this one registry, and every
262    // consumer (archive, LL-HLS, DASH) installs an on_entry_created
263    // callback against it.
264    let shared_registry = FragmentBroadcasterRegistry::new();
265
266    // Tier 4 item 4.7 session A: one shared `LatencyTracker` per
267    // server feeds samples from every instrumented egress surface
268    // (currently LL-HLS drain + WS relay) and powers the
269    // `/api/v1/slo` admin route + the
270    // `lvqr_subscriber_glass_to_glass_ms` Prometheus histogram.
271    // Tests read the snapshot directly off `ServerHandle::slo()`.
272    let slo_tracker = lvqr_admin::LatencyTracker::new();
273
274    // Auto-claim every new broadcast against the cluster so peers
275    // redirect correctly without the operator having to call
276    // `Cluster::claim_broadcast` by hand. The bridge holds the
277    // `Claim` alive until every ingest publisher for that
278    // broadcast disconnects. Feature-gated; no-op when
279    // single-node.
280    #[cfg(feature = "cluster")]
281    if let Some(ref c) = cluster {
282        cluster_claim::install_cluster_claim_bridge(c.clone(), cluster_claim::DEFAULT_CLAIM_LEASE, &shared_registry);
283    }
284
285    // Optional WASM fragment filter tap. Installed BEFORE any
286    // ingest listener accepts traffic so the very first fragment
287    // of the first broadcast flows through the filter chain. Each
288    // path in `config.wasm_filter` becomes its own
289    // `SharedFilter` + `WasmFilterReloader` pair; the bridge sees
290    // one `ChainFilter` wrapping the ordered list. Per-slot
291    // reloaders watch the module path for changes and call
292    // `SharedFilter::replace` atomically when the file changes;
293    // in-flight fragments finish on the old module and the next
294    // fragment sees the new one, without disturbing the other
295    // slots in the chain.
296    let (wasm_filter_handle, wasm_reloader_handles, wasm_slot_counters) = if config.wasm_filter.is_empty() {
297        (None, Vec::new(), Vec::new())
298    } else {
299        let mut shareds: Vec<lvqr_wasm::SharedFilter> = Vec::with_capacity(config.wasm_filter.len());
300        let mut reloaders: Vec<lvqr_wasm::WasmFilterReloader> = Vec::with_capacity(config.wasm_filter.len());
301        for path in &config.wasm_filter {
302            let filter = lvqr_wasm::WasmFilter::load(path)
303                .map_err(|e| anyhow::anyhow!("WASM filter load at {} failed: {e}", path.display()))?;
304            tracing::info!(path = %path.display(), "WASM fragment filter loaded");
305            let shared = lvqr_wasm::SharedFilter::new(filter);
306            let reloader = lvqr_wasm::WasmFilterReloader::spawn(path, shared.clone())
307                .map_err(|e| anyhow::anyhow!("WASM filter hot-reload watcher at {} failed: {e}", path.display()))?;
308            shareds.push(shared);
309            reloaders.push(reloader);
310        }
311        let chain = lvqr_wasm::ChainFilter::new(shareds);
312        let chain_len = chain.len();
313        // PLAN Phase D session 140: extract per-slot counter handles
314        // BEFORE wrapping the chain in the bridge's outer SharedFilter.
315        // The outer SharedFilter type-erases the ChainFilter; capturing
316        // the Arc<SlotCounters> handles here is how the admin closure
317        // below reads per-slot seen/kept/dropped for
318        // `GET /api/v1/wasm-filter`.
319        let slot_counters = chain.slot_counters();
320        tracing::info!(chain_len, "WASM fragment filter chain installed");
321        let chain_shared = lvqr_wasm::SharedFilter::new(chain);
322        let bridge = lvqr_wasm::install_wasm_filter_bridge(&shared_registry, chain_shared, chain_len);
323        (Some(bridge), reloaders, slot_counters)
324    };
325
326    // RTMP ingest bridged to MoQ. Pre-bind the TCP listener so we can
327    // report the real bound port (for ephemeral-port test setups).
328    let mut bridge_builder = lvqr_ingest::RtmpMoqBridge::with_auth(relay.origin().clone(), auth.clone())
329        .with_events(events.clone())
330        .with_registry(shared_registry.clone());
331
332    // Optional DVR archive index. Opened before the bridge is frozen
333    // so the BroadcasterArchiveIndexer can install its on_entry_created
334    // callback on the shared registry. The index file lives at
335    // `<archive_dir>/archive.redb`; the directory is created on
336    // demand if it does not already exist.
337    let archive_index = if let Some(ref dir) = config.archive_dir {
338        std::fs::create_dir_all(dir)
339            .map_err(|e| anyhow::anyhow!("archive: failed to create {}: {e}", dir.display()))?;
340        let db_path = dir.join("archive.redb");
341        let index = lvqr_archive::RedbSegmentIndex::open(&db_path)
342            .map_err(|e| anyhow::anyhow!("archive: failed to open {}: {e}", db_path.display()))?;
343        tracing::info!(dir = %dir.display(), "DVR archive index enabled");
344        Some((dir.clone(), Arc::new(index)))
345    } else {
346        None
347    };
348
349    // Install the broadcaster-based archive indexer on the shared
350    // registry. Every subsequent ingest-side emit is drained to disk +
351    // redb by a per-broadcaster tokio task the indexer spawns.
352    if let Some((ref dir, ref index)) = archive_index {
353        #[cfg(feature = "c2pa")]
354        BroadcasterArchiveIndexer::install(dir.clone(), Arc::clone(index), &shared_registry, config.c2pa.clone());
355        #[cfg(not(feature = "c2pa"))]
356        BroadcasterArchiveIndexer::install(dir.clone(), Arc::clone(index), &shared_registry);
357    }
358
359    // Install the broadcaster-based LL-HLS composition bridge on the
360    // shared registry. Every ingest crate's first `publish_init` for a
361    // `(broadcast, track)` pair fires the callback; the callback
362    // subscribes and spawns a drain task that projects fragments onto
363    // the shared `MultiHlsServer`. Session 60 consumer-side switchover:
364    // replaces the FragmentObserver path the HLS bridge used through
365    // session 59.
366    if let Some(ref hls) = hls_server {
367        BroadcasterHlsBridge::install(
368            hls.clone(),
369            config.hls_target_duration_secs * 1000,
370            config.hls_part_target_ms,
371            &shared_registry,
372            Some(slo_tracker.clone()),
373        );
374        // Tier 4 item 4.5 session C: feed the captions
375        // sub-track into the per-broadcast subtitles
376        // rendition. The bridge no-ops on every track that
377        // is not `"captions"`, so it composes safely with
378        // the LL-HLS bridge above.
379        captions::BroadcasterCaptionsBridge::install(hls.clone(), &shared_registry);
380    }
381
382    // Tier 4 item 4.5 session D: if the operator passed
383    // `--whisper-model <PATH>`, build the
384    // `WhisperCaptionsFactory` + `AgentRunner` and install it
385    // onto the shared registry so every new
386    // `(broadcast, "1.mp4")` triggers a WhisperCaptionsAgent.
387    // The agent republishes each caption cue onto
388    // `(broadcast, "captions")` where the
389    // `BroadcasterCaptionsBridge` above picks it up and feeds
390    // the HLS subtitle rendition. Without the flag (or without
391    // the `whisper` feature at all) no AI state is constructed.
392    #[cfg(feature = "whisper")]
393    let agent_runner_handle = if let Some(ref path) = config.whisper_model {
394        if hls_server.is_none() {
395            // The captions track reaches browser players only via
396            // the HLS subtitle rendition that `BroadcasterCaptionsBridge`
397            // wires above. With HLS disabled the WhisperCaptionsAgent
398            // still runs and publishes cues onto the registry and the
399            // in-process `CaptionStream`, but browser subscribers see
400            // nothing. Warn so misconfigured deployments surface early
401            // rather than through silent captions loss.
402            tracing::warn!(
403                path = %path.display(),
404                "whisper captions agent enabled without HLS surface; browser clients will not receive captions"
405            );
406        }
407        let factory =
408            lvqr_agent_whisper::WhisperCaptionsFactory::new(lvqr_agent_whisper::WhisperConfig::new(path.clone()))
409                .with_caption_registry(shared_registry.clone());
410        tracing::info!(path = %path.display(), "whisper captions agent enabled");
411        Some(
412            lvqr_agent::AgentRunner::new()
413                .with_factory(factory)
414                .install(&shared_registry),
415        )
416    } else {
417        None
418    };
419
420    // Install the broadcaster-based DASH composition bridge. Same
421    // pattern as LL-HLS: the callback spawns a drain task per
422    // `(broadcast, track)` that stamps a monotonic `$Number$` counter
423    // onto every observed fragment and pushes it into the per-broadcast
424    // `DashServer`. Session 60: completes the consumer-side switchover.
425    if let Some(ref dash) = dash_server {
426        BroadcasterDashBridge::install(dash.clone(), &shared_registry, Some(slo_tracker.clone()));
427    }
428
429    // Tier 4 item 4.6 session 106 C: if the operator passed
430    // `--transcode-rendition <NAME>` one or more times, install one
431    // `SoftwareTranscoderFactory` (GStreamer-backed video encoder) +
432    // one `AudioPassthroughTranscoderFactory` (zero-dep audio copy)
433    // per rendition against the shared registry. Every source
434    // broadcast's video + audio tracks then fan out into
435    // `<source>/<rendition>/{0,1}.mp4` output broadcasters the HLS
436    // bridge drains automatically. The ladder's metadata is also
437    // registered on the HLS server so the master-playlist composer
438    // emits one `#EXT-X-STREAM-INF` per rendition sibling.
439    #[cfg(feature = "transcode")]
440    let transcode_runner_handle = if config.transcode_renditions.is_empty() {
441        None
442    } else {
443        let mut runner = lvqr_transcode::TranscodeRunner::new();
444        let skip_suffixes: Vec<String> = config.transcode_renditions.iter().map(|r| r.name.clone()).collect();
445        for spec in &config.transcode_renditions {
446            let video_factory = lvqr_transcode::SoftwareTranscoderFactory::new(spec.clone(), shared_registry.clone())
447                .skip_source_suffixes(skip_suffixes.clone());
448            let audio_factory =
449                lvqr_transcode::AudioPassthroughTranscoderFactory::new(spec.clone(), shared_registry.clone())
450                    .skip_source_suffixes(skip_suffixes.clone());
451            runner = runner.with_factory(video_factory).with_factory(audio_factory);
452        }
453        tracing::info!(
454            renditions = ?config
455                .transcode_renditions
456                .iter()
457                .map(|r| r.name.clone())
458                .collect::<Vec<_>>(),
459            "transcode ladder enabled",
460        );
461        // Publish ladder metadata to the HLS master-playlist composer.
462        if let Some(ref hls) = hls_server {
463            let meta: Vec<lvqr_hls::RenditionMeta> = config
464                .transcode_renditions
465                .iter()
466                .map(|r| lvqr_hls::RenditionMeta {
467                    name: r.name.clone(),
468                    bandwidth_bps: lvqr_hls::RenditionMeta::bandwidth_bps_with_overhead(
469                        r.video_bitrate_kbps + r.audio_bitrate_kbps,
470                    ),
471                    resolution: Some((r.width, r.height)),
472                    // Hard-coded placeholder per session 106 C
473                    // decision (d): real SPS / ASC parsing is a
474                    // session-107-or-later job.
475                    codecs: "avc1.640028,mp4a.40.2".into(),
476                })
477                .collect();
478            hls.set_ladder(meta);
479            hls.set_source_bandwidth_bps(
480                config
481                    .source_bandwidth_kbps
482                    .map(|kbps| (kbps as u64).saturating_mul(1_000)),
483            );
484        }
485        Some(runner.install(&shared_registry))
486    };
487
488    // Tier 4 item 4.4 session B: start a FederationRunner against any
489    // configured peer-cluster MoQ relays. Each runner task opens an
490    // outbound MoQ session, drains the remote origin's announcement
491    // stream, filters by the link's forwarded_broadcasts list, and
492    // re-publishes matched broadcasts into the local relay's origin
493    // producer so every MoQ subscriber on this node sees them as if
494    // they were ingested locally. No-op when the links list is empty.
495    // Feature-gated on `cluster` so single-node builds stay thin.
496    #[cfg(feature = "cluster")]
497    let federation_runner_handle = if config.federation_links.is_empty() {
498        None
499    } else {
500        tracing::info!(links = config.federation_links.len(), "starting federation runner");
501        Some(lvqr_cluster::FederationRunner::start(
502            config.federation_links.clone(),
503            relay.origin().clone(),
504            shutdown.clone(),
505        ))
506    };
507
508    // Optional WHEP surface. Constructed before the bridge is
509    // frozen into an `Arc` so we can attach the `WhepServer` as a
510    // `RawSampleObserver`; both the observer clone and the axum
511    // router clone share the same underlying session registry, so
512    // a POST on the router is immediately visible to the raw-sample
513    // fanout path.
514    let whep_server = if let Some(addr) = config.whep_addr {
515        let str0m_cfg = lvqr_whep::Str0mConfig { host_ip: addr.ip() };
516        // Tier 4 item 4.7 session 110 B: thread the shared
517        // LatencyTracker into the str0m answerer so every spawned
518        // session's poll loop records one sample per successful
519        // `Writer::write` under transport="whep".
520        let answerer_builder = lvqr_whep::Str0mAnswerer::new(str0m_cfg).with_slo_tracker(slo_tracker.clone());
521        // Session 113: when built with the `transcode` meta-feature
522        // (which activates `lvqr-whep/aac-opus`), attach the AAC-to-
523        // Opus encoder factory so RTMP / SRT / RTSP AAC publishers
524        // reach Opus-only WHEP subscribers with audio. The factory
525        // probes GStreamer elements once; missing elements log and
526        // the factory opts out per-session, so a misconfigured host
527        // still serves video to WHEP without panicking.
528        #[cfg(feature = "transcode")]
529        let answerer_builder =
530            answerer_builder.with_aac_to_opus_factory(Arc::new(lvqr_transcode::AacToOpusEncoderFactory::new()));
531        let answerer = Arc::new(answerer_builder) as Arc<dyn lvqr_whep::SdpAnswerer>;
532        let server = lvqr_whep::WhepServer::new(answerer);
533        let observer: lvqr_ingest::SharedRawSampleObserver = Arc::new(server.clone());
534        bridge_builder = bridge_builder.with_raw_sample_observer(observer);
535        Some(server)
536    } else {
537        None
538    };
539
540    // Optional WHIP ingest surface. The bridge side is a sibling
541    // of `RtmpMoqBridge`: it owns its own `BroadcastProducer` state
542    // but publishes fragments onto the same shared registry, so every
543    // existing egress (MoQ, LL-HLS, DASH, disk record, DVR archive)
544    // picks up WHIP publishers with zero additional wiring.
545    let (whip_server, whip_bridge) = if let Some(addr) = config.whip_addr {
546        let mut whip_bridge = lvqr_whip::WhipMoqBridge::new(relay.origin().clone())
547            .with_events(events.clone())
548            .with_registry(shared_registry.clone());
549        if let Some(ref server) = whep_server {
550            let raw_observer: lvqr_ingest::SharedRawSampleObserver = Arc::new(server.clone());
551            whip_bridge = whip_bridge.with_raw_sample_observer(raw_observer);
552        }
553        let whip_bridge_arc = Arc::new(whip_bridge);
554        let sink = whip_bridge_arc.clone() as Arc<dyn lvqr_whip::IngestSampleSink>;
555        let str0m_cfg = lvqr_whip::Str0mIngestConfig { host_ip: addr.ip() };
556        let answerer =
557            Arc::new(lvqr_whip::Str0mIngestAnswerer::new(str0m_cfg, sink)) as Arc<dyn lvqr_whip::SdpAnswerer>;
558        let server = lvqr_whip::WhipServer::with_auth_provider(answerer, auth.clone());
559        (Some(server), Some(whip_bridge_arc))
560    } else {
561        (None, None)
562    };
563
564    // Optional SRT ingest server. Publishes to the shared registry;
565    // every broadcaster-native consumer picks up SRT publishers
566    // automatically.
567    let (srt_server, srt_bound) = if let Some(addr) = config.srt_addr {
568        let mut server =
569            lvqr_srt::SrtIngestServer::with_registry(addr, shared_registry.clone()).with_auth(auth.clone());
570        let bound = server.bind().await?;
571        tracing::info!(addr = %bound, "SRT ingest bound");
572        (Some(server), Some(bound))
573    } else {
574        (None, None)
575    };
576    let srt_events_clone = events.clone();
577    let srt_shutdown_token = shutdown.clone();
578
579    // Optional RTSP ingest server. Publishes to the shared registry
580    // alongside every other ingest protocol. When clustering is
581    // enabled, the owner resolver redirects DESCRIBE / PLAY for
582    // peer-owned broadcasts with RTSP 302.
583    let (rtsp_server, rtsp_bound) = if let Some(addr) = config.rtsp_addr {
584        let mut server = lvqr_rtsp::RtspServer::with_registry(addr, shared_registry.clone()).with_auth(auth.clone());
585        if let Some(r) = rtsp_owner_resolver.clone() {
586            server = server.with_owner_resolver(r);
587        }
588        let bound = server.bind().await?;
589        tracing::info!(addr = %bound, "RTSP ingest bound");
590        (Some(server), Some(bound))
591    } else {
592        (None, None)
593    };
594    let rtsp_events_clone = events.clone();
595    let rtsp_shutdown_token = shutdown.clone();
596
597    let bridge = Arc::new(bridge_builder);
598    let rtmp_config = lvqr_ingest::RtmpConfig {
599        bind_addr: config.rtmp_addr,
600    };
601    let rtmp_server = bridge.create_rtmp_server(rtmp_config);
602    let rtmp_listener = tokio::net::TcpListener::bind(config.rtmp_addr).await?;
603    let rtmp_bound = rtmp_listener.local_addr()?;
604    tracing::info!(addr = %rtmp_bound, "RTMP ingest bound");
605
606    // Admin listener: pre-bind to capture the real port.
607    let admin_listener = tokio::net::TcpListener::bind(config.admin_addr).await?;
608    let admin_bound = admin_listener.local_addr()?;
609    tracing::info!(addr = %admin_bound, "admin HTTP bound");
610
611    // HLS listener: pre-bind so the test harness can read the
612    // ephemeral port back via `ServerHandle::hls_addr` immediately
613    // after `start()` returns.
614    let (hls_listener, hls_bound) = if let Some(addr) = config.hls_addr {
615        let listener = tokio::net::TcpListener::bind(addr).await?;
616        let bound = listener.local_addr()?;
617        tracing::info!(addr = %bound, "LL-HLS HTTP bound");
618        (Some(listener), Some(bound))
619    } else {
620        (None, None)
621    };
622
623    // WHEP listener: pre-bind the same way so test harnesses can
624    // read the ephemeral port back immediately. `whep_server` was
625    // built earlier and is `None` if `config.whep_addr` is `None`.
626    let (whep_listener, whep_bound) = if let Some(addr) = config.whep_addr {
627        let listener = tokio::net::TcpListener::bind(addr).await?;
628        let bound = listener.local_addr()?;
629        tracing::info!(addr = %bound, "WHEP HTTP bound");
630        (Some(listener), Some(bound))
631    } else {
632        (None, None)
633    };
634
635    // DASH listener: pre-bind so ephemeral-port test harnesses can
636    // read the real port back via `ServerHandle::dash_addr`
637    // immediately after `start()` returns.
638    let (dash_listener, dash_bound) = if let Some(addr) = config.dash_addr {
639        let listener = tokio::net::TcpListener::bind(addr).await?;
640        let bound = listener.local_addr()?;
641        tracing::info!(addr = %bound, "MPEG-DASH HTTP bound");
642        (Some(listener), Some(bound))
643    } else {
644        (None, None)
645    };
646
647    // WHIP listener: pre-bind for the same reason. Keeping the
648    // bridge arc alive for the lifetime of the server task is
649    // important: dropping it would tear down every active MoQ
650    // broadcast produced by a WHIP publisher.
651    let (whip_listener, whip_bound) = if let Some(addr) = config.whip_addr {
652        let listener = tokio::net::TcpListener::bind(addr).await?;
653        let bound = listener.local_addr()?;
654        tracing::info!(addr = %bound, "WHIP HTTP bound");
655        (Some(listener), Some(bound))
656    } else {
657        (None, None)
658    };
659
660    // Optional disk recorder.
661    if let Some(ref dir) = config.record_dir {
662        let recorder = lvqr_record::BroadcastRecorder::new(dir);
663        let origin = relay.origin().clone();
664        let event_rx = events.subscribe();
665        let record_shutdown = shutdown.clone();
666        tracing::info!(dir = %dir.display(), "recording enabled");
667        tokio::spawn(async move {
668            spawn_recordings(recorder, origin, event_rx, record_shutdown).await;
669        });
670    }
671
672    // HLS finalization subscriber: when a broadcaster disconnects
673    // the RTMP bridge emits BroadcastStopped, and this task calls
674    // MultiHlsServer::finalize_broadcast so the playlist gains
675    // EXT-X-ENDLIST and the retained window becomes a VOD surface.
676    if let Some(ref hls) = hls_server {
677        let hls_for_finalize = hls.clone();
678        let mut hls_event_rx = events.subscribe();
679        let hls_finalize_shutdown = shutdown.clone();
680        tokio::spawn(async move {
681            loop {
682                tokio::select! {
683                    _ = hls_finalize_shutdown.cancelled() => break,
684                    msg = hls_event_rx.recv() => {
685                        match msg {
686                            Ok(RelayEvent::BroadcastStopped { name }) => {
687                                tracing::info!(broadcast = %name, "finalizing HLS broadcast");
688                                hls_for_finalize.finalize_broadcast(&name).await;
689                            }
690                            Ok(_) => {}
691                            Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
692                                tracing::warn!(missed = n, "HLS finalize subscriber lagged");
693                            }
694                            Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
695                        }
696                    }
697                }
698            }
699        });
700    }
701
702    // DASH finalization subscriber: same pattern as HLS above.
703    // Switches the MPD from type="dynamic" to type="static" so
704    // DASH clients stop polling for new segments.
705    if let Some(ref dash) = dash_server {
706        let dash_for_finalize = dash.clone();
707        let mut dash_event_rx = events.subscribe();
708        let dash_finalize_shutdown = shutdown.clone();
709        tokio::spawn(async move {
710            loop {
711                tokio::select! {
712                    _ = dash_finalize_shutdown.cancelled() => break,
713                    msg = dash_event_rx.recv() => {
714                        match msg {
715                            Ok(RelayEvent::BroadcastStopped { name }) => {
716                                tracing::info!(broadcast = %name, "finalizing DASH broadcast");
717                                dash_for_finalize.finalize_broadcast(&name);
718                            }
719                            Ok(_) => {}
720                            Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
721                                tracing::warn!(missed = n, "DASH finalize subscriber lagged");
722                            }
723                            Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
724                        }
725                    }
726                }
727            }
728        });
729    }
730
731    // Admin HTTP state and router.
732    let metrics_state = relay.metrics().clone();
733    let bridge_for_stats = bridge.clone();
734    let bridge_for_streams = bridge.clone();
735
736    let admin_state = lvqr_admin::AdminState::new(
737        move || {
738            let active = bridge_for_stats.active_stream_count() as u64;
739            lvqr_core::RelayStats {
740                publishers: active,
741                tracks: active * 2,
742                subscribers: metrics_state.connections_active.load(Ordering::Relaxed),
743                bytes_received: 0,
744                bytes_sent: 0,
745                uptime_secs: 0,
746            }
747        },
748        move || {
749            bridge_for_streams
750                .stream_names()
751                .into_iter()
752                .map(|name| lvqr_admin::StreamInfo { name, subscribers: 0 })
753                .collect()
754        },
755    )
756    .with_auth(auth.clone());
757    let admin_state = if let Some(prom) = prom_handle {
758        admin_state.with_metrics(Arc::new(move || prom.render()))
759    } else {
760        admin_state
761    };
762    // Wire the cluster into `/api/v1/cluster/*`. Without this the
763    // feature-gated routes in `lvqr-admin` reply 500 with a
764    // "cluster not wired" message.
765    #[cfg(feature = "cluster")]
766    let admin_state = match cluster.as_ref() {
767        Some(c) => admin_state.with_cluster(c.clone()),
768        None => admin_state,
769    };
770    // Wire the federation status handle into
771    // `/api/v1/cluster/federation`. Tier 4 item 4.4 session C.
772    // When no federation links are configured (handle is None),
773    // the route serves an empty list.
774    #[cfg(feature = "cluster")]
775    let admin_state = match federation_runner_handle.as_ref() {
776        Some(runner) => admin_state.with_federation_status(runner.status_handle()),
777        None => admin_state,
778    };
779    // Tier 4 item 4.7 session A: expose the shared latency tracker
780    // so `GET /api/v1/slo` returns per-(broadcast, transport)
781    // p50 / p95 / p99 / max samples.
782    let admin_state = admin_state.with_slo(slo_tracker.clone());
783
784    // PLAN Phase D session 137: expose the configured WASM filter
785    // chain on `GET /api/v1/wasm-filter` when `--wasm-filter` was
786    // set. The handle is already in scope from the bridge install
787    // earlier in `start()`; the closure reads a snapshot per call
788    // so the route always reflects the current chain_length + per-
789    // broadcast counters. When no filter is configured the default
790    // closure returns `{enabled: false, chain_length: 0,
791    // broadcasts: []}` so dashboards can pre-bake the shape.
792    let admin_state = match wasm_filter_handle.clone() {
793        Some(bridge) => {
794            let slot_counters = wasm_slot_counters.clone();
795            admin_state.with_wasm_filter(move || {
796                let broadcasts = bridge
797                    .tracked()
798                    .into_iter()
799                    .map(|(broadcast, track)| lvqr_admin::WasmFilterBroadcastStats {
800                        seen: bridge.fragments_seen(&broadcast, &track),
801                        kept: bridge.fragments_kept(&broadcast, &track),
802                        dropped: bridge.fragments_dropped(&broadcast, &track),
803                        broadcast,
804                        track,
805                    })
806                    .collect();
807                let slots = slot_counters
808                    .iter()
809                    .enumerate()
810                    .map(|(index, c)| lvqr_admin::WasmFilterSlotStats {
811                        index,
812                        seen: c.seen(),
813                        kept: c.kept(),
814                        dropped: c.dropped(),
815                    })
816                    .collect();
817                lvqr_admin::WasmFilterState {
818                    enabled: true,
819                    chain_length: bridge.chain_length(),
820                    broadcasts,
821                    slots,
822                }
823            })
824        }
825        None => admin_state,
826    };
827
828    // Session 111-B1: hoist `MeshCoordinator` construction out of
829    // the admin-router block so it can be stored on `ServerHandle`
830    // and accessed by integration tests + the session 111-B2
831    // `ws_relay_session` subscriber-registration wiring. `None`
832    // when `mesh_enabled = false`; `Some(Arc::new(..))` otherwise.
833    let mesh_coordinator: Option<Arc<lvqr_mesh::MeshCoordinator>> = if config.mesh_enabled {
834        let default_mesh = lvqr_mesh::MeshConfig::default();
835        let mesh_config = lvqr_mesh::MeshConfig {
836            max_children: config.max_peers,
837            root_peer_count: config.mesh_root_peer_count.unwrap_or(default_mesh.root_peer_count),
838            ..default_mesh
839        };
840        Some(Arc::new(lvqr_mesh::MeshCoordinator::new(mesh_config)))
841    } else {
842        None
843    };
844
845    // WebSocket fMP4 relay + WebSocket ingest state. Built AFTER
846    // `mesh_coordinator` so the mesh field can be wired through
847    // to `ws_relay_session` for session-111-B2 subscriber
848    // registration; when mesh is disabled the field stays `None`
849    // and the relay session behaves exactly as pre-111-B2.
850    let ws_state = WsRelayState {
851        origin: relay.origin().clone(),
852        init_segments: Arc::new(dashmap::DashMap::new()),
853        auth: auth.clone(),
854        events: events.clone(),
855        registry: shared_registry.clone(),
856        slo: Some(slo_tracker.clone()),
857        mesh: mesh_coordinator.clone(),
858    };
859    let ws_router = axum::Router::new()
860        .route("/ws/{*broadcast}", get(ws_relay_handler))
861        .route("/ingest/{*broadcast}", get(ws_ingest_handler))
862        .with_state(ws_state);
863
864    // PLAN v1.1 row 121 + session 128: when `ServeConfig.hmac_playback_secret`
865    // is set, every playback surface accepts `?sig=...&exp=...` as an
866    // alternative auth path that short-circuits the `SharedAuth`
867    // subscribe gate. Wrap the secret in `Arc<[u8]>` so every handler
868    // and middleware clone shares one copy. Hoisted above the
869    // `combined_router` block so the downstream HLS + DASH spawn
870    // blocks can also capture it into their `LivePlaybackAuthState`.
871    let hmac_playback_secret: Option<Arc<[u8]>> = config.hmac_playback_secret.as_ref().map(|s| Arc::from(s.as_bytes()));
872
873    let combined_router = {
874        let admin_router = if let Some(mesh) = mesh_coordinator.clone() {
875            let mesh_for_cb = mesh.clone();
876            relay.set_connection_callback(Arc::new(move |conn_id, connected| {
877                let peer_id = format!("conn-{conn_id}");
878                if connected {
879                    match mesh_for_cb.add_peer(peer_id.clone(), "default".to_string(), None) {
880                        Ok(a) => {
881                            tracing::info!(peer = %peer_id, role = ?a.role, depth = a.depth, "mesh: peer assigned");
882                        }
883                        Err(e) => {
884                            tracing::warn!(peer = %peer_id, error = ?e, "mesh: assign failed");
885                        }
886                    }
887                } else {
888                    let orphans = mesh_for_cb.remove_peer(&peer_id);
889                    for orphan in orphans {
890                        let _ = mesh_for_cb.reassign_peer(&orphan);
891                    }
892                }
893            }));
894
895            let mesh_for_reaper = mesh.clone();
896            let reaper_shutdown = shutdown.clone();
897            tokio::spawn(async move {
898                let mut interval = tokio::time::interval(std::time::Duration::from_secs(5));
899                loop {
900                    tokio::select! {
901                        _ = interval.tick() => {
902                            let dead = mesh_for_reaper.find_dead_peers();
903                            for peer_id in dead {
904                                tracing::info!(peer = %peer_id, "mesh: removing dead peer");
905                                let orphans = mesh_for_reaper.remove_peer(&peer_id);
906                                for orphan in orphans {
907                                    let _ = mesh_for_reaper.reassign_peer(&orphan);
908                                }
909                            }
910                        }
911                        _ = reaper_shutdown.cancelled() => {
912                            tracing::debug!("mesh reaper shutting down");
913                            break;
914                        }
915                    }
916                }
917            });
918
919            let mesh_for_signal = mesh.clone();
920            let mut signal = lvqr_signal::SignalServer::new();
921            // Session 111-B2: the callback is idempotent on
922            // Register. A client that already opened `/ws/*`
923            // (and therefore already holds a `ws-{n}` peer_id
924            // assigned by `ws_relay_session`) can reuse that
925            // peer_id on `/signal` without getting a second
926            // tree entry: the callback looks the peer up first
927            // and returns its existing assignment. Clients
928            // that open `/signal` without first opening `/ws`
929            // fall through to the pre-111-B2 path of
930            // `add_peer` + fresh assignment.
931            // Session 143: capture the operator-configured ICE-server
932            // list once. Every AssignParent emitted by this callback
933            // includes a clone of the snapshot. Empty when
934            // `--mesh-ice-servers` was not set; clients then fall
935            // back to their constructor-provided list.
936            let ice_servers_for_signal = config.mesh_ice_servers.clone();
937            // Session 144: clamp the client's self-reported capacity to
938            // the operator's configured global ceiling at register time
939            // so on-disk PeerInfo.capacity never exceeds it.
940            // `effective_capacity` clamps again as a defense in depth.
941            let global_max_children = config.max_peers as u32;
942            signal.set_peer_callback(Arc::new(move |event: &lvqr_signal::PeerEvent<'_>| {
943                let peer_id = event.peer_id;
944                let track = event.track;
945                if event.connected {
946                    if let Some(existing) = mesh_for_signal.get_peer(peer_id) {
947                        tracing::debug!(
948                            peer = %peer_id,
949                            role = ?existing.role,
950                            depth = existing.depth,
951                            "mesh: signal reusing existing peer entry from WS relay"
952                        );
953                        return Some(lvqr_signal::SignalMessage::AssignParent {
954                            peer_id: peer_id.to_string(),
955                            role: format!("{:?}", existing.role),
956                            parent_id: existing.parent.clone(),
957                            depth: existing.depth,
958                            ice_servers: ice_servers_for_signal.clone(),
959                        });
960                    }
961                    let clamped_capacity = event.capacity.map(|c| c.min(global_max_children));
962                    match mesh_for_signal.add_peer(peer_id.to_string(), track.to_string(), clamped_capacity) {
963                        Ok(a) => {
964                            tracing::info!(
965                                peer = %peer_id,
966                                role = ?a.role,
967                                depth = a.depth,
968                                capacity = ?clamped_capacity,
969                                "mesh: signal peer assigned"
970                            );
971                            Some(lvqr_signal::SignalMessage::AssignParent {
972                                peer_id: peer_id.to_string(),
973                                role: format!("{:?}", a.role),
974                                parent_id: a.parent,
975                                depth: a.depth,
976                                ice_servers: ice_servers_for_signal.clone(),
977                            })
978                        }
979                        Err(e) => {
980                            tracing::warn!(peer = %peer_id, error = ?e, "mesh: signal assign failed");
981                            None
982                        }
983                    }
984                } else {
985                    let orphans = mesh_for_signal.remove_peer(peer_id);
986                    for orphan in orphans {
987                        let _ = mesh_for_signal.reassign_peer(&orphan);
988                    }
989                    None
990                }
991            }));
992
993            // Session 141: bridge ForwardReport signal messages into
994            // the mesh coordinator so the admin route can expose
995            // actual-vs-intended offload. The callback is invoked with
996            // the peer_id resolved from the WS session state, not from
997            // a wire field, so a peer can only report for itself.
998            let mesh_for_report = mesh.clone();
999            signal.set_forward_report_callback(Arc::new(move |peer_id, forwarded_frames| {
1000                mesh_for_report.record_forward_report(peer_id, forwarded_frames);
1001            }));
1002
1003            let mesh_for_admin = mesh.clone();
1004            let admin_with_mesh = admin_state.with_mesh(move || {
1005                // Session 141: per-peer stats derived from the tree
1006                // snapshot. `intended_children` is the topology
1007                // planner's assignment; `forwarded_frames` is the
1008                // cumulative value from the most recent ForwardReport.
1009                let peers = mesh_for_admin
1010                    .tree_snapshot()
1011                    .into_iter()
1012                    .map(|p| lvqr_admin::MeshPeerStats {
1013                        peer_id: p.id.clone(),
1014                        role: format!("{:?}", p.role),
1015                        parent: p.parent.clone(),
1016                        depth: p.depth,
1017                        intended_children: p.children.len(),
1018                        forwarded_frames: p.forwarded_frames,
1019                        capacity: p.capacity,
1020                    })
1021                    .collect();
1022                lvqr_admin::MeshState {
1023                    enabled: true,
1024                    peer_count: mesh_for_admin.peer_count(),
1025                    offload_percentage: mesh_for_admin.offload_percentage(),
1026                    peers,
1027                }
1028            });
1029
1030            tracing::info!(
1031                max_children = config.max_peers,
1032                auth_gate = !config.no_auth_signal,
1033                "peer mesh enabled (/signal endpoint active)"
1034            );
1035
1036            // Session 111-B1: gate /signal with SubscribeAuth
1037            // unless `--no-auth-signal` was set. `?token=<token>`
1038            // query parameter carries the bearer; Sec-WebSocket-
1039            // Protocol support is deferred to 111-B2 pending a
1040            // subprotocol-echo upstream in `lvqr-signal`.
1041            let mut signal_router = signal.router();
1042            if !config.no_auth_signal {
1043                signal_router = signal_router.layer(from_fn_with_state(
1044                    SignalAuthState { auth: auth.clone() },
1045                    signal_auth_middleware,
1046                ));
1047            }
1048
1049            let router = lvqr_admin::build_router(admin_with_mesh);
1050            router.merge(signal_router)
1051        } else {
1052            lvqr_admin::build_router(admin_state)
1053        };
1054
1055        let combined = admin_router.merge(ws_router);
1056        let combined = if let Some((ref dir, ref index)) = archive_index {
1057            combined.merge(playback_router(
1058                dir.clone(),
1059                Arc::clone(index),
1060                auth.clone(),
1061                hmac_playback_secret.clone(),
1062            ))
1063        } else {
1064            combined
1065        };
1066        // Tier 4 item 4.3 session B3: feature-gated `/playback/verify/
1067        // {broadcast}` admin route. Mounted only when the `c2pa`
1068        // feature is on AND an archive directory is configured (the
1069        // verify route reads `<archive>/<broadcast>/<track>/
1070        // finalized.*` off disk, so an archive is a hard prerequisite).
1071        #[cfg(feature = "c2pa")]
1072        let combined = if let Some((ref dir, _)) = archive_index {
1073            combined.merge(verify_router(dir.clone(), auth.clone()))
1074        } else {
1075            combined
1076        };
1077        combined
1078    }
1079    .layer(CorsLayer::permissive());
1080
1081    // Spawn a single background task that joins relay + RTMP + admin and
1082    // signals the shared shutdown token if any subsystem exits early.
1083    let relay_shutdown = shutdown.clone();
1084    let rtmp_shutdown = shutdown.clone();
1085    let admin_shutdown = shutdown.clone();
1086    let hls_shutdown = shutdown.clone();
1087    let dash_shutdown = shutdown.clone();
1088    let whep_shutdown = shutdown.clone();
1089    let whip_shutdown = shutdown.clone();
1090    let bg_shutdown_for_task = shutdown.clone();
1091    let hls_router_pair =
1092        hls_listener.map(|listener| (listener, hls_server.expect("hls_server set when listener is set")));
1093    let dash_router_pair =
1094        dash_listener.map(|listener| (listener, dash_server.expect("dash_server set when listener is set")));
1095    let whep_router_pair =
1096        whep_listener.map(|listener| (listener, whep_server.expect("whep_server set when listener is set")));
1097    let whip_router_pair =
1098        whip_listener.map(|listener| (listener, whip_server.expect("whip_server set when listener is set")));
1099    // Moved into the spawned task below so it lives as long as
1100    // the WHIP poll loops; see `drop(_whip_bridge_keepalive)` at
1101    // the end of the join block.
1102    let whip_bridge_keepalive = whip_bridge;
1103
1104    // Clone the relay's OriginProducer for the ServerHandle. `relay`
1105    // itself moves into the accept-loop below, so the clone is how
1106    // callers (federation tests, admin consumers) reach the origin
1107    // for the server's lifetime.
1108    let relay_origin = relay.origin().clone();
1109
1110    let join = tokio::spawn(async move {
1111        let shutdown_on_exit_relay = bg_shutdown_for_task.clone();
1112        let relay_fut = async move {
1113            if let Err(e) = relay.accept_loop(&mut moq_server, relay_shutdown).await {
1114                tracing::error!(error = %e, "relay server error");
1115            }
1116            shutdown_on_exit_relay.cancel();
1117        };
1118
1119        let shutdown_on_exit_rtmp = bg_shutdown_for_task.clone();
1120        let rtmp_server_task = rtmp_server;
1121        let rtmp_fut = async move {
1122            if let Err(e) = rtmp_server_task.run_with_listener(rtmp_listener, rtmp_shutdown).await {
1123                tracing::error!(error = %e, "RTMP server error");
1124            }
1125            shutdown_on_exit_rtmp.cancel();
1126        };
1127
1128        let shutdown_on_exit_admin = bg_shutdown_for_task.clone();
1129        let admin_fut = async move {
1130            let result = axum::serve(admin_listener, combined_router)
1131                .with_graceful_shutdown(async move { admin_shutdown.cancelled().await })
1132                .await;
1133            if let Err(e) = &result {
1134                tracing::error!(error = %e, "admin server error");
1135            }
1136            shutdown_on_exit_admin.cancel();
1137        };
1138
1139        let shutdown_on_exit_hls = bg_shutdown_for_task.clone();
1140        let hls_auth = auth.clone();
1141        let hls_auth_disabled = config.no_auth_live_playback;
1142        // PLAN v1.1 session 128: share the same HMAC secret with the
1143        // live HLS + DASH auth middleware so one --hmac-playback-secret
1144        // configuration mints signed URLs across all three route trees.
1145        let hls_hmac_secret = hmac_playback_secret.clone();
1146        let hls_fut = async move {
1147            let Some((listener, server)) = hls_router_pair else {
1148                return;
1149            };
1150            // Session 112: apply the subscribe-auth gate to live
1151            // HLS routes. Noop provider deployments see no
1152            // behavior change (provider always allows). Configured
1153            // deployments (static token, JWT) get an automatic
1154            // 401 on requests without a valid bearer. Escape hatch
1155            // is `--no-auth-live-playback` for deployments that
1156            // deliberately want open live playback.
1157            let mut router = server.router();
1158            if !hls_auth_disabled {
1159                let state = LivePlaybackAuthState {
1160                    auth: hls_auth,
1161                    entry: "hls_live",
1162                    scheme: crate::signed_url::LiveScheme::Hls,
1163                    hmac_secret: hls_hmac_secret,
1164                };
1165                router = router.layer(from_fn_with_state(state, live_playback_auth_middleware));
1166            }
1167            let router = router.layer(CorsLayer::permissive());
1168            let result = axum::serve(listener, router)
1169                .with_graceful_shutdown(async move { hls_shutdown.cancelled().await })
1170                .await;
1171            if let Err(e) = &result {
1172                tracing::error!(error = %e, "HLS server error");
1173            }
1174            shutdown_on_exit_hls.cancel();
1175        };
1176
1177        let shutdown_on_exit_dash = bg_shutdown_for_task.clone();
1178        let dash_auth = auth.clone();
1179        let dash_auth_disabled = config.no_auth_live_playback;
1180        let dash_hmac_secret = hmac_playback_secret.clone();
1181        let dash_fut = async move {
1182            let Some((listener, server)) = dash_router_pair else {
1183                return;
1184            };
1185            let mut router = server.router();
1186            if !dash_auth_disabled {
1187                let state = LivePlaybackAuthState {
1188                    auth: dash_auth,
1189                    entry: "dash_live",
1190                    scheme: crate::signed_url::LiveScheme::Dash,
1191                    hmac_secret: dash_hmac_secret,
1192                };
1193                router = router.layer(from_fn_with_state(state, live_playback_auth_middleware));
1194            }
1195            let router = router.layer(CorsLayer::permissive());
1196            let result = axum::serve(listener, router)
1197                .with_graceful_shutdown(async move { dash_shutdown.cancelled().await })
1198                .await;
1199            if let Err(e) = &result {
1200                tracing::error!(error = %e, "DASH server error");
1201            }
1202            shutdown_on_exit_dash.cancel();
1203        };
1204
1205        let shutdown_on_exit_whep = bg_shutdown_for_task.clone();
1206        let whep_fut = async move {
1207            let Some((listener, server)) = whep_router_pair else {
1208                return;
1209            };
1210            let router = lvqr_whep::router_for(server);
1211            let result = axum::serve(listener, router)
1212                .with_graceful_shutdown(async move { whep_shutdown.cancelled().await })
1213                .await;
1214            if let Err(e) = &result {
1215                tracing::error!(error = %e, "WHEP server error");
1216            }
1217            shutdown_on_exit_whep.cancel();
1218        };
1219
1220        let shutdown_on_exit_whip = bg_shutdown_for_task.clone();
1221        let whip_fut = async move {
1222            let Some((listener, server)) = whip_router_pair else {
1223                return;
1224            };
1225            let router = lvqr_whip::router_for(server);
1226            let result = axum::serve(listener, router)
1227                .with_graceful_shutdown(async move { whip_shutdown.cancelled().await })
1228                .await;
1229            if let Err(e) = &result {
1230                tracing::error!(error = %e, "WHIP server error");
1231            }
1232            shutdown_on_exit_whip.cancel();
1233        };
1234
1235        let srt_shutdown = bg_shutdown_for_task.clone();
1236        let srt_events = srt_events_clone;
1237        let srt_cancel = srt_shutdown_token;
1238        let srt_fut = async move {
1239            let Some(server) = srt_server else { return };
1240            if let Err(e) = server.run(srt_events, srt_cancel).await {
1241                tracing::error!(error = %e, "SRT server error");
1242            }
1243            srt_shutdown.cancel();
1244        };
1245
1246        let rtsp_shutdown = bg_shutdown_for_task.clone();
1247        let rtsp_events = rtsp_events_clone;
1248        let rtsp_cancel = rtsp_shutdown_token;
1249        let rtsp_fut = async move {
1250            let Some(server) = rtsp_server else { return };
1251            if let Err(e) = server.run(rtsp_events, rtsp_cancel).await {
1252                tracing::error!(error = %e, "RTSP server error");
1253            }
1254            rtsp_shutdown.cancel();
1255        };
1256
1257        let _ = tokio::join!(
1258            relay_fut, rtmp_fut, admin_fut, hls_fut, dash_fut, whep_fut, whip_fut, srt_fut, rtsp_fut
1259        );
1260        drop(whip_bridge_keepalive);
1261        tracing::info!("shutdown complete");
1262    });
1263
1264    Ok(ServerHandle {
1265        relay_addr: relay_bound,
1266        rtmp_addr: rtmp_bound,
1267        admin_addr: admin_bound,
1268        hls_addr: hls_bound,
1269        whep_addr: whep_bound,
1270        whip_addr: whip_bound,
1271        dash_addr: dash_bound,
1272        rtsp_addr: rtsp_bound,
1273        srt_addr: srt_bound,
1274        shutdown,
1275        join: Some(join),
1276        #[cfg(feature = "cluster")]
1277        cluster,
1278        wasm_filter: wasm_filter_handle,
1279        _wasm_reloaders: wasm_reloader_handles,
1280        #[cfg(feature = "whisper")]
1281        agent_runner: agent_runner_handle,
1282        #[cfg(feature = "transcode")]
1283        transcode_runner: transcode_runner_handle,
1284        slo: slo_tracker,
1285        mesh_coordinator,
1286        fragment_registry: shared_registry,
1287        origin: relay_origin,
1288        #[cfg(feature = "cluster")]
1289        federation_runner: federation_runner_handle,
1290    })
1291}
1292
1293// Auth middleware extracted to `crate::auth_middleware`.
1294// WS relay + ingest + recorder event bridge extracted to `crate::ws`.