Skip to main content

moonpool_transport/rpc/
failure_monitor.rs

//! FailureMonitor: Reactive failure tracking for addresses and endpoints.
//!
//! Tracks two levels of failure:
//! - **Address-level**: Is a remote machine reachable? (Missing = Failed)
//! - **Endpoint-level**: Is a specific endpoint permanently dead?
//!
//! Producers (connection_task) call [`FailureMonitor::set_status`] and
//! [`FailureMonitor::notify_disconnect`]. Consumers (delivery mode functions)
//! poll [`FailureMonitor::on_disconnect_or_failure`] to race replies against
//! disconnect signals.
//!
//! # FDB Reference
//! `SimpleFailureMonitor` from `FailureMonitor.h:146`, `FailureMonitor.actor.cpp`
14
15use std::cell::RefCell;
16use std::collections::BTreeMap;
17use std::future::Future;
18use std::rc::Rc;
19use std::task::{Poll, Waker};
20
21use crate::Endpoint;
22
/// Upper bound on how many permanently failed endpoints are retained.
///
/// When the map already holds this many entries (`len() >= MAX_FAILED_ENDPOINTS`),
/// `FailureMonitor::endpoint_not_found` clears it before recording the next
/// endpoint, so the map cannot grow without limit.
///
/// # FDB Reference
/// Mirrors the `failedEndpoints.size() > 100000` clear in `SimpleFailureMonitor`.
const MAX_FAILED_ENDPOINTS: usize = 100_000;
27
/// Reachability verdict for a network address or an endpoint.
///
/// # FDB Reference
/// `FailureStatus` from `FailureMonitor.h:34-60`
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FailureStatus {
    /// The address is currently considered reachable.
    Available,
    /// The address is considered unreachable. Lookups for addresses the
    /// monitor has never seen also report this value.
    Failed,
}
39
/// Why an endpoint was recorded as permanently failed.
///
/// # FDB Reference
/// `FailedReason` from `FailureMonitor.h:65-68`
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FailedReason {
    /// The remote machine reported that it has no such endpoint.
    NotFound,
}
49
50/// Reactive failure monitor for address and endpoint tracking.
51///
52/// Single-threaded (`!Send`, `!Sync`) — uses `RefCell` for interior mutability.
53/// Consumers register wakers via `on_disconnect_or_failure` and similar methods;
54/// producers wake them via `set_status`, `notify_disconnect`, `endpoint_not_found`.
55///
56/// # FDB Reference
57/// `SimpleFailureMonitor` from `FailureMonitor.h:146`, `FailureMonitor.actor.cpp`
58pub struct FailureMonitor {
59    inner: RefCell<FailureMonitorInner>,
60}
61
62struct FailureMonitorInner {
63    /// Address-level status. Missing entry = Failed (FDB default).
64    address_status: BTreeMap<String, FailureStatus>,
65    /// Permanently failed endpoints (e.g., endpoint not found on remote).
66    failed_endpoints: BTreeMap<Endpoint, FailedReason>,
67    /// Wakers waiting for endpoint state changes, keyed by address.
68    /// Woken on: set_status change, notify_disconnect, endpoint_not_found.
69    endpoint_watchers: BTreeMap<String, Vec<Waker>>,
70    /// Wakers waiting for disconnect events, keyed by address.
71    /// Woken only on: notify_disconnect.
72    disconnect_watchers: BTreeMap<String, Vec<Waker>>,
73}
74
75impl FailureMonitor {
76    /// Create a new failure monitor with empty state.
77    ///
78    /// All unknown addresses default to [`FailureStatus::Failed`] until
79    /// a connection succeeds and calls [`set_status`](Self::set_status).
80    pub fn new() -> Self {
81        Self {
82            inner: RefCell::new(FailureMonitorInner {
83                address_status: BTreeMap::new(),
84                failed_endpoints: BTreeMap::new(),
85                endpoint_watchers: BTreeMap::new(),
86                disconnect_watchers: BTreeMap::new(),
87            }),
88        }
89    }
90
91    // =========================================================================
92    // Producer methods (called by connection_task)
93    // =========================================================================
94
95    /// Update the status of an address.
96    ///
97    /// Called by `connection_task` on successful connect (`Available`) or
98    /// connection failure (`Failed`).
99    ///
100    /// Wakes all endpoint watchers for this address on status change.
101    ///
102    /// # FDB Reference
103    /// `SimpleFailureMonitor::setStatus` (FailureMonitor.actor.cpp:83-115)
104    pub fn set_status(&self, address: &str, status: FailureStatus) {
105        let mut inner = self.inner.borrow_mut();
106
107        let changed = match status {
108            FailureStatus::Available => {
109                let prev = inner.address_status.insert(address.to_string(), status);
110                prev != Some(FailureStatus::Available)
111            }
112            FailureStatus::Failed => {
113                // Missing = Failed, so remove the entry
114                inner.address_status.remove(address).is_some()
115            }
116        };
117
118        if changed {
119            wake_all(&mut inner.endpoint_watchers, address);
120        }
121    }
122
123    /// Signal that a connection to the given address has been lost.
124    ///
125    /// Wakes both endpoint watchers and disconnect watchers for this address.
126    /// Called by `connection_task` after `disconnect_notify.notify_waiters()`.
127    ///
128    /// # FDB Reference
129    /// `SimpleFailureMonitor::notifyDisconnect` (FailureMonitor.actor.cpp:150-154)
130    pub fn notify_disconnect(&self, address: &str) {
131        let mut inner = self.inner.borrow_mut();
132        wake_all(&mut inner.endpoint_watchers, address);
133        wake_all(&mut inner.disconnect_watchers, address);
134    }
135
136    /// Mark an endpoint as permanently failed (e.g., not found on remote).
137    ///
138    /// Permanently failed endpoints are never automatically recovered.
139    /// Wakes endpoint watchers for the endpoint's address.
140    ///
141    /// Skips well-known tokens (system endpoints that always exist).
142    ///
143    /// # FDB Reference
144    /// `SimpleFailureMonitor::endpointNotFound` (FailureMonitor.actor.cpp:117-139)
145    pub fn endpoint_not_found(&self, endpoint: &Endpoint) {
146        // Skip well-known tokens (FDB: `if token.first() == -1 return`)
147        if endpoint.token.is_well_known() {
148            return;
149        }
150
151        let mut inner = self.inner.borrow_mut();
152
153        // Cap to prevent memory leaks in long-running simulations
154        if inner.failed_endpoints.len() >= MAX_FAILED_ENDPOINTS {
155            tracing::warn!(
156                "FailureMonitor: clearing {} permanently failed endpoints (cap reached)",
157                inner.failed_endpoints.len()
158            );
159            inner.failed_endpoints.clear();
160        }
161
162        inner
163            .failed_endpoints
164            .insert(endpoint.clone(), FailedReason::NotFound);
165
166        let address = endpoint.address.to_string();
167        wake_all(&mut inner.endpoint_watchers, &address);
168    }
169
170    // =========================================================================
171    // Consumer methods (used by delivery mode functions)
172    // =========================================================================
173
174    /// Get the current failure status of an endpoint.
175    ///
176    /// Returns [`FailureStatus::Failed`] if:
177    /// - The endpoint is permanently failed, OR
178    /// - The endpoint's address is unknown or failed
179    ///
180    /// # FDB Reference
181    /// `SimpleFailureMonitor::getState(Endpoint)` (FailureMonitor.actor.cpp:196-206)
182    pub fn state(&self, endpoint: &Endpoint) -> FailureStatus {
183        let inner = self.inner.borrow();
184
185        if inner.failed_endpoints.contains_key(endpoint) {
186            return FailureStatus::Failed;
187        }
188
189        let address = endpoint.address.to_string();
190        inner
191            .address_status
192            .get(&address)
193            .copied()
194            .unwrap_or(FailureStatus::Failed) // Missing = Failed
195    }
196
197    /// Get the current failure status of an address.
198    ///
199    /// Returns [`FailureStatus::Failed`] if the address is unknown.
200    ///
201    /// # FDB Reference
202    /// `SimpleFailureMonitor::getState(NetworkAddress)` (FailureMonitor.actor.cpp:208-214)
203    pub fn address_state(&self, address: &str) -> FailureStatus {
204        self.inner
205            .borrow()
206            .address_status
207            .get(address)
208            .copied()
209            .unwrap_or(FailureStatus::Failed)
210    }
211
212    /// Check if an endpoint is permanently failed.
213    ///
214    /// # FDB Reference
215    /// `SimpleFailureMonitor::permanentlyFailed` (FailureMonitor.h:226-228)
216    pub fn permanently_failed(&self, endpoint: &Endpoint) -> bool {
217        self.inner.borrow().failed_endpoints.contains_key(endpoint)
218    }
219
220    /// Returns a future that resolves when the endpoint's address disconnects
221    /// or the endpoint becomes permanently failed.
222    ///
223    /// **Fast path**: Returns `Ready` immediately if already failed.
224    ///
225    /// Used by `try_get_reply()` to race reply vs disconnect.
226    ///
227    /// # FDB Reference
228    /// `SimpleFailureMonitor::onDisconnectOrFailure` (FailureMonitor.actor.cpp:156-178)
229    pub fn on_disconnect_or_failure(
230        self: &Rc<Self>,
231        endpoint: &Endpoint,
232    ) -> impl Future<Output = ()> {
233        let fm = Rc::clone(self);
234        let address = endpoint.address.to_string();
235        let endpoint = endpoint.clone();
236
237        std::future::poll_fn(move |cx| {
238            let inner = fm.inner.borrow();
239
240            // Fast path: already failed
241            if inner.failed_endpoints.contains_key(&endpoint) {
242                return Poll::Ready(());
243            }
244            if !inner
245                .address_status
246                .get(&address)
247                .is_some_and(|s| *s == FailureStatus::Available)
248            {
249                // Missing or Failed → already failed
250                return Poll::Ready(());
251            }
252
253            // Slow path: register waker
254            drop(inner);
255            let mut inner = fm.inner.borrow_mut();
256            inner
257                .endpoint_watchers
258                .entry(address.clone())
259                .or_default()
260                .push(cx.waker().clone());
261            Poll::Pending
262        })
263    }
264
265    /// Returns a future that resolves when the endpoint's state changes.
266    ///
267    /// If the endpoint is permanently failed, the future never resolves.
268    ///
269    /// # FDB Reference
270    /// `SimpleFailureMonitor::onStateChanged` (FailureMonitor.actor.cpp:184-194)
271    pub fn on_state_changed(self: &Rc<Self>, endpoint: &Endpoint) -> impl Future<Output = ()> {
272        let fm = Rc::clone(self);
273        let address = endpoint.address.to_string();
274        let endpoint = endpoint.clone();
275        let registered = Rc::new(std::cell::Cell::new(false));
276
277        std::future::poll_fn(move |cx| {
278            let inner = fm.inner.borrow();
279
280            // Permanently failed → never resolves (FDB: onStateChanged returns Never)
281            if inner.failed_endpoints.contains_key(&endpoint) {
282                return Poll::Pending;
283            }
284
285            // If we already registered and got woken, a state change happened
286            if registered.get() {
287                return Poll::Ready(());
288            }
289
290            // First poll: register waker
291            registered.set(true);
292            drop(inner);
293            let mut inner = fm.inner.borrow_mut();
294            inner
295                .endpoint_watchers
296                .entry(address.clone())
297                .or_default()
298                .push(cx.waker().clone());
299            Poll::Pending
300        })
301    }
302
303    /// Returns a future that resolves when the given address disconnects.
304    ///
305    /// Only triggered by explicit disconnect events, not status changes.
306    ///
307    /// # FDB Reference
308    /// `SimpleFailureMonitor::onDisconnect` (FailureMonitor.actor.cpp:180-182)
309    pub fn on_disconnect(self: &Rc<Self>, address: &str) -> impl Future<Output = ()> {
310        let fm = Rc::clone(self);
311        let address = address.to_string();
312        let registered = Rc::new(std::cell::Cell::new(false));
313
314        std::future::poll_fn(move |cx| {
315            // If we already registered and got woken, the disconnect happened
316            if registered.get() {
317                return Poll::Ready(());
318            }
319
320            // First poll: register waker and mark as registered
321            registered.set(true);
322            let mut inner = fm.inner.borrow_mut();
323            inner
324                .disconnect_watchers
325                .entry(address.clone())
326                .or_default()
327                .push(cx.waker().clone());
328            Poll::Pending
329        })
330    }
331}
332
333impl Default for FailureMonitor {
334    fn default() -> Self {
335        Self::new()
336    }
337}
338
339impl std::fmt::Debug for FailureMonitor {
340    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
341        let inner = self.inner.borrow();
342        f.debug_struct("FailureMonitor")
343            .field("addresses_available", &inner.address_status.len())
344            .field("endpoints_failed", &inner.failed_endpoints.len())
345            .finish()
346    }
347}
348
/// Remove the waker list registered under `address` and wake every waker in
/// it, in registration order. Does nothing when the address has no entry;
/// lists for other addresses are left untouched.
fn wake_all(watchers: &mut BTreeMap<String, Vec<Waker>>, address: &str) {
    watchers
        .remove(address)
        .into_iter()
        .flatten()
        .for_each(Waker::wake);
}
357
#[cfg(test)]
mod tests {
    use std::net::{IpAddr, Ipv4Addr};

