// reddb_server/runtime/lifecycle.rs

//! Process lifecycle state machine (PLAN.md Phase 1 — Lifecycle Contract).
//!
//! Tracks where the runtime is in its boot/serve/shutdown sequence so
//! that:
//! * health probes can answer `/health/live`, `/health/ready`, and
//!   `/health/startup` deterministically (live: process responsive;
//!   ready: accepting queries; startup: K8s-style "still warming up");
//! * `WriteGate` can reject mutations once shutdown is initiated;
//! * `POST /admin/shutdown` is idempotent — subsequent calls return
//!   the same successful state without re-running the flush pipeline;
//! * orchestrators see a consistent transition pattern regardless of
//!   how shutdown was triggered (HTTP, SIGTERM, SIGINT).
//!
//! The phase is a single `AtomicU8` inside `Lifecycle` (which is held in
//! `RuntimeInner`), alongside a `parking_lot::RwLock<LifecycleReport>`
//! that carries the optional not-ready reason and the optional final
//! `ShutdownReport`. Phase transitions are monotonic — the runtime
//! cannot move backwards through them.

19use std::sync::atomic::{AtomicU64, AtomicU8, Ordering};
20
21use parking_lot::RwLock;
22
/// Discrete lifecycle phases, ordered from boot to exit.
///
/// The discriminants are contiguous and strictly increasing, so a phase
/// fits in an `AtomicU8` and a single `compare_exchange` performs a
/// transition without a mutex. Decode a raw byte with [`Phase::from_u8`].
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum Phase {
    /// Engine is opening (WAL replay, restore-from-remote, initial
    /// checkpoint); neither reads nor writes are served yet.
    Starting = 0,
    /// Engine is fully up and every public surface accepts traffic.
    Ready = 1,
    /// `/admin/drain` was called or `/admin/shutdown` is in flight.
    /// New writes get 503; work already in flight may still finish.
    Draining = 2,
    /// `Engine::graceful_shutdown` is running its final flush,
    /// checkpoint and optional backup; the runtime no longer takes writes.
    ShuttingDown = 3,
    /// Shutdown has completed and the process may exit safely.
    Stopped = 4,
}

impl Phase {
    /// Decode a raw phase byte.
    ///
    /// Any value outside `0..=4` decodes to `Stopped`, the terminal phase.
    pub fn from_u8(v: u8) -> Self {
        match v {
            0 => Self::Starting,
            1 => Self::Ready,
            2 => Self::Draining,
            3 => Self::ShuttingDown,
            _ => Self::Stopped,
        }
    }

    /// Lowercase, snake_case label for this phase.
    pub fn as_str(self) -> &'static str {
        match self {
            Self::Starting => "starting",
            Self::Ready => "ready",
            Self::Draining => "draining",
            Self::ShuttingDown => "shutting_down",
            Self::Stopped => "stopped",
        }
    }

    /// Lifecycle half of the write check: only `Ready` admits public
    /// mutations. Replica / read-only restrictions are enforced
    /// separately by `WriteGate`.
    pub fn accepts_writes(self) -> bool {
        self == Self::Ready
    }

    /// Whether SQL queries are answered in this phase (`Ready` or
    /// `Draining`); `/health/ready` follows this answer.
    pub fn accepts_queries(self) -> bool {
        match self {
            Self::Ready | Self::Draining => true,
            Self::Starting | Self::ShuttingDown | Self::Stopped => false,
        }
    }
}

/// Outcome of a successful graceful shutdown.
///
/// Built by the shutdown pipeline and handed to
/// `Lifecycle::finish_shutdown`, which stores it for later readers.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct ShutdownReport {
    /// Whether the final WAL flush step ran (as reported by the pipeline).
    pub flushed_wal: bool,
    /// Whether the final checkpoint step ran.
    pub final_checkpoint: bool,
    /// Whether the optional backup was uploaded.
    pub backup_uploaded: bool,
    /// Total shutdown duration in milliseconds.
    pub duration_ms: u64,
    /// Shutdown start time; presumably unix-ms like the other
    /// timestamps in this module — confirm with the producer.
    pub started_at_ms: u64,
    /// Shutdown completion time; presumably unix-ms — confirm.
    pub completed_at_ms: u64,
}

