1use crate::contracts::{WebhookConfig, WebhookQueuePolicy};
19use anyhow::Context;
20use crossbeam_channel::{Receiver, SendTimeoutError, Sender, TrySendError, bounded, unbounded};
21use std::cmp::Ordering as CmpOrdering;
22use std::collections::BinaryHeap;
23use std::sync::{Arc, OnceLock, RwLock};
24use std::time::{Duration, Instant};
25
26use super::diagnostics;
27use super::types::{ResolvedWebhookConfig, WebhookMessage, WebhookPayload};
28
/// Ready-queue capacity used when the config does not set `queue_capacity`.
const DEFAULT_QUEUE_CAPACITY: usize = 500;
/// Worker thread count in standard mode; also the lower bound in parallel mode.
const DEFAULT_WORKER_COUNT: usize = 4;
/// Upper bound applied to both the configured and the scaled queue capacity.
const MAX_QUEUE_CAPACITY: usize = 10_000;
/// Upper bound applied to the configured `parallel_queue_multiplier`.
const MAX_PARALLEL_MULTIPLIER: f64 = 10.0;
33
/// Sizing parameters for a dispatcher.
///
/// Compared with `==` to decide whether an existing dispatcher can be reused.
#[derive(Debug, Clone, PartialEq, Eq)]
struct DispatcherSettings {
    /// Capacity of the bounded ready queue.
    queue_capacity: usize,
    /// Number of delivery worker threads to spawn.
    worker_count: usize,
}
39
/// Execution mode that determines how dispatcher settings are derived.
#[derive(Debug, Clone, PartialEq, Eq)]
enum RuntimeMode {
    /// Default mode: configured capacity and `DEFAULT_WORKER_COUNT` workers.
    Standard,
    /// Set by `init_worker_for_parallel`; queue capacity is scaled by `worker_count`.
    Parallel { worker_count: u8 },
}
45
/// Process-wide dispatcher state stored behind the `DISPATCHER_STATE` lock.
#[derive(Debug)]
struct DispatcherState {
    /// Mode used when deriving settings for the next dispatcher lookup.
    mode: RuntimeMode,
    /// Currently active dispatcher, if one has been built.
    dispatcher: Option<Arc<WebhookDispatcher>>,
}
51
52impl Default for DispatcherState {
53 fn default() -> Self {
54 Self {
55 mode: RuntimeMode::Standard,
56 dispatcher: None,
57 }
58 }
59}
60
/// Handle to the delivery pipeline: owns the senders that feed the worker
/// threads and the retry scheduler.
#[derive(Debug)]
struct WebhookDispatcher {
    /// Settings this dispatcher was built with; compared against freshly
    /// derived settings to detect when a rebuild is needed.
    settings: DispatcherSettings,
    /// Sending half of the bounded queue consumed by the delivery workers.
    ready_sender: Sender<DeliveryTask>,
    /// Sending half of the unbounded queue consumed by the retry scheduler.
    retry_sender: Sender<ScheduledRetry>,
}
67
/// A webhook message together with its delivery attempt counter.
#[derive(Debug, Clone)]
struct DeliveryTask {
    /// Message (payload plus resolved config) to deliver.
    msg: WebhookMessage,
    /// Zero on first enqueue; incremented by one each time a retry is scheduled.
    attempt: u32,
}
73
/// A delivery task waiting for its backoff delay to elapse.
#[derive(Debug, Clone)]
struct ScheduledRetry {
    /// Earliest instant at which the task may be moved back to the ready queue.
    ready_at: Instant,
    /// Task to re-enqueue once `ready_at` has passed.
    task: DeliveryTask,
}
79
80#[derive(Debug, Clone)]
81struct RetryQueueEntry(ScheduledRetry);
82
83impl PartialEq for RetryQueueEntry {
84 fn eq(&self, other: &Self) -> bool {
85 self.0.ready_at.eq(&other.0.ready_at)
86 }
87}
88
89impl Eq for RetryQueueEntry {}
90
91impl PartialOrd for RetryQueueEntry {
92 fn partial_cmp(&self, other: &Self) -> Option<CmpOrdering> {
93 Some(self.cmp(other))
94 }
95}
96
97impl Ord for RetryQueueEntry {
98 fn cmp(&self, other: &Self) -> CmpOrdering {
99 other.0.ready_at.cmp(&self.0.ready_at)
100 }
101}
102
103static DISPATCHER_STATE: OnceLock<RwLock<DispatcherState>> = OnceLock::new();
104
105fn dispatcher_state() -> &'static RwLock<DispatcherState> {
106 DISPATCHER_STATE.get_or_init(|| RwLock::new(DispatcherState::default()))
107}
108
109impl DispatcherSettings {
110 fn for_mode(config: &WebhookConfig, mode: &RuntimeMode) -> Self {
111 let base_capacity = config
112 .queue_capacity
113 .map(|value| value.clamp(1, MAX_QUEUE_CAPACITY as u32) as usize)
114 .unwrap_or(DEFAULT_QUEUE_CAPACITY);
115
116 match mode {
117 RuntimeMode::Standard => Self {
118 queue_capacity: base_capacity,
119 worker_count: DEFAULT_WORKER_COUNT,
120 },
121 RuntimeMode::Parallel { worker_count } => {
122 let multiplier = config
123 .parallel_queue_multiplier
124 .unwrap_or(2.0)
125 .clamp(1.0, MAX_PARALLEL_MULTIPLIER as f32)
126 as f64;
127 let scaled_capacity =
128 (base_capacity as f64 * (*worker_count as f64 * multiplier).max(1.0)) as usize;
129
130 Self {
131 queue_capacity: scaled_capacity.clamp(1, MAX_QUEUE_CAPACITY),
132 worker_count: usize::max(DEFAULT_WORKER_COUNT, *worker_count as usize),
133 }
134 }
135 }
136 }
137}
138
139impl WebhookDispatcher {
140 fn new(settings: DispatcherSettings) -> Arc<Self> {
141 let (ready_sender, ready_receiver) = bounded(settings.queue_capacity);
142 let (retry_sender, retry_receiver) = unbounded();
143
144 let dispatcher = Arc::new(Self {
145 settings: settings.clone(),
146 ready_sender,
147 retry_sender,
148 });
149
150 diagnostics::set_queue_capacity(settings.queue_capacity);
151
152 for worker_id in 0..settings.worker_count {
153 let ready_receiver = ready_receiver.clone();
154 let retry_sender = dispatcher.retry_sender.clone();
155 let thread_name = format!("ralph-webhook-worker-{worker_id}");
156 std::thread::Builder::new()
157 .name(thread_name)
158 .spawn(move || worker_loop(ready_receiver, retry_sender))
159 .expect("spawn webhook delivery worker");
160 }
161
162 let scheduler_ready = dispatcher.ready_sender.clone();
163 std::thread::Builder::new()
164 .name("ralph-webhook-retry-scheduler".to_string())
165 .spawn(move || retry_scheduler_loop(retry_receiver, scheduler_ready))
166 .expect("spawn webhook retry scheduler");
167
168 log::debug!(
169 "Webhook dispatcher started with {} workers and queue capacity {}",
170 settings.worker_count,
171 settings.queue_capacity
172 );
173
174 dispatcher
175 }
176}
177
178impl Drop for WebhookDispatcher {
179 fn drop(&mut self) {
180 log::debug!(
181 "Webhook dispatcher shutting down (workers: {}, capacity: {})",
182 self.settings.worker_count,
183 self.settings.queue_capacity
184 );
185 }
186}
187
188fn with_dispatcher_state_write<T>(mut f: impl FnMut(&mut DispatcherState) -> T) -> T {
189 match dispatcher_state().write() {
190 Ok(mut guard) => f(&mut guard),
191 Err(poisoned) => {
192 let mut guard = poisoned.into_inner();
193 f(&mut guard)
194 }
195 }
196}
197
198fn dispatcher_for_config(config: &WebhookConfig) -> Arc<WebhookDispatcher> {
199 with_dispatcher_state_write(|state| {
200 let settings = DispatcherSettings::for_mode(config, &state.mode);
201 let needs_rebuild = state
202 .dispatcher
203 .as_ref()
204 .is_none_or(|dispatcher| dispatcher.settings != settings);
205
206 if needs_rebuild {
207 state.dispatcher = Some(WebhookDispatcher::new(settings));
208 }
209
210 state
211 .dispatcher
212 .as_ref()
213 .expect("dispatcher initialized")
214 .clone()
215 })
216}
217
218pub fn init_worker_for_parallel(config: &WebhookConfig, worker_count: u8) {
220 with_dispatcher_state_write(|state| {
221 state.mode = RuntimeMode::Parallel { worker_count };
222 });
223 let _ = dispatcher_for_config(config);
224}
225
226fn worker_loop(ready_receiver: Receiver<DeliveryTask>, retry_sender: Sender<ScheduledRetry>) {
227 while let Ok(task) = ready_receiver.recv() {
228 diagnostics::note_queue_dequeue();
229 handle_delivery_task(task, &retry_sender);
230 }
231}
232
233fn handle_delivery_task(task: DeliveryTask, retry_sender: &Sender<ScheduledRetry>) {
234 match deliver_attempt(&task.msg) {
235 Ok(()) => {
236 diagnostics::note_delivery_success();
237 log::debug!(
238 "Webhook delivered successfully to {}",
239 redact_webhook_destination(
240 task.msg
241 .config
242 .url
243 .as_deref()
244 .unwrap_or("<missing webhook URL>")
245 )
246 );
247 }
248 Err(err) => {
249 if task.attempt < task.msg.config.retry_count {
250 diagnostics::note_retry_attempt();
251
252 let retry_number = task.attempt.saturating_add(1);
253 let scheduled = ScheduledRetry {
254 ready_at: Instant::now()
255 + retry_delay(task.msg.config.retry_backoff, retry_number),
256 task: DeliveryTask {
257 msg: task.msg.clone(),
258 attempt: retry_number,
259 },
260 };
261
262 log::debug!(
263 "Webhook attempt {} failed for {}; scheduling retry: {:#}",
264 retry_number,
265 redact_webhook_destination(
266 task.msg
267 .config
268 .url
269 .as_deref()
270 .unwrap_or("<missing webhook URL>")
271 ),
272 err
273 );
274
275 if let Err(send_err) = retry_sender.send(scheduled) {
276 let scheduler_error =
277 anyhow::anyhow!("retry scheduler unavailable for webhook: {}", send_err);
278 diagnostics::note_delivery_failure(
279 &task.msg,
280 &scheduler_error,
281 retry_number.saturating_add(1),
282 );
283 log::warn!("{scheduler_error:#}");
284 }
285 } else {
286 let attempts = task.attempt.saturating_add(1);
287 diagnostics::note_delivery_failure(&task.msg, &err, attempts);
288 log::warn!(
289 "Webhook delivery failed after {} attempts: {:#}",
290 attempts,
291 err
292 );
293 }
294 }
295 }
296}
297
298fn retry_scheduler_loop(
299 retry_receiver: Receiver<ScheduledRetry>,
300 ready_sender: Sender<DeliveryTask>,
301) {
302 let mut pending = BinaryHeap::<RetryQueueEntry>::new();
303
304 loop {
305 let timeout = pending
306 .peek()
307 .map(|entry| entry.0.ready_at.saturating_duration_since(Instant::now()));
308
309 let scheduled = match timeout {
310 Some(duration) => match retry_receiver.recv_timeout(duration) {
311 Ok(task) => Some(task),
312 Err(crossbeam_channel::RecvTimeoutError::Timeout) => None,
313 Err(crossbeam_channel::RecvTimeoutError::Disconnected) => {
314 if pending.is_empty() {
315 break;
316 }
317 None
318 }
319 },
320 None => match retry_receiver.recv() {
321 Ok(task) => Some(task),
322 Err(_) => break,
323 },
324 };
325
326 if let Some(task) = scheduled {
327 pending.push(RetryQueueEntry(task));
328 }
329
330 let now = Instant::now();
331 while let Some(entry) = pending.peek() {
332 if entry.0.ready_at > now {
333 break;
334 }
335
336 let RetryQueueEntry(scheduled) = pending.pop().expect("pending retry exists");
337 match ready_sender.send(scheduled.task.clone()) {
338 Ok(()) => diagnostics::note_retry_requeue(),
339 Err(send_err) => {
340 let error = anyhow::anyhow!(
341 "webhook dispatcher shut down before retry enqueue: {send_err}"
342 );
343 diagnostics::note_delivery_failure(
344 &scheduled.task.msg,
345 &error,
346 scheduled.task.attempt.saturating_add(1),
347 );
348 log::warn!("{error:#}");
349 return;
350 }
351 }
352 }
353 }
354}
355
/// Linear backoff: `base` (truncated to whole milliseconds) times
/// `retry_number`, saturating at `u64::MAX` milliseconds.
fn retry_delay(base: Duration, retry_number: u32) -> Duration {
    let scaled = base.as_millis().saturating_mul(u128::from(retry_number));
    let millis = u64::try_from(scaled).unwrap_or(u64::MAX);
    Duration::from_millis(millis)
}
363
364fn deliver_attempt(msg: &WebhookMessage) -> anyhow::Result<()> {
365 let url = msg
366 .config
367 .url
368 .as_deref()
369 .ok_or_else(|| anyhow::anyhow!("Webhook URL not configured"))?;
370 let destination = redact_webhook_destination(url);
371
372 let body = serde_json::to_string(&msg.payload)?;
373 let signature = msg
374 .config
375 .secret
376 .as_ref()
377 .map(|secret| generate_signature(&body, secret));
378
379 send_request(url, &body, signature.as_deref(), msg.config.timeout)
380 .with_context(|| format!("webhook delivery to {destination}"))
381}
382
383fn send_request(
384 url: &str,
385 body: &str,
386 signature: Option<&str>,
387 timeout: Duration,
388) -> anyhow::Result<()> {
389 #[cfg(test)]
390 if let Some(handler) = test_transport() {
391 return handler(&TestRequest {
392 url: url.to_string(),
393 body: body.to_string(),
394 signature: signature.map(std::string::ToString::to_string),
395 timeout,
396 });
397 }
398
399 let agent = ureq::Agent::new_with_config(
400 ureq::Agent::config_builder()
401 .timeout_global(Some(timeout))
402 .build(),
403 );
404
405 let mut request = agent
406 .post(url)
407 .header("Content-Type", "application/json")
408 .header("User-Agent", concat!("ralph/", env!("CARGO_PKG_VERSION")));
409
410 if let Some(sig) = signature {
411 request = request.header("X-Ralph-Signature", sig);
412 }
413
414 let response = request.send(body)?;
415 let status = response.status();
416
417 if status.is_success() {
418 Ok(())
419 } else {
420 Err(anyhow::anyhow!(
421 "HTTP {}: webhook endpoint returned error",
422 status
423 ))
424 }
425}
426
/// Reduces a webhook URL to a form that is safe to log.
///
/// Query string, fragment, userinfo (`user:pass@`) and the path are removed;
/// only the scheme (when present) and host are kept. A trailing `/…` marks
/// that a path was present.
///
/// Special cases:
/// * empty / whitespace-only input → `"<missing webhook URL>"`
/// * scheme present but no host → `"{scheme}://<redacted>"`
/// * no scheme and no host → `"<redacted webhook destination>"`
pub(crate) fn redact_webhook_destination(url: &str) -> String {
    let trimmed = url.trim();
    if trimmed.is_empty() {
        return "<missing webhook URL>".to_string();
    }

    // Fragments and query strings frequently carry tokens; drop them first.
    let without_fragment = trimmed.split('#').next().unwrap_or(trimmed);
    let without_query = without_fragment
        .split('?')
        .next()
        .unwrap_or(without_fragment);

    if let Some((scheme, rest)) = without_query.split_once("://") {
        let (host, has_path) = split_host_and_path(rest.trim_start_matches('/'));

        if host.is_empty() {
            return format!("{scheme}://<redacted>");
        }

        return if has_path {
            format!("{scheme}://{host}/…")
        } else {
            format!("{scheme}://{host}")
        };
    }

    let (host, has_path) = split_host_and_path(without_query);

    if host.is_empty() {
        "<redacted webhook destination>".to_string()
    } else if has_path {
        format!("{host}/…")
    } else {
        host.to_string()
    }
}

