use std::time::{Duration, Instant};
use tracing::{debug, info, warn};
/// Named keepalive profiles that map to a ready-made [`KeepaliveConfig`].
///
/// Convert a preset into its configuration with
/// [`KeepaliveConfig::from_preset`]. The enum is a plain field-less tag, so it
/// is `Copy` and can be compared, hashed and used as a map key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum KeepalivePreset {
    /// Selects [`KeepaliveConfig::mobile`] (20 s interval, adaptive).
    Mobile,
    /// Selects [`KeepaliveConfig::home`] (60 s interval, adaptive).
    Home,
    /// Selects [`KeepaliveConfig::corporate`] (120 s interval, fixed).
    Corporate,
    /// Selects [`KeepaliveConfig::datacenter`] (30 s interval, fixed).
    DataCenter,
    /// Selects [`KeepaliveConfig::disabled`] (keepalives turned off).
    Disabled,
}
/// Tunable parameters for a keepalive ping/pong exchange.
///
/// Two configurations compare equal when every field is equal.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct KeepaliveConfig {
    /// Base time between keepalive pings; also the starting value of the
    /// manager's adaptive interval.
    pub interval: Duration,
    /// How long an outstanding ping may go unanswered before it counts as a
    /// missed pong.
    pub timeout: Duration,
    /// Number of missed pongs at which the connection is considered dead.
    pub max_missed: u32,
    /// When `false`, the manager never requests a ping.
    pub enabled: bool,
    /// When `true`, the manager re-tunes its ping interval after each pong.
    pub adaptive: bool,
}
impl KeepaliveConfig {
    /// Builds an enabled configuration from whole-second timings.
    fn enabled_with(
        interval_secs: u64,
        timeout_secs: u64,
        max_missed: u32,
        adaptive: bool,
    ) -> Self {
        Self {
            interval: Duration::from_secs(interval_secs),
            timeout: Duration::from_secs(timeout_secs),
            max_missed,
            enabled: true,
            adaptive,
        }
    }

    /// Mobile profile: ping every 20 s, 5 s pong timeout, 3 allowed misses,
    /// adaptive interval.
    pub fn mobile() -> Self {
        Self::enabled_with(20, 5, 3, true)
    }

    /// Home profile: ping every 60 s, 10 s pong timeout, 3 allowed misses,
    /// adaptive interval.
    pub fn home() -> Self {
        Self::enabled_with(60, 10, 3, true)
    }

    /// Corporate profile: ping every 120 s, 15 s pong timeout, 2 allowed
    /// misses, fixed interval.
    pub fn corporate() -> Self {
        Self::enabled_with(120, 15, 2, false)
    }

    /// Data-center profile: ping every 30 s, 10 s pong timeout, 5 allowed
    /// misses, fixed interval.
    pub fn datacenter() -> Self {
        Self::enabled_with(30, 10, 5, false)
    }

    /// Alias for [`KeepaliveConfig::mobile`].
    pub fn aggressive() -> Self {
        Self::mobile()
    }

    /// Configuration with keepalives switched off.
    ///
    /// `enabled` is `false`; the interval and miss limit are set to very
    /// large values.
    pub fn disabled() -> Self {
        Self {
            enabled: false,
            adaptive: false,
            interval: Duration::from_secs(u64::MAX / 2),
            timeout: Duration::from_secs(30),
            max_missed: u32::MAX,
        }
    }

