use std::collections::VecDeque;
use std::sync::Mutex;
use std::sync::atomic::{AtomicI64, AtomicU64, Ordering::Relaxed};
use std::time::Instant;
/// Sampling shift, presumably the default used for latency measurements (confirm with callers).
///
/// When passed as `shift` to [`should_sample`], a value of 6 selects seeds whose
/// low six bits are zero, i.e. one out of every 64 consecutive seed values.
pub const LATENCY_SAMPLE_SHIFT: u8 = 6;
/// Decides whether the operation identified by `seed` should be sampled.
///
/// Returns `true` when the low `shift` bits of `seed` are all zero, so a
/// sequentially increasing seed is selected once every `2^shift` values.
/// A `shift` of `0` selects every seed; a `shift` of 63 or more never selects
/// anything.
#[inline]
pub fn should_sample(seed: u64, shift: u8) -> bool {
    // "Low `shift` bits are zero" is the same as "at least `shift` trailing
    // zero bits"; a seed of 0 reports 64 trailing zeros and so always passes.
    shift < 63 && seed.trailing_zeros() >= u32::from(shift)
}
/// Captures a start timestamp only for sampled operations.
///
/// Returns `Some(Instant::now())` when [`should_sample`] accepts `seed` and
/// `shift`, and `None` otherwise; the clock is not read for unsampled seeds.
#[inline]
pub fn sampled_instant(seed: u64, shift: u8) -> Option<Instant> {
    if should_sample(seed, shift) {
        Some(Instant::now())
    } else {
        None
    }
}
/// Counter metrics accepted by `Observer::counter`.
///
/// Discriminants are implicit, so they run contiguously from zero in
/// declaration order. That value is what [`CounterMetric::idx`] returns and is
/// used to index per-metric arrays of length [`CounterMetric::COUNT`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum CounterMetric {
    TxnBegin,
    TxnCommit,
    TxnAbort,
    TxnConflictAbort,
    TxnRollback,
    TxnRetryAgain,
    TreeRetryAgain,
    TreeNodeSplit,
    TreeNodeMerge,
    TreeNodeConsolidate,
    WalAppend,
    WalSync,
    FlushOrphanDataStaged,
    FlushOrphanBlobStaged,
    FlushOrphanDataCleared,
    FlushOrphanBlobCleared,
    RecoveryRedoRecord,
    RecoveryUndoTxn,
    RecoveryWalTruncate,
    GcRun,
    GcWalRecycleFile,
    GcPendingBucketClean,
    GcScavengePageScan,
    GcScavengePageCompact,
    GcDataRewrite,
    GcBlobRewrite,
    GcDataObsoleteFile,
    GcBlobObsoleteFile,
    GcAbortCleanCheckpointBucket,
    GcAbortCleanWalFileOpen,
    FlowFgAdmissionWait,
}

impl CounterMetric {
    /// Number of variants; also the length of [`Self::ALL`].
    ///
    /// Must be bumped whenever a variant is added to the enum.
    pub const COUNT: usize = 31;

    /// Every variant, listed in discriminant order so that `ALL[i].idx() == i`.
    pub const ALL: [CounterMetric; Self::COUNT] = [
        Self::TxnBegin,
        Self::TxnCommit,
        Self::TxnAbort,
        Self::TxnConflictAbort,
        Self::TxnRollback,
        Self::TxnRetryAgain,
        Self::TreeRetryAgain,
        Self::TreeNodeSplit,
        Self::TreeNodeMerge,
        Self::TreeNodeConsolidate,
        Self::WalAppend,
        Self::WalSync,
        Self::FlushOrphanDataStaged,
        Self::FlushOrphanBlobStaged,
        Self::FlushOrphanDataCleared,
        Self::FlushOrphanBlobCleared,
        Self::RecoveryRedoRecord,
        Self::RecoveryUndoTxn,
        Self::RecoveryWalTruncate,
        Self::GcRun,
        Self::GcWalRecycleFile,
        Self::GcPendingBucketClean,
        Self::GcScavengePageScan,
        Self::GcScavengePageCompact,
        Self::GcDataRewrite,
        Self::GcBlobRewrite,
        Self::GcDataObsoleteFile,
        Self::GcBlobObsoleteFile,
        Self::GcAbortCleanCheckpointBucket,
        Self::GcAbortCleanWalFileOpen,
        Self::FlowFgAdmissionWait,
    ];

