// atomr_cluster/cluster_daemon.rs

1//! `ClusterDaemon` — the actor that owns [`MembershipState`] and drives
2//! the active gossip / leader-action / SBR ticks.
3//!
4//! Architecture
5//! ------------
6//! * One `tokio::task::JoinHandle` runs the main loop.
7//! * `mpsc::UnboundedSender<DaemonCmd>` is the inbox for control
//!   messages (`Join`, `Leave`, `Down`, `ApplyGossip`, `Tick`, `Shutdown`).
9//! * Per-tick the daemon
10//!   1. applies leader actions (`MembershipState::apply_leader_actions`)
11//!   2. runs the SBR runtime (if installed) and applies the resulting
12//!      `SbrAction` (currently `DownUnreachable` is the only one that
13//!      mutates state directly).
14//!   3. picks a gossip target via [`pick_gossip_target`] and emits an
15//!      outbound `GossipPdu` through a caller-supplied transport
16//!      callback. The transport is abstracted as a `dyn GossipTransport`
17//!      trait so we can plug in either the in-process [`crate::ClusterRemoteAdapter`]
18//!      or a real `atomr-remote` endpoint once Phase 5.D wires it.
19//!
20//! Tests use [`InMemoryGossipTransport`] which routes between two
21//! daemons in the same process.
22
23use std::sync::Arc;
24use std::time::{Duration, Instant};
25
26use atomr_core::actor::Address;
27use parking_lot::Mutex;
28use tokio::sync::mpsc;
29use tokio::task::JoinHandle;
30
31use crate::events::ClusterEventBus;
32use crate::gossip_pdu::{decide as gossip_decide, pick_gossip_target, GossipDecision, GossipPdu};
33use crate::leader::elect_leader;
34use crate::member::Member;
35use crate::membership::MembershipState;
36use crate::sbr::DowningStrategy;
37use crate::sbr_runtime::{SbrAction, SbrRuntime};
38use crate::vector_clock::VectorClock;
39
/// Pluggable transport for gossip PDUs.
///
/// Abstracts how a serialized membership PDU reaches another node, so the
/// daemon can run over an in-process router in tests or a real remote
/// endpoint in production (see the module docs).
pub trait GossipTransport: Send + Sync + 'static {
    /// Deliver `pdu` to `target`. The transport is "best effort" —
    /// errors must not crash the daemon.
    ///
    /// Implementations must not block: the daemon calls this from its
    /// single loop task.
    fn send(&self, target: &Address, pdu: GossipPdu);
}
46
/// Control commands accepted by the daemon mailbox.
///
/// Sent through the handle's `mpsc::UnboundedSender`; all sends are
/// fire-and-forget (a closed mailbox just drops the command).
#[derive(Debug)]
pub enum DaemonCmd {
    /// Add/update a member (Joining).
    Join(Member),
    /// Mark `addr` as Leaving.
    Leave(Address),
    /// Mark `addr` as `Down` directly. Operator-initiated terminal
    /// down — does not require the member to be `Up` first. Accepts
    /// `Up | WeaklyUp | Leaving`. Subsequent leader-action ticks then
    /// promote `Down → Removed`.
    Down(Address),
    /// Inject a peer's gossip PDU (called by the transport on receive).
    ApplyGossip(GossipPdu),
    /// Force a single tick, identical to one timer tick (mostly for tests).
    Tick,
    /// Stop the daemon loop. The loop also stops if all senders drop.
    Shutdown,
}
66
/// Configuration knobs.
#[derive(Debug, Clone)]
pub struct DaemonConfig {
    /// How often the daemon ticks (gossip + leader actions).
    /// Defaults to one second; tests shorten this to tens of millis.
    pub gossip_interval: Duration,
}
73
74impl Default for DaemonConfig {
75    fn default() -> Self {
76        Self { gossip_interval: Duration::from_millis(1_000) }
77    }
78}
79
/// Snapshot of the daemon state used by `peer_state` queries.
///
/// Refreshed by the daemon loop after every processed command/tick, so
/// readers see state that lags the loop by at most one iteration.
#[derive(Debug, Clone, Default)]
pub struct DaemonSnapshot {
    /// Full membership table as of the last loop iteration.
    pub state: MembershipState,
    /// Leader elected from `state` (`None` when no leader can be elected).
    pub leader: Option<Address>,
    /// This node's vector clock at snapshot time.
    pub version: VectorClock,
}
87
/// Public handle to a running `ClusterDaemon`.
///
/// Dropping the handle sends `Shutdown` and aborts the loop task;
/// prefer [`ClusterDaemonHandle::shutdown`] for a graceful join.
pub struct ClusterDaemonHandle {
    // Command inbox into the daemon loop.
    cmd: mpsc::UnboundedSender<DaemonCmd>,
    // Shared snapshot the loop refreshes after every iteration.
    snapshot: Arc<Mutex<DaemonSnapshot>>,
    // JoinHandle of the loop task; taken by `shutdown` / `Drop`.
    join: Option<JoinHandle<()>>,
    // Event bus the daemon publishes membership events on.
    bus: ClusterEventBus,
    // This node's own address.
    self_addr: Address,
}
96
97impl ClusterDaemonHandle {
98    pub fn join(&self, m: Member) {
99        let _ = self.cmd.send(DaemonCmd::Join(m));
100    }
101    pub fn leave(&self, addr: Address) {
102        let _ = self.cmd.send(DaemonCmd::Leave(addr));
103    }
104    /// Operator-initiated terminal-down for `addr`. Transitions the
105    /// member to `MemberStatus::Down` (publishes `MemberDowned`); the
106    /// daemon's regular convergence flow then promotes `Down → Removed`
107    /// on the next leader tick.
108    pub fn down(&self, addr: Address) {
109        let _ = self.cmd.send(DaemonCmd::Down(addr));
110    }
111    pub fn apply_gossip(&self, pdu: GossipPdu) {
112        let _ = self.cmd.send(DaemonCmd::ApplyGossip(pdu));
113    }
114    pub fn tick(&self) {
115        let _ = self.cmd.send(DaemonCmd::Tick);
116    }
117    pub fn snapshot(&self) -> DaemonSnapshot {
118        self.snapshot.lock().clone()
119    }
120    pub fn events(&self) -> &ClusterEventBus {
121        &self.bus
122    }
123    pub fn address(&self) -> &Address {
124        &self.self_addr
125    }
126    /// Cheaply-cloneable inbox that delivers `GossipPdu`s into this
127    /// daemon. Used by transport adapters that need to fan inbound
128    /// PDUs into the daemon without holding the [`ClusterDaemonHandle`]
129    /// itself (which is consume-on-shutdown).
130    pub fn gossip_inbox(&self) -> mpsc::UnboundedSender<GossipPdu> {
131        let cmd = self.cmd.clone();
132        let (tx, mut rx) = mpsc::unbounded_channel::<GossipPdu>();
133        tokio::spawn(async move {
134            while let Some(p) = rx.recv().await {
135                let _ = cmd.send(DaemonCmd::ApplyGossip(p));
136            }
137        });
138        tx
139    }
140
141    /// Stop and join.
142    pub async fn shutdown(mut self) {
143        let _ = self.cmd.send(DaemonCmd::Shutdown);
144        if let Some(j) = self.join.take() {
145            let _ = j.await;
146        }
147    }
148}
149
150impl Drop for ClusterDaemonHandle {
151    fn drop(&mut self) {
152        let _ = self.cmd.send(DaemonCmd::Shutdown);
153        if let Some(j) = self.join.take() {
154            j.abort();
155        }
156    }
157}
158
159/// Spawn a daemon. The caller provides a transport implementation;
160/// the daemon never blocks on it.
161pub fn spawn_daemon(
162    self_addr: Address,
163    transport: Arc<dyn GossipTransport>,
164    bus: ClusterEventBus,
165    cfg: DaemonConfig,
166) -> ClusterDaemonHandle {
167    spawn_daemon_with_sbr::<NoSbr>(self_addr, transport, bus, cfg, None)
168}
169
170/// Same as [`spawn_daemon`] but installs an SBR runtime.
171pub fn spawn_daemon_with_sbr<S>(
172    self_addr: Address,
173    transport: Arc<dyn GossipTransport>,
174    bus: ClusterEventBus,
175    cfg: DaemonConfig,
176    sbr: Option<SbrRuntime<S>>,
177) -> ClusterDaemonHandle
178where
179    S: DowningStrategy + Send + 'static,
180{
181    let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
182    let snapshot = Arc::new(Mutex::new(DaemonSnapshot::default()));
183    let snap2 = snapshot.clone();
184    let bus2 = bus.clone();
185    let self_addr2 = self_addr.clone();
186    let join = tokio::spawn(run_daemon::<S>(self_addr.clone(), transport, bus2, cfg, sbr, cmd_rx, snap2));
187    ClusterDaemonHandle { cmd: cmd_tx, snapshot, join: Some(join), bus, self_addr: self_addr2 }
188}
189
/// Marker `DowningStrategy` used when no SBR runtime is installed.
///
/// Always decides `Stay`, so SBR never downs anything.
pub struct NoSbr;

