agentzero_channels/channels/
webhook.rs1use crate::{Channel, ChannelMessage, SendMessage};
2use async_trait::async_trait;
3use std::sync::Arc;
4use tokio::sync::{mpsc, Mutex};
5
// Invokes the parent module's `channel_meta!` macro with the identifier
// `WEBHOOK_DESCRIPTOR` and the strings "webhook" and "Webhook".
// NOTE(review): presumably this declares the channel's descriptor (machine
// id + display name) — confirm against the macro definition.
6super::channel_meta!(WEBHOOK_DESCRIPTOR, "webhook", "Webhook");
7
8pub struct WebhookChannel {
11 injector_tx: mpsc::Sender<ChannelMessage>,
12 injector_rx: Arc<Mutex<Option<mpsc::Receiver<ChannelMessage>>>>,
13}
14
15impl WebhookChannel {
16 pub fn new() -> Self {
17 let (tx, rx) = mpsc::channel(256);
18 Self {
19 injector_tx: tx,
20 injector_rx: Arc::new(Mutex::new(Some(rx))),
21 }
22 }
23
24 pub async fn inject_message(&self, msg: ChannelMessage) -> anyhow::Result<()> {
26 self.injector_tx
27 .send(msg)
28 .await
29 .map_err(|_| anyhow::anyhow!("webhook channel listener not running"))
30 }
31}
32
33impl Default for WebhookChannel {
34 fn default() -> Self {
35 Self::new()
36 }
37}
38
39#[async_trait]
40impl Channel for WebhookChannel {
41 fn name(&self) -> &str {
42 "webhook"
43 }
44
45 async fn send(&self, _message: &SendMessage) -> anyhow::Result<()> {
46 tracing::debug!("webhook channel send is a no-op (inbound-only)");
47 Ok(())
48 }
49
50 async fn listen(
51 &self,
52 tx: mpsc::Sender<ChannelMessage>,
53 ) -> anyhow::Result<()> {
54 let mut rx = self
55 .injector_rx
56 .lock()
57 .await
58 .take()
59 .ok_or_else(|| anyhow::anyhow!("webhook listener already started"))?;
60
61 while let Some(msg) = rx.recv().await {
62 if tx.send(msg).await.is_err() {
63 break;
64 }
65 }
66 Ok(())
67 }
68}
69
#[cfg(test)]
mod tests {
    use super::*;
    use crate::channels::helpers;

    /// Builds the inbound message used by the round-trip test.
    fn sample_message() -> ChannelMessage {
        ChannelMessage {
            id: helpers::new_message_id(),
            sender: "external".into(),
            reply_target: "external".into(),
            content: "webhook payload".into(),
            channel: "webhook".into(),
            timestamp: helpers::now_epoch_secs(),
            thread_ts: None,
            privacy_boundary: String::new(),
        }
    }

    /// The channel reports its fixed identifier.
    #[test]
    fn webhook_channel_name() {
        assert_eq!(WebhookChannel::new().name(), "webhook");
    }

    /// A message injected into the channel comes out of the listener's sink.
    #[tokio::test]
    async fn webhook_inject_and_listen() {
        let channel = Arc::new(WebhookChannel::new());
        let (forward_tx, mut forward_rx) = mpsc::channel(16);

        let listener = {
            let channel = Arc::clone(&channel);
            tokio::spawn(async move {
                channel.listen(forward_tx).await.unwrap();
            })
        };

        channel.inject_message(sample_message()).await.unwrap();

        let received = forward_rx
            .recv()
            .await
            .expect("should receive injected message");
        assert_eq!(received.content, "webhook payload");
        assert_eq!(received.channel, "webhook");

        // The listener task is still waiting on the internal queue; cancel it.
        listener.abort();
    }

    /// Sending through the inbound-only channel succeeds.
    #[tokio::test]
    async fn webhook_send_is_noop() {
        let channel = WebhookChannel::new();
        let outbound = SendMessage::new("test", "recipient");
        assert!(channel.send(&outbound).await.is_ok());
    }
}