Skip to main content

moq_native/
server.rs

1use std::net;
2use std::path::PathBuf;
3
4use crate::QuicBackend;
5use moq_lite::Session;
6use std::sync::{Arc, RwLock};
7use url::Url;
8#[cfg(feature = "iroh")]
9use web_transport_iroh::iroh;
10
11use anyhow::Context;
12
13use futures::FutureExt;
14use futures::future::BoxFuture;
15use futures::stream::FuturesUnordered;
16use futures::stream::StreamExt;
17
18/// TLS configuration for the server.
19///
20/// Certificate and keys must currently be files on disk.
21/// Alternatively, you can generate a self-signed certificate given a list of hostnames.
22#[derive(clap::Args, Clone, Default, Debug, serde::Serialize, serde::Deserialize)]
23#[serde(deny_unknown_fields)]
24#[non_exhaustive]
25pub struct ServerTlsConfig {
26	/// Load the given certificate from disk.
27	#[arg(long = "tls-cert", id = "tls-cert", env = "MOQ_SERVER_TLS_CERT")]
28	#[serde(default, skip_serializing_if = "Vec::is_empty")]
29	pub cert: Vec<PathBuf>,
30
31	/// Load the given key from disk.
32	#[arg(long = "tls-key", id = "tls-key", env = "MOQ_SERVER_TLS_KEY")]
33	#[serde(default, skip_serializing_if = "Vec::is_empty")]
34	pub key: Vec<PathBuf>,
35
36	/// Or generate a new certificate and key with the given hostnames.
37	/// This won't be valid unless the client uses the fingerprint or disables verification.
38	#[arg(
39		long = "tls-generate",
40		id = "tls-generate",
41		value_delimiter = ',',
42		env = "MOQ_SERVER_TLS_GENERATE"
43	)]
44	#[serde(default, skip_serializing_if = "Vec::is_empty")]
45	pub generate: Vec<String>,
46}
47
48/// Configuration for the MoQ server.
49#[derive(clap::Args, Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
50#[serde(deny_unknown_fields, default)]
51#[non_exhaustive]
52pub struct ServerConfig {
53	/// Listen for UDP packets on the given address.
54	/// Defaults to `[::]:443` if not provided.
55	#[serde(alias = "listen")]
56	#[arg(id = "server-bind", long = "server-bind", alias = "listen", env = "MOQ_SERVER_BIND")]
57	pub bind: Option<net::SocketAddr>,
58
59	/// The QUIC backend to use.
60	/// Auto-detected from compiled features if not specified.
61	#[arg(id = "server-backend", long = "server-backend", env = "MOQ_SERVER_BACKEND")]
62	pub backend: Option<QuicBackend>,
63
64	/// Server ID to embed in connection IDs for QUIC-LB compatibility.
65	/// If set, connection IDs will be derived semi-deterministically.
66	#[arg(id = "server-quic-lb-id", long = "server-quic-lb-id", env = "MOQ_SERVER_QUIC_LB_ID")]
67	#[serde(default, skip_serializing_if = "Option::is_none")]
68	pub quic_lb_id: Option<ServerId>,
69
70	/// Number of random nonce bytes in QUIC-LB connection IDs.
71	/// Must be at least 4, and server_id + nonce + 1 must not exceed 20.
72	#[arg(
73		id = "server-quic-lb-nonce",
74		long = "server-quic-lb-nonce",
75		requires = "server-quic-lb-id",
76		env = "MOQ_SERVER_QUIC_LB_NONCE"
77	)]
78	#[serde(default, skip_serializing_if = "Option::is_none")]
79	pub quic_lb_nonce: Option<usize>,
80
81	/// Maximum number of concurrent QUIC streams per connection (both bidi and uni).
82	#[serde(skip_serializing_if = "Option::is_none")]
83	#[arg(
84		id = "server-max-streams",
85		long = "server-max-streams",
86		env = "MOQ_SERVER_MAX_STREAMS"
87	)]
88	pub max_streams: Option<u64>,
89
90	/// Restrict the server to specific MoQ protocol version(s).
91	///
92	/// By default, the server accepts all supported versions.
93	/// Use this to restrict to specific versions, e.g. `--server-version moq-lite-02`.
94	/// Can be specified multiple times to accept a subset of versions.
95	///
96	/// Valid values: moq-lite-01, moq-lite-02, moq-lite-03, moq-transport-14, moq-transport-15, moq-transport-16
97	#[serde(default, skip_serializing_if = "Vec::is_empty")]
98	#[arg(id = "server-version", long = "server-version", env = "MOQ_SERVER_VERSION")]
99	pub version: Vec<moq_lite::Version>,
100
101	#[command(flatten)]
102	#[serde(default)]
103	pub tls: ServerTlsConfig,
104}
105
106impl ServerConfig {
107	pub fn init(self) -> anyhow::Result<Server> {
108		Server::new(self)
109	}
110
111	/// Returns the configured versions, defaulting to all if none specified.
112	pub fn versions(&self) -> moq_lite::Versions {
113		if self.version.is_empty() {
114			moq_lite::Versions::all()
115		} else {
116			moq_lite::Versions::from(self.version.clone())
117		}
118	}
119}
120
121/// Server for accepting MoQ connections over QUIC.
122///
123/// Create via [`ServerConfig::init`] or [`Server::new`].
124pub struct Server {
125	moq: moq_lite::Server,
126	versions: moq_lite::Versions,
127	accept: FuturesUnordered<BoxFuture<'static, anyhow::Result<Request>>>,
128	#[cfg(feature = "iroh")]
129	iroh: Option<iroh::Endpoint>,
130	#[cfg(feature = "noq")]
131	noq: Option<crate::noq::NoqServer>,
132	#[cfg(feature = "quinn")]
133	quinn: Option<crate::quinn::QuinnServer>,
134	#[cfg(feature = "quiche")]
135	quiche: Option<crate::quiche::QuicheServer>,
136	#[cfg(feature = "websocket")]
137	websocket: Option<crate::websocket::WebSocketListener>,
138}
139
140impl Server {
141	pub fn new(config: ServerConfig) -> anyhow::Result<Self> {
142		let backend = config.backend.clone().unwrap_or({
143			#[cfg(feature = "quinn")]
144			{
145				QuicBackend::Quinn
146			}
147			#[cfg(all(feature = "noq", not(feature = "quinn")))]
148			{
149				QuicBackend::Noq
150			}
151			#[cfg(all(feature = "quiche", not(feature = "quinn"), not(feature = "noq")))]
152			{
153				QuicBackend::Quiche
154			}
155			#[cfg(all(not(feature = "quiche"), not(feature = "quinn"), not(feature = "noq")))]
156			panic!("no QUIC backend compiled; enable noq, quinn, or quiche feature");
157		});
158
159		let versions = config.versions();
160
161		#[cfg(feature = "noq")]
162		#[allow(unreachable_patterns)]
163		let noq = match backend {
164			QuicBackend::Noq => Some(crate::noq::NoqServer::new(config.clone())?),
165			_ => None,
166		};
167
168		#[cfg(feature = "quinn")]
169		#[allow(unreachable_patterns)]
170		let quinn = match backend {
171			QuicBackend::Quinn => Some(crate::quinn::QuinnServer::new(config.clone())?),
172			_ => None,
173		};
174
175		#[cfg(feature = "quiche")]
176		let quiche = match backend {
177			QuicBackend::Quiche => Some(crate::quiche::QuicheServer::new(config)?),
178			_ => None,
179		};
180
181		Ok(Server {
182			accept: Default::default(),
183			moq: moq_lite::Server::new().with_versions(versions.clone()),
184			versions,
185			#[cfg(feature = "iroh")]
186			iroh: None,
187			#[cfg(feature = "noq")]
188			noq,
189			#[cfg(feature = "quinn")]
190			quinn,
191			#[cfg(feature = "quiche")]
192			quiche,
193			#[cfg(feature = "websocket")]
194			websocket: None,
195		})
196	}
197
198	/// Add a standalone WebSocket listener on a separate TCP port.
199	///
200	/// This is useful for simple applications that want WebSocket on a dedicated port.
201	/// For applications that need WebSocket on the same HTTP port (e.g. moq-relay),
202	/// use `qmux::Session::accept()` with your own HTTP framework instead.
203	#[cfg(feature = "websocket")]
204	pub fn with_websocket(mut self, websocket: Option<crate::websocket::WebSocketListener>) -> Self {
205		self.websocket = websocket;
206		self
207	}
208
209	#[cfg(feature = "iroh")]
210	pub fn with_iroh(mut self, iroh: Option<iroh::Endpoint>) -> Self {
211		self.iroh = iroh;
212		self
213	}
214
215	pub fn with_publish(mut self, publish: impl Into<Option<moq_lite::OriginConsumer>>) -> Self {
216		self.moq = self.moq.with_publish(publish);
217		self
218	}
219
220	pub fn with_consume(mut self, consume: impl Into<Option<moq_lite::OriginProducer>>) -> Self {
221		self.moq = self.moq.with_consume(consume);
222		self
223	}
224
225	// Return the SHA256 fingerprints of all our certificates.
226	pub fn tls_info(&self) -> Arc<RwLock<ServerTlsInfo>> {
227		#[cfg(feature = "noq")]
228		if let Some(noq) = self.noq.as_ref() {
229			return noq.tls_info();
230		}
231		#[cfg(feature = "quinn")]
232		if let Some(quinn) = self.quinn.as_ref() {
233			return quinn.tls_info();
234		}
235		#[cfg(feature = "quiche")]
236		if let Some(quiche) = self.quiche.as_ref() {
237			return quiche.tls_info();
238		}
239		unreachable!("no QUIC backend compiled");
240	}
241
242	#[cfg(not(any(feature = "noq", feature = "quinn", feature = "quiche", feature = "iroh")))]
243	pub async fn accept(&mut self) -> Option<Request> {
244		unreachable!("no QUIC backend compiled; enable noq, quinn, quiche, or iroh feature");
245	}
246
247	/// Returns the next partially established QUIC or WebTransport session.
248	///
249	/// This returns a [Request] instead of a [web_transport_quinn::Session]
250	/// so the connection can be rejected early on an invalid path or missing auth.
251	///
252	/// The [Request] is either a WebTransport or a raw QUIC request.
253	/// Call [Request::ok] or [Request::close] to complete the handshake.
254	#[cfg(any(feature = "noq", feature = "quinn", feature = "quiche", feature = "iroh"))]
255	pub async fn accept(&mut self) -> Option<Request> {
256		loop {
257			// tokio::select! does not support cfg directives on arms, so we need to create the futures here.
258			#[cfg(feature = "noq")]
259			let noq_accept = async {
260				#[cfg(feature = "noq")]
261				if let Some(noq) = self.noq.as_mut() {
262					return noq.accept().await;
263				}
264				None
265			};
266			#[cfg(not(feature = "noq"))]
267			let noq_accept = async { None::<()> };
268
269			#[cfg(feature = "iroh")]
270			let iroh_accept = async {
271				#[cfg(feature = "iroh")]
272				if let Some(endpoint) = self.iroh.as_mut() {
273					return endpoint.accept().await;
274				}
275				None
276			};
277			#[cfg(not(feature = "iroh"))]
278			let iroh_accept = async { None::<()> };
279
280			#[cfg(feature = "quinn")]
281			let quinn_accept = async {
282				#[cfg(feature = "quinn")]
283				if let Some(quinn) = self.quinn.as_mut() {
284					return quinn.accept().await;
285				}
286				None
287			};
288			#[cfg(not(feature = "quinn"))]
289			let quinn_accept = async { None::<()> };
290
291			#[cfg(feature = "quiche")]
292			let quiche_accept = async {
293				#[cfg(feature = "quiche")]
294				if let Some(quiche) = self.quiche.as_mut() {
295					return quiche.accept().await;
296				}
297				None
298			};
299			#[cfg(not(feature = "quiche"))]
300			let quiche_accept = async { None::<()> };
301
302			#[cfg(feature = "websocket")]
303			let ws_ref = self.websocket.as_ref();
304			#[cfg(feature = "websocket")]
305			let ws_accept = async {
306				match ws_ref {
307					Some(ws) => ws.accept().await,
308					None => std::future::pending().await,
309				}
310			};
311			#[cfg(not(feature = "websocket"))]
312			let ws_accept = std::future::pending::<Option<anyhow::Result<()>>>();
313
314			let server = self.moq.clone();
315			let versions = self.versions.clone();
316
317			tokio::select! {
318				Some(_conn) = noq_accept => {
319					#[cfg(feature = "noq")]
320					{
321						let alpns = versions.alpns();
322						self.accept.push(async move {
323							let noq = super::noq::NoqRequest::accept(_conn, alpns).await?;
324							Ok(Request {
325								server,
326								kind: RequestKind::Noq(noq),
327							})
328						}.boxed());
329					}
330				}
331				Some(_conn) = quinn_accept => {
332					#[cfg(feature = "quinn")]
333					{
334						let alpns = versions.alpns();
335						self.accept.push(async move {
336							let quinn = super::quinn::QuinnRequest::accept(_conn, alpns).await?;
337							Ok(Request {
338								server,
339								kind: RequestKind::Quinn(Box::new(quinn)),
340							})
341						}.boxed());
342					}
343				}
344				Some(_conn) = quiche_accept => {
345					#[cfg(feature = "quiche")]
346					{
347						let alpns = versions.alpns();
348						self.accept.push(async move {
349							let quiche = super::quiche::QuicheRequest::accept(_conn, alpns).await?;
350							Ok(Request {
351								server,
352								kind: RequestKind::Quiche(quiche),
353							})
354						}.boxed());
355					}
356				}
357				Some(_conn) = iroh_accept => {
358					#[cfg(feature = "iroh")]
359					self.accept.push(async move {
360						let iroh = super::iroh::IrohRequest::accept(_conn).await?;
361						Ok(Request {
362							server,
363							kind: RequestKind::Iroh(iroh),
364						})
365					}.boxed());
366				}
367				Some(_res) = ws_accept => {
368					#[cfg(feature = "websocket")]
369					match _res {
370						Ok(session) => {
371							return Some(Request {
372								server,
373								kind: RequestKind::WebSocket(session),
374							});
375						}
376						Err(err) => tracing::debug!(%err, "failed to accept WebSocket session"),
377					}
378				}
379				Some(res) = self.accept.next() => {
380					match res {
381						Ok(session) => return Some(session),
382						Err(err) => tracing::debug!(%err, "failed to accept session"),
383					}
384				}
385				_ = tokio::signal::ctrl_c() => {
386					self.close().await;
387					return None;
388				}
389			}
390		}
391	}
392
393	#[cfg(feature = "iroh")]
394	pub fn iroh_endpoint(&self) -> Option<&iroh::Endpoint> {
395		self.iroh.as_ref()
396	}
397
398	pub fn local_addr(&self) -> anyhow::Result<net::SocketAddr> {
399		#[cfg(feature = "noq")]
400		if let Some(noq) = self.noq.as_ref() {
401			return noq.local_addr();
402		}
403		#[cfg(feature = "quinn")]
404		if let Some(quinn) = self.quinn.as_ref() {
405			return quinn.local_addr();
406		}
407		#[cfg(feature = "quiche")]
408		if let Some(quiche) = self.quiche.as_ref() {
409			return quiche.local_addr();
410		}
411		unreachable!("no QUIC backend compiled");
412	}
413
414	#[cfg(feature = "websocket")]
415	pub fn websocket_local_addr(&self) -> Option<net::SocketAddr> {
416		self.websocket.as_ref().and_then(|ws| ws.local_addr().ok())
417	}
418
419	pub async fn close(&mut self) {
420		#[cfg(feature = "noq")]
421		if let Some(noq) = self.noq.as_mut() {
422			noq.close();
423			tokio::time::sleep(std::time::Duration::from_millis(100)).await;
424		}
425		#[cfg(feature = "quinn")]
426		if let Some(quinn) = self.quinn.as_mut() {
427			quinn.close();
428			tokio::time::sleep(std::time::Duration::from_millis(100)).await;
429		}
430		#[cfg(feature = "quiche")]
431		if let Some(quiche) = self.quiche.as_mut() {
432			quiche.close();
433			tokio::time::sleep(std::time::Duration::from_millis(100)).await;
434		}
435		#[cfg(feature = "iroh")]
436		if let Some(iroh) = self.iroh.take() {
437			iroh.close().await;
438		}
439		#[cfg(feature = "websocket")]
440		{
441			let _ = self.websocket.take();
442		}
443		#[cfg(not(any(feature = "noq", feature = "quinn", feature = "quiche", feature = "iroh")))]
444		unreachable!("no QUIC backend compiled");
445	}
446}
447
448/// An incoming connection that can be accepted or rejected.
449pub(crate) enum RequestKind {
450	#[cfg(feature = "noq")]
451	Noq(crate::noq::NoqRequest),
452	#[cfg(feature = "quinn")]
453	Quinn(Box<crate::quinn::QuinnRequest>),
454	#[cfg(feature = "quiche")]
455	Quiche(crate::quiche::QuicheRequest),
456	#[cfg(feature = "iroh")]
457	Iroh(crate::iroh::IrohRequest),
458	#[cfg(feature = "websocket")]
459	WebSocket(qmux::Session),
460}
461
462/// An incoming MoQ session that can be accepted or rejected.
463///
464/// [Self::with_publish] and [Self::with_consume] will configure what will be published and consumed from the session respectively.
465/// Otherwise, the Server's configuration is used by default.
466pub struct Request {
467	server: moq_lite::Server,
468	kind: RequestKind,
469}
470
471impl Request {
472	/// Reject the session, returning your favorite HTTP status code.
473	pub async fn close(self, _code: u16) -> anyhow::Result<()> {
474		match self.kind {
475			#[cfg(feature = "noq")]
476			RequestKind::Noq(request) => {
477				let status = web_transport_noq::http::StatusCode::from_u16(_code).context("invalid status code")?;
478				request.close(status).await?;
479				Ok(())
480			}
481			#[cfg(feature = "quinn")]
482			RequestKind::Quinn(request) => {
483				let status = web_transport_quinn::http::StatusCode::from_u16(_code).context("invalid status code")?;
484				request.close(status).await?;
485				Ok(())
486			}
487			#[cfg(feature = "quiche")]
488			RequestKind::Quiche(request) => {
489				let status = web_transport_quiche::http::StatusCode::from_u16(_code).context("invalid status code")?;
490				request
491					.reject(status)
492					.await
493					.map_err(|e| anyhow::anyhow!("failed to close quiche WebTransport request: {e}"))?;
494				Ok(())
495			}
496			#[cfg(feature = "iroh")]
497			RequestKind::Iroh(request) => {
498				let status = web_transport_iroh::http::StatusCode::from_u16(_code).context("invalid status code")?;
499				request.close(status).await?;
500				Ok(())
501			}
502			#[cfg(feature = "websocket")]
503			RequestKind::WebSocket(_session) => {
504				// WebSocket doesn't support HTTP status codes; just drop to close.
505				Ok(())
506			}
507		}
508	}
509
510	/// Publish the given origin to the session.
511	pub fn with_publish(mut self, publish: impl Into<Option<moq_lite::OriginConsumer>>) -> Self {
512		self.server = self.server.with_publish(publish);
513		self
514	}
515
516	/// Consume the given origin from the session.
517	pub fn with_consume(mut self, consume: impl Into<Option<moq_lite::OriginProducer>>) -> Self {
518		self.server = self.server.with_consume(consume);
519		self
520	}
521
522	/// Accept the session, performing rest of the MoQ handshake.
523	pub async fn ok(self) -> anyhow::Result<Session> {
524		match self.kind {
525			#[cfg(feature = "noq")]
526			RequestKind::Noq(request) => Ok(self.server.accept(request.ok().await?).await?),
527			#[cfg(feature = "quinn")]
528			RequestKind::Quinn(request) => Ok(self.server.accept(request.ok().await?).await?),
529			#[cfg(feature = "quiche")]
530			RequestKind::Quiche(request) => {
531				let conn = request
532					.ok()
533					.await
534					.map_err(|e| anyhow::anyhow!("failed to accept quiche WebTransport: {e}"))?;
535				Ok(self.server.accept(conn).await?)
536			}
537			#[cfg(feature = "iroh")]
538			RequestKind::Iroh(request) => Ok(self.server.accept(request.ok().await?).await?),
539			#[cfg(feature = "websocket")]
540			RequestKind::WebSocket(session) => Ok(self.server.accept(session).await?),
541		}
542	}
543
544	/// Returns the transport type as a string (e.g. "quic", "iroh").
545	pub fn transport(&self) -> &'static str {
546		match self.kind {
547			#[cfg(feature = "noq")]
548			RequestKind::Noq(_) => "quic",
549			#[cfg(feature = "quinn")]
550			RequestKind::Quinn(_) => "quic",
551			#[cfg(feature = "quiche")]
552			RequestKind::Quiche(_) => "quic",
553			#[cfg(feature = "iroh")]
554			RequestKind::Iroh(_) => "iroh",
555			#[cfg(feature = "websocket")]
556			RequestKind::WebSocket(_) => "websocket",
557		}
558	}
559
560	/// Returns the URL provided by the client.
561	pub fn url(&self) -> Option<&Url> {
562		#[cfg(not(any(feature = "noq", feature = "quinn", feature = "quiche", feature = "iroh")))]
563		unreachable!("no QUIC backend compiled; enable noq, quinn, quiche, or iroh feature");
564
565		match self.kind {
566			#[cfg(feature = "noq")]
567			RequestKind::Noq(ref request) => request.url(),
568			#[cfg(feature = "quinn")]
569			RequestKind::Quinn(ref request) => request.url(),
570			#[cfg(feature = "quiche")]
571			RequestKind::Quiche(ref request) => request.url(),
572			#[cfg(feature = "iroh")]
573			RequestKind::Iroh(ref request) => request.url(),
574			#[cfg(feature = "websocket")]
575			RequestKind::WebSocket(_) => None,
576		}
577	}
578}
579
580/// TLS certificate information including fingerprints.
581#[derive(Debug)]
582pub struct ServerTlsInfo {
583	#[cfg(any(feature = "noq", feature = "quinn"))]
584	pub(crate) certs: Vec<Arc<rustls::sign::CertifiedKey>>,
585	pub fingerprints: Vec<String>,
586}
587
588/// Server ID for QUIC-LB support.
589#[serde_with::serde_as]
590#[derive(Clone, serde::Serialize, serde::Deserialize)]
591pub struct ServerId(#[serde_as(as = "serde_with::hex::Hex")] pub(crate) Vec<u8>);
592
593impl ServerId {
594	#[allow(dead_code)]
595	pub(crate) fn len(&self) -> usize {
596		self.0.len()
597	}
598}
599
600impl std::fmt::Debug for ServerId {
601	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
602		f.debug_tuple("QuicLbServerId").field(&hex::encode(&self.0)).finish()
603	}
604}
605
606impl std::str::FromStr for ServerId {
607	type Err = hex::FromHexError;
608
609	fn from_str(s: &str) -> Result<Self, Self::Err> {
610		hex::decode(s).map(Self)
611	}
612}