nodedb_cluster/subsystem/impls/
reachability_subsystem.rs1use std::sync::Arc;
11use std::time::Instant;
12
13use async_trait::async_trait;
14use tokio::sync::watch;
15
16use crate::reachability::driver::{ReachabilityDriver, ReachabilityDriverConfig};
17use crate::reachability::prober::TransportProber;
18
19use super::super::context::BootstrapCtx;
20use super::super::errors::{BootstrapError, ShutdownError};
21use super::super::health::SubsystemHealth;
22use super::super::r#trait::{ClusterSubsystem, SubsystemHandle};
23
24pub struct ReachabilitySubsystem {
26 cfg: ReachabilityDriverConfig,
27}
28
29impl ReachabilitySubsystem {
30 pub fn new(cfg: ReachabilityDriverConfig) -> Self {
31 Self { cfg }
32 }
33}
34
35#[async_trait]
36impl ClusterSubsystem for ReachabilitySubsystem {
37 fn name(&self) -> &'static str {
38 "reachability"
39 }
40
41 fn dependencies(&self) -> &'static [&'static str] {
42 &["swim"]
43 }
44
45 async fn start(&self, ctx: &BootstrapCtx) -> Result<SubsystemHandle, BootstrapError> {
46 let prober = Arc::new(TransportProber::new(
47 Arc::clone(&ctx.transport),
48 ctx.transport.node_id(),
49 ));
50 let breaker = Arc::clone(ctx.transport.circuit_breaker());
51 let driver = Arc::new(ReachabilityDriver::new(breaker, prober, self.cfg.clone()));
52
53 let (shutdown_tx, shutdown_rx) = watch::channel(false);
54 let task = tokio::spawn(async move { driver.run(shutdown_rx).await });
55
56 Ok(SubsystemHandle::new("reachability", task, shutdown_tx))
57 }
58
59 async fn shutdown(&self, _deadline: Instant) -> Result<(), ShutdownError> {
60 Ok(())
64 }
65
66 fn health(&self) -> SubsystemHealth {
67 SubsystemHealth::Running
68 }
69}
70
#[cfg(test)]
mod tests {
    use super::*;

    /// The subsystem advertises its fixed name and its single `swim` dependency.
    #[test]
    fn reachability_name_and_deps() {
        let subsystem = ReachabilitySubsystem::new(ReachabilityDriverConfig::default());

        let reported_name = subsystem.name();
        let reported_deps = subsystem.dependencies();

        assert_eq!(reported_name, "reachability");
        assert_eq!(reported_deps, &["swim"]);
    }
}