athena_rs 3.26.2

Hyper performant polyglot Database driver
Documentation
use std::time::Duration;

use actix_web::web::Data;
use athena_control_plane::{
    DaemonId, DaemonRegistryService, DaemonRuntimeHeartbeat, resolve_runtime_daemon_id,
    resolve_runtime_host,
};
use chrono::Utc;
use serde_json::Value;
use tracing::{debug, warn};

use crate::AppState;

const DAEMON_HEARTBEAT_INTERVAL_SECS: u64 = 15;

pub fn resolve_process_daemon_id(runtime_kind: &str, override_value: Option<&str>) -> DaemonId {
    resolve_runtime_daemon_id(runtime_kind, override_value)
        .expect("runtime daemon id resolution should not fail for Athena runtimes")
}

pub fn spawn_runtime_registry_heartbeat(
    state: Data<AppState>,
    runtime_kind: &'static str,
    daemon_id: Option<DaemonId>,
    workers: Vec<String>,
    metadata: Value,
) -> DaemonId {
    let daemon_id = daemon_id.unwrap_or_else(|| resolve_process_daemon_id(runtime_kind, None));
    let heartbeat = DaemonRuntimeHeartbeat::new(
        daemon_id.clone(),
        runtime_kind,
        resolve_runtime_host(),
        env!("CARGO_PKG_VERSION"),
        workers,
        metadata,
        Utc::now(),
    )
    .expect("daemon runtime heartbeat inputs should be valid");

    actix_web::rt::spawn(async move {
        publish_runtime_heartbeat(state.get_ref(), &heartbeat).await;

        let mut ticker = tokio::time::interval(Duration::from_secs(DAEMON_HEARTBEAT_INTERVAL_SECS));
        ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
        loop {
            ticker.tick().await;
            publish_runtime_heartbeat(state.get_ref(), &heartbeat).await;
        }
    });

    daemon_id
}

async fn publish_runtime_heartbeat(state: &AppState, heartbeat: &DaemonRuntimeHeartbeat) {
    let Some(logging_client_name) = state.logging_client_name.as_ref() else {
        debug!("Skipping daemon runtime heartbeat because no logging client is configured");
        return;
    };
    let Some(logging_pool) = state.pg_registry.get_pool(logging_client_name) else {
        debug!(
            client = %logging_client_name,
            "Skipping daemon runtime heartbeat because logging pool is unavailable"
        );
        return;
    };

    if let Err(err) = DaemonRegistryService::new(logging_pool)
        .upsert_heartbeat(heartbeat)
        .await
    {
        warn!(
            daemon_id = %heartbeat.daemon_id,
            runtime_kind = %heartbeat.runtime_kind,
            error = %err,
            "Failed to publish daemon runtime heartbeat"
        );
    }
}