Skip to main content

moq_native/
client.rs

1use crate::QuicBackend;
2use crate::crypto;
3use anyhow::Context;
4use std::path::PathBuf;
5use std::{net, sync::Arc};
6use url::Url;
7
8/// TLS configuration for the client.
9#[derive(Clone, Default, Debug, clap::Args, serde::Serialize, serde::Deserialize)]
10#[serde(default, deny_unknown_fields)]
11#[non_exhaustive]
12pub struct ClientTls {
13	/// Use the TLS root at this path, encoded as PEM.
14	///
15	/// This value can be provided multiple times for multiple roots.
16	/// If this is empty, system roots will be used instead
17	#[serde(skip_serializing_if = "Vec::is_empty")]
18	#[arg(id = "tls-root", long = "tls-root", env = "MOQ_CLIENT_TLS_ROOT")]
19	pub root: Vec<PathBuf>,
20
21	/// Danger: Disable TLS certificate verification.
22	///
23	/// Fine for local development and between relays, but should be used in caution in production.
24	#[serde(skip_serializing_if = "Option::is_none")]
25	#[arg(
26		id = "tls-disable-verify",
27		long = "tls-disable-verify",
28		env = "MOQ_CLIENT_TLS_DISABLE_VERIFY",
29		default_missing_value = "true",
30		num_args = 0..=1,
31		require_equals = true,
32		value_parser = clap::value_parser!(bool),
33	)]
34	pub disable_verify: Option<bool>,
35}
36
/// Configuration for the MoQ client.
//
// Doubles as a clap parser and a serde config section: unknown keys are
// rejected and missing keys are filled from the `Default` impl.
//
// NOTE: the `///` comments on the fields are also the `--help` text.
#[derive(Clone, Debug, clap::Parser, serde::Serialize, serde::Deserialize)]
#[serde(deny_unknown_fields, default)]
#[non_exhaustive]
pub struct ClientConfig {
	/// Listen for UDP packets on the given address.
	// The CLI default below matches the value used by the `Default` impl.
	#[arg(
		id = "client-bind",
		long = "client-bind",
		default_value = "[::]:0",
		env = "MOQ_CLIENT_BIND"
	)]
	pub bind: net::SocketAddr,

	/// The QUIC backend to use.
	/// Auto-detected from compiled features if not specified.
	// `None` means "let `Client::new` pick a compiled-in backend".
	#[arg(id = "client-backend", long = "client-backend", env = "MOQ_CLIENT_BACKEND")]
	pub backend: Option<QuicBackend>,

	/// Maximum number of concurrent QUIC streams per connection (both bidi and uni).
	// Omitted from serialized output when unset.
	#[serde(skip_serializing_if = "Option::is_none")]
	#[arg(
		id = "client-max-streams",
		long = "client-max-streams",
		env = "MOQ_CLIENT_MAX_STREAMS"
	)]
	pub max_streams: Option<u64>,

	/// Restrict the client to specific MoQ protocol version(s).
	///
	/// By default, the client offers all supported versions and lets the server choose.
	/// Use this to force a specific version, e.g. `--client-version moq-lite-02`.
	/// Can be specified multiple times to offer a subset of versions.
	///
	/// Valid values: moq-lite-01, moq-lite-02, moq-lite-03, moq-transport-14, moq-transport-15, moq-transport-16, moq-transport-17
	// An empty list means "no restriction"; see `ClientConfig::versions`.
	#[serde(default, skip_serializing_if = "Vec::is_empty")]
	#[arg(id = "client-version", long = "client-version", env = "MOQ_CLIENT_VERSION")]
	pub version: Vec<moq_lite::Version>,

	// TLS options are flattened into this command's own argument list.
	#[command(flatten)]
	#[serde(default)]
	pub tls: ClientTls,

	// WebSocket options only exist when the `websocket` feature is compiled in.
	#[cfg(feature = "websocket")]
	#[command(flatten)]
	#[serde(default)]
	pub websocket: super::ClientWebSocket,
}
85
86impl ClientConfig {
87	pub fn init(self) -> anyhow::Result<Client> {
88		Client::new(self)
89	}
90
91	/// Returns the configured versions, defaulting to all if none specified.
92	pub fn versions(&self) -> moq_lite::Versions {
93		if self.version.is_empty() {
94			moq_lite::Versions::all()
95		} else {
96			moq_lite::Versions::from(self.version.clone())
97		}
98	}
99}
100
101impl Default for ClientConfig {
102	fn default() -> Self {
103		Self {
104			bind: "[::]:0".parse().unwrap(),
105			backend: None,
106			max_streams: None,
107			version: Vec::new(),
108			tls: ClientTls::default(),
109			#[cfg(feature = "websocket")]
110			websocket: super::ClientWebSocket::default(),
111		}
112	}
113}
114
/// Client for establishing MoQ connections over QUIC, WebTransport, or WebSocket.
///
/// Create via [`ClientConfig::init`] or [`Client::new`].
#[derive(Clone)]
pub struct Client {
	// Underlying moq-lite client; turns an established transport session into
	// a `moq_lite::Session`.
	moq: moq_lite::Client,
	// Versions taken from `ClientConfig::versions`; their ALPNs are passed to
	// the WebSocket connect helpers.
	versions: moq_lite::Versions,
	// WebSocket settings copied from the configuration.
	#[cfg(feature = "websocket")]
	websocket: super::ClientWebSocket,
	// rustls configuration built in `Client::new` (roots, protocol versions,
	// optional verification bypass).
	tls: rustls::ClientConfig,
	// QUIC backends: `Client::new` sets only the one matching the selected
	// backend to `Some`; the others stay `None`.
	#[cfg(feature = "noq")]
	noq: Option<crate::noq::NoqClient>,
	#[cfg(feature = "quinn")]
	quinn: Option<crate::quinn::QuinnClient>,
	#[cfg(feature = "quiche")]
	quiche: Option<crate::quiche::QuicheClient>,
	// iroh endpoint used for `iroh` URLs; `None` until set with `with_iroh`.
	#[cfg(feature = "iroh")]
	iroh: Option<web_transport_iroh::iroh::Endpoint>,
	// Direct socket addresses handed to the iroh connect helper; empty until
	// set with `with_iroh_addrs`.
	#[cfg(feature = "iroh")]
	iroh_addrs: Vec<std::net::SocketAddr>,
}
136
137impl Client {
138	#[cfg(not(any(feature = "noq", feature = "quinn", feature = "quiche", feature = "websocket")))]
139	pub fn new(_config: ClientConfig) -> anyhow::Result<Self> {
140		anyhow::bail!("no QUIC or WebSocket backend compiled; enable noq, quinn, quiche, or websocket feature");
141	}
142
143	/// Create a new client
144	#[cfg(any(feature = "noq", feature = "quinn", feature = "quiche", feature = "websocket"))]
145	pub fn new(config: ClientConfig) -> anyhow::Result<Self> {
146		#[cfg(any(feature = "noq", feature = "quinn", feature = "quiche"))]
147		let backend = config.backend.clone().unwrap_or({
148			#[cfg(feature = "quinn")]
149			{
150				QuicBackend::Quinn
151			}
152			#[cfg(all(feature = "noq", not(feature = "quinn")))]
153			{
154				QuicBackend::Noq
155			}
156			#[cfg(all(feature = "quiche", not(feature = "quinn"), not(feature = "noq")))]
157			{
158				QuicBackend::Quiche
159			}
160			#[cfg(all(not(feature = "quiche"), not(feature = "quinn"), not(feature = "noq")))]
161			panic!("no QUIC backend compiled; enable noq, quinn, or quiche feature");
162		});
163
164		let provider = crypto::provider();
165
166		// Create a list of acceptable root certificates.
167		let mut roots = rustls::RootCertStore::empty();
168
169		if config.tls.root.is_empty() {
170			let native = rustls_native_certs::load_native_certs();
171
172			// Log any errors that occurred while loading the native root certificates.
173			for err in native.errors {
174				tracing::warn!(%err, "failed to load root cert");
175			}
176
177			// Add the platform's native root certificates.
178			for cert in native.certs {
179				roots.add(cert).context("failed to add root cert")?;
180			}
181		} else {
182			// Add the specified root certificates.
183			for root in &config.tls.root {
184				let root = std::fs::File::open(root).context("failed to open root cert file")?;
185				let mut root = std::io::BufReader::new(root);
186
187				let root = rustls_pemfile::certs(&mut root)
188					.next()
189					.context("no roots found")?
190					.context("failed to read root cert")?;
191
192				roots.add(root).context("failed to add root cert")?;
193			}
194		}
195
196		// Create the TLS configuration we'll use as a client.
197		// Allow TLS 1.2 in addition to 1.3 for WebSocket compatibility.
198		// QUIC always negotiates TLS 1.3 regardless of this setting.
199		let mut tls = rustls::ClientConfig::builder_with_provider(provider.clone())
200			.with_protocol_versions(&[&rustls::version::TLS13, &rustls::version::TLS12])?
201			.with_root_certificates(roots)
202			.with_no_client_auth();
203
204		// Allow disabling TLS verification altogether.
205		if config.tls.disable_verify.unwrap_or_default() {
206			tracing::warn!("TLS server certificate verification is disabled; A man-in-the-middle attack is possible.");
207
208			let noop = NoCertificateVerification(provider.clone());
209			tls.dangerous().set_certificate_verifier(Arc::new(noop));
210		}
211
212		#[cfg(feature = "noq")]
213		#[allow(unreachable_patterns)]
214		let noq = match backend {
215			QuicBackend::Noq => Some(crate::noq::NoqClient::new(&config)?),
216			_ => None,
217		};
218
219		#[cfg(feature = "quinn")]
220		#[allow(unreachable_patterns)]
221		let quinn = match backend {
222			QuicBackend::Quinn => Some(crate::quinn::QuinnClient::new(&config)?),
223			_ => None,
224		};
225
226		#[cfg(feature = "quiche")]
227		let quiche = match backend {
228			QuicBackend::Quiche => Some(crate::quiche::QuicheClient::new(&config)?),
229			_ => None,
230		};
231
232		let versions = config.versions();
233		Ok(Self {
234			moq: moq_lite::Client::new().with_versions(versions.clone()),
235			versions,
236			#[cfg(feature = "websocket")]
237			websocket: config.websocket,
238			tls,
239			#[cfg(feature = "noq")]
240			noq,
241			#[cfg(feature = "quinn")]
242			quinn,
243			#[cfg(feature = "quiche")]
244			quiche,
245			#[cfg(feature = "iroh")]
246			iroh: None,
247			#[cfg(feature = "iroh")]
248			iroh_addrs: Vec::new(),
249		})
250	}
251
252	#[cfg(feature = "iroh")]
253	pub fn with_iroh(mut self, iroh: Option<web_transport_iroh::iroh::Endpoint>) -> Self {
254		self.iroh = iroh;
255		self
256	}
257
258	/// Set direct IP addresses for connecting to iroh peers.
259	///
260	/// This is useful when the peer's IP addresses are known ahead of time,
261	/// bypassing the need for peer discovery (e.g. in tests or local networks).
262	#[cfg(feature = "iroh")]
263	pub fn with_iroh_addrs(mut self, addrs: Vec<std::net::SocketAddr>) -> Self {
264		self.iroh_addrs = addrs;
265		self
266	}
267
268	pub fn with_publish(mut self, publish: impl Into<Option<moq_lite::OriginConsumer>>) -> Self {
269		self.moq = self.moq.with_publish(publish);
270		self
271	}
272
273	pub fn with_consume(mut self, consume: impl Into<Option<moq_lite::OriginProducer>>) -> Self {
274		self.moq = self.moq.with_consume(consume);
275		self
276	}
277
278	#[cfg(not(any(
279		feature = "noq",
280		feature = "quinn",
281		feature = "quiche",
282		feature = "iroh",
283		feature = "websocket"
284	)))]
285	pub async fn connect(&self, _url: Url) -> anyhow::Result<moq_lite::Session> {
286		anyhow::bail!("no backend compiled; enable noq, quinn, quiche, iroh, or websocket feature");
287	}
288
289	#[cfg(any(
290		feature = "noq",
291		feature = "quinn",
292		feature = "quiche",
293		feature = "iroh",
294		feature = "websocket"
295	))]
296	pub async fn connect(&self, url: Url) -> anyhow::Result<moq_lite::Session> {
297		let session = self.connect_inner(url).await?;
298		tracing::info!(version = %session.version(), "connected");
299		Ok(session)
300	}
301
	/// Pick a transport for `url`, establish it, and run the MoQ session setup on top.
	///
	/// Order of precedence:
	/// 1. `iroh` URLs go through the iroh endpoint (error if none was set).
	/// 2. Otherwise the QUIC backend selected in `Client::new` is used. With the
	///    `websocket` feature it is raced against a WebSocket attempt and the
	///    first transport that connects successfully is used.
	/// 3. With no QUIC backend, a plain WebSocket connection is made.
	///
	/// The three QUIC sections are intentionally parallel; only the backend
	/// call differs.
	#[cfg(any(
		feature = "noq",
		feature = "quinn",
		feature = "quiche",
		feature = "iroh",
		feature = "websocket"
	))]
	async fn connect_inner(&self, url: Url) -> anyhow::Result<moq_lite::Session> {
		// iroh is selected by URL scheme, independent of the QUIC backend choice.
		#[cfg(feature = "iroh")]
		if url.scheme() == "iroh" {
			let endpoint = self.iroh.as_ref().context("Iroh support is not enabled")?;
			let session = crate::iroh::connect(endpoint, url, self.iroh_addrs.iter().copied()).await?;
			let session = self.moq.connect(session).await?;
			return Ok(session);
		}

		#[cfg(feature = "noq")]
		if let Some(noq) = self.noq.as_ref() {
			let tls = self.tls.clone();
			let quic_url = url.clone();
			// Lazy future: nothing runs until it is awaited/polled below.
			// A failure is logged here because the `select!` pattern discards it.
			let quic_handle = async {
				let res = noq.connect(&tls, quic_url).await;
				if let Err(err) = &res {
					tracing::warn!(%err, "QUIC connection failed");
				}
				res
			};

			#[cfg(feature = "websocket")]
			{
				let alpns = self.versions.alpns();
				// NOTE(review): yields an `Option<Result<..>>`; presumably `None`
				// when the WebSocket attempt is disabled — confirm in
				// `crate::websocket::race_handle`.
				let ws_handle = crate::websocket::race_handle(&self.websocket, &self.tls, url, &alpns);

				// A branch whose result does not match its pattern is disabled;
				// `else` runs once both attempts have failed.
				return Ok(tokio::select! {
					Ok(quic) = quic_handle => self.moq.connect(quic).await?,
					Some(Ok(ws)) = ws_handle => self.moq.connect(ws).await?,
					else => anyhow::bail!("failed to connect to server"),
				});
			}

			// Without WebSocket there is nothing to race: QUIC errors propagate.
			#[cfg(not(feature = "websocket"))]
			{
				let session = quic_handle.await?;
				return Ok(self.moq.connect(session).await?);
			}
		}

		// Same flow as the noq section, using the quinn backend.
		#[cfg(feature = "quinn")]
		if let Some(quinn) = self.quinn.as_ref() {
			let tls = self.tls.clone();
			let quic_url = url.clone();
			let quic_handle = async {
				let res = quinn.connect(&tls, quic_url).await;
				if let Err(err) = &res {
					tracing::warn!(%err, "QUIC connection failed");
				}
				res
			};

			#[cfg(feature = "websocket")]
			{
				let alpns = self.versions.alpns();
				let ws_handle = crate::websocket::race_handle(&self.websocket, &self.tls, url, &alpns);

				return Ok(tokio::select! {
					Ok(quic) = quic_handle => self.moq.connect(quic).await?,
					Some(Ok(ws)) = ws_handle => self.moq.connect(ws).await?,
					else => anyhow::bail!("failed to connect to server"),
				});
			}

			#[cfg(not(feature = "websocket"))]
			{
				let session = quic_handle.await?;
				return Ok(self.moq.connect(session).await?);
			}
		}

		// Same flow again for quiche. Unlike noq/quinn, its `connect` is not
		// handed `self.tls` here.
		#[cfg(feature = "quiche")]
		if let Some(quiche) = self.quiche.as_ref() {
			let quic_url = url.clone();
			let quic_handle = async {
				let res = quiche.connect(quic_url).await;
				if let Err(err) = &res {
					tracing::warn!(%err, "QUIC connection failed");
				}
				res
			};

			#[cfg(feature = "websocket")]
			{
				let alpns = self.versions.alpns();
				let ws_handle = crate::websocket::race_handle(&self.websocket, &self.tls, url, &alpns);

				return Ok(tokio::select! {
					Ok(quic) = quic_handle => self.moq.connect(quic).await?,
					Some(Ok(ws)) = ws_handle => self.moq.connect(ws).await?,
					else => anyhow::bail!("failed to connect to server"),
				});
			}

			#[cfg(not(feature = "websocket"))]
			{
				let session = quic_handle.await?;
				return Ok(self.moq.connect(session).await?);
			}
		}

		// No QUIC backend is set: connect over WebSocket directly (no race).
		#[cfg(feature = "websocket")]
		{
			let alpns = self.versions.alpns();
			let session = crate::websocket::connect(&self.websocket, &self.tls, url, &alpns).await?;
			return Ok(self.moq.connect(session).await?);
		}

		// Reached only when no QUIC backend is set and WebSocket is not compiled in.
		#[cfg(not(feature = "websocket"))]
		anyhow::bail!("no QUIC backend matched; this should not happen");
	}
