Skip to main content

ralph/webhook/diagnostics/
failure_store.rs

//! Purpose: Persist webhook delivery failures safely and load bounded failure history.
//!
//! Responsibilities:
//! - Store failed delivery records in the repo-local webhook diagnostics cache.
//! - Enforce bounded retention, replay-count updates, and serialized failure-store access.
//! - Redact destinations and sanitize persisted errors so secrets never reach disk.
//!
//! Scope:
//! - Failure-history file paths, locking, serialization, redaction, and retention only.
//!
//! Usage:
//! - Called by metrics, replay, and test helper companions behind the diagnostics facade.
//!
//! Invariants/Assumptions:
//! - Failure records never include raw secrets or token-bearing destination URLs.
//! - Stored history is bounded to the newest 200 failure records.
//! - Store writes are best-effort for runtime delivery failures and serialized by
//!   cross-process locking around each read-modify-write mutation.
19
20use super::super::{WebhookMessage, WebhookPayload};
21use crate::{fsutil, redaction};
22use anyhow::{Context, Result, anyhow};
23use serde::{Deserialize, Serialize};
24use std::collections::HashSet;
25use std::fs;
26use std::path::{Path, PathBuf};
27use std::sync::atomic::{AtomicU64, Ordering};
28use std::sync::{Mutex, OnceLock};
29use std::time::{Duration, Instant};
30
/// Location of the failure store file, relative to the repository root.
const WEBHOOK_FAILURE_STORE_RELATIVE_PATH: &str = ".ralph/cache/webhooks/failures.json";
/// Retention bound: only the newest records up to this count are kept on disk.
const MAX_WEBHOOK_FAILURE_RECORDS: usize = 200;
/// Maximum number of characters of a sanitized error kept before truncation.
const MAX_FAILURE_ERROR_CHARS: usize = 400;
/// Label passed to the directory lock when guarding the failure store.
const FAILURE_STORE_LOCK_LABEL: &str = "webhook failure store";
/// Total time a caller keeps retrying to obtain the failure store directory lock.
const FAILURE_STORE_LOCK_WAIT_TIMEOUT: Duration = Duration::from_secs(5);
/// Sleep between two consecutive directory-lock acquisition attempts.
const FAILURE_STORE_LOCK_WAIT_SLICE: Duration = Duration::from_millis(25);
37
38#[derive(Debug, Clone, Serialize, Deserialize)]
39pub struct WebhookFailureRecord {
40    pub id: String,
41    pub failed_at: String,
42    pub event: String,
43    #[serde(skip_serializing_if = "Option::is_none")]
44    pub task_id: Option<String>,
45    #[serde(skip_serializing_if = "Option::is_none")]
46    pub destination: Option<String>,
47    pub error: String,
48    pub attempts: u32,
49    pub replay_count: u32,
50    pub payload: WebhookPayload,
51}
52
/// Process-wide mutex serializing failure store access between threads of this process.
static FAILURE_STORE_LOCK: OnceLock<Mutex<()>> = OnceLock::new();
/// Monotonic per-process counter mixed into generated failure identifiers.
static NEXT_FAILURE_SEQUENCE: AtomicU64 = AtomicU64::new(1);

