arcly_http/realtime/connection.rs
//! Sharded connection & broadcasting registry (no global lock on the hot path).
2//!
3//! ## Topology
4//!
5//! The hot path for real-time servers is *broadcast*: one inbound event fans
6//! out to thousands of sockets. A naive `Mutex<HashMap<Id, Sender>>` serialises
7//! every send behind one lock — the classic C10K/C100K bottleneck.
8//!
//! `arcly-http` instead uses [`dashmap::DashMap`], which shards the key space
//! across N independent locks (by default N = 4 × the available parallelism,
//! rounded up to a power of two). Two broadcasts touching different shards
//! never contend. Each connection owns a **bounded** `mpsc::Sender`; sending is
//! a non-blocking `try_send` that never stalls the frame-processing task. The
//! per-socket writer drains its own receiver, and a
14//! client that can't keep up is **evicted** when its queue fills — a slow
15//! consumer costs at most `ws_outbound_buffer` frames of memory, never an
16//! unbounded buffer.
17//!
18//! ## Encapsulation
19//!
20//! No `axum`/`tower` types appear here. The registry speaks [`WsMessage`], an
21//! arcly-owned enum. The websocket boundary in [`super::ws`] is the single
22//! place that translates `WsMessage` ↔ `axum::extract::ws::Message`.
23
24use std::sync::atomic::{AtomicU64, Ordering};
25use std::sync::Arc;
26
27use dashmap::{DashMap, DashSet};
28use serde::Serialize;
29use tokio::sync::mpsc;
30
31use crate::web::context::Claims;
32
/// Identifier handed out by [`ConnectionRegistry::register`].
///
/// Drawn from a monotonically increasing counter, so ids are never reused
/// within a registry.
pub type ConnId = u64;

