athena_rs 3.23.0

Hyper performant polyglot Database driver
Documentation
use actix_web::web::Data;
use anyhow::{Result, anyhow, bail};
use serde_json::json;
use std::time::Duration;
use tracing::info;

use crate::AppState;
use crate::config::Config;

const BACKUP_WORKER_NAMES: [&str; 2] = ["backup_execution_worker", "backup_schedule_worker"];
const DEFERRED_QUERY_WORKER_NAME: &str = "deferred_query_worker";
const CHAT_OUTBOX_WORKER_NAME: &str = "chat_outbox_worker";
const LEGACY_WORKER_NAMES: [&str; 4] = [
    "connection_monitor",
    "outbox_relay_worker",
    "registry_reconnect_worker",
    CHAT_OUTBOX_WORKER_NAME,
];
const TYPESENSE_WORKER_NAME: &str = "typesense_sync_worker";

pub fn spawn_api_inline_runtime(state: Data<AppState>, config: &Config) -> Vec<String> {
    let mut workers = spawn_legacy_worker_runtime(state.clone());
    workers.extend(spawn_deferred_query_worker_runtime(state.clone(), config));
    workers.extend(spawn_backup_worker_runtime(state.clone()));
    workers.extend(spawn_typesense_worker_runtime(state));
    workers.sort();
    workers.dedup();
    workers
}

pub async fn run_legacy_worker_runtime(context: super::RuntimeBootstrapContext) -> Result<()> {
    let state = context.bootstrap.app_state.clone();
    let identity = super::install_process_daemon_identity("athena_daemon_legacy_workers", None)?;
    let workers = spawn_legacy_worker_runtime(state.clone());
    ensure_runtime_started("athena_daemon_legacy_workers", &workers)?;
    crate::daemon::spawn_runtime_registry_heartbeat(
        state,
        "athena_daemon_legacy_workers",
        Some(identity.daemon_id),
        workers,
        json!({
            "management_mode": "legacy_workers_only",
            "runtime": "athena_daemon_legacy_workers",
        }),
    );
    wait_for_shutdown().await
}

pub async fn run_backup_worker_runtime(context: super::RuntimeBootstrapContext) -> Result<()> {
    let state = context.bootstrap.app_state.clone();
    let identity = super::install_process_daemon_identity("athena_backup_worker", None)?;
    maybe_ensure_pg_tools_for_backup(state.get_ref()).await?;
    let workers = spawn_backup_worker_runtime(state.clone());
    ensure_runtime_started("athena_backup_worker", &workers)?;
    crate::daemon::spawn_runtime_registry_heartbeat(
        state,
        "athena_backup_worker",
        Some(identity.daemon_id),
        workers,
        json!({
            "management_mode": "dedicated_worker",
            "runtime": "athena_backup_worker",
        }),
    );
    wait_for_shutdown().await
}

pub async fn run_typesense_worker_runtime(context: super::RuntimeBootstrapContext) -> Result<()> {
    let state = context.bootstrap.app_state.clone();
    let identity = super::install_process_daemon_identity("athena_typesense_worker", None)?;
    let workers = spawn_typesense_worker_runtime(state.clone());
    ensure_runtime_started("athena_typesense_worker", &workers)?;
    crate::daemon::spawn_runtime_registry_heartbeat(
        state,
        "athena_typesense_worker",
        Some(identity.daemon_id),
        workers,
        json!({
            "management_mode": "dedicated_worker",
            "runtime": "athena_typesense_worker",
        }),
    );
    wait_for_shutdown().await
}

pub async fn run_deferred_query_worker_runtime(
    context: super::RuntimeBootstrapContext,
) -> Result<()> {
    let state = context.bootstrap.app_state.clone();
    let identity = super::install_process_daemon_identity("athena_deferred_query_worker", None)?;
    let workers = spawn_deferred_query_worker_runtime(state.clone(), &context.config);
    ensure_runtime_started("athena_deferred_query_worker", &workers)?;
    crate::daemon::spawn_runtime_registry_heartbeat(
        state,
        "athena_deferred_query_worker",
        Some(identity.daemon_id),
        workers,
        json!({
            "management_mode": "dedicated_worker",
            "runtime": "athena_deferred_query_worker",
        }),
    );
    wait_for_shutdown().await
}

