ralph/webhook/diagnostics/
metrics.rs1use super::super::WebhookMessage;
20use super::failure_store::{
21 WebhookFailureRecord, failure_store_path, load_failure_records, persist_failed_delivery,
22};
23use crate::contracts::{WebhookConfig, WebhookQueuePolicy};
24use anyhow::Result;
25use serde::Serialize;
26use std::path::Path;
27use std::sync::OnceLock;
28use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
29
/// Point-in-time snapshot of webhook queue/delivery state, as assembled by
/// [`diagnostics_snapshot`]. Serializable so it can be emitted as a report.
#[derive(Debug, Clone, Serialize)]
pub struct WebhookDiagnostics {
    /// Number of messages currently counted as queued (enqueues + requeues
    /// minus dequeues, never below zero).
    pub queue_depth: usize,
    /// Effective queue capacity: the value recorded at runtime if non-zero,
    /// otherwise the configured capacity (clamped to 1..=10_000, default 500).
    pub queue_capacity: usize,
    /// Queue policy from the webhook configuration, or its default when unset.
    pub queue_policy: WebhookQueuePolicy,
    /// Total messages recorded as successfully enqueued.
    pub enqueued_total: u64,
    /// Total messages recorded as delivered.
    pub delivered_total: u64,
    /// Total deliveries recorded as failed.
    pub failed_total: u64,
    /// Total messages recorded as dropped.
    pub dropped_total: u64,
    /// Total retry attempts recorded.
    pub retry_attempts_total: u64,
    /// Display form of the failure-store file path.
    pub failure_store_path: String,
    /// Persisted failure records in reverse store order (presumably newest
    /// first — verify the store is append-ordered), truncated to the caller's
    /// limit.
    pub recent_failures: Vec<WebhookFailureRecord>,
}
43
/// Process-wide webhook counters and gauges.
///
/// Every field is an atomic so the `note_*` hooks can update it through a
/// shared `&'static` reference without any locking.
#[derive(Debug, Default)]
struct WebhookMetrics {
    /// Messages currently counted as waiting in the queue.
    queue_depth: AtomicUsize,
    /// Capacity recorded via `set_queue_capacity`; zero means "not recorded".
    queue_capacity: AtomicUsize,
    /// Messages recorded as successfully enqueued.
    enqueued_total: AtomicU64,
    /// Messages recorded as delivered.
    delivered_total: AtomicU64,
    /// Deliveries recorded as failed.
    failed_total: AtomicU64,
    /// Messages recorded as dropped.
    dropped_total: AtomicU64,
    /// Retry attempts recorded.
    retry_attempts_total: AtomicU64,
}

/// Lazily-initialised singleton holding the metrics for the whole process.
static METRICS: OnceLock<WebhookMetrics> = OnceLock::new();

/// Returns the shared metrics registry, creating it (all zeros) on first use.
fn metrics() -> &'static WebhookMetrics {
    METRICS.get_or_init(Default::default)
}
60
61pub(crate) fn set_queue_capacity(capacity: usize) {
62 metrics().queue_capacity.store(capacity, Ordering::SeqCst);
63}
64
65pub(crate) fn note_queue_dequeue() {
66 let depth = &metrics().queue_depth;
67 let _ = depth.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
68 Some(current.saturating_sub(1))
69 });
70}
71
72pub(crate) fn note_enqueue_success() {
73 let state = metrics();
74 state.enqueued_total.fetch_add(1, Ordering::SeqCst);
75 state.queue_depth.fetch_add(1, Ordering::SeqCst);
76}
77
78pub(crate) fn note_retry_requeue() {
79 metrics().queue_depth.fetch_add(1, Ordering::SeqCst);
80}
81
82pub(crate) fn note_dropped_message() {
83 metrics().dropped_total.fetch_add(1, Ordering::SeqCst);
84}
85
86pub(crate) fn note_retry_attempt() {
87 metrics()
88 .retry_attempts_total
89 .fetch_add(1, Ordering::SeqCst);
90}
91
92pub(crate) fn note_delivery_success() {
93 metrics().delivered_total.fetch_add(1, Ordering::SeqCst);
94}
95
96pub(crate) fn note_delivery_failure(msg: &WebhookMessage, err: &anyhow::Error, attempts: u32) {
97 metrics().failed_total.fetch_add(1, Ordering::SeqCst);
98
99 if let Err(write_err) = persist_failed_delivery(msg, err, attempts) {
100 log::warn!("Failed to persist webhook failure record: {write_err:#}");
101 }
102}
103
104pub fn diagnostics_snapshot(
105 repo_root: &Path,
106 config: &WebhookConfig,
107 recent_limit: usize,
108) -> Result<WebhookDiagnostics> {
109 let path = failure_store_path(repo_root);
110 let records = load_failure_records(&path)?;
111 let limit = if recent_limit == 0 {
112 records.len()
113 } else {
114 recent_limit
115 };
116 let recent_failures = records.into_iter().rev().take(limit).collect::<Vec<_>>();
117
118 let state = metrics();
119 let configured_capacity = config
120 .queue_capacity
121 .map(|value| value.clamp(1, 10_000) as usize)
122 .unwrap_or(500);
123 let queue_capacity = match state.queue_capacity.load(Ordering::SeqCst) {
124 0 => configured_capacity,
125 value => value,
126 };
127
128 Ok(WebhookDiagnostics {
129 queue_depth: state.queue_depth.load(Ordering::SeqCst),
130 queue_capacity,
131 queue_policy: config.queue_policy.unwrap_or_default(),
132 enqueued_total: state.enqueued_total.load(Ordering::SeqCst),
133 delivered_total: state.delivered_total.load(Ordering::SeqCst),
134 failed_total: state.failed_total.load(Ordering::SeqCst),
135 dropped_total: state.dropped_total.load(Ordering::SeqCst),
136 retry_attempts_total: state.retry_attempts_total.load(Ordering::SeqCst),
137 failure_store_path: path.display().to_string(),
138 recent_failures,
139 })
140}
141
/// Zeroes every gauge and counter so each test starts from a clean slate.
#[cfg(test)]
pub(super) fn reset_metrics_for_tests() {
    let registry = metrics();

    // Gauges first, then counters, in declaration order.
    for gauge in [&registry.queue_depth, &registry.queue_capacity] {
        gauge.store(0, Ordering::SeqCst);
    }
    for counter in [
        &registry.enqueued_total,
        &registry.delivered_total,
        &registry.failed_total,
        &registry.dropped_total,
        &registry.retry_attempts_total,
    ] {
        counter.store(0, Ordering::SeqCst);
    }
}