1use std::collections::HashMap;
11use std::path::PathBuf;
12use std::sync::Arc;
13
14use actionqueue_core::ids::{AttemptId, RunId, TaskId};
15use actionqueue_core::mutation::{
16 AttemptStartCommand, DependencyDeclareCommand, DurabilityPolicy, LeaseAcquireCommand,
17 LeaseHeartbeatCommand, LeaseReleaseCommand, MutationAuthority, MutationCommand,
18 RunCreateCommand, RunStateTransitionCommand, TaskCancelCommand, TaskCreateCommand,
19};
20use actionqueue_core::run::run_instance::{RunInstance, RunInstanceError};
21use actionqueue_core::run::state::RunState;
22use actionqueue_core::task::constraints::ConcurrencyKeyHoldPolicy;
23use actionqueue_core::task::safety::SafetyLevel;
24use actionqueue_core::task::task_spec::TaskSpec;
25use actionqueue_engine::concurrency::key_gate::{ConcurrencyKey, KeyGate, ReleaseResult};
26use actionqueue_engine::derive::DerivationError;
27use actionqueue_engine::index::ready::ReadyIndex;
28use actionqueue_engine::index::scheduled::ScheduledIndex;
29use actionqueue_engine::scheduler::promotion::{
30 promote_scheduled_to_ready_via_authority, AuthorityPromotionError, PromotionParams,
31};
32use actionqueue_engine::scheduler::retry_promotion::promote_retry_wait_to_ready;
33use actionqueue_engine::selection::default_selector::{ready_inputs_from_index, select_ready_runs};
34use actionqueue_engine::time::clock::Clock;
35use actionqueue_executor_local::backoff::BackoffStrategy;
36use actionqueue_executor_local::handler::ExecutorHandler;
37use actionqueue_executor_local::identity::{ExecutorIdentity, LocalExecutorIdentity};
38use actionqueue_executor_local::types::ExecutorRequest;
39use actionqueue_executor_local::{AttemptRunner, SystemAttemptTimer};
40use actionqueue_storage::mutation::authority::{MutationAuthorityError, StorageMutationAuthority};
41use actionqueue_storage::recovery::reducer::{ReplayReducer, ReplayReducerError};
42use actionqueue_storage::snapshot::build::build_snapshot_from_projection;
43use actionqueue_storage::snapshot::mapping::SnapshotMappingError;
44use actionqueue_storage::snapshot::writer::{
45 SnapshotFsWriter, SnapshotWriter, SnapshotWriterError,
46};
47use actionqueue_storage::wal::writer::WalWriter;
48use actionqueue_workflow::children::build_children_snapshot;
49use actionqueue_workflow::dag::DependencyGate;
50use actionqueue_workflow::hierarchy::HierarchyTracker;
51use actionqueue_workflow::submission::{submission_channel, SubmissionChannel, SubmissionReceiver};
52use tokio::sync::mpsc;
53
54use crate::config::BackoffStrategyConfig;
55use crate::worker::{InFlightRun, WorkerResult};
56
57fn build_dependency_gate(projection: &ReplayReducer) -> DependencyGate {
64 use actionqueue_core::run::state::RunState;
65
66 let mut gate = DependencyGate::new();
67
68 let dep_map: std::collections::HashMap<_, _> = projection.dependency_declarations().collect();
70
71 for (&task_id, prereqs) in &dep_map {
73 if let Err(err) = gate.declare(task_id, prereqs.iter().copied().collect()) {
75 tracing::warn!(%task_id, error = %err, "dependency gate declare failed at bootstrap");
76 }
77 }
78
79 let all_prereqs: std::collections::HashSet<_> =
82 dep_map.values().flat_map(|prereqs| prereqs.iter().copied()).collect();
83
84 for task_id in all_prereqs {
85 let runs: Vec<_> = projection.runs_for_task(task_id).collect();
86 if runs.is_empty() {
87 continue;
88 }
89 let all_terminal = runs.iter().all(|r| r.state().is_terminal());
90 if !all_terminal {
91 continue;
92 }
93 let has_completed = runs.iter().any(|r| r.state() == RunState::Completed);
94 if has_completed {
95 gate.force_satisfy(task_id);
96 } else {
97 gate.force_fail(task_id);
98 }
99 }
100
101 let _ = gate.propagate_failures();
105
106 gate
107}
108
109fn build_hierarchy_tracker(projection: &ReplayReducer) -> HierarchyTracker {
118 let mut tracker = HierarchyTracker::new();
119
120 for (child_id, parent_id) in projection.parent_child_mappings() {
123 let _ = tracker.register_child(parent_id, child_id);
126 }
127
128 for task_record in projection.task_records() {
130 let task_id = task_record.task_spec().id();
131 let runs: Vec<_> = projection.runs_for_task(task_id).collect();
132 let is_terminal = if runs.is_empty() {
133 projection.is_task_canceled(task_id)
134 } else {
135 runs.iter().all(|r| r.state().is_terminal())
136 };
137 if is_terminal {
138 tracker.mark_terminal(task_id);
139 }
140 }
141
142 tracker
143}
144
/// Outcome of a single scheduler tick, summarizing dispatch activity.
#[derive(Debug, Clone, Default)]
#[must_use = "tick result should be inspected for dispatch activity"]
pub struct TickResult {
    // Runs promoted from the scheduled index toward ready this tick.
    pub promoted_scheduled: usize,
    // Runs promoted out of retry-wait this tick.
    pub promoted_retry_wait: usize,
    // Runs handed to workers this tick.
    pub dispatched: usize,
    // Runs that reached a terminal state this tick.
    pub completed: usize,
    // Whether the engine was observed paused during this tick.
    pub engine_paused: bool,
}
160
/// Aggregate counters accumulated over an entire execution of the dispatch loop.
#[derive(Debug, Clone, Default)]
#[must_use]
pub struct RunSummary {
    // Total ticks executed.
    pub ticks: usize,
    // Total runs dispatched across all ticks.
    pub total_dispatched: usize,
    // Total runs completed across all ticks.
    pub total_completed: usize,
}
172
/// Shorthand for authority errors produced over the replay-reducer projection.
pub type AuthorityError = MutationAuthorityError<ReplayReducerError>;
175
/// Errors surfaced by the dispatch loop.
#[derive(Debug)]
pub enum DispatchError {
    /// The WAL sequence counter would overflow `u64`.
    SequenceOverflow,
    /// The storage mutation authority rejected or failed a command.
    Authority(AuthorityError),
    /// Promoting a scheduled run to ready failed.
    ScheduledPromotion(AuthorityPromotionError<AuthorityError>),
    /// Promoting a retry-waiting run back to ready failed.
    RetryPromotion(RunInstanceError),
    /// Deriving runs from a task's run policy failed.
    Derivation(DerivationError),
    /// Building a snapshot from the projection failed.
    SnapshotBuild(SnapshotMappingError),
    /// Writing a snapshot failed.
    SnapshotWrite(SnapshotWriterError),
    /// Snapshot writer initialization failed.
    SnapshotInit(String),
    /// Observed state that violates an internal invariant.
    StateInconsistency {
        /// The run whose state was inconsistent.
        run_id: RunId,
        /// Human-readable description of the inconsistency.
        context: String,
    },
    /// The configured backoff strategy parameters are invalid.
    InvalidBackoffConfig,
    /// Declared dependencies would form a cycle.
    DependencyCycle(actionqueue_workflow::dag::CycleError),
    /// Computing the retry transition for a finished attempt failed.
    RetryDecision(actionqueue_executor_local::RetryDecisionError),
    /// A workflow submission was rejected (e.g. missing or terminal parent).
    SubmissionRejected {
        /// The task whose submission was rejected.
        task_id: TaskId,
        /// Reason the submission was rejected.
        context: String,
    },
}
216
217impl std::fmt::Display for DispatchError {
218 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
219 match self {
220 DispatchError::SequenceOverflow => write!(f, "WAL sequence counter overflow"),
221 DispatchError::Authority(e) => write!(f, "authority error: {e}"),
222 DispatchError::ScheduledPromotion(e) => {
223 write!(f, "scheduled promotion error: {e}")
224 }
225 DispatchError::RetryPromotion(e) => write!(f, "retry promotion error: {e}"),
226 DispatchError::Derivation(e) => write!(f, "run derivation error: {e}"),
227 DispatchError::SnapshotBuild(e) => write!(f, "snapshot build error: {e}"),
228 DispatchError::SnapshotWrite(e) => write!(f, "snapshot write error: {e}"),
229 DispatchError::SnapshotInit(e) => write!(f, "snapshot init error: {e}"),
230 DispatchError::StateInconsistency { run_id, context } => {
231 write!(f, "state inconsistency for run {run_id}: {context}")
232 }
233 DispatchError::InvalidBackoffConfig => {
234 write!(f, "invalid backoff configuration")
235 }
236 DispatchError::RetryDecision(e) => write!(f, "retry decision error: {e}"),
237 DispatchError::DependencyCycle(e) => write!(f, "dependency cycle: {e}"),
238 DispatchError::SubmissionRejected { task_id, context } => {
239 write!(f, "submission rejected for task {task_id}: {context}")
240 }
241 }
242 }
243}
244
// Marker impl: relies on the default `source()` (returns `None`); the
// variant payloads are rendered through `Display` instead of chained.
impl std::error::Error for DispatchError {}
246
/// Long-lived dispatch loop state: the durable mutation authority plus all
/// in-memory bookkeeping (gates, trackers, registries) rebuilt from the
/// replayed projection at construction time.
pub struct DispatchLoop<
    W: WalWriter,
    H: ExecutorHandler,
    C: Clock = actionqueue_engine::time::clock::SystemClock,
    I: ExecutorIdentity = LocalExecutorIdentity,
