Skip to main content

assay_engine/
engine_api.rs

1//! Engine-core HTTP admin API.
2//!
3//! Mounted at `/api/v1/engine/core/*` by the engine binary so the engine
4//! console (`/engine/console`) has structured JSON to render.
5//!
6//! Endpoints (all return JSON):
7//!
8//! - `GET    /api/v1/engine/core/info`              public, no auth
9//! - `GET    /api/v1/engine/core/health`            public, no auth
10//! - `GET    /api/v1/engine/core/active-modules`    public, no auth
11//! - `GET    /api/v1/engine/core/modules`           admin
12//! - `POST   /api/v1/engine/core/modules/{name}/toggle`  admin
13//! - `GET    /api/v1/engine/core/instances`         admin
14//! - `GET    /api/v1/engine/core/audit`             admin
15//! - `GET    /api/v1/engine/core/config`            admin (secrets redacted)
16//!
17//! Admin-gated endpoints reuse the same `Authorization: Bearer ...`
18//! check the auth admin router uses (compared in constant-ish time
19//! against `EngineState.admin_api_keys`). When `admin_api_keys` is
20//! empty every admin endpoint returns 401 — locking the surface
21//! entirely. `info`, `health`, and `active-modules` are always public
22//! so dashboards can render the header bar + cross-nav before an
23//! operator has supplied credentials.
24//!
25//! Backend-agnostic: handlers branch on the `BackendConfig` variant
26//! to call into either `PgEngineSchema` or `SqliteEngineSchema`. SQLite
27//! engines are single-instance so the `instances` endpoint typically
28//! returns one row; the shape stays identical so the UI doesn't care.
29
30use axum::Router;
31use axum::extract::{Path, Query, State};
32use axum::http::{HeaderMap, StatusCode, header};
33use axum::response::{IntoResponse, Json, Response};
34use axum::routing::{get, post};
35use serde::{Deserialize, Serialize};
36use serde_json::{Value, json};
37
38use assay_workflow::WorkflowStore;
39
40use crate::config::{BackendConfig, EngineConfig};
41use crate::state::EngineState;
42
43/// Build the engine-core admin router. Bound to `EngineState<S>` so
44/// handlers can pluck the workflow store, the parsed config, and the
45/// admin keys list off the parent state.
46pub fn router<S>() -> Router<EngineState<S>>
47where
48    S: WorkflowStore + Clone + 'static,
49{
50    Router::new()
51        .route("/api/v1/engine/core/info", get(engine_info::<S>))
52        .route("/api/v1/engine/core/health", get(engine_health::<S>))
53        .route(
54            "/api/v1/engine/core/active-modules",
55            get(active_modules::<S>),
56        )
57        .route("/api/v1/engine/core/modules", get(list_modules::<S>))
58        .route(
59            "/api/v1/engine/core/modules/{name}/toggle",
60            post(toggle_module::<S>),
61        )
62        .route("/api/v1/engine/core/instances", get(list_instances::<S>))
63        .route("/api/v1/engine/core/audit", get(list_audit::<S>))
64        .route("/api/v1/engine/core/config", get(get_config::<S>))
65}
66
67// =====================================================================
68//   /api/v1/engine/core/health
69// =====================================================================
70
71/// Engine-core health probe. Returns the same envelope previously
72/// served by the legacy `/healthz` endpoint (status + version +
73/// instance_id + active modules + leader flag) so existing operator
74/// scripts that scrape the JSON keep working after the URL move.
75async fn engine_health<S: WorkflowStore + Clone + 'static>(
76    State(s): State<EngineState<S>>,
77) -> Json<Value> {
78    Json(json!({
79        "status": "ok",
80        "engine_version": s.engine_version,
81        "instance_id": s.instance_id.to_string(),
82        "modules": &*s.modules,
83        // SQLite is single-instance and PG uses session-scoped
84        // pg_try_advisory_lock; both make leadership a runtime
85        // property. Surface it as `leader = true` for SQLite (no
86        // election) so dashboards keep the field stable.
87        "leader": true,
88    }))
89}
90
91// =====================================================================
92//   /api/v1/engine/core/active-modules
93// =====================================================================
94
95/// Active-modules listing — public, no auth. Read by the cross-console
96/// nav strip JS so disabled modules' pills don't render. Replaces the
97/// legacy top-level `/api/v1/modules` endpoint.
98async fn active_modules<S: WorkflowStore + Clone + 'static>(
99    State(s): State<EngineState<S>>,
100) -> Json<Value> {
101    Json(json!({
102        "modules": &*s.modules,
103    }))
104}
105
106// =====================================================================
107//   /api/v1/engine/core/info
108// =====================================================================
109
110#[derive(Debug, Clone, Serialize)]
111pub struct EngineInfo {
112    pub version: &'static str,
113    pub instance_id: String,
114    pub started_at: f64,
115    pub leader: bool,
116    pub modules: Vec<String>,
117    pub backend_kind: &'static str,
118    /// SQLite data directory when the backend is SQLite. `None` for PG.
119    pub backend_data_dir: Option<String>,
120    /// Postgres connection URL with the userinfo + password redacted.
121    /// `None` for SQLite.
122    pub backend_url_redacted: Option<String>,
123    pub bind_addr: String,
124    pub public_url: String,
125}
126
127async fn engine_info<S: WorkflowStore + Clone + 'static>(
128    State(s): State<EngineState<S>>,
129) -> Json<EngineInfo> {
130    let cfg: &EngineConfig = &s.engine_config;
131    let (kind, data_dir, url_redacted) = match &cfg.backend {
132        BackendConfig::Sqlite { data_dir, .. } => ("sqlite", Some(data_dir.clone()), None),
133        BackendConfig::Postgres { url } => ("postgres", None, Some(redact_pg_url(url))),
134    };
135    Json(EngineInfo {
136        version: s.engine_version,
137        instance_id: s.instance_id.to_string(),
138        started_at: s.started_at,
139        leader: true,
140        modules: (*s.modules).clone(),
141        backend_kind: kind,
142        backend_data_dir: data_dir,
143        backend_url_redacted: url_redacted,
144        bind_addr: cfg.server.bind_addr.clone(),
145        public_url: cfg.server.public_url.clone(),
146    })
147}
148
149// =====================================================================
150//   /api/v1/engine/core/modules
151// =====================================================================
152
153#[derive(Debug, Clone, Serialize)]
154pub struct ModuleEntry {
155    pub name: String,
156    pub enabled: bool,
157    pub enabled_at: Option<f64>,
158    pub enabled_by: Option<String>,
159    pub version: Option<String>,
160    pub config: Value,
161}
162
163#[derive(Debug, Clone, Serialize)]
164pub struct ListModulesResponse {
165    pub items: Vec<ModuleEntry>,
166}
167
168async fn list_modules<S: WorkflowStore + Clone + 'static>(
169    State(s): State<EngineState<S>>,
170    headers: HeaderMap,
171) -> Response {
172    if let Err(r) = require_admin(&headers, &s).await {
173        return *r;
174    }
175    let items = match list_module_records(&s.engine_config).await {
176        Ok(v) => v,
177        Err(e) => return server_error(&format!("list modules: {e}")),
178    };
179    let entries = items
180        .into_iter()
181        .map(|m| ModuleEntry {
182            name: m.name,
183            enabled: m.enabled,
184            enabled_at: m.enabled_at,
185            enabled_by: m.enabled_by,
186            version: m.version,
187            config: m.config,
188        })
189        .collect();
190    (StatusCode::OK, Json(ListModulesResponse { items: entries })).into_response()
191}
192
193#[derive(Debug, Clone, Deserialize)]
194pub struct ToggleBody {
195    /// Optional explicit enabled flag. When omitted the handler flips
196    /// the current value.
197    pub enabled: Option<bool>,
198}
199
200#[derive(Debug, Clone, Serialize)]
201pub struct ToggleResponse {
202    pub enabled: bool,
203    pub restart_required: bool,
204    pub message: String,
205}
206
207async fn toggle_module<S: WorkflowStore + Clone + 'static>(
208    State(s): State<EngineState<S>>,
209    headers: HeaderMap,
210    Path(name): Path<String>,
211    body: Option<Json<ToggleBody>>,
212) -> Response {
213    if let Err(r) = require_admin(&headers, &s).await {
214        return *r;
215    }
216    // Look up the current module row so we know what to flip to.
217    let modules = match list_module_records(&s.engine_config).await {
218        Ok(v) => v,
219        Err(e) => return server_error(&format!("list modules: {e}")),
220    };
221    let Some(current) = modules.iter().find(|m| m.name == name) else {
222        return (
223            StatusCode::NOT_FOUND,
224            Json(json!({"error": "unknown module name"})),
225        )
226            .into_response();
227    };
228    let target = body
229        .and_then(|Json(b)| b.enabled)
230        .unwrap_or(!current.enabled);
231    let actor = bearer_token(&headers).map(short_actor);
232    if let Err(e) =
233        set_module_enabled(&s.engine_config, &name, target, actor.as_deref()).await
234    {
235        return server_error(&format!("set enabled: {e}"));
236    }
237    if let Err(e) = audit_module_toggle(&s.engine_config, &name, target, actor.as_deref()).await
238    {
239        // Failure to audit doesn't undo the flip; surface as a warning.
240        tracing::warn!(?e, "engine.audit insert failed for module toggle");
241    }
242    let msg = if target {
243        format!("module {name} marked enabled — restart engine to load")
244    } else {
245        format!("module {name} marked disabled — restart engine to unload")
246    };
247    (
248        StatusCode::OK,
249        Json(ToggleResponse {
250            enabled: target,
251            restart_required: true,
252            message: msg,
253        }),
254    )
255        .into_response()
256}
257
258// =====================================================================
259//   /api/v1/engine/core/instances
260// =====================================================================
261
262#[derive(Debug, Clone, Serialize)]
263pub struct InstanceEntry {
264    pub id: String,
265    pub started_at: f64,
266    pub last_heartbeat: f64,
267    pub namespaces: Vec<String>,
268    pub version: Option<String>,
269}
270
271#[derive(Debug, Clone, Serialize)]
272pub struct ListInstancesResponse {
273    pub items: Vec<InstanceEntry>,
274}
275
276#[derive(Debug, Clone, Default, Deserialize)]
277pub struct PageQuery {
278    #[serde(default)]
279    pub limit: Option<i64>,
280    #[serde(default)]
281    pub offset: Option<i64>,
282}
283
284async fn list_instances<S: WorkflowStore + Clone + 'static>(
285    State(s): State<EngineState<S>>,
286    headers: HeaderMap,
287    Query(_q): Query<PageQuery>,
288) -> Response {
289    if let Err(r) = require_admin(&headers, &s).await {
290        return *r;
291    }
292    let items = match list_instance_records(&s.engine_config).await {
293        Ok(v) => v,
294        Err(e) => return server_error(&format!("list instances: {e}")),
295    };
296    let entries = items
297        .into_iter()
298        .map(|i| InstanceEntry {
299            id: i.id,
300            started_at: i.started_at,
301            last_heartbeat: i.last_heartbeat,
302            namespaces: i.namespaces,
303            version: i.version,
304        })
305        .collect();
306    (
307        StatusCode::OK,
308        Json(ListInstancesResponse { items: entries }),
309    )
310        .into_response()
311}
312
313// =====================================================================
314//   /api/v1/engine/core/audit
315// =====================================================================
316
317#[derive(Debug, Clone, Default, Deserialize)]
318pub struct AuditQuery {
319    #[serde(default)]
320    pub limit: Option<i64>,
321    #[serde(default)]
322    pub offset: Option<i64>,
323    #[serde(default)]
324    pub actor: Option<String>,
325    #[serde(default)]
326    pub action: Option<String>,
327    #[serde(default)]
328    pub since: Option<f64>,
329    #[serde(default)]
330    pub until: Option<f64>,
331}
332
333#[derive(Debug, Clone, Serialize)]
334pub struct AuditEntry {
335    pub id: String,
336    pub ts: f64,
337    pub actor: Option<String>,
338    pub action: String,
339    pub details: Value,
340}
341
342#[derive(Debug, Clone, Serialize)]
343pub struct ListAuditResponse {
344    pub items: Vec<AuditEntry>,
345    pub total: i64,
346    pub limit: i64,
347    pub offset: i64,
348}
349
350async fn list_audit<S: WorkflowStore + Clone + 'static>(
351    State(s): State<EngineState<S>>,
352    headers: HeaderMap,
353    Query(q): Query<AuditQuery>,
354) -> Response {
355    if let Err(r) = require_admin(&headers, &s).await {
356        return *r;
357    }
358    let limit = q.limit.unwrap_or(50).clamp(1, 500);
359    let offset = q.offset.unwrap_or(0).max(0);
360    let (rows, total) = match list_audit_records(
361        &s.engine_config,
362        limit,
363        offset,
364        q.actor.as_deref(),
365        q.action.as_deref(),
366        q.since,
367        q.until,
368    )
369    .await
370    {
371        Ok(v) => v,
372        Err(e) => return server_error(&format!("list audit: {e}")),
373    };
374    let items = rows
375        .into_iter()
376        .map(|a| AuditEntry {
377            id: a.id,
378            ts: a.ts,
379            actor: a.actor,
380            action: a.action,
381            details: a.details,
382        })
383        .collect();
384    (
385        StatusCode::OK,
386        Json(ListAuditResponse {
387            items,
388            total,
389            limit,
390            offset,
391        }),
392    )
393        .into_response()
394}
395
396// =====================================================================
397//   /api/v1/engine/core/config
398// =====================================================================
399
400async fn get_config<S: WorkflowStore + Clone + 'static>(
401    State(s): State<EngineState<S>>,
402    headers: HeaderMap,
403) -> Response {
404    if let Err(r) = require_admin(&headers, &s).await {
405        return *r;
406    }
407    let mut value = match serde_json::to_value(&*s.engine_config) {
408        Ok(v) => v,
409        Err(e) => return server_error(&format!("serialise config: {e}")),
410    };
411    redact_secrets(&mut value);
412    (StatusCode::OK, Json(value)).into_response()
413}
414
415/// Replace secrets in the serialised config with `[REDACTED]`. Targets
416/// `admin_api_keys` and any key containing `password`, `secret`,
417/// `token`, or `key` (case-insensitive), excluding the structural
418/// `kid` / `keys`. Conservative on purpose — the engine console
419/// renders this for operators; over-redaction beats credential leaks.
420fn redact_secrets(v: &mut Value) {
421    let placeholder = Value::String("[REDACTED]".to_string());
422    match v {
423        Value::Object(map) => {
424            // Snapshot the keys upfront — modifying values while iterating
425            // owned-mut keys is fine, but we want a clear walk.
426            let keys: Vec<String> = map.keys().cloned().collect();
427            for k in keys {
428                let lk = k.to_lowercase();
429                if k == "admin_api_keys" {
430                    if let Some(Value::Array(arr)) = map.get_mut(&k) {
431                        for entry in arr {
432                            *entry = placeholder.clone();
433                        }
434                    }
435                    continue;
436                }
437                if (lk.contains("password")
438                    || lk.contains("secret")
439                    || lk.contains("api_key")
440                    || lk.contains("api-key"))
441                    && let Some(slot) = map.get_mut(&k)
442                {
443                    match slot {
444                        Value::String(_) => *slot = placeholder.clone(),
445                        Value::Array(arr) => {
446                            for entry in arr {
447                                if let Value::String(_) = entry {
448                                    *entry = placeholder.clone();
449                                } else {
450                                    redact_secrets(entry);
451                                }
452                            }
453                        }
454                        other => redact_secrets(other),
455                    }
456                    continue;
457                }
458                if let Some(slot) = map.get_mut(&k) {
459                    redact_secrets(slot);
460                }
461            }
462        }
463        Value::Array(arr) => {
464            for entry in arr {
465                redact_secrets(entry);
466            }
467        }
468        _ => {}
469    }
470}
471
472// =====================================================================
473//   helpers — admin auth
474// =====================================================================
475
476/// Engine-core admin gate.
477///
478/// Auth is mandatory so the engine always has an
479/// [`AuthCtx`]. Dispatch to [`assay_auth::gate::require_role_for`]
480/// for `engine#core#admin`. Admin api-key callers bypass as
481/// break-glass; session/JWT callers go through Zanzibar.
482async fn require_admin<S: WorkflowStore + Clone + 'static>(
483    headers: &HeaderMap,
484    state: &EngineState<S>,
485) -> Result<(), Box<Response>> {
486    let auth = state
487        .auth
488        .as_ref()
489        .ok_or_else(|| svc_unavailable_box("auth not configured on this engine instance"))?;
490    let keys = crate::state::AdminApiKeys(std::sync::Arc::clone(&state.admin_api_keys));
491    assay_auth::gate::require_role_for(headers, auth, &keys, "engine", "core", "admin")
492        .await
493        .map(|_| ())
494}
495
496fn svc_unavailable_box(msg: &str) -> Box<Response> {
497    Box::new(
498        (
499            StatusCode::SERVICE_UNAVAILABLE,
500            Json(json!({"error": "service_unavailable", "error_description": msg})),
501        )
502            .into_response(),
503    )
504}
505
506fn bearer_token(headers: &HeaderMap) -> Option<String> {
507    let raw = headers
508        .get(header::AUTHORIZATION)
509        .and_then(|v| v.to_str().ok())?;
510    raw.strip_prefix("Bearer ")
511        .or_else(|| raw.strip_prefix("bearer "))
512        .map(|s| s.trim().to_string())
513}
514
/// Reduce an admin token to a short label for the audit log so the
/// token itself is never stored.
///
/// The label is `admin:****` followed by the last six characters of
/// the trimmed token, e.g. `admin:****abcdef`. A token of six
/// characters or fewer contributes no characters at all: echoing its
/// "tail" would write the whole secret into the audit log. The tail
/// is cut on character boundaries, so a token containing multi-byte
/// characters cannot cause a slicing panic.
fn short_actor(token: String) -> String {
    /// Number of trailing characters revealed in the label.
    const TAIL_CHARS: usize = 6;

    let t = token.trim();
    // Byte offset of the TAIL_CHARS-th character from the end. An
    // offset of 0 means the tail would be the entire token, which is
    // treated the same as "too short".
    match t.char_indices().rev().nth(TAIL_CHARS - 1) {
        Some((start, _)) if start > 0 => format!("admin:****{}", &t[start..]),
        _ => "admin:****".to_string(),
    }
}
527
528fn server_error(msg: &str) -> Response {
529    (
530        StatusCode::INTERNAL_SERVER_ERROR,
531        Json(json!({"error": "server_error", "error_description": msg})),
532    )
533        .into_response()
534}
535
536// =====================================================================
537//   helpers — backend-routed schema reads
538// =====================================================================
539
540/// Open a fresh schema handle on demand. Cheap — the underlying pool
541/// is owned by the workflow context that already initialised at boot.
542/// We re-resolve the connection string from the parsed config so this
543/// helper is callable from handlers that only carry the cloned state.
544async fn list_module_records(
545    cfg: &EngineConfig,
546) -> anyhow::Result<Vec<assay_domain::engine::ModuleRecord>> {
547    match &cfg.backend {
548        #[cfg(feature = "backend-postgres")]
549        BackendConfig::Postgres { url } => {
550            let pool = sqlx::PgPool::connect(url)
551                .await
552                .map_err(|e| anyhow::anyhow!("connect pg: {e}"))?;
553            let schema = assay_domain::engine::PgEngineSchema::new(pool);
554            let rows = schema
555                .list_modules()
556                .await
557                .map_err(|e| anyhow::anyhow!("list modules (pg): {e}"))?;
558            Ok(rows)
559        }
560        #[cfg(feature = "backend-sqlite")]
561        BackendConfig::Sqlite { .. } => {
562            let pool = open_sqlite_engine_pool(cfg).await?;
563            let schema = assay_domain::engine::SqliteEngineSchema::new(pool);
564            let rows = schema
565                .list_modules()
566                .await
567                .map_err(|e| anyhow::anyhow!("list modules (sqlite): {e}"))?;
568            Ok(rows)
569        }
570        #[allow(unreachable_patterns)]
571        _ => anyhow::bail!("backend not enabled at compile time"),
572    }
573}
574
575async fn set_module_enabled(
576    cfg: &EngineConfig,
577    name: &str,
578    enabled: bool,
579    actor: Option<&str>,
580) -> anyhow::Result<bool> {
581    match &cfg.backend {
582        #[cfg(feature = "backend-postgres")]
583        BackendConfig::Postgres { url } => {
584            let pool = sqlx::PgPool::connect(url).await?;
585            let schema = assay_domain::engine::PgEngineSchema::new(pool);
586            schema.set_module_enabled(name, enabled, actor).await
587        }
588        #[cfg(feature = "backend-sqlite")]
589        BackendConfig::Sqlite { .. } => {
590            let pool = open_sqlite_engine_pool(cfg).await?;
591            let schema = assay_domain::engine::SqliteEngineSchema::new(pool);
592            schema.set_module_enabled(name, enabled, actor).await
593        }
594        #[allow(unreachable_patterns)]
595        _ => anyhow::bail!("backend not enabled at compile time"),
596    }
597}
598
599async fn audit_module_toggle(
600    cfg: &EngineConfig,
601    name: &str,
602    enabled: bool,
603    actor: Option<&str>,
604) -> anyhow::Result<()> {
605    let details = json!({"module": name, "enabled": enabled});
606    match &cfg.backend {
607        #[cfg(feature = "backend-postgres")]
608        BackendConfig::Postgres { url } => {
609            let pool = sqlx::PgPool::connect(url).await?;
610            let schema = assay_domain::engine::PgEngineSchema::new(pool);
611            schema.audit(actor, "engine.module.toggle", &details).await
612        }
613        #[cfg(feature = "backend-sqlite")]
614        BackendConfig::Sqlite { .. } => {
615            let pool = open_sqlite_engine_pool(cfg).await?;
616            let schema = assay_domain::engine::SqliteEngineSchema::new(pool);
617            schema.audit(actor, "engine.module.toggle", &details).await
618        }
619        #[allow(unreachable_patterns)]
620        _ => anyhow::bail!("backend not enabled at compile time"),
621    }
622}
623
624async fn list_instance_records(
625    cfg: &EngineConfig,
626) -> anyhow::Result<Vec<assay_domain::engine::InstanceRecord>> {
627    match &cfg.backend {
628        #[cfg(feature = "backend-postgres")]
629        BackendConfig::Postgres { url } => {
630            let pool = sqlx::PgPool::connect(url).await?;
631            let schema = assay_domain::engine::PgEngineSchema::new(pool);
632            schema.list_instances().await
633        }
634        #[cfg(feature = "backend-sqlite")]
635        BackendConfig::Sqlite { .. } => {
636            let pool = open_sqlite_engine_pool(cfg).await?;
637            let schema = assay_domain::engine::SqliteEngineSchema::new(pool);
638            schema.list_instances().await
639        }
640        #[allow(unreachable_patterns)]
641        _ => anyhow::bail!("backend not enabled at compile time"),
642    }
643}
644
645async fn list_audit_records(
646    cfg: &EngineConfig,
647    limit: i64,
648    offset: i64,
649    actor: Option<&str>,
650    action: Option<&str>,
651    since: Option<f64>,
652    until: Option<f64>,
653) -> anyhow::Result<(Vec<assay_domain::engine::AuditRecord>, i64)> {
654    match &cfg.backend {
655        #[cfg(feature = "backend-postgres")]
656        BackendConfig::Postgres { url } => {
657            let pool = sqlx::PgPool::connect(url).await?;
658            let schema = assay_domain::engine::PgEngineSchema::new(pool);
659            schema
660                .list_audit(limit, offset, actor, action, since, until)
661                .await
662        }
663        #[cfg(feature = "backend-sqlite")]
664        BackendConfig::Sqlite { .. } => {
665            let pool = open_sqlite_engine_pool(cfg).await?;
666            let schema = assay_domain::engine::SqliteEngineSchema::new(pool);
667            schema
668                .list_audit(limit, offset, actor, action, since, until)
669                .await
670        }
671        #[allow(unreachable_patterns)]
672        _ => anyhow::bail!("backend not enabled at compile time"),
673    }
674}
675
/// Open a single-connection SQLite pool with `engine.db` attached
/// under the schema name `engine`.
///
/// The pool itself connects to an in-memory database; each new
/// connection then runs `ATTACH DATABASE 'file:<data_dir>/engine.db?mode=rw' AS engine`
/// so schema-qualified queries such as `engine.modules` resolve. Only
/// `engine.db` is attached — no other database files.
/// NOTE(review): the original comment says this mirrors the ATTACH
/// layout in `init.rs::sqlite_boot`; confirm the two stay in sync.
///
/// # Errors
///
/// Fails when the configured backend has no SQLite data directory, or
/// when connecting / attaching fails.
#[cfg(feature = "backend-sqlite")]
async fn open_sqlite_engine_pool(cfg: &EngineConfig) -> anyhow::Result<sqlx::SqlitePool> {
    use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions};
    use std::str::FromStr;

    let data_dir = cfg
        .backend
        .sqlite_data_dir()
        .ok_or_else(|| anyhow::anyhow!("sqlite_data_dir missing on non-sqlite backend"))?;
    let path = format!("file:{data_dir}/engine.db?mode=rw");
    // The path is embedded in a single-quoted SQL string literal, so a
    // `'` inside the data directory must be doubled or the statement
    // is malformed. Built once here rather than on every connection.
    let attach_sql = format!("ATTACH DATABASE '{}' AS engine", path.replace('\'', "''"));
    let opts = SqliteConnectOptions::from_str("sqlite::memory:")?.create_if_missing(true);
    let pool = SqlitePoolOptions::new()
        .max_connections(1)
        .after_connect(move |conn, _meta| {
            let attach_sql = attach_sql.clone();
            Box::pin(async move {
                use sqlx::Executor;
                conn.execute(attach_sql.as_str()).await?;
                Ok(())
            })
        })
        .connect_with(opts)
        .await
        .map_err(|e| anyhow::anyhow!("connect engine.db: {e}"))?;
    Ok(pool)
}
708
709/// Strip userinfo (user + password) from a Postgres URL for safe
710/// display — keeps host / port / db / params. `postgres://u:p@h:5/db`
711/// → `postgres://[REDACTED]@h:5/db`. We rebuild the URL string by
712/// hand because `url::Url::set_username` percent-encodes `[`/`]`,
713/// which would garble the placeholder; the string surface only ever
714/// flows out to the dashboard so a plain rebuild is safe.
715fn redact_pg_url(raw: &str) -> String {
716    let Ok(u) = url::Url::parse(raw) else {
717        return "[REDACTED]".to_string();
718    };
719    let scheme = u.scheme();
720    let host = u.host_str().unwrap_or("");
721    let port = u.port().map(|p| format!(":{p}")).unwrap_or_default();
722    let path = u.path();
723    let query = match u.query() {
724        Some(q) => format!("?{q}"),
725        None => String::new(),
726    };
727    let userinfo = if !u.username().is_empty() || u.password().is_some() {
728        "[REDACTED]@"
729    } else {
730        ""
731    };
732    format!("{scheme}://{userinfo}{host}{port}{path}{query}")
733}
734
#[cfg(test)]
mod tests {
    use super::*;

