use std::collections::{HashMap, HashSet};
use std::sync::{Arc, RwLock};
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use serde_json::Value;
/// Callback invoked with one JSON event for an attached connection.
///
/// The closure is reference-counted (`Arc`) and must be `Send + Sync`, so a
/// single sink can be stored in the registry and called from any thread.
pub type LocalSink = Arc<dyn Fn(Value) + Send + Sync>;
/// Addressing key for published events.
///
/// Every variant wraps a string identifier. The type is hashable so it can be
/// used as a map key, and serializable via serde.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum Target {
/// A single connection, keyed by its connection id.
/// `InMemoryBackplane::attach` registers every connection under this
/// target automatically.
Connection(String),
/// A session, keyed by a session identifier.
Session(String),
/// A user, keyed by a user identifier.
User(String),
/// An organisation, keyed by an org identifier.
Org(String),
/// An agent, keyed by an agent identifier.
Agent(String),
}
/// Serializable record bundling an event with its target and an origin string.
///
/// NOTE(review): nothing in the visible part of this file constructs or reads
/// this type; presumably it is the wire format for a cross-process backplane —
/// confirm against the other implementations.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct BackplaneEnvelope {
/// Identifier of the publisher.
/// NOTE(review): exact meaning (node id? connection id?) is not shown here — confirm.
pub origin: String,
/// The target the event is addressed to.
pub target: Target,
/// The JSON event payload.
pub event: Value,
}
/// Routes JSON events to connections by [`Target`].
///
/// A connection registers a [`LocalSink`] with `attach`, can be linked to
/// further targets with `associate`, and receives whatever is sent to any of
/// its targets through `publish`. Implementations must be `Send + Sync`.
#[async_trait]
pub trait Backplane: Send + Sync {
/// Registers `sink` as the delivery callback for connection `conn_id`.
async fn attach(&self, conn_id: &str, sink: LocalSink);
/// Unregisters connection `conn_id`. In `InMemoryBackplane` this drops the
/// sink and every target association recorded for the connection.
async fn detach(&self, conn_id: &str);
/// Links connection `conn_id` to `target` so events published to `target`
/// are delivered to it.
async fn associate(&self, conn_id: &str, target: Target);
/// Publishes `event` to `target` and returns a count; in
/// `InMemoryBackplane` this is the number of sinks that were invoked.
async fn publish(&self, target: Target, event: Value) -> usize;
}
/// [`Backplane`] implementation that keeps all routing state in process
/// memory, guarded by a single `RwLock`.
#[derive(Default)]
pub struct InMemoryBackplane {
// All mutable state lives in one registry behind one lock.
inner: RwLock<Registry>,
}
/// Routing tables of [`InMemoryBackplane`].
///
/// `conn_targets` and `target_conns` are two directions of the same relation
/// and are updated together by `attach`, `associate` and `detach`.
#[derive(Default)]
struct Registry {
/// Connection id -> delivery callback.
sinks: HashMap<String, LocalSink>,
/// Connection id -> every target the connection is associated with
/// (used by `detach` to find what to clean up).
conn_targets: HashMap<String, HashSet<Target>>,
/// Target -> ids of the connections associated with it (used by `publish`).
target_conns: HashMap<Target, HashSet<String>>,
}
impl InMemoryBackplane {
#[must_use]
pub fn new() -> Self {
Self::default()
}
#[must_use]
pub fn connection_count(&self) -> usize {
self.inner.read().expect("backplane lock").sinks.len()
}
}
#[async_trait]
impl Backplane for InMemoryBackplane {
    /// Registers `sink` for `conn_id` and associates the connection with its
    /// own `Target::Connection(conn_id)`.
    ///
    /// Attaching an id that is already present replaces its sink; target
    /// associations already recorded for that id are kept.
    ///
    /// # Panics
    ///
    /// Panics if the internal lock has been poisoned.
    async fn attach(&self, conn_id: &str, sink: LocalSink) {
        let mut r = self.inner.write().expect("backplane lock");
        r.sinks.insert(conn_id.to_string(), sink);
        // Every connection is addressable directly through its own id.
        let self_target = Target::Connection(conn_id.to_string());
        r.conn_targets
            .entry(conn_id.to_string())
            .or_default()
            .insert(self_target.clone());
        r.target_conns
            .entry(self_target)
            .or_default()
            .insert(conn_id.to_string());
    }

    /// Removes the sink of `conn_id` together with every association recorded
    /// for it. Unknown ids are ignored.
    ///
    /// # Panics
    ///
    /// Panics if the internal lock has been poisoned.
    async fn detach(&self, conn_id: &str) {
        let mut r = self.inner.write().expect("backplane lock");
        r.sinks.remove(conn_id);
        if let Some(targets) = r.conn_targets.remove(conn_id) {
            for t in targets {
                let empty = if let Some(set) = r.target_conns.get_mut(&t) {
                    set.remove(conn_id);
                    set.is_empty()
                } else {
                    false
                };
                // Drop targets that no longer have any connection so the
                // reverse index does not accumulate empty sets.
                if empty {
                    r.target_conns.remove(&t);
                }
            }
        }
    }

