Skip to main content

atomr_cluster/
cluster_daemon.rs

1//! `ClusterDaemon` — the actor that owns [`MembershipState`] and drives
2//! the active gossip / leader-action / SBR ticks.
3//!
4//! Architecture
5//! ------------
//! * One spawned Tokio task (tracked by a `tokio::task::JoinHandle`) runs
//!   the main loop.
//! * `mpsc::UnboundedSender<DaemonCmd>` is the inbox for control
//!   messages (`Join`, `Leave`, `ApplyGossip`, `Tick`, `Shutdown`).
//! * On every tick the daemon
//!   1. applies leader actions (`MembershipState::apply_leader_actions`)
//!      and publishes `LeaderChanged` if the elected leader moved;
//!   2. runs the SBR runtime (if installed) and applies the resulting
//!      `SbrAction`: `DownUnreachable` and `DownAll` mark the listed
//!      members `Down`, while `DownSelf` is currently ignored;
//!   3. picks a gossip target via [`pick_gossip_target`] and emits an
//!      outbound `GossipPdu::Status` through the caller-supplied transport.
//!      The transport is abstracted as the [`GossipTransport`] trait so we
//!      can plug in either the in-process [`crate::ClusterRemoteAdapter`]
//!      or a real `atomr-remote` endpoint once Phase 5.D wires it.
19//!
//! The unit tests use a test-only in-memory `GossipTransport` (`InMemNet`)
//! that routes PDUs between daemons running in the same process.
22
23use std::sync::Arc;
24use std::time::{Duration, Instant};
25
26use atomr_core::actor::Address;
27use parking_lot::Mutex;
28use tokio::sync::mpsc;
29use tokio::task::JoinHandle;
30
31use crate::events::ClusterEventBus;
32use crate::gossip_pdu::{decide as gossip_decide, pick_gossip_target, GossipDecision, GossipPdu};
33use crate::leader::elect_leader;
34use crate::member::Member;
35use crate::membership::MembershipState;
36use crate::sbr::DowningStrategy;
37use crate::sbr_runtime::{SbrAction, SbrRuntime};
38use crate::vector_clock::VectorClock;
39
/// Pluggable transport for gossip PDUs.
///
/// The daemon calls [`GossipTransport::send`] synchronously from inside its
/// event loop (from `do_tick` and `handle_pdu`), so an implementation that
/// blocks stalls the whole daemon. Prefer handing the PDU off quickly, e.g.
/// by pushing it onto a channel.
pub trait GossipTransport: Send + Sync + 'static {
    /// Deliver `pdu` to `target`. The transport is "best effort" —
    /// errors must not crash the daemon.
    ///
    /// There is no return value, so the daemon never learns about delivery
    /// failures; implementations must handle (or drop) them internally.
    fn send(&self, target: &Address, pdu: GossipPdu);
}
46
/// Control commands accepted by the daemon mailbox.
///
/// The daemon loop processes commands one at a time, in arrival order.
/// After each one (except `Shutdown`) it refreshes the shared
/// [`DaemonSnapshot`].
#[derive(Debug)]
pub enum DaemonCmd {
    /// Add/update a member (Joining). Bumps this node's vector-clock entry
    /// and publishes the event returned by `MembershipState::join`.
    Join(Member),
    /// Mark `addr` as Leaving. If `MembershipState::leave` returns no event,
    /// nothing is bumped or published.
    Leave(Address),
    /// Inject a peer's gossip PDU (called by the transport on receive).
    ApplyGossip(GossipPdu),
    /// Force a single tick (mostly for tests). Runs the same work as a
    /// timer-driven tick.
    Tick,
    /// Stop the daemon loop. Commands queued behind it are never processed.
    Shutdown,
}
61
/// Tuning knobs for the cluster daemon.
#[derive(Debug, Clone)]
pub struct DaemonConfig {
    /// Period of the daemon's timer tick (leader actions, SBR and active
    /// gossip). Defaults to one second.
    ///
    /// Must be non-zero: it is handed to `tokio::time::interval`, which
    /// panics on a zero period.
    pub gossip_interval: Duration,
}

