1use actionqueue_core::task::task_spec::TaskSpec;
7use actionqueue_engine::time::clock::{Clock, SystemClock};
8use actionqueue_executor_local::handler::ExecutorHandler;
9use actionqueue_storage::recovery::bootstrap::{
10 load_projection_from_storage, RecoveryBootstrapError,
11};
12use actionqueue_storage::recovery::reducer::ReplayReducer;
13use actionqueue_storage::wal::fs_writer::WalFsWriter;
14use actionqueue_storage::wal::writer::InstrumentedWalWriter;
15use tracing;
16
17use crate::config::RuntimeConfig;
18use crate::dispatch::{DispatchError, DispatchLoop, RunSummary, TickResult};
19
20#[derive(Debug)]
22pub enum BootstrapError {
23 Config(crate::config::ConfigError),
25 Recovery(RecoveryBootstrapError),
27 Io(String),
29 Dispatch(DispatchError),
31}
32
33impl std::fmt::Display for BootstrapError {
34 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
35 match self {
36 BootstrapError::Config(e) => write!(f, "config error: {e}"),
37 BootstrapError::Recovery(e) => write!(f, "recovery error: {e}"),
38 BootstrapError::Io(e) => write!(f, "I/O error: {e}"),
39 BootstrapError::Dispatch(e) => write!(f, "dispatch init error: {e}"),
40 }
41 }
42}
43
44impl std::error::Error for BootstrapError {}
45
46#[derive(Debug)]
48pub enum EngineError {
49 Dispatch(DispatchError),
51}
52
53impl std::fmt::Display for EngineError {
54 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
55 match self {
56 EngineError::Dispatch(e) => write!(f, "dispatch error: {e}"),
57 }
58 }
59}
60
61impl std::error::Error for EngineError {}
62
63impl From<DispatchError> for EngineError {
64 fn from(e: DispatchError) -> Self {
65 EngineError::Dispatch(e)
66 }
67}
68
69pub struct ActionQueueEngine<H: ExecutorHandler> {
71 config: RuntimeConfig,
72 handler: H,
73}
74
75impl<H: ExecutorHandler + 'static> ActionQueueEngine<H> {
76 pub fn new(config: RuntimeConfig, handler: H) -> Self {
78 Self { config, handler }
79 }
80
81 pub fn bootstrap(self) -> Result<BootstrappedEngine<H, SystemClock>, BootstrapError> {
84 self.bootstrap_with_clock(SystemClock)
85 }
86
87 pub fn bootstrap_with_clock<C: Clock>(
89 self,
90 clock: C,
91 ) -> Result<BootstrappedEngine<H, C>, BootstrapError> {
92 self.config.validate().map_err(BootstrapError::Config)?;
93
94 let data_dir = self.config.data_dir.display().to_string();
95 tracing::info!(data_dir, "bootstrapping engine");
96
97 std::fs::create_dir_all(&self.config.data_dir)
99 .map_err(|e| BootstrapError::Io(e.to_string()))?;
100
101 let recovery = load_projection_from_storage(&self.config.data_dir)
103 .map_err(BootstrapError::Recovery)?;
104
105 let authority = actionqueue_storage::mutation::authority::StorageMutationAuthority::new(
107 recovery.wal_writer,
108 recovery.projection,
109 );
110
111 let snapshot_path = self
113 .config
114 .snapshot_event_threshold
115 .map(|_| self.config.data_dir.join("snapshots").join("snapshot.bin"));
116
117 let dispatch = DispatchLoop::new(
119 authority,
120 self.handler,
121 clock,
122 crate::dispatch::DispatchConfig::new(
123 self.config.backoff_strategy.clone(),
124 self.config.dispatch_concurrency.get(),
125 self.config.lease_timeout_secs,
126 snapshot_path,
127 self.config.snapshot_event_threshold,
128 ),
129 )
130 .map_err(BootstrapError::Dispatch)?;
131
132 tracing::info!(data_dir, "engine bootstrap complete");
133 Ok(BootstrappedEngine { dispatch })
134 }
135}
136
137pub struct BootstrappedEngine<H: ExecutorHandler + 'static, C: Clock = SystemClock> {
139 dispatch: DispatchLoop<InstrumentedWalWriter<WalFsWriter>, H, C>,
140}
141
142impl<H: ExecutorHandler + 'static, C: Clock> BootstrappedEngine<H, C> {
143 pub fn submit_task(&mut self, spec: TaskSpec) -> Result<(), EngineError> {
145 let task_id = spec.id();
146 tracing::debug!(%task_id, "submit_task");
147 self.dispatch.submit_task(spec).map_err(EngineError::Dispatch)
148 }
149
150 pub async fn tick(&mut self) -> Result<TickResult, EngineError> {
152 self.dispatch.tick().await.map_err(EngineError::Dispatch)
153 }
154
155 pub async fn run_until_idle(&mut self) -> Result<RunSummary, EngineError> {
157 self.dispatch.run_until_idle().await.map_err(EngineError::Dispatch)
158 }
159
160 pub fn projection(&self) -> &ReplayReducer {
162 self.dispatch.projection()
163 }
164
165 pub fn declare_dependency(
167 &mut self,
168 task_id: actionqueue_core::ids::TaskId,
169 prereqs: Vec<actionqueue_core::ids::TaskId>,
170 ) -> Result<(), EngineError> {
171 tracing::debug!(%task_id, prereq_count = prereqs.len(), "declare_dependency");
172 self.dispatch.declare_dependency(task_id, prereqs).map_err(EngineError::Dispatch)
173 }
174
175 #[cfg(feature = "budget")]
177 pub fn allocate_budget(
178 &mut self,
179 task_id: actionqueue_core::ids::TaskId,
180 dimension: actionqueue_core::budget::BudgetDimension,
181 limit: u64,
182 ) -> Result<(), EngineError> {
183 tracing::debug!(%task_id, ?dimension, limit, "allocate_budget");
184 self.dispatch.allocate_budget(task_id, dimension, limit).map_err(EngineError::Dispatch)
185 }
186
187 #[cfg(feature = "budget")]
189 pub fn replenish_budget(
190 &mut self,
191 task_id: actionqueue_core::ids::TaskId,
192 dimension: actionqueue_core::budget::BudgetDimension,
193 new_limit: u64,
194 ) -> Result<(), EngineError> {
195 tracing::debug!(%task_id, ?dimension, new_limit, "replenish_budget");
196 self.dispatch.replenish_budget(task_id, dimension, new_limit).map_err(EngineError::Dispatch)
197 }
198
199 #[cfg(feature = "budget")]
203 pub fn budget_remaining(
204 &self,
205 task_id: actionqueue_core::ids::TaskId,
206 dimension: actionqueue_core::budget::BudgetDimension,
207 ) -> Option<u64> {
208 self.dispatch.budget_remaining(task_id, dimension)
209 }
210
211 #[cfg(feature = "budget")]
215 pub fn is_budget_exhausted(
216 &self,
217 task_id: actionqueue_core::ids::TaskId,
218 dimension: actionqueue_core::budget::BudgetDimension,
219 ) -> bool {
220 self.dispatch.is_budget_exhausted(task_id, dimension)
221 }
222
223 #[cfg(feature = "budget")]
225 pub fn resume_run(&mut self, run_id: actionqueue_core::ids::RunId) -> Result<(), EngineError> {
226 tracing::debug!(%run_id, "resume_run");
227 self.dispatch.resume_run(run_id).map_err(EngineError::Dispatch)
228 }
229
230 #[cfg(feature = "budget")]
235 pub fn create_subscription(
236 &mut self,
237 task_id: actionqueue_core::ids::TaskId,
238 filter: actionqueue_core::subscription::EventFilter,
239 ) -> Result<actionqueue_core::subscription::SubscriptionId, EngineError> {
240 tracing::debug!(%task_id, "create_subscription");
241 self.dispatch.create_subscription(task_id, filter).map_err(EngineError::Dispatch)
242 }
243
244 #[cfg(feature = "budget")]
250 pub fn fire_custom_event(&mut self, key: String) -> Result<(), EngineError> {
251 tracing::debug!(key, "fire_custom_event");
252 self.dispatch.fire_custom_event(key).map_err(EngineError::Dispatch)
253 }
254
255 #[cfg(feature = "actor")]
259 pub fn register_actor(
260 &mut self,
261 registration: actionqueue_core::actor::ActorRegistration,
262 ) -> Result<(), EngineError> {
263 let actor_id = registration.actor_id();
264 tracing::debug!(%actor_id, "register_actor");
265 self.dispatch.register_actor(registration).map_err(EngineError::Dispatch)
266 }
267
268 #[cfg(feature = "actor")]
270 pub fn deregister_actor(
271 &mut self,
272 actor_id: actionqueue_core::ids::ActorId,
273 ) -> Result<(), EngineError> {
274 tracing::debug!(%actor_id, "deregister_actor");
275 self.dispatch.deregister_actor(actor_id).map_err(EngineError::Dispatch)
276 }
277
278 #[cfg(feature = "actor")]
280 pub fn actor_heartbeat(
281 &mut self,
282 actor_id: actionqueue_core::ids::ActorId,
283 ) -> Result<(), EngineError> {
284 tracing::trace!(%actor_id, "actor_heartbeat");
285 self.dispatch.actor_heartbeat(actor_id).map_err(EngineError::Dispatch)
286 }
287
288 #[cfg(feature = "actor")]
290 pub fn actor_registry(&self) -> &actionqueue_actor::ActorRegistry {
291 self.dispatch.actor_registry()
292 }
293
294 #[cfg(feature = "platform")]
298 pub fn create_tenant(
299 &mut self,
300 registration: actionqueue_core::platform::TenantRegistration,
301 ) -> Result<(), EngineError> {
302 let tenant_id = registration.tenant_id();
303 tracing::debug!(%tenant_id, "create_tenant");
304 self.dispatch.create_tenant(registration).map_err(EngineError::Dispatch)
305 }
306
307 #[cfg(feature = "platform")]
309 pub fn assign_role(
310 &mut self,
311 actor_id: actionqueue_core::ids::ActorId,
312 role: actionqueue_core::platform::Role,
313 tenant_id: actionqueue_core::ids::TenantId,
314 ) -> Result<(), EngineError> {
315 tracing::debug!(%actor_id, ?role, %tenant_id, "assign_role");
316 self.dispatch.assign_role(actor_id, role, tenant_id).map_err(EngineError::Dispatch)
317 }
318
319 #[cfg(feature = "platform")]
321 pub fn grant_capability(
322 &mut self,
323 actor_id: actionqueue_core::ids::ActorId,
324 capability: actionqueue_core::platform::Capability,
325 tenant_id: actionqueue_core::ids::TenantId,
326 ) -> Result<(), EngineError> {
327 tracing::debug!(%actor_id, ?capability, %tenant_id, "grant_capability");
328 self.dispatch
329 .grant_capability(actor_id, capability, tenant_id)
330 .map_err(EngineError::Dispatch)
331 }
332
333 #[cfg(feature = "platform")]
335 pub fn revoke_capability(
336 &mut self,
337 actor_id: actionqueue_core::ids::ActorId,
338 capability: actionqueue_core::platform::Capability,
339 tenant_id: actionqueue_core::ids::TenantId,
340 ) -> Result<(), EngineError> {
341 tracing::debug!(%actor_id, ?capability, %tenant_id, "revoke_capability");
342 self.dispatch
343 .revoke_capability(actor_id, capability, tenant_id)
344 .map_err(EngineError::Dispatch)
345 }
346
347 #[cfg(feature = "platform")]
349 pub fn append_ledger_entry(
350 &mut self,
351 entry: actionqueue_core::platform::LedgerEntry,
352 ) -> Result<(), EngineError> {
353 tracing::debug!(
354 entry_id = %entry.entry_id(),
355 tenant_id = %entry.tenant_id(),
356 ledger_key = entry.ledger_key(),
357 "append_ledger_entry"
358 );
359 self.dispatch.append_ledger_entry(entry).map_err(EngineError::Dispatch)
360 }
361
362 #[cfg(feature = "platform")]
364 pub fn ledger(&self) -> &actionqueue_platform::AppendLedger {
365 self.dispatch.ledger()
366 }
367
368 #[cfg(feature = "platform")]
370 pub fn rbac(&self) -> &actionqueue_platform::RbacEnforcer {
371 self.dispatch.rbac()
372 }
373
374 #[cfg(feature = "platform")]
376 pub fn tenant_registry(&self) -> &actionqueue_platform::TenantRegistry {
377 self.dispatch.tenant_registry()
378 }
379
380 pub async fn drain_and_shutdown(
387 mut self,
388 timeout: std::time::Duration,
389 ) -> Result<(), EngineError> {
390 tracing::info!(timeout_secs = timeout.as_secs(), "drain_and_shutdown starting");
391 let _ = self.dispatch.drain_until_idle(timeout).await?;
392 tracing::info!("drain_and_shutdown complete");
393 Ok(())
394 }
395
396 pub fn shutdown(self) -> Result<(), EngineError> {
405 tracing::info!("engine shutdown");
406 Ok(())
408 }
409}