    /// Returns the zero-based discriminant, suitable as an array index.
    #[inline]
    pub const fn idx(self) -> usize {
        self as usize
    }
}

// Compile-time guard: `ALL` must hold each discriminant in `0..COUNT` exactly
// once and in order. A reordered, duplicated or misplaced entry fails the
// build instead of silently dropping a metric from iteration over `ALL`.
const _: () = {
    let mut i = 0;
    while i < CounterMetric::COUNT {
        assert!(CounterMetric::ALL[i] as usize == i);
        i += 1;
    }
};
/// Gauge metrics accepted by `Observer::gauge`.
///
/// Discriminants are implicit, so they run contiguously from zero in
/// declaration order. That value is what [`GaugeMetric::idx`] returns and is
/// used to index per-metric arrays of length [`GaugeMetric::COUNT`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum GaugeMetric {
    RecoveryDirtyEntries,
    RecoveryUndoEntries,
}

impl GaugeMetric {
    /// Number of variants; also the length of [`Self::ALL`].
    ///
    /// Must be bumped whenever a variant is added to the enum.
    pub const COUNT: usize = 2;

    /// Every variant, listed in discriminant order so that `ALL[i].idx() == i`.
    pub const ALL: [GaugeMetric; Self::COUNT] = [
        Self::RecoveryDirtyEntries,
        Self::RecoveryUndoEntries,
    ];

    /// Returns the zero-based discriminant, suitable as an array index.
    #[inline]
    pub const fn idx(self) -> usize {
        self as usize
    }
}

// Compile-time guard: `ALL` must hold each discriminant in `0..COUNT` exactly
// once and in order, so a misplaced entry fails the build.
const _: () = {
    let mut i = 0;
    while i < GaugeMetric::COUNT {
        assert!(GaugeMetric::ALL[i] as usize == i);
        i += 1;
    }
};
/// Histogram metrics accepted by `Observer::histogram`.
///
/// Discriminants are implicit, so they run contiguously from zero in
/// declaration order. That value is what [`HistogramMetric::idx`] returns and
/// is used to index per-metric arrays of length [`HistogramMetric::COUNT`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum HistogramMetric {
    TxnCommitMicros,
    TxnRollbackMicros,
    TreeLinkHoldMicros,
    WalAppendBytes,
    WalSyncMicros,
    RecoveryPhase2Micros,
    RecoveryAnalyzeMicros,
    RecoveryRedoMicros,
    RecoveryUndoMicros,
    GcRunMicros,
    GcScavengeMicros,
    GcDataRewriteMicros,
    GcBlobRewriteMicros,
    GcDataRewriteVictimFiles,
    GcBlobRewriteVictimFiles,
    FlowFgAdmissionWaitMicros,
}

impl HistogramMetric {
    /// Number of variants; also the length of [`Self::ALL`].
    ///
    /// Must be bumped whenever a variant is added to the enum.
    pub const COUNT: usize = 16;

    /// Every variant, listed in discriminant order so that `ALL[i].idx() == i`.
    pub const ALL: [HistogramMetric; Self::COUNT] = [
        Self::TxnCommitMicros,
        Self::TxnRollbackMicros,
        Self::TreeLinkHoldMicros,
        Self::WalAppendBytes,
        Self::WalSyncMicros,
        Self::RecoveryPhase2Micros,
        Self::RecoveryAnalyzeMicros,
        Self::RecoveryRedoMicros,
        Self::RecoveryUndoMicros,
        Self::GcRunMicros,
        Self::GcScavengeMicros,
        Self::GcDataRewriteMicros,
        Self::GcBlobRewriteMicros,
        Self::GcDataRewriteVictimFiles,
        Self::GcBlobRewriteVictimFiles,
        Self::FlowFgAdmissionWaitMicros,
    ];

