use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};
use parking_lot::RwLock;
/// Lifecycle phase tracked by `BootStateMachine`.
///
/// The explicit discriminants together with `#[repr(u8)]` give every phase a
/// fixed one-byte numeric value.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[repr(u8)]
pub enum BootPhase {
    /// Phase a freshly constructed state machine starts in.
    Uninitialized = 0,
    /// First boot phase; entered by `BootStateMachine::start_boot`.
    Init = 1,
    /// Follows `Init`.
    Migrate = 2,
    /// Recovery phase used for `RecoveryMode::Normal`.
    Recover = 3,
    /// Optional phase between recovery and `Ready`.
    Warmup = 4,
    /// Boot finished successfully.
    Ready = 5,
    /// Recovery phase used for `RecoveryMode::ReadOnly`.
    ReadOnlyRecovery = 6,
    /// Recovery phase used for `RecoveryMode::Force`.
    ForceRecovery = 7,
    /// Reachable from `Ready` when shutdown is requested.
    ShuttingDown = 8,
    /// Failure state; the only phase for which `is_alive` is false.
    Failed = 9,
}

impl BootPhase {
    /// Returns the stable lowercase identifier of this phase.
    pub fn name(&self) -> &'static str {
        match *self {
            Self::Uninitialized => "uninitialized",
            Self::Init => "init",
            Self::Migrate => "migrate",
            Self::Recover => "recover",
            Self::Warmup => "warmup",
            Self::Ready => "ready",
            Self::ReadOnlyRecovery => "readonly_recovery",
            Self::ForceRecovery => "force_recovery",
            Self::ShuttingDown => "shutting_down",
            Self::Failed => "failed",
        }
    }

    /// True only for [`BootPhase::Ready`].
    pub fn is_ready(&self) -> bool {
        *self == BootPhase::Ready
    }

    /// True for every phase except [`BootPhase::Failed`].
    pub fn is_alive(&self) -> bool {
        *self != BootPhase::Failed
    }

    /// True while one of the boot phases (init, migrate, any recovery
    /// variant, warmup) is in progress.
    pub fn is_booting(&self) -> bool {
        match *self {
            Self::Init
            | Self::Migrate
            | Self::Recover
            | Self::Warmup
            | Self::ReadOnlyRecovery
            | Self::ForceRecovery => true,
            Self::Uninitialized | Self::Ready | Self::ShuttingDown | Self::Failed => false,
        }
    }
}
/// Progress report for the boot phase that is currently running.
///
/// The derived `Default` yields an empty report: zero counters, an empty
/// message and a zero elapsed time.
#[derive(Debug, Clone, Default)]
pub struct PhaseProgress {
    /// Completion percentage reported for the phase. The type does not
    /// enforce an upper bound; presumably 0–100 (confirm with producers).
    pub percent: u8,
    /// Human-readable description of what the phase is doing.
    pub message: String,
    /// Number of items handled so far (the unit is chosen by the reporter).
    pub items_processed: u64,
    /// Total number of items expected (the unit is chosen by the reporter).
    pub items_total: u64,
    /// Number of bytes handled so far.
    pub bytes_processed: u64,
    /// Total number of bytes expected.
    pub bytes_total: u64,
    /// Time spent in the phase. `BootStateMachine::current_progress`
    /// overwrites this with the time elapsed since the phase started.
    pub elapsed: Duration,
}
/// Time budgets for the individual boot phases and for the boot as a whole.
#[derive(Debug, Clone)]
pub struct BootBudgets {
    /// Budget for `BootPhase::Init`.
    pub init_budget: Duration,
    /// Budget for `BootPhase::Migrate`.
    pub migrate_budget: Duration,
    /// Budget shared by all recovery phases (`Recover`, `ReadOnlyRecovery`,
    /// `ForceRecovery`).
    pub recover_budget: Duration,
    /// Budget for `BootPhase::Warmup`.
    pub warmup_budget: Duration,
    /// Budget for the whole boot.
    ///
    /// NOTE(review): `BootStateMachine::transition_to` only checks the
    /// per-phase budgets; confirm whether the total is enforced elsewhere.
    pub total_budget: Duration,
}

