ralph/webhook/diagnostics/
failure_store.rs1use super::super::{WebhookMessage, WebhookPayload};
20use crate::{fsutil, redaction};
21use anyhow::{Context, Result, anyhow};
22use serde::{Deserialize, Serialize};
23use std::collections::HashSet;
24use std::fs;
25use std::path::{Path, PathBuf};
26use std::sync::atomic::{AtomicU64, Ordering};
27use std::sync::{Mutex, OnceLock};
28
/// Location of the webhook failure store, relative to the repository root.
const WEBHOOK_FAILURE_STORE_RELATIVE_PATH: &str = ".ralph/cache/webhooks/failures.json";

/// Maximum number of failure records kept in the store; older entries are dropped first.
const MAX_WEBHOOK_FAILURE_RECORDS: usize = 200;

/// Maximum number of characters of a sanitized error message kept in a record.
const MAX_FAILURE_ERROR_CHARS: usize = 400;
32
/// A single persisted webhook delivery failure, as stored in the JSON failure store.
///
/// Records are appended by `persist_failed_delivery_at_path`, which keeps at most
/// `MAX_WEBHOOK_FAILURE_RECORDS` of them (newest last).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WebhookFailureRecord {
    /// Identifier of the form `wf-<unix-nanos>-<sequence>`, generated when the failure is recorded.
    pub id: String,
    /// When the failure was recorded, from `timeutil::now_utc_rfc3339_or_fallback`
    /// (presumably RFC 3339 UTC; NOTE(review): confirm the fallback format).
    pub failed_at: String,
    /// Event name copied from the webhook payload.
    pub event: String,
    /// Task id copied from the webhook payload; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub task_id: Option<String>,
    /// Destination URL after `worker::redact_webhook_destination`; omitted from the JSON
    /// when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub destination: Option<String>,
    /// Error text after URL replacement, `redaction::redact_text`, trimming and truncation
    /// to `MAX_FAILURE_ERROR_CHARS` characters.
    pub error: String,
    /// Number of delivery attempts, as passed in by the caller.
    pub attempts: u32,
    /// Replay counter: starts at 0 and is incremented by `update_replay_counts`.
    pub replay_count: u32,
    /// Full copy of the original payload (presumably kept so the delivery can be replayed).
    pub payload: WebhookPayload,
}
47
/// Process-wide mutex serializing read-modify-write cycles on the failure store.
static FAILURE_STORE_LOCK: OnceLock<Mutex<()>> = OnceLock::new();
/// Per-process counter appended to failure ids so ids created in the same nanosecond differ.
static NEXT_FAILURE_SEQUENCE: AtomicU64 = AtomicU64::new(1);

