ralph/webhook/worker/
runtime.rs1use crate::contracts::WebhookConfig;
20use crossbeam_channel::{Receiver, Sender, bounded, unbounded};
21use std::cmp::Ordering as CmpOrdering;
22use std::collections::BinaryHeap;
23use std::sync::{Arc, OnceLock, RwLock, Weak};
24use std::time::{Duration, Instant};
25
26use super::super::diagnostics;
27use super::super::types::WebhookMessage;
28use super::delivery::handle_delivery_task;
29
/// Ready-queue capacity used when the config leaves `queue_capacity` unset.
const DEFAULT_QUEUE_CAPACITY: usize = 500;
/// Upper bound on the ready-queue capacity, whatever the config requests.
const MAX_QUEUE_CAPACITY: usize = 10_000;

/// Worker-thread count in standard mode, and the floor in parallel mode.
const DEFAULT_WORKER_COUNT: usize = 4;
/// Upper bound applied to the configured `parallel_queue_multiplier`.
const MAX_PARALLEL_MULTIPLIER: f64 = 10.0;

/// How long dispatcher construction waits for each thread's startup signal.
const DISPATCHER_STARTUP_TIMEOUT: Duration = Duration::from_millis(1_000);
35
36#[derive(Debug, Clone, PartialEq, Eq)]
37pub(super) struct DispatcherSettings {
38 pub(super) queue_capacity: usize,
39 pub(super) worker_count: usize,
40}
41
/// How the process is running webhooks, which determines dispatcher sizing.
#[derive(Debug, Clone, PartialEq, Eq)]
enum RuntimeMode {
    /// Default sizing: configured queue capacity, default worker count.
    Standard,
    /// Parallel execution with `worker_count` parallel workers; the queue is
    /// scaled up accordingly.
    Parallel { worker_count: u8 },
}
47
48#[derive(Debug)]
49struct DispatcherState {
50 mode: RuntimeMode,
51 dispatcher: Option<Arc<WebhookDispatcher>>,
52}
53
54impl Default for DispatcherState {
55 fn default() -> Self {
56 Self {
57 mode: RuntimeMode::Standard,
58 dispatcher: None,
59 }
60 }
61}
62
63#[derive(Debug)]
64pub(super) struct WebhookDispatcher {
65 pub(super) settings: DispatcherSettings,
66 pub(super) ready_sender: Arc<Sender<DeliveryTask>>,
67 retry_sender: Arc<Sender<ScheduledRetry>>,
68}
69
70#[derive(Debug, Clone)]
71pub(super) struct DeliveryTask {
72 pub(super) msg: WebhookMessage,
73 pub(super) attempt: u32,
74}
75
76#[derive(Debug, Clone)]
77pub(super) struct ScheduledRetry {
78 pub(super) ready_at: Instant,
79 pub(super) task: DeliveryTask,
80}
81
82#[derive(Debug, Clone)]
83struct RetryQueueEntry(ScheduledRetry);
84
85impl PartialEq for RetryQueueEntry {
86 fn eq(&self, other: &Self) -> bool {
87 self.0.ready_at.eq(&other.0.ready_at)
88 }
89}
90
91impl Eq for RetryQueueEntry {}
92
93impl PartialOrd for RetryQueueEntry {
94 fn partial_cmp(&self, other: &Self) -> Option<CmpOrdering> {
95 Some(self.cmp(other))
96 }
97}
98
99impl Ord for RetryQueueEntry {
100 fn cmp(&self, other: &Self) -> CmpOrdering {
101 other.0.ready_at.cmp(&self.0.ready_at)
102 }
103}
104
105static DISPATCHER_STATE: OnceLock<RwLock<DispatcherState>> = OnceLock::new();
106
107fn dispatcher_state() -> &'static RwLock<DispatcherState> {
108 DISPATCHER_STATE.get_or_init(|| RwLock::new(DispatcherState::default()))
109}
110
111impl DispatcherSettings {
112 fn for_mode(config: &WebhookConfig, mode: &RuntimeMode) -> Self {
113 let base_capacity = config
114 .queue_capacity
115 .map(|value| value.clamp(1, MAX_QUEUE_CAPACITY as u32) as usize)
116 .unwrap_or(DEFAULT_QUEUE_CAPACITY);
117
118 match mode {
119 RuntimeMode::Standard => Self {
120 queue_capacity: base_capacity,
121 worker_count: DEFAULT_WORKER_COUNT,
122 },
123 RuntimeMode::Parallel { worker_count } => {
124 let multiplier = config
125 .parallel_queue_multiplier
126 .unwrap_or(2.0)
127 .clamp(1.0, MAX_PARALLEL_MULTIPLIER as f32)
128 as f64;
129 let scaled_capacity =
130 (base_capacity as f64 * (*worker_count as f64 * multiplier).max(1.0)) as usize;
131
132 Self {
133 queue_capacity: scaled_capacity.clamp(1, MAX_QUEUE_CAPACITY),
134 worker_count: usize::max(DEFAULT_WORKER_COUNT, *worker_count as usize),
135 }
136 }
137 }
138 }
139}
140
141impl WebhookDispatcher {
142 fn new(settings: DispatcherSettings) -> Arc<Self> {
143 let (ready_sender, ready_receiver) = bounded(settings.queue_capacity);
144 let (retry_sender, retry_receiver) = unbounded();
145 let startup_signals = settings.worker_count.saturating_add(1);
146 let (startup_sender, startup_receiver) = bounded(startup_signals);
147
148 let dispatcher = Arc::new(Self {
149 settings: settings.clone(),
150 ready_sender: Arc::new(ready_sender),
151 retry_sender: Arc::new(retry_sender),
152 });
153
154 diagnostics::set_queue_capacity(settings.queue_capacity);
155
156 for worker_id in 0..settings.worker_count {
157 let ready_receiver = ready_receiver.clone();
158 let retry_sender = Arc::downgrade(&dispatcher.retry_sender);
159 let startup_sender = startup_sender.clone();
160 let thread_name = format!("ralph-webhook-worker-{worker_id}");
161 std::thread::Builder::new()
162 .name(thread_name)
163 .spawn(move || {
164 startup_sender
165 .send(())
166 .expect("signal webhook delivery worker startup");
167 worker_loop(ready_receiver, retry_sender)
168 })
169 .expect("spawn webhook delivery worker");
170 }
171
172 let scheduler_ready = Arc::downgrade(&dispatcher.ready_sender);
173 let scheduler_startup_sender = startup_sender.clone();
174 std::thread::Builder::new()
175 .name("ralph-webhook-retry-scheduler".to_string())
176 .spawn(move || {
177 scheduler_startup_sender
178 .send(())
179 .expect("signal webhook retry scheduler startup");
180 retry_scheduler_loop(retry_receiver, scheduler_ready)
181 })
182 .expect("spawn webhook retry scheduler");
183 drop(startup_sender);
184
185 for _ in 0..startup_signals {
186 startup_receiver
187 .recv_timeout(DISPATCHER_STARTUP_TIMEOUT)
188 .expect("wait for webhook dispatcher thread startup");
189 }
190
191 log::debug!(
192 "Webhook dispatcher started with {} workers and queue capacity {}",
193 settings.worker_count,
194 settings.queue_capacity
195 );
196
197 dispatcher
198 }
199}
200
201impl Drop for WebhookDispatcher {
202 fn drop(&mut self) {
203 log::debug!(
204 "Webhook dispatcher shutting down (workers: {}, capacity: {})",
205 self.settings.worker_count,
206 self.settings.queue_capacity
207 );
208 }
209}
210
211fn with_dispatcher_state_write<T>(mut f: impl FnMut(&mut DispatcherState) -> T) -> T {
212 match dispatcher_state().write() {
213 Ok(mut guard) => f(&mut guard),
214 Err(poisoned) => {
215 let mut guard = poisoned.into_inner();
216 f(&mut guard)
217 }
218 }
219}
220
221pub(super) fn dispatcher_for_config(config: &WebhookConfig) -> Arc<WebhookDispatcher> {
222 with_dispatcher_state_write(|state| {
223 let settings = DispatcherSettings::for_mode(config, &state.mode);
224 let needs_rebuild = state
225 .dispatcher
226 .as_ref()
227 .is_none_or(|dispatcher| dispatcher.settings != settings);
228
229 if needs_rebuild {
230 state.dispatcher = Some(WebhookDispatcher::new(settings));
231 }
232
233 state
234 .dispatcher
235 .as_ref()
236 .expect("dispatcher initialized")
237 .clone()
238 })
239}
240
241pub fn init_worker_for_parallel(config: &WebhookConfig, worker_count: u8) {
243 with_dispatcher_state_write(|state| {
244 state.mode = RuntimeMode::Parallel { worker_count };
245 });
246 let _ = dispatcher_for_config(config);
247}
248
249fn worker_loop(ready_receiver: Receiver<DeliveryTask>, retry_sender: Weak<Sender<ScheduledRetry>>) {
250 while let Ok(task) = ready_receiver.recv() {
251 diagnostics::note_queue_dequeue();
252 handle_delivery_task(task, &retry_sender);
253 }
254}
255
256fn retry_scheduler_loop(
257 retry_receiver: Receiver<ScheduledRetry>,
258 ready_sender: Weak<Sender<DeliveryTask>>,
259) {
260 let mut pending = BinaryHeap::<RetryQueueEntry>::new();
261
262 loop {
263 let timeout = pending
264 .peek()
265 .map(|entry| entry.0.ready_at.saturating_duration_since(Instant::now()));
266
267 let scheduled = match timeout {
268 Some(duration) => match retry_receiver.recv_timeout(duration) {
269 Ok(task) => Some(task),
270 Err(crossbeam_channel::RecvTimeoutError::Timeout) => None,
271 Err(crossbeam_channel::RecvTimeoutError::Disconnected) => {
272 if pending.is_empty() {
273 break;
274 }
275 None
276 }
277 },
278 None => match retry_receiver.recv() {
279 Ok(task) => Some(task),
280 Err(_) => break,
281 },
282 };
283
284 if let Some(task) = scheduled {
285 pending.push(RetryQueueEntry(task));
286 }
287
288 let now = Instant::now();
289 while let Some(entry) = pending.peek() {
290 if entry.0.ready_at > now {
291 break;
292 }
293
294 let RetryQueueEntry(scheduled) = pending.pop().expect("pending retry exists");
295 let Some(ready_sender) = ready_sender.upgrade() else {
296 let error = anyhow::anyhow!(
297 "webhook dispatcher shut down before retry enqueue: ready queue unavailable"
298 );
299 diagnostics::note_delivery_failure(
300 &scheduled.task.msg,
301 &error,
302 scheduled.task.attempt.saturating_add(1),
303 );
304 log::warn!("{error:#}");
305 return;
306 };
307
308 match ready_sender.send(scheduled.task.clone()) {
309 Ok(()) => diagnostics::note_retry_requeue(),
310 Err(send_err) => {
311 let error = anyhow::anyhow!(
312 "webhook dispatcher shut down before retry enqueue: {send_err}"
313 );
314 diagnostics::note_delivery_failure(
315 &scheduled.task.msg,
316 &error,
317 scheduled.task.attempt.saturating_add(1),
318 );
319 log::warn!("{error:#}");
320 return;
321 }
322 }
323 }
324 }
325}
326
/// Test hook: returns `(queue_capacity, worker_count)` of the dispatcher that
/// `dispatcher_for_config` yields for `config`, building it if needed.
#[cfg(test)]
pub(crate) fn current_dispatcher_settings_for_tests(config: &WebhookConfig) -> (usize, usize) {
    let dispatcher = dispatcher_for_config(config);
    let DispatcherSettings {
        queue_capacity,
        worker_count,
    } = &dispatcher.settings;
    (*queue_capacity, *worker_count)
}
335
/// Test hook: puts the global state back into standard mode, drops the global
/// dispatcher handle, and clears any installed test transport.
#[cfg(test)]
pub(crate) fn reset_dispatcher_for_tests() {
    with_dispatcher_state_write(|state| {
        *state = DispatcherState::default();
    });
    super::delivery::install_test_transport_for_tests(None);
}