1use std::net;
2use std::path::PathBuf;
3
4use crate::QuicBackend;
5use moq_net::Session;
6use std::sync::{Arc, RwLock};
7use url::Url;
8#[cfg(feature = "iroh")]
9use web_transport_iroh::iroh;
10
11use anyhow::Context;
12
13use futures::FutureExt;
14use futures::future::BoxFuture;
15use futures::stream::FuturesUnordered;
16use futures::stream::StreamExt;
17
18#[serde_with::serde_as]
25#[derive(clap::Args, Clone, Default, Debug, serde::Serialize, serde::Deserialize)]
26#[serde(deny_unknown_fields)]
27#[non_exhaustive]
28pub struct ServerTlsConfig {
29 #[arg(long = "tls-cert", id = "tls-cert", env = "MOQ_SERVER_TLS_CERT")]
31 #[serde(default, skip_serializing_if = "Vec::is_empty")]
32 #[serde_as(as = "serde_with::OneOrMany<_>")]
33 pub cert: Vec<PathBuf>,
34
35 #[arg(long = "tls-key", id = "tls-key", env = "MOQ_SERVER_TLS_KEY")]
37 #[serde(default, skip_serializing_if = "Vec::is_empty")]
38 #[serde_as(as = "serde_with::OneOrMany<_>")]
39 pub key: Vec<PathBuf>,
40
41 #[arg(
44 long = "tls-generate",
45 id = "tls-generate",
46 value_delimiter = ',',
47 env = "MOQ_SERVER_TLS_GENERATE"
48 )]
49 #[serde(default, skip_serializing_if = "Vec::is_empty")]
50 #[serde_as(as = "serde_with::OneOrMany<_>")]
51 pub generate: Vec<String>,
52
53 #[arg(
62 long = "server-tls-root",
63 id = "server-tls-root",
64 value_delimiter = ',',
65 env = "MOQ_SERVER_TLS_ROOT"
66 )]
67 #[serde(default, skip_serializing_if = "Vec::is_empty")]
68 #[serde_as(as = "serde_with::OneOrMany<_>")]
69 pub root: Vec<PathBuf>,
70}
71
72impl ServerTlsConfig {
73 pub fn load_roots(&self) -> anyhow::Result<rustls::RootCertStore> {
75 use rustls::pki_types::CertificateDer;
76
77 let mut roots = rustls::RootCertStore::empty();
78 for path in &self.root {
79 let file = std::fs::File::open(path).context("failed to open root CA")?;
80 let mut reader = std::io::BufReader::new(file);
81 let certs: Vec<CertificateDer<'static>> = rustls_pemfile::certs(&mut reader)
82 .collect::<Result<_, _>>()
83 .context("failed to parse root CA PEM")?;
84 anyhow::ensure!(!certs.is_empty(), "no certificates found in root CA");
85 for cert in certs {
86 roots.add(cert).context("failed to add root CA")?;
87 }
88 }
89 Ok(roots)
90 }
91}
92
93#[derive(clap::Args, Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
95#[serde(deny_unknown_fields, default)]
96#[non_exhaustive]
97pub struct ServerConfig {
98 #[serde(alias = "listen")]
106 #[arg(id = "server-bind", long = "server-bind", alias = "listen", env = "MOQ_SERVER_BIND")]
107 pub bind: Option<String>,
108
109 #[arg(id = "server-backend", long = "server-backend", env = "MOQ_SERVER_BACKEND")]
112 pub backend: Option<QuicBackend>,
113
114 #[arg(id = "server-quic-lb-id", long = "server-quic-lb-id", env = "MOQ_SERVER_QUIC_LB_ID")]
117 #[serde(default, skip_serializing_if = "Option::is_none")]
118 pub quic_lb_id: Option<ServerId>,
119
120 #[arg(
123 id = "server-quic-lb-nonce",
124 long = "server-quic-lb-nonce",
125 requires = "server-quic-lb-id",
126 env = "MOQ_SERVER_QUIC_LB_NONCE"
127 )]
128 #[serde(default, skip_serializing_if = "Option::is_none")]
129 pub quic_lb_nonce: Option<usize>,
130
131 #[serde(skip_serializing_if = "Option::is_none")]
133 #[arg(
134 id = "server-max-streams",
135 long = "server-max-streams",
136 env = "MOQ_SERVER_MAX_STREAMS"
137 )]
138 pub max_streams: Option<u64>,
139
140 #[serde(default, skip_serializing_if = "Vec::is_empty")]
148 #[arg(id = "server-version", long = "server-version", env = "MOQ_SERVER_VERSION")]
149 pub version: Vec<moq_net::Version>,
150
151 #[command(flatten)]
152 #[serde(default)]
153 pub tls: ServerTlsConfig,
154}
155
156impl ServerConfig {
157 pub fn init(self) -> anyhow::Result<Server> {
158 Server::new(self)
159 }
160
161 pub fn versions(&self) -> moq_net::Versions {
163 if self.version.is_empty() {
164 moq_net::Versions::all()
165 } else {
166 moq_net::Versions::from(self.version.clone())
167 }
168 }
169}
170
171pub(crate) const DEFAULT_BIND: &str = "[::]:443";
173
174pub struct Server {
178 moq: moq_net::Server,
179 versions: moq_net::Versions,
180 accept: FuturesUnordered<BoxFuture<'static, anyhow::Result<Request>>>,
181 #[cfg(feature = "iroh")]
182 iroh: Option<iroh::Endpoint>,
183 #[cfg(feature = "noq")]
184 noq: Option<crate::noq::NoqServer>,
185 #[cfg(feature = "quinn")]
186 quinn: Option<crate::quinn::QuinnServer>,
187 #[cfg(feature = "quiche")]
188 quiche: Option<crate::quiche::QuicheServer>,
189 #[cfg(feature = "websocket")]
190 websocket: Option<crate::websocket::WebSocketListener>,
191}
192
193impl Server {
194 pub fn new(config: ServerConfig) -> anyhow::Result<Self> {
195 let backend = config.backend.clone().unwrap_or({
196 #[cfg(feature = "quinn")]
197 {
198 QuicBackend::Quinn
199 }
200 #[cfg(all(feature = "noq", not(feature = "quinn")))]
201 {
202 QuicBackend::Noq
203 }
204 #[cfg(all(feature = "quiche", not(feature = "quinn"), not(feature = "noq")))]
205 {
206 QuicBackend::Quiche
207 }
208 #[cfg(all(not(feature = "quiche"), not(feature = "quinn"), not(feature = "noq")))]
209 panic!("no QUIC backend compiled; enable noq, quinn, or quiche feature");
210 });
211
212 let versions = config.versions();
213
214 if !config.tls.root.is_empty() {
215 #[cfg(feature = "quinn")]
216 let quinn_backend = matches!(backend, QuicBackend::Quinn);
217 #[cfg(not(feature = "quinn"))]
218 let quinn_backend = false;
219 anyhow::ensure!(quinn_backend, "tls.root (mTLS) is only supported by the quinn backend");
220 }
221
222 #[cfg(feature = "noq")]
223 #[allow(unreachable_patterns)]
224 let noq = match backend {
225 QuicBackend::Noq => Some(crate::noq::NoqServer::new(config.clone())?),
226 _ => None,
227 };
228
229 #[cfg(feature = "quinn")]
230 #[allow(unreachable_patterns)]
231 let quinn = match backend {
232 QuicBackend::Quinn => Some(crate::quinn::QuinnServer::new(config.clone())?),
233 _ => None,
234 };
235
236 #[cfg(feature = "quiche")]
237 let quiche = match backend {
238 QuicBackend::Quiche => Some(crate::quiche::QuicheServer::new(config)?),
239 _ => None,
240 };
241
242 Ok(Server {
243 accept: Default::default(),
244 moq: moq_net::Server::new().with_versions(versions.clone()),
245 versions,
246 #[cfg(feature = "iroh")]
247 iroh: None,
248 #[cfg(feature = "noq")]
249 noq,
250 #[cfg(feature = "quinn")]
251 quinn,
252 #[cfg(feature = "quiche")]
253 quiche,
254 #[cfg(feature = "websocket")]
255 websocket: None,
256 })
257 }
258
259 #[cfg(feature = "websocket")]
265 pub fn with_websocket(mut self, websocket: Option<crate::websocket::WebSocketListener>) -> Self {
266 self.websocket = websocket;
267 self
268 }
269
270 #[cfg(feature = "iroh")]
271 pub fn with_iroh(mut self, iroh: Option<iroh::Endpoint>) -> Self {
272 self.iroh = iroh;
273 self
274 }
275
276 pub fn with_publish(mut self, publish: impl Into<Option<moq_net::OriginConsumer>>) -> Self {
277 self.moq = self.moq.with_publish(publish);
278 self
279 }
280
281 pub fn with_consume(mut self, consume: impl Into<Option<moq_net::OriginProducer>>) -> Self {
282 self.moq = self.moq.with_consume(consume);
283 self
284 }
285
286 pub fn with_stats(mut self, stats: moq_net::StatsHandle) -> Self {
288 self.moq = self.moq.with_stats(stats);
289 self
290 }
291
292 pub fn tls_info(&self) -> Arc<RwLock<ServerTlsInfo>> {
294 #[cfg(feature = "noq")]
295 if let Some(noq) = self.noq.as_ref() {
296 return noq.tls_info();
297 }
298 #[cfg(feature = "quinn")]
299 if let Some(quinn) = self.quinn.as_ref() {
300 return quinn.tls_info();
301 }
302 #[cfg(feature = "quiche")]
303 if let Some(quiche) = self.quiche.as_ref() {
304 return quiche.tls_info();
305 }
306 unreachable!("no QUIC backend compiled");
307 }
308
309 #[cfg(not(any(feature = "noq", feature = "quinn", feature = "quiche", feature = "iroh")))]
310 pub async fn accept(&mut self) -> Option<Request> {
311 unreachable!("no QUIC backend compiled; enable noq, quinn, quiche, or iroh feature");
312 }
313
314 #[cfg(any(feature = "noq", feature = "quinn", feature = "quiche", feature = "iroh"))]
322 pub async fn accept(&mut self) -> Option<Request> {
323 loop {
324 #[cfg(feature = "noq")]
326 let noq_accept = async {
327 #[cfg(feature = "noq")]
328 if let Some(noq) = self.noq.as_mut() {
329 return noq.accept().await;
330 }
331 None
332 };
333 #[cfg(not(feature = "noq"))]
334 let noq_accept = async { None::<()> };
335
336 #[cfg(feature = "iroh")]
337 let iroh_accept = async {
338 #[cfg(feature = "iroh")]
339 if let Some(endpoint) = self.iroh.as_mut() {
340 return endpoint.accept().await;
341 }
342 None
343 };
344 #[cfg(not(feature = "iroh"))]
345 let iroh_accept = async { None::<()> };
346
347 #[cfg(feature = "quinn")]
348 let quinn_accept = async {
349 #[cfg(feature = "quinn")]
350 if let Some(quinn) = self.quinn.as_mut() {
351 return quinn.accept().await;
352 }
353 None
354 };
355 #[cfg(not(feature = "quinn"))]
356 let quinn_accept = async { None::<()> };
357
358 #[cfg(feature = "quiche")]
359 let quiche_accept = async {
360 #[cfg(feature = "quiche")]
361 if let Some(quiche) = self.quiche.as_mut() {
362 return quiche.accept().await;
363 }
364 None
365 };
366 #[cfg(not(feature = "quiche"))]
367 let quiche_accept = async { None::<()> };
368
369 #[cfg(feature = "websocket")]
370 let ws_ref = self.websocket.as_ref();
371 #[cfg(feature = "websocket")]
372 let ws_accept = async {
373 match ws_ref {
374 Some(ws) => ws.accept().await,
375 None => std::future::pending().await,
376 }
377 };
378 #[cfg(not(feature = "websocket"))]
379 let ws_accept = std::future::pending::<Option<anyhow::Result<()>>>();
380
381 let server = self.moq.clone();
382 let versions = self.versions.clone();
383
384 tokio::select! {
385 Some(_conn) = noq_accept => {
386 #[cfg(feature = "noq")]
387 {
388 let alpns = versions.alpns();
389 self.accept.push(async move {
390 let noq = super::noq::NoqRequest::accept(_conn, alpns).await?;
391 Ok(Request {
392 server,
393 kind: RequestKind::Noq(noq),
394 })
395 }.boxed());
396 }
397 }
398 Some(_conn) = quinn_accept => {
399 #[cfg(feature = "quinn")]
400 {
401 let alpns = versions.alpns();
402 self.accept.push(async move {
403 let quinn = super::quinn::QuinnRequest::accept(_conn, alpns).await?;
404 Ok(Request {
405 server,
406 kind: RequestKind::Quinn(Box::new(quinn)),
407 })
408 }.boxed());
409 }
410 }
411 Some(_conn) = quiche_accept => {
412 #[cfg(feature = "quiche")]
413 {
414 let alpns = versions.alpns();
415 self.accept.push(async move {
416 let quiche = super::quiche::QuicheRequest::accept(_conn, alpns).await?;
417 Ok(Request {
418 server,
419 kind: RequestKind::Quiche(quiche),
420 })
421 }.boxed());
422 }
423 }
424 Some(_conn) = iroh_accept => {
425 #[cfg(feature = "iroh")]
426 self.accept.push(async move {
427 let iroh = super::iroh::IrohRequest::accept(_conn).await?;
428 Ok(Request {
429 server,
430 kind: RequestKind::Iroh(iroh),
431 })
432 }.boxed());
433 }
434 Some(_res) = ws_accept => {
435 #[cfg(feature = "websocket")]
436 match _res {
437 Ok(session) => {
438 return Some(Request {
439 server,
440 kind: RequestKind::WebSocket(session),
441 });
442 }
443 Err(err) => tracing::debug!(%err, "failed to accept WebSocket session"),
444 }
445 }
446 Some(res) = self.accept.next() => {
447 match res {
448 Ok(session) => return Some(session),
449 Err(err) => tracing::debug!(%err, "failed to accept session"),
450 }
451 }
452 _ = tokio::signal::ctrl_c() => {
453 self.close().await;
454 return None;
455 }
456 }
457 }
458 }
459
460 #[cfg(feature = "iroh")]
461 pub fn iroh_endpoint(&self) -> Option<&iroh::Endpoint> {
462 self.iroh.as_ref()
463 }
464
465 pub fn local_addr(&self) -> anyhow::Result<net::SocketAddr> {
466 #[cfg(feature = "noq")]
467 if let Some(noq) = self.noq.as_ref() {
468 return noq.local_addr();
469 }
470 #[cfg(feature = "quinn")]
471 if let Some(quinn) = self.quinn.as_ref() {
472 return quinn.local_addr();
473 }
474 #[cfg(feature = "quiche")]
475 if let Some(quiche) = self.quiche.as_ref() {
476 return quiche.local_addr();
477 }
478 unreachable!("no QUIC backend compiled");
479 }
480
481 #[cfg(feature = "websocket")]
482 pub fn websocket_local_addr(&self) -> Option<net::SocketAddr> {
483 self.websocket.as_ref().and_then(|ws| ws.local_addr().ok())
484 }
485
486 pub async fn close(&mut self) {
487 #[cfg(feature = "noq")]
488 if let Some(noq) = self.noq.as_mut() {
489 noq.close();
490 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
491 }
492 #[cfg(feature = "quinn")]
493 if let Some(quinn) = self.quinn.as_mut() {
494 quinn.close();
495 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
496 }
497 #[cfg(feature = "quiche")]
498 if let Some(quiche) = self.quiche.as_mut() {
499 quiche.close();
500 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
501 }
502 #[cfg(feature = "iroh")]
503 if let Some(iroh) = self.iroh.take() {
504 iroh.close().await;
505 }
506 #[cfg(feature = "websocket")]
507 {
508 let _ = self.websocket.take();
509 }
510 #[cfg(not(any(feature = "noq", feature = "quinn", feature = "quiche", feature = "iroh")))]
511 unreachable!("no QUIC backend compiled");
512 }
513}
514
515#[derive(Clone, Debug, Default)]
518#[non_exhaustive]
519pub struct PeerIdentity {}
520
521pub(crate) enum RequestKind {
523 #[cfg(feature = "noq")]
524 Noq(crate::noq::NoqRequest),
525 #[cfg(feature = "quinn")]
526 Quinn(Box<crate::quinn::QuinnRequest>),
527 #[cfg(feature = "quiche")]
528 Quiche(crate::quiche::QuicheRequest),
529 #[cfg(feature = "iroh")]
530 Iroh(crate::iroh::IrohRequest),
531 #[cfg(feature = "websocket")]
532 WebSocket(qmux::Session),
533}
534
535pub struct Request {
540 server: moq_net::Server,
541 kind: RequestKind,
542}
543
544impl Request {
545 pub async fn close(self, _code: u16) -> anyhow::Result<()> {
547 match self.kind {
548 #[cfg(feature = "noq")]
549 RequestKind::Noq(request) => {
550 let status = web_transport_noq::http::StatusCode::from_u16(_code).context("invalid status code")?;
551 request.close(status).await?;
552 Ok(())
553 }
554 #[cfg(feature = "quinn")]
555 RequestKind::Quinn(request) => {
556 let status = web_transport_quinn::http::StatusCode::from_u16(_code).context("invalid status code")?;
557 request.close(status).await?;
558 Ok(())
559 }
560 #[cfg(feature = "quiche")]
561 RequestKind::Quiche(request) => {
562 let status = web_transport_quiche::http::StatusCode::from_u16(_code).context("invalid status code")?;
563 request
564 .reject(status)
565 .await
566 .map_err(|e| anyhow::anyhow!("failed to close quiche WebTransport request: {e}"))?;
567 Ok(())
568 }
569 #[cfg(feature = "iroh")]
570 RequestKind::Iroh(request) => {
571 let status = web_transport_iroh::http::StatusCode::from_u16(_code).context("invalid status code")?;
572 request.close(status).await?;
573 Ok(())
574 }
575 #[cfg(feature = "websocket")]
576 RequestKind::WebSocket(_session) => {
577 Ok(())
579 }
580 }
581 }
582
583 pub fn with_publish(mut self, publish: impl Into<Option<moq_net::OriginConsumer>>) -> Self {
585 self.server = self.server.with_publish(publish);
586 self
587 }
588
589 pub fn with_consume(mut self, consume: impl Into<Option<moq_net::OriginProducer>>) -> Self {
591 self.server = self.server.with_consume(consume);
592 self
593 }
594
595 pub fn with_stats(mut self, stats: moq_net::StatsHandle) -> Self {
597 self.server = self.server.with_stats(stats);
598 self
599 }
600
601 pub async fn ok(self) -> anyhow::Result<Session> {
603 match self.kind {
604 #[cfg(feature = "noq")]
605 RequestKind::Noq(request) => Ok(self.server.accept(request.ok().await?).await?),
606 #[cfg(feature = "quinn")]
607 RequestKind::Quinn(request) => Ok(self.server.accept(request.ok().await?).await?),
608 #[cfg(feature = "quiche")]
609 RequestKind::Quiche(request) => {
610 let conn = request
611 .ok()
612 .await
613 .map_err(|e| anyhow::anyhow!("failed to accept quiche WebTransport: {e}"))?;
614 Ok(self.server.accept(conn).await?)
615 }
616 #[cfg(feature = "iroh")]
617 RequestKind::Iroh(request) => Ok(self.server.accept(request.ok().await?).await?),
618 #[cfg(feature = "websocket")]
619 RequestKind::WebSocket(session) => Ok(self.server.accept(session).await?),
620 }
621 }
622
623 pub fn transport(&self) -> &'static str {
625 match self.kind {
626 #[cfg(feature = "noq")]
627 RequestKind::Noq(_) => "quic",
628 #[cfg(feature = "quinn")]
629 RequestKind::Quinn(_) => "quic",
630 #[cfg(feature = "quiche")]
631 RequestKind::Quiche(_) => "quic",
632 #[cfg(feature = "iroh")]
633 RequestKind::Iroh(_) => "iroh",
634 #[cfg(feature = "websocket")]
635 RequestKind::WebSocket(_) => "websocket",
636 }
637 }
638
639 pub fn url(&self) -> Option<&Url> {
641 #[cfg(not(any(feature = "noq", feature = "quinn", feature = "quiche", feature = "iroh")))]
642 unreachable!("no QUIC backend compiled; enable noq, quinn, quiche, or iroh feature");
643
644 match self.kind {
645 #[cfg(feature = "noq")]
646 RequestKind::Noq(ref request) => request.url(),
647 #[cfg(feature = "quinn")]
648 RequestKind::Quinn(ref request) => request.url(),
649 #[cfg(feature = "quiche")]
650 RequestKind::Quiche(ref request) => request.url(),
651 #[cfg(feature = "iroh")]
652 RequestKind::Iroh(ref request) => request.url(),
653 #[cfg(feature = "websocket")]
654 RequestKind::WebSocket(_) => None,
655 }
656 }
657
658 pub fn peer_identity(&self) -> anyhow::Result<Option<PeerIdentity>> {
664 match self.kind {
665 #[cfg(feature = "quinn")]
666 RequestKind::Quinn(ref request) => request.peer_identity(),
667 #[cfg(feature = "noq")]
668 RequestKind::Noq(_) => Ok(None),
669 #[cfg(feature = "quiche")]
670 RequestKind::Quiche(_) => Ok(None),
671 #[cfg(feature = "iroh")]
672 RequestKind::Iroh(_) => Ok(None),
673 #[cfg(feature = "websocket")]
674 RequestKind::WebSocket(_) => Ok(None),
675 #[cfg(not(any(
676 feature = "noq",
677 feature = "quinn",
678 feature = "quiche",
679 feature = "iroh",
680 feature = "websocket"
681 )))]
682 _ => Ok(None),
683 }
684 }
685}
686
687#[derive(Debug)]
689pub struct ServerTlsInfo {
690 #[cfg(any(feature = "noq", feature = "quinn"))]
691 pub(crate) certs: Vec<Arc<rustls::sign::CertifiedKey>>,
692 pub fingerprints: Vec<String>,
693}
694
695#[serde_with::serde_as]
697#[derive(Clone, serde::Serialize, serde::Deserialize)]
698pub struct ServerId(#[serde_as(as = "serde_with::hex::Hex")] pub(crate) Vec<u8>);
699
700impl ServerId {
701 #[allow(dead_code)]
702 pub(crate) fn len(&self) -> usize {
703 self.0.len()
704 }
705}
706
707impl std::fmt::Debug for ServerId {
708 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
709 f.debug_tuple("QuicLbServerId").field(&hex::encode(&self.0)).finish()
710 }
711}
712
713impl std::str::FromStr for ServerId {
714 type Err = hex::FromHexError;
715
716 fn from_str(s: &str) -> Result<Self, Self::Err> {
717 hex::decode(s).map(Self)
718 }
719}
720
721#[cfg(test)]
722mod tests {
723 use super::*;
724
725 #[test]
726 fn test_tls_string_or_array() {
727 let single = r#"
729 cert = "cert.pem"
730 key = "key.pem"
731 "#;
732 let config: ServerTlsConfig = toml::from_str(single).unwrap();
733 assert_eq!(config.cert, vec![PathBuf::from("cert.pem")]);
734 assert_eq!(config.key, vec![PathBuf::from("key.pem")]);
735
736 let array = r#"
738 cert = ["a.pem", "b.pem"]
739 key = ["a.key", "b.key"]
740 generate = ["localhost"]
741 root = ["ca.pem"]
742 "#;
743 let config: ServerTlsConfig = toml::from_str(array).unwrap();
744 assert_eq!(config.cert, vec![PathBuf::from("a.pem"), PathBuf::from("b.pem")]);
745 assert_eq!(config.key, vec![PathBuf::from("a.key"), PathBuf::from("b.key")]);
746 assert_eq!(config.generate, vec!["localhost".to_string()]);
747 assert_eq!(config.root, vec![PathBuf::from("ca.pem")]);
748 }
749}