rsiot_channel_utils/
component_mpsc_to_broadcast.rs

1//! Преобразование mpsc::Receiver в broadcast::Sender
2
3use tokio::{
4    sync::{
5        broadcast::{self, error::SendError},
6        mpsc,
7    },
8    time::{sleep, Duration},
9};
10use tracing::{error, info};
11
12use rsiot_messages_core::IMessage;
13
14/// Компонент для перенаправления сообщений из `tokio::sync::mpsc`
15/// в `tokio::sync::broadcast`
16///
17pub async fn component_mpsc_to_broadcast<TMessage>(
18    mut input: mpsc::Receiver<TMessage>,
19    output: broadcast::Sender<TMessage>,
20) -> ()
21where
22    TMessage: IMessage,
23{
24    info!("Component component_mpsc_to_broadcast started");
25    loop {
26        let result = loop_(&mut input, &output).await;
27        match result {
28            Ok(_) => (),
29            Err(err) => error!("{:?}", err),
30        }
31        info!("Restarting...");
32        sleep(Duration::from_secs(2)).await;
33    }
34}
35
36async fn loop_<TMessage>(
37    channel_mpsc_rcv: &mut mpsc::Receiver<TMessage>,
38    channel_broadcast_send: &broadcast::Sender<TMessage>,
39) -> Result<(), SendError<TMessage>> {
40    while let Some(msg) = channel_mpsc_rcv.recv().await {
41        channel_broadcast_send.send(msg)?;
42    }
43    Ok(())
44}