cortex_ai/composer/
branch.rs

1use super::{Flow, OtherwiseBuilder};
2use crate::flow::{condition::Condition, processor::Processor, stage::Stage};
3use std::error::Error;
4use tracing::{debug, instrument};
5
6/// A builder for constructing conditional branches in a flow.
7///
8/// # Examples
9///
10/// ```
11/// use cortex_ai::composer::Flow;
12/// use cortex_ai::flow::condition::Condition;
13/// use cortex_ai::flow::processor::Processor;
14/// use cortex_ai::FlowComponent;
15/// use cortex_ai::FlowError;
16/// use std::error::Error;
17/// use std::fmt;
18/// use std::pin::Pin;
19/// use std::future::Future;
20///
21/// #[derive(Clone, Debug)]
22/// struct MyData;
23///
24/// #[derive(Clone, Debug)]
25/// struct MyError;
26///
27/// impl fmt::Display for MyError {
28///     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
29///         write!(f, "MyError")
30///     }
31/// }
32///
33/// impl Error for MyError {}
34///
35/// impl From<FlowError> for MyError {
36///     fn from(e: FlowError) -> Self { MyError }
37/// }
38///
39/// struct MyProcessor;
40/// impl FlowComponent for MyProcessor {
41///     type Input = MyData;
42///     type Output = MyData;
43///     type Error = MyError;
44/// }
45///
46/// impl Processor for MyProcessor {
47///     fn process(&self, input: Self::Input) -> Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>> {
48///         Box::pin(async move { Ok(input) })
49///     }
50/// }
51///
52/// struct MyCondition;
53/// impl FlowComponent for MyCondition {
54///     type Input = MyData;
55///     type Output = bool;
56///     type Error = MyError;
57/// }
58///
59/// impl Condition for MyCondition {
60///     fn evaluate(&self, input: Self::Input) -> Pin<Box<dyn Future<Output = Result<(bool, Option<Self::Output>), Self::Error>> + Send>> {
61///         Box::pin(async move { Ok((true, Some(false))) })
62///     }
63/// }
64///
65/// let flow = Flow::<MyData, MyError, bool>::new();
66/// let branch = flow
67///     .when(MyCondition)
68///     .process(MyProcessor)
69///     .otherwise();
70/// ```
71pub struct BranchBuilder<DataType, OutputType, ErrorType> {
72    condition:
73        Box<dyn Condition<Input = DataType, Output = OutputType, Error = ErrorType> + Send + Sync>,
74    then_branch: Vec<Stage<DataType, ErrorType, OutputType>>,
75    parent: Flow<DataType, ErrorType, OutputType>,
76}
77
78impl<DataType, OutputType, ErrorType> BranchBuilder<DataType, OutputType, ErrorType>
79where
80    DataType: Clone + Send + Sync + 'static,
81    OutputType: Send + Sync + 'static,
82    ErrorType: Error + Send + Sync + 'static,
83{
84    /// Creates a new `BranchBuilder` with the specified condition and parent flow.
85    ///
86    /// # Arguments
87    ///
88    /// * `condition` - A boxed condition that determines when this branch should execute
89    /// * `parent` - The parent flow this branch belongs to
90    ///
91    /// # Returns
92    ///
93    /// A new instance of `BranchBuilder`
94    #[instrument(skip(condition, parent))]
95    #[must_use]
96    pub fn new(
97        condition: Box<
98            dyn Condition<Input = DataType, Output = OutputType, Error = ErrorType> + Send + Sync,
99        >,
100        parent: Flow<DataType, ErrorType, OutputType>,
101    ) -> Self {
102        debug!("Creating new branch builder");
103        Self {
104            condition,
105            then_branch: Vec::new(),
106            parent,
107        }
108    }
109
110    /// Adds a processor to the branch that will be executed when the condition is true.
111    ///
112    /// # Arguments
113    ///
114    /// * `processor` - The processor to add to the branch
115    ///
116    /// # Returns
117    ///
118    /// The builder instance for method chaining
119    ///
120    /// # Type Parameters
121    ///
122    /// * `ProcessorType` - The type of the processor being added
123    #[instrument(skip(self, processor))]
124    #[must_use]
125    pub fn process<ProcessorType>(mut self, processor: ProcessorType) -> Self
126    where
127        ProcessorType: Processor<Input = DataType, Output = DataType, Error = ErrorType>
128            + Send
129            + Sync
130            + 'static,
131    {
132        debug!("Adding processor to then branch");
133        self.then_branch.push(Stage::Process(Box::new(processor)));
134        self
135    }
136
137    /// Transitions to building the alternative branch that will be executed when the condition is false.
138    ///
139    /// # Returns
140    ///
141    /// An `OtherwiseBuilder` for constructing the alternative branch
142    #[instrument(skip(self))]
143    #[must_use]
144    pub fn otherwise(self) -> OtherwiseBuilder<DataType, OutputType, ErrorType> {
145        debug!("Creating otherwise builder");
146        OtherwiseBuilder::new(self.condition, self.then_branch, self.parent)
147    }
148}