1use std::sync::Arc;
24use std::time::{Duration, Instant};
25
26use atomr_core::actor::Address;
27use parking_lot::Mutex;
28use tokio::sync::mpsc;
29use tokio::task::JoinHandle;
30
31use crate::events::ClusterEventBus;
32use crate::gossip_pdu::{decide as gossip_decide, pick_gossip_target, GossipDecision, GossipPdu};
33use crate::leader::elect_leader;
34use crate::member::Member;
35use crate::membership::MembershipState;
36use crate::sbr::DowningStrategy;
37use crate::sbr_runtime::{SbrAction, SbrRuntime};
38use crate::vector_clock::VectorClock;
39
/// Outbound side of the gossip protocol: delivers a [`GossipPdu`] to a peer.
///
/// The daemon holds the transport behind an `Arc<dyn GossipTransport>` and
/// moves it into its spawned task, hence the `Send + Sync + 'static` bounds.
pub trait GossipTransport: Send + Sync + 'static {
    /// Sends `pdu` to the node identified by `target`.
    ///
    /// The method has no return value, so delivery failures cannot be
    /// reported back to the daemon. It is invoked synchronously from the
    /// daemon's run loop (gossip ticks and PDU replies), so an implementation
    /// that blocks will delay command processing.
    fn send(&self, target: &Address, pdu: GossipPdu);
}
46
/// Commands processed by the daemon's run loop, one at a time, in the order
/// they were queued on the command channel.
#[derive(Debug)]
pub enum DaemonCmd {
    /// Add the member to the local membership state, bump the local version
    /// and publish the resulting event.
    Join(Member),
    /// Mark the member at this address as leaving; the version is bumped and
    /// an event published only if the state reports a change.
    Leave(Address),
    /// Mark the member at this address as down; the version is bumped and an
    /// event published only if the state reports a change.
    Down(Address),
    /// Feed an inbound gossip PDU (status, envelope or merge request) into
    /// the daemon.
    ApplyGossip(GossipPdu),
    /// Run one gossip round immediately instead of waiting for the interval
    /// timer.
    Tick,
    /// Stop the run loop.
    Shutdown,
}
66
/// Tunables for the cluster daemon.
#[derive(Debug, Clone)]
pub struct DaemonConfig {
    /// Period of the daemon's internal gossip timer.
    pub gossip_interval: Duration,
}

