1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
//! Process lifecycle state machine (PLAN.md Phase 1 — Lifecycle Contract).
//!
//! Tracks where the runtime is in its boot/serve/shutdown sequence so
//! that:
//! * health probes can answer `/health/live`, `/health/ready`,
//! `/health/startup` deterministically (live: process responsive,
//! ready: accepting queries, startup: K8s-style "still warming up");
//! * `WriteGate` can reject mutations once shutdown is initiated;
//! * `POST /admin/shutdown` is idempotent — subsequent calls return
//! the same successful state without re-running the flush pipeline;
//! * orchestrators see a consistent transition pattern regardless of
//! how shutdown was triggered (HTTP, SIGTERM, SIGINT).
//!
//! The state is a single `AtomicU8` inside `Lifecycle` (held by
//! `RuntimeInner`) plus a `parking_lot::RwLock<LifecycleReport>` that
//! carries the not-ready reason and the optional final
//! `ShutdownReport`. Phase transitions are monotonic — the runtime
//! cannot move backwards through them.
use std::sync::atomic::{AtomicU64, AtomicU8, Ordering};
use parking_lot::RwLock;
/// The ordered stages a runtime moves through between boot and exit.
///
/// Discriminants grow in lifecycle order, so the phase can live in an
/// `AtomicU8` and be advanced with `compare_exchange` rather than a
/// mutex. Use [`Phase::from_u8`] to turn the raw byte back into a phase.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum Phase {
    /// Boot in progress: WAL replay, restore-from-remote, initial
    /// checkpoint. Neither reads nor writes are served yet.
    Starting = 0,
    /// Boot finished; every public surface accepts traffic.
    Ready = 1,
    /// `/admin/drain` was called or `/admin/shutdown` is in flight.
    /// New writes get a 503; requests already in flight may finish.
    Draining = 2,
    /// `Engine::graceful_shutdown` is running its final flush,
    /// checkpoint and optional backup. Writes are no longer possible.
    ShuttingDown = 3,
    /// Shutdown completed; the process may exit safely.
    Stopped = 4,
}

impl Phase {
    /// Decode a raw discriminant back into a phase.
    ///
    /// Any byte that is not a known discriminant (`0..=4`) decodes as
    /// `Phase::Stopped`.
    pub fn from_u8(v: u8) -> Self {
        // Indexed by discriminant; must stay in declaration order.
        const BY_DISCRIMINANT: [Phase; 5] = [
            Phase::Starting,
            Phase::Ready,
            Phase::Draining,
            Phase::ShuttingDown,
            Phase::Stopped,
        ];
        BY_DISCRIMINANT
            .get(usize::from(v))
            .copied()
            .unwrap_or(Phase::Stopped)
    }

