Skip to main content

nodedb_cluster/subsystem/impls/
reachability_subsystem.rs

1// SPDX-License-Identifier: BUSL-1.1
2
3//! [`ReachabilitySubsystem`] — wraps the [`ReachabilityDriver`] lifecycle.
4//!
5//! Depends on `swim` because the driver probes peers that SWIM knows
6//! about via the shared [`CircuitBreaker`]. The driver only fires if
7//! there are open-circuit peers; SWIM must be up to start accumulating
8//! failures that open breakers.
9
10use std::sync::Arc;
11use std::time::Instant;
12
13use async_trait::async_trait;
14use tokio::sync::watch;
15
16use crate::reachability::driver::{ReachabilityDriver, ReachabilityDriverConfig};
17use crate::reachability::prober::TransportProber;
18
19use super::super::context::BootstrapCtx;
20use super::super::errors::{BootstrapError, ShutdownError};
21use super::super::health::SubsystemHealth;
22use super::super::r#trait::{ClusterSubsystem, SubsystemHandle};
23
24/// Owns the reachability driver lifecycle.
25pub struct ReachabilitySubsystem {
26    cfg: ReachabilityDriverConfig,
27}
28
29impl ReachabilitySubsystem {
30    pub fn new(cfg: ReachabilityDriverConfig) -> Self {
31        Self { cfg }
32    }
33}
34
35#[async_trait]
36impl ClusterSubsystem for ReachabilitySubsystem {
37    fn name(&self) -> &'static str {
38        "reachability"
39    }
40
41    fn dependencies(&self) -> &'static [&'static str] {
42        &["swim"]
43    }
44
45    async fn start(&self, ctx: &BootstrapCtx) -> Result<SubsystemHandle, BootstrapError> {
46        let prober = Arc::new(TransportProber::new(
47            Arc::clone(&ctx.transport),
48            ctx.transport.node_id(),
49        ));
50        let breaker = Arc::clone(ctx.transport.circuit_breaker());
51        let driver = Arc::new(ReachabilityDriver::new(breaker, prober, self.cfg.clone()));
52
53        let (shutdown_tx, shutdown_rx) = watch::channel(false);
54        let task = tokio::spawn(async move { driver.run(shutdown_rx).await });
55
56        Ok(SubsystemHandle::new("reachability", task, shutdown_tx))
57    }
58
59    async fn shutdown(&self, _deadline: Instant) -> Result<(), ShutdownError> {
60        // Shutdown is driven entirely by the `SubsystemHandle::shutdown_tx`
61        // watch that the registry holds. The driver listens on that watch
62        // and exits cleanly. Nothing extra needed here.
63        Ok(())
64    }
65
66    fn health(&self) -> SubsystemHealth {
67        SubsystemHealth::Running
68    }
69}
70
#[cfg(test)]
mod tests {
    use super::*;

    /// The subsystem reports its fixed name and its single `swim` dependency.
    #[test]
    fn reachability_name_and_deps() {
        let default_cfg = ReachabilityDriverConfig::default();
        let subsystem = ReachabilitySubsystem::new(default_cfg);

        let reported_name = subsystem.name();
        assert_eq!(reported_name, "reachability");

        let reported_deps = subsystem.dependencies();
        assert_eq!(reported_deps, &["swim"]);
    }
}
81}