Skip to main content

aagt_core/bus/
message_bus.rs

1use chrono::{DateTime, Utc};
2use serde::{Deserialize, Serialize};
3use tokio::sync::mpsc;
4use std::sync::Arc;
5
6/// Inbound message from external channels (Telegram, CLI, Scheduler, etc.)
7#[derive(Debug, Clone, Serialize, Deserialize)]
8pub struct InboundMessage {
9    /// Source channel (e.g., "telegram", "cli", "scheduler")
10    pub channel: String,
11    /// Sender identifier (user ID, phone number, etc.)
12    pub sender_id: String,
13    /// Chat/conversation identifier
14    pub chat_id: String,
15    /// Message content
16    pub content: String,
17    /// Optional: Media attachments (images, voice, etc.)
18    pub media: Option<Vec<MediaAttachment>>,
19    /// Message timestamp
20    pub timestamp: DateTime<Utc>,
21    /// Session key for conversation tracking
22    pub session_key: String,
23}
24
25impl InboundMessage {
26    /// Create a new inbound message
27    pub fn new(
28        channel: impl Into<String>,
29        sender_id: impl Into<String>,
30        chat_id: impl Into<String>,
31        content: impl Into<String>,
32    ) -> Self {
33        let channel = channel.into();
34        let chat_id = chat_id.into();
35        let session_key = format!("{}:{}", channel, chat_id);
36        
37        Self {
38            channel,
39            sender_id: sender_id.into(),
40            chat_id,
41            content: content.into(),
42            media: None,
43            timestamp: Utc::now(),
44            session_key,
45        }
46    }
47    
48    /// Add media attachment
49    pub fn with_media(mut self, media: Vec<MediaAttachment>) -> Self {
50        self.media = Some(media);
51        self
52    }
53}
54
55/// Outbound message to external channels
56#[derive(Debug, Clone, Serialize, Deserialize)]
57pub struct OutboundMessage {
58    /// Target channel
59    pub channel: String,
60    /// Target chat ID
61    pub chat_id: String,
62    /// Message content
63    pub content: String,
64    /// Optional: Media attachments
65    pub media: Option<Vec<MediaAttachment>>,
66}
67
68impl OutboundMessage {
69    /// Create a new outbound message
70    pub fn new(
71        channel: impl Into<String>,
72        chat_id: impl Into<String>,
73        content: impl Into<String>,
74    ) -> Self {
75        Self {
76            channel: channel.into(),
77            chat_id: chat_id.into(),
78            content: content.into(),
79            media: None,
80        }
81    }
82}
83
84/// Media attachment (images, voice, documents)
85#[derive(Debug, Clone, Serialize, Deserialize)]
86pub struct MediaAttachment {
87    /// Media type
88    pub media_type: MediaType,
89    /// File path or URL
90    pub url: String,
91    /// Optional caption
92    pub caption: Option<String>,
93}
94
95/// Media types
96#[derive(Debug, Clone, Serialize, Deserialize)]
97#[serde(rename_all = "lowercase")]
98pub enum MediaType {
99    Image,
100    Voice,
101    Video,
102    Document,
103}
104
105/// Message Bus - central routing for all messages
106/// 
107/// # Architecture
108/// 
109/// ```text
110/// Telegram ──┐
111/// Discord ───┼──▶ InboundQueue ──▶ Agent ──▶ OutboundQueue ──┐
112/// CLI ───────┤                                               ├──▶ Channels
113/// Scheduler ─┘                                               └──▶ Notifiers
114/// ```
115/// 
116/// # Example
117/// 
118/// ```ignore
119/// let bus = MessageBus::new(100);
120/// 
121/// // Channel publishes message
122/// bus.publish_inbound(InboundMessage::new("telegram", "123", "456", "Hello")).await?;
123/// 
124/// // Agent consumes message
125/// let msg = bus.consume_inbound().await?;
126/// 
127/// // Agent publishes response
128/// bus.publish_outbound(OutboundMessage::new("telegram", "456", "Hi there!")).await?;
129/// 
130/// // Channel consumes response
131/// let response = bus.consume_outbound().await?;
132/// ```
133pub struct MessageBus {
134    inbound_tx: mpsc::Sender<InboundMessage>,
135    inbound_rx: Arc<tokio::sync::Mutex<mpsc::Receiver<InboundMessage>>>,
136    outbound_tx: mpsc::Sender<OutboundMessage>,
137    outbound_rx: Arc<tokio::sync::Mutex<mpsc::Receiver<OutboundMessage>>>,
138}
139
140impl MessageBus {
141    /// Create a new message bus with specified buffer size
142    pub fn new(buffer_size: usize) -> Self {
143        let (inbound_tx, inbound_rx) = mpsc::channel(buffer_size);
144        let (outbound_tx, outbound_rx) = mpsc::channel(buffer_size);
145        
146        Self {
147            inbound_tx,
148            inbound_rx: Arc::new(tokio::sync::Mutex::new(inbound_rx)),
149            outbound_tx,
150            outbound_rx: Arc::new(tokio::sync::Mutex::new(outbound_rx)),
151        }
152    }
153    
154    /// Publish an inbound message (from channel to agent)
155    pub async fn publish_inbound(&self, message: InboundMessage) -> crate::error::Result<()> {
156        self.inbound_tx.send(message).await
157            .map_err(|e| crate::error::Error::Internal(format!("Failed to publish inbound message: {}", e)))
158    }
159    
160    /// Consume an inbound message (agent reads from channels)
161    pub async fn consume_inbound(&self) -> crate::error::Result<InboundMessage> {
162        let mut rx = self.inbound_rx.lock().await;
163        rx.recv().await
164            .ok_or_else(|| crate::error::Error::Internal("Inbound channel closed".to_string()))
165    }
166    
167    /// Publish an outbound message (from agent to channels)
168    pub async fn publish_outbound(&self, message: OutboundMessage) -> crate::error::Result<()> {
169        self.outbound_tx.send(message).await
170            .map_err(|e| crate::error::Error::Internal(format!("Failed to publish outbound message: {}", e)))
171    }
172    
173    /// Consume an outbound message (channels read agent responses)
174    pub async fn consume_outbound(&self) -> crate::error::Result<OutboundMessage> {
175        let mut rx = self.outbound_rx.lock().await;
176        rx.recv().await
177            .ok_or_else(|| crate::error::Error::Internal("Outbound channel closed".to_string()))
178    }
179    
180    /// Get inbound sender (for cloning to multiple publishers)
181    pub fn inbound_sender(&self) -> mpsc::Sender<InboundMessage> {
182        self.inbound_tx.clone()
183    }
184    
185    /// Get outbound sender (for agent to publish responses)
186    pub fn outbound_sender(&self) -> mpsc::Sender<OutboundMessage> {
187        self.outbound_tx.clone()
188    }
189}
190
191impl Clone for MessageBus {
192    fn clone(&self) -> Self {
193        Self {
194            inbound_tx: self.inbound_tx.clone(),
195            inbound_rx: Arc::clone(&self.inbound_rx),
196            outbound_tx: self.outbound_tx.clone(),
197            outbound_rx: Arc::clone(&self.outbound_rx),
198        }
199    }
200}
201
202#[cfg(test)]
203mod tests {
204    use super::*;
205
206    #[tokio::test]
207    async fn test_message_bus_flow() {
208        let bus = MessageBus::new(10);
209        
210        // Simulate channel publishing message
211        let inbound = InboundMessage::new("telegram", "user123", "chat456", "Hello Agent");
212        bus.publish_inbound(inbound.clone()).await.unwrap();
213        
214        // Simulate agent consuming message
215        let received = bus.consume_inbound().await.unwrap();
216        assert_eq!(received.channel, "telegram");
217        assert_eq!(received.content, "Hello Agent");
218        
219        // Simulate agent responding
220        let outbound = OutboundMessage::new("telegram", "chat456", "Hello User");
221        bus.publish_outbound(outbound).await.unwrap();
222        
223        // Simulate channel consuming response
224        let response = bus.consume_outbound().await.unwrap();
225        assert_eq!(response.content, "Hello User");
226    }
227
228    #[tokio::test]
229    async fn test_session_key_generation() {
230        let msg = InboundMessage::new("telegram", "user123", "chat456", "test");
231        assert_eq!(msg.session_key, "telegram:chat456");
232    }
233}