use athena_rs::client::backend::QueryResult;
use athena_rs::AthenaClient;
use chrono::{DateTime, Local, Utc};
use once_cell::sync::Lazy;
use serde_json::{json, Value};
use std::collections::HashSet;
use std::env;
use std::future::Future;
use std::path::{Path, PathBuf};
use tokio::sync::Mutex;
use tracing::warn;
use crate::logging::{LogEntry, LogLevel};
use crate::strategies::{DatabaseConfig, ServiceConfig, XbpConfig};
use crate::utils::{find_xbp_config_upwards, parse_config_with_auto_heal};
/// Sentinel error message meaning "persistence is disabled or has no
/// credentials"; `with_fail_open` treats it as a silent no-op (no warning).
const DB_NOT_CONFIGURED: &str = "athena database is not configured";
/// Backend name used when neither config nor `XBP_ATHENA_BACKEND` names one.
const DEFAULT_BACKEND: &str = "supabase";
/// Schema used when none is configured or the configured one fails sanitization.
const DEFAULT_SCHEMA: &str = "public";
/// Bootstrap DDL bundled into the binary; replayed statement-by-statement by
/// `ensure_schema_bootstrapped`.
const BOOTSTRAP_SQL: &str = include_str!("../../sql/schema.sql");
/// "backend|url|schema" keys whose bootstrap DDL already ran in this process.
/// A tokio `Mutex` is used because the guard is held across awaits.
static BOOTSTRAPPED_BACKENDS: Lazy<Mutex<HashSet<String>>> =
    Lazy::new(|| Mutex::new(HashSet::new()));
