1use crate::{Backoff, Error, QuicBackend, Reconnect};
2use std::net;
3use url::Url;
4
// Configuration for the MoQ client, usable both as clap command-line arguments and as a
// serde document (the tests in this file load it from TOML).
//
// NOTE: plain `//` comments are used in this struct on purpose. clap's derive turns `///`
// doc comments into `--help` text, so adding them here would change the CLI output.
//
// `serde(default)` fills any field missing from a document from `ClientConfig::default()`,
// and `deny_unknown_fields` rejects keys that do not match a field.
#[derive(Clone, Debug, clap::Parser, serde::Serialize, serde::Deserialize)]
#[serde(deny_unknown_fields, default)]
#[non_exhaustive]
pub struct ClientConfig {
    // Local socket address to bind (`--client-bind` / `MOQ_CLIENT_BIND`).
    // The default `[::]:0` is the unspecified IPv6 address with port 0.
    #[arg(
        id = "client-bind",
        long = "client-bind",
        default_value = "[::]:0",
        env = "MOQ_CLIENT_BIND"
    )]
    pub bind: net::SocketAddr,

    // QUIC backend to use (`--client-backend` / `MOQ_CLIENT_BACKEND`).
    // When unset, `Client::new` picks one from the compiled-in features, preferring
    // quinn, then noq, then quiche.
    #[arg(id = "client-backend", long = "client-backend", env = "MOQ_CLIENT_BACKEND")]
    pub backend: Option<QuicBackend>,

    // Optional stream limit (`--client-max-streams` / `MOQ_CLIENT_MAX_STREAMS`); left out of
    // serialized output when `None`.
    // NOTE(review): this file never reads the value directly; presumably the backend
    // constructors that receive `&ClientConfig` apply it. Confirm the exact semantics there.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[arg(
        id = "client-max-streams",
        long = "client-max-streams",
        env = "MOQ_CLIENT_MAX_STREAMS"
    )]
    pub max_streams: Option<u64>,

    // Protocol versions to allow (`--client-version` / `MOQ_CLIENT_VERSION`).
    // An empty list means no restriction: `ClientConfig::versions` then returns
    // `moq_net::Versions::all()`. Left out of serialized output when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    #[arg(id = "client-version", long = "client-version", env = "MOQ_CLIENT_VERSION")]
    pub version: Vec<moq_net::Version>,

    // TLS settings, flattened into the same CLI; `Client::new` turns them into a
    // `rustls::ClientConfig` via `tls.build()`.
    #[command(flatten)]
    #[serde(default)]
    pub tls: crate::tls::Client,

    // Backoff settings, flattened into the same CLI; handed to `Reconnect` by
    // `Client::reconnect`.
    #[command(flatten)]
    #[serde(default)]
    pub backoff: Backoff,

    // WebSocket settings, only present when the `websocket` feature is compiled in.
    #[cfg(feature = "websocket")]
    #[command(flatten)]
    #[serde(default)]
    pub websocket: crate::websocket::Client,
}
57
58impl ClientConfig {
59 pub fn init(self) -> crate::Result<Client> {
60 Client::new(self)
61 }
62
63 pub fn versions(&self) -> moq_net::Versions {
65 if self.version.is_empty() {
66 moq_net::Versions::all()
67 } else {
68 moq_net::Versions::from(self.version.clone())
69 }
70 }
71}
72
73impl Default for ClientConfig {
74 fn default() -> Self {
75 Self {
76 bind: "[::]:0".parse().unwrap(),
77 backend: None,
78 max_streams: None,
79 version: Vec::new(),
80 tls: crate::tls::Client::default(),
81 backoff: Backoff::default(),
82 #[cfg(feature = "websocket")]
83 websocket: crate::websocket::Client::default(),
84 }
85 }
86}
87
/// Client for establishing MoQ sessions over the transports compiled into this crate.
///
/// Built with [`Client::new`] (or [`ClientConfig::init`]) and customised through the
/// `with_*` builder methods. Which fields exist depends on the enabled cargo features.
#[derive(Clone)]
pub struct Client {
    /// Underlying `moq_net` client; every established transport session is handed to its
    /// `connect` method.
    moq: moq_net::Client,
    /// Versions derived from the configuration; their ALPNs are passed to the WebSocket
    /// connector.
    versions: moq_net::Versions,
    /// Backoff settings passed to [`Reconnect`] by [`Client::reconnect`].
    backoff: Backoff,
    /// WebSocket settings taken from the configuration.
    #[cfg(feature = "websocket")]
    websocket: crate::websocket::Client,
    /// TLS configuration produced by `config.tls.build()` in [`Client::new`].
    tls: rustls::ClientConfig,
    /// noq client; `Some` only when noq is the selected QUIC backend.
    #[cfg(feature = "noq")]
    noq: Option<crate::noq::NoqClient>,
    /// quinn client; `Some` only when quinn is the selected QUIC backend.
    #[cfg(feature = "quinn")]
    quinn: Option<crate::quinn::QuinnClient>,
    /// quiche client; `Some` only when quiche is the selected QUIC backend.
    #[cfg(feature = "quiche")]
    quiche: Option<crate::quiche::QuicheClient>,
    /// iroh endpoint used for `iroh` URLs; `None` until set with `with_iroh`.
    #[cfg(feature = "iroh")]
    iroh: Option<web_transport_iroh::iroh::Endpoint>,
    /// Socket addresses passed to `crate::iroh::connect` together with the URL.
    #[cfg(feature = "iroh")]
    iroh_addrs: Vec<std::net::SocketAddr>,
}
110
111impl Client {
112 #[cfg(not(any(feature = "noq", feature = "quinn", feature = "quiche", feature = "websocket")))]
113 pub fn new(_config: ClientConfig) -> crate::Result<Self> {
114 Err(Error::NoBackend(
115 "no QUIC or WebSocket backend compiled; enable noq, quinn, quiche, or websocket feature",
116 ))
117 }
118
119 #[cfg(any(feature = "noq", feature = "quinn", feature = "quiche", feature = "websocket"))]
121 pub fn new(config: ClientConfig) -> crate::Result<Self> {
122 #[cfg(any(feature = "noq", feature = "quinn", feature = "quiche"))]
123 let backend = config.backend.clone().unwrap_or({
124 #[cfg(feature = "quinn")]
125 {
126 QuicBackend::Quinn
127 }
128 #[cfg(all(feature = "noq", not(feature = "quinn")))]
129 {
130 QuicBackend::Noq
131 }
132 #[cfg(all(feature = "quiche", not(feature = "quinn"), not(feature = "noq")))]
133 {
134 QuicBackend::Quiche
135 }
136 #[cfg(all(not(feature = "quiche"), not(feature = "quinn"), not(feature = "noq")))]
137 panic!("no QUIC backend compiled; enable noq, quinn, or quiche feature");
138 });
139
140 let tls = config.tls.build()?;
141
142 #[cfg(feature = "noq")]
143 #[allow(unreachable_patterns)]
144 let noq = match backend {
145 QuicBackend::Noq => Some(crate::noq::NoqClient::new(&config)?),
146 _ => None,
147 };
148
149 #[cfg(feature = "quinn")]
150 #[allow(unreachable_patterns)]
151 let quinn = match backend {
152 QuicBackend::Quinn => Some(crate::quinn::QuinnClient::new(&config)?),
153 _ => None,
154 };
155
156 #[cfg(feature = "quiche")]
157 let quiche = match backend {
158 QuicBackend::Quiche => Some(crate::quiche::QuicheClient::new(&config)?),
159 _ => None,
160 };
161
162 let versions = config.versions();
163 Ok(Self {
164 moq: moq_net::Client::new().with_versions(versions.clone()),
165 versions,
166 backoff: config.backoff,
167 #[cfg(feature = "websocket")]
168 websocket: config.websocket,
169 tls,
170 #[cfg(feature = "noq")]
171 noq,
172 #[cfg(feature = "quinn")]
173 quinn,
174 #[cfg(feature = "quiche")]
175 quiche,
176 #[cfg(feature = "iroh")]
177 iroh: None,
178 #[cfg(feature = "iroh")]
179 iroh_addrs: Vec::new(),
180 })
181 }
182
183 #[cfg(feature = "iroh")]
184 pub fn with_iroh(mut self, iroh: Option<web_transport_iroh::iroh::Endpoint>) -> Self {
185 self.iroh = iroh;
186 self
187 }
188
189 #[cfg(feature = "iroh")]
194 pub fn with_iroh_addrs(mut self, addrs: Vec<std::net::SocketAddr>) -> Self {
195 self.iroh_addrs = addrs;
196 self
197 }
198
199 pub fn with_publish(mut self, publish: impl Into<Option<moq_net::OriginConsumer>>) -> Self {
200 self.moq = self.moq.with_publish(publish);
201 self
202 }
203
204 pub fn with_consume(mut self, consume: impl Into<Option<moq_net::OriginProducer>>) -> Self {
205 self.moq = self.moq.with_consume(consume);
206 self
207 }
208
209 pub fn with_stats(mut self, stats: moq_net::StatsHandle) -> Self {
211 self.moq = self.moq.with_stats(stats);
212 self
213 }
214
215 pub fn reconnect(&self, url: Url) -> Reconnect {
220 Reconnect::new(self.clone(), url, self.backoff.clone())
221 }
222
223 #[cfg(not(any(
224 feature = "noq",
225 feature = "quinn",
226 feature = "quiche",
227 feature = "iroh",
228 feature = "websocket"
229 )))]
230 pub async fn connect(&self, _url: Url) -> crate::Result<moq_net::Session> {
231 Err(Error::NoBackend(
232 "no backend compiled; enable noq, quinn, quiche, iroh, or websocket feature",
233 ))
234 }
235
236 #[cfg(any(
237 feature = "noq",
238 feature = "quinn",
239 feature = "quiche",
240 feature = "iroh",
241 feature = "websocket"
242 ))]
243 pub async fn connect(&self, url: Url) -> crate::Result<moq_net::Session> {
244 let session = self.connect_inner(url).await?;
245 tracing::info!(version = %session.version(), "connected");
246 Ok(session)
247 }
248
249 #[cfg(any(
250 feature = "noq",
251 feature = "quinn",
252 feature = "quiche",
253 feature = "iroh",
254 feature = "websocket"
255 ))]
256 async fn connect_inner(&self, url: Url) -> crate::Result<moq_net::Session> {
257 #[cfg(feature = "iroh")]
258 if url.scheme() == "iroh" {
259 let endpoint = self.iroh.as_ref().ok_or(Error::IrohDisabled)?;
260 let session = crate::iroh::connect(endpoint, url, self.iroh_addrs.iter().copied()).await?;
261 let session = self.moq.connect(session).await?;
262 return Ok(session);
263 }
264
265 #[cfg(feature = "noq")]
266 if let Some(noq) = self.noq.as_ref() {
267 let tls = self.tls.clone();
268 let quic_url = url.clone();
269 let quic_handle = async {
270 let res = noq.connect(&tls, quic_url).await;
271 if let Err(err) = &res {
272 tracing::warn!(%err, "QUIC connection failed");
273 }
274 res
275 };
276
277 #[cfg(feature = "websocket")]
278 {
279 let alpns = self.versions.alpns();
280 let ws_handle = crate::websocket::race_handle(&self.websocket, &self.tls, url, &alpns);
281
282 return Ok(tokio::select! {
283 Ok(quic) = quic_handle => self.moq.connect(quic).await?,
284 Some(Ok(ws)) = ws_handle => self.moq.connect(ws).await?,
285 else => return Err(Error::ConnectFailed),
286 });
287 }
288
289 #[cfg(not(feature = "websocket"))]
290 {
291 let session = quic_handle.await?;
292 return Ok(self.moq.connect(session).await?);
293 }
294 }
295
296 #[cfg(feature = "quinn")]
297 if let Some(quinn) = self.quinn.as_ref() {
298 let tls = self.tls.clone();
299 let quic_url = url.clone();
300 let quic_handle = async {
301 let res = quinn.connect(&tls, quic_url).await;
302 if let Err(err) = &res {
303 tracing::warn!(%err, "QUIC connection failed");
304 }
305 res
306 };
307
308 #[cfg(feature = "websocket")]
309 {
310 let alpns = self.versions.alpns();
311 let ws_handle = crate::websocket::race_handle(&self.websocket, &self.tls, url, &alpns);
312
313 return Ok(tokio::select! {
314 Ok(quic) = quic_handle => self.moq.connect(quic).await?,
315 Some(Ok(ws)) = ws_handle => self.moq.connect(ws).await?,
316 else => return Err(Error::ConnectFailed),
317 });
318 }
319
320 #[cfg(not(feature = "websocket"))]
321 {
322 let session = quic_handle.await?;
323 return Ok(self.moq.connect(session).await?);
324 }
325 }
326
327 #[cfg(feature = "quiche")]
328 if let Some(quiche) = self.quiche.as_ref() {
329 let quic_url = url.clone();
330 let quic_handle = async {
331 let res = quiche.connect(quic_url).await;
332 if let Err(err) = &res {
333 tracing::warn!(%err, "QUIC connection failed");
334 }
335 res
336 };
337
338 #[cfg(feature = "websocket")]
339 {
340 let alpns = self.versions.alpns();
341 let ws_handle = crate::websocket::race_handle(&self.websocket, &self.tls, url, &alpns);
342
343 return Ok(tokio::select! {
344 Ok(quic) = quic_handle => self.moq.connect(quic).await?,
345 Some(Ok(ws)) = ws_handle => self.moq.connect(ws).await?,
346 else => return Err(Error::ConnectFailed),
347 });
348 }
349
350 #[cfg(not(feature = "websocket"))]
351 {
352 let session = quic_handle.await?;
353 return Ok(self.moq.connect(session).await?);
354 }
355 }
356
357 #[cfg(feature = "websocket")]
358 {
359 let alpns = self.versions.alpns();
360 let session = crate::websocket::connect(&self.websocket, &self.tls, url, &alpns).await?;
361 return Ok(self.moq.connect(session).await?);
362 }
363
364 #[cfg(not(feature = "websocket"))]
365 return Err(Error::NoBackend("no QUIC backend matched; this should not happen"));
366 }
367}
368
#[cfg(test)]
mod tests {
    use super::*;
    use clap::Parser;

