zenoh_flow/
traits.rs

//
// Copyright (c) 2021 - 2023 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
//

15use crate::prelude::{Inputs, Outputs};
16use crate::types::{Configuration, Context};
17use crate::Result;
18
19use async_trait::async_trait;
20use std::any::Any;
21
/// The `SendSyncAny` trait allows Zenoh-Flow to send data between nodes running in the same process
/// without serializing.
///
/// This trait is implemented for any type that has the `'static` lifetime and implements `Send`
/// and `Sync`. These constraints are the same as for the typed `Input` and `Output`, which means
/// that there is absolutely no need to manually implement it.
///
/// Note that the blanket implementation also applies to owning pointers such as
/// `Box<dyn SendSyncAny>`: calling `as_any` directly on such a box exposes the *box* as `dyn Any`.
/// Dereference first (`(*boxed).as_any()`) to reach the value it wraps.
pub trait SendSyncAny: Send + Sync {
    /// Exposes `self` as a shared `dyn Any`, e.g. to attempt a `downcast_ref`.
    fn as_any(&self) -> &dyn Any;

    /// Exposes `self` as an exclusive `dyn Any`, e.g. to attempt a `downcast_mut`.
    fn as_mut_any(&mut self) -> &mut dyn Any;
}

/// Blanket implementation: every `'static + Send + Sync` type is a `SendSyncAny`.
impl<T: 'static + Send + Sync> SendSyncAny for T {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn as_mut_any(&mut self) -> &mut dyn Any {
        self
    }
}
43
44/// The `Source` trait represents a Source of data in Zenoh Flow. Sources only possess `Outputs` and
45/// their purpose is to fetch data from the external world.
46///
47/// This trait takes an immutable reference to `self` so as to not impact performance. To keep a
48/// state and to mutate it, the interior mutability pattern is necessary.
49///
50/// A struct implementing the Source trait typically needs to keep a reference to the `Output` it
51/// needs.
52///
53/// ## Example
54///
55/// ```no_run
56/// use zenoh_flow::prelude::*;
57///
58/// // Use our provided macro to expose the symbol that Zenoh-Flow will look for when it will load
59/// // the shared library.
60/// #[export_source]
61/// pub struct MySource {
62///     output: Output<usize>,
63///     // The state could go in such structure.
64///     // state: Arc<Mutex<State>>,
65/// }
66///
67/// #[async_trait::async_trait]
68/// impl Source for MySource {
69///     async fn new(
70///         _context: Context,
71///         _configuration: Option<Configuration>,
72///         mut outputs: Outputs,
73///     ) -> Result<Self> {
74///         let output = outputs
75///             .take("out")
76///             .expect("No output called 'out' found")
77///             .typed(|buffer, data| todo!("Provide your serializer here"));
78///             
79///         Ok(Self { output })
80///     }
81/// }
82///
83/// #[async_trait::async_trait]
84/// impl Node for MySource {
85///     async fn iteration(&self) -> Result<()> {
86///         // To mutate the state, first lock it.
87///         //
88///         // let state = self.state.lock().await;
89///         //
90///         // The state is a way for the Source to read information from the external world, i.e.,
91///         // interacting with I/O devices. We mimick an asynchronous iteraction with a sleep.
92///         async_std::task::sleep(std::time::Duration::from_secs(1)).await;
93///
94///         // self.output.send(10usize, None).await?;
95///         Ok(())
96///     }
97/// }
98/// ```
99#[async_trait]
100pub trait Source: Node + Send + Sync {
101    /// For a `Context`, a `Configuration` and a set of `Outputs`, produce a new *Source*.
102    ///
103    /// Sources only possess `Outputs` and their purpose is to fetch data from the external world.
104    ///
105    /// Sources are **started last** when initiating a data flow. This is to prevent data loss: if a
106    /// Source is started before its downstream nodes then the data it would send before said
107    /// downstream nodes are up would be lost.
108    async fn new(
109        context: Context,
110        configuration: Option<Configuration>,
111        outputs: Outputs,
112    ) -> Result<Self>
113    where
114        Self: Sized;
115}
116
117/// The `Sink` trait represents a Sink of data in Zenoh Flow.
118///
119/// Sinks only possess `Inputs`, their objective is to send the result of the computations to the
120/// external world.
121///
122/// This trait takes an immutable reference to `self` so as to not impact performance. To keep a
123/// state and to mutate it, the interior mutability pattern is necessary.
124///
125/// A struct implementing the Sink trait typically needs to keep a reference to the `Input` it
126/// needs.
127///
128/// ## Example
129///
130/// ```no_run
131/// use async_trait::async_trait;
132/// use zenoh_flow::prelude::*;
133///
134/// // Use our provided macro to expose the symbol that Zenoh-Flow will look for when it will load
135/// // the shared library.
136/// #[export_sink]
137/// struct GenericSink {
138///     input: Input<usize>,
139/// }
140///
141/// #[async_trait]
142/// impl Sink for GenericSink {
143///     async fn new(
144///         _context: Context,
145///         _configuration: Option<Configuration>,
146///         mut inputs: Inputs,
147///     ) -> Result<Self> {
148///         let input = inputs
149///             .take("in")
150///             .expect("No input called 'in' found")
151///             .typed(|bytes| todo!("Provide your deserializer here"));
152///
153///         Ok(GenericSink { input })
154///     }
155/// }
156///
157/// #[async_trait]
158/// impl Node for GenericSink {
159///     async fn iteration(&self) -> Result<()> {
160///         let (message, _timestamp) = self.input.recv().await?;
161///         match message {
162///             Message::Data(t) => println!("{}", *t),
163///             Message::Watermark => println!("Watermark"),
164///         }
165///
166///         Ok(())
167///     }
168/// }
169/// ```
170#[async_trait]
171pub trait Sink: Node + Send + Sync {
172    /// For a `Context`, a `Configuration` and a set of `Inputs`, produce a new **Sink**.
173    ///
174    /// Sinks only possess `Inputs`, their objective is to send the result of the computations to the
175    /// external world.
176    ///
177    /// Sinks are **started first** when initiating a data flow. As they are at the end of the chain of
178    /// computations, by starting them first we ensure that no data is lost.
179    async fn new(
180        context: Context,
181        configuration: Option<Configuration>,
182        inputs: Inputs,
183    ) -> Result<Self>
184    where
185        Self: Sized;
186}
187
188/// The `Operator` trait represents an Operator inside Zenoh-Flow.
189///
190/// Operators are at the heart of a data flow, they carry out computations on the data they receive
191/// before sending them out to the next downstream node.
192///
193/// This trait takes an immutable reference to `self` so as to not impact performance. To keep a
194/// state and to mutate it, the interior mutability pattern is necessary.
195///
196/// A struct implementing the Operator trait typically needs to keep a reference to the `Input` and
197/// `Output` it needs.
198///
199/// ## Example
200///
201/// ```no_run
202/// use async_trait::async_trait;
203/// use zenoh_flow::prelude::*;
204///
205/// // Use our provided macro to expose the symbol that Zenoh-Flow will look for when it will load
206/// // the shared library.
207/// #[export_operator]
208/// struct NoOp {
209///     input: Input<usize>,
210///     output: Output<usize>,
211/// }
212///
213/// #[async_trait]
214/// impl Operator for NoOp {
215///     async fn new(
216///         _context: Context,
217///         _configuration: Option<Configuration>,
218///         mut inputs: Inputs,
219///         mut outputs: Outputs,
220///     ) -> Result<Self> {
221///         Ok(NoOp {
222///             input: inputs
223///                 .take("in")
224///                 .expect("No input called 'in' found")
225///                 .typed(|bytes| todo!("Provide your deserializer here")),
226///             output: outputs
227///                 .take("out")
228///                 .expect("No output called 'out' found")
229///                 .typed(|buffer, data| todo!("Provide your serializer here")),
230///         })
231///     }
232/// }
233/// #[async_trait]
234/// impl Node for NoOp {
235///     async fn iteration(&self) -> Result<()> {
236///         let (message, _timestamp) = self.input.recv().await?;
237///         match message {
238///             Message::Data(t) => self.output.send(*t, None).await?,
239///             Message::Watermark => println!("Watermark"),
240///         }
241///         Ok(())
242///     }
243/// }
244/// ```
245#[async_trait]
246pub trait Operator: Node + Send + Sync {
247    /// For a `Context`, a `Configuration`, a set of `Inputs` and `Outputs`, produce a new
248    /// **Operator**.
249    ///
250    /// Operators are at the heart of a data flow, they carry out computations on the data they
251    /// receive before sending them out to the next downstream node.
252    ///
253    /// The Operators are started *before the Sources* such that they are active before the first
254    /// data are produced.
255    async fn new(
256        context: Context,
257        configuration: Option<Configuration>,
258        inputs: Inputs,
259        outputs: Outputs,
260    ) -> Result<Self>
261    where
262        Self: Sized;
263}
264
265/// A `Node` is defined by its `iteration` that is repeatedly called by Zenoh-Flow.
266///
267/// This trait takes an immutable reference to `self` so as to not impact performance. To keep a
268/// state and to mutate it, the interior mutability pattern is necessary.
269///
270/// A struct implementing the Node trait typically needs to keep a reference to the `Input` and
271/// `Output` it needs.
272///
273/// For usage examples see: [`Operator`](`Operator`), [`Source`](`Source`) or [`Sink`](`Sink`)
274/// traits.
275#[async_trait]
276pub trait Node: Send + Sync {
277    async fn iteration(&self) -> Result<()>;
278}