1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275
//
// Copyright (c) 2021 - 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
//
use crate::context::Context;
use crate::io::{Inputs, Outputs};
use async_trait::async_trait;
use std::any::Any;
use zenoh_flow_commons::{Configuration, Result};
/// The `SendSyncAny` trait allows Zenoh-Flow to send data between nodes running in the same process without
/// serialising.
///
/// It is implemented for every type that is `'static`, [Send] and [Sync] through the blanket implementation that
/// follows. These are the same constraints as for the typed [Input](crate::prelude::Input) and typed
/// [Output](crate::prelude::Output), which means that there should be no need to implement it manually.
pub trait SendSyncAny: Send + Sync {
    /// Exposes `self` as a shared [Any] reference so that it can be downcast back to its concrete type.
    fn as_any(&self) -> &dyn Any;

    /// Exposes `self` as an exclusive [Any] reference so that it can be downcast back to its concrete type.
    fn as_mut_any(&mut self) -> &mut dyn Any;
}

// Blanket implementation: any `'static + Send + Sync` type is a `SendSyncAny`. Both accessors simply coerce the
// reference to `self` into a reference to `dyn Any`.
impl<T> SendSyncAny for T
where
    T: 'static + Send + Sync,
{
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn as_mut_any(&mut self) -> &mut dyn Any {
        self
    }
}
/// The `Node` trait, shared among all node types, dictates how a node *runs*.
///
/// # `Iteration`
///
/// The central method, for which there is no default implementation, is [iteration](Node::iteration()). This method
/// is called, in a loop, by the Zenoh-Flow runtime managing the node.
///
/// Methods in this trait take an immutable reference to `self` so as to not impact performance. To keep a state and to
/// mutate it, the [interior mutability](https://doc.rust-lang.org/reference/interior-mutability.html) pattern is
/// necessary.
///
/// For usage examples see the [Operator](crate::prelude::Operator), [Source](crate::prelude::Source) or
/// [Sink](crate::prelude::Sink) traits.
///
/// # Additional hooks: `on_resume`, `on_abort`
///
/// It is possible to define specific code that the Zenoh-Flow runtime should run *before* the node is aborted and
/// *before* it is resumed.
///
/// Note that the `on_resume` hook is only run once the node has been aborted. It is not run when it is created.
///
/// A default, empty implementation is provided for both hooks.
#[async_trait]
pub trait Node: Send + Sync {
/// The code a Zenoh-Flow runtime will execute in a loop.
///
/// A typical workflow would be to wait for all or a subset of the [Input(s)](crate::prelude::Input) to be ready,
/// perform some computation and finally forward the result(s) downstream on the
/// [Output(s)](crate::prelude::Output).
async fn iteration(&self) -> Result<()>;
/// Custom code that Zenoh-Flow will run *before* re-starting a node that was previously aborted.
///
/// The code to correctly manage the state of a node should go there. This hook is for instance leveraged within
/// the implementation of the Zenoh Source built-in node.
///
/// The default implementation returns `Ok(())`.
///
/// # Performance
///
/// This method is only called when the node is restarted. Hence, its impact is limited and does not affect the
/// normal execution of a node.
async fn on_resume(&self) -> Result<()> {
Ok(())
}
/// Custom code that Zenoh-Flow will run *before* aborting a node.
///
/// Unlike [on_resume](Node::on_resume()), this hook does not return a `Result`: it cannot report an error through
/// its return value.
///
/// The default implementation does nothing.
async fn on_abort(&self) {}
}
/// A `Source` feeds data into a data flow.
///
/// A `Source` only possesses `Output` (either [typed](crate::prelude::Output) or [raw](crate::prelude::OutputRaw)) as
/// it does not receive any data from upstream nodes but from "outside" the data flow.
///
/// A structure implementing the `Source` trait typically needs to keep a reference to the `Output`.
///
/// ## Example
///
/// ```no_run
/// use async_trait::async_trait;
/// use zenoh_flow_nodes::prelude::*;
///
/// // Use our provided macro to expose the symbol that Zenoh-Flow will look for when it loads
/// // the shared library.
/// #[export_source]
/// pub struct MySource {
///     output: Output<usize>,
///     // The state could go in such a structure.
///     // state: Arc<Mutex<State>>,
/// }
///
/// #[async_trait]
/// impl Source for MySource {
///     async fn new(
///         _context: Context,
///         _configuration: Configuration,
///         mut outputs: Outputs,
///     ) -> Result<Self> {
///         let output = outputs
///             .take("out")
///             .expect("No output called 'out' found")
///             .typed(|buffer, data| todo!("Provide your serialiser here"));
///
///         Ok(Self { output })
///     }
/// }
///
/// #[async_trait]
/// impl Node for MySource {
///     async fn iteration(&self) -> Result<()> {
///         // To mutate the state, first lock it.
///         //
///         // let mut state = self.state.lock().await;
///         //
///         // The state is a way for the Source to read information from the external world, i.e.,
///         // interacting with I/O devices.
///
///         self.output.send(10usize, None).await
///     }
/// }
/// ```
#[async_trait]
pub trait Source: Node + Send + Sync {
/// For a [Context], a [Configuration] and a set of [Outputs], produce a new [Source].
///
/// Sources only possess `Outputs` as their purpose is to fetch data from the external world.
///
/// The constructor is fallible: it returns a [Result] wrapping the newly created Source.
///
/// The `Self: Sized` bound means that this constructor cannot be called on a trait object (`dyn Source`).
async fn new(context: Context, configuration: Configuration, outputs: Outputs) -> Result<Self>
where
Self: Sized;
}
/// An `Operator` is a node performing transformations on the data it receives, outputting the end result to
/// downstream node(s).
///
/// An `Operator` possesses both `Input` (either [typed](crate::prelude::Input) or [raw](crate::prelude::InputRaw)) and
/// `Output` (either [typed](crate::prelude::Output) or [raw](crate::prelude::OutputRaw)).
///
/// A structure implementing the `Operator` trait typically needs to keep a reference to its `Input`(s) and `Output`(s).
///
/// ## Example
///
/// ```no_run
/// use async_trait::async_trait;
/// use zenoh_flow_nodes::prelude::*;
///
/// // Use our provided macro to expose the symbol that Zenoh-Flow will look for when it loads
/// // the shared library.
/// #[export_operator]
/// struct NoOp {
///     input: Input<usize>,
///     output: Output<usize>,
/// }
///
/// #[async_trait]
/// impl Operator for NoOp {
///     async fn new(
///         _context: Context,
///         _configuration: Configuration,
///         mut inputs: Inputs,
///         mut outputs: Outputs,
///     ) -> Result<Self> {
///         Ok(NoOp {
///             input: inputs
///                 .take("in")
///                 .expect("No input called 'in' found")
///                 .typed(|bytes| todo!("Provide your deserialiser here")),
///             output: outputs
///                 .take("out")
///                 .expect("No output called 'out' found")
///                 .typed(|buffer, data| todo!("Provide your serialiser here")),
///         })
///     }
/// }
///
/// #[async_trait]
/// impl Node for NoOp {
///     async fn iteration(&self) -> Result<()> {
///         let (message, _timestamp) = self.input.recv().await?;
///         self.output.send(*message, None).await
///     }
/// }
/// ```
#[async_trait]
pub trait Operator: Node + Send + Sync {
/// For a [Context], a [Configuration], a set of [Inputs] and [Outputs], produce a new [Operator].
///
/// Operators are at the heart of a data flow: they carry out computations on the data they
/// receive before sending the results out to the next downstream node.
///
/// The constructor is fallible: it returns a [Result] wrapping the newly created Operator.
///
/// The `Self: Sized` bound means that this constructor cannot be called on a trait object (`dyn Operator`).
async fn new(
context: Context,
configuration: Configuration,
inputs: Inputs,
outputs: Outputs,
) -> Result<Self>
where
Self: Sized;
}
/// A `Sink` exposes the outcome of the data flow processing.
///
/// A `Sink` only possesses `Input` (either [typed](crate::prelude::Input) or [raw](crate::prelude::InputRaw)) as its
/// purpose is to communicate with entities outside of the data flow.
///
/// A structure implementing the `Sink` trait typically needs to keep a reference to its `Input`(s).
///
/// ## Example
///
/// ```no_run
/// use async_trait::async_trait;
/// use zenoh_flow_nodes::prelude::*;
///
/// // Use our provided macro to expose the symbol that Zenoh-Flow will look for when it loads
/// // the shared library.
/// #[export_sink]
/// struct GenericSink {
///     input: Input<usize>,
/// }
///
/// #[async_trait]
/// impl Sink for GenericSink {
///     async fn new(
///         _context: Context,
///         _configuration: Configuration,
///         mut inputs: Inputs,
///     ) -> Result<Self> {
///         let input = inputs
///             .take("in")
///             .expect("No input called 'in' found")
///             .typed(|bytes| todo!("Provide your deserialiser here"));
///
///         Ok(GenericSink { input })
///     }
/// }
///
/// #[async_trait]
/// impl Node for GenericSink {
///     async fn iteration(&self) -> Result<()> {
///         let (message, _timestamp) = self.input.recv().await?;
///         println!("{}", *message);
///
///         Ok(())
///     }
/// }
/// ```
#[async_trait]
pub trait Sink: Node + Send + Sync {
/// For a [Context], a [Configuration] and [Inputs], produce a new [Sink].
///
/// Sinks only possess `Inputs`; their objective is to send the result of the computations to the external world.
///
/// The constructor is fallible: it returns a [Result] wrapping the newly created Sink.
///
/// The `Self: Sized` bound means that this constructor cannot be called on a trait object (`dyn Sink`).
async fn new(context: Context, configuration: Configuration, inputs: Inputs) -> Result<Self>
where
Self: Sized;
}