Skip to main content

ralph/webhook/
worker.rs

1//! Webhook worker runtime and delivery logic.
2//!
3//! Responsibilities:
4//! - Manage the reloadable in-process webhook dispatcher and worker pool.
5//! - Handle HTTP delivery attempts, retry scheduling, and HMAC signatures.
6//! - Apply backpressure policies for the ready queue before work enters the pool.
7//!
8//! Not handled here:
9//! - Type definitions (see `super::types`).
10//! - Notification convenience functions (see `super::notifications`).
11//! - Diagnostics persistence and replay selection (see `super::diagnostics`).
12//!
13//! Invariants/assumptions:
14//! - Delivery runtime settings are derived deterministically from the active mode + config.
15//! - Retries are scheduled off the hot worker path so one failing endpoint does not stall peers.
16//! - All human-visible destinations must be rendered through `redact_webhook_destination`.
17
18use crate::contracts::{WebhookConfig, WebhookQueuePolicy};
19use anyhow::Context;
20use crossbeam_channel::{Receiver, SendTimeoutError, Sender, TrySendError, bounded, unbounded};
21use std::cmp::Ordering as CmpOrdering;
22use std::collections::BinaryHeap;
23use std::sync::{Arc, OnceLock, RwLock};
24use std::time::{Duration, Instant};
25
26use super::diagnostics;
27use super::types::{ResolvedWebhookConfig, WebhookMessage, WebhookPayload};
28
/// Ready-queue capacity used when the config does not set `queue_capacity`.
const DEFAULT_QUEUE_CAPACITY: usize = 500;
/// Worker thread count in standard mode; also the lower bound in parallel mode.
const DEFAULT_WORKER_COUNT: usize = 4;
/// Upper bound applied to both the configured and the scaled queue capacity.
const MAX_QUEUE_CAPACITY: usize = 10_000;
/// Upper bound applied to the configured `parallel_queue_multiplier`.
const MAX_PARALLEL_MULTIPLIER: f64 = 10.0;
33
/// Sizing parameters for one dispatcher instance.
///
/// Two dispatchers with equal settings are considered interchangeable; a
/// difference triggers a rebuild in `dispatcher_for_config`.
#[derive(Debug, Clone, PartialEq, Eq)]
struct DispatcherSettings {
    /// Capacity of the bounded ready queue.
    queue_capacity: usize,
    /// Number of delivery worker threads to spawn.
    worker_count: usize,
}
39
/// Execution mode that drives how dispatcher settings are derived.
#[derive(Debug, Clone, PartialEq, Eq)]
enum RuntimeMode {
    /// Default mode: base queue capacity and the default worker count.
    Standard,
    /// Parallel mode: queue capacity is scaled by `worker_count` and the
    /// configured multiplier (see `DispatcherSettings::for_mode`).
    Parallel { worker_count: u8 },
}
45
/// Process-wide dispatcher state guarded by the `DISPATCHER_STATE` lock.
#[derive(Debug)]
struct DispatcherState {
    /// Mode used to derive settings the next time a dispatcher is requested.
    mode: RuntimeMode,
    /// Currently active dispatcher, if one has been built.
    dispatcher: Option<Arc<WebhookDispatcher>>,
}

