rsiot_extra_components/
cmp_add_input_stream.rs1use async_trait::async_trait;
4use tokio::sync::broadcast;
5
6use rsiot_component_core::{CmpInOut, Component, ComponentError, IComponentProcess};
7use rsiot_messages_core::*;
8
9#[derive(Debug)]
11pub struct Cfg<TMessage> {
12 pub channel: broadcast::Receiver<Message<TMessage>>,
13}
14
15#[cfg_attr(not(feature = "single-thread"), async_trait)]
17#[cfg_attr(feature = "single-thread", async_trait(?Send))]
18impl<TMsg> IComponentProcess<Cfg<TMsg>, TMsg> for Component<Cfg<TMsg>, TMsg>
19where
20 TMsg: MsgDataBound + 'static,
21{
22 async fn process(
23 &self,
24 mut config: Cfg<TMsg>,
25 in_out: CmpInOut<TMsg>,
26 ) -> Result<(), ComponentError> {
27 while let Ok(msg) = config.channel.recv().await {
28 in_out
29 .send_output(msg)
30 .await
31 .map_err(|err| ComponentError::Execution(err.to_string()))?;
32 }
33 Ok(())
34 }
35}
36
/// Shorthand for a `Component` parameterized with this module's `Cfg` and
/// the message type `TMsg`.
pub type Cmp<TMsg> = Component<Cfg<TMsg>, TMsg>;