// atomr_cluster/cluster_daemon.rs
//! `ClusterDaemon` — the actor that owns [`MembershipState`] and drives
//! the active gossip / leader-action / SBR ticks.
//!
//! Phase 6.C / 6.D / 6.F of `docs/full-port-plan.md`. Akka.NET parity:
//! `Cluster/ClusterDaemon.cs`.
//!
//! Architecture
//! ------------
//! * One `tokio::task::JoinHandle` runs the main loop.
//! * `mpsc::UnboundedSender<DaemonCmd>` is the inbox for control
//!   messages (`Join`, `Leave`, `ApplyGossip`, `Tick`, `Shutdown`).
//! * Per tick the daemon
//!   1. applies leader actions (`MembershipState::apply_leader_actions`),
//!   2. runs the SBR runtime (if installed) and applies the resulting
//!      `SbrAction` (currently `DownUnreachable` is the only one that
//!      mutates state directly), and
//!   3. picks a gossip target via [`pick_gossip_target`] and emits an
//!      outbound `GossipPdu` through a caller-supplied transport
//!      callback. The transport is abstracted as a `dyn GossipTransport`
//!      trait so we can plug in either the in-process [`crate::ClusterRemoteAdapter`]
//!      or a real `atomr-remote` endpoint once Phase 5.D wires it.
//!
//! Tests use an in-memory gossip transport which routes between two
//! daemons in the same process.
26use std::sync::Arc;
27use std::time::{Duration, Instant};
28
29use atomr_core::actor::Address;
30use parking_lot::Mutex;
31use tokio::sync::mpsc;
32use tokio::task::JoinHandle;
33
34use crate::events::ClusterEventBus;
35use crate::gossip_pdu::{decide as gossip_decide, pick_gossip_target, GossipDecision, GossipPdu};
36use crate::leader::elect_leader;
37use crate::member::Member;
38use crate::membership::MembershipState;
39use crate::sbr::DowningStrategy;
40use crate::sbr_runtime::{SbrAction, SbrRuntime};
41use crate::vector_clock::VectorClock;
42
43/// Pluggable transport for gossip PDUs.
44pub trait GossipTransport: Send + Sync + 'static {
45    /// Deliver `pdu` to `target`. The transport is "best effort" —
46    /// errors must not crash the daemon.
47    fn send(&self, target: &Address, pdu: GossipPdu);
48}
49
50/// Control commands accepted by the daemon mailbox.
51#[derive(Debug)]
52pub enum DaemonCmd {
53    /// Add/update a member (Joining).
54    Join(Member),
55    /// Mark `addr` as Leaving.
56    Leave(Address),
57    /// Inject a peer's gossip PDU (called by the transport on receive).
58    ApplyGossip(GossipPdu),
59    /// Force a single tick (mostly for tests).
60    Tick,
61    /// Stop the daemon loop.
62    Shutdown,
63}
64
/// Tunable knobs for a [`ClusterDaemonHandle`]'s background loop.
#[derive(Debug, Clone)]
pub struct DaemonConfig {
    /// Period between automatic ticks (gossip dissemination plus
    /// leader actions).
    pub gossip_interval: Duration,
}
71
72impl Default for DaemonConfig {
73    fn default() -> Self {
74        Self { gossip_interval: Duration::from_millis(1_000) }
75    }
76}
77
78/// Snapshot of the daemon state used by `peer_state` queries.
79#[derive(Debug, Clone, Default)]
80pub struct DaemonSnapshot {
81    pub state: MembershipState,
82    pub leader: Option<Address>,
83    pub version: VectorClock,
84}
85
86/// Public handle to a running `ClusterDaemon`.
87pub struct ClusterDaemonHandle {
88    cmd: mpsc::UnboundedSender<DaemonCmd>,
89    snapshot: Arc<Mutex<DaemonSnapshot>>,
90    join: Option<JoinHandle<()>>,
91    bus: ClusterEventBus,
92    self_addr: Address,
93}
94
95impl ClusterDaemonHandle {
96    pub fn join(&self, m: Member) {
97        let _ = self.cmd.send(DaemonCmd::Join(m));
98    }
99    pub fn leave(&self, addr: Address) {
100        let _ = self.cmd.send(DaemonCmd::Leave(addr));
101    }
102    pub fn apply_gossip(&self, pdu: GossipPdu) {
103        let _ = self.cmd.send(DaemonCmd::ApplyGossip(pdu));
104    }
105    pub fn tick(&self) {
106        let _ = self.cmd.send(DaemonCmd::Tick);
107    }
108    pub fn snapshot(&self) -> DaemonSnapshot {
109        self.snapshot.lock().clone()
110    }
111    pub fn events(&self) -> &ClusterEventBus {
112        &self.bus
113    }
114    pub fn address(&self) -> &Address {
115        &self.self_addr
116    }
117    /// Cheaply-cloneable inbox that delivers `GossipPdu`s into this
118    /// daemon. Used by transport adapters that need to fan inbound
119    /// PDUs into the daemon without holding the [`ClusterDaemonHandle`]
120    /// itself (which is consume-on-shutdown).
121    pub fn gossip_inbox(&self) -> mpsc::UnboundedSender<GossipPdu> {
122        let cmd = self.cmd.clone();
123        let (tx, mut rx) = mpsc::unbounded_channel::<GossipPdu>();
124        tokio::spawn(async move {
125            while let Some(p) = rx.recv().await {
126                let _ = cmd.send(DaemonCmd::ApplyGossip(p));
127            }
128        });
129        tx
130    }
131
132    /// Stop and join.
133    pub async fn shutdown(mut self) {
134        let _ = self.cmd.send(DaemonCmd::Shutdown);
135        if let Some(j) = self.join.take() {
136            let _ = j.await;
137        }
138    }
139}
140
141impl Drop for ClusterDaemonHandle {
142    fn drop(&mut self) {
143        let _ = self.cmd.send(DaemonCmd::Shutdown);
144        if let Some(j) = self.join.take() {
145            j.abort();
146        }
147    }
148}
149
150/// Spawn a daemon. The caller provides a transport implementation;
151/// the daemon never blocks on it.
152pub fn spawn_daemon(
153    self_addr: Address,
154    transport: Arc<dyn GossipTransport>,
155    bus: ClusterEventBus,
156    cfg: DaemonConfig,
157) -> ClusterDaemonHandle {
158    spawn_daemon_with_sbr::<NoSbr>(self_addr, transport, bus, cfg, None)
159}
160
161/// Same as [`spawn_daemon`] but installs an SBR runtime.
162pub fn spawn_daemon_with_sbr<S>(
163    self_addr: Address,
164    transport: Arc<dyn GossipTransport>,
165    bus: ClusterEventBus,
166    cfg: DaemonConfig,
167    sbr: Option<SbrRuntime<S>>,
168) -> ClusterDaemonHandle
169where
170    S: DowningStrategy + Send + 'static,
171{
172    let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
173    let snapshot = Arc::new(Mutex::new(DaemonSnapshot::default()));
174    let snap2 = snapshot.clone();
175    let bus2 = bus.clone();
176    let self_addr2 = self_addr.clone();
177    let join = tokio::spawn(run_daemon::<S>(self_addr.clone(), transport, bus2, cfg, sbr, cmd_rx, snap2));
178    ClusterDaemonHandle { cmd: cmd_tx, snapshot, join: Some(join), bus, self_addr: self_addr2 }
179}
180
181/// Marker `DowningStrategy` used when no SBR runtime is installed.
182pub struct NoSbr;
183
184impl DowningStrategy for NoSbr {
185    fn decide(&self, _r: &[&Member], _u: &[&Member]) -> crate::sbr::DowningDecision {
186        crate::sbr::DowningDecision::Stay
187    }
188}
189
190async fn run_daemon<S>(
191    self_addr: Address,
192    transport: Arc<dyn GossipTransport>,
193    bus: ClusterEventBus,
194    cfg: DaemonConfig,
195    mut sbr: Option<SbrRuntime<S>>,
196    mut cmd_rx: mpsc::UnboundedReceiver<DaemonCmd>,
197    snapshot: Arc<Mutex<DaemonSnapshot>>,
198) where
199    S: DowningStrategy + Send + 'static,
200{
201    let mut state = MembershipState::new();
202    let mut version = VectorClock::new();
203    let mut last_leader: Option<Address> = None;
204    let mut cursor: usize = 0;
205    let mut ticker = tokio::time::interval(cfg.gossip_interval);
206    ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
207
208    loop {
209        tokio::select! {
210            biased;
211            cmd = cmd_rx.recv() => match cmd {
212                None => break,
213                Some(DaemonCmd::Shutdown) => break,
214                Some(DaemonCmd::Join(m)) => {
215                    let evt = state.join(m);
216                    version.tick(self_addr.to_string().as_str());
217                    bus.publish(evt);
218                }
219                Some(DaemonCmd::Leave(addr)) => {
220                    if let Some(evt) = state.leave(&addr) {
221                        version.tick(self_addr.to_string().as_str());
222                        bus.publish(evt);
223                    }
224                }
225                Some(DaemonCmd::ApplyGossip(pdu)) => {
226                    handle_pdu(&self_addr, &transport, &bus, &mut state, &mut version, pdu);
227                }
228                Some(DaemonCmd::Tick) => {
229                    do_tick(&self_addr, &transport, &bus, &mut state, &mut version,
230                            &mut sbr, &mut last_leader, &mut cursor);
231                }
232            },
233            _ = ticker.tick() => {
234                do_tick(&self_addr, &transport, &bus, &mut state, &mut version,
235                        &mut sbr, &mut last_leader, &mut cursor);
236            }
237        }
238        // Update snapshot.
239        let leader = elect_leader(&state);
240        *snapshot.lock() = DaemonSnapshot { state: state.clone(), leader, version: version.clone() };
241    }
242}
243
244#[allow(clippy::too_many_arguments)]
245fn do_tick<S>(
246    self_addr: &Address,
247    transport: &Arc<dyn GossipTransport>,
248    bus: &ClusterEventBus,
249    state: &mut MembershipState,
250    version: &mut VectorClock,
251    sbr: &mut Option<SbrRuntime<S>>,
252    last_leader: &mut Option<Address>,
253    cursor: &mut usize,
254) where
255    S: DowningStrategy + Send + 'static,
256{
257    // 1) Leader actions.
258    let evts = state.apply_leader_actions();
259    let mutated = !evts.is_empty();
260    for e in evts {
261        bus.publish(e);
262    }
263    if mutated {
264        version.tick(self_addr.to_string().as_str());
265    }
266
267    // 2) Leader change?
268    let leader_now = elect_leader(state);
269    if leader_now != *last_leader {
270        bus.publish(crate::events::ClusterEvent::LeaderChanged {
271            from: last_leader.clone(),
272            to: leader_now.clone(),
273        });
274        *last_leader = leader_now;
275    }
276
277    // 3) SBR.
278    if let Some(rt) = sbr.as_mut() {
279        match rt.tick(state, Instant::now()) {
280            SbrAction::None | SbrAction::DownSelf => {}
281            SbrAction::DownUnreachable(addrs) | SbrAction::DownAll(addrs) => {
282                for a in addrs {
283                    if let Some(m) = state.members.iter_mut().find(|m| m.address.to_string() == a) {
284                        m.status = crate::member::MemberStatus::Down;
285                    }
286                }
287                version.tick(self_addr.to_string().as_str());
288            }
289        }
290    }
291
292    // 4) Active gossip dissemination.
293    let peers: Vec<Address> = state.members.iter().map(|m| m.address.clone()).collect();
294    if let Some(target) = pick_gossip_target(&peers, self_addr, *cursor) {
295        let pdu = GossipPdu::Status { from: self_addr.to_string(), version: version.clone() };
296        transport.send(target, pdu);
297        *cursor = cursor.wrapping_add(1);
298    }
299}
300
301fn handle_pdu(
302    self_addr: &Address,
303    transport: &Arc<dyn GossipTransport>,
304    bus: &ClusterEventBus,
305    state: &mut MembershipState,
306    version: &mut VectorClock,
307    pdu: GossipPdu,
308) {
309    match pdu {
310        GossipPdu::Status { from, version: their } => {
311            let target = parse_address(&from);
312            match gossip_decide(version, &their) {
313                GossipDecision::SendEnvelope | GossipDecision::MergeBoth => {
314                    if let Some(t) = &target {
315                        transport.send(
316                            t,
317                            GossipPdu::Envelope {
318                                from: self_addr.to_string(),
319                                version: version.clone(),
320                                state: state.clone(),
321                            },
322                        );
323                    }
324                }
325                GossipDecision::RequestMerge => {
326                    if let Some(t) = &target {
327                        transport.send(
328                            t,
329                            GossipPdu::Merge { from: self_addr.to_string(), version: version.clone() },
330                        );
331                    }
332                }
333                GossipDecision::Same => {}
334            }
335        }
336        GossipPdu::Envelope { from: _, version: their, state: their_state } => {
337            // Naive merge: union members, prefer "later" status order, merge reachability.
338            merge_state(state, their_state);
339            *version = version.merge(&their);
340            let _ = bus; // events published via leader-action path on next tick
341        }
342        GossipPdu::Merge { from, version: _ } => {
343            if let Some(t) = parse_address(&from) {
344                transport.send(
345                    &t,
346                    GossipPdu::Envelope {
347                        from: self_addr.to_string(),
348                        version: version.clone(),
349                        state: state.clone(),
350                    },
351                );
352            }
353        }
354    }
355}
356
357fn parse_address(s: &str) -> Option<Address> {
358    Address::parse(s)
359}
360
361fn merge_state(local: &mut MembershipState, other: MembershipState) {
362    for m in other.members {
363        local.add_or_update(m);
364    }
365}
366
#[cfg(test)]
mod tests {
    use super::*;
    use crate::member::MemberStatus;
    use std::collections::HashMap;

