1use crate::{Backoff, Error, QuicBackend, Reconnect};
2#[cfg(feature = "websocket")]
3use std::future::Future;
4use std::net;
5use url::Url;
6
7#[derive(Clone, Debug, clap::Parser, serde::Serialize, serde::Deserialize)]
9#[serde(deny_unknown_fields, default)]
10#[non_exhaustive]
11pub struct ClientConfig {
12 #[arg(
14 id = "client-bind",
15 long = "client-bind",
16 default_value = "[::]:0",
17 env = "MOQ_CLIENT_BIND"
18 )]
19 pub bind: net::SocketAddr,
20
21 #[arg(id = "client-backend", long = "client-backend", env = "MOQ_CLIENT_BACKEND")]
24 pub backend: Option<QuicBackend>,
25
26 #[serde(skip_serializing_if = "Option::is_none")]
28 #[arg(
29 id = "client-max-streams",
30 long = "client-max-streams",
31 env = "MOQ_CLIENT_MAX_STREAMS"
32 )]
33 pub max_streams: Option<u64>,
34
35 #[serde(default, skip_serializing_if = "Vec::is_empty")]
43 #[arg(id = "client-version", long = "client-version", env = "MOQ_CLIENT_VERSION")]
44 pub version: Vec<moq_net::Version>,
45
46 #[command(flatten)]
47 #[serde(default)]
48 pub tls: crate::tls::Client,
49
50 #[command(flatten)]
51 #[serde(default)]
52 pub backoff: Backoff,
53
54 #[cfg(feature = "websocket")]
55 #[command(flatten)]
56 #[serde(default)]
57 pub websocket: crate::websocket::Client,
58}
59
60impl ClientConfig {
61 pub fn init(self) -> crate::Result<Client> {
62 Client::new(self)
63 }
64
65 pub fn versions(&self) -> moq_net::Versions {
67 if self.version.is_empty() {
68 moq_net::Versions::all()
69 } else {
70 moq_net::Versions::from(self.version.clone())
71 }
72 }
73}
74
75impl Default for ClientConfig {
76 fn default() -> Self {
77 Self {
78 bind: "[::]:0".parse().unwrap(),
79 backend: None,
80 max_streams: None,
81 version: Vec::new(),
82 tls: crate::tls::Client::default(),
83 backoff: Backoff::default(),
84 #[cfg(feature = "websocket")]
85 websocket: crate::websocket::Client::default(),
86 }
87 }
88}
89
90#[derive(Clone)]
94pub struct Client {
95 moq: moq_net::Client,
96 versions: moq_net::Versions,
97 backoff: Backoff,
98 #[cfg(feature = "websocket")]
99 websocket: crate::websocket::Client,
100 tls: rustls::ClientConfig,
101 #[cfg(feature = "noq")]
102 noq: Option<crate::noq::NoqClient>,
103 #[cfg(feature = "quinn")]
104 quinn: Option<crate::quinn::QuinnClient>,
105 #[cfg(feature = "quiche")]
106 quiche: Option<crate::quiche::QuicheClient>,
107 #[cfg(feature = "iroh")]
108 iroh: Option<web_transport_iroh::iroh::Endpoint>,
109 #[cfg(feature = "iroh")]
110 iroh_addrs: Vec<std::net::SocketAddr>,
111}
112
113impl Client {
114 #[cfg(not(any(feature = "noq", feature = "quinn", feature = "quiche", feature = "websocket")))]
115 pub fn new(_config: ClientConfig) -> crate::Result<Self> {
116 Err(Error::NoBackend(
117 "no QUIC or WebSocket backend compiled; enable noq, quinn, quiche, or websocket feature",
118 ))
119 }
120
121 #[cfg(any(feature = "noq", feature = "quinn", feature = "quiche", feature = "websocket"))]
123 pub fn new(config: ClientConfig) -> crate::Result<Self> {
124 #[cfg(any(feature = "noq", feature = "quinn", feature = "quiche"))]
125 let backend = config.backend.clone().unwrap_or({
126 #[cfg(feature = "quinn")]
127 {
128 QuicBackend::Quinn
129 }
130 #[cfg(all(feature = "noq", not(feature = "quinn")))]
131 {
132 QuicBackend::Noq
133 }
134 #[cfg(all(feature = "quiche", not(feature = "quinn"), not(feature = "noq")))]
135 {
136 QuicBackend::Quiche
137 }
138 #[cfg(all(not(feature = "quiche"), not(feature = "quinn"), not(feature = "noq")))]
139 panic!("no QUIC backend compiled; enable noq, quinn, or quiche feature");
140 });
141
142 let tls = config.tls.build()?;
143
144 #[cfg(feature = "noq")]
145 #[allow(unreachable_patterns)]
146 let noq = match backend {
147 QuicBackend::Noq => Some(crate::noq::NoqClient::new(&config)?),
148 _ => None,
149 };
150
151 #[cfg(feature = "quinn")]
152 #[allow(unreachable_patterns)]
153 let quinn = match backend {
154 QuicBackend::Quinn => Some(crate::quinn::QuinnClient::new(&config)?),
155 _ => None,
156 };
157
158 #[cfg(feature = "quiche")]
159 let quiche = match backend {
160 QuicBackend::Quiche => Some(crate::quiche::QuicheClient::new(&config)?),
161 _ => None,
162 };
163
164 let versions = config.versions();
165 Ok(Self {
166 moq: moq_net::Client::new().with_versions(versions.clone()),
167 versions,
168 backoff: config.backoff,
169 #[cfg(feature = "websocket")]
170 websocket: config.websocket,
171 tls,
172 #[cfg(feature = "noq")]
173 noq,
174 #[cfg(feature = "quinn")]
175 quinn,
176 #[cfg(feature = "quiche")]
177 quiche,
178 #[cfg(feature = "iroh")]
179 iroh: None,
180 #[cfg(feature = "iroh")]
181 iroh_addrs: Vec::new(),
182 })
183 }
184
185 #[cfg(feature = "iroh")]
186 pub fn with_iroh(mut self, iroh: Option<web_transport_iroh::iroh::Endpoint>) -> Self {
187 self.iroh = iroh;
188 self
189 }
190
191 #[cfg(feature = "iroh")]
196 pub fn with_iroh_addrs(mut self, addrs: Vec<std::net::SocketAddr>) -> Self {
197 self.iroh_addrs = addrs;
198 self
199 }
200
201 pub fn with_publish(mut self, publish: impl Into<Option<moq_net::OriginConsumer>>) -> Self {
202 self.moq = self.moq.with_publish(publish);
203 self
204 }
205
206 pub fn with_consume(mut self, consume: impl Into<Option<moq_net::OriginProducer>>) -> Self {
207 self.moq = self.moq.with_consume(consume);
208 self
209 }
210
211 pub fn with_stats(mut self, stats: moq_net::StatsHandle) -> Self {
213 self.moq = self.moq.with_stats(stats);
214 self
215 }
216
217 pub fn reconnect(&self, url: Url) -> Reconnect {
222 Reconnect::new(self.clone(), url, self.backoff.clone())
223 }
224
225 #[cfg(not(any(
226 feature = "noq",
227 feature = "quinn",
228 feature = "quiche",
229 feature = "iroh",
230 feature = "websocket"
231 )))]
232 pub async fn connect(&self, _url: Url) -> crate::Result<moq_net::Session> {
233 Err(Error::NoBackend(
234 "no backend compiled; enable noq, quinn, quiche, iroh, or websocket feature",
235 ))
236 }
237
238 #[cfg(any(
239 feature = "noq",
240 feature = "quinn",
241 feature = "quiche",
242 feature = "iroh",
243 feature = "websocket"
244 ))]
245 pub async fn connect(&self, url: Url) -> crate::Result<moq_net::Session> {
246 let session = self.connect_inner(url).await?;
247 tracing::info!(version = %session.version(), "connected");
248 Ok(session)
249 }
250
251 #[cfg(any(
252 feature = "noq",
253 feature = "quinn",
254 feature = "quiche",
255 feature = "iroh",
256 feature = "websocket"
257 ))]
258 async fn connect_inner(&self, url: Url) -> crate::Result<moq_net::Session> {
259 #[cfg(feature = "iroh")]
260 if url.scheme() == "iroh" {
261 let endpoint = self.iroh.as_ref().ok_or(Error::IrohDisabled)?;
262 let session = crate::iroh::connect(endpoint, url, self.iroh_addrs.iter().copied()).await?;
263 let session = self.moq.connect(session).await?;
264 return Ok(session);
265 }
266
267 #[cfg(feature = "noq")]
268 if let Some(noq) = self.noq.as_ref() {
269 let tls = self.tls.clone();
270 let quic_url = url.clone();
271 let quic_handle = async { noq.connect(&tls, quic_url).await.map_err(Error::from) };
272
273 #[cfg(feature = "websocket")]
274 {
275 return self.race_moq_connect(url, quic_handle).await;
276 }
277
278 #[cfg(not(feature = "websocket"))]
279 {
280 let session = quic_handle.await?;
281 return Ok(self.moq.connect(session).await?);
282 }
283 }
284
285 #[cfg(feature = "quinn")]
286 if let Some(quinn) = self.quinn.as_ref() {
287 let tls = self.tls.clone();
288 let quic_url = url.clone();
289 let quic_handle = async { quinn.connect(&tls, quic_url).await.map_err(Error::from) };
290
291 #[cfg(feature = "websocket")]
292 {
293 return self.race_moq_connect(url, quic_handle).await;
294 }
295
296 #[cfg(not(feature = "websocket"))]
297 {
298 let session = quic_handle.await?;
299 return Ok(self.moq.connect(session).await?);
300 }
301 }
302
303 #[cfg(feature = "quiche")]
304 if let Some(quiche) = self.quiche.as_ref() {
305 let quic_url = url.clone();
306 let quic_handle = async { quiche.connect(quic_url).await.map_err(Error::from) };
307
308 #[cfg(feature = "websocket")]
309 {
310 return self.race_moq_connect(url, quic_handle).await;
311 }
312
313 #[cfg(not(feature = "websocket"))]
314 {
315 let session = quic_handle.await?;
316 return Ok(self.moq.connect(session).await?);
317 }
318 }
319
320 #[cfg(feature = "websocket")]
321 {
322 let alpns = self.versions.alpns();
323 let session = crate::websocket::connect(&self.websocket, &self.tls, url, &alpns).await?;
324 return Ok(self.moq.connect(session).await?);
325 }
326
327 #[cfg(not(feature = "websocket"))]
328 return Err(Error::NoBackend("no QUIC backend matched; this should not happen"));
329 }
330
331 #[cfg(feature = "websocket")]
332 async fn race_moq_connect<Q, S>(&self, url: Url, quic: Q) -> crate::Result<moq_net::Session>
333 where
334 Q: Future<Output = crate::Result<S>>,
335 S: web_transport_trait::Session,
336 {
337 let alpns = self.versions.alpns();
338 let ws_config = self.websocket.clone();
339 let ws_tls = self.tls.clone();
340 let websocket = async move {
341 crate::websocket::race_handle(&ws_config, &ws_tls, url, &alpns)
342 .await
343 .map(|res| res.map_err(Error::from))
344 };
345
346 match race_transport_connect(quic, websocket).await? {
347 TransportRace::Quic(quic) => Ok(self.moq.connect(quic).await?),
348 TransportRace::WebSocket(websocket) => Ok(self.moq.connect(websocket).await?),
349 }
350 }
351}
352
/// Outcome of racing a QUIC connection attempt against a WebSocket one:
/// holds the session of whichever transport connected first.
#[cfg(feature = "websocket")]
#[derive(Debug, Eq, PartialEq)]
enum TransportRace<Q, W> {
    /// The QUIC attempt won; carries its session.
    Quic(Q),
    /// The WebSocket attempt won; carries its session.
    WebSocket(W),
}
359
/// Drives a QUIC attempt and a WebSocket attempt concurrently and returns the
/// first one that connects.
///
/// * The first transport to succeed wins; the function returns right away.
/// * An error for which `is_auth()` is true ends the race immediately with
///   that error.
/// * Any other error is logged and remembered while the other transport keeps
///   going.
/// * A WebSocket result of `None` removes WebSocket from the race without
///   recording an error.
///
/// # Errors
///
/// Once neither transport can succeed any more: [`Error::TransportRace`] when
/// both recorded an error, the single recorded error when only one did, and
/// [`Error::ConnectFailed`] when none was recorded.
#[cfg(feature = "websocket")]
async fn race_transport_connect<Q, W, QT, WT>(quic: Q, websocket: W) -> crate::Result<TransportRace<QT, WT>>
where
    Q: Future<Output = crate::Result<QT>>,
    W: Future<Output = Option<crate::Result<WT>>>,
{
    tokio::pin!(quic);
    tokio::pin!(websocket);

    let mut quic_failure = None;
    let mut ws_failure = None;
    let mut quic_finished = false;
    let mut ws_finished = false;

    while !(quic_finished && ws_finished) {
        tokio::select! {
            // The preconditions keep an already-completed future from being polled again.
            outcome = &mut quic, if !quic_finished => {
                match outcome {
                    Ok(session) => return Ok(TransportRace::Quic(session)),
                    Err(err) if err.is_auth() => return Err(err),
                    Err(err) => {
                        tracing::warn!(%err, "QUIC connection failed");
                        quic_failure = Some(err);
                        quic_finished = true;
                    }
                }
            }
            outcome = &mut websocket, if !ws_finished => {
                match outcome {
                    Some(Ok(session)) => return Ok(TransportRace::WebSocket(session)),
                    Some(Err(err)) if err.is_auth() => return Err(err),
                    Some(Err(err)) => {
                        tracing::warn!(%err, "WebSocket connection failed");
                        ws_failure = Some(err);
                        ws_finished = true;
                    }
                    None => {
                        ws_finished = true;
                    }
                }
            }
            // Both branches disabled: nothing left to wait for.
            else => break,
        }
    }

    Err(match (quic_failure, ws_failure) {
        (Some(quic_err), Some(ws_err)) => Error::TransportRace {
            quic: std::sync::Arc::new(quic_err),
            websocket: std::sync::Arc::new(ws_err),
        },
        (Some(err), None) | (None, Some(err)) => err,
        (None, None) => Error::ConnectFailed,
    })
}
418
#[cfg(test)]
mod tests {
    use super::*;
    use clap::Parser;

