kevy_elect/transport.rs
1//! TCP control-plane transport for [`crate::Elector`] — the network
2//! half of T1.5.6. Drives the elector by reading inbound frames off
3//! one accept-side listener + writing outbound frames over one
4//! persistent connection per peer.
5//!
//! Architecture: **one thread for the listener** (plus one reader
//! thread per accepted inbound connection) + **one thread per
//! outbound peer** + **one orchestrator thread** that is the only
//! caller of `tick` / `on_message` on the `Elector`. Decoded inbound
//! frames flow to the orchestrator over an MPSC channel; the messages
//! it wants sent are handed to the per-peer workers through bounded,
//! mutex-protected per-peer queues. The `Elector` itself sits behind a
//! `Mutex` so read-side snapshots and `set_repl_offset` can reach it,
//! but only the orchestrator drives elections.
//!
//! Peer sockets are blocking TCP (inbound readers use a short read
//! timeout), and the listener is polled in non-blocking mode so every
//! loop can observe the stop flag. kevy-elect's traffic is rare
//! (heartbeats at 5 Hz default) so the busy-wait / async machinery
//! that the keyspace plane needs is overkill here. The orchestrator
//! waits on the inbound channel with `recv_timeout(hb_interval)` so
//! ticks fire at the configured cadence without burning a core.
18//!
19//! Out of scope (Phase 1.5): TLS / auth / connection pooling.
20
21use std::io::{Read, Write};
22use std::net::{Shutdown, TcpListener, TcpStream, ToSocketAddrs};
23use std::sync::mpsc::{Receiver, Sender, channel, RecvTimeoutError};
24use std::sync::{Arc, Mutex};
25use std::sync::atomic::{AtomicBool, Ordering};
26use std::thread::JoinHandle;
27use std::time::{Duration, Instant};
28
29use crate::elector::{Elector, Outbound};
30use crate::message::Message;
31use crate::wire::{DecodeError, decode, encode};
32
/// Upper bound on the bytes an inbound reader may buffer while waiting
/// for a complete frame. Going past it means the stream is not producing
/// valid frames, so the connection is dropped. Election frames are
/// ≤ 256 B, so 16 KiB leaves ample slack for partial reads.
const READ_BUF_CAP: usize = 16_384;
37
/// Pause after a failure that is worth retrying, but not immediately: a
/// non-`WouldBlock` `accept` error in the listener loop, or a failed dial
/// in an outbound worker. Keeps those loops from spinning while the
/// underlying problem (e.g. an unreachable peer) persists, while still
/// retrying often enough to reconnect promptly.
const READ_RETRY_BACKOFF: Duration = Duration::from_millis(100);
42
43/// One inbound event the orchestrator processes. Either a decoded
44/// election message from a peer, or a "the connection from $peer
45/// went down" notification (so the orchestrator can clear any
46/// state that assumed the link was up).
47pub enum InboundEvent {
48 /// `(from_node_id, msg)`.
49 Message(String, Message),
50 /// The accept thread saw a new inbound connection but the
51 /// handshake / first-frame read failed. `String` is the peer
52 /// addr for diagnostics.
53 InboundConnFailed(String),
54}
55
56/// Shared state between the orchestrator + worker threads. Wraps
57/// the elector in a Mutex so the per-peer outbound threads can read
58/// the latest `epoch` / `repl_offset` for the next heartbeat
59/// without round-tripping through the orchestrator — but **only the
60/// orchestrator mutates** via `tick` / `on_message`.
61struct Shared {
62 elector: Mutex<Elector>,
63 /// Per-peer outbound queue. Indexed by `node_id`. Each worker
64 /// drains its own queue + writes onto the persistent TCP
65 /// stream; on stream death the queue is held until the worker
66 /// reconnects. Bounded by `MAX_PENDING_PER_PEER` to prevent a
67 /// dead peer from leaking memory.
68 out_queues: Mutex<std::collections::HashMap<String, std::collections::VecDeque<Message>>>,
69}
70
/// Cap on the number of messages held in a single peer's outbound queue,
/// so a peer that stays unreachable cannot make its queue grow without
/// bound.
const MAX_PENDING_PER_PEER: usize = 256;
72
/// Where to dial one peer: its `node_id` plus the address of its
/// elect-control listener.
#[derive(Clone, Debug)]
pub struct PeerAddr {
    /// Stable id of the peer; must equal the `node_id` the peer reports
    /// in its own `HB` messages.
    pub node_id: String,
    /// Host name or IP address of the peer's elect-control listener.
    pub host: String,
    /// TCP port of the peer's elect-control listener.
    pub port: u16,
}
84
85/// Public handle to a running transport. Owns the orchestrator +
86/// listener + outbound worker threads. Dropping it signals stop
87/// and joins (best-effort within `JOIN_TIMEOUT`).
88pub struct Transport {
89 stop: Arc<AtomicBool>,
90 handles: Vec<JoinHandle<()>>,
91 shared: Arc<Shared>,
92 /// Cloned at construction-time so the kevy-server adapter can
93 /// query the live `epoch` / `role` / `current_primary` without
94 /// owning the inbound channel.
95 state_view: Arc<Shared>,
96}
97
98impl Transport {
99 /// Spawn the listener, per-peer outbound workers, and the
100 /// orchestrator. Returns immediately — the threads run until
101 /// `Transport` is dropped.
102 ///
103 /// `listen_addr` is the local `host:port` the listener binds
104 /// to (typically `0.0.0.0:elect_port`). `peers` lists every
105 /// OTHER node in the cluster (this node's own id is filtered
106 /// out by the elector at run-time).
107 pub fn spawn(
108 elector: Elector,
109 hb_interval: Duration,
110 listen_addr: (std::net::IpAddr, u16),
111 peers: Vec<PeerAddr>,
112 ) -> std::io::Result<Self> {
113 let shared = Arc::new(Shared {
114 elector: Mutex::new(elector),
115 out_queues: Mutex::new(std::collections::HashMap::new()),
116 });
117 let stop = Arc::new(AtomicBool::new(false));
118 let mut handles = Vec::new();
119 let (inbound_tx, inbound_rx) = channel::<InboundEvent>();
120
121 let listener = TcpListener::bind(listen_addr)?;
122 listener.set_nonblocking(false)?;
123 let listener_stop = stop.clone();
124 let listener_tx = inbound_tx.clone();
125 handles.push(
126 std::thread::Builder::new()
127 .name("kevy-elect-listener".to_string())
128 .spawn(move || {
129 accept_loop(listener, listener_tx, listener_stop);
130 })?,
131 );
132
133 for peer in &peers {
134 let peer_stop = stop.clone();
135 let peer_shared = shared.clone();
136 let peer_clone = peer.clone();
137 handles.push(
138 std::thread::Builder::new()
139 .name(format!("kevy-elect-out-{}", peer.node_id))
140 .spawn(move || {
141 outbound_loop(peer_clone, peer_shared, peer_stop);
142 })?,
143 );
144 }
145
146 let orch_stop = stop.clone();
147 let orch_shared = shared.clone();
148 handles.push(
149 std::thread::Builder::new()
150 .name("kevy-elect-orchestrator".to_string())
151 .spawn(move || {
152 orchestrator_loop(orch_shared, inbound_rx, hb_interval, orch_stop);
153 })?,
154 );
155
156 Ok(Self {
157 stop,
158 handles,
159 state_view: shared.clone(),
160 shared,
161 })
162 }
163
164 /// Read-side snapshot of the elector for `ROLE` / `INFO
165 /// replication`. Locks the elector mutex briefly; cheap.
166 pub fn state_snapshot(&self) -> ElectorSnapshot {
167 let e = self.state_view.elector.lock().expect("elector lock");
168 let now = std::time::Instant::now();
169 // T3.11 / F4: include the list of peers this node considers
170 // DOWN at snapshot time. kevy-scope's F4 fallback path reads
171 // this to decide "writer DOWN → fallback takes over"; the
172 // computation here is cheap (one pass over peer_ids).
173 let down_peers: Vec<String> = e
174 .peer_ids
175 .iter()
176 .filter(|id| id.as_str() != e.node_id.as_str())
177 .filter(|id| e.is_peer_down(id, now))
178 .cloned()
179 .collect();
180 ElectorSnapshot {
181 role: e.role(),
182 epoch: e.epoch(),
183 current_primary: e.current_primary().map(str::to_string),
184 down_peers,
185 }
186 }
187
188 /// Feed this node's replication offset into the elector.
189 pub fn set_repl_offset(&self, offset: u64) {
190 self.shared
191 .elector
192 .lock()
193 .expect("elector lock")
194 .set_repl_offset(offset);
195 }
196
197 /// Stop the transport. Joins all threads (with best-effort
198 /// timeout). Idempotent.
199 pub fn shutdown(mut self) {
200 self.stop.store(true, Ordering::Relaxed);
201 // Drain handles. We can't tell threads to exit a blocking
202 // recv mid-flight (channel close on Sender drop handles it),
203 // but the per-loop checks of `stop` flag are the canonical
204 // exit signal.
205 for h in self.handles.drain(..) {
206 let _ = h.join();
207 }
208 }
209}
210
211impl Drop for Transport {
212 fn drop(&mut self) {
213 self.stop.store(true, Ordering::Relaxed);
214 }
215}
216
217/// Read-side snapshot returned by [`Transport::state_snapshot`].
218#[derive(Debug, Clone)]
219pub struct ElectorSnapshot {
220 /// Self-perceived role at snapshot time.
221 pub role: crate::message::Role,
222 /// Election epoch at snapshot time.
223 pub epoch: u64,
224 /// Currently-known primary id (`None` until first ANNOUNCE).
225 pub current_primary: Option<String>,
226 /// Peers (excluding self) whose last `HB` is older than
227 /// `ElectConfig::down_after` — i.e. the down-set this node would
228 /// vote on at quorum time. kevy-scope's F4 fallback reads this
229 /// to decide whether the declared scope writer is reachable;
230 /// when the writer's id is present, the fallback takes over the
231 /// scope's writes.
232 pub down_peers: Vec<String>,
233}
234
235// ─────────── per-thread loops ───────────
236
237fn accept_loop(listener: TcpListener, tx: Sender<InboundEvent>, stop: Arc<AtomicBool>) {
238 // Non-blocking + short sleep so the loop can observe `stop`
239 // between accepts. Blocking `accept` would need a Shutdown-on-
240 // try_clone trick to interrupt; the non-blocking poll keeps the
241 // surface uniform with the outbound loop's busy-but-cheap
242 // pattern (election control plane is low-volume).
243 listener
244 .set_nonblocking(true)
245 .expect("listener set_nonblocking(true)");
246 while !stop.load(Ordering::Relaxed) {
247 match listener.accept() {
248 Ok((stream, addr)) => {
249 let _ = stream.set_nonblocking(false); // children block on reads.
250 let tx_clone = tx.clone();
251 let stop_clone = stop.clone();
252 let addr_str = addr.to_string();
253 let _ = std::thread::Builder::new()
254 .name(format!("kevy-elect-in-{addr_str}"))
255 .spawn(move || {
256 inbound_read_loop(stream, addr_str, tx_clone, stop_clone);
257 });
258 }
259 Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
260 std::thread::sleep(Duration::from_millis(20));
261 }
262 Err(_) => {
263 std::thread::sleep(READ_RETRY_BACKOFF);
264 }
265 }
266 }
267}
268
269fn inbound_read_loop(
270 mut stream: TcpStream,
271 peer_addr: String,
272 tx: Sender<InboundEvent>,
273 stop: Arc<AtomicBool>,
274) {
275 let _ = stream.set_nodelay(true);
276 // Short read timeout so the loop can observe `stop` between
277 // reads. Blocking read otherwise can't be interrupted by a
278 // flag.
279 let _ = stream.set_read_timeout(Some(Duration::from_millis(200)));
280 let mut buf: Vec<u8> = Vec::with_capacity(READ_BUF_CAP);
281 let mut chunk = [0u8; 1024];
282 while !stop.load(Ordering::Relaxed) {
283 match stream.read(&mut chunk) {
284 Ok(0) => {
285 let _ = tx.send(InboundEvent::InboundConnFailed(peer_addr.clone()));
286 return;
287 }
288 Ok(n) => {
289 buf.extend_from_slice(&chunk[..n]);
290 if buf.len() > READ_BUF_CAP {
291 let _ = tx.send(InboundEvent::InboundConnFailed(peer_addr.clone()));
292 return;
293 }
294 while !buf.is_empty() {
295 match decode(&buf) {
296 Ok((msg, used)) => {
297 let from = message_sender(&msg);
298 let _ = tx.send(InboundEvent::Message(from, msg));
299 buf.drain(..used);
300 }
301 Err(DecodeError::Truncated) => break,
302 Err(_) => {
303 let _ = tx.send(InboundEvent::InboundConnFailed(peer_addr.clone()));
304 return;
305 }
306 }
307 }
308 }
309 Err(e)
310 if e.kind() == std::io::ErrorKind::WouldBlock
311 || e.kind() == std::io::ErrorKind::TimedOut =>
312 {
313 // Read timeout — loop to re-check `stop`.
314 continue;
315 }
316 Err(_) => {
317 let _ = tx.send(InboundEvent::InboundConnFailed(peer_addr.clone()));
318 return;
319 }
320 }
321 }
322}
323
324fn message_sender(msg: &Message) -> String {
325 // Every message variant carries the sender's id in a known
326 // field — use that as the per-elector "from" key for the
327 // orchestrator's on_message route.
328 match msg {
329 Message::Hb { node_id, .. } => node_id.clone(),
330 Message::Offer { candidate_id, .. } => candidate_id.clone(),
331 Message::Accept { accepter_id, .. } => accepter_id.clone(),
332 Message::Announce { new_primary_id, .. } => new_primary_id.clone(),
333 }
334}
335
336fn outbound_loop(peer: PeerAddr, shared: Arc<Shared>, stop: Arc<AtomicBool>) {
337 let mut stream: Option<TcpStream> = None;
338 while !stop.load(Ordering::Relaxed) {
339 if stream.is_none() {
340 stream = dial(&peer);
341 if stream.is_none() {
342 std::thread::sleep(READ_RETRY_BACKOFF);
343 continue;
344 }
345 }
346 // Drain this peer's outbound queue.
347 let next_msg = {
348 let mut qs = shared.out_queues.lock().expect("out_queues lock");
349 qs.get_mut(&peer.node_id).and_then(|q| q.pop_front())
350 };
351 let Some(msg) = next_msg else {
352 std::thread::sleep(Duration::from_millis(1));
353 continue;
354 };
355 let bytes = encode(&msg);
356 let Some(s) = stream.as_mut() else {
357 continue;
358 };
359 if s.write_all(&bytes).is_err() {
360 // Connection died. Drop + reconnect next iter; re-
361 // queue the in-flight message at the head.
362 let _ = s.shutdown(Shutdown::Both);
363 stream = None;
364 let mut qs = shared.out_queues.lock().expect("out_queues lock");
365 qs.entry(peer.node_id.clone()).or_default().push_front(msg);
366 }
367 }
368}
369
370fn dial(peer: &PeerAddr) -> Option<TcpStream> {
371 let target = (peer.host.as_str(), peer.port);
372 let addr_iter = target.to_socket_addrs().ok()?;
373 for sa in addr_iter {
374 match TcpStream::connect_timeout(&sa, Duration::from_millis(500)) {
375 Ok(s) => {
376 let _ = s.set_nodelay(true);
377 return Some(s);
378 }
379 Err(_) => continue,
380 }
381 }
382 None
383}
384
385fn orchestrator_loop(
386 shared: Arc<Shared>,
387 inbound_rx: Receiver<InboundEvent>,
388 hb_interval: Duration,
389 stop: Arc<AtomicBool>,
390) {
391 // Tick at hb_interval — wait up to that long on the inbound
392 // channel; either a message arrives + we process it, or the
393 // timeout fires + we run tick.
394 while !stop.load(Ordering::Relaxed) {
395 let mut outs: Vec<Outbound> = Vec::new();
396 match inbound_rx.recv_timeout(hb_interval) {
397 Ok(InboundEvent::Message(from, msg)) => {
398 let now = Instant::now();
399 let mut e = shared.elector.lock().expect("elector lock");
400 outs.extend(e.on_message(&from, msg, now));
401 outs.extend(e.tick(now));
402 }
403 Ok(InboundEvent::InboundConnFailed(_)) => {
404 // Logged elsewhere; no elector state change here
405 // (DOWN detection is driven by the lack of HBs, not
406 // by the absence of a TCP socket).
407 }
408 Err(RecvTimeoutError::Timeout) => {
409 let now = Instant::now();
410 let mut e = shared.elector.lock().expect("elector lock");
411 outs.extend(e.tick(now));
412 }
413 Err(RecvTimeoutError::Disconnected) => return,
414 }
415 if !outs.is_empty() {
416 let mut qs = shared.out_queues.lock().expect("out_queues lock");
417 for out in outs {
418 let targets: Vec<String> = if out.to == Outbound::BROADCAST {
419 // Broadcast: enqueue to every peer that has a
420 // queue (which is all of them — pre-seeded at
421 // first outbound to that peer).
422 qs.keys().cloned().collect()
423 } else {
424 vec![out.to]
425 };
426 for target in targets {
427 let q = qs.entry(target).or_default();
428 if q.len() < MAX_PENDING_PER_PEER {
429 q.push_back(out.msg.clone());
430 }
431 }
432 }
433 }
434 }
435}
436