use std::collections::HashMap;
use std::io;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use chrono::Utc;
use tokio::sync::{broadcast, oneshot};
use tokio::task;
use super::diagnostics::{DiagnosticsStream, HealthState, SinkDiagnostic, SinkHealth};
use super::emitter::EventEmitter;
use super::hub::{EventHub, EventHubMetrics, EventStream};
use super::sink::{EventSink, StdOutSink};
/// Buffer capacity that `EventBus::with_sinks` passes to `EventBus::with_capacity`
/// when the caller does not pick one explicitly.
const DEFAULT_BUFFER_CAPACITY: usize = 1024;
/// Fans events published through an [`EventHub`] out to a set of registered sinks.
///
/// Once listening has started, every sink is served by its own background worker
/// task. Sink failures are counted per sink name and broadcast on a diagnostics
/// channel.
pub struct EventBus {
/// Registered sinks, each paired with its (optional) running worker.
sinks: Arc<Mutex<Vec<SinkEntry>>>,
/// Hub that workers subscribe to and that emitters publish through.
hub: Arc<EventHub>,
/// `true` between `listen_for_events` and `stop_listener`.
started: AtomicBool,
/// Incremented by `stop_listener`; a worker exits once this differs from the
/// value it was spawned with.
generation: Arc<AtomicU64>,
/// Sender half of the sink-diagnostics broadcast channel.
diagnostics_tx: broadcast::Sender<SinkDiagnostic>,
/// Per-sink error bookkeeping, keyed by sink name.
health: Arc<Mutex<HashMap<String, HealthState>>>,
/// When `false`, workers neither update `health` nor send diagnostics.
diagnostics_enabled: bool,
/// When `true`, workers also publish sink failures back onto the hub as
/// diagnostic events.
diagnostics_emit_to_events: bool,
}
impl Default for EventBus {
    /// Builds a bus whose only sink is a default-constructed [`StdOutSink`].
    fn default() -> Self {
        let stdout_sink = StdOutSink::default();
        Self::with_sink(stdout_sink)
    }
}
impl EventBus {
pub fn with_sink<T: EventSink + 'static>(sink: T) -> Self {
Self::with_sinks(vec![Box::new(sink)])
}
pub fn with_sinks(sinks: Vec<Box<dyn EventSink>>) -> Self {
Self::with_capacity(sinks, DEFAULT_BUFFER_CAPACITY)
}
pub(crate) fn with_capacity(sinks: Vec<Box<dyn EventSink>>, buffer_capacity: usize) -> Self {
Self::with_capacity_and_diag(sinks, buffer_capacity, buffer_capacity, true, false)
}
pub(crate) fn with_capacity_and_diag(
sinks: Vec<Box<dyn EventSink>>,
buffer_capacity: usize,
diagnostics_capacity: usize,
diagnostics_enabled: bool,
diagnostics_emit_to_events: bool,
) -> Self {
let hub = EventHub::new(buffer_capacity);
let entries = sinks.into_iter().map(SinkEntry::new).collect();
let (diagnostics_tx, _) = broadcast::channel(diagnostics_capacity.max(1));
Self {
sinks: Arc::new(Mutex::new(entries)),
hub,
started: AtomicBool::new(false),
generation: Arc::new(AtomicU64::new(0)),
diagnostics_tx,
health: Arc::new(Mutex::new(HashMap::new())),
diagnostics_enabled,
diagnostics_emit_to_events,
}
}
pub fn add_sink<T: EventSink + 'static>(&self, sink: T) {
self.add_boxed_sink(Box::new(sink));
}
pub fn add_boxed_sink(&self, sink: Box<dyn EventSink>) {
let mut sinks = self.sinks.lock().expect("EventBus sinks mutex poisoned");
let mut entry = SinkEntry::new(sink);
if self.started.load(Ordering::SeqCst) {
entry.spawn_worker(
Arc::clone(&self.hub),
Arc::clone(&self.generation),
self.generation.load(Ordering::SeqCst),
self.worker_diag(),
);
}
sinks.push(entry);
}
pub fn get_emitter(&self) -> Arc<dyn EventEmitter> {
Arc::new(self.hub.emitter())
}
pub fn metrics(&self) -> EventHubMetrics {
self.hub.metrics()
}
pub fn subscribe(&self) -> EventStream {
self.listen_for_events();
self.hub.subscribe()
}
pub fn diagnostics(&self) -> DiagnosticsStream {
DiagnosticsStream::new(self.diagnostics_tx.subscribe())
}
pub fn sink_health(&self) -> Vec<SinkHealth> {
let health = self.health.lock().expect("EventBus health mutex poisoned");
health
.iter()
.map(|(name, state)| SinkHealth {
sink: name.clone(),
error_count: state.error_count,
last_error: state.last_error.clone(),
last_error_at: state.last_error_at,
})
.collect()
}
pub fn listen_for_events(&self) {
if self.started.swap(true, Ordering::SeqCst) {
return;
}
let mut sinks = self.sinks.lock().expect("EventBus sinks mutex poisoned");
let generation = self.generation.load(Ordering::SeqCst);
for entry in sinks.iter_mut() {
entry.spawn_worker(
Arc::clone(&self.hub),
Arc::clone(&self.generation),
generation,
self.worker_diag(),
);
}
}
fn worker_diag(&self) -> WorkerDiag {
WorkerDiag {
tx: self.diagnostics_tx.clone(),
health: Arc::clone(&self.health),
enabled: self.diagnostics_enabled,
emit_as_events: self.diagnostics_emit_to_events,
}
}
pub async fn stop_listener(&self) {
if !self.started.swap(false, Ordering::SeqCst) {
return;
}
self.generation.fetch_add(1, Ordering::SeqCst);
let workers: Vec<SinkWorker> = {
let mut sinks = self.sinks.lock().expect("EventBus sinks mutex poisoned");
sinks.iter_mut().filter_map(|e| e.worker.take()).collect()
};
for SinkWorker { shutdown, handle } in workers {
let _ = shutdown.send(());
let _ = handle.await;
}
}
pub fn close_channel(&self) {
self.hub.close();
}
}
impl Drop for EventBus {
    /// Closes the hub and aborts every sink worker that is still registered.
    fn drop(&mut self) {
        self.hub.close();
        // Never panic inside `drop`: if the mutex was poisoned by a panic that is
        // currently unwinding, a second panic here would abort the process.
        // The guarded data is only used to abort tasks, so recovering it is safe.
        let mut sinks = match self.sinks.lock() {
            Ok(guard) => guard,
            Err(poisoned) => poisoned.into_inner(),
        };
        // Abort unconditionally rather than consulting `started`: `abort_worker`
        // is a no-op for entries without a worker, and this also cleans up workers
        // left behind when `listen_for_events` and `stop_listener` raced.
        for entry in sinks.iter_mut() {
            entry.abort_worker();
        }
    }
}
/// A registered sink plus the worker task currently feeding it, if any.
struct SinkEntry {
/// The sink itself, shared with the worker's blocking dispatch closure.
sink: Arc<Mutex<Box<dyn EventSink>>>,
/// Name used in log output and as the key into the health map.
name: String,
/// Running worker; `None` before `spawn_worker` and after the worker was taken.
worker: Option<SinkWorker>,
}
impl SinkEntry {
    /// Wraps `sink` in an entry with no worker running yet.
    ///
    /// The entry name is whatever `sink.name()` returns, unless that equals the
    /// type name of `dyn EventSink`, in which case `type_name_of_val` is used.
    fn new(sink: Box<dyn EventSink>) -> Self {
        let candidate = sink.name();
        let trait_default = std::any::type_name::<dyn EventSink>();
        let name = if candidate == trait_default {
            // NOTE(review): `type_name_of_val` does not resolve trait objects, so
            // for a `&dyn EventSink` this likely yields the trait-object name again
            // instead of the concrete sink type. Confirm whether this fallback has
            // any effect; sinks sharing a name also share one health entry.
            std::any::type_name_of_val(&*sink).to_owned()
        } else {
            candidate
        };
        Self {
            sink: Arc::new(Mutex::new(sink)),
            name,
            worker: None,
        }
    }

