1use std::net;
2use std::path::PathBuf;
3
4use crate::QuicBackend;
5use moq_lite::Session;
6use std::sync::{Arc, RwLock};
7use url::Url;
8#[cfg(feature = "iroh")]
9use web_transport_iroh::iroh;
10
11use anyhow::Context;
12
13use futures::FutureExt;
14use futures::future::BoxFuture;
15use futures::stream::FuturesUnordered;
16use futures::stream::StreamExt;
17
18#[derive(clap::Args, Clone, Default, Debug, serde::Serialize, serde::Deserialize)]
23#[serde(deny_unknown_fields)]
24#[non_exhaustive]
25pub struct ServerTlsConfig {
26 #[arg(long = "tls-cert", id = "tls-cert", env = "MOQ_SERVER_TLS_CERT")]
28 #[serde(default, skip_serializing_if = "Vec::is_empty")]
29 pub cert: Vec<PathBuf>,
30
31 #[arg(long = "tls-key", id = "tls-key", env = "MOQ_SERVER_TLS_KEY")]
33 #[serde(default, skip_serializing_if = "Vec::is_empty")]
34 pub key: Vec<PathBuf>,
35
36 #[arg(
39 long = "tls-generate",
40 id = "tls-generate",
41 value_delimiter = ',',
42 env = "MOQ_SERVER_TLS_GENERATE"
43 )]
44 #[serde(default, skip_serializing_if = "Vec::is_empty")]
45 pub generate: Vec<String>,
46
47 #[arg(
56 long = "server-tls-root",
57 id = "server-tls-root",
58 value_delimiter = ',',
59 env = "MOQ_SERVER_TLS_ROOT"
60 )]
61 #[serde(default, skip_serializing_if = "Vec::is_empty")]
62 pub root: Vec<PathBuf>,
63}
64
65impl ServerTlsConfig {
66 pub fn load_roots(&self) -> anyhow::Result<rustls::RootCertStore> {
68 use rustls::pki_types::CertificateDer;
69
70 let mut roots = rustls::RootCertStore::empty();
71 for path in &self.root {
72 let file = std::fs::File::open(path).context("failed to open root CA")?;
73 let mut reader = std::io::BufReader::new(file);
74 let certs: Vec<CertificateDer<'static>> = rustls_pemfile::certs(&mut reader)
75 .collect::<Result<_, _>>()
76 .context("failed to parse root CA PEM")?;
77 anyhow::ensure!(!certs.is_empty(), "no certificates found in root CA");
78 for cert in certs {
79 roots.add(cert).context("failed to add root CA")?;
80 }
81 }
82 Ok(roots)
83 }
84}
85
86#[derive(clap::Args, Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
88#[serde(deny_unknown_fields, default)]
89#[non_exhaustive]
90pub struct ServerConfig {
91 #[serde(alias = "listen")]
99 #[arg(id = "server-bind", long = "server-bind", alias = "listen", env = "MOQ_SERVER_BIND")]
100 pub bind: Option<String>,
101
102 #[arg(id = "server-backend", long = "server-backend", env = "MOQ_SERVER_BACKEND")]
105 pub backend: Option<QuicBackend>,
106
107 #[arg(id = "server-quic-lb-id", long = "server-quic-lb-id", env = "MOQ_SERVER_QUIC_LB_ID")]
110 #[serde(default, skip_serializing_if = "Option::is_none")]
111 pub quic_lb_id: Option<ServerId>,
112
113 #[arg(
116 id = "server-quic-lb-nonce",
117 long = "server-quic-lb-nonce",
118 requires = "server-quic-lb-id",
119 env = "MOQ_SERVER_QUIC_LB_NONCE"
120 )]
121 #[serde(default, skip_serializing_if = "Option::is_none")]
122 pub quic_lb_nonce: Option<usize>,
123
124 #[serde(skip_serializing_if = "Option::is_none")]
126 #[arg(
127 id = "server-max-streams",
128 long = "server-max-streams",
129 env = "MOQ_SERVER_MAX_STREAMS"
130 )]
131 pub max_streams: Option<u64>,
132
133 #[serde(default, skip_serializing_if = "Vec::is_empty")]
141 #[arg(id = "server-version", long = "server-version", env = "MOQ_SERVER_VERSION")]
142 pub version: Vec<moq_lite::Version>,
143
144 #[command(flatten)]
145 #[serde(default)]
146 pub tls: ServerTlsConfig,
147}
148
149impl ServerConfig {
150 pub fn init(self) -> anyhow::Result<Server> {
151 Server::new(self)
152 }
153
154 pub fn versions(&self) -> moq_lite::Versions {
156 if self.version.is_empty() {
157 moq_lite::Versions::all()
158 } else {
159 moq_lite::Versions::from(self.version.clone())
160 }
161 }
162}
163
164pub(crate) const DEFAULT_BIND: &str = "[::]:443";
166
167pub struct Server {
171 moq: moq_lite::Server,
172 versions: moq_lite::Versions,
173 accept: FuturesUnordered<BoxFuture<'static, anyhow::Result<Request>>>,
174 #[cfg(feature = "iroh")]
175 iroh: Option<iroh::Endpoint>,
176 #[cfg(feature = "noq")]
177 noq: Option<crate::noq::NoqServer>,
178 #[cfg(feature = "quinn")]
179 quinn: Option<crate::quinn::QuinnServer>,
180 #[cfg(feature = "quiche")]
181 quiche: Option<crate::quiche::QuicheServer>,
182 #[cfg(feature = "websocket")]
183 websocket: Option<crate::websocket::WebSocketListener>,
184}
185
186impl Server {
187 pub fn new(config: ServerConfig) -> anyhow::Result<Self> {
188 let backend = config.backend.clone().unwrap_or({
189 #[cfg(feature = "quinn")]
190 {
191 QuicBackend::Quinn
192 }
193 #[cfg(all(feature = "noq", not(feature = "quinn")))]
194 {
195 QuicBackend::Noq
196 }
197 #[cfg(all(feature = "quiche", not(feature = "quinn"), not(feature = "noq")))]
198 {
199 QuicBackend::Quiche
200 }
201 #[cfg(all(not(feature = "quiche"), not(feature = "quinn"), not(feature = "noq")))]
202 panic!("no QUIC backend compiled; enable noq, quinn, or quiche feature");
203 });
204
205 let versions = config.versions();
206
207 if !config.tls.root.is_empty() {
208 #[cfg(feature = "quinn")]
209 let quinn_backend = matches!(backend, QuicBackend::Quinn);
210 #[cfg(not(feature = "quinn"))]
211 let quinn_backend = false;
212 anyhow::ensure!(quinn_backend, "tls.root (mTLS) is only supported by the quinn backend");
213 }
214
215 #[cfg(feature = "noq")]
216 #[allow(unreachable_patterns)]
217 let noq = match backend {
218 QuicBackend::Noq => Some(crate::noq::NoqServer::new(config.clone())?),
219 _ => None,
220 };
221
222 #[cfg(feature = "quinn")]
223 #[allow(unreachable_patterns)]
224 let quinn = match backend {
225 QuicBackend::Quinn => Some(crate::quinn::QuinnServer::new(config.clone())?),
226 _ => None,
227 };
228
229 #[cfg(feature = "quiche")]
230 let quiche = match backend {
231 QuicBackend::Quiche => Some(crate::quiche::QuicheServer::new(config)?),
232 _ => None,
233 };
234
235 Ok(Server {
236 accept: Default::default(),
237 moq: moq_lite::Server::new().with_versions(versions.clone()),
238 versions,
239 #[cfg(feature = "iroh")]
240 iroh: None,
241 #[cfg(feature = "noq")]
242 noq,
243 #[cfg(feature = "quinn")]
244 quinn,
245 #[cfg(feature = "quiche")]
246 quiche,
247 #[cfg(feature = "websocket")]
248 websocket: None,
249 })
250 }
251
252 #[cfg(feature = "websocket")]
258 pub fn with_websocket(mut self, websocket: Option<crate::websocket::WebSocketListener>) -> Self {
259 self.websocket = websocket;
260 self
261 }
262
263 #[cfg(feature = "iroh")]
264 pub fn with_iroh(mut self, iroh: Option<iroh::Endpoint>) -> Self {
265 self.iroh = iroh;
266 self
267 }
268
269 pub fn with_publish(mut self, publish: impl Into<Option<moq_lite::OriginConsumer>>) -> Self {
270 self.moq = self.moq.with_publish(publish);
271 self
272 }
273
274 pub fn with_consume(mut self, consume: impl Into<Option<moq_lite::OriginProducer>>) -> Self {
275 self.moq = self.moq.with_consume(consume);
276 self
277 }
278
279 pub fn tls_info(&self) -> Arc<RwLock<ServerTlsInfo>> {
281 #[cfg(feature = "noq")]
282 if let Some(noq) = self.noq.as_ref() {
283 return noq.tls_info();
284 }
285 #[cfg(feature = "quinn")]
286 if let Some(quinn) = self.quinn.as_ref() {
287 return quinn.tls_info();
288 }
289 #[cfg(feature = "quiche")]
290 if let Some(quiche) = self.quiche.as_ref() {
291 return quiche.tls_info();
292 }
293 unreachable!("no QUIC backend compiled");
294 }
295
296 #[cfg(not(any(feature = "noq", feature = "quinn", feature = "quiche", feature = "iroh")))]
297 pub async fn accept(&mut self) -> Option<Request> {
298 unreachable!("no QUIC backend compiled; enable noq, quinn, quiche, or iroh feature");
299 }
300
301 #[cfg(any(feature = "noq", feature = "quinn", feature = "quiche", feature = "iroh"))]
309 pub async fn accept(&mut self) -> Option<Request> {
310 loop {
311 #[cfg(feature = "noq")]
313 let noq_accept = async {
314 #[cfg(feature = "noq")]
315 if let Some(noq) = self.noq.as_mut() {
316 return noq.accept().await;
317 }
318 None
319 };
320 #[cfg(not(feature = "noq"))]
321 let noq_accept = async { None::<()> };
322
323 #[cfg(feature = "iroh")]
324 let iroh_accept = async {
325 #[cfg(feature = "iroh")]
326 if let Some(endpoint) = self.iroh.as_mut() {
327 return endpoint.accept().await;
328 }
329 None
330 };
331 #[cfg(not(feature = "iroh"))]
332 let iroh_accept = async { None::<()> };
333
334 #[cfg(feature = "quinn")]
335 let quinn_accept = async {
336 #[cfg(feature = "quinn")]
337 if let Some(quinn) = self.quinn.as_mut() {
338 return quinn.accept().await;
339 }
340 None
341 };
342 #[cfg(not(feature = "quinn"))]
343 let quinn_accept = async { None::<()> };
344
345 #[cfg(feature = "quiche")]
346 let quiche_accept = async {
347 #[cfg(feature = "quiche")]
348 if let Some(quiche) = self.quiche.as_mut() {
349 return quiche.accept().await;
350 }
351 None
352 };
353 #[cfg(not(feature = "quiche"))]
354 let quiche_accept = async { None::<()> };
355
356 #[cfg(feature = "websocket")]
357 let ws_ref = self.websocket.as_ref();
358 #[cfg(feature = "websocket")]
359 let ws_accept = async {
360 match ws_ref {
361 Some(ws) => ws.accept().await,
362 None => std::future::pending().await,
363 }
364 };
365 #[cfg(not(feature = "websocket"))]
366 let ws_accept = std::future::pending::<Option<anyhow::Result<()>>>();
367
368 let server = self.moq.clone();
369 let versions = self.versions.clone();
370
371 tokio::select! {
372 Some(_conn) = noq_accept => {
373 #[cfg(feature = "noq")]
374 {
375 let alpns = versions.alpns();
376 self.accept.push(async move {
377 let noq = super::noq::NoqRequest::accept(_conn, alpns).await?;
378 Ok(Request {
379 server,
380 kind: RequestKind::Noq(noq),
381 })
382 }.boxed());
383 }
384 }
385 Some(_conn) = quinn_accept => {
386 #[cfg(feature = "quinn")]
387 {
388 let alpns = versions.alpns();
389 self.accept.push(async move {
390 let quinn = super::quinn::QuinnRequest::accept(_conn, alpns).await?;
391 Ok(Request {
392 server,
393 kind: RequestKind::Quinn(Box::new(quinn)),
394 })
395 }.boxed());
396 }
397 }
398 Some(_conn) = quiche_accept => {
399 #[cfg(feature = "quiche")]
400 {
401 let alpns = versions.alpns();
402 self.accept.push(async move {
403 let quiche = super::quiche::QuicheRequest::accept(_conn, alpns).await?;
404 Ok(Request {
405 server,
406 kind: RequestKind::Quiche(quiche),
407 })
408 }.boxed());
409 }
410 }
411 Some(_conn) = iroh_accept => {
412 #[cfg(feature = "iroh")]
413 self.accept.push(async move {
414 let iroh = super::iroh::IrohRequest::accept(_conn).await?;
415 Ok(Request {
416 server,
417 kind: RequestKind::Iroh(iroh),
418 })
419 }.boxed());
420 }
421 Some(_res) = ws_accept => {
422 #[cfg(feature = "websocket")]
423 match _res {
424 Ok(session) => {
425 return Some(Request {
426 server,
427 kind: RequestKind::WebSocket(session),
428 });
429 }
430 Err(err) => tracing::debug!(%err, "failed to accept WebSocket session"),
431 }
432 }
433 Some(res) = self.accept.next() => {
434 match res {
435 Ok(session) => return Some(session),
436 Err(err) => tracing::debug!(%err, "failed to accept session"),
437 }
438 }
439 _ = tokio::signal::ctrl_c() => {
440 self.close().await;
441 return None;
442 }
443 }
444 }
445 }
446
447 #[cfg(feature = "iroh")]
448 pub fn iroh_endpoint(&self) -> Option<&iroh::Endpoint> {
449 self.iroh.as_ref()
450 }
451
452 pub fn local_addr(&self) -> anyhow::Result<net::SocketAddr> {
453 #[cfg(feature = "noq")]
454 if let Some(noq) = self.noq.as_ref() {
455 return noq.local_addr();
456 }
457 #[cfg(feature = "quinn")]
458 if let Some(quinn) = self.quinn.as_ref() {
459 return quinn.local_addr();
460 }
461 #[cfg(feature = "quiche")]
462 if let Some(quiche) = self.quiche.as_ref() {
463 return quiche.local_addr();
464 }
465 unreachable!("no QUIC backend compiled");
466 }
467
468 #[cfg(feature = "websocket")]
469 pub fn websocket_local_addr(&self) -> Option<net::SocketAddr> {
470 self.websocket.as_ref().and_then(|ws| ws.local_addr().ok())
471 }
472
473 pub async fn close(&mut self) {
474 #[cfg(feature = "noq")]
475 if let Some(noq) = self.noq.as_mut() {
476 noq.close();
477 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
478 }
479 #[cfg(feature = "quinn")]
480 if let Some(quinn) = self.quinn.as_mut() {
481 quinn.close();
482 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
483 }
484 #[cfg(feature = "quiche")]
485 if let Some(quiche) = self.quiche.as_mut() {
486 quiche.close();
487 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
488 }
489 #[cfg(feature = "iroh")]
490 if let Some(iroh) = self.iroh.take() {
491 iroh.close().await;
492 }
493 #[cfg(feature = "websocket")]
494 {
495 let _ = self.websocket.take();
496 }
497 #[cfg(not(any(feature = "noq", feature = "quinn", feature = "quiche", feature = "iroh")))]
498 unreachable!("no QUIC backend compiled");
499 }
500}
501
502#[derive(Clone, Debug)]
505#[non_exhaustive]
506pub struct PeerIdentity {
507 pub dns_name: Option<String>,
512}
513
514pub(crate) enum RequestKind {
516 #[cfg(feature = "noq")]
517 Noq(crate::noq::NoqRequest),
518 #[cfg(feature = "quinn")]
519 Quinn(Box<crate::quinn::QuinnRequest>),
520 #[cfg(feature = "quiche")]
521 Quiche(crate::quiche::QuicheRequest),
522 #[cfg(feature = "iroh")]
523 Iroh(crate::iroh::IrohRequest),
524 #[cfg(feature = "websocket")]
525 WebSocket(qmux::Session),
526}
527
528pub struct Request {
533 server: moq_lite::Server,
534 kind: RequestKind,
535}
536
537impl Request {
538 pub async fn close(self, _code: u16) -> anyhow::Result<()> {
540 match self.kind {
541 #[cfg(feature = "noq")]
542 RequestKind::Noq(request) => {
543 let status = web_transport_noq::http::StatusCode::from_u16(_code).context("invalid status code")?;
544 request.close(status).await?;
545 Ok(())
546 }
547 #[cfg(feature = "quinn")]
548 RequestKind::Quinn(request) => {
549 let status = web_transport_quinn::http::StatusCode::from_u16(_code).context("invalid status code")?;
550 request.close(status).await?;
551 Ok(())
552 }
553 #[cfg(feature = "quiche")]
554 RequestKind::Quiche(request) => {
555 let status = web_transport_quiche::http::StatusCode::from_u16(_code).context("invalid status code")?;
556 request
557 .reject(status)
558 .await
559 .map_err(|e| anyhow::anyhow!("failed to close quiche WebTransport request: {e}"))?;
560 Ok(())
561 }
562 #[cfg(feature = "iroh")]
563 RequestKind::Iroh(request) => {
564 let status = web_transport_iroh::http::StatusCode::from_u16(_code).context("invalid status code")?;
565 request.close(status).await?;
566 Ok(())
567 }
568 #[cfg(feature = "websocket")]
569 RequestKind::WebSocket(_session) => {
570 Ok(())
572 }
573 }
574 }
575
576 pub fn with_publish(mut self, publish: impl Into<Option<moq_lite::OriginConsumer>>) -> Self {
578 self.server = self.server.with_publish(publish);
579 self
580 }
581
582 pub fn with_consume(mut self, consume: impl Into<Option<moq_lite::OriginProducer>>) -> Self {
584 self.server = self.server.with_consume(consume);
585 self
586 }
587
588 pub async fn ok(self) -> anyhow::Result<Session> {
590 match self.kind {
591 #[cfg(feature = "noq")]
592 RequestKind::Noq(request) => Ok(self.server.accept(request.ok().await?).await?),
593 #[cfg(feature = "quinn")]
594 RequestKind::Quinn(request) => Ok(self.server.accept(request.ok().await?).await?),
595 #[cfg(feature = "quiche")]
596 RequestKind::Quiche(request) => {
597 let conn = request
598 .ok()
599 .await
600 .map_err(|e| anyhow::anyhow!("failed to accept quiche WebTransport: {e}"))?;
601 Ok(self.server.accept(conn).await?)
602 }
603 #[cfg(feature = "iroh")]
604 RequestKind::Iroh(request) => Ok(self.server.accept(request.ok().await?).await?),
605 #[cfg(feature = "websocket")]
606 RequestKind::WebSocket(session) => Ok(self.server.accept(session).await?),
607 }
608 }
609
610 pub fn transport(&self) -> &'static str {
612 match self.kind {
613 #[cfg(feature = "noq")]
614 RequestKind::Noq(_) => "quic",
615 #[cfg(feature = "quinn")]
616 RequestKind::Quinn(_) => "quic",
617 #[cfg(feature = "quiche")]
618 RequestKind::Quiche(_) => "quic",
619 #[cfg(feature = "iroh")]
620 RequestKind::Iroh(_) => "iroh",
621 #[cfg(feature = "websocket")]
622 RequestKind::WebSocket(_) => "websocket",
623 }
624 }
625
626 pub fn url(&self) -> Option<&Url> {
628 #[cfg(not(any(feature = "noq", feature = "quinn", feature = "quiche", feature = "iroh")))]
629 unreachable!("no QUIC backend compiled; enable noq, quinn, quiche, or iroh feature");
630
631 match self.kind {
632 #[cfg(feature = "noq")]
633 RequestKind::Noq(ref request) => request.url(),
634 #[cfg(feature = "quinn")]
635 RequestKind::Quinn(ref request) => request.url(),
636 #[cfg(feature = "quiche")]
637 RequestKind::Quiche(ref request) => request.url(),
638 #[cfg(feature = "iroh")]
639 RequestKind::Iroh(ref request) => request.url(),
640 #[cfg(feature = "websocket")]
641 RequestKind::WebSocket(_) => None,
642 }
643 }
644
645 pub fn peer_identity(&self) -> anyhow::Result<Option<PeerIdentity>> {
651 match self.kind {
652 #[cfg(feature = "quinn")]
653 RequestKind::Quinn(ref request) => request.peer_identity(),
654 #[cfg(feature = "noq")]
655 RequestKind::Noq(_) => Ok(None),
656 #[cfg(feature = "quiche")]
657 RequestKind::Quiche(_) => Ok(None),
658 #[cfg(feature = "iroh")]
659 RequestKind::Iroh(_) => Ok(None),
660 #[cfg(feature = "websocket")]
661 RequestKind::WebSocket(_) => Ok(None),
662 #[cfg(not(any(
663 feature = "noq",
664 feature = "quinn",
665 feature = "quiche",
666 feature = "iroh",
667 feature = "websocket"
668 )))]
669 _ => Ok(None),
670 }
671 }
672}
673
674#[derive(Debug)]
676pub struct ServerTlsInfo {
677 #[cfg(any(feature = "noq", feature = "quinn"))]
678 pub(crate) certs: Vec<Arc<rustls::sign::CertifiedKey>>,
679 pub fingerprints: Vec<String>,
680}
681
682#[serde_with::serde_as]
684#[derive(Clone, serde::Serialize, serde::Deserialize)]
685pub struct ServerId(#[serde_as(as = "serde_with::hex::Hex")] pub(crate) Vec<u8>);
686
687impl ServerId {
688 #[allow(dead_code)]
689 pub(crate) fn len(&self) -> usize {
690 self.0.len()
691 }
692}
693
694impl std::fmt::Debug for ServerId {
695 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
696 f.debug_tuple("QuicLbServerId").field(&hex::encode(&self.0)).finish()
697 }
698}
699
700impl std::str::FromStr for ServerId {
701 type Err = hex::FromHexError;
702
703 fn from_str(s: &str) -> Result<Self, Self::Err> {
704 hex::decode(s).map(Self)
705 }
706}