assay_engine/embedded.rs
1//! Embedded-mode entrypoints for `assay-engine`.
2//!
3//! Use case: a parent binary wants to compose engine into its own
4//! [`axum::Router`] rather than running engine standalone via
5//! [`crate::run`].
6//!
7//! ```no_run
8//! # async fn example() -> anyhow::Result<()> {
9//! use assay_engine::embedded;
10//! use assay_engine::config::EngineConfig;
11//! use std::path::Path;
12//!
13//! let cfg = EngineConfig::from_file(Path::new("engine.toml"))?;
14//! let engine = embedded::build(cfg).await?;
15//! // Mount engine.router on parent's listener
16//! // Use engine.pool for parent's queries (engine confines its
17//! // writes to its own schemas)
18//! # let _ = engine;
19//! # Ok(())
20//! # }
21//! ```
22//!
23//! The standalone [`crate::run`] is implemented as a thin wrapper
24//! around [`build`] + a serve loop; standalone behaviour is unchanged.
25//!
26//! # Compared to the previous (now-closed) PR exposing four
27//! # `pub fn build_*_ctx_*` helpers
28//!
29//! Embedded mode is a first-class concept here: one type, one
30//! function, one symmetric `migrate` helper. Internal ctx-builders
31//! (`build_auth_ctx_{pg,sqlite}`, `build_vault_ctx_{pg,sqlite}`)
32//! stay `pub(crate)` — they are implementation details. Preconditions
33//! that prevent operator lockout (no operator users + no admin
34//! api-keys + no external issuers) are enforced inside [`build`] and
35//! cannot be skipped.
36
37use std::sync::Arc;
38
39use assay_dashboard::{DashboardCtx, WhitelabelConfig};
40use assay_domain::events::EngineEventBus;
41use assay_workflow::{WorkflowStore, WorkflowCtx};
42
43use crate::config::EngineConfig;
44use crate::init::EngineBoot;
45use crate::state::EngineState;
46
47/// Engine composed for embedding into a parent binary. See module
48/// docs.
49///
50/// Marked `#[non_exhaustive]` so future fields (graceful-shutdown
51/// handle, metrics handle, …) can be added without breaking
52/// downstream pattern-matching.
53#[non_exhaustive]
54pub struct EmbeddedEngine {
55 /// Engine's [`axum::Router`]. Mount on parent's listener at root,
56 /// or under a sub-path. URL surface includes:
57 /// - `/api/v1/engine/*` — engine + per-module APIs
58 /// - `/auth/*` — OIDC spec endpoints (when auth module enabled)
59 /// - `/api/v1/vault/*` — vault APIs (when vault module enabled)
60 /// - `/workflow/`, `/auth/console`, `/engine/console`,
61 /// `/vault/console` — assay-dashboard SPAs
62 pub router: axum::Router,
63
64 /// Backend-typed pool engine's modules use. Parent may share for
65 /// its own queries; engine confines writes to its own schemas
66 /// (`engine.*`, `workflow.*`, `auth.*`, `vault.*` on PG; per-
67 /// module `.db` files on sqlite via ATTACH).
68 pub pool: EmbeddedPool,
69
70 /// This engine instance's `engine.instances` row id. Surface in
71 /// parent's introspection endpoints if useful.
72 pub instance_id: uuid::Uuid,
73
74 /// Names of modules attached/enabled at boot. Mirrors
75 /// `EngineState::modules` but cloned out so the parent doesn't
76 /// need to keep an `Arc<EngineState>` around.
77 pub modules: Vec<String>,
78
79 /// `assay-engine` crate version.
80 pub engine_version: &'static str,
81}
82
83/// Backend-typed pool. Engine's internal code paths are backend-
84/// specific (PG advisory locks, SQLite ATTACH for multi-module DB
85/// files); we expose typed pools so downstream can dispatch on the
86/// variant explicitly rather than guess.
87///
88/// Marked `#[non_exhaustive]` so future backends (e.g., MySQL) can
89/// be added without breaking exhaustive matches.
90#[non_exhaustive]
91pub enum EmbeddedPool {
92 #[cfg(feature = "backend-postgres")]
93 Postgres(sqlx::PgPool),
94 #[cfg(feature = "backend-sqlite")]
95 Sqlite(sqlx::SqlitePool),
96}
97
98/// Build engine for embedding. Internally:
99///
100/// 1. Open pool against `cfg.backend` via [`EngineBoot::run`]
101/// (which also runs engine + per-module schema migrations).
102/// 2. Bootstrap module contexts (auth, vault, workflow).
103/// 3. **Enforce preconditions** — refuse to start when auth is on
104/// and the deployment has no operator users, no admin api-keys,
105/// and no external OIDC issuers (would lock the operator out).
106/// 4. Compose [`axum::Router`] via [`crate::server::build_app`].
107/// 5. Spawn engine's background tasks (events outbox cleanup;
108/// module-specific schedulers spawn during ctx construction).
109///
110/// Returns `Err` on any of: pool-open failure, migration failure,
111/// ctx-build failure, precondition failure. The `Err` carries a
112/// helpful operator-facing message when the cause is configuration.
113pub async fn build(cfg: EngineConfig) -> anyhow::Result<EmbeddedEngine> {
114 let boot = EngineBoot::run(&cfg).await?;
115 match boot {
116 #[cfg(feature = "backend-postgres")]
117 EngineBoot::Postgres(b) => build_pg(cfg, b).await,
118 #[cfg(feature = "backend-sqlite")]
119 EngineBoot::Sqlite(b) => build_sqlite(cfg, b).await,
120 }
121}
122
123#[cfg(feature = "backend-postgres")]
124async fn build_pg(
125 cfg: EngineConfig,
126 b: crate::init::PgBoot,
127) -> anyhow::Result<EmbeddedEngine> {
128 let store = assay_workflow::PostgresStore::from_pool(b.pool.clone())
129 .await
130 .map_err(|e| anyhow::anyhow!("workflow store (pg): {e}"))?;
131 let auth_ctx = crate::build_auth_ctx_pg(&cfg, &b.pool).await?;
132 #[cfg(feature = "vault")]
133 let vault_ctx = crate::build_vault_ctx_pg(&b.modules, &b.pool).await?;
134 #[cfg(not(feature = "vault"))]
135 let vault_ctx: Option<()> = None;
136
137 compose(
138 cfg,
139 store,
140 b.bus,
141 b.modules,
142 b.instance_id,
143 Some(auth_ctx),
144 vault_ctx,
145 EmbeddedPool::Postgres(b.pool),
146 )
147 .await
148}
149
150#[cfg(feature = "backend-sqlite")]
151async fn build_sqlite(
152 cfg: EngineConfig,
153 b: crate::init::SqliteBoot,
154) -> anyhow::Result<EmbeddedEngine> {
155 let store = assay_workflow::SqliteStore::from_attached_pool(b.pool.clone())
156 .await
157 .map_err(|e| anyhow::anyhow!("workflow store (sqlite): {e}"))?;
158 let auth_ctx = crate::build_auth_ctx_sqlite(&cfg, &b.pool).await?;
159 #[cfg(feature = "vault")]
160 let vault_ctx = crate::build_vault_ctx_sqlite(&b.modules, &b.pool).await?;
161 #[cfg(not(feature = "vault"))]
162 let vault_ctx: Option<()> = None;
163
164 compose(
165 cfg,
166 store,
167 b.bus,
168 b.modules,
169 b.instance_id,
170 Some(auth_ctx),
171 vault_ctx,
172 EmbeddedPool::Sqlite(b.pool),
173 )
174 .await
175}
176
177/// Common composition step shared between PG + SQLite paths.
178///
179/// Mirrors the body of the previous private `run_with_store` (now
180/// removed) minus the final `server::serve` call: builds the Lua-
181/// VM-equivalent state container, spawns the engine_events cleanup
182/// task, and constructs the [`axum::Router`]. The caller (this
183/// module's `build`, or the standalone `run` wrapper) decides what
184/// to do with the resulting router.
185//
186// 8 args (1 over clippy's default limit) — splitting them into a
187// struct just to placate the lint adds boilerplate without making
188// the call sites clearer (each call site is a single-use match
189// arm). Allow the lint here.
190#[allow(clippy::too_many_arguments)]
191async fn compose<S: WorkflowStore + Clone + 'static>(
192 cfg: EngineConfig,
193 store: S,
194 bus: Arc<dyn EngineEventBus>,
195 modules: Vec<String>,
196 instance_id: uuid::Uuid,
197 auth_ctx: Option<assay_auth::AuthCtx>,
198 #[cfg(feature = "vault")] vault_ctx: Option<assay_vault::VaultCtx>,
199 #[cfg(not(feature = "vault"))] _vault_ctx: Option<()>,
200 pool: EmbeddedPool,
201) -> anyhow::Result<EmbeddedEngine> {
202 // Precondition: refuse to start when auth is on and no operator
203 // user / api-key / external issuer is configured. Same logic as
204 // the previous run_with_store, lifted unchanged.
205 if let Some(auth) = auth_ctx.as_ref() {
206 let user_count = auth
207 .users
208 .count_users(None)
209 .await
210 .map_err(|e| anyhow::anyhow!("count auth.users: {e}"))?;
211 if user_count == 0
212 && cfg.auth.admin_api_keys.is_empty()
213 && cfg.auth.external_issuers().is_empty()
214 {
215 anyhow::bail!(
216 "engine refuses to start: no operator users exist, \
217 `auth.admin_api_keys` is empty, and no external issuers \
218 are configured. Either run `assay-engine bootstrap-admin \
219 --email <e> --password <p>` to seed the first user, add \
220 at least one entry to `auth.admin_api_keys` in \
221 engine.toml as a break-glass, or configure \
222 `[[auth.external_issuers]]` with an upstream OIDC \
223 provider (e.g. Hydra) that mints the JWTs your callers \
224 forward."
225 );
226 }
227 }
228
229 let workflow_ctx: Arc<WorkflowCtx<S>> =
230 crate::server::build_workflow_ctx_with_bus(store, Arc::clone(&bus));
231
232 // Hourly sweep of the engine_events outbox. Detached — the handle
233 // lives for the process lifetime; nothing to await for clean
234 // shutdown (prune is idempotent, missed tick is fine).
235 tokio::spawn(assay_workflow::events_cleanup::run_events_cleanup(
236 Arc::clone(&bus),
237 std::time::Duration::from_secs(3600),
238 cfg.engine_events_ttl_secs,
239 ));
240
241 let whitelabel = Arc::new(WhitelabelConfig::from_env());
242 let asset_version = env!("CARGO_PKG_VERSION").to_string();
243 let dashboard_ctx = Arc::new(DashboardCtx::new(whitelabel, asset_version));
244 let admin_api_keys = Arc::new(cfg.auth.admin_api_keys.clone());
245 let started_at = std::time::SystemTime::now()
246 .duration_since(std::time::UNIX_EPOCH)
247 .unwrap_or_default()
248 .as_secs_f64();
249 let engine_config = Arc::new(cfg);
250
251 let state = EngineState {
252 workflow: workflow_ctx,
253 dashboard: dashboard_ctx,
254 auth: auth_ctx,
255 #[cfg(feature = "vault")]
256 vault: vault_ctx,
257 admin_api_keys,
258 modules: Arc::new(modules.clone()),
259 instance_id,
260 engine_version: env!("CARGO_PKG_VERSION"),
261 started_at,
262 engine_config,
263 };
264
265 let router = crate::server::build_app(state);
266
267 Ok(EmbeddedEngine {
268 router,
269 pool,
270 instance_id,
271 modules,
272 engine_version: env!("CARGO_PKG_VERSION"),
273 })
274}
275
276/// Run engine + per-module schema migrations against the configured
277/// backend. No runtime state is built, no background tasks are
278/// spawned. Idempotent — calling on an up-to-date schema is a no-op.
279///
280/// Used by parent binaries that want a `migrate` subcommand without
281/// booting workflow scheduler / vault unseal / etc. Equivalent to
282/// `EngineBoot::run(cfg).await?;` with a more discoverable name.
283pub async fn migrate(cfg: &EngineConfig) -> anyhow::Result<()> {
284 let _boot = EngineBoot::run(cfg).await?;
285 Ok(())
286}