weavegraph 0.3.0

Graph-driven, concurrent agent workflow framework with versioned state, deterministic barrier merges, and rich diagnostics.
# Architecture Overview

Comprehensive technical documentation for Weavegraph's internal design and module organization.

**Related Documentation:**
- [Quickstart](QUICKSTART.md) - Core concepts, messages, state, and graphs
- [Operations Guide](OPERATIONS.md) - Event streaming, persistence, testing, and production
- [Documentation Index](INDEX.md) - Complete reference with anchor links

## 🎓 Project Background

Weavegraph originated as a capstone project for a Rust online course, developed by contributors with Python/TypeScript backgrounds and experience with LangGraph and LangChain. The goal was to bring similar graph-based workflow capabilities to Rust while leveraging its performance, safety, and concurrency advantages.

While rooted in educational exploration, Weavegraph continues active development well beyond the classroom setting. The core architecture is solid and the framework is functional, but as an early beta release (v0.3.x) it is still maturing, so expect API changes between minor releases.


The workspace contains two crates:

| Crate | Purpose | Highlights |
| ----- | ------- | ---------- |
| `weavegraph` | Executes concurrent, stateful graphs with structured observability. | Graph builder + runtime, event bus, checkpointing, reducers, scheduler. |
| `wg-ragsmith` | Provides ingestion, semantic chunking, and storage utilities for RAG workloads. | HTML/JSON parsers, semantic chunkers, SQLite vector store helpers. |


## Overview Flowchart

```mermaid
flowchart TB

subgraph Client
  user[Client App or UI]
end

subgraph Build
  gb[GraphBuilder]
end

subgraph Runtime
  app[App]
  sched[Scheduler]
  router[Router: Edges and Commands]
  barrier[Barrier Applier]
end

subgraph Nodes
  usernode[Custom User Nodes]
  llmnode[LLM Node]
  toolnode[Tool Node]
end

subgraph State
  vstate[Versioned State]
  snap[State Snapshot]
end

subgraph Reducers
  redreg[Reducer Registry]
end

subgraph Checkpoint
  cpif[Checkpointer: SQLite/InMemory]
end

subgraph EventBus
  eventbus[Event Bus with Sinks]
end

subgraph Rig
  rigad[Rig Adapter]
  llmprov[LLM Provider: Ollama/MCP]
end

subgraph Tools
  toolreg[Tool Registry]
  exttools[External Tools]
end

user --> gb
gb -->|compile| app

user -->|invoke/invoke_streaming| app

app --> sched
sched -->|creates| snap
vstate --> snap

sched --> usernode
sched --> llmnode
sched --> toolnode

usernode -->|NodePartial| barrier
llmnode -->|NodePartial| barrier
toolnode -->|NodePartial| barrier
redreg --> barrier
barrier -->|merges updates| vstate

snap --> router
app --> router
router -->|next frontier| sched

llmnode --> rigad
rigad --> llmprov
llmprov --> rigad
rigad --> llmnode

toolnode --> toolreg
toolnode --> exttools
exttools --> toolnode

barrier --> cpif

app --> eventbus
eventbus -->|streams events| user
```

---

## Workspace Topology

```
docs/                     → Architectural plans, production hardening roadmap.
weavegraph/               → Core orchestration crate (library + examples + tests).
wg-ragsmith/              → RAG utilities crate (library + examples + tests).
data/                     → Local development databases (ignored in version control).
external/                 → Vendor snapshots (RAGatouille, raptor) kept outside the workspace.
.github/workflows/        → Continuous integration pipelines.
ARCHITECTURE.md           → This document.
```

The workspace targets Rust 1.89 as its minimum supported version and uses the Rust 2024 edition
in both crates.

---

## `weavegraph` Crate

`weavegraph` implements the runtime that powers concurrent, graph-based workflows. The library
is organised around a handful of core modules:

| Module | Highlights |
| ------ | ---------- |
| `graphs::{builder, edges, compilation}` | `GraphBuilder` DSL for wiring nodes, unconditional and conditional edges, and compiling into a runnable `App`. |
| `app` | High-level façade that owns compiled nodes/edges, reducer registry, and runtime config. Provides `invoke`, `invoke_streaming`, and event stream APIs. |
| `runtimes::{runner, checkpointer_*, runtime_config}` | `AppRunner` drives supersteps, coordinates the scheduler, applies barriers, and persists checkpoints to SQLite (schema managed with `sqlx::migrate!`). |
| `schedulers` | Dependency-aware scheduler that fans out runnable nodes and enforces bounded concurrency. |
| `node` | `Node` trait, `NodeContext`, `NodePartial`, and error types used by application code. |
| `state`, `channels`, `reducers` | Versioned state model split across message/extra/error channels with deterministic merge reducers. |
| `event_bus` | Broadcast-based event hub with sinks (stdout, memory, channel, JSON Lines) and streaming helpers for web servers or CLIs. Events support JSON serialization for log aggregation. |
| `telemetry`, `utils` | Tracing helpers, deterministic RNG, clocks, ID generators, and collection utilities. |

### Authoring Nodes & State

Weavegraph applications revolve around three building blocks: nodes, state, and graphs.

> **Note:** `NodeKind::Start` and `NodeKind::End` are virtual structural endpoints.  
> You never register them with `add_node`; attempts to do so are ignored with a warning.  
> Define only your executable (custom) nodes and connect them with edges from `Start` and to `End`.

```rust
use async_trait::async_trait;
use weavegraph::{
    graphs::GraphBuilder,
    message::{Message, Role},
    node::{Node, NodeContext, NodeError, NodePartial},
    state::{StateSnapshot, VersionedState},
    types::NodeKind,
};

struct GreetingNode;

#[async_trait]
impl Node for GreetingNode {
    async fn run(
        &self,
        _snapshot: StateSnapshot,
        ctx: NodeContext,
    ) -> Result<NodePartial, NodeError> {
        // Emit a diagnostic event on the event bus instead of printing to stdout.
        ctx.emit("greeting", "Saying hi!")?;
        Ok(NodePartial::new().with_messages(vec![Message::with_role(
            Role::Assistant,
            "Hello!",
        )]))
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Start and End are virtual: only the custom node is registered.
    let app = GraphBuilder::new()
        .add_node(NodeKind::Custom("greet".into()), GreetingNode)
        .add_edge(NodeKind::Start, NodeKind::Custom("greet".into()))
        .add_edge(NodeKind::Custom("greet".into()), NodeKind::End)
        .compile()?;

    let initial = VersionedState::new_with_user_message("Hi?");
    let _result = app.invoke(initial).await?;
    Ok(())
}
```

**Key practices:**

- Prefer typed roles with `Message::with_role(Role::...)` - see [Messages](QUICKSTART.md#messages)
- Build state with `VersionedState::new_with_user_message` or the builder pattern - see [State Management](QUICKSTART.md#state)
- Use the `NodeContext::emit*` helpers for telemetry instead of writing directly to stdout
- Return structured errors (`NodeError::MissingInput`, `NodeError::Provider`, `NodeError::Other`) or populate `NodePartial::with_errors` for recoverable issues - see [Error Handling](OPERATIONS.md#errors)

### Custom Reducers {#custom-reducers}

Weavegraph supports custom reducers for extending or replacing channel update behavior. By default,
three reducers are registered:

- **Message channel**: `AddMessages` – Appends messages to the message list
- **Extra channel**: `MapMerge` – Shallow-merges JSON objects into the extra data map
- **Error channel**: `AddErrors` – Appends error events to the error list

To register custom reducers:

```rust
use std::sync::Arc;
use weavegraph::{
    graphs::GraphBuilder,
    node::NodePartial,
    reducers::{Reducer, ReducerRegistry},
    state::VersionedState,
    types::ChannelType,
};

// Define a custom reducer
struct MyCustomReducer;

impl Reducer for MyCustomReducer {
    fn apply(&self, _state: &mut VersionedState, _update: &NodePartial) {
        // Custom merge logic here: read the update and mutate the state.
    }
}

// Register during graph building
let app = GraphBuilder::new()
    // .add_node(...) and .add_edge(...) calls as usual
    .with_reducer(ChannelType::Message, Arc::new(MyCustomReducer))
    .compile()?;

// Or replace the entire registry
let custom_registry = ReducerRegistry::new()
    .with_reducer(ChannelType::Message, Arc::new(MyCustomReducer));

let app = GraphBuilder::new()
    // .add_node(...) and .add_edge(...) calls as usual
    .with_reducer_registry(custom_registry)
    .compile()?;
```

Multiple reducers can be registered for the same channel and will be applied in registration order.
This enables middleware-style processing, validation, or transformation of channel updates during
barrier synchronization.
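To make the registration-order rule concrete, here is a std-only toy model. `Channel`, `ToyState`, `ToyPartial`, `ToyReducer`, and `ToyRegistry` are hypothetical stand-ins, not weavegraph's `ChannelType`, `VersionedState`, `NodePartial`, `Reducer`, or `ReducerRegistry`; the sketch only assumes that reducers attached to one channel run in the order they were registered, as described above.

```rust
use std::collections::HashMap;

/// Hypothetical channel ids (stand-in for weavegraph's `ChannelType`).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum Channel {
    Messages,
    Extra,
}

/// Toy state: a message list plus a flat string map.
#[derive(Default, Debug)]
pub struct ToyState {
    pub messages: Vec<String>,
    pub extra: HashMap<String, String>,
}

/// Toy per-node update, standing in for `NodePartial`.
#[derive(Default)]
pub struct ToyPartial {
    pub messages: Vec<String>,
    pub extra: HashMap<String, String>,
}

pub trait ToyReducer {
    fn apply(&self, state: &mut ToyState, update: &ToyPartial);
}

/// Appends the update's messages (the role `AddMessages` plays by default).
pub struct Append;
impl ToyReducer for Append {
    fn apply(&self, state: &mut ToyState, update: &ToyPartial) {
        state.messages.extend(update.messages.iter().cloned());
    }
}

/// Middleware-style pass: trims whitespace from every stored message.
pub struct Trim;
impl ToyReducer for Trim {
    fn apply(&self, state: &mut ToyState, _update: &ToyPartial) {
        for m in state.messages.iter_mut() {
            *m = m.trim().to_string();
        }
    }
}

/// Shallow key-by-key overwrite (the role `MapMerge` plays by default).
pub struct ShallowMerge;
impl ToyReducer for ShallowMerge {
    fn apply(&self, state: &mut ToyState, update: &ToyPartial) {
        for (k, v) in &update.extra {
            state.extra.insert(k.clone(), v.clone());
        }
    }
}

#[derive(Default)]
pub struct ToyRegistry {
    reducers: HashMap<Channel, Vec<Box<dyn ToyReducer>>>,
}

impl ToyRegistry {
    /// Appends a reducer; reducers on the same channel run in this order.
    pub fn with_reducer(mut self, channel: Channel, reducer: Box<dyn ToyReducer>) -> Self {
        self.reducers.entry(channel).or_default().push(reducer);
        self
    }

    /// Applies every channel's reducers, visiting channels in a fixed order.
    pub fn apply(&self, state: &mut ToyState, update: &ToyPartial) {
        for channel in [Channel::Messages, Channel::Extra] {
            for reducer in self.reducers.get(&channel).into_iter().flatten() {
                reducer.apply(state, update);
            }
        }
    }
}
```

For example, registering `Append` and then `Trim` stores an incoming `"  hi  "` as `"hi"`, while the reverse order trims before the new message arrives and leaves it untouched.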

### Execution Flow

1. **Authoring** – Build a graph with `GraphBuilder`, registering nodes (implementations of `Node`)
   and the edges that connect them. Conditional edges can inspect `StateSnapshot` at runtime.
   See [Graph Building](QUICKSTART.md#graphs) for details.
2. **Compilation** – `GraphBuilder::compile()` validates topology and produces an `App`.
3. **Invocation** – `App::invoke()` (or streaming variants like `invoke_streaming` and `invoke_with_channel`)
   constructs an `AppRunner` with the chosen checkpointer (`InMemory` or SQLite) and event bus configuration.
   See [Event Streaming](OPERATIONS.md#event-streaming) for streaming patterns.
4. **Scheduling** – The scheduler selects runnable nodes, issues a `NodeContext` to each, and executes
   them concurrently. Each node returns a `NodePartial` with channel deltas and optional
   control-flow directives.
5. **Barrier & Reduction** – Reducers merge channel updates deterministically, update the
   versioned state, and hand control back to the scheduler for the next superstep.
   See [Custom Reducers](#custom-reducers) above.
6. **Persistence & Observability** – The checkpointer snapshots state into SQLite (when enabled),
   the event bus broadcasts diagnostics and LLM chunk streams, and telemetry is forwarded to sinks.
   See [Persistence](OPERATIONS.md#persistence) and [Event Streaming](OPERATIONS.md#event-streaming).
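The superstep cycle in steps 4 and 5 can be modelled in a few lines of synchronous, std-only Rust. This is a simplified sketch, not `AppRunner`: `NodeFn`, `Edge`, and `run_supersteps` are hypothetical names, nodes are plain function pointers over a string log, the `"End"` string stands in for `NodeKind::End`, and frontier nodes run one after another here where weavegraph runs them concurrently. What it illustrates is that every node in a superstep reads the same snapshot and partials are merged in a fixed order (node-id order in this model; weavegraph's actual ordering rule may differ), so the result does not depend on which node finishes first.

```rust
use std::collections::{BTreeMap, BTreeSet};

/// A node reads a snapshot of the shared log and returns lines to append.
pub type NodeFn = fn(&[String]) -> Vec<String>;

/// Outgoing edge: fixed target, or a predicate evaluated on post-barrier state.
pub enum Edge {
    To(&'static str),
    When(fn(&[String]) -> &'static str),
}

/// Runs supersteps until the frontier is empty or `max_steps` is reached.
/// "End" is treated as a virtual sink and never scheduled.
pub fn run_supersteps(
    nodes: &BTreeMap<&'static str, NodeFn>,
    edges: &BTreeMap<&'static str, Vec<Edge>>,
    start: &[&'static str],
    max_steps: usize,
) -> Vec<String> {
    let mut log: Vec<String> = Vec::new();
    let mut frontier: BTreeSet<&'static str> = start.iter().copied().collect();
    for _ in 0..max_steps {
        if frontier.is_empty() {
            break;
        }
        // Every node in this superstep sees the same immutable snapshot.
        let snapshot = log.clone();
        // Run the frontier (sequentially here). BTreeSet iterates in sorted
        // order, so partials are collected in node-id order.
        let partials: Vec<(&'static str, Vec<String>)> = frontier
            .iter()
            .map(|&id| (id, (nodes[id])(snapshot.as_slice())))
            .collect();
        // Barrier: merge the partials in that fixed order.
        for (_, lines) in &partials {
            log.extend(lines.iter().cloned());
        }
        // Routing: compute the next frontier from the merged state.
        let mut next = BTreeSet::new();
        for (id, _) in &partials {
            for edge in edges.get(*id).map(Vec::as_slice).unwrap_or(&[]) {
                let target = match edge {
                    Edge::To(t) => *t,
                    Edge::When(pred) => (*pred)(log.as_slice()),
                };
                if target != "End" {
                    next.insert(target);
                }
            }
        }
        frontier = next;
    }
    log
}
```

With start nodes `a` and `b`, both see an empty snapshot in the first superstep; a node `c` scheduled by their edges then sees both of their outputs in the second.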

### Optional Features

* `rig` – Enables Rig-based LLM support (Ollama/MCP integrations).
* `llm` – Backward-compatible alias to `rig` for 0.3.x (planned removal in 0.4.0).
* `sqlite-migrations` – Turns on SQLite-backed persistence (default).
* `examples` – Pulls in extra dependencies used by a subset of examples (e.g. `reqwest`, `scraper`).

### Tests & Examples

* `weavegraph/tests/` – Covers state channels, reducers, scheduler semantics, checkpointer, and event bus.
  See [Testing](OPERATIONS.md#testing) for running tests and patterns.
* `examples/` – Progressive walkthroughs:
  * `basic_nodes.rs`, `graph_execution.rs`, `scheduler_fanout.rs` show core messaging and state channels.
    See [Messages](QUICKSTART.md#messages) and [State](QUICKSTART.md#state).
  * `advanced_patterns.rs` covers conditional routing and control-flow helpers.
  * `streaming_events.rs`, `convenience_streaming.rs` demonstrate the
    broadcast event bus and web-friendly streaming patterns.
    See [Event Streaming](OPERATIONS.md#event-streaming).
  * `event_backpressure.rs`, `json_serialization.rs`, `errors_pretty.rs` cover production-facing
    concerns like lag handling, JSON sinks, and pretty diagnostics.

---

### Backpressure and Drop Policy

The event bus uses a bounded broadcast channel (default capacity: 1024 events per subscriber).
When a subscriber cannot keep up with the producers, the following semantics apply:

- Slow subscribers receive a lag notice and skip older events (no blocking of producers)
- Missed events are counted and exposed via sink diagnostics
- A WARN log entry is emitted with the number of dropped events and the running total
- Streams continue from the most recent position for graceful degradation under load

To adjust capacity, configure the event bus when using `App::invoke_streaming`, or construct
an `EventBus` directly with `EventBus::with_capacity`.

For practical guidance and code samples, see:
- [Event Streaming](OPERATIONS.md#event-streaming) for patterns and sink configuration
- `docs/STREAMING.md` for detailed tuning guidance
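The drop-oldest behaviour described above can be illustrated with a single-threaded model of a bounded broadcast buffer. `Ring`, `Cursor`, and `Recv` are hypothetical names. The sketch follows the semantics of `tokio::sync::broadcast` (producers never block, and a lagging receiver gets a count of skipped events and then resumes at the oldest event still buffered); treating weavegraph's event bus as behaving the same way is an assumption, not a description of its code.

```rust
use std::collections::VecDeque;

/// Outcome of one receive attempt.
#[derive(Debug, PartialEq)]
pub enum Recv<T> {
    Event(T),
    /// The subscriber fell behind and this many events were dropped for it.
    Lagged(u64),
    /// Caught up: nothing new to read.
    Empty,
}

/// Bounded buffer shared by all subscribers (single-threaded model).
pub struct Ring<T> {
    buf: VecDeque<T>,
    capacity: usize,
    /// Sequence number of the oldest event still buffered.
    head_seq: u64,
}

/// A subscriber only tracks the next sequence number it wants.
pub struct Cursor {
    next: u64,
    /// Running total of dropped events, as a sink diagnostic might report.
    pub missed_total: u64,
}

impl<T: Clone> Ring<T> {
    pub fn with_capacity(capacity: usize) -> Self {
        assert!(capacity > 0, "capacity must be non-zero");
        Ring { buf: VecDeque::with_capacity(capacity), capacity, head_seq: 0 }
    }

    /// New subscribers start after the newest buffered event.
    pub fn subscribe(&self) -> Cursor {
        Cursor { next: self.head_seq + self.buf.len() as u64, missed_total: 0 }
    }

    /// Never blocks: when the buffer is full, the oldest event is evicted.
    pub fn send(&mut self, event: T) {
        if self.buf.len() == self.capacity {
            self.buf.pop_front();
            self.head_seq += 1;
        }
        self.buf.push_back(event);
    }

    pub fn recv(&self, cursor: &mut Cursor) -> Recv<T> {
        if cursor.next < self.head_seq {
            // Events this cursor wanted were evicted: report and skip ahead.
            let missed = self.head_seq - cursor.next;
            cursor.next = self.head_seq;
            cursor.missed_total += missed;
            return Recv::Lagged(missed);
        }
        match self.buf.get((cursor.next - self.head_seq) as usize) {
            Some(event) => {
                cursor.next += 1;
                Recv::Event(event.clone())
            }
            None => Recv::Empty,
        }
    }
}
```

With a capacity of 2 and five events sent before the subscriber reads, the first `recv` reports `Lagged(3)` and the next two return events 3 and 4; the producer was never blocked.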

## `wg-ragsmith` Crate

`wg-ragsmith` contains the ingestion and vector-store tooling used by RAG pipelines. It can be
used standalone or pulled into `weavegraph` via the `examples` feature.

| Module | Highlights |
| ------ | ---------- |
| `ingestion::{cache, chunk, resume}` | Disk-backed document cache, chunk-to-ingestion conversion, and resumable pipeline tracking. |
| `semantic_chunking::{html, json, segmenter, embeddings, service}` | HTML/JSON preprocessors, statistical breakpoint strategies, mock/real embedding providers, and the async chunking service. |
| `stores::sqlite` | `SqliteChunkStore` built on `rig-sqlite` + `sqlite-vec`, including schema, vec3 registration, and helper methods to upsert/search chunks. |
| `types` | `RagError` and supporting data structures for ingestion/persistence. |

### Examples

* `examples/rust_book_pipeline.rs` – Async ingestion pipeline that scrapes the Rust book,
  chunks and embeds sections, and writes them into SQLite.
* `examples/query_chunks.rs` & `query_db.sh` – Smoke tests showing how to query stored chunks.

These examples share environment variables with the weavegraph RAG demo (see `.env.example`).

### Feature Flags

* `semantic-chunking-tiktoken` (default) – OpenAI tiktoken tokeniser.
* `semantic-chunking-rust-bert` – Enables Rust-BERT based embedding pipeline.
* `semantic-chunking-segtok` – Alternative segmentation strategy.

---

## Shared Operational Pieces

* **Tooling** – Standard Rust tooling (`cargo fmt`, `cargo clippy`, `cargo test`,
  `cargo +nightly doc`, `cargo deny`, `cargo machete`) plus `sqlx` migrations keep local
  workflows and CI aligned.
* **CI/CD** – `.github/workflows/ci.yml` runs required checks on `1.90.0`, uses current
  stable as a canary lane, and validates docs on `nightly` with
  `RUSTDOCFLAGS="--cfg docsrs -D warnings"`.
* **Migrations** – `weavegraph/migrations` houses the `sqlx` migration set for the SQLite
  checkpointer. Apply migrations with `sqlx migrate run` and roll back the latest one with `sqlx migrate revert`.
* **Docs** – `docs/` captures forward-looking design documents (event bus refactor,
  control-flow commands, hybrid RAG pipeline) and the production readiness plan. Use
  this architecture document as the entry point.

---
## petgraph Comparison

Weavegraph's graph implementation was designed with workflow execution in mind, making different
tradeoffs than the general-purpose [petgraph](https://github.com/petgraph/petgraph) crate.
This section documents the architectural differences and integration opportunities.

### Architecture Comparison

| Aspect | Weavegraph | petgraph |
|--------|-----------|----------|
| **Primary Use Case** | Workflow orchestration with async node execution | General graph algorithms and data structures |
| **Graph Type** | Custom `FxHashMap<NodeKind, Vec<NodeKind>>` adjacency | `Graph`, `StableGraph`, `GraphMap`, `MatrixGraph` |
| **Node Identity** | `NodeKind` enum (Start/End/Custom) | `NodeIndex` (u32 handle) |
| **Node Data** | Nodes carry `Arc<dyn Node>` trait objects | Generic node weight type `N` |
| **Edge Storage** | HashMap adjacency list with conditional predicates | Compact edge list with indices |
| **Edge Data** | Unconditional or `EdgePredicate` closures | Generic edge weight type `E` |
| **Cycle Detection** | Custom DFS with three-color marking | `petgraph::algo::is_cyclic_directed` |
| **Reachability** | Custom BFS from Start | `petgraph::algo::has_path_connecting` |
| **Algorithms** | Validation-focused (cycles, reachability, deadends) | Rich library (Dijkstra, MST, SCC, isomorphism, max flow) |
| **Async Support** | First-class (nodes are async) | None (pure data structure) |
| **Serialization** | Custom JSON via serde | `serde-1` feature |
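The "custom DFS with three-color marking" row refers to a standard technique: white nodes are unvisited, gray nodes are on the current DFS path, and black nodes are finished, so reaching a gray node means a back edge and therefore a cycle. A minimal std-only sketch over a `HashMap<&str, Vec<&str>>` adjacency list follows. It returns the offending path in the spirit of `CycleDetected { path }`, but `find_cycle` and its return type are hypothetical, not weavegraph's API.

```rust
use std::collections::HashMap;

#[derive(Clone, Copy, PartialEq)]
enum Color {
    White, // not visited yet
    Gray,  // on the current DFS path
    Black, // fully explored
}

/// Returns one cycle as a path (first node repeated at the end), or None.
pub fn find_cycle<'a>(adj: &HashMap<&'a str, Vec<&'a str>>) -> Option<Vec<String>> {
    // Sort roots so the reported cycle is the same on every run.
    let mut roots: Vec<&'a str> = adj.keys().copied().collect();
    roots.sort();
    let mut color: HashMap<&'a str, Color> = HashMap::new();
    let mut stack: Vec<&'a str> = Vec::new();
    for root in roots {
        if color.get(root).copied().unwrap_or(Color::White) == Color::White {
            if let Some(cycle) = visit(root, adj, &mut color, &mut stack) {
                return Some(cycle);
            }
        }
    }
    None
}

fn visit<'a>(
    node: &'a str,
    adj: &HashMap<&'a str, Vec<&'a str>>,
    color: &mut HashMap<&'a str, Color>,
    stack: &mut Vec<&'a str>,
) -> Option<Vec<String>> {
    color.insert(node, Color::Gray);
    stack.push(node);
    for &next in adj.get(node).map(Vec::as_slice).unwrap_or(&[]) {
        match color.get(next).copied().unwrap_or(Color::White) {
            Color::Gray => {
                // Back edge: the cycle is the stack suffix starting at `next`.
                let start = stack.iter().position(|&n| n == next).unwrap();
                let mut cycle: Vec<String> = stack[start..].iter().map(|s| s.to_string()).collect();
                cycle.push(next.to_string());
                return Some(cycle);
            }
            Color::White => {
                if let Some(cycle) = visit(next, adj, color, stack) {
                    return Some(cycle);
                }
            }
            Color::Black => {} // finished earlier; no cycle runs through it
        }
    }
    stack.pop();
    color.insert(node, Color::Black);
    None
}
```

Recursion depth grows with the longest path, which is fine for workflow-sized graphs; very deep graphs would call for an explicit stack.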

### Key Differences Explained

**Why Weavegraph uses a custom graph:**

1. **Domain-Specific Semantics** – `NodeKind::Start` and `NodeKind::End` are virtual structural
   endpoints that cannot be registered as executable nodes. This enables clear workflow boundaries
   without special-casing in user code.

2. **Conditional Edges** – petgraph edges are static data. Weavegraph edges can be runtime
   predicates (`EdgePredicate`) that inspect state to determine routing. This is fundamental to
   agent decision-making workflows.

3. **Execution Context** – Nodes aren't just data; they're async executables with access to
   `NodeContext` for event emission and metadata. petgraph's node weights are passive data.

4. **Validation Errors** – Compilation produces domain-specific errors like `UnknownNode`,
   `CycleDetected { path }`, `UnreachableFromStart`, and `DeadendNode` with actionable context.
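Reachability and dead-end checks can be sketched the same way: a breadth-first search from `"Start"` marks every node that can ever run, and any custom node with no outgoing edge is a dead end. The helpers below (`reachable`, `unreachable`, `deadends`) are hypothetical and std-only; they mirror the intent of `UnreachableFromStart` and `DeadendNode` without reproducing weavegraph's error types, and they consider only unconditional edges, since conditional targets are chosen at runtime.

```rust
use std::collections::{HashMap, HashSet, VecDeque};

/// BFS from "Start"; returns every node reached (including "Start").
pub fn reachable<'a>(adj: &HashMap<&'a str, Vec<&'a str>>) -> HashSet<&'a str> {
    let mut seen = HashSet::from(["Start"]);
    let mut queue = VecDeque::from(["Start"]);
    while let Some(node) = queue.pop_front() {
        for &next in adj.get(node).map(Vec::as_slice).unwrap_or(&[]) {
            // `insert` returns true only the first time a node is seen.
            if seen.insert(next) {
                queue.push_back(next);
            }
        }
    }
    seen
}

/// Custom nodes the BFS never reached, sorted for stable error messages.
pub fn unreachable<'a>(nodes: &[&'a str], adj: &HashMap<&'a str, Vec<&'a str>>) -> Vec<&'a str> {
    let seen = reachable(adj);
    let mut out: Vec<&'a str> = nodes.iter().copied().filter(|n| !seen.contains(*n)).collect();
    out.sort();
    out
}

/// Custom nodes with no outgoing edge: execution would stop there
/// instead of reaching "End".
pub fn deadends<'a>(nodes: &[&'a str], adj: &HashMap<&'a str, Vec<&'a str>>) -> Vec<&'a str> {
    let mut out: Vec<&'a str> = nodes
        .iter()
        .copied()
        .filter(|n| adj.get(*n).map_or(true, |targets| targets.is_empty()))
        .collect();
    out.sort();
    out
}
```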

**petgraph advantages:**

1. **Battle-tested** – 3.7k+ GitHub stars and 144 contributors (at the time of writing), extensive production usage
2. **Memory-efficient** – Compact edge storage, cache-friendly node indices
3. **Algorithm library** – Dijkstra, topological sort, strongly connected components, etc.
4. **Index stability** – `StableGraph` keeps indices valid when nodes or edges are removed

### Integration Approach

Weavegraph takes a **selective adoption** approach rather than replacing its core graph:

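```rust
// Feature-gated behind `petgraph-compat`; shows the shape of the conversion.
#[cfg(feature = "petgraph-compat")]
impl From<&CompiledGraph> for petgraph::Graph<NodeKind, ()> {
    fn from(_graph: &CompiledGraph) -> Self {
        // Outline: add one petgraph node per `NodeKind`, remember its
        // `NodeIndex`, then add a `()`-weighted edge per adjacency entry.
        // The result is used for visualization or analysis.
        todo!("convert for visualization or analysis")
    }
}
```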

**Current integrations:**

- **Graph iteration API** – `Graph::nodes()` and `Graph::edges()` iterators mirror petgraph idioms
- **Topological sort** – `Graph::topological_sort()` for deterministic node ordering
- **DOT export** – Optional petgraph-based visualization via `dot` format
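Deterministic topological ordering is commonly implemented with Kahn's algorithm plus a sorted ready-set, so ties are broken the same way on every run regardless of hash-map iteration order. The sketch below is a std-only illustration under that assumption; `topo_sort` is a hypothetical name and not the signature of `Graph::topological_sort()`. It relies on `BTreeSet::pop_first` (stable since Rust 1.66).

```rust
use std::collections::{BTreeSet, HashMap};

/// Kahn's algorithm: repeatedly emit the smallest node whose predecessors
/// have all been emitted. Returns None if the graph contains a cycle.
pub fn topo_sort<'a>(adj: &HashMap<&'a str, Vec<&'a str>>) -> Option<Vec<&'a str>> {
    // Count incoming edges for every node that appears as a source or target.
    let mut indegree: HashMap<&'a str, usize> = HashMap::new();
    for (&from, targets) in adj {
        indegree.entry(from).or_insert(0);
        for &to in targets {
            *indegree.entry(to).or_insert(0) += 1;
        }
    }
    // A sorted ready-set makes tie-breaking deterministic.
    let mut ready: BTreeSet<&'a str> = indegree
        .iter()
        .filter(|(_, d)| **d == 0)
        .map(|(n, _)| *n)
        .collect();
    let mut order = Vec::with_capacity(indegree.len());
    while let Some(node) = ready.pop_first() {
        order.push(node);
        for &to in adj.get(node).map(Vec::as_slice).unwrap_or(&[]) {
            let d = indegree.get_mut(to).expect("every target was counted");
            *d -= 1;
            if *d == 0 {
                ready.insert(to);
            }
        }
    }
    // Nodes left with a non-zero in-degree sit on a cycle.
    if order.len() == indegree.len() {
        Some(order)
    } else {
        None
    }
}
```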

**Future opportunities:**

- **Advanced routing** – Use petgraph's shortest path for "fastest path to End" analysis
- **Cycle detection fallback** – Validate against petgraph's implementation
- **Graph visualization** – Generate DOT/GraphViz output for debugging

### When to Use petgraph Directly

Use petgraph when you need:
- Pure graph algorithms without execution semantics
- Memory-optimal large graph storage
- Pre-built algorithms (MST, max flow, isomorphism)
- Static graph analysis tooling

Use Weavegraph when you need:
- Async node execution with state management
- Conditional runtime routing based on state
- Event streaming and observability
- Checkpoint/resume workflow persistence
- LLM agent orchestration patterns

### Code Example: Hybrid Usage

```rust
use weavegraph::graphs::GraphBuilder;
use weavegraph::state::VersionedState;
use weavegraph::types::NodeKind;

// `AnalyzeNode` and `SummarizeNode` stand in for your own `Node` implementations.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Build workflow with Weavegraph
    let builder = GraphBuilder::new()
        .add_node(NodeKind::Custom("analyze".into()), AnalyzeNode)
        .add_node(NodeKind::Custom("summarize".into()), SummarizeNode)
        .add_edge(NodeKind::Start, NodeKind::Custom("analyze".into()))
        .add_edge(NodeKind::Custom("analyze".into()), NodeKind::Custom("summarize".into()))
        .add_edge(NodeKind::Custom("summarize".into()), NodeKind::End);

    // Convert to petgraph for analysis (feature-gated)
    #[cfg(feature = "petgraph-compat")]
    {
        use weavegraph::graphs::PetgraphConversion;
        let pg = builder.to_petgraph();

        // `toposort` returns `Err(Cycle)` for a cyclic graph; handle it explicitly.
        let topo_order = petgraph::algo::toposort(&pg.graph, None)
            .expect("workflow graph should be acyclic");
        println!("Topological order: {:?}", topo_order);

        let dot = petgraph::dot::Dot::new(&pg.graph);
        println!("DOT output:\n{:?}", dot);
    }

    // Execute with Weavegraph
    let app = builder.compile()?;
    let initial_state = VersionedState::new_with_user_message("Analyze this report");
    let _result = app.invoke(initial_state).await?;
    Ok(())
}
```