use crate::crypto;
use crate::{Backoff, QuicBackend, Reconnect};
use anyhow::Context;
use std::path::PathBuf;
use std::{net, sync::Arc};
use url::Url;
/// TLS settings used by the client for outgoing connections.
///
/// Every field can be set from a CLI flag or environment variable (via `clap`)
/// or from a config file (via `serde`); unknown config keys are rejected.
#[derive(Clone, Default, Debug, clap::Args, serde::Serialize, serde::Deserialize)]
#[serde(default, deny_unknown_fields)]
#[non_exhaustive]
pub struct ClientTls {
/// Paths to PEM files containing the root certificates to trust.
///
/// May be given multiple times. When empty, the platform's native root
/// certificates are used instead.
#[serde(skip_serializing_if = "Vec::is_empty")]
#[arg(id = "tls-root", long = "tls-root", env = "MOQ_CLIENT_TLS_ROOT")]
pub root: Vec<PathBuf>,
/// Path to a PEM file holding the client certificate chain and its private key.
///
/// When set, the certificate is presented for TLS client authentication.
#[serde(skip_serializing_if = "Option::is_none")]
#[arg(
id = "client-tls-identity",
long = "client-tls-identity",
env = "MOQ_CLIENT_TLS_IDENTITY"
)]
pub identity: Option<PathBuf>,
/// Disable verification of the server's certificate.
///
/// This makes man-in-the-middle attacks possible. Unset is treated as `false`.
/// On the command line a bare `--tls-disable-verify` means `true`; an explicit
/// value must use the `--tls-disable-verify=<bool>` form.
#[serde(skip_serializing_if = "Option::is_none")]
#[arg(
id = "tls-disable-verify",
long = "tls-disable-verify",
env = "MOQ_CLIENT_TLS_DISABLE_VERIFY",
default_missing_value = "true",
num_args = 0..=1,
require_equals = true,
value_parser = clap::value_parser!(bool),
)]
pub disable_verify: Option<bool>,
}
/// Configuration for a [`Client`].
///
/// Can be parsed from CLI flags / environment variables (via `clap`) or
/// deserialized from a config file (via `serde`); missing keys fall back to
/// [`ClientConfig::default`] and unknown keys are rejected.
#[derive(Clone, Debug, clap::Parser, serde::Serialize, serde::Deserialize)]
#[serde(deny_unknown_fields, default)]
#[non_exhaustive]
pub struct ClientConfig {
/// Local socket address to bind to. Defaults to `[::]:0`.
#[arg(
id = "client-bind",
long = "client-bind",
default_value = "[::]:0",
env = "MOQ_CLIENT_BIND"
)]
pub bind: net::SocketAddr,
/// QUIC backend to use.
///
/// When unset, `Client::new` picks the first compiled-in backend in the
/// order quinn, noq, quiche.
#[arg(id = "client-backend", long = "client-backend", env = "MOQ_CLIENT_BACKEND")]
pub backend: Option<QuicBackend>,
/// Stream limit handed to the backend.
///
/// NOTE(review): not read in this file; presumably the maximum number of
/// concurrent streams per connection — confirm against the backend code.
#[serde(skip_serializing_if = "Option::is_none")]
#[arg(
id = "client-max-streams",
long = "client-max-streams",
env = "MOQ_CLIENT_MAX_STREAMS"
)]
pub max_streams: Option<u64>,
/// Protocol versions to offer. When empty, all supported versions are offered
/// (see [`ClientConfig::versions`]).
#[serde(default, skip_serializing_if = "Vec::is_empty")]
#[arg(id = "client-version", long = "client-version", env = "MOQ_CLIENT_VERSION")]
pub version: Vec<moq_lite::Version>,
/// TLS settings (roots, client identity, verification).
#[command(flatten)]
#[serde(default)]
pub tls: ClientTls,
/// Backoff settings handed to [`Reconnect`] by `Client::reconnect`.
#[command(flatten)]
#[serde(default)]
pub backoff: Backoff,
/// WebSocket transport settings; only present with the `websocket` feature.
#[cfg(feature = "websocket")]
#[command(flatten)]
#[serde(default)]
pub websocket: super::ClientWebSocket,
}
impl ClientTls {
pub fn build(&self) -> anyhow::Result<rustls::ClientConfig> {
use rustls::pki_types::CertificateDer;
let provider = crypto::provider();
let mut roots = rustls::RootCertStore::empty();
if self.root.is_empty() {
let native = rustls_native_certs::load_native_certs();
for err in native.errors {
tracing::warn!(%err, "failed to load root cert");
}
for cert in native.certs {
roots.add(cert).context("failed to add root cert")?;
}
} else {
for root in &self.root {
let file = std::fs::File::open(root).context("failed to open root cert file")?;
let mut reader = std::io::BufReader::new(file);
let cert = rustls_pemfile::certs(&mut reader)
.next()
.context("no roots found")?
.context("failed to read root cert")?;
roots.add(cert).context("failed to add root cert")?;
}
}
let builder = rustls::ClientConfig::builder_with_provider(provider.clone())
.with_protocol_versions(&[&rustls::version::TLS13, &rustls::version::TLS12])?
.with_root_certificates(roots);
let mut tls = match &self.identity {
Some(path) => {
let pem = std::fs::read(path).context("failed to read client identity")?;
let chain: Vec<CertificateDer<'static>> = rustls_pemfile::certs(&mut pem.as_slice())
.collect::<Result<_, _>>()
.context("failed to parse client identity certs")?;
anyhow::ensure!(!chain.is_empty(), "no certificates found in client identity");
let key = rustls_pemfile::private_key(&mut pem.as_slice())
.context("failed to parse client identity key")?
.context("no private key found in client identity")?;
builder
.with_client_auth_cert(chain, key)
.context("failed to configure client certificate")?
}
None => builder.with_no_client_auth(),
};
if self.disable_verify.unwrap_or_default() {
tracing::warn!("TLS server certificate verification is disabled; A man-in-the-middle attack is possible.");
let noop = NoCertificateVerification(provider);
tls.dangerous().set_certificate_verifier(Arc::new(noop));
}
Ok(tls)
}
pub fn identity_dns_name(&self) -> anyhow::Result<Option<String>> {
use rustls::pki_types::CertificateDer;
let Some(path) = self.identity.as_ref() else {
return Ok(None);
};
let pem = std::fs::read(path).context("failed to read client identity")?;
let certs: Vec<CertificateDer<'static>> = rustls_pemfile::certs(&mut pem.as_slice())
.collect::<Result<_, _>>()
.context("failed to parse client identity certs")?;
let leaf = certs.first().context("no certificates found")?;
let (_, cert) =
x509_parser::parse_x509_certificate(leaf.as_ref()).context("failed to parse identity certificate")?;
let san = cert
.subject_alternative_name()
.context("failed to read subject alternative name extension")?
.and_then(|san| {
san.value.general_names.iter().find_map(|name| match name {
x509_parser::extensions::GeneralName::DNSName(n) => Some((*n).to_string()),
_ => None,
})
});
Ok(san)
}
}
impl ClientConfig {
    /// Consume this configuration and construct a [`Client`] from it.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `Client::new`.
    pub fn init(self) -> anyhow::Result<Client> {
        Client::new(self)
    }

