use crate::capability::manager::CapabilityManager;
use crate::core::types::{ChannelType, DeviceId, Message, MessagePayload, NetworkType};
use crate::router::selector::Router;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::task::JoinHandle;
const NEAR_MIN_INTERVAL: Duration = Duration::from_secs(1);
const NEAR_MAX_INTERVAL: Duration = Duration::from_secs(5);
const REMOTE_MIN_INTERVAL: Duration = Duration::from_secs(30);
const REMOTE_MAX_INTERVAL: Duration = Duration::from_secs(60);
const FAILURE_THRESHOLD: u32 = 3;
const SIGNAL_STRENGTH_NEAR_THRESHOLD: i8 = -60;
pub struct HeartbeatManager {
local_device_id: DeviceId,
router: Arc<Router>,
cap_manager: Arc<CapabilityManager>,
running_task: Option<JoinHandle<()>>,
}
impl HeartbeatManager {
pub fn new(
local_device_id: DeviceId,
router: Arc<Router>,
cap_manager: Arc<CapabilityManager>,
) -> Self {
Self {
local_device_id,
router,
cap_manager,
running_task: None,
}
}
pub fn start(&mut self) -> Option<JoinHandle<()>> {
if self.running_task.is_some() {
return None;
}
let router = self.router.clone();
let cap_manager = self.cap_manager.clone();
let local_id = self.local_device_id;
let task = tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(1));
loop {
interval.tick().await;
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_else(|_| Duration::from_secs(0))
.as_millis() as u64;
let devices = cap_manager.get_all_remote_devices();
for device_id in devices {
let channel_type = ChannelType::Internet;
if let Some(mut state) =
cap_manager.get_channel_state(&device_id, &channel_type)
{
let distance = state.distance_meters.unwrap_or(20.0);
let signal_strength = state.signal_strength.unwrap_or(-100);
let network_type = state.network_type;
let required_interval = if distance <= 10.0
|| signal_strength >= SIGNAL_STRENGTH_NEAR_THRESHOLD
|| state.rtt_ms < 100
{
let base_interval = NEAR_MIN_INTERVAL;
let max_interval = NEAR_MAX_INTERVAL;
let distance_factor = (distance / 10.0).clamp(0.0, 1.0);
let signal_factor =
((signal_strength + 100) as f32 / 40.0).clamp(0.0, 1.0);
let rtt_factor = (state.rtt_ms as f32 / 200.0).clamp(0.0, 1.0);
let factor =
distance_factor * 0.7 + signal_factor * 0.15 + rtt_factor * 0.15;
let interval_ms = base_interval.as_millis() as f32
+ (max_interval.as_millis() as f32
- base_interval.as_millis() as f32)
* factor;
Duration::from_millis(interval_ms as u64)
} else {
let base_interval = REMOTE_MIN_INTERVAL;
let max_interval = REMOTE_MAX_INTERVAL;
let network_factor = match network_type {
NetworkType::Bluetooth => 0.3, NetworkType::WiFi => 0.5,
NetworkType::Ethernet => 1.0,
_ => 0.8, };
let rtt_factor = (state.rtt_ms as f32 / 1000.0).clamp(0.0, 1.0);
let factor = (network_factor + rtt_factor) / 2.0;
let interval_ms = base_interval.as_millis() as f32
+ (max_interval.as_millis() as f32
- base_interval.as_millis() as f32)
* factor;
Duration::from_millis(interval_ms as u64)
};
let elapsed = now.saturating_sub(state.last_heartbeat);
if elapsed < required_interval.as_millis() as u64 {
continue; }
let payload = MessagePayload::Ping(now);
let msg = Message::new(local_id, device_id, payload);
state.failure_count += 1;
if state.failure_count >= FAILURE_THRESHOLD {
state.available = false;
log::warn!(
"Device {} marked unavailable ({} failures)",
device_id,
state.failure_count
);
}
cap_manager.update_channel_state(device_id, channel_type, state);
let r_clone = router.clone();
tokio::spawn(async move {
if let Ok(ch) = r_clone.select_channel(&msg).await {
let _ = ch.send(msg).await;
}
});
}
}
}
});
self.running_task = None;
Some(task)
}
pub fn stop(&mut self) {
log::info!("HeartbeatManager stop called (task managed by SDK)");
}
pub async fn handle_heartbeat(&self, message: &Message) {
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
match message.payload {
MessagePayload::Ping(ts) => {
let response = Message::new(
self.local_device_id,
message.sender,
MessagePayload::Pong(ts),
);
if let Ok(ch) = self.router.select_channel(&response).await {
let _ = ch.send(response).await;
}
}
MessagePayload::Pong(ts) => {
let rtt = (now.saturating_sub(ts)) as u32;
let channel_type = ChannelType::Internet;
if let Some(mut state) = self
.cap_manager
.get_channel_state(&message.sender, &channel_type)
{
state.available = true;
state.failure_count = 0;
state.last_heartbeat = now;
state.rtt_ms = (state.rtt_ms * 7 + rtt * 3) / 10;
self.cap_manager
.update_channel_state(message.sender, channel_type, state);
log::debug!("Heartbeat success: {} RTT={}ms", message.sender, rtt);
}
}
_ => {}
}
}
}