1#![forbid(unsafe_code)]
2use std::sync::atomic::{AtomicU64, Ordering};
10use std::sync::Mutex;
11
/// Kinds of operation that latency/byte metrics can be recorded against.
///
/// Every variant has a fixed snake_case label, available through
/// [`Op::as_str`], which is the name used in snapshots.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Op {
    /// Labelled `"stash"`.
    Stash,
    /// Labelled `"load_foyer"`.
    LoadFoyer,
    /// Labelled `"load_foyer_ram"`.
    LoadFoyerRam,
    /// Labelled `"load_foyer_ssd"`.
    LoadFoyerSsd,
    /// Labelled `"load_s3"`.
    LoadS3,
    /// Labelled `"miss"`.
    Miss,
    /// Labelled `"restore_from_s3"`.
    RestoreFromS3,
}

impl Op {
    /// Returns the static label for this operation.
    ///
    /// The match is deliberately exhaustive (no wildcard arm) so that adding
    /// a variant forces a label to be chosen for it.
    #[must_use]
    pub fn as_str(self) -> &'static str {
        match self {
            Op::Stash => "stash",
            Op::LoadFoyer => "load_foyer",
            Op::LoadFoyerRam => "load_foyer_ram",
            Op::LoadFoyerSsd => "load_foyer_ssd",
            Op::LoadS3 => "load_s3",
            Op::Miss => "miss",
            Op::RestoreFromS3 => "restore_from_s3",
        }
    }
}
43
/// Point-in-time summary of the statistics gathered for one operation label.
///
/// Fields ending in `_us` are latencies in microseconds.
#[derive(Debug, Clone, Default)]
pub struct OpSnapshot {
    /// Label of the operation this snapshot describes.
    pub op: &'static str,
    /// Number of observations.
    pub count: u64,
    /// Total bytes across all observations.
    pub bytes_total: u64,
    /// Total time across all observations, in microseconds.
    pub micros_total: u64,
    /// 10th-percentile latency.
    pub p10_us: u64,
    /// 25th-percentile latency.
    pub p25_us: u64,
    /// Median latency.
    pub p50_us: u64,
    /// 90th-percentile latency.
    pub p90_us: u64,
    /// 95th-percentile latency.
    pub p95_us: u64,
    /// 99th-percentile latency.
    pub p99_us: u64,
    /// 99.9th-percentile latency.
    pub p99_9_us: u64,
    /// 99.99th-percentile latency.
    pub p99_99_us: u64,
    /// Largest latency.
    pub max_us: u64,
}