    /// The set of protocol versions to offer.
    ///
    /// An empty `version` list means "no preference" and yields every supported
    /// version; otherwise exactly the configured versions are returned.
    pub fn versions(&self) -> moq_lite::Versions {
        match self.version.as_slice() {
            [] => moq_lite::Versions::all(),
            explicit => moq_lite::Versions::from(explicit.to_vec()),
        }
    }
}
impl Default for ClientConfig {
fn default() -> Self {
Self {
bind: "[::]:0".parse().unwrap(),
backend: None,
max_streams: None,
version: Vec::new(),
tls: ClientTls::default(),
backoff: Backoff::default(),
#[cfg(feature = "websocket")]
websocket: super::ClientWebSocket::default(),
}
}
}
/// A client that establishes `moq_lite` sessions over whichever transports
/// were compiled in (QUIC backends, WebSocket, iroh).
///
/// Cloning is used by `reconnect`, which hands a copy of the client to
/// [`Reconnect`].
#[derive(Clone)]
pub struct Client {
// Underlying protocol client, configured with `versions` and any
// publish/consume origins set through the `with_*` builders.
moq: moq_lite::Client,
// Versions offered; also the source of the ALPN list for WebSocket.
versions: moq_lite::Versions,
// Backoff settings passed to `Reconnect::new`.
backoff: Backoff,
// WebSocket transport settings taken from the config.
#[cfg(feature = "websocket")]
websocket: super::ClientWebSocket,
// TLS configuration built once from `ClientConfig::tls`.
tls: rustls::ClientConfig,
// At most one of the QUIC backends below is `Some`: the one selected in `new`.
#[cfg(feature = "noq")]
noq: Option<crate::noq::NoqClient>,
#[cfg(feature = "quinn")]
quinn: Option<crate::quinn::QuinnClient>,
#[cfg(feature = "quiche")]
quiche: Option<crate::quiche::QuicheClient>,
// iroh endpoint used for `iroh://` URLs; `None` until set via `with_iroh`.
#[cfg(feature = "iroh")]
iroh: Option<web_transport_iroh::iroh::Endpoint>,
// Socket addresses passed along when connecting over iroh; empty by default.
#[cfg(feature = "iroh")]
iroh_addrs: Vec<std::net::SocketAddr>,
}
impl Client {
    /// Fallback constructor used when no QUIC or WebSocket backend is compiled in.
    ///
    /// # Errors
    ///
    /// Always fails, naming the features that must be enabled.
    #[cfg(not(any(feature = "noq", feature = "quinn", feature = "quiche", feature = "websocket")))]
    pub fn new(_config: ClientConfig) -> anyhow::Result<Self> {
        anyhow::bail!("no QUIC or WebSocket backend compiled; enable noq, quinn, quiche, or websocket feature");
    }

