rsiot_channel_utils/
component_delay.rs

1//! Перенаправление сообщений с задержкой
2
3use std::time::Instant;
4
5use tokio::{
6    spawn,
7    sync::mpsc::{self, error::SendError},
8    time::{sleep, Duration},
9};
10
11use rsiot_messages_core::IMessage;
12use tracing::{error, info};
13
14use crate::{component_cache, create_cache, CacheType};
15
16/// Перенаправление сообщений с задержкой
17pub async fn component_delay<TMessage>(
18    stream_input: mpsc::Receiver<TMessage>,
19    stream_output: mpsc::Sender<TMessage>,
20    delay: Duration,
21) -> ()
22where
23    TMessage: IMessage + 'static,
24{
25    let cache = create_cache::<TMessage>();
26
27    let _task_cache = spawn(component_cache(stream_input, None, cache.clone()));
28
29    loop {
30        info!("Component started");
31        let result = loop_(cache.clone(), &stream_output, delay).await;
32        match result {
33            Ok(_) => (),
34            Err(err) => error!("{:?}", err),
35        }
36        info!("Restarting...");
37    }
38}
39
40#[derive(Debug)]
41enum Error<TMessage> {
42    SendError(SendError<TMessage>),
43}
44
45impl<TMessage> From<SendError<TMessage>> for Error<TMessage> {
46    fn from(value: SendError<TMessage>) -> Self {
47        Self::SendError(value)
48    }
49}
50
51async fn loop_<TMessage>(
52    cache: CacheType<TMessage>,
53    stream_output: &mpsc::Sender<TMessage>,
54    delay: Duration,
55) -> Result<(), Error<TMessage>>
56where
57    TMessage: IMessage,
58{
59    loop {
60        let begin = Instant::now();
61        {
62            let mut lock = cache.lock().await;
63            for msg in lock.values() {
64                stream_output.send(msg.clone()).await?;
65            }
66            lock.clear();
67        }
68        sleep(delay - begin.elapsed()).await;
69    }
70}