ralph/webhook/diagnostics/
failure_store.rs1use super::super::{WebhookMessage, WebhookPayload};
21use crate::{fsutil, redaction};
22use anyhow::{Context, Result, anyhow};
23use serde::{Deserialize, Serialize};
24use std::collections::HashSet;
25use std::fs;
26use std::path::{Path, PathBuf};
27use std::sync::atomic::{AtomicU64, Ordering};
28use std::sync::{Mutex, OnceLock};
29use std::time::{Duration, Instant};
30
/// Location of the persisted webhook failure store, relative to the repository root.
const WEBHOOK_FAILURE_STORE_RELATIVE_PATH: &str = ".ralph/cache/webhooks/failures.json";
/// Upper bound on stored records; the oldest entries are dropped once it is exceeded.
const MAX_WEBHOOK_FAILURE_RECORDS: usize = 200;
/// Maximum number of characters kept from a sanitized error message before truncation.
const MAX_FAILURE_ERROR_CHARS: usize = 400;
/// Label handed to the directory-lock helper when locking the store.
const FAILURE_STORE_LOCK_LABEL: &str = "webhook failure store";
/// How long acquisition of the on-disk lock keeps being retried before giving up.
const FAILURE_STORE_LOCK_WAIT_TIMEOUT: Duration = Duration::from_secs(5);
/// Sleep interval between two attempts to acquire the on-disk lock.
const FAILURE_STORE_LOCK_WAIT_SLICE: Duration = Duration::from_millis(25);
37
38#[derive(Debug, Clone, Serialize, Deserialize)]
39pub struct WebhookFailureRecord {
40 pub id: String,
41 pub failed_at: String,
42 pub event: String,
43 #[serde(skip_serializing_if = "Option::is_none")]
44 pub task_id: Option<String>,
45 #[serde(skip_serializing_if = "Option::is_none")]
46 pub destination: Option<String>,
47 pub error: String,
48 pub attempts: u32,
49 pub replay_count: u32,
50 pub payload: WebhookPayload,
51}
52
/// Lazily created mutex that serializes failure store access between threads of this process.
static FAILURE_STORE_LOCK: OnceLock<Mutex<()>> = OnceLock::new();
/// Counter appended to generated failure ids; starts at 1.
static NEXT_FAILURE_SEQUENCE: AtomicU64 = AtomicU64::new(1);

