// moq_native/client.rs

use crate::crypto;
use crate::{Backoff, QuicBackend, Reconnect};
use anyhow::Context;
use std::path::PathBuf;
use std::{net, sync::Arc};
use url::Url;

8/// TLS configuration for the client.
9#[derive(Clone, Default, Debug, clap::Args, serde::Serialize, serde::Deserialize)]
10#[serde(default, deny_unknown_fields)]
11#[non_exhaustive]
12pub struct ClientTls {
13	/// Use the TLS root at this path, encoded as PEM.
14	///
15	/// This value can be provided multiple times for multiple roots.
16	/// If this is empty, system roots will be used instead
17	#[serde(skip_serializing_if = "Vec::is_empty")]
18	#[arg(id = "tls-root", long = "tls-root", env = "MOQ_CLIENT_TLS_ROOT")]
19	pub root: Vec<PathBuf>,
20
21	/// Present a client certificate during the TLS handshake (mTLS).
22	///
23	/// The path must point at a single PEM file containing both the
24	/// certificate chain and the matching private key (in any order). This
25	/// is the same bundle layout used by curl's `--cert` and many PKI tools.
26	#[serde(skip_serializing_if = "Option::is_none")]
27	#[arg(
28		id = "client-tls-identity",
29		long = "client-tls-identity",
30		env = "MOQ_CLIENT_TLS_IDENTITY"
31	)]
32	pub identity: Option<PathBuf>,
33
34	/// Danger: Disable TLS certificate verification.
35	///
36	/// Fine for local development and between relays, but should be used in caution in production.
37	#[serde(skip_serializing_if = "Option::is_none")]
38	#[arg(
39		id = "tls-disable-verify",
40		long = "tls-disable-verify",
41		env = "MOQ_CLIENT_TLS_DISABLE_VERIFY",
42		default_missing_value = "true",
43		num_args = 0..=1,
44		require_equals = true,
45		value_parser = clap::value_parser!(bool),
46	)]
47	pub disable_verify: Option<bool>,
48}
49
50/// Configuration for the MoQ client.
51#[derive(Clone, Debug, clap::Parser, serde::Serialize, serde::Deserialize)]
52#[serde(deny_unknown_fields, default)]
53#[non_exhaustive]
54pub struct ClientConfig {
55	/// Listen for UDP packets on the given address.
56	#[arg(
57		id = "client-bind",
58		long = "client-bind",
59		default_value = "[::]:0",
60		env = "MOQ_CLIENT_BIND"
61	)]
62	pub bind: net::SocketAddr,
63
64	/// The QUIC backend to use.
65	/// Auto-detected from compiled features if not specified.
66	#[arg(id = "client-backend", long = "client-backend", env = "MOQ_CLIENT_BACKEND")]
67	pub backend: Option<QuicBackend>,
68
69	/// Maximum number of concurrent QUIC streams per connection (both bidi and uni).
70	#[serde(skip_serializing_if = "Option::is_none")]
71	#[arg(
72		id = "client-max-streams",
73		long = "client-max-streams",
74		env = "MOQ_CLIENT_MAX_STREAMS"
75	)]
76	pub max_streams: Option<u64>,
77
78	/// Restrict the client to specific MoQ protocol version(s).
79	///
80	/// By default, the client offers all supported versions and lets the server choose.
81	/// Use this to force a specific version, e.g. `--client-version moq-lite-02`.
82	/// Can be specified multiple times to offer a subset of versions.
83	///
84	/// Valid values: moq-lite-01, moq-lite-02, moq-lite-03, moq-transport-14, moq-transport-15, moq-transport-16, moq-transport-17
85	#[serde(default, skip_serializing_if = "Vec::is_empty")]
86	#[arg(id = "client-version", long = "client-version", env = "MOQ_CLIENT_VERSION")]
87	pub version: Vec<moq_lite::Version>,
88
89	#[command(flatten)]
90	#[serde(default)]
91	pub tls: ClientTls,
92
93	#[command(flatten)]
94	#[serde(default)]
95	pub backoff: Backoff,
96
97	#[cfg(feature = "websocket")]
98	#[command(flatten)]
99	#[serde(default)]
100	pub websocket: super::ClientWebSocket,
101}
102
103impl ClientTls {
104	/// Build a [`rustls::ClientConfig`] from this configuration.
105	///
106	/// Loads the configured roots (or the platform's native roots if none),
107	/// optionally attaches a client identity for mTLS, and disables server
108	/// certificate verification when `disable_verify` is set.
109	pub fn build(&self) -> anyhow::Result<rustls::ClientConfig> {
110		use rustls::pki_types::CertificateDer;
111
112		let provider = crypto::provider();
113
114		let mut roots = rustls::RootCertStore::empty();
115		if self.root.is_empty() {
116			let native = rustls_native_certs::load_native_certs();
117			for err in native.errors {
118				tracing::warn!(%err, "failed to load root cert");
119			}
120			for cert in native.certs {
121				roots.add(cert).context("failed to add root cert")?;
122			}
123		} else {
124			for root in &self.root {
125				let file = std::fs::File::open(root).context("failed to open root cert file")?;
126				let mut reader = std::io::BufReader::new(file);
127				let cert = rustls_pemfile::certs(&mut reader)
128					.next()
129					.context("no roots found")?
130					.context("failed to read root cert")?;
131				roots.add(cert).context("failed to add root cert")?;
132			}
133		}
134
135		// Allow TLS 1.2 in addition to 1.3 for WebSocket compatibility.
136		// QUIC always negotiates TLS 1.3 regardless of this setting.
137		let builder = rustls::ClientConfig::builder_with_provider(provider.clone())
138			.with_protocol_versions(&[&rustls::version::TLS13, &rustls::version::TLS12])?
139			.with_root_certificates(roots);
140
141		let mut tls = match &self.identity {
142			Some(path) => {
143				let pem = std::fs::read(path).context("failed to read client identity")?;
144				let chain: Vec<CertificateDer<'static>> = rustls_pemfile::certs(&mut pem.as_slice())
145					.collect::<Result<_, _>>()
146					.context("failed to parse client identity certs")?;
147				anyhow::ensure!(!chain.is_empty(), "no certificates found in client identity");
148				let key = rustls_pemfile::private_key(&mut pem.as_slice())
149					.context("failed to parse client identity key")?
150					.context("no private key found in client identity")?;
151				builder
152					.with_client_auth_cert(chain, key)
153					.context("failed to configure client certificate")?
154			}
155			None => builder.with_no_client_auth(),
156		};
157
158		if self.disable_verify.unwrap_or_default() {
159			tracing::warn!("TLS server certificate verification is disabled; A man-in-the-middle attack is possible.");
160			let noop = NoCertificateVerification(provider);
161			tls.dangerous().set_certificate_verifier(Arc::new(noop));
162		}
163
164		Ok(tls)
165	}
166
167	/// Parse the configured identity PEM (if any) and return the first DNS
168	/// SAN on its leaf certificate.
169	///
170	/// Useful for sanity-checking that a caller's own cluster node name
171	/// matches the identity they will present. Returns `Ok(None)` if no
172	/// identity is configured.
173	pub fn identity_dns_name(&self) -> anyhow::Result<Option<String>> {
174		use rustls::pki_types::CertificateDer;
175
176		let Some(path) = self.identity.as_ref() else {
177			return Ok(None);
178		};
179		let pem = std::fs::read(path).context("failed to read client identity")?;
180		let certs: Vec<CertificateDer<'static>> = rustls_pemfile::certs(&mut pem.as_slice())
181			.collect::<Result<_, _>>()
182			.context("failed to parse client identity certs")?;
183		let leaf = certs.first().context("no certificates found")?;
184		let (_, cert) =
185			x509_parser::parse_x509_certificate(leaf.as_ref()).context("failed to parse identity certificate")?;
186		let san = cert
187			.subject_alternative_name()
188			.context("failed to read subject alternative name extension")?
189			.and_then(|san| {
190				san.value.general_names.iter().find_map(|name| match name {
191					x509_parser::extensions::GeneralName::DNSName(n) => Some((*n).to_string()),
192					_ => None,
193				})
194			});
195		Ok(san)
196	}
197}
198
199impl ClientConfig {
200	pub fn init(self) -> anyhow::Result<Client> {
201		Client::new(self)
202	}
203
204	/// Returns the configured versions, defaulting to all if none specified.
205	pub fn versions(&self) -> moq_lite::Versions {
206		if self.version.is_empty() {
207			moq_lite::Versions::all()
208		} else {
209			moq_lite::Versions::from(self.version.clone())
210		}
211	}
212}
213
214impl Default for ClientConfig {
215	fn default() -> Self {
216		Self {
217			bind: "[::]:0".parse().unwrap(),
218			backend: None,
219			max_streams: None,
220			version: Vec::new(),
221			tls: ClientTls::default(),
222			backoff: Backoff::default(),
223			#[cfg(feature = "websocket")]
224			websocket: super::ClientWebSocket::default(),
225		}
226	}
227}
228
229/// Client for establishing MoQ connections over QUIC, WebTransport, or WebSocket.
230///
231/// Create via [`ClientConfig::init`] or [`Client::new`].
232#[derive(Clone)]
233pub struct Client {
234	moq: moq_lite::Client,
235	versions: moq_lite::Versions,
236	backoff: Backoff,
237	#[cfg(feature = "websocket")]
238	websocket: super::ClientWebSocket,
239	tls: rustls::ClientConfig,
240	#[cfg(feature = "noq")]
241	noq: Option<crate::noq::NoqClient>,
242	#[cfg(feature = "quinn")]
243	quinn: Option<crate::quinn::QuinnClient>,
244	#[cfg(feature = "quiche")]
245	quiche: Option<crate::quiche::QuicheClient>,
246	#[cfg(feature = "iroh")]
247	iroh: Option<web_transport_iroh::iroh::Endpoint>,
248	#[cfg(feature = "iroh")]
249	iroh_addrs: Vec<std::net::SocketAddr>,
250}
251
252impl Client {
253	#[cfg(not(any(feature = "noq", feature = "quinn", feature = "quiche", feature = "websocket")))]
254	pub fn new(_config: ClientConfig) -> anyhow::Result<Self> {
255		anyhow::bail!("no QUIC or WebSocket backend compiled; enable noq, quinn, quiche, or websocket feature");
256	}
257
258	/// Create a new client
259	#[cfg(any(feature = "noq", feature = "quinn", feature = "quiche", feature = "websocket"))]
260	pub fn new(config: ClientConfig) -> anyhow::Result<Self> {
261		#[cfg(any(feature = "noq", feature = "quinn", feature = "quiche"))]
262		let backend = config.backend.clone().unwrap_or({
263			#[cfg(feature = "quinn")]
264			{
265				QuicBackend::Quinn
266			}
267			#[cfg(all(feature = "noq", not(feature = "quinn")))]
268			{
269				QuicBackend::Noq
270			}
271			#[cfg(all(feature = "quiche", not(feature = "quinn"), not(feature = "noq")))]
272			{
273				QuicBackend::Quiche
274			}
275			#[cfg(all(not(feature = "quiche"), not(feature = "quinn"), not(feature = "noq")))]
276			panic!("no QUIC backend compiled; enable noq, quinn, or quiche feature");
277		});
278
279		let tls = config.tls.build()?;
280
281		#[cfg(feature = "noq")]
282		#[allow(unreachable_patterns)]
283		let noq = match backend {
284			QuicBackend::Noq => Some(crate::noq::NoqClient::new(&config)?),
285			_ => None,
286		};
287
288		#[cfg(feature = "quinn")]
289		#[allow(unreachable_patterns)]
290		let quinn = match backend {
291			QuicBackend::Quinn => Some(crate::quinn::QuinnClient::new(&config)?),
292			_ => None,
293		};
294
295		#[cfg(feature = "quiche")]
296		let quiche = match backend {
297			QuicBackend::Quiche => Some(crate::quiche::QuicheClient::new(&config)?),
298			_ => None,
299		};
300
301		let versions = config.versions();
302		Ok(Self {
303			moq: moq_lite::Client::new().with_versions(versions.clone()),
304			versions,
305			backoff: config.backoff,
306			#[cfg(feature = "websocket")]
307			websocket: config.websocket,
308			tls,
309			#[cfg(feature = "noq")]
310			noq,
311			#[cfg(feature = "quinn")]
312			quinn,
313			#[cfg(feature = "quiche")]
314			quiche,
315			#[cfg(feature = "iroh")]
316			iroh: None,
317			#[cfg(feature = "iroh")]
318			iroh_addrs: Vec::new(),
319		})
320	}
321
322	#[cfg(feature = "iroh")]
323	pub fn with_iroh(mut self, iroh: Option<web_transport_iroh::iroh::Endpoint>) -> Self {
324		self.iroh = iroh;
325		self
326	}
327
328	/// Set direct IP addresses for connecting to iroh peers.
329	///
330	/// This is useful when the peer's IP addresses are known ahead of time,
331	/// bypassing the need for peer discovery (e.g. in tests or local networks).
332	#[cfg(feature = "iroh")]
333	pub fn with_iroh_addrs(mut self, addrs: Vec<std::net::SocketAddr>) -> Self {
334		self.iroh_addrs = addrs;
335		self
336	}
337
338	pub fn with_publish(mut self, publish: impl Into<Option<moq_lite::OriginConsumer>>) -> Self {
339		self.moq = self.moq.with_publish(publish);
340		self
341	}
342
343	pub fn with_consume(mut self, consume: impl Into<Option<moq_lite::OriginProducer>>) -> Self {
344		self.moq = self.moq.with_consume(consume);
345		self
346	}
347
348	/// Start a background reconnect loop that connects to the given URL,
349	/// waits for the session to close, then reconnects with exponential backoff.
350	///
351	/// Returns a [`Reconnect`] handle. Drop or call [`Reconnect::close`] to stop.
352	pub fn reconnect(&self, url: Url) -> Reconnect {
353		Reconnect::new(self.clone(), url, self.backoff.clone())
354	}
355
356	#[cfg(not(any(
357		feature = "noq",
358		feature = "quinn",
359		feature = "quiche",
360		feature = "iroh",
361		feature = "websocket"
362	)))]
363	pub async fn connect(&self, _url: Url) -> anyhow::Result<moq_lite::Session> {
364		anyhow::bail!("no backend compiled; enable noq, quinn, quiche, iroh, or websocket feature");
365	}
366
367	#[cfg(any(
368		feature = "noq",
369		feature = "quinn",
370		feature = "quiche",
371		feature = "iroh",
372		feature = "websocket"
373	))]
374	pub async fn connect(&self, url: Url) -> anyhow::Result<moq_lite::Session> {
375		let session = self.connect_inner(url).await?;
376		tracing::info!(version = %session.version(), "connected");
377		Ok(session)
378	}
379
380	#[cfg(any(
381		feature = "noq",
382		feature = "quinn",
383		feature = "quiche",
384		feature = "iroh",
385		feature = "websocket"
386	))]
387	async fn connect_inner(&self, url: Url) -> anyhow::Result<moq_lite::Session> {
388		#[cfg(feature = "iroh")]
389		if url.scheme() == "iroh" {
390			let endpoint = self.iroh.as_ref().context("Iroh support is not enabled")?;
391			let session = crate::iroh::connect(endpoint, url, self.iroh_addrs.iter().copied()).await?;
392			let session = self.moq.connect(session).await?;
393			return Ok(session);
394		}
395
396		#[cfg(feature = "noq")]
397		if let Some(noq) = self.noq.as_ref() {
398			let tls = self.tls.clone();
399			let quic_url = url.clone();
400			let quic_handle = async {
401				let res = noq.connect(&tls, quic_url).await;
402				if let Err(err) = &res {
403					tracing::warn!(%err, "QUIC connection failed");
404				}
405				res
406			};
407
408			#[cfg(feature = "websocket")]
409			{
410				let alpns = self.versions.alpns();
411				let ws_handle = crate::websocket::race_handle(&self.websocket, &self.tls, url, &alpns);
412
413				return Ok(tokio::select! {
414					Ok(quic) = quic_handle => self.moq.connect(quic).await?,
415					Some(Ok(ws)) = ws_handle => self.moq.connect(ws).await?,
416					else => anyhow::bail!("failed to connect to server"),
417				});
418			}
419
420			#[cfg(not(feature = "websocket"))]
421			{
422				let session = quic_handle.await?;
423				return Ok(self.moq.connect(session).await?);
424			}
425		}
426
427		#[cfg(feature = "quinn")]
428		if let Some(quinn) = self.quinn.as_ref() {
429			let tls = self.tls.clone();
430			let quic_url = url.clone();
431			let quic_handle = async {
432				let res = quinn.connect(&tls, quic_url).await;
433				if let Err(err) = &res {
434					tracing::warn!(%err, "QUIC connection failed");
435				}
436				res
437			};
438
439			#[cfg(feature = "websocket")]
440			{
441				let alpns = self.versions.alpns();
442				let ws_handle = crate::websocket::race_handle(&self.websocket, &self.tls, url, &alpns);
443
444				return Ok(tokio::select! {
445					Ok(quic) = quic_handle => self.moq.connect(quic).await?,
446					Some(Ok(ws)) = ws_handle => self.moq.connect(ws).await?,
447					else => anyhow::bail!("failed to connect to server"),
448				});
449			}
450
451			#[cfg(not(feature = "websocket"))]
452			{
453				let session = quic_handle.await?;
454				return Ok(self.moq.connect(session).await?);
455			}
456		}
457
458		#[cfg(feature = "quiche")]
459		if let Some(quiche) = self.quiche.as_ref() {
460			let quic_url = url.clone();
461			let quic_handle = async {
462				let res = quiche.connect(quic_url).await;
463				if let Err(err) = &res {
464					tracing::warn!(%err, "QUIC connection failed");
465				}
466				res
467			};
468
469			#[cfg(feature = "websocket")]
470			{
471				let alpns = self.versions.alpns();
472				let ws_handle = crate::websocket::race_handle(&self.websocket, &self.tls, url, &alpns);
473
474				return Ok(tokio::select! {
475					Ok(quic) = quic_handle => self.moq.connect(quic).await?,
476					Some(Ok(ws)) = ws_handle => self.moq.connect(ws).await?,
477					else => anyhow::bail!("failed to connect to server"),
478				});
479			}
480
481			#[cfg(not(feature = "websocket"))]
482			{
483				let session = quic_handle.await?;
484				return Ok(self.moq.connect(session).await?);
485			}
486		}
487
488		#[cfg(feature = "websocket")]
489		{
490			let alpns = self.versions.alpns();
491			let session = crate::websocket::connect(&self.websocket, &self.tls, url, &alpns).await?;
492			return Ok(self.moq.connect(session).await?);
493		}
494
495		#[cfg(not(feature = "websocket"))]
496		anyhow::bail!("no QUIC backend matched; this should not happen");
497	}
498}
499
500use rustls::pki_types::{CertificateDer, ServerName, UnixTime};
501
502#[derive(Debug)]
503struct NoCertificateVerification(crypto::Provider);
504
505impl rustls::client::danger::ServerCertVerifier for NoCertificateVerification {
506	fn verify_server_cert(
507		&self,
508		_end_entity: &CertificateDer<'_>,
509		_intermediates: &[CertificateDer<'_>],
510		_server_name: &ServerName<'_>,
511		_ocsp: &[u8],
512		_now: UnixTime,
513	) -> Result<rustls::client::danger::ServerCertVerified, rustls::Error> {
514		Ok(rustls::client::danger::ServerCertVerified::assertion())
515	}
516
517	fn verify_tls12_signature(
518		&self,
519		message: &[u8],
520		cert: &CertificateDer<'_>,
521		dss: &rustls::DigitallySignedStruct,
522	) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
523		rustls::crypto::verify_tls12_signature(message, cert, dss, &self.0.signature_verification_algorithms)
524	}
525
526	fn verify_tls13_signature(
527		&self,
528		message: &[u8],
529		cert: &CertificateDer<'_>,
530		dss: &rustls::DigitallySignedStruct,
531	) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
532		rustls::crypto::verify_tls13_signature(message, cert, dss, &self.0.signature_verification_algorithms)
533	}
534
535	fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
536		self.0.signature_verification_algorithms.supported_schemes()
537	}
538}
539
#[cfg(test)]
mod tests {
	use super::*;
	use clap::Parser;

