use std::time::Duration;
use actix_web::web::Data;
use athena_control_plane::{
DaemonId, DaemonRegistryService, DaemonRuntimeHeartbeat, resolve_runtime_daemon_id,
resolve_runtime_host,
};
use chrono::Utc;
use serde_json::Value;
use tracing::{debug, warn};
use crate::AppState;
/// Period, in seconds, of the ticker that drives repeated daemon runtime heartbeat publication.
const DAEMON_HEARTBEAT_INTERVAL_SECS: u64 = 15;
/// Resolves the [`DaemonId`] for the given runtime kind, forwarding the optional override.
///
/// This is a thin wrapper over `resolve_runtime_daemon_id` that treats a resolution
/// failure as a broken invariant instead of returning it to the caller.
///
/// # Panics
///
/// Panics if `resolve_runtime_daemon_id` returns an error.
pub fn resolve_process_daemon_id(runtime_kind: &str, override_value: Option<&str>) -> DaemonId {
    let resolution = resolve_runtime_daemon_id(runtime_kind, override_value);
    resolution.expect("runtime daemon id resolution should not fail for Athena runtimes")
}
/// Spawns a background task that repeatedly publishes this runtime's heartbeat to the
/// daemon registry, and returns the daemon id the heartbeat is published under.
///
/// * `state` - shared application state; moved into the spawned task.
/// * `runtime_kind` - runtime kind recorded in the heartbeat and used for id resolution.
/// * `daemon_id` - explicit daemon id; when `None`, the id is resolved through
///   [`resolve_process_daemon_id`] with no override.
/// * `workers` / `metadata` - passed through to the heartbeat unchanged.
///
/// The first heartbeat is published as soon as the task starts, then once every
/// `DAEMON_HEARTBEAT_INTERVAL_SECS` seconds. The task handle is discarded, so this
/// function offers no way to stop the loop.
///
/// # Panics
///
/// Panics if daemon id resolution fails or if `DaemonRuntimeHeartbeat::new` rejects
/// the supplied inputs.
pub fn spawn_runtime_registry_heartbeat(
    state: Data<AppState>,
    runtime_kind: &'static str,
    daemon_id: Option<DaemonId>,
    workers: Vec<String>,
    metadata: Value,
) -> DaemonId {
    let daemon_id = daemon_id.unwrap_or_else(|| resolve_process_daemon_id(runtime_kind, None));

    // NOTE(review): the heartbeat value (including the `Utc::now()` timestamp) is built
    // once here and re-sent unchanged on every tick — confirm the registry derives
    // freshness from the upsert itself rather than from this captured timestamp.
    let heartbeat = DaemonRuntimeHeartbeat::new(
        daemon_id.clone(),
        runtime_kind,
        resolve_runtime_host(),
        env!("CARGO_PKG_VERSION"),
        workers,
        metadata,
        Utc::now(),
    )
    .expect("daemon runtime heartbeat inputs should be valid");

    actix_web::rt::spawn(async move {
        let mut ticker = tokio::time::interval(Duration::from_secs(DAEMON_HEARTBEAT_INTERVAL_SECS));
        // If the task falls behind, skip the missed ticks instead of bursting to catch up.
        ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
        loop {
            // The first tick of a tokio interval completes immediately, so this loop
            // also delivers the initial heartbeat. A separate publish before the loop
            // would send two heartbeats back-to-back at startup.
            ticker.tick().await;
            publish_runtime_heartbeat(state.get_ref(), &heartbeat).await;
        }
    });

    daemon_id
}
/// Attempts a single upsert of `heartbeat` into the daemon registry.
///
/// Publication is best-effort and never returns an error to the caller:
/// * if `state.logging_client_name` is unset, a debug line is logged and nothing is sent;
/// * if `state.pg_registry` has no pool for that client, a debug line is logged and
///   nothing is sent;
/// * if the upsert itself fails, the failure is logged at warn level.
async fn publish_runtime_heartbeat(state: &AppState, heartbeat: &DaemonRuntimeHeartbeat) {
    let client_name = match state.logging_client_name.as_ref() {
        Some(name) => name,
        None => {
            debug!("Skipping daemon runtime heartbeat because no logging client is configured");
            return;
        }
    };

    let pool = match state.pg_registry.get_pool(client_name) {
        Some(pool) => pool,
        None => {
            debug!(
                client = %client_name,
                "Skipping daemon runtime heartbeat because logging pool is unavailable"
            );
            return;
        }
    };

    match DaemonRegistryService::new(pool)
        .upsert_heartbeat(heartbeat)
        .await
    {
        Ok(_) => {}
        Err(failure) => {
            warn!(
                daemon_id = %heartbeat.daemon_id,
                runtime_kind = %heartbeat.runtime_kind,
                error = %failure,
                "Failed to publish daemon runtime heartbeat"
            );
        }
    }
}