impl Default for DaemonConfig {
    /// Gossips once per second unless overridden.
    fn default() -> Self {
        let gossip_interval = Duration::from_secs(1);
        Self { gossip_interval }
    }
}
79
/// Point-in-time copy of the daemon's view of the cluster.
///
/// The daemon overwrites the shared snapshot after every processed command or
/// timer tick; until the first such iteration readers observe the `Default`
/// value.
#[derive(Debug, Clone, Default)]
pub struct DaemonSnapshot {
    /// Copy of the membership state at the time the snapshot was taken.
    pub state: MembershipState,
    /// Result of running leader election over `state`, if any.
    pub leader: Option<Address>,
    /// Vector clock associated with `state`.
    pub version: VectorClock,
}
87
/// Owner-side handle to a running cluster daemon task.
///
/// Commands are queued over an unbounded channel; state is observed through a
/// shared snapshot. Dropping the handle stops the daemon task.
pub struct ClusterDaemonHandle {
    /// Sending half of the daemon's command channel.
    cmd: mpsc::UnboundedSender<DaemonCmd>,
    /// Latest state published by the daemon task.
    snapshot: Arc<Mutex<DaemonSnapshot>>,
    /// Join handle of the daemon task; `None` once it has been awaited or
    /// aborted.
    join: Option<JoinHandle<()>>,
    /// Event bus the daemon publishes cluster events to.
    bus: ClusterEventBus,
    /// Address this daemon was spawned with.
    self_addr: Address,
}
96
97impl ClusterDaemonHandle {
98 pub fn join(&self, m: Member) {
99 let _ = self.cmd.send(DaemonCmd::Join(m));
100 }
101 pub fn leave(&self, addr: Address) {
102 let _ = self.cmd.send(DaemonCmd::Leave(addr));
103 }
104 pub fn down(&self, addr: Address) {
109 let _ = self.cmd.send(DaemonCmd::Down(addr));
110 }
111 pub fn apply_gossip(&self, pdu: GossipPdu) {
112 let _ = self.cmd.send(DaemonCmd::ApplyGossip(pdu));
113 }
114 pub fn tick(&self) {
115 let _ = self.cmd.send(DaemonCmd::Tick);
116 }
117 pub fn snapshot(&self) -> DaemonSnapshot {
118 self.snapshot.lock().clone()
119 }
120 pub fn events(&self) -> &ClusterEventBus {
121 &self.bus
122 }
123 pub fn address(&self) -> &Address {
124 &self.self_addr
125 }
126 pub fn gossip_inbox(&self) -> mpsc::UnboundedSender<GossipPdu> {
131 let cmd = self.cmd.clone();
132 let (tx, mut rx) = mpsc::unbounded_channel::<GossipPdu>();
133 tokio::spawn(async move {
134 while let Some(p) = rx.recv().await {
135 let _ = cmd.send(DaemonCmd::ApplyGossip(p));
136 }
137 });
138 tx
139 }
140
141 pub async fn shutdown(mut self) {
143 let _ = self.cmd.send(DaemonCmd::Shutdown);
144 if let Some(j) = self.join.take() {
145 let _ = j.await;
146 }
147 }
148}
149
150impl Drop for ClusterDaemonHandle {
151 fn drop(&mut self) {
152 let _ = self.cmd.send(DaemonCmd::Shutdown);
153 if let Some(j) = self.join.take() {
154 j.abort();
155 }
156 }
157}
158
159pub fn spawn_daemon(
162 self_addr: Address,
163 transport: Arc<dyn GossipTransport>,
164 bus: ClusterEventBus,
165 cfg: DaemonConfig,
166) -> ClusterDaemonHandle {
167 spawn_daemon_with_sbr::<NoSbr>(self_addr, transport, bus, cfg, None)
168}
169
170pub fn spawn_daemon_with_sbr<S>(
172 self_addr: Address,
173 transport: Arc<dyn GossipTransport>,
174 bus: ClusterEventBus,
175 cfg: DaemonConfig,
176 sbr: Option<SbrRuntime<S>>,
177) -> ClusterDaemonHandle
178where
179 S: DowningStrategy + Send + 'static,
180{
181 let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
182 let snapshot = Arc::new(Mutex::new(DaemonSnapshot::default()));
183 let snap2 = snapshot.clone();
184 let bus2 = bus.clone();
185 let self_addr2 = self_addr.clone();
186 let join = tokio::spawn(run_daemon::<S>(self_addr.clone(), transport, bus2, cfg, sbr, cmd_rx, snap2));
187 ClusterDaemonHandle { cmd: cmd_tx, snapshot, join: Some(join), bus, self_addr: self_addr2 }
188}
189
/// Placeholder downing strategy used by [`spawn_daemon`], which passes no
/// split-brain resolver but still has to name a concrete strategy type.
pub struct NoSbr;

