ralph/webhook/diagnostics/
replay.rs1use super::super::{WebhookPayload, enqueue_webhook_payload_for_replay, resolve_webhook_config};
20use super::failure_store::{failure_store_path, load_failure_records, update_replay_counts};
21use crate::contracts::WebhookConfig;
22use anyhow::{Result, bail};
23use serde::Serialize;
24use std::collections::HashSet;
25use std::path::Path;
26
/// Filters and limits that decide which stored failure records are replayed.
///
/// At least one of `ids`, `event`, or `task_id` must be set; a selector with
/// none of them is rejected by `replay_failed_deliveries` as an unbounded
/// replay.
#[derive(Debug, Clone)]
pub struct ReplaySelector {
    /// Failure record ids to match. An empty list disables id filtering.
    pub ids: Vec<String>,
    /// When set, only records whose event equals this value match.
    pub event: Option<String>,
    /// When set, only records whose task id equals this value match.
    pub task_id: Option<String>,
    /// Maximum number of matching records to consider; `0` means no limit.
    pub limit: usize,
    /// A record is eligible only while its replay count is strictly below
    /// this value. Must be greater than zero.
    pub max_replay_attempts: u32,
}
35
36#[derive(Debug, Clone, Serialize)]
37pub struct ReplayCandidate {
38 pub id: String,
39 pub event: String,
40 #[serde(skip_serializing_if = "Option::is_none")]
41 pub task_id: Option<String>,
42 pub failed_at: String,
43 pub attempts: u32,
44 pub replay_count: u32,
45 pub error: String,
46 pub eligible_for_replay: bool,
47}
48
49#[derive(Debug, Clone, Serialize)]
50pub struct ReplayReport {
51 pub dry_run: bool,
52 pub matched_count: usize,
53 pub eligible_count: usize,
54 pub replayed_count: usize,
55 pub skipped_max_replay_attempts: usize,
56 pub skipped_enqueue_failures: usize,
57 pub candidates: Vec<ReplayCandidate>,
58}
59
60#[derive(Debug, Clone)]
61struct SelectedReplayRecord {
62 id: String,
63 replay_count: u32,
64 payload: WebhookPayload,
65}
66
67pub fn replay_failed_deliveries(
68 repo_root: &Path,
69 config: &WebhookConfig,
70 selector: &ReplaySelector,
71 dry_run: bool,
72) -> Result<ReplayReport> {
73 if selector.ids.is_empty() && selector.event.is_none() && selector.task_id.is_none() {
74 bail!(
75 "refusing unbounded replay (would redeliver all failures)\n\
76 this could overwhelm external systems or re-trigger side effects\n\
77 \n\
78 examples:\n\
79 \n\
80 ralph webhook replay --id <failure-id> # replay specific failure\n\
81 ralph webhook replay --event task.done # replay all task.done failures\n\
82 ralph webhook replay --task-id RQ-0001 # replay failures for a task\n\
83 ralph webhook replay --id a,b,c --limit 5 # replay up to 5 specific failures"
84 );
85 }
86
87 if selector.max_replay_attempts == 0 {
88 bail!("max_replay_attempts must be greater than 0");
89 }
90
91 if !dry_run {
92 let resolved = resolve_webhook_config(config);
93 if !resolved.enabled {
94 bail!("Webhook replay requires agent.webhook.enabled=true");
95 }
96 if resolved
97 .url
98 .as_deref()
99 .is_none_or(|url| url.trim().is_empty())
100 {
101 bail!("Webhook replay requires agent.webhook.url to be configured");
102 }
103 }
104
105 let path = failure_store_path(repo_root);
106 let records = load_failure_records(&path)?;
107 let limit = if selector.limit == 0 {
108 usize::MAX
109 } else {
110 selector.limit
111 };
112 let id_filter = selector
113 .ids
114 .iter()
115 .map(std::string::String::as_str)
116 .collect::<HashSet<_>>();
117
118 let mut selected_records = Vec::new();
119 let mut candidates = Vec::new();
120
121 for record in records.iter().rev() {
122 if selected_records.len() >= limit {
123 break;
124 }
125 if !id_filter.is_empty() && !id_filter.contains(record.id.as_str()) {
126 continue;
127 }
128 if let Some(event_filter) = selector.event.as_deref()
129 && record.event != event_filter
130 {
131 continue;
132 }
133 if let Some(task_filter) = selector.task_id.as_deref()
134 && record.task_id.as_deref() != Some(task_filter)
135 {
136 continue;
137 }
138
139 let eligible = record.replay_count < selector.max_replay_attempts;
140 candidates.push(ReplayCandidate {
141 id: record.id.clone(),
142 event: record.event.clone(),
143 task_id: record.task_id.clone(),
144 failed_at: record.failed_at.clone(),
145 attempts: record.attempts,
146 replay_count: record.replay_count,
147 error: record.error.clone(),
148 eligible_for_replay: eligible,
149 });
150 selected_records.push(SelectedReplayRecord {
151 id: record.id.clone(),
152 replay_count: record.replay_count,
153 payload: record.payload.clone(),
154 });
155 }
156
157 let matched_count = candidates.len();
158 let eligible_count = candidates
159 .iter()
160 .filter(|candidate| candidate.eligible_for_replay)
161 .count();
162
163 if dry_run {
164 return Ok(ReplayReport {
165 dry_run,
166 matched_count,
167 eligible_count,
168 replayed_count: 0,
169 skipped_max_replay_attempts: matched_count.saturating_sub(eligible_count),
170 skipped_enqueue_failures: 0,
171 candidates,
172 });
173 }
174
175 let mut replayed_count = 0usize;
176 let mut skipped_max_replay_attempts = 0usize;
177 let mut skipped_enqueue_failures = 0usize;
178 let mut replayed_ids = Vec::new();
179
180 for record in selected_records {
181 if record.replay_count >= selector.max_replay_attempts {
182 skipped_max_replay_attempts += 1;
183 continue;
184 }
185
186 if enqueue_webhook_payload_for_replay(record.payload, config) {
187 replayed_ids.push(record.id);
188 replayed_count += 1;
189 } else {
190 skipped_enqueue_failures += 1;
191 }
192 }
193
194 if !replayed_ids.is_empty() {
195 update_replay_counts(&path, &replayed_ids)?;
196 }
197
198 Ok(ReplayReport {
199 dry_run,
200 matched_count,
201 eligible_count,
202 replayed_count,
203 skipped_max_replay_attempts,
204 skipped_enqueue_failures,
205 candidates,
206 })
207}