arcly_http/realtime/connection.rs
1//! Lock-free connection & broadcasting registry.
2//!
3//! ## Topology
4//!
5//! The hot path for real-time servers is *broadcast*: one inbound event fans
6//! out to thousands of sockets. A naive `Mutex<HashMap<Id, Sender>>` serialises
7//! every send behind one lock — the classic C10K/C100K bottleneck.
8//!
9//! `arcly-http` instead uses [`dashmap::DashMap`], which shards the key space
10//! across N independent locks (N = `4 * num_cpus` by default). Two broadcasts
11//! touching different shards never contend. Each connection owns an
12//! `mpsc::UnboundedSender`; sending is a wait-free enqueue (`try`-style) that
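//! never blocks the websocket frame-processing task. The per-socket writer task
//! drains its own receiver, so a slow client delays only its own frames. Note
//! that the queue is unbounded: it applies no back-pressure, and a stalled
//! client's backlog keeps growing in memory until its connection goes away.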
15//!
16//! ## Encapsulation
17//!
18//! No `axum`/`tower` types appear here. The registry speaks [`WsMessage`], an
19//! arcly-owned enum. The websocket boundary in [`super::ws`] is the single
20//! place that translates `WsMessage` ↔ `axum::extract::ws::Message`.
21
22use std::sync::atomic::{AtomicU64, Ordering};
23use std::sync::Arc;
24
25use dashmap::{DashMap, DashSet};
26use serde::Serialize;
27use tokio::sync::mpsc;
28
29use crate::web::context::Claims;
30
/// Identifier assigned to a connection when it is registered.
///
/// Values are handed out from an atomic counter, so each one is unique within
/// the process and later registrations receive larger ids.
pub type ConnId = u64;
33
/// An arcly-owned outbound frame. The websocket boundary maps this onto the
/// concrete transport frame type — keeping `axum` out of the public surface.
///
/// `Text` holds `Arc<str>` so a broadcast can enqueue N pointer copies of a
/// single allocation rather than cloning the string body N times.
///
/// Equality compares frame contents (two `Text` frames are equal when their
/// strings are equal, regardless of whether they share an allocation), which
/// makes frames directly usable in assertions.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum WsMessage {
    /// UTF-8 text frame (our event envelope is always JSON text).
    Text(Arc<str>),
    /// Graceful close.
    Close,
}
46
47/// Per-connection registry row. `tx` is the head of that socket's outbound
48/// queue; `member_rooms` tracks which rooms to evict the id from on disconnect.
49struct ConnEntry {
50 tx: mpsc::UnboundedSender<WsMessage>,
51 member_rooms: DashSet<String>,
52 #[allow(dead_code)]
53 claims: Option<Arc<Claims>>,
54}
55
56/// Sharded, lock-free-on-the-hot-path connection registry.
57///
58/// One instance is created at launch, leaked to `&'static`, and shared by every
59/// gateway route. All mutating operations are `O(1)` amortised and touch only a
60/// single shard; `broadcast` is `O(connections)` with no global lock.
61pub struct ConnectionRegistry {
62 next_id: AtomicU64,
63 conns: DashMap<ConnId, ConnEntry>,
64 rooms: DashMap<String, DashSet<ConnId>>,
65}
66
67impl ConnectionRegistry {
68 pub fn new() -> Self {
69 Self {
70 next_id: AtomicU64::new(1),
71 conns: DashMap::new(),
72 rooms: DashMap::new(),
73 }
74 }
75
76 /// Register a freshly-upgraded socket. Returns its assigned [`ConnId`].
77 pub fn register(
78 &self,
79 tx: mpsc::UnboundedSender<WsMessage>,
80 claims: Option<Arc<Claims>>,
81 ) -> ConnId {
82 let id = self.next_id.fetch_add(1, Ordering::Relaxed);
83 self.conns.insert(
84 id,
85 ConnEntry {
86 tx,
87 member_rooms: DashSet::new(),
88 claims,
89 },
90 );
91 id
92 }
93
94 /// Remove a socket and evict it from every room it had joined.
95 pub fn unregister(&self, id: ConnId) {
96 if let Some((_, entry)) = self.conns.remove(&id) {
97 for room in entry.member_rooms.iter() {
98 let key = room.key();
99 if let Some(set) = self.rooms.get(key) {
100 set.remove(&id);
101 // Prune the room key when empty to prevent unbounded growth.
102 // Drop the Ref before acquiring the write lock inside remove_if.
103 if set.is_empty() {
104 drop(set);
105 self.rooms.remove_if(key, |_, s| s.is_empty());
106 }
107 }
108 }
109 }
110 }
111
112 /// Number of live connections. O(1).
113 #[inline]
114 pub fn connection_count(&self) -> usize {
115 self.conns.len()
116 }
117
118 /// Add a connection to a room (Socket.IO-style logical channel).
119 ///
120 /// ## TOCTOU safety
121 ///
122 /// Both `member_rooms` and `rooms` are updated while holding the DashMap
123 /// shard read-lock on `conns` (the `Ref` keeps the shard locked until it is
124 /// dropped). This prevents a concurrent `unregister()` — which needs the
125 /// shard *write*-lock — from removing the entry between the two writes,
126 /// which would otherwise leave a stale `ConnId` in `rooms` forever.
127 pub fn join_room(&self, id: ConnId, room: String) {
128 if let Some(entry) = self.conns.get(&id) {
129 // member_rooms updated while shard read-lock is held.
130 entry.member_rooms.insert(room.clone());
131 // rooms secondary index updated while same lock is still live.
132 self.rooms.entry(room).or_default().insert(id);
133 // Ref dropped here → shard lock released.
134 }
135 // If the conn was not found the socket is already disconnected; no-op.
136 }
137
138 /// Remove a connection from a room.
139 ///
140 /// Mirrors `join_room`'s lock order: hold the `conns` shard read-lock first
141 /// so a concurrent `unregister()` (which needs the write-lock) cannot remove
142 /// the entry between the two writes and leave a stale id in `rooms`.
143 pub fn leave_room(&self, id: ConnId, room: &str) {
144 if let Some(entry) = self.conns.get(&id) {
145 entry.member_rooms.remove(room);
146 if let Some(set) = self.rooms.get(room) {
147 set.remove(&id);
148 if set.is_empty() {
149 drop(set);
150 self.rooms.remove_if(room, |_, s| s.is_empty());
151 }
152 }
153 }
154 }
155
156 /// Enqueue a text frame to one connection. Non-blocking; a dead receiver
157 /// is silently dropped (the reader loop will unregister it shortly).
158 #[inline]
159 pub fn send_text(&self, id: ConnId, text: Arc<str>) {
160 if let Some(entry) = self.conns.get(&id) {
161 let _ = entry.tx.send(WsMessage::Text(text));
162 }
163 }
164
165 /// Fan a text frame out to every live connection. Creates one `Arc<str>`
166 /// allocation then enqueues N pointer copies — no per-connection memcpy.
167 pub fn broadcast_text(&self, text: &str) {
168 let arc: Arc<str> = Arc::from(text);
169 for entry in self.conns.iter() {
170 let _ = entry.value().tx.send(WsMessage::Text(Arc::clone(&arc)));
171 }
172 }
173
174 /// Fan a text frame out to the members of one room.
175 pub fn broadcast_room_text(&self, room: &str, text: &str) {
176 let Some(members) = self.rooms.get(room) else {
177 return;
178 };
179 let arc: Arc<str> = Arc::from(text);
180 for id in members.iter() {
181 if let Some(entry) = self.conns.get(&id) {
182 let _ = entry.tx.send(WsMessage::Text(Arc::clone(&arc)));
183 }
184 }
185 }
186}
187
188impl Default for ConnectionRegistry {
189 fn default() -> Self {
190 Self::new()
191 }
192}
193
194// ─── Event envelope ───────────────────────────────────────────────────────
195
196/// Serialise an `{ "event": <name>, "data": <payload> }` envelope. This is the
197/// wire format for both the multiplexed WebSocket protocol and room broadcasts.
198fn envelope<T: Serialize>(event: &str, payload: &T) -> String {
199 #[derive(Serialize)]
200 struct Envelope<'a, P> {
201 event: &'a str,
202 data: &'a P,
203 }
204 serde_json::to_string(&Envelope {
205 event,
206 data: payload,
207 })
208 .unwrap_or_else(|_| String::from(r#"{"event":"error","data":null}"#))
209}
210
211// ─── WsClient: the developer-facing connection handle ───────────────────────
212
213/// A cheap, clonable handle to one connected client.
214///
215/// Passed to every gateway lifecycle hook and `#[Subscribe]` handler. All
216/// methods are non-blocking and lock-free on the hot path: emitting just
217/// enqueues onto a sharded channel. Holds a `&'static ConnectionRegistry`, so
218/// cloning is a pointer copy — no `Arc` refcount traffic for the registry.
219#[derive(Clone)]
220pub struct WsClient {
221 id: ConnId,
222 reg: &'static ConnectionRegistry,
223 claims: Option<Arc<Claims>>,
224}
225
226impl WsClient {
227 #[doc(hidden)]
228 pub fn __new(
229 id: ConnId,
230 reg: &'static ConnectionRegistry,
231 claims: Option<Arc<Claims>>,
232 ) -> Self {
233 Self { id, reg, claims }
234 }
235
236 /// This connection's unique id.
237 #[inline]
238 pub fn id(&self) -> ConnId {
239 self.id
240 }
241
242 /// Session claims captured during the handshake (e.g. decoded JWT), if any.
243 #[inline]
244 pub fn claims(&self) -> Option<&Claims> {
245 self.claims.as_deref()
246 }
247
248 /// Send an event to **this** client only.
249 pub async fn emit<T: Serialize>(&self, event: &str, payload: T) {
250 let text: Arc<str> = envelope(event, &payload).into();
251 self.reg.send_text(self.id, text);
252 }
253
254 /// Broadcast an event to **every** connected client (including self).
255 pub async fn broadcast<T: Serialize>(&self, event: &str, payload: T) {
256 self.reg.broadcast_text(&envelope(event, &payload));
257 }
258
259 /// Broadcast an event to every client currently in `room`.
260 pub async fn broadcast_to_room<T: Serialize>(&self, room: &str, event: &str, payload: T) {
261 self.reg
262 .broadcast_room_text(room, &envelope(event, &payload));
263 }
264
265 /// Join a logical room/channel. Subsequent room broadcasts reach this client.
266 pub fn join_room(&self, room: impl Into<String>) {
267 self.reg.join_room(self.id, room.into());
268 }
269
270 /// Leave a room.
271 pub fn leave_room(&self, room: &str) {
272 self.reg.leave_room(self.id, room);
273 }
274}