// weavegraph 0.7.0
//
// Graph-driven, concurrent agent workflow framework with versioned state, deterministic barrier merges, and rich diagnostics.
// Documentation
//! [`EventBus`] implementation: fan-out broadcast to registered [`EventSink`] workers.

use std::collections::HashMap;
use std::io;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};

use chrono::Utc;
use tokio::sync::{broadcast, oneshot};
use tokio::task;

use super::diagnostics::{DiagnosticsStream, HealthState, SinkDiagnostic, SinkHealth};
use super::emitter::EventEmitter;
use super::hub::{EventHub, EventHubMetrics, EventStream};
use super::sink::{EventSink, StdOutSink};

/// Buffer capacity used when a bus is built without an explicit capacity.
///
/// [`EventBus::with_sinks`] passes this value to `with_capacity`, which in turn uses it for
/// both the event hub buffer and the diagnostics channel.
const DEFAULT_BUFFER_CAPACITY: usize = 1024;

/// Central event broadcasting system that fans out workflow execution events to registered sinks.
///
/// `EventBus` is the observability backbone of Weavegraph. Workflow nodes emit events via
/// [`EventEmitter`]; the bus delivers each event to every registered [`EventSink`] in a
/// dedicated background worker.
///
/// Owned by [`AppRunner`](crate::runtimes::runner::AppRunner) rather than
/// [`App`](crate::app::App), so multiple runners can share the same graph with isolated
/// event configurations, and per-request isolation in web servers is straightforward.
///
/// ```text
/// Workflow Nodes
///     │ ctx.emit()
///     ▼
/// EventBus
///     │ broadcast
///     ├─────┬─────┬─────┐
///     ▼     ▼     ▼     ▼
/// StdOut Channel File Custom
///  Sink   Sink   Sink  Sink
/// ```
///
/// # Available Sinks
///
/// - [`StdOutSink`] — write events to stdout (default)
/// - [`ChannelSink`](crate::event_bus::ChannelSink) — stream events to async channels
/// - [`MemorySink`](crate::event_bus::MemorySink) — capture events for testing
/// - Custom sinks implementing [`EventSink`]
pub struct EventBus {
    /// Registered sinks, each paired with its (optional) running worker.
    sinks: Arc<Mutex<Vec<SinkEntry>>>,
    /// Hub that events are published to and that every sink worker subscribes to.
    hub: Arc<EventHub>,
    /// `true` between `listen_for_events` and `stop_listener`, i.e. while workers are live.
    started: AtomicBool,
    /// Incremented by `stop_listener`; a worker exits once this no longer matches the value
    /// it was spawned with.
    generation: Arc<AtomicU64>,
    /// Broadcast sender on which sink workers report [`SinkDiagnostic`] entries.
    diagnostics_tx: broadcast::Sender<SinkDiagnostic>,
    /// Per-sink error counters and last-error details, keyed by sink name.
    health: Arc<Mutex<HashMap<String, HealthState>>>,
    /// When `true`, worker errors update `health` and are sent on `diagnostics_tx`.
    diagnostics_enabled: bool,
    /// When `true`, worker errors are additionally published to the hub as diagnostic events.
    diagnostics_emit_to_events: bool,
}

/// The default bus writes every event to stdout through a single [`StdOutSink`].
impl Default for EventBus {
    fn default() -> Self {
        let stdout_sink = StdOutSink::default();
        Self::with_sink(stdout_sink)
    }
}

