use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::{Duration, Instant};
use async_trait::async_trait;
use nodedb_cluster::circuit_breaker::{CircuitBreaker, CircuitBreakerConfig, CircuitState};
use nodedb_cluster::error::{ClusterError, Result};
use nodedb_cluster::reachability::{
ReachabilityDriver, ReachabilityDriverConfig, ReachabilityProber,
};
use tokio::sync::watch;
/// Test double for a peer whose health can be flipped from "down" to "up".
///
/// A fresh instance starts unhealthy; calling [`Flappy::heal`] marks it healthy
/// for every subsequent read of `healthy`.
struct Flappy {
    /// `true` once the peer has been healed; read with `SeqCst` by the prober.
    healthy: AtomicBool,
}

impl Flappy {
    /// Builds a shared, initially-unhealthy peer.
    fn new() -> Arc<Self> {
        let peer = Self {
            healthy: AtomicBool::from(false),
        };
        Arc::new(peer)
    }

    /// Marks the peer healthy (sequentially-consistent store of `true`).
    fn heal(&self) {
        self.healthy.store(true, Ordering::SeqCst);
    }
}
#[async_trait]
impl ReachabilityProber for Flappy {
    /// Succeeds only after the peer has been healed.
    ///
    /// While unhealthy, returns `ClusterError::Transport` whose `detail`
    /// names the probed peer id.
    async fn probe(&self, peer: u64) -> Result<()> {
        let is_up = self.healthy.load(Ordering::SeqCst);
        if !is_up {
            return Err(ClusterError::Transport {
                detail: format!("mock: peer {peer} unreachable"),
            });
        }
        Ok(())
    }
}
/// An open breaker must close again once the peer becomes reachable, driven
/// solely by the background `ReachabilityDriver` loop.
///
/// After the peer is healed, the test itself only reads `breaker.state`. Any
/// transition to `Closed` must therefore come from the driver's probes.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn reachability_loop_recovers_open_breaker_without_user_traffic() {
    /// Probe adapter that keeps the shared breaker in step with probe results.
    ///
    /// If `breaker.check` rejects the peer, it reports `CircuitOpen` without
    /// probing. Otherwise it forwards to the wrapped `Flappy` and records the
    /// outcome on the breaker.
    struct RelayProber {
        inner: Arc<Flappy>,
        breaker: Arc<CircuitBreaker>,
    }

    #[async_trait]
    impl ReachabilityProber for RelayProber {
        async fn probe(&self, peer: u64) -> Result<()> {
            if self.breaker.check(peer).is_err() {
                let failures = self.breaker.failure_count(peer);
                return Err(ClusterError::CircuitOpen {
                    node_id: peer,
                    failures,
                });
            }
            let outcome = self.inner.probe(peer).await;
            if outcome.is_ok() {
                self.breaker.record_success(peer);
            } else {
                self.breaker.record_failure(peer);
            }
            outcome
        }
    }

    const PEER: u64 = 42;

    // With a threshold of 1, a single recorded failure trips the breaker.
    let breaker = Arc::new(CircuitBreaker::new(CircuitBreakerConfig {
        failure_threshold: 1,
        cooldown: Duration::from_millis(100),
    }));
    breaker.record_failure(PEER);
    assert_eq!(breaker.state(PEER), CircuitState::Open);

    let flappy = Flappy::new();
    let prober: Arc<dyn ReachabilityProber> = Arc::new(RelayProber {
        inner: Arc::clone(&flappy),
        breaker: Arc::clone(&breaker),
    });
    let driver = Arc::new(ReachabilityDriver::new(
        Arc::clone(&breaker),
        prober,
        ReachabilityDriverConfig {
            interval: Duration::from_millis(150),
        },
    ));

    let (shutdown_tx, shutdown_rx) = watch::channel(false);
    let runner = Arc::clone(&driver);
    let handle = tokio::spawn(async move { runner.run(shutdown_rx).await });

    // While the peer is unhealthy, every probe that reaches `Flappy` fails,
    // so the breaker is expected to still be open.
    tokio::time::sleep(Duration::from_millis(500)).await;
    assert_eq!(
        breaker.state(PEER),
        CircuitState::Open,
        "breaker should stay open while peer is unhealthy"
    );

    // Heal the peer, then poll every 50ms for up to 3s until it closes.
    flappy.heal();
    let deadline = Instant::now() + Duration::from_secs(3);
    while breaker.state(PEER) != CircuitState::Closed {
        assert!(
            Instant::now() < deadline,
            "breaker never recovered; state = {:?}",
            breaker.state(PEER)
        );
        tokio::time::sleep(Duration::from_millis(50)).await;
    }

    // Best-effort shutdown: send/join results (including a timeout) are
    // intentionally ignored.
    let _ = shutdown_tx.send(true);
    let _ = tokio::time::timeout(Duration::from_secs(1), handle).await;
}