Skip to main content

moq_native/
server.rs

1use std::net;
2use std::path::PathBuf;
3
4use crate::QuicBackend;
5use moq_lite::Session;
6use std::sync::{Arc, RwLock};
7use url::Url;
8#[cfg(feature = "iroh")]
9use web_transport_iroh::iroh;
10
11use anyhow::Context;
12
13use futures::FutureExt;
14use futures::future::BoxFuture;
15use futures::stream::FuturesUnordered;
16use futures::stream::StreamExt;
17
18/// TLS configuration for the server.
19///
20/// Certificate and keys must currently be files on disk.
21/// Alternatively, you can generate a self-signed certificate given a list of hostnames.
22#[derive(clap::Args, Clone, Default, Debug, serde::Serialize, serde::Deserialize)]
23#[serde(deny_unknown_fields)]
24#[non_exhaustive]
25pub struct ServerTlsConfig {
26	/// Load the given certificate from disk.
27	#[arg(long = "tls-cert", id = "tls-cert", env = "MOQ_SERVER_TLS_CERT")]
28	#[serde(default, skip_serializing_if = "Vec::is_empty")]
29	pub cert: Vec<PathBuf>,
30
31	/// Load the given key from disk.
32	#[arg(long = "tls-key", id = "tls-key", env = "MOQ_SERVER_TLS_KEY")]
33	#[serde(default, skip_serializing_if = "Vec::is_empty")]
34	pub key: Vec<PathBuf>,
35
36	/// Or generate a new certificate and key with the given hostnames.
37	/// This won't be valid unless the client uses the fingerprint or disables verification.
38	#[arg(
39		long = "tls-generate",
40		id = "tls-generate",
41		value_delimiter = ',',
42		env = "MOQ_SERVER_TLS_GENERATE"
43	)]
44	#[serde(default, skip_serializing_if = "Vec::is_empty")]
45	pub generate: Vec<String>,
46}
47
48/// Configuration for the MoQ server.
49#[derive(clap::Args, Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
50#[serde(deny_unknown_fields, default)]
51#[non_exhaustive]
52pub struct ServerConfig {
53	/// Listen for UDP packets on the given address.
54	/// Defaults to `[::]:443` if not provided.
55	#[serde(alias = "listen")]
56	#[arg(id = "server-bind", long = "server-bind", alias = "listen", env = "MOQ_SERVER_BIND")]
57	pub bind: Option<net::SocketAddr>,
58
59	/// The QUIC backend to use.
60	/// Auto-detected from compiled features if not specified.
61	#[arg(id = "server-backend", long = "server-backend", env = "MOQ_SERVER_BACKEND")]
62	pub backend: Option<QuicBackend>,
63
64	/// Server ID to embed in connection IDs for QUIC-LB compatibility.
65	/// If set, connection IDs will be derived semi-deterministically.
66	#[arg(id = "server-quic-lb-id", long = "server-quic-lb-id", env = "MOQ_SERVER_QUIC_LB_ID")]
67	#[serde(default, skip_serializing_if = "Option::is_none")]
68	pub quic_lb_id: Option<ServerId>,
69
70	/// Number of random nonce bytes in QUIC-LB connection IDs.
71	/// Must be at least 4, and server_id + nonce + 1 must not exceed 20.
72	#[arg(
73		id = "server-quic-lb-nonce",
74		long = "server-quic-lb-nonce",
75		requires = "server-quic-lb-id",
76		env = "MOQ_SERVER_QUIC_LB_NONCE"
77	)]
78	#[serde(default, skip_serializing_if = "Option::is_none")]
79	pub quic_lb_nonce: Option<usize>,
80
81	/// Maximum number of concurrent QUIC streams per connection (both bidi and uni).
82	#[serde(skip_serializing_if = "Option::is_none")]
83	#[arg(
84		id = "server-max-streams",
85		long = "server-max-streams",
86		env = "MOQ_SERVER_MAX_STREAMS"
87	)]
88	pub max_streams: Option<u64>,
89
90	#[command(flatten)]
91	#[serde(default)]
92	pub tls: ServerTlsConfig,
93}
94
95impl ServerConfig {
96	pub fn init(self) -> anyhow::Result<Server> {
97		Server::new(self)
98	}
99}
100
101/// Server for accepting MoQ connections over QUIC.
102///
103/// Create via [`ServerConfig::init`] or [`Server::new`].
104pub struct Server {
105	moq: moq_lite::Server,
106	accept: FuturesUnordered<BoxFuture<'static, anyhow::Result<Request>>>,
107	#[cfg(feature = "iroh")]
108	iroh: Option<iroh::Endpoint>,
109	#[cfg(feature = "quinn")]
110	quinn: Option<crate::quinn::QuinnServer>,
111	#[cfg(feature = "quiche")]
112	quiche: Option<crate::quiche::QuicheServer>,
113}
114
115impl Server {
116	pub fn new(config: ServerConfig) -> anyhow::Result<Self> {
117		let backend = config.backend.clone().unwrap_or({
118			#[cfg(feature = "quinn")]
119			{
120				QuicBackend::Quinn
121			}
122			#[cfg(all(feature = "quiche", not(feature = "quinn")))]
123			{
124				QuicBackend::Quiche
125			}
126			#[cfg(all(not(feature = "quiche"), not(feature = "quinn")))]
127			panic!("no QUIC backend compiled; enable quinn or quiche feature");
128		});
129
130		#[cfg(feature = "quinn")]
131		#[allow(unreachable_patterns)]
132		let quinn = match backend {
133			QuicBackend::Quinn => Some(crate::quinn::QuinnServer::new(config.clone())?),
134			_ => None,
135		};
136
137		#[cfg(feature = "quiche")]
138		let quiche = match backend {
139			QuicBackend::Quiche => Some(crate::quiche::QuicheServer::new(config)?),
140			_ => None,
141		};
142
143		Ok(Server {
144			accept: Default::default(),
145			moq: moq_lite::Server::new(),
146			#[cfg(feature = "iroh")]
147			iroh: None,
148			#[cfg(feature = "quinn")]
149			quinn,
150			#[cfg(feature = "quiche")]
151			quiche,
152		})
153	}
154
155	#[cfg(feature = "iroh")]
156	pub fn with_iroh(mut self, iroh: Option<iroh::Endpoint>) -> Self {
157		self.iroh = iroh;
158		self
159	}
160
161	pub fn with_publish(mut self, publish: impl Into<Option<moq_lite::OriginConsumer>>) -> Self {
162		self.moq = self.moq.with_publish(publish);
163		self
164	}
165
166	pub fn with_consume(mut self, consume: impl Into<Option<moq_lite::OriginProducer>>) -> Self {
167		self.moq = self.moq.with_consume(consume);
168		self
169	}
170
171	// Return the SHA256 fingerprints of all our certificates.
172	pub fn tls_info(&self) -> Arc<RwLock<ServerTlsInfo>> {
173		#[cfg(feature = "quinn")]
174		if let Some(quinn) = self.quinn.as_ref() {
175			return quinn.tls_info();
176		}
177		#[cfg(feature = "quiche")]
178		if let Some(quiche) = self.quiche.as_ref() {
179			return quiche.tls_info();
180		}
181		unreachable!("no QUIC backend compiled");
182	}
183
184	#[cfg(not(any(feature = "quinn", feature = "quiche", feature = "iroh")))]
185	pub async fn accept(&mut self) -> Option<Request> {
186		unreachable!("no QUIC backend compiled; enable quinn, quiche, or iroh feature");
187	}
188
189	/// Returns the next partially established QUIC or WebTransport session.
190	///
191	/// This returns a [Request] instead of a [web_transport_quinn::Session]
192	/// so the connection can be rejected early on an invalid path or missing auth.
193	///
194	/// The [Request] is either a WebTransport or a raw QUIC request.
195	/// Call [Request::ok] or [Request::close] to complete the handshake.
196	#[cfg(any(feature = "quinn", feature = "quiche", feature = "iroh"))]
197	pub async fn accept(&mut self) -> Option<Request> {
198		loop {
199			// tokio::select! does not support cfg directives on arms, so we need to create the futures here.
200			#[cfg(feature = "iroh")]
201			let iroh_accept = async {
202				#[cfg(feature = "iroh")]
203				if let Some(endpoint) = self.iroh.as_mut() {
204					return endpoint.accept().await;
205				}
206				None
207			};
208			#[cfg(not(feature = "iroh"))]
209			let iroh_accept = async { None::<()> };
210
211			#[cfg(feature = "quinn")]
212			let quinn_accept = async {
213				#[cfg(feature = "quinn")]
214				if let Some(quinn) = self.quinn.as_mut() {
215					return quinn.accept().await;
216				}
217				None
218			};
219			#[cfg(not(feature = "quinn"))]
220			let quinn_accept = async { None::<()> };
221
222			#[cfg(feature = "quiche")]
223			let quiche_accept = async {
224				#[cfg(feature = "quiche")]
225				if let Some(quiche) = self.quiche.as_mut() {
226					return quiche.accept().await;
227				}
228				None
229			};
230			#[cfg(not(feature = "quiche"))]
231			let quiche_accept = async { None::<()> };
232
233			let server = self.moq.clone();
234
235			tokio::select! {
236				Some(conn) = quinn_accept => {
237					#[cfg(feature = "quinn")]
238					self.accept.push(async move {
239						let quinn = super::quinn::QuinnRequest::accept(conn).await?;
240						Ok(Request {
241							server,
242							kind: RequestKind::Quinn(quinn),
243						})
244					}.boxed());
245				}
246				Some(_conn) = quiche_accept => {
247					#[cfg(feature = "quiche")]
248					self.accept.push(async move {
249						let quiche = super::quiche::QuicheRequest::accept(_conn).await?;
250						Ok(Request {
251							server,
252							kind: RequestKind::Quiche(quiche),
253						})
254					}.boxed());
255				}
256				Some(conn) = iroh_accept => {
257					#[cfg(feature = "iroh")]
258					self.accept.push(async move {
259						let iroh = super::iroh::IrohRequest::accept(conn).await?;
260						Ok(Request {
261							server,
262							kind: RequestKind::Iroh(iroh),
263						})
264					}.boxed());
265				}
266				Some(res) = self.accept.next() => {
267					match res {
268						Ok(session) => return Some(session),
269						Err(err) => tracing::debug!(%err, "failed to accept session"),
270					}
271				}
272				_ = tokio::signal::ctrl_c() => {
273					self.close();
274					tokio::time::sleep(std::time::Duration::from_millis(100)).await;
275					return None;
276				}
277			}
278		}
279	}
280
281	#[cfg(feature = "iroh")]
282	pub fn iroh_endpoint(&self) -> Option<&iroh::Endpoint> {
283		self.iroh.as_ref()
284	}
285
286	pub fn local_addr(&self) -> anyhow::Result<net::SocketAddr> {
287		#[cfg(feature = "quinn")]
288		if let Some(quinn) = self.quinn.as_ref() {
289			return quinn.local_addr();
290		}
291		#[cfg(feature = "quiche")]
292		if let Some(quiche) = self.quiche.as_ref() {
293			return quiche.local_addr();
294		}
295		unreachable!("no QUIC backend compiled");
296	}
297
298	pub fn close(&mut self) {
299		#[cfg(feature = "quinn")]
300		if let Some(quinn) = self.quinn.as_mut() {
301			quinn.close();
302		}
303		#[cfg(feature = "quiche")]
304		if let Some(quiche) = self.quiche.as_mut() {
305			quiche.close();
306		}
307		unreachable!("no QUIC backend compiled");
308	}
309}
310
/// An incoming connection that can be accepted or rejected.
///
/// One variant per transport backend; each variant only exists when its feature is enabled.
pub(crate) enum RequestKind {
	/// Request that arrived through the Quinn backend.
	#[cfg(feature = "quinn")]
	Quinn(crate::quinn::QuinnRequest),
	/// Request that arrived through the Quiche backend.
	#[cfg(feature = "quiche")]
	Quiche(crate::quiche::QuicheRequest),
	/// Request that arrived through the iroh endpoint.
	#[cfg(feature = "iroh")]
	Iroh(crate::iroh::IrohRequest),
}
320
321/// An incoming MoQ session that can be accepted or rejected.
322///
323/// [Self::with_publish] and [Self::with_consume] will configure what will be published and consumed from the session respectively.
324/// Otherwise, the Server's configuration is used by default.
325pub struct Request {
326	server: moq_lite::Server,
327	kind: RequestKind,
328}
329
330impl Request {
331	/// Reject the session, returning your favorite HTTP status code.
332	pub async fn close(self, _code: u16) -> anyhow::Result<()> {
333		match self.kind {
334			#[cfg(feature = "quinn")]
335			RequestKind::Quinn(request) => {
336				let status = web_transport_quinn::http::StatusCode::from_u16(_code).context("invalid status code")?;
337				request.close(status).await?;
338				Ok(())
339			}
340			#[cfg(feature = "quiche")]
341			RequestKind::Quiche(request) => {
342				let status = web_transport_quiche::http::StatusCode::from_u16(_code).context("invalid status code")?;
343				request
344					.reject(status)
345					.await
346					.map_err(|e| anyhow::anyhow!("failed to close quiche WebTransport request: {e}"))?;
347				Ok(())
348			}
349			#[cfg(feature = "iroh")]
350			RequestKind::Iroh(request) => {
351				let status = web_transport_iroh::http::StatusCode::from_u16(_code).context("invalid status code")?;
352				request.close(status).await?;
353				Ok(())
354			}
355		}
356	}
357
358	/// Publish the given origin to the session.
359	pub fn with_publish(mut self, publish: impl Into<Option<moq_lite::OriginConsumer>>) -> Self {
360		self.server = self.server.with_publish(publish);
361		self
362	}
363
364	/// Consume the given origin from the session.
365	pub fn with_consume(mut self, consume: impl Into<Option<moq_lite::OriginProducer>>) -> Self {
366		self.server = self.server.with_consume(consume);
367		self
368	}
369
370	/// Accept the session, performing rest of the MoQ handshake.
371	pub async fn ok(self) -> anyhow::Result<Session> {
372		match self.kind {
373			#[cfg(feature = "quinn")]
374			RequestKind::Quinn(request) => Ok(self.server.accept(request.ok().await?).await?),
375			#[cfg(feature = "quiche")]
376			RequestKind::Quiche(request) => {
377				let conn = request
378					.ok()
379					.await
380					.map_err(|e| anyhow::anyhow!("failed to accept quiche WebTransport: {e}"))?;
381				Ok(self.server.accept(conn).await?)
382			}
383			#[cfg(feature = "iroh")]
384			RequestKind::Iroh(request) => Ok(self.server.accept(request.ok().await?).await?),
385		}
386	}
387
388	/// Returns the URL provided by the client.
389	pub fn url(&self) -> Option<&Url> {
390		#[cfg(not(any(feature = "quinn", feature = "quiche", feature = "iroh")))]
391		unreachable!("no QUIC backend compiled; enable quinn, quiche, or iroh feature");
392
393		match self.kind {
394			#[cfg(feature = "quinn")]
395			RequestKind::Quinn(ref request) => request.url(),
396			#[cfg(feature = "quiche")]
397			RequestKind::Quiche(ref request) => request.url(),
398			#[cfg(feature = "iroh")]
399			RequestKind::Iroh(ref request) => request.url(),
400		}
401	}
402}
403
/// TLS certificate information including fingerprints.
#[derive(Debug)]
pub struct ServerTlsInfo {
	/// Certified keys held for the Quinn (rustls) backend; crate-internal.
	#[cfg(feature = "quinn")]
	pub(crate) certs: Vec<Arc<rustls::sign::CertifiedKey>>,
	/// Fingerprints of the server's certificates, one string per certificate.
	// NOTE(review): presumably hex-encoded SHA-256 digests; confirm against the backend that fills this in.
	pub fingerprints: Vec<String>,
}
411
412/// Server ID for QUIC-LB support.
413#[serde_with::serde_as]
414#[derive(Clone, serde::Serialize, serde::Deserialize)]
415pub struct ServerId(#[serde_as(as = "serde_with::hex::Hex")] pub(crate) Vec<u8>);
416
417impl ServerId {
418	#[allow(dead_code)]
419	pub(crate) fn len(&self) -> usize {
420		self.0.len()
421	}
422}
423
424impl std::fmt::Debug for ServerId {
425	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
426		f.debug_tuple("QuicLbServerId").field(&hex::encode(&self.0)).finish()
427	}
428}
429
430impl std::str::FromStr for ServerId {
431	type Err = hex::FromHexError;
432
433	fn from_str(s: &str) -> Result<Self, Self::Err> {
434		hex::decode(s).map(Self)
435	}
436}