nodedb_cluster/decommission/observer.rs

// SPDX-License-Identifier: BUSL-1.1

//! `DecommissionObserver` — local-node self-shutdown signal.
//!
//! The coordinator proposes a full decommission plan through the
//! metadata Raft group. Every node (including the target itself)
//! applies the resulting entries through `CacheApplier`, which, when
//! attached with [`CacheApplier::with_live_state`](crate::metadata_group::CacheApplier::with_live_state),
//! cascades topology state transitions into the live
//! `Arc<RwLock<ClusterTopology>>` handle.
//!
//! The observer polls that handle for the *local* node id. Once the
//! node's own state reaches `Decommissioned` — or the node has been
//! removed from topology entirely by a committed `Leave` — the
//! observer flips a `tokio::sync::watch` channel to `true`, which is
//! the cooperative shutdown signal every long-lived background task
//! on this node is already listening on.
//!
//! This is the last link in the decommission chain: once the watch
//! is flipped, the raft loops, SWIM detector, reachability driver,
//! and transport accept loops all drain and exit on their own.
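//!
//! A minimal wiring sketch for node startup. The `topology` handle and
//! `local_node_id` come from whatever bootstraps the node; the consumer
//! task shown here is illustrative, not an API of this module:
//!
//! ```ignore
//! let (observer, shutdown_rx) =
//!     DecommissionObserver::new(topology, local_node_id, Duration::from_secs(1));
//!
//! // Every long-lived task clones the receiver and drains once it flips.
//! let mut task_rx = shutdown_rx.clone();
//! tokio::spawn(async move {
//!     while !*task_rx.borrow_and_update() {
//!         if task_rx.changed().await.is_err() {
//!             break;
//!         }
//!     }
//!     // drain in-flight work and exit...
//! });
//!
//! // The observer itself is stopped through its own cancel channel.
//! let (_cancel_tx, cancel_rx) = tokio::sync::watch::channel(false);
//! tokio::spawn(observer.run(cancel_rx));
//! ```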

use std::sync::{Arc, RwLock};
use std::time::Duration;

use tokio::sync::watch;
use tokio::time::interval;
use tracing::{info, warn};

use crate::topology::{ClusterTopology, NodeState};

/// Periodically checks the local node's topology state and fires a
/// shutdown signal on `Decommissioned` or removal.
pub struct DecommissionObserver {
    topology: Arc<RwLock<ClusterTopology>>,
    local_node_id: u64,
    shutdown_tx: watch::Sender<bool>,
    poll_interval: Duration,
}

impl DecommissionObserver {
    /// Build an observer and return it alongside the receiver half of
    /// its shutdown watch channel. Every subsystem that wants to
    /// cooperatively drain on decommission can call
    /// [`watch::Receiver::clone`] on the returned receiver.
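    ///
    /// A hedged sketch; the clone bindings and their names are
    /// illustrative:
    ///
    /// ```ignore
    /// let (observer, shutdown_rx) =
    ///     DecommissionObserver::new(topology, local_node_id, Duration::from_secs(1));
    /// let raft_shutdown_rx = shutdown_rx.clone();
    /// let swim_shutdown_rx = shutdown_rx.clone();
    /// ```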
    pub fn new(
        topology: Arc<RwLock<ClusterTopology>>,
        local_node_id: u64,
        poll_interval: Duration,
    ) -> (Self, watch::Receiver<bool>) {
        let (shutdown_tx, shutdown_rx) = watch::channel(false);
        (
            Self {
                topology,
                local_node_id,
                shutdown_tx,
                poll_interval,
            },
            shutdown_rx,
        )
    }

    /// Single check. Returns `true` if the shutdown signal has been
    /// fired, whether during this call or on an earlier one (the watch
    /// is level-triggered, not edge-triggered).
    pub fn check_once(&self) -> bool {
        if *self.shutdown_tx.borrow() {
            return true;
        }
        // Take the read guard even if the lock was poisoned by a
        // panicked writer; this path only reads the topology.
        let topo = self.topology.read().unwrap_or_else(|p| p.into_inner());
        let should_fire = match topo.get_node(self.local_node_id) {
            Some(node) => node.state == NodeState::Decommissioned,
            // Node is gone from topology — either a committed `Leave`
            // (post-decommission) or manual removal. Either way, we
            // are no longer part of the cluster.
            None => true,
        };
        if should_fire {
            info!(
                local_node_id = self.local_node_id,
                "decommission observer firing local shutdown signal"
            );
            if let Err(e) = self.shutdown_tx.send(true) {
                warn!(error = %e, "shutdown watch receivers all dropped");
            }
            return true;
        }
        false
    }

    /// Run the observer's poll loop until `cancel` flips to `true`.
    /// Exits immediately after firing its own shutdown signal —
    /// there is nothing more to watch.
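    ///
    /// A hedged sketch of the driving side; `cancel_tx` stands in for
    /// whatever handle the node's lifecycle code keeps around:
    ///
    /// ```ignore
    /// let (cancel_tx, cancel_rx) = tokio::sync::watch::channel(false);
    /// let handle = tokio::spawn(observer.run(cancel_rx));
    /// // If shutdown is initiated through some other path, stop the
    /// // observer early instead of waiting for it to fire:
    /// let _ = cancel_tx.send(true);
    /// handle.await.unwrap();
    /// ```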
    pub async fn run(self, mut cancel: watch::Receiver<bool>) {
        let mut tick = interval(self.poll_interval);
        tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
        loop {
            tokio::select! {
                biased;
                changed = cancel.changed() => {
                    // A dropped cancel sender can never signal us again;
                    // under `biased` its immediately-ready `Err` would win
                    // every iteration and starve the tick, so treat it as
                    // cancellation too.
                    if changed.is_err() || *cancel.borrow() {
                        return;
                    }
                }
                _ = tick.tick() => {
                    if self.check_once() {
                        return;
                    }
                }
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::topology::NodeInfo;
    use std::net::SocketAddr;

    fn topo_with(node_id: u64, state: NodeState) -> Arc<RwLock<ClusterTopology>> {
        let mut t = ClusterTopology::new();
        let addr: SocketAddr = "127.0.0.1:9000".parse().unwrap();
        t.add_node(NodeInfo::new(node_id, addr, state));
        Arc::new(RwLock::new(t))
    }

    #[test]
    fn check_once_does_not_fire_while_active() {
        let topo = topo_with(5, NodeState::Active);
        let (obs, _rx) = DecommissionObserver::new(topo, 5, Duration::from_millis(10));
        assert!(!obs.check_once());
    }

    #[test]
    fn check_once_fires_on_decommissioned_state() {
        let topo = topo_with(5, NodeState::Active);
        let (obs, mut rx) = DecommissionObserver::new(topo.clone(), 5, Duration::from_millis(10));
        assert!(!obs.check_once());
        topo.write()
            .unwrap()
            .set_state(5, NodeState::Decommissioned);
        assert!(obs.check_once());
        assert!(*rx.borrow_and_update());
    }

    #[test]
    fn check_once_fires_when_node_removed_from_topology() {
        let topo = topo_with(5, NodeState::Active);
        let (obs, _rx) = DecommissionObserver::new(topo.clone(), 5, Duration::from_millis(10));
        topo.write().unwrap().remove_node(5);
        assert!(obs.check_once());
    }

    #[test]
    fn check_once_is_idempotent_after_firing() {
        let topo = topo_with(5, NodeState::Decommissioned);
        let (obs, _rx) = DecommissionObserver::new(topo, 5, Duration::from_millis(10));
        assert!(obs.check_once());
        // Second call sees the fired signal and reports true again.
        assert!(obs.check_once());
    }

    #[tokio::test(start_paused = true)]
    async fn run_loop_fires_shutdown_and_exits() {
        let topo = topo_with(5, NodeState::Active);
        let (obs, mut rx) = DecommissionObserver::new(topo.clone(), 5, Duration::from_millis(50));
        let (_cancel_tx, cancel_rx) = watch::channel(false);
        let handle = tokio::spawn(async move { obs.run(cancel_rx).await });

        // Advance twice — first tick = no-op, then flip state.
        tokio::time::advance(Duration::from_millis(60)).await;
        tokio::task::yield_now().await;
        topo.write()
            .unwrap()
            .set_state(5, NodeState::Decommissioned);
        tokio::time::advance(Duration::from_millis(60)).await;
        tokio::task::yield_now().await;

        let _ = tokio::time::timeout(Duration::from_millis(500), handle)
            .await
            .expect("observer run loop did not exit");
        assert!(*rx.borrow_and_update());
    }

    #[tokio::test(start_paused = true)]
    async fn run_loop_exits_on_cancel_without_firing() {
        let topo = topo_with(5, NodeState::Active);
        let (obs, rx) = DecommissionObserver::new(topo, 5, Duration::from_millis(50));
        let (cancel_tx, cancel_rx) = watch::channel(false);
        let handle = tokio::spawn(async move { obs.run(cancel_rx).await });
        let _ = cancel_tx.send(true);
        let _ = tokio::time::timeout(Duration::from_millis(500), handle)
            .await
            .expect("cancel did not end run loop");
        assert!(!*rx.borrow());
    }
}