Skip to main content

daemon/
status.rs

//! The live operational state of a running daemon.
//!
//! [`Status`] is the shared handle the crate's status observer (`StatusObserver`)
//! writes to as the engine emits events, and the HTTP `/status` endpoint reads
//! from. It holds only fast, lock-light state (atomics for counters, short-held
//! mutexes for the phase, the per-index map, and the last error), so updating it
//! never blocks the pipeline's hot path.
9
10use std::collections::BTreeMap;
11use std::sync::Mutex;
12use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
13use std::time::Instant;
14
15use schema_core::IndexName;
16use serde::Serialize;
17
/// Acquire `mutex`, tolerating poison.
///
/// If a writer panicked while holding the lock, the guard is pulled out of the
/// [`std::sync::PoisonError`] instead of propagating the failure: at worst the
/// status is slightly stale, never a downed endpoint. (`.lock().unwrap()` is
/// forbidden by the workspace lints anyway.)
fn lock<T>(mutex: &Mutex<T>) -> std::sync::MutexGuard<'_, T> {
    match mutex.lock() {
        Ok(guard) => guard,
        Err(poisoned) => poisoned.into_inner(),
    }
}
26
/// The pipeline's overall phase, in the order the engine moves through them.
///
/// Serialized in `snake_case` (e.g. `"backfilling"`) for the `/status` endpoint.
/// A freshly constructed [`Status`] starts in [`Phase::Starting`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum Phase {
    /// Starting up; indexes not yet ensured.
    Starting,
    /// Seeding one or more unseeded indexes from a snapshot.
    Backfilling,
    /// Following live changes.
    Live,
    /// The run has ended (clean stop or error).
    Stopped,
}
40
/// Where one index is in its lifecycle.
///
/// Serialized in `snake_case` (e.g. `"seeded"`) as the values of the `indexes`
/// map in [`StatusSnapshot`]. Every configured index starts as `Pending`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum IndexState {
    /// Not yet known to be seeded this run.
    Pending,
    /// Its initial backfill is in progress.
    Backfilling,
    /// Seeded — either backfilled this run or already seeded on start.
    Seeded,
}
52
/// Shared, mutable operational state. Cheap to update concurrently from the
/// observer; snapshotted to JSON for the `/status` endpoint.
///
/// Counters are atomics; the phase, per-index map, and last error sit behind
/// `Mutex`es that are always acquired through the poison-tolerant `lock` helper.
#[derive(Debug)]
pub struct Status {
    /// Instant uptime is measured from (the `now` passed to [`Status::new`]).
    started_at: Instant,
    /// Current pipeline phase; starts as [`Phase::Starting`].
    phase: Mutex<Phase>,
    /// Lifecycle state of every tracked index, keyed by index name.
    indexes: Mutex<BTreeMap<IndexName, IndexState>>,
    /// Number of `record_capture` calls.
    changes_captured: AtomicU64,
    /// Running total of the `changes` argument to `record_commit`.
    changes_committed: AtomicU64,
    /// Running total of the `documents` argument to `record_commit`.
    documents_built: AtomicU64,
    /// Number of `record_quarantine` calls.
    documents_quarantined: AtomicU64,
    /// Number of `record_commit` calls.
    batches: AtomicU64,
    /// `flush_micros` from the most recent `record_commit`; `0` before the first.
    last_flush_micros: AtomicU64,
    /// Most recent value passed to `record_lag`. Only meaningful once
    /// `slot_lag_known` is set (it holds `0` before then).
    slot_lag_bytes: AtomicU64,
    /// Set by the first `record_lag` call; until then lag is reported as unknown.
    slot_lag_known: AtomicBool,
    /// Number of `record_error` calls.
    errors: AtomicU64,
    /// Message from the most recent `record_error`, if any.
    last_error: Mutex<Option<String>>,
}
71
72impl Status {
73    /// A fresh status with every configured index `Pending`. `now` is the start
74    /// instant uptime is measured from.
75    pub fn new(indexes: impl IntoIterator<Item = IndexName>, now: Instant) -> Self {
76        let indexes = indexes
77            .into_iter()
78            .map(|index| (index, IndexState::Pending))
79            .collect();
80        Self {
81            started_at: now,
82            phase: Mutex::new(Phase::Starting),
83            indexes: Mutex::new(indexes),
84            changes_captured: AtomicU64::new(0),
85            changes_committed: AtomicU64::new(0),
86            documents_built: AtomicU64::new(0),
87            documents_quarantined: AtomicU64::new(0),
88            batches: AtomicU64::new(0),
89            last_flush_micros: AtomicU64::new(0),
90            slot_lag_bytes: AtomicU64::new(0),
91            slot_lag_known: AtomicBool::new(false),
92            errors: AtomicU64::new(0),
93            last_error: Mutex::new(None),
94        }
95    }
96
97    pub(crate) fn set_phase(&self, phase: Phase) {
98        *lock(&self.phase) = phase;
99    }
100
101    pub(crate) fn mark_backfilling(&self, indexes: &[IndexName]) {
102        let mut map = lock(&self.indexes);
103        for index in indexes {
104            map.insert(index.clone(), IndexState::Backfilling);
105        }
106    }
107
108    pub(crate) fn mark_seeded(&self, index: &IndexName) {
109        lock(&self.indexes).insert(index.clone(), IndexState::Seeded);
110    }
111
112    /// Reaching live capture means every index is seeded by definition, so any
113    /// still `Pending` (already seeded before this run, never backfilled here)
114    /// is promoted to `Seeded`.
115    pub(crate) fn mark_all_seeded(&self) {
116        for state in lock(&self.indexes).values_mut() {
117            if *state != IndexState::Seeded {
118                *state = IndexState::Seeded;
119            }
120        }
121    }
122
123    pub(crate) fn record_capture(&self) {
124        self.changes_captured.fetch_add(1, Ordering::Relaxed);
125    }
126
127    pub(crate) fn record_commit(&self, changes: u64, documents: u64, flush_micros: u64) {
128        self.changes_committed.fetch_add(changes, Ordering::Relaxed);
129        self.documents_built.fetch_add(documents, Ordering::Relaxed);
130        self.batches.fetch_add(1, Ordering::Relaxed);
131        self.last_flush_micros
132            .store(flush_micros, Ordering::Relaxed);
133    }
134
135    /// Changes captured but not yet committed — the queue/back-pressure signal.
136    /// A cheap two-atomic read, safe to call from a metrics collection thread
137    /// (e.g. an observable-gauge callback in the binary).
138    pub fn in_flight(&self) -> u64 {
139        self.changes_captured
140            .load(Ordering::Relaxed)
141            .saturating_sub(self.changes_committed.load(Ordering::Relaxed))
142    }
143
144    pub(crate) fn record_quarantine(&self) {
145        self.documents_quarantined.fetch_add(1, Ordering::Relaxed);
146    }
147
148    pub(crate) fn record_lag(&self, bytes: u64) {
149        self.slot_lag_bytes.store(bytes, Ordering::Relaxed);
150        self.slot_lag_known.store(true, Ordering::Relaxed);
151    }
152
153    pub(crate) fn record_error(&self, error: &str) {
154        self.errors.fetch_add(1, Ordering::Relaxed);
155        *lock(&self.last_error) = Some(error.to_owned());
156    }
157
158    /// A point-in-time, serializable view of the status for the HTTP endpoint.
159    pub fn snapshot(&self) -> StatusSnapshot {
160        let captured = self.changes_captured.load(Ordering::Relaxed);
161        let committed = self.changes_committed.load(Ordering::Relaxed);
162        StatusSnapshot {
163            phase: *lock(&self.phase),
164            uptime_seconds: self.started_at.elapsed().as_secs(),
165            indexes: lock(&self.indexes)
166                .iter()
167                .map(|(name, state)| (name.as_ref().to_owned(), *state))
168                .collect(),
169            changes_captured: captured,
170            changes_committed: committed,
171            changes_in_flight: captured.saturating_sub(committed),
172            documents_built: self.documents_built.load(Ordering::Relaxed),
173            documents_quarantined: self.documents_quarantined.load(Ordering::Relaxed),
174            batches: self.batches.load(Ordering::Relaxed),
175            last_flush_micros: self.last_flush_micros.load(Ordering::Relaxed),
176            slot_lag_bytes: self
177                .slot_lag_known
178                .load(Ordering::Relaxed)
179                .then(|| self.slot_lag_bytes.load(Ordering::Relaxed)),
180            errors: self.errors.load(Ordering::Relaxed),
181            last_error: lock(&self.last_error).clone(),
182        }
183    }
184}
185
/// A serializable snapshot of [`Status`], returned as JSON by `/status`.
#[derive(Debug, Clone, Serialize)]
pub struct StatusSnapshot {
    /// The pipeline's phase at snapshot time.
    pub phase: Phase,
    /// Whole seconds elapsed since the status's start instant.
    pub uptime_seconds: u64,
    /// Each tracked index's state, keyed by index name.
    pub indexes: BTreeMap<String, IndexState>,
    /// Total changes captured.
    pub changes_captured: u64,
    /// Total changes committed.
    pub changes_committed: u64,
    /// `changes_captured - changes_committed`, clamped at zero.
    pub changes_in_flight: u64,
    /// Total documents reported by committed batches.
    pub documents_built: u64,
    /// Documents the sink rejected and the engine skipped (failure policy
    /// `skip`). A non-zero value means data is being dropped — alert on it.
    pub documents_quarantined: u64,
    /// Number of committed batches.
    pub batches: u64,
    /// Flush time reported with the most recent committed batch (microseconds,
    /// per the name); `0` before the first commit.
    pub last_flush_micros: u64,
    /// `None` until the source first reports lag (e.g. the slot doesn't exist
    /// yet), otherwise bytes the destination trails the source by.
    pub slot_lag_bytes: Option<u64>,
    /// Total errors recorded.
    pub errors: u64,
    /// The most recently recorded error message, if any.
    pub last_error: Option<String>,
}