Skip to main content

ralph/webhook/diagnostics/
replay.rs

1//! Purpose: Select, report, and replay persisted webhook delivery failures.
2//!
3//! Responsibilities:
4//! - Define replay selector and reporting types.
5//! - Filter persisted failure records into bounded replay candidates.
6//! - Execute explicit replay requests through the normal enqueue path.
7//!
8//! Scope:
9//! - Replay validation, candidate selection, dry-run reporting, and live replay execution only.
10//!
11//! Usage:
12//! - Called by CLI webhook replay commands through the diagnostics facade.
13//!
14//! Invariants/Assumptions:
15//! - Replay is always explicit and bounded by caller-provided selectors or limits.
16//! - Dry-run replay never mutates persisted replay counters.
17//! - Live replay increments replay counts only for successfully enqueued records.
18
19use super::super::{WebhookPayload, enqueue_webhook_payload_for_replay, resolve_webhook_config};
20use super::failure_store::{failure_store_path, load_failure_records, update_replay_counts};
21use crate::contracts::WebhookConfig;
22use anyhow::{Result, bail};
23use serde::Serialize;
24use std::collections::HashSet;
25use std::path::Path;
26
/// Caller-supplied criteria that bound which persisted failures are considered for replay.
///
/// At least one of `ids`, `event`, or `task_id` must be set; `replay_failed_deliveries`
/// rejects a selector where all three are empty.
#[derive(Clone, Debug)]
pub struct ReplaySelector {
    /// Failure record ids to match exactly. An empty list disables id filtering.
    pub ids: Vec<String>,
    /// When set, only records whose event equals this string are matched.
    pub event: Option<String>,
    /// When set, only records whose task id equals this string are matched.
    pub task_id: Option<String>,
    /// Maximum number of matched records to report/replay. `0` means no limit.
    pub limit: usize,
    /// Records whose replay count has reached this value are reported but not replayed.
    /// Must be greater than zero.
    pub max_replay_attempts: u32,
}
35
36#[derive(Debug, Clone, Serialize)]
37pub struct ReplayCandidate {
38    pub id: String,
39    pub event: String,
40    #[serde(skip_serializing_if = "Option::is_none")]
41    pub task_id: Option<String>,
42    pub failed_at: String,
43    pub attempts: u32,
44    pub replay_count: u32,
45    pub error: String,
46    pub eligible_for_replay: bool,
47}
48
49#[derive(Debug, Clone, Serialize)]
50pub struct ReplayReport {
51    pub dry_run: bool,
52    pub matched_count: usize,
53    pub eligible_count: usize,
54    pub replayed_count: usize,
55    pub skipped_max_replay_attempts: usize,
56    pub skipped_enqueue_failures: usize,
57    pub candidates: Vec<ReplayCandidate>,
58}
59
60#[derive(Debug, Clone)]
61struct SelectedReplayRecord {
62    id: String,
63    replay_count: u32,
64    payload: WebhookPayload,
65}
66
67pub fn replay_failed_deliveries(
68    repo_root: &Path,
69    config: &WebhookConfig,
70    selector: &ReplaySelector,
71    dry_run: bool,
72) -> Result<ReplayReport> {
73    if selector.ids.is_empty() && selector.event.is_none() && selector.task_id.is_none() {
74        bail!(
75            "refusing unbounded replay (would redeliver all failures)\n\
76             this could overwhelm external systems or re-trigger side effects\n\
77             \n\
78             examples:\n\
79             \n\
80             ralph webhook replay --id <failure-id>     # replay specific failure\n\
81             ralph webhook replay --event task.done     # replay all task.done failures\n\
82             ralph webhook replay --task-id RQ-0001     # replay failures for a task\n\
83             ralph webhook replay --id a,b,c --limit 5  # replay up to 5 specific failures"
84        );
85    }
86
87    if selector.max_replay_attempts == 0 {
88        bail!("max_replay_attempts must be greater than 0");
89    }
90
91    if !dry_run {
92        let resolved = resolve_webhook_config(config);
93        if !resolved.enabled {
94            bail!("Webhook replay requires agent.webhook.enabled=true");
95        }
96        if resolved
97            .url
98            .as_deref()
99            .is_none_or(|url| url.trim().is_empty())
100        {
101            bail!("Webhook replay requires agent.webhook.url to be configured");
102        }
103    }
104
105    let path = failure_store_path(repo_root);
106    let records = load_failure_records(&path)?;
107    let limit = if selector.limit == 0 {
108        usize::MAX
109    } else {
110        selector.limit
111    };
112    let id_filter = selector
113        .ids
114        .iter()
115        .map(std::string::String::as_str)
116        .collect::<HashSet<_>>();
117
118    let mut selected_records = Vec::new();
119    let mut candidates = Vec::new();
120
121    for record in records.iter().rev() {
122        if selected_records.len() >= limit {
123            break;
124        }
125        if !id_filter.is_empty() && !id_filter.contains(record.id.as_str()) {
126            continue;
127        }
128        if let Some(event_filter) = selector.event.as_deref()
129            && record.event != event_filter
130        {
131            continue;
132        }
133        if let Some(task_filter) = selector.task_id.as_deref()
134            && record.task_id.as_deref() != Some(task_filter)
135        {
136            continue;
137        }
138
139        let eligible = record.replay_count < selector.max_replay_attempts;
140        candidates.push(ReplayCandidate {
141            id: record.id.clone(),
142            event: record.event.clone(),
143            task_id: record.task_id.clone(),
144            failed_at: record.failed_at.clone(),
145            attempts: record.attempts,
146            replay_count: record.replay_count,
147            error: record.error.clone(),
148            eligible_for_replay: eligible,
149        });
150        selected_records.push(SelectedReplayRecord {
151            id: record.id.clone(),
152            replay_count: record.replay_count,
153            payload: record.payload.clone(),
154        });
155    }
156
157    let matched_count = candidates.len();
158    let eligible_count = candidates
159        .iter()
160        .filter(|candidate| candidate.eligible_for_replay)
161        .count();
162
163    if dry_run {
164        return Ok(ReplayReport {
165            dry_run,
166            matched_count,
167            eligible_count,
168            replayed_count: 0,
169            skipped_max_replay_attempts: matched_count.saturating_sub(eligible_count),
170            skipped_enqueue_failures: 0,
171            candidates,
172        });
173    }
174
175    let mut replayed_count = 0usize;
176    let mut skipped_max_replay_attempts = 0usize;
177    let mut skipped_enqueue_failures = 0usize;
178    let mut replayed_ids = Vec::new();
179
180    for record in selected_records {
181        if record.replay_count >= selector.max_replay_attempts {
182            skipped_max_replay_attempts += 1;
183            continue;
184        }
185
186        if enqueue_webhook_payload_for_replay(record.payload, config) {
187            replayed_ids.push(record.id);
188            replayed_count += 1;
189        } else {
190            skipped_enqueue_failures += 1;
191        }
192    }
193
194    if !replayed_ids.is_empty() {
195        update_replay_counts(&path, &replayed_ids)?;
196    }
197
198    Ok(ReplayReport {
199        dry_run,
200        matched_count,
201        eligible_count,
202        replayed_count,
203        skipped_max_replay_attempts,
204        skipped_enqueue_failures,
205        candidates,
206    })
207}