impl Default for DispatcherState {
    /// Starts in standard mode with no dispatcher built yet.
    fn default() -> Self {
        Self {
            mode: RuntimeMode::Standard,
            dispatcher: None,
        }
    }
}
60
/// Handle to a running worker pool plus its retry scheduler.
#[derive(Debug)]
struct WebhookDispatcher {
    /// Settings this dispatcher was built with.
    settings: DispatcherSettings,
    /// Producer side of the bounded ready queue consumed by the workers.
    ready_sender: Sender<DeliveryTask>,
    /// Producer side of the unbounded channel feeding the retry scheduler.
    retry_sender: Sender<ScheduledRetry>,
}
67
/// One delivery attempt waiting in (or taken from) the ready queue.
#[derive(Debug, Clone)]
struct DeliveryTask {
    /// Payload plus the resolved delivery configuration.
    msg: WebhookMessage,
    /// Zero-based attempt index: 0 for the first try, incremented per retry.
    attempt: u32,
}
73
/// A retry parked in the scheduler until its backoff has elapsed.
#[derive(Debug, Clone)]
struct ScheduledRetry {
    /// Earliest instant at which the task may re-enter the ready queue.
    ready_at: Instant,
    /// The task to requeue.
    task: DeliveryTask,
}
79
80#[derive(Debug, Clone)]
81struct RetryQueueEntry(ScheduledRetry);
82
83impl PartialEq for RetryQueueEntry {
84    fn eq(&self, other: &Self) -> bool {
85        self.0.ready_at.eq(&other.0.ready_at)
86    }
87}
88
89impl Eq for RetryQueueEntry {}
90
91impl PartialOrd for RetryQueueEntry {
92    fn partial_cmp(&self, other: &Self) -> Option<CmpOrdering> {
93        Some(self.cmp(other))
94    }
95}
96
97impl Ord for RetryQueueEntry {
98    fn cmp(&self, other: &Self) -> CmpOrdering {
99        other.0.ready_at.cmp(&self.0.ready_at)
100    }
101}
102
103static DISPATCHER_STATE: OnceLock<RwLock<DispatcherState>> = OnceLock::new();
104
105fn dispatcher_state() -> &'static RwLock<DispatcherState> {
106    DISPATCHER_STATE.get_or_init(|| RwLock::new(DispatcherState::default()))
107}
108
109impl DispatcherSettings {
110    fn for_mode(config: &WebhookConfig, mode: &RuntimeMode) -> Self {
111        let base_capacity = config
112            .queue_capacity
113            .map(|value| value.clamp(1, MAX_QUEUE_CAPACITY as u32) as usize)
114            .unwrap_or(DEFAULT_QUEUE_CAPACITY);
115
116        match mode {
117            RuntimeMode::Standard => Self {
118                queue_capacity: base_capacity,
119                worker_count: DEFAULT_WORKER_COUNT,
120            },
121            RuntimeMode::Parallel { worker_count } => {
122                let multiplier = config
123                    .parallel_queue_multiplier
124                    .unwrap_or(2.0)
125                    .clamp(1.0, MAX_PARALLEL_MULTIPLIER as f32)
126                    as f64;
127                let scaled_capacity =
128                    (base_capacity as f64 * (*worker_count as f64 * multiplier).max(1.0)) as usize;
129
130                Self {
131                    queue_capacity: scaled_capacity.clamp(1, MAX_QUEUE_CAPACITY),
132                    worker_count: usize::max(DEFAULT_WORKER_COUNT, *worker_count as usize),
133                }
134            }
135        }
136    }
137}
138
impl WebhookDispatcher {
    /// Builds a dispatcher and starts its background threads.
    ///
    /// Creates a bounded ready queue of `settings.queue_capacity` and an
    /// unbounded retry channel, spawns `settings.worker_count` delivery workers
    /// plus one retry scheduler thread, and reports the capacity to diagnostics.
    ///
    /// # Panics
    /// Panics if the OS refuses to spawn a worker or the scheduler thread.
    ///
    /// NOTE(review): every worker owns a clone of `retry_sender` and the
    /// scheduler owns a clone of `ready_sender`. After the returned `Arc` is
    /// dropped, workers wait for the ready channel to disconnect (which needs the
    /// scheduler to exit) while the scheduler waits for the retry channel to
    /// disconnect (which needs the workers to exit). As written, the threads of
    /// a replaced dispatcher therefore appear to never terminate — confirm and
    /// consider an explicit shutdown signal.
    fn new(settings: DispatcherSettings) -> Arc<Self> {
        let (ready_sender, ready_receiver) = bounded(settings.queue_capacity);
        let (retry_sender, retry_receiver) = unbounded();

        let dispatcher = Arc::new(Self {
            settings: settings.clone(),
            ready_sender,
            retry_sender,
        });

        diagnostics::set_queue_capacity(settings.queue_capacity);

        // Each worker shares the ready receiver and gets its own retry sender.
        for worker_id in 0..settings.worker_count {
            let ready_receiver = ready_receiver.clone();
            let retry_sender = dispatcher.retry_sender.clone();
            let thread_name = format!("ralph-webhook-worker-{worker_id}");
            std::thread::Builder::new()
                .name(thread_name)
                .spawn(move || worker_loop(ready_receiver, retry_sender))
                .expect("spawn webhook delivery worker");
        }

        // The scheduler feeds due retries back into the same ready queue.
        let scheduler_ready = dispatcher.ready_sender.clone();
        std::thread::Builder::new()
            .name("ralph-webhook-retry-scheduler".to_string())
            .spawn(move || retry_scheduler_loop(retry_receiver, scheduler_ready))
            .expect("spawn webhook retry scheduler");

        log::debug!(
            "Webhook dispatcher started with {} workers and queue capacity {}",
            settings.worker_count,
            settings.queue_capacity
        );

        dispatcher
    }
}
177
178impl Drop for WebhookDispatcher {
179    fn drop(&mut self) {
180        log::debug!(
181            "Webhook dispatcher shutting down (workers: {}, capacity: {})",
182            self.settings.worker_count,
183            self.settings.queue_capacity
184        );
185    }
186}
187
188fn with_dispatcher_state_write<T>(mut f: impl FnMut(&mut DispatcherState) -> T) -> T {
189    match dispatcher_state().write() {
190        Ok(mut guard) => f(&mut guard),
191        Err(poisoned) => {
192            let mut guard = poisoned.into_inner();
193            f(&mut guard)
194        }
195    }
196}
197
198fn dispatcher_for_config(config: &WebhookConfig) -> Arc<WebhookDispatcher> {
199    with_dispatcher_state_write(|state| {
200        let settings = DispatcherSettings::for_mode(config, &state.mode);
201        let needs_rebuild = state
202            .dispatcher
203            .as_ref()
204            .is_none_or(|dispatcher| dispatcher.settings != settings);
205
206        if needs_rebuild {
207            state.dispatcher = Some(WebhookDispatcher::new(settings));
208        }
209
210        state
211            .dispatcher
212            .as_ref()
213            .expect("dispatcher initialized")
214            .clone()
215    })
216}
217
218/// Initialize the webhook dispatcher with capacity scaled for parallel execution.
219pub fn init_worker_for_parallel(config: &WebhookConfig, worker_count: u8) {
220    with_dispatcher_state_write(|state| {
221        state.mode = RuntimeMode::Parallel { worker_count };
222    });
223    let _ = dispatcher_for_config(config);
224}
225
226fn worker_loop(ready_receiver: Receiver<DeliveryTask>, retry_sender: Sender<ScheduledRetry>) {
227    while let Ok(task) = ready_receiver.recv() {
228        diagnostics::note_queue_dequeue();
229        handle_delivery_task(task, &retry_sender);
230    }
231}
232
233fn handle_delivery_task(task: DeliveryTask, retry_sender: &Sender<ScheduledRetry>) {
234    match deliver_attempt(&task.msg) {
235        Ok(()) => {
236            diagnostics::note_delivery_success();
237            log::debug!(
238                "Webhook delivered successfully to {}",
239                redact_webhook_destination(
240                    task.msg
241                        .config
242                        .url
243                        .as_deref()
244                        .unwrap_or("<missing webhook URL>")
245                )
246            );
247        }
248        Err(err) => {
249            if task.attempt < task.msg.config.retry_count {
250                diagnostics::note_retry_attempt();
251
252                let retry_number = task.attempt.saturating_add(1);
253                let scheduled = ScheduledRetry {
254                    ready_at: Instant::now()
255                        + retry_delay(task.msg.config.retry_backoff, retry_number),
256                    task: DeliveryTask {
257                        msg: task.msg.clone(),
258                        attempt: retry_number,
259                    },
260                };
261
262                log::debug!(
263                    "Webhook attempt {} failed for {}; scheduling retry: {:#}",
264                    retry_number,
265                    redact_webhook_destination(
266                        task.msg
267                            .config
268                            .url
269                            .as_deref()
270                            .unwrap_or("<missing webhook URL>")
271                    ),
272                    err
273                );
274
275                if let Err(send_err) = retry_sender.send(scheduled) {
276                    let scheduler_error =
277                        anyhow::anyhow!("retry scheduler unavailable for webhook: {}", send_err);
278                    diagnostics::note_delivery_failure(
279                        &task.msg,
280                        &scheduler_error,
281                        retry_number.saturating_add(1),
282                    );
283                    log::warn!("{scheduler_error:#}");
284                }
285            } else {
286                let attempts = task.attempt.saturating_add(1);
287                diagnostics::note_delivery_failure(&task.msg, &err, attempts);
288                log::warn!(
289                    "Webhook delivery failed after {} attempts: {:#}",
290                    attempts,
291                    err
292                );
293            }
294        }
295    }
296}
297
298fn retry_scheduler_loop(
299    retry_receiver: Receiver<ScheduledRetry>,
300    ready_sender: Sender<DeliveryTask>,
301) {
302    let mut pending = BinaryHeap::<RetryQueueEntry>::new();
303
304    loop {
305        let timeout = pending
306            .peek()
307            .map(|entry| entry.0.ready_at.saturating_duration_since(Instant::now()));
308
309        let scheduled = match timeout {
310            Some(duration) => match retry_receiver.recv_timeout(duration) {
311                Ok(task) => Some(task),
312                Err(crossbeam_channel::RecvTimeoutError::Timeout) => None,
313                Err(crossbeam_channel::RecvTimeoutError::Disconnected) => {
314                    if pending.is_empty() {
315                        break;
316                    }
317                    None
318                }
319            },
320            None => match retry_receiver.recv() {
321                Ok(task) => Some(task),
322                Err(_) => break,
323            },
324        };
325
326        if let Some(task) = scheduled {
327            pending.push(RetryQueueEntry(task));
328        }
329
330        let now = Instant::now();
331        while let Some(entry) = pending.peek() {
332            if entry.0.ready_at > now {
333                break;
334            }
335
336            let RetryQueueEntry(scheduled) = pending.pop().expect("pending retry exists");
337            match ready_sender.send(scheduled.task.clone()) {
338                Ok(()) => diagnostics::note_retry_requeue(),
339                Err(send_err) => {
340                    let error = anyhow::anyhow!(
341                        "webhook dispatcher shut down before retry enqueue: {send_err}"
342                    );
343                    diagnostics::note_delivery_failure(
344                        &scheduled.task.msg,
345                        &error,
346                        scheduled.task.attempt.saturating_add(1),
347                    );
348                    log::warn!("{error:#}");
349                    return;
350                }
351            }
352        }
353    }
354}
355
/// Linear backoff: `base` (truncated to whole milliseconds) times `retry_number`.
///
/// The product saturates and is capped at `u64::MAX` milliseconds.
fn retry_delay(base: Duration, retry_number: u32) -> Duration {
    let scaled_ms = base.as_millis().saturating_mul(u128::from(retry_number));
    let capped_ms = u64::try_from(scaled_ms).unwrap_or(u64::MAX);
    Duration::from_millis(capped_ms)
}
363
364fn deliver_attempt(msg: &WebhookMessage) -> anyhow::Result<()> {
365    let url = msg
366        .config
367        .url
368        .as_deref()
369        .ok_or_else(|| anyhow::anyhow!("Webhook URL not configured"))?;
370    let destination = redact_webhook_destination(url);
371
372    let body = serde_json::to_string(&msg.payload)?;
373    let signature = msg
374        .config
375        .secret
376        .as_ref()
377        .map(|secret| generate_signature(&body, secret));
378
379    send_request(url, &body, signature.as_deref(), msg.config.timeout)
380        .with_context(|| format!("webhook delivery to {destination}"))
381}
382
383fn send_request(
384    url: &str,
385    body: &str,
386    signature: Option<&str>,
387    timeout: Duration,
388) -> anyhow::Result<()> {
389    #[cfg(test)]
390    if let Some(handler) = test_transport() {
391        return handler(&TestRequest {
392            url: url.to_string(),
393            body: body.to_string(),
394            signature: signature.map(std::string::ToString::to_string),
395            timeout,
396        });
397    }
398
399    let agent = ureq::Agent::new_with_config(
400        ureq::Agent::config_builder()
401            .timeout_global(Some(timeout))
402            .build(),
403    );
404
405    let mut request = agent
406        .post(url)
407        .header("Content-Type", "application/json")
408        .header("User-Agent", concat!("ralph/", env!("CARGO_PKG_VERSION")));
409
410    if let Some(sig) = signature {
411        request = request.header("X-Ralph-Signature", sig);
412    }
413
414    let response = request.send(body)?;
415    let status = response.status();
416
417    if status.is_success() {
418        Ok(())
419    } else {
420        Err(anyhow::anyhow!(
421            "HTTP {}: webhook endpoint returned error",
422            status
423        ))
424    }
425}
426
/// Render a webhook destination for logs and diagnostics without leaking secrets.
///
/// Only the scheme (when present) and the host[:port] survive. The fragment,
/// query string, userinfo (`user:pass@`) and the entire path are removed; a
/// removed path is indicated by a trailing `/…`.
///
/// The authority is isolated *before* userinfo is stripped, so an `@` that
/// appears inside the path can never be mistaken for a userinfo delimiter
/// (which would otherwise expose the tail of the path as the "host").
///
/// Returns `<missing webhook URL>` for blank input, and a `<redacted…>`
/// placeholder when no host can be identified.
pub(crate) fn redact_webhook_destination(url: &str) -> String {
    let trimmed = url.trim();
    if trimmed.is_empty() {
        return "<missing webhook URL>".to_string();
    }

    let without_fragment = trimmed.split('#').next().unwrap_or(trimmed);
    let without_query = without_fragment
        .split('?')
        .next()
        .unwrap_or(without_fragment);

    // With a scheme, tolerate extra leading slashes (`https:///host`).
    let (scheme, rest) = match without_query.split_once("://") {
        Some((scheme, rest)) => (Some(scheme), rest.trim_start_matches('/')),
        None => (None, without_query),
    };

    // Authority = everything up to the first '/', still including userinfo.
    let raw_authority = rest.split('/').next().unwrap_or(rest);
    let host = raw_authority.rsplit('@').next().unwrap_or(raw_authority);
    // Compare against the raw authority so userinfo is not counted as a path.
    let has_path = rest.len() > raw_authority.len();

    match scheme {
        Some(scheme) if host.is_empty() => format!("{scheme}://<redacted>"),
        Some(scheme) if has_path => format!("{scheme}://{host}/…"),
        Some(scheme) => format!("{scheme}://{host}"),
        None if host.is_empty() => "<redacted webhook destination>".to_string(),
        None if has_path => format!("{host}/…"),
        None => host.to_string(),
    }
}
479
480/// Generate HMAC-SHA256 signature for webhook payload.
481pub(crate) fn generate_signature(body: &str, secret: &str) -> String {
482    use hmac::{Hmac, Mac};
483    use sha2::Sha256;
484
485    type HmacSha256 = Hmac<Sha256>;
486
487    let mut mac = match HmacSha256::new_from_slice(secret.as_bytes()) {
488        Ok(mac) => mac,
489        Err(e) => {
490            log::error!("Failed to create HMAC (this should never happen): {}", e);
491            return "sha256=invalid".to_string();
492        }
493    };
494    mac.update(body.as_bytes());
495    let result = mac.finalize();
496    let code_bytes = result.into_bytes();
497
498    format!("sha256={}", hex::encode(code_bytes))
499}
500
501/// Apply the configured backpressure policy for a webhook message.
502fn apply_backpressure_policy(
503    sender: &Sender<DeliveryTask>,
504    msg: DeliveryTask,
505    policy: WebhookQueuePolicy,
506) -> bool {
507    let event_type = msg.msg.payload.event.clone();
508    let task_id = msg
509        .msg
510        .payload
511        .task_id
512        .clone()
513        .unwrap_or_else(|| "loop".to_string());
514
515    match policy {
516        WebhookQueuePolicy::DropOldest | WebhookQueuePolicy::DropNew => {
517            match sender.try_send(msg) {
518                Ok(()) => {
519                    diagnostics::note_enqueue_success();
520                    log::debug!("Webhook enqueued for delivery");
521                    true
522                }
523                Err(TrySendError::Full(_)) => {
524                    diagnostics::note_dropped_message();
525                    log::warn!(
526                        "Webhook queue full; dropping event={} task={}",
527                        event_type,
528                        task_id
529                    );
530                    false
531                }
532                Err(TrySendError::Disconnected(_)) => {
533                    diagnostics::note_dropped_message();
534                    log::error!(
535                        "Webhook dispatcher disconnected; cannot send event={} task={}",
536                        event_type,
537                        task_id
538                    );
539                    false
540                }
541            }
542        }
543        WebhookQueuePolicy::BlockWithTimeout => {
544            match sender.send_timeout(msg, Duration::from_millis(100)) {
545                Ok(()) => {
546                    diagnostics::note_enqueue_success();
547                    log::debug!("Webhook enqueued for delivery");
548                    true
549                }
550                Err(SendTimeoutError::Timeout(_)) => {
551                    diagnostics::note_dropped_message();
552                    log::warn!(
553                        "Webhook queue full (timeout); dropping event={} task={}",
554                        event_type,
555                        task_id
556                    );
557                    false
558                }
559                Err(SendTimeoutError::Disconnected(_)) => {
560                    diagnostics::note_dropped_message();
561                    log::error!(
562                        "Webhook dispatcher disconnected; cannot send event={} task={}",
563                        event_type,
564                        task_id
565                    );
566                    false
567                }
568            }
569        }
570    }
571}
572
573/// Enqueue a webhook payload for replay (internal use).
574pub(crate) fn enqueue_webhook_payload_for_replay(
575    payload: WebhookPayload,
576    config: &WebhookConfig,
577) -> bool {
578    send_webhook_payload_internal(payload, config, true)
579}
580
581/// Internal function to send a webhook payload.
582pub(crate) fn send_webhook_payload_internal(
583    payload: WebhookPayload,
584    config: &WebhookConfig,
585    bypass_event_filter: bool,
586) -> bool {
587    if !bypass_event_filter && !config.is_event_enabled(&payload.event) {
588        log::debug!("Webhook for event {} is disabled; skipping", payload.event);
589        return false;
590    }
591
592    let resolved = ResolvedWebhookConfig::from_config(config);
593    if !resolved.enabled {
594        log::debug!("Webhooks globally disabled; skipping");
595        return false;
596    }
597
598    let url = match &resolved.url {
599        Some(url) if !url.trim().is_empty() => url.clone(),
600        _ => {
601            log::debug!("Webhook URL not configured; skipping");
602            return false;
603        }
604    };
605
606    let dispatcher = dispatcher_for_config(config);
607    let policy = config.queue_policy.unwrap_or_default();
608    let msg = DeliveryTask {
609        msg: WebhookMessage {
610            payload,
611            config: ResolvedWebhookConfig {
612                enabled: resolved.enabled,
613                url: Some(url),
614                secret: resolved.secret,
615                timeout: resolved.timeout,
616                retry_count: resolved.retry_count,
617                retry_backoff: resolved.retry_backoff,
618            },
619        },
620        attempt: 0,
621    };
622
623    apply_backpressure_policy(&dispatcher.ready_sender, msg, policy)
624}
625
/// Snapshot of an outgoing request handed to the test transport in place of
/// real network I/O.
#[cfg(test)]
#[derive(Clone, Debug)]
pub(crate) struct TestRequest {
    /// Destination URL as passed to `send_request`.
    pub(crate) url: String,
    /// Serialized request body.
    pub(crate) body: String,
    /// Signature header value, when one was supplied.
    pub(crate) signature: Option<String>,
    /// Request timeout that would have been applied.
    pub(crate) timeout: Duration,
}
634
/// Callback type that stands in for the HTTP transport during tests.
#[cfg(test)]
type TestTransportHandler = Arc<dyn Fn(&TestRequest) -> anyhow::Result<()> + Send + Sync + 'static>;

