1pub mod ack_reactions;
8mod channels;
9pub mod commands;
10pub mod drafts;
11pub mod group_reply;
12pub mod image_markers;
13pub mod interruption;
14pub mod leak_guard;
15pub mod outbound;
16pub mod pipeline;
17
18pub use channels::channel_setup::{register_configured_channels, ChannelInstanceConfig};
19pub use channels::CHANNEL_CATALOG;
20
21use async_trait::async_trait;
22use serde::{Deserialize, Serialize};
23use std::collections::HashMap;
24use std::sync::Arc;
25
26pub use channels::{CliChannel, WebhookChannel};
28
29#[derive(Debug, Clone, Serialize, Deserialize)]
35pub struct ChannelMessage {
36 pub id: String,
37 pub sender: String,
38 pub reply_target: String,
39 pub content: String,
40 pub channel: String,
41 pub timestamp: u64,
42 pub thread_ts: Option<String>,
43 #[serde(default)]
46 pub privacy_boundary: String,
47}
48
49#[derive(Debug, Clone, Serialize, Deserialize)]
51pub struct SendMessage {
52 pub content: String,
53 pub recipient: String,
54 pub subject: Option<String>,
55 pub thread_ts: Option<String>,
56}
57
58impl SendMessage {
59 pub fn new(content: impl Into<String>, recipient: impl Into<String>) -> Self {
60 Self {
61 content: content.into(),
62 recipient: recipient.into(),
63 subject: None,
64 thread_ts: None,
65 }
66 }
67
68 pub fn with_subject(
69 content: impl Into<String>,
70 recipient: impl Into<String>,
71 subject: impl Into<String>,
72 ) -> Self {
73 Self {
74 content: content.into(),
75 recipient: recipient.into(),
76 subject: Some(subject.into()),
77 thread_ts: None,
78 }
79 }
80
81 pub fn in_thread(mut self, thread_ts: Option<String>) -> Self {
82 self.thread_ts = thread_ts;
83 self
84 }
85}
86
87#[async_trait]
93pub trait Channel: Send + Sync {
94 fn name(&self) -> &str;
95
96 async fn send(&self, message: &SendMessage) -> anyhow::Result<()>;
97
98 async fn listen(&self, tx: tokio::sync::mpsc::Sender<ChannelMessage>) -> anyhow::Result<()>;
99
100 async fn health_check(&self) -> bool {
101 true
102 }
103
104 async fn start_typing(&self, _recipient: &str) -> anyhow::Result<()> {
105 Ok(())
106 }
107
108 async fn stop_typing(&self, _recipient: &str) -> anyhow::Result<()> {
109 Ok(())
110 }
111
112 fn supports_draft_updates(&self) -> bool {
113 false
114 }
115
116 async fn send_draft(&self, _message: &SendMessage) -> anyhow::Result<Option<String>> {
117 Ok(None)
118 }
119
120 async fn update_draft(
121 &self,
122 _recipient: &str,
123 _message_id: &str,
124 _text: &str,
125 ) -> anyhow::Result<Option<String>> {
126 Ok(None)
127 }
128
129 async fn finalize_draft(
130 &self,
131 _recipient: &str,
132 _message_id: &str,
133 _text: &str,
134 ) -> anyhow::Result<()> {
135 Ok(())
136 }
137
138 async fn cancel_draft(&self, _recipient: &str, _message_id: &str) -> anyhow::Result<()> {
139 Ok(())
140 }
141
142 async fn add_reaction(
143 &self,
144 _channel_id: &str,
145 _message_id: &str,
146 _emoji: &str,
147 ) -> anyhow::Result<()> {
148 Ok(())
149 }
150
151 async fn remove_reaction(
152 &self,
153 _channel_id: &str,
154 _message_id: &str,
155 _emoji: &str,
156 ) -> anyhow::Result<()> {
157 Ok(())
158 }
159}
160
161#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
166pub struct ChannelDescriptor {
167 pub id: &'static str,
168 pub display_name: &'static str,
169}
170
171pub fn channel_catalog() -> &'static [ChannelDescriptor] {
172 CHANNEL_CATALOG
173}
174
175pub fn normalize_channel_id(input: &str) -> Option<&'static str> {
176 let needle = input.trim();
177 if needle.is_empty() {
178 return None;
179 }
180
181 for channel in CHANNEL_CATALOG {
182 if channel.id.eq_ignore_ascii_case(needle)
183 || channel
184 .display_name
185 .replace(' ', "-")
186 .eq_ignore_ascii_case(needle)
187 || channel.display_name.eq_ignore_ascii_case(needle)
188 {
189 return Some(channel.id);
190 }
191 }
192
193 None
194}
195
196#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
201pub struct ChannelDelivery {
202 pub accepted: bool,
203 pub channel: String,
204 pub detail: String,
205}
206
/// Channel names treated as local by [`is_local_channel`].
const LOCAL_CHANNELS: &[&str] = &["cli", "transcription"];

