Skip to main content

moq_native/
server.rs

1use std::net;
2use std::path::PathBuf;
3
4use crate::QuicBackend;
5use moq_lite::Session;
6use std::sync::{Arc, RwLock};
7use url::Url;
8#[cfg(feature = "iroh")]
9use web_transport_iroh::iroh;
10
11use anyhow::Context;
12
13use futures::FutureExt;
14use futures::future::BoxFuture;
15use futures::stream::FuturesUnordered;
16use futures::stream::StreamExt;
17
18/// TLS configuration for the server.
19///
20/// Certificate and keys must currently be files on disk.
21/// Alternatively, you can generate a self-signed certificate given a list of hostnames.
22#[derive(clap::Args, Clone, Default, Debug, serde::Serialize, serde::Deserialize)]
23#[serde(deny_unknown_fields)]
24#[non_exhaustive]
25pub struct ServerTlsConfig {
26	/// Load the given certificate from disk.
27	#[arg(long = "tls-cert", id = "tls-cert", env = "MOQ_SERVER_TLS_CERT")]
28	#[serde(default, skip_serializing_if = "Vec::is_empty")]
29	pub cert: Vec<PathBuf>,
30
31	/// Load the given key from disk.
32	#[arg(long = "tls-key", id = "tls-key", env = "MOQ_SERVER_TLS_KEY")]
33	#[serde(default, skip_serializing_if = "Vec::is_empty")]
34	pub key: Vec<PathBuf>,
35
36	/// Or generate a new certificate and key with the given hostnames.
37	/// This won't be valid unless the client uses the fingerprint or disables verification.
38	#[arg(
39		long = "tls-generate",
40		id = "tls-generate",
41		value_delimiter = ',',
42		env = "MOQ_SERVER_TLS_GENERATE"
43	)]
44	#[serde(default, skip_serializing_if = "Vec::is_empty")]
45	pub generate: Vec<String>,
46}
47
48/// Configuration for the MoQ server.
49#[derive(clap::Args, Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
50#[serde(deny_unknown_fields, default)]
51#[non_exhaustive]
52pub struct ServerConfig {
53	/// Listen for UDP packets on the given address.
54	/// Defaults to `[::]:443` if not provided.
55	#[serde(alias = "listen")]
56	#[arg(id = "server-bind", long = "server-bind", alias = "listen", env = "MOQ_SERVER_BIND")]
57	pub bind: Option<net::SocketAddr>,
58
59	/// The QUIC backend to use.
60	/// Auto-detected from compiled features if not specified.
61	#[arg(id = "server-backend", long = "server-backend", env = "MOQ_SERVER_BACKEND")]
62	pub backend: Option<QuicBackend>,
63
64	/// Server ID to embed in connection IDs for QUIC-LB compatibility.
65	/// If set, connection IDs will be derived semi-deterministically.
66	#[arg(id = "server-quic-lb-id", long = "server-quic-lb-id", env = "MOQ_SERVER_QUIC_LB_ID")]
67	#[serde(default, skip_serializing_if = "Option::is_none")]
68	pub quic_lb_id: Option<ServerId>,
69
70	/// Number of random nonce bytes in QUIC-LB connection IDs.
71	/// Must be at least 4, and server_id + nonce + 1 must not exceed 20.
72	#[arg(
73		id = "server-quic-lb-nonce",
74		long = "server-quic-lb-nonce",
75		requires = "server-quic-lb-id",
76		env = "MOQ_SERVER_QUIC_LB_NONCE"
77	)]
78	#[serde(default, skip_serializing_if = "Option::is_none")]
79	pub quic_lb_nonce: Option<usize>,
80
81	#[command(flatten)]
82	#[serde(default)]
83	pub tls: ServerTlsConfig,
84}
85
86impl ServerConfig {
87	pub fn init(self) -> anyhow::Result<Server> {
88		Server::new(self)
89	}
90}
91
92/// Server for accepting MoQ connections over QUIC.
93///
94/// Create via [`ServerConfig::init`] or [`Server::new`].
95pub struct Server {
96	moq: moq_lite::Server,
97	accept: FuturesUnordered<BoxFuture<'static, anyhow::Result<Request>>>,
98	#[cfg(feature = "iroh")]
99	iroh: Option<iroh::Endpoint>,
100	#[cfg(feature = "quinn")]
101	quinn: Option<crate::quinn::QuinnServer>,
102	#[cfg(feature = "quiche")]
103	quiche: Option<crate::quiche::QuicheServer>,
104}
105
106impl Server {
107	pub fn new(config: ServerConfig) -> anyhow::Result<Self> {
108		let backend = config.backend.clone().unwrap_or({
109			#[cfg(feature = "quinn")]
110			{
111				QuicBackend::Quinn
112			}
113			#[cfg(all(feature = "quiche", not(feature = "quinn")))]
114			{
115				QuicBackend::Quiche
116			}
117			#[cfg(all(not(feature = "quiche"), not(feature = "quinn")))]
118			panic!("no QUIC backend compiled; enable quinn or quiche feature");
119		});
120
121		#[cfg(feature = "quinn")]
122		let quinn = match backend {
123			QuicBackend::Quinn => Some(crate::quinn::QuinnServer::new(config.clone())?),
124			_ => None,
125		};
126
127		#[cfg(feature = "quiche")]
128		let quiche = match backend {
129			QuicBackend::Quiche => Some(crate::quiche::QuicheServer::new(config)?),
130			_ => None,
131		};
132
133		Ok(Server {
134			accept: Default::default(),
135			moq: moq_lite::Server::new(),
136			#[cfg(feature = "iroh")]
137			iroh: None,
138			#[cfg(feature = "quinn")]
139			quinn,
140			#[cfg(feature = "quiche")]
141			quiche,
142		})
143	}
144
145	#[cfg(feature = "iroh")]
146	pub fn with_iroh(mut self, iroh: Option<iroh::Endpoint>) -> Self {
147		self.iroh = iroh;
148		self
149	}
150
151	pub fn with_publish(mut self, publish: impl Into<Option<moq_lite::OriginConsumer>>) -> Self {
152		self.moq = self.moq.with_publish(publish);
153		self
154	}
155
156	pub fn with_consume(mut self, consume: impl Into<Option<moq_lite::OriginProducer>>) -> Self {
157		self.moq = self.moq.with_consume(consume);
158		self
159	}
160
161	// Return the SHA256 fingerprints of all our certificates.
162	pub fn tls_info(&self) -> Arc<RwLock<ServerTlsInfo>> {
163		#[cfg(feature = "quinn")]
164		if let Some(quinn) = self.quinn.as_ref() {
165			return quinn.tls_info();
166		}
167		#[cfg(feature = "quiche")]
168		if let Some(quiche) = self.quiche.as_ref() {
169			return quiche.tls_info();
170		}
171		unreachable!("no QUIC backend compiled");
172	}
173
174	#[cfg(not(any(feature = "quinn", feature = "quiche", feature = "iroh")))]
175	pub async fn accept(&mut self) -> Option<Request> {
176		unreachable!("no QUIC backend compiled; enable quinn, quiche, or iroh feature");
177	}
178
179	/// Returns the next partially established QUIC or WebTransport session.
180	///
181	/// This returns a [Request] instead of a [web_transport_quinn::Session]
182	/// so the connection can be rejected early on an invalid path or missing auth.
183	///
184	/// The [Request] is either a WebTransport or a raw QUIC request.
185	/// Call [Request::ok] or [Request::close] to complete the handshake.
186	#[cfg(any(feature = "quinn", feature = "quiche", feature = "iroh"))]
187	pub async fn accept(&mut self) -> Option<Request> {
188		loop {
189			// tokio::select! does not support cfg directives on arms, so we need to create the futures here.
190			#[cfg(feature = "iroh")]
191			let iroh_accept = async {
192				#[cfg(feature = "iroh")]
193				if let Some(endpoint) = self.iroh.as_mut() {
194					return endpoint.accept().await;
195				}
196				None
197			};
198			#[cfg(not(feature = "iroh"))]
199			let iroh_accept = async { None::<()> };
200
201			#[cfg(feature = "quinn")]
202			let quinn_accept = async {
203				#[cfg(feature = "quinn")]
204				if let Some(quinn) = self.quinn.as_mut() {
205					return quinn.accept().await;
206				}
207				None
208			};
209			#[cfg(not(feature = "quinn"))]
210			let quinn_accept = async { None::<()> };
211
212			#[cfg(feature = "quiche")]
213			let quiche_accept = async {
214				#[cfg(feature = "quiche")]
215				if let Some(quiche) = self.quiche.as_mut() {
216					return quiche.accept().await;
217				}
218				None
219			};
220			#[cfg(not(feature = "quiche"))]
221			let quiche_accept = async { None::<()> };
222
223			let server = self.moq.clone();
224
225			tokio::select! {
226				Some(conn) = quinn_accept => {
227					#[cfg(feature = "quinn")]
228					self.accept.push(async move {
229						let quinn = super::quinn::QuinnRequest::accept(conn).await?;
230						Ok(Request {
231							server,
232							kind: RequestKind::Quinn(quinn),
233						})
234					}.boxed());
235				}
236				Some(conn) = quiche_accept => {
237					#[cfg(feature = "quiche")]
238					self.accept.push(async move {
239						let quiche = super::quiche::QuicheRequest::accept(conn).await?;
240						Ok(Request {
241							server,
242							kind: RequestKind::Quiche(quiche),
243						})
244					}.boxed());
245				}
246				Some(conn) = iroh_accept => {
247					#[cfg(feature = "iroh")]
248					self.accept.push(async move {
249						let iroh = super::iroh::IrohRequest::accept(conn).await?;
250						Ok(Request {
251							server,
252							kind: RequestKind::Iroh(iroh),
253						})
254					}.boxed());
255				}
256				Some(res) = self.accept.next() => {
257					match res {
258						Ok(session) => return Some(session),
259						Err(err) => tracing::debug!(%err, "failed to accept session"),
260					}
261				}
262				_ = tokio::signal::ctrl_c() => {
263					self.close();
264					tokio::time::sleep(std::time::Duration::from_millis(100)).await;
265					return None;
266				}
267			}
268		}
269	}
270
271	#[cfg(feature = "iroh")]
272	pub fn iroh_endpoint(&self) -> Option<&iroh::Endpoint> {
273		self.iroh.as_ref()
274	}
275
276	pub fn local_addr(&self) -> anyhow::Result<net::SocketAddr> {
277		#[cfg(feature = "quinn")]
278		if let Some(quinn) = self.quinn.as_ref() {
279			return quinn.local_addr();
280		}
281		#[cfg(feature = "quiche")]
282		if let Some(quiche) = self.quiche.as_ref() {
283			return quiche.local_addr();
284		}
285		unreachable!("no QUIC backend compiled");
286	}
287
288	pub fn close(&mut self) {
289		#[cfg(feature = "quinn")]
290		if let Some(quinn) = self.quinn.as_mut() {
291			quinn.close();
292		}
293		#[cfg(feature = "quiche")]
294		if let Some(quiche) = self.quiche.as_mut() {
295			quiche.close();
296		}
297		unreachable!("no QUIC backend compiled");
298	}
299}
300
301/// An incoming connection that can be accepted or rejected.
302pub(crate) enum RequestKind {
303	#[cfg(feature = "quinn")]
304	Quinn(crate::quinn::QuinnRequest),
305	#[cfg(feature = "quiche")]
306	Quiche(crate::quiche::QuicheRequest),
307	#[cfg(feature = "iroh")]
308	Iroh(crate::iroh::IrohRequest),
309}
310
311/// An incoming MoQ session that can be accepted or rejected.
312///
313/// [Self::with_publish] and [Self::with_consume] will configure what will be published and consumed from the session respectively.
314/// Otherwise, the Server's configuration is used by default.
315pub struct Request {
316	server: moq_lite::Server,
317	kind: RequestKind,
318}
319
320impl Request {
321	/// Reject the session, returning your favorite HTTP status code.
322	pub async fn close(self, _code: u16) -> anyhow::Result<()> {
323		match self.kind {
324			#[cfg(feature = "quinn")]
325			RequestKind::Quinn(request) => {
326				let status = web_transport_quinn::http::StatusCode::from_u16(_code).context("invalid status code")?;
327				request.close(status).await?;
328				Ok(())
329			}
330			#[cfg(feature = "quiche")]
331			RequestKind::Quiche(request) => {
332				let status = web_transport_quiche::http::StatusCode::from_u16(_code).context("invalid status code")?;
333				request
334					.reject(status)
335					.await
336					.map_err(|e| anyhow::anyhow!("failed to close quiche WebTransport request: {e}"))?;
337				Ok(())
338			}
339			#[cfg(feature = "iroh")]
340			RequestKind::Iroh(request) => {
341				let status = web_transport_iroh::http::StatusCode::from_u16(_code).context("invalid status code")?;
342				request.close(status).await?;
343				Ok(())
344			}
345		}
346	}
347
348	/// Publish the given origin to the session.
349	pub fn with_publish(mut self, publish: impl Into<Option<moq_lite::OriginConsumer>>) -> Self {
350		self.server = self.server.with_publish(publish);
351		self
352	}
353
354	/// Consume the given origin from the session.
355	pub fn with_consume(mut self, consume: impl Into<Option<moq_lite::OriginProducer>>) -> Self {
356		self.server = self.server.with_consume(consume);
357		self
358	}
359
360	/// Accept the session, performing rest of the MoQ handshake.
361	pub async fn ok(self) -> anyhow::Result<Session> {
362		match self.kind {
363			#[cfg(feature = "quinn")]
364			RequestKind::Quinn(request) => Ok(self.server.accept(request.ok().await?).await?),
365			#[cfg(feature = "quiche")]
366			RequestKind::Quiche(request) => {
367				let conn = request
368					.ok()
369					.await
370					.map_err(|e| anyhow::anyhow!("failed to accept quiche WebTransport: {e}"))?;
371				Ok(self.server.accept(conn).await?)
372			}
373			#[cfg(feature = "iroh")]
374			RequestKind::Iroh(request) => Ok(self.server.accept(request.ok().await?).await?),
375		}
376	}
377
378	/// Returns the URL provided by the client.
379	pub fn url(&self) -> Option<&Url> {
380		#[cfg(not(any(feature = "quinn", feature = "quiche", feature = "iroh")))]
381		unreachable!("no QUIC backend compiled; enable quinn, quiche, or iroh feature");
382
383		match self.kind {
384			#[cfg(feature = "quinn")]
385			RequestKind::Quinn(ref request) => request.url(),
386			#[cfg(feature = "quiche")]
387			RequestKind::Quiche(ref request) => request.url(),
388			#[cfg(feature = "iroh")]
389			RequestKind::Iroh(ref request) => request.url(),
390		}
391	}
392}
393
394/// TLS certificate information including fingerprints.
395#[derive(Debug)]
396pub struct ServerTlsInfo {
397	#[cfg(feature = "quinn")]
398	pub(crate) certs: Vec<Arc<rustls::sign::CertifiedKey>>,
399	pub fingerprints: Vec<String>,
400}
401
402/// Server ID for QUIC-LB support.
403#[serde_with::serde_as]
404#[derive(Clone, serde::Serialize, serde::Deserialize)]
405pub struct ServerId(#[serde_as(as = "serde_with::hex::Hex")] pub(crate) Vec<u8>);
406
407impl ServerId {
408	#[allow(dead_code)]
409	pub(crate) fn len(&self) -> usize {
410		self.0.len()
411	}
412}
413
414impl std::fmt::Debug for ServerId {
415	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
416		f.debug_tuple("QuicLbServerId").field(&hex::encode(&self.0)).finish()
417	}
418}
419
420impl std::str::FromStr for ServerId {
421	type Err = hex::FromHexError;
422
423	fn from_str(s: &str) -> Result<Self, Self::Err> {
424		hex::decode(s).map(Self)
425	}
426}