nodedb_cluster/decommission/
observer.rs1use std::sync::{Arc, RwLock};
24use std::time::Duration;
25
26use tokio::sync::watch;
27use tokio::time::interval;
28use tracing::{info, warn};
29
30use crate::topology::{ClusterTopology, NodeState};
31
32pub struct DecommissionObserver {
35 topology: Arc<RwLock<ClusterTopology>>,
36 local_node_id: u64,
37 shutdown_tx: watch::Sender<bool>,
38 poll_interval: Duration,
39}
40
41impl DecommissionObserver {
42 pub fn new(
47 topology: Arc<RwLock<ClusterTopology>>,
48 local_node_id: u64,
49 poll_interval: Duration,
50 ) -> (Self, watch::Receiver<bool>) {
51 let (shutdown_tx, shutdown_rx) = watch::channel(false);
52 (
53 Self {
54 topology,
55 local_node_id,
56 shutdown_tx,
57 poll_interval,
58 },
59 shutdown_rx,
60 )
61 }
62
63 pub fn check_once(&self) -> bool {
67 if *self.shutdown_tx.borrow() {
68 return true;
69 }
70 let topo = self.topology.read().unwrap_or_else(|p| p.into_inner());
71 let should_fire = match topo.get_node(self.local_node_id) {
72 Some(node) => node.state == NodeState::Decommissioned,
73 None => true,
77 };
78 if should_fire {
79 info!(
80 local_node_id = self.local_node_id,
81 "decommission observer firing local shutdown signal"
82 );
83 if let Err(e) = self.shutdown_tx.send(true) {
84 warn!(error = %e, "shutdown watch receivers all dropped");
85 }
86 return true;
87 }
88 false
89 }
90
91 pub async fn run(self, mut cancel: watch::Receiver<bool>) {
95 let mut tick = interval(self.poll_interval);
96 tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
97 loop {
98 tokio::select! {
99 biased;
100 changed = cancel.changed() => {
101 if changed.is_ok() && *cancel.borrow() {
102 return;
103 }
104 }
105 _ = tick.tick() => {
106 if self.check_once() {
107 return;
108 }
109 }
110 }
111 }
112 }
113}
114
#[cfg(test)]
mod tests {
    use super::*;
    use crate::topology::NodeInfo;
    use std::net::SocketAddr;

    /// Builds a shared topology containing the single node `node_id` in `state`.
    fn topo_with(node_id: u64, state: NodeState) -> Arc<RwLock<ClusterTopology>> {
        let mut t = ClusterTopology::new();
        let addr: SocketAddr = "127.0.0.1:9000".parse().unwrap();
        t.add_node(NodeInfo::new(node_id, addr, state));
        Arc::new(RwLock::new(t))
    }

    #[test]
    fn check_once_does_not_fire_while_active() {
        let topo = topo_with(5, NodeState::Active);
        let (obs, rx) = DecommissionObserver::new(topo, 5, Duration::from_millis(10));
        assert!(!obs.check_once());
        assert!(!*rx.borrow());
    }

    #[test]
    fn check_once_fires_on_decommissioned_state() {
        let topo = topo_with(5, NodeState::Active);
        let (obs, mut rx) = DecommissionObserver::new(topo.clone(), 5, Duration::from_millis(10));
        assert!(!obs.check_once());
        topo.write()
            .unwrap()
            .set_state(5, NodeState::Decommissioned);
        assert!(obs.check_once());
        assert!(*rx.borrow_and_update());
    }

    #[test]
    fn check_once_fires_when_node_removed_from_topology() {
        let topo = topo_with(5, NodeState::Active);
        let (obs, mut rx) = DecommissionObserver::new(topo.clone(), 5, Duration::from_millis(10));
        topo.write().unwrap().remove_node(5);
        assert!(obs.check_once());
        // The signal must actually reach receivers, not just be reported.
        assert!(*rx.borrow_and_update());
    }

    #[test]
    fn check_once_is_idempotent_after_firing() {
        let topo = topo_with(5, NodeState::Decommissioned);
        let (obs, rx) = DecommissionObserver::new(topo, 5, Duration::from_millis(10));
        assert!(obs.check_once());
        assert!(obs.check_once());
        assert!(*rx.borrow());
    }

    #[tokio::test(start_paused = true)]
    async fn run_loop_fires_shutdown_and_exits() {
        let topo = topo_with(5, NodeState::Active);
        let (obs, mut rx) = DecommissionObserver::new(topo.clone(), 5, Duration::from_millis(50));
        let (_cancel_tx, cancel_rx) = watch::channel(false);
        let handle = tokio::spawn(async move { obs.run(cancel_rx).await });

        tokio::time::advance(Duration::from_millis(60)).await;
        tokio::task::yield_now().await;
        topo.write()
            .unwrap()
            .set_state(5, NodeState::Decommissioned);
        tokio::time::advance(Duration::from_millis(60)).await;
        tokio::task::yield_now().await;

        // Unwrap both layers: the timeout, and the task's own result, so that a panic
        // inside `run` fails the test instead of being silently discarded.
        tokio::time::timeout(Duration::from_millis(500), handle)
            .await
            .expect("observer run loop did not exit")
            .expect("observer task panicked");
        assert!(*rx.borrow_and_update());
    }

    #[tokio::test(start_paused = true)]
    async fn run_loop_exits_on_cancel_without_firing() {
        let topo = topo_with(5, NodeState::Active);
        let (obs, rx) = DecommissionObserver::new(topo, 5, Duration::from_millis(50));
        let (cancel_tx, cancel_rx) = watch::channel(false);
        let handle = tokio::spawn(async move { obs.run(cancel_rx).await });
        let _ = cancel_tx.send(true);
        tokio::time::timeout(Duration::from_millis(500), handle)
            .await
            .expect("cancel did not end run loop")
            .expect("observer task panicked");
        assert!(!*rx.borrow());
    }
}