Skip to main content

openclaw_agents/workflow/
mod.rs

1//! Workflow engine (m9m pattern).
2//!
3//! Execute agent logic as a graph of workflow nodes.
4
5use std::collections::HashMap;
6use std::sync::Arc;
7
8use async_trait::async_trait;
9use serde::{Deserialize, Serialize};
10use thiserror::Error;
11
12/// Workflow execution errors.
13#[derive(Error, Debug)]
14pub enum WorkflowError {
15    /// Node not found.
16    #[error("Node not found: {0}")]
17    NodeNotFound(String),
18
19    /// Branch not found.
20    #[error("Branch not found: {0}")]
21    BranchNotFound(String),
22
23    /// Node execution failed.
24    #[error("Node execution failed: {0}")]
25    ExecutionFailed(String),
26
27    /// Invalid workflow configuration.
28    #[error("Invalid workflow: {0}")]
29    InvalidWorkflow(String),
30
31    /// Cycle detected in workflow.
32    #[error("Cycle detected at node: {0}")]
33    CycleDetected(String),
34}
35
36/// Node execution context.
37pub struct NodeContext {
38    /// Input data.
39    pub input: serde_json::Value,
40    /// Node configuration.
41    pub config: serde_json::Value,
42    /// Shared workflow state.
43    pub state: HashMap<String, serde_json::Value>,
44}
45
46/// Result of node execution.
47pub struct NodeOutput {
48    /// Output data.
49    pub data: serde_json::Value,
50    /// Next node ID (explicit routing).
51    pub next: Option<String>,
52    /// Branch name (conditional routing).
53    pub branch: Option<String>,
54}
55
56impl NodeOutput {
57    /// Create output that continues to next node.
58    #[must_use]
59    pub const fn continue_with(data: serde_json::Value) -> Self {
60        Self {
61            data,
62            next: None,
63            branch: None,
64        }
65    }
66
67    /// Create output that goes to specific node.
68    #[must_use]
69    pub fn goto(data: serde_json::Value, node_id: impl Into<String>) -> Self {
70        Self {
71            data,
72            next: Some(node_id.into()),
73            branch: None,
74        }
75    }
76
77    /// Create output that takes a branch.
78    #[must_use]
79    pub fn branch(data: serde_json::Value, branch_name: impl Into<String>) -> Self {
80        Self {
81            data,
82            next: None,
83            branch: Some(branch_name.into()),
84        }
85    }
86
87    /// Create output that ends the workflow.
88    #[must_use]
89    pub fn end(data: serde_json::Value) -> Self {
90        Self {
91            data,
92            next: Some("__end__".to_string()),
93            branch: None,
94        }
95    }
96}
97
98/// Workflow node trait.
99#[async_trait]
100pub trait WorkflowNode: Send + Sync {
101    /// Node identifier.
102    fn id(&self) -> &str;
103
104    /// Node type name.
105    fn node_type(&self) -> &str;
106
107    /// Execute the node.
108    async fn execute(&self, ctx: NodeContext) -> Result<NodeOutput, WorkflowError>;
109
110    /// Input schema (optional).
111    fn input_schema(&self) -> Option<&serde_json::Value> {
112        None
113    }
114
115    /// Output schema (optional).
116    fn output_schema(&self) -> Option<&serde_json::Value> {
117        None
118    }
119}
120
121/// Edge connecting two nodes.
122#[derive(Debug, Clone, Serialize, Deserialize)]
123pub struct WorkflowEdge {
124    /// Source node ID.
125    pub from: String,
126    /// Target node ID.
127    pub to: String,
128    /// Condition for this edge (branch name).
129    pub condition: Option<String>,
130}
131
132/// Workflow definition.
133pub struct Workflow {
134    /// Workflow ID.
135    pub id: String,
136    /// Workflow name.
137    pub name: String,
138    /// Nodes in the workflow.
139    pub nodes: Vec<Arc<dyn WorkflowNode>>,
140    /// Edges connecting nodes.
141    pub edges: Vec<WorkflowEdge>,
142    /// Starting node ID.
143    pub start_node: String,
144}
145
146impl Workflow {
147    /// Create a new workflow.
148    #[must_use]
149    pub fn new(
150        id: impl Into<String>,
151        name: impl Into<String>,
152        start_node: impl Into<String>,
153    ) -> Self {
154        Self {
155            id: id.into(),
156            name: name.into(),
157            nodes: Vec::new(),
158            edges: Vec::new(),
159            start_node: start_node.into(),
160        }
161    }
162
163    /// Add a node.
164    pub fn add_node(&mut self, node: Arc<dyn WorkflowNode>) {
165        self.nodes.push(node);
166    }
167
168    /// Add an edge.
169    pub fn add_edge(&mut self, from: impl Into<String>, to: impl Into<String>) {
170        self.edges.push(WorkflowEdge {
171            from: from.into(),
172            to: to.into(),
173            condition: None,
174        });
175    }
176
177    /// Add a conditional edge.
178    pub fn add_conditional_edge(
179        &mut self,
180        from: impl Into<String>,
181        to: impl Into<String>,
182        condition: impl Into<String>,
183    ) {
184        self.edges.push(WorkflowEdge {
185            from: from.into(),
186            to: to.into(),
187            condition: Some(condition.into()),
188        });
189    }
190
191    /// Find node by ID.
192    #[must_use]
193    pub fn find_node(&self, id: &str) -> Option<&Arc<dyn WorkflowNode>> {
194        self.nodes.iter().find(|n| n.id() == id)
195    }
196
197    /// Find outgoing edges from a node.
198    #[must_use]
199    pub fn outgoing_edges(&self, node_id: &str) -> Vec<&WorkflowEdge> {
200        self.edges.iter().filter(|e| e.from == node_id).collect()
201    }
202}
203
204/// Workflow execution engine.
205pub struct WorkflowEngine {
206    max_iterations: usize,
207}
208
209impl WorkflowEngine {
210    /// Create a new workflow engine.
211    #[must_use]
212    pub const fn new() -> Self {
213        Self {
214            max_iterations: 1000,
215        }
216    }
217
218    /// Set maximum iterations (cycle protection).
219    #[must_use]
220    pub const fn with_max_iterations(mut self, max: usize) -> Self {
221        self.max_iterations = max;
222        self
223    }
224
225    /// Execute a workflow.
226    ///
227    /// # Errors
228    ///
229    /// Returns error if execution fails or cycle detected.
230    pub async fn execute(
231        &self,
232        workflow: &Workflow,
233        input: serde_json::Value,
234    ) -> Result<serde_json::Value, WorkflowError> {
235        let mut current_node_id = workflow.start_node.clone();
236        let mut data = input;
237        let state = HashMap::new();
238        let mut iterations = 0;
239
240        loop {
241            iterations += 1;
242            if iterations > self.max_iterations {
243                return Err(WorkflowError::CycleDetected(current_node_id));
244            }
245
246            // Find and execute current node
247            let node = workflow
248                .find_node(&current_node_id)
249                .ok_or_else(|| WorkflowError::NodeNotFound(current_node_id.clone()))?;
250
251            let ctx = NodeContext {
252                input: data.clone(),
253                config: serde_json::Value::Object(serde_json::Map::new()),
254                state: state.clone(),
255            };
256
257            let output = node.execute(ctx).await?;
258            data = output.data;
259
260            // Determine next node
261            let next_node = if let Some(explicit_next) = output.next {
262                if explicit_next == "__end__" {
263                    break;
264                }
265                Some(explicit_next)
266            } else if let Some(branch) = output.branch {
267                // Find edge matching the branch
268                workflow
269                    .outgoing_edges(&current_node_id)
270                    .iter()
271                    .find(|e| e.condition.as_ref() == Some(&branch))
272                    .map(|e| e.to.clone())
273            } else {
274                // Take first unconditional edge
275                workflow
276                    .outgoing_edges(&current_node_id)
277                    .iter()
278                    .find(|e| e.condition.is_none())
279                    .map(|e| e.to.clone())
280            };
281
282            match next_node {
283                Some(next) => current_node_id = next,
284                None => break, // No more nodes
285            }
286        }
287
288        Ok(data)
289    }
290}
291
292impl Default for WorkflowEngine {
293    fn default() -> Self {
294        Self::new()
295    }
296}
297
298/// Simple pass-through node for testing.
299pub struct PassthroughNode {
300    id: String,
301}
302
303impl PassthroughNode {
304    /// Create a new passthrough node.
305    #[must_use]
306    pub fn new(id: impl Into<String>) -> Self {
307        Self { id: id.into() }
308    }
309}
310
311#[async_trait]
312impl WorkflowNode for PassthroughNode {
313    fn id(&self) -> &str {
314        &self.id
315    }
316
317    fn node_type(&self) -> &'static str {
318        "passthrough"
319    }
320
321    async fn execute(&self, ctx: NodeContext) -> Result<NodeOutput, WorkflowError> {
322        Ok(NodeOutput::continue_with(ctx.input))
323    }
324}
325
326#[cfg(test)]
327mod tests {
328    use super::*;
329
330    #[tokio::test]
331    async fn test_simple_workflow() {
332        let mut workflow = Workflow::new("test", "Test Workflow", "node1");
333
334        workflow.add_node(Arc::new(PassthroughNode::new("node1")));
335        workflow.add_node(Arc::new(PassthroughNode::new("node2")));
336        workflow.add_edge("node1", "node2");
337
338        let engine = WorkflowEngine::new();
339        let result = engine
340            .execute(&workflow, serde_json::json!({"value": 42}))
341            .await;
342
343        assert!(result.is_ok());
344        assert_eq!(result.unwrap()["value"], 42);
345    }
346
347    #[tokio::test]
348    async fn test_workflow_node_not_found() {
349        let workflow = Workflow::new("test", "Test", "nonexistent");
350        let engine = WorkflowEngine::new();
351
352        let result = engine.execute(&workflow, serde_json::json!({})).await;
353        assert!(matches!(result, Err(WorkflowError::NodeNotFound(_))));
354    }
355}