eventbus_core/stream/observer.rs
1//! Observability hooks for the [`StreamBus`](super::StreamBus) runtime.
2//!
3//! Without an observer the bus silently retries / backs off on transient
4//! errors so steady-state traffic isn't poisoned by occasional failures.
5//! Production deployments usually want those errors surfaced to metrics,
6//! tracing, or alerting — that's what [`ErrorObserver`] is for.
7//!
8//! # Example
9//!
10//! ```rust,no_run
11//! use std::sync::{Arc, atomic::{AtomicU64, Ordering}};
12//! use eventbus_core::{EventBusError, ErrorScope, ErrorObserver};
13//! use eventbus_core::stream::StreamBusOptions;
14//!
15//! struct Counter(AtomicU64);
16//!
//! impl ErrorObserver for Counter {
//!     fn on_error(&self, scope: ErrorScope, err: &EventBusError) {
//!         eprintln!("[bus:{scope:?}] {err}");
//!         self.0.fetch_add(1, Ordering::Relaxed);
//!     }
//! }
//!
//! let opts = StreamBusOptions::default()
//!     .with_error_observer(Arc::new(Counter(AtomicU64::new(0))));
26//! ```
27
28use crate::EventBusError;
29
/// Identifies the part of the bus runtime that raised an error.
///
/// Marked `#[non_exhaustive]` so further sources can be added later
/// without breaking existing observers.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum ErrorScope {
    /// A read from the backend (`XREADGROUP` or its equivalent) failed.
    /// The consume loop backs off and retries.
    Read,

    /// The reclaim task could not fetch idle pending entries.
    /// The task backs off and retries.
    Reclaim,

    /// Flushing a batch of acks to the backend failed. Each waiter has
    /// already received the error through its oneshot channel; this hook
    /// fires a single time for the entire batch.
    AckFlush,

    /// The subscription was dropped without `close()` being called first.
    /// Raised at most once, from the [`Drop`] impl on `StreamSubscription`.
    Drop,

    /// A delivery task panicked. The panic message itself goes to
    /// [`ErrorObserver::on_panic`]; this scope tags any `on_error`
    /// invocation associated with that panic.
    HandlerPanic,
}
53
54/// Receives bus-level transient errors so they can be surfaced to metrics
55/// or tracing.
56///
57/// Implementations **must not block** — the hook is called from inside the
58/// consume / reclaim / ack loops. Push the event onto a queue or counter
59/// and return.
60pub trait ErrorObserver: Send + Sync {
61 fn on_error(&self, scope: ErrorScope, err: &EventBusError);
62
63 /// Called when a delivery task panics. Default empty for backwards
64 /// compatibility — implementors can override to route to crash metrics.
65 fn on_panic(&self, _scope: ErrorScope, _payload: &str) {}
66}