1use axum::Router;
31use axum::extract::{Path, Query, State};
32use axum::http::{HeaderMap, StatusCode, header};
33use axum::response::{IntoResponse, Json, Response};
34use axum::routing::{get, post};
35use serde::{Deserialize, Serialize};
36use serde_json::{Value, json};
37
38use assay_workflow::WorkflowStore;
39
40use crate::config::{BackendConfig, EngineConfig};
41use crate::state::EngineState;
42
43pub fn router<S>() -> Router<EngineState<S>>
47where
48 S: WorkflowStore + Clone + 'static,
49{
50 Router::new()
51 .route("/api/v1/engine/core/info", get(engine_info::<S>))
52 .route("/api/v1/engine/core/health", get(engine_health::<S>))
53 .route(
54 "/api/v1/engine/core/active-modules",
55 get(active_modules::<S>),
56 )
57 .route("/api/v1/engine/core/modules", get(list_modules::<S>))
58 .route(
59 "/api/v1/engine/core/modules/{name}/toggle",
60 post(toggle_module::<S>),
61 )
62 .route("/api/v1/engine/core/instances", get(list_instances::<S>))
63 .route("/api/v1/engine/core/audit", get(list_audit::<S>))
64 .route("/api/v1/engine/core/config", get(get_config::<S>))
65}
66
67async fn engine_health<S: WorkflowStore + Clone + 'static>(
76 State(s): State<EngineState<S>>,
77) -> Json<Value> {
78 Json(json!({
79 "status": "ok",
80 "engine_version": s.engine_version,
81 "instance_id": s.instance_id.to_string(),
82 "modules": &*s.modules,
83 "leader": true,
88 }))
89}
90
91async fn active_modules<S: WorkflowStore + Clone + 'static>(
99 State(s): State<EngineState<S>>,
100) -> Json<Value> {
101 Json(json!({
102 "modules": &*s.modules,
103 }))
104}
105
106#[derive(Debug, Clone, Serialize)]
111pub struct EngineInfo {
112 pub version: &'static str,
113 pub instance_id: String,
114 pub started_at: f64,
115 pub leader: bool,
116 pub modules: Vec<String>,
117 pub backend_kind: &'static str,
118 pub backend_data_dir: Option<String>,
120 pub backend_url_redacted: Option<String>,
123 pub bind_addr: String,
124 pub public_url: String,
125}
126
127async fn engine_info<S: WorkflowStore + Clone + 'static>(
128 State(s): State<EngineState<S>>,
129) -> Json<EngineInfo> {
130 let cfg: &EngineConfig = &s.engine_config;
131 let (kind, data_dir, url_redacted) = match &cfg.backend {
132 BackendConfig::Sqlite { data_dir, .. } => ("sqlite", Some(data_dir.clone()), None),
133 BackendConfig::Postgres { url } => ("postgres", None, Some(redact_pg_url(url))),
134 };
135 Json(EngineInfo {
136 version: s.engine_version,
137 instance_id: s.instance_id.to_string(),
138 started_at: s.started_at,
139 leader: true,
140 modules: (*s.modules).clone(),
141 backend_kind: kind,
142 backend_data_dir: data_dir,
143 backend_url_redacted: url_redacted,
144 bind_addr: cfg.server.bind_addr.clone(),
145 public_url: cfg.server.public_url.clone(),
146 })
147}
148
149#[derive(Debug, Clone, Serialize)]
154pub struct ModuleEntry {
155 pub name: String,
156 pub enabled: bool,
157 pub enabled_at: Option<f64>,
158 pub enabled_by: Option<String>,
159 pub version: Option<String>,
160 pub config: Value,
161}
162
163#[derive(Debug, Clone, Serialize)]
164pub struct ListModulesResponse {
165 pub items: Vec<ModuleEntry>,
166}
167
168async fn list_modules<S: WorkflowStore + Clone + 'static>(
169 State(s): State<EngineState<S>>,
170 headers: HeaderMap,
171) -> Response {
172 if let Err(r) = require_admin(&headers, &s).await {
173 return *r;
174 }
175 let items = match list_module_records(&s.engine_config).await {
176 Ok(v) => v,
177 Err(e) => return server_error(&format!("list modules: {e}")),
178 };
179 let entries = items
180 .into_iter()
181 .map(|m| ModuleEntry {
182 name: m.name,
183 enabled: m.enabled,
184 enabled_at: m.enabled_at,
185 enabled_by: m.enabled_by,
186 version: m.version,
187 config: m.config,
188 })
189 .collect();
190 (StatusCode::OK, Json(ListModulesResponse { items: entries })).into_response()
191}
192
193#[derive(Debug, Clone, Deserialize)]
194pub struct ToggleBody {
195 pub enabled: Option<bool>,
198}
199
200#[derive(Debug, Clone, Serialize)]
201pub struct ToggleResponse {
202 pub enabled: bool,
203 pub restart_required: bool,
204 pub message: String,
205}
206
207async fn toggle_module<S: WorkflowStore + Clone + 'static>(
208 State(s): State<EngineState<S>>,
209 headers: HeaderMap,
210 Path(name): Path<String>,
211 body: Option<Json<ToggleBody>>,
212) -> Response {
213 if let Err(r) = require_admin(&headers, &s).await {
214 return *r;
215 }
216 let modules = match list_module_records(&s.engine_config).await {
218 Ok(v) => v,
219 Err(e) => return server_error(&format!("list modules: {e}")),
220 };
221 let Some(current) = modules.iter().find(|m| m.name == name) else {
222 return (
223 StatusCode::NOT_FOUND,
224 Json(json!({"error": "unknown module name"})),
225 )
226 .into_response();
227 };
228 let target = body
229 .and_then(|Json(b)| b.enabled)
230 .unwrap_or(!current.enabled);
231 let actor = bearer_token(&headers).map(short_actor);
232 if let Err(e) =
233 set_module_enabled(&s.engine_config, &name, target, actor.as_deref()).await
234 {
235 return server_error(&format!("set enabled: {e}"));
236 }
237 if let Err(e) = audit_module_toggle(&s.engine_config, &name, target, actor.as_deref()).await
238 {
239 tracing::warn!(?e, "engine.audit insert failed for module toggle");
241 }
242 let msg = if target {
243 format!("module {name} marked enabled — restart engine to load")
244 } else {
245 format!("module {name} marked disabled — restart engine to unload")
246 };
247 (
248 StatusCode::OK,
249 Json(ToggleResponse {
250 enabled: target,
251 restart_required: true,
252 message: msg,
253 }),
254 )
255 .into_response()
256}
257
258#[derive(Debug, Clone, Serialize)]
263pub struct InstanceEntry {
264 pub id: String,
265 pub started_at: f64,
266 pub last_heartbeat: f64,
267 pub namespaces: Vec<String>,
268 pub version: Option<String>,
269}
270
271#[derive(Debug, Clone, Serialize)]
272pub struct ListInstancesResponse {
273 pub items: Vec<InstanceEntry>,
274}
275
276#[derive(Debug, Clone, Default, Deserialize)]
277pub struct PageQuery {
278 #[serde(default)]
279 pub limit: Option<i64>,
280 #[serde(default)]
281 pub offset: Option<i64>,
282}
283
284async fn list_instances<S: WorkflowStore + Clone + 'static>(
285 State(s): State<EngineState<S>>,
286 headers: HeaderMap,
287 Query(_q): Query<PageQuery>,
288) -> Response {
289 if let Err(r) = require_admin(&headers, &s).await {
290 return *r;
291 }
292 let items = match list_instance_records(&s.engine_config).await {
293 Ok(v) => v,
294 Err(e) => return server_error(&format!("list instances: {e}")),
295 };
296 let entries = items
297 .into_iter()
298 .map(|i| InstanceEntry {
299 id: i.id,
300 started_at: i.started_at,
301 last_heartbeat: i.last_heartbeat,
302 namespaces: i.namespaces,
303 version: i.version,
304 })
305 .collect();
306 (
307 StatusCode::OK,
308 Json(ListInstancesResponse { items: entries }),
309 )
310 .into_response()
311}
312
313#[derive(Debug, Clone, Default, Deserialize)]
318pub struct AuditQuery {
319 #[serde(default)]
320 pub limit: Option<i64>,
321 #[serde(default)]
322 pub offset: Option<i64>,
323 #[serde(default)]
324 pub actor: Option<String>,
325 #[serde(default)]
326 pub action: Option<String>,
327 #[serde(default)]
328 pub since: Option<f64>,
329 #[serde(default)]
330 pub until: Option<f64>,
331}
332
333#[derive(Debug, Clone, Serialize)]
334pub struct AuditEntry {
335 pub id: String,
336 pub ts: f64,
337 pub actor: Option<String>,
338 pub action: String,
339 pub details: Value,
340}
341
342#[derive(Debug, Clone, Serialize)]
343pub struct ListAuditResponse {
344 pub items: Vec<AuditEntry>,
345 pub total: i64,
346 pub limit: i64,
347 pub offset: i64,
348}
349
350async fn list_audit<S: WorkflowStore + Clone + 'static>(
351 State(s): State<EngineState<S>>,
352 headers: HeaderMap,
353 Query(q): Query<AuditQuery>,
354) -> Response {
355 if let Err(r) = require_admin(&headers, &s).await {
356 return *r;
357 }
358 let limit = q.limit.unwrap_or(50).clamp(1, 500);
359 let offset = q.offset.unwrap_or(0).max(0);
360 let (rows, total) = match list_audit_records(
361 &s.engine_config,
362 limit,
363 offset,
364 q.actor.as_deref(),
365 q.action.as_deref(),
366 q.since,
367 q.until,
368 )
369 .await
370 {
371 Ok(v) => v,
372 Err(e) => return server_error(&format!("list audit: {e}")),
373 };
374 let items = rows
375 .into_iter()
376 .map(|a| AuditEntry {
377 id: a.id,
378 ts: a.ts,
379 actor: a.actor,
380 action: a.action,
381 details: a.details,
382 })
383 .collect();
384 (
385 StatusCode::OK,
386 Json(ListAuditResponse {
387 items,
388 total,
389 limit,
390 offset,
391 }),
392 )
393 .into_response()
394}
395
396async fn get_config<S: WorkflowStore + Clone + 'static>(
401 State(s): State<EngineState<S>>,
402 headers: HeaderMap,
403) -> Response {
404 if let Err(r) = require_admin(&headers, &s).await {
405 return *r;
406 }
407 let mut value = match serde_json::to_value(&*s.engine_config) {
408 Ok(v) => v,
409 Err(e) => return server_error(&format!("serialise config: {e}")),
410 };
411 redact_secrets(&mut value);
412 (StatusCode::OK, Json(value)).into_response()
413}
414
415fn redact_secrets(v: &mut Value) {
421 let placeholder = Value::String("[REDACTED]".to_string());
422 match v {
423 Value::Object(map) => {
424 let keys: Vec<String> = map.keys().cloned().collect();
427 for k in keys {
428 let lk = k.to_lowercase();
429 if k == "admin_api_keys" {
430 if let Some(Value::Array(arr)) = map.get_mut(&k) {
431 for entry in arr {
432 *entry = placeholder.clone();
433 }
434 }
435 continue;
436 }
437 if (lk.contains("password")
438 || lk.contains("secret")
439 || lk.contains("api_key")
440 || lk.contains("api-key"))
441 && let Some(slot) = map.get_mut(&k)
442 {
443 match slot {
444 Value::String(_) => *slot = placeholder.clone(),
445 Value::Array(arr) => {
446 for entry in arr {
447 if let Value::String(_) = entry {
448 *entry = placeholder.clone();
449 } else {
450 redact_secrets(entry);
451 }
452 }
453 }
454 other => redact_secrets(other),
455 }
456 continue;
457 }
458 if let Some(slot) = map.get_mut(&k) {
459 redact_secrets(slot);
460 }
461 }
462 }
463 Value::Array(arr) => {
464 for entry in arr {
465 redact_secrets(entry);
466 }
467 }
468 _ => {}
469 }
470}
471
472async fn require_admin<S: WorkflowStore + Clone + 'static>(
483 headers: &HeaderMap,
484 state: &EngineState<S>,
485) -> Result<(), Box<Response>> {
486 let auth = state
487 .auth
488 .as_ref()
489 .ok_or_else(|| svc_unavailable_box("auth not configured on this engine instance"))?;
490 let keys = crate::state::AdminApiKeys(std::sync::Arc::clone(&state.admin_api_keys));
491 assay_auth::gate::require_role_for(headers, auth, &keys, "engine", "core", "admin")
492 .await
493 .map(|_| ())
494}
495
496fn svc_unavailable_box(msg: &str) -> Box<Response> {
497 Box::new(
498 (
499 StatusCode::SERVICE_UNAVAILABLE,
500 Json(json!({"error": "service_unavailable", "error_description": msg})),
501 )
502 .into_response(),
503 )
504}
505
506fn bearer_token(headers: &HeaderMap) -> Option<String> {
507 let raw = headers
508 .get(header::AUTHORIZATION)
509 .and_then(|v| v.to_str().ok())?;
510 raw.strip_prefix("Bearer ")
511 .or_else(|| raw.strip_prefix("bearer "))
512 .map(|s| s.trim().to_string())
513}
514
/// Turns a bearer token into a short actor label of the form
/// `admin:****<tail>`, where `<tail>` is the last six characters of the
/// trimmed token.
///
/// The tail is cut on a character boundary, so tokens containing multi-byte
/// UTF-8 characters cannot cause a slicing panic.
///
/// NOTE(review): a token of six characters or fewer is emitted in full, as
/// before — confirm that is acceptable for such short tokens.
fn short_actor(token: String) -> String {
    let t = token.trim();
    // Byte offset of the sixth character from the end; 0 when the token has
    // fewer than six characters, which keeps the whole token.
    let tail_start = t.char_indices().rev().nth(5).map_or(0, |(idx, _)| idx);
    format!("admin:****{}", &t[tail_start..])
}
527
528fn server_error(msg: &str) -> Response {
529 (
530 StatusCode::INTERNAL_SERVER_ERROR,
531 Json(json!({"error": "server_error", "error_description": msg})),
532 )
533 .into_response()
534}
535
536async fn list_module_records(
545 cfg: &EngineConfig,
546) -> anyhow::Result<Vec<assay_domain::engine::ModuleRecord>> {
547 match &cfg.backend {
548 #[cfg(feature = "backend-postgres")]
549 BackendConfig::Postgres { url } => {
550 let pool = sqlx::PgPool::connect(url)
551 .await
552 .map_err(|e| anyhow::anyhow!("connect pg: {e}"))?;
553 let schema = assay_domain::engine::PgEngineSchema::new(pool);
554 let rows = schema
555 .list_modules()
556 .await
557 .map_err(|e| anyhow::anyhow!("list modules (pg): {e}"))?;
558 Ok(rows)
559 }
560 #[cfg(feature = "backend-sqlite")]
561 BackendConfig::Sqlite { .. } => {
562 let pool = open_sqlite_engine_pool(cfg).await?;
563 let schema = assay_domain::engine::SqliteEngineSchema::new(pool);
564 let rows = schema
565 .list_modules()
566 .await
567 .map_err(|e| anyhow::anyhow!("list modules (sqlite): {e}"))?;
568 Ok(rows)
569 }
570 #[allow(unreachable_patterns)]
571 _ => anyhow::bail!("backend not enabled at compile time"),
572 }
573}
574
575async fn set_module_enabled(
576 cfg: &EngineConfig,
577 name: &str,
578 enabled: bool,
579 actor: Option<&str>,
580) -> anyhow::Result<bool> {
581 match &cfg.backend {
582 #[cfg(feature = "backend-postgres")]
583 BackendConfig::Postgres { url } => {
584 let pool = sqlx::PgPool::connect(url).await?;
585 let schema = assay_domain::engine::PgEngineSchema::new(pool);
586 schema.set_module_enabled(name, enabled, actor).await
587 }
588 #[cfg(feature = "backend-sqlite")]
589 BackendConfig::Sqlite { .. } => {
590 let pool = open_sqlite_engine_pool(cfg).await?;
591 let schema = assay_domain::engine::SqliteEngineSchema::new(pool);
592 schema.set_module_enabled(name, enabled, actor).await
593 }
594 #[allow(unreachable_patterns)]
595 _ => anyhow::bail!("backend not enabled at compile time"),
596 }
597}
598
599async fn audit_module_toggle(
600 cfg: &EngineConfig,
601 name: &str,
602 enabled: bool,
603 actor: Option<&str>,
604) -> anyhow::Result<()> {
605 let details = json!({"module": name, "enabled": enabled});
606 match &cfg.backend {
607 #[cfg(feature = "backend-postgres")]
608 BackendConfig::Postgres { url } => {
609 let pool = sqlx::PgPool::connect(url).await?;
610 let schema = assay_domain::engine::PgEngineSchema::new(pool);
611 schema.audit(actor, "engine.module.toggle", &details).await
612 }
613 #[cfg(feature = "backend-sqlite")]
614 BackendConfig::Sqlite { .. } => {
615 let pool = open_sqlite_engine_pool(cfg).await?;
616 let schema = assay_domain::engine::SqliteEngineSchema::new(pool);
617 schema.audit(actor, "engine.module.toggle", &details).await
618 }
619 #[allow(unreachable_patterns)]
620 _ => anyhow::bail!("backend not enabled at compile time"),
621 }
622}
623
624async fn list_instance_records(
625 cfg: &EngineConfig,
626) -> anyhow::Result<Vec<assay_domain::engine::InstanceRecord>> {
627 match &cfg.backend {
628 #[cfg(feature = "backend-postgres")]
629 BackendConfig::Postgres { url } => {
630 let pool = sqlx::PgPool::connect(url).await?;
631 let schema = assay_domain::engine::PgEngineSchema::new(pool);
632 schema.list_instances().await
633 }
634 #[cfg(feature = "backend-sqlite")]
635 BackendConfig::Sqlite { .. } => {
636 let pool = open_sqlite_engine_pool(cfg).await?;
637 let schema = assay_domain::engine::SqliteEngineSchema::new(pool);
638 schema.list_instances().await
639 }
640 #[allow(unreachable_patterns)]
641 _ => anyhow::bail!("backend not enabled at compile time"),
642 }
643}
644
645async fn list_audit_records(
646 cfg: &EngineConfig,
647 limit: i64,
648 offset: i64,
649 actor: Option<&str>,
650 action: Option<&str>,
651 since: Option<f64>,
652 until: Option<f64>,
653) -> anyhow::Result<(Vec<assay_domain::engine::AuditRecord>, i64)> {
654 match &cfg.backend {
655 #[cfg(feature = "backend-postgres")]
656 BackendConfig::Postgres { url } => {
657 let pool = sqlx::PgPool::connect(url).await?;
658 let schema = assay_domain::engine::PgEngineSchema::new(pool);
659 schema
660 .list_audit(limit, offset, actor, action, since, until)
661 .await
662 }
663 #[cfg(feature = "backend-sqlite")]
664 BackendConfig::Sqlite { .. } => {
665 let pool = open_sqlite_engine_pool(cfg).await?;
666 let schema = assay_domain::engine::SqliteEngineSchema::new(pool);
667 schema
668 .list_audit(limit, offset, actor, action, since, until)
669 .await
670 }
671 #[allow(unreachable_patterns)]
672 _ => anyhow::bail!("backend not enabled at compile time"),
673 }
674}
675
/// Opens a single-connection SQLite pool with `<data_dir>/engine.db` attached
/// under the schema name `engine`.
///
/// The pool itself connects to an in-memory database; each new connection
/// then runs `ATTACH DATABASE 'file:<data_dir>/engine.db?mode=rw' AS engine`.
/// The path is embedded in a single-quoted SQL string literal, so any `'` in
/// the data directory is doubled to keep the statement well-formed.
///
/// # Errors
///
/// Fails when the configured backend has no SQLite data directory, when the
/// connect options cannot be parsed, or when connecting / attaching fails.
#[cfg(feature = "backend-sqlite")]
async fn open_sqlite_engine_pool(cfg: &EngineConfig) -> anyhow::Result<sqlx::SqlitePool> {
    use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions};
    use std::str::FromStr;

    let data_dir = cfg
        .backend
        .sqlite_data_dir()
        .ok_or_else(|| anyhow::anyhow!("sqlite_data_dir missing on non-sqlite backend"))?;
    let path = format!("file:{data_dir}/engine.db?mode=rw");
    // Build the statement once; escape quotes so the literal cannot be broken
    // out of by a quote character in the directory name.
    let attach_sql = format!("ATTACH DATABASE '{}' AS engine", path.replace('\'', "''"));
    let opts = SqliteConnectOptions::from_str("sqlite::memory:")?.create_if_missing(true);
    let pool = SqlitePoolOptions::new()
        .max_connections(1)
        .after_connect(move |conn, _meta| {
            let attach_sql = attach_sql.clone();
            Box::pin(async move {
                use sqlx::Executor;
                conn.execute(attach_sql.as_str()).await?;
                Ok(())
            })
        })
        .connect_with(opts)
        .await
        .map_err(|e| anyhow::anyhow!("connect engine.db: {e}"))?;
    Ok(pool)
}
708
709fn redact_pg_url(raw: &str) -> String {
716 let Ok(u) = url::Url::parse(raw) else {
717 return "[REDACTED]".to_string();
718 };
719 let scheme = u.scheme();
720 let host = u.host_str().unwrap_or("");
721 let port = u.port().map(|p| format!(":{p}")).unwrap_or_default();
722 let path = u.path();
723 let query = match u.query() {
724 Some(q) => format!("?{q}"),
725 None => String::new(),
726 };
727 let userinfo = if !u.username().is_empty() || u.password().is_some() {
728 "[REDACTED]@"
729 } else {
730 ""
731 };
732 format!("{scheme}://{userinfo}{host}{port}{path}{query}")
733}
734
#[cfg(test)]
mod tests {
    use super::*;

