Skip to main content

sochdb_kernel/
boot_fsm.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! # Deterministic Boot Finite State Machine
19//!
20//! Implements a production-grade boot sequence with:
21//! - Well-defined states: `Init → Migrate → Recover → Warmup → Ready`
22//! - Time budgets for each phase (for Kubernetes probe alignment)
23//! - Progress reporting for external observability
24//! - Recovery modes: Normal, ReadOnlyRecovery, ForceRecovery
25//!
26//! ## Kubernetes Integration
27//!
28//! The FSM exports progress metrics that align with K8s probe semantics:
29//! - `startupProbe`: tolerates long recovery (uses recovery budget)
30//! - `readinessProbe`: true only when FSM is in `Ready`
31//! - `livenessProbe`: heartbeat-based (separate from FSM)
32//!
33//! ## Safety Property
34//!
35//! `Ready ⇒ (recovery_complete ∧ invariants_checked ∧ services_registered)`
36//!
37//! ## Complexity Bounds
38//!
39//! Recovery is O(|WAL| + |checkpoint|). The FSM tracks and exposes this
40//! to allow operators to configure appropriate probe timeouts.
41
42use std::sync::atomic::{AtomicU64, Ordering};
43use std::sync::Arc;
44use std::time::{Duration, Instant};
45
46use parking_lot::RwLock;
47
/// States of the boot lifecycle (the nodes of the boot DFA).
///
/// The enum is `#[repr(u8)]` with explicit discriminants, so each variant
/// has a fixed numeric value.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[repr(u8)]
pub enum BootPhase {
    /// No boot activity has happened yet
    Uninitialized = 0,
    /// Core subsystems (allocator, config, logging) are being initialized
    Init = 1,
    /// Schema/format migrations are running
    Migrate = 2,
    /// WAL recovery is running (ARIES redo/undo)
    Recover = 3,
    /// Caches and indexes are being warmed up
    Warmup = 4,
    /// The system is fully operational
    Ready = 5,
    /// Recovery in read-only mode (for forensics)
    ReadOnlyRecovery = 6,
    /// Recovery in force mode (some checks are skipped)
    ForceRecovery = 7,
    /// A graceful shutdown is in progress
    ShuttingDown = 8,
    /// Boot failed (terminal state)
    Failed = 9,
}

impl BootPhase {
    /// Lowercase, snake_case label for this phase.
    pub fn name(&self) -> &'static str {
        match *self {
            Self::Uninitialized => "uninitialized",
            Self::Init => "init",
            Self::Migrate => "migrate",
            Self::Recover => "recover",
            Self::Warmup => "warmup",
            Self::Ready => "ready",
            Self::ReadOnlyRecovery => "readonly_recovery",
            Self::ForceRecovery => "force_recovery",
            Self::ShuttingDown => "shutting_down",
            Self::Failed => "failed",
        }
    }

    /// `true` only for [`BootPhase::Ready`], i.e. when traffic may be served.
    pub fn is_ready(&self) -> bool {
        *self == Self::Ready
    }

    /// `true` for every phase except [`BootPhase::Failed`].
    pub fn is_alive(&self) -> bool {
        *self != Self::Failed
    }

    /// `true` while one of the boot phases (init, migrate, any recovery
    /// flavour, warmup) is active.
    pub fn is_booting(&self) -> bool {
        match *self {
            Self::Init
            | Self::Migrate
            | Self::Recover
            | Self::Warmup
            | Self::ReadOnlyRecovery
            | Self::ForceRecovery => true,
            Self::Uninitialized | Self::Ready | Self::ShuttingDown | Self::Failed => false,
        }
    }
}
114
/// Progress information for a boot phase.
///
/// `Default` is derived: every counter starts at zero, the message is empty
/// and `elapsed` is a zero-length duration.
#[derive(Debug, Clone, Default)]
pub struct PhaseProgress {
    /// Current progress (0-100)
    pub percent: u8,
    /// Human-readable status message
    pub message: String,
    /// Items processed (e.g., WAL records replayed)
    pub items_processed: u64,
    /// Total items to process (0 if unknown)
    pub items_total: u64,
    /// Bytes processed
    pub bytes_processed: u64,
    /// Total bytes to process (0 if unknown)
    pub bytes_total: u64,
    /// Time spent in this phase
    pub elapsed: Duration,
}
147
/// Time budget configuration for each boot phase
#[derive(Debug, Clone)]
pub struct BootBudgets {
    /// Maximum time for init phase
    pub init_budget: Duration,
    /// Maximum time for migration phase
    pub migrate_budget: Duration,
    /// Maximum time for recovery phase (WAL replay)
    pub recover_budget: Duration,
    /// Maximum time for warmup phase
    pub warmup_budget: Duration,
    /// Total boot timeout
    pub total_budget: Duration,
}

