nodedb_cluster/reachability/
driver.rs1use std::sync::Arc;
16use std::time::{Duration, Instant};
17
18use tokio::sync::watch;
19use tokio::time::{MissedTickBehavior, interval};
20use tracing::{debug, trace};
21
22use crate::circuit_breaker::CircuitBreaker;
23use crate::loop_metrics::LoopMetrics;
24
25use super::prober::ReachabilityProber;
26
/// Tuning knobs for [`ReachabilityDriver`].
#[derive(Clone, Debug)]
pub struct ReachabilityDriverConfig {
    /// Period between successive sweeps over the peers whose circuit is open.
    pub interval: Duration,
}

impl Default for ReachabilityDriverConfig {
    /// One sweep every 30 seconds.
    fn default() -> Self {
        let interval = Duration::from_millis(30_000);
        Self { interval }
    }
}
42
43pub struct ReachabilityDriver {
45 breaker: Arc<CircuitBreaker>,
46 prober: Arc<dyn ReachabilityProber>,
47 cfg: ReachabilityDriverConfig,
48 loop_metrics: Arc<LoopMetrics>,
49}
50
51impl ReachabilityDriver {
52 pub fn new(
53 breaker: Arc<CircuitBreaker>,
54 prober: Arc<dyn ReachabilityProber>,
55 cfg: ReachabilityDriverConfig,
56 ) -> Self {
57 Self {
58 breaker,
59 prober,
60 cfg,
61 loop_metrics: LoopMetrics::new("reachability_loop"),
62 }
63 }
64
65 pub fn loop_metrics(&self) -> Arc<LoopMetrics> {
67 Arc::clone(&self.loop_metrics)
68 }
69
70 pub async fn run(self: Arc<Self>, mut shutdown: watch::Receiver<bool>) {
72 let mut tick = interval(self.cfg.interval);
73 tick.set_missed_tick_behavior(MissedTickBehavior::Delay);
77 tick.tick().await;
78 self.loop_metrics.set_up(true);
79 loop {
80 tokio::select! {
81 biased;
82 changed = shutdown.changed() => {
83 if changed.is_ok() && *shutdown.borrow() {
84 break;
85 }
86 }
87 _ = tick.tick() => {
88 self.sweep_once().await;
89 }
90 }
91 }
92 self.loop_metrics.set_up(false);
93 debug!("reachability driver shutting down");
94 }
95
96 pub async fn sweep_once(&self) {
98 let started = Instant::now();
99 let open = self.breaker.open_peers();
100 if open.is_empty() {
101 self.loop_metrics.observe(started.elapsed());
102 return;
103 }
104 trace!(count = open.len(), "reachability sweep: probing open peers");
105 for peer in open {
106 let prober = Arc::clone(&self.prober);
107 let err_metrics = Arc::clone(&self.loop_metrics);
108 tokio::spawn(async move {
109 if let Err(e) = prober.probe(peer).await {
110 tracing::debug!(peer, error = %e, "reachability probe failed");
111 err_metrics.record_error("probe");
112 }
113 });
114 }
115 self.loop_metrics.observe(started.elapsed());
116 }
117}
118
#[cfg(test)]
mod tests {
    use super::*;
    use crate::circuit_breaker::CircuitBreakerConfig;
    use async_trait::async_trait;
    use std::sync::Mutex;

    /// Prober that records every peer it is asked to probe and always succeeds.
    struct RecordingProber {
        calls: Mutex<Vec<u64>>,
    }

    impl RecordingProber {
        fn new() -> Arc<Self> {
            Arc::new(Self {
                calls: Mutex::new(Vec::new()),
            })
        }

        /// Drains and returns the peers probed so far, in call order.
        fn take(&self) -> Vec<u64> {
            std::mem::take(&mut *self.calls.lock().unwrap())
        }
    }

    #[async_trait]
    impl ReachabilityProber for RecordingProber {
        async fn probe(&self, peer: u64) -> Result<(), crate::error::ClusterError> {
            self.calls.lock().unwrap().push(peer);
            Ok(())
        }
    }

    /// Breaker whose circuit for a peer opens after a single recorded failure,
    /// with a cooldown far longer than any test runs.
    fn open_breaker() -> Arc<CircuitBreaker> {
        Arc::new(CircuitBreaker::new(CircuitBreakerConfig {
            failure_threshold: 1,
            cooldown: Duration::from_secs(60),
        }))
    }

    /// Yields a few times so the detached probe tasks spawned by
    /// `sweep_once` get a chance to run on the test runtime.
    async fn let_spawned_probes_run() {
        for _ in 0..8 {
            tokio::task::yield_now().await;
        }
    }

    #[tokio::test]
    async fn sweep_probes_every_open_peer() {
        let breaker = open_breaker();
        breaker.record_failure(1);
        breaker.record_failure(2);
        breaker.record_failure(3);

        let prober = RecordingProber::new();
        let driver = Arc::new(ReachabilityDriver::new(
            Arc::clone(&breaker),
            prober.clone() as Arc<dyn ReachabilityProber>,
            ReachabilityDriverConfig {
                interval: Duration::from_millis(50),
            },
        ));
        driver.sweep_once().await;
        let_spawned_probes_run().await;

        let mut calls = prober.take();
        calls.sort_unstable();
        assert_eq!(calls, vec![1, 2, 3]);
    }

    #[tokio::test]
    async fn sweep_skips_closed_peers() {
        let breaker = open_breaker();
        breaker.record_success(1);
        breaker.record_failure(2);

        let prober = RecordingProber::new();
        let driver = Arc::new(ReachabilityDriver::new(
            Arc::clone(&breaker),
            prober.clone() as Arc<dyn ReachabilityProber>,
            ReachabilityDriverConfig::default(),
        ));
        driver.sweep_once().await;
        let_spawned_probes_run().await;

        assert_eq!(prober.take(), vec![2]);
    }

    #[tokio::test(start_paused = true)]
    async fn run_loop_fires_sweeps_on_interval_and_shuts_down() {
        let breaker = open_breaker();
        breaker.record_failure(7);
        let prober = RecordingProber::new();
        let driver = Arc::new(ReachabilityDriver::new(
            Arc::clone(&breaker),
            prober.clone() as Arc<dyn ReachabilityProber>,
            ReachabilityDriverConfig {
                interval: Duration::from_millis(100),
            },
        ));
        let (tx, rx) = watch::channel(false);
        let handle = tokio::spawn({
            let d = Arc::clone(&driver);
            async move { d.run(rx).await }
        });

        // Step the paused clock past two intervals, yielding in between so the
        // driver task and its spawned probes get scheduled.
        tokio::time::advance(Duration::from_millis(120)).await;
        tokio::task::yield_now().await;
        tokio::time::advance(Duration::from_millis(120)).await;
        tokio::task::yield_now().await;
        for _ in 0..16 {
            tokio::task::yield_now().await;
        }

        assert!(
            !prober.take().is_empty(),
            "driver never probed in run-loop mode"
        );

        // The shutdown half of this test must actually be checked: the task
        // has to finish (not time out) and must not have panicked.
        let _ = tx.send(true);
        tokio::time::timeout(Duration::from_millis(500), handle)
            .await
            .expect("driver did not shut down after the shutdown signal")
            .expect("driver task panicked");
    }
}