1use athena_rs::client::backend::QueryResult;
2use athena_rs::AthenaClient;
3use chrono::{DateTime, Local, Utc};
4use once_cell::sync::Lazy;
5use serde_json::{json, Value};
6use std::collections::HashSet;
7use std::env;
8use std::future::Future;
9use std::path::{Path, PathBuf};
10use tokio::sync::Mutex;
11use tracing::warn;
12
13use crate::logging::{LogEntry, LogLevel};
14use crate::strategies::{DatabaseConfig, ServiceConfig, XbpConfig};
15use crate::utils::{find_xbp_config_upwards, parse_config_with_auto_heal};
16
17const DB_NOT_CONFIGURED: &str = "athena database is not configured";
18const DEFAULT_BACKEND: &str = "supabase";
19const DEFAULT_SCHEMA: &str = "public";
20const BOOTSTRAP_SQL: &str = include_str!("../../sql/schema.sql");
21
22static BOOTSTRAPPED_BACKENDS: Lazy<Mutex<HashSet<String>>> =
23 Lazy::new(|| Mutex::new(HashSet::new()));
24
/// Fully resolved connection settings for the Athena client.
///
/// NOTE(review): `Debug` is derived, so formatting this value prints `key`
/// verbatim; avoid logging the struct as a whole.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AthenaRuntimeConfig {
    /// Backend name handed to the Athena client constructor.
    pub backend: String,
    /// Connection URL read from the environment.
    pub url: String,
    /// Access key read from the environment.
    pub key: String,
    /// Sanitized schema name used to qualify table names.
    pub schema: String,
}
32
33#[derive(Debug, Clone)]
34struct ProjectContext {
35 project_root: PathBuf,
36 config: XbpConfig,
37 config_kind: String,
38}
39
40pub async fn persist_project_snapshot(
41 project_root: &Path,
42 config: &XbpConfig,
43 config_kind: Option<&str>,
44) {
45 let _ = with_fail_open(
46 "persist project snapshot",
47 persist_project_snapshot_inner(project_root, config, config_kind),
48 )
49 .await;
50}
51
52pub async fn persist_log_entry(entry: &LogEntry) {
53 let _ = with_fail_open("persist xbp log entry", persist_log_entry_inner(entry)).await;
54}
55
56pub async fn persist_nginx_config_snapshot(
57 domain: &str,
58 config_path: &Path,
59 content: &str,
60 upstream_ports: &[u16],
61 listen_ports: &[u16],
62 source: &str,
63) {
64 let _ = with_fail_open(
65 "persist nginx config snapshot",
66 persist_nginx_config_snapshot_inner(
67 domain,
68 config_path,
69 content,
70 upstream_ports,
71 listen_ports,
72 source,
73 ),
74 )
75 .await;
76}
77
78pub async fn persist_nginx_log(
79 domain: Option<&str>,
80 action: &str,
81 success: bool,
82 message: &str,
83 details: Option<&str>,
84 metadata: Value,
85) {
86 let _ = with_fail_open(
87 "persist nginx log",
88 persist_nginx_log_inner(domain, action, success, message, details, metadata),
89 )
90 .await;
91}
92
93pub async fn persist_nginx_edit_audit_log(
94 domain: Option<&str>,
95 config_path: Option<&Path>,
96 actor: Option<&str>,
97 action: &str,
98 old_content: Option<&str>,
99 new_content: Option<&str>,
100 metadata: Value,
101) {
102 let _ = with_fail_open(
103 "persist nginx edit audit log",
104 persist_nginx_edit_audit_log_inner(
105 domain,
106 config_path,
107 actor,
108 action,
109 old_content,
110 new_content,
111 metadata,
112 ),
113 )
114 .await;
115}
116
117pub async fn persist_docker_container_snapshot(
118 container_id: &str,
119 container_name: &str,
120 status: Option<&str>,
121 ports: Option<&str>,
122 metadata: Value,
123) {
124 let _ = with_fail_open(
125 "persist docker container snapshot",
126 persist_docker_container_snapshot_inner(
127 container_id,
128 container_name,
129 status,
130 ports,
131 metadata,
132 ),
133 )
134 .await;
135}
136
137pub async fn persist_docker_log(
138 container_id: Option<&str>,
139 command: Option<&str>,
140 stream: &str,
141 message: &str,
142 metadata: Value,
143) {
144 let _ = with_fail_open(
145 "persist docker log",
146 persist_docker_log_inner(container_id, command, stream, message, metadata),
147 )
148 .await;
149}
150
151pub async fn persist_schedule(
152 schedule_type: &str,
153 target_kind: &str,
154 target_ref: Option<&str>,
155 expression: &str,
156 enabled: bool,
157 metadata: Value,
158) {
159 let _ = with_fail_open(
160 "persist schedule",
161 persist_schedule_inner(
162 schedule_type,
163 target_kind,
164 target_ref,
165 expression,
166 enabled,
167 metadata,
168 ),
169 )
170 .await;
171}
172
/// Returns the first non-blank value supplied for `--cron-restart`.
///
/// Both `--cron-restart <expr>` and `--cron-restart=<expr>` are recognised.
/// The value is trimmed; occurrences whose value is missing or blank are
/// skipped and the scan continues with the following arguments.
pub fn extract_cron_restart_expression(args: &[String]) -> Option<String> {
    args.iter().enumerate().find_map(|(index, arg)| {
        // `?` inside the closure only skips this argument, not the whole scan.
        let raw = if arg == "--cron-restart" {
            args.get(index + 1)?.as_str()
        } else {
            arg.strip_prefix("--cron-restart=")?
        };
        let value = raw.trim();
        if value.is_empty() {
            None
        } else {
            Some(value.to_string())
        }
    })
}
191
192pub fn resolve_runtime_config(config: Option<&XbpConfig>) -> Option<AthenaRuntimeConfig> {
193 let database = config.and_then(|cfg| cfg.database.as_ref());
194 let enabled = database.and_then(|db| db.enabled).unwrap_or(true);
195 if !enabled {
196 return None;
197 }
198
199 let backend = value_or_default(
200 database.and_then(|db| db.backend.as_deref()),
201 env::var("XBP_ATHENA_BACKEND").ok().as_deref(),
202 DEFAULT_BACKEND,
203 );
204
205 let (url, key) = resolve_connection_pair(database)?;
206 let schema = sanitize_identifier(
207 value_or_default(
208 database.and_then(|db| db.schema.as_deref()),
209 None,
210 DEFAULT_SCHEMA,
211 )
212 .as_str(),
213 )
214 .unwrap_or_else(|| DEFAULT_SCHEMA.to_string());
215
216 Some(AthenaRuntimeConfig {
217 backend,
218 url,
219 key,
220 schema,
221 })
222}
223
224async fn persist_project_snapshot_inner(
225 project_root: &Path,
226 config: &XbpConfig,
227 config_kind: Option<&str>,
228) -> Result<(), String> {
229 let (client, runtime) = initialize_client(Some(config)).await?;
230 let _ =
231 upsert_project_snapshot_with_client(&client, &runtime, project_root, config, config_kind)
232 .await?;
233 Ok(())
234}
235async fn persist_log_entry_inner(entry: &LogEntry) -> Result<(), String> {
236 let context = load_current_project_context();
237 let config_ref = context.as_ref().map(|ctx| &ctx.config);
238 let (client, runtime) = initialize_client(config_ref).await?;
239
240 let project_id = if let Some(ctx) = context.as_ref() {
241 Some(
242 upsert_project_snapshot_with_client(
243 &client,
244 &runtime,
245 &ctx.project_root,
246 &ctx.config,
247 Some(ctx.config_kind.as_str()),
248 )
249 .await?,
250 )
251 } else {
252 None
253 };
254
255 let log_table = qualified_table(&runtime.schema, "xbp_logs");
256 let timestamp = to_rfc3339(entry.timestamp);
257 let metadata = json!({
258 "project_name": context.as_ref().map(|ctx| ctx.config.project_name.clone()),
259 "project_path": context.as_ref().map(|ctx| ctx.project_root.display().to_string()),
260 });
261
262 let global_sql = format!(
263 "INSERT INTO {table} (log_level, command, message, details, duration_ms, occurred_at, metadata, updated_at) \
264 VALUES ({log_level}, {command}, {message}, {details}, {duration}, {occurred_at}::timestamptz, {metadata}, now()) \
265 RETURNING log_id",
266 table = log_table,
267 log_level = sql_literal(&Value::String(log_level_label(&entry.level).to_string())),
268 command = sql_literal(&Value::String(entry.command.clone())),
269 message = sql_literal(&Value::String(entry.message.clone())),
270 details = optional_text_literal(entry.details.as_deref()),
271 duration = optional_u64_literal(entry.duration_ms),
272 occurred_at = sql_literal(&Value::String(timestamp)),
273 metadata = sql_literal(&metadata),
274 );
275
276 let global_result = execute_sql(&client, &global_sql).await?;
277 let global_log_id = query_first_column_as_string(&global_result, "log_id");
278
279 if let (Some(project_id), Some(global_log_id)) =
280 (project_id.as_deref(), global_log_id.as_deref())
281 {
282 let project_table = qualified_table(&runtime.schema, "xbp_project_logs");
283 let project_sql = format!(
284 "INSERT INTO {table} (project_id, global_log_id, log_level, command, message, details, duration_ms, occurred_at, metadata, updated_at) \
285 VALUES ({project_id}::uuid, {global_log_id}::uuid, {log_level}, {command}, {message}, {details}, {duration}, {occurred_at}::timestamptz, {metadata}, now())",
286 table = project_table,
287 project_id = sql_literal(&Value::String(project_id.to_string())),
288 global_log_id = sql_literal(&Value::String(global_log_id.to_string())),
289 log_level = sql_literal(&Value::String(log_level_label(&entry.level).to_string())),
290 command = sql_literal(&Value::String(entry.command.clone())),
291 message = sql_literal(&Value::String(entry.message.clone())),
292 details = optional_text_literal(entry.details.as_deref()),
293 duration = optional_u64_literal(entry.duration_ms),
294 occurred_at = sql_literal(&Value::String(to_rfc3339(entry.timestamp))),
295 metadata = sql_literal(&metadata),
296 );
297 let _ = execute_sql(&client, &project_sql).await?;
298 }
299
300 Ok(())
301}
302
303async fn persist_nginx_config_snapshot_inner(
304 domain: &str,
305 config_path: &Path,
306 content: &str,
307 upstream_ports: &[u16],
308 listen_ports: &[u16],
309 source: &str,
310) -> Result<(), String> {
311 let (client, runtime, project_id) = initialize_client_with_project_context().await?;
312 let table = qualified_table(&runtime.schema, "xbp_nginx_configs");
313 let metadata = json!({ "source": source });
314 let upstream = Value::Array(
315 upstream_ports
316 .iter()
317 .map(|port| Value::from(*port as u64))
318 .collect(),
319 );
320 let listen = Value::Array(
321 listen_ports
322 .iter()
323 .map(|port| Value::from(*port as u64))
324 .collect(),
325 );
326
327 let sql = format!(
328 "INSERT INTO {table} (project_id, domain, config_path, content, upstream_ports, listen_ports, metadata, updated_at) \
329 VALUES ({project_id}, {domain}, {config_path}, {content}, {upstream_ports}, {listen_ports}, {metadata}, now()) \
330 ON CONFLICT (domain, config_path) DO UPDATE SET \
331 project_id = EXCLUDED.project_id, \
332 content = EXCLUDED.content, \
333 upstream_ports = EXCLUDED.upstream_ports, \
334 listen_ports = EXCLUDED.listen_ports, \
335 metadata = EXCLUDED.metadata, \
336 updated_at = now()",
337 table = table,
338 project_id = optional_uuid_literal(project_id.as_deref()),
339 domain = sql_literal(&Value::String(domain.to_string())),
340 config_path = sql_literal(&Value::String(config_path.display().to_string())),
341 content = sql_literal(&Value::String(content.to_string())),
342 upstream_ports = sql_literal(&upstream),
343 listen_ports = sql_literal(&listen),
344 metadata = sql_literal(&metadata),
345 );
346
347 let _ = execute_sql(&client, &sql).await?;
348 Ok(())
349}
350
351async fn persist_nginx_log_inner(
352 domain: Option<&str>,
353 action: &str,
354 success: bool,
355 message: &str,
356 details: Option<&str>,
357 metadata: Value,
358) -> Result<(), String> {
359 let (client, runtime, project_id) = initialize_client_with_project_context().await?;
360 let table = qualified_table(&runtime.schema, "xbp_nginx_logs");
361
362 let sql = format!(
363 "INSERT INTO {table} (project_id, domain, action, success, message, details, occurred_at, metadata, updated_at) \
364 VALUES ({project_id}, {domain}, {action}, {success}, {message}, {details}, now(), {metadata}, now())",
365 table = table,
366 project_id = optional_uuid_literal(project_id.as_deref()),
367 domain = optional_text_literal(domain),
368 action = sql_literal(&Value::String(action.to_string())),
369 success = if success { "true" } else { "false" },
370 message = sql_literal(&Value::String(message.to_string())),
371 details = optional_text_literal(details),
372 metadata = sql_literal(&metadata),
373 );
374
375 let _ = execute_sql(&client, &sql).await?;
376 Ok(())
377}
378
379async fn persist_nginx_edit_audit_log_inner(
380 domain: Option<&str>,
381 config_path: Option<&Path>,
382 actor: Option<&str>,
383 action: &str,
384 old_content: Option<&str>,
385 new_content: Option<&str>,
386 metadata: Value,
387) -> Result<(), String> {
388 let (client, runtime, project_id) = initialize_client_with_project_context().await?;
389 let table = qualified_table(&runtime.schema, "xbp_nginx_edit_audit_logs");
390
391 let sql = format!(
392 "INSERT INTO {table} (project_id, domain, config_path, actor, action, old_content, new_content, occurred_at, metadata, updated_at) \
393 VALUES ({project_id}, {domain}, {config_path}, {actor}, {action}, {old_content}, {new_content}, now(), {metadata}, now())",
394 table = table,
395 project_id = optional_uuid_literal(project_id.as_deref()),
396 domain = optional_text_literal(domain),
397 config_path = config_path
398 .map(|path| sql_literal(&Value::String(path.display().to_string())))
399 .unwrap_or_else(|| "NULL".to_string()),
400 actor = optional_text_literal(actor),
401 action = sql_literal(&Value::String(action.to_string())),
402 old_content = optional_text_literal(old_content),
403 new_content = optional_text_literal(new_content),
404 metadata = sql_literal(&metadata),
405 );
406
407 let _ = execute_sql(&client, &sql).await?;
408 Ok(())
409}
410
411async fn persist_docker_container_snapshot_inner(
412 container_id: &str,
413 container_name: &str,
414 status: Option<&str>,
415 ports: Option<&str>,
416 metadata: Value,
417) -> Result<(), String> {
418 let (client, runtime, project_id) = initialize_client_with_project_context().await?;
419 let table = qualified_table(&runtime.schema, "xbp_docker_containers");
420 let sql = format!(
421 "INSERT INTO {table} (project_id, container_id, container_name, status, ports, inspected_at, metadata, updated_at) \
422 VALUES ({project_id}, {container_id}, {container_name}, {status}, {ports}, now(), {metadata}, now()) \
423 ON CONFLICT (container_id) DO UPDATE SET \
424 project_id = EXCLUDED.project_id, \
425 container_name = EXCLUDED.container_name, \
426 status = EXCLUDED.status, \
427 ports = EXCLUDED.ports, \
428 inspected_at = EXCLUDED.inspected_at, \
429 metadata = EXCLUDED.metadata, \
430 updated_at = now()",
431 table = table,
432 project_id = optional_uuid_literal(project_id.as_deref()),
433 container_id = sql_literal(&Value::String(container_id.to_string())),
434 container_name = sql_literal(&Value::String(container_name.to_string())),
435 status = optional_text_literal(status),
436 ports = optional_text_literal(ports),
437 metadata = sql_literal(&metadata),
438 );
439
440 let _ = execute_sql(&client, &sql).await?;
441 Ok(())
442}
443async fn persist_docker_log_inner(
444 container_id: Option<&str>,
445 command: Option<&str>,
446 stream: &str,
447 message: &str,
448 metadata: Value,
449) -> Result<(), String> {
450 let (client, runtime, project_id) = initialize_client_with_project_context().await?;
451 let table = qualified_table(&runtime.schema, "xbp_docker_logs");
452 let sql = format!(
453 "INSERT INTO {table} (project_id, container_id, command, stream, message, occurred_at, metadata, updated_at) \
454 VALUES ({project_id}, {container_id}, {command}, {stream}, {message}, now(), {metadata}, now())",
455 table = table,
456 project_id = optional_uuid_literal(project_id.as_deref()),
457 container_id = optional_text_literal(container_id),
458 command = optional_text_literal(command),
459 stream = sql_literal(&Value::String(stream.to_string())),
460 message = sql_literal(&Value::String(message.to_string())),
461 metadata = sql_literal(&metadata),
462 );
463
464 let _ = execute_sql(&client, &sql).await?;
465 Ok(())
466}
467
468async fn persist_schedule_inner(
469 schedule_type: &str,
470 target_kind: &str,
471 target_ref: Option<&str>,
472 expression: &str,
473 enabled: bool,
474 metadata: Value,
475) -> Result<(), String> {
476 let (client, runtime, project_id) = initialize_client_with_project_context().await?;
477 let table = qualified_table(&runtime.schema, "xbp_schedules");
478 let normalized_target_ref = target_ref.unwrap_or("");
479 let sql = format!(
480 "INSERT INTO {table} (project_id, schedule_type, target_kind, target_ref, expression, timezone, enabled, metadata, occurred_at, updated_at) \
481 VALUES ({project_id}, {schedule_type}, {target_kind}, {target_ref}, {expression}, {timezone}, {enabled}, {metadata}, now(), now()) \
482 ON CONFLICT (schedule_type, target_kind, target_ref, expression) DO UPDATE SET \
483 project_id = EXCLUDED.project_id, \
484 timezone = EXCLUDED.timezone, \
485 enabled = EXCLUDED.enabled, \
486 metadata = EXCLUDED.metadata, \
487 occurred_at = EXCLUDED.occurred_at, \
488 updated_at = now()",
489 table = table,
490 project_id = optional_uuid_literal(project_id.as_deref()),
491 schedule_type = sql_literal(&Value::String(schedule_type.to_string())),
492 target_kind = sql_literal(&Value::String(target_kind.to_string())),
493 target_ref = sql_literal(&Value::String(normalized_target_ref.to_string())),
494 expression = sql_literal(&Value::String(expression.to_string())),
495 timezone = sql_literal(&Value::String("UTC".to_string())),
496 enabled = if enabled { "true" } else { "false" },
497 metadata = sql_literal(&metadata),
498 );
499
500 let _ = execute_sql(&client, &sql).await?;
501 Ok(())
502}
503
504async fn initialize_client_with_project_context(
505) -> Result<(AthenaClient, AthenaRuntimeConfig, Option<String>), String> {
506 let context = load_current_project_context();
507 let config_ref = context.as_ref().map(|ctx| &ctx.config);
508 let (client, runtime) = initialize_client(config_ref).await?;
509 let project_id = if let Some(ctx) = context {
510 Some(
511 upsert_project_snapshot_with_client(
512 &client,
513 &runtime,
514 &ctx.project_root,
515 &ctx.config,
516 Some(ctx.config_kind.as_str()),
517 )
518 .await?,
519 )
520 } else {
521 None
522 };
523 Ok((client, runtime, project_id))
524}
525
526async fn initialize_client(
527 config: Option<&XbpConfig>,
528) -> Result<(AthenaClient, AthenaRuntimeConfig), String> {
529 let runtime = resolve_runtime_config(config).ok_or_else(|| DB_NOT_CONFIGURED.to_string())?;
530
531 let client = AthenaClient::new_with_backend_name(
532 runtime.url.clone(),
533 runtime.key.clone(),
534 &runtime.backend,
535 )
536 .await
537 .map_err(|err| format!("failed to initialize Athena client: {err}"))?;
538
539 ensure_schema_bootstrapped(&client, &runtime).await?;
540 Ok((client, runtime))
541}
542
543async fn ensure_schema_bootstrapped(
544 client: &AthenaClient,
545 runtime: &AthenaRuntimeConfig,
546) -> Result<(), String> {
547 let key = format!("{}|{}|{}", runtime.backend, runtime.url, runtime.schema);
548 let mut guard = BOOTSTRAPPED_BACKENDS.lock().await;
549 if guard.contains(&key) {
550 return Ok(());
551 }
552
553 let schema = sanitize_identifier(&runtime.schema).unwrap_or_else(|| DEFAULT_SCHEMA.to_string());
554 let create_schema_sql = format!("CREATE SCHEMA IF NOT EXISTS {}", schema);
555 let _ = execute_sql(client, &create_schema_sql).await?;
556
557 for statement in BOOTSTRAP_SQL.split(';') {
558 let trimmed = statement.trim();
559 if trimmed.is_empty() {
560 continue;
561 }
562 let sql = if schema == DEFAULT_SCHEMA {
563 format!("{};", trimmed)
564 } else {
565 format!("SET search_path TO {}; {};", schema, trimmed)
566 };
567 let _ = execute_sql(client, &sql).await?;
568 }
569
570 guard.insert(key);
571 Ok(())
572}
573
574async fn upsert_project_snapshot_with_client(
575 client: &AthenaClient,
576 runtime: &AthenaRuntimeConfig,
577 project_root: &Path,
578 config: &XbpConfig,
579 config_kind: Option<&str>,
580) -> Result<String, String> {
581 let project_table = qualified_table(&runtime.schema, "xbp_projects");
582 let metadata = build_project_metadata(config);
583 let project_sql = format!(
584 "INSERT INTO {table} (project_name, project_path, version, build_dir, port, app_type, branch, target, config_kind, metadata, updated_at) \
585 VALUES ({project_name}, {project_path}, {version}, {build_dir}, {port}, {app_type}, {branch}, {target}, {config_kind}, {metadata}, now()) \
586 ON CONFLICT (project_path) DO UPDATE SET \
587 project_name = EXCLUDED.project_name, \
588 version = EXCLUDED.version, \
589 build_dir = EXCLUDED.build_dir, \
590 port = EXCLUDED.port, \
591 app_type = EXCLUDED.app_type, \
592 branch = EXCLUDED.branch, \
593 target = EXCLUDED.target, \
594 config_kind = EXCLUDED.config_kind, \
595 metadata = EXCLUDED.metadata, \
596 updated_at = now() \
597 RETURNING project_id",
598 table = project_table,
599 project_name = sql_literal(&Value::String(config.project_name.clone())),
600 project_path = sql_literal(&Value::String(project_root.display().to_string())),
601 version = sql_literal(&Value::String(config.version.clone())),
602 build_dir = sql_literal(&Value::String(config.build_dir.clone())),
603 port = config.port,
604 app_type = optional_text_literal(config.app_type.as_deref()),
605 branch = optional_text_literal(config.branch.as_deref()),
606 target = optional_text_literal(config.target.as_deref()),
607 config_kind = optional_text_literal(config_kind),
608 metadata = sql_literal(&metadata),
609 );
610
611 let project_result = execute_sql(client, &project_sql).await?;
612 let project_id = query_first_column_as_string(&project_result, "project_id")
613 .ok_or_else(|| "failed to resolve project_id from upsert".to_string())?;
614
615 if let Some(services) = config.services.as_ref() {
616 let services_table = qualified_table(&runtime.schema, "xbp_project_services");
617 for service in services {
618 upsert_project_service(client, &services_table, &project_id, service).await?;
619 }
620 }
621
622 Ok(project_id)
623}
624async fn upsert_project_service(
625 client: &AthenaClient,
626 table: &str,
627 project_id: &str,
628 service: &ServiceConfig,
629) -> Result<(), String> {
630 let commands = serde_json::to_value(&service.commands).unwrap_or_else(|_| json!({}));
631 let environment = serde_json::to_value(&service.environment).unwrap_or_else(|_| json!({}));
632 let metadata = build_service_metadata(service);
633 let sql = format!(
634 "INSERT INTO {table} (project_id, service_name, target, branch, port, root_directory, url, healthcheck_path, restart_policy, start_wrapper, systemd_service_name, commands, environment, metadata, updated_at) \
635 VALUES ({project_id}::uuid, {service_name}, {target}, {branch}, {port}, {root_directory}, {url}, {healthcheck_path}, {restart_policy}, {start_wrapper}, {systemd_service_name}, {commands}, {environment}, {metadata}, now()) \
636 ON CONFLICT (project_id, service_name) DO UPDATE SET \
637 target = EXCLUDED.target, \
638 branch = EXCLUDED.branch, \
639 port = EXCLUDED.port, \
640 root_directory = EXCLUDED.root_directory, \
641 url = EXCLUDED.url, \
642 healthcheck_path = EXCLUDED.healthcheck_path, \
643 restart_policy = EXCLUDED.restart_policy, \
644 start_wrapper = EXCLUDED.start_wrapper, \
645 systemd_service_name = EXCLUDED.systemd_service_name, \
646 commands = EXCLUDED.commands, \
647 environment = EXCLUDED.environment, \
648 metadata = EXCLUDED.metadata, \
649 updated_at = now()",
650 table = table,
651 project_id = sql_literal(&Value::String(project_id.to_string())),
652 service_name = sql_literal(&Value::String(service.name.clone())),
653 target = sql_literal(&Value::String(service.target.clone())),
654 branch = sql_literal(&Value::String(service.branch.clone())),
655 port = service.port,
656 root_directory = optional_text_literal(service.root_directory.as_deref()),
657 url = optional_text_literal(service.url.as_deref()),
658 healthcheck_path = optional_text_literal(service.healthcheck_path.as_deref()),
659 restart_policy = optional_text_literal(service.restart_policy.as_deref()),
660 start_wrapper = optional_text_literal(service.start_wrapper.as_deref()),
661 systemd_service_name = optional_text_literal(service.systemd_service_name.as_deref()),
662 commands = sql_literal(&commands),
663 environment = sql_literal(&environment),
664 metadata = sql_literal(&metadata),
665 );
666
667 let _ = execute_sql(client, &sql).await?;
668 Ok(())
669}
670
671fn build_project_metadata(config: &XbpConfig) -> Value {
672 json!({
673 "services_count": config.services.as_ref().map(|services| services.len()).unwrap_or(0),
674 "monitor_url": config.monitor_url,
675 "kafka_topic": config.kafka_topic,
676 "systemd_service_name": config.systemd_service_name,
677 })
678}
679
680fn build_service_metadata(service: &ServiceConfig) -> Value {
681 json!({
682 "force_run_from_root": service.force_run_from_root,
683 "restart_policy_max_failure_count": service.restart_policy_max_failure_count,
684 })
685}
686
687fn resolve_connection_pair(database: Option<&DatabaseConfig>) -> Option<(String, String)> {
688 let block_url = database
689 .and_then(|db| db.url_env.as_deref())
690 .and_then(read_env_nonempty);
691 let block_key = database
692 .and_then(|db| db.key_env.as_deref())
693 .and_then(read_env_nonempty);
694
695 if let (Some(url), Some(key)) = (block_url, block_key) {
696 return Some((url, key));
697 }
698
699 let xbp_url = read_env_nonempty("XBP_ATHENA_URL");
700 let xbp_key = read_env_nonempty("XBP_ATHENA_KEY");
701 if let (Some(url), Some(key)) = (xbp_url, xbp_key) {
702 return Some((url, key));
703 }
704
705 let supabase_url = read_env_nonempty("SUPABASE_URL");
706 let supabase_key = read_env_nonempty("SUPABASE_KEY");
707 if let (Some(url), Some(key)) = (supabase_url, supabase_key) {
708 return Some((url, key));
709 }
710
711 let xlx_supabase_url = read_env_nonempty("XLX_SUPABASE_URL");
712 let xlx_supabase_key = read_env_nonempty("XLX_SUPABASE_ANON_KEY");
713 if let (Some(url), Some(key)) = (xlx_supabase_url, xlx_supabase_key) {
714 return Some((url, key));
715 }
716
717 None
718}
719
/// Reads environment variable `name`, returning its trimmed value.
///
/// Returns `None` when the variable is unset, not valid Unicode, or blank.
/// The value is returned trimmed so that stray whitespace (for example a
/// trailing newline from an env file) never ends up in a URL or key.
fn read_env_nonempty(name: &str) -> Option<String> {
    let raw = env::var(name).ok()?;
    let value = raw.trim();
    if value.is_empty() {
        None
    } else {
        Some(value.to_string())
    }
}
726
/// Returns the first of `primary`, `secondary` that is non-blank after
/// trimming (as a trimmed string), or `default` unchanged when neither is.
fn value_or_default(primary: Option<&str>, secondary: Option<&str>, default: &str) -> String {
    primary
        .into_iter()
        .chain(secondary)
        .map(str::trim)
        .find(|candidate| !candidate.is_empty())
        .unwrap_or(default)
        .to_string()
}
742
743async fn execute_sql(client: &AthenaClient, sql: &str) -> Result<QueryResult, String> {
744 client
745 .execute_sql(sql)
746 .await
747 .map_err(|err| format!("athena sql execution failed: {err}"))
748}
749
750fn query_first_column_as_string(result: &QueryResult, key: &str) -> Option<String> {
751 result.rows.first().and_then(|row| {
752 row.get(key).and_then(|value| {
753 value
754 .as_str()
755 .map(|value| value.to_string())
756 .or_else(|| value.as_i64().map(|value| value.to_string()))
757 })
758 })
759}
760
761fn load_current_project_context() -> Option<ProjectContext> {
762 let current_dir = env::current_dir().ok()?;
763 let found = find_xbp_config_upwards(¤t_dir)?;
764 let content = std::fs::read_to_string(&found.config_path).ok()?;
765 let (config, _) = parse_config_with_auto_heal::<XbpConfig>(&content, found.kind).ok()?;
766
767 Some(ProjectContext {
768 project_root: found.project_root,
769 config,
770 config_kind: found.kind.to_string(),
771 })
772}
773
774fn log_level_label(level: &LogLevel) -> &'static str {
775 match level {
776 LogLevel::Info => "INFO",
777 LogLevel::Warning => "WARN",
778 LogLevel::Error => "ERROR",
779 LogLevel::Debug => "DEBUG",
780 LogLevel::Success => "SUCCESS",
781 }
782}
783
784fn to_rfc3339(ts: DateTime<Local>) -> String {
785 ts.with_timezone(&Utc).to_rfc3339()
786}
787
/// Validates `value` as a plain SQL identifier and returns it trimmed.
///
/// Accepted identifiers start with an ASCII letter or `_` and continue with
/// ASCII letters, digits or `_`. Anything else (including blank input) yields
/// `None`, so the result is safe to splice into SQL unquoted.
fn sanitize_identifier(value: &str) -> Option<String> {
    let candidate = value.trim();
    let mut chars = candidate.chars();

    // `?` covers the blank case: there is no first character.
    let first = chars.next()?;
    let starts_ok = first.is_ascii_alphabetic() || first == '_';
    let is_valid = starts_ok && chars.all(|ch| ch.is_ascii_alphanumeric() || ch == '_');

    if is_valid {
        Some(candidate.to_string())
    } else {
        None
    }
}
804
805fn qualified_table(schema: &str, table: &str) -> String {
806 let schema = sanitize_identifier(schema).unwrap_or_else(|| DEFAULT_SCHEMA.to_string());
807 format!("{}.{}", schema, table)
808}
809
810pub fn sql_literal(value: &Value) -> String {
811 match value {
812 Value::Null => "NULL".to_string(),
813 Value::Bool(boolean) => boolean.to_string(),
814 Value::Number(number) => number.to_string(),
815 Value::String(text) => format!("'{}'", text.replace('\'', "''")),
816 Value::Array(_) | Value::Object(_) => {
817 format!("'{}'::jsonb", value.to_string().replace('\'', "''"))
818 }
819 }
820}
821
822fn optional_text_literal(value: Option<&str>) -> String {
823 match value {
824 Some(value) if !value.trim().is_empty() => sql_literal(&Value::String(value.to_string())),
825 _ => "NULL".to_string(),
826 }
827}
828
/// Renders an optional integer as a decimal SQL literal, or `NULL` when absent.
fn optional_u64_literal(value: Option<u64>) -> String {
    value.map_or_else(|| "NULL".to_string(), |number| number.to_string())
}
835
836fn optional_uuid_literal(value: Option<&str>) -> String {
837 match value {
838 Some(value) if !value.trim().is_empty() => {
839 format!("{}::uuid", sql_literal(&Value::String(value.to_string())))
840 }
841 _ => "NULL".to_string(),
842 }
843}
844
845pub async fn with_fail_open<T, F>(operation: &str, fut: F) -> Option<T>
846where
847 F: Future<Output = Result<T, String>>,
848{
849 match fut.await {
850 Ok(result) => Some(result),
851 Err(error) if error == DB_NOT_CONFIGURED => None,
852 Err(error) => {
853 warn!("{} (fail-open): {}", operation, error);
854 None
855 }
856 }
857}
858#[cfg(test)]
859mod tests {
860 use super::{
861 build_project_metadata, build_service_metadata, extract_cron_restart_expression,
862 resolve_runtime_config, sql_literal, with_fail_open,
863 };
864 use crate::strategies::{DatabaseConfig, ServiceConfig, XbpConfig};
865 use serde_json::json;
866 use std::collections::HashMap;
867 use std::sync::Mutex;
868
869 static ENV_TEST_MUTEX: Mutex<()> = Mutex::new(());
870
871 fn base_config() -> XbpConfig {
872 XbpConfig {
873 project_name: "demo".to_string(),
874 version: "0.1.0".to_string(),
875 port: 3000,
876 build_dir: "/tmp/demo".to_string(),
877 app_type: None,
878 build_command: None,
879 start_command: None,
880 install_command: None,
881 environment: None,
882 services: None,
883 systemd_service_name: None,
884 systemd: None,
885 kafka_brokers: None,
886 kafka_topic: None,
887 kafka_public_url: None,
888 log_files: None,
889 monitor_url: None,
890 monitor_method: None,
891 monitor_expected_code: None,
892 monitor_interval: None,
893 target: None,
894 branch: None,
895 crate_name: None,
896 npm_script: None,
897 port_storybook: None,
898 url: None,
899 url_storybook: None,
900 database: None,
901 linear: None,
902 github: None,
903 publish: None,
904 }
905 }
906
907 fn clear_env() {
908 for key in [
909 "BLOCK_URL",
910 "BLOCK_KEY",
911 "XBP_ATHENA_BACKEND",
912 "XBP_ATHENA_URL",
913 "XBP_ATHENA_KEY",
914 "SUPABASE_URL",
915 "SUPABASE_KEY",
916 "XLX_SUPABASE_URL",
917 "XLX_SUPABASE_ANON_KEY",
918 ] {
919 std::env::remove_var(key);
920 }
921 }
922
923 #[test]
924 fn resolve_runtime_config_prefers_database_block_env_binding() {
925 let _guard = ENV_TEST_MUTEX.lock().expect("env lock");
926 clear_env();
927 std::env::set_var("BLOCK_URL", "https://block.example");
928 std::env::set_var("BLOCK_KEY", "block-key");
929 std::env::set_var("XBP_ATHENA_URL", "https://xbp.example");
930 std::env::set_var("XBP_ATHENA_KEY", "xbp-key");
931
932 let mut config = base_config();
933 config.database = Some(DatabaseConfig {
934 enabled: Some(true),
935 backend: Some("postgres".to_string()),
936 url_env: Some("BLOCK_URL".to_string()),
937 key_env: Some("BLOCK_KEY".to_string()),
938 schema: Some("custom_schema".to_string()),
939 });
940
941 let runtime = resolve_runtime_config(Some(&config)).expect("runtime");
942 assert_eq!(runtime.backend, "postgres");
943 assert_eq!(runtime.url, "https://block.example");
944 assert_eq!(runtime.key, "block-key");
945 assert_eq!(runtime.schema, "custom_schema");
946 }
947
/// Verifies that, without a project config, the `XBP_ATHENA_URL` and
/// `XBP_ATHENA_KEY` variables are chosen over `SUPABASE_URL`/`SUPABASE_KEY`
/// when both pairs are present.
#[test]
fn resolve_runtime_config_uses_xbp_env_before_supabase_fallback() {
    // Take the guard even if an earlier env test panicked while holding it:
    // `clear_env()` resets the shared state right below, so a poisoned lock
    // should not turn one real failure into a cascade of "env lock" panics.
    let _guard = ENV_TEST_MUTEX
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    clear_env();
    std::env::set_var("XBP_ATHENA_BACKEND", "supabase");
    std::env::set_var("XBP_ATHENA_URL", "https://xbp.env");
    std::env::set_var("XBP_ATHENA_KEY", "xbp-env-key");
    // Lower-priority pair that must not be selected.
    std::env::set_var("SUPABASE_URL", "https://supabase.env");
    std::env::set_var("SUPABASE_KEY", "supabase-key");

    let runtime = resolve_runtime_config(None).expect("runtime");
    assert_eq!(runtime.url, "https://xbp.env");
    assert_eq!(runtime.key, "xbp-env-key");
}
962
/// Verifies the two fallback sources when no project config is given: first
/// with only `SUPABASE_URL`/`SUPABASE_KEY` set, then with only
/// `XLX_SUPABASE_URL`/`XLX_SUPABASE_ANON_KEY` set.
#[test]
fn resolve_runtime_config_falls_back_to_supabase_then_xlx_supabase() {
    // Take the guard even if an earlier env test panicked while holding it:
    // `clear_env()` resets the shared state right below, so a poisoned lock
    // should not turn one real failure into a cascade of "env lock" panics.
    let _guard = ENV_TEST_MUTEX
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner);

    // Phase 1: only the plain Supabase pair is available.
    clear_env();
    std::env::set_var("SUPABASE_URL", "https://supabase.env");
    std::env::set_var("SUPABASE_KEY", "supabase-key");
    let runtime = resolve_runtime_config(None).expect("runtime");
    assert_eq!(runtime.url, "https://supabase.env");
    assert_eq!(runtime.key, "supabase-key");

    // Phase 2: start over with only the XLX-prefixed pair available.
    clear_env();
    std::env::set_var("XLX_SUPABASE_URL", "https://xlx-supabase.env");
    std::env::set_var("XLX_SUPABASE_ANON_KEY", "xlx-key");
    let runtime = resolve_runtime_config(None).expect("runtime");
    assert_eq!(runtime.url, "https://xlx-supabase.env");
    assert_eq!(runtime.key, "xlx-key");
}
980
/// Verifies that a `database` block with `enabled: Some(false)` makes
/// `resolve_runtime_config` return `None` even though usable
/// `SUPABASE_URL`/`SUPABASE_KEY` values are present in the environment.
#[test]
fn resolve_runtime_config_respects_database_disable_switch() {
    // Take the guard even if an earlier env test panicked while holding it:
    // `clear_env()` resets the shared state right below, so a poisoned lock
    // should not turn one real failure into a cascade of "env lock" panics.
    let _guard = ENV_TEST_MUTEX
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    clear_env();
    std::env::set_var("SUPABASE_URL", "https://supabase.env");
    std::env::set_var("SUPABASE_KEY", "supabase-key");

    let mut config = base_config();
    config.database = Some(DatabaseConfig {
        enabled: Some(false),
        backend: None,
        url_env: None,
        key_env: None,
        schema: None,
    });

    assert!(resolve_runtime_config(Some(&config)).is_none());
}
999
/// Checks the text `sql_literal` produces for a string containing a single
/// quote, a boolean, a float, and a JSON object with an embedded quote.
#[test]
fn sql_literal_escapes_quotes_and_json_payloads() {
    // (input value, expected SQL text) pairs, checked in order.
    let cases = [
        (json!("O'Hara"), "'O''Hara'"),
        (json!(true), "true"),
        (json!(12.5), "12.5"),
        (
            json!({"nested":"quote's"}),
            "'{\"nested\":\"quote''s\"}'::jsonb",
        ),
    ];

    for (input, expected) in cases.iter() {
        assert_eq!(sql_literal(input), *expected);
    }
}
1010
/// Builds a config holding one fully populated service and spot-checks the
/// JSON returned by `build_project_metadata` and `build_service_metadata`.
#[test]
fn payload_mappers_produce_expected_shapes() {
    // Service fixture, assembled up front so the config setup stays compact.
    let api_environment = HashMap::from([("RUST_LOG".to_string(), "info".to_string())]);
    let api_service = ServiceConfig {
        name: "api".to_string(),
        target: "rust".to_string(),
        branch: "main".to_string(),
        port: 8080,
        root_directory: Some("services/api".to_string()),
        environment: Some(api_environment),
        url: Some("https://api.example.com".to_string()),
        healthcheck_path: Some("/health".to_string()),
        restart_policy: Some("always".to_string()),
        restart_policy_max_failure_count: Some(5),
        start_wrapper: Some("pm2".to_string()),
        commands: None,
        force_run_from_root: Some(false),
        systemd_service_name: Some("xbp-api".to_string()),
        systemd: None,
    };

    let mut config = base_config();
    config.monitor_url = Some("https://monitor.example".to_string());
    config.kafka_topic = Some("xbp.logs".to_string());
    config.systemd_service_name = Some("xbp-api".to_string());
    config.services = Some(vec![api_service]);

    // Project-level payload.
    let project_meta = build_project_metadata(&config);
    assert_eq!(project_meta["services_count"], 1);
    assert_eq!(project_meta["kafka_topic"], "xbp.logs");

    // Service-level payload for the only configured service.
    let services = config.services.as_deref().unwrap();
    let service_meta = build_service_metadata(&services[0]);
    assert_eq!(service_meta["force_run_from_root"], false);
    assert_eq!(service_meta["restart_policy_max_failure_count"], 5);
}
1047
1048 #[tokio::test]
1049 async fn fail_open_wrapper_returns_none_on_error_and_value_on_success() {
1050 let success = with_fail_open("test-success", async { Ok::<_, String>(42) }).await;
1051 assert_eq!(success, Some(42));
1052
1053 let failed = with_fail_open::<i32, _>("test-failure", async {
1054 Err::<i32, _>("forced failure".to_string())
1055 })
1056 .await;
1057 assert_eq!(failed, None);
1058 }
1059
/// Checks that `extract_cron_restart_expression` finds the cron expression
/// both when it follows `--cron-restart` as a separate argument and when it
/// is attached with `=`.
#[test]
fn cron_restart_parser_handles_split_and_equals_forms() {
    // Turns string literals into the owned argument vector the parser is given.
    let argv = |parts: &[&str]| -> Vec<String> { parts.iter().map(|p| p.to_string()).collect() };

    // Flag and value as two separate arguments.
    let split_form = argv(&["npm", "start", "--cron-restart", "0 */6 * * *"]);
    assert_eq!(
        extract_cron_restart_expression(&split_form).as_deref(),
        Some("0 */6 * * *")
    );

    // Flag and value joined by `=` in a single argument.
    let equals_form = argv(&["npm", "start", "--cron-restart=*/5 * * * *"]);
    assert_eq!(
        extract_cron_restart_expression(&equals_form).as_deref(),
        Some("*/5 * * * *")
    );
}
1083}