weavegraph 0.7.0

Graph-driven, concurrent agent workflow framework with versioned state, deterministic barrier merges, and rich diagnostics.
# Delivering live events from Weavegraph
<!-- authored-nav-streaming -->
- Use [ARCHITECTURE.md](ARCHITECTURE.md) for the larger runtime map behind these helpers.
- Use [OPERATIONS.md](OPERATIONS.md) when you need deployment, persistence, or troubleshooting guidance.
- Runnable samples live in [examples/convenience_streaming.rs](../examples/convenience_streaming.rs), [examples/streaming_events.rs](../examples/streaming_events.rs), and [examples/production_streaming.rs](../examples/production_streaming.rs).
<!-- authored-streaming-intro -->
Weavegraph exposes three convenience entry points on `App` and one shared subscription type, `EventStream`. Pick the helper based on where the events need to go: a `flume` receiver, a custom sink list, or a broadcast stream that feeds SSE or WebSocket code.
<!-- authored-choice-table -->
## Which helper to reach for
| If you need... | Call this API | What you get back | Typical fit |
| --- | --- | --- | --- |
| a quick receiver in a CLI or test harness | `App::invoke_with_channel` | `(Result<VersionedState, RunnerError>, flume::Receiver<Event>)` | progress output, smoke tests, ad-hoc scripts |
| extra sinks in addition to the runtime defaults | `App::invoke_with_sinks` | `Result<VersionedState, RunnerError>` | stdout + JSONL, memory capture, metrics fan-out |
| a background run plus a first-class subscription | `App::invoke_streaming` | `(InvocationHandle, EventStream)` | SSE, WebSocket, long-lived dashboards |
| total control over session lifetime | `AppRunner::builder()` | your own runner and your own bus | iterative sessions, custom checkpointing, bespoke wiring |
<!-- authored-channel-helper -->
## `invoke_with_channel`
`invoke_with_channel` is the smallest streaming surface. The method builds an `EventBus` from `RuntimeConfig`, appends a `ChannelSink`, starts the workflow, and hands the caller the paired receiver.
Use it when a single consumer is enough and you do not need to manage `EventStream` directly.
~~~rust,no_run
use tokio::time::{timeout, Duration};
use weavegraph::state::VersionedState;
# async fn sample(app: weavegraph::app::App) -> Result<(), Box<dyn std::error::Error>> {
let seed = VersionedState::new_with_user_message("compile daily report");
let (result, rx) = app.invoke_with_channel(seed).await;
let collector = tokio::spawn(async move {
    // Stop when the channel closes or when no event arrives within 250 ms.
    while let Ok(Ok(event)) = timeout(Duration::from_millis(250), rx.recv_async()).await {
        println!("{} :: {}", event.scope_label().unwrap_or("unknown"), event.message());
    }
});
let finished = result?;
collector.await?;
assert!(!finished.snapshot().messages.is_empty());
# Ok(()) }
~~~
A few practical notes:
- The workflow still runs to completion even if the receiver is slow.
- If the receiver disappears, `ChannelSink` reports a broken pipe and the bus keeps serving any remaining sinks.
- The result half resolves to the final `VersionedState`, so this helper still works for one-shot command-line tools.
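The second note above (a vanished receiver does not stop delivery to other sinks) can be illustrated with a std-only sketch. Everything in it is a hypothetical stand-in: the `Sink` trait, `ChannelLikeSink`, `MemorySink`, and `fan_out` are invented for illustration and are not weavegraph's types, and `std::sync::mpsc` replaces `flume` to keep the example dependency-free.

```rust
use std::sync::mpsc::{channel, Sender};

/// Hypothetical stand-in for a sink; not weavegraph's real trait.
trait Sink {
    fn name(&self) -> &'static str;
    fn deliver(&mut self, event: &str) -> Result<(), String>;
}

/// Forwards events into a channel and fails once the receiver is gone.
struct ChannelLikeSink {
    tx: Sender<String>,
}

impl Sink for ChannelLikeSink {
    fn name(&self) -> &'static str {
        "channel"
    }
    fn deliver(&mut self, event: &str) -> Result<(), String> {
        self.tx
            .send(event.to_owned())
            .map_err(|_| "broken pipe: receiver dropped".to_owned())
    }
}

/// Collects events in memory, standing in for any sink that stays healthy.
#[derive(Default)]
struct MemorySink {
    seen: Vec<String>,
}

impl Sink for MemorySink {
    fn name(&self) -> &'static str {
        "memory"
    }
    fn deliver(&mut self, event: &str) -> Result<(), String> {
        self.seen.push(event.to_owned());
        Ok(())
    }
}

/// Delivers one event to every sink and returns failure reports
/// instead of stopping at the first error.
fn fan_out(sinks: &mut [&mut dyn Sink], event: &str) -> Vec<String> {
    let mut failures = Vec::new();
    for sink in sinks.iter_mut() {
        if let Err(reason) = sink.deliver(event) {
            failures.push(format!("{}: {}", sink.name(), reason));
        }
    }
    failures
}

/// Sends two events and drops the receiver between them.
fn fan_out_demo() -> (Vec<String>, Vec<String>) {
    let (tx, rx) = channel();
    let mut channel_sink = ChannelLikeSink { tx };
    let mut memory_sink = MemorySink::default();
    let mut failures = Vec::new();
    {
        let mut sinks: [&mut dyn Sink; 2] = [&mut channel_sink, &mut memory_sink];
        failures.extend(fan_out(&mut sinks, "step 1"));
        drop(rx); // the consumer goes away mid-run
        failures.extend(fan_out(&mut sinks, "step 2"));
    }
    (memory_sink.seen, failures)
}
```

In `fan_out_demo`, the memory sink still records both events, while the channel sink contributes a single failure report for the second one.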
<!-- authored-sinks-helper -->
## `invoke_with_sinks`
`invoke_with_sinks` keeps the sinks already described by `RuntimeConfig.event_bus` and appends the sinks you pass in. This is the helper to use when one invocation needs multiple outputs at once.
~~~rust,no_run
use weavegraph::event_bus::{ChannelSink, JsonLinesSink, StdOutSink};
use weavegraph::state::VersionedState;
# async fn sample(app: weavegraph::app::App) -> Result<(), Box<dyn std::error::Error>> {
let (tx, rx) = flume::unbounded();
let state = VersionedState::new_with_user_message("refresh cache");
let _final_state = app.invoke_with_sinks(
    state,
    vec![
        Box::new(StdOutSink::default()),
        Box::new(JsonLinesSink::to_stdout()),
        Box::new(ChannelSink::new(tx)),
    ],
).await?;
while let Ok(event) = rx.try_recv() {
    eprintln!("mirrored event: {}", event.message());
}
# Ok(()) }
~~~
This helper is useful when you want a plain-text operator feed, a structured log sink, and a receiver for application code without building `AppRunner` by hand.
<!-- authored-streaming-helper -->
## `invoke_streaming`
`invoke_streaming` is the preferred interface for web transports. It creates a fresh `EventBus`, subscribes an `EventStream`, spawns the workflow in the background, and returns an `InvocationHandle` that can be awaited or aborted.
~~~rust,no_run
use futures_util::StreamExt as _;
use weavegraph::event_bus::{Event, STREAM_END_SCOPE};
use weavegraph::state::VersionedState;
# async fn sample(app: weavegraph::app::App) -> Result<(), Box<dyn std::error::Error>> {
let seed = VersionedState::new_with_user_message("stream this run");
let (handle, events) = app.invoke_streaming(seed).await;
let reader = tokio::spawn(async move {
    let mut feed = events.into_async_stream();
    while let Some(event) = feed.next().await {
        // The sentinel marks a clean end of stream; stop before printing it.
        if matches!(&event, Event::Diagnostic(marker) if marker.scope() == STREAM_END_SCOPE) {
            break;
        }
        println!("event => {}", event.message());
    }
});
let final_state = handle.join().await?;
reader.await?;
assert!(!final_state.snapshot().messages.is_empty());
# Ok(()) }
~~~
Two lifecycle rules matter here:
- Dropping the `InvocationHandle` aborts the background workflow task.
- Dropping only the `EventStream` does not stop the run. When the client disconnects, call `abort()` on the handle to cancel the workflow, or `join()` to let it run to completion.
The sentinel diagnostic with scope `STREAM_END_SCOPE` is the clean end-of-stream marker. The runtime emits it before closing the channel so HTTP code can flush a final frame and then terminate the connection intentionally.
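The value of a sentinel is that a consumer can tell an intentional end from a channel that simply closed. The following std-only sketch shows that distinction under stated assumptions: `Frame`, `END_SCOPE`, `run_workflow`, and `drain` are hypothetical names invented here, the `"__end__"` label is a placeholder rather than the real value of `STREAM_END_SCOPE`, and a thread plus `std::sync::mpsc` stands in for the async runtime.

```rust
use std::sync::mpsc;
use std::thread;

/// Placeholder scope label for this sketch; not the real `STREAM_END_SCOPE` value.
const END_SCOPE: &str = "__end__";

/// Simplified stand-in for an event: a scope label plus a message.
struct Frame {
    scope: &'static str,
    message: String,
}

/// Producer side: emit payload events, then the sentinel, then close the channel.
fn run_workflow(tx: mpsc::Sender<Frame>) {
    for step in ["plan", "execute"] {
        tx.send(Frame { scope: "node", message: step.to_owned() }).ok();
    }
    tx.send(Frame { scope: END_SCOPE, message: "done".to_owned() }).ok();
    // `tx` is dropped here, which closes the channel after the sentinel.
}

/// Consumer side: forward messages until the sentinel arrives.
/// The boolean reports whether the stream ended cleanly.
fn drain(rx: mpsc::Receiver<Frame>) -> (Vec<String>, bool) {
    let mut forwarded = Vec::new();
    for frame in rx {
        if frame.scope == END_SCOPE {
            return (forwarded, true); // intentional end of stream
        }
        forwarded.push(frame.message);
    }
    (forwarded, false) // channel closed without a sentinel
}

/// Runs the producer on a background thread and drains it on the caller's thread.
fn sentinel_demo() -> (Vec<String>, bool) {
    let (tx, rx) = mpsc::channel();
    let producer = thread::spawn(move || run_workflow(tx));
    let outcome = drain(rx);
    producer.join().expect("producer thread panicked");
    outcome
}
```

A transport adapter could use the boolean to choose between a normal final frame and an error frame.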
<!-- authored-eventstream-api -->
## `EventStream` operations
`EventStream` wraps a Tokio broadcast receiver and exposes several consumption styles.
| Method | What it does | Good for |
| --- | --- | --- |
| `recv().await` | wait for the next event or a lag/closed error | direct async loops |
| `try_recv()` | poll without blocking | manual event pumps |
| `into_async_stream()` | produce a boxed `futures` stream and silently skip lagged slots | SSE or WebSocket adapters |
| `into_blocking_iter()` | iterate from synchronous code | CLI bridges or thread-based tools |
| `next_timeout(duration).await` | wait up to a deadline and skip lag notices | periodic polling loops |
| `with_shutdown(watch_rx)` | stop the stream when external shutdown flips to `true` | server-managed cancellation |
Lag handling is built into the hub rather than the consumer. When a receiver falls behind, the hub records dropped messages, logs a warning, and keeps the producer side moving.
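To make the lag behaviour concrete, here is a single-threaded sketch of a bounded ring in which the producer never waits and a slow reader is told how many events it missed. It is not weavegraph's `EventHub`: `Ring`, `Cursor`, `Recv`, and `next_skipping_lag` are hypothetical names, and the semantics are modelled loosely on how a Tokio broadcast receiver reports lag.

```rust
use std::collections::VecDeque;

/// Hypothetical stand-in for a broadcast ring buffer.
struct Ring {
    capacity: usize,
    slots: VecDeque<String>,
    /// Sequence number of the oldest slot still held.
    oldest: u64,
}

/// A reader's position: the next sequence number it wants.
struct Cursor {
    next: u64,
}

#[derive(Debug, PartialEq)]
enum Recv {
    Event(String),
    /// The reader fell behind and this many events were overwritten.
    Lagged(u64),
    Empty,
}

impl Ring {
    fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "ring capacity must be positive");
        Ring { capacity, slots: VecDeque::with_capacity(capacity), oldest: 0 }
    }

    /// The producer never blocks: a full ring evicts its oldest slot.
    fn publish(&mut self, event: &str) {
        if self.slots.len() == self.capacity {
            self.slots.pop_front();
            self.oldest += 1;
        }
        self.slots.push_back(event.to_owned());
    }

    /// Returns the next event for this cursor, or a lag notice if slots were lost.
    fn recv(&self, cursor: &mut Cursor) -> Recv {
        if cursor.next < self.oldest {
            let missed = self.oldest - cursor.next;
            cursor.next = self.oldest; // jump to the oldest surviving slot
            return Recv::Lagged(missed);
        }
        let index = (cursor.next - self.oldest) as usize;
        match self.slots.get(index) {
            Some(event) => {
                cursor.next += 1;
                Recv::Event(event.clone())
            }
            None => Recv::Empty,
        }
    }
}

/// Skips lag notices and returns the next real event, if any.
fn next_skipping_lag(ring: &Ring, cursor: &mut Cursor) -> Option<String> {
    loop {
        match ring.recv(cursor) {
            Recv::Event(event) => return Some(event),
            Recv::Lagged(_) => continue,
            Recv::Empty => return None,
        }
    }
}
```

With a capacity of 2 and four published events, a reader starting at sequence 0 first sees `Recv::Lagged(2)` and then the two surviving events. `next_skipping_lag` hides the lag notice entirely, which is the trade-off to weigh when choosing between a raw receive loop and a lag-skipping adapter.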
<!-- authored-sse-example -->
## Server-Sent Events shape
`examples/production_streaming.rs` shows the full Axum integration, but the essential flow is short: start the workflow, map each `Event` into an SSE frame, and stop when `STREAM_END_SCOPE` arrives.
~~~rust,no_run
use axum::response::sse::{Event as AxumSseFrame, Sse};
use futures_util::StreamExt as _;
use weavegraph::event_bus::{Event, STREAM_END_SCOPE};
# async fn sse(app: weavegraph::app::App, initial: weavegraph::state::VersionedState) {
let (handle, events) = app.invoke_streaming(initial).await;
// Stop at the end-of-stream sentinel, then map each remaining event to an SSE frame.
let sse_stream = events
    .into_async_stream()
    .take_while(|event| {
        let is_end = matches!(event, Event::Diagnostic(marker) if marker.scope() == STREAM_END_SCOPE);
        std::future::ready(!is_end)
    })
    .map(|event| {
        Ok::<_, std::convert::Infallible>(
            AxumSseFrame::default().json_data(event).expect("serialize event"),
        )
    });
// Join in the background so workflow failures are logged after the response is returned.
let join_task = tokio::spawn(async move {
    if let Err(err) = handle.join().await {
        tracing::error!("workflow join failed: {err}");
    }
});
let _ = join_task;
let _response = Sse::new(sse_stream);
# }
~~~
If you need to cancel when the client vanishes, keep the handle in shared state and call `abort()` from the disconnect path, just like the production example does.
<!-- authored-buffering -->
## Buffer sizing and diagnostics
The broadcast side is configured through `RuntimeConfig` and `EventBusConfig`.
~~~rust,no_run
use weavegraph::runtimes::{DiagnosticsConfig, EventBusConfig, RuntimeConfig, SinkConfig};
let runtime = RuntimeConfig::default().with_event_bus(
    EventBusConfig::new(2048, vec![SinkConfig::StdOut]).with_diagnostics(DiagnosticsConfig {
        enabled: true,
        buffer_capacity: Some(512),
        emit_to_events: false,
    })
);
let _ = runtime;
~~~
What these settings change:
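- The capacity passed to `EventBusConfig::new` (2048 above) sets the size of the broadcast ring used by `EventHub`.
- `DiagnosticsConfig::buffer_capacity` (512 above) sizes the diagnostics buffer separately, so it can match or diverge from the main event buffer.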
- `emit_to_events` controls whether sink failures stay isolated in `DiagnosticsStream` or are also mirrored into the primary event feed.
At runtime you can inspect sink status with `EventBus::sink_health()` and subscribe to failure notifications with `EventBus::diagnostics()`. That is the supported way to watch sink health without polluting the normal client stream.
<!-- authored-event-kinds -->
## Event payload families
Every helper above ultimately delivers the same `event_bus::Event` enum.
- `Event::Node` carries structured node output emitted through `NodeContext::emit`.
- `Event::Diagnostic` carries framework markers, including `STREAM_END_SCOPE` and `INVOCATION_END_SCOPE`.
- `Event::LLM` carries chunk, final, and error events produced by `emit_llm_chunk`, `emit_llm_final`, or `emit_llm_error`.
Because every variant exposes `scope_label()` and `message()`, many consumers can stay generic and only branch when they need variant-specific metadata.
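The "stay generic, branch only when needed" pattern can be sketched with a stand-in enum. `SketchEvent` and its field names are invented for this example and do not reflect the real layout of `event_bus::Event`; only the accessor names `scope_label()` and `message()` are taken from the text above.

```rust
/// Hypothetical stand-in for an event enum; fields are invented for this sketch.
enum SketchEvent {
    Node { node_id: String, message: String },
    Diagnostic { scope: String, message: String },
    Llm { stream_id: Option<String>, chunk: String },
}

impl SketchEvent {
    /// Shared accessor: a label for where the event came from, if known.
    fn scope_label(&self) -> Option<&str> {
        match self {
            SketchEvent::Node { node_id, .. } => Some(node_id.as_str()),
            SketchEvent::Diagnostic { scope, .. } => Some(scope.as_str()),
            SketchEvent::Llm { stream_id, .. } => stream_id.as_deref(),
        }
    }

    /// Shared accessor: the human-readable payload.
    fn message(&self) -> &str {
        match self {
            SketchEvent::Node { message, .. } => message,
            SketchEvent::Diagnostic { message, .. } => message,
            SketchEvent::Llm { chunk, .. } => chunk,
        }
    }
}

/// Generic consumer: needs only the shared accessors.
fn render(event: &SketchEvent) -> String {
    format!("{} :: {}", event.scope_label().unwrap_or("unknown"), event.message())
}

/// Variant-specific consumer: branches only to pick out LLM chunks.
fn llm_chunks(events: &[SketchEvent]) -> Vec<&str> {
    events
        .iter()
        .filter_map(|event| match event {
            SketchEvent::Llm { chunk, .. } => Some(chunk.as_str()),
            _ => None,
        })
        .collect()
}
```

A logger or SSE adapter would typically look like `render`, while a token-by-token UI would look more like `llm_chunks`.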
<!-- authored-advanced-runner -->
## When to drop down to `AppRunner`
The convenience methods cover the common one-shot cases. Use `AppRunner::builder()` when you need any of the following:
- an explicitly supplied `EventBus`,
- a custom or preconnected `Checkpointer`,
- iterative sessions driven by `invoke_next`,
- a runner that survives across multiple logical inputs.
That lower-level API is the same engine used underneath the convenience helpers, so moving down a level does not change event semantics; it only exposes more of the wiring.
<!-- authored-next-steps-streaming -->
## Where to explore next
- [examples/convenience_streaming.rs](../examples/convenience_streaming.rs) demonstrates the channel and multi-sink helpers.
- [examples/streaming_events.rs](../examples/streaming_events.rs) shows the generic `EventStream` pattern.
- [examples/production_streaming.rs](../examples/production_streaming.rs) shows SSE plus checkpoint-backed execution.
- [ARCHITECTURE.md](ARCHITECTURE.md) explains how the bus, scheduler, reducers, and checkpointers fit together.