Skip to main content

actionqueue_storage/recovery/
reducer.rs

1//! Recovery reducer - applies events to reconstruct state.
2//!
3//! A replay reducer applies WAL events to reconstruct the current state
4//! of the ActionQueue system. This is used during recovery to rebuild state
5//! from the WAL.
6//!
7//! Lease replay semantics are strict by design:
8//! - lease events must reference a known run,
9//! - lease events must satisfy explicit run-state preconditions,
10//! - lease mutations must satisfy owner/expiry causality checks,
11//! - terminal runs cannot retain active lease projection.
12
13use std::collections::{HashMap, HashSet};
14
15use actionqueue_core::budget::BudgetDimension;
16use actionqueue_core::ids::{ActorId, LedgerEntryId, RunId, TaskId, TenantId};
17use actionqueue_core::mutation::{AttemptOutcome, AttemptResultKind};
18use actionqueue_core::platform::{Capability, Role};
19use actionqueue_core::run::state::RunState;
20use actionqueue_core::run::transitions::is_valid_transition;
21use actionqueue_core::run::RunInstanceError;
22use actionqueue_core::subscription::{EventFilter, SubscriptionId};
23
24use crate::wal::event::{WalEvent, WalEventType};
25
/// Projection record for a task with timestamp metadata.
///
/// A record is inserted when a `TaskCreated` WAL event is applied and is
/// marked canceled when a `TaskCanceled` WAL event is applied. Fields are
/// private; read them through the accessor methods on this type.
#[derive(Debug, Clone)]
pub struct TaskRecord {
    /// Canonical task specification.
    task_spec: actionqueue_core::task::task_spec::TaskSpec,
    /// Task creation timestamp (Unix epoch seconds).
    ///
    /// Taken from the timestamp carried by the `TaskCreated` event.
    created_at: u64,
    /// Task update timestamp (Unix epoch seconds), if any.
    ///
    /// Initialized to `None` at creation.
    /// NOTE(review): no handler visible in this part of the file sets this
    /// field to `Some`; confirm whether it is populated elsewhere.
    updated_at: Option<u64>,
    /// Task cancellation timestamp (Unix epoch seconds), if canceled.
    ///
    /// Set exactly once; a second `TaskCanceled` event for the same task is
    /// rejected during replay.
    canceled_at: Option<u64>,
}
38
/// A deterministic state transition record for a run.
///
/// The first entry of every run's history is written when the run is
/// created: it has `from == None`, `to == RunState::Scheduled`, and uses the
/// run instance's creation timestamp.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RunStateHistoryEntry {
    /// The previous state, or None for the initial Scheduled entry.
    pub(crate) from: Option<RunState>,
    /// The new state recorded by the WAL event.
    pub(crate) to: RunState,
    /// The timestamp associated with the transition.
    pub(crate) timestamp: u64,
}

impl RunStateHistoryEntry {
    /// Returns the previous run state, if any.
    ///
    /// `None` marks the initial entry recorded at run creation.
    pub fn from(&self) -> Option<RunState> {
        self.from
    }

    /// Returns the new run state.
    pub fn to(&self) -> RunState {
        self.to
    }

    /// Returns the timestamp of the transition.
    pub fn timestamp(&self) -> u64 {
        self.timestamp
    }
}
66
67/// A deterministic attempt lineage record for a run.
68#[derive(Debug, Clone, PartialEq, Eq)]
69pub struct AttemptHistoryEntry {
70    /// The attempt identifier.
71    pub(crate) attempt_id: actionqueue_core::ids::AttemptId,
72    /// The timestamp when the attempt started.
73    pub(crate) started_at: u64,
74    /// The timestamp when the attempt finished, if finished.
75    pub(crate) finished_at: Option<u64>,
76    /// Canonical attempt result taxonomy, if finished.
77    pub(crate) result: Option<AttemptResultKind>,
78    /// Optional error message when the attempt failed.
79    pub(crate) error: Option<String>,
80    /// Optional opaque output bytes produced by the handler on success.
81    pub(crate) output: Option<Vec<u8>>,
82}
83
84impl AttemptHistoryEntry {
85    /// Returns the attempt identifier.
86    pub fn attempt_id(&self) -> actionqueue_core::ids::AttemptId {
87        self.attempt_id
88    }
89
90    /// Returns the timestamp when the attempt started.
91    pub fn started_at(&self) -> u64 {
92        self.started_at
93    }
94
95    /// Returns the timestamp when the attempt finished, if any.
96    pub fn finished_at(&self) -> Option<u64> {
97        self.finished_at
98    }
99
100    /// Returns canonical attempt result taxonomy, if finished.
101    pub fn result(&self) -> Option<AttemptResultKind> {
102        self.result
103    }
104
105    /// Returns whether the attempt succeeded, if finished.
106    ///
107    /// This is a compatibility view over [`Self::result`].
108    pub fn success(&self) -> Option<bool> {
109        self.result.map(|result| matches!(result, AttemptResultKind::Success))
110    }
111
112    /// Returns the error message, if any.
113    pub fn error(&self) -> Option<&str> {
114        self.error.as_deref()
115    }
116
117    /// Returns the optional opaque output bytes produced by the handler.
118    pub fn output(&self) -> Option<&[u8]> {
119        self.output.as_deref()
120    }
121}
122
/// Lease bookkeeping for a run, as reconstructed from WAL lease events.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LeaseMetadata {
    /// Owner string of the lease.
    pub(crate) owner: String,
    /// Timestamp at which the lease expires.
    pub(crate) expiry: u64,
    /// Timestamp at which the lease was acquired.
    pub(crate) acquired_at: u64,
    /// Timestamp of the most recent update to the lease.
    pub(crate) updated_at: u64,
}

impl LeaseMetadata {
    /// Borrowed view of the lease owner string.
    pub fn owner(&self) -> &str {
        self.owner.as_str()
    }

    /// Timestamp at which the lease expires.
    pub fn expiry(&self) -> u64 {
        let Self { expiry, .. } = self;
        *expiry
    }

    /// Timestamp at which the lease was acquired.
    pub fn acquired_at(&self) -> u64 {
        let Self { acquired_at, .. } = self;
        *acquired_at
    }

    /// Timestamp of the most recent update to the lease.
    pub fn updated_at(&self) -> u64 {
        let Self { updated_at, .. } = self;
        *updated_at
    }
}
157
/// Actor projection record.
///
/// Built when an `ActorRegistered` WAL event is applied and updated in place
/// by `ActorHeartbeat` and `ActorDeregistered` events. Applying another
/// `ActorRegistered` event for the same actor replaces the whole record.
#[derive(Debug, Clone)]
pub struct ActorRecord {
    /// Identifier of the actor; also the key the record is stored under.
    pub actor_id: ActorId,
    /// Identity string carried by the registration event.
    pub identity: String,
    /// Capability strings carried by the registration event.
    pub capabilities: Vec<String>,
    /// Department carried by the registration event, if any.
    pub department: Option<String>,
    /// Heartbeat interval carried by the registration event (seconds, per the
    /// field name).
    pub heartbeat_interval_secs: u64,
    /// Tenant carried by the registration event, if any.
    pub tenant_id: Option<TenantId>,
    /// Timestamp of the registration event.
    pub registered_at: u64,
    /// Timestamp of the most recently applied heartbeat event; `None` until
    /// one has been applied.
    pub last_heartbeat_at: Option<u64>,
    /// Timestamp of the applied deregistration event; `None` until one has
    /// been applied.
    pub deregistered_at: Option<u64>,
}
171
/// Tenant projection record.
///
/// Built when a `TenantCreated` WAL event is applied; a later event for the
/// same tenant identifier replaces the stored record.
#[derive(Debug, Clone)]
pub struct TenantRecord {
    /// Identifier of the tenant; also the key the record is stored under.
    pub tenant_id: TenantId,
    /// Tenant name carried by the creation event.
    pub name: String,
    /// Timestamp of the creation event.
    pub created_at: u64,
}
179
/// Role assignment record.
///
/// Built when a `RoleAssigned` WAL event is applied. Records are keyed by
/// `(actor_id, tenant_id)`, so a later assignment for the same pair replaces
/// the earlier one.
#[derive(Debug, Clone)]
pub struct RoleAssignmentRecord {
    /// Actor the role is assigned to.
    pub actor_id: ActorId,
    /// Role carried by the assignment event.
    pub role: Role,
    /// Tenant within which the role is assigned.
    pub tenant_id: TenantId,
    /// Timestamp of the assignment event.
    pub assigned_at: u64,
}
188
/// Capability grant record.
///
/// Built when a `CapabilityGranted` WAL event is applied and marked revoked
/// by a matching `CapabilityRevoked` event. A later grant for the same
/// actor, capability key and tenant replaces the record and clears
/// `revoked_at`.
#[derive(Debug, Clone)]
pub struct CapabilityGrantRecord {
    /// Actor the capability is granted to.
    pub actor_id: ActorId,
    /// Capability carried by the grant event.
    pub capability: Capability,
    /// Tenant within which the capability is granted.
    pub tenant_id: TenantId,
    /// Timestamp of the grant event.
    pub granted_at: u64,
    /// Timestamp of the revocation event; `None` until one has been applied.
    pub revoked_at: Option<u64>,
}
198
/// Ledger entry record.
///
/// Built when a `LedgerEntryAppended` WAL event is applied; every field is
/// copied from that event. Entries are appended to a list in application
/// order and are not deduplicated by `entry_id` in the reducer.
#[derive(Debug, Clone)]
pub struct LedgerEntryRecord {
    /// Identifier of the ledger entry.
    pub entry_id: LedgerEntryId,
    /// Tenant the entry belongs to.
    pub tenant_id: TenantId,
    /// Ledger key carried by the event.
    pub ledger_key: String,
    /// Actor associated with the entry, if any.
    pub actor_id: Option<ActorId>,
    /// Opaque payload bytes carried by the event.
    pub payload: Vec<u8>,
    /// Timestamp carried by the event.
    pub timestamp: u64,
}
209
/// Budget allocation and consumption record.
///
/// Records are stored per `(task_id, dimension)` pair.
#[derive(Debug, Clone)]
pub struct BudgetRecord {
    /// The budget dimension this record covers.
    pub dimension: BudgetDimension,
    /// The maximum amount allowed before dispatch is blocked.
    pub limit: u64,
    /// The total amount consumed so far.
    pub consumed: u64,
    /// The timestamp when the budget was allocated.
    pub allocated_at: u64,
    /// Whether the budget has been exhausted.
    ///
    /// Normally this follows from consumption (`consumed >= limit`), but it
    /// is not a pure function of the other fields: applying a
    /// `BudgetExhausted` event sets it to `true` unconditionally, which
    /// snapshot bootstrap relies on to restore the flag even when
    /// `consumed < limit`.
    pub exhausted: bool,
}
224
/// Subscription state record.
///
/// Stored in the reducer keyed by `subscription_id`. The record is driven by
/// the `SubscriptionCreated`, `SubscriptionTriggered` and
/// `SubscriptionCanceled` WAL events.
#[derive(Debug, Clone)]
pub struct SubscriptionRecord {
    /// The subscription identifier.
    pub subscription_id: SubscriptionId,
    /// The subscribing task identifier.
    pub task_id: actionqueue_core::ids::TaskId,
    /// The event filter for this subscription.
    pub filter: EventFilter,
    /// The timestamp when the subscription was created.
    pub created_at: u64,
    /// The timestamp when the subscription was last triggered, if any.
    pub triggered_at: Option<u64>,
    /// The timestamp when the subscription was canceled, if any.
    pub canceled_at: Option<u64>,
}
241
// Read-only accessors; the fields themselves are private to this module.
impl TaskRecord {
    /// Returns the canonical task specification.
    pub fn task_spec(&self) -> &actionqueue_core::task::task_spec::TaskSpec {
        &self.task_spec
    }

