weavegraph 0.7.0

Graph-driven, concurrent agent workflow framework with versioned state, deterministic barrier merges, and rich diagnostics.
Documentation
//! [`EventSink`] trait and built-in implementations: stdout, in-memory, channel, and JSON lines.
use flume;
use std::any::type_name;
use std::fs::File;
use std::io::{self, Result as IoResult, Stdout, Write};
use std::path::Path;
use std::sync::{Arc, Mutex};

use super::event::Event;
use crate::telemetry::{PlainFormatter, TelemetryFormatter};

/// A destination for structured [`Event`] values emitted by the event bus.
///
/// Implementors must be `Send + Sync` so a sink can be shared with, and moved
/// between, threads.
pub trait EventSink: Send + Sync {
    /// Consume a single `event`.
    ///
    /// Blocking I/O is acceptable here: the event bus dispatches via
    /// `spawn_blocking`, so callers are not stalled.
    ///
    /// # Errors
    ///
    /// Returns whatever I/O error the implementation hits while delivering
    /// the event.
    fn handle(&mut self, event: &Event) -> IoResult<()>;

    /// Label identifying this sink instance in human-readable output.
    ///
    /// The default is the concrete type's name; implementations may override
    /// it to expose configuration details.
    fn name(&self) -> String {
        String::from(type_name::<Self>())
    }
}

/// Sink that writes rendered events to standard output.
///
/// `F` is the [`TelemetryFormatter`] used to turn each event into text; when
/// not specified it defaults to [`PlainFormatter`].
pub struct StdOutSink<F = PlainFormatter>
where
    F: TelemetryFormatter,
{
    /// Standard-output handle that rendered text is written to.
    handle: Stdout,
    /// Formatter that renders each event before it is written.
    formatter: F,
}

impl Default for StdOutSink {
    /// Build a stdout sink that renders events with a fresh [`PlainFormatter`].
    fn default() -> Self {
        let handle = io::stdout();
        let formatter = PlainFormatter::new();
        Self { handle, formatter }
    }
}

impl<F: TelemetryFormatter> StdOutSink<F> {
    /// Create a stdout sink whose events are rendered by `formatter`.
    pub fn with_formatter(formatter: F) -> Self {
        let handle = io::stdout();
        Self { handle, formatter }
    }
}

impl<F: TelemetryFormatter> EventSink for StdOutSink<F> {
    /// Render `event` with the configured formatter, write the joined lines to
    /// stdout, and flush.
    ///
    /// # Errors
    ///
    /// Propagates any error from writing to or flushing stdout.
    fn handle(&mut self, event: &Event) -> IoResult<()> {
        let rendered = self.formatter.render_event(event).join_lines();
        let out = &mut self.handle;
        // Flush only after the full payload has been accepted.
        out.write_all(rendered.as_bytes())
            .and_then(|()| out.flush())
    }
}

/// Sink that keeps every received event in memory, for inspection or tests.
///
/// The buffer lives behind an `Arc<Mutex<_>>`, so clones of a `MemorySink`
/// share the same underlying list of events.
#[derive(Default, Clone)]
pub struct MemorySink {
    /// Shared, mutex-guarded list of captured events, in arrival order.
    entries: Arc<Mutex<Vec<Event>>>,
}

impl MemorySink {
    /// Construct a `MemorySink` with no captured events.
    pub fn new() -> Self {
        Self {
            entries: Arc::default(),
        }
    }

    /// Copy out every event captured so far.
    ///
    /// The returned `Vec` is an independent clone of the internal buffer, so
    /// the mutex is released before the caller touches the data.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn snapshot(&self) -> Vec<Event> {
        let captured = self.entries.lock().expect("MemorySink mutex poisoned");
        Vec::clone(&captured)
    }

    /// Remove every captured event from the buffer.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn clear(&self) {
        let mut captured = self.entries.lock().expect("MemorySink mutex poisoned");
        captured.clear();
    }
}

impl EventSink for MemorySink {
    /// Append a clone of `event` to the shared buffer. Always returns `Ok(())`.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    fn handle(&mut self, event: &Event) -> IoResult<()> {
        let mut captured = self.entries.lock().expect("MemorySink mutex poisoned");
        captured.push(event.clone());
        Ok(())
    }
}

/// Sink that writes one JSON object per line (JSONL / JSON Lines format).
///
/// In compact mode each event serializes to a single output line, suitable for
/// log aggregation pipelines (ELK, Splunk, DataDog), stream processors, and
/// structured test assertions. In pretty mode the JSON for one event spans
/// several lines and the output is no longer valid JSONL.
///
/// # Example
///
/// ```rust,no_run
/// use weavegraph::event_bus::{EventBus, JsonLinesSink};
///
/// let bus = EventBus::with_sinks(vec![Box::new(JsonLinesSink::to_stdout())]);
/// ```
pub struct JsonLinesSink {
    /// Destination writer that receives the serialized events.
    handle: Box<dyn Write + Send + Sync>,
    /// When `true`, events are serialized with `Event::to_json_pretty`;
    /// otherwise with `Event::to_json_string`.
    pretty: bool,
}

