// moonpool_transport/rpc/failure_monitor.rs
use std::cell::RefCell;
16use std::collections::BTreeMap;
17use std::future::Future;
18use std::rc::Rc;
19use std::task::{Poll, Waker};
20
21use crate::Endpoint;
22
/// Upper bound on the number of entries kept in the permanently-failed
/// endpoint map. When `FailureMonitor::endpoint_not_found` finds the map at
/// (or above) this size it clears the whole map before inserting.
const MAX_FAILED_ENDPOINTS: usize = 100_000;
27
/// Status reported by the failure monitor for an address or an endpoint.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FailureStatus {
    /// The address has been recorded as available.
    Available,
    /// The address is unknown or recorded as failed, or the endpoint has been
    /// marked permanently failed.
    Failed,
}
39
/// Reason an individual endpoint was recorded as permanently failed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FailedReason {
    /// The endpoint was reported as not found.
    NotFound,
}
49
50pub struct FailureMonitor {
59 inner: RefCell<FailureMonitorInner>,
60}
61
62struct FailureMonitorInner {
63 address_status: BTreeMap<String, FailureStatus>,
65 failed_endpoints: BTreeMap<Endpoint, FailedReason>,
67 endpoint_watchers: BTreeMap<String, Vec<Waker>>,
70 disconnect_watchers: BTreeMap<String, Vec<Waker>>,
73}
74
75impl FailureMonitor {
76 pub fn new() -> Self {
81 Self {
82 inner: RefCell::new(FailureMonitorInner {
83 address_status: BTreeMap::new(),
84 failed_endpoints: BTreeMap::new(),
85 endpoint_watchers: BTreeMap::new(),
86 disconnect_watchers: BTreeMap::new(),
87 }),
88 }
89 }
90
91 pub fn set_status(&self, address: &str, status: FailureStatus) {
105 let mut inner = self.inner.borrow_mut();
106
107 let changed = match status {
108 FailureStatus::Available => {
109 let prev = inner.address_status.insert(address.to_string(), status);
110 prev != Some(FailureStatus::Available)
111 }
112 FailureStatus::Failed => {
113 inner.address_status.remove(address).is_some()
115 }
116 };
117
118 if changed {
119 wake_all(&mut inner.endpoint_watchers, address);
120 }
121 }
122
123 pub fn notify_disconnect(&self, address: &str) {
131 let mut inner = self.inner.borrow_mut();
132 wake_all(&mut inner.endpoint_watchers, address);
133 wake_all(&mut inner.disconnect_watchers, address);
134 }
135
136 pub fn endpoint_not_found(&self, endpoint: &Endpoint) {
146 if endpoint.token.is_well_known() {
148 return;
149 }
150
151 let mut inner = self.inner.borrow_mut();
152
153 if inner.failed_endpoints.len() >= MAX_FAILED_ENDPOINTS {
155 tracing::warn!(
156 "FailureMonitor: clearing {} permanently failed endpoints (cap reached)",
157 inner.failed_endpoints.len()
158 );
159 inner.failed_endpoints.clear();
160 }
161
162 inner
163 .failed_endpoints
164 .insert(endpoint.clone(), FailedReason::NotFound);
165
166 let address = endpoint.address.to_string();
167 wake_all(&mut inner.endpoint_watchers, &address);
168 }
169
170 pub fn state(&self, endpoint: &Endpoint) -> FailureStatus {
183 let inner = self.inner.borrow();
184
185 if inner.failed_endpoints.contains_key(endpoint) {
186 return FailureStatus::Failed;
187 }
188
189 let address = endpoint.address.to_string();
190 inner
191 .address_status
192 .get(&address)
193 .copied()
194 .unwrap_or(FailureStatus::Failed) }
196
197 pub fn address_state(&self, address: &str) -> FailureStatus {
204 self.inner
205 .borrow()
206 .address_status
207 .get(address)
208 .copied()
209 .unwrap_or(FailureStatus::Failed)
210 }
211
212 pub fn permanently_failed(&self, endpoint: &Endpoint) -> bool {
217 self.inner.borrow().failed_endpoints.contains_key(endpoint)
218 }
219
220 pub fn on_disconnect_or_failure(
230 self: &Rc<Self>,
231 endpoint: &Endpoint,
232 ) -> impl Future<Output = ()> {
233 let fm = Rc::clone(self);
234 let address = endpoint.address.to_string();
235 let endpoint = endpoint.clone();
236
237 std::future::poll_fn(move |cx| {
238 let inner = fm.inner.borrow();
239
240 if inner.failed_endpoints.contains_key(&endpoint) {
242 return Poll::Ready(());
243 }
244 if !inner
245 .address_status
246 .get(&address)
247 .is_some_and(|s| *s == FailureStatus::Available)
248 {
249 return Poll::Ready(());
251 }
252
253 drop(inner);
255 let mut inner = fm.inner.borrow_mut();
256 inner
257 .endpoint_watchers
258 .entry(address.clone())
259 .or_default()
260 .push(cx.waker().clone());
261 Poll::Pending
262 })
263 }
264
265 pub fn on_state_changed(self: &Rc<Self>, endpoint: &Endpoint) -> impl Future<Output = ()> {
272 let fm = Rc::clone(self);
273 let address = endpoint.address.to_string();
274 let endpoint = endpoint.clone();
275 let registered = Rc::new(std::cell::Cell::new(false));
276
277 std::future::poll_fn(move |cx| {
278 let inner = fm.inner.borrow();
279
280 if inner.failed_endpoints.contains_key(&endpoint) {
282 return Poll::Pending;
283 }
284
285 if registered.get() {
287 return Poll::Ready(());
288 }
289
290 registered.set(true);
292 drop(inner);
293 let mut inner = fm.inner.borrow_mut();
294 inner
295 .endpoint_watchers
296 .entry(address.clone())
297 .or_default()
298 .push(cx.waker().clone());
299 Poll::Pending
300 })
301 }
302
303 pub fn on_disconnect(self: &Rc<Self>, address: &str) -> impl Future<Output = ()> {
310 let fm = Rc::clone(self);
311 let address = address.to_string();
312 let registered = Rc::new(std::cell::Cell::new(false));
313
314 std::future::poll_fn(move |cx| {
315 if registered.get() {
317 return Poll::Ready(());
318 }
319
320 registered.set(true);
322 let mut inner = fm.inner.borrow_mut();
323 inner
324 .disconnect_watchers
325 .entry(address.clone())
326 .or_default()
327 .push(cx.waker().clone());
328 Poll::Pending
329 })
330 }
331}
332
333impl Default for FailureMonitor {
334 fn default() -> Self {
335 Self::new()
336 }
337}
338
339impl std::fmt::Debug for FailureMonitor {
340 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
341 let inner = self.inner.borrow();
342 f.debug_struct("FailureMonitor")
343 .field("addresses_available", &inner.address_status.len())
344 .field("endpoints_failed", &inner.failed_endpoints.len())
345 .finish()
346 }
347}
348
/// Removes every waker registered under `address` from `watchers` and wakes
/// each of them. Does nothing when no entry exists for `address`.
fn wake_all(watchers: &mut BTreeMap<String, Vec<Waker>>, address: &str) {
    watchers
        .remove(address)
        .into_iter()
        .flatten()
        .for_each(Waker::wake);
}
357
#[cfg(test)]
mod tests {
    use std::net::{IpAddr, Ipv4Addr};

