Skip to main content

bamboo_agent/agent/core/composition/
mod.rs

1//! Tool Composition DSL for building complex tool workflows
2//!
3//! This module provides composable primitives for building tool execution workflows:
4//! - Sequence: Execute tools in sequence, passing results between them
5//! - Parallel: Execute tools in parallel
6//! - Choice: Conditional execution based on predicate
7//! - Retry: Retry execution with backoff
8//! - Map: Transform results
9//!
10//! # New Expression DSL
11//!
12//! The module also provides a serializable expression DSL (`ToolExpr`) that can be
13//! defined in YAML/JSON and executed by `CompositionExecutor`.
14//!
15//! ## Example YAML:
16//! ```yaml
17//! type: sequence
18//! steps:
19//!   - type: call
20//!     tool: read_file
21//!     args:
22//!       path: /tmp/input.txt
23//!   - type: parallel
24//!     branches:
25//!       - type: call
26//!         tool: process_a
27//!         args: {}
28//!       - type: call
29//!         tool: process_b
30//!         args: {}
31//!     wait: all
32//! ```
33
34use crate::agent::core::tools::{Tool, ToolError, ToolResult};
35use async_trait::async_trait;
36use futures::future::join_all;
37use serde_json::Value;
38use std::sync::Arc;
39use std::time::Duration;
40use tokio::time::sleep;
41
42// New expression DSL modules
43/// Condition predicates for workflow branching
44pub mod condition;
45/// Execution context and state management
46pub mod context;
47/// Workflow executor implementation
48pub mod executor;
49/// Tool expression AST types
50pub mod expr;
51/// Parallel execution strategies
52pub mod parallel;
53
54// Re-export new DSL types
55pub use condition::Condition;
56pub use context::ExecutionContext;
57pub use executor::CompositionExecutor;
58pub use expr::{CompositionError, ToolExpr};
59pub use parallel::ParallelWait;
60
/// Result of a composition execution
///
/// Bundles the overall success flag, the result payload, and the execution
/// context as returned by a [`Composition::execute`] call.
#[derive(Debug, Clone)]
pub struct CompositionResult {
    /// Whether the composition completed successfully
    pub success: bool,

    /// The result payload of the composition.
    ///
    /// For a single wrapped tool this is that tool's result; combinators may
    /// synthesize their own (e.g. `Parallel` emits a JSON array of branch outputs).
    pub result: ToolResult,

