k8s_operator_controller/
leader.rs1use std::time::Duration;
2
3use tokio::sync::watch;
4use tracing::{debug, info};
5
6use k8s_operator_core::{NodeRole, ReconcileError};
7
8pub struct LeaderGuard {
9 role_rx: watch::Receiver<NodeRole>,
10}
11
12impl LeaderGuard {
13 pub fn new(role_rx: watch::Receiver<NodeRole>) -> Self {
14 Self { role_rx }
15 }
16
17 pub fn is_leader(&self) -> bool {
18 matches!(*self.role_rx.borrow(), NodeRole::Leader)
19 }
20
21 pub fn current_role(&self) -> NodeRole {
22 *self.role_rx.borrow()
23 }
24
25 pub async fn wait_for_leadership(&mut self) -> NodeRole {
26 loop {
27 let role = *self.role_rx.borrow_and_update();
28 if role == NodeRole::Leader {
29 return role;
30 }
31
32 if self.role_rx.changed().await.is_err() {
33 return NodeRole::Follower;
34 }
35 }
36 }
37
38 pub async fn wait_for_leadership_with_timeout(&mut self, timeout: Duration) -> Option<NodeRole> {
39 tokio::select! {
40 role = self.wait_for_leadership() => Some(role),
41 _ = tokio::time::sleep(timeout) => None,
42 }
43 }
44
45 pub fn check(&self) -> Result<(), ReconcileError> {
46 if self.is_leader() {
47 Ok(())
48 } else {
49 Err(ReconcileError::NotLeader)
50 }
51 }
52}
53
54impl Clone for LeaderGuard {
55 fn clone(&self) -> Self {
56 Self {
57 role_rx: self.role_rx.clone(),
58 }
59 }
60}
61
62pub struct LeaderElection {
63 role_tx: watch::Sender<NodeRole>,
64 role_rx: watch::Receiver<NodeRole>,
65}
66
67impl LeaderElection {
68 pub fn new() -> Self {
69 let (role_tx, role_rx) = watch::channel(NodeRole::Follower);
70 Self { role_tx, role_rx }
71 }
72
73 pub fn set_role(&self, role: NodeRole) {
74 let _ = self.role_tx.send(role);
75 match role {
76 NodeRole::Leader => info!("This node is now the leader"),
77 NodeRole::Follower => debug!("This node is now a follower"),
78 NodeRole::Candidate => debug!("This node is now a candidate"),
79 NodeRole::Learner => debug!("This node is now a learner"),
80 }
81 }
82
83 pub fn guard(&self) -> LeaderGuard {
84 LeaderGuard::new(self.role_rx.clone())
85 }
86
87 pub fn subscribe(&self) -> watch::Receiver<NodeRole> {
88 self.role_rx.clone()
89 }
90}
91
92impl Default for LeaderElection {
93 fn default() -> Self {
94 Self::new()
95 }
96}
97
98pub fn leader_only<F, T>(guard: &LeaderGuard, f: F) -> Result<T, ReconcileError>
99where
100 F: FnOnce() -> T,
101{
102 guard.check()?;
103 Ok(f())
104}
105
106pub async fn leader_only_async<F, Fut, T>(guard: &LeaderGuard, f: F) -> Result<T, ReconcileError>
107where
108 F: FnOnce() -> Fut,
109 Fut: std::future::Future<Output = T>,
110{
111 guard.check()?;
112 Ok(f().await)
113}