1use super::{
19 WebhookMessage, WebhookPayload, enqueue_webhook_payload_for_replay, resolve_webhook_config,
20};
21use crate::contracts::{WebhookConfig, WebhookQueuePolicy};
22use crate::fsutil;
23use crate::redaction;
24use anyhow::{Context, Result, anyhow, bail};
25use serde::{Deserialize, Serialize};
26use std::collections::HashSet;
27use std::fs;
28use std::path::{Path, PathBuf};
29use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
30use std::sync::{Mutex, OnceLock};
31
/// Location of the webhook failure store, relative to the repository root.
const WEBHOOK_FAILURE_STORE_RELATIVE_PATH: &str = ".ralph/cache/webhooks/failures.json";
/// Upper bound on persisted failure records; when a new record pushes the
/// store past this size, the oldest entries are dropped.
const MAX_WEBHOOK_FAILURE_RECORDS: usize = 200;
/// Maximum number of characters kept from a sanitized error message before it
/// is truncated and suffixed with an ellipsis.
const MAX_FAILURE_ERROR_CHARS: usize = 400;
35
/// One failed webhook delivery as stored in the failure store, including the
/// payload needed to replay it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WebhookFailureRecord {
    /// Record identifier; records created by this module use the form
    /// `wf-{nanos}-{sequence}`.
    pub id: String,
    /// Timestamp string captured when the failure was persisted.
    pub failed_at: String,
    /// Event name copied from the failed payload.
    pub event: String,
    /// Task id copied from the failed payload, omitted from JSON when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub task_id: Option<String>,
    /// Redacted, trimmed, and length-capped error text.
    pub error: String,
    /// Delivery attempt count reported by the caller that recorded the failure.
    pub attempts: u32,
    /// Number of times this record has been re-enqueued by a replay; starts at 0.
    pub replay_count: u32,
    /// Copy of the payload that failed to deliver.
    pub payload: WebhookPayload,
}
48
/// Serializable snapshot of the in-process webhook counters together with the
/// most recent persisted failures.
#[derive(Debug, Clone, Serialize)]
pub struct WebhookDiagnostics {
    /// Current value of the queue depth gauge.
    pub queue_depth: usize,
    /// Capacity recorded at runtime, or a config-derived fallback when the
    /// recorded value is 0.
    pub queue_capacity: usize,
    /// Queue policy from the config, or its default when unset.
    pub queue_policy: WebhookQueuePolicy,
    /// Count of successful enqueues.
    pub enqueued_total: u64,
    /// Count of successful deliveries.
    pub delivered_total: u64,
    /// Count of failed deliveries.
    pub failed_total: u64,
    /// Count of dropped messages.
    pub dropped_total: u64,
    /// Count of retry attempts.
    pub retry_attempts_total: u64,
    /// Display form of the failure store path.
    pub failure_store_path: String,
    /// Persisted failures, most recently stored first.
    pub recent_failures: Vec<WebhookFailureRecord>,
}
62
/// Filters and limits that choose which failure records a replay considers.
///
/// A record must pass every filter that is set. At least one of `ids`,
/// `event`, or `task_id` must be set for a replay to be accepted.
#[derive(Debug, Clone)]
pub struct ReplaySelector {
    /// Record ids to match; an empty list disables the id filter.
    pub ids: Vec<String>,
    /// Exact event name to match, if any.
    pub event: Option<String>,
    /// Exact task id to match, if any.
    pub task_id: Option<String>,
    /// Maximum number of matching records to select; `0` means no limit.
    pub limit: usize,
    /// Records whose `replay_count` has reached this value are not replayed.
    /// Must be greater than 0.
    pub max_replay_attempts: u32,
}
71
/// Summary of one failure record matched by a replay selector; it mirrors the
/// record's metadata without the payload.
#[derive(Debug, Clone, Serialize)]
pub struct ReplayCandidate {
    /// Id of the matched failure record.
    pub id: String,
    /// Event name of the matched record.
    pub event: String,
    /// Task id of the matched record, omitted from JSON when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub task_id: Option<String>,
    /// Timestamp string stored on the record.
    pub failed_at: String,
    /// Delivery attempt count stored on the record.
    pub attempts: u32,
    /// Replay count as read from the store at selection time.
    pub replay_count: u32,
    /// Sanitized error text stored on the record.
    pub error: String,
    /// True when `replay_count` is below the selector's `max_replay_attempts`.
    pub eligible_for_replay: bool,
}
84
/// Outcome of a replay request, for both dry runs and real replays.
#[derive(Debug, Clone, Serialize)]
pub struct ReplayReport {
    /// Whether the request was a dry run (nothing enqueued, store untouched).
    pub dry_run: bool,
    /// Number of records selected by the filters, after applying the limit.
    pub matched_count: usize,
    /// Number of selected records still below the replay attempt cap.
    pub eligible_count: usize,
    /// Number of records successfully re-enqueued; always 0 for a dry run.
    pub replayed_count: usize,
    /// Number of selected records skipped because they reached the replay cap.
    pub skipped_max_replay_attempts: usize,
    /// Number of eligible records whose enqueue was rejected; always 0 for a
    /// dry run.
    pub skipped_enqueue_failures: usize,
    /// Per-record summaries, in selection order (most recently stored first).
    pub candidates: Vec<ReplayCandidate>,
}
95
/// Owned copy of the parts of a failure record that the replay loop needs
/// after the failure store lock has been released.
#[derive(Debug, Clone)]
struct SelectedReplayRecord {
    /// Id used to bump the record's replay count after a successful enqueue.
    id: String,
    /// Replay count as read from the store at selection time.
    replay_count: u32,
    /// Payload to hand back to the delivery queue.
    payload: WebhookPayload,
}
102
/// Atomic webhook counters shared by the whole process; all start at zero.
#[derive(Debug, Default)]
struct WebhookMetrics {
    /// Gauge: incremented on enqueue, decremented (saturating at 0) on dequeue.
    queue_depth: AtomicUsize,
    /// Last capacity recorded via `set_queue_capacity`; 0 until then.
    queue_capacity: AtomicUsize,
    /// Number of successful enqueues.
    enqueued_total: AtomicU64,
    /// Number of successful deliveries.
    delivered_total: AtomicU64,
    /// Number of failed deliveries.
    failed_total: AtomicU64,
    /// Number of dropped messages.
    dropped_total: AtomicU64,
    /// Number of retry attempts.
    retry_attempts_total: AtomicU64,
}
113
/// Process-wide webhook counters, lazily created on first access.
static METRICS: OnceLock<WebhookMetrics> = OnceLock::new();
/// In-process mutex held while the failure store file is read or rewritten.
static FAILURE_STORE_LOCK: OnceLock<Mutex<()>> = OnceLock::new();
/// Counter appended to generated failure ids; starts at 1.
static NEXT_FAILURE_SEQUENCE: AtomicU64 = AtomicU64::new(1);
117
118fn metrics() -> &'static WebhookMetrics {
119 METRICS.get_or_init(WebhookMetrics::default)
120}
121
122fn failure_store_lock() -> &'static Mutex<()> {
123 FAILURE_STORE_LOCK.get_or_init(|| Mutex::new(()))
124}
125
126pub fn set_queue_capacity(capacity: usize) {
127 metrics().queue_capacity.store(capacity, Ordering::SeqCst);
128}
129
130pub fn note_queue_dequeue() {
131 let depth = &metrics().queue_depth;
132 let _ = depth.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
133 Some(current.saturating_sub(1))
134 });
135}
136
137pub fn note_enqueue_success() {
138 let state = metrics();
139 state.enqueued_total.fetch_add(1, Ordering::SeqCst);
140 state.queue_depth.fetch_add(1, Ordering::SeqCst);
141}
142
143pub fn note_dropped_message() {
144 metrics().dropped_total.fetch_add(1, Ordering::SeqCst);
145}
146
147pub fn note_retry_attempt() {
148 metrics()
149 .retry_attempts_total
150 .fetch_add(1, Ordering::SeqCst);
151}
152
153pub fn note_delivery_success() {
154 metrics().delivered_total.fetch_add(1, Ordering::SeqCst);
155}
156
157pub fn note_delivery_failure(msg: &WebhookMessage, err: &anyhow::Error, attempts: u32) {
158 metrics().failed_total.fetch_add(1, Ordering::SeqCst);
159
160 if let Err(write_err) = persist_failed_delivery(msg, err, attempts) {
161 log::warn!("Failed to persist webhook failure record: {write_err:#}");
162 }
163}
164
165pub fn failure_store_path(repo_root: &Path) -> PathBuf {
166 repo_root.join(WEBHOOK_FAILURE_STORE_RELATIVE_PATH)
167}
168
169pub fn diagnostics_snapshot(
170 repo_root: &Path,
171 config: &WebhookConfig,
172 recent_limit: usize,
173) -> Result<WebhookDiagnostics> {
174 let path = failure_store_path(repo_root);
175 let records = load_failure_records(&path)?;
176 let limit = if recent_limit == 0 {
177 records.len()
178 } else {
179 recent_limit
180 };
181 let recent_failures = records.into_iter().rev().take(limit).collect::<Vec<_>>();
182
183 let state = metrics();
184 let configured_capacity = config
185 .queue_capacity
186 .map(|value| value.clamp(1, 10000) as usize)
187 .unwrap_or(500);
188 let queue_capacity = match state.queue_capacity.load(Ordering::SeqCst) {
189 0 => configured_capacity,
190 value => value,
191 };
192
193 Ok(WebhookDiagnostics {
194 queue_depth: state.queue_depth.load(Ordering::SeqCst),
195 queue_capacity,
196 queue_policy: config.queue_policy.unwrap_or_default(),
197 enqueued_total: state.enqueued_total.load(Ordering::SeqCst),
198 delivered_total: state.delivered_total.load(Ordering::SeqCst),
199 failed_total: state.failed_total.load(Ordering::SeqCst),
200 dropped_total: state.dropped_total.load(Ordering::SeqCst),
201 retry_attempts_total: state.retry_attempts_total.load(Ordering::SeqCst),
202 failure_store_path: path.display().to_string(),
203 recent_failures,
204 })
205}
206
207pub fn replay_failed_deliveries(
208 repo_root: &Path,
209 config: &WebhookConfig,
210 selector: &ReplaySelector,
211 dry_run: bool,
212) -> Result<ReplayReport> {
213 if selector.ids.is_empty() && selector.event.is_none() && selector.task_id.is_none() {
214 bail!(
215 "refusing unbounded replay (would redeliver all failures)\n\
216 this could overwhelm external systems or re-trigger side effects\n\
217 \n\
218 examples:\n\
219 \n\
220 ralph webhook replay --id <failure-id> # replay specific failure\n\
221 ralph webhook replay --event task.done # replay all task.done failures\n\
222 ralph webhook replay --task-id RQ-0001 # replay failures for a task\n\
223 ralph webhook replay --id a,b,c --limit 5 # replay up to 5 specific failures"
224 );
225 }
226
227 if selector.max_replay_attempts == 0 {
228 bail!("max_replay_attempts must be greater than 0");
229 }
230
231 if !dry_run {
232 let resolved = resolve_webhook_config(config);
233 if !resolved.enabled {
234 bail!("Webhook replay requires agent.webhook.enabled=true");
235 }
236 if resolved
237 .url
238 .as_deref()
239 .is_none_or(|url| url.trim().is_empty())
240 {
241 bail!("Webhook replay requires agent.webhook.url to be configured");
242 }
243 }
244
245 let path = failure_store_path(repo_root);
246
247 let limit = if selector.limit == 0 {
248 usize::MAX
249 } else {
250 selector.limit
251 };
252
253 let id_filter = selector
254 .ids
255 .iter()
256 .map(std::string::String::as_str)
257 .collect::<HashSet<_>>();
258
259 let (selected_records, candidates) = {
260 let _guard = failure_store_lock()
261 .lock()
262 .map_err(|_| anyhow!("failed to acquire webhook failure store lock"))?;
263 let records = load_failure_records_unlocked(&path)?;
264
265 let mut selected_records = Vec::new();
266 let mut candidates = Vec::new();
267
268 for record in records.iter().rev() {
269 if selected_records.len() >= limit {
270 break;
271 }
272 if !id_filter.is_empty() && !id_filter.contains(record.id.as_str()) {
273 continue;
274 }
275 if let Some(event_filter) = selector.event.as_deref()
276 && record.event != event_filter
277 {
278 continue;
279 }
280 if let Some(task_filter) = selector.task_id.as_deref()
281 && record.task_id.as_deref() != Some(task_filter)
282 {
283 continue;
284 }
285
286 let eligible = record.replay_count < selector.max_replay_attempts;
287 candidates.push(ReplayCandidate {
288 id: record.id.clone(),
289 event: record.event.clone(),
290 task_id: record.task_id.clone(),
291 failed_at: record.failed_at.clone(),
292 attempts: record.attempts,
293 replay_count: record.replay_count,
294 error: record.error.clone(),
295 eligible_for_replay: eligible,
296 });
297 selected_records.push(SelectedReplayRecord {
298 id: record.id.clone(),
299 replay_count: record.replay_count,
300 payload: record.payload.clone(),
301 });
302 }
303
304 (selected_records, candidates)
305 };
306
307 let matched_count = candidates.len();
308 let eligible_count = candidates
309 .iter()
310 .filter(|candidate| candidate.eligible_for_replay)
311 .count();
312
313 if dry_run {
314 return Ok(ReplayReport {
315 dry_run,
316 matched_count,
317 eligible_count,
318 replayed_count: 0,
319 skipped_max_replay_attempts: matched_count.saturating_sub(eligible_count),
320 skipped_enqueue_failures: 0,
321 candidates,
322 });
323 }
324
325 let mut replayed_count = 0usize;
326 let mut skipped_max_replay_attempts = 0usize;
327 let mut skipped_enqueue_failures = 0usize;
328 let mut replayed_ids = Vec::new();
329
330 for record in selected_records {
331 if record.replay_count >= selector.max_replay_attempts {
332 skipped_max_replay_attempts += 1;
333 continue;
334 }
335
336 if enqueue_webhook_payload_for_replay(record.payload, config) {
337 replayed_ids.push(record.id);
338 replayed_count += 1;
339 } else {
340 skipped_enqueue_failures += 1;
341 }
342 }
343
344 if !replayed_ids.is_empty() {
345 update_replay_counts(&path, &replayed_ids)?;
346 }
347
348 Ok(ReplayReport {
349 dry_run,
350 matched_count,
351 eligible_count,
352 replayed_count,
353 skipped_max_replay_attempts,
354 skipped_enqueue_failures,
355 candidates,
356 })
357}
358
359fn update_replay_counts(path: &Path, replayed_ids: &[String]) -> Result<()> {
360 let replayed_set = replayed_ids
361 .iter()
362 .map(std::string::String::as_str)
363 .collect::<HashSet<_>>();
364
365 let _guard = failure_store_lock()
366 .lock()
367 .map_err(|_| anyhow!("failed to acquire webhook failure store lock"))?;
368 let mut records = load_failure_records_unlocked(path)?;
369 for record in &mut records {
370 if replayed_set.contains(record.id.as_str()) {
371 record.replay_count = record.replay_count.saturating_add(1);
372 }
373 }
374 write_failure_records_unlocked(path, &records)
375}
376
377fn persist_failed_delivery(msg: &WebhookMessage, err: &anyhow::Error, attempts: u32) -> Result<()> {
378 let repo_root = match resolve_repo_root_from_runtime(msg) {
379 Some(path) => path,
380 None => {
381 log::debug!("Unable to resolve repo root for webhook failure persistence");
382 return Ok(());
383 }
384 };
385
386 let path = failure_store_path(&repo_root);
387 persist_failed_delivery_at_path(&path, msg, err, attempts)
388}
389
390fn persist_failed_delivery_at_path(
391 path: &Path,
392 msg: &WebhookMessage,
393 err: &anyhow::Error,
394 attempts: u32,
395) -> Result<()> {
396 let _guard = failure_store_lock()
397 .lock()
398 .map_err(|_| anyhow!("failed to acquire webhook failure store lock"))?;
399
400 let mut records = load_failure_records_unlocked(path)?;
401 records.push(WebhookFailureRecord {
402 id: next_failure_id(),
403 failed_at: crate::timeutil::now_utc_rfc3339_or_fallback(),
404 event: msg.payload.event.clone(),
405 task_id: msg.payload.task_id.clone(),
406 error: sanitize_error(err),
407 attempts,
408 replay_count: 0,
409 payload: msg.payload.clone(),
410 });
411
412 if records.len() > MAX_WEBHOOK_FAILURE_RECORDS {
413 let retain_from = records.len().saturating_sub(MAX_WEBHOOK_FAILURE_RECORDS);
414 records.drain(..retain_from);
415 }
416
417 write_failure_records_unlocked(path, &records)
418}
419
420fn load_failure_records(path: &Path) -> Result<Vec<WebhookFailureRecord>> {
421 let _guard = failure_store_lock()
422 .lock()
423 .map_err(|_| anyhow!("failed to acquire webhook failure store lock"))?;
424 load_failure_records_unlocked(path)
425}
426
427fn load_failure_records_unlocked(path: &Path) -> Result<Vec<WebhookFailureRecord>> {
428 if !path.exists() {
429 return Ok(Vec::new());
430 }
431
432 let content = fs::read_to_string(path)
433 .with_context(|| format!("read webhook failure store {}", path.display()))?;
434 if content.trim().is_empty() {
435 return Ok(Vec::new());
436 }
437
438 serde_json::from_str::<Vec<WebhookFailureRecord>>(&content)
439 .with_context(|| format!("parse webhook failure store {}", path.display()))
440}
441
442fn write_failure_records_unlocked(path: &Path, records: &[WebhookFailureRecord]) -> Result<()> {
443 if let Some(parent) = path.parent() {
444 fs::create_dir_all(parent).with_context(|| {
445 format!(
446 "create webhook failure store directory {}",
447 parent.display()
448 )
449 })?;
450 }
451
452 let rendered = serde_json::to_string_pretty(records).context("serialize webhook failures")?;
453 fsutil::write_atomic(path, rendered.as_bytes())
454 .with_context(|| format!("write webhook failure store {}", path.display()))
455}
456
457fn resolve_repo_root_from_runtime(msg: &WebhookMessage) -> Option<PathBuf> {
458 if let Some(repo_root) = msg.payload.context.repo_root.as_deref() {
459 let repo_root = PathBuf::from(repo_root);
460 if repo_root.exists() {
461 return Some(crate::config::find_repo_root(&repo_root));
462 }
463 log::debug!(
464 "webhook payload repo_root does not exist; falling back to current directory: {}",
465 repo_root.display()
466 );
467 }
468
469 let cwd = std::env::current_dir().ok()?;
470 Some(crate::config::find_repo_root(&cwd))
471}
472
473fn next_failure_id() -> String {
474 let nanos = std::time::SystemTime::now()
475 .duration_since(std::time::UNIX_EPOCH)
476 .map(|duration| duration.as_nanos())
477 .unwrap_or(0);
478 let sequence = NEXT_FAILURE_SEQUENCE.fetch_add(1, Ordering::Relaxed);
479 format!("wf-{nanos}-{sequence}")
480}
481
482fn sanitize_error(err: &anyhow::Error) -> String {
483 let redacted = redaction::redact_text(&err.to_string());
484 let trimmed = redacted.trim();
485 if trimmed.chars().count() <= MAX_FAILURE_ERROR_CHARS {
486 return trimmed.to_string();
487 }
488
489 let truncated = trimmed
490 .chars()
491 .take(MAX_FAILURE_ERROR_CHARS)
492 .collect::<String>();
493 format!("{truncated}…")
494}
495
/// Test helper: overwrites the failure store under `repo_root` with `records`,
/// holding the failure store lock for the duration of the write.
#[cfg(test)]
pub(crate) fn write_failure_records_for_tests(
    repo_root: &Path,
    records: &[WebhookFailureRecord],
) -> Result<()> {
    let store_path = failure_store_path(repo_root);
    let guard = failure_store_lock()
        .lock()
        .map_err(|_| anyhow!("failed to acquire webhook failure store lock"))?;
    let outcome = write_failure_records_unlocked(&store_path, records);
    drop(guard);
    outcome
}
507
/// Test helper: loads every record from the failure store under `repo_root`.
#[cfg(test)]
pub(crate) fn load_failure_records_for_tests(
    repo_root: &Path,
) -> Result<Vec<WebhookFailureRecord>> {
    load_failure_records(&failure_store_path(repo_root))
}
515
/// Test helper: persists a failure for `msg` into the store under `repo_root`,
/// without resolving the repository from the message itself.
#[cfg(test)]
pub(crate) fn persist_failed_delivery_for_tests(
    repo_root: &Path,
    msg: &WebhookMessage,
    err: &anyhow::Error,
    attempts: u32,
) -> Result<()> {
    let store_path = failure_store_path(repo_root);
    persist_failed_delivery_at_path(store_path.as_path(), msg, err, attempts)
}
526
/// Test helper: resets every webhook counter and gauge to zero.
#[cfg(test)]
pub(crate) fn reset_webhook_metrics_for_tests() {
    // Destructuring without `..` makes this fail to compile if a counter is
    // added to `WebhookMetrics` but not reset here.
    let WebhookMetrics {
        queue_depth,
        queue_capacity,
        enqueued_total,
        delivered_total,
        failed_total,
        dropped_total,
        retry_attempts_total,
    } = metrics();

    for gauge in [queue_depth, queue_capacity] {
        gauge.store(0, Ordering::SeqCst);
    }
    for counter in [
        enqueued_total,
        delivered_total,
        failed_total,
        dropped_total,
        retry_attempts_total,
    ] {
        counter.store(0, Ordering::SeqCst);
    }
}