Skip to main content

eventbus_core/stream/
observer.rs

//! Observability hooks for the [`StreamBus`](super::StreamBus) runtime.
//!
//! Without an observer, the bus silently retries / backs off on transient
//! errors so that steady-state traffic isn't poisoned by occasional failures.
//! Production deployments usually want those errors surfaced to metrics,
//! tracing, or alerting — that's what [`ErrorObserver`] is for.
//!
//! # Example
//!
//! ```rust,no_run
//! use std::sync::{Arc, atomic::{AtomicU64, Ordering}};
//! use eventbus_core::{EventBusError, ErrorScope, ErrorObserver};
//! use eventbus_core::stream::StreamBusOptions;
//!
//! struct Counter(AtomicU64);
//!
//! impl ErrorObserver for Counter {
//!     fn on_error(&self, scope: ErrorScope, err: &EventBusError) {
//!         // Printing is fine for a demo, but writing to stderr can block
//!         // (e.g. on a full pipe). Observers must not block, so production
//!         // code should only hand the event to a non-blocking sink, such as
//!         // the atomic counter below.
//!         eprintln!("[bus:{scope:?}] {err}");
//!         self.0.fetch_add(1, Ordering::Relaxed);
//!     }
//! }
//!
//! let opts = StreamBusOptions::default()
//!     .with_error_observer(Arc::new(Counter(AtomicU64::new(0))));
//! ```
27
28use crate::EventBusError;
29
/// Where in the bus runtime an error was raised.
///
/// `#[non_exhaustive]` lets us add new sources without breaking observers;
/// as a consequence, `match` expressions outside this crate need a wildcard
/// arm.
///
/// For metric tags or structured-log fields, prefer [`ErrorScope::as_str`]
/// over the derived `Debug` output. The standard library does not guarantee
/// the format of derived `Debug` output.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ErrorScope {
    /// `XREADGROUP` (or backend equivalent) failed; the consume loop will
    /// back off and retry.
    Read,
    /// The reclaim task failed to fetch idle pending entries; the task will
    /// back off and retry.
    Reclaim,
    /// A batched ack flush to the backend failed. The waiters got the error
    /// via their oneshot channels; this hook fires once for the whole batch.
    AckFlush,
    /// Subscription was dropped without `close()` having been called. Fired
    /// at most once from the [`Drop`](std::ops::Drop) impl on
    /// `StreamSubscription`.
    Drop,
    /// A delivery task panicked. The panic message is delivered to
    /// [`ErrorObserver::on_panic`]; this scope tags any associated
    /// `on_error` invocation.
    HandlerPanic,
}

impl ErrorScope {
    /// Returns a stable `snake_case` label for this scope.
    ///
    /// Intended for metric tags and log fields. These strings are part of
    /// this type's contract and are meant to stay fixed, unlike the derived
    /// `Debug` representation. The call is `const` and does not allocate.
    #[must_use]
    pub const fn as_str(self) -> &'static str {
        // Exhaustive on purpose: a new variant must be given a label here.
        match self {
            Self::Read => "read",
            Self::Reclaim => "reclaim",
            Self::AckFlush => "ack_flush",
            Self::Drop => "drop",
            Self::HandlerPanic => "handler_panic",
        }
    }
}
53
54/// Receives bus-level transient errors so they can be surfaced to metrics
55/// or tracing.
56///
57/// Implementations **must not block** — the hook is called from inside the
58/// consume / reclaim / ack loops. Push the event onto a queue or counter
59/// and return.
60pub trait ErrorObserver: Send + Sync {
61    fn on_error(&self, scope: ErrorScope, err: &EventBusError);
62
63    /// Called when a delivery task panics. Default empty for backwards
64    /// compatibility — implementors can override to route to crash metrics.
65    fn on_panic(&self, _scope: ErrorScope, _payload: &str) {}
66}