// moq_native/client.rs

1use crate::crypto;
2use crate::{Backoff, QuicBackend, Reconnect};
3use anyhow::Context;
4use std::path::PathBuf;
5use std::{net, sync::Arc};
6use url::Url;
7
8/// TLS configuration for the client.
9#[derive(Clone, Default, Debug, clap::Args, serde::Serialize, serde::Deserialize)]
10#[serde(default, deny_unknown_fields)]
11#[non_exhaustive]
12pub struct ClientTls {
13	/// Use the TLS root at this path, encoded as PEM.
14	///
15	/// This value can be provided multiple times for multiple roots.
16	/// If this is empty, system roots will be used instead
17	#[serde(skip_serializing_if = "Vec::is_empty")]
18	#[arg(id = "tls-root", long = "tls-root", env = "MOQ_CLIENT_TLS_ROOT")]
19	pub root: Vec<PathBuf>,
20
21	/// PEM file containing the client certificate chain for mTLS.
22	///
23	/// Only certificates are extracted; any private keys in the file are ignored.
24	/// Must be paired with `--client-tls-key`.
25	#[serde(skip_serializing_if = "Option::is_none")]
26	#[arg(id = "client-tls-cert", long = "client-tls-cert", env = "MOQ_CLIENT_TLS_CERT")]
27	pub cert: Option<PathBuf>,
28
29	/// PEM file containing the private key for mTLS.
30	///
31	/// Only the private key is extracted; any certificates in the file are ignored.
32	/// Must be paired with `--client-tls-cert`.
33	#[serde(skip_serializing_if = "Option::is_none")]
34	#[arg(id = "client-tls-key", long = "client-tls-key", env = "MOQ_CLIENT_TLS_KEY")]
35	pub key: Option<PathBuf>,
36
37	/// Danger: Disable TLS certificate verification.
38	///
39	/// Fine for local development and between relays, but should be used in caution in production.
40	#[serde(skip_serializing_if = "Option::is_none")]
41	#[arg(
42		id = "tls-disable-verify",
43		long = "tls-disable-verify",
44		env = "MOQ_CLIENT_TLS_DISABLE_VERIFY",
45		default_missing_value = "true",
46		num_args = 0..=1,
47		require_equals = true,
48		value_parser = clap::value_parser!(bool),
49	)]
50	pub disable_verify: Option<bool>,
51}
52
/// Configuration for the MoQ client.
// The same struct is populated from three sources: command-line flags and
// environment variables (via `clap`), and config files (via `serde`). The
// `///` comments below are also used by `clap` as help text, so they are
// user-visible.
//
// The container-level `serde(default)` fills missing keys from the manual
// `Default` impl in this file; `deny_unknown_fields` rejects unrecognised keys.
#[derive(Clone, Debug, clap::Parser, serde::Serialize, serde::Deserialize)]
#[serde(deny_unknown_fields, default)]
#[non_exhaustive]
pub struct ClientConfig {
	/// Listen for UDP packets on the given address.
	// The default is spelled out here for clap and again in `Default::default`
	// for serde; keep the two in sync.
	#[arg(
		id = "client-bind",
		long = "client-bind",
		default_value = "[::]:0",
		env = "MOQ_CLIENT_BIND"
	)]
	pub bind: net::SocketAddr,

	/// The QUIC backend to use.
	/// Auto-detected from compiled features if not specified.
	// NOTE(review): unlike the other `Option` fields in this file, this one has
	// no `skip_serializing_if`, so `None` is handed to the serializer as-is.
	// Confirm whether that asymmetry is intentional.
	#[arg(id = "client-backend", long = "client-backend", env = "MOQ_CLIENT_BACKEND")]
	pub backend: Option<QuicBackend>,

	/// Maximum number of concurrent QUIC streams per connection (both bidi and uni).
	#[serde(skip_serializing_if = "Option::is_none")]
	#[arg(
		id = "client-max-streams",
		long = "client-max-streams",
		env = "MOQ_CLIENT_MAX_STREAMS"
	)]
	pub max_streams: Option<u64>,

	/// Restrict the client to specific MoQ protocol version(s).
	///
	/// By default, the client offers all supported versions and lets the server choose.
	/// Use this to force a specific version, e.g. `--client-version moq-lite-02`.
	/// Can be specified multiple times to offer a subset of versions.
	///
	/// Valid values: moq-lite-01, moq-lite-02, moq-lite-03, moq-transport-14, moq-transport-15, moq-transport-16, moq-transport-17
	// An empty list means "no restriction"; see `ClientConfig::versions`.
	#[serde(default, skip_serializing_if = "Vec::is_empty")]
	#[arg(id = "client-version", long = "client-version", env = "MOQ_CLIENT_VERSION")]
	pub version: Vec<moq_lite::Version>,

	// TLS options; flattened so its flags appear directly on the client CLI.
	#[command(flatten)]
	#[serde(default)]
	pub tls: ClientTls,

	// Backoff settings; `Client::reconnect` passes a copy to `Reconnect::new`.
	#[command(flatten)]
	#[serde(default)]
	pub backoff: Backoff,

	// WebSocket options; only present when the `websocket` feature is enabled.
	#[cfg(feature = "websocket")]
	#[command(flatten)]
	#[serde(default)]
	pub websocket: super::ClientWebSocket,
}
105
106impl ClientTls {
107	/// Build a [`rustls::ClientConfig`] from this configuration.
108	///
109	/// Loads the configured roots (or the platform's native roots if none),
110	/// optionally attaches a client identity for mTLS, and disables server
111	/// certificate verification when `disable_verify` is set.
112	pub fn build(&self) -> anyhow::Result<rustls::ClientConfig> {
113		use rustls::pki_types::CertificateDer;
114
115		let provider = crypto::provider();
116
117		let mut roots = rustls::RootCertStore::empty();
118		if self.root.is_empty() {
119			let native = rustls_native_certs::load_native_certs();
120			for err in native.errors {
121				tracing::warn!(%err, "failed to load root cert");
122			}
123			for cert in native.certs {
124				roots.add(cert).context("failed to add root cert")?;
125			}
126		} else {
127			for root in &self.root {
128				let file = std::fs::File::open(root).context("failed to open root cert file")?;
129				let mut reader = std::io::BufReader::new(file);
130				let cert = rustls_pemfile::certs(&mut reader)
131					.next()
132					.context("no roots found")?
133					.context("failed to read root cert")?;
134				roots.add(cert).context("failed to add root cert")?;
135			}
136		}
137
138		// Allow TLS 1.2 in addition to 1.3 for WebSocket compatibility.
139		// QUIC always negotiates TLS 1.3 regardless of this setting.
140		let builder = rustls::ClientConfig::builder_with_provider(provider.clone())
141			.with_protocol_versions(&[&rustls::version::TLS13, &rustls::version::TLS12])?
142			.with_root_certificates(roots);
143
144		let mut tls = match (&self.cert, &self.key) {
145			(Some(cert_path), Some(key_path)) => {
146				let cert_pem = std::fs::read(cert_path).context("failed to read client certificate")?;
147				let chain: Vec<CertificateDer<'static>> = rustls_pemfile::certs(&mut cert_pem.as_slice())
148					.collect::<Result<_, _>>()
149					.context("failed to parse client certificate")?;
150				anyhow::ensure!(!chain.is_empty(), "no certificates found in client certificate");
151				let key_pem = std::fs::read(key_path).context("failed to read client key")?;
152				let key = rustls_pemfile::private_key(&mut key_pem.as_slice())
153					.context("failed to parse client key")?
154					.context("no private key found in client key")?;
155				builder
156					.with_client_auth_cert(chain, key)
157					.context("failed to configure client certificate")?
158			}
159			(None, None) => builder.with_no_client_auth(),
160			_ => anyhow::bail!("both --client-tls-cert and --client-tls-key must be provided"),
161		};
162
163		if self.disable_verify.unwrap_or_default() {
164			tracing::warn!("TLS server certificate verification is disabled; A man-in-the-middle attack is possible.");
165			let noop = NoCertificateVerification(provider);
166			tls.dangerous().set_certificate_verifier(Arc::new(noop));
167		}
168
169		Ok(tls)
170	}
171
172	/// Parse the configured certificate PEM (if any) and return the first DNS
173	/// SAN on its leaf certificate.
174	///
175	/// Useful for sanity-checking that a caller's own cluster node name
176	/// matches the certificate they will present. Returns `Ok(None)` if no
177	/// certificate is configured.
178	pub fn cert_dns_name(&self) -> anyhow::Result<Option<String>> {
179		use rustls::pki_types::CertificateDer;
180
181		let Some(path) = self.cert.as_ref() else {
182			return Ok(None);
183		};
184		let pem = std::fs::read(path).context("failed to read client certificate")?;
185		let certs: Vec<CertificateDer<'static>> = rustls_pemfile::certs(&mut pem.as_slice())
186			.collect::<Result<_, _>>()
187			.context("failed to parse client certificate")?;
188		let leaf = certs.first().context("no certificates found")?;
189		let (_, cert) =
190			x509_parser::parse_x509_certificate(leaf.as_ref()).context("failed to parse client certificate")?;
191		let san = cert
192			.subject_alternative_name()
193			.context("failed to read subject alternative name extension")?
194			.and_then(|san| {
195				san.value.general_names.iter().find_map(|name| match name {
196					x509_parser::extensions::GeneralName::DNSName(n) => Some((*n).to_string()),
197					_ => None,
198				})
199			});
200		Ok(san)
201	}
202}
203
204impl ClientConfig {
205	pub fn init(self) -> anyhow::Result<Client> {
206		Client::new(self)
207	}
208
209	/// Returns the configured versions, defaulting to all if none specified.
210	pub fn versions(&self) -> moq_lite::Versions {
211		if self.version.is_empty() {
212			moq_lite::Versions::all()
213		} else {
214			moq_lite::Versions::from(self.version.clone())
215		}
216	}
217}
218
219impl Default for ClientConfig {
220	fn default() -> Self {
221		Self {
222			bind: "[::]:0".parse().unwrap(),
223			backend: None,
224			max_streams: None,
225			version: Vec::new(),
226			tls: ClientTls::default(),
227			backoff: Backoff::default(),
228			#[cfg(feature = "websocket")]
229			websocket: super::ClientWebSocket::default(),
230		}
231	}
232}
233
/// Client for establishing MoQ connections over QUIC, WebTransport, or WebSocket.
///
/// Create via [`ClientConfig::init`] or [`Client::new`].
#[derive(Clone)]
pub struct Client {
	/// MoQ-level client that runs the session handshake over an established transport.
	moq: moq_lite::Client,
	/// Versions offered by this client; also the source of the ALPN list handed to the WebSocket transport.
	versions: moq_lite::Versions,
	/// Backoff settings passed to [`Reconnect`] by [`Client::reconnect`].
	backoff: Backoff,
	/// WebSocket transport settings.
	#[cfg(feature = "websocket")]
	websocket: super::ClientWebSocket,
	/// TLS configuration built once from [`ClientTls::build`] and cloned per connection attempt.
	tls: rustls::ClientConfig,
	/// `Some` only when the noq backend was selected at construction.
	#[cfg(feature = "noq")]
	noq: Option<crate::noq::NoqClient>,
	/// `Some` only when the quinn backend was selected at construction.
	#[cfg(feature = "quinn")]
	quinn: Option<crate::quinn::QuinnClient>,
	/// `Some` only when the quiche backend was selected at construction.
	#[cfg(feature = "quiche")]
	quiche: Option<crate::quiche::QuicheClient>,
	/// Endpoint used for `iroh://` URLs; `None` until set via `with_iroh`.
	#[cfg(feature = "iroh")]
	iroh: Option<web_transport_iroh::iroh::Endpoint>,
	/// Direct socket addresses passed along when connecting to iroh peers; empty until set via `with_iroh_addrs`.
	#[cfg(feature = "iroh")]
	iroh_addrs: Vec<std::net::SocketAddr>,
}
256
257impl Client {
258	#[cfg(not(any(feature = "noq", feature = "quinn", feature = "quiche", feature = "websocket")))]
259	pub fn new(_config: ClientConfig) -> anyhow::Result<Self> {
260		anyhow::bail!("no QUIC or WebSocket backend compiled; enable noq, quinn, quiche, or websocket feature");
261	}
262
263	/// Create a new client
264	#[cfg(any(feature = "noq", feature = "quinn", feature = "quiche", feature = "websocket"))]
265	pub fn new(config: ClientConfig) -> anyhow::Result<Self> {
266		#[cfg(any(feature = "noq", feature = "quinn", feature = "quiche"))]
267		let backend = config.backend.clone().unwrap_or({
268			#[cfg(feature = "quinn")]
269			{
270				QuicBackend::Quinn
271			}
272			#[cfg(all(feature = "noq", not(feature = "quinn")))]
273			{
274				QuicBackend::Noq
275			}
276			#[cfg(all(feature = "quiche", not(feature = "quinn"), not(feature = "noq")))]
277			{
278				QuicBackend::Quiche
279			}
280			#[cfg(all(not(feature = "quiche"), not(feature = "quinn"), not(feature = "noq")))]
281			panic!("no QUIC backend compiled; enable noq, quinn, or quiche feature");
282		});
283
284		let tls = config.tls.build()?;
285
286		#[cfg(feature = "noq")]
287		#[allow(unreachable_patterns)]
288		let noq = match backend {
289			QuicBackend::Noq => Some(crate::noq::NoqClient::new(&config)?),
290			_ => None,
291		};
292
293		#[cfg(feature = "quinn")]
294		#[allow(unreachable_patterns)]
295		let quinn = match backend {
296			QuicBackend::Quinn => Some(crate::quinn::QuinnClient::new(&config)?),
297			_ => None,
298		};
299
300		#[cfg(feature = "quiche")]
301		let quiche = match backend {
302			QuicBackend::Quiche => Some(crate::quiche::QuicheClient::new(&config)?),
303			_ => None,
304		};
305
306		let versions = config.versions();
307		Ok(Self {
308			moq: moq_lite::Client::new().with_versions(versions.clone()),
309			versions,
310			backoff: config.backoff,
311			#[cfg(feature = "websocket")]
312			websocket: config.websocket,
313			tls,
314			#[cfg(feature = "noq")]
315			noq,
316			#[cfg(feature = "quinn")]
317			quinn,
318			#[cfg(feature = "quiche")]
319			quiche,
320			#[cfg(feature = "iroh")]
321			iroh: None,
322			#[cfg(feature = "iroh")]
323			iroh_addrs: Vec::new(),
324		})
325	}
326
327	#[cfg(feature = "iroh")]
328	pub fn with_iroh(mut self, iroh: Option<web_transport_iroh::iroh::Endpoint>) -> Self {
329		self.iroh = iroh;
330		self
331	}
332
333	/// Set direct IP addresses for connecting to iroh peers.
334	///
335	/// This is useful when the peer's IP addresses are known ahead of time,
336	/// bypassing the need for peer discovery (e.g. in tests or local networks).
337	#[cfg(feature = "iroh")]
338	pub fn with_iroh_addrs(mut self, addrs: Vec<std::net::SocketAddr>) -> Self {
339		self.iroh_addrs = addrs;
340		self
341	}
342
343	pub fn with_publish(mut self, publish: impl Into<Option<moq_lite::OriginConsumer>>) -> Self {
344		self.moq = self.moq.with_publish(publish);
345		self
346	}
347
348	pub fn with_consume(mut self, consume: impl Into<Option<moq_lite::OriginProducer>>) -> Self {
349		self.moq = self.moq.with_consume(consume);
350		self
351	}
352
353	/// Start a background reconnect loop that connects to the given URL,
354	/// waits for the session to close, then reconnects with exponential backoff.
355	///
356	/// Returns a [`Reconnect`] handle. Drop or call [`Reconnect::close`] to stop.
357	pub fn reconnect(&self, url: Url) -> Reconnect {
358		Reconnect::new(self.clone(), url, self.backoff.clone())
359	}
360
361	#[cfg(not(any(
362		feature = "noq",
363		feature = "quinn",
364		feature = "quiche",
365		feature = "iroh",
366		feature = "websocket"
367	)))]
368	pub async fn connect(&self, _url: Url) -> anyhow::Result<moq_lite::Session> {
369		anyhow::bail!("no backend compiled; enable noq, quinn, quiche, iroh, or websocket feature");
370	}
371
372	#[cfg(any(
373		feature = "noq",
374		feature = "quinn",
375		feature = "quiche",
376		feature = "iroh",
377		feature = "websocket"
378	))]
379	pub async fn connect(&self, url: Url) -> anyhow::Result<moq_lite::Session> {
380		let session = self.connect_inner(url).await?;
381		tracing::info!(version = %session.version(), "connected");
382		Ok(session)
383	}
384
385	#[cfg(any(
386		feature = "noq",
387		feature = "quinn",
388		feature = "quiche",
389		feature = "iroh",
390		feature = "websocket"
391	))]
392	async fn connect_inner(&self, url: Url) -> anyhow::Result<moq_lite::Session> {
393		#[cfg(feature = "iroh")]
394		if url.scheme() == "iroh" {
395			let endpoint = self.iroh.as_ref().context("Iroh support is not enabled")?;
396			let session = crate::iroh::connect(endpoint, url, self.iroh_addrs.iter().copied()).await?;
397			let session = self.moq.connect(session).await?;
398			return Ok(session);
399		}
400
401		#[cfg(feature = "noq")]
402		if let Some(noq) = self.noq.as_ref() {
403			let tls = self.tls.clone();
404			let quic_url = url.clone();
405			let quic_handle = async {
406				let res = noq.connect(&tls, quic_url).await;
407				if let Err(err) = &res {
408					tracing::warn!(%err, "QUIC connection failed");
409				}
410				res
411			};
412
413			#[cfg(feature = "websocket")]
414			{
415				let alpns = self.versions.alpns();
416				let ws_handle = crate::websocket::race_handle(&self.websocket, &self.tls, url, &alpns);
417
418				return Ok(tokio::select! {
419					Ok(quic) = quic_handle => self.moq.connect(quic).await?,
420					Some(Ok(ws)) = ws_handle => self.moq.connect(ws).await?,
421					else => anyhow::bail!("failed to connect to server"),
422				});
423			}
424
425			#[cfg(not(feature = "websocket"))]
426			{
427				let session = quic_handle.await?;
428				return Ok(self.moq.connect(session).await?);
429			}
430		}
431
432		#[cfg(feature = "quinn")]
433		if let Some(quinn) = self.quinn.as_ref() {
434			let tls = self.tls.clone();
435			let quic_url = url.clone();
436			let quic_handle = async {
437				let res = quinn.connect(&tls, quic_url).await;
438				if let Err(err) = &res {
439					tracing::warn!(%err, "QUIC connection failed");
440				}
441				res
442			};
443
444			#[cfg(feature = "websocket")]
445			{
446				let alpns = self.versions.alpns();
447				let ws_handle = crate::websocket::race_handle(&self.websocket, &self.tls, url, &alpns);
448
449				return Ok(tokio::select! {
450					Ok(quic) = quic_handle => self.moq.connect(quic).await?,
451					Some(Ok(ws)) = ws_handle => self.moq.connect(ws).await?,
452					else => anyhow::bail!("failed to connect to server"),
453				});
454			}
455
456			#[cfg(not(feature = "websocket"))]
457			{
458				let session = quic_handle.await?;
459				return Ok(self.moq.connect(session).await?);
460			}
461		}
462
463		#[cfg(feature = "quiche")]
464		if let Some(quiche) = self.quiche.as_ref() {
465			let quic_url = url.clone();
466			let quic_handle = async {
467				let res = quiche.connect(quic_url).await;
468				if let Err(err) = &res {
469					tracing::warn!(%err, "QUIC connection failed");
470				}
471				res
472			};
473
474			#[cfg(feature = "websocket")]
475			{
476				let alpns = self.versions.alpns();
477				let ws_handle = crate::websocket::race_handle(&self.websocket, &self.tls, url, &alpns);
478
479				return Ok(tokio::select! {
480					Ok(quic) = quic_handle => self.moq.connect(quic).await?,
481					Some(Ok(ws)) = ws_handle => self.moq.connect(ws).await?,
482					else => anyhow::bail!("failed to connect to server"),
483				});
484			}
485
486			#[cfg(not(feature = "websocket"))]
487			{
488				let session = quic_handle.await?;
489				return Ok(self.moq.connect(session).await?);
490			}
491		}
492
493		#[cfg(feature = "websocket")]
494		{
495			let alpns = self.versions.alpns();
496			let session = crate::websocket::connect(&self.websocket, &self.tls, url, &alpns).await?;
497			return Ok(self.moq.connect(session).await?);
498		}
499
500		#[cfg(not(feature = "websocket"))]
501		anyhow::bail!("no QUIC backend matched; this should not happen");
502	}
503}
504
505use rustls::pki_types::{CertificateDer, ServerName, UnixTime};
506
/// Server certificate verifier that accepts any certificate.
///
/// Installed by [`ClientTls::build`] when `disable_verify` is set. The wrapped
/// crypto provider supplies the algorithms still used to check handshake
/// signatures.
#[derive(Debug)]
struct NoCertificateVerification(crypto::Provider);
509
510impl rustls::client::danger::ServerCertVerifier for NoCertificateVerification {
511	fn verify_server_cert(
512		&self,
513		_end_entity: &CertificateDer<'_>,
514		_intermediates: &[CertificateDer<'_>],
515		_server_name: &ServerName<'_>,
516		_ocsp: &[u8],
517		_now: UnixTime,
518	) -> Result<rustls::client::danger::ServerCertVerified, rustls::Error> {
519		Ok(rustls::client::danger::ServerCertVerified::assertion())
520	}
521
522	fn verify_tls12_signature(
523		&self,
524		message: &[u8],
525		cert: &CertificateDer<'_>,
526		dss: &rustls::DigitallySignedStruct,
527	) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
528		rustls::crypto::verify_tls12_signature(message, cert, dss, &self.0.signature_verification_algorithms)
529	}
530
531	fn verify_tls13_signature(
532		&self,
533		message: &[u8],
534		cert: &CertificateDer<'_>,
535		dss: &rustls::DigitallySignedStruct,
536	) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
537		rustls::crypto::verify_tls13_signature(message, cert, dss, &self.0.signature_verification_algorithms)
538	}
539
540	fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
541		self.0.signature_verification_algorithms.supported_schemes()
542	}
543}
544
#[cfg(test)]
mod tests {
	use super::*;
	use clap::Parser;