/// Returns the process-wide failure store mutex, creating it on first use.
fn failure_store_lock() -> &'static Mutex<()> {
    FAILURE_STORE_LOCK.get_or_init(|| Mutex::new(()))
}
59
60pub fn failure_store_path(repo_root: &Path) -> PathBuf {
61    repo_root.join(WEBHOOK_FAILURE_STORE_RELATIVE_PATH)
62}
63
/// Derives the lock path that guards the store at `path`.
///
/// The store path's extension is replaced with `lock` (or `lock` is appended when the
/// path has no extension), so the lock sits next to the store file.
fn failure_store_lock_dir(path: &Path) -> PathBuf {
    path.with_extension("lock")
}
67
68pub(super) fn persist_failed_delivery(
69    msg: &WebhookMessage,
70    err: &anyhow::Error,
71    attempts: u32,
72) -> Result<()> {
73    let repo_root = match resolve_repo_root_from_runtime(msg) {
74        Some(path) => path,
75        None => {
76            log::debug!("Unable to resolve repo root for webhook failure persistence");
77            return Ok(());
78        }
79    };
80
81    let path = failure_store_path(&repo_root);
82    persist_failed_delivery_at_path(&path, msg, err, attempts)
83}
84
85pub(super) fn persist_failed_delivery_at_path(
86    path: &Path,
87    msg: &WebhookMessage,
88    err: &anyhow::Error,
89    attempts: u32,
90) -> Result<()> {
91    let _guard = failure_store_lock()
92        .lock()
93        .map_err(|_| anyhow!("failed to acquire webhook failure store lock"))?;
94    let _file_lock = acquire_failure_store_lock(path)?;
95
96    let mut records = load_failure_records_unlocked(path)?;
97    maybe_pause_failure_store_mutation_for_tests();
98    records.push(WebhookFailureRecord {
99        id: next_failure_id(),
100        failed_at: crate::timeutil::now_utc_rfc3339_or_fallback(),
101        event: msg.payload.event.clone(),
102        task_id: msg.payload.task_id.clone(),
103        destination: msg
104            .config
105            .url
106            .as_deref()
107            .map(super::super::worker::redact_webhook_destination),
108        error: sanitize_error(err, msg.config.url.as_deref()),
109        attempts,
110        replay_count: 0,
111        payload: msg.payload.clone(),
112    });
113
114    if records.len() > MAX_WEBHOOK_FAILURE_RECORDS {
115        let retain_from = records.len().saturating_sub(MAX_WEBHOOK_FAILURE_RECORDS);
116        records.drain(..retain_from);
117    }
118
119    write_failure_records_unlocked(path, &records)
120}
121
122pub(super) fn load_failure_records(path: &Path) -> Result<Vec<WebhookFailureRecord>> {
123    let _guard = failure_store_lock()
124        .lock()
125        .map_err(|_| anyhow!("failed to acquire webhook failure store lock"))?;
126    load_failure_records_unlocked(path)
127}
128
/// Test-only helper that replaces the whole store at `path` with `records`.
///
/// Takes the same process-wide mutex and directory lock as the runtime mutation paths.
///
/// # Errors
///
/// Fails when either lock cannot be obtained or the store cannot be written.
#[cfg(test)]
pub(super) fn write_failure_records(path: &Path, records: &[WebhookFailureRecord]) -> Result<()> {
    let _guard = failure_store_lock()
        .lock()
        .map_err(|_| anyhow!("failed to acquire webhook failure store lock"))?;
    let _file_lock = acquire_failure_store_lock(path)?;
    write_failure_records_unlocked(path, records)
}
137
138pub(super) fn update_replay_counts(path: &Path, replayed_ids: &[String]) -> Result<()> {
139    let replayed_set = replayed_ids
140        .iter()
141        .map(std::string::String::as_str)
142        .collect::<HashSet<_>>();
143
144    let _guard = failure_store_lock()
145        .lock()
146        .map_err(|_| anyhow!("failed to acquire webhook failure store lock"))?;
147    let _file_lock = acquire_failure_store_lock(path)?;
148    let mut records = load_failure_records_unlocked(path)?;
149    maybe_pause_failure_store_mutation_for_tests();
150    for record in &mut records {
151        if replayed_set.contains(record.id.as_str()) {
152            record.replay_count = record.replay_count.saturating_add(1);
153        }
154    }
155    write_failure_records_unlocked(path, &records)
156}
157
158fn load_failure_records_unlocked(path: &Path) -> Result<Vec<WebhookFailureRecord>> {
159    if !path.exists() {
160        return Ok(Vec::new());
161    }
162
163    let content = fs::read_to_string(path)
164        .with_context(|| format!("read webhook failure store {}", path.display()))?;
165    if content.trim().is_empty() {
166        return Ok(Vec::new());
167    }
168
169    serde_json::from_str::<Vec<WebhookFailureRecord>>(&content)
170        .with_context(|| format!("parse webhook failure store {}", path.display()))
171}
172
173fn write_failure_records_unlocked(path: &Path, records: &[WebhookFailureRecord]) -> Result<()> {
174    if let Some(parent) = path.parent() {
175        fs::create_dir_all(parent).with_context(|| {
176            format!(
177                "create webhook failure store directory {}",
178                parent.display()
179            )
180        })?;
181    }
182
183    let rendered = serde_json::to_string_pretty(records).context("serialize webhook failures")?;
184    fsutil::write_atomic(path, rendered.as_bytes())
185        .with_context(|| format!("write webhook failure store {}", path.display()))
186}
187
188fn acquire_failure_store_lock(path: &Path) -> Result<crate::lock::DirLock> {
189    let lock_dir = failure_store_lock_dir(path);
190    let deadline = Instant::now() + FAILURE_STORE_LOCK_WAIT_TIMEOUT;
191
192    loop {
193        match crate::lock::acquire_dir_lock(&lock_dir, FAILURE_STORE_LOCK_LABEL, true) {
194            Ok(lock) => return Ok(lock),
195            Err(error) if Instant::now() < deadline => {
196                log::debug!(
197                    "waiting for webhook failure store lock {}: {}",
198                    lock_dir.display(),
199                    error
200                );
201                std::thread::sleep(FAILURE_STORE_LOCK_WAIT_SLICE);
202            }
203            Err(error) => {
204                return Err(error).with_context(|| {
205                    format!(
206                        "acquire webhook failure store lock {} within {}ms",
207                        lock_dir.display(),
208                        FAILURE_STORE_LOCK_WAIT_TIMEOUT.as_millis()
209                    )
210                });
211            }
212        }
213    }
214}
215
216fn resolve_repo_root_from_runtime(msg: &WebhookMessage) -> Option<PathBuf> {
217    if let Some(repo_root) = msg.payload.context.repo_root.as_deref() {
218        let repo_root = PathBuf::from(repo_root);
219        if repo_root.exists() {
220            return Some(crate::config::find_repo_root(&repo_root));
221        }
222        log::debug!(
223            "webhook payload repo_root does not exist; falling back to current directory: {}",
224            repo_root.display()
225        );
226    }
227
228    let cwd = std::env::current_dir().ok()?;
229    Some(crate::config::find_repo_root(&cwd))
230}
231
232fn next_failure_id() -> String {
233    let nanos = std::time::SystemTime::now()
234        .duration_since(std::time::UNIX_EPOCH)
235        .map(|duration| duration.as_nanos())
236        .unwrap_or(0);
237    let sequence = NEXT_FAILURE_SEQUENCE.fetch_add(1, Ordering::Relaxed);
238    format!("wf-{nanos}-{sequence}")
239}
240
241fn sanitize_error(err: &anyhow::Error, destination_url: Option<&str>) -> String {
242    let mut rendered = err.to_string();
243    if let Some(url) = destination_url {
244        rendered = rendered.replace(url, &super::super::worker::redact_webhook_destination(url));
245    }
246
247    let redacted = redaction::redact_text(&rendered);
248    let trimmed = redacted.trim();
249    if trimmed.chars().count() <= MAX_FAILURE_ERROR_CHARS {
250        return trimmed.to_string();
251    }
252
253    let truncated = trimmed
254        .chars()
255        .take(MAX_FAILURE_ERROR_CHARS)
256        .collect::<String>();
257    format!("{truncated}…")
258}
259
/// Test-build hook that optionally sleeps between a store read and the following write.
///
/// The delay in milliseconds is read from `RALPH_TEST_WEBHOOK_FAILURE_STORE_DELAY_MS`.
/// An unset, unparsable, or zero value means no pause.
#[cfg(test)]
fn maybe_pause_failure_store_mutation_for_tests() {
    let delay_ms = std::env::var("RALPH_TEST_WEBHOOK_FAILURE_STORE_DELAY_MS")
        .ok()
        .and_then(|value| value.parse::<u64>().ok())
        .filter(|delay_ms| *delay_ms > 0);

    if let Some(delay_ms) = delay_ms {
        std::thread::sleep(Duration::from_millis(delay_ms));
    }
}

/// Non-test builds never pause; this variant is intentionally empty.
#[cfg(not(test))]
fn maybe_pause_failure_store_mutation_for_tests() {}