Skip to main content

wombatkv_node/
embed_metrics.rs

1#![forbid(unsafe_code)]
2//! Lightweight observability for the embeddable KV store.
3//!
//! Tracks per-operation latency samples (microseconds) and bytes moved.
//! Percentiles are computed on demand by sorting a bounded window of the
//! most recent samples per op, which is fine for the sample sizes we hit on
//! a single-engine warm path; for very high throughput an HDR/CKMS-style
//! sketch would be more appropriate.
8
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::sync::Mutex;
11
/// Operation tag used to attribute a metric observation.
///
/// A plain `Copy` enum, so recording on the hot path dispatches on a
/// discriminant instead of building or comparing a string.
///
/// `LoadFoyerRam` and `LoadFoyerSsd` break the legacy `LoadFoyer` bucket
/// down by the cache tier that served the hit. `LoadFoyer` is kept for
/// callers that cannot tell the tiers apart; new code paths should report
/// one of the tiered variants. `EmbedMetrics::observe` additionally counts
/// every tiered observation in the `LoadFoyer` bucket.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Op {
    Stash,
    LoadFoyer,
    LoadFoyerRam,
    LoadFoyerSsd,
    LoadS3,
    Miss,
    RestoreFromS3,
}

impl Op {
    /// Stable snake_case label for this op, used as the `op` field of
    /// snapshots and JSON-line reports.
    #[must_use]
    pub fn as_str(self) -> &'static str {
        match self {
            Op::Stash => "stash",
            Op::LoadFoyer => "load_foyer",
            Op::LoadFoyerRam => "load_foyer_ram",
            Op::LoadFoyerSsd => "load_foyer_ssd",
            Op::LoadS3 => "load_s3",
            Op::Miss => "miss",
            Op::RestoreFromS3 => "restore_from_s3",
        }
    }
}
43
/// Point-in-time copy of one op's counters and latency percentiles.
///
/// Every field is a plain integer apart from the `&'static str` label, so
/// cloning a snapshot is cheap.
#[derive(Debug, Clone, Default)]
pub struct OpSnapshot {
    pub op: &'static str,
    pub count: u64,
    pub bytes_total: u64,
    pub micros_total: u64,
    pub p10_us: u64,
    pub p25_us: u64,
    pub p50_us: u64,
    pub p90_us: u64,
    pub p95_us: u64,
    pub p99_us: u64,
    pub p99_9_us: u64,
    pub p99_99_us: u64,
    pub max_us: u64,
}

