Skip to main content

ralph/webhook/diagnostics/
failure_store.rs

1//! Purpose: Persist webhook delivery failures safely and load bounded failure history.
2//!
3//! Responsibilities:
4//! - Store failed delivery records in the repo-local webhook diagnostics cache.
5//! - Enforce bounded retention, replay-count updates, and serialized failure-store access.
6//! - Redact destinations and sanitize persisted errors so secrets never reach disk.
7//!
8//! Scope:
9//! - Failure-history file paths, locking, serialization, redaction, and retention only.
10//!
11//! Usage:
12//! - Called by metrics, replay, and test helper companions behind the diagnostics facade.
13//!
14//! Invariants/Assumptions:
15//! - Failure records never include raw secrets or token-bearing destination URLs.
16//! - Stored history is bounded to the newest 200 failure records.
17//! - Store writes are best-effort for runtime delivery failures and serialized by a process-local lock.
18
19use super::super::{WebhookMessage, WebhookPayload};
20use crate::{fsutil, redaction};
21use anyhow::{Context, Result, anyhow};
22use serde::{Deserialize, Serialize};
23use std::collections::HashSet;
24use std::fs;
25use std::path::{Path, PathBuf};
26use std::sync::atomic::{AtomicU64, Ordering};
27use std::sync::{Mutex, OnceLock};
28
/// Location of the failure-history JSON file, relative to the repository root
/// (joined onto the root by `failure_store_path`).
const WEBHOOK_FAILURE_STORE_RELATIVE_PATH: &str = ".ralph/cache/webhooks/failures.json";
/// Maximum number of failure records retained; when a new record pushes the history past this
/// count, `persist_failed_delivery_at_path` drops entries from the front (the oldest ones).
const MAX_WEBHOOK_FAILURE_RECORDS: usize = 200;
/// Maximum number of characters (`char`s, not bytes) kept from a sanitized error message;
/// `sanitize_error` truncates longer messages and appends `…`.
const MAX_FAILURE_ERROR_CHARS: usize = 400;
32
/// One persisted webhook delivery failure; the store file holds a JSON array of these.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WebhookFailureRecord {
    /// Identifier of the form `wf-<unix-nanos>-<sequence>` produced by `next_failure_id`.
    pub id: String,
    /// Time the failure was recorded, from `crate::timeutil::now_utc_rfc3339_or_fallback`
    /// (presumably an RFC 3339 UTC string — TODO confirm the fallback format).
    pub failed_at: String,
    /// Event name copied from the failed payload.
    pub event: String,
    /// Task id copied from the failed payload; omitted from the JSON when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub task_id: Option<String>,
    /// Destination URL after `redact_webhook_destination`; omitted from the JSON when the
    /// webhook config had no URL.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub destination: Option<String>,
    /// Error text after destination/secret redaction and length truncation (see `sanitize_error`).
    pub error: String,
    /// Delivery attempt count passed in by the caller that recorded the failure.
    pub attempts: u32,
    /// How many times this record has been replayed; starts at 0 and is incremented
    /// (saturating) by `update_replay_counts`.
    pub replay_count: u32,
    /// Clone of the original webhook payload (presumably kept so the delivery can be replayed).
    /// NOTE(review): the payload is stored as-is with no redaction in this module; confirm
    /// `WebhookPayload` can never carry secrets.
    pub payload: WebhookPayload,
}
47
/// Process-local mutex that serializes every read-modify-write of the failure store.
static FAILURE_STORE_LOCK: OnceLock<Mutex<()>> = OnceLock::new();
/// Per-process counter appended to failure ids so ids minted within the same nanosecond differ.
static NEXT_FAILURE_SEQUENCE: AtomicU64 = AtomicU64::new(1);