impl OpSnapshot {
    /// Throughput in MiB per second, computed as `bytes_total` divided by
    /// `micros_total`.
    ///
    /// Returns `0.0` when `micros_total` is zero, so the result is never a
    /// division by zero.
    #[must_use]
    pub fn throughput_mb_per_s(&self) -> f64 {
        match self.micros_total {
            0 => 0.0,
            elapsed_us => {
                let mebibytes = (self.bytes_total as f64) / (1024.0 * 1024.0);
                let seconds = (elapsed_us as f64) / 1_000_000.0;
                mebibytes / seconds
            }
        }
    }
}
73
74struct OpAccumulator {
78 op: Op,
79 count: AtomicU64,
80 bytes_total: AtomicU64,
81 micros_total: AtomicU64,
82 samples: Mutex<Vec<u32>>,
83 max_samples: usize,
84}
85
86impl OpAccumulator {
87 fn new(op: Op, max_samples: usize) -> Self {
88 Self {
89 op,
90 count: AtomicU64::new(0),
91 bytes_total: AtomicU64::new(0),
92 micros_total: AtomicU64::new(0),
93 samples: Mutex::new(Vec::with_capacity(max_samples.min(8192))),
94 max_samples,
95 }
96 }
97
98 fn observe(&self, micros: u64, bytes: u64) {
99 self.count.fetch_add(1, Ordering::Relaxed);
100 self.bytes_total.fetch_add(bytes, Ordering::Relaxed);
101 self.micros_total.fetch_add(micros, Ordering::Relaxed);
102 if let Ok(mut samples) = self.samples.lock() {
103 if samples.len() >= self.max_samples {
107 let drop = self.max_samples / 4;
108 samples.drain(0..drop);
109 }
110 samples.push(u32::try_from(micros).unwrap_or(u32::MAX));
111 }
112 }
113
114 fn snapshot(&self) -> OpSnapshot {
115 let mut samples = self.samples.lock().map(|guard| guard.clone()).unwrap_or_default();
116 samples.sort_unstable();
117 let count = self.count.load(Ordering::Relaxed);
118 let bytes_total = self.bytes_total.load(Ordering::Relaxed);
119 let micros_total = self.micros_total.load(Ordering::Relaxed);
120
121 OpSnapshot {
122 op: self.op.as_str(),
123 count,
124 bytes_total,
125 micros_total,
126 p10_us: percentile(&samples, 10.0),
127 p25_us: percentile(&samples, 25.0),
128 p50_us: percentile(&samples, 50.0),
129 p90_us: percentile(&samples, 90.0),
130 p95_us: percentile(&samples, 95.0),
131 p99_us: percentile(&samples, 99.0),
132 p99_9_us: percentile(&samples, 99.9),
133 p99_99_us: percentile(&samples, 99.99),
134 max_us: u64::from(samples.last().copied().unwrap_or(0)),
135 }
136 }
137}
138
/// Nearest-rank percentile of an ascending-sorted slice.
///
/// `pct` is on a 0–100 scale. The rank `pct / 100 * (len - 1)` is rounded to
/// the nearest index and clamped to the last element, so out-of-range
/// percentages resolve to the first or last sample. Returns `0` for an empty
/// slice. The caller is responsible for sorting the input.
fn percentile(sorted_samples: &[u32], pct: f64) -> u64 {
    match sorted_samples.len().checked_sub(1) {
        // Empty input: there is no sample to report.
        None => 0,
        Some(last_index) => {
            let position = ((pct / 100.0) * (last_index as f64)).round() as usize;
            u64::from(sorted_samples[position.min(last_index)])
        }
    }
}
148
149pub struct EmbedMetrics {
153 stash: OpAccumulator,
154 load_foyer: OpAccumulator,
155 load_foyer_ram: OpAccumulator,
156 load_foyer_ssd: OpAccumulator,
157 load_s3: OpAccumulator,
158 miss: OpAccumulator,
159 restore_from_s3: OpAccumulator,
160}
161
162impl EmbedMetrics {
163 fn new() -> Self {
164 let cap = std::env::var("WMBT_KV_METRICS_MAX_SAMPLES")
165 .ok()
166 .and_then(|s| s.parse::<usize>().ok())
167 .unwrap_or(4096);
168 Self {
169 stash: OpAccumulator::new(Op::Stash, cap),
170 load_foyer: OpAccumulator::new(Op::LoadFoyer, cap),
171 load_foyer_ram: OpAccumulator::new(Op::LoadFoyerRam, cap),
172 load_foyer_ssd: OpAccumulator::new(Op::LoadFoyerSsd, cap),
173 load_s3: OpAccumulator::new(Op::LoadS3, cap),
174 miss: OpAccumulator::new(Op::Miss, cap),
175 restore_from_s3: OpAccumulator::new(Op::RestoreFromS3, cap),
176 }
177 }
178
179 pub fn observe(&self, op: Op, micros: u64, bytes: u64) {
180 match op {
181 Op::Stash => self.stash.observe(micros, bytes),
182 Op::LoadFoyer => self.load_foyer.observe(micros, bytes),
183 Op::LoadFoyerRam => {
184 self.load_foyer.observe(micros, bytes);
185 self.load_foyer_ram.observe(micros, bytes);
186 }
187 Op::LoadFoyerSsd => {
188 self.load_foyer.observe(micros, bytes);
189 self.load_foyer_ssd.observe(micros, bytes);
190 }
191 Op::LoadS3 => self.load_s3.observe(micros, bytes),
192 Op::Miss => self.miss.observe(micros, bytes),
193 Op::RestoreFromS3 => self.restore_from_s3.observe(micros, bytes),
194 }
195 }
196
197 #[must_use]
198 pub fn snapshot_all(&self) -> Vec<OpSnapshot> {
199 vec![
200 self.stash.snapshot(),
201 self.load_foyer.snapshot(),
202 self.load_foyer_ram.snapshot(),
203 self.load_foyer_ssd.snapshot(),
204 self.load_s3.snapshot(),
205 self.miss.snapshot(),
206 self.restore_from_s3.snapshot(),
207 ]
208 }
209
210 #[must_use]
212 pub fn to_json_lines(&self) -> String {
213 let mut out = String::new();
214 for snap in self.snapshot_all() {
215 out.push_str(&format!(
216 "{{\"scope\":\"wombatkv_metrics\",\"op\":\"{}\",\"count\":{},\"bytes_total\":{},\"throughput_mb_per_s\":{:.3},\"p10_us\":{},\"p25_us\":{},\"p50_us\":{},\"p90_us\":{},\"p95_us\":{},\"p99_us\":{},\"p99_9_us\":{},\"p99_99_us\":{},\"max_us\":{}}}\n",
217 snap.op,
218 snap.count,
219 snap.bytes_total,
220 snap.throughput_mb_per_s(),
221 snap.p10_us,
222 snap.p25_us,
223 snap.p50_us,
224 snap.p90_us,
225 snap.p95_us,
226 snap.p99_us,
227 snap.p99_9_us,
228 snap.p99_99_us,
229 snap.max_us,
230 ));
231 }
232 out
233 }
234}
235
236static GLOBAL: once_cell::sync::Lazy<EmbedMetrics> = once_cell::sync::Lazy::new(EmbedMetrics::new);
237
238#[must_use]
240pub fn metrics() -> &'static EmbedMetrics {
241 &GLOBAL
242}
243
#[cfg(test)]
mod tests {
    use super::*;