impl DowningStrategy for NoSbr {
    /// Ignores both member lists and always answers `Stay`.
    fn decide(&self, _r: &[&Member], _u: &[&Member]) -> crate::sbr::DowningDecision {
        crate::sbr::DowningDecision::Stay
    }
}
198
199async fn run_daemon<S>(
200 self_addr: Address,
201 transport: Arc<dyn GossipTransport>,
202 bus: ClusterEventBus,
203 cfg: DaemonConfig,
204 mut sbr: Option<SbrRuntime<S>>,
205 mut cmd_rx: mpsc::UnboundedReceiver<DaemonCmd>,
206 snapshot: Arc<Mutex<DaemonSnapshot>>,
207) where
208 S: DowningStrategy + Send + 'static,
209{
210 let mut state = MembershipState::new();
211 let mut version = VectorClock::new();
212 let mut last_leader: Option<Address> = None;
213 let mut cursor: usize = 0;
214 let mut ticker = tokio::time::interval(cfg.gossip_interval);
215 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
216
217 loop {
218 tokio::select! {
219 biased;
220 cmd = cmd_rx.recv() => match cmd {
221 None => break,
222 Some(DaemonCmd::Shutdown) => break,
223 Some(DaemonCmd::Join(m)) => {
224 let evt = state.join(m);
225 version.tick(self_addr.to_string().as_str());
226 bus.publish(evt);
227 }
228 Some(DaemonCmd::Leave(addr)) => {
229 if let Some(evt) = state.leave(&addr) {
230 version.tick(self_addr.to_string().as_str());
231 bus.publish(evt);
232 }
233 }
234 Some(DaemonCmd::Down(addr)) => {
235 if let Some(evt) = state.down(&addr) {
236 version.tick(self_addr.to_string().as_str());
237 bus.publish(evt);
238 }
239 }
240 Some(DaemonCmd::ApplyGossip(pdu)) => {
241 handle_pdu(&self_addr, &transport, &bus, &mut state, &mut version, pdu);
242 }
243 Some(DaemonCmd::Tick) => {
244 do_tick(&self_addr, &transport, &bus, &mut state, &mut version,
245 &mut sbr, &mut last_leader, &mut cursor);
246 }
247 },
248 _ = ticker.tick() => {
249 do_tick(&self_addr, &transport, &bus, &mut state, &mut version,
250 &mut sbr, &mut last_leader, &mut cursor);
251 }
252 }
253 let leader = elect_leader(&state);
255 *snapshot.lock() = DaemonSnapshot { state: state.clone(), leader, version: version.clone() };
256 }
257}
258
259#[allow(clippy::too_many_arguments)]
260fn do_tick<S>(
261 self_addr: &Address,
262 transport: &Arc<dyn GossipTransport>,
263 bus: &ClusterEventBus,
264 state: &mut MembershipState,
265 version: &mut VectorClock,
266 sbr: &mut Option<SbrRuntime<S>>,
267 last_leader: &mut Option<Address>,
268 cursor: &mut usize,
269) where
270 S: DowningStrategy + Send + 'static,
271{
272 let evts = state.apply_leader_actions();
274 let mutated = !evts.is_empty();
275 for e in evts {
276 bus.publish(e);
277 }
278 if mutated {
279 version.tick(self_addr.to_string().as_str());
280 }
281
282 let leader_now = elect_leader(state);
284 if leader_now != *last_leader {
285 bus.publish(crate::events::ClusterEvent::LeaderChanged {
286 from: last_leader.clone(),
287 to: leader_now.clone(),
288 });
289 *last_leader = leader_now;
290 }
291
292 if let Some(rt) = sbr.as_mut() {
294 match rt.tick(state, Instant::now()) {
295 SbrAction::None | SbrAction::DownSelf => {}
296 SbrAction::DownUnreachable(addrs) | SbrAction::DownAll(addrs) => {
297 for a in addrs {
298 if let Some(m) = state.members.iter_mut().find(|m| m.address.to_string() == a) {
299 m.status = crate::member::MemberStatus::Down;
300 }
301 }
302 version.tick(self_addr.to_string().as_str());
303 }
304 }
305 }
306
307 let peers: Vec<Address> = state.members.iter().map(|m| m.address.clone()).collect();
309 if let Some(target) = pick_gossip_target(&peers, self_addr, *cursor) {
310 let pdu = GossipPdu::Status { from: self_addr.to_string(), version: version.clone() };
311 transport.send(target, pdu);
312 *cursor = cursor.wrapping_add(1);
313 }
314}
315
316fn handle_pdu(
317 self_addr: &Address,
318 transport: &Arc<dyn GossipTransport>,
319 bus: &ClusterEventBus,
320 state: &mut MembershipState,
321 version: &mut VectorClock,
322 pdu: GossipPdu,
323) {
324 match pdu {
325 GossipPdu::Status { from, version: their } => {
326 let target = parse_address(&from);
327 match gossip_decide(version, &their) {
328 GossipDecision::SendEnvelope | GossipDecision::MergeBoth => {
329 if let Some(t) = &target {
330 transport.send(
331 t,
332 GossipPdu::Envelope {
333 from: self_addr.to_string(),
334 version: version.clone(),
335 state: state.clone(),
336 },
337 );
338 }
339 }
340 GossipDecision::RequestMerge => {
341 if let Some(t) = &target {
342 transport.send(
343 t,
344 GossipPdu::Merge { from: self_addr.to_string(), version: version.clone() },
345 );
346 }
347 }
348 GossipDecision::Same => {}
349 }
350 }
351 GossipPdu::Envelope { from: _, version: their, state: their_state } => {
352 merge_state(state, their_state);
354 *version = version.merge(&their);
355 let _ = bus; }
357 GossipPdu::Merge { from, version: _ } => {
358 if let Some(t) = parse_address(&from) {
359 transport.send(
360 &t,
361 GossipPdu::Envelope {
362 from: self_addr.to_string(),
363 version: version.clone(),
364 state: state.clone(),
365 },
366 );
367 }
368 }
369 }
370}
371
/// Converts the textual `from` field of a PDU back into an [`Address`].
///
/// Thin wrapper over `Address::parse`; callers skip their reply when this
/// returns `None`.
fn parse_address(s: &str) -> Option<Address> {
    Address::parse(s)
}
375
376fn merge_state(local: &mut MembershipState, other: MembershipState) {
377 for m in other.members {
378 local.add_or_update(m);
379 }
380}
381
#[cfg(test)]
mod tests {
    use super::*;
    use crate::member::MemberStatus;
    use std::collections::HashMap;