	/// Parse a protocol version name, panicking on an invalid one.
	fn version(name: &str) -> moq_lite::Version {
		name.parse::<moq_lite::Version>().unwrap()
	}

	/// A `disable_verify` loaded from TOML must not be reset when CLI args
	/// that omit the flag are applied on top.
	#[test]
	fn test_toml_disable_verify_survives_update_from() {
		let toml = r#"
			tls.disable_verify = true
		"#;

		let mut config: ClientConfig = toml::from_str(toml).unwrap();
		assert_eq!(config.tls.disable_verify, Some(true));

		// Re-apply an argv that carries only the program name.
		config.update_from(["test"]);
		assert_eq!(config.tls.disable_verify, Some(true));
	}

	/// The bare flag means `true`.
	#[test]
	fn test_cli_disable_verify_flag() {
		let parsed = ClientConfig::parse_from(["test", "--tls-disable-verify"]);
		assert_eq!(parsed.tls.disable_verify, Some(true));
	}

	/// `=false` is kept as an explicit `Some(false)`.
	#[test]
	fn test_cli_disable_verify_explicit_false() {
		let parsed = ClientConfig::parse_from(["test", "--tls-disable-verify=false"]);
		assert_eq!(parsed.tls.disable_verify, Some(false));
	}

	/// `=true` behaves like the bare flag.
	#[test]
	fn test_cli_disable_verify_explicit_true() {
		let parsed = ClientConfig::parse_from(["test", "--tls-disable-verify=true"]);
		assert_eq!(parsed.tls.disable_verify, Some(true));
	}

