use crate::{Backoff, Error, QuicBackend, Reconnect};
use std::net;
use url::Url;
#[derive(Clone, Debug, clap::Parser, serde::Serialize, serde::Deserialize)]
#[serde(deny_unknown_fields, default)]
#[non_exhaustive]
pub struct ClientConfig {
#[arg(
id = "client-bind",
long = "client-bind",
default_value = "[::]:0",
env = "MOQ_CLIENT_BIND"
)]
pub bind: net::SocketAddr,
#[arg(id = "client-backend", long = "client-backend", env = "MOQ_CLIENT_BACKEND")]
pub backend: Option<QuicBackend>,
#[serde(skip_serializing_if = "Option::is_none")]
#[arg(
id = "client-max-streams",
long = "client-max-streams",
env = "MOQ_CLIENT_MAX_STREAMS"
)]
pub max_streams: Option<u64>,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
#[arg(id = "client-version", long = "client-version", env = "MOQ_CLIENT_VERSION")]
pub version: Vec<moq_net::Version>,
#[command(flatten)]
#[serde(default)]
pub tls: crate::tls::Client,
#[command(flatten)]
#[serde(default)]
pub backoff: Backoff,
#[cfg(feature = "websocket")]
#[command(flatten)]
#[serde(default)]
pub websocket: crate::websocket::Client,
}
impl ClientConfig {
pub fn init(self) -> crate::Result<Client> {
Client::new(self)
}
pub fn versions(&self) -> moq_net::Versions {
if self.version.is_empty() {
moq_net::Versions::all()
} else {
moq_net::Versions::from(self.version.clone())
}
}
}
impl Default for ClientConfig {
fn default() -> Self {
Self {
bind: "[::]:0".parse().unwrap(),
backend: None,
max_streams: None,
version: Vec::new(),
tls: crate::tls::Client::default(),
backoff: Backoff::default(),
#[cfg(feature = "websocket")]
websocket: crate::websocket::Client::default(),
}
}
}
#[derive(Clone)]
pub struct Client {
moq: moq_net::Client,
versions: moq_net::Versions,
backoff: Backoff,
#[cfg(feature = "websocket")]
websocket: crate::websocket::Client,
tls: rustls::ClientConfig,
#[cfg(feature = "noq")]
noq: Option<crate::noq::NoqClient>,
#[cfg(feature = "quinn")]
quinn: Option<crate::quinn::QuinnClient>,
#[cfg(feature = "quiche")]
quiche: Option<crate::quiche::QuicheClient>,
#[cfg(feature = "iroh")]
iroh: Option<web_transport_iroh::iroh::Endpoint>,
#[cfg(feature = "iroh")]
iroh_addrs: Vec<std::net::SocketAddr>,
}
impl Client {
#[cfg(not(any(feature = "noq", feature = "quinn", feature = "quiche", feature = "websocket")))]
pub fn new(_config: ClientConfig) -> crate::Result<Self> {
Err(Error::NoBackend(
"no QUIC or WebSocket backend compiled; enable noq, quinn, quiche, or websocket feature",
))
}
#[cfg(any(feature = "noq", feature = "quinn", feature = "quiche", feature = "websocket"))]
pub fn new(config: ClientConfig) -> crate::Result<Self> {
#[cfg(any(feature = "noq", feature = "quinn", feature = "quiche"))]
let backend = config.backend.clone().unwrap_or({
#[cfg(feature = "quinn")]
{
QuicBackend::Quinn
}
#[cfg(all(feature = "noq", not(feature = "quinn")))]
{
QuicBackend::Noq
}
#[cfg(all(feature = "quiche", not(feature = "quinn"), not(feature = "noq")))]
{
QuicBackend::Quiche
}
#[cfg(all(not(feature = "quiche"), not(feature = "quinn"), not(feature = "noq")))]
panic!("no QUIC backend compiled; enable noq, quinn, or quiche feature");
});
let tls = config.tls.build()?;
#[cfg(feature = "noq")]
#[allow(unreachable_patterns)]
let noq = match backend {
QuicBackend::Noq => Some(crate::noq::NoqClient::new(&config)?),
_ => None,
};
#[cfg(feature = "quinn")]
#[allow(unreachable_patterns)]
let quinn = match backend {
QuicBackend::Quinn => Some(crate::quinn::QuinnClient::new(&config)?),
_ => None,
};
#[cfg(feature = "quiche")]
let quiche = match backend {
QuicBackend::Quiche => Some(crate::quiche::QuicheClient::new(&config)?),
_ => None,
};
let versions = config.versions();
Ok(Self {
moq: moq_net::Client::new().with_versions(versions.clone()),
versions,
backoff: config.backoff,
#[cfg(feature = "websocket")]
websocket: config.websocket,
tls,
#[cfg(feature = "noq")]
noq,
#[cfg(feature = "quinn")]
quinn,
#[cfg(feature = "quiche")]
quiche,
#[cfg(feature = "iroh")]
iroh: None,
#[cfg(feature = "iroh")]
iroh_addrs: Vec::new(),
})
}
#[cfg(feature = "iroh")]
pub fn with_iroh(mut self, iroh: Option<web_transport_iroh::iroh::Endpoint>) -> Self {
self.iroh = iroh;
self
}
#[cfg(feature = "iroh")]
pub fn with_iroh_addrs(mut self, addrs: Vec<std::net::SocketAddr>) -> Self {
self.iroh_addrs = addrs;
self
}
pub fn with_publish(mut self, publish: impl Into<Option<moq_net::OriginConsumer>>) -> Self {
self.moq = self.moq.with_publish(publish);
self
}
pub fn with_consume(mut self, consume: impl Into<Option<moq_net::OriginProducer>>) -> Self {
self.moq = self.moq.with_consume(consume);
self
}
pub fn with_stats(mut self, stats: moq_net::StatsHandle) -> Self {
self.moq = self.moq.with_stats(stats);
self
}
pub fn reconnect(&self, url: Url) -> Reconnect {
Reconnect::new(self.clone(), url, self.backoff.clone())
}
#[cfg(not(any(
feature = "noq",
feature = "quinn",
feature = "quiche",
feature = "iroh",
feature = "websocket"
)))]
pub async fn connect(&self, _url: Url) -> crate::Result<moq_net::Session> {
Err(Error::NoBackend(
"no backend compiled; enable noq, quinn, quiche, iroh, or websocket feature",
))
}
#[cfg(any(
feature = "noq",
feature = "quinn",
feature = "quiche",
feature = "iroh",
feature = "websocket"
))]
pub async fn connect(&self, url: Url) -> crate::Result<moq_net::Session> {
let session = self.connect_inner(url).await?;
tracing::info!(version = %session.version(), "connected");
Ok(session)
}
#[cfg(any(
feature = "noq",
feature = "quinn",
feature = "quiche",
feature = "iroh",
feature = "websocket"
))]
async fn connect_inner(&self, url: Url) -> crate::Result<moq_net::Session> {
#[cfg(feature = "iroh")]
if url.scheme() == "iroh" {
let endpoint = self.iroh.as_ref().ok_or(Error::IrohDisabled)?;
let session = crate::iroh::connect(endpoint, url, self.iroh_addrs.iter().copied()).await?;
let session = self.moq.connect(session).await?;
return Ok(session);
}
#[cfg(feature = "noq")]
if let Some(noq) = self.noq.as_ref() {
let tls = self.tls.clone();
let quic_url = url.clone();
let quic_handle = async {
let res = noq.connect(&tls, quic_url).await;
if let Err(err) = &res {
tracing::warn!(%err, "QUIC connection failed");
}
res
};
#[cfg(feature = "websocket")]
{
let alpns = self.versions.alpns();
let ws_handle = crate::websocket::race_handle(&self.websocket, &self.tls, url, &alpns);
return Ok(tokio::select! {
Ok(quic) = quic_handle => self.moq.connect(quic).await?,
Some(Ok(ws)) = ws_handle => self.moq.connect(ws).await?,
else => return Err(Error::ConnectFailed),
});
}
#[cfg(not(feature = "websocket"))]
{
let session = quic_handle.await?;
return Ok(self.moq.connect(session).await?);
}
}
#[cfg(feature = "quinn")]
if let Some(quinn) = self.quinn.as_ref() {
let tls = self.tls.clone();
let quic_url = url.clone();
let quic_handle = async {
let res = quinn.connect(&tls, quic_url).await;
if let Err(err) = &res {
tracing::warn!(%err, "QUIC connection failed");
}
res
};
#[cfg(feature = "websocket")]
{
let alpns = self.versions.alpns();
let ws_handle = crate::websocket::race_handle(&self.websocket, &self.tls, url, &alpns);
return Ok(tokio::select! {
Ok(quic) = quic_handle => self.moq.connect(quic).await?,
Some(Ok(ws)) = ws_handle => self.moq.connect(ws).await?,
else => return Err(Error::ConnectFailed),
});
}
#[cfg(not(feature = "websocket"))]
{
let session = quic_handle.await?;
return Ok(self.moq.connect(session).await?);
}
}
#[cfg(feature = "quiche")]
if let Some(quiche) = self.quiche.as_ref() {
let quic_url = url.clone();
let quic_handle = async {
let res = quiche.connect(quic_url).await;
if let Err(err) = &res {
tracing::warn!(%err, "QUIC connection failed");
}
res
};
#[cfg(feature = "websocket")]
{
let alpns = self.versions.alpns();
let ws_handle = crate::websocket::race_handle(&self.websocket, &self.tls, url, &alpns);
return Ok(tokio::select! {
Ok(quic) = quic_handle => self.moq.connect(quic).await?,
Some(Ok(ws)) = ws_handle => self.moq.connect(ws).await?,
else => return Err(Error::ConnectFailed),
});
}
#[cfg(not(feature = "websocket"))]
{
let session = quic_handle.await?;
return Ok(self.moq.connect(session).await?);
}
}
#[cfg(feature = "websocket")]
{
let alpns = self.versions.alpns();
let session = crate::websocket::connect(&self.websocket, &self.tls, url, &alpns).await?;
return Ok(self.moq.connect(session).await?);
}
#[cfg(not(feature = "websocket"))]
return Err(Error::NoBackend("no QUIC backend matched; this should not happen"));
}
}
#[cfg(test)]
mod tests {
use super::*;
use clap::Parser;
#[test]
fn test_toml_disable_verify_survives_update_from() {
let toml = r#"
tls.disable_verify = true
"#;
let mut config: ClientConfig = toml::from_str(toml).unwrap();
assert_eq!(config.tls.disable_verify, Some(true));
config.update_from(["test"]);
assert_eq!(config.tls.disable_verify, Some(true));
}
#[test]
fn test_cli_disable_verify_flag() {
let config = ClientConfig::parse_from(["test", "--tls-disable-verify"]);
assert_eq!(config.tls.disable_verify, Some(true));
}
#[test]
fn test_cli_disable_verify_explicit_false() {
let config = ClientConfig::parse_from(["test", "--tls-disable-verify=false"]);
assert_eq!(config.tls.disable_verify, Some(false));
}
#[test]
fn test_cli_disable_verify_explicit_true() {
let config = ClientConfig::parse_from(["test", "--tls-disable-verify=true"]);
assert_eq!(config.tls.disable_verify, Some(true));
}
#[test]
fn test_cli_no_disable_verify() {
let config = ClientConfig::parse_from(["test"]);
assert_eq!(config.tls.disable_verify, None);
}
#[test]
fn test_toml_version_survives_update_from() {
let toml = r#"
version = ["moq-lite-02"]
"#;
let mut config: ClientConfig = toml::from_str(toml).unwrap();
assert_eq!(config.version, vec!["moq-lite-02".parse::<moq_net::Version>().unwrap()]);
config.update_from(["test"]);
assert_eq!(config.version, vec!["moq-lite-02".parse::<moq_net::Version>().unwrap()]);
}
#[test]
fn test_cli_version() {
let config = ClientConfig::parse_from(["test", "--client-version", "moq-lite-03"]);
assert_eq!(config.version, vec!["moq-lite-03".parse::<moq_net::Version>().unwrap()]);
}
#[test]
fn test_cli_no_version_defaults_to_all() {
let config = ClientConfig::parse_from(["test"]);
assert!(config.version.is_empty());
assert_eq!(config.versions().alpns().len(), moq_net::ALPNS.len());
}
}