1use std::sync::Arc;
43use std::sync::atomic::{AtomicU64, Ordering};
44use std::time::{Duration, Instant};
45
46use parking_lot::RwLock;
47
/// Lifecycle phase tracked by [`BootStateMachine`].
///
/// The enum is `#[repr(u8)]` and every variant carries an explicit
/// discriminant.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[repr(u8)]
pub enum BootPhase {
    /// Phase a freshly constructed state machine starts in.
    Uninitialized = 0,
    /// First boot phase; entered by `BootStateMachine::start_boot`.
    Init = 1,
    /// Phase that follows `Init`.
    Migrate = 2,
    /// Recovery phase used for `RecoveryMode::Normal`.
    Recover = 3,
    /// Optional phase between recovery and `Ready`.
    Warmup = 4,
    /// Boot finished; the only phase for which `is_ready` returns `true`.
    Ready = 5,
    /// Recovery phase used for `RecoveryMode::ReadOnly`.
    ReadOnlyRecovery = 6,
    /// Recovery phase used for `RecoveryMode::Force`.
    ForceRecovery = 7,
    /// Entered from `Ready` when shutdown is requested.
    ShuttingDown = 8,
    /// Failure state; the only phase for which `is_alive` returns `false`.
    Failed = 9,
}
73
74impl BootPhase {
75 pub fn name(&self) -> &'static str {
77 match self {
78 BootPhase::Uninitialized => "uninitialized",
79 BootPhase::Init => "init",
80 BootPhase::Migrate => "migrate",
81 BootPhase::Recover => "recover",
82 BootPhase::Warmup => "warmup",
83 BootPhase::Ready => "ready",
84 BootPhase::ReadOnlyRecovery => "readonly_recovery",
85 BootPhase::ForceRecovery => "force_recovery",
86 BootPhase::ShuttingDown => "shutting_down",
87 BootPhase::Failed => "failed",
88 }
89 }
90
91 pub fn is_ready(&self) -> bool {
93 matches!(self, BootPhase::Ready)
94 }
95
96 pub fn is_alive(&self) -> bool {
98 !matches!(self, BootPhase::Failed)
99 }
100
101 pub fn is_booting(&self) -> bool {
103 matches!(
104 self,
105 BootPhase::Init
106 | BootPhase::Migrate
107 | BootPhase::Recover
108 | BootPhase::Warmup
109 | BootPhase::ReadOnlyRecovery
110 | BootPhase::ForceRecovery
111 )
112 }
113}
114
/// Progress snapshot for the phase that is currently running.
///
/// `Default` yields an all-zero snapshot with an empty message, which is what
/// the state machine installs whenever a new phase begins.
#[derive(Debug, Clone, Default)]
pub struct PhaseProgress {
    /// Completion percentage reported by the phase (the value is stored as
    /// given; no range check is applied here).
    pub percent: u8,
    /// Human-readable status line.
    pub message: String,
    /// Number of items handled so far.
    pub items_processed: u64,
    /// Total number of items expected.
    pub items_total: u64,
    /// Number of bytes handled so far.
    pub bytes_processed: u64,
    /// Total number of bytes expected.
    pub bytes_total: u64,
    /// Time spent in the phase; `BootStateMachine::current_progress`
    /// overwrites this with the live elapsed time.
    pub elapsed: Duration,
}
147
/// Time budgets for the individual boot phases.
#[derive(Debug, Clone)]
pub struct BootBudgets {
    /// Budget applied while the machine is in `BootPhase::Init`.
    pub init_budget: Duration,
    /// Budget applied while the machine is in `BootPhase::Migrate`.
    pub migrate_budget: Duration,
    /// Budget shared by `Recover`, `ReadOnlyRecovery` and `ForceRecovery`.
    pub recover_budget: Duration,
    /// Budget applied while the machine is in `BootPhase::Warmup`.
    pub warmup_budget: Duration,
    /// Overall boot budget.
    // NOTE(review): nothing in this file reads `total_budget`; confirm
    // whether it is enforced elsewhere.
    pub total_budget: Duration,
}
162
163impl Default for BootBudgets {
164 fn default() -> Self {
165 Self {
166 init_budget: Duration::from_secs(30),
167 migrate_budget: Duration::from_secs(300), recover_budget: Duration::from_secs(1800), warmup_budget: Duration::from_secs(300), total_budget: Duration::from_secs(3600), }
172 }
173}
174
175impl BootBudgets {
176 pub fn for_kubernetes(startup_probe_total_seconds: u64) -> Self {
181 let total = Duration::from_secs(startup_probe_total_seconds);
182 Self {
183 init_budget: Duration::from_secs(startup_probe_total_seconds / 20),
184 migrate_budget: Duration::from_secs(startup_probe_total_seconds / 5),
185 recover_budget: Duration::from_secs(startup_probe_total_seconds * 3 / 5),
186 warmup_budget: Duration::from_secs(startup_probe_total_seconds / 10),
187 total_budget: total,
188 }
189 }
190}
191
/// Selects which recovery phase `BootOrchestrator::run_boot` enters after
/// migration.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RecoveryMode {
    /// Boot proceeds through `BootPhase::Recover`.
    Normal,
    /// Boot proceeds through `BootPhase::ReadOnlyRecovery`.
    ReadOnly,
    /// Boot proceeds through `BootPhase::ForceRecovery`.
    Force,
}
202
/// Hints produced by the recovery step and handed to the warm-up step.
///
/// `BootOrchestrator::run_boot` only considers warm-up when
/// `working_set_bytes` is non-zero or `indexes` is non-empty.
#[derive(Debug, Clone, Default)]
pub struct PreloadHints {
    /// Identifiers of pages suggested for preloading.
    pub page_ids: Vec<u64>,
    /// Names of indexes suggested for preloading.
    pub indexes: Vec<String>,
    /// Size of the working set in bytes (presumably an estimate — confirm
    /// with the recovery implementation).
    pub working_set_bytes: u64,
}
213
/// Shared boot state: current phase, timing, progress, failure reason and
/// metrics. Mutable parts sit behind individual `RwLock`s so all methods
/// take `&self`.
pub struct BootStateMachine {
    /// Phase the machine is currently in.
    phase: RwLock<BootPhase>,
    /// Instant at which the current phase was entered (initialised to the
    /// construction time).
    phase_start: RwLock<Instant>,
    /// Instant at which boot started; `None` until boot begins.
    boot_start: RwLock<Option<Instant>>,
    /// Latest progress report for the current phase.
    progress: RwLock<PhaseProgress>,
    /// Per-phase time budgets, fixed at construction.
    budgets: BootBudgets,
    /// Recovery mode recorded by `start_boot`.
    recovery_mode: RwLock<RecoveryMode>,
    /// Explanation recorded when the machine enters `BootPhase::Failed`.
    failure_reason: RwLock<Option<String>>,
    /// Hints stored via `set_preload_hints`.
    preload_hints: RwLock<PreloadHints>,
    /// Counters updated while booting.
    metrics: BootMetrics,
}
235
/// Atomic counters collected while booting.
///
/// `Default` starts every counter at zero.
#[derive(Debug, Default)]
pub struct BootMetrics {
    /// WAL records replayed; incremented by
    /// `BootStateMachine::record_wal_progress`.
    pub wal_records_replayed: AtomicU64,
    /// WAL bytes processed; incremented by
    /// `BootStateMachine::record_wal_progress`.
    pub wal_bytes_processed: AtomicU64,
    /// Pages recovered; incremented by
    /// `BootStateMachine::record_page_recovered`.
    pub pages_recovered: AtomicU64,
    /// Transactions rolled back; incremented by
    /// `BootStateMachine::record_txn_rollback`.
    pub txns_rolled_back: AtomicU64,
    /// Checkpoint bytes scanned (not updated by this file; presumably
    /// maintained by the recovery implementation).
    pub checkpoint_bytes_scanned: AtomicU64,
    /// Migration steps completed (not updated by this file; presumably
    /// maintained by the migration implementation).
    pub migration_steps_completed: AtomicU64,
    /// Warm-up cache hit rate; the name suggests parts per thousand —
    /// TODO confirm with the warm-up implementation.
    pub warmup_hit_rate_permille: AtomicU64,
}
267
/// Error returned by boot state-machine operations and boot phase callbacks.
#[derive(Debug, Clone)]
pub struct BootError {
    /// Phase the state machine was in when the error was produced.
    pub phase: BootPhase,
    /// Human-readable description of the problem.
    pub message: String,
    /// Whether the caller may be able to recover; every error constructed
    /// in this file sets it to `false`.
    pub recoverable: bool,
}
275
276impl std::fmt::Display for BootError {
277 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
278 write!(
279 f,
280 "Boot error in phase {}: {} (recoverable: {})",
281 self.phase.name(),
282 self.message,
283 self.recoverable
284 )
285 }
286}
287
// Marker impl: lets `BootError` be used wherever `std::error::Error` is
// expected. No source error is exposed.
impl std::error::Error for BootError {}
289
290impl BootStateMachine {
291 pub fn new(budgets: BootBudgets) -> Self {
293 Self {
294 phase: RwLock::new(BootPhase::Uninitialized),
295 phase_start: RwLock::new(Instant::now()),
296 boot_start: RwLock::new(None),
297 progress: RwLock::new(PhaseProgress::default()),
298 budgets,
299 recovery_mode: RwLock::new(RecoveryMode::Normal),
300 failure_reason: RwLock::new(None),
301 preload_hints: RwLock::new(PreloadHints::default()),
302 metrics: BootMetrics::default(),
303 }
304 }
305
306 pub fn with_defaults() -> Self {
308 Self::new(BootBudgets::default())
309 }
310
311 pub fn current_phase(&self) -> BootPhase {
313 *self.phase.read()
314 }
315
316 pub fn current_progress(&self) -> PhaseProgress {
318 let mut progress = self.progress.read().clone();
319 progress.elapsed = self.phase_start.read().elapsed();
320 progress
321 }
322
323 pub fn is_ready(&self) -> bool {
325 self.current_phase().is_ready()
326 }
327
328 pub fn is_alive(&self) -> bool {
330 self.current_phase().is_alive()
331 }
332
333 pub fn remaining_budget(&self) -> Duration {
335 self.remaining_budget_for(*self.phase.read())
336 }
337
338 fn remaining_budget_for(&self, phase: BootPhase) -> Duration {
345 let elapsed = self.phase_start.read().elapsed();
346 let budget = match phase {
347 BootPhase::Init => self.budgets.init_budget,
348 BootPhase::Migrate => self.budgets.migrate_budget,
349 BootPhase::Recover | BootPhase::ReadOnlyRecovery | BootPhase::ForceRecovery => {
350 self.budgets.recover_budget
351 }
352 BootPhase::Warmup => self.budgets.warmup_budget,
353 _ => Duration::ZERO,
354 };
355 budget.saturating_sub(elapsed)
356 }
357
358 pub fn total_elapsed(&self) -> Duration {
360 self.boot_start
361 .read()
362 .map(|t| t.elapsed())
363 .unwrap_or(Duration::ZERO)
364 }
365
366 pub fn start_boot(&self, recovery_mode: RecoveryMode) -> Result<(), BootError> {
368 let mut phase = self.phase.write();
369 if *phase != BootPhase::Uninitialized {
370 return Err(BootError {
371 phase: *phase,
372 message: "Boot already started".to_string(),
373 recoverable: false,
374 });
375 }
376
377 *self.boot_start.write() = Some(Instant::now());
378 *self.recovery_mode.write() = recovery_mode;
379 *phase = BootPhase::Init;
380 *self.phase_start.write() = Instant::now();
381 *self.progress.write() = PhaseProgress {
382 message: "Initializing core subsystems".to_string(),
383 ..Default::default()
384 };
385
386 Ok(())
387 }
388
389 pub fn transition_to(&self, next_phase: BootPhase) -> Result<(), BootError> {
391 let mut phase = self.phase.write();
392 let current = *phase;
393
394 let valid = match (current, next_phase) {
396 (BootPhase::Uninitialized, BootPhase::Init) => true,
397 (BootPhase::Init, BootPhase::Migrate) => true,
398 (BootPhase::Init, BootPhase::Failed) => true,
399 (BootPhase::Migrate, BootPhase::Recover) => true,
400 (BootPhase::Migrate, BootPhase::ReadOnlyRecovery) => true,
401 (BootPhase::Migrate, BootPhase::ForceRecovery) => true,
402 (BootPhase::Migrate, BootPhase::Failed) => true,
403 (BootPhase::Recover, BootPhase::Warmup) => true,
404 (BootPhase::Recover, BootPhase::Ready) => true, (BootPhase::Recover, BootPhase::Failed) => true,
406 (BootPhase::ReadOnlyRecovery, BootPhase::Ready) => true,
407 (BootPhase::ReadOnlyRecovery, BootPhase::Failed) => true,
408 (BootPhase::ForceRecovery, BootPhase::Warmup) => true,
409 (BootPhase::ForceRecovery, BootPhase::Ready) => true,
410 (BootPhase::ForceRecovery, BootPhase::Failed) => true,
411 (BootPhase::Warmup, BootPhase::Ready) => true,
412 (BootPhase::Warmup, BootPhase::Failed) => true,
413 (BootPhase::Ready, BootPhase::ShuttingDown) => true,
414 (_, BootPhase::Failed) => true, _ => false,
416 };
417
418 if !valid {
419 return Err(BootError {
420 phase: current,
421 message: format!(
422 "Invalid transition: {} -> {}",
423 current.name(),
424 next_phase.name()
425 ),
426 recoverable: false,
427 });
428 }
429
430 if self.remaining_budget_for(current) == Duration::ZERO && current.is_booting() {
434 *phase = BootPhase::Failed;
435 *self.failure_reason.write() =
436 Some(format!("Budget exceeded in phase {}", current.name()));
437 return Err(BootError {
438 phase: current,
439 message: "Phase budget exceeded".to_string(),
440 recoverable: false,
441 });
442 }
443
444 *phase = next_phase;
445 *self.phase_start.write() = Instant::now();
446 *self.progress.write() = PhaseProgress::default();
447
448 Ok(())
449 }
450
451 pub fn update_progress(&self, progress: PhaseProgress) {
453 *self.progress.write() = progress;
454 }
455
456 pub fn fail(&self, reason: &str) {
458 let current = *self.phase.read();
459 *self.phase.write() = BootPhase::Failed;
460 *self.failure_reason.write() = Some(format!("Failed in {}: {}", current.name(), reason));
461 }
462
463 pub fn failure_reason(&self) -> Option<String> {
465 self.failure_reason.read().clone()
466 }
467
468 pub fn set_preload_hints(&self, hints: PreloadHints) {
470 *self.preload_hints.write() = hints;
471 }
472
473 pub fn preload_hints(&self) -> PreloadHints {
475 self.preload_hints.read().clone()
476 }
477
478 pub fn metrics(&self) -> &BootMetrics {
480 &self.metrics
481 }
482
483 pub fn record_wal_progress(&self, records: u64, bytes: u64) {
485 self.metrics
486 .wal_records_replayed
487 .fetch_add(records, Ordering::Relaxed);
488 self.metrics
489 .wal_bytes_processed
490 .fetch_add(bytes, Ordering::Relaxed);
491 }
492
493 pub fn record_page_recovered(&self, count: u64) {
495 self.metrics
496 .pages_recovered
497 .fetch_add(count, Ordering::Relaxed);
498 }
499
500 pub fn record_txn_rollback(&self, count: u64) {
502 self.metrics
503 .txns_rolled_back
504 .fetch_add(count, Ordering::Relaxed);
505 }
506
507 pub fn health_status(&self) -> HealthStatus {
509 let phase = self.current_phase();
510 let progress = self.current_progress();
511
512 HealthStatus {
513 phase,
514 phase_name: phase.name().to_string(),
515 is_ready: phase.is_ready(),
516 is_alive: phase.is_alive(),
517 is_booting: phase.is_booting(),
518 progress_percent: progress.percent,
519 progress_message: progress.message,
520 phase_elapsed_ms: progress.elapsed.as_millis() as u64,
521 total_elapsed_ms: self.total_elapsed().as_millis() as u64,
522 remaining_budget_ms: self.remaining_budget().as_millis() as u64,
523 failure_reason: self.failure_reason(),
524 wal_records_replayed: self.metrics.wal_records_replayed.load(Ordering::Relaxed),
525 wal_bytes_processed: self.metrics.wal_bytes_processed.load(Ordering::Relaxed),
526 }
527 }
528}
529
/// Point-in-time health report produced by `BootStateMachine::health_status`.
#[derive(Debug, Clone)]
pub struct HealthStatus {
    /// Phase at the time the report was built.
    pub phase: BootPhase,
    /// `phase.name()` as an owned string.
    pub phase_name: String,
    /// Whether the phase is `Ready`.
    pub is_ready: bool,
    /// Whether the phase is anything other than `Failed`.
    pub is_alive: bool,
    /// Whether one of the boot phases is active.
    pub is_booting: bool,
    /// Percentage taken from the current progress report.
    pub progress_percent: u8,
    /// Message taken from the current progress report.
    pub progress_message: String,
    /// Milliseconds spent in the current phase.
    pub phase_elapsed_ms: u64,
    /// Milliseconds since boot started (zero if it has not started).
    pub total_elapsed_ms: u64,
    /// Milliseconds left in the current phase's budget.
    pub remaining_budget_ms: u64,
    /// Recorded failure reason, if any.
    pub failure_reason: Option<String>,
    /// Value of the WAL-records-replayed counter.
    pub wal_records_replayed: u64,
    /// Value of the WAL-bytes-processed counter.
    pub wal_bytes_processed: u64,
}
547
548impl HealthStatus {
549 pub fn to_json(&self) -> String {
551 format!(
552 r#"{{"phase":"{}","is_ready":{},"is_alive":{},"is_booting":{},"progress_percent":{},"progress_message":"{}","phase_elapsed_ms":{},"total_elapsed_ms":{},"remaining_budget_ms":{},"failure_reason":{},"wal_records_replayed":{},"wal_bytes_processed":{}}}"#,
553 self.phase_name,
554 self.is_ready,
555 self.is_alive,
556 self.is_booting,
557 self.progress_percent,
558 self.progress_message.replace('"', "\\\""),
559 self.phase_elapsed_ms,
560 self.total_elapsed_ms,
561 self.remaining_budget_ms,
562 self.failure_reason
563 .as_ref()
564 .map(|s| format!("\"{}\"", s.replace('"', "\\\"")))
565 .unwrap_or_else(|| "null".to_string()),
566 self.wal_records_replayed,
567 self.wal_bytes_processed,
568 )
569 }
570}
571
/// Drives the boot sequence by running caller-supplied phase callbacks
/// against a shared [`BootStateMachine`].
pub struct BootOrchestrator {
    /// State machine shared with callers through `fsm()`.
    fsm: Arc<BootStateMachine>,
}
576
577impl BootOrchestrator {
578 pub fn new(budgets: BootBudgets) -> Self {
580 Self {
581 fsm: Arc::new(BootStateMachine::new(budgets)),
582 }
583 }
584
585 pub fn fsm(&self) -> Arc<BootStateMachine> {
587 self.fsm.clone()
588 }
589
590 pub fn run_boot<I, M, R, W>(
592 &self,
593 recovery_mode: RecoveryMode,
594 init_fn: I,
595 migrate_fn: M,
596 recover_fn: R,
597 warmup_fn: W,
598 ) -> Result<(), BootError>
599 where
600 I: FnOnce(&BootStateMachine) -> Result<(), BootError>,
601 M: FnOnce(&BootStateMachine) -> Result<(), BootError>,
602 R: FnOnce(&BootStateMachine) -> Result<PreloadHints, BootError>,
603 W: FnOnce(&BootStateMachine, PreloadHints) -> Result<(), BootError>,
604 {
605 self.fsm.start_boot(recovery_mode)?;
607
608 init_fn(&self.fsm)?;
610 self.fsm.transition_to(BootPhase::Migrate)?;
611
612 migrate_fn(&self.fsm)?;
614 let next_phase = match recovery_mode {
615 RecoveryMode::Normal => BootPhase::Recover,
616 RecoveryMode::ReadOnly => BootPhase::ReadOnlyRecovery,
617 RecoveryMode::Force => BootPhase::ForceRecovery,
618 };
619 self.fsm.transition_to(next_phase)?;
620
621 let hints = recover_fn(&self.fsm)?;
623 self.fsm.set_preload_hints(hints.clone());
624
625 if hints.working_set_bytes > 0 || !hints.indexes.is_empty() {
627 self.fsm.transition_to(BootPhase::Warmup)?;
628 warmup_fn(&self.fsm, hints)?;
629 }
630
631 self.fsm.transition_to(BootPhase::Ready)?;
633
634 Ok(())
635 }
636
637 pub fn shutdown(&self) -> Result<(), BootError> {
639 self.fsm.transition_to(BootPhase::ShuttingDown)
640 }
641}
642
#[cfg(test)]
mod tests {
    use super::*;

