smooth_operator/backplane.rs
1//! Connection backplane: the scale-out + event-delivery seam for the WebSocket
2//! server.
3//!
4//! The reference server is single-process — each connection's outbound sink is a
5//! per-connection in-process channel, with no registry and no way to reach a
6//! connection from outside its own read loop. That blocks two things we need:
7//!
8//! 1. **Horizontal scale-out.** With >1 replica, an agent turn (or any event)
9//! produced on pod A must reach a socket held by pod B.
10//! 2. **Non-AI realtime.** Other parts of a host system (job status, ingestion
11//! progress, notifications) want to push events to a connected client without
12//! going through an agent turn.
13//!
14//! The [`Backplane`] trait is the seam for both. Each connection's local sink is
15//! [`attach`](Backplane::attach)ed on connect and [`associate`](Backplane::associate)d
16//! with targets (its session / user / org / agent) as they're learned;
17//! [`publish`](Backplane::publish) delivers an event to **every connection for a
18//! target, wherever its pod is**.
19//!
20//! - [`InMemoryBackplane`] (the default) keeps a local registry and delivers
21//! straight to local sinks — single process, no external services.
22//! - A Redis / NATS impl (separate crate work) publishes to the bus, and each
23//! pod's subscriber delivers to *its* local sinks — making the same `publish`
24//! call fan out across the fleet.
25//!
26//! This module is a public **mechanism**: a host plugs its chosen impl in via
27//! `AppState::with_backplane(...)`.
28
29use std::collections::{HashMap, HashSet};
30use std::sync::{Arc, RwLock};
31
32use async_trait::async_trait;
33use serde::{Deserialize, Serialize};
34use serde_json::Value;
35
/// A connection's local delivery sink: given an event, write it to that
/// connection's socket.
///
/// Runtime-agnostic — the server wraps its outbound channel in a closure, so
/// the backplane (and this lib) take no async-runtime dependency.
///
/// The alias is an `Arc` around a `Fn(Value) + Send + Sync` trait object, so a
/// sink can be cloned cheaply and invoked through a shared reference from any
/// thread. The callback takes the event by value and returns nothing, so a
/// failed delivery cannot be reported back through this type.
pub type LocalSink = Arc<dyn Fn(Value) + Send + Sync>;
41
/// A delivery target: a single connection, or every connection associated with a
/// session / user / org / agent.
///
/// Each variant wraps an id as a `String`. Equality and hashing are derived, so
/// two targets match only when both the variant and the id are equal —
/// `Session("x")` and `User("x")` are different targets. The in-memory registry
/// in this module relies on that to use `Target` as a map key.
///
/// `Serialize`/`Deserialize` so a distributed [`Backplane`] (Redis/NATS) can put
/// the target on the wire alongside the event, and each pod re-resolve it against
/// *its* local registry.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum Target {
    /// One specific connection, addressed by its connection id.
    Connection(String),
    /// Every connection in a conversation session.
    Session(String),
    /// Every connection for a user.
    User(String),
    /// Every connection for an org/tenant.
    Org(String),
    /// Every connection talking to an agent.
    Agent(String),
}
61
/// The wire format a distributed [`Backplane`] broadcasts on its bus: who
/// published it (so a pod can skip its own echo), the [`Target`] to re-resolve
/// locally, and the event payload.
///
/// Shared so the Redis and NATS adapters — and any host's own transport adapter —
/// speak the same envelope; a pod on the other end deserializes this and calls
/// `publish` on its **local** [`InMemoryBackplane`] to fan out to its sockets.
///
/// Nothing in this file constructs or consumes an envelope; it is a plain data
/// carrier whose (de)serialization is entirely derived.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct BackplaneEnvelope {
    /// Opaque id of the publishing node. A receiver compares it to its own id and
    /// skips the message if equal — the origin already delivered locally.
    pub origin: String,
    /// The delivery target, re-resolved against the receiving pod's registry.
    pub target: Target,
    /// The event payload, delivered verbatim to matching local sinks.
    pub event: Value,
}
79
/// The connection backplane: a per-pod sink registry + cross-pod event delivery.
///
/// Implementations must be cheap to clone behind an `Arc` and safe to share
/// across every connection task (hence the `Send + Sync` supertraits). The
/// methods are `async` by way of the `async_trait` attribute.
#[async_trait]
pub trait Backplane: Send + Sync {
    /// Attach a connection's local outbound sink — this pod owns the socket.
    /// Idempotent re-attach replaces the sink. The connection is always
    /// reachable by [`Target::Connection`] with its own id.
    async fn attach(&self, conn_id: &str, sink: LocalSink);

