1mod archive;
15#[cfg(feature = "cluster")]
16pub mod cluster_claim;
17mod hls;
18
19use anyhow::Result;
20use axum::extract::ws::{Message, WebSocket};
21use axum::extract::{Path, Query, State, WebSocketUpgrade};
22use axum::http::{HeaderMap, StatusCode};
23use axum::response::{IntoResponse, Response};
24use axum::routing::get;
25use bytes::Bytes;
26use lvqr_auth::{AuthContext, AuthDecision, NoopAuthProvider, SharedAuth, extract};
27use lvqr_core::{EventBus, RelayEvent};
28use lvqr_dash::{BroadcasterDashBridge, DashConfig};
29use lvqr_fragment::FragmentBroadcasterRegistry;
30use lvqr_hls::{MultiHlsServer, PlaylistBuilderConfig};
31use lvqr_moq::Track;
32use std::collections::HashMap;
33use std::net::SocketAddr;
34use std::path::PathBuf;
35use std::sync::Arc;
36use std::sync::atomic::Ordering;
37use tokio_util::sync::CancellationToken;
38use tower_http::cors::CorsLayer;
39
40#[cfg(feature = "c2pa")]
41use crate::archive::verify_router;
42use crate::archive::{BroadcasterArchiveIndexer, playback_router};
43use crate::hls::BroadcasterHlsBridge;
44
/// Configuration for one LVQR server instance, consumed by [`start`].
///
/// The relay, RTMP and admin listeners are always started. Every `Option`
/// surface is skipped entirely by `start` when it is `None`.
#[derive(Clone)]
pub struct ServeConfig {
    /// Bind address handed to the MoQ relay (`lvqr_relay::RelayConfig::new`).
    pub relay_addr: SocketAddr,
    /// Bind address for the RTMP ingest TCP listener.
    pub rtmp_addr: SocketAddr,
    /// Bind address for the admin HTTP listener. The same router also serves
    /// the `/ws/...` and `/ingest/...` WebSocket routes and, when enabled,
    /// the mesh signal and archive playback routes.
    pub admin_addr: SocketAddr,
    /// Bind address for the LL-HLS HTTP listener; `None` disables HLS.
    pub hls_addr: Option<SocketAddr>,
    /// DVR window in seconds. `start` divides it by
    /// `hls_target_duration_secs` to get the playlist's `max_segments`;
    /// `0` (or a zero target duration) leaves `max_segments` unset.
    pub hls_dvr_window_secs: u32,
    /// HLS target segment duration in seconds, passed to the playlist builder
    /// and (multiplied by 1000) to the HLS bridge.
    pub hls_target_duration_secs: u32,
    /// HLS partial-segment target in milliseconds; converted to fractional
    /// seconds for the playlist builder.
    pub hls_part_target_ms: u32,
    /// Bind address for the WHEP HTTP listener; its IP is also used as the
    /// str0m `host_ip`. `None` disables WHEP.
    pub whep_addr: Option<SocketAddr>,
    /// Bind address for the WHIP HTTP listener; its IP is also used as the
    /// str0m ingest `host_ip`. `None` disables WHIP.
    pub whip_addr: Option<SocketAddr>,
    /// Bind address for the MPEG-DASH HTTP listener; `None` disables DASH.
    pub dash_addr: Option<SocketAddr>,
    /// Bind address for the RTSP server; `None` disables RTSP.
    pub rtsp_addr: Option<SocketAddr>,
    /// Bind address for the SRT ingest server; `None` disables SRT.
    pub srt_addr: Option<SocketAddr>,
    /// When `true`, `start` creates a mesh coordinator, a dead-peer reaper
    /// task, and merges the signal server's router into the admin router.
    pub mesh_enabled: bool,
    /// Used as `MeshConfig::max_children` when the mesh is enabled.
    pub max_peers: usize,
    /// Auth provider shared by all surfaces; `None` falls back to
    /// `NoopAuthProvider`.
    pub auth: Option<SharedAuth>,
    /// Directory handed to `lvqr_record::BroadcastRecorder`; `None` disables
    /// recording.
    pub record_dir: Option<PathBuf>,
    /// Directory for the DVR archive. `start` creates it, opens
    /// `archive.redb` inside it, and mounts the playback router.
    pub archive_dir: Option<PathBuf>,
    /// Optional C2PA configuration forwarded to the archive indexer.
    /// Only present when the `c2pa` feature is enabled.
    #[cfg(feature = "c2pa")]
    pub c2pa: Option<lvqr_archive::provenance::C2paConfig>,
    /// Path of a WASM fragment filter to load and watch for hot reload.
    pub wasm_filter: Option<PathBuf>,
    /// When `true`, a Prometheus recorder is installed as (part of) the
    /// global metrics recorder and exposed through the admin state.
    pub install_prometheus: bool,
    /// Optional OTel metrics recorder. Installed on its own, or fanned out
    /// together with Prometheus when `install_prometheus` is also set.
    pub otel_metrics_recorder: Option<lvqr_observability::OtelMetricsRecorder>,
    /// NOTE(review): not read by the visible part of `start`; presumably the
    /// TLS certificate path for the relay -- confirm against the consumer.
    pub tls_cert: Option<PathBuf>,
    /// NOTE(review): not read by the visible part of `start`; presumably the
    /// TLS private-key path matching `tls_cert` -- confirm.
    pub tls_key: Option<PathBuf>,
    /// Cluster listen address. `Some` enables cluster bootstrap, but only in
    /// builds with the `cluster` feature; the remaining `cluster_*` fields
    /// are likewise ignored without that feature.
    pub cluster_listen: Option<SocketAddr>,
    /// Seed list copied into `ClusterConfig::seeds`.
    pub cluster_seeds: Vec<String>,
    /// Optional explicit node id, wrapped in `lvqr_cluster::NodeId`.
    pub cluster_node_id: Option<String>,
    /// Cluster id; `None` uses `ClusterConfig::default().cluster_id`.
    pub cluster_id: Option<String>,
    /// HLS endpoint advertised to the cluster via `NodeEndpoints::hls`.
    pub cluster_advertise_hls: Option<String>,
    /// DASH endpoint advertised to the cluster via `NodeEndpoints::dash`.
    pub cluster_advertise_dash: Option<String>,
    /// RTSP endpoint advertised to the cluster via `NodeEndpoints::rtsp`.
    pub cluster_advertise_rtsp: Option<String>,
}
209
210impl ServeConfig {
211 pub fn loopback_ephemeral() -> Self {
214 let loopback: std::net::IpAddr = std::net::Ipv4Addr::LOCALHOST.into();
215 Self {
216 relay_addr: (loopback, 0).into(),
217 rtmp_addr: (loopback, 0).into(),
218 admin_addr: (loopback, 0).into(),
219 hls_addr: Some((loopback, 0).into()),
220 hls_dvr_window_secs: 120,
221 hls_target_duration_secs: 2,
222 hls_part_target_ms: 200,
223 whep_addr: None,
224 whip_addr: None,
225 dash_addr: None,
226 rtsp_addr: None,
227 srt_addr: None,
228 mesh_enabled: false,
229 max_peers: 3,
230 auth: None,
231 record_dir: None,
232 archive_dir: None,
233 #[cfg(feature = "c2pa")]
234 c2pa: None,
235 wasm_filter: None,
236 install_prometheus: false,
237 otel_metrics_recorder: None,
238 tls_cert: None,
239 tls_key: None,
240 cluster_listen: None,
241 cluster_seeds: Vec::new(),
242 cluster_node_id: None,
243 cluster_id: None,
244 cluster_advertise_hls: None,
245 cluster_advertise_dash: None,
246 cluster_advertise_rtsp: None,
247 }
248 }
249}
250
/// Handle to a running server returned by [`start`].
///
/// Holds the addresses the listeners actually bound to, plus the cancellation
/// token and join handle of the supervisor task. Dropping the handle cancels
/// the token and aborts the task (see the `Drop` impl).
pub struct ServerHandle {
    /// Address the MoQ relay bound to.
    relay_addr: SocketAddr,
    /// Local address of the RTMP listener.
    rtmp_addr: SocketAddr,
    /// Local address of the admin HTTP listener.
    admin_addr: SocketAddr,
    /// Local address of the LL-HLS listener, if HLS is enabled.
    hls_addr: Option<SocketAddr>,
    /// Local address of the WHEP listener, if WHEP is enabled.
    whep_addr: Option<SocketAddr>,
    /// Local address of the WHIP listener, if WHIP is enabled.
    whip_addr: Option<SocketAddr>,
    /// Local address of the MPEG-DASH listener, if DASH is enabled.
    dash_addr: Option<SocketAddr>,
    /// Address the RTSP server bound to, if RTSP is enabled.
    rtsp_addr: Option<SocketAddr>,
    /// Address the SRT server bound to, if SRT is enabled.
    srt_addr: Option<SocketAddr>,
    /// Root cancellation token; clones of it are handed to every server and
    /// background task spawned by `start`.
    shutdown: CancellationToken,
    /// Join handle of the supervisor task. `Option` so that `shutdown` and
    /// `Drop` can each `take()` it exactly once.
    join: Option<tokio::task::JoinHandle<()>>,
    /// Cluster membership handle, present when clustering was configured.
    #[cfg(feature = "cluster")]
    cluster: Option<std::sync::Arc<lvqr_cluster::Cluster>>,
    /// Handle to the installed WASM filter bridge, if a filter was loaded.
    wasm_filter: Option<lvqr_wasm::WasmFilterBridgeHandle>,
    /// Never read in the visible code; presumably held only so the hot-reload
    /// watcher lives as long as the handle -- confirm against `lvqr_wasm`.
    _wasm_reloader: Option<lvqr_wasm::WasmFilterReloader>,
}
290
291impl ServerHandle {
292 pub fn relay_addr(&self) -> SocketAddr {
294 self.relay_addr
295 }
296
297 pub fn rtmp_addr(&self) -> SocketAddr {
299 self.rtmp_addr
300 }
301
302 pub fn admin_addr(&self) -> SocketAddr {
304 self.admin_addr
305 }
306
307 pub fn hls_addr(&self) -> Option<SocketAddr> {
309 self.hls_addr
310 }
311
312 pub fn whep_addr(&self) -> Option<SocketAddr> {
314 self.whep_addr
315 }
316
317 pub fn whip_addr(&self) -> Option<SocketAddr> {
319 self.whip_addr
320 }
321
322 pub fn dash_addr(&self) -> Option<SocketAddr> {
324 self.dash_addr
325 }
326
327 pub fn rtsp_addr(&self) -> Option<SocketAddr> {
329 self.rtsp_addr
330 }
331
332 pub fn srt_addr(&self) -> Option<SocketAddr> {
334 self.srt_addr
335 }
336
337 #[cfg(feature = "cluster")]
344 pub fn cluster(&self) -> Option<&std::sync::Arc<lvqr_cluster::Cluster>> {
345 self.cluster.as_ref()
346 }
347
348 pub fn dash_url(&self, path: &str) -> Option<String> {
352 let addr = self.dash_addr?;
353 let path = if path.starts_with('/') {
354 path.to_string()
355 } else {
356 format!("/{path}")
357 };
358 Some(format!("http://{addr}{path}"))
359 }
360
361 pub fn http_base(&self) -> String {
363 format!("http://{}", self.admin_addr)
364 }
365
366 pub fn hls_url(&self, path: &str) -> Option<String> {
370 let addr = self.hls_addr?;
371 let path = if path.starts_with('/') {
372 path.to_string()
373 } else {
374 format!("/{path}")
375 };
376 Some(format!("http://{addr}{path}"))
377 }
378
379 pub fn ws_url(&self, broadcast: &str) -> String {
381 format!("ws://{}/ws/{broadcast}", self.admin_addr)
382 }
383
384 pub fn ws_ingest_url(&self, broadcast: &str) -> String {
386 format!("ws://{}/ingest/{broadcast}", self.admin_addr)
387 }
388
389 pub fn rtmp_url(&self, app: &str, stream_key: &str) -> String {
391 format!("rtmp://{}/{app}/{stream_key}", self.rtmp_addr)
392 }
393
394 pub fn wasm_filter(&self) -> Option<&lvqr_wasm::WasmFilterBridgeHandle> {
399 self.wasm_filter.as_ref()
400 }
401
402 pub async fn shutdown(mut self) -> Result<()> {
404 self.shutdown.cancel();
405 if let Some(join) = self.join.take()
406 && let Err(e) = join.await
407 && !e.is_cancelled()
408 {
409 return Err(anyhow::anyhow!("server task panicked: {e}"));
410 }
411 Ok(())
412 }
413}
414
415impl Drop for ServerHandle {
416 fn drop(&mut self) {
417 self.shutdown.cancel();
422 if let Some(join) = self.join.take() {
423 join.abort();
424 }
425 }
426}
427
/// Boots every surface enabled in `config` and returns a [`ServerHandle`].
///
/// In order, this function:
/// 1. installs the global metrics recorder (Prometheus, OTel, both, or none);
/// 2. initialises the MoQ relay and, with the `cluster` feature, bootstraps
///    the cluster and builds per-protocol owner resolvers;
/// 3. creates the HLS/DASH servers and the shared fragment registry, then
///    installs the WASM filter, archive indexer and HLS/DASH bridges on it;
/// 4. builds the WHEP, WHIP, SRT, RTSP and RTMP ingest/egress servers;
/// 5. binds every TCP listener;
/// 6. spawns the recording and HLS/DASH finalize background tasks;
/// 7. assembles the admin + WebSocket (+ mesh, + archive) router;
/// 8. spawns one supervisor task that runs all servers under `tokio::join!`.
///
/// # Errors
/// Returns an error when a metrics recorder cannot be installed, relay
/// initialisation fails, cluster bootstrap or endpoint advertisement fails,
/// the WASM filter cannot be loaded or watched, the archive directory or
/// index cannot be created/opened, or any server/listener fails to bind.
pub async fn start(config: ServeConfig) -> Result<ServerHandle> {
    tracing::info!(
        relay = %config.relay_addr,
        rtmp = %config.rtmp_addr,
        admin = %config.admin_addr,
        mesh = config.mesh_enabled,
        "starting LVQR server"
    );

    // Pick the global metrics recorder. `prom_handle` is `Some` only when
    // Prometheus is installed; it is later used to render the scrape output.
    let prom_handle = match (config.install_prometheus, config.otel_metrics_recorder.clone()) {
        // Both requested: fan every metric out to Prometheus and OTel.
        (true, Some(otel_recorder)) => {
            let prom_recorder = metrics_exporter_prometheus::PrometheusBuilder::new().build_recorder();
            let handle = prom_recorder.handle();
            let fanout = metrics_util::layers::FanoutBuilder::default()
                .add_recorder(prom_recorder)
                .add_recorder(otel_recorder)
                .build();
            metrics::set_global_recorder(fanout)
                .map_err(|e| anyhow::anyhow!("failed to install metrics fanout recorder: {e}"))?;
            Some(handle)
        }
        (true, None) => Some(
            metrics_exporter_prometheus::PrometheusBuilder::new()
                .install_recorder()
                .map_err(|e| anyhow::anyhow!("failed to install Prometheus recorder: {e}"))?,
        ),
        (false, Some(otel_recorder)) => {
            metrics::set_global_recorder(otel_recorder)
                .map_err(|e| anyhow::anyhow!("failed to install OTLP metrics recorder: {e}"))?;
            None
        }
        (false, None) => None,
    };

    // Root token; every server and background task below gets a clone.
    let shutdown = CancellationToken::new();

    // No configured provider means the no-op provider is used.
    let auth: SharedAuth = config
        .auth
        .clone()
        .unwrap_or_else(|| Arc::new(NoopAuthProvider) as SharedAuth);

    let events = EventBus::default();

    // MoQ relay: configure, attach auth, and initialise the server.
    let relay_config = lvqr_relay::RelayConfig::new(config.relay_addr);
    let mut relay = lvqr_relay::RelayServer::new(relay_config);
    relay.set_auth_provider(auth.clone());
    let (mut moq_server, relay_bound) = relay.init_server()?;
    tracing::info!(addr = %relay_bound, "MoQ relay bound");

    // Cluster bootstrap: only compiled with the `cluster` feature and only
    // performed when a listen address is configured.
    #[cfg(feature = "cluster")]
    let cluster = if let Some(listen) = config.cluster_listen {
        let ccfg = lvqr_cluster::ClusterConfig {
            listen,
            seeds: config.cluster_seeds.clone(),
            node_id: config.cluster_node_id.clone().map(lvqr_cluster::NodeId::new),
            cluster_id: config
                .cluster_id
                .clone()
                .unwrap_or_else(|| lvqr_cluster::ClusterConfig::default().cluster_id),
            ..lvqr_cluster::ClusterConfig::default()
        };
        let c = lvqr_cluster::Cluster::bootstrap(ccfg)
            .await
            .map_err(|e| anyhow::anyhow!("cluster bootstrap failed: {e}"))?;
        let c = std::sync::Arc::new(c);
        // Advertise endpoints only if at least one was configured.
        if config.cluster_advertise_hls.is_some()
            || config.cluster_advertise_dash.is_some()
            || config.cluster_advertise_rtsp.is_some()
        {
            let endpoints = lvqr_cluster::NodeEndpoints {
                hls: config.cluster_advertise_hls.clone(),
                dash: config.cluster_advertise_dash.clone(),
                rtsp: config.cluster_advertise_rtsp.clone(),
            };
            c.set_endpoints(&endpoints)
                .await
                .map_err(|e| anyhow::anyhow!("cluster set_endpoints failed: {e}"))?;
        }
        tracing::info!(
            node = %c.self_id(),
            %listen,
            advertise_hls = ?config.cluster_advertise_hls,
            advertise_dash = ?config.cluster_advertise_dash,
            advertise_rtsp = ?config.cluster_advertise_rtsp,
            "cluster enabled"
        );
        Some(c)
    } else {
        None
    };
    // Owner resolvers: one per protocol, each asking the cluster for the
    // owner's advertised endpoint of that protocol. Without the `cluster`
    // feature each resolver is simply `None`.
    #[cfg(feature = "cluster")]
    let hls_owner_resolver: Option<lvqr_hls::OwnerResolver> = cluster.as_ref().map(|c| {
        let c = c.clone();
        let resolver: lvqr_hls::OwnerResolver = std::sync::Arc::new(move |broadcast: String| {
            let c = c.clone();
            Box::pin(async move {
                let (_, endpoints) = c.find_owner_endpoints(&broadcast).await?;
                endpoints.hls
            })
        });
        resolver
    });
    #[cfg(not(feature = "cluster"))]
    let hls_owner_resolver: Option<lvqr_hls::OwnerResolver> = None;
    #[cfg(feature = "cluster")]
    let dash_owner_resolver: Option<lvqr_dash::OwnerResolver> = cluster.as_ref().map(|c| {
        let c = c.clone();
        let resolver: lvqr_dash::OwnerResolver = std::sync::Arc::new(move |broadcast: String| {
            let c = c.clone();
            Box::pin(async move {
                let (_, endpoints) = c.find_owner_endpoints(&broadcast).await?;
                endpoints.dash
            })
        });
        resolver
    });
    #[cfg(not(feature = "cluster"))]
    let dash_owner_resolver: Option<lvqr_dash::OwnerResolver> = None;
    #[cfg(feature = "cluster")]
    let rtsp_owner_resolver: Option<lvqr_rtsp::OwnerResolver> = cluster.as_ref().map(|c| {
        let c = c.clone();
        let resolver: lvqr_rtsp::OwnerResolver = std::sync::Arc::new(move |broadcast: String| {
            let c = c.clone();
            Box::pin(async move {
                let (_, endpoints) = c.find_owner_endpoints(&broadcast).await?;
                endpoints.rtsp
            })
        });
        resolver
    });
    #[cfg(not(feature = "cluster"))]
    let rtsp_owner_resolver: Option<lvqr_rtsp::OwnerResolver> = None;

    // HLS playlist parameters derived from the config.
    let target_dur = config.hls_target_duration_secs;
    let part_target_secs = config.hls_part_target_ms as f32 / 1000.0;
    // A zero window or zero target duration leaves the playlist unbounded
    // (and avoids dividing by zero).
    // NOTE(review): this is integer division, so a window shorter than the
    // target duration yields `Some(0)` -- confirm how the playlist builder
    // treats a zero segment cap.
    let max_segments = if config.hls_dvr_window_secs == 0 || target_dur == 0 {
        None
    } else {
        Some((config.hls_dvr_window_secs / target_dur) as usize)
    };
    let hls_server = config.hls_addr.map(|_| {
        let playlist_cfg = PlaylistBuilderConfig {
            target_duration_secs: target_dur,
            part_target_secs,
            max_segments,
            ..PlaylistBuilderConfig::default()
        };
        match hls_owner_resolver.clone() {
            Some(r) => MultiHlsServer::with_owner_resolver(playlist_cfg, r),
            None => MultiHlsServer::new(playlist_cfg),
        }
    });

    let dash_server = config.dash_addr.map(|_| match dash_owner_resolver.clone() {
        Some(r) => lvqr_dash::MultiDashServer::with_owner_resolver(DashConfig::default(), r),
        None => lvqr_dash::MultiDashServer::new(DashConfig::default()),
    });

    // Registry shared by the ingest bridges and the egress bridges below.
    let shared_registry = FragmentBroadcasterRegistry::new();

    #[cfg(feature = "cluster")]
    if let Some(ref c) = cluster {
        cluster_claim::install_cluster_claim_bridge(c.clone(), cluster_claim::DEFAULT_CLAIM_LEASE, &shared_registry);
    }

    // Optional WASM fragment filter: load it, install the bridge on the
    // registry, and start a reloader for the same path.
    let (wasm_filter_handle, wasm_reloader_handle) = if let Some(ref path) = config.wasm_filter {
        let filter = lvqr_wasm::WasmFilter::load(path)
            .map_err(|e| anyhow::anyhow!("WASM filter load at {} failed: {e}", path.display()))?;
        tracing::info!(path = %path.display(), "WASM fragment filter loaded");
        let shared = lvqr_wasm::SharedFilter::new(filter);
        let bridge = lvqr_wasm::install_wasm_filter_bridge(&shared_registry, shared.clone());
        let reloader = lvqr_wasm::WasmFilterReloader::spawn(path, shared)
            .map_err(|e| anyhow::anyhow!("WASM filter hot-reload watcher at {} failed: {e}", path.display()))?;
        (Some(bridge), Some(reloader))
    } else {
        (None, None)
    };

    // RTMP -> MoQ bridge builder. It stays a builder until the WHEP block
    // below has had the chance to attach a raw-sample observer.
    let mut bridge_builder = lvqr_ingest::RtmpMoqBridge::with_auth(relay.origin().clone(), auth.clone())
        .with_events(events.clone())
        .with_registry(shared_registry.clone());

    // Optional DVR archive: ensure the directory exists and open the redb
    // index stored inside it.
    let archive_index = if let Some(ref dir) = config.archive_dir {
        std::fs::create_dir_all(dir)
            .map_err(|e| anyhow::anyhow!("archive: failed to create {}: {e}", dir.display()))?;
        let db_path = dir.join("archive.redb");
        let index = lvqr_archive::RedbSegmentIndex::open(&db_path)
            .map_err(|e| anyhow::anyhow!("archive: failed to open {}: {e}", db_path.display()))?;
        tracing::info!(dir = %dir.display(), "DVR archive index enabled");
        Some((dir.clone(), Arc::new(index)))
    } else {
        None
    };

    // The indexer takes an extra C2PA argument when that feature is on.
    if let Some((ref dir, ref index)) = archive_index {
        #[cfg(feature = "c2pa")]
        BroadcasterArchiveIndexer::install(dir.clone(), Arc::clone(index), &shared_registry, config.c2pa.clone());
        #[cfg(not(feature = "c2pa"))]
        BroadcasterArchiveIndexer::install(dir.clone(), Arc::clone(index), &shared_registry);
    }

    if let Some(ref hls) = hls_server {
        // The bridge receives the target duration in milliseconds.
        // NOTE(review): this is a `u32` multiply, so very large configured
        // durations would overflow -- confirm the config is validated.
        BroadcasterHlsBridge::install(
            hls.clone(),
            config.hls_target_duration_secs * 1000,
            config.hls_part_target_ms,
            &shared_registry,
        );
    }

    if let Some(ref dash) = dash_server {
        BroadcasterDashBridge::install(dash.clone(), &shared_registry);
    }

    // WHEP: the server doubles as a raw-sample observer on the RTMP bridge.
    let whep_server = if let Some(addr) = config.whep_addr {
        let str0m_cfg = lvqr_whep::Str0mConfig { host_ip: addr.ip() };
        let answerer = Arc::new(lvqr_whep::Str0mAnswerer::new(str0m_cfg)) as Arc<dyn lvqr_whep::SdpAnswerer>;
        let server = lvqr_whep::WhepServer::new(answerer);
        let observer: lvqr_ingest::SharedRawSampleObserver = Arc::new(server.clone());
        bridge_builder = bridge_builder.with_raw_sample_observer(observer);
        Some(server)
    } else {
        None
    };

    // WHIP: its bridge also feeds the WHEP server when WHEP is enabled.
    let (whip_server, whip_bridge) = if let Some(addr) = config.whip_addr {
        let mut whip_bridge = lvqr_whip::WhipMoqBridge::new(relay.origin().clone())
            .with_events(events.clone())
            .with_registry(shared_registry.clone());
        if let Some(ref server) = whep_server {
            let raw_observer: lvqr_ingest::SharedRawSampleObserver = Arc::new(server.clone());
            whip_bridge = whip_bridge.with_raw_sample_observer(raw_observer);
        }
        let whip_bridge_arc = Arc::new(whip_bridge);
        let sink = whip_bridge_arc.clone() as Arc<dyn lvqr_whip::IngestSampleSink>;
        let str0m_cfg = lvqr_whip::Str0mIngestConfig { host_ip: addr.ip() };
        let answerer =
            Arc::new(lvqr_whip::Str0mIngestAnswerer::new(str0m_cfg, sink)) as Arc<dyn lvqr_whip::SdpAnswerer>;
        let server = lvqr_whip::WhipServer::with_auth_provider(answerer, auth.clone());
        (Some(server), Some(whip_bridge_arc))
    } else {
        (None, None)
    };

    // SRT ingest binds immediately so the real address can be reported.
    let (srt_server, srt_bound) = if let Some(addr) = config.srt_addr {
        let mut server =
            lvqr_srt::SrtIngestServer::with_registry(addr, shared_registry.clone()).with_auth(auth.clone());
        let bound = server.bind().await?;
        tracing::info!(addr = %bound, "SRT ingest bound");
        (Some(server), Some(bound))
    } else {
        (None, None)
    };
    // Cloned unconditionally; both are moved into the SRT future later.
    let srt_events_clone = events.clone();
    let srt_shutdown_token = shutdown.clone();

    // RTSP server, with the cluster owner resolver attached when available.
    let (rtsp_server, rtsp_bound) = if let Some(addr) = config.rtsp_addr {
        let mut server = lvqr_rtsp::RtspServer::with_registry(addr, shared_registry.clone()).with_auth(auth.clone());
        if let Some(r) = rtsp_owner_resolver.clone() {
            server = server.with_owner_resolver(r);
        }
        let bound = server.bind().await?;
        tracing::info!(addr = %bound, "RTSP ingest bound");
        (Some(server), Some(bound))
    } else {
        (None, None)
    };
    let rtsp_events_clone = events.clone();
    let rtsp_shutdown_token = shutdown.clone();

    // Freeze the RTMP bridge builder and bind the RTMP listener ourselves so
    // the actual local address is known before the server starts running.
    let bridge = Arc::new(bridge_builder);
    let rtmp_config = lvqr_ingest::RtmpConfig {
        bind_addr: config.rtmp_addr,
    };
    let rtmp_server = bridge.create_rtmp_server(rtmp_config);
    let rtmp_listener = tokio::net::TcpListener::bind(config.rtmp_addr).await?;
    let rtmp_bound = rtmp_listener.local_addr()?;
    tracing::info!(addr = %rtmp_bound, "RTMP ingest bound");

    let admin_listener = tokio::net::TcpListener::bind(config.admin_addr).await?;
    let admin_bound = admin_listener.local_addr()?;
    tracing::info!(addr = %admin_bound, "admin HTTP bound");

    // Optional HTTP listeners: each yields (listener, bound address) or
    // (None, None) when the surface is disabled.
    let (hls_listener, hls_bound) = if let Some(addr) = config.hls_addr {
        let listener = tokio::net::TcpListener::bind(addr).await?;
        let bound = listener.local_addr()?;
        tracing::info!(addr = %bound, "LL-HLS HTTP bound");
        (Some(listener), Some(bound))
    } else {
        (None, None)
    };

    let (whep_listener, whep_bound) = if let Some(addr) = config.whep_addr {
        let listener = tokio::net::TcpListener::bind(addr).await?;
        let bound = listener.local_addr()?;
        tracing::info!(addr = %bound, "WHEP HTTP bound");
        (Some(listener), Some(bound))
    } else {
        (None, None)
    };

    let (dash_listener, dash_bound) = if let Some(addr) = config.dash_addr {
        let listener = tokio::net::TcpListener::bind(addr).await?;
        let bound = listener.local_addr()?;
        tracing::info!(addr = %bound, "MPEG-DASH HTTP bound");
        (Some(listener), Some(bound))
    } else {
        (None, None)
    };

    let (whip_listener, whip_bound) = if let Some(addr) = config.whip_addr {
        let listener = tokio::net::TcpListener::bind(addr).await?;
        let bound = listener.local_addr()?;
        tracing::info!(addr = %bound, "WHIP HTTP bound");
        (Some(listener), Some(bound))
    } else {
        (None, None)
    };

    // Optional recording task, driven by events from the bus.
    if let Some(ref dir) = config.record_dir {
        let recorder = lvqr_record::BroadcastRecorder::new(dir);
        let origin = relay.origin().clone();
        let event_rx = events.subscribe();
        let record_shutdown = shutdown.clone();
        tracing::info!(dir = %dir.display(), "recording enabled");
        tokio::spawn(async move {
            spawn_recordings(recorder, origin, event_rx, record_shutdown).await;
        });
    }

    // HLS finalize task: on every `BroadcastStopped` event, finalize that
    // broadcast on the HLS server. Ends on shutdown or when the bus closes.
    if let Some(ref hls) = hls_server {
        let hls_for_finalize = hls.clone();
        let mut hls_event_rx = events.subscribe();
        let hls_finalize_shutdown = shutdown.clone();
        tokio::spawn(async move {
            loop {
                tokio::select! {
                    _ = hls_finalize_shutdown.cancelled() => break,
                    msg = hls_event_rx.recv() => {
                        match msg {
                            Ok(RelayEvent::BroadcastStopped { name }) => {
                                tracing::info!(broadcast = %name, "finalizing HLS broadcast");
                                hls_for_finalize.finalize_broadcast(&name).await;
                            }
                            Ok(_) => {}
                            // Lagging only loses events; keep receiving.
                            Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
                                tracing::warn!(missed = n, "HLS finalize subscriber lagged");
                            }
                            Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
                        }
                    }
                }
            }
        });
    }

    // DASH finalize task: same shape as the HLS one, except that the DASH
    // `finalize_broadcast` call is not awaited.
    if let Some(ref dash) = dash_server {
        let dash_for_finalize = dash.clone();
        let mut dash_event_rx = events.subscribe();
        let dash_finalize_shutdown = shutdown.clone();
        tokio::spawn(async move {
            loop {
                tokio::select! {
                    _ = dash_finalize_shutdown.cancelled() => break,
                    msg = dash_event_rx.recv() => {
                        match msg {
                            Ok(RelayEvent::BroadcastStopped { name }) => {
                                tracing::info!(broadcast = %name, "finalizing DASH broadcast");
                                dash_for_finalize.finalize_broadcast(&name);
                            }
                            Ok(_) => {}
                            Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
                                tracing::warn!(missed = n, "DASH finalize subscriber lagged");
                            }
                            Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
                        }
                    }
                }
            }
        });
    }

    // Admin state: two closures supply relay stats and the stream list.
    let metrics_state = relay.metrics().clone();
    let bridge_for_stats = bridge.clone();
    let bridge_for_streams = bridge.clone();

    let admin_state = lvqr_admin::AdminState::new(
        move || {
            let active = bridge_for_stats.active_stream_count() as u64;
            lvqr_core::RelayStats {
                publishers: active,
                // Presumably one video plus one audio track per publisher --
                // TODO confirm; this is an estimate, not a measured count.
                tracks: active * 2,
                subscribers: metrics_state.connections_active.load(Ordering::Relaxed),
                // Byte counters and uptime are hard-coded to zero here.
                bytes_received: 0,
                bytes_sent: 0,
                uptime_secs: 0,
            }
        },
        move || {
            bridge_for_streams
                .stream_names()
                .into_iter()
                // Per-stream subscriber counts are reported as zero.
                .map(|name| lvqr_admin::StreamInfo { name, subscribers: 0 })
                .collect()
        },
    )
    .with_auth(auth.clone());
    // Expose the Prometheus render function only when it was installed.
    let admin_state = if let Some(prom) = prom_handle {
        admin_state.with_metrics(Arc::new(move || prom.render()))
    } else {
        admin_state
    };
    #[cfg(feature = "cluster")]
    let admin_state = match cluster.as_ref() {
        Some(c) => admin_state.with_cluster(c.clone()),
        None => admin_state,
    };

    // WebSocket subscribe/ingest routes share the admin listener.
    let ws_state = WsRelayState {
        origin: relay.origin().clone(),
        init_segments: Arc::new(dashmap::DashMap::new()),
        auth: auth.clone(),
        events: events.clone(),
    };
    let ws_router = axum::Router::new()
        .route("/ws/{*broadcast}", get(ws_relay_handler))
        .route("/ingest/{*broadcast}", get(ws_ingest_handler))
        .with_state(ws_state);

    // Admin router (+ mesh signal routes) + WS routes + archive routes,
    // wrapped in a permissive CORS layer.
    let combined_router = {
        let admin_router = if config.mesh_enabled {
            let mesh_config = lvqr_mesh::MeshConfig {
                max_children: config.max_peers,
                ..Default::default()
            };
            let mesh = Arc::new(lvqr_mesh::MeshCoordinator::new(mesh_config));

            // Relay connections join the mesh as `conn-{id}` on the
            // "default" track and are removed again on disconnect.
            let mesh_for_cb = mesh.clone();
            relay.set_connection_callback(Arc::new(move |conn_id, connected| {
                let peer_id = format!("conn-{conn_id}");
                if connected {
                    match mesh_for_cb.add_peer(peer_id.clone(), "default".to_string()) {
                        Ok(a) => {
                            tracing::info!(peer = %peer_id, role = ?a.role, depth = a.depth, "mesh: peer assigned");
                        }
                        Err(e) => {
                            tracing::warn!(peer = %peer_id, error = ?e, "mesh: assign failed");
                        }
                    }
                } else {
                    // Re-home whatever the departing peer left orphaned;
                    // reassignment failures are deliberately ignored.
                    let orphans = mesh_for_cb.remove_peer(&peer_id);
                    for orphan in orphans {
                        let _ = mesh_for_cb.reassign_peer(&orphan);
                    }
                }
            }));

            // Reaper: every 5 seconds remove dead peers and reassign their
            // orphans, until shutdown.
            let mesh_for_reaper = mesh.clone();
            let reaper_shutdown = shutdown.clone();
            tokio::spawn(async move {
                let mut interval = tokio::time::interval(std::time::Duration::from_secs(5));
                loop {
                    tokio::select! {
                        _ = interval.tick() => {
                            let dead = mesh_for_reaper.find_dead_peers();
                            for peer_id in dead {
                                tracing::info!(peer = %peer_id, "mesh: removing dead peer");
                                let orphans = mesh_for_reaper.remove_peer(&peer_id);
                                for orphan in orphans {
                                    let _ = mesh_for_reaper.reassign_peer(&orphan);
                                }
                            }
                        }
                        _ = reaper_shutdown.cancelled() => {
                            tracing::debug!("mesh reaper shutting down");
                            break;
                        }
                    }
                }
            });

            // Signal server: peers that connect are added to the mesh and
            // answered with an `AssignParent` message on success.
            let mesh_for_signal = mesh.clone();
            let mut signal = lvqr_signal::SignalServer::new();
            signal.set_peer_callback(Arc::new(move |peer_id, track, connected| {
                if connected {
                    match mesh_for_signal.add_peer(peer_id.to_string(), track.to_string()) {
                        Ok(a) => {
                            tracing::info!(peer = %peer_id, role = ?a.role, depth = a.depth, "mesh: signal peer assigned");
                            Some(lvqr_signal::SignalMessage::AssignParent {
                                peer_id: peer_id.to_string(),
                                role: format!("{:?}", a.role),
                                parent_id: a.parent,
                                depth: a.depth,
                            })
                        }
                        Err(e) => {
                            tracing::warn!(peer = %peer_id, error = ?e, "mesh: signal assign failed");
                            None
                        }
                    }
                } else {
                    let orphans = mesh_for_signal.remove_peer(peer_id);
                    for orphan in orphans {
                        let _ = mesh_for_signal.reassign_peer(&orphan);
                    }
                    None
                }
            }));

            let mesh_for_admin = mesh.clone();
            let admin_with_mesh = admin_state.with_mesh(move || lvqr_admin::MeshState {
                enabled: true,
                peer_count: mesh_for_admin.peer_count(),
                offload_percentage: mesh_for_admin.offload_percentage(),
            });

            tracing::info!(
                max_children = config.max_peers,
                "peer mesh enabled (/signal endpoint active)"
            );

            let router = lvqr_admin::build_router(admin_with_mesh);
            router.merge(signal.router())
        } else {
            lvqr_admin::build_router(admin_state)
        };

        let combined = admin_router.merge(ws_router);
        // Archive playback routes are mounted only when archiving is on.
        let combined = if let Some((ref dir, ref index)) = archive_index {
            combined.merge(playback_router(dir.clone(), Arc::clone(index), auth.clone()))
        } else {
            combined
        };
        #[cfg(feature = "c2pa")]
        let combined = if let Some((ref dir, _)) = archive_index {
            combined.merge(verify_router(dir.clone(), auth.clone()))
        } else {
            combined
        };
        combined
    }
    .layer(CorsLayer::permissive());

    // One token clone per server future, plus one for the supervisor task.
    let relay_shutdown = shutdown.clone();
    let rtmp_shutdown = shutdown.clone();
    let admin_shutdown = shutdown.clone();
    let hls_shutdown = shutdown.clone();
    let dash_shutdown = shutdown.clone();
    let whep_shutdown = shutdown.clone();
    let whip_shutdown = shutdown.clone();
    let bg_shutdown_for_task = shutdown.clone();
    // Each listener and its server were both created from the same config
    // `Option`, so a listener without a server would be a bug (hence
    // `expect`).
    let hls_router_pair =
        hls_listener.map(|listener| (listener, hls_server.expect("hls_server set when listener is set")));
    let dash_router_pair =
        dash_listener.map(|listener| (listener, dash_server.expect("dash_server set when listener is set")));
    let whep_router_pair =
        whep_listener.map(|listener| (listener, whep_server.expect("whep_server set when listener is set")));
    let whip_router_pair =
        whip_listener.map(|listener| (listener, whip_server.expect("whip_server set when listener is set")));
    // Moved into the supervisor task and dropped only after all servers end.
    let whip_bridge_keepalive = whip_bridge;

    // Supervisor task. Each server future cancels the shared token when it
    // exits, so one server stopping brings the others down too. Futures for
    // disabled optional surfaces return early WITHOUT cancelling.
    let join = tokio::spawn(async move {
        let shutdown_on_exit_relay = bg_shutdown_for_task.clone();
        let relay_fut = async move {
            if let Err(e) = relay.accept_loop(&mut moq_server, relay_shutdown).await {
                tracing::error!(error = %e, "relay server error");
            }
            shutdown_on_exit_relay.cancel();
        };

        let shutdown_on_exit_rtmp = bg_shutdown_for_task.clone();
        let rtmp_server_task = rtmp_server;
        let rtmp_fut = async move {
            if let Err(e) = rtmp_server_task.run_with_listener(rtmp_listener, rtmp_shutdown).await {
                tracing::error!(error = %e, "RTMP server error");
            }
            shutdown_on_exit_rtmp.cancel();
        };

        let shutdown_on_exit_admin = bg_shutdown_for_task.clone();
        let admin_fut = async move {
            let result = axum::serve(admin_listener, combined_router)
                .with_graceful_shutdown(async move { admin_shutdown.cancelled().await })
                .await;
            if let Err(e) = &result {
                tracing::error!(error = %e, "admin server error");
            }
            shutdown_on_exit_admin.cancel();
        };

        let shutdown_on_exit_hls = bg_shutdown_for_task.clone();
        let hls_fut = async move {
            let Some((listener, server)) = hls_router_pair else {
                return;
            };
            let router = server.router().layer(CorsLayer::permissive());
            let result = axum::serve(listener, router)
                .with_graceful_shutdown(async move { hls_shutdown.cancelled().await })
                .await;
            if let Err(e) = &result {
                tracing::error!(error = %e, "HLS server error");
            }
            shutdown_on_exit_hls.cancel();
        };

        let shutdown_on_exit_dash = bg_shutdown_for_task.clone();
        let dash_fut = async move {
            let Some((listener, server)) = dash_router_pair else {
                return;
            };
            let router = server.router().layer(CorsLayer::permissive());
            let result = axum::serve(listener, router)
                .with_graceful_shutdown(async move { dash_shutdown.cancelled().await })
                .await;
            if let Err(e) = &result {
                tracing::error!(error = %e, "DASH server error");
            }
            shutdown_on_exit_dash.cancel();
        };

        let shutdown_on_exit_whep = bg_shutdown_for_task.clone();
        let whep_fut = async move {
            let Some((listener, server)) = whep_router_pair else {
                return;
            };
            let router = lvqr_whep::router_for(server);
            let result = axum::serve(listener, router)
                .with_graceful_shutdown(async move { whep_shutdown.cancelled().await })
                .await;
            if let Err(e) = &result {
                tracing::error!(error = %e, "WHEP server error");
            }
            shutdown_on_exit_whep.cancel();
        };

        let shutdown_on_exit_whip = bg_shutdown_for_task.clone();
        let whip_fut = async move {
            let Some((listener, server)) = whip_router_pair else {
                return;
            };
            let router = lvqr_whip::router_for(server);
            let result = axum::serve(listener, router)
                .with_graceful_shutdown(async move { whip_shutdown.cancelled().await })
                .await;
            if let Err(e) = &result {
                tracing::error!(error = %e, "WHIP server error");
            }
            shutdown_on_exit_whip.cancel();
        };

        let srt_shutdown = bg_shutdown_for_task.clone();
        let srt_events = srt_events_clone;
        let srt_cancel = srt_shutdown_token;
        let srt_fut = async move {
            let Some(server) = srt_server else { return };
            if let Err(e) = server.run(srt_events, srt_cancel).await {
                tracing::error!(error = %e, "SRT server error");
            }
            srt_shutdown.cancel();
        };

        let rtsp_shutdown = bg_shutdown_for_task.clone();
        let rtsp_events = rtsp_events_clone;
        let rtsp_cancel = rtsp_shutdown_token;
        let rtsp_fut = async move {
            let Some(server) = rtsp_server else { return };
            if let Err(e) = server.run(rtsp_events, rtsp_cancel).await {
                tracing::error!(error = %e, "RTSP server error");
            }
            rtsp_shutdown.cancel();
        };

        // Run all servers concurrently; returns once every one has finished.
        let _ = tokio::join!(
            relay_fut, rtmp_fut, admin_fut, hls_fut, dash_fut, whep_fut, whip_fut, srt_fut, rtsp_fut
        );
        drop(whip_bridge_keepalive);
        tracing::info!("shutdown complete");
    });

    // Report the addresses that were actually bound, not the requested ones.
    Ok(ServerHandle {
        relay_addr: relay_bound,
        rtmp_addr: rtmp_bound,
        admin_addr: admin_bound,
        hls_addr: hls_bound,
        whep_addr: whep_bound,
        whip_addr: whip_bound,
        dash_addr: dash_bound,
        rtsp_addr: rtsp_bound,
        srt_addr: srt_bound,
        shutdown,
        join: Some(join),
        #[cfg(feature = "cluster")]
        cluster,
        wasm_filter: wasm_filter_handle,
        _wasm_reloader: wasm_reloader_handle,
    })
}
1263
/// Shared state for the `/ws/...` and `/ingest/...` WebSocket routes.
///
/// `Clone` so axum can hand a copy to every request.
#[derive(Clone)]
struct WsRelayState {
    /// Relay origin; the subscribe session calls `consume()` on it to look
    /// up broadcasts.
    origin: lvqr_moq::OriginProducer,
    /// Map from a string key to bytes, created empty by `start`.
    /// NOTE(review): presumably broadcast name -> fMP4 init segment, written
    /// by the ingest handler -- confirm against `ws_ingest_handler`.
    init_segments: Arc<dashmap::DashMap<String, Bytes>>,
    /// Auth provider consulted before a WebSocket upgrade is accepted.
    auth: SharedAuth,
    /// Clone of the server-wide event bus.
    /// NOTE(review): not used by the subscribe handler; presumably the ingest
    /// handler publishes on it -- confirm.
    events: EventBus,
}
1280
1281async fn ws_relay_handler(
1282 ws: WebSocketUpgrade,
1283 State(state): State<WsRelayState>,
1284 Path(broadcast): Path<String>,
1285 Query(params): Query<HashMap<String, String>>,
1286 headers: HeaderMap,
1287) -> Response {
1288 tracing::info!(broadcast = %broadcast, "WebSocket relay request");
1289 let resolved = resolve_ws_token(&headers, ¶ms, "ws_subscribe");
1290 let decision = state.auth.check(&AuthContext::Subscribe {
1291 token: resolved.token,
1292 broadcast: broadcast.clone(),
1293 });
1294 if let AuthDecision::Deny { reason } = decision {
1295 tracing::warn!(broadcast = %broadcast, reason = %reason, "WS relay denied");
1296 metrics::counter!("lvqr_auth_failures_total", "entry" => "ws").increment(1);
1297 return (StatusCode::UNAUTHORIZED, reason).into_response();
1298 }
1299 metrics::counter!("lvqr_ws_connections_total", "direction" => "subscribe").increment(1);
1300 let ws = match resolved.offered_subprotocol {
1301 Some(ref p) => ws.protocols(std::iter::once(p.clone())),
1302 None => ws,
1303 };
1304 ws.on_upgrade(move |socket| ws_relay_session(socket, state, broadcast))
1305 .into_response()
1306}
1307
1308async fn ws_relay_session(mut socket: WebSocket, state: WsRelayState, broadcast: String) {
1309 let consumer = state.origin.consume();
1310 let Some(bc) = consumer.consume_broadcast(&broadcast) else {
1311 tracing::warn!(broadcast = %broadcast, "broadcast not found for WS relay");
1312 let _ = socket
1313 .send(Message::Close(Some(axum::extract::ws::CloseFrame {
1314 code: 4404,
1315 reason: "broadcast not found".into(),
1316 })))
1317 .await;
1318 return;
1319 };
1320
1321 tracing::info!(broadcast = %broadcast, "WS relay session started");
1322
1323 let video_track = bc.subscribe_track(&Track::new("0.mp4")).ok();
1324 let audio_track = bc.subscribe_track(&Track::new("1.mp4")).ok();
1325
1326 if video_track.is_none() && audio_track.is_none() {
1327 tracing::warn!(broadcast = %broadcast, "no playable tracks for WS relay");
1328 return;
1329 }
1330
1331 let (tx, mut rx) = tokio::sync::mpsc::channel::<(u8, Bytes)>(64);
1332 let cancel = CancellationToken::new();
1333
1334 if let Some(track) = video_track {
1335 let tx = tx.clone();
1336 let cancel = cancel.clone();
1337 tokio::spawn(async move {
1338 relay_track(track, 0u8, tx, cancel).await;
1339 });
1340 }
1341 if let Some(track) = audio_track {
1342 let tx = tx.clone();
1343 let cancel = cancel.clone();
1344 tokio::spawn(async move {
1345 relay_track(track, 1u8, tx, cancel).await;
1346 });
1347 }
1348 drop(tx);
1349
1350 while let Some((track_id, payload)) = rx.recv().await {
1351 let mut framed = Vec::with_capacity(1 + payload.len());
1352 framed.push(track_id);
1353 framed.extend_from_slice(&payload);
1354 let len = framed.len() as u64;
1355 if let Err(e) = socket.send(Message::Binary(framed.into())).await {
1356 tracing::debug!(error = ?e, "WS send error");
1357 break;
1358 }
1359 metrics::counter!("lvqr_frames_relayed_total", "transport" => "ws").increment(1);
1360 metrics::counter!("lvqr_bytes_relayed_total", "transport" => "ws").increment(len);
1361 }
1362
1363 cancel.cancel();
1364 tracing::info!(broadcast = %broadcast, "WS relay session ended");
1365}
1366
1367async fn relay_track(
1368 mut track: lvqr_moq::TrackConsumer,
1369 track_id: u8,
1370 tx: tokio::sync::mpsc::Sender<(u8, Bytes)>,
1371 cancel: CancellationToken,
1372) {
1373 loop {
1374 let group = tokio::select! {
1375 res = track.next_group() => res,
1376 _ = cancel.cancelled() => return,
1377 };
1378 let mut group = match group {
1379 Ok(Some(g)) => g,
1380 Ok(None) => return,
1381 Err(e) => {
1382 tracing::debug!(track_id, error = ?e, "track error");
1383 return;
1384 }
1385 };
1386 loop {
1387 let frame = tokio::select! {
1388 res = group.read_frame() => res,
1389 _ = cancel.cancelled() => return,
1390 };
1391 match frame {
1392 Ok(Some(bytes)) => {
1393 if tx.send((track_id, bytes)).await.is_err() {
1394 return;
1395 }
1396 }
1397 Ok(None) => break,
1398 Err(e) => {
1399 tracing::debug!(track_id, error = ?e, "group read error");
1400 return;
1401 }
1402 }
1403 }
1404 }
1405}
1406
1407async fn ws_ingest_handler(
1408 ws: WebSocketUpgrade,
1409 State(state): State<WsRelayState>,
1410 Path(broadcast): Path<String>,
1411 Query(params): Query<HashMap<String, String>>,
1412 headers: HeaderMap,
1413) -> Response {
1414 tracing::info!(broadcast = %broadcast, "WebSocket ingest request");
1415 let resolved = resolve_ws_token(&headers, ¶ms, "ws_ingest");
1416 let decision = state
1417 .auth
1418 .check(&extract::extract_ws_ingest(resolved.token.as_deref(), &broadcast));
1419 if let AuthDecision::Deny { reason } = decision {
1420 tracing::warn!(broadcast = %broadcast, reason = %reason, "WS ingest denied");
1421 metrics::counter!("lvqr_auth_failures_total", "entry" => "ws_ingest").increment(1);
1422 return (StatusCode::UNAUTHORIZED, reason).into_response();
1423 }
1424 metrics::counter!("lvqr_ws_connections_total", "direction" => "publish").increment(1);
1425 let ws = match resolved.offered_subprotocol {
1426 Some(ref p) => ws.protocols(std::iter::once(p.clone())),
1427 None => ws,
1428 };
1429 ws.on_upgrade(move |socket| ws_ingest_session(socket, state, broadcast))
1430 .into_response()
1431}
1432
/// Result of searching a WebSocket upgrade request for a bearer token.
struct WsTokenResolution {
    /// Token taken from a `lvqr.bearer.<token>` entry of the `Sec-WebSocket-Protocol` header
    /// or, failing that, from the `token` query parameter. `None` when neither carried a
    /// non-empty token.
    token: Option<String>,
    /// The full `lvqr.bearer.<token>` subprotocol entry as offered by the client. Set only
    /// when the token came from the header; the handlers pass it to
    /// `WebSocketUpgrade::protocols`.
    offered_subprotocol: Option<String>,
}
1444
1445fn resolve_ws_token(headers: &HeaderMap, params: &HashMap<String, String>, entry: &'static str) -> WsTokenResolution {
1446 if let Some(hv) = headers.get("sec-websocket-protocol")
1447 && let Ok(raw) = hv.to_str()
1448 {
1449 for item in raw.split(',') {
1450 let proto = item.trim();
1451 if let Some(tok) = proto.strip_prefix("lvqr.bearer.")
1452 && !tok.is_empty()
1453 {
1454 return WsTokenResolution {
1455 token: Some(tok.to_string()),
1456 offered_subprotocol: Some(proto.to_string()),
1457 };
1458 }
1459 }
1460 }
1461
1462 if let Some(tok) = params.get("token").filter(|t| !t.is_empty()) {
1463 tracing::warn!(
1464 entry = entry,
1465 "deprecated: ?token= query parameter; migrate to Sec-WebSocket-Protocol: lvqr.bearer.<token>"
1466 );
1467 return WsTokenResolution {
1468 token: Some(tok.clone()),
1469 offered_subprotocol: None,
1470 };
1471 }
1472
1473 WsTokenResolution {
1474 token: None,
1475 offered_subprotocol: None,
1476 }
1477}
1478
1479async fn ws_ingest_session(mut socket: WebSocket, state: WsRelayState, broadcast: String) {
1480 use lvqr_ingest::remux;
1481
1482 tracing::info!(broadcast = %broadcast, "WS ingest session started");
1483
1484 let Some(mut bc) = state.origin.create_broadcast(&broadcast) else {
1485 tracing::warn!(broadcast = %broadcast, "broadcast creation failed");
1486 let _ = socket
1487 .send(Message::Close(Some(axum::extract::ws::CloseFrame {
1488 code: 4409,
1489 reason: "broadcast already exists".into(),
1490 })))
1491 .await;
1492 return;
1493 };
1494
1495 state.events.emit(RelayEvent::BroadcastStarted {
1496 name: broadcast.clone(),
1497 });
1498
1499 let Ok(mut video_track) = bc.create_track(Track::new("0.mp4")) else {
1500 tracing::warn!("failed to create video track");
1501 return;
1502 };
1503 let Ok(mut catalog_track) = bc.create_track(Track::new(".catalog")) else {
1504 tracing::warn!("failed to create catalog track");
1505 return;
1506 };
1507
1508 let mut _video_config: Option<remux::VideoConfig> = None;
1509 let mut video_init: Option<Bytes> = None;
1510 let mut video_group: Option<lvqr_moq::GroupProducer> = None;
1511 let mut video_seq: u32 = 0;
1512 let _ = socket.send(Message::Text(r#"{"status":"ready"}"#.into())).await;
1513
1514 while let Some(msg) = socket.recv().await {
1515 let data = match msg {
1516 Ok(Message::Binary(data)) => data,
1517 Ok(Message::Close(_)) => break,
1518 Ok(_) => continue,
1519 Err(e) => {
1520 tracing::debug!(error = ?e, "WS ingest recv error");
1521 break;
1522 }
1523 };
1524
1525 if data.len() < 5 {
1526 continue;
1527 }
1528
1529 let msg_type = data[0];
1530 let timestamp = u32::from_be_bytes([data[1], data[2], data[3], data[4]]);
1531 let payload = Bytes::from(data[5..].to_vec());
1532
1533 match msg_type {
1534 0 => {
1535 if payload.len() < 6 {
1536 continue;
1537 }
1538 let vid_width = u16::from_be_bytes([payload[0], payload[1]]);
1539 let vid_height = u16::from_be_bytes([payload[2], payload[3]]);
1540 let avcc_data = &payload[4..];
1541
1542 match parse_avcc_record(avcc_data) {
1543 Some(config) => {
1544 tracing::info!(
1545 broadcast = %broadcast,
1546 codec = %config.codec_string(),
1547 width = vid_width,
1548 height = vid_height,
1549 "WS ingest: video config received"
1550 );
1551 let init = remux::video_init_segment_with_size(&config, vid_width, vid_height);
1552 _video_config = Some(config.clone());
1553 video_init = Some(init.clone());
1554 state.init_segments.insert(broadcast.clone(), init);
1555
1556 let json = remux::generate_catalog(Some(&config), None);
1557 if let Ok(mut group) = catalog_track.append_group() {
1558 let _ = group.write_frame(Bytes::from(json));
1559 let _ = group.finish();
1560 }
1561 }
1562 None => {
1563 tracing::warn!("invalid AVCC record from browser");
1564 }
1565 }
1566 }
1567 1 => {
1568 let Some(ref init) = video_init else { continue };
1569 if let Some(mut g) = video_group.take() {
1570 let _ = g.finish();
1571 }
1572 video_seq += 1;
1573 let base_dts = (timestamp as u64) * 90;
1574 let sample = lvqr_cmaf::RawSample {
1575 track_id: 1,
1576 dts: base_dts,
1577 cts_offset: 0,
1578 duration: 3000,
1579 payload,
1580 keyframe: true,
1581 };
1582 if let Ok(mut group) = video_track.append_group() {
1583 let _ = group.write_frame(init.clone());
1584 let seg = lvqr_cmaf::build_moof_mdat(video_seq, 1, base_dts, &[sample]);
1585 let _ = group.write_frame(seg);
1586 video_group = Some(group);
1587 }
1588 }
1589 2 => {
1590 if video_init.is_none() {
1591 continue;
1592 }
1593 video_seq += 1;
1594 let base_dts = (timestamp as u64) * 90;
1595 let sample = lvqr_cmaf::RawSample {
1596 track_id: 1,
1597 dts: base_dts,
1598 cts_offset: 0,
1599 duration: 3000,
1600 payload,
1601 keyframe: false,
1602 };
1603 if let Some(ref mut group) = video_group {
1604 let seg = lvqr_cmaf::build_moof_mdat(video_seq, 1, base_dts, &[sample]);
1605 let _ = group.write_frame(seg);
1606 }
1607 }
1608 _ => {}
1609 }
1610 }
1611
1612 if let Some(mut g) = video_group.take() {
1613 let _ = g.finish();
1614 }
1615 state.events.emit(RelayEvent::BroadcastStopped {
1616 name: broadcast.clone(),
1617 });
1618 tracing::info!(broadcast = %broadcast, "WS ingest session ended");
1619}
1620
1621async fn spawn_recordings(
1625 recorder: lvqr_record::BroadcastRecorder,
1626 origin: lvqr_moq::OriginProducer,
1627 mut events: tokio::sync::broadcast::Receiver<RelayEvent>,
1628 shutdown: CancellationToken,
1629) {
1630 let mut active: std::collections::HashSet<String> = std::collections::HashSet::new();
1631 loop {
1632 let event = tokio::select! {
1633 res = events.recv() => res,
1634 _ = shutdown.cancelled() => return,
1635 };
1636 let event = match event {
1637 Ok(e) => e,
1638 Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
1639 tracing::warn!(missed = n, "recorder event stream lagged");
1640 continue;
1641 }
1642 Err(tokio::sync::broadcast::error::RecvError::Closed) => return,
1643 };
1644 match event {
1645 RelayEvent::BroadcastStarted { name } => {
1646 if !active.insert(name.clone()) {
1647 continue;
1648 }
1649 let consumer = origin.consume();
1650 let Some(broadcast) = consumer.consume_broadcast(&name) else {
1651 tracing::warn!(broadcast = %name, "recorder: broadcast not resolvable yet");
1652 active.remove(&name);
1653 continue;
1654 };
1655 let recorder = recorder.clone();
1656 let cancel = shutdown.clone();
1657 tracing::info!(broadcast = %name, "starting recording");
1658 let name_clone = name.clone();
1659 tokio::spawn(async move {
1660 let _ = recorder
1661 .record_broadcast(&name_clone, broadcast, lvqr_record::RecordOptions::default(), cancel)
1662 .await;
1663 });
1664 }
1665 RelayEvent::BroadcastStopped { name } => {
1666 active.remove(&name);
1667 }
1668 RelayEvent::ViewerJoined { .. } | RelayEvent::ViewerLeft { .. } => {}
1669 }
1670 }
1671}
1672
1673fn parse_avcc_record(data: &[u8]) -> Option<lvqr_ingest::remux::VideoConfig> {
1675 if data.len() < 6 {
1676 return None;
1677 }
1678 let profile = data[1];
1679 let compat = data[2];
1680 let level = data[3];
1681 let nalu_length_size = (data[4] & 0x03) + 1;
1682
1683 let num_sps = (data[5] & 0x1F) as usize;
1684 let mut offset = 6;
1685 let mut sps_list = Vec::with_capacity(num_sps);
1686 for _ in 0..num_sps {
1687 if offset + 2 > data.len() {
1688 return None;
1689 }
1690 let len = u16::from_be_bytes([data[offset], data[offset + 1]]) as usize;
1691 offset += 2;
1692 if offset + len > data.len() {
1693 return None;
1694 }
1695 sps_list.push(data[offset..offset + len].to_vec());
1696 offset += len;
1697 }
1698
1699 if offset >= data.len() {
1700 return None;
1701 }
1702 let num_pps = data[offset] as usize;
1703 offset += 1;
1704 let mut pps_list = Vec::with_capacity(num_pps);
1705 for _ in 0..num_pps {
1706 if offset + 2 > data.len() {
1707 return None;
1708 }
1709 let len = u16::from_be_bytes([data[offset], data[offset + 1]]) as usize;
1710 offset += 2;
1711 if offset + len > data.len() {
1712 return None;
1713 }
1714 pps_list.push(data[offset..offset + len].to_vec());
1715 offset += len;
1716 }
1717
1718 if sps_list.is_empty() || pps_list.is_empty() {
1719 return None;
1720 }
1721
1722 Some(lvqr_ingest::remux::VideoConfig {
1723 sps_list,
1724 pps_list,
1725 profile,
1726 compat,
1727 level,
1728 nalu_length_size,
1729 })
1730}