// network_protocol/transport/cluster.rs

use std::collections::HashMap;
// No need for Arc in this module
use std::time::{Duration, Instant};
use tokio::select;
use tokio::sync::mpsc;
use tokio::time::{interval, sleep, MissedTickBehavior};
use tracing::{debug, info, instrument, warn};

use crate::protocol::message::Message;
use crate::service::client::Client;
//use crate::error::Result;
/// A single peer known to the cluster.
#[derive(Debug, Clone)]
pub struct ClusterNode {
    /// Identifier of the peer.
    pub id: String,
    /// Address handed to the client when connecting to this peer.
    pub addr: String,
    /// Instant at which the peer was last observed, or `None` if it has not been observed.
    ///
    /// NOTE(review): no code visible in this module ever sets this to `Some`.
    pub last_seen: Option<Instant>,
}
20pub struct Cluster {
21    peers: HashMap<String, ClusterNode>,
22    shutdown_tx: Option<mpsc::Sender<()>>,
23}
25impl Cluster {
26    pub fn new(peers: Vec<(String, String)>) -> Self {
27        let peers = peers
28            .into_iter()
29            .map(|(id, addr)| {
30                (
31                    id.clone(),
32                    ClusterNode {
33                        id,
34                        addr,
35                        last_seen: None,
36                    },
37                )
38            })
39            .collect();
40
41        Self {
42            peers,
43            shutdown_tx: None,
44        }
45    }
46
47    #[instrument(skip(self), fields(interval_ms = %heartbeat_interval.as_millis()))]
48    pub async fn start_heartbeat(&mut self, heartbeat_interval: Duration) -> mpsc::Sender<()> {
49        // Create shutdown channel
50        let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<()>(1);
51
52        // Clone necessary data for the heartbeat task
53        let peers = self.peers.clone();
54
55        // Store the sender for shutdown
56        self.shutdown_tx = Some(shutdown_tx.clone());
57
58        // Spawn the heartbeat task
59        tokio::spawn(async move {
60            let mut interval_timer = interval(heartbeat_interval);
61
62            loop {
63                select! {
64                    // Check for shutdown signal
65                    _ = shutdown_rx.recv() => {
66                        info!("Received shutdown signal, stopping heartbeat");
67                        break;
68                    }
69
70                    // Run heartbeat on interval
71                    _ = interval_timer.tick() => {
72                        for (id, node) in peers.iter() {
73                            match Client::connect(&node.addr).await {
74                                Ok(mut client) => {
75                                    match client.send_and_wait(Message::Ping).await {
76                                        Ok(Message::Pong) => {
77                                            debug!(node_id = %id, "Peer alive");
78                                        }
79                                        _ => {
80                                            warn!(node_id = %id, "Peer timeout");
81                                        }
82                                    }
83                                }
84                                Err(e) => {
85                                    warn!(node_id = %id, error = ?e, "Peer unreachable");
86                                }
87                            }
88                        }
89                    }
90                }
91            }
92
93            info!("Heartbeat task shut down gracefully");
94        });
95
96        // Return the shutdown sender so the caller can trigger shutdown
97        shutdown_tx
98    }
99
100    pub fn get_peers(&self) -> Vec<&ClusterNode> {
101        self.peers.values().collect()
102    }
103
104    /// Gracefully shut down the cluster's heartbeat task
105    #[instrument(skip(self))]
106    pub async fn shutdown(&mut self) {
107        if let Some(tx) = self.shutdown_tx.take() {
108            if tx.send(()).await.is_err() {
109                info!("Heartbeat task already stopped");
110            } else {
111                info!("Shutdown signal sent to heartbeat task");
112                // Give heartbeat task time to finish
113                sleep(Duration::from_millis(100)).await;
114            }
115        } else {
116            info!("No active heartbeat to shut down");
117        }
118    }
119}