impl Default for BootBudgets {
    /// Defaults: 30 s init, 5 min migrate, 30 min recover, 5 min warmup and
    /// 1 h for the whole boot.
    fn default() -> Self {
        Self {
            init_budget: Duration::from_secs(30),
            migrate_budget: Duration::from_secs(300),
            recover_budget: Duration::from_secs(1800),
            warmup_budget: Duration::from_secs(300),
            total_budget: Duration::from_secs(3600),
        }
    }
}

impl BootBudgets {
    /// Derives budgets from the total time a Kubernetes startup probe allows.
    ///
    /// The window is split as 1/20 init, 1/5 migrate, 3/5 recover and 1/10
    /// warmup; `total_budget` is the full window.
    ///
    /// The split is computed on `Duration` values rather than whole seconds:
    /// short windows keep sub-second precision instead of truncating a phase
    /// budget to zero (a zero budget makes every transition out of that phase
    /// fail), and very large inputs cannot overflow because the division is
    /// performed before the multiplication.
    pub fn for_kubernetes(startup_probe_total_seconds: u64) -> Self {
        let total = Duration::from_secs(startup_probe_total_seconds);
        let fifth = total / 5;
        Self {
            init_budget: total / 20,
            migrate_budget: fifth,
            // Divide first: `fifth * 3` is at most 3/5 of the maximum
            // representable duration, so the multiplication cannot overflow.
            recover_budget: fifth * 3,
            warmup_budget: total / 10,
            total_budget: total,
        }
    }
}
/// Recovery strategy requested when boot starts.
///
/// `BootOrchestrator::run_boot` maps each variant to the recovery phase it
/// enters after the migrate phase.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RecoveryMode {
/// Recovery runs in `BootPhase::Recover`.
Normal,
/// Recovery runs in `BootPhase::ReadOnlyRecovery`.
ReadOnly,
/// Recovery runs in `BootPhase::ForceRecovery`.
Force,
}
/// Hints produced by the recovery step and handed to the warmup step.
///
/// `BootOrchestrator::run_boot` stores the value returned by its recovery
/// callback in the state machine and only enters the warmup phase when
/// `working_set_bytes` is non-zero or `indexes` is non-empty.
#[derive(Debug, Clone, Default)]
pub struct PreloadHints {
/// Identifiers of pages to preload (presumably storage pages; confirm with
/// the warmup implementation).
pub page_ids: Vec<u64>,
/// Names of indexes to preload (assumed from the field name; verify against
/// the warmup callback).
pub indexes: Vec<String>,
/// Working-set size in bytes; `run_boot` checks whether it is greater than
/// zero when deciding to run warmup.
pub working_set_bytes: u64,
}
/// State machine that tracks the boot phase, its progress, time budgets and
/// boot metrics. Mutable state is held behind individual `RwLock`s so the
/// methods can take `&self`.
pub struct BootStateMachine {
/// Phase the machine is currently in.
phase: RwLock<BootPhase>,
/// Instant at which the current phase was entered.
phase_start: RwLock<Instant>,
/// Instant at which `start_boot` was called; `None` until then.
boot_start: RwLock<Option<Instant>>,
/// Most recent progress report for the current phase.
progress: RwLock<PhaseProgress>,
/// Per-phase and total time budgets supplied at construction.
budgets: BootBudgets,
/// Recovery mode recorded by `start_boot`.
recovery_mode: RwLock<RecoveryMode>,
/// Description of why the machine entered `BootPhase::Failed`, if recorded.
failure_reason: RwLock<Option<String>>,
/// Hints stored via `set_preload_hints`.
preload_hints: RwLock<PreloadHints>,
/// Atomic counters describing boot work done so far.
metrics: BootMetrics,
}
/// Atomic counters describing the work performed during boot.
///
/// The derived `Default` starts every counter at zero.
#[derive(Default)]
pub struct BootMetrics {
    /// WAL records replayed; incremented by
    /// `BootStateMachine::record_wal_progress`.
    pub wal_records_replayed: AtomicU64,
    /// WAL bytes processed; incremented by
    /// `BootStateMachine::record_wal_progress`.
    pub wal_bytes_processed: AtomicU64,
    /// Pages recovered; incremented by
    /// `BootStateMachine::record_page_recovered`.
    pub pages_recovered: AtomicU64,
    /// Transactions rolled back; incremented by
    /// `BootStateMachine::record_txn_rollback`.
    pub txns_rolled_back: AtomicU64,
    /// Checkpoint bytes scanned (no writer is visible in this file; callers
    /// presumably update it directly through `BootStateMachine::metrics`).
    pub checkpoint_bytes_scanned: AtomicU64,
    /// Migration steps completed (no writer is visible in this file).
    pub migration_steps_completed: AtomicU64,
    /// Warmup hit rate, presumably in parts per thousand as the name
    /// suggests (no writer is visible in this file; confirm the unit).
    pub warmup_hit_rate_permille: AtomicU64,
}
/// Error raised by the boot state machine or by a boot phase callback.
#[derive(Debug, Clone)]
pub struct BootError {
    /// Phase the machine was in when the error was produced.
    pub phase: BootPhase,
    /// Human-readable description of the problem.
    pub message: String,
    /// Whether the producer considers the error recoverable.
    pub recoverable: bool,
}

