use crate::server::connection::ConnectionManagerTrait;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::interval;
use tracing::{info, warn};
/// Periodically asks a connection manager for timed-out connections and removes them.
///
/// The periodic work runs in a background task that is spawned by `start` and
/// ended by `stop`.
pub struct HeartbeatDetector {
/// Connection manager that is queried for timed-out connections and asked to remove them.
connection_manager: Arc<dyn ConnectionManagerTrait>,
/// Value handed to `cleanup_timeout_connections` on every check
/// (presumably the maximum allowed silence per connection; confirm against the trait).
timeout: Duration,
/// Period of the timer that drives the checks.
check_interval: Duration,
/// Sender half of the stop channel; `None` until `start` is called and again after `stop`.
stop_tx: Option<tokio::sync::mpsc::Sender<()>>,
}
impl HeartbeatDetector {
    /// Creates a detector that is not yet running.
    ///
    /// * `connection_manager` - manager that is queried for timed-out connections.
    /// * `timeout` - value passed to `cleanup_timeout_connections` on every check.
    /// * `check_interval` - period between two consecutive checks.
    ///
    /// No background work happens until [`start`](Self::start) is called.
    pub fn new(
        connection_manager: Arc<dyn ConnectionManagerTrait>,
        timeout: Duration,
        check_interval: Duration,
    ) -> Self {
        Self {
            connection_manager,
            timeout,
            check_interval,
            stop_tx: None,
        }
    }

    /// Spawns the background task that performs the periodic timeout check.
    ///
    /// On every timer tick the task calls `cleanup_timeout_connections(timeout)`
    /// and then calls `remove_connection` for each id that was returned; removal
    /// failures are logged and do not stop the loop. The task ends once a stop
    /// signal arrives or the stop channel is closed.
    ///
    /// Calling `start` again replaces the stored sender. Dropping the previous
    /// sender closes the previous channel, so an earlier task terminates the
    /// next time it polls its receiver.
    ///
    /// # Panics
    ///
    /// `tokio::spawn` panics when called outside of a Tokio runtime.
    pub fn start(&mut self) {
        let (tx, mut rx) = tokio::sync::mpsc::channel::<()>(1);
        self.stop_tx = Some(tx);

        let connection_manager = Arc::clone(&self.connection_manager);
        let timeout = self.timeout;
        let check_interval = self.check_interval;

        tokio::spawn(async move {
            // A Tokio interval completes its first tick immediately, so the
            // first check runs right after the task is scheduled.
            let mut interval_timer = interval(check_interval);
            loop {
                tokio::select! {
                    _ = interval_timer.tick() => {
                        let timeout_connections = connection_manager
                            .cleanup_timeout_connections(timeout)
                            .await;
                        if !timeout_connections.is_empty() {
                            info!(
                                "Cleaned up {} timeout connections: {:?}",
                                timeout_connections.len(),
                                timeout_connections
                            );
                            for connection_id in timeout_connections {
                                if let Err(e) =
                                    connection_manager.remove_connection(&connection_id).await
                                {
                                    warn!(
                                        "Failed to remove timeout connection {}: {:?}",
                                        connection_id, e
                                    );
                                }
                            }
                        }
                    }
                    // Resolves with `Some(())` on an explicit stop signal and with
                    // `None` once every sender has been dropped; both end the task.
                    _ = rx.recv() => {
                        break;
                    }
                }
            }
        });
    }

    /// Signals the background task to stop.
    ///
    /// Does nothing when the detector is not running. The task exits
    /// asynchronously; this method does not wait for it to finish.
    pub fn stop(&mut self) {
        if let Some(tx) = self.stop_tx.take() {
            // `Sender::send` is async and returns a future that does nothing
            // unless awaited, so the non-blocking `try_send` is used to actually
            // deliver the signal from this synchronous method. An error only
            // means the task is already gone (channel closed) or a signal is
            // already queued, so it is safe to ignore. Dropping `tx` at the end
            // of this scope closes the channel as a fallback.
            let _ = tx.try_send(());
        }
    }
}