Skip to main content

moq_native/
client.rs

1use crate::{Backoff, Error, QuicBackend, Reconnect};
2use std::net;
3use url::Url;
4
/// Configuration for the MoQ client.
//
// Maintainer notes (plain `//` comments so they stay out of generated output):
// - `clap::Parser` is derived, so the `///` comments on this struct and on its
//   `#[arg]` fields are also what clap shows as help text.
// - `serde(default)` takes any field missing from the input from the `Default`
//   impl, and `deny_unknown_fields` rejects keys that match no field.
// - Every `#[arg]` declared directly on this struct uses a `client-` prefix in
//   its id and long name, with a matching `MOQ_CLIENT_*` environment variable.
#[derive(Clone, Debug, clap::Parser, serde::Serialize, serde::Deserialize)]
#[serde(deny_unknown_fields, default)]
#[non_exhaustive]
pub struct ClientConfig {
	/// Listen for UDP packets on the given address.
	// The CLI default below and the `Default` impl both spell "[::]:0";
	// keep the two in sync when changing either.
	#[arg(
		id = "client-bind",
		long = "client-bind",
		default_value = "[::]:0",
		env = "MOQ_CLIENT_BIND"
	)]
	pub bind: net::SocketAddr,

	/// The QUIC backend to use.
	/// Auto-detected from compiled features if not specified.
	// `None` defers the choice to `Client::new`.
	#[arg(id = "client-backend", long = "client-backend", env = "MOQ_CLIENT_BACKEND")]
	pub backend: Option<QuicBackend>,

	/// Maximum number of concurrent QUIC streams per connection (both bidi and uni).
	// Left out of serialized output while unset.
	#[serde(skip_serializing_if = "Option::is_none")]
	#[arg(
		id = "client-max-streams",
		long = "client-max-streams",
		env = "MOQ_CLIENT_MAX_STREAMS"
	)]
	pub max_streams: Option<u64>,

	/// Restrict the client to specific MoQ protocol version(s).
	///
	/// By default, the client offers all supported versions and lets the server choose.
	/// Use this to force a specific version, e.g. `--client-version moq-lite-02`.
	/// Can be specified multiple times to offer a subset of versions.
	///
	/// Valid values: moq-lite-01, moq-lite-02, moq-lite-03, moq-transport-14, moq-transport-15, moq-transport-16, moq-transport-17
	// An empty list means "no restriction"; `versions()` expands it to all
	// versions. Left out of serialized output while empty.
	#[serde(default, skip_serializing_if = "Vec::is_empty")]
	#[arg(id = "client-version", long = "client-version", env = "MOQ_CLIENT_VERSION")]
	pub version: Vec<moq_net::Version>,

	// TLS options; flattened into this struct's CLI arguments and read from the
	// `tls` key of a config file (optional there thanks to `serde(default)`).
	#[command(flatten)]
	#[serde(default)]
	pub tls: crate::tls::Client,

	// Backoff settings; `Client::reconnect` hands a copy to the reconnect loop.
	#[command(flatten)]
	#[serde(default)]
	pub backoff: Backoff,

	// WebSocket options; the field only exists when the `websocket` feature is on.
	#[cfg(feature = "websocket")]
	#[command(flatten)]
	#[serde(default)]
	pub websocket: crate::websocket::Client,
}
57
58impl ClientConfig {
59	pub fn init(self) -> crate::Result<Client> {
60		Client::new(self)
61	}
62
63	/// Returns the configured versions, defaulting to all if none specified.
64	pub fn versions(&self) -> moq_net::Versions {
65		if self.version.is_empty() {
66			moq_net::Versions::all()
67		} else {
68			moq_net::Versions::from(self.version.clone())
69		}
70	}
71}
72
73impl Default for ClientConfig {
74	fn default() -> Self {
75		Self {
76			bind: "[::]:0".parse().unwrap(),
77			backend: None,
78			max_streams: None,
79			version: Vec::new(),
80			tls: crate::tls::Client::default(),
81			backoff: Backoff::default(),
82			#[cfg(feature = "websocket")]
83			websocket: crate::websocket::Client::default(),
84		}
85	}
86}
87
/// Client for establishing MoQ connections over QUIC, WebTransport, or WebSocket.
///
/// Create via [`ClientConfig::init`] or [`Client::new`].
#[derive(Clone)]
pub struct Client {
	/// MoQ-level client; every established transport session is passed to its
	/// `connect` to obtain a [`moq_net::Session`].
	moq: moq_net::Client,
	/// Versions offered by this client; also the source of the ALPN list given
	/// to the WebSocket transport.
	versions: moq_net::Versions,
	/// Backoff settings copied into each [`Reconnect`] created by `reconnect`.
	backoff: Backoff,
	/// WebSocket settings, used both for the QUIC/WebSocket race and for
	/// WebSocket-only connections.
	#[cfg(feature = "websocket")]
	websocket: crate::websocket::Client,
	/// TLS configuration built once in `new`; passed to the noq, quinn, and
	/// WebSocket connect calls.
	tls: rustls::ClientConfig,
	// At most one of the QUIC backends below is `Some`: `new` instantiates only
	// the backend that was selected and leaves the others as `None`.
	#[cfg(feature = "noq")]
	noq: Option<crate::noq::NoqClient>,
	#[cfg(feature = "quinn")]
	quinn: Option<crate::quinn::QuinnClient>,
	#[cfg(feature = "quiche")]
	quiche: Option<crate::quiche::QuicheClient>,
	/// iroh endpoint; `None` until set with `with_iroh`, in which case `iroh`
	/// URLs are rejected.
	#[cfg(feature = "iroh")]
	iroh: Option<web_transport_iroh::iroh::Endpoint>,
	/// Direct socket addresses forwarded to the iroh connect call.
	#[cfg(feature = "iroh")]
	iroh_addrs: Vec<std::net::SocketAddr>,
}
110
111impl Client {
112	#[cfg(not(any(feature = "noq", feature = "quinn", feature = "quiche", feature = "websocket")))]
113	pub fn new(_config: ClientConfig) -> crate::Result<Self> {
114		Err(Error::NoBackend(
115			"no QUIC or WebSocket backend compiled; enable noq, quinn, quiche, or websocket feature",
116		))
117	}
118
119	/// Create a new client
120	#[cfg(any(feature = "noq", feature = "quinn", feature = "quiche", feature = "websocket"))]
121	pub fn new(config: ClientConfig) -> crate::Result<Self> {
122		#[cfg(any(feature = "noq", feature = "quinn", feature = "quiche"))]
123		let backend = config.backend.clone().unwrap_or({
124			#[cfg(feature = "quinn")]
125			{
126				QuicBackend::Quinn
127			}
128			#[cfg(all(feature = "noq", not(feature = "quinn")))]
129			{
130				QuicBackend::Noq
131			}
132			#[cfg(all(feature = "quiche", not(feature = "quinn"), not(feature = "noq")))]
133			{
134				QuicBackend::Quiche
135			}
136			#[cfg(all(not(feature = "quiche"), not(feature = "quinn"), not(feature = "noq")))]
137			panic!("no QUIC backend compiled; enable noq, quinn, or quiche feature");
138		});
139
140		let tls = config.tls.build()?;
141
142		#[cfg(feature = "noq")]
143		#[allow(unreachable_patterns)]
144		let noq = match backend {
145			QuicBackend::Noq => Some(crate::noq::NoqClient::new(&config)?),
146			_ => None,
147		};
148
149		#[cfg(feature = "quinn")]
150		#[allow(unreachable_patterns)]
151		let quinn = match backend {
152			QuicBackend::Quinn => Some(crate::quinn::QuinnClient::new(&config)?),
153			_ => None,
154		};
155
156		#[cfg(feature = "quiche")]
157		let quiche = match backend {
158			QuicBackend::Quiche => Some(crate::quiche::QuicheClient::new(&config)?),
159			_ => None,
160		};
161
162		let versions = config.versions();
163		Ok(Self {
164			moq: moq_net::Client::new().with_versions(versions.clone()),
165			versions,
166			backoff: config.backoff,
167			#[cfg(feature = "websocket")]
168			websocket: config.websocket,
169			tls,
170			#[cfg(feature = "noq")]
171			noq,
172			#[cfg(feature = "quinn")]
173			quinn,
174			#[cfg(feature = "quiche")]
175			quiche,
176			#[cfg(feature = "iroh")]
177			iroh: None,
178			#[cfg(feature = "iroh")]
179			iroh_addrs: Vec::new(),
180		})
181	}
182
183	#[cfg(feature = "iroh")]
184	pub fn with_iroh(mut self, iroh: Option<web_transport_iroh::iroh::Endpoint>) -> Self {
185		self.iroh = iroh;
186		self
187	}
188
189	/// Set direct IP addresses for connecting to iroh peers.
190	///
191	/// This is useful when the peer's IP addresses are known ahead of time,
192	/// bypassing the need for peer discovery (e.g. in tests or local networks).
193	#[cfg(feature = "iroh")]
194	pub fn with_iroh_addrs(mut self, addrs: Vec<std::net::SocketAddr>) -> Self {
195		self.iroh_addrs = addrs;
196		self
197	}
198
199	pub fn with_publish(mut self, publish: impl Into<Option<moq_net::OriginConsumer>>) -> Self {
200		self.moq = self.moq.with_publish(publish);
201		self
202	}
203
204	pub fn with_consume(mut self, consume: impl Into<Option<moq_net::OriginProducer>>) -> Self {
205		self.moq = self.moq.with_consume(consume);
206		self
207	}
208
209	/// Attach a tier-scoped [`moq_net::StatsHandle`] to all sessions opened by this client.
210	pub fn with_stats(mut self, stats: moq_net::StatsHandle) -> Self {
211		self.moq = self.moq.with_stats(stats);
212		self
213	}
214
215	/// Start a background reconnect loop that connects to the given URL,
216	/// waits for the session to close, then reconnects with exponential backoff.
217	///
218	/// Returns a [`Reconnect`] handle; drop the last handle to stop the loop.
219	pub fn reconnect(&self, url: Url) -> Reconnect {
220		Reconnect::new(self.clone(), url, self.backoff.clone())
221	}
222
223	#[cfg(not(any(
224		feature = "noq",
225		feature = "quinn",
226		feature = "quiche",
227		feature = "iroh",
228		feature = "websocket"
229	)))]
230	pub async fn connect(&self, _url: Url) -> crate::Result<moq_net::Session> {
231		Err(Error::NoBackend(
232			"no backend compiled; enable noq, quinn, quiche, iroh, or websocket feature",
233		))
234	}
235
236	#[cfg(any(
237		feature = "noq",
238		feature = "quinn",
239		feature = "quiche",
240		feature = "iroh",
241		feature = "websocket"
242	))]
243	pub async fn connect(&self, url: Url) -> crate::Result<moq_net::Session> {
244		let session = self.connect_inner(url).await?;
245		tracing::info!(version = %session.version(), "connected");
246		Ok(session)
247	}
248
	/// Open a transport to `url` and run the MoQ handshake over it.
	///
	/// Transports are considered in this order, and the first applicable one returns:
	/// 1. iroh, only for URLs whose scheme is `iroh`;
	/// 2. whichever of noq, quinn, or quiche was instantiated by [`Client::new`],
	///    raced against WebSocket when the `websocket` feature is enabled;
	/// 3. WebSocket on its own.
	///
	/// # Errors
	///
	/// - [`Error::IrohDisabled`] for an `iroh` URL when no endpoint was set.
	/// - [`Error::ConnectFailed`] when neither side of a QUIC/WebSocket race
	///   produced a usable transport.
	/// - [`Error::NoBackend`] when nothing matched in a build without `websocket`.
	/// - Any error propagated from the transport or from the MoQ handshake.
	#[cfg(any(
		feature = "noq",
		feature = "quinn",
		feature = "quiche",
		feature = "iroh",
		feature = "websocket"
	))]
	async fn connect_inner(&self, url: Url) -> crate::Result<moq_net::Session> {
		// `iroh` URLs are handled exclusively by the iroh endpoint; they never
		// fall through to the QUIC or WebSocket paths below.
		#[cfg(feature = "iroh")]
		if url.scheme() == "iroh" {
			let endpoint = self.iroh.as_ref().ok_or(Error::IrohDisabled)?;
			let session = crate::iroh::connect(endpoint, url, self.iroh_addrs.iter().copied()).await?;
			let session = self.moq.connect(session).await?;
			return Ok(session);
		}

		// The three QUIC sections below share one shape: build a future that
		// connects and logs a warning on failure, then either race it against
		// WebSocket or await it directly.
		#[cfg(feature = "noq")]
		if let Some(noq) = self.noq.as_ref() {
			let tls = self.tls.clone();
			// The WebSocket side takes `url` by value, so QUIC gets its own copy.
			let quic_url = url.clone();
			// Not started yet: the future only runs once it is polled below.
			let quic_handle = async {
				let res = noq.connect(&tls, quic_url).await;
				if let Err(err) = &res {
					tracing::warn!(%err, "QUIC connection failed");
				}
				res
			};

			#[cfg(feature = "websocket")]
			{
				let alpns = self.versions.alpns();
				let ws_handle = crate::websocket::race_handle(&self.websocket, &self.tls, url, &alpns);

				// First transport to come up wins. A side whose result does not
				// match its pattern (QUIC `Err`; WebSocket `None` or `Some(Err)`)
				// is dropped from the race; `else` runs once both are dropped.
				return Ok(tokio::select! {
					Ok(quic) = quic_handle => self.moq.connect(quic).await?,
					Some(Ok(ws)) = ws_handle => self.moq.connect(ws).await?,
					else => return Err(Error::ConnectFailed),
				});
			}

			// No WebSocket compiled: the QUIC error is returned as-is.
			#[cfg(not(feature = "websocket"))]
			{
				let session = quic_handle.await?;
				return Ok(self.moq.connect(session).await?);
			}
		}

		// Same flow as the noq section, using the quinn backend.
		#[cfg(feature = "quinn")]
		if let Some(quinn) = self.quinn.as_ref() {
			let tls = self.tls.clone();
			let quic_url = url.clone();
			let quic_handle = async {
				let res = quinn.connect(&tls, quic_url).await;
				if let Err(err) = &res {
					tracing::warn!(%err, "QUIC connection failed");
				}
				res
			};

			#[cfg(feature = "websocket")]
			{
				let alpns = self.versions.alpns();
				let ws_handle = crate::websocket::race_handle(&self.websocket, &self.tls, url, &alpns);

				return Ok(tokio::select! {
					Ok(quic) = quic_handle => self.moq.connect(quic).await?,
					Some(Ok(ws)) = ws_handle => self.moq.connect(ws).await?,
					else => return Err(Error::ConnectFailed),
				});
			}

			#[cfg(not(feature = "websocket"))]
			{
				let session = quic_handle.await?;
				return Ok(self.moq.connect(session).await?);
			}
		}

		// Same flow again for quiche; unlike noq and quinn, its connect call is
		// not given `self.tls`.
		#[cfg(feature = "quiche")]
		if let Some(quiche) = self.quiche.as_ref() {
			let quic_url = url.clone();
			let quic_handle = async {
				let res = quiche.connect(quic_url).await;
				if let Err(err) = &res {
					tracing::warn!(%err, "QUIC connection failed");
				}
				res
			};

			#[cfg(feature = "websocket")]
			{
				let alpns = self.versions.alpns();
				let ws_handle = crate::websocket::race_handle(&self.websocket, &self.tls, url, &alpns);

				return Ok(tokio::select! {
					Ok(quic) = quic_handle => self.moq.connect(quic).await?,
					Some(Ok(ws)) = ws_handle => self.moq.connect(ws).await?,
					else => return Err(Error::ConnectFailed),
				});
			}

			#[cfg(not(feature = "websocket"))]
			{
				let session = quic_handle.await?;
				return Ok(self.moq.connect(session).await?);
			}
		}

		// Reached only when no QUIC backend was instantiated: plain WebSocket,
		// without a race.
		#[cfg(feature = "websocket")]
		{
			let alpns = self.versions.alpns();
			let session = crate::websocket::connect(&self.websocket, &self.tls, url, &alpns).await?;
			return Ok(self.moq.connect(session).await?);
		}

		// Without WebSocket there is nothing left to try.
		#[cfg(not(feature = "websocket"))]
		return Err(Error::NoBackend("no QUIC backend matched; this should not happen"));
	}
