Skip to main content

ralph/webhook/
diagnostics.rs

//! Webhook diagnostics and replay support.
//!
//! Responsibilities:
//! - Track in-process webhook delivery metrics (queue depth, totals, retries).
//! - Persist failed delivery records to a bounded repo-local failure store.
//! - Provide status snapshots and bounded replay helpers for CLI commands.
//!
//! Does NOT handle:
//! - Guaranteed delivery semantics across process restarts.
//! - External observability backends or remote metric exports.
//! - Authentication/endpoint registration management.
//!
//! Invariants/assumptions:
//! - Failure store writes are best-effort for runtime delivery failures.
//! - Failure records never include webhook secrets/headers.
//! - Replay is always explicit and bounded by caller-provided selectors/caps.
17
18use super::{
19    WebhookMessage, WebhookPayload, enqueue_webhook_payload_for_replay, resolve_webhook_config,
20};
21use crate::contracts::{WebhookConfig, WebhookQueuePolicy};
22use crate::fsutil;
23use crate::redaction;
24use anyhow::{Context, Result, anyhow, bail};
25use serde::{Deserialize, Serialize};
26use std::collections::HashSet;
27use std::fs;
28use std::path::{Path, PathBuf};
29use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
30use std::sync::{Mutex, OnceLock};
31
/// Location of the failure store JSON file, relative to the repository root.
const WEBHOOK_FAILURE_STORE_RELATIVE_PATH: &str = ".ralph/cache/webhooks/failures.json";

/// Cap on stored failure records; when exceeded, the oldest records are dropped first.
const MAX_WEBHOOK_FAILURE_RECORDS: usize = 200;

/// Cap (in characters) on the redacted error text kept in each failure record.
const MAX_FAILURE_ERROR_CHARS: usize = 400;
35
36#[derive(Debug, Clone, Serialize, Deserialize)]
37pub struct WebhookFailureRecord {
38    pub id: String,
39    pub failed_at: String,
40    pub event: String,
41    #[serde(skip_serializing_if = "Option::is_none")]
42    pub task_id: Option<String>,
43    pub error: String,
44    pub attempts: u32,
45    pub replay_count: u32,
46    pub payload: WebhookPayload,
47}
48
49#[derive(Debug, Clone, Serialize)]
50pub struct WebhookDiagnostics {
51    pub queue_depth: usize,
52    pub queue_capacity: usize,
53    pub queue_policy: WebhookQueuePolicy,
54    pub enqueued_total: u64,
55    pub delivered_total: u64,
56    pub failed_total: u64,
57    pub dropped_total: u64,
58    pub retry_attempts_total: u64,
59    pub failure_store_path: String,
60    pub recent_failures: Vec<WebhookFailureRecord>,
61}
62
/// Filters and caps for [`replay_failed_deliveries`].
///
/// At least one of `ids`, `event`, or `task_id` must be set, and every filter that is
/// set must match for a record to be selected.
#[derive(Clone, Debug)]
pub struct ReplaySelector {
    /// Exact record ids to match; empty means "any id".
    pub ids: Vec<String>,
    /// Only match records whose event name equals this value.
    pub event: Option<String>,
    /// Only match records for this task id.
    pub task_id: Option<String>,
    /// Maximum number of matching records to consider, newest first (`0` = no cap).
    pub limit: usize,
    /// Records already replayed this many times are skipped; must be non-zero.
    pub max_replay_attempts: u32,
}
71
72#[derive(Debug, Clone, Serialize)]
73pub struct ReplayCandidate {
74    pub id: String,
75    pub event: String,
76    #[serde(skip_serializing_if = "Option::is_none")]
77    pub task_id: Option<String>,
78    pub failed_at: String,
79    pub attempts: u32,
80    pub replay_count: u32,
81    pub error: String,
82    pub eligible_for_replay: bool,
83}
84
85#[derive(Debug, Clone, Serialize)]
86pub struct ReplayReport {
87    pub dry_run: bool,
88    pub matched_count: usize,
89    pub eligible_count: usize,
90    pub replayed_count: usize,
91    pub skipped_max_replay_attempts: usize,
92    pub skipped_enqueue_failures: usize,
93    pub candidates: Vec<ReplayCandidate>,
94}
95
96#[derive(Debug, Clone)]
97struct SelectedReplayRecord {
98    id: String,
99    replay_count: u32,
100    payload: WebhookPayload,
101}
102
/// Process-wide webhook counters, written through the `note_*` / `set_queue_capacity`
/// functions and read by `diagnostics_snapshot`.
#[derive(Default, Debug)]
struct WebhookMetrics {
    /// +1 per `note_enqueue_success`, -1 (never below 0) per `note_queue_dequeue`.
    queue_depth: AtomicUsize,
    /// Last value passed to `set_queue_capacity`; 0 means "not reported yet".
    queue_capacity: AtomicUsize,
    /// Incremented by `note_enqueue_success`.
    enqueued_total: AtomicU64,
    /// Incremented by `note_delivery_success`.
    delivered_total: AtomicU64,
    /// Incremented by `note_delivery_failure`.
    failed_total: AtomicU64,
    /// Incremented by `note_dropped_message`.
    dropped_total: AtomicU64,
    /// Incremented by `note_retry_attempt`.
    retry_attempts_total: AtomicU64,
}