	/// Parse a version name, panicking if it is not a known version.
	fn version(name: &str) -> moq_lite::Version {
		name.parse().unwrap()
	}

	/// Parse a command line made of the program name `test` followed by `args`.
	fn parse(args: &[&str]) -> ClientConfig {
		ClientConfig::parse_from(std::iter::once("test").chain(args.iter().copied()))
	}

	#[test]
	fn test_toml_disable_verify_survives_update_from() {
		let source = r#"
			tls.disable_verify = true
		"#;

		let mut loaded: ClientConfig = toml::from_str(source).unwrap();
		assert_eq!(loaded.tls.disable_verify, Some(true));

		// Re-applying CLI args without --tls-disable-verify must keep the TOML value.
		loaded.update_from(["test"]);
		assert_eq!(loaded.tls.disable_verify, Some(true));
	}

	#[test]
	fn test_cli_disable_verify_flag() {
		let parsed = parse(&["--tls-disable-verify"]);
		assert_eq!(parsed.tls.disable_verify, Some(true));
	}

	#[test]
	fn test_cli_disable_verify_explicit_false() {
		let parsed = parse(&["--tls-disable-verify=false"]);
		assert_eq!(parsed.tls.disable_verify, Some(false));
	}

	#[test]
	fn test_cli_disable_verify_explicit_true() {
		let parsed = parse(&["--tls-disable-verify=true"]);
		assert_eq!(parsed.tls.disable_verify, Some(true));
	}

	#[test]
	fn test_cli_no_disable_verify() {
		let parsed = parse(&[]);
		assert_eq!(parsed.tls.disable_verify, None);
	}

	#[test]
	fn test_toml_version_survives_update_from() {
		let source = r#"
			version = ["moq-lite-02"]
		"#;

		let mut loaded: ClientConfig = toml::from_str(source).unwrap();
		assert_eq!(loaded.version, vec![version("moq-lite-02")]);

		// Re-applying CLI args without --client-version must keep the TOML value.
		loaded.update_from(["test"]);
		assert_eq!(loaded.version, vec![version("moq-lite-02")]);
	}

	#[test]
	fn test_cli_version() {
		let parsed = parse(&["--client-version", "moq-lite-03"]);
		assert_eq!(parsed.version, vec![version("moq-lite-03")]);
	}

	#[test]
	fn test_cli_no_version_defaults_to_all() {
		let parsed = parse(&[]);
		assert!(parsed.version.is_empty());
		// With nothing specified, versions() falls back to the full ALPN set.
		assert_eq!(parsed.versions().alpns().len(), moq_lite::ALPNS.len());
	}
}