    /// Returns the configuration associated with `preset`.
    pub fn from_preset(preset: KeepalivePreset) -> Self {
        match preset {
            KeepalivePreset::Disabled => Self::disabled(),
            KeepalivePreset::DataCenter => Self::datacenter(),
            KeepalivePreset::Corporate => Self::corporate(),
            KeepalivePreset::Home => Self::home(),
            KeepalivePreset::Mobile => Self::mobile(),
        }
    }
}
impl Default for KeepaliveConfig {
fn default() -> Self {
Self::home()
}
}
/// What the caller should do next, as reported by `KeepaliveManager::check`.
///
/// The enum is a plain field-less tag, so it is `Copy` and fully comparable.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum KeepaliveAction {
    /// Nothing to do right now.
    Idle,
    /// A keepalive ping is due and should be sent.
    SendPing,
    /// The outstanding ping timed out, but the miss limit is not yet reached.
    PongTimeout,
    /// The outstanding ping timed out and the miss limit has been reached.
    ConnectionDead,
}
/// Tracks keepalive ping/pong state for a single connection.
///
/// The manager performs no I/O itself: the owner reports events through the
/// `record_*` methods and polls `check` to learn what to do next.
#[derive(Debug)]
pub struct KeepaliveManager {
    /// Settings supplied at construction time.
    config: KeepaliveConfig,
    /// When the most recent ping was recorded as sent, if any.
    last_sent: Option<Instant>,
    /// When the most recent pong was recorded, if any.
    last_pong: Option<Instant>,
    /// When activity (including a pong) was last recorded.
    last_activity: Instant,
    /// `true` while a ping is outstanding.
    waiting_for_pong: bool,
    /// Send time of the outstanding ping; used for timeout and RTT.
    ping_sent_at: Option<Instant>,
    /// Missed pongs counted since the last reset.
    missed_pongs: u32,
    /// Total pings recorded as sent.
    total_sent: u64,
    /// Total pongs recorded as received.
    total_pongs: u64,
    /// Smoothed round-trip time (7/8 previous value + 1/8 newest sample).
    srtt: Option<Duration>,
    /// Interval currently used to decide when the next ping is due.
    adaptive_interval: Duration,
}
impl KeepaliveManager {
    /// Smallest interval that adaptive tuning selects on its own. When the
    /// configured interval is shorter than this, the configured interval is
    /// used as the floor instead so tuning never exceeds the configuration.
    const MIN_ADAPTIVE_INTERVAL: Duration = Duration::from_secs(10);

    /// Creates a manager for `config`.
    ///
    /// The adaptive interval starts at `config.interval` and the activity
    /// clock starts now.
    pub fn new(config: KeepaliveConfig) -> Self {
        info!(
            interval_secs = config.interval.as_secs(),
            max_missed = config.max_missed,
            adaptive = config.adaptive,
            enabled = config.enabled,
            "KeepaliveManager created"
        );
        Self {
            // Read the interval before `config` is moved into the struct.
            adaptive_interval: config.interval,
            config,
            last_sent: None,
            last_pong: None,
            last_activity: Instant::now(),
            waiting_for_pong: false,
            ping_sent_at: None,
            missed_pongs: 0,
            total_sent: 0,
            total_pongs: 0,
            srtt: None,
        }
    }

    /// Creates a manager from one of the named presets.
    pub fn from_preset(preset: KeepalivePreset) -> Self {
        Self::new(KeepaliveConfig::from_preset(preset))
    }

    /// Advances the keepalive state machine and reports what to do next.
    ///
    /// * Returns [`KeepaliveAction::Idle`] when keepalives are disabled, or
    ///   when a ping is outstanding and has not yet timed out.
    /// * When the outstanding ping has exceeded `config.timeout`, the wait is
    ///   cleared and the miss counter is incremented. The result is
    ///   [`KeepaliveAction::ConnectionDead`] if the counter has reached
    ///   `config.max_missed`, otherwise [`KeepaliveAction::PongTimeout`].
    /// * Otherwise returns [`KeepaliveAction::SendPing`] when a ping is due.
    ///
    /// The timeout is already counted here, so callers should not also call
    /// [`record_pong_missed`](Self::record_pong_missed) for the same ping.
    /// `ConnectionDead` is reported only by the call that observes the
    /// timeout; use [`is_dead`](Self::is_dead) to query the state later.
    pub fn check(&mut self) -> KeepaliveAction {
        if !self.config.enabled {
            return KeepaliveAction::Idle;
        }
        if self.waiting_for_pong {
            if let Some(sent_at) = self.ping_sent_at {
                if sent_at.elapsed() <= self.config.timeout {
                    // Still inside the pong window.
                    return KeepaliveAction::Idle;
                }
                self.waiting_for_pong = false;
                self.ping_sent_at = None;
                // Saturate rather than overflow if the counter is already at
                // its maximum.
                self.missed_pongs = self.missed_pongs.saturating_add(1);
                warn!(
                    missed = self.missed_pongs,
                    max = self.config.max_missed,
                    "Keepalive pong timed out"
                );
                if self.missed_pongs >= self.config.max_missed {
                    warn!("Too many missed keepalive pongs — connection declared dead");
                    return KeepaliveAction::ConnectionDead;
                }
                return KeepaliveAction::PongTimeout;
            }
        }
        if self.should_send_keepalive() {
            KeepaliveAction::SendPing
        } else {
            KeepaliveAction::Idle
        }
    }

    /// Returns `true` when a ping is due.
    ///
    /// A ping is due when keepalives are enabled, no ping is outstanding, and
    /// either no activity or no ping has been recorded within the current
    /// interval. A manager that has never sent a ping is always due.
    pub fn should_send_keepalive(&self) -> bool {
        if !self.config.enabled || self.waiting_for_pong {
            return false;
        }
        let interval = self.adaptive_interval;
        // No ping recorded yet counts as overdue.
        let ping_overdue = self.last_sent.map_or(true, |t| t.elapsed() >= interval);
        self.last_activity.elapsed() >= interval || ping_overdue
    }

