Skip to main content

huddle_core/network/
server.rs

1//! Client connector to the centralized `huddle-server`.
2//!
3//! huddle's primary transport is libp2p (mDNS on the LAN, gossipsub
4//! across direct/relayed connections). This module adds a *second* path:
5//! a WebSocket to a single canonical server that the operator hosts. The
6//! server is reachable only as a **Tor v3 onion**, so `.onion` URLs are
7//! dialed through Tor's local SOCKS5 proxy; plain `ws://host:port` URLs
8//! (used in tests) are dialed directly.
9//!
10//! The server is a dumb ciphertext mover: we hand it the same opaque
11//! huddle wire bytes we would have published on a gossipsub topic,
12//! tagged with the cleartext `room` id, base64-encoded. It fans them out
13//! to the room's other members and queues them for offline ones. All
14//! encryption/authentication stays in the layers above — this module
15//! never inspects the payload.
16
17use base64::engine::general_purpose::STANDARD as B64;
18use base64::Engine;
19use futures::{SinkExt, StreamExt};
20use serde::{Deserialize, Serialize};
21use tokio::sync::mpsc;
22use tokio_tungstenite::tungstenite::Message as WsMessage;
23use tokio_tungstenite::WebSocketStream;
24use tracing::warn;
25
26use crate::error::{HuddleError, Result};
27
28/// Messages we send to the server. Mirrors `huddle-server`'s `ClientMsg`.
29#[derive(Debug, Serialize)]
30#[serde(tag = "type", rename_all = "snake_case")]
31enum ClientMsg {
32    Hello { fingerprint: String, rooms: Vec<String> },
33    Subscribe { room: String },
34    Unsubscribe { room: String },
35    Publish { room: String, id: String, payload_b64: String },
36    Fetch,
37    Ping,
38}
39
40/// Messages the server sends back. Mirrors `huddle-server`'s `ServerMsg`.
41#[derive(Debug, Deserialize)]
42#[serde(tag = "type", rename_all = "snake_case")]
43enum ServerMsg {
44    // The server echoes our fingerprint on `ready`, but we already know
45    // our own identity, so we keep only the tag and let serde ignore the
46    // extra field.
47    Ready,
48    Message { room: String, id: String, payload_b64: String },
49    Sent { id: String, delivered: usize, queued: usize },
50    Pong,
51    Error { message: String },
52}
53
54/// What the connector surfaces to the rest of huddle-core. The caller
55/// drives these into the same path that handles a received gossipsub
56/// message (decode → decrypt → `AppEvent`).
57#[derive(Debug, Clone)]
58pub enum ServerEvent {
59    /// Handshake complete; the mailbox (if any) will follow as `Message`s.
60    Ready,
61    /// Delivery receipt for one of our `publish` calls: how many of the
62    /// room's other members received it live vs. were queued because they
63    /// were offline. Lets the UI mark a message delivered/pending.
64    Sent { id: String, delivered: usize, queued: usize },
65    /// A room message delivered (live or from the offline mailbox).
66    Message { room: String, id: String, payload: Vec<u8> },
67    /// The socket closed; the caller may choose to reconnect.
68    Disconnected,
69}
70
71/// A live connection to the server. Cloneable handle; cloning shares the
72/// same underlying socket.
73#[derive(Clone)]
74pub struct ServerClient {
75    out_tx: mpsc::UnboundedSender<ClientMsg>,
76}
77
78impl ServerClient {
79    /// Open a connection, send the initial `hello`, and return the client
80    /// plus a stream of [`ServerEvent`]s.
81    ///
82    /// - `url`: `ws://<onion>:80/ws` (onion), `wss://relay/ws` (clearnet TLS),
83    ///   or `ws://host:port/ws` (clearnet plain / tests).
84    /// - `dial`: how to physically reach it — one of the transport "doors"
85    ///   (`Socks5` for onion via Tor, `Tls` for `wss://`, `Direct` for `ws://`).
86    pub async fn connect(
87        url: &str,
88        dial: &crate::network::transport::DialMode,
89        fingerprint: String,
90        rooms: Vec<String>,
91    ) -> Result<(Self, mpsc::UnboundedReceiver<ServerEvent>)> {
92        use crate::network::transport::DialMode;
93        match dial {
94            DialMode::Socks5 { proxy } => {
95                let proxy: std::net::SocketAddr = proxy
96                    .parse()
97                    .map_err(|e| HuddleError::Network(format!("bad socks address: {e}")))?;
98                let target = host_port_from_ws_url(url)?;
99                let stream = tokio_socks::tcp::Socks5Stream::connect(proxy, target.as_str())
100                    .await
101                    .map_err(|e| HuddleError::Network(format!("tor socks connect: {e}")))?;
102                let (ws, _resp) = tokio_tungstenite::client_async(url, stream)
103                    .await
104                    .map_err(|e| HuddleError::Network(format!("ws handshake: {e}")))?;
105                Ok(Self::spawn(ws, fingerprint, rooms))
106            }
107            // Plain `ws://` and `wss://` with the system trust store both go
108            // through `connect_async`, which negotiates TLS from the URL
109            // scheme (tokio-tungstenite's rustls-tls-native-roots feature).
110            DialMode::Direct | DialMode::Tls { pinned_cert_der: None } => {
111                let (ws, _resp) = tokio_tungstenite::connect_async(url)
112                    .await
113                    .map_err(|e| HuddleError::Network(format!("ws connect: {e}")))?;
114                Ok(Self::spawn(ws, fingerprint, rooms))
115            }
116            // Self-signed cert pinning is structured but not wired in this
117            // build — the recommended clearnet-TLS path uses a real cert
118            // (Caddy / Let's Encrypt / Cloudflare), which the arm above
119            // handles. Onion doors remain available for stronger privacy.
120            DialMode::Tls {
121                pinned_cert_der: Some(_),
122            } => Err(HuddleError::Network(
123                "pinned-certificate wss is not supported in this build — use a real cert (Caddy/Let's Encrypt) or an onion door".into(),
124            )),
125            // huddle 1.0: in-process Tor via Arti. Bootstraps (once) an
126            // embedded Tor client and opens the stream to the onion through
127            // it, then speaks WebSocket over that stream — `spawn` is reused.
128            #[cfg(feature = "arti")]
129            DialMode::Arti { bridge } => {
130                let client =
131                    crate::network::transport::arti_client(bridge.as_deref()).await?;
132                let hp = host_port_from_ws_url(url)?;
133                let (host, port_s) = hp.rsplit_once(':').ok_or_else(|| {
134                    HuddleError::Network(format!("bad host:port from {url}"))
135                })?;
136                let port: u16 = port_s
137                    .parse()
138                    .map_err(|_| HuddleError::Network(format!("bad port in {url}")))?;
139                let stream = client
140                    .connect((host, port))
141                    .await
142                    .map_err(|e| HuddleError::Network(format!("arti connect: {e}")))?;
143                let (ws, _resp) = tokio_tungstenite::client_async(url, stream)
144                    .await
145                    .map_err(|e| HuddleError::Network(format!("ws handshake: {e}")))?;
146                Ok(Self::spawn(ws, fingerprint, rooms))
147            }
148        }
149    }
150
151    /// Spawn the read/write pumps for an established socket. Generic over
152    /// the inner stream so the Tor-SOCKS and direct paths (different
153    /// stream types) share one implementation.
154    fn spawn<S>(
155        ws: WebSocketStream<S>,
156        fingerprint: String,
157        rooms: Vec<String>,
158    ) -> (Self, mpsc::UnboundedReceiver<ServerEvent>)
159    where
160        S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + 'static,
161    {
162        let (mut sink, mut stream) = ws.split();
163        let (out_tx, mut out_rx) = mpsc::unbounded_channel::<ClientMsg>();
164        let (ev_tx, ev_rx) = mpsc::unbounded_channel::<ServerEvent>();
165
166        // Announce ourselves first; the server replies `ready` and then
167        // flushes any queued mailbox messages.
168        let _ = out_tx.send(ClientMsg::Hello { fingerprint, rooms });
169
170        // Writer pump: serialize outgoing messages onto the socket. When
171        // `out_rx` ends (every `ServerClient` handle was dropped) we must
172        // actively close the socket — otherwise the reader task below keeps
173        // the connection alive and the server never sees us disconnect (so
174        // it never marks us offline / starts mailboxing). Closing here sends
175        // a WebSocket Close frame; the server then tears down and our reader
176        // observes the end of stream.
177        tokio::spawn(async move {
178            while let Some(msg) = out_rx.recv().await {
179                let json = match serde_json::to_string(&msg) {
180                    Ok(j) => j,
181                    Err(_) => continue,
182                };
183                if sink.send(WsMessage::Text(json.into())).await.is_err() {
184                    return;
185                }
186            }
187            let _ = sink.close().await;
188        });
189
190        // Reader pump: parse server messages into ServerEvents.
191        tokio::spawn(async move {
192            while let Some(frame) = stream.next().await {
193                let frame = match frame {
194                    Ok(f) => f,
195                    Err(_) => break,
196                };
197                let text = match frame {
198                    WsMessage::Text(t) => t.as_str().to_string(),
199                    WsMessage::Binary(b) => String::from_utf8_lossy(&b).into_owned(),
200                    WsMessage::Close(_) => break,
201                    _ => continue,
202                };
203                match serde_json::from_str::<ServerMsg>(&text) {
204                    Ok(ServerMsg::Ready) => {
205                        let _ = ev_tx.send(ServerEvent::Ready);
206                    }
207                    Ok(ServerMsg::Sent { id, delivered, queued }) => {
208                        let _ = ev_tx.send(ServerEvent::Sent { id, delivered, queued });
209                    }
210                    Ok(ServerMsg::Message { room, id, payload_b64 }) => {
211                        match B64.decode(payload_b64.as_bytes()) {
212                            Ok(payload) => {
213                                let _ = ev_tx.send(ServerEvent::Message { room, id, payload });
214                            }
215                            Err(e) => warn!(error = %e, "server sent undecodable payload"),
216                        }
217                    }
218                    Ok(ServerMsg::Error { message }) => warn!(%message, "huddle-server error"),
219                    Ok(ServerMsg::Pong) => {}
220                    Err(e) => warn!(error = %e, "unparseable server message"),
221                }
222            }
223            let _ = ev_tx.send(ServerEvent::Disconnected);
224        });
225
226        (Self { out_tx }, ev_rx)
227    }
228
229    /// Send a room's opaque wire bytes to the server for fan-out.
230    pub fn publish(&self, room: &str, id: &str, payload: &[u8]) -> Result<()> {
231        self.send(ClientMsg::Publish {
232            room: room.to_string(),
233            id: id.to_string(),
234            payload_b64: B64.encode(payload),
235        })
236    }
237
238    /// Assert membership of a room so the server mailboxes us when offline.
239    pub fn subscribe(&self, room: &str) -> Result<()> {
240        self.send(ClientMsg::Subscribe { room: room.to_string() })
241    }
242
243    pub fn unsubscribe(&self, room: &str) -> Result<()> {
244        self.send(ClientMsg::Unsubscribe { room: room.to_string() })
245    }
246
247    /// Ask the server to re-drain our mailbox.
248    pub fn fetch(&self) -> Result<()> {
249        self.send(ClientMsg::Fetch)
250    }
251
252    pub fn ping(&self) -> Result<()> {
253        self.send(ClientMsg::Ping)
254    }
255
256    fn send(&self, msg: ClientMsg) -> Result<()> {
257        self.out_tx
258            .send(msg)
259            .map_err(|_| HuddleError::Network("server connection closed".to_string()))
260    }
261}
262
263/// Extract `host:port` from a `ws://`/`wss://` URL for the SOCKS target.
264/// Defaults to port 80 for `ws://` (matches the onion's `HiddenServicePort
265/// 80`) and 443 for `wss://` when no explicit port is given.
266fn host_port_from_ws_url(url: &str) -> Result<String> {
267    let (rest, default_port) = if let Some(r) = url.strip_prefix("wss://") {
268        (r, 443)
269    } else if let Some(r) = url.strip_prefix("ws://") {
270        (r, 80)
271    } else {
272        return Err(HuddleError::Network(format!("expected ws:// url, got {url}")));
273    };
274    let authority = rest.split('/').next().unwrap_or(rest);
275    if authority.is_empty() {
276        return Err(HuddleError::Network(format!("no host in url: {url}")));
277    }
278    if authority.contains(':') {
279        Ok(authority.to_string())
280    } else {
281        Ok(format!("{authority}:{default_port}"))
282    }
283}
284
285#[cfg(test)]
286mod tests {
287    use super::host_port_from_ws_url;
288
289    #[test]
290    fn parses_host_port() {
291        assert_eq!(host_port_from_ws_url("ws://abc.onion/ws").unwrap(), "abc.onion:80");
292        assert_eq!(
293            host_port_from_ws_url("ws://127.0.0.1:8787/ws").unwrap(),
294            "127.0.0.1:8787"
295        );
296        assert_eq!(host_port_from_ws_url("wss://h:443").unwrap(), "h:443");
297        // huddle 1.0: bare wss:// defaults to 443, not 80.
298        assert_eq!(host_port_from_ws_url("wss://relay.example/ws").unwrap(), "relay.example:443");
299        assert!(host_port_from_ws_url("http://x").is_err());
300    }
301}