1use std::collections::{HashMap, HashSet};
14
15use actionqueue_core::budget::BudgetDimension;
16use actionqueue_core::ids::{ActorId, LedgerEntryId, RunId, TaskId, TenantId};
17use actionqueue_core::mutation::{AttemptOutcome, AttemptResultKind};
18use actionqueue_core::platform::{Capability, Role};
19use actionqueue_core::run::state::RunState;
20use actionqueue_core::run::transitions::is_valid_transition;
21use actionqueue_core::run::RunInstanceError;
22use actionqueue_core::subscription::{EventFilter, SubscriptionId};
23
24use crate::wal::event::{WalEvent, WalEventType};
25
/// Replay projection of a single task: its specification plus lifecycle timestamps.
#[derive(Debug, Clone)]
pub struct TaskRecord {
    /// Task specification captured from the `TaskCreated` WAL event.
    task_spec: actionqueue_core::task::task_spec::TaskSpec,
    /// Timestamp carried by the `TaskCreated` event.
    created_at: u64,
    /// Last-update timestamp. Initialized to `None` on creation; no writer is
    /// visible in this module chunk (NOTE(review): confirm where it is set).
    updated_at: Option<u64>,
    /// Timestamp of the `TaskCanceled` event, if the task has been canceled.
    canceled_at: Option<u64>,
}
38
/// One recorded state transition of a run.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RunStateHistoryEntry {
    /// State before the transition; `None` for the initial entry written at run creation.
    pub(crate) from: Option<RunState>,
    /// State after the transition.
    pub(crate) to: RunState,
    /// Timestamp of the transition as carried by the originating event.
    pub(crate) timestamp: u64,
}
49
50impl RunStateHistoryEntry {
51 pub fn from(&self) -> Option<RunState> {
53 self.from
54 }
55
56 pub fn to(&self) -> RunState {
58 self.to
59 }
60
61 pub fn timestamp(&self) -> u64 {
63 self.timestamp
64 }
65}
66
/// Record of one execution attempt of a run.
///
/// Created by `AttemptStarted` with only `attempt_id` and `started_at` set.
/// The remaining fields are filled in by `AttemptFinished`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AttemptHistoryEntry {
    /// Identifier of the attempt.
    pub(crate) attempt_id: actionqueue_core::ids::AttemptId,
    /// Timestamp of the `AttemptStarted` event.
    pub(crate) started_at: u64,
    /// Timestamp of the `AttemptFinished` event; `None` while the attempt is open.
    pub(crate) finished_at: Option<u64>,
    /// Outcome kind; `None` while the attempt is open.
    pub(crate) result: Option<AttemptResultKind>,
    /// Error detail from the attempt outcome, if any.
    pub(crate) error: Option<String>,
    /// Raw output bytes from the attempt outcome, if any.
    pub(crate) output: Option<Vec<u8>>,
}
83
84impl AttemptHistoryEntry {
85 pub fn attempt_id(&self) -> actionqueue_core::ids::AttemptId {
87 self.attempt_id
88 }
89
90 pub fn started_at(&self) -> u64 {
92 self.started_at
93 }
94
95 pub fn finished_at(&self) -> Option<u64> {
97 self.finished_at
98 }
99
100 pub fn result(&self) -> Option<AttemptResultKind> {
102 self.result
103 }
104
105 pub fn success(&self) -> Option<bool> {
109 self.result.map(|result| matches!(result, AttemptResultKind::Success))
110 }
111
112 pub fn error(&self) -> Option<&str> {
114 self.error.as_deref()
115 }
116
117 pub fn output(&self) -> Option<&[u8]> {
119 self.output.as_deref()
120 }
121}
122
/// Detailed bookkeeping for the active lease on a run.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LeaseMetadata {
    /// Identity of the lease holder.
    pub(crate) owner: String,
    /// Current lease expiry; heartbeats may only move it forward or keep it equal.
    pub(crate) expiry: u64,
    /// Timestamp of the `LeaseAcquired` event.
    pub(crate) acquired_at: u64,
    /// Timestamp of the most recent acquire or heartbeat event.
    pub(crate) updated_at: u64,
}
135
136impl LeaseMetadata {
137 pub fn owner(&self) -> &str {
139 &self.owner
140 }
141
142 pub fn expiry(&self) -> u64 {
144 self.expiry
145 }
146
147 pub fn acquired_at(&self) -> u64 {
149 self.acquired_at
150 }
151
152 pub fn updated_at(&self) -> u64 {
154 self.updated_at
155 }
156}
157
/// Replay projection of a registered actor, built from `ActorRegistered` and
/// updated by `ActorHeartbeat` / `ActorDeregistered`.
#[derive(Debug, Clone)]
pub struct ActorRecord {
    pub actor_id: ActorId,
    pub identity: String,
    /// Capability names declared at registration time.
    pub capabilities: Vec<String>,
    pub department: Option<String>,
    pub heartbeat_interval_secs: u64,
    pub tenant_id: Option<TenantId>,
    /// Timestamp of the `ActorRegistered` event.
    pub registered_at: u64,
    /// Timestamp of the most recent `ActorHeartbeat`, if any.
    pub last_heartbeat_at: Option<u64>,
    /// Timestamp of the `ActorDeregistered` event, if any.
    pub deregistered_at: Option<u64>,
}
171
/// Replay projection of a tenant created by `TenantCreated`.
#[derive(Debug, Clone)]
pub struct TenantRecord {
    pub tenant_id: TenantId,
    pub name: String,
    /// Timestamp of the `TenantCreated` event.
    pub created_at: u64,
}
179
/// Role held by an actor within a tenant.
///
/// At most one record exists per (actor, tenant); a later `RoleAssigned` replaces it.
#[derive(Debug, Clone)]
pub struct RoleAssignmentRecord {
    pub actor_id: ActorId,
    pub role: Role,
    pub tenant_id: TenantId,
    /// Timestamp of the `RoleAssigned` event that produced this record.
    pub assigned_at: u64,
}
188
/// Capability granted to an actor within a tenant.
#[derive(Debug, Clone)]
pub struct CapabilityGrantRecord {
    pub actor_id: ActorId,
    pub capability: Capability,
    pub tenant_id: TenantId,
    /// Timestamp of the `CapabilityGranted` event.
    pub granted_at: u64,
    /// Timestamp of the `CapabilityRevoked` event; `None` while the grant is in force.
    pub revoked_at: Option<u64>,
}
198
/// One append-only ledger entry from `LedgerEntryAppended`.
#[derive(Debug, Clone)]
pub struct LedgerEntryRecord {
    pub entry_id: LedgerEntryId,
    pub tenant_id: TenantId,
    pub ledger_key: String,
    /// Actor that appended the entry, if recorded.
    pub actor_id: Option<ActorId>,
    /// Opaque entry payload bytes.
    pub payload: Vec<u8>,
    pub timestamp: u64,
}
209
/// Budget state for one (task, dimension) pair.
#[derive(Debug, Clone)]
pub struct BudgetRecord {
    /// Dimension this budget measures.
    pub dimension: BudgetDimension,
    /// Budget limit (presumably set by allocation/replenishment helpers outside this chunk — verify).
    pub limit: u64,
    /// Amount consumed so far (maintained by a consumption helper outside this chunk — verify).
    pub consumed: u64,
    /// Timestamp of the allocation event.
    pub allocated_at: u64,
    /// Set to `true` when a `BudgetExhausted` event is replayed for this pair.
    pub exhausted: bool,
}
224
/// Replay projection of an event subscription owned by a task.
#[derive(Debug, Clone)]
pub struct SubscriptionRecord {
    pub subscription_id: SubscriptionId,
    /// Task that owns the subscription.
    pub task_id: actionqueue_core::ids::TaskId,
    /// Filter selecting which events trigger the subscription.
    pub filter: EventFilter,
    /// Timestamp of the `SubscriptionCreated` event.
    pub created_at: u64,
    /// Timestamp of the `SubscriptionTriggered` event, if any (helper outside this chunk — verify).
    pub triggered_at: Option<u64>,
    /// Timestamp of the `SubscriptionCanceled` event, if any (helper outside this chunk — verify).
    pub canceled_at: Option<u64>,
}
241
242impl TaskRecord {
243 pub fn task_spec(&self) -> &actionqueue_core::task::task_spec::TaskSpec {
245 &self.task_spec
246 }
247
248 pub fn created_at(&self) -> u64 {
250 self.created_at
251 }
252
253 pub fn updated_at(&self) -> Option<u64> {
255 self.updated_at
256 }
257
258 pub fn canceled_at(&self) -> Option<u64> {
260 self.canceled_at
261 }
262}
263
/// In-memory projection rebuilt by folding WAL events, one `apply` call per event.
#[derive(Debug, Clone)]
pub struct ReplayReducer {
    /// Current state of each known run (kept in step with `run_instances`).
    runs: HashMap<actionqueue_core::ids::RunId, RunState>,
    /// Task records keyed by task id.
    tasks: HashMap<actionqueue_core::ids::TaskId, TaskRecord>,
    /// Full run instances keyed by run id.
    run_instances:
        HashMap<actionqueue_core::ids::RunId, actionqueue_core::run::run_instance::RunInstance>,
    /// Run ids per task, in the order their `RunCreated` events were applied.
    runs_by_task: HashMap<TaskId, Vec<RunId>>,
    /// State-transition history per run.
    run_history: HashMap<actionqueue_core::ids::RunId, Vec<RunStateHistoryEntry>>,
    /// Attempt history per run.
    attempt_history: HashMap<actionqueue_core::ids::RunId, Vec<AttemptHistoryEntry>>,
    /// Active lease per run as `(owner, expiry)`.
    leases: HashMap<actionqueue_core::ids::RunId, (String, u64)>,
    /// Detailed metadata for each active lease; updated together with `leases`.
    lease_metadata: HashMap<actionqueue_core::ids::RunId, LeaseMetadata>,
    /// Sequence number of the last successfully applied event.
    latest_sequence: u64,
    /// Cancellation timestamps per canceled task.
    task_canceled_at: HashMap<TaskId, u64>,
    /// Whether the engine is currently paused.
    engine_paused: bool,
    /// Timestamp of the most recent pause.
    engine_paused_at: Option<u64>,
    /// Timestamp of the most recent resume; cleared when the engine is paused again.
    engine_resumed_at: Option<u64>,
    /// Union of all declared prerequisites per task.
    dependency_declarations: HashMap<TaskId, HashSet<TaskId>>,
    /// Budget records keyed by (task, dimension).
    budgets: HashMap<(actionqueue_core::ids::TaskId, BudgetDimension), BudgetRecord>,
    /// Subscriptions keyed by subscription id.
    subscriptions: HashMap<SubscriptionId, SubscriptionRecord>,
    /// Registered actors.
    actors: HashMap<ActorId, ActorRecord>,
    /// Created tenants.
    tenants: HashMap<TenantId, TenantRecord>,
    /// Latest role assignment per (actor, tenant).
    role_assignments: HashMap<(ActorId, TenantId), RoleAssignmentRecord>,
    /// Capability grants keyed by (actor, `capability_key(capability)`, tenant).
    capability_grants: HashMap<(ActorId, String, TenantId), CapabilityGrantRecord>,
    /// Ledger entries in append order.
    ledger_entries: Vec<LedgerEntryRecord>,
}
314
315impl ReplayReducer {
316 pub fn new() -> Self {
318 ReplayReducer {
319 runs: HashMap::new(),
320 tasks: HashMap::new(),
321 run_instances: HashMap::new(),
322 runs_by_task: HashMap::new(),
323 run_history: HashMap::new(),
324 attempt_history: HashMap::new(),
325 leases: HashMap::new(),
326 lease_metadata: HashMap::new(),
327 latest_sequence: 0,
328 task_canceled_at: HashMap::new(),
329 engine_paused: false,
330 engine_paused_at: None,
331 engine_resumed_at: None,
332 dependency_declarations: HashMap::new(),
333 budgets: HashMap::new(),
334 subscriptions: HashMap::new(),
335 actors: HashMap::new(),
336 tenants: HashMap::new(),
337 role_assignments: HashMap::new(),
338 capability_grants: HashMap::new(),
339 ledger_entries: Vec::new(),
340 }
341 }
342
343 pub fn get_lease(&self, run_id: &actionqueue_core::ids::RunId) -> Option<&(String, u64)> {
346 self.leases.get(run_id)
347 }
348
349 pub fn get_lease_metadata(
352 &self,
353 run_id: &actionqueue_core::ids::RunId,
354 ) -> Option<&LeaseMetadata> {
355 self.lease_metadata.get(run_id)
356 }
357
358 pub fn get_run_state(&self, run_id: &actionqueue_core::ids::RunId) -> Option<&RunState> {
360 self.runs.get(run_id)
361 }
362
363 pub fn get_task(
365 &self,
366 task_id: &actionqueue_core::ids::TaskId,
367 ) -> Option<&actionqueue_core::task::task_spec::TaskSpec> {
368 self.tasks.get(task_id).map(TaskRecord::task_spec)
369 }
370
371 pub fn get_task_record(&self, task_id: &actionqueue_core::ids::TaskId) -> Option<&TaskRecord> {
373 self.tasks.get(task_id)
374 }
375
376 pub fn get_run_instance(
378 &self,
379 run_id: &actionqueue_core::ids::RunId,
380 ) -> Option<&actionqueue_core::run::run_instance::RunInstance> {
381 self.run_instances.get(run_id)
382 }
383
384 pub(crate) fn get_run_instance_mut(
389 &mut self,
390 run_id: actionqueue_core::ids::RunId,
391 ) -> Option<&mut actionqueue_core::run::run_instance::RunInstance> {
392 self.run_instances.get_mut(&run_id)
393 }
394
395 pub fn get_run_history(
397 &self,
398 run_id: &actionqueue_core::ids::RunId,
399 ) -> Option<&[RunStateHistoryEntry]> {
400 self.run_history.get(run_id).map(Vec::as_slice)
401 }
402
403 pub fn get_attempt_history(
405 &self,
406 run_id: &actionqueue_core::ids::RunId,
407 ) -> Option<&[AttemptHistoryEntry]> {
408 self.attempt_history.get(run_id).map(Vec::as_slice)
409 }
410
411 pub fn latest_sequence(&self) -> u64 {
413 self.latest_sequence
414 }
415
416 pub fn run_count(&self) -> usize {
418 self.run_instances.len()
419 }
420
421 pub fn task_count(&self) -> usize {
423 self.tasks.len()
424 }
425
426 pub fn run_instances(
431 &self,
432 ) -> impl Iterator<Item = &actionqueue_core::run::run_instance::RunInstance> {
433 self.run_instances.values()
434 }
435
436 pub fn run_ids_for_task(&self, task_id: TaskId) -> Vec<RunId> {
438 let mut run_ids = self.runs_by_task.get(&task_id).cloned().unwrap_or_default();
439 run_ids.sort();
440 run_ids
441 }
442
443 pub fn runs_for_task(
445 &self,
446 task_id: TaskId,
447 ) -> impl Iterator<Item = &actionqueue_core::run::run_instance::RunInstance> {
448 self.runs_by_task
449 .get(&task_id)
450 .into_iter()
451 .flat_map(|ids| ids.iter().filter_map(|id| self.run_instances.get(id)))
452 }
453
454 pub fn is_task_canceled(&self, task_id: TaskId) -> bool {
456 self.task_canceled_at.contains_key(&task_id)
457 }
458
459 pub fn task_canceled_at(&self, task_id: TaskId) -> Option<u64> {
461 self.task_canceled_at.get(&task_id).copied()
462 }
463
464 pub fn is_engine_paused(&self) -> bool {
466 self.engine_paused
467 }
468
469 pub fn engine_paused_at(&self) -> Option<u64> {
471 self.engine_paused_at
472 }
473
474 pub fn engine_resumed_at(&self) -> Option<u64> {
476 self.engine_resumed_at
477 }
478
479 pub fn dependency_declarations(&self) -> impl Iterator<Item = (TaskId, &HashSet<TaskId>)> + '_ {
484 self.dependency_declarations.iter().map(|(task_id, prereqs)| (*task_id, prereqs))
485 }
486
487 pub fn parent_child_mappings(&self) -> impl Iterator<Item = (TaskId, TaskId)> + '_ {
493 self.tasks.values().filter_map(|tr| {
494 tr.task_spec().parent_task_id().map(|parent| (tr.task_spec().id(), parent))
495 })
496 }
497
498 pub fn tasks(&self) -> impl Iterator<Item = &actionqueue_core::task::task_spec::TaskSpec> {
503 self.tasks.values().map(TaskRecord::task_spec)
504 }
505
506 pub fn task_records(&self) -> impl Iterator<Item = &TaskRecord> {
508 self.tasks.values()
509 }
510
    /// Folds a single WAL event into the replay projection.
    ///
    /// The event's sequence number must be strictly greater than the last
    /// successfully applied sequence. Each variant is dispatched to a dedicated
    /// handler or handled inline (budget exhaustion and platform records).
    /// `latest_sequence` advances only after the handler succeeds.
    ///
    /// # Errors
    ///
    /// Returns `ReplayReducerError::InvalidTransition` for a non-increasing
    /// sequence. Otherwise returns whatever error the variant handler reports.
    ///
    /// NOTE(review): there is no rollback. Some handlers (for example
    /// `apply_attempt_finished`, which calls `finish_attempt` before validating
    /// attempt history) can mutate part of the projection before failing.
    pub fn apply(&mut self, event: &WalEvent) -> Result<(), ReplayReducerError> {
        // Reject replayed or out-of-order events: sequences must strictly increase.
        if event.sequence() <= self.latest_sequence {
            return Err(ReplayReducerError::InvalidTransition);
        }

        match event.event() {
            WalEventType::TaskCreated { task_spec, timestamp } => {
                self.apply_task_created(task_spec, *timestamp)?;
            }
            WalEventType::RunCreated { run_instance } => {
                self.apply_run_created(run_instance)?;
            }
            WalEventType::RunStateChanged { run_id, previous_state, new_state, timestamp } => {
                self.apply_run_state_changed(run_id, previous_state, new_state, *timestamp)?;
            }
            WalEventType::AttemptStarted { run_id, attempt_id, timestamp } => {
                self.apply_attempt_started(run_id, attempt_id, *timestamp)?;
            }
            WalEventType::AttemptFinished {
                run_id,
                attempt_id,
                result,
                error,
                output,
                timestamp,
            } => {
                // Rebuild the typed outcome. If the persisted parts are
                // inconsistent, log and reconstruct a best-effort outcome from
                // the result kind rather than failing replay.
                let outcome =
                    AttemptOutcome::from_raw_parts(*result, error.clone(), output.clone())
                        .unwrap_or_else(|e| {
                            tracing::warn!(
                                "WAL replay: invalid attempt outcome shape: {e}; falling back to \
                                 safe reconstruction"
                            );
                            match result {
                                actionqueue_core::mutation::AttemptResultKind::Success => {
                                    AttemptOutcome::success()
                                }
                                actionqueue_core::mutation::AttemptResultKind::Failure => {
                                    AttemptOutcome::failure(
                                        error
                                            .clone()
                                            .unwrap_or_else(|| "unknown failure".to_string()),
                                    )
                                }
                                actionqueue_core::mutation::AttemptResultKind::Timeout => {
                                    AttemptOutcome::timeout(
                                        error
                                            .clone()
                                            .unwrap_or_else(|| "unknown timeout".to_string()),
                                    )
                                }
                                actionqueue_core::mutation::AttemptResultKind::Suspended => {
                                    AttemptOutcome::suspended()
                                }
                            }
                        });
                self.apply_attempt_finished(run_id, attempt_id, outcome, *timestamp)?;
            }
            WalEventType::TaskCanceled { task_id, timestamp } => {
                self.apply_task_canceled(task_id, *timestamp)?;
            }
            WalEventType::RunCanceled { run_id, timestamp } => {
                self.apply_run_canceled(run_id, *timestamp)?;
            }
            WalEventType::LeaseAcquired { run_id, owner, expiry, timestamp } => {
                self.apply_lease_acquired(run_id, owner.clone(), *expiry, *timestamp)?;
            }
            WalEventType::LeaseHeartbeat { run_id, owner, expiry, timestamp } => {
                self.apply_lease_heartbeat(run_id, owner.clone(), *expiry, *timestamp)?;
            }
            WalEventType::LeaseExpired { run_id, owner, expiry, timestamp } => {
                self.apply_lease_expired(run_id, owner.clone(), *expiry, *timestamp)?;
            }
            WalEventType::LeaseReleased { run_id, owner, expiry, timestamp } => {
                self.apply_lease_released(run_id, owner.clone(), *expiry, *timestamp)?;
            }
            WalEventType::EnginePaused { timestamp } => {
                self.apply_engine_paused(*timestamp)?;
            }
            WalEventType::EngineResumed { timestamp } => {
                self.apply_engine_resumed(*timestamp)?;
            }
            // Infallible: unknown task ids are only logged.
            WalEventType::DependencyDeclared { task_id, depends_on, .. } => {
                self.apply_dependency_declared(*task_id, depends_on);
            }
            // Suspension is replayed as an explicit Running -> Suspended state change.
            WalEventType::RunSuspended { run_id, reason: _, timestamp } => {
                self.apply_run_state_changed(
                    run_id,
                    &RunState::Running,
                    &RunState::Suspended,
                    *timestamp,
                )?;
            }
            // Resumption is replayed as an explicit Suspended -> Ready state change.
            WalEventType::RunResumed { run_id, timestamp } => {
                self.apply_run_state_changed(
                    run_id,
                    &RunState::Suspended,
                    &RunState::Ready,
                    *timestamp,
                )?;
            }
            WalEventType::BudgetAllocated { task_id, dimension, limit, timestamp } => {
                self.apply_budget_allocated(*task_id, *dimension, *limit, *timestamp);
            }
            WalEventType::BudgetConsumed { task_id, dimension, amount, timestamp } => {
                self.apply_budget_consumed(*task_id, *dimension, *amount, *timestamp);
            }
            // Exhaustion for a (task, dimension) with no allocated budget is silently ignored.
            WalEventType::BudgetExhausted { task_id, dimension, .. } => {
                if let Some(record) = self.budgets.get_mut(&(*task_id, *dimension)) {
                    record.exhausted = true;
                }
            }
            WalEventType::BudgetReplenished { task_id, dimension, new_limit, timestamp } => {
                self.apply_budget_replenished(*task_id, *dimension, *new_limit, *timestamp);
            }
            WalEventType::SubscriptionCreated { subscription_id, task_id, filter, timestamp } => {
                self.apply_subscription_created(
                    *subscription_id,
                    *task_id,
                    filter.clone(),
                    *timestamp,
                );
            }
            WalEventType::SubscriptionTriggered { subscription_id, timestamp } => {
                self.apply_subscription_triggered(*subscription_id, *timestamp);
            }
            WalEventType::SubscriptionCanceled { subscription_id, timestamp } => {
                self.apply_subscription_canceled(*subscription_id, *timestamp);
            }

            // Re-registering an actor overwrites its record, resetting heartbeat/deregistration.
            WalEventType::ActorRegistered {
                actor_id,
                identity,
                capabilities,
                department,
                heartbeat_interval_secs,
                tenant_id,
                timestamp,
            } => {
                self.actors.insert(
                    *actor_id,
                    ActorRecord {
                        actor_id: *actor_id,
                        identity: identity.clone(),
                        capabilities: capabilities.clone(),
                        department: department.clone(),
                        heartbeat_interval_secs: *heartbeat_interval_secs,
                        tenant_id: *tenant_id,
                        registered_at: *timestamp,
                        last_heartbeat_at: None,
                        deregistered_at: None,
                    },
                );
            }
            // Events for unknown actors are ignored.
            WalEventType::ActorDeregistered { actor_id, timestamp } => {
                if let Some(record) = self.actors.get_mut(actor_id) {
                    record.deregistered_at = Some(*timestamp);
                }
            }
            WalEventType::ActorHeartbeat { actor_id, timestamp } => {
                if let Some(record) = self.actors.get_mut(actor_id) {
                    record.last_heartbeat_at = Some(*timestamp);
                }
            }

            WalEventType::TenantCreated { tenant_id, name, timestamp } => {
                self.tenants.insert(
                    *tenant_id,
                    TenantRecord {
                        tenant_id: *tenant_id,
                        name: name.clone(),
                        created_at: *timestamp,
                    },
                );
            }
            // One role per (actor, tenant): a later assignment replaces the earlier one.
            WalEventType::RoleAssigned { actor_id, role, tenant_id, timestamp } => {
                self.role_assignments.insert(
                    (*actor_id, *tenant_id),
                    RoleAssignmentRecord {
                        actor_id: *actor_id,
                        role: role.clone(),
                        tenant_id: *tenant_id,
                        assigned_at: *timestamp,
                    },
                );
            }
            WalEventType::CapabilityGranted { actor_id, capability, tenant_id, timestamp } => {
                let key = capability_key(capability);
                self.capability_grants.insert(
                    (*actor_id, key.clone(), *tenant_id),
                    CapabilityGrantRecord {
                        actor_id: *actor_id,
                        capability: capability.clone(),
                        tenant_id: *tenant_id,
                        granted_at: *timestamp,
                        revoked_at: None,
                    },
                );
            }
            // Revoking a grant that was never recorded is ignored.
            WalEventType::CapabilityRevoked { actor_id, capability, tenant_id, timestamp } => {
                let key = capability_key(capability);
                if let Some(record) = self.capability_grants.get_mut(&(*actor_id, key, *tenant_id))
                {
                    record.revoked_at = Some(*timestamp);
                }
            }
            WalEventType::LedgerEntryAppended {
                entry_id,
                tenant_id,
                ledger_key,
                actor_id,
                payload,
                timestamp,
            } => {
                self.ledger_entries.push(LedgerEntryRecord {
                    entry_id: *entry_id,
                    tenant_id: *tenant_id,
                    ledger_key: ledger_key.clone(),
                    actor_id: *actor_id,
                    payload: payload.clone(),
                    timestamp: *timestamp,
                });
            }
        }

        // Only reached when the handler succeeded.
        self.latest_sequence = event.sequence();

        Ok(())
    }
