1use serde::{Deserialize, Serialize};
10use serde_json::json;
11use std::sync::Arc;
12use tokio::sync::Mutex;
13use tracing::{debug, info, warn};
14
15#[derive(Debug, Clone, Serialize, Deserialize)]
17pub struct WebhookConfig {
18 #[serde(default)]
20 pub enabled: bool,
21
22 #[serde(default)]
25 pub token: Option<String>,
26
27 #[serde(default = "default_max_payload")]
29 pub max_payload_bytes: usize,
30}
31
/// Payload limit (64 KiB) applied when the config does not specify one.
const DEFAULT_MAX_PAYLOAD_BYTES: usize = 64 * 1024;

/// Serde default provider for `WebhookConfig::max_payload_bytes`.
fn default_max_payload() -> usize {
    DEFAULT_MAX_PAYLOAD_BYTES
}
35
36impl Default for WebhookConfig {
37 fn default() -> Self {
38 Self {
39 enabled: false,
40 token: None,
41 max_payload_bytes: default_max_payload(),
42 }
43 }
44}
45
46#[derive(Debug, Deserialize)]
48pub struct WakeRequest {
49 pub reason: Option<String>,
51 pub session_id: Option<String>,
53}
54
55#[derive(Debug, Deserialize)]
57pub struct AgentRequest {
58 pub message: String,
60 pub agent_id: Option<String>,
62 pub metadata: Option<serde_json::Value>,
64}
65
66#[derive(Debug, Serialize)]
68pub struct WebhookResponse {
69 pub status: String,
70 #[serde(skip_serializing_if = "Option::is_none")]
71 pub message: Option<String>,
72 #[serde(skip_serializing_if = "Option::is_none")]
73 pub session_id: Option<String>,
74}
75
76pub struct WebhookQueue {
78 pending: Vec<PendingWebhook>,
79}
80
81#[derive(Debug)]
83pub enum PendingWebhook {
84 Wake {
85 reason: Option<String>,
86 session_id: Option<String>,
87 },
88 AgentMessage {
89 message: String,
90 agent_id: Option<String>,
91 metadata: Option<serde_json::Value>,
92 },
93}
94
95impl WebhookQueue {
96 pub fn new() -> Self {
97 Self {
98 pending: Vec::new(),
99 }
100 }
101
102 pub fn enqueue_wake(&mut self, reason: Option<String>, session_id: Option<String>) {
104 self.pending.push(PendingWebhook::Wake {
105 reason,
106 session_id,
107 });
108 }
109
110 pub fn enqueue_agent_message(
112 &mut self,
113 message: String,
114 agent_id: Option<String>,
115 metadata: Option<serde_json::Value>,
116 ) {
117 self.pending.push(PendingWebhook::AgentMessage {
118 message,
119 agent_id,
120 metadata,
121 });
122 }
123
124 pub fn drain(&mut self) -> Vec<PendingWebhook> {
126 std::mem::take(&mut self.pending)
127 }
128
129 pub fn is_empty(&self) -> bool {
131 self.pending.is_empty()
132 }
133}
134
135impl Default for WebhookQueue {
136 fn default() -> Self {
137 Self::new()
138 }
139}
140
141pub type SharedWebhookQueue = Arc<Mutex<WebhookQueue>>;
143
144pub async fn handle_webhook_request(
149 path: &str,
150 body: &str,
151 auth_header: Option<&str>,
152 config: &WebhookConfig,
153 queue: SharedWebhookQueue,
154) -> (String, String, String) {
155 if !config.enabled {
157 return (
158 "403 Forbidden".to_string(),
159 "application/json".to_string(),
160 json!({"error": "Webhooks are not enabled"}).to_string(),
161 );
162 }
163
164 if let Some(expected_token) = &config.token {
166 let provided = auth_header
167 .and_then(|h| h.strip_prefix("Bearer "))
168 .unwrap_or("");
169
170 if provided != expected_token {
171 warn!("Webhook auth failed for {}", path);
172 return (
173 "401 Unauthorized".to_string(),
174 "application/json".to_string(),
175 json!({"error": "Invalid or missing authorization token"}).to_string(),
176 );
177 }
178 } else {
179 return (
181 "403 Forbidden".to_string(),
182 "application/json".to_string(),
183 json!({"error": "Webhook token not configured"}).to_string(),
184 );
185 }
186
187 if body.len() > config.max_payload_bytes {
189 return (
190 "413 Payload Too Large".to_string(),
191 "application/json".to_string(),
192 json!({"error": "Payload exceeds max size", "max_bytes": config.max_payload_bytes})
193 .to_string(),
194 );
195 }
196
197 match path {
198 "/hooks/wake" => handle_wake(body, queue).await,
199 "/hooks/agent" => handle_agent(body, queue).await,
200 _ => (
201 "404 Not Found".to_string(),
202 "application/json".to_string(),
203 json!({"error": "Unknown webhook endpoint", "available": ["/hooks/wake", "/hooks/agent"]})
204 .to_string(),
205 ),
206 }
207}
208
209async fn handle_wake(body: &str, queue: SharedWebhookQueue) -> (String, String, String) {
210 let req: WakeRequest = match serde_json::from_str(body) {
211 Ok(r) => r,
212 Err(e) => {
213 return (
214 "400 Bad Request".to_string(),
215 "application/json".to_string(),
216 json!({"error": format!("Invalid JSON: {}", e)}).to_string(),
217 );
218 }
219 };
220
221 info!(
222 reason = ?req.reason,
223 session = ?req.session_id,
224 "Webhook: wake request received"
225 );
226
227 let mut q = queue.lock().await;
228 q.enqueue_wake(req.reason.clone(), req.session_id.clone());
229
230 let resp = WebhookResponse {
231 status: "accepted".to_string(),
232 message: Some("Wake signal queued".to_string()),
233 session_id: req.session_id,
234 };
235
236 (
237 "202 Accepted".to_string(),
238 "application/json".to_string(),
239 serde_json::to_string(&resp).unwrap_or_default(),
240 )
241}
242
243async fn handle_agent(body: &str, queue: SharedWebhookQueue) -> (String, String, String) {
244 let req: AgentRequest = match serde_json::from_str(body) {
245 Ok(r) => r,
246 Err(e) => {
247 return (
248 "400 Bad Request".to_string(),
249 "application/json".to_string(),
250 json!({"error": format!("Invalid JSON: {}", e)}).to_string(),
251 );
252 }
253 };
254
255 if req.message.is_empty() {
256 return (
257 "400 Bad Request".to_string(),
258 "application/json".to_string(),
259 json!({"error": "Message cannot be empty"}).to_string(),
260 );
261 }
262
263 info!(
264 agent = ?req.agent_id,
265 message_len = req.message.len(),
266 "Webhook: agent message received"
267 );
268
269 debug!(message = %req.message, "Webhook agent message content");
270
271 let mut q = queue.lock().await;
272 q.enqueue_agent_message(req.message, req.agent_id.clone(), req.metadata);
273
274 let resp = WebhookResponse {
275 status: "accepted".to_string(),
276 message: Some("Message queued for agent".to_string()),
277 session_id: req.agent_id,
278 };
279
280 (
281 "202 Accepted".to_string(),
282 "application/json".to_string(),
283 serde_json::to_string(&resp).unwrap_or_default(),
284 )
285}
286
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an enabled config that expects `token` as the bearer token.
    fn enabled_config(token: &str) -> WebhookConfig {
        WebhookConfig {
            enabled: true,
            token: Some(token.to_string()),
            ..Default::default()
        }
    }

    /// Builds a fresh, empty shared queue.
    fn new_queue() -> SharedWebhookQueue {
        Arc::new(Mutex::new(WebhookQueue::new()))
    }

    #[test]
    fn test_webhook_config_defaults() {
        let config = WebhookConfig::default();
        assert!(!config.enabled);
        assert!(config.token.is_none());
        assert_eq!(config.max_payload_bytes, 65_536);
    }

    #[test]
    fn test_webhook_queue() {
        let mut queue = WebhookQueue::new();
        assert!(queue.is_empty());

        queue.enqueue_wake(Some("test".to_string()), None);
        assert!(!queue.is_empty());

        queue.enqueue_agent_message("hello".to_string(), None, None);

        let drained = queue.drain();
        assert_eq!(drained.len(), 2);
        // Events come back in the order they were queued.
        assert!(matches!(drained[0], PendingWebhook::Wake { .. }));
        assert!(matches!(drained[1], PendingWebhook::AgentMessage { .. }));
        assert!(queue.is_empty());
    }

    #[tokio::test]
    async fn test_webhook_disabled() {
        let config = WebhookConfig::default();
        let queue = new_queue();

        let (status, _, body) =
            handle_webhook_request("/hooks/wake", "{}", None, &config, queue).await;
        assert!(status.contains("403"));
        assert!(body.contains("not enabled"));
    }

    #[tokio::test]
    async fn test_webhook_token_not_configured() {
        let config = WebhookConfig {
            enabled: true,
            token: None,
            ..Default::default()
        };
        let queue = new_queue();

        let (status, _, body) = handle_webhook_request(
            "/hooks/wake",
            "{}",
            Some("Bearer anything"),
            &config,
            queue.clone(),
        )
        .await;

        assert!(status.contains("403"));
        assert!(body.contains("not configured"));
        assert!(queue.lock().await.is_empty());
    }

    #[tokio::test]
    async fn test_webhook_auth_required() {
        let config = enabled_config("secret123");
        let queue = new_queue();

        // No Authorization header at all.
        let (status, _, _) =
            handle_webhook_request("/hooks/wake", "{}", None, &config, queue.clone()).await;
        assert!(status.contains("401"));

        // Wrong token.
        let (status, _, _) = handle_webhook_request(
            "/hooks/wake",
            "{}",
            Some("Bearer wrong"),
            &config,
            queue.clone(),
        )
        .await;
        assert!(status.contains("401"));

        // Rejected requests must not queue anything.
        assert!(queue.lock().await.is_empty());

        // Correct token.
        let (status, _, _) = handle_webhook_request(
            "/hooks/wake",
            "{}",
            Some("Bearer secret123"),
            &config,
            queue,
        )
        .await;
        assert!(status.contains("202"));
    }

    #[tokio::test]
    async fn test_webhook_payload_too_large() {
        let config = WebhookConfig {
            enabled: true,
            token: Some("tok".to_string()),
            max_payload_bytes: 8,
        };
        let queue = new_queue();

        let body = r#"{"reason": "far too long for the limit"}"#;
        let (status, _, resp) = handle_webhook_request(
            "/hooks/wake",
            body,
            Some("Bearer tok"),
            &config,
            queue.clone(),
        )
        .await;

        assert!(status.contains("413"));
        assert!(resp.contains("max_bytes"));
        assert!(queue.lock().await.is_empty());
    }

    #[tokio::test]
    async fn test_webhook_unknown_endpoint() {
        let config = enabled_config("tok");
        let queue = new_queue();

        let (status, _, resp) = handle_webhook_request(
            "/hooks/nope",
            "{}",
            Some("Bearer tok"),
            &config,
            queue.clone(),
        )
        .await;

        assert!(status.contains("404"));
        assert!(resp.contains("/hooks/wake"));
        assert!(queue.lock().await.is_empty());
    }

    #[tokio::test]
    async fn test_webhook_wake() {
        let config = enabled_config("tok");
        let queue = new_queue();

        let body = r#"{"reason": "cron trigger"}"#;
        let (status, _, resp) = handle_webhook_request(
            "/hooks/wake",
            body,
            Some("Bearer tok"),
            &config,
            queue.clone(),
        )
        .await;

        assert!(status.contains("202"));
        assert!(resp.contains("accepted"));

        let drained = queue.lock().await.drain();
        assert_eq!(drained.len(), 1);
        match &drained[0] {
            PendingWebhook::Wake { reason, session_id } => {
                assert_eq!(reason.as_deref(), Some("cron trigger"));
                assert!(session_id.is_none());
            }
            other => panic!("expected a wake entry, got {:?}", other),
        }
    }

    #[tokio::test]
    async fn test_webhook_wake_invalid_json() {
        let config = enabled_config("tok");
        let queue = new_queue();

        let (status, _, resp) = handle_webhook_request(
            "/hooks/wake",
            "not json",
            Some("Bearer tok"),
            &config,
            queue.clone(),
        )
        .await;

        assert!(status.contains("400"));
        assert!(resp.contains("Invalid JSON"));
        assert!(queue.lock().await.is_empty());
    }

    #[tokio::test]
    async fn test_webhook_agent_message_queued() {
        let config = enabled_config("tok");
        let queue = new_queue();

        let body = r#"{"message": "hello", "agent_id": "a1"}"#;
        let (status, _, resp) = handle_webhook_request(
            "/hooks/agent",
            body,
            Some("Bearer tok"),
            &config,
            queue.clone(),
        )
        .await;

        assert!(status.contains("202"));
        assert!(resp.contains("accepted"));

        let drained = queue.lock().await.drain();
        assert_eq!(drained.len(), 1);
        match &drained[0] {
            PendingWebhook::AgentMessage {
                message,
                agent_id,
                metadata,
            } => {
                assert_eq!(message, "hello");
                assert_eq!(agent_id.as_deref(), Some("a1"));
                assert!(metadata.is_none());
            }
            other => panic!("expected an agent message entry, got {:?}", other),
        }
    }

    #[tokio::test]
    async fn test_webhook_agent_empty_message() {
        let config = enabled_config("tok");
        let queue = new_queue();

        let body = r#"{"message": ""}"#;
        let (status, _, _) = handle_webhook_request(
            "/hooks/agent",
            body,
            Some("Bearer tok"),
            &config,
            queue.clone(),
        )
        .await;

        assert!(status.contains("400"));
        assert!(queue.lock().await.is_empty());
    }
}