1use std::net;
2use std::path::PathBuf;
3
4use crate::QuicBackend;
5use moq_lite::Session;
6use std::sync::{Arc, RwLock};
7use url::Url;
8#[cfg(feature = "iroh")]
9use web_transport_iroh::iroh;
10
11use anyhow::Context;
12
13use futures::FutureExt;
14use futures::future::BoxFuture;
15use futures::stream::FuturesUnordered;
16use futures::stream::StreamExt;
17
18#[derive(clap::Args, Clone, Default, Debug, serde::Serialize, serde::Deserialize)]
23#[serde(deny_unknown_fields)]
24#[non_exhaustive]
25pub struct ServerTlsConfig {
26 #[arg(long = "tls-cert", id = "tls-cert", env = "MOQ_SERVER_TLS_CERT")]
28 #[serde(default, skip_serializing_if = "Vec::is_empty")]
29 pub cert: Vec<PathBuf>,
30
31 #[arg(long = "tls-key", id = "tls-key", env = "MOQ_SERVER_TLS_KEY")]
33 #[serde(default, skip_serializing_if = "Vec::is_empty")]
34 pub key: Vec<PathBuf>,
35
36 #[arg(
39 long = "tls-generate",
40 id = "tls-generate",
41 value_delimiter = ',',
42 env = "MOQ_SERVER_TLS_GENERATE"
43 )]
44 #[serde(default, skip_serializing_if = "Vec::is_empty")]
45 pub generate: Vec<String>,
46
47 #[arg(
56 long = "server-tls-root",
57 id = "server-tls-root",
58 value_delimiter = ',',
59 env = "MOQ_SERVER_TLS_ROOT"
60 )]
61 #[serde(default, skip_serializing_if = "Vec::is_empty")]
62 pub root: Vec<PathBuf>,
63}
64
65impl ServerTlsConfig {
66 pub fn load_roots(&self) -> anyhow::Result<rustls::RootCertStore> {
68 use rustls::pki_types::CertificateDer;
69
70 let mut roots = rustls::RootCertStore::empty();
71 for path in &self.root {
72 let file = std::fs::File::open(path).context("failed to open root CA")?;
73 let mut reader = std::io::BufReader::new(file);
74 let certs: Vec<CertificateDer<'static>> = rustls_pemfile::certs(&mut reader)
75 .collect::<Result<_, _>>()
76 .context("failed to parse root CA PEM")?;
77 anyhow::ensure!(!certs.is_empty(), "no certificates found in root CA");
78 for cert in certs {
79 roots.add(cert).context("failed to add root CA")?;
80 }
81 }
82 Ok(roots)
83 }
84}
85
86#[derive(clap::Args, Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
88#[serde(deny_unknown_fields, default)]
89#[non_exhaustive]
90pub struct ServerConfig {
91 #[serde(alias = "listen")]
94 #[arg(id = "server-bind", long = "server-bind", alias = "listen", env = "MOQ_SERVER_BIND")]
95 pub bind: Option<net::SocketAddr>,
96
97 #[arg(id = "server-backend", long = "server-backend", env = "MOQ_SERVER_BACKEND")]
100 pub backend: Option<QuicBackend>,
101
102 #[arg(id = "server-quic-lb-id", long = "server-quic-lb-id", env = "MOQ_SERVER_QUIC_LB_ID")]
105 #[serde(default, skip_serializing_if = "Option::is_none")]
106 pub quic_lb_id: Option<ServerId>,
107
108 #[arg(
111 id = "server-quic-lb-nonce",
112 long = "server-quic-lb-nonce",
113 requires = "server-quic-lb-id",
114 env = "MOQ_SERVER_QUIC_LB_NONCE"
115 )]
116 #[serde(default, skip_serializing_if = "Option::is_none")]
117 pub quic_lb_nonce: Option<usize>,
118
119 #[serde(skip_serializing_if = "Option::is_none")]
121 #[arg(
122 id = "server-max-streams",
123 long = "server-max-streams",
124 env = "MOQ_SERVER_MAX_STREAMS"
125 )]
126 pub max_streams: Option<u64>,
127
128 #[serde(default, skip_serializing_if = "Vec::is_empty")]
136 #[arg(id = "server-version", long = "server-version", env = "MOQ_SERVER_VERSION")]
137 pub version: Vec<moq_lite::Version>,
138
139 #[command(flatten)]
140 #[serde(default)]
141 pub tls: ServerTlsConfig,
142}
143
144impl ServerConfig {
145 pub fn init(self) -> anyhow::Result<Server> {
146 Server::new(self)
147 }
148
149 pub fn versions(&self) -> moq_lite::Versions {
151 if self.version.is_empty() {
152 moq_lite::Versions::all()
153 } else {
154 moq_lite::Versions::from(self.version.clone())
155 }
156 }
157}
158
159pub struct Server {
163 moq: moq_lite::Server,
164 versions: moq_lite::Versions,
165 accept: FuturesUnordered<BoxFuture<'static, anyhow::Result<Request>>>,
166 #[cfg(feature = "iroh")]
167 iroh: Option<iroh::Endpoint>,
168 #[cfg(feature = "noq")]
169 noq: Option<crate::noq::NoqServer>,
170 #[cfg(feature = "quinn")]
171 quinn: Option<crate::quinn::QuinnServer>,
172 #[cfg(feature = "quiche")]
173 quiche: Option<crate::quiche::QuicheServer>,
174 #[cfg(feature = "websocket")]
175 websocket: Option<crate::websocket::WebSocketListener>,
176}
177
178impl Server {
179 pub fn new(config: ServerConfig) -> anyhow::Result<Self> {
180 let backend = config.backend.clone().unwrap_or({
181 #[cfg(feature = "quinn")]
182 {
183 QuicBackend::Quinn
184 }
185 #[cfg(all(feature = "noq", not(feature = "quinn")))]
186 {
187 QuicBackend::Noq
188 }
189 #[cfg(all(feature = "quiche", not(feature = "quinn"), not(feature = "noq")))]
190 {
191 QuicBackend::Quiche
192 }
193 #[cfg(all(not(feature = "quiche"), not(feature = "quinn"), not(feature = "noq")))]
194 panic!("no QUIC backend compiled; enable noq, quinn, or quiche feature");
195 });
196
197 let versions = config.versions();
198
199 if !config.tls.root.is_empty() {
200 #[cfg(feature = "quinn")]
201 let quinn_backend = matches!(backend, QuicBackend::Quinn);
202 #[cfg(not(feature = "quinn"))]
203 let quinn_backend = false;
204 anyhow::ensure!(quinn_backend, "tls.root (mTLS) is only supported by the quinn backend");
205 }
206
207 #[cfg(feature = "noq")]
208 #[allow(unreachable_patterns)]
209 let noq = match backend {
210 QuicBackend::Noq => Some(crate::noq::NoqServer::new(config.clone())?),
211 _ => None,
212 };
213
214 #[cfg(feature = "quinn")]
215 #[allow(unreachable_patterns)]
216 let quinn = match backend {
217 QuicBackend::Quinn => Some(crate::quinn::QuinnServer::new(config.clone())?),
218 _ => None,
219 };
220
221 #[cfg(feature = "quiche")]
222 let quiche = match backend {
223 QuicBackend::Quiche => Some(crate::quiche::QuicheServer::new(config)?),
224 _ => None,
225 };
226
227 Ok(Server {
228 accept: Default::default(),
229 moq: moq_lite::Server::new().with_versions(versions.clone()),
230 versions,
231 #[cfg(feature = "iroh")]
232 iroh: None,
233 #[cfg(feature = "noq")]
234 noq,
235 #[cfg(feature = "quinn")]
236 quinn,
237 #[cfg(feature = "quiche")]
238 quiche,
239 #[cfg(feature = "websocket")]
240 websocket: None,
241 })
242 }
243
244 #[cfg(feature = "websocket")]
250 pub fn with_websocket(mut self, websocket: Option<crate::websocket::WebSocketListener>) -> Self {
251 self.websocket = websocket;
252 self
253 }
254
255 #[cfg(feature = "iroh")]
256 pub fn with_iroh(mut self, iroh: Option<iroh::Endpoint>) -> Self {
257 self.iroh = iroh;
258 self
259 }
260
261 pub fn with_publish(mut self, publish: impl Into<Option<moq_lite::OriginConsumer>>) -> Self {
262 self.moq = self.moq.with_publish(publish);
263 self
264 }
265
266 pub fn with_consume(mut self, consume: impl Into<Option<moq_lite::OriginProducer>>) -> Self {
267 self.moq = self.moq.with_consume(consume);
268 self
269 }
270
271 pub fn tls_info(&self) -> Arc<RwLock<ServerTlsInfo>> {
273 #[cfg(feature = "noq")]
274 if let Some(noq) = self.noq.as_ref() {
275 return noq.tls_info();
276 }
277 #[cfg(feature = "quinn")]
278 if let Some(quinn) = self.quinn.as_ref() {
279 return quinn.tls_info();
280 }
281 #[cfg(feature = "quiche")]
282 if let Some(quiche) = self.quiche.as_ref() {
283 return quiche.tls_info();
284 }
285 unreachable!("no QUIC backend compiled");
286 }
287
288 #[cfg(not(any(feature = "noq", feature = "quinn", feature = "quiche", feature = "iroh")))]
289 pub async fn accept(&mut self) -> Option<Request> {
290 unreachable!("no QUIC backend compiled; enable noq, quinn, quiche, or iroh feature");
291 }
292
293 #[cfg(any(feature = "noq", feature = "quinn", feature = "quiche", feature = "iroh"))]
301 pub async fn accept(&mut self) -> Option<Request> {
302 loop {
303 #[cfg(feature = "noq")]
305 let noq_accept = async {
306 #[cfg(feature = "noq")]
307 if let Some(noq) = self.noq.as_mut() {
308 return noq.accept().await;
309 }
310 None
311 };
312 #[cfg(not(feature = "noq"))]
313 let noq_accept = async { None::<()> };
314
315 #[cfg(feature = "iroh")]
316 let iroh_accept = async {
317 #[cfg(feature = "iroh")]
318 if let Some(endpoint) = self.iroh.as_mut() {
319 return endpoint.accept().await;
320 }
321 None
322 };
323 #[cfg(not(feature = "iroh"))]
324 let iroh_accept = async { None::<()> };
325
326 #[cfg(feature = "quinn")]
327 let quinn_accept = async {
328 #[cfg(feature = "quinn")]
329 if let Some(quinn) = self.quinn.as_mut() {
330 return quinn.accept().await;
331 }
332 None
333 };
334 #[cfg(not(feature = "quinn"))]
335 let quinn_accept = async { None::<()> };
336
337 #[cfg(feature = "quiche")]
338 let quiche_accept = async {
339 #[cfg(feature = "quiche")]
340 if let Some(quiche) = self.quiche.as_mut() {
341 return quiche.accept().await;
342 }
343 None
344 };
345 #[cfg(not(feature = "quiche"))]
346 let quiche_accept = async { None::<()> };
347
348 #[cfg(feature = "websocket")]
349 let ws_ref = self.websocket.as_ref();
350 #[cfg(feature = "websocket")]
351 let ws_accept = async {
352 match ws_ref {
353 Some(ws) => ws.accept().await,
354 None => std::future::pending().await,
355 }
356 };
357 #[cfg(not(feature = "websocket"))]
358 let ws_accept = std::future::pending::<Option<anyhow::Result<()>>>();
359
360 let server = self.moq.clone();
361 let versions = self.versions.clone();
362
363 tokio::select! {
364 Some(_conn) = noq_accept => {
365 #[cfg(feature = "noq")]
366 {
367 let alpns = versions.alpns();
368 self.accept.push(async move {
369 let noq = super::noq::NoqRequest::accept(_conn, alpns).await?;
370 Ok(Request {
371 server,
372 kind: RequestKind::Noq(noq),
373 })
374 }.boxed());
375 }
376 }
377 Some(_conn) = quinn_accept => {
378 #[cfg(feature = "quinn")]
379 {
380 let alpns = versions.alpns();
381 self.accept.push(async move {
382 let quinn = super::quinn::QuinnRequest::accept(_conn, alpns).await?;
383 Ok(Request {
384 server,
385 kind: RequestKind::Quinn(Box::new(quinn)),
386 })
387 }.boxed());
388 }
389 }
390 Some(_conn) = quiche_accept => {
391 #[cfg(feature = "quiche")]
392 {
393 let alpns = versions.alpns();
394 self.accept.push(async move {
395 let quiche = super::quiche::QuicheRequest::accept(_conn, alpns).await?;
396 Ok(Request {
397 server,
398 kind: RequestKind::Quiche(quiche),
399 })
400 }.boxed());
401 }
402 }
403 Some(_conn) = iroh_accept => {
404 #[cfg(feature = "iroh")]
405 self.accept.push(async move {
406 let iroh = super::iroh::IrohRequest::accept(_conn).await?;
407 Ok(Request {
408 server,
409 kind: RequestKind::Iroh(iroh),
410 })
411 }.boxed());
412 }
413 Some(_res) = ws_accept => {
414 #[cfg(feature = "websocket")]
415 match _res {
416 Ok(session) => {
417 return Some(Request {
418 server,
419 kind: RequestKind::WebSocket(session),
420 });
421 }
422 Err(err) => tracing::debug!(%err, "failed to accept WebSocket session"),
423 }
424 }
425 Some(res) = self.accept.next() => {
426 match res {
427 Ok(session) => return Some(session),
428 Err(err) => tracing::debug!(%err, "failed to accept session"),
429 }
430 }
431 _ = tokio::signal::ctrl_c() => {
432 self.close().await;
433 return None;
434 }
435 }
436 }
437 }
438
439 #[cfg(feature = "iroh")]
440 pub fn iroh_endpoint(&self) -> Option<&iroh::Endpoint> {
441 self.iroh.as_ref()
442 }
443
444 pub fn local_addr(&self) -> anyhow::Result<net::SocketAddr> {
445 #[cfg(feature = "noq")]
446 if let Some(noq) = self.noq.as_ref() {
447 return noq.local_addr();
448 }
449 #[cfg(feature = "quinn")]
450 if let Some(quinn) = self.quinn.as_ref() {
451 return quinn.local_addr();
452 }
453 #[cfg(feature = "quiche")]
454 if let Some(quiche) = self.quiche.as_ref() {
455 return quiche.local_addr();
456 }
457 unreachable!("no QUIC backend compiled");
458 }
459
460 #[cfg(feature = "websocket")]
461 pub fn websocket_local_addr(&self) -> Option<net::SocketAddr> {
462 self.websocket.as_ref().and_then(|ws| ws.local_addr().ok())
463 }
464
465 pub async fn close(&mut self) {
466 #[cfg(feature = "noq")]
467 if let Some(noq) = self.noq.as_mut() {
468 noq.close();
469 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
470 }
471 #[cfg(feature = "quinn")]
472 if let Some(quinn) = self.quinn.as_mut() {
473 quinn.close();
474 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
475 }
476 #[cfg(feature = "quiche")]
477 if let Some(quiche) = self.quiche.as_mut() {
478 quiche.close();
479 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
480 }
481 #[cfg(feature = "iroh")]
482 if let Some(iroh) = self.iroh.take() {
483 iroh.close().await;
484 }
485 #[cfg(feature = "websocket")]
486 {
487 let _ = self.websocket.take();
488 }
489 #[cfg(not(any(feature = "noq", feature = "quinn", feature = "quiche", feature = "iroh")))]
490 unreachable!("no QUIC backend compiled");
491 }
492}
493
494#[derive(Clone, Debug)]
497#[non_exhaustive]
498pub struct PeerIdentity {
499 pub dns_name: Option<String>,
504}
505
506pub(crate) enum RequestKind {
508 #[cfg(feature = "noq")]
509 Noq(crate::noq::NoqRequest),
510 #[cfg(feature = "quinn")]
511 Quinn(Box<crate::quinn::QuinnRequest>),
512 #[cfg(feature = "quiche")]
513 Quiche(crate::quiche::QuicheRequest),
514 #[cfg(feature = "iroh")]
515 Iroh(crate::iroh::IrohRequest),
516 #[cfg(feature = "websocket")]
517 WebSocket(qmux::Session),
518}
519
520pub struct Request {
525 server: moq_lite::Server,
526 kind: RequestKind,
527}
528
529impl Request {
530 pub async fn close(self, _code: u16) -> anyhow::Result<()> {
532 match self.kind {
533 #[cfg(feature = "noq")]
534 RequestKind::Noq(request) => {
535 let status = web_transport_noq::http::StatusCode::from_u16(_code).context("invalid status code")?;
536 request.close(status).await?;
537 Ok(())
538 }
539 #[cfg(feature = "quinn")]
540 RequestKind::Quinn(request) => {
541 let status = web_transport_quinn::http::StatusCode::from_u16(_code).context("invalid status code")?;
542 request.close(status).await?;
543 Ok(())
544 }
545 #[cfg(feature = "quiche")]
546 RequestKind::Quiche(request) => {
547 let status = web_transport_quiche::http::StatusCode::from_u16(_code).context("invalid status code")?;
548 request
549 .reject(status)
550 .await
551 .map_err(|e| anyhow::anyhow!("failed to close quiche WebTransport request: {e}"))?;
552 Ok(())
553 }
554 #[cfg(feature = "iroh")]
555 RequestKind::Iroh(request) => {
556 let status = web_transport_iroh::http::StatusCode::from_u16(_code).context("invalid status code")?;
557 request.close(status).await?;
558 Ok(())
559 }
560 #[cfg(feature = "websocket")]
561 RequestKind::WebSocket(_session) => {
562 Ok(())
564 }
565 }
566 }
567
568 pub fn with_publish(mut self, publish: impl Into<Option<moq_lite::OriginConsumer>>) -> Self {
570 self.server = self.server.with_publish(publish);
571 self
572 }
573
574 pub fn with_consume(mut self, consume: impl Into<Option<moq_lite::OriginProducer>>) -> Self {
576 self.server = self.server.with_consume(consume);
577 self
578 }
579
580 pub async fn ok(self) -> anyhow::Result<Session> {
582 match self.kind {
583 #[cfg(feature = "noq")]
584 RequestKind::Noq(request) => Ok(self.server.accept(request.ok().await?).await?),
585 #[cfg(feature = "quinn")]
586 RequestKind::Quinn(request) => Ok(self.server.accept(request.ok().await?).await?),
587 #[cfg(feature = "quiche")]
588 RequestKind::Quiche(request) => {
589 let conn = request
590 .ok()
591 .await
592 .map_err(|e| anyhow::anyhow!("failed to accept quiche WebTransport: {e}"))?;
593 Ok(self.server.accept(conn).await?)
594 }
595 #[cfg(feature = "iroh")]
596 RequestKind::Iroh(request) => Ok(self.server.accept(request.ok().await?).await?),
597 #[cfg(feature = "websocket")]
598 RequestKind::WebSocket(session) => Ok(self.server.accept(session).await?),
599 }
600 }
601
602 pub fn transport(&self) -> &'static str {
604 match self.kind {
605 #[cfg(feature = "noq")]
606 RequestKind::Noq(_) => "quic",
607 #[cfg(feature = "quinn")]
608 RequestKind::Quinn(_) => "quic",
609 #[cfg(feature = "quiche")]
610 RequestKind::Quiche(_) => "quic",
611 #[cfg(feature = "iroh")]
612 RequestKind::Iroh(_) => "iroh",
613 #[cfg(feature = "websocket")]
614 RequestKind::WebSocket(_) => "websocket",
615 }
616 }
617
618 pub fn url(&self) -> Option<&Url> {
620 #[cfg(not(any(feature = "noq", feature = "quinn", feature = "quiche", feature = "iroh")))]
621 unreachable!("no QUIC backend compiled; enable noq, quinn, quiche, or iroh feature");
622
623 match self.kind {
624 #[cfg(feature = "noq")]
625 RequestKind::Noq(ref request) => request.url(),
626 #[cfg(feature = "quinn")]
627 RequestKind::Quinn(ref request) => request.url(),
628 #[cfg(feature = "quiche")]
629 RequestKind::Quiche(ref request) => request.url(),
630 #[cfg(feature = "iroh")]
631 RequestKind::Iroh(ref request) => request.url(),
632 #[cfg(feature = "websocket")]
633 RequestKind::WebSocket(_) => None,
634 }
635 }
636
637 pub fn peer_identity(&self) -> anyhow::Result<Option<PeerIdentity>> {
643 match self.kind {
644 #[cfg(feature = "quinn")]
645 RequestKind::Quinn(ref request) => request.peer_identity(),
646 #[cfg(feature = "noq")]
647 RequestKind::Noq(_) => Ok(None),
648 #[cfg(feature = "quiche")]
649 RequestKind::Quiche(_) => Ok(None),
650 #[cfg(feature = "iroh")]
651 RequestKind::Iroh(_) => Ok(None),
652 #[cfg(feature = "websocket")]
653 RequestKind::WebSocket(_) => Ok(None),
654 #[cfg(not(any(
655 feature = "noq",
656 feature = "quinn",
657 feature = "quiche",
658 feature = "iroh",
659 feature = "websocket"
660 )))]
661 _ => Ok(None),
662 }
663 }
664}
665
666#[derive(Debug)]
668pub struct ServerTlsInfo {
669 #[cfg(any(feature = "noq", feature = "quinn"))]
670 pub(crate) certs: Vec<Arc<rustls::sign::CertifiedKey>>,
671 pub fingerprints: Vec<String>,
672}
673
674#[serde_with::serde_as]
676#[derive(Clone, serde::Serialize, serde::Deserialize)]
677pub struct ServerId(#[serde_as(as = "serde_with::hex::Hex")] pub(crate) Vec<u8>);
678
679impl ServerId {
680 #[allow(dead_code)]
681 pub(crate) fn len(&self) -> usize {
682 self.0.len()
683 }
684}
685
686impl std::fmt::Debug for ServerId {
687 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
688 f.debug_tuple("QuicLbServerId").field(&hex::encode(&self.0)).finish()
689 }
690}
691
692impl std::str::FromStr for ServerId {
693 type Err = hex::FromHexError;
694
695 fn from_str(s: &str) -> Result<Self, Self::Err> {
696 hex::decode(s).map(Self)
697 }
698}