751
752 fn apply_task_created(
753 &mut self,
754 task_spec: &actionqueue_core::task::task_spec::TaskSpec,
755 timestamp: u64,
756 ) -> Result<(), ReplayReducerError> {
757 let task_id = task_spec.id();
758 if self.tasks.contains_key(&task_id) {
759 return Err(ReplayReducerError::DuplicateEvent);
760 }
761 self.tasks.insert(
762 task_id,
763 TaskRecord {
764 task_spec: task_spec.clone(),
765 created_at: timestamp,
766 updated_at: None,
767 canceled_at: None,
768 },
769 );
770 Ok(())
771 }
772
773 fn apply_task_canceled(
774 &mut self,
775 task_id: &TaskId,
776 timestamp: u64,
777 ) -> Result<(), ReplayReducerError> {
778 let task_record = self.tasks.get_mut(task_id).ok_or(ReplayReducerError::TaskCausality(
779 TaskCausalityError::UnknownTask { task_id: *task_id },
780 ))?;
781
782 if let Some(existing_timestamp) = task_record.canceled_at {
783 return Err(ReplayReducerError::TaskCausality(TaskCausalityError::AlreadyCanceled {
784 task_id: *task_id,
785 first_canceled_at: existing_timestamp,
786 duplicate_canceled_at: timestamp,
787 }));
788 }
789
790 task_record.canceled_at = Some(timestamp);
791 self.task_canceled_at.insert(*task_id, timestamp);
792 Ok(())
793 }
794
795 fn apply_run_created(
796 &mut self,
797 run_instance: &actionqueue_core::run::run_instance::RunInstance,
798 ) -> Result<(), ReplayReducerError> {
799 let run_id = run_instance.id();
800 if self.runs.contains_key(&run_id) {
801 return Err(ReplayReducerError::DuplicateEvent);
802 }
803 if run_instance.state() != RunState::Scheduled {
805 return Err(ReplayReducerError::InvalidTransition);
806 }
807 let task_id = run_instance.task_id();
808 self.runs.insert(run_id, run_instance.state());
809 self.run_instances.insert(run_id, run_instance.clone());
810 self.runs_by_task.entry(task_id).or_default().push(run_id);
811 self.run_history.insert(
812 run_id,
813 vec![RunStateHistoryEntry {
814 from: None,
815 to: RunState::Scheduled,
816 timestamp: run_instance.created_at(),
817 }],
818 );
819 self.attempt_history.insert(run_id, Vec::new());
820 Ok(())
821 }
822
823 fn apply_run_state_changed(
824 &mut self,
825 run_id: &actionqueue_core::ids::RunId,
826 previous_state: &RunState,
827 new_state: &RunState,
828 timestamp: u64,
829 ) -> Result<(), ReplayReducerError> {
830 let current_state = self.runs.get(run_id).ok_or(ReplayReducerError::InvalidTransition)?;
832
833 if *current_state != *previous_state {
835 return Err(ReplayReducerError::InvalidTransition);
836 }
837
838 if !is_valid_transition(*previous_state, *new_state) {
840 return Err(ReplayReducerError::InvalidTransition);
841 }
842
843 if let Some(run_instance) = self.run_instances.get(run_id) {
847 if run_instance.state() != *current_state {
848 return Err(ReplayReducerError::CorruptedData);
849 }
850
851 if *new_state == RunState::Running && run_instance.current_attempt_id().is_some() {
852 return Err(ReplayReducerError::InvalidTransition);
853 }
854
855 if *current_state == RunState::Running
856 && *new_state != RunState::Running
857 && *new_state != RunState::Canceled
858 && run_instance.current_attempt_id().is_some()
859 {
860 return Err(ReplayReducerError::InvalidTransition);
861 }
862 }
863
864 self.runs.insert(*run_id, *new_state);
866
867 if let Some(run_instance) = self.run_instances.get_mut(run_id) {
869 run_instance.transition_to(*new_state).map_err(Self::map_run_instance_error)?;
870 run_instance.record_state_change_at(timestamp);
871 }
872
873 let history = self.run_history.get_mut(run_id).ok_or(ReplayReducerError::CorruptedData)?;
874 history.push(RunStateHistoryEntry {
875 from: Some(*previous_state),
876 to: *new_state,
877 timestamp,
878 });
879
880 self.clear_lease_projection_if_terminal(run_id, *new_state)?;
881
882 Ok(())
883 }
884
885 fn apply_attempt_started(
886 &mut self,
887 run_id: &actionqueue_core::ids::RunId,
888 attempt_id: &actionqueue_core::ids::AttemptId,
889 timestamp: u64,
890 ) -> Result<(), ReplayReducerError> {
891 let current_state = self.runs.get(run_id).ok_or(ReplayReducerError::InvalidTransition)?;
893 if *current_state != RunState::Running {
894 return Err(ReplayReducerError::InvalidTransition);
895 }
896
897 let run_instance =
898 self.run_instances.get_mut(run_id).ok_or(ReplayReducerError::CorruptedData)?;
899
900 if run_instance.state() != RunState::Running {
901 return Err(ReplayReducerError::CorruptedData);
902 }
903
904 if run_instance.current_attempt_id().is_some() {
905 return Err(ReplayReducerError::InvalidTransition);
906 }
907
908 run_instance.start_attempt(*attempt_id).map_err(Self::map_run_instance_error)?;
909
910 let attempts = self.attempt_history.entry(*run_id).or_default();
911 attempts.push(AttemptHistoryEntry {
912 attempt_id: *attempt_id,
913 started_at: timestamp,
914 finished_at: None,
915 result: None,
916 error: None,
917 output: None,
918 });
919
920 Ok(())
921 }
922
    /// Closes an open attempt on a `Running` run and records its outcome.
    ///
    /// # Errors
    ///
    /// `InvalidTransition` if the run is unknown or not `Running`, or if the
    /// instance rejects `finish_attempt`. `CorruptedData` if the instance is
    /// missing or inconsistent, or if the attempt history entry is missing or
    /// already finished.
    ///
    /// NOTE(review): `finish_attempt` mutates the run instance before the history
    /// entry is validated. A `CorruptedData` failure from the history checks
    /// therefore leaves the instance's attempt closed while the history still
    /// shows it open. Consider validating the history entry first (this changes
    /// error precedence when both checks fail).
    fn apply_attempt_finished(
        &mut self,
        run_id: &actionqueue_core::ids::RunId,
        attempt_id: &actionqueue_core::ids::AttemptId,
        outcome: AttemptOutcome,
        timestamp: u64,
    ) -> Result<(), ReplayReducerError> {
        let current_state = self.runs.get(run_id).ok_or(ReplayReducerError::InvalidTransition)?;
        if *current_state != RunState::Running {
            return Err(ReplayReducerError::InvalidTransition);
        }

        let run_instance =
            self.run_instances.get_mut(run_id).ok_or(ReplayReducerError::CorruptedData)?;

        // Instance and projection must agree on the Running state.
        if run_instance.state() != RunState::Running {
            return Err(ReplayReducerError::CorruptedData);
        }

        run_instance.finish_attempt(*attempt_id).map_err(Self::map_run_instance_error)?;

        let attempts =
            self.attempt_history.get_mut(run_id).ok_or(ReplayReducerError::CorruptedData)?;
        let entry = attempts
            .iter_mut()
            .find(|entry| entry.attempt_id == *attempt_id)
            .ok_or(ReplayReducerError::CorruptedData)?;

        // An attempt may only be finished once.
        if entry.finished_at.is_some() {
            return Err(ReplayReducerError::CorruptedData);
        }

        entry.finished_at = Some(timestamp);
        let (result_kind, error_detail, output) = outcome.into_parts();
        entry.result = Some(result_kind);
        entry.error = error_detail;
        entry.output = output;

        Ok(())
    }