    /// Returns the task creation timestamp.
    pub fn created_at(&self) -> u64 {
        self.created_at
    }

    /// Returns the task update timestamp, if any.
    pub fn updated_at(&self) -> Option<u64> {
        self.updated_at
    }

    /// Returns the task cancellation timestamp, if canceled.
    ///
    /// `None` means no `TaskCanceled` event has been applied for this task.
    pub fn canceled_at(&self) -> Option<u64> {
        self.canceled_at
    }
}
263
264/// A reducer that applies WAL events to reconstruct state.
265#[derive(Debug, Clone)]
266pub struct ReplayReducer {
267    /// The current state of all runs being replayed.
268    runs: HashMap<actionqueue_core::ids::RunId, RunState>,
269    /// The current state of all tasks being replayed.
270    tasks: HashMap<actionqueue_core::ids::TaskId, TaskRecord>,
271    /// The current state of all run instances being replayed.
272    run_instances:
273        HashMap<actionqueue_core::ids::RunId, actionqueue_core::run::run_instance::RunInstance>,
274    /// Secondary index: task_id → run_ids for O(R_task) lookups.
275    runs_by_task: HashMap<TaskId, Vec<RunId>>,
276    /// Run lifecycle history derived from WAL transitions.
277    run_history: HashMap<actionqueue_core::ids::RunId, Vec<RunStateHistoryEntry>>,
278    /// Attempt lineage derived from WAL attempt events.
279    attempt_history: HashMap<actionqueue_core::ids::RunId, Vec<AttemptHistoryEntry>>,
280    /// The active lease projection for runs (`owner`, `expiry`).
281    leases: HashMap<actionqueue_core::ids::RunId, (String, u64)>,
282    /// Lease metadata derived from WAL lease events.
283    lease_metadata: HashMap<actionqueue_core::ids::RunId, LeaseMetadata>,
284    /// The latest sequence number processed.
285    latest_sequence: u64,
286    /// Task canceled projection keyed by task identifier.
287    task_canceled_at: HashMap<TaskId, u64>,
288    /// Whether engine scheduling/dispatch is currently paused.
289    engine_paused: bool,
290    /// Timestamp of most recent engine pause, if any.
291    engine_paused_at: Option<u64>,
292    /// Timestamp of most recent engine resume, if any.
293    engine_resumed_at: Option<u64>,
294    /// Declared task dependencies: task_id → prerequisite task_ids.
295    /// Rebuilt from `DependencyDeclared` WAL events. Used by the dispatch loop
296    /// to reconstruct the `DependencyGate` after recovery.
297    dependency_declarations: HashMap<TaskId, HashSet<TaskId>>,
298    /// Budget allocation and consumption records keyed by (task_id, dimension).
299    budgets: HashMap<(actionqueue_core::ids::TaskId, BudgetDimension), BudgetRecord>,
300    /// Subscription state records keyed by subscription identifier.
301    subscriptions: HashMap<SubscriptionId, SubscriptionRecord>,
302    /// Actor projection records keyed by actor identifier.
303    actors: HashMap<ActorId, ActorRecord>,
304    /// Tenant projection records keyed by tenant identifier.
305    tenants: HashMap<TenantId, TenantRecord>,
306    /// Role assignment records keyed by (actor_id, tenant_id).
307    role_assignments: HashMap<(ActorId, TenantId), RoleAssignmentRecord>,
308    /// Capability grant records keyed by (actor_id, capability_key, tenant_id).
309    /// Using String as capability key to avoid Hash bound on Capability.
310    capability_grants: HashMap<(ActorId, String, TenantId), CapabilityGrantRecord>,
311    /// Ledger entries (append-only, ordered by insertion).
312    ledger_entries: Vec<LedgerEntryRecord>,
313}
314
315impl ReplayReducer {
316    /// Creates a new replay reducer.
317    pub fn new() -> Self {
318        ReplayReducer {
319            runs: HashMap::new(),
320            tasks: HashMap::new(),
321            run_instances: HashMap::new(),
322            runs_by_task: HashMap::new(),
323            run_history: HashMap::new(),
324            attempt_history: HashMap::new(),
325            leases: HashMap::new(),
326            lease_metadata: HashMap::new(),
327            latest_sequence: 0,
328            task_canceled_at: HashMap::new(),
329            engine_paused: false,
330            engine_paused_at: None,
331            engine_resumed_at: None,
332            dependency_declarations: HashMap::new(),
333            budgets: HashMap::new(),
334            subscriptions: HashMap::new(),
335            actors: HashMap::new(),
336            tenants: HashMap::new(),
337            role_assignments: HashMap::new(),
338            capability_grants: HashMap::new(),
339            ledger_entries: Vec::new(),
340        }
341    }
342
    /// Returns the current lease state for a run as an `(owner, expiry)` pair.
    ///
    /// Returns `None` if the lease projection has no entry for `run_id`,
    /// i.e. no lease is active for the run.
    pub fn get_lease(&self, run_id: &actionqueue_core::ids::RunId) -> Option<&(String, u64)> {
        self.leases.get(run_id)
    }
348
    /// Returns the current lease metadata for a run by ID.
    ///
    /// Returns `None` if the lease-metadata projection has no entry for
    /// `run_id`.
    pub fn get_lease_metadata(
        &self,
        run_id: &actionqueue_core::ids::RunId,
    ) -> Option<&LeaseMetadata> {
        self.lease_metadata.get(run_id)
    }
357
    /// Returns the current state of a run by ID.
    ///
    /// Returns `None` if no run with this identifier has been replayed.
    pub fn get_run_state(&self, run_id: &actionqueue_core::ids::RunId) -> Option<&RunState> {
        self.runs.get(run_id)
    }
362
363    /// Returns the current state of a task by ID.
364    pub fn get_task(
365        &self,
366        task_id: &actionqueue_core::ids::TaskId,
367    ) -> Option<&actionqueue_core::task::task_spec::TaskSpec> {
368        self.tasks.get(task_id).map(TaskRecord::task_spec)
369    }
370
    /// Returns the full task record (specification plus timestamps) by ID.
    ///
    /// Returns `None` if no task with this identifier has been replayed.
    pub fn get_task_record(&self, task_id: &actionqueue_core::ids::TaskId) -> Option<&TaskRecord> {
        self.tasks.get(task_id)
    }
375
    /// Returns the current state of a run instance by ID.
    ///
    /// Returns `None` if no run with this identifier has been replayed.
    pub fn get_run_instance(
        &self,
        run_id: &actionqueue_core::ids::RunId,
    ) -> Option<&actionqueue_core::run::run_instance::RunInstance> {
        self.run_instances.get(run_id)
    }
383
    /// Returns a mutable reference to a run instance by ID.
    ///
    /// This is used during snapshot bootstrap to restore attempt state that
    /// cannot be replayed through normal WAL event application.
    ///
    /// Unlike the read-only lookups, `run_id` is taken by value. Returns
    /// `None` if the run is unknown.
    pub(crate) fn get_run_instance_mut(
        &mut self,
        run_id: actionqueue_core::ids::RunId,
    ) -> Option<&mut actionqueue_core::run::run_instance::RunInstance> {
        self.run_instances.get_mut(&run_id)
    }
394
395    /// Returns the run state history for a run by ID.
396    pub fn get_run_history(
397        &self,
398        run_id: &actionqueue_core::ids::RunId,
399    ) -> Option<&[RunStateHistoryEntry]> {
400        self.run_history.get(run_id).map(Vec::as_slice)
401    }
402
403    /// Returns the attempt history for a run by ID.
404    pub fn get_attempt_history(
405        &self,
406        run_id: &actionqueue_core::ids::RunId,
407    ) -> Option<&[AttemptHistoryEntry]> {
408        self.attempt_history.get(run_id).map(Vec::as_slice)
409    }
410
    /// Returns the latest sequence number that has been applied.
    ///
    /// This is `0` for a freshly constructed reducer and only advances after
    /// an event has been applied successfully.
    pub fn latest_sequence(&self) -> u64 {
        self.latest_sequence
    }
415
    /// Returns the number of runs being tracked.
    ///
    /// Counts entries in the run-instance projection.
    pub fn run_count(&self) -> usize {
        self.run_instances.len()
    }
420
    /// Returns the number of tasks being tracked.
    ///
    /// Canceled tasks remain in the projection and are included in the count.
    pub fn task_count(&self) -> usize {
        self.tasks.len()
    }
425
    /// Returns an iterator over all tracked run instances.
    ///
    /// This method provides access to the run instances projection for
    /// read-only operations like stats collection. Iteration order is that
    /// of the underlying `HashMap` and is therefore unspecified.
    pub fn run_instances(
        &self,
    ) -> impl Iterator<Item = &actionqueue_core::run::run_instance::RunInstance> {
        self.run_instances.values()
    }
435
436    /// Returns run identifiers owned by the provided task in deterministic order.
437    pub fn run_ids_for_task(&self, task_id: TaskId) -> Vec<RunId> {
438        let mut run_ids = self.runs_by_task.get(&task_id).cloned().unwrap_or_default();
439        run_ids.sort();
440        run_ids
441    }
442
443    /// Returns run instances for the provided task via the O(R_task) secondary index.
444    pub fn runs_for_task(
445        &self,
446        task_id: TaskId,
447    ) -> impl Iterator<Item = &actionqueue_core::run::run_instance::RunInstance> {
448        self.runs_by_task
449            .get(&task_id)
450            .into_iter()
451            .flat_map(|ids| ids.iter().filter_map(|id| self.run_instances.get(id)))
452    }
453
454    /// Returns true if the task is marked canceled in projection state.
455    pub fn is_task_canceled(&self, task_id: TaskId) -> bool {
456        self.task_canceled_at.contains_key(&task_id)
457    }
458
    /// Returns the task canceled timestamp, if present.
    ///
    /// `None` means no cancellation has been recorded for `task_id`.
    pub fn task_canceled_at(&self, task_id: TaskId) -> Option<u64> {
        self.task_canceled_at.get(&task_id).copied()
    }
463
    /// Returns true when engine scheduling/dispatch is paused in projection state.
    ///
    /// A freshly constructed reducer reports `false`.
    pub fn is_engine_paused(&self) -> bool {
        self.engine_paused
    }
468
    /// Returns the timestamp of the most recent engine pause event, if any.
    ///
    /// A freshly constructed reducer reports `None`.
    pub fn engine_paused_at(&self) -> Option<u64> {
        self.engine_paused_at
    }
473
    /// Returns the timestamp of the most recent engine resume event, if any.
    ///
    /// A freshly constructed reducer reports `None`.
    pub fn engine_resumed_at(&self) -> Option<u64> {
        self.engine_resumed_at
    }
478
479    /// Returns the dependency declarations for gate reconstruction at bootstrap.
480    ///
481    /// Provides raw prerequisite data so the dispatch loop can rebuild the
482    /// `DependencyGate` without a circular storage → workflow dependency.
483    pub fn dependency_declarations(&self) -> impl Iterator<Item = (TaskId, &HashSet<TaskId>)> + '_ {
484        self.dependency_declarations.iter().map(|(task_id, prereqs)| (*task_id, prereqs))
485    }
486
487    /// Returns (child_task_id, parent_task_id) pairs for hierarchy reconstruction at bootstrap.
488    ///
489    /// Derived from the `parent_task_id` field on each `TaskSpec` stored in the projection.
490    /// The dispatch loop uses these to rebuild the `HierarchyTracker` without a circular
491    /// storage → workflow dependency.
492    pub fn parent_child_mappings(&self) -> impl Iterator<Item = (TaskId, TaskId)> + '_ {
493        self.tasks.values().filter_map(|tr| {
494            tr.task_spec().parent_task_id().map(|parent| (tr.task_spec().id(), parent))
495        })
496    }
497
498    /// Returns an iterator over task IDs and specs.
499    ///
500    /// This method provides access to the tasks projection for
501    /// read-only operations like stats collection.
502    pub fn tasks(&self) -> impl Iterator<Item = &actionqueue_core::task::task_spec::TaskSpec> {
503        self.tasks.values().map(TaskRecord::task_spec)
504    }
505
    /// Returns an iterator over task records with timestamp metadata.
    ///
    /// Iteration order is that of the underlying `HashMap` and is therefore
    /// unspecified.
    pub fn task_records(&self) -> impl Iterator<Item = &TaskRecord> {
        self.tasks.values()
    }
