Skip to main content

arcly_http/realtime/
connection.rs

1//! Lock-free connection & broadcasting registry.
2//!
3//! ## Topology
4//!
5//! The hot path for real-time servers is *broadcast*: one inbound event fans
6//! out to thousands of sockets. A naive `Mutex<HashMap<Id, Sender>>` serialises
7//! every send behind one lock — the classic C10K/C100K bottleneck.
8//!
9//! `arcly-http` instead uses [`dashmap::DashMap`], which shards the key space
10//! across N independent locks (N = `4 * num_cpus` by default). Two broadcasts
11//! touching different shards never contend. Each connection owns an
12//! `mpsc::UnboundedSender`; sending is a wait-free enqueue (`try`-style) that
13//! never blocks the websocket frame-processing task. The per-socket writer task
14//! drains its own receiver — so a slow client only backs up its own (unbounded) queue.
15//!
16//! ## Encapsulation
17//!
18//! No `axum`/`tower` types appear here. The registry speaks [`WsMessage`], an
19//! arcly-owned enum. The websocket boundary in [`super::ws`] is the single
20//! place that translates `WsMessage` ↔ `axum::extract::ws::Message`.
21
22use std::sync::atomic::{AtomicU64, Ordering};
23use std::sync::Arc;
24
25use dashmap::{DashMap, DashSet};
26use serde::Serialize;
27use tokio::sync::mpsc;
28
29use crate::web::context::Claims;
30
/// Identifier of one registered connection.
///
/// Values are handed out by an atomically incremented counter, so each id is
/// unique within the process and later connections get larger ids.
pub type ConnId = u64;
33
/// Outbound frame as seen by the registry.
///
/// This is an arcly-owned type: the websocket boundary converts it into the
/// transport's concrete frame type, so `axum` never leaks into the public
/// surface.
///
/// The text payload is an `Arc<str>`, which lets a broadcast build the body
/// once and hand every recipient a reference-counted pointer instead of a
/// fresh copy of the string.
#[derive(Debug, Clone)]
pub enum WsMessage {
    /// A UTF-8 text frame; the event envelope is always JSON text.
    Text(Arc<str>),
    /// Request a graceful close of the socket.
    Close,
}
46
47/// Per-connection registry row. `tx` is the head of that socket's outbound
48/// queue; `member_rooms` tracks which rooms to evict the id from on disconnect.
49struct ConnEntry {
50    tx: mpsc::UnboundedSender<WsMessage>,
51    member_rooms: DashSet<String>,
52    #[allow(dead_code)]
53    claims: Option<Arc<Claims>>,
54}
55
56/// Sharded, lock-free-on-the-hot-path connection registry.
57///
58/// One instance is created at launch, leaked to `&'static`, and shared by every
59/// gateway route. All mutating operations are `O(1)` amortised and touch only a
60/// single shard; `broadcast` is `O(connections)` with no global lock.
61pub struct ConnectionRegistry {
62    next_id: AtomicU64,
63    conns: DashMap<ConnId, ConnEntry>,
64    rooms: DashMap<String, DashSet<ConnId>>,
65}
66
67impl ConnectionRegistry {
68    pub fn new() -> Self {
69        Self {
70            next_id: AtomicU64::new(1),
71            conns: DashMap::new(),
72            rooms: DashMap::new(),
73        }
74    }
75
76    /// Register a freshly-upgraded socket. Returns its assigned [`ConnId`].
77    pub fn register(
78        &self,
79        tx: mpsc::UnboundedSender<WsMessage>,
80        claims: Option<Arc<Claims>>,
81    ) -> ConnId {
82        let id = self.next_id.fetch_add(1, Ordering::Relaxed);
83        self.conns.insert(
84            id,
85            ConnEntry {
86                tx,
87                member_rooms: DashSet::new(),
88                claims,
89            },
90        );
91        id
92    }
93
94    /// Send a Close frame to **every** live socket. Used by graceful
95    /// shutdown: WS connections otherwise keep axum's drain phase open
96    /// indefinitely (until clients disconnect or the supervisor SIGKILLs),
97    /// which would mean plugin `on_shutdown` hooks never run in production.
98    /// Each socket's writer task sends the frame and exits; readers follow.
99    pub fn close_all(&self) -> usize {
100        let mut n = 0;
101        for entry in self.conns.iter() {
102            let _ = entry.value().tx.send(WsMessage::Close);
103            n += 1;
104        }
105        n
106    }
107
108    /// Remove a socket and evict it from every room it had joined.
109    pub fn unregister(&self, id: ConnId) {
110        if let Some((_, entry)) = self.conns.remove(&id) {
111            for room in entry.member_rooms.iter() {
112                let key = room.key();
113                if let Some(set) = self.rooms.get(key) {
114                    set.remove(&id);
115                    // Prune the room key when empty to prevent unbounded growth.
116                    // Drop the Ref before acquiring the write lock inside remove_if.
117                    if set.is_empty() {
118                        drop(set);
119                        self.rooms.remove_if(key, |_, s| s.is_empty());
120                    }
121                }
122            }
123        }
124    }
125
126    /// Number of live connections. O(1).
127    #[inline]
128    pub fn connection_count(&self) -> usize {
129        self.conns.len()
130    }
131
132    /// Add a connection to a room (Socket.IO-style logical channel).
133    ///
134    /// ## TOCTOU safety
135    ///
136    /// Both `member_rooms` and `rooms` are updated while holding the DashMap
137    /// shard read-lock on `conns` (the `Ref` keeps the shard locked until it is
138    /// dropped). This prevents a concurrent `unregister()` — which needs the
139    /// shard *write*-lock — from removing the entry between the two writes,
140    /// which would otherwise leave a stale `ConnId` in `rooms` forever.
141    pub fn join_room(&self, id: ConnId, room: String) {
142        if let Some(entry) = self.conns.get(&id) {
143            // member_rooms updated while shard read-lock is held.
144            entry.member_rooms.insert(room.clone());
145            // rooms secondary index updated while same lock is still live.
146            self.rooms.entry(room).or_default().insert(id);
147            // Ref dropped here → shard lock released.
148        }
149        // If the conn was not found the socket is already disconnected; no-op.
150    }
151
152    /// Remove a connection from a room.
153    ///
154    /// Mirrors `join_room`'s lock order: hold the `conns` shard read-lock first
155    /// so a concurrent `unregister()` (which needs the write-lock) cannot remove
156    /// the entry between the two writes and leave a stale id in `rooms`.
157    pub fn leave_room(&self, id: ConnId, room: &str) {
158        if let Some(entry) = self.conns.get(&id) {
159            entry.member_rooms.remove(room);
160            if let Some(set) = self.rooms.get(room) {
161                set.remove(&id);
162                if set.is_empty() {
163                    drop(set);
164                    self.rooms.remove_if(room, |_, s| s.is_empty());
165                }
166            }
167        }
168    }
169
170    /// Enqueue a text frame to one connection. Non-blocking; a dead receiver
171    /// is silently dropped (the reader loop will unregister it shortly).
172    #[inline]
173    pub fn send_text(&self, id: ConnId, text: Arc<str>) {
174        if let Some(entry) = self.conns.get(&id) {
175            let _ = entry.tx.send(WsMessage::Text(text));
176        }
177    }
178
179    /// Fan a text frame out to every live connection. Creates one `Arc<str>`
180    /// allocation then enqueues N pointer copies — no per-connection memcpy.
181    pub fn broadcast_text(&self, text: &str) {
182        let arc: Arc<str> = Arc::from(text);
183        for entry in self.conns.iter() {
184            let _ = entry.value().tx.send(WsMessage::Text(Arc::clone(&arc)));
185        }
186    }
187
188    /// Fan a text frame out to the members of one room.
189    pub fn broadcast_room_text(&self, room: &str, text: &str) {
190        let Some(members) = self.rooms.get(room) else {
191            return;
192        };
193        let arc: Arc<str> = Arc::from(text);
194        for id in members.iter() {
195            if let Some(entry) = self.conns.get(&id) {
196                let _ = entry.tx.send(WsMessage::Text(Arc::clone(&arc)));
197            }
198        }
199    }
200}
201
202impl Default for ConnectionRegistry {
203    fn default() -> Self {
204        Self::new()
205    }
206}
207
208// ─── Event envelope ───────────────────────────────────────────────────────
209
210/// Serialise an `{ "event": <name>, "data": <payload> }` envelope. This is the
211/// wire format for both the multiplexed WebSocket protocol and room broadcasts.
212fn envelope<T: Serialize>(event: &str, payload: &T) -> String {
213    #[derive(Serialize)]
214    struct Envelope<'a, P> {
215        event: &'a str,
216        data: &'a P,
217    }
218    serde_json::to_string(&Envelope {
219        event,
220        data: payload,
221    })
222    .unwrap_or_else(|_| String::from(r#"{"event":"error","data":null}"#))
223}
224
225// ─── WsClient: the developer-facing connection handle ───────────────────────
226
227/// A cheap, clonable handle to one connected client.
228///
229/// Passed to every gateway lifecycle hook and `#[Subscribe]` handler. All
230/// methods are non-blocking and lock-free on the hot path: emitting just
231/// enqueues onto a sharded channel. Holds a `&'static ConnectionRegistry`, so
232/// cloning is a pointer copy — no `Arc` refcount traffic for the registry.
233#[derive(Clone)]
234pub struct WsClient {
235    id: ConnId,
236    reg: &'static ConnectionRegistry,
237    claims: Option<Arc<Claims>>,
238}
239
240impl WsClient {
241    #[doc(hidden)]
242    pub fn __new(
243        id: ConnId,
244        reg: &'static ConnectionRegistry,
245        claims: Option<Arc<Claims>>,
246    ) -> Self {
247        Self { id, reg, claims }
248    }
249
250    /// This connection's unique id.
251    #[inline]
252    pub fn id(&self) -> ConnId {
253        self.id
254    }
255
256    /// Session claims captured during the handshake (e.g. decoded JWT), if any.
257    #[inline]
258    pub fn claims(&self) -> Option<&Claims> {
259        self.claims.as_deref()
260    }
261
262    /// Send an event to **this** client only.
263    pub async fn emit<T: Serialize>(&self, event: &str, payload: T) {
264        let text: Arc<str> = envelope(event, &payload).into();
265        self.reg.send_text(self.id, text);
266    }
267
268    /// Broadcast an event to **every** connected client (including self).
269    pub async fn broadcast<T: Serialize>(&self, event: &str, payload: T) {
270        self.reg.broadcast_text(&envelope(event, &payload));
271    }
272
273    /// Broadcast an event to every client currently in `room`.
274    pub async fn broadcast_to_room<T: Serialize>(&self, room: &str, event: &str, payload: T) {
275        self.reg
276            .broadcast_room_text(room, &envelope(event, &payload));
277    }
278
279    /// Join a logical room/channel. Subsequent room broadcasts reach this client.
280    pub fn join_room(&self, room: impl Into<String>) {
281        self.reg.join_room(self.id, room.into());
282    }
283
284    /// Leave a room.
285    pub fn leave_room(&self, room: &str) {
286        self.reg.leave_room(self.id, room);
287    }
288}