420}
421
422use rustls::pki_types::{CertificateDer, ServerName, UnixTime};
423
/// Server certificate verifier that accepts any certificate.
///
/// Installed by `Client::new` when `tls.disable_verify` is set. It holds the
/// crypto provider so that handshake signatures can still be checked against
/// the provider's supported algorithms.
#[derive(Debug)]
struct NoCertificateVerification(crypto::Provider);
426
427impl rustls::client::danger::ServerCertVerifier for NoCertificateVerification {
428	fn verify_server_cert(
429		&self,
430		_end_entity: &CertificateDer<'_>,
431		_intermediates: &[CertificateDer<'_>],
432		_server_name: &ServerName<'_>,
433		_ocsp: &[u8],
434		_now: UnixTime,
435	) -> Result<rustls::client::danger::ServerCertVerified, rustls::Error> {
436		Ok(rustls::client::danger::ServerCertVerified::assertion())
437	}
438
439	fn verify_tls12_signature(
440		&self,
441		message: &[u8],
442		cert: &CertificateDer<'_>,
443		dss: &rustls::DigitallySignedStruct,
444	) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
445		rustls::crypto::verify_tls12_signature(message, cert, dss, &self.0.signature_verification_algorithms)
446	}
447
448	fn verify_tls13_signature(
449		&self,
450		message: &[u8],
451		cert: &CertificateDer<'_>,
452		dss: &rustls::DigitallySignedStruct,
453	) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
454		rustls::crypto::verify_tls13_signature(message, cert, dss, &self.0.signature_verification_algorithms)
455	}
456
457	fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
458		self.0.signature_verification_algorithms.supported_schemes()
459	}
460}
461
#[cfg(test)]
mod tests {
	use super::*;
	use clap::Parser;