    /// Records that a ping was just sent and starts waiting for its pong.
    pub fn record_keepalive_sent(&mut self) {
        let now = Instant::now();
        self.last_sent = Some(now);
        self.ping_sent_at = Some(now);
        self.waiting_for_pong = true;
        self.total_sent += 1;
        debug!(total_sent = self.total_sent, "Keepalive ping sent");
    }

    /// Records a pong: clears the wait, resets the miss counter, and updates
    /// the RTT estimate.
    ///
    /// If a ping was outstanding, its round-trip time feeds the smoothed RTT
    /// and, when `config.adaptive` is set, the adaptive interval. A pong with
    /// no outstanding ping still counts as activity and resets the misses.
    pub fn record_pong_received(&mut self) {
        let now = Instant::now();
        self.last_pong = Some(now);
        self.last_activity = now;
        self.total_pongs += 1;
        if let Some(sent_at) = self.ping_sent_at.take() {
            let rtt = sent_at.elapsed();
            self.update_srtt(rtt);
            if self.config.adaptive {
                self.adjust_interval(rtt);
            }
        }
        self.waiting_for_pong = false;
        self.missed_pongs = 0;
        debug!(total_pongs = self.total_pongs, "Keepalive pong received");
    }

    /// Manually counts one missed pong.
    ///
    /// The counter saturates at `u32::MAX` instead of overflowing.
    pub fn record_pong_missed(&mut self) {
        self.missed_pongs = self.missed_pongs.saturating_add(1);
        warn!(
            missed = self.missed_pongs,
            max = self.config.max_missed,
            "Keepalive pong missed"
        );
    }

    /// Records connection activity.
    ///
    /// If a ping was outstanding, the wait is cancelled and the miss counter
    /// is reset; otherwise only the activity timestamp is refreshed.
    pub fn record_activity(&mut self) {
        self.last_activity = Instant::now();
        if self.waiting_for_pong {
            self.waiting_for_pong = false;
            self.ping_sent_at = None;
            self.missed_pongs = 0;
        }
    }

    /// Clears the miss counter and any outstanding wait, and marks the
    /// connection as active now.
    pub fn reset_misses(&mut self) {
        self.missed_pongs = 0;
        self.waiting_for_pong = false;
        self.ping_sent_at = None;
        self.last_activity = Instant::now();
        info!("Keepalive miss counter reset");
    }

    /// Folds a new RTT sample into the smoothed estimate.
    ///
    /// The first sample is taken as-is; afterwards the estimate becomes
    /// 7/8 of the previous value plus 1/8 of the sample.
    fn update_srtt(&mut self, rtt: Duration) {
        self.srtt = Some(match self.srtt {
            None => rtt,
            // `Duration` arithmetic avoids lossy nanosecond casts. Dividing
            // first keeps the sum at or below the larger input, so it cannot
            // overflow.
            Some(prev) => prev / 8 * 7 + rtt / 8,
        });
    }

    /// Re-tunes the ping interval from the latest RTT sample.
    ///
    /// The new interval is four times `rtt`, clamped between a floor and the
    /// configured interval. The floor is 10 s, or the configured interval
    /// when that is shorter, so the result never exceeds the configuration.
    fn adjust_interval(&mut self, rtt: Duration) {
        let ceiling = self.config.interval;
        let floor = Self::MIN_ADAPTIVE_INTERVAL.min(ceiling);
        // `saturating_mul` avoids a panic on an absurdly large sample.
        let new_interval = rtt.saturating_mul(4).max(floor).min(ceiling);
        if new_interval != self.adaptive_interval {
            debug!(
                old_secs = self.adaptive_interval.as_secs(),
                new_secs = new_interval.as_secs(),
                rtt_ms = rtt.as_millis(),
                "Adaptive keepalive interval adjusted"
            );
            self.adaptive_interval = new_interval;
        }
    }

    /// Smoothed round-trip time, or `None` before the first measured pong.
    pub fn srtt(&self) -> Option<Duration> {
        self.srtt
    }

    /// Interval currently used to decide when a ping is due.
    pub fn current_interval(&self) -> Duration {
        self.adaptive_interval
    }

    /// Missed pongs counted since the last reset.
    pub fn missed_pongs(&self) -> u32 {
        self.missed_pongs
    }

    /// `true` while a ping is outstanding.
    pub fn is_waiting_for_pong(&self) -> bool {
        self.waiting_for_pong
    }

