assay_engine/init.rs

//! Engine boot sequence (v0.1.2 — schema/ATTACH layout).
//!
//! Implements the 8-step boot sequence from plan 14:
//!
//! 1. Open engine storage (PG: connect; SQLite: create data_dir + open
//!    a router connection that ATTACHes one file per module)
//! 2. Apply engine schema migrations (creates `engine.modules`,
//!    `engine.audit`, `engine.instances`, `engine.migrations`)
//! 3. Read `engine.modules` — on first boot seed it from the running
//!    build's compile-time modules; on subsequent boots just SELECT
//!    enabled modules
//! 4. For each enabled module: PG `CREATE SCHEMA IF NOT EXISTS <m>`
//!    or SQLite ensure-attached, then run module migrations
//! 5. Wire trait routing — handled by callers (engine binary builds the
//!    `WorkflowStore` against the prepared pool)
//! 6. Engine-level multi-node coordination:
//!    - PG: pg_try_advisory_lock(1) for leader election (existing path)
//!    - SQLite: engine.lock single-row exclusive (existing path)
//!    - Insert into `engine.instances` on startup, refresh on timer,
//!      DELETE on graceful shutdown
//! 7. Mount HTTP routers from each enabled module (caller wires them)
//! 8. Start scheduler, workers, etc. (caller wires them)
//!
//! [`EngineBoot`] returns the prepared pool(s), the engine-events bus,
//! the instance id, and the list of enabled modules — everything callers
//! need to compose `WorkflowStore`, `WorkflowCtx`, and the HTTP router.
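//!
//! A minimal caller sketch (hedged: config loading is elided, and the
//! composition inside each arm is only indicated, since `WorkflowStore`
//! wiring lives in the engine binary, not in this module):
//!
//! ```ignore
//! let cfg: EngineConfig = todo!("load from file/env");
//! let boot = EngineBoot::run(&cfg).await?;
//! tracing::info!(instance = %boot.instance_id(), modules = ?boot.modules(), "booted");
//! match boot {
//!     #[cfg(feature = "backend-postgres")]
//!     EngineBoot::Postgres(pg) => { /* build WorkflowStore on pg.pool, mount routers */ }
//!     #[cfg(feature = "backend-sqlite")]
//!     EngineBoot::Sqlite(sq) => { /* build WorkflowStore on sq.pool, mount routers */ }
//! }
//! ```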

use std::sync::Arc;
use std::time::Duration;

use assay_domain::events::EngineEventBus;
use tracing::info;

use crate::config::{BackendConfig, EngineConfig};

/// One row to seed into `engine.modules` on first boot.
/// `default_enabled = false` means operators must flip it to TRUE
/// before the module's migrations run — reserved for future opt-in
/// modules so existing deployments don't get unexpected schema changes
/// on upgrade.
#[derive(Debug, Clone)]
pub struct BuiltinModule {
    pub name: &'static str,
    pub version: &'static str,
    pub default_enabled: bool,
}

/// Built-in modules implied by the running build's compile-time features.
///
/// Workflow is always-on (the engine is currently the workflow runtime).
/// Auth is also always-on: the engine authenticates every admin and
/// workflow request through the auth module, so it seeds enabled and
/// running with it disabled isn't supported.
/// `EngineConfig.auto_enable_modules` remains as a forward-compat
/// override for future opt-in modules.
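///
/// A minimal sketch of inspecting the defaults (this only restates what
/// the function below returns):
///
/// ```ignore
/// let on_by_default: Vec<&str> = builtin_modules()
///     .into_iter()
///     .filter(|m| m.default_enabled)
///     .map(|m| m.name)
///     .collect();
/// // Always contains "workflow" and "auth"; "vault" appears only when
/// // the `vault` Cargo feature is compiled in.
/// ```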
pub fn builtin_modules() -> Vec<BuiltinModule> {
    #[cfg_attr(not(feature = "vault"), allow(unused_mut))]
    let mut mods = vec![
        BuiltinModule {
            name: "workflow",
            version: env!("CARGO_PKG_VERSION"),
            default_enabled: true,
        },
        // The engine authenticates every admin + workflow request via
        // the auth module; running with auth disabled isn't supported.
        BuiltinModule {
            name: "auth",
            version: env!("CARGO_PKG_VERSION"),
            default_enabled: true,
        },
    ];
    // Vault module (plan 17 / v0.3.0). Default-enabled when compiled in —
    // this is the marquee module of v0.3.0, and the engine binary's vault
    // wiring panics if the module is on without a backing VaultCtx.
    #[cfg(feature = "vault")]
    mods.push(BuiltinModule {
        name: "vault",
        version: env!("CARGO_PKG_VERSION"),
        default_enabled: true,
    });
    mods
}

/// Heartbeat interval for the engine.instances row. Tightened in
/// v0.3.0 (plan 17 §S9) so secondary engine pods can detect a failed
/// primary within [`INSTANCE_STALE_SECS`] of the actual failure.
///
/// 3-second heartbeat × 10-second stale cutoff = primary loss is
/// observed by every other instance within ~10s (worst case: a
/// heartbeat just succeeded, then the primary dies; the row stays
/// "fresh" for the remainder of the 10-second window).
///
/// HA tradeoffs:
/// - Lower heartbeat → faster failover detection, more PG writes per
///   second per pod (even ~1 row/s/pod is negligible at any realistic
///   fleet size).
/// - Higher stale cutoff → fewer false-positive failovers from
///   transient network blips, but slower real-failure detection.
///
/// 3s × 10s is plan §S9's locked target.
const INSTANCE_HEARTBEAT_SECS: u64 = 3;
// Used by the PG cleanup task that prunes dead `engine.instances` rows.
// The SQLite path is single-instance and never accumulates stale rows.
#[cfg(feature = "backend-postgres")]
const INSTANCE_STALE_SECS: f64 = 10.0;
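// Hedged addition (not in the original design notes): a compile-time
// guard that the stale cutoff covers at least three heartbeat periods,
// so a single dropped heartbeat can't read as a dead primary. Float
// arithmetic is permitted in const initializers.
#[cfg(feature = "backend-postgres")]
const _: () = assert!(INSTANCE_STALE_SECS >= 3.0 * INSTANCE_HEARTBEAT_SECS as f64);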

/// Result of the engine boot sequence — the parts each backend wired up.
/// The engine binary uses these to compose its `WorkflowStore` /
/// `WorkflowCtx` / HTTP router.
pub enum EngineBoot {
    #[cfg(feature = "backend-postgres")]
    Postgres(PgBoot),
    #[cfg(feature = "backend-sqlite")]
    Sqlite(SqliteBoot),
}

#[cfg(feature = "backend-postgres")]
pub struct PgBoot {
    pub pool: sqlx::PgPool,
    pub bus: Arc<dyn EngineEventBus>,
    pub instance_id: uuid::Uuid,
    pub modules: Vec<String>,
}

#[cfg(feature = "backend-sqlite")]
pub struct SqliteBoot {
    pub pool: sqlx::SqlitePool,
    pub bus: Arc<dyn EngineEventBus>,
    pub instance_id: uuid::Uuid,
    pub modules: Vec<String>,
}

impl EngineBoot {
    /// Run the boot sequence end-to-end against the configured backend.
    pub async fn run(cfg: &EngineConfig) -> anyhow::Result<Self> {
        match cfg.backend.clone() {
            #[cfg(feature = "backend-postgres")]
            BackendConfig::Postgres { url } => {
                let boot = pg_boot(&url, &cfg.auto_enable_modules).await?;
                Ok(EngineBoot::Postgres(boot))
            }
            #[cfg(feature = "backend-sqlite")]
            BackendConfig::Sqlite { .. } => {
                let data_dir = cfg
                    .backend
                    .sqlite_data_dir()
                    .expect("sqlite backend yields data_dir");
                let boot = sqlite_boot(&data_dir, &cfg.auto_enable_modules).await?;
                Ok(EngineBoot::Sqlite(boot))
            }
            #[allow(unreachable_patterns)]
            _ => anyhow::bail!("backend not enabled at compile time"),
        }
    }

    pub fn modules(&self) -> &[String] {
        match self {
            #[cfg(feature = "backend-postgres")]
            EngineBoot::Postgres(b) => &b.modules,
            #[cfg(feature = "backend-sqlite")]
            EngineBoot::Sqlite(b) => &b.modules,
        }
    }

    pub fn instance_id(&self) -> uuid::Uuid {
        match self {
            #[cfg(feature = "backend-postgres")]
            EngineBoot::Postgres(b) => b.instance_id,
            #[cfg(feature = "backend-sqlite")]
            EngineBoot::Sqlite(b) => b.instance_id,
        }
    }
}

#[cfg(feature = "backend-postgres")]
async fn pg_boot(url: &str, auto_enable: &[String]) -> anyhow::Result<PgBoot> {
    use assay_domain::engine::PgEngineSchema;
    use assay_domain::events::PgEngineEventBus;
    use sqlx::PgPool;

    info!(target: "assay-engine", "boot: connecting to postgres");
    let pool = PgPool::connect(url)
        .await
        .map_err(|e| anyhow::anyhow!("connect postgres: {e}"))?;

    let schema = PgEngineSchema::new(pool.clone());
    schema
        .migrate()
        .await
        .map_err(|e| anyhow::anyhow!("engine schema migrate (pg): {e}"))?;
    record_engine_migration_pg(&pool, "engine", 1).await?;

    let modules = read_or_seed_modules_pg(&schema, auto_enable).await?;

    // Per-module schema setup. The workflow module's actual DDL still
    // runs inside `PostgresStore::migrate` when the engine binary builds
    // the store — Phase 2 already moved those tables into the `workflow`
    // schema. We just ensure the schema container exists here so a fresh
    // boot doesn't fail before the store's CREATE TABLE runs.
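    // Added note: `name` is interpolated straight into DDL. That is
    // assumed safe here because module names originate from the
    // compile-time builtin list and `engine.modules` is only writable
    // by operators.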
    for name in &modules {
        let create = format!("CREATE SCHEMA IF NOT EXISTS {name}");
        sqlx::query(&create)
            .execute(&pool)
            .await
            .map_err(|e| anyhow::anyhow!("create schema {name}: {e}"))?;
        record_engine_migration_pg(&pool, name, 1).await?;
    }

    // Auth schema migration — runs whenever auth is enabled, which in
    // practice is every boot (auth seeds enabled and is mandatory).
    // Smoke-touches the OIDC provider tables so missing DDL or
    // permission issues surface here rather than at first request.
    if modules.iter().any(|m| m == "auth") {
        assay_auth::schema::migrate_postgres(&pool)
            .await
            .map_err(|e| anyhow::anyhow!("auth schema migrate (pg): {e}"))?;
        let _ = assay_auth::biscuit::load_or_init_postgres(&pool)
            .await
            .map_err(|e| anyhow::anyhow!("biscuit root key bootstrap (pg): {e}"))?;
        sqlx::query("SELECT COUNT(*) FROM auth.oidc_clients")
            .fetch_one(&pool)
            .await
            .map_err(|e| anyhow::anyhow!("oidc provider tables (pg): {e}"))?;
    }

    // Vault schema migration (plan 17 / v0.3.0). Smoke-touches one of
    // the locked tables so missing DDL or permission issues surface here.
    #[cfg(feature = "vault")]
    if modules.iter().any(|m| m == "vault") {
        assay_vault::schema::migrate_postgres(&pool)
            .await
            .map_err(|e| anyhow::anyhow!("vault schema migrate (pg): {e}"))?;
        sqlx::query("SELECT COUNT(*) FROM vault.kv_meta")
            .fetch_one(&pool)
            .await
            .map_err(|e| anyhow::anyhow!("vault tables (pg): {e}"))?;
    }

    let bus: Arc<dyn EngineEventBus> = Arc::new(
        PgEngineEventBus::new(pool.clone(), url)
            .await
            .map_err(|e| anyhow::anyhow!("engine-events bus (pg): {e}"))?,
    );

    let instance_id = schema
        .register_instance(&modules, Some(env!("CARGO_PKG_VERSION")))
        .await
        .map_err(|e| anyhow::anyhow!("register engine.instances row: {e}"))?;
    spawn_pg_instance_lifecycle(pool.clone(), instance_id);

    info!(target: "assay-engine", instance = %instance_id, modules = ?modules, "boot complete (pg)");
    Ok(PgBoot {
        pool,
        bus,
        instance_id,
        modules,
    })
}

#[cfg(feature = "backend-postgres")]
async fn read_or_seed_modules_pg(
    schema: &assay_domain::engine::PgEngineSchema,
    auto_enable: &[String],
) -> anyhow::Result<Vec<String>> {
    let existing = schema
        .list_modules()
        .await
        .map_err(|e| anyhow::anyhow!("list engine.modules (pg): {e}"))?;
    let known: std::collections::HashSet<String> =
        existing.iter().map(|m| m.name.clone()).collect();

    // Seed any compile-time module that isn't already in engine.modules.
    // Each module's `default_enabled` is honoured unless the operator
    // explicitly listed it in `auto_enable_modules` — that override
    // exists so local-dev configs can flip opt-in modules on without an
    // extra setup step.
    for module in builtin_modules() {
        if known.contains(module.name) {
            continue;
        }
        let enabled = module.default_enabled
            || auto_enable.iter().any(|n| n == module.name);
        schema
            .upsert_module(module.name, Some(module.version), enabled)
            .await
            .map_err(|e| anyhow::anyhow!("seed engine.modules row {}: {e}", module.name))?;
    }

    let final_list = schema
        .list_modules()
        .await
        .map_err(|e| anyhow::anyhow!("re-list engine.modules (pg): {e}"))?;
    Ok(final_list
        .into_iter()
        .filter(|m| m.enabled)
        .map(|m| m.name)
        .collect())
}

#[cfg(feature = "backend-postgres")]
async fn record_engine_migration_pg(
    pool: &sqlx::PgPool,
    module: &str,
    version: i32,
) -> anyhow::Result<()> {
    sqlx::query(
        "INSERT INTO engine.migrations (module, version)
         VALUES ($1, $2) ON CONFLICT DO NOTHING",
    )
    .bind(module)
    .bind(version)
    .execute(pool)
    .await
    .map_err(|e| anyhow::anyhow!("record engine.migrations row {module}/{version}: {e}"))?;
    Ok(())
}

#[cfg(feature = "backend-postgres")]
fn spawn_pg_instance_lifecycle(pool: sqlx::PgPool, id: uuid::Uuid) {
    use assay_domain::engine::PgEngineSchema;
    let schema = PgEngineSchema::new(pool.clone());
    tokio::spawn(async move {
        let mut tick = tokio::time::interval(Duration::from_secs(INSTANCE_HEARTBEAT_SECS));
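        // Added note: tokio's `interval` yields its first tick
        // immediately, so the row registered at boot is refreshed right
        // away rather than after one full heartbeat period.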
        loop {
            tick.tick().await;
            if let Err(e) = schema.heartbeat_instance(id).await {
                tracing::warn!(?e, %id, "engine.instances heartbeat failed");
            }
            // Best-effort stale cleanup. Idempotent — multiple instances
            // racing the same DELETE is fine.
            let cutoff_sql = format!(
                "DELETE FROM engine.instances
                 WHERE last_heartbeat < EXTRACT(EPOCH FROM NOW()) - {INSTANCE_STALE_SECS}"
            );
            if let Err(e) = sqlx::query(&cutoff_sql).execute(&pool).await {
                tracing::debug!(?e, "engine.instances stale cleanup failed");
            }
        }
    });
}

#[cfg(feature = "backend-sqlite")]
async fn sqlite_boot(data_dir: &str, auto_enable: &[String]) -> anyhow::Result<SqliteBoot> {
    use assay_domain::engine::SqliteEngineSchema;
    use assay_domain::events::SqliteEngineEventBus;
    use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions};
    use std::str::FromStr;

    let in_memory = data_dir == ":memory:";
    if !in_memory {
        std::fs::create_dir_all(data_dir)
            .map_err(|e| anyhow::anyhow!("create data_dir {data_dir}: {e}"))?;
    }

    // The connection's "main" is a transient in-memory router. All real
    // tables live in ATTACHed databases so engine-qualified queries
    // (`engine.events`, `workflow.workflows`) match the PG syntax exactly.
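    // Added example: `SELECT * FROM workflow.workflows` resolves to the
    // `workflow` schema on PG and to the ATTACHed `workflow` database
    // here, so store code can share one query string across backends.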
    let main_url = "sqlite::memory:";
    let opts = SqliteConnectOptions::from_str(main_url)?.create_if_missing(true);

    let engine_attach = sqlite_attach_uri(data_dir, "engine", in_memory);
    let workflow_attach = sqlite_attach_uri(data_dir, "workflow", in_memory);
    let auth_attach = sqlite_attach_uri(data_dir, "auth", in_memory);
    #[cfg(feature = "vault")]
    let vault_attach = sqlite_attach_uri(data_dir, "vault", in_memory);

    info!(
        target: "assay-engine",
        data_dir = %data_dir,
        engine = %engine_attach,
        workflow = %workflow_attach,
        "boot: opening sqlite engine pool"
    );

    let pool = SqlitePoolOptions::new()
        .max_connections(1)
        .after_connect(move |conn, _meta| {
            let engine_attach = engine_attach.clone();
            let workflow_attach = workflow_attach.clone();
            let auth_attach = auth_attach.clone();
            #[cfg(feature = "vault")]
            let vault_attach = vault_attach.clone();
            Box::pin(async move {
                use sqlx::Executor;
                conn.execute(
                    format!("ATTACH DATABASE '{engine_attach}' AS engine").as_str(),
                )
                .await?;
                conn.execute(
                    format!("ATTACH DATABASE '{workflow_attach}' AS workflow").as_str(),
                )
                .await?;
                conn.execute(
                    format!("ATTACH DATABASE '{auth_attach}' AS auth").as_str(),
                )
                .await?;
                #[cfg(feature = "vault")]
                conn.execute(
                    format!("ATTACH DATABASE '{vault_attach}' AS vault").as_str(),
                )
                .await?;
                Ok(())
            })
        })
        .connect_with(opts)
        .await
        .map_err(|e| anyhow::anyhow!("connect sqlite: {e}"))?;

    let schema = SqliteEngineSchema::new(pool.clone());
    schema
        .migrate()
        .await
        .map_err(|e| anyhow::anyhow!("engine schema migrate (sqlite): {e}"))?;
    record_engine_migration_sqlite(&pool, "engine", 1).await?;

    let modules = read_or_seed_modules_sqlite(&schema, auto_enable).await?;
    for name in &modules {
        record_engine_migration_sqlite(&pool, name, 1).await?;
    }

    // Auth schema migration — runs whenever auth is enabled, which in
    // practice is every boot (auth seeds enabled and is mandatory).
    // Smoke-touches the OIDC provider tables so missing DDL or
    // permission issues surface here rather than at first request.
    if modules.iter().any(|m| m == "auth") {
        assay_auth::schema::migrate_sqlite(&pool)
            .await
            .map_err(|e| anyhow::anyhow!("auth schema migrate (sqlite): {e}"))?;
        let _ = assay_auth::biscuit::load_or_init_sqlite(&pool)
            .await
            .map_err(|e| anyhow::anyhow!("biscuit root key bootstrap (sqlite): {e}"))?;
        sqlx::query("SELECT COUNT(*) FROM auth.oidc_clients")
            .fetch_one(&pool)
            .await
            .map_err(|e| anyhow::anyhow!("oidc provider tables (sqlite): {e}"))?;
    }

    // Vault schema migration (plan 17 / v0.3.0).
    #[cfg(feature = "vault")]
    if modules.iter().any(|m| m == "vault") {
        assay_vault::schema::migrate_sqlite(&pool)
            .await
            .map_err(|e| anyhow::anyhow!("vault schema migrate (sqlite): {e}"))?;
        sqlx::query("SELECT COUNT(*) FROM vault.kv_meta")
            .fetch_one(&pool)
            .await
            .map_err(|e| anyhow::anyhow!("vault tables (sqlite): {e}"))?;
    }

    let bus: Arc<dyn EngineEventBus> = Arc::new(
        SqliteEngineEventBus::new(pool.clone())
            .await
            .map_err(|e| anyhow::anyhow!("engine-events bus (sqlite): {e}"))?,
    );

    let instance_id = schema
        .register_instance(&modules, Some(env!("CARGO_PKG_VERSION")))
        .await
        .map_err(|e| anyhow::anyhow!("register engine.instances row: {e}"))?;
    spawn_sqlite_instance_lifecycle(pool.clone(), instance_id);

    info!(target: "assay-engine", instance = %instance_id, modules = ?modules, "boot complete (sqlite)");
    Ok(SqliteBoot {
        pool,
        bus,
        instance_id,
        modules,
    })
}

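/// Builds the URI handed to `ATTACH DATABASE '<uri>' AS <module>`.
///
/// Illustrative outputs, derived from the format strings below (the
/// pid/counter suffix varies per run):
///
/// ```text
/// sqlite_attach_uri("/var/lib/assay", "engine", false)
///   => "file:/var/lib/assay/engine.db?mode=rwc"
/// sqlite_attach_uri(":memory:", "engine", true)
///   => "file:assay_engine_4242_0?mode=memory&cache=shared"
/// ```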
#[cfg(feature = "backend-sqlite")]
fn sqlite_attach_uri(data_dir: &str, module: &str, in_memory: bool) -> String {
    if in_memory {
        // Shared-cache memdb so every connection opened against the same
        // URI sees the same in-memory tables. The pid + counter suffix
        // keeps names unique when multiple engines boot inside the same
        // test binary (counter) or as separate processes (pid).
        use std::sync::atomic::{AtomicU64, Ordering};
        static SEQ: AtomicU64 = AtomicU64::new(0);
        let suffix = format!(
            "{}_{}",
            std::process::id(),
            SEQ.fetch_add(1, Ordering::Relaxed)
        );
        format!("file:assay_{module}_{suffix}?mode=memory&cache=shared")
    } else {
        format!("file:{data_dir}/{module}.db?mode=rwc")
    }
}

#[cfg(feature = "backend-sqlite")]
async fn read_or_seed_modules_sqlite(
    schema: &assay_domain::engine::SqliteEngineSchema,
    auto_enable: &[String],
) -> anyhow::Result<Vec<String>> {
    let existing = schema
        .list_modules()
        .await
        .map_err(|e| anyhow::anyhow!("list engine.modules (sqlite): {e}"))?;
    let known: std::collections::HashSet<String> =
        existing.iter().map(|m| m.name.clone()).collect();

    // Same per-module insert pattern as the PG path: skip rows that
    // already exist, honour `default_enabled` unless the operator
    // explicitly auto-enabled the module.
    for module in builtin_modules() {
        if known.contains(module.name) {
            continue;
        }
        let enabled = module.default_enabled
            || auto_enable.iter().any(|n| n == module.name);
        schema
            .upsert_module(module.name, Some(module.version), enabled)
            .await
            .map_err(|e| anyhow::anyhow!("seed engine.modules row {}: {e}", module.name))?;
    }

    let final_list = schema
        .list_modules()
        .await
        .map_err(|e| anyhow::anyhow!("re-list engine.modules (sqlite): {e}"))?;
    Ok(final_list
        .into_iter()
        .filter(|m| m.enabled)
        .map(|m| m.name)
        .collect())
}

#[cfg(feature = "backend-sqlite")]
async fn record_engine_migration_sqlite(
    pool: &sqlx::SqlitePool,
    module: &str,
    version: i32,
) -> anyhow::Result<()> {
    sqlx::query(
        "INSERT OR IGNORE INTO engine.migrations (module, version)
         VALUES (?, ?)",
    )
    .bind(module)
    .bind(version)
    .execute(pool)
    .await
    .map_err(|e| anyhow::anyhow!("record engine.migrations row {module}/{version}: {e}"))?;
    Ok(())
}

#[cfg(feature = "backend-sqlite")]
fn spawn_sqlite_instance_lifecycle(pool: sqlx::SqlitePool, id: uuid::Uuid) {
    use assay_domain::engine::SqliteEngineSchema;
    let schema = SqliteEngineSchema::new(pool);
    tokio::spawn(async move {
        let mut tick = tokio::time::interval(Duration::from_secs(INSTANCE_HEARTBEAT_SECS));
        loop {
            tick.tick().await;
            if let Err(e) = schema.heartbeat_instance(id).await {
                tracing::warn!(?e, %id, "engine.instances heartbeat failed");
            }
        }
    });
}

#[cfg(all(test, feature = "backend-sqlite"))]
mod tests {
    use super::*;

    /// Plan-15 slice 3: auth is default-enabled. The `auto_enable_modules`
    /// argument is a no-op for auth (kept as a setting for forward-compat
    /// with future opt-in modules) — auth always runs its migration on
    /// first boot now.
    #[tokio::test(flavor = "multi_thread")]
    async fn sqlite_boot_default_runs_auth_migration() {
        let boot = sqlite_boot(":memory:", &[]).await.expect("boot");
        assert!(
            boot.modules.iter().any(|m| m == "auth"),
            "auth must be in active modules by default; got {:?}",
            boot.modules
        );
        // Auth migration recorded.
        let auth_row: Option<(String,)> = sqlx::query_as(
            "SELECT module FROM engine.migrations WHERE module = 'auth'",
        )
        .fetch_optional(&boot.pool)
        .await
        .expect("query engine.migrations");
        assert!(
            auth_row.is_some(),
            "engine.migrations should have an auth row after a default boot"
        );
        // auth.users table should exist (proves migrate_sqlite ran against
        // the ATTACHed auth db).
        let user_count: (i64,) =
            sqlx::query_as("SELECT COUNT(*) FROM auth.users")
                .fetch_one(&boot.pool)
                .await
                .expect("count auth.users");
        assert_eq!(user_count.0, 0);
    }
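
    /// Companion check (a hedged addition): a single boot should
    /// register exactly one `engine.instances` row. Only the table name
    /// is assumed here; it's created by the engine schema migration and
    /// written by `register_instance`.
    #[tokio::test(flavor = "multi_thread")]
    async fn sqlite_boot_registers_instance_row() {
        let boot = sqlite_boot(":memory:", &[]).await.expect("boot");
        let count: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM engine.instances")
            .fetch_one(&boot.pool)
            .await
            .expect("count engine.instances");
        assert_eq!(count.0, 1, "one boot registers one instance row");
    }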
}