367}
368
#[cfg(test)]
mod tests {
	use super::*;
	use clap::Parser;

	/// Parse a version name, panicking when it is not a known version.
	fn version(name: &str) -> moq_net::Version {
		name.parse().unwrap()
	}

	/// A value loaded from TOML must not be reset when CLI arguments that do
	/// not mention it are applied afterwards.
	#[test]
	fn test_toml_disable_verify_survives_update_from() {
		let source = r#"
			tls.disable_verify = true
		"#;

		let mut loaded: ClientConfig = toml::from_str(source).unwrap();
		assert_eq!(loaded.tls.disable_verify, Some(true));

		// Simulate: TOML loaded, then CLI args re-applied (no --tls-disable-verify flag).
		loaded.update_from(["test"]);
		assert_eq!(loaded.tls.disable_verify, Some(true));
	}

	/// The bare flag counts as `true`.
	#[test]
	fn test_cli_disable_verify_flag() {
		let parsed = ClientConfig::parse_from(["test", "--tls-disable-verify"]);
		assert_eq!(parsed.tls.disable_verify, Some(true));
	}

	/// An explicit `=false` is recorded as `Some(false)`, not `None`.
	#[test]
	fn test_cli_disable_verify_explicit_false() {
		let parsed = ClientConfig::parse_from(["test", "--tls-disable-verify=false"]);
		assert_eq!(parsed.tls.disable_verify, Some(false));
	}

