Skip to main content

rustyclaw_core/gateway/
webhooks.rs

1//! Webhook endpoints for external triggers.
2//!
3//! Provides HTTP endpoints for external systems to interact with RustyClaw:
4//! - POST /hooks/wake   - Wake an idle agent (bring it out of sleep/standby)
5//! - POST /hooks/agent  - Send a message directly to a specific agent session
6//!
7//! All webhook requests require a bearer token for authentication.
8
9use serde::{Deserialize, Serialize};
10use serde_json::json;
11use std::sync::Arc;
12use tokio::sync::Mutex;
13use tracing::{debug, info, warn};
14
15/// Webhook configuration.
16#[derive(Debug, Clone, Serialize, Deserialize)]
17pub struct WebhookConfig {
18    /// Whether webhooks are enabled.
19    #[serde(default)]
20    pub enabled: bool,
21
22    /// Bearer token required for webhook authentication.
23    /// If not set, webhooks will be disabled for security.
24    #[serde(default)]
25    pub token: Option<String>,
26
27    /// Maximum payload size in bytes (default: 64 KB).
28    #[serde(default = "default_max_payload")]
29    pub max_payload_bytes: usize,
30}
31
/// Fallback request-body limit used when the config omits one: 64 KiB.
fn default_max_payload() -> usize {
    64 * 1024
}
35
36impl Default for WebhookConfig {
37    fn default() -> Self {
38        Self {
39            enabled: false,
40            token: None,
41            max_payload_bytes: default_max_payload(),
42        }
43    }
44}
45
/// Incoming webhook request body for `POST /hooks/wake`.
///
/// Both fields are optional, so the JSON object `{}` deserializes
/// successfully with every field set to `None`.
#[derive(Debug, Deserialize)]
pub struct WakeRequest {
    /// Optional reason for waking the agent; only logged and forwarded to
    /// the queue by this module.
    pub reason: Option<String>,
    /// Optional session/thread to target. Echoed back in the response.
    pub session_id: Option<String>,
}
54
/// Incoming webhook request body for `POST /hooks/agent`.
///
/// `message` is the only required field; a body without it fails to
/// deserialize.
#[derive(Debug, Deserialize)]
pub struct AgentRequest {
    /// The message to send to the agent. The handler rejects an empty string.
    pub message: String,
    /// Target agent/session ID. Echoed back in the response's `session_id`.
    /// NOTE(review): the original comment says a default is used when this is
    /// omitted; that selection is not visible in this module — confirm in the
    /// queue consumer.
    pub agent_id: Option<String>,
    /// Optional metadata passed through unchanged to the queued event.
    pub metadata: Option<serde_json::Value>,
}
65
/// JSON body returned for a successfully queued webhook.
///
/// `None` fields are left out of the serialized JSON entirely.
#[derive(Debug, Serialize)]
pub struct WebhookResponse {
    /// Outcome label; the handlers in this module set it to `"accepted"`.
    pub status: String,
    /// Optional human-readable detail about what was queued.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub message: Option<String>,
    /// Target the request named: the wake request's `session_id`, or the
    /// agent request's `agent_id`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub session_id: Option<String>,
}
75
76/// Pending webhook messages that the gateway loop should pick up.
77pub struct WebhookQueue {
78    pending: Vec<PendingWebhook>,
79}
80
/// A webhook event waiting to be processed.
///
/// Each variant mirrors the request body of one endpoint.
#[derive(Debug)]
pub enum PendingWebhook {
    /// Wake signal, queued by the `/hooks/wake` handler.
    Wake {
        /// Caller-supplied reason for the wake, if any.
        reason: Option<String>,
        /// Session/thread the wake targets, if any.
        session_id: Option<String>,
    },
    /// Message for an agent, queued by the `/hooks/agent` handler.
    AgentMessage {
        /// Message text to deliver.
        message: String,
        /// Target agent/session ID, if the caller named one.
        agent_id: Option<String>,
        /// Caller-supplied metadata, passed through as-is.
        metadata: Option<serde_json::Value>,
    },
}
94
95impl WebhookQueue {
96    pub fn new() -> Self {
97        Self {
98            pending: Vec::new(),
99        }
100    }
101
102    /// Enqueue a wake event.
103    pub fn enqueue_wake(&mut self, reason: Option<String>, session_id: Option<String>) {
104        self.pending.push(PendingWebhook::Wake {
105            reason,
106            session_id,
107        });
108    }
109
110    /// Enqueue an agent message.
111    pub fn enqueue_agent_message(
112        &mut self,
113        message: String,
114        agent_id: Option<String>,
115        metadata: Option<serde_json::Value>,
116    ) {
117        self.pending.push(PendingWebhook::AgentMessage {
118            message,
119            agent_id,
120            metadata,
121        });
122    }
123
124    /// Drain all pending webhooks.
125    pub fn drain(&mut self) -> Vec<PendingWebhook> {
126        std::mem::take(&mut self.pending)
127    }
128
129    /// Check if there are pending webhooks.
130    pub fn is_empty(&self) -> bool {
131        self.pending.is_empty()
132    }
133}
134
135impl Default for WebhookQueue {
136    fn default() -> Self {
137        Self::new()
138    }
139}
140
/// Shared webhook queue.
///
/// A reference-counted handle (`Arc`) around a `WebhookQueue` guarded by
/// tokio's async `Mutex`; acquire it with `.lock().await`.
pub type SharedWebhookQueue = Arc<Mutex<WebhookQueue>>;
143
144/// Handle an incoming webhook HTTP request.
145///
146/// This is designed to be called from the health server's request handler
147/// (or a dedicated webhook listener) when a POST to /hooks/* is received.
148pub async fn handle_webhook_request(
149    path: &str,
150    body: &str,
151    auth_header: Option<&str>,
152    config: &WebhookConfig,
153    queue: SharedWebhookQueue,
154) -> (String, String, String) {
155    // Check if webhooks are enabled
156    if !config.enabled {
157        return (
158            "403 Forbidden".to_string(),
159            "application/json".to_string(),
160            json!({"error": "Webhooks are not enabled"}).to_string(),
161        );
162    }
163
164    // Validate auth token
165    if let Some(expected_token) = &config.token {
166        let provided = auth_header
167            .and_then(|h| h.strip_prefix("Bearer "))
168            .unwrap_or("");
169
170        if provided != expected_token {
171            warn!("Webhook auth failed for {}", path);
172            return (
173                "401 Unauthorized".to_string(),
174                "application/json".to_string(),
175                json!({"error": "Invalid or missing authorization token"}).to_string(),
176            );
177        }
178    } else {
179        // No token configured — reject for security
180        return (
181            "403 Forbidden".to_string(),
182            "application/json".to_string(),
183            json!({"error": "Webhook token not configured"}).to_string(),
184        );
185    }
186
187    // Check payload size
188    if body.len() > config.max_payload_bytes {
189        return (
190            "413 Payload Too Large".to_string(),
191            "application/json".to_string(),
192            json!({"error": "Payload exceeds max size", "max_bytes": config.max_payload_bytes})
193                .to_string(),
194        );
195    }
196
197    match path {
198        "/hooks/wake" => handle_wake(body, queue).await,
199        "/hooks/agent" => handle_agent(body, queue).await,
200        _ => (
201            "404 Not Found".to_string(),
202            "application/json".to_string(),
203            json!({"error": "Unknown webhook endpoint", "available": ["/hooks/wake", "/hooks/agent"]})
204                .to_string(),
205        ),
206    }
207}
208
209async fn handle_wake(body: &str, queue: SharedWebhookQueue) -> (String, String, String) {
210    let req: WakeRequest = match serde_json::from_str(body) {
211        Ok(r) => r,
212        Err(e) => {
213            return (
214                "400 Bad Request".to_string(),
215                "application/json".to_string(),
216                json!({"error": format!("Invalid JSON: {}", e)}).to_string(),
217            );
218        }
219    };
220
221    info!(
222        reason = ?req.reason,
223        session = ?req.session_id,
224        "Webhook: wake request received"
225    );
226
227    let mut q = queue.lock().await;
228    q.enqueue_wake(req.reason.clone(), req.session_id.clone());
229
230    let resp = WebhookResponse {
231        status: "accepted".to_string(),
232        message: Some("Wake signal queued".to_string()),
233        session_id: req.session_id,
234    };
235
236    (
237        "202 Accepted".to_string(),
238        "application/json".to_string(),
239        serde_json::to_string(&resp).unwrap_or_default(),
240    )
241}
242
243async fn handle_agent(body: &str, queue: SharedWebhookQueue) -> (String, String, String) {
244    let req: AgentRequest = match serde_json::from_str(body) {
245        Ok(r) => r,
246        Err(e) => {
247            return (
248                "400 Bad Request".to_string(),
249                "application/json".to_string(),
250                json!({"error": format!("Invalid JSON: {}", e)}).to_string(),
251            );
252        }
253    };
254
255    if req.message.is_empty() {
256        return (
257            "400 Bad Request".to_string(),
258            "application/json".to_string(),
259            json!({"error": "Message cannot be empty"}).to_string(),
260        );
261    }
262
263    info!(
264        agent = ?req.agent_id,
265        message_len = req.message.len(),
266        "Webhook: agent message received"
267    );
268
269    debug!(message = %req.message, "Webhook agent message content");
270
271    let mut q = queue.lock().await;
272    q.enqueue_agent_message(req.message, req.agent_id.clone(), req.metadata);
273
274    let resp = WebhookResponse {
275        status: "accepted".to_string(),
276        message: Some("Message queued for agent".to_string()),
277        session_id: req.agent_id,
278    };
279
280    (
281        "202 Accepted".to_string(),
282        "application/json".to_string(),
283        serde_json::to_string(&resp).unwrap_or_default(),
284    )
285}
286
#[cfg(test)]
mod tests {
    use super::*;