/// Returns the shared failure-store mutex, lazily creating it on first access.
fn failure_store_lock() -> &'static Mutex<()> {
    FAILURE_STORE_LOCK.get_or_init(Mutex::default)
}
54
55pub fn failure_store_path(repo_root: &Path) -> PathBuf {
56    repo_root.join(WEBHOOK_FAILURE_STORE_RELATIVE_PATH)
57}
58
59pub(super) fn persist_failed_delivery(
60    msg: &WebhookMessage,
61    err: &anyhow::Error,
62    attempts: u32,
63) -> Result<()> {
64    let repo_root = match resolve_repo_root_from_runtime(msg) {
65        Some(path) => path,
66        None => {
67            log::debug!("Unable to resolve repo root for webhook failure persistence");
68            return Ok(());
69        }
70    };
71
72    let path = failure_store_path(&repo_root);
73    persist_failed_delivery_at_path(&path, msg, err, attempts)
74}
75
76pub(super) fn persist_failed_delivery_at_path(
77    path: &Path,
78    msg: &WebhookMessage,
79    err: &anyhow::Error,
80    attempts: u32,
81) -> Result<()> {
82    let _guard = failure_store_lock()
83        .lock()
84        .map_err(|_| anyhow!("failed to acquire webhook failure store lock"))?;
85
86    let mut records = load_failure_records_unlocked(path)?;
87    records.push(WebhookFailureRecord {
88        id: next_failure_id(),
89        failed_at: crate::timeutil::now_utc_rfc3339_or_fallback(),
90        event: msg.payload.event.clone(),
91        task_id: msg.payload.task_id.clone(),
92        destination: msg
93            .config
94            .url
95            .as_deref()
96            .map(super::super::worker::redact_webhook_destination),
97        error: sanitize_error(err, msg.config.url.as_deref()),
98        attempts,
99        replay_count: 0,
100        payload: msg.payload.clone(),
101    });
102
103    if records.len() > MAX_WEBHOOK_FAILURE_RECORDS {
104        let retain_from = records.len().saturating_sub(MAX_WEBHOOK_FAILURE_RECORDS);
105        records.drain(..retain_from);
106    }
107
108    write_failure_records_unlocked(path, &records)
109}
110
111pub(super) fn load_failure_records(path: &Path) -> Result<Vec<WebhookFailureRecord>> {
112    let _guard = failure_store_lock()
113        .lock()
114        .map_err(|_| anyhow!("failed to acquire webhook failure store lock"))?;
115    load_failure_records_unlocked(path)
116}
117
/// Test helper: overwrites the store at `path` with `records` while holding the store lock.
///
/// # Errors
/// Returns an error if the store directory cannot be created, or if the records cannot be
/// serialized or written.
#[cfg(test)]
pub(super) fn write_failure_records(path: &Path, records: &[WebhookFailureRecord]) -> Result<()> {
    // The mutex guards `()`; a poisoned lock (e.g. from a panicking test) is safe to reuse.
    let _guard = failure_store_lock()
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    write_failure_records_unlocked(path, records)
}
125
126pub(super) fn update_replay_counts(path: &Path, replayed_ids: &[String]) -> Result<()> {
127    let replayed_set = replayed_ids
128        .iter()
129        .map(std::string::String::as_str)
130        .collect::<HashSet<_>>();
131
132    let _guard = failure_store_lock()
133        .lock()
134        .map_err(|_| anyhow!("failed to acquire webhook failure store lock"))?;
135    let mut records = load_failure_records_unlocked(path)?;
136    for record in &mut records {
137        if replayed_set.contains(record.id.as_str()) {
138            record.replay_count = record.replay_count.saturating_add(1);
139        }
140    }
141    write_failure_records_unlocked(path, &records)
142}
143
144fn load_failure_records_unlocked(path: &Path) -> Result<Vec<WebhookFailureRecord>> {
145    if !path.exists() {
146        return Ok(Vec::new());
147    }
148
149    let content = fs::read_to_string(path)
150        .with_context(|| format!("read webhook failure store {}", path.display()))?;
151    if content.trim().is_empty() {
152        return Ok(Vec::new());
153    }
154
155    serde_json::from_str::<Vec<WebhookFailureRecord>>(&content)
156        .with_context(|| format!("parse webhook failure store {}", path.display()))
157}
158
159fn write_failure_records_unlocked(path: &Path, records: &[WebhookFailureRecord]) -> Result<()> {
160    if let Some(parent) = path.parent() {
161        fs::create_dir_all(parent).with_context(|| {
162            format!(
163                "create webhook failure store directory {}",
164                parent.display()
165            )
166        })?;
167    }
168
169    let rendered = serde_json::to_string_pretty(records).context("serialize webhook failures")?;
170    fsutil::write_atomic(path, rendered.as_bytes())
171        .with_context(|| format!("write webhook failure store {}", path.display()))
172}
173
174fn resolve_repo_root_from_runtime(msg: &WebhookMessage) -> Option<PathBuf> {
175    if let Some(repo_root) = msg.payload.context.repo_root.as_deref() {
176        let repo_root = PathBuf::from(repo_root);
177        if repo_root.exists() {
178            return Some(crate::config::find_repo_root(&repo_root));
179        }
180        log::debug!(
181            "webhook payload repo_root does not exist; falling back to current directory: {}",
182            repo_root.display()
183        );
184    }
185
186    let cwd = std::env::current_dir().ok()?;
187    Some(crate::config::find_repo_root(&cwd))
188}
189
190fn next_failure_id() -> String {
191    let nanos = std::time::SystemTime::now()
192        .duration_since(std::time::UNIX_EPOCH)
193        .map(|duration| duration.as_nanos())
194        .unwrap_or(0);
195    let sequence = NEXT_FAILURE_SEQUENCE.fetch_add(1, Ordering::Relaxed);
196    format!("wf-{nanos}-{sequence}")
197}
198
199fn sanitize_error(err: &anyhow::Error, destination_url: Option<&str>) -> String {
200    let mut rendered = err.to_string();
201    if let Some(url) = destination_url {
202        rendered = rendered.replace(url, &super::super::worker::redact_webhook_destination(url));
203    }
204
205    let redacted = redaction::redact_text(&rendered);
206    let trimmed = redacted.trim();
207    if trimmed.chars().count() <= MAX_FAILURE_ERROR_CHARS {
208        return trimmed.to_string();
209    }
210
211    let truncated = trimmed
212        .chars()
213        .take(MAX_FAILURE_ERROR_CHARS)
214        .collect::<String>();
215    format!("{truncated}…")
216}