rsiot_channel_utils/
component_combine_message.rs

1//! Компонент для преобразования нескольких сообщений в новое
2
3use std::collections::HashMap;
4use tokio::{
5    sync::mpsc::{self, error::SendError},
6    time::{sleep, Duration},
7};
8
9use rsiot_messages_core::IMessage;
10use tracing::{error, info};
11
12type FilterFn<TMessage> = fn(TMessage) -> Option<TMessage>;
13type CombineFn<TMessage> = fn(Vec<TMessage>) -> Option<TMessage>;
14
15/// Компонент для преобразования нескольких сообщений в новое.
16/// На выход передаются все исходные сообщения, плюс новые
17///
18/// - `input` - исходный поток сообщений
19/// - `output` - исходный поток сообщений, плюс новые сообщения
20/// - `filter_fn` - функция для фильтрации необходимых исходных сообщений.
21/// Сообщения сохраняются в хеше. Сигнатура `fn(TMessage) -> Option<TMessage>`
22/// - `transform_fn` - функция для преобразования сохраненных сообщений в новое.
23/// Сигнатура `fn(Vec<TMessage>) -> Option<TMessage>`
24pub async fn component_combine_message<TMessage>(
25    mut input: mpsc::Receiver<TMessage>,
26    output: mpsc::Sender<TMessage>,
27    filter_fn: FilterFn<TMessage>,
28    combine_fn: CombineFn<TMessage>,
29) where
30    TMessage: IMessage,
31{
32    info!("Component component_combine_message started");
33    loop {
34        let result = loop_(&mut input, &output, filter_fn, combine_fn).await;
35        match result {
36            Ok(_) => (),
37            Err(err) => error!("{:?}", err),
38        }
39        sleep(Duration::from_secs(2)).await;
40        info!("Restarting...")
41    }
42}
43
44async fn loop_<TMessage>(
45    input: &mut mpsc::Receiver<TMessage>,
46    output: &mpsc::Sender<TMessage>,
47    filter_fn: FilterFn<TMessage>,
48    combine_fn: CombineFn<TMessage>,
49) -> Result<(), SendError<TMessage>>
50where
51    TMessage: IMessage,
52{
53    let mut msg_hash = HashMap::<String, TMessage>::new();
54    while let Some(msg) = input.recv().await {
55        output.send(msg.clone()).await?;
56        let new_msg =
57            filter_and_transform(&mut msg_hash, msg, filter_fn, combine_fn);
58        if let Some(new_msg) = new_msg {
59            output.send(new_msg).await?;
60        }
61    }
62    Ok(())
63}
64
65/// Обработка сообщения.
66/// Проводим сообщение через функцию фильтрации. Сохраняем в хеше. Проводим все
67/// сообщения через функцию трасформации.
68fn filter_and_transform<TMessage>(
69    msg_hash: &mut HashMap<String, TMessage>,
70    msg: TMessage,
71    filter_fn: fn(TMessage) -> Option<TMessage>,
72    transform_fn: fn(Vec<TMessage>) -> Option<TMessage>,
73) -> Option<TMessage>
74where
75    TMessage: IMessage,
76{
77    let msg = filter_fn(msg);
78    let msg = match msg {
79        Some(val) => val,
80        None => return None,
81    };
82    msg_hash.insert(msg.key(), msg.clone());
83    let msg_vec: Vec<TMessage> = msg_hash.values().map(|m| m.clone()).collect();
84    transform_fn(msg_vec)
85}