	/// An explicit `=true` behaves like the bare flag.
	#[test]
	fn test_cli_disable_verify_explicit_true() {
		let parsed = ClientConfig::parse_from(["test", "--tls-disable-verify=true"]);
		assert_eq!(parsed.tls.disable_verify, Some(true));
	}

	/// Without the flag the option stays unset.
	#[test]
	fn test_cli_no_disable_verify() {
		let parsed = ClientConfig::parse_from(["test"]);
		assert_eq!(parsed.tls.disable_verify, None);
	}

	/// A version list loaded from TOML must survive a CLI update that does not
	/// mention versions.
	#[test]
	fn test_toml_version_survives_update_from() {
		let source = r#"
			version = ["moq-lite-02"]
		"#;

		let mut loaded: ClientConfig = toml::from_str(source).unwrap();
		assert_eq!(loaded.version, vec![version("moq-lite-02")]);

		// Simulate: TOML loaded, then CLI args re-applied (no --client-version flag).
		loaded.update_from(["test"]);
		assert_eq!(loaded.version, vec![version("moq-lite-02")]);
	}

	/// `--client-version` fills the version list.
	#[test]
	fn test_cli_version() {
		let parsed = ClientConfig::parse_from(["test", "--client-version", "moq-lite-03"]);
		assert_eq!(parsed.version, vec![version("moq-lite-03")]);
	}

	/// No `--client-version` leaves the list empty, which `versions()` expands
	/// to every supported version.
	#[test]
	fn test_cli_no_version_defaults_to_all() {
		let parsed = ClientConfig::parse_from(["test"]);
		assert!(parsed.version.is_empty());
		// versions() helper returns all when none specified
		let offered = parsed.versions().alpns().len();
		assert_eq!(offered, moq_net::ALPNS.len());
	}
}
435		assert!(config.version.is_empty());
436		// versions() helper returns all when none specified
437		assert_eq!(config.versions().alpns().len(), moq_net::ALPNS.len());
438	}
439}