Skip to main content

orlando_cluster/
failover.rs

1//! Failover state machine for cross-cluster grain ownership promotion.
2//!
3//! Monitors the health of peer clusters and, when the primary for a set of
4//! grains becomes unreachable, attempts promotion via CAS on the cross-cluster
5//! directory. Fence tokens (epochs) prevent stale primaries from reclaiming
6//! ownership after a promotion.
7
8use std::sync::Arc;
9use std::time::{Duration, Instant};
10
11use tokio::sync::watch;
12
13use orlando_core::ClusterId;
14
15use crate::cross_cluster_directory::{CrossClusterDirectory, GrainOwnership};
16use crate::multi_cluster::{ClusterHealth, PeerStatus};
17
/// Where a single peer cluster currently sits in the failover sequence.
///
/// The failover manager keeps one value of this type per peer and advances it
/// on every health check.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum FailoverPhase {
    /// The peer is healthy; nothing to do.
    Monitoring,
    /// The peer was observed unreachable and the grace period is running.
    PrimaryUnreachable {
        /// Moment the peer was first seen unreachable.
        since: Instant,
    },
    /// The grace period has expired and promotion through the directory is
    /// being attempted.
    Promoting,
    /// Promotion finished: this cluster took over the affected grains.
    Promoted {
        /// Highest ownership epoch recorded for grains this cluster won
        /// (0 when none were won).
        epoch: u64,
    },
}
31
/// Tunables that control when and how often failover is evaluated.
#[derive(Clone, Debug)]
pub struct FailoverConfig {
    /// Minimum time a peer must stay unreachable before promotion begins.
    /// Guards against failing over on a short-lived connectivity blip.
    pub grace_period: Duration,
    /// Pause between two consecutive health checks of the peer clusters.
    pub check_interval: Duration,
}
41
42impl Default for FailoverConfig {
43    fn default() -> Self {
44        Self {
45            grace_period: Duration::from_secs(30),
46            check_interval: Duration::from_secs(5),
47        }
48    }
49}
50
51/// Manages failover for a single local cluster watching its peers.
52///
53/// When a peer cluster becomes unreachable for longer than the grace period,
54/// the failover manager attempts to re-register ownership of grains that
55/// were owned by the failed cluster.
56pub struct FailoverManager {
57    local_cluster_id: ClusterId,
58    config: FailoverConfig,
59    health: Arc<ClusterHealth>,
60    directory: Arc<dyn CrossClusterDirectory>,
61    shutdown_rx: watch::Receiver<bool>,
62}
63
64impl FailoverManager {
65    pub fn new(
66        local_cluster_id: ClusterId,
67        config: FailoverConfig,
68        health: Arc<ClusterHealth>,
69        directory: Arc<dyn CrossClusterDirectory>,
70        shutdown_rx: watch::Receiver<bool>,
71    ) -> Self {
72        Self {
73            local_cluster_id,
74            config,
75            health,
76            directory,
77            shutdown_rx,
78        }
79    }
80
81    /// Run the failover monitoring loop. Call via `tokio::spawn`.
82    pub async fn run(mut self) {
83        // Track per-peer failover state
84        let mut peer_phases: std::collections::HashMap<ClusterId, FailoverPhase> =
85            std::collections::HashMap::new();
86
87        loop {
88            tokio::select! {
89                _ = tokio::time::sleep(self.config.check_interval) => {}
90                _ = self.shutdown_rx.changed() => {
91                    tracing::debug!("failover manager shutting down");
92                    return;
93                }
94            }
95
96            let statuses = self.health.all_statuses();
97            for (cluster_id, status) in statuses.iter() {
98                let phase = peer_phases
99                    .entry(cluster_id.clone())
100                    .or_insert(FailoverPhase::Monitoring);
101
102                match (status, &phase) {
103                    (PeerStatus::Healthy, _) => {
104                        if *phase != FailoverPhase::Monitoring {
105                            tracing::info!(
106                                cluster = %cluster_id,
107                                "peer cluster recovered, returning to monitoring"
108                            );
109                            *phase = FailoverPhase::Monitoring;
110                        }
111                    }
112                    (PeerStatus::Unreachable, FailoverPhase::Monitoring) => {
113                        tracing::warn!(
114                            cluster = %cluster_id,
115                            grace_period = ?self.config.grace_period,
116                            "peer cluster unreachable, starting grace period"
117                        );
118                        *phase = FailoverPhase::PrimaryUnreachable {
119                            since: Instant::now(),
120                        };
121                    }
122                    (
123                        PeerStatus::Unreachable,
124                        FailoverPhase::PrimaryUnreachable { since },
125                    ) => {
126                        if since.elapsed() >= self.config.grace_period {
127                            tracing::warn!(
128                                cluster = %cluster_id,
129                                "grace period elapsed, beginning promotion"
130                            );
131                            *phase = FailoverPhase::Promoting;
132                            // Actual promotion happens in the next check cycle
133                            // or can be triggered immediately by the caller
134                        }
135                    }
136                    (PeerStatus::Unreachable, FailoverPhase::Promoting) => {
137                        let grains = match self.directory.list_owned_by(cluster_id).await {
138                            Ok(g) => g,
139                            Err(e) => {
140                                tracing::error!(
141                                    failed_cluster = %cluster_id,
142                                    error = %e,
143                                    "failover: directory list_owned_by failed, will retry next cycle"
144                                );
145                                continue;
146                            }
147                        };
148
149                        tracing::warn!(
150                            failed_cluster = %cluster_id,
151                            local_cluster = %self.local_cluster_id,
152                            grain_count = grains.len(),
153                            "failover: promoting grains from failed cluster"
154                        );
155
156                        let mut max_epoch = 0u64;
157                        let mut promoted = 0usize;
158                        let mut lost = 0usize;
159                        let mut failed = 0usize;
160                        for (grain_id, ownership) in grains {
161                            match self.promote_grain(&grain_id, &ownership).await {
162                                Ok(new_owner) => {
163                                    if new_owner.cluster_id == self.local_cluster_id {
164                                        promoted += 1;
165                                        max_epoch = max_epoch.max(new_owner.epoch);
166                                    } else {
167                                        lost += 1;
168                                    }
169                                }
170                                Err(e) => {
171                                    failed += 1;
172                                    tracing::error!(
173                                        grain_type = grain_id.type_name,
174                                        grain_key = %grain_id.key,
175                                        error = %e,
176                                        "failover: promote_grain failed (continuing best-effort)"
177                                    );
178                                }
179                            }
180                        }
181
182                        tracing::info!(
183                            failed_cluster = %cluster_id,
184                            promoted, lost, failed,
185                            "failover: promotion sweep complete"
186                        );
187
188                        // Transition to Promoted regardless of partial failures;
189                        // next health-check cycle will retry any grains that
190                        // remain owned by the failed cluster.
191                        *phase = FailoverPhase::Promoted { epoch: max_epoch };
192                    }
193                    _ => {}
194                }
195            }
196        }
197    }
198
199    /// Attempt to promote a specific grain owned by a failed cluster.
200    ///
201    /// Uses CAS on the directory with an incremented epoch. Returns the new
202    /// ownership record on success, or the existing owner if another cluster
203    /// won the promotion race.
204    pub async fn promote_grain(
205        &self,
206        grain_id: &orlando_core::GrainId,
207        current_ownership: &GrainOwnership,
208    ) -> Result<GrainOwnership, crate::cross_cluster_directory::DirectoryError> {
209        let new_epoch = current_ownership.epoch + 1;
210
211        tracing::info!(
212            grain_type = grain_id.type_name,
213            grain_key = %grain_id.key,
214            old_cluster = %current_ownership.cluster_id,
215            new_cluster = %self.local_cluster_id,
216            new_epoch = new_epoch,
217            "attempting grain promotion"
218        );
219
220        let result = self
221            .directory
222            .register(grain_id, &self.local_cluster_id, new_epoch)
223            .await?;
224
225        if result.cluster_id == self.local_cluster_id {
226            tracing::info!(
227                grain_type = grain_id.type_name,
228                grain_key = %grain_id.key,
229                epoch = new_epoch,
230                "grain promotion successful"
231            );
232        } else {
233            tracing::info!(
234                grain_type = grain_id.type_name,
235                grain_key = %grain_id.key,
236                winner = %result.cluster_id,
237                "grain promotion lost to another cluster"
238            );
239        }
240
241        Ok(result)
242    }
243}