Skip to main content

orlando_cluster/
multi_cluster.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::Duration;
4
5use arc_swap::ArcSwap;
6use tokio::sync::watch;
7
8use orlando_core::ClusterId;
9
10use crate::connection_pool::ConnectionPool;
11
12/// Configuration for a multi-cluster deployment.
13#[derive(Debug, Clone)]
14pub struct MultiClusterConfig {
15    /// This cluster's identity.
16    pub cluster_id: ClusterId,
17    /// Gateway endpoints of peer clusters (cluster_id -> "host:port").
18    pub peers: HashMap<ClusterId, String>,
19    /// How often to health-check peer clusters.
20    pub health_check_interval: Duration,
21}
22
23impl MultiClusterConfig {
24    pub fn new(cluster_id: impl Into<String>) -> Self {
25        Self {
26            cluster_id: ClusterId::new(cluster_id),
27            peers: HashMap::new(),
28            health_check_interval: Duration::from_secs(10),
29        }
30    }
31
32    pub fn peer(mut self, cluster_id: impl Into<String>, endpoint: impl Into<String>) -> Self {
33        self.peers
34            .insert(ClusterId::new(cluster_id), endpoint.into());
35        self
36    }
37
38    pub fn health_check_interval(mut self, interval: Duration) -> Self {
39        self.health_check_interval = interval;
40        self
41    }
42}
43
/// Status of a peer cluster.
///
/// A small field-less enum, so it is `Copy` (cheap to hand out by value) and `Hash`
/// (usable as a map/set key, e.g. when counting peers per status).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum PeerStatus {
    /// The most recent probe received a successful `GetMembers` response.
    Healthy,
    /// The most recent probe failed: no client could be obtained, the RPC returned an
    /// error, or the probe timed out.
    Unreachable,
    /// No probe has completed yet, or the cluster is not a configured peer.
    Unknown,
}
51
52/// Tracks health of peer clusters via periodic gRPC probes.
53pub struct ClusterHealth {
54    config: MultiClusterConfig,
55    pool: Arc<ConnectionPool>,
56    statuses: Arc<ArcSwap<HashMap<ClusterId, PeerStatus>>>,
57    shutdown_rx: watch::Receiver<bool>,
58}
59
60impl ClusterHealth {
61    pub fn new(
62        config: MultiClusterConfig,
63        pool: Arc<ConnectionPool>,
64        shutdown_rx: watch::Receiver<bool>,
65    ) -> Self {
66        let initial: HashMap<ClusterId, PeerStatus> = config
67            .peers
68            .keys()
69            .map(|id| (id.clone(), PeerStatus::Unknown))
70            .collect();
71
72        Self {
73            config,
74            pool,
75            statuses: Arc::new(ArcSwap::from_pointee(initial)),
76            shutdown_rx,
77        }
78    }
79
80    /// Get the current status of a peer cluster.
81    pub fn status(&self, cluster_id: &ClusterId) -> PeerStatus {
82        self.statuses
83            .load()
84            .get(cluster_id)
85            .cloned()
86            .unwrap_or(PeerStatus::Unknown)
87    }
88
89    /// Get a snapshot of all peer statuses.
90    pub fn all_statuses(&self) -> Arc<HashMap<ClusterId, PeerStatus>> {
91        self.statuses.load_full()
92    }
93
94    /// Get the endpoint for a peer cluster.
95    pub fn peer_endpoint(&self, cluster_id: &ClusterId) -> Option<String> {
96        self.config.peers.get(cluster_id).cloned()
97    }
98
99    /// Run the health check loop (call via `tokio::spawn`).
100    pub async fn run(mut self) {
101        loop {
102            tokio::select! {
103                _ = tokio::time::sleep(self.config.health_check_interval) => {}
104                _ = self.shutdown_rx.changed() => {
105                    tracing::debug!("cluster health checker shutting down");
106                    return;
107                }
108            }
109
110            let mut new_statuses = HashMap::new();
111            for (cluster_id, endpoint) in &self.config.peers {
112                let status = match self.pool.get_membership(endpoint).await {
113                    Ok(mut client) => {
114                        match tokio::time::timeout(
115                            Duration::from_secs(5),
116                            client.get_members(crate::proto::GetMembersRequest {}),
117                        )
118                        .await
119                        {
120                            Ok(Ok(_)) => PeerStatus::Healthy,
121                            _ => PeerStatus::Unreachable,
122                        }
123                    }
124                    Err(_) => PeerStatus::Unreachable,
125                };
126
127                if status != self.status(cluster_id) {
128                    tracing::info!(
129                        cluster = %cluster_id,
130                        ?status,
131                        "peer cluster status changed"
132                    );
133                }
134                new_statuses.insert(cluster_id.clone(), status);
135            }
136            self.statuses.store(Arc::new(new_statuses));
137        }
138    }
139}