/// Lazily created metrics shared by the whole process.
static METRICS: OnceLock<WebhookMetrics> = OnceLock::new();
/// Serialises every read-modify-write of the failure store file within this process.
static FAILURE_STORE_LOCK: OnceLock<Mutex<()>> = OnceLock::new();
/// Per-process counter appended to failure ids so ids minted in the same
/// nanosecond still differ.
static NEXT_FAILURE_SEQUENCE: AtomicU64 = AtomicU64::new(1);

/// Returns the process-wide metrics, creating them on first use.
fn metrics() -> &'static WebhookMetrics {
    METRICS.get_or_init(Default::default)
}

/// Returns the mutex that guards access to the failure store.
fn failure_store_lock() -> &'static Mutex<()> {
    FAILURE_STORE_LOCK.get_or_init(Mutex::default)
}

/// Records the delivery queue's capacity so diagnostics can report it.
pub fn set_queue_capacity(capacity: usize) {
    let state = metrics();
    state.queue_capacity.store(capacity, Ordering::SeqCst);
}

/// Records that one message left the queue; the depth never goes below zero.
pub fn note_queue_dequeue() {
    // `checked_sub` yields `None` at zero, which makes `fetch_update` leave the value alone.
    let _ = metrics()
        .queue_depth
        .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |depth| depth.checked_sub(1));
}

/// Records a message accepted into the queue (bumps both the total and the depth).
pub fn note_enqueue_success() {
    let WebhookMetrics {
        enqueued_total,
        queue_depth,
        ..
    } = metrics();
    enqueued_total.fetch_add(1, Ordering::SeqCst);
    queue_depth.fetch_add(1, Ordering::SeqCst);
}

/// Records a message that was dropped instead of enqueued.
pub fn note_dropped_message() {
    let WebhookMetrics { dropped_total, .. } = metrics();
    dropped_total.fetch_add(1, Ordering::SeqCst);
}

/// Records one delivery retry attempt.
pub fn note_retry_attempt() {
    let WebhookMetrics {
        retry_attempts_total,
        ..
    } = metrics();
    retry_attempts_total.fetch_add(1, Ordering::SeqCst);
}

