1use std::collections::BTreeMap;
11use std::sync::Mutex;
12use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
13use std::time::Instant;
14
15use schema_core::IndexName;
16use serde::Serialize;
17
18fn lock<T>(mutex: &Mutex<T>) -> std::sync::MutexGuard<'_, T> {
22 mutex
23 .lock()
24 .unwrap_or_else(std::sync::PoisonError::into_inner)
25}
26
27#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
29#[serde(rename_all = "snake_case")]
30pub enum Phase {
31 Starting,
33 Backfilling,
35 Live,
37 Stopped,
39}
40
41#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
43#[serde(rename_all = "snake_case")]
44pub enum IndexState {
45 Pending,
47 Backfilling,
49 Seeded,
51}
52
53#[derive(Debug)]
56pub struct Status {
57 started_at: Instant,
58 phase: Mutex<Phase>,
59 indexes: Mutex<BTreeMap<IndexName, IndexState>>,
60 changes_captured: AtomicU64,
61 changes_committed: AtomicU64,
62 documents_built: AtomicU64,
63 documents_quarantined: AtomicU64,
64 batches: AtomicU64,
65 last_flush_micros: AtomicU64,
66 slot_lag_bytes: AtomicU64,
67 slot_lag_known: AtomicBool,
68 errors: AtomicU64,
69 last_error: Mutex<Option<String>>,
70}
71
72impl Status {
73 pub fn new(indexes: impl IntoIterator<Item = IndexName>, now: Instant) -> Self {
76 let indexes = indexes
77 .into_iter()
78 .map(|index| (index, IndexState::Pending))
79 .collect();
80 Self {
81 started_at: now,
82 phase: Mutex::new(Phase::Starting),
83 indexes: Mutex::new(indexes),
84 changes_captured: AtomicU64::new(0),
85 changes_committed: AtomicU64::new(0),
86 documents_built: AtomicU64::new(0),
87 documents_quarantined: AtomicU64::new(0),
88 batches: AtomicU64::new(0),
89 last_flush_micros: AtomicU64::new(0),
90 slot_lag_bytes: AtomicU64::new(0),
91 slot_lag_known: AtomicBool::new(false),
92 errors: AtomicU64::new(0),
93 last_error: Mutex::new(None),
94 }
95 }
96
97 pub(crate) fn set_phase(&self, phase: Phase) {
98 *lock(&self.phase) = phase;
99 }
100
101 pub(crate) fn mark_backfilling(&self, indexes: &[IndexName]) {
102 let mut map = lock(&self.indexes);
103 for index in indexes {
104 map.insert(index.clone(), IndexState::Backfilling);
105 }
106 }
107
108 pub(crate) fn mark_seeded(&self, index: &IndexName) {
109 lock(&self.indexes).insert(index.clone(), IndexState::Seeded);
110 }
111
112 pub(crate) fn mark_all_seeded(&self) {
116 for state in lock(&self.indexes).values_mut() {
117 if *state != IndexState::Seeded {
118 *state = IndexState::Seeded;
119 }
120 }
121 }
122
123 pub(crate) fn record_capture(&self) {
124 self.changes_captured.fetch_add(1, Ordering::Relaxed);
125 }
126
127 pub(crate) fn record_commit(&self, changes: u64, documents: u64, flush_micros: u64) {
128 self.changes_committed.fetch_add(changes, Ordering::Relaxed);
129 self.documents_built.fetch_add(documents, Ordering::Relaxed);
130 self.batches.fetch_add(1, Ordering::Relaxed);
131 self.last_flush_micros
132 .store(flush_micros, Ordering::Relaxed);
133 }
134
135 pub fn in_flight(&self) -> u64 {
139 self.changes_captured
140 .load(Ordering::Relaxed)
141 .saturating_sub(self.changes_committed.load(Ordering::Relaxed))
142 }
143
144 pub(crate) fn record_quarantine(&self) {
145 self.documents_quarantined.fetch_add(1, Ordering::Relaxed);
146 }
147
148 pub(crate) fn record_lag(&self, bytes: u64) {
149 self.slot_lag_bytes.store(bytes, Ordering::Relaxed);
150 self.slot_lag_known.store(true, Ordering::Relaxed);
151 }
152
153 pub(crate) fn record_error(&self, error: &str) {
154 self.errors.fetch_add(1, Ordering::Relaxed);
155 *lock(&self.last_error) = Some(error.to_owned());
156 }
157
158 pub fn snapshot(&self) -> StatusSnapshot {
160 let captured = self.changes_captured.load(Ordering::Relaxed);
161 let committed = self.changes_committed.load(Ordering::Relaxed);
162 StatusSnapshot {
163 phase: *lock(&self.phase),
164 uptime_seconds: self.started_at.elapsed().as_secs(),
165 indexes: lock(&self.indexes)
166 .iter()
167 .map(|(name, state)| (name.as_ref().to_owned(), *state))
168 .collect(),
169 changes_captured: captured,
170 changes_committed: committed,
171 changes_in_flight: captured.saturating_sub(committed),
172 documents_built: self.documents_built.load(Ordering::Relaxed),
173 documents_quarantined: self.documents_quarantined.load(Ordering::Relaxed),
174 batches: self.batches.load(Ordering::Relaxed),
175 last_flush_micros: self.last_flush_micros.load(Ordering::Relaxed),
176 slot_lag_bytes: self
177 .slot_lag_known
178 .load(Ordering::Relaxed)
179 .then(|| self.slot_lag_bytes.load(Ordering::Relaxed)),
180 errors: self.errors.load(Ordering::Relaxed),
181 last_error: lock(&self.last_error).clone(),
182 }
183 }
184}
185
186#[derive(Debug, Clone, Serialize)]
188pub struct StatusSnapshot {
189 pub phase: Phase,
190 pub uptime_seconds: u64,
191 pub indexes: BTreeMap<String, IndexState>,
192 pub changes_captured: u64,
193 pub changes_committed: u64,
194 pub changes_in_flight: u64,
195 pub documents_built: u64,
196 pub documents_quarantined: u64,
199 pub batches: u64,
200 pub last_flush_micros: u64,
201 pub slot_lag_bytes: Option<u64>,
204 pub errors: u64,
205 pub last_error: Option<String>,
206}