1use std::sync::atomic::{AtomicU64, Ordering};
43use std::sync::Arc;
44use std::time::{Duration, Instant};
45
46use parking_lot::RwLock;
47
/// Phases of the boot sequence managed by `BootStateMachine`.
///
/// The enum is `#[repr(u8)]` with explicit discriminants, so `phase as u8` yields the
/// value listed next to each variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[repr(u8)]
pub enum BootPhase {
    /// Phase of a freshly constructed machine; `start_boot` only succeeds from here.
    Uninitialized = 0,
    /// Entered by `start_boot`.
    Init = 1,
    /// Follows `Init`.
    Migrate = 2,
    /// Recovery phase selected by `RecoveryMode::Normal`.
    Recover = 3,
    /// Optional phase after recovery, entered by `run_boot` when preload hints exist.
    Warmup = 4,
    /// Boot finished; the only phase for which `is_ready` is true.
    Ready = 5,
    /// Recovery phase selected by `RecoveryMode::ReadOnly`; may only continue to `Ready`.
    ReadOnlyRecovery = 6,
    /// Recovery phase selected by `RecoveryMode::Force`.
    ForceRecovery = 7,
    /// Entered from `Ready` on shutdown.
    ShuttingDown = 8,
    /// Failure phase, reachable from every phase; the only phase that is not alive.
    Failed = 9,
}

impl BootPhase {
    /// Returns the stable lowercase snake_case identifier of this phase
    /// (used in health reports and error messages).
    pub fn name(&self) -> &'static str {
        match *self {
            Self::Uninitialized => "uninitialized",
            Self::Init => "init",
            Self::Migrate => "migrate",
            Self::Recover => "recover",
            Self::Warmup => "warmup",
            Self::Ready => "ready",
            Self::ReadOnlyRecovery => "readonly_recovery",
            Self::ForceRecovery => "force_recovery",
            Self::ShuttingDown => "shutting_down",
            Self::Failed => "failed",
        }
    }

    /// `true` only for `Ready`.
    pub fn is_ready(&self) -> bool {
        *self == Self::Ready
    }

    /// `true` for every phase except `Failed` (including `ShuttingDown`).
    pub fn is_alive(&self) -> bool {
        *self != Self::Failed
    }

    /// `true` while a boot step is in progress: `Init`, `Migrate`, `Warmup`, or any of
    /// the three recovery phases.
    pub fn is_booting(&self) -> bool {
        match *self {
            Self::Init
            | Self::Migrate
            | Self::Recover
            | Self::Warmup
            | Self::ReadOnlyRecovery
            | Self::ForceRecovery => true,
            Self::Uninitialized | Self::Ready | Self::ShuttingDown | Self::Failed => false,
        }
    }
}
114
/// Progress reported for the current boot phase.
///
/// Every field defaults to zero / empty, which is what the state machine installs on
/// each phase transition.
#[derive(Debug, Clone, Default)]
pub struct PhaseProgress {
    /// Completion percentage of the current phase (the state machine caps stored
    /// values at 100).
    pub percent: u8,
    /// Human-readable description of what the phase is currently doing.
    pub message: String,
    /// Work items handled so far.
    pub items_processed: u64,
    /// Total work items expected; NOTE(review): 0 presumably means "unknown" — confirm.
    pub items_total: u64,
    /// Bytes handled so far.
    pub bytes_processed: u64,
    /// Total bytes expected; NOTE(review): 0 presumably means "unknown" — confirm.
    pub bytes_total: u64,
    /// Time spent in the current phase; filled in by `current_progress` when read.
    pub elapsed: Duration,
}
147
/// Time budgets for the boot sequence.
///
/// When a transition is requested, `BootStateMachine` checks the budget of the phase
/// being left; if it is used up the machine moves to `BootPhase::Failed`. The three
/// recovery phases share `recover_budget`.
#[derive(Debug, Clone)]
pub struct BootBudgets {
    /// Budget for `BootPhase::Init`.
    pub init_budget: Duration,
    /// Budget for `BootPhase::Migrate`.
    pub migrate_budget: Duration,
    /// Budget shared by `Recover`, `ReadOnlyRecovery` and `ForceRecovery`.
    pub recover_budget: Duration,
    /// Budget for `BootPhase::Warmup`.
    pub warmup_budget: Duration,
    /// Budget for the whole boot.
    ///
    /// NOTE(review): `BootStateMachine` does not currently enforce this value.
    pub total_budget: Duration,
}

