Crate weavegraph

Weavegraph at a glance:

  • graphs::GraphBuilder describes the workflow topology and compiles it into an app::App.
  • runtimes::AppRunner executes the graph with schedulers, reducers, and node implementations.
  • state::VersionedState carries messages, typed extras, and errors across node boundaries.
  • event_bus exposes diagnostics and streaming hooks for observing a running workflow.

Weavegraph is a graph-driven workflow framework for concurrent, stateful execution. You define nodes and edges with graphs::GraphBuilder, compile to an app::App, and run with either high-level invocation helpers or the lower-level runtimes::AppRunner.

§Quick Start

use async_trait::async_trait;
use weavegraph::graphs::GraphBuilder;
use weavegraph::message::Message;
use weavegraph::node::{Node, NodeContext, NodeError, NodePartial};
use weavegraph::state::{StateSnapshot, VersionedState};
use weavegraph::types::NodeKind;

struct EchoNode;

#[async_trait]
impl Node for EchoNode {
    async fn run(
        &self,
        snapshot: StateSnapshot,
        _ctx: NodeContext,
    ) -> Result<NodePartial, NodeError> {
        let reply = snapshot
            .messages
            .last()
            .map(|m| format!("Echo: {}", m.content))
            .unwrap_or_else(|| "Echo: (no input)".to_string());

        Ok(NodePartial::new().with_messages(vec![Message::assistant(&reply)]))
    }
}

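// The statements below use `?` and `.await`, so they must run inside an
// async function that returns a Result.
let app = GraphBuilder::new()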
    .add_node(NodeKind::Custom("echo".into()), EchoNode)
    .add_edge(NodeKind::Start, NodeKind::Custom("echo".into()))
    .add_edge(NodeKind::Custom("echo".into()), NodeKind::End)
    .compile()?;

let initial = VersionedState::new_with_user_message("hello");
let final_state = app.invoke(initial).await?;
assert!(!final_state.snapshot().messages.is_empty());

§Feature Flags

Feature              Default  Purpose
sqlite-migrations    yes      Enables SQLite persistence support via sqlx and migration wiring.
sqlite               no       Enables SQLite checkpointer APIs and runtime backend.
postgres-migrations  no       Enables Postgres migration support for checkpointer setup.
postgres             no       Enables PostgreSQL checkpointer APIs and runtime backend.
rig                  no       Enables Rig-based LLM interop and adapters.
diagnostics          no       Adds miette diagnostic metadata to error types.
examples             no       Pulls additional deps used by selected examples.
petgraph-compat      no       Exposes petgraph conversion helpers for graph analysis and visualization.

§Documentation

  • docs/QUICKSTART.md for API-first onboarding and composition patterns.
  • docs/OPERATIONS.md for runtime operations, persistence, and deployment concerns.
  • docs/STREAMING.md for event streaming patterns and production guidance.
  • docs/ARCHITECTURE.md for internal architecture and execution model details.

§Common Patterns

§Graph lifecycle

// 1. Build — declare nodes and edges.
// 2. Compile — validate topology, attach runtime config.
// 3. Invoke — run once or stream events to clients.
let app = GraphBuilder::new()
    /* .add_node(...).add_edge(...) */
    .compile()?;

let state = VersionedState::new_with_user_message("hello");
let final_state = app.invoke(state).await?;

See examples/graph_execution.rs for a runnable graph lifecycle example.
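
The three lifecycle stages can also be illustrated without the crate itself. The following is a minimal, std-only sketch, not Weavegraph's implementation: `ToyBuilder`, `ToyApp`, `ToyNode`, and `CompileError` are hypothetical names, the graph is assumed to be linear, and the only validation shown is that every edge endpoint refers to a declared node (assumed here to be the kind of topology check a compile step would perform).

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical node signature: read the message log, return messages to append.
type ToyNode = fn(&[String]) -> Vec<String>;

#[derive(Debug, PartialEq)]
enum CompileError {
    /// An edge names a node that was never registered.
    UnknownNode(String),
}

#[derive(Default)]
struct ToyBuilder {
    nodes: HashMap<String, ToyNode>,
    edges: Vec<(String, String)>,
}

struct ToyApp {
    nodes: HashMap<String, ToyNode>,
    edges: Vec<(String, String)>,
}

impl ToyBuilder {
    // 1. Build: declare nodes and edges.
    fn add_node(mut self, name: &str, node: ToyNode) -> Self {
        self.nodes.insert(name.to_string(), node);
        self
    }

    fn add_edge(mut self, from: &str, to: &str) -> Self {
        self.edges.push((from.to_string(), to.to_string()));
        self
    }

    // 2. Compile: every edge endpoint must be "Start", "End", or a declared node.
    fn compile(self) -> Result<ToyApp, CompileError> {
        for (from, to) in &self.edges {
            for name in [from, to] {
                let is_virtual = name.as_str() == "Start" || name.as_str() == "End";
                if !is_virtual && !self.nodes.contains_key(name) {
                    return Err(CompileError::UnknownNode(name.clone()));
                }
            }
        }
        Ok(ToyApp { nodes: self.nodes, edges: self.edges })
    }
}

impl ToyApp {
    // 3. Invoke: follow edges from "Start" until "End", running each node once.
    fn invoke(&self, mut messages: Vec<String>) -> Vec<String> {
        let mut current = "Start".to_string();
        let mut visited = HashSet::new();
        // `insert` returns false on a repeat visit, which guards against cycles.
        while current != "End" && visited.insert(current.clone()) {
            if let Some(&node) = self.nodes.get(&current) {
                let produced = node(messages.as_slice());
                messages.extend(produced);
            }
            let next = self
                .edges
                .iter()
                .find(|(from, _)| *from == current)
                .map(|(_, to)| to.clone());
            match next {
                Some(to) => current = to,
                None => break,
            }
        }
        messages
    }
}

fn echo(messages: &[String]) -> Vec<String> {
    let last = messages.last().map(String::as_str).unwrap_or("(no input)");
    vec![format!("Echo: {}", last)]
}

fn main() {
    let app = ToyBuilder::default()
        .add_node("echo", echo)
        .add_edge("Start", "echo")
        .add_edge("echo", "End")
        .compile()
        .expect("topology is valid");
    println!("{:?}", app.invoke(vec!["hello".to_string()]));
}
```

Separating `compile` from `invoke` means topology mistakes surface once, before any node runs, and the compiled value can then be invoked repeatedly.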

§Streaming events via SSE

// Each call gets an isolated runner + event bus.
let state = VersionedState::new_with_user_message("hello");
let (handle, event_stream) = app.invoke_streaming(state).await;

// Convert to an async stream and forward to your SSE layer.
// Terminate when STREAM_END_SCOPE is observed.
let _ = event_stream.into_async_stream(); // futures::Stream<Item = Event>
let _ = handle; // join or abort the background task

See examples/production_streaming.rs for the full Axum + Postgres reference.
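
The consume-until-sentinel pattern mentioned in the comments above can be sketched with the standard library alone. This is an illustration, not Weavegraph's streaming API: `ToyEvent`, `TOY_STREAM_END`, `spawn_workflow`, and `collect_until_end` are hypothetical, a `std::sync::mpsc` channel stands in for the event stream, and a thread stands in for the background task behind the handle.

```rust
use std::sync::mpsc::{self, Receiver};
use std::thread::{self, JoinHandle};

/// Hypothetical event shape: a scope label plus a text payload.
#[derive(Debug, Clone, PartialEq)]
struct ToyEvent {
    scope: String,
    message: String,
}

/// Hypothetical sentinel scope that marks the end of the stream.
const TOY_STREAM_END: &str = "__toy_stream_end__";

/// Start a fake workflow on a background thread and return its handle
/// together with the receiving end of its event channel.
fn spawn_workflow(input: &str) -> (JoinHandle<()>, Receiver<ToyEvent>) {
    let (tx, rx) = mpsc::channel();
    let input = input.to_string();
    let handle = thread::spawn(move || {
        for step in ["start", "echo"] {
            let event = ToyEvent { scope: step.to_string(), message: input.clone() };
            // A send error means the receiver was dropped, so stop producing.
            if tx.send(event).is_err() {
                return;
            }
        }
        // Finish with the sentinel so consumers know when to stop reading.
        let _ = tx.send(ToyEvent {
            scope: TOY_STREAM_END.to_string(),
            message: String::new(),
        });
    });
    (handle, rx)
}

/// Collect events until the sentinel is observed; the sentinel is not included.
fn collect_until_end(rx: Receiver<ToyEvent>) -> Vec<ToyEvent> {
    rx.into_iter()
        .take_while(|event| event.scope != TOY_STREAM_END)
        .collect()
}

fn main() {
    let (handle, rx) = spawn_workflow("hello");
    for event in collect_until_end(rx) {
        println!("[{}] {}", event.scope, event.message);
    }
    // Join the background task once the stream has ended.
    handle.join().expect("workflow thread panicked");
}
```

In an SSE handler the loop body would write each event to the response instead of printing it; the stop condition stays the same.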

§Error handling in nodes

// Return a domain error from any node:
fn validate(input: &str) -> Result<(), NodeError> {
    if input.is_empty() {
        return Err(NodeError::Other("input must not be empty".into()));
    }
    // Lift any std::error::Error into NodeError with `?`. node_err() is an
    // extension method, so the trait that provides it must be in scope.
    std::str::from_utf8(input.as_bytes()).node_err()?;
    Ok(())
}

See examples/errors_pretty.rs for error display patterns.
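
For readers wondering how a method such as `node_err()` can appear on a standard `Result`, the usual Rust mechanism is an extension trait. The sketch below is std-only and hypothetical: `ToyNodeError`, `ToyNodeErrExt`, and `parse_count` are illustrative names, and Weavegraph's actual trait and error variants may differ.

```rust
use std::error::Error;
use std::fmt;

/// Hypothetical stand-in for a node error type.
#[derive(Debug)]
enum ToyNodeError {
    /// Free-form domain error.
    Other(String),
    /// Wraps any standard error value.
    Wrapped(Box<dyn Error + Send + Sync>),
}

impl fmt::Display for ToyNodeError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ToyNodeError::Other(msg) => write!(f, "node error: {}", msg),
            ToyNodeError::Wrapped(err) => write!(f, "node error: {}", err),
        }
    }
}

impl Error for ToyNodeError {}

/// Hypothetical extension trait: adds `node_err()` to any `Result` whose
/// error type implements `std::error::Error`.
trait ToyNodeErrExt<T> {
    fn node_err(self) -> Result<T, ToyNodeError>;
}

impl<T, E> ToyNodeErrExt<T> for Result<T, E>
where
    E: Error + Send + Sync + 'static,
{
    fn node_err(self) -> Result<T, ToyNodeError> {
        self.map_err(|err| ToyNodeError::Wrapped(Box::new(err)))
    }
}

fn parse_count(input: &str) -> Result<u32, ToyNodeError> {
    if input.is_empty() {
        return Err(ToyNodeError::Other("input must not be empty".into()));
    }
    // ParseIntError is converted to ToyNodeError, so `?` can propagate it.
    let count = input.trim().parse::<u32>().node_err()?;
    Ok(count)
}

fn main() {
    println!("{:?}", parse_count("42"));
    println!("{:?}", parse_count("forty-two"));
}
```

An explicit method call keeps the conversion visible at the call site, where a blanket `From` impl would make `?` convert every error type silently.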

§Re-exports

pub use self::control::FrontierCommand;
pub use self::control::NodeRoute;

§Modules

app         High-level App entry point for workflow invocation.
channels    Channel types that form the typed state slots of a workflow’s VersionedState.
control     Control-flow directives emitted by nodes to shape the next scheduling frontier.
event_bus   Event bus: broadcast-based EventHub, sink configuration via EventBus, and subscriber APIs over the resulting EventStream.
graphs      Graph construction, compilation, and iteration for workflow execution.
llm         Framework-agnostic LLM provider traits and optional SDK adapters.
message     Chat message types for conversation turns and roles.
node        Node execution framework for the Weavegraph workflow system.
reducers    Reducers apply NodePartial deltas to VersionedState channel-by-channel.
runtimes    Runtime infrastructure for stepwise workflow execution and state persistence.
schedulers  Frontier-based workflow scheduler with version gating and bounded concurrency.
state       Versioned state management for workflow execution.
telemetry   Telemetry formatting: renders workflow events and errors as human-readable or machine-readable text.
types       Core domain types for workflow graphs.
utils       Shared utilities used throughout the Weavegraph framework.
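
To make the reducers entry above more concrete, here is a minimal std-only sketch of applying a partial update to versioned state channel by channel. It is not Weavegraph's implementation: `Channel`, `ToyState`, `ToyPartial`, and `apply_partial` are hypothetical, and the rules chosen here (messages append, extras merge by key, a channel's version increases only when that channel changes) are assumptions made for illustration.

```rust
use std::collections::BTreeMap;

/// Hypothetical channel: a value plus a version bumped on every change.
#[derive(Debug, Default, Clone, PartialEq)]
struct Channel<T> {
    value: T,
    version: u32,
}

/// Hypothetical state with one channel per kind of data.
#[derive(Debug, Default, Clone, PartialEq)]
struct ToyState {
    messages: Channel<Vec<String>>,
    extras: Channel<BTreeMap<String, String>>,
}

/// Hypothetical delta returned by a node; `None` means "channel untouched".
#[derive(Debug, Default)]
struct ToyPartial {
    messages: Option<Vec<String>>,
    extras: Option<BTreeMap<String, String>>,
}

/// Apply each channel's reducer independently of the others.
fn apply_partial(state: &mut ToyState, partial: ToyPartial) {
    // Messages reducer: append in order.
    if let Some(new_messages) = partial.messages {
        if !new_messages.is_empty() {
            state.messages.value.extend(new_messages);
            state.messages.version += 1;
        }
    }
    // Extras reducer: merge by key; a later write replaces an earlier one.
    if let Some(new_extras) = partial.extras {
        if !new_extras.is_empty() {
            state.extras.value.extend(new_extras);
            state.extras.version += 1;
        }
    }
}

fn main() {
    let mut state = ToyState::default();
    let partial = ToyPartial {
        messages: Some(vec!["hello".to_string()]),
        extras: None,
    };
    apply_partial(&mut state, partial);
    println!("{:?}", state);
}
```

Per-channel versions let a scheduler tell which parts of the state a node actually changed, which is one way the version gating named in the schedulers entry could be driven.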