// actionqueue_runtime/dispatch.rs
1//! Core dispatch loop that composes all engine primitives.
2//!
3//! The dispatch loop orchestrates the full run lifecycle:
4//! promote → select → gate → lease → execute → finish → release.
5//!
6//! Workers execute via `tokio::task::spawn_blocking` and communicate results
7//! back through an unbounded MPSC channel. The dispatch loop owns all WAL
8//! mutation authority exclusively — workers never write to the WAL.
9
10use std::collections::HashMap;
11use std::path::PathBuf;
12use std::sync::Arc;
13
14use actionqueue_core::ids::{AttemptId, RunId, TaskId};
15use actionqueue_core::mutation::{
16    AttemptStartCommand, DependencyDeclareCommand, DurabilityPolicy, LeaseAcquireCommand,
17    LeaseHeartbeatCommand, LeaseReleaseCommand, MutationAuthority, MutationCommand,
18    RunCreateCommand, RunStateTransitionCommand, TaskCancelCommand, TaskCreateCommand,
19};
20use actionqueue_core::run::run_instance::{RunInstance, RunInstanceError};
21use actionqueue_core::run::state::RunState;
22use actionqueue_core::task::constraints::ConcurrencyKeyHoldPolicy;
23use actionqueue_core::task::safety::SafetyLevel;
24use actionqueue_core::task::task_spec::TaskSpec;
25use actionqueue_engine::concurrency::key_gate::{ConcurrencyKey, KeyGate, ReleaseResult};
26use actionqueue_engine::derive::DerivationError;
27use actionqueue_engine::index::ready::ReadyIndex;
28use actionqueue_engine::index::scheduled::ScheduledIndex;
29use actionqueue_engine::scheduler::promotion::{
30    promote_scheduled_to_ready_via_authority, AuthorityPromotionError, PromotionParams,
31};
32use actionqueue_engine::scheduler::retry_promotion::promote_retry_wait_to_ready;
33use actionqueue_engine::selection::default_selector::{ready_inputs_from_index, select_ready_runs};
34use actionqueue_engine::time::clock::Clock;
35use actionqueue_executor_local::backoff::BackoffStrategy;
36use actionqueue_executor_local::handler::ExecutorHandler;
37use actionqueue_executor_local::identity::{ExecutorIdentity, LocalExecutorIdentity};
38use actionqueue_executor_local::types::ExecutorRequest;
39use actionqueue_executor_local::{AttemptRunner, SystemAttemptTimer};
40use actionqueue_storage::mutation::authority::{MutationAuthorityError, StorageMutationAuthority};
41use actionqueue_storage::recovery::reducer::{ReplayReducer, ReplayReducerError};
42use actionqueue_storage::snapshot::build::build_snapshot_from_projection;
43use actionqueue_storage::snapshot::mapping::SnapshotMappingError;
44use actionqueue_storage::snapshot::writer::{
45    SnapshotFsWriter, SnapshotWriter, SnapshotWriterError,
46};
47use actionqueue_storage::wal::writer::WalWriter;
48use actionqueue_workflow::children::build_children_snapshot;
49use actionqueue_workflow::dag::DependencyGate;
50use actionqueue_workflow::hierarchy::HierarchyTracker;
51use actionqueue_workflow::submission::{submission_channel, SubmissionChannel, SubmissionReceiver};
52use tokio::sync::mpsc;
53
54use crate::config::BackoffStrategyConfig;
55use crate::worker::{InFlightRun, WorkerResult};
56
57/// Builds a [`DependencyGate`] from the recovery reducer's dependency state.
58///
59/// Called once at `DispatchLoop::new()` to reconstruct the gate from WAL events
60/// that were already replayed during bootstrap. Satisfaction is derived from the
61/// projection's run states (which tasks have at least one Completed run with all
62/// runs terminal). Failure is derived similarly (all runs terminal, none Completed).
63fn build_dependency_gate(projection: &ReplayReducer) -> DependencyGate {
64    use actionqueue_core::run::state::RunState;
65
66    let mut gate = DependencyGate::new();
67
68    // Collect dependency map once for O(1) lookups.
69    let dep_map: std::collections::HashMap<_, _> = projection.dependency_declarations().collect();
70
71    // Populate declarations from WAL events.
72    for (&task_id, prereqs) in &dep_map {
73        // Declarations were already cycle-checked at submission time.
74        if let Err(err) = gate.declare(task_id, prereqs.iter().copied().collect()) {
75            tracing::warn!(%task_id, error = %err, "dependency gate declare failed at bootstrap");
76        }
77    }
78
79    // Derive satisfaction state from current run states using the O(R_task) index.
80    // Collect all unique prerequisite task_ids across all declarations.
81    let all_prereqs: std::collections::HashSet<_> =
82        dep_map.values().flat_map(|prereqs| prereqs.iter().copied()).collect();
83
84    for task_id in all_prereqs {
85        let runs: Vec<_> = projection.runs_for_task(task_id).collect();
86        if runs.is_empty() {
87            continue;
88        }
89        let all_terminal = runs.iter().all(|r| r.state().is_terminal());
90        if !all_terminal {
91            continue;
92        }
93        let has_completed = runs.iter().any(|r| r.state() == RunState::Completed);
94        if has_completed {
95            gate.force_satisfy(task_id);
96        } else {
97            gate.force_fail(task_id);
98        }
99    }
100
101    // Cascade failure from directly-failed prerequisites to all transitive dependents.
102    // Without this, a crash between a prerequisite failing and the cascade being
103    // committed would leave dependent tasks permanently stranded after recovery.
104    let _ = gate.propagate_failures();
105
106    gate
107}
108
109/// Builds a [`HierarchyTracker`] from the recovery reducer's task hierarchy.
110///
111/// Called once at `DispatchLoop::new()` to reconstruct the tracker from WAL events
112/// that were already replayed during bootstrap.
113///
114/// Two-pass approach:
115/// 1. Register all parent-child relationships (terminal_tasks is empty → no orphan errors).
116/// 2. Mark tasks terminal whose runs are all in terminal state or that are canceled with no runs.
117fn build_hierarchy_tracker(projection: &ReplayReducer) -> HierarchyTracker {
118    let mut tracker = HierarchyTracker::new();
119
120    // Pass 1: register all parent-child pairs.
121    // terminal_tasks is empty here, so orphan prevention never fires.
122    for (child_id, parent_id) in projection.parent_child_mappings() {
123        // Depth limit should not be exceeded for valid WAL data;
124        // ignore errors (they indicate a WAL invariant violation, not a tracker bug).
125        let _ = tracker.register_child(parent_id, child_id);
126    }
127
128    // Pass 2: mark tasks terminal based on projection run states using the O(R_task) index.
129    for task_record in projection.task_records() {
130        let task_id = task_record.task_spec().id();
131        let runs: Vec<_> = projection.runs_for_task(task_id).collect();
132        let is_terminal = if runs.is_empty() {
133            projection.is_task_canceled(task_id)
134        } else {
135            runs.iter().all(|r| r.state().is_terminal())
136        };
137        if is_terminal {
138            tracker.mark_terminal(task_id);
139        }
140    }
141
142    tracker
143}
144
/// Result of a single dispatch tick.
///
/// All counters default to zero (`Default` derive), so a tick that did no
/// work yields an all-zero result.
#[derive(Debug, Clone, Default)]
#[must_use = "tick result should be inspected for dispatch activity"]
pub struct TickResult {
    /// Number of runs promoted from Scheduled to Ready.
    pub promoted_scheduled: usize,
    /// Number of runs promoted from RetryWait to Ready.
    pub promoted_retry_wait: usize,
    /// Number of runs selected and dispatched for execution.
    pub dispatched: usize,
    /// Number of runs that completed (terminal state).
    pub completed: usize,
    /// Whether the engine is currently paused.
    pub engine_paused: bool,
}

/// Summary of a `run_until_idle` session.
///
/// Aggregates the per-tick counters over the whole session.
#[derive(Debug, Clone, Default)]
#[must_use]
pub struct RunSummary {
    /// Total ticks executed.
    pub ticks: usize,
    /// Total runs dispatched across all ticks.
    pub total_dispatched: usize,
    /// Total runs completed across all ticks.
    pub total_completed: usize,
}
172
/// Concrete authority error type used by the dispatch loop.
pub type AuthorityError = MutationAuthorityError<ReplayReducerError>;

/// Errors that can occur during dispatch.
///
/// `SubmissionRejected` and `DependencyCycle` are treated as non-fatal when
/// draining handler submissions (logged and dropped); any other variant
/// surfaced there is propagated to the caller.
#[derive(Debug)]
pub enum DispatchError {
    /// WAL sequence counter overflow.
    SequenceOverflow,
    /// A mutation command submitted to the storage authority failed.
    Authority(AuthorityError),
    /// Scheduled-to-ready promotion via authority failed.
    ScheduledPromotion(AuthorityPromotionError<AuthorityError>),
    /// RetryWait-to-ready promotion failed due to an invalid state transition.
    RetryPromotion(RunInstanceError),
    /// Run derivation from a task's run policy failed.
    Derivation(DerivationError),
    /// Snapshot build from projection failed.
    SnapshotBuild(SnapshotMappingError),
    /// Snapshot I/O failed during write or close.
    SnapshotWrite(SnapshotWriterError),
    /// Snapshot writer initialization failed.
    SnapshotInit(String),
    /// Internal state inconsistency (e.g., task not found after run transition).
    StateInconsistency {
        /// Run that triggered the inconsistency.
        run_id: RunId,
        /// Human-readable context for diagnostics.
        context: String,
    },
    /// Backoff strategy configuration is invalid (e.g., base exceeds max).
    InvalidBackoffConfig,
    /// Dependency declaration would introduce a cycle in the task DAG.
    DependencyCycle(actionqueue_workflow::dag::CycleError),
    /// Retry decision from attempt outcome violated retry invariants.
    RetryDecision(actionqueue_executor_local::RetryDecisionError),
    /// A dynamically submitted task was rejected (parent not found or terminal).
    SubmissionRejected {
        /// Task that was rejected.
        task_id: TaskId,
        /// Human-readable reason for rejection.
        context: String,
    },
}
216
217impl std::fmt::Display for DispatchError {
218    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
219        match self {
220            DispatchError::SequenceOverflow => write!(f, "WAL sequence counter overflow"),
221            DispatchError::Authority(e) => write!(f, "authority error: {e}"),
222            DispatchError::ScheduledPromotion(e) => {
223                write!(f, "scheduled promotion error: {e}")
224            }
225            DispatchError::RetryPromotion(e) => write!(f, "retry promotion error: {e}"),
226            DispatchError::Derivation(e) => write!(f, "run derivation error: {e}"),
227            DispatchError::SnapshotBuild(e) => write!(f, "snapshot build error: {e}"),
228            DispatchError::SnapshotWrite(e) => write!(f, "snapshot write error: {e}"),
229            DispatchError::SnapshotInit(e) => write!(f, "snapshot init error: {e}"),
230            DispatchError::StateInconsistency { run_id, context } => {
231                write!(f, "state inconsistency for run {run_id}: {context}")
232            }
233            DispatchError::InvalidBackoffConfig => {
234                write!(f, "invalid backoff configuration")
235            }
236            DispatchError::RetryDecision(e) => write!(f, "retry decision error: {e}"),
237            DispatchError::DependencyCycle(e) => write!(f, "dependency cycle: {e}"),
238            DispatchError::SubmissionRejected { task_id, context } => {
239                write!(f, "submission rejected for task {task_id}: {context}")
240            }
241        }
242    }
243}
244
245impl std::error::Error for DispatchError {}
246
/// The core dispatch loop that composes all engine primitives.
///
/// Workers execute handler logic via `tokio::task::spawn_blocking` and send
/// results back through an unbounded MPSC channel. All WAL mutation authority
/// remains exclusively owned by the dispatch loop — workers never touch the WAL.
///
/// The `I: ExecutorIdentity` generic defaults to `LocalExecutorIdentity` so
/// existing construction sites don't need changes for v0.x. Sprint 4 remote
/// actors will supply their own identity via this parameter.
pub struct DispatchLoop<
    W: WalWriter,
    H: ExecutorHandler,
    C: Clock = actionqueue_engine::time::clock::SystemClock,
    I: ExecutorIdentity = LocalExecutorIdentity,
