Skip to main content

agent_event_bus/
lib.rs

1//! # agent-event-bus
2//!
3//! Tiny in-process pub/sub for agent loop events. Sync-only Rust mirror of
4//! the Python [`agent-event-bus`](https://pypi.org/project/agent-event-bus/)
5//! library.
6//!
7//! Producers emit events by name; subscribers register a handler for the
8//! event name they care about. Pass `"*"` as the name to subscribe to every
9//! event (the "firehose"). One-shot handlers are supported via
10//! [`EventBus::on_once`].
11//!
12//! Subscribers are dispatched inline on the caller's thread, in registration
13//! order. A handler that panics is caught with [`std::panic::catch_unwind`]
14//! so the bus keeps dispatching to the rest. An optional `on_error`
15//! callback is invoked with the offending [`Subscription`] and [`Event`].
16//!
17//! This crate is sync-only. The Python sibling also exposes an `emit_async`
18//! path; that is intentionally not mirrored here because it would pull in a
19//! runtime (tokio/async-std) and tangle the dispatch model. If you want
20//! async dispatch, spawn it on top of this bus.
21//!
22//! ## Quick example
23//!
24//! ```
25//! use agent_event_bus::EventBus;
26//! use std::sync::{Arc, Mutex};
27//!
28//! let bus = EventBus::new();
29//! let seen = Arc::new(Mutex::new(Vec::<String>::new()));
30//!
31//! let seen_cl = Arc::clone(&seen);
32//! bus.on("llm.call.start", move |event| {
33//!     seen_cl.lock().unwrap().push(event.name.clone());
34//! });
35//!
36//! bus.emit("llm.call.start", serde_json::json!({"model": "claude-opus-4-7"}));
37//! assert_eq!(seen.lock().unwrap().as_slice(), &["llm.call.start"]);
38//! ```
39//!
40//! ## Wildcard (firehose)
41//!
42//! ```
43//! use agent_event_bus::EventBus;
44//! use std::sync::{Arc, Mutex};
45//!
46//! let bus = EventBus::new();
47//! let counter = Arc::new(Mutex::new(0u32));
48//! let cl = Arc::clone(&counter);
49//! bus.on("*", move |_| { *cl.lock().unwrap() += 1; });
50//! bus.emit("a", serde_json::Value::Null);
51//! bus.emit("b", serde_json::Value::Null);
52//! assert_eq!(*counter.lock().unwrap(), 2);
53//! ```
54//!
55//! ## Not a real message queue
56//!
57//! No persistence, no cross-process delivery, no backpressure. Reach for
58//! Redis, NATS, or RabbitMQ if you need those. This crate is just clean
59//! wiring inside one Rust process.
60
61#![deny(missing_docs)]
62
63use std::panic::{catch_unwind, AssertUnwindSafe};
64use std::sync::atomic::{AtomicU64, Ordering};
65use std::sync::Mutex;
66
67use serde_json::Value;
68
69/// Wildcard event name that matches every emit.
70pub const WILDCARD: &str = "*";
71
72/// A single dispatched event.
73#[derive(Debug, Clone)]
74pub struct Event {
75    /// The event name passed to [`EventBus::emit`].
76    pub name: String,
77    /// The arbitrary JSON payload attached to the event.
78    pub payload: Value,
79}
80
81/// Boxed handler type. Subscribers must be `Send + Sync + 'static` so the
82/// bus can be shared across threads.
83type Handler = Box<dyn Fn(&Event) + Send + Sync + 'static>;
84
85/// Boxed error callback, invoked once per handler that panics during
86/// [`EventBus::emit`].
87pub type OnError = Box<dyn Fn(&Subscription, &Event) + Send + Sync + 'static>;
88
89/// Opaque handle returned from [`EventBus::on`] / [`EventBus::on_once`].
90///
91/// Pass it back to [`EventBus::off`] to remove the underlying subscriber.
92/// Cheap to clone (`u64` id + short event name).
93#[derive(Debug, Clone, PartialEq, Eq, Hash)]
94pub struct Subscription {
95    id: u64,
96    event: String,
97}
98
99impl Subscription {
100    /// The monotonic id assigned at registration time.
101    pub fn id(&self) -> u64 {
102        self.id
103    }
104
105    /// The event name this subscription was registered against
106    /// (`"*"` for the firehose).
107    pub fn event(&self) -> &str {
108        &self.event
109    }
110}
111
112struct Slot {
113    sub: Subscription,
114    handler: Handler,
115    once: bool,
116}
117
118struct Inner {
119    /// Handlers keyed by event name. The special key `"*"` is the firehose.
120    /// We use a `Vec<(String, Vec<Slot>)>` instead of `HashMap` to keep
121    /// the file small and because registration count is usually tiny;
122    /// linear scan is fine.
123    buckets: Vec<(String, Vec<Slot>)>,
124    on_error: Option<OnError>,
125}
126
127impl Inner {
128    fn new() -> Self {
129        Self {
130            buckets: Vec::new(),
131            on_error: None,
132        }
133    }
134
135    fn bucket_mut(&mut self, event: &str) -> &mut Vec<Slot> {
136        if let Some(idx) = self.buckets.iter().position(|(k, _)| k == event) {
137            return &mut self.buckets[idx].1;
138        }
139        self.buckets.push((event.to_string(), Vec::new()));
140        &mut self.buckets.last_mut().unwrap().1
141    }
142
143    fn len(&self) -> usize {
144        self.buckets.iter().map(|(_, v)| v.len()).sum()
145    }
146}
147
148/// In-process pub/sub bus.
149///
150/// `EventBus` is `Send + Sync` — clone an `Arc<EventBus>` to share it across
151/// threads. All methods take `&self` and lock an internal mutex.
152pub struct EventBus {
153    inner: Mutex<Inner>,
154    next_id: AtomicU64,
155}
156
157impl EventBus {
158    /// Create a new empty bus.
159    pub fn new() -> Self {
160        Self {
161            inner: Mutex::new(Inner::new()),
162            next_id: AtomicU64::new(1),
163        }
164    }
165
166    /// Install a panic-aware error callback.
167    ///
168    /// The callback fires once for each subscriber whose handler panicked
169    /// during an [`emit`](Self::emit). The panic payload is intentionally
170    /// not surfaced — most agent code only needs to know that *something*
171    /// went wrong with a given subscription, and the payload type
172    /// (`Box<dyn Any>`) is awkward to thread through a stable API.
173    pub fn set_on_error(&self, cb: OnError) {
174        self.inner.lock().unwrap().on_error = Some(cb);
175    }
176
177    /// Remove any installed error callback.
178    pub fn clear_on_error(&self) {
179        self.inner.lock().unwrap().on_error = None;
180    }
181
182    /// Register a handler for `event_name`.
183    ///
184    /// Pass `"*"` (or [`WILDCARD`]) to fire on every event.
185    pub fn on<F>(&self, event_name: &str, handler: F) -> Subscription
186    where
187        F: Fn(&Event) + Send + Sync + 'static,
188    {
189        self.register(event_name, Box::new(handler), false)
190    }
191
192    /// Register a one-shot handler. It fires at most once, then is removed
193    /// automatically before the next [`emit`](Self::emit) of any event.
194    pub fn on_once<F>(&self, event_name: &str, handler: F) -> Subscription
195    where
196        F: Fn(&Event) + Send + Sync + 'static,
197    {
198        self.register(event_name, Box::new(handler), true)
199    }
200
201    fn register(&self, event_name: &str, handler: Handler, once: bool) -> Subscription {
202        let id = self.next_id.fetch_add(1, Ordering::Relaxed);
203        let sub = Subscription {
204            id,
205            event: event_name.to_string(),
206        };
207        let mut inner = self.inner.lock().unwrap();
208        let bucket = inner.bucket_mut(event_name);
209        bucket.push(Slot {
210            sub: sub.clone(),
211            handler,
212            once,
213        });
214        sub
215    }
216
217    /// Remove a single subscription by handle. Returns `true` if it was
218    /// found and removed, `false` if the subscription was already gone
219    /// (off-twice is a no-op).
220    pub fn off(&self, sub: Subscription) -> bool {
221        let mut inner = self.inner.lock().unwrap();
222        if let Some((_, slots)) = inner.buckets.iter_mut().find(|(k, _)| k == &sub.event) {
223            if let Some(pos) = slots.iter().position(|s| s.sub.id == sub.id) {
224                slots.remove(pos);
225                return true;
226            }
227        }
228        false
229    }
230
231    /// Remove every subscriber registered against `event_name`. Returns the
232    /// number removed.
233    pub fn off_event(&self, event_name: &str) -> usize {
234        let mut inner = self.inner.lock().unwrap();
235        if let Some(idx) = inner.buckets.iter().position(|(k, _)| k == event_name) {
236            let removed = inner.buckets[idx].1.len();
237            inner.buckets.remove(idx);
238            return removed;
239        }
240        0
241    }
242
243    /// Remove every subscriber on the bus. Returns the total number
244    /// removed.
245    pub fn off_all(&self) -> usize {
246        let mut inner = self.inner.lock().unwrap();
247        let removed = inner.len();
248        inner.buckets.clear();
249        removed
250    }
251
252    /// Emit an event. Fires all handlers for the exact `event_name`, then
253    /// all firehose handlers registered against `"*"`, in registration
254    /// order within each bucket.
255    ///
256    /// Handler panics are caught; dispatch continues. If an [`OnError`]
257    /// callback is installed via [`set_on_error`](Self::set_on_error), it
258    /// is invoked once per panicking handler.
259    ///
260    /// One-shot subscribers registered with [`on_once`](Self::on_once) are
261    /// removed after they fire — even if they panicked.
262    ///
263    /// Returns the [`Event`] that was dispatched (cheaply clonable).
264    pub fn emit(&self, event_name: &str, payload: Value) -> Event {
265        let event = Event {
266            name: event_name.to_string(),
267            payload,
268        };
269
270        // Snapshot the (bucket, id) pairs to fire. We can't move boxed
271        // handlers out of the bus, so each handler call re-locks and
272        // looks up by subscription id. Handlers run while holding the
273        // mutex, which means same-thread re-entry into the bus will
274        // deadlock — a deliberate trade-off to keep this crate small.
275        let to_fire: Vec<(String, u64)> = {
276            let inner = self.inner.lock().unwrap();
277            let mut out = Vec::new();
278            for (key, slots) in &inner.buckets {
279                if key == event_name || key == WILDCARD {
280                    for slot in slots {
281                        out.push((key.clone(), slot.sub.id));
282                    }
283                }
284            }
285            out
286        };
287
288        let mut errored: Vec<Subscription> = Vec::new();
289        let mut fired_once: Vec<(String, u64)> = Vec::new();
290
291        for (bucket_key, id) in to_fire {
292            let result = {
293                let inner = self.inner.lock().unwrap();
294                let bucket = inner.buckets.iter().find(|(k, _)| k == &bucket_key);
295                let slot = bucket.and_then(|(_, slots)| slots.iter().find(|s| s.sub.id == id));
296                slot.map(|slot| {
297                    let panicked =
298                        catch_unwind(AssertUnwindSafe(|| (slot.handler)(&event))).is_err();
299                    (slot.sub.clone(), slot.once, panicked)
300                })
301            };
302
303            if let Some((sub, once, panicked)) = result {
304                if panicked {
305                    errored.push(sub.clone());
306                }
307                if once {
308                    fired_once.push((bucket_key, sub.id));
309                }
310            }
311        }
312
313        // Remove one-shot subscribers that fired.
314        if !fired_once.is_empty() {
315            let mut inner = self.inner.lock().unwrap();
316            for (bucket_key, id) in fired_once {
317                if let Some((_, slots)) = inner.buckets.iter_mut().find(|(k, _)| k == &bucket_key) {
318                    if let Some(pos) = slots.iter().position(|s| s.sub.id == id) {
319                        slots.remove(pos);
320                    }
321                }
322            }
323        }
324
325        // Notify the error callback after dispatch so it doesn't
326        // interleave with normal handler invocation order.
327        if !errored.is_empty() {
328            let inner = self.inner.lock().unwrap();
329            if let Some(cb) = inner.on_error.as_ref() {
330                for sub in &errored {
331                    cb(sub, &event);
332                }
333            }
334        }
335
336        event
337    }
338
339    /// Return the list of subscription handles registered against an
340    /// event name. Pass `None` to list every subscription on the bus
341    /// (across all event names, including the firehose).
342    pub fn subscribers(&self, event_name: Option<&str>) -> Vec<Subscription> {
343        let inner = self.inner.lock().unwrap();
344        let mut out = Vec::new();
345        match event_name {
346            Some(name) => {
347                if let Some((_, slots)) = inner.buckets.iter().find(|(k, _)| k == name) {
348                    for slot in slots {
349                        out.push(slot.sub.clone());
350                    }
351                }
352            }
353            None => {
354                for (_, slots) in &inner.buckets {
355                    for slot in slots {
356                        out.push(slot.sub.clone());
357                    }
358                }
359            }
360        }
361        out
362    }
363
364    /// Total number of subscribers across every event name (including
365    /// the firehose).
366    pub fn len(&self) -> usize {
367        self.inner.lock().unwrap().len()
368    }
369
370    /// `true` if no subscribers are registered.
371    pub fn is_empty(&self) -> bool {
372        self.len() == 0
373    }
374}
375
376impl Default for EventBus {
377    fn default() -> Self {
378        Self::new()
379    }
380}
381
382impl std::fmt::Debug for EventBus {
383    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
384        f.debug_struct("EventBus")
385            .field("subscribers", &self.len())
386            .finish()
387    }
388}