    /// Stable snake_case label for this phase.
    pub fn as_str(self) -> &'static str {
        match self {
            Phase::Stopped => "stopped",
            Phase::ShuttingDown => "shutting_down",
            Phase::Draining => "draining",
            Phase::Ready => "ready",
            Phase::Starting => "starting",
        }
    }

    /// Lifecycle-side answer to "may public mutations proceed?".
    ///
    /// Only `Ready` qualifies. Replica/read_only checks are a separate
    /// concern handled by `WriteGate`.
    pub fn accepts_writes(self) -> bool {
        self == Phase::Ready
    }

    /// Whether SQL queries can be answered in this phase; this is the
    /// answer /health/ready follows. True for `Ready` and `Draining`.
    pub fn accepts_queries(self) -> bool {
        match self {
            Phase::Ready | Phase::Draining => true,
            Phase::Starting | Phase::ShuttingDown | Phase::Stopped => false,
        }
    }
}
/// Final report produced by a successful graceful shutdown.
///
/// Handed to `Lifecycle::finish_shutdown` and read back through
/// `Lifecycle::shutdown_report`. The values are supplied by the caller;
/// nothing in this module computes them.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ShutdownReport {
/// Presumably whether the final WAL flush ran — TODO confirm against
/// `Engine::graceful_shutdown`.
pub flushed_wal: bool,
/// Presumably whether the final checkpoint was taken — TODO confirm.
pub final_checkpoint: bool,
/// Presumably whether the optional backup upload happened — TODO
/// confirm.
pub backup_uploaded: bool,
/// Shutdown duration in milliseconds (assumed to equal
/// `completed_at_ms - started_at_ms`; not enforced here).
pub duration_ms: u64,
/// Millisecond timestamp at which the shutdown started (assumed
/// wall-clock unix-ms like the other `*_at_ms` values in this file).
pub started_at_ms: u64,
/// Millisecond timestamp at which the shutdown completed (same
/// assumption as `started_at_ms`).
pub completed_at_ms: u64,
}
/// Lifecycle state wrapper held in `RuntimeInner`.
///
/// The phase and every timestamp except `started_at_ms` are atomics, so
/// they are read and advanced through `&self`. The free-form report
/// data lives behind a `RwLock`.
pub struct Lifecycle {
/// Current phase, stored as the `Phase` discriminant and decoded with
/// `Phase::from_u8`.
phase: AtomicU8,
/// Wall-clock millis at runtime construction. Used by /health/*
/// endpoints to report uptime + cold-start cost.
started_at_ms: u64,
/// Wall-clock millis when the runtime entered `Ready`. 0 while
/// still starting. Read by `/health/ready` to expose the cold-
/// start window.
ready_at_ms: AtomicU64,
/// PLAN.md Phase 9.1 — cold-start phase markers. Each AtomicU64
/// is the wall-clock unix-ms when the runtime transitioned into
/// the named phase. `0` until the phase fires. Operators tune
/// the cold-start budget by reading the deltas through the
/// `cold_start_phases()` accessor.
restore_started_at_ms: AtomicU64,
/// End of the restore stage; `0` until marked.
restore_ready_at_ms: AtomicU64,
/// Start of the WAL-replay stage; `0` until marked.
wal_replay_started_at_ms: AtomicU64,
/// End of the WAL-replay stage; `0` until marked.
wal_replay_ready_at_ms: AtomicU64,
/// Start of the index-warmup stage; `0` until marked.
index_warmup_started_at_ms: AtomicU64,
/// End of the index-warmup stage; `0` until marked.
index_warmup_ready_at_ms: AtomicU64,
/// Lock-guarded side data: the optional reason reported while
/// /health/ready returns 503, and the optional final shutdown
/// report. The reason is intended to be empty once the runtime is
/// ready or stopped. Both values are only mutated through the
/// write lock; the phase itself is a separate atomic and is not
/// updated under this lock.
report: RwLock<LifecycleReport>,
}
/// Point-in-time copy of the cold-start markers (PLAN.md Phase 9.1).
///
/// Every field is a wall-clock unix timestamp in milliseconds; `0`
/// means the corresponding marker has not fired yet.
#[derive(Debug, Default, Clone, Copy)]
pub struct ColdStartPhases {
    pub started_at_ms: u64,
    pub restore_started_at_ms: u64,
    pub restore_ready_at_ms: u64,
    pub wal_replay_started_at_ms: u64,
    pub wal_replay_ready_at_ms: u64,
    pub index_warmup_started_at_ms: u64,
    pub index_warmup_ready_at_ms: u64,
    pub ready_at_ms: u64,
}

impl ColdStartPhases {
    /// Build the `(phase_name, duration_ms)` list exported on /metrics.
    ///
    /// Entries appear in the fixed order `restore`, `wal_replay`,
    /// `index_warmup`, `total`. A stage is left out when either of its
    /// markers is still `0` or when its end precedes its start. `total`
    /// is left out until `ready_at_ms` is set, or when it precedes
    /// `started_at_ms`.
    pub fn durations_ms(&self) -> Vec<(&'static str, u64)> {
        // (label, start marker, end marker) for each boot stage.
        let stages = [
            (
                "restore",
                self.restore_started_at_ms,
                self.restore_ready_at_ms,
            ),
            (
                "wal_replay",
                self.wal_replay_started_at_ms,
                self.wal_replay_ready_at_ms,
            ),
            (
                "index_warmup",
                self.index_warmup_started_at_ms,
                self.index_warmup_ready_at_ms,
            ),
        ];

        let mut spans = Vec::with_capacity(4);
        spans.extend(stages.iter().filter_map(|&(label, began, ended)| {
            // `began > 0` plus `ended >= began` already implies
            // `ended > 0`, so both markers are known to have fired.
            (began > 0 && ended >= began).then(|| (label, ended - began))
        }));

        // Unlike the stages above, the start stamp is not required to
        // be non-zero here; only the ready stamp must have fired.
        if self.ready_at_ms > 0 && self.ready_at_ms >= self.started_at_ms {
            spans.push(("total", self.ready_at_ms - self.started_at_ms));
        }
        spans
    }
}
/// Side data kept behind the `report` lock of `Lifecycle`.
#[derive(Debug, Default, Clone)]
struct LifecycleReport {
/// Why the runtime is not ready yet. Written by
/// `Lifecycle::set_starting_reason` and reset to `None` by
/// `Lifecycle::mark_ready`.
not_ready_reason: Option<String>,
/// Final shutdown report; `None` until `Lifecycle::finish_shutdown`
/// stores one.
shutdown: Option<ShutdownReport>,
}
impl Lifecycle {
    /// Create a lifecycle in `Phase::Starting`.
    ///
    /// `started_at_ms` is stamped from the wall clock right away. Every
    /// other timestamp starts at `0` ("not fired yet") and the report
    /// starts empty.
    pub fn new() -> Self {
        Self {
            phase: AtomicU8::new(Phase::Starting as u8),
            started_at_ms: now_ms(),
            ready_at_ms: AtomicU64::new(0),
            restore_started_at_ms: AtomicU64::new(0),
            restore_ready_at_ms: AtomicU64::new(0),
            wal_replay_started_at_ms: AtomicU64::new(0),
            wal_replay_ready_at_ms: AtomicU64::new(0),
            index_warmup_started_at_ms: AtomicU64::new(0),
            index_warmup_ready_at_ms: AtomicU64::new(0),
            report: RwLock::new(LifecycleReport::default()),
        }
    }

