Skip to main content

ralph/webhook/
worker.rs

//! Webhook worker and delivery logic.
//!
//! Responsibilities:
//! - Manage the background worker thread for webhook delivery.
//! - Handle HTTP delivery with retries and HMAC signatures.
//! - Apply backpressure policies for the webhook queue.
//!
//! Not handled here:
//! - Type definitions (see `super::types`).
//! - Notification convenience functions (see `super::notifications`).
//! - Diagnostics and replay (see `super::diagnostics`).

13use crate::contracts::{WebhookConfig, WebhookQueuePolicy};
14use crossbeam_channel::{Sender, TrySendError, bounded};
15use std::sync::OnceLock;
16use std::time::Duration;
17
18use super::diagnostics;
19use super::types::{ResolvedWebhookConfig, WebhookMessage, WebhookPayload};
20
21/// Global webhook channel pair for backpressure handling.
22/// This is stored in a OnceLock and initialized on first use.
23struct WebhookChannel {
24    sender: Sender<WebhookMessage>,
25    // Note: receiver is moved into the worker thread, not stored here
26}
27
28// Global channel - initialized on first use.
29static CHANNEL: OnceLock<WebhookChannel> = OnceLock::new();
30
31/// Initialize the global webhook worker and channel.
32pub(crate) fn init_worker(config: &WebhookConfig) {
33    // Clamp capacity to valid range (1-10000) to avoid rendezvous channel behavior at 0
34    // Default to 500 for better parallel mode handling (was 100, too small for burst loads)
35    let capacity = config
36        .queue_capacity
37        .map(|c| c.clamp(1, 10000))
38        .unwrap_or(500) as usize;
39
40    // Use get_or_init to ensure thread-safe one-time initialization.
41    // Intentionally ignore return value - we only need to trigger initialization.
42    CHANNEL.get_or_init(|| {
43        let (sender, receiver) = bounded(capacity);
44        diagnostics::set_queue_capacity(capacity);
45
46        // Spawn the worker thread (moves receiver into the closure)
47        std::thread::spawn(move || {
48            log::debug!("Webhook worker started (capacity: {})", capacity);
49
50            while let Ok(msg) = receiver.recv() {
51                diagnostics::note_queue_dequeue();
52                if let Err(e) = deliver_webhook(&msg) {
53                    log::warn!("Webhook delivery failed: {}", e);
54                }
55            }
56
57            log::debug!("Webhook worker shutting down");
58        });
59
60        WebhookChannel {
61            sender: sender.clone(),
62        }
63    });
64}
65
66/// Initialize the global webhook worker with capacity scaled for parallel execution.
67/// Call this instead of relying on implicit init when running in parallel mode.
68///
69/// The effective capacity is calculated as:
70///   base_capacity * max(1, worker_count * parallel_queue_multiplier)
71///
72/// This provides a larger queue buffer for parallel mode where multiple workers
73/// may send webhooks concurrently while the delivery thread is blocked on slow endpoints.
74pub fn init_worker_for_parallel(config: &WebhookConfig, worker_count: u8) {
75    let base_capacity = config
76        .queue_capacity
77        .map(|c| c.clamp(1, 10000))
78        .unwrap_or(500) as usize;
79
80    let multiplier = config
81        .parallel_queue_multiplier
82        .unwrap_or(2.0)
83        .clamp(1.0, 10.0);
84
85    // Scale capacity: base * max(1, workers * multiplier), clamped to max
86    let scaled =
87        (base_capacity as f64 * (worker_count as f64 * multiplier as f64).max(1.0)) as usize;
88    let capacity = scaled.clamp(1, 10000);
89
90    // Use get_or_init to ensure thread-safe one-time initialization.
91    // Intentionally ignore return value - we only need to trigger initialization.
92    CHANNEL.get_or_init(|| {
93        let (sender, receiver) = bounded(capacity);
94        diagnostics::set_queue_capacity(capacity);
95
96        // Spawn the worker thread (moves receiver into the closure)
97        std::thread::spawn(move || {
98            log::debug!(
99                "Webhook worker started (capacity: {}, parallel-optimized for {} workers)",
100                capacity,
101                worker_count
102            );
103
104            while let Ok(msg) = receiver.recv() {
105                diagnostics::note_queue_dequeue();
106                if let Err(e) = deliver_webhook(&msg) {
107                    log::warn!("Webhook delivery failed: {}", e);
108                }
109            }
110
111            log::debug!("Webhook worker shutting down");
112        });
113
114        WebhookChannel {
115            sender: sender.clone(),
116        }
117    });
118}
119
120/// Get the global webhook sender.
121pub(crate) fn get_sender() -> Option<Sender<WebhookMessage>> {
122    CHANNEL.get().map(|ch| ch.sender.clone())
123}
124
125/// Deliver a webhook in the worker thread (blocking, with retries).
126fn deliver_webhook(msg: &WebhookMessage) -> anyhow::Result<()> {
127    let url = msg
128        .config
129        .url
130        .as_ref()
131        .ok_or_else(|| anyhow::anyhow!("Webhook URL not configured"))?;
132
133    let body = serde_json::to_string(&msg.payload)?;
134    let signature = msg
135        .config
136        .secret
137        .as_ref()
138        .map(|secret| generate_signature(&body, secret));
139
140    let mut last_error = None;
141
142    for attempt in 0..=msg.config.retry_count {
143        if attempt > 0 {
144            diagnostics::note_retry_attempt();
145            let backoff = msg.config.retry_backoff.as_millis() as u64 * attempt as u64;
146            std::thread::sleep(Duration::from_millis(backoff));
147            log::debug!("Webhook retry attempt {} after {}ms", attempt, backoff);
148        }
149
150        match send_request(url, &body, signature.as_deref(), msg.config.timeout) {
151            Ok(()) => {
152                diagnostics::note_delivery_success();
153                log::debug!("Webhook delivered successfully to {}", url);
154                return Ok(());
155            }
156            Err(e) => {
157                log::debug!("Webhook attempt {} failed: {}", attempt + 1, e);
158                last_error = Some(e);
159            }
160        }
161    }
162
163    let final_error = last_error.unwrap_or_else(|| anyhow::anyhow!("All webhook attempts failed"));
164    diagnostics::note_delivery_failure(msg, &final_error, msg.config.retry_count.saturating_add(1));
165    Err(final_error)
166}
167
168/// Send a single HTTP POST request.
169fn send_request(
170    url: &str,
171    body: &str,
172    signature: Option<&str>,
173    timeout: Duration,
174) -> anyhow::Result<()> {
175    // In ureq 3.x, we use an Agent with timeout configuration
176    let agent = ureq::Agent::new_with_config(
177        ureq::Agent::config_builder()
178            .timeout_global(Some(timeout))
179            .build(),
180    );
181
182    let mut request = agent
183        .post(url)
184        .header("Content-Type", "application/json")
185        .header("User-Agent", concat!("ralph/", env!("CARGO_PKG_VERSION")));
186
187    if let Some(sig) = signature {
188        request = request.header("X-Ralph-Signature", sig);
189    }
190
191    let response = request.send(body)?;
192
193    let status = response.status();
194
195    if status.is_success() {
196        Ok(())
197    } else {
198        Err(anyhow::anyhow!(
199            "HTTP {}: webhook endpoint returned error",
200            status
201        ))
202    }
203}
204
205/// Generate HMAC-SHA256 signature for webhook payload.
206pub(crate) fn generate_signature(body: &str, secret: &str) -> String {
207    use hmac::{Hmac, Mac};
208    use sha2::Sha256;
209
210    type HmacSha256 = Hmac<Sha256>;
211
212    // HMAC accepts keys of any size per RFC 2104, so this should never fail.
213    // However, we handle the error gracefully rather than panicking.
214    let mut mac = match HmacSha256::new_from_slice(secret.as_bytes()) {
215        Ok(mac) => mac,
216        Err(e) => {
217            log::error!("Failed to create HMAC (this should never happen): {}", e);
218            // Return an invalid signature that will fail verification
219            return "sha256=invalid".to_string();
220        }
221    };
222    mac.update(body.as_bytes());
223    let result = mac.finalize();
224    let code_bytes = result.into_bytes();
225
226    format!("sha256={}", hex::encode(code_bytes))
227}
228
229/// Apply the configured backpressure policy for a webhook message.
230pub(crate) fn apply_backpressure_policy(
231    sender: &Sender<WebhookMessage>,
232    msg: WebhookMessage,
233    policy: WebhookQueuePolicy,
234) -> bool {
235    // Clone event details before moving msg into try_send/send_timeout
236    let event_type = msg.payload.event.clone();
237    let task_id = msg
238        .payload
239        .task_id
240        .clone()
241        .unwrap_or_else(|| "loop".to_string());
242
243    match policy {
244        WebhookQueuePolicy::DropOldest => {
245            // Drop new webhooks when queue is full, preserving existing queue contents.
246            // This is functionally equivalent to `drop_new` due to channel constraints
247            // (we cannot pop from the front of the queue from the sender side).
248            match sender.try_send(msg) {
249                Ok(()) => {
250                    diagnostics::note_enqueue_success();
251                    log::debug!("Webhook enqueued for delivery");
252                    true
253                }
254                Err(TrySendError::Full(_)) => {
255                    // Queue is full - drop the new message
256                    diagnostics::note_dropped_message();
257                    log::warn!(
258                        "Webhook queue full (drop_oldest policy); dropping event={} task={}",
259                        event_type,
260                        task_id
261                    );
262                    false
263                }
264                Err(TrySendError::Disconnected(_)) => {
265                    diagnostics::note_dropped_message();
266                    log::error!(
267                        "Webhook worker disconnected; cannot send event={} task={}",
268                        event_type,
269                        task_id
270                    );
271                    false
272                }
273            }
274        }
275        WebhookQueuePolicy::DropNew => match sender.try_send(msg) {
276            Ok(()) => {
277                diagnostics::note_enqueue_success();
278                log::debug!("Webhook enqueued for delivery");
279                true
280            }
281            Err(e) => {
282                diagnostics::note_dropped_message();
283                log::warn!(
284                    "Webhook queue full; dropping event={} task={}: {}",
285                    event_type,
286                    task_id,
287                    e
288                );
289                false
290            }
291        },
292        WebhookQueuePolicy::BlockWithTimeout => {
293            // Block briefly (100ms), then drop if still full
294            match sender.send_timeout(msg, Duration::from_millis(100)) {
295                Ok(()) => {
296                    diagnostics::note_enqueue_success();
297                    log::debug!("Webhook enqueued for delivery");
298                    true
299                }
300                Err(crossbeam_channel::SendTimeoutError::Timeout(_msg)) => {
301                    diagnostics::note_dropped_message();
302                    log::warn!(
303                        "Webhook queue full (timeout); dropping event={} task={}",
304                        event_type,
305                        task_id
306                    );
307                    false
308                }
309                Err(crossbeam_channel::SendTimeoutError::Disconnected(_)) => {
310                    diagnostics::note_dropped_message();
311                    log::error!(
312                        "Webhook worker disconnected; cannot send event={} task={}",
313                        event_type,
314                        task_id
315                    );
316                    false
317                }
318            }
319        }
320    }
321}
322
323/// Enqueue a webhook payload for replay (internal use).
324pub(crate) fn enqueue_webhook_payload_for_replay(
325    payload: WebhookPayload,
326    config: &WebhookConfig,
327) -> bool {
328    send_webhook_payload_internal(payload, config, true)
329}
330
331/// Internal function to send webhook payload.
332pub(crate) fn send_webhook_payload_internal(
333    payload: WebhookPayload,
334    config: &WebhookConfig,
335    bypass_event_filter: bool,
336) -> bool {
337    // Check if webhooks are enabled for this event type
338    if !bypass_event_filter && !config.is_event_enabled(&payload.event) {
339        log::debug!("Webhook for event {} is disabled; skipping", payload.event);
340        return false;
341    }
342
343    let resolved = ResolvedWebhookConfig::from_config(config);
344
345    if !resolved.enabled {
346        log::debug!("Webhooks globally disabled; skipping");
347        return false;
348    }
349
350    let url = match &resolved.url {
351        Some(url) if !url.is_empty() => url.clone(),
352        _ => {
353            log::debug!("Webhook URL not configured; skipping");
354            return false;
355        }
356    };
357
358    // Initialize worker on first use
359    init_worker(config);
360
361    let policy = config.queue_policy.unwrap_or_default();
362
363    let msg = WebhookMessage {
364        payload,
365        config: ResolvedWebhookConfig {
366            enabled: resolved.enabled,
367            url: Some(url),
368            secret: resolved.secret,
369            timeout: resolved.timeout,
370            retry_count: resolved.retry_count,
371            retry_backoff: resolved.retry_backoff,
372        },
373    };
374
375    // Apply backpressure policy
376    match get_sender() {
377        Some(sender) => apply_backpressure_policy(&sender, msg, policy),
378        None => {
379            log::error!("Webhook worker not initialized; cannot send webhook");
380            false
381        }
382    }
383}