	/// Parse a command line; `flags` excludes the program name.
	fn from_cli(flags: &[&str]) -> ClientConfig {
		ClientConfig::parse_from(std::iter::once("test").chain(flags.iter().copied()))
	}

	/// Parse a version name, panicking if it is not a known version.
	fn version(name: &str) -> moq_lite::Version {
		name.parse::<moq_lite::Version>().unwrap()
	}

	// A value loaded from TOML must not be reset when CLI args are re-applied
	// without the corresponding flag.
	#[test]
	fn test_toml_disable_verify_survives_update_from() {
		let toml = r#"
			tls.disable_verify = true
		"#;

		let mut config: ClientConfig = toml::from_str(toml).unwrap();
		assert_eq!(config.tls.disable_verify, Some(true));

		// Simulate: TOML loaded, then CLI args re-applied (no --tls-disable-verify flag).
		config.update_from(["test"]);
		assert_eq!(config.tls.disable_verify, Some(true));
	}

	// A bare flag means `true`.
	#[test]
	fn test_cli_disable_verify_flag() {
		let parsed = from_cli(&["--tls-disable-verify"]);
		assert_eq!(parsed.tls.disable_verify, Some(true));
	}

	#[test]
	fn test_cli_disable_verify_explicit_false() {
		let parsed = from_cli(&["--tls-disable-verify=false"]);
		assert_eq!(parsed.tls.disable_verify, Some(false));
	}