92/// Lifecycle state wrapper held in `RuntimeInner`.
93pub struct Lifecycle {
94    phase: AtomicU8,
95    /// Wall-clock millis at runtime construction. Used by /health/*
96    /// endpoints to report uptime + cold-start cost.
97    started_at_ms: u64,
98    /// Wall-clock millis when the runtime entered `Ready`. 0 while
99    /// still starting. Read by `/health/ready` to expose the cold-
100    /// start window.
101    ready_at_ms: AtomicU64,
102    /// PLAN.md Phase 9.1 — cold-start phase markers. Each AtomicU64
103    /// is the wall-clock unix-ms when the runtime transitioned into
104    /// the named phase. `0` until the phase fires. Operators tune
105    /// the cold-start budget by reading the deltas through the
106    /// `cold_start_phases()` accessor.
107    restore_started_at_ms: AtomicU64,
108    restore_ready_at_ms: AtomicU64,
109    wal_replay_started_at_ms: AtomicU64,
110    wal_replay_ready_at_ms: AtomicU64,
111    index_warmup_started_at_ms: AtomicU64,
112    index_warmup_ready_at_ms: AtomicU64,
113    /// Reason string when /health/ready returns 503. Empty when
114    /// ready or stopped. Updated under `report` write lock so
115    /// readers of the JSON status see a consistent pair of phase +
116    /// reason.
117    report: RwLock<LifecycleReport>,
118}
119
/// PLAN.md Phase 9.1 — cold-start phase snapshot. All fields are
/// wall-clock unix-ms; `0` means the phase hasn't fired yet.
#[derive(Debug, Default, Clone, Copy)]
pub struct ColdStartPhases {
    pub started_at_ms: u64,
    pub restore_started_at_ms: u64,
    pub restore_ready_at_ms: u64,
    pub wal_replay_started_at_ms: u64,
    pub wal_replay_ready_at_ms: u64,
    pub index_warmup_started_at_ms: u64,
    pub index_warmup_ready_at_ms: u64,
    pub ready_at_ms: u64,
}

impl ColdStartPhases {
    /// `(phase_name, duration_ms)` pairs for /metrics, in boot order:
    /// `restore`, `wal_replay`, `index_warmup`, then `total`.
    ///
    /// A pair is emitted only when both its start and end markers have
    /// fired (are non-zero) and the end is not earlier than the start, so
    /// the subtraction never underflows. `total` (from `started_at_ms` to
    /// `ready_at_ms`) follows the same rule; previously it ignored an
    /// unset (`0`) start and reported the raw `ready_at_ms` as a duration.
    pub fn durations_ms(&self) -> Vec<(&'static str, u64)> {
        // Elapsed ms between two markers, or `None` if either marker is
        // unset or the clock went backwards between them.
        fn span(start: u64, end: u64) -> Option<u64> {
            if start == 0 || end == 0 {
                None
            } else {
                end.checked_sub(start)
            }
        }

        let spans = [
            ("restore", self.restore_started_at_ms, self.restore_ready_at_ms),
            ("wal_replay", self.wal_replay_started_at_ms, self.wal_replay_ready_at_ms),
            (
                "index_warmup",
                self.index_warmup_started_at_ms,
                self.index_warmup_ready_at_ms,
            ),
            ("total", self.started_at_ms, self.ready_at_ms),
        ];
        spans
            .iter()
            .filter_map(|&(name, start, end)| span(start, end).map(|ms| (name, ms)))
            .collect()
    }
}

