Skip to main content

ralph/webhook/worker/
runtime.rs

1//! Purpose: Own the reloadable webhook dispatcher runtime and its worker/scheduler lifecycle.
2//!
3//! Responsibilities:
4//! - Build and rebuild dispatcher state from webhook runtime mode and config.
5//! - Start delivery workers and the retry scheduler with deterministic startup behavior.
6//! - Route ready delivery tasks to delivery helpers without blocking enqueue callers.
7//!
8//! Scope:
9//! - Dispatcher lifecycle, thread startup/teardown, queue sizing, and retry scheduling orchestration.
10//!
11//! Usage:
12//! - Called by webhook enqueue helpers and test-only runtime controls through the worker facade.
13//!
14//! Invariants/Assumptions:
15//! - Runtime settings are rebuilt when the effective mode/config changes.
16//! - Retry scheduling stays off worker threads so failing endpoints do not sleep in place.
17//! - Dispatcher teardown must not leak background threads or retain stale queue channels across rebuilds.
18
19use crate::contracts::WebhookConfig;
20use crossbeam_channel::{Receiver, Sender, bounded, unbounded};
21use std::cmp::Ordering as CmpOrdering;
22use std::collections::BinaryHeap;
23use std::sync::{Arc, OnceLock, RwLock, Weak};
24use std::time::{Duration, Instant};
25
26use super::super::diagnostics;
27use super::super::types::WebhookMessage;
28use super::delivery::handle_delivery_task;
29
/// Ready-queue capacity used when the webhook config does not set `queue_capacity`.
const DEFAULT_QUEUE_CAPACITY: usize = 500;
/// Worker thread count in standard mode; also the lower bound in parallel mode.
const DEFAULT_WORKER_COUNT: usize = 4;
/// Upper bound applied to both the configured and the scaled queue capacity.
const MAX_QUEUE_CAPACITY: usize = 10_000;
/// Upper bound applied to the configured parallel queue multiplier.
const MAX_PARALLEL_MULTIPLIER: f64 = 10.0;
/// Longest wait for each individual thread-startup signal while building a dispatcher.
const DISPATCHER_STARTUP_TIMEOUT: Duration = Duration::from_secs(1);
35
36#[derive(Debug, Clone, PartialEq, Eq)]
37pub(super) struct DispatcherSettings {
38    pub(super) queue_capacity: usize,
39    pub(super) worker_count: usize,
40}
41
/// Runtime mode that drives how dispatcher settings are derived from the webhook config.
#[derive(Clone, Debug, Eq, PartialEq)]
enum RuntimeMode {
    /// Default mode: configured (or default) queue capacity and the default worker count.
    Standard,
    /// Parallel mode: queue capacity and worker count are scaled from `worker_count`.
    Parallel {
        /// Worker count requested through `init_worker_for_parallel`.
        worker_count: u8,
    },
}
47
48#[derive(Debug)]
49struct DispatcherState {
50    mode: RuntimeMode,
51    dispatcher: Option<Arc<WebhookDispatcher>>,
52}
53
54impl Default for DispatcherState {
55    fn default() -> Self {
56        Self {
57            mode: RuntimeMode::Standard,
58            dispatcher: None,
59        }
60    }
61}
62
63#[derive(Debug)]
64pub(super) struct WebhookDispatcher {
65    pub(super) settings: DispatcherSettings,
66    pub(super) ready_sender: Arc<Sender<DeliveryTask>>,
67    retry_sender: Arc<Sender<ScheduledRetry>>,
68}
69
70#[derive(Debug, Clone)]
71pub(super) struct DeliveryTask {
72    pub(super) msg: WebhookMessage,
73    pub(super) attempt: u32,
74}
75
76#[derive(Debug, Clone)]
77pub(super) struct ScheduledRetry {
78    pub(super) ready_at: Instant,
79    pub(super) task: DeliveryTask,
80}
81
82#[derive(Debug, Clone)]
83struct RetryQueueEntry(ScheduledRetry);
84
85impl PartialEq for RetryQueueEntry {
86    fn eq(&self, other: &Self) -> bool {
87        self.0.ready_at.eq(&other.0.ready_at)
88    }
89}
90
91impl Eq for RetryQueueEntry {}
92
93impl PartialOrd for RetryQueueEntry {
94    fn partial_cmp(&self, other: &Self) -> Option<CmpOrdering> {
95        Some(self.cmp(other))
96    }
97}
98
99impl Ord for RetryQueueEntry {
100    fn cmp(&self, other: &Self) -> CmpOrdering {
101        other.0.ready_at.cmp(&self.0.ready_at)
102    }
103}
104
105static DISPATCHER_STATE: OnceLock<RwLock<DispatcherState>> = OnceLock::new();
106
107fn dispatcher_state() -> &'static RwLock<DispatcherState> {
108    DISPATCHER_STATE.get_or_init(|| RwLock::new(DispatcherState::default()))
109}
110
111impl DispatcherSettings {
112    fn for_mode(config: &WebhookConfig, mode: &RuntimeMode) -> Self {
113        let base_capacity = config
114            .queue_capacity
115            .map(|value| value.clamp(1, MAX_QUEUE_CAPACITY as u32) as usize)
116            .unwrap_or(DEFAULT_QUEUE_CAPACITY);
117
118        match mode {
119            RuntimeMode::Standard => Self {
120                queue_capacity: base_capacity,
121                worker_count: DEFAULT_WORKER_COUNT,
122            },
123            RuntimeMode::Parallel { worker_count } => {
124                let multiplier = config
125                    .parallel_queue_multiplier
126                    .unwrap_or(2.0)
127                    .clamp(1.0, MAX_PARALLEL_MULTIPLIER as f32)
128                    as f64;
129                let scaled_capacity =
130                    (base_capacity as f64 * (*worker_count as f64 * multiplier).max(1.0)) as usize;
131
132                Self {
133                    queue_capacity: scaled_capacity.clamp(1, MAX_QUEUE_CAPACITY),
134                    worker_count: usize::max(DEFAULT_WORKER_COUNT, *worker_count as usize),
135                }
136            }
137        }
138    }
139}
140
141impl WebhookDispatcher {
142    fn new(settings: DispatcherSettings) -> Arc<Self> {
143        let (ready_sender, ready_receiver) = bounded(settings.queue_capacity);
144        let (retry_sender, retry_receiver) = unbounded();
145        let startup_signals = settings.worker_count.saturating_add(1);
146        let (startup_sender, startup_receiver) = bounded(startup_signals);
147
148        let dispatcher = Arc::new(Self {
149            settings: settings.clone(),
150            ready_sender: Arc::new(ready_sender),
151            retry_sender: Arc::new(retry_sender),
152        });
153
154        diagnostics::set_queue_capacity(settings.queue_capacity);
155
156        for worker_id in 0..settings.worker_count {
157            let ready_receiver = ready_receiver.clone();
158            let retry_sender = Arc::downgrade(&dispatcher.retry_sender);
159            let startup_sender = startup_sender.clone();
160            let thread_name = format!("ralph-webhook-worker-{worker_id}");
161            std::thread::Builder::new()
162                .name(thread_name)
163                .spawn(move || {
164                    startup_sender
165                        .send(())
166                        .expect("signal webhook delivery worker startup");
167                    worker_loop(ready_receiver, retry_sender)
168                })
169                .expect("spawn webhook delivery worker");
170        }
171
172        let scheduler_ready = Arc::downgrade(&dispatcher.ready_sender);
173        let scheduler_startup_sender = startup_sender.clone();
174        std::thread::Builder::new()
175            .name("ralph-webhook-retry-scheduler".to_string())
176            .spawn(move || {
177                scheduler_startup_sender
178                    .send(())
179                    .expect("signal webhook retry scheduler startup");
180                retry_scheduler_loop(retry_receiver, scheduler_ready)
181            })
182            .expect("spawn webhook retry scheduler");
183        drop(startup_sender);
184
185        for _ in 0..startup_signals {
186            startup_receiver
187                .recv_timeout(DISPATCHER_STARTUP_TIMEOUT)
188                .expect("wait for webhook dispatcher thread startup");
189        }
190
191        log::debug!(
192            "Webhook dispatcher started with {} workers and queue capacity {}",
193            settings.worker_count,
194            settings.queue_capacity
195        );
196
197        dispatcher
198    }
199}
200
201impl Drop for WebhookDispatcher {
202    fn drop(&mut self) {
203        log::debug!(
204            "Webhook dispatcher shutting down (workers: {}, capacity: {})",
205            self.settings.worker_count,
206            self.settings.queue_capacity
207        );
208    }
209}
210
211fn with_dispatcher_state_write<T>(mut f: impl FnMut(&mut DispatcherState) -> T) -> T {
212    match dispatcher_state().write() {
213        Ok(mut guard) => f(&mut guard),
214        Err(poisoned) => {
215            let mut guard = poisoned.into_inner();
216            f(&mut guard)
217        }
218    }
219}
220
221pub(super) fn dispatcher_for_config(config: &WebhookConfig) -> Arc<WebhookDispatcher> {
222    with_dispatcher_state_write(|state| {
223        let settings = DispatcherSettings::for_mode(config, &state.mode);
224        let needs_rebuild = state
225            .dispatcher
226            .as_ref()
227            .is_none_or(|dispatcher| dispatcher.settings != settings);
228
229        if needs_rebuild {
230            state.dispatcher = Some(WebhookDispatcher::new(settings));
231        }
232
233        state
234            .dispatcher
235            .as_ref()
236            .expect("dispatcher initialized")
237            .clone()
238    })
239}
240
241/// Initialize the webhook dispatcher with capacity scaled for parallel execution.
242pub fn init_worker_for_parallel(config: &WebhookConfig, worker_count: u8) {
243    with_dispatcher_state_write(|state| {
244        state.mode = RuntimeMode::Parallel { worker_count };
245    });
246    let _ = dispatcher_for_config(config);
247}
248
249fn worker_loop(ready_receiver: Receiver<DeliveryTask>, retry_sender: Weak<Sender<ScheduledRetry>>) {
250    while let Ok(task) = ready_receiver.recv() {
251        diagnostics::note_queue_dequeue();
252        handle_delivery_task(task, &retry_sender);
253    }
254}
255
256fn retry_scheduler_loop(
257    retry_receiver: Receiver<ScheduledRetry>,
258    ready_sender: Weak<Sender<DeliveryTask>>,
259) {
260    let mut pending = BinaryHeap::<RetryQueueEntry>::new();
261
262    loop {
263        let timeout = pending
264            .peek()
265            .map(|entry| entry.0.ready_at.saturating_duration_since(Instant::now()));
266
267        let scheduled = match timeout {
268            Some(duration) => match retry_receiver.recv_timeout(duration) {
269                Ok(task) => Some(task),
270                Err(crossbeam_channel::RecvTimeoutError::Timeout) => None,
271                Err(crossbeam_channel::RecvTimeoutError::Disconnected) => {
272                    if pending.is_empty() {
273                        break;
274                    }
275                    None
276                }
277            },
278            None => match retry_receiver.recv() {
279                Ok(task) => Some(task),
280                Err(_) => break,
281            },
282        };
283
284        if let Some(task) = scheduled {
285            pending.push(RetryQueueEntry(task));
286        }
287
288        let now = Instant::now();
289        while let Some(entry) = pending.peek() {
290            if entry.0.ready_at > now {
291                break;
292            }
293
294            let RetryQueueEntry(scheduled) = pending.pop().expect("pending retry exists");
295            let Some(ready_sender) = ready_sender.upgrade() else {
296                let error = anyhow::anyhow!(
297                    "webhook dispatcher shut down before retry enqueue: ready queue unavailable"
298                );
299                diagnostics::note_delivery_failure(
300                    &scheduled.task.msg,
301                    &error,
302                    scheduled.task.attempt.saturating_add(1),
303                );
304                log::warn!("{error:#}");
305                return;
306            };
307
308            match ready_sender.send(scheduled.task.clone()) {
309                Ok(()) => diagnostics::note_retry_requeue(),
310                Err(send_err) => {
311                    let error = anyhow::anyhow!(
312                        "webhook dispatcher shut down before retry enqueue: {send_err}"
313                    );
314                    diagnostics::note_delivery_failure(
315                        &scheduled.task.msg,
316                        &error,
317                        scheduled.task.attempt.saturating_add(1),
318                    );
319                    log::warn!("{error:#}");
320                    return;
321                }
322            }
323        }
324    }
325}
326
/// Test helper: returns `(queue_capacity, worker_count)` of the dispatcher for `config`,
/// building or rebuilding the dispatcher if needed.
#[cfg(test)]
pub(crate) fn current_dispatcher_settings_for_tests(config: &WebhookConfig) -> (usize, usize) {
    let dispatcher = dispatcher_for_config(config);
    let settings = &dispatcher.settings;
    (settings.queue_capacity, settings.worker_count)
}
335
/// Test helper: returns the runtime to standard mode with no dispatcher installed, then clears
/// any installed test transport.
#[cfg(test)]
pub(crate) fn reset_dispatcher_for_tests() {
    // The default state is standard mode with no dispatcher; assigning it drops the old one.
    with_dispatcher_state_write(|state| *state = DispatcherState::default());
    super::delivery::install_test_transport_for_tests(None);
}
343}