1use std::path::PathBuf;
10use std::time::Instant;
11
12use actionqueue_core::run::state::RunState;
13use actionqueue_core::run::transitions::is_valid_transition;
14use actionqueue_core::run::RunInstance;
15
16use crate::recovery::reducer::ReplayReducer;
17use crate::recovery::replay::ReplayDriver;
18use crate::snapshot::loader::{SnapshotFsLoader, SnapshotLoader};
19use crate::snapshot::mapping::{
20 map_snapshot_attempt_history, map_snapshot_lease_metadata, map_snapshot_run_history,
21};
22use crate::snapshot::model::Snapshot;
23use crate::wal::event::{WalEvent, WalEventType};
24use crate::wal::fs_reader::WalFsReader;
25use crate::wal::fs_writer::WalFsWriter;
26use crate::wal::reader::{WalReader, WalReaderError};
27use crate::wal::{InstrumentedWalWriter, WalAppendTelemetry};
28
/// Timing and event-count metrics collected while recovering the projection from storage.
///
/// As filled in by `load_projection_from_storage`, `events_applied_total` is the saturating
/// sum of `snapshot_events_applied` and `wal_replay_events_applied`.
#[derive(Clone, Copy, Debug, PartialEq)]
#[must_use]
pub struct RecoveryObservations {
    /// Elapsed time spent in recovery, in seconds.
    pub recovery_duration_seconds: f64,
    /// Total events applied to the projection (snapshot-synthesised plus WAL replay).
    pub events_applied_total: u64,
    /// Synthetic events applied while hydrating the projection from a snapshot.
    pub snapshot_events_applied: u64,
    /// Events replayed from the WAL (only those after the snapshot, when one was used).
    pub wal_replay_events_applied: u64,
}

impl RecoveryObservations {
    /// Observations with a zero duration and every counter at zero.
    pub const fn zero() -> Self {
        RecoveryObservations {
            wal_replay_events_applied: 0,
            snapshot_events_applied: 0,
            events_applied_total: 0,
            recovery_duration_seconds: 0.0,
        }
    }
}
54
/// Outcome of a successful storage bootstrap: the recovered projection together with the
/// WAL writer, telemetry handle and paths the caller keeps using afterwards.
///
/// Field order is also drop order; keep it stable.
#[must_use]
pub struct RecoveryBootstrap {
    /// Projection rebuilt from the snapshot (when one was used) and the WAL events after it.
    pub projection: ReplayReducer,
    /// WAL writer opened on `wal_path`, wrapped so appends are recorded in telemetry.
    pub wal_writer: InstrumentedWalWriter<WalFsWriter>,
    /// Telemetry handle; a clone of it was handed to `wal_writer`.
    /// NOTE(review): presumably the clone shares counters with this handle — confirm.
    pub wal_append_telemetry: WalAppendTelemetry,
    /// Path of the WAL file (`<data_root>/wal/actionqueue.wal`).
    pub wal_path: PathBuf,
    /// Path of the snapshot file (`<data_root>/snapshots/snapshot.bin`).
    pub snapshot_path: PathBuf,
    /// `true` only when a snapshot was loaded AND successfully used to seed the projection.
    pub snapshot_loaded: bool,
    /// WAL sequence recorded in the snapshot that was used; 0 when no snapshot was used.
    pub snapshot_sequence: u64,
    /// Timing and event-count metrics for this recovery.
    pub recovery_observations: RecoveryObservations,
}
75
/// Failure modes of storage recovery. Every variant carries the stringified underlying error.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RecoveryBootstrapError {
    /// Creating the storage directories or opening the WAL writer failed.
    WalInit(String),
    /// Opening the WAL reader, or seeking within the WAL, failed.
    WalRead(String),
    /// The snapshot could not be loaded. `load_projection_from_storage` currently logs such
    /// failures and falls back to WAL-only replay instead of returning this variant.
    SnapshotLoad(String),
    /// Replaying WAL events into the projection failed.
    WalReplay(String),
    /// Hydrating the projection from a snapshot failed, or the snapshot is inconsistent
    /// with the WAL.
    SnapshotBootstrap(String),
}

impl std::fmt::Display for RecoveryBootstrapError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::WalInit(msg) => write!(f, "WAL init error: {msg}"),
            Self::WalRead(msg) => write!(f, "WAL read error: {msg}"),
            Self::SnapshotLoad(msg) => write!(f, "snapshot load error: {msg}"),
            Self::WalReplay(msg) => write!(f, "WAL replay error: {msg}"),
            Self::SnapshotBootstrap(msg) => write!(f, "snapshot bootstrap error: {msg}"),
        }
    }
}

