logic_mesh/tokio_impl/
block.rs1use std::time::Duration;
4
5use futures::future::select_all;
6use futures::FutureExt;
7use libhaystack::val::kind::HaystackKind;
8
9use super::sleep::sleep_millis;
10use crate::base::block::{convert_value_kind, BlockState};
11use crate::base::input::InputProps;
12use crate::base::{block::Block, input::input_reader::InputReader};
13use crate::blocks::utils::get_sleep_dur;
14use crate::blocks::InputImpl;
15
16pub trait BlockImpl = Block<Reader = <InputImpl as InputProps>::Reader, Writer = <InputImpl as InputProps>::Writer>
17 + Default
18 + 'static;
19
20impl<B: Block> InputReader for B {
21 async fn read_inputs(&mut self) -> Option<usize> {
22 read_block_inputs(self).await
23 }
24
25 async fn read_inputs_until_ready(&mut self) -> Option<usize> {
26 let dur = get_sleep_dur();
27 loop {
28 let result = read_block_inputs(self).await;
29 if result.is_some() {
30 return result;
31 }
32 sleep_millis(dur).await;
33 }
34 }
35
36 async fn wait_on_inputs(&mut self, timeout: Duration) -> Option<usize> {
37 let millis = timeout.as_millis() as u64;
38 let (result, index, _) = select_all([
39 async {
40 sleep_millis(millis).await;
41 None
42 }
43 .boxed_local(),
44 async { self.read_inputs().await }.boxed_local(),
45 ])
46 .await;
47
48 if index != 0 {
49 sleep_millis(millis).await;
50 None
51 } else {
52 result
53 }
54 }
55}
56
57#[allow(clippy::needless_pass_by_ref_mut)]
68pub(crate) async fn read_block_inputs<B: Block>(block: &mut B) -> Option<usize> {
69 let mut inputs = block
70 .inputs_mut()
71 .into_iter()
72 .filter(|input| input.is_connected())
73 .collect::<Vec<_>>();
74
75 if inputs.is_empty() {
76 return None;
77 }
78
79 let (val, idx, _) = {
80 let input_futures = inputs
81 .iter_mut()
82 .map(|input| input.receiver())
83 .collect::<Vec<_>>();
84
85 select_all(input_futures).await
86 };
87
88 if let Some(value) = val {
89 if let Some(input) = inputs.get_mut(idx) {
90 let expected = *input.kind();
91 let actual = HaystackKind::from(&value);
92
93 if expected != HaystackKind::Null && expected != actual {
94 match convert_value_kind(value, expected, actual) {
95 Ok(value) => input.set_value(value),
96 Err(err) => {
97 log::error!("Error converting value: {}", err);
98 block.set_state(BlockState::Fault);
99 }
100 }
101 } else {
102 input.set_value(value);
103 }
104 } else {
105 block.set_state(BlockState::Fault);
106 }
107 Some(idx)
108 } else {
109 None
110 }
111}