pub fn spawn_legacy_worker_runtime(state: Data<AppState>) -> Vec<String> {
    crate::daemon::spawn_connection_monitor(state.clone());
    crate::workers::outbox_relay::spawn_outbox_relay_worker(state.clone());
    crate::daemon::spawn_registry_reconnect_worker(state.clone());
    crate::api::chat::runtime::spawn_chat_outbox_worker(state.clone());

    let mut workers = LEGACY_WORKER_NAMES
        .iter()
        .map(|worker| worker.to_string())
        .collect::<Vec<_>>();

    #[cfg(feature = "vacuum_health")]
    {
        crate::features::vacuum_health::spawn_vacuum_health_collector(state);
        workers.push("vacuum_health_collector".to_string());
    }

    workers
}

pub fn spawn_backup_worker_runtime(state: Data<AppState>) -> Vec<String> {
    crate::api::backup::spawn_backup_workers(state.clone());
    if crate::backup_workers_effectively_enabled(state.get_ref()) {
        BACKUP_WORKER_NAMES
            .iter()
            .map(|worker| worker.to_string())
            .collect()
    } else {
        Vec::new()
    }
}

pub fn spawn_typesense_worker_runtime(state: Data<AppState>) -> Vec<String> {
    crate::api::typesense::spawn_typesense_sync_worker(state.clone());
    if state.typesense_sync_worker_enabled {
        vec![TYPESENSE_WORKER_NAME.to_string()]
    } else {
        Vec::new()
    }
}

pub fn spawn_deferred_query_worker_runtime(state: Data<AppState>, config: &Config) -> Vec<String> {
    if !config.get_gateway_deferred_query_worker_enabled() {
        info!("Deferred query worker disabled");
        return Vec::new();
    }

    let deferred_worker_poll_ms = config.get_gateway_deferred_query_worker_poll_ms().max(100);
    let deferred_cleanup_interval_ms = config
        .get_gateway_deferred_query_cleanup_interval_ms()
        .max(1_000);
    let deferred_retention_secs = config.get_gateway_deferred_query_retention_secs();
    let deferred_retention = if deferred_retention_secs == 0 {
        None
    } else {
        Some(Duration::from_secs(deferred_retention_secs))
    };
    let gateway_log_retention_secs = config.get_gateway_log_retention_secs();
    let gateway_log_retention = if gateway_log_retention_secs == 0 {
        None
    } else {
        Some(Duration::from_secs(gateway_log_retention_secs))
    };

    crate::api::gateway::deferred::spawn_deferred_query_worker(
        state,
        true,
        Duration::from_millis(deferred_worker_poll_ms),
        deferred_retention,
        Duration::from_millis(deferred_cleanup_interval_ms),
        gateway_log_retention,
    );
    vec![DEFERRED_QUERY_WORKER_NAME.to_string()]
}

fn ensure_runtime_started(runtime_kind: &str, workers: &[String]) -> Result<()> {
    if workers.is_empty() {
        bail!(
            "{runtime_kind} did not start any workers; enable the corresponding worker settings before booting this binary"
        );
    }
    Ok(())
}

async fn maybe_ensure_pg_tools_for_backup(state: &AppState) -> Result<()> {
    if crate::backup_workers_effectively_enabled(state) {
        crate::utils::pg_tools::ensure_pg_tools()
            .await
            .map_err(|err| anyhow!("PostgreSQL tools unavailable: {err}"))?;
    }
    Ok(())
}

pub async fn wait_for_shutdown() -> Result<()> {
    tokio::signal::ctrl_c().await?;
    Ok(())
}