    /// Build an enabled config protected by `token`, with the default payload limit.
    fn enabled_config(token: &str) -> WebhookConfig {
        WebhookConfig {
            enabled: true,
            token: Some(token.to_string()),
            ..Default::default()
        }
    }

    /// Build a fresh, empty shared queue.
    fn empty_queue() -> SharedWebhookQueue {
        Arc::new(Mutex::new(WebhookQueue::new()))
    }

    #[test]
    fn test_webhook_config_defaults() {
        let defaults = WebhookConfig::default();

        assert!(!defaults.enabled);
        assert!(defaults.token.is_none());
        assert_eq!(defaults.max_payload_bytes, 65_536);
    }

    #[test]
    fn test_webhook_queue() {
        let mut q = WebhookQueue::new();
        assert!(q.is_empty());

        q.enqueue_wake(Some("test".to_string()), None);
        assert!(!q.is_empty());

        q.enqueue_agent_message("hello".to_string(), None, None);

        // Draining hands back both events and empties the queue.
        let events = q.drain();
        assert_eq!(events.len(), 2);
        assert!(q.is_empty());
    }

    #[tokio::test]
    async fn test_webhook_disabled() {
        // The default config has webhooks switched off.
        let config = WebhookConfig::default();

        let (status, _, body) =
            handle_webhook_request("/hooks/wake", "{}", None, &config, empty_queue()).await;

        assert!(status.contains("403"));
        assert!(body.contains("not enabled"));
    }

