rs_flow/
flow.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use crate::component::Next;
5use crate::connection::{Connection, Connections};
6use crate::context::global::Global;
7use crate::context::Ctxs;
8use crate::error::{FlowError, Result, RunResult};
9use crate::prelude::{Component, Id};
10
11
12///
13/// A Flow provided a interface to run [Component]'s in a defined order.
14/// 
15/// That order is defined by the [Inputs](crate::ports::Inputs) and 
16/// [Outputs](crate::ports::Outputs) port's of the component's and the 
17/// [Connection]'s between the [Component]'s.
18/// 
19/// The image bellow show the logic of [Flow] execution and when each [Component] will run.
20/// 
21/// <img src="https://github.com/Adlizm/rs-flow/raw/main/assets/flow-execution.svg" alt="Flow Execution Logic"/>
22/// 
23/// The Flow run in cicles. In each cicle a Set of Component's execute
24/// the `run` function defined in trait [ComponentRunnable](crate::component::ComponentRunnable)
25/// 
26/// The image shows a Flow that have 10 componets with 3 differents types: Red, Green and Blue, and:
27///  - Red components have only a Output port, wihtout Inputs
28///  - Green components have only a Input port, wihtout Outputs
29///  - Blue components have one Input and one Output port
30/// 
31/// For the next explication, we be consider that:
32///  - When Red `run` a [Package](crate::package::Package) is sent to your Output port.
33///  - When Green `run` consume all [Package](crate::package::Package)'s sended to your Input port.
34///  - When Blue `run` consume all [Package](crate::package::Package)'s sended to your Input port and send a Package to your Output port.
35/// 
36/// In the First cicle the Component's `1` and `2` will the run. In fact every [Component]
37/// without a [Input](crate::ports::Inputs) ports will executed once in the first cicle. 
38/// Note that `1` have two [Connection]'s (to `3` and `5`) and each one recieve a copy of the [Package](crate::package::Package) sent.  
39/// 
40/// In the Second cicle the Component's `3` and `4` will run, because both recieve a 
41/// [Package](crate::package::Package) in your [Input](crate::ports::Inputs) port.
42/// Note that `5` also recieve a Package but he is defined like [Type::Eager](crate::component::Type::Eager),
43/// for that he is waiting for `4` to execute.
44/// 
45/// In the Third cicle th Component's `5` and `8` run, because both have Packages in your
46/// Input port (`5` sended by `1` and `4`, `8` sended by `3`), in this case `5` will run
47/// because `1`,`2` and `4` already run. 
48/// 
49/// This logic will be repeated until there are no more components that can be executed.
50/// (read [Type](crate::component::Type) and [Next]). 
51/// 
52/// Note that `8` will execute a second (in 5ยบ cicle) time after recive a Package from `7`
53/// 
54/// 
55/// ```
56/// use tokio_test;
57/// use rs_flow::prelude::*;
58///
59/// struct Total { 
60///    value: f64
61/// }
62///
63/// #[inputs]
64/// #[outputs { data }]
65/// struct One;
66///
67/// #[async_trait]
68/// impl ComponentRunnable for One {
69///     type Global = Total;
70///     async fn run(&self, ctx: &mut Ctx<Total>) -> Result<Next> {
71///         ctx.send(self.output("data"), 1.into());
72///         Ok(Next::Continue)
73///     }
74/// }
75/// 
76/// #[inputs { a , b }]
77/// #[outputs]
78/// struct Sum;
79/// 
80/// #[async_trait]
81/// impl ComponentRunnable for Sum {
82///     type Global = Total;
83///     async fn run(&self, ctx: &mut Ctx<Total>) -> Result<Next> {
84///         let a = ctx.receive(self.input("a")).unwrap().get_number()?;
85///         let b = ctx.receive(self.input("b")).unwrap().get_number()?;
86///
87///         ctx.with_mut_global(|total| {
88///             total.value += a + b;
89///         })?;
90/// 
91///         Ok(Next::Continue)
92///     }
93/// }
94///
95/// tokio_test::block_on(async {
96///     let a = Component::new(1, One);
97///     let b = Component::new(2, One);
98///     let sum = Component::new(3, Sum);
99///
100///     let connection_a = Connection::by(a.from("data"), sum.to("a"));
101///     let connection_b = Connection::by(b.from("data"), sum.to("b"));
102///
103///     let total = Flow::new()
104///         .add_component(a).unwrap()
105///         .add_component(b).unwrap()
106///         .add_component(sum).unwrap()
107///         .add_connection(connection_a).unwrap()
108///         .add_connection(connection_b).unwrap()
109///         .run(Total { value: 0.0 }).await
110///         .unwrap();
111///
112///     assert!(total.value == 2.0);
113/// });
114/// 
115/// ```
116/// 
117pub struct Flow<G> 
118    where G: Sync + Send
119{
120    components: HashMap<Id, Component<G>>,
121    connections: Connections,
122}
123
124
125impl<G> Flow<G> 
126    where G: Sync + Send + 'static
127{
128    /// Create a flow without components or connections
129    pub fn new() -> Self {
130        Self {
131            components: HashMap::new(),
132            connections: Connections::new(),
133        }
134    }
135
136    /// Insert a [Component]
137    /// 
138    /// # Error
139    /// 
140    /// Error if the [Component::id] is already used
141    pub fn add_component(mut self, component: Component<G>) -> Result<Self> {
142        if self.components.contains_key(&component.id) {
143            return Err(FlowError::ComponentAlreadyExist { id: component.id }.into());
144        }
145        self.components.insert(component.id ,component);
146        Ok(self)
147    }
148
149    /// Insert a [Connection]
150    /// 
151    /// # Error
152    /// 
153    /// - Error if [Connection] already exist
154    /// - Error if the this [Flow] not have a [Component::id] used in [Connection]
155    /// - Error if the [Component]'s used in [Connection] not have the Input/Output [Port](crate::ports::Port) defined.
156    /// - Error if add a connection create a Loop 
157    pub fn add_connection(mut self, connection: Connection) -> Result<Self> {
158        if let Some(component) = self.components.get(&connection.from) {
159            if !component.data.outputs().contains(connection.out_port)
160            {
161                return Err(FlowError::OutPortNotFound {
162                    component: connection.from,
163                    out_port: connection.out_port,
164                }
165                .into());
166            }
167        } else {
168            return Err(FlowError::ComponentNotFound {
169                id: connection.from,
170            }
171            .into());
172        }
173
174        if let Some(component) = self.components.get(&connection.to){
175            if !component.data.inputs().contains(connection.in_port)
176            {
177                return Err(FlowError::InPortNotFound {
178                    component: connection.from,
179                    in_port: connection.in_port,
180                }
181                .into());
182            }
183        } else {
184            return Err(FlowError::ComponentNotFound { id: connection.to }.into());
185        }
186
187        self.connections.add(connection)?;
188
189        Ok(self)
190    }
191
192    /// 
193    /// Run this Flow 
194    /// 
195    /// # Error
196    /// 
197    /// Error if a component return a Error when [run](crate::component::ComponentRunnable::run)
198    /// 
199    /// # Panics
200    /// 
201    /// Panic if a component panic when [run](crate::component::ComponentRunnable::run)
202    /// 
203    pub async fn run(&self, global: G) -> RunResult<G> {
204        let global_arc = Arc::new(Global::from_data(global));
205        
206        let mut contexts = Ctxs::new(&self.components, &self.connections, &global_arc);
207
208        let mut ready_components = contexts.entry_points();
209        let mut first = true;
210
211        while !ready_components.is_empty() {
212            let mut futures = Vec::with_capacity(ready_components.len());
213
214            for id in ready_components {
215                let mut ctx = contexts.borrow(id)
216                    .expect("Ready component never return ids that not exist");
217
218                ctx.consumed = false;
219
220                let component = self.components.get(&id)
221                    .expect("Ready component never return ids that not exist");
222
223                futures.push(async move {
224                    component.data.run(&mut ctx).await
225                        .map(|next| (ctx, next))
226                });
227            }
228
229            let results = futures::future::try_join_all(futures).await?;
230            if results.iter().any(|(_, next)| next == &Next::Break) {
231                break;
232            }
233
234            for (ctx, _) in results {
235                if !ctx.consumed && !first { // entry points not have inputs to consume
236                    return Err(Box::new(FlowError::AnyPackageConsumed { component: ctx.id }));
237                }
238                contexts.give_back(ctx);
239            }
240
241            contexts.refresh_queues();
242
243            ready_components = contexts.ready_components(&self.connections);
244
245            first = false;
246        }
247        
248        drop(contexts);
249        
250        let global = Arc::try_unwrap(global_arc)
251            .expect("Global have multiples owners, but contexts already drop")
252            .take();
253        Ok(global)
254    }
255}