impl Default for BootBudgets {
    /// 30 s init, 5 min migrate, 30 min recover, 5 min warmup, 1 h total.
    fn default() -> Self {
        Self {
            init_budget: Duration::from_secs(30),
            migrate_budget: Duration::from_secs(300),
            recover_budget: Duration::from_secs(1800),
            warmup_budget: Duration::from_secs(300),
            total_budget: Duration::from_secs(3600),
        }
    }
}

impl BootBudgets {
    /// Splits a Kubernetes startup-probe window into per-phase budgets.
    ///
    /// NOTE(review): `startup_probe_total_seconds` is presumably the probe's
    /// `failureThreshold * periodSeconds` — confirm against the deployment manifests.
    ///
    /// The window is divided as 5% init, 20% migrate, 60% recover and 10% warmup
    /// (95% in total); the whole window becomes `total_budget`.
    ///
    /// The split is computed on `Duration` rather than whole seconds. Short windows
    /// therefore keep sub-second budgets instead of truncating to zero. A zero phase
    /// budget would fail the boot at its first transition, which would otherwise happen
    /// to `Init` for any window under 20 s. Very large inputs cannot overflow.
    pub fn for_kubernetes(startup_probe_total_seconds: u64) -> Self {
        let total = Duration::from_secs(startup_probe_total_seconds);
        Self {
            init_budget: total / 20,
            migrate_budget: total / 5,
            // Divide before multiplying so the intermediate value never exceeds `total`.
            recover_budget: total / 5 * 3,
            warmup_budget: total / 10,
            total_budget: total,
        }
    }
}
191
/// Recovery strategy chosen when the boot starts.
///
/// `BootOrchestrator::run_boot` uses it to pick which recovery phase follows `Migrate`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RecoveryMode {
    /// Proceed to `BootPhase::Recover`.
    Normal,
    /// Proceed to `BootPhase::ReadOnlyRecovery`.
    ReadOnly,
    /// Proceed to `BootPhase::ForceRecovery`.
    Force,
}
202
/// Hints produced by the recovery callback and handed to the warmup callback.
///
/// `BootOrchestrator::run_boot` only enters warmup when `working_set_bytes` is non-zero
/// or `indexes` is non-empty; `page_ids` alone does not trigger warmup.
#[derive(Debug, Clone, Default)]
pub struct PreloadHints {
    /// Pages to preload. NOTE(review): presumably pager page ids — confirm.
    pub page_ids: Vec<u64>,
    /// Indexes to preload. NOTE(review): presumably index names — confirm.
    pub indexes: Vec<String>,
    /// Size of the working set in bytes.
    pub working_set_bytes: u64,
}
213
/// Boot state machine tracking the current phase, per-phase timing and progress,
/// preload hints and boot metrics.
///
/// Every mutable field sits behind its own `parking_lot::RwLock`. Those locks are not
/// reentrant, so a method must never re-acquire a lock it is already holding.
pub struct BootStateMachine {
    /// Current boot phase.
    phase: RwLock<BootPhase>,
    /// When the current phase was entered; reset by `start_boot` and by every
    /// successful transition.
    phase_start: RwLock<Instant>,
    /// When `start_boot` was called; `None` before that.
    boot_start: RwLock<Option<Instant>>,
    /// Last progress reported for the current phase; reset on every successful transition.
    progress: RwLock<PhaseProgress>,
    /// Time budgets; fixed at construction.
    budgets: BootBudgets,
    /// Recovery mode passed to `start_boot`.
    recovery_mode: RwLock<RecoveryMode>,
    /// Reason recorded by `fail`, or by `transition_to` when a phase budget is exceeded.
    failure_reason: RwLock<Option<String>>,
    /// Hints stored via `set_preload_hints`.
    preload_hints: RwLock<PreloadHints>,
    /// Atomic boot counters.
    metrics: BootMetrics,
}
235
/// Counters collected during boot.
///
/// Each counter is an independent `AtomicU64` starting at zero. The
/// `BootStateMachine::record_*` helpers update them with `Ordering::Relaxed`. Fields
/// without a helper can be written directly through `BootStateMachine::metrics()`.
#[derive(Debug, Default)]
pub struct BootMetrics {
    /// WAL records replayed (incremented by `record_wal_progress`).
    pub wal_records_replayed: AtomicU64,
    /// WAL bytes processed (incremented by `record_wal_progress`).
    pub wal_bytes_processed: AtomicU64,
    /// Pages recovered (incremented by `record_page_recovered`).
    pub pages_recovered: AtomicU64,
    /// Transactions rolled back (incremented by `record_txn_rollback`).
    pub txns_rolled_back: AtomicU64,
    /// Checkpoint bytes scanned; no helper updates this.
    pub checkpoint_bytes_scanned: AtomicU64,
    /// Migration steps completed; no helper updates this.
    pub migration_steps_completed: AtomicU64,
    /// Warmup hit rate in parts per thousand (per the name); no helper updates this.
    pub warmup_hit_rate_permille: AtomicU64,
}
267
/// Error returned by boot state-machine operations and by phase callbacks.
#[derive(Debug, Clone)]
pub struct BootError {
    /// Phase the error is attributed to; errors raised by `BootStateMachine` use the
    /// phase that was current when the error occurred.
    pub phase: BootPhase,
    /// Human-readable description.
    pub message: String,
    /// Whether the error is flagged as recoverable. NOTE(review): nothing here acts on
    /// this flag, and every error constructed in this file sets it to `false`.
    pub recoverable: bool,
}
275
276impl std::fmt::Display for BootError {
277 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
278 write!(
279 f,
280 "Boot error in phase {}: {} (recoverable: {})",
281 self.phase.name(),
282 self.message,
283 self.recoverable
284 )
285 }
286}
287
288impl std::error::Error for BootError {}
289
290impl BootStateMachine {
291 pub fn new(budgets: BootBudgets) -> Self {
293 Self {
294 phase: RwLock::new(BootPhase::Uninitialized),
295 phase_start: RwLock::new(Instant::now()),
296 boot_start: RwLock::new(None),
297 progress: RwLock::new(PhaseProgress::default()),
298 budgets,
299 recovery_mode: RwLock::new(RecoveryMode::Normal),
300 failure_reason: RwLock::new(None),
301 preload_hints: RwLock::new(PreloadHints::default()),
302 metrics: BootMetrics::default(),
303 }
304 }
305
306 pub fn with_defaults() -> Self {
308 Self::new(BootBudgets::default())
309 }
310
311 pub fn current_phase(&self) -> BootPhase {
313 *self.phase.read()
314 }
315
316 pub fn current_progress(&self) -> PhaseProgress {
318 let mut progress = self.progress.read().clone();
319 progress.elapsed = self.phase_start.read().elapsed();
320 progress
321 }
322
323 pub fn is_ready(&self) -> bool {
325 self.current_phase().is_ready()
326 }
327
328 pub fn is_alive(&self) -> bool {
330 self.current_phase().is_alive()
331 }
332
333 pub fn remaining_budget(&self) -> Duration {
335 let phase = *self.phase.read();
336 let elapsed = self.phase_start.read().elapsed();
337 let budget = match phase {
338 BootPhase::Init => self.budgets.init_budget,
339 BootPhase::Migrate => self.budgets.migrate_budget,
340 BootPhase::Recover | BootPhase::ReadOnlyRecovery | BootPhase::ForceRecovery => {
341 self.budgets.recover_budget
342 }
343 BootPhase::Warmup => self.budgets.warmup_budget,
344 _ => Duration::ZERO,
345 };
346 budget.saturating_sub(elapsed)
347 }
348
349 pub fn total_elapsed(&self) -> Duration {
351 self.boot_start
352 .read()
353 .map(|t| t.elapsed())
354 .unwrap_or(Duration::ZERO)
355 }
356
357 pub fn start_boot(&self, recovery_mode: RecoveryMode) -> Result<(), BootError> {
359 let mut phase = self.phase.write();
360 if *phase != BootPhase::Uninitialized {
361 return Err(BootError {
362 phase: *phase,
363 message: "Boot already started".to_string(),
364 recoverable: false,
365 });
366 }
367
368 *self.boot_start.write() = Some(Instant::now());
369 *self.recovery_mode.write() = recovery_mode;
370 *phase = BootPhase::Init;
371 *self.phase_start.write() = Instant::now();
372 *self.progress.write() = PhaseProgress {
373 message: "Initializing core subsystems".to_string(),
374 ..Default::default()
375 };
376
377 Ok(())
378 }
379
380 pub fn transition_to(&self, next_phase: BootPhase) -> Result<(), BootError> {
382 let mut phase = self.phase.write();
383 let current = *phase;
384
385 let valid = match (current, next_phase) {
387 (BootPhase::Uninitialized, BootPhase::Init) => true,
388 (BootPhase::Init, BootPhase::Migrate) => true,
389 (BootPhase::Init, BootPhase::Failed) => true,
390 (BootPhase::Migrate, BootPhase::Recover) => true,
391 (BootPhase::Migrate, BootPhase::ReadOnlyRecovery) => true,
392 (BootPhase::Migrate, BootPhase::ForceRecovery) => true,
393 (BootPhase::Migrate, BootPhase::Failed) => true,
394 (BootPhase::Recover, BootPhase::Warmup) => true,
395 (BootPhase::Recover, BootPhase::Ready) => true, (BootPhase::Recover, BootPhase::Failed) => true,
397 (BootPhase::ReadOnlyRecovery, BootPhase::Ready) => true,
398 (BootPhase::ReadOnlyRecovery, BootPhase::Failed) => true,
399 (BootPhase::ForceRecovery, BootPhase::Warmup) => true,
400 (BootPhase::ForceRecovery, BootPhase::Ready) => true,
401 (BootPhase::ForceRecovery, BootPhase::Failed) => true,
402 (BootPhase::Warmup, BootPhase::Ready) => true,
403 (BootPhase::Warmup, BootPhase::Failed) => true,
404 (BootPhase::Ready, BootPhase::ShuttingDown) => true,
405 (_, BootPhase::Failed) => true, _ => false,
407 };
408
409 if !valid {
410 return Err(BootError {
411 phase: current,
412 message: format!(
413 "Invalid transition: {} -> {}",
414 current.name(),
415 next_phase.name()
416 ),
417 recoverable: false,
418 });
419 }
420
421 if self.remaining_budget() == Duration::ZERO && current.is_booting() {
423 *phase = BootPhase::Failed;
424 *self.failure_reason.write() = Some(format!(
425 "Budget exceeded in phase {}",
426 current.name()
427 ));
428 return Err(BootError {
429 phase: current,
430 message: "Phase budget exceeded".to_string(),
431 recoverable: false,
432 });
433 }
434
435 *phase = next_phase;
436 *self.phase_start.write() = Instant::now();
437 *self.progress.write() = PhaseProgress::default();
438
439 Ok(())
440 }
441
442 pub fn update_progress(&self, progress: PhaseProgress) {
444 *self.progress.write() = progress;
445 }
446
447 pub fn fail(&self, reason: &str) {
449 let current = *self.phase.read();
450 *self.phase.write() = BootPhase::Failed;
451 *self.failure_reason.write() = Some(format!(
452 "Failed in {}: {}",
453 current.name(),
454 reason
455 ));
456 }
457
458 pub fn failure_reason(&self) -> Option<String> {
460 self.failure_reason.read().clone()
461 }
462
463 pub fn set_preload_hints(&self, hints: PreloadHints) {
465 *self.preload_hints.write() = hints;
466 }
467
468 pub fn preload_hints(&self) -> PreloadHints {
470 self.preload_hints.read().clone()
471 }
472
473 pub fn metrics(&self) -> &BootMetrics {
475 &self.metrics
476 }
477
478 pub fn record_wal_progress(&self, records: u64, bytes: u64) {
480 self.metrics.wal_records_replayed.fetch_add(records, Ordering::Relaxed);
481 self.metrics.wal_bytes_processed.fetch_add(bytes, Ordering::Relaxed);
482 }
483
484 pub fn record_page_recovered(&self, count: u64) {
486 self.metrics.pages_recovered.fetch_add(count, Ordering::Relaxed);
487 }
488
489 pub fn record_txn_rollback(&self, count: u64) {
491 self.metrics.txns_rolled_back.fetch_add(count, Ordering::Relaxed);
492 }
493
494 pub fn health_status(&self) -> HealthStatus {
496 let phase = self.current_phase();
497 let progress = self.current_progress();
498
499 HealthStatus {
500 phase,
501 phase_name: phase.name().to_string(),
502 is_ready: phase.is_ready(),
503 is_alive: phase.is_alive(),
504 is_booting: phase.is_booting(),
505 progress_percent: progress.percent,
506 progress_message: progress.message,
507 phase_elapsed_ms: progress.elapsed.as_millis() as u64,
508 total_elapsed_ms: self.total_elapsed().as_millis() as u64,
509 remaining_budget_ms: self.remaining_budget().as_millis() as u64,
510 failure_reason: self.failure_reason(),
511 wal_records_replayed: self.metrics.wal_records_replayed.load(Ordering::Relaxed),
512 wal_bytes_processed: self.metrics.wal_bytes_processed.load(Ordering::Relaxed),
513 }
514 }
515}
516
/// Point-in-time health report built by `BootStateMachine::health_status`.
#[derive(Debug, Clone)]
pub struct HealthStatus {
    /// Phase at the time of the report.
    pub phase: BootPhase,
    /// `phase.name()`, pre-rendered for serialization.
    pub phase_name: String,
    /// `phase.is_ready()`.
    pub is_ready: bool,
    /// `phase.is_alive()`.
    pub is_alive: bool,
    /// `phase.is_booting()`.
    pub is_booting: bool,
    /// Last reported completion percentage of the current phase.
    pub progress_percent: u8,
    /// Last reported progress message of the current phase.
    pub progress_message: String,
    /// Milliseconds spent in the current phase.
    pub phase_elapsed_ms: u64,
    /// Milliseconds since `start_boot` (0 if the boot has not started).
    pub total_elapsed_ms: u64,
    /// Milliseconds left in the current phase's budget (0 when none or exhausted).
    pub remaining_budget_ms: u64,
    /// Recorded failure reason, if any.
    pub failure_reason: Option<String>,
    /// Value of `BootMetrics::wal_records_replayed` at report time.
    pub wal_records_replayed: u64,
    /// Value of `BootMetrics::wal_bytes_processed` at report time.
    pub wal_bytes_processed: u64,
}
534
535impl HealthStatus {
536 pub fn to_json(&self) -> String {
538 format!(
539 r#"{{"phase":"{}","is_ready":{},"is_alive":{},"is_booting":{},"progress_percent":{},"progress_message":"{}","phase_elapsed_ms":{},"total_elapsed_ms":{},"remaining_budget_ms":{},"failure_reason":{},"wal_records_replayed":{},"wal_bytes_processed":{}}}"#,
540 self.phase_name,
541 self.is_ready,
542 self.is_alive,
543 self.is_booting,
544 self.progress_percent,
545 self.progress_message.replace('"', "\\\""),
546 self.phase_elapsed_ms,
547 self.total_elapsed_ms,
548 self.remaining_budget_ms,
549 self.failure_reason.as_ref().map(|s| format!("\"{}\"", s.replace('"', "\\\""))).unwrap_or_else(|| "null".to_string()),
550 self.wal_records_replayed,
551 self.wal_bytes_processed,
552 )
553 }
554}
555
/// Drives a `BootStateMachine` through the boot phases using caller-supplied callbacks.
pub struct BootOrchestrator {
    /// Shared state machine; handed out via `fsm()` so other components can observe it.
    fsm: Arc<BootStateMachine>,
}
560
561impl BootOrchestrator {
562 pub fn new(budgets: BootBudgets) -> Self {
564 Self {
565 fsm: Arc::new(BootStateMachine::new(budgets)),
566 }
567 }
568
569 pub fn fsm(&self) -> Arc<BootStateMachine> {
571 self.fsm.clone()
572 }
573
574 pub fn run_boot<I, M, R, W>(
576 &self,
577 recovery_mode: RecoveryMode,
578 init_fn: I,
579 migrate_fn: M,
580 recover_fn: R,
581 warmup_fn: W,
582 ) -> Result<(), BootError>
583 where
584 I: FnOnce(&BootStateMachine) -> Result<(), BootError>,
585 M: FnOnce(&BootStateMachine) -> Result<(), BootError>,
586 R: FnOnce(&BootStateMachine) -> Result<PreloadHints, BootError>,
587 W: FnOnce(&BootStateMachine, PreloadHints) -> Result<(), BootError>,
588 {
589 self.fsm.start_boot(recovery_mode)?;
591
592 init_fn(&self.fsm)?;
594 self.fsm.transition_to(BootPhase::Migrate)?;
595
596 migrate_fn(&self.fsm)?;
598 let next_phase = match recovery_mode {
599 RecoveryMode::Normal => BootPhase::Recover,
600 RecoveryMode::ReadOnly => BootPhase::ReadOnlyRecovery,
601 RecoveryMode::Force => BootPhase::ForceRecovery,
602 };
603 self.fsm.transition_to(next_phase)?;
604
605 let hints = recover_fn(&self.fsm)?;
607 self.fsm.set_preload_hints(hints.clone());
608
609 if hints.working_set_bytes > 0 || !hints.indexes.is_empty() {
611 self.fsm.transition_to(BootPhase::Warmup)?;
612 warmup_fn(&self.fsm, hints)?;
613 }
614
615 self.fsm.transition_to(BootPhase::Ready)?;
617
618 Ok(())
619 }
620
621 pub fn shutdown(&self) -> Result<(), BootError> {
623 self.fsm.transition_to(BootPhase::ShuttingDown)
624 }
625}
626
#[cfg(test)]
mod tests {
    use super::*;

