Skip to main content

actionqueue_runtime/
engine.rs

1//! Embedded API surface for the ActionQueue runtime.
2//!
3//! Provides [`ActionQueueEngine`] as the primary entry point for embedding
4//! ActionQueue as a library in Rust applications.
5
6use actionqueue_core::task::task_spec::TaskSpec;
7use actionqueue_engine::time::clock::{Clock, SystemClock};
8use actionqueue_executor_local::handler::ExecutorHandler;
9use actionqueue_storage::recovery::bootstrap::{
10    load_projection_from_storage, RecoveryBootstrapError,
11};
12use actionqueue_storage::recovery::reducer::ReplayReducer;
13use actionqueue_storage::wal::fs_writer::WalFsWriter;
14use actionqueue_storage::wal::writer::InstrumentedWalWriter;
15use tracing;
16
17use crate::config::RuntimeConfig;
18use crate::dispatch::{DispatchError, DispatchLoop, RunSummary, TickResult};
19
/// Errors that can occur during engine bootstrap.
///
/// Each variant corresponds to one step of
/// `ActionQueueEngine::bootstrap_with_clock`, in the order the steps run.
#[derive(Debug)]
pub enum BootstrapError {
    /// Configuration is invalid (`RuntimeConfig::validate` returned an error).
    Config(crate::config::ConfigError),
    /// Storage recovery failed while loading the projection from the data directory.
    Recovery(RecoveryBootstrapError),
    /// Directory creation failed.
    ///
    /// Holds the rendered error text rather than the `std::io::Error` itself.
    Io(String),
    /// Dispatch loop initialization failed (`DispatchLoop::new` returned an error).
    Dispatch(DispatchError),
}
32
33impl std::fmt::Display for BootstrapError {
34    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
35        match self {
36            BootstrapError::Config(e) => write!(f, "config error: {e}"),
37            BootstrapError::Recovery(e) => write!(f, "recovery error: {e}"),
38            BootstrapError::Io(e) => write!(f, "I/O error: {e}"),
39            BootstrapError::Dispatch(e) => write!(f, "dispatch init error: {e}"),
40        }
41    }
42}
43
// `source()` is not overridden, so the trait default (`None`) applies; the
// wrapped error is reachable only through the variant payload and `Display`.
impl std::error::Error for BootstrapError {}
45
/// Errors that can occur during engine operations.
///
/// Returned by the methods of `BootstrappedEngine`; a `DispatchError` converts
/// into this type via `From`.
#[derive(Debug)]
pub enum EngineError {
    /// Dispatch loop error, carrying the underlying `DispatchError` unchanged.
    Dispatch(DispatchError),
}
52
53impl std::fmt::Display for EngineError {
54    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
55        match self {
56            EngineError::Dispatch(e) => write!(f, "dispatch error: {e}"),
57        }
58    }
59}
60
// `source()` is not overridden, so the trait default (`None`) applies; the
// wrapped `DispatchError` is reachable only through the variant payload.
impl std::error::Error for EngineError {}
62
63impl From<DispatchError> for EngineError {
64    fn from(e: DispatchError) -> Self {
65        EngineError::Dispatch(e)
66    }
67}
68
/// Pre-bootstrap engine configuration holder.
///
/// Stores the runtime configuration and the executor handler until one of the
/// `bootstrap` methods consumes the value and produces a `BootstrappedEngine`.
pub struct ActionQueueEngine<H: ExecutorHandler> {
    /// Runtime configuration; validated when bootstrapping, not at construction.
    config: RuntimeConfig,
    /// Executor handler that is moved into the dispatch loop during bootstrap.
    handler: H,
}
74
75impl<H: ExecutorHandler + 'static> ActionQueueEngine<H> {
76    /// Creates a new engine with the given configuration and handler.
77    pub fn new(config: RuntimeConfig, handler: H) -> Self {
78        Self { config, handler }
79    }
80
81    /// Bootstraps the engine by recovering state from storage and
82    /// constructing the dispatch loop with the system clock.
83    pub fn bootstrap(self) -> Result<BootstrappedEngine<H, SystemClock>, BootstrapError> {
84        self.bootstrap_with_clock(SystemClock)
85    }
86
87    /// Bootstraps with an explicit clock (for testing).
88    pub fn bootstrap_with_clock<C: Clock>(
89        self,
90        clock: C,
91    ) -> Result<BootstrappedEngine<H, C>, BootstrapError> {
92        self.config.validate().map_err(BootstrapError::Config)?;
93
94        let data_dir = self.config.data_dir.display().to_string();
95        tracing::info!(data_dir, "bootstrapping engine");
96
97        // Ensure data directory exists
98        std::fs::create_dir_all(&self.config.data_dir)
99            .map_err(|e| BootstrapError::Io(e.to_string()))?;
100
101        // Recover from storage
102        let recovery = load_projection_from_storage(&self.config.data_dir)
103            .map_err(BootstrapError::Recovery)?;
104
105        // Build mutation authority
106        let authority = actionqueue_storage::mutation::authority::StorageMutationAuthority::new(
107            recovery.wal_writer,
108            recovery.projection,
109        );
110
111        // Compute snapshot path — must match bootstrap.rs snapshot_dir / "snapshot.bin"
112        let snapshot_path = self
113            .config
114            .snapshot_event_threshold
115            .map(|_| self.config.data_dir.join("snapshots").join("snapshot.bin"));
116
117        // Build dispatch loop
118        let dispatch = DispatchLoop::new(
119            authority,
120            self.handler,
121            clock,
122            crate::dispatch::DispatchConfig::new(
123                self.config.backoff_strategy.clone(),
124                self.config.dispatch_concurrency.get(),
125                self.config.lease_timeout_secs,
126                snapshot_path,
127                self.config.snapshot_event_threshold,
128            ),
129        )
130        .map_err(BootstrapError::Dispatch)?;
131
132        tracing::info!(data_dir, "engine bootstrap complete");
133        Ok(BootstrappedEngine { dispatch })
134    }
135}
136
/// A bootstrapped engine ready for task submission and execution.
///
/// `H` is the executor handler and `C` the clock driving the dispatch loop;
/// `C` defaults to `SystemClock`.
pub struct BootstrappedEngine<H: ExecutorHandler + 'static, C: Clock = SystemClock> {
    /// Dispatch loop that every public method of this type delegates to. It
    /// writes through an instrumented, filesystem-backed WAL writer.
    dispatch: DispatchLoop<InstrumentedWalWriter<WalFsWriter>, H, C>,
}
141
142impl<H: ExecutorHandler + 'static, C: Clock> BootstrappedEngine<H, C> {
143    /// Submits a new task specification for execution.
144    pub fn submit_task(&mut self, spec: TaskSpec) -> Result<(), EngineError> {
145        let task_id = spec.id();
146        tracing::debug!(%task_id, "submit_task");
147        self.dispatch.submit_task(spec).map_err(EngineError::Dispatch)
148    }
149
150    /// Advances the dispatch loop by one tick.
151    pub async fn tick(&mut self) -> Result<TickResult, EngineError> {
152        self.dispatch.tick().await.map_err(EngineError::Dispatch)
153    }
154
155    /// Runs the dispatch loop until no work remains.
156    pub async fn run_until_idle(&mut self) -> Result<RunSummary, EngineError> {
157        self.dispatch.run_until_idle().await.map_err(EngineError::Dispatch)
158    }
159
160    /// Returns a reference to the current projection state.
161    pub fn projection(&self) -> &ReplayReducer {
162        self.dispatch.projection()
163    }
164
165    /// Declares a DAG dependency.
166    pub fn declare_dependency(
167        &mut self,
168        task_id: actionqueue_core::ids::TaskId,
169        prereqs: Vec<actionqueue_core::ids::TaskId>,
170    ) -> Result<(), EngineError> {
171        tracing::debug!(%task_id, prereq_count = prereqs.len(), "declare_dependency");
172        self.dispatch.declare_dependency(task_id, prereqs).map_err(EngineError::Dispatch)
173    }
174
175    /// Allocates a token budget for a task/dimension pair.
176    #[cfg(feature = "budget")]
177    pub fn allocate_budget(
178        &mut self,
179        task_id: actionqueue_core::ids::TaskId,
180        dimension: actionqueue_core::budget::BudgetDimension,
181        limit: u64,
182    ) -> Result<(), EngineError> {
183        tracing::debug!(%task_id, ?dimension, limit, "allocate_budget");
184        self.dispatch.allocate_budget(task_id, dimension, limit).map_err(EngineError::Dispatch)
185    }
186
187    /// Replenishes an exhausted budget dimension for a task.
188    #[cfg(feature = "budget")]
189    pub fn replenish_budget(
190        &mut self,
191        task_id: actionqueue_core::ids::TaskId,
192        dimension: actionqueue_core::budget::BudgetDimension,
193        new_limit: u64,
194    ) -> Result<(), EngineError> {
195        tracing::debug!(%task_id, ?dimension, new_limit, "replenish_budget");
196        self.dispatch.replenish_budget(task_id, dimension, new_limit).map_err(EngineError::Dispatch)
197    }
198
199    /// Resumes a suspended run (transitions Suspended → Ready).
200    #[cfg(feature = "budget")]
201    pub fn resume_run(&mut self, run_id: actionqueue_core::ids::RunId) -> Result<(), EngineError> {
202        tracing::debug!(%run_id, "resume_run");
203        self.dispatch.resume_run(run_id).map_err(EngineError::Dispatch)
204    }
205
206    /// Creates a new event subscription.
207    ///
208    /// Returns the generated `SubscriptionId` which the caller may use to
209    /// inspect the subscription state later.
210    #[cfg(feature = "budget")]
211    pub fn create_subscription(
212        &mut self,
213        task_id: actionqueue_core::ids::TaskId,
214        filter: actionqueue_core::subscription::EventFilter,
215    ) -> Result<actionqueue_core::subscription::SubscriptionId, EngineError> {
216        tracing::debug!(%task_id, "create_subscription");
217        self.dispatch.create_subscription(task_id, filter).map_err(EngineError::Dispatch)
218    }
219
220    /// Fires a custom event, triggering any matching subscriptions.
221    ///
222    /// Subscriptions with a `Custom { key }` filter matching the event key
223    /// are triggered. Triggered subscriptions promote their task's Scheduled
224    /// runs on the next tick.
225    #[cfg(feature = "budget")]
226    pub fn fire_custom_event(&mut self, key: String) -> Result<(), EngineError> {
227        tracing::debug!(key, "fire_custom_event");
228        self.dispatch.fire_custom_event(key).map_err(EngineError::Dispatch)
229    }
230
231    // ── Actor feature ──────────────────────────────────────────────────────
232
233    /// Registers a remote actor with the hub.
234    #[cfg(feature = "actor")]
235    pub fn register_actor(
236        &mut self,
237        registration: actionqueue_core::actor::ActorRegistration,
238    ) -> Result<(), EngineError> {
239        let actor_id = registration.actor_id();
240        tracing::debug!(%actor_id, "register_actor");
241        self.dispatch.register_actor(registration).map_err(EngineError::Dispatch)
242    }
243
244    /// Deregisters a remote actor from the hub.
245    #[cfg(feature = "actor")]
246    pub fn deregister_actor(
247        &mut self,
248        actor_id: actionqueue_core::ids::ActorId,
249    ) -> Result<(), EngineError> {
250        tracing::debug!(%actor_id, "deregister_actor");
251        self.dispatch.deregister_actor(actor_id).map_err(EngineError::Dispatch)
252    }
253
254    /// Records an actor heartbeat.
255    #[cfg(feature = "actor")]
256    pub fn actor_heartbeat(
257        &mut self,
258        actor_id: actionqueue_core::ids::ActorId,
259    ) -> Result<(), EngineError> {
260        tracing::trace!(%actor_id, "actor_heartbeat");
261        self.dispatch.actor_heartbeat(actor_id).map_err(EngineError::Dispatch)
262    }
263
264    /// Returns a reference to the actor registry.
265    #[cfg(feature = "actor")]
266    pub fn actor_registry(&self) -> &actionqueue_actor::ActorRegistry {
267        self.dispatch.actor_registry()
268    }
269
270    // ── Platform feature ───────────────────────────────────────────────────
271
272    /// Creates a new organizational tenant.
273    #[cfg(feature = "platform")]
274    pub fn create_tenant(
275        &mut self,
276        registration: actionqueue_core::platform::TenantRegistration,
277    ) -> Result<(), EngineError> {
278        let tenant_id = registration.tenant_id();
279        tracing::debug!(%tenant_id, "create_tenant");
280        self.dispatch.create_tenant(registration).map_err(EngineError::Dispatch)
281    }
282
283    /// Assigns a role to an actor within a tenant.
284    #[cfg(feature = "platform")]
285    pub fn assign_role(
286        &mut self,
287        actor_id: actionqueue_core::ids::ActorId,
288        role: actionqueue_core::platform::Role,
289        tenant_id: actionqueue_core::ids::TenantId,
290    ) -> Result<(), EngineError> {
291        tracing::debug!(%actor_id, ?role, %tenant_id, "assign_role");
292        self.dispatch.assign_role(actor_id, role, tenant_id).map_err(EngineError::Dispatch)
293    }
294
295    /// Grants a capability to an actor within a tenant.
296    #[cfg(feature = "platform")]
297    pub fn grant_capability(
298        &mut self,
299        actor_id: actionqueue_core::ids::ActorId,
300        capability: actionqueue_core::platform::Capability,
301        tenant_id: actionqueue_core::ids::TenantId,
302    ) -> Result<(), EngineError> {
303        tracing::debug!(%actor_id, ?capability, %tenant_id, "grant_capability");
304        self.dispatch
305            .grant_capability(actor_id, capability, tenant_id)
306            .map_err(EngineError::Dispatch)
307    }
308
309    /// Revokes a capability from an actor within a tenant.
310    #[cfg(feature = "platform")]
311    pub fn revoke_capability(
312        &mut self,
313        actor_id: actionqueue_core::ids::ActorId,
314        capability: actionqueue_core::platform::Capability,
315        tenant_id: actionqueue_core::ids::TenantId,
316    ) -> Result<(), EngineError> {
317        tracing::debug!(%actor_id, ?capability, %tenant_id, "revoke_capability");
318        self.dispatch
319            .revoke_capability(actor_id, capability, tenant_id)
320            .map_err(EngineError::Dispatch)
321    }
322
323    /// Appends an entry to the organizational ledger.
324    #[cfg(feature = "platform")]
325    pub fn append_ledger_entry(
326        &mut self,
327        entry: actionqueue_core::platform::LedgerEntry,
328    ) -> Result<(), EngineError> {
329        tracing::debug!(
330            entry_id = %entry.entry_id(),
331            tenant_id = %entry.tenant_id(),
332            ledger_key = entry.ledger_key(),
333            "append_ledger_entry"
334        );
335        self.dispatch.append_ledger_entry(entry).map_err(EngineError::Dispatch)
336    }
337
338    /// Returns a reference to the append ledger.
339    #[cfg(feature = "platform")]
340    pub fn ledger(&self) -> &actionqueue_platform::AppendLedger {
341        self.dispatch.ledger()
342    }
343
344    /// Returns a reference to the RBAC enforcer.
345    #[cfg(feature = "platform")]
346    pub fn rbac(&self) -> &actionqueue_platform::RbacEnforcer {
347        self.dispatch.rbac()
348    }
349
350    /// Returns a reference to the tenant registry.
351    #[cfg(feature = "platform")]
352    pub fn tenant_registry(&self) -> &actionqueue_platform::TenantRegistry {
353        self.dispatch.tenant_registry()
354    }
355
356    /// Gracefully drains in-flight work and then shuts down.
357    ///
358    /// Stops promoting and dispatching new work, but continues processing
359    /// in-flight results and heartbeating leases until all in-flight work
360    /// completes or the timeout expires. The WAL writer is flushed and
361    /// closed on drop of the underlying authority.
362    pub async fn drain_and_shutdown(
363        mut self,
364        timeout: std::time::Duration,
365    ) -> Result<(), EngineError> {
366        tracing::info!(timeout_secs = timeout.as_secs(), "drain_and_shutdown starting");
367        let _ = self.dispatch.drain_until_idle(timeout).await?;
368        tracing::info!("drain_and_shutdown complete");
369        Ok(())
370    }
371
372    /// Shuts down the engine immediately, consuming it.
373    ///
374    /// In-flight workers will continue to completion in their tokio blocking tasks.
375    /// Worker results for in-flight runs are lost; those runs will recover via
376    /// lease expiry on the next bootstrap. The WAL writer is flushed and closed
377    /// on drop of the underlying authority.
378    ///
379    /// For orderly drain of in-flight work, use `drain_and_shutdown()` instead.
380    pub fn shutdown(self) -> Result<(), EngineError> {
381        tracing::info!("engine shutdown");
382        // Authority (and its WAL writer) are dropped here, triggering flush.
383        Ok(())
384    }
385}