// moq_native/quinn.rs

1use crate::client::ClientConfig;
2use crate::server::{ServerConfig, ServerId};
3use crate::tls::{FingerprintVerifier, ServeCerts};
4use std::sync::{Arc, RwLock};
5use std::time::Duration;
6use std::{net, time};
7use url::Url;
8
9pub use web_transport_quinn;
10
/// Errors specific to the quinn QUIC backend.
///
/// Variants carrying `#[from]` are produced implicitly by `?`; variants that only
/// carry `#[source]` must be constructed explicitly (typically via `map_err`), which
/// lets the same underlying error type map to different, more specific variants.
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum Error {
	/// Binding the local UDP socket failed.
	#[error("failed to bind UDP socket")]
	BindSocket(#[source] std::io::Error),

	/// `quinn::Endpoint::new` rejected the socket/configuration.
	#[error("failed to create QUIC endpoint")]
	CreateEndpoint(#[source] std::io::Error),

	/// `quinn::default_runtime()` found no async runtime to drive the endpoint.
	#[error("no async runtime")]
	NoRuntime,

	/// The endpoint could not report its local socket address.
	#[error("failed to get local address")]
	LocalAddr(#[source] std::io::Error),

	/// The configured server bind address could not be resolved.
	#[error("failed to resolve bind address")]
	ResolveBind(#[source] std::io::Error),

	/// The URL has no host component to connect to.
	#[error("invalid DNS name")]
	InvalidDnsName,

	/// Resolving the URL's host failed.
	#[error("failed DNS lookup")]
	DnsLookup(#[source] std::io::Error),

	/// Resolution succeeded but no usable address was returned.
	#[error("no DNS entries")]
	NoDnsEntries,

	/// The HTTP request for the certificate fingerprint could not be completed.
	#[error("failed to fetch fingerprint")]
	FetchFingerprint(#[source] reqwest::Error),

	/// The fingerprint endpoint answered with an error status.
	#[error("fingerprint request failed")]
	FingerprintStatus(#[source] reqwest::Error),

	/// The fingerprint response body could not be read.
	#[error("failed to read fingerprint")]
	ReadFingerprint(#[source] reqwest::Error),

	/// The fetched fingerprint was not valid hex.
	#[error("invalid fingerprint")]
	InvalidFingerprint(#[from] hex::FromHexError),

	/// The URL scheme is not one the client can connect with.
	#[error("url scheme must be 'https', 'moqt', or 'moql'")]
	InvalidScheme,

	/// A URL scheme this backend cannot handle; carries the offending scheme.
	#[error("unsupported URL scheme: {0}")]
	UnsupportedScheme(String),

	/// The connection exposed no TLS handshake data.
	#[error("missing handshake data")]
	MissingHandshake,

	/// The TLS handshake completed without a negotiated ALPN protocol.
	#[error("missing ALPN")]
	MissingAlpn,

	/// The negotiated ALPN was not valid UTF-8.
	#[error("failed to decode ALPN")]
	DecodeAlpn(#[from] std::string::FromUtf8Error),

	/// The negotiated ALPN is not one this backend serves; carries the ALPN.
	#[error("unsupported ALPN: {0}")]
	UnsupportedAlpn(String),

	/// A raw QUIC client sent no server name, so no request URL can be built.
	#[error("missing server name for raw QUIC connection")]
	MissingServerName,

	/// The URL synthesized from the client's server name failed to parse.
	#[error("failed to construct URL from server name")]
	BuildUrl(#[source] url::ParseError),

	/// The configured QUIC-LB nonce length is below the minimum of 4.
	#[error("quic_lb_nonce must be at least 4")]
	QuicLbNonceTooSmall,

	/// The QUIC-LB connection ID would exceed 20 bytes; carries the computed length.
	#[error("connection ID length ({0}) exceeds maximum of 20")]
	QuicLbCidTooLong(usize),

	/// Building the rustls client-certificate verifier failed.
	#[error("failed to build client certificate verifier")]
	ClientVerifier(#[source] rustls::server::VerifierBuilderError),

	/// Converting the rustls config into a quinn crypto config failed.
	#[error(transparent)]
	NoInitialCipherSuite(#[from] quinn::crypto::rustls::NoInitialCipherSuite),

	/// Starting an outgoing connection failed before any packets were exchanged.
	#[error(transparent)]
	Connect(#[from] quinn::ConnectError),

	/// A QUIC connection error propagated via `?`.
	#[error(transparent)]
	Connection(#[from] quinn::ConnectionError),

	/// A WebTransport client-side error.
	#[error(transparent)]
	Client(#[from] web_transport_quinn::ClientError),

	/// A WebTransport server-side error.
	#[error(transparent)]
	Server(#[from] web_transport_quinn::ServerError),

	/// An accepted connection failed while completing the QUIC handshake.
	#[error("failed to establish QUIC connection")]
	Establish(#[source] quinn::ConnectionError),

	/// Waiting for the WebTransport CONNECT request failed.
	#[error("failed to receive WebTransport request")]
	RecvRequest(#[source] web_transport_quinn::ServerError),

	/// An error from this crate's TLS helpers.
	#[error(transparent)]
	Tls(#[from] crate::tls::Error),
}
108
/// Shorthand for results whose error type is this module's [`Error`].
type Result<T> = std::result::Result<T, Error>;
110
111// ── Client ──────────────────────────────────────────────────────────
112
/// Client side of the quinn backend: an endpoint plus the settings applied to
/// every outgoing connection.
#[derive(Clone)]
pub(crate) struct QuinnClient {
	/// Client-only QUIC endpoint (created without a server config).
	pub quic: quinn::Endpoint,
	/// Transport settings shared by every connection made through `connect`.
	pub transport: Arc<quinn::TransportConfig>,
	/// Protocol versions taken from the client config; they supply the ALPNs
	/// offered when connecting.
	pub versions: moq_net::Versions,
}
119
120impl QuinnClient {
121	pub fn new(config: &ClientConfig) -> Result<Self> {
122		let socket = std::net::UdpSocket::bind(config.bind).map_err(Error::BindSocket)?;
123
124		// TODO Validate the BBR implementation before enabling it
125		let mut transport = quinn::TransportConfig::default();
126		transport.max_idle_timeout(Some(time::Duration::from_secs(30).try_into().unwrap()));
127		transport.keep_alive_interval(Some(time::Duration::from_secs(5)));
128		transport.mtu_discovery_config(None); // Disable MTU discovery
129
130		let max_streams = config.max_streams.unwrap_or(crate::DEFAULT_MAX_STREAMS);
131		let max_streams = quinn::VarInt::from_u64(max_streams).unwrap_or(quinn::VarInt::MAX);
132		transport.max_concurrent_bidi_streams(max_streams);
133		transport.max_concurrent_uni_streams(max_streams);
134
135		let transport = Arc::new(transport);
136
137		// There's a bit more boilerplate to make a generic endpoint.
138		let runtime = quinn::default_runtime().ok_or(Error::NoRuntime)?;
139		let endpoint_config = quinn::EndpointConfig::default();
140
141		// Create the generic QUIC endpoint.
142		let quic = quinn::Endpoint::new(endpoint_config, None, socket, runtime).map_err(Error::CreateEndpoint)?;
143
144		Ok(Self {
145			quic,
146			transport,
147			versions: config.versions(),
148		})
149	}
150
151	pub async fn connect(&self, tls: &rustls::ClientConfig, url: Url) -> Result<web_transport_quinn::Session> {
152		let mut url = url;
153		let mut config = tls.clone();
154
155		let host = url.host().ok_or(Error::InvalidDnsName)?.to_string();
156		let port = url.port().unwrap_or(443);
157
158		// Look up the DNS entry.
159		// Quinn doesn't support happy eyeballs, so we pick a single address,
160		// preferring one whose family matches the local socket so the OS
161		// doesn't reject it (notably on Windows, where IPv6 sockets aren't
162		// dual-stack by default).
163		let local = self.quic.local_addr().map_err(Error::LocalAddr)?;
164		let addrs = tokio::net::lookup_host((host.clone(), port))
165			.await
166			.map_err(Error::DnsLookup)?;
167		let ip = crate::util::pick_addr(addrs, local).ok_or(Error::NoDnsEntries)?;
168
169		if url.scheme() == "http" {
170			// Perform a HTTP request to fetch the certificate fingerprint.
171			let mut fingerprint = url.clone();
172			fingerprint.set_path("/certificate.sha256");
173			fingerprint.set_query(None);
174			fingerprint.set_fragment(None);
175
176			tracing::warn!(url = %fingerprint, "performing insecure HTTP request for certificate");
177
178			let resp = reqwest::get(fingerprint.as_str())
179				.await
180				.map_err(Error::FetchFingerprint)?
181				.error_for_status()
182				.map_err(Error::FingerprintStatus)?;
183
184			let fingerprint = resp.text().await.map_err(Error::ReadFingerprint)?;
185			let fingerprint = hex::decode(fingerprint.trim())?;
186
187			let verifier = FingerprintVerifier::new(config.crypto_provider().clone(), fingerprint);
188			config.dangerous().set_certificate_verifier(Arc::new(verifier));
189
190			url.set_scheme("https").expect("failed to set scheme");
191		}
192
193		let alpns: Vec<Vec<u8>> = match url.scheme() {
194			"https" => vec![web_transport_quinn::ALPN.as_bytes().to_vec()],
195			"moqt" | "moql" => self
196				.versions
197				.alpns()
198				.iter()
199				.map(|alpn| alpn.as_bytes().to_vec())
200				.collect(),
201			_ => return Err(Error::InvalidScheme),
202		};
203
204		config.alpn_protocols = alpns;
205		config.key_log = Arc::new(rustls::KeyLogFile::new());
206
207		let config: quinn::crypto::rustls::QuicClientConfig = config.try_into()?;
208		let mut config = quinn::ClientConfig::new(Arc::new(config));
209		config.transport_config(self.transport.clone());
210
211		tracing::debug!(%url, %ip, "connecting");
212
213		let connection = self.quic.connect_with(config, ip, &host)?.await?;
214		tracing::Span::current().record("id", connection.stable_id());
215
216		let mut request = web_transport_quinn::proto::ConnectRequest::new(url.clone());
217		for alpn in self.versions.alpns() {
218			request = request.with_protocol(alpn.to_string());
219		}
220
221		let session = match url.scheme() {
222			"https" => web_transport_quinn::Session::connect(connection, request).await?,
223			"moqt" | "moql" => {
224				let handshake = connection
225					.handshake_data()
226					.ok_or(Error::MissingHandshake)?
227					.downcast::<quinn::crypto::rustls::HandshakeData>()
228					.unwrap();
229
230				let alpn = handshake.protocol.ok_or(Error::MissingAlpn)?;
231				let alpn = String::from_utf8(alpn)?;
232
233				let response = web_transport_quinn::proto::ConnectResponse::OK.with_protocol(alpn);
234				web_transport_quinn::Session::raw(connection, request, response)
235			}
236			_ => return Err(Error::UnsupportedScheme(url.scheme().to_string())),
237		};
238
239		Ok(session)
240	}
241}
242
243// ── Server ──────────────────────────────────────────────────────────
244
/// Server side of the quinn backend: a listening endpoint plus its certificates.
pub(crate) struct QuinnServer {
	/// QUIC endpoint that incoming connections are accepted from.
	pub quic: quinn::Endpoint,
	/// Certificate store used as the TLS certificate resolver; also handed to
	/// the background certificate reload task.
	pub certs: Arc<ServeCerts>,
}
249
250impl QuinnServer {
251	pub fn new(config: ServerConfig) -> Result<Self> {
252		// Enable BBR congestion control
253		// TODO Validate the BBR implementation before enabling it
254		let mut transport = quinn::TransportConfig::default();
255		transport.max_idle_timeout(Some(Duration::from_secs(30).try_into().unwrap()));
256		transport.keep_alive_interval(Some(Duration::from_secs(5)));
257		transport.mtu_discovery_config(None); // Disable MTU discovery
258
259		let max_streams = config.max_streams.unwrap_or(crate::DEFAULT_MAX_STREAMS);
260		let max_streams = quinn::VarInt::from_u64(max_streams).unwrap_or(quinn::VarInt::MAX);
261		transport.max_concurrent_bidi_streams(max_streams);
262		transport.max_concurrent_uni_streams(max_streams);
263
264		let transport = Arc::new(transport);
265
266		let provider = crate::crypto::provider();
267
268		let certs = ServeCerts::new(provider.clone());
269		certs.load_certs(&config.tls)?;
270		let certs = Arc::new(certs);
271
272		let tls_builder = rustls::ServerConfig::builder_with_provider(provider.clone())
273			.with_protocol_versions(&[&rustls::version::TLS13])
274			.map_err(crate::tls::Error::from)?;
275
276		let mut tls = if config.tls.root.is_empty() {
277			tls_builder.with_no_client_auth().with_cert_resolver(certs.clone())
278		} else {
279			let roots = config.tls.load_roots()?;
280			let verifier = rustls::server::WebPkiClientVerifier::builder_with_provider(Arc::new(roots), provider)
281				.allow_unauthenticated()
282				.build()
283				.map_err(Error::ClientVerifier)?;
284			tls_builder
285				.with_client_cert_verifier(verifier)
286				.with_cert_resolver(certs.clone())
287		};
288
289		// H3 is last because it requires WebTransport framing which not all H3 endpoints support.
290		let mut alpns: Vec<Vec<u8>> = config
291			.versions()
292			.alpns()
293			.iter()
294			.map(|alpn| alpn.as_bytes().to_vec())
295			.collect();
296		alpns.push(web_transport_quinn::ALPN.as_bytes().to_vec());
297
298		tls.alpn_protocols = alpns;
299		tls.key_log = Arc::new(rustls::KeyLogFile::new());
300
301		let tls: quinn::crypto::rustls::QuicServerConfig = tls.try_into()?;
302		let mut tls = quinn::ServerConfig::with_crypto(Arc::new(tls));
303		tls.transport_config(transport);
304
305		// Advertise the preferred_address transport parameter (RFC 9000 §9.6).
306		// Quinn allocates a fresh CID + reset token for the address during the handshake.
307		if let Some(addr) = config.preferred_v4 {
308			tls.preferred_address_v4(Some(addr));
309		}
310		if let Some(addr) = config.preferred_v6 {
311			tls.preferred_address_v6(Some(addr));
312		}
313
314		// There's a bit more boilerplate to make a generic endpoint.
315		let runtime = quinn::default_runtime().ok_or(Error::NoRuntime)?;
316
317		let listen =
318			crate::util::resolve(config.bind.as_deref(), crate::server::DEFAULT_BIND).map_err(Error::ResolveBind)?;
319
320		// Configure connection ID generator with server ID if provided
321		let mut endpoint_config = quinn::EndpointConfig::default();
322		if let Some(server_id) = config.quic_lb_id {
323			let nonce_len = config.quic_lb_nonce.unwrap_or(8);
324			if nonce_len < 4 {
325				return Err(Error::QuicLbNonceTooSmall);
326			}
327
328			let cid_len = 1 + server_id.len() + nonce_len;
329			if cid_len > 20 {
330				return Err(Error::QuicLbCidTooLong(cid_len));
331			}
332
333			tracing::info!(
334				?server_id,
335				nonce_len,
336				"using QUIC-LB compatible connection ID generation"
337			);
338			endpoint_config.cid_generator(move || Box::new(ServerIdGenerator::new(server_id.clone(), nonce_len)));
339		}
340
341		let socket = std::net::UdpSocket::bind(listen).map_err(Error::BindSocket)?;
342
343		// Create the generic QUIC endpoint.
344		let quic = quinn::Endpoint::new(endpoint_config, Some(tls), socket, runtime).map_err(Error::CreateEndpoint)?;
345
346		// Spawn the cert reload watcher only after endpoint creation succeeds,
347		// so we don't leave a dangling watcher on failure.
348		tokio::spawn(crate::tls::reload_certs(certs.clone(), config.tls.clone()));
349
350		Ok(Self { quic, certs })
351	}
352
353	pub fn accept(&self) -> impl std::future::Future<Output = Option<quinn::Incoming>> + '_ {
354		self.quic.accept()
355	}
356
357	pub fn tls_info(&self) -> Arc<RwLock<crate::tls::Info>> {
358		self.certs.info.clone()
359	}
360
361	pub fn local_addr(&self) -> Result<net::SocketAddr> {
362		self.quic.local_addr().map_err(Error::LocalAddr)
363	}
364
365	pub fn close(&self) {
366		self.quic.close(quinn::VarInt::from_u32(0), b"server shutdown");
367	}
368}
369
370// ── QuinnRequest ────────────────────────────────────────────────────
371
/// An incoming session request on the quinn backend that has not yet been
/// accepted or rejected.
pub(crate) enum QuinnRequest {
	/// A raw QUIC connection without WebTransport framing. The request and
	/// response are synthesized locally from the TLS handshake (server name
	/// and negotiated ALPN).
	Raw {
		request: web_transport_quinn::proto::ConnectRequest,
		response: web_transport_quinn::proto::ConnectResponse,
		connection: quinn::Connection,
	},
	/// A WebTransport CONNECT request received from the client.
	WebTransport {
		request: web_transport_quinn::Request,
		/// Sub-protocols the server is willing to select when responding.
		alpns: Vec<&'static str>,
	},
}
384
385impl QuinnRequest {
386	pub async fn accept(conn: quinn::Incoming, alpns: Vec<&'static str>) -> Result<Self> {
387		let mut conn = conn.accept()?;
388
389		let handshake = conn
390			.handshake_data()
391			.await?
392			.downcast::<quinn::crypto::rustls::HandshakeData>()
393			.unwrap();
394
395		let alpn = handshake.protocol.ok_or(Error::MissingAlpn)?;
396		let alpn = String::from_utf8(alpn)?;
397		let host = handshake.server_name.unwrap_or_default();
398
399		tracing::debug!(%host, ip = %conn.remote_address(), %alpn, "accepting");
400
401		// Wait for the QUIC connection to be established.
402		let conn = conn.await.map_err(Error::Establish)?;
403
404		let span = tracing::Span::current();
405		span.record("id", conn.stable_id()); // TODO can we get this earlier?
406		tracing::debug!(%host, ip = %conn.remote_address(), %alpn, "accepted");
407
408		match alpn.as_str() {
409			web_transport_quinn::ALPN => {
410				// Wait for the CONNECT request.
411				let request = web_transport_quinn::Request::accept(conn)
412					.await
413					.map_err(Error::RecvRequest)?;
414				Ok(Self::WebTransport { request, alpns })
415			}
416			alpn if moq_net::ALPNS.contains(&alpn) => {
417				if host.is_empty() {
418					return Err(Error::MissingServerName);
419				}
420				let host_str = if host.contains(':') {
421					format!("[{}]", host)
422				} else {
423					host.clone()
424				};
425				let url = format!("moqt://{}", host_str).parse::<Url>().map_err(Error::BuildUrl)?;
426				let request = web_transport_quinn::proto::ConnectRequest::new(url);
427				let response = web_transport_quinn::proto::ConnectResponse::OK.with_protocol(alpn);
428				Ok(Self::Raw {
429					connection: conn,
430					request,
431					response,
432				})
433			}
434			_ => Err(Error::UnsupportedAlpn(alpn)),
435		}
436	}
437
438	/// Accept the session, returning a 200 OK if using WebTransport.
439	pub async fn ok(self) -> std::result::Result<web_transport_quinn::Session, web_transport_quinn::ServerError> {
440		match self {
441			QuinnRequest::Raw {
442				connection,
443				request,
444				response,
445			} => Ok(web_transport_quinn::Session::raw(connection, request, response)),
446			QuinnRequest::WebTransport { request, alpns } => {
447				let mut response = web_transport_quinn::proto::ConnectResponse::OK;
448				// Pick the first sub-protocol that we actually support.
449				// This is the WebTransport equivalent of ALPN negotiation.
450				// If no match is found, we default to no sub-protocol to support older
451				// clients that don't use ALPN. We assume moq-transport-14/moq-lite-02
452				// and perform the SETUP_x exchange instead.
453				if let Some(protocol) = request.protocols.iter().find(|p| alpns.contains(&p.as_str())) {
454					response = response.with_protocol(protocol);
455				}
456				request.respond(response).await
457			}
458		}
459	}
460
461	/// Returns the URL provided by the client.
462	pub fn url(&self) -> Option<&Url> {
463		match self {
464			QuinnRequest::Raw { .. } => None,
465			QuinnRequest::WebTransport { request, .. } => Some(&request.url),
466		}
467	}
468
469	/// Whether the peer presented a client certificate that rustls validated
470	/// against the configured `tls.root` during the handshake.
471	pub fn has_peer_certificate(&self) -> bool {
472		let conn = match self {
473			QuinnRequest::Raw { connection, .. } => connection,
474			QuinnRequest::WebTransport { request, .. } => request.conn(),
475		};
476		conn.peer_identity().is_some()
477	}
478
479	/// Reject the session with a status code.
480	pub async fn close(
481		self,
482		status: web_transport_quinn::http::StatusCode,
483	) -> std::result::Result<(), web_transport_quinn::ServerError> {
484		match self {
485			QuinnRequest::Raw { connection, .. } => {
486				connection.close(status.as_u16().into(), status.as_str().as_bytes());
487				Ok(())
488			}
489			QuinnRequest::WebTransport { request, alpns: _, .. } => request.reject(status).await,
490		}
491	}
492}
493
494// ── ServerIdGenerator ───────────────────────────────────────────────
495
496struct ServerIdGenerator {
497	server_id: ServerId,
498	nonce_len: usize,
499}
500
501impl ServerIdGenerator {
502	fn new(server_id: ServerId, nonce_len: usize) -> Self {
503		Self { server_id, nonce_len }
504	}
505}
506
507impl quinn::ConnectionIdGenerator for ServerIdGenerator {
508	fn generate_cid(&mut self) -> quinn::ConnectionId {
509		use rand::RngExt;
510		let cid_len = self.cid_len();
511		let mut cid = Vec::with_capacity(cid_len);
512		// First byte has "self-encoded length" of server ID + nonce
513		cid.push((cid_len - 1) as u8);
514		cid.extend(self.server_id.0.iter());
515		cid.extend(rand::rng().random_iter::<u8>().take(self.nonce_len));
516		quinn::ConnectionId::new(cid.as_slice())
517	}
518
519	fn cid_len(&self) -> usize {
520		1 + self.server_id.len() + self.nonce_len
521	}
522
523	fn cid_lifetime(&self) -> Option<Duration> {
524		None
525	}
526}