510
511    /// Applies an event to the current state.
512    pub fn apply(&mut self, event: &WalEvent) -> Result<(), ReplayReducerError> {
513        // Validate sequence order (must be monotonically increasing).
514        // This check also prevents duplicate events since a repeated sequence
515        // would not be strictly greater than latest_sequence.
516        if event.sequence() <= self.latest_sequence {
517            return Err(ReplayReducerError::InvalidTransition);
518        }
519
520        match event.event() {
521            WalEventType::TaskCreated { task_spec, timestamp } => {
522                self.apply_task_created(task_spec, *timestamp)?;
523            }
524            WalEventType::RunCreated { run_instance } => {
525                self.apply_run_created(run_instance)?;
526            }
527            WalEventType::RunStateChanged { run_id, previous_state, new_state, timestamp } => {
528                self.apply_run_state_changed(run_id, previous_state, new_state, *timestamp)?;
529            }
530            WalEventType::AttemptStarted { run_id, attempt_id, timestamp } => {
531                self.apply_attempt_started(run_id, attempt_id, *timestamp)?;
532            }
533            WalEventType::AttemptFinished {
534                run_id,
535                attempt_id,
536                result,
537                error,
538                output,
539                timestamp,
540            } => {
541                let outcome =
542                    AttemptOutcome::from_raw_parts(*result, error.clone(), output.clone())
543                        .unwrap_or_else(|e| {
544                            tracing::warn!(
545                                "WAL replay: invalid attempt outcome shape: {e}; falling back to \
546                                 safe reconstruction"
547                            );
548                            match result {
549                                actionqueue_core::mutation::AttemptResultKind::Success => {
550                                    AttemptOutcome::success()
551                                }
552                                actionqueue_core::mutation::AttemptResultKind::Failure => {
553                                    AttemptOutcome::failure(
554                                        error
555                                            .clone()
556                                            .unwrap_or_else(|| "unknown failure".to_string()),
557                                    )
558                                }
559                                actionqueue_core::mutation::AttemptResultKind::Timeout => {
560                                    AttemptOutcome::timeout(
561                                        error
562                                            .clone()
563                                            .unwrap_or_else(|| "unknown timeout".to_string()),
564                                    )
565                                }
566                                actionqueue_core::mutation::AttemptResultKind::Suspended => {
567                                    AttemptOutcome::suspended()
568                                }
569                            }
570                        });
571                self.apply_attempt_finished(run_id, attempt_id, outcome, *timestamp)?;
572            }
573            WalEventType::TaskCanceled { task_id, timestamp } => {
574                self.apply_task_canceled(task_id, *timestamp)?;
575            }
576            WalEventType::RunCanceled { run_id, timestamp } => {
577                self.apply_run_canceled(run_id, *timestamp)?;
578            }
579            WalEventType::LeaseAcquired { run_id, owner, expiry, timestamp } => {
580                self.apply_lease_acquired(run_id, owner.clone(), *expiry, *timestamp)?;
581            }
582            WalEventType::LeaseHeartbeat { run_id, owner, expiry, timestamp } => {
583                self.apply_lease_heartbeat(run_id, owner.clone(), *expiry, *timestamp)?;
584            }
585            WalEventType::LeaseExpired { run_id, owner, expiry, timestamp } => {
586                self.apply_lease_expired(run_id, owner.clone(), *expiry, *timestamp)?;
587            }
588            WalEventType::LeaseReleased { run_id, owner, expiry, timestamp } => {
589                self.apply_lease_released(run_id, owner.clone(), *expiry, *timestamp)?;
590            }
591            WalEventType::EnginePaused { timestamp } => {
592                self.apply_engine_paused(*timestamp)?;
593            }
594            WalEventType::EngineResumed { timestamp } => {
595                self.apply_engine_resumed(*timestamp)?;
596            }
597            WalEventType::DependencyDeclared { task_id, depends_on, .. } => {
598                self.apply_dependency_declared(*task_id, depends_on);
599            }
600            WalEventType::RunSuspended { run_id, reason: _, timestamp } => {
601                self.apply_run_state_changed(
602                    run_id,
603                    &RunState::Running,
604                    &RunState::Suspended,
605                    *timestamp,
606                )?;
607            }
608            WalEventType::RunResumed { run_id, timestamp } => {
609                self.apply_run_state_changed(
610                    run_id,
611                    &RunState::Suspended,
612                    &RunState::Ready,
613                    *timestamp,
614                )?;
615            }
616            WalEventType::BudgetAllocated { task_id, dimension, limit, timestamp } => {
617                self.apply_budget_allocated(*task_id, *dimension, *limit, *timestamp);
618            }
619            WalEventType::BudgetConsumed { task_id, dimension, amount, timestamp } => {
620                self.apply_budget_consumed(*task_id, *dimension, *amount, *timestamp);
621            }
622            WalEventType::BudgetExhausted { task_id, dimension, .. } => {
623                // Exhaustion is normally derived from consumption (consumed >= limit).
624                // This handler exists for snapshot bootstrap: if a snapshot records
625                // exhausted=true with consumed<limit, the bootstrap synthesizes this
626                // event to restore the flag.
627                if let Some(record) = self.budgets.get_mut(&(*task_id, *dimension)) {
628                    record.exhausted = true;
629                }
630            }
631            WalEventType::BudgetReplenished { task_id, dimension, new_limit, timestamp } => {
632                self.apply_budget_replenished(*task_id, *dimension, *new_limit, *timestamp);
633            }
634            WalEventType::SubscriptionCreated { subscription_id, task_id, filter, timestamp } => {
635                self.apply_subscription_created(
636                    *subscription_id,
637                    *task_id,
638                    filter.clone(),
639                    *timestamp,
640                );
641            }
642            WalEventType::SubscriptionTriggered { subscription_id, timestamp } => {
643                self.apply_subscription_triggered(*subscription_id, *timestamp);
644            }
645            WalEventType::SubscriptionCanceled { subscription_id, timestamp } => {
646                self.apply_subscription_canceled(*subscription_id, *timestamp);
647            }
648
649            // ── WAL v5: Actor events ──────────────────────────────────────
650            WalEventType::ActorRegistered {
651                actor_id,
652                identity,
653                capabilities,
654                department,
655                heartbeat_interval_secs,
656                tenant_id,
657                timestamp,
658            } => {
659                self.actors.insert(
660                    *actor_id,
661                    ActorRecord {
662                        actor_id: *actor_id,
663                        identity: identity.clone(),
664                        capabilities: capabilities.clone(),
665                        department: department.clone(),
666                        heartbeat_interval_secs: *heartbeat_interval_secs,
667                        tenant_id: *tenant_id,
668                        registered_at: *timestamp,
669                        last_heartbeat_at: None,
670                        deregistered_at: None,
671                    },
672                );
673            }
674            WalEventType::ActorDeregistered { actor_id, timestamp } => {
675                if let Some(record) = self.actors.get_mut(actor_id) {
676                    record.deregistered_at = Some(*timestamp);
677                }
678            }
679            WalEventType::ActorHeartbeat { actor_id, timestamp } => {
680                if let Some(record) = self.actors.get_mut(actor_id) {
681                    record.last_heartbeat_at = Some(*timestamp);
682                }
683            }
684
685            // ── WAL v5: Platform events ───────────────────────────────────
686            WalEventType::TenantCreated { tenant_id, name, timestamp } => {
687                self.tenants.insert(
688                    *tenant_id,
689                    TenantRecord {
690                        tenant_id: *tenant_id,
691                        name: name.clone(),
692                        created_at: *timestamp,
693                    },
694                );
695            }
696            WalEventType::RoleAssigned { actor_id, role, tenant_id, timestamp } => {
697                self.role_assignments.insert(
698                    (*actor_id, *tenant_id),
699                    RoleAssignmentRecord {
700                        actor_id: *actor_id,
701                        role: role.clone(),
702                        tenant_id: *tenant_id,
703                        assigned_at: *timestamp,
704                    },
705                );
706            }
707            WalEventType::CapabilityGranted { actor_id, capability, tenant_id, timestamp } => {
708                let key = capability_key(capability);
709                self.capability_grants.insert(
710                    (*actor_id, key.clone(), *tenant_id),
711                    CapabilityGrantRecord {
712                        actor_id: *actor_id,
713                        capability: capability.clone(),
714                        tenant_id: *tenant_id,
715                        granted_at: *timestamp,
716                        revoked_at: None,
717                    },
718                );
719            }
720            WalEventType::CapabilityRevoked { actor_id, capability, tenant_id, timestamp } => {
721                let key = capability_key(capability);
722                if let Some(record) = self.capability_grants.get_mut(&(*actor_id, key, *tenant_id))
723                {
724                    record.revoked_at = Some(*timestamp);
725                }
726            }
727            WalEventType::LedgerEntryAppended {
728                entry_id,
729                tenant_id,
730                ledger_key,
731                actor_id,
732                payload,
733                timestamp,
734            } => {
735                self.ledger_entries.push(LedgerEntryRecord {
736                    entry_id: *entry_id,
737                    tenant_id: *tenant_id,
738                    ledger_key: ledger_key.clone(),
739                    actor_id: *actor_id,
740                    payload: payload.clone(),
741                    timestamp: *timestamp,
742                });
743            }
744        }
745
746        // Commit bookkeeping only after semantic application succeeds.
747        self.latest_sequence = event.sequence();
748
749        Ok(())
750    }
751
752    fn apply_task_created(
753        &mut self,
754        task_spec: &actionqueue_core::task::task_spec::TaskSpec,
755        timestamp: u64,
756    ) -> Result<(), ReplayReducerError> {
757        let task_id = task_spec.id();
758        if self.tasks.contains_key(&task_id) {
759            return Err(ReplayReducerError::DuplicateEvent);
760        }
761        self.tasks.insert(
762            task_id,
763            TaskRecord {
764                task_spec: task_spec.clone(),
765                created_at: timestamp,
766                updated_at: None,
767                canceled_at: None,
768            },
769        );
770        Ok(())
771    }
772
773    fn apply_task_canceled(
774        &mut self,
775        task_id: &TaskId,
776        timestamp: u64,
777    ) -> Result<(), ReplayReducerError> {
778        let task_record = self.tasks.get_mut(task_id).ok_or(ReplayReducerError::TaskCausality(
779            TaskCausalityError::UnknownTask { task_id: *task_id },
780        ))?;
781
782        if let Some(existing_timestamp) = task_record.canceled_at {
783            return Err(ReplayReducerError::TaskCausality(TaskCausalityError::AlreadyCanceled {
784                task_id: *task_id,
785                first_canceled_at: existing_timestamp,
786                duplicate_canceled_at: timestamp,
787            }));
788        }
789
790        task_record.canceled_at = Some(timestamp);
791        self.task_canceled_at.insert(*task_id, timestamp);
792        Ok(())
793    }
794
795    fn apply_run_created(
796        &mut self,
797        run_instance: &actionqueue_core::run::run_instance::RunInstance,
798    ) -> Result<(), ReplayReducerError> {
799        let run_id = run_instance.id();
800        if self.runs.contains_key(&run_id) {
801            return Err(ReplayReducerError::DuplicateEvent);
802        }
803        // Verify run state is valid (must start in Scheduled state)
804        if run_instance.state() != RunState::Scheduled {
805            return Err(ReplayReducerError::InvalidTransition);
806        }
807        let task_id = run_instance.task_id();
808        self.runs.insert(run_id, run_instance.state());
809        self.run_instances.insert(run_id, run_instance.clone());
810        self.runs_by_task.entry(task_id).or_default().push(run_id);
811        self.run_history.insert(
812            run_id,
813            vec![RunStateHistoryEntry {
814                from: None,
815                to: RunState::Scheduled,
816                timestamp: run_instance.created_at(),
817            }],
818        );
819        self.attempt_history.insert(run_id, Vec::new());
820        Ok(())
821    }
822
823    fn apply_run_state_changed(
824        &mut self,
825        run_id: &actionqueue_core::ids::RunId,
826        previous_state: &RunState,
827        new_state: &RunState,
828        timestamp: u64,
829    ) -> Result<(), ReplayReducerError> {
830        // Check that the run exists
831        let current_state = self.runs.get(run_id).ok_or(ReplayReducerError::InvalidTransition)?;
832
833        // Verify the transition matches what was recorded
834        if *current_state != *previous_state {
835            return Err(ReplayReducerError::InvalidTransition);
836        }
837
838        // Verify the transition is valid according to run transitions
839        if !is_valid_transition(*previous_state, *new_state) {
840            return Err(ReplayReducerError::InvalidTransition);
841        }
842
843        // Attempt lifecycle integrity:
844        // - entering Running requires no active attempt yet
845        // - leaving Running requires the active attempt to be closed first
846        if let Some(run_instance) = self.run_instances.get(run_id) {
847            if run_instance.state() != *current_state {
848                return Err(ReplayReducerError::CorruptedData);
849            }
850
851            if *new_state == RunState::Running && run_instance.current_attempt_id().is_some() {
852                return Err(ReplayReducerError::InvalidTransition);
853            }
854
855            if *current_state == RunState::Running
856                && *new_state != RunState::Running
857                && *new_state != RunState::Canceled
858                && run_instance.current_attempt_id().is_some()
859            {
860                return Err(ReplayReducerError::InvalidTransition);
861            }
862        }
863
864        // Update the run state
865        self.runs.insert(*run_id, *new_state);
866
867        // Update the run instance state if present
868        if let Some(run_instance) = self.run_instances.get_mut(run_id) {
869            run_instance.transition_to(*new_state).map_err(Self::map_run_instance_error)?;
870            run_instance.record_state_change_at(timestamp);
871        }
872
873        let history = self.run_history.get_mut(run_id).ok_or(ReplayReducerError::CorruptedData)?;
874        history.push(RunStateHistoryEntry {
875            from: Some(*previous_state),
876            to: *new_state,
877            timestamp,
878        });
879
880        self.clear_lease_projection_if_terminal(run_id, *new_state)?;
881
882        Ok(())
883    }
884
885    fn apply_attempt_started(
886        &mut self,
887        run_id: &actionqueue_core::ids::RunId,
888        attempt_id: &actionqueue_core::ids::AttemptId,
889        timestamp: u64,
890    ) -> Result<(), ReplayReducerError> {
891        // Verify the run exists and is actively running.
892        let current_state = self.runs.get(run_id).ok_or(ReplayReducerError::InvalidTransition)?;
893        if *current_state != RunState::Running {
894            return Err(ReplayReducerError::InvalidTransition);
895        }
896
897        let run_instance =
898            self.run_instances.get_mut(run_id).ok_or(ReplayReducerError::CorruptedData)?;
899
900        if run_instance.state() != RunState::Running {
901            return Err(ReplayReducerError::CorruptedData);
902        }
903
904        if run_instance.current_attempt_id().is_some() {
905            return Err(ReplayReducerError::InvalidTransition);
906        }
907
908        run_instance.start_attempt(*attempt_id).map_err(Self::map_run_instance_error)?;
909
910        let attempts = self.attempt_history.entry(*run_id).or_default();
911        attempts.push(AttemptHistoryEntry {
912            attempt_id: *attempt_id,
913            started_at: timestamp,
914            finished_at: None,
915            result: None,
916            error: None,
917            output: None,
918        });
919
920        Ok(())
921    }
922
923    fn apply_attempt_finished(
924        &mut self,
925        run_id: &actionqueue_core::ids::RunId,
926        attempt_id: &actionqueue_core::ids::AttemptId,
927        outcome: AttemptOutcome,
928        timestamp: u64,
929    ) -> Result<(), ReplayReducerError> {
930        // Verify the run exists and is actively running.
931        let current_state = self.runs.get(run_id).ok_or(ReplayReducerError::InvalidTransition)?;
932        if *current_state != RunState::Running {
933            return Err(ReplayReducerError::InvalidTransition);
934        }
935
936        let run_instance =
937            self.run_instances.get_mut(run_id).ok_or(ReplayReducerError::CorruptedData)?;
938
939        if run_instance.state() != RunState::Running {
940            return Err(ReplayReducerError::CorruptedData);
941        }
942
943        run_instance.finish_attempt(*attempt_id).map_err(Self::map_run_instance_error)?;
944
945        let attempts =
946            self.attempt_history.get_mut(run_id).ok_or(ReplayReducerError::CorruptedData)?;
947        let entry = attempts
948            .iter_mut()
949            .find(|entry| entry.attempt_id == *attempt_id)
950            .ok_or(ReplayReducerError::CorruptedData)?;
951
952        if entry.finished_at.is_some() {
953            return Err(ReplayReducerError::CorruptedData);
954        }
955
956        entry.finished_at = Some(timestamp);
957        let (result_kind, error_detail, output) = outcome.into_parts();
958        entry.result = Some(result_kind);
959        entry.error = error_detail;
960        entry.output = output;
961
962        Ok(())
963    }
964
965    fn apply_run_canceled(
966        &mut self,
967        run_id: &actionqueue_core::ids::RunId,
968        timestamp: u64,
969    ) -> Result<(), ReplayReducerError> {
970        let current_state = self.runs.get(run_id).ok_or(ReplayReducerError::InvalidTransition)?;
971        let previous_state = *current_state;
972
973        // Cancellation must be a valid transition from the current state.
974        if !is_valid_transition(previous_state, RunState::Canceled) {
975            return Err(ReplayReducerError::InvalidTransition);
976        }
977
978        self.runs.insert(*run_id, RunState::Canceled);
979
980        let run_instance =
981            self.run_instances.get_mut(run_id).ok_or(ReplayReducerError::CorruptedData)?;
982        run_instance.transition_to(RunState::Canceled).map_err(Self::map_run_instance_error)?;
983        run_instance.record_state_change_at(timestamp);
984
985        let history = self.run_history.get_mut(run_id).ok_or(ReplayReducerError::CorruptedData)?;
986        history.push(RunStateHistoryEntry {
987            from: Some(previous_state),
988            to: RunState::Canceled,
989            timestamp,
990        });
991
992        self.clear_lease_projection_if_terminal(run_id, RunState::Canceled)?;
993
994        Ok(())
995    }
996
997    fn apply_lease_acquired(
998        &mut self,
999        run_id: &actionqueue_core::ids::RunId,
1000        owner: String,
1001        expiry: u64,
1002        timestamp: u64,
1003    ) -> Result<(), ReplayReducerError> {
1004        self.validate_lease_run_precondition(run_id, LeaseEventKind::Acquire)?;
1005
1006        if self.leases.contains_key(run_id) {
1007            return Err(ReplayReducerError::LeaseCausality(
1008                LeaseCausalityError::LeaseAlreadyActive { run_id: *run_id },
1009            ));
1010        }
1011
1012        let metadata_owner = owner.clone();
1013        self.leases.insert(*run_id, (owner, expiry));
1014        self.lease_metadata.insert(
1015            *run_id,
1016            LeaseMetadata {
1017                owner: metadata_owner,
1018                expiry,
1019                acquired_at: timestamp,
1020                updated_at: timestamp,
1021            },
1022        );
1023        Ok(())
1024    }
1025
1026    fn apply_lease_heartbeat(
1027        &mut self,
1028        run_id: &actionqueue_core::ids::RunId,
1029        owner: String,
1030        expiry: u64,
1031        timestamp: u64,
1032    ) -> Result<(), ReplayReducerError> {
1033        self.validate_lease_run_precondition(run_id, LeaseEventKind::Heartbeat)?;
1034
1035        let (current_owner, current_expiry) =
1036            self.active_lease_snapshot(run_id, LeaseEventKind::Heartbeat)?;
1037
1038        if owner != current_owner {
1039            return Err(ReplayReducerError::LeaseCausality(LeaseCausalityError::OwnerMismatch {
1040                run_id: *run_id,
1041                event: LeaseEventKind::Heartbeat,
1042                expected_owner: current_owner,
1043                actual_owner: owner,
1044            }));
1045        }
1046
1047        if expiry < current_expiry {
1048            return Err(ReplayReducerError::LeaseCausality(
1049                LeaseCausalityError::NonMonotonicHeartbeatExpiry {
1050                    run_id: *run_id,
1051                    previous_expiry: current_expiry,
1052                    proposed_expiry: expiry,
1053                },
1054            ));
1055        }
1056
1057        self.leases.insert(*run_id, (current_owner, expiry));
1058        let metadata =
1059            self.lease_metadata.get_mut(run_id).ok_or(ReplayReducerError::CorruptedData)?;
1060        metadata.expiry = expiry;
1061        metadata.updated_at = timestamp;
1062        Ok(())
1063    }
1064
1065    fn apply_lease_expired(
1066        &mut self,
1067        run_id: &actionqueue_core::ids::RunId,
1068        owner: String,
1069        expiry: u64,
1070        _timestamp: u64,
1071    ) -> Result<(), ReplayReducerError> {
1072        let current_state = self.validate_lease_run_precondition(run_id, LeaseEventKind::Expire)?;
1073
1074        self.validate_exact_lease_metadata_match(run_id, LeaseEventKind::Expire, &owner, expiry)?;
1075
1076        self.leases.remove(run_id);
1077        self.lease_metadata.remove(run_id);
1078
1079        if current_state == RunState::Leased {
1080            self.transition_run_to_ready_after_lease_close(run_id)?;
1081        }
1082
1083        Ok(())
1084    }
1085
1086    fn apply_lease_released(
1087        &mut self,
1088        run_id: &actionqueue_core::ids::RunId,
1089        owner: String,
1090        expiry: u64,
1091        _timestamp: u64,
1092    ) -> Result<(), ReplayReducerError> {
1093        let current_state =
1094            self.validate_lease_run_precondition(run_id, LeaseEventKind::Release)?;
1095
1096        self.validate_exact_lease_metadata_match(run_id, LeaseEventKind::Release, &owner, expiry)?;
1097
1098        self.leases.remove(run_id);
1099        self.lease_metadata.remove(run_id);
1100
1101        if current_state == RunState::Leased {
1102            self.transition_run_to_ready_after_lease_close(run_id)?;
1103        }
1104
1105        Ok(())
1106    }
1107
1108    /// Validates that a lease event references a known run and that the run is in a state
1109    /// allowed for the lease event kind.
1110    fn validate_lease_run_precondition(
1111        &self,
1112        run_id: &RunId,
1113        event: LeaseEventKind,
1114    ) -> Result<RunState, ReplayReducerError> {
1115        let run_state =
1116            self.runs.get(run_id).copied().ok_or(ReplayReducerError::LeaseCausality(
1117                LeaseCausalityError::UnknownRun { run_id: *run_id, event },
1118            ))?;
1119
1120        if !Self::is_allowed_lease_event_state(event, run_state) {
1121            return Err(ReplayReducerError::LeaseCausality(LeaseCausalityError::InvalidRunState {
1122                run_id: *run_id,
1123                event,
1124                state: run_state,
1125            }));
1126        }
1127
1128        Ok(run_state)
1129    }
1130
1131    /// Returns true when `run_state` is an allowed precondition for the lease `event`.
1132    fn is_allowed_lease_event_state(event: LeaseEventKind, run_state: RunState) -> bool {
1133        match event {
1134            LeaseEventKind::Acquire => matches!(run_state, RunState::Ready | RunState::Leased),
1135            LeaseEventKind::Heartbeat => matches!(run_state, RunState::Leased | RunState::Running),
1136            LeaseEventKind::Expire | LeaseEventKind::Release => {
1137                matches!(run_state, RunState::Ready | RunState::Leased | RunState::Running)
1138            }
1139        }
1140    }
1141
1142    /// Returns a cloned snapshot of the current active lease metadata for a run.
1143    fn active_lease_snapshot(
1144        &self,
1145        run_id: &RunId,
1146        event: LeaseEventKind,
1147    ) -> Result<(String, u64), ReplayReducerError> {
1148        self.leases.get(run_id).cloned().ok_or(ReplayReducerError::LeaseCausality(
1149            LeaseCausalityError::MissingActiveLease { run_id: *run_id, event },
1150        ))
1151    }
1152
1153    /// Requires exact owner and expiry equality between lease event metadata and projection.
1154    fn validate_exact_lease_metadata_match(
1155        &self,
1156        run_id: &RunId,
1157        event: LeaseEventKind,
1158        owner: &str,
1159        expiry: u64,
1160    ) -> Result<(), ReplayReducerError> {
1161        let (expected_owner, expected_expiry) = self.active_lease_snapshot(run_id, event)?;
1162
1163        if owner != expected_owner {
1164            return Err(ReplayReducerError::LeaseCausality(LeaseCausalityError::OwnerMismatch {
1165                run_id: *run_id,
1166                event,
1167                expected_owner,
1168                actual_owner: owner.to_string(),
1169            }));
1170        }
1171
1172        if expiry != expected_expiry {
1173            return Err(ReplayReducerError::LeaseCausality(LeaseCausalityError::ExpiryMismatch {
1174                run_id: *run_id,
1175                event,
1176                expected_expiry,
1177                actual_expiry: expiry,
1178            }));
1179        }
1180
1181        Ok(())
1182    }
1183
1184    /// Transitions the run projection from `Leased` to `Ready` after a lease close event.
1185    fn transition_run_to_ready_after_lease_close(
1186        &mut self,
1187        run_id: &RunId,
1188    ) -> Result<(), ReplayReducerError> {
1189        let run_instance =
1190            self.run_instances.get_mut(run_id).ok_or(ReplayReducerError::CorruptedData)?;
1191
1192        if run_instance.state() != RunState::Leased {
1193            return Err(ReplayReducerError::CorruptedData);
1194        }
1195
1196        run_instance.transition_to(RunState::Ready).map_err(Self::map_run_instance_error)?;
1197        self.runs.insert(*run_id, RunState::Ready);
1198        Ok(())
1199    }
1200
1201    fn apply_engine_paused(&mut self, timestamp: u64) -> Result<(), ReplayReducerError> {
1202        if self.engine_paused {
1203            return Err(ReplayReducerError::EngineControlCausality(
1204                EngineControlCausalityError::AlreadyPaused {
1205                    first_paused_at: self.engine_paused_at,
1206                    duplicate_paused_at: timestamp,
1207                },
1208            ));
1209        }
1210
1211        self.engine_paused = true;
1212        self.engine_paused_at = Some(timestamp);
1213        self.engine_resumed_at = None;
1214        Ok(())
1215    }
1216
1217    fn apply_engine_resumed(&mut self, timestamp: u64) -> Result<(), ReplayReducerError> {
1218        if !self.engine_paused {
1219            return Err(ReplayReducerError::EngineControlCausality(
1220                EngineControlCausalityError::NotPaused { attempted_resumed_at: timestamp },
1221            ));
1222        }
1223
1224        if let Some(paused_at) = self.engine_paused_at {
1225            if timestamp < paused_at {
1226                return Err(ReplayReducerError::EngineControlCausality(
1227                    EngineControlCausalityError::ResumeBeforePause {
1228                        paused_at,
1229                        resumed_at: timestamp,
1230                    },
1231                ));
1232            }
1233        }
1234
1235        self.engine_paused = false;
1236        self.engine_resumed_at = Some(timestamp);
1237        Ok(())
1238    }
1239
1240    fn apply_dependency_declared(&mut self, task_id: TaskId, depends_on: &[TaskId]) {
1241        if !self.tasks.contains_key(&task_id) {
1242            tracing::warn!(
1243                %task_id,
1244                "dependency declaration for unknown task during WAL replay"
1245            );
1246        }
1247        for prereq in depends_on {
1248            if !self.tasks.contains_key(prereq) {
1249                tracing::warn!(
1250                    %task_id,
1251                    prereq_id = %prereq,
1252                    "dependency prerequisite references unknown task during WAL replay"
1253                );
1254            }
1255        }
1256        // NOTE: infallible by design — dependency accumulation cannot fail
1257        let entry = self.dependency_declarations.entry(task_id).or_default();
1258        for &prereq in depends_on {
1259            entry.insert(prereq);
1260        }
1261    }
1262
1263    /// Clears active lease projection whenever a run enters terminal state and asserts that
1264    /// terminal/lease coexistence is impossible post-apply.
1265    fn clear_lease_projection_if_terminal(
1266        &mut self,
1267        run_id: &RunId,
1268        new_state: RunState,
1269    ) -> Result<(), ReplayReducerError> {
1270        if !new_state.is_terminal() {
1271            return Ok(());
1272        }
1273
1274        self.leases.remove(run_id);
1275        self.lease_metadata.remove(run_id);
1276
1277        Ok(())
1278    }
1279
1280    /// Removes history entries for runs that are in terminal state (Completed, Failed, Canceled).
1281    ///
1282    /// This should be called after a snapshot write successfully captures the current state,
1283    /// since the snapshot will contain the full history for these terminal runs.
1284    /// Active (non-terminal) run histories are preserved intact.
1285    pub fn trim_terminal_history(&mut self) {
1286        let terminal_run_ids: Vec<RunId> =
1287            self.runs.iter().filter(|(_, state)| state.is_terminal()).map(|(id, _)| *id).collect();
1288
1289        for run_id in &terminal_run_ids {
1290            self.run_history.remove(run_id);
1291            self.attempt_history.remove(run_id);
1292            self.lease_metadata.remove(run_id);
1293        }
1294    }
1295
    /// Seeds the run history for a run from validated snapshot data.
    ///
    /// The history is stored under `run_id` with a plain map insert, so any
    /// history already stored for that run is replaced.
    pub(crate) fn set_run_history(&mut self, run_id: RunId, history: Vec<RunStateHistoryEntry>) {
        self.run_history.insert(run_id, history);
    }
