cortex_ai/composer/otherwise.rs
1use super::Flow;
2use crate::flow::{
3 condition::Condition,
4 processor::Processor,
5 stage::{BranchStage, Stage},
6};
7use std::error::Error;
8use std::marker::PhantomData;
9use tracing::{debug, instrument};
10
11/// A builder for constructing the alternative branch of a conditional flow.
12///
13/// # Examples
14///
15/// ```
16/// use cortex_ai::composer::Flow;
17/// use cortex_ai::flow::condition::Condition;
18/// use cortex_ai::flow::processor::Processor;
19/// use cortex_ai::FlowComponent;
20/// use cortex_ai::FlowError;
21/// use std::error::Error;
22/// use std::fmt;
23/// use std::pin::Pin;
24/// use std::future::Future;
25///
26/// #[derive(Clone, Debug)]
27/// struct MyData;
28///
29/// #[derive(Clone, Debug)]
30/// struct MyError;
31///
32/// impl fmt::Display for MyError {
33/// fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
34/// write!(f, "MyError")
35/// }
36/// }
37///
38/// impl Error for MyError {}
39///
40/// impl From<FlowError> for MyError {
41/// fn from(e: FlowError) -> Self { MyError }
42/// }
43///
44/// struct MyProcessor;
45/// impl FlowComponent for MyProcessor {
46/// type Input = MyData;
47/// type Output = MyData;
48/// type Error = MyError;
49/// }
50///
51/// impl Processor for MyProcessor {
52/// fn process(&self, input: Self::Input) -> Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>> {
53/// Box::pin(async move { Ok(input) })
54/// }
55/// }
56///
57/// struct MyCondition;
58/// impl FlowComponent for MyCondition {
59/// type Input = MyData;
60/// type Output = bool;
61/// type Error = MyError;
62/// }
63///
64/// impl Condition for MyCondition {
65/// fn evaluate(&self, input: Self::Input) -> Pin<Box<dyn Future<Output = Result<(bool, Option<Self::Output>), Self::Error>> + Send>> {
66/// Box::pin(async move { Ok((true, Some(false))) })
67/// }
68/// }
69///
70/// let flow = Flow::<MyData, MyError, bool>::new()
71/// .when(MyCondition)
72/// .process(MyProcessor)
73/// .otherwise()
74/// .process(MyProcessor)
75/// .end();
76/// ```
77pub struct OtherwiseBuilder<DataType, OutputType, ErrorType> {
78 condition:
79 Box<dyn Condition<Input = DataType, Output = OutputType, Error = ErrorType> + Send + Sync>,
80 then_branch: Vec<Stage<DataType, ErrorType, OutputType>>,
81 else_branch: Vec<Stage<DataType, ErrorType, OutputType>>,
82 parent: Flow<DataType, ErrorType, OutputType>,
83}
84
85impl<DataType, OutputType, ErrorType> OtherwiseBuilder<DataType, OutputType, ErrorType>
86where
87 DataType: Clone + Send + Sync + 'static,
88 OutputType: Send + Sync + 'static,
89 ErrorType: Error + Send + Sync + 'static,
90{
91 /// Creates a new `OtherwiseBuilder` with the specified condition, then branch, and parent flow.
92 ///
93 /// This is an internal constructor used by the `BranchBuilder`.
94 ///
95 /// # Arguments
96 ///
97 /// * `condition` - The condition from the parent branch
98 /// * `then_branch` - The stages to execute when the condition is true
99 /// * `parent` - The parent flow this branch belongs to
100 #[instrument(skip(condition, then_branch, parent))]
101 pub(crate) fn new(
102 condition: Box<
103 dyn Condition<Input = DataType, Output = OutputType, Error = ErrorType> + Send + Sync,
104 >,
105 then_branch: Vec<Stage<DataType, ErrorType, OutputType>>,
106 parent: Flow<DataType, ErrorType, OutputType>,
107 ) -> Self {
108 debug!("Creating new otherwise builder");
109 Self {
110 condition,
111 then_branch,
112 else_branch: Vec::new(),
113 parent,
114 }
115 }
116
117 /// Adds a processor to the alternative branch that will be executed when the condition is false.
118 ///
119 /// # Arguments
120 ///
121 /// * `processor` - The processor to add to the alternative branch
122 ///
123 /// # Returns
124 ///
125 /// The builder instance for method chaining
126 ///
127 /// # Type Parameters
128 ///
129 /// * `ProcessorType` - The type of the processor being added
130 #[instrument(skip(self, processor))]
131 #[must_use]
132 pub fn process<ProcessorType>(mut self, processor: ProcessorType) -> Self
133 where
134 ProcessorType: Processor<Input = DataType, Output = DataType, Error = ErrorType>
135 + Send
136 + Sync
137 + 'static,
138 {
139 debug!("Adding processor to else branch");
140 self.else_branch.push(Stage::Process(Box::new(processor)));
141 self
142 }
143
144 /// Finalizes the branch construction and returns the parent flow.
145 ///
146 /// This method combines the condition, then branch, and else branch into a single
147 /// branch stage and adds it to the parent flow.
148 ///
149 /// # Returns
150 ///
151 /// The parent flow with the completed branch stage added
152 #[instrument(skip(self))]
153 #[must_use]
154 pub fn end(self) -> Flow<DataType, ErrorType, OutputType> {
155 debug!("Finalizing branch construction");
156 let branch_stage = Stage::Branch(Box::new(BranchStage {
157 condition: self.condition,
158 then_branch: self.then_branch,
159 else_branch: self.else_branch,
160 _marker: PhantomData,
161 }));
162
163 let mut flow = self.parent;
164 flow.stages.push(branch_stage);
165 debug!("Branch added to flow");
166 flow
167 }
168}