    use super::*;
    use crate::{NetworkAddress, UID};

    /// Address key used by the tests; the tests rely on it being the string
    /// form of [`test_addr`].
    const ADDR: &str = "10.0.1.1:4500";

    fn test_addr() -> NetworkAddress {
        NetworkAddress::new(IpAddr::V4(Ipv4Addr::new(10, 0, 1, 1)), 4500)
    }

    fn test_endpoint() -> Endpoint {
        Endpoint::new(test_addr(), UID::new(42, 1))
    }

    /// Drive `body` to completion on a current-thread local runtime, so the
    /// test can use `spawn_local` with `Rc`-based futures.
    fn run_local<F: Future<Output = ()>>(body: F) {
        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build_local(tokio::runtime::LocalOptions::default())
            .expect("build runtime");
        rt.block_on(body);
    }

    #[test]
    fn test_unknown_address_is_failed() {
        let fm = FailureMonitor::new();
        let ep = test_endpoint();
        assert_eq!(fm.state(&ep), FailureStatus::Failed);
        assert_eq!(fm.address_state(ADDR), FailureStatus::Failed);
    }

    #[test]
    fn test_set_status_available() {
        let fm = FailureMonitor::new();
        let ep = test_endpoint();
        fm.set_status(ADDR, FailureStatus::Available);
        assert_eq!(fm.state(&ep), FailureStatus::Available);
        assert_eq!(fm.address_state(ADDR), FailureStatus::Available);
    }