    /// Parses `name` into a protocol version, panicking if it is not recognised.
    fn version(name: &str) -> moq_net::Version {
        name.parse::<moq_net::Version>().unwrap()
    }

    /// A value loaded from TOML must not be reset by a later CLI update without flags.
    #[test]
    fn test_toml_disable_verify_survives_update_from() {
        let document = r#"
        tls.disable_verify = true
        "#;

        let mut loaded: ClientConfig = toml::from_str(document).unwrap();
        assert_eq!(loaded.tls.disable_verify, Some(true));

        // Re-apply CLI parsing with no flags; the TOML value has to survive.
        loaded.update_from(["test"]);
        assert_eq!(loaded.tls.disable_verify, Some(true));
    }

    /// The bare flag enables the option.
    #[test]
    fn test_cli_disable_verify_flag() {
        let parsed = ClientConfig::parse_from(["test", "--tls-disable-verify"]);
        assert_eq!(parsed.tls.disable_verify, Some(true));
    }

    /// An explicit `=false` is kept as `Some(false)` rather than collapsing to `None`.
    #[test]
    fn test_cli_disable_verify_explicit_false() {
        let parsed = ClientConfig::parse_from(["test", "--tls-disable-verify=false"]);
        assert_eq!(parsed.tls.disable_verify, Some(false));
    }

