Skip to main content

moq_native/
server.rs

1use std::net;
2use std::path::PathBuf;
3
4use crate::QuicBackend;
5use moq_lite::Session;
6use std::sync::{Arc, RwLock};
7use url::Url;
8#[cfg(feature = "iroh")]
9use web_transport_iroh::iroh;
10
11use anyhow::Context;
12
13use futures::FutureExt;
14use futures::future::BoxFuture;
15use futures::stream::FuturesUnordered;
16use futures::stream::StreamExt;
17
18/// TLS configuration for the server.
19///
20/// Certificate and keys must currently be files on disk.
21/// Alternatively, you can generate a self-signed certificate given a list of hostnames.
22#[derive(clap::Args, Clone, Default, Debug, serde::Serialize, serde::Deserialize)]
23#[serde(deny_unknown_fields)]
24#[non_exhaustive]
25pub struct ServerTlsConfig {
26	/// Load the given certificate from disk.
27	#[arg(long = "tls-cert", id = "tls-cert", env = "MOQ_SERVER_TLS_CERT")]
28	#[serde(default, skip_serializing_if = "Vec::is_empty")]
29	pub cert: Vec<PathBuf>,
30
31	/// Load the given key from disk.
32	#[arg(long = "tls-key", id = "tls-key", env = "MOQ_SERVER_TLS_KEY")]
33	#[serde(default, skip_serializing_if = "Vec::is_empty")]
34	pub key: Vec<PathBuf>,
35
36	/// Or generate a new certificate and key with the given hostnames.
37	/// This won't be valid unless the client uses the fingerprint or disables verification.
38	#[arg(
39		long = "tls-generate",
40		id = "tls-generate",
41		value_delimiter = ',',
42		env = "MOQ_SERVER_TLS_GENERATE"
43	)]
44	#[serde(default, skip_serializing_if = "Vec::is_empty")]
45	pub generate: Vec<String>,
46}
47
48/// Configuration for the MoQ server.
49#[derive(clap::Args, Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
50#[serde(deny_unknown_fields, default)]
51#[non_exhaustive]
52pub struct ServerConfig {
53	/// Listen for UDP packets on the given address.
54	/// Defaults to `[::]:443` if not provided.
55	#[serde(alias = "listen")]
56	#[arg(id = "server-bind", long = "server-bind", alias = "listen", env = "MOQ_SERVER_BIND")]
57	pub bind: Option<net::SocketAddr>,
58
59	/// The QUIC backend to use.
60	/// Auto-detected from compiled features if not specified.
61	#[arg(id = "server-backend", long = "server-backend", env = "MOQ_SERVER_BACKEND")]
62	pub backend: Option<QuicBackend>,
63
64	/// Server ID to embed in connection IDs for QUIC-LB compatibility.
65	/// If set, connection IDs will be derived semi-deterministically.
66	#[arg(id = "server-quic-lb-id", long = "server-quic-lb-id", env = "MOQ_SERVER_QUIC_LB_ID")]
67	#[serde(default, skip_serializing_if = "Option::is_none")]
68	pub quic_lb_id: Option<ServerId>,
69
70	/// Number of random nonce bytes in QUIC-LB connection IDs.
71	/// Must be at least 4, and server_id + nonce + 1 must not exceed 20.
72	#[arg(
73		id = "server-quic-lb-nonce",
74		long = "server-quic-lb-nonce",
75		requires = "server-quic-lb-id",
76		env = "MOQ_SERVER_QUIC_LB_NONCE"
77	)]
78	#[serde(default, skip_serializing_if = "Option::is_none")]
79	pub quic_lb_nonce: Option<usize>,
80
81	/// Maximum number of concurrent QUIC streams per connection (both bidi and uni).
82	#[serde(skip_serializing_if = "Option::is_none")]
83	#[arg(
84		id = "server-max-streams",
85		long = "server-max-streams",
86		env = "MOQ_SERVER_MAX_STREAMS"
87	)]
88	pub max_streams: Option<u64>,
89
90	/// Restrict the server to specific MoQ protocol version(s).
91	///
92	/// By default, the server accepts all supported versions.
93	/// Use this to restrict to specific versions, e.g. `--server-version moq-lite-02`.
94	/// Can be specified multiple times to accept a subset of versions.
95	///
96	/// Valid values: moq-lite-01, moq-lite-02, moq-lite-03, moq-transport-14, moq-transport-15, moq-transport-16
97	#[serde(default, skip_serializing_if = "Vec::is_empty")]
98	#[arg(id = "server-version", long = "server-version", env = "MOQ_SERVER_VERSION")]
99	pub version: Vec<moq_lite::Version>,
100
101	#[command(flatten)]
102	#[serde(default)]
103	pub tls: ServerTlsConfig,
104}
105
106impl ServerConfig {
107	pub fn init(self) -> anyhow::Result<Server> {
108		Server::new(self)
109	}
110
111	/// Returns the configured versions, defaulting to all if none specified.
112	pub fn versions(&self) -> moq_lite::Versions {
113		if self.version.is_empty() {
114			moq_lite::Versions::all()
115		} else {
116			moq_lite::Versions::from(self.version.clone())
117		}
118	}
119}
120
/// Server for accepting MoQ connections over QUIC.
///
/// Create via [`ServerConfig::init`] or [`Server::new`].
pub struct Server {
	// MoQ-level server settings; a clone is handed to every `Request` produced by `accept`.
	moq: moq_lite::Server,
	// Accepted protocol versions; `accept` derives the ALPN list from these.
	versions: moq_lite::Versions,
	// Handshakes that have been started but not finished yet; `accept` polls them
	// alongside the backends and returns the first one that succeeds.
	accept: FuturesUnordered<BoxFuture<'static, anyhow::Result<Request>>>,
	// Optional iroh endpoint, attached after construction via `with_iroh`.
	#[cfg(feature = "iroh")]
	iroh: Option<iroh::Endpoint>,
	// `Some` only when Quinn was the backend selected in `new`.
	#[cfg(feature = "quinn")]
	quinn: Option<crate::quinn::QuinnServer>,
	// `Some` only when Quiche was the backend selected in `new`.
	#[cfg(feature = "quiche")]
	quiche: Option<crate::quiche::QuicheServer>,
}
135
136impl Server {
137	pub fn new(config: ServerConfig) -> anyhow::Result<Self> {
138		let backend = config.backend.clone().unwrap_or({
139			#[cfg(feature = "quinn")]
140			{
141				QuicBackend::Quinn
142			}
143			#[cfg(all(feature = "quiche", not(feature = "quinn")))]
144			{
145				QuicBackend::Quiche
146			}
147			#[cfg(all(not(feature = "quiche"), not(feature = "quinn")))]
148			panic!("no QUIC backend compiled; enable quinn or quiche feature");
149		});
150
151		let versions = config.versions();
152
153		#[cfg(feature = "quinn")]
154		#[allow(unreachable_patterns)]
155		let quinn = match backend {
156			QuicBackend::Quinn => Some(crate::quinn::QuinnServer::new(config.clone())?),
157			_ => None,
158		};
159
160		#[cfg(feature = "quiche")]
161		let quiche = match backend {
162			QuicBackend::Quiche => Some(crate::quiche::QuicheServer::new(config)?),
163			_ => None,
164		};
165
166		Ok(Server {
167			accept: Default::default(),
168			moq: moq_lite::Server::new().with_versions(versions.clone()),
169			versions,
170			#[cfg(feature = "iroh")]
171			iroh: None,
172			#[cfg(feature = "quinn")]
173			quinn,
174			#[cfg(feature = "quiche")]
175			quiche,
176		})
177	}
178
179	#[cfg(feature = "iroh")]
180	pub fn with_iroh(mut self, iroh: Option<iroh::Endpoint>) -> Self {
181		self.iroh = iroh;
182		self
183	}
184
185	pub fn with_publish(mut self, publish: impl Into<Option<moq_lite::OriginConsumer>>) -> Self {
186		self.moq = self.moq.with_publish(publish);
187		self
188	}
189
190	pub fn with_consume(mut self, consume: impl Into<Option<moq_lite::OriginProducer>>) -> Self {
191		self.moq = self.moq.with_consume(consume);
192		self
193	}
194
195	// Return the SHA256 fingerprints of all our certificates.
196	pub fn tls_info(&self) -> Arc<RwLock<ServerTlsInfo>> {
197		#[cfg(feature = "quinn")]
198		if let Some(quinn) = self.quinn.as_ref() {
199			return quinn.tls_info();
200		}
201		#[cfg(feature = "quiche")]
202		if let Some(quiche) = self.quiche.as_ref() {
203			return quiche.tls_info();
204		}
205		unreachable!("no QUIC backend compiled");
206	}
207
208	#[cfg(not(any(feature = "quinn", feature = "quiche", feature = "iroh")))]
209	pub async fn accept(&mut self) -> Option<Request> {
210		unreachable!("no QUIC backend compiled; enable quinn, quiche, or iroh feature");
211	}
212
213	/// Returns the next partially established QUIC or WebTransport session.
214	///
215	/// This returns a [Request] instead of a [web_transport_quinn::Session]
216	/// so the connection can be rejected early on an invalid path or missing auth.
217	///
218	/// The [Request] is either a WebTransport or a raw QUIC request.
219	/// Call [Request::ok] or [Request::close] to complete the handshake.
220	#[cfg(any(feature = "quinn", feature = "quiche", feature = "iroh"))]
221	pub async fn accept(&mut self) -> Option<Request> {
222		loop {
223			// tokio::select! does not support cfg directives on arms, so we need to create the futures here.
224			#[cfg(feature = "iroh")]
225			let iroh_accept = async {
226				#[cfg(feature = "iroh")]
227				if let Some(endpoint) = self.iroh.as_mut() {
228					return endpoint.accept().await;
229				}
230				None
231			};
232			#[cfg(not(feature = "iroh"))]
233			let iroh_accept = async { None::<()> };
234
235			#[cfg(feature = "quinn")]
236			let quinn_accept = async {
237				#[cfg(feature = "quinn")]
238				if let Some(quinn) = self.quinn.as_mut() {
239					return quinn.accept().await;
240				}
241				None
242			};
243			#[cfg(not(feature = "quinn"))]
244			let quinn_accept = async { None::<()> };
245
246			#[cfg(feature = "quiche")]
247			let quiche_accept = async {
248				#[cfg(feature = "quiche")]
249				if let Some(quiche) = self.quiche.as_mut() {
250					return quiche.accept().await;
251				}
252				None
253			};
254			#[cfg(not(feature = "quiche"))]
255			let quiche_accept = async { None::<()> };
256
257			let server = self.moq.clone();
258			let versions = self.versions.clone();
259
260			tokio::select! {
261				Some(_conn) = quinn_accept => {
262					#[cfg(feature = "quinn")]
263					{
264						let alpns = versions.alpns();
265						self.accept.push(async move {
266							let quinn = super::quinn::QuinnRequest::accept(_conn, alpns).await?;
267							Ok(Request {
268								server,
269								kind: RequestKind::Quinn(quinn),
270							})
271						}.boxed());
272					}
273				}
274				Some(_conn) = quiche_accept => {
275					#[cfg(feature = "quiche")]
276					{
277						let alpns = versions.alpns();
278						self.accept.push(async move {
279							let quiche = super::quiche::QuicheRequest::accept(_conn, alpns).await?;
280							Ok(Request {
281								server,
282								kind: RequestKind::Quiche(quiche),
283							})
284						}.boxed());
285					}
286				}
287				Some(_conn) = iroh_accept => {
288					#[cfg(feature = "iroh")]
289					self.accept.push(async move {
290						let iroh = super::iroh::IrohRequest::accept(_conn).await?;
291						Ok(Request {
292							server,
293							kind: RequestKind::Iroh(iroh),
294						})
295					}.boxed());
296				}
297				Some(res) = self.accept.next() => {
298					match res {
299						Ok(session) => return Some(session),
300						Err(err) => tracing::debug!(%err, "failed to accept session"),
301					}
302				}
303				_ = tokio::signal::ctrl_c() => {
304					self.close().await;
305					return None;
306				}
307			}
308		}
309	}
310
	/// Borrow the iroh endpoint attached to this server, if there is one.
	///
	/// `None` until an endpoint is supplied through `with_iroh`, and again after `close` takes it.
	#[cfg(feature = "iroh")]
	pub fn iroh_endpoint(&self) -> Option<&iroh::Endpoint> {
		self.iroh.as_ref()
	}
315
316	pub fn local_addr(&self) -> anyhow::Result<net::SocketAddr> {
317		#[cfg(feature = "quinn")]
318		if let Some(quinn) = self.quinn.as_ref() {
319			return quinn.local_addr();
320		}
321		#[cfg(feature = "quiche")]
322		if let Some(quiche) = self.quiche.as_ref() {
323			return quiche.local_addr();
324		}
325		unreachable!("no QUIC backend compiled");
326	}
327
328	pub async fn close(&mut self) {
329		#[cfg(feature = "quinn")]
330		if let Some(quinn) = self.quinn.as_mut() {
331			quinn.close();
332			tokio::time::sleep(std::time::Duration::from_millis(100)).await;
333		}
334		#[cfg(feature = "quiche")]
335		if let Some(quiche) = self.quiche.as_mut() {
336			quiche.close();
337			tokio::time::sleep(std::time::Duration::from_millis(100)).await;
338		}
339		#[cfg(feature = "iroh")]
340		if let Some(iroh) = self.iroh.take() {
341			iroh.close().await;
342		}
343		#[cfg(not(any(feature = "quinn", feature = "quiche", feature = "iroh")))]
344		unreachable!("no QUIC backend compiled");
345	}
346}
347
/// An incoming connection that can be accepted or rejected.
///
/// One variant per transport backend; each variant only exists when the
/// matching cargo feature is enabled, so the enum has no variants at all in a
/// build without any backend.
pub(crate) enum RequestKind {
	#[cfg(feature = "quinn")]
	Quinn(crate::quinn::QuinnRequest),
	#[cfg(feature = "quiche")]
	Quiche(crate::quiche::QuicheRequest),
	#[cfg(feature = "iroh")]
	Iroh(crate::iroh::IrohRequest),
}
357
/// An incoming MoQ session that can be accepted or rejected.
///
/// [Self::with_publish] and [Self::with_consume] will configure what will be published and consumed from the session respectively.
/// Otherwise, the Server's configuration is used by default.
pub struct Request {
	// Clone of the server's MoQ settings taken when the connection arrived;
	// the `with_*` builders replace it for this request only.
	server: moq_lite::Server,
	// The backend-specific pending connection.
	kind: RequestKind,
}
366
367impl Request {
368	/// Reject the session, returning your favorite HTTP status code.
369	pub async fn close(self, _code: u16) -> anyhow::Result<()> {
370		match self.kind {
371			#[cfg(feature = "quinn")]
372			RequestKind::Quinn(request) => {
373				let status = web_transport_quinn::http::StatusCode::from_u16(_code).context("invalid status code")?;
374				request.close(status).await?;
375				Ok(())
376			}
377			#[cfg(feature = "quiche")]
378			RequestKind::Quiche(request) => {
379				let status = web_transport_quiche::http::StatusCode::from_u16(_code).context("invalid status code")?;
380				request
381					.reject(status)
382					.await
383					.map_err(|e| anyhow::anyhow!("failed to close quiche WebTransport request: {e}"))?;
384				Ok(())
385			}
386			#[cfg(feature = "iroh")]
387			RequestKind::Iroh(request) => {
388				let status = web_transport_iroh::http::StatusCode::from_u16(_code).context("invalid status code")?;
389				request.close(status).await?;
390				Ok(())
391			}
392		}
393	}
394
395	/// Publish the given origin to the session.
396	pub fn with_publish(mut self, publish: impl Into<Option<moq_lite::OriginConsumer>>) -> Self {
397		self.server = self.server.with_publish(publish);
398		self
399	}
400
401	/// Consume the given origin from the session.
402	pub fn with_consume(mut self, consume: impl Into<Option<moq_lite::OriginProducer>>) -> Self {
403		self.server = self.server.with_consume(consume);
404		self
405	}
406
407	/// Accept the session, performing rest of the MoQ handshake.
408	pub async fn ok(self) -> anyhow::Result<Session> {
409		match self.kind {
410			#[cfg(feature = "quinn")]
411			RequestKind::Quinn(request) => Ok(self.server.accept(request.ok().await?).await?),
412			#[cfg(feature = "quiche")]
413			RequestKind::Quiche(request) => {
414				let conn = request
415					.ok()
416					.await
417					.map_err(|e| anyhow::anyhow!("failed to accept quiche WebTransport: {e}"))?;
418				Ok(self.server.accept(conn).await?)
419			}
420			#[cfg(feature = "iroh")]
421			RequestKind::Iroh(request) => Ok(self.server.accept(request.ok().await?).await?),
422		}
423	}
424
425	/// Returns the URL provided by the client.
426	pub fn url(&self) -> Option<&Url> {
427		#[cfg(not(any(feature = "quinn", feature = "quiche", feature = "iroh")))]
428		unreachable!("no QUIC backend compiled; enable quinn, quiche, or iroh feature");
429
430		match self.kind {
431			#[cfg(feature = "quinn")]
432			RequestKind::Quinn(ref request) => request.url(),
433			#[cfg(feature = "quiche")]
434			RequestKind::Quiche(ref request) => request.url(),
435			#[cfg(feature = "iroh")]
436			RequestKind::Iroh(ref request) => request.url(),
437		}
438	}
439}
440
/// TLS certificate information including fingerprints.
///
/// Handed out behind `Arc<RwLock<..>>` by `Server::tls_info`.
#[derive(Debug)]
pub struct ServerTlsInfo {
	// Certified keys; this field only exists when the `quinn` feature is enabled.
	#[cfg(feature = "quinn")]
	pub(crate) certs: Vec<Arc<rustls::sign::CertifiedKey>>,
	/// Certificate fingerprints.
	// NOTE(review): presumably one SHA256 string per certificate — confirm against the backend that fills this in.
	pub fingerprints: Vec<String>,
}
448
/// Server ID for QUIC-LB support.
///
/// Wraps the raw ID bytes; with serde the bytes are written and read as a hex string.
#[serde_with::serde_as]
#[derive(Clone, serde::Serialize, serde::Deserialize)]
pub struct ServerId(#[serde_as(as = "serde_with::hex::Hex")] pub(crate) Vec<u8>);
453
454impl ServerId {
455	#[allow(dead_code)]
456	pub(crate) fn len(&self) -> usize {
457		self.0.len()
458	}
459}
460
461impl std::fmt::Debug for ServerId {
462	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
463		f.debug_tuple("QuicLbServerId").field(&hex::encode(&self.0)).finish()
464	}
465}
466
467impl std::str::FromStr for ServerId {
468	type Err = hex::FromHexError;
469
470	fn from_str(s: &str) -> Result<Self, Self::Err> {
471		hex::decode(s).map(Self)
472	}
473}