> {
    /// Sole mutation authority over the WAL-backed projection.
    authority: StorageMutationAuthority<W, ReplayReducer>,
    /// Attempt runner shared with workers (Arc'd for spawn_blocking).
    runner: Arc<AttemptRunner<H, SystemAttemptTimer>>,
    /// Time source for scheduling decisions.
    clock: C,
    /// Identity of the executor acquiring leases.
    identity: I,
    /// Concurrency-key gate for runs that share a [`ConcurrencyKey`].
    key_gate: KeyGate,
    /// Retry delay strategy (fixed or exponential), chosen at construction.
    backoff: Box<dyn BackoffStrategy + Send + Sync>,
    /// Maximum number of concurrently executing runs.
    max_concurrent: usize,
    /// Lease timeout in seconds applied to dispatched runs.
    lease_timeout_secs: u64,
    /// Sender side of the worker-result channel.
    result_tx: mpsc::UnboundedSender<WorkerResult>,
    /// Receiver side of the worker-result channel, drained by the loop.
    result_rx: mpsc::UnboundedReceiver<WorkerResult>,
    /// Runs currently executing in workers, keyed by run id.
    in_flight: HashMap<actionqueue_core::ids::RunId, InFlightRun>,
    // NOTE(review): appears to hold a worker result deferred for later
    // processing — confirm exact handoff semantics at the usage sites.
    pending_result: Option<WorkerResult>,
    // NOTE(review): presumably suppresses new dispatches while shutting
    // down — confirm at the tick/shutdown sites.
    draining: bool,
    /// Filesystem path for snapshot persistence, if enabled.
    snapshot_path: Option<PathBuf>,
    /// WAL-event count that triggers an automatic snapshot, if enabled.
    snapshot_event_threshold: Option<u64>,
    /// Events appended since the last snapshot write.
    events_since_last_snapshot: u64,
    /// Sender side of the workflow submission channel (Arc'd, given to handlers).
    submission_tx: std::sync::Arc<SubmissionChannel>,
    /// Receiver side of the workflow submission channel (drained each tick).
    submission_rx: SubmissionReceiver,
    /// DAG dependency gate — gates Scheduled → Ready promotion.
    /// Built from the projection at bootstrap and kept in sync as runs complete.
    dependency_gate: DependencyGate,
    /// Hierarchy tracker — parent-child task tree for cascade cancellation.
    /// Built from the projection at bootstrap and kept in sync as tasks are
    /// created (via handler submissions) and reach terminal state.
    hierarchy_tracker: HierarchyTracker,
    /// TaskIds with pending cascade cancellations. Seeded at bootstrap with
    /// tasks that have `canceled_at` and at least one non-terminal descendant.
    /// Entries are removed once the cascade completes (self-quenching).
    pending_hierarchy_cascade: std::collections::HashSet<TaskId>,
    /// In-memory budget state reconstructed from WAL events at bootstrap.
    /// Updated each tick as worker results arrive with consumption records.
    #[cfg(feature = "budget")]
    budget_tracker: actionqueue_budget::BudgetTracker,
    /// In-memory subscription state reconstructed from WAL events at bootstrap.
    /// Consulted for subscription-triggered promotion eligibility and event matching.
    #[cfg(feature = "budget")]
    subscription_registry: actionqueue_budget::SubscriptionRegistry,
    /// Per-task cache of parsed cron::Schedule objects (workflow feature only).
    /// Avoids re-parsing cron expressions on every tick for cron-policy tasks.
    #[cfg(feature = "workflow")]
    cron_schedule_cache: actionqueue_engine::derive::cron::CronScheduleCache,
    /// In-memory actor registry (actor feature only).
    #[cfg(feature = "actor")]
    actor_registry: actionqueue_actor::ActorRegistry,
    /// Heartbeat monitor for actor timeout detection (actor feature only).
    #[cfg(feature = "actor")]
    heartbeat_monitor: actionqueue_actor::HeartbeatMonitor,
    /// Department grouping registry (actor feature only).
    #[cfg(feature = "actor")]
    department_registry: actionqueue_actor::DepartmentRegistry,
    /// Tenant registry (platform feature only).
    #[cfg(feature = "platform")]
    tenant_registry: actionqueue_platform::TenantRegistry,
    /// RBAC enforcer (platform feature only).
    #[cfg(feature = "platform")]
    rbac_enforcer: actionqueue_platform::RbacEnforcer,
    /// Append ledger (platform feature only).
    #[cfg(feature = "platform")]
    ledger: actionqueue_platform::AppendLedger,
    /// TaskIds that have fully reached terminal state and are pending GC from
    /// in-memory data structures (DependencyGate, HierarchyTracker, BudgetTracker,
    /// SubscriptionRegistry, CronScheduleCache). Populated when all runs for a
    /// task are terminal; drained each tick after cascades are complete.
    pending_gc_tasks: std::collections::HashSet<TaskId>,
}
330
/// Configuration parameters for the dispatch loop that group backoff,
/// concurrency, lease, and snapshot settings.
pub struct DispatchConfig {
    /// Backoff strategy configuration for retry delay computation.
    pub(crate) backoff_config: BackoffStrategyConfig,
    /// Maximum number of concurrently executing runs.
    pub(crate) max_concurrent: usize,
    /// Lease timeout in seconds for dispatched runs.
    pub(crate) lease_timeout_secs: u64,
    /// Filesystem path for snapshot persistence; `None` disables snapshots.
    pub(crate) snapshot_path: Option<PathBuf>,
    /// Number of WAL events between automatic snapshot writes, if enabled.
    pub(crate) snapshot_event_threshold: Option<u64>,
}

impl DispatchConfig {
    /// Creates a new dispatch configuration.
    ///
    /// Values are not validated here: an invalid backoff configuration is only
    /// rejected when `DispatchLoop::new` constructs the strategy from it.
    pub fn new(
        backoff_config: BackoffStrategyConfig,
        max_concurrent: usize,
        lease_timeout_secs: u64,
        snapshot_path: Option<PathBuf>,
        snapshot_event_threshold: Option<u64>,
    ) -> Self {
        Self {
            backoff_config,
            max_concurrent,
            lease_timeout_secs,
            snapshot_path,
            snapshot_event_threshold,
        }
    }
}
364
365impl<W: WalWriter, H: ExecutorHandler + 'static, C: Clock> DispatchLoop<W, H, C> {
    /// Creates a new dispatch loop.
    ///
    /// Rebuilds all in-memory bootstrap state (dependency gate, hierarchy
    /// tracker, and the feature-gated registries) from the projection, whose
    /// WAL events were already replayed before this is called.
    ///
    /// # Errors
    ///
    /// Returns [`DispatchError::InvalidBackoffConfig`] if the backoff strategy
    /// configuration is invalid (e.g., exponential base exceeds max).
    pub fn new(
        authority: StorageMutationAuthority<W, ReplayReducer>,
        handler: H,
        clock: C,
        config: DispatchConfig,
    ) -> Result<Self, DispatchError> {
        // Materialize the configured backoff strategy; only the exponential
        // variant has a fallible constructor.
        let backoff: Box<dyn BackoffStrategy + Send + Sync> = match &config.backoff_config {
            BackoffStrategyConfig::Fixed { interval } => {
                Box::new(actionqueue_executor_local::FixedBackoff::new(*interval))
            }
            BackoffStrategyConfig::Exponential { base, max } => Box::new(
                actionqueue_executor_local::ExponentialBackoff::new(*base, *max)
                    .map_err(|_| DispatchError::InvalidBackoffConfig)?,
            ),
        };

        // Worker-result and handler-submission channels; both receivers stay
        // with the dispatch loop.
        let (result_tx, result_rx) = mpsc::unbounded_channel();
        let (submission_tx, submission_rx) = submission_channel();

        // Rebuild the dependency gate from projection state (WAL events already applied).
        let dependency_gate = build_dependency_gate(authority.projection());
        // Rebuild the hierarchy tracker from task parent_task_id fields.
        let hierarchy_tracker = build_hierarchy_tracker(authority.projection());

        // Rebuild budget tracker from WAL-sourced projection records.
        // allocate() establishes the limit first; consumption is replayed only
        // when a positive amount was recorded.
        #[cfg(feature = "budget")]
        let budget_tracker = {
            let mut tracker = actionqueue_budget::BudgetTracker::new();
            for ((task_id, dimension), record) in authority.projection().budgets() {
                tracker.allocate(*task_id, *dimension, record.limit);
                if record.consumed > 0 {
                    tracker.consume(*task_id, *dimension, record.consumed);
                }
            }
            tracker
        };

        // Rebuild subscription registry from WAL-sourced projection records.
        // Canceled subscriptions are skipped; triggered state is replayed.
        #[cfg(feature = "budget")]
        let subscription_registry = {
            let mut registry = actionqueue_budget::SubscriptionRegistry::new();
            for (sub_id, record) in authority.projection().subscriptions() {
                if record.canceled_at.is_none() {
                    registry.register(*sub_id, record.task_id, record.filter.clone());
                    if record.triggered_at.is_some() {
                        registry.trigger(*sub_id);
                    }
                }
            }
            registry
        };

        // Seed pending cascade with canceled tasks that may have non-terminal descendants.
        // Over-seeding is harmless: the cascade set is self-quenching.
        let pending_hierarchy_cascade: std::collections::HashSet<TaskId> = authority
            .projection()
            .task_records()
            .filter(|tr| tr.canceled_at().is_some())
            .map(|tr| tr.task_spec().id())
            .collect();

        // Rebuild actor registry from WAL-sourced projection records.
        #[cfg(feature = "actor")]
        let actor_registry = {
            let mut registry = actionqueue_actor::ActorRegistry::new();
            for (actor_id, record) in authority.projection().actors() {
                if record.deregistered_at.is_none() {
                    // NOTE(review): invalid stored capabilities fall back to a
                    // single "_" placeholder rather than dropping the actor —
                    // confirm this placeholder is the intended recovery.
                    let caps = actionqueue_core::actor::ActorCapabilities::new(
                        record.capabilities.clone(),
                    )
                    .unwrap_or_else(|_| {
                        actionqueue_core::actor::ActorCapabilities::new(vec!["_".to_string()])
                            .expect("fallback capability")
                    });
                    let mut reg = actionqueue_core::actor::ActorRegistration::new(
                        *actor_id,
                        record.identity.clone(),
                        caps,
                        record.heartbeat_interval_secs,
                    );
                    if let Some(tid) = record.tenant_id {
                        reg = reg.with_tenant(tid);
                    }
                    // An unparseable department id is silently dropped.
                    if let Some(dept_str) = &record.department {
                        if let Ok(dept) = actionqueue_core::ids::DepartmentId::new(dept_str.clone())
                        {
                            reg = reg.with_department(dept);
                        }
                    }
                    registry.register(reg);
                }
            }
            registry
        };

        // Rebuild heartbeat monitor from actor records.
        // The registration time stands in for a heartbeat if none was recorded.
        #[cfg(feature = "actor")]
        let heartbeat_monitor = {
            let mut monitor = actionqueue_actor::HeartbeatMonitor::new();
            for (actor_id, record) in authority.projection().actors() {
                if record.deregistered_at.is_none() {
                    let policy = actionqueue_core::actor::HeartbeatPolicy::with_default_multiplier(
                        record.heartbeat_interval_secs,
                    );
                    let last_beat = record.last_heartbeat_at.unwrap_or(record.registered_at);
                    monitor.record_registration(*actor_id, policy, last_beat);
                }
            }
            monitor
        };

        // Rebuild department registry from actor records.
        #[cfg(feature = "actor")]
        let department_registry = {
            let mut registry = actionqueue_actor::DepartmentRegistry::new();
            for (actor_id, record) in authority.projection().actors() {
                if record.deregistered_at.is_none() {
                    if let Some(dept_str) = &record.department {
                        if let Ok(dept) = actionqueue_core::ids::DepartmentId::new(dept_str.clone())
                        {
                            registry.assign(*actor_id, dept);
                        }
                    }
                }
            }
            registry
        };

        // Rebuild tenant registry from WAL-sourced projection records.
        #[cfg(feature = "platform")]
        let tenant_registry = {
            let mut registry = actionqueue_platform::TenantRegistry::new();
            for (_, record) in authority.projection().tenants() {
                registry.register(actionqueue_core::platform::TenantRegistration::new(
                    record.tenant_id,
                    record.name.clone(),
                ));
            }
            registry
        };

        // Rebuild RBAC enforcer from WAL-sourced projection records.
        // Role assignments are replayed unconditionally; capability grants only
        // while unrevoked.
        #[cfg(feature = "platform")]
        let rbac_enforcer = {
            let mut enforcer = actionqueue_platform::RbacEnforcer::new();
            for record in authority.projection().role_assignments() {
                enforcer.assign_role(record.actor_id, record.role.clone(), record.tenant_id);
            }
            for record in authority.projection().capability_grants() {
                if record.revoked_at.is_none() {
                    enforcer.grant_capability(
                        record.actor_id,
                        record.capability.clone(),
                        record.tenant_id,
                    );
                }
            }
            enforcer
        };

        // Rebuild ledger from WAL-sourced projection records.
        #[cfg(feature = "platform")]
        let ledger = {
            let mut ledger = actionqueue_platform::AppendLedger::new();
            for record in authority.projection().ledger_entries() {
                let entry = actionqueue_core::platform::LedgerEntry::new(
                    record.entry_id,
                    record.tenant_id,
                    record.ledger_key.clone(),
                    record.payload.clone(),
                    record.timestamp,
                );
                let entry =
                    if let Some(aid) = record.actor_id { entry.with_actor(aid) } else { entry };
                ledger.append(entry);
            }
            ledger
        };

        Ok(Self {
            authority,
            runner: Arc::new(AttemptRunner::new(handler)),
            clock,
            identity: LocalExecutorIdentity,
            key_gate: KeyGate::new(),
            backoff,
            max_concurrent: config.max_concurrent,
            lease_timeout_secs: config.lease_timeout_secs,
            result_tx,
            result_rx,
            in_flight: HashMap::new(),
            pending_result: None,
            draining: false,
            snapshot_path: config.snapshot_path,
            snapshot_event_threshold: config.snapshot_event_threshold,
            events_since_last_snapshot: 0,
            submission_tx,
            submission_rx,
            dependency_gate,
            hierarchy_tracker,
            pending_hierarchy_cascade,
            #[cfg(feature = "budget")]
            budget_tracker,
            #[cfg(feature = "budget")]
            subscription_registry,
            #[cfg(feature = "workflow")]
            cron_schedule_cache: actionqueue_engine::derive::cron::CronScheduleCache::new(),
            #[cfg(feature = "actor")]
            actor_registry,
            #[cfg(feature = "actor")]
            heartbeat_monitor,
            #[cfg(feature = "actor")]
            department_registry,
            #[cfg(feature = "platform")]
            tenant_registry,
            #[cfg(feature = "platform")]
            rbac_enforcer,
            #[cfg(feature = "platform")]
            ledger,
            pending_gc_tasks: std::collections::HashSet::new(),
        })
    }