impl OpSnapshot {
    /// Bytes moved per second of time spent inside this op, in MiB/s
    /// (1 MiB = 1024 * 1024 bytes).
    ///
    /// Returns `0.0` when no time has been recorded, which also avoids a
    /// division by zero.
    #[must_use]
    pub fn throughput_mb_per_s(&self) -> f64 {
        const BYTES_PER_MIB: f64 = 1024.0 * 1024.0;
        const MICROS_PER_SEC: f64 = 1_000_000.0;

        match self.micros_total {
            0 => 0.0,
            micros => {
                let mebibytes = self.bytes_total as f64 / BYTES_PER_MIB;
                let seconds = micros as f64 / MICROS_PER_SEC;
                mebibytes / seconds
            }
        }
    }
}
73
74/// One per-op accumulator. Counters are atomic; samples live in a Mutex
75/// so we can compute exact percentiles on snapshot. Soft cap prevents
76/// unbounded growth.
77struct OpAccumulator {
78    op: Op,
79    count: AtomicU64,
80    bytes_total: AtomicU64,
81    micros_total: AtomicU64,
82    samples: Mutex<Vec<u32>>,
83    max_samples: usize,
84}
85
86impl OpAccumulator {
87    fn new(op: Op, max_samples: usize) -> Self {
88        Self {
89            op,
90            count: AtomicU64::new(0),
91            bytes_total: AtomicU64::new(0),
92            micros_total: AtomicU64::new(0),
93            samples: Mutex::new(Vec::with_capacity(max_samples.min(8192))),
94            max_samples,
95        }
96    }
97
98    fn observe(&self, micros: u64, bytes: u64) {
99        self.count.fetch_add(1, Ordering::Relaxed);
100        self.bytes_total.fetch_add(bytes, Ordering::Relaxed);
101        self.micros_total.fetch_add(micros, Ordering::Relaxed);
102        if let Ok(mut samples) = self.samples.lock() {
103            // Reservoir: when we exceed cap, replace oldest 25% so we
104            // retain a mix of recent + historical samples without growing
105            // unboundedly.
106            if samples.len() >= self.max_samples {
107                let drop = self.max_samples / 4;
108                samples.drain(0..drop);
109            }
110            samples.push(u32::try_from(micros).unwrap_or(u32::MAX));
111        }
112    }
113
114    fn snapshot(&self) -> OpSnapshot {
115        let mut samples = self.samples.lock().map(|guard| guard.clone()).unwrap_or_default();
116        samples.sort_unstable();
117        let count = self.count.load(Ordering::Relaxed);
118        let bytes_total = self.bytes_total.load(Ordering::Relaxed);
119        let micros_total = self.micros_total.load(Ordering::Relaxed);
120
121        OpSnapshot {
122            op: self.op.as_str(),
123            count,
124            bytes_total,
125            micros_total,
126            p10_us: percentile(&samples, 10.0),
127            p25_us: percentile(&samples, 25.0),
128            p50_us: percentile(&samples, 50.0),
129            p90_us: percentile(&samples, 90.0),
130            p95_us: percentile(&samples, 95.0),
131            p99_us: percentile(&samples, 99.0),
132            p99_9_us: percentile(&samples, 99.9),
133            p99_99_us: percentile(&samples, 99.99),
134            max_us: u64::from(samples.last().copied().unwrap_or(0)),
135        }
136    }
137}
138
/// Pick the `pct`-th percentile (nominally 0–100) from an ascending-sorted slice.
///
/// The position is `pct / 100 * (len - 1)`, rounded to the nearest index
/// (halves round away from zero) and clamped to the last element; negative
/// or NaN positions saturate to index 0. An empty slice yields 0.
fn percentile(sorted_samples: &[u32], pct: f64) -> u64 {
    let last = match sorted_samples.len().checked_sub(1) {
        Some(last) => last,
        None => return 0,
    };
    let position = last as f64 * (pct / 100.0);
    // Float-to-int `as` saturates, so out-of-range positions cannot wrap.
    let idx = (position.round() as usize).min(last);
    u64::from(sorted_samples[idx])
}
148
149/// Aggregate metrics across all ops. Cheap to clone; intentionally
150/// process-global so the engine and any cli-stats utility look at the
151/// same registry.
152pub struct EmbedMetrics {
153    stash: OpAccumulator,
154    load_foyer: OpAccumulator,
155    load_foyer_ram: OpAccumulator,
156    load_foyer_ssd: OpAccumulator,
157    load_s3: OpAccumulator,
158    miss: OpAccumulator,
159    restore_from_s3: OpAccumulator,
160}
161
162impl EmbedMetrics {
163    fn new() -> Self {
164        let cap = std::env::var("WMBT_KV_METRICS_MAX_SAMPLES")
165            .ok()
166            .and_then(|s| s.parse::<usize>().ok())
167            .unwrap_or(4096);
168        Self {
169            stash: OpAccumulator::new(Op::Stash, cap),
170            load_foyer: OpAccumulator::new(Op::LoadFoyer, cap),
171            load_foyer_ram: OpAccumulator::new(Op::LoadFoyerRam, cap),
172            load_foyer_ssd: OpAccumulator::new(Op::LoadFoyerSsd, cap),
173            load_s3: OpAccumulator::new(Op::LoadS3, cap),
174            miss: OpAccumulator::new(Op::Miss, cap),
175            restore_from_s3: OpAccumulator::new(Op::RestoreFromS3, cap),
176        }
177    }
178
179    pub fn observe(&self, op: Op, micros: u64, bytes: u64) {
180        match op {
181            Op::Stash => self.stash.observe(micros, bytes),
182            Op::LoadFoyer => self.load_foyer.observe(micros, bytes),
183            Op::LoadFoyerRam => {
184                self.load_foyer.observe(micros, bytes);
185                self.load_foyer_ram.observe(micros, bytes);
186            }
187            Op::LoadFoyerSsd => {
188                self.load_foyer.observe(micros, bytes);
189                self.load_foyer_ssd.observe(micros, bytes);
190            }
191            Op::LoadS3 => self.load_s3.observe(micros, bytes),
192            Op::Miss => self.miss.observe(micros, bytes),
193            Op::RestoreFromS3 => self.restore_from_s3.observe(micros, bytes),
194        }
195    }
196
197    #[must_use]
198    pub fn snapshot_all(&self) -> Vec<OpSnapshot> {
199        vec![
200            self.stash.snapshot(),
201            self.load_foyer.snapshot(),
202            self.load_foyer_ram.snapshot(),
203            self.load_foyer_ssd.snapshot(),
204            self.load_s3.snapshot(),
205            self.miss.snapshot(),
206            self.restore_from_s3.snapshot(),
207        ]
208    }
209
210    /// Render a JSON-line report suitable for log emission.
211    #[must_use]
212    pub fn to_json_lines(&self) -> String {
213        let mut out = String::new();
214        for snap in self.snapshot_all() {
215            out.push_str(&format!(
216                "{{\"scope\":\"wombatkv_metrics\",\"op\":\"{}\",\"count\":{},\"bytes_total\":{},\"throughput_mb_per_s\":{:.3},\"p10_us\":{},\"p25_us\":{},\"p50_us\":{},\"p90_us\":{},\"p95_us\":{},\"p99_us\":{},\"p99_9_us\":{},\"p99_99_us\":{},\"max_us\":{}}}\n",
217                snap.op,
218                snap.count,
219                snap.bytes_total,
220                snap.throughput_mb_per_s(),
221                snap.p10_us,
222                snap.p25_us,
223                snap.p50_us,
224                snap.p90_us,
225                snap.p95_us,
226                snap.p99_us,
227                snap.p99_9_us,
228                snap.p99_99_us,
229                snap.max_us,
230            ));
231        }
232        out
233    }
234}
235
236static GLOBAL: once_cell::sync::Lazy<EmbedMetrics> = once_cell::sync::Lazy::new(EmbedMetrics::new);
237
238/// Borrow the process-global metrics registry.
239#[must_use]
240pub fn metrics() -> &'static EmbedMetrics {
241    &GLOBAL
242}
243
#[cfg(test)]
mod tests {
    use super::*;