    #[test]
    fn test_set_status_failed_removes_entry() {
        let fm = FailureMonitor::new();
        fm.set_status(ADDR, FailureStatus::Available);
        fm.set_status(ADDR, FailureStatus::Failed);
        assert_eq!(fm.address_state(ADDR), FailureStatus::Failed);
    }

    #[test]
    fn test_endpoint_not_found_marks_permanent() {
        let fm = FailureMonitor::new();
        let ep = test_endpoint();
        fm.set_status(ADDR, FailureStatus::Available);
        fm.endpoint_not_found(&ep);
        assert!(fm.permanently_failed(&ep));
        assert_eq!(fm.state(&ep), FailureStatus::Failed);
    }

    #[test]
    fn test_endpoint_not_found_skips_well_known() {
        let fm = FailureMonitor::new();
        let ep = Endpoint::well_known(test_addr(), crate::WellKnownToken::Ping);
        fm.endpoint_not_found(&ep);
        assert!(!fm.permanently_failed(&ep));
    }

    #[tokio::test]
    async fn test_on_disconnect_or_failure_fast_path_unknown() {
        let fm = Rc::new(FailureMonitor::new());
        let ep = test_endpoint();
        // The address was never marked Available, so the future must be ready
        // on its first poll.
        fm.on_disconnect_or_failure(&ep).await;
    }

