1mod archive;
15mod auth_middleware;
16mod captions;
17#[cfg(feature = "cluster")]
18pub mod cluster_claim;
19mod config;
20mod handle;
21mod hls;
22mod signed_url;
23mod ws;
24
25pub use archive::sign_playback_url;
26pub use config::ServeConfig;
27#[cfg(feature = "transcode")]
28pub use config::{parse_one_transcode_rendition, parse_transcode_renditions};
29pub use handle::ServerHandle;
30pub use lvqr_admin::{LatencyTracker, SloEntry};
34pub use signed_url::{LiveScheme, sign_live_url};
35
36use anyhow::Result;
37use axum::middleware::from_fn_with_state;
38use axum::routing::get;
39use lvqr_auth::{NoopAuthProvider, SharedAuth};
40use lvqr_core::{EventBus, RelayEvent};
41use lvqr_dash::{BroadcasterDashBridge, DashConfig};
42use lvqr_fragment::FragmentBroadcasterRegistry;
43use lvqr_hls::{MultiHlsServer, PlaylistBuilderConfig};
44use std::sync::Arc;
45use std::sync::atomic::Ordering;
46use tokio_util::sync::CancellationToken;
47use tower_http::cors::CorsLayer;
48
49#[cfg(feature = "c2pa")]
50use crate::archive::verify_router;
51use crate::archive::{BroadcasterArchiveIndexer, playback_router};
52use crate::auth_middleware::{
53 LivePlaybackAuthState, SignalAuthState, live_playback_auth_middleware, signal_auth_middleware,
54};
55use crate::hls::BroadcasterHlsBridge;
56use crate::ws::{WsRelayState, spawn_recordings, ws_ingest_handler, ws_relay_handler};
57
58pub async fn start(config: ServeConfig) -> Result<ServerHandle> {
66 tracing::info!(
67 relay = %config.relay_addr,
68 rtmp = %config.rtmp_addr,
69 admin = %config.admin_addr,
70 mesh = config.mesh_enabled,
71 "starting LVQR server"
72 );
73
74 let prom_handle = match (config.install_prometheus, config.otel_metrics_recorder.clone()) {
87 (true, Some(otel_recorder)) => {
88 let prom_recorder = metrics_exporter_prometheus::PrometheusBuilder::new().build_recorder();
89 let handle = prom_recorder.handle();
90 let fanout = metrics_util::layers::FanoutBuilder::default()
91 .add_recorder(prom_recorder)
92 .add_recorder(otel_recorder)
93 .build();
94 metrics::set_global_recorder(fanout)
95 .map_err(|e| anyhow::anyhow!("failed to install metrics fanout recorder: {e}"))?;
96 Some(handle)
97 }
98 (true, None) => Some(
99 metrics_exporter_prometheus::PrometheusBuilder::new()
100 .install_recorder()
101 .map_err(|e| anyhow::anyhow!("failed to install Prometheus recorder: {e}"))?,
102 ),
103 (false, Some(otel_recorder)) => {
104 metrics::set_global_recorder(otel_recorder)
105 .map_err(|e| anyhow::anyhow!("failed to install OTLP metrics recorder: {e}"))?;
106 None
107 }
108 (false, None) => None,
109 };
110
111 let shutdown = CancellationToken::new();
112
113 let auth: SharedAuth = config
115 .auth
116 .clone()
117 .unwrap_or_else(|| Arc::new(NoopAuthProvider) as SharedAuth);
118
119 let events = EventBus::default();
122
123 let relay_config = lvqr_relay::RelayConfig::new(config.relay_addr);
126 let mut relay = lvqr_relay::RelayServer::new(relay_config);
127 relay.set_auth_provider(auth.clone());
128 let (mut moq_server, relay_bound) = relay.init_server()?;
129 tracing::info!(addr = %relay_bound, "MoQ relay bound");
130
131 #[cfg(feature = "cluster")]
135 let cluster = if let Some(listen) = config.cluster_listen {
136 let ccfg = lvqr_cluster::ClusterConfig {
137 listen,
138 seeds: config.cluster_seeds.clone(),
139 node_id: config.cluster_node_id.clone().map(lvqr_cluster::NodeId::new),
140 cluster_id: config
141 .cluster_id
142 .clone()
143 .unwrap_or_else(|| lvqr_cluster::ClusterConfig::default().cluster_id),
144 ..lvqr_cluster::ClusterConfig::default()
145 };
146 let c = lvqr_cluster::Cluster::bootstrap(ccfg)
147 .await
148 .map_err(|e| anyhow::anyhow!("cluster bootstrap failed: {e}"))?;
149 let c = std::sync::Arc::new(c);
150 if config.cluster_advertise_hls.is_some()
151 || config.cluster_advertise_dash.is_some()
152 || config.cluster_advertise_rtsp.is_some()
153 {
154 let endpoints = lvqr_cluster::NodeEndpoints {
155 hls: config.cluster_advertise_hls.clone(),
156 dash: config.cluster_advertise_dash.clone(),
157 rtsp: config.cluster_advertise_rtsp.clone(),
158 };
159 c.set_endpoints(&endpoints)
160 .await
161 .map_err(|e| anyhow::anyhow!("cluster set_endpoints failed: {e}"))?;
162 }
163 tracing::info!(
164 node = %c.self_id(),
165 %listen,
166 advertise_hls = ?config.cluster_advertise_hls,
167 advertise_dash = ?config.cluster_advertise_dash,
168 advertise_rtsp = ?config.cluster_advertise_rtsp,
169 "cluster enabled"
170 );
171 Some(c)
172 } else {
173 None
174 };
175 #[cfg(feature = "cluster")]
176 let hls_owner_resolver: Option<lvqr_hls::OwnerResolver> = cluster.as_ref().map(|c| {
177 let c = c.clone();
178 let resolver: lvqr_hls::OwnerResolver = std::sync::Arc::new(move |broadcast: String| {
179 let c = c.clone();
180 Box::pin(async move {
181 let (_, endpoints) = c.find_owner_endpoints(&broadcast).await?;
182 endpoints.hls
183 })
184 });
185 resolver
186 });
187 #[cfg(not(feature = "cluster"))]
188 let hls_owner_resolver: Option<lvqr_hls::OwnerResolver> = None;
189 #[cfg(feature = "cluster")]
190 let dash_owner_resolver: Option<lvqr_dash::OwnerResolver> = cluster.as_ref().map(|c| {
191 let c = c.clone();
192 let resolver: lvqr_dash::OwnerResolver = std::sync::Arc::new(move |broadcast: String| {
193 let c = c.clone();
194 Box::pin(async move {
195 let (_, endpoints) = c.find_owner_endpoints(&broadcast).await?;
196 endpoints.dash
197 })
198 });
199 resolver
200 });
201 #[cfg(not(feature = "cluster"))]
202 let dash_owner_resolver: Option<lvqr_dash::OwnerResolver> = None;
203 #[cfg(feature = "cluster")]
204 let rtsp_owner_resolver: Option<lvqr_rtsp::OwnerResolver> = cluster.as_ref().map(|c| {
205 let c = c.clone();
206 let resolver: lvqr_rtsp::OwnerResolver = std::sync::Arc::new(move |broadcast: String| {
207 let c = c.clone();
208 Box::pin(async move {
209 let (_, endpoints) = c.find_owner_endpoints(&broadcast).await?;
210 endpoints.rtsp
211 })
212 });
213 resolver
214 });
215 #[cfg(not(feature = "cluster"))]
216 let rtsp_owner_resolver: Option<lvqr_rtsp::OwnerResolver> = None;
217
218 let target_dur = config.hls_target_duration_secs;
229 let part_target_secs = config.hls_part_target_ms as f32 / 1000.0;
230 let max_segments = if config.hls_dvr_window_secs == 0 || target_dur == 0 {
231 None
232 } else {
233 Some((config.hls_dvr_window_secs / target_dur) as usize)
234 };
235 let hls_server = config.hls_addr.map(|_| {
236 let playlist_cfg = PlaylistBuilderConfig {
237 target_duration_secs: target_dur,
238 part_target_secs,
239 max_segments,
240 ..PlaylistBuilderConfig::default()
241 };
242 match hls_owner_resolver.clone() {
243 Some(r) => MultiHlsServer::with_owner_resolver(playlist_cfg, r),
244 None => MultiHlsServer::new(playlist_cfg),
245 }
246 });
247
248 let dash_server = config.dash_addr.map(|_| match dash_owner_resolver.clone() {
255 Some(r) => lvqr_dash::MultiDashServer::with_owner_resolver(DashConfig::default(), r),
256 None => lvqr_dash::MultiDashServer::new(DashConfig::default()),
257 });
258
259 let shared_registry = FragmentBroadcasterRegistry::new();
265
266 let slo_tracker = lvqr_admin::LatencyTracker::new();
273
274 #[cfg(feature = "cluster")]
281 if let Some(ref c) = cluster {
282 cluster_claim::install_cluster_claim_bridge(c.clone(), cluster_claim::DEFAULT_CLAIM_LEASE, &shared_registry);
283 }
284
285 let (wasm_filter_handle, wasm_reloader_handles, wasm_slot_counters) = if config.wasm_filter.is_empty() {
297 (None, Vec::new(), Vec::new())
298 } else {
299 let mut shareds: Vec<lvqr_wasm::SharedFilter> = Vec::with_capacity(config.wasm_filter.len());
300 let mut reloaders: Vec<lvqr_wasm::WasmFilterReloader> = Vec::with_capacity(config.wasm_filter.len());
301 for path in &config.wasm_filter {
302 let filter = lvqr_wasm::WasmFilter::load(path)
303 .map_err(|e| anyhow::anyhow!("WASM filter load at {} failed: {e}", path.display()))?;
304 tracing::info!(path = %path.display(), "WASM fragment filter loaded");
305 let shared = lvqr_wasm::SharedFilter::new(filter);
306 let reloader = lvqr_wasm::WasmFilterReloader::spawn(path, shared.clone())
307 .map_err(|e| anyhow::anyhow!("WASM filter hot-reload watcher at {} failed: {e}", path.display()))?;
308 shareds.push(shared);
309 reloaders.push(reloader);
310 }
311 let chain = lvqr_wasm::ChainFilter::new(shareds);
312 let chain_len = chain.len();
313 let slot_counters = chain.slot_counters();
320 tracing::info!(chain_len, "WASM fragment filter chain installed");
321 let chain_shared = lvqr_wasm::SharedFilter::new(chain);
322 let bridge = lvqr_wasm::install_wasm_filter_bridge(&shared_registry, chain_shared, chain_len);
323 (Some(bridge), reloaders, slot_counters)
324 };
325
326 let mut bridge_builder = lvqr_ingest::RtmpMoqBridge::with_auth(relay.origin().clone(), auth.clone())
329 .with_events(events.clone())
330 .with_registry(shared_registry.clone());
331
332 let archive_index = if let Some(ref dir) = config.archive_dir {
338 std::fs::create_dir_all(dir)
339 .map_err(|e| anyhow::anyhow!("archive: failed to create {}: {e}", dir.display()))?;
340 let db_path = dir.join("archive.redb");
341 let index = lvqr_archive::RedbSegmentIndex::open(&db_path)
342 .map_err(|e| anyhow::anyhow!("archive: failed to open {}: {e}", db_path.display()))?;
343 tracing::info!(dir = %dir.display(), "DVR archive index enabled");
344 Some((dir.clone(), Arc::new(index)))
345 } else {
346 None
347 };
348
349 if let Some((ref dir, ref index)) = archive_index {
353 #[cfg(feature = "c2pa")]
354 BroadcasterArchiveIndexer::install(dir.clone(), Arc::clone(index), &shared_registry, config.c2pa.clone());
355 #[cfg(not(feature = "c2pa"))]
356 BroadcasterArchiveIndexer::install(dir.clone(), Arc::clone(index), &shared_registry);
357 }
358
359 if let Some(ref hls) = hls_server {
367 BroadcasterHlsBridge::install(
368 hls.clone(),
369 config.hls_target_duration_secs * 1000,
370 config.hls_part_target_ms,
371 &shared_registry,
372 Some(slo_tracker.clone()),
373 );
374 captions::BroadcasterCaptionsBridge::install(hls.clone(), &shared_registry);
380 }
381
382 #[cfg(feature = "whisper")]
393 let agent_runner_handle = if let Some(ref path) = config.whisper_model {
394 if hls_server.is_none() {
395 tracing::warn!(
403 path = %path.display(),
404 "whisper captions agent enabled without HLS surface; browser clients will not receive captions"
405 );
406 }
407 let factory =
408 lvqr_agent_whisper::WhisperCaptionsFactory::new(lvqr_agent_whisper::WhisperConfig::new(path.clone()))
409 .with_caption_registry(shared_registry.clone());
410 tracing::info!(path = %path.display(), "whisper captions agent enabled");
411 Some(
412 lvqr_agent::AgentRunner::new()
413 .with_factory(factory)
414 .install(&shared_registry),
415 )
416 } else {
417 None
418 };
419
420 if let Some(ref dash) = dash_server {
426 BroadcasterDashBridge::install(dash.clone(), &shared_registry, Some(slo_tracker.clone()));
427 }
428
429 #[cfg(feature = "transcode")]
440 let transcode_runner_handle = if config.transcode_renditions.is_empty() {
441 None
442 } else {
443 let mut runner = lvqr_transcode::TranscodeRunner::new();
444 let skip_suffixes: Vec<String> = config.transcode_renditions.iter().map(|r| r.name.clone()).collect();
445 for spec in &config.transcode_renditions {
446 let video_factory = lvqr_transcode::SoftwareTranscoderFactory::new(spec.clone(), shared_registry.clone())
447 .skip_source_suffixes(skip_suffixes.clone());
448 let audio_factory =
449 lvqr_transcode::AudioPassthroughTranscoderFactory::new(spec.clone(), shared_registry.clone())
450 .skip_source_suffixes(skip_suffixes.clone());
451 runner = runner.with_factory(video_factory).with_factory(audio_factory);
452 }
453 tracing::info!(
454 renditions = ?config
455 .transcode_renditions
456 .iter()
457 .map(|r| r.name.clone())
458 .collect::<Vec<_>>(),
459 "transcode ladder enabled",
460 );
461 if let Some(ref hls) = hls_server {
463 let meta: Vec<lvqr_hls::RenditionMeta> = config
464 .transcode_renditions
465 .iter()
466 .map(|r| lvqr_hls::RenditionMeta {
467 name: r.name.clone(),
468 bandwidth_bps: lvqr_hls::RenditionMeta::bandwidth_bps_with_overhead(
469 r.video_bitrate_kbps + r.audio_bitrate_kbps,
470 ),
471 resolution: Some((r.width, r.height)),
472 codecs: "avc1.640028,mp4a.40.2".into(),
476 })
477 .collect();
478 hls.set_ladder(meta);
479 hls.set_source_bandwidth_bps(
480 config
481 .source_bandwidth_kbps
482 .map(|kbps| (kbps as u64).saturating_mul(1_000)),
483 );
484 }
485 Some(runner.install(&shared_registry))
486 };
487
488 #[cfg(feature = "cluster")]
497 let federation_runner_handle = if config.federation_links.is_empty() {
498 None
499 } else {
500 tracing::info!(links = config.federation_links.len(), "starting federation runner");
501 Some(lvqr_cluster::FederationRunner::start(
502 config.federation_links.clone(),
503 relay.origin().clone(),
504 shutdown.clone(),
505 ))
506 };
507
508 let whep_server = if let Some(addr) = config.whep_addr {
515 let str0m_cfg = lvqr_whep::Str0mConfig { host_ip: addr.ip() };
516 let answerer_builder = lvqr_whep::Str0mAnswerer::new(str0m_cfg).with_slo_tracker(slo_tracker.clone());
521 #[cfg(feature = "transcode")]
529 let answerer_builder =
530 answerer_builder.with_aac_to_opus_factory(Arc::new(lvqr_transcode::AacToOpusEncoderFactory::new()));
531 let answerer = Arc::new(answerer_builder) as Arc<dyn lvqr_whep::SdpAnswerer>;
532 let server = lvqr_whep::WhepServer::new(answerer);
533 let observer: lvqr_ingest::SharedRawSampleObserver = Arc::new(server.clone());
534 bridge_builder = bridge_builder.with_raw_sample_observer(observer);
535 Some(server)
536 } else {
537 None
538 };
539
540 let (whip_server, whip_bridge) = if let Some(addr) = config.whip_addr {
546 let mut whip_bridge = lvqr_whip::WhipMoqBridge::new(relay.origin().clone())
547 .with_events(events.clone())
548 .with_registry(shared_registry.clone());
549 if let Some(ref server) = whep_server {
550 let raw_observer: lvqr_ingest::SharedRawSampleObserver = Arc::new(server.clone());
551 whip_bridge = whip_bridge.with_raw_sample_observer(raw_observer);
552 }
553 let whip_bridge_arc = Arc::new(whip_bridge);
554 let sink = whip_bridge_arc.clone() as Arc<dyn lvqr_whip::IngestSampleSink>;
555 let str0m_cfg = lvqr_whip::Str0mIngestConfig { host_ip: addr.ip() };
556 let answerer =
557 Arc::new(lvqr_whip::Str0mIngestAnswerer::new(str0m_cfg, sink)) as Arc<dyn lvqr_whip::SdpAnswerer>;
558 let server = lvqr_whip::WhipServer::with_auth_provider(answerer, auth.clone());
559 (Some(server), Some(whip_bridge_arc))
560 } else {
561 (None, None)
562 };
563
564 let (srt_server, srt_bound) = if let Some(addr) = config.srt_addr {
568 let mut server =
569 lvqr_srt::SrtIngestServer::with_registry(addr, shared_registry.clone()).with_auth(auth.clone());
570 let bound = server.bind().await?;
571 tracing::info!(addr = %bound, "SRT ingest bound");
572 (Some(server), Some(bound))
573 } else {
574 (None, None)
575 };
576 let srt_events_clone = events.clone();
577 let srt_shutdown_token = shutdown.clone();
578
579 let (rtsp_server, rtsp_bound) = if let Some(addr) = config.rtsp_addr {
584 let mut server = lvqr_rtsp::RtspServer::with_registry(addr, shared_registry.clone()).with_auth(auth.clone());
585 if let Some(r) = rtsp_owner_resolver.clone() {
586 server = server.with_owner_resolver(r);
587 }
588 let bound = server.bind().await?;
589 tracing::info!(addr = %bound, "RTSP ingest bound");
590 (Some(server), Some(bound))
591 } else {
592 (None, None)
593 };
594 let rtsp_events_clone = events.clone();
595 let rtsp_shutdown_token = shutdown.clone();
596
597 let bridge = Arc::new(bridge_builder);
598 let rtmp_config = lvqr_ingest::RtmpConfig {
599 bind_addr: config.rtmp_addr,
600 };
601 let rtmp_server = bridge.create_rtmp_server(rtmp_config);
602 let rtmp_listener = tokio::net::TcpListener::bind(config.rtmp_addr).await?;
603 let rtmp_bound = rtmp_listener.local_addr()?;
604 tracing::info!(addr = %rtmp_bound, "RTMP ingest bound");
605
606 let admin_listener = tokio::net::TcpListener::bind(config.admin_addr).await?;
608 let admin_bound = admin_listener.local_addr()?;
609 tracing::info!(addr = %admin_bound, "admin HTTP bound");
610
611 let (hls_listener, hls_bound) = if let Some(addr) = config.hls_addr {
615 let listener = tokio::net::TcpListener::bind(addr).await?;
616 let bound = listener.local_addr()?;
617 tracing::info!(addr = %bound, "LL-HLS HTTP bound");
618 (Some(listener), Some(bound))
619 } else {
620 (None, None)
621 };
622
623 let (whep_listener, whep_bound) = if let Some(addr) = config.whep_addr {
627 let listener = tokio::net::TcpListener::bind(addr).await?;
628 let bound = listener.local_addr()?;
629 tracing::info!(addr = %bound, "WHEP HTTP bound");
630 (Some(listener), Some(bound))
631 } else {
632 (None, None)
633 };
634
635 let (dash_listener, dash_bound) = if let Some(addr) = config.dash_addr {
639 let listener = tokio::net::TcpListener::bind(addr).await?;
640 let bound = listener.local_addr()?;
641 tracing::info!(addr = %bound, "MPEG-DASH HTTP bound");
642 (Some(listener), Some(bound))
643 } else {
644 (None, None)
645 };
646
647 let (whip_listener, whip_bound) = if let Some(addr) = config.whip_addr {
652 let listener = tokio::net::TcpListener::bind(addr).await?;
653 let bound = listener.local_addr()?;
654 tracing::info!(addr = %bound, "WHIP HTTP bound");
655 (Some(listener), Some(bound))
656 } else {
657 (None, None)
658 };
659
660 if let Some(ref dir) = config.record_dir {
662 let recorder = lvqr_record::BroadcastRecorder::new(dir);
663 let origin = relay.origin().clone();
664 let event_rx = events.subscribe();
665 let record_shutdown = shutdown.clone();
666 tracing::info!(dir = %dir.display(), "recording enabled");
667 tokio::spawn(async move {
668 spawn_recordings(recorder, origin, event_rx, record_shutdown).await;
669 });
670 }
671
672 if let Some(ref hls) = hls_server {
677 let hls_for_finalize = hls.clone();
678 let mut hls_event_rx = events.subscribe();
679 let hls_finalize_shutdown = shutdown.clone();
680 tokio::spawn(async move {
681 loop {
682 tokio::select! {
683 _ = hls_finalize_shutdown.cancelled() => break,
684 msg = hls_event_rx.recv() => {
685 match msg {
686 Ok(RelayEvent::BroadcastStopped { name }) => {
687 tracing::info!(broadcast = %name, "finalizing HLS broadcast");
688 hls_for_finalize.finalize_broadcast(&name).await;
689 }
690 Ok(_) => {}
691 Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
692 tracing::warn!(missed = n, "HLS finalize subscriber lagged");
693 }
694 Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
695 }
696 }
697 }
698 }
699 });
700 }
701
702 if let Some(ref dash) = dash_server {
706 let dash_for_finalize = dash.clone();
707 let mut dash_event_rx = events.subscribe();
708 let dash_finalize_shutdown = shutdown.clone();
709 tokio::spawn(async move {
710 loop {
711 tokio::select! {
712 _ = dash_finalize_shutdown.cancelled() => break,
713 msg = dash_event_rx.recv() => {
714 match msg {
715 Ok(RelayEvent::BroadcastStopped { name }) => {
716 tracing::info!(broadcast = %name, "finalizing DASH broadcast");
717 dash_for_finalize.finalize_broadcast(&name);
718 }
719 Ok(_) => {}
720 Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
721 tracing::warn!(missed = n, "DASH finalize subscriber lagged");
722 }
723 Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
724 }
725 }
726 }
727 }
728 });
729 }
730
731 let metrics_state = relay.metrics().clone();
733 let bridge_for_stats = bridge.clone();
734 let bridge_for_streams = bridge.clone();
735
736 let admin_state = lvqr_admin::AdminState::new(
737 move || {
738 let active = bridge_for_stats.active_stream_count() as u64;
739 lvqr_core::RelayStats {
740 publishers: active,
741 tracks: active * 2,
742 subscribers: metrics_state.connections_active.load(Ordering::Relaxed),
743 bytes_received: 0,
744 bytes_sent: 0,
745 uptime_secs: 0,
746 }
747 },
748 move || {
749 bridge_for_streams
750 .stream_names()
751 .into_iter()
752 .map(|name| lvqr_admin::StreamInfo { name, subscribers: 0 })
753 .collect()
754 },
755 )
756 .with_auth(auth.clone());
757 let admin_state = if let Some(prom) = prom_handle {
758 admin_state.with_metrics(Arc::new(move || prom.render()))
759 } else {
760 admin_state
761 };
762 #[cfg(feature = "cluster")]
766 let admin_state = match cluster.as_ref() {
767 Some(c) => admin_state.with_cluster(c.clone()),
768 None => admin_state,
769 };
770 #[cfg(feature = "cluster")]
775 let admin_state = match federation_runner_handle.as_ref() {
776 Some(runner) => admin_state.with_federation_status(runner.status_handle()),
777 None => admin_state,
778 };
779 let admin_state = admin_state.with_slo(slo_tracker.clone());
783
784 let admin_state = match wasm_filter_handle.clone() {
793 Some(bridge) => {
794 let slot_counters = wasm_slot_counters.clone();
795 admin_state.with_wasm_filter(move || {
796 let broadcasts = bridge
797 .tracked()
798 .into_iter()
799 .map(|(broadcast, track)| lvqr_admin::WasmFilterBroadcastStats {
800 seen: bridge.fragments_seen(&broadcast, &track),
801 kept: bridge.fragments_kept(&broadcast, &track),
802 dropped: bridge.fragments_dropped(&broadcast, &track),
803 broadcast,
804 track,
805 })
806 .collect();
807 let slots = slot_counters
808 .iter()
809 .enumerate()
810 .map(|(index, c)| lvqr_admin::WasmFilterSlotStats {
811 index,
812 seen: c.seen(),
813 kept: c.kept(),
814 dropped: c.dropped(),
815 })
816 .collect();
817 lvqr_admin::WasmFilterState {
818 enabled: true,
819 chain_length: bridge.chain_length(),
820 broadcasts,
821 slots,
822 }
823 })
824 }
825 None => admin_state,
826 };
827
828 let mesh_coordinator: Option<Arc<lvqr_mesh::MeshCoordinator>> = if config.mesh_enabled {
834 let default_mesh = lvqr_mesh::MeshConfig::default();
835 let mesh_config = lvqr_mesh::MeshConfig {
836 max_children: config.max_peers,
837 root_peer_count: config.mesh_root_peer_count.unwrap_or(default_mesh.root_peer_count),
838 ..default_mesh
839 };
840 Some(Arc::new(lvqr_mesh::MeshCoordinator::new(mesh_config)))
841 } else {
842 None
843 };
844
845 let ws_state = WsRelayState {
851 origin: relay.origin().clone(),
852 init_segments: Arc::new(dashmap::DashMap::new()),
853 auth: auth.clone(),
854 events: events.clone(),
855 registry: shared_registry.clone(),
856 slo: Some(slo_tracker.clone()),
857 mesh: mesh_coordinator.clone(),
858 };
859 let ws_router = axum::Router::new()
860 .route("/ws/{*broadcast}", get(ws_relay_handler))
861 .route("/ingest/{*broadcast}", get(ws_ingest_handler))
862 .with_state(ws_state);
863
864 let hmac_playback_secret: Option<Arc<[u8]>> = config.hmac_playback_secret.as_ref().map(|s| Arc::from(s.as_bytes()));
872
873 let combined_router = {
874 let admin_router = if let Some(mesh) = mesh_coordinator.clone() {
875 let mesh_for_cb = mesh.clone();
876 relay.set_connection_callback(Arc::new(move |conn_id, connected| {
877 let peer_id = format!("conn-{conn_id}");
878 if connected {
879 match mesh_for_cb.add_peer(peer_id.clone(), "default".to_string(), None) {
880 Ok(a) => {
881 tracing::info!(peer = %peer_id, role = ?a.role, depth = a.depth, "mesh: peer assigned");
882 }
883 Err(e) => {
884 tracing::warn!(peer = %peer_id, error = ?e, "mesh: assign failed");
885 }
886 }
887 } else {
888 let orphans = mesh_for_cb.remove_peer(&peer_id);
889 for orphan in orphans {
890 let _ = mesh_for_cb.reassign_peer(&orphan);
891 }
892 }
893 }));
894
895 let mesh_for_reaper = mesh.clone();
896 let reaper_shutdown = shutdown.clone();
897 tokio::spawn(async move {
898 let mut interval = tokio::time::interval(std::time::Duration::from_secs(5));
899 loop {
900 tokio::select! {
901 _ = interval.tick() => {
902 let dead = mesh_for_reaper.find_dead_peers();
903 for peer_id in dead {
904 tracing::info!(peer = %peer_id, "mesh: removing dead peer");
905 let orphans = mesh_for_reaper.remove_peer(&peer_id);
906 for orphan in orphans {
907 let _ = mesh_for_reaper.reassign_peer(&orphan);
908 }
909 }
910 }
911 _ = reaper_shutdown.cancelled() => {
912 tracing::debug!("mesh reaper shutting down");
913 break;
914 }
915 }
916 }
917 });
918
919 let mesh_for_signal = mesh.clone();
920 let mut signal = lvqr_signal::SignalServer::new();
921 let ice_servers_for_signal = config.mesh_ice_servers.clone();
937 let global_max_children = config.max_peers as u32;
942 signal.set_peer_callback(Arc::new(move |event: &lvqr_signal::PeerEvent<'_>| {
943 let peer_id = event.peer_id;
944 let track = event.track;
945 if event.connected {
946 if let Some(existing) = mesh_for_signal.get_peer(peer_id) {
947 tracing::debug!(
948 peer = %peer_id,
949 role = ?existing.role,
950 depth = existing.depth,
951 "mesh: signal reusing existing peer entry from WS relay"
952 );
953 return Some(lvqr_signal::SignalMessage::AssignParent {
954 peer_id: peer_id.to_string(),
955 role: format!("{:?}", existing.role),
956 parent_id: existing.parent.clone(),
957 depth: existing.depth,
958 ice_servers: ice_servers_for_signal.clone(),
959 });
960 }
961 let clamped_capacity = event.capacity.map(|c| c.min(global_max_children));
962 match mesh_for_signal.add_peer(peer_id.to_string(), track.to_string(), clamped_capacity) {
963 Ok(a) => {
964 tracing::info!(
965 peer = %peer_id,
966 role = ?a.role,
967 depth = a.depth,
968 capacity = ?clamped_capacity,
969 "mesh: signal peer assigned"
970 );
971 Some(lvqr_signal::SignalMessage::AssignParent {
972 peer_id: peer_id.to_string(),
973 role: format!("{:?}", a.role),
974 parent_id: a.parent,
975 depth: a.depth,
976 ice_servers: ice_servers_for_signal.clone(),
977 })
978 }
979 Err(e) => {
980 tracing::warn!(peer = %peer_id, error = ?e, "mesh: signal assign failed");
981 None
982 }
983 }
984 } else {
985 let orphans = mesh_for_signal.remove_peer(peer_id);
986 for orphan in orphans {
987 let _ = mesh_for_signal.reassign_peer(&orphan);
988 }
989 None
990 }
991 }));
992
993 let mesh_for_report = mesh.clone();
999 signal.set_forward_report_callback(Arc::new(move |peer_id, forwarded_frames| {
1000 mesh_for_report.record_forward_report(peer_id, forwarded_frames);
1001 }));
1002
1003 let mesh_for_admin = mesh.clone();
1004 let admin_with_mesh = admin_state.with_mesh(move || {
1005 let peers = mesh_for_admin
1010 .tree_snapshot()
1011 .into_iter()
1012 .map(|p| lvqr_admin::MeshPeerStats {
1013 peer_id: p.id.clone(),
1014 role: format!("{:?}", p.role),
1015 parent: p.parent.clone(),
1016 depth: p.depth,
1017 intended_children: p.children.len(),
1018 forwarded_frames: p.forwarded_frames,
1019 capacity: p.capacity,
1020 })
1021 .collect();
1022 lvqr_admin::MeshState {
1023 enabled: true,
1024 peer_count: mesh_for_admin.peer_count(),
1025 offload_percentage: mesh_for_admin.offload_percentage(),
1026 peers,
1027 }
1028 });
1029
1030 tracing::info!(
1031 max_children = config.max_peers,
1032 auth_gate = !config.no_auth_signal,
1033 "peer mesh enabled (/signal endpoint active)"
1034 );
1035
1036 let mut signal_router = signal.router();
1042 if !config.no_auth_signal {
1043 signal_router = signal_router.layer(from_fn_with_state(
1044 SignalAuthState { auth: auth.clone() },
1045 signal_auth_middleware,
1046 ));
1047 }
1048
1049 let router = lvqr_admin::build_router(admin_with_mesh);
1050 router.merge(signal_router)
1051 } else {
1052 lvqr_admin::build_router(admin_state)
1053 };
1054
1055 let combined = admin_router.merge(ws_router);
1056 let combined = if let Some((ref dir, ref index)) = archive_index {
1057 combined.merge(playback_router(
1058 dir.clone(),
1059 Arc::clone(index),
1060 auth.clone(),
1061 hmac_playback_secret.clone(),
1062 ))
1063 } else {
1064 combined
1065 };
1066 #[cfg(feature = "c2pa")]
1072 let combined = if let Some((ref dir, _)) = archive_index {
1073 combined.merge(verify_router(dir.clone(), auth.clone()))
1074 } else {
1075 combined
1076 };
1077 combined
1078 }
1079 .layer(CorsLayer::permissive());
1080
1081 let relay_shutdown = shutdown.clone();
1084 let rtmp_shutdown = shutdown.clone();
1085 let admin_shutdown = shutdown.clone();
1086 let hls_shutdown = shutdown.clone();
1087 let dash_shutdown = shutdown.clone();
1088 let whep_shutdown = shutdown.clone();
1089 let whip_shutdown = shutdown.clone();
1090 let bg_shutdown_for_task = shutdown.clone();
1091 let hls_router_pair =
1092 hls_listener.map(|listener| (listener, hls_server.expect("hls_server set when listener is set")));
1093 let dash_router_pair =
1094 dash_listener.map(|listener| (listener, dash_server.expect("dash_server set when listener is set")));
1095 let whep_router_pair =
1096 whep_listener.map(|listener| (listener, whep_server.expect("whep_server set when listener is set")));
1097 let whip_router_pair =
1098 whip_listener.map(|listener| (listener, whip_server.expect("whip_server set when listener is set")));
1099 let whip_bridge_keepalive = whip_bridge;
1103
1104 let relay_origin = relay.origin().clone();
1109
1110 let join = tokio::spawn(async move {
1111 let shutdown_on_exit_relay = bg_shutdown_for_task.clone();
1112 let relay_fut = async move {
1113 if let Err(e) = relay.accept_loop(&mut moq_server, relay_shutdown).await {
1114 tracing::error!(error = %e, "relay server error");
1115 }
1116 shutdown_on_exit_relay.cancel();
1117 };
1118
1119 let shutdown_on_exit_rtmp = bg_shutdown_for_task.clone();
1120 let rtmp_server_task = rtmp_server;
1121 let rtmp_fut = async move {
1122 if let Err(e) = rtmp_server_task.run_with_listener(rtmp_listener, rtmp_shutdown).await {
1123 tracing::error!(error = %e, "RTMP server error");
1124 }
1125 shutdown_on_exit_rtmp.cancel();
1126 };
1127
1128 let shutdown_on_exit_admin = bg_shutdown_for_task.clone();
1129 let admin_fut = async move {
1130 let result = axum::serve(admin_listener, combined_router)
1131 .with_graceful_shutdown(async move { admin_shutdown.cancelled().await })
1132 .await;
1133 if let Err(e) = &result {
1134 tracing::error!(error = %e, "admin server error");
1135 }
1136 shutdown_on_exit_admin.cancel();
1137 };
1138
1139 let shutdown_on_exit_hls = bg_shutdown_for_task.clone();
1140 let hls_auth = auth.clone();
1141 let hls_auth_disabled = config.no_auth_live_playback;
1142 let hls_hmac_secret = hmac_playback_secret.clone();
1146 let hls_fut = async move {
1147 let Some((listener, server)) = hls_router_pair else {
1148 return;
1149 };
1150 let mut router = server.router();
1158 if !hls_auth_disabled {
1159 let state = LivePlaybackAuthState {
1160 auth: hls_auth,
1161 entry: "hls_live",
1162 scheme: crate::signed_url::LiveScheme::Hls,
1163 hmac_secret: hls_hmac_secret,
1164 };
1165 router = router.layer(from_fn_with_state(state, live_playback_auth_middleware));
1166 }
1167 let router = router.layer(CorsLayer::permissive());
1168 let result = axum::serve(listener, router)
1169 .with_graceful_shutdown(async move { hls_shutdown.cancelled().await })
1170 .await;
1171 if let Err(e) = &result {
1172 tracing::error!(error = %e, "HLS server error");
1173 }
1174 shutdown_on_exit_hls.cancel();
1175 };
1176
1177 let shutdown_on_exit_dash = bg_shutdown_for_task.clone();
1178 let dash_auth = auth.clone();
1179 let dash_auth_disabled = config.no_auth_live_playback;
1180 let dash_hmac_secret = hmac_playback_secret.clone();
1181 let dash_fut = async move {
1182 let Some((listener, server)) = dash_router_pair else {
1183 return;
1184 };
1185 let mut router = server.router();
1186 if !dash_auth_disabled {
1187 let state = LivePlaybackAuthState {
1188 auth: dash_auth,
1189 entry: "dash_live",
1190 scheme: crate::signed_url::LiveScheme::Dash,
1191 hmac_secret: dash_hmac_secret,
1192 };
1193 router = router.layer(from_fn_with_state(state, live_playback_auth_middleware));
1194 }
1195 let router = router.layer(CorsLayer::permissive());
1196 let result = axum::serve(listener, router)
1197 .with_graceful_shutdown(async move { dash_shutdown.cancelled().await })
1198 .await;
1199 if let Err(e) = &result {
1200 tracing::error!(error = %e, "DASH server error");
1201 }
1202 shutdown_on_exit_dash.cancel();
1203 };
1204
1205 let shutdown_on_exit_whep = bg_shutdown_for_task.clone();
1206 let whep_fut = async move {
1207 let Some((listener, server)) = whep_router_pair else {
1208 return;
1209 };
1210 let router = lvqr_whep::router_for(server);
1211 let result = axum::serve(listener, router)
1212 .with_graceful_shutdown(async move { whep_shutdown.cancelled().await })
1213 .await;
1214 if let Err(e) = &result {
1215 tracing::error!(error = %e, "WHEP server error");
1216 }
1217 shutdown_on_exit_whep.cancel();
1218 };
1219
1220 let shutdown_on_exit_whip = bg_shutdown_for_task.clone();
1221 let whip_fut = async move {
1222 let Some((listener, server)) = whip_router_pair else {
1223 return;
1224 };
1225 let router = lvqr_whip::router_for(server);
1226 let result = axum::serve(listener, router)
1227 .with_graceful_shutdown(async move { whip_shutdown.cancelled().await })
1228 .await;
1229 if let Err(e) = &result {
1230 tracing::error!(error = %e, "WHIP server error");
1231 }
1232 shutdown_on_exit_whip.cancel();
1233 };
1234
1235 let srt_shutdown = bg_shutdown_for_task.clone();
1236 let srt_events = srt_events_clone;
1237 let srt_cancel = srt_shutdown_token;
1238 let srt_fut = async move {
1239 let Some(server) = srt_server else { return };
1240 if let Err(e) = server.run(srt_events, srt_cancel).await {
1241 tracing::error!(error = %e, "SRT server error");
1242 }
1243 srt_shutdown.cancel();
1244 };
1245
1246 let rtsp_shutdown = bg_shutdown_for_task.clone();
1247 let rtsp_events = rtsp_events_clone;
1248 let rtsp_cancel = rtsp_shutdown_token;
1249 let rtsp_fut = async move {
1250 let Some(server) = rtsp_server else { return };
1251 if let Err(e) = server.run(rtsp_events, rtsp_cancel).await {
1252 tracing::error!(error = %e, "RTSP server error");
1253 }
1254 rtsp_shutdown.cancel();
1255 };
1256
1257 let _ = tokio::join!(
1258 relay_fut, rtmp_fut, admin_fut, hls_fut, dash_fut, whep_fut, whip_fut, srt_fut, rtsp_fut
1259 );
1260 drop(whip_bridge_keepalive);
1261 tracing::info!("shutdown complete");
1262 });
1263
1264 Ok(ServerHandle {
1265 relay_addr: relay_bound,
1266 rtmp_addr: rtmp_bound,
1267 admin_addr: admin_bound,
1268 hls_addr: hls_bound,
1269 whep_addr: whep_bound,
1270 whip_addr: whip_bound,
1271 dash_addr: dash_bound,
1272 rtsp_addr: rtsp_bound,
1273 srt_addr: srt_bound,
1274 shutdown,
1275 join: Some(join),
1276 #[cfg(feature = "cluster")]
1277 cluster,
1278 wasm_filter: wasm_filter_handle,
1279 _wasm_reloaders: wasm_reloader_handles,
1280 #[cfg(feature = "whisper")]
1281 agent_runner: agent_runner_handle,
1282 #[cfg(feature = "transcode")]
1283 transcode_runner: transcode_runner_handle,
1284 slo: slo_tracker,
1285 mesh_coordinator,
1286 fragment_registry: shared_registry,
1287 origin: relay_origin,
1288 #[cfg(feature = "cluster")]
1289 federation_runner: federation_runner_handle,
1290 })
1291}
1292
1293