1use crate::{
2 channel::mpsc::UnboundedReceiver,
3 collections::HashMap,
4 global::{
5 dxb_block::{
6 DXBBlock, IncomingEndpointContextSectionId, IncomingSection,
7 OutgoingContextId,
8 },
9 protocol_structures::{
10 block_header::BlockHeader, encrypted_header::EncryptedHeader,
11 routing_header::RoutingHeader,
12 },
13 },
14 network::{
15 com_hub::{
16 ComHub, InterfacePriority, network_response::ResponseOptions,
17 },
18 com_interfaces::local_loopback_interface::LocalLoopbackInterfaceSetupData,
19 },
20 prelude::*,
21 runtime::{
22 RuntimeConfig, RuntimeConfigInterface,
23 execution::{
24 ExecutionError,
25 context::{
26 ExecutionContext, ExecutionMode, RemoteExecutionContext,
27 ScriptExecutionError,
28 },
29 },
30 memory::Memory,
31 },
32 time::Instant,
33 utils::task_manager::TaskManager,
34 values::{
35 core_values::endpoint::Endpoint, value_container::ValueContainer,
36 },
37};
38use alloc::rc::Rc;
39use core::{cell::RefCell, pin::Pin, slice};
40use log::{debug, error, info};
41
42#[derive(Debug)]
43pub struct RuntimeInternal {
44 pub memory: RefCell<Memory>,
45 pub com_hub: Rc<ComHub>,
46 pub endpoint: Endpoint,
47 pub config: RuntimeConfig,
48
49 pub task_manager: TaskManager,
50
51 pub(crate) incoming_sections_receiver:
53 RefCell<UnboundedReceiver<IncomingSection>>,
54
55 pub execution_contexts:
57 RefCell<HashMap<IncomingEndpointContextSectionId, ExecutionContext>>,
58}
59
/// Resolves an `Option<&mut ExecutionContext>` into a `&mut ExecutionContext`.
///
/// * `None`: a fresh local context bound to `$self_rc` is created with
///   `ExecutionMode::Static`.
/// * `Some(context)`: the given context is used as-is; if it is a local
///   context, it is first bound to `$self_rc` via `set_runtime_internal`.
///
/// The `None` arm borrows a temporary. Use the macro directly as the
/// initializer of a `let` binding so that temporary lifetime extension keeps
/// the freshly created context alive for the rest of the enclosing scope.
macro_rules! get_execution_context {
    ($self_rc:expr, $execution_context:expr) => {
        match $execution_context {
            None => {
                &mut ExecutionContext::local_with_runtime_internal(
                    $self_rc.clone(),
                    ExecutionMode::Static,
                )
            }
            Some(context) => {
                // Reborrow so `context` itself can still be returned below.
                if let ExecutionContext::Local(local_context) = &mut *context {
                    local_context.set_runtime_internal($self_rc.clone());
                }
                context
            }
        }
    };
}
77
78impl RuntimeInternal {
79 async fn create_configured_interfaces(&self) {
81 if let Some(interfaces) = &self.config.interfaces {
82 for RuntimeConfigInterface {
83 interface_type,
84 setup_data: config,
85 priority,
86 } in interfaces.iter()
87 {
88 let create_future = self
89 .com_hub
90 .clone()
91 .create_interface(interface_type, config.clone(), *priority)
92 .await;
93 match create_future {
94 Err(err) => {
95 error!(
96 "Failed to create interface {interface_type}: {err:?}"
97 )
98 }
99 Ok((_, ready_receiver)) => {
100 if let Some(ready_receiver) = ready_receiver {
101 let _ = ready_receiver.await;
102 }
103 }
104 }
105 }
106 }
107 }
108
109 async fn init_local_loopback_interface(&self) {
110 let local_interface_setup_data =
112 LocalLoopbackInterfaceSetupData.create_interface().unwrap();
113
114 let ready_signal = self
115 .com_hub
116 .clone()
117 .add_interface_from_configuration(
118 local_interface_setup_data,
119 InterfacePriority::None,
120 )
121 .expect("Failed to add local loopback interface");
122 ready_signal.unwrap().await.unwrap()
125 }
126
127 pub(crate) async fn init_async(&self) {
129 self.init_local_loopback_interface().await;
131 self.create_configured_interfaces().await;
132 }
133
134 #[cfg(feature = "compiler")]
135 pub async fn execute(
136 self_rc: Rc<RuntimeInternal>,
137 script: &str,
138 inserted_values: &[ValueContainer],
139 execution_context: Option<&mut ExecutionContext>,
140 ) -> Result<Option<ValueContainer>, ScriptExecutionError> {
141 let execution_context =
142 get_execution_context!(self_rc, execution_context);
143 let compile_start = Instant::now();
144 let dxb = execution_context.compile(script, inserted_values)?;
145 debug!(
146 "[Compilation took {} ms]",
147 compile_start.elapsed().as_millis()
148 );
149 let execute_start = Instant::now();
150 let result = RuntimeInternal::execute_dxb(
151 self_rc,
152 dxb,
153 Some(execution_context),
154 true,
155 )
156 .await
157 .map_err(ScriptExecutionError::from);
158 debug!(
159 "[Execution took {} ms]",
160 execute_start.elapsed().as_millis()
161 );
162 result
163 }
164
165 #[cfg(feature = "compiler")]
166 pub fn execute_sync(
167 self_rc: Rc<RuntimeInternal>,
168 script: &str,
169 inserted_values: &[ValueContainer],
170 execution_context: Option<&mut ExecutionContext>,
171 ) -> Result<Option<ValueContainer>, ScriptExecutionError> {
172 let execution_context =
173 get_execution_context!(self_rc, execution_context);
174 let compile_start = Instant::now();
175 let dxb = execution_context.compile(script, inserted_values)?;
176 debug!(
177 "[Compilation took {} ms]",
178 compile_start.elapsed().as_millis()
179 );
180 let execute_start = Instant::now();
181 let result = RuntimeInternal::execute_dxb_sync(
182 self_rc,
183 &dxb,
184 Some(execution_context),
185 true,
186 )
187 .map_err(ScriptExecutionError::from);
188 debug!(
189 "[Execution took {} ms]",
190 execute_start.elapsed().as_millis()
191 );
192 result
193 }
194
195 pub fn execute_dxb<'a>(
196 self_rc: Rc<RuntimeInternal>,
197 dxb: Vec<u8>,
198 execution_context: Option<&'a mut ExecutionContext>,
199 _end_execution: bool,
200 ) -> Pin<
201 Box<
202 dyn Future<Output = Result<Option<ValueContainer>, ExecutionError>>
203 + 'a,
204 >,
205 > {
206 Box::pin(async move {
207 let execution_context =
208 get_execution_context!(self_rc, execution_context);
209 match execution_context {
210 ExecutionContext::Remote(context) => {
211 RuntimeInternal::execute_remote(self_rc, context, dxb).await
212 }
213 ExecutionContext::Local(_) => {
214 execution_context.execute_dxb(&dxb).await
215 }
216 }
217 })
218 }
219
220 pub fn execute_dxb_sync(
221 self_rc: Rc<RuntimeInternal>,
222 dxb: &[u8],
223 execution_context: Option<&mut ExecutionContext>,
224 _end_execution: bool,
225 ) -> Result<Option<ValueContainer>, ExecutionError> {
226 let execution_context =
227 get_execution_context!(self_rc, execution_context);
228 match execution_context {
229 ExecutionContext::Remote(_) => {
230 Err(ExecutionError::RequiresAsyncExecution)
231 }
232 ExecutionContext::Local(_) => {
233 execution_context.execute_dxb_sync(dxb)
234 }
235 }
236 }
237
238 fn take_execution_context(
242 self_rc: Rc<RuntimeInternal>,
243 context_id: &IncomingEndpointContextSectionId,
244 ) -> ExecutionContext {
245 let mut execution_contexts = self_rc.execution_contexts.borrow_mut();
246 let execution_context = execution_contexts.remove(context_id);
248 if let Some(context) = execution_context {
249 context
250 } else {
251 ExecutionContext::local_with_runtime_internal(
252 self_rc.clone(),
253 ExecutionMode::unbounded(),
254 )
255 }
256 }
257
258 pub async fn execute_remote(
259 self_rc: Rc<RuntimeInternal>,
260 remote_execution_context: &mut RemoteExecutionContext,
261 dxb: Vec<u8>,
262 ) -> Result<Option<ValueContainer>, ExecutionError> {
263 let routing_header: RoutingHeader = RoutingHeader::default()
264 .with_sender(self_rc.endpoint.clone())
265 .to_owned();
266
267 let context_id =
269 remote_execution_context.context_id.unwrap_or_else(|| {
270 remote_execution_context.context_id =
272 Some(self_rc.com_hub.block_handler.get_new_context_id());
273 remote_execution_context.context_id.unwrap()
274 });
275
276 let block_header = BlockHeader {
277 context_id,
278 ..BlockHeader::default()
279 };
280 let encrypted_header = EncryptedHeader::default();
281
282 let mut block =
283 DXBBlock::new(routing_header, block_header, encrypted_header, dxb);
284
285 block
286 .set_receivers(slice::from_ref(&remote_execution_context.endpoint));
287
288 let response = self_rc
289 .com_hub
290 .send_own_block_await_response(block, ResponseOptions::default())
291 .await
292 .remove(0)?;
293 let incoming_section = response.take_incoming_section();
294 RuntimeInternal::execute_incoming_section(self_rc, incoming_section)
295 .await
296 .0
297 }
298
299 pub(crate) async fn execute_incoming_section(
300 self_rc: Rc<RuntimeInternal>,
301 mut incoming_section: IncomingSection,
302 ) -> (
303 Result<Option<ValueContainer>, ExecutionError>,
304 Endpoint,
305 OutgoingContextId,
306 ) {
307 let section_context_id =
308 incoming_section.get_section_context_id().clone();
309 let mut context =
310 Self::take_execution_context(self_rc.clone(), §ion_context_id);
311 info!(
312 "Executing incoming section with index: {}",
313 incoming_section.get_section_index()
314 );
315
316 let mut result = None;
317 let mut last_block = None;
318
319 loop {
321 let block = incoming_section.next().await;
322 if let Some(block) = block {
323 let res = RuntimeInternal::execute_dxb_block_local(
324 self_rc.clone(),
325 block.clone(),
326 Some(&mut context),
327 )
328 .await;
329 if let Err(err) = res {
330 return (
331 Err(err),
332 block.sender().clone(),
333 block.block_header.context_id,
334 );
335 }
336 result = res.unwrap();
337 last_block = Some(block);
338 } else {
339 break;
340 }
341 }
342
343 if last_block.is_none() {
344 unreachable!("Incoming section must contain at least one block");
345 }
346 let last_block = last_block.unwrap();
347 let sender_endpoint = last_block.sender().clone();
348 let context_id = last_block.block_header.context_id;
349
350 self_rc
353 .execution_contexts
354 .borrow_mut()
355 .insert(section_context_id, context);
356
357 (Ok(result), sender_endpoint, context_id)
358 }
359
360 async fn execute_dxb_block_local(
361 self_rc: Rc<RuntimeInternal>,
362 block: DXBBlock,
363 execution_context: Option<&mut ExecutionContext>,
364 ) -> Result<Option<ValueContainer>, ExecutionError> {
365 let execution_context =
366 get_execution_context!(self_rc, execution_context);
367 if !core::matches!(execution_context, ExecutionContext::Local(_)) {
369 unreachable!(
370 "Execution context must be local for executing a DXB block"
371 );
372 }
373 let dxb = block.body;
374 let end_execution =
375 block.block_header.flags_and_timestamp.is_end_of_section();
376 RuntimeInternal::execute_dxb(
377 self_rc,
378 dxb,
379 Some(execution_context),
380 end_execution,
381 )
382 .await
383 }
384
385 pub fn get_env(&self) -> HashMap<String, String> {
386 self.config.env.clone().unwrap_or_default()
387 }
388}