593
    /// Returns a reference to the projection (current state view).
    ///
    /// Read-only access only: all mutations go through the authority, which
    /// this loop owns exclusively.
    pub fn projection(&self) -> &ReplayReducer {
        self.authority.projection()
    }
598
599    /// Computes the next WAL sequence number with overflow protection.
600    fn next_sequence(&self) -> Result<u64, DispatchError> {
601        self.authority
602            .projection()
603            .latest_sequence()
604            .checked_add(1)
605            .ok_or(DispatchError::SequenceOverflow)
606    }
607
608    /// Drains all pending task submissions proposed by handlers via
609    /// `ExecutorContext.submission` and processes them through the mutation authority.
610    ///
611    /// Each submission is validated and WAL-appended as a standard TaskCreate +
612    /// RunCreate sequence. Invalid submissions (e.g., nil task ID, terminal parent)
613    /// are logged and dropped — handlers have no error path for submission failures.
614    fn drain_submissions(&mut self, current_time: u64) -> Result<(), DispatchError> {
615        while let Some(submission) = self.submission_rx.try_recv() {
616            match self.process_submission(submission, current_time) {
617                Ok(()) => {}
618                Err(DispatchError::SubmissionRejected { ref task_id, ref context }) => {
619                    tracing::error!(
620                        %task_id,
621                        %context,
622                        "workflow submission rejected; submission dropped"
623                    );
624                }
625                Err(DispatchError::DependencyCycle(ref err)) => {
626                    tracing::error!(
627                        error = %err,
628                        "workflow submission rejected (dependency cycle); submission dropped"
629                    );
630                }
631                Err(fatal) => return Err(fatal),
632            }
633        }
634        Ok(())
635    }
636
637    /// Validates and commits a single handler-proposed task submission.
638    fn process_submission(
639        &mut self,
640        submission: actionqueue_workflow::submission::TaskSubmission,
641        current_time: u64,
642    ) -> Result<(), DispatchError> {
643        let (task_spec, mut dependencies) = submission.into_parts();
644        let task_id = task_spec.id();
645        let parent_task_id = task_spec.parent_task_id();
646
647        // Deduplicate dependencies to avoid redundant WAL entries and graph edges.
648        {
649            let mut seen = std::collections::HashSet::new();
650            dependencies.retain(|id| seen.insert(*id));
651        }
652
653        // Validate parent: must exist and not be terminal (orphan prevention).
654        if let Some(parent_id) = parent_task_id {
655            if self.authority.projection().get_task(&parent_id).is_none() {
656                return Err(DispatchError::SubmissionRejected {
657                    task_id: task_spec.id(),
658                    context: format!("parent {parent_id} not found"),
659                });
660            }
661            if self.hierarchy_tracker.is_terminal(parent_id) {
662                return Err(DispatchError::SubmissionRejected {
663                    task_id: task_spec.id(),
664                    context: format!("parent {parent_id} is terminal (orphan prevention)"),
665                });
666            }
667        }
668
669        // WAL-append the task.
670        let task_seq = self.next_sequence()?;
671        let _ = self
672            .authority
673            .submit_command(
674                actionqueue_core::mutation::MutationCommand::TaskCreate(
675                    actionqueue_core::mutation::TaskCreateCommand::new(
676                        task_seq,
677                        task_spec.clone(),
678                        current_time,
679                    ),
680                ),
681                actionqueue_core::mutation::DurabilityPolicy::Immediate,
682            )
683            .map_err(DispatchError::Authority)?;
684
685        // Derive and WAL-append runs.
686        let already_derived = self.authority.projection().run_ids_for_task(task_id).len() as u32;
687        let derivation = actionqueue_engine::derive::derive_runs(
688            &self.clock,
689            task_id,
690            task_spec.run_policy(),
691            already_derived,
692            current_time,
693        )
694        .map_err(DispatchError::Derivation)?;
695
696        for run in derivation.into_derived() {
697            let run_seq = self.next_sequence()?;
698            let _ = self
699                .authority
700                .submit_command(
701                    actionqueue_core::mutation::MutationCommand::RunCreate(
702                        actionqueue_core::mutation::RunCreateCommand::new(run_seq, run),
703                    ),
704                    actionqueue_core::mutation::DurabilityPolicy::Immediate,
705                )
706                .map_err(DispatchError::Authority)?;
707        }
708
709        // If the submission includes dependency declarations, WAL-append and register.
710        if !dependencies.is_empty() {
711            // Check for cycles BEFORE WAL append so invalid declarations are
712            // never persisted. The read-only check_cycle leaves the gate
713            // unmodified; declare() below is guaranteed to succeed after this.
714            self.dependency_gate
715                .check_cycle(task_id, &dependencies)
716                .map_err(DispatchError::DependencyCycle)?;
717
718            // Reject if any prerequisite is not yet in the projection.
719            // This prevents the mutation authority from fatally rejecting
720            // the DependencyDeclareCommand (UnknownTask), which would crash
721            // the dispatch loop instead of gracefully dropping the submission.
722            for prereq_id in &dependencies {
723                if self.authority.projection().get_task(prereq_id).is_none() {
724                    return Err(DispatchError::SubmissionRejected {
725                        task_id,
726                        context: format!("prerequisite {prereq_id} not yet in projection"),
727                    });
728                }
729            }
730            let dep_seq = self.next_sequence()?;
731            let _ = self
732                .authority
733                .submit_command(
734                    MutationCommand::DependencyDeclare(DependencyDeclareCommand::new(
735                        dep_seq,
736                        task_id,
737                        dependencies.clone(),
738                        current_time,
739                    )),
740                    DurabilityPolicy::Immediate,
741                )
742                .map_err(DispatchError::Authority)?;
743            // Gate declare is guaranteed to succeed after check_cycle above.
744            let _ = self.dependency_gate.declare(task_id, dependencies);
745        }
746
747        // Register parent-child in the hierarchy tracker (after WAL commit).
748        if let Some(parent_id) = parent_task_id {
749            // Validation already passed above; ignore errors here (should not occur).
750            let _ = self.hierarchy_tracker.register_child(parent_id, task_id);
751        }
752
753        tracing::debug!(task_id = %task_id, "workflow submission committed");
754        Ok(())
755    }
756
757    /// Drains completed worker results from the channel and applies
758    /// state transitions via the WAL mutation authority.
759    fn drain_completed_results(
760        &mut self,
761        result: &mut TickResult,
762        current_time: u64,
763    ) -> Result<(), DispatchError> {
764        // Process any result stashed from the async recv in run_until_idle.
765        if let Some(stashed) = self.pending_result.take() {
766            self.process_worker_result(stashed, result, current_time)?;
767        }
768        while let Ok(worker_result) = self.result_rx.try_recv() {
769            self.process_worker_result(worker_result, result, current_time)?;
770        }
771
772        Ok(())
773    }
774
775    fn process_worker_result(
776        &mut self,
777        worker_result: WorkerResult,
778        result: &mut TickResult,
779        current_time: u64,
780    ) -> Result<(), DispatchError> {
781        let run_id = worker_result.run_id;
782        let attempt_id = worker_result.attempt_id;
783
784        tracing::debug!(%run_id, %attempt_id, "worker result received");
785
786        // Record attempt finish via authority
787        let seq = self.next_sequence()?;
788        let finish_cmd =
789            actionqueue_engine::scheduler::attempt_finish::build_attempt_finish_command(
790                seq,
791                run_id,
792                attempt_id,
793                &worker_result.response,
794                current_time,
795            );
796        let _ = actionqueue_engine::scheduler::attempt_finish::submit_attempt_finish_via_authority(
797            finish_cmd,
798            DurabilityPolicy::Immediate,
799            &mut self.authority,
800        )
801        .map_err(|e| DispatchError::Authority(e.into_source()))?;
802
803        // Compute the effective attempt number for retry cap purposes.
804        // Suspended attempts do not count against max_attempts: they are
805        // budget-driven pauses, not failure-driven retries.
806        let suspended_count = self
807            .authority
808            .projection()
809            .get_attempt_history(&run_id)
810            .map(|history| {
811                history
812                    .iter()
813                    .filter(|a| {
814                        a.result() == Some(actionqueue_core::mutation::AttemptResultKind::Suspended)
815                    })
816                    .count() as u32
817            })
818            .unwrap_or(0);
819        let effective_attempt = worker_result.attempt_number.saturating_sub(suspended_count);
820
821        // Determine target state by delegating to the canonical retry decision
822        // function. This gets us defensive validation (rejects N+1 paths, validates
823        // max_attempts >= 1, validates attempt_number >= 1) for free.
824        let outcome_kind =
825            actionqueue_executor_local::AttemptOutcomeKind::from_response(&worker_result.response);
826        let retry_input = actionqueue_executor_local::RetryDecisionInput {
827            run_id,
828            attempt_id,
829            attempt_number: effective_attempt,
830            max_attempts: worker_result.max_attempts,
831            outcome_kind,
832        };
833        let decision = actionqueue_executor_local::retry::decide_retry_transition(&retry_input)
834            .map_err(DispatchError::RetryDecision)?;
835        let new_state = Some(decision.target_state());
836
837        if let Some(target_state) = new_state {
838            tracing::info!(%run_id, ?target_state, "run state transition applied");
839
840            // Release the lease while the run is still in Running state.
841            if let Some(inf) = self.in_flight.get(&run_id) {
842                let seq = self.next_sequence()?;
843                let _ = self
844                    .authority
845                    .submit_command(
846                        MutationCommand::LeaseRelease(LeaseReleaseCommand::new(
847                            seq,
848                            run_id,
849                            self.identity.identity(),
850                            inf.lease_expiry,
851                            current_time,
852                        )),
853                        DurabilityPolicy::Immediate,
854                    )
855                    .map_err(DispatchError::Authority)?;
856            }
857
858            // Apply the state transition. Suspended uses the dedicated RunSuspend
859            // command (which emits a RunSuspended WAL event); all other transitions
860            // use the generic RunStateTransition command.
861            let seq = self.next_sequence()?;
862            if target_state == RunState::Suspended {
863                let _ = self
864                    .authority
865                    .submit_command(
866                        MutationCommand::RunSuspend(
867                            actionqueue_core::mutation::RunSuspendCommand::new(
868                                seq,
869                                run_id,
870                                None,
871                                current_time,
872                            ),
873                        ),
874                        DurabilityPolicy::Immediate,
875                    )
876                    .map_err(DispatchError::Authority)?;
877            } else {
878                let _ = self
879                    .authority
880                    .submit_command(
881                        MutationCommand::RunStateTransition(RunStateTransitionCommand::new(
882                            seq,
883                            run_id,
884                            RunState::Running,
885                            target_state,
886                            current_time,
887                        )),
888                        DurabilityPolicy::Immediate,
889                    )
890                    .map_err(DispatchError::Authority)?;
891            }
892
893            // Capture task_id before the in_flight borrow for use in the gate notification.
894            let task_id = self.in_flight.get(&run_id).map(|inf| inf.task_id);
895
896            // Release concurrency key for terminal runs, on RetryWait if the hold
897            // policy is ReleaseOnRetry, or on Suspended (same semantics as RetryWait).
898            if let Some(inf) = self.in_flight.get(&run_id) {
899                tracing::debug!(
900                    %run_id,
901                    attempt_id = %inf.attempt_id,
902                    attempt_number = inf.attempt_number,
903                    max_attempts = inf.max_attempts,
904                    ?target_state,
905                    "processing run completion"
906                );
907                Self::try_release_concurrency_key(
908                    &self.authority,
909                    &mut self.key_gate,
910                    run_id,
911                    inf.task_id,
912                    target_state,
913                );
914            }
915
916            // Increment completed counter only for terminal state transitions.
917            // Suspended is not terminal — the run may be resumed later.
918            if target_state.is_terminal() {
919                result.completed += 1;
920                if let Some(tid) = task_id {
921                    // Notify the dependency gate so dependents can be unblocked.
922                    self.notify_dependency_gate_terminal(tid, current_time)?;
923                }
924            }
925
926            // Fire events for subscription matching.
927            #[cfg(feature = "budget")]
928            if let Some(tid) = task_id {
929                self.fire_events_for_transition(tid, target_state)?;
930            }
931        }
932
933        // Record budget consumption reported by the handler, if any.
934        // Budget consumption uses Deferred durability for performance.
935        // If a crash occurs after the run's state transition (Immediate)
936        // but before this write flushes, at most one extra dispatch beyond
937        // the budget cap may occur on recovery. Accepted trade-off.
938        #[cfg(feature = "budget")]
939        {
940            use actionqueue_core::mutation::{BudgetConsumeCommand, MutationCommand as MC};
941            if let Some(inf) = self.in_flight.get(&run_id) {
942                let task_id = inf.task_id;
943                for c in &worker_result.consumption {
944                    let seq = self.next_sequence()?;
945                    let _ = self
946                        .authority
947                        .submit_command(
948                            MC::BudgetConsume(BudgetConsumeCommand::new(
949                                seq,
950                                task_id,
951                                c.dimension,
952                                c.amount,
953                                current_time,
954                            )),
955                            DurabilityPolicy::Deferred,
956                        )
957                        .map_err(DispatchError::Authority)?;
958                    self.budget_tracker.consume(task_id, c.dimension, c.amount);
959                }
960                // Fire budget threshold events after consumption.
961                if !worker_result.consumption.is_empty() {
962                    self.fire_budget_threshold_events(task_id)?;
963                }
964            }
965        }
966
967        // Remove from in-flight tracking
968        self.in_flight.remove(&run_id);
969        Ok(())
970    }
971
    /// Notifies the dependency gate when a task's run reaches a terminal state.
    ///
    /// When a task has completed (all runs terminal + at least one Completed),
    /// marks dependent tasks as eligible. When a task has permanently failed
    /// (all runs terminal, none Completed), marks dependent tasks as failed
    /// and cancels their non-terminal runs.
    ///
    /// Called from `process_worker_result` after a terminal transition is
    /// committed. Also marks the task terminal in the hierarchy tracker and
    /// enqueues it for in-memory GC.
    ///
    /// # Errors
    ///
    /// Propagates sequence-allocation and authority/WAL failures from the
    /// cancellation writes.
    fn notify_dependency_gate_terminal(
        &mut self,
        task_id: actionqueue_core::ids::TaskId,
        current_time: u64,
    ) -> Result<(), DispatchError> {
        // Check if all runs for this task are now terminal (O(R_task) via index).
        let all_runs_terminal =
            self.authority.projection().runs_for_task(task_id).all(|r| r.state().is_terminal());

        if !all_runs_terminal {
            return Ok(()); // Task still has in-flight or scheduled runs.
        }

        // All runs are terminal — mark in the hierarchy tracker for orphan prevention.
        self.hierarchy_tracker.mark_terminal(task_id);

        // Enqueue for GC: clean up in-memory structures once hierarchy cascade is done.
        self.pending_gc_tasks.insert(task_id);

        // Check if the task has at least one Completed run (O(R_task) via index).
        let has_completed = self
            .authority
            .projection()
            .runs_for_task(task_id)
            .any(|r| r.state() == RunState::Completed);

        if has_completed {
            // Task succeeded — notify gate so dependents become eligible.
            // The gate update is in-memory; satisfaction is reconstructed at
            // recovery from the projection (which tasks have Completed runs).
            let newly_eligible = self.dependency_gate.notify_completed(task_id);
            if !newly_eligible.is_empty() {
                tracing::debug!(
                    task_id = %task_id,
                    newly_eligible = newly_eligible.len(),
                    "dependency gate: prerequisite satisfied, dependents now eligible"
                );
            }
        } else {
            // Task permanently failed — cascade failure and cancel blocked runs.
            let newly_blocked = self.dependency_gate.notify_failed(task_id);
            for blocked_id in newly_blocked {
                tracing::debug!(
                    failed_prerequisite = %task_id,
                    blocked_task = %blocked_id,
                    "dependency gate: cascading failure to dependent task"
                );
                // Cancel all non-terminal runs of the permanently blocked task (O(R_task)).
                let runs_to_cancel: Vec<_> = self
                    .authority
                    .projection()
                    .runs_for_task(blocked_id)
                    .filter(|r| !r.state().is_terminal())
                    .map(|r| (r.id(), r.state()))
                    .collect();
                for (run_id, current_state) in runs_to_cancel {
                    let seq = self.next_sequence()?;
                    let _ = self
                        .authority
                        .submit_command(
                            MutationCommand::RunStateTransition(RunStateTransitionCommand::new(
                                seq,
                                run_id,
                                current_state,
                                RunState::Canceled,
                                current_time,
                            )),
                            DurabilityPolicy::Immediate,
                        )
                        .map_err(DispatchError::Authority)?;
                }
                // NOTE(review): runs canceled here never yield worker results,
                // so this method is not re-invoked for `blocked_id`; transitive
                // failure propagation to *its* dependents presumably happens
                // inside `notify_failed` — confirm against the gate impl.
            }
        }

        Ok(())
    }