/// Fully resolved connection settings for the persistence layer, produced by
/// [`resolve_runtime_config`] from the xbp config block and environment.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AthenaRuntimeConfig {
    /// Backend implementation name passed to `AthenaClient::new_with_backend_name`.
    pub backend: String,
    /// Backend base URL.
    pub url: String,
    /// Backend credential / API key.
    pub key: String,
    /// Sanitized Postgres schema that hosts the `xbp_*` tables.
    pub schema: String,
}
/// Project discovered by walking upward from the current working directory
/// (see [`load_current_project_context`]).
#[derive(Debug, Clone)]
struct ProjectContext {
    // Directory containing the discovered xbp config file.
    project_root: PathBuf,
    // Parsed (and possibly auto-healed) configuration.
    config: XbpConfig,
    // String form of the finder's `kind` — presumably the config file flavor;
    // TODO confirm against `find_xbp_config_upwards`.
    config_kind: String,
}
/// Best-effort upsert of the project snapshot. Failures are logged via
/// `with_fail_open` and never surface to the caller.
pub async fn persist_project_snapshot(
    project_root: &Path,
    config: &XbpConfig,
    config_kind: Option<&str>,
) {
    let fut = persist_project_snapshot_inner(project_root, config, config_kind);
    let _ = with_fail_open("persist project snapshot", fut).await;
}
/// Best-effort write of a single xbp log entry (fail-open; errors only warned).
pub async fn persist_log_entry(entry: &LogEntry) {
    let fut = persist_log_entry_inner(entry);
    let _ = with_fail_open("persist xbp log entry", fut).await;
}
/// Best-effort snapshot of one nginx config file (fail-open).
pub async fn persist_nginx_config_snapshot(
    domain: &str,
    config_path: &Path,
    content: &str,
    upstream_ports: &[u16],
    listen_ports: &[u16],
    source: &str,
) {
    let fut = persist_nginx_config_snapshot_inner(
        domain, config_path, content, upstream_ports, listen_ports, source,
    );
    let _ = with_fail_open("persist nginx config snapshot", fut).await;
}
/// Best-effort write of one nginx action log row (fail-open).
pub async fn persist_nginx_log(
    domain: Option<&str>,
    action: &str,
    success: bool,
    message: &str,
    details: Option<&str>,
    metadata: Value,
) {
    let fut = persist_nginx_log_inner(domain, action, success, message, details, metadata);
    let _ = with_fail_open("persist nginx log", fut).await;
}
/// Best-effort write of one nginx edit audit row (fail-open).
pub async fn persist_nginx_edit_audit_log(
    domain: Option<&str>,
    config_path: Option<&Path>,
    actor: Option<&str>,
    action: &str,
    old_content: Option<&str>,
    new_content: Option<&str>,
    metadata: Value,
) {
    let fut = persist_nginx_edit_audit_log_inner(
        domain,
        config_path,
        actor,
        action,
        old_content,
        new_content,
        metadata,
    );
    let _ = with_fail_open("persist nginx edit audit log", fut).await;
}
/// Best-effort upsert of one docker container snapshot (fail-open).
pub async fn persist_docker_container_snapshot(
    container_id: &str,
    container_name: &str,
    status: Option<&str>,
    ports: Option<&str>,
    metadata: Value,
) {
    let fut = persist_docker_container_snapshot_inner(
        container_id,
        container_name,
        status,
        ports,
        metadata,
    );
    let _ = with_fail_open("persist docker container snapshot", fut).await;
}
/// Best-effort write of one docker log line (fail-open).
pub async fn persist_docker_log(
    container_id: Option<&str>,
    command: Option<&str>,
    stream: &str,
    message: &str,
    metadata: Value,
) {
    let fut = persist_docker_log_inner(container_id, command, stream, message, metadata);
    let _ = with_fail_open("persist docker log", fut).await;
}
/// Best-effort upsert of one schedule definition (fail-open).
pub async fn persist_schedule(
    schedule_type: &str,
    target_kind: &str,
    target_ref: Option<&str>,
    expression: &str,
    enabled: bool,
    metadata: Value,
) {
    let fut = persist_schedule_inner(
        schedule_type,
        target_kind,
        target_ref,
        expression,
        enabled,
        metadata,
    );
    let _ = with_fail_open("persist schedule", fut).await;
}
/// Extracts the first non-empty `--cron-restart` value from an argv-like list.
///
/// Both the split form (`--cron-restart <expr>`) and the equals form
/// (`--cron-restart=<expr>`) are recognized; the value is trimmed, and
/// blank values are skipped so a later occurrence can still match.
pub fn extract_cron_restart_expression(args: &[String]) -> Option<String> {
    for (index, arg) in args.iter().enumerate() {
        // Resolve the candidate value for this argument, if any.
        let candidate = if arg == "--cron-restart" {
            args.get(index + 1).map(String::as_str)
        } else {
            arg.strip_prefix("--cron-restart=")
        };
        if let Some(raw) = candidate {
            let trimmed = raw.trim();
            if !trimmed.is_empty() {
                return Some(trimmed.to_string());
            }
        }
    }
    None
}
/// Resolves the effective runtime settings from the optional `[database]`
/// config block plus environment fallbacks.
///
/// Returns `None` when persistence is explicitly disabled or when no
/// URL/key pair can be found (see [`resolve_connection_pair`]).
pub fn resolve_runtime_config(config: Option<&XbpConfig>) -> Option<AthenaRuntimeConfig> {
    let database = config.and_then(|cfg| cfg.database.as_ref());
    // Persistence defaults to enabled unless the block says otherwise.
    if !database.and_then(|db| db.enabled).unwrap_or(true) {
        return None;
    }
    let backend_env = env::var("XBP_ATHENA_BACKEND").ok();
    let backend = value_or_default(
        database.and_then(|db| db.backend.as_deref()),
        backend_env.as_deref(),
        DEFAULT_BACKEND,
    );
    let (url, key) = resolve_connection_pair(database)?;
    // The schema has no env override; invalid identifiers fall back to public.
    let raw_schema = value_or_default(
        database.and_then(|db| db.schema.as_deref()),
        None,
        DEFAULT_SCHEMA,
    );
    let schema =
        sanitize_identifier(&raw_schema).unwrap_or_else(|| DEFAULT_SCHEMA.to_string());
    Some(AthenaRuntimeConfig {
        backend,
        url,
        key,
        schema,
    })
}
/// Connects a client using the given config and upserts the project snapshot,
/// discarding the returned project id.
async fn persist_project_snapshot_inner(
    project_root: &Path,
    config: &XbpConfig,
    config_kind: Option<&str>,
) -> Result<(), String> {
    let (client, runtime) = initialize_client(Some(config)).await?;
    upsert_project_snapshot_with_client(&client, &runtime, project_root, config, config_kind)
        .await
        .map(|_project_id| ())
}
/// Writes `entry` to the global `xbp_logs` table and, when the current working
/// directory resolves to an xbp project, mirrors it into `xbp_project_logs`
/// linked to both the project row and the global log row.
///
/// Returns `Err` with a human-readable message on any client/SQL failure;
/// callers wrap this in `with_fail_open`, so errors never propagate further.
async fn persist_log_entry_inner(entry: &LogEntry) -> Result<(), String> {
    let context = load_current_project_context();
    let config_ref = context.as_ref().map(|ctx| &ctx.config);
    let (client, runtime) = initialize_client(config_ref).await?;
    // Upsert the project row first so the project-scoped log can reference it.
    let project_id = if let Some(ctx) = context.as_ref() {
        Some(
            upsert_project_snapshot_with_client(
                &client,
                &runtime,
                &ctx.project_root,
                &ctx.config,
                Some(ctx.config_kind.as_str()),
            )
            .await?,
        )
    } else {
        None
    };
    let log_table = qualified_table(&runtime.schema, "xbp_logs");
    // Fix: build the occurred_at literal once and reuse it for both inserts.
    // Previously `to_rfc3339(entry.timestamp)` was recomputed for the
    // project-scoped insert, duplicating work for an identical value.
    let occurred_at_literal = sql_literal(&Value::String(to_rfc3339(entry.timestamp)));
    let metadata = json!({
        "project_name": context.as_ref().map(|ctx| ctx.config.project_name.clone()),
        "project_path": context.as_ref().map(|ctx| ctx.project_root.display().to_string()),
    });
    let global_sql = format!(
        "INSERT INTO {table} (log_level, command, message, details, duration_ms, occurred_at, metadata, updated_at) \
         VALUES ({log_level}, {command}, {message}, {details}, {duration}, {occurred_at}::timestamptz, {metadata}, now()) \
         RETURNING log_id",
        table = log_table,
        log_level = sql_literal(&Value::String(log_level_label(&entry.level).to_string())),
        command = sql_literal(&Value::String(entry.command.clone())),
        message = sql_literal(&Value::String(entry.message.clone())),
        details = optional_text_literal(entry.details.as_deref()),
        duration = optional_u64_literal(entry.duration_ms),
        occurred_at = occurred_at_literal,
        metadata = sql_literal(&metadata),
    );
    let global_result = execute_sql(&client, &global_sql).await?;
    let global_log_id = query_first_column_as_string(&global_result, "log_id");
    // The project-scoped mirror is written only when both ids are available.
    if let (Some(project_id), Some(global_log_id)) =
        (project_id.as_deref(), global_log_id.as_deref())
    {
        let project_table = qualified_table(&runtime.schema, "xbp_project_logs");
        let project_sql = format!(
            "INSERT INTO {table} (project_id, global_log_id, log_level, command, message, details, duration_ms, occurred_at, metadata, updated_at) \
             VALUES ({project_id}::uuid, {global_log_id}::uuid, {log_level}, {command}, {message}, {details}, {duration}, {occurred_at}::timestamptz, {metadata}, now())",
            table = project_table,
            project_id = sql_literal(&Value::String(project_id.to_string())),
            global_log_id = sql_literal(&Value::String(global_log_id.to_string())),
            log_level = sql_literal(&Value::String(log_level_label(&entry.level).to_string())),
            command = sql_literal(&Value::String(entry.command.clone())),
            message = sql_literal(&Value::String(entry.message.clone())),
            details = optional_text_literal(entry.details.as_deref()),
            duration = optional_u64_literal(entry.duration_ms),
            occurred_at = occurred_at_literal,
            metadata = sql_literal(&metadata),
        );
        let _ = execute_sql(&client, &project_sql).await?;
    }
    Ok(())
}
/// Upserts one nginx config snapshot keyed on (domain, config_path),
/// attaching the current project's id when one can be resolved.
async fn persist_nginx_config_snapshot_inner(
    domain: &str,
    config_path: &Path,
    content: &str,
    upstream_ports: &[u16],
    listen_ports: &[u16],
    source: &str,
) -> Result<(), String> {
    let (client, runtime, project_id) = initialize_client_with_project_context().await?;
    let table = qualified_table(&runtime.schema, "xbp_nginx_configs");
    let metadata = json!({ "source": source });
    // Port lists are stored as JSON arrays of numbers.
    let upstream = Value::Array(
        upstream_ports
            .iter()
            .map(|port| Value::from(*port as u64))
            .collect(),
    );
    let listen = Value::Array(
        listen_ports
            .iter()
            .map(|port| Value::from(*port as u64))
            .collect(),
    );
    // Values are escaped via sql_literal; ON CONFLICT keeps the row current.
    let sql = format!(
        "INSERT INTO {table} (project_id, domain, config_path, content, upstream_ports, listen_ports, metadata, updated_at) \
         VALUES ({project_id}, {domain}, {config_path}, {content}, {upstream_ports}, {listen_ports}, {metadata}, now()) \
         ON CONFLICT (domain, config_path) DO UPDATE SET \
         project_id = EXCLUDED.project_id, \
         content = EXCLUDED.content, \
         upstream_ports = EXCLUDED.upstream_ports, \
         listen_ports = EXCLUDED.listen_ports, \
         metadata = EXCLUDED.metadata, \
         updated_at = now()",
        table = table,
        project_id = optional_uuid_literal(project_id.as_deref()),
        domain = sql_literal(&Value::String(domain.to_string())),
        config_path = sql_literal(&Value::String(config_path.display().to_string())),
        content = sql_literal(&Value::String(content.to_string())),
        upstream_ports = sql_literal(&upstream),
        listen_ports = sql_literal(&listen),
        metadata = sql_literal(&metadata),
    );
    let _ = execute_sql(&client, &sql).await?;
    Ok(())
}
/// Inserts one nginx action log row, stamped with `now()` on the server side.
async fn persist_nginx_log_inner(
    domain: Option<&str>,
    action: &str,
    success: bool,
    message: &str,
    details: Option<&str>,
    metadata: Value,
) -> Result<(), String> {
    let (client, runtime, project_id) = initialize_client_with_project_context().await?;
    let table = qualified_table(&runtime.schema, "xbp_nginx_logs");
    // All dynamic values pass through sql_literal / optional_* escaping helpers.
    let sql = format!(
        "INSERT INTO {table} (project_id, domain, action, success, message, details, occurred_at, metadata, updated_at) \
         VALUES ({project_id}, {domain}, {action}, {success}, {message}, {details}, now(), {metadata}, now())",
        table = table,
        project_id = optional_uuid_literal(project_id.as_deref()),
        domain = optional_text_literal(domain),
        action = sql_literal(&Value::String(action.to_string())),
        success = if success { "true" } else { "false" },
        message = sql_literal(&Value::String(message.to_string())),
        details = optional_text_literal(details),
        metadata = sql_literal(&metadata),
    );
    let _ = execute_sql(&client, &sql).await?;
    Ok(())
}
/// Inserts one nginx edit audit row capturing before/after content and actor.
async fn persist_nginx_edit_audit_log_inner(
    domain: Option<&str>,
    config_path: Option<&Path>,
    actor: Option<&str>,
    action: &str,
    old_content: Option<&str>,
    new_content: Option<&str>,
    metadata: Value,
) -> Result<(), String> {
    let (client, runtime, project_id) = initialize_client_with_project_context().await?;
    let table = qualified_table(&runtime.schema, "xbp_nginx_edit_audit_logs");
    let sql = format!(
        "INSERT INTO {table} (project_id, domain, config_path, actor, action, old_content, new_content, occurred_at, metadata, updated_at) \
         VALUES ({project_id}, {domain}, {config_path}, {actor}, {action}, {old_content}, {new_content}, now(), {metadata}, now())",
        table = table,
        project_id = optional_uuid_literal(project_id.as_deref()),
        domain = optional_text_literal(domain),
        // Path is rendered lossily via Display; a missing path becomes SQL NULL.
        config_path = config_path
            .map(|path| sql_literal(&Value::String(path.display().to_string())))
            .unwrap_or_else(|| "NULL".to_string()),
        actor = optional_text_literal(actor),
        action = sql_literal(&Value::String(action.to_string())),
        old_content = optional_text_literal(old_content),
        new_content = optional_text_literal(new_content),
        metadata = sql_literal(&metadata),
    );
    let _ = execute_sql(&client, &sql).await?;
    Ok(())
}
/// Upserts one docker container snapshot keyed on container_id; `inspected_at`
/// is refreshed to the server-side `now()` on every write.
async fn persist_docker_container_snapshot_inner(
    container_id: &str,
    container_name: &str,
    status: Option<&str>,
    ports: Option<&str>,
    metadata: Value,
) -> Result<(), String> {
    let (client, runtime, project_id) = initialize_client_with_project_context().await?;
    let table = qualified_table(&runtime.schema, "xbp_docker_containers");
    let sql = format!(
        "INSERT INTO {table} (project_id, container_id, container_name, status, ports, inspected_at, metadata, updated_at) \
         VALUES ({project_id}, {container_id}, {container_name}, {status}, {ports}, now(), {metadata}, now()) \
         ON CONFLICT (container_id) DO UPDATE SET \
         project_id = EXCLUDED.project_id, \
         container_name = EXCLUDED.container_name, \
         status = EXCLUDED.status, \
         ports = EXCLUDED.ports, \
         inspected_at = EXCLUDED.inspected_at, \
         metadata = EXCLUDED.metadata, \
         updated_at = now()",
        table = table,
        project_id = optional_uuid_literal(project_id.as_deref()),
        container_id = sql_literal(&Value::String(container_id.to_string())),
        container_name = sql_literal(&Value::String(container_name.to_string())),
        status = optional_text_literal(status),
        ports = optional_text_literal(ports),
        metadata = sql_literal(&metadata),
    );
    let _ = execute_sql(&client, &sql).await?;
    Ok(())
}
/// Inserts one docker log line; `stream` is presumably "stdout"/"stderr" —
/// TODO confirm against callers.
async fn persist_docker_log_inner(
    container_id: Option<&str>,
    command: Option<&str>,
    stream: &str,
    message: &str,
    metadata: Value,
) -> Result<(), String> {
    let (client, runtime, project_id) = initialize_client_with_project_context().await?;
    let table = qualified_table(&runtime.schema, "xbp_docker_logs");
    let sql = format!(
        "INSERT INTO {table} (project_id, container_id, command, stream, message, occurred_at, metadata, updated_at) \
         VALUES ({project_id}, {container_id}, {command}, {stream}, {message}, now(), {metadata}, now())",
        table = table,
        project_id = optional_uuid_literal(project_id.as_deref()),
        container_id = optional_text_literal(container_id),
        command = optional_text_literal(command),
        stream = sql_literal(&Value::String(stream.to_string())),
        message = sql_literal(&Value::String(message.to_string())),
        metadata = sql_literal(&metadata),
    );
    let _ = execute_sql(&client, &sql).await?;
    Ok(())
}
/// Upserts one schedule row. The conflict key includes `target_ref`, so a
/// missing target is normalized to the empty string (NULLs would never
/// collide in a unique index).
async fn persist_schedule_inner(
    schedule_type: &str,
    target_kind: &str,
    target_ref: Option<&str>,
    expression: &str,
    enabled: bool,
    metadata: Value,
) -> Result<(), String> {
    let (client, runtime, project_id) = initialize_client_with_project_context().await?;
    let table = qualified_table(&runtime.schema, "xbp_schedules");
    let normalized_target_ref = target_ref.unwrap_or("");
    let sql = format!(
        "INSERT INTO {table} (project_id, schedule_type, target_kind, target_ref, expression, timezone, enabled, metadata, occurred_at, updated_at) \
         VALUES ({project_id}, {schedule_type}, {target_kind}, {target_ref}, {expression}, {timezone}, {enabled}, {metadata}, now(), now()) \
         ON CONFLICT (schedule_type, target_kind, target_ref, expression) DO UPDATE SET \
         project_id = EXCLUDED.project_id, \
         timezone = EXCLUDED.timezone, \
         enabled = EXCLUDED.enabled, \
         metadata = EXCLUDED.metadata, \
         occurred_at = EXCLUDED.occurred_at, \
         updated_at = now()",
        table = table,
        project_id = optional_uuid_literal(project_id.as_deref()),
        schedule_type = sql_literal(&Value::String(schedule_type.to_string())),
        target_kind = sql_literal(&Value::String(target_kind.to_string())),
        target_ref = sql_literal(&Value::String(normalized_target_ref.to_string())),
        expression = sql_literal(&Value::String(expression.to_string())),
        // Timezone is always recorded as UTC here.
        timezone = sql_literal(&Value::String("UTC".to_string())),
        enabled = if enabled { "true" } else { "false" },
        metadata = sql_literal(&metadata),
    );
    let _ = execute_sql(&client, &sql).await?;
    Ok(())
}
/// Connects a client using the nearest project's config (if any) and, when a
/// project was found, upserts its snapshot so the caller gets its project id.
///
/// Returns (client, resolved runtime config, optional project id).
async fn initialize_client_with_project_context(
) -> Result<(AthenaClient, AthenaRuntimeConfig, Option<String>), String> {
    let context = load_current_project_context();
    let config_ref = context.as_ref().map(|ctx| &ctx.config);
    let (client, runtime) = initialize_client(config_ref).await?;
    let project_id = if let Some(ctx) = context {
        Some(
            upsert_project_snapshot_with_client(
                &client,
                &runtime,
                &ctx.project_root,
                &ctx.config,
                Some(ctx.config_kind.as_str()),
            )
            .await?,
        )
    } else {
        None
    };
    Ok((client, runtime, project_id))
}
/// Resolves runtime settings, constructs the Athena client, and ensures the
/// schema bootstrap DDL has run.
///
/// Errors with `DB_NOT_CONFIGURED` (silent in `with_fail_open`) when
/// persistence is disabled or no credentials can be resolved.
async fn initialize_client(
    config: Option<&XbpConfig>,
) -> Result<(AthenaClient, AthenaRuntimeConfig), String> {
    let runtime = resolve_runtime_config(config).ok_or_else(|| DB_NOT_CONFIGURED.to_string())?;
    let client = AthenaClient::new_with_backend_name(
        runtime.url.clone(),
        runtime.key.clone(),
        &runtime.backend,
    )
    .await
    .map_err(|err| format!("failed to initialize Athena client: {err}"))?;
    ensure_schema_bootstrapped(&client, &runtime).await?;
    Ok((client, runtime))
}
/// Replays the bundled bootstrap DDL once per (backend, url, schema) triple
/// for the lifetime of this process.
///
/// The tokio `Mutex` guard is held across the awaits so concurrent callers
/// serialize and the DDL never runs twice; subsequent calls hit the fast path.
async fn ensure_schema_bootstrapped(
    client: &AthenaClient,
    runtime: &AthenaRuntimeConfig,
) -> Result<(), String> {
    // NOTE(review): the memo key uses the raw `runtime.schema`, while execution
    // below substitutes DEFAULT_SCHEMA when sanitization fails — confirm an
    // unsanitizable name cannot create two keys for the same effective schema.
    let key = format!("{}|{}|{}", runtime.backend, runtime.url, runtime.schema);
    let mut guard = BOOTSTRAPPED_BACKENDS.lock().await;
    if guard.contains(&key) {
        return Ok(());
    }
    let schema = sanitize_identifier(&runtime.schema).unwrap_or_else(|| DEFAULT_SCHEMA.to_string());
    let create_schema_sql = format!("CREATE SCHEMA IF NOT EXISTS {}", schema);
    let _ = execute_sql(client, &create_schema_sql).await?;
    // Statements are replayed one by one. Splitting on ';' assumes schema.sql
    // contains no semicolons inside string or dollar-quoted bodies (e.g.
    // PL/pgSQL functions) — TODO confirm against the bundled file.
    for statement in BOOTSTRAP_SQL.split(';') {
        let trimmed = statement.trim();
        if trimmed.is_empty() {
            continue;
        }
        // Non-default schemas are targeted by prefixing a SET search_path,
        // shipped together with the statement in one execute call.
        let sql = if schema == DEFAULT_SCHEMA {
            format!("{};", trimmed)
        } else {
            format!("SET search_path TO {}; {};", schema, trimmed)
        };
        let _ = execute_sql(client, &sql).await?;
    }
    // Memoize only after every statement succeeded, so a failed bootstrap is
    // retried on the next persistence attempt.
    guard.insert(key);
    Ok(())
}
/// Upserts the project row (keyed on project_path) plus one row per declared
/// service, returning the project's id.
///
/// Errors if the RETURNING clause yields no usable project_id.
async fn upsert_project_snapshot_with_client(
    client: &AthenaClient,
    runtime: &AthenaRuntimeConfig,
    project_root: &Path,
    config: &XbpConfig,
    config_kind: Option<&str>,
) -> Result<String, String> {
    let project_table = qualified_table(&runtime.schema, "xbp_projects");
    let metadata = build_project_metadata(config);
    let project_sql = format!(
        "INSERT INTO {table} (project_name, project_path, version, build_dir, port, app_type, branch, target, config_kind, metadata, updated_at) \
         VALUES ({project_name}, {project_path}, {version}, {build_dir}, {port}, {app_type}, {branch}, {target}, {config_kind}, {metadata}, now()) \
         ON CONFLICT (project_path) DO UPDATE SET \
         project_name = EXCLUDED.project_name, \
         version = EXCLUDED.version, \
         build_dir = EXCLUDED.build_dir, \
         port = EXCLUDED.port, \
         app_type = EXCLUDED.app_type, \
         branch = EXCLUDED.branch, \
         target = EXCLUDED.target, \
         config_kind = EXCLUDED.config_kind, \
         metadata = EXCLUDED.metadata, \
         updated_at = now() \
         RETURNING project_id",
        table = project_table,
        project_name = sql_literal(&Value::String(config.project_name.clone())),
        project_path = sql_literal(&Value::String(project_root.display().to_string())),
        version = sql_literal(&Value::String(config.version.clone())),
        build_dir = sql_literal(&Value::String(config.build_dir.clone())),
        // Port is numeric and interpolated directly (no quoting needed).
        port = config.port,
        app_type = optional_text_literal(config.app_type.as_deref()),
        branch = optional_text_literal(config.branch.as_deref()),
        target = optional_text_literal(config.target.as_deref()),
        config_kind = optional_text_literal(config_kind),
        metadata = sql_literal(&metadata),
    );
    let project_result = execute_sql(client, &project_sql).await?;
    let project_id = query_first_column_as_string(&project_result, "project_id")
        .ok_or_else(|| "failed to resolve project_id from upsert".to_string())?;
    // Mirror each declared service; any failure aborts the whole snapshot.
    if let Some(services) = config.services.as_ref() {
        let services_table = qualified_table(&runtime.schema, "xbp_project_services");
        for service in services {
            upsert_project_service(client, &services_table, &project_id, service).await?;
        }
    }
    Ok(project_id)
}
/// Upserts one service row keyed on (project_id, service_name); commands and
/// environment are serialized to JSON, falling back to `{}` on failure.
async fn upsert_project_service(
    client: &AthenaClient,
    table: &str,
    project_id: &str,
    service: &ServiceConfig,
) -> Result<(), String> {
    let commands = serde_json::to_value(&service.commands).unwrap_or_else(|_| json!({}));
    let environment = serde_json::to_value(&service.environment).unwrap_or_else(|_| json!({}));
    let metadata = build_service_metadata(service);
    let sql = format!(
        "INSERT INTO {table} (project_id, service_name, target, branch, port, root_directory, url, healthcheck_path, restart_policy, start_wrapper, systemd_service_name, commands, environment, metadata, updated_at) \
         VALUES ({project_id}::uuid, {service_name}, {target}, {branch}, {port}, {root_directory}, {url}, {healthcheck_path}, {restart_policy}, {start_wrapper}, {systemd_service_name}, {commands}, {environment}, {metadata}, now()) \
         ON CONFLICT (project_id, service_name) DO UPDATE SET \
         target = EXCLUDED.target, \
         branch = EXCLUDED.branch, \
         port = EXCLUDED.port, \
         root_directory = EXCLUDED.root_directory, \
         url = EXCLUDED.url, \
         healthcheck_path = EXCLUDED.healthcheck_path, \
         restart_policy = EXCLUDED.restart_policy, \
         start_wrapper = EXCLUDED.start_wrapper, \
         systemd_service_name = EXCLUDED.systemd_service_name, \
         commands = EXCLUDED.commands, \
         environment = EXCLUDED.environment, \
         metadata = EXCLUDED.metadata, \
         updated_at = now()",
        table = table,
        project_id = sql_literal(&Value::String(project_id.to_string())),
        service_name = sql_literal(&Value::String(service.name.clone())),
        target = sql_literal(&Value::String(service.target.clone())),
        branch = sql_literal(&Value::String(service.branch.clone())),
        port = service.port,
        root_directory = optional_text_literal(service.root_directory.as_deref()),
        url = optional_text_literal(service.url.as_deref()),
        healthcheck_path = optional_text_literal(service.healthcheck_path.as_deref()),
        restart_policy = optional_text_literal(service.restart_policy.as_deref()),
        start_wrapper = optional_text_literal(service.start_wrapper.as_deref()),
        systemd_service_name = optional_text_literal(service.systemd_service_name.as_deref()),
        commands = sql_literal(&commands),
        environment = sql_literal(&environment),
        metadata = sql_literal(&metadata),
    );
    let _ = execute_sql(client, &sql).await?;
    Ok(())
}
/// Collects auxiliary project fields into the JSON `metadata` column payload.
fn build_project_metadata(config: &XbpConfig) -> Value {
    let services_count = config
        .services
        .as_ref()
        .map_or(0, |services| services.len());
    json!({
        "services_count": services_count,
        "monitor_url": config.monitor_url,
        "kafka_topic": config.kafka_topic,
        "systemd_service_name": config.systemd_service_name,
    })
}
fn build_service_metadata(service: &ServiceConfig) -> Value {
json!({
"force_run_from_root": service.force_run_from_root,
"restart_policy_max_failure_count": service.restart_policy_max_failure_count,
})
}
/// Resolves a (url, key) credential pair, trying in priority order:
/// 1. the env vars named by the config block's `url_env` / `key_env`,
/// 2. `XBP_ATHENA_URL` / `XBP_ATHENA_KEY`,
/// 3. `SUPABASE_URL` / `SUPABASE_KEY`,
/// 4. `XLX_SUPABASE_URL` / `XLX_SUPABASE_ANON_KEY`.
/// Both halves of a pair must be set and non-blank for it to win.
fn resolve_connection_pair(database: Option<&DatabaseConfig>) -> Option<(String, String)> {
    // Highest precedence: the env bindings named by the [database] block.
    let block_pair = database.and_then(|db| {
        let url = db.url_env.as_deref().and_then(read_env_nonempty)?;
        let key = db.key_env.as_deref().and_then(read_env_nonempty)?;
        Some((url, key))
    });
    if block_pair.is_some() {
        return block_pair;
    }
    // Well-known fallbacks, checked pair by pair.
    const ENV_PAIRS: [(&str, &str); 3] = [
        ("XBP_ATHENA_URL", "XBP_ATHENA_KEY"),
        ("SUPABASE_URL", "SUPABASE_KEY"),
        ("XLX_SUPABASE_URL", "XLX_SUPABASE_ANON_KEY"),
    ];
    for &(url_name, key_name) in ENV_PAIRS.iter() {
        if let (Some(url), Some(key)) =
            (read_env_nonempty(url_name), read_env_nonempty(key_name))
        {
            return Some((url, key));
        }
    }
    None
}
/// Reads an env var, returning it untrimmed only when it is set and contains
/// at least one non-whitespace character.
fn read_env_nonempty(name: &str) -> Option<String> {
    env::var(name).ok().filter(|value| !value.trim().is_empty())
}
/// Returns the first of `primary`/`secondary` that trims to a non-empty
/// string (trimmed), otherwise `default` verbatim (not trimmed).
fn value_or_default(primary: Option<&str>, secondary: Option<&str>, default: &str) -> String {
    primary
        .into_iter()
        .chain(secondary)
        .map(str::trim)
        .find(|candidate| !candidate.is_empty())
        .unwrap_or(default)
        .to_string()
}
/// Runs one SQL string against the backend, mapping the client error into
/// this module's string error convention.
async fn execute_sql(client: &AthenaClient, sql: &str) -> Result<QueryResult, String> {
    match client.execute_sql(sql).await {
        Ok(result) => Ok(result),
        Err(err) => Err(format!("athena sql execution failed: {err}")),
    }
}
/// Pulls `key` from the first result row as a string, also accepting integer
/// values by converting them; `None` when the row, key, or type is absent.
fn query_first_column_as_string(result: &QueryResult, key: &str) -> Option<String> {
    let row = result.rows.first()?;
    let value = row.get(key)?;
    match value.as_str() {
        Some(text) => Some(text.to_string()),
        None => value.as_i64().map(|number| number.to_string()),
    }
}
/// Walks upward from the current working directory looking for an xbp config;
/// `None` on any failure (no cwd, no config, unreadable, or unparseable file).
fn load_current_project_context() -> Option<ProjectContext> {
    let cwd = env::current_dir().ok()?;
    let found = find_xbp_config_upwards(&cwd)?;
    let raw = std::fs::read_to_string(&found.config_path).ok()?;
    let (config, _) = parse_config_with_auto_heal::<XbpConfig>(&raw, found.kind).ok()?;
    Some(ProjectContext {
        project_root: found.project_root,
        config,
        config_kind: found.kind.to_string(),
    })
}
/// Maps a log level to the uppercase label stored in the `log_level` column.
fn log_level_label(level: &LogLevel) -> &'static str {
    match *level {
        LogLevel::Debug => "DEBUG",
        LogLevel::Info => "INFO",
        LogLevel::Success => "SUCCESS",
        LogLevel::Warning => "WARN",
        LogLevel::Error => "ERROR",
    }
}
fn to_rfc3339(ts: DateTime<Local>) -> String {
ts.with_timezone(&Utc).to_rfc3339()
}
/// Validates a trimmed SQL identifier: first char must be an ASCII letter or
/// underscore, the rest ASCII alphanumerics or underscores. Used to keep
/// schema names safe for direct interpolation into DDL.
fn sanitize_identifier(value: &str) -> Option<String> {
    let candidate = value.trim();
    let mut chars = candidate.chars();
    let first = chars.next()?;
    let head_ok = first == '_' || first.is_ascii_alphabetic();
    let tail_ok = chars.all(|ch| ch == '_' || ch.is_ascii_alphanumeric());
    if head_ok && tail_ok {
        Some(candidate.to_string())
    } else {
        None
    }
}
/// Builds "schema.table", substituting the default schema when the given one
/// fails identifier sanitization.
fn qualified_table(schema: &str, table: &str) -> String {
    let safe_schema =
        sanitize_identifier(schema).unwrap_or_else(|| DEFAULT_SCHEMA.to_string());
    format!("{safe_schema}.{table}")
}
/// Renders a JSON value as a SQL literal: NULL/bool/number verbatim, strings
/// single-quoted with `'` doubled, and arrays/objects as an escaped
/// `'...'::jsonb` cast.
pub fn sql_literal(value: &Value) -> String {
    match value {
        Value::String(text) => format!("'{}'", text.replace('\'', "''")),
        Value::Null => String::from("NULL"),
        Value::Bool(boolean) => boolean.to_string(),
        Value::Number(number) => number.to_string(),
        composite @ (Value::Array(_) | Value::Object(_)) => {
            let encoded = composite.to_string().replace('\'', "''");
            format!("'{encoded}'::jsonb")
        }
    }
}
/// Renders optional text as a quoted SQL literal, collapsing `None` and
/// whitespace-only strings to SQL NULL (the original text is kept untrimmed).
fn optional_text_literal(value: Option<&str>) -> String {
    match value.filter(|text| !text.trim().is_empty()) {
        Some(text) => sql_literal(&Value::String(text.to_string())),
        None => "NULL".to_string(),
    }
}
/// Renders an optional unsigned integer as a SQL literal, NULL when absent.
fn optional_u64_literal(value: Option<u64>) -> String {
    value.map_or_else(|| "NULL".to_string(), |number| number.to_string())
}
/// Renders an optional uuid string as `'...'::uuid`, collapsing `None` and
/// whitespace-only strings to SQL NULL.
fn optional_uuid_literal(value: Option<&str>) -> String {
    match value.filter(|text| !text.trim().is_empty()) {
        Some(id) => format!("{}::uuid", sql_literal(&Value::String(id.to_string()))),
        None => "NULL".to_string(),
    }
}
pub async fn with_fail_open<T, F>(operation: &str, fut: F) -> Option<T>
where
F: Future<Output = Result<T, String>>,
{
match fut.await {
Ok(result) => Some(result),
Err(error) if error == DB_NOT_CONFIGURED => None,
Err(error) => {
warn!("{} (fail-open): {}", operation, error);
None
}
}
}
#[cfg(test)]
mod tests {
    use super::{
        build_project_metadata, build_service_metadata, extract_cron_restart_expression,
        resolve_runtime_config, sql_literal, with_fail_open,
    };
    use crate::strategies::{DatabaseConfig, ServiceConfig, XbpConfig};
    use serde_json::json;
    use std::collections::HashMap;
    use std::sync::Mutex;
    // Env-var-mutating tests serialize through this lock so they cannot
    // observe each other's process-global environment changes.
    static ENV_TEST_MUTEX: Mutex<()> = Mutex::new(());
    // Minimal valid config; individual tests override the fields they need.
    fn base_config() -> XbpConfig {
        XbpConfig {
            project_name: "demo".to_string(),
            version: "0.1.0".to_string(),
            port: 3000,
            build_dir: "/tmp/demo".to_string(),
            app_type: None,
            build_command: None,
            start_command: None,
            install_command: None,
            environment: None,
            services: None,
            systemd_service_name: None,
            kafka_brokers: None,
            kafka_topic: None,
            kafka_public_url: None,
            log_files: None,
            monitor_url: None,
            monitor_method: None,
            monitor_expected_code: None,
            monitor_interval: None,
            target: None,
            branch: None,
            crate_name: None,
            npm_script: None,
            port_storybook: None,
            url: None,
            url_storybook: None,
            database: None,
        }
    }
    // Removes every env var that resolve_runtime_config may consult.
    fn clear_env() {
        for key in [
            "BLOCK_URL",
            "BLOCK_KEY",
            "XBP_ATHENA_BACKEND",
            "XBP_ATHENA_URL",
            "XBP_ATHENA_KEY",
            "SUPABASE_URL",
            "SUPABASE_KEY",
            "XLX_SUPABASE_URL",
            "XLX_SUPABASE_ANON_KEY",
        ] {
            std::env::remove_var(key);
        }
    }
    // The [database] block's url_env/key_env bindings beat XBP_ATHENA_* vars.
    #[test]
    fn resolve_runtime_config_prefers_database_block_env_binding() {
        let _guard = ENV_TEST_MUTEX.lock().expect("env lock");
        clear_env();
        std::env::set_var("BLOCK_URL", "https://block.example");
        std::env::set_var("BLOCK_KEY", "block-key");
        std::env::set_var("XBP_ATHENA_URL", "https://xbp.example");
        std::env::set_var("XBP_ATHENA_KEY", "xbp-key");
        let mut config = base_config();
        config.database = Some(DatabaseConfig {
            enabled: Some(true),
            backend: Some("postgres".to_string()),
            url_env: Some("BLOCK_URL".to_string()),
            key_env: Some("BLOCK_KEY".to_string()),
            schema: Some("custom_schema".to_string()),
        });
        let runtime = resolve_runtime_config(Some(&config)).expect("runtime");
        assert_eq!(runtime.backend, "postgres");
        assert_eq!(runtime.url, "https://block.example");
        assert_eq!(runtime.key, "block-key");
        assert_eq!(runtime.schema, "custom_schema");
    }
    // XBP_ATHENA_* vars take precedence over the SUPABASE_* fallback pair.
    #[test]
    fn resolve_runtime_config_uses_xbp_env_before_supabase_fallback() {
        let _guard = ENV_TEST_MUTEX.lock().expect("env lock");
        clear_env();
        std::env::set_var("XBP_ATHENA_BACKEND", "supabase");
        std::env::set_var("XBP_ATHENA_URL", "https://xbp.env");
        std::env::set_var("XBP_ATHENA_KEY", "xbp-env-key");
        std::env::set_var("SUPABASE_URL", "https://supabase.env");
        std::env::set_var("SUPABASE_KEY", "supabase-key");
        let runtime = resolve_runtime_config(None).expect("runtime");
        assert_eq!(runtime.url, "https://xbp.env");
        assert_eq!(runtime.key, "xbp-env-key");
    }
    // SUPABASE_* wins over XLX_SUPABASE_*; the latter is the last resort.
    #[test]
    fn resolve_runtime_config_falls_back_to_supabase_then_xlx_supabase() {
        let _guard = ENV_TEST_MUTEX.lock().expect("env lock");
        clear_env();
        std::env::set_var("SUPABASE_URL", "https://supabase.env");
        std::env::set_var("SUPABASE_KEY", "supabase-key");
        let runtime = resolve_runtime_config(None).expect("runtime");
        assert_eq!(runtime.url, "https://supabase.env");
        assert_eq!(runtime.key, "supabase-key");
        clear_env();
        std::env::set_var("XLX_SUPABASE_URL", "https://xlx-supabase.env");
        std::env::set_var("XLX_SUPABASE_ANON_KEY", "xlx-key");
        let runtime = resolve_runtime_config(None).expect("runtime");
        assert_eq!(runtime.url, "https://xlx-supabase.env");
        assert_eq!(runtime.key, "xlx-key");
    }
    // enabled = false disables persistence even when credentials exist.
    #[test]
    fn resolve_runtime_config_respects_database_disable_switch() {
        let _guard = ENV_TEST_MUTEX.lock().expect("env lock");
        clear_env();
        std::env::set_var("SUPABASE_URL", "https://supabase.env");
        std::env::set_var("SUPABASE_KEY", "supabase-key");
        let mut config = base_config();
        config.database = Some(DatabaseConfig {
            enabled: Some(false),
            backend: None,
            url_env: None,
            key_env: None,
            schema: None,
        });
        assert!(resolve_runtime_config(Some(&config)).is_none());
    }
    // Single quotes must be doubled both in plain strings and inside the
    // serialized JSON of ::jsonb literals.
    #[test]
    fn sql_literal_escapes_quotes_and_json_payloads() {
        assert_eq!(sql_literal(&json!("O'Hara")), "'O''Hara'");
        assert_eq!(sql_literal(&json!(true)), "true");
        assert_eq!(sql_literal(&json!(12.5)), "12.5");
        assert_eq!(
            sql_literal(&json!({"nested":"quote's"})),
            "'{\"nested\":\"quote''s\"}'::jsonb"
        );
    }
    // The metadata builders should surface the selected fields verbatim.
    #[test]
    fn payload_mappers_produce_expected_shapes() {
        let mut config = base_config();
        config.monitor_url = Some("https://monitor.example".to_string());
        config.kafka_topic = Some("xbp.logs".to_string());
        config.systemd_service_name = Some("xbp-api".to_string());
        config.services = Some(vec![ServiceConfig {
            name: "api".to_string(),
            target: "rust".to_string(),
            branch: "main".to_string(),
            port: 8080,
            root_directory: Some("services/api".to_string()),
            environment: Some(HashMap::from([(
                "RUST_LOG".to_string(),
                "info".to_string(),
            )])),
            url: Some("https://api.example.com".to_string()),
            healthcheck_path: Some("/health".to_string()),
            restart_policy: Some("always".to_string()),
            restart_policy_max_failure_count: Some(5),
            start_wrapper: Some("pm2".to_string()),
            commands: None,
            force_run_from_root: Some(false),
            systemd_service_name: Some("xbp-api".to_string()),
        }]);
        let project_meta = build_project_metadata(&config);
        assert_eq!(project_meta["services_count"], 1);
        assert_eq!(project_meta["kafka_topic"], "xbp.logs");
        let service_meta =
            build_service_metadata(config.services.as_ref().unwrap().first().unwrap());
        assert_eq!(service_meta["force_run_from_root"], false);
        assert_eq!(service_meta["restart_policy_max_failure_count"], 5);
    }
    // Ok passes the value through; Err is swallowed and maps to None.
    #[tokio::test]
    async fn fail_open_wrapper_returns_none_on_error_and_value_on_success() {
        let success = with_fail_open("test-success", async { Ok::<_, String>(42) }).await;
        assert_eq!(success, Some(42));
        let failed = with_fail_open::<i32, _>("test-failure", async {
            Err::<i32, _>("forced failure".to_string())
        })
        .await;
        assert_eq!(failed, None);
    }
    // Both the "--cron-restart <expr>" and "--cron-restart=<expr>" forms parse.
    #[test]
    fn cron_restart_parser_handles_split_and_equals_forms() {
        let args = vec![
            "npm".to_string(),
            "start".to_string(),
            "--cron-restart".to_string(),
            "0 */6 * * *".to_string(),
        ];
        assert_eq!(
            extract_cron_restart_expression(&args),
            Some("0 */6 * * *".to_string())
        );
        let args = vec![
            "npm".to_string(),
            "start".to_string(),
            "--cron-restart=*/5 * * * *".to_string(),
        ];
        assert_eq!(
            extract_cron_restart_expression(&args),
            Some("*/5 * * * *".to_string())
        );
    }
}