impl DowningStrategy for NoSbr {
    /// Ignores the reachable/unreachable partitions entirely.
    fn decide(&self, _r: &[&Member], _u: &[&Member]) -> crate::sbr::DowningDecision {
        crate::sbr::DowningDecision::Stay
    }
}
198
199async fn run_daemon<S>(
200    self_addr: Address,
201    transport: Arc<dyn GossipTransport>,
202    bus: ClusterEventBus,
203    cfg: DaemonConfig,
204    mut sbr: Option<SbrRuntime<S>>,
205    mut cmd_rx: mpsc::UnboundedReceiver<DaemonCmd>,
206    snapshot: Arc<Mutex<DaemonSnapshot>>,
207) where
208    S: DowningStrategy + Send + 'static,
209{
210    let mut state = MembershipState::new();
211    let mut version = VectorClock::new();
212    let mut last_leader: Option<Address> = None;
213    let mut cursor: usize = 0;
214    let mut ticker = tokio::time::interval(cfg.gossip_interval);
215    ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
216
217    loop {
218        tokio::select! {
219            biased;
220            cmd = cmd_rx.recv() => match cmd {
221                None => break,
222                Some(DaemonCmd::Shutdown) => break,
223                Some(DaemonCmd::Join(m)) => {
224                    let evt = state.join(m);
225                    version.tick(self_addr.to_string().as_str());
226                    bus.publish(evt);
227                }
228                Some(DaemonCmd::Leave(addr)) => {
229                    if let Some(evt) = state.leave(&addr) {
230                        version.tick(self_addr.to_string().as_str());
231                        bus.publish(evt);
232                    }
233                }
234                Some(DaemonCmd::Down(addr)) => {
235                    if let Some(evt) = state.down(&addr) {
236                        version.tick(self_addr.to_string().as_str());
237                        bus.publish(evt);
238                    }
239                }
240                Some(DaemonCmd::ApplyGossip(pdu)) => {
241                    handle_pdu(&self_addr, &transport, &bus, &mut state, &mut version, pdu);
242                }
243                Some(DaemonCmd::Tick) => {
244                    do_tick(&self_addr, &transport, &bus, &mut state, &mut version,
245                            &mut sbr, &mut last_leader, &mut cursor);
246                }
247            },
248            _ = ticker.tick() => {
249                do_tick(&self_addr, &transport, &bus, &mut state, &mut version,
250                        &mut sbr, &mut last_leader, &mut cursor);
251            }
252        }
253        // Update snapshot.
254        let leader = elect_leader(&state);
255        *snapshot.lock() = DaemonSnapshot { state: state.clone(), leader, version: version.clone() };
256    }
257}
258
259#[allow(clippy::too_many_arguments)]
260fn do_tick<S>(
261    self_addr: &Address,
262    transport: &Arc<dyn GossipTransport>,
263    bus: &ClusterEventBus,
264    state: &mut MembershipState,
265    version: &mut VectorClock,
266    sbr: &mut Option<SbrRuntime<S>>,
267    last_leader: &mut Option<Address>,
268    cursor: &mut usize,
269) where
270    S: DowningStrategy + Send + 'static,
271{
272    // 1) Leader actions.
273    let evts = state.apply_leader_actions();
274    let mutated = !evts.is_empty();
275    for e in evts {
276        bus.publish(e);
277    }
278    if mutated {
279        version.tick(self_addr.to_string().as_str());
280    }
281
282    // 2) Leader change?
283    let leader_now = elect_leader(state);
284    if leader_now != *last_leader {
285        bus.publish(crate::events::ClusterEvent::LeaderChanged {
286            from: last_leader.clone(),
287            to: leader_now.clone(),
288        });
289        *last_leader = leader_now;
290    }
291
292    // 3) SBR.
293    if let Some(rt) = sbr.as_mut() {
294        match rt.tick(state, Instant::now()) {
295            SbrAction::None | SbrAction::DownSelf => {}
296            SbrAction::DownUnreachable(addrs) | SbrAction::DownAll(addrs) => {
297                for a in addrs {
298                    if let Some(m) = state.members.iter_mut().find(|m| m.address.to_string() == a) {
299                        m.status = crate::member::MemberStatus::Down;
300                    }
301                }
302                version.tick(self_addr.to_string().as_str());
303            }
304        }
305    }
306
307    // 4) Active gossip dissemination.
308    let peers: Vec<Address> = state.members.iter().map(|m| m.address.clone()).collect();
309    if let Some(target) = pick_gossip_target(&peers, self_addr, *cursor) {
310        let pdu = GossipPdu::Status { from: self_addr.to_string(), version: version.clone() };
311        transport.send(target, pdu);
312        *cursor = cursor.wrapping_add(1);
313    }
314}
315
316fn handle_pdu(
317    self_addr: &Address,
318    transport: &Arc<dyn GossipTransport>,
319    bus: &ClusterEventBus,
320    state: &mut MembershipState,
321    version: &mut VectorClock,
322    pdu: GossipPdu,
323) {
324    match pdu {
325        GossipPdu::Status { from, version: their } => {
326            let target = parse_address(&from);
327            match gossip_decide(version, &their) {
328                GossipDecision::SendEnvelope | GossipDecision::MergeBoth => {
329                    if let Some(t) = &target {
330                        transport.send(
331                            t,
332                            GossipPdu::Envelope {
333                                from: self_addr.to_string(),
334                                version: version.clone(),
335                                state: state.clone(),
336                            },
337                        );
338                    }
339                }
340                GossipDecision::RequestMerge => {
341                    if let Some(t) = &target {
342                        transport.send(
343                            t,
344                            GossipPdu::Merge { from: self_addr.to_string(), version: version.clone() },
345                        );
346                    }
347                }
348                GossipDecision::Same => {}
349            }
350        }
351        GossipPdu::Envelope { from: _, version: their, state: their_state } => {
352            // Naive merge: union members, prefer "later" status order, merge reachability.
353            merge_state(state, their_state);
354            *version = version.merge(&their);
355            let _ = bus; // events published via leader-action path on next tick
356        }
357        GossipPdu::Merge { from, version: _ } => {
358            if let Some(t) = parse_address(&from) {
359                transport.send(
360                    &t,
361                    GossipPdu::Envelope {
362                        from: self_addr.to_string(),
363                        version: version.clone(),
364                        state: state.clone(),
365                    },
366                );
367            }
368        }
369    }
370}
371
/// Parse the string form carried in a PDU back into an [`Address`].
/// Returns `None` on unparseable input; callers then drop the PDU.
fn parse_address(s: &str) -> Option<Address> {
    Address::parse(s)
}
375
376fn merge_state(local: &mut MembershipState, other: MembershipState) {
377    for m in other.members {
378        local.add_or_update(m);
379    }
380}
381
#[cfg(test)]
mod tests {
    use super::*;
    use crate::member::MemberStatus;
    use std::collections::HashMap;

