// monocoque_core/router.rs
//! ROUTER Hub (Phase 2.1)
//!
//! Goals:
//! - Runtime-agnostic async loop (`futures::select!`, no tokio)
//! - Strict types: `RouterCmd` carries the routing envelope, `PeerCmd` is body-only
//! - Envelope normalization:
//!   - inbound (actor->user) is normalized elsewhere to [ID, Empty, Body...]
//!   - outbound (user->hub) accepts [ID, (Empty), Body...] in Standard mode
//! - Load balancer mode: round-robin dispatch when no explicit routing id is used
//! - "Ghost peer" self-heal: stale IDs are removed from the RR list when detected

12use bytes::Bytes;
13use flume::{Receiver, Sender};
14use hashbrown::HashMap;
15
16/// Commands sent from application to Router Hub
17#[derive(Debug)]
18pub enum RouterCmd {
19 /// Send a message (with routing envelope in Standard mode, or body-only in LB mode)
20 SendMessage(Vec<Bytes>),
21 /// Close all peers
22 Close,
23}
24
25/// Commands sent from Hub -> Peer (body only; hub strips any envelope)
26#[derive(Debug)]
27pub enum PeerCmd {
28 SendBody(Vec<Bytes>),
29 Close,
30}
31
32/// Events sent from Peer -> Hub (lifecycle)
33#[derive(Debug)]
34pub enum HubEvent {
35 PeerUp {
36 routing_id: Bytes, // Owned + stable
37 tx: Sender<PeerCmd>,
38 },
39 PeerDown {
40 routing_id: Bytes,
41 },
42}
43
/// Outbound routing strategy of a [`RouterHub`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum RouterBehavior {
    /// Classic ROUTER: outbound messages are `[ID, (Empty), Body...]`; a
    /// message addressed to an unknown `ID` is dropped silently (libzmq behavior).
    Standard,

    /// Load balancer: outbound messages are body-only (`[Body...]`) and the
    /// hub picks the receiving peer in (approximately strict) round-robin order.
    LoadBalancer,
}

56/// The Router Supervisor.
57///
58/// This runs once per ROUTER socket (listener), and coordinates N peers.
59pub struct RouterHub {
60 // routing table
61 peers: HashMap<Bytes, Sender<PeerCmd>>,
62
63 // LB rotation list (routing IDs)
64 lb_list: Vec<Bytes>,
65 lb_cursor: usize,
66 behavior: RouterBehavior,
67
68 // channels
69 hub_rx: Receiver<HubEvent>,
70 user_tx_rx: Receiver<RouterCmd>,
71}
72
73impl RouterHub {
74 #[must_use]
75 pub fn new(
76 hub_rx: Receiver<HubEvent>,
77 user_tx_rx: Receiver<RouterCmd>,
78 behavior: RouterBehavior,
79 ) -> Self {
80 Self {
81 peers: HashMap::new(),
82 lb_list: Vec::new(),
83 lb_cursor: 0,
84 behavior,
85 hub_rx,
86 user_tx_rx,
87 }
88 }
89
90 pub async fn run(mut self) {
91 use futures::select;
92 use futures::FutureExt;
93
94 loop {
95 // Use futures::select! for runtime-agnostic multiplexing
96 select! {
97 msg = self.hub_rx.recv_async().fuse() => {
98 match msg {
99 Ok(ev) => self.handle_peer_event(ev),
100 Err(_) => break, // channel closed
101 }
102 }
103 msg = self.user_tx_rx.recv_async().fuse() => {
104 match msg {
105 Ok(cmd) => self.handle_user_cmd(cmd),
106 Err(_) => break, // channel closed
107 }
108 }
109 }
110 }
111
112 // Best-effort: close all peers on hub shutdown.
113 for tx in self.peers.values() {
114 let _ = tx.send(PeerCmd::Close);
115 }
116 }
117
118 fn handle_peer_event(&mut self, event: HubEvent) {
119 match event {
120 HubEvent::PeerUp { routing_id, tx } => {
121 // Strict dedup: if ID exists, remove it from lb_list first to prevent drift.
122 if self.peers.contains_key(&routing_id) {
123 if let Some(pos) = self.lb_list.iter().position(|x| x == &routing_id) {
124 self.lb_list.remove(pos);
125 if self.lb_cursor >= self.lb_list.len() {
126 self.lb_cursor = 0;
127 }
128 }
129 }
130
131 // Move routing_id into lb_list, clone for peers map
132 self.lb_list.push(routing_id.clone());
133 self.peers.insert(routing_id, tx);
134 }
135
136 HubEvent::PeerDown { routing_id } => {
137 self.peers.remove(&routing_id);
138
139 // Remove from LB list (O(N) but churn is not hot-path).
140 if let Some(pos) = self.lb_list.iter().position(|x| x == &routing_id) {
141 self.lb_list.remove(pos);
142 if self.lb_cursor >= self.lb_list.len() {
143 self.lb_cursor = 0;
144 }
145 }
146 }
147 }
148 }
149
150 fn handle_user_cmd(&mut self, cmd: RouterCmd) {
151 match cmd {
152 RouterCmd::SendMessage(parts) => self.route_outbound(parts),
153 RouterCmd::Close => {
154 // broadcast close to peers
155 for tx in self.peers.values() {
156 let _ = tx.send(PeerCmd::Close);
157 }
158 }
159 }
160 }
161
162 /// Self-healing Round Robin peer selection.
163 ///
164 /// Returns a routing id that is present in `peers`, while repairing stale entries in `lb_list`.
165 fn pick_rr_peer(&mut self) -> Option<Bytes> {
166 let mut attempts = 0usize;
167 let max_attempts = self.lb_list.len();
168
169 while !self.lb_list.is_empty() && attempts <= max_attempts {
170 if self.lb_cursor >= self.lb_list.len() {
171 self.lb_cursor = 0;
172 }
173
174 let id = self.lb_list[self.lb_cursor].clone();
175 // advance cursor for next pick
176 self.lb_cursor = (self.lb_cursor + 1) % self.lb_list.len();
177
178 if self.peers.contains_key(&id) {
179 return Some(id);
180 }
181
182 // stale entry => repair
183 if let Some(pos) = self.lb_list.iter().position(|x| x == &id) {
184 self.lb_list.remove(pos);
185 // cursor might now be out of bounds; loop header fixes it.
186 }
187
188 attempts += 1;
189 }
190
191 None
192 }
193
194 fn route_outbound(&mut self, mut parts: Vec<Bytes>) {
195 if parts.is_empty() {
196 return;
197 }
198
199 match self.behavior {
200 RouterBehavior::Standard => {
201 // Expect: [ID, (Empty), Body...]
202 // NOTE: `remove(0)` is O(n), but this is hub-path, not IO hot loop.
203 let target_id = parts.remove(0);
204
205 // Normalize: drop optional empty delimiter frame
206 if !parts.is_empty() && parts[0].is_empty() {
207 parts.remove(0);
208 }
209
210 if let Some(tx) = self.peers.get(&target_id) {
211 let _ = tx.send(PeerCmd::SendBody(parts));
212 } else {
213 // ZMQ behavior: silently drop if unknown id
214 }
215 }
216
217 RouterBehavior::LoadBalancer => {
218 // Expect: [Body...]
219 if let Some(id) = self.pick_rr_peer() {
220 if let Some(tx) = self.peers.get(&id) {
221 let _ = tx.send(PeerCmd::SendBody(parts));
222 }
223 } else {
224 // No peers available: drop for now (backpressure elsewhere)
225 }
226 }
227 }
228 }
229}