agentzero_core/
event_bus.rs1use async_trait::async_trait;
10use serde::{Deserialize, Serialize};
11use std::time::{SystemTime, UNIX_EPOCH};
12use tokio::sync::broadcast;
13
14#[derive(Debug, Clone, Serialize, Deserialize)]
16pub struct Event {
17 pub id: String,
19 pub topic: String,
21 pub source: String,
23 pub payload: String,
25 pub privacy_boundary: String,
27 pub timestamp_ms: u64,
29 pub correlation_id: Option<String>,
32}
33
34impl Event {
35 pub fn new(
37 topic: impl Into<String>,
38 source: impl Into<String>,
39 payload: impl Into<String>,
40 ) -> Self {
41 Self {
42 id: new_event_id(),
43 topic: topic.into(),
44 source: source.into(),
45 payload: payload.into(),
46 privacy_boundary: String::new(),
47 timestamp_ms: now_ms(),
48 correlation_id: None,
49 }
50 }
51
52 pub fn with_correlation(mut self, id: impl Into<String>) -> Self {
54 self.correlation_id = Some(id.into());
55 self
56 }
57
58 pub fn with_boundary(mut self, boundary: impl Into<String>) -> Self {
60 self.privacy_boundary = boundary.into();
61 self
62 }
63}
64
65#[async_trait]
67pub trait EventBus: Send + Sync {
68 async fn publish(&self, event: Event) -> anyhow::Result<()>;
70
71 fn subscribe(&self) -> Box<dyn EventSubscriber>;
73
74 fn subscriber_count(&self) -> usize;
76}
77
78#[async_trait]
80pub trait EventSubscriber: Send {
81 async fn recv(&mut self) -> anyhow::Result<Event>;
83
84 async fn recv_filtered(&mut self, topic_prefix: &str) -> anyhow::Result<Event> {
87 loop {
88 let event = self.recv().await?;
89 if event.topic.starts_with(topic_prefix) {
90 return Ok(event);
91 }
92 }
93 }
94}
95
96pub struct InMemoryBus {
98 tx: broadcast::Sender<Event>,
99}
100
101impl InMemoryBus {
102 pub fn new(capacity: usize) -> Self {
105 let (tx, _) = broadcast::channel(capacity);
106 Self { tx }
107 }
108
109 pub fn default_capacity() -> Self {
111 Self::new(256)
112 }
113}
114
115#[async_trait]
116impl EventBus for InMemoryBus {
117 async fn publish(&self, event: Event) -> anyhow::Result<()> {
118 let _ = self.tx.send(event);
121 Ok(())
122 }
123
124 fn subscribe(&self) -> Box<dyn EventSubscriber> {
125 Box::new(InMemorySubscriber {
126 rx: self.tx.subscribe(),
127 })
128 }
129
130 fn subscriber_count(&self) -> usize {
131 self.tx.receiver_count()
132 }
133}
134
135pub struct InMemorySubscriber {
137 rx: broadcast::Receiver<Event>,
138}
139
140#[async_trait]
141impl EventSubscriber for InMemorySubscriber {
142 async fn recv(&mut self) -> anyhow::Result<Event> {
143 loop {
144 match self.rx.recv().await {
145 Ok(event) => return Ok(event),
146 Err(broadcast::error::RecvError::Lagged(n)) => {
147 tracing::warn!(skipped = n, "event bus subscriber lagged, skipping events");
148 }
150 Err(broadcast::error::RecvError::Closed) => {
151 anyhow::bail!("event bus closed");
152 }
153 }
154 }
155 }
156}
157
158pub fn topic_matches(pattern: &str, topic: &str) -> bool {
160 if pattern == "*" {
161 return true;
162 }
163 if pattern.ends_with(".*") {
164 let prefix = &pattern[..pattern.len() - 1];
165 topic.starts_with(prefix)
166 } else {
167 pattern == topic
168 }
169}
170
171fn new_event_id() -> String {
173 use std::sync::atomic::{AtomicU64, Ordering};
174 static COUNTER: AtomicU64 = AtomicU64::new(0);
175 let ts = now_ms();
176 let seq = COUNTER.fetch_add(1, Ordering::Relaxed);
177 format!("evt-{ts}-{seq}")
178}
179
180fn now_ms() -> u64 {
181 SystemTime::now()
182 .duration_since(UNIX_EPOCH)
183 .unwrap_or_default()
184 .as_millis() as u64
185}
186
187pub fn is_boundary_compatible(source_boundary: &str, consumer_boundary: &str) -> bool {
191 fn level(b: &str) -> u8 {
193 match b {
194 "local_only" => 3,
195 "encrypted_only" => 2,
196 "any" => 1,
197 _ => 0,
198 }
199 }
200 level(consumer_boundary) >= level(source_boundary)
205}
206
207#[cfg(test)]
208mod tests {
209 use super::*;
210 use std::sync::Arc;
211
212 #[test]
213 fn topic_matching() {
214 assert!(topic_matches("task.image.*", "task.image.complete"));
215 assert!(topic_matches("task.image.*", "task.image.error"));
216 assert!(!topic_matches("task.image.*", "task.text.complete"));
217 assert!(topic_matches(
218 "channel.telegram.message",
219 "channel.telegram.message"
220 ));
221 assert!(!topic_matches(
222 "channel.telegram.message",
223 "channel.slack.message"
224 ));
225 assert!(topic_matches("*", "anything.at.all"));
226 }
227
228 #[test]
229 fn boundary_compatibility() {
230 assert!(is_boundary_compatible("", ""));
232 assert!(is_boundary_compatible("", "any"));
233 assert!(is_boundary_compatible("", "local_only"));
234
235 assert!(is_boundary_compatible("any", "any"));
237 assert!(is_boundary_compatible("any", "encrypted_only"));
238 assert!(is_boundary_compatible("any", "local_only"));
239
240 assert!(is_boundary_compatible("local_only", "local_only"));
242 assert!(!is_boundary_compatible("local_only", "any"));
243 assert!(!is_boundary_compatible("local_only", "encrypted_only"));
244 assert!(!is_boundary_compatible("local_only", ""));
245
246 assert!(is_boundary_compatible("encrypted_only", "encrypted_only"));
248 assert!(is_boundary_compatible("encrypted_only", "local_only"));
249 assert!(!is_boundary_compatible("encrypted_only", "any"));
250 }
251
252 #[test]
253 fn event_builder() {
254 let event = Event::new("task.test", "agent-1", r#"{"result":"ok"}"#)
255 .with_correlation("corr-123")
256 .with_boundary("local_only");
257
258 assert_eq!(event.topic, "task.test");
259 assert_eq!(event.source, "agent-1");
260 assert_eq!(event.correlation_id.as_deref(), Some("corr-123"));
261 assert_eq!(event.privacy_boundary, "local_only");
262 assert!(event.timestamp_ms > 0);
263 assert!(event.id.starts_with("evt-"));
264 }
265
266 #[tokio::test]
267 async fn in_memory_bus_publish_subscribe() {
268 let bus = InMemoryBus::new(16);
269 let mut sub = bus.subscribe();
270
271 bus.publish(Event::new("test.topic", "src", "hello"))
272 .await
273 .unwrap();
274
275 let event = sub.recv().await.unwrap();
276 assert_eq!(event.topic, "test.topic");
277 assert_eq!(event.payload, "hello");
278 }
279
280 #[tokio::test]
281 async fn in_memory_bus_multiple_subscribers() {
282 let bus = InMemoryBus::new(16);
283 let mut sub1 = bus.subscribe();
284 let mut sub2 = bus.subscribe();
285
286 assert_eq!(bus.subscriber_count(), 2);
287
288 bus.publish(Event::new("t", "s", "data")).await.unwrap();
289
290 let e1 = sub1.recv().await.unwrap();
291 let e2 = sub2.recv().await.unwrap();
292 assert_eq!(e1.payload, "data");
293 assert_eq!(e2.payload, "data");
294 }
295
296 #[tokio::test]
297 async fn filtered_recv() {
298 let bus = InMemoryBus::new(16);
299 let mut sub = bus.subscribe();
300
301 bus.publish(Event::new("task.text.complete", "a", "text"))
302 .await
303 .unwrap();
304 bus.publish(Event::new("task.image.complete", "b", "image"))
305 .await
306 .unwrap();
307
308 let event = sub.recv_filtered("task.image.").await.unwrap();
309 assert_eq!(event.payload, "image");
310 }
311
312 #[tokio::test]
313 async fn publish_with_no_subscribers_is_ok() {
314 let bus = InMemoryBus::new(16);
315 bus.publish(Event::new("orphan", "system", ""))
317 .await
318 .unwrap();
319 }
320
321 #[tokio::test]
322 async fn bus_closed_returns_error() {
323 let bus = Arc::new(InMemoryBus::new(16));
324 let mut sub = bus.subscribe();
325
326 drop(bus);
328
329 let result = sub.recv().await;
330 assert!(result.is_err());
331 }
332}