1300
    /// Seeds the attempt history for a run from validated snapshot data.
    ///
    /// The attempts are stored under `run_id` with a plain map insert, so any
    /// attempt history already stored for that run is replaced.
    pub(crate) fn set_attempt_history(
        &mut self,
        run_id: RunId,
        attempts: Vec<AttemptHistoryEntry>,
    ) {
        self.attempt_history.insert(run_id, attempts);
    }
1309
1310    /// Seeds both the active lease projection and lease metadata from snapshot data.
1311    ///
1312    /// During snapshot bootstrap, `set_lease_metadata` alone is insufficient because
1313    /// `get_lease()` queries the `leases` map (not `lease_metadata`). This method
1314    /// populates both maps so that post-bootstrap lease queries (e.g. heartbeat
1315    /// causality checks) find the correct owner/expiry state.
1316    pub(crate) fn set_lease_for_bootstrap(&mut self, run_id: RunId, metadata: LeaseMetadata) {
1317        self.leases.insert(run_id, (metadata.owner.clone(), metadata.expiry));
1318        self.lease_metadata.insert(run_id, metadata);
1319    }
1320
    /// Sets reducer sequence during trusted bootstrap hydration.
    ///
    /// The value is written to `latest_sequence` without any validation against
    /// the sequence currently stored.
    pub(crate) fn set_latest_sequence_for_bootstrap(&mut self, sequence: u64) {
        self.latest_sequence = sequence;
    }