    /// Build a client from `config`.
    ///
    /// The QUIC backend is `config.backend` when set; otherwise the first
    /// compiled-in backend in the order quinn, noq, quiche. Only the selected
    /// backend is initialized; the others are left as `None`.
    ///
    /// # Errors
    ///
    /// Fails if the TLS configuration cannot be built or the selected backend
    /// fails to initialize.
    #[cfg(any(feature = "noq", feature = "quinn", feature = "quiche", feature = "websocket"))]
    pub fn new(config: ClientConfig) -> anyhow::Result<Self> {
        // Exactly one of the cfg-gated blocks below survives compilation and
        // becomes the default backend.
        #[cfg(any(feature = "noq", feature = "quinn", feature = "quiche"))]
        let backend = config.backend.clone().unwrap_or({
            #[cfg(feature = "quinn")]
            {
                QuicBackend::Quinn
            }
            #[cfg(all(feature = "noq", not(feature = "quinn")))]
            {
                QuicBackend::Noq
            }
            #[cfg(all(feature = "quiche", not(feature = "quinn"), not(feature = "noq")))]
            {
                QuicBackend::Quiche
            }
            #[cfg(all(not(feature = "quiche"), not(feature = "quinn"), not(feature = "noq")))]
            panic!("no QUIC backend compiled; enable noq, quinn, or quiche feature");
        });

        let tls = config.tls.build()?;

        // The `_` arm is unreachable when only one backend is compiled in, hence
        // the lint allowance on each of the three matches below.
        #[cfg(feature = "noq")]
        #[allow(unreachable_patterns)]
        let noq = match backend {
            QuicBackend::Noq => Some(crate::noq::NoqClient::new(&config)?),
            _ => None,
        };

        #[cfg(feature = "quinn")]
        #[allow(unreachable_patterns)]
        let quinn = match backend {
            QuicBackend::Quinn => Some(crate::quinn::QuinnClient::new(&config)?),
            _ => None,
        };

        #[cfg(feature = "quiche")]
        #[allow(unreachable_patterns)]
        let quiche = match backend {
            QuicBackend::Quiche => Some(crate::quiche::QuicheClient::new(&config)?),
            _ => None,
        };

        let versions = config.versions();

        Ok(Self {
            moq: moq_lite::Client::new().with_versions(versions.clone()),
            versions,
            backoff: config.backoff,
            #[cfg(feature = "websocket")]
            websocket: config.websocket,
            tls,
            #[cfg(feature = "noq")]
            noq,
            #[cfg(feature = "quinn")]
            quinn,
            #[cfg(feature = "quiche")]
            quiche,
            #[cfg(feature = "iroh")]
            iroh: None,
            #[cfg(feature = "iroh")]
            iroh_addrs: Vec::new(),
        })
    }