    /// Detach a connection and drop all of its target associations and its local
    /// sink (call on disconnect).
    async fn detach(&self, conn_id: &str);

    /// Associate a connection with a target (idempotent). Learned over the
    /// connection's life: the session at `create_conversation_session`, the
    /// user/org from auth, etc.
    async fn associate(&self, conn_id: &str, target: Target);

    /// Deliver `event` to every connection associated with `target`. Returns the
    /// number of **local** deliveries made on this pod; cross-pod impls also fan
    /// the event out to other pods (whose local deliveries this count omits).
    ///
    /// The count only says how many local sinks were invoked: a [`LocalSink`]
    /// returns nothing, so it cannot confirm the event reached the socket.
    async fn publish(&self, target: Target, event: Value) -> usize;
}
105
/// Single-process [`Backplane`]: an in-memory registry with direct local
/// delivery. The default — keeps the server runnable standalone. Multi-pod
/// deployments install a Redis / NATS impl instead.
///
/// All state sits behind one `std::sync::RwLock`: `publish` and
/// `connection_count` take the read lock, while `attach`, `detach` and
/// `associate` take the write lock. Every acquisition uses `expect`, so a
/// poisoned lock panics the caller.
#[derive(Default)]
pub struct InMemoryBackplane {
    /// The whole registry (sinks plus both association indexes) under a single
    /// lock, so each operation sees and updates the three maps together.
    inner: RwLock<Registry>,
}
113
/// The lock-protected state of an [`InMemoryBackplane`].
///
/// `conn_targets` and `target_conns` are two views of the same relation and are
/// always updated as a pair: `attach` and `associate` insert into both, and
/// `detach` removes the connection from both, dropping any target whose set
/// becomes empty.
#[derive(Default)]
struct Registry {
    /// conn id → its local delivery sink.
    sinks: HashMap<String, LocalSink>,
    /// conn id → the targets it's associated with (for cleanup on detach).
    conn_targets: HashMap<String, HashSet<Target>>,
    /// target → the conn ids associated with it (for publish fan-out).
    target_conns: HashMap<Target, HashSet<String>>,
}
123
124impl InMemoryBackplane {
125 #[must_use]
126 pub fn new() -> Self {
127 Self::default()
128 }
129
130 /// Test/inspection helper: number of attached connections.
131 #[must_use]
132 pub fn connection_count(&self) -> usize {
133 self.inner.read().expect("backplane lock").sinks.len()
134 }
135}
136
137#[async_trait]
138impl Backplane for InMemoryBackplane {
139 async fn attach(&self, conn_id: &str, sink: LocalSink) {
140 let mut r = self.inner.write().expect("backplane lock");
141 r.sinks.insert(conn_id.to_string(), sink);
142 // Always reachable by its own connection id.
143 let self_target = Target::Connection(conn_id.to_string());
144 r.conn_targets
145 .entry(conn_id.to_string())
146 .or_default()
147 .insert(self_target.clone());
148 r.target_conns
149 .entry(self_target)
150 .or_default()
151 .insert(conn_id.to_string());
152 }
153
154 async fn detach(&self, conn_id: &str) {
155 let mut r = self.inner.write().expect("backplane lock");
156 r.sinks.remove(conn_id);
157 if let Some(targets) = r.conn_targets.remove(conn_id) {
158 for t in targets {
159 let empty = if let Some(set) = r.target_conns.get_mut(&t) {
160 set.remove(conn_id);
161 set.is_empty()
162 } else {
163 false
164 };
165 if empty {
166 r.target_conns.remove(&t);
167 }
168 }
169 }
170 }
171
172 async fn associate(&self, conn_id: &str, target: Target) {
173 let mut r = self.inner.write().expect("backplane lock");
174 r.conn_targets
175 .entry(conn_id.to_string())
176 .or_default()
177 .insert(target.clone());
178 r.target_conns
179 .entry(target)
180 .or_default()
181 .insert(conn_id.to_string());
182 }
183
184 async fn publish(&self, target: Target, event: Value) -> usize {
185 let r = self.inner.read().expect("backplane lock");
186 let Some(conns) = r.target_conns.get(&target) else {
187 return 0;
188 };
189 let mut delivered = 0;
190 for conn in conns {
191 if let Some(sink) = r.sinks.get(conn) {
192 // The sink closure is non-blocking (it pushes onto the
193 // connection's channel); safe to call under the read lock.
194 sink(event.clone());
195 delivered += 1;
196 }
197 }
198 delivered
199 }
200}
201
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use std::sync::mpsc::{channel, Receiver};

    /// Builds a runtime-agnostic [`LocalSink`] that forwards every event into a
    /// std `mpsc` channel, returning the sink together with the receiving end.
    fn channel_sink() -> (LocalSink, Receiver<Value>) {
        let (sender, receiver) = channel::<Value>();
        let forward: LocalSink = Arc::new(move |event: Value| {
            // A send error only means the receiver was dropped; ignore it.
            let _ = sender.send(event);
        });
        (forward, receiver)
    }

    #[tokio::test]
    async fn publishes_to_a_session_across_its_connections() {
        let backplane = InMemoryBackplane::new();
        let session = Target::Session("s1".into());
        let (sink_a, events_a) = channel_sink();
        let (sink_b, events_b) = channel_sink();

        backplane.attach("conn-a", sink_a).await;
        backplane.attach("conn-b", sink_b).await;
        backplane.associate("conn-a", session.clone()).await;
        backplane.associate("conn-b", session.clone()).await;

        let payload = json!({"hi": 1});
        let delivered = backplane.publish(session, payload.clone()).await;

        assert_eq!(delivered, 2);
        assert_eq!(events_a.try_recv().unwrap(), payload);
        assert_eq!(events_b.try_recv().unwrap(), payload);
    }

    #[tokio::test]
    async fn publishes_to_a_single_connection() {
        let backplane = InMemoryBackplane::new();
        let (sink, events) = channel_sink();
        backplane.attach("conn-1", sink).await;

        let only_that_connection = Target::Connection("conn-1".into());
        let delivered = backplane.publish(only_that_connection, json!("ping")).await;

        assert_eq!(delivered, 1);
        assert_eq!(events.try_recv().unwrap(), json!("ping"));
    }

    #[tokio::test]
    async fn unknown_target_delivers_to_nobody() {
        let backplane = InMemoryBackplane::new();
        let delivered = backplane
            .publish(Target::Session("nope".into()), json!(1))
            .await;
        assert_eq!(delivered, 0);
    }

    #[tokio::test]
    async fn detach_removes_sink_and_associations() {
        let backplane = InMemoryBackplane::new();
        let (sink, _events) = channel_sink();
        backplane.attach("conn-x", sink).await;
        backplane
            .associate("conn-x", Target::User("u1".into()))
            .await;
        assert_eq!(backplane.connection_count(), 1);

        backplane.detach("conn-x").await;
        assert_eq!(backplane.connection_count(), 0);

        // Its targets no longer resolve to it.
        let stale_targets = [
            Target::User("u1".into()),
            Target::Connection("conn-x".into()),
        ];
        for stale in stale_targets {
            assert_eq!(backplane.publish(stale, json!(1)).await, 0);
        }
    }

    #[tokio::test]
    async fn a_connection_can_serve_multiple_targets() {
        let backplane = InMemoryBackplane::new();
        let (sink, events) = channel_sink();
        backplane.attach("c", sink).await;
        backplane.associate("c", Target::Session("s".into())).await;
        backplane.associate("c", Target::Org("o".into())).await;

        let via_org = backplane
            .publish(Target::Org("o".into()), json!("org-event"))
            .await;
        assert_eq!(via_org, 1);

        let via_session = backplane
            .publish(Target::Session("s".into()), json!("sess-event"))
            .await;
        assert_eq!(via_session, 1);

        // Both events arrive on the one sink, in publish order.
        assert_eq!(events.try_recv().unwrap(), json!("org-event"));
        assert_eq!(events.try_recv().unwrap(), json!("sess-event"));
    }
}
297}