zenoh_flow/traits.rs
1//
2// Copyright (c) 2021 - 2023 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14
15use crate::prelude::{Inputs, Outputs};
16use crate::types::{Configuration, Context};
17use crate::Result;
18
19use async_trait::async_trait;
20use std::any::Any;
21
22/// The `SendSyncAny` trait allows Zenoh-Flow to send data between nodes running in the same process
23/// without serializing.
24///
25/// This trait is implemented for any type that has the `static` lifetime and implements `Send` and
26/// `Sync`. These constraints are the same than for the typed `Input` and `Output` which means that
27/// there is absolutely no need to manually implement it.
28pub trait SendSyncAny: Send + Sync {
29 fn as_any(&self) -> &dyn Any;
30
31 fn as_mut_any(&mut self) -> &mut dyn Any;
32}
33
34impl<T: 'static + Send + Sync> SendSyncAny for T {
35 fn as_any(&self) -> &dyn Any {
36 self
37 }
38
39 fn as_mut_any(&mut self) -> &mut dyn Any {
40 self
41 }
42}
43
44/// The `Source` trait represents a Source of data in Zenoh Flow. Sources only possess `Outputs` and
45/// their purpose is to fetch data from the external world.
46///
47/// This trait takes an immutable reference to `self` so as to not impact performance. To keep a
48/// state and to mutate it, the interior mutability pattern is necessary.
49///
50/// A struct implementing the Source trait typically needs to keep a reference to the `Output` it
51/// needs.
52///
53/// ## Example
54///
55/// ```no_run
56/// use zenoh_flow::prelude::*;
57///
58/// // Use our provided macro to expose the symbol that Zenoh-Flow will look for when it will load
59/// // the shared library.
60/// #[export_source]
61/// pub struct MySource {
62/// output: Output<usize>,
63/// // The state could go in such structure.
64/// // state: Arc<Mutex<State>>,
65/// }
66///
67/// #[async_trait::async_trait]
68/// impl Source for MySource {
69/// async fn new(
70/// _context: Context,
71/// _configuration: Option<Configuration>,
72/// mut outputs: Outputs,
73/// ) -> Result<Self> {
74/// let output = outputs
75/// .take("out")
76/// .expect("No output called 'out' found")
77/// .typed(|buffer, data| todo!("Provide your serializer here"));
78///
79/// Ok(Self { output })
80/// }
81/// }
82///
83/// #[async_trait::async_trait]
84/// impl Node for MySource {
85/// async fn iteration(&self) -> Result<()> {
86/// // To mutate the state, first lock it.
87/// //
88/// // let state = self.state.lock().await;
89/// //
90/// // The state is a way for the Source to read information from the external world, i.e.,
91/// // interacting with I/O devices. We mimick an asynchronous iteraction with a sleep.
92/// async_std::task::sleep(std::time::Duration::from_secs(1)).await;
93///
94/// // self.output.send(10usize, None).await?;
95/// Ok(())
96/// }
97/// }
98/// ```
99#[async_trait]
100pub trait Source: Node + Send + Sync {
101 /// For a `Context`, a `Configuration` and a set of `Outputs`, produce a new *Source*.
102 ///
103 /// Sources only possess `Outputs` and their purpose is to fetch data from the external world.
104 ///
105 /// Sources are **started last** when initiating a data flow. This is to prevent data loss: if a
106 /// Source is started before its downstream nodes then the data it would send before said
107 /// downstream nodes are up would be lost.
108 async fn new(
109 context: Context,
110 configuration: Option<Configuration>,
111 outputs: Outputs,
112 ) -> Result<Self>
113 where
114 Self: Sized;
115}
116
117/// The `Sink` trait represents a Sink of data in Zenoh Flow.
118///
119/// Sinks only possess `Inputs`, their objective is to send the result of the computations to the
120/// external world.
121///
122/// This trait takes an immutable reference to `self` so as to not impact performance. To keep a
123/// state and to mutate it, the interior mutability pattern is necessary.
124///
125/// A struct implementing the Sink trait typically needs to keep a reference to the `Input` it
126/// needs.
127///
128/// ## Example
129///
130/// ```no_run
131/// use async_trait::async_trait;
132/// use zenoh_flow::prelude::*;
133///
134/// // Use our provided macro to expose the symbol that Zenoh-Flow will look for when it will load
135/// // the shared library.
136/// #[export_sink]
137/// struct GenericSink {
138/// input: Input<usize>,
139/// }
140///
141/// #[async_trait]
142/// impl Sink for GenericSink {
143/// async fn new(
144/// _context: Context,
145/// _configuration: Option<Configuration>,
146/// mut inputs: Inputs,
147/// ) -> Result<Self> {
148/// let input = inputs
149/// .take("in")
150/// .expect("No input called 'in' found")
151/// .typed(|bytes| todo!("Provide your deserializer here"));
152///
153/// Ok(GenericSink { input })
154/// }
155/// }
156///
157/// #[async_trait]
158/// impl Node for GenericSink {
159/// async fn iteration(&self) -> Result<()> {
160/// let (message, _timestamp) = self.input.recv().await?;
161/// match message {
162/// Message::Data(t) => println!("{}", *t),
163/// Message::Watermark => println!("Watermark"),
164/// }
165///
166/// Ok(())
167/// }
168/// }
169/// ```
170#[async_trait]
171pub trait Sink: Node + Send + Sync {
172 /// For a `Context`, a `Configuration` and a set of `Inputs`, produce a new **Sink**.
173 ///
174 /// Sinks only possess `Inputs`, their objective is to send the result of the computations to the
175 /// external world.
176 ///
177 /// Sinks are **started first** when initiating a data flow. As they are at the end of the chain of
178 /// computations, by starting them first we ensure that no data is lost.
179 async fn new(
180 context: Context,
181 configuration: Option<Configuration>,
182 inputs: Inputs,
183 ) -> Result<Self>
184 where
185 Self: Sized;
186}
187
188/// The `Operator` trait represents an Operator inside Zenoh-Flow.
189///
190/// Operators are at the heart of a data flow, they carry out computations on the data they receive
191/// before sending them out to the next downstream node.
192///
193/// This trait takes an immutable reference to `self` so as to not impact performance. To keep a
194/// state and to mutate it, the interior mutability pattern is necessary.
195///
196/// A struct implementing the Operator trait typically needs to keep a reference to the `Input` and
197/// `Output` it needs.
198///
199/// ## Example
200///
201/// ```no_run
202/// use async_trait::async_trait;
203/// use zenoh_flow::prelude::*;
204///
205/// // Use our provided macro to expose the symbol that Zenoh-Flow will look for when it will load
206/// // the shared library.
207/// #[export_operator]
208/// struct NoOp {
209/// input: Input<usize>,
210/// output: Output<usize>,
211/// }
212///
213/// #[async_trait]
214/// impl Operator for NoOp {
215/// async fn new(
216/// _context: Context,
217/// _configuration: Option<Configuration>,
218/// mut inputs: Inputs,
219/// mut outputs: Outputs,
220/// ) -> Result<Self> {
221/// Ok(NoOp {
222/// input: inputs
223/// .take("in")
224/// .expect("No input called 'in' found")
225/// .typed(|bytes| todo!("Provide your deserializer here")),
226/// output: outputs
227/// .take("out")
228/// .expect("No output called 'out' found")
229/// .typed(|buffer, data| todo!("Provide your serializer here")),
230/// })
231/// }
232/// }
233/// #[async_trait]
234/// impl Node for NoOp {
235/// async fn iteration(&self) -> Result<()> {
236/// let (message, _timestamp) = self.input.recv().await?;
237/// match message {
238/// Message::Data(t) => self.output.send(*t, None).await?,
239/// Message::Watermark => println!("Watermark"),
240/// }
241/// Ok(())
242/// }
243/// }
244/// ```
245#[async_trait]
246pub trait Operator: Node + Send + Sync {
247 /// For a `Context`, a `Configuration`, a set of `Inputs` and `Outputs`, produce a new
248 /// **Operator**.
249 ///
250 /// Operators are at the heart of a data flow, they carry out computations on the data they
251 /// receive before sending them out to the next downstream node.
252 ///
253 /// The Operators are started *before the Sources* such that they are active before the first
254 /// data are produced.
255 async fn new(
256 context: Context,
257 configuration: Option<Configuration>,
258 inputs: Inputs,
259 outputs: Outputs,
260 ) -> Result<Self>
261 where
262 Self: Sized;
263}
264
265/// A `Node` is defined by its `iteration` that is repeatedly called by Zenoh-Flow.
266///
267/// This trait takes an immutable reference to `self` so as to not impact performance. To keep a
268/// state and to mutate it, the interior mutability pattern is necessary.
269///
270/// A struct implementing the Node trait typically needs to keep a reference to the `Input` and
271/// `Output` it needs.
272///
273/// For usage examples see: [`Operator`](`Operator`), [`Source`](`Source`) or [`Sink`](`Sink`)
274/// traits.
275#[async_trait]
276pub trait Node: Send + Sync {
277 async fn iteration(&self) -> Result<()>;
278}