orlando_cluster/
failover.rs1use std::sync::Arc;
9use std::time::{Duration, Instant};
10
11use tokio::sync::watch;
12
13use orlando_core::ClusterId;
14
15use crate::cross_cluster_directory::{CrossClusterDirectory, GrainOwnership};
16use crate::multi_cluster::{ClusterHealth, PeerStatus};
17
/// Where the failover state machine currently stands for a single peer cluster.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum FailoverPhase {
    /// Initial state, and the state a peer returns to whenever it reports healthy.
    Monitoring,
    /// The peer was first seen unreachable at `since`; the grace period is running.
    PrimaryUnreachable { since: Instant },
    /// The grace period ran out; the peer's grains are being claimed for the local cluster.
    Promoting,
    /// Promotion finished. `epoch` is the highest ownership epoch the local cluster
    /// claimed in its promotion sweep (`0` when it claimed no grains).
    Promoted { epoch: u64 },
}
31
/// Timing parameters for [`FailoverManager`].
#[derive(Clone, Debug)]
pub struct FailoverConfig {
    /// How long a peer must remain unreachable, measured from the first check that saw
    /// it unreachable, before promotion of its grains begins.
    pub grace_period: Duration,
    /// Pause between successive health sweeps of all peers.
    pub check_interval: Duration,
}
41
42impl Default for FailoverConfig {
43 fn default() -> Self {
44 Self {
45 grace_period: Duration::from_secs(30),
46 check_interval: Duration::from_secs(5),
47 }
48 }
49}
50
51pub struct FailoverManager {
57 local_cluster_id: ClusterId,
58 config: FailoverConfig,
59 health: Arc<ClusterHealth>,
60 directory: Arc<dyn CrossClusterDirectory>,
61 shutdown_rx: watch::Receiver<bool>,
62}
63
64impl FailoverManager {
65 pub fn new(
66 local_cluster_id: ClusterId,
67 config: FailoverConfig,
68 health: Arc<ClusterHealth>,
69 directory: Arc<dyn CrossClusterDirectory>,
70 shutdown_rx: watch::Receiver<bool>,
71 ) -> Self {
72 Self {
73 local_cluster_id,
74 config,
75 health,
76 directory,
77 shutdown_rx,
78 }
79 }
80
81 pub async fn run(mut self) {
83 let mut peer_phases: std::collections::HashMap<ClusterId, FailoverPhase> =
85 std::collections::HashMap::new();
86
87 loop {
88 tokio::select! {
89 _ = tokio::time::sleep(self.config.check_interval) => {}
90 _ = self.shutdown_rx.changed() => {
91 tracing::debug!("failover manager shutting down");
92 return;
93 }
94 }
95
96 let statuses = self.health.all_statuses();
97 for (cluster_id, status) in statuses.iter() {
98 let phase = peer_phases
99 .entry(cluster_id.clone())
100 .or_insert(FailoverPhase::Monitoring);
101
102 match (status, &phase) {
103 (PeerStatus::Healthy, _) => {
104 if *phase != FailoverPhase::Monitoring {
105 tracing::info!(
106 cluster = %cluster_id,
107 "peer cluster recovered, returning to monitoring"
108 );
109 *phase = FailoverPhase::Monitoring;
110 }
111 }
112 (PeerStatus::Unreachable, FailoverPhase::Monitoring) => {
113 tracing::warn!(
114 cluster = %cluster_id,
115 grace_period = ?self.config.grace_period,
116 "peer cluster unreachable, starting grace period"
117 );
118 *phase = FailoverPhase::PrimaryUnreachable {
119 since: Instant::now(),
120 };
121 }
122 (
123 PeerStatus::Unreachable,
124 FailoverPhase::PrimaryUnreachable { since },
125 ) => {
126 if since.elapsed() >= self.config.grace_period {
127 tracing::warn!(
128 cluster = %cluster_id,
129 "grace period elapsed, beginning promotion"
130 );
131 *phase = FailoverPhase::Promoting;
132 }
135 }
136 (PeerStatus::Unreachable, FailoverPhase::Promoting) => {
137 let grains = match self.directory.list_owned_by(cluster_id).await {
138 Ok(g) => g,
139 Err(e) => {
140 tracing::error!(
141 failed_cluster = %cluster_id,
142 error = %e,
143 "failover: directory list_owned_by failed, will retry next cycle"
144 );
145 continue;
146 }
147 };
148
149 tracing::warn!(
150 failed_cluster = %cluster_id,
151 local_cluster = %self.local_cluster_id,
152 grain_count = grains.len(),
153 "failover: promoting grains from failed cluster"
154 );
155
156 let mut max_epoch = 0u64;
157 let mut promoted = 0usize;
158 let mut lost = 0usize;
159 let mut failed = 0usize;
160 for (grain_id, ownership) in grains {
161 match self.promote_grain(&grain_id, &ownership).await {
162 Ok(new_owner) => {
163 if new_owner.cluster_id == self.local_cluster_id {
164 promoted += 1;
165 max_epoch = max_epoch.max(new_owner.epoch);
166 } else {
167 lost += 1;
168 }
169 }
170 Err(e) => {
171 failed += 1;
172 tracing::error!(
173 grain_type = grain_id.type_name,
174 grain_key = %grain_id.key,
175 error = %e,
176 "failover: promote_grain failed (continuing best-effort)"
177 );
178 }
179 }
180 }
181
182 tracing::info!(
183 failed_cluster = %cluster_id,
184 promoted, lost, failed,
185 "failover: promotion sweep complete"
186 );
187
188 *phase = FailoverPhase::Promoted { epoch: max_epoch };
192 }
193 _ => {}
194 }
195 }
196 }
197 }
198
199 pub async fn promote_grain(
205 &self,
206 grain_id: &orlando_core::GrainId,
207 current_ownership: &GrainOwnership,
208 ) -> Result<GrainOwnership, crate::cross_cluster_directory::DirectoryError> {
209 let new_epoch = current_ownership.epoch + 1;
210
211 tracing::info!(
212 grain_type = grain_id.type_name,
213 grain_key = %grain_id.key,
214 old_cluster = %current_ownership.cluster_id,
215 new_cluster = %self.local_cluster_id,
216 new_epoch = new_epoch,
217 "attempting grain promotion"
218 );
219
220 let result = self
221 .directory
222 .register(grain_id, &self.local_cluster_id, new_epoch)
223 .await?;
224
225 if result.cluster_id == self.local_cluster_id {
226 tracing::info!(
227 grain_type = grain_id.type_name,
228 grain_key = %grain_id.key,
229 epoch = new_epoch,
230 "grain promotion successful"
231 );
232 } else {
233 tracing::info!(
234 grain_type = grain_id.type_name,
235 grain_key = %grain_id.key,
236 winner = %result.cluster_id,
237 "grain promotion lost to another cluster"
238 );
239 }
240
241 Ok(result)
242 }
243}