    /// Dispatch table keyed by listener address.
    ///
    /// Cloning shares the same `Arc`'d table, so every daemon in a test
    /// talks over the same in-process "network".
    #[derive(Default, Clone)]
    struct InMemNet {
        inboxes: Arc<Mutex<HashMap<String, mpsc::UnboundedSender<GossipPdu>>>>,
    }

    impl GossipTransport for InMemNet {
        // Best-effort delivery: unknown targets and closed inboxes are
        // silently dropped, matching the trait contract.
        fn send(&self, target: &Address, pdu: GossipPdu) {
            if let Some(tx) = self.inboxes.lock().get(&target.to_string()) {
                let _ = tx.send(pdu);
            }
        }
    }

    /// Bridge an mpsc receiver into a daemon's `apply_gossip` calls.
    fn install_inbox(net: &InMemNet, addr: &Address, handle: &ClusterDaemonHandle) {
        let (tx, mut rx) = mpsc::unbounded_channel();
        net.inboxes.lock().insert(addr.to_string(), tx);
        let cmd = handle.cmd.clone();
        // Forwarder task exits when the inbox sender is dropped/replaced.
        tokio::spawn(async move {
            while let Some(p) = rx.recv().await {
                let _ = cmd.send(DaemonCmd::ApplyGossip(p));
            }
        });
    }

