Skip to main content

agentzero_channels/channels/
webhook.rs

1use crate::{Channel, ChannelMessage, SendMessage};
2use async_trait::async_trait;
3use std::sync::Arc;
4use tokio::sync::{mpsc, Mutex};
5
6super::channel_meta!(WEBHOOK_DESCRIPTOR, "webhook", "Webhook");
7
8/// Generic HTTP webhook channel.
9/// Messages are injected via `inject_message()` from the gateway webhook handler.
10pub struct WebhookChannel {
11    injector_tx: mpsc::Sender<ChannelMessage>,
12    injector_rx: Arc<Mutex<Option<mpsc::Receiver<ChannelMessage>>>>,
13}
14
15impl WebhookChannel {
16    pub fn new() -> Self {
17        let (tx, rx) = mpsc::channel(256);
18        Self {
19            injector_tx: tx,
20            injector_rx: Arc::new(Mutex::new(Some(rx))),
21        }
22    }
23
24    /// Called by the gateway webhook handler to inject an inbound message.
25    pub async fn inject_message(&self, msg: ChannelMessage) -> anyhow::Result<()> {
26        self.injector_tx
27            .send(msg)
28            .await
29            .map_err(|_| anyhow::anyhow!("webhook channel listener not running"))
30    }
31}
32
33impl Default for WebhookChannel {
34    fn default() -> Self {
35        Self::new()
36    }
37}
38
39#[async_trait]
40impl Channel for WebhookChannel {
41    fn name(&self) -> &str {
42        "webhook"
43    }
44
45    async fn send(&self, _message: &SendMessage) -> anyhow::Result<()> {
46        tracing::debug!("webhook channel send is a no-op (inbound-only)");
47        Ok(())
48    }
49
50    async fn listen(
51        &self,
52        tx: mpsc::Sender<ChannelMessage>,
53    ) -> anyhow::Result<()> {
54        let mut rx = self
55            .injector_rx
56            .lock()
57            .await
58            .take()
59            .ok_or_else(|| anyhow::anyhow!("webhook listener already started"))?;
60
61        while let Some(msg) = rx.recv().await {
62            if tx.send(msg).await.is_err() {
63                break;
64            }
65        }
66        Ok(())
67    }
68}
69
70#[cfg(test)]
71mod tests {
72    use super::*;
73    use crate::channels::helpers;
74
75    #[test]
76    fn webhook_channel_name() {
77        let ch = WebhookChannel::new();
78        assert_eq!(ch.name(), "webhook");
79    }
80
81    #[tokio::test]
82    async fn webhook_inject_and_listen() {
83        let ch = Arc::new(WebhookChannel::new());
84        let (tx, mut rx) = mpsc::channel(16);
85
86        let ch_clone = ch.clone();
87        let listen_handle = tokio::spawn(async move {
88            ch_clone.listen(tx).await.unwrap();
89        });
90
91        let msg = ChannelMessage {
92            id: helpers::new_message_id(),
93            sender: "external".into(),
94            reply_target: "external".into(),
95            content: "webhook payload".into(),
96            channel: "webhook".into(),
97            timestamp: helpers::now_epoch_secs(),
98            thread_ts: None,
99            privacy_boundary: String::new(),
100        };
101
102        ch.inject_message(msg).await.unwrap();
103
104        let received = rx.recv().await.expect("should receive injected message");
105        assert_eq!(received.content, "webhook payload");
106        assert_eq!(received.channel, "webhook");
107
108        // Abort the listener (it would run forever in production)
109        listen_handle.abort();
110    }
111
112    #[tokio::test]
113    async fn webhook_send_is_noop() {
114        let ch = WebhookChannel::new();
115        let msg = SendMessage::new("test", "recipient");
116        assert!(ch.send(&msg).await.is_ok());
117    }
118}