    /// `true` once the miss counter has reached `config.max_missed`.
    pub fn is_dead(&self) -> bool {
        self.missed_pongs >= self.config.max_missed
    }

    /// Total pings recorded as sent.
    pub fn total_sent(&self) -> u64 {
        self.total_sent
    }

    /// Total pongs recorded as received.
    pub fn total_pongs(&self) -> u64 {
        self.total_pongs
    }

    /// When the most recent pong was recorded, if any.
    pub fn last_pong(&self) -> Option<Instant> {
        self.last_pong
    }

    /// The configuration this manager was created with.
    pub fn config(&self) -> &KeepaliveConfig {
        &self.config
    }
}
#[cfg(test)]
mod tests {
    //! Unit tests for the keepalive configuration presets and the manager's
    //! state machine. Timing-dependent tests use millisecond intervals with
    //! sleeps that are strictly longer than the configured interval/timeout.
    use super::*;

    /// Manager whose interval elapses almost immediately (1 ms) but whose
    /// pong timeout (50 ms) is long enough not to fire by accident.
    fn instant_manager() -> KeepaliveManager {
        KeepaliveManager::new(KeepaliveConfig {
            interval: Duration::from_millis(1),
            timeout: Duration::from_millis(50),
            max_missed: 3,
            enabled: true,
            adaptive: false,
        })
    }

    /// Manager whose interval and pong timeout both elapse after 1 ms.
    fn short_timeout_manager(max_missed: u32) -> KeepaliveManager {
        KeepaliveManager::new(KeepaliveConfig {
            interval: Duration::from_millis(1),
            timeout: Duration::from_millis(1),
            max_missed,
            enabled: true,
            adaptive: false,
        })
    }

    #[test]
    fn test_config_mobile() {
        let c = KeepaliveConfig::mobile();
        assert_eq!(c.interval, Duration::from_secs(20));
        assert!(c.enabled);
        assert!(c.adaptive);
    }

    #[test]
    fn test_config_home() {
        let c = KeepaliveConfig::home();
        assert_eq!(c.interval, Duration::from_secs(60));
    }

    #[test]
    fn test_config_corporate() {
        let c = KeepaliveConfig::corporate();
        assert_eq!(c.max_missed, 2);
        assert!(!c.adaptive);
    }

    #[test]
    fn test_config_datacenter() {
        let c = KeepaliveConfig::datacenter();
        assert_eq!(c.interval, Duration::from_secs(30));
        assert_eq!(c.max_missed, 5);
        assert!(!c.adaptive);
    }

    #[test]
    fn test_config_disabled() {
        let c = KeepaliveConfig::disabled();
        assert!(!c.enabled);
    }

    #[test]
    fn test_config_from_preset_mobile() {
        let c = KeepaliveConfig::from_preset(KeepalivePreset::Mobile);
        assert_eq!(c.interval, Duration::from_secs(20));
    }

    #[test]
    fn test_config_default_is_home() {
        let c = KeepaliveConfig::default();
        assert_eq!(c.interval, Duration::from_secs(60));
    }

    #[test]
    fn test_manager_new() {
        let m = KeepaliveManager::new(KeepaliveConfig::default());
        assert_eq!(m.missed_pongs(), 0);
        assert!(!m.is_waiting_for_pong());
        assert!(!m.is_dead());
        assert_eq!(m.total_sent(), 0);
    }

    #[test]
    fn test_should_send_keepalive_initially() {
        let m = instant_manager();
        std::thread::sleep(Duration::from_millis(5));
        assert!(m.should_send_keepalive());
    }

    #[test]
    fn test_should_not_send_while_waiting() {
        let mut m = instant_manager();
        std::thread::sleep(Duration::from_millis(5));
        m.record_keepalive_sent();
        assert!(!m.should_send_keepalive());
    }

    #[test]
    fn test_record_keepalive_sent() {
        let mut m = instant_manager();
        m.record_keepalive_sent();
        assert!(m.is_waiting_for_pong());
        assert_eq!(m.total_sent(), 1);
    }

    #[test]
    fn test_record_pong_received() {
        let mut m = instant_manager();
        m.record_keepalive_sent();
        assert!(m.is_waiting_for_pong());
        m.record_pong_received();
        assert!(!m.is_waiting_for_pong());
        assert_eq!(m.total_pongs(), 1);
        assert_eq!(m.missed_pongs(), 0);
        assert!(m.last_pong().is_some());
    }

