Skip to main content

nodedb_cluster/reachability/
driver.rs

1//! [`ReachabilityDriver`] — periodic open-breaker probe loop.
2//!
3//! Every `interval`, the driver asks the shared [`CircuitBreaker`]
4//! for its currently-Open peer set and fires a probe at each via the
5//! injected [`ReachabilityProber`]. Probes run in parallel via
6//! `tokio::spawn` so a slow peer never blocks the next one. Probe
7//! results are intentionally ignored: the production `TransportProber`
8//! routes through `NexarTransport::send_rpc`, which already walks the
9//! circuit breaker's `check → record_success|record_failure` path, so
10//! the driver does not need to bookkeep anything itself.
11//!
12//! Shutdown is cooperative via `tokio::sync::watch`. On `true` the
13//! run loop breaks at the next tick or immediately if it is waiting.
14
15use std::sync::Arc;
16use std::time::{Duration, Instant};
17
18use tokio::sync::watch;
19use tokio::time::{MissedTickBehavior, interval};
20use tracing::{debug, trace};
21
22use crate::circuit_breaker::CircuitBreaker;
23use crate::loop_metrics::LoopMetrics;
24
25use super::prober::ReachabilityProber;
26
/// Tunables for the reachability driver.
#[derive(Clone, Debug)]
pub struct ReachabilityDriverConfig {
    /// How long to wait between sweeps over the open-peer set.
    /// The default is 30 s; tests shrink it to milliseconds.
    pub interval: Duration,
}

impl Default for ReachabilityDriverConfig {
    /// Production default: one sweep every 30 seconds.
    fn default() -> Self {
        let interval = Duration::from_secs(30);
        Self { interval }
    }
}
42
43/// Drives periodic reachability probes against every Open-state peer.
44pub struct ReachabilityDriver {
45    breaker: Arc<CircuitBreaker>,
46    prober: Arc<dyn ReachabilityProber>,
47    cfg: ReachabilityDriverConfig,
48    loop_metrics: Arc<LoopMetrics>,
49}
50
51impl ReachabilityDriver {
52    pub fn new(
53        breaker: Arc<CircuitBreaker>,
54        prober: Arc<dyn ReachabilityProber>,
55        cfg: ReachabilityDriverConfig,
56    ) -> Self {
57        Self {
58            breaker,
59            prober,
60            cfg,
61            loop_metrics: LoopMetrics::new("reachability_loop"),
62        }
63    }
64
65    /// Shared handle to this loop's standardized metrics.
66    pub fn loop_metrics(&self) -> Arc<LoopMetrics> {
67        Arc::clone(&self.loop_metrics)
68    }
69
70    /// Run the driver until `shutdown` flips to `true`.
71    pub async fn run(self: Arc<Self>, mut shutdown: watch::Receiver<bool>) {
72        let mut tick = interval(self.cfg.interval);
73        // Skip the immediate first tick so the first probe fires one
74        // full interval after start. Otherwise every process restart
75        // would stampede every open breaker at once.
76        tick.set_missed_tick_behavior(MissedTickBehavior::Delay);
77        tick.tick().await;
78        self.loop_metrics.set_up(true);
79        loop {
80            tokio::select! {
81                biased;
82                changed = shutdown.changed() => {
83                    if changed.is_ok() && *shutdown.borrow() {
84                        break;
85                    }
86                }
87                _ = tick.tick() => {
88                    self.sweep_once().await;
89                }
90            }
91        }
92        self.loop_metrics.set_up(false);
93        debug!("reachability driver shutting down");
94    }
95
96    /// Single sweep — exposed for tests that drive the loop manually.
97    pub async fn sweep_once(&self) {
98        let started = Instant::now();
99        let open = self.breaker.open_peers();
100        if open.is_empty() {
101            self.loop_metrics.observe(started.elapsed());
102            return;
103        }
104        trace!(count = open.len(), "reachability sweep: probing open peers");
105        for peer in open {
106            let prober = Arc::clone(&self.prober);
107            let err_metrics = Arc::clone(&self.loop_metrics);
108            tokio::spawn(async move {
109                if let Err(e) = prober.probe(peer).await {
110                    tracing::debug!(peer, error = %e, "reachability probe failed");
111                    err_metrics.record_error("probe");
112                }
113            });
114        }
115        self.loop_metrics.observe(started.elapsed());
116    }
117}
118
#[cfg(test)]
mod tests {
    use super::*;
    use crate::circuit_breaker::CircuitBreakerConfig;
    use async_trait::async_trait;
    use std::sync::Mutex;