impl Default for BootBudgets {
    /// Default budgets: 30 s init, 5 min migrate, 30 min recover,
    /// 5 min warmup, 1 h total.
    fn default() -> Self {
        Self {
            init_budget: Duration::from_secs(30),
            migrate_budget: Duration::from_secs(300),   // 5 min for migrations
            recover_budget: Duration::from_secs(1800),  // 30 min for WAL replay
            warmup_budget: Duration::from_secs(300),    // 5 min for cache warmup
            total_budget: Duration::from_secs(3600),    // 1 hour total
        }
    }
}

impl BootBudgets {
    /// Create budgets suitable for a Kubernetes startupProbe.
    ///
    /// `startup_probe_total_seconds` is the probe window, i.e.
    /// `failureThreshold × periodSeconds`. The window is split as
    /// 1/20 init, 1/5 migrate, 3/5 recover and 1/10 warmup (each rounded
    /// down to whole seconds); `total_budget` is the full window.
    ///
    /// The shares are computed in 128-bit arithmetic so that very large
    /// inputs cannot overflow while multiplying by the numerator.
    pub fn for_kubernetes(startup_probe_total_seconds: u64) -> Self {
        let window = u128::from(startup_probe_total_seconds);
        let share = |numerator: u128, denominator: u128| -> Duration {
            // numerator <= denominator for every call below, so the result
            // never exceeds `window` and the narrowing cast is lossless.
            Duration::from_secs((window * numerator / denominator) as u64)
        };

        Self {
            init_budget: share(1, 20),
            migrate_budget: share(1, 5),
            recover_budget: share(3, 5),
            warmup_budget: share(1, 10),
            total_budget: Duration::from_secs(startup_probe_total_seconds),
        }
    }
}
191
/// Selects which flavour of recovery the boot sequence should run.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RecoveryMode {
    /// Standard recovery using the full ARIES protocol
    Normal,
    /// Forensic, read-only recovery (no WAL writes)
    ReadOnly,
    /// Forced recovery that skips some consistency checks
    Force,
}
202
/// Hints describing what the warmup phase should preload.
///
/// The derived `Default` is an empty hint set (no pages, no indexes,
/// zero working-set size).
#[derive(Debug, Clone, Default)]
pub struct PreloadHints {
    /// IDs of individual pages to preload
    pub page_ids: Vec<u64>,
    /// Names of indexes to preload
    pub indexes: Vec<String>,
    /// Estimated size of the working set, in bytes
    pub working_set_bytes: u64,
}
213
214/// Boot state machine with thread-safe state transitions
215pub struct BootStateMachine {
216    /// Current boot phase
217    phase: RwLock<BootPhase>,
218    /// Phase start time
219    phase_start: RwLock<Instant>,
220    /// Boot start time
221    boot_start: RwLock<Option<Instant>>,
222    /// Current phase progress
223    progress: RwLock<PhaseProgress>,
224    /// Time budgets
225    budgets: BootBudgets,
226    /// Recovery mode
227    recovery_mode: RwLock<RecoveryMode>,
228    /// Failure reason (if Failed)
229    failure_reason: RwLock<Option<String>>,
230    /// Preload hints for warmup
231    preload_hints: RwLock<PreloadHints>,
232    /// Metrics counters
233    metrics: BootMetrics,
234}
235
/// Boot metrics for observability.
///
/// All counters are atomics so they can be updated through a shared
/// reference. `Default` is derived and starts every counter at zero.
#[derive(Debug, Default)]
pub struct BootMetrics {
    /// Number of WAL records replayed
    pub wal_records_replayed: AtomicU64,
    /// Bytes of WAL data processed
    pub wal_bytes_processed: AtomicU64,
    /// Number of pages recovered
    pub pages_recovered: AtomicU64,
    /// Number of transactions rolled back
    pub txns_rolled_back: AtomicU64,
    /// Checkpoint scan bytes
    pub checkpoint_bytes_scanned: AtomicU64,
    /// Migration steps completed
    pub migration_steps_completed: AtomicU64,
    /// Cache hit rate during warmup (scaled by 1000)
    pub warmup_hit_rate_permille: AtomicU64,
}
267
268/// Error during boot
269#[derive(Debug, Clone)]
270pub struct BootError {
271    pub phase: BootPhase,
272    pub message: String,
273    pub recoverable: bool,
274}
275
276impl std::fmt::Display for BootError {
277    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
278        write!(
279            f,
280            "Boot error in phase {}: {} (recoverable: {})",
281            self.phase.name(),
282            self.message,
283            self.recoverable
284        )
285    }
286}
287
288impl std::error::Error for BootError {}
289
290impl BootStateMachine {
291    /// Create a new boot state machine
292    pub fn new(budgets: BootBudgets) -> Self {
293        Self {
294            phase: RwLock::new(BootPhase::Uninitialized),
295            phase_start: RwLock::new(Instant::now()),
296            boot_start: RwLock::new(None),
297            progress: RwLock::new(PhaseProgress::default()),
298            budgets,
299            recovery_mode: RwLock::new(RecoveryMode::Normal),
300            failure_reason: RwLock::new(None),
301            preload_hints: RwLock::new(PreloadHints::default()),
302            metrics: BootMetrics::default(),
303        }
304    }
305
306    /// Create with default budgets
307    pub fn with_defaults() -> Self {
308        Self::new(BootBudgets::default())
309    }
310
311    /// Get current boot phase
312    pub fn current_phase(&self) -> BootPhase {
313        *self.phase.read()
314    }
315
316    /// Get current progress
317    pub fn current_progress(&self) -> PhaseProgress {
318        let mut progress = self.progress.read().clone();
319        progress.elapsed = self.phase_start.read().elapsed();
320        progress
321    }
322
323    /// Check if system is ready for traffic
324    pub fn is_ready(&self) -> bool {
325        self.current_phase().is_ready()
326    }
327
328    /// Check if system is alive (for liveness probe)
329    pub fn is_alive(&self) -> bool {
330        self.current_phase().is_alive()
331    }
332
333    /// Get time remaining in current phase budget
334    pub fn remaining_budget(&self) -> Duration {
335        let phase = *self.phase.read();
336        let elapsed = self.phase_start.read().elapsed();
337        let budget = match phase {
338            BootPhase::Init => self.budgets.init_budget,
339            BootPhase::Migrate => self.budgets.migrate_budget,
340            BootPhase::Recover | BootPhase::ReadOnlyRecovery | BootPhase::ForceRecovery => {
341                self.budgets.recover_budget
342            }
343            BootPhase::Warmup => self.budgets.warmup_budget,
344            _ => Duration::ZERO,
345        };
346        budget.saturating_sub(elapsed)
347    }
348
349    /// Get total boot elapsed time
350    pub fn total_elapsed(&self) -> Duration {
351        self.boot_start
352            .read()
353            .map(|t| t.elapsed())
354            .unwrap_or(Duration::ZERO)
355    }
356
357    /// Start the boot sequence
358    pub fn start_boot(&self, recovery_mode: RecoveryMode) -> Result<(), BootError> {
359        let mut phase = self.phase.write();
360        if *phase != BootPhase::Uninitialized {
361            return Err(BootError {
362                phase: *phase,
363                message: "Boot already started".to_string(),
364                recoverable: false,
365            });
366        }
367
368        *self.boot_start.write() = Some(Instant::now());
369        *self.recovery_mode.write() = recovery_mode;
370        *phase = BootPhase::Init;
371        *self.phase_start.write() = Instant::now();
372        *self.progress.write() = PhaseProgress {
373            message: "Initializing core subsystems".to_string(),
374            ..Default::default()
375        };
376
377        Ok(())
378    }
379
380    /// Transition to next phase
381    pub fn transition_to(&self, next_phase: BootPhase) -> Result<(), BootError> {
382        let mut phase = self.phase.write();
383        let current = *phase;
384
385        // Validate transition
386        let valid = match (current, next_phase) {
387            (BootPhase::Uninitialized, BootPhase::Init) => true,
388            (BootPhase::Init, BootPhase::Migrate) => true,
389            (BootPhase::Init, BootPhase::Failed) => true,
390            (BootPhase::Migrate, BootPhase::Recover) => true,
391            (BootPhase::Migrate, BootPhase::ReadOnlyRecovery) => true,
392            (BootPhase::Migrate, BootPhase::ForceRecovery) => true,
393            (BootPhase::Migrate, BootPhase::Failed) => true,
394            (BootPhase::Recover, BootPhase::Warmup) => true,
395            (BootPhase::Recover, BootPhase::Ready) => true, // Skip warmup
396            (BootPhase::Recover, BootPhase::Failed) => true,
397            (BootPhase::ReadOnlyRecovery, BootPhase::Ready) => true,
398            (BootPhase::ReadOnlyRecovery, BootPhase::Failed) => true,
399            (BootPhase::ForceRecovery, BootPhase::Warmup) => true,
400            (BootPhase::ForceRecovery, BootPhase::Ready) => true,
401            (BootPhase::ForceRecovery, BootPhase::Failed) => true,
402            (BootPhase::Warmup, BootPhase::Ready) => true,
403            (BootPhase::Warmup, BootPhase::Failed) => true,
404            (BootPhase::Ready, BootPhase::ShuttingDown) => true,
405            (_, BootPhase::Failed) => true, // Can always fail
406            _ => false,
407        };
408
409        if !valid {
410            return Err(BootError {
411                phase: current,
412                message: format!(
413                    "Invalid transition: {} -> {}",
414                    current.name(),
415                    next_phase.name()
416                ),
417                recoverable: false,
418            });
419        }
420
421        // Check budget exceeded
422        if self.remaining_budget() == Duration::ZERO && current.is_booting() {
423            *phase = BootPhase::Failed;
424            *self.failure_reason.write() = Some(format!(
425                "Budget exceeded in phase {}",
426                current.name()
427            ));
428            return Err(BootError {
429                phase: current,
430                message: "Phase budget exceeded".to_string(),
431                recoverable: false,
432            });
433        }
434
435        *phase = next_phase;
436        *self.phase_start.write() = Instant::now();
437        *self.progress.write() = PhaseProgress::default();
438
439        Ok(())
440    }
441
442    /// Update progress within current phase
443    pub fn update_progress(&self, progress: PhaseProgress) {
444        *self.progress.write() = progress;
445    }
446
447    /// Mark boot as failed with reason
448    pub fn fail(&self, reason: &str) {
449        let current = *self.phase.read();
450        *self.phase.write() = BootPhase::Failed;
451        *self.failure_reason.write() = Some(format!(
452            "Failed in {}: {}",
453            current.name(),
454            reason
455        ));
456    }
457
458    /// Get failure reason if failed
459    pub fn failure_reason(&self) -> Option<String> {
460        self.failure_reason.read().clone()
461    }
462
463    /// Set preload hints for warmup phase
464    pub fn set_preload_hints(&self, hints: PreloadHints) {
465        *self.preload_hints.write() = hints;
466    }
467
468    /// Get preload hints
469    pub fn preload_hints(&self) -> PreloadHints {
470        self.preload_hints.read().clone()
471    }
472
473    /// Get boot metrics
474    pub fn metrics(&self) -> &BootMetrics {
475        &self.metrics
476    }
477
478    /// Record WAL replay progress
479    pub fn record_wal_progress(&self, records: u64, bytes: u64) {
480        self.metrics.wal_records_replayed.fetch_add(records, Ordering::Relaxed);
481        self.metrics.wal_bytes_processed.fetch_add(bytes, Ordering::Relaxed);
482    }
483
484    /// Record page recovery
485    pub fn record_page_recovered(&self, count: u64) {
486        self.metrics.pages_recovered.fetch_add(count, Ordering::Relaxed);
487    }
488
489    /// Record transaction rollback
490    pub fn record_txn_rollback(&self, count: u64) {
491        self.metrics.txns_rolled_back.fetch_add(count, Ordering::Relaxed);
492    }
493
494    /// Generate health check response for Kubernetes probes
495    pub fn health_status(&self) -> HealthStatus {
496        let phase = self.current_phase();
497        let progress = self.current_progress();
498
499        HealthStatus {
500            phase,
501            phase_name: phase.name().to_string(),
502            is_ready: phase.is_ready(),
503            is_alive: phase.is_alive(),
504            is_booting: phase.is_booting(),
505            progress_percent: progress.percent,
506            progress_message: progress.message,
507            phase_elapsed_ms: progress.elapsed.as_millis() as u64,
508            total_elapsed_ms: self.total_elapsed().as_millis() as u64,
509            remaining_budget_ms: self.remaining_budget().as_millis() as u64,
510            failure_reason: self.failure_reason(),
511            wal_records_replayed: self.metrics.wal_records_replayed.load(Ordering::Relaxed),
512            wal_bytes_processed: self.metrics.wal_bytes_processed.load(Ordering::Relaxed),
513        }
514    }
515}
516
517/// Health status for probes and observability
518#[derive(Debug, Clone)]
519pub struct HealthStatus {
520    pub phase: BootPhase,
521    pub phase_name: String,
522    pub is_ready: bool,
523    pub is_alive: bool,
524    pub is_booting: bool,
525    pub progress_percent: u8,
526    pub progress_message: String,
527    pub phase_elapsed_ms: u64,
528    pub total_elapsed_ms: u64,
529    pub remaining_budget_ms: u64,
530    pub failure_reason: Option<String>,
531    pub wal_records_replayed: u64,
532    pub wal_bytes_processed: u64,
533}
534
535impl HealthStatus {
536    /// Format as JSON for health endpoints
537    pub fn to_json(&self) -> String {
538        format!(
539            r#"{{"phase":"{}","is_ready":{},"is_alive":{},"is_booting":{},"progress_percent":{},"progress_message":"{}","phase_elapsed_ms":{},"total_elapsed_ms":{},"remaining_budget_ms":{},"failure_reason":{},"wal_records_replayed":{},"wal_bytes_processed":{}}}"#,
540            self.phase_name,
541            self.is_ready,
542            self.is_alive,
543            self.is_booting,
544            self.progress_percent,
545            self.progress_message.replace('"', "\\\""),
546            self.phase_elapsed_ms,
547            self.total_elapsed_ms,
548            self.remaining_budget_ms,
549            self.failure_reason.as_ref().map(|s| format!("\"{}\"", s.replace('"', "\\\""))).unwrap_or_else(|| "null".to_string()),
550            self.wal_records_replayed,
551            self.wal_bytes_processed,
552        )
553    }
554}
555
556/// Boot orchestrator that coordinates the full boot sequence
557pub struct BootOrchestrator {
558    fsm: Arc<BootStateMachine>,
559}
560
561impl BootOrchestrator {
562    /// Create a new boot orchestrator
563    pub fn new(budgets: BootBudgets) -> Self {
564        Self {
565            fsm: Arc::new(BootStateMachine::new(budgets)),
566        }
567    }
568
569    /// Get the FSM for health checks
570    pub fn fsm(&self) -> Arc<BootStateMachine> {
571        self.fsm.clone()
572    }
573
574    /// Run the boot sequence with callbacks for each phase
575    pub fn run_boot<I, M, R, W>(
576        &self,
577        recovery_mode: RecoveryMode,
578        init_fn: I,
579        migrate_fn: M,
580        recover_fn: R,
581        warmup_fn: W,
582    ) -> Result<(), BootError>
583    where
584        I: FnOnce(&BootStateMachine) -> Result<(), BootError>,
585        M: FnOnce(&BootStateMachine) -> Result<(), BootError>,
586        R: FnOnce(&BootStateMachine) -> Result<PreloadHints, BootError>,
587        W: FnOnce(&BootStateMachine, PreloadHints) -> Result<(), BootError>,
588    {
589        // Start boot
590        self.fsm.start_boot(recovery_mode)?;
591
592        // Init phase
593        init_fn(&self.fsm)?;
594        self.fsm.transition_to(BootPhase::Migrate)?;
595
596        // Migrate phase
597        migrate_fn(&self.fsm)?;
598        let next_phase = match recovery_mode {
599            RecoveryMode::Normal => BootPhase::Recover,
600            RecoveryMode::ReadOnly => BootPhase::ReadOnlyRecovery,
601            RecoveryMode::Force => BootPhase::ForceRecovery,
602        };
603        self.fsm.transition_to(next_phase)?;
604
605        // Recover phase
606        let hints = recover_fn(&self.fsm)?;
607        self.fsm.set_preload_hints(hints.clone());
608
609        // Warmup phase (optional skip)
610        if hints.working_set_bytes > 0 || !hints.indexes.is_empty() {
611            self.fsm.transition_to(BootPhase::Warmup)?;
612            warmup_fn(&self.fsm, hints)?;
613        }
614
615        // Ready
616        self.fsm.transition_to(BootPhase::Ready)?;
617
618        Ok(())
619    }
620
621    /// Initiate graceful shutdown
622    pub fn shutdown(&self) -> Result<(), BootError> {
623        self.fsm.transition_to(BootPhase::ShuttingDown)
624    }
625}
626
#[cfg(test)]
mod tests {
    use super::*;