964
965 fn apply_run_canceled(
966 &mut self,
967 run_id: &actionqueue_core::ids::RunId,
968 timestamp: u64,
969 ) -> Result<(), ReplayReducerError> {
970 let current_state = self.runs.get(run_id).ok_or(ReplayReducerError::InvalidTransition)?;
971 let previous_state = *current_state;
972
973 if !is_valid_transition(previous_state, RunState::Canceled) {
975 return Err(ReplayReducerError::InvalidTransition);
976 }
977
978 self.runs.insert(*run_id, RunState::Canceled);
979
980 let run_instance =
981 self.run_instances.get_mut(run_id).ok_or(ReplayReducerError::CorruptedData)?;
982 run_instance.transition_to(RunState::Canceled).map_err(Self::map_run_instance_error)?;
983 run_instance.record_state_change_at(timestamp);
984
985 let history = self.run_history.get_mut(run_id).ok_or(ReplayReducerError::CorruptedData)?;
986 history.push(RunStateHistoryEntry {
987 from: Some(previous_state),
988 to: RunState::Canceled,
989 timestamp,
990 });
991
992 self.clear_lease_projection_if_terminal(run_id, RunState::Canceled)?;
993
994 Ok(())
995 }
996
997 fn apply_lease_acquired(
998 &mut self,
999 run_id: &actionqueue_core::ids::RunId,
1000 owner: String,
1001 expiry: u64,
1002 timestamp: u64,
1003 ) -> Result<(), ReplayReducerError> {
1004 self.validate_lease_run_precondition(run_id, LeaseEventKind::Acquire)?;
1005
1006 if self.leases.contains_key(run_id) {
1007 return Err(ReplayReducerError::LeaseCausality(
1008 LeaseCausalityError::LeaseAlreadyActive { run_id: *run_id },
1009 ));
1010 }
1011
1012 let metadata_owner = owner.clone();
1013 self.leases.insert(*run_id, (owner, expiry));
1014 self.lease_metadata.insert(
1015 *run_id,
1016 LeaseMetadata {
1017 owner: metadata_owner,
1018 expiry,
1019 acquired_at: timestamp,
1020 updated_at: timestamp,
1021 },
1022 );
1023 Ok(())
1024 }
1025
1026 fn apply_lease_heartbeat(
1027 &mut self,
1028 run_id: &actionqueue_core::ids::RunId,
1029 owner: String,
1030 expiry: u64,
1031 timestamp: u64,
1032 ) -> Result<(), ReplayReducerError> {
1033 self.validate_lease_run_precondition(run_id, LeaseEventKind::Heartbeat)?;
1034
1035 let (current_owner, current_expiry) =
1036 self.active_lease_snapshot(run_id, LeaseEventKind::Heartbeat)?;
1037
1038 if owner != current_owner {
1039 return Err(ReplayReducerError::LeaseCausality(LeaseCausalityError::OwnerMismatch {
1040 run_id: *run_id,
1041 event: LeaseEventKind::Heartbeat,
1042 expected_owner: current_owner,
1043 actual_owner: owner,
1044 }));
1045 }
1046
1047 if expiry < current_expiry {
1048 return Err(ReplayReducerError::LeaseCausality(
1049 LeaseCausalityError::NonMonotonicHeartbeatExpiry {
1050 run_id: *run_id,
1051 previous_expiry: current_expiry,
1052 proposed_expiry: expiry,
1053 },
1054 ));
1055 }
1056
1057 self.leases.insert(*run_id, (current_owner, expiry));
1058 let metadata =
1059 self.lease_metadata.get_mut(run_id).ok_or(ReplayReducerError::CorruptedData)?;
1060 metadata.expiry = expiry;
1061 metadata.updated_at = timestamp;
1062 Ok(())
1063 }
1064
1065 fn apply_lease_expired(
1066 &mut self,
1067 run_id: &actionqueue_core::ids::RunId,
1068 owner: String,
1069 expiry: u64,
1070 _timestamp: u64,
1071 ) -> Result<(), ReplayReducerError> {
1072 let current_state = self.validate_lease_run_precondition(run_id, LeaseEventKind::Expire)?;
1073
1074 self.validate_exact_lease_metadata_match(run_id, LeaseEventKind::Expire, &owner, expiry)?;
1075
1076 self.leases.remove(run_id);
1077 self.lease_metadata.remove(run_id);
1078
1079 if current_state == RunState::Leased {
1080 self.transition_run_to_ready_after_lease_close(run_id)?;
1081 }
1082
1083 Ok(())
1084 }
1085
1086 fn apply_lease_released(
1087 &mut self,
1088 run_id: &actionqueue_core::ids::RunId,
1089 owner: String,
1090 expiry: u64,
1091 _timestamp: u64,
1092 ) -> Result<(), ReplayReducerError> {
1093 let current_state =
1094 self.validate_lease_run_precondition(run_id, LeaseEventKind::Release)?;
1095
1096 self.validate_exact_lease_metadata_match(run_id, LeaseEventKind::Release, &owner, expiry)?;
1097
1098 self.leases.remove(run_id);
1099 self.lease_metadata.remove(run_id);
1100
1101 if current_state == RunState::Leased {
1102 self.transition_run_to_ready_after_lease_close(run_id)?;
1103 }
1104
1105 Ok(())
1106 }
1107
1108 fn validate_lease_run_precondition(
1111 &self,
1112 run_id: &RunId,
1113 event: LeaseEventKind,
1114 ) -> Result<RunState, ReplayReducerError> {
1115 let run_state =
1116 self.runs.get(run_id).copied().ok_or(ReplayReducerError::LeaseCausality(
1117 LeaseCausalityError::UnknownRun { run_id: *run_id, event },
1118 ))?;
1119
1120 if !Self::is_allowed_lease_event_state(event, run_state) {
1121 return Err(ReplayReducerError::LeaseCausality(LeaseCausalityError::InvalidRunState {
1122 run_id: *run_id,
1123 event,
1124 state: run_state,
1125 }));
1126 }
1127
1128 Ok(run_state)
1129 }
1130
1131 fn is_allowed_lease_event_state(event: LeaseEventKind, run_state: RunState) -> bool {
1133 match event {
1134 LeaseEventKind::Acquire => matches!(run_state, RunState::Ready | RunState::Leased),
1135 LeaseEventKind::Heartbeat => matches!(run_state, RunState::Leased | RunState::Running),
1136 LeaseEventKind::Expire | LeaseEventKind::Release => {
1137 matches!(run_state, RunState::Ready | RunState::Leased | RunState::Running)
1138 }
1139 }
1140 }
1141
1142 fn active_lease_snapshot(
1144 &self,
1145 run_id: &RunId,
1146 event: LeaseEventKind,
1147 ) -> Result<(String, u64), ReplayReducerError> {
1148 self.leases.get(run_id).cloned().ok_or(ReplayReducerError::LeaseCausality(
1149 LeaseCausalityError::MissingActiveLease { run_id: *run_id, event },
1150 ))
1151 }
1152
1153 fn validate_exact_lease_metadata_match(
1155 &self,
1156 run_id: &RunId,
1157 event: LeaseEventKind,
1158 owner: &str,
1159 expiry: u64,
1160 ) -> Result<(), ReplayReducerError> {
1161 let (expected_owner, expected_expiry) = self.active_lease_snapshot(run_id, event)?;
1162
1163 if owner != expected_owner {
1164 return Err(ReplayReducerError::LeaseCausality(LeaseCausalityError::OwnerMismatch {
1165 run_id: *run_id,
1166 event,
1167 expected_owner,
1168 actual_owner: owner.to_string(),
1169 }));
1170 }
1171
1172 if expiry != expected_expiry {
1173 return Err(ReplayReducerError::LeaseCausality(LeaseCausalityError::ExpiryMismatch {
1174 run_id: *run_id,
1175 event,
1176 expected_expiry,
1177 actual_expiry: expiry,
1178 }));
1179 }
1180
1181 Ok(())
1182 }
1183
1184 fn transition_run_to_ready_after_lease_close(
1186 &mut self,
1187 run_id: &RunId,
1188 ) -> Result<(), ReplayReducerError> {
1189 let run_instance =
1190 self.run_instances.get_mut(run_id).ok_or(ReplayReducerError::CorruptedData)?;
1191
1192 if run_instance.state() != RunState::Leased {
1193 return Err(ReplayReducerError::CorruptedData);
1194 }
1195
1196 run_instance.transition_to(RunState::Ready).map_err(Self::map_run_instance_error)?;
1197 self.runs.insert(*run_id, RunState::Ready);
1198 Ok(())
1199 }
1200
1201 fn apply_engine_paused(&mut self, timestamp: u64) -> Result<(), ReplayReducerError> {
1202 if self.engine_paused {
1203 return Err(ReplayReducerError::EngineControlCausality(
1204 EngineControlCausalityError::AlreadyPaused {
1205 first_paused_at: self.engine_paused_at,
1206 duplicate_paused_at: timestamp,
1207 },
1208 ));
1209 }
1210
1211 self.engine_paused = true;
1212 self.engine_paused_at = Some(timestamp);
1213 self.engine_resumed_at = None;
1214 Ok(())
1215 }
1216
1217 fn apply_engine_resumed(&mut self, timestamp: u64) -> Result<(), ReplayReducerError> {
1218 if !self.engine_paused {
1219 return Err(ReplayReducerError::EngineControlCausality(
1220 EngineControlCausalityError::NotPaused { attempted_resumed_at: timestamp },
1221 ));
1222 }
1223
1224 if let Some(paused_at) = self.engine_paused_at {
1225 if timestamp < paused_at {
1226 return Err(ReplayReducerError::EngineControlCausality(
1227 EngineControlCausalityError::ResumeBeforePause {
1228 paused_at,
1229 resumed_at: timestamp,
1230 },
1231 ));
1232 }
1233 }
1234
1235 self.engine_paused = false;
1236 self.engine_resumed_at = Some(timestamp);
1237 Ok(())
1238 }
1239
1240 fn apply_dependency_declared(&mut self, task_id: TaskId, depends_on: &[TaskId]) {
1241 if !self.tasks.contains_key(&task_id) {
1242 tracing::warn!(
1243 %task_id,
1244 "dependency declaration for unknown task during WAL replay"
1245 );
1246 }
1247 for prereq in depends_on {
1248 if !self.tasks.contains_key(prereq) {
1249 tracing::warn!(
1250 %task_id,
1251 prereq_id = %prereq,
1252 "dependency prerequisite references unknown task during WAL replay"
1253 );
1254 }
1255 }
1256 let entry = self.dependency_declarations.entry(task_id).or_default();
1258 for &prereq in depends_on {
1259 entry.insert(prereq);
1260 }
1261 }
1262
1263 fn clear_lease_projection_if_terminal(
1266 &mut self,
1267 run_id: &RunId,
1268 new_state: RunState,
1269 ) -> Result<(), ReplayReducerError> {
1270 if !new_state.is_terminal() {
1271 return Ok(());
1272 }
1273
1274 self.leases.remove(run_id);
1275 self.lease_metadata.remove(run_id);
1276
1277 Ok(())
1278 }
1279
1280 pub fn trim_terminal_history(&mut self) {
1286 let terminal_run_ids: Vec<RunId> =
1287 self.runs.iter().filter(|(_, state)| state.is_terminal()).map(|(id, _)| *id).collect();
1288
1289 for run_id in &terminal_run_ids {
1290 self.run_history.remove(run_id);
1291 self.attempt_history.remove(run_id);
1292 self.lease_metadata.remove(run_id);
1293 }
1294 }
1295
1296 pub(crate) fn set_run_history(&mut self, run_id: RunId, history: Vec<RunStateHistoryEntry>) {
1298 self.run_history.insert(run_id, history);
1299 }
1300
1301 pub(crate) fn set_attempt_history(
1303 &mut self,
1304 run_id: RunId,
1305 attempts: Vec<AttemptHistoryEntry>,
1306 ) {
1307 self.attempt_history.insert(run_id, attempts);
1308 }
1309
1310 pub(crate) fn set_lease_for_bootstrap(&mut self, run_id: RunId, metadata: LeaseMetadata) {
1317 self.leases.insert(run_id, (metadata.owner.clone(), metadata.expiry));
1318 self.lease_metadata.insert(run_id, metadata);
1319 }
1320
1321 pub(crate) fn set_latest_sequence_for_bootstrap(&mut self, sequence: u64) {
1323 self.latest_sequence = sequence;
1324 }
1325
1326 fn map_run_instance_error(error: RunInstanceError) -> ReplayReducerError {
1327 match error {
1328 RunInstanceError::AttemptCountOverflow { .. } => ReplayReducerError::CorruptedData,
1329 _ => ReplayReducerError::InvalidTransition,
1330 }
1331 }
1332
1333 pub fn budgets(
1335 &self,
1336 ) -> impl Iterator<Item = (&(actionqueue_core::ids::TaskId, BudgetDimension), &BudgetRecord)>
1337 {
1338 self.budgets.iter()
1339 }
1340
1341 pub fn get_budget(
1343 &self,
1344 task_id: &actionqueue_core::ids::TaskId,
1345 dimension: BudgetDimension,
1346 ) -> Option<&BudgetRecord> {
1347 self.budgets.get(&(*task_id, dimension))
1348 }
1349
1350 pub fn is_budget_exhausted(
1352 &self,
1353 task_id: actionqueue_core::ids::TaskId,
1354 dimension: BudgetDimension,
1355 ) -> bool {
1356 self.budgets.get(&(task_id, dimension)).is_some_and(|r| r.exhausted)
1357 }
1358
1359 pub fn budget_allocation_exists(
1361 &self,
1362 task_id: actionqueue_core::ids::TaskId,
1363 dimension: BudgetDimension,
1364 ) -> bool {
1365 self.budgets.contains_key(&(task_id, dimension))
1366 }
1367
1368 pub fn subscriptions(&self) -> impl Iterator<Item = (&SubscriptionId, &SubscriptionRecord)> {
1370 self.subscriptions.iter()
1371 }
1372
1373 pub fn get_subscription(
1375 &self,
1376 subscription_id: &SubscriptionId,
1377 ) -> Option<&SubscriptionRecord> {
1378 self.subscriptions.get(subscription_id)
1379 }
1380
1381 pub fn subscription_exists(&self, subscription_id: SubscriptionId) -> bool {
1383 self.subscriptions.contains_key(&subscription_id)
1384 }
1385
1386 pub fn is_subscription_canceled(&self, subscription_id: SubscriptionId) -> bool {
1388 self.subscriptions.get(&subscription_id).is_some_and(|r| r.canceled_at.is_some())
1389 }
1390
1391 pub fn actors(&self) -> impl Iterator<Item = (&ActorId, &ActorRecord)> {
1393 self.actors.iter()
1394 }
1395
1396 pub fn get_actor(&self, actor_id: &ActorId) -> Option<&ActorRecord> {
1398 self.actors.get(actor_id)
1399 }
1400
1401 pub fn is_actor_active(&self, actor_id: ActorId) -> bool {
1403 self.actors.get(&actor_id).is_some_and(|r| r.deregistered_at.is_none())
1404 }
1405
1406 pub fn tenants(&self) -> impl Iterator<Item = (&TenantId, &TenantRecord)> {
1408 self.tenants.iter()
1409 }
1410
1411 pub fn get_tenant(&self, tenant_id: &TenantId) -> Option<&TenantRecord> {
1413 self.tenants.get(tenant_id)
1414 }
1415
1416 pub fn tenant_exists(&self, tenant_id: TenantId) -> bool {
1418 self.tenants.contains_key(&tenant_id)
1419 }
1420
1421 pub fn get_role_assignment(
1423 &self,
1424 actor_id: ActorId,
1425 tenant_id: TenantId,
1426 ) -> Option<&RoleAssignmentRecord> {
1427 self.role_assignments.get(&(actor_id, tenant_id))
1428 }
1429
1430 pub fn actor_has_capability(
1432 &self,
1433 actor_id: ActorId,
1434 capability_key: &str,
1435 tenant_id: TenantId,
1436 ) -> bool {
1437 let key = (actor_id, capability_key.to_string(), tenant_id);
1438 self.capability_grants.get(&key).is_some_and(|r| r.revoked_at.is_none())
1439 }
1440
1441 pub fn role_assignments(&self) -> impl Iterator<Item = &RoleAssignmentRecord> {
1444 self.role_assignments.values()
1445 }
1446
1447 pub fn capability_grants(&self) -> impl Iterator<Item = &CapabilityGrantRecord> {
1449 self.capability_grants.values()
1450 }
1451
1452 pub fn ledger_entries(&self) -> impl Iterator<Item = &LedgerEntryRecord> {
1453 self.ledger_entries.iter()
1454 }
1455
1456 fn apply_budget_allocated(
1457 &mut self,
1458 task_id: actionqueue_core::ids::TaskId,
1459 dimension: BudgetDimension,
1460 limit: u64,
1461 timestamp: u64,
1462 ) {
1463 self.budgets.insert(
1464 (task_id, dimension),
1465 BudgetRecord {
1466 dimension,
1467 limit,
1468 consumed: 0,
1469 allocated_at: timestamp,
1470 exhausted: false,
1471 },
1472 );
1473 }
1474
1475 fn apply_budget_consumed(
1476 &mut self,
1477 task_id: actionqueue_core::ids::TaskId,
1478 dimension: BudgetDimension,
1479 amount: u64,
1480 _timestamp: u64,
1481 ) {
1482 if let Some(record) = self.budgets.get_mut(&(task_id, dimension)) {
1483 record.consumed = record.consumed.saturating_add(amount);
1484 if record.consumed >= record.limit {
1485 record.exhausted = true;
1486 }
1487 }
1488 }
1489
1490 fn apply_budget_replenished(
1497 &mut self,
1498 task_id: actionqueue_core::ids::TaskId,
1499 dimension: BudgetDimension,
1500 new_limit: u64,
1501 _timestamp: u64,
1502 ) {
1503 if let Some(record) = self.budgets.get_mut(&(task_id, dimension)) {
1504 record.limit = new_limit;
1505 record.consumed = 0;
1506 record.exhausted = false;
1507 }
1508 }
1509
1510 fn apply_subscription_created(
1511 &mut self,
1512 subscription_id: SubscriptionId,
1513 task_id: actionqueue_core::ids::TaskId,
1514 filter: EventFilter,
1515 timestamp: u64,
1516 ) {
1517 self.subscriptions.insert(
1518 subscription_id,
1519 SubscriptionRecord {
1520 subscription_id,
1521 task_id,
1522 filter,
1523 created_at: timestamp,
1524 triggered_at: None,
1525 canceled_at: None,
1526 },
1527 );
1528 }
1529
1530 fn apply_subscription_triggered(&mut self, subscription_id: SubscriptionId, timestamp: u64) {
1531 if let Some(record) = self.subscriptions.get_mut(&subscription_id) {
1532 record.triggered_at = Some(timestamp);
1533 }
1534 }
1535
1536 fn apply_subscription_canceled(&mut self, subscription_id: SubscriptionId, timestamp: u64) {
1537 if let Some(record) = self.subscriptions.get_mut(&subscription_id) {
1538 record.canceled_at = Some(timestamp);
1539 }
1540 }
1541}
1542
1543impl Default for ReplayReducer {
1544 fn default() -> Self {
1545 Self::new()
1546 }
1547}
1548
#[derive(Debug, Clone, PartialEq, Eq)]
/// Errors raised while folding WAL events into the replay projection.
pub enum ReplayReducerError {
    /// A run-instance state change was rejected. Every `RunInstanceError` other
    /// than attempt-count overflow maps here.
    InvalidTransition,
    /// A duplicate event was detected.
    DuplicateEvent,
    /// Event data disagrees with the projection. Examples in this file are an
    /// unknown run, a run not in the expected state, and attempt-count overflow.
    CorruptedData,
    /// A lease event violated lease causality rules.
    LeaseCausality(LeaseCausalityError),
    /// A task event violated task causality rules.
    TaskCausality(TaskCausalityError),
    /// An engine pause/resume event violated engine-control causality rules.
    EngineControlCausality(EngineControlCausalityError),
}
1565
#[derive(Debug, Clone, PartialEq, Eq)]
/// Reasons an engine pause or resume event is rejected during replay.
pub enum EngineControlCausalityError {
    /// A pause arrived while the engine was already paused.
    AlreadyPaused {
        /// Timestamp of the pause currently in effect, if one was recorded.
        first_paused_at: Option<u64>,
        /// Timestamp carried by the rejected pause event.
        duplicate_paused_at: u64,
    },
    /// A resume arrived while the engine was not paused.
    NotPaused {
        /// Timestamp carried by the rejected resume event.
        attempted_resumed_at: u64,
    },
    /// A resume carried a timestamp strictly earlier than the recorded pause.
    ResumeBeforePause {
        /// Recorded pause timestamp.
        paused_at: u64,
        /// Timestamp carried by the rejected resume event.
        resumed_at: u64,
    },
}
1589
1590impl std::fmt::Display for EngineControlCausalityError {
1591 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1592 match self {
1593 EngineControlCausalityError::AlreadyPaused { first_paused_at, duplicate_paused_at } => {
1594 if let Some(first_paused_at) = first_paused_at {
1595 write!(
1596 f,
1597 "engine pause rejected: already paused at {first_paused_at}, duplicate \
1598 timestamp {duplicate_paused_at}"
1599 )
1600 } else {
1601 write!(
1602 f,
1603 "engine pause rejected: already paused, duplicate timestamp \
1604 {duplicate_paused_at}"
1605 )
1606 }
1607 }
1608 EngineControlCausalityError::NotPaused { attempted_resumed_at } => {
1609 write!(
1610 f,
1611 "engine resume rejected: engine not paused at timestamp {attempted_resumed_at}"
1612 )
1613 }
1614 EngineControlCausalityError::ResumeBeforePause { paused_at, resumed_at } => {
1615 write!(
1616 f,
1617 "engine resume rejected: resumed_at {resumed_at} precedes paused_at \
1618 {paused_at}"
1619 )
1620 }
1621 }
1622 }
1623}
1624
#[derive(Debug, Clone, PartialEq, Eq)]
/// Reasons a task event is rejected during replay.
/// NOTE(review): the `Display` text frames both variants as task-cancel
/// rejections; confirm whether other task events also use this type.
pub enum TaskCausalityError {
    /// The event referenced a task not present in the projection.
    UnknownTask {
        /// Id of the unknown task.
        task_id: TaskId,
    },
    /// The task was already canceled when another cancel arrived.
    AlreadyCanceled {
        /// Id of the task.
        task_id: TaskId,
        /// Timestamp of the cancellation already recorded.
        first_canceled_at: u64,
        /// Timestamp carried by the rejected cancel event.
        duplicate_canceled_at: u64,
    },
}
1643
1644impl std::fmt::Display for TaskCausalityError {
1645 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1646 match self {
1647 TaskCausalityError::UnknownTask { task_id } => {
1648 write!(f, "task canceled rejected: unknown task {task_id}")
1649 }
1650 TaskCausalityError::AlreadyCanceled {
1651 task_id,
1652 first_canceled_at,
1653 duplicate_canceled_at,
1654 } => {
1655 write!(
1656 f,
1657 "task canceled rejected for task {task_id}: already canceled at \
1658 {first_canceled_at}, duplicate timestamp {duplicate_canceled_at}"
1659 )
1660 }
1661 }
1662 }
1663}
1664
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
/// Which lease event was being applied; carried by [`LeaseCausalityError`]
/// variants to label diagnostics.
pub enum LeaseEventKind {
    /// Lease acquisition.
    Acquire,
    /// Lease heartbeat (expiry extension).
    Heartbeat,
    /// Lease expiry.
    Expire,
    /// Lease release.
    Release,
}
1677
1678impl std::fmt::Display for LeaseEventKind {
1679 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1680 match self {
1681 LeaseEventKind::Acquire => write!(f, "lease acquire"),
1682 LeaseEventKind::Heartbeat => write!(f, "lease heartbeat"),
1683 LeaseEventKind::Expire => write!(f, "lease expire"),
1684 LeaseEventKind::Release => write!(f, "lease release"),
1685 }
1686 }
1687}
1688
#[derive(Debug, Clone, PartialEq, Eq)]
/// Reasons a lease event is rejected during replay.
pub enum LeaseCausalityError {
    /// The lease event referenced a run not present in the projection.
    UnknownRun {
        /// Id of the unknown run.
        run_id: RunId,
        /// Lease event being applied.
        event: LeaseEventKind,
    },
    /// The run was in a state that does not accept this lease event.
    InvalidRunState {
        /// Id of the run.
        run_id: RunId,
        /// Lease event being applied.
        event: LeaseEventKind,
        /// Run state observed at the time of the event.
        state: RunState,
    },
    /// The event requires an active lease but none was recorded for the run.
    MissingActiveLease {
        /// Id of the run.
        run_id: RunId,
        /// Lease event being applied.
        event: LeaseEventKind,
    },
    /// An acquire arrived while a lease was already active for the run.
    LeaseAlreadyActive {
        /// Id of the run.
        run_id: RunId,
    },
    /// The event's lease owner differs from the recorded owner.
    OwnerMismatch {
        /// Id of the run.
        run_id: RunId,
        /// Lease event being applied.
        event: LeaseEventKind,
        /// Owner recorded in the projection.
        expected_owner: String,
        /// Owner carried by the event.
        actual_owner: String,
    },
    /// The event's lease expiry differs from the recorded expiry.
    ExpiryMismatch {
        /// Id of the run.
        run_id: RunId,
        /// Lease event being applied.
        event: LeaseEventKind,
        /// Expiry recorded in the projection.
        expected_expiry: u64,
        /// Expiry carried by the event.
        actual_expiry: u64,
    },
    /// A heartbeat proposed an expiry that regresses relative to the recorded
    /// one. The exact comparison lives in the heartbeat handler.
    NonMonotonicHeartbeatExpiry {
        /// Id of the run.
        run_id: RunId,
        /// Expiry recorded before the heartbeat.
        previous_expiry: u64,
        /// Expiry proposed by the heartbeat.
        proposed_expiry: u64,
    },
}
1752
1753impl std::fmt::Display for LeaseCausalityError {
1754 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1755 match self {
1756 LeaseCausalityError::UnknownRun { run_id, event } => {
1757 write!(f, "{event} rejected: unknown run {run_id}")
1758 }
1759 LeaseCausalityError::InvalidRunState { run_id, event, state } => {
1760 write!(f, "{event} rejected for run {run_id}: invalid state {state:?}")
1761 }
1762 LeaseCausalityError::MissingActiveLease { run_id, event } => {
1763 write!(f, "{event} rejected for run {run_id}: missing active lease")
1764 }
1765 LeaseCausalityError::LeaseAlreadyActive { run_id } => {
1766 write!(f, "lease acquire rejected for run {run_id}: lease already active")
1767 }
1768 LeaseCausalityError::OwnerMismatch { run_id, event, expected_owner, actual_owner } => {
1769 write!(
1770 f,
1771 "{event} rejected for run {run_id}: owner mismatch expected={expected_owner} \
1772 actual={actual_owner}"
1773 )
1774 }
1775 LeaseCausalityError::ExpiryMismatch {
1776 run_id,
1777 event,
1778 expected_expiry,
1779 actual_expiry,
1780 } => {
1781 write!(
1782 f,
1783 "{event} rejected for run {run_id}: expiry mismatch \
1784 expected={expected_expiry} actual={actual_expiry}"
1785 )
1786 }
1787 LeaseCausalityError::NonMonotonicHeartbeatExpiry {
1788 run_id,
1789 previous_expiry,
1790 proposed_expiry,
1791 } => {
1792 write!(
1793 f,
1794 "lease heartbeat rejected for run {run_id}: expiry regression \
1795 previous={previous_expiry} proposed={proposed_expiry}"
1796 )
1797 }
1798 }
1799 }
1800}
1801
1802impl std::fmt::Display for ReplayReducerError {
1803 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1804 match self {
1805 ReplayReducerError::InvalidTransition => {
1806 write!(f, "Invalid state transition during replay")
1807 }
1808 ReplayReducerError::DuplicateEvent => write!(f, "Duplicate event detected"),
1809 ReplayReducerError::CorruptedData => write!(f, "Corrupted event data"),
1810 ReplayReducerError::LeaseCausality(details) => {
1811 write!(f, "Lease causality violation during replay: {details}")
1812 }
1813 ReplayReducerError::TaskCausality(details) => {
1814 write!(f, "Task causality violation during replay: {details}")
1815 }
1816 ReplayReducerError::EngineControlCausality(details) => {
1817 write!(f, "Engine control causality violation during replay: {details}")
1818 }
1819 }
1820 }
1821}
1822
// Marker impl: relies on the default `source()` (returns `None`); the nested
// causality details are already included in the `Display` output.
impl std::error::Error for ReplayReducerError {}
1824
1825fn capability_key(cap: &Capability) -> String {
1827 match cap {
1828 Capability::CanSubmit => "CanSubmit".to_string(),
1829 Capability::CanExecute => "CanExecute".to_string(),
1830 Capability::CanReview => "CanReview".to_string(),
1831 Capability::CanApprove => "CanApprove".to_string(),
1832 Capability::CanCancel => "CanCancel".to_string(),
1833 Capability::Custom(s) => format!("Custom:{s}"),
1834 }
1835}
1836
#[cfg(test)]
mod tests {
    use actionqueue_core::ids::{AttemptId, RunId, TaskId};
    use actionqueue_core::mutation::AttemptResultKind;
    use actionqueue_core::run::run_instance::RunInstance;
    use actionqueue_core::run::state::RunState;
    use actionqueue_core::task::task_spec::{TaskPayload, TaskSpec};

