use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use tokio::sync::oneshot;
use tokio::task::JoinHandle;
use super::peers::{PeerErrorClass, PeerSet};
#[derive(Debug)]
pub(crate) struct HealthHandle {
shutdown: Option<oneshot::Sender<()>>,
task: Option<JoinHandle<()>>,
}
impl HealthHandle {
pub(crate) fn spawn(
http: reqwest::Client,
peers: Arc<Mutex<PeerSet>>,
interval: Duration,
) -> Self {
let (tx, rx) = oneshot::channel();
let task = tokio::spawn(probe_loop(http, peers, interval, rx));
Self {
shutdown: Some(tx),
task: Some(task),
}
}
}
impl Drop for HealthHandle {
fn drop(&mut self) {
if let Some(tx) = self.shutdown.take() {
let _ = tx.send(());
}
if let Some(task) = self.task.take() {
task.abort();
}
}
}
async fn probe_loop(
http: reqwest::Client,
peers: Arc<Mutex<PeerSet>>,
interval: Duration,
mut shutdown: oneshot::Receiver<()>,
) {
loop {
tokio::select! {
_ = &mut shutdown => return,
_ = tokio::time::sleep(interval) => {}
}
let targets: Vec<(String, reqwest::Url)> = {
match peers.lock() {
Ok(guard) => guard
.peers()
.iter()
.filter_map(|p| {
p.endpoint
.join_api("/status")
.ok()
.map(|u| (p.endpoint.key(), u))
})
.collect(),
Err(_) => return,
}
};
for (key, url) in targets {
let started = Instant::now();
let result = http
.get(url.clone())
.header(reqwest::header::ACCEPT, "application/json")
.send()
.await;
match result {
Ok(resp) => {
let status = resp.status();
if status.is_success() {
if let Ok(mut guard) = peers.lock() {
guard.mark_success(&key, started.elapsed());
}
} else if let Ok(mut guard) = peers.lock() {
let class = match status.as_u16() {
503 => PeerErrorClass::ServiceUnavailable,
500 | 502 | 504 => PeerErrorClass::Server5xx,
_ => continue,
};
guard.mark_failure(&key, class);
}
}
Err(e) => {
let class = if e.is_timeout() {
PeerErrorClass::Timeout
} else {
PeerErrorClass::Network
};
if let Ok(mut guard) = peers.lock() {
guard.mark_failure(&key, class);
}
}
}
}
}
}