use chrono::{DateTime, Utc};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use tokio::sync::mpsc;
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct IrcMessage {
pub from: String,
pub to: String,
pub body: String,
pub timestamp: DateTime<Utc>,
}
#[derive(Default)]
struct IrcRouter {
inboxes: HashMap<String, mpsc::UnboundedSender<IrcMessage>>,
}
#[derive(Clone, Default)]
pub struct IrcHub(Arc<Mutex<IrcRouter>>);
impl IrcHub {
pub fn new() -> Self {
Self::default()
}
pub fn join(&self, agent_id: impl Into<String>) -> IrcRelay {
IrcRelay::join_with_hub(agent_id, self.clone())
}
}
pub struct IrcRelay {
agent_id: String,
hub: IrcHub,
inbox_rx: mpsc::UnboundedReceiver<IrcMessage>,
}
impl IrcRelay {
fn join_with_hub(agent_id: impl Into<String>, hub: IrcHub) -> Self {
let agent_id = agent_id.into();
let (tx, rx) = mpsc::unbounded_channel();
hub.0
.lock()
.expect("irc router lock")
.inboxes
.insert(agent_id.clone(), tx);
Self {
agent_id,
hub,
inbox_rx: rx,
}
}
pub fn agent_id(&self) -> &str {
&self.agent_id
}
pub fn send(&self, to: &str, body: impl Into<String>) -> Result<IrcMessage, String> {
let body = body.into();
if body.trim().is_empty() {
return Err("message body is empty".to_string());
}
let to = to.trim();
if to.is_empty() {
return Err("recipient is required".to_string());
}
let msg = IrcMessage {
from: self.agent_id.clone(),
to: to.to_string(),
body,
timestamp: Utc::now(),
};
self.deliver(&msg)?;
Ok(msg)
}
fn deliver(&self, msg: &IrcMessage) -> Result<(), String> {
let router = self.hub.0.lock().expect("irc router lock");
if msg.to == "all" {
let mut delivered = 0usize;
for (id, tx) in &router.inboxes {
if id == &msg.from {
continue;
}
if tx.send(msg.clone()).is_ok() {
delivered += 1;
}
}
if delivered == 0 {
return Err("no peers available for broadcast".to_string());
}
return Ok(());
}
if msg.to == msg.from {
return Err("cannot message self".to_string());
}
let Some(tx) = router.inboxes.get(&msg.to) else {
return Err(format!("unknown peer: {}", msg.to));
};
tx.send(msg.clone())
.map_err(|_| format!("peer '{}' is unavailable", msg.to))
}
pub fn try_receive(&mut self) -> Option<IrcMessage> {
self.inbox_rx.try_recv().ok()
}
pub async fn receive(&mut self) -> Option<IrcMessage> {
self.try_receive()
}
pub fn list_peers(&self) -> Vec<String> {
let router = self.hub.0.lock().expect("irc router lock");
let mut peers: Vec<String> = router
.inboxes
.keys()
.filter(|id| *id != &self.agent_id)
.cloned()
.collect();
peers.sort();
peers
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn routes_direct_message() {
let hub = IrcHub::new();
let main = hub.join("main");
let mut worker = hub.join("worker");
main.send("worker", "ping").expect("send");
let msg = worker.try_receive().expect("inbox");
assert_eq!(msg.from, "main");
assert_eq!(msg.to, "worker");
assert_eq!(msg.body, "ping");
}
#[test]
fn broadcast_skips_sender() {
let hub = IrcHub::new();
let mut main = hub.join("main");
let mut a = hub.join("a");
let mut b = hub.join("b");
main.send("all", "hello team").expect("broadcast");
assert!(a.try_receive().is_some());
assert!(b.try_receive().is_some());
assert!(main.try_receive().is_none());
}
#[test]
fn list_peers_returns_registered_ids() {
let hub = IrcHub::new();
let main = hub.join("main");
hub.join("zebra");
hub.join("alpha");
assert_eq!(main.list_peers(), vec!["alpha", "zebra"]);
}
#[test]
fn send_to_unknown_peer_errors() {
let hub = IrcHub::new();
let main = hub.join("main");
let err = main.send("ghost", "hello").unwrap_err();
assert!(err.contains("unknown peer: ghost"));
}
#[test]
fn duplicate_join_replaces_inbox() {
let hub = IrcHub::new();
let main = hub.join("main");
let mut first = hub.join("worker");
let mut second = hub.join("worker");
main.send("worker", "after rejoin").expect("send");
assert!(first.try_receive().is_none());
let msg = second.try_receive().expect("new inbox");
assert_eq!(msg.body, "after rejoin");
}
#[test]
fn message_timestamp_set() {
let hub = IrcHub::new();
let main = hub.join("main");
let _worker = hub.join("worker");
let before = Utc::now();
let msg = main.send("worker", "timed").expect("send");
let after = Utc::now();
assert!(msg.timestamp >= before);
assert!(msg.timestamp <= after);
}
#[test]
fn try_receive_on_empty_inbox_returns_none() {
let hub = IrcHub::new();
let mut main = hub.join("main");
assert!(main.try_receive().is_none());
}
#[test]
fn send_empty_body_returns_err() {
let hub = IrcHub::new();
let main = hub.join("main");
let _peer = hub.join("worker");
let err = main.send("worker", " ").unwrap_err();
assert!(err.contains("empty"));
}
#[test]
fn send_to_self_returns_err() {
let hub = IrcHub::new();
let main = hub.join("main");
let err = main.send("main", "hello").unwrap_err();
assert!(err.contains("cannot message self"));
}
#[test]
fn broadcast_with_no_other_peers_errors() {
let hub = IrcHub::new();
let main = hub.join("main");
let err = main.send("all", "solo").unwrap_err();
assert!(err.contains("no peers available"));
}
#[test]
fn list_peers_empty_when_only_self_registered() {
let hub = IrcHub::new();
let main = hub.join("main");
assert!(main.list_peers().is_empty());
}
#[test]
fn try_receive_drains_direct_messages_in_order() {
let hub = IrcHub::new();
let main = hub.join("main");
let mut worker = hub.join("worker");
main.send("worker", "first").unwrap();
main.send("worker", "second").unwrap();
assert_eq!(worker.try_receive().unwrap().body, "first");
assert_eq!(worker.try_receive().unwrap().body, "second");
assert!(worker.try_receive().is_none());
}
}