1use std::net;
2use std::path::PathBuf;
3
4use crate::QuicBackend;
5use moq_net::Session;
6use std::sync::{Arc, RwLock};
7use url::Url;
8#[cfg(feature = "iroh")]
9use web_transport_iroh::iroh;
10
11use anyhow::Context;
12
13use futures::FutureExt;
14use futures::future::BoxFuture;
15use futures::stream::FuturesUnordered;
16use futures::stream::StreamExt;
17
// TLS settings for the server, usable as CLI flags / environment variables
// (clap) and as a serde config section.
//
// NOTE(review): plain `//` comments are used on purpose. clap's derive turns
// `///` doc comments on `Args` items into `--help` text, so adding them would
// change CLI output.
//
// Every list field accepts either a single value or an array when
// deserialized (`serde_with::OneOrMany`), defaults to empty, and is omitted
// from serialized output when empty. Unknown keys are rejected
// (`deny_unknown_fields`).
#[serde_with::serde_as]
#[derive(clap::Args, Clone, Default, Debug, serde::Serialize, serde::Deserialize)]
#[serde(deny_unknown_fields)]
#[non_exhaustive]
pub struct ServerTlsConfig {
    // Certificate file path(s).
    // CLI: `--tls-cert`, env: `MOQ_SERVER_TLS_CERT`.
    // NOTE(review): presumably paired positionally with `key`; confirm against
    // the backend that loads them (not visible here).
    #[arg(long = "tls-cert", id = "tls-cert", env = "MOQ_SERVER_TLS_CERT")]
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    #[serde_as(as = "serde_with::OneOrMany<_>")]
    pub cert: Vec<PathBuf>,

    // Private key file path(s).
    // CLI: `--tls-key`, env: `MOQ_SERVER_TLS_KEY`.
    #[arg(long = "tls-key", id = "tls-key", env = "MOQ_SERVER_TLS_KEY")]
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    #[serde_as(as = "serde_with::OneOrMany<_>")]
    pub key: Vec<PathBuf>,

    // Comma-separated list of strings.
    // CLI: `--tls-generate`, env: `MOQ_SERVER_TLS_GENERATE`.
    // NOTE(review): presumably hostnames to generate a certificate for;
    // the consumer of this field is not visible here, so confirm.
    #[arg(
        long = "tls-generate",
        id = "tls-generate",
        value_delimiter = ',',
        env = "MOQ_SERVER_TLS_GENERATE"
    )]
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    #[serde_as(as = "serde_with::OneOrMany<_>")]
    pub generate: Vec<String>,

    // Comma-separated list of PEM root CA files, read by `load_roots`.
    // CLI: `--server-tls-root`, env: `MOQ_SERVER_TLS_ROOT`.
    // `Server::new` rejects a non-empty list unless the quinn backend is
    // selected (its error message labels this feature "mTLS").
    #[arg(
        long = "server-tls-root",
        id = "server-tls-root",
        value_delimiter = ',',
        env = "MOQ_SERVER_TLS_ROOT"
    )]
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    #[serde_as(as = "serde_with::OneOrMany<_>")]
    pub root: Vec<PathBuf>,
}
71
72impl ServerTlsConfig {
73 pub fn load_roots(&self) -> anyhow::Result<rustls::RootCertStore> {
75 use rustls::pki_types::CertificateDer;
76 use rustls::pki_types::pem::PemObject;
77
78 let mut roots = rustls::RootCertStore::empty();
79 for path in &self.root {
80 let file = std::fs::File::open(path).context("failed to open root CA")?;
81 let mut reader = std::io::BufReader::new(file);
82 let certs: Vec<CertificateDer<'static>> = CertificateDer::pem_reader_iter(&mut reader)
83 .collect::<Result<_, _>>()
84 .context("failed to parse root CA PEM")?;
85 anyhow::ensure!(!certs.is_empty(), "no certificates found in root CA");
86 for cert in certs {
87 roots.add(cert).context("failed to add root CA")?;
88 }
89 }
90 Ok(roots)
91 }
92}
93
// Server settings, usable as CLI flags / environment variables (clap) and as
// a serde config section.
//
// NOTE(review): plain `//` comments are used on purpose. clap's derive turns
// `///` doc comments on `Args` items into `--help` text, so adding them would
// change CLI output.
//
// Missing keys fall back to `Default` (container-level `serde(default)`);
// unknown keys are rejected (`deny_unknown_fields`).
#[derive(clap::Args, Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
#[serde(deny_unknown_fields, default)]
#[non_exhaustive]
pub struct ServerConfig {
    // Address to bind, as a string.
    // CLI: `--server-bind` (alias `--listen`), env: `MOQ_SERVER_BIND`;
    // config key `bind` (alias `listen`).
    // NOTE(review): presumably falls back to `DEFAULT_BIND` when unset; the
    // code that resolves it is not visible here.
    #[serde(alias = "listen")]
    #[arg(id = "server-bind", long = "server-bind", alias = "listen", env = "MOQ_SERVER_BIND")]
    pub bind: Option<String>,