impl Default for DaemonConfig {
    /// One gossip round per second.
    fn default() -> Self {
        let gossip_interval = Duration::from_secs(1);
        DaemonConfig { gossip_interval }
    }
}
74
/// Snapshot of the daemon state used by `peer_state` queries.
///
/// Returned by [`ClusterDaemonHandle::snapshot`]. The daemon loop overwrites
/// it after every processed command or tick, so it can lag the live state
/// by at most one pending command.
#[derive(Debug, Clone, Default)]
pub struct DaemonSnapshot {
    /// Membership table as of the last update.
    pub state: MembershipState,
    /// Leader computed by `elect_leader` over `state` when the snapshot was taken.
    pub leader: Option<Address>,
    /// The daemon's vector clock when the snapshot was taken.
    pub version: VectorClock,
}
82
/// Public handle to a running `ClusterDaemon`.
///
/// Created by [`spawn_daemon`] / [`spawn_daemon_with_sbr`]. Dropping the
/// handle sends `Shutdown` and aborts the daemon task. Call
/// [`ClusterDaemonHandle::shutdown`] instead to wait for the loop to exit.
pub struct ClusterDaemonHandle {
    /// Inbox of the daemon loop.
    cmd: mpsc::UnboundedSender<DaemonCmd>,
    /// Latest state written by the loop; read by `snapshot()`.
    snapshot: Arc<Mutex<DaemonSnapshot>>,
    /// Task running the loop; `None` once taken by `shutdown` or `drop`.
    join: Option<JoinHandle<()>>,
    /// The bus this daemon publishes to (a clone was handed to the loop).
    bus: ClusterEventBus,
    /// This node's own address.
    self_addr: Address,
}
91
92impl ClusterDaemonHandle {
93    pub fn join(&self, m: Member) {
94        let _ = self.cmd.send(DaemonCmd::Join(m));
95    }
96    pub fn leave(&self, addr: Address) {
97        let _ = self.cmd.send(DaemonCmd::Leave(addr));
98    }
99    pub fn apply_gossip(&self, pdu: GossipPdu) {
100        let _ = self.cmd.send(DaemonCmd::ApplyGossip(pdu));
101    }
102    pub fn tick(&self) {
103        let _ = self.cmd.send(DaemonCmd::Tick);
104    }
105    pub fn snapshot(&self) -> DaemonSnapshot {
106        self.snapshot.lock().clone()
107    }
108    pub fn events(&self) -> &ClusterEventBus {
109        &self.bus
110    }
111    pub fn address(&self) -> &Address {
112        &self.self_addr
113    }
114    /// Cheaply-cloneable inbox that delivers `GossipPdu`s into this
115    /// daemon. Used by transport adapters that need to fan inbound
116    /// PDUs into the daemon without holding the [`ClusterDaemonHandle`]
117    /// itself (which is consume-on-shutdown).
118    pub fn gossip_inbox(&self) -> mpsc::UnboundedSender<GossipPdu> {
119        let cmd = self.cmd.clone();
120        let (tx, mut rx) = mpsc::unbounded_channel::<GossipPdu>();
121        tokio::spawn(async move {
122            while let Some(p) = rx.recv().await {
123                let _ = cmd.send(DaemonCmd::ApplyGossip(p));
124            }
125        });
126        tx
127    }
128
129    /// Stop and join.
130    pub async fn shutdown(mut self) {
131        let _ = self.cmd.send(DaemonCmd::Shutdown);
132        if let Some(j) = self.join.take() {
133            let _ = j.await;
134        }
135    }
136}
137
impl Drop for ClusterDaemonHandle {
    /// Best-effort stop for a handle that goes away without `shutdown()`.
    ///
    /// Sends `Shutdown` (the error is ignored if the loop has already exited)
    /// and then aborts the daemon task without waiting for it. After
    /// [`ClusterDaemonHandle::shutdown`] the task handle has already been
    /// taken, so only the ignored send happens.
    fn drop(&mut self) {
        let _ = self.cmd.send(DaemonCmd::Shutdown);
        if let Some(j) = self.join.take() {
            // `drop` cannot await a graceful exit, so cancel the task outright.
            j.abort();
        }
    }
}
146
147/// Spawn a daemon. The caller provides a transport implementation;
148/// the daemon never blocks on it.
149pub fn spawn_daemon(
150    self_addr: Address,
151    transport: Arc<dyn GossipTransport>,
152    bus: ClusterEventBus,
153    cfg: DaemonConfig,
154) -> ClusterDaemonHandle {
155    spawn_daemon_with_sbr::<NoSbr>(self_addr, transport, bus, cfg, None)
156}
157
158/// Same as [`spawn_daemon`] but installs an SBR runtime.
159pub fn spawn_daemon_with_sbr<S>(
160    self_addr: Address,
161    transport: Arc<dyn GossipTransport>,
162    bus: ClusterEventBus,
163    cfg: DaemonConfig,
164    sbr: Option<SbrRuntime<S>>,
165) -> ClusterDaemonHandle
166where
167    S: DowningStrategy + Send + 'static,
168{
169    let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
170    let snapshot = Arc::new(Mutex::new(DaemonSnapshot::default()));
171    let snap2 = snapshot.clone();
172    let bus2 = bus.clone();
173    let self_addr2 = self_addr.clone();
174    let join = tokio::spawn(run_daemon::<S>(self_addr.clone(), transport, bus2, cfg, sbr, cmd_rx, snap2));
175    ClusterDaemonHandle { cmd: cmd_tx, snapshot, join: Some(join), bus, self_addr: self_addr2 }
176}
177
178/// Marker `DowningStrategy` used when no SBR runtime is installed.
179pub struct NoSbr;
180
181impl DowningStrategy for NoSbr {
182    fn decide(&self, _r: &[&Member], _u: &[&Member]) -> crate::sbr::DowningDecision {
183        crate::sbr::DowningDecision::Stay
184    }
185}
186
187async fn run_daemon<S>(
188    self_addr: Address,
189    transport: Arc<dyn GossipTransport>,
190    bus: ClusterEventBus,
191    cfg: DaemonConfig,
192    mut sbr: Option<SbrRuntime<S>>,
193    mut cmd_rx: mpsc::UnboundedReceiver<DaemonCmd>,
194    snapshot: Arc<Mutex<DaemonSnapshot>>,
195) where
196    S: DowningStrategy + Send + 'static,
197{
198    let mut state = MembershipState::new();
199    let mut version = VectorClock::new();
200    let mut last_leader: Option<Address> = None;
201    let mut cursor: usize = 0;
202    let mut ticker = tokio::time::interval(cfg.gossip_interval);
203    ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
204
205    loop {
206        tokio::select! {
207            biased;
208            cmd = cmd_rx.recv() => match cmd {
209                None => break,
210                Some(DaemonCmd::Shutdown) => break,
211                Some(DaemonCmd::Join(m)) => {
212                    let evt = state.join(m);
213                    version.tick(self_addr.to_string().as_str());
214                    bus.publish(evt);
215                }
216                Some(DaemonCmd::Leave(addr)) => {
217                    if let Some(evt) = state.leave(&addr) {
218                        version.tick(self_addr.to_string().as_str());
219                        bus.publish(evt);
220                    }
221                }
222                Some(DaemonCmd::ApplyGossip(pdu)) => {
223                    handle_pdu(&self_addr, &transport, &bus, &mut state, &mut version, pdu);
224                }
225                Some(DaemonCmd::Tick) => {
226                    do_tick(&self_addr, &transport, &bus, &mut state, &mut version,
227                            &mut sbr, &mut last_leader, &mut cursor);
228                }
229            },
230            _ = ticker.tick() => {
231                do_tick(&self_addr, &transport, &bus, &mut state, &mut version,
232                        &mut sbr, &mut last_leader, &mut cursor);
233            }
234        }
235        // Update snapshot.
236        let leader = elect_leader(&state);
237        *snapshot.lock() = DaemonSnapshot { state: state.clone(), leader, version: version.clone() };
238    }
239}
240
241#[allow(clippy::too_many_arguments)]
242fn do_tick<S>(
243    self_addr: &Address,
244    transport: &Arc<dyn GossipTransport>,
245    bus: &ClusterEventBus,
246    state: &mut MembershipState,
247    version: &mut VectorClock,
248    sbr: &mut Option<SbrRuntime<S>>,
249    last_leader: &mut Option<Address>,
250    cursor: &mut usize,
251) where
252    S: DowningStrategy + Send + 'static,
253{
254    // 1) Leader actions.
255    let evts = state.apply_leader_actions();
256    let mutated = !evts.is_empty();
257    for e in evts {
258        bus.publish(e);
259    }
260    if mutated {
261        version.tick(self_addr.to_string().as_str());
262    }
263
264    // 2) Leader change?
265    let leader_now = elect_leader(state);
266    if leader_now != *last_leader {
267        bus.publish(crate::events::ClusterEvent::LeaderChanged {
268            from: last_leader.clone(),
269            to: leader_now.clone(),
270        });
271        *last_leader = leader_now;
272    }
273
274    // 3) SBR.
275    if let Some(rt) = sbr.as_mut() {
276        match rt.tick(state, Instant::now()) {
277            SbrAction::None | SbrAction::DownSelf => {}
278            SbrAction::DownUnreachable(addrs) | SbrAction::DownAll(addrs) => {
279                for a in addrs {
280                    if let Some(m) = state.members.iter_mut().find(|m| m.address.to_string() == a) {
281                        m.status = crate::member::MemberStatus::Down;
282                    }
283                }
284                version.tick(self_addr.to_string().as_str());
285            }
286        }
287    }
288
289    // 4) Active gossip dissemination.
290    let peers: Vec<Address> = state.members.iter().map(|m| m.address.clone()).collect();
291    if let Some(target) = pick_gossip_target(&peers, self_addr, *cursor) {
292        let pdu = GossipPdu::Status { from: self_addr.to_string(), version: version.clone() };
293        transport.send(target, pdu);
294        *cursor = cursor.wrapping_add(1);
295    }
296}
297
298fn handle_pdu(
299    self_addr: &Address,
300    transport: &Arc<dyn GossipTransport>,
301    bus: &ClusterEventBus,
302    state: &mut MembershipState,
303    version: &mut VectorClock,
304    pdu: GossipPdu,
305) {
306    match pdu {
307        GossipPdu::Status { from, version: their } => {
308            let target = parse_address(&from);
309            match gossip_decide(version, &their) {
310                GossipDecision::SendEnvelope | GossipDecision::MergeBoth => {
311                    if let Some(t) = &target {
312                        transport.send(
313                            t,
314                            GossipPdu::Envelope {
315                                from: self_addr.to_string(),
316                                version: version.clone(),
317                                state: state.clone(),
318                            },
319                        );
320                    }
321                }
322                GossipDecision::RequestMerge => {
323                    if let Some(t) = &target {
324                        transport.send(
325                            t,
326                            GossipPdu::Merge { from: self_addr.to_string(), version: version.clone() },
327                        );
328                    }
329                }
330                GossipDecision::Same => {}
331            }
332        }
333        GossipPdu::Envelope { from: _, version: their, state: their_state } => {
334            // Naive merge: union members, prefer "later" status order, merge reachability.
335            merge_state(state, their_state);
336            *version = version.merge(&their);
337            let _ = bus; // events published via leader-action path on next tick
338        }
339        GossipPdu::Merge { from, version: _ } => {
340            if let Some(t) = parse_address(&from) {
341                transport.send(
342                    &t,
343                    GossipPdu::Envelope {
344                        from: self_addr.to_string(),
345                        version: version.clone(),
346                        state: state.clone(),
347                    },
348                );
349            }
350        }
351    }
352}
353
/// Parse a gossip PDU's `from` field back into an [`Address`].
///
/// Thin wrapper over `Address::parse`. On a `None` result, `handle_pdu`
/// skips the reply instead of failing.
fn parse_address(s: &str) -> Option<Address> {
    Address::parse(s)
}
357
358fn merge_state(local: &mut MembershipState, other: MembershipState) {
359    for m in other.members {
360        local.add_or_update(m);
361    }
362}
363
#[cfg(test)]
mod tests {
    use super::*;
    use crate::member::MemberStatus;
    use std::collections::HashMap;