    use super::*;

    /// Builds a valid `RunPolicy::Once` task spec whose id is derived from `id`.
    fn test_task_spec(id: u128) -> TaskSpec {
        TaskSpec::new(
            TaskId::from_uuid(uuid::Uuid::from_u128(id)),
            TaskPayload::with_content_type(vec![1, 2, 3], "application/octet-stream"),
            actionqueue_core::task::run_policy::RunPolicy::Once,
            actionqueue_core::task::constraints::TaskConstraints::default(),
            actionqueue_core::task::metadata::TaskMetadata::default(),
        )
        .expect("test task spec should be valid")
    }

    /// Wraps `event` in a WAL envelope with sequence number `seq`.
    fn event(seq: u64, event: WalEventType) -> WalEvent {
        WalEvent::new(seq, event)
    }

    /// Applies `TaskCreated` at `seq` and `RunCreated` at `seq + 1` for a new
    /// scheduled run, and returns that run's id.
    ///
    /// Consumes two sequence numbers.
    fn seed_scheduled_run(
        reducer: &mut ReplayReducer,
        task_seed: u128,
        run_seed: u128,
        seq: u64,
        task_created_at: u64,
    ) -> RunId {
        let task = test_task_spec(task_seed);
        let task_id = task.id();
        let run_id = RunId::from_uuid(uuid::Uuid::from_u128(run_seed));

        reducer
            .apply(&event(
                seq,
                WalEventType::TaskCreated { task_spec: task, timestamp: task_created_at },
            ))
            .expect("task created");
        let run = RunInstance::new_scheduled_with_id(run_id, task_id, 1000, 1000)
            .expect("run should build");
        reducer
            .apply(&event(seq + 1, WalEventType::RunCreated { run_instance: run }))
            .expect("run created");

        run_id
    }

