rsiot_channel_utils/
component_combine_message.rs1use std::collections::HashMap;
4use tokio::{
5 sync::mpsc::{self, error::SendError},
6 time::{sleep, Duration},
7};
8
9use rsiot_messages_core::IMessage;
10use tracing::{error, info};
11
12type FilterFn<TMessage> = fn(TMessage) -> Option<TMessage>;
13type CombineFn<TMessage> = fn(Vec<TMessage>) -> Option<TMessage>;
14
15pub async fn component_combine_message<TMessage>(
25 mut input: mpsc::Receiver<TMessage>,
26 output: mpsc::Sender<TMessage>,
27 filter_fn: FilterFn<TMessage>,
28 combine_fn: CombineFn<TMessage>,
29) where
30 TMessage: IMessage,
31{
32 info!("Component component_combine_message started");
33 loop {
34 let result = loop_(&mut input, &output, filter_fn, combine_fn).await;
35 match result {
36 Ok(_) => (),
37 Err(err) => error!("{:?}", err),
38 }
39 sleep(Duration::from_secs(2)).await;
40 info!("Restarting...")
41 }
42}
43
44async fn loop_<TMessage>(
45 input: &mut mpsc::Receiver<TMessage>,
46 output: &mpsc::Sender<TMessage>,
47 filter_fn: FilterFn<TMessage>,
48 combine_fn: CombineFn<TMessage>,
49) -> Result<(), SendError<TMessage>>
50where
51 TMessage: IMessage,
52{
53 let mut msg_hash = HashMap::<String, TMessage>::new();
54 while let Some(msg) = input.recv().await {
55 output.send(msg.clone()).await?;
56 let new_msg =
57 filter_and_transform(&mut msg_hash, msg, filter_fn, combine_fn);
58 if let Some(new_msg) = new_msg {
59 output.send(new_msg).await?;
60 }
61 }
62 Ok(())
63}
64
65fn filter_and_transform<TMessage>(
69 msg_hash: &mut HashMap<String, TMessage>,
70 msg: TMessage,
71 filter_fn: fn(TMessage) -> Option<TMessage>,
72 transform_fn: fn(Vec<TMessage>) -> Option<TMessage>,
73) -> Option<TMessage>
74where
75 TMessage: IMessage,
76{
77 let msg = filter_fn(msg);
78 let msg = match msg {
79 Some(val) => val,
80 None => return None,
81 };
82 msg_hash.insert(msg.key(), msg.clone());
83 let msg_vec: Vec<TMessage> = msg_hash.values().map(|m| m.clone()).collect();
84 transform_fn(msg_vec)
85}