1use std::sync::Arc;
33use std::time::Instant;
34
35use axum::extract::{MatchedPath, Request, State};
36use axum::http::StatusCode;
37use axum::middleware::Next;
38use axum::response::{IntoResponse, Response};
39use prometheus_client::encoding::{EncodeLabelSet, text::encode};
40use prometheus_client::metrics::counter::Counter;
41use prometheus_client::metrics::family::Family;
42use prometheus_client::metrics::gauge::Gauge;
43use prometheus_client::metrics::histogram::{Histogram, exponential_buckets};
44use prometheus_client::registry::Registry;
45
46use crate::state::AppState;
47
48#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
54pub struct HttpRequestLabels {
55 pub method: String,
57 pub route: String,
61 pub status: String,
63}
64
65#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
70pub struct AdvanceHeadLabels {
71 pub result: String,
75}
76
77#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
81pub struct LeidenModeLabels {
82 pub mode: String,
84}
85
86#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
91pub struct PprSizeGateLabels {
92 pub reason: String,
96}
97
98#[derive(Clone)]
104pub struct Metrics {
105 registry: Arc<Registry>,
106 pub http_requests: Family<HttpRequestLabels, Counter>,
108 pub http_duration: Histogram,
111 pub retrieve_latency: Histogram,
115 pub commit_duration: Histogram,
118 pub ingest_duration: Histogram,
123 pub ingest_chunks: Counter,
127 pub remote_fetch_blocks: Counter,
129 pub remote_push_blocks: Counter,
132 pub remote_advance_head: Family<AdvanceHeadLabels, Counter>,
135 pub leiden_mode: Family<LeidenModeLabels, Counter>,
139 pub leiden_debounce_effective: Gauge,
141 pub leiden_storm_cap_effective: Gauge,
143 pub leiden_delta_ratio_effective: Gauge,
146 pub leiden_mode_current: Gauge,
148 pub traverse_answer_hard_wall_ms_effective: Gauge,
150 pub traverse_answer_max_hops_effective: Gauge,
152 pub traverse_answer_hard_wall_exceeded: Counter,
154 pub ppr_size_gate_skipped: Family<PprSizeGateLabels, Counter>,
157 pub ppr_size_gate_threshold: Gauge,
160}
161
162impl Metrics {
163 #[must_use]
171 pub fn new() -> Self {
172 let mut registry = Registry::default();
173
174 let http_requests = Family::<HttpRequestLabels, Counter>::default();
175 registry.register(
176 "mnem_http_requests_total",
177 "Total HTTP requests handled by mnem http, bucketed by method, route, and status.",
178 http_requests.clone(),
179 );
180
181 let http_duration = Histogram::new(exponential_buckets(0.001, 2.0, 14));
183 registry.register(
184 "mnem_http_request_duration_seconds",
185 "HTTP request duration in seconds, from axum route match to response body sent.",
186 http_duration.clone(),
187 );
188
189 let retrieve_latency = Histogram::new(exponential_buckets(0.0001, 2.0, 17));
191 registry.register(
192 "mnem_retrieve_latency_seconds",
193 "Retrieval pipeline latency in seconds, measured around the `Retriever::execute` call.",
194 retrieve_latency.clone(),
195 );
196
197 let commit_duration = Histogram::new(exponential_buckets(0.0001, 2.0, 17));
199 registry.register(
200 "mnem_commit_duration_seconds",
201 "Transaction commit duration in seconds, measured around Transaction::commit_opts.",
202 commit_duration.clone(),
203 );
204
205 let ingest_duration = Histogram::new(exponential_buckets(0.001, 2.0, 14));
209 registry.register(
210 "mnem_ingest_duration_seconds",
211 "End-to-end ingest duration in seconds, measured around the full POST /v1/ingest run.",
212 ingest_duration.clone(),
213 );
214
215 let ingest_chunks = Counter::default();
216 registry.register(
217 "mnem_ingest_chunks_total",
218 "Total chunks produced across every successful POST /v1/ingest call.",
219 ingest_chunks.clone(),
220 );
221
222 let remote_fetch_blocks = Counter::default();
227 registry.register(
228 "mnem_remote_fetch_blocks_total",
229 "Total `/remote/v1/fetch-blocks` invocations that produced a CAR response.",
230 remote_fetch_blocks.clone(),
231 );
232 let remote_push_blocks = Counter::default();
233 registry.register(
234 "mnem_remote_push_blocks_total",
235 "Total `/remote/v1/push-blocks` invocations that completed an import.",
236 remote_push_blocks.clone(),
237 );
238 let remote_advance_head = Family::<AdvanceHeadLabels, Counter>::default();
239 registry.register(
240 "mnem_remote_advance_head_total",
241 "Total `/remote/v1/advance-head` invocations bucketed by result (success, cas_mismatch, auth_fail).",
242 remote_advance_head.clone(),
243 );
244
245 let leiden_mode = Family::<LeidenModeLabels, Counter>::default();
247 registry.register(
248 "mnem_leiden_mode_total",
249 "Total Leiden community-cache serves bucketed by mode (full, full_debounced, fallback_stale).",
250 leiden_mode.clone(),
251 );
252 let leiden_debounce_effective = Gauge::default();
253 registry.register(
254 "mnem_leiden_debounce_effective",
255 "Effective Leiden debounce window in ms (max(1000, rolling p75 commit latency)).",
256 leiden_debounce_effective.clone(),
257 );
258 let leiden_storm_cap_effective = Gauge::default();
259 registry.register(
260 "mnem_leiden_storm_cap_effective",
261 "Effective commit-storm cap per minute (floor-c tunable; default 60).",
262 leiden_storm_cap_effective.clone(),
263 );
264 let leiden_delta_ratio_effective = Gauge::default();
265 registry.register(
266 "mnem_leiden_delta_ratio_effective",
267 "Effective delta_ratio_force_full rendered as parts-per-ten-thousand.",
268 leiden_delta_ratio_effective.clone(),
269 );
270 let leiden_mode_current = Gauge::default();
271 registry.register(
272 "mnem_leiden_mode_current",
273 "Current Leiden mode: 0=full, 1=full_debounced, 2=fallback_stale.",
274 leiden_mode_current.clone(),
275 );
276
277 let traverse_answer_hard_wall_ms_effective = Gauge::default();
279 registry.register(
280 "mnem_traverse_answer_hard_wall_ms_effective",
281 "Effective hard-wall latency budget for /v1/traverse_answer in ms.",
282 traverse_answer_hard_wall_ms_effective.clone(),
283 );
284 let traverse_answer_max_hops_effective = Gauge::default();
285 registry.register(
286 "mnem_traverse_answer_max_hops_effective",
287 "Effective max-hops for /v1/traverse_answer.",
288 traverse_answer_max_hops_effective.clone(),
289 );
290 let traverse_answer_hard_wall_exceeded = Counter::default();
291 registry.register(
292 "mnem_traverse_answer_hard_wall_exceeded_total",
293 "Total /v1/traverse_answer requests that breached the hard-wall budget.",
294 traverse_answer_hard_wall_exceeded.clone(),
295 );
296
297 let ppr_size_gate_skipped = Family::<PprSizeGateLabels, Counter>::default();
299 registry.register(
300 "mnem_ppr_size_gate_skipped_total",
301 "Total PPR requests skipped by the default-on size gate, bucketed by reason (above_threshold, opted_out).",
302 ppr_size_gate_skipped.clone(),
303 );
304 let ppr_size_gate_threshold = Gauge::default();
305 registry.register(
306 "mnem_ppr_size_gate_threshold",
307 "Effective PPR size-gate node threshold (mirrors PPR_DEFAULT_MAX_NODES).",
308 ppr_size_gate_threshold.clone(),
309 );
310 #[allow(clippy::cast_possible_wrap)]
313 ppr_size_gate_threshold.set(mnem_core::ppr::PPR_DEFAULT_MAX_NODES as i64);
314
315 Self {
316 registry: Arc::new(registry),
317 http_requests,
318 http_duration,
319 retrieve_latency,
320 commit_duration,
321 ingest_duration,
322 ingest_chunks,
323 remote_fetch_blocks,
324 remote_push_blocks,
325 remote_advance_head,
326 leiden_mode,
327 leiden_debounce_effective,
328 leiden_storm_cap_effective,
329 leiden_delta_ratio_effective,
330 leiden_mode_current,
331 traverse_answer_hard_wall_ms_effective,
332 traverse_answer_max_hops_effective,
333 traverse_answer_hard_wall_exceeded,
334 ppr_size_gate_skipped,
335 ppr_size_gate_threshold,
336 }
337 }
338
339 pub fn encode(&self) -> Result<String, std::fmt::Error> {
348 let mut buf = String::new();
349 encode(&mut buf, &self.registry)?;
350 Ok(buf)
351 }
352}
353
354impl Default for Metrics {
355 fn default() -> Self {
356 Self::new()
357 }
358}
359
360pub(crate) async fn track_metrics(
368 State(state): State<AppState>,
369 req: Request,
370 next: Next,
371) -> Response {
372 let method = req.method().as_str().to_string();
373 let route = req
378 .extensions()
379 .get::<MatchedPath>()
380 .map_or_else(|| req.uri().path().to_string(), |m| m.as_str().to_string());
381
382 if route == "/metrics" {
385 return next.run(req).await;
386 }
387
388 let start = Instant::now();
389 let response = next.run(req).await;
390 let elapsed = start.elapsed().as_secs_f64();
391
392 let status = response.status().as_u16().to_string();
393 state
394 .metrics
395 .http_requests
396 .get_or_create(&HttpRequestLabels {
397 method,
398 route,
399 status,
400 })
401 .inc();
402 state.metrics.http_duration.observe(elapsed);
403
404 response
405}
406
407pub(crate) async fn metrics_handler(State(state): State<AppState>) -> Response {
410 match state.metrics.encode() {
411 Ok(body) => (
412 StatusCode::OK,
413 [(
414 axum::http::header::CONTENT_TYPE,
415 "text/plain; version=0.0.4; charset=utf-8",
416 )],
417 body,
418 )
419 .into_response(),
420 Err(e) => (
421 StatusCode::INTERNAL_SERVER_ERROR,
422 format!("metrics encoding failure: {e}"),
423 )
424 .into_response(),
425 }
426}
427
428#[cfg(test)]
429mod tests {
430 use super::*;
431
432 #[test]
433 fn metrics_encode_is_well_formed() {
434 let m = Metrics::new();
439 m.http_requests
440 .get_or_create(&HttpRequestLabels {
441 method: "GET".into(),
442 route: "/v1/healthz".into(),
443 status: "200".into(),
444 })
445 .inc();
446 m.http_duration.observe(0.002);
447 m.retrieve_latency.observe(0.015);
448 m.commit_duration.observe(0.050);
449
450 let text = m.encode().expect("encode");
451
452 assert!(
454 text.contains("# HELP mnem_http_requests_total"),
455 "missing HELP for mnem_http_requests_total in:\n{text}"
456 );
457 assert!(
458 text.contains("# TYPE mnem_http_requests_total counter"),
459 "missing TYPE for mnem_http_requests_total"
460 );
461 assert!(
462 text.contains("# HELP mnem_http_request_duration_seconds"),
463 "missing HELP for mnem_http_request_duration_seconds"
464 );
465 assert!(
466 text.contains("# HELP mnem_retrieve_latency_seconds"),
467 "missing HELP for mnem_retrieve_latency_seconds"
468 );
469 assert!(
470 text.contains("# HELP mnem_commit_duration_seconds"),
471 "missing HELP for mnem_commit_duration_seconds"
472 );
473
474 assert!(
476 text.contains("method=\"GET\""),
477 "counter label `method=GET` missing in:\n{text}"
478 );
479 assert!(
480 text.contains("route=\"/v1/healthz\""),
481 "counter label `route=/v1/healthz` missing"
482 );
483 assert!(
484 text.contains("status=\"200\""),
485 "counter label `status=200` missing"
486 );
487 }
488
489 #[test]
490 fn metrics_new_registers_all_four_families() {
491 let m = Metrics::new();
495 let text = m.encode().unwrap();
496 for family in [
497 "mnem_http_requests_total",
498 "mnem_http_request_duration_seconds",
499 "mnem_retrieve_latency_seconds",
500 "mnem_commit_duration_seconds",
501 "mnem_ingest_duration_seconds",
502 "mnem_ingest_chunks_total",
503 ] {
504 assert!(
505 text.contains(family),
506 "expected metric family `{family}` in output:\n{text}"
507 );
508 }
509 }
510
511 #[test]
512 fn metrics_new_registers_all_remote_families() {
513 let m = Metrics::new();
516 let text = m.encode().unwrap();
517 for family in [
518 "mnem_remote_fetch_blocks_total",
519 "mnem_remote_push_blocks_total",
520 "mnem_remote_advance_head_total",
521 ] {
522 assert!(
523 text.contains(family),
524 "expected metric family `{family}` in output:\n{text}"
525 );
526 }
527 }
528
529 #[test]
530 fn remote_counters_increment_and_render() {
531 let m = Metrics::new();
532 m.remote_fetch_blocks.inc();
533 m.remote_push_blocks.inc();
534 m.remote_advance_head
535 .get_or_create(&AdvanceHeadLabels {
536 result: "success".into(),
537 })
538 .inc();
539 m.remote_advance_head
540 .get_or_create(&AdvanceHeadLabels {
541 result: "cas_mismatch".into(),
542 })
543 .inc();
544 m.remote_advance_head
545 .get_or_create(&AdvanceHeadLabels {
546 result: "auth_fail".into(),
547 })
548 .inc();
549 let text = m.encode().unwrap();
550 assert!(text.contains("mnem_remote_fetch_blocks_total"));
551 assert!(text.contains("mnem_remote_push_blocks_total"));
552 for r in ["success", "cas_mismatch", "auth_fail"] {
554 assert!(
555 text.contains(&format!("result=\"{r}\"")),
556 "missing advance-head result `{r}` in:\n{text}"
557 );
558 }
559 }
560}