1054
    /// Cascades cancellation from canceled tasks to their non-terminal descendants.
    ///
    /// Called each tick (step 0c). For each task marked as canceled in the projection
    /// that has non-terminal descendants in the hierarchy tracker, this method:
    /// 1. Marks the canceled ancestor as terminal in the tracker (its runs were
    ///    already canceled by the control API or dependency cascade).
    /// 2. Collects all non-terminal descendants via `collect_cancellation_cascade`.
    /// 3. WAL-appends `TaskCancel` + `RunStateTransition → Canceled` for each
    ///    descendant that is not yet canceled.
    /// 4. Marks each newly-canceled descendant as terminal in the tracker.
    ///
    /// The self-quenching property: once all descendants are terminal, repeated
    /// calls return immediately with no WAL writes.
    ///
    /// # Errors
    ///
    /// Propagates sequence-allocation and authority/WAL failures.
    fn cascade_hierarchy_cancellations(&mut self, current_time: u64) -> Result<(), DispatchError> {
        // Snapshot the pending set: the loop below may insert newly-canceled
        // descendants into `pending_hierarchy_cascade` while iterating.
        let canceled_task_ids: Vec<TaskId> =
            self.pending_hierarchy_cascade.iter().copied().collect();

        // Tasks whose cascade has quenched this tick; removed from the pending
        // set after the loop.
        let mut completed_cascades: Vec<TaskId> = Vec::new();

        for canceled_id in canceled_task_ids {
            // Mark the canceled ancestor as terminal if all its runs are terminal.
            let all_runs_terminal = self
                .authority
                .projection()
                .runs_for_task(canceled_id)
                .all(|r| r.state().is_terminal());
            if all_runs_terminal {
                self.hierarchy_tracker.mark_terminal(canceled_id);
            }

            let cascade = self.hierarchy_tracker.collect_cancellation_cascade(canceled_id);
            if cascade.is_empty() {
                // Quenched: no non-terminal descendants remain.
                completed_cascades.push(canceled_id);
                continue;
            }

            for descendant_id in cascade {
                tracing::debug!(
                    canceled_ancestor = %canceled_id,
                    descendant = %descendant_id,
                    "hierarchy: cascading cancellation to descendant"
                );

                // Cancel the descendant task if not yet canceled (idempotent guard).
                if !self.authority.projection().is_task_canceled(descendant_id) {
                    let seq = self.next_sequence()?;
                    let _ = self
                        .authority
                        .submit_command(
                            MutationCommand::TaskCancel(TaskCancelCommand::new(
                                seq,
                                descendant_id,
                                current_time,
                            )),
                            DurabilityPolicy::Immediate,
                        )
                        .map_err(DispatchError::Authority)?;
                    // Descendant may itself have children — enqueue for cascade.
                    self.pending_hierarchy_cascade.insert(descendant_id);
                }

                // Cancel all non-terminal runs of the descendant (O(R_task) via index).
                let runs_to_cancel: Vec<_> = self
                    .authority
                    .projection()
                    .runs_for_task(descendant_id)
                    .filter(|r| !r.state().is_terminal())
                    .map(|r| (r.id(), r.state()))
                    .collect();

                for (run_id, prev_state) in runs_to_cancel {
                    let seq = self.next_sequence()?;
                    let _ = self
                        .authority
                        .submit_command(
                            MutationCommand::RunStateTransition(RunStateTransitionCommand::new(
                                seq,
                                run_id,
                                prev_state,
                                RunState::Canceled,
                                current_time,
                            )),
                            DurabilityPolicy::Immediate,
                        )
                        .map_err(DispatchError::Authority)?;

                    // Release concurrency key for cascade-canceled runs.
                    // Without this, a Suspended run with HoldDuringRetry policy
                    // would permanently leak its concurrency key slot.
                    Self::try_release_concurrency_key(
                        &self.authority,
                        &mut self.key_gate,
                        run_id,
                        descendant_id,
                        RunState::Canceled,
                    );
                }

                // All descendant runs are now canceled — mark terminal in tracker.
                self.hierarchy_tracker.mark_terminal(descendant_id);
            }
        }

        // Remove completed cascades (tasks whose cascade returned empty).
        for task_id in completed_cascades {
            self.pending_hierarchy_cascade.remove(&task_id);
        }

        Ok(())
    }
1165
1166    /// Garbage-collects fully-terminal tasks from in-memory data structures.
1167    ///
1168    /// Called each tick (step 0c-gc), after hierarchy cascades complete. For each
1169    /// task in `pending_gc_tasks` whose cascade has quenched (no non-terminal
1170    /// descendants), removes it from the DependencyGate, HierarchyTracker,
1171    /// BudgetTracker, SubscriptionRegistry, and CronScheduleCache.
1172    fn gc_terminal_tasks(&mut self) {
1173        let candidates: Vec<TaskId> = self.pending_gc_tasks.iter().copied().collect();
1174
1175        for task_id in candidates {
1176            // Only GC once the hierarchy cascade has fully quenched for this task.
1177            // If collect_cancellation_cascade returns non-empty, some descendants
1178            // are still non-terminal; skip for now.
1179            if !self.hierarchy_tracker.collect_cancellation_cascade(task_id).is_empty() {
1180                continue;
1181            }
1182
1183            self.pending_gc_tasks.remove(&task_id);
1184
1185            // GC the task from all in-memory structures.
1186            self.dependency_gate.gc_task(task_id);
1187            self.hierarchy_tracker.gc_subtree(task_id);
1188
1189            #[cfg(feature = "budget")]
1190            self.budget_tracker.gc_task(task_id);
1191
1192            #[cfg(feature = "budget")]
1193            self.subscription_registry.gc_task(task_id);
1194
1195            #[cfg(feature = "workflow")]
1196            self.cron_schedule_cache.remove(task_id);
1197        }
1198    }
1199
    /// Derives new cron runs to maintain the rolling window.
    ///
    /// Called each tick (step 0d). For each task with [`RunPolicy::Cron`], checks
    /// how many non-terminal runs exist. If below [`actionqueue_engine::derive::cron::CRON_WINDOW_SIZE`],
    /// derives additional runs via [`actionqueue_engine::derive::cron::derive_cron`] and WAL-appends them.
    ///
    /// Respects `CronPolicy::max_occurrences`: stops deriving once the total
    /// run count for the task reaches the configured maximum.
    ///
    /// Self-quenching: tasks with no derivable occurrences remaining are skipped.
    ///
    /// # Errors
    ///
    /// Propagates derivation, sequence-allocation, and authority/WAL failures.
    #[cfg(feature = "workflow")]
    fn derive_cron_runs(&mut self, current_time: u64) -> Result<(), DispatchError> {
        use actionqueue_core::task::run_policy::RunPolicy;
        use actionqueue_engine::derive::cron::{derive_cron_cached, CRON_WINDOW_SIZE};

        // Collect cron task IDs and their policies without holding the projection borrow.
        let cron_tasks: Vec<(TaskId, actionqueue_core::task::run_policy::CronPolicy)> = self
            .authority
            .projection()
            .task_records()
            .filter_map(|tr| {
                if let RunPolicy::Cron(ref policy) = *tr.task_spec().run_policy() {
                    Some((tr.task_spec().id(), policy.clone()))
                } else {
                    None
                }
            })
            .collect();

        for (task_id, policy) in cron_tasks {
            // Skip canceled tasks.
            if self.authority.projection().is_task_canceled(task_id) {
                continue;
            }

            // Clone run instances so we don't hold a borrow from self.authority
            // while calling self.cron_schedule_cache (different field, but method
            // calls take &self and Rust can't always split-borrow through them).
            let all_runs: Vec<RunInstance> =
                self.authority.projection().runs_for_task(task_id).cloned().collect();

            // Saturating conversions: a run count above u32::MAX would only
            // make the caps below trip earlier, never under-count.
            let total_derived = u32::try_from(all_runs.len()).unwrap_or(u32::MAX);
            let non_terminal_count =
                u32::try_from(all_runs.iter().filter(|r| !r.state().is_terminal()).count())
                    .unwrap_or(u32::MAX);

            // Check max_occurrences cap.
            if let Some(max) = policy.max_occurrences() {
                if total_derived >= max {
                    continue; // All allowed occurrences already derived.
                }
            }

            // Top the window up to CRON_WINDOW_SIZE non-terminal runs.
            let to_derive = CRON_WINDOW_SIZE.saturating_sub(non_terminal_count);
            if to_derive == 0 {
                continue;
            }

            // Cap to_derive by remaining max_occurrences budget.
            let to_derive = if let Some(max) = policy.max_occurrences() {
                to_derive.min(max.saturating_sub(total_derived))
            } else {
                to_derive
            };
            if to_derive == 0 {
                continue;
            }

            // Find the latest scheduled_at among all existing runs for this task.
            // New occurrences are derived strictly after this timestamp, preventing
            // duplicate runs for already-scheduled time slots. With no existing
            // runs, `current_time - 1` lets an occurrence at exactly
            // current_time be derived (strictly-after semantics).
            let last_scheduled_at = all_runs
                .iter()
                .map(|r| r.scheduled_at())
                .max()
                .unwrap_or_else(|| current_time.saturating_sub(1));

            // Two-phase cache access to avoid borrow-checker conflicts:
            // Phase 1: ensure schedule is cached (mutable borrow ends after this call).
            self.cron_schedule_cache.ensure(task_id, &policy);
            // Phase 2: immutable borrow of cache during derive only.
            let schedule =
                self.cron_schedule_cache.get(task_id).expect("schedule was just ensured");
            let new_runs =
                derive_cron_cached(task_id, schedule, last_scheduled_at, current_time, to_derive)
                    .map_err(DispatchError::Derivation)?;

            if new_runs.is_empty() {
                continue; // No upcoming occurrences (finite schedule exhausted).
            }

            tracing::debug!(
                %task_id,
                count = new_runs.len(),
                "cron: deriving rolling window runs"
            );

            // WAL-append each derived run with Immediate durability.
            for run in new_runs {
                let seq = self.next_sequence()?;
                let _ = self
                    .authority
                    .submit_command(
                        MutationCommand::RunCreate(RunCreateCommand::new(seq, run)),
                        DurabilityPolicy::Immediate,
                    )
                    .map_err(DispatchError::Authority)?;
            }
        }

        Ok(())
    }
