use std::io;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use tokio::sync::{broadcast, oneshot};
use tokio::task;
use super::diagnostics::{DiagnosticsStream, HealthState, SinkDiagnostic, SinkHealth};
use super::emitter::EventEmitter;
use super::hub::{EventHub, EventHubMetrics, EventStream};
use super::sink::{EventSink, StdOutSink};
use chrono::Utc;
/// Capacity used for both the event hub and the diagnostics channel when a bus is
/// built through `EventBus::with_sinks` (1024 slots).
const DEFAULT_BUFFER_CAPACITY: usize = 1 << 10;
/// Fans events published through an [`EventHub`] out to a set of [`EventSink`]s,
/// running one background worker task per sink once listening has started.
pub struct EventBus {
    // Registered sinks, each paired with its worker handle (if one is running).
    sinks: Arc<Mutex<Vec<SinkEntry>>>,
    // Event hub; `get_emitter` hands out emitters for it and each sink worker subscribes to it.
    hub: Arc<EventHub>,
    // Set by `listen_for_events`, cleared by `stop_listener`.
    started: AtomicBool,
    // Listener generation; `stop_listener` bumps it so workers bound to an older
    // generation exit their loop.
    generation: Arc<AtomicU64>,
    // Sender side of the per-sink error diagnostics channel (see `diagnostics`).
    diagnostics_tx: broadcast::Sender<SinkDiagnostic>,
    // Per-sink error statistics, keyed by sink name.
    health: Arc<Mutex<std::collections::HashMap<String, HealthState>>>,
    // When true, sink errors are recorded in `health` and sent on `diagnostics_tx`.
    diagnostics_enabled: bool,
    // When true, sink errors are additionally re-published as diagnostic events on `hub`.
    diagnostics_emit_to_events: bool,
}
impl Default for EventBus {
fn default() -> Self {
Self::with_sink(StdOutSink::default())
}
}
impl EventBus {
pub fn with_sink<T>(sink: T) -> Self
where
T: EventSink + 'static,
{
Self::with_sinks(vec![Box::new(sink)])
}
pub fn with_sinks(sinks: Vec<Box<dyn EventSink>>) -> Self {
Self::with_capacity(sinks, DEFAULT_BUFFER_CAPACITY)
}
pub(crate) fn with_capacity(sinks: Vec<Box<dyn EventSink>>, buffer_capacity: usize) -> Self {
Self::with_capacity_and_diag(sinks, buffer_capacity, buffer_capacity, true, false)
}
pub(crate) fn with_capacity_and_diag(
sinks: Vec<Box<dyn EventSink>>,
buffer_capacity: usize,
diagnostics_capacity: usize,
diagnostics_enabled: bool,
diagnostics_emit_to_events: bool,
) -> Self {
let hub = EventHub::new(buffer_capacity);
let entries = sinks.into_iter().map(SinkEntry::new).collect();
let (diagnostics_tx, _) = if diagnostics_enabled {
broadcast::channel(diagnostics_capacity.max(1))
} else {
broadcast::channel(1)
};
Self {
sinks: Arc::new(Mutex::new(entries)),
hub,
started: AtomicBool::new(false),
generation: Arc::new(AtomicU64::new(0)),
diagnostics_tx,
health: Arc::new(Mutex::new(std::collections::HashMap::new())),
diagnostics_enabled,
diagnostics_emit_to_events,
}
}
pub fn add_sink<T: EventSink + 'static>(&self, sink: T) {
self.add_boxed_sink(Box::new(sink));
}
pub fn add_boxed_sink(&self, sink: Box<dyn EventSink>) {
let mut sinks_guard = self.sinks.lock().expect("EventBus sinks mutex poisoned");
let mut entry = SinkEntry::new(sink);
if self.started.load(Ordering::SeqCst) {
let generation = self.generation.load(Ordering::SeqCst);
entry.spawn_worker(
self.hub.clone(),
Arc::clone(&self.generation),
generation,
self.diagnostics_tx.clone(),
Arc::clone(&self.health),
self.diagnostics_enabled,
self.diagnostics_emit_to_events,
);
}
sinks_guard.push(entry);
}
pub fn get_emitter(&self) -> Arc<dyn EventEmitter> {
Arc::new(self.hub.emitter())
}
pub fn metrics(&self) -> EventHubMetrics {
self.hub.metrics()
}
pub fn subscribe(&self) -> EventStream {
self.listen_for_events();
self.hub.subscribe()
}
pub fn diagnostics(&self) -> DiagnosticsStream {
DiagnosticsStream::new(self.diagnostics_tx.subscribe())
}
pub fn sink_health(&self) -> Vec<SinkHealth> {
let health = self.health.lock().expect("EventBus health mutex poisoned");
health
.iter()
.map(|(sink, state)| SinkHealth {
sink: sink.clone(),
error_count: state.error_count,
last_error: state.last_error.clone(),
last_error_at: state.last_error_at,
})
.collect()
}
pub fn listen_for_events(&self) {
if self.started.swap(true, Ordering::SeqCst) {
return;
}
let mut sinks = self.sinks.lock().expect("EventBus sinks mutex poisoned");
let generation = self.generation.load(Ordering::SeqCst);
for entry in sinks.iter_mut() {
entry.spawn_worker(
self.hub.clone(),
Arc::clone(&self.generation),
generation,
self.diagnostics_tx.clone(),
Arc::clone(&self.health),
self.diagnostics_enabled,
self.diagnostics_emit_to_events,
);
}
}
pub async fn stop_listener(&self) {
if !self.started.swap(false, Ordering::SeqCst) {
return;
}
self.generation.fetch_add(1, Ordering::SeqCst);
let workers = {
let mut sinks = self.sinks.lock().expect("EventBus sinks mutex poisoned");
let mut collected = Vec::with_capacity(sinks.len());
for entry in sinks.iter_mut() {
if let Some(worker) = entry.worker.take() {
collected.push(worker);
}
}
collected
};
for worker in workers {
let SinkWorker { shutdown, handle } = worker;
let _ = shutdown.send(());
let _ = handle.await;
}
}
pub fn close_channel(&self) {
self.hub.close();
}
}
impl Drop for EventBus {
    /// Closes the event hub and tears down any sink workers that are still present.
    ///
    /// `drop` cannot `.await` the workers the way `stop_listener` does, so each one is
    /// signalled and then aborted.
    fn drop(&mut self) {
        self.hub.close();
        // Never panic inside `drop`. If the lock is poisoned, panicking here during an
        // unwind would abort the process. The worker handles are still valid, so
        // recover the guard and clean up anyway.
        let mut sinks = self
            .sinks
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        // Entries without a running worker are a no-op in `abort_worker`.
        for entry in sinks.iter_mut() {
            entry.abort_worker();
        }
    }
}
/// A registered sink together with its name and its worker handle, if one was spawned.
struct SinkEntry {
    // The sink, shared with the worker's `spawn_blocking` dispatch calls.
    sink: Arc<Mutex<Box<dyn EventSink>>>,
    // Name used as the health-map key, in diagnostics, and in log fields.
    name: String,
    // Set by `spawn_worker`; taken by `stop_listener` / `abort_worker`.
    worker: Option<SinkWorker>,
}
impl SinkEntry {
fn new(sink: Box<dyn EventSink>) -> Self {
let candidate = sink.name();
let default_marker: &str = std::any::type_name::<dyn EventSink>();
let name = if candidate == default_marker {
std::any::type_name_of_val(&*sink).to_string()
} else {
candidate
};
Self {
sink: Arc::new(Mutex::new(sink)),
name,
worker: None,
}
}
#[allow(clippy::too_many_arguments)]
fn spawn_worker(
&mut self,
hub: Arc<EventHub>,
generation_state: Arc<AtomicU64>,
active_generation: u64,
diagnostics_tx: broadcast::Sender<SinkDiagnostic>,
health: Arc<Mutex<std::collections::HashMap<String, HealthState>>>,
diagnostics_enabled: bool,
diagnostics_emit_to_events: bool,
) {
if self.worker.is_some() {
return;
}
let sink = Arc::clone(&self.sink);
let sink_name = self.name.clone();
let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
let mut stream = hub.subscribe();
let de_enabled = diagnostics_enabled;
let de_emit = diagnostics_emit_to_events;
let hub_clone = Arc::clone(&hub);
let handle = task::spawn(async move {
fn record_sink_error(
health: &Arc<Mutex<std::collections::HashMap<String, HealthState>>>,
diagnostics_tx: &broadcast::Sender<SinkDiagnostic>,
sink_name: &str,
err_msg: &str,
diagnostics_enabled: bool,
) {
if diagnostics_enabled {
let mut map = health.lock().expect("health mutex poisoned");
let entry = map.entry(sink_name.to_string()).or_default();
entry.error_count = entry.error_count.saturating_add(1);
entry.last_error = Some(err_msg.to_string());
entry.last_error_at = Some(Utc::now());
let occurrence = entry.error_count;
drop(map);
let _ = diagnostics_tx.send(SinkDiagnostic {
sink: sink_name.to_string(),
error: err_msg.to_string(),
when: Utc::now(),
occurrence,
});
}
}
loop {
if generation_state.load(Ordering::SeqCst) != active_generation {
break;
}
tokio::select! {
_ = &mut shutdown_rx => break,
event = stream.recv() => match event {
Ok(event) => {
let sink = Arc::clone(&sink);
let sink_name = sink_name.clone();
let diagnostics_tx = diagnostics_tx.clone();
let health = Arc::clone(&health);
let diagnostics_enabled = de_enabled;
let hub_for_emit = Arc::clone(&hub_clone);
let diagnostics_emit_to_events = de_emit;
let dispatch = task::spawn_blocking(move || -> io::Result<()> {
let mut guard = sink.lock().expect("sink mutex poisoned");
guard.handle(&event)
});
match dispatch.await {
Ok(Ok(())) => {}
Ok(Err(err)) => {
let err_msg = err.to_string();
tracing::error!(
target: "weavegraph::event_bus",
error = %err_msg,
sink = %sink_name,
"event sink reported an error while handling event"
);
record_sink_error(&health, &diagnostics_tx, &sink_name, &err_msg, diagnostics_enabled);
if diagnostics_emit_to_events {
let _ = hub_for_emit.publish(super::event::Event::diagnostic(
"event_bus.sink_error",
format!("{sink}: {err}", sink=sink_name, err=err_msg),
));
}
}
Err(err) => {
let err_msg = err.to_string();
tracing::error!(
target: "weavegraph::event_bus",
error = %err_msg,
sink = %sink_name,
"event sink worker task failed to join"
);
record_sink_error(&health, &diagnostics_tx, &sink_name, &err_msg, diagnostics_enabled);
if diagnostics_emit_to_events {
let _ = hub_for_emit.publish(super::event::Event::diagnostic(
"event_bus.sink_join_error",
format!("{sink}: {err}", sink=sink_name, err=err_msg),
));
}
}
}
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
}
}
}
});
self.worker = Some(SinkWorker {
shutdown: shutdown_tx,
handle,
});
}
fn abort_worker(&mut self) {
if let Some(worker) = self.worker.take() {
let _ = worker.shutdown.send(());
worker.handle.abort();
}
}
}
/// Control handles for one sink's background worker task.
struct SinkWorker {
    // Firing this makes the worker's `select!` loop break.
    shutdown: oneshot::Sender<()>,
    // Join handle of the spawned task; awaited by `stop_listener`, aborted on drop.
    handle: task::JoinHandle<()>,
}