Skip to main content

nklave_core/replication/
failover.rs

1//! Failover coordination and promotion logic
2//!
3//! Handles the transition from passive to primary role when failover is detected.
4
5use super::passive::PassiveHandle;
6use super::primary::ReplicatorConfig;
7use super::protocol::FencingToken;
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::sync::{Arc, RwLock};
10use tokio::sync::watch;
11use tracing::{error, info};
12
/// The role a node currently plays within the replication cluster.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NodeRole {
    /// Handles signing and replicates to passives.
    Primary,
    /// Receives replicated state and stands ready for failover.
    Passive,
    /// Currently in the process of becoming primary.
    Promoting,
    /// Does not participate in replication.
    Standalone,
}
28
29impl std::fmt::Display for NodeRole {
30    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
31        match self {
32            NodeRole::Primary => write!(f, "primary"),
33            NodeRole::Passive => write!(f, "passive"),
34            NodeRole::Promoting => write!(f, "promoting"),
35            NodeRole::Standalone => write!(f, "standalone"),
36        }
37    }
38}
39
/// Progress of a failover on this node.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FailoverState {
    /// Normal operation; no failover has been started.
    Normal,
    /// A failover is currently in progress.
    InProgress,
    /// The failover completed.
    Completed,
    /// The failover failed.
    Failed,
}
55
56/// Configuration for failover coordination
57#[derive(Debug, Clone)]
58pub struct FailoverConfig {
59    /// Node identifier
60    pub node_id: String,
61
62    /// Initial role
63    pub initial_role: NodeRole,
64
65    /// Whether to auto-promote on failover detection
66    pub auto_promote: bool,
67
68    /// Minimum time to wait after primary failure before promoting (ms)
69    pub promotion_delay_ms: u64,
70
71    /// Replicator configuration (for primary/after promotion)
72    pub replicator_config: Option<ReplicatorConfig>,
73}
74
75impl Default for FailoverConfig {
76    fn default() -> Self {
77        Self {
78            node_id: "node".to_string(),
79            initial_role: NodeRole::Standalone,
80            auto_promote: true,
81            promotion_delay_ms: 5000,
82            replicator_config: None,
83        }
84    }
85}
86
87/// Failover coordinator manages role transitions
88pub struct FailoverCoordinator {
89    config: FailoverConfig,
90
91    /// Current role
92    role: Arc<RwLock<NodeRole>>,
93
94    /// Current failover state
95    failover_state: Arc<RwLock<FailoverState>>,
96
97    /// Current fencing token
98    fencing_token: AtomicU64,
99
100    /// Role change notifications
101    role_tx: watch::Sender<NodeRole>,
102
103    /// Role change receiver
104    role_rx: watch::Receiver<NodeRole>,
105}
106
107impl FailoverCoordinator {
108    /// Create a new failover coordinator
109    pub fn new(config: FailoverConfig) -> Self {
110        let (role_tx, role_rx) = watch::channel(config.initial_role);
111
112        Self {
113            role: Arc::new(RwLock::new(config.initial_role)),
114            failover_state: Arc::new(RwLock::new(FailoverState::Normal)),
115            fencing_token: AtomicU64::new(0),
116            role_tx,
117            role_rx,
118            config,
119        }
120    }
121
122    /// Get current role
123    pub fn role(&self) -> NodeRole {
124        self.role.read().map(|r| *r).unwrap_or(NodeRole::Standalone)
125    }
126
127    /// Get current failover state
128    pub fn failover_state(&self) -> FailoverState {
129        self.failover_state.read().map(|s| *s).unwrap_or(FailoverState::Normal)
130    }
131
132    /// Get current fencing token
133    pub fn fencing_token(&self) -> u64 {
134        self.fencing_token.load(Ordering::Acquire)
135    }
136
137    /// Subscribe to role changes
138    pub fn subscribe_role_changes(&self) -> watch::Receiver<NodeRole> {
139        self.role_rx.clone()
140    }
141
142    /// Attempt to promote this node to primary
143    ///
144    /// Returns Ok(new_fencing_token) on success
145    pub async fn promote(&self, passive_handle: &PassiveHandle) -> Result<u64, FailoverError> {
146        // Check current role
147        let current_role = self.role();
148        if current_role != NodeRole::Passive {
149            return Err(FailoverError::InvalidRoleTransition {
150                from: current_role,
151                to: NodeRole::Primary,
152            });
153        }
154
155        // Set state to promoting
156        {
157            let mut state = self.failover_state.write()
158                .map_err(|_| FailoverError::LockPoisoned)?;
159            *state = FailoverState::InProgress;
160        }
161
162        info!("Starting promotion to primary");
163
164        // Wait for promotion delay (allows time for other nodes to detect failure)
165        tokio::time::sleep(std::time::Duration::from_millis(self.config.promotion_delay_ms)).await;
166
167        // Generate new fencing token
168        let new_token = self.fencing_token.fetch_add(1, Ordering::AcqRel) + 1;
169
170        // Get current state from passive
171        let integrity = passive_handle.integrity();
172        let validators = passive_handle.validator_states();
173
174        info!(
175            sequence = integrity.sequence_number,
176            validators = validators.len(),
177            fencing_token = new_token,
178            "Promotion state captured"
179        );
180
181        // Update role
182        {
183            let mut role = self.role.write()
184                .map_err(|_| FailoverError::LockPoisoned)?;
185            *role = NodeRole::Primary;
186        }
187
188        // Notify role change
189        let _ = self.role_tx.send(NodeRole::Primary);
190
191        // Update failover state
192        {
193            let mut state = self.failover_state.write()
194                .map_err(|_| FailoverError::LockPoisoned)?;
195            *state = FailoverState::Completed;
196        }
197
198        info!(
199            fencing_token = new_token,
200            "Successfully promoted to primary"
201        );
202
203        // Update metrics
204        crate::metrics::set_node_role("primary");
205
206        Ok(new_token)
207    }
208
209    /// Demote this node back to passive (e.g., when original primary recovers)
210    pub fn demote(&self) -> Result<(), FailoverError> {
211        let current_role = self.role();
212        if current_role != NodeRole::Primary {
213            return Err(FailoverError::InvalidRoleTransition {
214                from: current_role,
215                to: NodeRole::Passive,
216            });
217        }
218
219        info!("Demoting to passive");
220
221        {
222            let mut role = self.role.write()
223                .map_err(|_| FailoverError::LockPoisoned)?;
224            *role = NodeRole::Passive;
225        }
226
227        let _ = self.role_tx.send(NodeRole::Passive);
228
229        // Update metrics
230        crate::metrics::set_node_role("passive");
231
232        Ok(())
233    }
234
235    /// Set role directly (for initialization)
236    pub fn set_role(&self, new_role: NodeRole) -> Result<(), FailoverError> {
237        {
238            let mut role = self.role.write()
239                .map_err(|_| FailoverError::LockPoisoned)?;
240            *role = new_role;
241        }
242
243        let _ = self.role_tx.send(new_role);
244
245        // Update metrics
246        crate::metrics::set_node_role(&new_role.to_string());
247
248        Ok(())
249    }
250
251    /// Check if this is the primary node
252    pub fn is_primary(&self) -> bool {
253        self.role() == NodeRole::Primary
254    }
255
256    /// Check if this is a passive node
257    pub fn is_passive(&self) -> bool {
258        self.role() == NodeRole::Passive
259    }
260
261    /// Validate a fencing token (reject if stale)
262    pub fn validate_fencing_token(&self, token: &FencingToken) -> bool {
263        let our_token = self.fencing_token.load(Ordering::Acquire);
264        token.token >= our_token
265    }
266}
267
268/// Errors from failover operations
269#[derive(Debug, thiserror::Error)]
270pub enum FailoverError {
271    #[error("Invalid role transition from {from} to {to}")]
272    InvalidRoleTransition { from: NodeRole, to: NodeRole },
273
274    #[error("Lock poisoned")]
275    LockPoisoned,
276
277    #[error("Fencing token rejected")]
278    FencingTokenRejected,
279
280    #[error("Promotion failed: {0}")]
281    PromotionFailed(String),
282}
283
284/// Start automatic failover monitoring
285///
286/// This spawns a task that monitors for failover events and promotes
287/// the passive node when appropriate.
288pub fn start_auto_failover(
289    coordinator: Arc<FailoverCoordinator>,
290    passive_handle: PassiveHandle,
291) -> watch::Receiver<bool> {
292    let (done_tx, done_rx) = watch::channel(false);
293
294    let mut failover_rx = passive_handle.subscribe_failover();
295
296    tokio::spawn(async move {
297        loop {
298            // Wait for failover signal
299            if failover_rx.changed().await.is_err() {
300                break;
301            }
302
303            if *failover_rx.borrow() {
304                info!("Failover detected, attempting promotion");
305
306                match coordinator.promote(&passive_handle).await {
307                    Ok(token) => {
308                        info!(fencing_token = token, "Promotion successful");
309                        let _ = done_tx.send(true);
310                        break;
311                    }
312                    Err(e) => {
313                        error!(error = %e, "Promotion failed");
314                    }
315                }
316            }
317        }
318    });
319
320    done_rx
321}
322
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a coordinator from `config`; shared by every test below.
    fn coordinator_from(config: FailoverConfig) -> FailoverCoordinator {
        FailoverCoordinator::new(config)
    }

    /// A fresh coordinator reports its configured role and a normal state.
    #[test]
    fn test_coordinator_creation() {
        let coordinator = coordinator_from(FailoverConfig {
            node_id: "test-node".to_string(),
            initial_role: NodeRole::Passive,
            ..Default::default()
        });

        assert_eq!(coordinator.role(), NodeRole::Passive);
        assert_eq!(coordinator.failover_state(), FailoverState::Normal);
    }

    /// `set_role` replaces the current role.
    #[test]
    fn test_role_change() {
        let coordinator = coordinator_from(FailoverConfig {
            initial_role: NodeRole::Standalone,
            ..Default::default()
        });
        assert_eq!(coordinator.role(), NodeRole::Standalone);

        coordinator.set_role(NodeRole::Primary).unwrap();

        assert_eq!(coordinator.role(), NodeRole::Primary);
        assert!(coordinator.is_primary());
    }

    /// Tokens below the coordinator's own are rejected; equal ones pass.
    #[test]
    fn test_fencing_token_validation() {
        let coordinator = coordinator_from(FailoverConfig::default());

        // With our token still at 0, token 1 is accepted.
        let token = FencingToken::new(1, "node-a");
        assert!(coordinator.validate_fencing_token(&token));

        // Increment our token
        coordinator.fencing_token.store(5, Ordering::Release);

        // Old token should be rejected
        assert!(!coordinator.validate_fencing_token(&token));

        // Current or newer should be accepted
        let newer_token = FencingToken::new(5, "node-b");
        assert!(coordinator.validate_fencing_token(&newer_token));
    }
}