Skip to main content

nodedb_cluster/reachability/
driver.rs

1// SPDX-License-Identifier: BUSL-1.1
2
3//! [`ReachabilityDriver`] — periodic open-breaker probe loop.
4//!
5//! Every `interval`, the driver asks the shared [`CircuitBreaker`]
6//! for its currently-Open peer set and fires a probe at each via the
7//! injected [`ReachabilityProber`]. Probes run in parallel via
//! `tokio::spawn` so a slow peer never blocks the next one. Probe
//! outcomes are not fed back into the breaker by the driver: the
//! production `TransportProber` routes through
//! `NexarTransport::send_rpc`, which already walks the circuit
//! breaker's `check → record_success|record_failure` path, so the
//! driver only logs a failed probe and counts it in its loop metrics.
13//!
14//! Shutdown is cooperative via `tokio::sync::watch`. On `true` the
15//! run loop breaks at the next tick or immediately if it is waiting.
16
17use std::sync::Arc;
18use std::time::{Duration, Instant};
19
20use tokio::sync::watch;
21use tokio::time::{MissedTickBehavior, interval};
22use tracing::{debug, trace};
23
24use crate::circuit_breaker::CircuitBreaker;
25use crate::loop_metrics::LoopMetrics;
26
27use super::prober::ReachabilityProber;
28
/// Tunables for the reachability driver.
#[derive(Clone, Debug)]
pub struct ReachabilityDriverConfig {
    /// How long the driver waits between sweeps over the Open-peer
    /// set. The `Default` impl uses 30 s; tests override this with
    /// millisecond-scale periods.
    pub interval: Duration,
}
36
37impl Default for ReachabilityDriverConfig {
38    fn default() -> Self {
39        Self {
40            interval: Duration::from_secs(30),
41        }
42    }
43}
44
45/// Drives periodic reachability probes against every Open-state peer.
46pub struct ReachabilityDriver {
47    breaker: Arc<CircuitBreaker>,
48    prober: Arc<dyn ReachabilityProber>,
49    cfg: ReachabilityDriverConfig,
50    loop_metrics: Arc<LoopMetrics>,
51}
52
53impl ReachabilityDriver {
54    pub fn new(
55        breaker: Arc<CircuitBreaker>,
56        prober: Arc<dyn ReachabilityProber>,
57        cfg: ReachabilityDriverConfig,
58    ) -> Self {
59        Self {
60            breaker,
61            prober,
62            cfg,
63            loop_metrics: LoopMetrics::new("reachability_loop"),
64        }
65    }
66
67    /// Shared handle to this loop's standardized metrics.
68    pub fn loop_metrics(&self) -> Arc<LoopMetrics> {
69        Arc::clone(&self.loop_metrics)
70    }
71
72    /// Run the driver until `shutdown` flips to `true`.
73    pub async fn run(self: Arc<Self>, mut shutdown: watch::Receiver<bool>) {
74        let mut tick = interval(self.cfg.interval);
75        // Skip the immediate first tick so the first probe fires one
76        // full interval after start. Otherwise every process restart
77        // would stampede every open breaker at once.
78        tick.set_missed_tick_behavior(MissedTickBehavior::Delay);
79        tick.tick().await;
80        self.loop_metrics.set_up(true);
81        loop {
82            tokio::select! {
83                biased;
84                changed = shutdown.changed() => {
85                    if changed.is_ok() && *shutdown.borrow() {
86                        break;
87                    }
88                }
89                _ = tick.tick() => {
90                    self.sweep_once().await;
91                }
92            }
93        }
94        self.loop_metrics.set_up(false);
95        debug!("reachability driver shutting down");
96    }
97
98    /// Single sweep — exposed for tests that drive the loop manually.
99    pub async fn sweep_once(&self) {
100        let started = Instant::now();
101        let open = self.breaker.open_peers();
102        if open.is_empty() {
103            self.loop_metrics.observe(started.elapsed());
104            return;
105        }
106        trace!(count = open.len(), "reachability sweep: probing open peers");
107        for peer in open {
108            let prober = Arc::clone(&self.prober);
109            let err_metrics = Arc::clone(&self.loop_metrics);
110            tokio::spawn(async move {
111                if let Err(e) = prober.probe(peer).await {
112                    tracing::debug!(peer, error = %e, "reachability probe failed");
113                    err_metrics.record_error("probe");
114                }
115            });
116        }
117        self.loop_metrics.observe(started.elapsed());
118    }
119}
120
#[cfg(test)]
mod tests {
    use super::*;
    use crate::circuit_breaker::CircuitBreakerConfig;
    use async_trait::async_trait;
    use std::sync::Mutex;