    #[tokio::test]
    async fn test_on_disconnect_or_failure_fast_path_permanent() {
        let fm = Rc::new(FailureMonitor::new());
        let ep = test_endpoint();
        fm.set_status(ADDR, FailureStatus::Available);
        fm.endpoint_not_found(&ep);
        // The endpoint is permanently failed, so the future must be ready on
        // its first poll even though the address is Available.
        fm.on_disconnect_or_failure(&ep).await;
    }

    #[test]
    fn test_on_disconnect_or_failure_wakes_on_status_change() {
        run_local(async {
            let fm = Rc::new(FailureMonitor::new());
            let ep = test_endpoint();
            fm.set_status(ADDR, FailureStatus::Available);

            let waiter_fm = Rc::clone(&fm);
            let waiter = tokio::task::spawn_local(async move {
                waiter_fm.on_disconnect_or_failure(&ep).await;
            });

            // Let the spawned task run once so it parks its waker.
            tokio::task::yield_now().await;

            // Flip the address to Failed → the parked future must wake and finish.
            fm.set_status(ADDR, FailureStatus::Failed);

            waiter.await.expect("task should complete");
        });
    }

    #[test]
    fn test_on_disconnect_wakes_on_notify() {
        run_local(async {
            let fm = Rc::new(FailureMonitor::new());
            fm.set_status(ADDR, FailureStatus::Available);

            let waiter_fm = Rc::clone(&fm);
            let waiter = tokio::task::spawn_local(async move {
                waiter_fm.on_disconnect(ADDR).await;
            });

            // Let the spawned task run once so it parks its waker.
            tokio::task::yield_now().await;

            fm.notify_disconnect(ADDR);

            waiter.await.expect("task should complete");
        });
    }

    #[test]
    fn test_debug_impl() {
        let fm = FailureMonitor::new();
        fm.set_status(ADDR, FailureStatus::Available);
        let rendered = format!("{:?}", fm);
        assert!(rendered.contains("FailureMonitor"));
        assert!(rendered.contains("addresses_available: 1"));
    }

    #[test]
    fn test_default_impl() {
        let fm = FailureMonitor::default();
        assert_eq!(fm.address_state("any"), FailureStatus::Failed);
    }
}