    /// Returns the zero-based discriminant, suitable as an array index.
    #[inline]
    pub const fn idx(self) -> usize {
        self as usize
    }
}

// Compile-time guard: `ALL` must hold each discriminant in `0..COUNT` exactly
// once and in order, so a misplaced entry fails the build.
const _: () = {
    let mut i = 0;
    while i < HistogramMetric::COUNT {
        assert!(HistogramMetric::ALL[i] as usize == i);
        i += 1;
    }
};
/// Kind tag carried by an [`ObserveEvent`].
///
/// NOTE(review): variants appear to be grouped by subsystem prefix (`Txn`,
/// `FlushOrphan`, `Recovery`, `Gc`); the emission points are not visible in
/// this definition, so confirm exact semantics with the callers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum EventKind {
TxnConflictAbort,
TxnRollbackComplete,
FlushOrphanDataStaged,
FlushOrphanBlobStaged,
FlushOrphanDataCleared,
FlushOrphanBlobCleared,
RecoveryPhase2Begin,
RecoveryPhase2End,
GcPendingBucketCleaned,
GcDataRewriteComplete,
GcBlobRewriteComplete,
}
/// A single discrete event delivered to [`Observer::event`].
///
/// The type is `Copy`, so events are stored and snapshotted by value.
/// NOTE(review): which payload fields are meaningful presumably depends on
/// `kind`; confirm against the emitting call sites.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ObserveEvent {
/// What happened.
pub kind: EventKind,
/// Presumably the bucket the event relates to — TODO confirm.
pub bucket_id: u64,
/// Presumably the transaction the event relates to — TODO confirm.
pub txid: u64,
/// Presumably the file the event relates to — TODO confirm.
pub file_id: u64,
/// Kind-specific numeric payload; its interpretation is not defined here.
pub value: u64,
}
/// Aggregated summary of one histogram metric.
///
/// `InMemoryObserver` fills this in from its per-metric count, sum and max
/// accumulators; `Default` yields all zeros.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct HistogramSample {
/// Number of values recorded.
pub count: u64,
/// Sum of all recorded values.
pub sum: u64,
/// Largest recorded value, or 0 if nothing was recorded.
pub max: u64,
}
/// Point-in-time copy of everything an observer has collected.
///
/// As produced by `InMemoryObserver::snapshot`, each metric vector has one
/// entry per variant, in the order of the corresponding `ALL` table.
#[derive(Debug, Clone, Default)]
pub struct ObserveSnapshot {
/// Current value of every counter.
pub counters: Vec<(CounterMetric, u64)>,
/// Last stored value of every gauge.
pub gauges: Vec<(GaugeMetric, i64)>,
/// Count/sum/max summary of every histogram.
pub histograms: Vec<(HistogramMetric, HistogramSample)>,
/// Retained events, oldest first.
pub events: Vec<ObserveEvent>,
}
/// Sink for metrics and events.
///
/// Every method has an empty default body, so implementors override only the
/// signals they care about. Implementations must be `Send + Sync`, and all
/// methods take `&self`.
pub trait Observer: Send + Sync {
/// Reports `_delta` for the counter `_metric`. The default does nothing.
fn counter(&self, _metric: CounterMetric, _delta: u64) {}
/// Reports `_value` for the gauge `_metric`. The default does nothing.
fn gauge(&self, _metric: GaugeMetric, _value: i64) {}
/// Reports one observation `_value` for the histogram `_metric`. The default does nothing.
fn histogram(&self, _metric: HistogramMetric, _value: u64) {}
/// Reports a discrete event. The default does nothing.
fn event(&self, _event: ObserveEvent) {}
}
/// Records the time elapsed since `started` into `metric`, in microseconds.
///
/// Does nothing when `started` is `None`, which is what [`sampled_instant`]
/// returns for unsampled operations. Elapsed times that do not fit in a `u64`
/// are reported as `u64::MAX` rather than being silently truncated.
#[inline]
pub fn observe_elapsed(
    observer: &dyn Observer,
    metric: HistogramMetric,
    started: Option<Instant>,
) {
    if let Some(started) = started {
        // `as_micros` yields a `u128`; clamp first so the narrowing cast
        // below can never wrap around to a small value.
        let micros = started.elapsed().as_micros().min(u128::from(u64::MAX)) as u64;
        observer.histogram(metric, micros);
    }
}
/// Observer that discards everything.
///
/// It overrides none of the [`Observer`] methods, so every call hits the
/// trait's empty default body.
#[derive(Default)]
pub struct NoopObserver;
impl Observer for NoopObserver {}
/// Observer that accumulates metrics in atomics and keeps a bounded event log.
///
/// Each metric array is indexed by the corresponding metric's `idx()`.
pub struct InMemoryObserver {
// Running total per counter metric.
counters: [AtomicU64; CounterMetric::COUNT],
// Last value stored per gauge metric.
gauges: [AtomicI64; GaugeMetric::COUNT],
// Number of observations per histogram metric.
hist_count: [AtomicU64; HistogramMetric::COUNT],
// Sum of observed values per histogram metric.
hist_sum: [AtomicU64; HistogramMetric::COUNT],
// Largest observed value per histogram metric.
hist_max: [AtomicU64; HistogramMetric::COUNT],
// Retained events, oldest at the front.
events: Mutex<VecDeque<ObserveEvent>>,
// Maximum number of retained events; `new` ensures it is at least 1.
event_cap: usize,
}
impl InMemoryObserver {
pub fn new(event_cap: usize) -> Self {
Self {
counters: std::array::from_fn(|_| AtomicU64::new(0)),
gauges: std::array::from_fn(|_| AtomicI64::new(0)),
hist_count: std::array::from_fn(|_| AtomicU64::new(0)),
hist_sum: std::array::from_fn(|_| AtomicU64::new(0)),
hist_max: std::array::from_fn(|_| AtomicU64::new(0)),
events: Mutex::new(VecDeque::new()),
event_cap: event_cap.max(1),
}
}
pub fn snapshot(&self) -> ObserveSnapshot {
let mut counters = Vec::with_capacity(CounterMetric::COUNT);
for metric in CounterMetric::ALL {
counters.push((metric, self.counters[metric.idx()].load(Relaxed)));
}
let mut gauges = Vec::with_capacity(GaugeMetric::COUNT);
for metric in GaugeMetric::ALL {
gauges.push((metric, self.gauges[metric.idx()].load(Relaxed)));
}
let mut histograms = Vec::with_capacity(HistogramMetric::COUNT);
for metric in HistogramMetric::ALL {
let idx = metric.idx();
histograms.push((
metric,
HistogramSample {
count: self.hist_count[idx].load(Relaxed),
sum: self.hist_sum[idx].load(Relaxed),
max: self.hist_max[idx].load(Relaxed),
},
));
}
let events = self
.events
.lock()
.expect("observer events mutex poisoned")
.iter()
.copied()
.collect();
ObserveSnapshot {
counters,
gauges,
histograms,
events,
}
}
}
impl Default for InMemoryObserver {
    /// Builds an observer that retains up to 1024 events.
    fn default() -> Self {
        const DEFAULT_EVENT_CAP: usize = 1024;
        Self::new(DEFAULT_EVENT_CAP)
    }
}
impl Observer for InMemoryObserver {
    /// Adds `delta` to the running total of `metric`.
    fn counter(&self, metric: CounterMetric, delta: u64) {
        let slot = &self.counters[metric.idx()];
        slot.fetch_add(delta, Relaxed);
    }

