1use super::passive::PassiveHandle;
6use super::primary::ReplicatorConfig;
7use super::protocol::FencingToken;
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::sync::{Arc, RwLock};
10use tokio::sync::watch;
11use tracing::{error, info};
12
/// Role a node can hold, as tracked by `FailoverCoordinator`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum NodeRole {
    /// The node is the primary; this is the role `promote` moves a passive
    /// node into.
    Primary,

    /// The node is a passive replica; `promote` is only accepted from this
    /// role and `demote` moves a primary back into it.
    Passive,

    /// Presumably marks a promotion that is still in flight.
    /// NOTE(review): no code in this section assigns this role — confirm usage.
    Promoting,

    /// The node runs without a failover peer; this is the default initial role
    /// in `FailoverConfig::default`.
    Standalone,
}
28
29impl std::fmt::Display for NodeRole {
30 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
31 match self {
32 NodeRole::Primary => write!(f, "primary"),
33 NodeRole::Passive => write!(f, "passive"),
34 NodeRole::Promoting => write!(f, "promoting"),
35 NodeRole::Standalone => write!(f, "standalone"),
36 }
37 }
38}
39
/// Progress of a promotion, as tracked by `FailoverCoordinator`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum FailoverState {
    /// No promotion has been started; the state a new coordinator begins in.
    Normal,

    /// `promote` has passed its role check and is running.
    InProgress,

    /// `promote` finished and the node was switched to primary.
    Completed,

    /// Presumably marks a promotion attempt that did not succeed.
    Failed,
}
55
56#[derive(Debug, Clone)]
58pub struct FailoverConfig {
59 pub node_id: String,
61
62 pub initial_role: NodeRole,
64
65 pub auto_promote: bool,
67
68 pub promotion_delay_ms: u64,
70
71 pub replicator_config: Option<ReplicatorConfig>,
73}
74
75impl Default for FailoverConfig {
76 fn default() -> Self {
77 Self {
78 node_id: "node".to_string(),
79 initial_role: NodeRole::Standalone,
80 auto_promote: true,
81 promotion_delay_ms: 5000,
82 replicator_config: None,
83 }
84 }
85}
86
87pub struct FailoverCoordinator {
89 config: FailoverConfig,
90
91 role: Arc<RwLock<NodeRole>>,
93
94 failover_state: Arc<RwLock<FailoverState>>,
96
97 fencing_token: AtomicU64,
99
100 role_tx: watch::Sender<NodeRole>,
102
103 role_rx: watch::Receiver<NodeRole>,
105}
106
107impl FailoverCoordinator {
108 pub fn new(config: FailoverConfig) -> Self {
110 let (role_tx, role_rx) = watch::channel(config.initial_role);
111
112 Self {
113 role: Arc::new(RwLock::new(config.initial_role)),
114 failover_state: Arc::new(RwLock::new(FailoverState::Normal)),
115 fencing_token: AtomicU64::new(0),
116 role_tx,
117 role_rx,
118 config,
119 }
120 }
121
122 pub fn role(&self) -> NodeRole {
124 self.role.read().map(|r| *r).unwrap_or(NodeRole::Standalone)
125 }
126
127 pub fn failover_state(&self) -> FailoverState {
129 self.failover_state.read().map(|s| *s).unwrap_or(FailoverState::Normal)
130 }
131
132 pub fn fencing_token(&self) -> u64 {
134 self.fencing_token.load(Ordering::Acquire)
135 }
136
137 pub fn subscribe_role_changes(&self) -> watch::Receiver<NodeRole> {
139 self.role_rx.clone()
140 }
141
142 pub async fn promote(&self, passive_handle: &PassiveHandle) -> Result<u64, FailoverError> {
146 let current_role = self.role();
148 if current_role != NodeRole::Passive {
149 return Err(FailoverError::InvalidRoleTransition {
150 from: current_role,
151 to: NodeRole::Primary,
152 });
153 }
154
155 {
157 let mut state = self.failover_state.write()
158 .map_err(|_| FailoverError::LockPoisoned)?;
159 *state = FailoverState::InProgress;
160 }
161
162 info!("Starting promotion to primary");
163
164 tokio::time::sleep(std::time::Duration::from_millis(self.config.promotion_delay_ms)).await;
166
167 let new_token = self.fencing_token.fetch_add(1, Ordering::AcqRel) + 1;
169
170 let integrity = passive_handle.integrity();
172 let validators = passive_handle.validator_states();
173
174 info!(
175 sequence = integrity.sequence_number,
176 validators = validators.len(),
177 fencing_token = new_token,
178 "Promotion state captured"
179 );
180
181 {
183 let mut role = self.role.write()
184 .map_err(|_| FailoverError::LockPoisoned)?;
185 *role = NodeRole::Primary;
186 }
187
188 let _ = self.role_tx.send(NodeRole::Primary);
190
191 {
193 let mut state = self.failover_state.write()
194 .map_err(|_| FailoverError::LockPoisoned)?;
195 *state = FailoverState::Completed;
196 }
197
198 info!(
199 fencing_token = new_token,
200 "Successfully promoted to primary"
201 );
202
203 crate::metrics::set_node_role("primary");
205
206 Ok(new_token)
207 }
208
209 pub fn demote(&self) -> Result<(), FailoverError> {
211 let current_role = self.role();
212 if current_role != NodeRole::Primary {
213 return Err(FailoverError::InvalidRoleTransition {
214 from: current_role,
215 to: NodeRole::Passive,
216 });
217 }
218
219 info!("Demoting to passive");
220
221 {
222 let mut role = self.role.write()
223 .map_err(|_| FailoverError::LockPoisoned)?;
224 *role = NodeRole::Passive;
225 }
226
227 let _ = self.role_tx.send(NodeRole::Passive);
228
229 crate::metrics::set_node_role("passive");
231
232 Ok(())
233 }
234
235 pub fn set_role(&self, new_role: NodeRole) -> Result<(), FailoverError> {
237 {
238 let mut role = self.role.write()
239 .map_err(|_| FailoverError::LockPoisoned)?;
240 *role = new_role;
241 }
242
243 let _ = self.role_tx.send(new_role);
244
245 crate::metrics::set_node_role(&new_role.to_string());
247
248 Ok(())
249 }
250
251 pub fn is_primary(&self) -> bool {
253 self.role() == NodeRole::Primary
254 }
255
256 pub fn is_passive(&self) -> bool {
258 self.role() == NodeRole::Passive
259 }
260
261 pub fn validate_fencing_token(&self, token: &FencingToken) -> bool {
263 let our_token = self.fencing_token.load(Ordering::Acquire);
264 token.token >= our_token
265 }
266}
267
268#[derive(Debug, thiserror::Error)]
270pub enum FailoverError {
271 #[error("Invalid role transition from {from} to {to}")]
272 InvalidRoleTransition { from: NodeRole, to: NodeRole },
273
274 #[error("Lock poisoned")]
275 LockPoisoned,
276
277 #[error("Fencing token rejected")]
278 FencingTokenRejected,
279
280 #[error("Promotion failed: {0}")]
281 PromotionFailed(String),
282}
283
284pub fn start_auto_failover(
289 coordinator: Arc<FailoverCoordinator>,
290 passive_handle: PassiveHandle,
291) -> watch::Receiver<bool> {
292 let (done_tx, done_rx) = watch::channel(false);
293
294 let mut failover_rx = passive_handle.subscribe_failover();
295
296 tokio::spawn(async move {
297 loop {
298 if failover_rx.changed().await.is_err() {
300 break;
301 }
302
303 if *failover_rx.borrow() {
304 info!("Failover detected, attempting promotion");
305
306 match coordinator.promote(&passive_handle).await {
307 Ok(token) => {
308 info!(fencing_token = token, "Promotion successful");
309 let _ = done_tx.send(true);
310 break;
311 }
312 Err(e) => {
313 error!(error = %e, "Promotion failed");
314 }
315 }
316 }
317 }
318 });
319
320 done_rx
321}
322
#[cfg(test)]
mod tests {
    use super::*;

