bamboo_agent/agent/core/composition/mod.rs
1//! Tool Composition DSL for building complex tool workflows
2//!
3//! This module provides composable primitives for building tool execution workflows:
4//! - Sequence: Execute tools in sequence, passing results between them
5//! - Parallel: Execute tools in parallel
6//! - Choice: Conditional execution based on predicate
7//! - Retry: Retry execution with backoff
8//! - Map: Transform results
9//!
10//! # New Expression DSL
11//!
12//! The module also provides a serializable expression DSL (`ToolExpr`) that can be
13//! defined in YAML/JSON and executed by `CompositionExecutor`.
14//!
15//! ## Example YAML:
//! ```yaml
//! type: sequence
//! steps:
//!   - type: call
//!     tool: read_file
//!     args:
//!       path: /tmp/input.txt
//!   - type: parallel
//!     branches:
//!       - type: call
//!         tool: process_a
//!         args: {}
//!       - type: call
//!         tool: process_b
//!         args: {}
//!     wait: all
//! ```
33
34use crate::agent::core::tools::{Tool, ToolError, ToolResult};
35use async_trait::async_trait;
36use futures::future::join_all;
37use serde_json::Value;
38use std::sync::Arc;
39use std::time::Duration;
40use tokio::time::sleep;
41
42// New expression DSL modules
43/// Condition predicates for workflow branching
44pub mod condition;
45/// Execution context and state management
46pub mod context;
47/// Workflow executor implementation
48pub mod executor;
49/// Tool expression AST types
50pub mod expr;
51/// Parallel execution strategies
52pub mod parallel;
53
54// Re-export new DSL types
55pub use condition::Condition;
56pub use context::ExecutionContext;
57pub use executor::CompositionExecutor;
58pub use expr::{CompositionError, ToolExpr};
59pub use parallel::ParallelWait;
60
/// Outcome of running a [`Composition`].
///
/// Bundles the final tool result with an overall success flag and the
/// execution context handed back by the composition, so that a caller (or an
/// enclosing composition) can continue from that state.
#[derive(Debug, Clone)]
pub struct CompositionResult {
    /// Whether the composition completed successfully.
    ///
    /// Every `CompositionResult` constructed in this file sets this flag to
    /// the same value as `result.success`.
    pub success: bool,

    /// The final result of the composition, e.g. the output of the last
    /// executed tool or the combined output of parallel branches.
    pub result: ToolResult,

