use crate::ais_client::AisClient;
use crate::lifecycle::CredentialState;
use crate::transport::NetworkError;
use crate::wire::webrtc::SignalingClient;
use crate::wire::webrtc::{HookCallback, HookEvent};
use actr_protocol::{ActrId, RegisterRequest, ServiceAvailabilityState};
use actr_runtime_mailbox::Mailbox;
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use tokio_util::sync::CancellationToken;
fn expiry_to_system_time(expires_at: &prost_types::Timestamp) -> SystemTime {
SystemTime::UNIX_EPOCH + Duration::from_secs(expires_at.seconds.max(0) as u64)
}
async fn fire_hook(cb: Option<&HookCallback>, event: HookEvent) {
if let Some(cb) = cb {
cb(event).await;
}
}
const TYPICAL_CAPACITY: f32 = 1000.0;
const HEARTBEAT_FAILURE_LOG_EVERY: u64 = 12;
fn should_log_heartbeat_failure(consecutive_failures: u64) -> bool {
consecutive_failures == 1 || consecutive_failures.is_multiple_of(HEARTBEAT_FAILURE_LOG_EVERY)
}
async fn get_power_reserve_and_availability(
mailbox: &Arc<dyn Mailbox>,
) -> (f32, f32, ServiceAvailabilityState) {
let power_reserve = pwrzv::get_power_reserve_level_direct().await.unwrap_or(1.0);
let mailbox_backlog = match mailbox.status().await {
Ok(stats) => {
let total_messages = (stats.queued_messages + stats.inflight_messages) as f32;
(total_messages / TYPICAL_CAPACITY).min(1.0)
}
Err(e) => {
tracing::warn!("⚠️ Failed to get mailbox stats: {}", e);
0.0
}
};
let availability = if power_reserve > 4.2 && mailbox_backlog < 0.5 {
ServiceAvailabilityState::Full
} else if power_reserve > 3.0 && mailbox_backlog < 0.8 {
ServiceAvailabilityState::Degraded
} else if power_reserve > 1.8 && mailbox_backlog < 0.95 {
ServiceAvailabilityState::Overloaded
} else {
ServiceAvailabilityState::Unavailable
};
(power_reserve, mailbox_backlog, availability)
}
#[allow(clippy::too_many_arguments)]
async fn send_heartbeat_and_handle_response(
client: &Arc<dyn SignalingClient>,
actor_id: &ActrId,
credential_state: &CredentialState,
mailbox: &Arc<dyn Mailbox>,
heartbeat_interval: Duration,
register_request: &RegisterRequest,
consecutive_failures: &mut u64,
ais_endpoint: &str,
hook_callback: Option<&HookCallback>,
) -> Option<ActrId> {
let current_credential = credential_state.credential().await;
let (power_reserve, mailbox_backlog, availability) =
get_power_reserve_and_availability(mailbox).await;
let ping_timeout_secs = (heartbeat_interval.as_secs() as f64 * 0.4) as u64;
let pong_response = tokio::time::timeout(
Duration::from_secs(ping_timeout_secs),
client.send_heartbeat(
actor_id.clone(),
current_credential.clone(),
availability,
power_reserve,
mailbox_backlog,
),
)
.await;
let pong = match pong_response {
Ok(Ok(pong)) => pong,
Ok(Err(NetworkError::CredentialExpired(msg))) => {
tracing::warn!(
"⚠️ Credential expired during heartbeat: {}. Attempting re-registration.",
msg
);
if let Some(expires_at) = credential_state.expires_at().await {
fire_hook(
hook_callback,
HookEvent::CredentialExpiring {
new_expiry: expiry_to_system_time(&expires_at),
},
)
.await;
}
let new_actor_id = re_register_task(
client.clone(),
actor_id.clone(),
register_request.clone(),
credential_state.clone(),
ais_endpoint.to_string(),
hook_callback.cloned(),
)
.await;
if &new_actor_id != actor_id {
return Some(new_actor_id);
}
return None;
}
Ok(Err(e)) => {
*consecutive_failures += 1;
if should_log_heartbeat_failure(*consecutive_failures) {
tracing::warn!(
consecutive_failures = *consecutive_failures,
"⚠️ Failed to send heartbeat or receive Pong: {}",
e
);
} else {
tracing::debug!(
consecutive_failures = *consecutive_failures,
"Suppressed repeated heartbeat failure: {}",
e
);
}
return None;
}
Err(_) => {
*consecutive_failures += 1;
if should_log_heartbeat_failure(*consecutive_failures) {
tracing::warn!(
consecutive_failures = *consecutive_failures,
"⚠️ Heartbeat timeout after {}s",
ping_timeout_secs
);
} else {
tracing::debug!(
consecutive_failures = *consecutive_failures,
"Suppressed repeated heartbeat timeout after {}s",
ping_timeout_secs
);
}
return None;
}
};
if *consecutive_failures > 0 {
tracing::info!(
consecutive_failures = *consecutive_failures,
"✅ Heartbeat recovered after consecutive failures"
);
*consecutive_failures = 0;
}
tracing::trace!(
"💓 Heartbeat sent and Pong received for Actor {} (power_reserve={:.2}, mailbox_backlog={:.2}, availability={:?})",
actor_id,
power_reserve,
mailbox_backlog,
availability
);
if let Some(warning) = pong.credential_warning {
tracing::warn!(
"⚠️ Credential warning received: type={:?}, message={}",
warning.r#type(),
warning.message
);
if let Some(expires_at) = credential_state.expires_at().await {
fire_hook(
hook_callback,
HookEvent::CredentialExpiring {
new_expiry: expiry_to_system_time(&expires_at),
},
)
.await;
}
tokio::spawn(credential_refresh_task(
client.clone(),
actor_id.clone(),
credential_state.clone(),
hook_callback.cloned(),
));
}
None
}
#[allow(clippy::too_many_arguments)]
pub async fn heartbeat_task(
shutdown: CancellationToken,
client: Arc<dyn SignalingClient>,
actor_id: ActrId,
credential_state: CredentialState,
mailbox: Arc<dyn Mailbox>,
heartbeat_interval: Duration,
register_request: RegisterRequest,
ais_endpoint: String,
hook_callback: Option<HookCallback>,
) {
let mut interval = tokio::time::interval(heartbeat_interval);
let mut actor_id = actor_id;
let mut consecutive_failures = 0;
loop {
tokio::select! {
_ = shutdown.cancelled() => {
tracing::info!("💓 Heartbeat task received shutdown signal");
break;
}
_ = interval.tick() => {
if let Some(new_actor_id) = send_heartbeat_and_handle_response(
&client,
&actor_id,
&credential_state,
&mailbox,
heartbeat_interval,
®ister_request,
&mut consecutive_failures,
&ais_endpoint,
hook_callback.as_ref(),
)
.await {
tracing::info!(
"🔄 Heartbeat actor_id updated: {} -> {}",
actor_id,
new_actor_id,
);
actor_id = new_actor_id;
}
}
}
}
tracing::info!("✅ Heartbeat task terminated gracefully");
}
async fn credential_refresh_task(
client: Arc<dyn SignalingClient>,
actor_id: ActrId,
credential_state: CredentialState,
hook_callback: Option<HookCallback>,
) {
tracing::info!("🔑 Refreshing credential for Actor {}", actor_id);
match client
.send_credential_update_request(actor_id.clone(), credential_state.credential().await)
.await
{
Ok(register_response) => {
match register_response.result {
Some(actr_protocol::register_response::Result::Success(register_ok)) => {
let new_credential = register_ok.credential;
let new_expires_at = register_ok.credential_expires_at;
let new_turn_credential = Some(register_ok.turn_credential);
credential_state
.update(new_credential.clone(), new_expires_at, new_turn_credential)
.await;
tracing::info!(
"✅ Credential refreshed successfully for Actor {}",
actor_id,
);
tracing::debug!("TurnCredential updated, TURN authentication ready");
if let Some(expires_at) = &new_expires_at {
tracing::debug!("⏰ New credential expires at: {}s", expires_at.seconds);
fire_hook(
hook_callback.as_ref(),
HookEvent::CredentialRenewed {
new_expiry: expiry_to_system_time(expires_at),
},
)
.await;
}
}
Some(actr_protocol::register_response::Result::Error(err)) => {
tracing::error!(
"❌ Credential refresh failed: code={}, message={}",
err.code,
err.message
);
}
None => {
tracing::error!("❌ Credential refresh response missing result");
}
}
}
Err(e) => {
tracing::warn!("⚠️ Failed to send credential update request: {}", e);
}
}
}
async fn re_register_task(
client: Arc<dyn SignalingClient>,
actor_id: ActrId,
register_request: RegisterRequest,
credential_state: CredentialState,
ais_endpoint: String,
hook_callback: Option<HookCallback>,
) -> ActrId {
tracing::info!(
"🔄 Re-registering actor {} after credential expiry via AIS HTTP (type: {}/{})",
actor_id,
register_request.actr_type.manufacturer,
register_request.actr_type.name
);
let ais = AisClient::new(&ais_endpoint);
let resp = match ais.register_with_manifest(register_request.clone()).await {
Ok(resp) => resp,
Err(e) => {
tracing::error!("❌ AIS re-registration HTTP request failed: {}", e);
return actor_id;
}
};
match resp.result {
Some(actr_protocol::register_response::Result::Success(register_ok)) => {
let new_actor_id = register_ok.actr_id.clone();
let new_credential = register_ok.credential;
let new_expires_at = register_ok.credential_expires_at;
let new_turn_credential = Some(register_ok.turn_credential);
credential_state
.update(new_credential.clone(), new_expires_at, new_turn_credential)
.await;
client.clear_identity().await;
tracing::info!("🔌 Disconnecting signaling client to refresh session");
if let Err(e) = client.disconnect().await {
tracing::warn!("⚠️ Disconnect failed (non-fatal, continuing): {}", e);
}
client.set_actor_id(new_actor_id.clone()).await;
client.set_credential_state(credential_state.clone()).await;
tracing::info!("🔗 Reconnecting signaling client with new credential");
if let Err(e) = client.connect().await {
tracing::error!("❌ Failed to reconnect after re-registration: {}", e);
}
tracing::info!(
"✅ Re-registration successful via AIS HTTP (ActrId: {})",
new_actor_id,
);
tracing::debug!("TurnCredential updated, TURN authentication ready");
if let Some(expires_at) = &new_expires_at {
tracing::debug!("⏰ New credential expires at: {}s", expires_at.seconds);
fire_hook(
hook_callback.as_ref(),
HookEvent::CredentialRenewed {
new_expiry: expiry_to_system_time(expires_at),
},
)
.await;
}
new_actor_id
}
Some(actr_protocol::register_response::Result::Error(err)) => {
tracing::error!(
"❌ AIS re-registration failed: code={}, message={}",
err.code,
err.message
);
actor_id
}
None => {
tracing::error!("❌ AIS re-registration response missing result");
actor_id
}
}
}