pub struct Flow<G>{ /* private fields */ }
A Flow provides an interface to run Components in a defined order.
That order is defined by the Input and Output ports of the Components and by the Connections between them.
The image below shows the logic of Flow execution and when each Component will run.
The Flow runs in cycles. In each cycle, a set of Components executes the run function defined in the trait ComponentRunnable.
The image shows a Flow that has 10 components of 3 different types: Red, Green and Blue, where:
- Red components have only an Output port, without Inputs
- Green components have only an Input port, without Outputs
- Blue components have one Input and one Output port
For the following explanation, consider that:
- When Red runs, it sends a Package to its Output port.
- When Green runs, it consumes all Packages sent to its Input port.
- When Blue runs, it consumes all Packages sent to its Input port and sends a Package to its Output port.
In the first cycle, Components 1 and 2 will run. In fact, every Component without Input ports is executed once in the first cycle.
Note that 1 has two Connections (to 3 and 5), and each one receives a copy of the Package sent.
In the second cycle, Components 3 and 4 will run, because both received a Package on their Input port.
Note that 5 also received a Package, but it is defined as Type::Eager, so it waits for 4 to execute.
In the third cycle, Components 5 and 8 run, because both have Packages on their Input port (5 sent by 1 and 4, 8 sent by 3). In this case 5 will run because 1, 2 and 4 have already run.
This logic is repeated until there are no more components that can be executed (see Type and Next).
Note that 8 will execute a second time (in the 5th cycle) after receiving a Package from 7.
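The first three cycles described above can be reproduced with a small, self-contained simulation. This is only a sketch of the scheduling rule as the text describes it, not the code used by rs_flow: `ToyFlow`, `Mode` and `figure_subset` are hypothetical names, only components 1, 2, 3, 4, 5 and 8 are modelled, and the connection from 2 to 4 is an assumption inferred from the description (the text only says that 4 receives a Package in the second cycle).

```rust
use std::collections::{BTreeMap, BTreeSet};

/// Hypothetical stand-in for the scheduling mode of a component.
#[derive(Clone, Copy)]
enum Mode {
    /// Runs as soon as a package is waiting on an input port.
    Lazy,
    /// Additionally waits until every upstream component has run.
    Eager,
}

/// Toy graph: `edges` holds (from, to) pairs, `modes` lists every component.
struct ToyFlow {
    edges: Vec<(u32, u32)>,
    modes: BTreeMap<u32, Mode>,
}

impl ToyFlow {
    /// All components from which `id` can be reached.
    fn ancestors(&self, id: u32) -> BTreeSet<u32> {
        let mut seen = BTreeSet::new();
        let mut stack = vec![id];
        while let Some(node) = stack.pop() {
            for &(from, to) in &self.edges {
                if to == node && seen.insert(from) {
                    stack.push(from);
                }
            }
        }
        seen
    }

    /// Returns the set of components that run in each cycle.
    fn cycles(&self) -> Vec<BTreeSet<u32>> {
        let mut pending = BTreeSet::new(); // components with a package waiting
        let mut done = BTreeSet::new(); // components that have run at least once
        let mut result = Vec::new();

        // First cycle: every component without an incoming connection.
        let mut ready: BTreeSet<u32> = self
            .modes
            .keys()
            .copied()
            .filter(|id| self.edges.iter().all(|&(_, to)| to != *id))
            .collect();

        while !ready.is_empty() {
            // Components that run consume the packages waiting for them...
            for &id in &ready {
                pending.remove(&id);
            }
            // ...and send one package along each outgoing connection.
            for &id in &ready {
                done.insert(id);
                for &(from, to) in &self.edges {
                    if from == id {
                        pending.insert(to);
                    }
                }
            }
            result.push(ready);

            // Next cycle: lazy components with a package run immediately,
            // eager ones only once all of their ancestors have run.
            ready = pending
                .iter()
                .copied()
                .filter(|&id| match self.modes[&id] {
                    Mode::Lazy => true,
                    Mode::Eager => self.ancestors(id).is_subset(&done),
                })
                .collect();
        }
        result
    }
}

/// The part of the figure that the text describes explicitly.
/// The connection 2 -> 4 is an assumption.
fn figure_subset() -> ToyFlow {
    let mut modes = BTreeMap::new();
    for id in [1, 2, 3, 4, 8] {
        modes.insert(id, Mode::Lazy);
    }
    modes.insert(5, Mode::Eager);
    ToyFlow {
        edges: vec![(1, 3), (1, 5), (2, 4), (4, 5), (3, 8)],
        modes,
    }
}
```

Calling `figure_subset().cycles()` yields the sets {1, 2}, {3, 4} and {5, 8}, matching the first, second and third cycles in the walkthrough. The model terminates only because the toy graph has no loops.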
use tokio_test;
use rs_flow::prelude::*;

struct Total {
    value: f64
}

#[inputs]
#[outputs { data }]
struct One;

#[async_trait]
impl ComponentRunnable for One {
    type Global = Total;
    async fn run(&self, ctx: &mut Ctx<Total>) -> Result<Next> {
        ctx.send(self.output("data"), 1.into());
        Ok(Next::Continue)
    }
}

#[inputs { a , b }]
#[outputs]
struct Sum;

#[async_trait]
impl ComponentRunnable for Sum {
    type Global = Total;
    async fn run(&self, ctx: &mut Ctx<Total>) -> Result<Next> {
        let a = ctx.receive(self.input("a")).unwrap().get_number()?;
        let b = ctx.receive(self.input("b")).unwrap().get_number()?;
        ctx.with_mut_global(|total| {
            total.value += a + b;
        })?;
        Ok(Next::Continue)
    }
}

tokio_test::block_on(async {
    let a = Component::new(1, One);
    let b = Component::new(2, One);
    let sum = Component::new(3, Sum);
    let connection_a = Connection::by(a.from("data"), sum.to("a"));
    let connection_b = Connection::by(b.from("data"), sum.to("b"));
    let total = Flow::new()
        .add_component(a).unwrap()
        .add_component(b).unwrap()
        .add_component(sum).unwrap()
        .add_connection(connection_a).unwrap()
        .add_connection(connection_b).unwrap()
        .run(Total { value: 0.0 }).await
        .unwrap();
    assert!(total.value == 2.0);
});
Implementations
impl<G> Flow<G>
pub fn add_component(self, component: Component<G>) -> Result<Self, FlowError>
pub fn add_connection(self, connection: Connection) -> Result<Self, FlowError>
Inserts a Connection.
Error
- Returns an error if the Connection already exists
- Returns an error if this Flow does not contain a Component with the id used in the Connection
- Returns an error if the Components used in the Connection do not have the Input/Output port defined
- Returns an error if adding the connection would create a loop
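The loop condition can be checked with a reachability search before a connection is accepted. The following is a minimal sketch of that idea. It is not taken from rs_flow, and `would_create_loop` is a hypothetical helper that works on plain `(from, to)` component-id pairs rather than on Connection values.

```rust
use std::collections::{HashMap, HashSet};

/// Returns true if adding the edge `from -> to` would close a loop,
/// i.e. if `from` is already reachable from `to` (or both are the same node).
fn would_create_loop(edges: &[(u32, u32)], from: u32, to: u32) -> bool {
    // Build an adjacency list of the existing connections.
    let mut next: HashMap<u32, Vec<u32>> = HashMap::new();
    for &(a, b) in edges {
        next.entry(a).or_default().push(b);
    }

    // Depth-first search starting at the target of the new edge.
    let mut seen = HashSet::new();
    let mut stack = vec![to];
    while let Some(node) = stack.pop() {
        if node == from {
            return true;
        }
        if seen.insert(node) {
            if let Some(children) = next.get(&node) {
                stack.extend(children.iter().copied());
            }
        }
    }
    false
}
```

With the existing connections 1 -> 2 and 2 -> 3, a new connection 3 -> 1 would be rejected by this check, while 1 -> 3 would be accepted.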