arcly_http/realtime/connection.rs
1//! Lock-free connection & broadcasting registry.
2//!
3//! ## Topology
4//!
5//! The hot path for real-time servers is *broadcast*: one inbound event fans
6//! out to thousands of sockets. A naive `Mutex<HashMap<Id, Sender>>` serialises
7//! every send behind one lock — the classic C10K/C100K bottleneck.
8//!
9//! `arcly-http` instead uses [`dashmap::DashMap`], which shards the key space
10//! across N independent locks (N = `4 * num_cpus` by default). Two broadcasts
11//! touching different shards never contend. Each connection owns an
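//! `mpsc::UnboundedSender`; sending is a non-blocking enqueue (`try`-style)
//! that never stalls the websocket frame-processing task. The per-socket
//! writer task drains its own receiver, so a slow client delays only its own
//! frames. Note that the queue is unbounded: nothing pushes back on the
//! sender, and a client that stops reading lets its queue grow until the
//! connection is unregistered.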
15//!
16//! ## Encapsulation
17//!
18//! No `axum`/`tower` types appear here. The registry speaks [`WsMessage`], an
19//! arcly-owned enum. The websocket boundary in [`super::ws`] is the single
20//! place that translates `WsMessage` ↔ `axum::extract::ws::Message`.
21
22use std::sync::atomic::{AtomicU64, Ordering};
23use std::sync::Arc;
24
25use dashmap::{DashMap, DashSet};
26use serde::Serialize;
27use tokio::sync::mpsc;
28
29use crate::web::context::Claims;
30
/// Identifier handed to each registered connection.
///
/// Ids come from a process-wide counter that only ever increases, so no two
/// connections registered by the same process share one.
pub type ConnId = u64;
33
/// Outbound frame type owned by arcly.
///
/// The registry queues these; the websocket boundary converts them into the
/// transport's own frame type, which keeps `axum` out of the public surface.
///
/// The text payload is an `Arc<str>`: cloning a `Text` frame copies a pointer
/// and bumps a refcount, so a broadcast shares one string allocation across
/// all recipients instead of duplicating the body per socket.
#[derive(Clone, Debug)]
pub enum WsMessage {
    /// A UTF-8 text frame. Event envelopes are always sent as JSON text.
    Text(Arc<str>),
    /// Ask the peer to close the connection gracefully.
    Close,
}
46
47/// Per-connection registry row. `tx` is the head of that socket's outbound
48/// queue; `member_rooms` tracks which rooms to evict the id from on disconnect.
49struct ConnEntry {
50 tx: mpsc::UnboundedSender<WsMessage>,
51 member_rooms: DashSet<String>,
52 #[allow(dead_code)]
53 claims: Option<Arc<Claims>>,
54}
55
56/// Sharded, lock-free-on-the-hot-path connection registry.
57///
58/// One instance is created at launch, leaked to `&'static`, and shared by every
59/// gateway route. All mutating operations are `O(1)` amortised and touch only a
60/// single shard; `broadcast` is `O(connections)` with no global lock.
61pub struct ConnectionRegistry {
62 next_id: AtomicU64,
63 conns: DashMap<ConnId, ConnEntry>,
64 rooms: DashMap<String, DashSet<ConnId>>,
65}
66
67impl ConnectionRegistry {
68 pub fn new() -> Self {
69 Self {
70 next_id: AtomicU64::new(1),
71 conns: DashMap::new(),
72 rooms: DashMap::new(),
73 }
74 }
75
76 /// Register a freshly-upgraded socket. Returns its assigned [`ConnId`].
77 pub fn register(
78 &self,
79 tx: mpsc::UnboundedSender<WsMessage>,
80 claims: Option<Arc<Claims>>,
81 ) -> ConnId {
82 let id = self.next_id.fetch_add(1, Ordering::Relaxed);
83 self.conns.insert(
84 id,
85 ConnEntry {
86 tx,
87 member_rooms: DashSet::new(),
88 claims,
89 },
90 );
91 id
92 }
93
94 /// Send a Close frame to **every** live socket. Used by graceful
95 /// shutdown: WS connections otherwise keep axum's drain phase open
96 /// indefinitely (until clients disconnect or the supervisor SIGKILLs),
97 /// which would mean plugin `on_shutdown` hooks never run in production.
98 /// Each socket's writer task sends the frame and exits; readers follow.
99 pub fn close_all(&self) -> usize {
100 let mut n = 0;
101 for entry in self.conns.iter() {
102 let _ = entry.value().tx.send(WsMessage::Close);
103 n += 1;
104 }
105 n
106 }
107
108 /// Remove a socket and evict it from every room it had joined.
109 pub fn unregister(&self, id: ConnId) {
110 if let Some((_, entry)) = self.conns.remove(&id) {
111 for room in entry.member_rooms.iter() {
112 let key = room.key();
113 if let Some(set) = self.rooms.get(key) {
114 set.remove(&id);
115 // Prune the room key when empty to prevent unbounded growth.
116 // Drop the Ref before acquiring the write lock inside remove_if.
117 if set.is_empty() {
118 drop(set);
119 self.rooms.remove_if(key, |_, s| s.is_empty());
120 }
121 }
122 }
123 }
124 }
125
126 /// Number of live connections. O(1).
127 #[inline]
128 pub fn connection_count(&self) -> usize {
129 self.conns.len()
130 }
131
132 /// Add a connection to a room (Socket.IO-style logical channel).
133 ///
134 /// ## TOCTOU safety
135 ///
136 /// Both `member_rooms` and `rooms` are updated while holding the DashMap
137 /// shard read-lock on `conns` (the `Ref` keeps the shard locked until it is
138 /// dropped). This prevents a concurrent `unregister()` — which needs the
139 /// shard *write*-lock — from removing the entry between the two writes,
140 /// which would otherwise leave a stale `ConnId` in `rooms` forever.
141 pub fn join_room(&self, id: ConnId, room: String) {
142 if let Some(entry) = self.conns.get(&id) {
143 // member_rooms updated while shard read-lock is held.
144 entry.member_rooms.insert(room.clone());
145 // rooms secondary index updated while same lock is still live.
146 self.rooms.entry(room).or_default().insert(id);
147 // Ref dropped here → shard lock released.
148 }
149 // If the conn was not found the socket is already disconnected; no-op.
150 }
151
152 /// Remove a connection from a room.
153 ///
154 /// Mirrors `join_room`'s lock order: hold the `conns` shard read-lock first
155 /// so a concurrent `unregister()` (which needs the write-lock) cannot remove
156 /// the entry between the two writes and leave a stale id in `rooms`.
157 pub fn leave_room(&self, id: ConnId, room: &str) {
158 if let Some(entry) = self.conns.get(&id) {
159 entry.member_rooms.remove(room);
160 if let Some(set) = self.rooms.get(room) {
161 set.remove(&id);
162 if set.is_empty() {
163 drop(set);
164 self.rooms.remove_if(room, |_, s| s.is_empty());
165 }
166 }
167 }
168 }
169
170 /// Enqueue a text frame to one connection. Non-blocking; a dead receiver
171 /// is silently dropped (the reader loop will unregister it shortly).
172 #[inline]
173 pub fn send_text(&self, id: ConnId, text: Arc<str>) {
174 if let Some(entry) = self.conns.get(&id) {
175 let _ = entry.tx.send(WsMessage::Text(text));
176 }
177 }
178
179 /// Fan a text frame out to every live connection. Creates one `Arc<str>`
180 /// allocation then enqueues N pointer copies — no per-connection memcpy.
181 pub fn broadcast_text(&self, text: &str) {
182 let arc: Arc<str> = Arc::from(text);
183 for entry in self.conns.iter() {
184 let _ = entry.value().tx.send(WsMessage::Text(Arc::clone(&arc)));
185 }
186 }
187
188 /// Fan a text frame out to the members of one room.
189 pub fn broadcast_room_text(&self, room: &str, text: &str) {
190 let Some(members) = self.rooms.get(room) else {
191 return;
192 };
193 let arc: Arc<str> = Arc::from(text);
194 for id in members.iter() {
195 if let Some(entry) = self.conns.get(&id) {
196 let _ = entry.tx.send(WsMessage::Text(Arc::clone(&arc)));
197 }
198 }
199 }
200}
201
202impl Default for ConnectionRegistry {
203 fn default() -> Self {
204 Self::new()
205 }
206}
207
208// ─── Event envelope ───────────────────────────────────────────────────────
209
210/// Serialise an `{ "event": <name>, "data": <payload> }` envelope. This is the
211/// wire format for both the multiplexed WebSocket protocol and room broadcasts.
212fn envelope<T: Serialize>(event: &str, payload: &T) -> String {
213 #[derive(Serialize)]
214 struct Envelope<'a, P> {
215 event: &'a str,
216 data: &'a P,
217 }
218 serde_json::to_string(&Envelope {
219 event,
220 data: payload,
221 })
222 .unwrap_or_else(|_| String::from(r#"{"event":"error","data":null}"#))
223}
224
225// ─── WsClient: the developer-facing connection handle ───────────────────────
226
227/// A cheap, clonable handle to one connected client.
228///
229/// Passed to every gateway lifecycle hook and `#[Subscribe]` handler. All
230/// methods are non-blocking and lock-free on the hot path: emitting just
231/// enqueues onto a sharded channel. Holds a `&'static ConnectionRegistry`, so
232/// cloning is a pointer copy — no `Arc` refcount traffic for the registry.
233#[derive(Clone)]
234pub struct WsClient {
235 id: ConnId,
236 reg: &'static ConnectionRegistry,
237 claims: Option<Arc<Claims>>,
238}
239
240impl WsClient {
241 #[doc(hidden)]
242 pub fn __new(
243 id: ConnId,
244 reg: &'static ConnectionRegistry,
245 claims: Option<Arc<Claims>>,
246 ) -> Self {
247 Self { id, reg, claims }
248 }
249
250 /// This connection's unique id.
251 #[inline]
252 pub fn id(&self) -> ConnId {
253 self.id
254 }
255
256 /// Session claims captured during the handshake (e.g. decoded JWT), if any.
257 #[inline]
258 pub fn claims(&self) -> Option<&Claims> {
259 self.claims.as_deref()
260 }
261
262 /// Send an event to **this** client only.
263 pub async fn emit<T: Serialize>(&self, event: &str, payload: T) {
264 let text: Arc<str> = envelope(event, &payload).into();
265 self.reg.send_text(self.id, text);
266 }
267
268 /// Broadcast an event to **every** connected client (including self).
269 pub async fn broadcast<T: Serialize>(&self, event: &str, payload: T) {
270 self.reg.broadcast_text(&envelope(event, &payload));
271 }
272
273 /// Broadcast an event to every client currently in `room`.
274 pub async fn broadcast_to_room<T: Serialize>(&self, room: &str, event: &str, payload: T) {
275 self.reg
276 .broadcast_room_text(room, &envelope(event, &payload));
277 }
278
279 /// Join a logical room/channel. Subsequent room broadcasts reach this client.
280 pub fn join_room(&self, room: impl Into<String>) {
281 self.reg.join_room(self.id, room.into());
282 }
283
284 /// Leave a room.
285 pub fn leave_room(&self, room: &str) {
286 self.reg.leave_room(self.id, room);
287 }
288}