1use athena_rs::client::backend::QueryResult;
2use athena_rs::AthenaClient;
3use chrono::{DateTime, Local, Utc};
4use once_cell::sync::Lazy;
5use serde_json::{json, Value};
6use std::collections::HashSet;
7use std::env;
8use std::future::Future;
9use std::path::{Path, PathBuf};
10use tokio::sync::Mutex;
11use tracing::warn;
12
13use crate::logging::{LogEntry, LogLevel};
14use crate::strategies::{DatabaseConfig, ServiceConfig, XbpConfig};
15use crate::utils::{find_xbp_config_upwards, parse_config_with_auto_heal};
16
17const DB_NOT_CONFIGURED: &str = "athena database is not configured";
18const DEFAULT_BACKEND: &str = "supabase";
19const DEFAULT_SCHEMA: &str = "public";
20const BOOTSTRAP_SQL: &str = include_str!("../../sql/schema.sql");
21
22static BOOTSTRAPPED_BACKENDS: Lazy<Mutex<HashSet<String>>> =
23 Lazy::new(|| Mutex::new(HashSet::new()));
24
25#[derive(Debug, Clone, PartialEq, Eq)]
26pub struct AthenaRuntimeConfig {
27 pub backend: String,
28 pub url: String,
29 pub key: String,
30 pub schema: String,
31}
32
33#[derive(Debug, Clone)]
34struct ProjectContext {
35 project_root: PathBuf,
36 config: XbpConfig,
37 config_kind: String,
38}
39
40pub async fn persist_project_snapshot(
41 project_root: &Path,
42 config: &XbpConfig,
43 config_kind: Option<&str>,
44) {
45 let _ = with_fail_open(
46 "persist project snapshot",
47 persist_project_snapshot_inner(project_root, config, config_kind),
48 )
49 .await;
50}
51
52pub async fn persist_log_entry(entry: &LogEntry) {
53 let _ = with_fail_open("persist xbp log entry", persist_log_entry_inner(entry)).await;
54}
55
56pub async fn persist_nginx_config_snapshot(
57 domain: &str,
58 config_path: &Path,
59 content: &str,
60 upstream_ports: &[u16],
61 listen_ports: &[u16],
62 source: &str,
63) {
64 let _ = with_fail_open(
65 "persist nginx config snapshot",
66 persist_nginx_config_snapshot_inner(
67 domain,
68 config_path,
69 content,
70 upstream_ports,
71 listen_ports,
72 source,
73 ),
74 )
75 .await;
76}
77
78pub async fn persist_nginx_log(
79 domain: Option<&str>,
80 action: &str,
81 success: bool,
82 message: &str,
83 details: Option<&str>,
84 metadata: Value,
85) {
86 let _ = with_fail_open(
87 "persist nginx log",
88 persist_nginx_log_inner(domain, action, success, message, details, metadata),
89 )
90 .await;
91}
92
93pub async fn persist_nginx_edit_audit_log(
94 domain: Option<&str>,
95 config_path: Option<&Path>,
96 actor: Option<&str>,
97 action: &str,
98 old_content: Option<&str>,
99 new_content: Option<&str>,
100 metadata: Value,
101) {
102 let _ = with_fail_open(
103 "persist nginx edit audit log",
104 persist_nginx_edit_audit_log_inner(
105 domain,
106 config_path,
107 actor,
108 action,
109 old_content,
110 new_content,
111 metadata,
112 ),
113 )
114 .await;
115}
116
117pub async fn persist_docker_container_snapshot(
118 container_id: &str,
119 container_name: &str,
120 status: Option<&str>,
121 ports: Option<&str>,
122 metadata: Value,
123) {
124 let _ = with_fail_open(
125 "persist docker container snapshot",
126 persist_docker_container_snapshot_inner(
127 container_id,
128 container_name,
129 status,
130 ports,
131 metadata,
132 ),
133 )
134 .await;
135}
136
137pub async fn persist_docker_log(
138 container_id: Option<&str>,
139 command: Option<&str>,
140 stream: &str,
141 message: &str,
142 metadata: Value,
143) {
144 let _ = with_fail_open(
145 "persist docker log",
146 persist_docker_log_inner(container_id, command, stream, message, metadata),
147 )
148 .await;
149}
150
151pub async fn persist_schedule(
152 schedule_type: &str,
153 target_kind: &str,
154 target_ref: Option<&str>,
155 expression: &str,
156 enabled: bool,
157 metadata: Value,
158) {
159 let _ = with_fail_open(
160 "persist schedule",
161 persist_schedule_inner(
162 schedule_type,
163 target_kind,
164 target_ref,
165 expression,
166 enabled,
167 metadata,
168 ),
169 )
170 .await;
171}
172
173pub fn extract_cron_restart_expression(args: &[String]) -> Option<String> {
174 for (index, arg) in args.iter().enumerate() {
175 if arg == "--cron-restart" {
176 if let Some(value) = args.get(index + 1) {
177 let value = value.trim();
178 if !value.is_empty() {
179 return Some(value.to_string());
180 }
181 }
182 } else if let Some(value) = arg.strip_prefix("--cron-restart=") {
183 let value = value.trim();
184 if !value.is_empty() {
185 return Some(value.to_string());
186 }
187 }
188 }
189 None
190}
191
192pub fn resolve_runtime_config(config: Option<&XbpConfig>) -> Option<AthenaRuntimeConfig> {
193 let database = config.and_then(|cfg| cfg.database.as_ref());
194 let enabled = database.and_then(|db| db.enabled).unwrap_or(true);
195 if !enabled {
196 return None;
197 }
198
199 let backend = value_or_default(
200 database.and_then(|db| db.backend.as_deref()),
201 env::var("XBP_ATHENA_BACKEND").ok().as_deref(),
202 DEFAULT_BACKEND,
203 );
204
205 let (url, key) = resolve_connection_pair(database)?;
206 let schema = sanitize_identifier(
207 value_or_default(
208 database.and_then(|db| db.schema.as_deref()),
209 None,
210 DEFAULT_SCHEMA,
211 )
212 .as_str(),
213 )
214 .unwrap_or_else(|| DEFAULT_SCHEMA.to_string());
215
216 Some(AthenaRuntimeConfig {
217 backend,
218 url,
219 key,
220 schema,
221 })
222}
223
224async fn persist_project_snapshot_inner(
225 project_root: &Path,
226 config: &XbpConfig,
227 config_kind: Option<&str>,
228) -> Result<(), String> {
229 let (client, runtime) = initialize_client(Some(config)).await?;
230 let _ =
231 upsert_project_snapshot_with_client(&client, &runtime, project_root, config, config_kind)
232 .await?;
233 Ok(())
234}
235async fn persist_log_entry_inner(entry: &LogEntry) -> Result<(), String> {
236 let context = load_current_project_context();
237 let config_ref = context.as_ref().map(|ctx| &ctx.config);
238 let (client, runtime) = initialize_client(config_ref).await?;
239
240 let project_id = if let Some(ctx) = context.as_ref() {
241 Some(
242 upsert_project_snapshot_with_client(
243 &client,
244 &runtime,
245 &ctx.project_root,
246 &ctx.config,
247 Some(ctx.config_kind.as_str()),
248 )
249 .await?,
250 )
251 } else {
252 None
253 };
254
255 let log_table = qualified_table(&runtime.schema, "xbp_logs");
256 let timestamp = to_rfc3339(entry.timestamp);
257 let metadata = json!({
258 "project_name": context.as_ref().map(|ctx| ctx.config.project_name.clone()),
259 "project_path": context.as_ref().map(|ctx| ctx.project_root.display().to_string()),
260 });
261
262 let global_sql = format!(
263 "INSERT INTO {table} (log_level, command, message, details, duration_ms, occurred_at, metadata, updated_at) \
264 VALUES ({log_level}, {command}, {message}, {details}, {duration}, {occurred_at}::timestamptz, {metadata}, now()) \
265 RETURNING log_id",
266 table = log_table,
267 log_level = sql_literal(&Value::String(log_level_label(&entry.level).to_string())),
268 command = sql_literal(&Value::String(entry.command.clone())),
269 message = sql_literal(&Value::String(entry.message.clone())),
270 details = optional_text_literal(entry.details.as_deref()),
271 duration = optional_u64_literal(entry.duration_ms),
272 occurred_at = sql_literal(&Value::String(timestamp)),
273 metadata = sql_literal(&metadata),
274 );
275
276 let global_result = execute_sql(&client, &global_sql).await?;
277 let global_log_id = query_first_column_as_string(&global_result, "log_id");
278
279 if let (Some(project_id), Some(global_log_id)) =
280 (project_id.as_deref(), global_log_id.as_deref())
281 {
282 let project_table = qualified_table(&runtime.schema, "xbp_project_logs");
283 let project_sql = format!(
284 "INSERT INTO {table} (project_id, global_log_id, log_level, command, message, details, duration_ms, occurred_at, metadata, updated_at) \
285 VALUES ({project_id}::uuid, {global_log_id}::uuid, {log_level}, {command}, {message}, {details}, {duration}, {occurred_at}::timestamptz, {metadata}, now())",
286 table = project_table,
287 project_id = sql_literal(&Value::String(project_id.to_string())),
288 global_log_id = sql_literal(&Value::String(global_log_id.to_string())),
289 log_level = sql_literal(&Value::String(log_level_label(&entry.level).to_string())),
290 command = sql_literal(&Value::String(entry.command.clone())),
291 message = sql_literal(&Value::String(entry.message.clone())),
292 details = optional_text_literal(entry.details.as_deref()),
293 duration = optional_u64_literal(entry.duration_ms),
294 occurred_at = sql_literal(&Value::String(to_rfc3339(entry.timestamp))),
295 metadata = sql_literal(&metadata),
296 );
297 let _ = execute_sql(&client, &project_sql).await?;
298 }
299
300 Ok(())
301}
302
303async fn persist_nginx_config_snapshot_inner(
304 domain: &str,
305 config_path: &Path,
306 content: &str,
307 upstream_ports: &[u16],
308 listen_ports: &[u16],
309 source: &str,
310) -> Result<(), String> {
311 let (client, runtime, project_id) = initialize_client_with_project_context().await?;
312 let table = qualified_table(&runtime.schema, "xbp_nginx_configs");
313 let metadata = json!({ "source": source });
314 let upstream = Value::Array(
315 upstream_ports
316 .iter()
317 .map(|port| Value::from(*port as u64))
318 .collect(),
319 );
320 let listen = Value::Array(
321 listen_ports
322 .iter()
323 .map(|port| Value::from(*port as u64))
324 .collect(),
325 );
326
327 let sql = format!(
328 "INSERT INTO {table} (project_id, domain, config_path, content, upstream_ports, listen_ports, metadata, updated_at) \
329 VALUES ({project_id}, {domain}, {config_path}, {content}, {upstream_ports}, {listen_ports}, {metadata}, now()) \
330 ON CONFLICT (domain, config_path) DO UPDATE SET \
331 project_id = EXCLUDED.project_id, \
332 content = EXCLUDED.content, \
333 upstream_ports = EXCLUDED.upstream_ports, \
334 listen_ports = EXCLUDED.listen_ports, \
335 metadata = EXCLUDED.metadata, \
336 updated_at = now()",
337 table = table,
338 project_id = optional_uuid_literal(project_id.as_deref()),
339 domain = sql_literal(&Value::String(domain.to_string())),
340 config_path = sql_literal(&Value::String(config_path.display().to_string())),
341 content = sql_literal(&Value::String(content.to_string())),
342 upstream_ports = sql_literal(&upstream),
343 listen_ports = sql_literal(&listen),
344 metadata = sql_literal(&metadata),
345 );
346
347 let _ = execute_sql(&client, &sql).await?;
348 Ok(())
349}
350
351async fn persist_nginx_log_inner(
352 domain: Option<&str>,
353 action: &str,
354 success: bool,
355 message: &str,
356 details: Option<&str>,
357 metadata: Value,
358) -> Result<(), String> {
359 let (client, runtime, project_id) = initialize_client_with_project_context().await?;
360 let table = qualified_table(&runtime.schema, "xbp_nginx_logs");
361
362 let sql = format!(
363 "INSERT INTO {table} (project_id, domain, action, success, message, details, occurred_at, metadata, updated_at) \
364 VALUES ({project_id}, {domain}, {action}, {success}, {message}, {details}, now(), {metadata}, now())",
365 table = table,
366 project_id = optional_uuid_literal(project_id.as_deref()),
367 domain = optional_text_literal(domain),
368 action = sql_literal(&Value::String(action.to_string())),
369 success = if success { "true" } else { "false" },
370 message = sql_literal(&Value::String(message.to_string())),
371 details = optional_text_literal(details),
372 metadata = sql_literal(&metadata),
373 );
374
375 let _ = execute_sql(&client, &sql).await?;
376 Ok(())
377}
378
379async fn persist_nginx_edit_audit_log_inner(
380 domain: Option<&str>,
381 config_path: Option<&Path>,
382 actor: Option<&str>,
383 action: &str,
384 old_content: Option<&str>,
385 new_content: Option<&str>,
386 metadata: Value,
387) -> Result<(), String> {
388 let (client, runtime, project_id) = initialize_client_with_project_context().await?;
389 let table = qualified_table(&runtime.schema, "xbp_nginx_edit_audit_logs");
390
391 let sql = format!(
392 "INSERT INTO {table} (project_id, domain, config_path, actor, action, old_content, new_content, occurred_at, metadata, updated_at) \
393 VALUES ({project_id}, {domain}, {config_path}, {actor}, {action}, {old_content}, {new_content}, now(), {metadata}, now())",
394 table = table,
395 project_id = optional_uuid_literal(project_id.as_deref()),
396 domain = optional_text_literal(domain),
397 config_path = config_path
398 .map(|path| sql_literal(&Value::String(path.display().to_string())))
399 .unwrap_or_else(|| "NULL".to_string()),
400 actor = optional_text_literal(actor),
401 action = sql_literal(&Value::String(action.to_string())),
402 old_content = optional_text_literal(old_content),
403 new_content = optional_text_literal(new_content),
404 metadata = sql_literal(&metadata),
405 );
406
407 let _ = execute_sql(&client, &sql).await?;
408 Ok(())
409}
410
411async fn persist_docker_container_snapshot_inner(
412 container_id: &str,
413 container_name: &str,
414 status: Option<&str>,
415 ports: Option<&str>,
416 metadata: Value,
417) -> Result<(), String> {
418 let (client, runtime, project_id) = initialize_client_with_project_context().await?;
419 let table = qualified_table(&runtime.schema, "xbp_docker_containers");
420 let sql = format!(
421 "INSERT INTO {table} (project_id, container_id, container_name, status, ports, inspected_at, metadata, updated_at) \
422 VALUES ({project_id}, {container_id}, {container_name}, {status}, {ports}, now(), {metadata}, now()) \
423 ON CONFLICT (container_id) DO UPDATE SET \
424 project_id = EXCLUDED.project_id, \
425 container_name = EXCLUDED.container_name, \
426 status = EXCLUDED.status, \
427 ports = EXCLUDED.ports, \
428 inspected_at = EXCLUDED.inspected_at, \
429 metadata = EXCLUDED.metadata, \
430 updated_at = now()",
431 table = table,
432 project_id = optional_uuid_literal(project_id.as_deref()),
433 container_id = sql_literal(&Value::String(container_id.to_string())),
434 container_name = sql_literal(&Value::String(container_name.to_string())),
435 status = optional_text_literal(status),
436 ports = optional_text_literal(ports),
437 metadata = sql_literal(&metadata),
438 );
439
440 let _ = execute_sql(&client, &sql).await?;
441 Ok(())
442}
443async fn persist_docker_log_inner(
444 container_id: Option<&str>,
445 command: Option<&str>,
446 stream: &str,
447 message: &str,
448 metadata: Value,
449) -> Result<(), String> {
450 let (client, runtime, project_id) = initialize_client_with_project_context().await?;
451 let table = qualified_table(&runtime.schema, "xbp_docker_logs");
452 let sql = format!(
453 "INSERT INTO {table} (project_id, container_id, command, stream, message, occurred_at, metadata, updated_at) \
454 VALUES ({project_id}, {container_id}, {command}, {stream}, {message}, now(), {metadata}, now())",
455 table = table,
456 project_id = optional_uuid_literal(project_id.as_deref()),
457 container_id = optional_text_literal(container_id),
458 command = optional_text_literal(command),
459 stream = sql_literal(&Value::String(stream.to_string())),
460 message = sql_literal(&Value::String(message.to_string())),
461 metadata = sql_literal(&metadata),
462 );
463
464 let _ = execute_sql(&client, &sql).await?;
465 Ok(())
466}
467
468async fn persist_schedule_inner(
469 schedule_type: &str,
470 target_kind: &str,
471 target_ref: Option<&str>,
472 expression: &str,
473 enabled: bool,
474 metadata: Value,
475) -> Result<(), String> {
476 let (client, runtime, project_id) = initialize_client_with_project_context().await?;
477 let table = qualified_table(&runtime.schema, "xbp_schedules");
478 let normalized_target_ref = target_ref.unwrap_or("");
479 let sql = format!(
480 "INSERT INTO {table} (project_id, schedule_type, target_kind, target_ref, expression, timezone, enabled, metadata, occurred_at, updated_at) \
481 VALUES ({project_id}, {schedule_type}, {target_kind}, {target_ref}, {expression}, {timezone}, {enabled}, {metadata}, now(), now()) \
482 ON CONFLICT (schedule_type, target_kind, target_ref, expression) DO UPDATE SET \
483 project_id = EXCLUDED.project_id, \
484 timezone = EXCLUDED.timezone, \
485 enabled = EXCLUDED.enabled, \
486 metadata = EXCLUDED.metadata, \
487 occurred_at = EXCLUDED.occurred_at, \
488 updated_at = now()",
489 table = table,
490 project_id = optional_uuid_literal(project_id.as_deref()),
491 schedule_type = sql_literal(&Value::String(schedule_type.to_string())),
492 target_kind = sql_literal(&Value::String(target_kind.to_string())),
493 target_ref = sql_literal(&Value::String(normalized_target_ref.to_string())),
494 expression = sql_literal(&Value::String(expression.to_string())),
495 timezone = sql_literal(&Value::String("UTC".to_string())),
496 enabled = if enabled { "true" } else { "false" },
497 metadata = sql_literal(&metadata),
498 );
499
500 let _ = execute_sql(&client, &sql).await?;
501 Ok(())
502}
503
504async fn initialize_client_with_project_context(
505) -> Result<(AthenaClient, AthenaRuntimeConfig, Option<String>), String> {
506 let context = load_current_project_context();
507 let config_ref = context.as_ref().map(|ctx| &ctx.config);
508 let (client, runtime) = initialize_client(config_ref).await?;
509 let project_id = if let Some(ctx) = context {
510 Some(
511 upsert_project_snapshot_with_client(
512 &client,
513 &runtime,
514 &ctx.project_root,
515 &ctx.config,
516 Some(ctx.config_kind.as_str()),
517 )
518 .await?,
519 )
520 } else {
521 None
522 };
523 Ok((client, runtime, project_id))
524}
525
526async fn initialize_client(
527 config: Option<&XbpConfig>,
528) -> Result<(AthenaClient, AthenaRuntimeConfig), String> {
529 let runtime = resolve_runtime_config(config).ok_or_else(|| DB_NOT_CONFIGURED.to_string())?;
530
531 let client = AthenaClient::new_with_backend_name(
532 runtime.url.clone(),
533 runtime.key.clone(),
534 "xbp-cli",
535 &runtime.backend,
536 )
537 .await
538 .map_err(|err| format!("failed to initialize Athena client: {err}"))?;
539
540 ensure_schema_bootstrapped(&client, &runtime).await?;
541 Ok((client, runtime))
542}
543
544async fn ensure_schema_bootstrapped(
545 client: &AthenaClient,
546 runtime: &AthenaRuntimeConfig,
547) -> Result<(), String> {
548 let key = format!("{}|{}|{}", runtime.backend, runtime.url, runtime.schema);
549 let mut guard = BOOTSTRAPPED_BACKENDS.lock().await;
550 if guard.contains(&key) {
551 return Ok(());
552 }
553
554 let schema = sanitize_identifier(&runtime.schema).unwrap_or_else(|| DEFAULT_SCHEMA.to_string());
555 let create_schema_sql = format!("CREATE SCHEMA IF NOT EXISTS {}", schema);
556 let _ = execute_sql(client, &create_schema_sql).await?;
557
558 for statement in BOOTSTRAP_SQL.split(';') {
559 let trimmed = statement.trim();
560 if trimmed.is_empty() {
561 continue;
562 }
563 let sql = if schema == DEFAULT_SCHEMA {
564 format!("{};", trimmed)
565 } else {
566 format!("SET search_path TO {}; {};", schema, trimmed)
567 };
568 let _ = execute_sql(client, &sql).await?;
569 }
570
571 guard.insert(key);
572 Ok(())
573}
574
575async fn upsert_project_snapshot_with_client(
576 client: &AthenaClient,
577 runtime: &AthenaRuntimeConfig,
578 project_root: &Path,
579 config: &XbpConfig,
580 config_kind: Option<&str>,
581) -> Result<String, String> {
582 let project_table = qualified_table(&runtime.schema, "xbp_projects");
583 let metadata = build_project_metadata(config);
584 let project_sql = format!(
585 "INSERT INTO {table} (project_name, project_path, version, build_dir, port, app_type, branch, target, config_kind, metadata, updated_at) \
586 VALUES ({project_name}, {project_path}, {version}, {build_dir}, {port}, {app_type}, {branch}, {target}, {config_kind}, {metadata}, now()) \
587 ON CONFLICT (project_path) DO UPDATE SET \
588 project_name = EXCLUDED.project_name, \
589 version = EXCLUDED.version, \
590 build_dir = EXCLUDED.build_dir, \
591 port = EXCLUDED.port, \
592 app_type = EXCLUDED.app_type, \
593 branch = EXCLUDED.branch, \
594 target = EXCLUDED.target, \
595 config_kind = EXCLUDED.config_kind, \
596 metadata = EXCLUDED.metadata, \
597 updated_at = now() \
598 RETURNING project_id",
599 table = project_table,
600 project_name = sql_literal(&Value::String(config.project_name.clone())),
601 project_path = sql_literal(&Value::String(project_root.display().to_string())),
602 version = sql_literal(&Value::String(config.version.clone())),
603 build_dir = sql_literal(&Value::String(config.build_dir.clone())),
604 port = config.port,
605 app_type = optional_text_literal(config.app_type.as_deref()),
606 branch = optional_text_literal(config.branch.as_deref()),
607 target = optional_text_literal(config.target.as_deref()),
608 config_kind = optional_text_literal(config_kind),
609 metadata = sql_literal(&metadata),
610 );
611
612 let project_result = execute_sql(client, &project_sql).await?;
613 let project_id = query_first_column_as_string(&project_result, "project_id")
614 .ok_or_else(|| "failed to resolve project_id from upsert".to_string())?;
615
616 if let Some(services) = config.services.as_ref() {
617 let services_table = qualified_table(&runtime.schema, "xbp_project_services");
618 for service in services {
619 upsert_project_service(client, &services_table, &project_id, service).await?;
620 }
621 }
622
623 Ok(project_id)
624}
625async fn upsert_project_service(
626 client: &AthenaClient,
627 table: &str,
628 project_id: &str,
629 service: &ServiceConfig,
630) -> Result<(), String> {
631 let commands = serde_json::to_value(&service.commands).unwrap_or_else(|_| json!({}));
632 let environment = serde_json::to_value(&service.environment).unwrap_or_else(|_| json!({}));
633 let metadata = build_service_metadata(service);
634 let sql = format!(
635 "INSERT INTO {table} (project_id, service_name, target, branch, port, root_directory, url, healthcheck_path, restart_policy, start_wrapper, systemd_service_name, commands, environment, metadata, updated_at) \
636 VALUES ({project_id}::uuid, {service_name}, {target}, {branch}, {port}, {root_directory}, {url}, {healthcheck_path}, {restart_policy}, {start_wrapper}, {systemd_service_name}, {commands}, {environment}, {metadata}, now()) \
637 ON CONFLICT (project_id, service_name) DO UPDATE SET \
638 target = EXCLUDED.target, \
639 branch = EXCLUDED.branch, \
640 port = EXCLUDED.port, \
641 root_directory = EXCLUDED.root_directory, \
642 url = EXCLUDED.url, \
643 healthcheck_path = EXCLUDED.healthcheck_path, \
644 restart_policy = EXCLUDED.restart_policy, \
645 start_wrapper = EXCLUDED.start_wrapper, \
646 systemd_service_name = EXCLUDED.systemd_service_name, \
647 commands = EXCLUDED.commands, \
648 environment = EXCLUDED.environment, \
649 metadata = EXCLUDED.metadata, \
650 updated_at = now()",
651 table = table,
652 project_id = sql_literal(&Value::String(project_id.to_string())),
653 service_name = sql_literal(&Value::String(service.name.clone())),
654 target = sql_literal(&Value::String(service.target.clone())),
655 branch = sql_literal(&Value::String(service.branch.clone())),
656 port = service.port,
657 root_directory = optional_text_literal(service.root_directory.as_deref()),
658 url = optional_text_literal(service.url.as_deref()),
659 healthcheck_path = optional_text_literal(service.healthcheck_path.as_deref()),
660 restart_policy = optional_text_literal(service.restart_policy.as_deref()),
661 start_wrapper = optional_text_literal(service.start_wrapper.as_deref()),
662 systemd_service_name = optional_text_literal(service.systemd_service_name.as_deref()),
663 commands = sql_literal(&commands),
664 environment = sql_literal(&environment),
665 metadata = sql_literal(&metadata),
666 );
667
668 let _ = execute_sql(client, &sql).await?;
669 Ok(())
670}
671
672fn build_project_metadata(config: &XbpConfig) -> Value {
673 json!({
674 "services_count": config.services.as_ref().map(|services| services.len()).unwrap_or(0),
675 "monitor_url": config.monitor_url,
676 "kafka_topic": config.kafka_topic,
677 "systemd_service_name": config.systemd_service_name,
678 })
679}
680
681fn build_service_metadata(service: &ServiceConfig) -> Value {
682 json!({
683 "force_run_from_root": service.force_run_from_root,
684 "restart_policy_max_failure_count": service.restart_policy_max_failure_count,
685 })
686}
687
688fn resolve_connection_pair(database: Option<&DatabaseConfig>) -> Option<(String, String)> {
689 let block_url = database
690 .and_then(|db| db.url_env.as_deref())
691 .and_then(read_env_nonempty);
692 let block_key = database
693 .and_then(|db| db.key_env.as_deref())
694 .and_then(read_env_nonempty);
695
696 if let (Some(url), Some(key)) = (block_url, block_key) {
697 return Some((url, key));
698 }
699
700 let xbp_url = read_env_nonempty("XBP_ATHENA_URL");
701 let xbp_key = read_env_nonempty("XBP_ATHENA_KEY");
702 if let (Some(url), Some(key)) = (xbp_url, xbp_key) {
703 return Some((url, key));
704 }
705
706 let supabase_url = read_env_nonempty("SUPABASE_URL");
707 let supabase_key = read_env_nonempty("SUPABASE_KEY");
708 if let (Some(url), Some(key)) = (supabase_url, supabase_key) {
709 return Some((url, key));
710 }
711
712 let xlx_supabase_url = read_env_nonempty("XLX_SUPABASE_URL");
713 let xlx_supabase_key = read_env_nonempty("XLX_SUPABASE_ANON_KEY");
714 if let (Some(url), Some(key)) = (xlx_supabase_url, xlx_supabase_key) {
715 return Some((url, key));
716 }
717
718 None
719}
720
721fn read_env_nonempty(name: &str) -> Option<String> {
722 match env::var(name) {
723 Ok(value) if !value.trim().is_empty() => Some(value),
724 _ => None,
725 }
726}
727
728fn value_or_default(primary: Option<&str>, secondary: Option<&str>, default: &str) -> String {
729 if let Some(value) = primary {
730 let trimmed = value.trim();
731 if !trimmed.is_empty() {
732 return trimmed.to_string();
733 }
734 }
735 if let Some(value) = secondary {
736 let trimmed = value.trim();
737 if !trimmed.is_empty() {
738 return trimmed.to_string();
739 }
740 }
741 default.to_string()
742}
743
744async fn execute_sql(client: &AthenaClient, sql: &str) -> Result<QueryResult, String> {
745 client
746 .execute_sql(sql)
747 .await
748 .map_err(|err| format!("athena sql execution failed: {err}"))
749}
750
751fn query_first_column_as_string(result: &QueryResult, key: &str) -> Option<String> {
752 result.rows.first().and_then(|row| {
753 row.get(key).and_then(|value| {
754 value
755 .as_str()
756 .map(|value| value.to_string())
757 .or_else(|| value.as_i64().map(|value| value.to_string()))
758 })
759 })
760}
761
762fn load_current_project_context() -> Option<ProjectContext> {
763 let current_dir = env::current_dir().ok()?;
764 let found = find_xbp_config_upwards(¤t_dir)?;
765 let content = std::fs::read_to_string(&found.config_path).ok()?;
766 let (config, _) = parse_config_with_auto_heal::<XbpConfig>(&content, found.kind).ok()?;
767
768 Some(ProjectContext {
769 project_root: found.project_root,
770 config,
771 config_kind: found.kind.to_string(),
772 })
773}
774
775fn log_level_label(level: &LogLevel) -> &'static str {
776 match level {
777 LogLevel::Info => "INFO",
778 LogLevel::Warning => "WARN",
779 LogLevel::Error => "ERROR",
780 LogLevel::Debug => "DEBUG",
781 LogLevel::Success => "SUCCESS",
782 }
783}
784
785fn to_rfc3339(ts: DateTime<Local>) -> String {
786 ts.with_timezone(&Utc).to_rfc3339()
787}
788
789fn sanitize_identifier(value: &str) -> Option<String> {
790 let trimmed = value.trim();
791 if trimmed.is_empty() {
792 return None;
793 }
794 let mut chars = trimmed.chars();
795 let first = chars.next()?;
796 if !(first.is_ascii_alphabetic() || first == '_') {
797 return None;
798 }
799 if chars.all(|ch| ch.is_ascii_alphanumeric() || ch == '_') {
800 Some(trimmed.to_string())
801 } else {
802 None
803 }
804}
805
806fn qualified_table(schema: &str, table: &str) -> String {
807 let schema = sanitize_identifier(schema).unwrap_or_else(|| DEFAULT_SCHEMA.to_string());
808 format!("{}.{}", schema, table)
809}
810
811pub fn sql_literal(value: &Value) -> String {
812 match value {
813 Value::Null => "NULL".to_string(),
814 Value::Bool(boolean) => boolean.to_string(),
815 Value::Number(number) => number.to_string(),
816 Value::String(text) => format!("'{}'", text.replace('\'', "''")),
817 Value::Array(_) | Value::Object(_) => {
818 format!("'{}'::jsonb", value.to_string().replace('\'', "''"))
819 }
820 }
821}
822
823fn optional_text_literal(value: Option<&str>) -> String {
824 match value {
825 Some(value) if !value.trim().is_empty() => sql_literal(&Value::String(value.to_string())),
826 _ => "NULL".to_string(),
827 }
828}
829
830fn optional_u64_literal(value: Option<u64>) -> String {
831 match value {
832 Some(value) => value.to_string(),
833 None => "NULL".to_string(),
834 }
835}
836
837fn optional_uuid_literal(value: Option<&str>) -> String {
838 match value {
839 Some(value) if !value.trim().is_empty() => {
840 format!("{}::uuid", sql_literal(&Value::String(value.to_string())))
841 }
842 _ => "NULL".to_string(),
843 }
844}
845
846pub async fn with_fail_open<T, F>(operation: &str, fut: F) -> Option<T>
847where
848 F: Future<Output = Result<T, String>>,
849{
850 match fut.await {
851 Ok(result) => Some(result),
852 Err(error) if error == DB_NOT_CONFIGURED => None,
853 Err(error) => {
854 warn!("{} (fail-open): {}", operation, error);
855 None
856 }
857 }
858}
859#[cfg(test)]
860mod tests {
861 use super::{
862 build_project_metadata, build_service_metadata, extract_cron_restart_expression,
863 resolve_runtime_config, sql_literal, with_fail_open,
864 };
865 use crate::strategies::{DatabaseConfig, ServiceConfig, XbpConfig};
866 use serde_json::json;
867 use std::collections::HashMap;
868 use std::sync::Mutex;
869
870 static ENV_TEST_MUTEX: Mutex<()> = Mutex::new(());
871
872 fn base_config() -> XbpConfig {
873 XbpConfig {
874 project_name: "demo".to_string(),
875 version: "0.1.0".to_string(),
876 port: 3000,
877 build_dir: "/tmp/demo".to_string(),
878 app_type: None,
879 build_command: None,
880 start_command: None,
881 install_command: None,
882 environment: None,
883 services: None,
884 systemd_service_name: None,
885 systemd: None,
886 kafka_brokers: None,
887 kafka_topic: None,
888 kafka_public_url: None,
889 log_files: None,
890 monitor_url: None,
891 monitor_method: None,
892 monitor_expected_code: None,
893 monitor_interval: None,
894 target: None,
895 branch: None,
896 crate_name: None,
897 npm_script: None,
898 port_storybook: None,
899 url: None,
900 url_storybook: None,
901 database: None,
902 linear: None,
903 github: None,
904 publish: None,
905 version_targets: Vec::new(),
906 }
907 }
908
909 fn clear_env() {
910 for key in [
911 "BLOCK_URL",
912 "BLOCK_KEY",
913 "XBP_ATHENA_BACKEND",
914 "XBP_ATHENA_URL",
915 "XBP_ATHENA_KEY",
916 "SUPABASE_URL",
917 "SUPABASE_KEY",
918 "XLX_SUPABASE_URL",
919 "XLX_SUPABASE_ANON_KEY",
920 ] {
921 std::env::remove_var(key);
922 }
923 }
924
925 #[test]
926 fn resolve_runtime_config_prefers_database_block_env_binding() {
927 let _guard = ENV_TEST_MUTEX.lock().expect("env lock");
928 clear_env();
929 std::env::set_var("BLOCK_URL", "https://block.example");
930 std::env::set_var("BLOCK_KEY", "block-key");
931 std::env::set_var("XBP_ATHENA_URL", "https://xbp.example");
932 std::env::set_var("XBP_ATHENA_KEY", "xbp-key");
933
934 let mut config = base_config();
935 config.database = Some(DatabaseConfig {
936 enabled: Some(true),
937 backend: Some("postgres".to_string()),
938 url_env: Some("BLOCK_URL".to_string()),
939 key_env: Some("BLOCK_KEY".to_string()),
940 schema: Some("custom_schema".to_string()),
941 });
942
943 let runtime = resolve_runtime_config(Some(&config)).expect("runtime");
944 assert_eq!(runtime.backend, "postgres");
945 assert_eq!(runtime.url, "https://block.example");
946 assert_eq!(runtime.key, "block-key");
947 assert_eq!(runtime.schema, "custom_schema");
948 }
949
950 #[test]
951 fn resolve_runtime_config_uses_xbp_env_before_supabase_fallback() {
952 let _guard = ENV_TEST_MUTEX.lock().expect("env lock");
953 clear_env();
954 std::env::set_var("XBP_ATHENA_BACKEND", "supabase");
955 std::env::set_var("XBP_ATHENA_URL", "https://xbp.env");
956 std::env::set_var("XBP_ATHENA_KEY", "xbp-env-key");
957 std::env::set_var("SUPABASE_URL", "https://supabase.env");
958 std::env::set_var("SUPABASE_KEY", "supabase-key");
959
960 let runtime = resolve_runtime_config(None).expect("runtime");
961 assert_eq!(runtime.url, "https://xbp.env");
962 assert_eq!(runtime.key, "xbp-env-key");
963 }
964
965 #[test]
966 fn resolve_runtime_config_falls_back_to_supabase_then_xlx_supabase() {
967 let _guard = ENV_TEST_MUTEX.lock().expect("env lock");
968 clear_env();
969 std::env::set_var("SUPABASE_URL", "https://supabase.env");
970 std::env::set_var("SUPABASE_KEY", "supabase-key");
971 let runtime = resolve_runtime_config(None).expect("runtime");
972 assert_eq!(runtime.url, "https://supabase.env");
973 assert_eq!(runtime.key, "supabase-key");
974
975 clear_env();
976 std::env::set_var("XLX_SUPABASE_URL", "https://xlx-supabase.env");
977 std::env::set_var("XLX_SUPABASE_ANON_KEY", "xlx-key");
978 let runtime = resolve_runtime_config(None).expect("runtime");
979 assert_eq!(runtime.url, "https://xlx-supabase.env");
980 assert_eq!(runtime.key, "xlx-key");
981 }
982
983 #[test]
984 fn resolve_runtime_config_respects_database_disable_switch() {
985 let _guard = ENV_TEST_MUTEX.lock().expect("env lock");
986 clear_env();
987 std::env::set_var("SUPABASE_URL", "https://supabase.env");
988 std::env::set_var("SUPABASE_KEY", "supabase-key");
989
990 let mut config = base_config();
991 config.database = Some(DatabaseConfig {
992 enabled: Some(false),
993 backend: None,
994 url_env: None,
995 key_env: None,
996 schema: None,
997 });
998
999 assert!(resolve_runtime_config(Some(&config)).is_none());
1000 }
1001
1002 #[test]
1003 fn sql_literal_escapes_quotes_and_json_payloads() {
1004 assert_eq!(sql_literal(&json!("O'Hara")), "'O''Hara'");
1005 assert_eq!(sql_literal(&json!(true)), "true");
1006 assert_eq!(sql_literal(&json!(12.5)), "12.5");
1007 assert_eq!(
1008 sql_literal(&json!({"nested":"quote's"})),
1009 "'{\"nested\":\"quote''s\"}'::jsonb"
1010 );
1011 }
1012
1013 #[test]
1014 fn payload_mappers_produce_expected_shapes() {
1015 let mut config = base_config();
1016 config.monitor_url = Some("https://monitor.example".to_string());
1017 config.kafka_topic = Some("xbp.logs".to_string());
1018 config.systemd_service_name = Some("xbp-api".to_string());
1019 config.services = Some(vec![ServiceConfig {
1020 name: "api".to_string(),
1021 target: "rust".to_string(),
1022 branch: "main".to_string(),
1023 port: 8080,
1024 root_directory: Some("services/api".to_string()),
1025 environment: Some(HashMap::from([(
1026 "RUST_LOG".to_string(),
1027 "info".to_string(),
1028 )])),
1029 url: Some("https://api.example.com".to_string()),
1030 healthcheck_path: Some("/health".to_string()),
1031 restart_policy: Some("always".to_string()),
1032 restart_policy_max_failure_count: Some(5),
1033 start_wrapper: Some("pm2".to_string()),
1034 commands: None,
1035 force_run_from_root: Some(false),
1036 version_targets: None,
1037 systemd_service_name: Some("xbp-api".to_string()),
1038 systemd: None,
1039 }]);
1040
1041 let project_meta = build_project_metadata(&config);
1042 assert_eq!(project_meta["services_count"], 1);
1043 assert_eq!(project_meta["kafka_topic"], "xbp.logs");
1044
1045 let service_meta =
1046 build_service_metadata(config.services.as_ref().unwrap().first().unwrap());
1047 assert_eq!(service_meta["force_run_from_root"], false);
1048 assert_eq!(service_meta["restart_policy_max_failure_count"], 5);
1049 }
1050
1051 #[tokio::test]
1052 async fn fail_open_wrapper_returns_none_on_error_and_value_on_success() {
1053 let success = with_fail_open("test-success", async { Ok::<_, String>(42) }).await;
1054 assert_eq!(success, Some(42));
1055
1056 let failed = with_fail_open::<i32, _>("test-failure", async {
1057 Err::<i32, _>("forced failure".to_string())
1058 })
1059 .await;
1060 assert_eq!(failed, None);
1061 }
1062
1063 #[test]
1064 fn cron_restart_parser_handles_split_and_equals_forms() {
1065 let args = vec![
1066 "npm".to_string(),
1067 "start".to_string(),
1068 "--cron-restart".to_string(),
1069 "0 */6 * * *".to_string(),
1070 ];
1071 assert_eq!(
1072 extract_cron_restart_expression(&args),
1073 Some("0 */6 * * *".to_string())
1074 );
1075
1076 let args = vec![
1077 "npm".to_string(),
1078 "start".to_string(),
1079 "--cron-restart=*/5 * * * *".to_string(),
1080 ];
1081 assert_eq!(
1082 extract_cron_restart_expression(&args),
1083 Some("*/5 * * * *".to_string())
1084 );
1085 }
1086}