Skip to main content

smooth_operator/
backplane.rs

1//! Connection backplane: the scale-out + event-delivery seam for the WebSocket
2//! server.
3//!
4//! The reference server is single-process — each connection's outbound sink is a
5//! per-connection in-process channel, with no registry and no way to reach a
6//! connection from outside its own read loop. That blocks two things we need:
7//!
8//! 1. **Horizontal scale-out.** With >1 replica, an agent turn (or any event)
9//!    produced on pod A must reach a socket held by pod B.
10//! 2. **Non-AI realtime.** Other parts of a host system (job status, ingestion
11//!    progress, notifications) want to push events to a connected client without
12//!    going through an agent turn.
13//!
14//! The [`Backplane`] trait is the seam for both. Each connection's local sink is
15//! [`attach`](Backplane::attach)ed on connect and [`associate`](Backplane::associate)d
16//! with targets (its session / user / org / agent) as they're learned;
17//! [`publish`](Backplane::publish) delivers an event to **every connection for a
18//! target, wherever its pod is**.
19//!
20//! - [`InMemoryBackplane`] (the default) keeps a local registry and delivers
21//!   straight to local sinks — single process, no external services.
22//! - A Redis / NATS impl (separate crate work) publishes to the bus, and each
23//!   pod's subscriber delivers to *its* local sinks — making the same `publish`
24//!   call fan out across the fleet.
25//!
26//! This module is a public **mechanism**: a host plugs its chosen impl in via
27//! `AppState::with_backplane(...)`.
28
29use std::collections::{HashMap, HashSet};
30use std::sync::{Arc, RwLock};
31
32use async_trait::async_trait;
33use serde::{Deserialize, Serialize};
34use serde_json::Value;
35
36/// A connection's local delivery sink: given an event, write it to that
37/// connection's socket. Runtime-agnostic — the server wraps its outbound channel
38/// in a closure, so the backplane (and this lib) take no async-runtime
39/// dependency.
40pub type LocalSink = Arc<dyn Fn(Value) + Send + Sync>;
41
42/// A delivery target: a single connection, or every connection associated with a
43/// session / user / org / agent.
44///
45/// `Serialize`/`Deserialize` so a distributed [`Backplane`] (Redis/NATS) can put
46/// the target on the wire alongside the event, and each pod re-resolve it against
47/// *its* local registry.
48#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
49pub enum Target {
50    /// One specific connection.
51    Connection(String),
52    /// Every connection in a conversation session.
53    Session(String),
54    /// Every connection for a user.
55    User(String),
56    /// Every connection for an org/tenant.
57    Org(String),
58    /// Every connection talking to an agent.
59    Agent(String),
60}
61
62/// The wire format a distributed [`Backplane`] broadcasts on its bus: who
63/// published it (so a pod can skip its own echo), the [`Target`] to re-resolve
64/// locally, and the event payload.
65///
66/// Shared so the Redis and NATS adapters — and any host's own transport adapter —
67/// speak the same envelope; a pod on the other end deserializes this and calls
68/// `publish` on its **local** [`InMemoryBackplane`] to fan out to its sockets.
69#[derive(Clone, Debug, Serialize, Deserialize)]
70pub struct BackplaneEnvelope {
71    /// Opaque id of the publishing node. A receiver compares it to its own id and
72    /// skips the message if equal — the origin already delivered locally.
73    pub origin: String,
74    /// The delivery target, re-resolved against the receiving pod's registry.
75    pub target: Target,
76    /// The event payload, delivered verbatim to matching local sinks.
77    pub event: Value,
78}
79
80/// The connection backplane: a per-pod sink registry + cross-pod event delivery.
81///
82/// Implementations must be cheap to clone behind an `Arc` and safe to share
83/// across every connection task.
84#[async_trait]
85pub trait Backplane: Send + Sync {
86    /// Attach a connection's local outbound sink — this pod owns the socket.
87    /// Idempotent re-attach replaces the sink. The connection is always
88    /// reachable by [`Target::Connection`] with its own id.
89    async fn attach(&self, conn_id: &str, sink: LocalSink);
90
91    /// Detach a connection and drop all of its target associations and its local
92    /// sink (call on disconnect).
93    async fn detach(&self, conn_id: &str);
94
95    /// Associate a connection with a target (idempotent). Learned over the
96    /// connection's life: the session at `create_conversation_session`, the
97    /// user/org from auth, etc.
98    async fn associate(&self, conn_id: &str, target: Target);
99
100    /// Deliver `event` to every connection associated with `target`. Returns the
101    /// number of **local** deliveries made on this pod; cross-pod impls also fan
102    /// the event out to other pods (whose local deliveries this count omits).
103    async fn publish(&self, target: Target, event: Value) -> usize;
104}
105
106/// Single-process [`Backplane`]: an in-memory registry with direct local
107/// delivery. The default — keeps the server runnable standalone. Multi-pod
108/// deployments install a Redis / NATS impl instead.
109#[derive(Default)]
110pub struct InMemoryBackplane {
111    inner: RwLock<Registry>,
112}
113
114#[derive(Default)]
115struct Registry {
116    /// conn id → its local delivery sink.
117    sinks: HashMap<String, LocalSink>,
118    /// conn id → the targets it's associated with (for cleanup on detach).
119    conn_targets: HashMap<String, HashSet<Target>>,
120    /// target → the conn ids associated with it (for publish fan-out).
121    target_conns: HashMap<Target, HashSet<String>>,
122}
123
124impl InMemoryBackplane {
125    #[must_use]
126    pub fn new() -> Self {
127        Self::default()
128    }
129
130    /// Test/inspection helper: number of attached connections.
131    #[must_use]
132    pub fn connection_count(&self) -> usize {
133        self.inner.read().expect("backplane lock").sinks.len()
134    }
135}
136
137#[async_trait]
138impl Backplane for InMemoryBackplane {
139    async fn attach(&self, conn_id: &str, sink: LocalSink) {
140        let mut r = self.inner.write().expect("backplane lock");
141        r.sinks.insert(conn_id.to_string(), sink);
142        // Always reachable by its own connection id.
143        let self_target = Target::Connection(conn_id.to_string());
144        r.conn_targets
145            .entry(conn_id.to_string())
146            .or_default()
147            .insert(self_target.clone());
148        r.target_conns
149            .entry(self_target)
150            .or_default()
151            .insert(conn_id.to_string());
152    }
153
154    async fn detach(&self, conn_id: &str) {
155        let mut r = self.inner.write().expect("backplane lock");
156        r.sinks.remove(conn_id);
157        if let Some(targets) = r.conn_targets.remove(conn_id) {
158            for t in targets {
159                let empty = if let Some(set) = r.target_conns.get_mut(&t) {
160                    set.remove(conn_id);
161                    set.is_empty()
162                } else {
163                    false
164                };
165                if empty {
166                    r.target_conns.remove(&t);
167                }
168            }
169        }
170    }
171
172    async fn associate(&self, conn_id: &str, target: Target) {
173        let mut r = self.inner.write().expect("backplane lock");
174        r.conn_targets
175            .entry(conn_id.to_string())
176            .or_default()
177            .insert(target.clone());
178        r.target_conns
179            .entry(target)
180            .or_default()
181            .insert(conn_id.to_string());
182    }
183
184    async fn publish(&self, target: Target, event: Value) -> usize {
185        let r = self.inner.read().expect("backplane lock");
186        let Some(conns) = r.target_conns.get(&target) else {
187            return 0;
188        };
189        let mut delivered = 0;
190        for conn in conns {
191            if let Some(sink) = r.sinks.get(conn) {
192                // The sink closure is non-blocking (it pushes onto the
193                // connection's channel); safe to call under the read lock.
194                sink(event.clone());
195                delivered += 1;
196            }
197        }
198        delivered
199    }
200}
201
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use std::sync::mpsc::{channel, Receiver};

    /// Builds a [`LocalSink`] that forwards every event into a std channel and
    /// returns it with the receiving end, so a test can inspect what was
    /// delivered without the sink itself needing an async runtime.
    fn sink() -> (LocalSink, Receiver<Value>) {
        let (sender, receiver) = channel::<Value>();
        let forward: LocalSink = Arc::new(move |event: Value| {
            // A send error only means the receiver is gone; ignore it.
            let _ = sender.send(event);
        });
        (forward, receiver)
    }

    /// Two connections associated with the same session both get the event.
    #[tokio::test]
    async fn publishes_to_a_session_across_its_connections() {
        let backplane = InMemoryBackplane::new();
        let (sink_a, events_a) = sink();
        let (sink_b, events_b) = sink();
        backplane.attach("conn-a", sink_a).await;
        backplane.attach("conn-b", sink_b).await;
        let session = Target::Session("s1".into());
        backplane.associate("conn-a", session.clone()).await;
        backplane.associate("conn-b", session.clone()).await;

        let delivered = backplane.publish(session, json!({"hi": 1})).await;

        assert_eq!(delivered, 2);
        assert_eq!(events_a.try_recv().unwrap(), json!({"hi": 1}));
        assert_eq!(events_b.try_recv().unwrap(), json!({"hi": 1}));
    }

    /// An attached connection is addressable by its own id without any
    /// explicit association.
    #[tokio::test]
    async fn publishes_to_a_single_connection() {
        let backplane = InMemoryBackplane::new();
        let (only_sink, events) = sink();
        backplane.attach("conn-1", only_sink).await;

        let delivered = backplane
            .publish(Target::Connection("conn-1".into()), json!("ping"))
            .await;

        assert_eq!(delivered, 1);
        assert_eq!(events.try_recv().unwrap(), json!("ping"));
    }

    /// Publishing to a target nobody is associated with delivers nothing.
    #[tokio::test]
    async fn unknown_target_delivers_to_nobody() {
        let backplane = InMemoryBackplane::new();
        let delivered = backplane
            .publish(Target::Session("nope".into()), json!(1))
            .await;
        assert_eq!(delivered, 0);
    }

    /// After detach, neither the user target nor the connection's own id
    /// reaches the connection any more.
    #[tokio::test]
    async fn detach_removes_sink_and_associations() {
        let backplane = InMemoryBackplane::new();
        let (detached_sink, _events) = sink();
        backplane.attach("conn-x", detached_sink).await;
        backplane
            .associate("conn-x", Target::User("u1".into()))
            .await;
        assert_eq!(backplane.connection_count(), 1);

        backplane.detach("conn-x").await;
        assert_eq!(backplane.connection_count(), 0);

        // Its targets no longer resolve to it.
        let via_user = backplane
            .publish(Target::User("u1".into()), json!(1))
            .await;
        assert_eq!(via_user, 0);
        let via_conn = backplane
            .publish(Target::Connection("conn-x".into()), json!(1))
            .await;
        assert_eq!(via_conn, 0);
    }

    /// One connection associated with a session and an org receives events
    /// published to either, in publish order.
    #[tokio::test]
    async fn a_connection_can_serve_multiple_targets() {
        let backplane = InMemoryBackplane::new();
        let (shared_sink, events) = sink();
        backplane.attach("c", shared_sink).await;
        backplane.associate("c", Target::Session("s".into())).await;
        backplane.associate("c", Target::Org("o".into())).await;

        let org_deliveries = backplane
            .publish(Target::Org("o".into()), json!("org-event"))
            .await;
        assert_eq!(org_deliveries, 1);
        let session_deliveries = backplane
            .publish(Target::Session("s".into()), json!("sess-event"))
            .await;
        assert_eq!(session_deliveries, 1);

        assert_eq!(events.try_recv().unwrap(), json!("org-event"));
        assert_eq!(events.try_recv().unwrap(), json!("sess-event"));
    }
}