rsiot_channel_utils/
component_mpsc_to_broadcast.rs1use tokio::{
4 sync::{
5 broadcast::{self, error::SendError},
6 mpsc,
7 },
8 time::{sleep, Duration},
9};
10use tracing::{error, info};
11
12use rsiot_messages_core::IMessage;
13
14pub async fn component_mpsc_to_broadcast<TMessage>(
18 mut input: mpsc::Receiver<TMessage>,
19 output: broadcast::Sender<TMessage>,
20) -> ()
21where
22 TMessage: IMessage,
23{
24 info!("Component component_mpsc_to_broadcast started");
25 loop {
26 let result = loop_(&mut input, &output).await;
27 match result {
28 Ok(_) => (),
29 Err(err) => error!("{:?}", err),
30 }
31 info!("Restarting...");
32 sleep(Duration::from_secs(2)).await;
33 }
34}
35
36async fn loop_<TMessage>(
37 channel_mpsc_rcv: &mut mpsc::Receiver<TMessage>,
38 channel_broadcast_send: &broadcast::Sender<TMessage>,
39) -> Result<(), SendError<TMessage>> {
40 while let Some(msg) = channel_mpsc_rcv.recv().await {
41 channel_broadcast_send.send(msg)?;
42 }
43 Ok(())
44}