k8s_operator_controller/
leader.rs

1use std::time::Duration;
2
3use tokio::sync::watch;
4use tracing::{debug, info};
5
6use k8s_operator_core::{NodeRole, ReconcileError};
7
8pub struct LeaderGuard {
9    role_rx: watch::Receiver<NodeRole>,
10}
11
12impl LeaderGuard {
13    pub fn new(role_rx: watch::Receiver<NodeRole>) -> Self {
14        Self { role_rx }
15    }
16
17    pub fn is_leader(&self) -> bool {
18        matches!(*self.role_rx.borrow(), NodeRole::Leader)
19    }
20
21    pub fn current_role(&self) -> NodeRole {
22        *self.role_rx.borrow()
23    }
24
25    pub async fn wait_for_leadership(&mut self) -> NodeRole {
26        loop {
27            let role = *self.role_rx.borrow_and_update();
28            if role == NodeRole::Leader {
29                return role;
30            }
31
32            if self.role_rx.changed().await.is_err() {
33                return NodeRole::Follower;
34            }
35        }
36    }
37
38    pub async fn wait_for_leadership_with_timeout(&mut self, timeout: Duration) -> Option<NodeRole> {
39        tokio::select! {
40            role = self.wait_for_leadership() => Some(role),
41            _ = tokio::time::sleep(timeout) => None,
42        }
43    }
44
45    pub fn check(&self) -> Result<(), ReconcileError> {
46        if self.is_leader() {
47            Ok(())
48        } else {
49            Err(ReconcileError::NotLeader)
50        }
51    }
52}
53
54impl Clone for LeaderGuard {
55    fn clone(&self) -> Self {
56        Self {
57            role_rx: self.role_rx.clone(),
58        }
59    }
60}
61
62pub struct LeaderElection {
63    role_tx: watch::Sender<NodeRole>,
64    role_rx: watch::Receiver<NodeRole>,
65}
66
67impl LeaderElection {
68    pub fn new() -> Self {
69        let (role_tx, role_rx) = watch::channel(NodeRole::Follower);
70        Self { role_tx, role_rx }
71    }
72
73    pub fn set_role(&self, role: NodeRole) {
74        let _ = self.role_tx.send(role);
75        match role {
76            NodeRole::Leader => info!("This node is now the leader"),
77            NodeRole::Follower => debug!("This node is now a follower"),
78            NodeRole::Candidate => debug!("This node is now a candidate"),
79            NodeRole::Learner => debug!("This node is now a learner"),
80        }
81    }
82
83    pub fn guard(&self) -> LeaderGuard {
84        LeaderGuard::new(self.role_rx.clone())
85    }
86
87    pub fn subscribe(&self) -> watch::Receiver<NodeRole> {
88        self.role_rx.clone()
89    }
90}
91
92impl Default for LeaderElection {
93    fn default() -> Self {
94        Self::new()
95    }
96}
97
98pub fn leader_only<F, T>(guard: &LeaderGuard, f: F) -> Result<T, ReconcileError>
99where
100    F: FnOnce() -> T,
101{
102    guard.check()?;
103    Ok(f())
104}
105
106pub async fn leader_only_async<F, Fut, T>(guard: &LeaderGuard, f: F) -> Result<T, ReconcileError>
107where
108    F: FnOnce() -> Fut,
109    Fut: std::future::Future<Output = T>,
110{
111    guard.check()?;
112    Ok(f().await)
113}