Skip to main content

ralph/webhook/worker/runtime/
state.rs

1//! Global dispatcher state, reload/rebuild decisions, and config entry points.
2
3use crate::contracts::WebhookConfig;
4use std::sync::{Arc, OnceLock, RwLock};
5
6use super::types::{DispatcherSettings, RuntimeMode, WebhookDispatcher};
7use crate::webhook::diagnostics;
8
9#[derive(Debug)]
10struct DispatcherState {
11    mode: RuntimeMode,
12    dispatcher: Option<Arc<WebhookDispatcher>>,
13    disabled_reason: Option<String>,
14}
15
16impl Default for DispatcherState {
17    fn default() -> Self {
18        Self {
19            mode: RuntimeMode::Standard,
20            dispatcher: None,
21            disabled_reason: None,
22        }
23    }
24}
25
26static DISPATCHER_STATE: OnceLock<RwLock<DispatcherState>> = OnceLock::new();
27
28fn dispatcher_state() -> &'static RwLock<DispatcherState> {
29    DISPATCHER_STATE.get_or_init(|| RwLock::new(DispatcherState::default()))
30}
31
32fn with_dispatcher_state_write<T>(mut f: impl FnMut(&mut DispatcherState) -> T) -> T {
33    match dispatcher_state().write() {
34        Ok(mut guard) => f(&mut guard),
35        Err(poisoned) => {
36            let mut guard = poisoned.into_inner();
37            f(&mut guard)
38        }
39    }
40}
41
42pub(crate) fn dispatcher_for_config(config: &WebhookConfig) -> Option<Arc<WebhookDispatcher>> {
43    dispatcher_for_config_with_factory(config, WebhookDispatcher::new)
44}
45
/// Test-only variant of `dispatcher_for_config` that builds the dispatcher
/// through `WebhookDispatcher::new_with_spawner` using the supplied `spawner`.
#[cfg(test)]
pub(crate) fn dispatcher_for_config_with_spawner(
    config: &WebhookConfig,
    spawner: &impl super::types::ThreadSpawner,
) -> Option<Arc<WebhookDispatcher>> {
    let build_with_spawner =
        |settings: DispatcherSettings| WebhookDispatcher::new_with_spawner(settings, spawner);
    dispatcher_for_config_with_factory(config, build_with_spawner)
}
55
56fn dispatcher_for_config_with_factory(
57    config: &WebhookConfig,
58    mut build_dispatcher: impl FnMut(DispatcherSettings) -> anyhow::Result<Arc<WebhookDispatcher>>,
59) -> Option<Arc<WebhookDispatcher>> {
60    with_dispatcher_state_write(|state| {
61        if state.disabled_reason.is_some() {
62            log::debug!("Webhooks disabled for this run after dispatcher startup failure");
63            return None;
64        }
65
66        let settings = DispatcherSettings::for_mode(config, &state.mode);
67        let needs_rebuild = state
68            .dispatcher
69            .as_ref()
70            .is_none_or(|dispatcher| dispatcher.settings != settings);
71
72        if needs_rebuild {
73            match build_dispatcher(settings) {
74                Ok(dispatcher) => state.dispatcher = Some(dispatcher),
75                Err(err) => {
76                    let reason = format!("{err:#}");
77                    state.dispatcher = None;
78                    state.disabled_reason = Some(reason.clone());
79                    diagnostics::set_queue_capacity(0);
80                    log::warn!(
81                        "Webhook delivery disabled for this run: failed to start dispatcher runtime: {reason}"
82                    );
83                    return None;
84                }
85            }
86        }
87
88        state.dispatcher.as_ref().cloned()
89    })
90}
91
92/// Initialize the webhook dispatcher with capacity scaled for parallel execution.
93pub fn init_worker_for_parallel(config: &WebhookConfig, worker_count: u8) {
94    with_dispatcher_state_write(|state| {
95        state.mode = RuntimeMode::Parallel { worker_count };
96    });
97    let _ = dispatcher_for_config(config);
98}
99
/// Test helper: returns `(queue_capacity, worker_count)` from the settings of
/// the dispatcher obtained via `dispatcher_for_config`, or `None` when no
/// dispatcher is available.
#[cfg(test)]
pub(crate) fn current_dispatcher_settings_for_tests(
    config: &WebhookConfig,
) -> Option<(usize, usize)> {
    let dispatcher = dispatcher_for_config(config)?;
    let settings = &dispatcher.settings;
    Some((settings.queue_capacity, settings.worker_count))
}
111
/// Test helper: restores the global dispatcher state to its defaults
/// (standard mode, no dispatcher, no disabled reason) and uninstalls any
/// test transport by passing `None` to `install_test_transport_for_tests`.
#[cfg(test)]
pub(crate) fn reset_dispatcher_for_tests() {
    with_dispatcher_state_write(|state| {
        // `DispatcherState::default()` carries exactly the three reset values.
        *state = DispatcherState::default();
    });
    crate::webhook::worker::delivery::install_test_transport_for_tests(None);
}