1use std::sync::Arc;
24use std::time::{Duration, Instant};
25
26use atomr_core::actor::Address;
27use parking_lot::Mutex;
28use tokio::sync::mpsc;
29use tokio::task::JoinHandle;
30
31use crate::events::ClusterEventBus;
32use crate::gossip_pdu::{decide as gossip_decide, pick_gossip_target, GossipDecision, GossipPdu};
33use crate::leader::elect_leader;
34use crate::member::Member;
35use crate::membership::MembershipState;
36use crate::sbr::DowningStrategy;
37use crate::sbr_runtime::{SbrAction, SbrRuntime};
38use crate::vector_clock::VectorClock;
39
40pub trait GossipTransport: Send + Sync + 'static {
42 fn send(&self, target: &Address, pdu: GossipPdu);
45}
46
47#[derive(Debug)]
49pub enum DaemonCmd {
50 Join(Member),
52 Leave(Address),
54 ApplyGossip(GossipPdu),
56 Tick,
58 Shutdown,
60}
61
/// Tuning parameters for the cluster daemon.
#[derive(Debug, Clone)]
pub struct DaemonConfig {
    /// Period of the daemon's internal timer; every elapsed period triggers
    /// one gossip round.
    pub gossip_interval: Duration,
}

impl Default for DaemonConfig {
    /// One gossip round per second.
    fn default() -> Self {
        DaemonConfig {
            gossip_interval: Duration::from_secs(1),
        }
    }
}
74
75#[derive(Debug, Clone, Default)]
77pub struct DaemonSnapshot {
78 pub state: MembershipState,
79 pub leader: Option<Address>,
80 pub version: VectorClock,
81}
82
83pub struct ClusterDaemonHandle {
85 cmd: mpsc::UnboundedSender<DaemonCmd>,
86 snapshot: Arc<Mutex<DaemonSnapshot>>,
87 join: Option<JoinHandle<()>>,
88 bus: ClusterEventBus,
89 self_addr: Address,
90}
91
92impl ClusterDaemonHandle {
93 pub fn join(&self, m: Member) {
94 let _ = self.cmd.send(DaemonCmd::Join(m));
95 }
96 pub fn leave(&self, addr: Address) {
97 let _ = self.cmd.send(DaemonCmd::Leave(addr));
98 }
99 pub fn apply_gossip(&self, pdu: GossipPdu) {
100 let _ = self.cmd.send(DaemonCmd::ApplyGossip(pdu));
101 }
102 pub fn tick(&self) {
103 let _ = self.cmd.send(DaemonCmd::Tick);
104 }
105 pub fn snapshot(&self) -> DaemonSnapshot {
106 self.snapshot.lock().clone()
107 }
108 pub fn events(&self) -> &ClusterEventBus {
109 &self.bus
110 }
111 pub fn address(&self) -> &Address {
112 &self.self_addr
113 }
114 pub fn gossip_inbox(&self) -> mpsc::UnboundedSender<GossipPdu> {
119 let cmd = self.cmd.clone();
120 let (tx, mut rx) = mpsc::unbounded_channel::<GossipPdu>();
121 tokio::spawn(async move {
122 while let Some(p) = rx.recv().await {
123 let _ = cmd.send(DaemonCmd::ApplyGossip(p));
124 }
125 });
126 tx
127 }
128
129 pub async fn shutdown(mut self) {
131 let _ = self.cmd.send(DaemonCmd::Shutdown);
132 if let Some(j) = self.join.take() {
133 let _ = j.await;
134 }
135 }
136}
137
138impl Drop for ClusterDaemonHandle {
139 fn drop(&mut self) {
140 let _ = self.cmd.send(DaemonCmd::Shutdown);
141 if let Some(j) = self.join.take() {
142 j.abort();
143 }
144 }
145}
146
147pub fn spawn_daemon(
150 self_addr: Address,
151 transport: Arc<dyn GossipTransport>,
152 bus: ClusterEventBus,
153 cfg: DaemonConfig,
154) -> ClusterDaemonHandle {
155 spawn_daemon_with_sbr::<NoSbr>(self_addr, transport, bus, cfg, None)
156}
157
158pub fn spawn_daemon_with_sbr<S>(
160 self_addr: Address,
161 transport: Arc<dyn GossipTransport>,
162 bus: ClusterEventBus,
163 cfg: DaemonConfig,
164 sbr: Option<SbrRuntime<S>>,
165) -> ClusterDaemonHandle
166where
167 S: DowningStrategy + Send + 'static,
168{
169 let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
170 let snapshot = Arc::new(Mutex::new(DaemonSnapshot::default()));
171 let snap2 = snapshot.clone();
172 let bus2 = bus.clone();
173 let self_addr2 = self_addr.clone();
174 let join = tokio::spawn(run_daemon::<S>(self_addr.clone(), transport, bus2, cfg, sbr, cmd_rx, snap2));
175 ClusterDaemonHandle { cmd: cmd_tx, snapshot, join: Some(join), bus, self_addr: self_addr2 }
176}
177
178pub struct NoSbr;
180
181impl DowningStrategy for NoSbr {
182 fn decide(&self, _r: &[&Member], _u: &[&Member]) -> crate::sbr::DowningDecision {
183 crate::sbr::DowningDecision::Stay
184 }
185}
186
187async fn run_daemon<S>(
188 self_addr: Address,
189 transport: Arc<dyn GossipTransport>,
190 bus: ClusterEventBus,
191 cfg: DaemonConfig,
192 mut sbr: Option<SbrRuntime<S>>,
193 mut cmd_rx: mpsc::UnboundedReceiver<DaemonCmd>,
194 snapshot: Arc<Mutex<DaemonSnapshot>>,
195) where
196 S: DowningStrategy + Send + 'static,
197{
198 let mut state = MembershipState::new();
199 let mut version = VectorClock::new();
200 let mut last_leader: Option<Address> = None;
201 let mut cursor: usize = 0;
202 let mut ticker = tokio::time::interval(cfg.gossip_interval);
203 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
204
205 loop {
206 tokio::select! {
207 biased;
208 cmd = cmd_rx.recv() => match cmd {
209 None => break,
210 Some(DaemonCmd::Shutdown) => break,
211 Some(DaemonCmd::Join(m)) => {
212 let evt = state.join(m);
213 version.tick(self_addr.to_string().as_str());
214 bus.publish(evt);
215 }
216 Some(DaemonCmd::Leave(addr)) => {
217 if let Some(evt) = state.leave(&addr) {
218 version.tick(self_addr.to_string().as_str());
219 bus.publish(evt);
220 }
221 }
222 Some(DaemonCmd::ApplyGossip(pdu)) => {
223 handle_pdu(&self_addr, &transport, &bus, &mut state, &mut version, pdu);
224 }
225 Some(DaemonCmd::Tick) => {
226 do_tick(&self_addr, &transport, &bus, &mut state, &mut version,
227 &mut sbr, &mut last_leader, &mut cursor);
228 }
229 },
230 _ = ticker.tick() => {
231 do_tick(&self_addr, &transport, &bus, &mut state, &mut version,
232 &mut sbr, &mut last_leader, &mut cursor);
233 }
234 }
235 let leader = elect_leader(&state);
237 *snapshot.lock() = DaemonSnapshot { state: state.clone(), leader, version: version.clone() };
238 }
239}
240
241#[allow(clippy::too_many_arguments)]
242fn do_tick<S>(
243 self_addr: &Address,
244 transport: &Arc<dyn GossipTransport>,
245 bus: &ClusterEventBus,
246 state: &mut MembershipState,
247 version: &mut VectorClock,
248 sbr: &mut Option<SbrRuntime<S>>,
249 last_leader: &mut Option<Address>,
250 cursor: &mut usize,
251) where
252 S: DowningStrategy + Send + 'static,
253{
254 let evts = state.apply_leader_actions();
256 let mutated = !evts.is_empty();
257 for e in evts {
258 bus.publish(e);
259 }
260 if mutated {
261 version.tick(self_addr.to_string().as_str());
262 }
263
264 let leader_now = elect_leader(state);
266 if leader_now != *last_leader {
267 bus.publish(crate::events::ClusterEvent::LeaderChanged {
268 from: last_leader.clone(),
269 to: leader_now.clone(),
270 });
271 *last_leader = leader_now;
272 }
273
274 if let Some(rt) = sbr.as_mut() {
276 match rt.tick(state, Instant::now()) {
277 SbrAction::None | SbrAction::DownSelf => {}
278 SbrAction::DownUnreachable(addrs) | SbrAction::DownAll(addrs) => {
279 for a in addrs {
280 if let Some(m) = state.members.iter_mut().find(|m| m.address.to_string() == a) {
281 m.status = crate::member::MemberStatus::Down;
282 }
283 }
284 version.tick(self_addr.to_string().as_str());
285 }
286 }
287 }
288
289 let peers: Vec<Address> = state.members.iter().map(|m| m.address.clone()).collect();
291 if let Some(target) = pick_gossip_target(&peers, self_addr, *cursor) {
292 let pdu = GossipPdu::Status { from: self_addr.to_string(), version: version.clone() };
293 transport.send(target, pdu);
294 *cursor = cursor.wrapping_add(1);
295 }
296}
297
298fn handle_pdu(
299 self_addr: &Address,
300 transport: &Arc<dyn GossipTransport>,
301 bus: &ClusterEventBus,
302 state: &mut MembershipState,
303 version: &mut VectorClock,
304 pdu: GossipPdu,
305) {
306 match pdu {
307 GossipPdu::Status { from, version: their } => {
308 let target = parse_address(&from);
309 match gossip_decide(version, &their) {
310 GossipDecision::SendEnvelope | GossipDecision::MergeBoth => {
311 if let Some(t) = &target {
312 transport.send(
313 t,
314 GossipPdu::Envelope {
315 from: self_addr.to_string(),
316 version: version.clone(),
317 state: state.clone(),
318 },
319 );
320 }
321 }
322 GossipDecision::RequestMerge => {
323 if let Some(t) = &target {
324 transport.send(
325 t,
326 GossipPdu::Merge { from: self_addr.to_string(), version: version.clone() },
327 );
328 }
329 }
330 GossipDecision::Same => {}
331 }
332 }
333 GossipPdu::Envelope { from: _, version: their, state: their_state } => {
334 merge_state(state, their_state);
336 *version = version.merge(&their);
337 let _ = bus; }
339 GossipPdu::Merge { from, version: _ } => {
340 if let Some(t) = parse_address(&from) {
341 transport.send(
342 &t,
343 GossipPdu::Envelope {
344 from: self_addr.to_string(),
345 version: version.clone(),
346 state: state.clone(),
347 },
348 );
349 }
350 }
351 }
352}
353
354fn parse_address(s: &str) -> Option<Address> {
355 Address::parse(s)
356}
357
358fn merge_state(local: &mut MembershipState, other: MembershipState) {
359 for m in other.members {
360 local.add_or_update(m);
361 }
362}
363
#[cfg(test)]
mod tests {
    use super::*;
    use crate::member::MemberStatus;
    use std::collections::HashMap;