/// Returns `true` when `name` is one of the local channels.
///
/// The comparison is exact: no trimming and no case folding is applied.
pub fn is_local_channel(name: &str) -> bool {
    LOCAL_CHANNELS.iter().any(|local| *local == name)
}
218
219#[derive(Default)]
224pub struct ChannelRegistry {
225 channels: HashMap<String, Arc<dyn Channel>>,
226}
227
228impl ChannelRegistry {
229 pub fn new() -> Self {
230 Self::default()
231 }
232
233 pub fn with_builtin_handlers() -> Self {
234 let mut registry = Self::new();
235 registry.register(Arc::new(CliChannel));
236 registry
237 }
238
239 pub fn register(&mut self, channel: Arc<dyn Channel>) {
240 self.channels.insert(channel.name().to_string(), channel);
241 }
242
243 pub fn get(&self, name: &str) -> Option<Arc<dyn Channel>> {
244 self.channels.get(name).cloned()
245 }
246
247 pub fn has_channel(&self, name: &str) -> bool {
248 self.channels.contains_key(name)
249 }
250
251 pub fn channel_names(&self) -> Vec<&str> {
252 self.channels.keys().map(String::as_str).collect()
253 }
254
255 pub fn all_channels(&self) -> Vec<Arc<dyn Channel>> {
256 self.channels.values().cloned().collect()
257 }
258
259 pub async fn dispatch(
263 &self,
264 channel: &str,
265 payload: serde_json::Value,
266 ) -> Option<ChannelDelivery> {
267 self.dispatch_with_boundary(channel, payload, "").await
268 }
269
270 pub async fn dispatch_with_boundary(
272 &self,
273 channel: &str,
274 payload: serde_json::Value,
275 boundary: &str,
276 ) -> Option<ChannelDelivery> {
277 let ch = self.channels.get(channel)?;
278
279 if boundary == "local_only" && !is_local_channel(channel) {
281 return Some(ChannelDelivery {
282 accepted: false,
283 channel: channel.to_string(),
284 detail: "blocked by local_only privacy boundary".to_string(),
285 });
286 }
287
288 let content = payload
289 .get("text")
290 .or_else(|| payload.get("content"))
291 .or_else(|| payload.get("message"))
292 .and_then(|v| v.as_str())
293 .unwrap_or("")
294 .to_string();
295 let recipient = payload
296 .get("recipient")
297 .or_else(|| payload.get("channel_id"))
298 .and_then(|v| v.as_str())
299 .unwrap_or("default")
300 .to_string();
301
302 let msg = SendMessage::new(content, recipient);
303 match ch.send(&msg).await {
304 Ok(()) => Some(ChannelDelivery {
305 accepted: true,
306 channel: channel.to_string(),
307 detail: "message sent".to_string(),
308 }),
309 Err(e) => Some(ChannelDelivery {
310 accepted: false,
311 channel: channel.to_string(),
312 detail: format!("send failed: {e}"),
313 }),
314 }
315 }
316}
317
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal channel for the tests: `send` always succeeds and `listen`
    /// emits one fixed message.
    struct EchoChannel;

    #[async_trait]
    impl Channel for EchoChannel {
        fn name(&self) -> &str {
            "echo"
        }

        async fn send(&self, _message: &SendMessage) -> anyhow::Result<()> {
            Ok(())
        }

        async fn listen(
            &self,
            tx: tokio::sync::mpsc::Sender<ChannelMessage>,
        ) -> anyhow::Result<()> {
            let greeting = ChannelMessage {
                id: "1".into(),
                sender: "tester".into(),
                reply_target: "tester".into(),
                content: "hello".into(),
                channel: "echo".into(),
                timestamp: 123,
                thread_ts: None,
                privacy_boundary: String::new(),
            };
            tx.send(greeting)
                .await
                .map_err(|e| anyhow::anyhow!(e.to_string()))
        }
    }

    /// Registry holding only the [`EchoChannel`].
    fn echo_registry() -> ChannelRegistry {
        let mut registry = ChannelRegistry::new();
        registry.register(Arc::new(EchoChannel));
        registry
    }

    /// Dispatches `{"text": text}` to `channel` under `boundary` and unwraps
    /// the delivery, panicking when the channel is not registered.
    async fn dispatch_text(
        registry: &ChannelRegistry,
        channel: &str,
        text: &str,
        boundary: &str,
    ) -> ChannelDelivery {
        registry
            .dispatch_with_boundary(channel, serde_json::json!({ "text": text }), boundary)
            .await
            .expect("should return delivery")
    }

    #[test]
    fn send_message_builder_success_path() {
        let plain = SendMessage::new("hello", "user-1");
        assert_eq!(plain.content, "hello");
        assert_eq!(plain.recipient, "user-1");
        assert_eq!(plain.subject, None);
        assert_eq!(plain.thread_ts, None);

        let threaded = plain.in_thread(Some("ts-123".into()));
        assert_eq!(threaded.thread_ts, Some("ts-123".to_string()));
    }

    #[test]
    fn send_message_with_subject_success_path() {
        let with_subject = SendMessage::with_subject("body", "user", "subject line");
        assert_eq!(with_subject.subject, Some("subject line".to_string()));
    }

    #[test]
    fn channel_message_serde_round_trip_success_path() {
        let original = ChannelMessage {
            id: "42".into(),
            sender: "alice".into(),
            reply_target: "alice".into(),
            content: "ping".into(),
            channel: "test".into(),
            timestamp: 999,
            thread_ts: Some("thread-1".into()),
            privacy_boundary: String::new(),
        };

        let encoded = serde_json::to_string(&original).expect("serialize should succeed");
        let decoded: ChannelMessage =
            serde_json::from_str(&encoded).expect("deserialize should succeed");

        assert_eq!(decoded.id, "42");
        assert_eq!(decoded.sender, "alice");
        assert_eq!(decoded.thread_ts, Some("thread-1".to_string()));
    }

    #[tokio::test]
    async fn default_trait_methods_return_success() {
        let echo = EchoChannel;

        assert!(echo.health_check().await);
        assert!(echo.start_typing("bob").await.is_ok());
        assert!(echo.stop_typing("bob").await.is_ok());
        assert!(!echo.supports_draft_updates());

        let draft_id = echo
            .send_draft(&SendMessage::new("draft", "bob"))
            .await
            .unwrap();
        assert!(draft_id.is_none());
    }

    #[tokio::test]
    async fn listen_sends_message_to_channel() {
        let (tx, mut rx) = tokio::sync::mpsc::channel(1);

        EchoChannel.listen(tx).await.unwrap();

        let received = rx.recv().await.expect("message should be received");
        assert_eq!(received.sender, "tester");
        assert_eq!(received.content, "hello");
    }

    #[test]
    fn registry_register_and_get_success_path() {
        let registry = echo_registry();

        assert!(registry.has_channel("echo"));
        assert!(!registry.has_channel("missing"));
        assert!(registry.get("echo").is_some());
    }

    #[tokio::test]
    async fn registry_dispatch_success_path() {
        let registry = echo_registry();

        let delivery = registry
            .dispatch("echo", serde_json::json!({"text": "hello"}))
            .await
            .expect("dispatch should find channel");

        assert!(delivery.accepted);
        assert_eq!(delivery.channel, "echo");
    }

    #[tokio::test]
    async fn registry_dispatch_unknown_returns_none() {
        let empty = ChannelRegistry::new();
        let outcome = empty
            .dispatch("missing", serde_json::json!({"text": "hello"}))
            .await;
        assert!(outcome.is_none());
    }

    #[test]
    fn normalize_channel_id_success_path() {
        let cases = [
            ("telegram", "telegram"),
            ("Telegram", "telegram"),
            ("NextCloud Talk", "nextcloud-talk"),
        ];
        for (input, expected) in cases.iter() {
            assert_eq!(normalize_channel_id(input), Some(*expected));
        }
    }

    #[test]
    fn normalize_channel_id_unknown_returns_none() {
        assert_eq!(normalize_channel_id("missing-channel"), None);
    }

    #[test]
    fn channel_catalog_contains_known_entries() {
        let catalog = channel_catalog();
        assert!(!catalog.is_empty());

        for expected in &["cli", "telegram", "webhook"] {
            assert!(catalog.iter().any(|entry| entry.id == *expected));
        }
    }

    #[test]
    fn builtin_registry_has_cli_channel() {
        assert!(ChannelRegistry::with_builtin_handlers().has_channel("cli"));
    }

    #[test]
    fn channel_message_serde_backward_compat_without_privacy_boundary() {
        // Payload in the shape used before `privacy_boundary` existed.
        let legacy = r#"{"id":"1","sender":"a","reply_target":"a","content":"hi","channel":"cli","timestamp":0}"#;
        let decoded: ChannelMessage =
            serde_json::from_str(legacy).expect("deserialize old format");
        assert_eq!(decoded.privacy_boundary, "");
    }

    #[test]
    fn channel_message_serde_with_privacy_boundary() {
        let original = ChannelMessage {
            id: "1".into(),
            sender: "a".into(),
            reply_target: "a".into(),
            content: "hi".into(),
            channel: "cli".into(),
            timestamp: 0,
            thread_ts: None,
            privacy_boundary: "local_only".to_string(),
        };
        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: ChannelMessage = serde_json::from_str(&encoded).unwrap();
        assert_eq!(decoded.privacy_boundary, "local_only");
    }

    #[test]
    fn is_local_channel_cli_and_transcription() {
        for name in &["cli", "transcription"] {
            assert!(is_local_channel(name));
        }
    }

    #[test]
    fn is_local_channel_non_local() {
        for name in &["telegram", "discord", "slack", "webhook", "email"] {
            assert!(!is_local_channel(name));
        }
    }

    #[tokio::test]
    async fn dispatch_local_only_blocks_non_local_channel() {
        let registry = echo_registry();
        let delivery = dispatch_text(&registry, "echo", "secret", "local_only").await;
        assert!(!delivery.accepted);
        assert!(delivery.detail.contains("local_only"));
    }

    #[tokio::test]
    async fn dispatch_local_only_allows_local_channel() {
        let registry = ChannelRegistry::with_builtin_handlers();
        let delivery = dispatch_text(&registry, "cli", "hello", "local_only").await;
        assert!(delivery.accepted);
    }

    #[tokio::test]
    async fn dispatch_any_boundary_allows_all() {
        let registry = echo_registry();
        let delivery = dispatch_text(&registry, "echo", "hello", "any").await;
        assert!(delivery.accepted);
    }

    #[tokio::test]
    async fn dispatch_empty_boundary_allows_all() {
        let registry = echo_registry();
        let delivery = dispatch_text(&registry, "echo", "hello", "").await;
        assert!(delivery.accepted);
    }

    #[tokio::test]
    async fn dispatch_encrypted_only_allows_non_local() {
        let registry = echo_registry();
        let delivery = dispatch_text(&registry, "echo", "hello", "encrypted_only").await;
        assert!(delivery.accepted);
    }
}