1use crate::crypto;
2use crate::{Backoff, QuicBackend, Reconnect};
3use anyhow::Context;
4use std::path::PathBuf;
5use std::{net, sync::Arc};
6use url::Url;
7
8#[derive(Clone, Default, Debug, clap::Args, serde::Serialize, serde::Deserialize)]
10#[serde(default, deny_unknown_fields)]
11#[non_exhaustive]
12pub struct ClientTls {
13 #[serde(skip_serializing_if = "Vec::is_empty")]
18 #[arg(id = "tls-root", long = "tls-root", env = "MOQ_CLIENT_TLS_ROOT")]
19 pub root: Vec<PathBuf>,
20
21 #[serde(skip_serializing_if = "Option::is_none")]
27 #[arg(
28 id = "client-tls-identity",
29 long = "client-tls-identity",
30 env = "MOQ_CLIENT_TLS_IDENTITY"
31 )]
32 pub identity: Option<PathBuf>,
33
34 #[serde(skip_serializing_if = "Option::is_none")]
38 #[arg(
39 id = "tls-disable-verify",
40 long = "tls-disable-verify",
41 env = "MOQ_CLIENT_TLS_DISABLE_VERIFY",
42 default_missing_value = "true",
43 num_args = 0..=1,
44 require_equals = true,
45 value_parser = clap::value_parser!(bool),
46 )]
47 pub disable_verify: Option<bool>,
48}
49
50#[derive(Clone, Debug, clap::Parser, serde::Serialize, serde::Deserialize)]
52#[serde(deny_unknown_fields, default)]
53#[non_exhaustive]
54pub struct ClientConfig {
55 #[arg(
57 id = "client-bind",
58 long = "client-bind",
59 default_value = "[::]:0",
60 env = "MOQ_CLIENT_BIND"
61 )]
62 pub bind: net::SocketAddr,
63
64 #[arg(id = "client-backend", long = "client-backend", env = "MOQ_CLIENT_BACKEND")]
67 pub backend: Option<QuicBackend>,
68
69 #[serde(skip_serializing_if = "Option::is_none")]
71 #[arg(
72 id = "client-max-streams",
73 long = "client-max-streams",
74 env = "MOQ_CLIENT_MAX_STREAMS"
75 )]
76 pub max_streams: Option<u64>,
77
78 #[serde(default, skip_serializing_if = "Vec::is_empty")]
86 #[arg(id = "client-version", long = "client-version", env = "MOQ_CLIENT_VERSION")]
87 pub version: Vec<moq_lite::Version>,
88
89 #[command(flatten)]
90 #[serde(default)]
91 pub tls: ClientTls,
92
93 #[command(flatten)]
94 #[serde(default)]
95 pub backoff: Backoff,
96
97 #[cfg(feature = "websocket")]
98 #[command(flatten)]
99 #[serde(default)]
100 pub websocket: super::ClientWebSocket,
101}
102
103impl ClientTls {
104 pub fn build(&self) -> anyhow::Result<rustls::ClientConfig> {
110 use rustls::pki_types::CertificateDer;
111
112 let provider = crypto::provider();
113
114 let mut roots = rustls::RootCertStore::empty();
115 if self.root.is_empty() {
116 let native = rustls_native_certs::load_native_certs();
117 for err in native.errors {
118 tracing::warn!(%err, "failed to load root cert");
119 }
120 for cert in native.certs {
121 roots.add(cert).context("failed to add root cert")?;
122 }
123 } else {
124 for root in &self.root {
125 let file = std::fs::File::open(root).context("failed to open root cert file")?;
126 let mut reader = std::io::BufReader::new(file);
127 let cert = rustls_pemfile::certs(&mut reader)
128 .next()
129 .context("no roots found")?
130 .context("failed to read root cert")?;
131 roots.add(cert).context("failed to add root cert")?;
132 }
133 }
134
135 let builder = rustls::ClientConfig::builder_with_provider(provider.clone())
138 .with_protocol_versions(&[&rustls::version::TLS13, &rustls::version::TLS12])?
139 .with_root_certificates(roots);
140
141 let mut tls = match &self.identity {
142 Some(path) => {
143 let pem = std::fs::read(path).context("failed to read client identity")?;
144 let chain: Vec<CertificateDer<'static>> = rustls_pemfile::certs(&mut pem.as_slice())
145 .collect::<Result<_, _>>()
146 .context("failed to parse client identity certs")?;
147 anyhow::ensure!(!chain.is_empty(), "no certificates found in client identity");
148 let key = rustls_pemfile::private_key(&mut pem.as_slice())
149 .context("failed to parse client identity key")?
150 .context("no private key found in client identity")?;
151 builder
152 .with_client_auth_cert(chain, key)
153 .context("failed to configure client certificate")?
154 }
155 None => builder.with_no_client_auth(),
156 };
157
158 if self.disable_verify.unwrap_or_default() {
159 tracing::warn!("TLS server certificate verification is disabled; A man-in-the-middle attack is possible.");
160 let noop = NoCertificateVerification(provider);
161 tls.dangerous().set_certificate_verifier(Arc::new(noop));
162 }
163
164 Ok(tls)
165 }
166
167 pub fn identity_dns_name(&self) -> anyhow::Result<Option<String>> {
174 use rustls::pki_types::CertificateDer;
175
176 let Some(path) = self.identity.as_ref() else {
177 return Ok(None);
178 };
179 let pem = std::fs::read(path).context("failed to read client identity")?;
180 let certs: Vec<CertificateDer<'static>> = rustls_pemfile::certs(&mut pem.as_slice())
181 .collect::<Result<_, _>>()
182 .context("failed to parse client identity certs")?;
183 let leaf = certs.first().context("no certificates found")?;
184 let (_, cert) =
185 x509_parser::parse_x509_certificate(leaf.as_ref()).context("failed to parse identity certificate")?;
186 let san = cert
187 .subject_alternative_name()
188 .context("failed to read subject alternative name extension")?
189 .and_then(|san| {
190 san.value.general_names.iter().find_map(|name| match name {
191 x509_parser::extensions::GeneralName::DNSName(n) => Some((*n).to_string()),
192 _ => None,
193 })
194 });
195 Ok(san)
196 }
197}
198
199impl ClientConfig {
200 pub fn init(self) -> anyhow::Result<Client> {
201 Client::new(self)
202 }
203
204 pub fn versions(&self) -> moq_lite::Versions {
206 if self.version.is_empty() {
207 moq_lite::Versions::all()
208 } else {
209 moq_lite::Versions::from(self.version.clone())
210 }
211 }
212}
213
214impl Default for ClientConfig {
215 fn default() -> Self {
216 Self {
217 bind: "[::]:0".parse().unwrap(),
218 backend: None,
219 max_streams: None,
220 version: Vec::new(),
221 tls: ClientTls::default(),
222 backoff: Backoff::default(),
223 #[cfg(feature = "websocket")]
224 websocket: super::ClientWebSocket::default(),
225 }
226 }
227}
228
229#[derive(Clone)]
233pub struct Client {
234 moq: moq_lite::Client,
235 versions: moq_lite::Versions,
236 backoff: Backoff,
237 #[cfg(feature = "websocket")]
238 websocket: super::ClientWebSocket,
239 tls: rustls::ClientConfig,
240 #[cfg(feature = "noq")]
241 noq: Option<crate::noq::NoqClient>,
242 #[cfg(feature = "quinn")]
243 quinn: Option<crate::quinn::QuinnClient>,
244 #[cfg(feature = "quiche")]
245 quiche: Option<crate::quiche::QuicheClient>,
246 #[cfg(feature = "iroh")]
247 iroh: Option<web_transport_iroh::iroh::Endpoint>,
248 #[cfg(feature = "iroh")]
249 iroh_addrs: Vec<std::net::SocketAddr>,
250}
251
252impl Client {
253 #[cfg(not(any(feature = "noq", feature = "quinn", feature = "quiche", feature = "websocket")))]
254 pub fn new(_config: ClientConfig) -> anyhow::Result<Self> {
255 anyhow::bail!("no QUIC or WebSocket backend compiled; enable noq, quinn, quiche, or websocket feature");
256 }
257
258 #[cfg(any(feature = "noq", feature = "quinn", feature = "quiche", feature = "websocket"))]
260 pub fn new(config: ClientConfig) -> anyhow::Result<Self> {
261 #[cfg(any(feature = "noq", feature = "quinn", feature = "quiche"))]
262 let backend = config.backend.clone().unwrap_or({
263 #[cfg(feature = "quinn")]
264 {
265 QuicBackend::Quinn
266 }
267 #[cfg(all(feature = "noq", not(feature = "quinn")))]
268 {
269 QuicBackend::Noq
270 }
271 #[cfg(all(feature = "quiche", not(feature = "quinn"), not(feature = "noq")))]
272 {
273 QuicBackend::Quiche
274 }
275 #[cfg(all(not(feature = "quiche"), not(feature = "quinn"), not(feature = "noq")))]
276 panic!("no QUIC backend compiled; enable noq, quinn, or quiche feature");
277 });
278
279 let tls = config.tls.build()?;
280
281 #[cfg(feature = "noq")]
282 #[allow(unreachable_patterns)]
283 let noq = match backend {
284 QuicBackend::Noq => Some(crate::noq::NoqClient::new(&config)?),
285 _ => None,
286 };
287
288 #[cfg(feature = "quinn")]
289 #[allow(unreachable_patterns)]
290 let quinn = match backend {
291 QuicBackend::Quinn => Some(crate::quinn::QuinnClient::new(&config)?),
292 _ => None,
293 };
294
295 #[cfg(feature = "quiche")]
296 let quiche = match backend {
297 QuicBackend::Quiche => Some(crate::quiche::QuicheClient::new(&config)?),
298 _ => None,
299 };
300
301 let versions = config.versions();
302 Ok(Self {
303 moq: moq_lite::Client::new().with_versions(versions.clone()),
304 versions,
305 backoff: config.backoff,
306 #[cfg(feature = "websocket")]
307 websocket: config.websocket,
308 tls,
309 #[cfg(feature = "noq")]
310 noq,
311 #[cfg(feature = "quinn")]
312 quinn,
313 #[cfg(feature = "quiche")]
314 quiche,
315 #[cfg(feature = "iroh")]
316 iroh: None,
317 #[cfg(feature = "iroh")]
318 iroh_addrs: Vec::new(),
319 })
320 }
321
322 #[cfg(feature = "iroh")]
323 pub fn with_iroh(mut self, iroh: Option<web_transport_iroh::iroh::Endpoint>) -> Self {
324 self.iroh = iroh;
325 self
326 }
327
328 #[cfg(feature = "iroh")]
333 pub fn with_iroh_addrs(mut self, addrs: Vec<std::net::SocketAddr>) -> Self {
334 self.iroh_addrs = addrs;
335 self
336 }
337
338 pub fn with_publish(mut self, publish: impl Into<Option<moq_lite::OriginConsumer>>) -> Self {
339 self.moq = self.moq.with_publish(publish);
340 self
341 }
342
343 pub fn with_consume(mut self, consume: impl Into<Option<moq_lite::OriginProducer>>) -> Self {
344 self.moq = self.moq.with_consume(consume);
345 self
346 }
347
348 pub fn reconnect(&self, url: Url) -> Reconnect {
353 Reconnect::new(self.clone(), url, self.backoff.clone())
354 }
355
356 #[cfg(not(any(
357 feature = "noq",
358 feature = "quinn",
359 feature = "quiche",
360 feature = "iroh",
361 feature = "websocket"
362 )))]
363 pub async fn connect(&self, _url: Url) -> anyhow::Result<moq_lite::Session> {
364 anyhow::bail!("no backend compiled; enable noq, quinn, quiche, iroh, or websocket feature");
365 }
366
367 #[cfg(any(
368 feature = "noq",
369 feature = "quinn",
370 feature = "quiche",
371 feature = "iroh",
372 feature = "websocket"
373 ))]
374 pub async fn connect(&self, url: Url) -> anyhow::Result<moq_lite::Session> {
375 let session = self.connect_inner(url).await?;
376 tracing::info!(version = %session.version(), "connected");
377 Ok(session)
378 }
379
380 #[cfg(any(
381 feature = "noq",
382 feature = "quinn",
383 feature = "quiche",
384 feature = "iroh",
385 feature = "websocket"
386 ))]
387 async fn connect_inner(&self, url: Url) -> anyhow::Result<moq_lite::Session> {
388 #[cfg(feature = "iroh")]
389 if url.scheme() == "iroh" {
390 let endpoint = self.iroh.as_ref().context("Iroh support is not enabled")?;
391 let session = crate::iroh::connect(endpoint, url, self.iroh_addrs.iter().copied()).await?;
392 let session = self.moq.connect(session).await?;
393 return Ok(session);
394 }
395
396 #[cfg(feature = "noq")]
397 if let Some(noq) = self.noq.as_ref() {
398 let tls = self.tls.clone();
399 let quic_url = url.clone();
400 let quic_handle = async {
401 let res = noq.connect(&tls, quic_url).await;
402 if let Err(err) = &res {
403 tracing::warn!(%err, "QUIC connection failed");
404 }
405 res
406 };
407
408 #[cfg(feature = "websocket")]
409 {
410 let alpns = self.versions.alpns();
411 let ws_handle = crate::websocket::race_handle(&self.websocket, &self.tls, url, &alpns);
412
413 return Ok(tokio::select! {
414 Ok(quic) = quic_handle => self.moq.connect(quic).await?,
415 Some(Ok(ws)) = ws_handle => self.moq.connect(ws).await?,
416 else => anyhow::bail!("failed to connect to server"),
417 });
418 }
419
420 #[cfg(not(feature = "websocket"))]
421 {
422 let session = quic_handle.await?;
423 return Ok(self.moq.connect(session).await?);
424 }
425 }
426
427 #[cfg(feature = "quinn")]
428 if let Some(quinn) = self.quinn.as_ref() {
429 let tls = self.tls.clone();
430 let quic_url = url.clone();
431 let quic_handle = async {
432 let res = quinn.connect(&tls, quic_url).await;
433 if let Err(err) = &res {
434 tracing::warn!(%err, "QUIC connection failed");
435 }
436 res
437 };
438
439 #[cfg(feature = "websocket")]
440 {
441 let alpns = self.versions.alpns();
442 let ws_handle = crate::websocket::race_handle(&self.websocket, &self.tls, url, &alpns);
443
444 return Ok(tokio::select! {
445 Ok(quic) = quic_handle => self.moq.connect(quic).await?,
446 Some(Ok(ws)) = ws_handle => self.moq.connect(ws).await?,
447 else => anyhow::bail!("failed to connect to server"),
448 });
449 }
450
451 #[cfg(not(feature = "websocket"))]
452 {
453 let session = quic_handle.await?;
454 return Ok(self.moq.connect(session).await?);
455 }
456 }
457
458 #[cfg(feature = "quiche")]
459 if let Some(quiche) = self.quiche.as_ref() {
460 let quic_url = url.clone();
461 let quic_handle = async {
462 let res = quiche.connect(quic_url).await;
463 if let Err(err) = &res {
464 tracing::warn!(%err, "QUIC connection failed");
465 }
466 res
467 };
468
469 #[cfg(feature = "websocket")]
470 {
471 let alpns = self.versions.alpns();
472 let ws_handle = crate::websocket::race_handle(&self.websocket, &self.tls, url, &alpns);
473
474 return Ok(tokio::select! {
475 Ok(quic) = quic_handle => self.moq.connect(quic).await?,
476 Some(Ok(ws)) = ws_handle => self.moq.connect(ws).await?,
477 else => anyhow::bail!("failed to connect to server"),
478 });
479 }
480
481 #[cfg(not(feature = "websocket"))]
482 {
483 let session = quic_handle.await?;
484 return Ok(self.moq.connect(session).await?);
485 }
486 }
487
488 #[cfg(feature = "websocket")]
489 {
490 let alpns = self.versions.alpns();
491 let session = crate::websocket::connect(&self.websocket, &self.tls, url, &alpns).await?;
492 return Ok(self.moq.connect(session).await?);
493 }
494
495 #[cfg(not(feature = "websocket"))]
496 anyhow::bail!("no QUIC backend matched; this should not happen");
497 }
498}
499
500use rustls::pki_types::{CertificateDer, ServerName, UnixTime};
501
502#[derive(Debug)]
503struct NoCertificateVerification(crypto::Provider);
504
505impl rustls::client::danger::ServerCertVerifier for NoCertificateVerification {
506 fn verify_server_cert(
507 &self,
508 _end_entity: &CertificateDer<'_>,
509 _intermediates: &[CertificateDer<'_>],
510 _server_name: &ServerName<'_>,
511 _ocsp: &[u8],
512 _now: UnixTime,
513 ) -> Result<rustls::client::danger::ServerCertVerified, rustls::Error> {
514 Ok(rustls::client::danger::ServerCertVerified::assertion())
515 }
516
517 fn verify_tls12_signature(
518 &self,
519 message: &[u8],
520 cert: &CertificateDer<'_>,
521 dss: &rustls::DigitallySignedStruct,
522 ) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
523 rustls::crypto::verify_tls12_signature(message, cert, dss, &self.0.signature_verification_algorithms)
524 }
525
526 fn verify_tls13_signature(
527 &self,
528 message: &[u8],
529 cert: &CertificateDer<'_>,
530 dss: &rustls::DigitallySignedStruct,
531 ) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
532 rustls::crypto::verify_tls13_signature(message, cert, dss, &self.0.signature_verification_algorithms)
533 }
534
535 fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
536 self.0.signature_verification_algorithms.supported_schemes()
537 }
538}
539
#[cfg(test)]
mod tests {
    use super::*;
    use clap::Parser;