impl std::fmt::Display for BootError {
    /// Renders as `Boot error in phase <name>: <message> (recoverable: <bool>)`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let BootError {
            phase,
            message,
            recoverable,
        } = self;
        f.write_fmt(format_args!(
            "Boot error in phase {}: {} (recoverable: {})",
            phase.name(),
            message,
            recoverable
        ))
    }
}

impl std::error::Error for BootError {}
impl BootStateMachine {
pub fn new(budgets: BootBudgets) -> Self {
Self {
phase: RwLock::new(BootPhase::Uninitialized),
phase_start: RwLock::new(Instant::now()),
boot_start: RwLock::new(None),
progress: RwLock::new(PhaseProgress::default()),
budgets,
recovery_mode: RwLock::new(RecoveryMode::Normal),
failure_reason: RwLock::new(None),
preload_hints: RwLock::new(PreloadHints::default()),
metrics: BootMetrics::default(),
}
}
pub fn with_defaults() -> Self {
Self::new(BootBudgets::default())
}
pub fn current_phase(&self) -> BootPhase {
*self.phase.read()
}
pub fn current_progress(&self) -> PhaseProgress {
let mut progress = self.progress.read().clone();
progress.elapsed = self.phase_start.read().elapsed();
progress
}
pub fn is_ready(&self) -> bool {
self.current_phase().is_ready()
}
pub fn is_alive(&self) -> bool {
self.current_phase().is_alive()
}
pub fn remaining_budget(&self) -> Duration {
self.remaining_budget_for(*self.phase.read())
}
fn remaining_budget_for(&self, phase: BootPhase) -> Duration {
let elapsed = self.phase_start.read().elapsed();
let budget = match phase {
BootPhase::Init => self.budgets.init_budget,
BootPhase::Migrate => self.budgets.migrate_budget,
BootPhase::Recover | BootPhase::ReadOnlyRecovery | BootPhase::ForceRecovery => {
self.budgets.recover_budget
}
BootPhase::Warmup => self.budgets.warmup_budget,
_ => Duration::ZERO,
};
budget.saturating_sub(elapsed)
}
pub fn total_elapsed(&self) -> Duration {
self.boot_start
.read()
.map(|t| t.elapsed())
.unwrap_or(Duration::ZERO)
}
pub fn start_boot(&self, recovery_mode: RecoveryMode) -> Result<(), BootError> {
let mut phase = self.phase.write();
if *phase != BootPhase::Uninitialized {
return Err(BootError {
phase: *phase,
message: "Boot already started".to_string(),
recoverable: false,
});
}
*self.boot_start.write() = Some(Instant::now());
*self.recovery_mode.write() = recovery_mode;
*phase = BootPhase::Init;
*self.phase_start.write() = Instant::now();
*self.progress.write() = PhaseProgress {
message: "Initializing core subsystems".to_string(),
..Default::default()
};
Ok(())
}
pub fn transition_to(&self, next_phase: BootPhase) -> Result<(), BootError> {
let mut phase = self.phase.write();
let current = *phase;
let valid = match (current, next_phase) {
(BootPhase::Uninitialized, BootPhase::Init) => true,
(BootPhase::Init, BootPhase::Migrate) => true,
(BootPhase::Init, BootPhase::Failed) => true,
(BootPhase::Migrate, BootPhase::Recover) => true,
(BootPhase::Migrate, BootPhase::ReadOnlyRecovery) => true,
(BootPhase::Migrate, BootPhase::ForceRecovery) => true,
(BootPhase::Migrate, BootPhase::Failed) => true,
(BootPhase::Recover, BootPhase::Warmup) => true,
(BootPhase::Recover, BootPhase::Ready) => true, (BootPhase::Recover, BootPhase::Failed) => true,
(BootPhase::ReadOnlyRecovery, BootPhase::Ready) => true,
(BootPhase::ReadOnlyRecovery, BootPhase::Failed) => true,
(BootPhase::ForceRecovery, BootPhase::Warmup) => true,
(BootPhase::ForceRecovery, BootPhase::Ready) => true,
(BootPhase::ForceRecovery, BootPhase::Failed) => true,
(BootPhase::Warmup, BootPhase::Ready) => true,
(BootPhase::Warmup, BootPhase::Failed) => true,
(BootPhase::Ready, BootPhase::ShuttingDown) => true,
(_, BootPhase::Failed) => true, _ => false,
};
if !valid {
return Err(BootError {
phase: current,
message: format!(
"Invalid transition: {} -> {}",
current.name(),
next_phase.name()
),
recoverable: false,
});
}
if self.remaining_budget_for(current) == Duration::ZERO && current.is_booting() {
*phase = BootPhase::Failed;
*self.failure_reason.write() =
Some(format!("Budget exceeded in phase {}", current.name()));
return Err(BootError {
phase: current,
message: "Phase budget exceeded".to_string(),
recoverable: false,
});
}
*phase = next_phase;
*self.phase_start.write() = Instant::now();
*self.progress.write() = PhaseProgress::default();
Ok(())
}
pub fn update_progress(&self, progress: PhaseProgress) {
*self.progress.write() = progress;
}
pub fn fail(&self, reason: &str) {
let current = *self.phase.read();
*self.phase.write() = BootPhase::Failed;
*self.failure_reason.write() = Some(format!("Failed in {}: {}", current.name(), reason));
}
pub fn failure_reason(&self) -> Option<String> {
self.failure_reason.read().clone()
}
pub fn set_preload_hints(&self, hints: PreloadHints) {
*self.preload_hints.write() = hints;
}
pub fn preload_hints(&self) -> PreloadHints {
self.preload_hints.read().clone()
}
pub fn metrics(&self) -> &BootMetrics {
&self.metrics
}
pub fn record_wal_progress(&self, records: u64, bytes: u64) {
self.metrics
.wal_records_replayed
.fetch_add(records, Ordering::Relaxed);
self.metrics
.wal_bytes_processed
.fetch_add(bytes, Ordering::Relaxed);
}
pub fn record_page_recovered(&self, count: u64) {
self.metrics
.pages_recovered
.fetch_add(count, Ordering::Relaxed);
}
pub fn record_txn_rollback(&self, count: u64) {
self.metrics
.txns_rolled_back
.fetch_add(count, Ordering::Relaxed);
}
pub fn health_status(&self) -> HealthStatus {
let phase = self.current_phase();
let progress = self.current_progress();
HealthStatus {
phase,
phase_name: phase.name().to_string(),
is_ready: phase.is_ready(),
is_alive: phase.is_alive(),
is_booting: phase.is_booting(),
progress_percent: progress.percent,
progress_message: progress.message,
phase_elapsed_ms: progress.elapsed.as_millis() as u64,
total_elapsed_ms: self.total_elapsed().as_millis() as u64,
remaining_budget_ms: self.remaining_budget().as_millis() as u64,
failure_reason: self.failure_reason(),
wal_records_replayed: self.metrics.wal_records_replayed.load(Ordering::Relaxed),
wal_bytes_processed: self.metrics.wal_bytes_processed.load(Ordering::Relaxed),
}
}
}
/// Point-in-time health report produced by `BootStateMachine::health_status`.
#[derive(Debug, Clone)]
pub struct HealthStatus {
    /// Phase the machine was in when the report was built.
    pub phase: BootPhase,
    /// Name of `phase` as returned by `BootPhase::name`.
    pub phase_name: String,
    /// Whether `phase` is the ready phase.
    pub is_ready: bool,
    /// Whether `phase` is anything other than the failed phase.
    pub is_alive: bool,
    /// Whether `phase` is one of the boot phases.
    pub is_booting: bool,
    /// Percentage from the latest progress report.
    pub progress_percent: u8,
    /// Message from the latest progress report.
    pub progress_message: String,
    /// Milliseconds spent in the current phase.
    pub phase_elapsed_ms: u64,
    /// Milliseconds since boot started.
    pub total_elapsed_ms: u64,
    /// Milliseconds left in the current phase's budget.
    pub remaining_budget_ms: u64,
    /// Recorded failure reason, if any.
    pub failure_reason: Option<String>,
    /// Value of the replayed-WAL-records counter.
    pub wal_records_replayed: u64,
    /// Value of the processed-WAL-bytes counter.
    pub wal_bytes_processed: u64,
}

