1use super::{
19 WebhookMessage, WebhookPayload, enqueue_webhook_payload_for_replay, resolve_webhook_config,
20};
21use crate::contracts::{WebhookConfig, WebhookQueuePolicy};
22use crate::fsutil;
23use crate::redaction;
24use anyhow::{Context, Result, anyhow, bail};
25use serde::{Deserialize, Serialize};
26use std::collections::HashSet;
27use std::fs;
28use std::path::{Path, PathBuf};
29use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
30use std::sync::{Mutex, OnceLock};
31
/// Location of the persisted webhook failure store, relative to the repository root.
const WEBHOOK_FAILURE_STORE_RELATIVE_PATH: &str = ".ralph/cache/webhooks/failures.json";
/// Maximum number of failure records kept in the store; the earliest-appended records are
/// dropped first when a new failure pushes the count past this bound.
const MAX_WEBHOOK_FAILURE_RECORDS: usize = 200;
/// Maximum number of characters kept from a sanitized error message; longer messages are
/// truncated and suffixed with `…`.
const MAX_FAILURE_ERROR_CHARS: usize = 400;
35
/// A single persisted webhook delivery failure, stored as one element of the JSON array in the
/// failure store file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WebhookFailureRecord {
    /// Record identifier, generated as `wf-<unix-nanos>-<sequence>`.
    pub id: String,
    /// Timestamp produced by `crate::timeutil::now_utc_rfc3339_or_fallback` when the failure
    /// was recorded.
    pub failed_at: String,
    /// Event name copied from the failed payload.
    pub event: String,
    /// Task id copied from the failed payload, if any; omitted from JSON when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub task_id: Option<String>,
    /// Destination URL after `redact_webhook_destination`; omitted from JSON when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub destination: Option<String>,
    /// Error message after URL redaction, text redaction, trimming and length limiting.
    pub error: String,
    /// Attempt count reported by the caller when the failure was recorded.
    pub attempts: u32,
    /// Number of times this record has been successfully re-enqueued by a replay.
    pub replay_count: u32,
    /// The original payload, retained so the delivery can be replayed.
    pub payload: WebhookPayload,
}
50
/// Point-in-time view of webhook queue metrics plus the most recent persisted failures.
#[derive(Debug, Clone, Serialize)]
pub struct WebhookDiagnostics {
    /// Current tracked queue depth (incremented on enqueue/requeue, decremented on dequeue).
    pub queue_depth: usize,
    /// Capacity recorded via `set_queue_capacity`, or the config-derived value if none was set.
    pub queue_capacity: usize,
    /// Queue policy from the config (default when unset).
    pub queue_policy: WebhookQueuePolicy,
    /// Total successful enqueues since process start (or last test reset).
    pub enqueued_total: u64,
    /// Total successful deliveries.
    pub delivered_total: u64,
    /// Total deliveries reported as failed.
    pub failed_total: u64,
    /// Total messages reported as dropped.
    pub dropped_total: u64,
    /// Total retry attempts reported.
    pub retry_attempts_total: u64,
    /// Display form of the failure store path that was read.
    pub failure_store_path: String,
    /// Persisted failure records, newest first, limited by the caller's `recent_limit`.
    pub recent_failures: Vec<WebhookFailureRecord>,
}
64
/// Filter and limits controlling which persisted failures `replay_failed_deliveries` targets.
///
/// At least one of `ids`, `event`, or `task_id` must be set; an unfiltered replay is refused.
#[derive(Debug, Clone)]
pub struct ReplaySelector {
    /// Failure record ids to match; an empty list applies no id filter.
    pub ids: Vec<String>,
    /// When set, only records whose `event` equals this value match.
    pub event: Option<String>,
    /// When set, only records whose `task_id` equals this value match.
    pub task_id: Option<String>,
    /// Maximum number of matching records to consider, newest first; `0` means no limit.
    pub limit: usize,
    /// Records whose `replay_count` is already at or above this value are not replayed.
    /// Must be greater than zero.
    pub max_replay_attempts: u32,
}
73
/// Summary of one failure record that matched a replay selector.
#[derive(Debug, Clone, Serialize)]
pub struct ReplayCandidate {
    /// Failure record id.
    pub id: String,
    /// Event name of the failed payload.
    pub event: String,
    /// Task id of the failed payload, if any; omitted from JSON when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub task_id: Option<String>,
    /// When the failure was recorded.
    pub failed_at: String,
    /// Attempt count stored with the failure.
    pub attempts: u32,
    /// Replay count at the time of selection.
    pub replay_count: u32,
    /// Stored (sanitized) error message.
    pub error: String,
    /// `true` when `replay_count` is below the selector's `max_replay_attempts`.
    pub eligible_for_replay: bool,
}
86
/// Outcome of a (possibly dry-run) replay of persisted webhook failures.
#[derive(Debug, Clone, Serialize)]
pub struct ReplayReport {
    /// Whether this was a dry run (nothing enqueued, store untouched).
    pub dry_run: bool,
    /// Number of records that matched the selector (after applying `limit`).
    pub matched_count: usize,
    /// Number of matched records below the replay-attempt cap.
    pub eligible_count: usize,
    /// Number of records successfully re-enqueued.
    pub replayed_count: usize,
    /// Number of matched records skipped because they reached the replay-attempt cap.
    pub skipped_max_replay_attempts: usize,
    /// Number of eligible records whose re-enqueue was rejected.
    pub skipped_enqueue_failures: usize,
    /// Per-record details for every matched record, newest first.
    pub candidates: Vec<ReplayCandidate>,
}
97
/// The subset of a matched failure record needed to perform the replay after the store lock
/// has been released.
#[derive(Debug, Clone)]
struct SelectedReplayRecord {
    /// Failure record id, used to bump its replay count after a successful enqueue.
    id: String,
    /// Replay count at selection time, checked against the replay-attempt cap.
    replay_count: u32,
    /// Payload to re-enqueue.
    payload: WebhookPayload,
}
104
/// In-process webhook delivery counters, updated by the `note_*` functions.
#[derive(Debug, Default)]
struct WebhookMetrics {
    /// Tracked queue depth; never decremented below zero.
    queue_depth: AtomicUsize,
    /// Capacity recorded by `set_queue_capacity`; `0` means "not recorded yet", in which case
    /// diagnostics fall back to the configured capacity.
    queue_capacity: AtomicUsize,
    /// Successful enqueues.
    enqueued_total: AtomicU64,
    /// Successful deliveries.
    delivered_total: AtomicU64,
    /// Failed deliveries.
    failed_total: AtomicU64,
    /// Dropped messages.
    dropped_total: AtomicU64,
    /// Retry attempts.
    retry_attempts_total: AtomicU64,
}
115
/// Process-wide webhook counters, lazily created by `metrics()`.
static METRICS: OnceLock<WebhookMetrics> = OnceLock::new();
/// Serializes read-modify-write access to the failure store file within this process.
static FAILURE_STORE_LOCK: OnceLock<Mutex<()>> = OnceLock::new();
/// Sequence counter appended to failure ids so ids generated with the same timestamp differ.
static NEXT_FAILURE_SEQUENCE: AtomicU64 = AtomicU64::new(1);
119
120fn metrics() -> &'static WebhookMetrics {
121 METRICS.get_or_init(WebhookMetrics::default)
122}
123
124fn failure_store_lock() -> &'static Mutex<()> {
125 FAILURE_STORE_LOCK.get_or_init(|| Mutex::new(()))
126}
127
128pub fn set_queue_capacity(capacity: usize) {
129 metrics().queue_capacity.store(capacity, Ordering::SeqCst);
130}
131
132pub fn note_queue_dequeue() {
133 let depth = &metrics().queue_depth;
134 let _ = depth.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
135 Some(current.saturating_sub(1))
136 });
137}
138
139pub fn note_enqueue_success() {
140 let state = metrics();
141 state.enqueued_total.fetch_add(1, Ordering::SeqCst);
142 state.queue_depth.fetch_add(1, Ordering::SeqCst);
143}
144
145pub fn note_retry_requeue() {
146 metrics().queue_depth.fetch_add(1, Ordering::SeqCst);
147}
148
149pub fn note_dropped_message() {
150 metrics().dropped_total.fetch_add(1, Ordering::SeqCst);
151}
152
153pub fn note_retry_attempt() {
154 metrics()
155 .retry_attempts_total
156 .fetch_add(1, Ordering::SeqCst);
157}
158
159pub fn note_delivery_success() {
160 metrics().delivered_total.fetch_add(1, Ordering::SeqCst);
161}
162
163pub fn note_delivery_failure(msg: &WebhookMessage, err: &anyhow::Error, attempts: u32) {
164 metrics().failed_total.fetch_add(1, Ordering::SeqCst);
165
166 if let Err(write_err) = persist_failed_delivery(msg, err, attempts) {
167 log::warn!("Failed to persist webhook failure record: {write_err:#}");
168 }
169}
170
171pub fn failure_store_path(repo_root: &Path) -> PathBuf {
172 repo_root.join(WEBHOOK_FAILURE_STORE_RELATIVE_PATH)
173}
174
175pub fn diagnostics_snapshot(
176 repo_root: &Path,
177 config: &WebhookConfig,
178 recent_limit: usize,
179) -> Result<WebhookDiagnostics> {
180 let path = failure_store_path(repo_root);
181 let records = load_failure_records(&path)?;
182 let limit = if recent_limit == 0 {
183 records.len()
184 } else {
185 recent_limit
186 };
187 let recent_failures = records.into_iter().rev().take(limit).collect::<Vec<_>>();
188
189 let state = metrics();
190 let configured_capacity = config
191 .queue_capacity
192 .map(|value| value.clamp(1, 10000) as usize)
193 .unwrap_or(500);
194 let queue_capacity = match state.queue_capacity.load(Ordering::SeqCst) {
195 0 => configured_capacity,
196 value => value,
197 };
198
199 Ok(WebhookDiagnostics {
200 queue_depth: state.queue_depth.load(Ordering::SeqCst),
201 queue_capacity,
202 queue_policy: config.queue_policy.unwrap_or_default(),
203 enqueued_total: state.enqueued_total.load(Ordering::SeqCst),
204 delivered_total: state.delivered_total.load(Ordering::SeqCst),
205 failed_total: state.failed_total.load(Ordering::SeqCst),
206 dropped_total: state.dropped_total.load(Ordering::SeqCst),
207 retry_attempts_total: state.retry_attempts_total.load(Ordering::SeqCst),
208 failure_store_path: path.display().to_string(),
209 recent_failures,
210 })
211}
212
213pub fn replay_failed_deliveries(
214 repo_root: &Path,
215 config: &WebhookConfig,
216 selector: &ReplaySelector,
217 dry_run: bool,
218) -> Result<ReplayReport> {
219 if selector.ids.is_empty() && selector.event.is_none() && selector.task_id.is_none() {
220 bail!(
221 "refusing unbounded replay (would redeliver all failures)\n\
222 this could overwhelm external systems or re-trigger side effects\n\
223 \n\
224 examples:\n\
225 \n\
226 ralph webhook replay --id <failure-id> # replay specific failure\n\
227 ralph webhook replay --event task.done # replay all task.done failures\n\
228 ralph webhook replay --task-id RQ-0001 # replay failures for a task\n\
229 ralph webhook replay --id a,b,c --limit 5 # replay up to 5 specific failures"
230 );
231 }
232
233 if selector.max_replay_attempts == 0 {
234 bail!("max_replay_attempts must be greater than 0");
235 }
236
237 if !dry_run {
238 let resolved = resolve_webhook_config(config);
239 if !resolved.enabled {
240 bail!("Webhook replay requires agent.webhook.enabled=true");
241 }
242 if resolved
243 .url
244 .as_deref()
245 .is_none_or(|url| url.trim().is_empty())
246 {
247 bail!("Webhook replay requires agent.webhook.url to be configured");
248 }
249 }
250
251 let path = failure_store_path(repo_root);
252
253 let limit = if selector.limit == 0 {
254 usize::MAX
255 } else {
256 selector.limit
257 };
258
259 let id_filter = selector
260 .ids
261 .iter()
262 .map(std::string::String::as_str)
263 .collect::<HashSet<_>>();
264
265 let (selected_records, candidates) = {
266 let _guard = failure_store_lock()
267 .lock()
268 .map_err(|_| anyhow!("failed to acquire webhook failure store lock"))?;
269 let records = load_failure_records_unlocked(&path)?;
270
271 let mut selected_records = Vec::new();
272 let mut candidates = Vec::new();
273
274 for record in records.iter().rev() {
275 if selected_records.len() >= limit {
276 break;
277 }
278 if !id_filter.is_empty() && !id_filter.contains(record.id.as_str()) {
279 continue;
280 }
281 if let Some(event_filter) = selector.event.as_deref()
282 && record.event != event_filter
283 {
284 continue;
285 }
286 if let Some(task_filter) = selector.task_id.as_deref()
287 && record.task_id.as_deref() != Some(task_filter)
288 {
289 continue;
290 }
291
292 let eligible = record.replay_count < selector.max_replay_attempts;
293 candidates.push(ReplayCandidate {
294 id: record.id.clone(),
295 event: record.event.clone(),
296 task_id: record.task_id.clone(),
297 failed_at: record.failed_at.clone(),
298 attempts: record.attempts,
299 replay_count: record.replay_count,
300 error: record.error.clone(),
301 eligible_for_replay: eligible,
302 });
303 selected_records.push(SelectedReplayRecord {
304 id: record.id.clone(),
305 replay_count: record.replay_count,
306 payload: record.payload.clone(),
307 });
308 }
309
310 (selected_records, candidates)
311 };
312
313 let matched_count = candidates.len();
314 let eligible_count = candidates
315 .iter()
316 .filter(|candidate| candidate.eligible_for_replay)
317 .count();
318
319 if dry_run {
320 return Ok(ReplayReport {
321 dry_run,
322 matched_count,
323 eligible_count,
324 replayed_count: 0,
325 skipped_max_replay_attempts: matched_count.saturating_sub(eligible_count),
326 skipped_enqueue_failures: 0,
327 candidates,
328 });
329 }
330
331 let mut replayed_count = 0usize;
332 let mut skipped_max_replay_attempts = 0usize;
333 let mut skipped_enqueue_failures = 0usize;
334 let mut replayed_ids = Vec::new();
335
336 for record in selected_records {
337 if record.replay_count >= selector.max_replay_attempts {
338 skipped_max_replay_attempts += 1;
339 continue;
340 }
341
342 if enqueue_webhook_payload_for_replay(record.payload, config) {
343 replayed_ids.push(record.id);
344 replayed_count += 1;
345 } else {
346 skipped_enqueue_failures += 1;
347 }
348 }
349
350 if !replayed_ids.is_empty() {
351 update_replay_counts(&path, &replayed_ids)?;
352 }
353
354 Ok(ReplayReport {
355 dry_run,
356 matched_count,
357 eligible_count,
358 replayed_count,
359 skipped_max_replay_attempts,
360 skipped_enqueue_failures,
361 candidates,
362 })
363}
364
365fn update_replay_counts(path: &Path, replayed_ids: &[String]) -> Result<()> {
366 let replayed_set = replayed_ids
367 .iter()
368 .map(std::string::String::as_str)
369 .collect::<HashSet<_>>();
370
371 let _guard = failure_store_lock()
372 .lock()
373 .map_err(|_| anyhow!("failed to acquire webhook failure store lock"))?;
374 let mut records = load_failure_records_unlocked(path)?;
375 for record in &mut records {
376 if replayed_set.contains(record.id.as_str()) {
377 record.replay_count = record.replay_count.saturating_add(1);
378 }
379 }
380 write_failure_records_unlocked(path, &records)
381}
382
383fn persist_failed_delivery(msg: &WebhookMessage, err: &anyhow::Error, attempts: u32) -> Result<()> {
384 let repo_root = match resolve_repo_root_from_runtime(msg) {
385 Some(path) => path,
386 None => {
387 log::debug!("Unable to resolve repo root for webhook failure persistence");
388 return Ok(());
389 }
390 };
391
392 let path = failure_store_path(&repo_root);
393 persist_failed_delivery_at_path(&path, msg, err, attempts)
394}
395
396fn persist_failed_delivery_at_path(
397 path: &Path,
398 msg: &WebhookMessage,
399 err: &anyhow::Error,
400 attempts: u32,
401) -> Result<()> {
402 let _guard = failure_store_lock()
403 .lock()
404 .map_err(|_| anyhow!("failed to acquire webhook failure store lock"))?;
405
406 let mut records = load_failure_records_unlocked(path)?;
407 records.push(WebhookFailureRecord {
408 id: next_failure_id(),
409 failed_at: crate::timeutil::now_utc_rfc3339_or_fallback(),
410 event: msg.payload.event.clone(),
411 task_id: msg.payload.task_id.clone(),
412 destination: msg
413 .config
414 .url
415 .as_deref()
416 .map(super::worker::redact_webhook_destination),
417 error: sanitize_error(err, msg.config.url.as_deref()),
418 attempts,
419 replay_count: 0,
420 payload: msg.payload.clone(),
421 });
422
423 if records.len() > MAX_WEBHOOK_FAILURE_RECORDS {
424 let retain_from = records.len().saturating_sub(MAX_WEBHOOK_FAILURE_RECORDS);
425 records.drain(..retain_from);
426 }
427
428 write_failure_records_unlocked(path, &records)
429}
430
431fn load_failure_records(path: &Path) -> Result<Vec<WebhookFailureRecord>> {
432 let _guard = failure_store_lock()
433 .lock()
434 .map_err(|_| anyhow!("failed to acquire webhook failure store lock"))?;
435 load_failure_records_unlocked(path)
436}
437
438fn load_failure_records_unlocked(path: &Path) -> Result<Vec<WebhookFailureRecord>> {
439 if !path.exists() {
440 return Ok(Vec::new());
441 }
442
443 let content = fs::read_to_string(path)
444 .with_context(|| format!("read webhook failure store {}", path.display()))?;
445 if content.trim().is_empty() {
446 return Ok(Vec::new());
447 }
448
449 serde_json::from_str::<Vec<WebhookFailureRecord>>(&content)
450 .with_context(|| format!("parse webhook failure store {}", path.display()))
451}
452
453fn write_failure_records_unlocked(path: &Path, records: &[WebhookFailureRecord]) -> Result<()> {
454 if let Some(parent) = path.parent() {
455 fs::create_dir_all(parent).with_context(|| {
456 format!(
457 "create webhook failure store directory {}",
458 parent.display()
459 )
460 })?;
461 }
462
463 let rendered = serde_json::to_string_pretty(records).context("serialize webhook failures")?;
464 fsutil::write_atomic(path, rendered.as_bytes())
465 .with_context(|| format!("write webhook failure store {}", path.display()))
466}
467
468fn resolve_repo_root_from_runtime(msg: &WebhookMessage) -> Option<PathBuf> {
469 if let Some(repo_root) = msg.payload.context.repo_root.as_deref() {
470 let repo_root = PathBuf::from(repo_root);
471 if repo_root.exists() {
472 return Some(crate::config::find_repo_root(&repo_root));
473 }
474 log::debug!(
475 "webhook payload repo_root does not exist; falling back to current directory: {}",
476 repo_root.display()
477 );
478 }
479
480 let cwd = std::env::current_dir().ok()?;
481 Some(crate::config::find_repo_root(&cwd))
482}
483
484fn next_failure_id() -> String {
485 let nanos = std::time::SystemTime::now()
486 .duration_since(std::time::UNIX_EPOCH)
487 .map(|duration| duration.as_nanos())
488 .unwrap_or(0);
489 let sequence = NEXT_FAILURE_SEQUENCE.fetch_add(1, Ordering::Relaxed);
490 format!("wf-{nanos}-{sequence}")
491}
492
493fn sanitize_error(err: &anyhow::Error, destination_url: Option<&str>) -> String {
494 let mut rendered = err.to_string();
495 if let Some(url) = destination_url {
496 rendered = rendered.replace(url, &super::worker::redact_webhook_destination(url));
497 }
498
499 let redacted = redaction::redact_text(&rendered);
500 let trimmed = redacted.trim();
501 if trimmed.chars().count() <= MAX_FAILURE_ERROR_CHARS {
502 return trimmed.to_string();
503 }
504
505 let truncated = trimmed
506 .chars()
507 .take(MAX_FAILURE_ERROR_CHARS)
508 .collect::<String>();
509 format!("{truncated}…")
510}
511
/// Test helper: overwrites the failure store under `repo_root` with `records`, holding the
/// store lock for the write.
#[cfg(test)]
pub(crate) fn write_failure_records_for_tests(
    repo_root: &Path,
    records: &[WebhookFailureRecord],
) -> Result<()> {
    let store = failure_store_path(repo_root);
    let lock = failure_store_lock().lock();
    let _guard = lock.map_err(|_| anyhow!("failed to acquire webhook failure store lock"))?;
    write_failure_records_unlocked(&store, records)
}
523
/// Test helper: reads every failure record stored under `repo_root`.
#[cfg(test)]
pub(crate) fn load_failure_records_for_tests(
    repo_root: &Path,
) -> Result<Vec<WebhookFailureRecord>> {
    load_failure_records(&failure_store_path(repo_root))
}
531
/// Test helper: records a delivery failure in the store under an explicit `repo_root`,
/// bypassing runtime repo-root resolution.
#[cfg(test)]
pub(crate) fn persist_failed_delivery_for_tests(
    repo_root: &Path,
    msg: &WebhookMessage,
    err: &anyhow::Error,
    attempts: u32,
) -> Result<()> {
    persist_failed_delivery_at_path(&failure_store_path(repo_root), msg, err, attempts)
}
542
/// Test helper: zeroes every webhook counter, including the recorded queue capacity.
#[cfg(test)]
pub(crate) fn reset_webhook_metrics_for_tests() {
    let counters = metrics();
    counters.queue_depth.store(0, Ordering::SeqCst);
    counters.queue_capacity.store(0, Ordering::SeqCst);
    for total in [
        &counters.enqueued_total,
        &counters.delivered_total,
        &counters.failed_total,
        &counters.dropped_total,
        &counters.retry_attempts_total,
    ] {
        total.store(0, Ordering::SeqCst);
    }
}