logic_mesh/tokio_impl/engine/
message_dispatch.rs

1// Copyright (c) 2022-2024, Radu Racariu.
2
3use crate::base::block::connect::disconnect_link;
4use crate::base::engine::messages::BlockDefinition;
5use crate::base::engine::messages::BlockInputData;
6use crate::base::engine::messages::BlockOutputData;
7use crate::base::engine::messages::EngineMessage;
8
9use crate::blocks::registry::get_block;
10use crate::single_threaded::Messages;
11use crate::single_threaded::SingleThreadedEngine;
12use libhaystack::val::Value;
13use uuid::Uuid;
14
15use super::eval_block;
16
17pub(super) async fn dispatch_message(engine: &mut SingleThreadedEngine, msg: Messages) {
18    match msg {
19        EngineMessage::AddBlockReq(sender_uuid, block_name, block_uuid, lib) => {
20            log::debug!(
21                "Adding block: {}::{}",
22                lib.clone().unwrap_or("core".into()),
23                block_name,
24            );
25
26            let block_id = if let Some(uuid) = block_uuid {
27                match Uuid::parse_str(&uuid) {
28                    Ok(uuid) => Some(uuid),
29                    Err(_) => {
30                        return reply_to_sender(
31                            engine,
32                            sender_uuid,
33                            EngineMessage::AddBlockRes(Err("Invalid UUID".into())),
34                        )
35                    }
36                }
37            } else {
38                None
39            };
40
41            let block_id = engine
42                .add_block(block_name, block_id, lib)
43                .map_err(|err| err.to_string());
44
45            reply_to_sender(engine, sender_uuid, EngineMessage::AddBlockRes(block_id));
46        }
47
48        EngineMessage::RemoveBlockReq(sender_uuid, block_id) => {
49            log::debug!("Removing block: {:?}", block_id);
50
51            let block_id = engine
52                .remove_block(&block_id)
53                .map_err(|err| err.to_string());
54            reply_to_sender(engine, sender_uuid, EngineMessage::RemoveBlockRes(block_id));
55        }
56
57        EngineMessage::InspectBlockReq(sender_uuid, block_uuid) => {
58            match engine.get_block_props_mut(&block_uuid) {
59                Some(block) => {
60                    let data = BlockDefinition {
61                        id: block.id().to_string(),
62                        name: block.name().to_string(),
63                        library: block.desc().library.clone(),
64                        inputs: block
65                            .inputs()
66                            .iter()
67                            .map(|input| {
68                                (
69                                    input.name().to_string(),
70                                    BlockInputData {
71                                        kind: input.kind().to_string(),
72                                        val: input.get_value().cloned().unwrap_or_default(),
73                                    },
74                                )
75                            })
76                            .collect(),
77                        outputs: block
78                            .outputs()
79                            .iter()
80                            .map(|output| {
81                                (
82                                    output.desc().name.to_string(),
83                                    BlockOutputData {
84                                        kind: output.desc().kind.to_string(),
85                                        val: output.value().clone(),
86                                    },
87                                )
88                            })
89                            .collect(),
90                    };
91
92                    reply_to_sender(
93                        engine,
94                        sender_uuid,
95                        EngineMessage::InspectBlockRes(Ok(data)),
96                    );
97                }
98                None => {
99                    reply_to_sender(
100                        engine,
101                        sender_uuid,
102                        EngineMessage::InspectBlockRes(Err("Block not found".into())),
103                    );
104                }
105            }
106        }
107
108        EngineMessage::EvaluateBlockReq(sender_uuid, name, inputs, lib) => {
109            let Some(block) = get_block(name.as_str(), lib) else {
110                return reply_to_sender(
111                    engine,
112                    sender_uuid,
113                    EngineMessage::EvaluateBlockRes(Err("Block not found".into())),
114                );
115            };
116
117            let response = eval_block(&block.desc, inputs).await;
118
119            reply_to_sender(
120                engine,
121                sender_uuid,
122                EngineMessage::EvaluateBlockRes(response.map_err(|err| err.to_string())),
123            );
124        }
125
126        EngineMessage::WriteBlockOutputReq(sender_uuid, block_uuid, output_name, value) => {
127            let response: Result<Value, String>;
128
129            match engine.get_block_props_mut(&block_uuid) {
130                Some(block) => {
131                    if let Some(output) = block.get_output_mut(&output_name) {
132                        let prev = output.value().clone();
133                        output.set(value);
134
135                        response = Ok(prev);
136                    } else {
137                        response = Err("Output not found".to_string());
138                    }
139                }
140                None => {
141                    response = Err("Block not found".to_string());
142                }
143            }
144
145            reply_to_sender(
146                engine,
147                sender_uuid,
148                EngineMessage::WriteBlockOutputRes(response),
149            );
150        }
151
152        EngineMessage::WriteBlockInputReq(sender_uuid, block_uuid, input_name, value) => {
153            let response: Result<Option<Value>, String>;
154
155            match engine.get_block_props_mut(&block_uuid) {
156                Some(block) => {
157                    if let Some(input) = block.get_input_mut(&input_name) {
158                        let prev = input.get_value().cloned();
159
160                        input.set_value(value);
161
162                        response = Ok(prev);
163                    } else {
164                        response = Err("Input not found".to_string());
165                    }
166                }
167                None => {
168                    response = Err("Block not found".to_string());
169                }
170            }
171
172            reply_to_sender(
173                engine,
174                sender_uuid,
175                EngineMessage::WriteBlockInputRes(response),
176            );
177        }
178
179        EngineMessage::WatchBlockSubReq(sender_uuid, sender) => {
180            engine.watchers.borrow_mut().insert(sender_uuid, sender);
181
182            reply_to_sender(
183                engine,
184                sender_uuid,
185                EngineMessage::WatchBlockSubRes(Ok(sender_uuid)),
186            );
187        }
188
189        EngineMessage::WatchBlockUnsubReq(sender_uuid) => {
190            engine.watchers.borrow_mut().remove(&sender_uuid);
191
192            reply_to_sender(
193                engine,
194                sender_uuid,
195                EngineMessage::WatchBlockUnsubRes(Ok(sender_uuid)),
196            );
197        }
198
199        EngineMessage::GetCurrentProgramReq(sender_uuid) => {
200            log::debug!("GetCurrentProgramReq");
201
202            let program = engine.save_blocks_and_links();
203
204            reply_to_sender(
205                engine,
206                sender_uuid,
207                EngineMessage::GetCurrentProgramRes(program.map_err(|err| err.to_string())),
208            );
209        }
210
211        EngineMessage::ConnectBlocksReq(sender_uuid, link_data) => {
212            log::debug!("ConnectBlocksReq: {:?}", link_data);
213
214            let res = engine.connect_blocks(&link_data);
215            reply_to_sender(
216                engine,
217                sender_uuid,
218                EngineMessage::ConnectBlocksRes(res.map_err(|err| err.to_string())),
219            );
220        }
221
222        EngineMessage::RemoveLinkReq(sender_uuid, link_id) => {
223            log::debug!("RemoveLinkReq: {:?}", link_id);
224
225            let res = engine.blocks_iter_mut().any(|block| {
226                disconnect_link(block, &link_id, |id, name| {
227                    engine.decrement_refresh_block_input(id, name)
228                })
229            });
230
231            reply_to_sender(engine, sender_uuid, EngineMessage::RemoveLinkRes(Ok(res)));
232        }
233
234        _ => unreachable!("Invalid message"),
235    }
236}
237
238fn reply_to_sender(engine: &mut SingleThreadedEngine, sender_uuid: Uuid, engine_message: Messages) {
239    for (sender_id, sender) in engine.reply_senders.iter() {
240        if sender_id != &sender_uuid {
241            continue;
242        }
243
244        let _ = sender.try_send(engine_message.clone());
245    }
246}