    /// The execution context returned by the composition, including any
    /// variables that were set while it ran.
    pub context: ExecutionContext,
}
76
/// Trait for composable tool operations.
///
/// This trait is the common interface of all composition primitives. Because
/// every primitive both implements and accepts `Composition` values, they can
/// be nested freely to build larger workflows.
///
/// # Implementations
///
/// - `Sequence`: Execute compositions in order, threading the context through
/// - `Parallel`: Execute compositions concurrently
/// - `Choice`: Conditional execution based on a predicate
/// - `Retry`: Retry execution with a backoff delay
/// - `ToolComposition`: Wrap a single tool as a composition
/// - `Map`: Transform results with a function
///
/// # Example
///
/// ```ignore
/// use bamboo_agent::agent::core::composition::{Sequence, Parallel, ToolComposition};
///
/// let workflow = Sequence::builder()
///     .step(ToolComposition::new(tool1, args))
///     .step(Parallel::builder()
///         .branch(ToolComposition::new(tool2, args2))
///         .branch(ToolComposition::new(tool3, args3))
///         .build())
///     .build();
///
/// let result = workflow.execute(ctx).await?;
/// ```
#[async_trait]
pub trait Composition: Send + Sync {
    /// Executes the composition with the given context.
    ///
    /// # Arguments
    ///
    /// * `ctx` - Execution context containing variables and state; it is
    ///   taken by value and an updated context is handed back in the result
    ///
    /// # Returns
    ///
    /// The composition result with success status, final result, and updated
    /// context.
    ///
    /// # Errors
    ///
    /// The implementations in this file return `Err` when an underlying tool
    /// call or nested composition returns a `ToolError`. A tool that runs but
    /// reports failure is instead conveyed as `Ok` with `success == false`.
    async fn execute(&self, ctx: ExecutionContext) -> Result<CompositionResult, ToolError>;
}
120
/// Sequence composition - executes compositions in order.
///
/// Runs a series of compositions one after another, handing the execution
/// context returned by each step to the next one. Execution stops at the
/// first step whose result reports failure, and that step's result and
/// context are returned. An error returned by a step is propagated
/// immediately. A sequence without steps succeeds with an empty result string.
///
/// # Example
///
/// ```ignore
/// let sequence = Sequence::builder()
///     .step(read_file_composition)
///     .step(process_composition)
///     .step(write_file_composition)
///     .build();
/// ```
pub struct Sequence {
    /// Steps to run, in execution order.
    steps: Vec<Box<dyn Composition>>,
}
138
139impl Sequence {
140 /// Creates a new sequence with the given steps
141 pub fn new(steps: Vec<Box<dyn Composition>>) -> Self {
142 Self { steps }
143 }
144
145 /// Creates a new sequence builder
146 pub fn builder() -> SequenceBuilder {
147 SequenceBuilder::new()
148 }
149}
150
151#[async_trait]
152impl Composition for Sequence {
153 async fn execute(&self, mut ctx: ExecutionContext) -> Result<CompositionResult, ToolError> {
154 let mut last_result = ToolResult {
155 success: true,
156 result: String::new(),
157 display_preference: None,
158 };
159
160 for step in &self.steps {
161 let result = step.execute(ctx.clone()).await?;
162 ctx = result.context;
163 last_result = result.result;
164
165 if !last_result.success {
166 return Ok(CompositionResult {
167 success: false,
168 result: last_result,
169 context: ctx,
170 });
171 }
172 }
173
174 Ok(CompositionResult {
175 success: true,
176 result: last_result,
177 context: ctx,
178 })
179 }
180}
181
/// Builder for creating [`Sequence`] compositions.
///
/// Collects steps in the order they are added and turns them into a
/// `Sequence` with `build`.
pub struct SequenceBuilder {
    /// Steps added so far, in insertion order.
    steps: Vec<Box<dyn Composition>>,
}
186
187impl SequenceBuilder {
188 /// Creates a new empty builder
189 pub fn new() -> Self {
190 Self { steps: Vec::new() }
191 }
192
193 /// Adds a step to the sequence
194 pub fn step(mut self, composition: impl Composition + 'static) -> Self {
195 self.steps.push(Box::new(composition));
196 self
197 }
198
199 /// Builds the final sequence
200 pub fn build(self) -> Sequence {
201 Sequence::new(self.steps)
202 }
203}
204
205impl Default for SequenceBuilder {
206 fn default() -> Self {
207 Self::new()
208 }
209}
210
/// Parallel composition - executes compositions concurrently.
///
/// Drives all branches together with `futures::future::join_all`, giving each
/// branch its own clone of the input context, and waits for every branch to
/// finish. The combined result is a JSON array of the branches' result
/// strings, and it is successful only if every branch succeeded. The context
/// returned is the input context; contexts returned by the branches are not
/// merged back.
///
/// # Example
///
/// ```ignore
/// let parallel = Parallel::builder()
///     .branch(analyze_file_composition)
///     .branch(run_tests_composition)
///     .branch(check_lint_composition)
///     .build();
/// ```
pub struct Parallel {
    /// Branches to run concurrently.
    branches: Vec<Box<dyn Composition>>,
}
228
229impl Parallel {
230 /// Creates a new parallel composition with the given branches
231 pub fn new(branches: Vec<Box<dyn Composition>>) -> Self {
232 Self { branches }
233 }
234
235 /// Creates a new parallel builder
236 pub fn builder() -> ParallelBuilder {
237 ParallelBuilder::new()
238 }
239}
240
241#[async_trait]
242impl Composition for Parallel {
243 async fn execute(&self, ctx: ExecutionContext) -> Result<CompositionResult, ToolError> {
244 let futures: Vec<_> = self
245 .branches
246 .iter()
247 .map(|branch| branch.execute(ctx.clone()))
248 .collect();
249
250 let results = join_all(futures).await;
251
252 let mut all_success = true;
253 let mut combined_results = Vec::new();
254
255 for result in results {
256 match result {
257 Ok(comp_result) => {
258 combined_results.push(comp_result.result.result);
259 if !comp_result.success {
260 all_success = false;
261 }
262 }
263 Err(e) => {
264 return Err(e);
265 }
266 }
267 }
268
269 Ok(CompositionResult {
270 success: all_success,
271 result: ToolResult {
272 success: all_success,
273 result: serde_json::to_string(&combined_results).unwrap_or_default(),
274 display_preference: None,
275 },
276 context: ctx,
277 })
278 }
279}
280
/// Builder for creating [`Parallel`] compositions.
///
/// Collects branches as they are added and turns them into a `Parallel`
/// with `build`.
pub struct ParallelBuilder {
    /// Branches added so far.
    branches: Vec<Box<dyn Composition>>,
}
285
286impl ParallelBuilder {
287 /// Creates a new empty builder
288 pub fn new() -> Self {
289 Self {
290 branches: Vec::new(),
291 }
292 }
293
294 /// Adds a branch to execute in parallel
295 pub fn branch(mut self, composition: impl Composition + 'static) -> Self {
296 self.branches.push(Box::new(composition));
297 self
298 }
299
300 /// Builds the final parallel composition
301 pub fn build(self) -> Parallel {
302 Parallel::new(self.branches)
303 }
304}
305
306impl Default for ParallelBuilder {
307 fn default() -> Self {
308 Self::new()
309 }
310}
311
/// Choice composition - conditional execution.
///
/// Executes one of two branches based on a predicate evaluated against the
/// execution context. If the predicate returns true, the `if_true` branch
/// executes; otherwise the `if_false` branch executes, if one was provided.
/// When the predicate is false and there is no else branch, the choice
/// succeeds without running anything and leaves the context unchanged.
///
/// # Example
///
/// ```ignore
/// let choice = Choice::new(
///     |ctx| ctx.get_variable("dry_run").is_some(),
///     dry_run_composition,
/// ).with_else(real_execution_composition);
/// ```
pub struct Choice {
    /// Predicate function that determines which branch to execute
    predicate: Arc<dyn Fn(&ExecutionContext) -> bool + Send + Sync>,