    /// Looks up the snapshot with the given label; panics if it is absent.
    fn pick<'a>(snaps: &'a [OpSnapshot], label: &str) -> &'a OpSnapshot {
        snaps.iter().find(|s| s.op == label).unwrap()
    }

    /// Feeding 1..=100 must yield the expected median, p99 and max.
    #[test]
    fn percentiles_compute_correctly_on_known_distribution() {
        let m = EmbedMetrics::new();
        (1..=100).for_each(|v| m.observe(Op::Stash, v, 0));

        let snaps = m.snapshot_all();
        let snap = pick(&snaps, "stash");
        assert_eq!(snap.count, 100);
        // Either neighbour is acceptable for a nearest-rank percentile.
        assert!(matches!(snap.p50_us, 50 | 51));
        assert!(matches!(snap.p99_us, 99 | 100));
        assert_eq!(snap.max_us, 100);
    }

    /// Two 1 MiB observations of one second each average out to 1 MiB/s.
    #[test]
    fn throughput_is_computed_per_op() {
        let m = EmbedMetrics::new();
        for _ in 0..2 {
            m.observe(Op::Stash, 1_000_000, 1024 * 1024);
        }

        let snaps = m.snapshot_all();
        let throughput = pick(&snaps, "stash").throughput_mb_per_s();
        assert!((throughput - 1.0).abs() < 1e-9);
    }

    /// Every label appears in the JSON output, even with no observations.
    #[test]
    fn json_line_output_includes_all_ops() {
        let m = EmbedMetrics::new();
        m.observe(Op::Stash, 100, 1000);
        let json = m.to_json_lines();

        let expected_ops = [
            "stash",
            "load_foyer",
            "load_foyer_ram",
            "load_foyer_ssd",
            "load_s3",
            "miss",
            "restore_from_s3",
        ];
        for op in expected_ops {
            let needle = format!("\"op\":\"{op}\"");
            assert!(json.contains(&needle), "missing op {op} in {json}");
        }
    }

    /// A RAM-tier load counts in both the aggregate and the RAM bucket.
    #[test]
    fn load_foyer_ram_increments_both_legacy_and_tiered_counters() {
        let m = EmbedMetrics::new();
        m.observe(Op::LoadFoyerRam, 50, 1000);

        let snaps = m.snapshot_all();
        assert_eq!(pick(&snaps, "load_foyer").count, 1);
        assert_eq!(pick(&snaps, "load_foyer_ram").count, 1);
        assert_eq!(pick(&snaps, "load_foyer_ssd").count, 0);
    }

    /// An SSD-tier load counts in the aggregate and SSD buckets only.
    #[test]
    fn load_foyer_ssd_routes_to_correct_bucket() {
        let m = EmbedMetrics::new();
        m.observe(Op::LoadFoyerSsd, 5000, 4_000_000_000);

        let snaps = m.snapshot_all();
        assert_eq!(pick(&snaps, "load_foyer").count, 1);
        assert_eq!(pick(&snaps, "load_foyer_ram").count, 0);
        let ssd = pick(&snaps, "load_foyer_ssd");
        assert_eq!(ssd.count, 1);
        assert_eq!(ssd.bytes_total, 4_000_000_000);
    }
}
317
#[cfg(test)]
mod ops_through_embed {
    use super::*;
    use crate::embed::{EmbedConfig, WombatKVKvStore};
    use crate::foyer_cache::FoyerCacheConfig;
    use bytes::Bytes;
    use tempfile::tempdir;
    use wombatkv_store::wal_store::InMemoryObjectStore;

