// huddle_core/network/server.rs

//! Client connector to the centralized `huddle-server`.
//!
//! huddle's primary transport is libp2p (mDNS on the LAN, gossipsub
//! across direct/relayed connections). This module adds a *second* path:
//! a WebSocket to a single canonical server that the operator hosts. The
//! server is reachable only as a **Tor v3 onion**, so `.onion` URLs are
//! dialed through Tor's local SOCKS5 proxy; plain `ws://host:port` URLs
//! (used in tests) are dialed directly.
//!
//! The server is a dumb ciphertext mover: we hand it the same opaque
//! huddle wire bytes we would have published on a gossipsub topic,
//! tagged with the cleartext `room` id, base64-encoded. It fans them out
//! to the room's other members and queues them for offline ones. All
//! encryption/authentication stays in the layers above — this module
//! never inspects the payload.
16
use std::borrow::Cow;
use std::sync::Arc;

use base64::engine::general_purpose::STANDARD as B64;
use base64::Engine;
use futures::{SinkExt, StreamExt};
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc;
use tokio_tungstenite::tungstenite::Message as WsMessage;
use tokio_tungstenite::WebSocketStream;
use tracing::warn;

use crate::error::{HuddleError, Result};
use crate::identity::{relay_auth_msg, Identity};
30
31/// Messages we send to the server. Mirrors `huddle-server`'s `ClientMsg`.
32#[derive(Debug, Serialize)]
33#[serde(tag = "type", rename_all = "snake_case")]
34enum ClientMsg {
35    /// huddle 1.1.4: `Hello` now authenticates. It carries our Ed25519
36    /// pubkey and a signature over `relay_auth_msg(nonce)` for the nonce the
37    /// server sent in its opening `Challenge`. The relay verifies the
38    /// signature and that the pubkey hashes to `fingerprint` before it lets
39    /// us touch any mailbox.
40    Hello {
41        fingerprint: String,
42        pubkey_b64: String,
43        signature_b64: String,
44        rooms: Vec<String>,
45    },
46    Subscribe { room: String },
47    Unsubscribe { room: String },
48    Publish { room: String, id: String, payload_b64: String },
49    /// huddle 1.2: deliver straight to a recipient fingerprint (`to`),
50    /// independent of room membership. Used for 1:1 DMs and friend requests,
51    /// where we know exactly who the recipient is. `room` is the opaque tag
52    /// the recipient files it under (DM room id, or their inbox id). Mirrors
53    /// `huddle-server`'s `ClientMsg::SendDirect`.
54    SendDirect { to: String, room: String, id: String, payload_b64: String },
55    /// huddle 1.2.1: mint a short-lived connect code bound to our identity.
56    CreateConnectToken,
57    /// huddle 1.2.1: resolve a connect code → owner fingerprint + pubkey.
58    RedeemConnectToken { token: String },
59    Fetch,
60    Ping,
61}
62
63/// Messages the server sends back. Mirrors `huddle-server`'s `ServerMsg`.
64#[derive(Debug, Deserialize)]
65#[serde(tag = "type", rename_all = "snake_case")]
66enum ServerMsg {
67    /// huddle 1.1.4: the relay opens the connection with a random challenge
68    /// nonce. We sign it and answer with an authenticated `Hello`.
69    Challenge { nonce_b64: String },
70    // The server echoes our fingerprint on `ready`, but we already know
71    // our own identity, so we keep only the tag and let serde ignore the
72    // extra field.
73    Ready,
74    Message { room: String, id: String, payload_b64: String },
75    Sent { id: String, delivered: usize, queued: usize },
76    /// huddle 1.2.1: a freshly minted connect code + its lifetime in seconds.
77    ConnectToken { token: String, ttl_secs: u64 },
78    /// huddle 1.2.1: result of redeeming a connect code. `fingerprint`/`pubkey_b64`
79    /// are `None` when the code was unknown or expired. (The relay also echoes
80    /// the `token`, but we don't need it client-side — serde ignores it.)
81    ConnectTokenResolved {
82        #[serde(default)]
83        fingerprint: Option<String>,
84        #[serde(default)]
85        pubkey_b64: Option<String>,
86    },
87    Pong,
88    Error { message: String },
89}
90
/// Events the connector hands to the rest of huddle-core.
///
/// The caller feeds these into the same path that handles a received
/// gossipsub message (decode → decrypt → `AppEvent`).
#[derive(Debug, Clone)]
pub enum ServerEvent {
    /// The handshake finished; the mailbox contents (if any) follow as
    /// `Message` events.
    Ready,
    /// Delivery receipt for one of our `publish` calls: how many of the
    /// room's other members got it live (`delivered`) and how many had it
    /// queued because they were offline (`queued`). Lets the UI mark a
    /// message as delivered or pending.
    Sent {
        id: String,
        delivered: usize,
        queued: usize,
    },
    /// A room message, delivered live or drained from the offline mailbox.
    /// `payload` holds the already base64-decoded wire bytes.
    Message {
        room: String,
        id: String,
        payload: Vec<u8>,
    },
    /// The relay minted a connect code for us, valid for `ttl_secs` seconds
    /// (huddle 1.2.1).
    ConnectToken {
        token: String,
        ttl_secs: u64,
    },
    /// The relay resolved a connect code we redeemed (huddle 1.2.1).
    /// `fingerprint`/`pubkey_b64` are `None` when the code was unknown or
    /// expired.
    ConnectTokenResolved {
        fingerprint: Option<String>,
        pubkey_b64: Option<String>,
    },
    /// The socket closed; the caller may choose to reconnect.
    Disconnected,
}
115
116/// A live connection to the server. Cloneable handle; cloning shares the
117/// same underlying socket.
118#[derive(Clone)]
119pub struct ServerClient {
120    out_tx: mpsc::UnboundedSender<ClientMsg>,
121}
122
123impl ServerClient {
124    /// Open a connection, send the initial `hello`, and return the client
125    /// plus a stream of [`ServerEvent`]s.
126    ///
127    /// - `url`: `ws://<onion>:80/ws` (onion), `wss://relay/ws` (clearnet TLS),
128    ///   or `ws://host:port/ws` (clearnet plain / tests).
129    /// - `dial`: how to physically reach it — one of the transport "doors"
130    ///   (`Socks5` for onion via Tor, `Tls` for `wss://`, `Direct` for `ws://`).
131    /// - `identity`: our identity, used to answer the relay's auth `Challenge`
132    ///   (huddle 1.1.4). The connector signs the challenge nonce and sends the
133    ///   pubkey + signature in `Hello`; the relay rejects us otherwise.
134    pub async fn connect(
135        url: &str,
136        dial: &crate::network::transport::DialMode,
137        identity: Arc<Identity>,
138        rooms: Vec<String>,
139    ) -> Result<(Self, mpsc::UnboundedReceiver<ServerEvent>)> {
140        use crate::network::transport::DialMode;
141        match dial {
142            DialMode::Socks5 { proxy } => {
143                let proxy: std::net::SocketAddr = proxy
144                    .parse()
145                    .map_err(|e| HuddleError::Network(format!("bad socks address: {e}")))?;
146                let target = host_port_from_ws_url(url)?;
147                let stream = tokio_socks::tcp::Socks5Stream::connect(proxy, target.as_str())
148                    .await
149                    .map_err(|e| HuddleError::Network(format!("tor socks connect: {e}")))?;
150                let (ws, _resp) = tokio_tungstenite::client_async(url, stream)
151                    .await
152                    .map_err(|e| HuddleError::Network(format!("ws handshake: {e}")))?;
153                Ok(Self::spawn(ws, identity, rooms))
154            }
155            // Plain `ws://` and `wss://` with the system trust store both go
156            // through `connect_async`, which negotiates TLS from the URL
157            // scheme (tokio-tungstenite's rustls-tls-native-roots feature).
158            DialMode::Direct | DialMode::Tls { pinned_cert_der: None } => {
159                let (ws, _resp) = tokio_tungstenite::connect_async(url)
160                    .await
161                    .map_err(|e| HuddleError::Network(format!("ws connect: {e}")))?;
162                Ok(Self::spawn(ws, identity, rooms))
163            }
164            // Self-signed cert pinning is structured but not wired in this
165            // build — the recommended clearnet-TLS path uses a real cert
166            // (Caddy / Let's Encrypt / Cloudflare), which the arm above
167            // handles. Onion doors remain available for stronger privacy.
168            DialMode::Tls {
169                pinned_cert_der: Some(_),
170            } => Err(HuddleError::Network(
171                "pinned-certificate wss is not supported in this build — use a real cert (Caddy/Let's Encrypt) or an onion door".into(),
172            )),
173            // huddle 1.0: in-process Tor via Arti. Bootstraps (once) an
174            // embedded Tor client and opens the stream to the onion through
175            // it, then speaks WebSocket over that stream — `spawn` is reused.
176            #[cfg(feature = "arti")]
177            DialMode::Arti { bridge } => {
178                let client =
179                    crate::network::transport::arti_client(bridge.as_deref()).await?;
180                let hp = host_port_from_ws_url(url)?;
181                let (host, port_s) = hp.rsplit_once(':').ok_or_else(|| {
182                    HuddleError::Network(format!("bad host:port from {url}"))
183                })?;
184                let port: u16 = port_s
185                    .parse()
186                    .map_err(|_| HuddleError::Network(format!("bad port in {url}")))?;
187                let stream = client
188                    .connect((host, port))
189                    .await
190                    .map_err(|e| HuddleError::Network(format!("arti connect: {e}")))?;
191                let (ws, _resp) = tokio_tungstenite::client_async(url, stream)
192                    .await
193                    .map_err(|e| HuddleError::Network(format!("ws handshake: {e}")))?;
194                Ok(Self::spawn(ws, identity, rooms))
195            }
196        }
197    }
198
199    /// Spawn the read/write pumps for an established socket. Generic over
200    /// the inner stream so the Tor-SOCKS and direct paths (different
201    /// stream types) share one implementation.
202    fn spawn<S>(
203        ws: WebSocketStream<S>,
204        identity: Arc<Identity>,
205        rooms: Vec<String>,
206    ) -> (Self, mpsc::UnboundedReceiver<ServerEvent>)
207    where
208        S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + 'static,
209    {
210        let (mut sink, mut stream) = ws.split();
211        let (out_tx, mut out_rx) = mpsc::unbounded_channel::<ClientMsg>();
212        let (ev_tx, ev_rx) = mpsc::unbounded_channel::<ServerEvent>();
213
214        // huddle 1.1.4: we do NOT send `Hello` up front anymore. The relay
215        // opens with a `Challenge`; the reader pump (below) signs that nonce
216        // and queues the authenticated `Hello`. Because the relay rejects
217        // anything sent before a valid `Hello`, the writer pump holds back
218        // any other outgoing frame (a publish/subscribe the app issues during
219        // the handshake window) until the `Hello` has actually gone out.
220        tokio::spawn(async move {
221            let mut hello_sent = false;
222            let mut pending: Vec<ClientMsg> = Vec::new();
223            while let Some(msg) = out_rx.recv().await {
224                let is_hello = matches!(msg, ClientMsg::Hello { .. });
225                if !hello_sent && !is_hello {
226                    pending.push(msg);
227                    continue;
228                }
229                let json = match serde_json::to_string(&msg) {
230                    Ok(j) => j,
231                    Err(_) => continue,
232                };
233                if sink.send(WsMessage::Text(json.into())).await.is_err() {
234                    return;
235                }
236                if is_hello {
237                    hello_sent = true;
238                    // Flush anything the app queued while we waited for the
239                    // challenge, preserving its order after the Hello.
240                    for m in pending.drain(..) {
241                        let json = match serde_json::to_string(&m) {
242                            Ok(j) => j,
243                            Err(_) => continue,
244                        };
245                        if sink.send(WsMessage::Text(json.into())).await.is_err() {
246                            return;
247                        }
248                    }
249                }
250            }
251            // When `out_rx` ends (every `ServerClient` handle dropped) close
252            // the socket so the server marks us offline and starts mailboxing.
253            let _ = sink.close().await;
254        });
255
256        // Reader pump: parse server messages into ServerEvents. On the opening
257        // `Challenge`, prove our identity by signing the nonce and sending the
258        // authenticated `Hello` through the writer.
259        // Held only long enough to send the one `Hello` in response to the
260        // challenge, then dropped. Crucially it must NOT outlive that: if the
261        // reader kept a permanent `out_tx` clone, dropping every public
262        // `ServerClient` handle would no longer end the writer's `out_rx`, the
263        // socket would never close, and the server would never mark us offline
264        // (breaking offline mailboxing). `Option::take()` releases it after use.
265        let mut hello_tx = Some(out_tx.clone());
266        tokio::spawn(async move {
267            while let Some(frame) = stream.next().await {
268                let frame = match frame {
269                    Ok(f) => f,
270                    Err(_) => break,
271                };
272                let text = match frame {
273                    WsMessage::Text(t) => t.as_str().to_string(),
274                    WsMessage::Binary(b) => String::from_utf8_lossy(&b).into_owned(),
275                    WsMessage::Close(_) => break,
276                    _ => continue,
277                };
278                match serde_json::from_str::<ServerMsg>(&text) {
279                    Ok(ServerMsg::Challenge { nonce_b64 }) => {
280                        if let Some(tx) = hello_tx.take() {
281                            match B64.decode(nonce_b64.as_bytes()) {
282                                Ok(nonce) => {
283                                    let sig = identity.sign(&relay_auth_msg(&nonce));
284                                    let hello = ClientMsg::Hello {
285                                        fingerprint: identity.fingerprint().to_string(),
286                                        pubkey_b64: B64.encode(identity.public_bytes()),
287                                        signature_b64: B64.encode(sig),
288                                        rooms: rooms.clone(),
289                                    };
290                                    // If the writer is gone the connection is dead anyway.
291                                    let _ = tx.send(hello);
292                                }
293                                Err(e) => {
294                                    warn!(error = %e, "relay sent an undecodable challenge nonce");
295                                    break;
296                                }
297                            }
298                        }
299                        // `tx` dropped here — the reader no longer pins the
300                        // outgoing channel open.
301                    }
302                    Ok(ServerMsg::Ready) => {
303                        let _ = ev_tx.send(ServerEvent::Ready);
304                    }
305                    Ok(ServerMsg::Sent { id, delivered, queued }) => {
306                        let _ = ev_tx.send(ServerEvent::Sent { id, delivered, queued });
307                    }
308                    Ok(ServerMsg::ConnectToken { token, ttl_secs }) => {
309                        let _ = ev_tx.send(ServerEvent::ConnectToken { token, ttl_secs });
310                    }
311                    Ok(ServerMsg::ConnectTokenResolved { fingerprint, pubkey_b64 }) => {
312                        let _ = ev_tx.send(ServerEvent::ConnectTokenResolved {
313                            fingerprint,
314                            pubkey_b64,
315                        });
316                    }
317                    Ok(ServerMsg::Message { room, id, payload_b64 }) => {
318                        match B64.decode(payload_b64.as_bytes()) {
319                            Ok(payload) => {
320                                let _ = ev_tx.send(ServerEvent::Message { room, id, payload });
321                            }
322                            Err(e) => warn!(error = %e, "server sent undecodable payload"),
323                        }
324                    }
325                    Ok(ServerMsg::Error { message }) => warn!(%message, "huddle-server error"),
326                    Ok(ServerMsg::Pong) => {}
327                    Err(e) => warn!(error = %e, "unparseable server message"),
328                }
329            }
330            let _ = ev_tx.send(ServerEvent::Disconnected);
331        });
332
333        (Self { out_tx }, ev_rx)
334    }
335
336    /// Send a room's opaque wire bytes to the server for fan-out.
337    pub fn publish(&self, room: &str, id: &str, payload: &[u8]) -> Result<()> {
338        self.send(ClientMsg::Publish {
339            room: room.to_string(),
340            id: id.to_string(),
341            payload_b64: B64.encode(payload),
342        })
343    }
344
345    /// huddle 1.2: deliver `payload` straight to recipient `to`'s
346    /// fingerprint, independent of room membership (1:1 DMs, friend requests).
347    /// The server delivers it live to every connection `to` has open, or
348    /// queues it in their mailbox when they're offline. `room` is the opaque
349    /// tag the recipient files it under.
350    pub fn send_direct(&self, to: &str, room: &str, id: &str, payload: &[u8]) -> Result<()> {
351        self.send(ClientMsg::SendDirect {
352            to: to.to_string(),
353            room: room.to_string(),
354            id: id.to_string(),
355            payload_b64: B64.encode(payload),
356        })
357    }
358
359    /// Assert membership of a room so the server mailboxes us when offline.
360    pub fn subscribe(&self, room: &str) -> Result<()> {
361        self.send(ClientMsg::Subscribe { room: room.to_string() })
362    }
363
364    pub fn unsubscribe(&self, room: &str) -> Result<()> {
365        self.send(ClientMsg::Unsubscribe { room: room.to_string() })
366    }
367
368    /// huddle 1.2.1: ask the relay to mint a short-lived connect code bound to
369    /// our identity. The reply arrives as `ServerEvent::ConnectToken`.
370    pub fn create_connect_token(&self) -> Result<()> {
371        self.send(ClientMsg::CreateConnectToken)
372    }
373
374    /// huddle 1.2.1: ask the relay to resolve a connect code to its owner.
375    /// The reply arrives as `ServerEvent::ConnectTokenResolved`.
376    pub fn redeem_connect_token(&self, token: &str) -> Result<()> {
377        self.send(ClientMsg::RedeemConnectToken { token: token.to_string() })
378    }
379
380    /// Ask the server to re-drain our mailbox.
381    pub fn fetch(&self) -> Result<()> {
382        self.send(ClientMsg::Fetch)
383    }
384
385    pub fn ping(&self) -> Result<()> {
386        self.send(ClientMsg::Ping)
387    }
388
389    fn send(&self, msg: ClientMsg) -> Result<()> {
390        self.out_tx
391            .send(msg)
392            .map_err(|_| HuddleError::Network("server connection closed".to_string()))
393    }
394}
395
396/// Extract `host:port` from a `ws://`/`wss://` URL for the SOCKS target.
397/// Defaults to port 80 for `ws://` (matches the onion's `HiddenServicePort
398/// 80`) and 443 for `wss://` when no explicit port is given.
399fn host_port_from_ws_url(url: &str) -> Result<String> {
400    let (rest, default_port) = if let Some(r) = url.strip_prefix("wss://") {
401        (r, 443)
402    } else if let Some(r) = url.strip_prefix("ws://") {
403        (r, 80)
404    } else {
405        return Err(HuddleError::Network(format!("expected ws:// url, got {url}")));
406    };
407    let authority = rest.split('/').next().unwrap_or(rest);
408    if authority.is_empty() {
409        return Err(HuddleError::Network(format!("no host in url: {url}")));
410    }
411    if authority.contains(':') {
412        Ok(authority.to_string())
413    } else {
414        Ok(format!("{authority}:{default_port}"))
415    }
416}
417
#[cfg(test)]
mod tests {
    use super::host_port_from_ws_url;

    /// Table-driven check of the URL → `host:port` extraction.
    #[test]
    fn parses_host_port() {
        let accepted = [
            // No port given on `ws://`: falls back to 80.
            ("ws://abc.onion/ws", "abc.onion:80"),
            // An explicit port is kept as-is.
            ("ws://127.0.0.1:8787/ws", "127.0.0.1:8787"),
            ("wss://h:443", "h:443"),
            // huddle 1.0: bare wss:// defaults to 443, not 80.
            ("wss://relay.example/ws", "relay.example:443"),
        ];
        for (url, expected) in accepted {
            assert_eq!(host_port_from_ws_url(url).unwrap(), expected);
        }

        // Anything that is not ws:// or wss:// is rejected.
        assert!(host_port_from_ws_url("http://x").is_err());
    }
}