1325
1326    fn map_run_instance_error(error: RunInstanceError) -> ReplayReducerError {
1327        match error {
1328            RunInstanceError::AttemptCountOverflow { .. } => ReplayReducerError::CorruptedData,
1329            _ => ReplayReducerError::InvalidTransition,
1330        }
1331    }
1332
    /// Returns an iterator over all budget records.
    ///
    /// Each item pairs the `(task, dimension)` key with its budget record.
    pub fn budgets(
        &self,
    ) -> impl Iterator<Item = (&(actionqueue_core::ids::TaskId, BudgetDimension), &BudgetRecord)>
    {
        self.budgets.iter()
    }
1340
    /// Returns the budget record for a specific (task, dimension) pair, if any.
    ///
    /// Returns `None` when no budget is projected for that pair.
    pub fn get_budget(
        &self,
        task_id: &actionqueue_core::ids::TaskId,
        dimension: BudgetDimension,
    ) -> Option<&BudgetRecord> {
        self.budgets.get(&(*task_id, dimension))
    }
1349
1350    /// Returns true when the specified budget has been exhausted.
1351    pub fn is_budget_exhausted(
1352        &self,
1353        task_id: actionqueue_core::ids::TaskId,
1354        dimension: BudgetDimension,
1355    ) -> bool {
1356        self.budgets.get(&(task_id, dimension)).is_some_and(|r| r.exhausted)
1357    }
1358
    /// Returns true when a budget allocation exists for the specified (task, dimension) pair.
    ///
    /// Only presence is checked; the record's consumption and exhaustion state
    /// are not considered.
    pub fn budget_allocation_exists(
        &self,
        task_id: actionqueue_core::ids::TaskId,
        dimension: BudgetDimension,
    ) -> bool {
        self.budgets.contains_key(&(task_id, dimension))
    }
1367
    /// Returns an iterator over all subscription records.
    ///
    /// Each item pairs the subscription identifier with its record; canceled and
    /// triggered subscriptions are included.
    pub fn subscriptions(&self) -> impl Iterator<Item = (&SubscriptionId, &SubscriptionRecord)> {
        self.subscriptions.iter()
    }
