Skip to main content

sochdb_kernel/
boot_fsm.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! # Deterministic Boot Finite State Machine
19//!
20//! Implements a production-grade boot sequence with:
21//! - Well-defined states: `Init → Migrate → Recover → Warmup → Ready`
22//! - Time budgets for each phase (for Kubernetes probe alignment)
23//! - Progress reporting for external observability
//! - Recovery modes: `Normal`, `ReadOnly`, `Force` (the latter two run in the `ReadOnlyRecovery` / `ForceRecovery` phases)
25//!
26//! ## Kubernetes Integration
27//!
28//! The FSM exports progress metrics that align with K8s probe semantics:
29//! - `startupProbe`: tolerates long recovery (uses recovery budget)
30//! - `readinessProbe`: true only when FSM is in `Ready`
31//! - `livenessProbe`: heartbeat-based (separate from FSM)
32//!
33//! ## Safety Property
34//!
35//! `Ready ⇒ (recovery_complete ∧ invariants_checked ∧ services_registered)`
36//!
37//! ## Complexity Bounds
38//!
39//! Recovery is O(|WAL| + |checkpoint|). The FSM tracks and exposes this
40//! to allow operators to configure appropriate probe timeouts.
41
42use std::sync::Arc;
43use std::sync::atomic::{AtomicU64, Ordering};
44use std::time::{Duration, Instant};
45
46use parking_lot::RwLock;
47
/// States of the boot state machine.
///
/// The discriminants are fixed (`#[repr(u8)]`), so a phase can be cast to a
/// stable small integer.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[repr(u8)]
pub enum BootPhase {
    /// Nothing has been started yet
    Uninitialized = 0,
    /// Core subsystems (allocator, config, logging) are being initialized
    Init = 1,
    /// Schema/format migrations are running
    Migrate = 2,
    /// WAL recovery is running (ARIES redo/undo)
    Recover = 3,
    /// Caches and indexes are being warmed up
    Warmup = 4,
    /// Boot finished; fully operational
    Ready = 5,
    /// Recovery in read-only mode (for forensics)
    ReadOnlyRecovery = 6,
    /// Recovery in force mode (some checks skipped)
    ForceRecovery = 7,
    /// A graceful shutdown is in progress
    ShuttingDown = 8,
    /// Boot failed (terminal state)
    Failed = 9,
}

impl BootPhase {
    /// Lowercase, snake_case label for this phase.
    pub fn name(&self) -> &'static str {
        match *self {
            Self::Uninitialized => "uninitialized",
            Self::Init => "init",
            Self::Migrate => "migrate",
            Self::Recover => "recover",
            Self::Warmup => "warmup",
            Self::Ready => "ready",
            Self::ReadOnlyRecovery => "readonly_recovery",
            Self::ForceRecovery => "force_recovery",
            Self::ShuttingDown => "shutting_down",
            Self::Failed => "failed",
        }
    }

    /// `true` only for [`BootPhase::Ready`], i.e. when traffic may be served.
    pub fn is_ready(&self) -> bool {
        *self == Self::Ready
    }

    /// `true` for every phase except [`BootPhase::Failed`].
    pub fn is_alive(&self) -> bool {
        *self != Self::Failed
    }

    /// `true` while one of the boot phases (init, migrate, any recovery
    /// variant, warmup) is active.
    pub fn is_booting(&self) -> bool {
        match *self {
            Self::Init
            | Self::Migrate
            | Self::Recover
            | Self::Warmup
            | Self::ReadOnlyRecovery
            | Self::ForceRecovery => true,
            Self::Uninitialized | Self::Ready | Self::ShuttingDown | Self::Failed => false,
        }
    }
}
114
/// Progress information for a boot phase.
///
/// `Default` yields an all-zero snapshot with an empty message; it is derived
/// because every field's own default is exactly the value wanted here.
#[derive(Debug, Clone, Default)]
pub struct PhaseProgress {
    /// Current progress (0-100)
    pub percent: u8,
    /// Human-readable status message
    pub message: String,
    /// Items processed (e.g., WAL records replayed)
    pub items_processed: u64,
    /// Total items to process (0 if unknown)
    pub items_total: u64,
    /// Bytes processed
    pub bytes_processed: u64,
    /// Total bytes to process (0 if unknown)
    pub bytes_total: u64,
    /// Time spent in this phase
    pub elapsed: Duration,
}
147
/// Time budget configuration for each boot phase
#[derive(Debug, Clone)]
pub struct BootBudgets {
    /// Maximum time for init phase
    pub init_budget: Duration,
    /// Maximum time for migration phase
    pub migrate_budget: Duration,
    /// Maximum time for recovery phase (WAL replay)
    pub recover_budget: Duration,
    /// Maximum time for warmup phase
    pub warmup_budget: Duration,
    /// Total boot timeout
    pub total_budget: Duration,
}

