1use crate::client::ClientConfig;
2use crate::server::{ServerConfig, ServerId};
3use crate::tls::{FingerprintVerifier, ServeCerts};
4use std::sync::{Arc, RwLock};
5use std::time::Duration;
6use std::{net, time};
7use url::Url;
8
9pub use web_transport_quinn;
10
11#[derive(Debug, thiserror::Error)]
13#[non_exhaustive]
14pub enum Error {
15 #[error("failed to bind UDP socket")]
16 BindSocket(#[source] std::io::Error),
17
18 #[error("failed to create QUIC endpoint")]
19 CreateEndpoint(#[source] std::io::Error),
20
21 #[error("no async runtime")]
22 NoRuntime,
23
24 #[error("failed to get local address")]
25 LocalAddr(#[source] std::io::Error),
26
27 #[error("failed to resolve bind address")]
28 ResolveBind(#[source] std::io::Error),
29
30 #[error("invalid DNS name")]
31 InvalidDnsName,
32
33 #[error("failed DNS lookup")]
34 DnsLookup(#[source] std::io::Error),
35
36 #[error("no DNS entries")]
37 NoDnsEntries,
38
39 #[error("failed to fetch fingerprint")]
40 FetchFingerprint(#[source] reqwest::Error),
41
42 #[error("fingerprint request failed")]
43 FingerprintStatus(#[source] reqwest::Error),
44
45 #[error("failed to read fingerprint")]
46 ReadFingerprint(#[source] reqwest::Error),
47
48 #[error("invalid fingerprint")]
49 InvalidFingerprint(#[from] hex::FromHexError),
50
51 #[error("url scheme must be 'https', 'moqt', or 'moql'")]
52 InvalidScheme,
53
54 #[error("unsupported URL scheme: {0}")]
55 UnsupportedScheme(String),
56
57 #[error("missing handshake data")]
58 MissingHandshake,
59
60 #[error("missing ALPN")]
61 MissingAlpn,
62
63 #[error("failed to decode ALPN")]
64 DecodeAlpn(#[from] std::string::FromUtf8Error),
65
66 #[error("unsupported ALPN: {0}")]
67 UnsupportedAlpn(String),
68
69 #[error("missing server name for raw QUIC connection")]
70 MissingServerName,
71
72 #[error("failed to construct URL from server name")]
73 BuildUrl(#[source] url::ParseError),
74
75 #[error("quic_lb_nonce must be at least 4")]
76 QuicLbNonceTooSmall,
77
78 #[error("connection ID length ({0}) exceeds maximum of 20")]
79 QuicLbCidTooLong(usize),
80
81 #[error("failed to build client certificate verifier")]
82 ClientVerifier(#[source] rustls::server::VerifierBuilderError),
83
84 #[error(transparent)]
85 NoInitialCipherSuite(#[from] quinn::crypto::rustls::NoInitialCipherSuite),
86
87 #[error(transparent)]
88 Connect(#[from] quinn::ConnectError),
89
90 #[error(transparent)]
91 Connection(#[from] quinn::ConnectionError),
92
93 #[error(transparent)]
94 Client(#[from] web_transport_quinn::ClientError),
95
96 #[error(transparent)]
97 ConnectRejected(#[from] crate::ConnectError),
98
99 #[error(transparent)]
100 Server(#[from] web_transport_quinn::ServerError),
101
102 #[error("failed to establish QUIC connection")]
103 Establish(#[source] quinn::ConnectionError),
104
105 #[error("failed to receive WebTransport request")]
106 RecvRequest(#[source] web_transport_quinn::ServerError),
107
108 #[error(transparent)]
109 Tls(#[from] crate::tls::Error),
110}
111
112type Result<T> = std::result::Result<T, Error>;
113
114#[derive(Clone)]
117pub(crate) struct QuinnClient {
118 pub quic: quinn::Endpoint,
119 pub transport: Arc<quinn::TransportConfig>,
120 pub versions: moq_net::Versions,
121}
122
123impl QuinnClient {
124 pub fn new(config: &ClientConfig) -> Result<Self> {
125 let socket = crate::bind::udp(config.bind).map_err(Error::BindSocket)?;
126
127 let mut transport = quinn::TransportConfig::default();
129 transport.max_idle_timeout(Some(time::Duration::from_secs(30).try_into().unwrap()));
130 transport.keep_alive_interval(Some(time::Duration::from_secs(5)));
131 transport.mtu_discovery_config(None); let max_streams = config.max_streams.unwrap_or(crate::DEFAULT_MAX_STREAMS);
134 let max_streams = quinn::VarInt::from_u64(max_streams).unwrap_or(quinn::VarInt::MAX);
135 transport.max_concurrent_bidi_streams(max_streams);
136 transport.max_concurrent_uni_streams(max_streams);
137
138 let transport = Arc::new(transport);
139
140 let runtime = quinn::default_runtime().ok_or(Error::NoRuntime)?;
142 let endpoint_config = quinn::EndpointConfig::default();
143
144 let quic = quinn::Endpoint::new(endpoint_config, None, socket, runtime).map_err(Error::CreateEndpoint)?;
146
147 Ok(Self {
148 quic,
149 transport,
150 versions: config.versions(),
151 })
152 }
153
154 pub async fn connect(&self, tls: &rustls::ClientConfig, url: Url) -> Result<web_transport_quinn::Session> {
155 let mut url = url;
156 let mut config = tls.clone();
157
158 let host = url.host().ok_or(Error::InvalidDnsName)?.to_string();
159 let port = url.port().unwrap_or(443);
160
161 let local = self.quic.local_addr().map_err(Error::LocalAddr)?;
167 let addrs = tokio::net::lookup_host((host.clone(), port))
168 .await
169 .map_err(Error::DnsLookup)?;
170 let ip = crate::util::pick_addr(addrs, local).ok_or(Error::NoDnsEntries)?;
171
172 if url.scheme() == "http" {
173 let mut fingerprint = url.clone();
175 fingerprint.set_path("/certificate.sha256");
176 fingerprint.set_query(None);
177 fingerprint.set_fragment(None);
178
179 tracing::warn!(url = %fingerprint, "performing insecure HTTP request for certificate");
180
181 let resp = reqwest::get(fingerprint.as_str())
182 .await
183 .map_err(Error::FetchFingerprint)?
184 .error_for_status()
185 .map_err(Error::FingerprintStatus)?;
186
187 let fingerprint = resp.text().await.map_err(Error::ReadFingerprint)?;
188 let fingerprint = hex::decode(fingerprint.trim())?;
189
190 let verifier = FingerprintVerifier::new(config.crypto_provider().clone(), vec![fingerprint]);
191 config.dangerous().set_certificate_verifier(Arc::new(verifier));
192
193 url.set_scheme("https").expect("failed to set scheme");
194 }
195
196 let alpns: Vec<Vec<u8>> = match url.scheme() {
197 "https" => vec![web_transport_quinn::ALPN.as_bytes().to_vec()],
198 "moqt" | "moql" => self
199 .versions
200 .alpns()
201 .iter()
202 .map(|alpn| alpn.as_bytes().to_vec())
203 .collect(),
204 _ => return Err(Error::InvalidScheme),
205 };
206
207 config.alpn_protocols = alpns;
208 config.key_log = Arc::new(rustls::KeyLogFile::new());
209
210 let config: quinn::crypto::rustls::QuicClientConfig = config.try_into()?;
211 let mut config = quinn::ClientConfig::new(Arc::new(config));
212 config.transport_config(self.transport.clone());
213
214 tracing::debug!(%url, %ip, "connecting");
215
216 let connection = self.quic.connect_with(config, ip, &host)?.await?;
217 tracing::Span::current().record("id", connection.stable_id());
218
219 let mut request = web_transport_quinn::proto::ConnectRequest::new(url.clone());
220 for alpn in self.versions.alpns() {
221 request = request.with_protocol(alpn.to_string());
222 }
223
224 let session = match url.scheme() {
225 "https" => web_transport_quinn::Session::connect(connection, request)
226 .await
227 .map_err(map_client_error)?,
228 "moqt" | "moql" => {
229 let handshake = connection
230 .handshake_data()
231 .ok_or(Error::MissingHandshake)?
232 .downcast::<quinn::crypto::rustls::HandshakeData>()
233 .unwrap();
234
235 let alpn = handshake.protocol.ok_or(Error::MissingAlpn)?;
236 let alpn = String::from_utf8(alpn)?;
237
238 let response = web_transport_quinn::proto::ConnectResponse::OK.with_protocol(alpn);
239 web_transport_quinn::Session::raw(connection, request, response)
240 }
241 _ => return Err(Error::UnsupportedScheme(url.scheme().to_string())),
242 };
243
244 Ok(session)
245 }
246}
247
248impl Error {
249 pub(crate) fn connect_error(&self) -> Option<crate::ConnectError> {
250 match self {
251 Self::ConnectRejected(err) => Some(*err),
252 Self::Client(err) => classify_client_error(err),
253 _ => None,
254 }
255 }
256}
257
258fn map_client_error(err: web_transport_quinn::ClientError) -> Error {
259 if let Some(err) = classify_client_error(&err) {
260 return err.into();
261 }
262
263 err.into()
264}
265
266fn classify_client_error(err: &web_transport_quinn::ClientError) -> Option<crate::ConnectError> {
267 match err {
268 web_transport_quinn::ClientError::HttpError(err) => classify_connect_error(err),
269 _ => None,
270 }
271}
272
273fn classify_connect_error(err: &web_transport_quinn::ConnectError) -> Option<crate::ConnectError> {
274 match err {
275 web_transport_quinn::ConnectError::ErrorStatus(status) => crate::ConnectError::from_status_u16(status.as_u16()),
276 web_transport_quinn::ConnectError::ProtoError(err) => classify_proto_error(err),
277 _ => None,
278 }
279}
280
281fn classify_proto_error(err: &web_transport_quinn::proto::ConnectError) -> Option<crate::ConnectError> {
282 match err {
283 web_transport_quinn::proto::ConnectError::ErrorStatus(status)
284 | web_transport_quinn::proto::ConnectError::WrongStatus(Some(status)) => {
285 crate::ConnectError::from_status_u16(status.as_u16())
286 }
287 _ => None,
288 }
289}
290
291pub(crate) struct QuinnServer {
294 pub quic: quinn::Endpoint,
295 pub certs: Arc<ServeCerts>,
296}
297
298impl QuinnServer {
299 pub fn new(config: ServerConfig) -> Result<Self> {
300 let mut transport = quinn::TransportConfig::default();
303 transport.max_idle_timeout(Some(Duration::from_secs(30).try_into().unwrap()));
304 transport.keep_alive_interval(Some(Duration::from_secs(5)));
305 transport.mtu_discovery_config(None); let max_streams = config.max_streams.unwrap_or(crate::DEFAULT_MAX_STREAMS);
308 let max_streams = quinn::VarInt::from_u64(max_streams).unwrap_or(quinn::VarInt::MAX);
309 transport.max_concurrent_bidi_streams(max_streams);
310 transport.max_concurrent_uni_streams(max_streams);
311
312 let transport = Arc::new(transport);
313
314 let provider = crate::crypto::provider();
315
316 let certs = ServeCerts::new(provider.clone());
317 certs.load_certs(&config.tls)?;
318 let certs = Arc::new(certs);
319
320 let tls_builder = rustls::ServerConfig::builder_with_provider(provider.clone())
321 .with_protocol_versions(&[&rustls::version::TLS13])
322 .map_err(crate::tls::Error::from)?;
323
324 let mut tls = if config.tls.root.is_empty() {
325 tls_builder.with_no_client_auth().with_cert_resolver(certs.clone())
326 } else {
327 let roots = config.tls.load_roots()?;
328 let verifier = rustls::server::WebPkiClientVerifier::builder_with_provider(Arc::new(roots), provider)
329 .allow_unauthenticated()
330 .build()
331 .map_err(Error::ClientVerifier)?;
332 tls_builder
333 .with_client_cert_verifier(verifier)
334 .with_cert_resolver(certs.clone())
335 };
336
337 let mut alpns: Vec<Vec<u8>> = config
339 .versions()
340 .alpns()
341 .iter()
342 .map(|alpn| alpn.as_bytes().to_vec())
343 .collect();
344 alpns.push(web_transport_quinn::ALPN.as_bytes().to_vec());
345
346 tls.alpn_protocols = alpns;
347 tls.key_log = Arc::new(rustls::KeyLogFile::new());
348
349 let tls: quinn::crypto::rustls::QuicServerConfig = tls.try_into()?;
350 let mut tls = quinn::ServerConfig::with_crypto(Arc::new(tls));
351 tls.transport_config(transport);
352
353 if let Some(addr) = config.preferred_v4 {
356 tls.preferred_address_v4(Some(addr));
357 }
358 if let Some(addr) = config.preferred_v6 {
359 tls.preferred_address_v6(Some(addr));
360 }
361
362 let runtime = quinn::default_runtime().ok_or(Error::NoRuntime)?;
364
365 let listen =
366 crate::util::resolve(config.bind.as_deref(), crate::server::DEFAULT_BIND).map_err(Error::ResolveBind)?;
367
368 let mut endpoint_config = quinn::EndpointConfig::default();
370 if let Some(server_id) = config.quic_lb_id {
371 let nonce_len = config.quic_lb_nonce.unwrap_or(8);
372 if nonce_len < 4 {
373 return Err(Error::QuicLbNonceTooSmall);
374 }
375
376 let cid_len = 1 + server_id.len() + nonce_len;
377 if cid_len > 20 {
378 return Err(Error::QuicLbCidTooLong(cid_len));
379 }
380
381 tracing::info!(
382 ?server_id,
383 nonce_len,
384 "using QUIC-LB compatible connection ID generation"
385 );
386 endpoint_config.cid_generator(move || Box::new(ServerIdGenerator::new(server_id.clone(), nonce_len)));
387 }
388
389 let socket = crate::bind::udp(listen).map_err(Error::BindSocket)?;
390
391 let quic = quinn::Endpoint::new(endpoint_config, Some(tls), socket, runtime).map_err(Error::CreateEndpoint)?;
393
394 tokio::spawn(crate::tls::reload_certs(certs.clone(), config.tls.clone()));
397
398 Ok(Self { quic, certs })
399 }
400
401 pub fn accept(&self) -> impl std::future::Future<Output = Option<quinn::Incoming>> + '_ {
402 self.quic.accept()
403 }
404
405 pub fn tls_info(&self) -> Arc<RwLock<crate::tls::Info>> {
406 self.certs.info.clone()
407 }
408
409 pub fn local_addr(&self) -> Result<net::SocketAddr> {
410 self.quic.local_addr().map_err(Error::LocalAddr)
411 }
412
413 pub fn close(&self) {
414 self.quic.close(quinn::VarInt::from_u32(0), b"server shutdown");
415 }
416}
417
418pub(crate) enum QuinnRequest {
422 Raw {
423 request: web_transport_quinn::proto::ConnectRequest,
424 response: web_transport_quinn::proto::ConnectResponse,
425 connection: quinn::Connection,
426 },
427 WebTransport {
428 request: web_transport_quinn::Request,
429 alpns: Vec<&'static str>,
430 },
431}
432
433impl QuinnRequest {
434 pub async fn accept(conn: quinn::Incoming, alpns: Vec<&'static str>) -> Result<Self> {
435 let mut conn = conn.accept()?;
436
437 let handshake = conn
438 .handshake_data()
439 .await?
440 .downcast::<quinn::crypto::rustls::HandshakeData>()
441 .unwrap();
442
443 let alpn = handshake.protocol.ok_or(Error::MissingAlpn)?;
444 let alpn = String::from_utf8(alpn)?;
445 let host = handshake.server_name.unwrap_or_default();
446
447 tracing::debug!(%host, ip = %conn.remote_address(), %alpn, "accepting");
448
449 let conn = conn.await.map_err(Error::Establish)?;
451
452 let span = tracing::Span::current();
453 span.record("id", conn.stable_id()); tracing::debug!(%host, ip = %conn.remote_address(), %alpn, "accepted");
455
456 match alpn.as_str() {
457 web_transport_quinn::ALPN => {
458 let request = web_transport_quinn::Request::accept(conn)
460 .await
461 .map_err(Error::RecvRequest)?;
462 Ok(Self::WebTransport { request, alpns })
463 }
464 alpn if moq_net::ALPNS.contains(&alpn) => {
465 if host.is_empty() {
466 return Err(Error::MissingServerName);
467 }
468 let host_str = if host.contains(':') {
469 format!("[{}]", host)
470 } else {
471 host.clone()
472 };
473 let url = format!("moqt://{}", host_str).parse::<Url>().map_err(Error::BuildUrl)?;
474 let request = web_transport_quinn::proto::ConnectRequest::new(url);
475 let response = web_transport_quinn::proto::ConnectResponse::OK.with_protocol(alpn);
476 Ok(Self::Raw {
477 connection: conn,
478 request,
479 response,
480 })
481 }
482 _ => Err(Error::UnsupportedAlpn(alpn)),
483 }
484 }
485
486 pub async fn ok(self) -> std::result::Result<web_transport_quinn::Session, web_transport_quinn::ServerError> {
488 match self {
489 QuinnRequest::Raw {
490 connection,
491 request,
492 response,
493 } => Ok(web_transport_quinn::Session::raw(connection, request, response)),
494 QuinnRequest::WebTransport { request, alpns } => {
495 let mut response = web_transport_quinn::proto::ConnectResponse::OK;
496 if let Some(protocol) = request.protocols.iter().find(|p| alpns.contains(&p.as_str())) {
502 response = response.with_protocol(protocol);
503 }
504 request.respond(response).await
505 }
506 }
507 }
508
509 pub fn url(&self) -> Option<&Url> {
511 match self {
512 QuinnRequest::Raw { .. } => None,
513 QuinnRequest::WebTransport { request, .. } => Some(&request.url),
514 }
515 }
516
517 pub fn peer_identity(&self) -> Option<crate::tls::PeerIdentity> {
520 let conn = match self {
521 QuinnRequest::Raw { connection, .. } => connection,
522 QuinnRequest::WebTransport { request, .. } => request.conn(),
523 };
524 crate::tls::PeerIdentity::from_any(conn.peer_identity())
525 }
526
527 pub async fn close(
529 self,
530 status: web_transport_quinn::http::StatusCode,
531 ) -> std::result::Result<(), web_transport_quinn::ServerError> {
532 match self {
533 QuinnRequest::Raw { connection, .. } => {
534 connection.close(status.as_u16().into(), status.as_str().as_bytes());
535 Ok(())
536 }
537 QuinnRequest::WebTransport { request, alpns: _, .. } => request.reject(status).await,
538 }
539 }
540}
541
542struct ServerIdGenerator {
545 server_id: ServerId,
546 nonce_len: usize,
547}
548
549impl ServerIdGenerator {
550 fn new(server_id: ServerId, nonce_len: usize) -> Self {
551 Self { server_id, nonce_len }
552 }
553}
554
555impl quinn::ConnectionIdGenerator for ServerIdGenerator {
556 fn generate_cid(&mut self) -> quinn::ConnectionId {
557 use rand::RngExt;
558 let cid_len = self.cid_len();
559 let mut cid = Vec::with_capacity(cid_len);
560 cid.push((cid_len - 1) as u8);
562 cid.extend(self.server_id.0.iter());
563 cid.extend(rand::rng().random_iter::<u8>().take(self.nonce_len));
564 quinn::ConnectionId::new(cid.as_slice())
565 }
566
567 fn cid_len(&self) -> usize {
568 1 + self.server_id.len() + self.nonce_len
569 }
570
571 fn cid_lifetime(&self) -> Option<Duration> {
572 None
573 }
574}