1use std::sync::Arc;
29use std::time::Duration;
30
31use assay_domain::events::EngineEventBus;
32use tracing::info;
33
34use crate::config::{BackendConfig, EngineConfig};
35
36#[derive(Debug, Clone)]
41pub struct BuiltinModule {
42 pub name: &'static str,
43 pub version: &'static str,
44 pub default_enabled: bool,
45}
46
47pub fn builtin_modules() -> Vec<BuiltinModule> {
55 #[cfg_attr(not(feature = "vault"), allow(unused_mut))]
56 let mut mods = vec![
57 BuiltinModule {
58 name: "workflow",
59 version: env!("CARGO_PKG_VERSION"),
60 default_enabled: true,
61 },
62 BuiltinModule {
65 name: "auth",
66 version: env!("CARGO_PKG_VERSION"),
67 default_enabled: true,
68 },
69 ];
70 #[cfg(feature = "vault")]
74 mods.push(BuiltinModule {
75 name: "vault",
76 version: env!("CARGO_PKG_VERSION"),
77 default_enabled: true,
78 });
79 mods
80}
81
82const INSTANCE_HEARTBEAT_SECS: u64 = 3;
100#[cfg(feature = "backend-postgres")]
103const INSTANCE_STALE_SECS: f64 = 10.0;
104
105pub enum EngineBoot {
109 #[cfg(feature = "backend-postgres")]
110 Postgres(PgBoot),
111 #[cfg(feature = "backend-sqlite")]
112 Sqlite(SqliteBoot),
113}
114
115#[cfg(feature = "backend-postgres")]
116pub struct PgBoot {
117 pub pool: sqlx::PgPool,
118 pub bus: Arc<dyn EngineEventBus>,
119 pub instance_id: uuid::Uuid,
120 pub modules: Vec<String>,
121}
122
123#[cfg(feature = "backend-sqlite")]
124pub struct SqliteBoot {
125 pub pool: sqlx::SqlitePool,
126 pub bus: Arc<dyn EngineEventBus>,
127 pub instance_id: uuid::Uuid,
128 pub modules: Vec<String>,
129}
130
131impl EngineBoot {
132 pub async fn run(cfg: &EngineConfig) -> anyhow::Result<Self> {
134 match cfg.backend.clone() {
135 #[cfg(feature = "backend-postgres")]
136 BackendConfig::Postgres { url } => {
137 let boot = pg_boot(&url, &cfg.auto_enable_modules).await?;
138 Ok(EngineBoot::Postgres(boot))
139 }
140 #[cfg(feature = "backend-sqlite")]
141 BackendConfig::Sqlite { .. } => {
142 let data_dir = cfg
143 .backend
144 .sqlite_data_dir()
145 .expect("sqlite backend yields data_dir");
146 let boot = sqlite_boot(&data_dir, &cfg.auto_enable_modules).await?;
147 Ok(EngineBoot::Sqlite(boot))
148 }
149 #[allow(unreachable_patterns)]
150 _ => anyhow::bail!("backend not enabled at compile time"),
151 }
152 }
153
154 pub fn modules(&self) -> &[String] {
155 match self {
156 #[cfg(feature = "backend-postgres")]
157 EngineBoot::Postgres(b) => &b.modules,
158 #[cfg(feature = "backend-sqlite")]
159 EngineBoot::Sqlite(b) => &b.modules,
160 }
161 }
162
163 pub fn instance_id(&self) -> uuid::Uuid {
164 match self {
165 #[cfg(feature = "backend-postgres")]
166 EngineBoot::Postgres(b) => b.instance_id,
167 #[cfg(feature = "backend-sqlite")]
168 EngineBoot::Sqlite(b) => b.instance_id,
169 }
170 }
171}
172
173#[cfg(feature = "backend-postgres")]
174async fn pg_boot(url: &str, auto_enable: &[String]) -> anyhow::Result<PgBoot> {
175 use assay_domain::engine::PgEngineSchema;
176 use assay_domain::events::PgEngineEventBus;
177 use sqlx::PgPool;
178
179 info!(target: "assay-engine", "boot: connecting to postgres");
180 let pool = PgPool::connect(url)
181 .await
182 .map_err(|e| anyhow::anyhow!("connect postgres: {e}"))?;
183
184 let schema = PgEngineSchema::new(pool.clone());
185 schema
186 .migrate()
187 .await
188 .map_err(|e| anyhow::anyhow!("engine schema migrate (pg): {e}"))?;
189 record_engine_migration_pg(&pool, "engine", 1).await?;
190
191 let modules = read_or_seed_modules_pg(&schema, auto_enable).await?;
192
193 for name in &modules {
199 let create = format!("CREATE SCHEMA IF NOT EXISTS {name}");
200 sqlx::query(&create)
201 .execute(&pool)
202 .await
203 .map_err(|e| anyhow::anyhow!("create schema {name}: {e}"))?;
204 record_engine_migration_pg(&pool, name, 1).await?;
205 }
206
207 if modules.iter().any(|m| m == "auth") {
211 assay_auth::schema::migrate_postgres(&pool)
212 .await
213 .map_err(|e| anyhow::anyhow!("auth schema migrate (pg): {e}"))?;
214 let _ = assay_auth::biscuit::load_or_init_postgres(&pool)
215 .await
216 .map_err(|e| anyhow::anyhow!("biscuit root key bootstrap (pg): {e}"))?;
217 sqlx::query("SELECT COUNT(*) FROM auth.oidc_clients")
218 .fetch_one(&pool)
219 .await
220 .map_err(|e| anyhow::anyhow!("oidc provider tables (pg): {e}"))?;
221 }
222
223 #[cfg(feature = "vault")]
226 if modules.iter().any(|m| m == "vault") {
227 assay_vault::schema::migrate_postgres(&pool)
228 .await
229 .map_err(|e| anyhow::anyhow!("vault schema migrate (pg): {e}"))?;
230 sqlx::query("SELECT COUNT(*) FROM vault.kv_meta")
231 .fetch_one(&pool)
232 .await
233 .map_err(|e| anyhow::anyhow!("vault tables (pg): {e}"))?;
234 }
235
236 let bus: Arc<dyn EngineEventBus> = Arc::new(
237 PgEngineEventBus::new(pool.clone(), url)
238 .await
239 .map_err(|e| anyhow::anyhow!("engine-events bus (pg): {e}"))?,
240 );
241
242 let instance_id = schema
243 .register_instance(&modules, Some(env!("CARGO_PKG_VERSION")))
244 .await
245 .map_err(|e| anyhow::anyhow!("register engine.instances row: {e}"))?;
246 spawn_pg_instance_lifecycle(pool.clone(), instance_id);
247
248 info!(target: "assay-engine", instance = %instance_id, modules = ?modules, "boot complete (pg)");
249 Ok(PgBoot {
250 pool,
251 bus,
252 instance_id,
253 modules,
254 })
255}
256
257#[cfg(feature = "backend-postgres")]
258async fn read_or_seed_modules_pg(
259 schema: &assay_domain::engine::PgEngineSchema,
260 auto_enable: &[String],
261) -> anyhow::Result<Vec<String>> {
262 let existing = schema
263 .list_modules()
264 .await
265 .map_err(|e| anyhow::anyhow!("list engine.modules (pg): {e}"))?;
266 let known: std::collections::HashSet<String> =
267 existing.iter().map(|m| m.name.clone()).collect();
268
269 for module in builtin_modules() {
275 if known.contains(module.name) {
276 continue;
277 }
278 let enabled = module.default_enabled
279 || auto_enable.iter().any(|n| n == module.name);
280 schema
281 .upsert_module(module.name, Some(module.version), enabled)
282 .await
283 .map_err(|e| anyhow::anyhow!("seed engine.modules row {}: {e}", module.name))?;
284 }
285
286 let final_list = schema
287 .list_modules()
288 .await
289 .map_err(|e| anyhow::anyhow!("re-list engine.modules (pg): {e}"))?;
290 Ok(final_list
291 .into_iter()
292 .filter(|m| m.enabled)
293 .map(|m| m.name)
294 .collect())
295}
296
297#[cfg(feature = "backend-postgres")]
298async fn record_engine_migration_pg(
299 pool: &sqlx::PgPool,
300 module: &str,
301 version: i32,
302) -> anyhow::Result<()> {
303 sqlx::query(
304 "INSERT INTO engine.migrations (module, version)
305 VALUES ($1, $2) ON CONFLICT DO NOTHING",
306 )
307 .bind(module)
308 .bind(version)
309 .execute(pool)
310 .await
311 .map_err(|e| anyhow::anyhow!("record engine.migrations row {module}/{version}: {e}"))?;
312 Ok(())
313}
314
315#[cfg(feature = "backend-postgres")]
316fn spawn_pg_instance_lifecycle(pool: sqlx::PgPool, id: uuid::Uuid) {
317 use assay_domain::engine::PgEngineSchema;
318 let schema = PgEngineSchema::new(pool.clone());
319 tokio::spawn(async move {
320 let mut tick = tokio::time::interval(Duration::from_secs(INSTANCE_HEARTBEAT_SECS));
321 loop {
322 tick.tick().await;
323 if let Err(e) = schema.heartbeat_instance(id).await {
324 tracing::warn!(?e, %id, "engine.instances heartbeat failed");
325 }
326 let cutoff_sql = format!(
329 "DELETE FROM engine.instances
330 WHERE last_heartbeat < EXTRACT(EPOCH FROM NOW()) - {INSTANCE_STALE_SECS}"
331 );
332 if let Err(e) = sqlx::query(&cutoff_sql).execute(&pool).await {
333 tracing::debug!(?e, "engine.instances stale cleanup failed");
334 }
335 }
336 });
337}
338
339#[cfg(feature = "backend-sqlite")]
340async fn sqlite_boot(data_dir: &str, auto_enable: &[String]) -> anyhow::Result<SqliteBoot> {
341 use assay_domain::engine::SqliteEngineSchema;
342 use assay_domain::events::SqliteEngineEventBus;
343 use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions};
344 use std::str::FromStr;
345
346 let in_memory = data_dir == ":memory:";
347 if !in_memory {
348 std::fs::create_dir_all(data_dir)
349 .map_err(|e| anyhow::anyhow!("create data_dir {data_dir}: {e}"))?;
350 }
351
352 let main_url = "sqlite::memory:";
356 let opts = SqliteConnectOptions::from_str(main_url)?.create_if_missing(true);
357
358 let engine_attach = sqlite_attach_uri(data_dir, "engine", in_memory);
359 let workflow_attach = sqlite_attach_uri(data_dir, "workflow", in_memory);
360 let auth_attach = sqlite_attach_uri(data_dir, "auth", in_memory);
361 #[cfg(feature = "vault")]
362 let vault_attach = sqlite_attach_uri(data_dir, "vault", in_memory);
363
364 info!(
365 target: "assay-engine",
366 data_dir = %data_dir,
367 engine = %engine_attach,
368 workflow = %workflow_attach,
369 "boot: opening sqlite engine pool"
370 );
371
372 let pool = SqlitePoolOptions::new()
373 .max_connections(1)
374 .after_connect(move |conn, _meta| {
375 let engine_attach = engine_attach.clone();
376 let workflow_attach = workflow_attach.clone();
377 let auth_attach = auth_attach.clone();
378 #[cfg(feature = "vault")]
379 let vault_attach = vault_attach.clone();
380 Box::pin(async move {
381 use sqlx::Executor;
382 conn.execute(
383 format!("ATTACH DATABASE '{engine_attach}' AS engine").as_str(),
384 )
385 .await?;
386 conn.execute(
387 format!("ATTACH DATABASE '{workflow_attach}' AS workflow").as_str(),
388 )
389 .await?;
390 conn.execute(
391 format!("ATTACH DATABASE '{auth_attach}' AS auth").as_str(),
392 )
393 .await?;
394 #[cfg(feature = "vault")]
395 conn.execute(
396 format!("ATTACH DATABASE '{vault_attach}' AS vault").as_str(),
397 )
398 .await?;
399 Ok(())
400 })
401 })
402 .connect_with(opts)
403 .await
404 .map_err(|e| anyhow::anyhow!("connect sqlite: {e}"))?;
405
406 let schema = SqliteEngineSchema::new(pool.clone());
407 schema
408 .migrate()
409 .await
410 .map_err(|e| anyhow::anyhow!("engine schema migrate (sqlite): {e}"))?;
411 record_engine_migration_sqlite(&pool, "engine", 1).await?;
412
413 let modules = read_or_seed_modules_sqlite(&schema, auto_enable).await?;
414 for name in &modules {
415 record_engine_migration_sqlite(&pool, name, 1).await?;
416 }
417
418 if modules.iter().any(|m| m == "auth") {
420 assay_auth::schema::migrate_sqlite(&pool)
421 .await
422 .map_err(|e| anyhow::anyhow!("auth schema migrate (sqlite): {e}"))?;
423 let _ = assay_auth::biscuit::load_or_init_sqlite(&pool)
424 .await
425 .map_err(|e| anyhow::anyhow!("biscuit root key bootstrap (sqlite): {e}"))?;
426 sqlx::query("SELECT COUNT(*) FROM auth.oidc_clients")
427 .fetch_one(&pool)
428 .await
429 .map_err(|e| anyhow::anyhow!("oidc provider tables (sqlite): {e}"))?;
430 }
431
432 #[cfg(feature = "vault")]
434 if modules.iter().any(|m| m == "vault") {
435 assay_vault::schema::migrate_sqlite(&pool)
436 .await
437 .map_err(|e| anyhow::anyhow!("vault schema migrate (sqlite): {e}"))?;
438 sqlx::query("SELECT COUNT(*) FROM vault.kv_meta")
439 .fetch_one(&pool)
440 .await
441 .map_err(|e| anyhow::anyhow!("vault tables (sqlite): {e}"))?;
442 }
443
444 let bus: Arc<dyn EngineEventBus> = Arc::new(
445 SqliteEngineEventBus::new(pool.clone())
446 .await
447 .map_err(|e| anyhow::anyhow!("engine-events bus (sqlite): {e}"))?,
448 );
449
450 let instance_id = schema
451 .register_instance(&modules, Some(env!("CARGO_PKG_VERSION")))
452 .await
453 .map_err(|e| anyhow::anyhow!("register engine.instances row: {e}"))?;
454 spawn_sqlite_instance_lifecycle(pool.clone(), instance_id);
455
456 info!(target: "assay-engine", instance = %instance_id, modules = ?modules, "boot complete (sqlite)");
457 Ok(SqliteBoot {
458 pool,
459 bus,
460 instance_id,
461 modules,
462 })
463}
464
465#[cfg(feature = "backend-sqlite")]
466fn sqlite_attach_uri(data_dir: &str, module: &str, in_memory: bool) -> String {
467 if in_memory {
468 use std::sync::atomic::{AtomicU64, Ordering};
473 static SEQ: AtomicU64 = AtomicU64::new(0);
474 let suffix = format!(
475 "{}_{}",
476 std::process::id(),
477 SEQ.fetch_add(1, Ordering::Relaxed)
478 );
479 format!("file:assay_{module}_{suffix}?mode=memory&cache=shared")
480 } else {
481 format!("file:{data_dir}/{module}.db?mode=rwc")
482 }
483}
484
485#[cfg(feature = "backend-sqlite")]
486async fn read_or_seed_modules_sqlite(
487 schema: &assay_domain::engine::SqliteEngineSchema,
488 auto_enable: &[String],
489) -> anyhow::Result<Vec<String>> {
490 let existing = schema
491 .list_modules()
492 .await
493 .map_err(|e| anyhow::anyhow!("list engine.modules (sqlite): {e}"))?;
494 let known: std::collections::HashSet<String> =
495 existing.iter().map(|m| m.name.clone()).collect();
496
497 for module in builtin_modules() {
501 if known.contains(module.name) {
502 continue;
503 }
504 let enabled = module.default_enabled
505 || auto_enable.iter().any(|n| n == module.name);
506 schema
507 .upsert_module(module.name, Some(module.version), enabled)
508 .await
509 .map_err(|e| anyhow::anyhow!("seed engine.modules row {}: {e}", module.name))?;
510 }
511
512 let final_list = schema
513 .list_modules()
514 .await
515 .map_err(|e| anyhow::anyhow!("re-list engine.modules (sqlite): {e}"))?;
516 Ok(final_list
517 .into_iter()
518 .filter(|m| m.enabled)
519 .map(|m| m.name)
520 .collect())
521}
522
523#[cfg(feature = "backend-sqlite")]
524async fn record_engine_migration_sqlite(
525 pool: &sqlx::SqlitePool,
526 module: &str,
527 version: i32,
528) -> anyhow::Result<()> {
529 sqlx::query(
530 "INSERT OR IGNORE INTO engine.migrations (module, version)
531 VALUES (?, ?)",
532 )
533 .bind(module)
534 .bind(version)
535 .execute(pool)
536 .await
537 .map_err(|e| anyhow::anyhow!("record engine.migrations row {module}/{version}: {e}"))?;
538 Ok(())
539}
540
541#[cfg(feature = "backend-sqlite")]
542fn spawn_sqlite_instance_lifecycle(pool: sqlx::SqlitePool, id: uuid::Uuid) {
543 use assay_domain::engine::SqliteEngineSchema;
544 let schema = SqliteEngineSchema::new(pool);
545 tokio::spawn(async move {
546 let mut tick = tokio::time::interval(Duration::from_secs(INSTANCE_HEARTBEAT_SECS));
547 loop {
548 tick.tick().await;
549 if let Err(e) = schema.heartbeat_instance(id).await {
550 tracing::warn!(?e, %id, "engine.instances heartbeat failed");
551 }
552 }
553 });
554}
555
556#[cfg(all(test, feature = "backend-sqlite"))]
557mod tests {
558 use super::*;
559
560 #[tokio::test(flavor = "multi_thread")]
565 async fn sqlite_boot_default_runs_auth_migration() {
566 let boot = sqlite_boot(":memory:", &[]).await.expect("boot");
567 assert!(
568 boot.modules.iter().any(|m| m == "auth"),
569 "auth must be in active modules by default; got {:?}",
570 boot.modules
571 );
572 let auth_row: Option<(String,)> = sqlx::query_as(
574 "SELECT module FROM engine.migrations WHERE module = 'auth'",
575 )
576 .fetch_optional(&boot.pool)
577 .await
578 .expect("query engine.migrations");
579 assert!(
580 auth_row.is_some(),
581 "engine.migrations should have an auth row after auto-enabled boot"
582 );
583 let user_count: (i64,) =
586 sqlx::query_as("SELECT COUNT(*) FROM auth.users")
587 .fetch_one(&boot.pool)
588 .await
589 .expect("count auth.users");
590 assert_eq!(user_count.0, 0);
591 }
592}