impl Default for BootBudgets {
    /// Defaults: 30 s init, 5 min migrate, 30 min recover, 5 min warmup,
    /// 1 h for the whole boot.
    fn default() -> Self {
        Self {
            init_budget: Duration::from_secs(30),
            migrate_budget: Duration::from_secs(300), // 5 min for migrations
            recover_budget: Duration::from_secs(1800), // 30 min for WAL replay
            warmup_budget: Duration::from_secs(300),  // 5 min for cache warmup
            total_budget: Duration::from_secs(3600),  // 1 hour total
        }
    }
}

impl BootBudgets {
    /// Create budgets suitable for a Kubernetes `startupProbe`.
    ///
    /// `startup_probe_total_seconds` is the probe's total tolerance, i.e.
    /// `failureThreshold × periodSeconds`. The window is split as
    /// init 1/20, migrate 1/5, recover 3/5 and warmup 1/10 (integer division,
    /// rounding down), so the phase budgets add up to at most 95% of the
    /// window. `total_budget` is the whole window.
    ///
    /// Never panics or wraps, even for `u64::MAX`.
    pub fn for_kubernetes(startup_probe_total_seconds: u64) -> Self {
        let secs = startup_probe_total_seconds;
        // floor(3 * secs / 5) without ever forming `3 * secs`, which would
        // overflow u64 for very large inputs.
        let recover_secs = secs / 5 * 3 + secs % 5 * 3 / 5;
        Self {
            init_budget: Duration::from_secs(secs / 20),
            migrate_budget: Duration::from_secs(secs / 5),
            recover_budget: Duration::from_secs(recover_secs),
            warmup_budget: Duration::from_secs(secs / 10),
            total_budget: Duration::from_secs(secs),
        }
    }
}
191
/// How the recovery phase of the boot sequence should be carried out.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RecoveryMode {
    /// Regular recovery using the full ARIES protocol
    Normal,
    /// Forensic, read-only recovery (no WAL writes)
    ReadOnly,
    /// Forced recovery that skips some consistency checks
    Force,
}
202
/// Hints describing what the warmup phase should preload.
///
/// The default value carries no pages, no indexes and a working set of 0 bytes.
#[derive(Debug, Clone, Default)]
pub struct PreloadHints {
    /// IDs of individual pages to preload
    pub page_ids: Vec<u64>,
    /// Names of indexes to preload
    pub indexes: Vec<String>,
    /// Estimated size of the working set, in bytes
    pub working_set_bytes: u64,
}
213
214/// Boot state machine with thread-safe state transitions
215pub struct BootStateMachine {
216    /// Current boot phase
217    phase: RwLock<BootPhase>,
218    /// Phase start time
219    phase_start: RwLock<Instant>,
220    /// Boot start time
221    boot_start: RwLock<Option<Instant>>,
222    /// Current phase progress
223    progress: RwLock<PhaseProgress>,
224    /// Time budgets
225    budgets: BootBudgets,
226    /// Recovery mode
227    recovery_mode: RwLock<RecoveryMode>,
228    /// Failure reason (if Failed)
229    failure_reason: RwLock<Option<String>>,
230    /// Preload hints for warmup
231    preload_hints: RwLock<PreloadHints>,
232    /// Metrics counters
233    metrics: BootMetrics,
234}
235
/// Boot metrics for observability.
///
/// All counters start at zero (`Default` is derived; `AtomicU64::default()`
/// is 0). The fields are atomics, so they can be updated through a shared
/// reference.
#[derive(Debug, Default)]
pub struct BootMetrics {
    /// Number of WAL records replayed
    pub wal_records_replayed: AtomicU64,
    /// Bytes of WAL data processed
    pub wal_bytes_processed: AtomicU64,
    /// Number of pages recovered
    pub pages_recovered: AtomicU64,
    /// Number of transactions rolled back
    pub txns_rolled_back: AtomicU64,
    /// Checkpoint scan bytes
    pub checkpoint_bytes_scanned: AtomicU64,
    /// Migration steps completed
    pub migration_steps_completed: AtomicU64,
    /// Cache hit rate during warmup (scaled by 1000)
    pub warmup_hit_rate_permille: AtomicU64,
}
267
268/// Error during boot
269#[derive(Debug, Clone)]
270pub struct BootError {
271    pub phase: BootPhase,
272    pub message: String,
273    pub recoverable: bool,
274}
275
276impl std::fmt::Display for BootError {
277    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
278        write!(
279            f,
280            "Boot error in phase {}: {} (recoverable: {})",
281            self.phase.name(),
282            self.message,
283            self.recoverable
284        )
285    }
286}
287
288impl std::error::Error for BootError {}
289
290impl BootStateMachine {
291    /// Create a new boot state machine
292    pub fn new(budgets: BootBudgets) -> Self {
293        Self {
294            phase: RwLock::new(BootPhase::Uninitialized),
295            phase_start: RwLock::new(Instant::now()),
296            boot_start: RwLock::new(None),
297            progress: RwLock::new(PhaseProgress::default()),
298            budgets,
299            recovery_mode: RwLock::new(RecoveryMode::Normal),
300            failure_reason: RwLock::new(None),
301            preload_hints: RwLock::new(PreloadHints::default()),
302            metrics: BootMetrics::default(),
303        }
304    }
305
306    /// Create with default budgets
307    pub fn with_defaults() -> Self {
308        Self::new(BootBudgets::default())
309    }
310
311    /// Get current boot phase
312    pub fn current_phase(&self) -> BootPhase {
313        *self.phase.read()
314    }
315
316    /// Get current progress
317    pub fn current_progress(&self) -> PhaseProgress {
318        let mut progress = self.progress.read().clone();
319        progress.elapsed = self.phase_start.read().elapsed();
320        progress
321    }
322
323    /// Check if system is ready for traffic
324    pub fn is_ready(&self) -> bool {
325        self.current_phase().is_ready()
326    }
327
328    /// Check if system is alive (for liveness probe)
329    pub fn is_alive(&self) -> bool {
330        self.current_phase().is_alive()
331    }
332
333    /// Get time remaining in current phase budget
334    pub fn remaining_budget(&self) -> Duration {
335        self.remaining_budget_for(*self.phase.read())
336    }
337
338    /// Remaining budget for an explicitly-supplied phase, WITHOUT locking
339    /// `self.phase`. Callers that already hold the `self.phase` lock (e.g.
340    /// `transition_to`, which holds the write guard) MUST use this and pass the
341    /// phase value they hold — calling the public `remaining_budget()` there
342    /// re-locks `self.phase` and self-deadlocks (parking_lot RwLock is not
343    /// reentrant).
344    fn remaining_budget_for(&self, phase: BootPhase) -> Duration {
345        let elapsed = self.phase_start.read().elapsed();
346        let budget = match phase {
347            BootPhase::Init => self.budgets.init_budget,
348            BootPhase::Migrate => self.budgets.migrate_budget,
349            BootPhase::Recover | BootPhase::ReadOnlyRecovery | BootPhase::ForceRecovery => {
350                self.budgets.recover_budget
351            }
352            BootPhase::Warmup => self.budgets.warmup_budget,
353            _ => Duration::ZERO,
354        };
355        budget.saturating_sub(elapsed)
356    }
357
358    /// Get total boot elapsed time
359    pub fn total_elapsed(&self) -> Duration {
360        self.boot_start
361            .read()
362            .map(|t| t.elapsed())
363            .unwrap_or(Duration::ZERO)
364    }
365
366    /// Start the boot sequence
367    pub fn start_boot(&self, recovery_mode: RecoveryMode) -> Result<(), BootError> {
368        let mut phase = self.phase.write();
369        if *phase != BootPhase::Uninitialized {
370            return Err(BootError {
371                phase: *phase,
372                message: "Boot already started".to_string(),
373                recoverable: false,
374            });
375        }
376
377        *self.boot_start.write() = Some(Instant::now());
378        *self.recovery_mode.write() = recovery_mode;
379        *phase = BootPhase::Init;
380        *self.phase_start.write() = Instant::now();
381        *self.progress.write() = PhaseProgress {
382            message: "Initializing core subsystems".to_string(),
383            ..Default::default()
384        };
385
386        Ok(())
387    }
388
389    /// Transition to next phase
390    pub fn transition_to(&self, next_phase: BootPhase) -> Result<(), BootError> {
391        let mut phase = self.phase.write();
392        let current = *phase;
393
394        // Validate transition
395        let valid = match (current, next_phase) {
396            (BootPhase::Uninitialized, BootPhase::Init) => true,
397            (BootPhase::Init, BootPhase::Migrate) => true,
398            (BootPhase::Init, BootPhase::Failed) => true,
399            (BootPhase::Migrate, BootPhase::Recover) => true,
400            (BootPhase::Migrate, BootPhase::ReadOnlyRecovery) => true,
401            (BootPhase::Migrate, BootPhase::ForceRecovery) => true,
402            (BootPhase::Migrate, BootPhase::Failed) => true,
403            (BootPhase::Recover, BootPhase::Warmup) => true,
404            (BootPhase::Recover, BootPhase::Ready) => true, // Skip warmup
405            (BootPhase::Recover, BootPhase::Failed) => true,
406            (BootPhase::ReadOnlyRecovery, BootPhase::Ready) => true,
407            (BootPhase::ReadOnlyRecovery, BootPhase::Failed) => true,
408            (BootPhase::ForceRecovery, BootPhase::Warmup) => true,
409            (BootPhase::ForceRecovery, BootPhase::Ready) => true,
410            (BootPhase::ForceRecovery, BootPhase::Failed) => true,
411            (BootPhase::Warmup, BootPhase::Ready) => true,
412            (BootPhase::Warmup, BootPhase::Failed) => true,
413            (BootPhase::Ready, BootPhase::ShuttingDown) => true,
414            (_, BootPhase::Failed) => true, // Can always fail
415            _ => false,
416        };
417
418        if !valid {
419            return Err(BootError {
420                phase: current,
421                message: format!(
422                    "Invalid transition: {} -> {}",
423                    current.name(),
424                    next_phase.name()
425                ),
426                recoverable: false,
427            });
428        }
429
430        // Check budget exceeded. Use the lock-free variant with the phase we
431        // already hold the write lock on — calling remaining_budget() here would
432        // re-read-lock self.phase and self-deadlock.
433        if self.remaining_budget_for(current) == Duration::ZERO && current.is_booting() {
434            *phase = BootPhase::Failed;
435            *self.failure_reason.write() =
436                Some(format!("Budget exceeded in phase {}", current.name()));
437            return Err(BootError {
438                phase: current,
439                message: "Phase budget exceeded".to_string(),
440                recoverable: false,
441            });
442        }
443
444        *phase = next_phase;
445        *self.phase_start.write() = Instant::now();
446        *self.progress.write() = PhaseProgress::default();
447
448        Ok(())
449    }
450
451    /// Update progress within current phase
452    pub fn update_progress(&self, progress: PhaseProgress) {
453        *self.progress.write() = progress;
454    }
455
456    /// Mark boot as failed with reason
457    pub fn fail(&self, reason: &str) {
458        let current = *self.phase.read();
459        *self.phase.write() = BootPhase::Failed;
460        *self.failure_reason.write() = Some(format!("Failed in {}: {}", current.name(), reason));
461    }
462
463    /// Get failure reason if failed
464    pub fn failure_reason(&self) -> Option<String> {
465        self.failure_reason.read().clone()
466    }
467
468    /// Set preload hints for warmup phase
469    pub fn set_preload_hints(&self, hints: PreloadHints) {
470        *self.preload_hints.write() = hints;
471    }
472
473    /// Get preload hints
474    pub fn preload_hints(&self) -> PreloadHints {
475        self.preload_hints.read().clone()
476    }
477
478    /// Get boot metrics
479    pub fn metrics(&self) -> &BootMetrics {
480        &self.metrics
481    }
482
483    /// Record WAL replay progress
484    pub fn record_wal_progress(&self, records: u64, bytes: u64) {
485        self.metrics
486            .wal_records_replayed
487            .fetch_add(records, Ordering::Relaxed);
488        self.metrics
489            .wal_bytes_processed
490            .fetch_add(bytes, Ordering::Relaxed);
491    }
492
493    /// Record page recovery
494    pub fn record_page_recovered(&self, count: u64) {
495        self.metrics
496            .pages_recovered
497            .fetch_add(count, Ordering::Relaxed);
498    }
499
500    /// Record transaction rollback
501    pub fn record_txn_rollback(&self, count: u64) {
502        self.metrics
503            .txns_rolled_back
504            .fetch_add(count, Ordering::Relaxed);
505    }
506
507    /// Generate health check response for Kubernetes probes
508    pub fn health_status(&self) -> HealthStatus {
509        let phase = self.current_phase();
510        let progress = self.current_progress();
511
512        HealthStatus {
513            phase,
514            phase_name: phase.name().to_string(),
515            is_ready: phase.is_ready(),
516            is_alive: phase.is_alive(),
517            is_booting: phase.is_booting(),
518            progress_percent: progress.percent,
519            progress_message: progress.message,
520            phase_elapsed_ms: progress.elapsed.as_millis() as u64,
521            total_elapsed_ms: self.total_elapsed().as_millis() as u64,
522            remaining_budget_ms: self.remaining_budget().as_millis() as u64,
523            failure_reason: self.failure_reason(),
524            wal_records_replayed: self.metrics.wal_records_replayed.load(Ordering::Relaxed),
525            wal_bytes_processed: self.metrics.wal_bytes_processed.load(Ordering::Relaxed),
526        }
527    }
528}
529
530/// Health status for probes and observability
531#[derive(Debug, Clone)]
532pub struct HealthStatus {
533    pub phase: BootPhase,
534    pub phase_name: String,
535    pub is_ready: bool,
536    pub is_alive: bool,
537    pub is_booting: bool,
538    pub progress_percent: u8,
539    pub progress_message: String,
540    pub phase_elapsed_ms: u64,
541    pub total_elapsed_ms: u64,
542    pub remaining_budget_ms: u64,
543    pub failure_reason: Option<String>,
544    pub wal_records_replayed: u64,
545    pub wal_bytes_processed: u64,
546}
547
548impl HealthStatus {
549    /// Format as JSON for health endpoints
550    pub fn to_json(&self) -> String {
551        format!(
552            r#"{{"phase":"{}","is_ready":{},"is_alive":{},"is_booting":{},"progress_percent":{},"progress_message":"{}","phase_elapsed_ms":{},"total_elapsed_ms":{},"remaining_budget_ms":{},"failure_reason":{},"wal_records_replayed":{},"wal_bytes_processed":{}}}"#,
553            self.phase_name,
554            self.is_ready,
555            self.is_alive,
556            self.is_booting,
557            self.progress_percent,
558            self.progress_message.replace('"', "\\\""),
559            self.phase_elapsed_ms,
560            self.total_elapsed_ms,
561            self.remaining_budget_ms,
562            self.failure_reason
563                .as_ref()
564                .map(|s| format!("\"{}\"", s.replace('"', "\\\"")))
565                .unwrap_or_else(|| "null".to_string()),
566            self.wal_records_replayed,
567            self.wal_bytes_processed,
568        )
569    }
570}
571
572/// Boot orchestrator that coordinates the full boot sequence
573pub struct BootOrchestrator {
574    fsm: Arc<BootStateMachine>,
575}
576
577impl BootOrchestrator {
578    /// Create a new boot orchestrator
579    pub fn new(budgets: BootBudgets) -> Self {
580        Self {
581            fsm: Arc::new(BootStateMachine::new(budgets)),
582        }
583    }
584
585    /// Get the FSM for health checks
586    pub fn fsm(&self) -> Arc<BootStateMachine> {
587        self.fsm.clone()
588    }
589
590    /// Run the boot sequence with callbacks for each phase
591    pub fn run_boot<I, M, R, W>(
592        &self,
593        recovery_mode: RecoveryMode,
594        init_fn: I,
595        migrate_fn: M,
596        recover_fn: R,
597        warmup_fn: W,
598    ) -> Result<(), BootError>
599    where
600        I: FnOnce(&BootStateMachine) -> Result<(), BootError>,
601        M: FnOnce(&BootStateMachine) -> Result<(), BootError>,
602        R: FnOnce(&BootStateMachine) -> Result<PreloadHints, BootError>,
603        W: FnOnce(&BootStateMachine, PreloadHints) -> Result<(), BootError>,
604    {
605        // Start boot
606        self.fsm.start_boot(recovery_mode)?;
607
608        // Init phase
609        init_fn(&self.fsm)?;
610        self.fsm.transition_to(BootPhase::Migrate)?;
611
612        // Migrate phase
613        migrate_fn(&self.fsm)?;
614        let next_phase = match recovery_mode {
615            RecoveryMode::Normal => BootPhase::Recover,
616            RecoveryMode::ReadOnly => BootPhase::ReadOnlyRecovery,
617            RecoveryMode::Force => BootPhase::ForceRecovery,
618        };
619        self.fsm.transition_to(next_phase)?;
620
621        // Recover phase
622        let hints = recover_fn(&self.fsm)?;
623        self.fsm.set_preload_hints(hints.clone());
624
625        // Warmup phase (optional skip)
626        if hints.working_set_bytes > 0 || !hints.indexes.is_empty() {
627            self.fsm.transition_to(BootPhase::Warmup)?;
628            warmup_fn(&self.fsm, hints)?;
629        }
630
631        // Ready
632        self.fsm.transition_to(BootPhase::Ready)?;
633
634        Ok(())
635    }
636
637    /// Initiate graceful shutdown
638    pub fn shutdown(&self) -> Result<(), BootError> {
639        self.fsm.transition_to(BootPhase::ShuttingDown)
640    }
641}
642
#[cfg(test)]
mod tests {
    use super::*;