    /// Set (or clear) the iroh endpoint used for `iroh://` URLs.
    #[cfg(feature = "iroh")]
    pub fn with_iroh(mut self, iroh: Option<web_transport_iroh::iroh::Endpoint>) -> Self {
        self.iroh = iroh;
        self
    }

    /// Set the socket addresses passed along when connecting over iroh.
    #[cfg(feature = "iroh")]
    pub fn with_iroh_addrs(mut self, addrs: Vec<std::net::SocketAddr>) -> Self {
        self.iroh_addrs = addrs;
        self
    }

    /// Forward `publish` to the underlying `moq_lite::Client::with_publish`.
    pub fn with_publish(mut self, publish: impl Into<Option<moq_lite::OriginConsumer>>) -> Self {
        self.moq = self.moq.with_publish(publish);
        self
    }

    /// Forward `consume` to the underlying `moq_lite::Client::with_consume`.
    pub fn with_consume(mut self, consume: impl Into<Option<moq_lite::OriginProducer>>) -> Self {
        self.moq = self.moq.with_consume(consume);
        self
    }

    /// Create a [`Reconnect`] for `url` from a clone of this client and its
    /// configured backoff.
    pub fn reconnect(&self, url: Url) -> Reconnect {
        Reconnect::new(self.clone(), url, self.backoff.clone())
    }

