Skip to main content

monocoque_core/pubsub/
hub.rs

1//! PUB/SUB Hub (Phase 3)
2//!
3//! Responsibilities:
4//! - Maintain a stable mapping from `RoutingID` -> `PeerKey` (compact u64).
5//! - Track active peers with an Epoch to avoid ghost-peer races.
6//! - Apply SUB / UNSUB commands to the `SubscriptionIndex`.
7//! - Fan out published messages to matching peers (zero-copy via Bytes).
8//!
9//! Concurrency model:
10//! - Single-threaded async task.
11//! - Uses `futures::select!` for runtime-agnostic multiplexing.
12//! - No locks on the hot publish path.
13
14use crate::pubsub::index::{PeerKey, SubscriptionIndex};
15use crate::router::PeerCmd;
16
17use bytes::Bytes;
18use flume::{Receiver, Sender};
19use hashbrown::HashMap;
20
21/// Commands from application to `PubSub` Hub
22#[derive(Debug)]
23pub enum PubSubCmd {
24    /// Publish a message (frame 0 is topic)
25    Publish(Vec<Bytes>),
26    /// Close all peers
27    Close,
28}
29
30/// Events coming from peers (SUB sockets).
31///
32/// These are emitted when:
33/// - handshake completes
34/// - connection closes
35/// - SUB / UNSUB commands are parsed
36#[derive(Debug)]
37pub enum PubSubEvent {
38    PeerUp {
39        routing_id: Bytes,
40        epoch: u64,
41        tx: Sender<PeerCmd>,
42    },
43    PeerDown {
44        routing_id: Bytes,
45        epoch: u64,
46    },
47    Subscribe {
48        routing_id: Bytes,
49        prefix: Bytes,
50    },
51    Unsubscribe {
52        routing_id: Bytes,
53        prefix: Bytes,
54    },
55}
56
57/// Supervisor for PUB/SUB sockets.
58///
59/// This hub does *no* I/O itself.
60/// It only routes already-decoded messages between peers.
61pub struct PubSubHub {
62    /// Subscription index (topic -> peers)
63    index: SubscriptionIndex,
64
65    /// Stable mapping: `RoutingID` -> `PeerKey`
66    rid_to_key: HashMap<Bytes, PeerKey>,
67
68    /// Reverse mapping for cleanup/debug
69    key_to_rid: HashMap<PeerKey, Bytes>,
70
71    /// Active peers: `PeerKey` -> (epoch, sender)
72    peers: HashMap<PeerKey, (u64, Sender<PeerCmd>)>,
73
74    /// Monotonic key generator
75    next_key: PeerKey,
76
77    /// Events from peers
78    hub_rx: Receiver<PubSubEvent>,
79
80    /// Messages from user (publish path)
81    user_tx_rx: Receiver<PubSubCmd>,
82}
83
84impl PubSubHub {
85    #[must_use]
86    pub fn new(hub_rx: Receiver<PubSubEvent>, user_tx_rx: Receiver<PubSubCmd>) -> Self {
87        Self {
88            index: SubscriptionIndex::new(),
89            rid_to_key: HashMap::new(),
90            key_to_rid: HashMap::new(),
91            peers: HashMap::new(),
92            next_key: 1, // reserve 0
93            hub_rx,
94            user_tx_rx,
95        }
96    }
97
98    /// Main event loop.
99    pub async fn run(mut self) {
100        use futures::select;
101        use futures::FutureExt;
102
103        loop {
104            // Use futures::select! for runtime-agnostic multiplexing
105            select! {
106                msg = self.hub_rx.recv_async().fuse() => {
107                    match msg {
108                        Ok(ev) => self.on_hub_event(ev),
109                        Err(_) => break, // shutdown
110                    }
111                }
112                msg = self.user_tx_rx.recv_async().fuse() => {
113                    match msg {
114                        Ok(cmd) => self.on_user_cmd(cmd),
115                        Err(_) => break, // shutdown
116                    }
117                }
118            }
119        }
120    }
121
122    fn on_hub_event(&mut self, ev: PubSubEvent) {
123        match ev {
124            PubSubEvent::PeerUp {
125                routing_id,
126                epoch,
127                tx,
128            } => {
129                // Resolve or allocate PeerKey
130                let key = if let Some(&k) = self.rid_to_key.get(&routing_id) {
131                    k
132                } else {
133                    let k = self.next_key;
134                    self.next_key += 1;
135                    // Single clone for both bidirectional map inserts
136                    self.key_to_rid.insert(k, routing_id.clone());
137                    self.rid_to_key.insert(routing_id, k);
138                    k
139                };
140
141                // Overwrite any previous epoch (reconnect case)
142                self.peers.insert(key, (epoch, tx));
143            }
144
145            PubSubEvent::PeerDown { routing_id, epoch } => {
146                if let Some(&key) = self.rid_to_key.get(&routing_id) {
147                    if let Some((current_epoch, _)) = self.peers.get(&key) {
148                        // Epoch check prevents ghost-peer removal
149                        if *current_epoch == epoch {
150                            self.peers.remove(&key);
151                            self.index.remove_peer_everywhere(key);
152                        }
153                    }
154                }
155            }
156
157            PubSubEvent::Subscribe { routing_id, prefix } => {
158                if let Some(&key) = self.rid_to_key.get(&routing_id) {
159                    if self.peers.contains_key(&key) {
160                        self.index.subscribe(key, prefix);
161                    }
162                }
163            }
164
165            PubSubEvent::Unsubscribe { routing_id, prefix } => {
166                if let Some(&key) = self.rid_to_key.get(&routing_id) {
167                    self.index.unsubscribe(key, &prefix);
168                }
169            }
170        }
171    }
172
173    fn on_user_cmd(&mut self, cmd: PubSubCmd) {
174        match cmd {
175            PubSubCmd::Publish(parts) => self.publish(parts),
176            PubSubCmd::Close => {
177                // Broadcast close to all peers
178                for (_, (_, tx)) in &self.peers {
179                    let _ = tx.send(PeerCmd::Close);
180                }
181            }
182        }
183    }
184
185    /// Publish a multipart message.
186    ///
187    /// ZMQ convention:
188    /// - Frame 0 is the topic
189    fn publish(&mut self, parts: Vec<Bytes>) {
190        if parts.is_empty() || self.index.is_empty() {
191            return;
192        }
193
194        let topic = &parts[0];
195        let keys = self.index.match_topic(topic);
196
197        if keys.is_empty() {
198            return;
199        }
200
201        // Zero-copy fanout:
202        // - Vec<Bytes> is cloned (cheap)
203        // - Bytes are refcounted
204        for key in keys {
205            if let Some((_, tx)) = self.peers.get(&key) {
206                let _ = tx.send(PeerCmd::SendBody(parts.clone()));
207            }
208        }
209    }
210}