    /// Write `ms` into `slot` only if the slot still holds `0`.
    ///
    /// Shared by every cold-start marker so the first writer wins and
    /// later calls are no-ops.
    fn stamp_if_unset(slot: &AtomicU64, ms: u64) {
        // A failed exchange means the marker was already stamped, which
        // is exactly the idempotent outcome wanted here.
        let _ = slot.compare_exchange(0, ms, Ordering::AcqRel, Ordering::Acquire);
    }

    /// PLAN.md Phase 9.1 — mark a cold-start phase boundary. Boot
    /// code calls these as it transitions through restore-from-
    /// remote, WAL replay, and index warmup. Idempotent: only the
    /// first call sets the timestamp; subsequent calls are no-ops
    /// so a retried boot doesn't reset the gauge.
    pub fn mark_restore_started(&self) {
        Self::stamp_if_unset(&self.restore_started_at_ms, now_ms());
    }

    /// Stamp the end of the restore stage with the current time (first
    /// call wins).
    pub fn mark_restore_ready(&self) {
        Self::stamp_if_unset(&self.restore_ready_at_ms, now_ms());
    }

    /// Stamp the start of WAL replay with the current time (first call
    /// wins).
    pub fn mark_wal_replay_started(&self) {
        Self::stamp_if_unset(&self.wal_replay_started_at_ms, now_ms());
    }

    /// Stamp the end of WAL replay with the current time (first call
    /// wins).
    pub fn mark_wal_replay_ready(&self) {
        Self::stamp_if_unset(&self.wal_replay_ready_at_ms, now_ms());
    }

    /// Stamp the start of index warmup with the current time (first
    /// call wins).
    pub fn mark_index_warmup_started(&self) {
        Self::stamp_if_unset(&self.index_warmup_started_at_ms, now_ms());
    }

    /// Stamp the end of index warmup with the current time (first call
    /// wins).
    pub fn mark_index_warmup_ready(&self) {
        Self::stamp_if_unset(&self.index_warmup_ready_at_ms, now_ms());
    }

    /// PLAN.md Phase 9.1 — backfill phase markers with an explicit
    /// timestamp. Used when the runtime captures the wall-clock
    /// before Lifecycle is constructible (e.g. before storage
    /// open) and wants to replay it into the markers afterwards.
    /// Idempotent (only sets when current value is 0).
    pub fn set_restore_started_at_ms(&self, ms: u64) {
        Self::stamp_if_unset(&self.restore_started_at_ms, ms);
    }

    /// Backfill the end-of-restore marker with `ms` (only if unset).
    pub fn set_restore_ready_at_ms(&self, ms: u64) {
        Self::stamp_if_unset(&self.restore_ready_at_ms, ms);
    }

    /// Backfill the start-of-WAL-replay marker with `ms` (only if
    /// unset).
    pub fn set_wal_replay_started_at_ms(&self, ms: u64) {
        Self::stamp_if_unset(&self.wal_replay_started_at_ms, ms);
    }

    /// Backfill the end-of-WAL-replay marker with `ms` (only if unset).
    pub fn set_wal_replay_ready_at_ms(&self, ms: u64) {
        Self::stamp_if_unset(&self.wal_replay_ready_at_ms, ms);
    }

    /// Snapshot every cold-start marker in one call. Callers compute
    /// per-phase deltas via `ColdStartPhases::durations_ms()`.
    ///
    /// Each marker is loaded individually, so the snapshot is not
    /// atomic across fields.
    pub fn cold_start_phases(&self) -> ColdStartPhases {
        ColdStartPhases {
            started_at_ms: self.started_at_ms,
            restore_started_at_ms: self.restore_started_at_ms.load(Ordering::Acquire),
            restore_ready_at_ms: self.restore_ready_at_ms.load(Ordering::Acquire),
            wal_replay_started_at_ms: self.wal_replay_started_at_ms.load(Ordering::Acquire),
            wal_replay_ready_at_ms: self.wal_replay_ready_at_ms.load(Ordering::Acquire),
            index_warmup_started_at_ms: self.index_warmup_started_at_ms.load(Ordering::Acquire),
            index_warmup_ready_at_ms: self.index_warmup_ready_at_ms.load(Ordering::Acquire),
            ready_at_ms: self.ready_at_ms.load(Ordering::Acquire),
        }
    }