    /// Prober double that records every probed peer id and always succeeds.
    struct RecordingProber {
        calls: Mutex<Vec<u64>>,
    }

    impl RecordingProber {
        fn new() -> Arc<Self> {
            Arc::new(Self {
                calls: Mutex::new(Vec::new()),
            })
        }

        /// Drain and return the peers probed since the previous call.
        fn take(&self) -> Vec<u64> {
            std::mem::take(&mut *self.calls.lock().unwrap())
        }
    }

    #[async_trait]
    impl ReachabilityProber for RecordingProber {
        async fn probe(&self, peer: u64) -> Result<(), crate::error::ClusterError> {
            self.calls.lock().unwrap().push(peer);
            Ok(())
        }
    }

    /// Breaker that opens a peer on its first recorded failure and keeps
    /// it open for the whole test (60 s cooldown).
    fn open_breaker() -> Arc<CircuitBreaker> {
        Arc::new(CircuitBreaker::new(CircuitBreakerConfig {
            failure_threshold: 1,
            cooldown: Duration::from_secs(60),
        }))
    }

    /// Wire a driver to the given breaker and recording prober.
    fn driver_for(
        breaker: &Arc<CircuitBreaker>,
        prober: &Arc<RecordingProber>,
        cfg: ReachabilityDriverConfig,
    ) -> Arc<ReachabilityDriver> {
        Arc::new(ReachabilityDriver::new(
            Arc::clone(breaker),
            Arc::clone(prober) as Arc<dyn ReachabilityProber>,
            cfg,
        ))
    }

    /// Yield to the scheduler `rounds` times so spawned probe tasks run.
    async fn let_spawned_tasks_run(rounds: usize) {
        for _ in 0..rounds {
            tokio::task::yield_now().await;
        }
    }

    #[tokio::test]
    async fn sweep_probes_every_open_peer() {
        let breaker = open_breaker();
        for peer in [1, 2, 3] {
            breaker.record_failure(peer);
        }

        let prober = RecordingProber::new();
        let driver = driver_for(
            &breaker,
            &prober,
            ReachabilityDriverConfig {
                interval: Duration::from_millis(50),
            },
        );
        driver.sweep_once().await;
        let_spawned_tasks_run(8).await;

        // Probes run as independent tasks, so completion order is not fixed.
        let mut calls = prober.take();
        calls.sort_unstable();
        assert_eq!(calls, vec![1, 2, 3]);
    }

    #[tokio::test]
    async fn sweep_skips_closed_peers() {
        let breaker = open_breaker();
        breaker.record_success(1); // Registers 1 as Closed.
        breaker.record_failure(2); // Opens 2.

        let prober = RecordingProber::new();
        let driver = driver_for(&breaker, &prober, ReachabilityDriverConfig::default());
        driver.sweep_once().await;
        let_spawned_tasks_run(8).await;

        assert_eq!(prober.take(), vec![2]);
    }

    #[tokio::test(start_paused = true)]
    async fn run_loop_fires_sweeps_on_interval_and_shuts_down() {
        let breaker = open_breaker();
        breaker.record_failure(7);

        let prober = RecordingProber::new();
        let driver = driver_for(
            &breaker,
            &prober,
            ReachabilityDriverConfig {
                interval: Duration::from_millis(100),
            },
        );
        let (tx, rx) = watch::channel(false);
        let handle = tokio::spawn({
            let d = Arc::clone(&driver);
            async move { d.run(rx).await }
        });

        // First tick is skipped, second delivers a sweep.
        tokio::time::advance(Duration::from_millis(120)).await;
        tokio::task::yield_now().await;
        tokio::time::advance(Duration::from_millis(120)).await;
        tokio::task::yield_now().await;
        let_spawned_tasks_run(16).await;

        assert!(
            !prober.take().is_empty(),
            "driver never probed in run-loop mode"
        );

        // Verify the shutdown half of the contract instead of discarding
        // the join result: the task must finish promptly and cleanly.
        tx.send(true)
            .expect("driver dropped its shutdown receiver early");
        tokio::time::timeout(Duration::from_millis(500), handle)
            .await
            .expect("driver did not stop after shutdown was signalled")
            .expect("driver task panicked");
    }
}