> {
    // Durable command authority backed by the WAL and replay projection.
    authority: StorageMutationAuthority<W, ReplayReducer>,
    // Shared runner used to execute attempts.
    runner: Arc<AttemptRunner<H, SystemAttemptTimer>>,
    // Time source for scheduling decisions.
    clock: C,
    // Executor identity attached to lease commands.
    identity: I,
    // Gate enforcing per-concurrency-key exclusivity.
    key_gate: KeyGate,
    // Retry backoff strategy selected from configuration.
    backoff: Box<dyn BackoffStrategy + Send + Sync>,
    // Upper bound on simultaneously in-flight runs.
    max_concurrent: usize,
    // Lease duration (seconds) granted to dispatched runs.
    lease_timeout_secs: u64,
    // Sender handed to workers for reporting results.
    result_tx: mpsc::UnboundedSender<WorkerResult>,
    // Receiver drained each tick for finished worker results.
    result_rx: mpsc::UnboundedReceiver<WorkerResult>,
    // Runs currently dispatched, keyed by run id.
    in_flight: HashMap<actionqueue_core::ids::RunId, InFlightRun>,
    // A worker result stashed for processing on a later tick.
    pending_result: Option<WorkerResult>,
    // True once the loop is draining.
    draining: bool,
    // Optional snapshot output location.
    snapshot_path: Option<PathBuf>,
    // Event-count threshold that triggers a snapshot, if set.
    snapshot_event_threshold: Option<u64>,
    // Events applied since the last snapshot.
    events_since_last_snapshot: u64,
    // Handle given to producers for submitting workflow tasks.
    submission_tx: std::sync::Arc<SubmissionChannel>,
    // Receiver drained each tick for new task submissions.
    submission_rx: SubmissionReceiver,
    // Tracks task dependency satisfaction/failure.
    dependency_gate: DependencyGate,
    // Tracks parent/child task relationships and terminality.
    hierarchy_tracker: HierarchyTracker,
    // Canceled tasks whose descendant cancellation cascade is still pending.
    pending_hierarchy_cascade: std::collections::HashSet<TaskId>,
    // Per-task budget allocations and consumption.
    #[cfg(feature = "budget")]
    budget_tracker: actionqueue_budget::BudgetTracker,
    // Live event subscriptions rebuilt from the projection.
    #[cfg(feature = "budget")]
    subscription_registry: actionqueue_budget::SubscriptionRegistry,
    // Cache of compiled cron schedules, keyed by task.
    #[cfg(feature = "workflow")]
    cron_schedule_cache: actionqueue_engine::derive::cron::CronScheduleCache,
    // Currently registered actors.
    #[cfg(feature = "actor")]
    actor_registry: actionqueue_actor::ActorRegistry,
    // Tracks actor heartbeat deadlines.
    #[cfg(feature = "actor")]
    heartbeat_monitor: actionqueue_actor::HeartbeatMonitor,
    // Actor-to-department assignments.
    #[cfg(feature = "actor")]
    department_registry: actionqueue_actor::DepartmentRegistry,
    // Registered tenants.
    #[cfg(feature = "platform")]
    tenant_registry: actionqueue_platform::TenantRegistry,
    // Role assignments and capability grants.
    #[cfg(feature = "platform")]
    rbac_enforcer: actionqueue_platform::RbacEnforcer,
    // Append-only audit ledger.
    #[cfg(feature = "platform")]
    ledger: actionqueue_platform::AppendLedger,
    // Terminal tasks awaiting bookkeeping garbage collection.
    pending_gc_tasks: std::collections::HashSet<TaskId>,
}
330
/// Tunable settings used to construct a [`DispatchLoop`].
pub struct DispatchConfig {
    // How retry delays are computed between failed attempts.
    pub(crate) backoff_config: BackoffStrategyConfig,
    // Upper bound on simultaneously in-flight runs.
    pub(crate) max_concurrent: usize,
    // Lease duration (seconds) granted to dispatched runs.
    pub(crate) lease_timeout_secs: u64,
    // Optional snapshot output location.
    pub(crate) snapshot_path: Option<PathBuf>,
    // Optional event-count threshold between automatic snapshots.
    pub(crate) snapshot_event_threshold: Option<u64>,
}
345
346impl DispatchConfig {
347 pub fn new(
349 backoff_config: BackoffStrategyConfig,
350 max_concurrent: usize,
351 lease_timeout_secs: u64,
352 snapshot_path: Option<PathBuf>,
353 snapshot_event_threshold: Option<u64>,
354 ) -> Self {
355 Self {
356 backoff_config,
357 max_concurrent,
358 lease_timeout_secs,
359 snapshot_path,
360 snapshot_event_threshold,
361 }
362 }
363}
364
365impl<W: WalWriter, H: ExecutorHandler + 'static, C: Clock> DispatchLoop<W, H, C> {
    /// Constructs the dispatch loop, translating configuration into runtime
    /// strategies and rebuilding every in-memory registry/tracker from the
    /// authority's replayed projection so a restart resumes prior state.
    ///
    /// # Errors
    /// Returns [`DispatchError::InvalidBackoffConfig`] when the exponential
    /// backoff parameters are rejected.
    pub fn new(
        authority: StorageMutationAuthority<W, ReplayReducer>,
        handler: H,
        clock: C,
        config: DispatchConfig,
    ) -> Result<Self, DispatchError> {
        // Materialize the configured backoff strategy.
        let backoff: Box<dyn BackoffStrategy + Send + Sync> = match &config.backoff_config {
            BackoffStrategyConfig::Fixed { interval } => {
                Box::new(actionqueue_executor_local::FixedBackoff::new(*interval))
            }
            BackoffStrategyConfig::Exponential { base, max } => Box::new(
                actionqueue_executor_local::ExponentialBackoff::new(*base, *max)
                    .map_err(|_| DispatchError::InvalidBackoffConfig)?,
            ),
        };

        let (result_tx, result_rx) = mpsc::unbounded_channel();
        let (submission_tx, submission_rx) = submission_channel();

        // Rebuild dependency and hierarchy bookkeeping from replayed state.
        let dependency_gate = build_dependency_gate(authority.projection());
        let hierarchy_tracker = build_hierarchy_tracker(authority.projection());

        // Rebuild budget allocations and prior consumption.
        #[cfg(feature = "budget")]
        let budget_tracker = {
            let mut tracker = actionqueue_budget::BudgetTracker::new();
            for ((task_id, dimension), record) in authority.projection().budgets() {
                tracker.allocate(*task_id, *dimension, record.limit);
                if record.consumed > 0 {
                    tracker.consume(*task_id, *dimension, record.consumed);
                }
            }
            tracker
        };

        // Rebuild live (non-canceled) subscriptions, restoring trigger state.
        #[cfg(feature = "budget")]
        let subscription_registry = {
            let mut registry = actionqueue_budget::SubscriptionRegistry::new();
            for (sub_id, record) in authority.projection().subscriptions() {
                if record.canceled_at.is_none() {
                    registry.register(*sub_id, record.task_id, record.filter.clone());
                    if record.triggered_at.is_some() {
                        registry.trigger(*sub_id);
                    }
                }
            }
            registry
        };

        // Canceled tasks whose descendant cascades may still be outstanding.
        let pending_hierarchy_cascade: std::collections::HashSet<TaskId> = authority
            .projection()
            .task_records()
            .filter(|tr| tr.canceled_at().is_some())
            .map(|tr| tr.task_spec().id())
            .collect();

        // Rebuild the registry of actors that are still registered.
        #[cfg(feature = "actor")]
        let actor_registry = {
            let mut registry = actionqueue_actor::ActorRegistry::new();
            for (actor_id, record) in authority.projection().actors() {
                if record.deregistered_at.is_none() {
                    // If the stored capability set fails validation, fall back
                    // to a placeholder rather than dropping the actor.
                    let caps = actionqueue_core::actor::ActorCapabilities::new(
                        record.capabilities.clone(),
                    )
                    .unwrap_or_else(|_| {
                        actionqueue_core::actor::ActorCapabilities::new(vec!["_".to_string()])
                            .expect("fallback capability")
                    });
                    let mut reg = actionqueue_core::actor::ActorRegistration::new(
                        *actor_id,
                        record.identity.clone(),
                        caps,
                        record.heartbeat_interval_secs,
                    );
                    if let Some(tid) = record.tenant_id {
                        reg = reg.with_tenant(tid);
                    }
                    if let Some(dept_str) = &record.department {
                        // Invalid department ids are silently skipped.
                        if let Ok(dept) = actionqueue_core::ids::DepartmentId::new(dept_str.clone())
                        {
                            reg = reg.with_department(dept);
                        }
                    }
                    registry.register(reg);
                }
            }
            registry
        };

        // Rebuild heartbeat tracking, seeding from the last observed beat
        // (or registration time when no beat was recorded).
        #[cfg(feature = "actor")]
        let heartbeat_monitor = {
            let mut monitor = actionqueue_actor::HeartbeatMonitor::new();
            for (actor_id, record) in authority.projection().actors() {
                if record.deregistered_at.is_none() {
                    let policy = actionqueue_core::actor::HeartbeatPolicy::with_default_multiplier(
                        record.heartbeat_interval_secs,
                    );
                    let last_beat = record.last_heartbeat_at.unwrap_or(record.registered_at);
                    monitor.record_registration(*actor_id, policy, last_beat);
                }
            }
            monitor
        };

        // Rebuild department assignments for registered actors.
        #[cfg(feature = "actor")]
        let department_registry = {
            let mut registry = actionqueue_actor::DepartmentRegistry::new();
            for (actor_id, record) in authority.projection().actors() {
                if record.deregistered_at.is_none() {
                    if let Some(dept_str) = &record.department {
                        if let Ok(dept) = actionqueue_core::ids::DepartmentId::new(dept_str.clone())
                        {
                            registry.assign(*actor_id, dept);
                        }
                    }
                }
            }
            registry
        };

        // Rebuild the tenant registry.
        #[cfg(feature = "platform")]
        let tenant_registry = {
            let mut registry = actionqueue_platform::TenantRegistry::new();
            for (_, record) in authority.projection().tenants() {
                registry.register(actionqueue_core::platform::TenantRegistration::new(
                    record.tenant_id,
                    record.name.clone(),
                ));
            }
            registry
        };

        // Rebuild RBAC roles and unrevoked capability grants.
        #[cfg(feature = "platform")]
        let rbac_enforcer = {
            let mut enforcer = actionqueue_platform::RbacEnforcer::new();
            for record in authority.projection().role_assignments() {
                enforcer.assign_role(record.actor_id, record.role.clone(), record.tenant_id);
            }
            for record in authority.projection().capability_grants() {
                if record.revoked_at.is_none() {
                    enforcer.grant_capability(
                        record.actor_id,
                        record.capability.clone(),
                        record.tenant_id,
                    );
                }
            }
            enforcer
        };

        // Rebuild the append-only ledger from persisted entries.
        #[cfg(feature = "platform")]
        let ledger = {
            let mut ledger = actionqueue_platform::AppendLedger::new();
            for record in authority.projection().ledger_entries() {
                let entry = actionqueue_core::platform::LedgerEntry::new(
                    record.entry_id,
                    record.tenant_id,
                    record.ledger_key.clone(),
                    record.payload.clone(),
                    record.timestamp,
                );
                let entry =
                    if let Some(aid) = record.actor_id { entry.with_actor(aid) } else { entry };
                ledger.append(entry);
            }
            ledger
        };

        Ok(Self {
            authority,
            runner: Arc::new(AttemptRunner::new(handler)),
            clock,
            identity: LocalExecutorIdentity,
            key_gate: KeyGate::new(),
            backoff,
            max_concurrent: config.max_concurrent,
            lease_timeout_secs: config.lease_timeout_secs,
            result_tx,
            result_rx,
            in_flight: HashMap::new(),
            pending_result: None,
            draining: false,
            snapshot_path: config.snapshot_path,
            snapshot_event_threshold: config.snapshot_event_threshold,
            events_since_last_snapshot: 0,
            submission_tx,
            submission_rx,
            dependency_gate,
            hierarchy_tracker,
            pending_hierarchy_cascade,
            #[cfg(feature = "budget")]
            budget_tracker,
            #[cfg(feature = "budget")]
            subscription_registry,
            #[cfg(feature = "workflow")]
            cron_schedule_cache: actionqueue_engine::derive::cron::CronScheduleCache::new(),
            #[cfg(feature = "actor")]
            actor_registry,
            #[cfg(feature = "actor")]
            heartbeat_monitor,
            #[cfg(feature = "actor")]
            department_registry,
            #[cfg(feature = "platform")]
            tenant_registry,
            #[cfg(feature = "platform")]
            rbac_enforcer,
            #[cfg(feature = "platform")]
            ledger,
            pending_gc_tasks: std::collections::HashSet::new(),
        })
    }