    /// Applies one `RunStateChanged` event and panics with the rejected
    /// transition if the reducer refuses it.
    fn transition(
        reducer: &mut ReplayReducer,
        seq: u64,
        run_id: RunId,
        previous_state: RunState,
        new_state: RunState,
        timestamp: u64,
    ) {
        reducer
            .apply(&event(
                seq,
                WalEventType::RunStateChanged { run_id, previous_state, new_state, timestamp },
            ))
            .unwrap_or_else(|error| {
                panic!("{previous_state:?}->{new_state:?} rejected: {error:?}")
            });
    }

    /// Drives a scheduled run to `terminal_state` and returns the next free
    /// sequence number.
    ///
    /// The run goes Ready -> Leased -> Running, then one attempt finishes with
    /// `result`/`error`, then the run moves to `terminal_state`. This consumes
    /// six sequence numbers starting at `seq_start`.
    fn drive_run_to_terminal(
        reducer: &mut ReplayReducer,
        run_id: RunId,
        seq_start: u64,
        result: AttemptResultKind,
        error: Option<String>,
        terminal_state: RunState,
    ) -> u64 {
        // Deterministic attempt id derived from the run id.
        let attempt_id =
            AttemptId::from_uuid(uuid::Uuid::from_u128(run_id.as_uuid().as_u128() + 1));

        transition(reducer, seq_start, run_id, RunState::Scheduled, RunState::Ready, 100);
        transition(reducer, seq_start + 1, run_id, RunState::Ready, RunState::Leased, 200);
        transition(reducer, seq_start + 2, run_id, RunState::Leased, RunState::Running, 300);

        reducer
            .apply(&event(
                seq_start + 3,
                WalEventType::AttemptStarted { run_id, attempt_id, timestamp: 400 },
            ))
            .expect("attempt started");
        reducer
            .apply(&event(
                seq_start + 4,
                WalEventType::AttemptFinished {
                    run_id,
                    attempt_id,
                    result,
                    error,
                    output: None,
                    timestamp: 500,
                },
            ))
            .expect("attempt finished");

        transition(reducer, seq_start + 5, run_id, RunState::Running, terminal_state, 600);

        seq_start + 6
    }

