hyperstack_server/
bus.rs

1use bytes::Bytes;
2use std::collections::HashMap;
3use std::sync::Arc;
4use tokio::sync::{broadcast, watch, RwLock};
5
6/// Message sent through the event bus
7#[derive(Debug, Clone)]
8pub struct BusMessage {
9    pub key: String,
10    pub entity: String,
11    pub payload: Arc<Bytes>,
12}
13
14/// Manager for all event buses in the system
15/// Supports multiple bus types for different streaming semantics
16#[derive(Clone)]
17#[allow(clippy::type_complexity)]
18pub struct BusManager {
19    state_buses: Arc<RwLock<HashMap<(String, String), watch::Sender<Arc<Bytes>>>>>,
20    kv_buses: Arc<RwLock<HashMap<String, broadcast::Sender<Arc<BusMessage>>>>>,
21    list_buses: Arc<RwLock<HashMap<String, broadcast::Sender<Arc<BusMessage>>>>>,
22    broadcast_capacity: usize,
23}
24
25impl BusManager {
26    pub fn new() -> Self {
27        Self::with_capacity(1000)
28    }
29
30    pub fn with_capacity(capacity: usize) -> Self {
31        Self {
32            state_buses: Arc::new(RwLock::new(HashMap::new())),
33            kv_buses: Arc::new(RwLock::new(HashMap::new())),
34            list_buses: Arc::new(RwLock::new(HashMap::new())),
35            broadcast_capacity: capacity,
36        }
37    }
38
39    /// Get or create a state bus (latest-value semantics)
40    /// Each (view_id, key) pair gets its own watch channel
41    pub async fn get_or_create_state_bus(
42        &self,
43        view_id: &str,
44        key: &str,
45    ) -> watch::Receiver<Arc<Bytes>> {
46        let mut buses = self.state_buses.write().await;
47        let entry = (view_id.to_string(), key.to_string());
48
49        let tx = buses
50            .entry(entry)
51            .or_insert_with(|| {
52                let empty = Arc::new(Bytes::new());
53                watch::channel(empty).0
54            })
55            .clone();
56
57        tx.subscribe()
58    }
59
60    /// Get or create a KV bus (key-value semantics with broadcast)
61    pub async fn get_or_create_kv_bus(
62        &self,
63        view_id: &str,
64    ) -> broadcast::Receiver<Arc<BusMessage>> {
65        let mut buses = self.kv_buses.write().await;
66
67        let tx = buses
68            .entry(view_id.to_string())
69            .or_insert_with(|| broadcast::channel(self.broadcast_capacity).0)
70            .clone();
71
72        tx.subscribe()
73    }
74
75    /// Get or create a list bus (append-only semantics)
76    pub async fn get_or_create_list_bus(
77        &self,
78        view_id: &str,
79    ) -> broadcast::Receiver<Arc<BusMessage>> {
80        let mut buses = self.list_buses.write().await;
81
82        let tx = buses
83            .entry(view_id.to_string())
84            .or_insert_with(|| broadcast::channel(self.broadcast_capacity).0)
85            .clone();
86
87        tx.subscribe()
88    }
89
90    /// Publish to a state bus (latest-value)
91    pub async fn publish_state(&self, view_id: &str, key: &str, frame: Arc<Bytes>) {
92        let buses = self.state_buses.read().await;
93        if let Some(tx) = buses.get(&(view_id.to_string(), key.to_string())) {
94            let _ = tx.send(frame);
95        }
96    }
97
98    /// Publish to a KV bus
99    pub async fn publish_kv(&self, view_id: &str, message: Arc<BusMessage>) {
100        let buses = self.kv_buses.read().await;
101        if let Some(tx) = buses.get(view_id) {
102            let _ = tx.send(message);
103        }
104    }
105
106    /// Publish to a list bus
107    pub async fn publish_list(&self, view_id: &str, message: Arc<BusMessage>) {
108        let buses = self.list_buses.read().await;
109        if let Some(tx) = buses.get(view_id) {
110            let _ = tx.send(message);
111        }
112    }
113}
114
115impl Default for BusManager {
116    fn default() -> Self {
117        Self::new()
118    }
119}