173#[derive(Debug, Default, Clone)]
174struct LifecycleReport {
175    not_ready_reason: Option<String>,
176    shutdown: Option<ShutdownReport>,
177}
178
179impl Lifecycle {
180    pub fn new() -> Self {
181        Self {
182            phase: AtomicU8::new(Phase::Starting as u8),
183            started_at_ms: now_ms(),
184            ready_at_ms: AtomicU64::new(0),
185            restore_started_at_ms: AtomicU64::new(0),
186            restore_ready_at_ms: AtomicU64::new(0),
187            wal_replay_started_at_ms: AtomicU64::new(0),
188            wal_replay_ready_at_ms: AtomicU64::new(0),
189            index_warmup_started_at_ms: AtomicU64::new(0),
190            index_warmup_ready_at_ms: AtomicU64::new(0),
191            report: RwLock::new(LifecycleReport::default()),
192        }
193    }
194
195    /// PLAN.md Phase 9.1 — mark a cold-start phase boundary. Boot
196    /// code calls these as it transitions through restore-from-
197    /// remote, WAL replay, and index warmup. Idempotent: only the
198    /// first call sets the timestamp; subsequent calls are no-ops
199    /// so a retried boot doesn't reset the gauge.
200    pub fn mark_restore_started(&self) {
201        let _ = self.restore_started_at_ms.compare_exchange(
202            0,
203            now_ms(),
204            Ordering::AcqRel,
205            Ordering::Acquire,
206        );
207    }
208    pub fn mark_restore_ready(&self) {
209        let _ = self.restore_ready_at_ms.compare_exchange(
210            0,
211            now_ms(),
212            Ordering::AcqRel,
213            Ordering::Acquire,
214        );
215    }
216    pub fn mark_wal_replay_started(&self) {
217        let _ = self.wal_replay_started_at_ms.compare_exchange(
218            0,
219            now_ms(),
220            Ordering::AcqRel,
221            Ordering::Acquire,
222        );
223    }
224    pub fn mark_wal_replay_ready(&self) {
225        let _ = self.wal_replay_ready_at_ms.compare_exchange(
226            0,
227            now_ms(),
228            Ordering::AcqRel,
229            Ordering::Acquire,
230        );
231    }
232    pub fn mark_index_warmup_started(&self) {
233        let _ = self.index_warmup_started_at_ms.compare_exchange(
234            0,
235            now_ms(),
236            Ordering::AcqRel,
237            Ordering::Acquire,
238        );
239    }
240    pub fn mark_index_warmup_ready(&self) {
241        let _ = self.index_warmup_ready_at_ms.compare_exchange(
242            0,
243            now_ms(),
244            Ordering::AcqRel,
245            Ordering::Acquire,
246        );
247    }
248
249    /// PLAN.md Phase 9.1 — backfill phase markers with an explicit
250    /// timestamp. Used when the runtime captures the wall-clock
251    /// before Lifecycle is constructible (e.g. before storage
252    /// open) and wants to replay it into the markers afterwards.
253    /// Idempotent (only sets when current value is 0).
254    pub fn set_restore_started_at_ms(&self, ms: u64) {
255        let _ =
256            self.restore_started_at_ms
257                .compare_exchange(0, ms, Ordering::AcqRel, Ordering::Acquire);
258    }
259    pub fn set_restore_ready_at_ms(&self, ms: u64) {
260        let _ =
261            self.restore_ready_at_ms
262                .compare_exchange(0, ms, Ordering::AcqRel, Ordering::Acquire);
263    }
264    pub fn set_wal_replay_started_at_ms(&self, ms: u64) {
265        let _ = self.wal_replay_started_at_ms.compare_exchange(
266            0,
267            ms,
268            Ordering::AcqRel,
269            Ordering::Acquire,
270        );
271    }
272    pub fn set_wal_replay_ready_at_ms(&self, ms: u64) {
273        let _ = self.wal_replay_ready_at_ms.compare_exchange(
274            0,
275            ms,
276            Ordering::AcqRel,
277            Ordering::Acquire,
278        );
279    }
280
281    /// Snapshot every cold-start marker in one read. Callers compute
282    /// per-phase deltas via `ColdStartPhases::durations_ms()`.
283    pub fn cold_start_phases(&self) -> ColdStartPhases {
284        ColdStartPhases {
285            started_at_ms: self.started_at_ms,
286            restore_started_at_ms: self.restore_started_at_ms.load(Ordering::Acquire),
287            restore_ready_at_ms: self.restore_ready_at_ms.load(Ordering::Acquire),
288            wal_replay_started_at_ms: self.wal_replay_started_at_ms.load(Ordering::Acquire),
289            wal_replay_ready_at_ms: self.wal_replay_ready_at_ms.load(Ordering::Acquire),
290            index_warmup_started_at_ms: self.index_warmup_started_at_ms.load(Ordering::Acquire),
291            index_warmup_ready_at_ms: self.index_warmup_ready_at_ms.load(Ordering::Acquire),
292            ready_at_ms: self.ready_at_ms.load(Ordering::Acquire),
293        }
294    }
295
296    pub fn phase(&self) -> Phase {
297        Phase::from_u8(self.phase.load(Ordering::Acquire))
298    }
299
300    pub fn started_at_ms(&self) -> u64 {
301        self.started_at_ms
302    }
303
304    pub fn ready_at_ms(&self) -> Option<u64> {
305        match self.ready_at_ms.load(Ordering::Acquire) {
306            0 => None,
307            v => Some(v),
308        }
309    }
310
311    pub fn not_ready_reason(&self) -> Option<String> {
312        self.report.read().not_ready_reason.clone()
313    }
314
315    pub fn shutdown_report(&self) -> Option<ShutdownReport> {
316        self.report.read().shutdown.clone()
317    }
318
319    /// Mark a transient "still starting, here's why" reason. Called
320    /// from boot stages so /health/ready 503 carries operator-useful
321    /// context (e.g. "wal_replay", "restore_from_remote",
322    /// "initial_checkpoint").
323    pub fn set_starting_reason(&self, reason: impl Into<String>) {
324        self.report.write().not_ready_reason = Some(reason.into());
325    }
326
327    /// Transition Starting → Ready. Idempotent: a second call leaves
328    /// the existing `ready_at_ms` untouched. Returns true if this
329    /// call effected the transition.
330    pub fn mark_ready(&self) -> bool {
331        let prev = self
332            .phase
333            .compare_exchange(
334                Phase::Starting as u8,
335                Phase::Ready as u8,
336                Ordering::AcqRel,
337                Ordering::Acquire,
338            )
339            .is_ok();
340        if prev {
341            self.ready_at_ms.store(now_ms(), Ordering::Release);
342            self.report.write().not_ready_reason = None;
343        }
344        prev
345    }
346
347    /// Transition Ready → Draining (best effort — already-Stopped
348    /// runtimes stay Stopped). Idempotent.
349    pub fn mark_draining(&self) {
350        let _ = self.phase.compare_exchange(
351            Phase::Ready as u8,
352            Phase::Draining as u8,
353            Ordering::AcqRel,
354            Ordering::Acquire,
355        );
356    }
357
358    /// Transition into ShuttingDown if not already past it. Returns
359    /// true if this call started the shutdown (caller should run the
360    /// actual flush pipeline); false if shutdown was already started
361    /// or finished by someone else (caller should poll for the
362    /// existing report instead).
363    pub fn begin_shutdown(&self) -> bool {
364        loop {
365            let current = self.phase.load(Ordering::Acquire);
366            let p = Phase::from_u8(current);
367            match p {
368                Phase::Starting | Phase::Ready | Phase::Draining => {
369                    if self
370                        .phase
371                        .compare_exchange(
372                            current,
373                            Phase::ShuttingDown as u8,
374                            Ordering::AcqRel,
375                            Ordering::Acquire,
376                        )
377                        .is_ok()
378                    {
379                        return true;
380                    }
381                    // raced with a concurrent transition; loop and re-check.
382                }
383                Phase::ShuttingDown | Phase::Stopped => return false,
384            }
385        }
386    }
387
388    /// Stamp the final report and move to Stopped. Called by
389    /// `Engine::graceful_shutdown` after the flush pipeline finishes.
390    pub fn finish_shutdown(&self, report: ShutdownReport) {
391        self.report.write().shutdown = Some(report);
392        self.phase.store(Phase::Stopped as u8, Ordering::Release);
393    }
394}
395
396fn now_ms() -> u64 {
397    crate::utils::now_unix_millis()
398}