Skip to main content

moq_native/
client.rs

1use crate::{Backoff, Error, QuicBackend, Reconnect};
2#[cfg(feature = "websocket")]
3use std::future::Future;
4use std::net;
5use url::Url;
6
7/// Configuration for the MoQ client.
8#[derive(Clone, Debug, clap::Parser, serde::Serialize, serde::Deserialize)]
9#[serde(deny_unknown_fields, default)]
10#[non_exhaustive]
11pub struct ClientConfig {
12	/// Listen for UDP packets on the given address.
13	#[arg(
14		id = "client-bind",
15		long = "client-bind",
16		default_value = "[::]:0",
17		env = "MOQ_CLIENT_BIND"
18	)]
19	pub bind: net::SocketAddr,
20
21	/// The QUIC backend to use.
22	/// Auto-detected from compiled features if not specified.
23	#[arg(id = "client-backend", long = "client-backend", env = "MOQ_CLIENT_BACKEND")]
24	pub backend: Option<QuicBackend>,
25
26	/// Maximum number of concurrent QUIC streams per connection (both bidi and uni).
27	#[serde(skip_serializing_if = "Option::is_none")]
28	#[arg(
29		id = "client-max-streams",
30		long = "client-max-streams",
31		env = "MOQ_CLIENT_MAX_STREAMS"
32	)]
33	pub max_streams: Option<u64>,
34
35	/// Restrict the client to specific MoQ protocol version(s).
36	///
37	/// By default, the client offers all supported versions and lets the server choose.
38	/// Use this to force a specific version, e.g. `--client-version moq-lite-02`.
39	/// Can be specified multiple times to offer a subset of versions.
40	///
41	/// Valid values: moq-lite-01, moq-lite-02, moq-lite-03, moq-transport-14, moq-transport-15, moq-transport-16, moq-transport-17
42	#[serde(default, skip_serializing_if = "Vec::is_empty")]
43	#[arg(id = "client-version", long = "client-version", env = "MOQ_CLIENT_VERSION")]
44	pub version: Vec<moq_net::Version>,
45
46	#[command(flatten)]
47	#[serde(default)]
48	pub tls: crate::tls::Client,
49
50	#[command(flatten)]
51	#[serde(default)]
52	pub backoff: Backoff,
53
54	#[cfg(feature = "websocket")]
55	#[command(flatten)]
56	#[serde(default)]
57	pub websocket: crate::websocket::Client,
58}
59
60impl ClientConfig {
61	pub fn init(self) -> crate::Result<Client> {
62		Client::new(self)
63	}
64
65	/// Returns the configured versions, defaulting to all if none specified.
66	pub fn versions(&self) -> moq_net::Versions {
67		if self.version.is_empty() {
68			moq_net::Versions::all()
69		} else {
70			moq_net::Versions::from(self.version.clone())
71		}
72	}
73}
74
75impl Default for ClientConfig {
76	fn default() -> Self {
77		Self {
78			bind: "[::]:0".parse().unwrap(),
79			backend: None,
80			max_streams: None,
81			version: Vec::new(),
82			tls: crate::tls::Client::default(),
83			backoff: Backoff::default(),
84			#[cfg(feature = "websocket")]
85			websocket: crate::websocket::Client::default(),
86		}
87	}
88}
89
90/// Client for establishing MoQ connections over QUIC, WebTransport, or WebSocket.
91///
92/// Create via [`ClientConfig::init`] or [`Client::new`].
93#[derive(Clone)]
94pub struct Client {
95	moq: moq_net::Client,
96	versions: moq_net::Versions,
97	backoff: Backoff,
98	#[cfg(feature = "websocket")]
99	websocket: crate::websocket::Client,
100	tls: rustls::ClientConfig,
101	#[cfg(feature = "noq")]
102	noq: Option<crate::noq::NoqClient>,
103	#[cfg(feature = "quinn")]
104	quinn: Option<crate::quinn::QuinnClient>,
105	#[cfg(feature = "quiche")]
106	quiche: Option<crate::quiche::QuicheClient>,
107	#[cfg(feature = "iroh")]
108	iroh: Option<web_transport_iroh::iroh::Endpoint>,
109	#[cfg(feature = "iroh")]
110	iroh_addrs: Vec<std::net::SocketAddr>,
111}
112
113impl Client {
114	#[cfg(not(any(feature = "noq", feature = "quinn", feature = "quiche", feature = "websocket")))]
115	pub fn new(_config: ClientConfig) -> crate::Result<Self> {
116		Err(Error::NoBackend(
117			"no QUIC or WebSocket backend compiled; enable noq, quinn, quiche, or websocket feature",
118		))
119	}
120
121	/// Create a new client
122	#[cfg(any(feature = "noq", feature = "quinn", feature = "quiche", feature = "websocket"))]
123	pub fn new(config: ClientConfig) -> crate::Result<Self> {
124		#[cfg(any(feature = "noq", feature = "quinn", feature = "quiche"))]
125		let backend = config.backend.clone().unwrap_or({
126			#[cfg(feature = "quinn")]
127			{
128				QuicBackend::Quinn
129			}
130			#[cfg(all(feature = "noq", not(feature = "quinn")))]
131			{
132				QuicBackend::Noq
133			}
134			#[cfg(all(feature = "quiche", not(feature = "quinn"), not(feature = "noq")))]
135			{
136				QuicBackend::Quiche
137			}
138			#[cfg(all(not(feature = "quiche"), not(feature = "quinn"), not(feature = "noq")))]
139			panic!("no QUIC backend compiled; enable noq, quinn, or quiche feature");
140		});
141
142		let tls = config.tls.build()?;
143
144		#[cfg(feature = "noq")]
145		#[allow(unreachable_patterns)]
146		let noq = match backend {
147			QuicBackend::Noq => Some(crate::noq::NoqClient::new(&config)?),
148			_ => None,
149		};
150
151		#[cfg(feature = "quinn")]
152		#[allow(unreachable_patterns)]
153		let quinn = match backend {
154			QuicBackend::Quinn => Some(crate::quinn::QuinnClient::new(&config)?),
155			_ => None,
156		};
157
158		#[cfg(feature = "quiche")]
159		let quiche = match backend {
160			QuicBackend::Quiche => Some(crate::quiche::QuicheClient::new(&config)?),
161			_ => None,
162		};
163
164		let versions = config.versions();
165		Ok(Self {
166			moq: moq_net::Client::new().with_versions(versions.clone()),
167			versions,
168			backoff: config.backoff,
169			#[cfg(feature = "websocket")]
170			websocket: config.websocket,
171			tls,
172			#[cfg(feature = "noq")]
173			noq,
174			#[cfg(feature = "quinn")]
175			quinn,
176			#[cfg(feature = "quiche")]
177			quiche,
178			#[cfg(feature = "iroh")]
179			iroh: None,
180			#[cfg(feature = "iroh")]
181			iroh_addrs: Vec::new(),
182		})
183	}
184
185	#[cfg(feature = "iroh")]
186	pub fn with_iroh(mut self, iroh: Option<web_transport_iroh::iroh::Endpoint>) -> Self {
187		self.iroh = iroh;
188		self
189	}
190
191	/// Set direct IP addresses for connecting to iroh peers.
192	///
193	/// This is useful when the peer's IP addresses are known ahead of time,
194	/// bypassing the need for peer discovery (e.g. in tests or local networks).
195	#[cfg(feature = "iroh")]
196	pub fn with_iroh_addrs(mut self, addrs: Vec<std::net::SocketAddr>) -> Self {
197		self.iroh_addrs = addrs;
198		self
199	}
200
201	pub fn with_publish(mut self, publish: impl Into<Option<moq_net::OriginConsumer>>) -> Self {
202		self.moq = self.moq.with_publish(publish);
203		self
204	}
205
206	pub fn with_consume(mut self, consume: impl Into<Option<moq_net::OriginProducer>>) -> Self {
207		self.moq = self.moq.with_consume(consume);
208		self
209	}
210
211	/// Attach a tier-scoped [`moq_net::StatsHandle`] to all sessions opened by this client.
212	pub fn with_stats(mut self, stats: moq_net::StatsHandle) -> Self {
213		self.moq = self.moq.with_stats(stats);
214		self
215	}
216
217	/// Start a background reconnect loop that connects to the given URL,
218	/// waits for the session to close, then reconnects with exponential backoff.
219	///
220	/// Returns a [`Reconnect`] handle; drop the last handle to stop the loop.
221	pub fn reconnect(&self, url: Url) -> Reconnect {
222		Reconnect::new(self.clone(), url, self.backoff.clone())
223	}
224
225	#[cfg(not(any(
226		feature = "noq",
227		feature = "quinn",
228		feature = "quiche",
229		feature = "iroh",
230		feature = "websocket"
231	)))]
232	pub async fn connect(&self, _url: Url) -> crate::Result<moq_net::Session> {
233		Err(Error::NoBackend(
234			"no backend compiled; enable noq, quinn, quiche, iroh, or websocket feature",
235		))
236	}
237
238	#[cfg(any(
239		feature = "noq",
240		feature = "quinn",
241		feature = "quiche",
242		feature = "iroh",
243		feature = "websocket"
244	))]
245	pub async fn connect(&self, url: Url) -> crate::Result<moq_net::Session> {
246		let session = self.connect_inner(url).await?;
247		tracing::info!(version = %session.version(), "connected");
248		Ok(session)
249	}
250
251	#[cfg(any(
252		feature = "noq",
253		feature = "quinn",
254		feature = "quiche",
255		feature = "iroh",
256		feature = "websocket"
257	))]
258	async fn connect_inner(&self, url: Url) -> crate::Result<moq_net::Session> {
259		#[cfg(feature = "iroh")]
260		if url.scheme() == "iroh" {
261			let endpoint = self.iroh.as_ref().ok_or(Error::IrohDisabled)?;
262			let session = crate::iroh::connect(endpoint, url, self.iroh_addrs.iter().copied()).await?;
263			let session = self.moq.connect(session).await?;
264			return Ok(session);
265		}
266
267		#[cfg(feature = "noq")]
268		if let Some(noq) = self.noq.as_ref() {
269			let tls = self.tls.clone();
270			let quic_url = url.clone();
271			let quic_handle = async { noq.connect(&tls, quic_url).await.map_err(Error::from) };
272
273			#[cfg(feature = "websocket")]
274			{
275				return self.race_moq_connect(url, quic_handle).await;
276			}
277
278			#[cfg(not(feature = "websocket"))]
279			{
280				let session = quic_handle.await?;
281				return Ok(self.moq.connect(session).await?);
282			}
283		}
284
285		#[cfg(feature = "quinn")]
286		if let Some(quinn) = self.quinn.as_ref() {
287			let tls = self.tls.clone();
288			let quic_url = url.clone();
289			let quic_handle = async { quinn.connect(&tls, quic_url).await.map_err(Error::from) };
290
291			#[cfg(feature = "websocket")]
292			{
293				return self.race_moq_connect(url, quic_handle).await;
294			}
295
296			#[cfg(not(feature = "websocket"))]
297			{
298				let session = quic_handle.await?;
299				return Ok(self.moq.connect(session).await?);
300			}
301		}
302
303		#[cfg(feature = "quiche")]
304		if let Some(quiche) = self.quiche.as_ref() {
305			let quic_url = url.clone();
306			let quic_handle = async { quiche.connect(quic_url).await.map_err(Error::from) };
307
308			#[cfg(feature = "websocket")]
309			{
310				return self.race_moq_connect(url, quic_handle).await;
311			}
312
313			#[cfg(not(feature = "websocket"))]
314			{
315				let session = quic_handle.await?;
316				return Ok(self.moq.connect(session).await?);
317			}
318		}
319
320		#[cfg(feature = "websocket")]
321		{
322			let alpns = self.versions.alpns();
323			let session = crate::websocket::connect(&self.websocket, &self.tls, url, &alpns).await?;
324			return Ok(self.moq.connect(session).await?);
325		}
326
327		#[cfg(not(feature = "websocket"))]
328		return Err(Error::NoBackend("no QUIC backend matched; this should not happen"));
329	}
330
331	#[cfg(feature = "websocket")]
332	async fn race_moq_connect<Q, S>(&self, url: Url, quic: Q) -> crate::Result<moq_net::Session>
333	where
334		Q: Future<Output = crate::Result<S>>,
335		S: web_transport_trait::Session,
336	{
337		let alpns = self.versions.alpns();
338		let ws_config = self.websocket.clone();
339		let ws_tls = self.tls.clone();
340		let websocket = async move {
341			crate::websocket::race_handle(&ws_config, &ws_tls, url, &alpns)
342				.await
343				.map(|res| res.map_err(Error::from))
344		};
345
346		match race_transport_connect(quic, websocket).await? {
347			TransportRace::Quic(quic) => Ok(self.moq.connect(quic).await?),
348			TransportRace::WebSocket(websocket) => Ok(self.moq.connect(websocket).await?),
349		}
350	}
351}
352
353#[cfg(feature = "websocket")]
354#[derive(Debug, PartialEq, Eq)]
355enum TransportRace<Q, W> {
356	Quic(Q),
357	WebSocket(W),
358}
359
360#[cfg(feature = "websocket")]
361async fn race_transport_connect<Q, W, QT, WT>(quic: Q, websocket: W) -> crate::Result<TransportRace<QT, WT>>
362where
363	Q: Future<Output = crate::Result<QT>>,
364	W: Future<Output = Option<crate::Result<WT>>>,
365{
366	tokio::pin!(quic);
367	tokio::pin!(websocket);
368
369	let mut quic_err = None;
370	let mut websocket_err = None;
371	let mut quic_done = false;
372	let mut websocket_done = false;
373
374	loop {
375		tokio::select! {
376			res = &mut quic, if !quic_done => {
377				match res {
378					Ok(session) => return Ok(TransportRace::Quic(session)),
379					Err(err) if err.is_auth() => return Err(err),
380					Err(err) => {
381						tracing::warn!(%err, "QUIC connection failed");
382						quic_err = Some(err);
383						quic_done = true;
384					}
385				}
386			}
387			res = &mut websocket, if !websocket_done => {
388				match res {
389					Some(Ok(session)) => return Ok(TransportRace::WebSocket(session)),
390					Some(Err(err)) if err.is_auth() => return Err(err),
391					Some(Err(err)) => {
392						tracing::warn!(%err, "WebSocket connection failed");
393						websocket_err = Some(err);
394						websocket_done = true;
395					}
396					None => {
397						websocket_done = true;
398					}
399				}
400			}
401			else => break,
402		}
403
404		if quic_done && websocket_done {
405			break;
406		}
407	}
408
409	match (quic_err, websocket_err) {
410		(Some(quic), Some(websocket)) => Err(Error::TransportRace {
411			quic: std::sync::Arc::new(quic),
412			websocket: std::sync::Arc::new(websocket),
413		}),
414		(Some(err), None) | (None, Some(err)) => Err(err),
415		(None, None) => Err(Error::ConnectFailed),
416	}
417}
418
419#[cfg(test)]
420mod tests {
421	use super::*;
422	use clap::Parser;
423
424	#[test]
425	fn test_toml_disable_verify_survives_update_from() {
426		let toml = r#"
427			tls.disable_verify = true
428		"#;
429
430		let mut config: ClientConfig = toml::from_str(toml).unwrap();
431		assert_eq!(config.tls.disable_verify, Some(true));
432
433		// Simulate: TOML loaded, then CLI args re-applied (no --tls-disable-verify flag).
434		config.update_from(["test"]);
435		assert_eq!(config.tls.disable_verify, Some(true));
436	}
437
438	#[test]
439	fn test_cli_disable_verify_flag() {
440		let config = ClientConfig::parse_from(["test", "--tls-disable-verify"]);
441		assert_eq!(config.tls.disable_verify, Some(true));
442	}
443
444	#[test]
445	fn test_cli_disable_verify_explicit_false() {
446		let config = ClientConfig::parse_from(["test", "--tls-disable-verify=false"]);
447		assert_eq!(config.tls.disable_verify, Some(false));
448	}
449
450	#[test]
451	fn test_cli_disable_verify_explicit_true() {
452		let config = ClientConfig::parse_from(["test", "--tls-disable-verify=true"]);
453		assert_eq!(config.tls.disable_verify, Some(true));
454	}
455
456	#[test]
457	fn test_cli_no_disable_verify() {
458		let config = ClientConfig::parse_from(["test"]);
459		assert_eq!(config.tls.disable_verify, None);
460	}
461
462	#[test]
463	fn test_toml_fingerprint_survives_update_from() {
464		let toml = r#"
465			tls.fingerprint = ["abcd1234", "ef567890"]
466		"#;
467
468		let mut config: ClientConfig = toml::from_str(toml).unwrap();
469		assert_eq!(config.tls.fingerprint, vec!["abcd1234", "ef567890"]);
470
471		// Simulate: TOML loaded, then CLI args re-applied (no --tls-fingerprint flag).
472		config.update_from(["test"]);
473		assert_eq!(config.tls.fingerprint, vec!["abcd1234", "ef567890"]);
474	}
475
476	#[test]
477	fn test_toml_fingerprint_accepts_single_string() {
478		let toml = r#"
479			tls.fingerprint = "abcd1234"
480		"#;
481
482		let config: ClientConfig = toml::from_str(toml).unwrap();
483		assert_eq!(config.tls.fingerprint, vec!["abcd1234"]);
484	}
485
486	#[test]
487	fn test_cli_fingerprint() {
488		let config = ClientConfig::parse_from(["test", "--tls-fingerprint", "abcd1234"]);
489		assert_eq!(config.tls.fingerprint, vec!["abcd1234"]);
490	}
491
492	#[test]
493	fn test_toml_version_survives_update_from() {
494		let toml = r#"
495			version = ["moq-lite-02"]
496		"#;
497
498		let mut config: ClientConfig = toml::from_str(toml).unwrap();
499		assert_eq!(config.version, vec!["moq-lite-02".parse::<moq_net::Version>().unwrap()]);
500
501		// Simulate: TOML loaded, then CLI args re-applied (no --client-version flag).
502		config.update_from(["test"]);
503		assert_eq!(config.version, vec!["moq-lite-02".parse::<moq_net::Version>().unwrap()]);
504	}
505
506	#[test]
507	fn test_cli_version() {
508		let config = ClientConfig::parse_from(["test", "--client-version", "moq-lite-03"]);
509		assert_eq!(config.version, vec!["moq-lite-03".parse::<moq_net::Version>().unwrap()]);
510	}
511
512	#[test]
513	fn test_cli_no_version_defaults_to_all() {
514		let config = ClientConfig::parse_from(["test"]);
515		assert!(config.version.is_empty());
516		// versions() helper returns all when none specified
517		assert_eq!(config.versions().alpns().len(), moq_net::ALPNS.len());
518	}
519
520	#[cfg(feature = "websocket")]
521	#[tokio::test]
522	async fn race_transport_connect_stops_on_quic_auth_error() {
523		let quic = async { Err::<usize, _>(crate::ConnectError::Unauthorized.into()) };
524		let websocket = async {
525			// This only needs to complete later than the immediately ready QUIC auth error.
526			tokio::task::yield_now().await;
527			Some(Ok(1usize))
528		};
529
530		let err = super::race_transport_connect(quic, websocket).await.unwrap_err();
531		assert_eq!(err.connect_error(), Some(crate::ConnectError::Unauthorized));
532	}
533
534	#[cfg(feature = "websocket")]
535	#[tokio::test]
536	async fn race_transport_connect_keeps_websocket_after_quic_non_auth_error() {
537		let quic = async { Err::<usize, _>(Error::ConnectFailed) };
538		let websocket = async { Some(Ok(7usize)) };
539
540		let value = super::race_transport_connect(quic, websocket).await.unwrap();
541		assert_eq!(value, super::TransportRace::WebSocket(7));
542	}
543
544	#[cfg(feature = "websocket")]
545	#[tokio::test]
546	async fn race_transport_connect_returns_when_quic_transport_connects() {
547		let quic = async { Ok("quic") };
548		let websocket = std::future::pending::<Option<crate::Result<&str>>>();
549
550		let value = tokio::time::timeout(
551			std::time::Duration::from_secs(1),
552			super::race_transport_connect(quic, websocket),
553		)
554		.await
555		.expect("race waited for WebSocket after QUIC transport connected")
556		.unwrap();
557		assert_eq!(value, super::TransportRace::Quic("quic"));
558	}
559}