// network_protocol/transport/cluster.rs
use std::collections::HashMap;
use std::time::{Duration, Instant};

use tokio::select;
use tokio::sync::mpsc;
use tokio::time::{interval, sleep, timeout, MissedTickBehavior};
use tracing::{debug, info, instrument, warn};

use crate::protocol::message::Message;
use crate::service::client::Client;
/// One peer of the cluster as tracked by [`Cluster`].
#[derive(Clone, Debug)]
pub struct ClusterNode {
    /// Node identifier; also the key under which `Cluster` stores this node.
    pub id: String,
    /// Address handed to `Client::connect` when this node is probed.
    pub addr: String,
    /// Presumably the instant of the last successful contact; `Cluster::new`
    /// starts it at `None`.
    pub last_seen: Option<Instant>,
}
19
20pub struct Cluster {
21 peers: HashMap<String, ClusterNode>,
22 shutdown_tx: Option<mpsc::Sender<()>>,
23}
24
25impl Cluster {
26 pub fn new(peers: Vec<(String, String)>) -> Self {
27 let peers = peers
28 .into_iter()
29 .map(|(id, addr)| {
30 (
31 id.clone(),
32 ClusterNode {
33 id,
34 addr,
35 last_seen: None,
36 },
37 )
38 })
39 .collect();
40
41 Self {
42 peers,
43 shutdown_tx: None,
44 }
45 }
46
47 #[instrument(skip(self), fields(interval_ms = %heartbeat_interval.as_millis()))]
48 pub async fn start_heartbeat(&mut self, heartbeat_interval: Duration) -> mpsc::Sender<()> {
49 let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<()>(1);
51
52 let peers = self.peers.clone();
54
55 self.shutdown_tx = Some(shutdown_tx.clone());
57
58 tokio::spawn(async move {
60 let mut interval_timer = interval(heartbeat_interval);
61
62 loop {
63 select! {
64 _ = shutdown_rx.recv() => {
66 info!("Received shutdown signal, stopping heartbeat");
67 break;
68 }
69
70 _ = interval_timer.tick() => {
72 for (id, node) in peers.iter() {
73 match Client::connect(&node.addr).await {
74 Ok(mut client) => {
75 match client.send_and_wait(Message::Ping).await {
76 Ok(Message::Pong) => {
77 debug!(node_id = %id, "Peer alive");
78 }
79 _ => {
80 warn!(node_id = %id, "Peer timeout");
81 }
82 }
83 }
84 Err(e) => {
85 warn!(node_id = %id, error = ?e, "Peer unreachable");
86 }
87 }
88 }
89 }
90 }
91 }
92
93 info!("Heartbeat task shut down gracefully");
94 });
95
96 shutdown_tx
98 }
99
100 pub fn get_peers(&self) -> Vec<&ClusterNode> {
101 self.peers.values().collect()
102 }
103
104 #[instrument(skip(self))]
106 pub async fn shutdown(&mut self) {
107 if let Some(tx) = self.shutdown_tx.take() {
108 if tx.send(()).await.is_err() {
109 info!("Heartbeat task already stopped");
110 } else {
111 info!("Shutdown signal sent to heartbeat task");
112 sleep(Duration::from_millis(100)).await;
114 }
115 } else {
116 info!("No active heartbeat to shut down");
117 }
118 }
119}