/// Returns the process-wide failure store mutex, initializing it on first use.
fn failure_store_lock() -> &'static Mutex<()> {
    FAILURE_STORE_LOCK.get_or_init(Mutex::default)
}
59
60pub fn failure_store_path(repo_root: &Path) -> PathBuf {
61 repo_root.join(WEBHOOK_FAILURE_STORE_RELATIVE_PATH)
62}
63
/// Derives the lock directory for a store file by swapping its extension for `lock`.
fn failure_store_lock_dir(path: &Path) -> PathBuf {
    let mut lock_dir = path.to_path_buf();
    lock_dir.set_extension("lock");
    lock_dir
}
67
68pub(super) fn persist_failed_delivery(
69 msg: &WebhookMessage,
70 err: &anyhow::Error,
71 attempts: u32,
72) -> Result<()> {
73 let repo_root = match resolve_repo_root_from_runtime(msg) {
74 Some(path) => path,
75 None => {
76 log::debug!("Unable to resolve repo root for webhook failure persistence");
77 return Ok(());
78 }
79 };
80
81 let path = failure_store_path(&repo_root);
82 persist_failed_delivery_at_path(&path, msg, err, attempts)
83}
84
85pub(super) fn persist_failed_delivery_at_path(
86 path: &Path,
87 msg: &WebhookMessage,
88 err: &anyhow::Error,
89 attempts: u32,
90) -> Result<()> {
91 let _guard = failure_store_lock()
92 .lock()
93 .map_err(|_| anyhow!("failed to acquire webhook failure store lock"))?;
94 let _file_lock = acquire_failure_store_lock(path)?;
95
96 let mut records = load_failure_records_unlocked(path)?;
97 maybe_pause_failure_store_mutation_for_tests();
98 records.push(WebhookFailureRecord {
99 id: next_failure_id(),
100 failed_at: crate::timeutil::now_utc_rfc3339_or_fallback(),
101 event: msg.payload.event.clone(),
102 task_id: msg.payload.task_id.clone(),
103 destination: msg
104 .config
105 .url
106 .as_deref()
107 .map(super::super::worker::redact_webhook_destination),
108 error: sanitize_error(err, msg.config.url.as_deref()),
109 attempts,
110 replay_count: 0,
111 payload: msg.payload.clone(),
112 });
113
114 if records.len() > MAX_WEBHOOK_FAILURE_RECORDS {
115 let retain_from = records.len().saturating_sub(MAX_WEBHOOK_FAILURE_RECORDS);
116 records.drain(..retain_from);
117 }
118
119 write_failure_records_unlocked(path, &records)
120}
121
122pub(super) fn load_failure_records(path: &Path) -> Result<Vec<WebhookFailureRecord>> {
123 let _guard = failure_store_lock()
124 .lock()
125 .map_err(|_| anyhow!("failed to acquire webhook failure store lock"))?;
126 load_failure_records_unlocked(path)
127}
128
/// Test-only helper that replaces the contents of the store at `path` with `records`.
///
/// Takes the in-process mutex and the on-disk lock before writing.
///
/// # Errors
///
/// Fails when either lock cannot be acquired or the store cannot be written.
#[cfg(test)]
pub(super) fn write_failure_records(path: &Path, records: &[WebhookFailureRecord]) -> Result<()> {
    let Ok(_guard) = failure_store_lock().lock() else {
        return Err(anyhow!("failed to acquire webhook failure store lock"));
    };
    let _file_lock = acquire_failure_store_lock(path)?;
    write_failure_records_unlocked(path, records)
}
137
138pub(super) fn update_replay_counts(path: &Path, replayed_ids: &[String]) -> Result<()> {
139 let replayed_set = replayed_ids
140 .iter()
141 .map(std::string::String::as_str)
142 .collect::<HashSet<_>>();
143
144 let _guard = failure_store_lock()
145 .lock()
146 .map_err(|_| anyhow!("failed to acquire webhook failure store lock"))?;
147 let _file_lock = acquire_failure_store_lock(path)?;
148 let mut records = load_failure_records_unlocked(path)?;
149 maybe_pause_failure_store_mutation_for_tests();
150 for record in &mut records {
151 if replayed_set.contains(record.id.as_str()) {
152 record.replay_count = record.replay_count.saturating_add(1);
153 }
154 }
155 write_failure_records_unlocked(path, &records)
156}
157
158fn load_failure_records_unlocked(path: &Path) -> Result<Vec<WebhookFailureRecord>> {
159 if !path.exists() {
160 return Ok(Vec::new());
161 }
162
163 let content = fs::read_to_string(path)
164 .with_context(|| format!("read webhook failure store {}", path.display()))?;
165 if content.trim().is_empty() {
166 return Ok(Vec::new());
167 }
168
169 serde_json::from_str::<Vec<WebhookFailureRecord>>(&content)
170 .with_context(|| format!("parse webhook failure store {}", path.display()))
171}
172
173fn write_failure_records_unlocked(path: &Path, records: &[WebhookFailureRecord]) -> Result<()> {
174 if let Some(parent) = path.parent() {
175 fs::create_dir_all(parent).with_context(|| {
176 format!(
177 "create webhook failure store directory {}",
178 parent.display()
179 )
180 })?;
181 }
182
183 let rendered = serde_json::to_string_pretty(records).context("serialize webhook failures")?;
184 fsutil::write_atomic(path, rendered.as_bytes())
185 .with_context(|| format!("write webhook failure store {}", path.display()))
186}
187
188fn acquire_failure_store_lock(path: &Path) -> Result<crate::lock::DirLock> {
189 let lock_dir = failure_store_lock_dir(path);
190 let deadline = Instant::now() + FAILURE_STORE_LOCK_WAIT_TIMEOUT;
191
192 loop {
193 match crate::lock::acquire_dir_lock(&lock_dir, FAILURE_STORE_LOCK_LABEL, true) {
194 Ok(lock) => return Ok(lock),
195 Err(error) if Instant::now() < deadline => {
196 log::debug!(
197 "waiting for webhook failure store lock {}: {}",
198 lock_dir.display(),
199 error
200 );
201 std::thread::sleep(FAILURE_STORE_LOCK_WAIT_SLICE);
202 }
203 Err(error) => {
204 return Err(error).with_context(|| {
205 format!(
206 "acquire webhook failure store lock {} within {}ms",
207 lock_dir.display(),
208 FAILURE_STORE_LOCK_WAIT_TIMEOUT.as_millis()
209 )
210 });
211 }
212 }
213 }
214}
215
216fn resolve_repo_root_from_runtime(msg: &WebhookMessage) -> Option<PathBuf> {
217 if let Some(repo_root) = msg.payload.context.repo_root.as_deref() {
218 let repo_root = PathBuf::from(repo_root);
219 if repo_root.exists() {
220 return Some(crate::config::find_repo_root(&repo_root));
221 }
222 log::debug!(
223 "webhook payload repo_root does not exist; falling back to current directory: {}",
224 repo_root.display()
225 );
226 }
227
228 let cwd = std::env::current_dir().ok()?;
229 Some(crate::config::find_repo_root(&cwd))
230}
231
232fn next_failure_id() -> String {
233 let nanos = std::time::SystemTime::now()
234 .duration_since(std::time::UNIX_EPOCH)
235 .map(|duration| duration.as_nanos())
236 .unwrap_or(0);
237 let sequence = NEXT_FAILURE_SEQUENCE.fetch_add(1, Ordering::Relaxed);
238 format!("wf-{nanos}-{sequence}")
239}
240
241fn sanitize_error(err: &anyhow::Error, destination_url: Option<&str>) -> String {
242 let mut rendered = err.to_string();
243 if let Some(url) = destination_url {
244 rendered = rendered.replace(url, &super::super::worker::redact_webhook_destination(url));
245 }
246
247 let redacted = redaction::redact_text(&rendered);
248 let trimmed = redacted.trim();
249 if trimmed.chars().count() <= MAX_FAILURE_ERROR_CHARS {
250 return trimmed.to_string();
251 }
252
253 let truncated = trimmed
254 .chars()
255 .take(MAX_FAILURE_ERROR_CHARS)
256 .collect::<String>();
257 format!("{truncated}…")
258}
259
/// Test hook: sleeps for the number of milliseconds given in
/// `RALPH_TEST_WEBHOOK_FAILURE_STORE_DELAY_MS`.
///
/// Does nothing when the variable is unset, is not a valid `u64`, or is zero.
#[cfg(test)]
fn maybe_pause_failure_store_mutation_for_tests() {
    let Ok(raw) = std::env::var("RALPH_TEST_WEBHOOK_FAILURE_STORE_DELAY_MS") else {
        return;
    };

    match raw.parse::<u64>() {
        Ok(millis) if millis > 0 => std::thread::sleep(Duration::from_millis(millis)),
        _ => {}
    }
}

/// Outside test builds the hook is a no-op.
#[cfg(not(test))]
fn maybe_pause_failure_store_mutation_for_tests() {}