logic_mesh/tokio_impl/
input.rs1use std::pin::Pin;
4
5use futures::FutureExt;
6use tokio::sync::mpsc::{channel, Receiver, Sender};
7
8use libhaystack::val::{kind::HaystackKind, Value};
9use uuid::Uuid;
10
11use crate::base::{
12 input::{BaseInput, Input, InputDefault, InputReceiver},
13 link::LinkState,
14};
15
/// Receiving end used by an input: a Tokio mpsc `Receiver` carrying Haystack `Value`s.
pub type Reader = Receiver<Value>;
/// Sending end used to feed an input: a Tokio mpsc `Sender` carrying Haystack `Value`s.
pub type Writer = Sender<Value>;
/// `BaseInput` specialised to the Tokio channel endpoints above.
pub type InputImpl = BaseInput<Reader, Writer>;
19
20impl InputImpl {
21 pub fn new(name: &str, kind: HaystackKind, block_id: Uuid) -> Self {
22 Self::new_with_default(name, kind, block_id, Default::default())
23 }
24
25 pub fn new_with_default(
26 name: &str,
27 kind: HaystackKind,
28 block_id: Uuid,
29 default: InputDefault,
30 ) -> Self {
31 let (writer, reader) = channel::<Value>(32);
32
33 Self {
34 name: name.to_string(),
35 kind,
36
37 block_id,
38 connection_count: 0,
39
40 reader,
41 writer,
42
43 val: Default::default(),
44 default,
45 links: Default::default(),
46 }
47 }
48}
49
50impl Input for InputImpl {
51 fn receiver(&mut self) -> Pin<Box<dyn InputReceiver + '_>> {
52 self.reader.recv().boxed()
53 }
54
55 fn set_value(&mut self, value: Value) {
56 for link in &mut self.links {
57 if let Some(tx) = &link.tx {
58 if let Err(__) = tx.try_send(value.clone()) {
59 link.state = LinkState::Error;
60 } else {
61 link.state = LinkState::Connected;
62 }
63 }
64 }
65 self.val = Some(value);
66 }
67}
68
#[cfg(test)]
mod test {
    use libhaystack::val::{kind::HaystackKind, Value};
    use uuid::Uuid;

    use crate::base::input::InputDefault;

    use super::InputImpl;

    /// A freshly constructed input keeps the name and kind it was given.
    #[test]
    fn test_input_init() {
        // Default descriptor with a zero value and no min/max bounds.
        let default = InputDefault {
            val: 0.into(),
            min: Value::Null,
            max: Value::Null,
        };

        let input =
            InputImpl::new_with_default("test", HaystackKind::Bool, Uuid::new_v4(), default);

        assert_eq!(input.name, "test");
        assert_eq!(input.kind, HaystackKind::Bool);
    }
}