    /// Overwrites the stored value of `metric` with `value`.
    fn gauge(&self, metric: GaugeMetric, value: i64) {
        let slot = &self.gauges[metric.idx()];
        slot.store(value, Relaxed);
    }

    /// Records one observation: bumps the count, adds to the sum and raises
    /// the maximum if `value` exceeds it.
    fn histogram(&self, metric: HistogramMetric, value: u64) {
        let i = metric.idx();
        self.hist_count[i].fetch_add(1, Relaxed);
        self.hist_sum[i].fetch_add(value, Relaxed);
        // Only attempt a write when `value` is a new maximum; returning `None`
        // leaves the stored value untouched, and a lost race retries against
        // the freshly observed value.
        let _ = self.hist_max[i].fetch_update(Relaxed, Relaxed, |seen| {
            if value > seen {
                Some(value)
            } else {
                None
            }
        });
    }

    /// Appends `event` to the log, evicting the oldest entry when full.
    ///
    /// # Panics
    ///
    /// Panics if the event mutex is poisoned.
    fn event(&self, event: ObserveEvent) {
        let mut log = self.events.lock().expect("observer events mutex poisoned");
        // Make room first so the log never grows past `event_cap`.
        if log.len() >= self.event_cap {
            log.pop_front();
        }
        log.push_back(event);
    }
}
#[cfg(test)]
mod test {
    use super::{
        CounterMetric, EventKind, GaugeMetric, HistogramMetric, InMemoryObserver, ObserveEvent,
        Observer,
    };

