1use crate::{
2 channel::mpsc::{UnboundedReceiver, create_unbounded_channel},
3 collections::HashMap,
4 global::{
5 dxb_block::{
6 DXBBlock, IncomingEndpointContextSectionId, IncomingSection,
7 OutgoingContextId,
8 },
9 protocol_structures::{
10 block_header::BlockHeader, encrypted_header::EncryptedHeader,
11 routing_header::RoutingHeader,
12 },
13 },
14 network::{
15 com_hub::{
16 ComHub, InterfacePriority, network_response::ResponseOptions,
17 },
18 com_interfaces::local_loopback_interface::LocalLoopbackInterfaceSetupData,
19 },
20 prelude::*,
21 runtime::{
22 RuntimeConfig, RuntimeConfigInterface,
23 execution::{
24 ExecutionError,
25 context::{
26 ExecutionContext, ExecutionMode, RemoteExecutionContext,
27 ScriptExecutionError,
28 },
29 },
30 memory::Memory,
31 },
32 time::Instant,
33 utils::task_manager::TaskManager,
34 values::{
35 core_values::endpoint::Endpoint, value_container::ValueContainer,
36 },
37};
38use alloc::rc::Rc;
39use core::{cell::RefCell, pin::Pin, slice};
40use log::{debug, error, info};
41
42#[derive(Debug)]
43pub struct RuntimeInternal {
44 pub memory: RefCell<Memory>,
45 pub com_hub: Rc<ComHub>,
46 pub endpoint: Endpoint,
47 pub config: RuntimeConfig,
48
49 pub task_manager: TaskManager,
50
51 pub(crate) incoming_sections_receiver:
53 RefCell<UnboundedReceiver<IncomingSection>>,
54
55 pub execution_contexts:
57 RefCell<HashMap<IncomingEndpointContextSectionId, ExecutionContext>>,
58}
59
60macro_rules! get_execution_context {
61 ($self_rc:expr, $execution_context:expr) => {
63 match $execution_context {
64 Some(context) => {
65 if let &mut ExecutionContext::Local(ref mut local_context) = context {
67 local_context.set_runtime_internal($self_rc.clone());
68 }
69 context
70 },
71 None => {
72 &mut ExecutionContext::local(ExecutionMode::Static, $self_rc.clone())
73 }
74 }
75 };
76}
77
78impl RuntimeInternal {
79 pub(crate) fn new(
80 endpoint: Endpoint,
81 memory: RefCell<Memory>,
82 config: RuntimeConfig,
83 com_hub: Rc<ComHub>,
84 task_manager: TaskManager,
85 incoming_sections_receiver: UnboundedReceiver<IncomingSection>,
86 ) -> RuntimeInternal {
87 RuntimeInternal {
88 endpoint,
89 memory,
90 config,
91 com_hub,
92 task_manager,
93 incoming_sections_receiver: RefCell::new(
94 incoming_sections_receiver,
95 ),
96 execution_contexts: RefCell::new(HashMap::new()),
97 }
98 }
99
100 pub fn stub() -> RuntimeInternal {
101 let (sender, receiver) = create_unbounded_channel();
102 RuntimeInternal::new(
103 Endpoint::default(),
104 RefCell::new(Memory::default()),
105 RuntimeConfig::default(),
106 ComHub::create(Endpoint::default(), sender).0,
107 TaskManager::create().0,
108 receiver,
109 )
110 }
111
112 async fn create_configured_interfaces(&self) {
114 if let Some(interfaces) = &self.config.interfaces {
115 for RuntimeConfigInterface {
116 interface_type,
117 setup_data: config,
118 priority,
119 } in interfaces.iter()
120 {
121 let create_future = self
122 .com_hub
123 .clone()
124 .create_interface(interface_type, config.clone(), *priority)
125 .await;
126 match create_future {
127 Err(err) => {
128 error!(
129 "Failed to create interface {interface_type}: {err:?}"
130 )
131 }
132 Ok((_, ready_receiver)) => {
133 if let Some(ready_receiver) = ready_receiver {
134 let _ = ready_receiver.await;
135 }
136 }
137 }
138 }
139 }
140 }
141
142 async fn init_local_loopback_interface(&self) {
143 let local_interface_setup_data =
145 LocalLoopbackInterfaceSetupData.create_interface().unwrap();
146
147 let ready_signal = self
148 .com_hub
149 .clone()
150 .add_interface_from_configuration(
151 local_interface_setup_data,
152 InterfacePriority::None,
153 )
154 .expect("Failed to add local loopback interface");
155 ready_signal.unwrap().await.unwrap()
158 }
159
160 pub(crate) async fn init_async(&self) {
162 self.init_local_loopback_interface().await;
164 self.create_configured_interfaces().await;
165 }
166
167 #[cfg(feature = "compiler")]
168 pub async fn execute(
169 self_rc: Rc<RuntimeInternal>,
170 script: &str,
171 inserted_values: &[ValueContainer],
172 execution_context: Option<&mut ExecutionContext>,
173 ) -> Result<Option<ValueContainer>, ScriptExecutionError> {
174 let execution_context =
175 get_execution_context!(self_rc, execution_context);
176 let compile_start = Instant::now();
177 let dxb = execution_context.compile(script, inserted_values)?;
178 debug!(
179 "[Compilation took {} ms]",
180 compile_start.elapsed().as_millis()
181 );
182 let execute_start = Instant::now();
183 let result = RuntimeInternal::execute_dxb(
184 self_rc,
185 dxb,
186 Some(execution_context),
187 true,
188 )
189 .await
190 .map_err(ScriptExecutionError::from);
191 debug!(
192 "[Execution took {} ms]",
193 execute_start.elapsed().as_millis()
194 );
195 result
196 }
197
198 #[cfg(feature = "compiler")]
199 pub fn execute_sync(
200 self_rc: Rc<RuntimeInternal>,
201 script: &str,
202 inserted_values: &[ValueContainer],
203 execution_context: Option<&mut ExecutionContext>,
204 ) -> Result<Option<ValueContainer>, ScriptExecutionError> {
205 let execution_context =
206 get_execution_context!(self_rc, execution_context);
207 let compile_start = Instant::now();
208 let dxb = execution_context.compile(script, inserted_values)?;
209 debug!(
210 "[Compilation took {} ms]",
211 compile_start.elapsed().as_millis()
212 );
213 let execute_start = Instant::now();
214 let result = RuntimeInternal::execute_dxb_sync(
215 self_rc,
216 &dxb,
217 Some(execution_context),
218 true,
219 )
220 .map_err(ScriptExecutionError::from);
221 debug!(
222 "[Execution took {} ms]",
223 execute_start.elapsed().as_millis()
224 );
225 result
226 }
227
228 pub fn execute_dxb<'a>(
229 self_rc: Rc<RuntimeInternal>,
230 dxb: Vec<u8>,
231 execution_context: Option<&'a mut ExecutionContext>,
232 _end_execution: bool,
233 ) -> Pin<
234 Box<
235 dyn Future<Output = Result<Option<ValueContainer>, ExecutionError>>
236 + 'a,
237 >,
238 > {
239 Box::pin(async move {
240 let execution_context =
241 get_execution_context!(self_rc, execution_context);
242 match execution_context {
243 ExecutionContext::Remote(context) => {
244 RuntimeInternal::execute_remote(self_rc, context, dxb).await
245 }
246 ExecutionContext::Local(_) => {
247 execution_context.execute_dxb(&dxb).await
248 }
249 }
250 })
251 }
252
253 pub fn execute_dxb_sync(
254 self_rc: Rc<RuntimeInternal>,
255 dxb: &[u8],
256 execution_context: Option<&mut ExecutionContext>,
257 _end_execution: bool,
258 ) -> Result<Option<ValueContainer>, ExecutionError> {
259 let execution_context =
260 get_execution_context!(self_rc, execution_context);
261 match execution_context {
262 ExecutionContext::Remote(_) => {
263 Err(ExecutionError::RequiresAsyncExecution)
264 }
265 ExecutionContext::Local(_) => {
266 execution_context.execute_dxb_sync(dxb)
267 }
268 }
269 }
270
271 fn take_execution_context(
275 self_rc: Rc<RuntimeInternal>,
276 context_id: &IncomingEndpointContextSectionId,
277 ) -> ExecutionContext {
278 let mut execution_contexts = self_rc.execution_contexts.borrow_mut();
279 let execution_context = execution_contexts.remove(context_id);
281 if let Some(context) = execution_context {
282 context
283 } else {
284 ExecutionContext::local(ExecutionMode::unbounded(), self_rc.clone())
285 }
286 }
287
288 pub async fn execute_remote(
289 self_rc: Rc<RuntimeInternal>,
290 remote_execution_context: &mut RemoteExecutionContext,
291 dxb: Vec<u8>,
292 ) -> Result<Option<ValueContainer>, ExecutionError> {
293 let routing_header: RoutingHeader = RoutingHeader::default()
294 .with_sender(self_rc.endpoint.clone())
295 .to_owned();
296
297 let context_id =
299 remote_execution_context.context_id.unwrap_or_else(|| {
300 remote_execution_context.context_id =
302 Some(self_rc.com_hub.block_handler.get_new_context_id());
303 remote_execution_context.context_id.unwrap()
304 });
305
306 let block_header = BlockHeader {
307 context_id,
308 ..BlockHeader::default()
309 };
310 let encrypted_header = EncryptedHeader::default();
311
312 let mut block =
313 DXBBlock::new(routing_header, block_header, encrypted_header, dxb);
314
315 block
316 .set_receivers(slice::from_ref(&remote_execution_context.endpoint));
317
318 let response = self_rc
319 .com_hub
320 .send_own_block_await_response(block, ResponseOptions::default())
321 .await
322 .remove(0)?;
323 let incoming_section = response.take_incoming_section();
324 RuntimeInternal::execute_incoming_section(self_rc, incoming_section)
325 .await
326 .0
327 }
328
329 pub(crate) async fn execute_incoming_section(
330 self_rc: Rc<RuntimeInternal>,
331 mut incoming_section: IncomingSection,
332 ) -> (
333 Result<Option<ValueContainer>, ExecutionError>,
334 Endpoint,
335 OutgoingContextId,
336 ) {
337 let section_context_id =
338 incoming_section.get_section_context_id().clone();
339 let mut context =
340 Self::take_execution_context(self_rc.clone(), §ion_context_id);
341 info!(
342 "Executing incoming section with index: {}",
343 incoming_section.get_section_index()
344 );
345
346 let mut result = None;
347 let mut last_block = None;
348
349 loop {
351 let block = incoming_section.next().await;
352 if let Some(block) = block {
353 let res = RuntimeInternal::execute_dxb_block_local(
354 self_rc.clone(),
355 block.clone(),
356 Some(&mut context),
357 )
358 .await;
359 if let Err(err) = res {
360 return (
361 Err(err),
362 block.sender().clone(),
363 block.block_header.context_id,
364 );
365 }
366 result = res.unwrap();
367 last_block = Some(block);
368 } else {
369 break;
370 }
371 }
372
373 if last_block.is_none() {
374 unreachable!("Incoming section must contain at least one block");
375 }
376 let last_block = last_block.unwrap();
377 let sender_endpoint = last_block.sender().clone();
378 let context_id = last_block.block_header.context_id;
379
380 self_rc
383 .execution_contexts
384 .borrow_mut()
385 .insert(section_context_id, context);
386
387 (Ok(result), sender_endpoint, context_id)
388 }
389
390 async fn execute_dxb_block_local(
391 self_rc: Rc<RuntimeInternal>,
392 block: DXBBlock,
393 execution_context: Option<&mut ExecutionContext>,
394 ) -> Result<Option<ValueContainer>, ExecutionError> {
395 let execution_context =
396 get_execution_context!(self_rc, execution_context);
397 if !core::matches!(execution_context, ExecutionContext::Local(_)) {
399 unreachable!(
400 "Execution context must be local for executing a DXB block"
401 );
402 }
403 let dxb = block.body;
404 let end_execution =
405 block.block_header.flags_and_timestamp.is_end_of_section();
406 RuntimeInternal::execute_dxb(
407 self_rc,
408 dxb,
409 Some(execution_context),
410 end_execution,
411 )
412 .await
413 }
414
415 pub fn get_env(&self) -> HashMap<String, String> {
416 self.config.env.clone().unwrap_or_default()
417 }
418}