Skip to main content

actionqueue_runtime/
engine.rs

1//! Embedded API surface for the ActionQueue runtime.
2//!
3//! Provides [`ActionQueueEngine`] as the primary entry point for embedding
4//! ActionQueue as a library in Rust applications.
5
6use actionqueue_core::task::task_spec::TaskSpec;
7use actionqueue_engine::time::clock::{Clock, SystemClock};
8use actionqueue_executor_local::handler::ExecutorHandler;
9use actionqueue_storage::recovery::bootstrap::{
10    load_projection_from_storage, RecoveryBootstrapError,
11};
12use actionqueue_storage::recovery::reducer::ReplayReducer;
13use actionqueue_storage::wal::fs_writer::WalFsWriter;
14use actionqueue_storage::wal::writer::InstrumentedWalWriter;
15use tracing;
16
17use crate::config::RuntimeConfig;
18use crate::dispatch::{DispatchError, DispatchLoop, RunSummary, TickResult};
19
20/// Errors that can occur during engine bootstrap.
21#[derive(Debug)]
22pub enum BootstrapError {
23    /// Configuration is invalid.
24    Config(crate::config::ConfigError),
25    /// Storage recovery failed.
26    Recovery(RecoveryBootstrapError),
27    /// Directory creation failed.
28    Io(String),
29    /// Dispatch loop initialization failed.
30    Dispatch(DispatchError),
31}
32
33impl std::fmt::Display for BootstrapError {
34    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
35        match self {
36            BootstrapError::Config(e) => write!(f, "config error: {e}"),
37            BootstrapError::Recovery(e) => write!(f, "recovery error: {e}"),
38            BootstrapError::Io(e) => write!(f, "I/O error: {e}"),
39            BootstrapError::Dispatch(e) => write!(f, "dispatch init error: {e}"),
40        }
41    }
42}
43
44impl std::error::Error for BootstrapError {}
45
46/// Errors that can occur during engine operations.
47#[derive(Debug)]
48pub enum EngineError {
49    /// Dispatch loop error.
50    Dispatch(DispatchError),
51}
52
53impl std::fmt::Display for EngineError {
54    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
55        match self {
56            EngineError::Dispatch(e) => write!(f, "dispatch error: {e}"),
57        }
58    }
59}
60
61impl std::error::Error for EngineError {}
62
63impl From<DispatchError> for EngineError {
64    fn from(e: DispatchError) -> Self {
65        EngineError::Dispatch(e)
66    }
67}
68
69/// Pre-bootstrap engine configuration holder.
70pub struct ActionQueueEngine<H: ExecutorHandler> {
71    config: RuntimeConfig,
72    handler: H,
73}
74
75impl<H: ExecutorHandler + 'static> ActionQueueEngine<H> {
76    /// Creates a new engine with the given configuration and handler.
77    pub fn new(config: RuntimeConfig, handler: H) -> Self {
78        Self { config, handler }
79    }
80
81    /// Bootstraps the engine by recovering state from storage and
82    /// constructing the dispatch loop with the system clock.
83    pub fn bootstrap(self) -> Result<BootstrappedEngine<H, SystemClock>, BootstrapError> {
84        self.bootstrap_with_clock(SystemClock)
85    }
86
87    /// Bootstraps with an explicit clock (for testing).
88    pub fn bootstrap_with_clock<C: Clock>(
89        self,
90        clock: C,
91    ) -> Result<BootstrappedEngine<H, C>, BootstrapError> {
92        self.config.validate().map_err(BootstrapError::Config)?;
93
94        let data_dir = self.config.data_dir.display().to_string();
95        tracing::info!(data_dir, "bootstrapping engine");
96
97        // Ensure data directory exists
98        std::fs::create_dir_all(&self.config.data_dir)
99            .map_err(|e| BootstrapError::Io(e.to_string()))?;
100
101        // Recover from storage
102        let recovery = load_projection_from_storage(&self.config.data_dir)
103            .map_err(BootstrapError::Recovery)?;
104
105        // Build mutation authority
106        let authority = actionqueue_storage::mutation::authority::StorageMutationAuthority::new(
107            recovery.wal_writer,
108            recovery.projection,
109        );
110
111        // Compute snapshot path — must match bootstrap.rs snapshot_dir / "snapshot.bin"
112        let snapshot_path = self
113            .config
114            .snapshot_event_threshold
115            .map(|_| self.config.data_dir.join("snapshots").join("snapshot.bin"));
116
117        // Build dispatch loop
118        let dispatch = DispatchLoop::new(
119            authority,
120            self.handler,
121            clock,
122            crate::dispatch::DispatchConfig::new(
123                self.config.backoff_strategy.clone(),
124                self.config.dispatch_concurrency.get(),
125                self.config.lease_timeout_secs,
126                snapshot_path,
127                self.config.snapshot_event_threshold,
128            ),
129        )
130        .map_err(BootstrapError::Dispatch)?;
131
132        tracing::info!(data_dir, "engine bootstrap complete");
133        Ok(BootstrappedEngine { dispatch })
134    }
135}
136
137/// A bootstrapped engine ready for task submission and execution.
138pub struct BootstrappedEngine<H: ExecutorHandler + 'static, C: Clock = SystemClock> {
139    dispatch: DispatchLoop<InstrumentedWalWriter<WalFsWriter>, H, C>,
140}
141
142impl<H: ExecutorHandler + 'static, C: Clock> BootstrappedEngine<H, C> {
143    /// Submits a new task specification for execution.
144    pub fn submit_task(&mut self, spec: TaskSpec) -> Result<(), EngineError> {
145        let task_id = spec.id();
146        tracing::debug!(%task_id, "submit_task");
147        self.dispatch.submit_task(spec).map_err(EngineError::Dispatch)
148    }
149
150    /// Advances the dispatch loop by one tick.
151    pub async fn tick(&mut self) -> Result<TickResult, EngineError> {
152        self.dispatch.tick().await.map_err(EngineError::Dispatch)
153    }
154
155    /// Runs the dispatch loop until no work remains.
156    pub async fn run_until_idle(&mut self) -> Result<RunSummary, EngineError> {
157        self.dispatch.run_until_idle().await.map_err(EngineError::Dispatch)
158    }
159
160    /// Returns a reference to the current projection state.
161    pub fn projection(&self) -> &ReplayReducer {
162        self.dispatch.projection()
163    }
164
165    /// Declares a DAG dependency.
166    pub fn declare_dependency(
167        &mut self,
168        task_id: actionqueue_core::ids::TaskId,
169        prereqs: Vec<actionqueue_core::ids::TaskId>,
170    ) -> Result<(), EngineError> {
171        tracing::debug!(%task_id, prereq_count = prereqs.len(), "declare_dependency");
172        self.dispatch.declare_dependency(task_id, prereqs).map_err(EngineError::Dispatch)
173    }
174
175    /// Allocates a token budget for a task/dimension pair.
176    #[cfg(feature = "budget")]
177    pub fn allocate_budget(
178        &mut self,
179        task_id: actionqueue_core::ids::TaskId,
180        dimension: actionqueue_core::budget::BudgetDimension,
181        limit: u64,
182    ) -> Result<(), EngineError> {
183        tracing::debug!(%task_id, ?dimension, limit, "allocate_budget");
184        self.dispatch.allocate_budget(task_id, dimension, limit).map_err(EngineError::Dispatch)
185    }
186
187    /// Replenishes an exhausted budget dimension for a task.
188    #[cfg(feature = "budget")]
189    pub fn replenish_budget(
190        &mut self,
191        task_id: actionqueue_core::ids::TaskId,
192        dimension: actionqueue_core::budget::BudgetDimension,
193        new_limit: u64,
194    ) -> Result<(), EngineError> {
195        tracing::debug!(%task_id, ?dimension, new_limit, "replenish_budget");
196        self.dispatch.replenish_budget(task_id, dimension, new_limit).map_err(EngineError::Dispatch)
197    }
198
199    /// Query remaining budget for a task on a specific dimension.
200    ///
201    /// Returns `None` if no budget is allocated for this task+dimension.
202    #[cfg(feature = "budget")]
203    pub fn budget_remaining(
204        &self,
205        task_id: actionqueue_core::ids::TaskId,
206        dimension: actionqueue_core::budget::BudgetDimension,
207    ) -> Option<u64> {
208        self.dispatch.budget_remaining(task_id, dimension)
209    }
210
211    /// Check if a budget dimension is exhausted for a task.
212    ///
213    /// Returns `false` if no budget is allocated (no budget = no limit).
214    #[cfg(feature = "budget")]
215    pub fn is_budget_exhausted(
216        &self,
217        task_id: actionqueue_core::ids::TaskId,
218        dimension: actionqueue_core::budget::BudgetDimension,
219    ) -> bool {
220        self.dispatch.is_budget_exhausted(task_id, dimension)
221    }
222
223    /// Resumes a suspended run (transitions Suspended → Ready).
224    #[cfg(feature = "budget")]
225    pub fn resume_run(&mut self, run_id: actionqueue_core::ids::RunId) -> Result<(), EngineError> {
226        tracing::debug!(%run_id, "resume_run");
227        self.dispatch.resume_run(run_id).map_err(EngineError::Dispatch)
228    }
229
230    /// Creates a new event subscription.
231    ///
232    /// Returns the generated `SubscriptionId` which the caller may use to
233    /// inspect the subscription state later.
234    #[cfg(feature = "budget")]
235    pub fn create_subscription(
236        &mut self,
237        task_id: actionqueue_core::ids::TaskId,
238        filter: actionqueue_core::subscription::EventFilter,
239    ) -> Result<actionqueue_core::subscription::SubscriptionId, EngineError> {
240        tracing::debug!(%task_id, "create_subscription");
241        self.dispatch.create_subscription(task_id, filter).map_err(EngineError::Dispatch)
242    }
243
244    /// Fires a custom event, triggering any matching subscriptions.
245    ///
246    /// Subscriptions with a `Custom { key }` filter matching the event key
247    /// are triggered. Triggered subscriptions promote their task's Scheduled
248    /// runs on the next tick.
249    #[cfg(feature = "budget")]
250    pub fn fire_custom_event(&mut self, key: String) -> Result<(), EngineError> {
251        tracing::debug!(key, "fire_custom_event");
252        self.dispatch.fire_custom_event(key).map_err(EngineError::Dispatch)
253    }
254
255    // ── Actor feature ──────────────────────────────────────────────────────
256
257    /// Registers a remote actor with the hub.
258    #[cfg(feature = "actor")]
259    pub fn register_actor(
260        &mut self,
261        registration: actionqueue_core::actor::ActorRegistration,
262    ) -> Result<(), EngineError> {
263        let actor_id = registration.actor_id();
264        tracing::debug!(%actor_id, "register_actor");
265        self.dispatch.register_actor(registration).map_err(EngineError::Dispatch)
266    }
267
268    /// Deregisters a remote actor from the hub.
269    #[cfg(feature = "actor")]
270    pub fn deregister_actor(
271        &mut self,
272        actor_id: actionqueue_core::ids::ActorId,
273    ) -> Result<(), EngineError> {
274        tracing::debug!(%actor_id, "deregister_actor");
275        self.dispatch.deregister_actor(actor_id).map_err(EngineError::Dispatch)
276    }
277
278    /// Records an actor heartbeat.
279    #[cfg(feature = "actor")]
280    pub fn actor_heartbeat(
281        &mut self,
282        actor_id: actionqueue_core::ids::ActorId,
283    ) -> Result<(), EngineError> {
284        tracing::trace!(%actor_id, "actor_heartbeat");
285        self.dispatch.actor_heartbeat(actor_id).map_err(EngineError::Dispatch)
286    }
287
288    /// Returns a reference to the actor registry.
289    #[cfg(feature = "actor")]
290    pub fn actor_registry(&self) -> &actionqueue_actor::ActorRegistry {
291        self.dispatch.actor_registry()
292    }
293
294    // ── Platform feature ───────────────────────────────────────────────────
295
296    /// Creates a new organizational tenant.
297    #[cfg(feature = "platform")]
298    pub fn create_tenant(
299        &mut self,
300        registration: actionqueue_core::platform::TenantRegistration,
301    ) -> Result<(), EngineError> {
302        let tenant_id = registration.tenant_id();
303        tracing::debug!(%tenant_id, "create_tenant");
304        self.dispatch.create_tenant(registration).map_err(EngineError::Dispatch)
305    }
306
307    /// Assigns a role to an actor within a tenant.
308    #[cfg(feature = "platform")]
309    pub fn assign_role(
310        &mut self,
311        actor_id: actionqueue_core::ids::ActorId,
312        role: actionqueue_core::platform::Role,
313        tenant_id: actionqueue_core::ids::TenantId,
314    ) -> Result<(), EngineError> {
315        tracing::debug!(%actor_id, ?role, %tenant_id, "assign_role");
316        self.dispatch.assign_role(actor_id, role, tenant_id).map_err(EngineError::Dispatch)
317    }
318
319    /// Grants a capability to an actor within a tenant.
320    #[cfg(feature = "platform")]
321    pub fn grant_capability(
322        &mut self,
323        actor_id: actionqueue_core::ids::ActorId,
324        capability: actionqueue_core::platform::Capability,
325        tenant_id: actionqueue_core::ids::TenantId,
326    ) -> Result<(), EngineError> {
327        tracing::debug!(%actor_id, ?capability, %tenant_id, "grant_capability");
328        self.dispatch
329            .grant_capability(actor_id, capability, tenant_id)
330            .map_err(EngineError::Dispatch)
331    }
332
333    /// Revokes a capability from an actor within a tenant.
334    #[cfg(feature = "platform")]
335    pub fn revoke_capability(
336        &mut self,
337        actor_id: actionqueue_core::ids::ActorId,
338        capability: actionqueue_core::platform::Capability,
339        tenant_id: actionqueue_core::ids::TenantId,
340    ) -> Result<(), EngineError> {
341        tracing::debug!(%actor_id, ?capability, %tenant_id, "revoke_capability");
342        self.dispatch
343            .revoke_capability(actor_id, capability, tenant_id)
344            .map_err(EngineError::Dispatch)
345    }
346
347    /// Appends an entry to the organizational ledger.
348    #[cfg(feature = "platform")]
349    pub fn append_ledger_entry(
350        &mut self,
351        entry: actionqueue_core::platform::LedgerEntry,
352    ) -> Result<(), EngineError> {
353        tracing::debug!(
354            entry_id = %entry.entry_id(),
355            tenant_id = %entry.tenant_id(),
356            ledger_key = entry.ledger_key(),
357            "append_ledger_entry"
358        );
359        self.dispatch.append_ledger_entry(entry).map_err(EngineError::Dispatch)
360    }
361
362    /// Returns a reference to the append ledger.
363    #[cfg(feature = "platform")]
364    pub fn ledger(&self) -> &actionqueue_platform::AppendLedger {
365        self.dispatch.ledger()
366    }
367
368    /// Returns a reference to the RBAC enforcer.
369    #[cfg(feature = "platform")]
370    pub fn rbac(&self) -> &actionqueue_platform::RbacEnforcer {
371        self.dispatch.rbac()
372    }
373
374    /// Returns a reference to the tenant registry.
375    #[cfg(feature = "platform")]
376    pub fn tenant_registry(&self) -> &actionqueue_platform::TenantRegistry {
377        self.dispatch.tenant_registry()
378    }
379
380    /// Gracefully drains in-flight work and then shuts down.
381    ///
382    /// Stops promoting and dispatching new work, but continues processing
383    /// in-flight results and heartbeating leases until all in-flight work
384    /// completes or the timeout expires. The WAL writer is flushed and
385    /// closed on drop of the underlying authority.
386    pub async fn drain_and_shutdown(
387        mut self,
388        timeout: std::time::Duration,
389    ) -> Result<(), EngineError> {
390        tracing::info!(timeout_secs = timeout.as_secs(), "drain_and_shutdown starting");
391        let _ = self.dispatch.drain_until_idle(timeout).await?;
392        tracing::info!("drain_and_shutdown complete");
393        Ok(())
394    }
395
396    /// Shuts down the engine immediately, consuming it.
397    ///
398    /// In-flight workers will continue to completion in their tokio blocking tasks.
399    /// Worker results for in-flight runs are lost; those runs will recover via
400    /// lease expiry on the next bootstrap. The WAL writer is flushed and closed
401    /// on drop of the underlying authority.
402    ///
403    /// For orderly drain of in-flight work, use `drain_and_shutdown()` instead.
404    pub fn shutdown(self) -> Result<(), EngineError> {
405        tracing::info!("engine shutdown");
406        // Authority (and its WAL writer) are dropped here, triggering flush.
407        Ok(())
408    }
409}