use std::collections::BTreeMap;
use std::sync::Mutex;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::time::Instant;
use schema_core::IndexName;
use serde::Serialize;
fn lock<T>(mutex: &Mutex<T>) -> std::sync::MutexGuard<'_, T> {
mutex
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum Phase {
Starting,
Backfilling,
Live,
Stopped,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum IndexState {
Pending,
Backfilling,
Seeded,
}
#[derive(Debug)]
pub struct Status {
started_at: Instant,
phase: Mutex<Phase>,
indexes: Mutex<BTreeMap<IndexName, IndexState>>,
changes_captured: AtomicU64,
changes_committed: AtomicU64,
documents_built: AtomicU64,
documents_quarantined: AtomicU64,
batches: AtomicU64,
last_flush_micros: AtomicU64,
slot_lag_bytes: AtomicU64,
slot_lag_known: AtomicBool,
errors: AtomicU64,
last_error: Mutex<Option<String>>,
}
impl Status {
pub fn new(indexes: impl IntoIterator<Item = IndexName>, now: Instant) -> Self {
let indexes = indexes
.into_iter()
.map(|index| (index, IndexState::Pending))
.collect();
Self {
started_at: now,
phase: Mutex::new(Phase::Starting),
indexes: Mutex::new(indexes),
changes_captured: AtomicU64::new(0),
changes_committed: AtomicU64::new(0),
documents_built: AtomicU64::new(0),
documents_quarantined: AtomicU64::new(0),
batches: AtomicU64::new(0),
last_flush_micros: AtomicU64::new(0),
slot_lag_bytes: AtomicU64::new(0),
slot_lag_known: AtomicBool::new(false),
errors: AtomicU64::new(0),
last_error: Mutex::new(None),
}
}
pub(crate) fn set_phase(&self, phase: Phase) {
*lock(&self.phase) = phase;
}
pub(crate) fn mark_backfilling(&self, indexes: &[IndexName]) {
let mut map = lock(&self.indexes);
for index in indexes {
map.insert(index.clone(), IndexState::Backfilling);
}
}
pub(crate) fn mark_seeded(&self, index: &IndexName) {
lock(&self.indexes).insert(index.clone(), IndexState::Seeded);
}
pub(crate) fn mark_all_seeded(&self) {
for state in lock(&self.indexes).values_mut() {
if *state != IndexState::Seeded {
*state = IndexState::Seeded;
}
}
}
pub(crate) fn record_capture(&self) {
self.changes_captured.fetch_add(1, Ordering::Relaxed);
}
pub(crate) fn record_commit(&self, changes: u64, documents: u64, flush_micros: u64) {
self.changes_committed.fetch_add(changes, Ordering::Relaxed);
self.documents_built.fetch_add(documents, Ordering::Relaxed);
self.batches.fetch_add(1, Ordering::Relaxed);
self.last_flush_micros
.store(flush_micros, Ordering::Relaxed);
}
pub fn in_flight(&self) -> u64 {
self.changes_captured
.load(Ordering::Relaxed)
.saturating_sub(self.changes_committed.load(Ordering::Relaxed))
}
pub(crate) fn record_quarantine(&self) {
self.documents_quarantined.fetch_add(1, Ordering::Relaxed);
}
pub(crate) fn record_lag(&self, bytes: u64) {
self.slot_lag_bytes.store(bytes, Ordering::Relaxed);
self.slot_lag_known.store(true, Ordering::Relaxed);
}
pub(crate) fn record_error(&self, error: &str) {
self.errors.fetch_add(1, Ordering::Relaxed);
*lock(&self.last_error) = Some(error.to_owned());
}
pub fn snapshot(&self) -> StatusSnapshot {
let captured = self.changes_captured.load(Ordering::Relaxed);
let committed = self.changes_committed.load(Ordering::Relaxed);
StatusSnapshot {
phase: *lock(&self.phase),
uptime_seconds: self.started_at.elapsed().as_secs(),
indexes: lock(&self.indexes)
.iter()
.map(|(name, state)| (name.as_ref().to_owned(), *state))
.collect(),
changes_captured: captured,
changes_committed: committed,
changes_in_flight: captured.saturating_sub(committed),
documents_built: self.documents_built.load(Ordering::Relaxed),
documents_quarantined: self.documents_quarantined.load(Ordering::Relaxed),
batches: self.batches.load(Ordering::Relaxed),
last_flush_micros: self.last_flush_micros.load(Ordering::Relaxed),
slot_lag_bytes: self
.slot_lag_known
.load(Ordering::Relaxed)
.then(|| self.slot_lag_bytes.load(Ordering::Relaxed)),
errors: self.errors.load(Ordering::Relaxed),
last_error: lock(&self.last_error).clone(),
}
}
}
#[derive(Debug, Clone, Serialize)]
pub struct StatusSnapshot {
pub phase: Phase,
pub uptime_seconds: u64,
pub indexes: BTreeMap<String, IndexState>,
pub changes_captured: u64,
pub changes_committed: u64,
pub changes_in_flight: u64,
pub documents_built: u64,
pub documents_quarantined: u64,
pub batches: u64,
pub last_flush_micros: u64,
pub slot_lag_bytes: Option<u64>,
pub errors: u64,
pub last_error: Option<String>,
}