weavegraph 0.7.0

Graph-driven, concurrent agent workflow framework with versioned state, deterministic barrier merges, and rich diagnostics.
# Weavegraph crate anatomy
<!-- authored-nav-architecture -->
- Start with [QUICKSTART.md](QUICKSTART.md) if you want the public API before the internals.
- Read [STREAMING.md](STREAMING.md) for the event delivery helpers described here.
- Use [OPERATIONS.md](OPERATIONS.md) for deployment, persistence, and troubleshooting notes.
- Browse [INDEX.md](INDEX.md) for the rest of the published guides.
<!-- authored-intro-architecture -->
This file explains the runtime pieces that live under `src/` in the `weavegraph` crate. The codebase separates graph authoring from session execution: `graphs::GraphBuilder` assembles a workflow definition, `app::App` stores the compiled package, and `runtimes::AppRunner` turns that package into live sessions with checkpoints and structured events.
<!-- authored-module-table -->
## Module ledger
| Runtime slice | Primary modules | What that slice is responsible for |
| --- | --- | --- |
| Graph definition | `graphs::{builder,edges,compilation,iteration}` | Register nodes, store unconditional and conditional edges, validate topology, and expose graph iteration helpers. |
| Compiled facade | `app` | Hold the validated node registry plus graph metadata, then expose `invoke`, `invoke_streaming`, `invoke_with_channel`, and `invoke_with_sinks`. |
| Session engine | `runtimes::{runner,execution,session,streaming,observer}` | Create sessions, execute supersteps, emit completion markers, and manage iterative invocations. |
| Persistence backends | `runtimes::{checkpointer,checkpointer_sqlite,checkpointer_postgres,persistence,replay}` | Save and restore checkpoints in memory, SQLite, or Postgres. |
| Scheduling | `schedulers::scheduler` | Decide which frontier nodes run now, skip already-consumed snapshots, and cap concurrent work. |
| State model | `state`, `channels`, `message`, `control`, `types` | Represent the versioned workflow state, user/assistant messages, extra JSON data, errors, and routing commands. |
| Merge policy | `reducers::{reducer_registry,add_messages,map_merge,add_errors}` | Apply `NodePartial` deltas to the versioned channels in a deterministic order. |
| Observability | `event_bus::{bus,hub,event,sink,diagnostics}` | Broadcast node and framework events to sinks, subscribers, and health diagnostics. |
| Optional integrations | `llm`, `telemetry`, `utils` | Attach LLM adapters, render telemetry, and provide shared helpers such as IDs, clocks, and JSON utilities. |
<!-- authored-source-layout -->
## Source tree snapshot
~~~text
src/
├── app.rs
├── graphs/
│   ├── builder.rs
│   ├── compilation.rs
│   ├── edges.rs
│   └── iteration.rs
├── runtimes/
│   ├── runner.rs
│   ├── execution.rs
│   ├── session.rs
│   ├── streaming.rs
│   ├── checkpointer.rs
│   ├── checkpointer_sqlite.rs
│   └── checkpointer_postgres.rs
├── event_bus/
│   ├── bus.rs
│   ├── hub.rs
│   ├── event.rs
│   ├── sink.rs
│   └── diagnostics.rs
├── schedulers/scheduler.rs
├── reducers/
├── state.rs
├── node.rs
└── lib.rs
~~~
<!-- authored-flow-diagram -->
## End-to-end control path
~~~mermaid
flowchart LR
subgraph AuthoringSurface[Authoring surface]
builder[GraphBuilder chain]
registry[Reducer registry]
end
subgraph CompiledArtifact[Compiled artifact]
app[App definition]
end
subgraph SessionHost[Per-session runtime]
runner[AppRunner]
sched[Frontier scheduler]
barrier[Barrier merge]
end
subgraph StateCells[Versioned state]
store[VersionedState]
snap[StateSnapshot clone]
end
subgraph NodeLayer[Executable nodes]
workers[Node implementations]
end
subgraph IOEdges[Persistence and observers]
cp[Checkpointer backend]
bus[EventBus and EventHub]
clients[Subscribers or sinks]
end
builder -->|compile + validate| app
registry -->|attached during build| app
app -->|spawn invocation| runner
runner --> sched
store --> snap
sched -->|dispatch frontier| workers
snap --> workers
workers -->|NodePartial values| barrier
barrier -->|reduced updates| store
barrier -->|autosave checkpoint| cp
workers -->|ctx.emit / llm events| bus
runner -->|framework diagnostics| bus
bus --> clients
~~~
<!-- authored-build-phase -->
## GraphBuilder and compile-time checks
`GraphBuilder` starts empty and accepts fluent registration calls. `add_node` stores only executable `NodeKind::Custom` entries; attempts to register `Start` or `End` are ignored with a warning because those are structural markers rather than real nodes.
- `add_edge` records a fixed adjacency from one node kind to another.
- `add_conditional_edge` stores a predicate that inspects `StateSnapshot` and chooses the next frontier at runtime.
- `with_runtime_config` and `with_event_bus_config` carry runtime settings into the finished `App`.
- `with_reducer` appends one reducer to a channel, while `with_reducer_registry` swaps the whole merge policy.
`GraphBuilder::compile()` delegates to `graphs::compilation` before constructing the `App`. The validator rejects missing entry edges, unconditional cycles, duplicate edges, references to unknown custom nodes, and edges that originate from `End`. The reachability checks (“reachable from Start” and “has a route to End”) run only when the graph has no conditional edges, because predicates can hide the true path until execution time.
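The structural rules above can be pictured with a small std-only model. This is a sketch under stated assumptions, not the code in `graphs::compilation`: `GraphError`, `validate`, and the use of plain string names for nodes are all hypothetical, and cycle detection is done here with Kahn's algorithm, which may differ from what the crate uses.

```rust
use std::collections::{HashMap, HashSet, VecDeque};

// Hypothetical names for the structural markers; the crate uses `NodeKind`.
const START: &str = "Start";
const END: &str = "End";

// Hypothetical error type covering the rejection rules listed in the text.
#[derive(Debug, PartialEq)]
enum GraphError {
    MissingEntry,
    EdgeFromEnd,
    UnknownNode(String),
    DuplicateEdge(String, String),
    Cycle,
}

/// Validate unconditional edges against the registered custom nodes.
fn validate(custom_nodes: &[&str], edges: &[(&str, &str)]) -> Result<(), GraphError> {
    let known: HashSet<&str> = custom_nodes.iter().copied().collect();
    let mut seen: HashSet<(&str, &str)> = HashSet::new();
    let mut successors: HashMap<&str, Vec<&str>> = HashMap::new();
    let mut indegree: HashMap<&str, usize> = HashMap::new();

    for &(from, to) in edges {
        if from == END {
            return Err(GraphError::EdgeFromEnd);
        }
        for name in [from, to] {
            if name != START && name != END && !known.contains(name) {
                return Err(GraphError::UnknownNode(name.to_string()));
            }
        }
        if !seen.insert((from, to)) {
            return Err(GraphError::DuplicateEdge(from.to_string(), to.to_string()));
        }
        successors.entry(from).or_default().push(to);
        indegree.entry(from).or_insert(0);
        *indegree.entry(to).or_insert(0) += 1;
    }
    if !successors.contains_key(START) {
        return Err(GraphError::MissingEntry);
    }

    // Kahn's algorithm: peel off nodes with no remaining incoming edges.
    // Any node that can never be peeled sits on a cycle.
    let mut ready: VecDeque<&str> = indegree
        .iter()
        .filter(|(_, d)| **d == 0)
        .map(|(n, _)| *n)
        .collect();
    let mut peeled = 0;
    while let Some(node) = ready.pop_front() {
        peeled += 1;
        for &next in successors.get(node).into_iter().flatten() {
            let d = indegree.get_mut(next).expect("every edge target has an entry");
            *d -= 1;
            if *d == 0 {
                ready.push_back(next);
            }
        }
    }
    if peeled == indegree.len() { Ok(()) } else { Err(GraphError::Cycle) }
}
```

Calling `validate(&["a", "b"], &[("Start", "a"), ("a", "b"), ("b", "a")])` in this model returns `Err(GraphError::Cycle)`. Conditional edges are deliberately left out, which mirrors why the real reachability checks are skipped when predicates are present.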
<!-- authored-app-runner -->
## `App` compared with `AppRunner`
`app::App` is the reusable compiled definition. It owns the node map, unconditional edge map, conditional edge list, reducer registry, and `RuntimeConfig`. Cloning an `App` is cheap enough for request handlers because the expensive work already happened at compile time.
`runtimes::AppRunner` is the execution host. It owns the per-run session table, the chosen `EventBus`, the optional checkpointer, the autosave flag, and optional observer or clock injection. One `App` can therefore serve many runners with separate event sinks or persistence settings.
The main entry points line up like this:
- `App::invoke` builds a runner from the runtime config and waits for the final `VersionedState`.
- `App::invoke_with_channel` appends a `ChannelSink` and returns `(Result<VersionedState, RunnerError>, flume::Receiver<Event>)`.
- `App::invoke_with_sinks` keeps the configured sinks and appends any extra sinks supplied by the caller.
- `App::invoke_streaming` allocates a fresh bus plus `EventStream`, spawns the workflow on Tokio, and hands back `(InvocationHandle, EventStream)`.
- `AppRunner::builder()` is the lower-level route when you need full control over the bus, checkpoint backend, or iterative session lifecycle.
`AppRunner` also exposes `create_session`, `create_iterative_session`, `invoke_next`, and `finish_iterative_session`, which is why the runner remains the escape hatch for multi-turn applications.
<!-- authored-state-barrier -->
## Versioned state, snapshots, and barrier reduction
The workflow state lives in `state::VersionedState`, which groups three independent channels: message history, arbitrary extra JSON, and accumulated error events. Each channel tracks its own version so the scheduler can tell whether a node has already consumed the current data.
Nodes never mutate the shared state directly. Each `Node::run` receives an immutable `StateSnapshot` plus a `NodeContext`, then returns a `NodePartial`. A partial can carry any combination of:
- appended messages,
- extra key/value updates,
- recoverable error events,
- a frontier command that changes routing.
`App::apply_barrier` collects all partials from one superstep, merges them into one aggregate update, and runs the reducer registry channel by channel. Channel versions increase only when the content actually changed, which keeps scheduler decisions deterministic.
The built-in reducer registry is assembled in `ReducerRegistry::default()`:
- `AddMessages` appends emitted messages.
- `MapMerge` performs a shallow JSON merge for extras and treats `null` as key deletion.
- `AddErrors` appends recoverable error entries.
Custom reducers can be stacked on a channel in registration order, so middleware-style validation or post-processing is possible without replacing the whole runtime.
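To make the "version changes only when content changes" rule concrete, here is a minimal std-only sketch of a shallow merge with delete-on-null. It is not the crate's implementation: `ExtraChannel`, `Patch`, `apply_barrier`, and `patch` are hypothetical, `Option<String>` stands in for a JSON value (with `None` playing the role of `null`), and bumping the version at most once per barrier is an assumption.

```rust
use std::collections::BTreeMap;

// Hypothetical stand-in for one versioned channel; the crate's types differ.
#[derive(Debug, Default)]
struct ExtraChannel {
    values: BTreeMap<String, String>,
    version: u32,
}

// One node's contribution. `None` plays the role of JSON `null` (delete the key).
type Patch = BTreeMap<String, Option<String>>;

/// Merge every patch from one superstep in order, then bump the version
/// only if the stored map ended up different from what it was before.
fn apply_barrier(channel: &mut ExtraChannel, patches: &[Patch]) -> bool {
    let before = channel.values.clone();
    for patch in patches {
        for (key, value) in patch {
            match value {
                Some(v) => {
                    channel.values.insert(key.clone(), v.clone());
                }
                None => {
                    channel.values.remove(key);
                }
            }
        }
    }
    let changed = channel.values != before;
    if changed {
        channel.version += 1;
    }
    changed
}

/// Convenience constructor for building patches from string literals.
fn patch(entries: &[(&str, Option<&str>)]) -> Patch {
    entries
        .iter()
        .map(|(k, v)| (k.to_string(), v.map(str::to_string)))
        .collect()
}
```

In this model, applying `patch(&[("a", Some("1"))])` twice bumps the version only the first time, so a scheduler that compares versions would not rerun a node for the second, no-op update.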
<!-- authored-reducer-example -->
### Replacing one merge rule
~~~rust,no_run
use std::sync::Arc;
use weavegraph::graphs::GraphBuilder;
use weavegraph::reducers::{Reducer, ReducerRegistry};
use weavegraph::state::VersionedState;
use weavegraph::node::NodePartial;
use weavegraph::types::ChannelType;
// A reducer that overwrites each extra key with the incoming value.
struct LastWriteWins;
impl Reducer for LastWriteWins {
    fn definition_label(&self) -> &'static str {
        "docs::last_write_wins"
    }
    fn apply(&self, state: &mut VersionedState, update: &NodePartial) {
        // Partials without extra data leave the channel untouched.
        if let Some(patch) = &update.extra {
            for (key, value) in patch {
                state.extra.get_mut().insert(key.clone(), value.clone());
            }
        }
    }
}
// Build a registry whose Extra channel uses the custom reducer,
// then hand the whole registry to the builder.
let registry = ReducerRegistry::new()
    .with_reducer(ChannelType::Extra, Arc::new(LastWriteWins));
let _builder = GraphBuilder::new().with_reducer_registry(registry);
~~~
<!-- authored-scheduler -->
## Scheduler behavior
The scheduler is intentionally small. `Scheduler` stores only a concurrency limit; `SchedulerState` carries the `versions_seen` map that remembers which message and extra versions each node already processed.
A superstep works in this order:
1. Pull the current frontier from the session state.
2. Skip `Start` and `End`, because they are structural nodes rather than user code.
3. Compare the snapshot versions against `versions_seen`; a node reruns only when a tracked version increased or the node has never been seen.
4. Execute eligible nodes concurrently up to `concurrency_limit`.
5. Hand the collected `NodePartial` values to the barrier reducer.
6. Compute the next frontier from unconditional edges, conditional edges, and any `FrontierCommand` values.
`create_session` seeds the scheduler with `available_parallelism()` from the host process, while `Scheduler::new(0)` still clamps to one worker so the runtime never creates a zero-width executor.
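The version gate in step 3 and the clamp on `Scheduler::new(0)` can be sketched without the crate. The types below reuse the names `Scheduler` and `SchedulerState` for readability, but their fields and the functions `plan_superstep` and `batches` are hypothetical. Splitting the run set into sequential batches is also a simplification of real concurrent dispatch.

```rust
use std::collections::HashMap;

// Hypothetical snapshot of the two tracked channel versions.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct Versions {
    messages: u32,
    extra: u32,
}

struct Scheduler {
    concurrency_limit: usize,
}

impl Scheduler {
    /// A limit of zero is clamped to one so there is always one worker.
    fn new(limit: usize) -> Self {
        Scheduler { concurrency_limit: limit.max(1) }
    }
}

#[derive(Default)]
struct SchedulerState {
    versions_seen: HashMap<String, Versions>,
}

/// Split the frontier into (run, skipped) and record what the run set has seen.
fn plan_superstep(
    state: &mut SchedulerState,
    frontier: &[&str],
    current: Versions,
) -> (Vec<String>, Vec<String>) {
    let mut run = Vec::new();
    let mut skipped = Vec::new();
    for &node in frontier {
        // Structural markers are never executed.
        if node == "Start" || node == "End" {
            continue;
        }
        let eligible = match state.versions_seen.get(node) {
            None => true, // never seen: always runs
            Some(seen) => current.messages > seen.messages || current.extra > seen.extra,
        };
        if eligible {
            state.versions_seen.insert(node.to_string(), current);
            run.push(node.to_string());
        } else {
            skipped.push(node.to_string());
        }
    }
    (run, skipped)
}

/// Group the run set into batches no wider than the concurrency limit.
fn batches<'a>(scheduler: &Scheduler, run: &'a [String]) -> Vec<&'a [String]> {
    run.chunks(scheduler.concurrency_limit).collect()
}
```

Planning the same frontier twice against unchanged versions moves every node from `run` to `skipped` on the second call. The clamp also matters for `batches`, because `chunks` panics on a chunk size of zero.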
`NodeContext` is the bridge between the scheduler and user code. It carries the node identifier, step number, an event emitter, and optional clock or invocation metadata. The helper methods `emit`, `emit_diagnostic`, `emit_llm_chunk`, `emit_llm_final`, and `emit_llm_error` all route through that context.
<!-- authored-events -->
## Event bus and streaming hooks
The event system is split into two layers.
- `EventHub` owns the Tokio broadcast channel and tracks dropped-event metrics.
- `EventBus` manages worker tasks for sinks, subscribes clients, exposes diagnostics, and closes the channel when a run ends.
Every subscription becomes an `EventStream`. That stream can be consumed with `recv`, `try_recv`, `into_blocking_iter`, `into_async_stream`, or `next_timeout`. Slow subscribers do not block producers; the hub logs lag and increments the drop counter instead.
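The "slow subscribers do not block producers" behavior can be illustrated with a toy ring buffer. This sketch does not use Tokio and is not how `EventHub` is written; `Hub`, `Subscriber`, and their methods are hypothetical and only imitate the lossy semantics of a bounded broadcast channel with a drop counter.

```rust
use std::collections::VecDeque;

// Hypothetical model of a lossy broadcast ring.
struct Hub {
    ring: VecDeque<String>,
    capacity: usize,
    next_seq: u64, // sequence number the next published event will get
    dropped: u64,  // total events that a subscriber missed
}

struct Subscriber {
    cursor: u64, // sequence number of the next event this subscriber wants
}

impl Hub {
    fn new(capacity: usize) -> Self {
        Hub { ring: VecDeque::new(), capacity: capacity.max(1), next_seq: 0, dropped: 0 }
    }

    /// New subscribers only see events published after they subscribed.
    fn subscribe(&self) -> Subscriber {
        Subscriber { cursor: self.next_seq }
    }

    /// Publishing never waits: when the ring is full the oldest event is evicted.
    fn publish(&mut self, event: &str) {
        if self.ring.len() == self.capacity {
            self.ring.pop_front();
        }
        self.ring.push_back(event.to_string());
        self.next_seq += 1;
    }

    /// Return the next event for `sub`, skipping ahead and counting the gap
    /// if the subscriber fell behind the oldest retained event.
    fn try_recv(&mut self, sub: &mut Subscriber) -> Option<String> {
        let oldest = self.next_seq - self.ring.len() as u64;
        if sub.cursor < oldest {
            self.dropped += oldest - sub.cursor;
            sub.cursor = oldest;
        }
        let event = self.ring.get((sub.cursor - oldest) as usize).cloned()?;
        sub.cursor += 1;
        Some(event)
    }
}
```

With a capacity of two, a subscriber that reads nothing while four events are published resumes at the third event, and the hub's `dropped` counter records the two it missed.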
`event_bus::Event` has three variants:
- `Event::Node` for `NodeContext::emit` traffic,
- `Event::Diagnostic` for framework markers and generic telemetry,
- `Event::LLM` for chunk, final, and error notifications emitted by LLM-oriented nodes.
Sink delivery happens in per-sink worker tasks, and each worker calls `handle()` inside `spawn_blocking`, which lets a file sink or stdout sink perform blocking I/O without stalling async node execution.
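The per-sink worker idea can be shown with standard-library threads in place of Tokio's `spawn_blocking`. This is an analogy rather than the crate's code: the `Sink` trait shape, `MemorySink`, and `spawn_sink_worker` are hypothetical, and each sink here gets a dedicated OS thread instead of a slot on a blocking pool.

```rust
use std::sync::mpsc::{self, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

// Hypothetical sink trait; `handle` is allowed to block (file or stdout I/O).
trait Sink: Send + 'static {
    fn handle(&mut self, event: &str);
}

// Test-friendly sink that records events in shared memory.
struct MemorySink(Arc<Mutex<Vec<String>>>);

impl Sink for MemorySink {
    fn handle(&mut self, event: &str) {
        self.0.lock().unwrap().push(event.to_string());
    }
}

/// Give the sink its own queue and thread so blocking work stays off the
/// producer. The unbounded `mpsc` channel means `send` never waits on the sink.
fn spawn_sink_worker<S: Sink>(mut sink: S) -> (Sender<String>, JoinHandle<()>) {
    let (tx, rx) = mpsc::channel::<String>();
    let worker = thread::spawn(move || {
        // The loop ends once every sender has been dropped.
        for event in rx {
            sink.handle(&event);
        }
    });
    (tx, worker)
}
```

A producer keeps the returned `Sender`, calls `send` for each event, and drops it at shutdown. Joining the handle then guarantees the sink has drained its queue.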
A separate diagnostics channel tracks sink failures. `EventBus::diagnostics()` returns a `DiagnosticsStream`, and `EventBus::sink_health()` returns the last known counters and timestamps for each sink. Those diagnostics stay isolated from the main event feed unless `DiagnosticsConfig.emit_to_events` is enabled.
The runtime closes streams with framework markers rather than silent shutdown:
- `STREAM_END_SCOPE` means the whole event stream is finished and the channel will close.
- `INVOCATION_END_SCOPE` marks the end of one `invoke_next` call while keeping an iterative stream alive.
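A consumer can use the two markers to group events per turn and to know when to stop reading. The sketch below is self-contained and makes several assumptions: the `Event` enum is a simplified stand-in with only two variants, the constant values are placeholders (the real strings are defined by the crate and not shown here), and `collect_invocations` is a hypothetical helper.

```rust
// Placeholder values; the real constants live in the crate.
const STREAM_END_SCOPE: &str = "<stream-end placeholder>";
const INVOCATION_END_SCOPE: &str = "<invocation-end placeholder>";

// Simplified stand-in for the crate's event type.
#[derive(Debug, Clone, PartialEq)]
enum Event {
    Node { message: String },
    Diagnostic { scope: String },
}

/// Group node messages per invocation and stop reading at the stream-end marker.
fn collect_invocations<I: IntoIterator<Item = Event>>(events: I) -> Vec<Vec<String>> {
    let mut finished = Vec::new();
    let mut current = Vec::new();
    for event in events {
        match event {
            Event::Node { message } => current.push(message),
            // One `invoke_next` call is done; the stream itself stays open.
            Event::Diagnostic { scope } if scope == INVOCATION_END_SCOPE => {
                finished.push(std::mem::take(&mut current));
            }
            // The whole stream is finished; nothing after this is read.
            Event::Diagnostic { scope } if scope == STREAM_END_SCOPE => break,
            // Other diagnostics are ignored in this sketch.
            Event::Diagnostic { .. } => {}
        }
    }
    // Keep any messages that arrived after the last invocation marker.
    if !current.is_empty() {
        finished.push(current);
    }
    finished
}
```

Any iterator of events works as input, so the same loop shape applies whether events come from a blocking iterator or a collected test fixture.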
For runnable examples, see [examples/streaming_events.rs](../examples/streaming_events.rs), [examples/convenience_streaming.rs](../examples/convenience_streaming.rs), and [examples/production_streaming.rs](../examples/production_streaming.rs).
<!-- authored-checkpointing -->
## Checkpoint implementations
Checkpoint persistence is defined by the `Checkpointer` trait with three core operations: `save`, `load_latest`, and `list_sessions`.
- `InMemoryCheckpointer` keeps only the newest snapshot for each session and is the lightest option for tests or one-shot runs.
- `SQLiteCheckpointer` stores full step history in a local database and can apply embedded migrations when the `sqlite-migrations` feature is active.
- `PostgresCheckpointer` stores the same checkpoint model in PostgreSQL and can apply embedded migrations behind the `postgres-migrations` feature.
The shared `Checkpoint` struct captures the session identifier, step number, full `VersionedState`, current frontier, scheduler `versions_seen`, concurrency limit, created timestamp, and step-level metadata such as ran nodes, skipped nodes, and updated channels. `restore_session_state` reconstructs a runnable session from that record.
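The three trait operations and the "newest snapshot only" policy of the in-memory backend can be modelled in a few lines. Everything here is a simplified assumption: the trait is synchronous, the `Checkpoint` record is trimmed to three fields, and `NewestOnly` is a hypothetical name. The real trait's signatures and error types are not shown in this document.

```rust
use std::collections::HashMap;

// Hypothetical, trimmed-down record; the crate's `Checkpoint` carries more fields.
#[derive(Clone, Debug, PartialEq)]
struct Checkpoint {
    session_id: String,
    step: u64,
    frontier: Vec<String>,
}

// Synchronous stand-in for the three core operations named above.
trait Checkpointer {
    fn save(&mut self, checkpoint: Checkpoint);
    fn load_latest(&self, session_id: &str) -> Option<Checkpoint>;
    fn list_sessions(&self) -> Vec<String>;
}

#[derive(Default)]
struct NewestOnly {
    latest: HashMap<String, Checkpoint>,
}

impl Checkpointer for NewestOnly {
    fn save(&mut self, checkpoint: Checkpoint) {
        // Overwrite: only the most recently saved snapshot per session is kept.
        self.latest.insert(checkpoint.session_id.clone(), checkpoint);
    }

    fn load_latest(&self, session_id: &str) -> Option<Checkpoint> {
        self.latest.get(session_id).cloned()
    }

    fn list_sessions(&self) -> Vec<String> {
        // Sorted so the result is stable regardless of hash order.
        let mut ids: Vec<String> = self.latest.keys().cloned().collect();
        ids.sort();
        ids
    }
}
```

A history-keeping backend would differ mainly in `save`, appending each step instead of overwriting, which is the distinction the text draws between the in-memory and database-backed checkpointers.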
Checkpointing happens at the runner layer, not inside nodes. That keeps user code focused on business logic while `AppRunner` decides whether to autosave, resume from storage, or persist barrier results after each superstep.
<!-- authored-where-next -->
## Suggested next reads
- [examples/graph_execution.rs](../examples/graph_execution.rs) shows the basic compile-and-run path.
- [examples/scheduler_fanout.rs](../examples/scheduler_fanout.rs) demonstrates concurrent frontier execution.
- [examples/advanced_patterns.rs](../examples/advanced_patterns.rs) covers conditional routing and control commands.
- [STREAMING.md](STREAMING.md) narrows in on the live event APIs.
- [OPERATIONS.md](OPERATIONS.md) covers production tuning, checkpoint deployment, and diagnostics policy.