1use bytes::Bytes;
2use std::collections::HashMap;
3use std::sync::Arc;
4use tokio::sync::{broadcast, watch, RwLock};
5
6#[derive(Debug, Clone)]
8pub struct BusMessage {
9 pub key: String,
10 pub entity: String,
11 pub payload: Arc<Bytes>,
12}
13
14#[derive(Clone)]
15#[allow(clippy::type_complexity)]
16pub struct BusManager {
17 state_buses: Arc<RwLock<HashMap<(String, String), watch::Sender<Arc<Bytes>>>>>,
18 list_buses: Arc<RwLock<HashMap<String, broadcast::Sender<Arc<BusMessage>>>>>,
19 broadcast_capacity: usize,
20}
21
22impl BusManager {
23 pub fn new() -> Self {
24 Self::with_capacity(1000)
25 }
26
27 pub fn with_capacity(capacity: usize) -> Self {
28 Self {
29 state_buses: Arc::new(RwLock::new(HashMap::new())),
30 list_buses: Arc::new(RwLock::new(HashMap::new())),
31 broadcast_capacity: capacity,
32 }
33 }
34
35 pub async fn get_or_create_state_bus(
38 &self,
39 view_id: &str,
40 key: &str,
41 ) -> watch::Receiver<Arc<Bytes>> {
42 let mut buses = self.state_buses.write().await;
43 let entry = (view_id.to_string(), key.to_string());
44
45 let tx = buses
46 .entry(entry)
47 .or_insert_with(|| {
48 let empty = Arc::new(Bytes::new());
49 watch::channel(empty).0
50 })
51 .clone();
52
53 tx.subscribe()
54 }
55
56 pub async fn get_or_create_list_bus(
57 &self,
58 view_id: &str,
59 ) -> broadcast::Receiver<Arc<BusMessage>> {
60 let mut buses = self.list_buses.write().await;
61
62 let tx = buses
63 .entry(view_id.to_string())
64 .or_insert_with(|| broadcast::channel(self.broadcast_capacity).0)
65 .clone();
66
67 tx.subscribe()
68 }
69
70 pub async fn publish_state(&self, view_id: &str, key: &str, frame: Arc<Bytes>) {
72 let buses = self.state_buses.read().await;
73 if let Some(tx) = buses.get(&(view_id.to_string(), key.to_string())) {
74 let _ = tx.send(frame);
75 }
76 }
77
78 pub async fn publish_list(&self, view_id: &str, message: Arc<BusMessage>) {
79 let buses = self.list_buses.read().await;
80 if let Some(tx) = buses.get(view_id) {
81 let _ = tx.send(message);
82 }
83 }
84
85 pub async fn cleanup_stale_state_buses(&self) -> usize {
86 let mut buses = self.state_buses.write().await;
87 let before = buses.len();
88
89 buses.retain(|_, tx| tx.receiver_count() > 0);
90
91 before - buses.len()
92 }
93
94 pub async fn cleanup_stale_list_buses(&self) -> usize {
95 let mut buses = self.list_buses.write().await;
96 let before = buses.len();
97
98 buses.retain(|_, tx| tx.receiver_count() > 0);
99
100 before - buses.len()
101 }
102
103 pub async fn bus_counts(&self) -> (usize, usize) {
104 let state_count = self.state_buses.read().await.len();
105 let list_count = self.list_buses.read().await.len();
106 (state_count, list_count)
107 }
108}
109
110impl Default for BusManager {
111 fn default() -> Self {
112 Self::new()
113 }
114}