/// Records one successful delivery.
pub fn note_delivery_success() {
    let WebhookMetrics {
        delivered_total, ..
    } = metrics();
    delivered_total.fetch_add(1, Ordering::SeqCst);
}
156
157pub fn note_delivery_failure(msg: &WebhookMessage, err: &anyhow::Error, attempts: u32) {
158    metrics().failed_total.fetch_add(1, Ordering::SeqCst);
159
160    if let Err(write_err) = persist_failed_delivery(msg, err, attempts) {
161        log::warn!("Failed to persist webhook failure record: {write_err:#}");
162    }
163}
164
165pub fn failure_store_path(repo_root: &Path) -> PathBuf {
166    repo_root.join(WEBHOOK_FAILURE_STORE_RELATIVE_PATH)
167}
168
169pub fn diagnostics_snapshot(
170    repo_root: &Path,
171    config: &WebhookConfig,
172    recent_limit: usize,
173) -> Result<WebhookDiagnostics> {
174    let path = failure_store_path(repo_root);
175    let records = load_failure_records(&path)?;
176    let limit = if recent_limit == 0 {
177        records.len()
178    } else {
179        recent_limit
180    };
181    let recent_failures = records.into_iter().rev().take(limit).collect::<Vec<_>>();
182
183    let state = metrics();
184    let configured_capacity = config
185        .queue_capacity
186        .map(|value| value.clamp(1, 10000) as usize)
187        .unwrap_or(500);
188    let queue_capacity = match state.queue_capacity.load(Ordering::SeqCst) {
189        0 => configured_capacity,
190        value => value,
191    };
192
193    Ok(WebhookDiagnostics {
194        queue_depth: state.queue_depth.load(Ordering::SeqCst),
195        queue_capacity,
196        queue_policy: config.queue_policy.unwrap_or_default(),
197        enqueued_total: state.enqueued_total.load(Ordering::SeqCst),
198        delivered_total: state.delivered_total.load(Ordering::SeqCst),
199        failed_total: state.failed_total.load(Ordering::SeqCst),
200        dropped_total: state.dropped_total.load(Ordering::SeqCst),
201        retry_attempts_total: state.retry_attempts_total.load(Ordering::SeqCst),
202        failure_store_path: path.display().to_string(),
203        recent_failures,
204    })
205}
206
207pub fn replay_failed_deliveries(
208    repo_root: &Path,
209    config: &WebhookConfig,
210    selector: &ReplaySelector,
211    dry_run: bool,
212) -> Result<ReplayReport> {
213    if selector.ids.is_empty() && selector.event.is_none() && selector.task_id.is_none() {
214        bail!(
215            "refusing unbounded replay (would redeliver all failures)\n\
216             this could overwhelm external systems or re-trigger side effects\n\
217             \n\
218             examples:\n\
219             \n\
220             ralph webhook replay --id <failure-id>     # replay specific failure\n\
221             ralph webhook replay --event task.done     # replay all task.done failures\n\
222             ralph webhook replay --task-id RQ-0001     # replay failures for a task\n\
223             ralph webhook replay --id a,b,c --limit 5  # replay up to 5 specific failures"
224        );
225    }
226
227    if selector.max_replay_attempts == 0 {
228        bail!("max_replay_attempts must be greater than 0");
229    }
230
231    if !dry_run {
232        let resolved = resolve_webhook_config(config);
233        if !resolved.enabled {
234            bail!("Webhook replay requires agent.webhook.enabled=true");
235        }
236        if resolved
237            .url
238            .as_deref()
239            .is_none_or(|url| url.trim().is_empty())
240        {
241            bail!("Webhook replay requires agent.webhook.url to be configured");
242        }
243    }
244
245    let path = failure_store_path(repo_root);
246
247    let limit = if selector.limit == 0 {
248        usize::MAX
249    } else {
250        selector.limit
251    };
252
253    let id_filter = selector
254        .ids
255        .iter()
256        .map(std::string::String::as_str)
257        .collect::<HashSet<_>>();
258
259    let (selected_records, candidates) = {
260        let _guard = failure_store_lock()
261            .lock()
262            .map_err(|_| anyhow!("failed to acquire webhook failure store lock"))?;
263        let records = load_failure_records_unlocked(&path)?;
264
265        let mut selected_records = Vec::new();
266        let mut candidates = Vec::new();
267
268        for record in records.iter().rev() {
269            if selected_records.len() >= limit {
270                break;
271            }
272            if !id_filter.is_empty() && !id_filter.contains(record.id.as_str()) {
273                continue;
274            }
275            if let Some(event_filter) = selector.event.as_deref()
276                && record.event != event_filter
277            {
278                continue;
279            }
280            if let Some(task_filter) = selector.task_id.as_deref()
281                && record.task_id.as_deref() != Some(task_filter)
282            {
283                continue;
284            }
285
286            let eligible = record.replay_count < selector.max_replay_attempts;
287            candidates.push(ReplayCandidate {
288                id: record.id.clone(),
289                event: record.event.clone(),
290                task_id: record.task_id.clone(),
291                failed_at: record.failed_at.clone(),
292                attempts: record.attempts,
293                replay_count: record.replay_count,
294                error: record.error.clone(),
295                eligible_for_replay: eligible,
296            });
297            selected_records.push(SelectedReplayRecord {
298                id: record.id.clone(),
299                replay_count: record.replay_count,
300                payload: record.payload.clone(),
301            });
302        }
303
304        (selected_records, candidates)
305    };
306
307    let matched_count = candidates.len();
308    let eligible_count = candidates
309        .iter()
310        .filter(|candidate| candidate.eligible_for_replay)
311        .count();
312
313    if dry_run {
314        return Ok(ReplayReport {
315            dry_run,
316            matched_count,
317            eligible_count,
318            replayed_count: 0,
319            skipped_max_replay_attempts: matched_count.saturating_sub(eligible_count),
320            skipped_enqueue_failures: 0,
321            candidates,
322        });
323    }
324
325    let mut replayed_count = 0usize;
326    let mut skipped_max_replay_attempts = 0usize;
327    let mut skipped_enqueue_failures = 0usize;
328    let mut replayed_ids = Vec::new();
329
330    for record in selected_records {
331        if record.replay_count >= selector.max_replay_attempts {
332            skipped_max_replay_attempts += 1;
333            continue;
334        }
335
336        if enqueue_webhook_payload_for_replay(record.payload, config) {
337            replayed_ids.push(record.id);
338            replayed_count += 1;
339        } else {
340            skipped_enqueue_failures += 1;
341        }
342    }
343
344    if !replayed_ids.is_empty() {
345        update_replay_counts(&path, &replayed_ids)?;
346    }
347
348    Ok(ReplayReport {
349        dry_run,
350        matched_count,
351        eligible_count,
352        replayed_count,
353        skipped_max_replay_attempts,
354        skipped_enqueue_failures,
355        candidates,
356    })
357}
358
359fn update_replay_counts(path: &Path, replayed_ids: &[String]) -> Result<()> {
360    let replayed_set = replayed_ids
361        .iter()
362        .map(std::string::String::as_str)
363        .collect::<HashSet<_>>();
364
365    let _guard = failure_store_lock()
366        .lock()
367        .map_err(|_| anyhow!("failed to acquire webhook failure store lock"))?;
368    let mut records = load_failure_records_unlocked(path)?;
369    for record in &mut records {
370        if replayed_set.contains(record.id.as_str()) {
371            record.replay_count = record.replay_count.saturating_add(1);
372        }
373    }
374    write_failure_records_unlocked(path, &records)
375}
376
377fn persist_failed_delivery(msg: &WebhookMessage, err: &anyhow::Error, attempts: u32) -> Result<()> {
378    let repo_root = match resolve_repo_root_from_runtime(msg) {
379        Some(path) => path,
380        None => {
381            log::debug!("Unable to resolve repo root for webhook failure persistence");
382            return Ok(());
383        }
384    };
385
386    let path = failure_store_path(&repo_root);
387    persist_failed_delivery_at_path(&path, msg, err, attempts)
388}
389
390fn persist_failed_delivery_at_path(
391    path: &Path,
392    msg: &WebhookMessage,
393    err: &anyhow::Error,
394    attempts: u32,
395) -> Result<()> {
396    let _guard = failure_store_lock()
397        .lock()
398        .map_err(|_| anyhow!("failed to acquire webhook failure store lock"))?;
399
400    let mut records = load_failure_records_unlocked(path)?;
401    records.push(WebhookFailureRecord {
402        id: next_failure_id(),
403        failed_at: crate::timeutil::now_utc_rfc3339_or_fallback(),
404        event: msg.payload.event.clone(),
405        task_id: msg.payload.task_id.clone(),
406        error: sanitize_error(err),
407        attempts,
408        replay_count: 0,
409        payload: msg.payload.clone(),
410    });
411
412    if records.len() > MAX_WEBHOOK_FAILURE_RECORDS {
413        let retain_from = records.len().saturating_sub(MAX_WEBHOOK_FAILURE_RECORDS);
414        records.drain(..retain_from);
415    }
416
417    write_failure_records_unlocked(path, &records)
418}
419
420fn load_failure_records(path: &Path) -> Result<Vec<WebhookFailureRecord>> {
421    let _guard = failure_store_lock()
422        .lock()
423        .map_err(|_| anyhow!("failed to acquire webhook failure store lock"))?;
424    load_failure_records_unlocked(path)
425}
426
427fn load_failure_records_unlocked(path: &Path) -> Result<Vec<WebhookFailureRecord>> {
428    if !path.exists() {
429        return Ok(Vec::new());
430    }
431
432    let content = fs::read_to_string(path)
433        .with_context(|| format!("read webhook failure store {}", path.display()))?;
434    if content.trim().is_empty() {
435        return Ok(Vec::new());
436    }
437
438    serde_json::from_str::<Vec<WebhookFailureRecord>>(&content)
439        .with_context(|| format!("parse webhook failure store {}", path.display()))
440}
441
442fn write_failure_records_unlocked(path: &Path, records: &[WebhookFailureRecord]) -> Result<()> {
443    if let Some(parent) = path.parent() {
444        fs::create_dir_all(parent).with_context(|| {
445            format!(
446                "create webhook failure store directory {}",
447                parent.display()
448            )
449        })?;
450    }
451
452    let rendered = serde_json::to_string_pretty(records).context("serialize webhook failures")?;
453    fsutil::write_atomic(path, rendered.as_bytes())
454        .with_context(|| format!("write webhook failure store {}", path.display()))
455}
456
457fn resolve_repo_root_from_runtime(msg: &WebhookMessage) -> Option<PathBuf> {
458    if let Some(repo_root) = msg.payload.context.repo_root.as_deref() {
459        let repo_root = PathBuf::from(repo_root);
460        if repo_root.exists() {
461            return Some(crate::config::find_repo_root(&repo_root));
462        }
463        log::debug!(
464            "webhook payload repo_root does not exist; falling back to current directory: {}",
465            repo_root.display()
466        );
467    }
468
469    let cwd = std::env::current_dir().ok()?;
470    Some(crate::config::find_repo_root(&cwd))
471}
472
473fn next_failure_id() -> String {
474    let nanos = std::time::SystemTime::now()
475        .duration_since(std::time::UNIX_EPOCH)
476        .map(|duration| duration.as_nanos())
477        .unwrap_or(0);
478    let sequence = NEXT_FAILURE_SEQUENCE.fetch_add(1, Ordering::Relaxed);
479    format!("wf-{nanos}-{sequence}")
480}
481
482fn sanitize_error(err: &anyhow::Error) -> String {
483    let redacted = redaction::redact_text(&err.to_string());
484    let trimmed = redacted.trim();
485    if trimmed.chars().count() <= MAX_FAILURE_ERROR_CHARS {
486        return trimmed.to_string();
487    }
488
489    let truncated = trimmed
490        .chars()
491        .take(MAX_FAILURE_ERROR_CHARS)
492        .collect::<String>();
493    format!("{truncated}…")
494}
495
/// Test hook: replaces the failure store under `repo_root` with `records`.
#[cfg(test)]
pub(crate) fn write_failure_records_for_tests(
    repo_root: &Path,
    records: &[WebhookFailureRecord],
) -> Result<()> {
    let store = failure_store_path(repo_root);
    let _guard = failure_store_lock()
        .lock()
        .map_err(|_| anyhow!("failed to acquire webhook failure store lock"))?;
    write_failure_records_unlocked(&store, records)
}

