weavegraph 0.7.0

Graph-driven, concurrent agent workflow framework with versioned state, deterministic barrier merges, and rich diagnostics.
Documentation
//! [`EventHub`] broadcast channel, [`EventStream`] receiver, and blocking iterator.
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;

use futures_util::stream::{self, BoxStream, StreamExt};
use std::sync::RwLock;
use tokio::sync::{
    broadcast::{self, Receiver, Sender},
    watch,
};
use tokio::time::timeout;

use super::emitter::{EmitterError, EventEmitter};
use super::event::Event;

/// Snapshot of hub health for monitoring and diagnostics.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct EventHubMetrics {
    /// Maximum number of events buffered per subscriber before lag occurs.
    pub capacity: usize,
    /// Total count of events dropped due to slow subscribers.
    pub dropped: usize,
}

/// Broadcast hub that owns the Tokio broadcast channel used by [`EventBus`](crate::event_bus::EventBus).
#[derive(Debug)]
pub struct EventHub {
    sender: RwLock<Option<Sender<Event>>>,
    dropped_events: AtomicUsize,
    capacity: usize,
}

impl EventHub {
    /// Create a new hub backed by a Tokio broadcast channel.
    ///
    /// `capacity` is clamped to at least 1 to satisfy the broadcast API.
    pub fn new(capacity: usize) -> Arc<Self> {
        let capacity = capacity.max(1);
        let (sender, _) = broadcast::channel(capacity);
        Arc::new(Self {
            sender: RwLock::new(Some(sender)),
            dropped_events: AtomicUsize::new(0),
            capacity,
        })
    }

    /// Publish an event to all subscribers.
    ///
    /// Returns [`EmitterError::Closed`] if the hub has been shut down.
    pub fn publish(&self, event: Event) -> Result<(), EmitterError> {
        self.current_sender()
            .ok_or(EmitterError::Closed)
            .and_then(|s| s.send(event).map(|_| ()).map_err(|_| EmitterError::Closed))
    }

    /// Subscribe to a fresh receiver.
    ///
    /// If the hub has already been closed, returns a closed receiver so downstream
    /// code can proceed uniformly.
    pub fn subscribe(self: &Arc<Self>) -> EventStream {
        let receiver = match self.current_sender() {
            Some(s) => s.subscribe(),
            None => {
                let (tx, rx) = broadcast::channel(self.capacity);
                drop(tx);
                rx
            }
        };
        EventStream {
            receiver,
            hub: Arc::clone(self),
            shutdown: None,
        }
    }

    /// Returns the configured buffer capacity of the underlying broadcast channel.
    pub fn capacity(&self) -> usize {
        self.capacity
    }

    /// Returns the total count of events dropped due to slow subscribers.
    pub fn dropped(&self) -> usize {
        self.dropped_events.load(Ordering::Relaxed)
    }

    /// Returns a snapshot of current hub health metrics.
    pub fn metrics(&self) -> EventHubMetrics {
        EventHubMetrics {
            capacity: self.capacity(),
            dropped: self.dropped(),
        }
    }

    /// Create a [`HubEmitter`] that publishes events to this hub.
    pub fn emitter(self: &Arc<Self>) -> HubEmitter {
        HubEmitter {
            hub: Arc::clone(self),
        }
    }

    /// Close the hub and signal all subscribers that no further events will arrive.
    pub fn close(&self) {
        self.sender
            .write()
            .expect("hub sender lock poisoned")
            .take();
    }

    fn current_sender(&self) -> Option<Sender<Event>> {
        self.sender
            .read()
            .expect("hub sender lock poisoned")
            .clone()
    }

    fn record_lag(&self, missed: u64) {
        if missed == 0 {
            return;
        }
        let n = usize::try_from(missed).unwrap_or(usize::MAX);
        let prev = self.dropped_events.fetch_add(n, Ordering::Relaxed);
        tracing::warn!(
            target: "weavegraph::event_bus",
            missed,
            total_dropped = prev.saturating_add(n),
            "event stream lagged; dropped events"
        );
    }
}

