Skip to main content

agentzero_channels/
lib.rs

//! Platform integrations for AgentZero.
//!
//! Implements Telegram, Discord, and Slack channel adapters that bridge
//! chat messages to the agent loop. Includes command parsing, reaction
//! acknowledgement, and leak-guard middleware for sensitive data filtering.
6
7pub mod ack_reactions;
8mod channels;
9pub mod commands;
10pub mod drafts;
11pub mod group_reply;
12pub mod image_markers;
13pub mod interruption;
14pub mod leak_guard;
15pub mod outbound;
16pub mod pipeline;
17
18pub use channels::channel_setup::{register_configured_channels, ChannelInstanceConfig};
19pub use channels::CHANNEL_CATALOG;
20
21use async_trait::async_trait;
22use serde::{Deserialize, Serialize};
23use std::collections::HashMap;
24use std::sync::Arc;
25
26// Re-export channel implementations that need public access
27pub use channels::{CliChannel, WebhookChannel};
28
29// ---------------------------------------------------------------------------
30// Message types
31// ---------------------------------------------------------------------------
32
33/// A message received from a channel (inbound).
34#[derive(Debug, Clone, Serialize, Deserialize)]
35pub struct ChannelMessage {
36    pub id: String,
37    pub sender: String,
38    pub reply_target: String,
39    pub content: String,
40    pub channel: String,
41    pub timestamp: u64,
42    pub thread_ts: Option<String>,
43    /// Privacy boundary inherited from the channel configuration.
44    /// Empty string means inherit the global privacy mode.
45    #[serde(default)]
46    pub privacy_boundary: String,
47}
48
49/// A message to send through a channel (outbound).
50#[derive(Debug, Clone, Serialize, Deserialize)]
51pub struct SendMessage {
52    pub content: String,
53    pub recipient: String,
54    pub subject: Option<String>,
55    pub thread_ts: Option<String>,
56}
57
58impl SendMessage {
59    pub fn new(content: impl Into<String>, recipient: impl Into<String>) -> Self {
60        Self {
61            content: content.into(),
62            recipient: recipient.into(),
63            subject: None,
64            thread_ts: None,
65        }
66    }
67
68    pub fn with_subject(
69        content: impl Into<String>,
70        recipient: impl Into<String>,
71        subject: impl Into<String>,
72    ) -> Self {
73        Self {
74            content: content.into(),
75            recipient: recipient.into(),
76            subject: Some(subject.into()),
77            thread_ts: None,
78        }
79    }
80
81    pub fn in_thread(mut self, thread_ts: Option<String>) -> Self {
82        self.thread_ts = thread_ts;
83        self
84    }
85}
86
87// ---------------------------------------------------------------------------
88// Channel trait
89// ---------------------------------------------------------------------------
90
91/// Core channel trait — implement for any messaging platform.
92#[async_trait]
93pub trait Channel: Send + Sync {
94    fn name(&self) -> &str;
95
96    async fn send(&self, message: &SendMessage) -> anyhow::Result<()>;
97
98    async fn listen(&self, tx: tokio::sync::mpsc::Sender<ChannelMessage>) -> anyhow::Result<()>;
99
100    async fn health_check(&self) -> bool {
101        true
102    }
103
104    async fn start_typing(&self, _recipient: &str) -> anyhow::Result<()> {
105        Ok(())
106    }
107
108    async fn stop_typing(&self, _recipient: &str) -> anyhow::Result<()> {
109        Ok(())
110    }
111
112    fn supports_draft_updates(&self) -> bool {
113        false
114    }
115
116    async fn send_draft(&self, _message: &SendMessage) -> anyhow::Result<Option<String>> {
117        Ok(None)
118    }
119
120    async fn update_draft(
121        &self,
122        _recipient: &str,
123        _message_id: &str,
124        _text: &str,
125    ) -> anyhow::Result<Option<String>> {
126        Ok(None)
127    }
128
129    async fn finalize_draft(
130        &self,
131        _recipient: &str,
132        _message_id: &str,
133        _text: &str,
134    ) -> anyhow::Result<()> {
135        Ok(())
136    }
137
138    async fn cancel_draft(&self, _recipient: &str, _message_id: &str) -> anyhow::Result<()> {
139        Ok(())
140    }
141
142    async fn add_reaction(
143        &self,
144        _channel_id: &str,
145        _message_id: &str,
146        _emoji: &str,
147    ) -> anyhow::Result<()> {
148        Ok(())
149    }
150
151    async fn remove_reaction(
152        &self,
153        _channel_id: &str,
154        _message_id: &str,
155        _emoji: &str,
156    ) -> anyhow::Result<()> {
157        Ok(())
158    }
159}
160
161// ---------------------------------------------------------------------------
162// Channel descriptor & catalog
163// ---------------------------------------------------------------------------
164
165#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
166pub struct ChannelDescriptor {
167    pub id: &'static str,
168    pub display_name: &'static str,
169}
170
171pub fn channel_catalog() -> &'static [ChannelDescriptor] {
172    CHANNEL_CATALOG
173}
174
175pub fn normalize_channel_id(input: &str) -> Option<&'static str> {
176    let needle = input.trim();
177    if needle.is_empty() {
178        return None;
179    }
180
181    for channel in CHANNEL_CATALOG {
182        if channel.id.eq_ignore_ascii_case(needle)
183            || channel
184                .display_name
185                .replace(' ', "-")
186                .eq_ignore_ascii_case(needle)
187            || channel.display_name.eq_ignore_ascii_case(needle)
188        {
189            return Some(channel.id);
190        }
191    }
192
193    None
194}
195
196// ---------------------------------------------------------------------------
197// Delivery types (gateway webhook compatibility)
198// ---------------------------------------------------------------------------
199
200#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
201pub struct ChannelDelivery {
202    pub accepted: bool,
203    pub channel: String,
204    pub detail: String,
205}
206
207// ---------------------------------------------------------------------------
208// Channel locality
209// ---------------------------------------------------------------------------
210
/// Names of the channels treated as local, i.e. producing no network egress.
const LOCAL_CHANNELS: &[&str] = &["cli", "transcription"];

