1use bytes::Bytes;
2use std::collections::HashMap;
3use std::sync::Arc;
4use tokio::sync::{broadcast, watch, RwLock};
5
6#[derive(Debug, Clone)]
8pub struct BusMessage {
9 pub key: String,
10 pub entity: String,
11 pub payload: Arc<Bytes>,
12}
13
14#[derive(Clone)]
17#[allow(clippy::type_complexity)]
18pub struct BusManager {
19 state_buses: Arc<RwLock<HashMap<(String, String), watch::Sender<Arc<Bytes>>>>>,
20 kv_buses: Arc<RwLock<HashMap<String, broadcast::Sender<Arc<BusMessage>>>>>,
21 list_buses: Arc<RwLock<HashMap<String, broadcast::Sender<Arc<BusMessage>>>>>,
22 broadcast_capacity: usize,
23}
24
25impl BusManager {
26 pub fn new() -> Self {
27 Self::with_capacity(1000)
28 }
29
30 pub fn with_capacity(capacity: usize) -> Self {
31 Self {
32 state_buses: Arc::new(RwLock::new(HashMap::new())),
33 kv_buses: Arc::new(RwLock::new(HashMap::new())),
34 list_buses: Arc::new(RwLock::new(HashMap::new())),
35 broadcast_capacity: capacity,
36 }
37 }
38
39 pub async fn get_or_create_state_bus(
42 &self,
43 view_id: &str,
44 key: &str,
45 ) -> watch::Receiver<Arc<Bytes>> {
46 let mut buses = self.state_buses.write().await;
47 let entry = (view_id.to_string(), key.to_string());
48
49 let tx = buses
50 .entry(entry)
51 .or_insert_with(|| {
52 let empty = Arc::new(Bytes::new());
53 watch::channel(empty).0
54 })
55 .clone();
56
57 tx.subscribe()
58 }
59
60 pub async fn get_or_create_kv_bus(
62 &self,
63 view_id: &str,
64 ) -> broadcast::Receiver<Arc<BusMessage>> {
65 let mut buses = self.kv_buses.write().await;
66
67 let tx = buses
68 .entry(view_id.to_string())
69 .or_insert_with(|| broadcast::channel(self.broadcast_capacity).0)
70 .clone();
71
72 tx.subscribe()
73 }
74
75 pub async fn get_or_create_list_bus(
77 &self,
78 view_id: &str,
79 ) -> broadcast::Receiver<Arc<BusMessage>> {
80 let mut buses = self.list_buses.write().await;
81
82 let tx = buses
83 .entry(view_id.to_string())
84 .or_insert_with(|| broadcast::channel(self.broadcast_capacity).0)
85 .clone();
86
87 tx.subscribe()
88 }
89
90 pub async fn publish_state(&self, view_id: &str, key: &str, frame: Arc<Bytes>) {
92 let buses = self.state_buses.read().await;
93 if let Some(tx) = buses.get(&(view_id.to_string(), key.to_string())) {
94 let _ = tx.send(frame);
95 }
96 }
97
98 pub async fn publish_kv(&self, view_id: &str, message: Arc<BusMessage>) {
100 let buses = self.kv_buses.read().await;
101 if let Some(tx) = buses.get(view_id) {
102 let _ = tx.send(message);
103 }
104 }
105
106 pub async fn publish_list(&self, view_id: &str, message: Arc<BusMessage>) {
108 let buses = self.list_buses.read().await;
109 if let Some(tx) = buses.get(view_id) {
110 let _ = tx.send(message);
111 }
112 }
113}
114
115impl Default for BusManager {
116 fn default() -> Self {
117 Self::new()
118 }
119}