    /// Take a fresh `snapshot_all()` of `m` and return the entry for `op`.
    fn snap(m: &EmbedMetrics, op: &str) -> OpSnapshot {
        m.snapshot_all().into_iter().find(|s| s.op == op).unwrap()
    }

    #[test]
    fn percentiles_compute_correctly_on_known_distribution() {
        let m = EmbedMetrics::new();
        (1..=100).for_each(|v| m.observe(Op::Stash, v, 0));

        let stash = snap(&m, "stash");
        assert_eq!(stash.count, 100);
        // The rank is round(pct/100 * (n - 1)) with halves rounded away from
        // zero, so for 1..=100 the median lands on 51 and p99 on 99; the
        // neighbouring value is accepted too so the test isn't tied to the
        // exact rounding rule.
        assert!(matches!(stash.p50_us, 50 | 51));
        assert!(matches!(stash.p99_us, 99 | 100));
        assert_eq!(stash.max_us, 100);
    }

    #[test]
    fn throughput_is_computed_per_op() {
        let m = EmbedMetrics::new();
        // Two stashes of 1 MiB taking 1 s each: 2 MiB over 2 s = 1 MiB/s.
        for _ in 0..2 {
            m.observe(Op::Stash, 1_000_000, 1024 * 1024);
        }
        let rate = snap(&m, "stash").throughput_mb_per_s();
        assert!((rate - 1.0).abs() < 1e-9);
    }