1372
    /// Returns the subscription record for the given identifier, if any.
    ///
    /// Returns `None` when the subscription is not in the projection.
    pub fn get_subscription(
        &self,
        subscription_id: &SubscriptionId,
    ) -> Option<&SubscriptionRecord> {
        self.subscriptions.get(subscription_id)
    }
1380
    /// Returns true when the specified subscription exists in the projection.
    ///
    /// Only presence is checked, so a canceled subscription still counts as
    /// existing.
    pub fn subscription_exists(&self, subscription_id: SubscriptionId) -> bool {
        self.subscriptions.contains_key(&subscription_id)
    }
1385
1386    /// Returns true when the specified subscription has been canceled.
1387    pub fn is_subscription_canceled(&self, subscription_id: SubscriptionId) -> bool {
1388        self.subscriptions.get(&subscription_id).is_some_and(|r| r.canceled_at.is_some())
1389    }
1390
    /// Returns an iterator over all actor records.
    ///
    /// Each item pairs the actor identifier with its record.
    pub fn actors(&self) -> impl Iterator<Item = (&ActorId, &ActorRecord)> {
        self.actors.iter()
    }
1395
    /// Returns the actor record for the given identifier, if any.
    ///
    /// Returns `None` when the actor is not in the projection.
    pub fn get_actor(&self, actor_id: &ActorId) -> Option<&ActorRecord> {
        self.actors.get(actor_id)
    }
1400
1401    /// Returns true when the actor is registered and active.
1402    pub fn is_actor_active(&self, actor_id: ActorId) -> bool {
1403        self.actors.get(&actor_id).is_some_and(|r| r.deregistered_at.is_none())
1404    }
1405
    /// Returns an iterator over all tenant records.
    ///
    /// Each item pairs the tenant identifier with its record.
    pub fn tenants(&self) -> impl Iterator<Item = (&TenantId, &TenantRecord)> {
        self.tenants.iter()
    }
1410
    /// Returns the tenant record for the given identifier, if any.
    ///
    /// Returns `None` when the tenant is not in the projection.
    pub fn get_tenant(&self, tenant_id: &TenantId) -> Option<&TenantRecord> {
        self.tenants.get(tenant_id)
    }
1415
    /// Returns true when the tenant exists in the projection.
    ///
    /// This is a pure presence check on the tenant map.
    pub fn tenant_exists(&self, tenant_id: TenantId) -> bool {
        self.tenants.contains_key(&tenant_id)
    }
1420
    /// Returns the role assignment record for an actor in a tenant, if any.
    ///
    /// Assignments are keyed by the `(actor_id, tenant_id)` pair; `None` is
    /// returned when no record exists for that pair.
    pub fn get_role_assignment(
        &self,
        actor_id: ActorId,
        tenant_id: TenantId,
    ) -> Option<&RoleAssignmentRecord> {
        self.role_assignments.get(&(actor_id, tenant_id))
    }
1429
1430    /// Returns true when an actor has a given capability in a tenant.
1431    pub fn actor_has_capability(
1432        &self,
1433        actor_id: ActorId,
1434        capability_key: &str,
1435        tenant_id: TenantId,
1436    ) -> bool {
1437        let key = (actor_id, capability_key.to_string(), tenant_id);
1438        self.capability_grants.get(&key).is_some_and(|r| r.revoked_at.is_none())
1439    }
1440
    /// Returns an iterator over all role assignment records.
    ///
    /// Only the records are yielded; the `(actor, tenant)` keys are not exposed.
    pub fn role_assignments(&self) -> impl Iterator<Item = &RoleAssignmentRecord> {
        self.role_assignments.values()
    }
1446
    /// Returns an iterator over all capability grant records.
    ///
    /// Only the records are yielded, and revoked grants are included; check
    /// `revoked_at` on each record to tell them apart.
    pub fn capability_grants(&self) -> impl Iterator<Item = &CapabilityGrantRecord> {
        self.capability_grants.values()
    }
1451
    /// Returns an iterator over all ledger entry records.
    ///
    /// Records are pushed onto the underlying collection as
    /// `LedgerEntryAppended` events are applied.
    pub fn ledger_entries(&self) -> impl Iterator<Item = &LedgerEntryRecord> {
        self.ledger_entries.iter()
    }
