openclaw_agents/workflow/
mod.rs1use std::collections::HashMap;
6use std::sync::Arc;
7
8use async_trait::async_trait;
9use serde::{Deserialize, Serialize};
10use thiserror::Error;
11
12#[derive(Error, Debug)]
14pub enum WorkflowError {
15 #[error("Node not found: {0}")]
17 NodeNotFound(String),
18
19 #[error("Branch not found: {0}")]
21 BranchNotFound(String),
22
23 #[error("Node execution failed: {0}")]
25 ExecutionFailed(String),
26
27 #[error("Invalid workflow: {0}")]
29 InvalidWorkflow(String),
30
31 #[error("Cycle detected at node: {0}")]
33 CycleDetected(String),
34}
35
36pub struct NodeContext {
38 pub input: serde_json::Value,
40 pub config: serde_json::Value,
42 pub state: HashMap<String, serde_json::Value>,
44}
45
46pub struct NodeOutput {
48 pub data: serde_json::Value,
50 pub next: Option<String>,
52 pub branch: Option<String>,
54}
55
56impl NodeOutput {
57 #[must_use]
59 pub const fn continue_with(data: serde_json::Value) -> Self {
60 Self {
61 data,
62 next: None,
63 branch: None,
64 }
65 }
66
67 #[must_use]
69 pub fn goto(data: serde_json::Value, node_id: impl Into<String>) -> Self {
70 Self {
71 data,
72 next: Some(node_id.into()),
73 branch: None,
74 }
75 }
76
77 #[must_use]
79 pub fn branch(data: serde_json::Value, branch_name: impl Into<String>) -> Self {
80 Self {
81 data,
82 next: None,
83 branch: Some(branch_name.into()),
84 }
85 }
86
87 #[must_use]
89 pub fn end(data: serde_json::Value) -> Self {
90 Self {
91 data,
92 next: Some("__end__".to_string()),
93 branch: None,
94 }
95 }
96}
97
98#[async_trait]
100pub trait WorkflowNode: Send + Sync {
101 fn id(&self) -> &str;
103
104 fn node_type(&self) -> &str;
106
107 async fn execute(&self, ctx: NodeContext) -> Result<NodeOutput, WorkflowError>;
109
110 fn input_schema(&self) -> Option<&serde_json::Value> {
112 None
113 }
114
115 fn output_schema(&self) -> Option<&serde_json::Value> {
117 None
118 }
119}
120
121#[derive(Debug, Clone, Serialize, Deserialize)]
123pub struct WorkflowEdge {
124 pub from: String,
126 pub to: String,
128 pub condition: Option<String>,
130}
131
132pub struct Workflow {
134 pub id: String,
136 pub name: String,
138 pub nodes: Vec<Arc<dyn WorkflowNode>>,
140 pub edges: Vec<WorkflowEdge>,
142 pub start_node: String,
144}
145
146impl Workflow {
147 #[must_use]
149 pub fn new(
150 id: impl Into<String>,
151 name: impl Into<String>,
152 start_node: impl Into<String>,
153 ) -> Self {
154 Self {
155 id: id.into(),
156 name: name.into(),
157 nodes: Vec::new(),
158 edges: Vec::new(),
159 start_node: start_node.into(),
160 }
161 }
162
163 pub fn add_node(&mut self, node: Arc<dyn WorkflowNode>) {
165 self.nodes.push(node);
166 }
167
168 pub fn add_edge(&mut self, from: impl Into<String>, to: impl Into<String>) {
170 self.edges.push(WorkflowEdge {
171 from: from.into(),
172 to: to.into(),
173 condition: None,
174 });
175 }
176
177 pub fn add_conditional_edge(
179 &mut self,
180 from: impl Into<String>,
181 to: impl Into<String>,
182 condition: impl Into<String>,
183 ) {
184 self.edges.push(WorkflowEdge {
185 from: from.into(),
186 to: to.into(),
187 condition: Some(condition.into()),
188 });
189 }
190
191 #[must_use]
193 pub fn find_node(&self, id: &str) -> Option<&Arc<dyn WorkflowNode>> {
194 self.nodes.iter().find(|n| n.id() == id)
195 }
196
197 #[must_use]
199 pub fn outgoing_edges(&self, node_id: &str) -> Vec<&WorkflowEdge> {
200 self.edges.iter().filter(|e| e.from == node_id).collect()
201 }
202}
203
/// Runs workflows node by node, bounded by a maximum number of steps.
///
/// Derives `Debug` and `Clone` so an engine configuration can be logged and
/// reused.
#[derive(Debug, Clone)]
pub struct WorkflowEngine {
    /// Upper bound on node executions per run; exceeding it aborts the run.
    max_iterations: usize,
}
208
209impl WorkflowEngine {
210 #[must_use]
212 pub const fn new() -> Self {
213 Self {
214 max_iterations: 1000,
215 }
216 }
217
218 #[must_use]
220 pub const fn with_max_iterations(mut self, max: usize) -> Self {
221 self.max_iterations = max;
222 self
223 }
224
225 pub async fn execute(
231 &self,
232 workflow: &Workflow,
233 input: serde_json::Value,
234 ) -> Result<serde_json::Value, WorkflowError> {
235 let mut current_node_id = workflow.start_node.clone();
236 let mut data = input;
237 let state = HashMap::new();
238 let mut iterations = 0;
239
240 loop {
241 iterations += 1;
242 if iterations > self.max_iterations {
243 return Err(WorkflowError::CycleDetected(current_node_id));
244 }
245
246 let node = workflow
248 .find_node(¤t_node_id)
249 .ok_or_else(|| WorkflowError::NodeNotFound(current_node_id.clone()))?;
250
251 let ctx = NodeContext {
252 input: data.clone(),
253 config: serde_json::Value::Object(serde_json::Map::new()),
254 state: state.clone(),
255 };
256
257 let output = node.execute(ctx).await?;
258 data = output.data;
259
260 let next_node = if let Some(explicit_next) = output.next {
262 if explicit_next == "__end__" {
263 break;
264 }
265 Some(explicit_next)
266 } else if let Some(branch) = output.branch {
267 workflow
269 .outgoing_edges(¤t_node_id)
270 .iter()
271 .find(|e| e.condition.as_ref() == Some(&branch))
272 .map(|e| e.to.clone())
273 } else {
274 workflow
276 .outgoing_edges(¤t_node_id)
277 .iter()
278 .find(|e| e.condition.is_none())
279 .map(|e| e.to.clone())
280 };
281
282 match next_node {
283 Some(next) => current_node_id = next,
284 None => break, }
286 }
287
288 Ok(data)
289 }
290}
291
292impl Default for WorkflowEngine {
293 fn default() -> Self {
294 Self::new()
295 }
296}
297
/// Node that hands its input back unchanged.
///
/// Derives `Debug` and `Clone` so instances can be logged and duplicated.
#[derive(Debug, Clone)]
pub struct PassthroughNode {
    /// Identifier reported through `WorkflowNode::id`.
    id: String,
}
302
303impl PassthroughNode {
304 #[must_use]
306 pub fn new(id: impl Into<String>) -> Self {
307 Self { id: id.into() }
308 }
309}
310
311#[async_trait]
312impl WorkflowNode for PassthroughNode {
313 fn id(&self) -> &str {
314 &self.id
315 }
316
317 fn node_type(&self) -> &'static str {
318 "passthrough"
319 }
320
321 async fn execute(&self, ctx: NodeContext) -> Result<NodeOutput, WorkflowError> {
322 Ok(NodeOutput::continue_with(ctx.input))
323 }
324}
325
#[cfg(test)]
mod tests {
    use super::*;

    /// Two passthrough nodes joined by a default edge return the input as is.
    #[tokio::test]
    async fn test_simple_workflow() {
        let mut wf = Workflow::new("test", "Test Workflow", "node1");

        let first: Arc<dyn WorkflowNode> = Arc::new(PassthroughNode::new("node1"));
        let second: Arc<dyn WorkflowNode> = Arc::new(PassthroughNode::new("node2"));
        wf.add_node(first);
        wf.add_node(second);
        wf.add_edge("node1", "node2");

        let payload = serde_json::json!({"value": 42});
        let outcome = WorkflowEngine::new().execute(&wf, payload).await;

        assert!(outcome.is_ok());
        let value = outcome.unwrap();
        assert_eq!(value["value"], 42);
    }

    /// A start node that was never added yields `NodeNotFound`.
    #[tokio::test]
    async fn test_workflow_node_not_found() {
        let wf = Workflow::new("test", "Test", "nonexistent");

        let outcome = WorkflowEngine::new()
            .execute(&wf, serde_json::json!({}))
            .await;

        assert!(matches!(outcome, Err(WorkflowError::NodeNotFound(_))));
    }
}