1use std::net;
2use std::path::PathBuf;
3
4use crate::QuicBackend;
5use moq_lite::Session;
6use std::sync::{Arc, RwLock};
7use url::Url;
8#[cfg(feature = "iroh")]
9use web_transport_iroh::iroh;
10
11use anyhow::Context;
12
13use futures::FutureExt;
14use futures::future::BoxFuture;
15use futures::stream::FuturesUnordered;
16use futures::stream::StreamExt;
17
// TLS-related server options, usable both as CLI/env arguments (clap) and as
// a config-file section (serde).
//
// Plain `//` comments are used on purpose: clap turns `///` doc comments on
// derived arguments into `--help` text, so adding them would change CLI output.
//
// serde: unknown keys are rejected, every list defaults to empty, and empty
// lists are omitted when serializing.
#[derive(clap::Args, Clone, Default, Debug, serde::Serialize, serde::Deserialize)]
#[serde(deny_unknown_fields)]
#[non_exhaustive]
pub struct ServerTlsConfig {
    // Certificate path(s): `--tls-cert` / `MOQ_SERVER_TLS_CERT`.
    // NOTE(review): not read in this file; the file format and how entries pair
    // with `key` are decided by the backend loaders — confirm there.
    #[arg(long = "tls-cert", id = "tls-cert", env = "MOQ_SERVER_TLS_CERT")]
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub cert: Vec<PathBuf>,

    // Private key path(s): `--tls-key` / `MOQ_SERVER_TLS_KEY`.
    // NOTE(review): not read in this file — see the note on `cert`.
    #[arg(long = "tls-key", id = "tls-key", env = "MOQ_SERVER_TLS_KEY")]
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub key: Vec<PathBuf>,

    // Comma-separated path list: `--server-tls-identity` / `MOQ_SERVER_TLS_IDENTITY`.
    // NOTE(review): not read in this file; presumably a combined cert+key
    // identity file — confirm against the backend that consumes it.
    #[arg(
        long = "server-tls-identity",
        id = "server-tls-identity",
        value_delimiter = ',',
        env = "MOQ_SERVER_TLS_IDENTITY"
    )]
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub identity: Vec<PathBuf>,

    // Comma-separated string list: `--tls-generate` / `MOQ_SERVER_TLS_GENERATE`.
    // NOTE(review): not read in this file; presumably hostnames for a generated
    // certificate — confirm against the backend that consumes it.
    #[arg(
        long = "tls-generate",
        id = "tls-generate",
        value_delimiter = ',',
        env = "MOQ_SERVER_TLS_GENERATE"
    )]
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub generate: Vec<String>,

    // Comma-separated list of root CA PEM files: `--server-tls-root` /
    // `MOQ_SERVER_TLS_ROOT`. Parsed by `load_roots`. `Server::new` rejects a
    // non-empty list unless the quinn backend is selected (mTLS is quinn-only).
    #[arg(
        long = "server-tls-root",
        id = "server-tls-root",
        value_delimiter = ',',
        env = "MOQ_SERVER_TLS_ROOT"
    )]
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub root: Vec<PathBuf>,
}
79
80impl ServerTlsConfig {
81 pub fn load_roots(&self) -> anyhow::Result<rustls::RootCertStore> {
83 use rustls::pki_types::CertificateDer;
84
85 let mut roots = rustls::RootCertStore::empty();
86 for path in &self.root {
87 let file = std::fs::File::open(path).context("failed to open root CA")?;
88 let mut reader = std::io::BufReader::new(file);
89 let certs: Vec<CertificateDer<'static>> = rustls_pemfile::certs(&mut reader)
90 .collect::<Result<_, _>>()
91 .context("failed to parse root CA PEM")?;
92 anyhow::ensure!(!certs.is_empty(), "no certificates found in root CA");
93 for cert in certs {
94 roots.add(cert).context("failed to add root CA")?;
95 }
96 }
97 Ok(roots)
98 }
99}
100
// Top-level server options, usable both as CLI/env arguments (clap) and as a
// config-file section (serde). Consumed by `Server::new` / `ServerConfig::init`.
//
// Plain `//` comments are used on purpose: clap turns `///` doc comments on
// derived arguments into `--help` text, so adding them would change CLI output.
//
// serde: unknown keys are rejected and missing keys fall back to `Default`.
#[derive(clap::Args, Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
#[serde(deny_unknown_fields, default)]
#[non_exhaustive]
pub struct ServerConfig {
    // Socket address: `--server-bind` / `MOQ_SERVER_BIND`. The name `listen`
    // is accepted as an alias on both the CLI and in config files.
    // NOTE(review): not read in this file; the backend constructors receive the
    // whole config and decide what `None` means — confirm there.
    #[serde(alias = "listen")]
    #[arg(id = "server-bind", long = "server-bind", alias = "listen", env = "MOQ_SERVER_BIND")]
    pub bind: Option<net::SocketAddr>,