1311
1312    /// Heartbeats in-flight leases approaching expiry.
1313    fn heartbeat_in_flight_leases(&mut self, current_time: u64) -> Result<(), DispatchError> {
1314        // Heartbeat when 1/3 of lease time remains. Integer division truncates;
1315        // the minimum threshold of 1 second prevents a zero-second threshold for
1316        // very short leases (lease_timeout_secs validated >= 3 in RuntimeConfig).
1317        let heartbeat_threshold = (self.lease_timeout_secs / 3).max(1);
1318        let run_ids_needing_heartbeat: Vec<actionqueue_core::ids::RunId> = self
1319            .in_flight
1320            .values()
1321            .filter(|inf| current_time.saturating_add(heartbeat_threshold) >= inf.lease_expiry)
1322            .map(|inf| inf.run_id)
1323            .collect();
1324
1325        for run_id in run_ids_needing_heartbeat {
1326            let new_expiry = current_time.saturating_add(self.lease_timeout_secs);
1327            let (attempt_id, attempt_number, max_attempts) = self
1328                .in_flight
1329                .get(&run_id)
1330                .map(|inf| (inf.attempt_id, inf.attempt_number, inf.max_attempts))
1331                .unwrap_or_default();
1332            tracing::debug!(
1333                %run_id, %attempt_id, attempt_number, max_attempts,
1334                new_expiry, "lease heartbeat extended"
1335            );
1336            let seq = self.next_sequence()?;
1337            let _ = self
1338                .authority
1339                .submit_command(
1340                    MutationCommand::LeaseHeartbeat(LeaseHeartbeatCommand::new(
1341                        seq,
1342                        run_id,
1343                        self.identity.identity(),
1344                        new_expiry,
1345                        current_time,
1346                    )),
1347                    DurabilityPolicy::Immediate,
1348                )
1349                .map_err(DispatchError::Authority)?;
1350
1351            if let Some(inf) = self.in_flight.get_mut(&run_id) {
1352                inf.lease_expiry = new_expiry;
1353            }
1354        }
1355
1356        Ok(())
1357    }
1358
1359    /// Attempts to release the concurrency key for a run entering a terminal or
1360    /// RetryWait state, depending on the task's hold policy.
1361    fn try_release_concurrency_key(
1362        authority: &StorageMutationAuthority<W, ReplayReducer>,
1363        key_gate: &mut KeyGate,
1364        run_id: RunId,
1365        task_id: TaskId,
1366        target_state: RunState,
1367    ) {
1368        let should_release = if target_state.is_terminal() {
1369            true
1370        } else if target_state == RunState::RetryWait || target_state == RunState::Suspended {
1371            // Suspended follows the same hold policy as RetryWait: the run is paused
1372            // and may resume, so whether the key is held depends on the task's policy.
1373            authority
1374                .projection()
1375                .get_task(&task_id)
1376                .map(|task| {
1377                    task.constraints().concurrency_key_hold_policy()
1378                        == ConcurrencyKeyHoldPolicy::ReleaseOnRetry
1379                })
1380                .unwrap_or(false)
1381        } else {
1382            return;
1383        };
1384
1385        if !should_release {
1386            return;
1387        }
1388
1389        let Some(task) = authority.projection().get_task(&task_id) else {
1390            tracing::warn!(%run_id, %task_id, "skipping key release: task not found");
1391            return;
1392        };
1393
1394        let Some(key_str) = task.constraints().concurrency_key() else {
1395            return; // No concurrency key — nothing to release
1396        };
1397
1398        let key = ConcurrencyKey::new(key_str);
1399        match key_gate.release(key, run_id) {
1400            ReleaseResult::Released { .. } => {}
1401            ReleaseResult::NotHeld { key: k, attempting_run_id } => {
1402                tracing::warn!(
1403                    %attempting_run_id, key = %k,
1404                    "concurrency key release failed — key not held by this run"
1405                );
1406            }
1407        }
1408    }
1409
    /// Advances the state machine one step.
    ///
    /// A tick performs (in execution order):
    /// 0a. Drain workflow submissions from handlers (non-blocking)
    /// 0b. Drain completed worker results (non-blocking)
    /// 0e. Signal cancellation to budget-exhausted in-flight runs (budget feature)
    /// 0c. Cascade hierarchy cancellations to descendants of canceled tasks
    /// 0c-gc. GC terminal tasks from in-memory data structures
    /// 0f. Check actor heartbeat timeouts (actor feature)
    /// 0d. Derive new cron runs to maintain the rolling window (workflow feature)
    /// 1. Heartbeat in-flight leases approaching expiry
    /// 2. Check engine paused state — skip if paused
    /// 3. Promote Scheduled → Ready (time-based)
    /// 4. Promote RetryWait → Ready (backoff-based)
    /// 5. Select ready runs (priority-FIFO-RunId)
    /// 6. For each selected: check concurrency key gate → lease → dispatch (spawn worker)
    ///
    /// Every exit path (paused, draining, no ready runs, full tick) performs the
    /// snapshot accounting before returning, so WAL events appended during the
    /// tick always count toward the snapshot threshold.
    pub async fn tick(&mut self) -> Result<TickResult, DispatchError> {
        tracing::trace!("dispatch tick starting");
        let mut result = TickResult::default();
        let current_time = self.clock.now();
        // Captured so each exit path can compute how many WAL events this tick
        // appended (latest_sequence delta) for snapshot-threshold accounting.
        let seq_before_tick = self.authority.projection().latest_sequence();

        // Step 0a: Drain workflow submissions proposed by handlers
        self.drain_submissions(current_time)?;

        // Step 0b: Drain completed worker results
        self.drain_completed_results(&mut result, current_time)?;

        // Step 0e: Signal cancellation to in-flight runs whose budget is exhausted.
        #[cfg(feature = "budget")]
        self.signal_budget_exhaustion_cancellations();

        // Step 0c: Cascade hierarchy cancellations from canceled tasks to descendants
        self.cascade_hierarchy_cancellations(current_time)?;

        // Step 0c-gc: GC terminal tasks from in-memory data structures.
        // Runs after cascade so that descendants' GC is not triggered prematurely.
        self.gc_terminal_tasks();

        // Step 0f: Check actor heartbeat timeouts (actor feature only).
        #[cfg(feature = "actor")]
        self.check_actor_heartbeat_timeouts()?;

        // Step 0d: Derive new cron runs to maintain the rolling window
        // (workflow feature only).
        #[cfg(feature = "workflow")]
        self.derive_cron_runs(current_time)?;

        // Step 1: Heartbeat in-flight leases approaching expiry
        self.heartbeat_in_flight_leases(current_time)?;

        // Step 2: Check engine paused state or draining mode — skip promotion and dispatch.
        // Both early exits still perform the snapshot accounting below.
        if self.authority.projection().is_engine_paused() {
            result.engine_paused = true;
            let events_this_tick =
                self.authority.projection().latest_sequence().saturating_sub(seq_before_tick);
            self.events_since_last_snapshot += events_this_tick;
            self.maybe_write_snapshot(current_time)?;
            return Ok(result);
        }

        if self.draining {
            let events_this_tick =
                self.authority.projection().latest_sequence().saturating_sub(seq_before_tick);
            self.events_since_last_snapshot += events_this_tick;
            self.maybe_write_snapshot(current_time)?;
            return Ok(result);
        }

        // Step 3: Promote Scheduled → Ready (time-based + dependency gate)
        // Runs whose task has unsatisfied dependencies stay in Scheduled
        // until the dependency gate marks them eligible.
        let scheduled_runs: Vec<RunInstance> = self
            .authority
            .projection()
            .run_instances()
            .filter(|r| r.state() == RunState::Scheduled)
            .filter(|r| self.dependency_gate.is_eligible(r.task_id()))
            .cloned()
            .collect();

        if !scheduled_runs.is_empty() {
            let scheduled_index = ScheduledIndex::from_runs(scheduled_runs);
            let promo_result = promote_scheduled_to_ready_via_authority(
                &scheduled_index,
                PromotionParams::new(
                    current_time,
                    self.next_sequence()?,
                    current_time,
                    DurabilityPolicy::Immediate,
                ),
                &mut self.authority,
            )
            .map_err(DispatchError::ScheduledPromotion)?;
            result.promoted_scheduled = promo_result.outcomes().len();
        }

        // Step 3b: Promote Scheduled → Ready for subscription-triggered tasks
        // (bypasses the scheduled_at time check).
        #[cfg(feature = "budget")]
        {
            let promoted = self.promote_subscription_triggered_scheduled(current_time)?;
            result.promoted_scheduled += promoted;
        }

        // Step 4: Promote RetryWait → Ready (backoff-based)
        let retry_waiting: Vec<RunInstance> = self
            .authority
            .projection()
            .run_instances()
            .filter(|r| r.state() == RunState::RetryWait)
            .cloned()
            .collect();

        if !retry_waiting.is_empty() {
            let promo = promote_retry_wait_to_ready(&retry_waiting, current_time, &*self.backoff)
                .map_err(DispatchError::RetryPromotion)?;
            // One RetryWait→Ready transition command per promoted run.
            for run in promo.promoted() {
                let seq = self.next_sequence()?;
                let _ = self
                    .authority
                    .submit_command(
                        MutationCommand::RunStateTransition(RunStateTransitionCommand::new(
                            seq,
                            run.id(),
                            RunState::RetryWait,
                            RunState::Ready,
                            current_time,
                        )),
                        DurabilityPolicy::Immediate,
                    )
                    .map_err(DispatchError::Authority)?;
            }
            result.promoted_retry_wait = promo.promoted().len();
        }

        // Step 5: Select ready runs
        let ready_runs: Vec<RunInstance> = self
            .authority
            .projection()
            .run_instances()
            .filter(|r| r.state() == RunState::Ready)
            .cloned()
            .collect();

        // Nothing to dispatch — still account for events appended by steps 0–4.
        if ready_runs.is_empty() {
            let events_this_tick =
                self.authority.projection().latest_sequence().saturating_sub(seq_before_tick);
            self.events_since_last_snapshot += events_this_tick;
            self.maybe_write_snapshot(current_time)?;
            return Ok(result);
        }

        let ready_index = ReadyIndex::from_runs(ready_runs);
        let inputs = ready_inputs_from_index(&ready_index);
        let selection = select_ready_runs(&inputs);

        // Step 6: For each selected, check concurrency gate → lease → dispatch.
        // Dispatch is capped by the remaining concurrency slots.
        let available_slots = self.max_concurrent.saturating_sub(self.in_flight.len());
        let mut dispatched = 0usize;
        for run in selection.into_selected() {
            if dispatched >= available_slots {
                break;
            }

            // Look up task ONCE, before any state transitions
            let task = match self.authority.projection().get_task(&run.task_id()) {
                Some(t) => t,
                None => {
                    tracing::warn!(
                        run_id = %run.id(),
                        task_id = %run.task_id(),
                        "skipping run: parent task not found in projection"
                    );
                    continue;
                }
            };

            // Cache payload and constraints before state transitions
            let payload = task.payload().to_vec();
            let constraints = task.constraints().clone();

            // Step 6a: Check budget gate — skip if any dimension is exhausted.
            #[cfg(feature = "budget")]
            {
                let gate = actionqueue_budget::BudgetGate::new(&self.budget_tracker);
                if !gate.can_dispatch(run.task_id()) {
                    tracing::debug!(
                        run_id = %run.id(),
                        task_id = %run.task_id(),
                        "skipping run: budget exhausted"
                    );
                    continue;
                }
            }

            // Check concurrency key gate (using cached task). `acquired_key`
            // is Some only when this run now holds the key; an Occupied key
            // skips the run for this tick (it stays Ready).
            let acquired_key = if let Some(key_str) = constraints.concurrency_key() {
                let key = actionqueue_engine::concurrency::key_gate::ConcurrencyKey::new(key_str);
                match self.key_gate.acquire(key.clone(), run.id()) {
                    actionqueue_engine::concurrency::key_gate::AcquireResult::Acquired {
                        ..
                    } => Some(key),
                    actionqueue_engine::concurrency::key_gate::AcquireResult::Occupied {
                        ..
                    } => continue,
                }
            } else {
                None
            };

            // Dispatch the run through WAL commands. If any step fails after
            // key acquisition, release the key before propagating the error
            // to avoid permanently blocking the concurrency key.
            match self.dispatch_single_run(&run, &constraints, current_time) {
                Ok((attempt_id, lease_expiry, attempt_number, max_attempts)) => {
                    // Advisory warning: Transactional tasks with retries may cause
                    // duplicate side effects.
                    if constraints.safety_level() == SafetyLevel::Transactional && max_attempts > 1
                    {
                        tracing::warn!(
                            run_id = %run.id(),
                            max_attempts,
                            "task has Transactional safety level with retries — retries may \
                             cause duplicate side effects"
                        );
                    }

                    // Track in-flight
                    let run_id = run.id();
                    let task_id = run.task_id();

                    // Create cancellation context BEFORE spawn_blocking so
                    // the dispatch loop retains a clone for in-flight
                    // suspension signaling (e.g. budget exhaustion).
                    #[cfg(feature = "budget")]
                    let (cancellation_ctx, cancel_ctx_clone) = {
                        let ctx = actionqueue_executor_local::handler::CancellationContext::new();
                        let clone = Some(ctx.clone());
                        (Some(ctx), clone)
                    };
                    #[cfg(not(feature = "budget"))]
                    let cancellation_ctx: Option<
                        actionqueue_executor_local::handler::CancellationContext,
                    > = None;

                    self.in_flight.insert(
                        run_id,
                        InFlightRun {
                            run_id,
                            attempt_id,
                            task_id,
                            lease_expiry,
                            attempt_number,
                            max_attempts,
                            #[cfg(feature = "budget")]
                            cancellation_context: cancel_ctx_clone,
                        },
                    );

                    // Build workflow context: submission channel and children snapshot.
                    // The submission channel allows handlers to propose child tasks;
                    // the children snapshot gives Coordinator handlers a point-in-time
                    // view of their children's states (both are None for non-workflow tasks).
                    let submission = Some(std::sync::Arc::clone(&self.submission_tx)
                        as std::sync::Arc<dyn actionqueue_executor_local::TaskSubmissionPort>);
                    let children = build_children_snapshot(self.authority.projection(), task_id);

                    // Spawn worker via spawn_blocking with timeout enforcement.
                    tracing::info!(
                        %run_id, %attempt_id, attempt_number,
                        "spawning handler for attempt"
                    );
                    let runner = Arc::clone(&self.runner);
                    let result_tx = self.result_tx.clone();

                    // The worker owns everything it needs (payload, constraints,
                    // cancellation ctx); it reports back ONLY via result_tx and
                    // never touches the WAL.
                    tokio::task::spawn_blocking(move || {
                        let request = ExecutorRequest {
                            run_id,
                            attempt_id,
                            payload,
                            constraints,
                            attempt_number,
                            submission,
                            children,
                            cancellation_context: cancellation_ctx,
                        };
                        let outcome = runner.run_attempt(request);

                        let worker_result = WorkerResult {
                            run_id,
                            attempt_id,
                            response: outcome.response,
                            max_attempts,
                            attempt_number,
                            consumption: outcome.consumption,
                        };

                        if result_tx.send(worker_result).is_err() {
                            tracing::error!(
                                %run_id,
                                "worker result channel closed — dispatch loop may have crashed"
                            );
                        }
                    });

                    dispatched += 1;
                }
                Err(e) => {
                    // Release concurrency key on dispatch failure to avoid
                    // permanently blocking future runs with the same key.
                    if let Some(key) = acquired_key {
                        let _ = self.key_gate.release(key, run.id());
                    }
                    tracing::error!(
                        run_id = %run.id(),
                        error = %e,
                        "dispatch failed for run, skipping"
                    );
                    continue;
                }
            }
        }

        result.dispatched = dispatched;

        // Snapshot check: count events appended during this tick and write
        // a snapshot when the cumulative count exceeds the threshold.
        let events_this_tick =
            self.authority.projection().latest_sequence().saturating_sub(seq_before_tick);
        self.events_since_last_snapshot += events_this_tick;
        self.maybe_write_snapshot(current_time)?;

        Ok(result)
    }