    #[test]
    fn json_line_output_includes_all_ops() {
        let m = EmbedMetrics::new();
        m.observe(Op::Stash, 100, 1000);
        let json = m.to_json_lines();

        let expected_ops = [
            "stash",
            "load_foyer",
            "load_foyer_ram",
            "load_foyer_ssd",
            "load_s3",
            "miss",
            "restore_from_s3",
        ];
        for op in expected_ops {
            let needle = format!("\"op\":\"{op}\"");
            assert!(json.contains(&needle), "missing op {op} in {json}");
        }
    }

    #[test]
    fn load_foyer_ram_increments_both_legacy_and_tiered_counters() {
        // Tiered hits are dual-recorded so legacy `load_foyer` dashboards and
        // tests keep working while the RAM/SSD split is also available.
        let m = EmbedMetrics::new();
        m.observe(Op::LoadFoyerRam, 50, 1000);

        assert_eq!(snap(&m, "load_foyer").count, 1);
        assert_eq!(snap(&m, "load_foyer_ram").count, 1);
        assert_eq!(snap(&m, "load_foyer_ssd").count, 0);
    }

    #[test]
    fn load_foyer_ssd_routes_to_correct_bucket() {
        let m = EmbedMetrics::new();
        m.observe(Op::LoadFoyerSsd, 5000, 4_000_000_000);

        let ssd = snap(&m, "load_foyer_ssd");
        assert_eq!(snap(&m, "load_foyer").count, 1);
        assert_eq!(snap(&m, "load_foyer_ram").count, 0);
        assert_eq!(ssd.count, 1);
        assert_eq!(ssd.bytes_total, 4_000_000_000);
    }
}
317
#[cfg(test)]
mod ops_through_embed {
    use super::*;
    use crate::embed::{EmbedConfig, WombatKVKvStore};
    use crate::foyer_cache::FoyerCacheConfig;
    use bytes::Bytes;
    use tempfile::tempdir;
    use wombatkv_store::wal_store::InMemoryObjectStore;

    /// Deliberately tiny foyer cache rooted at `dir` (4 MiB RAM, 16 MiB SSD,
    /// 1 MiB blocks, io_uring off) so the test stays cheap.
    fn small_foyer(dir: std::path::PathBuf) -> FoyerCacheConfig {
        FoyerCacheConfig {
            ssd_dir: dir,
            ram_bytes: 4 * 1024 * 1024,
            ssd_bytes: 16 * 1024 * 1024,
            block_size: 1024 * 1024,
            buffer_pool_size: 4 * 1024 * 1024,
            iouring: false,
        }
    }

    /// `count` for `op` within a single `snapshot_all()` result.
    fn count_of(snaps: &[OpSnapshot], op: &str) -> u64 {
        snaps.iter().find(|s| s.op == op).unwrap().count
    }

    #[test]
    fn put_get_round_trip_records_stash_and_load_foyer_observations() {
        let dir = tempdir().expect("tempdir");
        let cfg = EmbedConfig {
            s3_prefix: "metrics/test".to_string(),
            foyer: small_foyer(dir.path().to_path_buf()),
            write_through_s3: true,
            compression: crate::compression::BlockCompressionConfig::default(),
        };
        let store = WombatKVKvStore::new(cfg, InMemoryObjectStore::default()).expect("store");

        let pre = metrics().snapshot_all();
        let (pre_stash, pre_load) = (count_of(&pre, "stash"), count_of(&pre, "load_foyer"));

        store.put_kv("ns", "k", Bytes::from_static(b"abc")).expect("put");
        let _ = store.get_kv("ns", "k").expect("get");

        let post = metrics().snapshot_all();
        let (post_stash, post_load) = (count_of(&post, "stash"), count_of(&post, "load_foyer"));

        // `metrics()` is a process-global singleton and other tests may run
        // concurrently, so only require that each counter grew by at least
        // our own put / get; exact deltas would be flaky.
        assert!(
            post_stash > pre_stash,
            "expected stash count to grow by ≥1; pre={pre_stash} post={post_stash}"
        );
        assert!(
            post_load > pre_load,
            "expected load_foyer count to grow by ≥1; pre={pre_load} post={post_load}"
        );
    }
}