impl EventBus {
    /// Build an `EventBus` that delivers events to exactly one sink.
    ///
    /// Shorthand for [`EventBus::with_sinks`] with a one-element vector.
    pub fn with_sink<T: EventSink + 'static>(sink: T) -> Self {
        let boxed: Box<dyn EventSink> = Box::new(sink);
        Self::with_sinks(vec![boxed])
    }

    /// Build an `EventBus` over the given sinks, using the default buffer capacity.
    pub fn with_sinks(sinks: Vec<Box<dyn EventSink>>) -> Self {
        let buffer_capacity = DEFAULT_BUFFER_CAPACITY;
        Self::with_capacity(sinks, buffer_capacity)
    }

    /// Build an `EventBus` whose hub buffer and diagnostics channel both use `buffer_capacity`.
    ///
    /// Sink diagnostics are enabled, and diagnostics are not re-published as events.
    pub(crate) fn with_capacity(sinks: Vec<Box<dyn EventSink>>, buffer_capacity: usize) -> Self {
        let diagnostics_capacity = buffer_capacity;
        let diagnostics_enabled = true;
        let diagnostics_emit_to_events = false;
        Self::with_capacity_and_diag(
            sinks,
            buffer_capacity,
            diagnostics_capacity,
            diagnostics_enabled,
            diagnostics_emit_to_events,
        )
    }

    pub(crate) fn with_capacity_and_diag(
        sinks: Vec<Box<dyn EventSink>>,
        buffer_capacity: usize,
        diagnostics_capacity: usize,
        diagnostics_enabled: bool,
        diagnostics_emit_to_events: bool,
    ) -> Self {
        let hub = EventHub::new(buffer_capacity);
        let entries = sinks.into_iter().map(SinkEntry::new).collect();
        let (diagnostics_tx, _) = broadcast::channel(diagnostics_capacity.max(1));
        Self {
            sinks: Arc::new(Mutex::new(entries)),
            hub,
            started: AtomicBool::new(false),
            generation: Arc::new(AtomicU64::new(0)),
            diagnostics_tx,
            health: Arc::new(Mutex::new(HashMap::new())),
            diagnostics_enabled,
            diagnostics_emit_to_events,
        }
    }

    /// Register a concrete sink on this bus.
    ///
    /// Boxes the sink and forwards to [`EventBus::add_boxed_sink`], which starts a worker
    /// right away when the bus is already live.
    pub fn add_sink<T: EventSink + 'static>(&self, sink: T) {
        let boxed: Box<dyn EventSink> = Box::new(sink);
        self.add_boxed_sink(boxed);
    }

    /// Register an already-boxed sink, spawning its worker immediately if the bus is live.
    ///
    /// # Panics
    ///
    /// Panics if the internal sinks mutex has been poisoned.
    pub fn add_boxed_sink(&self, sink: Box<dyn EventSink>) {
        // Hold the registry lock for the whole operation so the entry is built, optionally
        // started, and pushed as one step with respect to other registry users.
        let mut registry = self.sinks.lock().expect("EventBus sinks mutex poisoned");
        let mut entry = SinkEntry::new(sink);
        let bus_is_live = self.started.load(Ordering::SeqCst);
        if bus_is_live {
            let hub = Arc::clone(&self.hub);
            let generation_counter = Arc::clone(&self.generation);
            let current_generation = self.generation.load(Ordering::SeqCst);
            entry.spawn_worker(hub, generation_counter, current_generation, self.worker_diag());
        }
        registry.push(entry);
    }

    /// Hand out a shared [`EventEmitter`] that publishes into this bus's hub.
    pub fn get_emitter(&self) -> Arc<dyn EventEmitter> {
        let emitter: Arc<dyn EventEmitter> = Arc::new(self.hub.emitter());
        emitter
    }

    /// Report the hub's current metrics (buffer capacity and cumulative drop count).
    pub fn metrics(&self) -> EventHubMetrics {
        let hub = &self.hub;
        hub.metrics()
    }

    /// Open a new subscription on the event stream.
    ///
    /// Sink workers are started first (a no-op when they are already running), then a fresh
    /// subscription is taken from the hub.
    pub fn subscribe(&self) -> EventStream {
        let hub = &self.hub;
        self.listen_for_events();
        hub.subscribe()
    }

    /// Subscribe to the [`SinkDiagnostic`] entries that sink workers emit when a sink errors.
    ///
    /// This stream is separate from the main event flow to avoid feedback loops.
    pub fn diagnostics(&self) -> DiagnosticsStream {
        let receiver = self.diagnostics_tx.subscribe();
        DiagnosticsStream::new(receiver)
    }

    /// Return a snapshot of per-sink health counters and last error details.
    ///
    /// Entries are keyed by sink name, so sinks sharing a name share one entry. As far as
    /// this file shows, an entry is only created when a sink worker records an error, so
    /// sinks that have never failed may be absent from the result.
    ///
    /// The snapshot is sorted by sink name. The backing `HashMap` iterates in arbitrary
    /// order, which would otherwise make the result differ from call to call.
    ///
    /// # Panics
    ///
    /// Panics if the internal health mutex has been poisoned.
    pub fn sink_health(&self) -> Vec<SinkHealth> {
        let mut snapshot: Vec<SinkHealth> = {
            let health = self.health.lock().expect("EventBus health mutex poisoned");
            health
                .iter()
                .map(|(name, state)| SinkHealth {
                    sink: name.clone(),
                    error_count: state.error_count,
                    last_error: state.last_error.clone(),
                    last_error_at: state.last_error_at,
                })
                .collect()
            // Guard dropped here: sorting does not need the lock.
        };
        // Keys are unique map keys, so an unstable sort cannot reorder equal elements.
        snapshot.sort_unstable_by(|a, b| a.sink.cmp(&b.sink));
        snapshot
    }

    /// Spawn a worker for every registered sink. Does nothing if the bus is already started.
    ///
    /// # Panics
    ///
    /// Panics if the internal sinks mutex has been poisoned.
    pub fn listen_for_events(&self) {
        // `swap` returns the previous value: `true` means another call already started the bus.
        let already_started = self.started.swap(true, Ordering::SeqCst);
        if already_started {
            return;
        }
        let mut registry = self.sinks.lock().expect("EventBus sinks mutex poisoned");
        let generation = self.generation.load(Ordering::SeqCst);
        registry.iter_mut().for_each(|entry| {
            let hub = Arc::clone(&self.hub);
            let generation_counter = Arc::clone(&self.generation);
            entry.spawn_worker(hub, generation_counter, generation, self.worker_diag());
        });
    }

    /// Bundle the diagnostics handles and flags a sink worker needs into a [`WorkerDiag`].
    fn worker_diag(&self) -> WorkerDiag {
        let tx = self.diagnostics_tx.clone();
        let health = Arc::clone(&self.health);
        let enabled = self.diagnostics_enabled;
        let emit_as_events = self.diagnostics_emit_to_events;
        WorkerDiag {
            tx,
            health,
            enabled,
            emit_as_events,
        }
    }

    /// Signal all sink workers to stop pulling from the hub.
    pub async fn stop_listener(&self) {
        if !self.started.swap(false, Ordering::SeqCst) {
            return;
        }
        self.generation.fetch_add(1, Ordering::SeqCst);
        let workers: Vec<SinkWorker> = {
            let mut sinks = self.sinks.lock().expect("EventBus sinks mutex poisoned");
            sinks.iter_mut().filter_map(|e| e.worker.take()).collect()
        };
        for SinkWorker { shutdown, handle } in workers {
            let _ = shutdown.send(());
            let _ = handle.await;
        }
    }

    /// Close the hub channel so that every subscriber observes the end of the stream.
    pub fn close_channel(&self) {
        let hub = &self.hub;
        hub.close();
    }
}

