ralph/webhook/worker/runtime/
state.rs1use crate::contracts::WebhookConfig;
4use std::sync::{Arc, OnceLock, RwLock};
5
6use super::types::{DispatcherSettings, RuntimeMode, WebhookDispatcher};
7use crate::webhook::diagnostics;
8
9#[derive(Debug)]
10struct DispatcherState {
11 mode: RuntimeMode,
12 dispatcher: Option<Arc<WebhookDispatcher>>,
13 disabled_reason: Option<String>,
14}
15
16impl Default for DispatcherState {
17 fn default() -> Self {
18 Self {
19 mode: RuntimeMode::Standard,
20 dispatcher: None,
21 disabled_reason: None,
22 }
23 }
24}
25
26static DISPATCHER_STATE: OnceLock<RwLock<DispatcherState>> = OnceLock::new();
27
28fn dispatcher_state() -> &'static RwLock<DispatcherState> {
29 DISPATCHER_STATE.get_or_init(|| RwLock::new(DispatcherState::default()))
30}
31
32fn with_dispatcher_state_write<T>(mut f: impl FnMut(&mut DispatcherState) -> T) -> T {
33 match dispatcher_state().write() {
34 Ok(mut guard) => f(&mut guard),
35 Err(poisoned) => {
36 let mut guard = poisoned.into_inner();
37 f(&mut guard)
38 }
39 }
40}
41
42pub(crate) fn dispatcher_for_config(config: &WebhookConfig) -> Option<Arc<WebhookDispatcher>> {
43 dispatcher_for_config_with_factory(config, WebhookDispatcher::new)
44}
45
/// Test-only variant of `dispatcher_for_config` that builds the dispatcher through
/// `WebhookDispatcher::new_with_spawner`, using the supplied `spawner`.
#[cfg(test)]
pub(crate) fn dispatcher_for_config_with_spawner(
    config: &WebhookConfig,
    spawner: &impl super::types::ThreadSpawner,
) -> Option<Arc<WebhookDispatcher>> {
    let build_with_spawner =
        |settings: DispatcherSettings| WebhookDispatcher::new_with_spawner(settings, spawner);
    dispatcher_for_config_with_factory(config, build_with_spawner)
}
55
56fn dispatcher_for_config_with_factory(
57 config: &WebhookConfig,
58 mut build_dispatcher: impl FnMut(DispatcherSettings) -> anyhow::Result<Arc<WebhookDispatcher>>,
59) -> Option<Arc<WebhookDispatcher>> {
60 with_dispatcher_state_write(|state| {
61 if state.disabled_reason.is_some() {
62 log::debug!("Webhooks disabled for this run after dispatcher startup failure");
63 return None;
64 }
65
66 let settings = DispatcherSettings::for_mode(config, &state.mode);
67 let needs_rebuild = state
68 .dispatcher
69 .as_ref()
70 .is_none_or(|dispatcher| dispatcher.settings != settings);
71
72 if needs_rebuild {
73 match build_dispatcher(settings) {
74 Ok(dispatcher) => state.dispatcher = Some(dispatcher),
75 Err(err) => {
76 let reason = format!("{err:#}");
77 state.dispatcher = None;
78 state.disabled_reason = Some(reason.clone());
79 diagnostics::set_queue_capacity(0);
80 log::warn!(
81 "Webhook delivery disabled for this run: failed to start dispatcher runtime: {reason}"
82 );
83 return None;
84 }
85 }
86 }
87
88 state.dispatcher.as_ref().cloned()
89 })
90}
91
92pub fn init_worker_for_parallel(config: &WebhookConfig, worker_count: u8) {
94 with_dispatcher_state_write(|state| {
95 state.mode = RuntimeMode::Parallel { worker_count };
96 });
97 let _ = dispatcher_for_config(config);
98}
99
/// Test-only helper returning `(queue_capacity, worker_count)` from the settings of the
/// dispatcher obtained for `config`, or `None` when no dispatcher is available.
///
/// This goes through `dispatcher_for_config`, so it may build or rebuild the dispatcher.
#[cfg(test)]
pub(crate) fn current_dispatcher_settings_for_tests(
    config: &WebhookConfig,
) -> Option<(usize, usize)> {
    let dispatcher = dispatcher_for_config(config)?;
    let settings = &dispatcher.settings;
    Some((settings.queue_capacity, settings.worker_count))
}
111
/// Test-only helper that restores the dispatcher state to its defaults (standard mode, no
/// cached dispatcher, no disabled reason) and uninstalls any test transport.
#[cfg(test)]
pub(crate) fn reset_dispatcher_for_tests() {
    // `DispatcherState::default()` yields exactly the three values a reset needs.
    with_dispatcher_state_write(|state| *state = DispatcherState::default());

    crate::webhook::worker::delivery::install_test_transport_for_tests(None);
}