logic_mesh/tokio_impl/
block.rs

1// Copyright (c) 2022-2023, Radu Racariu.
2
3use std::time::Duration;
4
5use futures::future::select_all;
6use futures::FutureExt;
7use libhaystack::val::kind::HaystackKind;
8
9use super::sleep::sleep_millis;
10use crate::base::block::{convert_value_kind, BlockState};
11use crate::base::input::InputProps;
12use crate::base::{block::Block, input::input_reader::InputReader};
13use crate::blocks::utils::get_sleep_dur;
14use crate::blocks::InputImpl;
15
16pub trait BlockImpl = Block<Reader = <InputImpl as InputProps>::Reader, Writer = <InputImpl as InputProps>::Writer>
17    + Default
18    + 'static;
19
20impl<B: Block> InputReader for B {
21    async fn read_inputs(&mut self) -> Option<usize> {
22        read_block_inputs(self).await
23    }
24
25    async fn read_inputs_until_ready(&mut self) -> Option<usize> {
26        let dur = get_sleep_dur();
27        loop {
28            let result = read_block_inputs(self).await;
29            if result.is_some() {
30                return result;
31            }
32            sleep_millis(dur).await;
33        }
34    }
35
36    async fn wait_on_inputs(&mut self, timeout: Duration) -> Option<usize> {
37        let millis = timeout.as_millis() as u64;
38        let (result, index, _) = select_all([
39            async {
40                sleep_millis(millis).await;
41                None
42            }
43            .boxed_local(),
44            async { self.read_inputs().await }.boxed_local(),
45        ])
46        .await;
47
48        if index != 0 {
49            sleep_millis(millis).await;
50            None
51        } else {
52            result
53        }
54    }
55}
56
57///
58/// Reads all inputs and awaits for any of them to have data
59/// On the first input that has data, read the data and update
60/// the input's value.
61///
62/// If the input kind does not match the received Value kind, this would put the block in fault.
63///
64/// # Returns
65/// The index of the input that was read with a valid value.
66// TODO: clippy issue
67#[allow(clippy::needless_pass_by_ref_mut)]
68pub(crate) async fn read_block_inputs<B: Block>(block: &mut B) -> Option<usize> {
69    let mut inputs = block
70        .inputs_mut()
71        .into_iter()
72        .filter(|input| input.is_connected())
73        .collect::<Vec<_>>();
74
75    if inputs.is_empty() {
76        return None;
77    }
78
79    let (val, idx, _) = {
80        let input_futures = inputs
81            .iter_mut()
82            .map(|input| input.receiver())
83            .collect::<Vec<_>>();
84
85        select_all(input_futures).await
86    };
87
88    if let Some(value) = val {
89        if let Some(input) = inputs.get_mut(idx) {
90            let expected = *input.kind();
91            let actual = HaystackKind::from(&value);
92
93            if expected != HaystackKind::Null && expected != actual {
94                match convert_value_kind(value, expected, actual) {
95                    Ok(value) => input.set_value(value),
96                    Err(err) => {
97                        log::error!("Error converting value: {}", err);
98                        block.set_state(BlockState::Fault);
99                    }
100                }
101            } else {
102                input.set_value(value);
103            }
104        } else {
105            block.set_state(BlockState::Fault);
106        }
107        Some(idx)
108    } else {
109        None
110    }
111}