use bob_core::{
error::AgentError,
ports::MessageBusPort,
types::{InboundMessage, OutboundMessage},
};
use tokio::sync::{Mutex, mpsc};
/// Agent-facing end of the message bus, backed by Tokio `mpsc` channels.
///
/// Implements [`MessageBusPort`]: `send` forwards outbound messages to the
/// paired [`BotSideBus`], and `recv` yields inbound messages pushed by it.
#[derive(Debug)]
pub struct TokioMessageBus {
/// Sending half of the outbound (agent -> bot) channel.
outbound_tx: mpsc::Sender<OutboundMessage>,
/// Receiving half of the inbound (bot -> agent) channel. Wrapped in an async
/// `Mutex` so that `recv` can be called through `&self` even though
/// `mpsc::Receiver::recv` needs exclusive access.
inbound_rx: Mutex<mpsc::Receiver<InboundMessage>>,
}
#[async_trait::async_trait]
impl MessageBusPort for TokioMessageBus {
    /// Queues `msg` on the outbound channel for the bot side to pick up.
    ///
    /// # Errors
    ///
    /// Returns [`AgentError::Config`] when the outbound channel is closed,
    /// i.e. the receiving half no longer accepts messages.
    async fn send(&self, msg: OutboundMessage) -> Result<(), AgentError> {
        match self.outbound_tx.send(msg).await {
            Ok(()) => Ok(()),
            Err(_) => Err(AgentError::Config("message bus outbound channel closed".into())),
        }
    }

    /// Waits for the next inbound message.
    ///
    /// The receiver lock is held for the whole wait, so concurrent callers
    /// are served one at a time.
    ///
    /// # Errors
    ///
    /// Returns [`AgentError::Config`] when the inbound channel yields no
    /// further messages because it has been closed.
    async fn recv(&self) -> Result<InboundMessage, AgentError> {
        let mut receiver = self.inbound_rx.lock().await;
        match receiver.recv().await {
            Some(message) => Ok(message),
            None => Err(AgentError::Config("message bus inbound channel closed".into())),
        }
    }
}
/// Bot-facing end of the message bus.
///
/// Holds the opposite channel halves to [`TokioMessageBus`]: it pushes
/// inbound messages towards the agent and drains the agent's outbound
/// messages.
#[derive(Debug)]
pub struct BotSideBus {
/// Sending half of the inbound (bot -> agent) channel.
inbound_tx: mpsc::Sender<InboundMessage>,
/// Receiving half of the outbound (agent -> bot) channel.
outbound_rx: mpsc::Receiver<OutboundMessage>,
}
impl BotSideBus {
    /// Delivers an inbound message to the agent side of the bus.
    ///
    /// # Errors
    ///
    /// Returns [`AgentError::Config`] when the inbound channel is closed.
    pub async fn push(&self, msg: InboundMessage) -> Result<(), AgentError> {
        if self.inbound_tx.send(msg).await.is_err() {
            return Err(AgentError::Config("message bus inbound channel closed".into()));
        }
        Ok(())
    }

    /// Waits for the next outbound message produced by the agent.
    ///
    /// Yields `None` once the outbound channel is closed and drained.
    pub async fn recv_outbound(&mut self) -> Option<OutboundMessage> {
        let Self { outbound_rx, .. } = self;
        outbound_rx.recv().await
    }
}
/// Builder for a [`MessageBusHandle`].
///
/// Marked `#[must_use]` because a builder that is never turned into a handle
/// via `build` has no effect.
#[derive(Debug)]
#[must_use]
pub struct MessageBusBuilder {
/// Capacity passed to `mpsc::channel` for both the inbound and the outbound
/// channel.
buffer_size: usize,
}
impl MessageBusBuilder {
    /// Per-channel capacity used when the caller does not choose one.
    const DEFAULT_BUFFER_SIZE: usize = 64;

    /// Creates a builder with the default per-channel capacity (64 messages).
    pub fn new() -> Self {
        Self { buffer_size: Self::DEFAULT_BUFFER_SIZE }
    }

    /// Sets the capacity used for both the inbound and the outbound channel.
    ///
    /// A `size` of `0` is accepted here and raised to `1` by [`Self::build`],
    /// because a bounded Tokio channel cannot have zero capacity.
    pub fn with_buffer_size(mut self, size: usize) -> Self {
        self.buffer_size = size;
        self
    }