    /// Branch to execute if predicate returns true
    if_true: Box<dyn Composition>,

    /// Optional branch to execute if predicate returns false
    if_false: Option<Box<dyn Composition>>,
}
336
337impl Choice {
338 /// Creates a new choice composition
339 ///
340 /// # Arguments
341 ///
342 /// * `predicate` - Function that evaluates the context to choose a branch
343 /// * `if_true` - Composition to execute if predicate returns true
344 pub fn new(
345 predicate: impl Fn(&ExecutionContext) -> bool + Send + Sync + 'static,
346 if_true: impl Composition + 'static,
347 ) -> Self {
348 Self {
349 predicate: Arc::new(predicate),
350 if_true: Box::new(if_true),
351 if_false: None,
352 }
353 }
354
355 /// Adds an else branch to execute if the predicate returns false
356 pub fn with_else(mut self, if_false: impl Composition + 'static) -> Self {
357 self.if_false = Some(Box::new(if_false));
358 self
359 }
360}
361
362#[async_trait]
363impl Composition for Choice {
364 async fn execute(&self, ctx: ExecutionContext) -> Result<CompositionResult, ToolError> {
365 if (self.predicate)(&ctx) {
366 self.if_true.execute(ctx).await
367 } else if let Some(ref else_branch) = self.if_false {
368 else_branch.execute(ctx).await
369 } else {
370 Ok(CompositionResult {
371 success: true,
372 result: ToolResult {
373 success: true,
374 result: "Condition was false, no else branch".to_string(),
375 display_preference: None,
376 },
377 context: ctx,
378 })
379 }
380 }
381}
382
/// Retry composition - retry with backoff.
///
/// Re-runs a composition up to a maximum number of attempts, each time with a
/// fresh clone of the input context, using a configurable backoff delay.
/// With `max_attempts == 0` the wrapped composition is never run and
/// execution fails with an error.
///
/// # Example
///
/// ```ignore
/// let retry = Retry::new(flaky_composition, 3)
///     .with_backoff(200); // 200ms base backoff
/// ```
pub struct Retry {
    /// The composition to retry
    composition: Box<dyn Composition>,

    /// Maximum number of attempts
    max_attempts: u32,

    /// Base backoff delay in milliseconds (multiplied by attempt number)
    backoff_ms: u64,
}
404
405impl Retry {
406 /// Creates a new retry composition
407 ///
408 /// # Arguments
409 ///
410 /// * `composition` - The composition to retry
411 /// * `max_attempts` - Maximum number of attempts (including the first)
412 pub fn new(composition: impl Composition + 'static, max_attempts: u32) -> Self {
413 Self {
414 composition: Box::new(composition),
415 max_attempts,
416 backoff_ms: 100,
417 }
418 }
419
420 /// Sets a custom backoff delay
421 ///
422 /// The actual delay is `backoff_ms * (attempt + 1)` for exponential backoff
423 pub fn with_backoff(mut self, backoff_ms: u64) -> Self {
424 self.backoff_ms = backoff_ms;
425 self
426 }
427}
428
429#[async_trait]
430impl Composition for Retry {
431 async fn execute(&self, ctx: ExecutionContext) -> Result<CompositionResult, ToolError> {
432 let mut last_error = None;
433
434 for attempt in 0..self.max_attempts {
435 match self.composition.execute(ctx.clone()).await {
436 Ok(result) if result.success => return Ok(result),
437 Ok(result) => {
438 if attempt == self.max_attempts - 1 {
439 return Ok(result);
440 }
441 }
442 Err(e) => {
443 last_error = Some(e);
444 if attempt < self.max_attempts - 1 {
445 sleep(Duration::from_millis(
446 self.backoff_ms * (attempt as u64 + 1),
447 ))
448 .await;
449 }
450 }
451 }
452 }
453
454 Err(last_error.unwrap_or_else(|| ToolError::Execution("Max retries exceeded".to_string())))
455 }
456}
457
/// Tool wrapper - wraps a `Tool` into a [`Composition`].
///
/// This adapter allows individual tools to be used within composition
/// workflows. When the configured arguments are a JSON object, context
/// variables are added to them for every key the arguments do not already
/// contain. The tool's result is recorded as the context's `last_result` and,
/// optionally, stored (serialized to JSON) in a context variable for use by
/// subsequent steps.
///
/// # Example
///
/// ```ignore
/// let tool_comp = ToolComposition::new(tool, json!({"path": "/src/main.rs"}))
///     .with_output_variable("file_content");
/// ```
pub struct ToolComposition {
    /// The tool to execute
    tool: Arc<dyn Tool>,

