1use chrono::{DateTime, Utc};
9use std::collections::HashMap;
10use std::sync::{Arc, Mutex};
11use tokio::sync::mpsc;
12
13#[derive(Debug, Clone, PartialEq, Eq)]
15pub struct IrcMessage {
16 pub from: String,
17 pub to: String,
18 pub body: String,
19 pub timestamp: DateTime<Utc>,
20}
21
22#[derive(Default)]
23struct IrcRouter {
24 inboxes: HashMap<String, mpsc::UnboundedSender<IrcMessage>>,
25}
26
27#[derive(Clone, Default)]
29pub struct IrcHub(Arc<Mutex<IrcRouter>>);
30
31impl IrcHub {
32 pub fn new() -> Self {
34 Self::default()
35 }
36
37 pub fn join(&self, agent_id: impl Into<String>) -> IrcRelay {
39 IrcRelay::join_with_hub(agent_id, self.clone())
40 }
41}
42
43pub struct IrcRelay {
45 agent_id: String,
46 hub: IrcHub,
47 inbox_rx: mpsc::UnboundedReceiver<IrcMessage>,
48}
49
50impl IrcRelay {
51 fn join_with_hub(agent_id: impl Into<String>, hub: IrcHub) -> Self {
52 let agent_id = agent_id.into();
53 let (tx, rx) = mpsc::unbounded_channel();
54 hub.0
55 .lock()
56 .expect("irc router lock")
57 .inboxes
58 .insert(agent_id.clone(), tx);
59 Self {
60 agent_id,
61 hub,
62 inbox_rx: rx,
63 }
64 }
65
66 pub fn agent_id(&self) -> &str {
68 &self.agent_id
69 }
70
71 pub fn send(&self, to: &str, body: impl Into<String>) -> Result<IrcMessage, String> {
75 let body = body.into();
76 if body.trim().is_empty() {
77 return Err("message body is empty".to_string());
78 }
79 let to = to.trim();
80 if to.is_empty() {
81 return Err("recipient is required".to_string());
82 }
83
84 let msg = IrcMessage {
85 from: self.agent_id.clone(),
86 to: to.to_string(),
87 body,
88 timestamp: Utc::now(),
89 };
90 self.deliver(&msg)?;
91 Ok(msg)
92 }
93
94 fn deliver(&self, msg: &IrcMessage) -> Result<(), String> {
95 let router = self.hub.0.lock().expect("irc router lock");
96
97 if msg.to == "all" {
98 let mut delivered = 0usize;
99 for (id, tx) in &router.inboxes {
100 if id == &msg.from {
101 continue;
102 }
103 if tx.send(msg.clone()).is_ok() {
104 delivered += 1;
105 }
106 }
107 if delivered == 0 {
108 return Err("no peers available for broadcast".to_string());
109 }
110 return Ok(());
111 }
112
113 if msg.to == msg.from {
114 return Err("cannot message self".to_string());
115 }
116
117 let Some(tx) = router.inboxes.get(&msg.to) else {
118 return Err(format!("unknown peer: {}", msg.to));
119 };
120
121 tx.send(msg.clone())
122 .map_err(|_| format!("peer '{}' is unavailable", msg.to))
123 }
124
125 pub fn try_receive(&mut self) -> Option<IrcMessage> {
127 self.inbox_rx.try_recv().ok()
128 }
129
130 pub async fn receive(&mut self) -> Option<IrcMessage> {
132 self.try_receive()
133 }
134
135 pub fn list_peers(&self) -> Vec<String> {
137 let router = self.hub.0.lock().expect("irc router lock");
138 let mut peers: Vec<String> = router
139 .inboxes
140 .keys()
141 .filter(|id| *id != &self.agent_id)
142 .cloned()
143 .collect();
144 peers.sort();
145 peers
146 }
147}
148
149#[cfg(test)]
150mod tests {
151 use super::*;
152
153 #[test]
154 fn routes_direct_message() {
155 let hub = IrcHub::new();
156 let main = hub.join("main");
157 let mut worker = hub.join("worker");
158
159 main.send("worker", "ping").expect("send");
160 let msg = worker.try_receive().expect("inbox");
161 assert_eq!(msg.from, "main");
162 assert_eq!(msg.to, "worker");
163 assert_eq!(msg.body, "ping");
164 }
165
166 #[test]
167 fn broadcast_skips_sender() {
168 let hub = IrcHub::new();
169 let mut main = hub.join("main");
170 let mut a = hub.join("a");
171 let mut b = hub.join("b");
172
173 main.send("all", "hello team").expect("broadcast");
174 assert!(a.try_receive().is_some());
175 assert!(b.try_receive().is_some());
176 assert!(main.try_receive().is_none());
177 }
178
179 #[test]
180 fn list_peers_returns_registered_ids() {
181 let hub = IrcHub::new();
182 let main = hub.join("main");
183 hub.join("zebra");
184 hub.join("alpha");
185
186 assert_eq!(main.list_peers(), vec!["alpha", "zebra"]);
187 }
188
189 #[test]
190 fn send_to_unknown_peer_errors() {
191 let hub = IrcHub::new();
192 let main = hub.join("main");
193
194 let err = main.send("ghost", "hello").unwrap_err();
195 assert!(err.contains("unknown peer: ghost"));
196 }
197
198 #[test]
199 fn duplicate_join_replaces_inbox() {
200 let hub = IrcHub::new();
201 let main = hub.join("main");
202 let mut first = hub.join("worker");
203 let mut second = hub.join("worker");
204
205 main.send("worker", "after rejoin").expect("send");
206 assert!(first.try_receive().is_none());
207 let msg = second.try_receive().expect("new inbox");
208 assert_eq!(msg.body, "after rejoin");
209 }
210
211 #[test]
212 fn message_timestamp_set() {
213 let hub = IrcHub::new();
214 let main = hub.join("main");
215 let _worker = hub.join("worker");
216 let before = Utc::now();
217 let msg = main.send("worker", "timed").expect("send");
218 let after = Utc::now();
219
220 assert!(msg.timestamp >= before);
221 assert!(msg.timestamp <= after);
222 }
223
224 #[test]
225 fn try_receive_on_empty_inbox_returns_none() {
226 let hub = IrcHub::new();
227 let mut main = hub.join("main");
228 assert!(main.try_receive().is_none());
229 }
230
231 #[test]
232 fn send_empty_body_returns_err() {
233 let hub = IrcHub::new();
234 let main = hub.join("main");
235 let _peer = hub.join("worker");
236 let err = main.send("worker", " ").unwrap_err();
237 assert!(err.contains("empty"));
238 }
239
240 #[test]
241 fn send_to_self_returns_err() {
242 let hub = IrcHub::new();
243 let main = hub.join("main");
244 let err = main.send("main", "hello").unwrap_err();
245 assert!(err.contains("cannot message self"));
246 }
247
248 #[test]
249 fn broadcast_with_no_other_peers_errors() {
250 let hub = IrcHub::new();
251 let main = hub.join("main");
252 let err = main.send("all", "solo").unwrap_err();
253 assert!(err.contains("no peers available"));
254 }
255
256 #[test]
257 fn list_peers_empty_when_only_self_registered() {
258 let hub = IrcHub::new();
259 let main = hub.join("main");
260 assert!(main.list_peers().is_empty());
261 }
262
263 #[test]
264 fn try_receive_drains_direct_messages_in_order() {
265 let hub = IrcHub::new();
266 let main = hub.join("main");
267 let mut worker = hub.join("worker");
268 main.send("worker", "first").unwrap();
269 main.send("worker", "second").unwrap();
270 assert_eq!(worker.try_receive().unwrap().body, "first");
271 assert_eq!(worker.try_receive().unwrap().body, "second");
272 assert!(worker.try_receive().is_none());
273 }
274}