    /// Parses a protocol version name, panicking when it is not recognised.
    fn version(name: &str) -> moq_net::Version {
        name.parse::<moq_net::Version>().unwrap()
    }

    #[test]
    fn test_toml_disable_verify_survives_update_from() {
        let source = r#"
            tls.disable_verify = true
        "#;

        let mut parsed: ClientConfig = toml::from_str(source).unwrap();
        assert_eq!(parsed.tls.disable_verify, Some(true));

        // An argument list without flags must leave the value from TOML untouched.
        parsed.update_from(["test"]);
        assert_eq!(parsed.tls.disable_verify, Some(true));
    }

    #[test]
    fn test_cli_disable_verify_flag() {
        let parsed = ClientConfig::parse_from(["test", "--tls-disable-verify"]);
        assert_eq!(parsed.tls.disable_verify, Some(true));
    }

    #[test]
    fn test_cli_disable_verify_explicit_false() {
        let parsed = ClientConfig::parse_from(["test", "--tls-disable-verify=false"]);
        assert_eq!(parsed.tls.disable_verify, Some(false));
    }

    #[test]
    fn test_cli_disable_verify_explicit_true() {
        let parsed = ClientConfig::parse_from(["test", "--tls-disable-verify=true"]);
        assert_eq!(parsed.tls.disable_verify, Some(true));
    }

