1use std::sync::Arc;
27use std::time::{Duration, Instant};
28
29use atomr_core::actor::Address;
30use parking_lot::Mutex;
31use tokio::sync::mpsc;
32use tokio::task::JoinHandle;
33
34use crate::events::ClusterEventBus;
35use crate::gossip_pdu::{decide as gossip_decide, pick_gossip_target, GossipDecision, GossipPdu};
36use crate::leader::elect_leader;
37use crate::member::Member;
38use crate::membership::MembershipState;
39use crate::sbr::DowningStrategy;
40use crate::sbr_runtime::{SbrAction, SbrRuntime};
41use crate::vector_clock::VectorClock;
42
43pub trait GossipTransport: Send + Sync + 'static {
45 fn send(&self, target: &Address, pdu: GossipPdu);
48}
49
50#[derive(Debug)]
52pub enum DaemonCmd {
53 Join(Member),
55 Leave(Address),
57 ApplyGossip(GossipPdu),
59 Tick,
61 Shutdown,
63}
64
/// Tunables for the cluster daemon.
#[derive(Debug, Clone)]
pub struct DaemonConfig {
    /// Period of the background ticker; each tick runs leader actions, split-brain
    /// resolution and one outbound gossip round.
    pub gossip_interval: Duration,
}

impl Default for DaemonConfig {
    /// One gossip round per second.
    fn default() -> Self {
        let gossip_interval = Duration::from_secs(1);
        DaemonConfig { gossip_interval }
    }
}
77
78#[derive(Debug, Clone, Default)]
80pub struct DaemonSnapshot {
81 pub state: MembershipState,
82 pub leader: Option<Address>,
83 pub version: VectorClock,
84}
85
86pub struct ClusterDaemonHandle {
88 cmd: mpsc::UnboundedSender<DaemonCmd>,
89 snapshot: Arc<Mutex<DaemonSnapshot>>,
90 join: Option<JoinHandle<()>>,
91 bus: ClusterEventBus,
92 self_addr: Address,
93}
94
95impl ClusterDaemonHandle {
96 pub fn join(&self, m: Member) {
97 let _ = self.cmd.send(DaemonCmd::Join(m));
98 }
99 pub fn leave(&self, addr: Address) {
100 let _ = self.cmd.send(DaemonCmd::Leave(addr));
101 }
102 pub fn apply_gossip(&self, pdu: GossipPdu) {
103 let _ = self.cmd.send(DaemonCmd::ApplyGossip(pdu));
104 }
105 pub fn tick(&self) {
106 let _ = self.cmd.send(DaemonCmd::Tick);
107 }
108 pub fn snapshot(&self) -> DaemonSnapshot {
109 self.snapshot.lock().clone()
110 }
111 pub fn events(&self) -> &ClusterEventBus {
112 &self.bus
113 }
114 pub fn address(&self) -> &Address {
115 &self.self_addr
116 }
117 pub fn gossip_inbox(&self) -> mpsc::UnboundedSender<GossipPdu> {
122 let cmd = self.cmd.clone();
123 let (tx, mut rx) = mpsc::unbounded_channel::<GossipPdu>();
124 tokio::spawn(async move {
125 while let Some(p) = rx.recv().await {
126 let _ = cmd.send(DaemonCmd::ApplyGossip(p));
127 }
128 });
129 tx
130 }
131
132 pub async fn shutdown(mut self) {
134 let _ = self.cmd.send(DaemonCmd::Shutdown);
135 if let Some(j) = self.join.take() {
136 let _ = j.await;
137 }
138 }
139}
140
141impl Drop for ClusterDaemonHandle {
142 fn drop(&mut self) {
143 let _ = self.cmd.send(DaemonCmd::Shutdown);
144 if let Some(j) = self.join.take() {
145 j.abort();
146 }
147 }
148}
149
150pub fn spawn_daemon(
153 self_addr: Address,
154 transport: Arc<dyn GossipTransport>,
155 bus: ClusterEventBus,
156 cfg: DaemonConfig,
157) -> ClusterDaemonHandle {
158 spawn_daemon_with_sbr::<NoSbr>(self_addr, transport, bus, cfg, None)
159}
160
161pub fn spawn_daemon_with_sbr<S>(
163 self_addr: Address,
164 transport: Arc<dyn GossipTransport>,
165 bus: ClusterEventBus,
166 cfg: DaemonConfig,
167 sbr: Option<SbrRuntime<S>>,
168) -> ClusterDaemonHandle
169where
170 S: DowningStrategy + Send + 'static,
171{
172 let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
173 let snapshot = Arc::new(Mutex::new(DaemonSnapshot::default()));
174 let snap2 = snapshot.clone();
175 let bus2 = bus.clone();
176 let self_addr2 = self_addr.clone();
177 let join = tokio::spawn(run_daemon::<S>(self_addr.clone(), transport, bus2, cfg, sbr, cmd_rx, snap2));
178 ClusterDaemonHandle { cmd: cmd_tx, snapshot, join: Some(join), bus, self_addr: self_addr2 }
179}
180
181pub struct NoSbr;
183
184impl DowningStrategy for NoSbr {
185 fn decide(&self, _r: &[&Member], _u: &[&Member]) -> crate::sbr::DowningDecision {
186 crate::sbr::DowningDecision::Stay
187 }
188}
189
190async fn run_daemon<S>(
191 self_addr: Address,
192 transport: Arc<dyn GossipTransport>,
193 bus: ClusterEventBus,
194 cfg: DaemonConfig,
195 mut sbr: Option<SbrRuntime<S>>,
196 mut cmd_rx: mpsc::UnboundedReceiver<DaemonCmd>,
197 snapshot: Arc<Mutex<DaemonSnapshot>>,
198) where
199 S: DowningStrategy + Send + 'static,
200{
201 let mut state = MembershipState::new();
202 let mut version = VectorClock::new();
203 let mut last_leader: Option<Address> = None;
204 let mut cursor: usize = 0;
205 let mut ticker = tokio::time::interval(cfg.gossip_interval);
206 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
207
208 loop {
209 tokio::select! {
210 biased;
211 cmd = cmd_rx.recv() => match cmd {
212 None => break,
213 Some(DaemonCmd::Shutdown) => break,
214 Some(DaemonCmd::Join(m)) => {
215 let evt = state.join(m);
216 version.tick(self_addr.to_string().as_str());
217 bus.publish(evt);
218 }
219 Some(DaemonCmd::Leave(addr)) => {
220 if let Some(evt) = state.leave(&addr) {
221 version.tick(self_addr.to_string().as_str());
222 bus.publish(evt);
223 }
224 }
225 Some(DaemonCmd::ApplyGossip(pdu)) => {
226 handle_pdu(&self_addr, &transport, &bus, &mut state, &mut version, pdu);
227 }
228 Some(DaemonCmd::Tick) => {
229 do_tick(&self_addr, &transport, &bus, &mut state, &mut version,
230 &mut sbr, &mut last_leader, &mut cursor);
231 }
232 },
233 _ = ticker.tick() => {
234 do_tick(&self_addr, &transport, &bus, &mut state, &mut version,
235 &mut sbr, &mut last_leader, &mut cursor);
236 }
237 }
238 let leader = elect_leader(&state);
240 *snapshot.lock() = DaemonSnapshot { state: state.clone(), leader, version: version.clone() };
241 }
242}
243
244#[allow(clippy::too_many_arguments)]
245fn do_tick<S>(
246 self_addr: &Address,
247 transport: &Arc<dyn GossipTransport>,
248 bus: &ClusterEventBus,
249 state: &mut MembershipState,
250 version: &mut VectorClock,
251 sbr: &mut Option<SbrRuntime<S>>,
252 last_leader: &mut Option<Address>,
253 cursor: &mut usize,
254) where
255 S: DowningStrategy + Send + 'static,
256{
257 let evts = state.apply_leader_actions();
259 let mutated = !evts.is_empty();
260 for e in evts {
261 bus.publish(e);
262 }
263 if mutated {
264 version.tick(self_addr.to_string().as_str());
265 }
266
267 let leader_now = elect_leader(state);
269 if leader_now != *last_leader {
270 bus.publish(crate::events::ClusterEvent::LeaderChanged {
271 from: last_leader.clone(),
272 to: leader_now.clone(),
273 });
274 *last_leader = leader_now;
275 }
276
277 if let Some(rt) = sbr.as_mut() {
279 match rt.tick(state, Instant::now()) {
280 SbrAction::None | SbrAction::DownSelf => {}
281 SbrAction::DownUnreachable(addrs) | SbrAction::DownAll(addrs) => {
282 for a in addrs {
283 if let Some(m) = state.members.iter_mut().find(|m| m.address.to_string() == a) {
284 m.status = crate::member::MemberStatus::Down;
285 }
286 }
287 version.tick(self_addr.to_string().as_str());
288 }
289 }
290 }
291
292 let peers: Vec<Address> = state.members.iter().map(|m| m.address.clone()).collect();
294 if let Some(target) = pick_gossip_target(&peers, self_addr, *cursor) {
295 let pdu = GossipPdu::Status { from: self_addr.to_string(), version: version.clone() };
296 transport.send(target, pdu);
297 *cursor = cursor.wrapping_add(1);
298 }
299}
300
301fn handle_pdu(
302 self_addr: &Address,
303 transport: &Arc<dyn GossipTransport>,
304 bus: &ClusterEventBus,
305 state: &mut MembershipState,
306 version: &mut VectorClock,
307 pdu: GossipPdu,
308) {
309 match pdu {
310 GossipPdu::Status { from, version: their } => {
311 let target = parse_address(&from);
312 match gossip_decide(version, &their) {
313 GossipDecision::SendEnvelope | GossipDecision::MergeBoth => {
314 if let Some(t) = &target {
315 transport.send(
316 t,
317 GossipPdu::Envelope {
318 from: self_addr.to_string(),
319 version: version.clone(),
320 state: state.clone(),
321 },
322 );
323 }
324 }
325 GossipDecision::RequestMerge => {
326 if let Some(t) = &target {
327 transport.send(
328 t,
329 GossipPdu::Merge { from: self_addr.to_string(), version: version.clone() },
330 );
331 }
332 }
333 GossipDecision::Same => {}
334 }
335 }
336 GossipPdu::Envelope { from: _, version: their, state: their_state } => {
337 merge_state(state, their_state);
339 *version = version.merge(&their);
340 let _ = bus; }
342 GossipPdu::Merge { from, version: _ } => {
343 if let Some(t) = parse_address(&from) {
344 transport.send(
345 &t,
346 GossipPdu::Envelope {
347 from: self_addr.to_string(),
348 version: version.clone(),
349 state: state.clone(),
350 },
351 );
352 }
353 }
354 }
355}
356
357fn parse_address(s: &str) -> Option<Address> {
358 Address::parse(s)
359}
360
361fn merge_state(local: &mut MembershipState, other: MembershipState) {
362 for m in other.members {
363 local.add_or_update(m);
364 }
365}
366
#[cfg(test)]
mod tests {
    use super::*;
    use crate::member::MemberStatus;
    use std::collections::HashMap;