    /// Arguments to pass to the tool
    args: Value,

    /// Optional variable name to store the result in the context
    output_variable: Option<String>,
}
479
480impl ToolComposition {
481 /// Creates a new tool composition
482 ///
483 /// # Arguments
484 ///
485 /// * `tool` - The tool to wrap
486 /// * `args` - Arguments to pass to the tool (JSON value)
487 pub fn new(tool: Arc<dyn Tool>, args: Value) -> Self {
488 Self {
489 tool,
490 args,
491 output_variable: None,
492 }
493 }
494
495 /// Sets the variable name to store the result in the context
496 ///
497 /// Subsequent steps can access this result via `ctx.get_variable(var_name)`
498 pub fn with_output_variable(mut self, var_name: impl Into<String>) -> Self {
499 self.output_variable = Some(var_name.into());
500 self
501 }
502}
503
504#[async_trait]
505impl Composition for ToolComposition {
506 async fn execute(&self, ctx: ExecutionContext) -> Result<CompositionResult, ToolError> {
507 // Merge context variables into args
508 let mut final_args = self.args.clone();
509 if let Value::Object(ref mut map) = final_args {
510 for (key, value) in &ctx.variables {
511 if !map.contains_key(key) {
512 map.insert(key.clone(), value.clone());
513 }
514 }
515 }
516
517 let result = self.tool.execute(final_args).await?;
518 let success = result.success;
519
520 let mut new_ctx = ctx;
521 if let Some(ref var_name) = self.output_variable {
522 new_ctx.set_variable(
523 var_name.clone(),
524 serde_json::to_value(&result).unwrap_or_default(),
525 );
526 }
527 new_ctx.last_result = Some(result.clone());
528
529 Ok(CompositionResult {
530 success,
531 result,
532 context: new_ctx,
533 })
534 }
535}
536
/// Map composition - transform results.
///
/// Applies a transformation function to the result of a composition,
/// allowing post-processing of tool outputs. The overall success flag is
/// taken from the transformed result, and the inner composition's context is
/// passed through unchanged.
///
/// # Example
///
/// ```ignore
/// let map = Map::new(
///     read_file_composition,
///     |result| {
///         ToolResult {
///             success: result.success,
///             result: result.result.to_uppercase(),
///             display_preference: None,
///         }
///     },
/// );
/// ```
pub struct Map {
    /// The composition to transform
    composition: Box<dyn Composition>,

    /// Transformation function to apply to the result
    transform: Arc<dyn Fn(ToolResult) -> ToolResult + Send + Sync>,
}
563
564impl Map {
565 /// Creates a new map composition
566 ///
567 /// # Arguments
568 ///
569 /// * `composition` - The composition to transform
570 /// * `transform` - Function to apply to the result
571 pub fn new(
572 composition: impl Composition + 'static,
573 transform: impl Fn(ToolResult) -> ToolResult + Send + Sync + 'static,
574 ) -> Self {
575 Self {
576 composition: Box::new(composition),
577 transform: Arc::new(transform),
578 }
579 }
580}
581
582#[async_trait]
583impl Composition for Map {
584 async fn execute(&self, ctx: ExecutionContext) -> Result<CompositionResult, ToolError> {
585 let result = self.composition.execute(ctx).await?;
586 let transformed = (self.transform)(result.result);
587 let success = transformed.success;
588
589 Ok(CompositionResult {
590 success,
591 result: transformed,
592 context: result.context,
593 })
594 }
595}
596
597#[cfg(test)]
598mod tests;