/// Reports whether `name` is one of the local channels.
///
/// The comparison is exact (case-sensitive, no trimming) against
/// `LOCAL_CHANNELS`.
pub fn is_local_channel(name: &str) -> bool {
    LOCAL_CHANNELS.iter().any(|&local| local == name)
}
218
219// ---------------------------------------------------------------------------
220// Channel registry
221// ---------------------------------------------------------------------------
222
223#[derive(Default)]
224pub struct ChannelRegistry {
225    channels: HashMap<String, Arc<dyn Channel>>,
226}
227
228impl ChannelRegistry {
229    pub fn new() -> Self {
230        Self::default()
231    }
232
233    pub fn with_builtin_handlers() -> Self {
234        let mut registry = Self::new();
235        registry.register(Arc::new(CliChannel));
236        registry
237    }
238
239    pub fn register(&mut self, channel: Arc<dyn Channel>) {
240        self.channels.insert(channel.name().to_string(), channel);
241    }
242
243    pub fn get(&self, name: &str) -> Option<Arc<dyn Channel>> {
244        self.channels.get(name).cloned()
245    }
246
247    pub fn has_channel(&self, name: &str) -> bool {
248        self.channels.contains_key(name)
249    }
250
251    pub fn channel_names(&self) -> Vec<&str> {
252        self.channels.keys().map(String::as_str).collect()
253    }
254
255    pub fn all_channels(&self) -> Vec<Arc<dyn Channel>> {
256        self.channels.values().cloned().collect()
257    }
258
259    /// Dispatch a message to a channel. If `boundary` is `"local_only"`, only
260    /// local channels (CLI, transcription) are allowed; non-local targets are
261    /// rejected with `accepted: false`.
262    pub async fn dispatch(
263        &self,
264        channel: &str,
265        payload: serde_json::Value,
266    ) -> Option<ChannelDelivery> {
267        self.dispatch_with_boundary(channel, payload, "").await
268    }
269
270    /// Dispatch with an explicit privacy boundary check.
271    pub async fn dispatch_with_boundary(
272        &self,
273        channel: &str,
274        payload: serde_json::Value,
275        boundary: &str,
276    ) -> Option<ChannelDelivery> {
277        let ch = self.channels.get(channel)?;
278
279        // Enforce privacy boundary: local_only blocks non-local channels.
280        if boundary == "local_only" && !is_local_channel(channel) {
281            return Some(ChannelDelivery {
282                accepted: false,
283                channel: channel.to_string(),
284                detail: "blocked by local_only privacy boundary".to_string(),
285            });
286        }
287
288        let content = payload
289            .get("text")
290            .or_else(|| payload.get("content"))
291            .or_else(|| payload.get("message"))
292            .and_then(|v| v.as_str())
293            .unwrap_or("")
294            .to_string();
295        let recipient = payload
296            .get("recipient")
297            .or_else(|| payload.get("channel_id"))
298            .and_then(|v| v.as_str())
299            .unwrap_or("default")
300            .to_string();
301
302        let msg = SendMessage::new(content, recipient);
303        match ch.send(&msg).await {
304            Ok(()) => Some(ChannelDelivery {
305                accepted: true,
306                channel: channel.to_string(),
307                detail: "message sent".to_string(),
308            }),
309            Err(e) => Some(ChannelDelivery {
310                accepted: false,
311                channel: channel.to_string(),
312                detail: format!("send failed: {e}"),
313            }),
314        }
315    }
316}
317
318// ---------------------------------------------------------------------------
319// Tests
320// ---------------------------------------------------------------------------
321
#[cfg(test)]
mod tests {
    use super::*;

