rs_flow/flow.rs
1use std::collections::HashMap;
2use std::sync::Arc;
3
4use crate::component::Next;
5use crate::connection::{Connection, Connections};
6use crate::context::global::Global;
7use crate::context::Ctxs;
8use crate::error::{FlowError, Result, RunResult};
9use crate::prelude::{Component, Id};
10
11
12///
13/// A Flow provided a interface to run [Component]'s in a defined order.
14///
15/// That order is defined by the [Inputs](crate::ports::Inputs) and
16/// [Outputs](crate::ports::Outputs) port's of the component's and the
17/// [Connection]'s between the [Component]'s.
18///
19/// The image bellow show the logic of [Flow] execution and when each [Component] will run.
20///
21/// <img src="https://github.com/Adlizm/rs-flow/raw/main/assets/flow-execution.svg" alt="Flow Execution Logic"/>
22///
23/// The Flow run in cicles. In each cicle a Set of Component's execute
24/// the `run` function defined in trait [ComponentRunnable](crate::component::ComponentRunnable)
25///
26/// The image shows a Flow that have 10 componets with 3 differents types: Red, Green and Blue, and:
27/// - Red components have only a Output port, wihtout Inputs
28/// - Green components have only a Input port, wihtout Outputs
29/// - Blue components have one Input and one Output port
30///
31/// For the next explication, we be consider that:
32/// - When Red `run` a [Package](crate::package::Package) is sent to your Output port.
33/// - When Green `run` consume all [Package](crate::package::Package)'s sended to your Input port.
34/// - When Blue `run` consume all [Package](crate::package::Package)'s sended to your Input port and send a Package to your Output port.
35///
36/// In the First cicle the Component's `1` and `2` will the run. In fact every [Component]
37/// without a [Input](crate::ports::Inputs) ports will executed once in the first cicle.
38/// Note that `1` have two [Connection]'s (to `3` and `5`) and each one recieve a copy of the [Package](crate::package::Package) sent.
39///
40/// In the Second cicle the Component's `3` and `4` will run, because both recieve a
41/// [Package](crate::package::Package) in your [Input](crate::ports::Inputs) port.
42/// Note that `5` also recieve a Package but he is defined like [Type::Eager](crate::component::Type::Eager),
43/// for that he is waiting for `4` to execute.
44///
45/// In the Third cicle th Component's `5` and `8` run, because both have Packages in your
46/// Input port (`5` sended by `1` and `4`, `8` sended by `3`), in this case `5` will run
47/// because `1`,`2` and `4` already run.
48///
49/// This logic will be repeated until there are no more components that can be executed.
50/// (read [Type](crate::component::Type) and [Next]).
51///
52/// Note that `8` will execute a second (in 5ยบ cicle) time after recive a Package from `7`
53///
54///
55/// ```
56/// use tokio_test;
57/// use rs_flow::prelude::*;
58///
59/// struct Total {
60/// value: f64
61/// }
62///
63/// #[inputs]
64/// #[outputs { data }]
65/// struct One;
66///
67/// #[async_trait]
68/// impl ComponentRunnable for One {
69/// type Global = Total;
70/// async fn run(&self, ctx: &mut Ctx<Total>) -> Result<Next> {
71/// ctx.send(self.output("data"), 1.into());
72/// Ok(Next::Continue)
73/// }
74/// }
75///
76/// #[inputs { a , b }]
77/// #[outputs]
78/// struct Sum;
79///
80/// #[async_trait]
81/// impl ComponentRunnable for Sum {
82/// type Global = Total;
83/// async fn run(&self, ctx: &mut Ctx<Total>) -> Result<Next> {
84/// let a = ctx.receive(self.input("a")).unwrap().get_number()?;
85/// let b = ctx.receive(self.input("b")).unwrap().get_number()?;
86///
87/// ctx.with_mut_global(|total| {
88/// total.value += a + b;
89/// })?;
90///
91/// Ok(Next::Continue)
92/// }
93/// }
94///
95/// tokio_test::block_on(async {
96/// let a = Component::new(1, One);
97/// let b = Component::new(2, One);
98/// let sum = Component::new(3, Sum);
99///
100/// let connection_a = Connection::by(a.from("data"), sum.to("a"));
101/// let connection_b = Connection::by(b.from("data"), sum.to("b"));
102///
103/// let total = Flow::new()
104/// .add_component(a).unwrap()
105/// .add_component(b).unwrap()
106/// .add_component(sum).unwrap()
107/// .add_connection(connection_a).unwrap()
108/// .add_connection(connection_b).unwrap()
109/// .run(Total { value: 0.0 }).await
110/// .unwrap();
111///
112/// assert!(total.value == 2.0);
113/// });
114///
115/// ```
116///
117pub struct Flow<G>
118 where G: Sync + Send
119{
120 components: HashMap<Id, Component<G>>,
121 connections: Connections,
122}
123
124
125impl<G> Flow<G>
126 where G: Sync + Send + 'static
127{
128 /// Create a flow without components or connections
129 pub fn new() -> Self {
130 Self {
131 components: HashMap::new(),
132 connections: Connections::new(),
133 }
134 }
135
136 /// Insert a [Component]
137 ///
138 /// # Error
139 ///
140 /// Error if the [Component::id] is already used
141 pub fn add_component(mut self, component: Component<G>) -> Result<Self> {
142 if self.components.contains_key(&component.id) {
143 return Err(FlowError::ComponentAlreadyExist { id: component.id }.into());
144 }
145 self.components.insert(component.id ,component);
146 Ok(self)
147 }
148
149 /// Insert a [Connection]
150 ///
151 /// # Error
152 ///
153 /// - Error if [Connection] already exist
154 /// - Error if the this [Flow] not have a [Component::id] used in [Connection]
155 /// - Error if the [Component]'s used in [Connection] not have the Input/Output [Port](crate::ports::Port) defined.
156 /// - Error if add a connection create a Loop
157 pub fn add_connection(mut self, connection: Connection) -> Result<Self> {
158 if let Some(component) = self.components.get(&connection.from) {
159 if !component.data.outputs().contains(connection.out_port)
160 {
161 return Err(FlowError::OutPortNotFound {
162 component: connection.from,
163 out_port: connection.out_port,
164 }
165 .into());
166 }
167 } else {
168 return Err(FlowError::ComponentNotFound {
169 id: connection.from,
170 }
171 .into());
172 }
173
174 if let Some(component) = self.components.get(&connection.to){
175 if !component.data.inputs().contains(connection.in_port)
176 {
177 return Err(FlowError::InPortNotFound {
178 component: connection.from,
179 in_port: connection.in_port,
180 }
181 .into());
182 }
183 } else {
184 return Err(FlowError::ComponentNotFound { id: connection.to }.into());
185 }
186
187 self.connections.add(connection)?;
188
189 Ok(self)
190 }
191
192 ///
193 /// Run this Flow
194 ///
195 /// # Error
196 ///
197 /// Error if a component return a Error when [run](crate::component::ComponentRunnable::run)
198 ///
199 /// # Panics
200 ///
201 /// Panic if a component panic when [run](crate::component::ComponentRunnable::run)
202 ///
203 pub async fn run(&self, global: G) -> RunResult<G> {
204 let global_arc = Arc::new(Global::from_data(global));
205
206 let mut contexts = Ctxs::new(&self.components, &self.connections, &global_arc);
207
208 let mut ready_components = contexts.entry_points();
209 let mut first = true;
210
211 while !ready_components.is_empty() {
212 let mut futures = Vec::with_capacity(ready_components.len());
213
214 for id in ready_components {
215 let mut ctx = contexts.borrow(id)
216 .expect("Ready component never return ids that not exist");
217
218 ctx.consumed = false;
219
220 let component = self.components.get(&id)
221 .expect("Ready component never return ids that not exist");
222
223 futures.push(async move {
224 component.data.run(&mut ctx).await
225 .map(|next| (ctx, next))
226 });
227 }
228
229 let results = futures::future::try_join_all(futures).await?;
230 if results.iter().any(|(_, next)| next == &Next::Break) {
231 break;
232 }
233
234 for (ctx, _) in results {
235 if !ctx.consumed && !first { // entry points not have inputs to consume
236 return Err(Box::new(FlowError::AnyPackageConsumed { component: ctx.id }));
237 }
238 contexts.give_back(ctx);
239 }
240
241 contexts.refresh_queues();
242
243 ready_components = contexts.ready_components(&self.connections);
244
245 first = false;
246 }
247
248 drop(contexts);
249
250 let global = Arc::try_unwrap(global_arc)
251 .expect("Global have multiples owners, but contexts already drop")
252 .take();
253 Ok(global)
254 }
255}