    /// Fresh state machine with default budgets, already started in normal mode.
    fn started_fsm() -> BootStateMachine {
        let fsm = BootStateMachine::with_defaults();
        fsm.start_boot(RecoveryMode::Normal).unwrap();
        fsm
    }

    #[test]
    fn test_boot_fsm_transitions() {
        let fsm = BootStateMachine::with_defaults();

        // Starting boot lands in Init.
        assert!(fsm.start_boot(RecoveryMode::Normal).is_ok());
        assert_eq!(fsm.current_phase(), BootPhase::Init);

        // Walk the happy path one phase at a time.
        let happy_path = [
            BootPhase::Migrate,
            BootPhase::Recover,
            BootPhase::Warmup,
            BootPhase::Ready,
        ];
        for next in happy_path {
            assert!(fsm.transition_to(next).is_ok());
        }

        assert!(fsm.is_ready());
        assert!(fsm.is_alive());
    }

    #[test]
    fn test_invalid_transition() {
        let fsm = started_fsm();

        // Init cannot jump straight to Ready.
        let outcome = fsm.transition_to(BootPhase::Ready);
        assert!(outcome.is_err());
    }

    #[test]
    fn test_health_status() {
        let fsm = started_fsm();

        let status = fsm.health_status();
        assert!(!status.is_ready);
        assert!(status.is_alive);
        assert!(status.is_booting);
        assert_eq!(status.phase_name, "init");
    }

    #[test]
    fn test_progress_tracking() {
        let fsm = started_fsm();

        fsm.record_wal_progress(100, 4096);

        let metrics = fsm.metrics();
        assert_eq!(metrics.wal_records_replayed.load(Ordering::Relaxed), 100);
        assert_eq!(metrics.wal_bytes_processed.load(Ordering::Relaxed), 4096);
    }

    #[test]
    fn test_kubernetes_budgets() {
        // 10-minute startup probe window.
        let budgets = BootBudgets::for_kubernetes(600);
        assert!(budgets.recover_budget >= Duration::from_secs(300));
        assert_eq!(budgets.total_budget, Duration::from_secs(600));
    }
}