/// Closes the hub and aborts any still-running sink workers when the bus is dropped.
impl Drop for EventBus {
    fn drop(&mut self) {
        self.hub.close();
        if !self.started.load(Ordering::SeqCst) {
            return;
        }
        // Never panic inside `drop`: if the bus is being dropped while the thread is already
        // unwinding, a second panic aborts the whole process. A poisoned mutex only means some
        // other thread panicked while holding it; the registry is still usable for the purpose
        // of aborting workers, so recover the guard instead of calling `expect`.
        let mut sinks = self
            .sinks
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        for entry in sinks.iter_mut() {
            entry.abort_worker();
        }
    }
}

/// A registered sink together with its display name and, once started, its worker.
struct SinkEntry {
    /// The sink itself; shared with the worker task, which locks it for each event.
    sink: Arc<Mutex<Box<dyn EventSink>>>,
    /// Name used in log output, diagnostics, and as the key into the health map.
    name: String,
    /// Running worker for this sink, or `None` when no worker has been spawned
    /// (or it has been taken for shutdown).
    worker: Option<SinkWorker>,
}

impl SinkEntry {
    /// Wrap a boxed sink in an entry with no worker, resolving the name used to identify it.
    ///
    /// The sink's own `name()` is used unless it equals the type name of `dyn EventSink`,
    /// in which case `type_name_of_val` is consulted instead.
    fn new(sink: Box<dyn EventSink>) -> Self {
        let reported = sink.name();
        let trait_object_name = std::any::type_name::<dyn EventSink>();
        let needs_fallback = reported == trait_object_name;
        // NOTE(review): `type_name_of_val` works from the static type of its argument, and
        // `&*sink` is `&dyn EventSink`, so this fallback presumably yields the trait-object
        // name again rather than the concrete sink type — confirm whether it has any effect.
        let name = match needs_fallback {
            true => std::any::type_name_of_val(&*sink).to_owned(),
            false => reported,
        };
        let sink = Arc::new(Mutex::new(sink));
        Self {
            sink,
            name,
            worker: None,
        }
    }