    /// In-process "network": maps a listener's address string to the
    /// channel feeding that daemon's gossip inbox.
    #[derive(Default, Clone)]
    struct InMemNet {
        inboxes: Arc<Mutex<HashMap<String, mpsc::UnboundedSender<GossipPdu>>>>,
    }

    impl GossipTransport for InMemNet {
        /// Best effort: unknown targets and closed inboxes are silently ignored.
        fn send(&self, target: &Address, pdu: GossipPdu) {
            let routes = self.inboxes.lock();
            if let Some(inbox) = routes.get(&target.to_string()) {
                let _ = inbox.send(pdu);
            }
        }
    }

    /// Register `handle`'s gossip inbox on `net` under `addr`, so PDUs
    /// addressed to `addr` reach that daemon as `ApplyGossip` commands.
    fn install_inbox(net: &InMemNet, addr: &Address, handle: &ClusterDaemonHandle) {
        let inbox = handle.gossip_inbox();
        net.inboxes.lock().insert(addr.to_string(), inbox);
    }

    #[tokio::test]
    async fn two_daemons_exchange_membership_via_gossip() {
        let net = InMemNet::default();
        let (bus_a, bus_b) = (ClusterEventBus::new(), ClusterEventBus::new());
        let (addr_a, addr_b) = (Address::local("nodeA"), Address::local("nodeB"));

        let cfg = DaemonConfig { gossip_interval: Duration::from_millis(50) };
        let a = spawn_daemon(addr_a.clone(), Arc::new(net.clone()), bus_a.clone(), cfg.clone());
        let b = spawn_daemon(addr_b.clone(), Arc::new(net.clone()), bus_b.clone(), cfg);
        install_inbox(&net, &addr_a, &a);
        install_inbox(&net, &addr_b, &b);

        // Each node "joins" itself ...
        a.join(Member::new(addr_a.clone(), vec![]));
        b.join(Member::new(addr_b.clone(), vec![]));
        // ... and is told about the other directly, so both gossip-target
        // pickers have a peer to talk to.
        a.join(Member::new(addr_b.clone(), vec![]));
        b.join(Member::new(addr_a.clone(), vec![]));

        // Drive a handful of explicit ticks.
        for _ in 0..6 {
            a.tick();
            b.tick();
            tokio::time::sleep(Duration::from_millis(10)).await;
        }

        let (snap_a, snap_b) = (a.snapshot(), b.snapshot());
        assert!(snap_a.state.member_count() >= 1);
        assert!(snap_b.state.member_count() >= 1);
        // A must at least see itself as Joining or Up.
        // NOTE(review): both members were injected directly above, so this
        // does not prove that membership actually travelled via gossip.
        let a_sees_itself = snap_a.state.members.iter().any(|m| {
            m.address == addr_a && matches!(m.status, MemberStatus::Up | MemberStatus::Joining)
        });
        assert!(a_sees_itself);

        a.shutdown().await;
        b.shutdown().await;
    }

    #[tokio::test]
    async fn leader_change_event_published() {
        let net = InMemNet::default();
        let bus = ClusterEventBus::new();

        // Collect only LeaderChanged events.
        let captured = Arc::new(Mutex::new(Vec::new()));
        let sink = captured.clone();
        let _subscription = bus.subscribe(move |e| {
            if matches!(e, crate::events::ClusterEvent::LeaderChanged { .. }) {
                sink.lock().push(e.clone());
            }
        });

        let addr = Address::local("only");
        let cfg = DaemonConfig { gossip_interval: Duration::from_millis(20) };
        let d = spawn_daemon(addr.clone(), Arc::new(net.clone()), bus.clone(), cfg);
        install_inbox(&net, &addr, &d);
        d.join(Member::new(addr.clone(), vec![]));

        for _ in 0..5 {
            d.tick();
            tokio::time::sleep(Duration::from_millis(10)).await;
        }

        assert!(!captured.lock().is_empty());
        d.shutdown().await;
    }
}