593
    /// Read-only access to the replayed projection behind the authority.
    pub fn projection(&self) -> &ReplayReducer {
        self.authority.projection()
    }
598
599 fn next_sequence(&self) -> Result<u64, DispatchError> {
601 self.authority
602 .projection()
603 .latest_sequence()
604 .checked_add(1)
605 .ok_or(DispatchError::SequenceOverflow)
606 }
607
608 fn drain_submissions(&mut self, current_time: u64) -> Result<(), DispatchError> {
615 while let Some(submission) = self.submission_rx.try_recv() {
616 match self.process_submission(submission, current_time) {
617 Ok(()) => {}
618 Err(DispatchError::SubmissionRejected { ref task_id, ref context }) => {
619 tracing::error!(
620 %task_id,
621 %context,
622 "workflow submission rejected; submission dropped"
623 );
624 }
625 Err(DispatchError::DependencyCycle(ref err)) => {
626 tracing::error!(
627 error = %err,
628 "workflow submission rejected (dependency cycle); submission dropped"
629 );
630 }
631 Err(fatal) => return Err(fatal),
632 }
633 }
634 Ok(())
635 }
636
    /// Commits a single workflow submission: validates the parent, persists
    /// the task, derives and persists its runs, then declares dependencies
    /// and registers the parent/child edge.
    ///
    /// # Errors
    /// Returns `SubmissionRejected` for a missing/terminal parent or a missing
    /// prerequisite, `DependencyCycle` when declared dependencies would form a
    /// cycle, and propagates authority/derivation failures.
    fn process_submission(
        &mut self,
        submission: actionqueue_workflow::submission::TaskSubmission,
        current_time: u64,
    ) -> Result<(), DispatchError> {
        let (task_spec, mut dependencies) = submission.into_parts();
        let task_id = task_spec.id();
        let parent_task_id = task_spec.parent_task_id();

        // Deduplicate dependencies while preserving first-seen order.
        {
            let mut seen = std::collections::HashSet::new();
            dependencies.retain(|id| seen.insert(*id));
        }

        // Orphan prevention: the parent must exist and still be live.
        if let Some(parent_id) = parent_task_id {
            if self.authority.projection().get_task(&parent_id).is_none() {
                return Err(DispatchError::SubmissionRejected {
                    task_id: task_spec.id(),
                    context: format!("parent {parent_id} not found"),
                });
            }
            if self.hierarchy_tracker.is_terminal(parent_id) {
                return Err(DispatchError::SubmissionRejected {
                    task_id: task_spec.id(),
                    context: format!("parent {parent_id} is terminal (orphan prevention)"),
                });
            }
        }

        // Persist the task itself before deriving any runs.
        let task_seq = self.next_sequence()?;
        let _ = self
            .authority
            .submit_command(
                actionqueue_core::mutation::MutationCommand::TaskCreate(
                    actionqueue_core::mutation::TaskCreateCommand::new(
                        task_seq,
                        task_spec.clone(),
                        current_time,
                    ),
                ),
                actionqueue_core::mutation::DurabilityPolicy::Immediate,
            )
            .map_err(DispatchError::Authority)?;

        // Derive runs per the task's run policy, accounting for runs already
        // present in the projection.
        let already_derived = self.authority.projection().run_ids_for_task(task_id).len() as u32;
        let derivation = actionqueue_engine::derive::derive_runs(
            &self.clock,
            task_id,
            task_spec.run_policy(),
            already_derived,
            current_time,
        )
        .map_err(DispatchError::Derivation)?;

        for run in derivation.into_derived() {
            let run_seq = self.next_sequence()?;
            let _ = self
                .authority
                .submit_command(
                    actionqueue_core::mutation::MutationCommand::RunCreate(
                        actionqueue_core::mutation::RunCreateCommand::new(run_seq, run),
                    ),
                    actionqueue_core::mutation::DurabilityPolicy::Immediate,
                )
                .map_err(DispatchError::Authority)?;
        }

        if !dependencies.is_empty() {
            // Reject cycles before persisting the declaration.
            self.dependency_gate
                .check_cycle(task_id, &dependencies)
                .map_err(DispatchError::DependencyCycle)?;

            // Every prerequisite must already be durable in the projection.
            for prereq_id in &dependencies {
                if self.authority.projection().get_task(prereq_id).is_none() {
                    return Err(DispatchError::SubmissionRejected {
                        task_id,
                        context: format!("prerequisite {prereq_id} not yet in projection"),
                    });
                }
            }
            let dep_seq = self.next_sequence()?;
            let _ = self
                .authority
                .submit_command(
                    MutationCommand::DependencyDeclare(DependencyDeclareCommand::new(
                        dep_seq,
                        task_id,
                        dependencies.clone(),
                        current_time,
                    )),
                    DurabilityPolicy::Immediate,
                )
                .map_err(DispatchError::Authority)?;
            // Mirror the durable declaration in the in-memory gate.
            let _ = self.dependency_gate.declare(task_id, dependencies);
        }

        // Record the parent/child edge for cascade and orphan tracking.
        if let Some(parent_id) = parent_task_id {
            let _ = self.hierarchy_tracker.register_child(parent_id, task_id);
        }

        tracing::debug!(task_id = %task_id, "workflow submission committed");
        Ok(())
    }
756
757 fn drain_completed_results(
760 &mut self,
761 result: &mut TickResult,
762 current_time: u64,
763 ) -> Result<(), DispatchError> {
764 if let Some(stashed) = self.pending_result.take() {
766 self.process_worker_result(stashed, result, current_time)?;
767 }
768 while let Ok(worker_result) = self.result_rx.try_recv() {
769 self.process_worker_result(worker_result, result, current_time)?;
770 }
771
772 Ok(())
773 }
774
775 fn process_worker_result(
776 &mut self,
777 worker_result: WorkerResult,
778 result: &mut TickResult,
779 current_time: u64,
780 ) -> Result<(), DispatchError> {
781 let run_id = worker_result.run_id;
782 let attempt_id = worker_result.attempt_id;
783
784 tracing::debug!(%run_id, %attempt_id, "worker result received");
785
786 let seq = self.next_sequence()?;
788 let finish_cmd =
789 actionqueue_engine::scheduler::attempt_finish::build_attempt_finish_command(
790 seq,
791 run_id,
792 attempt_id,
793 &worker_result.response,
794 current_time,
795 );
796 let _ = actionqueue_engine::scheduler::attempt_finish::submit_attempt_finish_via_authority(
797 finish_cmd,
798 DurabilityPolicy::Immediate,
799 &mut self.authority,
800 )
801 .map_err(|e| DispatchError::Authority(e.into_source()))?;
802
803 let suspended_count = self
807 .authority
808 .projection()
809 .get_attempt_history(&run_id)
810 .map(|history| {
811 history
812 .iter()
813 .filter(|a| {
814 a.result() == Some(actionqueue_core::mutation::AttemptResultKind::Suspended)
815 })
816 .count() as u32
817 })
818 .unwrap_or(0);
819 let effective_attempt = worker_result.attempt_number.saturating_sub(suspended_count);
820
821 let outcome_kind =
825 actionqueue_executor_local::AttemptOutcomeKind::from_response(&worker_result.response);
826 let retry_input = actionqueue_executor_local::RetryDecisionInput {
827 run_id,
828 attempt_id,
829 attempt_number: effective_attempt,
830 max_attempts: worker_result.max_attempts,
831 outcome_kind,
832 };
833 let decision = actionqueue_executor_local::retry::decide_retry_transition(&retry_input)
834 .map_err(DispatchError::RetryDecision)?;
835 let new_state = Some(decision.target_state());
836
837 if let Some(target_state) = new_state {
838 tracing::info!(%run_id, ?target_state, "run state transition applied");
839
840 if let Some(inf) = self.in_flight.get(&run_id) {
842 let seq = self.next_sequence()?;
843 let _ = self
844 .authority
845 .submit_command(
846 MutationCommand::LeaseRelease(LeaseReleaseCommand::new(
847 seq,
848 run_id,
849 self.identity.identity(),
850 inf.lease_expiry,
851 current_time,
852 )),
853 DurabilityPolicy::Immediate,
854 )
855 .map_err(DispatchError::Authority)?;
856 }
857
858 let seq = self.next_sequence()?;
862 if target_state == RunState::Suspended {
863 let _ = self
864 .authority
865 .submit_command(
866 MutationCommand::RunSuspend(
867 actionqueue_core::mutation::RunSuspendCommand::new(
868 seq,
869 run_id,
870 None,
871 current_time,
872 ),
873 ),
874 DurabilityPolicy::Immediate,
875 )
876 .map_err(DispatchError::Authority)?;
877 } else {
878 let _ = self
879 .authority
880 .submit_command(
881 MutationCommand::RunStateTransition(RunStateTransitionCommand::new(
882 seq,
883 run_id,
884 RunState::Running,
885 target_state,
886 current_time,
887 )),
888 DurabilityPolicy::Immediate,
889 )
890 .map_err(DispatchError::Authority)?;
891 }
892
893 let task_id = self.in_flight.get(&run_id).map(|inf| inf.task_id);
895
896 if let Some(inf) = self.in_flight.get(&run_id) {
899 tracing::debug!(
900 %run_id,
901 attempt_id = %inf.attempt_id,
902 attempt_number = inf.attempt_number,
903 max_attempts = inf.max_attempts,
904 ?target_state,
905 "processing run completion"
906 );
907 Self::try_release_concurrency_key(
908 &self.authority,
909 &mut self.key_gate,
910 run_id,
911 inf.task_id,
912 target_state,
913 );
914 }
915
916 if target_state.is_terminal() {
919 result.completed += 1;
920 if let Some(tid) = task_id {
921 self.notify_dependency_gate_terminal(tid, current_time)?;
923 }
924 }
925
926 #[cfg(feature = "budget")]
928 if let Some(tid) = task_id {
929 self.fire_events_for_transition(tid, target_state)?;
930 }
931 }
932
933 #[cfg(feature = "budget")]
939 {
940 use actionqueue_core::mutation::{BudgetConsumeCommand, MutationCommand as MC};
941 if let Some(inf) = self.in_flight.get(&run_id) {
942 let task_id = inf.task_id;
943 for c in &worker_result.consumption {
944 let seq = self.next_sequence()?;
945 let _ = self
946 .authority
947 .submit_command(
948 MC::BudgetConsume(BudgetConsumeCommand::new(
949 seq,
950 task_id,
951 c.dimension,
952 c.amount,
953 current_time,
954 )),
955 DurabilityPolicy::Deferred,
956 )
957 .map_err(DispatchError::Authority)?;
958 self.budget_tracker.consume(task_id, c.dimension, c.amount);
959 }
960 if !worker_result.consumption.is_empty() {
962 self.fire_budget_threshold_events(task_id)?;
963 }
964 }
965 }
966
967 self.in_flight.remove(&run_id);
969 Ok(())
970 }
971
    /// Reacts to a task whose runs may have all reached terminal states:
    /// marks it terminal in the hierarchy, queues it for GC, and notifies the
    /// dependency gate (satisfying dependents on success, or cascading
    /// cancellation to dependents on failure).
    fn notify_dependency_gate_terminal(
        &mut self,
        task_id: actionqueue_core::ids::TaskId,
        current_time: u64,
    ) -> Result<(), DispatchError> {
        // Only act once every run of the task is terminal.
        let all_runs_terminal =
            self.authority.projection().runs_for_task(task_id).all(|r| r.state().is_terminal());

        if !all_runs_terminal {
            return Ok(());
        }

        self.hierarchy_tracker.mark_terminal(task_id);

        // Bookkeeping cleanup is deferred to `gc_terminal_tasks`.
        self.pending_gc_tasks.insert(task_id);

        // At least one completed run counts as overall success.
        let has_completed = self
            .authority
            .projection()
            .runs_for_task(task_id)
            .any(|r| r.state() == RunState::Completed);

        if has_completed {
            let newly_eligible = self.dependency_gate.notify_completed(task_id);
            if !newly_eligible.is_empty() {
                tracing::debug!(
                    task_id = %task_id,
                    newly_eligible = newly_eligible.len(),
                    "dependency gate: prerequisite satisfied, dependents now eligible"
                );
            }
        } else {
            // Failure cascades: cancel every non-terminal run of each
            // dependent that is now permanently blocked.
            let newly_blocked = self.dependency_gate.notify_failed(task_id);
            for blocked_id in newly_blocked {
                tracing::debug!(
                    failed_prerequisite = %task_id,
                    blocked_task = %blocked_id,
                    "dependency gate: cascading failure to dependent task"
                );
                let runs_to_cancel: Vec<_> = self
                    .authority
                    .projection()
                    .runs_for_task(blocked_id)
                    .filter(|r| !r.state().is_terminal())
                    .map(|r| (r.id(), r.state()))
                    .collect();
                for (run_id, current_state) in runs_to_cancel {
                    let seq = self.next_sequence()?;
                    let _ = self
                        .authority
                        .submit_command(
                            MutationCommand::RunStateTransition(RunStateTransitionCommand::new(
                                seq,
                                run_id,
                                current_state,
                                RunState::Canceled,
                                current_time,
                            )),
                            DurabilityPolicy::Immediate,
                        )
                        .map_err(DispatchError::Authority)?;
                }
            }
        }

        Ok(())
    }