    /// The execution context handed back by the composition
    pub context: ExecutionContext,
}
76
/// Trait for composable tool operations
///
/// This trait defines the interface for all composition primitives that can be
/// combined to build complex tool workflows. Compositions can be nested and
/// combined to create sophisticated execution patterns.
///
/// # Implementations
///
/// - `Sequence`: Execute steps in order, threading the context from one step to the next
/// - `Parallel`: Execute branches concurrently
/// - `Choice`: Conditional execution based on a predicate
/// - `Retry`: Retry execution with a linearly growing backoff delay
/// - `ToolComposition`: Wrap a single tool as a composition
/// - `Map`: Transform results with a function
///
/// # Example
///
/// ```ignore
/// use bamboo_agent::agent::core::composition::{Sequence, Parallel, ToolComposition};
///
/// let workflow = Sequence::builder()
///     .step(ToolComposition::new(tool1, args))
///     .step(Parallel::builder()
///         .branch(ToolComposition::new(tool2, args2))
///         .branch(ToolComposition::new(tool3, args3))
///         .build())
///     .build();
///
/// let result = workflow.execute(ctx).await?;
/// ```
#[async_trait]
pub trait Composition: Send + Sync {
    /// Executes the composition with the given context
    ///
    /// The context is taken by value; implementations hand a context back
    /// inside the returned [`CompositionResult`].
    ///
    /// # Arguments
    ///
    /// * `ctx` - Execution context containing variables and state
    ///
    /// # Returns
    ///
    /// The composition result with success status, final result, and updated context
    ///
    /// # Errors
    ///
    /// Returns a `ToolError` when execution itself fails. A run that completes
    /// but is unsuccessful is reported as `Ok` with `success == false`.
    async fn execute(&self, ctx: ExecutionContext) -> Result<CompositionResult, ToolError>;
}
120
121/// Sequence composition - executes tools in order
122///
123/// Runs a series of compositions sequentially, passing the execution context
124/// from one step to the next. Stops early if any step fails.
125///
126/// # Example
127///
128/// ```ignore
129/// let sequence = Sequence::builder()
130///     .step(read_file_composition)
131///     .step(process_composition)
132///     .step(write_file_composition)
133///     .build();
134/// ```
135pub struct Sequence {
136    steps: Vec<Box<dyn Composition>>,
137}
138
139impl Sequence {
140    /// Creates a new sequence with the given steps
141    pub fn new(steps: Vec<Box<dyn Composition>>) -> Self {
142        Self { steps }
143    }
144
145    /// Creates a new sequence builder
146    pub fn builder() -> SequenceBuilder {
147        SequenceBuilder::new()
148    }
149}
150
151#[async_trait]
152impl Composition for Sequence {
153    async fn execute(&self, mut ctx: ExecutionContext) -> Result<CompositionResult, ToolError> {
154        let mut last_result = ToolResult {
155            success: true,
156            result: String::new(),
157            display_preference: None,
158        };
159
160        for step in &self.steps {
161            let result = step.execute(ctx.clone()).await?;
162            ctx = result.context;
163            last_result = result.result;
164
165            if !last_result.success {
166                return Ok(CompositionResult {
167                    success: false,
168                    result: last_result,
169                    context: ctx,
170                });
171            }
172        }
173
174        Ok(CompositionResult {
175            success: true,
176            result: last_result,
177            context: ctx,
178        })
179    }
180}
181
182/// Builder for creating sequence compositions
183pub struct SequenceBuilder {
184    steps: Vec<Box<dyn Composition>>,
185}
186
187impl SequenceBuilder {
188    /// Creates a new empty builder
189    pub fn new() -> Self {
190        Self { steps: Vec::new() }
191    }
192
193    /// Adds a step to the sequence
194    pub fn step(mut self, composition: impl Composition + 'static) -> Self {
195        self.steps.push(Box::new(composition));
196        self
197    }
198
199    /// Builds the final sequence
200    pub fn build(self) -> Sequence {
201        Sequence::new(self.steps)
202    }
203}
204
205impl Default for SequenceBuilder {
206    fn default() -> Self {
207        Self::new()
208    }
209}
210
211/// Parallel composition - executes tools concurrently
212///
213/// Runs multiple compositions in parallel using `futures::future::join_all`.
214/// All branches execute simultaneously and results are collected.
215///
216/// # Example
217///
218/// ```ignore
219/// let parallel = Parallel::builder()
220///     .branch(analyze_file_composition)
221///     .branch(run_tests_composition)
222///     .branch(check_lint_composition)
223///     .build();
224/// ```
225pub struct Parallel {
226    branches: Vec<Box<dyn Composition>>,
227}
228
229impl Parallel {
230    /// Creates a new parallel composition with the given branches
231    pub fn new(branches: Vec<Box<dyn Composition>>) -> Self {
232        Self { branches }
233    }
234
235    /// Creates a new parallel builder
236    pub fn builder() -> ParallelBuilder {
237        ParallelBuilder::new()
238    }
239}
240
241#[async_trait]
242impl Composition for Parallel {
243    async fn execute(&self, ctx: ExecutionContext) -> Result<CompositionResult, ToolError> {
244        let futures: Vec<_> = self
245            .branches
246            .iter()
247            .map(|branch| branch.execute(ctx.clone()))
248            .collect();
249
250        let results = join_all(futures).await;
251
252        let mut all_success = true;
253        let mut combined_results = Vec::new();
254
255        for result in results {
256            match result {
257                Ok(comp_result) => {
258                    combined_results.push(comp_result.result.result);
259                    if !comp_result.success {
260                        all_success = false;
261                    }
262                }
263                Err(e) => {
264                    return Err(e);
265                }
266            }
267        }
268
269        Ok(CompositionResult {
270            success: all_success,
271            result: ToolResult {
272                success: all_success,
273                result: serde_json::to_string(&combined_results).unwrap_or_default(),
274                display_preference: None,
275            },
276            context: ctx,
277        })
278    }
279}
280
281/// Builder for creating parallel compositions
282pub struct ParallelBuilder {
283    branches: Vec<Box<dyn Composition>>,
284}
285
286impl ParallelBuilder {
287    /// Creates a new empty builder
288    pub fn new() -> Self {
289        Self {
290            branches: Vec::new(),
291        }
292    }
293
294    /// Adds a branch to execute in parallel
295    pub fn branch(mut self, composition: impl Composition + 'static) -> Self {
296        self.branches.push(Box::new(composition));
297        self
298    }
299
300    /// Builds the final parallel composition
301    pub fn build(self) -> Parallel {
302        Parallel::new(self.branches)
303    }
304}
305
306impl Default for ParallelBuilder {
307    fn default() -> Self {
308        Self::new()
309    }
310}
311
312/// Choice composition - conditional execution
313///
314/// Executes one of two branches based on a predicate function.
315/// If the predicate returns true, the `if_true` branch executes;
316/// otherwise, the `if_false` branch executes (if provided).
317///
318/// # Example
319///
320/// ```ignore
321/// let choice = Choice::new(
322///     |ctx| ctx.get_variable("dry_run").is_some(),
323///     dry_run_composition,
324/// ).with_else(real_execution_composition);
325/// ```
326pub struct Choice {
327    /// Predicate function that determines which branch to execute
328    predicate: Arc<dyn Fn(&ExecutionContext) -> bool + Send + Sync>,
329
330    /// Branch to execute if predicate returns true
331    if_true: Box<dyn Composition>,
332
333    /// Optional branch to execute if predicate returns false
334    if_false: Option<Box<dyn Composition>>,
335}
336
337impl Choice {
338    /// Creates a new choice composition
339    ///
340    /// # Arguments
341    ///
342    /// * `predicate` - Function that evaluates the context to choose a branch
343    /// * `if_true` - Composition to execute if predicate returns true
344    pub fn new(
345        predicate: impl Fn(&ExecutionContext) -> bool + Send + Sync + 'static,
346        if_true: impl Composition + 'static,
347    ) -> Self {
348        Self {
349            predicate: Arc::new(predicate),
350            if_true: Box::new(if_true),
351            if_false: None,
352        }
353    }
354
355    /// Adds an else branch to execute if the predicate returns false
356    pub fn with_else(mut self, if_false: impl Composition + 'static) -> Self {
357        self.if_false = Some(Box::new(if_false));
358        self
359    }
360}
361
362#[async_trait]
363impl Composition for Choice {
364    async fn execute(&self, ctx: ExecutionContext) -> Result<CompositionResult, ToolError> {
365        if (self.predicate)(&ctx) {
366            self.if_true.execute(ctx).await
367        } else if let Some(ref else_branch) = self.if_false {
368            else_branch.execute(ctx).await
369        } else {
370            Ok(CompositionResult {
371                success: true,
372                result: ToolResult {
373                    success: true,
374                    result: "Condition was false, no else branch".to_string(),
375                    display_preference: None,
376                },
377                context: ctx,
378            })
379        }
380    }
381}
382
383/// Retry composition - retry with backoff
384///
385/// Retries a composition up to a maximum number of attempts with
386/// configurable backoff delay between attempts.
387///
388/// # Example
389///
390/// ```ignore
391/// let retry = Retry::new(flaky_composition, 3)
392///     .with_backoff(200); // 200ms base backoff
393/// ```
394pub struct Retry {
395    /// The composition to retry
396    composition: Box<dyn Composition>,
397
398    /// Maximum number of attempts
399    max_attempts: u32,
400
401    /// Base backoff delay in milliseconds (multiplied by attempt number)
402    backoff_ms: u64,
403}
404
405impl Retry {
406    /// Creates a new retry composition
407    ///
408    /// # Arguments
409    ///
410    /// * `composition` - The composition to retry
411    /// * `max_attempts` - Maximum number of attempts (including the first)
412    pub fn new(composition: impl Composition + 'static, max_attempts: u32) -> Self {
413        Self {
414            composition: Box::new(composition),
415            max_attempts,
416            backoff_ms: 100,
417        }
418    }
419
420    /// Sets a custom backoff delay
421    ///
422    /// The actual delay is `backoff_ms * (attempt + 1)` for exponential backoff
423    pub fn with_backoff(mut self, backoff_ms: u64) -> Self {
424        self.backoff_ms = backoff_ms;
425        self
426    }
427}
428
429#[async_trait]
430impl Composition for Retry {
431    async fn execute(&self, ctx: ExecutionContext) -> Result<CompositionResult, ToolError> {
432        let mut last_error = None;
433
434        for attempt in 0..self.max_attempts {
435            match self.composition.execute(ctx.clone()).await {
436                Ok(result) if result.success => return Ok(result),
437                Ok(result) => {
438                    if attempt == self.max_attempts - 1 {
439                        return Ok(result);
440                    }
441                }
442                Err(e) => {
443                    last_error = Some(e);
444                    if attempt < self.max_attempts - 1 {
445                        sleep(Duration::from_millis(
446                            self.backoff_ms * (attempt as u64 + 1),
447                        ))
448                        .await;
449                    }
450                }
451            }
452        }
453
454        Err(last_error.unwrap_or_else(|| ToolError::Execution("Max retries exceeded".to_string())))
455    }
456}
457
458/// Tool wrapper - wraps a Tool into a Composition
459///
460/// This adapter allows individual tools to be used within composition workflows.
461/// Optionally stores the result in a context variable for use by subsequent steps.
462///
463/// # Example
464///
465/// ```ignore
466/// let tool_comp = ToolComposition::new(tool, json!({"path": "/src/main.rs"}))
467///     .with_output_variable("file_content");
468/// ```
469pub struct ToolComposition {
470    /// The tool to execute
471    tool: Arc<dyn Tool>,
472
473    /// Arguments to pass to the tool
474    args: Value,
475
476    /// Optional variable name to store the result in the context
477    output_variable: Option<String>,
478}
479
480impl ToolComposition {
481    /// Creates a new tool composition
482    ///
483    /// # Arguments
484    ///
485    /// * `tool` - The tool to wrap
486    /// * `args` - Arguments to pass to the tool (JSON value)
487    pub fn new(tool: Arc<dyn Tool>, args: Value) -> Self {
488        Self {
489            tool,
490            args,
491            output_variable: None,
492        }
493    }
494
495    /// Sets the variable name to store the result in the context
496    ///
497    /// Subsequent steps can access this result via `ctx.get_variable(var_name)`
498    pub fn with_output_variable(mut self, var_name: impl Into<String>) -> Self {
499        self.output_variable = Some(var_name.into());
500        self
501    }
502}
503
504#[async_trait]
505impl Composition for ToolComposition {
506    async fn execute(&self, ctx: ExecutionContext) -> Result<CompositionResult, ToolError> {
507        // Merge context variables into args
508        let mut final_args = self.args.clone();
509        if let Value::Object(ref mut map) = final_args {
510            for (key, value) in &ctx.variables {
511                if !map.contains_key(key) {
512                    map.insert(key.clone(), value.clone());
513                }
514            }
515        }
516
517        let result = self.tool.execute(final_args).await?;
518        let success = result.success;
519
520        let mut new_ctx = ctx;
521        if let Some(ref var_name) = self.output_variable {
522            new_ctx.set_variable(
523                var_name.clone(),
524                serde_json::to_value(&result).unwrap_or_default(),
525            );
526        }
527        new_ctx.last_result = Some(result.clone());
528
529        Ok(CompositionResult {
530            success,
531            result,
532            context: new_ctx,
533        })
534    }
535}
536
537/// Map composition - transform results
538///
539/// Applies a transformation function to the result of a composition,
540/// allowing post-processing of tool outputs.
541///
542/// # Example
543///
544/// ```ignore
545/// let map = Map::new(
546///     read_file_composition,
547///     |result| {
548///         ToolResult {
549///             success: result.success,
550///             result: result.result.to_uppercase(),
551///             display_preference: None,
552///         }
553///     },
554/// );
555/// ```
556pub struct Map {
557    /// The composition to transform
558    composition: Box<dyn Composition>,
559
560    /// Transformation function to apply to the result
561    transform: Arc<dyn Fn(ToolResult) -> ToolResult + Send + Sync>,
562}
563
564impl Map {
565    /// Creates a new map composition
566    ///
567    /// # Arguments
568    ///
569    /// * `composition` - The composition to transform
570    /// * `transform` - Function to apply to the result
571    pub fn new(
572        composition: impl Composition + 'static,
573        transform: impl Fn(ToolResult) -> ToolResult + Send + Sync + 'static,
574    ) -> Self {
575        Self {
576            composition: Box::new(composition),
577            transform: Arc::new(transform),
578        }
579    }
580}
581
582#[async_trait]
583impl Composition for Map {
584    async fn execute(&self, ctx: ExecutionContext) -> Result<CompositionResult, ToolError> {
585        let result = self.composition.execute(ctx).await?;
586        let transformed = (self.transform)(result.result);
587        let success = transformed.success;
588
589        Ok(CompositionResult {
590            success,
591            result: transformed,
592            context: result.context,
593        })
594    }
595}
596
597#[cfg(test)]
598mod tests;