Skip to main content

moq_native/
client.rs

1use crate::crypto;
2use crate::{Backoff, QuicBackend, Reconnect};
3use anyhow::Context;
4use std::path::PathBuf;
5use std::{net, sync::Arc};
6use url::Url;
7
8/// TLS configuration for the client.
9#[serde_with::serde_as]
10#[derive(Clone, Default, Debug, clap::Args, serde::Serialize, serde::Deserialize)]
11#[serde(default, deny_unknown_fields)]
12#[non_exhaustive]
13pub struct ClientTls {
14	/// Use the TLS root at this path, encoded as PEM.
15	///
16	/// This value can be provided multiple times for multiple roots.
17	/// If this is empty, system roots will be used instead.
18	/// In config files, accepts either a single string or a TOML array.
19	#[serde(skip_serializing_if = "Vec::is_empty")]
20	#[arg(id = "tls-root", long = "tls-root", env = "MOQ_CLIENT_TLS_ROOT")]
21	#[serde_as(as = "serde_with::OneOrMany<_>")]
22	pub root: Vec<PathBuf>,
23
24	/// PEM file containing the client certificate chain for mTLS.
25	///
26	/// Only certificates are extracted; any private keys in the file are ignored.
27	/// Must be paired with `--client-tls-key`.
28	#[serde(skip_serializing_if = "Option::is_none")]
29	#[arg(id = "client-tls-cert", long = "client-tls-cert", env = "MOQ_CLIENT_TLS_CERT")]
30	pub cert: Option<PathBuf>,
31
32	/// PEM file containing the private key for mTLS.
33	///
34	/// Only the private key is extracted; any certificates in the file are ignored.
35	/// Must be paired with `--client-tls-cert`.
36	#[serde(skip_serializing_if = "Option::is_none")]
37	#[arg(id = "client-tls-key", long = "client-tls-key", env = "MOQ_CLIENT_TLS_KEY")]
38	pub key: Option<PathBuf>,
39
40	/// Danger: Disable TLS certificate verification.
41	///
42	/// Fine for local development and between relays, but should be used in caution in production.
43	#[serde(skip_serializing_if = "Option::is_none")]
44	#[arg(
45		id = "tls-disable-verify",
46		long = "tls-disable-verify",
47		env = "MOQ_CLIENT_TLS_DISABLE_VERIFY",
48		default_missing_value = "true",
49		num_args = 0..=1,
50		require_equals = true,
51		value_parser = clap::value_parser!(bool),
52	)]
53	pub disable_verify: Option<bool>,
54}
55
56/// Configuration for the MoQ client.
57#[derive(Clone, Debug, clap::Parser, serde::Serialize, serde::Deserialize)]
58#[serde(deny_unknown_fields, default)]
59#[non_exhaustive]
60pub struct ClientConfig {
61	/// Listen for UDP packets on the given address.
62	#[arg(
63		id = "client-bind",
64		long = "client-bind",
65		default_value = "[::]:0",
66		env = "MOQ_CLIENT_BIND"
67	)]
68	pub bind: net::SocketAddr,
69
70	/// The QUIC backend to use.
71	/// Auto-detected from compiled features if not specified.
72	#[arg(id = "client-backend", long = "client-backend", env = "MOQ_CLIENT_BACKEND")]
73	pub backend: Option<QuicBackend>,
74
75	/// Maximum number of concurrent QUIC streams per connection (both bidi and uni).
76	#[serde(skip_serializing_if = "Option::is_none")]
77	#[arg(
78		id = "client-max-streams",
79		long = "client-max-streams",
80		env = "MOQ_CLIENT_MAX_STREAMS"
81	)]
82	pub max_streams: Option<u64>,
83
84	/// Restrict the client to specific MoQ protocol version(s).
85	///
86	/// By default, the client offers all supported versions and lets the server choose.
87	/// Use this to force a specific version, e.g. `--client-version moq-lite-02`.
88	/// Can be specified multiple times to offer a subset of versions.
89	///
90	/// Valid values: moq-lite-01, moq-lite-02, moq-lite-03, moq-transport-14, moq-transport-15, moq-transport-16, moq-transport-17
91	#[serde(default, skip_serializing_if = "Vec::is_empty")]
92	#[arg(id = "client-version", long = "client-version", env = "MOQ_CLIENT_VERSION")]
93	pub version: Vec<moq_net::Version>,
94
95	#[command(flatten)]
96	#[serde(default)]
97	pub tls: ClientTls,
98
99	#[command(flatten)]
100	#[serde(default)]
101	pub backoff: Backoff,
102
103	#[cfg(feature = "websocket")]
104	#[command(flatten)]
105	#[serde(default)]
106	pub websocket: super::ClientWebSocket,
107}
108
109impl ClientTls {
110	/// Build a [`rustls::ClientConfig`] from this configuration.
111	///
112	/// Loads the configured roots (or the platform's native roots if none),
113	/// optionally attaches a client identity for mTLS, and disables server
114	/// certificate verification when `disable_verify` is set.
115	pub fn build(&self) -> anyhow::Result<rustls::ClientConfig> {
116		use rustls::pki_types::CertificateDer;
117		use rustls::pki_types::PrivateKeyDer;
118		use rustls::pki_types::pem::PemObject;
119
120		let provider = crypto::provider();
121
122		let mut roots = rustls::RootCertStore::empty();
123		if self.root.is_empty() {
124			let native = rustls_native_certs::load_native_certs();
125			for err in native.errors {
126				tracing::warn!(%err, "failed to load root cert");
127			}
128			for cert in native.certs {
129				roots.add(cert).context("failed to add root cert")?;
130			}
131		} else {
132			for root in &self.root {
133				let file = std::fs::File::open(root).context("failed to open root cert file")?;
134				let mut reader = std::io::BufReader::new(file);
135				let certs: Vec<CertificateDer<'static>> = CertificateDer::pem_reader_iter(&mut reader)
136					.collect::<Result<_, _>>()
137					.context("failed to read root cert")?;
138				anyhow::ensure!(!certs.is_empty(), "no roots found in {}", root.display());
139				for cert in certs {
140					roots.add(cert).context("failed to add root cert")?;
141				}
142			}
143		}
144
145		// Allow TLS 1.2 in addition to 1.3 for WebSocket compatibility.
146		// QUIC always negotiates TLS 1.3 regardless of this setting.
147		let builder = rustls::ClientConfig::builder_with_provider(provider.clone())
148			.with_protocol_versions(&[&rustls::version::TLS13, &rustls::version::TLS12])?
149			.with_root_certificates(roots);
150
151		let mut tls = match (&self.cert, &self.key) {
152			(Some(cert_path), Some(key_path)) => {
153				let cert_pem = std::fs::read(cert_path).context("failed to read client certificate")?;
154				let chain: Vec<CertificateDer<'static>> = CertificateDer::pem_slice_iter(&cert_pem)
155					.collect::<Result<_, _>>()
156					.context("failed to parse client certificate")?;
157				anyhow::ensure!(!chain.is_empty(), "no certificates found in client certificate");
158				let key_pem = std::fs::read(key_path).context("failed to read client key")?;
159				let key = PrivateKeyDer::from_pem_slice(&key_pem).context("failed to parse client key")?;
160				builder
161					.with_client_auth_cert(chain, key)
162					.context("failed to configure client certificate")?
163			}
164			(None, None) => builder.with_no_client_auth(),
165			_ => anyhow::bail!("both --client-tls-cert and --client-tls-key must be provided"),
166		};
167
168		if self.disable_verify.unwrap_or_default() {
169			tracing::warn!("TLS server certificate verification is disabled; A man-in-the-middle attack is possible.");
170			let noop = NoCertificateVerification(provider);
171			tls.dangerous().set_certificate_verifier(Arc::new(noop));
172		}
173
174		Ok(tls)
175	}
176}
177
178impl ClientConfig {
179	pub fn init(self) -> anyhow::Result<Client> {
180		Client::new(self)
181	}
182
183	/// Returns the configured versions, defaulting to all if none specified.
184	pub fn versions(&self) -> moq_net::Versions {
185		if self.version.is_empty() {
186			moq_net::Versions::all()
187		} else {
188			moq_net::Versions::from(self.version.clone())
189		}
190	}
191}
192
193impl Default for ClientConfig {
194	fn default() -> Self {
195		Self {
196			bind: "[::]:0".parse().unwrap(),
197			backend: None,
198			max_streams: None,
199			version: Vec::new(),
200			tls: ClientTls::default(),
201			backoff: Backoff::default(),
202			#[cfg(feature = "websocket")]
203			websocket: super::ClientWebSocket::default(),
204		}
205	}
206}
207
208/// Client for establishing MoQ connections over QUIC, WebTransport, or WebSocket.
209///
210/// Create via [`ClientConfig::init`] or [`Client::new`].
211#[derive(Clone)]
212pub struct Client {
213	moq: moq_net::Client,
214	versions: moq_net::Versions,
215	backoff: Backoff,
216	#[cfg(feature = "websocket")]
217	websocket: super::ClientWebSocket,
218	tls: rustls::ClientConfig,
219	#[cfg(feature = "noq")]
220	noq: Option<crate::noq::NoqClient>,
221	#[cfg(feature = "quinn")]
222	quinn: Option<crate::quinn::QuinnClient>,
223	#[cfg(feature = "quiche")]
224	quiche: Option<crate::quiche::QuicheClient>,
225	#[cfg(feature = "iroh")]
226	iroh: Option<web_transport_iroh::iroh::Endpoint>,
227	#[cfg(feature = "iroh")]
228	iroh_addrs: Vec<std::net::SocketAddr>,
229}
230
231impl Client {
232	#[cfg(not(any(feature = "noq", feature = "quinn", feature = "quiche", feature = "websocket")))]
233	pub fn new(_config: ClientConfig) -> anyhow::Result<Self> {
234		anyhow::bail!("no QUIC or WebSocket backend compiled; enable noq, quinn, quiche, or websocket feature");
235	}
236
237	/// Create a new client
238	#[cfg(any(feature = "noq", feature = "quinn", feature = "quiche", feature = "websocket"))]
239	pub fn new(config: ClientConfig) -> anyhow::Result<Self> {
240		#[cfg(any(feature = "noq", feature = "quinn", feature = "quiche"))]
241		let backend = config.backend.clone().unwrap_or({
242			#[cfg(feature = "quinn")]
243			{
244				QuicBackend::Quinn
245			}
246			#[cfg(all(feature = "noq", not(feature = "quinn")))]
247			{
248				QuicBackend::Noq
249			}
250			#[cfg(all(feature = "quiche", not(feature = "quinn"), not(feature = "noq")))]
251			{
252				QuicBackend::Quiche
253			}
254			#[cfg(all(not(feature = "quiche"), not(feature = "quinn"), not(feature = "noq")))]
255			panic!("no QUIC backend compiled; enable noq, quinn, or quiche feature");
256		});
257
258		let tls = config.tls.build()?;
259
260		#[cfg(feature = "noq")]
261		#[allow(unreachable_patterns)]
262		let noq = match backend {
263			QuicBackend::Noq => Some(crate::noq::NoqClient::new(&config)?),
264			_ => None,
265		};
266
267		#[cfg(feature = "quinn")]
268		#[allow(unreachable_patterns)]
269		let quinn = match backend {
270			QuicBackend::Quinn => Some(crate::quinn::QuinnClient::new(&config)?),
271			_ => None,
272		};
273
274		#[cfg(feature = "quiche")]
275		let quiche = match backend {
276			QuicBackend::Quiche => Some(crate::quiche::QuicheClient::new(&config)?),
277			_ => None,
278		};
279
280		let versions = config.versions();
281		Ok(Self {
282			moq: moq_net::Client::new().with_versions(versions.clone()),
283			versions,
284			backoff: config.backoff,
285			#[cfg(feature = "websocket")]
286			websocket: config.websocket,
287			tls,
288			#[cfg(feature = "noq")]
289			noq,
290			#[cfg(feature = "quinn")]
291			quinn,
292			#[cfg(feature = "quiche")]
293			quiche,
294			#[cfg(feature = "iroh")]
295			iroh: None,
296			#[cfg(feature = "iroh")]
297			iroh_addrs: Vec::new(),
298		})
299	}
300
301	#[cfg(feature = "iroh")]
302	pub fn with_iroh(mut self, iroh: Option<web_transport_iroh::iroh::Endpoint>) -> Self {
303		self.iroh = iroh;
304		self
305	}
306
307	/// Set direct IP addresses for connecting to iroh peers.
308	///
309	/// This is useful when the peer's IP addresses are known ahead of time,
310	/// bypassing the need for peer discovery (e.g. in tests or local networks).
311	#[cfg(feature = "iroh")]
312	pub fn with_iroh_addrs(mut self, addrs: Vec<std::net::SocketAddr>) -> Self {
313		self.iroh_addrs = addrs;
314		self
315	}
316
317	pub fn with_publish(mut self, publish: impl Into<Option<moq_net::OriginConsumer>>) -> Self {
318		self.moq = self.moq.with_publish(publish);
319		self
320	}
321
322	pub fn with_consume(mut self, consume: impl Into<Option<moq_net::OriginProducer>>) -> Self {
323		self.moq = self.moq.with_consume(consume);
324		self
325	}
326
327	/// Attach a tier-scoped [`moq_net::StatsHandle`] to all sessions opened by this client.
328	pub fn with_stats(mut self, stats: moq_net::StatsHandle) -> Self {
329		self.moq = self.moq.with_stats(stats);
330		self
331	}
332
333	/// Start a background reconnect loop that connects to the given URL,
334	/// waits for the session to close, then reconnects with exponential backoff.
335	///
336	/// Returns a [`Reconnect`] handle; drop the last handle to stop the loop.
337	pub fn reconnect(&self, url: Url) -> Reconnect {
338		Reconnect::new(self.clone(), url, self.backoff.clone())
339	}
340
341	#[cfg(not(any(
342		feature = "noq",
343		feature = "quinn",
344		feature = "quiche",
345		feature = "iroh",
346		feature = "websocket"
347	)))]
348	pub async fn connect(&self, _url: Url) -> anyhow::Result<moq_net::Session> {
349		anyhow::bail!("no backend compiled; enable noq, quinn, quiche, iroh, or websocket feature");
350	}
351
352	#[cfg(any(
353		feature = "noq",
354		feature = "quinn",
355		feature = "quiche",
356		feature = "iroh",
357		feature = "websocket"
358	))]
359	pub async fn connect(&self, url: Url) -> anyhow::Result<moq_net::Session> {
360		let session = self.connect_inner(url).await?;
361		tracing::info!(version = %session.version(), "connected");
362		Ok(session)
363	}
364
365	#[cfg(any(
366		feature = "noq",
367		feature = "quinn",
368		feature = "quiche",
369		feature = "iroh",
370		feature = "websocket"
371	))]
372	async fn connect_inner(&self, url: Url) -> anyhow::Result<moq_net::Session> {
373		#[cfg(feature = "iroh")]
374		if url.scheme() == "iroh" {
375			let endpoint = self.iroh.as_ref().context("Iroh support is not enabled")?;
376			let session = crate::iroh::connect(endpoint, url, self.iroh_addrs.iter().copied()).await?;
377			let session = self.moq.connect(session).await?;
378			return Ok(session);
379		}
380
381		#[cfg(feature = "noq")]
382		if let Some(noq) = self.noq.as_ref() {
383			let tls = self.tls.clone();
384			let quic_url = url.clone();
385			let quic_handle = async {
386				let res = noq.connect(&tls, quic_url).await;
387				if let Err(err) = &res {
388					tracing::warn!(%err, "QUIC connection failed");
389				}
390				res
391			};
392
393			#[cfg(feature = "websocket")]
394			{
395				let alpns = self.versions.alpns();
396				let ws_handle = crate::websocket::race_handle(&self.websocket, &self.tls, url, &alpns);
397
398				return Ok(tokio::select! {
399					Ok(quic) = quic_handle => self.moq.connect(quic).await?,
400					Some(Ok(ws)) = ws_handle => self.moq.connect(ws).await?,
401					else => anyhow::bail!("failed to connect to server"),
402				});
403			}
404
405			#[cfg(not(feature = "websocket"))]
406			{
407				let session = quic_handle.await?;
408				return Ok(self.moq.connect(session).await?);
409			}
410		}
411
412		#[cfg(feature = "quinn")]
413		if let Some(quinn) = self.quinn.as_ref() {
414			let tls = self.tls.clone();
415			let quic_url = url.clone();
416			let quic_handle = async {
417				let res = quinn.connect(&tls, quic_url).await;
418				if let Err(err) = &res {
419					tracing::warn!(%err, "QUIC connection failed");
420				}
421				res
422			};
423
424			#[cfg(feature = "websocket")]
425			{
426				let alpns = self.versions.alpns();
427				let ws_handle = crate::websocket::race_handle(&self.websocket, &self.tls, url, &alpns);
428
429				return Ok(tokio::select! {
430					Ok(quic) = quic_handle => self.moq.connect(quic).await?,
431					Some(Ok(ws)) = ws_handle => self.moq.connect(ws).await?,
432					else => anyhow::bail!("failed to connect to server"),
433				});
434			}
435
436			#[cfg(not(feature = "websocket"))]
437			{
438				let session = quic_handle.await?;
439				return Ok(self.moq.connect(session).await?);
440			}
441		}
442
443		#[cfg(feature = "quiche")]
444		if let Some(quiche) = self.quiche.as_ref() {
445			let quic_url = url.clone();
446			let quic_handle = async {
447				let res = quiche.connect(quic_url).await;
448				if let Err(err) = &res {
449					tracing::warn!(%err, "QUIC connection failed");
450				}
451				res
452			};
453
454			#[cfg(feature = "websocket")]
455			{
456				let alpns = self.versions.alpns();
457				let ws_handle = crate::websocket::race_handle(&self.websocket, &self.tls, url, &alpns);
458
459				return Ok(tokio::select! {
460					Ok(quic) = quic_handle => self.moq.connect(quic).await?,
461					Some(Ok(ws)) = ws_handle => self.moq.connect(ws).await?,
462					else => anyhow::bail!("failed to connect to server"),
463				});
464			}
465
466			#[cfg(not(feature = "websocket"))]
467			{
468				let session = quic_handle.await?;
469				return Ok(self.moq.connect(session).await?);
470			}
471		}
472
473		#[cfg(feature = "websocket")]
474		{
475			let alpns = self.versions.alpns();
476			let session = crate::websocket::connect(&self.websocket, &self.tls, url, &alpns).await?;
477			return Ok(self.moq.connect(session).await?);
478		}
479
480		#[cfg(not(feature = "websocket"))]
481		anyhow::bail!("no QUIC backend matched; this should not happen");
482	}
483}
484
485use rustls::pki_types::{CertificateDer, ServerName, UnixTime};
486
487#[derive(Debug)]
488struct NoCertificateVerification(crypto::Provider);
489
490impl rustls::client::danger::ServerCertVerifier for NoCertificateVerification {
491	fn verify_server_cert(
492		&self,
493		_end_entity: &CertificateDer<'_>,
494		_intermediates: &[CertificateDer<'_>],
495		_server_name: &ServerName<'_>,
496		_ocsp: &[u8],
497		_now: UnixTime,
498	) -> Result<rustls::client::danger::ServerCertVerified, rustls::Error> {
499		Ok(rustls::client::danger::ServerCertVerified::assertion())
500	}
501
502	fn verify_tls12_signature(
503		&self,
504		message: &[u8],
505		cert: &CertificateDer<'_>,
506		dss: &rustls::DigitallySignedStruct,
507	) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
508		rustls::crypto::verify_tls12_signature(message, cert, dss, &self.0.signature_verification_algorithms)
509	}
510
511	fn verify_tls13_signature(
512		&self,
513		message: &[u8],
514		cert: &CertificateDer<'_>,
515		dss: &rustls::DigitallySignedStruct,
516	) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
517		rustls::crypto::verify_tls13_signature(message, cert, dss, &self.0.signature_verification_algorithms)
518	}
519
520	fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
521		self.0.signature_verification_algorithms.supported_schemes()
522	}
523}
524
#[cfg(test)]
mod tests {
	use super::*;
	use clap::Parser;