impl HealthStatus {
    /// Serializes the report as a single-line JSON object.
    ///
    /// The `phase` key carries `phase_name`; `failure_reason` is emitted as
    /// `null` when absent. String values are escaped as JSON requires:
    /// quotes, backslashes and control characters are all escaped, so
    /// messages containing paths or line breaks still produce valid JSON.
    pub fn to_json(&self) -> String {
        /// Escapes `raw` for use inside a JSON string literal.
        fn escape_json(raw: &str) -> String {
            let mut escaped = String::with_capacity(raw.len());
            for ch in raw.chars() {
                match ch {
                    '"' => escaped.push_str("\\\""),
                    '\\' => escaped.push_str("\\\\"),
                    '\n' => escaped.push_str("\\n"),
                    '\r' => escaped.push_str("\\r"),
                    '\t' => escaped.push_str("\\t"),
                    // Remaining control characters have no short form.
                    c if (c as u32) < 0x20 => {
                        escaped.push_str(&format!("\\u{:04x}", c as u32))
                    }
                    c => escaped.push(c),
                }
            }
            escaped
        }

        let failure_reason = match self.failure_reason {
            Some(ref reason) => format!("\"{}\"", escape_json(reason)),
            None => "null".to_string(),
        };

        format!(
            r#"{{"phase":"{}","is_ready":{},"is_alive":{},"is_booting":{},"progress_percent":{},"progress_message":"{}","phase_elapsed_ms":{},"total_elapsed_ms":{},"remaining_budget_ms":{},"failure_reason":{},"wal_records_replayed":{},"wal_bytes_processed":{}}}"#,
            escape_json(&self.phase_name),
            self.is_ready,
            self.is_alive,
            self.is_booting,
            self.progress_percent,
            escape_json(&self.progress_message),
            self.phase_elapsed_ms,
            self.total_elapsed_ms,
            self.remaining_budget_ms,
            failure_reason,
            self.wal_records_replayed,
            self.wal_bytes_processed,
        )
    }
}
pub struct BootOrchestrator {
fsm: Arc<BootStateMachine>,
}
impl BootOrchestrator {
pub fn new(budgets: BootBudgets) -> Self {
Self {
fsm: Arc::new(BootStateMachine::new(budgets)),
}
}
pub fn fsm(&self) -> Arc<BootStateMachine> {
self.fsm.clone()
}
pub fn run_boot<I, M, R, W>(
&self,
recovery_mode: RecoveryMode,
init_fn: I,
migrate_fn: M,
recover_fn: R,
warmup_fn: W,
) -> Result<(), BootError>
where
I: FnOnce(&BootStateMachine) -> Result<(), BootError>,
M: FnOnce(&BootStateMachine) -> Result<(), BootError>,
R: FnOnce(&BootStateMachine) -> Result<PreloadHints, BootError>,
W: FnOnce(&BootStateMachine, PreloadHints) -> Result<(), BootError>,
{
self.fsm.start_boot(recovery_mode)?;
init_fn(&self.fsm)?;
self.fsm.transition_to(BootPhase::Migrate)?;
migrate_fn(&self.fsm)?;
let next_phase = match recovery_mode {
RecoveryMode::Normal => BootPhase::Recover,
RecoveryMode::ReadOnly => BootPhase::ReadOnlyRecovery,
RecoveryMode::Force => BootPhase::ForceRecovery,
};
self.fsm.transition_to(next_phase)?;
let hints = recover_fn(&self.fsm)?;
self.fsm.set_preload_hints(hints.clone());
if hints.working_set_bytes > 0 || !hints.indexes.is_empty() {
self.fsm.transition_to(BootPhase::Warmup)?;
warmup_fn(&self.fsm, hints)?;
}
self.fsm.transition_to(BootPhase::Ready)?;
Ok(())
}
pub fn shutdown(&self) -> Result<(), BootError> {
self.fsm.transition_to(BootPhase::ShuttingDown)
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Walks the normal boot path from start to `Ready`.
    #[test]
    fn test_boot_fsm_transitions() {
        let machine = BootStateMachine::with_defaults();

        assert!(machine.start_boot(RecoveryMode::Normal).is_ok());
        assert_eq!(machine.current_phase(), BootPhase::Init);

        let path = [
            BootPhase::Migrate,
            BootPhase::Recover,
            BootPhase::Warmup,
            BootPhase::Ready,
        ];
        for step in path.iter() {
            assert!(machine.transition_to(*step).is_ok());
        }

        assert!(machine.is_ready());
        assert!(machine.is_alive());
    }

    /// Skipping straight from `Init` to `Ready` must be rejected.
    #[test]
    fn test_invalid_transition() {
        let machine = BootStateMachine::with_defaults();
        machine.start_boot(RecoveryMode::Normal).unwrap();

        let skipped = machine.transition_to(BootPhase::Ready);
        assert!(skipped.is_err());
    }

    /// A freshly started boot reports alive, booting and not ready.
    #[test]
    fn test_health_status() {
        let machine = BootStateMachine::with_defaults();
        machine.start_boot(RecoveryMode::Normal).unwrap();

        let report = machine.health_status();
        assert!(!report.is_ready);
        assert!(report.is_alive);
        assert!(report.is_booting);
        assert_eq!(report.phase_name, "init");
    }

    /// WAL progress is accumulated in the metrics counters.
    #[test]
    fn test_progress_tracking() {
        let machine = BootStateMachine::with_defaults();
        machine.start_boot(RecoveryMode::Normal).unwrap();
        machine.record_wal_progress(100, 4096);

        let counters = machine.metrics();
        assert_eq!(counters.wal_records_replayed.load(Ordering::Relaxed), 100);
        assert_eq!(counters.wal_bytes_processed.load(Ordering::Relaxed), 4096);
    }

    /// A 600 s probe window leaves at least 300 s for recovery.
    #[test]
    fn test_kubernetes_budgets() {
        let derived = BootBudgets::for_kubernetes(600);
        assert!(derived.recover_budget >= Duration::from_secs(300));
        assert_eq!(derived.total_budget, Duration::from_secs(600));
    }
}