    /// Associates `conn_id` with `target` in both directions of the index.
    ///
    /// The connection does not have to be attached: the association is
    /// recorded regardless, and `publish` skips ids that have no sink.
    ///
    /// # Panics
    ///
    /// Panics if the internal lock has been poisoned.
    async fn associate(&self, conn_id: &str, target: Target) {
        let mut r = self.inner.write().expect("backplane lock");
        r.conn_targets
            .entry(conn_id.to_string())
            .or_default()
            .insert(target.clone());
        r.target_conns
            .entry(target)
            .or_default()
            .insert(conn_id.to_string());
    }

    /// Delivers a clone of `event` to the sink of every connection associated
    /// with `target` and returns how many sinks were invoked (`0` when the
    /// target is unknown).
    ///
    /// The set of recipients is snapshotted under the read lock and the lock
    /// is released *before* any sink runs. Sinks are arbitrary user callbacks:
    /// calling them while the guard is held would deadlock (or panic) as soon
    /// as one of them re-enters the backplane, e.g. to `detach` itself, and a
    /// slow sink would stall every writer. As a consequence, a connection
    /// detached concurrently with a publish may still receive that one event.
    ///
    /// # Panics
    ///
    /// Panics if the internal lock has been poisoned.
    async fn publish(&self, target: Target, event: Value) -> usize {
        let r = self.inner.read().expect("backplane lock");
        let Some(conns) = r.target_conns.get(&target) else {
            return 0;
        };
        // Cloning an `Arc` is a refcount bump; ids without a sink are skipped.
        let recipients: Vec<LocalSink> = conns
            .iter()
            .filter_map(|conn| r.sinks.get(conn))
            .cloned()
            .collect();
        // Release the lock before running user code.
        drop(r);

        for sink in &recipients {
            sink(event.clone());
        }
        recipients.len()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use std::sync::mpsc::{channel, Receiver};

    /// Builds a sink that forwards each event into an mpsc channel and
    /// returns it together with the receiving end, for assertions.
    fn sink() -> (LocalSink, Receiver<Value>) {
        let (sender, receiver) = channel::<Value>();
        let forward: LocalSink = Arc::new(move |event: Value| {
            // A dropped receiver is not an error for these tests.
            let _ = sender.send(event);
        });
        (forward, receiver)
    }

    /// Two connections sharing a session both receive a session event.
    #[tokio::test]
    async fn publishes_to_a_session_across_its_connections() {
        let backplane = InMemoryBackplane::new();
        let session = Target::Session("s1".into());
        let (sink_a, events_a) = sink();
        let (sink_b, events_b) = sink();
        backplane.attach("conn-a", sink_a).await;
        backplane.attach("conn-b", sink_b).await;
        backplane.associate("conn-a", session.clone()).await;
        backplane.associate("conn-b", session.clone()).await;

        let payload = json!({"hi": 1});
        let delivered = backplane.publish(session, payload.clone()).await;

        assert_eq!(delivered, 2);
        assert_eq!(events_a.try_recv().unwrap(), payload);
        assert_eq!(events_b.try_recv().unwrap(), payload);
    }

    /// Attaching alone makes a connection addressable by its own id.
    #[tokio::test]
    async fn publishes_to_a_single_connection() {
        let backplane = InMemoryBackplane::new();
        let (conn_sink, events) = sink();
        backplane.attach("conn-1", conn_sink).await;

        let target = Target::Connection("conn-1".into());
        let delivered = backplane.publish(target, json!("ping")).await;

        assert_eq!(delivered, 1);
        assert_eq!(events.try_recv().unwrap(), json!("ping"));
    }

    /// Publishing to a target nobody is associated with reaches no sink.
    #[tokio::test]
    async fn unknown_target_delivers_to_nobody() {
        let backplane = InMemoryBackplane::new();
        let target = Target::Session("nope".into());

        let delivered = backplane.publish(target, json!(1)).await;

        assert_eq!(delivered, 0);
    }

    /// After detach, neither the explicit nor the implicit target delivers.
    #[tokio::test]
    async fn detach_removes_sink_and_associations() {
        let backplane = InMemoryBackplane::new();
        let (conn_sink, _events) = sink();
        backplane.attach("conn-x", conn_sink).await;
        backplane
            .associate("conn-x", Target::User("u1".into()))
            .await;
        assert_eq!(backplane.connection_count(), 1);

        backplane.detach("conn-x").await;

        assert_eq!(backplane.connection_count(), 0);
        let to_user = backplane
            .publish(Target::User("u1".into()), json!(1))
            .await;
        assert_eq!(to_user, 0);
        let to_conn = backplane
            .publish(Target::Connection("conn-x".into()), json!(1))
            .await;
        assert_eq!(to_conn, 0);
    }

    /// One connection associated with two targets receives events from both,
    /// in publication order.
    #[tokio::test]
    async fn a_connection_can_serve_multiple_targets() {
        let backplane = InMemoryBackplane::new();
        let (conn_sink, events) = sink();
        backplane.attach("c", conn_sink).await;
        backplane.associate("c", Target::Session("s".into())).await;
        backplane.associate("c", Target::Org("o".into())).await;

        let org_delivered = backplane
            .publish(Target::Org("o".into()), json!("org-event"))
            .await;
        assert_eq!(org_delivered, 1);
        let session_delivered = backplane
            .publish(Target::Session("s".into()), json!("sess-event"))
            .await;
        assert_eq!(session_delivered, 1);

        assert_eq!(events.try_recv().unwrap(), json!("org-event"));
        assert_eq!(events.try_recv().unwrap(), json!("sess-event"));
    }
}