    /// Every entry of an `admin_api_keys` array is masked; siblings survive.
    #[test]
    fn redact_secrets_replaces_admin_api_keys_array() {
        let mut doc = json!({
            "auth": {
                "admin_api_keys": ["secret1", "secret2"],
                "issuer": "https://issuer.example",
            }
        });
        redact_secrets(&mut doc);

        let keys = doc["auth"]["admin_api_keys"].as_array().unwrap();
        for key in keys {
            assert!(key.as_str() == Some("[REDACTED]"));
        }
        assert_eq!(doc["auth"]["issuer"], json!("https://issuer.example"));
    }

    /// A nested `password` string is masked.
    #[test]
    fn redact_secrets_replaces_password_fields() {
        let mut doc = json!({"backend": {"password": "hunter2"}});
        redact_secrets(&mut doc);
        assert_eq!(doc["backend"]["password"], json!("[REDACTED]"));
    }

    /// Userinfo is masked while host, port and path stay visible.
    #[test]
    fn redact_pg_url_strips_userinfo() {
        let redacted = redact_pg_url("postgres://alice:hunter2@db.example.com:5432/assay");
        assert!(!redacted.contains("hunter2"), "password leak: {redacted}");
        for expected in ["[REDACTED]", "db.example.com:5432", "/assay"] {
            assert!(redacted.contains(expected));
        }
    }

    /// Long tokens keep only their last six characters; short ones keep the prefix.
    #[test]
    fn short_actor_safely_truncates() {
        let long = short_actor(String::from("abcdef0123456789"));
        assert_eq!(long, "admin:****456789");

        let short = short_actor(String::from("xyz"));
        assert!(short.starts_with("admin:****"));
    }
}