    /// Creates both channels and wires them into a [`MessageBusHandle`].
    ///
    /// The agent port gets the outbound sender and the inbound receiver; the
    /// bot port gets the matching opposite halves.
    ///
    /// NOTE(review): Tokio documents an upper capacity limit as well; an
    /// extremely large `buffer_size` is passed through unchanged and may
    /// still panic inside `mpsc::channel` — confirm against the Tokio version
    /// in use if callers can supply unbounded values.
    pub fn build(self) -> MessageBusHandle {
        // `tokio::sync::mpsc::channel` panics when asked for a zero-capacity
        // buffer, so fall back to the smallest valid capacity instead of
        // crashing on `with_buffer_size(0)`.
        let capacity = self.buffer_size.max(1);

        let (inbound_tx, inbound_rx) = mpsc::channel(capacity);
        let (outbound_tx, outbound_rx) = mpsc::channel(capacity);

        let agent_port = TokioMessageBus { outbound_tx, inbound_rx: Mutex::new(inbound_rx) };
        let bot_port = BotSideBus { inbound_tx, outbound_rx };
        MessageBusHandle { agent_port, bot_port }
    }
}
impl Default for MessageBusBuilder {
fn default() -> Self {
Self::new()
}
}
/// Owns both ends of a freshly built message bus until the caller separates
/// them with `split` or borrows them through the accessors.
#[derive(Debug)]
pub struct MessageBusHandle {
/// Agent-facing end (implements `MessageBusPort`).
agent_port: TokioMessageBus,
/// Bot-facing end (pushes inbound messages, drains outbound ones).
bot_port: BotSideBus,
}
impl MessageBusHandle {
    /// Builds a bus whose channels use `buffer_size` as their capacity.
    ///
    /// Shorthand for configuring a [`MessageBusBuilder`] with that size and
    /// calling `build` on it.
    pub fn new(buffer_size: usize) -> Self {
        let builder = MessageBusBuilder::new().with_buffer_size(buffer_size);
        builder.build()
    }

    /// Consumes the handle and returns `(agent_port, bot_port)` as
    /// independently owned values.
    pub fn split(self) -> (TokioMessageBus, BotSideBus) {
        let Self { agent_port, bot_port } = self;
        (agent_port, bot_port)
    }

    /// Borrows the agent-facing end of the bus.
    #[must_use]
    pub fn agent_port(&self) -> &TokioMessageBus {
        let Self { agent_port, .. } = self;
        agent_port
    }

    /// Mutably borrows the bot-facing end of the bus (receiving outbound
    /// messages requires `&mut`).
    pub fn bot_port(&mut self) -> &mut BotSideBus {
        let Self { bot_port, .. } = self;
        bot_port
    }
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use bob_core::ports::MessageBusPort;

    use super::*;

    /// Builds an inbound fixture message with fixed routing metadata and the
    /// given `content`.
    fn sample_inbound(content: &str) -> InboundMessage {
        InboundMessage {
            channel: "slack".into(),
            sender_id: "user-1".into(),
            chat_id: "chat-1".into(),
            content: content.into(),
            timestamp_ms: 1_700_000_000_000,
        }
    }

    /// Builds a non-progress outbound fixture message with the given `content`.
    fn sample_outbound(content: &str) -> OutboundMessage {
        OutboundMessage {
            channel: "slack".into(),
            chat_id: "chat-1".into(),
            content: content.into(),
            is_progress: false,
            reply_to: None,
        }
    }

    /// A message pushed by the bot side arrives unchanged on the agent side.
    #[tokio::test]
    async fn send_and_recv_single_message() {
        let handle = MessageBusHandle::new(8);
        let (agent, bot) = handle.split();

        bot.push(sample_inbound("hello")).await.expect("push should succeed");

        let received = agent.recv().await.expect("recv should succeed");
        assert_eq!(received.content, "hello");
        assert_eq!(received.channel, "slack");
        assert_eq!(received.sender_id, "user-1");
    }

    /// Inbound messages are delivered in the order they were pushed.
    #[tokio::test]
    async fn multiple_messages_received_in_order() {
        let handle = MessageBusHandle::new(16);
        let (agent, bot) = handle.split();

        for i in 0..5 {
            bot.push(sample_inbound(&format!("msg-{i}"))).await.expect("push should succeed");
        }

        for i in 0..5 {
            let received = agent.recv().await.expect("recv should succeed");
            assert_eq!(received.content, format!("msg-{i}"));
        }
    }