    /// An explicit `=true` behaves like the bare flag.
    #[test]
    fn test_cli_disable_verify_explicit_true() {
        let parsed = ClientConfig::parse_from(["test", "--tls-disable-verify=true"]);
        assert_eq!(parsed.tls.disable_verify, Some(true));
    }

    /// Without the flag the option stays unset.
    #[test]
    fn test_cli_no_disable_verify() {
        let parsed = ClientConfig::parse_from(["test"]);
        assert_eq!(parsed.tls.disable_verify, None);
    }

    /// A version list loaded from TOML must not be reset by a later CLI update without flags.
    #[test]
    fn test_toml_version_survives_update_from() {
        let document = r#"
        version = ["moq-lite-02"]
        "#;

        let mut loaded: ClientConfig = toml::from_str(document).unwrap();
        assert_eq!(loaded.version, vec![version("moq-lite-02")]);

        // Re-apply CLI parsing with no flags; the TOML value has to survive.
        loaded.update_from(["test"]);
        assert_eq!(loaded.version, vec![version("moq-lite-02")]);
    }

    /// `--client-version` populates the version list.
    #[test]
    fn test_cli_version() {
        let parsed = ClientConfig::parse_from(["test", "--client-version", "moq-lite-03"]);
        assert_eq!(parsed.version, vec![version("moq-lite-03")]);
    }

    /// With no version given, the list is empty and `versions()` covers every known ALPN.
    #[test]
    fn test_cli_no_version_defaults_to_all() {
        let parsed = ClientConfig::parse_from(["test"]);
        assert!(parsed.version.is_empty());
        assert_eq!(parsed.versions().alpns().len(), moq_net::ALPNS.len());
    }
}