    #[tokio::test]
    async fn test_webhook_auth_required() {
        let config = enabled_config("secret123");
        let queue = empty_queue();

        // (Authorization header, expected status code)
        let cases = [
            (None, "401"),                     // no auth header
            (Some("Bearer wrong"), "401"),     // wrong token
            (Some("Bearer secret123"), "202"), // correct token
        ];

        for (header, expected) in cases {
            let (status, _, _) =
                handle_webhook_request("/hooks/wake", "{}", header, &config, queue.clone()).await;
            assert!(status.contains(expected));
        }
    }

    #[tokio::test]
    async fn test_webhook_wake() {
        let config = enabled_config("tok");
        let queue = empty_queue();

        let (status, _, resp) = handle_webhook_request(
            "/hooks/wake",
            r#"{"reason": "cron trigger"}"#,
            Some("Bearer tok"),
            &config,
            queue.clone(),
        )
        .await;

        assert!(status.contains("202"));
        assert!(resp.contains("accepted"));

        // The wake event must have landed in the shared queue.
        let guard = queue.lock().await;
        assert!(!guard.is_empty());
    }

    #[tokio::test]
    async fn test_webhook_agent_empty_message() {
        let config = enabled_config("tok");

        let (status, _, _) = handle_webhook_request(
            "/hooks/agent",
            r#"{"message": ""}"#,
            Some("Bearer tok"),
            &config,
            empty_queue(),
        )
        .await;

        assert!(status.contains("400"));
    }
}