    /// Test double: a channel named "echo" whose sends always succeed and
    /// whose `listen` emits one fixed message.
    struct EchoChannel;

    #[async_trait]
    impl Channel for EchoChannel {
        fn name(&self) -> &str {
            "echo"
        }

        async fn send(&self, _message: &SendMessage) -> anyhow::Result<()> {
            Ok(())
        }

        async fn listen(
            &self,
            tx: tokio::sync::mpsc::Sender<ChannelMessage>,
        ) -> anyhow::Result<()> {
            let greeting = ChannelMessage {
                id: "1".into(),
                sender: "tester".into(),
                reply_target: "tester".into(),
                content: "hello".into(),
                channel: "echo".into(),
                timestamp: 123,
                thread_ts: None,
                privacy_boundary: String::new(),
            };
            tx.send(greeting)
                .await
                .map_err(|e| anyhow::anyhow!(e.to_string()))
        }
    }

    /// Registry containing only the `EchoChannel` test double.
    fn echo_registry() -> ChannelRegistry {
        let mut registry = ChannelRegistry::new();
        registry.register(Arc::new(EchoChannel));
        registry
    }

    #[test]
    fn send_message_builder_success_path() {
        let plain = SendMessage::new("hello", "user-1");
        assert_eq!(plain.content, "hello");
        assert_eq!(plain.recipient, "user-1");
        assert!(plain.subject.is_none());
        assert!(plain.thread_ts.is_none());

        let in_thread = plain.in_thread(Some("ts-123".into()));
        assert_eq!(in_thread.thread_ts.as_deref(), Some("ts-123"));
    }

    #[test]
    fn send_message_with_subject_success_path() {
        let with_subject = SendMessage::with_subject("body", "user", "subject line");
        assert_eq!(with_subject.subject.as_deref(), Some("subject line"));
    }

    #[test]
    fn channel_message_serde_round_trip_success_path() {
        let original = ChannelMessage {
            id: "42".into(),
            sender: "alice".into(),
            reply_target: "alice".into(),
            content: "ping".into(),
            channel: "test".into(),
            timestamp: 999,
            thread_ts: Some("thread-1".into()),
            privacy_boundary: String::new(),
        };

        let encoded = serde_json::to_string(&original).expect("serialize should succeed");
        let decoded: ChannelMessage =
            serde_json::from_str(&encoded).expect("deserialize should succeed");
        assert_eq!(decoded.id, "42");
        assert_eq!(decoded.sender, "alice");
        assert_eq!(decoded.thread_ts.as_deref(), Some("thread-1"));
    }

    #[tokio::test]
    async fn default_trait_methods_return_success() {
        let echo = EchoChannel;
        assert!(echo.health_check().await);
        assert!(echo.start_typing("bob").await.is_ok());
        assert!(echo.stop_typing("bob").await.is_ok());
        assert!(!echo.supports_draft_updates());

        let draft_id = echo
            .send_draft(&SendMessage::new("draft", "bob"))
            .await
            .unwrap();
        assert!(draft_id.is_none());
    }

    #[tokio::test]
    async fn listen_sends_message_to_channel() {
        let echo = EchoChannel;
        let (sender, mut receiver) = tokio::sync::mpsc::channel(1);

        echo.listen(sender).await.unwrap();

        let inbound = receiver.recv().await.expect("message should be received");
        assert_eq!(inbound.sender, "tester");
        assert_eq!(inbound.content, "hello");
    }

    #[test]
    fn registry_register_and_get_success_path() {
        let registry = echo_registry();

        assert!(registry.has_channel("echo"));
        assert!(!registry.has_channel("missing"));
        assert!(registry.get("echo").is_some());
    }

    #[tokio::test]
    async fn registry_dispatch_success_path() {
        let registry = echo_registry();

        let payload = serde_json::json!({"text": "hello"});
        let outcome = registry
            .dispatch("echo", payload)
            .await
            .expect("dispatch should find channel");

        assert!(outcome.accepted);
        assert_eq!(outcome.channel, "echo");
    }