    #[test]
    fn test_cli_no_disable_verify() {
        let parsed = ClientConfig::parse_from(["test"]);
        assert_eq!(parsed.tls.disable_verify, None);
    }

    #[test]
    fn test_toml_fingerprint_survives_update_from() {
        let source = r#"
            tls.fingerprint = ["abcd1234", "ef567890"]
        "#;

        let mut parsed: ClientConfig = toml::from_str(source).unwrap();
        assert_eq!(parsed.tls.fingerprint, vec!["abcd1234", "ef567890"]);

        // An argument list without flags must leave the list from TOML untouched.
        parsed.update_from(["test"]);
        assert_eq!(parsed.tls.fingerprint, vec!["abcd1234", "ef567890"]);
    }

    #[test]
    fn test_toml_fingerprint_accepts_single_string() {
        let source = r#"
            tls.fingerprint = "abcd1234"
        "#;

        let parsed: ClientConfig = toml::from_str(source).unwrap();
        assert_eq!(parsed.tls.fingerprint, vec!["abcd1234"]);
    }

    #[test]
    fn test_cli_fingerprint() {
        let parsed = ClientConfig::parse_from(["test", "--tls-fingerprint", "abcd1234"]);
        assert_eq!(parsed.tls.fingerprint, vec!["abcd1234"]);
    }