1455
1456    fn apply_budget_allocated(
1457        &mut self,
1458        task_id: actionqueue_core::ids::TaskId,
1459        dimension: BudgetDimension,
1460        limit: u64,
1461        timestamp: u64,
1462    ) {
1463        self.budgets.insert(
1464            (task_id, dimension),
1465            BudgetRecord {
1466                dimension,
1467                limit,
1468                consumed: 0,
1469                allocated_at: timestamp,
1470                exhausted: false,
1471            },
1472        );
1473    }
1474
1475    fn apply_budget_consumed(
1476        &mut self,
1477        task_id: actionqueue_core::ids::TaskId,
1478        dimension: BudgetDimension,
1479        amount: u64,
1480        _timestamp: u64,
1481    ) {
1482        if let Some(record) = self.budgets.get_mut(&(task_id, dimension)) {
1483            record.consumed = record.consumed.saturating_add(amount);
1484            if record.consumed >= record.limit {
1485                record.exhausted = true;
1486            }
1487        }
1488    }
1489
1490    // NOTE: `BudgetExhausted` WAL events are never appended to the WAL on disk.
1491    // Exhaustion is normally a derived projection: `apply_budget_consumed` sets
1492    // `exhausted = true` when `consumed >= limit`. The `BudgetExhausted` event
1493    // may be synthesized during snapshot bootstrap for reducer replay (see
1494    // `bootstrap.rs`) to restore the flag when consumed < limit.
1495
1496    fn apply_budget_replenished(
1497        &mut self,
1498        task_id: actionqueue_core::ids::TaskId,
1499        dimension: BudgetDimension,
1500        new_limit: u64,
1501        _timestamp: u64,
1502    ) {
1503        if let Some(record) = self.budgets.get_mut(&(task_id, dimension)) {
1504            record.limit = new_limit;
1505            record.consumed = 0;
1506            record.exhausted = false;
1507        }
1508    }
1509
1510    fn apply_subscription_created(
1511        &mut self,
1512        subscription_id: SubscriptionId,
1513        task_id: actionqueue_core::ids::TaskId,
1514        filter: EventFilter,
1515        timestamp: u64,
1516    ) {
1517        self.subscriptions.insert(
1518            subscription_id,
1519            SubscriptionRecord {
1520                subscription_id,
1521                task_id,
1522                filter,
1523                created_at: timestamp,
1524                triggered_at: None,
1525                canceled_at: None,
1526            },
1527        );
1528    }
1529
1530    fn apply_subscription_triggered(&mut self, subscription_id: SubscriptionId, timestamp: u64) {
1531        if let Some(record) = self.subscriptions.get_mut(&subscription_id) {
1532            record.triggered_at = Some(timestamp);
1533        }
1534    }
1535
1536    fn apply_subscription_canceled(&mut self, subscription_id: SubscriptionId, timestamp: u64) {
1537        if let Some(record) = self.subscriptions.get_mut(&subscription_id) {
1538            record.canceled_at = Some(timestamp);
1539        }
1540    }
1541}
1542
impl Default for ReplayReducer {
    /// Builds a reducer by delegating to [`ReplayReducer::new`].
    fn default() -> Self {
        Self::new()
    }
}
1548
/// Errors that can occur during replay reduction.
///
/// The unit variants carry no context. The `*Causality` variants wrap a typed
/// error that identifies the violated invariant and the offending identifiers
/// or timestamps.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ReplayReducerError {
    /// Invalid state transition during replay.
    ///
    /// Also returned when a run or attempt event references a run that is not
    /// in the state projection.
    InvalidTransition,
    /// Duplicate event detected.
    ///
    /// Returned when a task or run with the same identifier is already
    /// projected.
    DuplicateEvent,
    /// Corrupted event data.
    ///
    /// Returned when the projection maps disagree with each other or a required
    /// entry (run instance, run history, attempt history, lease metadata) is
    /// missing.
    CorruptedData,
    /// Lease replay causality invariant violation.
    LeaseCausality(LeaseCausalityError),
    /// Task replay causality invariant violation.
    TaskCausality(TaskCausalityError),
    /// Engine control replay causality invariant violation.
    EngineControlCausality(EngineControlCausalityError),
}
1565
/// Typed engine control replay causality failures.
///
/// Each variant carries the timestamps needed to report which pause/resume
/// ordering rule was broken.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum EngineControlCausalityError {
    /// Engine pause event was applied while already paused.
    AlreadyPaused {
        /// Timestamp of previously applied pause event, if known.
        first_paused_at: Option<u64>,
        /// Timestamp carried by duplicate pause event.
        duplicate_paused_at: u64,
    },
    /// Engine resume event was applied while engine is not paused.
    NotPaused {
        /// Timestamp carried by invalid resume event.
        attempted_resumed_at: u64,
    },
    /// Engine resume timestamp violated pause->resume ordering.
    ///
    /// Raised when the resume timestamp is strictly earlier than the pause
    /// timestamp.
    ResumeBeforePause {
        /// Timestamp of currently active pause event.
        paused_at: u64,
        /// Timestamp carried by resume event.
        resumed_at: u64,
    },
}
1589
1590impl std::fmt::Display for EngineControlCausalityError {
1591    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1592        match self {
1593            EngineControlCausalityError::AlreadyPaused { first_paused_at, duplicate_paused_at } => {
1594                if let Some(first_paused_at) = first_paused_at {
1595                    write!(
1596                        f,
1597                        "engine pause rejected: already paused at {first_paused_at}, duplicate \
1598                         timestamp {duplicate_paused_at}"
1599                    )
1600                } else {
1601                    write!(
1602                        f,
1603                        "engine pause rejected: already paused, duplicate timestamp \
1604                         {duplicate_paused_at}"
1605                    )
1606                }
1607            }
1608            EngineControlCausalityError::NotPaused { attempted_resumed_at } => {
1609                write!(
1610                    f,
1611                    "engine resume rejected: engine not paused at timestamp {attempted_resumed_at}"
1612                )
1613            }
1614            EngineControlCausalityError::ResumeBeforePause { paused_at, resumed_at } => {
1615                write!(
1616                    f,
1617                    "engine resume rejected: resumed_at {resumed_at} precedes paused_at \
1618                     {paused_at}"
1619                )
1620            }
1621        }
1622    }
1623}
1624
/// Typed task replay causality failures.
///
/// Both variants describe a rejected task cancellation and identify the task
/// the event referred to.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TaskCausalityError {
    /// Task cancellation references a task ID that has not been created.
    UnknownTask {
        /// Task ID referenced by the event.
        task_id: TaskId,
    },
    /// Task cancellation for a task that is already canceled.
    AlreadyCanceled {
        /// Canceled task identifier.
        task_id: TaskId,
        /// First applied cancellation timestamp.
        first_canceled_at: u64,
        /// Duplicate cancellation timestamp from the current event.
        duplicate_canceled_at: u64,
    },
}
1643
1644impl std::fmt::Display for TaskCausalityError {
1645    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1646        match self {
1647            TaskCausalityError::UnknownTask { task_id } => {
1648                write!(f, "task canceled rejected: unknown task {task_id}")
1649            }
1650            TaskCausalityError::AlreadyCanceled {
1651                task_id,
1652                first_canceled_at,
1653                duplicate_canceled_at,
1654            } => {
1655                write!(
1656                    f,
1657                    "task canceled rejected for task {task_id}: already canceled at \
1658                     {first_canceled_at}, duplicate timestamp {duplicate_canceled_at}"
1659                )
1660            }
1661        }
1662    }
1663}
1664
/// Lease event kinds used by typed causality errors.
///
/// The `Display` implementation renders each kind as a lowercase phrase
/// (for example `lease acquire`) that lease causality messages use as a prefix.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LeaseEventKind {
    /// Lease acquired event.
    Acquire,
    /// Lease heartbeat event.
    Heartbeat,
    /// Lease expired event.
    Expire,
    /// Lease released event.
    Release,
}
1677
1678impl std::fmt::Display for LeaseEventKind {
1679    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1680        match self {
1681            LeaseEventKind::Acquire => write!(f, "lease acquire"),
1682            LeaseEventKind::Heartbeat => write!(f, "lease heartbeat"),
1683            LeaseEventKind::Expire => write!(f, "lease expire"),
1684            LeaseEventKind::Release => write!(f, "lease release"),
1685        }
1686    }
1687}
1688
/// Typed lease replay causality failures.
///
/// Every variant records the run the rejected lease event referred to. Variants
/// that can be raised by more than one kind of lease event also carry the
/// [`LeaseEventKind`], which the `Display` implementation uses as the message
/// prefix.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LeaseCausalityError {
    /// Lease event references a run ID that has not been created.
    UnknownRun {
        /// Run ID referenced by the event.
        run_id: RunId,
        /// Lease event kind being validated.
        event: LeaseEventKind,
    },
    /// Lease event is not allowed from the run's current state.
    InvalidRunState {
        /// Run ID referenced by the event.
        run_id: RunId,
        /// Lease event kind being validated.
        event: LeaseEventKind,
        /// Current run state at validation time.
        state: RunState,
    },
    /// Lease mutation event requires an active lease projection but none exists.
    MissingActiveLease {
        /// Run ID referenced by the event.
        run_id: RunId,
        /// Lease event kind being validated.
        event: LeaseEventKind,
    },
    /// Lease acquire encountered an already-active lease for the run.
    ///
    /// No event kind is stored; the rendered message is always a
    /// `lease acquire` rejection.
    LeaseAlreadyActive {
        /// Run ID with an existing active lease.
        run_id: RunId,
    },
    /// Lease event owner did not match active lease owner.
    OwnerMismatch {
        /// Run ID referenced by the event.
        run_id: RunId,
        /// Lease event kind being validated.
        event: LeaseEventKind,
        /// Owner currently projected as active.
        expected_owner: String,
        /// Owner carried by the event payload.
        actual_owner: String,
    },
    /// Lease event expiry did not match active lease expiry.
    ExpiryMismatch {
        /// Run ID referenced by the event.
        run_id: RunId,
        /// Lease event kind being validated.
        event: LeaseEventKind,
        /// Expiry currently projected as active.
        expected_expiry: u64,
        /// Expiry carried by the event payload.
        actual_expiry: u64,
    },
    /// Heartbeat attempted to regress lease expiry.
    ///
    /// No event kind is stored; the rendered message is always a
    /// `lease heartbeat` rejection.
    NonMonotonicHeartbeatExpiry {
        /// Run ID referenced by the event.
        run_id: RunId,
        /// Current projected expiry.
        previous_expiry: u64,
        /// Heartbeat proposed expiry.
        proposed_expiry: u64,
    },
}
1752
1753impl std::fmt::Display for LeaseCausalityError {
1754    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1755        match self {
1756            LeaseCausalityError::UnknownRun { run_id, event } => {
1757                write!(f, "{event} rejected: unknown run {run_id}")
1758            }
1759            LeaseCausalityError::InvalidRunState { run_id, event, state } => {
1760                write!(f, "{event} rejected for run {run_id}: invalid state {state:?}")
1761            }
1762            LeaseCausalityError::MissingActiveLease { run_id, event } => {
1763                write!(f, "{event} rejected for run {run_id}: missing active lease")
1764            }
1765            LeaseCausalityError::LeaseAlreadyActive { run_id } => {
1766                write!(f, "lease acquire rejected for run {run_id}: lease already active")
1767            }
1768            LeaseCausalityError::OwnerMismatch { run_id, event, expected_owner, actual_owner } => {
1769                write!(
1770                    f,
1771                    "{event} rejected for run {run_id}: owner mismatch expected={expected_owner} \
1772                     actual={actual_owner}"
1773                )
1774            }
1775            LeaseCausalityError::ExpiryMismatch {
1776                run_id,
1777                event,
1778                expected_expiry,
1779                actual_expiry,
1780            } => {
1781                write!(
1782                    f,
1783                    "{event} rejected for run {run_id}: expiry mismatch \
1784                     expected={expected_expiry} actual={actual_expiry}"
1785                )
1786            }
1787            LeaseCausalityError::NonMonotonicHeartbeatExpiry {
1788                run_id,
1789                previous_expiry,
1790                proposed_expiry,
1791            } => {
1792                write!(
1793                    f,
1794                    "lease heartbeat rejected for run {run_id}: expiry regression \
1795                     previous={previous_expiry} proposed={proposed_expiry}"
1796                )
1797            }
1798        }
1799    }
1800}
1801
1802impl std::fmt::Display for ReplayReducerError {
1803    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1804        match self {
1805            ReplayReducerError::InvalidTransition => {
1806                write!(f, "Invalid state transition during replay")
1807            }
1808            ReplayReducerError::DuplicateEvent => write!(f, "Duplicate event detected"),
1809            ReplayReducerError::CorruptedData => write!(f, "Corrupted event data"),
1810            ReplayReducerError::LeaseCausality(details) => {
1811                write!(f, "Lease causality violation during replay: {details}")
1812            }
1813            ReplayReducerError::TaskCausality(details) => {
1814                write!(f, "Task causality violation during replay: {details}")
1815            }
1816            ReplayReducerError::EngineControlCausality(details) => {
1817                write!(f, "Engine control causality violation during replay: {details}")
1818            }
1819        }
1820    }
1821}
1822
// Empty impl: only the `Error` trait's provided methods are used, so no
// `source()` override exists here. Wrapped causality details are surfaced
// through the `Display` message instead.
impl std::error::Error for ReplayReducerError {}
1824
1825/// Returns a stable string key for a `Capability`, used as HashMap key.
1826fn capability_key(cap: &Capability) -> String {
1827    match cap {
1828        Capability::CanSubmit => "CanSubmit".to_string(),
1829        Capability::CanExecute => "CanExecute".to_string(),
1830        Capability::CanReview => "CanReview".to_string(),
1831        Capability::CanApprove => "CanApprove".to_string(),
1832        Capability::CanCancel => "CanCancel".to_string(),
1833        Capability::Custom(s) => format!("Custom:{s}"),
1834    }
1835}
1836
1837#[cfg(test)]
1838mod tests {
1839    use actionqueue_core::ids::{AttemptId, RunId, TaskId};
1840    use actionqueue_core::mutation::AttemptResultKind;
1841    use actionqueue_core::run::run_instance::RunInstance;
1842    use actionqueue_core::run::state::RunState;
1843    use actionqueue_core::task::task_spec::{TaskPayload, TaskSpec};
1844
1845    use super::*;
1846
1847    fn test_task_spec(id: u128) -> TaskSpec {
1848        TaskSpec::new(
1849            TaskId::from_uuid(uuid::Uuid::from_u128(id)),
1850            TaskPayload::with_content_type(vec![1, 2, 3], "application/octet-stream"),
1851            actionqueue_core::task::run_policy::RunPolicy::Once,
1852            actionqueue_core::task::constraints::TaskConstraints::default(),
1853            actionqueue_core::task::metadata::TaskMetadata::default(),
1854        )
1855        .expect("test task spec should be valid")
1856    }
1857
    /// Test shorthand for `WalEvent::new(seq, event)`, keeping call sites compact.
    fn event(seq: u64, event: WalEventType) -> WalEvent {
        WalEvent::new(seq, event)
    }
1861
1862    /// Drives a run from Scheduled through to Completed via the standard lifecycle:
1863    /// Scheduled -> Ready -> Leased -> Running -> (attempt start/finish) -> Completed.
1864    fn drive_run_to_completed(reducer: &mut ReplayReducer, run_id: RunId, seq_start: u64) -> u64 {
1865        let attempt_id =
1866            AttemptId::from_uuid(uuid::Uuid::from_u128(run_id.as_uuid().as_u128() + 1));
1867        let mut seq = seq_start;
1868
1869        reducer
1870            .apply(&event(
1871                seq,
1872                WalEventType::RunStateChanged {
1873                    run_id,
1874                    previous_state: RunState::Scheduled,
1875                    new_state: RunState::Ready,
1876                    timestamp: 100,
1877                },
1878            ))
1879            .expect("scheduled->ready");
1880        seq += 1;
1881
1882        reducer
1883            .apply(&event(
1884                seq,
1885                WalEventType::RunStateChanged {
1886                    run_id,
1887                    previous_state: RunState::Ready,
1888                    new_state: RunState::Leased,
1889                    timestamp: 200,
1890                },
1891            ))
1892            .expect("ready->leased");
1893        seq += 1;
1894
1895        reducer
1896            .apply(&event(
1897                seq,
1898                WalEventType::RunStateChanged {
1899                    run_id,
1900                    previous_state: RunState::Leased,
1901                    new_state: RunState::Running,
1902                    timestamp: 300,
1903                },
1904            ))
1905            .expect("leased->running");
1906        seq += 1;
1907
1908        reducer
1909            .apply(&event(seq, WalEventType::AttemptStarted { run_id, attempt_id, timestamp: 400 }))
1910            .expect("attempt started");
1911        seq += 1;
1912
1913        reducer
1914            .apply(&event(
1915                seq,
1916                WalEventType::AttemptFinished {
1917                    run_id,
1918                    attempt_id,
1919                    result: AttemptResultKind::Success,
1920                    error: None,
1921                    output: None,
1922                    timestamp: 500,
1923                },
1924            ))
1925            .expect("attempt finished");
1926        seq += 1;
1927
1928        reducer
1929            .apply(&event(
1930                seq,
1931                WalEventType::RunStateChanged {
1932                    run_id,
1933                    previous_state: RunState::Running,
1934                    new_state: RunState::Completed,
1935                    timestamp: 600,
1936                },
1937            ))
1938            .expect("running->completed");
1939        seq += 1;
1940
1941        seq
1942    }
1943
1944    /// Drives a run from Scheduled through to Failed via the standard lifecycle:
1945    /// Scheduled -> Ready -> Leased -> Running -> (attempt start/finish) -> Failed.
1946    fn drive_run_to_failed(reducer: &mut ReplayReducer, run_id: RunId, seq_start: u64) -> u64 {
1947        let attempt_id =
1948            AttemptId::from_uuid(uuid::Uuid::from_u128(run_id.as_uuid().as_u128() + 1));
1949        let mut seq = seq_start;
1950
1951        reducer
1952            .apply(&event(
1953                seq,
1954                WalEventType::RunStateChanged {
1955                    run_id,
1956                    previous_state: RunState::Scheduled,
1957                    new_state: RunState::Ready,
1958                    timestamp: 100,
1959                },
1960            ))
1961            .expect("scheduled->ready");
1962        seq += 1;
1963
1964        reducer
1965            .apply(&event(
1966                seq,
1967                WalEventType::RunStateChanged {
1968                    run_id,
1969                    previous_state: RunState::Ready,
1970                    new_state: RunState::Leased,
1971                    timestamp: 200,
1972                },
1973            ))
1974            .expect("ready->leased");
1975        seq += 1;
1976
1977        reducer
1978            .apply(&event(
1979                seq,
1980                WalEventType::RunStateChanged {
1981                    run_id,
1982                    previous_state: RunState::Leased,
1983                    new_state: RunState::Running,
1984                    timestamp: 300,
1985                },
1986            ))
1987            .expect("leased->running");
1988        seq += 1;
1989
1990        reducer
1991            .apply(&event(seq, WalEventType::AttemptStarted { run_id, attempt_id, timestamp: 400 }))
1992            .expect("attempt started");
1993        seq += 1;
1994
1995        reducer
1996            .apply(&event(
1997                seq,
1998                WalEventType::AttemptFinished {
1999                    run_id,
2000                    attempt_id,
2001                    result: AttemptResultKind::Failure,
2002                    error: Some("test failure".to_string()),
2003                    output: None,
2004                    timestamp: 500,
2005                },
2006            ))
2007            .expect("attempt finished");
2008        seq += 1;
2009
2010        reducer
2011            .apply(&event(
2012                seq,
2013                WalEventType::RunStateChanged {
2014                    run_id,
2015                    previous_state: RunState::Running,
2016                    new_state: RunState::Failed,
2017                    timestamp: 600,
2018                },
2019            ))
2020            .expect("running->failed");
2021        seq += 1;
2022
2023        seq
2024    }
2025
2026    #[test]
2027    fn trim_terminal_history_removes_completed_run_history() {
2028        let mut reducer = ReplayReducer::new();
2029        let task = test_task_spec(0xA001);
2030        let task_id = task.id();
2031        let run_id = RunId::from_uuid(uuid::Uuid::from_u128(0xB001));
2032
2033        // Create task and run.
2034        reducer
2035            .apply(&event(1, WalEventType::TaskCreated { task_spec: task, timestamp: 10 }))
2036            .unwrap();
2037        let run = RunInstance::new_scheduled_with_id(run_id, task_id, 1000, 1000)
2038            .expect("run should build");
2039        reducer.apply(&event(2, WalEventType::RunCreated { run_instance: run })).unwrap();
2040
2041        // Drive to Completed.
2042        drive_run_to_completed(&mut reducer, run_id, 3);
2043
2044        // Precondition: history exists.
2045        assert!(reducer.get_run_history(&run_id).is_some());
2046        assert!(reducer.get_attempt_history(&run_id).is_some());
2047
2048        // Trim.
2049        reducer.trim_terminal_history();
2050
2051        // History should be removed for the terminal run.
2052        assert!(reducer.get_run_history(&run_id).is_none());
2053        assert!(reducer.get_attempt_history(&run_id).is_none());
2054        assert!(reducer.get_lease_metadata(&run_id).is_none());
2055
2056        // The run itself (state and instance) should still exist.
2057        assert_eq!(reducer.get_run_state(&run_id), Some(&RunState::Completed));
2058        assert!(reducer.get_run_instance(&run_id).is_some());
2059    }
2060
2061    #[test]
2062    fn trim_terminal_history_removes_failed_and_canceled_run_history() {
2063        let mut reducer = ReplayReducer::new();
2064
2065        // Failed run.
2066        let task_f = test_task_spec(0xA010);
2067        let task_id_f = task_f.id();
2068        let run_id_f = RunId::from_uuid(uuid::Uuid::from_u128(0xB010));
2069
2070        reducer
2071            .apply(&event(1, WalEventType::TaskCreated { task_spec: task_f, timestamp: 10 }))
2072            .unwrap();
2073        let run_f = RunInstance::new_scheduled_with_id(run_id_f, task_id_f, 1000, 1000)
2074            .expect("run should build");
2075        reducer.apply(&event(2, WalEventType::RunCreated { run_instance: run_f })).unwrap();
2076        let seq = drive_run_to_failed(&mut reducer, run_id_f, 3);
2077
2078        // Canceled run.
2079        let task_c = test_task_spec(0xA020);
2080        let task_id_c = task_c.id();
2081        let run_id_c = RunId::from_uuid(uuid::Uuid::from_u128(0xB020));
2082
2083        reducer
2084            .apply(&event(seq, WalEventType::TaskCreated { task_spec: task_c, timestamp: 20 }))
2085            .unwrap();
2086        let run_c = RunInstance::new_scheduled_with_id(run_id_c, task_id_c, 1000, 1000)
2087            .expect("run should build");
2088        reducer.apply(&event(seq + 1, WalEventType::RunCreated { run_instance: run_c })).unwrap();
2089        reducer
2090            .apply(&event(
2091                seq + 2,
2092                WalEventType::RunStateChanged {
2093                    run_id: run_id_c,
2094                    previous_state: RunState::Scheduled,
2095                    new_state: RunState::Ready,
2096                    timestamp: 100,
2097                },
2098            ))
2099            .unwrap();
2100        reducer
2101            .apply(&event(seq + 3, WalEventType::RunCanceled { run_id: run_id_c, timestamp: 200 }))
2102            .unwrap();
2103
2104        // Precondition: both have history.
2105        assert!(reducer.get_run_history(&run_id_f).is_some());
2106        assert!(reducer.get_run_history(&run_id_c).is_some());
2107
2108        reducer.trim_terminal_history();
2109
2110        // Both should be trimmed.
2111        assert!(reducer.get_run_history(&run_id_f).is_none());
2112        assert!(reducer.get_attempt_history(&run_id_f).is_none());
2113        assert!(reducer.get_run_history(&run_id_c).is_none());
2114        assert!(reducer.get_attempt_history(&run_id_c).is_none());
2115
2116        // State projections remain.
2117        assert_eq!(reducer.get_run_state(&run_id_f), Some(&RunState::Failed));
2118        assert_eq!(reducer.get_run_state(&run_id_c), Some(&RunState::Canceled));
2119    }
2120
2121    #[test]
2122    fn trim_terminal_history_preserves_active_run_history() {
2123        let mut reducer = ReplayReducer::new();
2124        let task = test_task_spec(0xA002);
2125        let task_id = task.id();
2126        let run_id = RunId::from_uuid(uuid::Uuid::from_u128(0xB002));
2127
2128        // Create task and run (stays in Scheduled — a non-terminal state).
2129        reducer
2130            .apply(&event(1, WalEventType::TaskCreated { task_spec: task, timestamp: 10 }))
2131            .unwrap();
2132        let run = RunInstance::new_scheduled_with_id(run_id, task_id, 1000, 1000)
2133            .expect("run should build");
2134        reducer.apply(&event(2, WalEventType::RunCreated { run_instance: run })).unwrap();
2135
2136        // Advance to Ready (still non-terminal).
2137        reducer
2138            .apply(&event(
2139                3,
2140                WalEventType::RunStateChanged {
2141                    run_id,
2142                    previous_state: RunState::Scheduled,
2143                    new_state: RunState::Ready,
2144                    timestamp: 100,
2145                },
2146            ))
2147            .unwrap();
2148
2149        // Precondition: history exists with 2 entries (Scheduled, Ready).
2150        let history = reducer.get_run_history(&run_id).expect("history should exist");
2151        assert_eq!(history.len(), 2);
2152
2153        // Trim should be a no-op for this active run.
2154        reducer.trim_terminal_history();
2155
2156        let history = reducer.get_run_history(&run_id).expect("history should still exist");
2157        assert_eq!(history.len(), 2);
2158        assert!(reducer.get_attempt_history(&run_id).is_some());
2159    }
2160
2161    #[test]
2162    fn trim_terminal_history_no_op_when_no_terminal_runs() {
2163        let mut reducer = ReplayReducer::new();
2164        let task = test_task_spec(0xA003);
2165        let task_id = task.id();
2166        let run_id = RunId::from_uuid(uuid::Uuid::from_u128(0xB003));
2167
2168        reducer
2169            .apply(&event(1, WalEventType::TaskCreated { task_spec: task, timestamp: 10 }))
2170            .unwrap();
2171        let run = RunInstance::new_scheduled_with_id(run_id, task_id, 1000, 1000)
2172            .expect("run should build");
2173        reducer.apply(&event(2, WalEventType::RunCreated { run_instance: run })).unwrap();
2174
2175        // Pre-trim state.
2176        assert_eq!(reducer.run_count(), 1);
2177        let run_history_len =
2178            reducer.get_run_history(&run_id).expect("run history should exist").len();
2179        let attempt_history_len =
2180            reducer.get_attempt_history(&run_id).expect("attempt history should exist").len();
2181
2182        // Trim.
2183        reducer.trim_terminal_history();
2184
2185        // Post-trim state should be identical.
2186        assert_eq!(reducer.run_count(), 1);
2187        assert_eq!(
2188            reducer.get_run_history(&run_id).expect("run history should still exist").len(),
2189            run_history_len
2190        );
2191        assert_eq!(
2192            reducer.get_attempt_history(&run_id).expect("attempt history should still exist").len(),
2193            attempt_history_len
2194        );
2195    }
2196
2197    #[test]
2198    fn trim_terminal_history_mixed_terminal_and_active() {
2199        let mut reducer = ReplayReducer::new();
2200
2201        // Terminal run (completed).
2202        let task_t = test_task_spec(0xA004);
2203        let task_id_t = task_t.id();
2204        let run_id_terminal = RunId::from_uuid(uuid::Uuid::from_u128(0xB004));
2205
2206        reducer
2207            .apply(&event(1, WalEventType::TaskCreated { task_spec: task_t, timestamp: 10 }))
2208            .unwrap();
2209        let run_t = RunInstance::new_scheduled_with_id(run_id_terminal, task_id_t, 1000, 1000)
2210            .expect("run should build");
2211        reducer.apply(&event(2, WalEventType::RunCreated { run_instance: run_t })).unwrap();
2212        let seq = drive_run_to_completed(&mut reducer, run_id_terminal, 3);
2213
2214        // Active run (stays in Ready).
2215        let task_a = test_task_spec(0xA005);
2216        let task_id_a = task_a.id();
2217        let run_id_active = RunId::from_uuid(uuid::Uuid::from_u128(0xB005));
2218
2219        reducer
2220            .apply(&event(seq, WalEventType::TaskCreated { task_spec: task_a, timestamp: 20 }))
2221            .unwrap();
2222        let run_a = RunInstance::new_scheduled_with_id(run_id_active, task_id_a, 1000, 1000)
2223            .expect("run should build");
2224        reducer.apply(&event(seq + 1, WalEventType::RunCreated { run_instance: run_a })).unwrap();
2225        reducer
2226            .apply(&event(
2227                seq + 2,
2228                WalEventType::RunStateChanged {
2229                    run_id: run_id_active,
2230                    previous_state: RunState::Scheduled,
2231                    new_state: RunState::Ready,
2232                    timestamp: 100,
2233                },
2234            ))
2235            .unwrap();
2236
2237        // Trim.
2238        reducer.trim_terminal_history();
2239
2240        // Terminal run history removed.
2241        assert!(reducer.get_run_history(&run_id_terminal).is_none());
2242        assert!(reducer.get_attempt_history(&run_id_terminal).is_none());
2243
2244        // Active run history preserved.
2245        assert!(reducer.get_run_history(&run_id_active).is_some());
2246        assert!(reducer.get_attempt_history(&run_id_active).is_some());
2247        let active_history =
2248            reducer.get_run_history(&run_id_active).expect("active history should exist");
2249        assert_eq!(active_history.len(), 2); // Scheduled + Ready
2250    }
2251}