Struct Flow

Source
pub struct Flow<G>
where G: Sync + Send,
{ /* private fields */ }
Expand description

A Flow provided a interface to run Component’s in a defined order.

That order is defined by the Inputs and Outputs port’s of the component’s and the Connection’s between the Component’s.

The image bellow show the logic of Flow execution and when each Component will run.

Flow Execution Logic

The Flow run in cicles. In each cicle a Set of Component’s execute the run function defined in trait ComponentRunnable

The image shows a Flow that have 10 componets with 3 differents types: Red, Green and Blue, and:

  • Red components have only a Output port, wihtout Inputs
  • Green components have only a Input port, wihtout Outputs
  • Blue components have one Input and one Output port

For the next explication, we be consider that:

  • When Red run a Package is sent to your Output port.
  • When Green run consume all Package’s sended to your Input port.
  • When Blue run consume all Package’s sended to your Input port and send a Package to your Output port.

In the First cicle the Component’s 1 and 2 will the run. In fact every Component without a Input ports will executed once in the first cicle. Note that 1 have two Connection’s (to 3 and 5) and each one recieve a copy of the Package sent.

In the Second cicle the Component’s 3 and 4 will run, because both recieve a Package in your Input port. Note that 5 also recieve a Package but he is defined like Type::Eager, for that he is waiting for 4 to execute.

In the Third cicle th Component’s 5 and 8 run, because both have Packages in your Input port (5 sended by 1 and 4, 8 sended by 3), in this case 5 will run because 1,2 and 4 already run.

This logic will be repeated until there are no more components that can be executed. (read Type and Next).

Note that 8 will execute a second (in 5º cicle) time after recive a Package from 7

use tokio_test;
use rs_flow::prelude::*;

struct Total { 
   value: f64
}

#[inputs]
#[outputs { data }]
struct One;

#[async_trait]
impl ComponentRunnable for One {
    type Global = Total;
    async fn run(&self, ctx: &mut Ctx<Total>) -> Result<Next> {
        ctx.send(self.output("data"), 1.into());
        Ok(Next::Continue)
    }
}
 
#[inputs { a , b }]
#[outputs]
struct Sum;
 
#[async_trait]
impl ComponentRunnable for Sum {
    type Global = Total;
    async fn run(&self, ctx: &mut Ctx<Total>) -> Result<Next> {
        let a = ctx.receive(self.input("a")).unwrap().get_number()?;
        let b = ctx.receive(self.input("b")).unwrap().get_number()?;

        ctx.with_mut_global(|total| {
            total.value += a + b;
        })?;
 
        Ok(Next::Continue)
    }
}

tokio_test::block_on(async {
    let a = Component::new(1, One);
    let b = Component::new(2, One);
    let sum = Component::new(3, Sum);

    let connection_a = Connection::by(a.from("data"), sum.to("a"));
    let connection_b = Connection::by(b.from("data"), sum.to("b"));

    let total = Flow::new()
        .add_component(a).unwrap()
        .add_component(b).unwrap()
        .add_component(sum).unwrap()
        .add_connection(connection_a).unwrap()
        .add_connection(connection_b).unwrap()
        .run(Total { value: 0.0 }).await
        .unwrap();

    assert!(total.value == 2.0);
});
 

Implementations§

Source§

impl<G> Flow<G>
where G: Sync + Send + 'static,

Source

pub fn new() -> Self

Create a flow without components or connections

Source

pub fn add_component(self, component: Component<G>) -> Result<Self, FlowError>

Insert a Component

§Error

Error if the Component::id is already used

Source

pub fn add_connection(self, connection: Connection) -> Result<Self, FlowError>

Insert a Connection

§Error
Source

pub async fn run(&self, global: G) -> RunResult<G>

Run this Flow

§Error

Error if a component return a Error when run

§Panics

Panic if a component panic when run

Auto Trait Implementations§

§

impl<G> Freeze for Flow<G>

§

impl<G> !RefUnwindSafe for Flow<G>

§

impl<G> Send for Flow<G>

§

impl<G> Sync for Flow<G>

§

impl<G> Unpin for Flow<G>

§

impl<G> !UnwindSafe for Flow<G>

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.