    /// In-process "network": maps an address string to the inbox sender of that node.
    /// Sends to an address with no registered inbox are silently dropped.
    #[derive(Default, Clone)]
    struct InMemNet {
        inboxes: Arc<Mutex<HashMap<String, mpsc::UnboundedSender<GossipPdu>>>>,
    }

    impl GossipTransport for InMemNet {
        fn send(&self, target: &Address, pdu: GossipPdu) {
            if let Some(tx) = self.inboxes.lock().get(&target.to_string()) {
                let _ = tx.send(pdu);
            }
        }
    }

    /// Register an inbox for `addr` on `net` and spawn a task that forwards every PDU
    /// delivered to it into `handle`'s command channel as `DaemonCmd::ApplyGossip`.
    fn install_inbox(net: &InMemNet, addr: &Address, handle: &ClusterDaemonHandle) {
        let (tx, mut rx) = mpsc::unbounded_channel();
        net.inboxes.lock().insert(addr.to_string(), tx);
        let cmd = handle.cmd.clone();
        tokio::spawn(async move {
            while let Some(p) = rx.recv().await {
                let _ = cmd.send(DaemonCmd::ApplyGossip(p));
            }
        });
    }

    /// Two daemons on a shared in-memory network, each told about both nodes, are ticked a
    /// few times; afterwards node A must still see itself as Up or Joining.
    /// NOTE(review): relies on short sleeps for message delivery, so it is timing-dependent.
    #[tokio::test]
    async fn two_daemons_exchange_membership_via_gossip() {
        let net = InMemNet::default();
        let bus_a = ClusterEventBus::new();
        let bus_b = ClusterEventBus::new();
        let addr_a = Address::local("nodeA");
        let addr_b = Address::local("nodeB");

        let cfg = DaemonConfig { gossip_interval: Duration::from_millis(50) };
        let a = spawn_daemon(addr_a.clone(), Arc::new(net.clone()), bus_a.clone(), cfg.clone());
        let b = spawn_daemon(addr_b.clone(), Arc::new(net.clone()), bus_b.clone(), cfg);
        // Inboxes are registered after spawning; anything gossiped before this point is dropped.
        install_inbox(&net, &addr_a, &a);
        install_inbox(&net, &addr_b, &b);

        // Each node joins itself and also learns about the other directly.
        a.join(Member::new(addr_a.clone(), vec![]));
        b.join(Member::new(addr_b.clone(), vec![]));
        a.join(Member::new(addr_b.clone(), vec![]));
        b.join(Member::new(addr_a.clone(), vec![]));

        // Drive explicit rounds and give the forwarding tasks time to deliver.
        for _ in 0..6 {
            a.tick();
            b.tick();
            tokio::time::sleep(Duration::from_millis(10)).await;
        }

        let snap_a = a.snapshot();
        let snap_b = b.snapshot();
        assert!(snap_a.state.member_count() >= 1);
        assert!(snap_b.state.member_count() >= 1);
        assert!(snap_a
            .state
            .members
            .iter()
            .any(|m| m.address == addr_a && matches!(m.status, MemberStatus::Up | MemberStatus::Joining)));
        a.shutdown().await;
        b.shutdown().await;
    }

    /// A single daemon that joins itself and is ticked must publish at least one
    /// `LeaderChanged` event (starting from no known leader).
    #[tokio::test]
    async fn leader_change_event_published() {
        let net = InMemNet::default();
        let bus = ClusterEventBus::new();
        let captured = Arc::new(Mutex::new(Vec::new()));
        let c2 = captured.clone();
        // Keep the subscription handle alive for the whole test.
        let _h = bus.subscribe(move |e| {
            if let crate::events::ClusterEvent::LeaderChanged { .. } = e {
                c2.lock().push(e.clone())
            }
        });
        let addr = Address::local("only");
        let cfg = DaemonConfig { gossip_interval: Duration::from_millis(20) };
        let d = spawn_daemon(addr.clone(), Arc::new(net.clone()), bus.clone(), cfg);
        install_inbox(&net, &addr, &d);
        d.join(Member::new(addr.clone(), vec![]));
        for _ in 0..5 {
            d.tick();
            tokio::time::sleep(Duration::from_millis(10)).await;
        }
        assert!(!captured.lock().is_empty());
        d.shutdown().await;
    }
}
464}