    /// Placeholder the redaction helpers are expected to emit.
    const MASK: &str = "[REDACTED]";

    #[test]
    fn redact_secrets_replaces_admin_api_keys_array() {
        let mut tree = json!({
            "auth": {
                "admin_api_keys": ["secret1", "secret2"],
                "issuer": "https://issuer.example",
            }
        });
        redact_secrets(&mut tree);

        let keys = tree["auth"]["admin_api_keys"]
            .as_array()
            .unwrap();
        for key in keys {
            assert_eq!(key.as_str(), Some(MASK));
        }
        // A non-secret sibling must come through unchanged.
        assert_eq!(tree["auth"]["issuer"], json!("https://issuer.example"));
    }

    #[test]
    fn redact_secrets_replaces_password_fields() {
        let mut tree = json!({"backend": {"password": "hunter2"}});
        redact_secrets(&mut tree);
        assert_eq!(tree["backend"]["password"], json!(MASK));
    }

    #[test]
    fn redact_pg_url_strips_userinfo() {
        let redacted = redact_pg_url("postgres://alice:hunter2@db.example.com:5432/assay");
        assert!(!redacted.contains("hunter2"), "password leak: {redacted}");
        for expected in [MASK, "db.example.com:5432", "/assay"] {
            assert!(redacted.contains(expected));
        }
    }

    // The admin gate itself is implemented in `assay_auth::gate`;
    // `require_admin` here only forwards to it, so gate behaviour is
    // tested in that crate rather than in this module.

    #[test]
    fn short_actor_safely_truncates() {
        let label = short_actor(String::from("abcdef0123456789"));
        assert_eq!(label, "admin:****456789");
        // A token shorter than the tail must still yield a valid label.
        let label = short_actor(String::from("xyz"));
        assert!(label.starts_with("admin:****"));
    }
}
783}