Skip to main content

arcly_http/realtime/
connection.rs

//! Sharded, low-contention connection & broadcasting registry.
2//!
3//! ## Topology
4//!
5//! The hot path for real-time servers is *broadcast*: one inbound event fans
6//! out to thousands of sockets. A naive `Mutex<HashMap<Id, Sender>>` serialises
7//! every send behind one lock — the classic C10K/C100K bottleneck.
8//!
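//! `arcly-http` instead uses [`dashmap::DashMap`], which shards the key space
//! across N independent reader-writer locks (by default N is 4 × the available
//! parallelism, rounded up to a power of two). Operations on different shards
//! never contend, and broadcasts take only shard *read* locks, so concurrent
//! broadcasts do not block one another. Each connection owns an
//! `mpsc::UnboundedSender`; sending is a synchronous, non-blocking enqueue that
//! never stalls the websocket frame-processing task. The per-socket writer task
//! drains its own receiver, so a slow client delays only its own frames. Note
//! that an unbounded channel applies no back-pressure: a client that stops
//! reading lets its queue grow without limit.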
15//!
16//! ## Encapsulation
17//!
18//! No `axum`/`tower` types appear here. The registry speaks [`WsMessage`], an
19//! arcly-owned enum. The websocket boundary in [`super::ws`] is the single
20//! place that translates `WsMessage` ↔ `axum::extract::ws::Message`.
21
22use std::sync::atomic::{AtomicU64, Ordering};
23use std::sync::Arc;
24
25use dashmap::{DashMap, DashSet};
26use serde::Serialize;
27use tokio::sync::mpsc;
28
29use crate::web::context::Claims;
30
/// Identifier handed out to each connection by [`ConnectionRegistry::register`].
///
/// Values come from the registry's monotonically increasing counter, so two
/// live connections in the same registry never share an id.
pub type ConnId = u64;

/// Outbound frame type owned by arcly.
///
/// The websocket boundary converts each variant into the transport's concrete
/// frame type, which keeps `axum` out of this crate's public API. Text bodies
/// are `Arc<str>` so a single serialised message can be shared by every
/// recipient of a broadcast: each queue receives a refcounted pointer rather
/// than its own copy of the string.
#[derive(Debug)]
#[derive(Clone)]
pub enum WsMessage {
    /// A UTF-8 text frame; arcly's event envelope is always JSON text.
    Text(Arc<str>),
    /// Ask the socket to close gracefully.
    Close,
}
46
47/// Per-connection registry row. `tx` is the head of that socket's outbound
48/// queue; `member_rooms` tracks which rooms to evict the id from on disconnect.
49struct ConnEntry {
50    tx: mpsc::UnboundedSender<WsMessage>,
51    member_rooms: DashSet<String>,
52    #[allow(dead_code)]
53    claims: Option<Arc<Claims>>,
54}
55
56/// Sharded, lock-free-on-the-hot-path connection registry.
57///
58/// One instance is created at launch, leaked to `&'static`, and shared by every
59/// gateway route. All mutating operations are `O(1)` amortised and touch only a
60/// single shard; `broadcast` is `O(connections)` with no global lock.
61pub struct ConnectionRegistry {
62    next_id: AtomicU64,
63    conns: DashMap<ConnId, ConnEntry>,
64    rooms: DashMap<String, DashSet<ConnId>>,
65}
66
67impl ConnectionRegistry {
68    pub fn new() -> Self {
69        Self {
70            next_id: AtomicU64::new(1),
71            conns: DashMap::new(),
72            rooms: DashMap::new(),
73        }
74    }
75
76    /// Register a freshly-upgraded socket. Returns its assigned [`ConnId`].
77    pub fn register(
78        &self,
79        tx: mpsc::UnboundedSender<WsMessage>,
80        claims: Option<Arc<Claims>>,
81    ) -> ConnId {
82        let id = self.next_id.fetch_add(1, Ordering::Relaxed);
83        self.conns.insert(
84            id,
85            ConnEntry {
86                tx,
87                member_rooms: DashSet::new(),
88                claims,
89            },
90        );
91        id
92    }
93
94    /// Remove a socket and evict it from every room it had joined.
95    pub fn unregister(&self, id: ConnId) {
96        if let Some((_, entry)) = self.conns.remove(&id) {
97            for room in entry.member_rooms.iter() {
98                let key = room.key();
99                if let Some(set) = self.rooms.get(key) {
100                    set.remove(&id);
101                    // Prune the room key when empty to prevent unbounded growth.
102                    // Drop the Ref before acquiring the write lock inside remove_if.
103                    if set.is_empty() {
104                        drop(set);
105                        self.rooms.remove_if(key, |_, s| s.is_empty());
106                    }
107                }
108            }
109        }
110    }
111
112    /// Number of live connections. O(1).
113    #[inline]
114    pub fn connection_count(&self) -> usize {
115        self.conns.len()
116    }
117
118    /// Add a connection to a room (Socket.IO-style logical channel).
119    ///
120    /// ## TOCTOU safety
121    ///
122    /// Both `member_rooms` and `rooms` are updated while holding the DashMap
123    /// shard read-lock on `conns` (the `Ref` keeps the shard locked until it is
124    /// dropped). This prevents a concurrent `unregister()` — which needs the
125    /// shard *write*-lock — from removing the entry between the two writes,
126    /// which would otherwise leave a stale `ConnId` in `rooms` forever.
127    pub fn join_room(&self, id: ConnId, room: String) {
128        if let Some(entry) = self.conns.get(&id) {
129            // member_rooms updated while shard read-lock is held.
130            entry.member_rooms.insert(room.clone());
131            // rooms secondary index updated while same lock is still live.
132            self.rooms.entry(room).or_default().insert(id);
133            // Ref dropped here → shard lock released.
134        }
135        // If the conn was not found the socket is already disconnected; no-op.
136    }
137
138    /// Remove a connection from a room.
139    ///
140    /// Mirrors `join_room`'s lock order: hold the `conns` shard read-lock first
141    /// so a concurrent `unregister()` (which needs the write-lock) cannot remove
142    /// the entry between the two writes and leave a stale id in `rooms`.
143    pub fn leave_room(&self, id: ConnId, room: &str) {
144        if let Some(entry) = self.conns.get(&id) {
145            entry.member_rooms.remove(room);
146            if let Some(set) = self.rooms.get(room) {
147                set.remove(&id);
148                if set.is_empty() {
149                    drop(set);
150                    self.rooms.remove_if(room, |_, s| s.is_empty());
151                }
152            }
153        }
154    }
155
156    /// Enqueue a text frame to one connection. Non-blocking; a dead receiver
157    /// is silently dropped (the reader loop will unregister it shortly).
158    #[inline]
159    pub fn send_text(&self, id: ConnId, text: Arc<str>) {
160        if let Some(entry) = self.conns.get(&id) {
161            let _ = entry.tx.send(WsMessage::Text(text));
162        }
163    }
164
165    /// Fan a text frame out to every live connection. Creates one `Arc<str>`
166    /// allocation then enqueues N pointer copies — no per-connection memcpy.
167    pub fn broadcast_text(&self, text: &str) {
168        let arc: Arc<str> = Arc::from(text);
169        for entry in self.conns.iter() {
170            let _ = entry.value().tx.send(WsMessage::Text(Arc::clone(&arc)));
171        }
172    }
173
174    /// Fan a text frame out to the members of one room.
175    pub fn broadcast_room_text(&self, room: &str, text: &str) {
176        let Some(members) = self.rooms.get(room) else {
177            return;
178        };
179        let arc: Arc<str> = Arc::from(text);
180        for id in members.iter() {
181            if let Some(entry) = self.conns.get(&id) {
182                let _ = entry.tx.send(WsMessage::Text(Arc::clone(&arc)));
183            }
184        }
185    }
186}
187
188impl Default for ConnectionRegistry {
189    fn default() -> Self {
190        Self::new()
191    }
192}
193
194// ─── Event envelope ───────────────────────────────────────────────────────
195
196/// Serialise an `{ "event": <name>, "data": <payload> }` envelope. This is the
197/// wire format for both the multiplexed WebSocket protocol and room broadcasts.
198fn envelope<T: Serialize>(event: &str, payload: &T) -> String {
199    #[derive(Serialize)]
200    struct Envelope<'a, P> {
201        event: &'a str,
202        data: &'a P,
203    }
204    serde_json::to_string(&Envelope {
205        event,
206        data: payload,
207    })
208    .unwrap_or_else(|_| String::from(r#"{"event":"error","data":null}"#))
209}
210
211// ─── WsClient: the developer-facing connection handle ───────────────────────
212
213/// A cheap, clonable handle to one connected client.
214///
215/// Passed to every gateway lifecycle hook and `#[Subscribe]` handler. All
216/// methods are non-blocking and lock-free on the hot path: emitting just
217/// enqueues onto a sharded channel. Holds a `&'static ConnectionRegistry`, so
218/// cloning is a pointer copy — no `Arc` refcount traffic for the registry.
219#[derive(Clone)]
220pub struct WsClient {
221    id: ConnId,
222    reg: &'static ConnectionRegistry,
223    claims: Option<Arc<Claims>>,
224}
225
226impl WsClient {
227    #[doc(hidden)]
228    pub fn __new(
229        id: ConnId,
230        reg: &'static ConnectionRegistry,
231        claims: Option<Arc<Claims>>,
232    ) -> Self {
233        Self { id, reg, claims }
234    }
235
236    /// This connection's unique id.
237    #[inline]
238    pub fn id(&self) -> ConnId {
239        self.id
240    }
241
242    /// Session claims captured during the handshake (e.g. decoded JWT), if any.
243    #[inline]
244    pub fn claims(&self) -> Option<&Claims> {
245        self.claims.as_deref()
246    }
247
248    /// Send an event to **this** client only.
249    pub async fn emit<T: Serialize>(&self, event: &str, payload: T) {
250        let text: Arc<str> = envelope(event, &payload).into();
251        self.reg.send_text(self.id, text);
252    }
253
254    /// Broadcast an event to **every** connected client (including self).
255    pub async fn broadcast<T: Serialize>(&self, event: &str, payload: T) {
256        self.reg.broadcast_text(&envelope(event, &payload));
257    }
258
259    /// Broadcast an event to every client currently in `room`.
260    pub async fn broadcast_to_room<T: Serialize>(&self, room: &str, event: &str, payload: T) {
261        self.reg
262            .broadcast_room_text(room, &envelope(event, &payload));
263    }
264
265    /// Join a logical room/channel. Subsequent room broadcasts reach this client.
266    pub fn join_room(&self, room: impl Into<String>) {
267        self.reg.join_room(self.id, room.into());
268    }
269
270    /// Leave a room.
271    pub fn leave_room(&self, room: &str) {
272        self.reg.leave_room(self.id, room);
273    }
274}