    /// Current lifecycle phase.
    pub fn phase(&self) -> Phase {
        Phase::from_u8(self.phase.load(Ordering::Acquire))
    }

    /// Wall-clock millis captured when this `Lifecycle` was built.
    pub fn started_at_ms(&self) -> u64 {
        self.started_at_ms
    }

    /// Wall-clock millis at which `mark_ready` took effect, or `None`
    /// while the stored value is still `0`.
    pub fn ready_at_ms(&self) -> Option<u64> {
        match self.ready_at_ms.load(Ordering::Acquire) {
            0 => None,
            stamped => Some(stamped),
        }
    }

    /// Copy of the current not-ready reason, if one is set.
    pub fn not_ready_reason(&self) -> Option<String> {
        self.report.read().not_ready_reason.clone()
    }

    /// Copy of the final shutdown report, if `finish_shutdown` has
    /// stored one.
    pub fn shutdown_report(&self) -> Option<ShutdownReport> {
        self.report.read().shutdown.clone()
    }

    /// Mark a transient "still starting, here's why" reason. Called
    /// from boot stages so /health/ready 503 carries operator-useful
    /// context (e.g. "wal_replay", "restore_from_remote",
    /// "initial_checkpoint").
    ///
    /// The current phase is not checked: the reason is stored
    /// unconditionally and replaces any previous one.
    pub fn set_starting_reason(&self, reason: impl Into<String>) {
        self.report.write().not_ready_reason = Some(reason.into());
    }

    /// Transition Starting → Ready. Idempotent: a second call leaves
    /// the existing `ready_at_ms` untouched. Returns true if this
    /// call effected the transition.
    ///
    /// On success the ready timestamp is stamped and the not-ready
    /// reason is cleared.
    pub fn mark_ready(&self) -> bool {
        let transitioned = self
            .phase
            .compare_exchange(
                Phase::Starting as u8,
                Phase::Ready as u8,
                Ordering::AcqRel,
                Ordering::Acquire,
            )
            .is_ok();
        if transitioned {
            self.ready_at_ms.store(now_ms(), Ordering::Release);
            self.report.write().not_ready_reason = None;
        }
        transitioned
    }

    /// Transition Ready → Draining (best effort — already-Stopped
    /// runtimes stay Stopped). Idempotent: any phase other than
    /// `Ready` is left untouched.
    pub fn mark_draining(&self) {
        let _ = self.phase.compare_exchange(
            Phase::Ready as u8,
            Phase::Draining as u8,
            Ordering::AcqRel,
            Ordering::Acquire,
        );
    }

    /// Transition into ShuttingDown if not already past it. Returns
    /// true if this call started the shutdown (caller should run the
    /// actual flush pipeline); false if shutdown was already started
    /// or finished by someone else (caller should poll for the
    /// existing report instead).
    pub fn begin_shutdown(&self) -> bool {
        // `fetch_update` retries the closure whenever a concurrent
        // transition wins the race, so exactly one caller observes `Ok`.
        self.phase
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |raw| {
                match Phase::from_u8(raw) {
                    Phase::Starting | Phase::Ready | Phase::Draining => {
                        Some(Phase::ShuttingDown as u8)
                    }
                    Phase::ShuttingDown | Phase::Stopped => None,
                }
            })
            .is_ok()
    }

    /// Stamp the final report and move to Stopped. Called by
    /// `Engine::graceful_shutdown` after the flush pipeline finishes.
    ///
    /// Also clears the not-ready reason: a shutdown that began while
    /// the runtime was still `Starting` would otherwise leave a stale
    /// boot reason visible after the runtime has stopped.
    pub fn finish_shutdown(&self, report: ShutdownReport) {
        {
            let mut state = self.report.write();
            state.shutdown = Some(report);
            state.not_ready_reason = None;
        }
        // Publish the phase only after the report is in place, so a
        // reader that observes `Stopped` can also read the report.
        self.phase.store(Phase::Stopped as u8, Ordering::Release);
    }
}
/// Current timestamp in milliseconds, as reported by
/// `crate::utils::now_unix_millis`. Every timestamp in this module is
/// taken through this single function.
fn now_ms() -> u64 {
    let stamp: u64 = crate::utils::now_unix_millis();
    stamp
}