Weavegraph
NOTE:
`NodeKind::Start` and `NodeKind::End` are virtual structural endpoints.
You never register them with `add_node`; attempts to do so are ignored with a warning.
Define only your executable (custom) nodes and connect them with edges from `Start` and to `End`.
Graph-driven, concurrent agent workflow framework with versioned state, deterministic barrier merges, and rich diagnostics.
Weavegraph is a modern Rust framework for building complex, stateful workflows using graph-based execution. It's designed for AI agents, data processing pipelines, and any application requiring sophisticated state management with concurrent node execution.
⚠️ EARLY BETA WARNING
This framework is in active development (v0.1.x). APIs are evolving rapidly, and breaking changes will happen between minor versions.
The core architecture is solid, but expect rough edges, API churn, and occasional surprises. Pin exact versions if stability matters.
Use in production at your own risk—or better yet, help us shape the future by reporting issues and suggesting improvements.
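For example, pinning an exact version in Cargo.toml (the patch number below is illustrative, not a recommendation):

```toml
[dependencies]
# "=" pins the exact version; bump deliberately after reading the changelog.
weavegraph = "=0.1.0"
```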
✨ Key Features
- 🔄 Concurrent Graph Execution: Bounded-concurrency scheduler with dependency resolution and versioned barrier synchronization
- 📝 Rich Message System: Type-safe message construction with role-based messaging for AI workflows
- 🎯 Versioned State Management: Channel-based state with snapshot isolation and deterministic merges
- 🚨 Comprehensive Error Handling: Structured error propagation with beautiful diagnostics via miette and thiserror
- 📊 Built-in Observability: Rich tracing spans and event streaming for monitoring and debugging
- 💾 Flexible Persistence: SQLite checkpointing with automatic schema management, plus in-memory options
- 🎭 Conditional Workflows: Dynamic routing and edge conditions based on runtime state
- 🔧 Developer Experience: Extensive examples, comprehensive docs, and ergonomic APIs
🚀 Quick Start
Add Weavegraph to your Cargo.toml:
```toml
[dependencies]
weavegraph = "0.1"
```
Basic Example
```rust
// Sketch: trait and type names (Node, NodeContext, NodePartial) follow the
// node module's async trait definitions, but exact signatures may vary in 0.1.x.
use async_trait::async_trait;
use weavegraph::message::Message;

// Define a simple greeting node
struct GreetingNode;

#[async_trait]
impl Node for GreetingNode {
    async fn run(&self, _snapshot: StateSnapshot, _ctx: NodeContext) -> Result<NodePartial, NodeError> {
        // Emit an assistant greeting into the messages channel.
        Ok(NodePartial {
            messages: Some(vec![Message::assistant("Hello from GreetingNode!")]),
            ..Default::default()
        })
    }
}
```
Handling GraphCompileError
When compiling a graph, structural validation may fail (cycles, unreachable nodes, duplicate edges, etc.).
Propagate errors with `?`, or match on specific variants for custom handling:

```rust
// Sketch: module paths are assumptions for 0.1.x.
use weavegraph::graph::GraphBuilder;

match GraphBuilder::new().compile() {
    Ok(app) => { /* run the workflow */ }
    Err(err) => {
        // miette renders a rich diagnostic; plain Display works too.
        eprintln!("graph failed to compile: {err}");
    }
}
```
📋 Core Concepts
Messages
Messages are the primary communication primitive with convenient constructors:
```rust
// Sketch: argument types are assumed to be string-like (impl Into<String>).
use weavegraph::message::Message;

// Use convenience constructors (recommended)
let user_msg = Message::user("What's the weather like?");
let assistant_msg = Message::assistant("Let me check that for you.");
let system_msg = Message::system("You are a helpful assistant.");

// For custom roles
let function_msg = Message::new("function", r#"{"result": 42}"#);

// Complex cases with builder pattern
let complex_msg = Message::builder()
    .role("assistant")
    .content("Here is the forecast for tomorrow.")
    .build();
```
State Management
Versioned state with channel isolation and snapshot consistency:
```rust
// Sketch: builder method argument shapes are assumptions for 0.1.x.
use weavegraph::state::VersionedState;

// Simple initialization
let state = VersionedState::new_with_user_message("Hello!");

// Rich initialization with builder
let state = VersionedState::builder()
    .with_user_message("Hello!")
    .with_system_message("You are concise.")
    .with_extra("session_id", serde_json::json!("abc-123"))
    .build();
```
Graph Building
Declarative workflow definition with conditional routing:
```rust
// Sketch: node structs (ClassifyNode, etc.) are user-defined elsewhere,
// and the NodeKind::Custom spelling is an assumption.
use std::sync::Arc;
use weavegraph::graph::GraphBuilder;
use weavegraph::types::NodeKind;

let graph = GraphBuilder::new()
    .add_node(NodeKind::Custom("classify".into()), Arc::new(ClassifyNode))
    .add_node(NodeKind::Custom("respond".into()), Arc::new(RespondNode))
    .add_node(NodeKind::Custom("escalate".into()), Arc::new(EscalateNode))
    // Virtual Start/End: connect from Start and into End explicitly
    .add_edge(NodeKind::Start, NodeKind::Custom("classify".into()))
    .add_conditional_edge(NodeKind::Custom("classify".into()), route)
    .add_edge(NodeKind::Custom("respond".into()), NodeKind::End)
    .add_edge(NodeKind::Custom("escalate".into()), NodeKind::End)
    .compile()?;
```
Note: Conditional predicates must return the name of a valid next node or a virtual endpoint. The runtime accepts:
- Custom nodes by name (e.g., "respond", "escalate") that were registered via `add_node`
- The virtual endpoints "Start" and "End"

If a predicate returns an unknown target, the route is skipped and a warning is logged.
Conditional Edges
Use conditional edges to route dynamically based on runtime state. Predicates return target node names (`Vec<String>`), allowing flexible routing to single or multiple nodes.
Compact example (predicate signature is assumed):

```rust
use std::sync::Arc;
use weavegraph::graph::{EdgePredicate, GraphBuilder};
use weavegraph::types::NodeKind;

// Route to "escalate" when the snapshot carries an escalation flag
// (snapshot field access is an assumption).
let route: EdgePredicate = Arc::new(|snapshot| {
    if snapshot.extra.contains_key("escalate") {
        vec!["escalate".to_string()]
    } else {
        vec!["respond".to_string()]
    }
});

let app = GraphBuilder::new()
    .add_node(NodeKind::Custom("classify".into()), Arc::new(ClassifyNode))
    .add_node(NodeKind::Custom("respond".into()), Arc::new(RespondNode))
    .add_node(NodeKind::Custom("escalate".into()), Arc::new(EscalateNode))
    .add_edge(NodeKind::Start, NodeKind::Custom("classify".into()))
    .add_conditional_edge(NodeKind::Custom("classify".into()), route)
    .add_edge(NodeKind::Custom("respond".into()), NodeKind::End)
    .add_edge(NodeKind::Custom("escalate".into()), NodeKind::End)
    .compile()?;
```
Troubleshooting:
- If nothing happens after a node with a conditional edge, ensure the predicate returns valid target names matching registered nodes, or the virtual endpoints "Start"/"End".
- For readability, use small helper predicates (`EdgePredicate`) and unit test them with sample `StateSnapshot` values.
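For instance, a routing predicate can be tested without the runtime at all; here `Snapshot` is a self-contained stand-in for the framework's `StateSnapshot`, so the sketch compiles on its own:

```rust
// Stand-in for weavegraph's StateSnapshot, to keep the example self-contained.
struct Snapshot {
    needs_escalation: bool,
}

// An EdgePredicate-style function: returns the names of the next nodes.
fn route(snap: &Snapshot) -> Vec<String> {
    if snap.needs_escalation {
        vec!["escalate".to_string()]
    } else {
        vec!["respond".to_string()]
    }
}

fn main() {
    assert_eq!(route(&Snapshot { needs_escalation: true }), vec!["escalate".to_string()]);
    assert_eq!(route(&Snapshot { needs_escalation: false }), vec!["respond".to_string()]);
    println!("routing predicate behaves as expected");
}
```

Because the predicate is a plain function over snapshot data, it stays trivial to cover with table-style unit tests before wiring it into `add_conditional_edge`.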
🔧 Examples
The repository includes comprehensive examples demonstrating various patterns and integrations. See weavegraph/examples/README.md for detailed information on running examples, including which ones require specific Cargo features.
- Basic node patterns and message handling
- Advanced patterns: error handling, conditional routing, data transformation
- Error handling and pretty diagnostics
Demo Applications
Historical demo applications showcase the evolution of the framework's capabilities:

```shell
# Basic graph execution patterns
cargo run --example demo1
# Direct scheduler usage and barrier synchronization
cargo run --example demo2
# LLM workflows with Ollama integration
cargo run --example demo3
# Advanced multi-step workflows
cargo run --example demo4
```
Note: Demo3 requires Ollama running at http://localhost:11434 with models like gemma3. Use the provided docker-compose.yml to set up Ollama:

```shell
docker compose up -d
```
🏗️ Architecture
Weavegraph is built around several core modules:
- `message` - Type-safe message construction and role-based messaging
- `state` - Versioned state management with channel isolation
- `node` - Node execution primitives and async trait definitions
- `graph` - Workflow graph definition and conditional routing
- `schedulers` - Concurrent execution with dependency resolution
- `runtimes` - High-level execution runtime and checkpointing
- `channels` - Channel-based state storage and versioning
- `reducers` - State merge strategies and conflict resolution
- `event_bus` - Event streaming and observability infrastructure
🔍 Observability & Debugging
Tracing
Rich tracing integration with configurable log levels:
```shell
# Debug level for weavegraph modules
RUST_LOG=weavegraph=debug

# Error level globally, debug for weavegraph
RUST_LOG=error,weavegraph=debug
```
Event Streaming ⭐
Weavegraph provides multiple patterns for streaming workflow events with JSON serialization support.
Event Sinks
Built-in sinks for different use cases:
- `StdOutSink` - Human-readable console output
- `MemorySink` - In-memory capture for testing
- `ChannelSink` - Async streaming to channels
- `JsonLinesSink` - Machine-readable JSON Lines format for log aggregation
Events can be serialized to JSON using event.to_json_value(), event.to_json_string(), or event.to_json_pretty().
Simple Pattern (CLI Tools & Scripts)
Use convenience methods for single-execution scenarios:
```rust
// Sketch: return shapes and sink constructors are assumptions for 0.1.x.
use weavegraph::event_bus::{MemorySink, StdOutSink};

// Pattern 1: Single channel (simplest)
let (result, mut events) = app.invoke_with_channel(state).await?;

// Collect events while processing
tokio::spawn(async move {
    while let Some(event) = events.recv().await {
        println!("{event:?}");
    }
});

// Pattern 2: Multiple sinks
app.invoke_with_sinks(state, vec![Box::new(StdOutSink), Box::new(MemorySink::new())]).await?;
```
See cargo run --example convenience_streaming for complete examples.
Web Servers (SSE/WebSockets)
Use App::invoke_streaming to run a workflow while streaming events to clients:
```rust
// Sketch: event field access and the return shape of invoke_streaming are
// assumptions; the stream! macro comes from the async-stream crate.
use async_stream::stream;
use futures_util::StreamExt;
use weavegraph::runtimes::STREAM_END_SCOPE;
use weavegraph::state::VersionedState;

let initial = VersionedState::new_with_user_message("Hello!");
let (mut events, invocation) = app.invoke_streaming(initial).await?;

// Convert workflow events into SSE frames, stopping at the end sentinel.
let sse_stream = stream! {
    while let Some(event) = events.next().await {
        if event.scope == STREAM_END_SCOPE {
            break;
        }
        yield Ok::<_, std::convert::Infallible>(
            axum::response::sse::Event::default()
                .data(event.to_json_string().unwrap_or_default()),
        );
    }
};

let response = axum::response::sse::Sse::new(sse_stream);

// Drive the workflow to completion in the background.
tokio::spawn(invocation);

response
```
See cargo run --example demo7_axum_sse for an Axum SSE demo that requires no direct interaction with AppRunner. The event stream closes once the sentinel diagnostic with scope STREAM_END_SCOPE is received.
Pre-Invocation Event Subscription
Need to add sinks or instrumentation before the workflow starts? Call App::event_stream to obtain an AppEventStream handle, configure the bus, and then drive execution with AppRunner::with_options_and_bus:
```rust
// Sketch: handle fields and method names follow the prose above;
// exact shapes may vary across 0.1.x releases.
use futures_util::StreamExt;
use weavegraph::event_bus::MemorySink;
use weavegraph::runtimes::AppRunner;

let mut handle = app.event_stream();
handle.event_bus.add_sink(Box::new(MemorySink::new()));
let (mut events, bus) = handle.split();

let mut runner = AppRunner::with_options_and_bus(app, options, bus).await?;
tokio::spawn(async move {
    while let Some(event) = events.next().await {
        // instrument or forward events while the workflow runs
    }
});
```
The helper constructors described in the event bus migration guide (RuntimeConfig::with_event_bus, EventBusConfig::with_memory_sink, etc.) pair nicely with this approach when you want to declare sinks and buffer capacities up front.
Production Pattern (Web Servers)
For per-request isolation with SSE/WebSocket:
```rust
// Sketch: the channel type backing ChannelSink is an assumption.
use weavegraph::event_bus::{ChannelSink, EventBus};
use weavegraph::runtimes::AppRunner;

// Per-request EventBus with isolated channel
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
let bus = EventBus::with_sinks(vec![Box::new(ChannelSink::new(tx))]);
let mut runner = AppRunner::with_options_and_bus(app, options, bus).await?;

// Stream events to client while workflow runs
tokio::spawn(async move {
    while let Some(event) = rx.recv().await {
        // forward over SSE / WebSocket
    }
});
// Events include node starts/completions, state changes, errors
```
See cargo run --example streaming_events and STREAMING_QUICKSTART.md for full details, including how to use EventStream::next_timeout for slow polling and how to tune EventBusConfig::buffer_capacity.
For projects upgrading from earlier releases, consult docs/event_bus_migration.md for a checklist of API changes.
Testing Pattern
Use MemorySink for synchronous event capture in tests:
```rust
// Sketch: assumes MemorySink is cheaply cloneable so the test keeps a handle.
use weavegraph::event_bus::{EventBus, MemorySink};

let sink = MemorySink::new();
let event_bus = EventBus::with_sink(Box::new(sink.clone()));
let runner = AppRunner::with_bus(app, event_bus);

// After execution
let events = sink.snapshot();
assert_eq!(events.len(), expected_event_count);
```
Sink Diagnostics
Monitor event sink health and failures without disrupting the main event stream:
```rust
// Sketch: the diagnostics receiver shape is an assumption.
use weavegraph::event_bus::EventBus;

let bus = EventBus::with_sinks(sinks);

// Subscribe to sink diagnostics (optional)
let mut diags = bus.diagnostics();
tokio::spawn(async move {
    while let Some(diag) = diags.recv().await {
        tracing::warn!(?diag, "event sink reported a problem");
    }
});

// Query health snapshot at any time
let health = bus.sink_health();
for entry in health {
    println!("{entry:?}");
}
```
No changes needed for existing code—diagnostics are opt-in. See STREAMING_QUICKSTART.md for configuration options and advanced patterns.
Error Diagnostics
Beautiful error reporting with context and suggestions; for example, returning a `miette::Result` from `main` renders rich diagnostics automatically:

```rust
use miette::Result;

fn main() -> Result<()> {
    // Automatic error context and pretty printing: compile errors
    // surface as miette diagnostics with context and suggestions.
    let app = GraphBuilder::new().compile()?;
    Ok(())
}
```
💾 Persistence
SQLite Checkpointing
Automatic state persistence with configurable database location:
```rust
// Sketch: the constructor argument (a database URL) is an assumption.
use weavegraph::runtimes::SqliteCheckpointer;

let checkpointer = SqliteCheckpointer::new("sqlite://weavegraph.db").await?;
let runner = AppRunner::with_checkpointer(app, checkpointer);
```
Database URL resolution order:
1. `WEAVEGRAPH_SQLITE_URL` environment variable
2. Explicit URL in code
3. `SQLITE_DB_NAME` environment variable (filename only)
4. Default: `sqlite://weavegraph.db`
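For example, overriding the database location via the environment (the paths below are illustrative):

```shell
# Highest priority: a full database URL
export WEAVEGRAPH_SQLITE_URL="sqlite:///var/lib/myapp/checkpoints.db"

# Or override only the filename, keeping the default sqlite:// scheme
export SQLITE_DB_NAME="checkpoints.db"
```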
In-Memory Mode
For testing and ephemeral workflows:
```rust
// Constructor name is an assumption for 0.1.x.
let runner = AppRunner::new(app); // Uses in-memory state
```
🧪 Testing
Run the comprehensive test suite:
```shell
# All tests with output
cargo test -- --nocapture

# Specific test categories (filter by test name)
cargo test scheduler
```
Property-based testing with proptest ensures correctness across edge cases.
Overview: Mermaid flowchart of the app
```mermaid
flowchart TB
    subgraph Client
        user[Client App or UI]
    end
    subgraph Build
        gb[GraphBuilder]
    end
    subgraph Graph
        cg[CompiledGraph]
    end
    subgraph Runtime
        app[App]
        sched[Scheduler]
        router[Router Edges and Commands]
        barrier[Barrier Applier]
    end
    subgraph Nodes
        usernode[User Nodes]
        llmnode[LLM Node]
        toolnode[Tool Node]
    end
    subgraph State
        vstate[Versioned State]
        snap[State Snapshot]
    end
    subgraph Reducers
        redreg[Reducer Registry]
    end
    subgraph Checkpoint
        cpif[Checkpointer]
    end
    subgraph Rig
        rigad[Rig Adapter]
        llmprov[LLM Provider]
    end
    subgraph Tools
        toolreg[Tool Registry]
        exttools[External Tools]
    end
    subgraph Stream
        stream[Stream Controller]
    end
    subgraph Viz
        viz[Visualizer]
    end
    user --> gb
    gb --> cg
    user --> app
    cg --> app
    app --> sched
    sched --> snap
    vstate --> snap
    sched --> usernode
    sched --> llmnode
    sched --> toolnode
    usernode --> barrier
    llmnode --> barrier
    toolnode --> barrier
    redreg --> barrier
    barrier --> vstate
    snap --> router
    app --> router
    router --> sched
    llmnode --> rigad
    rigad --> llmprov
    llmprov --> rigad
    rigad --> llmnode
    toolnode --> toolreg
    toolnode --> exttools
    exttools --> toolnode
    barrier --> cpif
    app --> stream
    stream --> user
    cg --> viz
```
🚀 Production Considerations
Performance
- Bounded concurrency prevents resource exhaustion
- Snapshot isolation eliminates state races
- Channel-based architecture enables efficient partial updates
- SQLite checkpointing supports failure recovery
Monitoring
- Structured event streaming for observability platforms
- Rich tracing spans for distributed tracing
- Error aggregation and pretty diagnostics
- Custom event sinks for metrics collection
Deployment
- Docker-ready with provided compose configuration
- Environment-based configuration
- Graceful shutdown handling
- Migration support for schema evolution
🎓 Project Background
Weavegraph originated as a capstone project for a Rust online course, developed by contributors with Python backgrounds and experience with LangGraph and LangChain. The goal was to bring similar graph-based workflow capabilities to Rust while leveraging its performance, safety, and concurrency advantages.
While rooted in educational exploration, Weavegraph continues active development well beyond the classroom setting. The core architecture is solid and the framework is functional, but as an early beta release (v0.1.x), it's still maturing—use with awareness of ongoing API evolution.
🤝 Contributing
We welcome contributions! Please see our contributing guidelines for details.
Areas we're particularly interested in:
- Additional persistence backends (PostgreSQL, Redis)
- Enhanced AI/LLM integration patterns
- Performance optimizations
- Documentation improvements
- Example applications
📄 License
This project is licensed under the MIT License - see the LICENSE file for details.