agent_event_bus/lib.rs
1//! # agent-event-bus
2//!
3//! Tiny in-process pub/sub for agent loop events. Sync-only Rust mirror of
4//! the Python [`agent-event-bus`](https://pypi.org/project/agent-event-bus/)
5//! library.
6//!
7//! Producers emit events by name; subscribers register a handler for the
8//! event name they care about. Pass `"*"` as the name to subscribe to every
9//! event (the "firehose"). One-shot handlers are supported via
10//! [`EventBus::on_once`].
11//!
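//! Handlers registered for the same event name are dispatched inline on the
//! caller's thread, in registration order. Dispatch happens while the bus's
//! internal lock is held, so a handler must not call back into the same bus
//! from the dispatching thread (it would deadlock). A handler that panics is
//! caught with [`std::panic::catch_unwind`] so the bus keeps dispatching to
//! the rest. If an `on_error` callback is installed, it is invoked with the
//! offending [`Subscription`] and [`Event`].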
16//!
17//! This crate is sync-only. The Python sibling also exposes an `emit_async`
18//! path; that is intentionally not mirrored here because it would pull in a
19//! runtime (tokio/async-std) and tangle the dispatch model. If you want
20//! async dispatch, spawn it on top of this bus.
21//!
22//! ## Quick example
23//!
24//! ```
25//! use agent_event_bus::EventBus;
26//! use std::sync::{Arc, Mutex};
27//!
28//! let bus = EventBus::new();
29//! let seen = Arc::new(Mutex::new(Vec::<String>::new()));
30//!
31//! let seen_cl = Arc::clone(&seen);
32//! bus.on("llm.call.start", move |event| {
33//! seen_cl.lock().unwrap().push(event.name.clone());
34//! });
35//!
36//! bus.emit("llm.call.start", serde_json::json!({"model": "claude-opus-4-7"}));
37//! assert_eq!(seen.lock().unwrap().as_slice(), &["llm.call.start"]);
38//! ```
39//!
40//! ## Wildcard (firehose)
41//!
42//! ```
43//! use agent_event_bus::EventBus;
44//! use std::sync::{Arc, Mutex};
45//!
46//! let bus = EventBus::new();
47//! let counter = Arc::new(Mutex::new(0u32));
48//! let cl = Arc::clone(&counter);
49//! bus.on("*", move |_| { *cl.lock().unwrap() += 1; });
50//! bus.emit("a", serde_json::Value::Null);
51//! bus.emit("b", serde_json::Value::Null);
52//! assert_eq!(*counter.lock().unwrap(), 2);
53//! ```
54//!
55//! ## Not a real message queue
56//!
57//! No persistence, no cross-process delivery, no backpressure. Reach for
58//! Redis, NATS, or RabbitMQ if you need those. This crate is just clean
59//! wiring inside one Rust process.
60
61#![deny(missing_docs)]
62
63use std::panic::{catch_unwind, AssertUnwindSafe};
64use std::sync::atomic::{AtomicU64, Ordering};
65use std::sync::Mutex;
66
67use serde_json::Value;
68
/// Wildcard event name that matches every emit.
///
/// Handlers registered under this name (the "firehose") are selected by
/// [`EventBus::emit`] for every event, whatever name was emitted.
pub const WILDCARD: &str = "*";
71
/// A single dispatched event.
///
/// [`EventBus::emit`] builds one `Event` per call, hands it by shared
/// reference to every matching handler, and finally returns it to the
/// caller.
#[derive(Debug, Clone)]
pub struct Event {
    /// The event name passed to [`EventBus::emit`], stored as an owned copy.
    pub name: String,
    /// The arbitrary JSON payload attached to the event.
    pub payload: Value,
}
80
/// Boxed handler type. Subscribers must be `Send + Sync + 'static` so the
/// bus can be shared across threads.
///
/// A handler receives the dispatched [`Event`] by shared reference and
/// returns nothing. Because the bound is `Fn` (not `FnMut`), a handler that
/// wants to record something needs interior mutability, e.g. a `Mutex`.
type Handler = Box<dyn Fn(&Event) + Send + Sync + 'static>;
84
/// Boxed error callback, invoked once per handler that panics during
/// [`EventBus::emit`].
///
/// The callback receives the [`Subscription`] handle of the handler that
/// panicked and the [`Event`] that was being dispatched. It is called only
/// after every handler selected for that emit has been run.
pub type OnError = Box<dyn Fn(&Subscription, &Event) + Send + Sync + 'static>;
88
/// Opaque handle identifying one registered subscriber.
///
/// Handles are produced by [`EventBus::on`] and [`EventBus::on_once`]; give
/// one back to [`EventBus::off`] to unregister the subscriber it refers to.
/// Cloning copies the `u64` id and the event-name string.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Subscription {
    // Identifier assigned by the bus when the subscriber was registered.
    id: u64,
    // Event name the subscriber was registered under.
    event: String,
}

impl Subscription {
    /// Returns the id the bus assigned to this subscription when it was
    /// registered.
    pub fn id(&self) -> u64 {
        let Self { id, .. } = self;
        *id
    }

    /// Returns the event name the subscription was registered under
    /// (`"*"` when it listens to the firehose).
    pub fn event(&self) -> &str {
        self.event.as_str()
    }
}
111
/// One registered subscriber as stored inside the bus.
struct Slot {
    /// Handle that was returned to the caller at registration time; its id
    /// is what lookups and removals match on.
    sub: Subscription,
    /// The boxed callback invoked when a matching event is emitted.
    handler: Handler,
    /// `true` for subscribers registered through `on_once`; such slots are
    /// removed by `emit` once they have fired.
    once: bool,
}
117
118struct Inner {
119 /// Handlers keyed by event name. The special key `"*"` is the firehose.
120 /// We use a `Vec<(String, Vec<Slot>)>` instead of `HashMap` to keep
121 /// the file small and because registration count is usually tiny;
122 /// linear scan is fine.
123 buckets: Vec<(String, Vec<Slot>)>,
124 on_error: Option<OnError>,
125}
126
127impl Inner {
128 fn new() -> Self {
129 Self {
130 buckets: Vec::new(),
131 on_error: None,
132 }
133 }
134
135 fn bucket_mut(&mut self, event: &str) -> &mut Vec<Slot> {
136 if let Some(idx) = self.buckets.iter().position(|(k, _)| k == event) {
137 return &mut self.buckets[idx].1;
138 }
139 self.buckets.push((event.to_string(), Vec::new()));
140 &mut self.buckets.last_mut().unwrap().1
141 }
142
143 fn len(&self) -> usize {
144 self.buckets.iter().map(|(_, v)| v.len()).sum()
145 }
146}
147
/// In-process pub/sub bus.
///
/// `EventBus` is `Send + Sync` — clone an `Arc<EventBus>` to share it across
/// threads. All methods take `&self` and lock an internal mutex.
pub struct EventBus {
    /// Subscribers and the optional error callback, guarded by one mutex.
    inner: Mutex<Inner>,
    /// Counter that supplies the id for each new subscription; it is
    /// advanced atomically, outside the mutex.
    next_id: AtomicU64,
}
156
157impl EventBus {
158 /// Create a new empty bus.
159 pub fn new() -> Self {
160 Self {
161 inner: Mutex::new(Inner::new()),
162 next_id: AtomicU64::new(1),
163 }
164 }
165
166 /// Install a panic-aware error callback.
167 ///
168 /// The callback fires once for each subscriber whose handler panicked
169 /// during an [`emit`](Self::emit). The panic payload is intentionally
170 /// not surfaced — most agent code only needs to know that *something*
171 /// went wrong with a given subscription, and the payload type
172 /// (`Box<dyn Any>`) is awkward to thread through a stable API.
173 pub fn set_on_error(&self, cb: OnError) {
174 self.inner.lock().unwrap().on_error = Some(cb);
175 }
176
177 /// Remove any installed error callback.
178 pub fn clear_on_error(&self) {
179 self.inner.lock().unwrap().on_error = None;
180 }
181
182 /// Register a handler for `event_name`.
183 ///
184 /// Pass `"*"` (or [`WILDCARD`]) to fire on every event.
185 pub fn on<F>(&self, event_name: &str, handler: F) -> Subscription
186 where
187 F: Fn(&Event) + Send + Sync + 'static,
188 {
189 self.register(event_name, Box::new(handler), false)
190 }
191
192 /// Register a one-shot handler. It fires at most once, then is removed
193 /// automatically before the next [`emit`](Self::emit) of any event.
194 pub fn on_once<F>(&self, event_name: &str, handler: F) -> Subscription
195 where
196 F: Fn(&Event) + Send + Sync + 'static,
197 {
198 self.register(event_name, Box::new(handler), true)
199 }
200
201 fn register(&self, event_name: &str, handler: Handler, once: bool) -> Subscription {
202 let id = self.next_id.fetch_add(1, Ordering::Relaxed);
203 let sub = Subscription {
204 id,
205 event: event_name.to_string(),
206 };
207 let mut inner = self.inner.lock().unwrap();
208 let bucket = inner.bucket_mut(event_name);
209 bucket.push(Slot {
210 sub: sub.clone(),
211 handler,
212 once,
213 });
214 sub
215 }
216
217 /// Remove a single subscription by handle. Returns `true` if it was
218 /// found and removed, `false` if the subscription was already gone
219 /// (off-twice is a no-op).
220 pub fn off(&self, sub: Subscription) -> bool {
221 let mut inner = self.inner.lock().unwrap();
222 if let Some((_, slots)) = inner.buckets.iter_mut().find(|(k, _)| k == &sub.event) {
223 if let Some(pos) = slots.iter().position(|s| s.sub.id == sub.id) {
224 slots.remove(pos);
225 return true;
226 }
227 }
228 false
229 }
230
231 /// Remove every subscriber registered against `event_name`. Returns the
232 /// number removed.
233 pub fn off_event(&self, event_name: &str) -> usize {
234 let mut inner = self.inner.lock().unwrap();
235 if let Some(idx) = inner.buckets.iter().position(|(k, _)| k == event_name) {
236 let removed = inner.buckets[idx].1.len();
237 inner.buckets.remove(idx);
238 return removed;
239 }
240 0
241 }
242
243 /// Remove every subscriber on the bus. Returns the total number
244 /// removed.
245 pub fn off_all(&self) -> usize {
246 let mut inner = self.inner.lock().unwrap();
247 let removed = inner.len();
248 inner.buckets.clear();
249 removed
250 }
251
252 /// Emit an event. Fires all handlers for the exact `event_name`, then
253 /// all firehose handlers registered against `"*"`, in registration
254 /// order within each bucket.
255 ///
256 /// Handler panics are caught; dispatch continues. If an [`OnError`]
257 /// callback is installed via [`set_on_error`](Self::set_on_error), it
258 /// is invoked once per panicking handler.
259 ///
260 /// One-shot subscribers registered with [`on_once`](Self::on_once) are
261 /// removed after they fire — even if they panicked.
262 ///
263 /// Returns the [`Event`] that was dispatched (cheaply clonable).
264 pub fn emit(&self, event_name: &str, payload: Value) -> Event {
265 let event = Event {
266 name: event_name.to_string(),
267 payload,
268 };
269
270 // Snapshot the (bucket, id) pairs to fire. We can't move boxed
271 // handlers out of the bus, so each handler call re-locks and
272 // looks up by subscription id. Handlers run while holding the
273 // mutex, which means same-thread re-entry into the bus will
274 // deadlock — a deliberate trade-off to keep this crate small.
275 let to_fire: Vec<(String, u64)> = {
276 let inner = self.inner.lock().unwrap();
277 let mut out = Vec::new();
278 for (key, slots) in &inner.buckets {
279 if key == event_name || key == WILDCARD {
280 for slot in slots {
281 out.push((key.clone(), slot.sub.id));
282 }
283 }
284 }
285 out
286 };
287
288 let mut errored: Vec<Subscription> = Vec::new();
289 let mut fired_once: Vec<(String, u64)> = Vec::new();
290
291 for (bucket_key, id) in to_fire {
292 let result = {
293 let inner = self.inner.lock().unwrap();
294 let bucket = inner.buckets.iter().find(|(k, _)| k == &bucket_key);
295 let slot = bucket.and_then(|(_, slots)| slots.iter().find(|s| s.sub.id == id));
296 slot.map(|slot| {
297 let panicked =
298 catch_unwind(AssertUnwindSafe(|| (slot.handler)(&event))).is_err();
299 (slot.sub.clone(), slot.once, panicked)
300 })
301 };
302
303 if let Some((sub, once, panicked)) = result {
304 if panicked {
305 errored.push(sub.clone());
306 }
307 if once {
308 fired_once.push((bucket_key, sub.id));
309 }
310 }
311 }
312
313 // Remove one-shot subscribers that fired.
314 if !fired_once.is_empty() {
315 let mut inner = self.inner.lock().unwrap();
316 for (bucket_key, id) in fired_once {
317 if let Some((_, slots)) = inner.buckets.iter_mut().find(|(k, _)| k == &bucket_key) {
318 if let Some(pos) = slots.iter().position(|s| s.sub.id == id) {
319 slots.remove(pos);
320 }
321 }
322 }
323 }
324
325 // Notify the error callback after dispatch so it doesn't
326 // interleave with normal handler invocation order.
327 if !errored.is_empty() {
328 let inner = self.inner.lock().unwrap();
329 if let Some(cb) = inner.on_error.as_ref() {
330 for sub in &errored {
331 cb(sub, &event);
332 }
333 }
334 }
335
336 event
337 }
338
339 /// Return the list of subscription handles registered against an
340 /// event name. Pass `None` to list every subscription on the bus
341 /// (across all event names, including the firehose).
342 pub fn subscribers(&self, event_name: Option<&str>) -> Vec<Subscription> {
343 let inner = self.inner.lock().unwrap();
344 let mut out = Vec::new();
345 match event_name {
346 Some(name) => {
347 if let Some((_, slots)) = inner.buckets.iter().find(|(k, _)| k == name) {
348 for slot in slots {
349 out.push(slot.sub.clone());
350 }
351 }
352 }
353 None => {
354 for (_, slots) in &inner.buckets {
355 for slot in slots {
356 out.push(slot.sub.clone());
357 }
358 }
359 }
360 }
361 out
362 }
363
364 /// Total number of subscribers across every event name (including
365 /// the firehose).
366 pub fn len(&self) -> usize {
367 self.inner.lock().unwrap().len()
368 }
369
370 /// `true` if no subscribers are registered.
371 pub fn is_empty(&self) -> bool {
372 self.len() == 0
373 }
374}
375
376impl Default for EventBus {
377 fn default() -> Self {
378 Self::new()
379 }
380}
381
382impl std::fmt::Debug for EventBus {
383 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
384 f.debug_struct("EventBus")
385 .field("subscribers", &self.len())
386 .finish()
387 }
388}