    /// In-process "network": routes each PDU to whichever daemon
    /// registered an inbox under the target's address string.
    #[derive(Default, Clone)]
    struct InMemNet {
        inboxes: Arc<Mutex<HashMap<String, mpsc::UnboundedSender<GossipPdu>>>>,
    }

    impl GossipTransport for InMemNet {
        fn send(&self, target: &Address, pdu: GossipPdu) {
            let table = self.inboxes.lock();
            if let Some(inbox) = table.get(&target.to_string()) {
                let _ = inbox.send(pdu);
            }
        }
    }

    /// Register `addr` on `net` and pump received PDUs into `handle`.
    fn install_inbox(net: &InMemNet, addr: &Address, handle: &ClusterDaemonHandle) {
        let (pdu_tx, mut pdu_rx) = mpsc::unbounded_channel();
        net.inboxes.lock().insert(addr.to_string(), pdu_tx);
        let cmd = handle.cmd.clone();
        tokio::spawn(async move {
            while let Some(pdu) = pdu_rx.recv().await {
                let _ = cmd.send(DaemonCmd::ApplyGossip(pdu));
            }
        });
    }

    #[tokio::test]
    async fn two_daemons_exchange_membership_via_gossip() {
        let net = InMemNet::default();
        let addr_a = Address::local("nodeA");
        let addr_b = Address::local("nodeB");
        let bus_a = ClusterEventBus::new();
        let bus_b = ClusterEventBus::new();

        let cfg = DaemonConfig { gossip_interval: Duration::from_millis(50) };
        let daemon_a = spawn_daemon(addr_a.clone(), Arc::new(net.clone()), bus_a.clone(), cfg.clone());
        let daemon_b = spawn_daemon(addr_b.clone(), Arc::new(net.clone()), bus_b.clone(), cfg);
        install_inbox(&net, &addr_a, &daemon_a);
        install_inbox(&net, &addr_b, &daemon_b);

        // Seed: each node joins itself AND learns about the other, so
        // the gossip target picker has a peer to talk to.
        daemon_a.join(Member::new(addr_a.clone(), vec![]));
        daemon_b.join(Member::new(addr_b.clone(), vec![]));
        daemon_a.join(Member::new(addr_b.clone(), vec![]));
        daemon_b.join(Member::new(addr_a.clone(), vec![]));

        // Drive several ticks with short pauses for message delivery.
        for _ in 0..6 {
            daemon_a.tick();
            daemon_b.tick();
            tokio::time::sleep(Duration::from_millis(10)).await;
        }

        let snap_a = daemon_a.snapshot();
        let snap_b = daemon_b.snapshot();
        assert!(snap_a.state.member_count() >= 1);
        assert!(snap_b.state.member_count() >= 1);
        // A must know itself as Joining (or promoted to Up by leader actions).
        let knows_self = snap_a.state.members.iter().any(|m| {
            m.address == addr_a && matches!(m.status, MemberStatus::Up | MemberStatus::Joining)
        });
        assert!(knows_self);
        daemon_a.shutdown().await;
        daemon_b.shutdown().await;
    }

    #[tokio::test]
    async fn leader_change_event_published() {
        let net = InMemNet::default();
        let bus = ClusterEventBus::new();
        let seen = Arc::new(Mutex::new(Vec::new()));
        let sink = seen.clone();
        // Keep the subscription guard alive for the test's duration.
        let _sub = bus.subscribe(move |e| {
            if let crate::events::ClusterEvent::LeaderChanged { .. } = e {
                sink.lock().push(e.clone())
            }
        });
        let addr = Address::local("only");
        let cfg = DaemonConfig { gossip_interval: Duration::from_millis(20) };
        let daemon = spawn_daemon(addr.clone(), Arc::new(net.clone()), bus.clone(), cfg);
        install_inbox(&net, &addr, &daemon);
        daemon.join(Member::new(addr.clone(), vec![]));
        for _ in 0..5 {
            daemon.tick();
            tokio::time::sleep(Duration::from_millis(10)).await;
        }
        assert!(!seen.lock().is_empty());
        daemon.shutdown().await;
    }
}