weavegraph 0.3.0

Graph-driven, concurrent agent workflow framework with versioned state, deterministic barrier merges, and rich diagnostics.
Documentation
# Operations Guide

Runtime operations, observability, persistence, testing, and production deployment.

## Event Streaming & Observability {#event-streaming}

Weavegraph provides multiple patterns for streaming workflow events with JSON serialization support.

### Event Sinks

Built-in sinks for different use cases:
- **StdOutSink**: Human-readable console output
- **MemorySink**: In-memory capture for testing
- **ChannelSink**: Async streaming to channels
- **JsonLinesSink**: Machine-readable JSON Lines format

Events can be serialized to JSON using `event.to_json_value()`, `event.to_json_string()`, or `event.to_json_pretty()`.

### Simple Pattern (CLI Tools)

```rust
let (result, events) = app.invoke_with_channel(initial_state).await;

// Collect events while processing
tokio::spawn(async move {
    while let Ok(event) = events.recv_async().await {
        println!("Event: {:?}", event);
    }
});
```

### Multiple Sinks

```rust
use weavegraph::event_bus::{StdOutSink, ChannelSink};

app.invoke_with_sinks(
    initial_state,
    vec![
        Box::new(StdOutSink::default()),
        Box::new(ChannelSink::new(tx)),
    ]
).await?;
```

### Web Servers (SSE/WebSockets)

Use `App::invoke_streaming` to run a workflow while streaming events to clients. See [STREAMING.md](STREAMING.md) for full details.

### Sink Diagnostics

Monitor event sink health and failures:

```rust
use weavegraph::event_bus::EventBus;

let bus = EventBus::with_sinks(vec![/* your sinks */]);

// Subscribe to diagnostics (optional)
let mut diags = bus.diagnostics();
tokio::spawn(async move {
    while let Ok(diagnostic) = diags.recv().await {
        eprintln!("Sink '{}' error: {}", diagnostic.sink, diagnostic.error);
    }
});

// Query health snapshot
let health = bus.sink_health();
for entry in health {
    println!("{}: {} errors", entry.sink, entry.error_count);
}
```

### Tracing

Rich tracing integration with configurable log levels:

```bash
# Debug level for weavegraph modules
RUST_LOG=debug cargo run --example basic_nodes

# Error level globally, debug for weavegraph
RUST_LOG=error,weavegraph=debug cargo run --example advanced_patterns
```

## Persistence {#persistence}

Weavegraph supports SQLite and PostgreSQL checkpointing, as well as in-memory state for workflows.

### SQLite Checkpointing

Automatic state persistence with configurable database location:

```rust
use weavegraph::runtimes::{AppRunner, CheckpointerType};

// Using the builder pattern (recommended)
let runner = AppRunner::builder()
    .app(app)
    .checkpointer(CheckpointerType::SQLite)
    .build()
    .await;
```

**SQLite URL resolution order (when `CheckpointerType::SQLite` is selected):**
1. `WEAVEGRAPH_SQLITE_URL` environment variable (full URL)
2. `SQLITE_DB_NAME` environment variable (filename only; used as `sqlite://{name}`)
3. Default: `sqlite://weavegraph.db`

Tip: `RuntimeConfig` also loads `.env` automatically (via `dotenvy`) for local dev.

### PostgreSQL Checkpointing

For production deployments requiring horizontal scaling or shared state:

```rust
use weavegraph::runtimes::{AppRunner, CheckpointerType};

// Using the builder pattern
let runner = AppRunner::builder()
    .app(app)
    .checkpointer(CheckpointerType::Postgres)
    .build()
    .await;
```

**Database URL resolution order:**
1. `WEAVEGRAPH_POSTGRES_URL` environment variable
2. `DATABASE_URL` environment variable (common convention)
3. Fallback: `postgresql://localhost/weavegraph`

**PostgreSQL vs SQLite:**

| Aspect | SQLite | PostgreSQL |
|--------|--------|------------|
| **Deployment** | Single-file, embedded | Server-based |
| **Concurrency** | Single-writer | Multi-writer |
| **Scaling** | Single instance | Horizontal scaling |
| **Best for** | Development, single-instance | Production, distributed |

**Migrations:** PostgreSQL migrations are in `migrations/postgres/`. Run with:

```bash
# Using sqlx-cli
sqlx migrate run --source migrations/postgres
```

### In-Memory Mode

For testing and ephemeral workflows:

```rust
let runner = AppRunner::builder()
    .app(app)
    .checkpointer(CheckpointerType::InMemory)
    .build()
    .await?;
```

### Storage Management

**InMemoryCheckpointer** stores only the latest checkpoint per session (automatic retention). No storage management needed.

**SQLiteCheckpointer** stores complete step history for audit trails and debugging. Storage grows with `(sessions × steps_per_session × state_size)`.

For long-running applications, implement periodic cleanup:

**Option 1: Direct SQL maintenance (recommended)**

