aagt_core/bus/
message_bus.rs1use chrono::{DateTime, Utc};
2use serde::{Deserialize, Serialize};
3use tokio::sync::mpsc;
4use std::sync::Arc;
5
6#[derive(Debug, Clone, Serialize, Deserialize)]
8pub struct InboundMessage {
9 pub channel: String,
11 pub sender_id: String,
13 pub chat_id: String,
15 pub content: String,
17 pub media: Option<Vec<MediaAttachment>>,
19 pub timestamp: DateTime<Utc>,
21 pub session_key: String,
23}
24
25impl InboundMessage {
26 pub fn new(
28 channel: impl Into<String>,
29 sender_id: impl Into<String>,
30 chat_id: impl Into<String>,
31 content: impl Into<String>,
32 ) -> Self {
33 let channel = channel.into();
34 let chat_id = chat_id.into();
35 let session_key = format!("{}:{}", channel, chat_id);
36
37 Self {
38 channel,
39 sender_id: sender_id.into(),
40 chat_id,
41 content: content.into(),
42 media: None,
43 timestamp: Utc::now(),
44 session_key,
45 }
46 }
47
48 pub fn with_media(mut self, media: Vec<MediaAttachment>) -> Self {
50 self.media = Some(media);
51 self
52 }
53}
54
55#[derive(Debug, Clone, Serialize, Deserialize)]
57pub struct OutboundMessage {
58 pub channel: String,
60 pub chat_id: String,
62 pub content: String,
64 pub media: Option<Vec<MediaAttachment>>,
66}
67
68impl OutboundMessage {
69 pub fn new(
71 channel: impl Into<String>,
72 chat_id: impl Into<String>,
73 content: impl Into<String>,
74 ) -> Self {
75 Self {
76 channel: channel.into(),
77 chat_id: chat_id.into(),
78 content: content.into(),
79 media: None,
80 }
81 }
82}
83
84#[derive(Debug, Clone, Serialize, Deserialize)]
86pub struct MediaAttachment {
87 pub media_type: MediaType,
89 pub url: String,
91 pub caption: Option<String>,
93}
94
95#[derive(Debug, Clone, Serialize, Deserialize)]
97#[serde(rename_all = "lowercase")]
98pub enum MediaType {
99 Image,
100 Voice,
101 Video,
102 Document,
103}
104
105pub struct MessageBus {
134 inbound_tx: mpsc::Sender<InboundMessage>,
135 inbound_rx: Arc<tokio::sync::Mutex<mpsc::Receiver<InboundMessage>>>,
136 outbound_tx: mpsc::Sender<OutboundMessage>,
137 outbound_rx: Arc<tokio::sync::Mutex<mpsc::Receiver<OutboundMessage>>>,
138}
139
140impl MessageBus {
141 pub fn new(buffer_size: usize) -> Self {
143 let (inbound_tx, inbound_rx) = mpsc::channel(buffer_size);
144 let (outbound_tx, outbound_rx) = mpsc::channel(buffer_size);
145
146 Self {
147 inbound_tx,
148 inbound_rx: Arc::new(tokio::sync::Mutex::new(inbound_rx)),
149 outbound_tx,
150 outbound_rx: Arc::new(tokio::sync::Mutex::new(outbound_rx)),
151 }
152 }
153
154 pub async fn publish_inbound(&self, message: InboundMessage) -> crate::error::Result<()> {
156 self.inbound_tx.send(message).await
157 .map_err(|e| crate::error::Error::Internal(format!("Failed to publish inbound message: {}", e)))
158 }
159
160 pub async fn consume_inbound(&self) -> crate::error::Result<InboundMessage> {
162 let mut rx = self.inbound_rx.lock().await;
163 rx.recv().await
164 .ok_or_else(|| crate::error::Error::Internal("Inbound channel closed".to_string()))
165 }
166
167 pub async fn publish_outbound(&self, message: OutboundMessage) -> crate::error::Result<()> {
169 self.outbound_tx.send(message).await
170 .map_err(|e| crate::error::Error::Internal(format!("Failed to publish outbound message: {}", e)))
171 }
172
173 pub async fn consume_outbound(&self) -> crate::error::Result<OutboundMessage> {
175 let mut rx = self.outbound_rx.lock().await;
176 rx.recv().await
177 .ok_or_else(|| crate::error::Error::Internal("Outbound channel closed".to_string()))
178 }
179
180 pub fn inbound_sender(&self) -> mpsc::Sender<InboundMessage> {
182 self.inbound_tx.clone()
183 }
184
185 pub fn outbound_sender(&self) -> mpsc::Sender<OutboundMessage> {
187 self.outbound_tx.clone()
188 }
189}
190
191impl Clone for MessageBus {
192 fn clone(&self) -> Self {
193 Self {
194 inbound_tx: self.inbound_tx.clone(),
195 inbound_rx: Arc::clone(&self.inbound_rx),
196 outbound_tx: self.outbound_tx.clone(),
197 outbound_rx: Arc::clone(&self.outbound_rx),
198 }
199 }
200}
201
#[cfg(test)]
mod tests {
    use super::*;

    /// Round-trips one message through each direction of the bus.
    #[tokio::test]
    async fn test_message_bus_flow() {
        let bus = MessageBus::new(10);

        // The message is not used after publishing, so it is moved rather than cloned.
        let inbound = InboundMessage::new("telegram", "user123", "chat456", "Hello Agent");
        bus.publish_inbound(inbound).await.unwrap();

        let received = bus.consume_inbound().await.unwrap();
        assert_eq!(received.channel, "telegram");
        assert_eq!(received.sender_id, "user123");
        assert_eq!(received.chat_id, "chat456");
        assert_eq!(received.content, "Hello Agent");
        assert!(received.media.is_none());

        let outbound = OutboundMessage::new("telegram", "chat456", "Hello User");
        bus.publish_outbound(outbound).await.unwrap();

        let response = bus.consume_outbound().await.unwrap();
        assert_eq!(response.channel, "telegram");
        assert_eq!(response.chat_id, "chat456");
        assert_eq!(response.content, "Hello User");
    }

    /// The session key is `"{channel}:{chat_id}"`; nothing here awaits, so a
    /// plain synchronous test is sufficient.
    #[test]
    fn test_session_key_generation() {
        let msg = InboundMessage::new("telegram", "user123", "chat456", "test");
        assert_eq!(msg.session_key, "telegram:chat456");
    }

    /// A cloned bus shares the underlying channels with the original.
    #[tokio::test]
    async fn test_cloned_bus_shares_channels() {
        let bus = MessageBus::new(1);
        let other = bus.clone();

        let msg = InboundMessage::new("telegram", "user123", "chat456", "ping");
        bus.publish_inbound(msg).await.unwrap();

        let received = other.consume_inbound().await.unwrap();
        assert_eq!(received.content, "ping");
    }
}