    // QUIC implementation to use: `--server-backend` / `MOQ_SERVER_BACKEND`.
    // When `None`, `Server::new` picks the first compiled-in backend in the
    // order quinn, noq, quiche.
    #[arg(id = "server-backend", long = "server-backend", env = "MOQ_SERVER_BACKEND")]
    pub backend: Option<QuicBackend>,

    // Hex-encoded server id: `--server-quic-lb-id` / `MOQ_SERVER_QUIC_LB_ID`.
    // NOTE(review): not read in this file; presumably embedded in connection
    // ids for QUIC-LB routing — confirm in the backend.
    #[arg(id = "server-quic-lb-id", long = "server-quic-lb-id", env = "MOQ_SERVER_QUIC_LB_ID")]
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub quic_lb_id: Option<ServerId>,

    // `--server-quic-lb-nonce` / `MOQ_SERVER_QUIC_LB_NONCE`. On the CLI this
    // may only be given together with `--server-quic-lb-id`.
    // NOTE(review): unit (presumably a length in bytes) is not visible here — confirm.
    #[arg(
        id = "server-quic-lb-nonce",
        long = "server-quic-lb-nonce",
        requires = "server-quic-lb-id",
        env = "MOQ_SERVER_QUIC_LB_NONCE"
    )]
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub quic_lb_nonce: Option<usize>,

    // `--server-max-streams` / `MOQ_SERVER_MAX_STREAMS`.
    // NOTE(review): not read in this file; presumably a per-connection stream
    // limit handed to the QUIC backend — confirm.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[arg(
        id = "server-max-streams",
        long = "server-max-streams",
        env = "MOQ_SERVER_MAX_STREAMS"
    )]
    pub max_streams: Option<u64>,

    // Protocol versions to offer: `--server-version` / `MOQ_SERVER_VERSION`.
    // An empty list means "all versions" (see `ServerConfig::versions`).
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    #[arg(id = "server-version", long = "server-version", env = "MOQ_SERVER_VERSION")]
    pub version: Vec<moq_lite::Version>,

    // TLS options; flattened so its flags appear directly on the command line.
    #[command(flatten)]
    #[serde(default)]
    pub tls: ServerTlsConfig,
}
158
159impl ServerConfig {
160 pub fn init(self) -> anyhow::Result<Server> {
161 Server::new(self)
162 }
163
164 pub fn versions(&self) -> moq_lite::Versions {
166 if self.version.is_empty() {
167 moq_lite::Versions::all()
168 } else {
169 moq_lite::Versions::from(self.version.clone())
170 }
171 }
172}
173
/// A MoQ server that accepts sessions over whichever transports were compiled
/// in and configured.
///
/// [`Server::new`] populates at most one of the QUIC backends (`noq`, `quinn`,
/// `quiche`); the iroh endpoint and the WebSocket listener start out as `None`
/// and are attached through `with_iroh` / `with_websocket`.
pub struct Server {
    /// Session-level server, cloned into every [`Request`] handed out by `accept`.
    moq: moq_lite::Server,
    /// Versions offered to peers; `accept` derives the ALPN list from it.
    versions: moq_lite::Versions,
    /// In-flight handshakes: connections that were accepted at the transport
    /// level but have not yet resolved into a [`Request`] (or an error).
    accept: FuturesUnordered<BoxFuture<'static, anyhow::Result<Request>>>,
    /// Optional iroh endpoint, set via `with_iroh`.
    #[cfg(feature = "iroh")]
    iroh: Option<iroh::Endpoint>,
    /// noq backend; `Some` only when it is the selected backend.
    #[cfg(feature = "noq")]
    noq: Option<crate::noq::NoqServer>,
    /// quinn backend; `Some` only when it is the selected backend.
    #[cfg(feature = "quinn")]
    quinn: Option<crate::quinn::QuinnServer>,
    /// quiche backend; `Some` only when it is the selected backend.
    #[cfg(feature = "quiche")]
    quiche: Option<crate::quiche::QuicheServer>,
    /// Optional WebSocket listener, set via `with_websocket`.
    #[cfg(feature = "websocket")]
    websocket: Option<crate::websocket::WebSocketListener>,
}
192
193impl Server {
194 pub fn new(config: ServerConfig) -> anyhow::Result<Self> {
195 let backend = config.backend.clone().unwrap_or({
196 #[cfg(feature = "quinn")]
197 {
198 QuicBackend::Quinn
199 }
200 #[cfg(all(feature = "noq", not(feature = "quinn")))]
201 {
202 QuicBackend::Noq
203 }
204 #[cfg(all(feature = "quiche", not(feature = "quinn"), not(feature = "noq")))]
205 {
206 QuicBackend::Quiche
207 }
208 #[cfg(all(not(feature = "quiche"), not(feature = "quinn"), not(feature = "noq")))]
209 panic!("no QUIC backend compiled; enable noq, quinn, or quiche feature");
210 });
211
212 let versions = config.versions();
213
214 if !config.tls.root.is_empty() {
215 #[cfg(feature = "quinn")]
216 let quinn_backend = matches!(backend, QuicBackend::Quinn);
217 #[cfg(not(feature = "quinn"))]
218 let quinn_backend = false;
219 anyhow::ensure!(quinn_backend, "tls.root (mTLS) is only supported by the quinn backend");
220 }
221
222 #[cfg(feature = "noq")]
223 #[allow(unreachable_patterns)]
224 let noq = match backend {
225 QuicBackend::Noq => Some(crate::noq::NoqServer::new(config.clone())?),
226 _ => None,
227 };
228
229 #[cfg(feature = "quinn")]
230 #[allow(unreachable_patterns)]
231 let quinn = match backend {
232 QuicBackend::Quinn => Some(crate::quinn::QuinnServer::new(config.clone())?),
233 _ => None,
234 };
235
236 #[cfg(feature = "quiche")]
237 let quiche = match backend {
238 QuicBackend::Quiche => Some(crate::quiche::QuicheServer::new(config)?),
239 _ => None,
240 };
241
242 Ok(Server {
243 accept: Default::default(),
244 moq: moq_lite::Server::new().with_versions(versions.clone()),
245 versions,
246 #[cfg(feature = "iroh")]
247 iroh: None,
248 #[cfg(feature = "noq")]
249 noq,
250 #[cfg(feature = "quinn")]
251 quinn,
252 #[cfg(feature = "quiche")]
253 quiche,
254 #[cfg(feature = "websocket")]
255 websocket: None,
256 })
257 }
258
259 #[cfg(feature = "websocket")]
265 pub fn with_websocket(mut self, websocket: Option<crate::websocket::WebSocketListener>) -> Self {
266 self.websocket = websocket;
267 self
268 }
269
270 #[cfg(feature = "iroh")]
271 pub fn with_iroh(mut self, iroh: Option<iroh::Endpoint>) -> Self {
272 self.iroh = iroh;
273 self
274 }
275
276 pub fn with_publish(mut self, publish: impl Into<Option<moq_lite::OriginConsumer>>) -> Self {
277 self.moq = self.moq.with_publish(publish);
278 self
279 }
280
281 pub fn with_consume(mut self, consume: impl Into<Option<moq_lite::OriginProducer>>) -> Self {
282 self.moq = self.moq.with_consume(consume);
283 self
284 }
285
286 pub fn tls_info(&self) -> Arc<RwLock<ServerTlsInfo>> {
288 #[cfg(feature = "noq")]
289 if let Some(noq) = self.noq.as_ref() {
290 return noq.tls_info();
291 }
292 #[cfg(feature = "quinn")]
293 if let Some(quinn) = self.quinn.as_ref() {
294 return quinn.tls_info();
295 }
296 #[cfg(feature = "quiche")]
297 if let Some(quiche) = self.quiche.as_ref() {
298 return quiche.tls_info();
299 }
300 unreachable!("no QUIC backend compiled");
301 }
302
    /// Stand-in for builds without any of the `noq`, `quinn`, `quiche` or `iroh`
    /// features: always panics.
    #[cfg(not(any(feature = "noq", feature = "quinn", feature = "quiche", feature = "iroh")))]
    pub async fn accept(&mut self) -> Option<Request> {
        unreachable!("no QUIC backend compiled; enable noq, quinn, quiche, or iroh feature");
    }

    /// Wait for the next incoming session request on any configured transport.
    ///
    /// QUIC and iroh connections are accepted in two steps: the raw connection
    /// is taken from the backend, then its handshake is pushed onto
    /// `self.accept` so a slow peer does not stall other connections. A
    /// [`Request`] is returned once one of those handshakes completes.
    /// WebSocket sessions are returned as soon as the listener yields them.
    ///
    /// Failed handshakes and failed WebSocket accepts are logged at debug level
    /// and skipped.
    ///
    /// Returns `None` only after Ctrl-C was received, at which point the server
    /// has already been closed via `close`.
    #[cfg(any(feature = "noq", feature = "quinn", feature = "quiche", feature = "iroh"))]
    pub async fn accept(&mut self) -> Option<Request> {
        loop {
            // The `select!` arms below are always present, so every transport needs
            // a future regardless of features. A transport that is compiled out, or
            // compiled in but not configured, resolves to `None` immediately; the
            // `Some(..)` pattern then fails and that arm is disabled for this
            // iteration.
            #[cfg(feature = "noq")]
            let noq_accept = async {
                #[cfg(feature = "noq")]
                if let Some(noq) = self.noq.as_mut() {
                    return noq.accept().await;
                }
                None
            };
            #[cfg(not(feature = "noq"))]
            let noq_accept = async { None::<()> };

            #[cfg(feature = "iroh")]
            let iroh_accept = async {
                #[cfg(feature = "iroh")]
                if let Some(endpoint) = self.iroh.as_mut() {
                    return endpoint.accept().await;
                }
                None
            };
            #[cfg(not(feature = "iroh"))]
            let iroh_accept = async { None::<()> };

            #[cfg(feature = "quinn")]
            let quinn_accept = async {
                #[cfg(feature = "quinn")]
                if let Some(quinn) = self.quinn.as_mut() {
                    return quinn.accept().await;
                }
                None
            };
            #[cfg(not(feature = "quinn"))]
            let quinn_accept = async { None::<()> };

            #[cfg(feature = "quiche")]
            let quiche_accept = async {
                #[cfg(feature = "quiche")]
                if let Some(quiche) = self.quiche.as_mut() {
                    return quiche.accept().await;
                }
                None
            };
            #[cfg(not(feature = "quiche"))]
            let quiche_accept = async { None::<()> };

            // Unlike the QUIC futures, a missing WebSocket listener never resolves
            // (it stays pending) instead of yielding `None`.
            #[cfg(feature = "websocket")]
            let ws_ref = self.websocket.as_ref();
            #[cfg(feature = "websocket")]
            let ws_accept = async {
                match ws_ref {
                    Some(ws) => ws.accept().await,
                    None => std::future::pending().await,
                }
            };
            #[cfg(not(feature = "websocket"))]
            let ws_accept = std::future::pending::<Option<anyhow::Result<()>>>();

            // Cloned up front so the handshake futures can own them (`async move`).
            let server = self.moq.clone();
            let versions = self.versions.clone();

            // `_conn` / `_res` carry a leading underscore because they are unused
            // when the corresponding feature is compiled out.
            tokio::select! {
                Some(_conn) = noq_accept => {
                    #[cfg(feature = "noq")]
                    {
                        let alpns = versions.alpns();
                        // Queue the handshake; its result is picked up by the
                        // `self.accept.next()` arm on a later iteration.
                        self.accept.push(async move {
                            let noq = super::noq::NoqRequest::accept(_conn, alpns).await?;
                            Ok(Request {
                                server,
                                kind: RequestKind::Noq(noq),
                            })
                        }.boxed());
                    }
                }
                Some(_conn) = quinn_accept => {
                    #[cfg(feature = "quinn")]
                    {
                        let alpns = versions.alpns();
                        self.accept.push(async move {
                            let quinn = super::quinn::QuinnRequest::accept(_conn, alpns).await?;
                            Ok(Request {
                                server,
                                kind: RequestKind::Quinn(Box::new(quinn)),
                            })
                        }.boxed());
                    }
                }
                Some(_conn) = quiche_accept => {
                    #[cfg(feature = "quiche")]
                    {
                        let alpns = versions.alpns();
                        self.accept.push(async move {
                            let quiche = super::quiche::QuicheRequest::accept(_conn, alpns).await?;
                            Ok(Request {
                                server,
                                kind: RequestKind::Quiche(quiche),
                            })
                        }.boxed());
                    }
                }
                Some(_conn) = iroh_accept => {
                    #[cfg(feature = "iroh")]
                    self.accept.push(async move {
                        let iroh = super::iroh::IrohRequest::accept(_conn).await?;
                        Ok(Request {
                            server,
                            kind: RequestKind::Iroh(iroh),
                        })
                    }.boxed());
                }
                Some(_res) = ws_accept => {
                    #[cfg(feature = "websocket")]
                    match _res {
                        // WebSocket sessions need no further handshake here, so
                        // they are returned directly instead of being queued.
                        Ok(session) => {
                            return Some(Request {
                                server,
                                kind: RequestKind::WebSocket(session),
                            });
                        }
                        Err(err) => tracing::debug!(%err, "failed to accept WebSocket session"),
                    }
                }
                // A queued handshake finished: hand out the request, or log the
                // failure and keep accepting.
                Some(res) = self.accept.next() => {
                    match res {
                        Ok(session) => return Some(session),
                        Err(err) => tracing::debug!(%err, "failed to accept session"),
                    }
                }
                // Ctrl-C: shut everything down and tell the caller to stop.
                _ = tokio::signal::ctrl_c() => {
                    self.close().await;
                    return None;
                }
            }
        }
    }
453
454 #[cfg(feature = "iroh")]
455 pub fn iroh_endpoint(&self) -> Option<&iroh::Endpoint> {
456 self.iroh.as_ref()
457 }
458
459 pub fn local_addr(&self) -> anyhow::Result<net::SocketAddr> {
460 #[cfg(feature = "noq")]
461 if let Some(noq) = self.noq.as_ref() {
462 return noq.local_addr();
463 }
464 #[cfg(feature = "quinn")]
465 if let Some(quinn) = self.quinn.as_ref() {
466 return quinn.local_addr();
467 }
468 #[cfg(feature = "quiche")]
469 if let Some(quiche) = self.quiche.as_ref() {
470 return quiche.local_addr();
471 }
472 unreachable!("no QUIC backend compiled");
473 }
474
475 #[cfg(feature = "websocket")]
476 pub fn websocket_local_addr(&self) -> Option<net::SocketAddr> {
477 self.websocket.as_ref().and_then(|ws| ws.local_addr().ok())
478 }
479
480 pub async fn close(&mut self) {
481 #[cfg(feature = "noq")]
482 if let Some(noq) = self.noq.as_mut() {
483 noq.close();
484 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
485 }
486 #[cfg(feature = "quinn")]
487 if let Some(quinn) = self.quinn.as_mut() {
488 quinn.close();
489 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
490 }
491 #[cfg(feature = "quiche")]
492 if let Some(quiche) = self.quiche.as_mut() {
493 quiche.close();
494 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
495 }
496 #[cfg(feature = "iroh")]
497 if let Some(iroh) = self.iroh.take() {
498 iroh.close().await;
499 }
500 #[cfg(feature = "websocket")]
501 {
502 let _ = self.websocket.take();
503 }
504 #[cfg(not(any(feature = "noq", feature = "quinn", feature = "quiche", feature = "iroh")))]
505 unreachable!("no QUIC backend compiled");
506 }
507}
508
/// Identity of the remote peer, as returned by [`Request::peer_identity`].
///
/// In this file only the quinn transport can produce one; every other
/// transport reports `None`.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct PeerIdentity {
    /// DNS name associated with the peer, when one is available.
    // NOTE(review): presumably taken from the client certificate presented
    // during mTLS — confirm in the quinn backend, which fills this in.
    pub dns_name: Option<String>,
}
520
/// The transport-specific half of a [`Request`]: one variant per compiled-in
/// transport. With no transport feature enabled the enum has no variants.
pub(crate) enum RequestKind {
    /// Pending request accepted by the noq backend.
    #[cfg(feature = "noq")]
    Noq(crate::noq::NoqRequest),
    /// Pending request accepted by the quinn backend (stored boxed).
    #[cfg(feature = "quinn")]
    Quinn(Box<crate::quinn::QuinnRequest>),
    /// Pending request accepted by the quiche backend.
    #[cfg(feature = "quiche")]
    Quiche(crate::quiche::QuicheRequest),
    /// Pending request accepted by the iroh endpoint.
    #[cfg(feature = "iroh")]
    Iroh(crate::iroh::IrohRequest),
    /// A WebSocket session as yielded by the WebSocket listener.
    #[cfg(feature = "websocket")]
    WebSocket(qmux::Session),
}
534
/// An incoming session request produced by [`Server::accept`].
///
/// The caller inspects it (`transport`, `url`, `peer_identity`) and then either
/// accepts it with `ok` or rejects it with `close`; both consume the request.
pub struct Request {
    /// Clone of the server's `moq_lite::Server` taken when the connection was
    /// accepted; `with_publish` / `with_consume` modify only this copy.
    server: moq_lite::Server,
    /// Transport-specific state of the pending request.
    kind: RequestKind,
}
543
544impl Request {
545 pub async fn close(self, _code: u16) -> anyhow::Result<()> {
547 match self.kind {
548 #[cfg(feature = "noq")]
549 RequestKind::Noq(request) => {
550 let status = web_transport_noq::http::StatusCode::from_u16(_code).context("invalid status code")?;
551 request.close(status).await?;
552 Ok(())
553 }
554 #[cfg(feature = "quinn")]
555 RequestKind::Quinn(request) => {
556 let status = web_transport_quinn::http::StatusCode::from_u16(_code).context("invalid status code")?;
557 request.close(status).await?;
558 Ok(())
559 }
560 #[cfg(feature = "quiche")]
561 RequestKind::Quiche(request) => {
562 let status = web_transport_quiche::http::StatusCode::from_u16(_code).context("invalid status code")?;
563 request
564 .reject(status)
565 .await
566 .map_err(|e| anyhow::anyhow!("failed to close quiche WebTransport request: {e}"))?;
567 Ok(())
568 }
569 #[cfg(feature = "iroh")]
570 RequestKind::Iroh(request) => {
571 let status = web_transport_iroh::http::StatusCode::from_u16(_code).context("invalid status code")?;
572 request.close(status).await?;
573 Ok(())
574 }
575 #[cfg(feature = "websocket")]
576 RequestKind::WebSocket(_session) => {
577 Ok(())
579 }
580 }
581 }
582
583 pub fn with_publish(mut self, publish: impl Into<Option<moq_lite::OriginConsumer>>) -> Self {
585 self.server = self.server.with_publish(publish);
586 self
587 }
588
589 pub fn with_consume(mut self, consume: impl Into<Option<moq_lite::OriginProducer>>) -> Self {
591 self.server = self.server.with_consume(consume);
592 self
593 }
594
595 pub async fn ok(self) -> anyhow::Result<Session> {
597 match self.kind {
598 #[cfg(feature = "noq")]
599 RequestKind::Noq(request) => Ok(self.server.accept(request.ok().await?).await?),
600 #[cfg(feature = "quinn")]
601 RequestKind::Quinn(request) => Ok(self.server.accept(request.ok().await?).await?),
602 #[cfg(feature = "quiche")]
603 RequestKind::Quiche(request) => {
604 let conn = request
605 .ok()
606 .await
607 .map_err(|e| anyhow::anyhow!("failed to accept quiche WebTransport: {e}"))?;
608 Ok(self.server.accept(conn).await?)
609 }
610 #[cfg(feature = "iroh")]
611 RequestKind::Iroh(request) => Ok(self.server.accept(request.ok().await?).await?),
612 #[cfg(feature = "websocket")]
613 RequestKind::WebSocket(session) => Ok(self.server.accept(session).await?),
614 }
615 }
616
    /// Short, fixed label for the transport this request arrived on.
    ///
    /// All three QUIC backends (noq, quinn, quiche) report `"quic"`; iroh
    /// reports `"iroh"` and WebSocket reports `"websocket"`.
    pub fn transport(&self) -> &'static str {
        // Matching on the place `self.kind` (not a reference) keeps the zero-arm
        // form valid when no transport feature is enabled.
        match self.kind {
            #[cfg(feature = "noq")]
            RequestKind::Noq(_) => "quic",
            #[cfg(feature = "quinn")]
            RequestKind::Quinn(_) => "quic",
            #[cfg(feature = "quiche")]
            RequestKind::Quiche(_) => "quic",
            #[cfg(feature = "iroh")]
            RequestKind::Iroh(_) => "iroh",
            #[cfg(feature = "websocket")]
            RequestKind::WebSocket(_) => "websocket",
        }
    }
