1use crate::client::ClientConfig;
2use crate::server::{ServerConfig, ServerId};
3use crate::tls::{FingerprintVerifier, ServeCerts};
4use std::sync::{Arc, RwLock};
5use std::time::Duration;
6use std::{net, time};
7use url::Url;
8
9pub use web_transport_quinn;
10
/// Errors produced while setting up or using the quinn-based QUIC client and server.
///
/// Variants carrying a `#[source]`/`#[from]` field expose the underlying error
/// through [`std::error::Error::source`]; `#[error(transparent)]` variants forward
/// both the message and the source of the wrapped error.
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum Error {
    /// Binding the UDP socket failed (client `bind` address or server listen address).
    #[error("failed to bind UDP socket")]
    BindSocket(#[source] std::io::Error),

    /// `quinn::Endpoint::new` rejected the socket/configuration.
    #[error("failed to create QUIC endpoint")]
    CreateEndpoint(#[source] std::io::Error),

    /// `quinn::default_runtime()` returned `None`.
    #[error("no async runtime")]
    NoRuntime,

    /// Querying the endpoint's local socket address failed.
    #[error("failed to get local address")]
    LocalAddr(#[source] std::io::Error),

    /// The server bind address could not be resolved.
    #[error("failed to resolve bind address")]
    ResolveBind(#[source] std::io::Error),

    /// The URL passed to `QuinnClient::connect` has no host component.
    #[error("invalid DNS name")]
    InvalidDnsName,

    /// Resolving the URL host to socket addresses failed.
    #[error("failed DNS lookup")]
    DnsLookup(#[source] std::io::Error),

    /// The DNS lookup succeeded but no usable address could be picked.
    #[error("no DNS entries")]
    NoDnsEntries,

    /// The HTTP request for the certificate fingerprint could not be performed.
    #[error("failed to fetch fingerprint")]
    FetchFingerprint(#[source] reqwest::Error),

    /// The fingerprint endpoint answered with an error status.
    #[error("fingerprint request failed")]
    FingerprintStatus(#[source] reqwest::Error),

    /// The fingerprint response body could not be read.
    #[error("failed to read fingerprint")]
    ReadFingerprint(#[source] reqwest::Error),

    /// The fingerprint response body was not valid hex.
    #[error("invalid fingerprint")]
    InvalidFingerprint(#[from] hex::FromHexError),

    /// The URL scheme is not one the client can map to a set of ALPNs.
    #[error("url scheme must be 'https', 'moqt', or 'moql'")]
    InvalidScheme,

    /// The URL scheme has no session setup path; carries the offending scheme.
    #[error("unsupported URL scheme: {0}")]
    UnsupportedScheme(String),

    /// The established connection exposed no handshake data.
    #[error("missing handshake data")]
    MissingHandshake,

    /// The TLS handshake completed without a negotiated ALPN protocol.
    #[error("missing ALPN")]
    MissingAlpn,

    /// The negotiated ALPN was not valid UTF-8.
    #[error("failed to decode ALPN")]
    DecodeAlpn(#[from] std::string::FromUtf8Error),

    /// The peer negotiated an ALPN the server does not handle; carries the ALPN.
    #[error("unsupported ALPN: {0}")]
    UnsupportedAlpn(String),

    /// A raw QUIC (non-WebTransport) client did not send a server name.
    #[error("missing server name for raw QUIC connection")]
    MissingServerName,

    /// The `moqt://<server name>` URL synthesized for a raw QUIC connection did not parse.
    #[error("failed to construct URL from server name")]
    BuildUrl(#[source] url::ParseError),

    /// The configured `quic_lb_nonce` is smaller than 4.
    #[error("quic_lb_nonce must be at least 4")]
    QuicLbNonceTooSmall,

    /// Length byte + server ID + nonce exceeds 20 bytes; carries the computed length.
    #[error("connection ID length ({0}) exceeds maximum of 20")]
    QuicLbCidTooLong(usize),

    /// Building the rustls client-certificate verifier from the configured roots failed.
    #[error("failed to build client certificate verifier")]
    ClientVerifier(#[source] rustls::server::VerifierBuilderError),

    /// Converting the rustls config into a quinn QUIC crypto config failed.
    #[error(transparent)]
    NoInitialCipherSuite(#[from] quinn::crypto::rustls::NoInitialCipherSuite),

    /// Starting an outgoing connection failed before any packets were exchanged.
    #[error(transparent)]
    Connect(#[from] quinn::ConnectError),

    /// A QUIC connection failed; produced wherever a `quinn::ConnectionError` is
    /// propagated with `?`.
    #[error(transparent)]
    Connection(#[from] quinn::ConnectionError),

    /// The client side of the WebTransport handshake failed.
    #[error(transparent)]
    Client(#[from] web_transport_quinn::ClientError),

    /// A `web_transport_quinn::ServerError` propagated with `?`.
    #[error(transparent)]
    Server(#[from] web_transport_quinn::ServerError),

    /// An accepted incoming connection failed to finish establishing.
    #[error("failed to establish QUIC connection")]
    Establish(#[source] quinn::ConnectionError),

    /// Receiving the WebTransport request on an accepted connection failed.
    #[error("failed to receive WebTransport request")]
    RecvRequest(#[source] web_transport_quinn::ServerError),

    /// An error from this crate's TLS helpers (certificate/root loading, protocol versions).
    #[error(transparent)]
    Tls(#[from] crate::tls::Error),
}
108
/// Module-local shorthand for a `Result` whose error type is [`Error`].
type Result<T> = std::result::Result<T, Error>;
110
/// QUIC client backed by a quinn endpoint.
#[derive(Clone)]
pub(crate) struct QuinnClient {
    /// The quinn endpoint used to open outgoing connections.
    pub quic: quinn::Endpoint,
    /// Transport settings applied to every outgoing connection.
    pub transport: Arc<quinn::TransportConfig>,
    /// Protocol versions from the client config; their ALPNs are offered when connecting.
    pub versions: moq_net::Versions,
}
119
120impl QuinnClient {
121 pub fn new(config: &ClientConfig) -> Result<Self> {
122 let socket = std::net::UdpSocket::bind(config.bind).map_err(Error::BindSocket)?;
123
124 let mut transport = quinn::TransportConfig::default();
126 transport.max_idle_timeout(Some(time::Duration::from_secs(30).try_into().unwrap()));
127 transport.keep_alive_interval(Some(time::Duration::from_secs(5)));
128 transport.mtu_discovery_config(None); let max_streams = config.max_streams.unwrap_or(crate::DEFAULT_MAX_STREAMS);
131 let max_streams = quinn::VarInt::from_u64(max_streams).unwrap_or(quinn::VarInt::MAX);
132 transport.max_concurrent_bidi_streams(max_streams);
133 transport.max_concurrent_uni_streams(max_streams);
134
135 let transport = Arc::new(transport);
136
137 let runtime = quinn::default_runtime().ok_or(Error::NoRuntime)?;
139 let endpoint_config = quinn::EndpointConfig::default();
140
141 let quic = quinn::Endpoint::new(endpoint_config, None, socket, runtime).map_err(Error::CreateEndpoint)?;
143
144 Ok(Self {
145 quic,
146 transport,
147 versions: config.versions(),
148 })
149 }
150
151 pub async fn connect(&self, tls: &rustls::ClientConfig, url: Url) -> Result<web_transport_quinn::Session> {
152 let mut url = url;
153 let mut config = tls.clone();
154
155 let host = url.host().ok_or(Error::InvalidDnsName)?.to_string();
156 let port = url.port().unwrap_or(443);
157
158 let local = self.quic.local_addr().map_err(Error::LocalAddr)?;
164 let addrs = tokio::net::lookup_host((host.clone(), port))
165 .await
166 .map_err(Error::DnsLookup)?;
167 let ip = crate::util::pick_addr(addrs, local).ok_or(Error::NoDnsEntries)?;
168
169 if url.scheme() == "http" {
170 let mut fingerprint = url.clone();
172 fingerprint.set_path("/certificate.sha256");
173 fingerprint.set_query(None);
174 fingerprint.set_fragment(None);
175
176 tracing::warn!(url = %fingerprint, "performing insecure HTTP request for certificate");
177
178 let resp = reqwest::get(fingerprint.as_str())
179 .await
180 .map_err(Error::FetchFingerprint)?
181 .error_for_status()
182 .map_err(Error::FingerprintStatus)?;
183
184 let fingerprint = resp.text().await.map_err(Error::ReadFingerprint)?;
185 let fingerprint = hex::decode(fingerprint.trim())?;
186
187 let verifier = FingerprintVerifier::new(config.crypto_provider().clone(), fingerprint);
188 config.dangerous().set_certificate_verifier(Arc::new(verifier));
189
190 url.set_scheme("https").expect("failed to set scheme");
191 }
192
193 let alpns: Vec<Vec<u8>> = match url.scheme() {
194 "https" => vec![web_transport_quinn::ALPN.as_bytes().to_vec()],
195 "moqt" | "moql" => self
196 .versions
197 .alpns()
198 .iter()
199 .map(|alpn| alpn.as_bytes().to_vec())
200 .collect(),
201 _ => return Err(Error::InvalidScheme),
202 };
203
204 config.alpn_protocols = alpns;
205 config.key_log = Arc::new(rustls::KeyLogFile::new());
206
207 let config: quinn::crypto::rustls::QuicClientConfig = config.try_into()?;
208 let mut config = quinn::ClientConfig::new(Arc::new(config));
209 config.transport_config(self.transport.clone());
210
211 tracing::debug!(%url, %ip, "connecting");
212
213 let connection = self.quic.connect_with(config, ip, &host)?.await?;
214 tracing::Span::current().record("id", connection.stable_id());
215
216 let mut request = web_transport_quinn::proto::ConnectRequest::new(url.clone());
217 for alpn in self.versions.alpns() {
218 request = request.with_protocol(alpn.to_string());
219 }
220
221 let session = match url.scheme() {
222 "https" => web_transport_quinn::Session::connect(connection, request).await?,
223 "moqt" | "moql" => {
224 let handshake = connection
225 .handshake_data()
226 .ok_or(Error::MissingHandshake)?
227 .downcast::<quinn::crypto::rustls::HandshakeData>()
228 .unwrap();
229
230 let alpn = handshake.protocol.ok_or(Error::MissingAlpn)?;
231 let alpn = String::from_utf8(alpn)?;
232
233 let response = web_transport_quinn::proto::ConnectResponse::OK.with_protocol(alpn);
234 web_transport_quinn::Session::raw(connection, request, response)
235 }
236 _ => return Err(Error::UnsupportedScheme(url.scheme().to_string())),
237 };
238
239 Ok(session)
240 }
241}
242
/// QUIC server backed by a quinn endpoint.
pub(crate) struct QuinnServer {
    /// The quinn endpoint that incoming connections are accepted from.
    pub quic: quinn::Endpoint,
    /// The served certificates; also installed as the TLS certificate resolver.
    pub certs: Arc<ServeCerts>,
}
249
250impl QuinnServer {
251 pub fn new(config: ServerConfig) -> Result<Self> {
252 let mut transport = quinn::TransportConfig::default();
255 transport.max_idle_timeout(Some(Duration::from_secs(30).try_into().unwrap()));
256 transport.keep_alive_interval(Some(Duration::from_secs(5)));
257 transport.mtu_discovery_config(None); let max_streams = config.max_streams.unwrap_or(crate::DEFAULT_MAX_STREAMS);
260 let max_streams = quinn::VarInt::from_u64(max_streams).unwrap_or(quinn::VarInt::MAX);
261 transport.max_concurrent_bidi_streams(max_streams);
262 transport.max_concurrent_uni_streams(max_streams);
263
264 let transport = Arc::new(transport);
265
266 let provider = crate::crypto::provider();
267
268 let certs = ServeCerts::new(provider.clone());
269 certs.load_certs(&config.tls)?;
270 let certs = Arc::new(certs);
271
272 let tls_builder = rustls::ServerConfig::builder_with_provider(provider.clone())
273 .with_protocol_versions(&[&rustls::version::TLS13])
274 .map_err(crate::tls::Error::from)?;
275
276 let mut tls = if config.tls.root.is_empty() {
277 tls_builder.with_no_client_auth().with_cert_resolver(certs.clone())
278 } else {
279 let roots = config.tls.load_roots()?;
280 let verifier = rustls::server::WebPkiClientVerifier::builder_with_provider(Arc::new(roots), provider)
281 .allow_unauthenticated()
282 .build()
283 .map_err(Error::ClientVerifier)?;
284 tls_builder
285 .with_client_cert_verifier(verifier)
286 .with_cert_resolver(certs.clone())
287 };
288
289 let mut alpns: Vec<Vec<u8>> = config
291 .versions()
292 .alpns()
293 .iter()
294 .map(|alpn| alpn.as_bytes().to_vec())
295 .collect();
296 alpns.push(web_transport_quinn::ALPN.as_bytes().to_vec());
297
298 tls.alpn_protocols = alpns;
299 tls.key_log = Arc::new(rustls::KeyLogFile::new());
300
301 let tls: quinn::crypto::rustls::QuicServerConfig = tls.try_into()?;
302 let mut tls = quinn::ServerConfig::with_crypto(Arc::new(tls));
303 tls.transport_config(transport);
304
305 if let Some(addr) = config.preferred_v4 {
308 tls.preferred_address_v4(Some(addr));
309 }
310 if let Some(addr) = config.preferred_v6 {
311 tls.preferred_address_v6(Some(addr));
312 }
313
314 let runtime = quinn::default_runtime().ok_or(Error::NoRuntime)?;
316
317 let listen =
318 crate::util::resolve(config.bind.as_deref(), crate::server::DEFAULT_BIND).map_err(Error::ResolveBind)?;
319
320 let mut endpoint_config = quinn::EndpointConfig::default();
322 if let Some(server_id) = config.quic_lb_id {
323 let nonce_len = config.quic_lb_nonce.unwrap_or(8);
324 if nonce_len < 4 {
325 return Err(Error::QuicLbNonceTooSmall);
326 }
327
328 let cid_len = 1 + server_id.len() + nonce_len;
329 if cid_len > 20 {
330 return Err(Error::QuicLbCidTooLong(cid_len));
331 }
332
333 tracing::info!(
334 ?server_id,
335 nonce_len,
336 "using QUIC-LB compatible connection ID generation"
337 );
338 endpoint_config.cid_generator(move || Box::new(ServerIdGenerator::new(server_id.clone(), nonce_len)));
339 }
340
341 let socket = std::net::UdpSocket::bind(listen).map_err(Error::BindSocket)?;
342
343 let quic = quinn::Endpoint::new(endpoint_config, Some(tls), socket, runtime).map_err(Error::CreateEndpoint)?;
345
346 tokio::spawn(crate::tls::reload_certs(certs.clone(), config.tls.clone()));
349
350 Ok(Self { quic, certs })
351 }
352
353 pub fn accept(&self) -> impl std::future::Future<Output = Option<quinn::Incoming>> + '_ {
354 self.quic.accept()
355 }
356
357 pub fn tls_info(&self) -> Arc<RwLock<crate::tls::Info>> {
358 self.certs.info.clone()
359 }
360
361 pub fn local_addr(&self) -> Result<net::SocketAddr> {
362 self.quic.local_addr().map_err(Error::LocalAddr)
363 }
364
365 pub fn close(&self) {
366 self.quic.close(quinn::VarInt::from_u32(0), b"server shutdown");
367 }
368}
369
/// An accepted incoming connection that has not yet been answered.
pub(crate) enum QuinnRequest {
    /// A raw QUIC connection (no WebTransport handshake).
    Raw {
        /// Request synthesized on the server side for this connection.
        request: web_transport_quinn::proto::ConnectRequest,
        /// Response prepared on the server side, carrying the negotiated ALPN.
        response: web_transport_quinn::proto::ConnectResponse,
        /// The established QUIC connection.
        connection: quinn::Connection,
    },
    /// A WebTransport request received from the client.
    WebTransport {
        /// The pending WebTransport request.
        request: web_transport_quinn::Request,
        /// Protocol names the server is willing to select from the request's list.
        alpns: Vec<&'static str>,
    },
}
384
385impl QuinnRequest {
386 pub async fn accept(conn: quinn::Incoming, alpns: Vec<&'static str>) -> Result<Self> {
387 let mut conn = conn.accept()?;
388
389 let handshake = conn
390 .handshake_data()
391 .await?
392 .downcast::<quinn::crypto::rustls::HandshakeData>()
393 .unwrap();
394
395 let alpn = handshake.protocol.ok_or(Error::MissingAlpn)?;
396 let alpn = String::from_utf8(alpn)?;
397 let host = handshake.server_name.unwrap_or_default();
398
399 tracing::debug!(%host, ip = %conn.remote_address(), %alpn, "accepting");
400
401 let conn = conn.await.map_err(Error::Establish)?;
403
404 let span = tracing::Span::current();
405 span.record("id", conn.stable_id()); tracing::debug!(%host, ip = %conn.remote_address(), %alpn, "accepted");
407
408 match alpn.as_str() {
409 web_transport_quinn::ALPN => {
410 let request = web_transport_quinn::Request::accept(conn)
412 .await
413 .map_err(Error::RecvRequest)?;
414 Ok(Self::WebTransport { request, alpns })
415 }
416 alpn if moq_net::ALPNS.contains(&alpn) => {
417 if host.is_empty() {
418 return Err(Error::MissingServerName);
419 }
420 let host_str = if host.contains(':') {
421 format!("[{}]", host)
422 } else {
423 host.clone()
424 };
425 let url = format!("moqt://{}", host_str).parse::<Url>().map_err(Error::BuildUrl)?;
426 let request = web_transport_quinn::proto::ConnectRequest::new(url);
427 let response = web_transport_quinn::proto::ConnectResponse::OK.with_protocol(alpn);
428 Ok(Self::Raw {
429 connection: conn,
430 request,
431 response,
432 })
433 }
434 _ => Err(Error::UnsupportedAlpn(alpn)),
435 }
436 }
437
438 pub async fn ok(self) -> std::result::Result<web_transport_quinn::Session, web_transport_quinn::ServerError> {
440 match self {
441 QuinnRequest::Raw {
442 connection,
443 request,
444 response,
445 } => Ok(web_transport_quinn::Session::raw(connection, request, response)),
446 QuinnRequest::WebTransport { request, alpns } => {
447 let mut response = web_transport_quinn::proto::ConnectResponse::OK;
448 if let Some(protocol) = request.protocols.iter().find(|p| alpns.contains(&p.as_str())) {
454 response = response.with_protocol(protocol);
455 }
456 request.respond(response).await
457 }
458 }
459 }
460
461 pub fn url(&self) -> Option<&Url> {
463 match self {
464 QuinnRequest::Raw { .. } => None,
465 QuinnRequest::WebTransport { request, .. } => Some(&request.url),
466 }
467 }
468
469 pub fn has_peer_certificate(&self) -> bool {
472 let conn = match self {
473 QuinnRequest::Raw { connection, .. } => connection,
474 QuinnRequest::WebTransport { request, .. } => request.conn(),
475 };
476 conn.peer_identity().is_some()
477 }
478
479 pub async fn close(
481 self,
482 status: web_transport_quinn::http::StatusCode,
483 ) -> std::result::Result<(), web_transport_quinn::ServerError> {
484 match self {
485 QuinnRequest::Raw { connection, .. } => {
486 connection.close(status.as_u16().into(), status.as_str().as_bytes());
487 Ok(())
488 }
489 QuinnRequest::WebTransport { request, alpns: _, .. } => request.reject(status).await,
490 }
491 }
492}
493
494struct ServerIdGenerator {
497 server_id: ServerId,
498 nonce_len: usize,
499}
500
501impl ServerIdGenerator {
502 fn new(server_id: ServerId, nonce_len: usize) -> Self {
503 Self { server_id, nonce_len }
504 }
505}
506
507impl quinn::ConnectionIdGenerator for ServerIdGenerator {
508 fn generate_cid(&mut self) -> quinn::ConnectionId {
509 use rand::RngExt;
510 let cid_len = self.cid_len();
511 let mut cid = Vec::with_capacity(cid_len);
512 cid.push((cid_len - 1) as u8);
514 cid.extend(self.server_id.0.iter());
515 cid.extend(rand::rng().random_iter::<u8>().take(self.nonce_len));
516 quinn::ConnectionId::new(cid.as_slice())
517 }
518
519 fn cid_len(&self) -> usize {
520 1 + self.server_id.len() + self.nonce_len
521 }
522
523 fn cid_lifetime(&self) -> Option<Duration> {
524 None
525 }
526}