/// Variants hold plain strings, so there is no underlying `source()` to expose.
impl std::error::Error for RecoveryBootstrapError {}
106
107pub fn load_projection_from_storage(
117 data_root: &std::path::Path,
118) -> Result<RecoveryBootstrap, RecoveryBootstrapError> {
119 let recovery_started_at = Instant::now();
120 tracing::info!(data_dir = %data_root.display(), "storage bootstrap started");
121
122 let wal_dir = data_root.join("wal");
123 let snapshot_dir = data_root.join("snapshots");
124 let wal_path = wal_dir.join("actionqueue.wal");
125 let snapshot_path = snapshot_dir.join("snapshot.bin");
126
127 std::fs::create_dir_all(&wal_dir)
128 .and_then(|_| std::fs::create_dir_all(&snapshot_dir))
129 .map_err(|err| RecoveryBootstrapError::WalInit(err.to_string()))?;
130
131 let wal_writer = WalFsWriter::new(wal_path.clone())
132 .map_err(|err| RecoveryBootstrapError::WalInit(err.to_string()))?;
133 let wal_append_telemetry = WalAppendTelemetry::new();
134 let wal_writer = InstrumentedWalWriter::new(wal_writer, wal_append_telemetry.clone());
135
136 let wal_reader = WalFsReader::new(wal_path.clone())
137 .map_err(|err| RecoveryBootstrapError::WalRead(err.to_string()))?;
138
139 let (snapshot_loaded, snapshot_sequence, reducer, snapshot_events_applied) =
140 match SnapshotFsLoader::new(snapshot_path.clone()).load() {
141 Ok(None) => (false, 0, ReplayReducer::new(), 0),
142 Ok(Some(snapshot)) => {
143 let sequence = snapshot.metadata.wal_sequence;
144 let task_count = snapshot.metadata.task_count;
145 tracing::info!(wal_sequence = sequence, task_count, "snapshot loaded successfully");
146 match bootstrap_reducer_from_snapshot(&snapshot) {
147 Ok((reducer, snapshot_events_applied)) => {
148 (true, sequence, reducer, snapshot_events_applied)
149 }
150 Err(err) => {
151 tracing::warn!(
152 error = %err,
153 "snapshot bootstrap failed, falling back to WAL-only replay"
154 );
155 (false, 0, ReplayReducer::new(), 0)
156 }
157 }
158 }
159 Err(err) => {
160 tracing::warn!(
163 error = %err,
164 "snapshot load failed, falling back to WAL-only replay"
165 );
166 (false, 0, ReplayReducer::new(), 0)
167 }
168 };
169
170 let wal_max_sequence = wal_writer.inner().current_sequence();
172 if snapshot_loaded && snapshot_sequence > wal_max_sequence {
173 if wal_max_sequence == 0 {
174 tracing::info!(
175 snapshot_sequence,
176 "bootstrapping from snapshot with empty WAL — WAL events prior to snapshot are \
177 not recoverable"
178 );
179 } else {
180 return Err(RecoveryBootstrapError::SnapshotBootstrap(format!(
181 "snapshot sequence {snapshot_sequence} exceeds WAL max sequence {wal_max_sequence}"
182 )));
183 }
184 }
185
186 let mut wal_reader = wal_reader;
187
188 if snapshot_sequence > 0 {
189 match wal_reader.seek_to_sequence(snapshot_sequence + 1) {
190 Ok(()) => {}
191 Err(WalReaderError::EndOfWal) => {
192 let recovery_duration_seconds = recovery_started_at.elapsed().as_secs_f64();
193 let wal_replay_events_applied = 0;
194 let events_applied_total =
195 snapshot_events_applied.saturating_add(wal_replay_events_applied);
196 return Ok(RecoveryBootstrap {
197 projection: reducer,
198 wal_writer,
199 wal_append_telemetry,
200 wal_path,
201 snapshot_path,
202 snapshot_loaded,
203 snapshot_sequence,
204 recovery_observations: RecoveryObservations {
205 recovery_duration_seconds,
206 events_applied_total,
207 snapshot_events_applied,
208 wal_replay_events_applied,
209 },
210 });
211 }
212 Err(err) => return Err(RecoveryBootstrapError::WalRead(err.to_string())),
213 }
214 }
215
216 let mut driver = ReplayDriver::new(wal_reader, reducer);
217 let wal_replay_events_applied = driver
218 .run_with_applied_count()
219 .map_err(|err| RecoveryBootstrapError::WalReplay(err.to_string()))?;
220 tracing::info!(event_count = wal_replay_events_applied, "WAL replay complete");
221 let reducer = driver.into_reducer();
222 let recovery_duration_seconds = recovery_started_at.elapsed().as_secs_f64();
223 let events_applied_total = snapshot_events_applied.saturating_add(wal_replay_events_applied);
224
225 tracing::info!(
226 events_applied_total,
227 snapshot_loaded,
228 snapshot_events_applied,
229 wal_replay_events_applied,
230 recovery_duration_seconds,
231 "recovery bootstrap complete"
232 );
233
234 Ok(RecoveryBootstrap {
235 projection: reducer,
236 wal_writer,
237 wal_append_telemetry,
238 wal_path,
239 snapshot_path,
240 snapshot_loaded,
241 snapshot_sequence,
242 recovery_observations: RecoveryObservations {
243 recovery_duration_seconds,
244 events_applied_total,
245 snapshot_events_applied,
246 wal_replay_events_applied,
247 },
248 })
249}
250
251fn bootstrap_reducer_from_snapshot(
252 snapshot: &Snapshot,
253) -> Result<(ReplayReducer, u64), RecoveryBootstrapError> {
254 let mut reducer = ReplayReducer::new();
255 let mut sequence = 1u64;
256 let mut snapshot_events_applied = 0u64;
257
258 for task in &snapshot.tasks {
259 let event = WalEvent::new(
260 sequence,
261 WalEventType::TaskCreated {
262 task_spec: task.task_spec.clone(),
263 timestamp: task.created_at,
264 },
265 );
266 reducer
267 .apply(&event)
268 .map_err(|err| RecoveryBootstrapError::SnapshotBootstrap(err.to_string()))?;
269 snapshot_events_applied = snapshot_events_applied.saturating_add(1);
270 sequence = sequence.saturating_add(1);
271
272 if let Some(canceled_at) = task.canceled_at {
273 let event = WalEvent::new(
274 sequence,
275 WalEventType::TaskCanceled { task_id: task.task_spec.id(), timestamp: canceled_at },
276 );
277 reducer
278 .apply(&event)
279 .map_err(|err| RecoveryBootstrapError::SnapshotBootstrap(err.to_string()))?;
280 snapshot_events_applied = snapshot_events_applied.saturating_add(1);
281 sequence = sequence.saturating_add(1);
282 }
283 }
284
285 for decl in &snapshot.dependency_declarations {
287 let event = WalEvent::new(
288 sequence,
289 WalEventType::DependencyDeclared {
290 task_id: decl.task_id,
291 depends_on: decl.depends_on.clone(),
292 timestamp: decl.declared_at,
293 },
294 );
295 reducer
296 .apply(&event)
297 .map_err(|err| RecoveryBootstrapError::SnapshotBootstrap(err.to_string()))?;
298 snapshot_events_applied = snapshot_events_applied.saturating_add(1);
299 sequence = sequence.saturating_add(1);
300 }
301
302 if let Some(paused_at) = snapshot.engine.paused_at {
303 let event = WalEvent::new(sequence, WalEventType::EnginePaused { timestamp: paused_at });
304 reducer
305 .apply(&event)
306 .map_err(|err| RecoveryBootstrapError::SnapshotBootstrap(err.to_string()))?;
307 snapshot_events_applied = snapshot_events_applied.saturating_add(1);
308 sequence = sequence.saturating_add(1);
309 }
310
311 if let Some(resumed_at) = snapshot.engine.resumed_at {
312 let event = WalEvent::new(sequence, WalEventType::EngineResumed { timestamp: resumed_at });
313 reducer
314 .apply(&event)
315 .map_err(|err| RecoveryBootstrapError::SnapshotBootstrap(err.to_string()))?;
316 snapshot_events_applied = snapshot_events_applied.saturating_add(1);
317 sequence = sequence.saturating_add(1);
318 }
319
320 for run in &snapshot.runs {
321 let run_instance = run.run_instance.clone();
322 let scheduled_run = RunInstance::new_scheduled_with_id(
323 run_instance.id(),
324 run_instance.task_id(),
325 run_instance.scheduled_at(),
326 run_instance.created_at(),
327 )
328 .map_err(|err| RecoveryBootstrapError::SnapshotBootstrap(err.to_string()))?;
329
330 let event =
331 WalEvent::new(sequence, WalEventType::RunCreated { run_instance: scheduled_run });
332 reducer
333 .apply(&event)
334 .map_err(|err| RecoveryBootstrapError::SnapshotBootstrap(err.to_string()))?;
335 snapshot_events_applied = snapshot_events_applied.saturating_add(1);
336 sequence = sequence.saturating_add(1);
337
338 let snapshot_state = run_instance.state();
339 let mut previous_state = RunState::Scheduled;
340
341 for entry in run.state_history.iter().skip(1) {
342 if entry.from != Some(previous_state) {
343 return Err(RecoveryBootstrapError::SnapshotBootstrap(
344 "invalid snapshot state history for bootstrap".to_string(),
345 ));
346 }
347 if !is_valid_transition(previous_state, entry.to) {
348 return Err(RecoveryBootstrapError::SnapshotBootstrap(
349 "invalid snapshot state transition for bootstrap".to_string(),
350 ));
351 }
352
353 let event = WalEvent::new(
354 sequence,
355 WalEventType::RunStateChanged {
356 run_id: run_instance.id(),
357 previous_state,
358 new_state: entry.to,
359 timestamp: entry.timestamp,
360 },
361 );
362 reducer
363 .apply(&event)
364 .map_err(|err| RecoveryBootstrapError::SnapshotBootstrap(err.to_string()))?;
365 snapshot_events_applied = snapshot_events_applied.saturating_add(1);
366 sequence = sequence.saturating_add(1);
367 previous_state = entry.to;
368 }
369
370 if previous_state != snapshot_state {
371 return Err(RecoveryBootstrapError::SnapshotBootstrap(
372 "snapshot state history does not match run state".to_string(),
373 ));
374 }
375
376 let attempt_count = run.attempts.len() as u32;
380 let active_attempt = if snapshot_state == RunState::Running {
381 run.attempts.last().and_then(|a| {
384 if a.finished_at.is_none() {
385 Some(a.attempt_id)
386 } else {
387 None
388 }
389 })
390 } else {
391 None
392 };
393 if let Some(run_inst) = reducer.get_run_instance_mut(run_instance.id()) {
394 run_inst.restore_attempt_state_for_bootstrap(attempt_count, active_attempt);
395 }
396
397 reducer.set_run_history(
398 run_instance.id(),
399 map_snapshot_run_history(run.state_history.clone()),
400 );
401 reducer.set_attempt_history(
402 run_instance.id(),
403 map_snapshot_attempt_history(run.attempts.clone()),
404 );
405 let lease_metadata = map_snapshot_lease_metadata(run.lease.clone());
406 if let Some(metadata) = lease_metadata {
407 reducer.set_lease_for_bootstrap(run_instance.id(), metadata);
408 }
409 }
410
411 for budget in &snapshot.budgets {
413 let event = WalEvent::new(
414 sequence,
415 WalEventType::BudgetAllocated {
416 task_id: budget.task_id,
417 dimension: budget.dimension,
418 limit: budget.limit,
419 timestamp: budget.allocated_at,
420 },
421 );
422 reducer
423 .apply(&event)
424 .map_err(|err| RecoveryBootstrapError::SnapshotBootstrap(err.to_string()))?;
425 snapshot_events_applied = snapshot_events_applied.saturating_add(1);
426 sequence = sequence.saturating_add(1);
427
428 if budget.consumed > 0 {
429 let event = WalEvent::new(
430 sequence,
431 WalEventType::BudgetConsumed {
432 task_id: budget.task_id,
433 dimension: budget.dimension,
434 amount: budget.consumed,
435 timestamp: budget.allocated_at,
436 },
437 );
438 reducer
439 .apply(&event)
440 .map_err(|err| RecoveryBootstrapError::SnapshotBootstrap(err.to_string()))?;
441 snapshot_events_applied = snapshot_events_applied.saturating_add(1);
442 sequence = sequence.saturating_add(1);
443 }
444
445 if budget.exhausted && budget.consumed < budget.limit {
446 let event = WalEvent::new(
447 sequence,
448 WalEventType::BudgetExhausted {
449 task_id: budget.task_id,
450 dimension: budget.dimension,
451 timestamp: budget.allocated_at,
452 },
453 );
454 reducer
455 .apply(&event)
456 .map_err(|err| RecoveryBootstrapError::SnapshotBootstrap(err.to_string()))?;
457 snapshot_events_applied = snapshot_events_applied.saturating_add(1);
458 sequence = sequence.saturating_add(1);
459 }
460 }
461
462 for sub in &snapshot.subscriptions {
464 let event = WalEvent::new(
465 sequence,
466 WalEventType::SubscriptionCreated {
467 subscription_id: sub.subscription_id,
468 task_id: sub.task_id,
469 filter: sub.filter.clone(),
470 timestamp: sub.created_at,
471 },
472 );
473 reducer
474 .apply(&event)
475 .map_err(|err| RecoveryBootstrapError::SnapshotBootstrap(err.to_string()))?;
476 snapshot_events_applied = snapshot_events_applied.saturating_add(1);
477 sequence = sequence.saturating_add(1);
478
479 if let Some(triggered_at) = sub.triggered_at {
480 let event = WalEvent::new(
481 sequence,
482 WalEventType::SubscriptionTriggered {
483 subscription_id: sub.subscription_id,
484 timestamp: triggered_at,
485 },
486 );
487 reducer
488 .apply(&event)
489 .map_err(|err| RecoveryBootstrapError::SnapshotBootstrap(err.to_string()))?;
490 snapshot_events_applied = snapshot_events_applied.saturating_add(1);
491 sequence = sequence.saturating_add(1);
492 }
493
494 if let Some(canceled_at) = sub.canceled_at {
495 let event = WalEvent::new(
496 sequence,
497 WalEventType::SubscriptionCanceled {
498 subscription_id: sub.subscription_id,
499 timestamp: canceled_at,
500 },
501 );
502 reducer
503 .apply(&event)
504 .map_err(|err| RecoveryBootstrapError::SnapshotBootstrap(err.to_string()))?;
505 snapshot_events_applied = snapshot_events_applied.saturating_add(1);
506 sequence = sequence.saturating_add(1);
507 }
508 }
509
510 for actor in &snapshot.actors {
512 let event = WalEvent::new(
513 sequence,
514 WalEventType::ActorRegistered {
515 actor_id: actor.actor_id,
516 identity: actor.identity.clone(),
517 capabilities: actor.capabilities.clone(),
518 department: actor.department.clone(),
519 heartbeat_interval_secs: actor.heartbeat_interval_secs,
520 tenant_id: actor.tenant_id,
521 timestamp: actor.registered_at,
522 },
523 );
524 reducer
525 .apply(&event)
526 .map_err(|err| RecoveryBootstrapError::SnapshotBootstrap(err.to_string()))?;
527 snapshot_events_applied = snapshot_events_applied.saturating_add(1);
528 sequence = sequence.saturating_add(1);
529
530 if let Some(deregistered_at) = actor.deregistered_at {
531 let event = WalEvent::new(
532 sequence,
533 WalEventType::ActorDeregistered {
534 actor_id: actor.actor_id,
535 timestamp: deregistered_at,
536 },
537 );
538 reducer
539 .apply(&event)
540 .map_err(|err| RecoveryBootstrapError::SnapshotBootstrap(err.to_string()))?;
541 snapshot_events_applied = snapshot_events_applied.saturating_add(1);
542 sequence = sequence.saturating_add(1);
543 }
544 }
545
546 for tenant in &snapshot.tenants {
547 let event = WalEvent::new(
548 sequence,
549 WalEventType::TenantCreated {
550 tenant_id: tenant.tenant_id,
551 name: tenant.name.clone(),
552 timestamp: tenant.created_at,
553 },
554 );
555 reducer
556 .apply(&event)
557 .map_err(|err| RecoveryBootstrapError::SnapshotBootstrap(err.to_string()))?;
558 snapshot_events_applied = snapshot_events_applied.saturating_add(1);
559 sequence = sequence.saturating_add(1);
560 }
561
562 for ra in &snapshot.role_assignments {
563 let event = WalEvent::new(
564 sequence,
565 WalEventType::RoleAssigned {
566 actor_id: ra.actor_id,
567 role: ra.role.clone(),
568 tenant_id: ra.tenant_id,
569 timestamp: ra.assigned_at,
570 },
571 );
572 reducer
573 .apply(&event)
574 .map_err(|err| RecoveryBootstrapError::SnapshotBootstrap(err.to_string()))?;
575 snapshot_events_applied = snapshot_events_applied.saturating_add(1);
576 sequence = sequence.saturating_add(1);
577 }
578
579 for cg in &snapshot.capability_grants {
580 let event = WalEvent::new(
581 sequence,
582 WalEventType::CapabilityGranted {
583 actor_id: cg.actor_id,
584 capability: cg.capability.clone(),
585 tenant_id: cg.tenant_id,
586 timestamp: cg.granted_at,
587 },
588 );
589 reducer
590 .apply(&event)
591 .map_err(|err| RecoveryBootstrapError::SnapshotBootstrap(err.to_string()))?;
592 snapshot_events_applied = snapshot_events_applied.saturating_add(1);
593 sequence = sequence.saturating_add(1);
594
595 if let Some(revoked_at) = cg.revoked_at {
596 let event = WalEvent::new(
597 sequence,
598 WalEventType::CapabilityRevoked {
599 actor_id: cg.actor_id,
600 capability: cg.capability.clone(),
601 tenant_id: cg.tenant_id,
602 timestamp: revoked_at,
603 },
604 );
605 reducer
606 .apply(&event)
607 .map_err(|err| RecoveryBootstrapError::SnapshotBootstrap(err.to_string()))?;
608 snapshot_events_applied = snapshot_events_applied.saturating_add(1);
609 sequence = sequence.saturating_add(1);
610 }
611 }
612
613 for le in &snapshot.ledger_entries {
614 let event = WalEvent::new(
615 sequence,
616 WalEventType::LedgerEntryAppended {
617 entry_id: le.entry_id,
618 tenant_id: le.tenant_id,
619 ledger_key: le.ledger_key.clone(),
620 actor_id: le.actor_id,
621 payload: le.payload.clone(),
622 timestamp: le.timestamp,
623 },
624 );
625 reducer
626 .apply(&event)
627 .map_err(|err| RecoveryBootstrapError::SnapshotBootstrap(err.to_string()))?;
628 snapshot_events_applied = snapshot_events_applied.saturating_add(1);
629 sequence = sequence.saturating_add(1);
630 }
631
632 let _ = sequence; reducer.set_latest_sequence_for_bootstrap(snapshot.metadata.wal_sequence);
642
643 Ok((reducer, snapshot_events_applied))
644}
645
646#[cfg(test)]
647mod tests {
648 use std::fs;
649 use std::io::Write;
650 use std::path::Path;
651 use std::sync::atomic::{AtomicUsize, Ordering};
652
653 use actionqueue_core::ids::{AttemptId, RunId, TaskId};
654 use actionqueue_core::run::state::RunState;
655 use actionqueue_core::task::constraints::TaskConstraints;
656 use actionqueue_core::task::metadata::TaskMetadata;
657 use actionqueue_core::task::run_policy::RunPolicy;
658 use actionqueue_core::task::task_spec::{TaskPayload, TaskSpec};
659
660 use super::*;
661 use crate::snapshot::mapping::SNAPSHOT_SCHEMA_VERSION;
662 use crate::snapshot::model::{
663 SnapshotAttemptHistoryEntry, SnapshotEngineControl, SnapshotLeaseMetadata,
664 SnapshotMetadata, SnapshotRun, SnapshotRunStateHistoryEntry, SnapshotTask,
665 };
666 use crate::snapshot::writer::{SnapshotFsWriter, SnapshotWriter};
667 use crate::wal::codec;
668
669 static TEST_COUNTER: AtomicUsize = AtomicUsize::new(0);
670
671 fn temp_data_root() -> PathBuf {
672 let dir = std::env::temp_dir();
673 let count = TEST_COUNTER.fetch_add(1, Ordering::SeqCst);
674 let path = dir.join(format!(
675 "actionqueue_recovery_bootstrap_test_{}_{}",
676 std::process::id(),
677 count
678 ));
679 let _ = fs::remove_dir_all(&path);
680 path
681 }
682
683 fn test_snapshot_task(task_spec: TaskSpec) -> SnapshotTask {
686 SnapshotTask { task_spec, created_at: 0, updated_at: None, canceled_at: None }
687 }
688
689 fn test_attempt_entry(
691 attempt_id: AttemptId,
692 started_at: u64,
693 finished_at: Option<u64>,
694 ) -> SnapshotAttemptHistoryEntry {
695 SnapshotAttemptHistoryEntry {
696 attempt_id,
697 started_at,
698 finished_at,
699 result: None,
700 error: None,
701 output: None,
702 }
703 }
704
705 fn create_task_spec(payload: &[u8]) -> TaskSpec {
706 TaskSpec::new(
707 TaskId::new(),
708 TaskPayload::with_content_type(payload.to_vec(), "application/octet-stream"),
709 RunPolicy::Once,
710 TaskConstraints::default(),
711 TaskMetadata::default(),
712 )
713 .expect("task spec should be valid")
714 }
715
716 fn create_snapshot(task: &TaskSpec, run_state: RunState, wal_sequence: u64) -> Snapshot {
717 let run_instance = RunInstance::new_scheduled_with_id(RunId::new(), task.id(), 10, 10)
718 .expect("scheduled run should be valid");
719
720 let run_instance = if run_state == RunState::Scheduled {
721 run_instance
722 } else {
723 let mut run_instance = run_instance;
724 run_instance.transition_to(run_state).expect("transition should be valid for test");
725 run_instance
726 };
727
728 let state_history = if run_state == RunState::Scheduled {
729 vec![SnapshotRunStateHistoryEntry {
730 from: None,
731 to: RunState::Scheduled,
732 timestamp: 10,
733 }]
734 } else {
735 vec![
736 SnapshotRunStateHistoryEntry { from: None, to: RunState::Scheduled, timestamp: 10 },
737 SnapshotRunStateHistoryEntry {
738 from: Some(RunState::Scheduled),
739 to: run_state,
740 timestamp: 11,
741 },
742 ]
743 };
744
745 Snapshot {
746 version: 4,
747 timestamp: 1234,
748 metadata: SnapshotMetadata {
749 schema_version: SNAPSHOT_SCHEMA_VERSION,
750 wal_sequence,
751 task_count: 1,
752 run_count: 1,
753 },
754 tasks: vec![test_snapshot_task(task.clone())],
755 runs: vec![SnapshotRun {
756 run_instance,
757 state_history,
758 attempts: Vec::new(),
759 lease: None,
760 }],
761 engine: SnapshotEngineControl::default(),
762 dependency_declarations: Vec::new(),
763 budgets: Vec::new(),
764 subscriptions: Vec::new(),
765 actors: Vec::new(),
766 tenants: Vec::new(),
767 role_assignments: Vec::new(),
768 capability_grants: Vec::new(),
769 ledger_entries: Vec::new(),
770 }
771 }
772
773 fn write_snapshot(path: &Path, snapshot: &Snapshot) {
774 let mut writer = SnapshotFsWriter::new(path.to_path_buf())
775 .expect("snapshot writer should open for bootstrap test");
776 writer.write(snapshot).expect("snapshot write should succeed");
777 writer.flush().expect("snapshot flush should succeed");
778 writer.close().expect("snapshot close should succeed");
779 }
780
781 fn write_wal_events(path: &Path, events: &[WalEvent]) {
782 let mut file = std::fs::OpenOptions::new()
783 .create(true)
784 .append(true)
785 .open(path)
786 .expect("wal file open should succeed");
787 for event in events {
788 let bytes = codec::encode(event).expect("encode should succeed");
789 file.write_all(&bytes).expect("wal write should succeed");
790 }
791 file.sync_all().expect("wal sync should succeed");
792 }
793
794 #[test]
795 fn test_bootstrap_empty_storage() {
796 let data_root = temp_data_root();
797 let result = load_projection_from_storage(&data_root)
798 .expect("bootstrap should succeed for empty storage");
799
800 assert!(!result.snapshot_loaded);
801 assert_eq!(result.snapshot_sequence, 0);
802 assert_eq!(result.projection.task_count(), 0);
803 assert_eq!(result.projection.run_count(), 0);
804 assert_eq!(result.projection.latest_sequence(), 0);
805 assert_eq!(result.wal_append_telemetry.snapshot().append_success_total, 0);
806 assert_eq!(result.wal_append_telemetry.snapshot().append_failure_total, 0);
807 assert_eq!(result.recovery_observations.events_applied_total, 0);
808 assert_eq!(result.recovery_observations.snapshot_events_applied, 0);
809 assert_eq!(result.recovery_observations.wal_replay_events_applied, 0);
810 assert!(result.recovery_observations.recovery_duration_seconds >= 0.0);
811
812 let _ = fs::remove_dir_all(data_root);
813 }
814
815 #[test]
816 fn test_bootstrap_snapshot_only_with_exact_sequence() {
817 let data_root = temp_data_root();
818 let task = create_task_spec(&[1, 2, 3]);
819 let snapshot = create_snapshot(&task, RunState::Scheduled, 3);
820
821 let snapshot_path = data_root.join("snapshots").join("snapshot.bin");
822 std::fs::create_dir_all(snapshot_path.parent().expect("snapshot parent"))
823 .expect("snapshot dir should create");
824 write_snapshot(&snapshot_path, &snapshot);
825
826 let result = load_projection_from_storage(&data_root)
827 .expect("bootstrap should succeed for snapshot-only storage");
828
829 assert!(result.snapshot_loaded);
830 assert_eq!(result.snapshot_sequence, 3);
831 assert_eq!(result.projection.task_count(), 1);
832 assert_eq!(result.projection.run_count(), 1);
833 assert_eq!(result.projection.latest_sequence(), 3);
834 assert_eq!(result.recovery_observations.snapshot_events_applied, 2);
835 assert_eq!(result.recovery_observations.wal_replay_events_applied, 0);
836 assert_eq!(result.recovery_observations.events_applied_total, 2);
837 assert!(result.recovery_observations.recovery_duration_seconds >= 0.0);
838
839 let _ = fs::remove_dir_all(data_root);
840 }
841
842 #[test]
843 fn test_bootstrap_snapshot_with_wal_tail() {
844 let data_root = temp_data_root();
845 let task = create_task_spec(&[4, 5, 6]);
846 let snapshot = create_snapshot(&task, RunState::Scheduled, 3);
847
848 let snapshot_path = data_root.join("snapshots").join("snapshot.bin");
849 std::fs::create_dir_all(snapshot_path.parent().expect("snapshot parent"))
850 .expect("snapshot dir should create");
851 write_snapshot(&snapshot_path, &snapshot);
852
853 let wal_path = data_root.join("wal").join("actionqueue.wal");
854 std::fs::create_dir_all(wal_path.parent().expect("wal parent"))
855 .expect("wal dir should create");
856
857 let extra_task = create_task_spec(&[7, 8, 9]);
858 let wal_events = vec![WalEvent::new(
859 4,
860 WalEventType::TaskCreated { task_spec: extra_task.clone(), timestamp: 0 },
861 )];
862 write_wal_events(&wal_path, &wal_events);
863
864 let result = load_projection_from_storage(&data_root)
865 .expect("bootstrap should succeed for snapshot + WAL tail");
866
867 assert!(result.snapshot_loaded);
868 assert_eq!(result.snapshot_sequence, 3);
869 assert_eq!(result.projection.task_count(), 2);
870 assert_eq!(result.projection.latest_sequence(), 4);
871 assert_eq!(result.recovery_observations.snapshot_events_applied, 2);
872 assert_eq!(result.recovery_observations.wal_replay_events_applied, 1);
873 assert_eq!(result.recovery_observations.events_applied_total, 3);
874 assert!(result.recovery_observations.recovery_duration_seconds >= 0.0);
875
876 let _ = fs::remove_dir_all(data_root);
877 }
878
879 #[test]
880 fn test_bootstrap_snapshot_with_high_wal_sequence_succeeds() {
881 let data_root = temp_data_root();
882 let task = create_task_spec(&[10, 11, 12]);
883 let snapshot = create_snapshot(&task, RunState::Scheduled, 10);
887
888 let snapshot_path = data_root.join("snapshots").join("snapshot.bin");
889 std::fs::create_dir_all(snapshot_path.parent().expect("snapshot parent"))
890 .expect("snapshot dir should create");
891 write_snapshot(&snapshot_path, &snapshot);
892
893 let result = load_projection_from_storage(&data_root);
894 assert!(result.is_ok(), "snapshot with high wal_sequence should bootstrap successfully");
895 let recovery = result.expect("recovery should succeed");
896 assert!(recovery.snapshot_loaded, "snapshot should be loaded despite high wal_sequence");
897
898 let _ = fs::remove_dir_all(data_root);
899 }
900
901 #[test]
902 fn p6_011_t_p3_bootstrap_hydrates_task_canceled_projection_from_snapshot() {
903 let data_root = temp_data_root();
904 let task = create_task_spec(&[1, 9, 9]);
905 let canceled_at = 77;
906 let snapshot = Snapshot {
907 version: 4,
908 timestamp: 1234,
909 metadata: SnapshotMetadata {
910 schema_version: SNAPSHOT_SCHEMA_VERSION,
911 wal_sequence: 2,
912 task_count: 1,
913 run_count: 0,
914 },
915 tasks: vec![SnapshotTask {
916 canceled_at: Some(canceled_at),
917 ..test_snapshot_task(task.clone())
918 }],
919 runs: Vec::new(),
920 engine: SnapshotEngineControl::default(),
921 dependency_declarations: Vec::new(),
922 budgets: Vec::new(),
923 subscriptions: Vec::new(),
924 actors: Vec::new(),
925 tenants: Vec::new(),
926 role_assignments: Vec::new(),
927 capability_grants: Vec::new(),
928 ledger_entries: Vec::new(),
929 };
930
931 let snapshot_path = data_root.join("snapshots").join("snapshot.bin");
932 std::fs::create_dir_all(snapshot_path.parent().expect("snapshot parent"))
933 .expect("snapshot dir should create");
934 write_snapshot(&snapshot_path, &snapshot);
935
936 let result = load_projection_from_storage(&data_root)
937 .expect("bootstrap should succeed for canceled task");
938
939 assert!(result.snapshot_loaded);
940 assert_eq!(result.snapshot_sequence, 2);
941 assert!(result.projection.is_task_canceled(task.id()));
942 assert_eq!(result.projection.task_canceled_at(task.id()), Some(canceled_at));
943 assert_eq!(result.projection.latest_sequence(), 2);
944 assert_eq!(result.recovery_observations.snapshot_events_applied, 2);
945 assert_eq!(result.recovery_observations.wal_replay_events_applied, 0);
946 assert_eq!(result.recovery_observations.events_applied_total, 2);
947
948 let _ = fs::remove_dir_all(data_root);
949 }
950
951 #[test]
952 fn p6_013_t_p3_bootstrap_hydrates_engine_paused_projection_from_snapshot() {
953 let data_root = temp_data_root();
954 let task = create_task_spec(&[2, 2, 2]);
955 let paused_at = 120;
956 let snapshot = Snapshot {
957 version: 4,
958 timestamp: 1234,
959 metadata: SnapshotMetadata {
960 schema_version: SNAPSHOT_SCHEMA_VERSION,
961 wal_sequence: 2,
962 task_count: 1,
963 run_count: 0,
964 },
965 tasks: vec![test_snapshot_task(task)],
966 runs: Vec::new(),
967 engine: SnapshotEngineControl {
968 paused: true,
969 paused_at: Some(paused_at),
970 resumed_at: None,
971 },
972 dependency_declarations: Vec::new(),
973 budgets: Vec::new(),
974 subscriptions: Vec::new(),
975 actors: Vec::new(),
976 tenants: Vec::new(),
977 role_assignments: Vec::new(),
978 capability_grants: Vec::new(),
979 ledger_entries: Vec::new(),
980 };
981
982 let snapshot_path = data_root.join("snapshots").join("snapshot.bin");
983 std::fs::create_dir_all(snapshot_path.parent().expect("snapshot parent"))
984 .expect("snapshot dir should create");
985 write_snapshot(&snapshot_path, &snapshot);
986
987 let result = load_projection_from_storage(&data_root)
988 .expect("bootstrap should succeed for engine paused snapshot");
989
990 assert!(result.snapshot_loaded);
991 assert_eq!(result.snapshot_sequence, 2);
992 assert!(result.projection.is_engine_paused());
993 assert_eq!(result.projection.engine_paused_at(), Some(paused_at));
994 assert_eq!(result.projection.engine_resumed_at(), None);
995 assert_eq!(result.projection.latest_sequence(), 2);
996 assert_eq!(result.recovery_observations.snapshot_events_applied, 2);
997 assert_eq!(result.recovery_observations.wal_replay_events_applied, 0);
998 assert_eq!(result.recovery_observations.events_applied_total, 2);
999
1000 let _ = fs::remove_dir_all(data_root);
1001 }
1002
/// Hydrating from a snapshot taken after a pause/resume cycle must restore
/// both engine-control timestamps while leaving the engine un-paused. This
/// test writes no WAL, so every applied event is attributed to the snapshot.
#[test]
fn p6_013_t_p4_bootstrap_hydrates_engine_resumed_projection_from_snapshot() {
    let root = temp_data_root();
    let spec = create_task_spec(&[2, 2, 3]);
    let (pause_ts, resume_ts) = (120, 180);

    let engine = SnapshotEngineControl {
        paused: false,
        paused_at: Some(pause_ts),
        resumed_at: Some(resume_ts),
    };
    let metadata = SnapshotMetadata {
        schema_version: SNAPSHOT_SCHEMA_VERSION,
        wal_sequence: 3,
        task_count: 1,
        run_count: 0,
    };
    let snapshot = Snapshot {
        version: 4,
        timestamp: 1234,
        metadata,
        tasks: vec![test_snapshot_task(spec)],
        runs: Vec::new(),
        engine,
        dependency_declarations: Vec::new(),
        budgets: Vec::new(),
        subscriptions: Vec::new(),
        actors: Vec::new(),
        tenants: Vec::new(),
        role_assignments: Vec::new(),
        capability_grants: Vec::new(),
        ledger_entries: Vec::new(),
    };

    let snapshot_path = root.join("snapshots").join("snapshot.bin");
    let snapshot_dir = snapshot_path.parent().expect("snapshot parent");
    std::fs::create_dir_all(snapshot_dir).expect("snapshot dir should create");
    write_snapshot(&snapshot_path, &snapshot);

    let bootstrap = load_projection_from_storage(&root)
        .expect("bootstrap should succeed for engine resumed snapshot");
    let projection = &bootstrap.projection;
    // `RecoveryObservations` is `Copy`, so this copies rather than moves out of `bootstrap`.
    let observations = bootstrap.recovery_observations;

    assert!(bootstrap.snapshot_loaded);
    assert_eq!(bootstrap.snapshot_sequence, 3);
    assert!(!projection.is_engine_paused());
    assert_eq!(projection.engine_paused_at(), Some(pause_ts));
    assert_eq!(projection.engine_resumed_at(), Some(resume_ts));
    assert_eq!(projection.latest_sequence(), 3);
    assert_eq!(observations.snapshot_events_applied, 3);
    assert_eq!(observations.wal_replay_events_applied, 0);
    assert_eq!(observations.events_applied_total, 3);

    let _ = fs::remove_dir_all(root);
}
1055
/// With only a WAL on disk (no snapshot written), bootstrap must attribute
/// every applied event to WAL replay and report zero snapshot events.
#[test]
fn test_bootstrap_wal_only_reports_wal_replay_events() {
    let root = temp_data_root();
    let wal_file = root.join("wal").join("actionqueue.wal");
    let wal_dir = wal_file.parent().expect("wal parent");
    std::fs::create_dir_all(wal_dir).expect("wal dir should create");

    let created =
        WalEventType::TaskCreated { task_spec: create_task_spec(&[7, 7, 7]), timestamp: 0 };
    let events = vec![WalEvent::new(1, created)];
    write_wal_events(&wal_file, &events);

    let bootstrap = load_projection_from_storage(&root)
        .expect("bootstrap should succeed for WAL-only storage");
    let observations = bootstrap.recovery_observations;

    assert!(!bootstrap.snapshot_loaded);
    assert_eq!(observations.snapshot_events_applied, 0);
    assert_eq!(observations.wal_replay_events_applied, 1);
    assert_eq!(observations.events_applied_total, 1);
    assert!(observations.recovery_duration_seconds >= 0.0);

    let _ = fs::remove_dir_all(root);
}
1079
/// Bootstrap must reject a snapshot whose sequence (3) exceeds the highest
/// sequence present in the WAL (1). The rejection is expected as a
/// `RecoveryBootstrapError::SnapshotBootstrap` error naming both sequences.
#[test]
fn test_snapshot_sequence_exceeds_wal_max_sequence() {
    let data_root = temp_data_root();
    let task = create_task_spec(&[1, 2, 3]);
    let snapshot = create_snapshot(&task, RunState::Scheduled, 3);

    let snapshot_path = data_root.join("snapshots").join("snapshot.bin");
    std::fs::create_dir_all(snapshot_path.parent().expect("snapshot parent"))
        .expect("snapshot dir should create");
    write_snapshot(&snapshot_path, &snapshot);

    let wal_path = data_root.join("wal").join("actionqueue.wal");
    std::fs::create_dir_all(wal_path.parent().expect("wal parent"))
        .expect("wal dir should create");
    let wal_task = create_task_spec(&[4, 5, 6]);
    let wal_events =
        vec![WalEvent::new(1, WalEventType::TaskCreated { task_spec: wal_task, timestamp: 0 })];
    write_wal_events(&wal_path, &wal_events);

    // Match explicitly instead of `assert!(matches!(..))`. A failure then reports
    // which outcome actually occurred, not a bare "assertion failed".
    match load_projection_from_storage(&data_root) {
        Ok(_) => panic!(
            "bootstrap should reject a snapshot sequence beyond the WAL max sequence"
        ),
        Err(RecoveryBootstrapError::SnapshotBootstrap(msg)) => assert!(
            msg.contains("snapshot sequence 3 exceeds WAL max sequence 1"),
            "unexpected SnapshotBootstrap message: {msg}"
        ),
        Err(other) => panic!("expected SnapshotBootstrap error, got {other:?}"),
    }

    let _ = fs::remove_dir_all(data_root);
}
1111
/// A snapshot taken at sequence 3, followed by a WAL event at sequence 4,
/// must bootstrap successfully. The WAL tail's task is layered on top of the
/// snapshot's task, giving two tasks in total.
#[test]
fn test_snapshot_sequence_within_wal_max_succeeds() {
    let root = temp_data_root();
    let snapshot_spec = create_task_spec(&[1, 2, 3]);
    let snapshot = create_snapshot(&snapshot_spec, RunState::Scheduled, 3);

    let snapshot_path = root.join("snapshots").join("snapshot.bin");
    let snapshot_dir = snapshot_path.parent().expect("snapshot parent");
    std::fs::create_dir_all(snapshot_dir).expect("snapshot dir should create");
    write_snapshot(&snapshot_path, &snapshot);

    let wal_path = root.join("wal").join("actionqueue.wal");
    let wal_dir = wal_path.parent().expect("wal parent");
    std::fs::create_dir_all(wal_dir).expect("wal dir should create");
    let tail_event =
        WalEventType::TaskCreated { task_spec: create_task_spec(&[7, 8, 9]), timestamp: 0 };
    let wal_events = vec![WalEvent::new(4, tail_event)];
    write_wal_events(&wal_path, &wal_events);

    let bootstrap = load_projection_from_storage(&root)
        .expect("bootstrap should succeed when snapshot sequence <= WAL max");
    assert!(bootstrap.snapshot_loaded);
    assert_eq!(bootstrap.projection.task_count(), 2);

    let _ = fs::remove_dir_all(root);
}
1142
/// A snapshot containing a `Leased` run with lease metadata must restore that
/// lease after bootstrap. Both `get_lease()` (owner and expiry) and
/// `get_lease_metadata()` (owner and acquisition time) are checked.
#[test]
fn test_bootstrap_snapshot_with_active_lease() {
    let data_root = temp_data_root();
    let task = create_task_spec(&[20, 21, 22]);
    let run_id = RunId::new();

    // Drive the in-memory run through the same transitions recorded in `state_history`.
    let mut run_instance = RunInstance::new_scheduled_with_id(run_id, task.id(), 10, 10)
        .expect("scheduled run should be valid");
    run_instance.transition_to(RunState::Ready).expect("transition to Ready");
    run_instance.transition_to(RunState::Leased).expect("transition to Leased");

    let state_history = vec![
        SnapshotRunStateHistoryEntry { from: None, to: RunState::Scheduled, timestamp: 10 },
        SnapshotRunStateHistoryEntry {
            from: Some(RunState::Scheduled),
            to: RunState::Ready,
            timestamp: 11,
        },
        SnapshotRunStateHistoryEntry {
            from: Some(RunState::Ready),
            to: RunState::Leased,
            timestamp: 12,
        },
    ];

    let lease = Some(SnapshotLeaseMetadata {
        owner: "worker-1".to_string(),
        expiry: 1000,
        acquired_at: 12,
        updated_at: 12,
    });

    let snapshot = Snapshot {
        version: 4,
        timestamp: 1234,
        metadata: SnapshotMetadata {
            schema_version: SNAPSHOT_SCHEMA_VERSION,
            wal_sequence: 5,
            task_count: 1,
            run_count: 1,
        },
        // `task` is not needed after this point, so move it instead of cloning.
        tasks: vec![test_snapshot_task(task)],
        runs: vec![SnapshotRun { run_instance, state_history, attempts: Vec::new(), lease }],
        engine: SnapshotEngineControl::default(),
        dependency_declarations: Vec::new(),
        budgets: Vec::new(),
        subscriptions: Vec::new(),
        actors: Vec::new(),
        tenants: Vec::new(),
        role_assignments: Vec::new(),
        capability_grants: Vec::new(),
        ledger_entries: Vec::new(),
    };

    let snapshot_path = data_root.join("snapshots").join("snapshot.bin");
    std::fs::create_dir_all(snapshot_path.parent().expect("snapshot parent"))
        .expect("snapshot dir should create");
    write_snapshot(&snapshot_path, &snapshot);

    let result = load_projection_from_storage(&data_root)
        .expect("bootstrap should succeed for snapshot with active lease");

    assert!(result.snapshot_loaded);
    let (owner, expiry) = result
        .projection
        .get_lease(&run_id)
        .expect("get_lease() should return active lease after bootstrap");
    assert_eq!(owner, "worker-1");
    assert_eq!(*expiry, 1000);

    let metadata = result
        .projection
        .get_lease_metadata(&run_id)
        .expect("lease metadata should be present");
    assert_eq!(metadata.owner(), "worker-1");
    assert_eq!(metadata.acquired_at(), 12);

    let _ = fs::remove_dir_all(data_root);
}
1220
/// A snapshot containing a `Running` run with one attempt entry must restore
/// the run after bootstrap, with its state, attempt count and current attempt id intact.
#[test]
fn test_bootstrap_snapshot_with_running_attempt() {
    let data_root = temp_data_root();
    let task = create_task_spec(&[30, 31, 32]);
    let run_id = RunId::new();
    let attempt_id = AttemptId::new();

    // Drive the in-memory run through the same transitions recorded in `state_history`,
    // then open the attempt that the snapshot's attempt entry describes.
    let mut run_instance = RunInstance::new_scheduled_with_id(run_id, task.id(), 10, 10)
        .expect("scheduled run should be valid");
    run_instance.transition_to(RunState::Ready).expect("transition to Ready");
    run_instance.transition_to(RunState::Leased).expect("transition to Leased");
    run_instance.transition_to(RunState::Running).expect("transition to Running");
    run_instance.start_attempt(attempt_id).expect("start attempt");

    let state_history = vec![
        SnapshotRunStateHistoryEntry { from: None, to: RunState::Scheduled, timestamp: 10 },
        SnapshotRunStateHistoryEntry {
            from: Some(RunState::Scheduled),
            to: RunState::Ready,
            timestamp: 11,
        },
        SnapshotRunStateHistoryEntry {
            from: Some(RunState::Ready),
            to: RunState::Leased,
            timestamp: 12,
        },
        SnapshotRunStateHistoryEntry {
            from: Some(RunState::Leased),
            to: RunState::Running,
            timestamp: 13,
        },
    ];

    let attempts = vec![test_attempt_entry(attempt_id, 13, None)];

    let snapshot = Snapshot {
        version: 4,
        timestamp: 1234,
        metadata: SnapshotMetadata {
            schema_version: SNAPSHOT_SCHEMA_VERSION,
            wal_sequence: 6,
            task_count: 1,
            run_count: 1,
        },
        // `task` is not needed after this point, so move it instead of cloning.
        tasks: vec![test_snapshot_task(task)],
        runs: vec![SnapshotRun { run_instance, state_history, attempts, lease: None }],
        engine: SnapshotEngineControl::default(),
        dependency_declarations: Vec::new(),
        budgets: Vec::new(),
        subscriptions: Vec::new(),
        actors: Vec::new(),
        tenants: Vec::new(),
        role_assignments: Vec::new(),
        capability_grants: Vec::new(),
        ledger_entries: Vec::new(),
    };

    let snapshot_path = data_root.join("snapshots").join("snapshot.bin");
    std::fs::create_dir_all(snapshot_path.parent().expect("snapshot parent"))
        .expect("snapshot dir should create");
    write_snapshot(&snapshot_path, &snapshot);

    let result = load_projection_from_storage(&data_root)
        .expect("bootstrap should succeed for snapshot with running attempt");

    assert!(result.snapshot_loaded);
    let run = result
        .projection
        .get_run_instance(&run_id)
        .expect("run instance should exist after bootstrap");
    assert_eq!(run.attempt_count(), 1);
    assert_eq!(run.current_attempt_id(), Some(attempt_id));
    assert_eq!(run.state(), RunState::Running);

    let _ = fs::remove_dir_all(data_root);
}
1299
/// A lease restored from a snapshot (expiry 1000, snapshot sequence 5) must be
/// updated by a `LeaseHeartbeat` replayed from the WAL tail at sequence 6,
/// leaving the same owner with the heartbeat's expiry (2000).
#[test]
fn test_bootstrap_snapshot_wal_tail_lease_heartbeat() {
    let data_root = temp_data_root();
    let task = create_task_spec(&[40, 41, 42]);
    let run_id = RunId::new();

    // Drive the in-memory run through the same transitions recorded in `state_history`.
    let mut run_instance = RunInstance::new_scheduled_with_id(run_id, task.id(), 10, 10)
        .expect("scheduled run should be valid");
    run_instance.transition_to(RunState::Ready).expect("transition to Ready");
    run_instance.transition_to(RunState::Leased).expect("transition to Leased");

    let state_history = vec![
        SnapshotRunStateHistoryEntry { from: None, to: RunState::Scheduled, timestamp: 10 },
        SnapshotRunStateHistoryEntry {
            from: Some(RunState::Scheduled),
            to: RunState::Ready,
            timestamp: 11,
        },
        SnapshotRunStateHistoryEntry {
            from: Some(RunState::Ready),
            to: RunState::Leased,
            timestamp: 12,
        },
    ];

    let lease = Some(SnapshotLeaseMetadata {
        owner: "worker-1".to_string(),
        expiry: 1000,
        acquired_at: 12,
        updated_at: 12,
    });

    let snapshot = Snapshot {
        version: 4,
        timestamp: 1234,
        metadata: SnapshotMetadata {
            schema_version: SNAPSHOT_SCHEMA_VERSION,
            wal_sequence: 5,
            task_count: 1,
            run_count: 1,
        },
        // `task` is not needed after this point, so move it instead of cloning.
        tasks: vec![test_snapshot_task(task)],
        runs: vec![SnapshotRun { run_instance, state_history, attempts: Vec::new(), lease }],
        engine: SnapshotEngineControl::default(),
        dependency_declarations: Vec::new(),
        budgets: Vec::new(),
        subscriptions: Vec::new(),
        actors: Vec::new(),
        tenants: Vec::new(),
        role_assignments: Vec::new(),
        capability_grants: Vec::new(),
        ledger_entries: Vec::new(),
    };

    let snapshot_path = data_root.join("snapshots").join("snapshot.bin");
    std::fs::create_dir_all(snapshot_path.parent().expect("snapshot parent"))
        .expect("snapshot dir should create");
    write_snapshot(&snapshot_path, &snapshot);

    // The WAL tail continues right after the snapshot's sequence (5 -> 6).
    let wal_path = data_root.join("wal").join("actionqueue.wal");
    std::fs::create_dir_all(wal_path.parent().expect("wal parent"))
        .expect("wal dir should create");
    let wal_events = vec![WalEvent::new(
        6,
        WalEventType::LeaseHeartbeat {
            run_id,
            owner: "worker-1".to_string(),
            expiry: 2000,
            timestamp: 50,
        },
    )];
    write_wal_events(&wal_path, &wal_events);

    let result = load_projection_from_storage(&data_root)
        .expect("bootstrap with snapshot lease + WAL heartbeat should succeed");

    assert!(result.snapshot_loaded);
    let (owner, expiry) = result
        .projection
        .get_lease(&run_id)
        .expect("lease should be present after heartbeat replay");
    assert_eq!(owner, "worker-1");
    assert_eq!(*expiry, 2000, "expiry should be updated by heartbeat");

    let _ = fs::remove_dir_all(data_root);
}
1387}