Skip to main content

actionqueue_storage/snapshot/
mapping.rs

1//! Explicit mapping boundary between snapshot wrappers and canonical core models.
2//!
3//! This module centralizes snapshot validation and run payload mapping so that
4//! snapshot persistence remains a strict projection of core semantic truth.
5
6use std::collections::HashSet;
7
8use actionqueue_core::budget::BudgetDimension;
9use actionqueue_core::ids::{RunId, TaskId};
10use actionqueue_core::run::run_instance::RunInstance as CoreRunInstance;
11use actionqueue_core::run::state::RunState;
12use actionqueue_core::run::transitions::is_valid_transition;
13use actionqueue_core::subscription::SubscriptionId;
14
15use crate::snapshot::model::{
16    Snapshot, SnapshotAttemptHistoryEntry, SnapshotEngineControl, SnapshotLeaseMetadata,
17    SnapshotRun, SnapshotRunStateHistoryEntry,
18};
19
20/// Current snapshot schema version accepted by the explicit mapping boundary.
21///
22/// Version history:
23/// - v4: Sprint 1 release (WAL v2, JSON snapshots)
24/// - v5: Sprint 2 additions — parent_task_id on TaskSpec, output on AttemptOutcome,
25///   required_capabilities on TaskConstraints
26/// - v6: Sprint 2 review — dependency declarations persisted in snapshots
27/// - v7: Sprint 3 — budgets, subscriptions, Suspended run state
28pub const SNAPSHOT_SCHEMA_VERSION: u32 = 8;
29
30/// Typed mapping and validation errors for snapshot/core parity enforcement.
31#[derive(Debug, Clone, PartialEq, Eq)]
32pub enum SnapshotMappingError {
33    /// Snapshot metadata schema version is unknown to this mapping boundary.
34    UnsupportedSchemaVersion {
35        /// Schema version expected by the current implementation.
36        expected: u32,
37        /// Schema version found in the snapshot payload.
38        found: u32,
39    },
40    /// Snapshot metadata task count does not match payload contents.
41    TaskCountMismatch {
42        /// Declared task count in snapshot metadata.
43        declared: u64,
44        /// Actual number of task entries in snapshot payload.
45        actual: u64,
46    },
47    /// Snapshot metadata run count does not match payload contents.
48    RunCountMismatch {
49        /// Declared run count in snapshot metadata.
50        declared: u64,
51        /// Actual number of run entries in snapshot payload.
52        actual: u64,
53    },
54    /// Duplicate task identifier in snapshot payload.
55    DuplicateTaskId {
56        /// Duplicate task identifier.
57        task_id: TaskId,
58    },
59    /// Snapshot task has cancellation timestamp that violates causal ordering.
60    InvalidTaskCanceledAtCausality {
61        /// Task identifier associated with the invalid payload.
62        task_id: TaskId,
63        /// Task creation timestamp encoded in snapshot.
64        created_at: u64,
65        /// Task cancellation timestamp encoded in snapshot.
66        canceled_at: u64,
67    },
68    /// Duplicate run identifier in snapshot payload.
69    DuplicateRunId {
70        /// Duplicate run identifier.
71        run_id: RunId,
72    },
73    /// A run references a task that does not exist in the snapshot task set.
74    RunReferencesUnknownTask {
75        /// Run identifier with the invalid reference.
76        run_id: RunId,
77        /// Missing task identifier.
78        task_id: TaskId,
79    },
80    /// A run has a nil/invalid run identifier.
81    InvalidRunId {
82        /// Invalid run identifier value.
83        run_id: RunId,
84    },
85    /// A run has a nil/invalid task identifier.
86    InvalidTaskId {
87        /// Run identifier associated with the invalid task identifier.
88        run_id: RunId,
89        /// Invalid task identifier value.
90        task_id: TaskId,
91    },
92    /// A `Ready` run has impossible schedule causality (`scheduled_at > created_at`).
93    InvalidReadyScheduleCausality {
94        /// Run identifier associated with the invalid payload.
95        run_id: RunId,
96        /// Scheduled timestamp encoded in snapshot.
97        scheduled_at: u64,
98        /// Created timestamp encoded in snapshot.
99        created_at: u64,
100    },
101    /// Attempt lineage state is semantically inconsistent with run state.
102    InvalidAttemptLineageState {
103        /// Run identifier associated with the invalid payload.
104        run_id: RunId,
105        /// Run state that cannot carry an active attempt identifier.
106        state: RunState,
107    },
108    /// Attempt lineage count is inconsistent with active attempt payload.
109    InvalidAttemptLineageCount {
110        /// Run identifier associated with the invalid payload.
111        run_id: RunId,
112        /// Attempt count encoded in snapshot.
113        attempt_count: u32,
114    },
115    /// Snapshot run is missing the initial Scheduled history entry.
116    MissingInitialRunStateHistory {
117        /// Run identifier associated with the invalid payload.
118        run_id: RunId,
119    },
120    /// Snapshot run state history entry is invalid.
121    InvalidRunStateHistoryTransition {
122        /// Run identifier associated with the invalid payload.
123        run_id: RunId,
124        /// Transition source state.
125        from: RunState,
126        /// Transition target state.
127        to: RunState,
128    },
129    /// Snapshot run has invalid attempt history count.
130    InvalidAttemptHistoryCount {
131        /// Run identifier associated with the invalid payload.
132        run_id: RunId,
133        /// Attempt count encoded in snapshot.
134        attempt_count: u32,
135        /// Attempt history entries provided in snapshot.
136        history_len: usize,
137    },
138    /// Snapshot run has invalid active attempt history alignment.
139    InvalidActiveAttemptHistory {
140        /// Run identifier associated with the invalid payload.
141        run_id: RunId,
142    },
143    /// Snapshot run has lease metadata when not allowed by state.
144    InvalidLeasePresence {
145        /// Run identifier associated with the invalid payload.
146        run_id: RunId,
147        /// Run state that rejected lease presence.
148        state: RunState,
149    },
150    /// A dependency declaration references a task not present in the snapshot.
151    DependencyReferencesUnknownTask {
152        /// The task with the dependency declaration.
153        task_id: TaskId,
154        /// The prerequisite task not found in the snapshot.
155        prereq_id: TaskId,
156    },
157    /// A dependency declaration's declaring task is not present in the snapshot.
158    DependencyDeclarationUnknownTask {
159        /// The declaring task not found in the snapshot.
160        task_id: TaskId,
161    },
162    /// Duplicate dependency declaration for the same task_id.
163    DuplicateDependencyDeclaration {
164        /// The task with duplicated declarations.
165        task_id: TaskId,
166    },
167    /// Duplicate budget allocation for the same (task_id, dimension) pair.
168    DuplicateBudgetAllocation {
169        /// The task with duplicated allocations.
170        task_id: TaskId,
171        /// The duplicated dimension.
172        dimension: BudgetDimension,
173    },
174    /// Budget allocation references a task not present in the snapshot.
175    BudgetReferencesUnknownTask {
176        /// The task referenced by the budget.
177        task_id: TaskId,
178    },
179    /// Duplicate subscription identifier in snapshot.
180    DuplicateSubscriptionId {
181        /// The duplicated subscription identifier.
182        subscription_id: SubscriptionId,
183    },
184    /// Subscription references a task not present in the snapshot.
185    SubscriptionReferencesUnknownTask {
186        /// The subscription identifier.
187        subscription_id: SubscriptionId,
188        /// The missing task identifier.
189        task_id: TaskId,
190    },
191    /// Snapshot engine control has invalid paused causality.
192    InvalidEnginePausedCausality,
193    /// Snapshot engine control has invalid resumed causality.
194    InvalidEngineResumedCausality,
195    /// Snapshot engine control has invalid pause/resume ordering.
196    InvalidEnginePauseResumeOrdering {
197        /// Snapshot pause timestamp.
198        paused_at: u64,
199        /// Snapshot resume timestamp.
200        resumed_at: u64,
201    },
202}
203
204impl std::fmt::Display for SnapshotMappingError {
205    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
206        match self {
207            Self::UnsupportedSchemaVersion { expected, found } => {
208                write!(f, "unsupported snapshot schema version: expected {expected}, found {found}")
209            }
210            Self::TaskCountMismatch { declared, actual } => {
211                write!(f, "snapshot task_count mismatch: declared {declared}, actual {actual}")
212            }
213            Self::RunCountMismatch { declared, actual } => {
214                write!(f, "snapshot run_count mismatch: declared {declared}, actual {actual}")
215            }
216            Self::DuplicateTaskId { task_id } => {
217                write!(f, "duplicate snapshot task id: {task_id}")
218            }
219            Self::InvalidTaskCanceledAtCausality { task_id, created_at, canceled_at } => write!(
220                f,
221                "snapshot task {task_id} has invalid canceled_at causality: canceled_at \
222                 ({canceled_at}) < created_at ({created_at})"
223            ),
224            Self::DuplicateRunId { run_id } => {
225                write!(f, "duplicate snapshot run id: {run_id}")
226            }
227            Self::RunReferencesUnknownTask { run_id, task_id } => {
228                write!(f, "snapshot run {run_id} references unknown task {task_id}")
229            }
230            Self::InvalidRunId { run_id } => {
231                write!(f, "snapshot run has invalid nil run id: {run_id}")
232            }
233            Self::InvalidTaskId { run_id, task_id } => {
234                write!(f, "snapshot run {run_id} has invalid nil task id: {task_id}")
235            }
236            Self::InvalidReadyScheduleCausality { run_id, scheduled_at, created_at } => write!(
237                f,
238                "snapshot run {run_id} has invalid Ready schedule causality: scheduled_at \
239                 ({scheduled_at}) > created_at ({created_at})"
240            ),
241            Self::InvalidAttemptLineageState { run_id, state } => write!(
242                f,
243                "snapshot run {run_id} has active attempt lineage in invalid state {state:?}"
244            ),
245            Self::InvalidAttemptLineageCount { run_id, attempt_count } => write!(
246                f,
247                "snapshot run {run_id} has active attempt with invalid attempt_count \
248                 ({attempt_count})"
249            ),
250            Self::MissingInitialRunStateHistory { run_id } => {
251                write!(f, "snapshot run {run_id} missing initial Scheduled state history entry")
252            }
253            Self::InvalidRunStateHistoryTransition { run_id, from, to } => write!(
254                f,
255                "snapshot run {run_id} has invalid state history transition {from:?} -> {to:?}"
256            ),
257            Self::InvalidAttemptHistoryCount { run_id, attempt_count, history_len } => write!(
258                f,
259                "snapshot run {run_id} has attempt history length {history_len} inconsistent with \
260                 attempt_count ({attempt_count})"
261            ),
262            Self::InvalidActiveAttemptHistory { run_id } => {
263                write!(f, "snapshot run {run_id} has invalid active attempt history alignment")
264            }
265            Self::InvalidLeasePresence { run_id, state } => {
266                write!(f, "snapshot run {run_id} has lease metadata in invalid state {state:?}")
267            }
268            Self::DependencyReferencesUnknownTask { task_id, prereq_id } => write!(
269                f,
270                "snapshot dependency declaration for task {task_id} references unknown \
271                 prerequisite task {prereq_id}"
272            ),
273            Self::DependencyDeclarationUnknownTask { task_id } => {
274                write!(f, "snapshot dependency declaration for unknown task {task_id}")
275            }
276            Self::DuplicateDependencyDeclaration { task_id } => {
277                write!(f, "snapshot contains duplicate dependency declaration for task {task_id}")
278            }
279            Self::DuplicateBudgetAllocation { task_id, dimension } => {
280                write!(f, "duplicate budget allocation for task {task_id} dimension {dimension}")
281            }
282            Self::BudgetReferencesUnknownTask { task_id } => {
283                write!(f, "budget allocation references unknown task {task_id}")
284            }
285            Self::DuplicateSubscriptionId { subscription_id } => {
286                write!(f, "duplicate subscription id {subscription_id}")
287            }
288            Self::SubscriptionReferencesUnknownTask { subscription_id, task_id } => {
289                write!(f, "subscription {subscription_id} references unknown task {task_id}")
290            }
291            Self::InvalidEnginePausedCausality => {
292                write!(f, "snapshot engine state invalid: paused=true requires paused_at")
293            }
294            Self::InvalidEngineResumedCausality => {
295                write!(
296                    f,
297                    "snapshot engine state invalid: paused=false with paused_at requires \
298                     resumed_at"
299                )
300            }
301            Self::InvalidEnginePauseResumeOrdering { paused_at, resumed_at } => write!(
302                f,
303                "snapshot engine state invalid: resumed_at ({resumed_at}) < paused_at \
304                 ({paused_at})"
305            ),
306        }
307    }
308}
309
310impl std::error::Error for SnapshotMappingError {}
311
312/// Maps a canonical core run payload into the snapshot wrapper.
313pub fn map_core_run_to_snapshot(run_instance: CoreRunInstance) -> SnapshotRun {
314    SnapshotRun { run_instance, state_history: Vec::new(), attempts: Vec::new(), lease: None }
315}
316
317/// Maps a snapshot run wrapper into a canonical core run payload.
318pub fn map_snapshot_run_to_core(
319    snapshot_run: &SnapshotRun,
320) -> Result<CoreRunInstance, SnapshotMappingError> {
321    validate_core_run_payload(&snapshot_run.run_instance)?;
322    validate_snapshot_run_details(snapshot_run)?;
323    Ok(snapshot_run.run_instance.clone())
324}
325
326/// Validates full snapshot parity and mapping invariants before bootstrap.
327pub fn validate_snapshot(snapshot: &Snapshot) -> Result<(), SnapshotMappingError> {
328    if snapshot.metadata.schema_version != SNAPSHOT_SCHEMA_VERSION {
329        return Err(SnapshotMappingError::UnsupportedSchemaVersion {
330            expected: SNAPSHOT_SCHEMA_VERSION,
331            found: snapshot.metadata.schema_version,
332        });
333    }
334
335    let task_count = snapshot.tasks.len() as u64;
336    if snapshot.metadata.task_count != task_count {
337        return Err(SnapshotMappingError::TaskCountMismatch {
338            declared: snapshot.metadata.task_count,
339            actual: task_count,
340        });
341    }
342
343    let run_count = snapshot.runs.len() as u64;
344    if snapshot.metadata.run_count != run_count {
345        return Err(SnapshotMappingError::RunCountMismatch {
346            declared: snapshot.metadata.run_count,
347            actual: run_count,
348        });
349    }
350
351    let mut task_ids = HashSet::new();
352    for task in &snapshot.tasks {
353        let task_id = task.task_spec.id();
354        if !task_ids.insert(task_id) {
355            return Err(SnapshotMappingError::DuplicateTaskId { task_id });
356        }
357
358        if let Some(canceled_at) = task.canceled_at {
359            if canceled_at < task.created_at {
360                return Err(SnapshotMappingError::InvalidTaskCanceledAtCausality {
361                    task_id,
362                    created_at: task.created_at,
363                    canceled_at,
364                });
365            }
366        }
367    }
368
369    let mut run_ids = HashSet::new();
370    for run in &snapshot.runs {
371        let core_run = map_snapshot_run_to_core(run)?;
372        let run_id = core_run.id();
373        let task_id = core_run.task_id();
374
375        if !run_ids.insert(run_id) {
376            return Err(SnapshotMappingError::DuplicateRunId { run_id });
377        }
378
379        if !task_ids.contains(&task_id) {
380            return Err(SnapshotMappingError::RunReferencesUnknownTask { run_id, task_id });
381        }
382    }
383
384    validate_engine_control(&snapshot.engine)?;
385
386    // Validate dependency declarations.
387    let mut dep_task_ids = HashSet::new();
388    for decl in &snapshot.dependency_declarations {
389        if !dep_task_ids.insert(decl.task_id) {
390            return Err(SnapshotMappingError::DuplicateDependencyDeclaration {
391                task_id: decl.task_id,
392            });
393        }
394        if !task_ids.contains(&decl.task_id) {
395            return Err(SnapshotMappingError::DependencyDeclarationUnknownTask {
396                task_id: decl.task_id,
397            });
398        }
399        for &prereq_id in &decl.depends_on {
400            if !task_ids.contains(&prereq_id) {
401                return Err(SnapshotMappingError::DependencyReferencesUnknownTask {
402                    task_id: decl.task_id,
403                    prereq_id,
404                });
405            }
406        }
407    }
408
409    // Validate budget records.
410    let mut budget_keys = HashSet::new();
411    for budget in &snapshot.budgets {
412        if !budget_keys.insert((budget.task_id, budget.dimension)) {
413            return Err(SnapshotMappingError::DuplicateBudgetAllocation {
414                task_id: budget.task_id,
415                dimension: budget.dimension,
416            });
417        }
418        if !task_ids.contains(&budget.task_id) {
419            return Err(SnapshotMappingError::BudgetReferencesUnknownTask {
420                task_id: budget.task_id,
421            });
422        }
423    }
424
425    // Validate subscription records.
426    let mut sub_ids = HashSet::new();
427    for sub in &snapshot.subscriptions {
428        if !sub_ids.insert(sub.subscription_id) {
429            return Err(SnapshotMappingError::DuplicateSubscriptionId {
430                subscription_id: sub.subscription_id,
431            });
432        }
433        if !task_ids.contains(&sub.task_id) {
434            return Err(SnapshotMappingError::SubscriptionReferencesUnknownTask {
435                subscription_id: sub.subscription_id,
436                task_id: sub.task_id,
437            });
438        }
439    }
440
441    // Actor, tenant, role, capability, and ledger records are validated at the
442    // platform layer. Basic structural integrity is ensured by WAL ordering;
443    // no cross-referencing validation is required here.
444
445    Ok(())
446}
447
448fn validate_engine_control(engine: &SnapshotEngineControl) -> Result<(), SnapshotMappingError> {
449    if engine.paused && engine.paused_at.is_none() {
450        return Err(SnapshotMappingError::InvalidEnginePausedCausality);
451    }
452
453    if engine.paused && engine.resumed_at.is_some() {
454        return Err(SnapshotMappingError::InvalidEngineResumedCausality);
455    }
456
457    if engine.paused_at.is_none() && engine.resumed_at.is_some() {
458        return Err(SnapshotMappingError::InvalidEngineResumedCausality);
459    }
460
461    if !engine.paused && engine.paused_at.is_some() && engine.resumed_at.is_none() {
462        return Err(SnapshotMappingError::InvalidEngineResumedCausality);
463    }
464
465    if !engine.paused {
466        if let (Some(paused_at), Some(resumed_at)) = (engine.paused_at, engine.resumed_at) {
467            if resumed_at < paused_at {
468                return Err(SnapshotMappingError::InvalidEnginePauseResumeOrdering {
469                    paused_at,
470                    resumed_at,
471                });
472            }
473        }
474    }
475
476    Ok(())
477}
478
479fn validate_core_run_payload(run_instance: &CoreRunInstance) -> Result<(), SnapshotMappingError> {
480    let run_id = run_instance.id();
481    if run_id.as_uuid().is_nil() {
482        return Err(SnapshotMappingError::InvalidRunId { run_id });
483    }
484
485    let task_id = run_instance.task_id();
486    if task_id.as_uuid().is_nil() {
487        return Err(SnapshotMappingError::InvalidTaskId { run_id, task_id });
488    }
489
490    if run_instance.state() == RunState::Ready
491        && run_instance.scheduled_at() > run_instance.created_at()
492    {
493        return Err(SnapshotMappingError::InvalidReadyScheduleCausality {
494            run_id,
495            scheduled_at: run_instance.scheduled_at(),
496            created_at: run_instance.created_at(),
497        });
498    }
499
500    if run_instance.current_attempt_id().is_some()
501        && !matches!(run_instance.state(), RunState::Running | RunState::Canceled)
502    {
503        return Err(SnapshotMappingError::InvalidAttemptLineageState {
504            run_id,
505            state: run_instance.state(),
506        });
507    }
508
509    if run_instance.current_attempt_id().is_some() && run_instance.attempt_count() == 0 {
510        return Err(SnapshotMappingError::InvalidAttemptLineageCount {
511            run_id,
512            attempt_count: run_instance.attempt_count(),
513        });
514    }
515
516    Ok(())
517}
518
519fn validate_snapshot_run_details(snapshot_run: &SnapshotRun) -> Result<(), SnapshotMappingError> {
520    let run_id = snapshot_run.run_instance.id();
521    let created_at = snapshot_run.run_instance.created_at();
522
523    if snapshot_run.state_history.is_empty() {
524        return Err(SnapshotMappingError::MissingInitialRunStateHistory { run_id });
525    }
526
527    let first = &snapshot_run.state_history[0];
528    if first.from.is_some() || first.to != RunState::Scheduled || first.timestamp != created_at {
529        return Err(SnapshotMappingError::MissingInitialRunStateHistory { run_id });
530    }
531
532    let mut previous_state = RunState::Scheduled;
533    for entry in snapshot_run.state_history.iter().skip(1) {
534        let from = entry.from.ok_or(SnapshotMappingError::InvalidRunStateHistoryTransition {
535            run_id,
536            from: previous_state,
537            to: entry.to,
538        })?;
539
540        if from != previous_state || !is_valid_transition(from, entry.to) {
541            return Err(SnapshotMappingError::InvalidRunStateHistoryTransition {
542                run_id,
543                from,
544                to: entry.to,
545            });
546        }
547
548        previous_state = entry.to;
549    }
550
551    if previous_state != snapshot_run.run_instance.state() {
552        return Err(SnapshotMappingError::InvalidRunStateHistoryTransition {
553            run_id,
554            from: previous_state,
555            to: snapshot_run.run_instance.state(),
556        });
557    }
558
559    let attempt_count = snapshot_run.run_instance.attempt_count();
560    if snapshot_run.attempts.len() != attempt_count as usize {
561        return Err(SnapshotMappingError::InvalidAttemptHistoryCount {
562            run_id,
563            attempt_count,
564            history_len: snapshot_run.attempts.len(),
565        });
566    }
567
568    if let Some(current_attempt_id) = snapshot_run.run_instance.current_attempt_id() {
569        let unfinished: Vec<&SnapshotAttemptHistoryEntry> = snapshot_run
570            .attempts
571            .iter()
572            .filter(|entry| entry.attempt_id == current_attempt_id)
573            .collect();
574        if unfinished.len() != 1 || unfinished[0].finished_at.is_some() {
575            return Err(SnapshotMappingError::InvalidActiveAttemptHistory { run_id });
576        }
577    }
578
579    if snapshot_run.lease.is_some()
580        && !matches!(
581            snapshot_run.run_instance.state(),
582            RunState::Ready | RunState::Leased | RunState::Running
583        )
584    {
585        return Err(SnapshotMappingError::InvalidLeasePresence {
586            run_id,
587            state: snapshot_run.run_instance.state(),
588        });
589    }
590
591    Ok(())
592}
593
594/// Maps snapshot run history entries into reducer history entries.
595pub fn map_snapshot_run_history(
596    entries: Vec<SnapshotRunStateHistoryEntry>,
597) -> Vec<crate::recovery::reducer::RunStateHistoryEntry> {
598    entries
599        .into_iter()
600        .map(|entry| crate::recovery::reducer::RunStateHistoryEntry {
601            from: entry.from,
602            to: entry.to,
603            timestamp: entry.timestamp,
604        })
605        .collect::<Vec<crate::recovery::reducer::RunStateHistoryEntry>>()
606}
607
608/// Maps snapshot attempt history entries into reducer attempt history entries.
609pub fn map_snapshot_attempt_history(
610    entries: Vec<SnapshotAttemptHistoryEntry>,
611) -> Vec<crate::recovery::reducer::AttemptHistoryEntry> {
612    entries
613        .into_iter()
614        .map(|entry| crate::recovery::reducer::AttemptHistoryEntry {
615            attempt_id: entry.attempt_id,
616            started_at: entry.started_at,
617            finished_at: entry.finished_at,
618            result: entry.result,
619            error: entry.error,
620            output: entry.output,
621        })
622        .collect()
623}
624
625/// Maps snapshot lease metadata into reducer lease metadata.
626pub fn map_snapshot_lease_metadata(
627    lease: Option<SnapshotLeaseMetadata>,
628) -> Option<crate::recovery::reducer::LeaseMetadata> {
629    lease.map(|metadata| crate::recovery::reducer::LeaseMetadata {
630        owner: metadata.owner,
631        expiry: metadata.expiry,
632        acquired_at: metadata.acquired_at,
633        updated_at: metadata.updated_at,
634    })
635}