rsiot_extra_components/
cmp_add_input_stream.rs

1//! Компонент для добавления сообщений из побочного потока
2
3use async_trait::async_trait;
4use tokio::sync::broadcast;
5
6use rsiot_component_core::{CmpInOut, Component, ComponentError, IComponentProcess};
7use rsiot_messages_core::*;
8
9/// Настройки
10#[derive(Debug)]
11pub struct Cfg<TMessage> {
12    pub channel: broadcast::Receiver<Message<TMessage>>,
13}
14
15/// Компонент для добавления сообщений из побочного потока
16#[cfg_attr(not(feature = "single-thread"), async_trait)]
17#[cfg_attr(feature = "single-thread", async_trait(?Send))]
18impl<TMsg> IComponentProcess<Cfg<TMsg>, TMsg> for Component<Cfg<TMsg>, TMsg>
19where
20    TMsg: MsgDataBound + 'static,
21{
22    async fn process(
23        &self,
24        mut config: Cfg<TMsg>,
25        in_out: CmpInOut<TMsg>,
26    ) -> Result<(), ComponentError> {
27        while let Ok(msg) = config.channel.recv().await {
28            in_out
29                .send_output(msg)
30                .await
31                .map_err(|err| ComponentError::Execution(err.to_string()))?;
32        }
33        Ok(())
34    }
35}
36
37pub type Cmp<TMsg> = Component<Cfg<TMsg>, TMsg>;