#[cfg(all(test, feature = "tokio"))]
#[path = "../../tests/session/server_health.rs"]
mod tests;
use std::sync::{Arc, Weak};
use std::time::Duration;
use futures::future::{Either, select};
use futures::pin_mut;
use log::debug;
use rand::Rng;
use crate::session::server::OutgoingRouter;
use crate::settings::Settings;
use crate::settings::keys::*;
use crate::tailer::{IdentityType, ReturnCode, Tailer};
use crate::utils::random::get_rng;
use crate::utils::sync::{AsyncExecutor, WatchReceiver, WatchSender, create_watch, sleep};
use crate::utils::unix_timestamp_ms;
enum ServerDecayEvent {
Timeout,
Terminated,
Triggered(u32, u64),
}
async fn wait_for_trigger(timeout_ms: u64, trigger_rx: &mut WatchReceiver<(u32, u64)>) -> ServerDecayEvent {
let sleep_fut = sleep(Duration::from_millis(timeout_ms));
let recv_fut = trigger_rx.recv();
pin_mut!(sleep_fut, recv_fut);
match select(sleep_fut, recv_fut).await {
Either::Left(_) => ServerDecayEvent::Timeout,
Either::Right((Some((next_in, pn)), _)) => ServerDecayEvent::Triggered(next_in, pn),
Either::Right((None, _)) => ServerDecayEvent::Terminated,
}
}
pub(super) struct ServerHealthProvider {
trigger_tx: WatchSender<(u32, u64)>,
}
impl ServerHealthProvider {
pub(super) fn new<T: IdentityType + Clone + 'static, AE: AsyncExecutor + 'static>(router: Weak<dyn OutgoingRouter<T>>, identity: T, settings: Arc<Settings<AE>>, initial_server_next_in: u32) -> Self {
let (trigger_tx, trigger_rx) = create_watch();
let executor = settings.executor().clone();
executor.spawn(Self::timer_task(router, identity, settings, trigger_rx, initial_server_next_in));
Self {
trigger_tx,
}
}
pub(super) fn feed_health_check(&self, client_next_in: u32, client_pn: u64) {
self.trigger_tx.send((client_next_in, client_pn));
}
async fn timer_task<T: IdentityType + Clone + 'static, AE: AsyncExecutor + 'static>(router: Weak<dyn OutgoingRouter<T>>, identity: T, settings: Arc<Settings<AE>>, mut trigger_rx: WatchReceiver<(u32, u64)>, initial_server_next_in: u32) {
let timeout = settings.get(&TIMEOUT_DEFAULT).clamp(settings.get(&TIMEOUT_MIN), settings.get(&TIMEOUT_MAX));
let mut current_timeout = initial_server_next_in as u64 + timeout;
let mut retry_count: u64 = 0;
'outer: loop {
match wait_for_trigger(current_timeout, &mut trigger_rx).await {
ServerDecayEvent::Triggered(client_next_in, client_pn) => {
retry_count = 0;
let mut response_pn = client_pn;
let mut delay_ms = (client_next_in as u64).clamp(settings.get(&HEALTH_CHECK_NEXT_IN_MIN), settings.get(&HEALTH_CHECK_NEXT_IN_MAX));
loop {
match wait_for_trigger(delay_ms, &mut trigger_rx).await {
ServerDecayEvent::Triggered(new_next_in, new_pn) => {
response_pn = new_pn;
delay_ms = (new_next_in as u64).clamp(settings.get(&HEALTH_CHECK_NEXT_IN_MIN), settings.get(&HEALTH_CHECK_NEXT_IN_MAX));
}
ServerDecayEvent::Timeout => break,
ServerDecayEvent::Terminated => {
debug!("ServerHealthProvider: trigger channel closed, stopping");
break 'outer;
}
}
}
let server_next_in = get_rng().gen_range(settings.get(&HEALTH_CHECK_NEXT_IN_MIN)..=settings.get(&HEALTH_CHECK_NEXT_IN_MAX)) as u32;
let buf = settings.pool().allocate(Some(T::length()));
let response = Tailer::health_check(buf, &identity, server_next_in, response_pn).into_buffer();
let Some(r) = router.upgrade() else {
debug!("ServerHealthProvider: router dropped, stopping");
break 'outer;
};
r.route_packet(response, &identity).await;
current_timeout = server_next_in as u64 + timeout;
debug!("ServerHealthProvider: response sent (server_next_in={server_next_in}ms), next timeout={current_timeout}ms");
}
ServerDecayEvent::Timeout => {
retry_count += 1;
if retry_count < settings.get(&MAX_RETRIES) {
debug!("ServerHealthProvider: health check timeout, retry {}/{}", retry_count, settings.get(&MAX_RETRIES));
continue;
}
debug!("ServerHealthProvider: connection decayed after {retry_count} retries");
if let Some(r) = router.upgrade() {
let pn = (unix_timestamp_ms() / 1000) as u64 * (1u64 << 32);
let buf = settings.pool().allocate(Some(T::length()));
let termination = Tailer::termination(buf, &identity, ReturnCode::ConnectionDecayed, pn).into_buffer();
r.route_packet(termination, &identity).await;
r.remove_session(&identity).await;
}
break 'outer;
}
ServerDecayEvent::Terminated => {
debug!("ServerHealthProvider: trigger channel closed, stopping");
break 'outer;
}
}
}
}
}