    /// In-process "network": maps an address string to the inbox sender that
    /// `install_inbox` registered for that node.
    #[derive(Default, Clone)]
    struct InMemNet {
        inboxes: Arc<Mutex<HashMap<String, mpsc::UnboundedSender<GossipPdu>>>>,
    }

    impl GossipTransport for InMemNet {
        // PDUs to unknown or closed inboxes are silently dropped.
        fn send(&self, target: &Address, pdu: GossipPdu) {
            if let Some(tx) = self.inboxes.lock().get(&target.to_string()) {
                let _ = tx.send(pdu);
            }
        }
    }

    /// Registers an inbox for `addr` on `net` and spawns a task that relays
    /// each delivered PDU to `handle`'s daemon as `DaemonCmd::ApplyGossip`.
    // NOTE(review): this duplicates `ClusterDaemonHandle::gossip_inbox`;
    // consider registering `handle.gossip_inbox()` instead.
    fn install_inbox(net: &InMemNet, addr: &Address, handle: &ClusterDaemonHandle) {
        let (tx, mut rx) = mpsc::unbounded_channel();
        net.inboxes.lock().insert(addr.to_string(), tx);
        let cmd = handle.cmd.clone();
        tokio::spawn(async move {
            while let Some(p) = rx.recv().await {
                let _ = cmd.send(DaemonCmd::ApplyGossip(p));
            }
        });
    }