    /// Walks the happy path Init -> Migrate -> Recover -> Warmup -> Ready.
    #[test]
    fn test_boot_fsm_transitions() {
        let machine = BootStateMachine::with_defaults();

        let started = machine.start_boot(RecoveryMode::Normal);
        assert!(started.is_ok());
        assert_eq!(machine.current_phase(), BootPhase::Init);

        let path = [
            BootPhase::Migrate,
            BootPhase::Recover,
            BootPhase::Warmup,
            BootPhase::Ready,
        ];
        for &next in path.iter() {
            assert!(
                machine.transition_to(next).is_ok(),
                "transition to {} was rejected",
                next.name()
            );
        }

        assert!(machine.is_ready());
        assert!(machine.is_alive());
    }

    /// Init may not jump straight to Ready.
    #[test]
    fn test_invalid_transition() {
        let machine = BootStateMachine::with_defaults();
        machine.start_boot(RecoveryMode::Normal).unwrap();

        let outcome = machine.transition_to(BootPhase::Ready);
        assert!(outcome.is_err());
    }

    /// A freshly started boot reports alive, booting, not ready, in `init`.
    #[test]
    fn test_health_status() {
        let machine = BootStateMachine::with_defaults();
        machine.start_boot(RecoveryMode::Normal).unwrap();

        let HealthStatus {
            is_ready,
            is_alive,
            is_booting,
            phase_name,
            ..
        } = machine.health_status();
        assert!(!is_ready);
        assert!(is_alive);
        assert!(is_booting);
        assert_eq!(phase_name, "init");
    }

    /// WAL progress is accumulated into the metrics counters.
    #[test]
    fn test_progress_tracking() {
        let machine = BootStateMachine::with_defaults();
        machine.start_boot(RecoveryMode::Normal).unwrap();

        machine.record_wal_progress(100, 4096);
        let metrics = machine.metrics();
        assert_eq!(metrics.wal_records_replayed.load(Ordering::Relaxed), 100);
        assert_eq!(metrics.wal_bytes_processed.load(Ordering::Relaxed), 4096);
    }

    /// The Kubernetes split gives recovery the lion's share and keeps the full window.
    #[test]
    fn test_kubernetes_budgets() {
        let budgets = BootBudgets::for_kubernetes(600);
        assert!(budgets.recover_budget >= Duration::from_secs(300));
        assert_eq!(budgets.total_budget, Duration::from_secs(600));
    }
}