Skip to main content

moq_native/
quinn.rs

1use crate::client::ClientConfig;
2use crate::server::{ServerConfig, ServerId};
3use crate::tls::{FingerprintVerifier, ServeCerts};
4use std::sync::{Arc, RwLock};
5use std::time::Duration;
6use std::{net, time};
7use url::Url;
8
9pub use web_transport_quinn;
10
11/// Errors specific to the quinn QUIC backend.
12#[derive(Debug, thiserror::Error)]
13#[non_exhaustive]
14pub enum Error {
15	#[error("failed to bind UDP socket")]
16	BindSocket(#[source] std::io::Error),
17
18	#[error("failed to create QUIC endpoint")]
19	CreateEndpoint(#[source] std::io::Error),
20
21	#[error("no async runtime")]
22	NoRuntime,
23
24	#[error("failed to get local address")]
25	LocalAddr(#[source] std::io::Error),
26
27	#[error("failed to resolve bind address")]
28	ResolveBind(#[source] std::io::Error),
29
30	#[error("invalid DNS name")]
31	InvalidDnsName,
32
33	#[error("failed DNS lookup")]
34	DnsLookup(#[source] std::io::Error),
35
36	#[error("no DNS entries")]
37	NoDnsEntries,
38
39	#[error("failed to fetch fingerprint")]
40	FetchFingerprint(#[source] reqwest::Error),
41
42	#[error("fingerprint request failed")]
43	FingerprintStatus(#[source] reqwest::Error),
44
45	#[error("failed to read fingerprint")]
46	ReadFingerprint(#[source] reqwest::Error),
47
48	#[error("invalid fingerprint")]
49	InvalidFingerprint(#[from] hex::FromHexError),
50
51	#[error("url scheme must be 'https', 'moqt', or 'moql'")]
52	InvalidScheme,
53
54	#[error("unsupported URL scheme: {0}")]
55	UnsupportedScheme(String),
56
57	#[error("missing handshake data")]
58	MissingHandshake,
59
60	#[error("missing ALPN")]
61	MissingAlpn,
62
63	#[error("failed to decode ALPN")]
64	DecodeAlpn(#[from] std::string::FromUtf8Error),
65
66	#[error("unsupported ALPN: {0}")]
67	UnsupportedAlpn(String),
68
69	#[error("missing server name for raw QUIC connection")]
70	MissingServerName,
71
72	#[error("failed to construct URL from server name")]
73	BuildUrl(#[source] url::ParseError),
74
75	#[error("quic_lb_nonce must be at least 4")]
76	QuicLbNonceTooSmall,
77
78	#[error("connection ID length ({0}) exceeds maximum of 20")]
79	QuicLbCidTooLong(usize),
80
81	#[error("failed to build client certificate verifier")]
82	ClientVerifier(#[source] rustls::server::VerifierBuilderError),
83
84	#[error(transparent)]
85	NoInitialCipherSuite(#[from] quinn::crypto::rustls::NoInitialCipherSuite),
86
87	#[error(transparent)]
88	Connect(#[from] quinn::ConnectError),
89
90	#[error(transparent)]
91	Connection(#[from] quinn::ConnectionError),
92
93	#[error(transparent)]
94	Client(#[from] web_transport_quinn::ClientError),
95
96	#[error(transparent)]
97	ConnectRejected(#[from] crate::ConnectError),
98
99	#[error(transparent)]
100	Server(#[from] web_transport_quinn::ServerError),
101
102	#[error("failed to establish QUIC connection")]
103	Establish(#[source] quinn::ConnectionError),
104
105	#[error("failed to receive WebTransport request")]
106	RecvRequest(#[source] web_transport_quinn::ServerError),
107
108	#[error(transparent)]
109	Tls(#[from] crate::tls::Error),
110}
111
112type Result<T> = std::result::Result<T, Error>;
113
114// ── Client ──────────────────────────────────────────────────────────
115
116#[derive(Clone)]
117pub(crate) struct QuinnClient {
118	pub quic: quinn::Endpoint,
119	pub transport: Arc<quinn::TransportConfig>,
120	pub versions: moq_net::Versions,
121}
122
123impl QuinnClient {
124	pub fn new(config: &ClientConfig) -> Result<Self> {
125		let socket = crate::bind::udp(config.bind).map_err(Error::BindSocket)?;
126
127		// TODO Validate the BBR implementation before enabling it
128		let mut transport = quinn::TransportConfig::default();
129		transport.max_idle_timeout(Some(time::Duration::from_secs(30).try_into().unwrap()));
130		transport.keep_alive_interval(Some(time::Duration::from_secs(5)));
131		transport.mtu_discovery_config(None); // Disable MTU discovery
132
133		let max_streams = config.max_streams.unwrap_or(crate::DEFAULT_MAX_STREAMS);
134		let max_streams = quinn::VarInt::from_u64(max_streams).unwrap_or(quinn::VarInt::MAX);
135		transport.max_concurrent_bidi_streams(max_streams);
136		transport.max_concurrent_uni_streams(max_streams);
137
138		let transport = Arc::new(transport);
139
140		// There's a bit more boilerplate to make a generic endpoint.
141		let runtime = quinn::default_runtime().ok_or(Error::NoRuntime)?;
142		let endpoint_config = quinn::EndpointConfig::default();
143
144		// Create the generic QUIC endpoint.
145		let quic = quinn::Endpoint::new(endpoint_config, None, socket, runtime).map_err(Error::CreateEndpoint)?;
146
147		Ok(Self {
148			quic,
149			transport,
150			versions: config.versions(),
151		})
152	}
153
154	pub async fn connect(&self, tls: &rustls::ClientConfig, url: Url) -> Result<web_transport_quinn::Session> {
155		let mut url = url;
156		let mut config = tls.clone();
157
158		let host = url.host().ok_or(Error::InvalidDnsName)?.to_string();
159		let port = url.port().unwrap_or(443);
160
161		// Look up the DNS entry.
162		// Quinn doesn't support happy eyeballs, so we pick a single address,
163		// preferring one whose family matches the local socket so the OS
164		// doesn't reject it (notably on Windows, where IPv6 sockets aren't
165		// dual-stack by default).
166		let local = self.quic.local_addr().map_err(Error::LocalAddr)?;
167		let addrs = tokio::net::lookup_host((host.clone(), port))
168			.await
169			.map_err(Error::DnsLookup)?;
170		let ip = crate::util::pick_addr(addrs, local).ok_or(Error::NoDnsEntries)?;
171
172		if url.scheme() == "http" {
173			// Perform a HTTP request to fetch the certificate fingerprint.
174			let mut fingerprint = url.clone();
175			fingerprint.set_path("/certificate.sha256");
176			fingerprint.set_query(None);
177			fingerprint.set_fragment(None);
178
179			tracing::warn!(url = %fingerprint, "performing insecure HTTP request for certificate");
180
181			let resp = reqwest::get(fingerprint.as_str())
182				.await
183				.map_err(Error::FetchFingerprint)?
184				.error_for_status()
185				.map_err(Error::FingerprintStatus)?;
186
187			let fingerprint = resp.text().await.map_err(Error::ReadFingerprint)?;
188			let fingerprint = hex::decode(fingerprint.trim())?;
189
190			let verifier = FingerprintVerifier::new(config.crypto_provider().clone(), vec![fingerprint]);
191			config.dangerous().set_certificate_verifier(Arc::new(verifier));
192
193			url.set_scheme("https").expect("failed to set scheme");
194		}
195
196		let alpns: Vec<Vec<u8>> = match url.scheme() {
197			"https" => vec![web_transport_quinn::ALPN.as_bytes().to_vec()],
198			"moqt" | "moql" => self
199				.versions
200				.alpns()
201				.iter()
202				.map(|alpn| alpn.as_bytes().to_vec())
203				.collect(),
204			_ => return Err(Error::InvalidScheme),
205		};
206
207		config.alpn_protocols = alpns;
208		config.key_log = Arc::new(rustls::KeyLogFile::new());
209
210		let config: quinn::crypto::rustls::QuicClientConfig = config.try_into()?;
211		let mut config = quinn::ClientConfig::new(Arc::new(config));
212		config.transport_config(self.transport.clone());
213
214		tracing::debug!(%url, %ip, "connecting");
215
216		let connection = self.quic.connect_with(config, ip, &host)?.await?;
217		tracing::Span::current().record("id", connection.stable_id());
218
219		let mut request = web_transport_quinn::proto::ConnectRequest::new(url.clone());
220		for alpn in self.versions.alpns() {
221			request = request.with_protocol(alpn.to_string());
222		}
223
224		let session = match url.scheme() {
225			"https" => web_transport_quinn::Session::connect(connection, request)
226				.await
227				.map_err(map_client_error)?,
228			"moqt" | "moql" => {
229				let handshake = connection
230					.handshake_data()
231					.ok_or(Error::MissingHandshake)?
232					.downcast::<quinn::crypto::rustls::HandshakeData>()
233					.unwrap();
234
235				let alpn = handshake.protocol.ok_or(Error::MissingAlpn)?;
236				let alpn = String::from_utf8(alpn)?;
237
238				let response = web_transport_quinn::proto::ConnectResponse::OK.with_protocol(alpn);
239				web_transport_quinn::Session::raw(connection, request, response)
240			}
241			_ => return Err(Error::UnsupportedScheme(url.scheme().to_string())),
242		};
243
244		Ok(session)
245	}
246}
247
248impl Error {
249	pub(crate) fn connect_error(&self) -> Option<crate::ConnectError> {
250		match self {
251			Self::ConnectRejected(err) => Some(*err),
252			Self::Client(err) => classify_client_error(err),
253			_ => None,
254		}
255	}
256}
257
258fn map_client_error(err: web_transport_quinn::ClientError) -> Error {
259	if let Some(err) = classify_client_error(&err) {
260		return err.into();
261	}
262
263	err.into()
264}
265
266fn classify_client_error(err: &web_transport_quinn::ClientError) -> Option<crate::ConnectError> {
267	match err {
268		web_transport_quinn::ClientError::HttpError(err) => classify_connect_error(err),
269		_ => None,
270	}
271}
272
273fn classify_connect_error(err: &web_transport_quinn::ConnectError) -> Option<crate::ConnectError> {
274	match err {
275		web_transport_quinn::ConnectError::ErrorStatus(status) => crate::ConnectError::from_status_u16(status.as_u16()),
276		web_transport_quinn::ConnectError::ProtoError(err) => classify_proto_error(err),
277		_ => None,
278	}
279}
280
281fn classify_proto_error(err: &web_transport_quinn::proto::ConnectError) -> Option<crate::ConnectError> {
282	match err {
283		web_transport_quinn::proto::ConnectError::ErrorStatus(status)
284		| web_transport_quinn::proto::ConnectError::WrongStatus(Some(status)) => {
285			crate::ConnectError::from_status_u16(status.as_u16())
286		}
287		_ => None,
288	}
289}
290
291// ── Server ──────────────────────────────────────────────────────────
292
293pub(crate) struct QuinnServer {
294	pub quic: quinn::Endpoint,
295	pub certs: Arc<ServeCerts>,
296}
297
298impl QuinnServer {
299	pub fn new(config: ServerConfig) -> Result<Self> {
300		// Enable BBR congestion control
301		// TODO Validate the BBR implementation before enabling it
302		let mut transport = quinn::TransportConfig::default();
303		transport.max_idle_timeout(Some(Duration::from_secs(30).try_into().unwrap()));
304		transport.keep_alive_interval(Some(Duration::from_secs(5)));
305		transport.mtu_discovery_config(None); // Disable MTU discovery
306
307		let max_streams = config.max_streams.unwrap_or(crate::DEFAULT_MAX_STREAMS);
308		let max_streams = quinn::VarInt::from_u64(max_streams).unwrap_or(quinn::VarInt::MAX);
309		transport.max_concurrent_bidi_streams(max_streams);
310		transport.max_concurrent_uni_streams(max_streams);
311
312		let transport = Arc::new(transport);
313
314		let provider = crate::crypto::provider();
315
316		let certs = ServeCerts::new(provider.clone());
317		certs.load_certs(&config.tls)?;
318		let certs = Arc::new(certs);
319
320		let tls_builder = rustls::ServerConfig::builder_with_provider(provider.clone())
321			.with_protocol_versions(&[&rustls::version::TLS13])
322			.map_err(crate::tls::Error::from)?;
323
324		let mut tls = if config.tls.root.is_empty() {
325			tls_builder.with_no_client_auth().with_cert_resolver(certs.clone())
326		} else {
327			let roots = config.tls.load_roots()?;
328			let verifier = rustls::server::WebPkiClientVerifier::builder_with_provider(Arc::new(roots), provider)
329				.allow_unauthenticated()
330				.build()
331				.map_err(Error::ClientVerifier)?;
332			tls_builder
333				.with_client_cert_verifier(verifier)
334				.with_cert_resolver(certs.clone())
335		};
336
337		// H3 is last because it requires WebTransport framing which not all H3 endpoints support.
338		let mut alpns: Vec<Vec<u8>> = config
339			.versions()
340			.alpns()
341			.iter()
342			.map(|alpn| alpn.as_bytes().to_vec())
343			.collect();
344		alpns.push(web_transport_quinn::ALPN.as_bytes().to_vec());
345
346		tls.alpn_protocols = alpns;
347		tls.key_log = Arc::new(rustls::KeyLogFile::new());
348
349		let tls: quinn::crypto::rustls::QuicServerConfig = tls.try_into()?;
350		let mut tls = quinn::ServerConfig::with_crypto(Arc::new(tls));
351		tls.transport_config(transport);
352
353		// Advertise the preferred_address transport parameter (RFC 9000 §9.6).
354		// Quinn allocates a fresh CID + reset token for the address during the handshake.
355		if let Some(addr) = config.preferred_v4 {
356			tls.preferred_address_v4(Some(addr));
357		}
358		if let Some(addr) = config.preferred_v6 {
359			tls.preferred_address_v6(Some(addr));
360		}
361
362		// There's a bit more boilerplate to make a generic endpoint.
363		let runtime = quinn::default_runtime().ok_or(Error::NoRuntime)?;
364
365		let listen =
366			crate::util::resolve(config.bind.as_deref(), crate::server::DEFAULT_BIND).map_err(Error::ResolveBind)?;
367
368		// Configure connection ID generator with server ID if provided
369		let mut endpoint_config = quinn::EndpointConfig::default();
370		if let Some(server_id) = config.quic_lb_id {
371			let nonce_len = config.quic_lb_nonce.unwrap_or(8);
372			if nonce_len < 4 {
373				return Err(Error::QuicLbNonceTooSmall);
374			}
375
376			let cid_len = 1 + server_id.len() + nonce_len;
377			if cid_len > 20 {
378				return Err(Error::QuicLbCidTooLong(cid_len));
379			}
380
381			tracing::info!(
382				?server_id,
383				nonce_len,
384				"using QUIC-LB compatible connection ID generation"
385			);
386			endpoint_config.cid_generator(move || Box::new(ServerIdGenerator::new(server_id.clone(), nonce_len)));
387		}
388
389		let socket = crate::bind::udp(listen).map_err(Error::BindSocket)?;
390
391		// Create the generic QUIC endpoint.
392		let quic = quinn::Endpoint::new(endpoint_config, Some(tls), socket, runtime).map_err(Error::CreateEndpoint)?;
393
394		// Spawn the cert reload watcher only after endpoint creation succeeds,
395		// so we don't leave a dangling watcher on failure.
396		tokio::spawn(crate::tls::reload_certs(certs.clone(), config.tls.clone()));
397
398		Ok(Self { quic, certs })
399	}
400
401	pub fn accept(&self) -> impl std::future::Future<Output = Option<quinn::Incoming>> + '_ {
402		self.quic.accept()
403	}
404
405	pub fn tls_info(&self) -> Arc<RwLock<crate::tls::Info>> {
406		self.certs.info.clone()
407	}
408
409	pub fn local_addr(&self) -> Result<net::SocketAddr> {
410		self.quic.local_addr().map_err(Error::LocalAddr)
411	}
412
413	pub fn close(&self) {
414		self.quic.close(quinn::VarInt::from_u32(0), b"server shutdown");
415	}
416}
417
418// ── QuinnRequest ────────────────────────────────────────────────────
419
420/// A raw QUIC connection request without WebTransport framing (quinn backend).
421pub(crate) enum QuinnRequest {
422	Raw {
423		request: web_transport_quinn::proto::ConnectRequest,
424		response: web_transport_quinn::proto::ConnectResponse,
425		connection: quinn::Connection,
426	},
427	WebTransport {
428		request: web_transport_quinn::Request,
429		alpns: Vec<&'static str>,
430	},
431}
432
433impl QuinnRequest {
434	pub async fn accept(conn: quinn::Incoming, alpns: Vec<&'static str>) -> Result<Self> {
435		let mut conn = conn.accept()?;
436
437		let handshake = conn
438			.handshake_data()
439			.await?
440			.downcast::<quinn::crypto::rustls::HandshakeData>()
441			.unwrap();
442
443		let alpn = handshake.protocol.ok_or(Error::MissingAlpn)?;
444		let alpn = String::from_utf8(alpn)?;
445		let host = handshake.server_name.unwrap_or_default();
446
447		tracing::debug!(%host, ip = %conn.remote_address(), %alpn, "accepting");
448
449		// Wait for the QUIC connection to be established.
450		let conn = conn.await.map_err(Error::Establish)?;
451
452		let span = tracing::Span::current();
453		span.record("id", conn.stable_id()); // TODO can we get this earlier?
454		tracing::debug!(%host, ip = %conn.remote_address(), %alpn, "accepted");
455
456		match alpn.as_str() {
457			web_transport_quinn::ALPN => {
458				// Wait for the CONNECT request.
459				let request = web_transport_quinn::Request::accept(conn)
460					.await
461					.map_err(Error::RecvRequest)?;
462				Ok(Self::WebTransport { request, alpns })
463			}
464			alpn if moq_net::ALPNS.contains(&alpn) => {
465				if host.is_empty() {
466					return Err(Error::MissingServerName);
467				}
468				let host_str = if host.contains(':') {
469					format!("[{}]", host)
470				} else {
471					host.clone()
472				};
473				let url = format!("moqt://{}", host_str).parse::<Url>().map_err(Error::BuildUrl)?;
474				let request = web_transport_quinn::proto::ConnectRequest::new(url);
475				let response = web_transport_quinn::proto::ConnectResponse::OK.with_protocol(alpn);
476				Ok(Self::Raw {
477					connection: conn,
478					request,
479					response,
480				})
481			}
482			_ => Err(Error::UnsupportedAlpn(alpn)),
483		}
484	}
485
486	/// Accept the session, returning a 200 OK if using WebTransport.
487	pub async fn ok(self) -> std::result::Result<web_transport_quinn::Session, web_transport_quinn::ServerError> {
488		match self {
489			QuinnRequest::Raw {
490				connection,
491				request,
492				response,
493			} => Ok(web_transport_quinn::Session::raw(connection, request, response)),
494			QuinnRequest::WebTransport { request, alpns } => {
495				let mut response = web_transport_quinn::proto::ConnectResponse::OK;
496				// Pick the first sub-protocol that we actually support.
497				// This is the WebTransport equivalent of ALPN negotiation.
498				// If no match is found, we default to no sub-protocol to support older
499				// clients that don't use ALPN. We assume moq-transport-14/moq-lite-02
500				// and perform the SETUP_x exchange instead.
501				if let Some(protocol) = request.protocols.iter().find(|p| alpns.contains(&p.as_str())) {
502					response = response.with_protocol(protocol);
503				}
504				request.respond(response).await
505			}
506		}
507	}
508
509	/// Returns the URL provided by the client.
510	pub fn url(&self) -> Option<&Url> {
511		match self {
512			QuinnRequest::Raw { .. } => None,
513			QuinnRequest::WebTransport { request, .. } => Some(&request.url),
514		}
515	}
516
517	/// Whether the peer presented a client certificate that rustls validated
518	/// against the configured `tls.root` during the handshake.
519	pub fn has_peer_certificate(&self) -> bool {
520		let conn = match self {
521			QuinnRequest::Raw { connection, .. } => connection,
522			QuinnRequest::WebTransport { request, .. } => request.conn(),
523		};
524		conn.peer_identity().is_some()
525	}
526
527	/// Reject the session with a status code.
528	pub async fn close(
529		self,
530		status: web_transport_quinn::http::StatusCode,
531	) -> std::result::Result<(), web_transport_quinn::ServerError> {
532		match self {
533			QuinnRequest::Raw { connection, .. } => {
534				connection.close(status.as_u16().into(), status.as_str().as_bytes());
535				Ok(())
536			}
537			QuinnRequest::WebTransport { request, alpns: _, .. } => request.reject(status).await,
538		}
539	}
540}
541
542// ── ServerIdGenerator ───────────────────────────────────────────────
543
544struct ServerIdGenerator {
545	server_id: ServerId,
546	nonce_len: usize,
547}
548
549impl ServerIdGenerator {
550	fn new(server_id: ServerId, nonce_len: usize) -> Self {
551		Self { server_id, nonce_len }
552	}
553}
554
555impl quinn::ConnectionIdGenerator for ServerIdGenerator {
556	fn generate_cid(&mut self) -> quinn::ConnectionId {
557		use rand::RngExt;
558		let cid_len = self.cid_len();
559		let mut cid = Vec::with_capacity(cid_len);
560		// First byte has "self-encoded length" of server ID + nonce
561		cid.push((cid_len - 1) as u8);
562		cid.extend(self.server_id.0.iter());
563		cid.extend(rand::rng().random_iter::<u8>().take(self.nonce_len));
564		quinn::ConnectionId::new(cid.as_slice())
565	}
566
567	fn cid_len(&self) -> usize {
568		1 + self.server_id.len() + self.nonce_len
569	}
570
571	fn cid_lifetime(&self) -> Option<Duration> {
572		None
573	}
574}