Skip to main content

bamboo_agent/server/workflow/
mod.rs

//! Workflow system for defining and executing agent workflows
//!
//! This module provides a workflow engine that allows users to define
//! complex agent behaviors using a declarative composition syntax.
//!
//! # Workflow Definition
//!
//! Workflows are defined in YAML or JSON files and support:
//! - Sequential step execution
//! - Parallel execution
//! - Conditional branching
//! - Tool composition
//!
//! # Example
//!
//! ```yaml
//! id: my-workflow
//! name: Example Workflow
//! description: Demonstrates workflow capabilities
//! version: "1.0"
//! composition:
//!   type: sequence
//!   steps:
//!     - type: tool
//!       name: read_file
//!     - type: tool
//!       name: analyze
//! ```
29
30mod loader;
31
32#[cfg(test)]
33mod tests;
34
35use crate::agent::core::composition::ToolExpr;
36use serde::{Deserialize, Serialize};
37use std::collections::HashMap;
38use std::fmt;
39use std::io;
40use std::path::{Path, PathBuf};
41use std::sync::RwLock;
42use std::time::SystemTime;
43
44#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
45pub struct WorkflowDefinition {
46    pub id: String,
47    pub name: String,
48    pub description: String,
49    pub version: String,
50    pub composition: ToolExpr,
51}
52
53impl WorkflowDefinition {
54    pub fn validate(&self) -> Result<(), String> {
55        validate_required("id", &self.id)?;
56        validate_required("name", &self.name)?;
57        validate_required("description", &self.description)?;
58        validate_required("version", &self.version)?;
59        validate_expr(&self.composition)
60    }
61}
62
/// Ensures a required text field is not blank.
///
/// `field` is the field name used in the error message; `value` is the text
/// to check. A value consisting only of whitespace counts as blank.
///
/// # Errors
///
/// Returns `"<field> cannot be empty"` when `value` is empty after trimming.
fn validate_required(field: &str, value: &str) -> Result<(), String> {
    match value.trim() {
        "" => Err(format!("{field} cannot be empty")),
        _ => Ok(()),
    }
}
70
71fn validate_expr(expr: &ToolExpr) -> Result<(), String> {
72    match expr {
73        ToolExpr::Call { tool, .. } => {
74            if tool.trim().is_empty() {
75                return Err("composition call tool cannot be empty".to_string());
76            }
77        }
78        ToolExpr::Sequence { steps, .. } => {
79            if steps.is_empty() {
80                return Err("composition sequence requires at least one step".to_string());
81            }
82
83            for step in steps {
84                validate_expr(step)?;
85            }
86        }
87        ToolExpr::Parallel { branches, .. } => {
88            if branches.is_empty() {
89                return Err("composition parallel requires at least one branch".to_string());
90            }
91
92            for branch in branches {
93                validate_expr(branch)?;
94            }
95        }
96        ToolExpr::Choice {
97            then_branch,
98            else_branch,
99            ..
100        } => {
101            validate_expr(then_branch)?;
102
103            if let Some(else_expr) = else_branch {
104                validate_expr(else_expr)?;
105            }
106        }
107        ToolExpr::Retry {
108            expr, max_attempts, ..
109        } => {
110            if *max_attempts == 0 {
111                return Err("composition retry max_attempts must be greater than zero".to_string());
112            }
113
114            validate_expr(expr)?;
115        }
116        ToolExpr::Let { var, expr, body } => {
117            if var.trim().is_empty() {
118                return Err("composition let variable cannot be empty".to_string());
119            }
120
121            validate_expr(expr)?;
122            validate_expr(body)?;
123        }
124        ToolExpr::Var(name) => {
125            if name.trim().is_empty() {
126                return Err("composition var cannot be empty".to_string());
127            }
128        }
129    }
130
131    Ok(())
132}
133
134#[derive(Debug)]
135pub enum WorkflowLoadError {
136    FileNotFound(PathBuf),
137    NotAFile(PathBuf),
138    NotADirectory(PathBuf),
139    Io {
140        path: PathBuf,
141        source: io::Error,
142    },
143    Parse {
144        path: PathBuf,
145        source: serde_yaml::Error,
146    },
147    InvalidWorkflow {
148        path: PathBuf,
149        message: String,
150    },
151}
152
153impl fmt::Display for WorkflowLoadError {
154    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
155        match self {
156            WorkflowLoadError::FileNotFound(path) => {
157                write!(f, "workflow file not found: {}", path.display())
158            }
159            WorkflowLoadError::NotAFile(path) => {
160                write!(f, "path is not a file: {}", path.display())
161            }
162            WorkflowLoadError::NotADirectory(path) => {
163                write!(f, "path is not a directory: {}", path.display())
164            }
165            WorkflowLoadError::Io { path, source } => {
166                write!(f, "I/O error for {}: {}", path.display(), source)
167            }
168            WorkflowLoadError::Parse { path, source } => {
169                write!(f, "failed to parse workflow {}: {}", path.display(), source)
170            }
171            WorkflowLoadError::InvalidWorkflow { path, message } => {
172                write!(f, "invalid workflow in {}: {}", path.display(), message)
173            }
174        }
175    }
176}
177
178impl std::error::Error for WorkflowLoadError {
179    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
180        match self {
181            WorkflowLoadError::Io { source, .. } => Some(source),
182            WorkflowLoadError::Parse { source, .. } => Some(source),
183            _ => None,
184        }
185    }
186}
187
188#[derive(Debug, Clone)]
189pub(crate) struct CachedWorkflow {
190    pub(crate) modified: Option<SystemTime>,
191    pub(crate) definition: WorkflowDefinition,
192}
193
194pub struct WorkflowLoader {
195    workflows_dir: PathBuf,
196    cache: RwLock<HashMap<PathBuf, CachedWorkflow>>,
197}
198
199impl WorkflowLoader {
200    pub fn new() -> Self {
201        let home = dirs::home_dir().unwrap_or_else(std::env::temp_dir);
202        Self {
203            workflows_dir: home.join(".bamboo").join("workflows"),
204            cache: RwLock::new(HashMap::new()),
205        }
206    }
207
208    pub fn with_dir(path: PathBuf) -> Self {
209        Self {
210            workflows_dir: path,
211            cache: RwLock::new(HashMap::new()),
212        }
213    }
214
215    pub fn load_from_file<P>(&self, path: P) -> Result<WorkflowDefinition, WorkflowLoadError>
216    where
217        P: AsRef<Path>,
218    {
219        loader::load_from_file(self, path.as_ref())
220    }
221
222    pub fn load_all_from_directory<P>(
223        &self,
224        dir: P,
225    ) -> Result<Vec<WorkflowDefinition>, WorkflowLoadError>
226    where
227        P: AsRef<Path>,
228    {
229        loader::load_all_from_directory(self, dir.as_ref())
230    }
231
232    pub fn load_all(&self) -> Result<Vec<WorkflowDefinition>, WorkflowLoadError> {
233        self.load_all_from_directory(&self.workflows_dir)
234    }
235
236    pub fn validate_definition(&self, definition: &WorkflowDefinition) -> Result<(), String> {
237        definition.validate()
238    }
239
240    pub(crate) fn validate_with_path(
241        &self,
242        path: &Path,
243        definition: &WorkflowDefinition,
244    ) -> Result<(), WorkflowLoadError> {
245        definition
246            .validate()
247            .map_err(|message| WorkflowLoadError::InvalidWorkflow {
248                path: path.to_path_buf(),
249                message,
250            })
251    }
252}
253
254impl Default for WorkflowLoader {
255    fn default() -> Self {
256        Self::new()
257    }
258}