1743
1744    /// Dispatches a single run through the WAL command sequence:
1745    /// Ready→Leased transition, lease acquire, Leased→Running transition, attempt start.
1746    ///
1747    /// Returns (attempt_id, lease_expiry, attempt_number, max_attempts) on success.
1748    fn dispatch_single_run(
1749        &mut self,
1750        run: &RunInstance,
1751        constraints: &actionqueue_core::task::constraints::TaskConstraints,
1752        current_time: u64,
1753    ) -> Result<(AttemptId, u64, u32, u32), DispatchError> {
1754        // Transition Ready → Leased
1755        let seq = self.next_sequence()?;
1756        let _ = self
1757            .authority
1758            .submit_command(
1759                MutationCommand::RunStateTransition(RunStateTransitionCommand::new(
1760                    seq,
1761                    run.id(),
1762                    RunState::Ready,
1763                    RunState::Leased,
1764                    current_time,
1765                )),
1766                DurabilityPolicy::Immediate,
1767            )
1768            .map_err(DispatchError::Authority)?;
1769
1770        // Acquire lease
1771        let lease_expiry = current_time.saturating_add(self.lease_timeout_secs);
1772        let seq = self.next_sequence()?;
1773        let _ = self
1774            .authority
1775            .submit_command(
1776                MutationCommand::LeaseAcquire(LeaseAcquireCommand::new(
1777                    seq,
1778                    run.id(),
1779                    self.identity.identity(),
1780                    lease_expiry,
1781                    current_time,
1782                )),
1783                DurabilityPolicy::Immediate,
1784            )
1785            .map_err(DispatchError::Authority)?;
1786
1787        // Transition Leased → Running
1788        let seq = self.next_sequence()?;
1789        let _ = self
1790            .authority
1791            .submit_command(
1792                MutationCommand::RunStateTransition(RunStateTransitionCommand::new(
1793                    seq,
1794                    run.id(),
1795                    RunState::Leased,
1796                    RunState::Running,
1797                    current_time,
1798                )),
1799                DurabilityPolicy::Immediate,
1800            )
1801            .map_err(DispatchError::Authority)?;
1802
1803        // Record attempt start
1804        let attempt_id = AttemptId::new();
1805        let seq = self.next_sequence()?;
1806        let _ = self
1807            .authority
1808            .submit_command(
1809                MutationCommand::AttemptStart(AttemptStartCommand::new(
1810                    seq,
1811                    run.id(),
1812                    attempt_id,
1813                    current_time,
1814                )),
1815                DurabilityPolicy::Immediate,
1816            )
1817            .map_err(DispatchError::Authority)?;
1818
1819        let max_attempts = constraints.max_attempts();
1820        let attempt_number =
1821            run.attempt_count().checked_add(1).ok_or(DispatchError::SequenceOverflow)?;
1822
1823        Ok((attempt_id, lease_expiry, attempt_number, max_attempts))
1824    }
1825
1826    /// Writes a snapshot if the cumulative event count exceeds the configured threshold.
1827    fn maybe_write_snapshot(&mut self, current_time: u64) -> Result<(), DispatchError> {
1828        let threshold = match self.snapshot_event_threshold {
1829            Some(t) => t,
1830            None => return Ok(()),
1831        };
1832        let path = match &self.snapshot_path {
1833            Some(p) => p.clone(),
1834            None => return Ok(()),
1835        };
1836        if self.events_since_last_snapshot < threshold {
1837            return Ok(());
1838        }
1839
1840        let snapshot = build_snapshot_from_projection(self.authority.projection(), current_time)
1841            .map_err(DispatchError::SnapshotBuild)?;
1842
1843        let mut writer =
1844            SnapshotFsWriter::new(path).map_err(|e| DispatchError::SnapshotInit(format!("{e}")))?;
1845        writer.write(&snapshot).map_err(DispatchError::SnapshotWrite)?;
1846        writer.close().map_err(DispatchError::SnapshotWrite)?;
1847
1848        self.events_since_last_snapshot = 0;
1849        tracing::info!(
1850            wal_sequence = snapshot.metadata.wal_sequence,
1851            task_count = snapshot.metadata.task_count,
1852            run_count = snapshot.metadata.run_count,
1853            "automatic snapshot written"
1854        );
1855
1856        Ok(())
1857    }
1858
1859    /// Cancels all non-terminal runs of tasks whose dependencies have permanently
1860    /// failed. Called once after bootstrap to close the recovery gap where a crash
1861    /// occurred between a prerequisite failing and the cascade cancellation being
1862    /// committed.
1863    fn cancel_dependency_failed_runs(&mut self, current_time: u64) -> Result<(), DispatchError> {
1864        let failed_tasks: Vec<TaskId> = self
1865            .authority
1866            .projection()
1867            .task_records()
1868            .map(|tr| tr.task_spec().id())
1869            .filter(|&tid| self.dependency_gate.is_dependency_failed(tid))
1870            .collect();
1871        for task_id in failed_tasks {
1872            let runs_to_cancel: Vec<_> = self
1873                .authority
1874                .projection()
1875                .runs_for_task(task_id)
1876                .filter(|r| !r.state().is_terminal())
1877                .map(|r| (r.id(), r.state()))
1878                .collect();
1879            for (run_id, current_state) in runs_to_cancel {
1880                let seq = self.next_sequence()?;
1881                let _ = self
1882                    .authority
1883                    .submit_command(
1884                        MutationCommand::RunStateTransition(RunStateTransitionCommand::new(
1885                            seq,
1886                            run_id,
1887                            current_state,
1888                            RunState::Canceled,
1889                            current_time,
1890                        )),
1891                        DurabilityPolicy::Immediate,
1892                    )
1893                    .map_err(DispatchError::Authority)?;
1894            }
1895        }
1896        Ok(())
1897    }
1898
1899    /// Loops `tick()` until no work remains (no in-flight, no promotions, no dispatches).
1900    pub async fn run_until_idle(&mut self) -> Result<RunSummary, DispatchError> {
1901        let mut summary = RunSummary::default();
1902
1903        let current_time = self.clock.now();
1904        self.cancel_dependency_failed_runs(current_time)?;
1905
1906        loop {
1907            let tick = self.tick().await?;
1908            summary.ticks += 1;
1909            summary.total_dispatched += tick.dispatched;
1910            summary.total_completed += tick.completed;
1911
1912            // Idle when no promotions happened, nothing was dispatched,
1913            // and no in-flight work remains
1914            if tick.promoted_scheduled == 0
1915                && tick.promoted_retry_wait == 0
1916                && tick.dispatched == 0
1917                && self.in_flight.is_empty()
1918            {
1919                break;
1920            }
1921
1922            // If there are in-flight tasks but nothing else to do, wait for
1923            // a worker result instead of spinning. The result is stashed in
1924            // pending_result so drain_completed_results() processes it on
1925            // the next tick iteration.
1926            if tick.promoted_scheduled == 0
1927                && tick.promoted_retry_wait == 0
1928                && tick.dispatched == 0
1929                && !self.in_flight.is_empty()
1930            {
1931                if let Some(worker_result) = self.result_rx.recv().await {
1932                    self.pending_result = Some(worker_result);
1933                } else {
1934                    // All worker senders dropped — no more results possible.
1935                    // Clear in_flight to prevent infinite loop.
1936                    tracing::warn!(
1937                        orphaned_runs = self.in_flight.len(),
1938                        "worker result channel closed with in-flight runs"
1939                    );
1940                    self.in_flight.clear();
1941                    break;
1942                }
1943            } else if tick.dispatched > 0 {
1944                // Yield to the runtime when work was dispatched, preventing
1945                // CPU spinning when the dispatch loop is actively processing.
1946                tokio::task::yield_now().await;
1947            }
1948        }
1949
1950        Ok(summary)
1951    }
1952
    /// Begins graceful drain: stops promoting and dispatching new work,
    /// but continues processing in-flight results and heartbeating leases.
    ///
    /// Idempotent — only sets the `draining` flag, which `tick()` checks after
    /// result draining and lease heartbeating but before promotion/dispatch.
    pub fn start_drain(&mut self) {
        self.draining = true;
    }