    /// Starts a boot in normal recovery mode on a default state machine.
    fn booted_fsm() -> BootStateMachine {
        let fsm = BootStateMachine::with_defaults();
        fsm.start_boot(RecoveryMode::Normal).unwrap();
        fsm
    }

    /// Walks the full normal-mode path and ends up ready and alive.
    #[test]
    fn test_boot_fsm_transitions() {
        let fsm = BootStateMachine::with_defaults();

        assert!(fsm.start_boot(RecoveryMode::Normal).is_ok());
        assert_eq!(fsm.current_phase(), BootPhase::Init);

        let path = [
            BootPhase::Migrate,
            BootPhase::Recover,
            BootPhase::Warmup,
            BootPhase::Ready,
        ];
        for &next in &path {
            assert!(fsm.transition_to(next).is_ok());
        }

        assert!(fsm.is_ready());
        assert!(fsm.is_alive());
    }

    /// Skipping straight from init to ready is rejected.
    #[test]
    fn test_invalid_transition() {
        let fsm = booted_fsm();

        let outcome = fsm.transition_to(BootPhase::Ready);
        assert!(outcome.is_err());
    }

    /// The health report reflects the init phase right after boot starts.
    #[test]
    fn test_health_status() {
        let fsm = booted_fsm();

        let status = fsm.health_status();
        assert!(!status.is_ready);
        assert!(status.is_alive);
        assert!(status.is_booting);
        assert_eq!(status.phase_name, "init");
    }

    /// WAL progress is accumulated in the metrics counters.
    #[test]
    fn test_progress_tracking() {
        let fsm = booted_fsm();

        fsm.record_wal_progress(100, 4096);

        let metrics = fsm.metrics();
        let records = metrics.wal_records_replayed.load(Ordering::Relaxed);
        let bytes = metrics.wal_bytes_processed.load(Ordering::Relaxed);
        assert_eq!(records, 100);
        assert_eq!(bytes, 4096);
    }

    /// A 600 s probe window leaves at least 300 s for recovery.
    #[test]
    fn test_kubernetes_budgets() {
        let budgets = BootBudgets::for_kubernetes(600);

        assert!(budgets.recover_budget >= Duration::from_secs(300));
        assert_eq!(budgets.total_budget, Duration::from_secs(600));
    }
}