    /// A new coordinator reports the configured role and the `Normal` state.
    #[test]
    fn test_coordinator_creation() {
        let coordinator = FailoverCoordinator::new(FailoverConfig {
            node_id: String::from("test-node"),
            initial_role: NodeRole::Passive,
            ..FailoverConfig::default()
        });

        assert_eq!(coordinator.role(), NodeRole::Passive);
        assert_eq!(coordinator.failover_state(), FailoverState::Normal);
    }

    /// `set_role` replaces the role and `is_primary` reflects it.
    #[test]
    fn test_role_change() {
        let coordinator = FailoverCoordinator::new(FailoverConfig {
            initial_role: NodeRole::Standalone,
            ..FailoverConfig::default()
        });
        assert_eq!(coordinator.role(), NodeRole::Standalone);

        coordinator.set_role(NodeRole::Primary).unwrap();

        assert_eq!(coordinator.role(), NodeRole::Primary);
        assert!(coordinator.is_primary());
    }

    /// Tokens at or above the local fencing token pass; older ones fail.
    #[test]
    fn test_fencing_token_validation() {
        let coordinator = FailoverCoordinator::new(FailoverConfig::default());

        // The local token starts at 0, so token 1 is acceptable.
        let stale_candidate = FencingToken::new(1, "node-a");
        assert!(coordinator.validate_fencing_token(&stale_candidate));

        // Raise the local token past the candidate; it must now be rejected.
        coordinator.fencing_token.store(5, Ordering::Release);
        assert!(!coordinator.validate_fencing_token(&stale_candidate));

        // A token equal to the local one is still accepted.
        let current_candidate = FencingToken::new(5, "node-b");
        assert!(coordinator.validate_fencing_token(&current_candidate));
    }
}