Skip to main content

pawan/agent/
irc.rs

1//! IRC-style message relay between named agents in one process.
2//!
3//! Adapted from oh-my-pi's agent-to-agent messaging pattern: peers are
4//! discovered by id, messages are plain prose routed over in-memory channels
5//! (no IRC wire protocol). Reply generation and history injection are left
6//! for a later integration pass.
7
8use chrono::{DateTime, Utc};
9use std::collections::HashMap;
10use std::sync::{Arc, Mutex};
11use tokio::sync::mpsc;
12
13/// A single routed message between agents.
14#[derive(Debug, Clone, PartialEq, Eq)]
15pub struct IrcMessage {
16    pub from: String,
17    pub to: String,
18    pub body: String,
19    pub timestamp: DateTime<Utc>,
20}
21
22#[derive(Default)]
23struct IrcRouter {
24    inboxes: HashMap<String, mpsc::UnboundedSender<IrcMessage>>,
25}
26
27/// Shared routing hub — clone and call [`IrcHub::join`] for each live agent.
28#[derive(Clone, Default)]
29pub struct IrcHub(Arc<Mutex<IrcRouter>>);
30
31impl IrcHub {
32    /// Create an empty hub (orchestrator + subagents share one instance per process).
33    pub fn new() -> Self {
34        Self::default()
35    }
36
37    /// Register `agent_id` and return a handle for send/receive.
38    pub fn join(&self, agent_id: impl Into<String>) -> IrcRelay {
39        IrcRelay::join_with_hub(agent_id, self.clone())
40    }
41}
42
43/// Per-agent IRC endpoint: send to peers and poll the local inbox.
44pub struct IrcRelay {
45    agent_id: String,
46    hub: IrcHub,
47    inbox_rx: mpsc::UnboundedReceiver<IrcMessage>,
48}
49
50impl IrcRelay {
51    fn join_with_hub(agent_id: impl Into<String>, hub: IrcHub) -> Self {
52        let agent_id = agent_id.into();
53        let (tx, rx) = mpsc::unbounded_channel();
54        hub.0
55            .lock()
56            .expect("irc router lock")
57            .inboxes
58            .insert(agent_id.clone(), tx);
59        Self {
60            agent_id,
61            hub,
62            inbox_rx: rx,
63        }
64    }
65
66    /// This agent's address (e.g. `"main"`, `"subagent-explore"`).
67    pub fn agent_id(&self) -> &str {
68        &self.agent_id
69    }
70
71    /// Send prose to `to`. Use `"all"` to broadcast to every other registered peer.
72    ///
73    /// Stub: delivery is synchronous over mpsc; auto-replies are not generated yet.
74    pub fn send(&self, to: &str, body: impl Into<String>) -> Result<IrcMessage, String> {
75        let body = body.into();
76        if body.trim().is_empty() {
77            return Err("message body is empty".to_string());
78        }
79        let to = to.trim();
80        if to.is_empty() {
81            return Err("recipient is required".to_string());
82        }
83
84        let msg = IrcMessage {
85            from: self.agent_id.clone(),
86            to: to.to_string(),
87            body,
88            timestamp: Utc::now(),
89        };
90        self.deliver(&msg)?;
91        Ok(msg)
92    }
93
94    fn deliver(&self, msg: &IrcMessage) -> Result<(), String> {
95        let router = self.hub.0.lock().expect("irc router lock");
96
97        if msg.to == "all" {
98            let mut delivered = 0usize;
99            for (id, tx) in &router.inboxes {
100                if id == &msg.from {
101                    continue;
102                }
103                if tx.send(msg.clone()).is_ok() {
104                    delivered += 1;
105                }
106            }
107            if delivered == 0 {
108                return Err("no peers available for broadcast".to_string());
109            }
110            return Ok(());
111        }
112
113        if msg.to == msg.from {
114            return Err("cannot message self".to_string());
115        }
116
117        let Some(tx) = router.inboxes.get(&msg.to) else {
118            return Err(format!("unknown peer: {}", msg.to));
119        };
120
121        tx.send(msg.clone())
122            .map_err(|_| format!("peer '{}' is unavailable", msg.to))
123    }
124
125    /// Non-blocking poll of this agent's inbox.
126    pub fn try_receive(&mut self) -> Option<IrcMessage> {
127        self.inbox_rx.try_recv().ok()
128    }
129
130    /// Blocking receive stub — delegates to [`Self::try_receive`] until integrated with async loops.
131    pub async fn receive(&mut self) -> Option<IrcMessage> {
132        self.try_receive()
133    }
134
135    /// Other registered agent ids (excluding self), sorted for stable display.
136    pub fn list_peers(&self) -> Vec<String> {
137        let router = self.hub.0.lock().expect("irc router lock");
138        let mut peers: Vec<String> = router
139            .inboxes
140            .keys()
141            .filter(|id| *id != &self.agent_id)
142            .cloned()
143            .collect();
144        peers.sort();
145        peers
146    }
147}
148
149#[cfg(test)]
150mod tests {
151    use super::*;
152
153    #[test]
154    fn routes_direct_message() {
155        let hub = IrcHub::new();
156        let main = hub.join("main");
157        let mut worker = hub.join("worker");
158
159        main.send("worker", "ping").expect("send");
160        let msg = worker.try_receive().expect("inbox");
161        assert_eq!(msg.from, "main");
162        assert_eq!(msg.to, "worker");
163        assert_eq!(msg.body, "ping");
164    }
165
166    #[test]
167    fn broadcast_skips_sender() {
168        let hub = IrcHub::new();
169        let mut main = hub.join("main");
170        let mut a = hub.join("a");
171        let mut b = hub.join("b");
172
173        main.send("all", "hello team").expect("broadcast");
174        assert!(a.try_receive().is_some());
175        assert!(b.try_receive().is_some());
176        assert!(main.try_receive().is_none());
177    }
178
179    #[test]
180    fn list_peers_returns_registered_ids() {
181        let hub = IrcHub::new();
182        let main = hub.join("main");
183        hub.join("zebra");
184        hub.join("alpha");
185
186        assert_eq!(main.list_peers(), vec!["alpha", "zebra"]);
187    }
188
189    #[test]
190    fn send_to_unknown_peer_errors() {
191        let hub = IrcHub::new();
192        let main = hub.join("main");
193
194        let err = main.send("ghost", "hello").unwrap_err();
195        assert!(err.contains("unknown peer: ghost"));
196    }
197
198    #[test]
199    fn duplicate_join_replaces_inbox() {
200        let hub = IrcHub::new();
201        let main = hub.join("main");
202        let mut first = hub.join("worker");
203        let mut second = hub.join("worker");
204
205        main.send("worker", "after rejoin").expect("send");
206        assert!(first.try_receive().is_none());
207        let msg = second.try_receive().expect("new inbox");
208        assert_eq!(msg.body, "after rejoin");
209    }
210
211    #[test]
212    fn message_timestamp_set() {
213        let hub = IrcHub::new();
214        let main = hub.join("main");
215        let _worker = hub.join("worker");
216        let before = Utc::now();
217        let msg = main.send("worker", "timed").expect("send");
218        let after = Utc::now();
219
220        assert!(msg.timestamp >= before);
221        assert!(msg.timestamp <= after);
222    }
223
224    #[test]
225    fn try_receive_on_empty_inbox_returns_none() {
226        let hub = IrcHub::new();
227        let mut main = hub.join("main");
228        assert!(main.try_receive().is_none());
229    }
230
231    #[test]
232    fn send_empty_body_returns_err() {
233        let hub = IrcHub::new();
234        let main = hub.join("main");
235        let _peer = hub.join("worker");
236        let err = main.send("worker", "   ").unwrap_err();
237        assert!(err.contains("empty"));
238    }
239
240    #[test]
241    fn send_to_self_returns_err() {
242        let hub = IrcHub::new();
243        let main = hub.join("main");
244        let err = main.send("main", "hello").unwrap_err();
245        assert!(err.contains("cannot message self"));
246    }
247
248    #[test]
249    fn broadcast_with_no_other_peers_errors() {
250        let hub = IrcHub::new();
251        let main = hub.join("main");
252        let err = main.send("all", "solo").unwrap_err();
253        assert!(err.contains("no peers available"));
254    }
255
256    #[test]
257    fn list_peers_empty_when_only_self_registered() {
258        let hub = IrcHub::new();
259        let main = hub.join("main");
260        assert!(main.list_peers().is_empty());
261    }
262
263    #[test]
264    fn try_receive_drains_direct_messages_in_order() {
265        let hub = IrcHub::new();
266        let main = hub.join("main");
267        let mut worker = hub.join("worker");
268        main.send("worker", "first").unwrap();
269        main.send("worker", "second").unwrap();
270        assert_eq!(worker.try_receive().unwrap().body, "first");
271        assert_eq!(worker.try_receive().unwrap().body, "second");
272        assert!(worker.try_receive().is_none());
273    }
274}