1958
1959    /// Drains in-flight work until idle or the timeout expires.
1960    pub async fn drain_until_idle(
1961        &mut self,
1962        timeout: std::time::Duration,
1963    ) -> Result<RunSummary, DispatchError> {
1964        self.start_drain();
1965        let deadline = tokio::time::Instant::now() + timeout;
1966        let mut summary = RunSummary::default();
1967
1968        loop {
1969            if self.in_flight.is_empty() {
1970                break;
1971            }
1972
1973            if tokio::time::Instant::now() >= deadline {
1974                tracing::warn!(
1975                    remaining_in_flight = self.in_flight.len(),
1976                    "drain timeout expired with in-flight runs"
1977                );
1978                break;
1979            }
1980
1981            let tick = self.tick().await?;
1982            summary.ticks += 1;
1983            summary.total_completed += tick.completed;
1984
1985            if tick.completed == 0 && !self.in_flight.is_empty() {
1986                // Wait for a result with the remaining timeout.
1987                let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
1988                match tokio::time::timeout(remaining, self.result_rx.recv()).await {
1989                    Ok(Some(result)) => {
1990                        self.pending_result = Some(result);
1991                    }
1992                    Ok(None) => {
1993                        self.in_flight.clear();
1994                        break;
1995                    }
1996                    Err(_) => {
1997                        tracing::warn!(
1998                            remaining_in_flight = self.in_flight.len(),
1999                            "drain timeout expired waiting for worker results"
2000                        );
2001                        break;
2002                    }
2003                }
2004            }
2005        }
2006
2007        Ok(summary)
2008    }
2009
2010    /// Submits a new task and derives initial runs.
2011    pub fn submit_task(&mut self, spec: TaskSpec) -> Result<(), DispatchError> {
2012        let current_time = self.clock.now();
2013        let seq = self.next_sequence()?;
2014
2015        // Create task
2016        let _ = self
2017            .authority
2018            .submit_command(
2019                MutationCommand::TaskCreate(TaskCreateCommand::new(
2020                    seq,
2021                    spec.clone(),
2022                    current_time,
2023                )),
2024                DurabilityPolicy::Immediate,
2025            )
2026            .map_err(DispatchError::Authority)?;
2027
2028        // Derive runs
2029        let already_derived = self.authority.projection().run_ids_for_task(spec.id()).len() as u32;
2030        let derivation = actionqueue_engine::derive::derive_runs(
2031            &self.clock,
2032            spec.id(),
2033            spec.run_policy(),
2034            already_derived,
2035            current_time,
2036        )
2037        .map_err(DispatchError::Derivation)?;
2038
2039        for run in derivation.into_derived() {
2040            let seq = self.next_sequence()?;
2041            let _ = self
2042                .authority
2043                .submit_command(
2044                    MutationCommand::RunCreate(RunCreateCommand::new(seq, run)),
2045                    DurabilityPolicy::Immediate,
2046                )
2047                .map_err(DispatchError::Authority)?;
2048        }
2049
2050        Ok(())
2051    }
2052
    /// Declares a DAG dependency: `task_id` may not promote until all `prereqs` complete.
    ///
    /// Validates cycle-freedom and prerequisite existence before WAL-appending.
    /// After appending, the in-memory gate is updated and prerequisites that
    /// already ran to completion are re-marked from the projection.
    ///
    /// # Errors
    ///
    /// Returns `DispatchError::DependencyCycle` when the declaration would
    /// create a cycle, and `DispatchError::SubmissionRejected` when any
    /// prerequisite task is absent from the projection.
    pub fn declare_dependency(
        &mut self,
        task_id: TaskId,
        prereqs: Vec<TaskId>,
    ) -> Result<(), DispatchError> {
        // Cycle check.
        self.dependency_gate
            .check_cycle(task_id, &prereqs)
            .map_err(DispatchError::DependencyCycle)?;

        // All prerequisites must be known.
        for prereq_id in &prereqs {
            if self.authority.projection().get_task(prereq_id).is_none() {
                return Err(DispatchError::SubmissionRejected {
                    task_id,
                    context: format!("prerequisite {prereq_id} not in projection"),
                });
            }
        }

        // WAL-append the declaration before updating any in-memory state.
        let seq = self.next_sequence()?;
        let current_time = self.clock.now();
        let _ = self
            .authority
            .submit_command(
                MutationCommand::DependencyDeclare(DependencyDeclareCommand::new(
                    seq,
                    task_id,
                    prereqs.clone(),
                    current_time,
                )),
                DurabilityPolicy::Immediate,
            )
            .map_err(DispatchError::Authority)?;

        // Update in-memory gate (safe since cycle was already checked).
        let _ = self.dependency_gate.declare(task_id, prereqs.clone());

        // Catch up prerequisites that already completed (e.g., when declaring
        // a dependency AFTER the prerequisite ran). GC may have removed the
        // completed task from `satisfied`; re-check the projection to restore it.
        for prereq_id in &prereqs {
            let runs: Vec<_> = self.authority.projection().runs_for_task(*prereq_id).collect();
            if !runs.is_empty() {
                let all_terminal = runs.iter().all(|r| r.state().is_terminal());
                let has_completed = runs.iter().any(|r| r.state() == RunState::Completed);
                if all_terminal && has_completed {
                    // All runs finished and at least one completed:
                    // the prerequisite counts as satisfied.
                    self.dependency_gate.force_satisfy(*prereq_id);
                } else if all_terminal && !has_completed {
                    // All runs terminal but none completed:
                    // the prerequisite permanently failed.
                    self.dependency_gate.force_fail(*prereq_id);
                }
                // Runs still in flight: leave the gate state untouched; the
                // normal completion path will update it.
            }
        }
        // Re-evaluate satisfaction for task_id after prereq state is restored.
        self.dependency_gate.recompute_satisfaction_pub(task_id);

        Ok(())
    }
