use actionqueue_core::task::task_spec::TaskSpec;
use actionqueue_engine::time::clock::{Clock, SystemClock};
use actionqueue_executor_local::handler::ExecutorHandler;
use actionqueue_storage::recovery::bootstrap::{
load_projection_from_storage, RecoveryBootstrapError,
};
use actionqueue_storage::recovery::reducer::ReplayReducer;
use actionqueue_storage::wal::fs_writer::WalFsWriter;
use actionqueue_storage::wal::writer::InstrumentedWalWriter;
use tracing;
use crate::config::RuntimeConfig;
use crate::dispatch::{DispatchError, DispatchLoop, RunSummary, TickResult};
#[derive(Debug)]
pub enum BootstrapError {
Config(crate::config::ConfigError),
Recovery(RecoveryBootstrapError),
Io(String),
Dispatch(DispatchError),
}
impl std::fmt::Display for BootstrapError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
BootstrapError::Config(e) => write!(f, "config error: {e}"),
BootstrapError::Recovery(e) => write!(f, "recovery error: {e}"),
BootstrapError::Io(e) => write!(f, "I/O error: {e}"),
BootstrapError::Dispatch(e) => write!(f, "dispatch init error: {e}"),
}
}
}
impl std::error::Error for BootstrapError {}
#[derive(Debug)]
pub enum EngineError {
Dispatch(DispatchError),
}
impl std::fmt::Display for EngineError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
EngineError::Dispatch(e) => write!(f, "dispatch error: {e}"),
}
}
}
impl std::error::Error for EngineError {}
impl From<DispatchError> for EngineError {
fn from(e: DispatchError) -> Self {
EngineError::Dispatch(e)
}
}
pub struct ActionQueueEngine<H: ExecutorHandler> {
config: RuntimeConfig,
handler: H,
}
impl<H: ExecutorHandler + 'static> ActionQueueEngine<H> {
pub fn new(config: RuntimeConfig, handler: H) -> Self {
Self { config, handler }
}
pub fn bootstrap(self) -> Result<BootstrappedEngine<H, SystemClock>, BootstrapError> {
self.bootstrap_with_clock(SystemClock)
}
pub fn bootstrap_with_clock<C: Clock>(
self,
clock: C,
) -> Result<BootstrappedEngine<H, C>, BootstrapError> {
self.config.validate().map_err(BootstrapError::Config)?;
let data_dir = self.config.data_dir.display().to_string();
tracing::info!(data_dir, "bootstrapping engine");
std::fs::create_dir_all(&self.config.data_dir)
.map_err(|e| BootstrapError::Io(e.to_string()))?;
let recovery = load_projection_from_storage(&self.config.data_dir)
.map_err(BootstrapError::Recovery)?;
let authority = actionqueue_storage::mutation::authority::StorageMutationAuthority::new(
recovery.wal_writer,
recovery.projection,
);
let snapshot_path = self
.config
.snapshot_event_threshold
.map(|_| self.config.data_dir.join("snapshots").join("snapshot.bin"));
let dispatch = DispatchLoop::new(
authority,
self.handler,
clock,
crate::dispatch::DispatchConfig::new(
self.config.backoff_strategy.clone(),
self.config.dispatch_concurrency.get(),
self.config.lease_timeout_secs,
snapshot_path,
self.config.snapshot_event_threshold,
),
)
.map_err(BootstrapError::Dispatch)?;
tracing::info!(data_dir, "engine bootstrap complete");
Ok(BootstrappedEngine { dispatch })
}
}
pub struct BootstrappedEngine<H: ExecutorHandler + 'static, C: Clock = SystemClock> {
dispatch: DispatchLoop<InstrumentedWalWriter<WalFsWriter>, H, C>,
}
impl<H: ExecutorHandler + 'static, C: Clock> BootstrappedEngine<H, C> {
pub fn submit_task(&mut self, spec: TaskSpec) -> Result<(), EngineError> {
let task_id = spec.id();
tracing::debug!(%task_id, "submit_task");
self.dispatch.submit_task(spec).map_err(EngineError::Dispatch)
}
pub async fn tick(&mut self) -> Result<TickResult, EngineError> {
self.dispatch.tick().await.map_err(EngineError::Dispatch)
}
pub async fn run_until_idle(&mut self) -> Result<RunSummary, EngineError> {
self.dispatch.run_until_idle().await.map_err(EngineError::Dispatch)
}
pub fn projection(&self) -> &ReplayReducer {
self.dispatch.projection()
}
pub fn declare_dependency(
&mut self,
task_id: actionqueue_core::ids::TaskId,
prereqs: Vec<actionqueue_core::ids::TaskId>,
) -> Result<(), EngineError> {
tracing::debug!(%task_id, prereq_count = prereqs.len(), "declare_dependency");
self.dispatch.declare_dependency(task_id, prereqs).map_err(EngineError::Dispatch)
}
#[cfg(feature = "budget")]
pub fn allocate_budget(
&mut self,
task_id: actionqueue_core::ids::TaskId,
dimension: actionqueue_core::budget::BudgetDimension,
limit: u64,
) -> Result<(), EngineError> {
tracing::debug!(%task_id, ?dimension, limit, "allocate_budget");
self.dispatch.allocate_budget(task_id, dimension, limit).map_err(EngineError::Dispatch)
}
#[cfg(feature = "budget")]
pub fn replenish_budget(
&mut self,
task_id: actionqueue_core::ids::TaskId,
dimension: actionqueue_core::budget::BudgetDimension,
new_limit: u64,
) -> Result<(), EngineError> {
tracing::debug!(%task_id, ?dimension, new_limit, "replenish_budget");
self.dispatch.replenish_budget(task_id, dimension, new_limit).map_err(EngineError::Dispatch)
}
#[cfg(feature = "budget")]
pub fn budget_remaining(
&self,
task_id: actionqueue_core::ids::TaskId,
dimension: actionqueue_core::budget::BudgetDimension,
) -> Option<u64> {
self.dispatch.budget_remaining(task_id, dimension)
}
#[cfg(feature = "budget")]
pub fn is_budget_exhausted(
&self,
task_id: actionqueue_core::ids::TaskId,
dimension: actionqueue_core::budget::BudgetDimension,
) -> bool {
self.dispatch.is_budget_exhausted(task_id, dimension)
}
#[cfg(feature = "budget")]
pub fn resume_run(&mut self, run_id: actionqueue_core::ids::RunId) -> Result<(), EngineError> {
tracing::debug!(%run_id, "resume_run");
self.dispatch.resume_run(run_id).map_err(EngineError::Dispatch)
}
#[cfg(feature = "budget")]
pub fn create_subscription(
&mut self,
task_id: actionqueue_core::ids::TaskId,
filter: actionqueue_core::subscription::EventFilter,
) -> Result<actionqueue_core::subscription::SubscriptionId, EngineError> {
tracing::debug!(%task_id, "create_subscription");
self.dispatch.create_subscription(task_id, filter).map_err(EngineError::Dispatch)
}
#[cfg(feature = "budget")]
pub fn fire_custom_event(&mut self, key: String) -> Result<(), EngineError> {
tracing::debug!(key, "fire_custom_event");
self.dispatch.fire_custom_event(key).map_err(EngineError::Dispatch)
}
#[cfg(feature = "actor")]
pub fn register_actor(
&mut self,
registration: actionqueue_core::actor::ActorRegistration,
) -> Result<(), EngineError> {
let actor_id = registration.actor_id();
tracing::debug!(%actor_id, "register_actor");
self.dispatch.register_actor(registration).map_err(EngineError::Dispatch)
}
#[cfg(feature = "actor")]
pub fn deregister_actor(
&mut self,
actor_id: actionqueue_core::ids::ActorId,
) -> Result<(), EngineError> {
tracing::debug!(%actor_id, "deregister_actor");
self.dispatch.deregister_actor(actor_id).map_err(EngineError::Dispatch)
}
#[cfg(feature = "actor")]
pub fn actor_heartbeat(
&mut self,
actor_id: actionqueue_core::ids::ActorId,
) -> Result<(), EngineError> {
tracing::trace!(%actor_id, "actor_heartbeat");
self.dispatch.actor_heartbeat(actor_id).map_err(EngineError::Dispatch)
}
#[cfg(feature = "actor")]
pub fn actor_registry(&self) -> &actionqueue_actor::ActorRegistry {
self.dispatch.actor_registry()
}
#[cfg(feature = "platform")]
pub fn create_tenant(
&mut self,
registration: actionqueue_core::platform::TenantRegistration,
) -> Result<(), EngineError> {
let tenant_id = registration.tenant_id();
tracing::debug!(%tenant_id, "create_tenant");
self.dispatch.create_tenant(registration).map_err(EngineError::Dispatch)
}
#[cfg(feature = "platform")]
pub fn assign_role(
&mut self,
actor_id: actionqueue_core::ids::ActorId,
role: actionqueue_core::platform::Role,
tenant_id: actionqueue_core::ids::TenantId,
) -> Result<(), EngineError> {
tracing::debug!(%actor_id, ?role, %tenant_id, "assign_role");
self.dispatch.assign_role(actor_id, role, tenant_id).map_err(EngineError::Dispatch)
}
#[cfg(feature = "platform")]
pub fn grant_capability(
&mut self,
actor_id: actionqueue_core::ids::ActorId,
capability: actionqueue_core::platform::Capability,
tenant_id: actionqueue_core::ids::TenantId,
) -> Result<(), EngineError> {
tracing::debug!(%actor_id, ?capability, %tenant_id, "grant_capability");
self.dispatch
.grant_capability(actor_id, capability, tenant_id)
.map_err(EngineError::Dispatch)
}
#[cfg(feature = "platform")]
pub fn revoke_capability(
&mut self,
actor_id: actionqueue_core::ids::ActorId,
capability: actionqueue_core::platform::Capability,
tenant_id: actionqueue_core::ids::TenantId,
) -> Result<(), EngineError> {
tracing::debug!(%actor_id, ?capability, %tenant_id, "revoke_capability");
self.dispatch
.revoke_capability(actor_id, capability, tenant_id)
.map_err(EngineError::Dispatch)
}
#[cfg(feature = "platform")]
pub fn append_ledger_entry(
&mut self,
entry: actionqueue_core::platform::LedgerEntry,
) -> Result<(), EngineError> {
tracing::debug!(
entry_id = %entry.entry_id(),
tenant_id = %entry.tenant_id(),
ledger_key = entry.ledger_key(),
"append_ledger_entry"
);
self.dispatch.append_ledger_entry(entry).map_err(EngineError::Dispatch)
}
#[cfg(feature = "platform")]
pub fn ledger(&self) -> &actionqueue_platform::AppendLedger {
self.dispatch.ledger()
}
#[cfg(feature = "platform")]
pub fn rbac(&self) -> &actionqueue_platform::RbacEnforcer {
self.dispatch.rbac()
}
#[cfg(feature = "platform")]
pub fn tenant_registry(&self) -> &actionqueue_platform::TenantRegistry {
self.dispatch.tenant_registry()
}
pub async fn drain_and_shutdown(
mut self,
timeout: std::time::Duration,
) -> Result<(), EngineError> {
tracing::info!(timeout_secs = timeout.as_secs(), "drain_and_shutdown starting");
let _ = self.dispatch.drain_until_idle(timeout).await?;
tracing::info!("drain_and_shutdown complete");
Ok(())
}
pub fn shutdown(self) -> Result<(), EngineError> {
tracing::info!("engine shutdown");
Ok(())
}
}