use crate::{Backoff, Error, QuicBackend, Reconnect};
#[cfg(feature = "websocket")]
use std::future::Future;
use std::net;
use url::Url;
#[derive(Clone, Debug, clap::Parser, serde::Serialize, serde::Deserialize)]
#[serde(deny_unknown_fields, default)]
#[non_exhaustive]
pub struct ClientConfig {
#[arg(
id = "client-bind",
long = "client-bind",
default_value = "[::]:0",
env = "MOQ_CLIENT_BIND"
)]
pub bind: net::SocketAddr,
#[arg(id = "client-backend", long = "client-backend", env = "MOQ_CLIENT_BACKEND")]
pub backend: Option<QuicBackend>,
#[serde(skip_serializing_if = "Option::is_none")]
#[arg(
id = "client-max-streams",
long = "client-max-streams",
env = "MOQ_CLIENT_MAX_STREAMS"
)]
pub max_streams: Option<u64>,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
#[arg(id = "client-version", long = "client-version", env = "MOQ_CLIENT_VERSION")]
pub version: Vec<moq_net::Version>,
#[command(flatten)]
#[serde(default)]
pub tls: crate::tls::Client,
#[command(flatten)]
#[serde(default)]
pub backoff: Backoff,
#[cfg(feature = "websocket")]
#[command(flatten)]
#[serde(default)]
pub websocket: crate::websocket::Client,
}
impl ClientConfig {
pub fn init(self) -> crate::Result<Client> {
Client::new(self)
}
pub fn versions(&self) -> moq_net::Versions {
if self.version.is_empty() {
moq_net::Versions::all()
} else {
moq_net::Versions::from(self.version.clone())
}
}
}
impl Default for ClientConfig {
fn default() -> Self {
Self {
bind: "[::]:0".parse().unwrap(),
backend: None,
max_streams: None,
version: Vec::new(),
tls: crate::tls::Client::default(),
backoff: Backoff::default(),
#[cfg(feature = "websocket")]
websocket: crate::websocket::Client::default(),
}
}
}
#[derive(Clone)]
pub struct Client {
moq: moq_net::Client,
versions: moq_net::Versions,
backoff: Backoff,
#[cfg(feature = "websocket")]
websocket: crate::websocket::Client,
tls: rustls::ClientConfig,
#[cfg(feature = "noq")]
noq: Option<crate::noq::NoqClient>,
#[cfg(feature = "quinn")]
quinn: Option<crate::quinn::QuinnClient>,
#[cfg(feature = "quiche")]
quiche: Option<crate::quiche::QuicheClient>,
#[cfg(feature = "iroh")]
iroh: Option<web_transport_iroh::iroh::Endpoint>,
#[cfg(feature = "iroh")]
iroh_addrs: Vec<std::net::SocketAddr>,
}
impl Client {
#[cfg(not(any(feature = "noq", feature = "quinn", feature = "quiche", feature = "websocket")))]
pub fn new(_config: ClientConfig) -> crate::Result<Self> {
Err(Error::NoBackend(
"no QUIC or WebSocket backend compiled; enable noq, quinn, quiche, or websocket feature",
))
}
#[cfg(any(feature = "noq", feature = "quinn", feature = "quiche", feature = "websocket"))]
pub fn new(config: ClientConfig) -> crate::Result<Self> {
#[cfg(any(feature = "noq", feature = "quinn", feature = "quiche"))]
let backend = config.backend.clone().unwrap_or({
#[cfg(feature = "quinn")]
{
QuicBackend::Quinn
}
#[cfg(all(feature = "noq", not(feature = "quinn")))]
{
QuicBackend::Noq
}
#[cfg(all(feature = "quiche", not(feature = "quinn"), not(feature = "noq")))]
{
QuicBackend::Quiche
}
#[cfg(all(not(feature = "quiche"), not(feature = "quinn"), not(feature = "noq")))]
panic!("no QUIC backend compiled; enable noq, quinn, or quiche feature");
});
let tls = config.tls.build()?;
#[cfg(feature = "noq")]
#[allow(unreachable_patterns)]
let noq = match backend {
QuicBackend::Noq => Some(crate::noq::NoqClient::new(&config)?),
_ => None,
};
#[cfg(feature = "quinn")]
#[allow(unreachable_patterns)]
let quinn = match backend {
QuicBackend::Quinn => Some(crate::quinn::QuinnClient::new(&config)?),
_ => None,
};
#[cfg(feature = "quiche")]
let quiche = match backend {
QuicBackend::Quiche => Some(crate::quiche::QuicheClient::new(&config)?),
_ => None,
};
let versions = config.versions();
Ok(Self {
moq: moq_net::Client::new().with_versions(versions.clone()),
versions,
backoff: config.backoff,
#[cfg(feature = "websocket")]
websocket: config.websocket,
tls,
#[cfg(feature = "noq")]
noq,
#[cfg(feature = "quinn")]
quinn,
#[cfg(feature = "quiche")]
quiche,
#[cfg(feature = "iroh")]
iroh: None,
#[cfg(feature = "iroh")]
iroh_addrs: Vec::new(),
})
}
#[cfg(feature = "iroh")]
pub fn with_iroh(mut self, iroh: Option<web_transport_iroh::iroh::Endpoint>) -> Self {
self.iroh = iroh;
self
}
#[cfg(feature = "iroh")]
pub fn with_iroh_addrs(mut self, addrs: Vec<std::net::SocketAddr>) -> Self {
self.iroh_addrs = addrs;
self
}
pub fn with_publish(mut self, publish: impl Into<Option<moq_net::OriginConsumer>>) -> Self {
self.moq = self.moq.with_publish(publish);
self
}
pub fn with_consume(mut self, consume: impl Into<Option<moq_net::OriginProducer>>) -> Self {
self.moq = self.moq.with_consume(consume);
self
}
pub fn with_stats(mut self, stats: moq_net::StatsHandle) -> Self {
self.moq = self.moq.with_stats(stats);
self
}
pub fn reconnect(&self, url: Url) -> Reconnect {
Reconnect::new(self.clone(), url, self.backoff.clone())
}
#[cfg(not(any(
feature = "noq",
feature = "quinn",
feature = "quiche",
feature = "iroh",
feature = "websocket"
)))]
pub async fn connect(&self, _url: Url) -> crate::Result<moq_net::Session> {
Err(Error::NoBackend(
"no backend compiled; enable noq, quinn, quiche, iroh, or websocket feature",
))
}
#[cfg(any(
feature = "noq",
feature = "quinn",
feature = "quiche",
feature = "iroh",
feature = "websocket"
))]
pub async fn connect(&self, url: Url) -> crate::Result<moq_net::Session> {
let session = self.connect_inner(url).await?;
tracing::info!(version = %session.version(), "connected");
Ok(session)
}
#[cfg(any(
feature = "noq",
feature = "quinn",
feature = "quiche",
feature = "iroh",
feature = "websocket"
))]
async fn connect_inner(&self, url: Url) -> crate::Result<moq_net::Session> {
#[cfg(feature = "iroh")]
if url.scheme() == "iroh" {
let endpoint = self.iroh.as_ref().ok_or(Error::IrohDisabled)?;
let session = crate::iroh::connect(endpoint, url, self.iroh_addrs.iter().copied()).await?;
let session = self.moq.connect(session).await?;
return Ok(session);
}
#[cfg(feature = "noq")]
if let Some(noq) = self.noq.as_ref() {
let tls = self.tls.clone();
let quic_url = url.clone();
let quic_handle = async { noq.connect(&tls, quic_url).await.map_err(Error::from) };
#[cfg(feature = "websocket")]
{
return self.race_moq_connect(url, quic_handle).await;
}
#[cfg(not(feature = "websocket"))]
{
let session = quic_handle.await?;
return Ok(self.moq.connect(session).await?);
}
}
#[cfg(feature = "quinn")]
if let Some(quinn) = self.quinn.as_ref() {
let tls = self.tls.clone();
let quic_url = url.clone();
let quic_handle = async { quinn.connect(&tls, quic_url).await.map_err(Error::from) };
#[cfg(feature = "websocket")]
{
return self.race_moq_connect(url, quic_handle).await;
}
#[cfg(not(feature = "websocket"))]
{
let session = quic_handle.await?;
return Ok(self.moq.connect(session).await?);
}
}
#[cfg(feature = "quiche")]
if let Some(quiche) = self.quiche.as_ref() {
let quic_url = url.clone();
let quic_handle = async { quiche.connect(quic_url).await.map_err(Error::from) };
#[cfg(feature = "websocket")]
{
return self.race_moq_connect(url, quic_handle).await;
}
#[cfg(not(feature = "websocket"))]
{
let session = quic_handle.await?;
return Ok(self.moq.connect(session).await?);
}
}
#[cfg(feature = "websocket")]
{
let alpns = self.versions.alpns();
let session = crate::websocket::connect(&self.websocket, &self.tls, url, &alpns).await?;
return Ok(self.moq.connect(session).await?);
}
#[cfg(not(feature = "websocket"))]
return Err(Error::NoBackend("no QUIC backend matched; this should not happen"));
}
#[cfg(feature = "websocket")]
async fn race_moq_connect<Q, S>(&self, url: Url, quic: Q) -> crate::Result<moq_net::Session>
where
Q: Future<Output = crate::Result<S>>,
S: web_transport_trait::Session,
{
let alpns = self.versions.alpns();
let ws_config = self.websocket.clone();
let ws_tls = self.tls.clone();
let websocket = async move {
crate::websocket::race_handle(&ws_config, &ws_tls, url, &alpns)
.await
.map(|res| res.map_err(Error::from))
};
match race_transport_connect(quic, websocket).await? {
TransportRace::Quic(quic) => Ok(self.moq.connect(quic).await?),
TransportRace::WebSocket(websocket) => Ok(self.moq.connect(websocket).await?),
}
}
}
#[cfg(feature = "websocket")]
#[derive(Debug, PartialEq, Eq)]
enum TransportRace<Q, W> {
Quic(Q),
WebSocket(W),
}
#[cfg(feature = "websocket")]
async fn race_transport_connect<Q, W, QT, WT>(quic: Q, websocket: W) -> crate::Result<TransportRace<QT, WT>>
where
Q: Future<Output = crate::Result<QT>>,
W: Future<Output = Option<crate::Result<WT>>>,
{
tokio::pin!(quic);
tokio::pin!(websocket);
let mut quic_err = None;
let mut websocket_err = None;
let mut quic_done = false;
let mut websocket_done = false;
loop {
tokio::select! {
res = &mut quic, if !quic_done => {
match res {
Ok(session) => return Ok(TransportRace::Quic(session)),
Err(err) if err.is_auth() => return Err(err),
Err(err) => {
tracing::warn!(%err, "QUIC connection failed");
quic_err = Some(err);
quic_done = true;
}
}
}
res = &mut websocket, if !websocket_done => {
match res {
Some(Ok(session)) => return Ok(TransportRace::WebSocket(session)),
Some(Err(err)) if err.is_auth() => return Err(err),
Some(Err(err)) => {
tracing::warn!(%err, "WebSocket connection failed");
websocket_err = Some(err);
websocket_done = true;
}
None => {
websocket_done = true;
}
}
}
else => break,
}
if quic_done && websocket_done {
break;
}
}
match (quic_err, websocket_err) {
(Some(quic), Some(websocket)) => Err(Error::TransportRace {
quic: std::sync::Arc::new(quic),
websocket: std::sync::Arc::new(websocket),
}),
(Some(err), None) | (None, Some(err)) => Err(err),
(None, None) => Err(Error::ConnectFailed),
}
}
#[cfg(test)]
mod tests {
use super::*;
use clap::Parser;
#[test]
fn test_toml_disable_verify_survives_update_from() {
let toml = r#"
tls.disable_verify = true
"#;
let mut config: ClientConfig = toml::from_str(toml).unwrap();
assert_eq!(config.tls.disable_verify, Some(true));
config.update_from(["test"]);
assert_eq!(config.tls.disable_verify, Some(true));
}
#[test]
fn test_cli_disable_verify_flag() {
let config = ClientConfig::parse_from(["test", "--tls-disable-verify"]);
assert_eq!(config.tls.disable_verify, Some(true));
}
#[test]
fn test_cli_disable_verify_explicit_false() {
let config = ClientConfig::parse_from(["test", "--tls-disable-verify=false"]);
assert_eq!(config.tls.disable_verify, Some(false));
}
#[test]
fn test_cli_disable_verify_explicit_true() {
let config = ClientConfig::parse_from(["test", "--tls-disable-verify=true"]);
assert_eq!(config.tls.disable_verify, Some(true));
}
#[test]
fn test_cli_no_disable_verify() {
let config = ClientConfig::parse_from(["test"]);
assert_eq!(config.tls.disable_verify, None);
}
#[test]
fn test_toml_fingerprint_survives_update_from() {
let toml = r#"
tls.fingerprint = ["abcd1234", "ef567890"]
"#;
let mut config: ClientConfig = toml::from_str(toml).unwrap();
assert_eq!(config.tls.fingerprint, vec!["abcd1234", "ef567890"]);
config.update_from(["test"]);
assert_eq!(config.tls.fingerprint, vec!["abcd1234", "ef567890"]);
}
#[test]
fn test_toml_fingerprint_accepts_single_string() {
let toml = r#"
tls.fingerprint = "abcd1234"
"#;
let config: ClientConfig = toml::from_str(toml).unwrap();
assert_eq!(config.tls.fingerprint, vec!["abcd1234"]);
}
#[test]
fn test_cli_fingerprint() {
let config = ClientConfig::parse_from(["test", "--tls-fingerprint", "abcd1234"]);
assert_eq!(config.tls.fingerprint, vec!["abcd1234"]);
}
#[test]
fn test_toml_version_survives_update_from() {
let toml = r#"
version = ["moq-lite-02"]
"#;
let mut config: ClientConfig = toml::from_str(toml).unwrap();
assert_eq!(config.version, vec!["moq-lite-02".parse::<moq_net::Version>().unwrap()]);
config.update_from(["test"]);
assert_eq!(config.version, vec!["moq-lite-02".parse::<moq_net::Version>().unwrap()]);
}
#[test]
fn test_cli_version() {
let config = ClientConfig::parse_from(["test", "--client-version", "moq-lite-03"]);
assert_eq!(config.version, vec!["moq-lite-03".parse::<moq_net::Version>().unwrap()]);
}
#[test]
fn test_cli_no_version_defaults_to_all() {
let config = ClientConfig::parse_from(["test"]);
assert!(config.version.is_empty());
assert_eq!(config.versions().alpns().len(), moq_net::ALPNS.len());
}
#[cfg(feature = "websocket")]
#[tokio::test]
async fn race_transport_connect_stops_on_quic_auth_error() {
let quic = async { Err::<usize, _>(crate::ConnectError::Unauthorized.into()) };
let websocket = async {
tokio::task::yield_now().await;
Some(Ok(1usize))
};
let err = super::race_transport_connect(quic, websocket).await.unwrap_err();
assert_eq!(err.connect_error(), Some(crate::ConnectError::Unauthorized));
}
#[cfg(feature = "websocket")]
#[tokio::test]
async fn race_transport_connect_keeps_websocket_after_quic_non_auth_error() {
let quic = async { Err::<usize, _>(Error::ConnectFailed) };
let websocket = async { Some(Ok(7usize)) };
let value = super::race_transport_connect(quic, websocket).await.unwrap();
assert_eq!(value, super::TransportRace::WebSocket(7));
}
#[cfg(feature = "websocket")]
#[tokio::test]
async fn race_transport_connect_returns_when_quic_transport_connects() {
let quic = async { Ok("quic") };
let websocket = std::future::pending::<Option<crate::Result<&str>>>();
let value = tokio::time::timeout(
std::time::Duration::from_secs(1),
super::race_transport_connect(quic, websocket),
)
.await
.expect("race waited for WebSocket after QUIC transport connected")
.unwrap();
assert_eq!(value, super::TransportRace::Quic("quic"));
}
}