actr_runtime/lifecycle/
heartbeat.rs

1//! Heartbeat management for ActrNode
2//!
3//! This module contains functions for sending periodic heartbeat messages
4//! to the signaling server and handling responses.
5
6use crate::lifecycle::CredentialState;
7use crate::wire::webrtc::SignalingClient;
8use actr_mailbox::Mailbox;
9use actr_protocol::{ActrId, ActrIdExt, ServiceAvailabilityState};
10use std::sync::Arc;
11use std::time::Duration;
12use tokio_util::sync::CancellationToken;
13
/// Nominal mailbox capacity used as the denominator of the backlog ratio.
///
/// The heartbeat reports backlog as `(queued + inflight) / TYPICAL_CAPACITY`,
/// capped at 1.0, so with a value of 1000 a backlog of 100 messages reads as 10%.
/// Within this module it is only a scaling reference for that ratio.
const TYPICAL_CAPACITY: f32 = 1000.0;
17
18/// Get power reserve, mailbox backlog and calculate service availability
19///
20/// This function fetches the power reserve from pwrzv and mailbox backlog,
21/// then calculates the service availability state based on both metrics.
22///
23/// # Arguments
24/// * `mailbox` - Mailbox instance to get backlog statistics
25///
26/// # Returns
27/// A tuple of (power_reserve, mailbox_backlog, availability) where:
28/// - `power_reserve`: Power reserve level from pwrzv (1.0 to 5.0, where higher = more available)
29/// - `mailbox_backlog`: Mailbox backlog ratio (0.0 to 1.0, where higher = more backlog)
30/// - `availability`: Calculated ServiceAvailabilityState
31async fn get_power_reserve_and_availability(
32    mailbox: &Arc<dyn Mailbox>,
33) -> (f32, f32, ServiceAvailabilityState) {
34    // TODO: Ensure the default value is correct
35    // Get real power reserve from pwrzv (returns 1.0 to 5.0, where higher = more available)
36    let power_reserve = pwrzv::get_power_reserve_level_direct().await.unwrap_or(1.0); // Default to minimum capacity on error
37
38    // Get mailbox backlog from mailbox stats
39    // Calculate backlog ratio: (queued + inflight) / typical_capacity
40    let mailbox_backlog = match mailbox.status().await {
41        Ok(stats) => {
42            let total_messages = (stats.queued_messages + stats.inflight_messages) as f32;
43            (total_messages / TYPICAL_CAPACITY).min(1.0)
44        }
45        Err(e) => {
46            tracing::warn!("⚠️ Failed to get mailbox stats: {}", e);
47            0.0
48        }
49    };
50
51    // TODO: Improve availability calculation
52    // Determine availability based on power reserve and mailbox backlog
53    // Power reserve range: 1.0 (worst) to 5.0 (best)
54    // Thresholds adjusted for 1.0-5.0 range: 4.2 (80%), 3.0 (50%), 1.8 (20%)
55    let availability = if power_reserve > 4.2 && mailbox_backlog < 0.5 {
56        ServiceAvailabilityState::Full
57    } else if power_reserve > 3.0 && mailbox_backlog < 0.8 {
58        ServiceAvailabilityState::Degraded
59    } else if power_reserve > 1.8 && mailbox_backlog < 0.95 {
60        ServiceAvailabilityState::Overloaded
61    } else {
62        ServiceAvailabilityState::Unavailable
63    };
64
65    (power_reserve, mailbox_backlog, availability)
66}
67
68/// Send a single heartbeat and handle the Pong response
69///
70/// This function sends a heartbeat message to the signaling server,
71/// waits for the Pong response, and handles credential warnings if present.
72///
73/// # Arguments
74/// * `client` - Signaling client for sending heartbeats
75/// * `actor_id` - Actor ID for heartbeat messages
76/// * `credential_state` - Shared credential state
77/// * `mailbox` - Mailbox instance for backlog statistics
78/// * `heartbeat_interval` - Interval between heartbeats (used for timeout calculation)
79async fn send_heartbeat_and_handle_response(
80    client: &Arc<dyn SignalingClient>,
81    actor_id: &ActrId,
82    credential_state: &CredentialState,
83    mailbox: &Arc<dyn Mailbox>,
84    heartbeat_interval: Duration,
85) {
86    // Get current credential from shared state
87    let current_credential = credential_state.credential().await;
88
89    // Get power reserve, mailbox backlog and calculate availability
90    let (power_reserve, mailbox_backlog, availability) =
91        get_power_reserve_and_availability(mailbox).await;
92
93    let ping_timeout_secs = (heartbeat_interval.as_secs() as f64 * 0.4) as u64;
94    let pong_response = tokio::time::timeout(
95        Duration::from_secs(ping_timeout_secs),
96        client.send_heartbeat(
97            actor_id.clone(),
98            current_credential.clone(),
99            availability,
100            power_reserve,
101            mailbox_backlog,
102        ),
103    )
104    .await;
105
106    let pong = match pong_response {
107        Ok(Ok(pong)) => pong,
108        Ok(Err(e)) => {
109            tracing::warn!("⚠️ Failed to send heartbeat or receive Pong: {}", e);
110            return;
111        }
112        Err(_) => {
113            tracing::warn!("⚠️ Heartbeat timeout after {}s", ping_timeout_secs);
114            return;
115        }
116    };
117
118    tracing::debug!(
119        "💓 Heartbeat sent and Pong received for Actor {} (power_reserve={:.2}, mailbox_backlog={:.2}, availability={:?})",
120        actor_id.to_string_repr(),
121        power_reserve,
122        mailbox_backlog,
123        availability
124    );
125    // TODO: Handle suggest_interval_secs
126    // Handle credential_warning
127    if let Some(warning) = pong.credential_warning {
128        tracing::warn!(
129            "⚠️ Credential warning received: type={:?}, message={}",
130            warning.r#type(),
131            warning.message
132        );
133
134        // Trigger immediate credential refresh in a spawned task
135        tokio::spawn(credential_refresh_task(
136            client.clone(),
137            actor_id.clone(),
138            credential_state.clone(),
139        ));
140    }
141}
142
143/// Heartbeat task that periodically sends Ping messages to signaling server
144///
145/// This task runs in a loop, sending heartbeat messages at the specified interval
146/// and handling Pong responses, including credential warnings.
147///
148/// # Arguments
149/// * `shutdown` - Cancellation token for graceful shutdown
150/// * `client` - Signaling client for sending heartbeats
151/// * `actor_id` - Actor ID for heartbeat messages
152/// * `credential_state` - Shared credential state
153/// * `mailbox` - Mailbox instance for backlog statistics
154/// * `heartbeat_interval` - Interval between heartbeats
155pub async fn heartbeat_task(
156    shutdown: CancellationToken,
157    client: Arc<dyn SignalingClient>,
158    actor_id: ActrId,
159    credential_state: CredentialState,
160    mailbox: Arc<dyn Mailbox>,
161    heartbeat_interval: Duration,
162) {
163    let mut interval = tokio::time::interval(heartbeat_interval);
164
165    loop {
166        tokio::select! {
167            _ = shutdown.cancelled() => {
168                tracing::info!("💓 Heartbeat task received shutdown signal");
169                break;
170            }
171            _ = interval.tick() => {
172                send_heartbeat_and_handle_response(
173                    &client,
174                    &actor_id,
175                    &credential_state,
176                    &mailbox,
177                    heartbeat_interval,
178                )
179                .await;
180            }
181        }
182    }
183    tracing::info!("✅ Heartbeat task terminated gracefully");
184}
185
186/// Refresh credential for an actor
187///
188/// This function sends a credential update request to the signaling server
189/// and updates the shared credential state upon success.
190///
191/// # Arguments
192/// * `client` - Signaling client for sending credential update request
193/// * `actor_id` - Actor ID for the credential update
194/// * `credential_state` - Shared credential state to update
195async fn credential_refresh_task(
196    client: Arc<dyn SignalingClient>,
197    actor_id: ActrId,
198    credential_state: CredentialState,
199) {
200    tracing::info!(
201        "🔑 Refreshing credential for Actor {}",
202        actor_id.to_string_repr()
203    );
204
205    match client
206        .send_credential_update_request(actor_id.clone(), credential_state.credential().await)
207        .await
208    {
209        Ok(register_response) => {
210            match register_response.result {
211                Some(actr_protocol::register_response::Result::Success(register_ok)) => {
212                    let new_credential = register_ok.credential;
213                    let new_expires_at = register_ok.credential_expires_at;
214                    let new_psk = register_ok.psk;
215
216                    // Update shared state including PSK
217                    credential_state
218                        .update(new_credential.clone(), new_expires_at, new_psk.clone())
219                        .await;
220
221                    tracing::info!(
222                        "✅ Credential refreshed successfully for Actor {} (new key_id: {})",
223                        actor_id.serial_number,
224                        new_credential.token_key_id
225                    );
226
227                    if new_psk.is_some() {
228                        tracing::debug!("🔑 PSK updated for TURN authentication");
229                    }
230
231                    if let Some(expires_at) = &new_expires_at {
232                        tracing::debug!("⏰ New credential expires at: {}s", expires_at.seconds);
233                    }
234                }
235                Some(actr_protocol::register_response::Result::Error(err)) => {
236                    tracing::error!(
237                        "❌ Credential refresh failed: code={}, message={}",
238                        err.code,
239                        err.message
240                    );
241                }
242                None => {
243                    tracing::error!("❌ Credential refresh response missing result");
244                }
245            }
246        }
247        Err(e) => {
248            tracing::warn!("⚠️ Failed to send credential update request: {}", e);
249        }
250    }
251}