    /// Fallback used when no transport at all is compiled in.
    ///
    /// # Errors
    ///
    /// Always fails, naming the features that must be enabled.
    #[cfg(not(any(
        feature = "noq",
        feature = "quinn",
        feature = "quiche",
        feature = "iroh",
        feature = "websocket"
    )))]
    pub async fn connect(&self, _url: Url) -> anyhow::Result<moq_lite::Session> {
        anyhow::bail!("no backend compiled; enable noq, quinn, quiche, iroh, or websocket feature");
    }

    /// Connect to `url` and establish a session, logging the session version
    /// on success.
    ///
    /// # Errors
    ///
    /// Fails if no transport manages to connect or the session setup fails.
    #[cfg(any(
        feature = "noq",
        feature = "quinn",
        feature = "quiche",
        feature = "iroh",
        feature = "websocket"
    ))]
    pub async fn connect(&self, url: Url) -> anyhow::Result<moq_lite::Session> {
        let session = self.connect_inner(url).await?;
        tracing::info!(version = %session.version(), "connected");
        Ok(session)
    }

    /// Pick a transport for `url` and establish the session over it.
    ///
    /// Order of precedence:
    /// 1. `iroh://` URLs use the iroh endpoint (when the `iroh` feature is on).
    /// 2. The QUIC backend selected in `new`. With the `websocket` feature it is
    ///    run concurrently with a WebSocket attempt and the first transport to
    ///    connect successfully is used.
    /// 3. WebSocket alone, when no QUIC backend is configured.
    #[cfg(any(
        feature = "noq",
        feature = "quinn",
        feature = "quiche",
        feature = "iroh",
        feature = "websocket"
    ))]
    async fn connect_inner(&self, url: Url) -> anyhow::Result<moq_lite::Session> {
        #[cfg(feature = "iroh")]
        if url.scheme() == "iroh" {
            let endpoint = self.iroh.as_ref().context("Iroh support is not enabled")?;
            let session = crate::iroh::connect(endpoint, url, self.iroh_addrs.iter().copied()).await?;
            let session = self.moq.connect(session).await?;
            return Ok(session);
        }

        #[cfg(feature = "noq")]
        if let Some(noq) = self.noq.as_ref() {
            let quic_url = url.clone();
            // Borrow the shared TLS config; cloning it per attempt is unnecessary.
            let quic_handle = async {
                let res = noq.connect(&self.tls, quic_url).await;
                if let Err(err) = &res {
                    tracing::warn!(%err, "QUIC connection failed");
                }
                res
            };

            #[cfg(feature = "websocket")]
            {
                let alpns = self.versions.alpns();
                let ws_handle = crate::websocket::race_handle(&self.websocket, &self.tls, url, &alpns);
                // A branch whose result does not match its pattern is disabled;
                // `else` runs only once both transports have failed.
                return Ok(tokio::select! {
                    Ok(quic) = quic_handle => self.moq.connect(quic).await?,
                    Some(Ok(ws)) = ws_handle => self.moq.connect(ws).await?,
                    else => anyhow::bail!("failed to connect to server"),
                });
            }

            #[cfg(not(feature = "websocket"))]
            {
                let session = quic_handle.await?;
                return Ok(self.moq.connect(session).await?);
            }
        }

        #[cfg(feature = "quinn")]
        if let Some(quinn) = self.quinn.as_ref() {
            let quic_url = url.clone();
            // Borrow the shared TLS config; cloning it per attempt is unnecessary.
            let quic_handle = async {
                let res = quinn.connect(&self.tls, quic_url).await;
                if let Err(err) = &res {
                    tracing::warn!(%err, "QUIC connection failed");
                }
                res
            };

            #[cfg(feature = "websocket")]
            {
                let alpns = self.versions.alpns();
                let ws_handle = crate::websocket::race_handle(&self.websocket, &self.tls, url, &alpns);
                return Ok(tokio::select! {
                    Ok(quic) = quic_handle => self.moq.connect(quic).await?,
                    Some(Ok(ws)) = ws_handle => self.moq.connect(ws).await?,
                    else => anyhow::bail!("failed to connect to server"),
                });
            }

            #[cfg(not(feature = "websocket"))]
            {
                let session = quic_handle.await?;
                return Ok(self.moq.connect(session).await?);
            }
        }

        #[cfg(feature = "quiche")]
        if let Some(quiche) = self.quiche.as_ref() {
            let quic_url = url.clone();
            let quic_handle = async {
                let res = quiche.connect(quic_url).await;
                if let Err(err) = &res {
                    tracing::warn!(%err, "QUIC connection failed");
                }
                res
            };

            #[cfg(feature = "websocket")]
            {
                let alpns = self.versions.alpns();
                let ws_handle = crate::websocket::race_handle(&self.websocket, &self.tls, url, &alpns);
                return Ok(tokio::select! {
                    Ok(quic) = quic_handle => self.moq.connect(quic).await?,
                    Some(Ok(ws)) = ws_handle => self.moq.connect(ws).await?,
                    else => anyhow::bail!("failed to connect to server"),
                });
            }

            #[cfg(not(feature = "websocket"))]
            {
                let session = quic_handle.await?;
                return Ok(self.moq.connect(session).await?);
            }
        }

        // No QUIC backend is configured: WebSocket is the only remaining option.
        #[cfg(feature = "websocket")]
        {
            let alpns = self.versions.alpns();
            let session = crate::websocket::connect(&self.websocket, &self.tls, url, &alpns).await?;
            return Ok(self.moq.connect(session).await?);
        }

        #[cfg(not(feature = "websocket"))]
        anyhow::bail!("no QUIC backend matched; this should not happen");
    }
}
use rustls::pki_types::{CertificateDer, ServerName, UnixTime};
/// Server certificate verifier that accepts any certificate chain.
///
/// Installed by `ClientTls::build` when `disable_verify` is set. Handshake
/// signatures are still checked, using the signature verification algorithms
/// of the wrapped crypto provider.
#[derive(Debug)]
struct NoCertificateVerification(crypto::Provider);
impl rustls::client::danger::ServerCertVerifier for NoCertificateVerification {
    /// Accept the presented certificate chain unconditionally; every argument
    /// is ignored.
    fn verify_server_cert(
        &self,
        _end_entity: &CertificateDer<'_>,
        _intermediates: &[CertificateDer<'_>],
        _server_name: &ServerName<'_>,
        _ocsp: &[u8],
        _now: UnixTime,
    ) -> Result<rustls::client::danger::ServerCertVerified, rustls::Error> {
        let verified = rustls::client::danger::ServerCertVerified::assertion();
        Ok(verified)
    }

    /// Check a TLS 1.2 handshake signature with the provider's algorithms.
    fn verify_tls12_signature(
        &self,
        message: &[u8],
        cert: &CertificateDer<'_>,
        dss: &rustls::DigitallySignedStruct,
    ) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
        let algorithms = &self.0.signature_verification_algorithms;
        rustls::crypto::verify_tls12_signature(message, cert, dss, algorithms)
    }