/// Splits `authority[/path]` into the host (userinfo stripped) and a flag
/// telling whether a path separator followed the authority.
///
/// The authority is isolated at the first `/` *before* userinfo is stripped,
/// so an `@` inside the path can never be mistaken for the userinfo delimiter
/// and leak part of the path as the "host".
fn split_host_and_path(authority_and_path: &str) -> (&str, bool) {
    let (authority, has_path) = match authority_and_path.split_once('/') {
        Some((authority, _path)) => (authority, true),
        None => (authority_and_path, false),
    };
    // Everything up to the last `@` in the authority is userinfo.
    let host = authority.rsplit('@').next().unwrap_or(authority);
    (host, has_path)
}
479
480pub(crate) fn generate_signature(body: &str, secret: &str) -> String {
482 use hmac::{Hmac, Mac};
483 use sha2::Sha256;
484
485 type HmacSha256 = Hmac<Sha256>;
486
487 let mut mac = match HmacSha256::new_from_slice(secret.as_bytes()) {
488 Ok(mac) => mac,
489 Err(e) => {
490 log::error!("Failed to create HMAC (this should never happen): {}", e);
491 return "sha256=invalid".to_string();
492 }
493 };
494 mac.update(body.as_bytes());
495 let result = mac.finalize();
496 let code_bytes = result.into_bytes();
497
498 format!("sha256={}", hex::encode(code_bytes))
499}
500
501fn apply_backpressure_policy(
503 sender: &Sender<DeliveryTask>,
504 msg: DeliveryTask,
505 policy: WebhookQueuePolicy,
506) -> bool {
507 let event_type = msg.msg.payload.event.clone();
508 let task_id = msg
509 .msg
510 .payload
511 .task_id
512 .clone()
513 .unwrap_or_else(|| "loop".to_string());
514
515 match policy {
516 WebhookQueuePolicy::DropOldest | WebhookQueuePolicy::DropNew => {
517 match sender.try_send(msg) {
518 Ok(()) => {
519 diagnostics::note_enqueue_success();
520 log::debug!("Webhook enqueued for delivery");
521 true
522 }
523 Err(TrySendError::Full(_)) => {
524 diagnostics::note_dropped_message();
525 log::warn!(
526 "Webhook queue full; dropping event={} task={}",
527 event_type,
528 task_id
529 );
530 false
531 }
532 Err(TrySendError::Disconnected(_)) => {
533 diagnostics::note_dropped_message();
534 log::error!(
535 "Webhook dispatcher disconnected; cannot send event={} task={}",
536 event_type,
537 task_id
538 );
539 false
540 }
541 }
542 }
543 WebhookQueuePolicy::BlockWithTimeout => {
544 match sender.send_timeout(msg, Duration::from_millis(100)) {
545 Ok(()) => {
546 diagnostics::note_enqueue_success();
547 log::debug!("Webhook enqueued for delivery");
548 true
549 }
550 Err(SendTimeoutError::Timeout(_)) => {
551 diagnostics::note_dropped_message();
552 log::warn!(
553 "Webhook queue full (timeout); dropping event={} task={}",
554 event_type,
555 task_id
556 );
557 false
558 }
559 Err(SendTimeoutError::Disconnected(_)) => {
560 diagnostics::note_dropped_message();
561 log::error!(
562 "Webhook dispatcher disconnected; cannot send event={} task={}",
563 event_type,
564 task_id
565 );
566 false
567 }
568 }
569 }
570 }
571}
572
/// Enqueues `payload` for delivery while skipping the per-event enabled filter.
///
/// Delegates to `send_webhook_payload_internal` with `bypass_event_filter`
/// set to `true` and returns its result.
pub(crate) fn enqueue_webhook_payload_for_replay(
    payload: WebhookPayload,
    config: &WebhookConfig,
) -> bool {
    send_webhook_payload_internal(payload, config, true)
}
580
581pub(crate) fn send_webhook_payload_internal(
583 payload: WebhookPayload,
584 config: &WebhookConfig,
585 bypass_event_filter: bool,
586) -> bool {
587 if !bypass_event_filter && !config.is_event_enabled(&payload.event) {
588 log::debug!("Webhook for event {} is disabled; skipping", payload.event);
589 return false;
590 }
591
592 let resolved = ResolvedWebhookConfig::from_config(config);
593 if !resolved.enabled {
594 log::debug!("Webhooks globally disabled; skipping");
595 return false;
596 }
597
598 let url = match &resolved.url {
599 Some(url) if !url.trim().is_empty() => url.clone(),
600 _ => {
601 log::debug!("Webhook URL not configured; skipping");
602 return false;
603 }
604 };
605
606 let dispatcher = dispatcher_for_config(config);
607 let policy = config.queue_policy.unwrap_or_default();
608 let msg = DeliveryTask {
609 msg: WebhookMessage {
610 payload,
611 config: ResolvedWebhookConfig {
612 enabled: resolved.enabled,
613 url: Some(url),
614 secret: resolved.secret,
615 timeout: resolved.timeout,
616 retry_count: resolved.retry_count,
617 retry_backoff: resolved.retry_backoff,
618 },
619 },
620 attempt: 0,
621 };
622
623 apply_backpressure_policy(&dispatcher.ready_sender, msg, policy)
624}
625
/// Snapshot of an outgoing webhook request, handed to the test transport
/// handler in place of a real HTTP call.
#[cfg(test)]
#[derive(Clone, Debug)]
pub(crate) struct TestRequest {
    /// Destination URL passed to `send_request`.
    pub(crate) url: String,
    /// Request body passed to `send_request`.
    pub(crate) body: String,
    /// Signature passed to `send_request`, if any.
    pub(crate) signature: Option<String>,
    /// Timeout passed to `send_request`.
    pub(crate) timeout: Duration,
}
634
/// Shared callback type that stands in for the HTTP transport in tests.
#[cfg(test)]
type TestTransportHandler = Arc<dyn Fn(&TestRequest) -> anyhow::Result<()> + Send + Sync + 'static>;