    /// A message sent through the agent port is readable on the bot side.
    #[tokio::test]
    async fn outbound_send_through_agent_port() {
        let handle = MessageBusHandle::new(8);
        let (agent, mut bot) = handle.split();

        agent.send(sample_outbound("response")).await.expect("send should succeed");

        let received = bot.recv_outbound().await.expect("should receive outbound");
        assert_eq!(received.content, "response");
        assert!(!received.is_progress);
    }

    /// The `is_progress` flag survives the trip through the outbound channel.
    #[tokio::test]
    async fn outbound_progress_messages() {
        let handle = MessageBusHandle::new(8);
        let (agent, mut bot) = handle.split();

        agent
            .send(OutboundMessage {
                channel: "discord".into(),
                chat_id: "ch-1".into(),
                content: "thinking...".into(),
                is_progress: true,
                reply_to: None,
            })
            .await
            .expect("send should succeed");

        let received = bot.recv_outbound().await.expect("should receive outbound");
        assert!(received.is_progress);
        assert_eq!(received.content, "thinking...");
    }

    /// Dropping the bot side closes the inbound channel, so `recv` errors.
    #[tokio::test]
    async fn recv_returns_error_when_sender_dropped() {
        let handle = MessageBusHandle::new(8);
        let (agent, bot) = handle.split();
        drop(bot);

        let result = agent.recv().await;
        assert!(result.is_err(), "recv should fail when sender is dropped");
    }

    /// Dropping the bot side closes the outbound channel, so `send` errors.
    #[tokio::test]
    async fn send_returns_error_when_receiver_dropped() {
        let handle = MessageBusHandle::new(8);
        let (agent, bot) = handle.split();
        drop(bot);

        let result = agent.send(sample_outbound("x")).await;
        assert!(result.is_err(), "send should fail when receiver is dropped");
    }

    /// A builder left at its default capacity produces a working bus.
    #[tokio::test]
    async fn builder_pattern_default_buffer() {
        let handle = MessageBusBuilder::new().build();
        let (agent, bot) = handle.split();

        bot.push(sample_inbound("test")).await.expect("push should succeed");

        let received = agent.recv().await.expect("recv should succeed");
        assert_eq!(received.content, "test");
    }

    /// A capacity of two lets two messages be queued before anything is read.
    #[tokio::test]
    async fn builder_pattern_custom_buffer() {
        let handle = MessageBusBuilder::new().with_buffer_size(2).build();
        let (agent, bot) = handle.split();

        bot.push(sample_inbound("a")).await.expect("push should succeed");
        bot.push(sample_inbound("b")).await.expect("push should succeed");

        // `expect` (as in the other tests) keeps the underlying error in the
        // failure output instead of hiding it behind `unreachable!()`.
        assert_eq!(agent.recv().await.expect("recv should succeed").content, "a");
        assert_eq!(agent.recv().await.expect("recv should succeed").content, "b");
    }

    /// The agent port can be used behind `Arc<dyn MessageBusPort>`.
    #[tokio::test]
    async fn agent_port_as_dyn_message_bus() {
        let handle = MessageBusHandle::new(8);
        let (agent, bot) = handle.split();
        let bus: Arc<dyn MessageBusPort> = Arc::new(agent);

        bot.push(sample_inbound("via-dyn")).await.expect("push should succeed");

        let received = bus.recv().await.expect("dyn recv should succeed");
        assert_eq!(received.content, "via-dyn");
    }

    /// Full round trip: bot pushes a question, agent answers, bot reads it.
    #[tokio::test]
    async fn bidirectional_communication() {
        let handle = MessageBusHandle::new(8);
        let (agent, mut bot) = handle.split();

        bot.push(sample_inbound("question")).await.expect("push should succeed");

        let inbound = agent.recv().await.expect("recv should succeed");
        assert_eq!(inbound.content, "question");

        agent
            .send(OutboundMessage {
                channel: inbound.channel,
                chat_id: inbound.chat_id,
                content: format!("answer to: {}", inbound.content),
                is_progress: false,
                reply_to: None,
            })
            .await
            .expect("send should succeed");

        let outbound = bot.recv_outbound().await.expect("should receive outbound");
        assert_eq!(outbound.content, "answer to: question");
    }
}