    /// Builds an event from positional fields to keep the test body short.
    fn make_event(
        kind: EventKind,
        bucket_id: u64,
        txid: u64,
        file_id: u64,
        value: u64,
    ) -> ObserveEvent {
        ObserveEvent {
            kind,
            bucket_id,
            txid,
            file_id,
            value,
        }
    }

    /// Exercises every signal once and checks the snapshot, including that a
    /// capacity of two keeps only the two newest of three events.
    #[test]
    fn snapshot_counts() {
        let observer = InMemoryObserver::new(2);

        observer.counter(CounterMetric::TxnBegin, 3);
        observer.gauge(GaugeMetric::RecoveryDirtyEntries, 7);
        observer.histogram(HistogramMetric::WalAppendBytes, 11);
        observer.histogram(HistogramMetric::WalAppendBytes, 19);
        observer.event(make_event(EventKind::RecoveryPhase2Begin, 0, 0, 0, 0));
        observer.event(make_event(EventKind::RecoveryPhase2End, 0, 0, 0, 1));
        observer.event(make_event(EventKind::TxnRollbackComplete, 1, 2, 3, 4));

        let snapshot = observer.snapshot();

        let begin = snapshot
            .counters
            .iter()
            .find_map(|&(metric, total)| {
                if metric == CounterMetric::TxnBegin {
                    Some(total)
                } else {
                    None
                }
            })
            .unwrap();
        assert_eq!(begin, 3);

        let dirty = snapshot
            .gauges
            .iter()
            .find_map(|&(metric, level)| {
                if metric == GaugeMetric::RecoveryDirtyEntries {
                    Some(level)
                } else {
                    None
                }
            })
            .unwrap();
        assert_eq!(dirty, 7);

        let wal = snapshot
            .histograms
            .iter()
            .find_map(|&(metric, sample)| {
                if metric == HistogramMetric::WalAppendBytes {
                    Some(sample)
                } else {
                    None
                }
            })
            .unwrap();
        assert_eq!(wal.count, 2);
        assert_eq!(wal.sum, 30);
        assert_eq!(wal.max, 19);

        // The first event was evicted; the remaining two stay in arrival order.
        let kinds: Vec<EventKind> = snapshot.events.iter().map(|e| e.kind).collect();
        assert_eq!(kinds.len(), 2);
        assert_eq!(kinds[0], EventKind::RecoveryPhase2End);
        assert_eq!(kinds[1], EventKind::TxnRollbackComplete);
    }
}