actr_runtime/lifecycle/
heartbeat.rs1use crate::lifecycle::CredentialState;
7use crate::wire::webrtc::SignalingClient;
8use actr_mailbox::Mailbox;
9use actr_protocol::{ActrId, ActrIdExt, ServiceAvailabilityState};
10use std::sync::Arc;
11use std::time::Duration;
12use tokio_util::sync::CancellationToken;
13
/// Number of pending mailbox messages (queued + in-flight) that is treated as
/// a completely full backlog when normalising the backlog ratio to `0.0..=1.0`.
const TYPICAL_CAPACITY: f32 = 1_000.0;
17
18async fn get_power_reserve_and_availability(
32 mailbox: &Arc<dyn Mailbox>,
33) -> (f32, f32, ServiceAvailabilityState) {
34 let power_reserve = pwrzv::get_power_reserve_level_direct().await.unwrap_or(1.0); let mailbox_backlog = match mailbox.status().await {
41 Ok(stats) => {
42 let total_messages = (stats.queued_messages + stats.inflight_messages) as f32;
43 (total_messages / TYPICAL_CAPACITY).min(1.0)
44 }
45 Err(e) => {
46 tracing::warn!("⚠️ Failed to get mailbox stats: {}", e);
47 0.0
48 }
49 };
50
51 let availability = if power_reserve > 4.2 && mailbox_backlog < 0.5 {
56 ServiceAvailabilityState::Full
57 } else if power_reserve > 3.0 && mailbox_backlog < 0.8 {
58 ServiceAvailabilityState::Degraded
59 } else if power_reserve > 1.8 && mailbox_backlog < 0.95 {
60 ServiceAvailabilityState::Overloaded
61 } else {
62 ServiceAvailabilityState::Unavailable
63 };
64
65 (power_reserve, mailbox_backlog, availability)
66}
67
68async fn send_heartbeat_and_handle_response(
80 client: &Arc<dyn SignalingClient>,
81 actor_id: &ActrId,
82 credential_state: &CredentialState,
83 mailbox: &Arc<dyn Mailbox>,
84 heartbeat_interval: Duration,
85) {
86 let current_credential = credential_state.credential().await;
88
89 let (power_reserve, mailbox_backlog, availability) =
91 get_power_reserve_and_availability(mailbox).await;
92
93 let ping_timeout_secs = (heartbeat_interval.as_secs() as f64 * 0.4) as u64;
94 let pong_response = tokio::time::timeout(
95 Duration::from_secs(ping_timeout_secs),
96 client.send_heartbeat(
97 actor_id.clone(),
98 current_credential.clone(),
99 availability,
100 power_reserve,
101 mailbox_backlog,
102 ),
103 )
104 .await;
105
106 let pong = match pong_response {
107 Ok(Ok(pong)) => pong,
108 Ok(Err(e)) => {
109 tracing::warn!("⚠️ Failed to send heartbeat or receive Pong: {}", e);
110 return;
111 }
112 Err(_) => {
113 tracing::warn!("⚠️ Heartbeat timeout after {}s", ping_timeout_secs);
114 return;
115 }
116 };
117
118 tracing::debug!(
119 "💓 Heartbeat sent and Pong received for Actor {} (power_reserve={:.2}, mailbox_backlog={:.2}, availability={:?})",
120 actor_id.to_string_repr(),
121 power_reserve,
122 mailbox_backlog,
123 availability
124 );
125 if let Some(warning) = pong.credential_warning {
128 tracing::warn!(
129 "⚠️ Credential warning received: type={:?}, message={}",
130 warning.r#type(),
131 warning.message
132 );
133
134 tokio::spawn(credential_refresh_task(
136 client.clone(),
137 actor_id.clone(),
138 credential_state.clone(),
139 ));
140 }
141}
142
143pub async fn heartbeat_task(
156 shutdown: CancellationToken,
157 client: Arc<dyn SignalingClient>,
158 actor_id: ActrId,
159 credential_state: CredentialState,
160 mailbox: Arc<dyn Mailbox>,
161 heartbeat_interval: Duration,
162) {
163 let mut interval = tokio::time::interval(heartbeat_interval);
164
165 loop {
166 tokio::select! {
167 _ = shutdown.cancelled() => {
168 tracing::info!("💓 Heartbeat task received shutdown signal");
169 break;
170 }
171 _ = interval.tick() => {
172 send_heartbeat_and_handle_response(
173 &client,
174 &actor_id,
175 &credential_state,
176 &mailbox,
177 heartbeat_interval,
178 )
179 .await;
180 }
181 }
182 }
183 tracing::info!("✅ Heartbeat task terminated gracefully");
184}
185
186async fn credential_refresh_task(
196 client: Arc<dyn SignalingClient>,
197 actor_id: ActrId,
198 credential_state: CredentialState,
199) {
200 tracing::info!(
201 "🔑 Refreshing credential for Actor {}",
202 actor_id.to_string_repr()
203 );
204
205 match client
206 .send_credential_update_request(actor_id.clone(), credential_state.credential().await)
207 .await
208 {
209 Ok(register_response) => {
210 match register_response.result {
211 Some(actr_protocol::register_response::Result::Success(register_ok)) => {
212 let new_credential = register_ok.credential;
213 let new_expires_at = register_ok.credential_expires_at;
214 let new_psk = register_ok.psk;
215
216 credential_state
218 .update(new_credential.clone(), new_expires_at, new_psk.clone())
219 .await;
220
221 tracing::info!(
222 "✅ Credential refreshed successfully for Actor {} (new key_id: {})",
223 actor_id.serial_number,
224 new_credential.token_key_id
225 );
226
227 if new_psk.is_some() {
228 tracing::debug!("🔑 PSK updated for TURN authentication");
229 }
230
231 if let Some(expires_at) = &new_expires_at {
232 tracing::debug!("⏰ New credential expires at: {}s", expires_at.seconds);
233 }
234 }
235 Some(actr_protocol::register_response::Result::Error(err)) => {
236 tracing::error!(
237 "❌ Credential refresh failed: code={}, message={}",
238 err.code,
239 err.message
240 );
241 }
242 None => {
243 tracing::error!("❌ Credential refresh response missing result");
244 }
245 }
246 }
247 Err(e) => {
248 tracing::warn!("⚠️ Failed to send credential update request: {}", e);
249 }
250 }
251}