rs_flow/component.rs
1use async_trait::async_trait;
2
3use crate::connection::Point;
4use crate::context::Ctx;
5use crate::error::RunResult as Result;
6use crate::ports::{Inputs, Outputs};
7
/// Tells the [Flow](crate::flow::Flow) whether its next cycle will be executed.
///
/// - If any component returns <code>Ok([Next::Break])</code>, the flow run is interrupted and returns `Ok(Global)`.
/// - If every component returns <code>Ok([Next::Continue])</code>, the flow keeps running for one more cycle.
/// - If any component returns <code>Err(_)</code>, the flow is interrupted and returns that error.
///
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum Next {
    /// Let the flow go on to another cycle. This is the default value.
    #[default]
    Continue,
    /// Ask the flow to stop running.
    Break,
}
20
21
///
/// Defines when a [Component] is prepared to run.
///
/// - [`Lazy`](Type::Lazy): waits until at least one [Package](crate::package::Package) has been
///   received at each input port.
///
/// - [`Eager`](Type::Eager):
///     - Waits until at least one [Package](crate::package::Package) has been received at each input port.
///     - Waits for all ancestor components to run: if any ancestor of this [Component] is
///       prepared to run, this [Component] will not run.
///
/// Note: a [Component] that has no [Inputs](crate::ports::Inputs) ports is selected as an entry
/// point of the flow and is executed once, in the first cycle.
///
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum Type {
    /// Ready as soon as every input port holds a package. This is the default value.
    #[default]
    Lazy,
    /// Ready only when every input port holds a package and no ancestor is still prepared to run.
    Eager,
}
41
///
/// Identifier of a component.
///
/// A plain alias for `usize`; each [Component] stores one so that it can be told apart
/// inside a [Flow](crate::flow::Flow).
///
pub type Id = usize;
46
47
48///
49/// Define the function that will excuted when a [Component] run
50///
51/// Global define tha data that this component can be access,
52/// the data is like a global state of Flow that any component can be read or write
53///
54/// A [Flow](crate::flow::Flow) hava a unique [Global](ComponentRunnable::Global) type, what means that only component
55/// with the same Self::Global can be use for contruct the flow.
56///
57/// # Examples
58/// ```
59/// use rs_flow::prelude::*;
60///
61/// struct GlobalA;
62/// struct GlobalB;
63///
64/// #[inputs]
65/// #[outputs]
66/// struct ComponentA;
67///
68/// #[async_trait]
69/// impl ComponentRunnable for ComponentA {
70/// type Global = GlobalA;
71/// async fn run(&self, ctx: &mut Ctx<Self::Global>) -> Result<Next> {
72/// Ok(Next::Continue)
73/// }
74/// }
75///
76/// #[inputs]
77/// #[outputs]
78/// struct ComponentB;
79///
80/// #[async_trait]
81/// impl ComponentRunnable for ComponentB {
82/// type Global = GlobalB;
83/// async fn run(&self, ctx: &mut Ctx<Self::Global>) -> Result<Next> {
84/// Ok(Next::Continue)
85/// }
86/// }
87///
88/// let mut flow = Flow::new();
89/// flow = flow.add_component(Component::new(1, ComponentA)).unwrap();
90///
91/// // flow = flow.add_component(Component::new(2, ComponentB)).unwrap();
92/// // Will fail because ComponentA and ComponentB not have same Global
93///
94/// ```
95///
96#[async_trait]
97pub trait ComponentRunnable: Send + Sync + Inputs + Outputs + 'static {
98 type Global: Send + Sync;
99
100 async fn run(&self, ctx: &mut Ctx<Self::Global>) -> Result<Next>;
101}
102
103
104///
105/// Storage the component infos:
106/// - [Id] that indentify a component in a [Flow](crate::flow::Flow),
107/// - [Type] of component
108/// - Traits needed to run ([ComponentRunnable] + [Inputs] + [Outputs])
109///
110///
111/// ```
112/// use rs_flow::prelude::*;
113///
114/// struct G;
115///
116/// #[outputs { out1 }]
117/// #[inputs { in1, in2 }]
118/// struct A;
119///
120/// assert_eq!(A.output("out1"), 0); // first output port
121/// assert_eq!(A.input("in1"), 0); // first input port
122/// assert_eq!(A.input("in2"), 1); // second input port
123///
124/// #[async_trait]
125/// impl ComponentRunnable for A {
126/// type Global = G;
127/// async fn run(&self, ctx: &mut Ctx<Self::Global>) -> Result<Next> {
128/// return Ok(Next::Continue);
129/// }
130/// }
131/// let component1 = Component::new(1, A); // Type::Lazy
132/// let component2 = Component::eager(2, A); // Type::Eeager
133///
134/// assert_eq!(component1.ty(), Type::Lazy);
135/// assert_eq!(component2.ty(), Type::Eager);
136///
137/// let c = Connection::by(component1.from("out1"), component2.to("in1"));
138/// assert_eq!(Connection::new(1, 0, 2, 0), c);
139///
140/// ```
141pub struct Component<G> {
142 pub(crate) id: Id,
143 pub(crate) data: Box<dyn ComponentRunnable<Global = G>>,
144 pub(crate) ty: Type
145}
146
147impl<G> Component<G> {
148 /// Create a component with Type::Lazy
149 pub fn new<T>(id: Id, data: T) -> Self
150 where T: ComponentRunnable<Global = G>
151 {
152 Self { id, data: Box::new(data), ty: Type::default() }
153 }
154 /// Create a component with Type::Eager
155 pub fn eager<T>(id: Id, data: T) -> Self
156 where T: ComponentRunnable<Global = G>
157 {
158 Self { id, data: Box::new(data), ty: Type::Eager }
159 }
160
161 /// Return id of component
162 pub fn id(&self) -> Id {
163 self.id
164 }
165
166 /// Return type of component
167 pub fn ty(&self) -> Type {
168 self.ty
169 }
170
171 /// Return a output point for connection
172 ///
173 /// # Panics
174 /// Panic if could not found a output port by a label
175 pub fn from(&self, label: &'static str) -> Point {
176 Point::new(self.id, self.data.output(label))
177 }
178
179 /// Return a input point for connection
180 ///
181 /// # Panics
182 /// Panic if could not found a input port by a label
183 pub fn to(&self, label: &'static str) -> Point {
184 Point::new(self.id, self.data.input(label))
185 }
186}