1use std::collections::HashSet;
7
8use actionqueue_core::budget::BudgetDimension;
9use actionqueue_core::ids::{RunId, TaskId};
10use actionqueue_core::run::run_instance::RunInstance as CoreRunInstance;
11use actionqueue_core::run::state::RunState;
12use actionqueue_core::run::transitions::is_valid_transition;
13use actionqueue_core::subscription::SubscriptionId;
14
15use crate::snapshot::model::{
16 Snapshot, SnapshotAttemptHistoryEntry, SnapshotEngineControl, SnapshotLeaseMetadata,
17 SnapshotRun, SnapshotRunStateHistoryEntry,
18};
19
/// Snapshot schema version accepted by this module.
///
/// [`validate_snapshot`] compares `snapshot.metadata.schema_version` against this
/// value and fails with [`SnapshotMappingError::UnsupportedSchemaVersion`] when the
/// two differ.
pub const SNAPSHOT_SCHEMA_VERSION: u32 = 8;
29
/// Failures reported while validating a snapshot or mapping its runs back to
/// core run instances.
///
/// Each variant carries the identifiers and values that its `Display` message
/// interpolates.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SnapshotMappingError {
    /// The snapshot's schema version is not [`SNAPSHOT_SCHEMA_VERSION`].
    UnsupportedSchemaVersion {
        /// Schema version this module accepts.
        expected: u32,
        /// Schema version recorded in the snapshot metadata.
        found: u32,
    },
    /// `metadata.task_count` does not equal the number of tasks in the snapshot.
    TaskCountMismatch {
        /// Count declared in the snapshot metadata.
        declared: u64,
        /// Number of task entries actually present.
        actual: u64,
    },
    /// `metadata.run_count` does not equal the number of runs in the snapshot.
    RunCountMismatch {
        /// Count declared in the snapshot metadata.
        declared: u64,
        /// Number of run entries actually present.
        actual: u64,
    },
    /// The same task id appears on more than one task entry.
    DuplicateTaskId {
        /// The repeated task id.
        task_id: TaskId,
    },
    /// A task's `canceled_at` is earlier than its `created_at`.
    InvalidTaskCanceledAtCausality {
        /// Task with the inconsistent timestamps.
        task_id: TaskId,
        /// The task's creation timestamp.
        created_at: u64,
        /// The task's cancellation timestamp.
        canceled_at: u64,
    },
    /// The same run id appears on more than one run entry.
    DuplicateRunId {
        /// The repeated run id.
        run_id: RunId,
    },
    /// A run points at a task id that is not among the snapshot's tasks.
    RunReferencesUnknownTask {
        /// Run holding the dangling reference.
        run_id: RunId,
        /// Task id that could not be found.
        task_id: TaskId,
    },
    /// A run's id is the nil UUID.
    InvalidRunId {
        /// The offending (nil) run id.
        run_id: RunId,
    },
    /// A run's task id is the nil UUID.
    InvalidTaskId {
        /// Run carrying the nil task id.
        run_id: RunId,
        /// The offending (nil) task id.
        task_id: TaskId,
    },
    /// A run in `Ready` state has `scheduled_at` later than `created_at`.
    InvalidReadyScheduleCausality {
        /// Run with the inconsistent timestamps.
        run_id: RunId,
        /// The run's scheduled timestamp.
        scheduled_at: u64,
        /// The run's creation timestamp.
        created_at: u64,
    },
    /// A run has a current attempt id while in a state other than `Running` or
    /// `Canceled`.
    InvalidAttemptLineageState {
        /// Run with the misplaced active attempt.
        run_id: RunId,
        /// State the run was found in.
        state: RunState,
    },
    /// A run has a current attempt id but an attempt count of zero.
    InvalidAttemptLineageCount {
        /// Run with the inconsistent attempt lineage.
        run_id: RunId,
        /// The run's recorded attempt count.
        attempt_count: u32,
    },
    /// A run's state history is empty, or its first entry is not the initial
    /// `Scheduled` entry (no `from`, `to == Scheduled`, timestamp equal to the
    /// run's `created_at`).
    MissingInitialRunStateHistory {
        /// Run with the missing or malformed initial entry.
        run_id: RunId,
    },
    /// A state-history entry lacks a `from` state, does not chain from the previous
    /// entry, or is not a valid transition; also raised when the last history state
    /// differs from the run's current state.
    InvalidRunStateHistoryTransition {
        /// Run whose history is inconsistent.
        run_id: RunId,
        /// Source state of the rejected step.
        from: RunState,
        /// Target state of the rejected step.
        to: RunState,
    },
    /// The number of attempt-history entries differs from the run's attempt count.
    InvalidAttemptHistoryCount {
        /// Run with the mismatched attempt history.
        run_id: RunId,
        /// Attempt count recorded on the run instance.
        attempt_count: u32,
        /// Number of attempt-history entries in the snapshot.
        history_len: usize,
    },
    /// The run's current attempt id does not match exactly one unfinished
    /// attempt-history entry.
    InvalidActiveAttemptHistory {
        /// Run with the misaligned active attempt.
        run_id: RunId,
    },
    /// Lease metadata is present while the run is in a state other than `Ready`,
    /// `Leased` or `Running`.
    InvalidLeasePresence {
        /// Run carrying the unexpected lease metadata.
        run_id: RunId,
        /// State the run was found in.
        state: RunState,
    },
    /// A dependency declaration lists a prerequisite that is not a known task.
    DependencyReferencesUnknownTask {
        /// Task the declaration belongs to.
        task_id: TaskId,
        /// Prerequisite task id that could not be found.
        prereq_id: TaskId,
    },
    /// A dependency declaration is attached to a task id that is not a known task.
    DependencyDeclarationUnknownTask {
        /// The unknown task id.
        task_id: TaskId,
    },
    /// More than one dependency declaration exists for the same task.
    DuplicateDependencyDeclaration {
        /// Task with the repeated declaration.
        task_id: TaskId,
    },
    /// More than one budget allocation exists for the same (task, dimension) pair.
    DuplicateBudgetAllocation {
        /// Task of the repeated allocation.
        task_id: TaskId,
        /// Budget dimension of the repeated allocation.
        dimension: BudgetDimension,
    },
    /// A budget allocation points at a task id that is not a known task.
    BudgetReferencesUnknownTask {
        /// The unknown task id.
        task_id: TaskId,
    },
    /// The same subscription id appears more than once.
    DuplicateSubscriptionId {
        /// The repeated subscription id.
        subscription_id: SubscriptionId,
    },
    /// A subscription points at a task id that is not a known task.
    SubscriptionReferencesUnknownTask {
        /// Subscription holding the dangling reference.
        subscription_id: SubscriptionId,
        /// Task id that could not be found.
        task_id: TaskId,
    },
    /// The engine is marked paused but has no `paused_at` timestamp.
    InvalidEnginePausedCausality,
    /// The engine's `resumed_at` is inconsistent with its pause state: present while
    /// paused, present without `paused_at`, or absent although the engine is not
    /// paused and `paused_at` is set.
    InvalidEngineResumedCausality,
    /// The engine's `resumed_at` is earlier than its `paused_at`.
    InvalidEnginePauseResumeOrdering {
        /// Recorded pause timestamp.
        paused_at: u64,
        /// Recorded resume timestamp.
        resumed_at: u64,
    },
}
203
204impl std::fmt::Display for SnapshotMappingError {
205 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
206 match self {
207 Self::UnsupportedSchemaVersion { expected, found } => {
208 write!(f, "unsupported snapshot schema version: expected {expected}, found {found}")
209 }
210 Self::TaskCountMismatch { declared, actual } => {
211 write!(f, "snapshot task_count mismatch: declared {declared}, actual {actual}")
212 }
213 Self::RunCountMismatch { declared, actual } => {
214 write!(f, "snapshot run_count mismatch: declared {declared}, actual {actual}")
215 }
216 Self::DuplicateTaskId { task_id } => {
217 write!(f, "duplicate snapshot task id: {task_id}")
218 }
219 Self::InvalidTaskCanceledAtCausality { task_id, created_at, canceled_at } => write!(
220 f,
221 "snapshot task {task_id} has invalid canceled_at causality: canceled_at \
222 ({canceled_at}) < created_at ({created_at})"
223 ),
224 Self::DuplicateRunId { run_id } => {
225 write!(f, "duplicate snapshot run id: {run_id}")
226 }
227 Self::RunReferencesUnknownTask { run_id, task_id } => {
228 write!(f, "snapshot run {run_id} references unknown task {task_id}")
229 }
230 Self::InvalidRunId { run_id } => {
231 write!(f, "snapshot run has invalid nil run id: {run_id}")
232 }
233 Self::InvalidTaskId { run_id, task_id } => {
234 write!(f, "snapshot run {run_id} has invalid nil task id: {task_id}")
235 }
236 Self::InvalidReadyScheduleCausality { run_id, scheduled_at, created_at } => write!(
237 f,
238 "snapshot run {run_id} has invalid Ready schedule causality: scheduled_at \
239 ({scheduled_at}) > created_at ({created_at})"
240 ),
241 Self::InvalidAttemptLineageState { run_id, state } => write!(
242 f,
243 "snapshot run {run_id} has active attempt lineage in invalid state {state:?}"
244 ),
245 Self::InvalidAttemptLineageCount { run_id, attempt_count } => write!(
246 f,
247 "snapshot run {run_id} has active attempt with invalid attempt_count \
248 ({attempt_count})"
249 ),
250 Self::MissingInitialRunStateHistory { run_id } => {
251 write!(f, "snapshot run {run_id} missing initial Scheduled state history entry")
252 }
253 Self::InvalidRunStateHistoryTransition { run_id, from, to } => write!(
254 f,
255 "snapshot run {run_id} has invalid state history transition {from:?} -> {to:?}"
256 ),
257 Self::InvalidAttemptHistoryCount { run_id, attempt_count, history_len } => write!(
258 f,
259 "snapshot run {run_id} has attempt history length {history_len} inconsistent with \
260 attempt_count ({attempt_count})"
261 ),
262 Self::InvalidActiveAttemptHistory { run_id } => {
263 write!(f, "snapshot run {run_id} has invalid active attempt history alignment")
264 }
265 Self::InvalidLeasePresence { run_id, state } => {
266 write!(f, "snapshot run {run_id} has lease metadata in invalid state {state:?}")
267 }
268 Self::DependencyReferencesUnknownTask { task_id, prereq_id } => write!(
269 f,
270 "snapshot dependency declaration for task {task_id} references unknown \
271 prerequisite task {prereq_id}"
272 ),
273 Self::DependencyDeclarationUnknownTask { task_id } => {
274 write!(f, "snapshot dependency declaration for unknown task {task_id}")
275 }
276 Self::DuplicateDependencyDeclaration { task_id } => {
277 write!(f, "snapshot contains duplicate dependency declaration for task {task_id}")
278 }
279 Self::DuplicateBudgetAllocation { task_id, dimension } => {
280 write!(f, "duplicate budget allocation for task {task_id} dimension {dimension}")
281 }
282 Self::BudgetReferencesUnknownTask { task_id } => {
283 write!(f, "budget allocation references unknown task {task_id}")
284 }
285 Self::DuplicateSubscriptionId { subscription_id } => {
286 write!(f, "duplicate subscription id {subscription_id}")
287 }
288 Self::SubscriptionReferencesUnknownTask { subscription_id, task_id } => {
289 write!(f, "subscription {subscription_id} references unknown task {task_id}")
290 }
291 Self::InvalidEnginePausedCausality => {
292 write!(f, "snapshot engine state invalid: paused=true requires paused_at")
293 }
294 Self::InvalidEngineResumedCausality => {
295 write!(
296 f,
297 "snapshot engine state invalid: paused=false with paused_at requires \
298 resumed_at"
299 )
300 }
301 Self::InvalidEnginePauseResumeOrdering { paused_at, resumed_at } => write!(
302 f,
303 "snapshot engine state invalid: resumed_at ({resumed_at}) < paused_at \
304 ({paused_at})"
305 ),
306 }
307 }
308}
309
/// Lets `SnapshotMappingError` be used as a standard error type; the empty body
/// relies on the trait's default methods.
impl std::error::Error for SnapshotMappingError {}
311
312pub fn map_core_run_to_snapshot(run_instance: CoreRunInstance) -> SnapshotRun {
314 SnapshotRun { run_instance, state_history: Vec::new(), attempts: Vec::new(), lease: None }
315}
316
317pub fn map_snapshot_run_to_core(
319 snapshot_run: &SnapshotRun,
320) -> Result<CoreRunInstance, SnapshotMappingError> {
321 validate_core_run_payload(&snapshot_run.run_instance)?;
322 validate_snapshot_run_details(snapshot_run)?;
323 Ok(snapshot_run.run_instance.clone())
324}
325
326pub fn validate_snapshot(snapshot: &Snapshot) -> Result<(), SnapshotMappingError> {
328 if snapshot.metadata.schema_version != SNAPSHOT_SCHEMA_VERSION {
329 return Err(SnapshotMappingError::UnsupportedSchemaVersion {
330 expected: SNAPSHOT_SCHEMA_VERSION,
331 found: snapshot.metadata.schema_version,
332 });
333 }
334
335 let task_count = snapshot.tasks.len() as u64;
336 if snapshot.metadata.task_count != task_count {
337 return Err(SnapshotMappingError::TaskCountMismatch {
338 declared: snapshot.metadata.task_count,
339 actual: task_count,
340 });
341 }
342
343 let run_count = snapshot.runs.len() as u64;
344 if snapshot.metadata.run_count != run_count {
345 return Err(SnapshotMappingError::RunCountMismatch {
346 declared: snapshot.metadata.run_count,
347 actual: run_count,
348 });
349 }
350
351 let mut task_ids = HashSet::new();
352 for task in &snapshot.tasks {
353 let task_id = task.task_spec.id();
354 if !task_ids.insert(task_id) {
355 return Err(SnapshotMappingError::DuplicateTaskId { task_id });
356 }
357
358 if let Some(canceled_at) = task.canceled_at {
359 if canceled_at < task.created_at {
360 return Err(SnapshotMappingError::InvalidTaskCanceledAtCausality {
361 task_id,
362 created_at: task.created_at,
363 canceled_at,
364 });
365 }
366 }
367 }
368
369 let mut run_ids = HashSet::new();
370 for run in &snapshot.runs {
371 let core_run = map_snapshot_run_to_core(run)?;
372 let run_id = core_run.id();
373 let task_id = core_run.task_id();
374
375 if !run_ids.insert(run_id) {
376 return Err(SnapshotMappingError::DuplicateRunId { run_id });
377 }
378
379 if !task_ids.contains(&task_id) {
380 return Err(SnapshotMappingError::RunReferencesUnknownTask { run_id, task_id });
381 }
382 }
383
384 validate_engine_control(&snapshot.engine)?;
385
386 let mut dep_task_ids = HashSet::new();
388 for decl in &snapshot.dependency_declarations {
389 if !dep_task_ids.insert(decl.task_id) {
390 return Err(SnapshotMappingError::DuplicateDependencyDeclaration {
391 task_id: decl.task_id,
392 });
393 }
394 if !task_ids.contains(&decl.task_id) {
395 return Err(SnapshotMappingError::DependencyDeclarationUnknownTask {
396 task_id: decl.task_id,
397 });
398 }
399 for &prereq_id in &decl.depends_on {
400 if !task_ids.contains(&prereq_id) {
401 return Err(SnapshotMappingError::DependencyReferencesUnknownTask {
402 task_id: decl.task_id,
403 prereq_id,
404 });
405 }
406 }
407 }
408
409 let mut budget_keys = HashSet::new();
411 for budget in &snapshot.budgets {
412 if !budget_keys.insert((budget.task_id, budget.dimension)) {
413 return Err(SnapshotMappingError::DuplicateBudgetAllocation {
414 task_id: budget.task_id,
415 dimension: budget.dimension,
416 });
417 }
418 if !task_ids.contains(&budget.task_id) {
419 return Err(SnapshotMappingError::BudgetReferencesUnknownTask {
420 task_id: budget.task_id,
421 });
422 }
423 }
424
425 let mut sub_ids = HashSet::new();
427 for sub in &snapshot.subscriptions {
428 if !sub_ids.insert(sub.subscription_id) {
429 return Err(SnapshotMappingError::DuplicateSubscriptionId {
430 subscription_id: sub.subscription_id,
431 });
432 }
433 if !task_ids.contains(&sub.task_id) {
434 return Err(SnapshotMappingError::SubscriptionReferencesUnknownTask {
435 subscription_id: sub.subscription_id,
436 task_id: sub.task_id,
437 });
438 }
439 }
440
441 Ok(())
446}
447
448fn validate_engine_control(engine: &SnapshotEngineControl) -> Result<(), SnapshotMappingError> {
449 if engine.paused && engine.paused_at.is_none() {
450 return Err(SnapshotMappingError::InvalidEnginePausedCausality);
451 }
452
453 if engine.paused && engine.resumed_at.is_some() {
454 return Err(SnapshotMappingError::InvalidEngineResumedCausality);
455 }
456
457 if engine.paused_at.is_none() && engine.resumed_at.is_some() {
458 return Err(SnapshotMappingError::InvalidEngineResumedCausality);
459 }
460
461 if !engine.paused && engine.paused_at.is_some() && engine.resumed_at.is_none() {
462 return Err(SnapshotMappingError::InvalidEngineResumedCausality);
463 }
464
465 if !engine.paused {
466 if let (Some(paused_at), Some(resumed_at)) = (engine.paused_at, engine.resumed_at) {
467 if resumed_at < paused_at {
468 return Err(SnapshotMappingError::InvalidEnginePauseResumeOrdering {
469 paused_at,
470 resumed_at,
471 });
472 }
473 }
474 }
475
476 Ok(())
477}
478
479fn validate_core_run_payload(run_instance: &CoreRunInstance) -> Result<(), SnapshotMappingError> {
480 let run_id = run_instance.id();
481 if run_id.as_uuid().is_nil() {
482 return Err(SnapshotMappingError::InvalidRunId { run_id });
483 }
484
485 let task_id = run_instance.task_id();
486 if task_id.as_uuid().is_nil() {
487 return Err(SnapshotMappingError::InvalidTaskId { run_id, task_id });
488 }
489
490 if run_instance.state() == RunState::Ready
491 && run_instance.scheduled_at() > run_instance.created_at()
492 {
493 return Err(SnapshotMappingError::InvalidReadyScheduleCausality {
494 run_id,
495 scheduled_at: run_instance.scheduled_at(),
496 created_at: run_instance.created_at(),
497 });
498 }
499
500 if run_instance.current_attempt_id().is_some()
501 && !matches!(run_instance.state(), RunState::Running | RunState::Canceled)
502 {
503 return Err(SnapshotMappingError::InvalidAttemptLineageState {
504 run_id,
505 state: run_instance.state(),
506 });
507 }
508
509 if run_instance.current_attempt_id().is_some() && run_instance.attempt_count() == 0 {
510 return Err(SnapshotMappingError::InvalidAttemptLineageCount {
511 run_id,
512 attempt_count: run_instance.attempt_count(),
513 });
514 }
515
516 Ok(())
517}
518
519fn validate_snapshot_run_details(snapshot_run: &SnapshotRun) -> Result<(), SnapshotMappingError> {
520 let run_id = snapshot_run.run_instance.id();
521 let created_at = snapshot_run.run_instance.created_at();
522
523 if snapshot_run.state_history.is_empty() {
524 return Err(SnapshotMappingError::MissingInitialRunStateHistory { run_id });
525 }
526
527 let first = &snapshot_run.state_history[0];
528 if first.from.is_some() || first.to != RunState::Scheduled || first.timestamp != created_at {
529 return Err(SnapshotMappingError::MissingInitialRunStateHistory { run_id });
530 }
531
532 let mut previous_state = RunState::Scheduled;
533 for entry in snapshot_run.state_history.iter().skip(1) {
534 let from = entry.from.ok_or(SnapshotMappingError::InvalidRunStateHistoryTransition {
535 run_id,
536 from: previous_state,
537 to: entry.to,
538 })?;
539
540 if from != previous_state || !is_valid_transition(from, entry.to) {
541 return Err(SnapshotMappingError::InvalidRunStateHistoryTransition {
542 run_id,
543 from,
544 to: entry.to,
545 });
546 }
547
548 previous_state = entry.to;
549 }
550
551 if previous_state != snapshot_run.run_instance.state() {
552 return Err(SnapshotMappingError::InvalidRunStateHistoryTransition {
553 run_id,
554 from: previous_state,
555 to: snapshot_run.run_instance.state(),
556 });
557 }
558
559 let attempt_count = snapshot_run.run_instance.attempt_count();
560 if snapshot_run.attempts.len() != attempt_count as usize {
561 return Err(SnapshotMappingError::InvalidAttemptHistoryCount {
562 run_id,
563 attempt_count,
564 history_len: snapshot_run.attempts.len(),
565 });
566 }
567
568 if let Some(current_attempt_id) = snapshot_run.run_instance.current_attempt_id() {
569 let unfinished: Vec<&SnapshotAttemptHistoryEntry> = snapshot_run
570 .attempts
571 .iter()
572 .filter(|entry| entry.attempt_id == current_attempt_id)
573 .collect();
574 if unfinished.len() != 1 || unfinished[0].finished_at.is_some() {
575 return Err(SnapshotMappingError::InvalidActiveAttemptHistory { run_id });
576 }
577 }
578
579 if snapshot_run.lease.is_some()
580 && !matches!(
581 snapshot_run.run_instance.state(),
582 RunState::Ready | RunState::Leased | RunState::Running
583 )
584 {
585 return Err(SnapshotMappingError::InvalidLeasePresence {
586 run_id,
587 state: snapshot_run.run_instance.state(),
588 });
589 }
590
591 Ok(())
592}
593
594pub fn map_snapshot_run_history(
596 entries: Vec<SnapshotRunStateHistoryEntry>,
597) -> Vec<crate::recovery::reducer::RunStateHistoryEntry> {
598 entries
599 .into_iter()
600 .map(|entry| crate::recovery::reducer::RunStateHistoryEntry {
601 from: entry.from,
602 to: entry.to,
603 timestamp: entry.timestamp,
604 })
605 .collect::<Vec<crate::recovery::reducer::RunStateHistoryEntry>>()
606}
607
608pub fn map_snapshot_attempt_history(
610 entries: Vec<SnapshotAttemptHistoryEntry>,
611) -> Vec<crate::recovery::reducer::AttemptHistoryEntry> {
612 entries
613 .into_iter()
614 .map(|entry| crate::recovery::reducer::AttemptHistoryEntry {
615 attempt_id: entry.attempt_id,
616 started_at: entry.started_at,
617 finished_at: entry.finished_at,
618 result: entry.result,
619 error: entry.error,
620 output: entry.output,
621 })
622 .collect()
623}
624
625pub fn map_snapshot_lease_metadata(
627 lease: Option<SnapshotLeaseMetadata>,
628) -> Option<crate::recovery::reducer::LeaseMetadata> {
629 lease.map(|metadata| crate::recovery::reducer::LeaseMetadata {
630 owner: metadata.owner,
631 expiry: metadata.expiry,
632 acquired_at: metadata.acquired_at,
633 updated_at: metadata.updated_at,
634 })
635}