/// Currently installed test transport handler; `None` means the real
/// transport is used.
#[cfg(test)]
static TEST_TRANSPORT: OnceLock<RwLock<Option<TestTransportHandler>>> = OnceLock::new();
640
/// Returns a clone of the installed test transport handler, if any.
///
/// A poisoned lock is recovered rather than propagated.
#[cfg(test)]
fn test_transport() -> Option<TestTransportHandler> {
    let lock = TEST_TRANSPORT.get_or_init(|| RwLock::new(None));
    let guard = lock
        .read()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    guard.clone()
}
649
/// Installs (`Some`) or removes (`None`) the test transport handler.
///
/// A poisoned lock is recovered rather than propagated.
#[cfg(test)]
pub(crate) fn install_test_transport_for_tests(handler: Option<TestTransportHandler>) {
    let lock = TEST_TRANSPORT.get_or_init(|| RwLock::new(None));
    let mut slot = lock
        .write()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    *slot = handler;
}
661
/// Returns `(queue_capacity, worker_count)` of the dispatcher for `config`,
/// building or rebuilding the dispatcher if necessary.
#[cfg(test)]
pub(crate) fn current_dispatcher_settings_for_tests(config: &WebhookConfig) -> (usize, usize) {
    let dispatcher = dispatcher_for_config(config);
    let DispatcherSettings {
        queue_capacity,
        worker_count,
    } = &dispatcher.settings;
    (*queue_capacity, *worker_count)
}
670
/// Restores standard mode, discards the current dispatcher handle, and removes
/// any installed test transport.
#[cfg(test)]
pub(crate) fn reset_dispatcher_for_tests() {
    with_dispatcher_state_write(|state| {
        *state = DispatcherState {
            mode: RuntimeMode::Standard,
            dispatcher: None,
        };
    });
    install_test_transport_for_tests(None);
}