bamboo_agent/server/workflow/
mod.rs1mod loader;
31
32#[cfg(test)]
33mod tests;
34
35use crate::agent::core::composition::ToolExpr;
36use serde::{Deserialize, Serialize};
37use std::collections::HashMap;
38use std::fmt;
39use std::io;
40use std::path::{Path, PathBuf};
41use std::sync::RwLock;
42use std::time::SystemTime;
43
44#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
45pub struct WorkflowDefinition {
46 pub id: String,
47 pub name: String,
48 pub description: String,
49 pub version: String,
50 pub composition: ToolExpr,
51}
52
53impl WorkflowDefinition {
54 pub fn validate(&self) -> Result<(), String> {
55 validate_required("id", &self.id)?;
56 validate_required("name", &self.name)?;
57 validate_required("description", &self.description)?;
58 validate_required("version", &self.version)?;
59 validate_expr(&self.composition)
60 }
61}
62
/// Ensures a required textual field holds something other than whitespace.
///
/// `field` is only used to label the error message; `value` is the text being
/// checked.
///
/// # Errors
///
/// Returns `"<field> cannot be empty"` when `value` is empty or consists solely
/// of whitespace.
fn validate_required(field: &str, value: &str) -> Result<(), String> {
    match value.trim() {
        "" => Err(format!("{field} cannot be empty")),
        _ => Ok(()),
    }
}
70
71fn validate_expr(expr: &ToolExpr) -> Result<(), String> {
72 match expr {
73 ToolExpr::Call { tool, .. } => {
74 if tool.trim().is_empty() {
75 return Err("composition call tool cannot be empty".to_string());
76 }
77 }
78 ToolExpr::Sequence { steps, .. } => {
79 if steps.is_empty() {
80 return Err("composition sequence requires at least one step".to_string());
81 }
82
83 for step in steps {
84 validate_expr(step)?;
85 }
86 }
87 ToolExpr::Parallel { branches, .. } => {
88 if branches.is_empty() {
89 return Err("composition parallel requires at least one branch".to_string());
90 }
91
92 for branch in branches {
93 validate_expr(branch)?;
94 }
95 }
96 ToolExpr::Choice {
97 then_branch,
98 else_branch,
99 ..
100 } => {
101 validate_expr(then_branch)?;
102
103 if let Some(else_expr) = else_branch {
104 validate_expr(else_expr)?;
105 }
106 }
107 ToolExpr::Retry {
108 expr, max_attempts, ..
109 } => {
110 if *max_attempts == 0 {
111 return Err("composition retry max_attempts must be greater than zero".to_string());
112 }
113
114 validate_expr(expr)?;
115 }
116 ToolExpr::Let { var, expr, body } => {
117 if var.trim().is_empty() {
118 return Err("composition let variable cannot be empty".to_string());
119 }
120
121 validate_expr(expr)?;
122 validate_expr(body)?;
123 }
124 ToolExpr::Var(name) => {
125 if name.trim().is_empty() {
126 return Err("composition var cannot be empty".to_string());
127 }
128 }
129 }
130
131 Ok(())
132}
133
134#[derive(Debug)]
135pub enum WorkflowLoadError {
136 FileNotFound(PathBuf),
137 NotAFile(PathBuf),
138 NotADirectory(PathBuf),
139 Io {
140 path: PathBuf,
141 source: io::Error,
142 },
143 Parse {
144 path: PathBuf,
145 source: serde_yaml::Error,
146 },
147 InvalidWorkflow {
148 path: PathBuf,
149 message: String,
150 },
151}
152
153impl fmt::Display for WorkflowLoadError {
154 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
155 match self {
156 WorkflowLoadError::FileNotFound(path) => {
157 write!(f, "workflow file not found: {}", path.display())
158 }
159 WorkflowLoadError::NotAFile(path) => {
160 write!(f, "path is not a file: {}", path.display())
161 }
162 WorkflowLoadError::NotADirectory(path) => {
163 write!(f, "path is not a directory: {}", path.display())
164 }
165 WorkflowLoadError::Io { path, source } => {
166 write!(f, "I/O error for {}: {}", path.display(), source)
167 }
168 WorkflowLoadError::Parse { path, source } => {
169 write!(f, "failed to parse workflow {}: {}", path.display(), source)
170 }
171 WorkflowLoadError::InvalidWorkflow { path, message } => {
172 write!(f, "invalid workflow in {}: {}", path.display(), message)
173 }
174 }
175 }
176}
177
178impl std::error::Error for WorkflowLoadError {
179 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
180 match self {
181 WorkflowLoadError::Io { source, .. } => Some(source),
182 WorkflowLoadError::Parse { source, .. } => Some(source),
183 _ => None,
184 }
185 }
186}
187
188#[derive(Debug, Clone)]
189pub(crate) struct CachedWorkflow {
190 pub(crate) modified: Option<SystemTime>,
191 pub(crate) definition: WorkflowDefinition,
192}
193
194pub struct WorkflowLoader {
195 workflows_dir: PathBuf,
196 cache: RwLock<HashMap<PathBuf, CachedWorkflow>>,
197}
198
199impl WorkflowLoader {
200 pub fn new() -> Self {
201 let home = dirs::home_dir().unwrap_or_else(std::env::temp_dir);
202 Self {
203 workflows_dir: home.join(".bamboo").join("workflows"),
204 cache: RwLock::new(HashMap::new()),
205 }
206 }
207
208 pub fn with_dir(path: PathBuf) -> Self {
209 Self {
210 workflows_dir: path,
211 cache: RwLock::new(HashMap::new()),
212 }
213 }
214
215 pub fn load_from_file<P>(&self, path: P) -> Result<WorkflowDefinition, WorkflowLoadError>
216 where
217 P: AsRef<Path>,
218 {
219 loader::load_from_file(self, path.as_ref())
220 }
221
222 pub fn load_all_from_directory<P>(
223 &self,
224 dir: P,
225 ) -> Result<Vec<WorkflowDefinition>, WorkflowLoadError>
226 where
227 P: AsRef<Path>,
228 {
229 loader::load_all_from_directory(self, dir.as_ref())
230 }
231
232 pub fn load_all(&self) -> Result<Vec<WorkflowDefinition>, WorkflowLoadError> {
233 self.load_all_from_directory(&self.workflows_dir)
234 }
235
236 pub fn validate_definition(&self, definition: &WorkflowDefinition) -> Result<(), String> {
237 definition.validate()
238 }
239
240 pub(crate) fn validate_with_path(
241 &self,
242 path: &Path,
243 definition: &WorkflowDefinition,
244 ) -> Result<(), WorkflowLoadError> {
245 definition
246 .validate()
247 .map_err(|message| WorkflowLoadError::InvalidWorkflow {
248 path: path.to_path_buf(),
249 message,
250 })
251 }
252}
253
254impl Default for WorkflowLoader {
255 fn default() -> Self {
256 Self::new()
257 }
258}