1054
    /// Propagates task cancellations down the hierarchy: each descendant of a
    /// canceled task is itself canceled (task record and non-terminal runs),
    /// and cascades with no remaining descendants are retired from the
    /// pending set.
    fn cascade_hierarchy_cancellations(&mut self, current_time: u64) -> Result<(), DispatchError> {
        let canceled_task_ids: Vec<TaskId> =
            self.pending_hierarchy_cascade.iter().copied().collect();

        let mut completed_cascades: Vec<TaskId> = Vec::new();

        for canceled_id in canceled_task_ids {
            // Mark the canceled ancestor terminal once its own runs settle.
            let all_runs_terminal = self
                .authority
                .projection()
                .runs_for_task(canceled_id)
                .all(|r| r.state().is_terminal());
            if all_runs_terminal {
                self.hierarchy_tracker.mark_terminal(canceled_id);
            }

            let cascade = self.hierarchy_tracker.collect_cancellation_cascade(canceled_id);
            if cascade.is_empty() {
                // Nothing left to cancel beneath this task; retire it.
                completed_cascades.push(canceled_id);
                continue;
            }

            for descendant_id in cascade {
                tracing::debug!(
                    canceled_ancestor = %canceled_id,
                    descendant = %descendant_id,
                    "hierarchy: cascading cancellation to descendant"
                );

                // Cancel the descendant task itself (skipped if already
                // canceled) and queue it so its own children cascade next.
                if !self.authority.projection().is_task_canceled(descendant_id) {
                    let seq = self.next_sequence()?;
                    let _ = self
                        .authority
                        .submit_command(
                            MutationCommand::TaskCancel(TaskCancelCommand::new(
                                seq,
                                descendant_id,
                                current_time,
                            )),
                            DurabilityPolicy::Immediate,
                        )
                        .map_err(DispatchError::Authority)?;
                    self.pending_hierarchy_cascade.insert(descendant_id);
                }

                // Cancel every non-terminal run of the descendant.
                let runs_to_cancel: Vec<_> = self
                    .authority
                    .projection()
                    .runs_for_task(descendant_id)
                    .filter(|r| !r.state().is_terminal())
                    .map(|r| (r.id(), r.state()))
                    .collect();

                for (run_id, prev_state) in runs_to_cancel {
                    let seq = self.next_sequence()?;
                    let _ = self
                        .authority
                        .submit_command(
                            MutationCommand::RunStateTransition(RunStateTransitionCommand::new(
                                seq,
                                run_id,
                                prev_state,
                                RunState::Canceled,
                                current_time,
                            )),
                            DurabilityPolicy::Immediate,
                        )
                        .map_err(DispatchError::Authority)?;

                    // Canceled runs must not keep holding concurrency keys.
                    Self::try_release_concurrency_key(
                        &self.authority,
                        &mut self.key_gate,
                        run_id,
                        descendant_id,
                        RunState::Canceled,
                    );
                }

                self.hierarchy_tracker.mark_terminal(descendant_id);
            }
        }

        for task_id in completed_cascades {
            self.pending_hierarchy_cascade.remove(&task_id);
        }

        Ok(())
    }
1165
1166 fn gc_terminal_tasks(&mut self) {
1173 let candidates: Vec<TaskId> = self.pending_gc_tasks.iter().copied().collect();
1174
1175 for task_id in candidates {
1176 if !self.hierarchy_tracker.collect_cancellation_cascade(task_id).is_empty() {
1180 continue;
1181 }
1182
1183 self.pending_gc_tasks.remove(&task_id);
1184
1185 self.dependency_gate.gc_task(task_id);
1187 self.hierarchy_tracker.gc_subtree(task_id);
1188
1189 #[cfg(feature = "budget")]
1190 self.budget_tracker.gc_task(task_id);
1191
1192 #[cfg(feature = "budget")]
1193 self.subscription_registry.gc_task(task_id);
1194
1195 #[cfg(feature = "workflow")]
1196 self.cron_schedule_cache.remove(task_id);
1197 }
1198 }
1199
    /// Maintains the rolling derivation window for cron tasks: for each live
    /// cron task, derives enough future runs to keep `CRON_WINDOW_SIZE`
    /// non-terminal runs outstanding, clamped by `max_occurrences`.
    #[cfg(feature = "workflow")]
    fn derive_cron_runs(&mut self, current_time: u64) -> Result<(), DispatchError> {
        use actionqueue_core::task::run_policy::RunPolicy;
        use actionqueue_engine::derive::cron::{derive_cron_cached, CRON_WINDOW_SIZE};

        // Snapshot cron tasks up front so derivation can mutate the authority.
        let cron_tasks: Vec<(TaskId, actionqueue_core::task::run_policy::CronPolicy)> = self
            .authority
            .projection()
            .task_records()
            .filter_map(|tr| {
                if let RunPolicy::Cron(ref policy) = *tr.task_spec().run_policy() {
                    Some((tr.task_spec().id(), policy.clone()))
                } else {
                    None
                }
            })
            .collect();

        for (task_id, policy) in cron_tasks {
            // Canceled tasks derive no further occurrences.
            if self.authority.projection().is_task_canceled(task_id) {
                continue;
            }

            let all_runs: Vec<RunInstance> =
                self.authority.projection().runs_for_task(task_id).cloned().collect();

            // Saturate at u32::MAX rather than fail on oversized counts.
            let total_derived = u32::try_from(all_runs.len()).unwrap_or(u32::MAX);
            let non_terminal_count =
                u32::try_from(all_runs.iter().filter(|r| !r.state().is_terminal()).count())
                    .unwrap_or(u32::MAX);

            // Hard cap on lifetime occurrences, when configured.
            if let Some(max) = policy.max_occurrences() {
                if total_derived >= max {
                    continue;
                }
            }

            // Top the window back up...
            let to_derive = CRON_WINDOW_SIZE.saturating_sub(non_terminal_count);
            if to_derive == 0 {
                continue;
            }

            // ...but never past the remaining occurrence budget.
            let to_derive = if let Some(max) = policy.max_occurrences() {
                to_derive.min(max.saturating_sub(total_derived))
            } else {
                to_derive
            };
            if to_derive == 0 {
                continue;
            }

            // Resume from the latest scheduled run; with no runs yet, start
            // just before `current_time` so the first occurrence is derived.
            let last_scheduled_at = all_runs
                .iter()
                .map(|r| r.scheduled_at())
                .max()
                .unwrap_or_else(|| current_time.saturating_sub(1));

            self.cron_schedule_cache.ensure(task_id, &policy);
            let schedule =
                self.cron_schedule_cache.get(task_id).expect("schedule was just ensured");
            let new_runs =
                derive_cron_cached(task_id, schedule, last_scheduled_at, current_time, to_derive)
                    .map_err(DispatchError::Derivation)?;

            if new_runs.is_empty() {
                continue;
            }

            tracing::debug!(
                %task_id,
                count = new_runs.len(),
                "cron: deriving rolling window runs"
            );

            // Persist each derived run through the authority.
            for run in new_runs {
                let seq = self.next_sequence()?;
                let _ = self
                    .authority
                    .submit_command(
                        MutationCommand::RunCreate(RunCreateCommand::new(seq, run)),
                        DurabilityPolicy::Immediate,
                    )
                    .map_err(DispatchError::Authority)?;
            }
        }

        Ok(())
    }