    #[tokio::test]
    async fn two_daemons_exchange_membership_via_gossip() {
        let net = InMemNet::default();
        let bus_a = ClusterEventBus::new();
        let bus_b = ClusterEventBus::new();
        let addr_a = Address::local("nodeA");
        let addr_b = Address::local("nodeB");

        let cfg = DaemonConfig { gossip_interval: Duration::from_millis(50) };
        let a = spawn_daemon(addr_a.clone(), Arc::new(net.clone()), bus_a.clone(), cfg.clone());
        let b = spawn_daemon(addr_b.clone(), Arc::new(net.clone()), bus_b.clone(), cfg);
        install_inbox(&net, &addr_a, &a);
        install_inbox(&net, &addr_b, &b);

        // Each node "joins" itself.
        a.join(Member::new(addr_a.clone(), vec![]));
        b.join(Member::new(addr_b.clone(), vec![]));
        // Inject knowledge of B into A so A's gossip target picker has a peer.
        a.join(Member::new(addr_b.clone(), vec![]));
        b.join(Member::new(addr_a.clone(), vec![]));

        // Force a few ticks. The sleeps yield to the forwarder tasks so
        // PDUs actually flow between the two daemons.
        for _ in 0..6 {
            a.tick();
            b.tick();
            tokio::time::sleep(Duration::from_millis(10)).await;
        }

        let snap_a = a.snapshot();
        let snap_b = b.snapshot();
        assert!(snap_a.state.member_count() >= 1);
        assert!(snap_b.state.member_count() >= 1);
        // Eventually both are converged and Up.
        assert!(snap_a
            .state
            .members
            .iter()
            .any(|m| m.address == addr_a && matches!(m.status, MemberStatus::Up | MemberStatus::Joining)));
        a.shutdown().await;
        b.shutdown().await;
    }

    #[tokio::test]
    async fn leader_change_event_published() {
        let net = InMemNet::default();
        let bus = ClusterEventBus::new();
        let captured = Arc::new(Mutex::new(Vec::new()));
        let c2 = captured.clone();
        // Keep the subscription handle alive for the test's duration.
        let _h = bus.subscribe(move |e| {
            if let crate::events::ClusterEvent::LeaderChanged { .. } = e {
                c2.lock().push(e.clone())
            }
        });
        let addr = Address::local("only");
        let cfg = DaemonConfig { gossip_interval: Duration::from_millis(20) };
        let d = spawn_daemon(addr.clone(), Arc::new(net.clone()), bus.clone(), cfg);
        install_inbox(&net, &addr, &d);
        d.join(Member::new(addr.clone(), vec![]));
        for _ in 0..5 {
            d.tick();
            tokio::time::sleep(Duration::from_millis(10)).await;
        }
        // A lone member becoming leader must publish at least one LeaderChanged.
        assert!(!captured.lock().is_empty());
        d.shutdown().await;
    }
}
479}