Skip to main content

bob_runtime/
message_bus.rs

1//! # Message Bus
2//!
3//! Tokio-based [`MessageBusPort`] implementation using `tokio::sync::mpsc` channels.
4//!
5//! ## Architecture
6//!
7//! ```text
8//! ┌──────────┐  InboundMessage   ┌──────────────┐
9//! │ Bot /    │ ────────────────→ │              │
10//! │ Adapter  │                   │ TokioMessage │
11//! │          │ ←──────────────── │     Bus      │
12//! └──────────┘  OutboundMessage  └──────────────┘
13//!                                  ↑         ↓
14//!                           recv()         send()
15//!                        (agent side)   (bot side)
16//! ```
17//!
18//! ## Usage
19//!
//! ```rust,ignore
//! use bob_core::ports::MessageBusPort;
//! use bob_runtime::message_bus::MessageBusHandle;
//!
//! // Split the handle into the agent-side port and the bot-side port.
//! let (agent, mut bot) = MessageBusHandle::new(64).split();
//!
//! // Bot side pushes inbound messages
//! bot.push(msg).await?;
//!
//! // Agent side consumes inbound messages
//! let msg = agent.recv().await?;
//!
//! // Agent side sends a reply; bot side picks it up
//! agent.send(reply).await?;
//! let reply = bot.recv_outbound().await;
//! ```
33
34use bob_core::{
35    error::AgentError,
36    ports::MessageBusPort,
37    types::{InboundMessage, OutboundMessage},
38};
39use tokio::sync::{Mutex, mpsc};
40
41/// Concrete [`MessageBusPort`] backed by `tokio::sync::mpsc` channels.
42///
43/// Each `TokioMessageBus` instance holds one direction of communication.
44/// Use [`MessageBusHandle`] to create a matched pair.
45#[derive(Debug)]
46pub struct TokioMessageBus {
47    /// Sender for outbound messages (agent → bot).
48    outbound_tx: mpsc::Sender<OutboundMessage>,
49    /// Receiver for inbound messages (bot → agent), wrapped in `Mutex` for
50    /// interior mutability since `MessageBusPort::recv` takes `&self`.
51    inbound_rx: Mutex<mpsc::Receiver<InboundMessage>>,
52}
53
54#[async_trait::async_trait]
55impl MessageBusPort for TokioMessageBus {
56    async fn send(&self, msg: OutboundMessage) -> Result<(), AgentError> {
57        self.outbound_tx
58            .send(msg)
59            .await
60            .map_err(|_| AgentError::Config("message bus outbound channel closed".into()))
61    }
62
63    async fn recv(&self) -> Result<InboundMessage, AgentError> {
64        self.inbound_rx
65            .lock()
66            .await
67            .recv()
68            .await
69            .ok_or_else(|| AgentError::Config("message bus inbound channel closed".into()))
70    }
71}
72
73/// The bot-side port for pushing inbound messages and consuming outbound messages.
74#[derive(Debug)]
75pub struct BotSideBus {
76    /// Sender for inbound messages (bot → agent).
77    inbound_tx: mpsc::Sender<InboundMessage>,
78    /// Receiver for outbound messages (agent → bot).
79    outbound_rx: mpsc::Receiver<OutboundMessage>,
80}
81
82impl BotSideBus {
83    /// Push an inbound message onto the bus for the agent to consume.
84    pub async fn push(&self, msg: InboundMessage) -> Result<(), AgentError> {
85        self.inbound_tx
86            .send(msg)
87            .await
88            .map_err(|_| AgentError::Config("message bus inbound channel closed".into()))
89    }
90
91    /// Receive the next outbound message from the agent.
92    pub async fn recv_outbound(&mut self) -> Option<OutboundMessage> {
93        self.outbound_rx.recv().await
94    }
95}
96
/// Configuration step for creating a [`MessageBusHandle`].
///
/// Start from [`MessageBusBuilder::new`], optionally adjust the settings, then
/// call [`MessageBusBuilder::build`].
#[derive(Debug)]
#[must_use]
pub struct MessageBusBuilder {
    /// Capacity passed to `mpsc::channel` for both the inbound and the
    /// outbound channel.
    buffer_size: usize,
}
103
104impl MessageBusBuilder {
105    /// Create a new builder with default settings.
106    pub fn new() -> Self {
107        Self { buffer_size: 64 }
108    }
109
110    /// Set the channel buffer size (number of messages that can be buffered).
111    pub fn with_buffer_size(mut self, size: usize) -> Self {
112        self.buffer_size = size;
113        self
114    }
115
116    /// Build the message bus handle.
117    pub fn build(self) -> MessageBusHandle {
118        let (inbound_tx, inbound_rx) = mpsc::channel(self.buffer_size);
119        let (outbound_tx, outbound_rx) = mpsc::channel(self.buffer_size);
120
121        let agent_port = TokioMessageBus { outbound_tx, inbound_rx: Mutex::new(inbound_rx) };
122        let bot_port = BotSideBus { inbound_tx, outbound_rx };
123
124        MessageBusHandle { agent_port, bot_port }
125    }
126}
127
128impl Default for MessageBusBuilder {
129    fn default() -> Self {
130        Self::new()
131    }
132}
133
134/// Handle that owns both sides of a message bus channel pair.
135///
136/// Destructure into [`TokioMessageBus`] (agent side) and [`BotSideBus`]
137/// (bot side) for independent use.
138#[derive(Debug)]
139pub struct MessageBusHandle {
140    /// Agent-side port implementing [`MessageBusPort`].
141    agent_port: TokioMessageBus,
142    /// Bot-side port for pushing inbound and receiving outbound messages.
143    bot_port: BotSideBus,
144}
145
146impl MessageBusHandle {
147    /// Create a new message bus with the given buffer size.
148    pub fn new(buffer_size: usize) -> Self {
149        MessageBusBuilder::new().with_buffer_size(buffer_size).build()
150    }
151
152    /// Consume the handle and return both ports.
153    pub fn split(self) -> (TokioMessageBus, BotSideBus) {
154        (self.agent_port, self.bot_port)
155    }
156
157    /// Get a reference to the agent-side port.
158    #[must_use]
159    pub fn agent_port(&self) -> &TokioMessageBus {
160        &self.agent_port
161    }
162
163    /// Get a mutable reference to the bot-side port.
164    pub fn bot_port(&mut self) -> &mut BotSideBus {
165        &mut self.bot_port
166    }
167}
168
169// ── Tests ────────────────────────────────────────────────────────────
170
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use bob_core::ports::MessageBusPort;

    use super::*;

    /// Inbound Slack message from `user-1` in `chat-1` carrying `content`.
    fn sample_inbound(content: &str) -> InboundMessage {
        InboundMessage {
            channel: "slack".into(),
            sender_id: "user-1".into(),
            chat_id: "chat-1".into(),
            content: content.into(),
            timestamp_ms: 1_700_000_000_000,
        }
    }

    /// Final (non-progress) outbound Slack message to `chat-1` carrying `content`.
    fn sample_outbound(content: &str) -> OutboundMessage {
        OutboundMessage {
            channel: "slack".into(),
            chat_id: "chat-1".into(),
            content: content.into(),
            is_progress: false,
            reply_to: None,
        }
    }

    /// One pushed message comes out of the agent side with its fields intact.
    #[tokio::test]
    async fn send_and_recv_single_message() {
        let (agent, bot) = MessageBusHandle::new(8).split();

        bot.push(sample_inbound("hello")).await.expect("push should succeed");

        let got = agent.recv().await.expect("recv should succeed");
        assert_eq!(got.content, "hello");
        assert_eq!(got.channel, "slack");
        assert_eq!(got.sender_id, "user-1");
    }

    /// Messages are delivered in the order they were pushed.
    #[tokio::test]
    async fn multiple_messages_received_in_order() {
        let (agent, bot) = MessageBusHandle::new(16).split();

        let expected: Vec<String> = (0..5).map(|i| format!("msg-{i}")).collect();

        for text in &expected {
            bot.push(sample_inbound(text)).await.expect("push should succeed");
        }

        for text in expected {
            let got = agent.recv().await.expect("recv should succeed");
            assert_eq!(got.content, text);
        }
    }

    /// A message sent by the agent is received on the bot side.
    #[tokio::test]
    async fn outbound_send_through_agent_port() {
        let (agent, mut bot) = MessageBusHandle::new(8).split();

        agent.send(sample_outbound("response")).await.expect("send should succeed");

        let got = bot.recv_outbound().await.expect("should receive outbound");
        assert_eq!(got.content, "response");
        assert!(!got.is_progress);
    }

    /// The `is_progress` flag survives the trip across the bus.
    #[tokio::test]
    async fn outbound_progress_messages() {
        let (agent, mut bot) = MessageBusHandle::new(8).split();

        let progress = OutboundMessage {
            channel: "discord".into(),
            chat_id: "ch-1".into(),
            content: "thinking...".into(),
            is_progress: true,
            reply_to: None,
        };
        agent.send(progress).await.expect("send should succeed");

        let got = bot.recv_outbound().await.expect("should receive outbound");
        assert!(got.is_progress);
        assert_eq!(got.content, "thinking...");
    }

    /// With the bot side gone, the agent's `recv` reports an error.
    #[tokio::test]
    async fn recv_returns_error_when_sender_dropped() {
        let (agent, bot) = MessageBusHandle::new(8).split();

        // The bot side owns the inbound sender; dropping it closes the channel.
        drop(bot);

        let outcome = agent.recv().await;
        assert!(outcome.is_err(), "recv should fail when sender is dropped");
    }

    /// With the bot side gone, the agent's `send` reports an error.
    #[tokio::test]
    async fn send_returns_error_when_receiver_dropped() {
        let (agent, bot) = MessageBusHandle::new(8).split();

        // `BotSideBus` owns the outbound receiver together with the inbound
        // sender, so the only way to drop the receiver is to drop the whole
        // bot side.
        drop(bot);

        let outcome = agent.send(sample_outbound("x")).await;
        assert!(outcome.is_err(), "send should fail when receiver is dropped");
    }

    /// A builder left at its defaults produces a working bus.
    #[tokio::test]
    async fn builder_pattern_default_buffer() {
        let (agent, bot) = MessageBusBuilder::new().build().split();

        bot.push(sample_inbound("test")).await.expect("push should succeed");
        let got = agent.recv().await.expect("recv should succeed");
        assert_eq!(got.content, "test");
    }

    /// A two-slot bus can be filled completely and then drained in order.
    #[tokio::test]
    async fn builder_pattern_custom_buffer() {
        let (agent, bot) = MessageBusBuilder::new().with_buffer_size(2).build().split();

        // Fill the small buffer.
        for text in ["a", "b"] {
            bot.push(sample_inbound(text)).await.expect("push should succeed");
        }

        // Drain in order.
        for text in ["a", "b"] {
            let got = agent.recv().await.unwrap_or_else(|_| unreachable!());
            assert_eq!(got.content, text);
        }
    }

    /// The agent port works behind an `Arc<dyn MessageBusPort>`.
    #[tokio::test]
    async fn agent_port_as_dyn_message_bus() {
        let (agent, bot) = MessageBusHandle::new(8).split();

        let bus: Arc<dyn MessageBusPort> = Arc::new(agent);

        bot.push(sample_inbound("via-dyn")).await.expect("push should succeed");

        let got = bus.recv().await.expect("dyn recv should succeed");
        assert_eq!(got.content, "via-dyn");
    }

    /// Full round trip: bot asks, agent answers, bot receives the answer.
    #[tokio::test]
    async fn bidirectional_communication() {
        let (agent, mut bot) = MessageBusHandle::new(8).split();

        // Bot sends inbound.
        bot.push(sample_inbound("question")).await.expect("push should succeed");

        // Agent receives inbound and sends outbound.
        let inbound = agent.recv().await.expect("recv should succeed");
        assert_eq!(inbound.content, "question");

        let reply = OutboundMessage {
            channel: inbound.channel,
            chat_id: inbound.chat_id,
            content: format!("answer to: {}", inbound.content),
            is_progress: false,
            reply_to: None,
        };
        agent.send(reply).await.expect("send should succeed");

        // Bot receives outbound.
        let outbound = bot.recv_outbound().await.expect("should receive outbound");
        assert_eq!(outbound.content, "answer to: question");
    }
}
350}