1311
    /// Extend the leases of in-flight runs that are close to expiry.
    ///
    /// A heartbeat is sent once a lease is within one third of the lease
    /// timeout (minimum 1) of expiring. The new expiry is recorded durably
    /// via a `LeaseHeartbeat` command and mirrored into the in-memory
    /// in-flight table.
    fn heartbeat_in_flight_leases(&mut self, current_time: u64) -> Result<(), DispatchError> {
        // Heartbeat when within lease_timeout/3 (at least 1) of expiry.
        let heartbeat_threshold = (self.lease_timeout_secs / 3).max(1);
        let run_ids_needing_heartbeat: Vec<actionqueue_core::ids::RunId> = self
            .in_flight
            .values()
            .filter(|inf| current_time.saturating_add(heartbeat_threshold) >= inf.lease_expiry)
            .map(|inf| inf.run_id)
            .collect();

        for run_id in run_ids_needing_heartbeat {
            let new_expiry = current_time.saturating_add(self.lease_timeout_secs);
            // Attempt metadata is only used for logging; defaults apply if
            // the entry disappeared between the collect above and here.
            let (attempt_id, attempt_number, max_attempts) = self
                .in_flight
                .get(&run_id)
                .map(|inf| (inf.attempt_id, inf.attempt_number, inf.max_attempts))
                .unwrap_or_default();
            tracing::debug!(
                %run_id, %attempt_id, attempt_number, max_attempts,
                new_expiry, "lease heartbeat extended"
            );
            let seq = self.next_sequence()?;
            let _ = self
                .authority
                .submit_command(
                    MutationCommand::LeaseHeartbeat(LeaseHeartbeatCommand::new(
                        seq,
                        run_id,
                        self.identity.identity(),
                        new_expiry,
                        current_time,
                    )),
                    DurabilityPolicy::Immediate,
                )
                .map_err(DispatchError::Authority)?;

            // Keep the in-memory view consistent with the durable record.
            if let Some(inf) = self.in_flight.get_mut(&run_id) {
                inf.lease_expiry = new_expiry;
            }
        }

        Ok(())
    }
1358
1359 fn try_release_concurrency_key(
1362 authority: &StorageMutationAuthority<W, ReplayReducer>,
1363 key_gate: &mut KeyGate,
1364 run_id: RunId,
1365 task_id: TaskId,
1366 target_state: RunState,
1367 ) {
1368 let should_release = if target_state.is_terminal() {
1369 true
1370 } else if target_state == RunState::RetryWait || target_state == RunState::Suspended {
1371 authority
1374 .projection()
1375 .get_task(&task_id)
1376 .map(|task| {
1377 task.constraints().concurrency_key_hold_policy()
1378 == ConcurrencyKeyHoldPolicy::ReleaseOnRetry
1379 })
1380 .unwrap_or(false)
1381 } else {
1382 return;
1383 };
1384
1385 if !should_release {
1386 return;
1387 }
1388
1389 let Some(task) = authority.projection().get_task(&task_id) else {
1390 tracing::warn!(%run_id, %task_id, "skipping key release: task not found");
1391 return;
1392 };
1393
1394 let Some(key_str) = task.constraints().concurrency_key() else {
1395 return; };
1397
1398 let key = ConcurrencyKey::new(key_str);
1399 match key_gate.release(key, run_id) {
1400 ReleaseResult::Released { .. } => {}
1401 ReleaseResult::NotHeld { key: k, attempting_run_id } => {
1402 tracing::warn!(
1403 %attempting_run_id, key = %k,
1404 "concurrency key release failed — key not held by this run"
1405 );
1406 }
1407 }
1408 }
1409
    /// Execute one scheduler pass.
    ///
    /// Pipeline order is load-bearing:
    /// 1. drain external submissions and completed worker results,
    /// 2. propagate cancellations and GC terminal tasks,
    /// 3. (feature-gated) actor-heartbeat timeouts and cron derivation,
    /// 4. heartbeat leases of in-flight runs,
    /// 5. short-circuit when the engine is paused or draining,
    /// 6. promote Scheduled -> Ready and RetryWait -> Ready,
    /// 7. select and dispatch Ready runs up to `max_concurrent`,
    /// 8. account WAL events and maybe write an automatic snapshot.
    pub async fn tick(&mut self) -> Result<TickResult, DispatchError> {
        tracing::trace!("dispatch tick starting");
        let mut result = TickResult::default();
        let current_time = self.clock.now();
        // Baseline for counting how many WAL events this tick appends.
        let seq_before_tick = self.authority.projection().latest_sequence();

        self.drain_submissions(current_time)?;

        self.drain_completed_results(&mut result, current_time)?;

        #[cfg(feature = "budget")]
        self.signal_budget_exhaustion_cancellations();

        self.cascade_hierarchy_cancellations(current_time)?;

        self.gc_terminal_tasks();

        #[cfg(feature = "actor")]
        self.check_actor_heartbeat_timeouts()?;
        #[cfg(feature = "workflow")]
        self.derive_cron_runs(current_time)?;

        self.heartbeat_in_flight_leases(current_time)?;

        // Paused engine: leases stay fresh (above) but no new work starts.
        if self.authority.projection().is_engine_paused() {
            result.engine_paused = true;
            let events_this_tick =
                self.authority.projection().latest_sequence().saturating_sub(seq_before_tick);
            self.events_since_last_snapshot += events_this_tick;
            self.maybe_write_snapshot(current_time)?;
            return Ok(result);
        }

        // Draining: in-flight work may finish, nothing new is dispatched.
        if self.draining {
            let events_this_tick =
                self.authority.projection().latest_sequence().saturating_sub(seq_before_tick);
            self.events_since_last_snapshot += events_this_tick;
            self.maybe_write_snapshot(current_time)?;
            return Ok(result);
        }

        // Promote Scheduled runs whose dependency gate is satisfied.
        let scheduled_runs: Vec<RunInstance> = self
            .authority
            .projection()
            .run_instances()
            .filter(|r| r.state() == RunState::Scheduled)
            .filter(|r| self.dependency_gate.is_eligible(r.task_id()))
            .cloned()
            .collect();

        if !scheduled_runs.is_empty() {
            let scheduled_index = ScheduledIndex::from_runs(scheduled_runs);
            let promo_result = promote_scheduled_to_ready_via_authority(
                &scheduled_index,
                PromotionParams::new(
                    current_time,
                    self.next_sequence()?,
                    current_time,
                    DurabilityPolicy::Immediate,
                ),
                &mut self.authority,
            )
            .map_err(DispatchError::ScheduledPromotion)?;
            result.promoted_scheduled = promo_result.outcomes().len();
        }

        // Subscription-triggered Scheduled runs are promoted separately.
        #[cfg(feature = "budget")]
        {
            let promoted = self.promote_subscription_triggered_scheduled(current_time)?;
            result.promoted_scheduled += promoted;
        }

        // Promote RetryWait runs whose backoff delay has elapsed.
        let retry_waiting: Vec<RunInstance> = self
            .authority
            .projection()
            .run_instances()
            .filter(|r| r.state() == RunState::RetryWait)
            .cloned()
            .collect();

        if !retry_waiting.is_empty() {
            let promo = promote_retry_wait_to_ready(&retry_waiting, current_time, &*self.backoff)
                .map_err(DispatchError::RetryPromotion)?;
            for run in promo.promoted() {
                let seq = self.next_sequence()?;
                let _ = self
                    .authority
                    .submit_command(
                        MutationCommand::RunStateTransition(RunStateTransitionCommand::new(
                            seq,
                            run.id(),
                            RunState::RetryWait,
                            RunState::Ready,
                            current_time,
                        )),
                        DurabilityPolicy::Immediate,
                    )
                    .map_err(DispatchError::Authority)?;
            }
            result.promoted_retry_wait = promo.promoted().len();
        }

        let ready_runs: Vec<RunInstance> = self
            .authority
            .projection()
            .run_instances()
            .filter(|r| r.state() == RunState::Ready)
            .cloned()
            .collect();

        // Nothing ready: finish snapshot accounting and return early.
        if ready_runs.is_empty() {
            let events_this_tick =
                self.authority.projection().latest_sequence().saturating_sub(seq_before_tick);
            self.events_since_last_snapshot += events_this_tick;
            self.maybe_write_snapshot(current_time)?;
            return Ok(result);
        }

        let ready_index = ReadyIndex::from_runs(ready_runs);
        let inputs = ready_inputs_from_index(&ready_index);
        let selection = select_ready_runs(&inputs);

        // Respect the global concurrency limit.
        let available_slots = self.max_concurrent.saturating_sub(self.in_flight.len());
        let mut dispatched = 0usize;
        for run in selection.into_selected() {
            if dispatched >= available_slots {
                break;
            }

            let task = match self.authority.projection().get_task(&run.task_id()) {
                Some(t) => t,
                None => {
                    tracing::warn!(
                        run_id = %run.id(),
                        task_id = %run.task_id(),
                        "skipping run: parent task not found in projection"
                    );
                    continue;
                }
            };

            // Owned copies so the projection borrow ends before dispatching.
            let payload = task.payload().to_vec();
            let constraints = task.constraints().clone();

            // Budget gate: skip (not fail) runs whose budget is exhausted.
            #[cfg(feature = "budget")]
            {
                let gate = actionqueue_budget::BudgetGate::new(&self.budget_tracker);
                if !gate.can_dispatch(run.task_id()) {
                    tracing::debug!(
                        run_id = %run.id(),
                        task_id = %run.task_id(),
                        "skipping run: budget exhausted"
                    );
                    continue;
                }
            }

            // Concurrency key: must be acquired before dispatch; an occupied
            // key defers the run to a later tick.
            let acquired_key = if let Some(key_str) = constraints.concurrency_key() {
                let key = actionqueue_engine::concurrency::key_gate::ConcurrencyKey::new(key_str);
                match self.key_gate.acquire(key.clone(), run.id()) {
                    actionqueue_engine::concurrency::key_gate::AcquireResult::Acquired {
                        ..
                    } => Some(key),
                    actionqueue_engine::concurrency::key_gate::AcquireResult::Occupied {
                        ..
                    } => continue,
                }
            } else {
                None
            };

            match self.dispatch_single_run(&run, &constraints, current_time) {
                Ok((attempt_id, lease_expiry, attempt_number, max_attempts)) => {
                    if constraints.safety_level() == SafetyLevel::Transactional && max_attempts > 1
                    {
                        tracing::warn!(
                            run_id = %run.id(),
                            max_attempts,
                            "task has Transactional safety level with retries — retries may \
                             cause duplicate side effects"
                        );
                    }

                    let run_id = run.id();
                    let task_id = run.task_id();

                    // With budgets, keep a cancellation handle so exhaustion
                    // can signal the running handler; otherwise no handle.
                    #[cfg(feature = "budget")]
                    let (cancellation_ctx, cancel_ctx_clone) = {
                        let ctx = actionqueue_executor_local::handler::CancellationContext::new();
                        let clone = Some(ctx.clone());
                        (Some(ctx), clone)
                    };
                    #[cfg(not(feature = "budget"))]
                    let cancellation_ctx: Option<
                        actionqueue_executor_local::handler::CancellationContext,
                    > = None;

                    self.in_flight.insert(
                        run_id,
                        InFlightRun {
                            run_id,
                            attempt_id,
                            task_id,
                            lease_expiry,
                            attempt_number,
                            max_attempts,
                            #[cfg(feature = "budget")]
                            cancellation_context: cancel_ctx_clone,
                        },
                    );

                    let submission = Some(std::sync::Arc::clone(&self.submission_tx)
                        as std::sync::Arc<dyn actionqueue_executor_local::TaskSubmissionPort>);
                    let children = build_children_snapshot(self.authority.projection(), task_id);

                    tracing::info!(
                        %run_id, %attempt_id, attempt_number,
                        "spawning handler for attempt"
                    );
                    let runner = Arc::clone(&self.runner);
                    let result_tx = self.result_tx.clone();

                    // Handlers may block; run them off the async runtime.
                    tokio::task::spawn_blocking(move || {
                        let request = ExecutorRequest {
                            run_id,
                            attempt_id,
                            payload,
                            constraints,
                            attempt_number,
                            submission,
                            children,
                            cancellation_context: cancellation_ctx,
                        };
                        let outcome = runner.run_attempt(request);

                        let worker_result = WorkerResult {
                            run_id,
                            attempt_id,
                            response: outcome.response,
                            max_attempts,
                            attempt_number,
                            consumption: outcome.consumption,
                        };

                        if result_tx.send(worker_result).is_err() {
                            tracing::error!(
                                %run_id,
                                "worker result channel closed — dispatch loop may have crashed"
                            );
                        }
                    });

                    dispatched += 1;
                }
                Err(e) => {
                    // Undo the key acquisition so the key is not leaked.
                    if let Some(key) = acquired_key {
                        let _ = self.key_gate.release(key, run.id());
                    }
                    tracing::error!(
                        run_id = %run.id(),
                        error = %e,
                        "dispatch failed for run, skipping"
                    );
                    continue;
                }
            }
        }

        result.dispatched = dispatched;

        let events_this_tick =
            self.authority.projection().latest_sequence().saturating_sub(seq_before_tick);
        self.events_since_last_snapshot += events_this_tick;
        self.maybe_write_snapshot(current_time)?;

        Ok(result)
    }