    /// Small Foyer configuration rooted at `dir`, sized for a unit test.
    fn small_foyer(dir: std::path::PathBuf) -> FoyerCacheConfig {
        FoyerCacheConfig {
            ram_bytes: 4 * 1024 * 1024,
            ssd_dir: dir,
            ssd_bytes: 16 * 1024 * 1024,
            block_size: 1024 * 1024,
            buffer_pool_size: 4 * 1024 * 1024,
            iouring: false,
        }
    }

    /// Reads the global `stash` and `load_foyer` counts from one snapshot.
    fn stash_and_load_counts() -> (u64, u64) {
        let snaps = metrics().snapshot_all();
        let count_of = |label: &str| snaps.iter().find(|s| s.op == label).unwrap().count;
        (count_of("stash"), count_of("load_foyer"))
    }

    /// A put followed by a get must bump the global `stash` and `load_foyer`
    /// counters. The registry is process-wide, so the test compares
    /// before/after values instead of asserting absolute counts.
    #[test]
    fn put_get_round_trip_records_stash_and_load_foyer_observations() {
        let dir = tempdir().expect("tempdir");
        let foyer = small_foyer(dir.path().to_path_buf());
        let cfg = EmbedConfig {
            s3_prefix: "metrics/test".to_string(),
            foyer,
            write_through_s3: true,
            compression: crate::compression::BlockCompressionConfig::default(),
        };
        let store = WombatKVKvStore::new(cfg, InMemoryObjectStore::default()).expect("store");

        let (pre_stash, pre_load) = stash_and_load_counts();

        store.put_kv("ns", "k", Bytes::from_static(b"abc")).expect("put");
        let _ = store.get_kv("ns", "k").expect("get");

        let (post_stash, post_load) = stash_and_load_counts();
        assert!(
            post_stash > pre_stash,
            "expected stash count to grow by ≥1; pre={pre_stash} post={post_stash}"
        );
        assert!(
            post_load > pre_load,
            "expected load_foyer count to grow by ≥1; pre={pre_load} post={post_load}"
        );
    }
}