/// Outbound frame type owned by arcly.
///
/// Only the websocket boundary converts this into the transport's concrete
/// frame type, so no `axum` type leaks into the public API. `Text` carries an
/// `Arc<str>`: fanning one message out to many sockets clones a pointer, never
/// the string body.
#[derive(Debug, Clone)]
pub enum WsMessage {
    /// A UTF-8 text frame; every event envelope travels as JSON text.
    Text(Arc<str>),
    /// Keep-alive ping initiated by the server to probe liveness through
    /// NATs and proxies.
    Ping,
    /// Ask the peer for a graceful close.
    Close,
}
50
51/// Per-connection registry row. `tx` is the head of that socket's **bounded**
52/// outbound queue; `member_rooms` tracks which rooms to evict the id from on
53/// disconnect; `last_seen` (unix secs) is touched by the reader on every
54/// inbound frame and consulted by the idle sweeper.
55struct ConnEntry {
56 tx: mpsc::Sender<WsMessage>,
57 member_rooms: DashSet<String>,
58 last_seen: AtomicU64,
59 #[allow(dead_code)]
60 claims: Option<Arc<Claims>>,
61}
62
/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// A clock set before 1970 yields `0` instead of panicking.
#[inline]
fn unix_now() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    }
}
70
71/// Sharded, lock-free-on-the-hot-path connection registry.
72///
73/// One instance is created at launch, leaked to `&'static`, and shared by every
74/// gateway route. All mutating operations are `O(1)` amortised and touch only a
75/// single shard; `broadcast` is `O(connections)` with no global lock.
76pub struct ConnectionRegistry {
77 next_id: AtomicU64,
78 conns: DashMap<ConnId, ConnEntry>,
79 rooms: DashMap<String, DashSet<ConnId>>,
80}
81
82impl ConnectionRegistry {
83 pub fn new() -> Self {
84 Self {
85 next_id: AtomicU64::new(1),
86 conns: DashMap::new(),
87 rooms: DashMap::new(),
88 }
89 }
90
91 /// Register a freshly-upgraded socket. Returns its assigned [`ConnId`].
92 ///
93 /// `tx` must be the sending half of a **bounded** channel — the queue
94 /// depth is the slow-client memory ceiling (`LaunchConfig::ws_outbound_buffer`).
95 pub fn register(&self, tx: mpsc::Sender<WsMessage>, claims: Option<Arc<Claims>>) -> ConnId {
96 let id = self.next_id.fetch_add(1, Ordering::Relaxed);
97 self.conns.insert(
98 id,
99 ConnEntry {
100 tx,
101 member_rooms: DashSet::new(),
102 last_seen: AtomicU64::new(unix_now()),
103 claims,
104 },
105 );
106 metrics::gauge!("ws_connections").increment(1.0);
107 id
108 }
109
110 /// Record inbound activity for the idle sweeper. Called by the reader on
111 /// every frame (including pongs); one relaxed store.
112 #[inline]
113 pub fn touch(&self, id: ConnId) {
114 if let Some(entry) = self.conns.get(&id) {
115 entry.last_seen.store(unix_now(), Ordering::Relaxed);
116 }
117 }
118
119 /// Enqueue to one entry; on a **full queue** the client is too slow to
120 /// keep up — evict it (drop the entry; the writer drains the backlog,
121 /// sees the closed channel, and tears the socket down). Unbounded growth
122 /// in server memory is never an option.
123 ///
124 /// Returns `false` when the entry was evicted (caller loops may skip it).
125 fn enqueue_or_evict(&self, id: ConnId, entry: &ConnEntry, msg: WsMessage) -> bool {
126 match entry.tx.try_send(msg) {
127 Ok(()) => {
128 metrics::counter!("ws_messages_out_total").increment(1);
129 true
130 }
131 Err(mpsc::error::TrySendError::Full(_)) => {
132 metrics::counter!("ws_slow_client_evictions_total").increment(1);
133 tracing::warn!(conn = id, "WS outbound queue full — evicting slow client");
134 false
135 }
136 // Receiver already gone — reader will unregister shortly.
137 Err(mpsc::error::TrySendError::Closed(_)) => true,
138 }
139 }
140
141 /// Send a Close frame to **every** live socket. Used by graceful
142 /// shutdown: WS connections otherwise keep axum's drain phase open
143 /// indefinitely (until clients disconnect or the supervisor SIGKILLs),
144 /// which would mean plugin `on_shutdown` hooks never run in production.
145 /// Each socket's writer task sends the frame and exits; readers follow.
146 pub fn close_all(&self) -> usize {
147 let mut n = 0;
148 for entry in self.conns.iter() {
149 // try_send: a full queue can't take the Close, but eviction by
150 // the next broadcast — or the drain deadline — handles it.
151 let _ = entry.value().tx.try_send(WsMessage::Close);
152 n += 1;
153 }
154 n
155 }
156
157 /// Close connections with no inbound activity for `max_idle_secs`.
158 /// Dead TCP links (NAT timeouts, vanished mobile clients) never send a
159 /// Close frame — without reaping they linger in the registry forever,
160 /// eating queue memory on every broadcast. Returns the ids reaped.
161 ///
162 /// Pair with a server ping (`ws_ping_interval`): pings provoke pongs,
163 /// pongs touch `last_seen`, so only truly dead links exceed the window.
164 pub fn sweep_idle(&self, max_idle_secs: u64) -> Vec<ConnId> {
165 let now = unix_now();
166 let stale: Vec<ConnId> = self
167 .conns
168 .iter()
169 .filter(|e| {
170 now.saturating_sub(e.value().last_seen.load(Ordering::Relaxed)) > max_idle_secs
171 })
172 .map(|e| *e.key())
173 .collect();
174 for id in &stale {
175 metrics::counter!("ws_idle_reaped_total").increment(1);
176 if let Some(entry) = self.conns.get(id) {
177 let _ = entry.tx.try_send(WsMessage::Close);
178 }
179 // If the queue was full the Close never lands — force the writer
180 // down by dropping the entry (channel closes, writer exits).
181 self.unregister(*id);
182 }
183 stale
184 }
185
186 /// Remove a socket and evict it from every room it had joined.
187 pub fn unregister(&self, id: ConnId) {
188 if let Some((_, entry)) = self.conns.remove(&id) {
189 metrics::gauge!("ws_connections").decrement(1.0);
190 for room in entry.member_rooms.iter() {
191 let key = room.key();
192 if let Some(set) = self.rooms.get(key) {
193 set.remove(&id);
194 // Prune the room key when empty to prevent unbounded growth.
195 // Drop the Ref before acquiring the write lock inside remove_if.
196 if set.is_empty() {
197 drop(set);
198 self.rooms.remove_if(key, |_, s| s.is_empty());
199 }
200 }
201 }
202 }
203 }
204
205 /// Number of live connections. O(1).
206 #[inline]
207 pub fn connection_count(&self) -> usize {
208 self.conns.len()
209 }
210
211 /// Add a connection to a room (Socket.IO-style logical channel).
212 ///
213 /// ## TOCTOU safety
214 ///
215 /// Both `member_rooms` and `rooms` are updated while holding the DashMap
216 /// shard read-lock on `conns` (the `Ref` keeps the shard locked until it is
217 /// dropped). This prevents a concurrent `unregister()` — which needs the
218 /// shard *write*-lock — from removing the entry between the two writes,
219 /// which would otherwise leave a stale `ConnId` in `rooms` forever.
220 pub fn join_room(&self, id: ConnId, room: String) {
221 if let Some(entry) = self.conns.get(&id) {
222 // member_rooms updated while shard read-lock is held.
223 entry.member_rooms.insert(room.clone());
224 // rooms secondary index updated while same lock is still live.
225 self.rooms.entry(room).or_default().insert(id);
226 // Ref dropped here → shard lock released.
227 }
228 // If the conn was not found the socket is already disconnected; no-op.
229 }
230
231 /// Remove a connection from a room.
232 ///
233 /// Mirrors `join_room`'s lock order: hold the `conns` shard read-lock first
234 /// so a concurrent `unregister()` (which needs the write-lock) cannot remove
235 /// the entry between the two writes and leave a stale id in `rooms`.
236 pub fn leave_room(&self, id: ConnId, room: &str) {
237 if let Some(entry) = self.conns.get(&id) {
238 entry.member_rooms.remove(room);
239 if let Some(set) = self.rooms.get(room) {
240 set.remove(&id);
241 if set.is_empty() {
242 drop(set);
243 self.rooms.remove_if(room, |_, s| s.is_empty());
244 }
245 }
246 }
247 }
248
249 /// Enqueue a text frame to one connection. Non-blocking; a full queue
250 /// evicts the slow client, a dead receiver is dropped silently.
251 #[inline]
252 pub fn send_text(&self, id: ConnId, text: Arc<str>) {
253 let evict = match self.conns.get(&id) {
254 Some(entry) => !self.enqueue_or_evict(id, &entry, WsMessage::Text(text)),
255 None => false,
256 }; // Ref dropped before any removal — same-shard deadlock safety.
257 if evict {
258 self.unregister(id);
259 }
260 }
261
262 /// Fan a text frame out to every live connection. Creates one `Arc<str>`
263 /// allocation then enqueues N pointer copies — no per-connection memcpy.
264 /// Clients whose queues are full are evicted **after** the sweep
265 /// (removal inside iteration would deadlock the DashMap shard).
266 pub fn broadcast_text(&self, text: &str) {
267 let arc: Arc<str> = Arc::from(text);
268 let mut evict: Vec<ConnId> = Vec::new();
269 for entry in self.conns.iter() {
270 if !self.enqueue_or_evict(
271 *entry.key(),
272 entry.value(),
273 WsMessage::Text(Arc::clone(&arc)),
274 ) {
275 evict.push(*entry.key());
276 }
277 }
278 for id in evict {
279 self.unregister(id);
280 }
281 }
282
283 /// Fan a text frame out to the members of one room.
284 pub fn broadcast_room_text(&self, room: &str, text: &str) {
285 let Some(members) = self.rooms.get(room) else {
286 return;
287 };
288 let arc: Arc<str> = Arc::from(text);
289 let mut evict: Vec<ConnId> = Vec::new();
290 for id in members.iter() {
291 if let Some(entry) = self.conns.get(&id) {
292 if !self.enqueue_or_evict(*id, &entry, WsMessage::Text(Arc::clone(&arc))) {
293 evict.push(*id);
294 }
295 }
296 }
297 drop(members); // release the rooms shard before unregister mutates it
298 for id in evict {
299 self.unregister(id);
300 }
301 }
302}
303
304impl Default for ConnectionRegistry {
305 fn default() -> Self {
306 Self::new()
307 }
308}
309
310// ─── Event envelope ───────────────────────────────────────────────────────
311
312/// Serialise an `{ "event": <name>, "data": <payload> }` envelope. This is the
313/// wire format for both the multiplexed WebSocket protocol and room broadcasts.
314fn envelope<T: Serialize>(event: &str, payload: &T) -> String {
315 #[derive(Serialize)]
316 struct Envelope<'a, P> {
317 event: &'a str,
318 data: &'a P,
319 }
320 serde_json::to_string(&Envelope {
321 event,
322 data: payload,
323 })
324 .unwrap_or_else(|_| String::from(r#"{"event":"error","data":null}"#))
325}
326
327// ─── WsClient: the developer-facing connection handle ───────────────────────
328
329/// A cheap, clonable handle to one connected client.
330///
331/// Passed to every gateway lifecycle hook and `#[Subscribe]` handler. All
332/// methods are non-blocking and lock-free on the hot path: emitting just
333/// enqueues onto a sharded channel. Holds a `&'static ConnectionRegistry`, so
334/// cloning is a pointer copy — no `Arc` refcount traffic for the registry.
335#[derive(Clone)]
336pub struct WsClient {
337 id: ConnId,
338 reg: &'static ConnectionRegistry,
339 claims: Option<Arc<Claims>>,
340 tenant: Option<Arc<crate::web::tenant::TenantConfig>>,
341}
342
343impl WsClient {
344 #[doc(hidden)]
345 pub fn __new(
346 id: ConnId,
347 reg: &'static ConnectionRegistry,
348 claims: Option<Arc<Claims>>,
349 tenant: Option<Arc<crate::web::tenant::TenantConfig>>,
350 ) -> Self {
351 Self {
352 id,
353 reg,
354 claims,
355 tenant,
356 }
357 }
358
359 /// This connection's unique id.
360 #[inline]
361 pub fn id(&self) -> ConnId {
362 self.id
363 }
364
365 /// Session claims captured during the handshake (e.g. decoded JWT), if any.
366 #[inline]
367 pub fn claims(&self) -> Option<&Claims> {
368 self.claims.as_deref()
369 }
370
371 /// The tenant resolved during the handshake — same registry, strategy,
372 /// and suspension semantics as HTTP requests. Gateways enforce
373 /// multi-tenancy with this exactly like controllers use `ctx.tenant()`.
374 #[inline]
375 pub fn tenant(&self) -> Option<&crate::web::tenant::TenantConfig> {
376 self.tenant.as_deref()
377 }
378
379 /// Send an event to **this** client only.
380 pub async fn emit<T: Serialize>(&self, event: &str, payload: T) {
381 let text: Arc<str> = envelope(event, &payload).into();
382 self.reg.send_text(self.id, text);
383 }
384
385 /// Broadcast an event to **every** connected client (including self).
386 pub async fn broadcast<T: Serialize>(&self, event: &str, payload: T) {
387 self.reg.broadcast_text(&envelope(event, &payload));
388 }
389
390 /// Broadcast an event to every client currently in `room`.
391 pub async fn broadcast_to_room<T: Serialize>(&self, room: &str, event: &str, payload: T) {
392 self.reg
393 .broadcast_room_text(room, &envelope(event, &payload));
394 }
395
396 /// Join a logical room/channel. Subsequent room broadcasts reach this client.
397 pub fn join_room(&self, room: impl Into<String>) {
398 self.reg.join_room(self.id, room.into());
399 }
400
401 /// Leave a room.
402 pub fn leave_room(&self, room: &str) {
403 self.reg.leave_room(self.id, room);
404 }
405}
406
#[cfg(test)]
mod tests {
    use super::*;

    // Already inside `#[cfg(test)] mod tests`, so no extra cfg is needed here.
    impl ConnectionRegistry {
        /// Test helper: backdate a connection's last activity.
        fn set_last_seen(&self, id: ConnId, unix_secs: u64) {
            if let Some(entry) = self.conns.get(&id) {
                entry.last_seen.store(unix_secs, Ordering::Relaxed);
            }
        }
    }

    /// Registry holding one connection whose outbound queue fits `buffer` frames.
    fn reg_with_conn(buffer: usize) -> (ConnectionRegistry, ConnId, mpsc::Receiver<WsMessage>) {
        let reg = ConnectionRegistry::new();
        let (tx, rx) = mpsc::channel(buffer);
        let id = reg.register(tx, None);
        (reg, id, rx)
    }

    #[tokio::test]
    async fn slow_client_is_evicted_when_queue_fills() {
        let (reg, _id, _rx) = reg_with_conn(1);
        assert_eq!(reg.connection_count(), 1);

        // First frame fills the (never-drained) queue…
        reg.broadcast_text("one");
        assert_eq!(reg.connection_count(), 1);
        // …the second finds it full → the slow client is evicted.
        reg.broadcast_text("two");
        assert_eq!(reg.connection_count(), 0, "slow client must be evicted");
    }

    #[tokio::test]
    async fn fast_client_receives_broadcasts() {
        let (reg, _id, mut rx) = reg_with_conn(8);
        reg.broadcast_text("hello");
        match rx.recv().await {
            Some(WsMessage::Text(t)) => assert_eq!(&*t, "hello"),
            other => panic!("expected text frame, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn room_membership_and_cleanup() {
        let (reg, id, mut rx) = reg_with_conn(8);
        reg.join_room(id, "alpha".into());

        reg.broadcast_room_text("alpha", "in-room");
        assert!(matches!(rx.recv().await, Some(WsMessage::Text(t)) if &*t == "in-room"));

        reg.leave_room(id, "alpha");
        assert!(
            !reg.rooms.contains_key("alpha"),
            "empty room key must be pruned on leave"
        );
        reg.broadcast_room_text("alpha", "after-leave");
        assert!(
            rx.try_recv().is_err(),
            "must not receive after leaving the room"
        );

        // Unregister evicts from rooms and prunes empty room keys.
        reg.join_room(id, "beta".into());
        reg.unregister(id);
        assert_eq!(reg.connection_count(), 0);
        assert!(
            !reg.rooms.contains_key("beta"),
            "empty room key must be pruned on unregister"
        );
        reg.broadcast_room_text("beta", "to-nobody"); // must not panic
    }

    #[tokio::test]
    async fn room_broadcast_reaches_only_members() {
        let (reg, member, mut member_rx) = reg_with_conn(8);
        let (tx, mut outsider_rx) = mpsc::channel(8);
        let _outsider = reg.register(tx, None);
        reg.join_room(member, "gamma".into());

        reg.broadcast_room_text("gamma", "members-only");
        assert!(
            matches!(member_rx.recv().await, Some(WsMessage::Text(t)) if &*t == "members-only")
        );
        assert!(
            outsider_rx.try_recv().is_err(),
            "non-members must not receive room traffic"
        );
    }

    #[tokio::test]
    async fn sweep_idle_reaps_only_stale_connections() {
        let (reg, stale, _rx1) = reg_with_conn(8);
        let (tx2, mut rx2) = mpsc::channel(8);
        let fresh = reg.register(tx2, None);

        reg.set_last_seen(stale, unix_now().saturating_sub(3600));
        let reaped = reg.sweep_idle(60);

        assert_eq!(reaped, vec![stale]);
        assert_eq!(reg.connection_count(), 1);
        // The fresh connection is untouched and still receives traffic.
        reg.send_text(fresh, Arc::from("still-alive"));
        assert!(matches!(rx2.recv().await, Some(WsMessage::Text(t)) if &*t == "still-alive"));
    }

    #[tokio::test]
    async fn touch_keeps_connection_out_of_the_sweep() {
        let (reg, id, _rx) = reg_with_conn(8);
        reg.set_last_seen(id, unix_now().saturating_sub(3600));
        reg.touch(id);

        assert!(reg.sweep_idle(60).is_empty());
        assert_eq!(reg.connection_count(), 1);
    }

    #[tokio::test]
    async fn close_all_enqueues_close_frames() {
        let (reg, _id, mut rx) = reg_with_conn(8);
        assert_eq!(reg.close_all(), 1);
        assert!(matches!(rx.recv().await, Some(WsMessage::Close)));
    }
}