	#[test]
	fn test_cli_disable_verify_explicit_true() {
		let parsed = from_cli(&["--tls-disable-verify=true"]);
		assert_eq!(parsed.tls.disable_verify, Some(true));
	}

	// An absent flag stays `None` rather than collapsing to `Some(false)`.
	#[test]
	fn test_cli_no_disable_verify() {
		let parsed = from_cli(&[]);
		assert_eq!(parsed.tls.disable_verify, None);
	}

	#[test]
	fn test_toml_version_survives_update_from() {
		let toml = r#"
			version = ["moq-lite-02"]
		"#;
		let expected = vec![version("moq-lite-02")];

		let mut config: ClientConfig = toml::from_str(toml).unwrap();
		assert_eq!(config.version, expected);

		// Simulate: TOML loaded, then CLI args re-applied (no --client-version flag).
		config.update_from(["test"]);
		assert_eq!(config.version, expected);
	}

	#[test]
	fn test_cli_version() {
		let parsed = from_cli(&["--client-version", "moq-lite-03"]);
		assert_eq!(parsed.version, vec![version("moq-lite-03")]);
	}

	#[test]
	fn test_cli_no_version_defaults_to_all() {
		let parsed = from_cli(&[]);
		assert!(parsed.version.is_empty());
		// versions() helper returns all when none specified
		assert_eq!(parsed.versions().alpns().len(), moq_lite::ALPNS.len());
	}
}