    #[test]
    fn test_srtt_updated_after_pong() {
        let mut m = instant_manager();
        m.record_keepalive_sent();
        std::thread::sleep(Duration::from_millis(5));
        m.record_pong_received();
        assert!(m.srtt().is_some());
    }

    #[test]
    fn test_record_activity_resets_wait() {
        let mut m = instant_manager();
        m.record_keepalive_sent();
        assert!(m.is_waiting_for_pong());
        m.record_activity();
        assert!(!m.is_waiting_for_pong());
        assert_eq!(m.missed_pongs(), 0);
    }

    #[test]
    fn test_check_disabled() {
        let mut m = KeepaliveManager::new(KeepaliveConfig::disabled());
        assert_eq!(m.check(), KeepaliveAction::Idle);
    }

    #[test]
    fn test_check_send_ping() {
        let mut m = instant_manager();
        std::thread::sleep(Duration::from_millis(5));
        assert_eq!(m.check(), KeepaliveAction::SendPing);
    }

    #[test]
    fn test_check_idle_while_waiting_within_timeout() {
        // A 60 s timeout cannot elapse during the test, so the outstanding
        // ping must keep the manager idle and still waiting.
        let mut m = KeepaliveManager::new(KeepaliveConfig {
            interval: Duration::from_millis(1),
            timeout: Duration::from_secs(60),
            max_missed: 3,
            enabled: true,
            adaptive: false,
        });
        m.record_keepalive_sent();
        assert_eq!(m.check(), KeepaliveAction::Idle);
        assert!(m.is_waiting_for_pong());
        assert_eq!(m.missed_pongs(), 0);
    }

    #[test]
    fn test_check_pong_timeout() {
        let mut m = short_timeout_manager(3);
        std::thread::sleep(Duration::from_millis(5));
        m.record_keepalive_sent();
        // The sleep is longer than the 1 ms timeout, so the ping has expired.
        std::thread::sleep(Duration::from_millis(5));
        // First miss out of three allowed: a timeout, not a dead connection.
        assert_eq!(m.check(), KeepaliveAction::PongTimeout);
        assert_eq!(m.missed_pongs(), 1);
        assert!(!m.is_waiting_for_pong());
        assert!(!m.is_dead());
    }

    #[test]
    fn test_connection_dead_after_max_missed() {
        let mut m = short_timeout_manager(2);

        // First missed pong: below the limit.
        std::thread::sleep(Duration::from_millis(3));
        m.record_keepalive_sent();
        std::thread::sleep(Duration::from_millis(3));
        assert_eq!(m.check(), KeepaliveAction::PongTimeout);
        assert!(!m.is_dead());

        // Second missed pong reaches `max_missed`.
        std::thread::sleep(Duration::from_millis(3));
        m.record_keepalive_sent();
        std::thread::sleep(Duration::from_millis(3));
        assert_eq!(m.check(), KeepaliveAction::ConnectionDead);
        assert_eq!(m.missed_pongs(), 2);
        assert!(m.is_dead());
    }

    #[test]
    fn test_reset_misses() {
        let mut m = instant_manager();
        m.record_pong_missed();
        m.record_pong_missed();
        assert_eq!(m.missed_pongs(), 2);
        m.reset_misses();
        assert_eq!(m.missed_pongs(), 0);
        assert!(!m.is_waiting_for_pong());
    }

    #[test]
    fn test_is_dead() {
        let mut m = KeepaliveManager::new(KeepaliveConfig {
            max_missed: 2,
            ..KeepaliveConfig::mobile()
        });
        assert!(!m.is_dead());
        m.record_pong_missed();
        assert!(!m.is_dead());
        m.record_pong_missed();
        assert!(m.is_dead());
    }

    #[test]
    fn test_from_preset() {
        let m = KeepaliveManager::from_preset(KeepalivePreset::Mobile);
        assert_eq!(m.config().interval, Duration::from_secs(20));
    }

    #[test]
    fn test_adaptive_interval_adjusts() {
        let mut m = KeepaliveManager::new(KeepaliveConfig {
            interval: Duration::from_secs(60),
            timeout: Duration::from_secs(5),
            max_missed: 3,
            enabled: true,
            adaptive: true,
        });
        m.record_keepalive_sent();
        std::thread::sleep(Duration::from_millis(10));
        m.record_pong_received();
        assert!(m.current_interval() >= Duration::from_secs(10));
        // Tuning must never exceed the configured interval.
        assert!(m.current_interval() <= Duration::from_secs(60));
    }

    #[test]
    fn test_current_interval_default() {
        let m = KeepaliveManager::new(KeepaliveConfig::home());
        assert_eq!(m.current_interval(), Duration::from_secs(60));
    }
}