    /// Drives a scheduled run to `Completed` via one successful attempt.
    fn drive_run_to_completed(reducer: &mut ReplayReducer, run_id: RunId, seq_start: u64) -> u64 {
        drive_run_to_terminal(
            reducer,
            run_id,
            seq_start,
            AttemptResultKind::Success,
            None,
            RunState::Completed,
        )
    }

    /// Drives a scheduled run to `Failed` via one failed attempt.
    fn drive_run_to_failed(reducer: &mut ReplayReducer, run_id: RunId, seq_start: u64) -> u64 {
        drive_run_to_terminal(
            reducer,
            run_id,
            seq_start,
            AttemptResultKind::Failure,
            Some("test failure".to_string()),
            RunState::Failed,
        )
    }

    #[test]
    fn trim_terminal_history_removes_completed_run_history() {
        let mut reducer = ReplayReducer::new();
        let run_id = seed_scheduled_run(&mut reducer, 0xA001, 0xB001, 1, 10);
        drive_run_to_completed(&mut reducer, run_id, 3);

        assert!(reducer.get_run_history(&run_id).is_some());
        assert!(reducer.get_attempt_history(&run_id).is_some());

        reducer.trim_terminal_history();

        assert!(reducer.get_run_history(&run_id).is_none());
        assert!(reducer.get_attempt_history(&run_id).is_none());
        assert!(reducer.get_lease_metadata(&run_id).is_none());

        // The terminal state and the run instance itself survive trimming.
        assert_eq!(reducer.get_run_state(&run_id), Some(&RunState::Completed));
        assert!(reducer.get_run_instance(&run_id).is_some());
    }