	/// Without the flag the value stays unset.
	#[test]
	fn test_cli_no_disable_verify() {
		let parsed = ClientConfig::parse_from(["test"]);
		assert_eq!(parsed.tls.disable_verify, None);
	}

	/// A `version` list loaded from TOML must not be reset when CLI args
	/// that omit the flag are applied on top.
	#[test]
	fn test_toml_version_survives_update_from() {
		let toml = r#"
			version = ["moq-lite-02"]
		"#;
		let expected = vec![version("moq-lite-02")];

		let mut config: ClientConfig = toml::from_str(toml).unwrap();
		assert_eq!(config.version, expected);

		// Re-apply an argv that carries only the program name.
		config.update_from(["test"]);
		assert_eq!(config.version, expected);
	}

	/// A single `--client-version` ends up as a one-element list.
	#[test]
	fn test_cli_version() {
		let parsed = ClientConfig::parse_from(["test", "--client-version", "moq-lite-03"]);
		assert_eq!(parsed.version, vec![version("moq-lite-03")]);
	}

	/// No `--client-version` leaves the list empty, which `versions()` expands
	/// to every supported version.
	#[test]
	fn test_cli_no_version_defaults_to_all() {
		let parsed = ClientConfig::parse_from(["test"]);
		assert!(parsed.version.is_empty());
		assert_eq!(parsed.versions().alpns().len(), moq_lite::ALPNS.len());
	}
}