huddle_core/network/server.rs
1//! Client connector to the centralized `huddle-server`.
2//!
3//! huddle's primary transport is libp2p (mDNS on the LAN, gossipsub
4//! across direct/relayed connections). This module adds a *second* path:
5//! a WebSocket to a single canonical server that the operator hosts. The
6//! server is reachable only as a **Tor v3 onion**, so `.onion` URLs are
7//! dialed through Tor's local SOCKS5 proxy; plain `ws://host:port` URLs
8//! (used in tests) are dialed directly.
9//!
10//! The server is a dumb ciphertext mover: we hand it the same opaque
11//! huddle wire bytes we would have published on a gossipsub topic,
12//! tagged with the cleartext `room` id, base64-encoded. It fans them out
13//! to the room's other members and queues them for offline ones. All
14//! encryption/authentication stays in the layers above — this module
15//! never inspects the payload.
16
17use std::sync::Arc;
18
19use base64::engine::general_purpose::STANDARD as B64;
20use base64::Engine;
21use futures::{SinkExt, StreamExt};
22use serde::{Deserialize, Serialize};
23use tokio::sync::mpsc;
24use tokio_tungstenite::tungstenite::Message as WsMessage;
25use tokio_tungstenite::WebSocketStream;
26use tracing::warn;
27
28use crate::error::{HuddleError, Result};
29use crate::identity::{relay_auth_msg, Identity};
30
31/// Messages we send to the server. Mirrors `huddle-server`'s `ClientMsg`.
32#[derive(Debug, Serialize)]
33#[serde(tag = "type", rename_all = "snake_case")]
34enum ClientMsg {
35 /// huddle 1.1.4: `Hello` now authenticates. It carries our Ed25519
36 /// pubkey and a signature over `relay_auth_msg(nonce)` for the nonce the
37 /// server sent in its opening `Challenge`. The relay verifies the
38 /// signature and that the pubkey hashes to `fingerprint` before it lets
39 /// us touch any mailbox.
40 Hello {
41 fingerprint: String,
42 pubkey_b64: String,
43 signature_b64: String,
44 rooms: Vec<String>,
45 },
46 Subscribe { room: String },
47 Unsubscribe { room: String },
48 Publish { room: String, id: String, payload_b64: String },
49 /// huddle 1.2: deliver straight to a recipient fingerprint (`to`),
50 /// independent of room membership. Used for 1:1 DMs and friend requests,
51 /// where we know exactly who the recipient is. `room` is the opaque tag
52 /// the recipient files it under (DM room id, or their inbox id). Mirrors
53 /// `huddle-server`'s `ClientMsg::SendDirect`.
54 SendDirect { to: String, room: String, id: String, payload_b64: String },
55 Fetch,
56 Ping,
57}
58
59/// Messages the server sends back. Mirrors `huddle-server`'s `ServerMsg`.
60#[derive(Debug, Deserialize)]
61#[serde(tag = "type", rename_all = "snake_case")]
62enum ServerMsg {
63 /// huddle 1.1.4: the relay opens the connection with a random challenge
64 /// nonce. We sign it and answer with an authenticated `Hello`.
65 Challenge { nonce_b64: String },
66 // The server echoes our fingerprint on `ready`, but we already know
67 // our own identity, so we keep only the tag and let serde ignore the
68 // extra field.
69 Ready,
70 Message { room: String, id: String, payload_b64: String },
71 Sent { id: String, delivered: usize, queued: usize },
72 Pong,
73 Error { message: String },
74}
75
76/// What the connector surfaces to the rest of huddle-core. The caller
77/// drives these into the same path that handles a received gossipsub
78/// message (decode → decrypt → `AppEvent`).
79#[derive(Debug, Clone)]
80pub enum ServerEvent {
81 /// Handshake complete; the mailbox (if any) will follow as `Message`s.
82 Ready,
83 /// Delivery receipt for one of our `publish` calls: how many of the
84 /// room's other members received it live vs. were queued because they
85 /// were offline. Lets the UI mark a message delivered/pending.
86 Sent { id: String, delivered: usize, queued: usize },
87 /// A room message delivered (live or from the offline mailbox).
88 Message { room: String, id: String, payload: Vec<u8> },
89 /// The socket closed; the caller may choose to reconnect.
90 Disconnected,
91}
92
93/// A live connection to the server. Cloneable handle; cloning shares the
94/// same underlying socket.
95#[derive(Clone)]
96pub struct ServerClient {
97 out_tx: mpsc::UnboundedSender<ClientMsg>,
98}
99
100impl ServerClient {
101 /// Open a connection, send the initial `hello`, and return the client
102 /// plus a stream of [`ServerEvent`]s.
103 ///
104 /// - `url`: `ws://<onion>:80/ws` (onion), `wss://relay/ws` (clearnet TLS),
105 /// or `ws://host:port/ws` (clearnet plain / tests).
106 /// - `dial`: how to physically reach it — one of the transport "doors"
107 /// (`Socks5` for onion via Tor, `Tls` for `wss://`, `Direct` for `ws://`).
108 /// - `identity`: our identity, used to answer the relay's auth `Challenge`
109 /// (huddle 1.1.4). The connector signs the challenge nonce and sends the
110 /// pubkey + signature in `Hello`; the relay rejects us otherwise.
111 pub async fn connect(
112 url: &str,
113 dial: &crate::network::transport::DialMode,
114 identity: Arc<Identity>,
115 rooms: Vec<String>,
116 ) -> Result<(Self, mpsc::UnboundedReceiver<ServerEvent>)> {
117 use crate::network::transport::DialMode;
118 match dial {
119 DialMode::Socks5 { proxy } => {
120 let proxy: std::net::SocketAddr = proxy
121 .parse()
122 .map_err(|e| HuddleError::Network(format!("bad socks address: {e}")))?;
123 let target = host_port_from_ws_url(url)?;
124 let stream = tokio_socks::tcp::Socks5Stream::connect(proxy, target.as_str())
125 .await
126 .map_err(|e| HuddleError::Network(format!("tor socks connect: {e}")))?;
127 let (ws, _resp) = tokio_tungstenite::client_async(url, stream)
128 .await
129 .map_err(|e| HuddleError::Network(format!("ws handshake: {e}")))?;
130 Ok(Self::spawn(ws, identity, rooms))
131 }
132 // Plain `ws://` and `wss://` with the system trust store both go
133 // through `connect_async`, which negotiates TLS from the URL
134 // scheme (tokio-tungstenite's rustls-tls-native-roots feature).
135 DialMode::Direct | DialMode::Tls { pinned_cert_der: None } => {
136 let (ws, _resp) = tokio_tungstenite::connect_async(url)
137 .await
138 .map_err(|e| HuddleError::Network(format!("ws connect: {e}")))?;
139 Ok(Self::spawn(ws, identity, rooms))
140 }
141 // Self-signed cert pinning is structured but not wired in this
142 // build — the recommended clearnet-TLS path uses a real cert
143 // (Caddy / Let's Encrypt / Cloudflare), which the arm above
144 // handles. Onion doors remain available for stronger privacy.
145 DialMode::Tls {
146 pinned_cert_der: Some(_),
147 } => Err(HuddleError::Network(
148 "pinned-certificate wss is not supported in this build — use a real cert (Caddy/Let's Encrypt) or an onion door".into(),
149 )),
150 // huddle 1.0: in-process Tor via Arti. Bootstraps (once) an
151 // embedded Tor client and opens the stream to the onion through
152 // it, then speaks WebSocket over that stream — `spawn` is reused.
153 #[cfg(feature = "arti")]
154 DialMode::Arti { bridge } => {
155 let client =
156 crate::network::transport::arti_client(bridge.as_deref()).await?;
157 let hp = host_port_from_ws_url(url)?;
158 let (host, port_s) = hp.rsplit_once(':').ok_or_else(|| {
159 HuddleError::Network(format!("bad host:port from {url}"))
160 })?;
161 let port: u16 = port_s
162 .parse()
163 .map_err(|_| HuddleError::Network(format!("bad port in {url}")))?;
164 let stream = client
165 .connect((host, port))
166 .await
167 .map_err(|e| HuddleError::Network(format!("arti connect: {e}")))?;
168 let (ws, _resp) = tokio_tungstenite::client_async(url, stream)
169 .await
170 .map_err(|e| HuddleError::Network(format!("ws handshake: {e}")))?;
171 Ok(Self::spawn(ws, identity, rooms))
172 }
173 }
174 }
175
176 /// Spawn the read/write pumps for an established socket. Generic over
177 /// the inner stream so the Tor-SOCKS and direct paths (different
178 /// stream types) share one implementation.
179 fn spawn<S>(
180 ws: WebSocketStream<S>,
181 identity: Arc<Identity>,
182 rooms: Vec<String>,
183 ) -> (Self, mpsc::UnboundedReceiver<ServerEvent>)
184 where
185 S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + 'static,
186 {
187 let (mut sink, mut stream) = ws.split();
188 let (out_tx, mut out_rx) = mpsc::unbounded_channel::<ClientMsg>();
189 let (ev_tx, ev_rx) = mpsc::unbounded_channel::<ServerEvent>();
190
191 // huddle 1.1.4: we do NOT send `Hello` up front anymore. The relay
192 // opens with a `Challenge`; the reader pump (below) signs that nonce
193 // and queues the authenticated `Hello`. Because the relay rejects
194 // anything sent before a valid `Hello`, the writer pump holds back
195 // any other outgoing frame (a publish/subscribe the app issues during
196 // the handshake window) until the `Hello` has actually gone out.
197 tokio::spawn(async move {
198 let mut hello_sent = false;
199 let mut pending: Vec<ClientMsg> = Vec::new();
200 while let Some(msg) = out_rx.recv().await {
201 let is_hello = matches!(msg, ClientMsg::Hello { .. });
202 if !hello_sent && !is_hello {
203 pending.push(msg);
204 continue;
205 }
206 let json = match serde_json::to_string(&msg) {
207 Ok(j) => j,
208 Err(_) => continue,
209 };
210 if sink.send(WsMessage::Text(json.into())).await.is_err() {
211 return;
212 }
213 if is_hello {
214 hello_sent = true;
215 // Flush anything the app queued while we waited for the
216 // challenge, preserving its order after the Hello.
217 for m in pending.drain(..) {
218 let json = match serde_json::to_string(&m) {
219 Ok(j) => j,
220 Err(_) => continue,
221 };
222 if sink.send(WsMessage::Text(json.into())).await.is_err() {
223 return;
224 }
225 }
226 }
227 }
228 // When `out_rx` ends (every `ServerClient` handle dropped) close
229 // the socket so the server marks us offline and starts mailboxing.
230 let _ = sink.close().await;
231 });
232
233 // Reader pump: parse server messages into ServerEvents. On the opening
234 // `Challenge`, prove our identity by signing the nonce and sending the
235 // authenticated `Hello` through the writer.
236 // Held only long enough to send the one `Hello` in response to the
237 // challenge, then dropped. Crucially it must NOT outlive that: if the
238 // reader kept a permanent `out_tx` clone, dropping every public
239 // `ServerClient` handle would no longer end the writer's `out_rx`, the
240 // socket would never close, and the server would never mark us offline
241 // (breaking offline mailboxing). `Option::take()` releases it after use.
242 let mut hello_tx = Some(out_tx.clone());
243 tokio::spawn(async move {
244 while let Some(frame) = stream.next().await {
245 let frame = match frame {
246 Ok(f) => f,
247 Err(_) => break,
248 };
249 let text = match frame {
250 WsMessage::Text(t) => t.as_str().to_string(),
251 WsMessage::Binary(b) => String::from_utf8_lossy(&b).into_owned(),
252 WsMessage::Close(_) => break,
253 _ => continue,
254 };
255 match serde_json::from_str::<ServerMsg>(&text) {
256 Ok(ServerMsg::Challenge { nonce_b64 }) => {
257 if let Some(tx) = hello_tx.take() {
258 match B64.decode(nonce_b64.as_bytes()) {
259 Ok(nonce) => {
260 let sig = identity.sign(&relay_auth_msg(&nonce));
261 let hello = ClientMsg::Hello {
262 fingerprint: identity.fingerprint().to_string(),
263 pubkey_b64: B64.encode(identity.public_bytes()),
264 signature_b64: B64.encode(sig),
265 rooms: rooms.clone(),
266 };
267 // If the writer is gone the connection is dead anyway.
268 let _ = tx.send(hello);
269 }
270 Err(e) => {
271 warn!(error = %e, "relay sent an undecodable challenge nonce");
272 break;
273 }
274 }
275 }
276 // `tx` dropped here — the reader no longer pins the
277 // outgoing channel open.
278 }
279 Ok(ServerMsg::Ready) => {
280 let _ = ev_tx.send(ServerEvent::Ready);
281 }
282 Ok(ServerMsg::Sent { id, delivered, queued }) => {
283 let _ = ev_tx.send(ServerEvent::Sent { id, delivered, queued });
284 }
285 Ok(ServerMsg::Message { room, id, payload_b64 }) => {
286 match B64.decode(payload_b64.as_bytes()) {
287 Ok(payload) => {
288 let _ = ev_tx.send(ServerEvent::Message { room, id, payload });
289 }
290 Err(e) => warn!(error = %e, "server sent undecodable payload"),
291 }
292 }
293 Ok(ServerMsg::Error { message }) => warn!(%message, "huddle-server error"),
294 Ok(ServerMsg::Pong) => {}
295 Err(e) => warn!(error = %e, "unparseable server message"),
296 }
297 }
298 let _ = ev_tx.send(ServerEvent::Disconnected);
299 });
300
301 (Self { out_tx }, ev_rx)
302 }
303
304 /// Send a room's opaque wire bytes to the server for fan-out.
305 pub fn publish(&self, room: &str, id: &str, payload: &[u8]) -> Result<()> {
306 self.send(ClientMsg::Publish {
307 room: room.to_string(),
308 id: id.to_string(),
309 payload_b64: B64.encode(payload),
310 })
311 }
312
313 /// huddle 1.2: deliver `payload` straight to recipient `to`'s
314 /// fingerprint, independent of room membership (1:1 DMs, friend requests).
315 /// The server delivers it live to every connection `to` has open, or
316 /// queues it in their mailbox when they're offline. `room` is the opaque
317 /// tag the recipient files it under.
318 pub fn send_direct(&self, to: &str, room: &str, id: &str, payload: &[u8]) -> Result<()> {
319 self.send(ClientMsg::SendDirect {
320 to: to.to_string(),
321 room: room.to_string(),
322 id: id.to_string(),
323 payload_b64: B64.encode(payload),
324 })
325 }
326
327 /// Assert membership of a room so the server mailboxes us when offline.
328 pub fn subscribe(&self, room: &str) -> Result<()> {
329 self.send(ClientMsg::Subscribe { room: room.to_string() })
330 }
331
332 pub fn unsubscribe(&self, room: &str) -> Result<()> {
333 self.send(ClientMsg::Unsubscribe { room: room.to_string() })
334 }
335
336 /// Ask the server to re-drain our mailbox.
337 pub fn fetch(&self) -> Result<()> {
338 self.send(ClientMsg::Fetch)
339 }
340
341 pub fn ping(&self) -> Result<()> {
342 self.send(ClientMsg::Ping)
343 }
344
345 fn send(&self, msg: ClientMsg) -> Result<()> {
346 self.out_tx
347 .send(msg)
348 .map_err(|_| HuddleError::Network("server connection closed".to_string()))
349 }
350}
351
352/// Extract `host:port` from a `ws://`/`wss://` URL for the SOCKS target.
353/// Defaults to port 80 for `ws://` (matches the onion's `HiddenServicePort
354/// 80`) and 443 for `wss://` when no explicit port is given.
355fn host_port_from_ws_url(url: &str) -> Result<String> {
356 let (rest, default_port) = if let Some(r) = url.strip_prefix("wss://") {
357 (r, 443)
358 } else if let Some(r) = url.strip_prefix("ws://") {
359 (r, 80)
360 } else {
361 return Err(HuddleError::Network(format!("expected ws:// url, got {url}")));
362 };
363 let authority = rest.split('/').next().unwrap_or(rest);
364 if authority.is_empty() {
365 return Err(HuddleError::Network(format!("no host in url: {url}")));
366 }
367 if authority.contains(':') {
368 Ok(authority.to_string())
369 } else {
370 Ok(format!("{authority}:{default_port}"))
371 }
372}
373
374#[cfg(test)]
375mod tests {
376 use super::host_port_from_ws_url;
377
378 #[test]
379 fn parses_host_port() {
380 assert_eq!(host_port_from_ws_url("ws://abc.onion/ws").unwrap(), "abc.onion:80");
381 assert_eq!(
382 host_port_from_ws_url("ws://127.0.0.1:8787/ws").unwrap(),
383 "127.0.0.1:8787"
384 );
385 assert_eq!(host_port_from_ws_url("wss://h:443").unwrap(), "h:443");
386 // huddle 1.0: bare wss:// defaults to 443, not 80.
387 assert_eq!(host_port_from_ws_url("wss://relay.example/ws").unwrap(), "relay.example:443");
388 assert!(host_port_from_ws_url("http://x").is_err());
389 }
390}