echo_agent 0.1.4

Production-grade AI Agent framework for Rust — ReAct engine, multi-agent, memory, streaming, MCP, IM channels, workflows
# Graph Workflow — LangGraph-style Agent Orchestration

## What It Is

Graph Workflow models Agent execution as a **directed graph with shared state**, supporting:
- Linear pipelines
- Conditional branching
- Loops
- Parallel fan-out/fan-in

This is echo-agent's implementation of the LangGraph pattern.

---

## Core Concepts

```
┌─────────────────────────────────────────────────────────────────────┐
│                    Graph Workflow Structure                          │
│                                                                      │
│   SharedState: KV store shared across nodes + structured messages   │
│                                                                      │
│   ┌─────────┐     ┌─────────┐     ┌─────────┐                      │
│   │ Node A  │────▶│ Node B  │────▶│ Node C  │                      │
│   │ (Agent) │     │(Function)│    │ (Agent) │                      │
│   └─────────┘     └─────────┘     └─────────┘                      │
│        │                                                │            │
│        │           ┌─────────┐                         │            │
│        └──────────▶│ Node D  │◀────────────────────────┘            │
│                    │(Condition)│    (conditional edge)               │
│                    └─────────┘                                      │
│                         │                                           │
│                    ┌────┴────┐                                     │
│                    ▼         ▼                                     │
│               ┌────────┐ ┌────────┐                                │
│               │Node E  │ │Node F  │  (parallel fan-out)            │
│               └────────┘ └────────┘                                │
│                    │         │                                     │
│                    └────┬────┘                                     │
│                         ▼                                           │
│                    ┌────────┐                                       │
│                    │  END   │  (fan-in)                            │
│                    └────────┘                                       │
└─────────────────────────────────────────────────────────────────────┘
```
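
To make the diagram concrete, here is a minimal, standard-library-only sketch of the underlying idea: nodes are closures over a shared map, and edges name the next node to run. `MiniGraph`, `State`, and `demo_graph` are hypothetical names used only for illustration. This is not how echo-agent implements `Graph`, and it leaves out agents, conditional edges, and parallelism.

```rust
use std::collections::HashMap;

/// Shared key-value state passed to every node (strings only, for brevity).
type State = HashMap<String, String>;
/// A node reads from and writes to the shared state.
type Node = Box<dyn Fn(&mut State)>;

/// A tiny directed graph: named nodes plus at most one outgoing edge per node.
struct MiniGraph {
    nodes: HashMap<String, Node>,
    edges: HashMap<String, String>,
    entry: String,
}

impl MiniGraph {
    /// Walk from the entry node along edges until a node has no successor.
    /// Returns the names of the nodes in the order they ran.
    fn run(&self, state: &mut State) -> Vec<String> {
        let mut visited = Vec::new();
        let mut current = Some(self.entry.clone());
        while let Some(name) = current {
            // An unknown node name ends the run instead of panicking.
            let Some(node) = self.nodes.get(&name) else { break };
            node(state);
            current = self.edges.get(&name).cloned();
            visited.push(name);
        }
        visited
    }
}

/// Two-node pipeline: "upper" uppercases the input, "wrap" adds brackets.
fn demo_graph() -> MiniGraph {
    let mut nodes: HashMap<String, Node> = HashMap::new();
    nodes.insert("upper".into(), Box::new(|s: &mut State| {
        let input = s.get("input").cloned().unwrap_or_default();
        s.insert("upper".into(), input.to_uppercase());
    }));
    nodes.insert("wrap".into(), Box::new(|s: &mut State| {
        let upper = s.get("upper").cloned().unwrap_or_default();
        s.insert("final".into(), format!("[{upper}]"));
    }));
    let edges = HashMap::from([("upper".to_string(), "wrap".to_string())]);
    MiniGraph { nodes, edges, entry: "upper".into() }
}

fn main() {
    let mut state = State::new();
    state.insert("input".into(), "hello".into());
    let order = demo_graph().run(&mut state);
    println!("{order:?} -> {}", state["final"]);
}
```

Each node communicates only through the state map, which is what lets the graph structure change without touching node code.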

---

## SharedState

`SharedState` is the key-value store that every node reads from and writes to; it also carries the structured message history:

```rust
use echo_agent::workflow::SharedState;

let state = SharedState::new();
state.set("input", "Hello");
state.set("count", 42);

let input: String = state.get("input").unwrap_or_default();
let count: i32 = state.get("count").unwrap_or(0);

// Access messages
for msg in state.messages() {
    println!("[{}] {}", msg.role, msg.content);
}
```
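
The example above calls `set` on a binding that is not declared `mut`, which suggests the store uses interior mutability. The following is a rough sketch of one way such a store could be built with `Arc<Mutex<...>>` and `std::any::Any`. `MiniState` is a hypothetical stand-in, not echo-agent's `SharedState`; in particular, this sketch only returns a value when the requested type exactly matches the stored type.

```rust
use std::any::Any;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for a shared, typed key-value store.
#[derive(Clone, Default)]
struct MiniState {
    inner: Arc<Mutex<HashMap<String, Box<dyn Any + Send>>>>,
}

impl MiniState {
    /// Store any sendable value. Takes `&self` because the mutex provides
    /// interior mutability.
    fn set<T: Any + Send>(&self, key: &str, value: T) {
        self.inner
            .lock()
            .unwrap()
            .insert(key.to_string(), Box::new(value));
    }

    /// Return a clone of the value if the key exists and holds a `T`.
    fn get<T: Any + Clone>(&self, key: &str) -> Option<T> {
        let map = self.inner.lock().unwrap();
        let value = map.get(key)?.downcast_ref::<T>()?;
        Some(value.clone())
    }
}

fn main() {
    let state = MiniState::default();
    // Clones share the same underlying map, so nodes can each hold a handle.
    let handle = state.clone();
    handle.set("input", String::from("Hello"));
    handle.set("count", 42_i32);

    let input: String = state.get("input").unwrap_or_default();
    let count: i32 = state.get("count").unwrap_or(0);
    // Asking for the wrong type yields `None` instead of panicking.
    let wrong: Option<String> = state.get("count");
    println!("{input} {count} {wrong:?}");
}
```

Returning `Option` from `get` is what makes the `unwrap_or_default()` and `unwrap_or(0)` fallbacks in the original example possible.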

---

## GraphBuilder DSL

```rust
use echo_agent::workflow::{GraphBuilder, SharedState, shared_agent};
// Note: `ReactAgentBuilder` must also be imported; its module path is not shown here.

// Create agents
let researcher = shared_agent(
    ReactAgentBuilder::simple("qwen3-max", "Researcher")?
);
let writer = shared_agent(
    ReactAgentBuilder::simple("qwen3-max", "Writer")?
);

let graph = GraphBuilder::new("research_pipeline")
    // Agent nodes with input/output key mapping
    .add_agent_node("researcher", researcher.clone())
        .input_key("task")
        .output_key("research")
    .add_agent_node("writer", writer.clone())
        .input_key("research")
        .output_key("result")
    // Function nodes for data transformation
    .add_function_node("format", |state| Box::pin(async move {
        let result: String = state.get("result").unwrap_or_default();
        state.set("final", format!("### Report\n\n{}", result));
        Ok(())
    }))
    // Graph structure
    .set_entry("researcher")
    .add_edge("researcher", "writer")
    .add_edge("writer", "format")
    .set_finish("format")
    .build()?;

// Execute
let state = SharedState::new();
state.set("task", "Research Rust async patterns");
let result = graph.run(state).await?;
println!("{}", result.state.get::<String>("final").unwrap_or_default());
```

---

## Conditional Edges

```rust
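use echo_agent::workflow::GraphBuilder;

// `analyzer`, `detailed_agent`, and `summary_agent` are shared agents,
// created with `shared_agent(...)` as in the GraphBuilder DSL example.
let graph = GraphBuilder::new("conditional_flow")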
    .add_agent_node("analyzer", analyzer)
    .add_agent_node("detailed", detailed_agent)
    .add_agent_node("summary", summary_agent)
    // Conditional edge based on state
    .add_conditional_edge("analyzer", |state| {
        let complexity: f64 = state.get("complexity").unwrap_or(0.5);
        if complexity > 0.7 { "detailed" } else { "summary" }
    })
    .add_edge("detailed", "__end__")
    .add_edge("summary", "__end__")
    .set_entry("analyzer")
    .set_finish("__end__")
    .build()?;
```
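
A loop is a conditional edge whose router can point back at an earlier node. The sketch below shows that routing logic on its own, using only the standard library, with a step cap so that a router that never returns the end marker cannot spin forever. `route`, `run_loop`, `max_steps`, and the `quality` key are illustrative assumptions, not echo-agent API.

```rust
use std::collections::HashMap;

/// Marker for termination; mirrors the `"__end__"` target used above.
const END: &str = "__end__";

/// Router: inspects the state and returns the name of the next node.
fn route(state: &HashMap<String, f64>) -> &'static str {
    let quality = state.get("quality").copied().unwrap_or(0.0);
    if quality >= 0.8 { END } else { "revise" }
}

/// Run the "revise" node repeatedly until the router selects END or
/// `max_steps` revisions have happened. Returns the number of revisions.
fn run_loop(state: &mut HashMap<String, f64>, max_steps: usize) -> usize {
    let mut steps = 0;
    while steps < max_steps && route(state) == "revise" {
        // The "revise" node: each pass raises quality by 0.25.
        *state.entry("quality".to_string()).or_insert(0.0) += 0.25;
        steps += 1;
    }
    steps
}

fn main() {
    let mut state = HashMap::new();
    let revisions = run_loop(&mut state, 10);
    println!("revisions: {revisions}, next: {}", route(&state));
}
```

Whatever the framework's own safeguards are, keeping an explicit iteration counter in the state is a cheap way to guarantee that a looping graph terminates.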

---

## Parallel Fan-out/Fan-in

```rust
use echo_agent::workflow::GraphBuilder;

let graph = GraphBuilder::new("parallel_processing")
    .add_function_node("input", |state| Box::pin(async move {
        state.set("data", vec!["a", "b", "c"]);
        Ok(())
    }))
    .add_function_node("process_a", |state| Box::pin(async move {
        let data: Vec<String> = state.get("data").unwrap_or_default();
        state.set("result_a", format!("A: {:?}", data));
        Ok(())
    }))
    .add_function_node("process_b", |state| Box::pin(async move {
        let data: Vec<String> = state.get("data").unwrap_or_default();
        state.set("result_b", format!("B: {:?}", data));
        Ok(())
    }))
    .add_function_node("merge", |state| Box::pin(async move {
        let a: String = state.get("result_a").unwrap_or_default();
        let b: String = state.get("result_b").unwrap_or_default();
        state.set("final", format!("{}\n{}", a, b));
        Ok(())
    }))
    // Fan-out: input → process_a, process_b (parallel)
    .set_entry("input")
    .add_edge("input", "process_a")
    .add_edge("input", "process_b")
    // Fan-in: process_a, process_b → merge
    .add_edge("process_a", "merge")
    .add_edge("process_b", "merge")
    .set_finish("merge")
    .build()?;
```
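
The key property of fan-in is that the merge step must not start until every branch has written its result. The sketch below illustrates that barrier with `std::thread::scope`; it is a standard-library analogy only, and how echo-agent actually schedules parallel branches is not specified here. `fan_out_fan_in` is a hypothetical helper.

```rust
use std::collections::HashMap;
use std::sync::Mutex;
use std::thread;

/// Fan out over two branches that read the same input, then fan in with a
/// merge step that runs only after both branches have finished.
fn fan_out_fan_in(data: &[&str]) -> String {
    let state: Mutex<HashMap<&str, String>> = Mutex::new(HashMap::new());

    // Fan-out: scoped threads may borrow `data` and `state` directly.
    thread::scope(|scope| {
        scope.spawn(|| {
            let result = format!("A: {}", data.join(","));
            state.lock().unwrap().insert("result_a", result);
        });
        scope.spawn(|| {
            let result = format!("B: {}", data.len());
            state.lock().unwrap().insert("result_b", result);
        });
    }); // The scope joins both threads here; this is the fan-in barrier.

    // Merge: both keys exist regardless of which branch finished first.
    let state = state.into_inner().unwrap();
    format!("{}\n{}", state["result_a"], state["result_b"])
}

fn main() {
    println!("{}", fan_out_fan_in(&["a", "b", "c"]));
}
```

Because each branch writes to its own key (`result_a`, `result_b`), the branches never overwrite each other, which is the same convention the graph example above follows.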

---

## Streaming Events

```rust
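// Assumes `graph` and `state` from the GraphBuilder example, `WorkflowEvent` in scope,
// and a `StreamExt` trait (for example `futures::StreamExt`) imported for `.next()`.
let mut stream = graph.run_stream(state).await?;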

while let Some(event) = stream.next().await {
    match event? {
        WorkflowEvent::NodeStart { node_name, step_index } => {
            println!("▶ Starting node: {} (step {})", node_name, step_index);
        }
        WorkflowEvent::NodeEnd { node_name, step_index, elapsed } => {
            println!("✓ Completed: {} in {:?}", node_name, elapsed);
        }
        WorkflowEvent::Token { node_name, token } => {
            print!("{}", token);  // Stream tokens from agent nodes
        }
        WorkflowEvent::NodeError { node_name, error } => {
            eprintln!("✗ Error in {}: {}", node_name, error);
        }
        WorkflowEvent::Completed { result, total_steps, elapsed } => {
            println!("\n=== Workflow completed in {:?} ===", elapsed);
            println!("Result: {}", result);
        }
    }
}
```
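
Conceptually, an event stream is a producer/consumer channel: the executor emits events as nodes start and finish, and the caller drains them as they arrive. The following sketch shows that shape with `std::sync::mpsc` and a worker thread. `MiniEvent` and `run_stream` here are simplified, hypothetical stand-ins; the real `WorkflowEvent` has more variants and fields, and the real stream is asynchronous.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical, simplified event type.
#[derive(Debug, PartialEq)]
enum MiniEvent {
    NodeStart(String),
    NodeEnd(String),
    Completed { total_steps: usize },
}

/// Run the named nodes on a worker thread and return the receiving end of
/// the event channel.
fn run_stream(nodes: Vec<String>) -> mpsc::Receiver<MiniEvent> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        let total_steps = nodes.len();
        for name in nodes {
            // A send error only means the consumer hung up, so it is ignored.
            let _ = tx.send(MiniEvent::NodeStart(name.clone()));
            // ... the node's actual work would happen here ...
            let _ = tx.send(MiniEvent::NodeEnd(name));
        }
        let _ = tx.send(MiniEvent::Completed { total_steps });
        // Dropping `tx` closes the channel and ends the consumer's loop.
    });
    rx
}

fn main() {
    let events = run_stream(vec!["researcher".into(), "writer".into()]);
    for event in events {
        println!("{event:?}");
    }
}
```

The consumer loop ends on its own once the producer is done, which matches the `while let Some(event)` pattern in the example above.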

---

## Declarative YAML Workflow

Define workflows without writing Rust code:

```yaml
# workflow.yaml
name: content_pipeline
nodes:
  - name: researcher
    type: agent
    model: qwen3-max
    system_prompt: "You are a research assistant"
    input_key: task
    output_key: research
    
  - name: writer
    type: agent
    model: qwen3-max
    system_prompt: "You are a writer"
    input_key: research
    output_key: draft
    
  - name: reviewer
    type: agent
    model: qwen3-max
    system_prompt: "You are an editor"
    input_key: draft
    output_key: final

edges:
  - from: researcher
    to: writer
  - from: writer
    to: reviewer

entry: researcher
finish: [reviewer]
```

```rust
use echo_agent::workflow::{Graph, SharedState};

let graph = Graph::from_yaml("workflow.yaml")?;
let state = SharedState::new();
state.set("task", "Write about Rust ownership");
let result = graph.run(state).await?;
```

---

## Workflow Types

| Type | Description | Use Case |
|------|-------------|----------|
| `Graph` | LangGraph-style with conditional edges, loops | Complex multi-agent orchestration |
| `SequentialWorkflow` | Simple pipeline, step N output → step N+1 input | ETL pipelines |
| `ConcurrentWorkflow` | All agents run in parallel, results merged | Parallel analysis |
| `DagWorkflow` | Topological scheduling; independent nodes run in parallel | DAG tasks |

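"Topological scheduling" for a DAG can be pictured as grouping nodes into waves, where every node in a wave has all of its dependencies in earlier waves and could therefore run in parallel with its wave-mates. Below is a small standard-library sketch of that grouping (a layered variant of Kahn's algorithm). `schedule_waves` is a hypothetical helper and makes no claim about how `DagWorkflow` is implemented.

```rust
use std::collections::{BTreeMap, BTreeSet};

/// Group DAG nodes into waves of mutually independent nodes.
/// Returns `None` if an edge names an unknown node or the edges form a cycle.
fn schedule_waves(nodes: &[&str], edges: &[(&str, &str)]) -> Option<Vec<Vec<String>>> {
    // Number of unfinished dependencies for each node.
    let mut pending: BTreeMap<&str, usize> = nodes.iter().map(|&n| (n, 0)).collect();
    for &(_, to) in edges {
        *pending.get_mut(to)? += 1;
    }

    let mut done: BTreeSet<&str> = BTreeSet::new();
    let mut waves: Vec<Vec<String>> = Vec::new();
    while done.len() < pending.len() {
        // Every unscheduled node with no unfinished dependency is ready now.
        let wave: Vec<&str> = pending
            .iter()
            .filter(|&(name, &count)| count == 0 && !done.contains(name))
            .map(|(&name, _)| name)
            .collect();
        if wave.is_empty() {
            return None; // the remaining nodes wait on each other
        }
        for &name in &wave {
            done.insert(name);
            // Finishing `name` satisfies one dependency of each successor.
            for &(from, to) in edges {
                if from == name {
                    *pending.get_mut(to)? -= 1;
                }
            }
        }
        waves.push(wave.into_iter().map(String::from).collect());
    }
    Some(waves)
}

fn main() {
    let nodes = ["fetch", "parse", "lint", "report"];
    let edges = [
        ("fetch", "parse"),
        ("fetch", "lint"),
        ("parse", "report"),
        ("lint", "report"),
    ];
    println!("{:?}", schedule_waves(&nodes, &edges));
}
```

In this example `parse` and `lint` land in the same wave because both depend only on `fetch`.
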
### SequentialWorkflow

```rust
use echo_agent::workflow::{SequentialWorkflow, SequentialWorkflowBuilder, WorkflowStep};

let workflow = SequentialWorkflowBuilder::new()
    .add_step(WorkflowStep::agent("step1", agent1))
    .add_step(WorkflowStep::agent("step2", agent2))
    .add_step(WorkflowStep::function("format", |input| Box::pin(async move {
        Ok(format!("Result: {}", input))
    })))
    .build()?;

let result = workflow.run("Initial input").await?;
```
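
The "step N output becomes step N+1 input" rule is essentially a fold over the list of steps that stops at the first error. A minimal sketch of that idea with plain functions follows; `Step`, `run_sequential`, and the three step functions are hypothetical and synchronous, unlike the async agent steps above.

```rust
/// A step consumes the previous output and produces the next input.
type Step = fn(String) -> Result<String, String>;

/// Thread the input through every step in order, stopping at the first error.
fn run_sequential(steps: &[Step], input: &str) -> Result<String, String> {
    steps
        .iter()
        .try_fold(input.to_string(), |acc, step| step(acc))
}

fn trim(s: String) -> Result<String, String> {
    Ok(s.trim().to_string())
}

fn reject_empty(s: String) -> Result<String, String> {
    if s.is_empty() {
        Err("empty input".to_string())
    } else {
        Ok(s)
    }
}

fn label(s: String) -> Result<String, String> {
    Ok(format!("Result: {s}"))
}

fn main() {
    let steps: [Step; 3] = [trim, reject_empty, label];
    println!("{:?}", run_sequential(&steps, "  Initial input "));
    // A blank input fails in `reject_empty`, so `label` never runs.
    println!("{:?}", run_sequential(&steps, "   "));
}
```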

### ConcurrentWorkflow

```rust
use echo_agent::workflow::{ConcurrentWorkflow, ConcurrentWorkflowBuilder};

let workflow = ConcurrentWorkflowBuilder::new()
    .add_agent("analyzer1", agent1)
    .add_agent("analyzer2", agent2)
    .add_agent("analyzer3", agent3)
    .build()?;

let result = workflow.run("Analyze this data").await?;
// All agents run in parallel, results merged
```

---

## Checkpoint & Resume

```rust
use echo_agent::workflow::{CheckpointStore, GraphBuilder, MemoryCheckpointStore};

let checkpoint_store = MemoryCheckpointStore::new();

// Run with checkpointing
let graph = GraphBuilder::new("long_running")
    // ... node definitions
    .build()?;

let result = graph.run_with_checkpoints(state.clone(), &checkpoint_store).await?;

// Resume from checkpoint if interrupted
let checkpoint_id = checkpoint_store.latest()?.id;
let resumed = graph.resume_from_checkpoint(&checkpoint_store, &checkpoint_id).await?;
```
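
The mechanics behind checkpoint and resume can be sketched without the framework: after each successful step, save a snapshot of the state together with the index of the next step; on resume, reload the latest snapshot and skip the steps that already ran. The types below (`Checkpoint`, `MiniCheckpointStore`, `run_from`) are hypothetical and only illustrate the idea; they are not echo-agent's `CheckpointStore` API.

```rust
use std::collections::HashMap;

type State = HashMap<String, String>;
type Step = fn(&mut State) -> Result<(), String>;

/// Snapshot taken after a step finishes.
#[derive(Clone, Debug, PartialEq)]
struct Checkpoint {
    /// Index of the first step that has NOT run yet.
    next_step: usize,
    state: State,
}

/// Hypothetical in-memory store that keeps every checkpoint in order.
#[derive(Default)]
struct MiniCheckpointStore {
    saved: Vec<Checkpoint>,
}

impl MiniCheckpointStore {
    fn save(&mut self, checkpoint: Checkpoint) {
        self.saved.push(checkpoint);
    }
    fn latest(&self) -> Option<&Checkpoint> {
        self.saved.last()
    }
}

/// Run `steps` starting at index `start`, checkpointing after each success.
fn run_from(
    steps: &[Step],
    start: usize,
    mut state: State,
    store: &mut MiniCheckpointStore,
) -> Result<State, String> {
    for (index, step) in steps.iter().enumerate().skip(start) {
        step(&mut state)?;
        store.save(Checkpoint { next_step: index + 1, state: state.clone() });
    }
    Ok(state)
}

fn draft(state: &mut State) -> Result<(), String> {
    state.insert("draft".into(), "v1".into());
    Ok(())
}

fn publish(state: &mut State) -> Result<(), String> {
    if !state.contains_key("approved") {
        return Err("not approved yet".into());
    }
    let draft = state.get("draft").cloned().unwrap_or_default();
    state.insert("published".into(), draft);
    Ok(())
}

fn main() {
    let steps: [Step; 2] = [draft, publish];
    let mut store = MiniCheckpointStore::default();

    // First run: `draft` succeeds and is checkpointed, then `publish` fails.
    let first = run_from(&steps, 0, State::new(), &mut store);
    println!("first run: {first:?}");

    // Resume: reload the latest snapshot, fix the cause, skip finished steps.
    let checkpoint = store.latest().cloned().expect("draft was checkpointed");
    let mut state = checkpoint.state;
    state.insert("approved".into(), "yes".into());
    let resumed = run_from(&steps, checkpoint.next_step, state, &mut store);
    println!("resumed: {resumed:?}");
}
```

Saving the snapshot only after a step succeeds means a failed step is always retried in full on resume, so steps with external side effects should be safe to run more than once.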

---

## Best Practices

1. **Keep nodes focused**: Each node should do one thing well
2. **Use SharedState keys consistently**: Document each node's input and output keys
3. **Handle errors in function nodes**: Return `Err(...)` to stop the workflow
4. **Use streaming for long-running workflows**: Provide real-time feedback
5. **Design for resumability**: Use checkpoints for critical workflows

---

## When to Use

| Scenario | Suitability | Reason |
|----------|-------------|--------|
| Multi-agent pipelines | ★★★★★ | Natural fit |
| Conditional flows | ★★★★★ | Conditional edges |
| Parallel processing | ★★★★★ | Fan-out/fan-in |
| Simple single-agent | ★☆☆☆☆ | Overkill |

See: `examples/demo39_workflow.rs`, `examples/demo34_workflow_stream.rs`, `examples/demo37_declarative_workflow.rs`