/// Returns the shared failure-store mutex, creating it on first use.
fn failure_store_lock() -> &'static Mutex<()> {
    FAILURE_STORE_LOCK.get_or_init(Mutex::default)
}
54
55pub fn failure_store_path(repo_root: &Path) -> PathBuf {
56 repo_root.join(WEBHOOK_FAILURE_STORE_RELATIVE_PATH)
57}
58
59pub(super) fn persist_failed_delivery(
60 msg: &WebhookMessage,
61 err: &anyhow::Error,
62 attempts: u32,
63) -> Result<()> {
64 let repo_root = match resolve_repo_root_from_runtime(msg) {
65 Some(path) => path,
66 None => {
67 log::debug!("Unable to resolve repo root for webhook failure persistence");
68 return Ok(());
69 }
70 };
71
72 let path = failure_store_path(&repo_root);
73 persist_failed_delivery_at_path(&path, msg, err, attempts)
74}
75
76pub(super) fn persist_failed_delivery_at_path(
77 path: &Path,
78 msg: &WebhookMessage,
79 err: &anyhow::Error,
80 attempts: u32,
81) -> Result<()> {
82 let _guard = failure_store_lock()
83 .lock()
84 .map_err(|_| anyhow!("failed to acquire webhook failure store lock"))?;
85
86 let mut records = load_failure_records_unlocked(path)?;
87 records.push(WebhookFailureRecord {
88 id: next_failure_id(),
89 failed_at: crate::timeutil::now_utc_rfc3339_or_fallback(),
90 event: msg.payload.event.clone(),
91 task_id: msg.payload.task_id.clone(),
92 destination: msg
93 .config
94 .url
95 .as_deref()
96 .map(super::super::worker::redact_webhook_destination),
97 error: sanitize_error(err, msg.config.url.as_deref()),
98 attempts,
99 replay_count: 0,
100 payload: msg.payload.clone(),
101 });
102
103 if records.len() > MAX_WEBHOOK_FAILURE_RECORDS {
104 let retain_from = records.len().saturating_sub(MAX_WEBHOOK_FAILURE_RECORDS);
105 records.drain(..retain_from);
106 }
107
108 write_failure_records_unlocked(path, &records)
109}
110
111pub(super) fn load_failure_records(path: &Path) -> Result<Vec<WebhookFailureRecord>> {
112 let _guard = failure_store_lock()
113 .lock()
114 .map_err(|_| anyhow!("failed to acquire webhook failure store lock"))?;
115 load_failure_records_unlocked(path)
116}
117
/// Test helper: replaces the store at `path` with `records` while holding the store lock.
#[cfg(test)]
pub(super) fn write_failure_records(path: &Path, records: &[WebhookFailureRecord]) -> Result<()> {
    let Ok(_guard) = failure_store_lock().lock() else {
        return Err(anyhow!("failed to acquire webhook failure store lock"));
    };
    write_failure_records_unlocked(path, records)
}
125
126pub(super) fn update_replay_counts(path: &Path, replayed_ids: &[String]) -> Result<()> {
127 let replayed_set = replayed_ids
128 .iter()
129 .map(std::string::String::as_str)
130 .collect::<HashSet<_>>();
131
132 let _guard = failure_store_lock()
133 .lock()
134 .map_err(|_| anyhow!("failed to acquire webhook failure store lock"))?;
135 let mut records = load_failure_records_unlocked(path)?;
136 for record in &mut records {
137 if replayed_set.contains(record.id.as_str()) {
138 record.replay_count = record.replay_count.saturating_add(1);
139 }
140 }
141 write_failure_records_unlocked(path, &records)
142}
143
144fn load_failure_records_unlocked(path: &Path) -> Result<Vec<WebhookFailureRecord>> {
145 if !path.exists() {
146 return Ok(Vec::new());
147 }
148
149 let content = fs::read_to_string(path)
150 .with_context(|| format!("read webhook failure store {}", path.display()))?;
151 if content.trim().is_empty() {
152 return Ok(Vec::new());
153 }
154
155 serde_json::from_str::<Vec<WebhookFailureRecord>>(&content)
156 .with_context(|| format!("parse webhook failure store {}", path.display()))
157}
158
159fn write_failure_records_unlocked(path: &Path, records: &[WebhookFailureRecord]) -> Result<()> {
160 if let Some(parent) = path.parent() {
161 fs::create_dir_all(parent).with_context(|| {
162 format!(
163 "create webhook failure store directory {}",
164 parent.display()
165 )
166 })?;
167 }
168
169 let rendered = serde_json::to_string_pretty(records).context("serialize webhook failures")?;
170 fsutil::write_atomic(path, rendered.as_bytes())
171 .with_context(|| format!("write webhook failure store {}", path.display()))
172}
173
174fn resolve_repo_root_from_runtime(msg: &WebhookMessage) -> Option<PathBuf> {
175 if let Some(repo_root) = msg.payload.context.repo_root.as_deref() {
176 let repo_root = PathBuf::from(repo_root);
177 if repo_root.exists() {
178 return Some(crate::config::find_repo_root(&repo_root));
179 }
180 log::debug!(
181 "webhook payload repo_root does not exist; falling back to current directory: {}",
182 repo_root.display()
183 );
184 }
185
186 let cwd = std::env::current_dir().ok()?;
187 Some(crate::config::find_repo_root(&cwd))
188}
189
190fn next_failure_id() -> String {
191 let nanos = std::time::SystemTime::now()
192 .duration_since(std::time::UNIX_EPOCH)
193 .map(|duration| duration.as_nanos())
194 .unwrap_or(0);
195 let sequence = NEXT_FAILURE_SEQUENCE.fetch_add(1, Ordering::Relaxed);
196 format!("wf-{nanos}-{sequence}")
197}
198
199fn sanitize_error(err: &anyhow::Error, destination_url: Option<&str>) -> String {
200 let mut rendered = err.to_string();
201 if let Some(url) = destination_url {
202 rendered = rendered.replace(url, &super::super::worker::redact_webhook_destination(url));
203 }
204
205 let redacted = redaction::redact_text(&rendered);
206 let trimmed = redacted.trim();
207 if trimmed.chars().count() <= MAX_FAILURE_ERROR_CHARS {
208 return trimmed.to_string();
209 }
210
211 let truncated = trimmed
212 .chars()
213 .take(MAX_FAILURE_ERROR_CHARS)
214 .collect::<String>();
215 format!("{truncated}…")
216}