// Source: ralph/contracts/config/webhook.rs

//! Webhook configuration for HTTP task event notifications.
//!
//! Responsibilities:
//! - Define webhook config structs and backpressure policy enum.
//! - Provide merge behavior and event filtering.
//! - Define valid webhook event subscription types for config validation.
//!
//! Not handled here:
//! - Actual webhook delivery (see `crate::webhook` module).
10
11use schemars::JsonSchema;
12use serde::{Deserialize, Serialize};
13
14/// Webhook event subscription type for config.
15/// Each variant corresponds to a WebhookEventType, plus Wildcard for "all events".
16#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
17#[serde(rename_all = "snake_case")]
18pub enum WebhookEventSubscription {
19    /// Task was created/added to queue.
20    TaskCreated,
21    /// Task status changed to Doing (execution started).
22    TaskStarted,
23    /// Task completed successfully (status Done).
24    TaskCompleted,
25    /// Task failed or was rejected.
26    TaskFailed,
27    /// Generic status change.
28    TaskStatusChanged,
29    /// Run loop started.
30    LoopStarted,
31    /// Run loop stopped.
32    LoopStopped,
33    /// Phase started for a task.
34    PhaseStarted,
35    /// Phase completed for a task.
36    PhaseCompleted,
37    /// Queue became unblocked.
38    QueueUnblocked,
39    /// Wildcard: subscribe to all events.
40    #[serde(rename = "*")]
41    Wildcard,
42}
43
44impl WebhookEventSubscription {
45    /// Convert to the string representation used in event matching.
46    pub fn as_str(&self) -> &'static str {
47        match self {
48            Self::TaskCreated => "task_created",
49            Self::TaskStarted => "task_started",
50            Self::TaskCompleted => "task_completed",
51            Self::TaskFailed => "task_failed",
52            Self::TaskStatusChanged => "task_status_changed",
53            Self::LoopStarted => "loop_started",
54            Self::LoopStopped => "loop_stopped",
55            Self::PhaseStarted => "phase_started",
56            Self::PhaseCompleted => "phase_completed",
57            Self::QueueUnblocked => "queue_unblocked",
58            Self::Wildcard => "*",
59        }
60    }
61}
62
63/// Backpressure policy for webhook delivery queue.
64#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default, JsonSchema)]
65#[serde(rename_all = "snake_case")]
66pub enum WebhookQueuePolicy {
67    /// Drop new webhooks when queue is full, preserving existing queue contents.
68    /// This is functionally equivalent to `drop_new` due to channel constraints.
69    #[default]
70    DropOldest,
71    /// Drop the new webhook if queue is full.
72    DropNew,
73    /// Block sender briefly, then drop if queue is still full.
74    BlockWithTimeout,
75}
76
77/// Webhook configuration for HTTP task event notifications.
78#[derive(Debug, Clone, Serialize, Deserialize, Default, JsonSchema)]
79#[serde(default, deny_unknown_fields)]
80pub struct WebhookConfig {
81    /// Enable webhook notifications (default: false).
82    pub enabled: Option<bool>,
83
84    /// Webhook endpoint URL (required when enabled).
85    pub url: Option<String>,
86
87    /// Secret key for HMAC-SHA256 signature generation.
88    /// When set, webhooks include an X-Ralph-Signature header.
89    pub secret: Option<String>,
90
91    /// Events to subscribe to (default: legacy task events only).
92    pub events: Option<Vec<WebhookEventSubscription>>,
93
94    /// Request timeout in seconds (default: 30, max: 300).
95    #[schemars(range(min = 1, max = 300))]
96    pub timeout_secs: Option<u32>,
97
98    /// Number of retry attempts for failed deliveries (default: 3, max: 10).
99    #[schemars(range(min = 0, max = 10))]
100    pub retry_count: Option<u32>,
101
102    /// Retry backoff base in milliseconds (default: 1000, max: 30000).
103    #[schemars(range(min = 100, max = 30000))]
104    pub retry_backoff_ms: Option<u32>,
105
106    /// Maximum number of pending webhooks in the delivery queue (default: 500, range: 10-10000).
107    #[schemars(range(min = 10, max = 10000))]
108    pub queue_capacity: Option<u32>,
109
110    /// Multiplier for queue capacity in parallel mode (default: 2.0, range: 1.0-10.0).
111    /// When running with N parallel workers, effective capacity = queue_capacity * max(1, workers * multiplier).
112    /// Set higher (e.g., 3.0) if webhook endpoint is slow or unreliable.
113    #[schemars(range(min = 1.0, max = 10.0))]
114    pub parallel_queue_multiplier: Option<f32>,
115
116    /// Backpressure policy when queue is full (default: drop_oldest).
117    /// - drop_oldest: Drop new webhooks when full (preserves existing queue contents)
118    /// - drop_new: Drop the new webhook if queue is full
119    /// - block_with_timeout: Block sender briefly (100ms), then drop if still full
120    pub queue_policy: Option<WebhookQueuePolicy>,
121}
122
123impl WebhookConfig {
124    pub fn merge_from(&mut self, other: Self) {
125        if other.enabled.is_some() {
126            self.enabled = other.enabled;
127        }
128        if other.url.is_some() {
129            self.url = other.url;
130        }
131        if other.secret.is_some() {
132            self.secret = other.secret;
133        }
134        if other.events.is_some() {
135            self.events = other.events;
136        }
137        if other.timeout_secs.is_some() {
138            self.timeout_secs = other.timeout_secs;
139        }
140        if other.retry_count.is_some() {
141            self.retry_count = other.retry_count;
142        }
143        if other.retry_backoff_ms.is_some() {
144            self.retry_backoff_ms = other.retry_backoff_ms;
145        }
146        if other.queue_capacity.is_some() {
147            self.queue_capacity = other.queue_capacity;
148        }
149        if other.parallel_queue_multiplier.is_some() {
150            self.parallel_queue_multiplier = other.parallel_queue_multiplier;
151        }
152        if other.queue_policy.is_some() {
153            self.queue_policy = other.queue_policy;
154        }
155    }
156
157    /// Legacy default events that are enabled when `events` is not specified.
158    /// New events (loop_*, phase_*) are opt-in and require explicit configuration.
159    const DEFAULT_EVENTS_V1: [&'static str; 5] = [
160        "task_created",
161        "task_started",
162        "task_completed",
163        "task_failed",
164        "task_status_changed",
165    ];
166
167    /// Check if a specific event type is enabled.
168    ///
169    /// Event filtering behavior:
170    /// - If webhooks are disabled, no events are sent.
171    /// - If `events` is `None`: only legacy task events are enabled (backward compatible).
172    /// - If `events` is `Some([...])`: only those events are enabled; use `["*"]` to enable all.
173    pub fn is_event_enabled(&self, event: &str) -> bool {
174        if !self.enabled.unwrap_or(false) {
175            return false;
176        }
177        match &self.events {
178            None => Self::DEFAULT_EVENTS_V1.contains(&event),
179            Some(events) => events
180                .iter()
181                .any(|e| e.as_str() == event || e.as_str() == "*"),
182        }
183    }
184}
185
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a config with only `enabled` and `events` set; every other
    /// field stays at its `Default` (`None`).
    fn config_with(
        enabled: bool,
        events: Option<Vec<WebhookEventSubscription>>,
    ) -> WebhookConfig {
        WebhookConfig {
            enabled: Some(enabled),
            events,
            ..Default::default()
        }
    }

    #[test]
    fn test_event_subscription_serialization() {
        // Regular variants serialize to their snake_case name.
        let created = serde_json::to_string(&WebhookEventSubscription::TaskCreated).unwrap();
        assert_eq!(created, "\"task_created\"");

        // The wildcard variant serializes to "*".
        let wildcard = serde_json::to_string(&WebhookEventSubscription::Wildcard).unwrap();
        assert_eq!(wildcard, "\"*\"");
    }

    #[test]
    fn test_event_subscription_deserialization() {
        let created: WebhookEventSubscription = serde_json::from_str("\"task_created\"").unwrap();
        assert_eq!(created, WebhookEventSubscription::TaskCreated);

        let wildcard: WebhookEventSubscription = serde_json::from_str("\"*\"").unwrap();
        assert_eq!(wildcard, WebhookEventSubscription::Wildcard);
    }

    #[test]
    fn test_invalid_event_rejected() {
        // A misspelled event name must fail to parse rather than be accepted.
        let parsed = serde_json::from_str::<WebhookEventSubscription>("\"task_creatd\"");
        assert!(parsed.is_err());
    }

    #[test]
    fn test_is_event_enabled_with_subscription_type() {
        let config = config_with(
            true,
            Some(vec![
                WebhookEventSubscription::TaskCreated,
                WebhookEventSubscription::Wildcard,
            ]),
        );
        assert!(config.is_event_enabled("task_created"));
        assert!(config.is_event_enabled("loop_started")); // via wildcard
    }

    #[test]
    fn test_is_event_enabled_default_events_when_none() {
        let config = config_with(true, None);
        assert!(config.is_event_enabled("task_created"));
        assert!(config.is_event_enabled("task_started"));
        assert!(!config.is_event_enabled("loop_started")); // not in default set
    }

    #[test]
    fn test_is_event_enabled_disabled_when_not_enabled() {
        let config = config_with(false, Some(vec![WebhookEventSubscription::TaskCreated]));
        assert!(!config.is_event_enabled("task_created"));
    }

    #[test]
    fn test_merge_from_only_overrides_set_fields() {
        let mut base = WebhookConfig {
            enabled: Some(true),
            url: Some("https://example.com/hook".to_string()),
            retry_count: Some(3),
            ..Default::default()
        };

        base.merge_from(WebhookConfig {
            url: Some("https://example.org/hook".to_string()),
            timeout_secs: Some(10),
            ..Default::default()
        });

        // Fields unset in the overlay keep their previous values.
        assert_eq!(base.enabled, Some(true));
        assert_eq!(base.retry_count, Some(3));
        // Fields set in the overlay win.
        assert_eq!(base.url.as_deref(), Some("https://example.org/hook"));
        assert_eq!(base.timeout_secs, Some(10));
        // Fields unset on both sides stay unset.
        assert!(base.secret.is_none());
    }
}