    use super::*;
    use crate::{NetworkAddress, UID};

    /// String form of [`test_addr`], as used for address keys in these tests.
    const ADDR: &str = "10.0.1.1:4500";

    fn test_addr() -> NetworkAddress {
        NetworkAddress::new(IpAddr::V4(Ipv4Addr::new(10, 0, 1, 1)), 4500)
    }

    fn test_endpoint() -> Endpoint {
        Endpoint::new(test_addr(), UID::new(42, 1))
    }

    /// Builds the current-thread local runtime used by the tests that need
    /// `spawn_local`.
    fn local_runtime() -> tokio::runtime::LocalRuntime {
        tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build_local(tokio::runtime::LocalOptions::default())
            .expect("build runtime")
    }

    #[test]
    fn test_unknown_address_is_failed() {
        let fm = FailureMonitor::new();
        let ep = test_endpoint();
        assert_eq!(fm.state(&ep), FailureStatus::Failed);
        assert_eq!(fm.address_state(ADDR), FailureStatus::Failed);
    }

    #[test]
    fn test_set_status_available() {
        let fm = FailureMonitor::new();
        let ep = test_endpoint();
        fm.set_status(ADDR, FailureStatus::Available);
        assert_eq!(fm.state(&ep), FailureStatus::Available);
        assert_eq!(fm.address_state(ADDR), FailureStatus::Available);
    }

