Skip to main content

Crate weavegraph

Crate weavegraph 

Source
Expand description
GraphBuilder -> App::compile -> AppRunner
                   |              |
                   |              +-> Scheduler -> Nodes -> NodePartial
                   |                               |
                   |                               +-> Reducers -> VersionedState
                   |                               +-> EventBus (diagnostics / LLM)
                   |
                   +-> RuntimeConfig (persistence, sinks, execution knobs)

Weavegraph is a graph-driven workflow framework for concurrent, stateful execution. You define nodes and edges with graphs::GraphBuilder, compile to an app::App, and run with either high-level invocation helpers or the lower-level runtimes::AppRunner.

§Quick Start

use async_trait::async_trait;
use weavegraph::graphs::GraphBuilder;
use weavegraph::message::Message;
use weavegraph::node::{Node, NodeContext, NodeError, NodePartial};
use weavegraph::state::{StateSnapshot, VersionedState};
use weavegraph::types::NodeKind;

struct EchoNode;

#[async_trait]
impl Node for EchoNode {
    async fn run(
        &self,
        snapshot: StateSnapshot,
        _ctx: NodeContext,
    ) -> Result<NodePartial, NodeError> {
        let reply = snapshot
            .messages
            .last()
            .map(|m| format!("Echo: {}", m.content))
            .unwrap_or_else(|| "Echo: (no input)".to_string());

        Ok(NodePartial::new().with_messages(vec![Message::assistant(&reply)]))
    }
}

let app = GraphBuilder::new()
    .add_node(NodeKind::Custom("echo".into()), EchoNode)
    .add_edge(NodeKind::Start, NodeKind::Custom("echo".into()))
    .add_edge(NodeKind::Custom("echo".into()), NodeKind::End)
    .compile()?;

let initial = VersionedState::new_with_user_message("hello");
let final_state = app.invoke(initial).await?;
assert!(!final_state.snapshot().messages.is_empty());

§Feature Flags

FeatureDefaultPurpose
sqlite-migrationsyesEnables SQLite persistence support via sqlx and migration wiring.
sqlitenoEnables SQLite checkpointer APIs and runtime backend.
postgres-migrationsnoEnables Postgres migration support for checkpointer setup.
postgresnoEnables PostgreSQL checkpointer APIs and runtime backend.
rignoEnables Rig-based LLM interop and adapters.
diagnosticsnoAdds miette diagnostic metadata to error types.
examplesnoPulls additional deps used by selected examples.
petgraph-compatnoExposes petgraph conversion helpers for graph analysis and visualization.

§Documentation

  • docs/QUICKSTART.md for API-first onboarding and composition patterns.
  • docs/OPERATIONS.md for runtime operations, persistence, and deployment concerns.
  • docs/STREAMING.md for event streaming patterns and production guidance.
  • docs/ARCHITECTURE.md for internal architecture and execution model details.

§Common Patterns

§Graph lifecycle

// 1. Build — declare nodes and edges.
// 2. Compile — validate topology, attach runtime config.
// 3. Invoke — run once or stream events to clients.
let app = GraphBuilder::new()
    /* .add_node(...).add_edge(...) */
    .compile()?;

let state = VersionedState::new_with_user_message("hello");
let final_state = app.invoke(state).await?;

See examples/graph_execution.rs for a runnable graph lifecycle example.

§Streaming events via SSE

// Each call gets an isolated runner + event bus.
let state = VersionedState::new_with_user_message("hello");
let (handle, event_stream) = app.invoke_streaming(state).await;

// Convert to an async stream and forward to your SSE layer.
// Terminate when STREAM_END_SCOPE is observed.
let _ = event_stream.into_async_stream(); // futures::Stream<Item = Event>
let _ = handle; // join or abort the background task

See examples/production_streaming.rs for the full Axum + Postgres reference.

§Error handling in nodes

// Return a domain error from any node:
fn validate(input: &str) -> Result<(), NodeError> {
    if input.is_empty() {
        return Err(NodeError::Other("input must not be empty".into()));
    }
    // Lift arbitrary std::error::Error with ?:
    std::str::from_utf8(input.as_bytes()).node_err()?;
    Ok(())
}

See examples/errors_pretty.rs for error display patterns.

Re-exports§

pub use control::FrontierCommand;
pub use control::NodeRoute;

Modules§

app
Application layer providing the high-level App entry point for workflow invocation.
channels
Channel types that form the typed state slots of a workflow’s VersionedState.
control
Control-flow primitives emitted by nodes to influence subsequent scheduling.
event_bus
Event bus utilities providing fan-out, sinks, and subscriber APIs.
graphs
Graph definition and compilation for workflow execution.
llm
Framework-agnostic LLM abstractions and optional adapters.
message
Message types representing chat turns and content in a workflow conversation.
node
Node execution framework for the Weavegraph workflow system.
reducers
State reducers that apply NodePartial updates to VersionedState.
runtimes
Workflow runtime infrastructure for session management and state persistence.
schedulers
Frontier-based workflow scheduler with version gating and bounded concurrency.
state
State management for the Weavegraph workflow framework.
telemetry
Telemetry formatting utilities for rendering workflow events as human-readable or machine-readable output.
types
Core types for the Weavegraph workflow framework.
utils
Utilities module for common functionality across the Weavegraph framework.