2114
2115    /// Allocates a budget for a task/dimension pair and WAL-appends the event.
2116    ///
2117    /// Called by external callers (e.g. acceptance tests or the Caelum runtime)
2118    /// to establish the consumption ceiling before the task begins executing.
2119    #[cfg(feature = "budget")]
2120    pub fn allocate_budget(
2121        &mut self,
2122        task_id: TaskId,
2123        dimension: actionqueue_core::budget::BudgetDimension,
2124        limit: u64,
2125    ) -> Result<(), DispatchError> {
2126        use actionqueue_core::mutation::{BudgetAllocateCommand, MutationCommand as MC};
2127        let current_time = self.clock.now();
2128        let seq = self.next_sequence()?;
2129        let _ = self
2130            .authority
2131            .submit_command(
2132                MC::BudgetAllocate(BudgetAllocateCommand::new(
2133                    seq,
2134                    task_id,
2135                    dimension,
2136                    limit,
2137                    current_time,
2138                )),
2139                DurabilityPolicy::Immediate,
2140            )
2141            .map_err(DispatchError::Authority)?;
2142        self.budget_tracker.allocate(task_id, dimension, limit);
2143        Ok(())
2144    }
2145
2146    /// Replenishes an exhausted budget and WAL-appends the event.
2147    ///
2148    /// After replenishment the budget gate will allow the blocked task to be
2149    /// dispatched again on the next tick.
2150    #[cfg(feature = "budget")]
2151    pub fn replenish_budget(
2152        &mut self,
2153        task_id: TaskId,
2154        dimension: actionqueue_core::budget::BudgetDimension,
2155        new_limit: u64,
2156    ) -> Result<(), DispatchError> {
2157        use actionqueue_core::mutation::{BudgetReplenishCommand, MutationCommand as MC};
2158        let current_time = self.clock.now();
2159        let seq = self.next_sequence()?;
2160        let _ = self
2161            .authority
2162            .submit_command(
2163                MC::BudgetReplenish(BudgetReplenishCommand::new(
2164                    seq,
2165                    task_id,
2166                    dimension,
2167                    new_limit,
2168                    current_time,
2169                )),
2170                DurabilityPolicy::Immediate,
2171            )
2172            .map_err(DispatchError::Authority)?;
2173        self.budget_tracker.replenish(task_id, dimension, new_limit);
2174        Ok(())
2175    }
2176
2177    /// Resumes a suspended run by WAL-appending a `RunResume` command.
2178    ///
2179    /// Transitions the run from `Suspended → Ready` so it will be dispatched
2180    /// on the next tick. Returns an error if the run is not currently suspended.
2181    #[cfg(feature = "budget")]
2182    pub fn resume_run(&mut self, run_id: RunId) -> Result<(), DispatchError> {
2183        use actionqueue_core::mutation::{MutationCommand as MC, RunResumeCommand};
2184        let current_time = self.clock.now();
2185        let seq = self.next_sequence()?;
2186        let _ = self
2187            .authority
2188            .submit_command(
2189                MC::RunResume(RunResumeCommand::new(seq, run_id, current_time)),
2190                DurabilityPolicy::Immediate,
2191            )
2192            .map_err(DispatchError::Authority)?;
2193        Ok(())
2194    }
2195
2196    /// Signals cancellation to in-flight runs whose budget is now exhausted.
2197    ///
2198    /// Called each tick after draining worker results and recording consumption.
2199    /// The handler observes the cancellation via its `CancellationContext` and
2200    /// cooperatively returns `Suspended`.
2201    #[cfg(feature = "budget")]
2202    fn signal_budget_exhaustion_cancellations(&self) {
2203        for inf in self.in_flight.values() {
2204            if self.budget_tracker.is_any_exhausted(inf.task_id) {
2205                if let Some(ref ctx) = inf.cancellation_context {
2206                    ctx.cancel();
2207                    tracing::debug!(
2208                        run_id = %inf.run_id,
2209                        task_id = %inf.task_id,
2210                        "budget exhausted: signaling cancellation to handler"
2211                    );
2212                }
2213            }
2214        }
2215    }
2216
2217    /// Promotes Scheduled runs whose task has a triggered subscription.
2218    ///
2219    /// Bypasses the `scheduled_at` time check. Returns the number of runs promoted.
2220    /// After promotion the one-shot trigger is cleared.
2221    #[cfg(feature = "budget")]
2222    fn promote_subscription_triggered_scheduled(
2223        &mut self,
2224        current_time: u64,
2225    ) -> Result<usize, DispatchError> {
2226        let triggered: Vec<RunInstance> = self
2227            .authority
2228            .projection()
2229            .run_instances()
2230            .filter(|r| r.state() == RunState::Scheduled)
2231            .filter(|r| self.subscription_registry.is_triggered(r.task_id()))
2232            .filter(|r| self.dependency_gate.is_eligible(r.task_id()))
2233            .cloned()
2234            .collect();
2235
2236        let count = triggered.len();
2237        for run in &triggered {
2238            let seq = self.next_sequence()?;
2239            let _ = self
2240                .authority
2241                .submit_command(
2242                    MutationCommand::RunStateTransition(RunStateTransitionCommand::new(
2243                        seq,
2244                        run.id(),
2245                        RunState::Scheduled,
2246                        RunState::Ready,
2247                        current_time,
2248                    )),
2249                    DurabilityPolicy::Immediate,
2250                )
2251                .map_err(DispatchError::Authority)?;
2252            self.subscription_registry.clear_triggered(run.task_id());
2253        }
2254        Ok(count)
2255    }
2256
2257    /// Creates a new event subscription and WAL-appends it.
2258    ///
2259    /// The subscription is registered in the in-memory registry and will be
2260    /// matched against events fired after each tick's worker result processing.
2261    #[cfg(feature = "budget")]
2262    pub fn create_subscription(
2263        &mut self,
2264        task_id: TaskId,
2265        filter: actionqueue_core::subscription::EventFilter,
2266    ) -> Result<actionqueue_core::subscription::SubscriptionId, DispatchError> {
2267        use actionqueue_core::mutation::SubscriptionCreateCommand;
2268        let sub_id = actionqueue_core::subscription::SubscriptionId::new();
2269        let current_time = self.clock.now();
2270        let seq = self.next_sequence()?;
2271        let _ = self
2272            .authority
2273            .submit_command(
2274                MutationCommand::SubscriptionCreate(SubscriptionCreateCommand::new(
2275                    seq,
2276                    sub_id,
2277                    task_id,
2278                    filter.clone(),
2279                    current_time,
2280                )),
2281                DurabilityPolicy::Immediate,
2282            )
2283            .map_err(DispatchError::Authority)?;
2284        self.subscription_registry.register(sub_id, task_id, filter);
2285        Ok(sub_id)
2286    }
2287
2288    /// Fires a custom event and matches it against active subscriptions.
2289    ///
2290    /// Any subscriptions with a `Custom { key }` filter matching the event
2291    /// key are triggered. Triggered subscriptions cause their associated
2292    /// task's Scheduled runs to be promoted on the next tick.
2293    #[cfg(feature = "budget")]
2294    pub fn fire_custom_event(&mut self, key: String) -> Result<(), DispatchError> {
2295        let event = actionqueue_budget::ActionQueueEvent::CustomEvent { key };
2296        let matched = actionqueue_budget::check_event(&event, &self.subscription_registry);
2297        for sub_id in matched {
2298            self.trigger_subscription_durable(sub_id)?;
2299        }
2300        Ok(())
2301    }
2302
2303    /// Fires events for state transitions and matches against subscriptions.
2304    ///
2305    /// Called from `process_worker_result` after a terminal state transition.
2306    /// Fires `RunChangedState` for every transition, and `TaskReachedTerminalSuccess`
2307    /// when all runs of a task are terminal with at least one Completed.
2308    #[cfg(feature = "budget")]
2309    fn fire_events_for_transition(
2310        &mut self,
2311        task_id: TaskId,
2312        new_state: RunState,
2313    ) -> Result<(), DispatchError> {
2314        use actionqueue_budget::{check_event, ActionQueueEvent};
2315
2316        // Fire RunChangedState event.
2317        let event = ActionQueueEvent::RunChangedState { task_id, new_state };
2318        let matched = check_event(&event, &self.subscription_registry);
2319        for sub_id in matched {
2320            self.trigger_subscription_durable(sub_id)?;
2321        }
2322
2323        // If terminal: check if task reached terminal success.
2324        if new_state.is_terminal() {
2325            self.fire_task_terminal_success_event(task_id)?;
2326        }
2327        Ok(())
2328    }
2329
2330    /// Fires `TaskReachedTerminalSuccess` if all runs of the task are terminal
2331    /// and at least one is Completed.
2332    #[cfg(feature = "budget")]
2333    fn fire_task_terminal_success_event(&mut self, task_id: TaskId) -> Result<(), DispatchError> {
2334        use actionqueue_budget::{check_event, ActionQueueEvent};
2335
2336        let all_terminal =
2337            self.authority.projection().runs_for_task(task_id).all(|r| r.state().is_terminal());
2338        if !all_terminal {
2339            return Ok(());
2340        }
2341        let any_completed = self
2342            .authority
2343            .projection()
2344            .runs_for_task(task_id)
2345            .any(|r| r.state() == RunState::Completed);
2346        if !any_completed {
2347            return Ok(());
2348        }
2349        let event = ActionQueueEvent::TaskReachedTerminalSuccess { task_id };
2350        let matched = check_event(&event, &self.subscription_registry);
2351        for sub_id in matched {
2352            self.trigger_subscription_durable(sub_id)?;
2353        }
2354        Ok(())
2355    }
2356
2357    /// Fires budget threshold events after consumption is recorded.
2358    ///
2359    /// For each (task, dimension) that has a budget allocated, checks if
2360    /// the consumption percentage crossed any subscribed threshold.
2361    #[cfg(feature = "budget")]
2362    fn fire_budget_threshold_events(&mut self, task_id: TaskId) -> Result<(), DispatchError> {
2363        use actionqueue_budget::{check_event, ActionQueueEvent};
2364        use actionqueue_core::budget::BudgetDimension;
2365
2366        for &dim in &[BudgetDimension::Token, BudgetDimension::CostCents, BudgetDimension::TimeSecs]
2367        {
2368            if let Some(pct) = self.budget_tracker.threshold_pct(task_id, dim) {
2369                let event =
2370                    ActionQueueEvent::BudgetThresholdCrossed { task_id, dimension: dim, pct };
2371                let matched = check_event(&event, &self.subscription_registry);
2372                for sub_id in matched {
2373                    self.trigger_subscription_durable(sub_id)?;
2374                }
2375            }
2376        }
2377        Ok(())
2378    }
2379
2380    /// WAL-appends a `SubscriptionTriggered` event and triggers the
2381    /// subscription in-memory. This ensures subscription triggers survive
2382    /// crash recovery.
2383    #[cfg(feature = "budget")]
2384    fn trigger_subscription_durable(
2385        &mut self,
2386        subscription_id: actionqueue_core::subscription::SubscriptionId,
2387    ) -> Result<(), DispatchError> {
2388        use actionqueue_core::mutation::SubscriptionTriggerCommand;
2389        let current_time = self.clock.now();
2390        let seq = self.next_sequence()?;
2391        let _ = self
2392            .authority
2393            .submit_command(
2394                MutationCommand::SubscriptionTrigger(SubscriptionTriggerCommand::new(
2395                    seq,
2396                    subscription_id,
2397                    current_time,
2398                )),
2399                DurabilityPolicy::Immediate,
2400            )
2401            .map_err(DispatchError::Authority)?;
2402        self.subscription_registry.trigger(subscription_id);
2403        Ok(())
2404    }
2405
2406    // ── Actor feature methods ──────────────────────────────────────────────
2407
2408    /// Registers a remote actor with the hub.
2409    #[cfg(feature = "actor")]
2410    pub fn register_actor(
2411        &mut self,
2412        registration: actionqueue_core::actor::ActorRegistration,
2413    ) -> Result<(), DispatchError> {
2414        use actionqueue_core::mutation::{ActorRegisterCommand, MutationCommand};
2415
2416        let actor_id = registration.actor_id();
2417        let policy = actionqueue_core::actor::HeartbeatPolicy::with_default_multiplier(
2418            registration.heartbeat_interval_secs(),
2419        );
2420        let seq = self.next_sequence()?;
2421        let ts = self.clock.now();
2422        let _ = self
2423            .authority
2424            .submit_command(
2425                MutationCommand::ActorRegister(ActorRegisterCommand::new(
2426                    seq,
2427                    registration.clone(),
2428                    ts,
2429                )),
2430                DurabilityPolicy::Immediate,
2431            )
2432            .map_err(DispatchError::Authority)?;
2433
2434        self.actor_registry.register(registration);
2435        self.heartbeat_monitor.record_registration(actor_id, policy, ts);
2436        Ok(())
2437    }
2438
2439    /// Deregisters a remote actor from the hub.
2440    #[cfg(feature = "actor")]
2441    pub fn deregister_actor(
2442        &mut self,
2443        actor_id: actionqueue_core::ids::ActorId,
2444    ) -> Result<(), DispatchError> {
2445        use actionqueue_core::mutation::{ActorDeregisterCommand, MutationCommand};
2446
2447        let seq = self.next_sequence()?;
2448        let ts = self.clock.now();
2449        let _ = self
2450            .authority
2451            .submit_command(
2452                MutationCommand::ActorDeregister(ActorDeregisterCommand::new(seq, actor_id, ts)),
2453                DurabilityPolicy::Immediate,
2454            )
2455            .map_err(DispatchError::Authority)?;
2456
2457        self.actor_registry.deregister(actor_id);
2458        self.heartbeat_monitor.remove(actor_id);
2459        self.department_registry.remove(actor_id);
2460        Ok(())
2461    }
2462
2463    /// Records an actor heartbeat.
2464    #[cfg(feature = "actor")]
2465    pub fn actor_heartbeat(
2466        &mut self,
2467        actor_id: actionqueue_core::ids::ActorId,
2468    ) -> Result<(), DispatchError> {
2469        use actionqueue_core::mutation::{ActorHeartbeatCommand, MutationCommand};
2470
2471        let seq = self.next_sequence()?;
2472        let ts = self.clock.now();
2473        let _ = self
2474            .authority
2475            .submit_command(
2476                MutationCommand::ActorHeartbeat(ActorHeartbeatCommand::new(seq, actor_id, ts)),
2477                DurabilityPolicy::Immediate,
2478            )
2479            .map_err(DispatchError::Authority)?;
2480
2481        self.heartbeat_monitor.record_heartbeat(actor_id, ts);
2482        Ok(())
2483    }
2484
2485    /// Detects and handles actors whose heartbeat has timed out.
2486    ///
2487    /// Called each tick (step 0f). For each timed-out actor, deregisters
2488    /// them from all registries and releases their leases.
2489    #[cfg(feature = "actor")]
2490    fn check_actor_heartbeat_timeouts(&mut self) -> Result<(), DispatchError> {
2491        let now = self.clock.now();
2492        let timed_out = self.heartbeat_monitor.check_timeouts(now);
2493        for actor_id in timed_out {
2494            tracing::warn!(%actor_id, "actor heartbeat timeout — deregistering");
2495            self.deregister_actor(actor_id)?;
2496        }
2497        Ok(())
2498    }
2499
2500    // ── Platform feature methods ───────────────────────────────────────────
2501
2502    /// Creates a new organizational tenant.
2503    #[cfg(feature = "platform")]
2504    pub fn create_tenant(
2505        &mut self,
2506        registration: actionqueue_core::platform::TenantRegistration,
2507    ) -> Result<(), DispatchError> {
2508        use actionqueue_core::mutation::{MutationCommand, TenantCreateCommand};
2509
2510        let seq = self.next_sequence()?;
2511        let ts = self.clock.now();
2512        let _ = self
2513            .authority
2514            .submit_command(
2515                MutationCommand::TenantCreate(TenantCreateCommand::new(
2516                    seq,
2517                    registration.clone(),
2518                    ts,
2519                )),
2520                DurabilityPolicy::Immediate,
2521            )
2522            .map_err(DispatchError::Authority)?;
2523
2524        self.tenant_registry.register(registration);
2525        Ok(())
2526    }
2527
2528    /// Assigns a role to an actor within a tenant.
2529    #[cfg(feature = "platform")]
2530    pub fn assign_role(
2531        &mut self,
2532        actor_id: actionqueue_core::ids::ActorId,
2533        role: actionqueue_core::platform::Role,
2534        tenant_id: actionqueue_core::ids::TenantId,
2535    ) -> Result<(), DispatchError> {
2536        use actionqueue_core::mutation::{MutationCommand, RoleAssignCommand};
2537
2538        let seq = self.next_sequence()?;
2539        let ts = self.clock.now();
2540        let _ = self
2541            .authority
2542            .submit_command(
2543                MutationCommand::RoleAssign(RoleAssignCommand::new(
2544                    seq,
2545                    actor_id,
2546                    role.clone(),
2547                    tenant_id,
2548                    ts,
2549                )),
2550                DurabilityPolicy::Immediate,
2551            )
2552            .map_err(DispatchError::Authority)?;
2553
2554        self.rbac_enforcer.assign_role(actor_id, role, tenant_id);
2555        Ok(())
2556    }
2557
2558    /// Grants a capability to an actor within a tenant.
2559    #[cfg(feature = "platform")]
2560    pub fn grant_capability(
2561        &mut self,
2562        actor_id: actionqueue_core::ids::ActorId,
2563        capability: actionqueue_core::platform::Capability,
2564        tenant_id: actionqueue_core::ids::TenantId,
2565    ) -> Result<(), DispatchError> {
2566        use actionqueue_core::mutation::{CapabilityGrantCommand, MutationCommand};
2567
2568        let seq = self.next_sequence()?;
2569        let ts = self.clock.now();
2570        let _ = self
2571            .authority
2572            .submit_command(
2573                MutationCommand::CapabilityGrant(CapabilityGrantCommand::new(
2574                    seq,
2575                    actor_id,
2576                    capability.clone(),
2577                    tenant_id,
2578                    ts,
2579                )),
2580                DurabilityPolicy::Immediate,
2581            )
2582            .map_err(DispatchError::Authority)?;
2583
2584        self.rbac_enforcer.grant_capability(actor_id, capability, tenant_id);
2585        Ok(())
2586    }
2587
2588    /// Revokes a capability from an actor within a tenant.
2589    #[cfg(feature = "platform")]
2590    pub fn revoke_capability(
2591        &mut self,
2592        actor_id: actionqueue_core::ids::ActorId,
2593        capability: actionqueue_core::platform::Capability,
2594        tenant_id: actionqueue_core::ids::TenantId,
2595    ) -> Result<(), DispatchError> {
2596        use actionqueue_core::mutation::{CapabilityRevokeCommand, MutationCommand};
2597
2598        let seq = self.next_sequence()?;
2599        let ts = self.clock.now();
2600        let _ = self
2601            .authority
2602            .submit_command(
2603                MutationCommand::CapabilityRevoke(CapabilityRevokeCommand::new(
2604                    seq,
2605                    actor_id,
2606                    capability.clone(),
2607                    tenant_id,
2608                    ts,
2609                )),
2610                DurabilityPolicy::Immediate,
2611            )
2612            .map_err(DispatchError::Authority)?;
2613
2614        self.rbac_enforcer.revoke_capability(actor_id, &capability, tenant_id);
2615        Ok(())
2616    }
2617
2618    /// Appends an entry to the organizational ledger.
2619    #[cfg(feature = "platform")]
2620    pub fn append_ledger_entry(
2621        &mut self,
2622        entry: actionqueue_core::platform::LedgerEntry,
2623    ) -> Result<(), DispatchError> {
2624        use actionqueue_core::mutation::{LedgerAppendCommand, MutationCommand};
2625
2626        let seq = self.next_sequence()?;
2627        let ts = self.clock.now();
2628        let _ = self
2629            .authority
2630            .submit_command(
2631                MutationCommand::LedgerAppend(LedgerAppendCommand::new(seq, entry.clone(), ts)),
2632                DurabilityPolicy::Immediate,
2633            )
2634            .map_err(DispatchError::Authority)?;
2635
2636        self.ledger.append(entry);
2637        Ok(())
2638    }
2639
    /// Returns the append ledger.
    ///
    /// Read-only accessor; mutation goes through `append_ledger_entry`.
    #[cfg(feature = "platform")]
    pub fn ledger(&self) -> &actionqueue_platform::AppendLedger {
        &self.ledger
    }
2645
    /// Returns the RBAC enforcer.
    ///
    /// Read-only accessor; role/capability mutations go through
    /// `assign_role`, `grant_capability`, and `revoke_capability`.
    #[cfg(feature = "platform")]
    pub fn rbac(&self) -> &actionqueue_platform::RbacEnforcer {
        &self.rbac_enforcer
    }
2651
    /// Returns the actor registry.
    ///
    /// Read-only accessor; registration changes go through `register_actor`
    /// and `deregister_actor`.
    #[cfg(feature = "actor")]
    pub fn actor_registry(&self) -> &actionqueue_actor::ActorRegistry {
        &self.actor_registry
    }
2657
    /// Returns the tenant registry.
    ///
    /// Read-only accessor; tenants are added via `create_tenant`.
    #[cfg(feature = "platform")]
    pub fn tenant_registry(&self) -> &actionqueue_platform::TenantRegistry {
        &self.tenant_registry
    }
2663
    /// Consumes the dispatch loop and returns the mutation authority.
    ///
    /// Intended for shutdown/teardown: hands exclusive WAL mutation authority
    /// back to the caller; all other dispatcher state is dropped.
    pub fn into_authority(self) -> StorageMutationAuthority<W, ReplayReducer> {
        self.authority
    }
2668}