    #[test]
    fn test_set_status_failed_removes_entry() {
        let fm = FailureMonitor::new();
        fm.set_status(ADDR, FailureStatus::Available);
        fm.set_status(ADDR, FailureStatus::Failed);
        assert_eq!(fm.address_state(ADDR), FailureStatus::Failed);
    }

    #[test]
    fn test_endpoint_not_found_marks_permanent() {
        let fm = FailureMonitor::new();
        let ep = test_endpoint();
        fm.set_status(ADDR, FailureStatus::Available);
        fm.endpoint_not_found(&ep);
        assert!(fm.permanently_failed(&ep));
        assert_eq!(fm.state(&ep), FailureStatus::Failed);
    }

    #[test]
    fn test_endpoint_not_found_skips_well_known() {
        let fm = FailureMonitor::new();
        let ep = Endpoint::well_known(test_addr(), crate::WellKnownToken::Ping);
        fm.endpoint_not_found(&ep);
        assert!(!fm.permanently_failed(&ep));
    }

    #[tokio::test]
    async fn test_on_disconnect_or_failure_fast_path_unknown() {
        let fm = Rc::new(FailureMonitor::new());
        let ep = test_endpoint();
        // Unknown address counts as failed: completing at all is the assertion.
        fm.on_disconnect_or_failure(&ep).await;
    }

    #[tokio::test]
    async fn test_on_disconnect_or_failure_fast_path_permanent() {
        let fm = Rc::new(FailureMonitor::new());
        let ep = test_endpoint();
        fm.set_status(ADDR, FailureStatus::Available);
        fm.endpoint_not_found(&ep);
        // Permanently failed endpoint: completing at all is the assertion.
        fm.on_disconnect_or_failure(&ep).await;
    }

    #[test]
    fn test_on_disconnect_or_failure_wakes_on_status_change() {
        local_runtime().block_on(async {
            let fm = Rc::new(FailureMonitor::new());
            let ep = test_endpoint();
            fm.set_status(ADDR, FailureStatus::Available);

            let fm2 = Rc::clone(&fm);
            let handle = tokio::task::spawn_local(async move {
                fm2.on_disconnect_or_failure(&ep).await;
            });

            // Let the spawned task poll once and register its waker.
            tokio::task::yield_now().await;

            fm.set_status(ADDR, FailureStatus::Failed);

            handle.await.expect("task should complete");
        });
    }

    #[test]
    fn test_on_state_changed_wakes_on_status_change() {
        local_runtime().block_on(async {
            let fm = Rc::new(FailureMonitor::new());
            let ep = test_endpoint();
            fm.set_status(ADDR, FailureStatus::Available);

            let fm2 = Rc::clone(&fm);
            let handle = tokio::task::spawn_local(async move {
                fm2.on_state_changed(&ep).await;
            });

            // Let the spawned task poll once and register its waker.
            tokio::task::yield_now().await;

            fm.set_status(ADDR, FailureStatus::Failed);

            handle.await.expect("task should complete");
        });
    }

    #[test]
    fn test_on_disconnect_wakes_on_notify() {
        local_runtime().block_on(async {
            let fm = Rc::new(FailureMonitor::new());
            fm.set_status(ADDR, FailureStatus::Available);

            let fm2 = Rc::clone(&fm);
            let handle = tokio::task::spawn_local(async move {
                fm2.on_disconnect(ADDR).await;
            });

            // Let the spawned task poll once and register its waker.
            tokio::task::yield_now().await;

            fm.notify_disconnect(ADDR);

            handle.await.expect("task should complete");
        });
    }

    #[test]
    fn test_debug_impl() {
        let fm = FailureMonitor::new();
        fm.set_status(ADDR, FailureStatus::Available);
        let debug = format!("{:?}", fm);
        assert!(debug.contains("FailureMonitor"));
        assert!(debug.contains("addresses_available: 1"));
    }

    #[test]
    fn test_default_impl() {
        let fm = FailureMonitor::default();
        assert_eq!(fm.address_state("any"), FailureStatus::Failed);
    }
}