nodedb_cluster/reachability/
driver.rs1use std::sync::Arc;
18use std::time::{Duration, Instant};
19
20use tokio::sync::watch;
21use tokio::time::{MissedTickBehavior, interval};
22use tracing::{debug, trace};
23
24use crate::circuit_breaker::CircuitBreaker;
25use crate::loop_metrics::LoopMetrics;
26
27use super::prober::ReachabilityProber;
28
/// Tuning knobs for [`ReachabilityDriver`].
#[derive(Clone, Debug)]
pub struct ReachabilityDriverConfig {
    /// Delay between two consecutive reachability sweeps.
    pub interval: Duration,
}
36
37impl Default for ReachabilityDriverConfig {
38 fn default() -> Self {
39 Self {
40 interval: Duration::from_secs(30),
41 }
42 }
43}
44
45pub struct ReachabilityDriver {
47 breaker: Arc<CircuitBreaker>,
48 prober: Arc<dyn ReachabilityProber>,
49 cfg: ReachabilityDriverConfig,
50 loop_metrics: Arc<LoopMetrics>,
51}
52
53impl ReachabilityDriver {
54 pub fn new(
55 breaker: Arc<CircuitBreaker>,
56 prober: Arc<dyn ReachabilityProber>,
57 cfg: ReachabilityDriverConfig,
58 ) -> Self {
59 Self {
60 breaker,
61 prober,
62 cfg,
63 loop_metrics: LoopMetrics::new("reachability_loop"),
64 }
65 }
66
67 pub fn loop_metrics(&self) -> Arc<LoopMetrics> {
69 Arc::clone(&self.loop_metrics)
70 }
71
72 pub async fn run(self: Arc<Self>, mut shutdown: watch::Receiver<bool>) {
74 let mut tick = interval(self.cfg.interval);
75 tick.set_missed_tick_behavior(MissedTickBehavior::Delay);
79 tick.tick().await;
80 self.loop_metrics.set_up(true);
81 loop {
82 tokio::select! {
83 biased;
84 changed = shutdown.changed() => {
85 if changed.is_ok() && *shutdown.borrow() {
86 break;
87 }
88 }
89 _ = tick.tick() => {
90 self.sweep_once().await;
91 }
92 }
93 }
94 self.loop_metrics.set_up(false);
95 debug!("reachability driver shutting down");
96 }
97
98 pub async fn sweep_once(&self) {
100 let started = Instant::now();
101 let open = self.breaker.open_peers();
102 if open.is_empty() {
103 self.loop_metrics.observe(started.elapsed());
104 return;
105 }
106 trace!(count = open.len(), "reachability sweep: probing open peers");
107 for peer in open {
108 let prober = Arc::clone(&self.prober);
109 let err_metrics = Arc::clone(&self.loop_metrics);
110 tokio::spawn(async move {
111 if let Err(e) = prober.probe(peer).await {
112 tracing::debug!(peer, error = %e, "reachability probe failed");
113 err_metrics.record_error("probe");
114 }
115 });
116 }
117 self.loop_metrics.observe(started.elapsed());
118 }
119}
120
#[cfg(test)]
mod tests {
    use super::*;
    use crate::circuit_breaker::CircuitBreakerConfig;
    use async_trait::async_trait;
    use std::sync::Mutex;

    /// Prober that records every peer it is asked to probe and always succeeds.
    struct RecordingProber {
        calls: Mutex<Vec<u64>>,
    }

    impl RecordingProber {
        fn new() -> Arc<Self> {
            Arc::new(Self {
                calls: Mutex::new(Vec::new()),
            })
        }

        /// Drains and returns the peers probed so far.
        fn take(&self) -> Vec<u64> {
            std::mem::take(&mut *self.calls.lock().unwrap())
        }
    }

    #[async_trait]
    impl ReachabilityProber for RecordingProber {
        async fn probe(&self, peer: u64) -> Result<(), crate::error::ClusterError> {
            self.calls.lock().unwrap().push(peer);
            Ok(())
        }
    }

    /// Breaker that opens a peer's circuit after a single failure, with a
    /// 60s cooldown so nothing transitions mid-test.
    fn open_breaker() -> Arc<CircuitBreaker> {
        Arc::new(CircuitBreaker::new(CircuitBreakerConfig {
            failure_threshold: 1,
            cooldown: Duration::from_secs(60),
        }))
    }

    /// Gives the detached probe tasks spawned by `sweep_once` a chance to run.
    async fn let_spawned_probes_run() {
        for _ in 0..8 {
            tokio::task::yield_now().await;
        }
    }

    #[tokio::test]
    async fn sweep_probes_every_open_peer() {
        let breaker = open_breaker();
        breaker.record_failure(1);
        breaker.record_failure(2);
        breaker.record_failure(3);

        let prober = RecordingProber::new();
        let driver = Arc::new(ReachabilityDriver::new(
            Arc::clone(&breaker),
            prober.clone() as Arc<dyn ReachabilityProber>,
            ReachabilityDriverConfig {
                interval: Duration::from_millis(50),
            },
        ));
        driver.sweep_once().await;
        let_spawned_probes_run().await;

        let mut calls = prober.take();
        calls.sort_unstable();
        assert_eq!(calls, vec![1, 2, 3]);
    }

    #[tokio::test]
    async fn sweep_skips_closed_peers() {
        let breaker = open_breaker();
        // Peer 1 only ever succeeds, so its circuit stays closed.
        breaker.record_success(1);
        breaker.record_failure(2);
        let prober = RecordingProber::new();
        let driver = Arc::new(ReachabilityDriver::new(
            Arc::clone(&breaker),
            prober.clone() as Arc<dyn ReachabilityProber>,
            ReachabilityDriverConfig::default(),
        ));
        driver.sweep_once().await;
        let_spawned_probes_run().await;

        assert_eq!(prober.take(), vec![2]);
    }

    #[tokio::test(start_paused = true)]
    async fn run_loop_fires_sweeps_on_interval_and_shuts_down() {
        let breaker = open_breaker();
        breaker.record_failure(7);
        let prober = RecordingProber::new();
        let driver = Arc::new(ReachabilityDriver::new(
            Arc::clone(&breaker),
            prober.clone() as Arc<dyn ReachabilityProber>,
            ReachabilityDriverConfig {
                interval: Duration::from_millis(100),
            },
        ));
        let (tx, rx) = watch::channel(false);
        let handle = tokio::spawn({
            let d = Arc::clone(&driver);
            async move { d.run(rx).await }
        });

        // Advance past two intervals, yielding so the loop and its spawned
        // probes get scheduled.
        tokio::time::advance(Duration::from_millis(120)).await;
        tokio::task::yield_now().await;
        tokio::time::advance(Duration::from_millis(120)).await;
        tokio::task::yield_now().await;
        for _ in 0..16 {
            tokio::task::yield_now().await;
        }

        assert!(
            !prober.take().is_empty(),
            "driver never probed in run-loop mode"
        );

        tx.send(true)
            .expect("driver dropped its shutdown receiver before shutdown");
        // The test name promises shutdown, so actually verify the task exits
        // (and did not panic) instead of discarding the timeout result.
        tokio::time::timeout(Duration::from_millis(500), handle)
            .await
            .expect("driver did not shut down within 500ms of the signal")
            .expect("driver task panicked");
    }
}