    // QUIC backend to use.
    // CLI: `--server-backend`, env: `MOQ_SERVER_BACKEND`.
    // When unset, `Server::new` picks the first compiled-in backend in the
    // order quinn, noq, quiche.
    #[arg(id = "server-backend", long = "server-backend", env = "MOQ_SERVER_BACKEND")]
    pub backend: Option<QuicBackend>,

    // Server ID, given as a hex string (see `ServerId`).
    // CLI: `--server-quic-lb-id`, env: `MOQ_SERVER_QUIC_LB_ID`.
    // NOTE(review): presumably used for QUIC-LB connection ID encoding; confirm.
    #[arg(id = "server-quic-lb-id", long = "server-quic-lb-id", env = "MOQ_SERVER_QUIC_LB_ID")]
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub quic_lb_id: Option<ServerId>,

    // Nonce setting that accompanies `quic_lb_id`; on the CLI it may only be
    // given together with `--server-quic-lb-id` (`requires`).
    // CLI: `--server-quic-lb-nonce`, env: `MOQ_SERVER_QUIC_LB_NONCE`.
    // NOTE(review): unit (likely a length in bytes) is not visible here; confirm.
    #[arg(
        id = "server-quic-lb-nonce",
        long = "server-quic-lb-nonce",
        requires = "server-quic-lb-id",
        env = "MOQ_SERVER_QUIC_LB_NONCE"
    )]
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub quic_lb_nonce: Option<usize>,

    // Optional IPv4 socket address.
    // CLI: `--server-preferred-v4`, env: `MOQ_SERVER_PREFERRED_V4`.
    // NOTE(review): presumably the QUIC "preferred address" advertised to
    // clients; confirm against the backend.
    #[arg(
        id = "server-preferred-v4",
        long = "server-preferred-v4",
        env = "MOQ_SERVER_PREFERRED_V4"
    )]
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub preferred_v4: Option<net::SocketAddrV4>,

    // Optional IPv6 socket address; IPv6 counterpart of `preferred_v4`.
    // CLI: `--server-preferred-v6`, env: `MOQ_SERVER_PREFERRED_V6`.
    #[arg(
        id = "server-preferred-v6",
        long = "server-preferred-v6",
        env = "MOQ_SERVER_PREFERRED_V6"
    )]
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub preferred_v6: Option<net::SocketAddrV6>,

    // Optional stream limit.
    // CLI: `--server-max-streams`, env: `MOQ_SERVER_MAX_STREAMS`.
    // NOTE(review): exact meaning (per connection? concurrent?) is decided by
    // the backend and is not visible here.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[arg(
        id = "server-max-streams",
        long = "server-max-streams",
        env = "MOQ_SERVER_MAX_STREAMS"
    )]
    pub max_streams: Option<u64>,

    // Protocol versions to offer. An empty list means "all versions"
    // (see `ServerConfig::versions`).
    // CLI: `--server-version`, env: `MOQ_SERVER_VERSION`.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    #[arg(id = "server-version", long = "server-version", env = "MOQ_SERVER_VERSION")]
    pub version: Vec<moq_net::Version>,

    // TLS settings; flattened into the same CLI namespace.
    #[command(flatten)]
    #[serde(default)]
    pub tls: ServerTlsConfig,
}
182
183impl ServerConfig {
184 pub fn init(self) -> anyhow::Result<Server> {
185 Server::new(self)
186 }
187
188 pub fn versions(&self) -> moq_net::Versions {
190 if self.version.is_empty() {
191 moq_net::Versions::all()
192 } else {
193 moq_net::Versions::from(self.version.clone())
194 }
195 }
196}
197
/// Default bind address: the unspecified IPv6 address (`[::]`) on port 443.
///
/// NOTE(review): the code that falls back to this value lives outside this
/// file section; presumably it is used when `ServerConfig::bind` is unset.
pub(crate) const DEFAULT_BIND: &str = "[::]:443";
200
/// A server that accepts incoming sessions over whichever transports are
/// compiled in and active.
///
/// Built with [`Server::new`]; optional transports (iroh, WebSocket) are
/// attached with the `with_*` builders. Incoming connections are obtained
/// through [`Server::accept`].
pub struct Server {
    /// Protocol-level server; cloned into every [`Request`] handed out by
    /// `accept`, and updated by `with_publish` / `with_consume` / `with_stats`.
    moq: moq_net::Server,
    /// Versions offered by this server; `accept` derives the ALPN list from it.
    versions: moq_net::Versions,
    /// In-flight connection handshakes. Each future resolves to a ready
    /// [`Request`] or to the error that made the handshake fail.
    accept: FuturesUnordered<BoxFuture<'static, anyhow::Result<Request>>>,
    /// Optional iroh endpoint, set via `with_iroh`.
    #[cfg(feature = "iroh")]
    iroh: Option<iroh::Endpoint>,
    /// noq backend; `Some` only when noq is the selected QUIC backend.
    #[cfg(feature = "noq")]
    noq: Option<crate::noq::NoqServer>,
    /// quinn backend; `Some` only when quinn is the selected QUIC backend.
    #[cfg(feature = "quinn")]
    quinn: Option<crate::quinn::QuinnServer>,
    /// quiche backend; `Some` only when quiche is the selected QUIC backend.
    #[cfg(feature = "quiche")]
    quiche: Option<crate::quiche::QuicheServer>,
    /// Optional WebSocket listener, set via `with_websocket`.
    #[cfg(feature = "websocket")]
    websocket: Option<crate::websocket::WebSocketListener>,
}
219
220impl Server {
221 pub fn new(config: ServerConfig) -> anyhow::Result<Self> {
222 let backend = config.backend.clone().unwrap_or({
223 #[cfg(feature = "quinn")]
224 {
225 QuicBackend::Quinn
226 }
227 #[cfg(all(feature = "noq", not(feature = "quinn")))]
228 {
229 QuicBackend::Noq
230 }
231 #[cfg(all(feature = "quiche", not(feature = "quinn"), not(feature = "noq")))]
232 {
233 QuicBackend::Quiche
234 }
235 #[cfg(all(not(feature = "quiche"), not(feature = "quinn"), not(feature = "noq")))]
236 panic!("no QUIC backend compiled; enable noq, quinn, or quiche feature");
237 });
238
239 let versions = config.versions();
240
241 if !config.tls.root.is_empty() {
242 #[cfg(feature = "quinn")]
243 let quinn_backend = matches!(backend, QuicBackend::Quinn);
244 #[cfg(not(feature = "quinn"))]
245 let quinn_backend = false;
246 anyhow::ensure!(quinn_backend, "tls.root (mTLS) is only supported by the quinn backend");
247 }
248
249 #[cfg(feature = "noq")]
250 #[allow(unreachable_patterns)]
251 let noq = match backend {
252 QuicBackend::Noq => Some(crate::noq::NoqServer::new(config.clone())?),
253 _ => None,
254 };
255
256 #[cfg(feature = "quinn")]
257 #[allow(unreachable_patterns)]
258 let quinn = match backend {
259 QuicBackend::Quinn => Some(crate::quinn::QuinnServer::new(config.clone())?),
260 _ => None,
261 };
262
263 #[cfg(feature = "quiche")]
264 let quiche = match backend {
265 QuicBackend::Quiche => Some(crate::quiche::QuicheServer::new(config)?),
266 _ => None,
267 };
268
269 Ok(Server {
270 accept: Default::default(),
271 moq: moq_net::Server::new().with_versions(versions.clone()),
272 versions,
273 #[cfg(feature = "iroh")]
274 iroh: None,
275 #[cfg(feature = "noq")]
276 noq,
277 #[cfg(feature = "quinn")]
278 quinn,
279 #[cfg(feature = "quiche")]
280 quiche,
281 #[cfg(feature = "websocket")]
282 websocket: None,
283 })
284 }
285
286 #[cfg(feature = "websocket")]
292 pub fn with_websocket(mut self, websocket: Option<crate::websocket::WebSocketListener>) -> Self {
293 self.websocket = websocket;
294 self
295 }
296
297 #[cfg(feature = "iroh")]
298 pub fn with_iroh(mut self, iroh: Option<iroh::Endpoint>) -> Self {
299 self.iroh = iroh;
300 self
301 }
302
303 pub fn with_publish(mut self, publish: impl Into<Option<moq_net::OriginConsumer>>) -> Self {
304 self.moq = self.moq.with_publish(publish);
305 self
306 }
307
308 pub fn with_consume(mut self, consume: impl Into<Option<moq_net::OriginProducer>>) -> Self {
309 self.moq = self.moq.with_consume(consume);
310 self
311 }
312
313 pub fn with_stats(mut self, stats: moq_net::StatsHandle) -> Self {
315 self.moq = self.moq.with_stats(stats);
316 self
317 }
318
319 pub fn tls_info(&self) -> Arc<RwLock<ServerTlsInfo>> {
321 #[cfg(feature = "noq")]
322 if let Some(noq) = self.noq.as_ref() {
323 return noq.tls_info();
324 }
325 #[cfg(feature = "quinn")]
326 if let Some(quinn) = self.quinn.as_ref() {
327 return quinn.tls_info();
328 }
329 #[cfg(feature = "quiche")]
330 if let Some(quiche) = self.quiche.as_ref() {
331 return quiche.tls_info();
332 }
333 unreachable!("no QUIC backend compiled");
334 }
335
    /// Variant compiled when none of noq, quinn, quiche or iroh is enabled.
    ///
    /// # Panics
    ///
    /// Always panics: there is no transport to accept from.
    #[cfg(not(any(feature = "noq", feature = "quinn", feature = "quiche", feature = "iroh")))]
    pub async fn accept(&mut self) -> Option<Request> {
        unreachable!("no QUIC backend compiled; enable noq, quinn, quiche, or iroh feature");
    }

    /// Waits for the next incoming connection on any active transport and
    /// returns it as a [`Request`].
    ///
    /// QUIC and iroh connections are not returned immediately: their
    /// handshake is pushed onto `self.accept` and the request is returned
    /// once that future completes, so a slow handshake does not block new
    /// connections. WebSocket sessions are returned as soon as they are
    /// accepted.
    ///
    /// Failed handshakes and failed WebSocket accepts are logged at debug
    /// level and skipped.
    ///
    /// Returns `None` after Ctrl-C is received; the server is closed first.
    #[cfg(any(feature = "noq", feature = "quinn", feature = "quiche", feature = "iroh"))]
    pub async fn accept(&mut self) -> Option<Request> {
        loop {
            // One accept future per transport. A transport that is compiled
            // out gets a placeholder future yielding `None`; one that is
            // compiled in but not active on this server also yields `None`.
            // NOTE(review): per the `tokio::select!` docs a branch whose
            // pattern (`Some(..)`) does not match is disabled for that
            // `select!` call, which is what makes the `None` cases harmless.
            #[cfg(feature = "noq")]
            let noq_accept = async {
                #[cfg(feature = "noq")]
                if let Some(noq) = self.noq.as_mut() {
                    return noq.accept().await;
                }
                None
            };
            #[cfg(not(feature = "noq"))]
            let noq_accept = async { None::<()> };

            #[cfg(feature = "iroh")]
            let iroh_accept = async {
                #[cfg(feature = "iroh")]
                if let Some(endpoint) = self.iroh.as_mut() {
                    return endpoint.accept().await;
                }
                None
            };
            #[cfg(not(feature = "iroh"))]
            let iroh_accept = async { None::<()> };

            #[cfg(feature = "quinn")]
            let quinn_accept = async {
                #[cfg(feature = "quinn")]
                if let Some(quinn) = self.quinn.as_mut() {
                    return quinn.accept().await;
                }
                None
            };
            #[cfg(not(feature = "quinn"))]
            let quinn_accept = async { None::<()> };

            #[cfg(feature = "quiche")]
            let quiche_accept = async {
                #[cfg(feature = "quiche")]
                if let Some(quiche) = self.quiche.as_mut() {
                    return quiche.accept().await;
                }
                None
            };
            #[cfg(not(feature = "quiche"))]
            let quiche_accept = async { None::<()> };

            // Unlike the futures above, a missing WebSocket listener yields a
            // future that never completes instead of one that yields `None`.
            #[cfg(feature = "websocket")]
            let ws_ref = self.websocket.as_ref();
            #[cfg(feature = "websocket")]
            let ws_accept = async {
                match ws_ref {
                    Some(ws) => ws.accept().await,
                    None => std::future::pending().await,
                }
            };
            #[cfg(not(feature = "websocket"))]
            let ws_accept = std::future::pending::<Option<anyhow::Result<()>>>();

            // Cloned up front so the handshake futures can own them
            // (`async move`) without borrowing `self`.
            let server = self.moq.clone();
            let versions = self.versions.clone();

            tokio::select! {
                Some(_conn) = noq_accept => {
                    #[cfg(feature = "noq")]
                    {
                        let alpns = versions.alpns();
                        self.accept.push(async move {
                            let noq = super::noq::NoqRequest::accept(_conn, alpns).await?;
                            Ok(Request {
                                server,
                                kind: RequestKind::Noq(noq),
                            })
                        }.boxed());
                    }
                }
                Some(_conn) = quinn_accept => {
                    #[cfg(feature = "quinn")]
                    {
                        let alpns = versions.alpns();
                        self.accept.push(async move {
                            let quinn = super::quinn::QuinnRequest::accept(_conn, alpns).await?;
                            Ok(Request {
                                server,
                                kind: RequestKind::Quinn(Box::new(quinn)),
                            })
                        }.boxed());
                    }
                }
                Some(_conn) = quiche_accept => {
                    #[cfg(feature = "quiche")]
                    {
                        let alpns = versions.alpns();
                        self.accept.push(async move {
                            let quiche = super::quiche::QuicheRequest::accept(_conn, alpns).await?;
                            Ok(Request {
                                server,
                                kind: RequestKind::Quiche(quiche),
                            })
                        }.boxed());
                    }
                }
                Some(_conn) = iroh_accept => {
                    #[cfg(feature = "iroh")]
                    self.accept.push(async move {
                        let iroh = super::iroh::IrohRequest::accept(_conn).await?;
                        Ok(Request {
                            server,
                            kind: RequestKind::Iroh(iroh),
                        })
                    }.boxed());
                }
                // WebSocket sessions need no further handshake here, so a
                // successful accept is returned directly.
                Some(_res) = ws_accept => {
                    #[cfg(feature = "websocket")]
                    match _res {
                        Ok(session) => {
                            return Some(Request {
                                server,
                                kind: RequestKind::WebSocket(session),
                            });
                        }
                        Err(err) => tracing::debug!(%err, "failed to accept WebSocket session"),
                    }
                }
                // A previously queued handshake finished.
                Some(res) = self.accept.next() => {
                    match res {
                        Ok(session) => return Some(session),
                        Err(err) => tracing::debug!(%err, "failed to accept session"),
                    }
                }
                // Ctrl-C: shut every transport down and stop accepting.
                _ = tokio::signal::ctrl_c() => {
                    self.close().await;
                    return None;
                }
            }
        }
    }
486
487 #[cfg(feature = "iroh")]
488 pub fn iroh_endpoint(&self) -> Option<&iroh::Endpoint> {
489 self.iroh.as_ref()
490 }
491
492 pub fn local_addr(&self) -> anyhow::Result<net::SocketAddr> {
493 #[cfg(feature = "noq")]
494 if let Some(noq) = self.noq.as_ref() {
495 return noq.local_addr();
496 }
497 #[cfg(feature = "quinn")]
498 if let Some(quinn) = self.quinn.as_ref() {
499 return quinn.local_addr();
500 }
501 #[cfg(feature = "quiche")]
502 if let Some(quiche) = self.quiche.as_ref() {
503 return quiche.local_addr();
504 }
505 unreachable!("no QUIC backend compiled");
506 }
507
508 #[cfg(feature = "websocket")]
509 pub fn websocket_local_addr(&self) -> Option<net::SocketAddr> {
510 self.websocket.as_ref().and_then(|ws| ws.local_addr().ok())
511 }
512
513 pub async fn close(&mut self) {
514 #[cfg(feature = "noq")]
515 if let Some(noq) = self.noq.as_mut() {
516 noq.close();
517 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
518 }
519 #[cfg(feature = "quinn")]
520 if let Some(quinn) = self.quinn.as_mut() {
521 quinn.close();
522 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
523 }
524 #[cfg(feature = "quiche")]
525 if let Some(quiche) = self.quiche.as_mut() {
526 quiche.close();
527 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
528 }
529 #[cfg(feature = "iroh")]
530 if let Some(iroh) = self.iroh.take() {
531 iroh.close().await;
532 }
533 #[cfg(feature = "websocket")]
534 {
535 let _ = self.websocket.take();
536 }
537 #[cfg(not(any(feature = "noq", feature = "quinn", feature = "quiche", feature = "iroh")))]
538 unreachable!("no QUIC backend compiled");
539 }
540}
541
/// Transport-specific payload of a [`Request`]; one variant per compiled-in
/// transport.
pub(crate) enum RequestKind {
    /// Pending request from the noq backend.
    #[cfg(feature = "noq")]
    Noq(crate::noq::NoqRequest),
    /// Pending request from the quinn backend (stored boxed).
    // NOTE(review): boxed presumably to keep the enum small; confirm.
    #[cfg(feature = "quinn")]
    Quinn(Box<crate::quinn::QuinnRequest>),
    /// Pending request from the quiche backend.
    #[cfg(feature = "quiche")]
    Quiche(crate::quiche::QuicheRequest),
    /// Pending request from the iroh endpoint.
    #[cfg(feature = "iroh")]
    Iroh(crate::iroh::IrohRequest),
    /// WebSocket session that has already been accepted by the listener.
    #[cfg(feature = "websocket")]
    WebSocket(qmux::Session),
}
555
/// An incoming connection that has not yet been accepted or rejected.
///
/// Call [`Request::ok`] to accept it and obtain a `Session`, or
/// [`Request::close`] to reject it. The `with_*` builders adjust the
/// per-request copy of the protocol server before accepting.
pub struct Request {
    /// Per-request copy of the protocol server (cloned in `Server::accept`).
    server: moq_net::Server,
    /// Transport-specific pending request.
    kind: RequestKind,
}
564
565impl Request {
566 pub async fn close(self, _code: u16) -> anyhow::Result<()> {
568 match self.kind {
569 #[cfg(feature = "noq")]
570 RequestKind::Noq(request) => {
571 let status = web_transport_noq::http::StatusCode::from_u16(_code).context("invalid status code")?;
572 request.close(status).await?;
573 Ok(())
574 }
575 #[cfg(feature = "quinn")]
576 RequestKind::Quinn(request) => {
577 let status = web_transport_quinn::http::StatusCode::from_u16(_code).context("invalid status code")?;
578 request.close(status).await?;
579 Ok(())
580 }
581 #[cfg(feature = "quiche")]
582 RequestKind::Quiche(request) => {
583 let status = web_transport_quiche::http::StatusCode::from_u16(_code).context("invalid status code")?;
584 request
585 .reject(status)
586 .await
587 .map_err(|e| anyhow::anyhow!("failed to close quiche WebTransport request: {e}"))?;
588 Ok(())
589 }
590 #[cfg(feature = "iroh")]
591 RequestKind::Iroh(request) => {
592 let status = web_transport_iroh::http::StatusCode::from_u16(_code).context("invalid status code")?;
593 request.close(status).await?;
594 Ok(())
595 }
596 #[cfg(feature = "websocket")]
597 RequestKind::WebSocket(_session) => {
598 Ok(())
600 }
601 }
602 }
603
604 pub fn with_publish(mut self, publish: impl Into<Option<moq_net::OriginConsumer>>) -> Self {
606 self.server = self.server.with_publish(publish);
607 self
608 }
609
610 pub fn with_consume(mut self, consume: impl Into<Option<moq_net::OriginProducer>>) -> Self {
612 self.server = self.server.with_consume(consume);
613 self
614 }
615
616 pub fn with_stats(mut self, stats: moq_net::StatsHandle) -> Self {
618 self.server = self.server.with_stats(stats);
619 self
620 }
621
622 pub async fn ok(self) -> anyhow::Result<Session> {
624 match self.kind {
625 #[cfg(feature = "noq")]
626 RequestKind::Noq(request) => Ok(self.server.accept(request.ok().await?).await?),
627 #[cfg(feature = "quinn")]
628 RequestKind::Quinn(request) => Ok(self.server.accept(request.ok().await?).await?),
629 #[cfg(feature = "quiche")]
630 RequestKind::Quiche(request) => {
631 let conn = request
632 .ok()
633 .await
634 .map_err(|e| anyhow::anyhow!("failed to accept quiche WebTransport: {e}"))?;
635 Ok(self.server.accept(conn).await?)
636 }
637 #[cfg(feature = "iroh")]
638 RequestKind::Iroh(request) => Ok(self.server.accept(request.ok().await?).await?),
639 #[cfg(feature = "websocket")]
640 RequestKind::WebSocket(session) => Ok(self.server.accept(session).await?),
641 }
642 }
643
644 pub fn transport(&self) -> &'static str {
646 match self.kind {
647 #[cfg(feature = "noq")]
648 RequestKind::Noq(_) => "quic",
649 #[cfg(feature = "quinn")]
650 RequestKind::Quinn(_) => "quic",
651 #[cfg(feature = "quiche")]
652 RequestKind::Quiche(_) => "quic",
653 #[cfg(feature = "iroh")]
654 RequestKind::Iroh(_) => "iroh",
655 #[cfg(feature = "websocket")]
656 RequestKind::WebSocket(_) => "websocket",
657 }
658 }
659
660 pub fn url(&self) -> Option<&Url> {
662 #[cfg(not(any(feature = "noq", feature = "quinn", feature = "quiche", feature = "iroh")))]
663 unreachable!("no QUIC backend compiled; enable noq, quinn, quiche, or iroh feature");
664
665 match self.kind {
666 #[cfg(feature = "noq")]
667 RequestKind::Noq(ref request) => request.url(),
668 #[cfg(feature = "quinn")]
669 RequestKind::Quinn(ref request) => request.url(),
670 #[cfg(feature = "quiche")]
671 RequestKind::Quiche(ref request) => request.url(),
672 #[cfg(feature = "iroh")]
673 RequestKind::Iroh(ref request) => request.url(),
674 #[cfg(feature = "websocket")]
675 RequestKind::WebSocket(_) => None,
676 }
677 }
678
679 pub fn has_peer_certificate(&self) -> bool {
684 match self.kind {
685 #[cfg(feature = "quinn")]
686 RequestKind::Quinn(ref request) => request.has_peer_certificate(),
687 #[cfg(feature = "noq")]
688 RequestKind::Noq(_) => false,
689 #[cfg(feature = "quiche")]
690 RequestKind::Quiche(_) => false,
691 #[cfg(feature = "iroh")]
692 RequestKind::Iroh(_) => false,
693 #[cfg(feature = "websocket")]
694 RequestKind::WebSocket(_) => false,
695 #[cfg(not(any(
696 feature = "noq",
697 feature = "quinn",
698 feature = "quiche",
699 feature = "iroh",
700 feature = "websocket"
701 )))]
702 _ => false,
703 }
704 }
705}
706
/// TLS state exposed by a QUIC backend through `Server::tls_info`, shared
/// behind `Arc<RwLock<..>>`.
#[derive(Debug)]
pub struct ServerTlsInfo {
    /// Certified keys held by the backend; only present for the rustls-based
    /// noq and quinn backends.
    #[cfg(any(feature = "noq", feature = "quinn"))]
    pub(crate) certs: Vec<Arc<rustls::sign::CertifiedKey>>,
    /// Certificate fingerprints as strings.
    // NOTE(review): hash algorithm and encoding are set by the backends and
    // are not visible here; confirm before relying on the format.
    pub fingerprints: Vec<String>,
}
714
/// Opaque server identifier stored as raw bytes.
///
/// Serialized and deserialized as a hex string (`serde_with::hex::Hex`), and
/// parsed from a hex string on the command line via its `FromStr` impl.
/// Used as the type of `ServerConfig::quic_lb_id`.
#[serde_with::serde_as]
#[derive(Clone, serde::Serialize, serde::Deserialize)]
pub struct ServerId(#[serde_as(as = "serde_with::hex::Hex")] pub(crate) Vec<u8>);
719
720impl ServerId {
721 #[allow(dead_code)]
722 pub(crate) fn len(&self) -> usize {
723 self.0.len()
724 }
725}
726
727impl std::fmt::Debug for ServerId {
728 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
729 f.debug_tuple("QuicLbServerId").field(&hex::encode(&self.0)).finish()
730 }
731}
732
733impl std::str::FromStr for ServerId {
734 type Err = hex::FromHexError;
735
736 fn from_str(s: &str) -> Result<Self, Self::Err> {
737 hex::decode(s).map(Self)
738 }
739}
740
#[cfg(test)]
mod tests {
    use super::*;

    /// Every `ServerTlsConfig` list field must deserialize from either a
    /// single TOML string or a TOML array (`serde_with::OneOrMany`).
    #[test]
    fn test_tls_string_or_array() {
        // Single-string form: each value becomes a one-element vector.
        let single = r#"
            cert = "cert.pem"
            key = "key.pem"
        "#;
        let config: ServerTlsConfig = toml::from_str(single).unwrap();
        assert_eq!(config.cert, vec![PathBuf::from("cert.pem")]);
        assert_eq!(config.key, vec![PathBuf::from("key.pem")]);

        // Array form: order and all elements are preserved.
        let array = r#"
            cert = ["a.pem", "b.pem"]
            key = ["a.key", "b.key"]
            generate = ["localhost"]
            root = ["ca.pem"]
        "#;
        let config: ServerTlsConfig = toml::from_str(array).unwrap();
        assert_eq!(config.cert, vec![PathBuf::from("a.pem"), PathBuf::from("b.pem")]);
        assert_eq!(config.key, vec![PathBuf::from("a.key"), PathBuf::from("b.key")]);
        assert_eq!(config.generate, vec!["localhost".to_string()]);
        assert_eq!(config.root, vec![PathBuf::from("ca.pem")]);
    }
}