impl JsonLinesSink {
    /// Build a compact sink (one line per event) that writes to `handle`.
    pub fn new(handle: Box<dyn Write + Send + Sync>) -> Self {
        Self {
            pretty: false,
            handle,
        }
    }

    /// Build a pretty-printing sink that writes to `handle`.
    ///
    /// Pretty-printed events span multiple lines, so the output is **not**
    /// valid JSONL. Intended for debugging and human-readable logs only.
    pub fn with_pretty_print(handle: Box<dyn Write + Send + Sync>) -> Self {
        Self {
            pretty: true,
            handle,
        }
    }

    /// Build a compact sink that writes to standard output.
    pub fn to_stdout() -> Self {
        let stdout: Box<dyn Write + Send + Sync> = Box::new(io::stdout());
        Self::new(stdout)
    }

    /// Build a compact sink that writes to the file at `path`.
    ///
    /// The file is opened with [`File::create`], i.e. created if missing and
    /// truncated if it already exists.
    ///
    /// # Errors
    ///
    /// Returns the underlying I/O error if the file cannot be opened.
    pub fn to_file(path: impl AsRef<Path>) -> IoResult<Self> {
        File::create(path).map(|file| Self::new(Box::new(file)))
    }
}

impl EventSink for JsonLinesSink {
    /// Serialize `event` to JSON, write it followed by a newline, and flush.
    ///
    /// Uses the pretty serializer when the sink was built in pretty mode and
    /// the compact one otherwise.
    ///
    /// # Errors
    ///
    /// * [`io::ErrorKind::InvalidData`] wrapping the serializer error if the
    ///   event cannot be converted to JSON.
    /// * Any error produced by the underlying writer while writing or
    ///   flushing.
    fn handle(&mut self, event: &Event) -> IoResult<()> {
        let json = if self.pretty {
            event.to_json_pretty()
        } else {
            event.to_json_string()
        }
        .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;

        // Build the full line up front and hand it to the writer in a single
        // `write_all`. `writeln!` goes through `write_fmt`, which passes the
        // payload and the trailing newline to the writer as separate pieces;
        // on an unbuffered writer (e.g. a `File`) that is two writes per
        // event and leaves a window in which a line can be split.
        let line = format!("{json}\n");
        self.handle.write_all(line.as_bytes())?;
        self.handle.flush()
    }

    /// Sink label: `"JsonLinesSink(pretty)"` in pretty mode, `"JsonLinesSink"`
    /// otherwise.
    fn name(&self) -> String {
        let label = if self.pretty {
            "JsonLinesSink(pretty)"
        } else {
            "JsonLinesSink"
        };
        label.to_string()
    }
}

/// Sink that forwards events to a [`flume`] channel for async consumers.
///
/// Enables real-time streaming to web clients, dashboards, or monitoring systems.
/// Wire the paired receiver to your consumer before the graph starts running.
///
/// ⚠️ Must be injected via `AppRunner` — `App::invoke()` creates its own internal
/// bus and will not route events through this sink.
///
/// # Example
///
/// ```rust,no_run
/// use weavegraph::event_bus::{EventBus, ChannelSink};
/// use weavegraph::runtimes::{AppRunner, CheckpointerType};
/// # use weavegraph::app::App;
/// # async fn example(app: App) -> Result<(), Box<dyn std::error::Error>> {
///
/// let (tx, rx) = flume::unbounded();
/// let bus = EventBus::with_sinks(vec![Box::new(ChannelSink::new(tx))]);
///
/// let mut runner = AppRunner::builder()
///     .app(app)
///     .checkpointer(CheckpointerType::InMemory)
///     .event_bus(bus)
///     .build()
///     .await;
///
/// tokio::spawn(async move {
///     while let Ok(event) = rx.recv_async().await {
///         println!("{event:?}");
///     }
/// });
///
/// runner.run_until_complete("session").await?;
/// # Ok(())
/// # }
/// ```
///
/// # Errors
///
/// If the receiver is dropped before the graph finishes, `handle()` returns a
/// [`BrokenPipe`](io::ErrorKind::BrokenPipe) error. The event bus logs it and
/// continues broadcasting to any remaining sinks.
pub struct ChannelSink {
    /// Sending half of the channel; each handled event is cloned into it.
    tx: flume::Sender<Event>,
}

impl ChannelSink {
    /// Create a `ChannelSink` that forwards events through `tx`.
    pub fn new(tx: flume::Sender<Event>) -> Self {
        Self { tx }
    }
}

impl EventSink for ChannelSink {
    /// Send a clone of `event` through the channel.
    ///
    /// # Errors
    ///
    /// Returns [`io::ErrorKind::BrokenPipe`] when the send fails, i.e. the
    /// receiving side of the channel is gone.
    fn handle(&mut self, event: &Event) -> IoResult<()> {
        match self.tx.send(event.clone()) {
            Ok(()) => Ok(()),
            Err(_) => Err(io::Error::new(
                io::ErrorKind::BrokenPipe,
                "channel receiver dropped",
            )),
        }
    }
}