Skip to main content

xbp_cli/data/
athena.rs

1use athena_rs::client::backend::QueryResult;
2use athena_rs::AthenaClient;
3use chrono::{DateTime, Local, Utc};
4use once_cell::sync::Lazy;
5use serde_json::{json, Value};
6use std::collections::HashSet;
7use std::env;
8use std::future::Future;
9use std::path::{Path, PathBuf};
10use tokio::sync::Mutex;
11use tracing::warn;
12
13use crate::logging::{LogEntry, LogLevel};
14use crate::strategies::{DatabaseConfig, ServiceConfig, XbpConfig};
15use crate::utils::{find_xbp_config_upwards, parse_config_with_auto_heal};
16
17const DB_NOT_CONFIGURED: &str = "athena database is not configured";
18const DEFAULT_BACKEND: &str = "supabase";
19const DEFAULT_SCHEMA: &str = "public";
20const BOOTSTRAP_SQL: &str = include_str!("../../sql/schema.sql");
21
22static BOOTSTRAPPED_BACKENDS: Lazy<Mutex<HashSet<String>>> =
23    Lazy::new(|| Mutex::new(HashSet::new()));
24
25#[derive(Debug, Clone, PartialEq, Eq)]
26pub struct AthenaRuntimeConfig {
27    pub backend: String,
28    pub url: String,
29    pub key: String,
30    pub schema: String,
31}
32
33#[derive(Debug, Clone)]
34struct ProjectContext {
35    project_root: PathBuf,
36    config: XbpConfig,
37    config_kind: String,
38}
39
40pub async fn persist_project_snapshot(
41    project_root: &Path,
42    config: &XbpConfig,
43    config_kind: Option<&str>,
44) {
45    let _ = with_fail_open(
46        "persist project snapshot",
47        persist_project_snapshot_inner(project_root, config, config_kind),
48    )
49    .await;
50}
51
52pub async fn persist_log_entry(entry: &LogEntry) {
53    let _ = with_fail_open("persist xbp log entry", persist_log_entry_inner(entry)).await;
54}
55
56pub async fn persist_nginx_config_snapshot(
57    domain: &str,
58    config_path: &Path,
59    content: &str,
60    upstream_ports: &[u16],
61    listen_ports: &[u16],
62    source: &str,
63) {
64    let _ = with_fail_open(
65        "persist nginx config snapshot",
66        persist_nginx_config_snapshot_inner(
67            domain,
68            config_path,
69            content,
70            upstream_ports,
71            listen_ports,
72            source,
73        ),
74    )
75    .await;
76}
77
78pub async fn persist_nginx_log(
79    domain: Option<&str>,
80    action: &str,
81    success: bool,
82    message: &str,
83    details: Option<&str>,
84    metadata: Value,
85) {
86    let _ = with_fail_open(
87        "persist nginx log",
88        persist_nginx_log_inner(domain, action, success, message, details, metadata),
89    )
90    .await;
91}
92
93pub async fn persist_nginx_edit_audit_log(
94    domain: Option<&str>,
95    config_path: Option<&Path>,
96    actor: Option<&str>,
97    action: &str,
98    old_content: Option<&str>,
99    new_content: Option<&str>,
100    metadata: Value,
101) {
102    let _ = with_fail_open(
103        "persist nginx edit audit log",
104        persist_nginx_edit_audit_log_inner(
105            domain,
106            config_path,
107            actor,
108            action,
109            old_content,
110            new_content,
111            metadata,
112        ),
113    )
114    .await;
115}
116
117pub async fn persist_docker_container_snapshot(
118    container_id: &str,
119    container_name: &str,
120    status: Option<&str>,
121    ports: Option<&str>,
122    metadata: Value,
123) {
124    let _ = with_fail_open(
125        "persist docker container snapshot",
126        persist_docker_container_snapshot_inner(
127            container_id,
128            container_name,
129            status,
130            ports,
131            metadata,
132        ),
133    )
134    .await;
135}
136
137pub async fn persist_docker_log(
138    container_id: Option<&str>,
139    command: Option<&str>,
140    stream: &str,
141    message: &str,
142    metadata: Value,
143) {
144    let _ = with_fail_open(
145        "persist docker log",
146        persist_docker_log_inner(container_id, command, stream, message, metadata),
147    )
148    .await;
149}
150
151pub async fn persist_schedule(
152    schedule_type: &str,
153    target_kind: &str,
154    target_ref: Option<&str>,
155    expression: &str,
156    enabled: bool,
157    metadata: Value,
158) {
159    let _ = with_fail_open(
160        "persist schedule",
161        persist_schedule_inner(
162            schedule_type,
163            target_kind,
164            target_ref,
165            expression,
166            enabled,
167            metadata,
168        ),
169    )
170    .await;
171}
172
173pub fn extract_cron_restart_expression(args: &[String]) -> Option<String> {
174    for (index, arg) in args.iter().enumerate() {
175        if arg == "--cron-restart" {
176            if let Some(value) = args.get(index + 1) {
177                let value = value.trim();
178                if !value.is_empty() {
179                    return Some(value.to_string());
180                }
181            }
182        } else if let Some(value) = arg.strip_prefix("--cron-restart=") {
183            let value = value.trim();
184            if !value.is_empty() {
185                return Some(value.to_string());
186            }
187        }
188    }
189    None
190}
191
192pub fn resolve_runtime_config(config: Option<&XbpConfig>) -> Option<AthenaRuntimeConfig> {
193    let database = config.and_then(|cfg| cfg.database.as_ref());
194    let enabled = database.and_then(|db| db.enabled).unwrap_or(true);
195    if !enabled {
196        return None;
197    }
198
199    let backend = value_or_default(
200        database.and_then(|db| db.backend.as_deref()),
201        env::var("XBP_ATHENA_BACKEND").ok().as_deref(),
202        DEFAULT_BACKEND,
203    );
204
205    let (url, key) = resolve_connection_pair(database)?;
206    let schema = sanitize_identifier(
207        value_or_default(
208            database.and_then(|db| db.schema.as_deref()),
209            None,
210            DEFAULT_SCHEMA,
211        )
212        .as_str(),
213    )
214    .unwrap_or_else(|| DEFAULT_SCHEMA.to_string());
215
216    Some(AthenaRuntimeConfig {
217        backend,
218        url,
219        key,
220        schema,
221    })
222}
223
224async fn persist_project_snapshot_inner(
225    project_root: &Path,
226    config: &XbpConfig,
227    config_kind: Option<&str>,
228) -> Result<(), String> {
229    let (client, runtime) = initialize_client(Some(config)).await?;
230    let _ =
231        upsert_project_snapshot_with_client(&client, &runtime, project_root, config, config_kind)
232            .await?;
233    Ok(())
234}
235async fn persist_log_entry_inner(entry: &LogEntry) -> Result<(), String> {
236    let context = load_current_project_context();
237    let config_ref = context.as_ref().map(|ctx| &ctx.config);
238    let (client, runtime) = initialize_client(config_ref).await?;
239
240    let project_id = if let Some(ctx) = context.as_ref() {
241        Some(
242            upsert_project_snapshot_with_client(
243                &client,
244                &runtime,
245                &ctx.project_root,
246                &ctx.config,
247                Some(ctx.config_kind.as_str()),
248            )
249            .await?,
250        )
251    } else {
252        None
253    };
254
255    let log_table = qualified_table(&runtime.schema, "xbp_logs");
256    let timestamp = to_rfc3339(entry.timestamp);
257    let metadata = json!({
258        "project_name": context.as_ref().map(|ctx| ctx.config.project_name.clone()),
259        "project_path": context.as_ref().map(|ctx| ctx.project_root.display().to_string()),
260    });
261
262    let global_sql = format!(
263        "INSERT INTO {table} (log_level, command, message, details, duration_ms, occurred_at, metadata, updated_at) \
264         VALUES ({log_level}, {command}, {message}, {details}, {duration}, {occurred_at}::timestamptz, {metadata}, now()) \
265         RETURNING log_id",
266        table = log_table,
267        log_level = sql_literal(&Value::String(log_level_label(&entry.level).to_string())),
268        command = sql_literal(&Value::String(entry.command.clone())),
269        message = sql_literal(&Value::String(entry.message.clone())),
270        details = optional_text_literal(entry.details.as_deref()),
271        duration = optional_u64_literal(entry.duration_ms),
272        occurred_at = sql_literal(&Value::String(timestamp)),
273        metadata = sql_literal(&metadata),
274    );
275
276    let global_result = execute_sql(&client, &global_sql).await?;
277    let global_log_id = query_first_column_as_string(&global_result, "log_id");
278
279    if let (Some(project_id), Some(global_log_id)) =
280        (project_id.as_deref(), global_log_id.as_deref())
281    {
282        let project_table = qualified_table(&runtime.schema, "xbp_project_logs");
283        let project_sql = format!(
284            "INSERT INTO {table} (project_id, global_log_id, log_level, command, message, details, duration_ms, occurred_at, metadata, updated_at) \
285             VALUES ({project_id}::uuid, {global_log_id}::uuid, {log_level}, {command}, {message}, {details}, {duration}, {occurred_at}::timestamptz, {metadata}, now())",
286            table = project_table,
287            project_id = sql_literal(&Value::String(project_id.to_string())),
288            global_log_id = sql_literal(&Value::String(global_log_id.to_string())),
289            log_level = sql_literal(&Value::String(log_level_label(&entry.level).to_string())),
290            command = sql_literal(&Value::String(entry.command.clone())),
291            message = sql_literal(&Value::String(entry.message.clone())),
292            details = optional_text_literal(entry.details.as_deref()),
293            duration = optional_u64_literal(entry.duration_ms),
294            occurred_at = sql_literal(&Value::String(to_rfc3339(entry.timestamp))),
295            metadata = sql_literal(&metadata),
296        );
297        let _ = execute_sql(&client, &project_sql).await?;
298    }
299
300    Ok(())
301}
302
303async fn persist_nginx_config_snapshot_inner(
304    domain: &str,
305    config_path: &Path,
306    content: &str,
307    upstream_ports: &[u16],
308    listen_ports: &[u16],
309    source: &str,
310) -> Result<(), String> {
311    let (client, runtime, project_id) = initialize_client_with_project_context().await?;
312    let table = qualified_table(&runtime.schema, "xbp_nginx_configs");
313    let metadata = json!({ "source": source });
314    let upstream = Value::Array(
315        upstream_ports
316            .iter()
317            .map(|port| Value::from(*port as u64))
318            .collect(),
319    );
320    let listen = Value::Array(
321        listen_ports
322            .iter()
323            .map(|port| Value::from(*port as u64))
324            .collect(),
325    );
326
327    let sql = format!(
328        "INSERT INTO {table} (project_id, domain, config_path, content, upstream_ports, listen_ports, metadata, updated_at) \
329         VALUES ({project_id}, {domain}, {config_path}, {content}, {upstream_ports}, {listen_ports}, {metadata}, now()) \
330         ON CONFLICT (domain, config_path) DO UPDATE SET \
331             project_id = EXCLUDED.project_id, \
332             content = EXCLUDED.content, \
333             upstream_ports = EXCLUDED.upstream_ports, \
334             listen_ports = EXCLUDED.listen_ports, \
335             metadata = EXCLUDED.metadata, \
336             updated_at = now()",
337        table = table,
338        project_id = optional_uuid_literal(project_id.as_deref()),
339        domain = sql_literal(&Value::String(domain.to_string())),
340        config_path = sql_literal(&Value::String(config_path.display().to_string())),
341        content = sql_literal(&Value::String(content.to_string())),
342        upstream_ports = sql_literal(&upstream),
343        listen_ports = sql_literal(&listen),
344        metadata = sql_literal(&metadata),
345    );
346
347    let _ = execute_sql(&client, &sql).await?;
348    Ok(())
349}
350
351async fn persist_nginx_log_inner(
352    domain: Option<&str>,
353    action: &str,
354    success: bool,
355    message: &str,
356    details: Option<&str>,
357    metadata: Value,
358) -> Result<(), String> {
359    let (client, runtime, project_id) = initialize_client_with_project_context().await?;
360    let table = qualified_table(&runtime.schema, "xbp_nginx_logs");
361
362    let sql = format!(
363        "INSERT INTO {table} (project_id, domain, action, success, message, details, occurred_at, metadata, updated_at) \
364         VALUES ({project_id}, {domain}, {action}, {success}, {message}, {details}, now(), {metadata}, now())",
365        table = table,
366        project_id = optional_uuid_literal(project_id.as_deref()),
367        domain = optional_text_literal(domain),
368        action = sql_literal(&Value::String(action.to_string())),
369        success = if success { "true" } else { "false" },
370        message = sql_literal(&Value::String(message.to_string())),
371        details = optional_text_literal(details),
372        metadata = sql_literal(&metadata),
373    );
374
375    let _ = execute_sql(&client, &sql).await?;
376    Ok(())
377}
378
379async fn persist_nginx_edit_audit_log_inner(
380    domain: Option<&str>,
381    config_path: Option<&Path>,
382    actor: Option<&str>,
383    action: &str,
384    old_content: Option<&str>,
385    new_content: Option<&str>,
386    metadata: Value,
387) -> Result<(), String> {
388    let (client, runtime, project_id) = initialize_client_with_project_context().await?;
389    let table = qualified_table(&runtime.schema, "xbp_nginx_edit_audit_logs");
390
391    let sql = format!(
392        "INSERT INTO {table} (project_id, domain, config_path, actor, action, old_content, new_content, occurred_at, metadata, updated_at) \
393         VALUES ({project_id}, {domain}, {config_path}, {actor}, {action}, {old_content}, {new_content}, now(), {metadata}, now())",
394        table = table,
395        project_id = optional_uuid_literal(project_id.as_deref()),
396        domain = optional_text_literal(domain),
397        config_path = config_path
398            .map(|path| sql_literal(&Value::String(path.display().to_string())))
399            .unwrap_or_else(|| "NULL".to_string()),
400        actor = optional_text_literal(actor),
401        action = sql_literal(&Value::String(action.to_string())),
402        old_content = optional_text_literal(old_content),
403        new_content = optional_text_literal(new_content),
404        metadata = sql_literal(&metadata),
405    );
406
407    let _ = execute_sql(&client, &sql).await?;
408    Ok(())
409}
410
411async fn persist_docker_container_snapshot_inner(
412    container_id: &str,
413    container_name: &str,
414    status: Option<&str>,
415    ports: Option<&str>,
416    metadata: Value,
417) -> Result<(), String> {
418    let (client, runtime, project_id) = initialize_client_with_project_context().await?;
419    let table = qualified_table(&runtime.schema, "xbp_docker_containers");
420    let sql = format!(
421        "INSERT INTO {table} (project_id, container_id, container_name, status, ports, inspected_at, metadata, updated_at) \
422         VALUES ({project_id}, {container_id}, {container_name}, {status}, {ports}, now(), {metadata}, now()) \
423         ON CONFLICT (container_id) DO UPDATE SET \
424             project_id = EXCLUDED.project_id, \
425             container_name = EXCLUDED.container_name, \
426             status = EXCLUDED.status, \
427             ports = EXCLUDED.ports, \
428             inspected_at = EXCLUDED.inspected_at, \
429             metadata = EXCLUDED.metadata, \
430             updated_at = now()",
431        table = table,
432        project_id = optional_uuid_literal(project_id.as_deref()),
433        container_id = sql_literal(&Value::String(container_id.to_string())),
434        container_name = sql_literal(&Value::String(container_name.to_string())),
435        status = optional_text_literal(status),
436        ports = optional_text_literal(ports),
437        metadata = sql_literal(&metadata),
438    );
439
440    let _ = execute_sql(&client, &sql).await?;
441    Ok(())
442}
443async fn persist_docker_log_inner(
444    container_id: Option<&str>,
445    command: Option<&str>,
446    stream: &str,
447    message: &str,
448    metadata: Value,
449) -> Result<(), String> {
450    let (client, runtime, project_id) = initialize_client_with_project_context().await?;
451    let table = qualified_table(&runtime.schema, "xbp_docker_logs");
452    let sql = format!(
453        "INSERT INTO {table} (project_id, container_id, command, stream, message, occurred_at, metadata, updated_at) \
454         VALUES ({project_id}, {container_id}, {command}, {stream}, {message}, now(), {metadata}, now())",
455        table = table,
456        project_id = optional_uuid_literal(project_id.as_deref()),
457        container_id = optional_text_literal(container_id),
458        command = optional_text_literal(command),
459        stream = sql_literal(&Value::String(stream.to_string())),
460        message = sql_literal(&Value::String(message.to_string())),
461        metadata = sql_literal(&metadata),
462    );
463
464    let _ = execute_sql(&client, &sql).await?;
465    Ok(())
466}
467
468async fn persist_schedule_inner(
469    schedule_type: &str,
470    target_kind: &str,
471    target_ref: Option<&str>,
472    expression: &str,
473    enabled: bool,
474    metadata: Value,
475) -> Result<(), String> {
476    let (client, runtime, project_id) = initialize_client_with_project_context().await?;
477    let table = qualified_table(&runtime.schema, "xbp_schedules");
478    let normalized_target_ref = target_ref.unwrap_or("");
479    let sql = format!(
480        "INSERT INTO {table} (project_id, schedule_type, target_kind, target_ref, expression, timezone, enabled, metadata, occurred_at, updated_at) \
481         VALUES ({project_id}, {schedule_type}, {target_kind}, {target_ref}, {expression}, {timezone}, {enabled}, {metadata}, now(), now()) \
482         ON CONFLICT (schedule_type, target_kind, target_ref, expression) DO UPDATE SET \
483             project_id = EXCLUDED.project_id, \
484             timezone = EXCLUDED.timezone, \
485             enabled = EXCLUDED.enabled, \
486             metadata = EXCLUDED.metadata, \
487             occurred_at = EXCLUDED.occurred_at, \
488             updated_at = now()",
489        table = table,
490        project_id = optional_uuid_literal(project_id.as_deref()),
491        schedule_type = sql_literal(&Value::String(schedule_type.to_string())),
492        target_kind = sql_literal(&Value::String(target_kind.to_string())),
493        target_ref = sql_literal(&Value::String(normalized_target_ref.to_string())),
494        expression = sql_literal(&Value::String(expression.to_string())),
495        timezone = sql_literal(&Value::String("UTC".to_string())),
496        enabled = if enabled { "true" } else { "false" },
497        metadata = sql_literal(&metadata),
498    );
499
500    let _ = execute_sql(&client, &sql).await?;
501    Ok(())
502}
503
504async fn initialize_client_with_project_context(
505) -> Result<(AthenaClient, AthenaRuntimeConfig, Option<String>), String> {
506    let context = load_current_project_context();
507    let config_ref = context.as_ref().map(|ctx| &ctx.config);
508    let (client, runtime) = initialize_client(config_ref).await?;
509    let project_id = if let Some(ctx) = context {
510        Some(
511            upsert_project_snapshot_with_client(
512                &client,
513                &runtime,
514                &ctx.project_root,
515                &ctx.config,
516                Some(ctx.config_kind.as_str()),
517            )
518            .await?,
519        )
520    } else {
521        None
522    };
523    Ok((client, runtime, project_id))
524}
525
526async fn initialize_client(
527    config: Option<&XbpConfig>,
528) -> Result<(AthenaClient, AthenaRuntimeConfig), String> {
529    let runtime = resolve_runtime_config(config).ok_or_else(|| DB_NOT_CONFIGURED.to_string())?;
530
531    let client = AthenaClient::new_with_backend_name(
532        runtime.url.clone(),
533        runtime.key.clone(),
534        &runtime.backend,
535    )
536    .await
537    .map_err(|err| format!("failed to initialize Athena client: {err}"))?;
538
539    ensure_schema_bootstrapped(&client, &runtime).await?;
540    Ok((client, runtime))
541}
542
543async fn ensure_schema_bootstrapped(
544    client: &AthenaClient,
545    runtime: &AthenaRuntimeConfig,
546) -> Result<(), String> {
547    let key = format!("{}|{}|{}", runtime.backend, runtime.url, runtime.schema);
548    let mut guard = BOOTSTRAPPED_BACKENDS.lock().await;
549    if guard.contains(&key) {
550        return Ok(());
551    }
552
553    let schema = sanitize_identifier(&runtime.schema).unwrap_or_else(|| DEFAULT_SCHEMA.to_string());
554    let create_schema_sql = format!("CREATE SCHEMA IF NOT EXISTS {}", schema);
555    let _ = execute_sql(client, &create_schema_sql).await?;
556
557    for statement in BOOTSTRAP_SQL.split(';') {
558        let trimmed = statement.trim();
559        if trimmed.is_empty() {
560            continue;
561        }
562        let sql = if schema == DEFAULT_SCHEMA {
563            format!("{};", trimmed)
564        } else {
565            format!("SET search_path TO {}; {};", schema, trimmed)
566        };
567        let _ = execute_sql(client, &sql).await?;
568    }
569
570    guard.insert(key);
571    Ok(())
572}
573
574async fn upsert_project_snapshot_with_client(
575    client: &AthenaClient,
576    runtime: &AthenaRuntimeConfig,
577    project_root: &Path,
578    config: &XbpConfig,
579    config_kind: Option<&str>,
580) -> Result<String, String> {
581    let project_table = qualified_table(&runtime.schema, "xbp_projects");
582    let metadata = build_project_metadata(config);
583    let project_sql = format!(
584        "INSERT INTO {table} (project_name, project_path, version, build_dir, port, app_type, branch, target, config_kind, metadata, updated_at) \
585         VALUES ({project_name}, {project_path}, {version}, {build_dir}, {port}, {app_type}, {branch}, {target}, {config_kind}, {metadata}, now()) \
586         ON CONFLICT (project_path) DO UPDATE SET \
587             project_name = EXCLUDED.project_name, \
588             version = EXCLUDED.version, \
589             build_dir = EXCLUDED.build_dir, \
590             port = EXCLUDED.port, \
591             app_type = EXCLUDED.app_type, \
592             branch = EXCLUDED.branch, \
593             target = EXCLUDED.target, \
594             config_kind = EXCLUDED.config_kind, \
595             metadata = EXCLUDED.metadata, \
596             updated_at = now() \
597         RETURNING project_id",
598        table = project_table,
599        project_name = sql_literal(&Value::String(config.project_name.clone())),
600        project_path = sql_literal(&Value::String(project_root.display().to_string())),
601        version = sql_literal(&Value::String(config.version.clone())),
602        build_dir = sql_literal(&Value::String(config.build_dir.clone())),
603        port = config.port,
604        app_type = optional_text_literal(config.app_type.as_deref()),
605        branch = optional_text_literal(config.branch.as_deref()),
606        target = optional_text_literal(config.target.as_deref()),
607        config_kind = optional_text_literal(config_kind),
608        metadata = sql_literal(&metadata),
609    );
610
611    let project_result = execute_sql(client, &project_sql).await?;
612    let project_id = query_first_column_as_string(&project_result, "project_id")
613        .ok_or_else(|| "failed to resolve project_id from upsert".to_string())?;
614
615    if let Some(services) = config.services.as_ref() {
616        let services_table = qualified_table(&runtime.schema, "xbp_project_services");
617        for service in services {
618            upsert_project_service(client, &services_table, &project_id, service).await?;
619        }
620    }
621
622    Ok(project_id)
623}
624async fn upsert_project_service(
625    client: &AthenaClient,
626    table: &str,
627    project_id: &str,
628    service: &ServiceConfig,
629) -> Result<(), String> {
630    let commands = serde_json::to_value(&service.commands).unwrap_or_else(|_| json!({}));
631    let environment = serde_json::to_value(&service.environment).unwrap_or_else(|_| json!({}));
632    let metadata = build_service_metadata(service);
633    let sql = format!(
634        "INSERT INTO {table} (project_id, service_name, target, branch, port, root_directory, url, healthcheck_path, restart_policy, start_wrapper, systemd_service_name, commands, environment, metadata, updated_at) \
635         VALUES ({project_id}::uuid, {service_name}, {target}, {branch}, {port}, {root_directory}, {url}, {healthcheck_path}, {restart_policy}, {start_wrapper}, {systemd_service_name}, {commands}, {environment}, {metadata}, now()) \
636         ON CONFLICT (project_id, service_name) DO UPDATE SET \
637             target = EXCLUDED.target, \
638             branch = EXCLUDED.branch, \
639             port = EXCLUDED.port, \
640             root_directory = EXCLUDED.root_directory, \
641             url = EXCLUDED.url, \
642             healthcheck_path = EXCLUDED.healthcheck_path, \
643             restart_policy = EXCLUDED.restart_policy, \
644             start_wrapper = EXCLUDED.start_wrapper, \
645             systemd_service_name = EXCLUDED.systemd_service_name, \
646             commands = EXCLUDED.commands, \
647             environment = EXCLUDED.environment, \
648             metadata = EXCLUDED.metadata, \
649             updated_at = now()",
650        table = table,
651        project_id = sql_literal(&Value::String(project_id.to_string())),
652        service_name = sql_literal(&Value::String(service.name.clone())),
653        target = sql_literal(&Value::String(service.target.clone())),
654        branch = sql_literal(&Value::String(service.branch.clone())),
655        port = service.port,
656        root_directory = optional_text_literal(service.root_directory.as_deref()),
657        url = optional_text_literal(service.url.as_deref()),
658        healthcheck_path = optional_text_literal(service.healthcheck_path.as_deref()),
659        restart_policy = optional_text_literal(service.restart_policy.as_deref()),
660        start_wrapper = optional_text_literal(service.start_wrapper.as_deref()),
661        systemd_service_name = optional_text_literal(service.systemd_service_name.as_deref()),
662        commands = sql_literal(&commands),
663        environment = sql_literal(&environment),
664        metadata = sql_literal(&metadata),
665    );
666
667    let _ = execute_sql(client, &sql).await?;
668    Ok(())
669}
670
671fn build_project_metadata(config: &XbpConfig) -> Value {
672    json!({
673        "services_count": config.services.as_ref().map(|services| services.len()).unwrap_or(0),
674        "monitor_url": config.monitor_url,
675        "kafka_topic": config.kafka_topic,
676        "systemd_service_name": config.systemd_service_name,
677    })
678}
679
680fn build_service_metadata(service: &ServiceConfig) -> Value {
681    json!({
682        "force_run_from_root": service.force_run_from_root,
683        "restart_policy_max_failure_count": service.restart_policy_max_failure_count,
684    })
685}
686
687fn resolve_connection_pair(database: Option<&DatabaseConfig>) -> Option<(String, String)> {
688    let block_url = database
689        .and_then(|db| db.url_env.as_deref())
690        .and_then(read_env_nonempty);
691    let block_key = database
692        .and_then(|db| db.key_env.as_deref())
693        .and_then(read_env_nonempty);
694
695    if let (Some(url), Some(key)) = (block_url, block_key) {
696        return Some((url, key));
697    }
698
699    let xbp_url = read_env_nonempty("XBP_ATHENA_URL");
700    let xbp_key = read_env_nonempty("XBP_ATHENA_KEY");
701    if let (Some(url), Some(key)) = (xbp_url, xbp_key) {
702        return Some((url, key));
703    }
704
705    let supabase_url = read_env_nonempty("SUPABASE_URL");
706    let supabase_key = read_env_nonempty("SUPABASE_KEY");
707    if let (Some(url), Some(key)) = (supabase_url, supabase_key) {
708        return Some((url, key));
709    }
710
711    let xlx_supabase_url = read_env_nonempty("XLX_SUPABASE_URL");
712    let xlx_supabase_key = read_env_nonempty("XLX_SUPABASE_ANON_KEY");
713    if let (Some(url), Some(key)) = (xlx_supabase_url, xlx_supabase_key) {
714        return Some((url, key));
715    }
716
717    None
718}
719
720fn read_env_nonempty(name: &str) -> Option<String> {
721    match env::var(name) {
722        Ok(value) if !value.trim().is_empty() => Some(value),
723        _ => None,
724    }
725}
726
727fn value_or_default(primary: Option<&str>, secondary: Option<&str>, default: &str) -> String {
728    if let Some(value) = primary {
729        let trimmed = value.trim();
730        if !trimmed.is_empty() {
731            return trimmed.to_string();
732        }
733    }
734    if let Some(value) = secondary {
735        let trimmed = value.trim();
736        if !trimmed.is_empty() {
737            return trimmed.to_string();
738        }
739    }
740    default.to_string()
741}
742
743async fn execute_sql(client: &AthenaClient, sql: &str) -> Result<QueryResult, String> {
744    client
745        .execute_sql(sql)
746        .await
747        .map_err(|err| format!("athena sql execution failed: {err}"))
748}
749
750fn query_first_column_as_string(result: &QueryResult, key: &str) -> Option<String> {
751    result.rows.first().and_then(|row| {
752        row.get(key).and_then(|value| {
753            value
754                .as_str()
755                .map(|value| value.to_string())
756                .or_else(|| value.as_i64().map(|value| value.to_string()))
757        })
758    })
759}
760
761fn load_current_project_context() -> Option<ProjectContext> {
762    let current_dir = env::current_dir().ok()?;
763    let found = find_xbp_config_upwards(&current_dir)?;
764    let content = std::fs::read_to_string(&found.config_path).ok()?;
765    let (config, _) = parse_config_with_auto_heal::<XbpConfig>(&content, found.kind).ok()?;
766
767    Some(ProjectContext {
768        project_root: found.project_root,
769        config,
770        config_kind: found.kind.to_string(),
771    })
772}
773
774fn log_level_label(level: &LogLevel) -> &'static str {
775    match level {
776        LogLevel::Info => "INFO",
777        LogLevel::Warning => "WARN",
778        LogLevel::Error => "ERROR",
779        LogLevel::Debug => "DEBUG",
780        LogLevel::Success => "SUCCESS",
781    }
782}
783
784fn to_rfc3339(ts: DateTime<Local>) -> String {
785    ts.with_timezone(&Utc).to_rfc3339()
786}
787
788fn sanitize_identifier(value: &str) -> Option<String> {
789    let trimmed = value.trim();
790    if trimmed.is_empty() {
791        return None;
792    }
793    let mut chars = trimmed.chars();
794    let first = chars.next()?;
795    if !(first.is_ascii_alphabetic() || first == '_') {
796        return None;
797    }
798    if chars.all(|ch| ch.is_ascii_alphanumeric() || ch == '_') {
799        Some(trimmed.to_string())
800    } else {
801        None
802    }
803}
804
805fn qualified_table(schema: &str, table: &str) -> String {
806    let schema = sanitize_identifier(schema).unwrap_or_else(|| DEFAULT_SCHEMA.to_string());
807    format!("{}.{}", schema, table)
808}
809
810pub fn sql_literal(value: &Value) -> String {
811    match value {
812        Value::Null => "NULL".to_string(),
813        Value::Bool(boolean) => boolean.to_string(),
814        Value::Number(number) => number.to_string(),
815        Value::String(text) => format!("'{}'", text.replace('\'', "''")),
816        Value::Array(_) | Value::Object(_) => {
817            format!("'{}'::jsonb", value.to_string().replace('\'', "''"))
818        }
819    }
820}
821
822fn optional_text_literal(value: Option<&str>) -> String {
823    match value {
824        Some(value) if !value.trim().is_empty() => sql_literal(&Value::String(value.to_string())),
825        _ => "NULL".to_string(),
826    }
827}
828
829fn optional_u64_literal(value: Option<u64>) -> String {
830    match value {
831        Some(value) => value.to_string(),
832        None => "NULL".to_string(),
833    }
834}
835
836fn optional_uuid_literal(value: Option<&str>) -> String {
837    match value {
838        Some(value) if !value.trim().is_empty() => {
839            format!("{}::uuid", sql_literal(&Value::String(value.to_string())))
840        }
841        _ => "NULL".to_string(),
842    }
843}
844
845pub async fn with_fail_open<T, F>(operation: &str, fut: F) -> Option<T>
846where
847    F: Future<Output = Result<T, String>>,
848{
849    match fut.await {
850        Ok(result) => Some(result),
851        Err(error) if error == DB_NOT_CONFIGURED => None,
852        Err(error) => {
853            warn!("{} (fail-open): {}", operation, error);
854            None
855        }
856    }
857}
858#[cfg(test)]
859mod tests {
860    use super::{
861        build_project_metadata, build_service_metadata, extract_cron_restart_expression,
862        resolve_runtime_config, sql_literal, with_fail_open,
863    };
864    use crate::strategies::{DatabaseConfig, ServiceConfig, XbpConfig};
865    use serde_json::json;
866    use std::collections::HashMap;
867    use std::sync::Mutex;
868
869    static ENV_TEST_MUTEX: Mutex<()> = Mutex::new(());
870
871    fn base_config() -> XbpConfig {
872        XbpConfig {
873            project_name: "demo".to_string(),
874            version: "0.1.0".to_string(),
875            port: 3000,
876            build_dir: "/tmp/demo".to_string(),
877            app_type: None,
878            build_command: None,
879            start_command: None,
880            install_command: None,
881            environment: None,
882            services: None,
883            systemd_service_name: None,
884            systemd: None,
885            kafka_brokers: None,
886            kafka_topic: None,
887            kafka_public_url: None,
888            log_files: None,
889            monitor_url: None,
890            monitor_method: None,
891            monitor_expected_code: None,
892            monitor_interval: None,
893            target: None,
894            branch: None,
895            crate_name: None,
896            npm_script: None,
897            port_storybook: None,
898            url: None,
899            url_storybook: None,
900            database: None,
901            linear: None,
902            github: None,
903            publish: None,
904        }
905    }
906
907    fn clear_env() {
908        for key in [
909            "BLOCK_URL",
910            "BLOCK_KEY",
911            "XBP_ATHENA_BACKEND",
912            "XBP_ATHENA_URL",
913            "XBP_ATHENA_KEY",
914            "SUPABASE_URL",
915            "SUPABASE_KEY",
916            "XLX_SUPABASE_URL",
917            "XLX_SUPABASE_ANON_KEY",
918        ] {
919            std::env::remove_var(key);
920        }
921    }
922
923    #[test]
924    fn resolve_runtime_config_prefers_database_block_env_binding() {
925        let _guard = ENV_TEST_MUTEX.lock().expect("env lock");
926        clear_env();
927        std::env::set_var("BLOCK_URL", "https://block.example");
928        std::env::set_var("BLOCK_KEY", "block-key");
929        std::env::set_var("XBP_ATHENA_URL", "https://xbp.example");
930        std::env::set_var("XBP_ATHENA_KEY", "xbp-key");
931
932        let mut config = base_config();
933        config.database = Some(DatabaseConfig {
934            enabled: Some(true),
935            backend: Some("postgres".to_string()),
936            url_env: Some("BLOCK_URL".to_string()),
937            key_env: Some("BLOCK_KEY".to_string()),
938            schema: Some("custom_schema".to_string()),
939        });
940
941        let runtime = resolve_runtime_config(Some(&config)).expect("runtime");
942        assert_eq!(runtime.backend, "postgres");
943        assert_eq!(runtime.url, "https://block.example");
944        assert_eq!(runtime.key, "block-key");
945        assert_eq!(runtime.schema, "custom_schema");
946    }
947
948    #[test]
949    fn resolve_runtime_config_uses_xbp_env_before_supabase_fallback() {
950        let _guard = ENV_TEST_MUTEX.lock().expect("env lock");
951        clear_env();
952        std::env::set_var("XBP_ATHENA_BACKEND", "supabase");
953        std::env::set_var("XBP_ATHENA_URL", "https://xbp.env");
954        std::env::set_var("XBP_ATHENA_KEY", "xbp-env-key");
955        std::env::set_var("SUPABASE_URL", "https://supabase.env");
956        std::env::set_var("SUPABASE_KEY", "supabase-key");
957
958        let runtime = resolve_runtime_config(None).expect("runtime");
959        assert_eq!(runtime.url, "https://xbp.env");
960        assert_eq!(runtime.key, "xbp-env-key");
961    }
962
963    #[test]
964    fn resolve_runtime_config_falls_back_to_supabase_then_xlx_supabase() {
965        let _guard = ENV_TEST_MUTEX.lock().expect("env lock");
966        clear_env();
967        std::env::set_var("SUPABASE_URL", "https://supabase.env");
968        std::env::set_var("SUPABASE_KEY", "supabase-key");
969        let runtime = resolve_runtime_config(None).expect("runtime");
970        assert_eq!(runtime.url, "https://supabase.env");
971        assert_eq!(runtime.key, "supabase-key");
972
973        clear_env();
974        std::env::set_var("XLX_SUPABASE_URL", "https://xlx-supabase.env");
975        std::env::set_var("XLX_SUPABASE_ANON_KEY", "xlx-key");
976        let runtime = resolve_runtime_config(None).expect("runtime");
977        assert_eq!(runtime.url, "https://xlx-supabase.env");
978        assert_eq!(runtime.key, "xlx-key");
979    }
980
981    #[test]
982    fn resolve_runtime_config_respects_database_disable_switch() {
983        let _guard = ENV_TEST_MUTEX.lock().expect("env lock");
984        clear_env();
985        std::env::set_var("SUPABASE_URL", "https://supabase.env");
986        std::env::set_var("SUPABASE_KEY", "supabase-key");
987
988        let mut config = base_config();
989        config.database = Some(DatabaseConfig {
990            enabled: Some(false),
991            backend: None,
992            url_env: None,
993            key_env: None,
994            schema: None,
995        });
996
997        assert!(resolve_runtime_config(Some(&config)).is_none());
998    }
999
1000    #[test]
1001    fn sql_literal_escapes_quotes_and_json_payloads() {
1002        assert_eq!(sql_literal(&json!("O'Hara")), "'O''Hara'");
1003        assert_eq!(sql_literal(&json!(true)), "true");
1004        assert_eq!(sql_literal(&json!(12.5)), "12.5");
1005        assert_eq!(
1006            sql_literal(&json!({"nested":"quote's"})),
1007            "'{\"nested\":\"quote''s\"}'::jsonb"
1008        );
1009    }
1010
1011    #[test]
1012    fn payload_mappers_produce_expected_shapes() {
1013        let mut config = base_config();
1014        config.monitor_url = Some("https://monitor.example".to_string());
1015        config.kafka_topic = Some("xbp.logs".to_string());
1016        config.systemd_service_name = Some("xbp-api".to_string());
1017        config.services = Some(vec![ServiceConfig {
1018            name: "api".to_string(),
1019            target: "rust".to_string(),
1020            branch: "main".to_string(),
1021            port: 8080,
1022            root_directory: Some("services/api".to_string()),
1023            environment: Some(HashMap::from([(
1024                "RUST_LOG".to_string(),
1025                "info".to_string(),
1026            )])),
1027            url: Some("https://api.example.com".to_string()),
1028            healthcheck_path: Some("/health".to_string()),
1029            restart_policy: Some("always".to_string()),
1030            restart_policy_max_failure_count: Some(5),
1031            start_wrapper: Some("pm2".to_string()),
1032            commands: None,
1033            force_run_from_root: Some(false),
1034            systemd_service_name: Some("xbp-api".to_string()),
1035            systemd: None,
1036        }]);
1037
1038        let project_meta = build_project_metadata(&config);
1039        assert_eq!(project_meta["services_count"], 1);
1040        assert_eq!(project_meta["kafka_topic"], "xbp.logs");
1041
1042        let service_meta =
1043            build_service_metadata(config.services.as_ref().unwrap().first().unwrap());
1044        assert_eq!(service_meta["force_run_from_root"], false);
1045        assert_eq!(service_meta["restart_policy_max_failure_count"], 5);
1046    }
1047
1048    #[tokio::test]
1049    async fn fail_open_wrapper_returns_none_on_error_and_value_on_success() {
1050        let success = with_fail_open("test-success", async { Ok::<_, String>(42) }).await;
1051        assert_eq!(success, Some(42));
1052
1053        let failed = with_fail_open::<i32, _>("test-failure", async {
1054            Err::<i32, _>("forced failure".to_string())
1055        })
1056        .await;
1057        assert_eq!(failed, None);
1058    }
1059
1060    #[test]
1061    fn cron_restart_parser_handles_split_and_equals_forms() {
1062        let args = vec![
1063            "npm".to_string(),
1064            "start".to_string(),
1065            "--cron-restart".to_string(),
1066            "0 */6 * * *".to_string(),
1067        ];
1068        assert_eq!(
1069            extract_cron_restart_expression(&args),
1070            Some("0 */6 * * *".to_string())
1071        );
1072
1073        let args = vec![
1074            "npm".to_string(),
1075            "start".to_string(),
1076            "--cron-restart=*/5 * * * *".to_string(),
1077        ];
1078        assert_eq!(
1079            extract_cron_restart_expression(&args),
1080            Some("*/5 * * * *".to_string())
1081        );
1082    }
1083}