632
633 pub fn url(&self) -> Option<&Url> {
635 #[cfg(not(any(feature = "noq", feature = "quinn", feature = "quiche", feature = "iroh")))]
636 unreachable!("no QUIC backend compiled; enable noq, quinn, quiche, or iroh feature");
637
638 match self.kind {
639 #[cfg(feature = "noq")]
640 RequestKind::Noq(ref request) => request.url(),
641 #[cfg(feature = "quinn")]
642 RequestKind::Quinn(ref request) => request.url(),
643 #[cfg(feature = "quiche")]
644 RequestKind::Quiche(ref request) => request.url(),
645 #[cfg(feature = "iroh")]
646 RequestKind::Iroh(ref request) => request.url(),
647 #[cfg(feature = "websocket")]
648 RequestKind::WebSocket(_) => None,
649 }
650 }
651
652 pub fn peer_identity(&self) -> anyhow::Result<Option<PeerIdentity>> {
658 match self.kind {
659 #[cfg(feature = "quinn")]
660 RequestKind::Quinn(ref request) => request.peer_identity(),
661 #[cfg(feature = "noq")]
662 RequestKind::Noq(_) => Ok(None),
663 #[cfg(feature = "quiche")]
664 RequestKind::Quiche(_) => Ok(None),
665 #[cfg(feature = "iroh")]
666 RequestKind::Iroh(_) => Ok(None),
667 #[cfg(feature = "websocket")]
668 RequestKind::WebSocket(_) => Ok(None),
669 #[cfg(not(any(
670 feature = "noq",
671 feature = "quinn",
672 feature = "quiche",
673 feature = "iroh",
674 feature = "websocket"
675 )))]
676 _ => Ok(None),
677 }
678 }
679}
680
/// TLS state exposed by the active QUIC backend; handed out behind
/// `Arc<RwLock<..>>` by [`Server::tls_info`].
#[derive(Debug)]
pub struct ServerTlsInfo {
    /// Certified keys held by the backend; only present for the rustls-based
    /// backends (noq, quinn).
    #[cfg(any(feature = "noq", feature = "quinn"))]
    pub(crate) certs: Vec<Arc<rustls::sign::CertifiedKey>>,
    /// Certificate fingerprints as strings.
    // NOTE(review): hash algorithm and text encoding are not visible in this
    // file — confirm where the backends populate this list.
    pub fingerprints: Vec<String>,
}
688
/// Opaque server identifier stored as raw bytes (type of
/// `ServerConfig::quic_lb_id`).
///
/// Serialized and deserialized as a hex string through `serde_with`, and
/// parsed from hex text via its `FromStr` implementation.
#[serde_with::serde_as]
#[derive(Clone, serde::Serialize, serde::Deserialize)]
pub struct ServerId(#[serde_as(as = "serde_with::hex::Hex")] pub(crate) Vec<u8>);
693
694impl ServerId {
695 #[allow(dead_code)]
696 pub(crate) fn len(&self) -> usize {
697 self.0.len()
698 }
699}
700
701impl std::fmt::Debug for ServerId {
702 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
703 f.debug_tuple("QuicLbServerId").field(&hex::encode(&self.0)).finish()
704 }
705}
706
707impl std::str::FromStr for ServerId {
708 type Err = hex::FromHexError;
709
710 fn from_str(s: &str) -> Result<Self, Self::Err> {
711 hex::decode(s).map(Self)
712 }
713}