    /// Check a TLS 1.3 handshake signature with the provider's algorithms.
    fn verify_tls13_signature(
        &self,
        message: &[u8],
        cert: &CertificateDer<'_>,
        dss: &rustls::DigitallySignedStruct,
    ) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
        let algorithms = &self.0.signature_verification_algorithms;
        rustls::crypto::verify_tls13_signature(message, cert, dss, algorithms)
    }

    /// Signature schemes supported by the provider's verification algorithms.
    fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
        let algorithms = &self.0.signature_verification_algorithms;
        algorithms.supported_schemes()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use clap::Parser;

    /// Parse a version name, panicking if it is not recognized.
    fn version(name: &str) -> moq_lite::Version {
        name.parse::<moq_lite::Version>().unwrap()
    }

    /// A value loaded from TOML must not be reset by a CLI update that does not
    /// mention the flag.
    #[test]
    fn test_toml_disable_verify_survives_update_from() {
        let source = r#"
tls.disable_verify = true
"#;
        let mut loaded: ClientConfig = toml::from_str(source).unwrap();
        assert_eq!(loaded.tls.disable_verify, Some(true));

        loaded.update_from(["test"]);
        assert_eq!(loaded.tls.disable_verify, Some(true));
    }

    /// A bare flag uses the `default_missing_value` of `true`.
    #[test]
    fn test_cli_disable_verify_flag() {
        let parsed = ClientConfig::parse_from(["test", "--tls-disable-verify"]);
        assert_eq!(parsed.tls.disable_verify, Some(true));
    }

    /// An explicit `=false` is preserved as `Some(false)`.
    #[test]
    fn test_cli_disable_verify_explicit_false() {
        let parsed = ClientConfig::parse_from(["test", "--tls-disable-verify=false"]);
        assert_eq!(parsed.tls.disable_verify, Some(false));
    }

    /// An explicit `=true` is parsed as `Some(true)`.
    #[test]
    fn test_cli_disable_verify_explicit_true() {
        let parsed = ClientConfig::parse_from(["test", "--tls-disable-verify=true"]);
        assert_eq!(parsed.tls.disable_verify, Some(true));
    }

    /// Without the flag the option stays unset.
    #[test]
    fn test_cli_no_disable_verify() {
        let parsed = ClientConfig::parse_from(["test"]);
        assert_eq!(parsed.tls.disable_verify, None);
    }

    /// A version list loaded from TOML must not be reset by a CLI update that
    /// does not mention the flag.
    #[test]
    fn test_toml_version_survives_update_from() {
        let source = r#"
version = ["moq-lite-02"]
"#;
        let mut loaded: ClientConfig = toml::from_str(source).unwrap();
        assert_eq!(loaded.version, vec![version("moq-lite-02")]);

        loaded.update_from(["test"]);
        assert_eq!(loaded.version, vec![version("moq-lite-02")]);
    }

    /// `--client-version` populates the version list.
    #[test]
    fn test_cli_version() {
        let parsed = ClientConfig::parse_from(["test", "--client-version", "moq-lite-03"]);
        assert_eq!(parsed.version, vec![version("moq-lite-03")]);
    }

    /// With no versions configured, every supported ALPN is offered.
    #[test]
    fn test_cli_no_version_defaults_to_all() {
        let parsed = ClientConfig::parse_from(["test"]);
        assert!(parsed.version.is_empty());

        let offered = parsed.versions().alpns().len();
        assert_eq!(offered, moq_lite::ALPNS.len());
    }
}