1use crate::contracts::{WebhookConfig, WebhookQueuePolicy};
14use crossbeam_channel::{Sender, TrySendError, bounded};
15use std::sync::OnceLock;
16use std::time::Duration;
17
18use super::diagnostics;
19use super::types::{ResolvedWebhookConfig, WebhookMessage, WebhookPayload};
20
/// Wrapper around the sending half of the webhook queue so it can be stored in
/// the global `CHANNEL` cell.
struct WebhookChannel {
    /// Sending half of the bounded queue; `get_sender` hands out clones of it.
    sender: Sender<WebhookMessage>,
}
27
/// Process-wide webhook channel. Populated at most once, by whichever of
/// `init_worker` / `init_worker_for_parallel` reaches `get_or_init` first.
static CHANNEL: OnceLock<WebhookChannel> = OnceLock::new();
30
31pub(crate) fn init_worker(config: &WebhookConfig) {
33 let capacity = config
36 .queue_capacity
37 .map(|c| c.clamp(1, 10000))
38 .unwrap_or(500) as usize;
39
40 CHANNEL.get_or_init(|| {
43 let (sender, receiver) = bounded(capacity);
44 diagnostics::set_queue_capacity(capacity);
45
46 std::thread::spawn(move || {
48 log::debug!("Webhook worker started (capacity: {})", capacity);
49
50 while let Ok(msg) = receiver.recv() {
51 diagnostics::note_queue_dequeue();
52 if let Err(e) = deliver_webhook(&msg) {
53 log::warn!("Webhook delivery failed: {}", e);
54 }
55 }
56
57 log::debug!("Webhook worker shutting down");
58 });
59
60 WebhookChannel {
61 sender: sender.clone(),
62 }
63 });
64}
65
66pub fn init_worker_for_parallel(config: &WebhookConfig, worker_count: u8) {
75 let base_capacity = config
76 .queue_capacity
77 .map(|c| c.clamp(1, 10000))
78 .unwrap_or(500) as usize;
79
80 let multiplier = config
81 .parallel_queue_multiplier
82 .unwrap_or(2.0)
83 .clamp(1.0, 10.0);
84
85 let scaled =
87 (base_capacity as f64 * (worker_count as f64 * multiplier as f64).max(1.0)) as usize;
88 let capacity = scaled.clamp(1, 10000);
89
90 CHANNEL.get_or_init(|| {
93 let (sender, receiver) = bounded(capacity);
94 diagnostics::set_queue_capacity(capacity);
95
96 std::thread::spawn(move || {
98 log::debug!(
99 "Webhook worker started (capacity: {}, parallel-optimized for {} workers)",
100 capacity,
101 worker_count
102 );
103
104 while let Ok(msg) = receiver.recv() {
105 diagnostics::note_queue_dequeue();
106 if let Err(e) = deliver_webhook(&msg) {
107 log::warn!("Webhook delivery failed: {}", e);
108 }
109 }
110
111 log::debug!("Webhook worker shutting down");
112 });
113
114 WebhookChannel {
115 sender: sender.clone(),
116 }
117 });
118}
119
120pub(crate) fn get_sender() -> Option<Sender<WebhookMessage>> {
122 CHANNEL.get().map(|ch| ch.sender.clone())
123}
124
125fn deliver_webhook(msg: &WebhookMessage) -> anyhow::Result<()> {
127 let url = msg
128 .config
129 .url
130 .as_ref()
131 .ok_or_else(|| anyhow::anyhow!("Webhook URL not configured"))?;
132
133 let body = serde_json::to_string(&msg.payload)?;
134 let signature = msg
135 .config
136 .secret
137 .as_ref()
138 .map(|secret| generate_signature(&body, secret));
139
140 let mut last_error = None;
141
142 for attempt in 0..=msg.config.retry_count {
143 if attempt > 0 {
144 diagnostics::note_retry_attempt();
145 let backoff = msg.config.retry_backoff.as_millis() as u64 * attempt as u64;
146 std::thread::sleep(Duration::from_millis(backoff));
147 log::debug!("Webhook retry attempt {} after {}ms", attempt, backoff);
148 }
149
150 match send_request(url, &body, signature.as_deref(), msg.config.timeout) {
151 Ok(()) => {
152 diagnostics::note_delivery_success();
153 log::debug!("Webhook delivered successfully to {}", url);
154 return Ok(());
155 }
156 Err(e) => {
157 log::debug!("Webhook attempt {} failed: {}", attempt + 1, e);
158 last_error = Some(e);
159 }
160 }
161 }
162
163 let final_error = last_error.unwrap_or_else(|| anyhow::anyhow!("All webhook attempts failed"));
164 diagnostics::note_delivery_failure(msg, &final_error, msg.config.retry_count.saturating_add(1));
165 Err(final_error)
166}
167
168fn send_request(
170 url: &str,
171 body: &str,
172 signature: Option<&str>,
173 timeout: Duration,
174) -> anyhow::Result<()> {
175 let agent = ureq::Agent::new_with_config(
177 ureq::Agent::config_builder()
178 .timeout_global(Some(timeout))
179 .build(),
180 );
181
182 let mut request = agent
183 .post(url)
184 .header("Content-Type", "application/json")
185 .header("User-Agent", concat!("ralph/", env!("CARGO_PKG_VERSION")));
186
187 if let Some(sig) = signature {
188 request = request.header("X-Ralph-Signature", sig);
189 }
190
191 let response = request.send(body)?;
192
193 let status = response.status();
194
195 if status.is_success() {
196 Ok(())
197 } else {
198 Err(anyhow::anyhow!(
199 "HTTP {}: webhook endpoint returned error",
200 status
201 ))
202 }
203}
204
205pub(crate) fn generate_signature(body: &str, secret: &str) -> String {
207 use hmac::{Hmac, Mac};
208 use sha2::Sha256;
209
210 type HmacSha256 = Hmac<Sha256>;
211
212 let mut mac = match HmacSha256::new_from_slice(secret.as_bytes()) {
215 Ok(mac) => mac,
216 Err(e) => {
217 log::error!("Failed to create HMAC (this should never happen): {}", e);
218 return "sha256=invalid".to_string();
220 }
221 };
222 mac.update(body.as_bytes());
223 let result = mac.finalize();
224 let code_bytes = result.into_bytes();
225
226 format!("sha256={}", hex::encode(code_bytes))
227}
228
229pub(crate) fn apply_backpressure_policy(
231 sender: &Sender<WebhookMessage>,
232 msg: WebhookMessage,
233 policy: WebhookQueuePolicy,
234) -> bool {
235 let event_type = msg.payload.event.clone();
237 let task_id = msg
238 .payload
239 .task_id
240 .clone()
241 .unwrap_or_else(|| "loop".to_string());
242
243 match policy {
244 WebhookQueuePolicy::DropOldest => {
245 match sender.try_send(msg) {
249 Ok(()) => {
250 diagnostics::note_enqueue_success();
251 log::debug!("Webhook enqueued for delivery");
252 true
253 }
254 Err(TrySendError::Full(_)) => {
255 diagnostics::note_dropped_message();
257 log::warn!(
258 "Webhook queue full (drop_oldest policy); dropping event={} task={}",
259 event_type,
260 task_id
261 );
262 false
263 }
264 Err(TrySendError::Disconnected(_)) => {
265 diagnostics::note_dropped_message();
266 log::error!(
267 "Webhook worker disconnected; cannot send event={} task={}",
268 event_type,
269 task_id
270 );
271 false
272 }
273 }
274 }
275 WebhookQueuePolicy::DropNew => match sender.try_send(msg) {
276 Ok(()) => {
277 diagnostics::note_enqueue_success();
278 log::debug!("Webhook enqueued for delivery");
279 true
280 }
281 Err(e) => {
282 diagnostics::note_dropped_message();
283 log::warn!(
284 "Webhook queue full; dropping event={} task={}: {}",
285 event_type,
286 task_id,
287 e
288 );
289 false
290 }
291 },
292 WebhookQueuePolicy::BlockWithTimeout => {
293 match sender.send_timeout(msg, Duration::from_millis(100)) {
295 Ok(()) => {
296 diagnostics::note_enqueue_success();
297 log::debug!("Webhook enqueued for delivery");
298 true
299 }
300 Err(crossbeam_channel::SendTimeoutError::Timeout(_msg)) => {
301 diagnostics::note_dropped_message();
302 log::warn!(
303 "Webhook queue full (timeout); dropping event={} task={}",
304 event_type,
305 task_id
306 );
307 false
308 }
309 Err(crossbeam_channel::SendTimeoutError::Disconnected(_)) => {
310 diagnostics::note_dropped_message();
311 log::error!(
312 "Webhook worker disconnected; cannot send event={} task={}",
313 event_type,
314 task_id
315 );
316 false
317 }
318 }
319 }
320 }
321}
322
323pub(crate) fn enqueue_webhook_payload_for_replay(
325 payload: WebhookPayload,
326 config: &WebhookConfig,
327) -> bool {
328 send_webhook_payload_internal(payload, config, true)
329}
330
331pub(crate) fn send_webhook_payload_internal(
333 payload: WebhookPayload,
334 config: &WebhookConfig,
335 bypass_event_filter: bool,
336) -> bool {
337 if !bypass_event_filter && !config.is_event_enabled(&payload.event) {
339 log::debug!("Webhook for event {} is disabled; skipping", payload.event);
340 return false;
341 }
342
343 let resolved = ResolvedWebhookConfig::from_config(config);
344
345 if !resolved.enabled {
346 log::debug!("Webhooks globally disabled; skipping");
347 return false;
348 }
349
350 let url = match &resolved.url {
351 Some(url) if !url.is_empty() => url.clone(),
352 _ => {
353 log::debug!("Webhook URL not configured; skipping");
354 return false;
355 }
356 };
357
358 init_worker(config);
360
361 let policy = config.queue_policy.unwrap_or_default();
362
363 let msg = WebhookMessage {
364 payload,
365 config: ResolvedWebhookConfig {
366 enabled: resolved.enabled,
367 url: Some(url),
368 secret: resolved.secret,
369 timeout: resolved.timeout,
370 retry_count: resolved.retry_count,
371 retry_backoff: resolved.retry_backoff,
372 },
373 };
374
375 match get_sender() {
377 Some(sender) => apply_backpressure_policy(&sender, msg, policy),
378 None => {
379 log::error!("Webhook worker not initialized; cannot send webhook");
380 false
381 }
382 }
383}