Skip to main content

kevy_elect/
transport.rs

//! TCP control-plane transport for [`crate::Elector`] — the network
//! half of T1.5.6. Drives the elector by reading inbound frames off
//! one accept-side listener and writing outbound frames over one
//! persistent connection per peer.
//!
//! Architecture: **one thread for the listener** (plus one reader
//! thread per accepted inbound connection), **one thread per
//! outbound peer**, and **one orchestrator thread** that is the only
//! driver of `tick` / `on_message` on the `Elector`. Inbound frames
//! reach the orchestrator over an MPSC channel; outbound frames are
//! handed to the per-peer workers through bounded per-peer queues.
//! The elector itself sits behind a `Mutex` so the `Transport`
//! handle can take read-side snapshots and feed in the replication
//! offset; those calls hold the lock only briefly.
//!
//! Sockets are blocking TCP — kevy-elect's traffic is rare
//! (heartbeats at 5 Hz default), so the busy-wait / async machinery
//! that the keyspace plane needs is overkill here. The orchestrator
//! waits on the inbound channel with a `recv_timeout` bounded by
//! `hb_interval`, so ticks fire at the configured cadence without
//! burning a core.
//!
//! Out of scope (Phase 1.5): TLS / auth / connection pooling.
20
21use std::io::{Read, Write};
22use std::net::{Shutdown, TcpListener, TcpStream, ToSocketAddrs};
23use std::sync::mpsc::{Receiver, Sender, channel, RecvTimeoutError};
24use std::sync::{Arc, Mutex};
25use std::sync::atomic::{AtomicBool, Ordering};
26use std::thread::JoinHandle;
27use std::time::{Duration, Instant};
28
29use crate::elector::{Elector, Outbound};
30use crate::message::Message;
31use crate::wire::{DecodeError, decode, encode};
32
/// Upper bound, in bytes, on undecoded data an inbound reader may
/// buffer. Crossing it means the peer's framing can no longer be
/// trusted and the connection is abandoned. Election frames are
/// ≤ 256 B, so 16 KiB leaves ample headroom for split reads.
const READ_BUF_CAP: usize = 16 << 10;
37
/// Pause before retrying after a failed `accept` (any error other
/// than `WouldBlock`) or a failed outbound dial. Keeps those loops
/// out of a tight retry spin while still reconnecting promptly once
/// the socket / peer recovers.
const READ_RETRY_BACKOFF: Duration = Duration::from_millis(100);
42
43/// One inbound event the orchestrator processes. Either a decoded
44/// election message from a peer, or a "the connection from $peer
45/// went down" notification (so the orchestrator can clear any
46/// state that assumed the link was up).
47pub enum InboundEvent {
48    /// `(from_node_id, msg)`.
49    Message(String, Message),
50    /// The accept thread saw a new inbound connection but the
51    /// handshake / first-frame read failed. `String` is the peer
52    /// addr for diagnostics.
53    InboundConnFailed(String),
54}
55
56/// Shared state between the orchestrator + worker threads. Wraps
57/// the elector in a Mutex so the per-peer outbound threads can read
58/// the latest `epoch` / `repl_offset` for the next heartbeat
59/// without round-tripping through the orchestrator — but **only the
60/// orchestrator mutates** via `tick` / `on_message`.
61struct Shared {
62    elector: Mutex<Elector>,
63    /// Per-peer outbound queue. Indexed by `node_id`. Each worker
64    /// drains its own queue + writes onto the persistent TCP
65    /// stream; on stream death the queue is held until the worker
66    /// reconnects. Bounded by `MAX_PENDING_PER_PEER` to prevent a
67    /// dead peer from leaking memory.
68    out_queues: Mutex<std::collections::HashMap<String, std::collections::VecDeque<Message>>>,
69}
70
/// Bound on the number of frames held in any one peer's outbound
/// queue (see `Shared::out_queues`).
const MAX_PENDING_PER_PEER: usize = 1 << 8;
72
/// Dial target for one peer: its node id plus the host/port of its
/// elect-control listener.
#[derive(Clone, Debug)]
pub struct PeerAddr {
    /// Stable node id of the peer — the same value the peer reports
    /// in the `node_id` field of its `HB` frames.
    pub node_id: String,
    /// Host of the peer's elect-control listener (IP literal or DNS
    /// name).
    pub host: String,
    /// TCP port of the peer's elect-control listener.
    pub port: u16,
}
84
85/// Public handle to a running transport. Owns the orchestrator +
86/// listener + outbound worker threads. Dropping it signals stop
87/// and joins (best-effort within `JOIN_TIMEOUT`).
88pub struct Transport {
89    stop: Arc<AtomicBool>,
90    handles: Vec<JoinHandle<()>>,
91    shared: Arc<Shared>,
92    /// Cloned at construction-time so the kevy-server adapter can
93    /// query the live `epoch` / `role` / `current_primary` without
94    /// owning the inbound channel.
95    state_view: Arc<Shared>,
96}
97
98impl Transport {
99    /// Spawn the listener, per-peer outbound workers, and the
100    /// orchestrator. Returns immediately — the threads run until
101    /// `Transport` is dropped.
102    ///
103    /// `listen_addr` is the local `host:port` the listener binds
104    /// to (typically `0.0.0.0:elect_port`). `peers` lists every
105    /// OTHER node in the cluster (this node's own id is filtered
106    /// out by the elector at run-time).
107    pub fn spawn(
108        elector: Elector,
109        hb_interval: Duration,
110        listen_addr: (std::net::IpAddr, u16),
111        peers: Vec<PeerAddr>,
112    ) -> std::io::Result<Self> {
113        let shared = Arc::new(Shared {
114            elector: Mutex::new(elector),
115            out_queues: Mutex::new(std::collections::HashMap::new()),
116        });
117        let stop = Arc::new(AtomicBool::new(false));
118        let mut handles = Vec::new();
119        let (inbound_tx, inbound_rx) = channel::<InboundEvent>();
120
121        let listener = TcpListener::bind(listen_addr)?;
122        listener.set_nonblocking(false)?;
123        let listener_stop = stop.clone();
124        let listener_tx = inbound_tx.clone();
125        handles.push(
126            std::thread::Builder::new()
127                .name("kevy-elect-listener".to_string())
128                .spawn(move || {
129                    accept_loop(listener, listener_tx, listener_stop);
130                })?,
131        );
132
133        for peer in &peers {
134            let peer_stop = stop.clone();
135            let peer_shared = shared.clone();
136            let peer_clone = peer.clone();
137            handles.push(
138                std::thread::Builder::new()
139                    .name(format!("kevy-elect-out-{}", peer.node_id))
140                    .spawn(move || {
141                        outbound_loop(peer_clone, peer_shared, peer_stop);
142                    })?,
143            );
144        }
145
146        let orch_stop = stop.clone();
147        let orch_shared = shared.clone();
148        handles.push(
149            std::thread::Builder::new()
150                .name("kevy-elect-orchestrator".to_string())
151                .spawn(move || {
152                    orchestrator_loop(orch_shared, inbound_rx, hb_interval, orch_stop);
153                })?,
154        );
155
156        Ok(Self {
157            stop,
158            handles,
159            state_view: shared.clone(),
160            shared,
161        })
162    }
163
164    /// Read-side snapshot of the elector for `ROLE` / `INFO
165    /// replication`. Locks the elector mutex briefly; cheap.
166    pub fn state_snapshot(&self) -> ElectorSnapshot {
167        let e = self.state_view.elector.lock().expect("elector lock");
168        let now = std::time::Instant::now();
169        // T3.11 / F4: include the list of peers this node considers
170        // DOWN at snapshot time. kevy-scope's F4 fallback path reads
171        // this to decide "writer DOWN → fallback takes over"; the
172        // computation here is cheap (one pass over peer_ids).
173        let down_peers: Vec<String> = e
174            .peer_ids
175            .iter()
176            .filter(|id| id.as_str() != e.node_id.as_str())
177            .filter(|id| e.is_peer_down(id, now))
178            .cloned()
179            .collect();
180        ElectorSnapshot {
181            role: e.role(),
182            epoch: e.epoch(),
183            current_primary: e.current_primary().map(str::to_string),
184            down_peers,
185        }
186    }
187
188    /// Feed this node's replication offset into the elector.
189    pub fn set_repl_offset(&self, offset: u64) {
190        self.shared
191            .elector
192            .lock()
193            .expect("elector lock")
194            .set_repl_offset(offset);
195    }
196
197    /// Stop the transport. Joins all threads (with best-effort
198    /// timeout). Idempotent.
199    pub fn shutdown(mut self) {
200        self.stop.store(true, Ordering::Relaxed);
201        // Drain handles. We can't tell threads to exit a blocking
202        // recv mid-flight (channel close on Sender drop handles it),
203        // but the per-loop checks of `stop` flag are the canonical
204        // exit signal.
205        for h in self.handles.drain(..) {
206            let _ = h.join();
207        }
208    }
209}
210
211impl Drop for Transport {
212    fn drop(&mut self) {
213        self.stop.store(true, Ordering::Relaxed);
214    }
215}
216
217/// Read-side snapshot returned by [`Transport::state_snapshot`].
218#[derive(Debug, Clone)]
219pub struct ElectorSnapshot {
220    /// Self-perceived role at snapshot time.
221    pub role: crate::message::Role,
222    /// Election epoch at snapshot time.
223    pub epoch: u64,
224    /// Currently-known primary id (`None` until first ANNOUNCE).
225    pub current_primary: Option<String>,
226    /// Peers (excluding self) whose last `HB` is older than
227    /// `ElectConfig::down_after` — i.e. the down-set this node would
228    /// vote on at quorum time. kevy-scope's F4 fallback reads this
229    /// to decide whether the declared scope writer is reachable;
230    /// when the writer's id is present, the fallback takes over the
231    /// scope's writes.
232    pub down_peers: Vec<String>,
233}
234
// ─────────── per-thread loops ───────────
236
237fn accept_loop(listener: TcpListener, tx: Sender<InboundEvent>, stop: Arc<AtomicBool>) {
238    // Non-blocking + short sleep so the loop can observe `stop`
239    // between accepts. Blocking `accept` would need a Shutdown-on-
240    // try_clone trick to interrupt; the non-blocking poll keeps the
241    // surface uniform with the outbound loop's busy-but-cheap
242    // pattern (election control plane is low-volume).
243    listener
244        .set_nonblocking(true)
245        .expect("listener set_nonblocking(true)");
246    while !stop.load(Ordering::Relaxed) {
247        match listener.accept() {
248            Ok((stream, addr)) => {
249                let _ = stream.set_nonblocking(false); // children block on reads.
250                let tx_clone = tx.clone();
251                let stop_clone = stop.clone();
252                let addr_str = addr.to_string();
253                let _ = std::thread::Builder::new()
254                    .name(format!("kevy-elect-in-{addr_str}"))
255                    .spawn(move || {
256                        inbound_read_loop(stream, addr_str, tx_clone, stop_clone);
257                    });
258            }
259            Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
260                std::thread::sleep(Duration::from_millis(20));
261            }
262            Err(_) => {
263                std::thread::sleep(READ_RETRY_BACKOFF);
264            }
265        }
266    }
267}
268
269fn inbound_read_loop(
270    mut stream: TcpStream,
271    peer_addr: String,
272    tx: Sender<InboundEvent>,
273    stop: Arc<AtomicBool>,
274) {
275    let _ = stream.set_nodelay(true);
276    // Short read timeout so the loop can observe `stop` between
277    // reads. Blocking read otherwise can't be interrupted by a
278    // flag.
279    let _ = stream.set_read_timeout(Some(Duration::from_millis(200)));
280    let mut buf: Vec<u8> = Vec::with_capacity(READ_BUF_CAP);
281    let mut chunk = [0u8; 1024];
282    while !stop.load(Ordering::Relaxed) {
283        match stream.read(&mut chunk) {
284            Ok(0) => {
285                let _ = tx.send(InboundEvent::InboundConnFailed(peer_addr.clone()));
286                return;
287            }
288            Ok(n) => {
289                buf.extend_from_slice(&chunk[..n]);
290                if buf.len() > READ_BUF_CAP {
291                    let _ = tx.send(InboundEvent::InboundConnFailed(peer_addr.clone()));
292                    return;
293                }
294                while !buf.is_empty() {
295                    match decode(&buf) {
296                        Ok((msg, used)) => {
297                            let from = message_sender(&msg);
298                            let _ = tx.send(InboundEvent::Message(from, msg));
299                            buf.drain(..used);
300                        }
301                        Err(DecodeError::Truncated) => break,
302                        Err(_) => {
303                            let _ = tx.send(InboundEvent::InboundConnFailed(peer_addr.clone()));
304                            return;
305                        }
306                    }
307                }
308            }
309            Err(e)
310                if e.kind() == std::io::ErrorKind::WouldBlock
311                    || e.kind() == std::io::ErrorKind::TimedOut =>
312            {
313                // Read timeout — loop to re-check `stop`.
314                continue;
315            }
316            Err(_) => {
317                let _ = tx.send(InboundEvent::InboundConnFailed(peer_addr.clone()));
318                return;
319            }
320        }
321    }
322}
323
324fn message_sender(msg: &Message) -> String {
325    // Every message variant carries the sender's id in a known
326    // field — use that as the per-elector "from" key for the
327    // orchestrator's on_message route.
328    match msg {
329        Message::Hb { node_id, .. } => node_id.clone(),
330        Message::Offer { candidate_id, .. } => candidate_id.clone(),
331        Message::Accept { accepter_id, .. } => accepter_id.clone(),
332        Message::Announce { new_primary_id, .. } => new_primary_id.clone(),
333    }
334}
335
336fn outbound_loop(peer: PeerAddr, shared: Arc<Shared>, stop: Arc<AtomicBool>) {
337    let mut stream: Option<TcpStream> = None;
338    while !stop.load(Ordering::Relaxed) {
339        if stream.is_none() {
340            stream = dial(&peer);
341            if stream.is_none() {
342                std::thread::sleep(READ_RETRY_BACKOFF);
343                continue;
344            }
345        }
346        // Drain this peer's outbound queue.
347        let next_msg = {
348            let mut qs = shared.out_queues.lock().expect("out_queues lock");
349            qs.get_mut(&peer.node_id).and_then(|q| q.pop_front())
350        };
351        let Some(msg) = next_msg else {
352            std::thread::sleep(Duration::from_millis(1));
353            continue;
354        };
355        let bytes = encode(&msg);
356        let Some(s) = stream.as_mut() else {
357            continue;
358        };
359        if s.write_all(&bytes).is_err() {
360            // Connection died. Drop + reconnect next iter; re-
361            // queue the in-flight message at the head.
362            let _ = s.shutdown(Shutdown::Both);
363            stream = None;
364            let mut qs = shared.out_queues.lock().expect("out_queues lock");
365            qs.entry(peer.node_id.clone()).or_default().push_front(msg);
366        }
367    }
368}
369
370fn dial(peer: &PeerAddr) -> Option<TcpStream> {
371    let target = (peer.host.as_str(), peer.port);
372    let addr_iter = target.to_socket_addrs().ok()?;
373    for sa in addr_iter {
374        match TcpStream::connect_timeout(&sa, Duration::from_millis(500)) {
375            Ok(s) => {
376                let _ = s.set_nodelay(true);
377                return Some(s);
378            }
379            Err(_) => continue,
380        }
381    }
382    None
383}
384
385fn orchestrator_loop(
386    shared: Arc<Shared>,
387    inbound_rx: Receiver<InboundEvent>,
388    hb_interval: Duration,
389    stop: Arc<AtomicBool>,
390) {
391    // Tick at hb_interval — wait up to that long on the inbound
392    // channel; either a message arrives + we process it, or the
393    // timeout fires + we run tick.
394    while !stop.load(Ordering::Relaxed) {
395        let mut outs: Vec<Outbound> = Vec::new();
396        match inbound_rx.recv_timeout(hb_interval) {
397            Ok(InboundEvent::Message(from, msg)) => {
398                let now = Instant::now();
399                let mut e = shared.elector.lock().expect("elector lock");
400                outs.extend(e.on_message(&from, msg, now));
401                outs.extend(e.tick(now));
402            }
403            Ok(InboundEvent::InboundConnFailed(_)) => {
404                // Logged elsewhere; no elector state change here
405                // (DOWN detection is driven by the lack of HBs, not
406                // by the absence of a TCP socket).
407            }
408            Err(RecvTimeoutError::Timeout) => {
409                let now = Instant::now();
410                let mut e = shared.elector.lock().expect("elector lock");
411                outs.extend(e.tick(now));
412            }
413            Err(RecvTimeoutError::Disconnected) => return,
414        }
415        if !outs.is_empty() {
416            let mut qs = shared.out_queues.lock().expect("out_queues lock");
417            for out in outs {
418                let targets: Vec<String> = if out.to == Outbound::BROADCAST {
419                    // Broadcast: enqueue to every peer that has a
420                    // queue (which is all of them — pre-seeded at
421                    // first outbound to that peer).
422                    qs.keys().cloned().collect()
423                } else {
424                    vec![out.to]
425                };
426                for target in targets {
427                    let q = qs.entry(target).or_default();
428                    if q.len() < MAX_PENDING_PER_PEER {
429                        q.push_back(out.msg.clone());
430                    }
431                }
432            }
433        }
434    }
435}
436