logic_mesh/tokio_impl/engine/
message_dispatch.rs1use crate::base::block::connect::disconnect_link;
4use crate::base::engine::messages::BlockDefinition;
5use crate::base::engine::messages::BlockInputData;
6use crate::base::engine::messages::BlockOutputData;
7use crate::base::engine::messages::EngineMessage;
8
9use crate::blocks::registry::get_block;
10use crate::single_threaded::Messages;
11use crate::single_threaded::SingleThreadedEngine;
12use libhaystack::val::Value;
13use uuid::Uuid;
14
15use super::eval_block;
16
17pub(super) async fn dispatch_message(engine: &mut SingleThreadedEngine, msg: Messages) {
18 match msg {
19 EngineMessage::AddBlockReq(sender_uuid, block_name, block_uuid, lib) => {
20 log::debug!(
21 "Adding block: {}::{}",
22 lib.clone().unwrap_or("core".into()),
23 block_name,
24 );
25
26 let block_id = if let Some(uuid) = block_uuid {
27 match Uuid::parse_str(&uuid) {
28 Ok(uuid) => Some(uuid),
29 Err(_) => {
30 return reply_to_sender(
31 engine,
32 sender_uuid,
33 EngineMessage::AddBlockRes(Err("Invalid UUID".into())),
34 )
35 }
36 }
37 } else {
38 None
39 };
40
41 let block_id = engine
42 .add_block(block_name, block_id, lib)
43 .map_err(|err| err.to_string());
44
45 reply_to_sender(engine, sender_uuid, EngineMessage::AddBlockRes(block_id));
46 }
47
48 EngineMessage::RemoveBlockReq(sender_uuid, block_id) => {
49 log::debug!("Removing block: {:?}", block_id);
50
51 let block_id = engine
52 .remove_block(&block_id)
53 .map_err(|err| err.to_string());
54 reply_to_sender(engine, sender_uuid, EngineMessage::RemoveBlockRes(block_id));
55 }
56
57 EngineMessage::InspectBlockReq(sender_uuid, block_uuid) => {
58 match engine.get_block_props_mut(&block_uuid) {
59 Some(block) => {
60 let data = BlockDefinition {
61 id: block.id().to_string(),
62 name: block.name().to_string(),
63 library: block.desc().library.clone(),
64 inputs: block
65 .inputs()
66 .iter()
67 .map(|input| {
68 (
69 input.name().to_string(),
70 BlockInputData {
71 kind: input.kind().to_string(),
72 val: input.get_value().cloned().unwrap_or_default(),
73 },
74 )
75 })
76 .collect(),
77 outputs: block
78 .outputs()
79 .iter()
80 .map(|output| {
81 (
82 output.desc().name.to_string(),
83 BlockOutputData {
84 kind: output.desc().kind.to_string(),
85 val: output.value().clone(),
86 },
87 )
88 })
89 .collect(),
90 };
91
92 reply_to_sender(
93 engine,
94 sender_uuid,
95 EngineMessage::InspectBlockRes(Ok(data)),
96 );
97 }
98 None => {
99 reply_to_sender(
100 engine,
101 sender_uuid,
102 EngineMessage::InspectBlockRes(Err("Block not found".into())),
103 );
104 }
105 }
106 }
107
108 EngineMessage::EvaluateBlockReq(sender_uuid, name, inputs, lib) => {
109 let Some(block) = get_block(name.as_str(), lib) else {
110 return reply_to_sender(
111 engine,
112 sender_uuid,
113 EngineMessage::EvaluateBlockRes(Err("Block not found".into())),
114 );
115 };
116
117 let response = eval_block(&block.desc, inputs).await;
118
119 reply_to_sender(
120 engine,
121 sender_uuid,
122 EngineMessage::EvaluateBlockRes(response.map_err(|err| err.to_string())),
123 );
124 }
125
126 EngineMessage::WriteBlockOutputReq(sender_uuid, block_uuid, output_name, value) => {
127 let response: Result<Value, String>;
128
129 match engine.get_block_props_mut(&block_uuid) {
130 Some(block) => {
131 if let Some(output) = block.get_output_mut(&output_name) {
132 let prev = output.value().clone();
133 output.set(value);
134
135 response = Ok(prev);
136 } else {
137 response = Err("Output not found".to_string());
138 }
139 }
140 None => {
141 response = Err("Block not found".to_string());
142 }
143 }
144
145 reply_to_sender(
146 engine,
147 sender_uuid,
148 EngineMessage::WriteBlockOutputRes(response),
149 );
150 }
151
152 EngineMessage::WriteBlockInputReq(sender_uuid, block_uuid, input_name, value) => {
153 let response: Result<Option<Value>, String>;
154
155 match engine.get_block_props_mut(&block_uuid) {
156 Some(block) => {
157 if let Some(input) = block.get_input_mut(&input_name) {
158 let prev = input.get_value().cloned();
159
160 input.set_value(value);
161
162 response = Ok(prev);
163 } else {
164 response = Err("Input not found".to_string());
165 }
166 }
167 None => {
168 response = Err("Block not found".to_string());
169 }
170 }
171
172 reply_to_sender(
173 engine,
174 sender_uuid,
175 EngineMessage::WriteBlockInputRes(response),
176 );
177 }
178
179 EngineMessage::WatchBlockSubReq(sender_uuid, sender) => {
180 engine.watchers.borrow_mut().insert(sender_uuid, sender);
181
182 reply_to_sender(
183 engine,
184 sender_uuid,
185 EngineMessage::WatchBlockSubRes(Ok(sender_uuid)),
186 );
187 }
188
189 EngineMessage::WatchBlockUnsubReq(sender_uuid) => {
190 engine.watchers.borrow_mut().remove(&sender_uuid);
191
192 reply_to_sender(
193 engine,
194 sender_uuid,
195 EngineMessage::WatchBlockUnsubRes(Ok(sender_uuid)),
196 );
197 }
198
199 EngineMessage::GetCurrentProgramReq(sender_uuid) => {
200 log::debug!("GetCurrentProgramReq");
201
202 let program = engine.save_blocks_and_links();
203
204 reply_to_sender(
205 engine,
206 sender_uuid,
207 EngineMessage::GetCurrentProgramRes(program.map_err(|err| err.to_string())),
208 );
209 }
210
211 EngineMessage::ConnectBlocksReq(sender_uuid, link_data) => {
212 log::debug!("ConnectBlocksReq: {:?}", link_data);
213
214 let res = engine.connect_blocks(&link_data);
215 reply_to_sender(
216 engine,
217 sender_uuid,
218 EngineMessage::ConnectBlocksRes(res.map_err(|err| err.to_string())),
219 );
220 }
221
222 EngineMessage::RemoveLinkReq(sender_uuid, link_id) => {
223 log::debug!("RemoveLinkReq: {:?}", link_id);
224
225 let res = engine.blocks_iter_mut().any(|block| {
226 disconnect_link(block, &link_id, |id, name| {
227 engine.decrement_refresh_block_input(id, name)
228 })
229 });
230
231 reply_to_sender(engine, sender_uuid, EngineMessage::RemoveLinkRes(Ok(res)));
232 }
233
234 _ => unreachable!("Invalid message"),
235 }
236}
237
238fn reply_to_sender(engine: &mut SingleThreadedEngine, sender_uuid: Uuid, engine_message: Messages) {
239 for (sender_id, sender) in engine.reply_senders.iter() {
240 if sender_id != &sender_uuid {
241 continue;
242 }
243
244 let _ = sender.try_send(engine_message.clone());
245 }
246}