1use std::net;
2use std::path::PathBuf;
3
4use crate::QuicBackend;
5use moq_net::Session;
6use std::sync::{Arc, RwLock};
7use url::Url;
8#[cfg(feature = "iroh")]
9use web_transport_iroh::iroh;
10
11use anyhow::Context;
12
13use futures::FutureExt;
14use futures::future::BoxFuture;
15use futures::stream::FuturesUnordered;
16use futures::stream::StreamExt;
17
18#[serde_with::serde_as]
25#[derive(clap::Args, Clone, Default, Debug, serde::Serialize, serde::Deserialize)]
26#[serde(deny_unknown_fields)]
27#[non_exhaustive]
28pub struct ServerTlsConfig {
29 #[arg(long = "tls-cert", id = "tls-cert", env = "MOQ_SERVER_TLS_CERT")]
31 #[serde(default, skip_serializing_if = "Vec::is_empty")]
32 #[serde_as(as = "serde_with::OneOrMany<_>")]
33 pub cert: Vec<PathBuf>,
34
35 #[arg(long = "tls-key", id = "tls-key", env = "MOQ_SERVER_TLS_KEY")]
37 #[serde(default, skip_serializing_if = "Vec::is_empty")]
38 #[serde_as(as = "serde_with::OneOrMany<_>")]
39 pub key: Vec<PathBuf>,
40
41 #[arg(
44 long = "tls-generate",
45 id = "tls-generate",
46 value_delimiter = ',',
47 env = "MOQ_SERVER_TLS_GENERATE"
48 )]
49 #[serde(default, skip_serializing_if = "Vec::is_empty")]
50 #[serde_as(as = "serde_with::OneOrMany<_>")]
51 pub generate: Vec<String>,
52
53 #[arg(
62 long = "server-tls-root",
63 id = "server-tls-root",
64 value_delimiter = ',',
65 env = "MOQ_SERVER_TLS_ROOT"
66 )]
67 #[serde(default, skip_serializing_if = "Vec::is_empty")]
68 #[serde_as(as = "serde_with::OneOrMany<_>")]
69 pub root: Vec<PathBuf>,
70}
71
72impl ServerTlsConfig {
73 pub fn load_roots(&self) -> anyhow::Result<rustls::RootCertStore> {
75 use rustls::pki_types::CertificateDer;
76 use rustls::pki_types::pem::PemObject;
77
78 let mut roots = rustls::RootCertStore::empty();
79 for path in &self.root {
80 let file = std::fs::File::open(path).context("failed to open root CA")?;
81 let mut reader = std::io::BufReader::new(file);
82 let certs: Vec<CertificateDer<'static>> = CertificateDer::pem_reader_iter(&mut reader)
83 .collect::<Result<_, _>>()
84 .context("failed to parse root CA PEM")?;
85 anyhow::ensure!(!certs.is_empty(), "no certificates found in root CA");
86 for cert in certs {
87 roots.add(cert).context("failed to add root CA")?;
88 }
89 }
90 Ok(roots)
91 }
92}
93
94#[derive(clap::Args, Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
96#[serde(deny_unknown_fields, default)]
97#[non_exhaustive]
98pub struct ServerConfig {
99 #[serde(alias = "listen")]
107 #[arg(id = "server-bind", long = "server-bind", alias = "listen", env = "MOQ_SERVER_BIND")]
108 pub bind: Option<String>,
109
110 #[arg(id = "server-backend", long = "server-backend", env = "MOQ_SERVER_BACKEND")]
113 pub backend: Option<QuicBackend>,
114
115 #[arg(id = "server-quic-lb-id", long = "server-quic-lb-id", env = "MOQ_SERVER_QUIC_LB_ID")]
118 #[serde(default, skip_serializing_if = "Option::is_none")]
119 pub quic_lb_id: Option<ServerId>,
120
121 #[arg(
124 id = "server-quic-lb-nonce",
125 long = "server-quic-lb-nonce",
126 requires = "server-quic-lb-id",
127 env = "MOQ_SERVER_QUIC_LB_NONCE"
128 )]
129 #[serde(default, skip_serializing_if = "Option::is_none")]
130 pub quic_lb_nonce: Option<usize>,
131
132 #[serde(skip_serializing_if = "Option::is_none")]
134 #[arg(
135 id = "server-max-streams",
136 long = "server-max-streams",
137 env = "MOQ_SERVER_MAX_STREAMS"
138 )]
139 pub max_streams: Option<u64>,
140
141 #[serde(default, skip_serializing_if = "Vec::is_empty")]
149 #[arg(id = "server-version", long = "server-version", env = "MOQ_SERVER_VERSION")]
150 pub version: Vec<moq_net::Version>,
151
152 #[command(flatten)]
153 #[serde(default)]
154 pub tls: ServerTlsConfig,
155}
156
157impl ServerConfig {
158 pub fn init(self) -> anyhow::Result<Server> {
159 Server::new(self)
160 }
161
162 pub fn versions(&self) -> moq_net::Versions {
164 if self.version.is_empty() {
165 moq_net::Versions::all()
166 } else {
167 moq_net::Versions::from(self.version.clone())
168 }
169 }
170}
171
/// The IPv6 wildcard address on port 443.
///
/// NOTE(review): presumably the fallback used when `ServerConfig::bind` is
/// unset; the use sites are not visible here, so confirm before relying on it.
pub(crate) const DEFAULT_BIND: &str = "[::]:443";
174
/// A MoQ server that accepts sessions over the transports that were compiled
/// in and configured.
///
/// Built by [`Server::new`]; drive it by calling [`Server::accept`] in a loop.
pub struct Server {
    /// Protocol-level server; a clone is handed to every [`Request`].
    moq: moq_net::Server,
    /// Versions the server was built with; their ALPNs are passed to the QUIC
    /// handshakes started in [`Server::accept`].
    versions: moq_net::Versions,
    /// Handshakes started by [`Server::accept`] that have not completed yet.
    accept: FuturesUnordered<BoxFuture<'static, anyhow::Result<Request>>>,
    /// Optional iroh endpoint, installed with `with_iroh`.
    #[cfg(feature = "iroh")]
    iroh: Option<iroh::Endpoint>,
    /// noq backend; `Some` only when noq is the selected QUIC backend.
    #[cfg(feature = "noq")]
    noq: Option<crate::noq::NoqServer>,
    /// quinn backend; `Some` only when quinn is the selected QUIC backend.
    #[cfg(feature = "quinn")]
    quinn: Option<crate::quinn::QuinnServer>,
    /// quiche backend; `Some` only when quiche is the selected QUIC backend.
    #[cfg(feature = "quiche")]
    quiche: Option<crate::quiche::QuicheServer>,
    /// Optional WebSocket listener, installed with `with_websocket`.
    #[cfg(feature = "websocket")]
    websocket: Option<crate::websocket::WebSocketListener>,
}
193
194impl Server {
195 pub fn new(config: ServerConfig) -> anyhow::Result<Self> {
196 let backend = config.backend.clone().unwrap_or({
197 #[cfg(feature = "quinn")]
198 {
199 QuicBackend::Quinn
200 }
201 #[cfg(all(feature = "noq", not(feature = "quinn")))]
202 {
203 QuicBackend::Noq
204 }
205 #[cfg(all(feature = "quiche", not(feature = "quinn"), not(feature = "noq")))]
206 {
207 QuicBackend::Quiche
208 }
209 #[cfg(all(not(feature = "quiche"), not(feature = "quinn"), not(feature = "noq")))]
210 panic!("no QUIC backend compiled; enable noq, quinn, or quiche feature");
211 });
212
213 let versions = config.versions();
214
215 if !config.tls.root.is_empty() {
216 #[cfg(feature = "quinn")]
217 let quinn_backend = matches!(backend, QuicBackend::Quinn);
218 #[cfg(not(feature = "quinn"))]
219 let quinn_backend = false;
220 anyhow::ensure!(quinn_backend, "tls.root (mTLS) is only supported by the quinn backend");
221 }
222
223 #[cfg(feature = "noq")]
224 #[allow(unreachable_patterns)]
225 let noq = match backend {
226 QuicBackend::Noq => Some(crate::noq::NoqServer::new(config.clone())?),
227 _ => None,
228 };
229
230 #[cfg(feature = "quinn")]
231 #[allow(unreachable_patterns)]
232 let quinn = match backend {
233 QuicBackend::Quinn => Some(crate::quinn::QuinnServer::new(config.clone())?),
234 _ => None,
235 };
236
237 #[cfg(feature = "quiche")]
238 let quiche = match backend {
239 QuicBackend::Quiche => Some(crate::quiche::QuicheServer::new(config)?),
240 _ => None,
241 };
242
243 Ok(Server {
244 accept: Default::default(),
245 moq: moq_net::Server::new().with_versions(versions.clone()),
246 versions,
247 #[cfg(feature = "iroh")]
248 iroh: None,
249 #[cfg(feature = "noq")]
250 noq,
251 #[cfg(feature = "quinn")]
252 quinn,
253 #[cfg(feature = "quiche")]
254 quiche,
255 #[cfg(feature = "websocket")]
256 websocket: None,
257 })
258 }
259
260 #[cfg(feature = "websocket")]
266 pub fn with_websocket(mut self, websocket: Option<crate::websocket::WebSocketListener>) -> Self {
267 self.websocket = websocket;
268 self
269 }
270
271 #[cfg(feature = "iroh")]
272 pub fn with_iroh(mut self, iroh: Option<iroh::Endpoint>) -> Self {
273 self.iroh = iroh;
274 self
275 }
276
277 pub fn with_publish(mut self, publish: impl Into<Option<moq_net::OriginConsumer>>) -> Self {
278 self.moq = self.moq.with_publish(publish);
279 self
280 }
281
282 pub fn with_consume(mut self, consume: impl Into<Option<moq_net::OriginProducer>>) -> Self {
283 self.moq = self.moq.with_consume(consume);
284 self
285 }
286
287 pub fn with_stats(mut self, stats: moq_net::StatsHandle) -> Self {
289 self.moq = self.moq.with_stats(stats);
290 self
291 }
292
293 pub fn tls_info(&self) -> Arc<RwLock<ServerTlsInfo>> {
295 #[cfg(feature = "noq")]
296 if let Some(noq) = self.noq.as_ref() {
297 return noq.tls_info();
298 }
299 #[cfg(feature = "quinn")]
300 if let Some(quinn) = self.quinn.as_ref() {
301 return quinn.tls_info();
302 }
303 #[cfg(feature = "quiche")]
304 if let Some(quiche) = self.quiche.as_ref() {
305 return quiche.tls_info();
306 }
307 unreachable!("no QUIC backend compiled");
308 }
309
310 #[cfg(not(any(feature = "noq", feature = "quinn", feature = "quiche", feature = "iroh")))]
311 pub async fn accept(&mut self) -> Option<Request> {
312 unreachable!("no QUIC backend compiled; enable noq, quinn, quiche, or iroh feature");
313 }
314
315 #[cfg(any(feature = "noq", feature = "quinn", feature = "quiche", feature = "iroh"))]
323 pub async fn accept(&mut self) -> Option<Request> {
324 loop {
325 #[cfg(feature = "noq")]
327 let noq_accept = async {
328 #[cfg(feature = "noq")]
329 if let Some(noq) = self.noq.as_mut() {
330 return noq.accept().await;
331 }
332 None
333 };
334 #[cfg(not(feature = "noq"))]
335 let noq_accept = async { None::<()> };
336
337 #[cfg(feature = "iroh")]
338 let iroh_accept = async {
339 #[cfg(feature = "iroh")]
340 if let Some(endpoint) = self.iroh.as_mut() {
341 return endpoint.accept().await;
342 }
343 None
344 };
345 #[cfg(not(feature = "iroh"))]
346 let iroh_accept = async { None::<()> };
347
348 #[cfg(feature = "quinn")]
349 let quinn_accept = async {
350 #[cfg(feature = "quinn")]
351 if let Some(quinn) = self.quinn.as_mut() {
352 return quinn.accept().await;
353 }
354 None
355 };
356 #[cfg(not(feature = "quinn"))]
357 let quinn_accept = async { None::<()> };
358
359 #[cfg(feature = "quiche")]
360 let quiche_accept = async {
361 #[cfg(feature = "quiche")]
362 if let Some(quiche) = self.quiche.as_mut() {
363 return quiche.accept().await;
364 }
365 None
366 };
367 #[cfg(not(feature = "quiche"))]
368 let quiche_accept = async { None::<()> };
369
370 #[cfg(feature = "websocket")]
371 let ws_ref = self.websocket.as_ref();
372 #[cfg(feature = "websocket")]
373 let ws_accept = async {
374 match ws_ref {
375 Some(ws) => ws.accept().await,
376 None => std::future::pending().await,
377 }
378 };
379 #[cfg(not(feature = "websocket"))]
380 let ws_accept = std::future::pending::<Option<anyhow::Result<()>>>();
381
382 let server = self.moq.clone();
383 let versions = self.versions.clone();
384
385 tokio::select! {
386 Some(_conn) = noq_accept => {
387 #[cfg(feature = "noq")]
388 {
389 let alpns = versions.alpns();
390 self.accept.push(async move {
391 let noq = super::noq::NoqRequest::accept(_conn, alpns).await?;
392 Ok(Request {
393 server,
394 kind: RequestKind::Noq(noq),
395 })
396 }.boxed());
397 }
398 }
399 Some(_conn) = quinn_accept => {
400 #[cfg(feature = "quinn")]
401 {
402 let alpns = versions.alpns();
403 self.accept.push(async move {
404 let quinn = super::quinn::QuinnRequest::accept(_conn, alpns).await?;
405 Ok(Request {
406 server,
407 kind: RequestKind::Quinn(Box::new(quinn)),
408 })
409 }.boxed());
410 }
411 }
412 Some(_conn) = quiche_accept => {
413 #[cfg(feature = "quiche")]
414 {
415 let alpns = versions.alpns();
416 self.accept.push(async move {
417 let quiche = super::quiche::QuicheRequest::accept(_conn, alpns).await?;
418 Ok(Request {
419 server,
420 kind: RequestKind::Quiche(quiche),
421 })
422 }.boxed());
423 }
424 }
425 Some(_conn) = iroh_accept => {
426 #[cfg(feature = "iroh")]
427 self.accept.push(async move {
428 let iroh = super::iroh::IrohRequest::accept(_conn).await?;
429 Ok(Request {
430 server,
431 kind: RequestKind::Iroh(iroh),
432 })
433 }.boxed());
434 }
435 Some(_res) = ws_accept => {
436 #[cfg(feature = "websocket")]
437 match _res {
438 Ok(session) => {
439 return Some(Request {
440 server,
441 kind: RequestKind::WebSocket(session),
442 });
443 }
444 Err(err) => tracing::debug!(%err, "failed to accept WebSocket session"),
445 }
446 }
447 Some(res) = self.accept.next() => {
448 match res {
449 Ok(session) => return Some(session),
450 Err(err) => tracing::debug!(%err, "failed to accept session"),
451 }
452 }
453 _ = tokio::signal::ctrl_c() => {
454 self.close().await;
455 return None;
456 }
457 }
458 }
459 }
460
461 #[cfg(feature = "iroh")]
462 pub fn iroh_endpoint(&self) -> Option<&iroh::Endpoint> {
463 self.iroh.as_ref()
464 }
465
466 pub fn local_addr(&self) -> anyhow::Result<net::SocketAddr> {
467 #[cfg(feature = "noq")]
468 if let Some(noq) = self.noq.as_ref() {
469 return noq.local_addr();
470 }
471 #[cfg(feature = "quinn")]
472 if let Some(quinn) = self.quinn.as_ref() {
473 return quinn.local_addr();
474 }
475 #[cfg(feature = "quiche")]
476 if let Some(quiche) = self.quiche.as_ref() {
477 return quiche.local_addr();
478 }
479 unreachable!("no QUIC backend compiled");
480 }
481
482 #[cfg(feature = "websocket")]
483 pub fn websocket_local_addr(&self) -> Option<net::SocketAddr> {
484 self.websocket.as_ref().and_then(|ws| ws.local_addr().ok())
485 }
486
487 pub async fn close(&mut self) {
488 #[cfg(feature = "noq")]
489 if let Some(noq) = self.noq.as_mut() {
490 noq.close();
491 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
492 }
493 #[cfg(feature = "quinn")]
494 if let Some(quinn) = self.quinn.as_mut() {
495 quinn.close();
496 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
497 }
498 #[cfg(feature = "quiche")]
499 if let Some(quiche) = self.quiche.as_mut() {
500 quiche.close();
501 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
502 }
503 #[cfg(feature = "iroh")]
504 if let Some(iroh) = self.iroh.take() {
505 iroh.close().await;
506 }
507 #[cfg(feature = "websocket")]
508 {
509 let _ = self.websocket.take();
510 }
511 #[cfg(not(any(feature = "noq", feature = "quinn", feature = "quiche", feature = "iroh")))]
512 unreachable!("no QUIC backend compiled");
513 }
514}
515
/// Identity of the connecting peer, as returned by `Request::peer_identity`.
///
/// Carries no fields at present; `#[non_exhaustive]` prevents construction
/// outside this crate.
#[derive(Clone, Debug, Default)]
#[non_exhaustive]
pub struct PeerIdentity {}
521
/// Transport-specific state behind a [`Request`]; one variant per compiled-in
/// transport.
pub(crate) enum RequestKind {
    /// Pending request on the noq backend.
    #[cfg(feature = "noq")]
    Noq(crate::noq::NoqRequest),
    /// Pending request on the quinn backend (boxed; NOTE(review): presumably
    /// to keep the enum small, confirm).
    #[cfg(feature = "quinn")]
    Quinn(Box<crate::quinn::QuinnRequest>),
    /// Pending request on the quiche backend.
    #[cfg(feature = "quiche")]
    Quiche(crate::quiche::QuicheRequest),
    /// Pending request on the iroh endpoint.
    #[cfg(feature = "iroh")]
    Iroh(crate::iroh::IrohRequest),
    /// An already accepted WebSocket session.
    #[cfg(feature = "websocket")]
    WebSocket(qmux::Session),
}
535
/// An incoming session request produced by `Server::accept`.
///
/// Accept it with `ok` or reject it with `close`.
pub struct Request {
    /// Clone of the server's `moq_net::Server`, used to accept the session.
    server: moq_net::Server,
    /// Transport-specific request state.
    kind: RequestKind,
}
544
545impl Request {
546 pub async fn close(self, _code: u16) -> anyhow::Result<()> {
548 match self.kind {
549 #[cfg(feature = "noq")]
550 RequestKind::Noq(request) => {
551 let status = web_transport_noq::http::StatusCode::from_u16(_code).context("invalid status code")?;
552 request.close(status).await?;
553 Ok(())
554 }
555 #[cfg(feature = "quinn")]
556 RequestKind::Quinn(request) => {
557 let status = web_transport_quinn::http::StatusCode::from_u16(_code).context("invalid status code")?;
558 request.close(status).await?;
559 Ok(())
560 }
561 #[cfg(feature = "quiche")]
562 RequestKind::Quiche(request) => {
563 let status = web_transport_quiche::http::StatusCode::from_u16(_code).context("invalid status code")?;
564 request
565 .reject(status)
566 .await
567 .map_err(|e| anyhow::anyhow!("failed to close quiche WebTransport request: {e}"))?;
568 Ok(())
569 }
570 #[cfg(feature = "iroh")]
571 RequestKind::Iroh(request) => {
572 let status = web_transport_iroh::http::StatusCode::from_u16(_code).context("invalid status code")?;
573 request.close(status).await?;
574 Ok(())
575 }
576 #[cfg(feature = "websocket")]
577 RequestKind::WebSocket(_session) => {
578 Ok(())
580 }
581 }
582 }
583
584 pub fn with_publish(mut self, publish: impl Into<Option<moq_net::OriginConsumer>>) -> Self {
586 self.server = self.server.with_publish(publish);
587 self
588 }
589
590 pub fn with_consume(mut self, consume: impl Into<Option<moq_net::OriginProducer>>) -> Self {
592 self.server = self.server.with_consume(consume);
593 self
594 }
595
596 pub fn with_stats(mut self, stats: moq_net::StatsHandle) -> Self {
598 self.server = self.server.with_stats(stats);
599 self
600 }
601
602 pub async fn ok(self) -> anyhow::Result<Session> {
604 match self.kind {
605 #[cfg(feature = "noq")]
606 RequestKind::Noq(request) => Ok(self.server.accept(request.ok().await?).await?),
607 #[cfg(feature = "quinn")]
608 RequestKind::Quinn(request) => Ok(self.server.accept(request.ok().await?).await?),
609 #[cfg(feature = "quiche")]
610 RequestKind::Quiche(request) => {
611 let conn = request
612 .ok()
613 .await
614 .map_err(|e| anyhow::anyhow!("failed to accept quiche WebTransport: {e}"))?;
615 Ok(self.server.accept(conn).await?)
616 }
617 #[cfg(feature = "iroh")]
618 RequestKind::Iroh(request) => Ok(self.server.accept(request.ok().await?).await?),
619 #[cfg(feature = "websocket")]
620 RequestKind::WebSocket(session) => Ok(self.server.accept(session).await?),
621 }
622 }
623
624 pub fn transport(&self) -> &'static str {
626 match self.kind {
627 #[cfg(feature = "noq")]
628 RequestKind::Noq(_) => "quic",
629 #[cfg(feature = "quinn")]
630 RequestKind::Quinn(_) => "quic",
631 #[cfg(feature = "quiche")]
632 RequestKind::Quiche(_) => "quic",
633 #[cfg(feature = "iroh")]
634 RequestKind::Iroh(_) => "iroh",
635 #[cfg(feature = "websocket")]
636 RequestKind::WebSocket(_) => "websocket",
637 }
638 }
639
640 pub fn url(&self) -> Option<&Url> {
642 #[cfg(not(any(feature = "noq", feature = "quinn", feature = "quiche", feature = "iroh")))]
643 unreachable!("no QUIC backend compiled; enable noq, quinn, quiche, or iroh feature");
644
645 match self.kind {
646 #[cfg(feature = "noq")]
647 RequestKind::Noq(ref request) => request.url(),
648 #[cfg(feature = "quinn")]
649 RequestKind::Quinn(ref request) => request.url(),
650 #[cfg(feature = "quiche")]
651 RequestKind::Quiche(ref request) => request.url(),
652 #[cfg(feature = "iroh")]
653 RequestKind::Iroh(ref request) => request.url(),
654 #[cfg(feature = "websocket")]
655 RequestKind::WebSocket(_) => None,
656 }
657 }
658
659 pub fn peer_identity(&self) -> anyhow::Result<Option<PeerIdentity>> {
665 match self.kind {
666 #[cfg(feature = "quinn")]
667 RequestKind::Quinn(ref request) => request.peer_identity(),
668 #[cfg(feature = "noq")]
669 RequestKind::Noq(_) => Ok(None),
670 #[cfg(feature = "quiche")]
671 RequestKind::Quiche(_) => Ok(None),
672 #[cfg(feature = "iroh")]
673 RequestKind::Iroh(_) => Ok(None),
674 #[cfg(feature = "websocket")]
675 RequestKind::WebSocket(_) => Ok(None),
676 #[cfg(not(any(
677 feature = "noq",
678 feature = "quinn",
679 feature = "quiche",
680 feature = "iroh",
681 feature = "websocket"
682 )))]
683 _ => Ok(None),
684 }
685 }
686}
687
/// TLS state exposed by a QUIC backend through `Server::tls_info`, where it is
/// shared behind an `Arc<RwLock<_>>`.
#[derive(Debug)]
pub struct ServerTlsInfo {
    /// Certified keys held by the backend; only present for the rustls-based
    /// noq and quinn backends.
    #[cfg(any(feature = "noq", feature = "quinn"))]
    pub(crate) certs: Vec<Arc<rustls::sign::CertifiedKey>>,
    /// NOTE(review): presumably fingerprints of the served certificates; the
    /// encoding is not visible here, so confirm where the backends fill it in.
    pub fingerprints: Vec<String>,
}
695
/// Raw server id bytes used for `ServerConfig::quic_lb_id`.
///
/// Serialized and deserialized as a hex string, and parsed from hex on the
/// command line through its `FromStr` impl.
#[serde_with::serde_as]
#[derive(Clone, serde::Serialize, serde::Deserialize)]
pub struct ServerId(#[serde_as(as = "serde_with::hex::Hex")] pub(crate) Vec<u8>);
700
701impl ServerId {
702 #[allow(dead_code)]
703 pub(crate) fn len(&self) -> usize {
704 self.0.len()
705 }
706}
707
708impl std::fmt::Debug for ServerId {
709 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
710 f.debug_tuple("QuicLbServerId").field(&hex::encode(&self.0)).finish()
711 }
712}
713
714impl std::str::FromStr for ServerId {
715 type Err = hex::FromHexError;
716
717 fn from_str(s: &str) -> Result<Self, Self::Err> {
718 hex::decode(s).map(Self)
719 }
720}
721
#[cfg(test)]
mod tests {
    use super::*;

    /// List-valued TLS fields must deserialize from either a bare string or
    /// an array.
    #[test]
    fn test_tls_string_or_array() {
        let parse = |text: &str| -> ServerTlsConfig { toml::from_str(text).unwrap() };
        let paths = |names: &[&str]| -> Vec<PathBuf> {
            names.iter().map(|name| PathBuf::from(*name)).collect()
        };

        // A single string becomes a one-element list.
        let from_single = parse(
            r#"
            cert = "cert.pem"
            key = "key.pem"
            "#,
        );
        assert_eq!(from_single.cert, paths(&["cert.pem"]));
        assert_eq!(from_single.key, paths(&["key.pem"]));

        // Arrays are taken as-is, for every list field.
        let from_array = parse(
            r#"
            cert = ["a.pem", "b.pem"]
            key = ["a.key", "b.key"]
            generate = ["localhost"]
            root = ["ca.pem"]
            "#,
        );
        assert_eq!(from_array.cert, paths(&["a.pem", "b.pem"]));
        assert_eq!(from_array.key, paths(&["a.key", "b.key"]));
        assert_eq!(from_array.generate, vec!["localhost".to_string()]);
        assert_eq!(from_array.root, paths(&["ca.pem"]));
    }
}