Skip to main content

ralph/webhook/diagnostics/
metrics.rs

1//! Purpose: Track in-process webhook delivery metrics and build operator diagnostics snapshots.
2//!
3//! Responsibilities:
4//! - Own atomic queue/delivery counters for the webhook runtime.
5//! - Persist exhausted delivery failures through the failure-store companion.
6//! - Build diagnostics snapshots from runtime metrics plus persisted failure history.
7//!
8//! Scope:
9//! - Runtime counters and snapshot assembly only.
10//!
11//! Usage:
12//! - Called by webhook worker runtime/delivery/enqueue code and CLI diagnostics commands.
13//!
14//! Invariants/Assumptions:
15//! - Queue capacity reflects runtime state when available, otherwise a clamped config fallback.
16//! - Failed-delivery persistence is best-effort and must not panic the hot path.
17//! - Counter mutations remain lock-free via atomics.
18
19use super::super::WebhookMessage;
20use super::failure_store::{
21    WebhookFailureRecord, failure_store_path, load_failure_records, persist_failed_delivery,
22};
23use crate::contracts::{WebhookConfig, WebhookQueuePolicy};
24use anyhow::Result;
25use serde::Serialize;
26use std::path::Path;
27use std::sync::OnceLock;
28use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
29
30#[derive(Debug, Clone, Serialize)]
31pub struct WebhookDiagnostics {
32    pub queue_depth: usize,
33    pub queue_capacity: usize,
34    pub queue_policy: WebhookQueuePolicy,
35    pub enqueued_total: u64,
36    pub delivered_total: u64,
37    pub failed_total: u64,
38    pub dropped_total: u64,
39    pub retry_attempts_total: u64,
40    pub failure_store_path: String,
41    pub recent_failures: Vec<WebhookFailureRecord>,
42}
43
/// Process-wide webhook counters.
///
/// Every field is an atomic, so the functions in this module mutate them
/// through a shared reference; `Default` starts all of them at zero.
#[derive(Debug, Default)]
struct WebhookMetrics {
    /// Incremented on enqueue success and retry requeue; decremented
    /// (saturating at zero) on dequeue.
    queue_depth: AtomicUsize,
    /// Last value passed to `set_queue_capacity`; `0` is read by
    /// `diagnostics_snapshot` as "not reported".
    queue_capacity: AtomicUsize,
    /// Bumped once per `note_enqueue_success` call.
    enqueued_total: AtomicU64,
    /// Bumped once per `note_delivery_success` call.
    delivered_total: AtomicU64,
    /// Bumped once per `note_delivery_failure` call.
    failed_total: AtomicU64,
    /// Bumped once per `note_dropped_message` call.
    dropped_total: AtomicU64,
    /// Bumped once per `note_retry_attempt` call.
    retry_attempts_total: AtomicU64,
}
54
55static METRICS: OnceLock<WebhookMetrics> = OnceLock::new();
56
57fn metrics() -> &'static WebhookMetrics {
58    METRICS.get_or_init(WebhookMetrics::default)
59}
60
61pub(crate) fn set_queue_capacity(capacity: usize) {
62    metrics().queue_capacity.store(capacity, Ordering::SeqCst);
63}
64
65pub(crate) fn note_queue_dequeue() {
66    let depth = &metrics().queue_depth;
67    let _ = depth.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
68        Some(current.saturating_sub(1))
69    });
70}
71
72pub(crate) fn note_enqueue_success() {
73    let state = metrics();
74    state.enqueued_total.fetch_add(1, Ordering::SeqCst);
75    state.queue_depth.fetch_add(1, Ordering::SeqCst);
76}
77
78pub(crate) fn note_retry_requeue() {
79    metrics().queue_depth.fetch_add(1, Ordering::SeqCst);
80}
81
82pub(crate) fn note_dropped_message() {
83    metrics().dropped_total.fetch_add(1, Ordering::SeqCst);
84}
85
86pub(crate) fn note_retry_attempt() {
87    metrics()
88        .retry_attempts_total
89        .fetch_add(1, Ordering::SeqCst);
90}
91
92pub(crate) fn note_delivery_success() {
93    metrics().delivered_total.fetch_add(1, Ordering::SeqCst);
94}
95
96pub(crate) fn note_delivery_failure(msg: &WebhookMessage, err: &anyhow::Error, attempts: u32) {
97    metrics().failed_total.fetch_add(1, Ordering::SeqCst);
98
99    if let Err(write_err) = persist_failed_delivery(msg, err, attempts) {
100        log::warn!("Failed to persist webhook failure record: {write_err:#}");
101    }
102}
103
104pub fn diagnostics_snapshot(
105    repo_root: &Path,
106    config: &WebhookConfig,
107    recent_limit: usize,
108) -> Result<WebhookDiagnostics> {
109    let path = failure_store_path(repo_root);
110    let records = load_failure_records(&path)?;
111    let limit = if recent_limit == 0 {
112        records.len()
113    } else {
114        recent_limit
115    };
116    let recent_failures = records.into_iter().rev().take(limit).collect::<Vec<_>>();
117
118    let state = metrics();
119    let configured_capacity = config
120        .queue_capacity
121        .map(|value| value.clamp(1, 10_000) as usize)
122        .unwrap_or(500);
123    let queue_capacity = match state.queue_capacity.load(Ordering::SeqCst) {
124        0 => configured_capacity,
125        value => value,
126    };
127
128    Ok(WebhookDiagnostics {
129        queue_depth: state.queue_depth.load(Ordering::SeqCst),
130        queue_capacity,
131        queue_policy: config.queue_policy.unwrap_or_default(),
132        enqueued_total: state.enqueued_total.load(Ordering::SeqCst),
133        delivered_total: state.delivered_total.load(Ordering::SeqCst),
134        failed_total: state.failed_total.load(Ordering::SeqCst),
135        dropped_total: state.dropped_total.load(Ordering::SeqCst),
136        retry_attempts_total: state.retry_attempts_total.load(Ordering::SeqCst),
137        failure_store_path: path.display().to_string(),
138        recent_failures,
139    })
140}
141
/// Test-only helper that zeroes every shared counter, since the metrics
/// instance is process-wide and would otherwise carry values between tests.
#[cfg(test)]
pub(super) fn reset_metrics_for_tests() {
    let state = metrics();

    // Queue gauges first, then the running totals, in declaration order.
    for gauge in [&state.queue_depth, &state.queue_capacity] {
        gauge.store(0, Ordering::SeqCst);
    }
    for total in [
        &state.enqueued_total,
        &state.delivered_total,
        &state.failed_total,
        &state.dropped_total,
        &state.retry_attempts_total,
    ] {
        total.store(0, Ordering::SeqCst);
    }
}
152}