    #[test]
    fn trim_terminal_history_removes_failed_and_canceled_run_history() {
        let mut reducer = ReplayReducer::new();

        let run_id_f = seed_scheduled_run(&mut reducer, 0xA010, 0xB010, 1, 10);
        let seq = drive_run_to_failed(&mut reducer, run_id_f, 3);

        let run_id_c = seed_scheduled_run(&mut reducer, 0xA020, 0xB020, seq, 20);
        transition(&mut reducer, seq + 2, run_id_c, RunState::Scheduled, RunState::Ready, 100);
        reducer
            .apply(&event(seq + 3, WalEventType::RunCanceled { run_id: run_id_c, timestamp: 200 }))
            .expect("run canceled");

        assert!(reducer.get_run_history(&run_id_f).is_some());
        assert!(reducer.get_run_history(&run_id_c).is_some());

        reducer.trim_terminal_history();

        assert!(reducer.get_run_history(&run_id_f).is_none());
        assert!(reducer.get_attempt_history(&run_id_f).is_none());
        assert!(reducer.get_run_history(&run_id_c).is_none());
        assert!(reducer.get_attempt_history(&run_id_c).is_none());

        assert_eq!(reducer.get_run_state(&run_id_f), Some(&RunState::Failed));
        assert_eq!(reducer.get_run_state(&run_id_c), Some(&RunState::Canceled));
    }

