1use anyhow::Context;
2use qmux::tokio_tungstenite;
3use std::collections::HashSet;
4use std::sync::{Arc, LazyLock, Mutex};
5use std::{net, time};
6use url::Url;
7
/// Process-wide set of `(host, port)` pairs for which a WebSocket connection
/// has already succeeded. `connect` consults it to skip the fallback delay.
static WEBSOCKET_WON: LazyLock<Mutex<HashSet<(String, u16)>>> = LazyLock::new(Mutex::default);
10
11#[derive(Clone, Debug, clap::Args, serde::Serialize, serde::Deserialize)]
13#[serde(default, deny_unknown_fields)]
14#[non_exhaustive]
15pub struct ClientWebSocket {
16 #[arg(
18 id = "websocket-enabled",
19 long = "websocket-enabled",
20 env = "MOQ_CLIENT_WEBSOCKET_ENABLED",
21 default_value = "true"
22 )]
23 pub enabled: bool,
24
25 #[arg(
28 id = "websocket-delay",
29 long = "websocket-delay",
30 env = "MOQ_CLIENT_WEBSOCKET_DELAY",
31 default_value = "200ms",
32 value_parser = humantime::parse_duration,
33 )]
34 #[serde(with = "humantime_serde")]
35 #[serde(skip_serializing_if = "Option::is_none")]
36 pub delay: Option<time::Duration>,
37}
38
39impl Default for ClientWebSocket {
40 fn default() -> Self {
41 Self {
42 enabled: true,
43 delay: Some(time::Duration::from_millis(200)),
44 }
45 }
46}
47
48pub(crate) async fn race_handle(
49 config: &ClientWebSocket,
50 tls: &rustls::ClientConfig,
51 url: Url,
52 alpns: &[&str],
53) -> Option<anyhow::Result<qmux::Session>> {
54 if !config.enabled {
55 return None;
56 }
57
58 match url.scheme() {
61 "http" | "https" | "ws" | "wss" => {}
62 _ => return None,
63 }
64
65 let res = connect(config, tls, url, alpns).await;
66 if let Err(err) = &res {
67 tracing::warn!(%err, "WebSocket connection failed");
68 }
69 Some(res)
70}
71
72pub(crate) async fn connect(
73 config: &ClientWebSocket,
74 tls: &rustls::ClientConfig,
75 mut url: Url,
76 alpns: &[&str],
77) -> anyhow::Result<qmux::Session> {
78 anyhow::ensure!(config.enabled, "WebSocket support is disabled");
79
80 let host = url.host_str().context("missing hostname")?.to_string();
81 let port = url.port().unwrap_or_else(|| match url.scheme() {
82 "https" | "wss" | "moql" | "moqt" => 443,
83 "http" | "ws" => 80,
84 _ => 443,
85 });
86 let key = (host, port);
87
88 match config.delay {
92 Some(delay) if !WEBSOCKET_WON.lock().unwrap().contains(&key) => {
93 tokio::time::sleep(delay).await;
94 tracing::debug!(%url, delay_ms = %delay.as_millis(), "QUIC not yet connected, attempting WebSocket fallback");
95 }
96 _ => {}
97 }
98
99 let needs_tls = match url.scheme() {
102 "http" => {
103 url.set_scheme("ws").expect("failed to set scheme");
104 false
105 }
106 "https" => {
107 url.set_scheme("wss").expect("failed to set scheme");
108 true
109 }
110 "ws" => false,
111 "wss" => true,
112 _ => anyhow::bail!("unsupported URL scheme for WebSocket: {}", url.scheme()),
113 };
114
115 tracing::debug!(%url, "connecting via WebSocket");
116
117 let connector = if needs_tls {
119 tokio_tungstenite::Connector::Rustls(Arc::new(tls.clone()))
120 } else {
121 tokio_tungstenite::Connector::Plain
122 };
123
124 let session = qmux::Client::new()
125 .with_protocols(alpns)
126 .with_connector(connector)
127 .connect(url.as_str())
128 .await
129 .context("failed to connect WebSocket")?;
130
131 tracing::warn!(%url, "using WebSocket fallback");
132 WEBSOCKET_WON.lock().unwrap().insert(key);
133
134 Ok(session)
135}
136
/// Listens on a TCP socket and turns each accepted connection into a
/// `qmux::Session` via a `qmux::Server`.
pub struct WebSocketListener {
    /// TCP listener that incoming connections are accepted from.
    listener: tokio::net::TcpListener,
    /// qmux server, configured with the protocol list given at bind time,
    /// that each accepted TCP stream is passed to.
    server: qmux::Server,
}
145
146impl WebSocketListener {
147 pub async fn bind(addr: net::SocketAddr) -> anyhow::Result<Self> {
148 Self::bind_with_alpns(addr, moq_lite::ALPNS).await
149 }
150
151 pub async fn bind_with_alpns(addr: net::SocketAddr, alpns: &[&str]) -> anyhow::Result<Self> {
152 let listener = tokio::net::TcpListener::bind(addr).await?;
153 let server = qmux::Server::new().with_protocols(alpns);
154 Ok(Self { listener, server })
155 }
156
157 pub fn local_addr(&self) -> anyhow::Result<net::SocketAddr> {
158 Ok(self.listener.local_addr()?)
159 }
160
161 pub async fn accept(&self) -> Option<anyhow::Result<qmux::Session>> {
162 match self.listener.accept().await {
163 Ok((stream, addr)) => {
164 tracing::debug!(%addr, "accepted WebSocket TCP connection");
165 let server = self.server.clone();
166 Some(
167 server
168 .accept(stream)
169 .await
170 .map_err(|e| anyhow::anyhow!("WebSocket accept failed: {e}")),
171 )
172 }
173 Err(e) => Some(Err(e.into())),
174 }
175 }
176}