use anyhow::{Context, Result};
use std::sync::Arc;
use tokio::sync::{broadcast, mpsc, RwLock};
use tracing::{debug, info, warn};
use crate::domain::{Group, Message, MessageTarget};
/// In-process message bus that routes [`Message`]s either to a single agent
/// (private mpsc channel) or to a group (broadcast channel).
pub struct MessageBus {
/// Sending half of each registered agent's private channel, keyed by agent id.
private_txs: dashmap::DashMap<String, mpsc::Sender<Message>>,
/// Group records keyed by group id.
groups: Arc<RwLock<std::collections::HashMap<String, Group>>>,
/// Broadcast sender of each group, keyed by group id.
group_txs: dashmap::DashMap<String, broadcast::Sender<Message>>,
/// Optional persistence backend; when `None`, messages are not saved and
/// the history queries return empty lists.
store: Option<Arc<dyn crate::core::store::Store>>,
}
impl MessageBus {
/// Creates an empty bus with no registered agents, no groups and no
/// persistence backend.
pub fn new() -> Self {
    let private_txs = dashmap::DashMap::new();
    let group_txs = dashmap::DashMap::new();
    let groups = Arc::new(RwLock::new(std::collections::HashMap::new()));
    Self {
        private_txs,
        groups,
        group_txs,
        store: None,
    }
}
pub fn with_store(store: Arc<dyn crate::core::store::Store>) -> Self {
Self {
private_txs: dashmap::DashMap::new(),
groups: Arc::new(RwLock::new(std::collections::HashMap::new())),
group_txs: dashmap::DashMap::new(),
store: Some(store),
}
}
/// Registers `agent_id` on the bus and returns the receiving half of its
/// private channel (bounded to 100 queued messages).
///
/// Registering an id that is already present replaces the stored sender.
pub fn register(&self, agent_id: &str) -> mpsc::Receiver<Message> {
    let (sender, receiver) = mpsc::channel(100);
    self.private_txs.insert(agent_id.to_owned(), sender);
    info!("Registered agent to message bus: {}", agent_id);
    receiver
}
/// Removes the private channel sender stored for `agent_id`, if any.
///
/// The log line is emitted whether or not the agent was registered.
pub fn unregister(&self, agent_id: &str) {
    drop(self.private_txs.remove(agent_id));
    info!("Unregistered agent from message bus: {}", agent_id);
}
/// Creates a group and its broadcast channel (capacity 100).
///
/// # Errors
///
/// * `creator_id` is not registered on the bus.
/// * A group with the same `id` already exists. Replacing it would swap the
///   broadcast sender and silently disconnect every existing subscriber.
pub async fn create_group(
    &self,
    id: &str,
    name: &str,
    creator_id: &str,
    members: Vec<String>,
) -> Result<()> {
    if !self.private_txs.contains_key(creator_id) {
        anyhow::bail!("Creator not registered: {}", creator_id);
    }

    // Hold the write lock for the whole registration so that the duplicate
    // check and both inserts form one step.
    let mut groups = self.groups.write().await;
    if groups.contains_key(id) {
        anyhow::bail!("Group already exists: {}", id);
    }

    // Publish the sender first: once the group record is visible, a
    // concurrent `send` must be able to find its channel.
    let (tx, _) = broadcast::channel(100);
    self.group_txs.insert(id.to_string(), tx);
    groups.insert(id.to_string(), Group::new(id, name, creator_id, members));
    drop(groups);

    info!("Created group: {} by {}", id, creator_id);
    Ok(())
}
/// Persists `message` (best effort) and routes it to its target.
///
/// A store failure is only logged; delivery is still attempted. The
/// returned result reflects delivery alone.
pub async fn send(&self, message: Message) -> Result<()> {
    if let Some(store) = self.store.as_ref() {
        let saved = store.save_message(&message).await;
        if let Err(e) = saved {
            warn!("Failed to save message to store: {}", e);
        }
    }

    match message.to.clone() {
        MessageTarget::Direct(agent_id) => self.send_private(message, &agent_id).await,
        MessageTarget::Group(group_id) => self.send_group(message, &group_id).await,
    }
}
/// Delivers `message` to the private channel registered for `to`.
///
/// # Errors
///
/// Fails when no agent is registered under `to`, or when the channel send
/// fails.
async fn send_private(&self, message: Message, to: &str) -> Result<()> {
    // Clone the sender out of the map so the DashMap guard is released
    // before awaiting. Holding the guard across `.await` on a full channel
    // would block `register`/`unregister` on the same shard and can
    // deadlock the runtime.
    let sender = self.private_txs.get(to).map(|entry| entry.value().clone());

    match sender {
        Some(tx) => {
            tx.send(message)
                .await
                .context("Failed to send private message")?;
            debug!("Sent private message to {}", to);
            Ok(())
        }
        None => {
            warn!("Recipient not found: {}", to);
            Err(anyhow::anyhow!("Recipient not found: {}", to))
        }
    }
}
/// Broadcasts `message` to the channel of `group_id`.
///
/// When the group record is known, `@name` tokens in the content are first
/// resolved into mentions of group members.
///
/// # Errors
///
/// Fails when no channel exists for `group_id` or when the broadcast send
/// fails. NOTE(review): tokio's broadcast `send` reports an error when the
/// channel has no active receivers, so a group nobody is subscribed to
/// surfaces as "Failed to send group message" - confirm this is intended.
async fn send_group(&self, mut message: Message, group_id: &str) -> Result<()> {
    if matches!(&message.to, MessageTarget::Group(_)) {
        if let Some(group) = self.get_group(group_id).await {
            message = self.extract_mentions_from_content(message, &group);
        }
    }

    match self.group_txs.get(group_id) {
        Some(tx) => {
            tx.send(message).context("Failed to send group message")?;
            debug!("Sent group message to {}", group_id);
            Ok(())
        }
        None => {
            warn!("Group not found: {}", group_id);
            Err(anyhow::anyhow!("Group not found: {}", group_id))
        }
    }
}
/// Adds a mention for every group member referenced by an `@name` token in
/// the message content.
///
/// A name is the longest run of alphanumeric characters, `_` or `-` that
/// directly follows an `@`. A member is mentioned when its id equals the
/// name or `is_name_match` accepts the pair; members already present in
/// `message.mentions` are not added again.
fn extract_mentions_from_content(&self, mut message: Message, group: &Group) -> Message {
    // Work on a copy: `with_mention` consumes the message while we iterate.
    let text = message.content.clone();

    // Name characters never include '@', so each piece after the first
    // split starts right behind one '@' in the original text.
    for segment in text.split('@').skip(1) {
        let name_len = segment
            .find(|c: char| !(c.is_alphanumeric() || c == '_' || c == '-'))
            .unwrap_or(segment.len());
        if name_len == 0 {
            continue;
        }
        let handle = &segment[..name_len];

        for member_id in &group.members {
            let matched = member_id == handle || self.is_name_match(member_id, handle);
            if matched && !message.mentions.contains(member_id) {
                message = message.with_mention(member_id.clone());
            }
        }
    }

    message
}
/// Returns `true` when `agent_id` and `mentioned_name` are equal or one of
/// them is a substring of the other.
///
/// An empty string only matches another empty string.
fn is_name_match(&self, agent_id: &str, mentioned_name: &str) -> bool {
    if agent_id.is_empty() || mentioned_name.is_empty() {
        // `str::contains("")` is always true, so without this guard an
        // empty id or name would match everything.
        return agent_id == mentioned_name;
    }
    // Equality is covered by `contains`: every string contains itself.
    agent_id.contains(mentioned_name) || mentioned_name.contains(agent_id)
}
/// Returns a new broadcast receiver for `group_id`, or `None` when no
/// channel exists for that group.
pub fn subscribe_group(&self, group_id: &str) -> Option<broadcast::Receiver<Message>> {
    let entry = self.group_txs.get(group_id)?;
    Some(entry.subscribe())
}
/// Returns a clone of the group stored under `group_id`, if any.
pub async fn get_group(&self, group_id: &str) -> Option<Group> {
    self.groups.read().await.get(group_id).cloned()
}
/// Returns clones of all groups for which `has_member(agent_id)` is true.
pub async fn list_agent_groups(&self, agent_id: &str) -> Vec<Group> {
    let registry = self.groups.read().await;
    let mut joined = Vec::new();
    for group in registry.values() {
        if group.has_member(agent_id) {
            joined.push(group.clone());
        }
    }
    joined
}
/// Loads the stored messages selected by `filter`.
///
/// Returns an empty list when the bus has no store configured; store
/// errors are passed through.
pub async fn get_message_history(
    &self,
    filter: crate::core::store::MessageFilter,
) -> Result<Vec<Message>> {
    match self.store.as_ref() {
        Some(store) => store.load_messages(filter).await,
        None => Ok(Vec::new()),
    }
}
/// Loads stored messages for `agent_id`, passing `limit` through to the
/// store.
///
/// Returns an empty list when the bus has no store configured; store
/// errors are passed through.
pub async fn get_agent_message_history(
    &self,
    agent_id: &str,
    limit: usize,
) -> Result<Vec<Message>> {
    match self.store.as_ref() {
        Some(store) => store.load_messages_by_agent(agent_id, limit).await,
        None => Ok(Vec::new()),
    }
}
/// Loads stored messages for `group_id`, passing `limit` through to the
/// store.
///
/// Returns an empty list when the bus has no store configured; store
/// errors are passed through.
pub async fn get_group_message_history(
    &self,
    group_id: &str,
    limit: usize,
) -> Result<Vec<Message>> {
    match self.store.as_ref() {
        Some(store) => store.load_messages_by_group(group_id, limit).await,
        None => Ok(Vec::new()),
    }
}
}
impl Default for MessageBus {
fn default() -> Self {
Self::new()
}
}
/// Receiving side for one agent: its private channel plus the broadcast
/// receivers of the groups it has joined.
pub struct MessageReceiver {
/// Id of the owning agent; group messages whose `from` equals this id are
/// not handed back to the agent.
agent_id: String,
/// Receiving half of the agent's private channel.
private_rx: mpsc::Receiver<Message>,
/// `(group id, broadcast receiver)` for every joined group.
group_rxs: Vec<(String, broadcast::Receiver<Message>)>,
}
impl MessageReceiver {
pub fn new(agent_id: String, private_rx: mpsc::Receiver<Message>) -> Self {
Self {
agent_id,
private_rx,
group_rxs: Vec::new(),
}
}
pub fn join_group(&mut self, group_id: &str, bus: &MessageBus) -> Result<()> {
if let Some(rx) = bus.subscribe_group(group_id) {
self.group_rxs.push((group_id.to_string(), rx));
Ok(())
} else {
Err(anyhow::anyhow!("Group not found: {}", group_id))
}
}
pub fn leave_group(&mut self, group_id: &str) {
self.group_rxs.retain(|(id, _)| id != group_id);
}
pub async fn recv(&mut self) -> Option<Message> {
if let Ok(msg) = self.private_rx.try_recv() {
return Some(msg);
}
for (_group_id, rx) in &mut self.group_rxs {
if let Ok(msg) = rx.try_recv() {
if msg.from != self.agent_id {
return Some(msg);
}
}
}
self.private_rx.recv().await
}
pub fn try_recv(&mut self) -> Option<Message> {
if let Ok(msg) = self.private_rx.try_recv() {
return Some(msg);
}
for (_group_id, rx) in &mut self.group_rxs {
if let Ok(msg) = rx.try_recv() {
if msg.from != self.agent_id {
return Some(msg);
}
}
}
None
}
}