Skip to main content

echo_orchestration/workflow/
mod.rs

1//! Graph workflow engine + general Workflow / Pipeline
2//!
3//! Provides two orchestration capabilities:
4//!
5//! ## 1. Graph Workflow (analogous to LangGraph)
6//!
7//! Models agent execution as a **directed graph + shared state**, supporting:
8//! - Linear pipelines, conditional branching, cycles, parallel fan-out/fan-in
9//!
10//! | Concept | Type | Description |
11//! |------|------|------|
12//! | State | [`SharedState`] | KV store shared between nodes + structured message history |
13//! | Graph | [`Graph`] | Compiled immutable workflow |
14//! | Builder | [`GraphBuilder`] | Chain API for building graphs |
15//!
16//! ## 2. Pipeline Workflow (Sequential / Concurrent / DAG)
17//!
18//! | Type | Description |
19//! |------|------|
20//! | [`SequentialWorkflow`] | Sequential pipeline: previous step output becomes next step input |
21//! | [`ConcurrentWorkflow`] | Concurrent pipeline: all agents execute in parallel, then merge results |
22//! | [`DagWorkflow`] | DAG pipeline: topological execution, independent nodes run concurrently |
23
24// ── Graph Workflow ────────────────────────────────────────────────────────────
25
26pub mod checkpoint_store;
27mod graph;
28mod node;
29pub mod state;
30
31pub use checkpoint_store::{
32    Checkpoint, CheckpointInfo, CheckpointStore, FileCheckpointStore, InterruptType,
33    MemoryCheckpointStore,
34};
35
36pub use graph::{
37    Graph, GraphBuilder, GraphResult, InterruptConfig, InterruptState, RunUntilInterruptResult,
38};
39pub use state::SharedState;
40
41// ── Pipeline Workflow ─────────────────────────────────────────────────────────
42
43mod concurrent;
44mod dag;
45mod sequential;
46
47pub use concurrent::{ConcurrentWorkflow, ConcurrentWorkflowBuilder};
48pub use dag::{DagEdge, DagNode, DagWorkflow, DagWorkflowBuilder};
49pub use sequential::{SequentialWorkflow, SequentialWorkflowBuilder, WorkflowStep};
50
51use echo_core::agent::Agent;
52use echo_core::error::Result;
53use futures::future::BoxFuture;
54use futures::stream::BoxStream;
55use std::sync::Arc;
56use std::time::Duration;
57use tokio::sync::Mutex as AsyncMutex;
58
/// Shareable agent handle for safe access across async tasks.
///
/// The agent is stored as a boxed [`Agent`] trait object, guarded by Tokio's
/// asynchronous mutex (`tokio::sync::Mutex`) and wrapped in an [`Arc`], so every
/// clone of the handle refers to the same underlying agent.
pub type SharedAgent = Arc<AsyncMutex<Box<dyn Agent>>>;
61
62/// Wrap arbitrary `impl Agent` as a [`SharedAgent`]
63pub fn shared_agent(agent: impl Agent + 'static) -> SharedAgent {
64    Arc::new(AsyncMutex::new(Box::new(agent)))
65}
66
/// Step-by-step events emitted during workflow execution.
///
/// Obtain a `BoxStream<WorkflowEvent>` via [`Workflow::run_stream`]
/// for realtime UI updates, progress bars, logging, etc.
///
/// Events compare by value (`PartialEq` / `Eq`), so consumers and tests can
/// check an emitted event against an expected one with `assert_eq!`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WorkflowEvent {
    /// Node started execution
    NodeStart {
        /// Name of the node that started
        node_name: String,
        /// Index of the step this node belongs to
        step_index: usize,
    },
    /// Node finished execution
    NodeEnd {
        /// Name of the node that finished
        node_name: String,
        /// Index of the step this node belongs to
        step_index: usize,
        /// Time the node took to execute
        elapsed: Duration,
    },
    /// Token produced by node (forwarded during streaming agent output)
    Token {
        /// Name of the node that produced the token
        node_name: String,
        /// The token text
        token: String,
    },
    /// Node execution error (non-fatal; error is recorded but stream continues)
    NodeError {
        /// Name of the node that failed
        node_name: String,
        /// Error description
        error: String,
    },
    /// Workflow execution completed
    Completed {
        /// Final result text of the workflow
        result: String,
        /// Number of steps reported for the run
        total_steps: usize,
        /// Total elapsed time of the run
        elapsed: Duration,
    },
}
95
96/// Unified workflow execution interface
97pub trait Workflow: Send + Sync {
98    /// Run the entire workflow with `input` as the initial input
99    fn run<'a>(&'a mut self, input: &'a str) -> BoxFuture<'a, Result<WorkflowOutput>>;
100
101    /// Run the entire workflow with `input` as initial input (streaming per-node events).
102    ///
103    /// Default implementation falls back to `run()` and only emits the `Completed` event.
104    fn run_stream<'a>(
105        &'a mut self,
106        input: &'a str,
107    ) -> BoxFuture<'a, Result<BoxStream<'a, Result<WorkflowEvent>>>> {
108        Box::pin(async move {
109            let output = self.run(input).await?;
110            let event = WorkflowEvent::Completed {
111                result: output.result,
112                total_steps: output.steps.len(),
113                elapsed: output.elapsed,
114            };
115            let stream: BoxStream<'a, Result<WorkflowEvent>> =
116                Box::pin(futures::stream::once(async { Ok(event) }));
117            Ok(stream)
118        })
119    }
120}
121
/// Complete output of a workflow execution.
///
/// Compares by value and has an empty [`Default`] (empty result, no steps,
/// zero elapsed time), which makes it easy to build and check in tests.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct WorkflowOutput {
    /// Final result text
    pub result: String,
    /// Detailed output for each step
    pub steps: Vec<StepOutput>,
    /// Total elapsed time
    pub elapsed: Duration,
}

/// Detailed output of a single step execution.
///
/// Compares by value and has an empty [`Default`] (empty strings, zero
/// elapsed time).
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct StepOutput {
    /// Name of the agent that executed this step
    pub agent_name: String,
    /// Input received by this step
    pub input: String,
    /// Output produced by this step
    pub output: String,
    /// Step elapsed time
    pub elapsed: Duration,
}