/// Test hook: reads every failure record stored under `repo_root`.
#[cfg(test)]
pub(crate) fn load_failure_records_for_tests(
    repo_root: &Path,
) -> Result<Vec<WebhookFailureRecord>> {
    load_failure_records(&failure_store_path(repo_root))
}

/// Test hook: persists a failure for `msg` under `repo_root`, bypassing repo-root
/// resolution from the runtime.
#[cfg(test)]
pub(crate) fn persist_failed_delivery_for_tests(
    repo_root: &Path,
    msg: &WebhookMessage,
    err: &anyhow::Error,
    attempts: u32,
) -> Result<()> {
    persist_failed_delivery_at_path(&failure_store_path(repo_root), msg, err, attempts)
}

/// Test hook: zeroes every in-process webhook metric.
#[cfg(test)]
pub(crate) fn reset_webhook_metrics_for_tests() {
    // Exhaustive destructuring: adding a metric field without resetting it here
    // becomes a compile error.
    let WebhookMetrics {
        queue_depth,
        queue_capacity,
        enqueued_total,
        delivered_total,
        failed_total,
        dropped_total,
        retry_attempts_total,
    } = metrics();

    for gauge in [queue_depth, queue_capacity] {
        gauge.store(0, Ordering::SeqCst);
    }
    for counter in [
        enqueued_total,
        delivered_total,
        failed_total,
        dropped_total,
        retry_attempts_total,
    ] {
        counter.store(0, Ordering::SeqCst);
    }
}