// actionqueue_storage/recovery/bootstrap.rs

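//! Recovery bootstrap utilities for loading projections from storage.
//!
//! This module provides a deterministic, replay-only bootstrap path that
//! reconstructs authoritative projection state from a snapshot plus the WAL
//! tail. Snapshot contents are mapped to WAL-equivalent events and applied
//! through the reducer's normal event path rather than by writing projection
//! internals directly. The exceptions are per-run attempt counters, run and
//! attempt history, lease metadata, and the latest WAL sequence, which are
//! seeded through dedicated `*_for_bootstrap` / `set_*` reducer hooks.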
8
9use std::path::PathBuf;
10use std::time::Instant;
11
12use actionqueue_core::run::state::RunState;
13use actionqueue_core::run::transitions::is_valid_transition;
14use actionqueue_core::run::RunInstance;
15
16use crate::recovery::reducer::ReplayReducer;
17use crate::recovery::replay::ReplayDriver;
18use crate::snapshot::loader::{SnapshotFsLoader, SnapshotLoader};
19use crate::snapshot::mapping::{
20    map_snapshot_attempt_history, map_snapshot_lease_metadata, map_snapshot_run_history,
21};
22use crate::snapshot::model::Snapshot;
23use crate::wal::event::{WalEvent, WalEventType};
24use crate::wal::fs_reader::WalFsReader;
25use crate::wal::fs_writer::WalFsWriter;
26use crate::wal::reader::{WalReader, WalReaderError};
27use crate::wal::{InstrumentedWalWriter, WalAppendTelemetry};
28
/// Recovery metrics captured while a single storage bootstrap ran.
///
/// The event counters split the work between snapshot hydration and WAL tail
/// replay; `events_applied_total` is their (saturating) sum.
#[must_use]
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct RecoveryObservations {
    /// Wall-clock time spent in bootstrap recovery, in seconds.
    pub recovery_duration_seconds: f64,
    /// Events applied overall (snapshot hydration + WAL replay).
    pub events_applied_total: u64,
    /// Events applied while hydrating from the snapshot.
    pub snapshot_events_applied: u64,
    /// Events applied while replaying the WAL tail.
    pub wal_replay_events_applied: u64,
}

impl RecoveryObservations {
    /// Builds an observation with every field set to zero, handy for
    /// deterministic test fixtures.
    pub const fn zero() -> Self {
        let no_events: u64 = 0;
        Self {
            recovery_duration_seconds: 0.0,
            events_applied_total: no_events,
            snapshot_events_applied: no_events,
            wal_replay_events_applied: no_events,
        }
    }
}
54
55/// Recovery bootstrap output containing projection state and storage handles.
56#[must_use]
57pub struct RecoveryBootstrap {
58    /// Replayed projection state derived from snapshot + WAL.
59    pub projection: ReplayReducer,
60    /// WAL writer handle for future mutation/control lanes.
61    pub wal_writer: InstrumentedWalWriter<WalFsWriter>,
62    /// Authoritative WAL append telemetry for daemon metrics surfaces.
63    pub wal_append_telemetry: WalAppendTelemetry,
64    /// Full WAL file path.
65    pub wal_path: PathBuf,
66    /// Full snapshot file path.
67    pub snapshot_path: PathBuf,
68    /// Whether a snapshot was loaded.
69    pub snapshot_loaded: bool,
70    /// The WAL sequence number encoded in the loaded snapshot (0 if none).
71    pub snapshot_sequence: u64,
72    /// Authoritative recovery observations from this bootstrap execution.
73    pub recovery_observations: RecoveryObservations,
74}
75
/// Failure categories reported by recovery bootstrap.
///
/// Each variant carries the stringified underlying error, keeping the type
/// `Clone + Eq` and its diagnostics deterministic.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum RecoveryBootstrapError {
    /// Storage directories or the WAL writer could not be initialized.
    WalInit(String),
    /// The WAL could not be opened or read.
    WalRead(String),
    /// The snapshot could not be loaded.
    SnapshotLoad(String),
    /// Replaying WAL events failed.
    WalReplay(String),
    /// Hydrating from the snapshot failed or the snapshot is inconsistent.
    SnapshotBootstrap(String),
}

impl std::fmt::Display for RecoveryBootstrapError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let (context, detail) = match self {
            Self::WalInit(detail) => ("WAL init error", detail),
            Self::WalRead(detail) => ("WAL read error", detail),
            Self::SnapshotLoad(detail) => ("snapshot load error", detail),
            Self::WalReplay(detail) => ("WAL replay error", detail),
            Self::SnapshotBootstrap(detail) => ("snapshot bootstrap error", detail),
        };
        write!(f, "{context}: {detail}")
    }
}

impl std::error::Error for RecoveryBootstrapError {}
106
107/// Loads an authoritative projection from storage using snapshot + WAL tail.
108///
109/// This function creates required storage directories, initializes WAL handles,
110/// attempts snapshot load, then replays the WAL tail from the snapshot sequence.
111///
112/// # Errors
113///
114/// Returns [`RecoveryBootstrapError`] variants with the underlying error string
115/// for deterministic diagnostics.
116pub fn load_projection_from_storage(
117    data_root: &std::path::Path,
118) -> Result<RecoveryBootstrap, RecoveryBootstrapError> {
119    let recovery_started_at = Instant::now();
120    tracing::info!(data_dir = %data_root.display(), "storage bootstrap started");
121
122    let wal_dir = data_root.join("wal");
123    let snapshot_dir = data_root.join("snapshots");
124    let wal_path = wal_dir.join("actionqueue.wal");
125    let snapshot_path = snapshot_dir.join("snapshot.bin");
126
127    std::fs::create_dir_all(&wal_dir)
128        .and_then(|_| std::fs::create_dir_all(&snapshot_dir))
129        .map_err(|err| RecoveryBootstrapError::WalInit(err.to_string()))?;
130
131    let wal_writer = WalFsWriter::new(wal_path.clone())
132        .map_err(|err| RecoveryBootstrapError::WalInit(err.to_string()))?;
133    let wal_append_telemetry = WalAppendTelemetry::new();
134    let wal_writer = InstrumentedWalWriter::new(wal_writer, wal_append_telemetry.clone());
135
136    let wal_reader = WalFsReader::new(wal_path.clone())
137        .map_err(|err| RecoveryBootstrapError::WalRead(err.to_string()))?;
138
139    let (snapshot_loaded, snapshot_sequence, reducer, snapshot_events_applied) =
140        match SnapshotFsLoader::new(snapshot_path.clone()).load() {
141            Ok(None) => (false, 0, ReplayReducer::new(), 0),
142            Ok(Some(snapshot)) => {
143                let sequence = snapshot.metadata.wal_sequence;
144                let task_count = snapshot.metadata.task_count;
145                tracing::info!(wal_sequence = sequence, task_count, "snapshot loaded successfully");
146                match bootstrap_reducer_from_snapshot(&snapshot) {
147                    Ok((reducer, snapshot_events_applied)) => {
148                        (true, sequence, reducer, snapshot_events_applied)
149                    }
150                    Err(err) => {
151                        tracing::warn!(
152                            error = %err,
153                            "snapshot bootstrap failed, falling back to WAL-only replay"
154                        );
155                        (false, 0, ReplayReducer::new(), 0)
156                    }
157                }
158            }
159            Err(err) => {
160                // Snapshots are derived acceleration artifacts — a corrupt snapshot
161                // must not prevent recovery. Fall back to full WAL replay.
162                tracing::warn!(
163                    error = %err,
164                    "snapshot load failed, falling back to WAL-only replay"
165                );
166                (false, 0, ReplayReducer::new(), 0)
167            }
168        };
169
170    // Validate snapshot sequence against WAL max sequence.
171    let wal_max_sequence = wal_writer.inner().current_sequence();
172    if snapshot_loaded && snapshot_sequence > wal_max_sequence {
173        if wal_max_sequence == 0 {
174            tracing::info!(
175                snapshot_sequence,
176                "bootstrapping from snapshot with empty WAL — WAL events prior to snapshot are \
177                 not recoverable"
178            );
179        } else {
180            return Err(RecoveryBootstrapError::SnapshotBootstrap(format!(
181                "snapshot sequence {snapshot_sequence} exceeds WAL max sequence {wal_max_sequence}"
182            )));
183        }
184    }
185
186    let mut wal_reader = wal_reader;
187
188    if snapshot_sequence > 0 {
189        match wal_reader.seek_to_sequence(snapshot_sequence + 1) {
190            Ok(()) => {}
191            Err(WalReaderError::EndOfWal) => {
192                let recovery_duration_seconds = recovery_started_at.elapsed().as_secs_f64();
193                let wal_replay_events_applied = 0;
194                let events_applied_total =
195                    snapshot_events_applied.saturating_add(wal_replay_events_applied);
196                return Ok(RecoveryBootstrap {
197                    projection: reducer,
198                    wal_writer,
199                    wal_append_telemetry,
200                    wal_path,
201                    snapshot_path,
202                    snapshot_loaded,
203                    snapshot_sequence,
204                    recovery_observations: RecoveryObservations {
205                        recovery_duration_seconds,
206                        events_applied_total,
207                        snapshot_events_applied,
208                        wal_replay_events_applied,
209                    },
210                });
211            }
212            Err(err) => return Err(RecoveryBootstrapError::WalRead(err.to_string())),
213        }
214    }
215
216    let mut driver = ReplayDriver::new(wal_reader, reducer);
217    let wal_replay_events_applied = driver
218        .run_with_applied_count()
219        .map_err(|err| RecoveryBootstrapError::WalReplay(err.to_string()))?;
220    tracing::info!(event_count = wal_replay_events_applied, "WAL replay complete");
221    let reducer = driver.into_reducer();
222    let recovery_duration_seconds = recovery_started_at.elapsed().as_secs_f64();
223    let events_applied_total = snapshot_events_applied.saturating_add(wal_replay_events_applied);
224
225    tracing::info!(
226        events_applied_total,
227        snapshot_loaded,
228        snapshot_events_applied,
229        wal_replay_events_applied,
230        recovery_duration_seconds,
231        "recovery bootstrap complete"
232    );
233
234    Ok(RecoveryBootstrap {
235        projection: reducer,
236        wal_writer,
237        wal_append_telemetry,
238        wal_path,
239        snapshot_path,
240        snapshot_loaded,
241        snapshot_sequence,
242        recovery_observations: RecoveryObservations {
243            recovery_duration_seconds,
244            events_applied_total,
245            snapshot_events_applied,
246            wal_replay_events_applied,
247        },
248    })
249}
250
251fn bootstrap_reducer_from_snapshot(
252    snapshot: &Snapshot,
253) -> Result<(ReplayReducer, u64), RecoveryBootstrapError> {
254    let mut reducer = ReplayReducer::new();
255    let mut sequence = 1u64;
256    let mut snapshot_events_applied = 0u64;
257
258    for task in &snapshot.tasks {
259        let event = WalEvent::new(
260            sequence,
261            WalEventType::TaskCreated {
262                task_spec: task.task_spec.clone(),
263                timestamp: task.created_at,
264            },
265        );
266        reducer
267            .apply(&event)
268            .map_err(|err| RecoveryBootstrapError::SnapshotBootstrap(err.to_string()))?;
269        snapshot_events_applied = snapshot_events_applied.saturating_add(1);
270        sequence = sequence.saturating_add(1);
271
272        if let Some(canceled_at) = task.canceled_at {
273            let event = WalEvent::new(
274                sequence,
275                WalEventType::TaskCanceled { task_id: task.task_spec.id(), timestamp: canceled_at },
276            );
277            reducer
278                .apply(&event)
279                .map_err(|err| RecoveryBootstrapError::SnapshotBootstrap(err.to_string()))?;
280            snapshot_events_applied = snapshot_events_applied.saturating_add(1);
281            sequence = sequence.saturating_add(1);
282        }
283    }
284
285    // Synthesize dependency declarations from snapshot (S-1 fix).
286    for decl in &snapshot.dependency_declarations {
287        let event = WalEvent::new(
288            sequence,
289            WalEventType::DependencyDeclared {
290                task_id: decl.task_id,
291                depends_on: decl.depends_on.clone(),
292                timestamp: decl.declared_at,
293            },
294        );
295        reducer
296            .apply(&event)
297            .map_err(|err| RecoveryBootstrapError::SnapshotBootstrap(err.to_string()))?;
298        snapshot_events_applied = snapshot_events_applied.saturating_add(1);
299        sequence = sequence.saturating_add(1);
300    }
301
302    if let Some(paused_at) = snapshot.engine.paused_at {
303        let event = WalEvent::new(sequence, WalEventType::EnginePaused { timestamp: paused_at });
304        reducer
305            .apply(&event)
306            .map_err(|err| RecoveryBootstrapError::SnapshotBootstrap(err.to_string()))?;
307        snapshot_events_applied = snapshot_events_applied.saturating_add(1);
308        sequence = sequence.saturating_add(1);
309    }
310
311    if let Some(resumed_at) = snapshot.engine.resumed_at {
312        let event = WalEvent::new(sequence, WalEventType::EngineResumed { timestamp: resumed_at });
313        reducer
314            .apply(&event)
315            .map_err(|err| RecoveryBootstrapError::SnapshotBootstrap(err.to_string()))?;
316        snapshot_events_applied = snapshot_events_applied.saturating_add(1);
317        sequence = sequence.saturating_add(1);
318    }
319
320    for run in &snapshot.runs {
321        let run_instance = run.run_instance.clone();
322        let scheduled_run = RunInstance::new_scheduled_with_id(
323            run_instance.id(),
324            run_instance.task_id(),
325            run_instance.scheduled_at(),
326            run_instance.created_at(),
327        )
328        .map_err(|err| RecoveryBootstrapError::SnapshotBootstrap(err.to_string()))?;
329
330        let event =
331            WalEvent::new(sequence, WalEventType::RunCreated { run_instance: scheduled_run });
332        reducer
333            .apply(&event)
334            .map_err(|err| RecoveryBootstrapError::SnapshotBootstrap(err.to_string()))?;
335        snapshot_events_applied = snapshot_events_applied.saturating_add(1);
336        sequence = sequence.saturating_add(1);
337
338        let snapshot_state = run_instance.state();
339        let mut previous_state = RunState::Scheduled;
340
341        for entry in run.state_history.iter().skip(1) {
342            if entry.from != Some(previous_state) {
343                return Err(RecoveryBootstrapError::SnapshotBootstrap(
344                    "invalid snapshot state history for bootstrap".to_string(),
345                ));
346            }
347            if !is_valid_transition(previous_state, entry.to) {
348                return Err(RecoveryBootstrapError::SnapshotBootstrap(
349                    "invalid snapshot state transition for bootstrap".to_string(),
350                ));
351            }
352
353            let event = WalEvent::new(
354                sequence,
355                WalEventType::RunStateChanged {
356                    run_id: run_instance.id(),
357                    previous_state,
358                    new_state: entry.to,
359                    timestamp: entry.timestamp,
360                },
361            );
362            reducer
363                .apply(&event)
364                .map_err(|err| RecoveryBootstrapError::SnapshotBootstrap(err.to_string()))?;
365            snapshot_events_applied = snapshot_events_applied.saturating_add(1);
366            sequence = sequence.saturating_add(1);
367            previous_state = entry.to;
368        }
369
370        if previous_state != snapshot_state {
371            return Err(RecoveryBootstrapError::SnapshotBootstrap(
372                "snapshot state history does not match run state".to_string(),
373            ));
374        }
375
376        // Restore attempt state from snapshot attempt history.
377        // Bootstrap replay walks state transitions but never calls
378        // start_attempt()/finish_attempt(), so we must seed these directly.
379        let attempt_count = run.attempts.len() as u32;
380        let active_attempt = if snapshot_state == RunState::Running {
381            // If the run is Running, the last attempt may still be active
382            // (no finished_at timestamp means the attempt is in-flight).
383            run.attempts.last().and_then(|a| {
384                if a.finished_at.is_none() {
385                    Some(a.attempt_id)
386                } else {
387                    None
388                }
389            })
390        } else {
391            None
392        };
393        if let Some(run_inst) = reducer.get_run_instance_mut(run_instance.id()) {
394            run_inst.restore_attempt_state_for_bootstrap(attempt_count, active_attempt);
395        }
396
397        reducer.set_run_history(
398            run_instance.id(),
399            map_snapshot_run_history(run.state_history.clone()),
400        );
401        reducer.set_attempt_history(
402            run_instance.id(),
403            map_snapshot_attempt_history(run.attempts.clone()),
404        );
405        let lease_metadata = map_snapshot_lease_metadata(run.lease.clone());
406        if let Some(metadata) = lease_metadata {
407            reducer.set_lease_for_bootstrap(run_instance.id(), metadata);
408        }
409    }
410
411    // Synthesize budget allocation events from snapshot.
412    for budget in &snapshot.budgets {
413        let event = WalEvent::new(
414            sequence,
415            WalEventType::BudgetAllocated {
416                task_id: budget.task_id,
417                dimension: budget.dimension,
418                limit: budget.limit,
419                timestamp: budget.allocated_at,
420            },
421        );
422        reducer
423            .apply(&event)
424            .map_err(|err| RecoveryBootstrapError::SnapshotBootstrap(err.to_string()))?;
425        snapshot_events_applied = snapshot_events_applied.saturating_add(1);
426        sequence = sequence.saturating_add(1);
427
428        if budget.consumed > 0 {
429            let event = WalEvent::new(
430                sequence,
431                WalEventType::BudgetConsumed {
432                    task_id: budget.task_id,
433                    dimension: budget.dimension,
434                    amount: budget.consumed,
435                    timestamp: budget.allocated_at,
436                },
437            );
438            reducer
439                .apply(&event)
440                .map_err(|err| RecoveryBootstrapError::SnapshotBootstrap(err.to_string()))?;
441            snapshot_events_applied = snapshot_events_applied.saturating_add(1);
442            sequence = sequence.saturating_add(1);
443        }
444
445        if budget.exhausted && budget.consumed < budget.limit {
446            let event = WalEvent::new(
447                sequence,
448                WalEventType::BudgetExhausted {
449                    task_id: budget.task_id,
450                    dimension: budget.dimension,
451                    timestamp: budget.allocated_at,
452                },
453            );
454            reducer
455                .apply(&event)
456                .map_err(|err| RecoveryBootstrapError::SnapshotBootstrap(err.to_string()))?;
457            snapshot_events_applied = snapshot_events_applied.saturating_add(1);
458            sequence = sequence.saturating_add(1);
459        }
460    }
461
462    // Synthesize subscription events from snapshot.
463    for sub in &snapshot.subscriptions {
464        let event = WalEvent::new(
465            sequence,
466            WalEventType::SubscriptionCreated {
467                subscription_id: sub.subscription_id,
468                task_id: sub.task_id,
469                filter: sub.filter.clone(),
470                timestamp: sub.created_at,
471            },
472        );
473        reducer
474            .apply(&event)
475            .map_err(|err| RecoveryBootstrapError::SnapshotBootstrap(err.to_string()))?;
476        snapshot_events_applied = snapshot_events_applied.saturating_add(1);
477        sequence = sequence.saturating_add(1);
478
479        if let Some(triggered_at) = sub.triggered_at {
480            let event = WalEvent::new(
481                sequence,
482                WalEventType::SubscriptionTriggered {
483                    subscription_id: sub.subscription_id,
484                    timestamp: triggered_at,
485                },
486            );
487            reducer
488                .apply(&event)
489                .map_err(|err| RecoveryBootstrapError::SnapshotBootstrap(err.to_string()))?;
490            snapshot_events_applied = snapshot_events_applied.saturating_add(1);
491            sequence = sequence.saturating_add(1);
492        }
493
494        if let Some(canceled_at) = sub.canceled_at {
495            let event = WalEvent::new(
496                sequence,
497                WalEventType::SubscriptionCanceled {
498                    subscription_id: sub.subscription_id,
499                    timestamp: canceled_at,
500                },
501            );
502            reducer
503                .apply(&event)
504                .map_err(|err| RecoveryBootstrapError::SnapshotBootstrap(err.to_string()))?;
505            snapshot_events_applied = snapshot_events_applied.saturating_add(1);
506            sequence = sequence.saturating_add(1);
507        }
508    }
509
510    // Bootstrap actor, tenant, role, capability, and ledger state from snapshot.
511    for actor in &snapshot.actors {
512        let event = WalEvent::new(
513            sequence,
514            WalEventType::ActorRegistered {
515                actor_id: actor.actor_id,
516                identity: actor.identity.clone(),
517                capabilities: actor.capabilities.clone(),
518                department: actor.department.clone(),
519                heartbeat_interval_secs: actor.heartbeat_interval_secs,
520                tenant_id: actor.tenant_id,
521                timestamp: actor.registered_at,
522            },
523        );
524        reducer
525            .apply(&event)
526            .map_err(|err| RecoveryBootstrapError::SnapshotBootstrap(err.to_string()))?;
527        snapshot_events_applied = snapshot_events_applied.saturating_add(1);
528        sequence = sequence.saturating_add(1);
529
530        if let Some(deregistered_at) = actor.deregistered_at {
531            let event = WalEvent::new(
532                sequence,
533                WalEventType::ActorDeregistered {
534                    actor_id: actor.actor_id,
535                    timestamp: deregistered_at,
536                },
537            );
538            reducer
539                .apply(&event)
540                .map_err(|err| RecoveryBootstrapError::SnapshotBootstrap(err.to_string()))?;
541            snapshot_events_applied = snapshot_events_applied.saturating_add(1);
542            sequence = sequence.saturating_add(1);
543        }
544    }
545
546    for tenant in &snapshot.tenants {
547        let event = WalEvent::new(
548            sequence,
549            WalEventType::TenantCreated {
550                tenant_id: tenant.tenant_id,
551                name: tenant.name.clone(),
552                timestamp: tenant.created_at,
553            },
554        );
555        reducer
556            .apply(&event)
557            .map_err(|err| RecoveryBootstrapError::SnapshotBootstrap(err.to_string()))?;
558        snapshot_events_applied = snapshot_events_applied.saturating_add(1);
559        sequence = sequence.saturating_add(1);
560    }
561
562    for ra in &snapshot.role_assignments {
563        let event = WalEvent::new(
564            sequence,
565            WalEventType::RoleAssigned {
566                actor_id: ra.actor_id,
567                role: ra.role.clone(),
568                tenant_id: ra.tenant_id,
569                timestamp: ra.assigned_at,
570            },
571        );
572        reducer
573            .apply(&event)
574            .map_err(|err| RecoveryBootstrapError::SnapshotBootstrap(err.to_string()))?;
575        snapshot_events_applied = snapshot_events_applied.saturating_add(1);
576        sequence = sequence.saturating_add(1);
577    }
578
579    for cg in &snapshot.capability_grants {
580        let event = WalEvent::new(
581            sequence,
582            WalEventType::CapabilityGranted {
583                actor_id: cg.actor_id,
584                capability: cg.capability.clone(),
585                tenant_id: cg.tenant_id,
586                timestamp: cg.granted_at,
587            },
588        );
589        reducer
590            .apply(&event)
591            .map_err(|err| RecoveryBootstrapError::SnapshotBootstrap(err.to_string()))?;
592        snapshot_events_applied = snapshot_events_applied.saturating_add(1);
593        sequence = sequence.saturating_add(1);
594
595        if let Some(revoked_at) = cg.revoked_at {
596            let event = WalEvent::new(
597                sequence,
598                WalEventType::CapabilityRevoked {
599                    actor_id: cg.actor_id,
600                    capability: cg.capability.clone(),
601                    tenant_id: cg.tenant_id,
602                    timestamp: revoked_at,
603                },
604            );
605            reducer
606                .apply(&event)
607                .map_err(|err| RecoveryBootstrapError::SnapshotBootstrap(err.to_string()))?;
608            snapshot_events_applied = snapshot_events_applied.saturating_add(1);
609            sequence = sequence.saturating_add(1);
610        }
611    }
612
613    for le in &snapshot.ledger_entries {
614        let event = WalEvent::new(
615            sequence,
616            WalEventType::LedgerEntryAppended {
617                entry_id: le.entry_id,
618                tenant_id: le.tenant_id,
619                ledger_key: le.ledger_key.clone(),
620                actor_id: le.actor_id,
621                payload: le.payload.clone(),
622                timestamp: le.timestamp,
623            },
624        );
625        reducer
626            .apply(&event)
627            .map_err(|err| RecoveryBootstrapError::SnapshotBootstrap(err.to_string()))?;
628        snapshot_events_applied = snapshot_events_applied.saturating_add(1);
629        sequence = sequence.saturating_add(1);
630    }
631
632    // Note: `sequence` counts synthesized events (tasks, deps, engine, runs, state
633    // transitions), which is intentionally less than `snapshot.metadata.wal_sequence`
634    // because attempt events (AttemptStart, AttemptFinished) and lease events
635    // (LeaseAcquire, LeaseRelease, LeaseExpire, LeaseHeartbeat) are seeded directly
636    // via dedicated bootstrap methods rather than synthesized as WAL events.
637    // The snapshot's structural integrity is validated by `validate_snapshot()` before
638    // we reach this point.
639    let _ = sequence; // consumed by synthesis loop
640
641    reducer.set_latest_sequence_for_bootstrap(snapshot.metadata.wal_sequence);
642
643    Ok((reducer, snapshot_events_applied))
644}
645
646#[cfg(test)]
647mod tests {
648    use std::fs;
649    use std::io::Write;
650    use std::path::Path;
651    use std::sync::atomic::{AtomicUsize, Ordering};
652
653    use actionqueue_core::ids::{AttemptId, RunId, TaskId};
654    use actionqueue_core::run::state::RunState;
655    use actionqueue_core::task::constraints::TaskConstraints;
656    use actionqueue_core::task::metadata::TaskMetadata;
657    use actionqueue_core::task::run_policy::RunPolicy;
658    use actionqueue_core::task::task_spec::{TaskPayload, TaskSpec};
659
660    use super::*;
661    use crate::snapshot::mapping::SNAPSHOT_SCHEMA_VERSION;
662    use crate::snapshot::model::{
663        SnapshotAttemptHistoryEntry, SnapshotEngineControl, SnapshotLeaseMetadata,
664        SnapshotMetadata, SnapshotRun, SnapshotRunStateHistoryEntry, SnapshotTask,
665    };
666    use crate::snapshot::writer::{SnapshotFsWriter, SnapshotWriter};
667    use crate::wal::codec;
668
669    static TEST_COUNTER: AtomicUsize = AtomicUsize::new(0);
670
671    fn temp_data_root() -> PathBuf {
672        let dir = std::env::temp_dir();
673        let count = TEST_COUNTER.fetch_add(1, Ordering::SeqCst);
674        let path = dir.join(format!(
675            "actionqueue_recovery_bootstrap_test_{}_{}",
676            std::process::id(),
677            count
678        ));
679        let _ = fs::remove_dir_all(&path);
680        path
681    }
682
683    /// Test helper: builds a SnapshotTask with standard test defaults.
684    /// When new fields are added to SnapshotTask, only this function needs updating.
685    fn test_snapshot_task(task_spec: TaskSpec) -> SnapshotTask {
686        SnapshotTask { task_spec, created_at: 0, updated_at: None, canceled_at: None }
687    }
688
689    /// Test helper: builds a SnapshotAttemptHistoryEntry with standard defaults.
690    fn test_attempt_entry(
691        attempt_id: AttemptId,
692        started_at: u64,
693        finished_at: Option<u64>,
694    ) -> SnapshotAttemptHistoryEntry {
695        SnapshotAttemptHistoryEntry {
696            attempt_id,
697            started_at,
698            finished_at,
699            result: None,
700            error: None,
701            output: None,
702        }
703    }
704
705    fn create_task_spec(payload: &[u8]) -> TaskSpec {
706        TaskSpec::new(
707            TaskId::new(),
708            TaskPayload::with_content_type(payload.to_vec(), "application/octet-stream"),
709            RunPolicy::Once,
710            TaskConstraints::default(),
711            TaskMetadata::default(),
712        )
713        .expect("task spec should be valid")
714    }
715
716    fn create_snapshot(task: &TaskSpec, run_state: RunState, wal_sequence: u64) -> Snapshot {
717        let run_instance = RunInstance::new_scheduled_with_id(RunId::new(), task.id(), 10, 10)
718            .expect("scheduled run should be valid");
719
720        let run_instance = if run_state == RunState::Scheduled {
721            run_instance
722        } else {
723            let mut run_instance = run_instance;
724            run_instance.transition_to(run_state).expect("transition should be valid for test");
725            run_instance
726        };
727
728        let state_history = if run_state == RunState::Scheduled {
729            vec![SnapshotRunStateHistoryEntry {
730                from: None,
731                to: RunState::Scheduled,
732                timestamp: 10,
733            }]
734        } else {
735            vec![
736                SnapshotRunStateHistoryEntry { from: None, to: RunState::Scheduled, timestamp: 10 },
737                SnapshotRunStateHistoryEntry {
738                    from: Some(RunState::Scheduled),
739                    to: run_state,
740                    timestamp: 11,
741                },
742            ]
743        };
744
745        Snapshot {
746            version: 4,
747            timestamp: 1234,
748            metadata: SnapshotMetadata {
749                schema_version: SNAPSHOT_SCHEMA_VERSION,
750                wal_sequence,
751                task_count: 1,
752                run_count: 1,
753            },
754            tasks: vec![test_snapshot_task(task.clone())],
755            runs: vec![SnapshotRun {
756                run_instance,
757                state_history,
758                attempts: Vec::new(),
759                lease: None,
760            }],
761            engine: SnapshotEngineControl::default(),
762            dependency_declarations: Vec::new(),
763            budgets: Vec::new(),
764            subscriptions: Vec::new(),
765            actors: Vec::new(),
766            tenants: Vec::new(),
767            role_assignments: Vec::new(),
768            capability_grants: Vec::new(),
769            ledger_entries: Vec::new(),
770        }
771    }
772
773    fn write_snapshot(path: &Path, snapshot: &Snapshot) {
774        let mut writer = SnapshotFsWriter::new(path.to_path_buf())
775            .expect("snapshot writer should open for bootstrap test");
776        writer.write(snapshot).expect("snapshot write should succeed");
777        writer.flush().expect("snapshot flush should succeed");
778        writer.close().expect("snapshot close should succeed");
779    }
780
781    fn write_wal_events(path: &Path, events: &[WalEvent]) {
782        let mut file = std::fs::OpenOptions::new()
783            .create(true)
784            .append(true)
785            .open(path)
786            .expect("wal file open should succeed");
787        for event in events {
788            let bytes = codec::encode(event).expect("encode should succeed");
789            file.write_all(&bytes).expect("wal write should succeed");
790        }
791        file.sync_all().expect("wal sync should succeed");
792    }
793
794    #[test]
795    fn test_bootstrap_empty_storage() {
796        let data_root = temp_data_root();
797        let result = load_projection_from_storage(&data_root)
798            .expect("bootstrap should succeed for empty storage");
799
800        assert!(!result.snapshot_loaded);
801        assert_eq!(result.snapshot_sequence, 0);
802        assert_eq!(result.projection.task_count(), 0);
803        assert_eq!(result.projection.run_count(), 0);
804        assert_eq!(result.projection.latest_sequence(), 0);
805        assert_eq!(result.wal_append_telemetry.snapshot().append_success_total, 0);
806        assert_eq!(result.wal_append_telemetry.snapshot().append_failure_total, 0);
807        assert_eq!(result.recovery_observations.events_applied_total, 0);
808        assert_eq!(result.recovery_observations.snapshot_events_applied, 0);
809        assert_eq!(result.recovery_observations.wal_replay_events_applied, 0);
810        assert!(result.recovery_observations.recovery_duration_seconds >= 0.0);
811
812        let _ = fs::remove_dir_all(data_root);
813    }
814
815    #[test]
816    fn test_bootstrap_snapshot_only_with_exact_sequence() {
817        let data_root = temp_data_root();
818        let task = create_task_spec(&[1, 2, 3]);
819        let snapshot = create_snapshot(&task, RunState::Scheduled, 3);
820
821        let snapshot_path = data_root.join("snapshots").join("snapshot.bin");
822        std::fs::create_dir_all(snapshot_path.parent().expect("snapshot parent"))
823            .expect("snapshot dir should create");
824        write_snapshot(&snapshot_path, &snapshot);
825
826        let result = load_projection_from_storage(&data_root)
827            .expect("bootstrap should succeed for snapshot-only storage");
828
829        assert!(result.snapshot_loaded);
830        assert_eq!(result.snapshot_sequence, 3);
831        assert_eq!(result.projection.task_count(), 1);
832        assert_eq!(result.projection.run_count(), 1);
833        assert_eq!(result.projection.latest_sequence(), 3);
834        assert_eq!(result.recovery_observations.snapshot_events_applied, 2);
835        assert_eq!(result.recovery_observations.wal_replay_events_applied, 0);
836        assert_eq!(result.recovery_observations.events_applied_total, 2);
837        assert!(result.recovery_observations.recovery_duration_seconds >= 0.0);
838
839        let _ = fs::remove_dir_all(data_root);
840    }
841
842    #[test]
843    fn test_bootstrap_snapshot_with_wal_tail() {
844        let data_root = temp_data_root();
845        let task = create_task_spec(&[4, 5, 6]);
846        let snapshot = create_snapshot(&task, RunState::Scheduled, 3);
847
848        let snapshot_path = data_root.join("snapshots").join("snapshot.bin");
849        std::fs::create_dir_all(snapshot_path.parent().expect("snapshot parent"))
850            .expect("snapshot dir should create");
851        write_snapshot(&snapshot_path, &snapshot);
852
853        let wal_path = data_root.join("wal").join("actionqueue.wal");
854        std::fs::create_dir_all(wal_path.parent().expect("wal parent"))
855            .expect("wal dir should create");
856
857        let extra_task = create_task_spec(&[7, 8, 9]);
858        let wal_events = vec![WalEvent::new(
859            4,
860            WalEventType::TaskCreated { task_spec: extra_task.clone(), timestamp: 0 },
861        )];
862        write_wal_events(&wal_path, &wal_events);
863
864        let result = load_projection_from_storage(&data_root)
865            .expect("bootstrap should succeed for snapshot + WAL tail");
866
867        assert!(result.snapshot_loaded);
868        assert_eq!(result.snapshot_sequence, 3);
869        assert_eq!(result.projection.task_count(), 2);
870        assert_eq!(result.projection.latest_sequence(), 4);
871        assert_eq!(result.recovery_observations.snapshot_events_applied, 2);
872        assert_eq!(result.recovery_observations.wal_replay_events_applied, 1);
873        assert_eq!(result.recovery_observations.events_applied_total, 3);
874        assert!(result.recovery_observations.recovery_duration_seconds >= 0.0);
875
876        let _ = fs::remove_dir_all(data_root);
877    }
878
879    #[test]
880    fn test_bootstrap_snapshot_with_high_wal_sequence_succeeds() {
881        let data_root = temp_data_root();
882        let task = create_task_spec(&[10, 11, 12]);
883        // wal_sequence=10 is higher than synthesized event count (2 events for
884        // 1 task + 1 run), reflecting real snapshots where attempt/lease events
885        // in the WAL are seeded directly rather than synthesized.
886        let snapshot = create_snapshot(&task, RunState::Scheduled, 10);
887
888        let snapshot_path = data_root.join("snapshots").join("snapshot.bin");
889        std::fs::create_dir_all(snapshot_path.parent().expect("snapshot parent"))
890            .expect("snapshot dir should create");
891        write_snapshot(&snapshot_path, &snapshot);
892
893        let result = load_projection_from_storage(&data_root);
894        assert!(result.is_ok(), "snapshot with high wal_sequence should bootstrap successfully");
895        let recovery = result.expect("recovery should succeed");
896        assert!(recovery.snapshot_loaded, "snapshot should be loaded despite high wal_sequence");
897
898        let _ = fs::remove_dir_all(data_root);
899    }
900
901    #[test]
902    fn p6_011_t_p3_bootstrap_hydrates_task_canceled_projection_from_snapshot() {
903        let data_root = temp_data_root();
904        let task = create_task_spec(&[1, 9, 9]);
905        let canceled_at = 77;
906        let snapshot = Snapshot {
907            version: 4,
908            timestamp: 1234,
909            metadata: SnapshotMetadata {
910                schema_version: SNAPSHOT_SCHEMA_VERSION,
911                wal_sequence: 2,
912                task_count: 1,
913                run_count: 0,
914            },
915            tasks: vec![SnapshotTask {
916                canceled_at: Some(canceled_at),
917                ..test_snapshot_task(task.clone())
918            }],
919            runs: Vec::new(),
920            engine: SnapshotEngineControl::default(),
921            dependency_declarations: Vec::new(),
922            budgets: Vec::new(),
923            subscriptions: Vec::new(),
924            actors: Vec::new(),
925            tenants: Vec::new(),
926            role_assignments: Vec::new(),
927            capability_grants: Vec::new(),
928            ledger_entries: Vec::new(),
929        };
930
931        let snapshot_path = data_root.join("snapshots").join("snapshot.bin");
932        std::fs::create_dir_all(snapshot_path.parent().expect("snapshot parent"))
933            .expect("snapshot dir should create");
934        write_snapshot(&snapshot_path, &snapshot);
935
936        let result = load_projection_from_storage(&data_root)
937            .expect("bootstrap should succeed for canceled task");
938
939        assert!(result.snapshot_loaded);
940        assert_eq!(result.snapshot_sequence, 2);
941        assert!(result.projection.is_task_canceled(task.id()));
942        assert_eq!(result.projection.task_canceled_at(task.id()), Some(canceled_at));
943        assert_eq!(result.projection.latest_sequence(), 2);
944        assert_eq!(result.recovery_observations.snapshot_events_applied, 2);
945        assert_eq!(result.recovery_observations.wal_replay_events_applied, 0);
946        assert_eq!(result.recovery_observations.events_applied_total, 2);
947
948        let _ = fs::remove_dir_all(data_root);
949    }
950
951    #[test]
952    fn p6_013_t_p3_bootstrap_hydrates_engine_paused_projection_from_snapshot() {
953        let data_root = temp_data_root();
954        let task = create_task_spec(&[2, 2, 2]);
955        let paused_at = 120;
956        let snapshot = Snapshot {
957            version: 4,
958            timestamp: 1234,
959            metadata: SnapshotMetadata {
960                schema_version: SNAPSHOT_SCHEMA_VERSION,
961                wal_sequence: 2,
962                task_count: 1,
963                run_count: 0,
964            },
965            tasks: vec![test_snapshot_task(task)],
966            runs: Vec::new(),
967            engine: SnapshotEngineControl {
968                paused: true,
969                paused_at: Some(paused_at),
970                resumed_at: None,
971            },
972            dependency_declarations: Vec::new(),
973            budgets: Vec::new(),
974            subscriptions: Vec::new(),
975            actors: Vec::new(),
976            tenants: Vec::new(),
977            role_assignments: Vec::new(),
978            capability_grants: Vec::new(),
979            ledger_entries: Vec::new(),
980        };
981
982        let snapshot_path = data_root.join("snapshots").join("snapshot.bin");
983        std::fs::create_dir_all(snapshot_path.parent().expect("snapshot parent"))
984            .expect("snapshot dir should create");
985        write_snapshot(&snapshot_path, &snapshot);
986
987        let result = load_projection_from_storage(&data_root)
988            .expect("bootstrap should succeed for engine paused snapshot");
989
990        assert!(result.snapshot_loaded);
991        assert_eq!(result.snapshot_sequence, 2);
992        assert!(result.projection.is_engine_paused());
993        assert_eq!(result.projection.engine_paused_at(), Some(paused_at));
994        assert_eq!(result.projection.engine_resumed_at(), None);
995        assert_eq!(result.projection.latest_sequence(), 2);
996        assert_eq!(result.recovery_observations.snapshot_events_applied, 2);
997        assert_eq!(result.recovery_observations.wal_replay_events_applied, 0);
998        assert_eq!(result.recovery_observations.events_applied_total, 2);
999
1000        let _ = fs::remove_dir_all(data_root);
1001    }
1002
1003    #[test]
1004    fn p6_013_t_p4_bootstrap_hydrates_engine_resumed_projection_from_snapshot() {
1005        let data_root = temp_data_root();
1006        let task = create_task_spec(&[2, 2, 3]);
1007        let paused_at = 120;
1008        let resumed_at = 180;
1009        let snapshot = Snapshot {
1010            version: 4,
1011            timestamp: 1234,
1012            metadata: SnapshotMetadata {
1013                schema_version: SNAPSHOT_SCHEMA_VERSION,
1014                wal_sequence: 3,
1015                task_count: 1,
1016                run_count: 0,
1017            },
1018            tasks: vec![test_snapshot_task(task)],
1019            runs: Vec::new(),
1020            engine: SnapshotEngineControl {
1021                paused: false,
1022                paused_at: Some(paused_at),
1023                resumed_at: Some(resumed_at),
1024            },
1025            dependency_declarations: Vec::new(),
1026            budgets: Vec::new(),
1027            subscriptions: Vec::new(),
1028            actors: Vec::new(),
1029            tenants: Vec::new(),
1030            role_assignments: Vec::new(),
1031            capability_grants: Vec::new(),
1032            ledger_entries: Vec::new(),
1033        };
1034
1035        let snapshot_path = data_root.join("snapshots").join("snapshot.bin");
1036        std::fs::create_dir_all(snapshot_path.parent().expect("snapshot parent"))
1037            .expect("snapshot dir should create");
1038        write_snapshot(&snapshot_path, &snapshot);
1039
1040        let result = load_projection_from_storage(&data_root)
1041            .expect("bootstrap should succeed for engine resumed snapshot");
1042
1043        assert!(result.snapshot_loaded);
1044        assert_eq!(result.snapshot_sequence, 3);
1045        assert!(!result.projection.is_engine_paused());
1046        assert_eq!(result.projection.engine_paused_at(), Some(paused_at));
1047        assert_eq!(result.projection.engine_resumed_at(), Some(resumed_at));
1048        assert_eq!(result.projection.latest_sequence(), 3);
1049        assert_eq!(result.recovery_observations.snapshot_events_applied, 3);
1050        assert_eq!(result.recovery_observations.wal_replay_events_applied, 0);
1051        assert_eq!(result.recovery_observations.events_applied_total, 3);
1052
1053        let _ = fs::remove_dir_all(data_root);
1054    }
1055
1056    #[test]
1057    fn test_bootstrap_wal_only_reports_wal_replay_events() {
1058        let data_root = temp_data_root();
1059        let wal_path = data_root.join("wal").join("actionqueue.wal");
1060        std::fs::create_dir_all(wal_path.parent().expect("wal parent"))
1061            .expect("wal dir should create");
1062
1063        let task = create_task_spec(&[7, 7, 7]);
1064        let wal_events =
1065            vec![WalEvent::new(1, WalEventType::TaskCreated { task_spec: task, timestamp: 0 })];
1066        write_wal_events(&wal_path, &wal_events);
1067
1068        let result = load_projection_from_storage(&data_root)
1069            .expect("bootstrap should succeed for WAL-only storage");
1070
1071        assert!(!result.snapshot_loaded);
1072        assert_eq!(result.recovery_observations.snapshot_events_applied, 0);
1073        assert_eq!(result.recovery_observations.wal_replay_events_applied, 1);
1074        assert_eq!(result.recovery_observations.events_applied_total, 1);
1075        assert!(result.recovery_observations.recovery_duration_seconds >= 0.0);
1076
1077        let _ = fs::remove_dir_all(data_root);
1078    }
1079
1080    #[test]
1081    fn test_snapshot_sequence_exceeds_wal_max_sequence() {
1082        let data_root = temp_data_root();
1083        let task = create_task_spec(&[1, 2, 3]);
1084        // wal_sequence=3 is valid for a snapshot with 1 task + 1 run (2 bootstrap events)
1085        // but WAL max will be only 1
1086        let snapshot = create_snapshot(&task, RunState::Scheduled, 3);
1087
1088        let snapshot_path = data_root.join("snapshots").join("snapshot.bin");
1089        std::fs::create_dir_all(snapshot_path.parent().expect("snapshot parent"))
1090            .expect("snapshot dir should create");
1091        write_snapshot(&snapshot_path, &snapshot);
1092
1093        // Create WAL with max sequence = 1 (less than snapshot's 3)
1094        let wal_path = data_root.join("wal").join("actionqueue.wal");
1095        std::fs::create_dir_all(wal_path.parent().expect("wal parent"))
1096            .expect("wal dir should create");
1097        let wal_task = create_task_spec(&[4, 5, 6]);
1098        let wal_events =
1099            vec![WalEvent::new(1, WalEventType::TaskCreated { task_spec: wal_task, timestamp: 0 })];
1100        write_wal_events(&wal_path, &wal_events);
1101
1102        let result = load_projection_from_storage(&data_root);
1103        assert!(matches!(
1104            result,
1105            Err(RecoveryBootstrapError::SnapshotBootstrap(msg))
1106                if msg.contains("snapshot sequence 3 exceeds WAL max sequence 1")
1107        ));
1108
1109        let _ = fs::remove_dir_all(data_root);
1110    }
1111
1112    #[test]
1113    fn test_snapshot_sequence_within_wal_max_succeeds() {
1114        let data_root = temp_data_root();
1115        let task = create_task_spec(&[1, 2, 3]);
1116        let snapshot = create_snapshot(&task, RunState::Scheduled, 3);
1117
1118        let snapshot_path = data_root.join("snapshots").join("snapshot.bin");
1119        std::fs::create_dir_all(snapshot_path.parent().expect("snapshot parent"))
1120            .expect("snapshot dir should create");
1121        write_snapshot(&snapshot_path, &snapshot);
1122
1123        // Create WAL with max sequence = 4 (greater than snapshot's 3)
1124        let wal_path = data_root.join("wal").join("actionqueue.wal");
1125        std::fs::create_dir_all(wal_path.parent().expect("wal parent"))
1126            .expect("wal dir should create");
1127        let extra_task = create_task_spec(&[7, 8, 9]);
1128        let wal_events = vec![WalEvent::new(
1129            4,
1130            WalEventType::TaskCreated { task_spec: extra_task, timestamp: 0 },
1131        )];
1132        write_wal_events(&wal_path, &wal_events);
1133
1134        let result = load_projection_from_storage(&data_root)
1135            .expect("bootstrap should succeed when snapshot sequence <= WAL max");
1136
1137        assert!(result.snapshot_loaded);
1138        assert_eq!(result.projection.task_count(), 2);
1139
1140        let _ = fs::remove_dir_all(data_root);
1141    }
1142
1143    #[test]
1144    fn test_bootstrap_snapshot_with_active_lease() {
1145        let data_root = temp_data_root();
1146        let task = create_task_spec(&[20, 21, 22]);
1147        let run_id = RunId::new();
1148
1149        // Build a Leased run with active lease data in the snapshot
1150        let mut run_instance = RunInstance::new_scheduled_with_id(run_id, task.id(), 10, 10)
1151            .expect("scheduled run should be valid");
1152        run_instance.transition_to(RunState::Ready).expect("transition to Ready");
1153        run_instance.transition_to(RunState::Leased).expect("transition to Leased");
1154
1155        let state_history = vec![
1156            SnapshotRunStateHistoryEntry { from: None, to: RunState::Scheduled, timestamp: 10 },
1157            SnapshotRunStateHistoryEntry {
1158                from: Some(RunState::Scheduled),
1159                to: RunState::Ready,
1160                timestamp: 11,
1161            },
1162            SnapshotRunStateHistoryEntry {
1163                from: Some(RunState::Ready),
1164                to: RunState::Leased,
1165                timestamp: 12,
1166            },
1167        ];
1168
1169        let lease = Some(SnapshotLeaseMetadata {
1170            owner: "worker-1".to_string(),
1171            expiry: 1000,
1172            acquired_at: 12,
1173            updated_at: 12,
1174        });
1175
1176        let snapshot = Snapshot {
1177            version: 4,
1178            timestamp: 1234,
1179            metadata: SnapshotMetadata {
1180                schema_version: SNAPSHOT_SCHEMA_VERSION,
1181                wal_sequence: 5,
1182                task_count: 1,
1183                run_count: 1,
1184            },
1185            tasks: vec![test_snapshot_task(task.clone())],
1186            runs: vec![SnapshotRun { run_instance, state_history, attempts: Vec::new(), lease }],
1187            engine: SnapshotEngineControl::default(),
1188            dependency_declarations: Vec::new(),
1189            budgets: Vec::new(),
1190            subscriptions: Vec::new(),
1191            actors: Vec::new(),
1192            tenants: Vec::new(),
1193            role_assignments: Vec::new(),
1194            capability_grants: Vec::new(),
1195            ledger_entries: Vec::new(),
1196        };
1197
1198        let snapshot_path = data_root.join("snapshots").join("snapshot.bin");
1199        std::fs::create_dir_all(snapshot_path.parent().expect("snapshot parent"))
1200            .expect("snapshot dir should create");
1201        write_snapshot(&snapshot_path, &snapshot);
1202
1203        let result = load_projection_from_storage(&data_root)
1204            .expect("bootstrap should succeed for snapshot with active lease");
1205
1206        assert!(result.snapshot_loaded);
1207        let lease = result.projection.get_lease(&run_id);
1208        assert!(lease.is_some(), "get_lease() should return active lease after bootstrap");
1209        let (owner, expiry) = lease.unwrap();
1210        assert_eq!(owner, "worker-1");
1211        assert_eq!(*expiry, 1000);
1212
1213        let metadata = result.projection.get_lease_metadata(&run_id);
1214        assert!(metadata.is_some(), "lease metadata should be present");
1215        assert_eq!(metadata.unwrap().owner(), "worker-1");
1216        assert_eq!(metadata.unwrap().acquired_at(), 12);
1217
1218        let _ = fs::remove_dir_all(data_root);
1219    }
1220
1221    #[test]
1222    fn test_bootstrap_snapshot_with_running_attempt() {
1223        let data_root = temp_data_root();
1224        let task = create_task_spec(&[30, 31, 32]);
1225        let run_id = RunId::new();
1226        let attempt_id = AttemptId::new();
1227
1228        // Build a Running run with attempt history.
1229        // We must call start_attempt() so the RunInstance's attempt_count
1230        // matches the snapshot attempt history (validation checks parity).
1231        let mut run_instance = RunInstance::new_scheduled_with_id(run_id, task.id(), 10, 10)
1232            .expect("scheduled run should be valid");
1233        run_instance.transition_to(RunState::Ready).expect("transition to Ready");
1234        run_instance.transition_to(RunState::Leased).expect("transition to Leased");
1235        run_instance.transition_to(RunState::Running).expect("transition to Running");
1236        run_instance.start_attempt(attempt_id).expect("start attempt");
1237
1238        let state_history = vec![
1239            SnapshotRunStateHistoryEntry { from: None, to: RunState::Scheduled, timestamp: 10 },
1240            SnapshotRunStateHistoryEntry {
1241                from: Some(RunState::Scheduled),
1242                to: RunState::Ready,
1243                timestamp: 11,
1244            },
1245            SnapshotRunStateHistoryEntry {
1246                from: Some(RunState::Ready),
1247                to: RunState::Leased,
1248                timestamp: 12,
1249            },
1250            SnapshotRunStateHistoryEntry {
1251                from: Some(RunState::Leased),
1252                to: RunState::Running,
1253                timestamp: 13,
1254            },
1255        ];
1256
1257        let attempts = vec![test_attempt_entry(attempt_id, 13, None)];
1258
1259        let snapshot = Snapshot {
1260            version: 4,
1261            timestamp: 1234,
1262            metadata: SnapshotMetadata {
1263                schema_version: SNAPSHOT_SCHEMA_VERSION,
1264                wal_sequence: 6,
1265                task_count: 1,
1266                run_count: 1,
1267            },
1268            tasks: vec![test_snapshot_task(task.clone())],
1269            runs: vec![SnapshotRun { run_instance, state_history, attempts, lease: None }],
1270            engine: SnapshotEngineControl::default(),
1271            dependency_declarations: Vec::new(),
1272            budgets: Vec::new(),
1273            subscriptions: Vec::new(),
1274            actors: Vec::new(),
1275            tenants: Vec::new(),
1276            role_assignments: Vec::new(),
1277            capability_grants: Vec::new(),
1278            ledger_entries: Vec::new(),
1279        };
1280
1281        let snapshot_path = data_root.join("snapshots").join("snapshot.bin");
1282        std::fs::create_dir_all(snapshot_path.parent().expect("snapshot parent"))
1283            .expect("snapshot dir should create");
1284        write_snapshot(&snapshot_path, &snapshot);
1285
1286        let result = load_projection_from_storage(&data_root)
1287            .expect("bootstrap should succeed for snapshot with running attempt");
1288
1289        assert!(result.snapshot_loaded);
1290        let run = result.projection.get_run_instance(&run_id);
1291        assert!(run.is_some(), "run instance should exist after bootstrap");
1292        let run = run.unwrap();
1293        assert_eq!(run.attempt_count(), 1);
1294        assert_eq!(run.current_attempt_id(), Some(attempt_id));
1295        assert_eq!(run.state(), RunState::Running);
1296
1297        let _ = fs::remove_dir_all(data_root);
1298    }
1299
1300    #[test]
1301    fn test_bootstrap_snapshot_wal_tail_lease_heartbeat() {
1302        let data_root = temp_data_root();
1303        let task = create_task_spec(&[40, 41, 42]);
1304        let run_id = RunId::new();
1305
1306        // Build a Leased run with active lease data
1307        let mut run_instance = RunInstance::new_scheduled_with_id(run_id, task.id(), 10, 10)
1308            .expect("scheduled run should be valid");
1309        run_instance.transition_to(RunState::Ready).expect("transition to Ready");
1310        run_instance.transition_to(RunState::Leased).expect("transition to Leased");
1311
1312        let state_history = vec![
1313            SnapshotRunStateHistoryEntry { from: None, to: RunState::Scheduled, timestamp: 10 },
1314            SnapshotRunStateHistoryEntry {
1315                from: Some(RunState::Scheduled),
1316                to: RunState::Ready,
1317                timestamp: 11,
1318            },
1319            SnapshotRunStateHistoryEntry {
1320                from: Some(RunState::Ready),
1321                to: RunState::Leased,
1322                timestamp: 12,
1323            },
1324        ];
1325
1326        let lease = Some(SnapshotLeaseMetadata {
1327            owner: "worker-1".to_string(),
1328            expiry: 1000,
1329            acquired_at: 12,
1330            updated_at: 12,
1331        });
1332
1333        let snapshot = Snapshot {
1334            version: 4,
1335            timestamp: 1234,
1336            metadata: SnapshotMetadata {
1337                schema_version: SNAPSHOT_SCHEMA_VERSION,
1338                wal_sequence: 5,
1339                task_count: 1,
1340                run_count: 1,
1341            },
1342            tasks: vec![test_snapshot_task(task.clone())],
1343            runs: vec![SnapshotRun { run_instance, state_history, attempts: Vec::new(), lease }],
1344            engine: SnapshotEngineControl::default(),
1345            dependency_declarations: Vec::new(),
1346            budgets: Vec::new(),
1347            subscriptions: Vec::new(),
1348            actors: Vec::new(),
1349            tenants: Vec::new(),
1350            role_assignments: Vec::new(),
1351            capability_grants: Vec::new(),
1352            ledger_entries: Vec::new(),
1353        };
1354
1355        let snapshot_path = data_root.join("snapshots").join("snapshot.bin");
1356        std::fs::create_dir_all(snapshot_path.parent().expect("snapshot parent"))
1357            .expect("snapshot dir should create");
1358        write_snapshot(&snapshot_path, &snapshot);
1359
1360        // Write a WAL tail with a lease heartbeat event (sequence 6)
1361        let wal_path = data_root.join("wal").join("actionqueue.wal");
1362        std::fs::create_dir_all(wal_path.parent().expect("wal parent"))
1363            .expect("wal dir should create");
1364        let wal_events = vec![WalEvent::new(
1365            6,
1366            WalEventType::LeaseHeartbeat {
1367                run_id,
1368                owner: "worker-1".to_string(),
1369                expiry: 2000,
1370                timestamp: 50,
1371            },
1372        )];
1373        write_wal_events(&wal_path, &wal_events);
1374
1375        let result = load_projection_from_storage(&data_root)
1376            .expect("bootstrap with snapshot lease + WAL heartbeat should succeed");
1377
1378        assert!(result.snapshot_loaded);
1379        let lease = result.projection.get_lease(&run_id);
1380        assert!(lease.is_some(), "lease should be present after heartbeat replay");
1381        let (owner, expiry) = lease.unwrap();
1382        assert_eq!(owner, "worker-1");
1383        assert_eq!(*expiry, 2000, "expiry should be updated by heartbeat");
1384
1385        let _ = fs::remove_dir_all(data_root);
1386    }
1387}