Skip to main content

monocoque_core/
router.rs

1//! ROUTER Hub (Phase 2.1)
2//!
3//! Goals:
//! - Runtime-agnostic async loop (`futures::select!` over flume channels, no tokio)
5//! - Strict types: `RouterCmd` has envelope, `PeerCmd` is body-only
6//! - Envelope normalization:
7//!   - inbound (actor->user) is normalized elsewhere to [ID, Empty, Body...]
8//!   - outbound (user->hub) accepts [ID, (Empty), Body...] in Standard mode
9//! - Load balancer mode: round-robin dispatch when no explicit routing id is used
10//! - "Ghost peer" self-heal: stale IDs removed from rr list when detected
11
12use bytes::Bytes;
13use flume::{Receiver, Sender};
14use hashbrown::HashMap;
15
16/// Commands sent from application to Router Hub
17#[derive(Debug)]
18pub enum RouterCmd {
19    /// Send a message (with routing envelope in Standard mode, or body-only in LB mode)
20    SendMessage(Vec<Bytes>),
21    /// Close all peers
22    Close,
23}
24
25/// Commands sent from Hub -> Peer (body only; hub strips any envelope)
26#[derive(Debug)]
27pub enum PeerCmd {
28    SendBody(Vec<Bytes>),
29    Close,
30}
31
32/// Events sent from Peer -> Hub (lifecycle)
33#[derive(Debug)]
34pub enum HubEvent {
35    PeerUp {
36        routing_id: Bytes, // Owned + stable
37        tx: Sender<PeerCmd>,
38    },
39    PeerDown {
40        routing_id: Bytes,
41    },
42}
43
/// How the hub maps outbound user messages onto peers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RouterBehavior {
    /// Standard ROUTER semantics.
    ///
    /// Outbound user messages are expected as `[ID, (Empty), Body...]`.
    /// Messages addressed to an unknown ID are dropped silently (libzmq
    /// behavior).
    Standard,

    /// Load-balancer semantics.
    ///
    /// Outbound user messages are expected as `[Body...]`; the hub chooses the
    /// destination peer using strict-ish round robin.
    LoadBalancer,
}
55
56/// The Router Supervisor.
57///
58/// This runs once per ROUTER socket (listener), and coordinates N peers.
59pub struct RouterHub {
60    // routing table
61    peers: HashMap<Bytes, Sender<PeerCmd>>,
62
63    // LB rotation list (routing IDs)
64    lb_list: Vec<Bytes>,
65    lb_cursor: usize,
66    behavior: RouterBehavior,
67
68    // channels
69    hub_rx: Receiver<HubEvent>,
70    user_tx_rx: Receiver<RouterCmd>,
71}
72
73impl RouterHub {
74    #[must_use]
75    pub fn new(
76        hub_rx: Receiver<HubEvent>,
77        user_tx_rx: Receiver<RouterCmd>,
78        behavior: RouterBehavior,
79    ) -> Self {
80        Self {
81            peers: HashMap::new(),
82            lb_list: Vec::new(),
83            lb_cursor: 0,
84            behavior,
85            hub_rx,
86            user_tx_rx,
87        }
88    }
89
90    pub async fn run(mut self) {
91        use futures::select;
92        use futures::FutureExt;
93
94        loop {
95            // Use futures::select! for runtime-agnostic multiplexing
96            select! {
97                msg = self.hub_rx.recv_async().fuse() => {
98                    match msg {
99                        Ok(ev) => self.handle_peer_event(ev),
100                        Err(_) => break, // channel closed
101                    }
102                }
103                msg = self.user_tx_rx.recv_async().fuse() => {
104                    match msg {
105                        Ok(cmd) => self.handle_user_cmd(cmd),
106                        Err(_) => break, // channel closed
107                    }
108                }
109            }
110        }
111
112        // Best-effort: close all peers on hub shutdown.
113        for tx in self.peers.values() {
114            let _ = tx.send(PeerCmd::Close);
115        }
116    }
117
118    fn handle_peer_event(&mut self, event: HubEvent) {
119        match event {
120            HubEvent::PeerUp { routing_id, tx } => {
121                // Strict dedup: if ID exists, remove it from lb_list first to prevent drift.
122                if self.peers.contains_key(&routing_id) {
123                    if let Some(pos) = self.lb_list.iter().position(|x| x == &routing_id) {
124                        self.lb_list.remove(pos);
125                        if self.lb_cursor >= self.lb_list.len() {
126                            self.lb_cursor = 0;
127                        }
128                    }
129                }
130
131                // Move routing_id into lb_list, clone for peers map
132                self.lb_list.push(routing_id.clone());
133                self.peers.insert(routing_id, tx);
134            }
135
136            HubEvent::PeerDown { routing_id } => {
137                self.peers.remove(&routing_id);
138
139                // Remove from LB list (O(N) but churn is not hot-path).
140                if let Some(pos) = self.lb_list.iter().position(|x| x == &routing_id) {
141                    self.lb_list.remove(pos);
142                    if self.lb_cursor >= self.lb_list.len() {
143                        self.lb_cursor = 0;
144                    }
145                }
146            }
147        }
148    }
149
150    fn handle_user_cmd(&mut self, cmd: RouterCmd) {
151        match cmd {
152            RouterCmd::SendMessage(parts) => self.route_outbound(parts),
153            RouterCmd::Close => {
154                // broadcast close to peers
155                for tx in self.peers.values() {
156                    let _ = tx.send(PeerCmd::Close);
157                }
158            }
159        }
160    }
161
162    /// Self-healing Round Robin peer selection.
163    ///
164    /// Returns a routing id that is present in `peers`, while repairing stale entries in `lb_list`.
165    fn pick_rr_peer(&mut self) -> Option<Bytes> {
166        let mut attempts = 0usize;
167        let max_attempts = self.lb_list.len();
168
169        while !self.lb_list.is_empty() && attempts <= max_attempts {
170            if self.lb_cursor >= self.lb_list.len() {
171                self.lb_cursor = 0;
172            }
173
174            let id = self.lb_list[self.lb_cursor].clone();
175            // advance cursor for next pick
176            self.lb_cursor = (self.lb_cursor + 1) % self.lb_list.len();
177
178            if self.peers.contains_key(&id) {
179                return Some(id);
180            }
181
182            // stale entry => repair
183            if let Some(pos) = self.lb_list.iter().position(|x| x == &id) {
184                self.lb_list.remove(pos);
185                // cursor might now be out of bounds; loop header fixes it.
186            }
187
188            attempts += 1;
189        }
190
191        None
192    }
193
194    fn route_outbound(&mut self, mut parts: Vec<Bytes>) {
195        if parts.is_empty() {
196            return;
197        }
198
199        match self.behavior {
200            RouterBehavior::Standard => {
201                // Expect: [ID, (Empty), Body...]
202                // NOTE: `remove(0)` is O(n), but this is hub-path, not IO hot loop.
203                let target_id = parts.remove(0);
204
205                // Normalize: drop optional empty delimiter frame
206                if !parts.is_empty() && parts[0].is_empty() {
207                    parts.remove(0);
208                }
209
210                if let Some(tx) = self.peers.get(&target_id) {
211                    let _ = tx.send(PeerCmd::SendBody(parts));
212                } else {
213                    // ZMQ behavior: silently drop if unknown id
214                }
215            }
216
217            RouterBehavior::LoadBalancer => {
218                // Expect: [Body...]
219                if let Some(id) = self.pick_rr_peer() {
220                    if let Some(tx) = self.peers.get(&id) {
221                        let _ = tx.send(PeerCmd::SendBody(parts));
222                    }
223                } else {
224                    // No peers available: drop for now (backpressure elsewhere)
225                }
226            }
227        }
228    }
229}