    #[test]
    fn test_toml_version_survives_update_from() {
        let source = r#"
            version = ["moq-lite-02"]
        "#;

        let mut parsed: ClientConfig = toml::from_str(source).unwrap();
        assert_eq!(parsed.version, vec![version("moq-lite-02")]);

        // An argument list without flags must leave the versions from TOML untouched.
        parsed.update_from(["test"]);
        assert_eq!(parsed.version, vec![version("moq-lite-02")]);
    }

    #[test]
    fn test_cli_version() {
        let parsed = ClientConfig::parse_from(["test", "--client-version", "moq-lite-03"]);
        assert_eq!(parsed.version, vec![version("moq-lite-03")]);
    }

    #[test]
    fn test_cli_no_version_defaults_to_all() {
        let parsed = ClientConfig::parse_from(["test"]);
        assert!(parsed.version.is_empty());
        // With no explicit versions, one ALPN per entry of `moq_net::ALPNS` is expected.
        assert_eq!(parsed.versions().alpns().len(), moq_net::ALPNS.len());
    }

    #[cfg(feature = "websocket")]
    #[tokio::test]
    async fn race_transport_connect_stops_on_quic_auth_error() {
        let quic = async { Err::<usize, _>(crate::ConnectError::Unauthorized.into()) };
        // WebSocket would succeed after yielding once; the auth error must win regardless.
        let websocket = async {
            tokio::task::yield_now().await;
            Some(Ok(1usize))
        };

        let failure = race_transport_connect(quic, websocket).await.unwrap_err();
        assert_eq!(failure.connect_error(), Some(crate::ConnectError::Unauthorized));
    }

    #[cfg(feature = "websocket")]
    #[tokio::test]
    async fn race_transport_connect_keeps_websocket_after_quic_non_auth_error() {
        let quic = async { Err::<usize, _>(Error::ConnectFailed) };
        let websocket = async { Some(Ok(7usize)) };

        let winner = race_transport_connect(quic, websocket).await.unwrap();
        assert_eq!(winner, TransportRace::WebSocket(7));
    }

    #[cfg(feature = "websocket")]
    #[tokio::test]
    async fn race_transport_connect_returns_when_quic_transport_connects() {
        let quic = async { Ok("quic") };
        // A WebSocket attempt that never resolves must not hold up the QUIC result.
        let websocket = std::future::pending::<Option<crate::Result<&str>>>();

        let race = race_transport_connect(quic, websocket);
        let winner = tokio::time::timeout(std::time::Duration::from_secs(1), race)
            .await
            .expect("race waited for WebSocket after QUIC transport connected")
            .unwrap();
        assert_eq!(winner, TransportRace::Quic("quic"));
    }
}