hyperstack_server/
bus.rs

1use bytes::Bytes;
2use std::collections::HashMap;
3use std::sync::Arc;
4use tokio::sync::{broadcast, watch, RwLock};
5
6/// Message sent through the event bus
7#[derive(Debug, Clone)]
8pub struct BusMessage {
9    pub key: String,
10    pub entity: String,
11    pub payload: Arc<Bytes>,
12}
13
14#[derive(Clone)]
15#[allow(clippy::type_complexity)]
16pub struct BusManager {
17    state_buses: Arc<RwLock<HashMap<(String, String), watch::Sender<Arc<Bytes>>>>>,
18    list_buses: Arc<RwLock<HashMap<String, broadcast::Sender<Arc<BusMessage>>>>>,
19    broadcast_capacity: usize,
20}
21
22impl BusManager {
23    pub fn new() -> Self {
24        Self::with_capacity(1000)
25    }
26
27    pub fn with_capacity(capacity: usize) -> Self {
28        Self {
29            state_buses: Arc::new(RwLock::new(HashMap::new())),
30            list_buses: Arc::new(RwLock::new(HashMap::new())),
31            broadcast_capacity: capacity,
32        }
33    }
34
35    /// Get or create a state bus (latest-value semantics)
36    /// Each (view_id, key) pair gets its own watch channel
37    pub async fn get_or_create_state_bus(
38        &self,
39        view_id: &str,
40        key: &str,
41    ) -> watch::Receiver<Arc<Bytes>> {
42        let mut buses = self.state_buses.write().await;
43        let entry = (view_id.to_string(), key.to_string());
44
45        let tx = buses
46            .entry(entry)
47            .or_insert_with(|| {
48                let empty = Arc::new(Bytes::new());
49                watch::channel(empty).0
50            })
51            .clone();
52
53        tx.subscribe()
54    }
55
56    pub async fn get_or_create_list_bus(
57        &self,
58        view_id: &str,
59    ) -> broadcast::Receiver<Arc<BusMessage>> {
60        let mut buses = self.list_buses.write().await;
61
62        let tx = buses
63            .entry(view_id.to_string())
64            .or_insert_with(|| broadcast::channel(self.broadcast_capacity).0)
65            .clone();
66
67        tx.subscribe()
68    }
69
70    /// Publish to a state bus (latest-value)
71    pub async fn publish_state(&self, view_id: &str, key: &str, frame: Arc<Bytes>) {
72        let buses = self.state_buses.read().await;
73        if let Some(tx) = buses.get(&(view_id.to_string(), key.to_string())) {
74            let _ = tx.send(frame);
75        }
76    }
77
78    pub async fn publish_list(&self, view_id: &str, message: Arc<BusMessage>) {
79        let buses = self.list_buses.read().await;
80        if let Some(tx) = buses.get(view_id) {
81            let _ = tx.send(message);
82        }
83    }
84
85    pub async fn cleanup_stale_state_buses(&self) -> usize {
86        let mut buses = self.state_buses.write().await;
87        let before = buses.len();
88
89        buses.retain(|_, tx| tx.receiver_count() > 0);
90
91        before - buses.len()
92    }
93
94    pub async fn cleanup_stale_list_buses(&self) -> usize {
95        let mut buses = self.list_buses.write().await;
96        let before = buses.len();
97
98        buses.retain(|_, tx| tx.receiver_count() > 0);
99
100        before - buses.len()
101    }
102
103    pub async fn bus_counts(&self) -> (usize, usize) {
104        let state_count = self.state_buses.read().await.len();
105        let list_count = self.list_buses.read().await.len();
106        (state_count, list_count)
107    }
108}
109
110impl Default for BusManager {
111    fn default() -> Self {
112        Self::new()
113    }
114}