echo_orchestration/workflow/mod.rs
1//! Graph workflow engine + general Workflow / Pipeline
2//!
3//! Provides two orchestration capabilities:
4//!
5//! ## 1. Graph Workflow (analogous to LangGraph)
6//!
7//! Models agent execution as a **directed graph + shared state**, supporting:
8//! - Linear pipelines, conditional branching, cycles, parallel fan-out/fan-in
9//!
10//! | Concept | Type | Description |
11//! |------|------|------|
12//! | State | [`SharedState`] | KV store shared between nodes + structured message history |
13//! | Graph | [`Graph`] | Compiled immutable workflow |
14//! | Builder | [`GraphBuilder`] | Chain API for building graphs |
15//!
16//! ## 2. Pipeline Workflow (Sequential / Concurrent / DAG)
17//!
18//! | Type | Description |
19//! |------|------|
20//! | [`SequentialWorkflow`] | Sequential pipeline: previous step output becomes next step input |
21//! | [`ConcurrentWorkflow`] | Concurrent pipeline: all agents execute in parallel, then merge results |
22//! | [`DagWorkflow`] | DAG pipeline: topological execution, independent nodes run concurrently |
23
24// ── Graph Workflow ────────────────────────────────────────────────────────────
25
26pub mod checkpoint_store;
27mod graph;
28mod node;
29pub mod state;
30
31pub use checkpoint_store::{
32 Checkpoint, CheckpointInfo, CheckpointStore, FileCheckpointStore, InterruptType,
33 MemoryCheckpointStore,
34};
35
36pub use graph::{
37 Graph, GraphBuilder, GraphResult, InterruptConfig, InterruptState, RunUntilInterruptResult,
38};
39pub use state::SharedState;
40
41// ── Pipeline Workflow ─────────────────────────────────────────────────────────
42
43mod concurrent;
44mod dag;
45mod sequential;
46
47pub use concurrent::{ConcurrentWorkflow, ConcurrentWorkflowBuilder};
48pub use dag::{DagEdge, DagNode, DagWorkflow, DagWorkflowBuilder};
49pub use sequential::{SequentialWorkflow, SequentialWorkflowBuilder, WorkflowStep};
50
51use echo_core::agent::Agent;
52use echo_core::error::Result;
53use futures::future::BoxFuture;
54use futures::stream::BoxStream;
55use std::sync::Arc;
56use std::time::Duration;
57use tokio::sync::Mutex as AsyncMutex;
58
59/// Shareable agent handle for safe access across async tasks
60pub type SharedAgent = Arc<AsyncMutex<Box<dyn Agent>>>;
61
62/// Wrap arbitrary `impl Agent` as a [`SharedAgent`]
63pub fn shared_agent(agent: impl Agent + 'static) -> SharedAgent {
64 Arc::new(AsyncMutex::new(Box::new(agent)))
65}
66
/// Incremental events reported while a workflow is executing.
///
/// A `BoxStream<WorkflowEvent>` is obtained through [`Workflow::run_stream`];
/// it is intended for realtime UI updates, progress bars, logging, etc.
#[derive(Clone, Debug)]
pub enum WorkflowEvent {
    /// A node has started executing.
    NodeStart {
        /// Name of the node that started.
        node_name: String,
        /// Index of the step this node belongs to.
        step_index: usize,
    },
    /// A node has finished executing.
    NodeEnd {
        /// Name of the node that finished.
        node_name: String,
        /// Index of the step this node belongs to.
        step_index: usize,
        /// Elapsed time reported for the node.
        elapsed: Duration,
    },
    /// A token produced by a node (forwarded while an agent streams output).
    Token {
        /// Name of the node that produced the token.
        node_name: String,
        /// The token text.
        token: String,
    },
    /// A node reported an error (non-fatal: the error is recorded but the
    /// stream continues).
    NodeError {
        /// Name of the node that failed.
        node_name: String,
        /// Error description as text.
        error: String,
    },
    /// The workflow has finished executing.
    Completed {
        /// Final result text.
        result: String,
        /// Number of steps that were executed.
        total_steps: usize,
        /// Total elapsed time of the run.
        elapsed: Duration,
    },
}
95
96/// Unified workflow execution interface
97pub trait Workflow: Send + Sync {
98 /// Run the entire workflow with `input` as the initial input
99 fn run<'a>(&'a mut self, input: &'a str) -> BoxFuture<'a, Result<WorkflowOutput>>;
100
101 /// Run the entire workflow with `input` as initial input (streaming per-node events).
102 ///
103 /// Default implementation falls back to `run()` and only emits the `Completed` event.
104 fn run_stream<'a>(
105 &'a mut self,
106 input: &'a str,
107 ) -> BoxFuture<'a, Result<BoxStream<'a, Result<WorkflowEvent>>>> {
108 Box::pin(async move {
109 let output = self.run(input).await?;
110 let event = WorkflowEvent::Completed {
111 result: output.result,
112 total_steps: output.steps.len(),
113 elapsed: output.elapsed,
114 };
115 let stream: BoxStream<'a, Result<WorkflowEvent>> =
116 Box::pin(futures::stream::once(async { Ok(event) }));
117 Ok(stream)
118 })
119 }
120}
121
122/// Complete output of a workflow execution
123#[derive(Debug, Clone)]
124pub struct WorkflowOutput {
125 /// Final result text
126 pub result: String,
127 /// Detailed output for each step
128 pub steps: Vec<StepOutput>,
129 /// Total elapsed time
130 pub elapsed: Duration,
131}
132
/// Record of a single executed step.
#[derive(Clone, Debug)]
pub struct StepOutput {
    /// Name of the agent that ran the step.
    pub agent_name: String,
    /// Input the step received.
    pub input: String,
    /// Output the step produced.
    pub output: String,
    /// Time the step took.
    pub elapsed: Duration,
}