    /// Parse a version name, panicking on an unknown one.
    fn version(name: &str) -> moq_lite::Version {
        name.parse::<moq_lite::Version>().unwrap()
    }

    #[test]
    fn test_toml_disable_verify_survives_update_from() {
        let toml = r#"
            tls.disable_verify = true
        "#;

        let mut config: ClientConfig = toml::from_str(toml).unwrap();
        assert_eq!(config.tls.disable_verify, Some(true));

        // Re-parsing with no CLI flags must not reset the value loaded from TOML.
        config.update_from(["test"]);
        assert_eq!(config.tls.disable_verify, Some(true));
    }

    #[test]
    fn test_cli_disable_verify_flag() {
        // A bare flag picks up `default_missing_value = "true"`.
        let args = ["test", "--tls-disable-verify"];
        let config = ClientConfig::parse_from(args);
        assert_eq!(config.tls.disable_verify, Some(true));
    }

    #[test]
    fn test_cli_disable_verify_explicit_false() {
        let args = ["test", "--tls-disable-verify=false"];
        let config = ClientConfig::parse_from(args);
        assert_eq!(config.tls.disable_verify, Some(false));
    }

    #[test]
    fn test_cli_disable_verify_explicit_true() {
        let args = ["test", "--tls-disable-verify=true"];
        let config = ClientConfig::parse_from(args);
        assert_eq!(config.tls.disable_verify, Some(true));
    }

    #[test]
    fn test_cli_no_disable_verify() {
        // Without the flag the option stays unset rather than becoming `Some(false)`.
        let config = ClientConfig::parse_from(["test"]);
        assert_eq!(config.tls.disable_verify, None);
    }

    #[test]
    fn test_toml_version_survives_update_from() {
        let toml = r#"
            version = ["moq-lite-02"]
        "#;

        let mut config: ClientConfig = toml::from_str(toml).unwrap();
        assert_eq!(config.version, vec![version("moq-lite-02")]);

        // Re-parsing with no CLI flags must not clear the versions loaded from TOML.
        config.update_from(["test"]);
        assert_eq!(config.version, vec![version("moq-lite-02")]);
    }

    #[test]
    fn test_cli_version() {
        let args = ["test", "--client-version", "moq-lite-03"];
        let config = ClientConfig::parse_from(args);
        assert_eq!(config.version, vec![version("moq-lite-03")]);
    }

    #[test]
    fn test_cli_no_version_defaults_to_all() {
        let config = ClientConfig::parse_from(["test"]);
        assert!(config.version.is_empty());
        // An empty list means "offer everything": one ALPN per supported entry.
        assert_eq!(config.versions().alpns().len(), moq_lite::ALPNS.len());
    }
}