1743
    /// Move a Ready run into execution by appending the four durable events
    /// that make the handoff recoverable, in order:
    /// Ready -> Leased, lease acquire, Leased -> Running, attempt start.
    ///
    /// Returns `(attempt_id, lease_expiry, attempt_number, max_attempts)`
    /// for the caller to record in the in-flight table.
    fn dispatch_single_run(
        &mut self,
        run: &RunInstance,
        constraints: &actionqueue_core::task::constraints::TaskConstraints,
        current_time: u64,
    ) -> Result<(AttemptId, u64, u32, u32), DispatchError> {
        // 1) Ready -> Leased.
        let seq = self.next_sequence()?;
        let _ = self
            .authority
            .submit_command(
                MutationCommand::RunStateTransition(RunStateTransitionCommand::new(
                    seq,
                    run.id(),
                    RunState::Ready,
                    RunState::Leased,
                    current_time,
                )),
                DurabilityPolicy::Immediate,
            )
            .map_err(DispatchError::Authority)?;

        // 2) Acquire the lease under this dispatcher's identity.
        let lease_expiry = current_time.saturating_add(self.lease_timeout_secs);
        let seq = self.next_sequence()?;
        let _ = self
            .authority
            .submit_command(
                MutationCommand::LeaseAcquire(LeaseAcquireCommand::new(
                    seq,
                    run.id(),
                    self.identity.identity(),
                    lease_expiry,
                    current_time,
                )),
                DurabilityPolicy::Immediate,
            )
            .map_err(DispatchError::Authority)?;

        // 3) Leased -> Running.
        let seq = self.next_sequence()?;
        let _ = self
            .authority
            .submit_command(
                MutationCommand::RunStateTransition(RunStateTransitionCommand::new(
                    seq,
                    run.id(),
                    RunState::Leased,
                    RunState::Running,
                    current_time,
                )),
                DurabilityPolicy::Immediate,
            )
            .map_err(DispatchError::Authority)?;

        // 4) Record the attempt start under a fresh attempt id.
        let attempt_id = AttemptId::new();
        let seq = self.next_sequence()?;
        let _ = self
            .authority
            .submit_command(
                MutationCommand::AttemptStart(AttemptStartCommand::new(
                    seq,
                    run.id(),
                    attempt_id,
                    current_time,
                )),
                DurabilityPolicy::Immediate,
            )
            .map_err(DispatchError::Authority)?;

        let max_attempts = constraints.max_attempts();
        // Attempt numbers are 1-based; overflow is treated as a hard error.
        let attempt_number =
            run.attempt_count().checked_add(1).ok_or(DispatchError::SequenceOverflow)?;

        Ok((attempt_id, lease_expiry, attempt_number, max_attempts))
    }
1825
1826 fn maybe_write_snapshot(&mut self, current_time: u64) -> Result<(), DispatchError> {
1828 let threshold = match self.snapshot_event_threshold {
1829 Some(t) => t,
1830 None => return Ok(()),
1831 };
1832 let path = match &self.snapshot_path {
1833 Some(p) => p.clone(),
1834 None => return Ok(()),
1835 };
1836 if self.events_since_last_snapshot < threshold {
1837 return Ok(());
1838 }
1839
1840 let snapshot = build_snapshot_from_projection(self.authority.projection(), current_time)
1841 .map_err(DispatchError::SnapshotBuild)?;
1842
1843 let mut writer =
1844 SnapshotFsWriter::new(path).map_err(|e| DispatchError::SnapshotInit(format!("{e}")))?;
1845 writer.write(&snapshot).map_err(DispatchError::SnapshotWrite)?;
1846 writer.close().map_err(DispatchError::SnapshotWrite)?;
1847
1848 self.events_since_last_snapshot = 0;
1849 tracing::info!(
1850 wal_sequence = snapshot.metadata.wal_sequence,
1851 task_count = snapshot.metadata.task_count,
1852 run_count = snapshot.metadata.run_count,
1853 "automatic snapshot written"
1854 );
1855
1856 Ok(())
1857 }
1858
1859 fn cancel_dependency_failed_runs(&mut self, current_time: u64) -> Result<(), DispatchError> {
1864 let failed_tasks: Vec<TaskId> = self
1865 .authority
1866 .projection()
1867 .task_records()
1868 .map(|tr| tr.task_spec().id())
1869 .filter(|&tid| self.dependency_gate.is_dependency_failed(tid))
1870 .collect();
1871 for task_id in failed_tasks {
1872 let runs_to_cancel: Vec<_> = self
1873 .authority
1874 .projection()
1875 .runs_for_task(task_id)
1876 .filter(|r| !r.state().is_terminal())
1877 .map(|r| (r.id(), r.state()))
1878 .collect();
1879 for (run_id, current_state) in runs_to_cancel {
1880 let seq = self.next_sequence()?;
1881 let _ = self
1882 .authority
1883 .submit_command(
1884 MutationCommand::RunStateTransition(RunStateTransitionCommand::new(
1885 seq,
1886 run_id,
1887 current_state,
1888 RunState::Canceled,
1889 current_time,
1890 )),
1891 DurabilityPolicy::Immediate,
1892 )
1893 .map_err(DispatchError::Authority)?;
1894 }
1895 }
1896 Ok(())
1897 }
1898
    /// Tick repeatedly until the system is fully idle: nothing promoted,
    /// nothing dispatched, and no runs in flight.
    ///
    /// When a tick makes no progress but work is still in flight, this
    /// blocks on the worker result channel instead of busy-spinning; the
    /// received result is stashed in `pending_result` for the next tick's
    /// drain phase.
    pub async fn run_until_idle(&mut self) -> Result<RunSummary, DispatchError> {
        let mut summary = RunSummary::default();

        // Proactively cancel runs whose dependencies already failed.
        let current_time = self.clock.now();
        self.cancel_dependency_failed_runs(current_time)?;

        loop {
            let tick = self.tick().await?;
            summary.ticks += 1;
            summary.total_dispatched += tick.dispatched;
            summary.total_completed += tick.completed;

            // Idle: no promotions, no dispatches, nothing in flight.
            if tick.promoted_scheduled == 0
                && tick.promoted_retry_wait == 0
                && tick.dispatched == 0
                && self.in_flight.is_empty()
            {
                break;
            }

            // No progress but work in flight: block on the result channel
            // rather than spinning.
            if tick.promoted_scheduled == 0
                && tick.promoted_retry_wait == 0
                && tick.dispatched == 0
                && !self.in_flight.is_empty()
            {
                if let Some(worker_result) = self.result_rx.recv().await {
                    self.pending_result = Some(worker_result);
                } else {
                    // Channel closed with runs outstanding — abandon them.
                    tracing::warn!(
                        orphaned_runs = self.in_flight.len(),
                        "worker result channel closed with in-flight runs"
                    );
                    self.in_flight.clear();
                    break;
                }
            } else if tick.dispatched > 0 {
                // Give freshly spawned workers a chance to make progress.
                tokio::task::yield_now().await;
            }
        }

        Ok(summary)
    }
1952
    /// Put the dispatcher into drain mode: in-flight runs may finish, but
    /// subsequent ticks will not dispatch any new runs.
    pub fn start_drain(&mut self) {
        self.draining = true;
    }
1958
    /// Enter drain mode, then tick until every in-flight run finishes or
    /// `timeout` elapses.
    ///
    /// Ticks that complete nothing block on the worker result channel with
    /// the remaining time budget; an expired deadline or a closed channel
    /// ends the drain (on close the remaining in-flight entries are
    /// dropped).
    pub async fn drain_until_idle(
        &mut self,
        timeout: std::time::Duration,
    ) -> Result<RunSummary, DispatchError> {
        self.start_drain();
        let deadline = tokio::time::Instant::now() + timeout;
        let mut summary = RunSummary::default();

        loop {
            if self.in_flight.is_empty() {
                break;
            }

            if tokio::time::Instant::now() >= deadline {
                tracing::warn!(
                    remaining_in_flight = self.in_flight.len(),
                    "drain timeout expired with in-flight runs"
                );
                break;
            }

            let tick = self.tick().await?;
            summary.ticks += 1;
            summary.total_completed += tick.completed;

            // No completions this tick: wait (bounded) for a worker result.
            if tick.completed == 0 && !self.in_flight.is_empty() {
                let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
                match tokio::time::timeout(remaining, self.result_rx.recv()).await {
                    Ok(Some(result)) => {
                        // Stash for the next tick's drain phase.
                        self.pending_result = Some(result);
                    }
                    Ok(None) => {
                        // Result channel closed — nothing more can finish.
                        self.in_flight.clear();
                        break;
                    }
                    Err(_) => {
                        tracing::warn!(
                            remaining_in_flight = self.in_flight.len(),
                            "drain timeout expired waiting for worker results"
                        );
                        break;
                    }
                }
            }
        }

        Ok(summary)
    }
