rsiot_channel_utils/
component_delay.rs1use std::time::Instant;
4
5use tokio::{
6 spawn,
7 sync::mpsc::{self, error::SendError},
8 time::{sleep, Duration},
9};
10
11use rsiot_messages_core::IMessage;
12use tracing::{error, info};
13
14use crate::{component_cache, create_cache, CacheType};
15
16pub async fn component_delay<TMessage>(
18 stream_input: mpsc::Receiver<TMessage>,
19 stream_output: mpsc::Sender<TMessage>,
20 delay: Duration,
21) -> ()
22where
23 TMessage: IMessage + 'static,
24{
25 let cache = create_cache::<TMessage>();
26
27 let _task_cache = spawn(component_cache(stream_input, None, cache.clone()));
28
29 loop {
30 info!("Component started");
31 let result = loop_(cache.clone(), &stream_output, delay).await;
32 match result {
33 Ok(_) => (),
34 Err(err) => error!("{:?}", err),
35 }
36 info!("Restarting...");
37 }
38}
39
40#[derive(Debug)]
41enum Error<TMessage> {
42 SendError(SendError<TMessage>),
43}
44
45impl<TMessage> From<SendError<TMessage>> for Error<TMessage> {
46 fn from(value: SendError<TMessage>) -> Self {
47 Self::SendError(value)
48 }
49}
50
51async fn loop_<TMessage>(
52 cache: CacheType<TMessage>,
53 stream_output: &mpsc::Sender<TMessage>,
54 delay: Duration,
55) -> Result<(), Error<TMessage>>
56where
57 TMessage: IMessage,
58{
59 loop {
60 let begin = Instant::now();
61 {
62 let mut lock = cache.lock().await;
63 for msg in lock.values() {
64 stream_output.send(msg.clone()).await?;
65 }
66 lock.clear();
67 }
68 sleep(delay - begin.elapsed()).await;
69 }
70}