/// Currently installed test transport, if any (lazily initialized).
#[cfg(test)]
static TEST_TRANSPORT: OnceLock<RwLock<Option<TestTransportHandler>>> = OnceLock::new();
640
/// Returns a handle to the installed test transport, if any.
///
/// A poisoned lock is recovered so one panicking test does not break others.
#[cfg(test)]
fn test_transport() -> Option<TestTransportHandler> {
    TEST_TRANSPORT
        .get_or_init(|| RwLock::new(None))
        .read()
        .unwrap_or_else(std::sync::PoisonError::into_inner)
        .clone()
}
649
/// Installs (`Some`) or removes (`None`) the test transport.
///
/// A poisoned lock is recovered and overwritten.
#[cfg(test)]
pub(crate) fn install_test_transport_for_tests(handler: Option<TestTransportHandler>) {
    let mut slot = TEST_TRANSPORT
        .get_or_init(|| RwLock::new(None))
        .write()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    *slot = handler;
}
661
/// Returns `(queue_capacity, worker_count)` of the dispatcher for `config`,
/// building or rebuilding the dispatcher if required.
#[cfg(test)]
pub(crate) fn current_dispatcher_settings_for_tests(config: &WebhookConfig) -> (usize, usize) {
    let DispatcherSettings {
        queue_capacity,
        worker_count,
    } = dispatcher_for_config(config).settings.clone();
    (queue_capacity, worker_count)
}
670
/// Restores standard mode, discards the active dispatcher handle, and removes
/// any installed test transport.
#[cfg(test)]
pub(crate) fn reset_dispatcher_for_tests() {
    with_dispatcher_state_write(|state| {
        // The default state is standard mode with no dispatcher.
        *state = DispatcherState::default();
    });
    install_test_transport_for_tests(None);
}