monocoque_core/pubsub/
hub.rs1use crate::pubsub::index::{PeerKey, SubscriptionIndex};
15use crate::router::PeerCmd;
16
17use bytes::Bytes;
18use flume::{Receiver, Sender};
19use hashbrown::HashMap;
20
21#[derive(Debug)]
23pub enum PubSubCmd {
24 Publish(Vec<Bytes>),
26 Close,
28}
29
30#[derive(Debug)]
37pub enum PubSubEvent {
38 PeerUp {
39 routing_id: Bytes,
40 epoch: u64,
41 tx: Sender<PeerCmd>,
42 },
43 PeerDown {
44 routing_id: Bytes,
45 epoch: u64,
46 },
47 Subscribe {
48 routing_id: Bytes,
49 prefix: Bytes,
50 },
51 Unsubscribe {
52 routing_id: Bytes,
53 prefix: Bytes,
54 },
55}
56
57pub struct PubSubHub {
62 index: SubscriptionIndex,
64
65 rid_to_key: HashMap<Bytes, PeerKey>,
67
68 key_to_rid: HashMap<PeerKey, Bytes>,
70
71 peers: HashMap<PeerKey, (u64, Sender<PeerCmd>)>,
73
74 next_key: PeerKey,
76
77 hub_rx: Receiver<PubSubEvent>,
79
80 user_tx_rx: Receiver<PubSubCmd>,
82}
83
84impl PubSubHub {
85 #[must_use]
86 pub fn new(hub_rx: Receiver<PubSubEvent>, user_tx_rx: Receiver<PubSubCmd>) -> Self {
87 Self {
88 index: SubscriptionIndex::new(),
89 rid_to_key: HashMap::new(),
90 key_to_rid: HashMap::new(),
91 peers: HashMap::new(),
92 next_key: 1, hub_rx,
94 user_tx_rx,
95 }
96 }
97
98 pub async fn run(mut self) {
100 use futures::select;
101 use futures::FutureExt;
102
103 loop {
104 select! {
106 msg = self.hub_rx.recv_async().fuse() => {
107 match msg {
108 Ok(ev) => self.on_hub_event(ev),
109 Err(_) => break, }
111 }
112 msg = self.user_tx_rx.recv_async().fuse() => {
113 match msg {
114 Ok(cmd) => self.on_user_cmd(cmd),
115 Err(_) => break, }
117 }
118 }
119 }
120 }
121
122 fn on_hub_event(&mut self, ev: PubSubEvent) {
123 match ev {
124 PubSubEvent::PeerUp {
125 routing_id,
126 epoch,
127 tx,
128 } => {
129 let key = if let Some(&k) = self.rid_to_key.get(&routing_id) {
131 k
132 } else {
133 let k = self.next_key;
134 self.next_key += 1;
135 self.key_to_rid.insert(k, routing_id.clone());
137 self.rid_to_key.insert(routing_id, k);
138 k
139 };
140
141 self.peers.insert(key, (epoch, tx));
143 }
144
145 PubSubEvent::PeerDown { routing_id, epoch } => {
146 if let Some(&key) = self.rid_to_key.get(&routing_id) {
147 if let Some((current_epoch, _)) = self.peers.get(&key) {
148 if *current_epoch == epoch {
150 self.peers.remove(&key);
151 self.index.remove_peer_everywhere(key);
152 }
153 }
154 }
155 }
156
157 PubSubEvent::Subscribe { routing_id, prefix } => {
158 if let Some(&key) = self.rid_to_key.get(&routing_id) {
159 if self.peers.contains_key(&key) {
160 self.index.subscribe(key, prefix);
161 }
162 }
163 }
164
165 PubSubEvent::Unsubscribe { routing_id, prefix } => {
166 if let Some(&key) = self.rid_to_key.get(&routing_id) {
167 self.index.unsubscribe(key, &prefix);
168 }
169 }
170 }
171 }
172
173 fn on_user_cmd(&mut self, cmd: PubSubCmd) {
174 match cmd {
175 PubSubCmd::Publish(parts) => self.publish(parts),
176 PubSubCmd::Close => {
177 for (_, (_, tx)) in &self.peers {
179 let _ = tx.send(PeerCmd::Close);
180 }
181 }
182 }
183 }
184
185 fn publish(&mut self, parts: Vec<Bytes>) {
190 if parts.is_empty() || self.index.is_empty() {
191 return;
192 }
193
194 let topic = &parts[0];
195 let keys = self.index.match_topic(topic);
196
197 if keys.is_empty() {
198 return;
199 }
200
201 for key in keys {
205 if let Some((_, tx)) = self.peers.get(&key) {
206 let _ = tx.send(PeerCmd::SendBody(parts.clone()));
207 }
208 }
209 }
210}