1use actionqueue_core::task::task_spec::TaskSpec;
7use actionqueue_engine::time::clock::{Clock, SystemClock};
8use actionqueue_executor_local::handler::ExecutorHandler;
9use actionqueue_storage::recovery::bootstrap::{
10 load_projection_from_storage, RecoveryBootstrapError,
11};
12use actionqueue_storage::recovery::reducer::ReplayReducer;
13use actionqueue_storage::wal::fs_writer::WalFsWriter;
14use actionqueue_storage::wal::writer::InstrumentedWalWriter;
15use tracing;
16
17use crate::config::RuntimeConfig;
18use crate::dispatch::{DispatchError, DispatchLoop, RunSummary, TickResult};
19
20#[derive(Debug)]
22pub enum BootstrapError {
23 Config(crate::config::ConfigError),
25 Recovery(RecoveryBootstrapError),
27 Io(String),
29 Dispatch(DispatchError),
31}
32
33impl std::fmt::Display for BootstrapError {
34 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
35 match self {
36 BootstrapError::Config(e) => write!(f, "config error: {e}"),
37 BootstrapError::Recovery(e) => write!(f, "recovery error: {e}"),
38 BootstrapError::Io(e) => write!(f, "I/O error: {e}"),
39 BootstrapError::Dispatch(e) => write!(f, "dispatch init error: {e}"),
40 }
41 }
42}
43
44impl std::error::Error for BootstrapError {}
45
46#[derive(Debug)]
48pub enum EngineError {
49 Dispatch(DispatchError),
51}
52
53impl std::fmt::Display for EngineError {
54 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
55 match self {
56 EngineError::Dispatch(e) => write!(f, "dispatch error: {e}"),
57 }
58 }
59}
60
61impl std::error::Error for EngineError {}
62
63impl From<DispatchError> for EngineError {
64 fn from(e: DispatchError) -> Self {
65 EngineError::Dispatch(e)
66 }
67}
68
69pub struct ActionQueueEngine<H: ExecutorHandler> {
71 config: RuntimeConfig,
72 handler: H,
73}
74
75impl<H: ExecutorHandler + 'static> ActionQueueEngine<H> {
76 pub fn new(config: RuntimeConfig, handler: H) -> Self {
78 Self { config, handler }
79 }
80
81 pub fn bootstrap(self) -> Result<BootstrappedEngine<H, SystemClock>, BootstrapError> {
84 self.bootstrap_with_clock(SystemClock)
85 }
86
87 pub fn bootstrap_with_clock<C: Clock>(
89 self,
90 clock: C,
91 ) -> Result<BootstrappedEngine<H, C>, BootstrapError> {
92 self.config.validate().map_err(BootstrapError::Config)?;
93
94 let data_dir = self.config.data_dir.display().to_string();
95 tracing::info!(data_dir, "bootstrapping engine");
96
97 std::fs::create_dir_all(&self.config.data_dir)
99 .map_err(|e| BootstrapError::Io(e.to_string()))?;
100
101 let recovery = load_projection_from_storage(&self.config.data_dir)
103 .map_err(BootstrapError::Recovery)?;
104
105 let authority = actionqueue_storage::mutation::authority::StorageMutationAuthority::new(
107 recovery.wal_writer,
108 recovery.projection,
109 );
110
111 let snapshot_path = self
113 .config
114 .snapshot_event_threshold
115 .map(|_| self.config.data_dir.join("snapshots").join("snapshot.bin"));
116
117 let dispatch = DispatchLoop::new(
119 authority,
120 self.handler,
121 clock,
122 crate::dispatch::DispatchConfig::new(
123 self.config.backoff_strategy.clone(),
124 self.config.dispatch_concurrency.get(),
125 self.config.lease_timeout_secs,
126 snapshot_path,
127 self.config.snapshot_event_threshold,
128 ),
129 )
130 .map_err(BootstrapError::Dispatch)?;
131
132 tracing::info!(data_dir, "engine bootstrap complete");
133 Ok(BootstrappedEngine { dispatch })
134 }
135}
136
137pub struct BootstrappedEngine<H: ExecutorHandler + 'static, C: Clock = SystemClock> {
139 dispatch: DispatchLoop<InstrumentedWalWriter<WalFsWriter>, H, C>,
140}
141
142impl<H: ExecutorHandler + 'static, C: Clock> BootstrappedEngine<H, C> {
143 pub fn submit_task(&mut self, spec: TaskSpec) -> Result<(), EngineError> {
145 let task_id = spec.id();
146 tracing::debug!(%task_id, "submit_task");
147 self.dispatch.submit_task(spec).map_err(EngineError::Dispatch)
148 }
149
150 pub async fn tick(&mut self) -> Result<TickResult, EngineError> {
152 self.dispatch.tick().await.map_err(EngineError::Dispatch)
153 }
154
155 pub async fn run_until_idle(&mut self) -> Result<RunSummary, EngineError> {
157 self.dispatch.run_until_idle().await.map_err(EngineError::Dispatch)
158 }
159
160 pub fn projection(&self) -> &ReplayReducer {
162 self.dispatch.projection()
163 }
164
165 pub fn declare_dependency(
167 &mut self,
168 task_id: actionqueue_core::ids::TaskId,
169 prereqs: Vec<actionqueue_core::ids::TaskId>,
170 ) -> Result<(), EngineError> {
171 tracing::debug!(%task_id, prereq_count = prereqs.len(), "declare_dependency");
172 self.dispatch.declare_dependency(task_id, prereqs).map_err(EngineError::Dispatch)
173 }
174
175 #[cfg(feature = "budget")]
177 pub fn allocate_budget(
178 &mut self,
179 task_id: actionqueue_core::ids::TaskId,
180 dimension: actionqueue_core::budget::BudgetDimension,
181 limit: u64,
182 ) -> Result<(), EngineError> {
183 tracing::debug!(%task_id, ?dimension, limit, "allocate_budget");
184 self.dispatch.allocate_budget(task_id, dimension, limit).map_err(EngineError::Dispatch)
185 }
186
187 #[cfg(feature = "budget")]
189 pub fn replenish_budget(
190 &mut self,
191 task_id: actionqueue_core::ids::TaskId,
192 dimension: actionqueue_core::budget::BudgetDimension,
193 new_limit: u64,
194 ) -> Result<(), EngineError> {
195 tracing::debug!(%task_id, ?dimension, new_limit, "replenish_budget");
196 self.dispatch.replenish_budget(task_id, dimension, new_limit).map_err(EngineError::Dispatch)
197 }
198
199 #[cfg(feature = "budget")]
201 pub fn resume_run(&mut self, run_id: actionqueue_core::ids::RunId) -> Result<(), EngineError> {
202 tracing::debug!(%run_id, "resume_run");
203 self.dispatch.resume_run(run_id).map_err(EngineError::Dispatch)
204 }
205
206 #[cfg(feature = "budget")]
211 pub fn create_subscription(
212 &mut self,
213 task_id: actionqueue_core::ids::TaskId,
214 filter: actionqueue_core::subscription::EventFilter,
215 ) -> Result<actionqueue_core::subscription::SubscriptionId, EngineError> {
216 tracing::debug!(%task_id, "create_subscription");
217 self.dispatch.create_subscription(task_id, filter).map_err(EngineError::Dispatch)
218 }
219
220 #[cfg(feature = "budget")]
226 pub fn fire_custom_event(&mut self, key: String) -> Result<(), EngineError> {
227 tracing::debug!(key, "fire_custom_event");
228 self.dispatch.fire_custom_event(key).map_err(EngineError::Dispatch)
229 }
230
231 #[cfg(feature = "actor")]
235 pub fn register_actor(
236 &mut self,
237 registration: actionqueue_core::actor::ActorRegistration,
238 ) -> Result<(), EngineError> {
239 let actor_id = registration.actor_id();
240 tracing::debug!(%actor_id, "register_actor");
241 self.dispatch.register_actor(registration).map_err(EngineError::Dispatch)
242 }
243
244 #[cfg(feature = "actor")]
246 pub fn deregister_actor(
247 &mut self,
248 actor_id: actionqueue_core::ids::ActorId,
249 ) -> Result<(), EngineError> {
250 tracing::debug!(%actor_id, "deregister_actor");
251 self.dispatch.deregister_actor(actor_id).map_err(EngineError::Dispatch)
252 }
253
254 #[cfg(feature = "actor")]
256 pub fn actor_heartbeat(
257 &mut self,
258 actor_id: actionqueue_core::ids::ActorId,
259 ) -> Result<(), EngineError> {
260 tracing::trace!(%actor_id, "actor_heartbeat");
261 self.dispatch.actor_heartbeat(actor_id).map_err(EngineError::Dispatch)
262 }
263
264 #[cfg(feature = "actor")]
266 pub fn actor_registry(&self) -> &actionqueue_actor::ActorRegistry {
267 self.dispatch.actor_registry()
268 }
269
270 #[cfg(feature = "platform")]
274 pub fn create_tenant(
275 &mut self,
276 registration: actionqueue_core::platform::TenantRegistration,
277 ) -> Result<(), EngineError> {
278 let tenant_id = registration.tenant_id();
279 tracing::debug!(%tenant_id, "create_tenant");
280 self.dispatch.create_tenant(registration).map_err(EngineError::Dispatch)
281 }
282
283 #[cfg(feature = "platform")]
285 pub fn assign_role(
286 &mut self,
287 actor_id: actionqueue_core::ids::ActorId,
288 role: actionqueue_core::platform::Role,
289 tenant_id: actionqueue_core::ids::TenantId,
290 ) -> Result<(), EngineError> {
291 tracing::debug!(%actor_id, ?role, %tenant_id, "assign_role");
292 self.dispatch.assign_role(actor_id, role, tenant_id).map_err(EngineError::Dispatch)
293 }
294
295 #[cfg(feature = "platform")]
297 pub fn grant_capability(
298 &mut self,
299 actor_id: actionqueue_core::ids::ActorId,
300 capability: actionqueue_core::platform::Capability,
301 tenant_id: actionqueue_core::ids::TenantId,
302 ) -> Result<(), EngineError> {
303 tracing::debug!(%actor_id, ?capability, %tenant_id, "grant_capability");
304 self.dispatch
305 .grant_capability(actor_id, capability, tenant_id)
306 .map_err(EngineError::Dispatch)
307 }
308
309 #[cfg(feature = "platform")]
311 pub fn revoke_capability(
312 &mut self,
313 actor_id: actionqueue_core::ids::ActorId,
314 capability: actionqueue_core::platform::Capability,
315 tenant_id: actionqueue_core::ids::TenantId,
316 ) -> Result<(), EngineError> {
317 tracing::debug!(%actor_id, ?capability, %tenant_id, "revoke_capability");
318 self.dispatch
319 .revoke_capability(actor_id, capability, tenant_id)
320 .map_err(EngineError::Dispatch)
321 }
322
323 #[cfg(feature = "platform")]
325 pub fn append_ledger_entry(
326 &mut self,
327 entry: actionqueue_core::platform::LedgerEntry,
328 ) -> Result<(), EngineError> {
329 tracing::debug!(
330 entry_id = %entry.entry_id(),
331 tenant_id = %entry.tenant_id(),
332 ledger_key = entry.ledger_key(),
333 "append_ledger_entry"
334 );
335 self.dispatch.append_ledger_entry(entry).map_err(EngineError::Dispatch)
336 }
337
338 #[cfg(feature = "platform")]
340 pub fn ledger(&self) -> &actionqueue_platform::AppendLedger {
341 self.dispatch.ledger()
342 }
343
344 #[cfg(feature = "platform")]
346 pub fn rbac(&self) -> &actionqueue_platform::RbacEnforcer {
347 self.dispatch.rbac()
348 }
349
350 #[cfg(feature = "platform")]
352 pub fn tenant_registry(&self) -> &actionqueue_platform::TenantRegistry {
353 self.dispatch.tenant_registry()
354 }
355
356 pub async fn drain_and_shutdown(
363 mut self,
364 timeout: std::time::Duration,
365 ) -> Result<(), EngineError> {
366 tracing::info!(timeout_secs = timeout.as_secs(), "drain_and_shutdown starting");
367 let _ = self.dispatch.drain_until_idle(timeout).await?;
368 tracing::info!("drain_and_shutdown complete");
369 Ok(())
370 }
371
372 pub fn shutdown(self) -> Result<(), EngineError> {
381 tracing::info!("engine shutdown");
382 Ok(())
384 }
385}