    /// In-process transport: routes each PDU to the inbox registered under
    /// the target's string form and drops PDUs for unknown targets.
    #[derive(Default, Clone)]
    struct InMemNet {
        inboxes: Arc<Mutex<HashMap<String, mpsc::UnboundedSender<GossipPdu>>>>,
    }

    impl GossipTransport for InMemNet {
        fn send(&self, target: &Address, pdu: GossipPdu) {
            let key = target.to_string();
            if let Some(inbox) = self.inboxes.lock().get(&key) {
                let _ = inbox.send(pdu);
            }
        }
    }

    /// Registers `handle`'s gossip inbox under `addr` so PDUs sent through
    /// `net` reach that daemon as `ApplyGossip` commands.
    fn install_inbox(net: &InMemNet, addr: &Address, handle: &ClusterDaemonHandle) {
        let inbox = handle.gossip_inbox();
        net.inboxes.lock().insert(addr.to_string(), inbox);
    }

    #[tokio::test]
    async fn two_daemons_exchange_membership_via_gossip() {
        let net = InMemNet::default();
        let (bus_a, bus_b) = (ClusterEventBus::new(), ClusterEventBus::new());
        let addr_a = Address::local("nodeA");
        let addr_b = Address::local("nodeB");

        let cfg = DaemonConfig { gossip_interval: Duration::from_millis(50) };
        let a = spawn_daemon(addr_a.clone(), Arc::new(net.clone()), bus_a.clone(), cfg.clone());
        let b = spawn_daemon(addr_b.clone(), Arc::new(net.clone()), bus_b.clone(), cfg);
        install_inbox(&net, &addr_a, &a);
        install_inbox(&net, &addr_b, &b);

        // Each node joins itself, then is told about the other.
        a.join(Member::new(addr_a.clone(), vec![]));
        b.join(Member::new(addr_b.clone(), vec![]));
        a.join(Member::new(addr_b.clone(), vec![]));
        b.join(Member::new(addr_a.clone(), vec![]));

        // Drive a few explicit rounds and give the tasks time to run.
        for _round in 0..6 {
            a.tick();
            b.tick();
            tokio::time::sleep(Duration::from_millis(10)).await;
        }

        let snap_a = a.snapshot();
        let snap_b = b.snapshot();
        assert!(snap_a.state.member_count() >= 1);
        assert!(snap_b.state.member_count() >= 1);

        let a_sees_itself = snap_a.state.members.iter().any(|m| {
            m.address == addr_a && matches!(m.status, MemberStatus::Up | MemberStatus::Joining)
        });
        assert!(a_sees_itself);

        a.shutdown().await;
        b.shutdown().await;
    }

    #[tokio::test]
    async fn leader_change_event_published() {
        let net = InMemNet::default();
        let bus = ClusterEventBus::new();

        // Collect every LeaderChanged event seen on the bus.
        let captured = Arc::new(Mutex::new(Vec::new()));
        let sink = captured.clone();
        let _subscription = bus.subscribe(move |e| {
            if matches!(e, crate::events::ClusterEvent::LeaderChanged { .. }) {
                sink.lock().push(e.clone());
            }
        });

        let addr = Address::local("only");
        let cfg = DaemonConfig { gossip_interval: Duration::from_millis(20) };
        let daemon = spawn_daemon(addr.clone(), Arc::new(net.clone()), bus.clone(), cfg);
        install_inbox(&net, &addr, &daemon);

        daemon.join(Member::new(addr.clone(), vec![]));
        for _round in 0..5 {
            daemon.tick();
            tokio::time::sleep(Duration::from_millis(10)).await;
        }

        assert!(!captured.lock().is_empty());
        daemon.shutdown().await;
    }
}