Skip to main content

moq_native/
websocket.rs

1use anyhow::Context;
2use qmux::tokio_tungstenite;
3use std::collections::HashSet;
4use std::sync::{Arc, LazyLock, Mutex};
5use std::{net, time};
6use url::Url;
7
8// Track servers (hostname:port) where WebSocket won the race, so we won't give QUIC a headstart next time
9static WEBSOCKET_WON: LazyLock<Mutex<HashSet<(String, u16)>>> = LazyLock::new(|| Mutex::new(HashSet::new()));
10
11/// WebSocket configuration for the client.
12#[derive(Clone, Debug, clap::Args, serde::Serialize, serde::Deserialize)]
13#[serde(default, deny_unknown_fields)]
14#[non_exhaustive]
15pub struct ClientWebSocket {
16	/// Whether to enable WebSocket support.
17	#[arg(
18		id = "websocket-enabled",
19		long = "websocket-enabled",
20		env = "MOQ_CLIENT_WEBSOCKET_ENABLED",
21		default_value = "true"
22	)]
23	pub enabled: bool,
24
25	/// Delay in milliseconds before attempting WebSocket fallback (default: 200)
26	/// If WebSocket won the previous race for a given server, this will be 0.
27	#[arg(
28		id = "websocket-delay",
29		long = "websocket-delay",
30		env = "MOQ_CLIENT_WEBSOCKET_DELAY",
31		default_value = "200ms",
32		value_parser = humantime::parse_duration,
33	)]
34	#[serde(with = "humantime_serde")]
35	#[serde(skip_serializing_if = "Option::is_none")]
36	pub delay: Option<time::Duration>,
37}
38
39impl Default for ClientWebSocket {
40	fn default() -> Self {
41		Self {
42			enabled: true,
43			delay: Some(time::Duration::from_millis(200)),
44		}
45	}
46}
47
48pub(crate) async fn race_handle(
49	config: &ClientWebSocket,
50	tls: &rustls::ClientConfig,
51	url: Url,
52	alpns: &[&str],
53) -> Option<anyhow::Result<qmux::Session>> {
54	if !config.enabled {
55		return None;
56	}
57
58	// Only attempt WebSocket for HTTP-based schemes.
59	// Custom protocols (moqt://, moql://) use raw QUIC and don't support WebSocket.
60	match url.scheme() {
61		"http" | "https" | "ws" | "wss" => {}
62		_ => return None,
63	}
64
65	let res = connect(config, tls, url, alpns).await;
66	if let Err(err) = &res {
67		tracing::warn!(%err, "WebSocket connection failed");
68	}
69	Some(res)
70}
71
72pub(crate) async fn connect(
73	config: &ClientWebSocket,
74	tls: &rustls::ClientConfig,
75	mut url: Url,
76	alpns: &[&str],
77) -> anyhow::Result<qmux::Session> {
78	anyhow::ensure!(config.enabled, "WebSocket support is disabled");
79
80	let host = url.host_str().context("missing hostname")?.to_string();
81	let port = url.port().unwrap_or_else(|| match url.scheme() {
82		"https" | "wss" | "moql" | "moqt" => 443,
83		"http" | "ws" => 80,
84		_ => 443,
85	});
86	let key = (host, port);
87
88	// Apply a small penalty to WebSocket to improve odds for QUIC to connect first,
89	// unless we've already had to fall back to WebSockets for this server.
90	// TODO if let chain
91	match config.delay {
92		Some(delay) if !WEBSOCKET_WON.lock().unwrap().contains(&key) => {
93			tokio::time::sleep(delay).await;
94			tracing::debug!(%url, delay_ms = %delay.as_millis(), "QUIC not yet connected, attempting WebSocket fallback");
95		}
96		_ => {}
97	}
98
99	// Convert URL scheme: http:// -> ws://, https:// -> wss://
100	// Custom protocols (moqt://, moql://) use raw QUIC and don't support WebSocket.
101	let needs_tls = match url.scheme() {
102		"http" => {
103			url.set_scheme("ws").expect("failed to set scheme");
104			false
105		}
106		"https" => {
107			url.set_scheme("wss").expect("failed to set scheme");
108			true
109		}
110		"ws" => false,
111		"wss" => true,
112		_ => anyhow::bail!("unsupported URL scheme for WebSocket: {}", url.scheme()),
113	};
114
115	tracing::debug!(%url, "connecting via WebSocket");
116
117	// Use the existing TLS config (which respects tls-disable-verify) for secure connections
118	let connector = if needs_tls {
119		tokio_tungstenite::Connector::Rustls(Arc::new(tls.clone()))
120	} else {
121		tokio_tungstenite::Connector::Plain
122	};
123
124	let session = qmux::Client::new()
125		.with_protocols(alpns)
126		.with_connector(connector)
127		.connect(url.as_str())
128		.await
129		.context("failed to connect WebSocket")?;
130
131	tracing::warn!(%url, "using WebSocket fallback");
132	WEBSOCKET_WON.lock().unwrap().insert(key);
133
134	Ok(session)
135}
136
137/// Listens for incoming WebSocket connections on a TCP port.
138///
139/// Use with [`crate::Server::with_websocket`] to accept WebSocket connections
140/// alongside QUIC connections on a separate port.
141pub struct WebSocketListener {
142	listener: tokio::net::TcpListener,
143	server: qmux::Server,
144}
145
146impl WebSocketListener {
147	pub async fn bind(addr: net::SocketAddr) -> anyhow::Result<Self> {
148		Self::bind_with_alpns(addr, moq_lite::ALPNS).await
149	}
150
151	pub async fn bind_with_alpns(addr: net::SocketAddr, alpns: &[&str]) -> anyhow::Result<Self> {
152		let listener = tokio::net::TcpListener::bind(addr).await?;
153		let server = qmux::Server::new().with_protocols(alpns);
154		Ok(Self { listener, server })
155	}
156
157	pub fn local_addr(&self) -> anyhow::Result<net::SocketAddr> {
158		Ok(self.listener.local_addr()?)
159	}
160
161	pub async fn accept(&self) -> Option<anyhow::Result<qmux::Session>> {
162		match self.listener.accept().await {
163			Ok((stream, addr)) => {
164				tracing::debug!(%addr, "accepted WebSocket TCP connection");
165				let server = self.server.clone();
166				Some(
167					server
168						.accept(stream)
169						.await
170						.map_err(|e| anyhow::anyhow!("WebSocket accept failed: {e}")),
171				)
172			}
173			Err(e) => Some(Err(e.into())),
174		}
175	}
176}