2009
2010 pub fn submit_task(&mut self, spec: TaskSpec) -> Result<(), DispatchError> {
2012 let current_time = self.clock.now();
2013 let seq = self.next_sequence()?;
2014
2015 let _ = self
2017 .authority
2018 .submit_command(
2019 MutationCommand::TaskCreate(TaskCreateCommand::new(
2020 seq,
2021 spec.clone(),
2022 current_time,
2023 )),
2024 DurabilityPolicy::Immediate,
2025 )
2026 .map_err(DispatchError::Authority)?;
2027
2028 let already_derived = self.authority.projection().run_ids_for_task(spec.id()).len() as u32;
2030 let derivation = actionqueue_engine::derive::derive_runs(
2031 &self.clock,
2032 spec.id(),
2033 spec.run_policy(),
2034 already_derived,
2035 current_time,
2036 )
2037 .map_err(DispatchError::Derivation)?;
2038
2039 for run in derivation.into_derived() {
2040 let seq = self.next_sequence()?;
2041 let _ = self
2042 .authority
2043 .submit_command(
2044 MutationCommand::RunCreate(RunCreateCommand::new(seq, run)),
2045 DurabilityPolicy::Immediate,
2046 )
2047 .map_err(DispatchError::Authority)?;
2048 }
2049
2050 Ok(())
2051 }
2052
    /// Durably declare that `task_id` depends on `prereqs`.
    ///
    /// Validates first (cycle check, prerequisite existence), then appends
    /// the durable command, mirrors the declaration into the in-memory
    /// dependency gate, and back-fills satisfaction for prerequisites that
    /// already reached a terminal state.
    pub fn declare_dependency(
        &mut self,
        task_id: TaskId,
        prereqs: Vec<TaskId>,
    ) -> Result<(), DispatchError> {
        // Reject declarations that would create a dependency cycle.
        self.dependency_gate
            .check_cycle(task_id, &prereqs)
            .map_err(DispatchError::DependencyCycle)?;

        // Every prerequisite must already exist in the projection.
        for prereq_id in &prereqs {
            if self.authority.projection().get_task(prereq_id).is_none() {
                return Err(DispatchError::SubmissionRejected {
                    task_id,
                    context: format!("prerequisite {prereq_id} not in projection"),
                });
            }
        }

        let seq = self.next_sequence()?;
        let current_time = self.clock.now();
        let _ = self
            .authority
            .submit_command(
                MutationCommand::DependencyDeclare(DependencyDeclareCommand::new(
                    seq,
                    task_id,
                    prereqs.clone(),
                    current_time,
                )),
                DurabilityPolicy::Immediate,
            )
            .map_err(DispatchError::Authority)?;

        let _ = self.dependency_gate.declare(task_id, prereqs.clone());

        // Back-fill: a prerequisite that already finished must be marked
        // satisfied (all runs terminal, some Completed) or failed (all
        // terminal, none Completed) so this task is not gated forever.
        for prereq_id in &prereqs {
            let runs: Vec<_> = self.authority.projection().runs_for_task(*prereq_id).collect();
            if !runs.is_empty() {
                let all_terminal = runs.iter().all(|r| r.state().is_terminal());
                let has_completed = runs.iter().any(|r| r.state() == RunState::Completed);
                if all_terminal && has_completed {
                    self.dependency_gate.force_satisfy(*prereq_id);
                } else if all_terminal && !has_completed {
                    self.dependency_gate.force_fail(*prereq_id);
                }
            }
        }
        self.dependency_gate.recompute_satisfaction_pub(task_id);

        Ok(())
    }