    #[tokio::test]
    async fn registry_dispatch_unknown_returns_none() {
        let empty = ChannelRegistry::new();
        let payload = serde_json::json!({"text": "hello"});
        let outcome = empty.dispatch("missing", payload).await;
        assert!(outcome.is_none());
    }

    #[test]
    fn normalize_channel_id_success_path() {
        assert_eq!(normalize_channel_id("telegram"), Some("telegram"));
        assert_eq!(normalize_channel_id("Telegram"), Some("telegram"));
        assert_eq!(
            normalize_channel_id("NextCloud Talk"),
            Some("nextcloud-talk")
        );
    }

    #[test]
    fn normalize_channel_id_unknown_returns_none() {
        assert_eq!(normalize_channel_id("missing-channel"), None);
    }

    #[test]
    fn channel_catalog_contains_known_entries() {
        let entries = channel_catalog();
        assert!(!entries.is_empty());
        let known_ids: Vec<&str> = entries.iter().map(|entry| entry.id).collect();
        assert!(known_ids.contains(&"cli"));
        assert!(known_ids.contains(&"telegram"));
        assert!(known_ids.contains(&"webhook"));
    }

    #[test]
    fn builtin_registry_has_cli_channel() {
        let builtins = ChannelRegistry::with_builtin_handlers();
        assert!(builtins.has_channel("cli"));
    }

    // --- Phase 2: Channel privacy boundary tests ---

    #[test]
    fn channel_message_serde_backward_compat_without_privacy_boundary() {
        // Old JSON without privacy_boundary should deserialize with default empty string.
        let legacy = r#"{"id":"1","sender":"a","reply_target":"a","content":"hi","channel":"cli","timestamp":0}"#;
        let decoded: ChannelMessage =
            serde_json::from_str(legacy).expect("deserialize old format");
        assert_eq!(decoded.privacy_boundary, "");
    }

    #[test]
    fn channel_message_serde_with_privacy_boundary() {
        let original = ChannelMessage {
            id: "1".into(),
            sender: "a".into(),
            reply_target: "a".into(),
            content: "hi".into(),
            channel: "cli".into(),
            timestamp: 0,
            thread_ts: None,
            privacy_boundary: "local_only".to_string(),
        };
        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: ChannelMessage = serde_json::from_str(&encoded).unwrap();
        assert_eq!(decoded.privacy_boundary, "local_only");
    }

    #[test]
    fn is_local_channel_cli_and_transcription() {
        assert!(is_local_channel("cli"));
        assert!(is_local_channel("transcription"));
    }

    #[test]
    fn is_local_channel_non_local() {
        assert!(!is_local_channel("telegram"));
        assert!(!is_local_channel("discord"));
        assert!(!is_local_channel("slack"));
        assert!(!is_local_channel("webhook"));
        assert!(!is_local_channel("email"));
    }

    #[tokio::test]
    async fn dispatch_local_only_blocks_non_local_channel() {
        let registry = echo_registry(); // "echo" is non-local
        let payload = serde_json::json!({"text": "secret"});
        let outcome = registry
            .dispatch_with_boundary("echo", payload, "local_only")
            .await
            .expect("should return delivery");
        assert!(!outcome.accepted);
        assert!(outcome.detail.contains("local_only"));
    }

    #[tokio::test]
    async fn dispatch_local_only_allows_local_channel() {
        // CLI is local, so local_only should allow it.
        let builtins = ChannelRegistry::with_builtin_handlers();
        let payload = serde_json::json!({"text": "hello"});
        let outcome = builtins
            .dispatch_with_boundary("cli", payload, "local_only")
            .await
            .expect("should return delivery");
        assert!(outcome.accepted);
    }

    #[tokio::test]
    async fn dispatch_any_boundary_allows_all() {
        let registry = echo_registry();
        let payload = serde_json::json!({"text": "hello"});
        let outcome = registry
            .dispatch_with_boundary("echo", payload, "any")
            .await
            .expect("should return delivery");
        assert!(outcome.accepted);
    }

    #[tokio::test]
    async fn dispatch_empty_boundary_allows_all() {
        let registry = echo_registry();
        let payload = serde_json::json!({"text": "hello"});
        let outcome = registry
            .dispatch_with_boundary("echo", payload, "")
            .await
            .expect("should return delivery");
        assert!(outcome.accepted);
    }

    #[tokio::test]
    async fn dispatch_encrypted_only_allows_non_local() {
        let registry = echo_registry();
        let payload = serde_json::json!({"text": "hello"});
        let outcome = registry
            .dispatch_with_boundary("echo", payload, "encrypted_only")
            .await
            .expect("should return delivery");
        assert!(outcome.accepted);
    }
}