Skip to main content

assay_engine/
embedded.rs

1//! Embedded-mode entrypoints for `assay-engine`.
2//!
3//! Use case: a parent binary wants to compose engine into its own
4//! [`axum::Router`] rather than running engine standalone via
5//! [`crate::run`].
6//!
7//! ```no_run
8//! # async fn example() -> anyhow::Result<()> {
9//! use assay_engine::embedded;
10//! use assay_engine::config::EngineConfig;
11//! use std::path::Path;
12//!
13//! let cfg = EngineConfig::from_file(Path::new("engine.toml"))?;
14//! let engine = embedded::build(cfg).await?;
15//! // Mount engine.router on parent's listener
16//! // Use engine.pool for parent's queries (engine confines its
17//! //   writes to its own schemas)
18//! # let _ = engine;
19//! # Ok(())
20//! # }
21//! ```
22//!
23//! The standalone [`crate::run`] is implemented as a thin wrapper
24//! around [`build`] + a serve loop; standalone behaviour is unchanged.
25//!
//! # Compared to the previous (now-closed) PR exposing four `pub fn build_*_ctx_*` helpers
28//!
29//! Embedded mode is a first-class concept here: one type, one
30//! function, one symmetric `migrate` helper. Internal ctx-builders
31//! (`build_auth_ctx_{pg,sqlite}`, `build_vault_ctx_{pg,sqlite}`)
32//! stay `pub(crate)` — they are implementation details. Preconditions
33//! that prevent operator lockout (no operator users + no admin
34//! api-keys + no external issuers) are enforced inside [`build`] and
35//! cannot be skipped.
36
37use std::sync::Arc;
38
39use assay_dashboard::{DashboardCtx, WhitelabelConfig};
40use assay_domain::events::EngineEventBus;
41use assay_workflow::{WorkflowStore, WorkflowCtx};
42
43use crate::config::EngineConfig;
44use crate::init::EngineBoot;
45use crate::state::EngineState;
46
47/// Engine composed for embedding into a parent binary. See module
48/// docs.
49///
50/// Marked `#[non_exhaustive]` so future fields (graceful-shutdown
51/// handle, metrics handle, …) can be added without breaking
52/// downstream pattern-matching.
53#[non_exhaustive]
54pub struct EmbeddedEngine {
55    /// Engine's [`axum::Router`]. Mount on parent's listener at root,
56    /// or under a sub-path. URL surface includes:
57    ///   - `/api/v1/engine/*` — engine + per-module APIs
58    ///   - `/auth/*` — OIDC spec endpoints (when auth module enabled)
59    ///   - `/api/v1/vault/*` — vault APIs (when vault module enabled)
60    ///   - `/workflow/`, `/auth/console`, `/engine/console`,
61    ///     `/vault/console` — assay-dashboard SPAs
62    pub router: axum::Router,
63
64    /// Backend-typed pool engine's modules use. Parent may share for
65    /// its own queries; engine confines writes to its own schemas
66    /// (`engine.*`, `workflow.*`, `auth.*`, `vault.*` on PG; per-
67    /// module `.db` files on sqlite via ATTACH).
68    pub pool: EmbeddedPool,
69
70    /// This engine instance's `engine.instances` row id. Surface in
71    /// parent's introspection endpoints if useful.
72    pub instance_id: uuid::Uuid,
73
74    /// Names of modules attached/enabled at boot. Mirrors
75    /// `EngineState::modules` but cloned out so the parent doesn't
76    /// need to keep an `Arc<EngineState>` around.
77    pub modules: Vec<String>,
78
79    /// `assay-engine` crate version.
80    pub engine_version: &'static str,
81}
82
/// Connection pool tagged with the backend it belongs to.
///
/// The engine's internals are backend-specific (PG advisory locks,
/// SQLite ATTACH for multi-module DB files), so the pool is exposed
/// as a typed variant: downstream code dispatches on it explicitly
/// instead of guessing which backend is in use.
///
/// The enum is `#[non_exhaustive]` so that further backends (e.g.,
/// MySQL) can be added without breaking exhaustive matches. Each
/// variant exists only when its backend feature is compiled in.
#[non_exhaustive]
pub enum EmbeddedPool {
    /// PostgreSQL pool (feature `backend-postgres`).
    #[cfg(feature = "backend-postgres")]
    Postgres(sqlx::PgPool),
    /// SQLite pool (feature `backend-sqlite`).
    #[cfg(feature = "backend-sqlite")]
    Sqlite(sqlx::SqlitePool),
}
97
98/// Build engine for embedding. Internally:
99///
100///   1. Open pool against `cfg.backend` via [`EngineBoot::run`]
101///      (which also runs engine + per-module schema migrations).
102///   2. Bootstrap module contexts (auth, vault, workflow).
103///   3. **Enforce preconditions** — refuse to start when auth is on
104///      and the deployment has no operator users, no admin api-keys,
105///      and no external OIDC issuers (would lock the operator out).
106///   4. Compose [`axum::Router`] via [`crate::server::build_app`].
107///   5. Spawn engine's background tasks (events outbox cleanup;
108///      module-specific schedulers spawn during ctx construction).
109///
110/// Returns `Err` on any of: pool-open failure, migration failure,
111/// ctx-build failure, precondition failure. The `Err` carries a
112/// helpful operator-facing message when the cause is configuration.
113pub async fn build(cfg: EngineConfig) -> anyhow::Result<EmbeddedEngine> {
114    let boot = EngineBoot::run(&cfg).await?;
115    match boot {
116        #[cfg(feature = "backend-postgres")]
117        EngineBoot::Postgres(b) => build_pg(cfg, b).await,
118        #[cfg(feature = "backend-sqlite")]
119        EngineBoot::Sqlite(b) => build_sqlite(cfg, b).await,
120    }
121}
122
/// PostgreSQL half of [`build`]: derives the workflow store and the
/// auth (and, with the `vault` feature, vault) contexts from the
/// booted pool, then hands everything to [`compose`].
#[cfg(feature = "backend-postgres")]
async fn build_pg(
    cfg: EngineConfig,
    b: crate::init::PgBoot,
) -> anyhow::Result<EmbeddedEngine> {
    let pg_pool = &b.pool;

    // The workflow store gets its own clone of the pool handle.
    let workflow_store = match assay_workflow::PostgresStore::from_pool(pg_pool.clone()).await {
        Ok(opened) => opened,
        Err(e) => return Err(anyhow::anyhow!("workflow store (pg): {e}")),
    };

    let auth = crate::build_auth_ctx_pg(&cfg, pg_pool).await?;

    // Without the `vault` feature, `compose` takes a unit placeholder
    // in the vault slot.
    #[cfg(feature = "vault")]
    let vault = crate::build_vault_ctx_pg(&b.modules, pg_pool).await?;
    #[cfg(not(feature = "vault"))]
    let vault: Option<()> = None;

    let typed_pool = EmbeddedPool::Postgres(b.pool);
    compose(
        cfg,
        workflow_store,
        b.bus,
        b.modules,
        b.instance_id,
        Some(auth),
        vault,
        typed_pool,
    )
    .await
}
149
/// SQLite half of [`build`]: derives the workflow store and the auth
/// (and, with the `vault` feature, vault) contexts from the booted
/// pool, then hands everything to [`compose`].
#[cfg(feature = "backend-sqlite")]
async fn build_sqlite(
    cfg: EngineConfig,
    b: crate::init::SqliteBoot,
) -> anyhow::Result<EmbeddedEngine> {
    let sqlite_pool = &b.pool;

    // The workflow store gets its own clone of the pool handle.
    let workflow_store =
        match assay_workflow::SqliteStore::from_attached_pool(sqlite_pool.clone()).await {
            Ok(opened) => opened,
            Err(e) => return Err(anyhow::anyhow!("workflow store (sqlite): {e}")),
        };

    let auth = crate::build_auth_ctx_sqlite(&cfg, sqlite_pool).await?;

    // Without the `vault` feature, `compose` takes a unit placeholder
    // in the vault slot.
    #[cfg(feature = "vault")]
    let vault = crate::build_vault_ctx_sqlite(&b.modules, sqlite_pool).await?;
    #[cfg(not(feature = "vault"))]
    let vault: Option<()> = None;

    let typed_pool = EmbeddedPool::Sqlite(b.pool);
    compose(
        cfg,
        workflow_store,
        b.bus,
        b.modules,
        b.instance_id,
        Some(auth),
        vault,
        typed_pool,
    )
    .await
}
176
177/// Common composition step shared between PG + SQLite paths.
178///
179/// Mirrors the body of the previous private `run_with_store` (now
180/// removed) minus the final `server::serve` call: builds the Lua-
181/// VM-equivalent state container, spawns the engine_events cleanup
182/// task, and constructs the [`axum::Router`]. The caller (this
183/// module's `build`, or the standalone `run` wrapper) decides what
184/// to do with the resulting router.
185//
186// 8 args (1 over clippy's default limit) — splitting them into a
187// struct just to placate the lint adds boilerplate without making
188// the call sites clearer (each call site is a single-use match
189// arm). Allow the lint here.
190#[allow(clippy::too_many_arguments)]
191async fn compose<S: WorkflowStore + Clone + 'static>(
192    cfg: EngineConfig,
193    store: S,
194    bus: Arc<dyn EngineEventBus>,
195    modules: Vec<String>,
196    instance_id: uuid::Uuid,
197    auth_ctx: Option<assay_auth::AuthCtx>,
198    #[cfg(feature = "vault")] vault_ctx: Option<assay_vault::VaultCtx>,
199    #[cfg(not(feature = "vault"))] _vault_ctx: Option<()>,
200    pool: EmbeddedPool,
201) -> anyhow::Result<EmbeddedEngine> {
202    // Precondition: refuse to start when auth is on and no operator
203    // user / api-key / external issuer is configured. Same logic as
204    // the previous run_with_store, lifted unchanged.
205    if let Some(auth) = auth_ctx.as_ref() {
206        let user_count = auth
207            .users
208            .count_users(None)
209            .await
210            .map_err(|e| anyhow::anyhow!("count auth.users: {e}"))?;
211        if user_count == 0
212            && cfg.auth.admin_api_keys.is_empty()
213            && cfg.auth.external_issuers().is_empty()
214        {
215            anyhow::bail!(
216                "engine refuses to start: no operator users exist, \
217                 `auth.admin_api_keys` is empty, and no external issuers \
218                 are configured. Either run `assay-engine bootstrap-admin \
219                 --email <e> --password <p>` to seed the first user, add \
220                 at least one entry to `auth.admin_api_keys` in \
221                 engine.toml as a break-glass, or configure \
222                 `[[auth.external_issuers]]` with an upstream OIDC \
223                 provider (e.g. Hydra) that mints the JWTs your callers \
224                 forward."
225            );
226        }
227    }
228
229    let workflow_ctx: Arc<WorkflowCtx<S>> =
230        crate::server::build_workflow_ctx_with_bus(store, Arc::clone(&bus));
231
232    // Hourly sweep of the engine_events outbox. Detached — the handle
233    // lives for the process lifetime; nothing to await for clean
234    // shutdown (prune is idempotent, missed tick is fine).
235    tokio::spawn(assay_workflow::events_cleanup::run_events_cleanup(
236        Arc::clone(&bus),
237        std::time::Duration::from_secs(3600),
238        cfg.engine_events_ttl_secs,
239    ));
240
241    let whitelabel = Arc::new(WhitelabelConfig::from_env());
242    let asset_version = env!("CARGO_PKG_VERSION").to_string();
243    let dashboard_ctx = Arc::new(DashboardCtx::new(whitelabel, asset_version));
244    let admin_api_keys = Arc::new(cfg.auth.admin_api_keys.clone());
245    let started_at = std::time::SystemTime::now()
246        .duration_since(std::time::UNIX_EPOCH)
247        .unwrap_or_default()
248        .as_secs_f64();
249    let engine_config = Arc::new(cfg);
250
251    let state = EngineState {
252        workflow: workflow_ctx,
253        dashboard: dashboard_ctx,
254        auth: auth_ctx,
255        #[cfg(feature = "vault")]
256        vault: vault_ctx,
257        admin_api_keys,
258        modules: Arc::new(modules.clone()),
259        instance_id,
260        engine_version: env!("CARGO_PKG_VERSION"),
261        started_at,
262        engine_config,
263    };
264
265    let router = crate::server::build_app(state);
266
267    Ok(EmbeddedEngine {
268        router,
269        pool,
270        instance_id,
271        modules,
272        engine_version: env!("CARGO_PKG_VERSION"),
273    })
274}
275
276/// Run engine + per-module schema migrations against the configured
277/// backend. No runtime state is built, no background tasks are
278/// spawned. Idempotent — calling on an up-to-date schema is a no-op.
279///
280/// Used by parent binaries that want a `migrate` subcommand without
281/// booting workflow scheduler / vault unseal / etc. Equivalent to
282/// `EngineBoot::run(cfg).await?;` with a more discoverable name.
283pub async fn migrate(cfg: &EngineConfig) -> anyhow::Result<()> {
284    let _boot = EngineBoot::run(cfg).await?;
285    Ok(())
286}