    /// Walks the normal boot path from start to `Ready`.
    #[test]
    fn test_boot_fsm_transitions() {
        let machine = BootStateMachine::with_defaults();

        // Start boot
        assert!(machine.start_boot(RecoveryMode::Normal).is_ok());
        assert_eq!(machine.current_phase(), BootPhase::Init);

        // Progress through phases
        let path = [
            BootPhase::Migrate,
            BootPhase::Recover,
            BootPhase::Warmup,
            BootPhase::Ready,
        ];
        for step in path {
            assert!(machine.transition_to(step).is_ok());
        }

        assert!(machine.is_ready());
        assert!(machine.is_alive());
    }

    /// A transition outside the table is rejected.
    #[test]
    fn test_invalid_transition() {
        let machine = BootStateMachine::with_defaults();
        machine.start_boot(RecoveryMode::Normal).unwrap();

        // Can't skip to Ready from Init
        let skipped = machine.transition_to(BootPhase::Ready);
        assert!(skipped.is_err());
    }

    /// The health snapshot right after start reflects the init phase.
    #[test]
    fn test_health_status() {
        let machine = BootStateMachine::with_defaults();
        machine.start_boot(RecoveryMode::Normal).unwrap();

        let snapshot = machine.health_status();
        assert!(!snapshot.is_ready);
        assert!(snapshot.is_alive);
        assert!(snapshot.is_booting);
        assert_eq!(snapshot.phase_name, "init");
    }

    /// WAL progress is accumulated in the metrics counters.
    #[test]
    fn test_progress_tracking() {
        let machine = BootStateMachine::with_defaults();
        machine.start_boot(RecoveryMode::Normal).unwrap();

        machine.record_wal_progress(100, 4096);

        let counters = machine.metrics();
        assert_eq!(counters.wal_records_replayed.load(Ordering::Relaxed), 100);
        assert_eq!(counters.wal_bytes_processed.load(Ordering::Relaxed), 4096);
    }

    /// Kubernetes budgets are derived from the probe window.
    #[test]
    fn test_kubernetes_budgets() {
        let window_secs = 600; // 10 minutes
        let budgets = BootBudgets::for_kubernetes(window_secs);
        assert!(budgets.recover_budget >= Duration::from_secs(300));
        assert!(budgets.total_budget == Duration::from_secs(600));
    }
}