```bash
# Delete checkpoints older than 30 days
sqlite3 workflow.db "DELETE FROM steps WHERE created_at < datetime('now', '-30 days')"

# Keep only latest 100 steps per session
sqlite3 workflow.db "
  DELETE FROM steps 
  WHERE step NOT IN (
    SELECT step FROM steps 
    WHERE session_id = steps.session_id 
    ORDER BY step DESC 
    LIMIT 100
  )
"

# Vacuum to reclaim space
sqlite3 workflow.db "VACUUM"
```

**Option 2: Application-level session management**

Delete entire sessions when workflows complete:

```rust
// Using sqlx directly
sqlx::query("DELETE FROM sessions WHERE id = ?")
    .bind(&session_id)
    .execute(&pool)
    .await?;
// Cascading delete removes all associated steps
```

**Storage monitoring:**

```bash
# Check database size
ls -lh workflow.db

# Count checkpoints per session
sqlite3 workflow.db "
  SELECT session_id, COUNT(*) as checkpoint_count 
  FROM steps 
  GROUP BY session_id
"
```

## Testing {#testing}

Weavegraph supports comprehensive testing, including property-based tests and event capture.

### Running Tests

```bash
# All tests with output
cargo test --all -- --nocapture

# Specific test categories
cargo test schedulers:: -- --nocapture
cargo test channels:: -- --nocapture
cargo test integration:: -- --nocapture
```

### Event Capture in Tests

Use `MemorySink` for synchronous event capture:

```rust
use weavegraph::event_bus::{EventBus, MemorySink};
use weavegraph::runtimes::{AppRunner, CheckpointerType};
use weavegraph::state::VersionedState;

# async fn example(app: weavegraph::app::App) -> Result<(), Box<dyn std::error::Error>> {
let sink = MemorySink::new();
let event_bus = EventBus::with_sink(sink.clone());

let mut runner = AppRunner::builder()
    .app(app)
    .checkpointer(CheckpointerType::InMemory)
    .event_bus(event_bus)
    .autosave(false)
    .start_listener(true)
    .build()
    .await;

let session_id = "test-session".to_string();
runner
    .create_session(session_id.clone(), VersionedState::new_with_user_message("Hi"))
    .await?;
runner.run_until_complete(&session_id).await?;

let events = sink.snapshot();
assert!(!events.is_empty());
# Ok(())
# }
```

### Property-Based Testing

Weavegraph uses `proptest` to ensure correctness across edge cases. See the test suite for examples of property-based validation of schedulers, channels, and state management.

## Error Handling {#errors}

Weavegraph provides structured, matchable error enums via `thiserror`.
Rich diagnostic metadata is available behind the optional `diagnostics` feature.

### Basic Usage

```rust
fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Your workflow code here
    Ok(())
}
```

### Handling Graph Compilation Errors

```rust
use weavegraph::graphs::{GraphBuilder, GraphCompileError};
use weavegraph::types::NodeKind;

fn build_app() -> Result<weavegraph::app::App, weavegraph::graphs::GraphCompileError> {
    match GraphBuilder::new()
        .add_edge(NodeKind::Start, NodeKind::Custom("A".into()))
        .add_edge(NodeKind::Custom("A".into()), NodeKind::End)
        .compile()
    {
        Ok(app) => Ok(app),
        Err(GraphCompileError::MissingEntry) => Err(GraphCompileError::MissingEntry),
        Err(GraphCompileError::UnknownNode(nk)) => Err(GraphCompileError::UnknownNode(nk)),
        Err(e) => Err(e),
    }
}
```

**Features:**
- Match on error variants for custom handling
- Lightweight core error model for library consumers
- Optional diagnostic metadata (`--features diagnostics`) for richer terminal reporting

See `examples/errors_pretty.rs` for comprehensive error handling patterns.

## Production Considerations {#production}

### Performance

- **Bounded concurrency** prevents resource exhaustion
- **Snapshot isolation** eliminates state races
- **Channel-based architecture** enables efficient partial updates
- **SQLite checkpointing** supports failure recovery

### Monitoring

- **Structured event streaming** for observability platforms
- **Rich tracing spans** for distributed tracing
- **Error aggregation** and pretty diagnostics
- **Custom event sinks** for metrics collection

### Deployment

- **Docker-ready** with provided `docker-compose.yml`
- **Environment-based configuration** for flexible deployment
- **Graceful shutdown handling** for clean termination
- **Migration support** for schema evolution

### Production Patterns

For web servers with per-request isolation:

```rust
use weavegraph::event_bus::{EventBus, ChannelSink};
use weavegraph::runtimes::{AppRunner, CheckpointerType};

// Per-request EventBus with isolated channel
let (tx, rx) = flume::unbounded();
let bus = EventBus::with_sinks(vec![Box::new(ChannelSink::new(tx))]);

let mut runner = AppRunner::builder()
    .app(app.clone())
    .checkpointer(CheckpointerType::InMemory)
    .event_bus(bus)
    .autosave(true)
    .start_listener(true)
    .build()
    .await;
```

See also: [Quickstart](QUICKSTART.md), [Architecture](ARCHITECTURE.md)