Skip to main content

moq_native/
client.rs

1use crate::crypto;
2use crate::{Backoff, QuicBackend, Reconnect};
3use anyhow::Context;
4use std::path::PathBuf;
5use std::{net, sync::Arc};
6use url::Url;
7
8/// TLS configuration for the client.
9#[serde_with::serde_as]
10#[derive(Clone, Default, Debug, clap::Args, serde::Serialize, serde::Deserialize)]
11#[serde(default, deny_unknown_fields)]
12#[non_exhaustive]
13pub struct ClientTls {
14	/// Use the TLS root at this path, encoded as PEM.
15	///
16	/// This value can be provided multiple times for multiple roots.
17	/// If this is empty, system roots will be used instead.
18	/// In config files, accepts either a single string or a TOML array.
19	#[serde(skip_serializing_if = "Vec::is_empty")]
20	#[arg(id = "tls-root", long = "tls-root", env = "MOQ_CLIENT_TLS_ROOT")]
21	#[serde_as(as = "serde_with::OneOrMany<_>")]
22	pub root: Vec<PathBuf>,
23
24	/// PEM file containing the client certificate chain for mTLS.
25	///
26	/// Only certificates are extracted; any private keys in the file are ignored.
27	/// Must be paired with `--client-tls-key`.
28	#[serde(skip_serializing_if = "Option::is_none")]
29	#[arg(id = "client-tls-cert", long = "client-tls-cert", env = "MOQ_CLIENT_TLS_CERT")]
30	pub cert: Option<PathBuf>,
31
32	/// PEM file containing the private key for mTLS.
33	///
34	/// Only the private key is extracted; any certificates in the file are ignored.
35	/// Must be paired with `--client-tls-cert`.
36	#[serde(skip_serializing_if = "Option::is_none")]
37	#[arg(id = "client-tls-key", long = "client-tls-key", env = "MOQ_CLIENT_TLS_KEY")]
38	pub key: Option<PathBuf>,
39
40	/// Danger: Disable TLS certificate verification.
41	///
42	/// Fine for local development and between relays, but should be used in caution in production.
43	#[serde(skip_serializing_if = "Option::is_none")]
44	#[arg(
45		id = "tls-disable-verify",
46		long = "tls-disable-verify",
47		env = "MOQ_CLIENT_TLS_DISABLE_VERIFY",
48		default_missing_value = "true",
49		num_args = 0..=1,
50		require_equals = true,
51		value_parser = clap::value_parser!(bool),
52	)]
53	pub disable_verify: Option<bool>,
54}
55
56/// Configuration for the MoQ client.
57#[derive(Clone, Debug, clap::Parser, serde::Serialize, serde::Deserialize)]
58#[serde(deny_unknown_fields, default)]
59#[non_exhaustive]
60pub struct ClientConfig {
61	/// Listen for UDP packets on the given address.
62	#[arg(
63		id = "client-bind",
64		long = "client-bind",
65		default_value = "[::]:0",
66		env = "MOQ_CLIENT_BIND"
67	)]
68	pub bind: net::SocketAddr,
69
70	/// The QUIC backend to use.
71	/// Auto-detected from compiled features if not specified.
72	#[arg(id = "client-backend", long = "client-backend", env = "MOQ_CLIENT_BACKEND")]
73	pub backend: Option<QuicBackend>,
74
75	/// Maximum number of concurrent QUIC streams per connection (both bidi and uni).
76	#[serde(skip_serializing_if = "Option::is_none")]
77	#[arg(
78		id = "client-max-streams",
79		long = "client-max-streams",
80		env = "MOQ_CLIENT_MAX_STREAMS"
81	)]
82	pub max_streams: Option<u64>,
83
84	/// Restrict the client to specific MoQ protocol version(s).
85	///
86	/// By default, the client offers all supported versions and lets the server choose.
87	/// Use this to force a specific version, e.g. `--client-version moq-lite-02`.
88	/// Can be specified multiple times to offer a subset of versions.
89	///
90	/// Valid values: moq-lite-01, moq-lite-02, moq-lite-03, moq-transport-14, moq-transport-15, moq-transport-16, moq-transport-17
91	#[serde(default, skip_serializing_if = "Vec::is_empty")]
92	#[arg(id = "client-version", long = "client-version", env = "MOQ_CLIENT_VERSION")]
93	pub version: Vec<moq_lite::Version>,
94
95	#[command(flatten)]
96	#[serde(default)]
97	pub tls: ClientTls,
98
99	#[command(flatten)]
100	#[serde(default)]
101	pub backoff: Backoff,
102
103	#[cfg(feature = "websocket")]
104	#[command(flatten)]
105	#[serde(default)]
106	pub websocket: super::ClientWebSocket,
107}
108
109impl ClientTls {
110	/// Build a [`rustls::ClientConfig`] from this configuration.
111	///
112	/// Loads the configured roots (or the platform's native roots if none),
113	/// optionally attaches a client identity for mTLS, and disables server
114	/// certificate verification when `disable_verify` is set.
115	pub fn build(&self) -> anyhow::Result<rustls::ClientConfig> {
116		use rustls::pki_types::CertificateDer;
117
118		let provider = crypto::provider();
119
120		let mut roots = rustls::RootCertStore::empty();
121		if self.root.is_empty() {
122			let native = rustls_native_certs::load_native_certs();
123			for err in native.errors {
124				tracing::warn!(%err, "failed to load root cert");
125			}
126			for cert in native.certs {
127				roots.add(cert).context("failed to add root cert")?;
128			}
129		} else {
130			for root in &self.root {
131				let file = std::fs::File::open(root).context("failed to open root cert file")?;
132				let mut reader = std::io::BufReader::new(file);
133				let cert = rustls_pemfile::certs(&mut reader)
134					.next()
135					.context("no roots found")?
136					.context("failed to read root cert")?;
137				roots.add(cert).context("failed to add root cert")?;
138			}
139		}
140
141		// Allow TLS 1.2 in addition to 1.3 for WebSocket compatibility.
142		// QUIC always negotiates TLS 1.3 regardless of this setting.
143		let builder = rustls::ClientConfig::builder_with_provider(provider.clone())
144			.with_protocol_versions(&[&rustls::version::TLS13, &rustls::version::TLS12])?
145			.with_root_certificates(roots);
146
147		let mut tls = match (&self.cert, &self.key) {
148			(Some(cert_path), Some(key_path)) => {
149				let cert_pem = std::fs::read(cert_path).context("failed to read client certificate")?;
150				let chain: Vec<CertificateDer<'static>> = rustls_pemfile::certs(&mut cert_pem.as_slice())
151					.collect::<Result<_, _>>()
152					.context("failed to parse client certificate")?;
153				anyhow::ensure!(!chain.is_empty(), "no certificates found in client certificate");
154				let key_pem = std::fs::read(key_path).context("failed to read client key")?;
155				let key = rustls_pemfile::private_key(&mut key_pem.as_slice())
156					.context("failed to parse client key")?
157					.context("no private key found in client key")?;
158				builder
159					.with_client_auth_cert(chain, key)
160					.context("failed to configure client certificate")?
161			}
162			(None, None) => builder.with_no_client_auth(),
163			_ => anyhow::bail!("both --client-tls-cert and --client-tls-key must be provided"),
164		};
165
166		if self.disable_verify.unwrap_or_default() {
167			tracing::warn!("TLS server certificate verification is disabled; A man-in-the-middle attack is possible.");
168			let noop = NoCertificateVerification(provider);
169			tls.dangerous().set_certificate_verifier(Arc::new(noop));
170		}
171
172		Ok(tls)
173	}
174}
175
176impl ClientConfig {
177	pub fn init(self) -> anyhow::Result<Client> {
178		Client::new(self)
179	}
180
181	/// Returns the configured versions, defaulting to all if none specified.
182	pub fn versions(&self) -> moq_lite::Versions {
183		if self.version.is_empty() {
184			moq_lite::Versions::all()
185		} else {
186			moq_lite::Versions::from(self.version.clone())
187		}
188	}
189}
190
191impl Default for ClientConfig {
192	fn default() -> Self {
193		Self {
194			bind: "[::]:0".parse().unwrap(),
195			backend: None,
196			max_streams: None,
197			version: Vec::new(),
198			tls: ClientTls::default(),
199			backoff: Backoff::default(),
200			#[cfg(feature = "websocket")]
201			websocket: super::ClientWebSocket::default(),
202		}
203	}
204}
205
206/// Client for establishing MoQ connections over QUIC, WebTransport, or WebSocket.
207///
208/// Create via [`ClientConfig::init`] or [`Client::new`].
209#[derive(Clone)]
210pub struct Client {
211	moq: moq_lite::Client,
212	versions: moq_lite::Versions,
213	backoff: Backoff,
214	#[cfg(feature = "websocket")]
215	websocket: super::ClientWebSocket,
216	tls: rustls::ClientConfig,
217	#[cfg(feature = "noq")]
218	noq: Option<crate::noq::NoqClient>,
219	#[cfg(feature = "quinn")]
220	quinn: Option<crate::quinn::QuinnClient>,
221	#[cfg(feature = "quiche")]
222	quiche: Option<crate::quiche::QuicheClient>,
223	#[cfg(feature = "iroh")]
224	iroh: Option<web_transport_iroh::iroh::Endpoint>,
225	#[cfg(feature = "iroh")]
226	iroh_addrs: Vec<std::net::SocketAddr>,
227}
228
229impl Client {
230	#[cfg(not(any(feature = "noq", feature = "quinn", feature = "quiche", feature = "websocket")))]
231	pub fn new(_config: ClientConfig) -> anyhow::Result<Self> {
232		anyhow::bail!("no QUIC or WebSocket backend compiled; enable noq, quinn, quiche, or websocket feature");
233	}
234
235	/// Create a new client
236	#[cfg(any(feature = "noq", feature = "quinn", feature = "quiche", feature = "websocket"))]
237	pub fn new(config: ClientConfig) -> anyhow::Result<Self> {
238		#[cfg(any(feature = "noq", feature = "quinn", feature = "quiche"))]
239		let backend = config.backend.clone().unwrap_or({
240			#[cfg(feature = "quinn")]
241			{
242				QuicBackend::Quinn
243			}
244			#[cfg(all(feature = "noq", not(feature = "quinn")))]
245			{
246				QuicBackend::Noq
247			}
248			#[cfg(all(feature = "quiche", not(feature = "quinn"), not(feature = "noq")))]
249			{
250				QuicBackend::Quiche
251			}
252			#[cfg(all(not(feature = "quiche"), not(feature = "quinn"), not(feature = "noq")))]
253			panic!("no QUIC backend compiled; enable noq, quinn, or quiche feature");
254		});
255
256		let tls = config.tls.build()?;
257
258		#[cfg(feature = "noq")]
259		#[allow(unreachable_patterns)]
260		let noq = match backend {
261			QuicBackend::Noq => Some(crate::noq::NoqClient::new(&config)?),
262			_ => None,
263		};
264
265		#[cfg(feature = "quinn")]
266		#[allow(unreachable_patterns)]
267		let quinn = match backend {
268			QuicBackend::Quinn => Some(crate::quinn::QuinnClient::new(&config)?),
269			_ => None,
270		};
271
272		#[cfg(feature = "quiche")]
273		let quiche = match backend {
274			QuicBackend::Quiche => Some(crate::quiche::QuicheClient::new(&config)?),
275			_ => None,
276		};
277
278		let versions = config.versions();
279		Ok(Self {
280			moq: moq_lite::Client::new().with_versions(versions.clone()),
281			versions,
282			backoff: config.backoff,
283			#[cfg(feature = "websocket")]
284			websocket: config.websocket,
285			tls,
286			#[cfg(feature = "noq")]
287			noq,
288			#[cfg(feature = "quinn")]
289			quinn,
290			#[cfg(feature = "quiche")]
291			quiche,
292			#[cfg(feature = "iroh")]
293			iroh: None,
294			#[cfg(feature = "iroh")]
295			iroh_addrs: Vec::new(),
296		})
297	}
298
299	#[cfg(feature = "iroh")]
300	pub fn with_iroh(mut self, iroh: Option<web_transport_iroh::iroh::Endpoint>) -> Self {
301		self.iroh = iroh;
302		self
303	}
304
305	/// Set direct IP addresses for connecting to iroh peers.
306	///
307	/// This is useful when the peer's IP addresses are known ahead of time,
308	/// bypassing the need for peer discovery (e.g. in tests or local networks).
309	#[cfg(feature = "iroh")]
310	pub fn with_iroh_addrs(mut self, addrs: Vec<std::net::SocketAddr>) -> Self {
311		self.iroh_addrs = addrs;
312		self
313	}
314
315	pub fn with_publish(mut self, publish: impl Into<Option<moq_lite::OriginConsumer>>) -> Self {
316		self.moq = self.moq.with_publish(publish);
317		self
318	}
319
320	pub fn with_consume(mut self, consume: impl Into<Option<moq_lite::OriginProducer>>) -> Self {
321		self.moq = self.moq.with_consume(consume);
322		self
323	}
324
325	/// Start a background reconnect loop that connects to the given URL,
326	/// waits for the session to close, then reconnects with exponential backoff.
327	///
328	/// Returns a [`Reconnect`] handle. Drop or call [`Reconnect::close`] to stop.
329	pub fn reconnect(&self, url: Url) -> Reconnect {
330		Reconnect::new(self.clone(), url, self.backoff.clone())
331	}
332
333	#[cfg(not(any(
334		feature = "noq",
335		feature = "quinn",
336		feature = "quiche",
337		feature = "iroh",
338		feature = "websocket"
339	)))]
340	pub async fn connect(&self, _url: Url) -> anyhow::Result<moq_lite::Session> {
341		anyhow::bail!("no backend compiled; enable noq, quinn, quiche, iroh, or websocket feature");
342	}
343
344	#[cfg(any(
345		feature = "noq",
346		feature = "quinn",
347		feature = "quiche",
348		feature = "iroh",
349		feature = "websocket"
350	))]
351	pub async fn connect(&self, url: Url) -> anyhow::Result<moq_lite::Session> {
352		let session = self.connect_inner(url).await?;
353		tracing::info!(version = %session.version(), "connected");
354		Ok(session)
355	}
356
357	#[cfg(any(
358		feature = "noq",
359		feature = "quinn",
360		feature = "quiche",
361		feature = "iroh",
362		feature = "websocket"
363	))]
364	async fn connect_inner(&self, url: Url) -> anyhow::Result<moq_lite::Session> {
365		#[cfg(feature = "iroh")]
366		if url.scheme() == "iroh" {
367			let endpoint = self.iroh.as_ref().context("Iroh support is not enabled")?;
368			let session = crate::iroh::connect(endpoint, url, self.iroh_addrs.iter().copied()).await?;
369			let session = self.moq.connect(session).await?;
370			return Ok(session);
371		}
372
373		#[cfg(feature = "noq")]
374		if let Some(noq) = self.noq.as_ref() {
375			let tls = self.tls.clone();
376			let quic_url = url.clone();
377			let quic_handle = async {
378				let res = noq.connect(&tls, quic_url).await;
379				if let Err(err) = &res {
380					tracing::warn!(%err, "QUIC connection failed");
381				}
382				res
383			};
384
385			#[cfg(feature = "websocket")]
386			{
387				let alpns = self.versions.alpns();
388				let ws_handle = crate::websocket::race_handle(&self.websocket, &self.tls, url, &alpns);
389
390				return Ok(tokio::select! {
391					Ok(quic) = quic_handle => self.moq.connect(quic).await?,
392					Some(Ok(ws)) = ws_handle => self.moq.connect(ws).await?,
393					else => anyhow::bail!("failed to connect to server"),
394				});
395			}
396
397			#[cfg(not(feature = "websocket"))]
398			{
399				let session = quic_handle.await?;
400				return Ok(self.moq.connect(session).await?);
401			}
402		}
403
404		#[cfg(feature = "quinn")]
405		if let Some(quinn) = self.quinn.as_ref() {
406			let tls = self.tls.clone();
407			let quic_url = url.clone();
408			let quic_handle = async {
409				let res = quinn.connect(&tls, quic_url).await;
410				if let Err(err) = &res {
411					tracing::warn!(%err, "QUIC connection failed");
412				}
413				res
414			};
415
416			#[cfg(feature = "websocket")]
417			{
418				let alpns = self.versions.alpns();
419				let ws_handle = crate::websocket::race_handle(&self.websocket, &self.tls, url, &alpns);
420
421				return Ok(tokio::select! {
422					Ok(quic) = quic_handle => self.moq.connect(quic).await?,
423					Some(Ok(ws)) = ws_handle => self.moq.connect(ws).await?,
424					else => anyhow::bail!("failed to connect to server"),
425				});
426			}
427
428			#[cfg(not(feature = "websocket"))]
429			{
430				let session = quic_handle.await?;
431				return Ok(self.moq.connect(session).await?);
432			}
433		}
434
435		#[cfg(feature = "quiche")]
436		if let Some(quiche) = self.quiche.as_ref() {
437			let quic_url = url.clone();
438			let quic_handle = async {
439				let res = quiche.connect(quic_url).await;
440				if let Err(err) = &res {
441					tracing::warn!(%err, "QUIC connection failed");
442				}
443				res
444			};
445
446			#[cfg(feature = "websocket")]
447			{
448				let alpns = self.versions.alpns();
449				let ws_handle = crate::websocket::race_handle(&self.websocket, &self.tls, url, &alpns);
450
451				return Ok(tokio::select! {
452					Ok(quic) = quic_handle => self.moq.connect(quic).await?,
453					Some(Ok(ws)) = ws_handle => self.moq.connect(ws).await?,
454					else => anyhow::bail!("failed to connect to server"),
455				});
456			}
457
458			#[cfg(not(feature = "websocket"))]
459			{
460				let session = quic_handle.await?;
461				return Ok(self.moq.connect(session).await?);
462			}
463		}
464
465		#[cfg(feature = "websocket")]
466		{
467			let alpns = self.versions.alpns();
468			let session = crate::websocket::connect(&self.websocket, &self.tls, url, &alpns).await?;
469			return Ok(self.moq.connect(session).await?);
470		}
471
472		#[cfg(not(feature = "websocket"))]
473		anyhow::bail!("no QUIC backend matched; this should not happen");
474	}
475}
476
477use rustls::pki_types::{CertificateDer, ServerName, UnixTime};
478
479#[derive(Debug)]
480struct NoCertificateVerification(crypto::Provider);
481
482impl rustls::client::danger::ServerCertVerifier for NoCertificateVerification {
483	fn verify_server_cert(
484		&self,
485		_end_entity: &CertificateDer<'_>,
486		_intermediates: &[CertificateDer<'_>],
487		_server_name: &ServerName<'_>,
488		_ocsp: &[u8],
489		_now: UnixTime,
490	) -> Result<rustls::client::danger::ServerCertVerified, rustls::Error> {
491		Ok(rustls::client::danger::ServerCertVerified::assertion())
492	}
493
494	fn verify_tls12_signature(
495		&self,
496		message: &[u8],
497		cert: &CertificateDer<'_>,
498		dss: &rustls::DigitallySignedStruct,
499	) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
500		rustls::crypto::verify_tls12_signature(message, cert, dss, &self.0.signature_verification_algorithms)
501	}
502
503	fn verify_tls13_signature(
504		&self,
505		message: &[u8],
506		cert: &CertificateDer<'_>,
507		dss: &rustls::DigitallySignedStruct,
508	) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
509		rustls::crypto::verify_tls13_signature(message, cert, dss, &self.0.signature_verification_algorithms)
510	}
511
512	fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
513		self.0.signature_verification_algorithms.supported_schemes()
514	}
515}
516
517#[cfg(test)]
518mod tests {
519	use super::*;
520	use clap::Parser;
521
522	#[test]
523	fn test_toml_disable_verify_survives_update_from() {
524		let toml = r#"
525			tls.disable_verify = true
526		"#;
527
528		let mut config: ClientConfig = toml::from_str(toml).unwrap();
529		assert_eq!(config.tls.disable_verify, Some(true));
530
531		// Simulate: TOML loaded, then CLI args re-applied (no --tls-disable-verify flag).
532		config.update_from(["test"]);
533		assert_eq!(config.tls.disable_verify, Some(true));
534	}
535
536	#[test]
537	fn test_cli_disable_verify_flag() {
538		let config = ClientConfig::parse_from(["test", "--tls-disable-verify"]);
539		assert_eq!(config.tls.disable_verify, Some(true));
540	}
541
542	#[test]
543	fn test_cli_disable_verify_explicit_false() {
544		let config = ClientConfig::parse_from(["test", "--tls-disable-verify=false"]);
545		assert_eq!(config.tls.disable_verify, Some(false));
546	}
547
548	#[test]
549	fn test_cli_disable_verify_explicit_true() {
550		let config = ClientConfig::parse_from(["test", "--tls-disable-verify=true"]);
551		assert_eq!(config.tls.disable_verify, Some(true));
552	}
553
554	#[test]
555	fn test_cli_no_disable_verify() {
556		let config = ClientConfig::parse_from(["test"]);
557		assert_eq!(config.tls.disable_verify, None);
558	}
559
560	#[test]
561	fn test_toml_version_survives_update_from() {
562		let toml = r#"
563			version = ["moq-lite-02"]
564		"#;
565
566		let mut config: ClientConfig = toml::from_str(toml).unwrap();
567		assert_eq!(
568			config.version,
569			vec!["moq-lite-02".parse::<moq_lite::Version>().unwrap()]
570		);
571
572		// Simulate: TOML loaded, then CLI args re-applied (no --client-version flag).
573		config.update_from(["test"]);
574		assert_eq!(
575			config.version,
576			vec!["moq-lite-02".parse::<moq_lite::Version>().unwrap()]
577		);
578	}
579
580	#[test]
581	fn test_cli_version() {
582		let config = ClientConfig::parse_from(["test", "--client-version", "moq-lite-03"]);
583		assert_eq!(
584			config.version,
585			vec!["moq-lite-03".parse::<moq_lite::Version>().unwrap()]
586		);
587	}
588
589	#[test]
590	fn test_cli_no_version_defaults_to_all() {
591		let config = ClientConfig::parse_from(["test"]);
592		assert!(config.version.is_empty());
593		// versions() helper returns all when none specified
594		assert_eq!(config.versions().alpns().len(), moq_lite::ALPNS.len());
595	}
596}