    /// Prober double that records every peer it is asked to probe and
    /// always reports success.
    struct RecordingProber {
        calls: Mutex<Vec<u64>>,
    }

    impl RecordingProber {
        fn new() -> Arc<Self> {
            Arc::new(Self {
                calls: Mutex::new(Vec::new()),
            })
        }

        /// Drain and return the peers recorded so far, in call order.
        fn take(&self) -> Vec<u64> {
            std::mem::take(&mut *self.calls.lock().unwrap())
        }
    }

    #[async_trait]
    impl ReachabilityProber for RecordingProber {
        async fn probe(&self, peer: u64) -> Result<(), crate::error::ClusterError> {
            self.calls.lock().unwrap().push(peer);
            Ok(())
        }
    }

    /// Breaker that opens a peer on its first recorded failure and
    /// keeps it open for the duration of any test (60 s cooldown).
    fn open_breaker() -> Arc<CircuitBreaker> {
        Arc::new(CircuitBreaker::new(CircuitBreakerConfig {
            failure_threshold: 1,
            cooldown: Duration::from_secs(60),
        }))
    }

    /// Yield to the scheduler `rounds` times so detached probe tasks
    /// spawned by a sweep get a chance to run.
    async fn let_spawned_tasks_run(rounds: usize) {
        for _ in 0..rounds {
            tokio::task::yield_now().await;
        }
    }

    #[tokio::test]
    async fn sweep_probes_every_open_peer() {
        let breaker = open_breaker();
        breaker.record_failure(1);
        breaker.record_failure(2);
        breaker.record_failure(3);

        let prober = RecordingProber::new();
        let driver = Arc::new(ReachabilityDriver::new(
            Arc::clone(&breaker),
            prober.clone() as Arc<dyn ReachabilityProber>,
            ReachabilityDriverConfig {
                interval: Duration::from_millis(50),
            },
        ));
        driver.sweep_once().await;
        let_spawned_tasks_run(8).await;

        // Probes run as independent tasks, so compare order-insensitively.
        let mut calls = prober.take();
        calls.sort_unstable();
        assert_eq!(calls, vec![1, 2, 3]);
    }

    #[tokio::test]
    async fn sweep_skips_closed_peers() {
        let breaker = open_breaker();
        breaker.record_success(1); // Registers 1 as Closed.
        breaker.record_failure(2); // Opens 2.
        let prober = RecordingProber::new();
        let driver = Arc::new(ReachabilityDriver::new(
            Arc::clone(&breaker),
            prober.clone() as Arc<dyn ReachabilityProber>,
            ReachabilityDriverConfig::default(),
        ));
        driver.sweep_once().await;
        let_spawned_tasks_run(8).await;
        assert_eq!(prober.take(), vec![2]);
    }

    #[tokio::test(start_paused = true)]
    async fn run_loop_fires_sweeps_on_interval_and_shuts_down() {
        let breaker = open_breaker();
        breaker.record_failure(7);
        let prober = RecordingProber::new();
        let driver = Arc::new(ReachabilityDriver::new(
            Arc::clone(&breaker),
            prober.clone() as Arc<dyn ReachabilityProber>,
            ReachabilityDriverConfig {
                interval: Duration::from_millis(100),
            },
        ));
        let (tx, rx) = watch::channel(false);
        let handle = tokio::spawn({
            let d = Arc::clone(&driver);
            async move { d.run(rx).await }
        });

        // First tick is skipped, second delivers a sweep.
        tokio::time::advance(Duration::from_millis(120)).await;
        tokio::task::yield_now().await;
        tokio::time::advance(Duration::from_millis(120)).await;
        tokio::task::yield_now().await;
        let_spawned_tasks_run(16).await;

        assert!(
            !prober.take().is_empty(),
            "driver never probed in run-loop mode"
        );

        // Verify the shutdown half of the test name: the task must
        // finish, and finish cleanly, once the flag flips.
        tx.send(true)
            .expect("driver dropped its shutdown receiver before shutdown");
        tokio::time::timeout(Duration::from_millis(500), handle)
            .await
            .expect("driver did not stop after shutdown was signalled")
            .expect("driver task panicked");
    }
}