    /// Spawn the background task that forwards hub events to this entry's sink.
    ///
    /// Does nothing if this entry already has a worker.
    ///
    /// * `hub` — hub to subscribe to (and to publish diagnostic events on, when enabled).
    /// * `generation_counter` — shared counter; the worker exits once its value differs
    ///   from `spawned_generation`.
    /// * `spawned_generation` — the generation this worker belongs to.
    /// * `diag` — diagnostics sender, health map, and the two diagnostics flags.
    fn spawn_worker(
        &mut self,
        hub: Arc<EventHub>,
        generation_counter: Arc<AtomicU64>,
        spawned_generation: u64,
        diag: WorkerDiag,
    ) {
        if self.worker.is_some() {
            return;
        }
        let sink = Arc::clone(&self.sink);
        let sink_name = self.name.clone();
        let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
        // Subscribe before spawning so the subscription exists by the time this call returns.
        let mut stream = hub.subscribe();
        let WorkerDiag {
            tx: diagnostics_tx,
            health,
            enabled: diagnostics_enabled,
            emit_as_events: emit_diagnostics_as_events,
        } = diag;
        let handle = task::spawn(async move {
            loop {
                // Checked once per iteration, i.e. only between events: a worker that is
                // parked in the `select!` below does not notice a generation bump by itself.
                if generation_counter.load(Ordering::SeqCst) != spawned_generation {
                    break;
                }
                tokio::select! {
                    // Resolves on an explicit shutdown signal and also when the sender is dropped.
                    _ = &mut shutdown_rx => break,
                    event = stream.recv() => match event {
                        Ok(event) => {
                            let sink = Arc::clone(&sink);
                            // The sink's `handle` is run through `spawn_blocking` rather than
                            // directly on this async task.
                            let dispatch = task::spawn_blocking(move || -> io::Result<()> {
                                sink.lock().expect("sink mutex poisoned").handle(&event)
                            });
                            // This await sits outside the `select!`, so a shutdown signal is
                            // only observed after the in-flight `handle` call has finished.
                            let (label, err_msg) = match dispatch.await {
                                Ok(Ok(())) => continue,
                                // The sink returned an I/O error.
                                Ok(Err(e)) => ("event_bus.sink_error", e.to_string()),
                                // The blocking task itself failed to complete (e.g. it panicked).
                                Err(e) => ("event_bus.sink_join_error", e.to_string()),
                            };
                            tracing::error!(
                                target: "weavegraph::event_bus",
                                error = %err_msg,
                                sink = %sink_name,
                                %label,
                                "sink worker error"
                            );
                            if diagnostics_enabled {
                                let mut map = health.lock().expect("health mutex poisoned");
                                let state = map.entry(sink_name.clone()).or_default();
                                state.error_count = state.error_count.saturating_add(1);
                                state.last_error = Some(err_msg.clone());
                                state.last_error_at = Some(Utc::now());
                                let occurrence = state.error_count;
                                // Release the health lock before broadcasting the diagnostic.
                                drop(map);
                                // A send error is ignored (presumably it just means nobody is
                                // subscribed to the diagnostics stream).
                                let _ = diagnostics_tx.send(SinkDiagnostic {
                                    sink: sink_name.clone(),
                                    error: err_msg.clone(),
                                    when: Utc::now(),
                                    occurrence,
                                });
                            }
                            if emit_diagnostics_as_events {
                                // NOTE(review): this publishes onto the same hub this worker is
                                // subscribed to, so a sink that fails on every event would also
                                // fail on the diagnostic event and publish another one — confirm
                                // whether something elsewhere breaks that loop.
                                let _ = hub.publish(super::event::Event::diagnostic(
                                    label,
                                    format!("{sink_name}: {err_msg}"),
                                ));
                            }
                        }
                        Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
                        // Missed events are skipped without any report from this worker.
                        Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
                    }
                }
            }
        });
        self.worker = Some(SinkWorker {
            shutdown: shutdown_tx,
            handle,
        });
    }

    fn abort_worker(&mut self) {
        if let Some(worker) = self.worker.take() {
            let _ = worker.shutdown.send(());
            worker.handle.abort();
        }
    }
}

/// Handles for one running sink worker task.
struct SinkWorker {
    /// One-shot sender used to ask the worker loop to exit.
    shutdown: oneshot::Sender<()>,
    /// Join handle of the spawned worker task; awaited on stop, aborted on drop.
    handle: task::JoinHandle<()>,
}

/// Diagnostics handles and flags passed to each sink worker when it is spawned.
struct WorkerDiag {
    /// Sender on which the worker broadcasts a [`SinkDiagnostic`] for each sink error.
    tx: broadcast::Sender<SinkDiagnostic>,
    /// Shared per-sink health map, keyed by sink name.
    health: Arc<Mutex<HashMap<String, HealthState>>>,
    /// Whether errors update `health` and are sent on `tx`.
    enabled: bool,
    /// Whether errors are additionally published to the hub as diagnostic events.
    emit_as_events: bool,
}