use actix_web::web::Data;
use anyhow::{Result, anyhow, bail};
use serde_json::json;
use std::time::Duration;
use tracing::info;
use crate::AppState;
use crate::config::Config;
const BACKUP_WORKER_NAMES: [&str; 2] = ["backup_execution_worker", "backup_schedule_worker"];
const DEFERRED_QUERY_WORKER_NAME: &str = "deferred_query_worker";
const CHAT_OUTBOX_WORKER_NAME: &str = "chat_outbox_worker";
const LEGACY_WORKER_NAMES: [&str; 4] = [
"connection_monitor",
"outbox_relay_worker",
"registry_reconnect_worker",
CHAT_OUTBOX_WORKER_NAME,
];
const TYPESENSE_WORKER_NAME: &str = "typesense_sync_worker";
pub fn spawn_api_inline_runtime(state: Data<AppState>, config: &Config) -> Vec<String> {
let mut workers = spawn_legacy_worker_runtime(state.clone());
workers.extend(spawn_deferred_query_worker_runtime(state.clone(), config));
workers.extend(spawn_backup_worker_runtime(state.clone()));
workers.extend(spawn_typesense_worker_runtime(state));
workers.sort();
workers.dedup();
workers
}
pub async fn run_legacy_worker_runtime(context: super::RuntimeBootstrapContext) -> Result<()> {
let state = context.bootstrap.app_state.clone();
let identity = super::install_process_daemon_identity("athena_daemon_legacy_workers", None)?;
let workers = spawn_legacy_worker_runtime(state.clone());
ensure_runtime_started("athena_daemon_legacy_workers", &workers)?;
crate::daemon::spawn_runtime_registry_heartbeat(
state,
"athena_daemon_legacy_workers",
Some(identity.daemon_id),
workers,
json!({
"management_mode": "legacy_workers_only",
"runtime": "athena_daemon_legacy_workers",
}),
);
wait_for_shutdown().await
}
pub async fn run_backup_worker_runtime(context: super::RuntimeBootstrapContext) -> Result<()> {
let state = context.bootstrap.app_state.clone();
let identity = super::install_process_daemon_identity("athena_backup_worker", None)?;
maybe_ensure_pg_tools_for_backup(state.get_ref()).await?;
let workers = spawn_backup_worker_runtime(state.clone());
ensure_runtime_started("athena_backup_worker", &workers)?;
crate::daemon::spawn_runtime_registry_heartbeat(
state,
"athena_backup_worker",
Some(identity.daemon_id),
workers,
json!({
"management_mode": "dedicated_worker",
"runtime": "athena_backup_worker",
}),
);
wait_for_shutdown().await
}
pub async fn run_typesense_worker_runtime(context: super::RuntimeBootstrapContext) -> Result<()> {
let state = context.bootstrap.app_state.clone();
let identity = super::install_process_daemon_identity("athena_typesense_worker", None)?;
let workers = spawn_typesense_worker_runtime(state.clone());
ensure_runtime_started("athena_typesense_worker", &workers)?;
crate::daemon::spawn_runtime_registry_heartbeat(
state,
"athena_typesense_worker",
Some(identity.daemon_id),
workers,
json!({
"management_mode": "dedicated_worker",
"runtime": "athena_typesense_worker",
}),
);
wait_for_shutdown().await
}
pub async fn run_deferred_query_worker_runtime(
context: super::RuntimeBootstrapContext,
) -> Result<()> {
let state = context.bootstrap.app_state.clone();
let identity = super::install_process_daemon_identity("athena_deferred_query_worker", None)?;
let workers = spawn_deferred_query_worker_runtime(state.clone(), &context.config);
ensure_runtime_started("athena_deferred_query_worker", &workers)?;
crate::daemon::spawn_runtime_registry_heartbeat(
state,
"athena_deferred_query_worker",
Some(identity.daemon_id),
workers,
json!({
"management_mode": "dedicated_worker",
"runtime": "athena_deferred_query_worker",
}),
);
wait_for_shutdown().await
}
pub fn spawn_legacy_worker_runtime(state: Data<AppState>) -> Vec<String> {
crate::daemon::spawn_connection_monitor(state.clone());
crate::workers::outbox_relay::spawn_outbox_relay_worker(state.clone());
crate::daemon::spawn_registry_reconnect_worker(state.clone());
crate::api::chat::runtime::spawn_chat_outbox_worker(state.clone());
let mut workers = LEGACY_WORKER_NAMES
.iter()
.map(|worker| worker.to_string())
.collect::<Vec<_>>();
#[cfg(feature = "vacuum_health")]
{
crate::features::vacuum_health::spawn_vacuum_health_collector(state);
workers.push("vacuum_health_collector".to_string());
}
workers
}
pub fn spawn_backup_worker_runtime(state: Data<AppState>) -> Vec<String> {
crate::api::backup::spawn_backup_workers(state.clone());
if crate::backup_workers_effectively_enabled(state.get_ref()) {
BACKUP_WORKER_NAMES
.iter()
.map(|worker| worker.to_string())
.collect()
} else {
Vec::new()
}
}
pub fn spawn_typesense_worker_runtime(state: Data<AppState>) -> Vec<String> {
crate::api::typesense::spawn_typesense_sync_worker(state.clone());
if state.typesense_sync_worker_enabled {
vec![TYPESENSE_WORKER_NAME.to_string()]
} else {
Vec::new()
}
}
pub fn spawn_deferred_query_worker_runtime(state: Data<AppState>, config: &Config) -> Vec<String> {
if !config.get_gateway_deferred_query_worker_enabled() {
info!("Deferred query worker disabled");
return Vec::new();
}
let deferred_worker_poll_ms = config.get_gateway_deferred_query_worker_poll_ms().max(100);
let deferred_cleanup_interval_ms = config
.get_gateway_deferred_query_cleanup_interval_ms()
.max(1_000);
let deferred_retention_secs = config.get_gateway_deferred_query_retention_secs();
let deferred_retention = if deferred_retention_secs == 0 {
None
} else {
Some(Duration::from_secs(deferred_retention_secs))
};
let gateway_log_retention_secs = config.get_gateway_log_retention_secs();
let gateway_log_retention = if gateway_log_retention_secs == 0 {
None
} else {
Some(Duration::from_secs(gateway_log_retention_secs))
};
crate::api::gateway::deferred::spawn_deferred_query_worker(
state,
true,
Duration::from_millis(deferred_worker_poll_ms),
deferred_retention,
Duration::from_millis(deferred_cleanup_interval_ms),
gateway_log_retention,
);
vec![DEFERRED_QUERY_WORKER_NAME.to_string()]
}
fn ensure_runtime_started(runtime_kind: &str, workers: &[String]) -> Result<()> {
if workers.is_empty() {
bail!(
"{runtime_kind} did not start any workers; enable the corresponding worker settings before booting this binary"
);
}
Ok(())
}
async fn maybe_ensure_pg_tools_for_backup(state: &AppState) -> Result<()> {
if crate::backup_workers_effectively_enabled(state) {
crate::utils::pg_tools::ensure_pg_tools()
.await
.map_err(|err| anyhow!("PostgreSQL tools unavailable: {err}"))?;
}
Ok(())
}
pub async fn wait_for_shutdown() -> Result<()> {
tokio::signal::ctrl_c().await?;
Ok(())
}