    /// Two daemons wired through `InMemNet`, ticked a few times.
    #[tokio::test]
    async fn two_daemons_exchange_membership_via_gossip() {
        let net = InMemNet::default();
        let bus_a = ClusterEventBus::new();
        let bus_b = ClusterEventBus::new();
        let addr_a = Address::local("nodeA");
        let addr_b = Address::local("nodeB");

        let cfg = DaemonConfig { gossip_interval: Duration::from_millis(50) };
        let a = spawn_daemon(addr_a.clone(), Arc::new(net.clone()), bus_a.clone(), cfg.clone());
        let b = spawn_daemon(addr_b.clone(), Arc::new(net.clone()), bus_b.clone(), cfg);
        install_inbox(&net, &addr_a, &a);
        install_inbox(&net, &addr_b, &b);

        // Each node joins itself.
        a.join(Member::new(addr_a.clone(), vec![]));
        b.join(Member::new(addr_b.clone(), vec![]));
        // Each node is also told about the other directly.
        // NOTE(review): because of these direct joins, both daemons know both
        // members without any gossip, so this test does not prove that gossip
        // propagated anything.
        a.join(Member::new(addr_b.clone(), vec![]));
        b.join(Member::new(addr_a.clone(), vec![]));

        // Drive several explicit gossip rounds, yielding between them so the
        // daemon and inbox tasks can run.
        for _ in 0..6 {
            a.tick();
            b.tick();
            tokio::time::sleep(Duration::from_millis(10)).await;
        }

        let snap_a = a.snapshot();
        let snap_b = b.snapshot();
        // NOTE(review): these assertions are satisfied by the local self-joins
        // alone; consider asserting each snapshot contains both addresses.
        assert!(snap_a.state.member_count() >= 1);
        assert!(snap_b.state.member_count() >= 1);
        assert!(snap_a
            .state
            .members
            .iter()
            .any(|m| m.address == addr_a && matches!(m.status, MemberStatus::Up | MemberStatus::Joining)));
        a.shutdown().await;
        b.shutdown().await;
    }

    /// A single-node daemon must publish at least one `LeaderChanged` event
    /// once a member has joined and a few rounds have run.
    #[tokio::test]
    async fn leader_change_event_published() {
        let net = InMemNet::default();
        let bus = ClusterEventBus::new();
        // Collects every `LeaderChanged` event seen on the bus.
        let captured = Arc::new(Mutex::new(Vec::new()));
        let c2 = captured.clone();
        let _h = bus.subscribe(move |e| {
            if let crate::events::ClusterEvent::LeaderChanged { .. } = e {
                c2.lock().push(e.clone())
            }
        });
        let addr = Address::local("only");
        let cfg = DaemonConfig { gossip_interval: Duration::from_millis(20) };
        // The daemon gets a clone of `bus`; the subscription above observes
        // its events, which this test relies on.
        let d = spawn_daemon(addr.clone(), Arc::new(net.clone()), bus.clone(), cfg);
        install_inbox(&net, &addr, &d);
        d.join(Member::new(addr.clone(), vec![]));
        for _ in 0..5 {
            d.tick();
            tokio::time::sleep(Duration::from_millis(10)).await;
        }
        assert!(!captured.lock().is_empty());
        d.shutdown().await;
    }
}