    /// Spawns the worker task that forwards hub events to this sink.
    ///
    /// Does nothing if a worker is already attached. The worker runs until it
    /// receives the shutdown signal, the hub stream is closed, or
    /// `generation_counter` no longer equals `spawned_generation`.
    ///
    /// * `hub` - hub to subscribe to (and to publish diagnostic events on).
    /// * `generation_counter` - shared counter checked at the top of every loop turn.
    /// * `spawned_generation` - counter value this worker belongs to.
    /// * `diag` - diagnostics handles and switches.
    ///
    /// Uses `tokio::task::spawn`, so it must be called from within a Tokio runtime.
    fn spawn_worker(
        &mut self,
        hub: Arc<EventHub>,
        generation_counter: Arc<AtomicU64>,
        spawned_generation: u64,
        diag: WorkerDiag,
    ) {
        if self.worker.is_some() {
            return;
        }
        let sink = Arc::clone(&self.sink);
        let sink_name = self.name.clone();
        let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
        // Subscribe before spawning so the subscription exists as soon as this
        // function returns.
        let mut stream = hub.subscribe();
        let WorkerDiag {
            tx: diagnostics_tx,
            health,
            enabled: diagnostics_enabled,
            emit_as_events: emit_diagnostics_as_events,
        } = diag;
        let handle = task::spawn(async move {
            loop {
                // A bumped generation means `stop_listener` ran; exit even if the
                // shutdown signal has not been observed yet.
                if generation_counter.load(Ordering::SeqCst) != spawned_generation {
                    break;
                }
                tokio::select! {
                    _ = &mut shutdown_rx => break,
                    event = stream.recv() => match event {
                        Ok(event) => {
                            let sink = Arc::clone(&sink);
                            // Sinks are synchronous and return `io::Result`; run them
                            // on the blocking pool to keep the async worker responsive.
                            let dispatch = task::spawn_blocking(move || -> io::Result<()> {
                                sink.lock().expect("sink mutex poisoned").handle(&event)
                            });
                            let (label, err_msg) = match dispatch.await {
                                Ok(Ok(())) => continue,
                                Ok(Err(e)) => ("event_bus.sink_error", e.to_string()),
                                // The blocking task panicked or was cancelled.
                                Err(e) => ("event_bus.sink_join_error", e.to_string()),
                            };
                            tracing::error!(
                                target: "weavegraph::event_bus",
                                error = %err_msg,
                                sink = %sink_name,
                                %label,
                                "sink worker error"
                            );
                            if diagnostics_enabled {
                                // One timestamp per failure so the health entry and
                                // the broadcast diagnostic agree.
                                let now = Utc::now();
                                let occurrence = {
                                    // Scoped so the guard is released before sending.
                                    let mut map = health.lock().expect("health mutex poisoned");
                                    let state = map.entry(sink_name.clone()).or_default();
                                    state.error_count = state.error_count.saturating_add(1);
                                    state.last_error = Some(err_msg.clone());
                                    state.last_error_at = Some(now);
                                    state.error_count
                                };
                                // A send error only means nobody is subscribed.
                                let _ = diagnostics_tx.send(SinkDiagnostic {
                                    sink: sink_name.clone(),
                                    error: err_msg.clone(),
                                    when: now,
                                    occurrence,
                                });
                            }
                            if emit_diagnostics_as_events {
                                // NOTE(review): this worker is itself subscribed to
                                // `hub`; if the sink also fails on the diagnostic
                                // event this could re-trigger itself. Confirm that
                                // is acceptable or filtered elsewhere.
                                let _ = hub.publish(super::event::Event::diagnostic(
                                    label,
                                    format!("{sink_name}: {err_msg}"),
                                ));
                            }
                        }
                        Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
                        Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => {
                            // Delivery stays best-effort, but dropped events should
                            // not disappear without a trace.
                            tracing::warn!(
                                target: "weavegraph::event_bus",
                                sink = %sink_name,
                                skipped,
                                "sink worker lagged behind; events were skipped"
                            );
                            continue;
                        }
                    }
                }
            }
        });
        self.worker = Some(SinkWorker {
            shutdown: shutdown_tx,
            handle,
        });
    }

    /// Detaches the worker (if any), signals it to shut down and aborts its task
    /// without waiting for it.
    fn abort_worker(&mut self) {
        if let Some(worker) = self.worker.take() {
            // A send error only means the worker has already exited.
            let _ = worker.shutdown.send(());
            worker.handle.abort();
        }
    }
}
/// Handles needed to stop a running sink worker.
struct SinkWorker {
/// Firing (or dropping) this makes the worker leave its receive loop.
shutdown: oneshot::Sender<()>,
/// Join handle of the spawned worker task.
handle: task::JoinHandle<()>,
}
/// Diagnostics handles and switches handed to each sink worker.
struct WorkerDiag {
/// Sender for broadcasting one `SinkDiagnostic` per sink failure.
tx: broadcast::Sender<SinkDiagnostic>,
/// Shared per-sink health map, keyed by sink name.
health: Arc<Mutex<HashMap<String, HealthState>>>,
/// Whether the worker records health and sends diagnostics at all.
enabled: bool,
/// Whether the worker also publishes failures onto the hub as events.
emit_as_events: bool,
}