Skip to main content

arcly_http/realtime/
connection.rs

1//! Lock-free connection & broadcasting registry.
2//!
3//! ## Topology
4//!
5//! The hot path for real-time servers is *broadcast*: one inbound event fans
6//! out to thousands of sockets. A naive `Mutex<HashMap<Id, Sender>>` serialises
7//! every send behind one lock — the classic C10K/C100K bottleneck.
8//!
9//! `arcly-http` instead uses [`dashmap::DashMap`], which shards the key space
10//! across N independent locks (N = `4 * num_cpus` by default). Two broadcasts
11//! touching different shards never contend. Each connection owns a **bounded**
12//! `mpsc::Sender`; sending is a wait-free `try_send` that never blocks the
13//! frame-processing task. The per-socket writer drains its own receiver, and a
14//! client that can't keep up is **evicted** when its queue fills — a slow
15//! consumer costs at most `ws_outbound_buffer` frames of memory, never an
16//! unbounded buffer.
17//!
18//! ## Encapsulation
19//!
20//! No `axum`/`tower` types appear here. The registry speaks [`WsMessage`], an
21//! arcly-owned enum. The websocket boundary in [`super::ws`] is the single
22//! place that translates `WsMessage` ↔ `axum::extract::ws::Message`.
23
24use std::sync::atomic::{AtomicU64, Ordering};
25use std::sync::Arc;
26
27use dashmap::{DashMap, DashSet};
28use serde::Serialize;
29use tokio::sync::mpsc;
30
31use crate::web::context::Claims;
32
33/// Monotonic, process-unique connection identifier.
34pub type ConnId = u64;
35
/// Outbound frame type owned by this crate.
///
/// The registry only ever queues `WsMessage` values; no transport-level frame
/// type appears in this file. `Text` carries an `Arc<str>` so that a broadcast
/// can hand the same buffer to every queue by cloning the pointer instead of
/// copying the string body once per connection.
#[derive(Debug, Clone)]
pub enum WsMessage {
    /// A UTF-8 text frame; event envelopes are sent as JSON text.
    Text(Arc<str>),
    /// Keep-alive ping initiated by the server.
    Ping,
    /// Ask the socket to close gracefully.
    Close,
}
50
51/// Per-connection registry row. `tx` is the head of that socket's **bounded**
52/// outbound queue; `member_rooms` tracks which rooms to evict the id from on
53/// disconnect; `last_seen` (unix secs) is touched by the reader on every
54/// inbound frame and consulted by the idle sweeper.
55struct ConnEntry {
56    tx: mpsc::Sender<WsMessage>,
57    member_rooms: DashSet<String>,
58    last_seen: AtomicU64,
59    #[allow(dead_code)]
60    claims: Option<Arc<Claims>>,
61}
62
/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// Falls back to `0` if the system clock reports a time before the epoch.
#[inline]
fn unix_now() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        Err(_) => 0,
    }
}
70
71/// Sharded, lock-free-on-the-hot-path connection registry.
72///
73/// One instance is created at launch, leaked to `&'static`, and shared by every
74/// gateway route. All mutating operations are `O(1)` amortised and touch only a
75/// single shard; `broadcast` is `O(connections)` with no global lock.
76pub struct ConnectionRegistry {
77    next_id: AtomicU64,
78    conns: DashMap<ConnId, ConnEntry>,
79    rooms: DashMap<String, DashSet<ConnId>>,
80}
81
82impl ConnectionRegistry {
83    pub fn new() -> Self {
84        Self {
85            next_id: AtomicU64::new(1),
86            conns: DashMap::new(),
87            rooms: DashMap::new(),
88        }
89    }
90
91    /// Register a freshly-upgraded socket. Returns its assigned [`ConnId`].
92    ///
93    /// `tx` must be the sending half of a **bounded** channel — the queue
94    /// depth is the slow-client memory ceiling (`LaunchConfig::ws_outbound_buffer`).
95    pub fn register(&self, tx: mpsc::Sender<WsMessage>, claims: Option<Arc<Claims>>) -> ConnId {
96        let id = self.next_id.fetch_add(1, Ordering::Relaxed);
97        self.conns.insert(
98            id,
99            ConnEntry {
100                tx,
101                member_rooms: DashSet::new(),
102                last_seen: AtomicU64::new(unix_now()),
103                claims,
104            },
105        );
106        metrics::gauge!("ws_connections").increment(1.0);
107        id
108    }
109
110    /// Record inbound activity for the idle sweeper. Called by the reader on
111    /// every frame (including pongs); one relaxed store.
112    #[inline]
113    pub fn touch(&self, id: ConnId) {
114        if let Some(entry) = self.conns.get(&id) {
115            entry.last_seen.store(unix_now(), Ordering::Relaxed);
116        }
117    }
118
119    /// Enqueue to one entry; on a **full queue** the client is too slow to
120    /// keep up — evict it (drop the entry; the writer drains the backlog,
121    /// sees the closed channel, and tears the socket down). Unbounded growth
122    /// in server memory is never an option.
123    ///
124    /// Returns `false` when the entry was evicted (caller loops may skip it).
125    fn enqueue_or_evict(&self, id: ConnId, entry: &ConnEntry, msg: WsMessage) -> bool {
126        match entry.tx.try_send(msg) {
127            Ok(()) => {
128                metrics::counter!("ws_messages_out_total").increment(1);
129                true
130            }
131            Err(mpsc::error::TrySendError::Full(_)) => {
132                metrics::counter!("ws_slow_client_evictions_total").increment(1);
133                tracing::warn!(conn = id, "WS outbound queue full — evicting slow client");
134                false
135            }
136            // Receiver already gone — reader will unregister shortly.
137            Err(mpsc::error::TrySendError::Closed(_)) => true,
138        }
139    }
140
141    /// Send a Close frame to **every** live socket. Used by graceful
142    /// shutdown: WS connections otherwise keep axum's drain phase open
143    /// indefinitely (until clients disconnect or the supervisor SIGKILLs),
144    /// which would mean plugin `on_shutdown` hooks never run in production.
145    /// Each socket's writer task sends the frame and exits; readers follow.
146    pub fn close_all(&self) -> usize {
147        let mut n = 0;
148        for entry in self.conns.iter() {
149            // try_send: a full queue can't take the Close, but eviction by
150            // the next broadcast — or the drain deadline — handles it.
151            let _ = entry.value().tx.try_send(WsMessage::Close);
152            n += 1;
153        }
154        n
155    }
156
157    /// Close connections with no inbound activity for `max_idle_secs`.
158    /// Dead TCP links (NAT timeouts, vanished mobile clients) never send a
159    /// Close frame — without reaping they linger in the registry forever,
160    /// eating queue memory on every broadcast. Returns the ids reaped.
161    ///
162    /// Pair with a server ping (`ws_ping_interval`): pings provoke pongs,
163    /// pongs touch `last_seen`, so only truly dead links exceed the window.
164    pub fn sweep_idle(&self, max_idle_secs: u64) -> Vec<ConnId> {
165        let now = unix_now();
166        let stale: Vec<ConnId> = self
167            .conns
168            .iter()
169            .filter(|e| {
170                now.saturating_sub(e.value().last_seen.load(Ordering::Relaxed)) > max_idle_secs
171            })
172            .map(|e| *e.key())
173            .collect();
174        for id in &stale {
175            metrics::counter!("ws_idle_reaped_total").increment(1);
176            if let Some(entry) = self.conns.get(id) {
177                let _ = entry.tx.try_send(WsMessage::Close);
178            }
179            // If the queue was full the Close never lands — force the writer
180            // down by dropping the entry (channel closes, writer exits).
181            self.unregister(*id);
182        }
183        stale
184    }
185
186    /// Remove a socket and evict it from every room it had joined.
187    pub fn unregister(&self, id: ConnId) {
188        if let Some((_, entry)) = self.conns.remove(&id) {
189            metrics::gauge!("ws_connections").decrement(1.0);
190            for room in entry.member_rooms.iter() {
191                let key = room.key();
192                if let Some(set) = self.rooms.get(key) {
193                    set.remove(&id);
194                    // Prune the room key when empty to prevent unbounded growth.
195                    // Drop the Ref before acquiring the write lock inside remove_if.
196                    if set.is_empty() {
197                        drop(set);
198                        self.rooms.remove_if(key, |_, s| s.is_empty());
199                    }
200                }
201            }
202        }
203    }
204
205    /// Number of live connections. O(1).
206    #[inline]
207    pub fn connection_count(&self) -> usize {
208        self.conns.len()
209    }
210
211    /// Add a connection to a room (Socket.IO-style logical channel).
212    ///
213    /// ## TOCTOU safety
214    ///
215    /// Both `member_rooms` and `rooms` are updated while holding the DashMap
216    /// shard read-lock on `conns` (the `Ref` keeps the shard locked until it is
217    /// dropped). This prevents a concurrent `unregister()` — which needs the
218    /// shard *write*-lock — from removing the entry between the two writes,
219    /// which would otherwise leave a stale `ConnId` in `rooms` forever.
220    pub fn join_room(&self, id: ConnId, room: String) {
221        if let Some(entry) = self.conns.get(&id) {
222            // member_rooms updated while shard read-lock is held.
223            entry.member_rooms.insert(room.clone());
224            // rooms secondary index updated while same lock is still live.
225            self.rooms.entry(room).or_default().insert(id);
226            // Ref dropped here → shard lock released.
227        }
228        // If the conn was not found the socket is already disconnected; no-op.
229    }
230
231    /// Remove a connection from a room.
232    ///
233    /// Mirrors `join_room`'s lock order: hold the `conns` shard read-lock first
234    /// so a concurrent `unregister()` (which needs the write-lock) cannot remove
235    /// the entry between the two writes and leave a stale id in `rooms`.
236    pub fn leave_room(&self, id: ConnId, room: &str) {
237        if let Some(entry) = self.conns.get(&id) {
238            entry.member_rooms.remove(room);
239            if let Some(set) = self.rooms.get(room) {
240                set.remove(&id);
241                if set.is_empty() {
242                    drop(set);
243                    self.rooms.remove_if(room, |_, s| s.is_empty());
244                }
245            }
246        }
247    }
248
249    /// Enqueue a text frame to one connection. Non-blocking; a full queue
250    /// evicts the slow client, a dead receiver is dropped silently.
251    #[inline]
252    pub fn send_text(&self, id: ConnId, text: Arc<str>) {
253        let evict = match self.conns.get(&id) {
254            Some(entry) => !self.enqueue_or_evict(id, &entry, WsMessage::Text(text)),
255            None => false,
256        }; // Ref dropped before any removal — same-shard deadlock safety.
257        if evict {
258            self.unregister(id);
259        }
260    }
261
262    /// Fan a text frame out to every live connection. Creates one `Arc<str>`
263    /// allocation then enqueues N pointer copies — no per-connection memcpy.
264    /// Clients whose queues are full are evicted **after** the sweep
265    /// (removal inside iteration would deadlock the DashMap shard).
266    pub fn broadcast_text(&self, text: &str) {
267        let arc: Arc<str> = Arc::from(text);
268        let mut evict: Vec<ConnId> = Vec::new();
269        for entry in self.conns.iter() {
270            if !self.enqueue_or_evict(
271                *entry.key(),
272                entry.value(),
273                WsMessage::Text(Arc::clone(&arc)),
274            ) {
275                evict.push(*entry.key());
276            }
277        }
278        for id in evict {
279            self.unregister(id);
280        }
281    }
282
283    /// Fan a text frame out to the members of one room.
284    pub fn broadcast_room_text(&self, room: &str, text: &str) {
285        let Some(members) = self.rooms.get(room) else {
286            return;
287        };
288        let arc: Arc<str> = Arc::from(text);
289        let mut evict: Vec<ConnId> = Vec::new();
290        for id in members.iter() {
291            if let Some(entry) = self.conns.get(&id) {
292                if !self.enqueue_or_evict(*id, &entry, WsMessage::Text(Arc::clone(&arc))) {
293                    evict.push(*id);
294                }
295            }
296        }
297        drop(members); // release the rooms shard before unregister mutates it
298        for id in evict {
299            self.unregister(id);
300        }
301    }
302}
303
304impl Default for ConnectionRegistry {
305    fn default() -> Self {
306        Self::new()
307    }
308}
309
310// ─── Event envelope ───────────────────────────────────────────────────────
311
312/// Serialise an `{ "event": <name>, "data": <payload> }` envelope. This is the
313/// wire format for both the multiplexed WebSocket protocol and room broadcasts.
314fn envelope<T: Serialize>(event: &str, payload: &T) -> String {
315    #[derive(Serialize)]
316    struct Envelope<'a, P> {
317        event: &'a str,
318        data: &'a P,
319    }
320    serde_json::to_string(&Envelope {
321        event,
322        data: payload,
323    })
324    .unwrap_or_else(|_| String::from(r#"{"event":"error","data":null}"#))
325}
326
327// ─── WsClient: the developer-facing connection handle ───────────────────────
328
329/// A cheap, clonable handle to one connected client.
330///
331/// Passed to every gateway lifecycle hook and `#[Subscribe]` handler. All
332/// methods are non-blocking and lock-free on the hot path: emitting just
333/// enqueues onto a sharded channel. Holds a `&'static ConnectionRegistry`, so
334/// cloning is a pointer copy — no `Arc` refcount traffic for the registry.
335#[derive(Clone)]
336pub struct WsClient {
337    id: ConnId,
338    reg: &'static ConnectionRegistry,
339    claims: Option<Arc<Claims>>,
340    tenant: Option<Arc<crate::web::tenant::TenantConfig>>,
341}
342
343impl WsClient {
344    #[doc(hidden)]
345    pub fn __new(
346        id: ConnId,
347        reg: &'static ConnectionRegistry,
348        claims: Option<Arc<Claims>>,
349        tenant: Option<Arc<crate::web::tenant::TenantConfig>>,
350    ) -> Self {
351        Self {
352            id,
353            reg,
354            claims,
355            tenant,
356        }
357    }
358
359    /// This connection's unique id.
360    #[inline]
361    pub fn id(&self) -> ConnId {
362        self.id
363    }
364
365    /// Session claims captured during the handshake (e.g. decoded JWT), if any.
366    #[inline]
367    pub fn claims(&self) -> Option<&Claims> {
368        self.claims.as_deref()
369    }
370
371    /// The tenant resolved during the handshake — same registry, strategy,
372    /// and suspension semantics as HTTP requests. Gateways enforce
373    /// multi-tenancy with this exactly like controllers use `ctx.tenant()`.
374    #[inline]
375    pub fn tenant(&self) -> Option<&crate::web::tenant::TenantConfig> {
376        self.tenant.as_deref()
377    }
378
379    /// Send an event to **this** client only.
380    pub async fn emit<T: Serialize>(&self, event: &str, payload: T) {
381        let text: Arc<str> = envelope(event, &payload).into();
382        self.reg.send_text(self.id, text);
383    }
384
385    /// Broadcast an event to **every** connected client (including self).
386    pub async fn broadcast<T: Serialize>(&self, event: &str, payload: T) {
387        self.reg.broadcast_text(&envelope(event, &payload));
388    }
389
390    /// Broadcast an event to every client currently in `room`.
391    pub async fn broadcast_to_room<T: Serialize>(&self, room: &str, event: &str, payload: T) {
392        self.reg
393            .broadcast_room_text(room, &envelope(event, &payload));
394    }
395
396    /// Join a logical room/channel. Subsequent room broadcasts reach this client.
397    pub fn join_room(&self, room: impl Into<String>) {
398        self.reg.join_room(self.id, room.into());
399    }
400
401    /// Leave a room.
402    pub fn leave_room(&self, room: &str) {
403        self.reg.leave_room(self.id, room);
404    }
405}
406
#[cfg(test)]
mod tests {
    use super::*;

    // The enclosing module is already `#[cfg(test)]`, so this impl needs no
    // gate of its own.
    impl ConnectionRegistry {
        /// Test helper: backdate a connection's last activity.
        fn set_last_seen(&self, id: ConnId, unix_secs: u64) {
            if let Some(entry) = self.conns.get(&id) {
                entry.last_seen.store(unix_secs, Ordering::Relaxed);
            }
        }
    }

    /// Build a registry holding one connection whose outbound queue has
    /// capacity `buffer`. The receiver is returned so the caller decides
    /// whether the queue gets drained.
    fn reg_with_conn(buffer: usize) -> (ConnectionRegistry, ConnId, mpsc::Receiver<WsMessage>) {
        let reg = ConnectionRegistry::new();
        let (tx, rx) = mpsc::channel(buffer);
        let id = reg.register(tx, None);
        (reg, id, rx)
    }

    #[tokio::test]
    async fn slow_client_is_evicted_when_queue_fills() {
        let (reg, _id, _rx) = reg_with_conn(1);
        assert_eq!(reg.connection_count(), 1);

        // First frame fills the (never-drained) queue…
        reg.broadcast_text("one");
        assert_eq!(reg.connection_count(), 1);
        // …the second finds it full → the slow client is evicted.
        reg.broadcast_text("two");
        assert_eq!(reg.connection_count(), 0, "slow client must be evicted");
    }

    #[tokio::test]
    async fn fast_client_receives_broadcasts() {
        let (reg, _id, mut rx) = reg_with_conn(8);
        reg.broadcast_text("hello");
        match rx.recv().await {
            Some(WsMessage::Text(t)) => assert_eq!(&*t, "hello"),
            other => panic!("expected text frame, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn room_membership_and_cleanup() {
        let (reg, id, mut rx) = reg_with_conn(8);
        reg.join_room(id, "alpha".into());

        reg.broadcast_room_text("alpha", "in-room");
        assert!(matches!(rx.recv().await, Some(WsMessage::Text(t)) if &*t == "in-room"));

        reg.leave_room(id, "alpha");
        reg.broadcast_room_text("alpha", "after-leave");
        assert!(
            rx.try_recv().is_err(),
            "must not receive after leaving the room"
        );

        // Unregister evicts from rooms and prunes empty room keys.
        reg.join_room(id, "beta".into());
        reg.unregister(id);
        assert_eq!(reg.connection_count(), 0);
        reg.broadcast_room_text("beta", "to-nobody"); // must not panic
    }

    #[tokio::test]
    async fn sweep_idle_reaps_only_stale_connections() {
        let (reg, stale, _rx1) = reg_with_conn(8);
        let (tx2, mut rx2) = mpsc::channel(8);
        let fresh = reg.register(tx2, None);

        // `unix_now` can return 0 on a broken clock; saturate instead of
        // underflowing.
        reg.set_last_seen(stale, unix_now().saturating_sub(3600));
        let reaped = reg.sweep_idle(60);

        assert_eq!(reaped, vec![stale]);
        assert_eq!(reg.connection_count(), 1);
        // The fresh connection is untouched and still receives traffic.
        reg.send_text(fresh, Arc::from("still-alive"));
        assert!(matches!(rx2.recv().await, Some(WsMessage::Text(t)) if &*t == "still-alive"));
    }

    #[tokio::test]
    async fn close_all_enqueues_close_frames() {
        let (reg, _id, mut rx) = reg_with_conn(8);
        assert_eq!(reg.close_all(), 1);
        assert!(matches!(rx.recv().await, Some(WsMessage::Close)));
    }
}