    #[test]
    fn trim_terminal_history_preserves_active_run_history() {
        let mut reducer = ReplayReducer::new();
        let run_id = seed_scheduled_run(&mut reducer, 0xA002, 0xB002, 1, 10);
        transition(&mut reducer, 3, run_id, RunState::Scheduled, RunState::Ready, 100);

        let history = reducer.get_run_history(&run_id).expect("history should exist");
        assert_eq!(history.len(), 2);

        reducer.trim_terminal_history();

        let history = reducer.get_run_history(&run_id).expect("history should still exist");
        assert_eq!(history.len(), 2);
        assert!(reducer.get_attempt_history(&run_id).is_some());
    }

    #[test]
    fn trim_terminal_history_no_op_when_no_terminal_runs() {
        let mut reducer = ReplayReducer::new();
        let run_id = seed_scheduled_run(&mut reducer, 0xA003, 0xB003, 1, 10);

        assert_eq!(reducer.run_count(), 1);
        let run_history_len =
            reducer.get_run_history(&run_id).expect("run history should exist").len();
        let attempt_history_len =
            reducer.get_attempt_history(&run_id).expect("attempt history should exist").len();

        reducer.trim_terminal_history();

        assert_eq!(reducer.run_count(), 1);
        assert_eq!(
            reducer.get_run_history(&run_id).expect("run history should still exist").len(),
            run_history_len
        );
        assert_eq!(
            reducer.get_attempt_history(&run_id).expect("attempt history should still exist").len(),
            attempt_history_len
        );
    }

    #[test]
    fn trim_terminal_history_mixed_terminal_and_active() {
        let mut reducer = ReplayReducer::new();

        let run_id_terminal = seed_scheduled_run(&mut reducer, 0xA004, 0xB004, 1, 10);
        let seq = drive_run_to_completed(&mut reducer, run_id_terminal, 3);

        let run_id_active = seed_scheduled_run(&mut reducer, 0xA005, 0xB005, seq, 20);
        transition(
            &mut reducer,
            seq + 2,
            run_id_active,
            RunState::Scheduled,
            RunState::Ready,
            100,
        );

        reducer.trim_terminal_history();

        assert!(reducer.get_run_history(&run_id_terminal).is_none());
        assert!(reducer.get_attempt_history(&run_id_terminal).is_none());

        assert!(reducer.get_run_history(&run_id_active).is_some());
        assert!(reducer.get_attempt_history(&run_id_active).is_some());
        let active_history =
            reducer.get_run_history(&run_id_active).expect("active history should exist");
        assert_eq!(active_history.len(), 2);
    }
}