use crate::common::MessageParser;
use crate::transport::connection::Connection;
use std::sync::Arc;
use tokio::sync::{Mutex, mpsc};
use std::time::Duration;
use tokio::time::interval;
/// Periodically sends ping frames over a connection and tracks when pongs
/// were last recorded, so an unresponsive peer can be detected.
pub struct HeartbeatManager {
/// Period of the ping timer created when the heartbeat task is started.
interval: Duration,
/// Maximum time allowed since the recorded timestamp in `last_pong`
/// before the peer is considered timed out.
timeout: Duration,
/// Most recently recorded liveness timestamp (`None` while nothing has been
/// recorded). Shared with the spawned heartbeat task through the `Arc`.
last_pong: Arc<std::sync::Mutex<Option<std::time::Instant>>>,
/// Sender half of the stop channel for the running heartbeat task;
/// `None` when no task has been started or after `stop` took it.
stop_tx: Option<mpsc::Sender<()>>,
}
impl HeartbeatManager {
    /// Creates a manager that pings every `interval` and considers the peer
    /// dead once `timeout` elapses without a recorded pong.
    ///
    /// No task is spawned until [`HeartbeatManager::start`] is called.
    pub fn new(interval: Duration, timeout: Duration) -> Self {
        Self {
            interval,
            timeout,
            last_pong: Arc::new(std::sync::Mutex::new(None)),
            stop_tx: None,
        }
    }

    /// Spawns the background heartbeat task for `connection`.
    ///
    /// On every timer tick the task serializes a ping frame with `parser`,
    /// sends it, and then checks whether a pong is overdue. When it is, the
    /// task closes the connection and exits. The timeout is measured from the
    /// later of this call and the most recent [`HeartbeatManager::record_pong`].
    ///
    /// Calling `start` again replaces the stop channel, which shuts down the
    /// task spawned by the previous call.
    ///
    /// # Panics
    ///
    /// `tokio::spawn` panics when called outside a Tokio runtime. A zero
    /// `interval` makes `tokio::time::interval` panic inside the spawned task.
    pub fn start(
        &mut self,
        connection: Arc<Mutex<Box<dyn Connection>>>,
        parser: MessageParser,
    ) {
        let (tx, mut rx) = mpsc::channel(1);
        // Overwriting drops any previous sender; that closes the old channel,
        // so an earlier task sees `recv()` complete and exits.
        self.stop_tx = Some(tx);

        // Start the timeout clock now. This keeps a pong timestamp left over
        // from an earlier session from closing the new connection on the first
        // (immediate) tick, and makes a peer that never answers time out too.
        *Self::lock_last_pong(&self.last_pong) = Some(std::time::Instant::now());

        let period = self.interval;
        let timeout = self.timeout;
        let last_pong = Arc::clone(&self.last_pong);

        tokio::spawn(async move {
            let mut ticker = interval(period);
            loop {
                tokio::select! {
                    _ = ticker.tick() => {
                        let ping_frame = crate::common::protocol::frame_with_system_command(
                            crate::common::protocol::ping(),
                            crate::common::protocol::Reliability::AtLeastOnce,
                        );

                        // Best effort: a ping that cannot be serialized or sent
                        // is not retried here. The failure surfaces as a missing
                        // pong, which the timeout check below acts on.
                        if let Ok(data) = parser.serialize(&ping_frame) {
                            let mut conn = connection.lock().await;
                            let _ = conn.send(&data).await;
                        }

                        // Checked regardless of the send outcome so that a
                        // broken connection is still closed once the timeout
                        // elapses. The std mutex guard is released inside the
                        // helper, never held across an `.await`.
                        if Self::pong_overdue(&last_pong, timeout) {
                            let mut conn = connection.lock().await;
                            let _ = conn.close().await;
                            break;
                        }
                    }
                    // Completes on an explicit stop message and also when the
                    // sender has been dropped (channel closed).
                    _ = rx.recv() => {
                        break;
                    }
                }
            }
        });
    }

    /// Signals the heartbeat task to stop; does nothing if none is running.
    pub fn stop(&mut self) {
        if let Some(tx) = self.stop_tx.take() {
            // `Sender::send` is async and does nothing unless awaited, so use
            // the synchronous `try_send`. An error only means the task already
            // exited (or a stop is already queued), which is fine to ignore.
            // Dropping `tx` afterwards closes the channel as a second signal.
            let _ = tx.try_send(());
        }
    }

    /// Records that a pong was received just now, resetting the timeout clock.
    pub fn record_pong(&self) {
        *Self::lock_last_pong(&self.last_pong) = Some(std::time::Instant::now());
    }

    /// Returns `true` when more than `timeout` has elapsed since the recorded
    /// liveness timestamp, and `false` when no timestamp has been recorded.
    pub fn is_timeout(&self) -> bool {
        Self::pong_overdue(&self.last_pong, self.timeout)
    }

    /// Locks the pong slot, recovering the guard if the mutex was poisoned.
    ///
    /// The slot only holds an `Option<Instant>`, which cannot be left in a
    /// half-updated state, so continuing after a poison is safe.
    fn lock_last_pong(
        slot: &std::sync::Mutex<Option<std::time::Instant>>,
    ) -> std::sync::MutexGuard<'_, Option<std::time::Instant>> {
        slot.lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }

    /// Reports whether the timestamp in `slot` is older than `timeout`.
    fn pong_overdue(
        slot: &std::sync::Mutex<Option<std::time::Instant>>,
        timeout: Duration,
    ) -> bool {
        // Copy the value out so the guard is released before evaluating.
        let last = *Self::lock_last_pong(slot);
        last.map_or(false, |at| at.elapsed() > timeout)
    }
}