	/// Parse a version name, panicking on an invalid fixture.
	fn version(name: &str) -> moq_net::Version {
		name.parse().unwrap()
	}

	/// Parse a command line consisting of the binary name "test" followed by `args`.
	fn cli(args: &[&str]) -> ClientConfig {
		ClientConfig::parse_from(std::iter::once("test").chain(args.iter().copied()))
	}

	#[test]
	fn test_toml_disable_verify_survives_update_from() {
		let source = r#"
			tls.disable_verify = true
		"#;

		let mut loaded: ClientConfig = toml::from_str(source).unwrap();
		assert_eq!(loaded.tls.disable_verify, Some(true));

		// Re-applying CLI args without --tls-disable-verify must keep the TOML value.
		loaded.update_from(["test"]);
		assert_eq!(loaded.tls.disable_verify, Some(true));
	}

	#[test]
	fn test_cli_disable_verify_flag() {
		let parsed = cli(&["--tls-disable-verify"]);
		assert_eq!(parsed.tls.disable_verify, Some(true));
	}

	#[test]
	fn test_cli_disable_verify_explicit_false() {
		let parsed = cli(&["--tls-disable-verify=false"]);
		assert_eq!(parsed.tls.disable_verify, Some(false));
	}

	#[test]
	fn test_cli_disable_verify_explicit_true() {
		let parsed = cli(&["--tls-disable-verify=true"]);
		assert_eq!(parsed.tls.disable_verify, Some(true));
	}

	#[test]
	fn test_cli_no_disable_verify() {
		let parsed = cli(&[]);
		assert_eq!(parsed.tls.disable_verify, None);
	}

	#[test]
	fn test_toml_version_survives_update_from() {
		let source = r#"
			version = ["moq-lite-02"]
		"#;
		let expected = vec![version("moq-lite-02")];

		let mut loaded: ClientConfig = toml::from_str(source).unwrap();
		assert_eq!(loaded.version, expected);

		// Re-applying CLI args without --client-version must keep the TOML value.
		loaded.update_from(["test"]);
		assert_eq!(loaded.version, expected);
	}

	#[test]
	fn test_cli_version() {
		let parsed = cli(&["--client-version", "moq-lite-03"]);
		assert_eq!(parsed.version, vec![version("moq-lite-03")]);
	}

	#[test]
	fn test_cli_no_version_defaults_to_all() {
		let parsed = cli(&[]);
		assert!(parsed.version.is_empty());
		// With no restriction, versions() offers one ALPN per supported version.
		assert_eq!(parsed.versions().alpns().len(), moq_net::ALPNS.len());
	}
}