bob_runtime/
message_bus.rs1use bob_core::{
35 error::AgentError,
36 ports::MessageBusPort,
37 types::{InboundMessage, OutboundMessage},
38};
39use tokio::sync::{Mutex, mpsc};
40
41#[derive(Debug)]
46pub struct TokioMessageBus {
47 outbound_tx: mpsc::Sender<OutboundMessage>,
49 inbound_rx: Mutex<mpsc::Receiver<InboundMessage>>,
52}
53
54#[async_trait::async_trait]
55impl MessageBusPort for TokioMessageBus {
56 async fn send(&self, msg: OutboundMessage) -> Result<(), AgentError> {
57 self.outbound_tx
58 .send(msg)
59 .await
60 .map_err(|_| AgentError::Config("message bus outbound channel closed".into()))
61 }
62
63 async fn recv(&self) -> Result<InboundMessage, AgentError> {
64 self.inbound_rx
65 .lock()
66 .await
67 .recv()
68 .await
69 .ok_or_else(|| AgentError::Config("message bus inbound channel closed".into()))
70 }
71}
72
73#[derive(Debug)]
75pub struct BotSideBus {
76 inbound_tx: mpsc::Sender<InboundMessage>,
78 outbound_rx: mpsc::Receiver<OutboundMessage>,
80}
81
82impl BotSideBus {
83 pub async fn push(&self, msg: InboundMessage) -> Result<(), AgentError> {
85 self.inbound_tx
86 .send(msg)
87 .await
88 .map_err(|_| AgentError::Config("message bus inbound channel closed".into()))
89 }
90
91 pub async fn recv_outbound(&mut self) -> Option<OutboundMessage> {
93 self.outbound_rx.recv().await
94 }
95}
96
97#[derive(Debug)]
99#[must_use]
100pub struct MessageBusBuilder {
101 buffer_size: usize,
102}
103
104impl MessageBusBuilder {
105 pub fn new() -> Self {
107 Self { buffer_size: 64 }
108 }
109
110 pub fn with_buffer_size(mut self, size: usize) -> Self {
112 self.buffer_size = size;
113 self
114 }
115
116 pub fn build(self) -> MessageBusHandle {
118 let (inbound_tx, inbound_rx) = mpsc::channel(self.buffer_size);
119 let (outbound_tx, outbound_rx) = mpsc::channel(self.buffer_size);
120
121 let agent_port = TokioMessageBus { outbound_tx, inbound_rx: Mutex::new(inbound_rx) };
122 let bot_port = BotSideBus { inbound_tx, outbound_rx };
123
124 MessageBusHandle { agent_port, bot_port }
125 }
126}
127
128impl Default for MessageBusBuilder {
129 fn default() -> Self {
130 Self::new()
131 }
132}
133
134#[derive(Debug)]
139pub struct MessageBusHandle {
140 agent_port: TokioMessageBus,
142 bot_port: BotSideBus,
144}
145
146impl MessageBusHandle {
147 pub fn new(buffer_size: usize) -> Self {
149 MessageBusBuilder::new().with_buffer_size(buffer_size).build()
150 }
151
152 pub fn split(self) -> (TokioMessageBus, BotSideBus) {
154 (self.agent_port, self.bot_port)
155 }
156
157 #[must_use]
159 pub fn agent_port(&self) -> &TokioMessageBus {
160 &self.agent_port
161 }
162
163 pub fn bot_port(&mut self) -> &mut BotSideBus {
165 &mut self.bot_port
166 }
167}
168
169#[cfg(test)]
172mod tests {
173 use std::sync::Arc;
174
175 use bob_core::ports::MessageBusPort;
176
177 use super::*;
178
179 fn sample_inbound(content: &str) -> InboundMessage {
180 InboundMessage {
181 channel: "slack".into(),
182 sender_id: "user-1".into(),
183 chat_id: "chat-1".into(),
184 content: content.into(),
185 timestamp_ms: 1_700_000_000_000,
186 }
187 }
188
189 fn sample_outbound(content: &str) -> OutboundMessage {
190 OutboundMessage {
191 channel: "slack".into(),
192 chat_id: "chat-1".into(),
193 content: content.into(),
194 is_progress: false,
195 reply_to: None,
196 }
197 }
198
199 #[tokio::test]
200 async fn send_and_recv_single_message() {
201 let handle = MessageBusHandle::new(8);
202 let (agent, bot) = handle.split();
203
204 bot.push(sample_inbound("hello")).await.expect("push should succeed");
205
206 let received = agent.recv().await.expect("recv should succeed");
207 assert_eq!(received.content, "hello");
208 assert_eq!(received.channel, "slack");
209 assert_eq!(received.sender_id, "user-1");
210 }
211
212 #[tokio::test]
213 async fn multiple_messages_received_in_order() {
214 let handle = MessageBusHandle::new(16);
215 let (agent, bot) = handle.split();
216
217 for i in 0..5 {
218 bot.push(sample_inbound(&format!("msg-{i}"))).await.expect("push should succeed");
219 }
220
221 for i in 0..5 {
222 let received = agent.recv().await.expect("recv should succeed");
223 assert_eq!(received.content, format!("msg-{i}"));
224 }
225 }
226
227 #[tokio::test]
228 async fn outbound_send_through_agent_port() {
229 let handle = MessageBusHandle::new(8);
230 let (agent, mut bot) = handle.split();
231
232 agent.send(sample_outbound("response")).await.expect("send should succeed");
233
234 let received = bot.recv_outbound().await.expect("should receive outbound");
235 assert_eq!(received.content, "response");
236 assert!(!received.is_progress);
237 }
238
239 #[tokio::test]
240 async fn outbound_progress_messages() {
241 let handle = MessageBusHandle::new(8);
242 let (agent, mut bot) = handle.split();
243
244 agent
245 .send(OutboundMessage {
246 channel: "discord".into(),
247 chat_id: "ch-1".into(),
248 content: "thinking...".into(),
249 is_progress: true,
250 reply_to: None,
251 })
252 .await
253 .expect("send should succeed");
254
255 let received = bot.recv_outbound().await.expect("should receive outbound");
256 assert!(received.is_progress);
257 assert_eq!(received.content, "thinking...");
258 }
259
260 #[tokio::test]
261 async fn recv_returns_error_when_sender_dropped() {
262 let handle = MessageBusHandle::new(8);
263 let (agent, bot) = handle.split();
264
265 drop(bot);
267
268 let result = agent.recv().await;
269 assert!(result.is_err(), "recv should fail when sender is dropped");
270 }
271
272 #[tokio::test]
273 async fn send_returns_error_when_receiver_dropped() {
274 let handle = MessageBusHandle::new(8);
275 let (agent, bot) = handle.split();
276
277 drop(bot);
281
282 let result = agent.send(sample_outbound("x")).await;
283 assert!(result.is_err(), "send should fail when receiver is dropped");
284 }
285
286 #[tokio::test]
287 async fn builder_pattern_default_buffer() {
288 let handle = MessageBusBuilder::new().build();
289 let (agent, bot) = handle.split();
290
291 bot.push(sample_inbound("test")).await.expect("push should succeed");
292 let received = agent.recv().await.expect("recv should succeed");
293 assert_eq!(received.content, "test");
294 }
295
296 #[tokio::test]
297 async fn builder_pattern_custom_buffer() {
298 let handle = MessageBusBuilder::new().with_buffer_size(2).build();
299 let (agent, bot) = handle.split();
300
301 bot.push(sample_inbound("a")).await.expect("push should succeed");
303 bot.push(sample_inbound("b")).await.expect("push should succeed");
304
305 assert_eq!(agent.recv().await.unwrap_or_else(|_| unreachable!()).content, "a");
307 assert_eq!(agent.recv().await.unwrap_or_else(|_| unreachable!()).content, "b");
308 }
309
310 #[tokio::test]
311 async fn agent_port_as_dyn_message_bus() {
312 let handle = MessageBusHandle::new(8);
313 let (agent, bot) = handle.split();
314
315 let bus: Arc<dyn MessageBusPort> = Arc::new(agent);
316
317 bot.push(sample_inbound("via-dyn")).await.expect("push should succeed");
318
319 let received = bus.recv().await.expect("dyn recv should succeed");
320 assert_eq!(received.content, "via-dyn");
321 }
322
323 #[tokio::test]
324 async fn bidirectional_communication() {
325 let handle = MessageBusHandle::new(8);
326 let (agent, mut bot) = handle.split();
327
328 bot.push(sample_inbound("question")).await.expect("push should succeed");
330
331 let inbound = agent.recv().await.expect("recv should succeed");
333 assert_eq!(inbound.content, "question");
334
335 agent
336 .send(OutboundMessage {
337 channel: inbound.channel,
338 chat_id: inbound.chat_id,
339 content: format!("answer to: {}", inbound.content),
340 is_progress: false,
341 reply_to: None,
342 })
343 .await
344 .expect("send should succeed");
345
346 let outbound = bot.recv_outbound().await.expect("should receive outbound");
348 assert_eq!(outbound.content, "answer to: question");
349 }
350}