/// [`EventEmitter`] implementation backed by an [`EventHub`] broadcast channel.
#[derive(Clone, Debug)]
pub struct HubEmitter {
    hub: Arc<EventHub>,
}

impl EventEmitter for HubEmitter {
    fn emit(&self, event: Event) -> Result<(), EmitterError> {
        self.hub.publish(event)
    }
}

/// Async receive handle for a subscription to an [`EventHub`].
#[derive(Debug)]
pub struct EventStream {
    receiver: Receiver<Event>,
    hub: Arc<EventHub>,
    shutdown: Option<watch::Receiver<bool>>,
}

impl EventStream {
    /// Receive the next event, awaiting if the channel is empty.
    pub async fn recv(&mut self) -> Result<Event, broadcast::error::RecvError> {
        match self.receiver.recv().await {
            Err(broadcast::error::RecvError::Lagged(n)) => {
                self.hub.record_lag(n);
                Err(broadcast::error::RecvError::Lagged(n))
            }
            result => result,
        }
    }

    /// Try to receive an event without blocking; returns immediately if none is available.
    pub fn try_recv(&mut self) -> Result<Event, broadcast::error::TryRecvError> {
        match self.receiver.try_recv() {
            Err(broadcast::error::TryRecvError::Lagged(n)) => {
                self.hub.record_lag(n);
                Err(broadcast::error::TryRecvError::Lagged(n))
            }
            result => result,
        }
    }

    /// Consume the stream and return the raw broadcast receiver.
    pub fn into_inner(self) -> Receiver<Event> {
        self.receiver
    }

    /// Convert this stream into a synchronous blocking iterator.
    pub fn into_blocking_iter(self) -> BlockingEventIter {
        BlockingEventIter {
            receiver: self.receiver,
            hub: self.hub,
        }
    }

    /// Attach a shutdown watch channel; the stream ends when the watch value becomes `true`.
    pub fn with_shutdown(mut self, shutdown: watch::Receiver<bool>) -> Self {
        self.shutdown = Some(shutdown);
        self
    }

    /// Convert this stream into a pinned `BoxStream` for use with async combinators.
    pub fn into_async_stream(self) -> BoxStream<'static, Event> {
        let EventStream {
            receiver,
            hub,
            shutdown,
        } = self;
        stream::unfold(
            (receiver, hub, shutdown),
            |(mut receiver, hub, mut shutdown)| async move {
                loop {
                    let recv_result = if let Some(ref mut rx) = shutdown {
                        tokio::select! {
                            biased;
                            changed = rx.changed() => {
                                if changed.is_ok() && *rx.borrow() { return None; }
                                continue;
                            }
                            result = receiver.recv() => result,
                        }
                    } else {
                        receiver.recv().await
                    };
                    match recv_result {
                        Ok(ev) => return Some((ev, (receiver, hub, shutdown))),
                        Err(broadcast::error::RecvError::Lagged(n)) => hub.record_lag(n),
                        Err(broadcast::error::RecvError::Closed) => return None,
                    }
                }
            },
        )
        .boxed()
    }

    /// Receive the next event, waiting at most `duration`; returns `None` on timeout or close.
    pub async fn next_timeout(&mut self, duration: Duration) -> Option<Event> {
        loop {
            match timeout(duration, self.recv()).await {
                Ok(Ok(event)) => return Some(event),
                Ok(Err(broadcast::error::RecvError::Lagged(_))) => continue,
                Ok(Err(broadcast::error::RecvError::Closed)) | Err(_) => return None,
            }
        }
    }
}

/// Synchronous blocking iterator over events from an [`EventHub`].
pub struct BlockingEventIter {
    receiver: Receiver<Event>,
    hub: Arc<EventHub>,
}

impl Iterator for BlockingEventIter {
    type Item = Event;

    fn next(&mut self) -> Option<Self::Item> {
        loop {
            match self.receiver.blocking_recv() {
                Ok(event) => return Some(event),
                Err(broadcast::error::RecvError::Lagged(n)) => self.hub.record_lag(n),
                Err(broadcast::error::RecvError::Closed) => return None,
            }
        }
    }
}