2114
2115 #[cfg(feature = "budget")]
2120 pub fn allocate_budget(
2121 &mut self,
2122 task_id: TaskId,
2123 dimension: actionqueue_core::budget::BudgetDimension,
2124 limit: u64,
2125 ) -> Result<(), DispatchError> {
2126 use actionqueue_core::mutation::{BudgetAllocateCommand, MutationCommand as MC};
2127 let current_time = self.clock.now();
2128 let seq = self.next_sequence()?;
2129 let _ = self
2130 .authority
2131 .submit_command(
2132 MC::BudgetAllocate(BudgetAllocateCommand::new(
2133 seq,
2134 task_id,
2135 dimension,
2136 limit,
2137 current_time,
2138 )),
2139 DurabilityPolicy::Immediate,
2140 )
2141 .map_err(DispatchError::Authority)?;
2142 self.budget_tracker.allocate(task_id, dimension, limit);
2143 Ok(())
2144 }
2145
2146 #[cfg(feature = "budget")]
2151 pub fn replenish_budget(
2152 &mut self,
2153 task_id: TaskId,
2154 dimension: actionqueue_core::budget::BudgetDimension,
2155 new_limit: u64,
2156 ) -> Result<(), DispatchError> {
2157 use actionqueue_core::mutation::{BudgetReplenishCommand, MutationCommand as MC};
2158 let current_time = self.clock.now();
2159 let seq = self.next_sequence()?;
2160 let _ = self
2161 .authority
2162 .submit_command(
2163 MC::BudgetReplenish(BudgetReplenishCommand::new(
2164 seq,
2165 task_id,
2166 dimension,
2167 new_limit,
2168 current_time,
2169 )),
2170 DurabilityPolicy::Immediate,
2171 )
2172 .map_err(DispatchError::Authority)?;
2173 self.budget_tracker.replenish(task_id, dimension, new_limit);
2174 Ok(())
2175 }
2176
2177 #[cfg(feature = "budget")]
2182 pub fn resume_run(&mut self, run_id: RunId) -> Result<(), DispatchError> {
2183 use actionqueue_core::mutation::{MutationCommand as MC, RunResumeCommand};
2184 let current_time = self.clock.now();
2185 let seq = self.next_sequence()?;
2186 let _ = self
2187 .authority
2188 .submit_command(
2189 MC::RunResume(RunResumeCommand::new(seq, run_id, current_time)),
2190 DurabilityPolicy::Immediate,
2191 )
2192 .map_err(DispatchError::Authority)?;
2193 Ok(())
2194 }
2195
2196 #[cfg(feature = "budget")]
2202 fn signal_budget_exhaustion_cancellations(&self) {
2203 for inf in self.in_flight.values() {
2204 if self.budget_tracker.is_any_exhausted(inf.task_id) {
2205 if let Some(ref ctx) = inf.cancellation_context {
2206 ctx.cancel();
2207 tracing::debug!(
2208 run_id = %inf.run_id,
2209 task_id = %inf.task_id,
2210 "budget exhausted: signaling cancellation to handler"
2211 );
2212 }
2213 }
2214 }
2215 }
2216
2217 #[cfg(feature = "budget")]
2222 fn promote_subscription_triggered_scheduled(
2223 &mut self,
2224 current_time: u64,
2225 ) -> Result<usize, DispatchError> {
2226 let triggered: Vec<RunInstance> = self
2227 .authority
2228 .projection()
2229 .run_instances()
2230 .filter(|r| r.state() == RunState::Scheduled)
2231 .filter(|r| self.subscription_registry.is_triggered(r.task_id()))
2232 .filter(|r| self.dependency_gate.is_eligible(r.task_id()))
2233 .cloned()
2234 .collect();
2235
2236 let count = triggered.len();
2237 for run in &triggered {
2238 let seq = self.next_sequence()?;
2239 let _ = self
2240 .authority
2241 .submit_command(
2242 MutationCommand::RunStateTransition(RunStateTransitionCommand::new(
2243 seq,
2244 run.id(),
2245 RunState::Scheduled,
2246 RunState::Ready,
2247 current_time,
2248 )),
2249 DurabilityPolicy::Immediate,
2250 )
2251 .map_err(DispatchError::Authority)?;
2252 self.subscription_registry.clear_triggered(run.task_id());
2253 }
2254 Ok(count)
2255 }
2256
2257 #[cfg(feature = "budget")]
2262 pub fn create_subscription(
2263 &mut self,
2264 task_id: TaskId,
2265 filter: actionqueue_core::subscription::EventFilter,
2266 ) -> Result<actionqueue_core::subscription::SubscriptionId, DispatchError> {
2267 use actionqueue_core::mutation::SubscriptionCreateCommand;
2268 let sub_id = actionqueue_core::subscription::SubscriptionId::new();
2269 let current_time = self.clock.now();
2270 let seq = self.next_sequence()?;
2271 let _ = self
2272 .authority
2273 .submit_command(
2274 MutationCommand::SubscriptionCreate(SubscriptionCreateCommand::new(
2275 seq,
2276 sub_id,
2277 task_id,
2278 filter.clone(),
2279 current_time,
2280 )),
2281 DurabilityPolicy::Immediate,
2282 )
2283 .map_err(DispatchError::Authority)?;
2284 self.subscription_registry.register(sub_id, task_id, filter);
2285 Ok(sub_id)
2286 }
2287
2288 #[cfg(feature = "budget")]
2294 pub fn fire_custom_event(&mut self, key: String) -> Result<(), DispatchError> {
2295 let event = actionqueue_budget::ActionQueueEvent::CustomEvent { key };
2296 let matched = actionqueue_budget::check_event(&event, &self.subscription_registry);
2297 for sub_id in matched {
2298 self.trigger_subscription_durable(sub_id)?;
2299 }
2300 Ok(())
2301 }
2302
2303 #[cfg(feature = "budget")]
2309 fn fire_events_for_transition(
2310 &mut self,
2311 task_id: TaskId,
2312 new_state: RunState,
2313 ) -> Result<(), DispatchError> {
2314 use actionqueue_budget::{check_event, ActionQueueEvent};
2315
2316 let event = ActionQueueEvent::RunChangedState { task_id, new_state };
2318 let matched = check_event(&event, &self.subscription_registry);
2319 for sub_id in matched {
2320 self.trigger_subscription_durable(sub_id)?;
2321 }
2322
2323 if new_state.is_terminal() {
2325 self.fire_task_terminal_success_event(task_id)?;
2326 }
2327 Ok(())
2328 }
2329
2330 #[cfg(feature = "budget")]
2333 fn fire_task_terminal_success_event(&mut self, task_id: TaskId) -> Result<(), DispatchError> {
2334 use actionqueue_budget::{check_event, ActionQueueEvent};
2335
2336 let all_terminal =
2337 self.authority.projection().runs_for_task(task_id).all(|r| r.state().is_terminal());
2338 if !all_terminal {
2339 return Ok(());
2340 }
2341 let any_completed = self
2342 .authority
2343 .projection()
2344 .runs_for_task(task_id)
2345 .any(|r| r.state() == RunState::Completed);
2346 if !any_completed {
2347 return Ok(());
2348 }
2349 let event = ActionQueueEvent::TaskReachedTerminalSuccess { task_id };
2350 let matched = check_event(&event, &self.subscription_registry);
2351 for sub_id in matched {
2352 self.trigger_subscription_durable(sub_id)?;
2353 }
2354 Ok(())
2355 }
2356
2357 #[cfg(feature = "budget")]
2362 fn fire_budget_threshold_events(&mut self, task_id: TaskId) -> Result<(), DispatchError> {
2363 use actionqueue_budget::{check_event, ActionQueueEvent};
2364 use actionqueue_core::budget::BudgetDimension;
2365
2366 for &dim in &[BudgetDimension::Token, BudgetDimension::CostCents, BudgetDimension::TimeSecs]
2367 {
2368 if let Some(pct) = self.budget_tracker.threshold_pct(task_id, dim) {
2369 let event =
2370 ActionQueueEvent::BudgetThresholdCrossed { task_id, dimension: dim, pct };
2371 let matched = check_event(&event, &self.subscription_registry);
2372 for sub_id in matched {
2373 self.trigger_subscription_durable(sub_id)?;
2374 }
2375 }
2376 }
2377 Ok(())
2378 }
2379
2380 #[cfg(feature = "budget")]
2384 fn trigger_subscription_durable(
2385 &mut self,
2386 subscription_id: actionqueue_core::subscription::SubscriptionId,
2387 ) -> Result<(), DispatchError> {
2388 use actionqueue_core::mutation::SubscriptionTriggerCommand;
2389 let current_time = self.clock.now();
2390 let seq = self.next_sequence()?;
2391 let _ = self
2392 .authority
2393 .submit_command(
2394 MutationCommand::SubscriptionTrigger(SubscriptionTriggerCommand::new(
2395 seq,
2396 subscription_id,
2397 current_time,
2398 )),
2399 DurabilityPolicy::Immediate,
2400 )
2401 .map_err(DispatchError::Authority)?;
2402 self.subscription_registry.trigger(subscription_id);
2403 Ok(())
2404 }
2405
2406 #[cfg(feature = "actor")]
2410 pub fn register_actor(
2411 &mut self,
2412 registration: actionqueue_core::actor::ActorRegistration,
2413 ) -> Result<(), DispatchError> {
2414 use actionqueue_core::mutation::{ActorRegisterCommand, MutationCommand};
2415
2416 let actor_id = registration.actor_id();
2417 let policy = actionqueue_core::actor::HeartbeatPolicy::with_default_multiplier(
2418 registration.heartbeat_interval_secs(),
2419 );
2420 let seq = self.next_sequence()?;
2421 let ts = self.clock.now();
2422 let _ = self
2423 .authority
2424 .submit_command(
2425 MutationCommand::ActorRegister(ActorRegisterCommand::new(
2426 seq,
2427 registration.clone(),
2428 ts,
2429 )),
2430 DurabilityPolicy::Immediate,
2431 )
2432 .map_err(DispatchError::Authority)?;
2433
2434 self.actor_registry.register(registration);
2435 self.heartbeat_monitor.record_registration(actor_id, policy, ts);
2436 Ok(())
2437 }
2438
2439 #[cfg(feature = "actor")]
2441 pub fn deregister_actor(
2442 &mut self,
2443 actor_id: actionqueue_core::ids::ActorId,
2444 ) -> Result<(), DispatchError> {
2445 use actionqueue_core::mutation::{ActorDeregisterCommand, MutationCommand};
2446
2447 let seq = self.next_sequence()?;
2448 let ts = self.clock.now();
2449 let _ = self
2450 .authority
2451 .submit_command(
2452 MutationCommand::ActorDeregister(ActorDeregisterCommand::new(seq, actor_id, ts)),
2453 DurabilityPolicy::Immediate,
2454 )
2455 .map_err(DispatchError::Authority)?;
2456
2457 self.actor_registry.deregister(actor_id);
2458 self.heartbeat_monitor.remove(actor_id);
2459 self.department_registry.remove(actor_id);
2460 Ok(())
2461 }
2462
2463 #[cfg(feature = "actor")]
2465 pub fn actor_heartbeat(
2466 &mut self,
2467 actor_id: actionqueue_core::ids::ActorId,
2468 ) -> Result<(), DispatchError> {
2469 use actionqueue_core::mutation::{ActorHeartbeatCommand, MutationCommand};
2470
2471 let seq = self.next_sequence()?;
2472 let ts = self.clock.now();
2473 let _ = self
2474 .authority
2475 .submit_command(
2476 MutationCommand::ActorHeartbeat(ActorHeartbeatCommand::new(seq, actor_id, ts)),
2477 DurabilityPolicy::Immediate,
2478 )
2479 .map_err(DispatchError::Authority)?;
2480
2481 self.heartbeat_monitor.record_heartbeat(actor_id, ts);
2482 Ok(())
2483 }
2484
2485 #[cfg(feature = "actor")]
2490 fn check_actor_heartbeat_timeouts(&mut self) -> Result<(), DispatchError> {
2491 let now = self.clock.now();
2492 let timed_out = self.heartbeat_monitor.check_timeouts(now);
2493 for actor_id in timed_out {
2494 tracing::warn!(%actor_id, "actor heartbeat timeout — deregistering");
2495 self.deregister_actor(actor_id)?;
2496 }
2497 Ok(())
2498 }
2499
2500 #[cfg(feature = "platform")]
2504 pub fn create_tenant(
2505 &mut self,
2506 registration: actionqueue_core::platform::TenantRegistration,
2507 ) -> Result<(), DispatchError> {
2508 use actionqueue_core::mutation::{MutationCommand, TenantCreateCommand};
2509
2510 let seq = self.next_sequence()?;
2511 let ts = self.clock.now();
2512 let _ = self
2513 .authority
2514 .submit_command(
2515 MutationCommand::TenantCreate(TenantCreateCommand::new(
2516 seq,
2517 registration.clone(),
2518 ts,
2519 )),
2520 DurabilityPolicy::Immediate,
2521 )
2522 .map_err(DispatchError::Authority)?;
2523
2524 self.tenant_registry.register(registration);
2525 Ok(())
2526 }
2527
2528 #[cfg(feature = "platform")]
2530 pub fn assign_role(
2531 &mut self,
2532 actor_id: actionqueue_core::ids::ActorId,
2533 role: actionqueue_core::platform::Role,
2534 tenant_id: actionqueue_core::ids::TenantId,
2535 ) -> Result<(), DispatchError> {
2536 use actionqueue_core::mutation::{MutationCommand, RoleAssignCommand};
2537
2538 let seq = self.next_sequence()?;
2539 let ts = self.clock.now();
2540 let _ = self
2541 .authority
2542 .submit_command(
2543 MutationCommand::RoleAssign(RoleAssignCommand::new(
2544 seq,
2545 actor_id,
2546 role.clone(),
2547 tenant_id,
2548 ts,
2549 )),
2550 DurabilityPolicy::Immediate,
2551 )
2552 .map_err(DispatchError::Authority)?;
2553
2554 self.rbac_enforcer.assign_role(actor_id, role, tenant_id);
2555 Ok(())
2556 }
2557
2558 #[cfg(feature = "platform")]
2560 pub fn grant_capability(
2561 &mut self,
2562 actor_id: actionqueue_core::ids::ActorId,
2563 capability: actionqueue_core::platform::Capability,
2564 tenant_id: actionqueue_core::ids::TenantId,
2565 ) -> Result<(), DispatchError> {
2566 use actionqueue_core::mutation::{CapabilityGrantCommand, MutationCommand};
2567
2568 let seq = self.next_sequence()?;
2569 let ts = self.clock.now();
2570 let _ = self
2571 .authority
2572 .submit_command(
2573 MutationCommand::CapabilityGrant(CapabilityGrantCommand::new(
2574 seq,
2575 actor_id,
2576 capability.clone(),
2577 tenant_id,
2578 ts,
2579 )),
2580 DurabilityPolicy::Immediate,
2581 )
2582 .map_err(DispatchError::Authority)?;
2583
2584 self.rbac_enforcer.grant_capability(actor_id, capability, tenant_id);
2585 Ok(())
2586 }
2587
2588 #[cfg(feature = "platform")]
2590 pub fn revoke_capability(
2591 &mut self,
2592 actor_id: actionqueue_core::ids::ActorId,
2593 capability: actionqueue_core::platform::Capability,
2594 tenant_id: actionqueue_core::ids::TenantId,
2595 ) -> Result<(), DispatchError> {
2596 use actionqueue_core::mutation::{CapabilityRevokeCommand, MutationCommand};
2597
2598 let seq = self.next_sequence()?;
2599 let ts = self.clock.now();
2600 let _ = self
2601 .authority
2602 .submit_command(
2603 MutationCommand::CapabilityRevoke(CapabilityRevokeCommand::new(
2604 seq,
2605 actor_id,
2606 capability.clone(),
2607 tenant_id,
2608 ts,
2609 )),
2610 DurabilityPolicy::Immediate,
2611 )
2612 .map_err(DispatchError::Authority)?;
2613
2614 self.rbac_enforcer.revoke_capability(actor_id, &capability, tenant_id);
2615 Ok(())
2616 }
2617
2618 #[cfg(feature = "platform")]
2620 pub fn append_ledger_entry(
2621 &mut self,
2622 entry: actionqueue_core::platform::LedgerEntry,
2623 ) -> Result<(), DispatchError> {
2624 use actionqueue_core::mutation::{LedgerAppendCommand, MutationCommand};
2625
2626 let seq = self.next_sequence()?;
2627 let ts = self.clock.now();
2628 let _ = self
2629 .authority
2630 .submit_command(
2631 MutationCommand::LedgerAppend(LedgerAppendCommand::new(seq, entry.clone(), ts)),
2632 DurabilityPolicy::Immediate,
2633 )
2634 .map_err(DispatchError::Authority)?;
2635
2636 self.ledger.append(entry);
2637 Ok(())
2638 }
2639
2640 #[cfg(feature = "platform")]
2642 pub fn ledger(&self) -> &actionqueue_platform::AppendLedger {
2643 &self.ledger
2644 }
2645
2646 #[cfg(feature = "platform")]
2648 pub fn rbac(&self) -> &actionqueue_platform::RbacEnforcer {
2649 &self.rbac_enforcer
2650 }
2651
2652 #[cfg(feature = "actor")]
2654 pub fn actor_registry(&self) -> &actionqueue_actor::ActorRegistry {
2655 &self.actor_registry
2656 }
2657
2658 #[cfg(feature = "platform")]
2660 pub fn tenant_registry(&self) -> &actionqueue_platform::TenantRegistry {
2661 &self.tenant_registry
2662 }
2663
    /// Consumes `self` and returns the underlying storage mutation authority.
    pub fn into_authority(self) -> StorageMutationAuthority<W, ReplayReducer> {
        self.authority
    }
2668}