1use std::sync::atomic::{AtomicU64, AtomicU8, Ordering};
20
21use parking_lot::RwLock;
22
23#[repr(u8)]
27#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
28pub enum Phase {
29 Starting = 0,
32 Ready = 1,
34 Draining = 2,
37 ShuttingDown = 3,
41 Stopped = 4,
43}
44
45impl Phase {
46 pub fn from_u8(v: u8) -> Self {
47 match v {
48 0 => Phase::Starting,
49 1 => Phase::Ready,
50 2 => Phase::Draining,
51 3 => Phase::ShuttingDown,
52 4 => Phase::Stopped,
53 _ => Phase::Stopped,
54 }
55 }
56
57 pub fn as_str(self) -> &'static str {
58 match self {
59 Phase::Starting => "starting",
60 Phase::Ready => "ready",
61 Phase::Draining => "draining",
62 Phase::ShuttingDown => "shutting_down",
63 Phase::Stopped => "stopped",
64 }
65 }
66
67 pub fn accepts_writes(self) -> bool {
71 matches!(self, Phase::Ready)
72 }
73
74 pub fn accepts_queries(self) -> bool {
77 matches!(self, Phase::Ready | Phase::Draining)
78 }
79}
80
81#[derive(Debug, Clone, Default, PartialEq, Eq)]
83pub struct ShutdownReport {
84 pub flushed_wal: bool,
85 pub final_checkpoint: bool,
86 pub backup_uploaded: bool,
87 pub duration_ms: u64,
88 pub started_at_ms: u64,
89 pub completed_at_ms: u64,
90}
91
92pub struct Lifecycle {
94 phase: AtomicU8,
95 started_at_ms: u64,
98 ready_at_ms: AtomicU64,
102 restore_started_at_ms: AtomicU64,
108 restore_ready_at_ms: AtomicU64,
109 wal_replay_started_at_ms: AtomicU64,
110 wal_replay_ready_at_ms: AtomicU64,
111 index_warmup_started_at_ms: AtomicU64,
112 index_warmup_ready_at_ms: AtomicU64,
113 report: RwLock<LifecycleReport>,
118}
119
120#[derive(Debug, Default, Clone, Copy)]
123pub struct ColdStartPhases {
124 pub started_at_ms: u64,
125 pub restore_started_at_ms: u64,
126 pub restore_ready_at_ms: u64,
127 pub wal_replay_started_at_ms: u64,
128 pub wal_replay_ready_at_ms: u64,
129 pub index_warmup_started_at_ms: u64,
130 pub index_warmup_ready_at_ms: u64,
131 pub ready_at_ms: u64,
132}
133
134impl ColdStartPhases {
135 pub fn durations_ms(&self) -> Vec<(&'static str, u64)> {
138 let mut out = Vec::with_capacity(4);
139 if self.restore_ready_at_ms >= self.restore_started_at_ms
140 && self.restore_started_at_ms > 0
141 && self.restore_ready_at_ms > 0
142 {
143 out.push((
144 "restore",
145 self.restore_ready_at_ms - self.restore_started_at_ms,
146 ));
147 }
148 if self.wal_replay_ready_at_ms >= self.wal_replay_started_at_ms
149 && self.wal_replay_started_at_ms > 0
150 && self.wal_replay_ready_at_ms > 0
151 {
152 out.push((
153 "wal_replay",
154 self.wal_replay_ready_at_ms - self.wal_replay_started_at_ms,
155 ));
156 }
157 if self.index_warmup_ready_at_ms >= self.index_warmup_started_at_ms
158 && self.index_warmup_started_at_ms > 0
159 && self.index_warmup_ready_at_ms > 0
160 {
161 out.push((
162 "index_warmup",
163 self.index_warmup_ready_at_ms - self.index_warmup_started_at_ms,
164 ));
165 }
166 if self.ready_at_ms >= self.started_at_ms && self.ready_at_ms > 0 {
167 out.push(("total", self.ready_at_ms - self.started_at_ms));
168 }
169 out
170 }
171}
172
173#[derive(Debug, Default, Clone)]
174struct LifecycleReport {
175 not_ready_reason: Option<String>,
176 shutdown: Option<ShutdownReport>,
177}
178
179impl Lifecycle {
180 pub fn new() -> Self {
181 Self {
182 phase: AtomicU8::new(Phase::Starting as u8),
183 started_at_ms: now_ms(),
184 ready_at_ms: AtomicU64::new(0),
185 restore_started_at_ms: AtomicU64::new(0),
186 restore_ready_at_ms: AtomicU64::new(0),
187 wal_replay_started_at_ms: AtomicU64::new(0),
188 wal_replay_ready_at_ms: AtomicU64::new(0),
189 index_warmup_started_at_ms: AtomicU64::new(0),
190 index_warmup_ready_at_ms: AtomicU64::new(0),
191 report: RwLock::new(LifecycleReport::default()),
192 }
193 }
194
195 pub fn mark_restore_started(&self) {
201 let _ = self.restore_started_at_ms.compare_exchange(
202 0,
203 now_ms(),
204 Ordering::AcqRel,
205 Ordering::Acquire,
206 );
207 }
208 pub fn mark_restore_ready(&self) {
209 let _ = self.restore_ready_at_ms.compare_exchange(
210 0,
211 now_ms(),
212 Ordering::AcqRel,
213 Ordering::Acquire,
214 );
215 }
216 pub fn mark_wal_replay_started(&self) {
217 let _ = self.wal_replay_started_at_ms.compare_exchange(
218 0,
219 now_ms(),
220 Ordering::AcqRel,
221 Ordering::Acquire,
222 );
223 }
224 pub fn mark_wal_replay_ready(&self) {
225 let _ = self.wal_replay_ready_at_ms.compare_exchange(
226 0,
227 now_ms(),
228 Ordering::AcqRel,
229 Ordering::Acquire,
230 );
231 }
232 pub fn mark_index_warmup_started(&self) {
233 let _ = self.index_warmup_started_at_ms.compare_exchange(
234 0,
235 now_ms(),
236 Ordering::AcqRel,
237 Ordering::Acquire,
238 );
239 }
240 pub fn mark_index_warmup_ready(&self) {
241 let _ = self.index_warmup_ready_at_ms.compare_exchange(
242 0,
243 now_ms(),
244 Ordering::AcqRel,
245 Ordering::Acquire,
246 );
247 }
248
249 pub fn set_restore_started_at_ms(&self, ms: u64) {
255 let _ =
256 self.restore_started_at_ms
257 .compare_exchange(0, ms, Ordering::AcqRel, Ordering::Acquire);
258 }
259 pub fn set_restore_ready_at_ms(&self, ms: u64) {
260 let _ =
261 self.restore_ready_at_ms
262 .compare_exchange(0, ms, Ordering::AcqRel, Ordering::Acquire);
263 }
264 pub fn set_wal_replay_started_at_ms(&self, ms: u64) {
265 let _ = self.wal_replay_started_at_ms.compare_exchange(
266 0,
267 ms,
268 Ordering::AcqRel,
269 Ordering::Acquire,
270 );
271 }
272 pub fn set_wal_replay_ready_at_ms(&self, ms: u64) {
273 let _ = self.wal_replay_ready_at_ms.compare_exchange(
274 0,
275 ms,
276 Ordering::AcqRel,
277 Ordering::Acquire,
278 );
279 }
280
281 pub fn cold_start_phases(&self) -> ColdStartPhases {
284 ColdStartPhases {
285 started_at_ms: self.started_at_ms,
286 restore_started_at_ms: self.restore_started_at_ms.load(Ordering::Acquire),
287 restore_ready_at_ms: self.restore_ready_at_ms.load(Ordering::Acquire),
288 wal_replay_started_at_ms: self.wal_replay_started_at_ms.load(Ordering::Acquire),
289 wal_replay_ready_at_ms: self.wal_replay_ready_at_ms.load(Ordering::Acquire),
290 index_warmup_started_at_ms: self.index_warmup_started_at_ms.load(Ordering::Acquire),
291 index_warmup_ready_at_ms: self.index_warmup_ready_at_ms.load(Ordering::Acquire),
292 ready_at_ms: self.ready_at_ms.load(Ordering::Acquire),
293 }
294 }
295
296 pub fn phase(&self) -> Phase {
297 Phase::from_u8(self.phase.load(Ordering::Acquire))
298 }
299
300 pub fn started_at_ms(&self) -> u64 {
301 self.started_at_ms
302 }
303
304 pub fn ready_at_ms(&self) -> Option<u64> {
305 match self.ready_at_ms.load(Ordering::Acquire) {
306 0 => None,
307 v => Some(v),
308 }
309 }
310
311 pub fn not_ready_reason(&self) -> Option<String> {
312 self.report.read().not_ready_reason.clone()
313 }
314
315 pub fn shutdown_report(&self) -> Option<ShutdownReport> {
316 self.report.read().shutdown.clone()
317 }
318
319 pub fn set_starting_reason(&self, reason: impl Into<String>) {
324 self.report.write().not_ready_reason = Some(reason.into());
325 }
326
327 pub fn mark_ready(&self) -> bool {
331 let prev = self
332 .phase
333 .compare_exchange(
334 Phase::Starting as u8,
335 Phase::Ready as u8,
336 Ordering::AcqRel,
337 Ordering::Acquire,
338 )
339 .is_ok();
340 if prev {
341 self.ready_at_ms.store(now_ms(), Ordering::Release);
342 self.report.write().not_ready_reason = None;
343 }
344 prev
345 }
346
347 pub fn mark_draining(&self) {
350 let _ = self.phase.compare_exchange(
351 Phase::Ready as u8,
352 Phase::Draining as u8,
353 Ordering::AcqRel,
354 Ordering::Acquire,
355 );
356 }
357
358 pub fn begin_shutdown(&self) -> bool {
364 loop {
365 let current = self.phase.load(Ordering::Acquire);
366 let p = Phase::from_u8(current);
367 match p {
368 Phase::Starting | Phase::Ready | Phase::Draining => {
369 if self
370 .phase
371 .compare_exchange(
372 current,
373 Phase::ShuttingDown as u8,
374 Ordering::AcqRel,
375 Ordering::Acquire,
376 )
377 .is_ok()
378 {
379 return true;
380 }
381 }
383 Phase::ShuttingDown | Phase::Stopped => return false,
384 }
385 }
386 }
387
388 pub fn finish_shutdown(&self, report: ShutdownReport) {
391 self.report.write().shutdown = Some(report);
392 self.phase.store(Phase::Stopped as u8, Ordering::Release);
393 }
394}
395
396fn now_ms() -> u64 {
397 crate::utils::now_unix_millis()
398}