orlando_cluster/
multi_cluster.rs1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::Duration;
4
5use arc_swap::ArcSwap;
6use tokio::sync::watch;
7
8use orlando_core::ClusterId;
9
10use crate::connection_pool::ConnectionPool;
11
12#[derive(Debug, Clone)]
14pub struct MultiClusterConfig {
15 pub cluster_id: ClusterId,
17 pub peers: HashMap<ClusterId, String>,
19 pub health_check_interval: Duration,
21}
22
23impl MultiClusterConfig {
24 pub fn new(cluster_id: impl Into<String>) -> Self {
25 Self {
26 cluster_id: ClusterId::new(cluster_id),
27 peers: HashMap::new(),
28 health_check_interval: Duration::from_secs(10),
29 }
30 }
31
32 pub fn peer(mut self, cluster_id: impl Into<String>, endpoint: impl Into<String>) -> Self {
33 self.peers
34 .insert(ClusterId::new(cluster_id), endpoint.into());
35 self
36 }
37
38 pub fn health_check_interval(mut self, interval: Duration) -> Self {
39 self.health_check_interval = interval;
40 self
41 }
42}
43
/// Last observed reachability of a peer cluster.
///
/// The enum carries no data, so it is `Copy`; callers can pass it by value
/// and compare it without cloning.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum PeerStatus {
    /// The most recent membership request to the peer succeeded in time.
    Healthy,
    /// The most recent probe failed: no connection could be obtained, the
    /// membership request returned an error, or it timed out.
    Unreachable,
    /// The peer has not been probed yet, or the cluster id is not a known peer.
    Unknown,
}
51
52pub struct ClusterHealth {
54 config: MultiClusterConfig,
55 pool: Arc<ConnectionPool>,
56 statuses: Arc<ArcSwap<HashMap<ClusterId, PeerStatus>>>,
57 shutdown_rx: watch::Receiver<bool>,
58}
59
60impl ClusterHealth {
61 pub fn new(
62 config: MultiClusterConfig,
63 pool: Arc<ConnectionPool>,
64 shutdown_rx: watch::Receiver<bool>,
65 ) -> Self {
66 let initial: HashMap<ClusterId, PeerStatus> = config
67 .peers
68 .keys()
69 .map(|id| (id.clone(), PeerStatus::Unknown))
70 .collect();
71
72 Self {
73 config,
74 pool,
75 statuses: Arc::new(ArcSwap::from_pointee(initial)),
76 shutdown_rx,
77 }
78 }
79
80 pub fn status(&self, cluster_id: &ClusterId) -> PeerStatus {
82 self.statuses
83 .load()
84 .get(cluster_id)
85 .cloned()
86 .unwrap_or(PeerStatus::Unknown)
87 }
88
89 pub fn all_statuses(&self) -> Arc<HashMap<ClusterId, PeerStatus>> {
91 self.statuses.load_full()
92 }
93
94 pub fn peer_endpoint(&self, cluster_id: &ClusterId) -> Option<String> {
96 self.config.peers.get(cluster_id).cloned()
97 }
98
99 pub async fn run(mut self) {
101 loop {
102 tokio::select! {
103 _ = tokio::time::sleep(self.config.health_check_interval) => {}
104 _ = self.shutdown_rx.changed() => {
105 tracing::debug!("cluster health checker shutting down");
106 return;
107 }
108 }
109
110 let mut new_statuses = HashMap::new();
111 for (cluster_id, endpoint) in &self.config.peers {
112 let status = match self.pool.get_membership(endpoint).await {
113 Ok(mut client) => {
114 match tokio::time::timeout(
115 Duration::from_secs(5),
116 client.get_members(crate::proto::GetMembersRequest {}),
117 )
118 .await
119 {
120 Ok(Ok(_)) => PeerStatus::Healthy,
121 _ => PeerStatus::Unreachable,
122 }
123 }
124 Err(_) => PeerStatus::Unreachable,
125 };
126
127 if status != self.status(cluster_id) {
128 tracing::info!(
129 cluster = %cluster_id,
130 ?status,
131 "peer cluster status changed"
132 );
133 }
134 new_statuses.insert(cluster_id.clone(), status);
135 }
136 self.statuses.store(Arc::new(new_statuses));
137 }
138 }
139}