Skip to main content

datex_core/runtime/
internal.rs

1use crate::{
2    channel::mpsc::UnboundedReceiver,
3    collections::HashMap,
4    global::{
5        dxb_block::{
6            DXBBlock, IncomingEndpointContextSectionId, IncomingSection,
7            OutgoingContextId,
8        },
9        protocol_structures::{
10            block_header::BlockHeader, encrypted_header::EncryptedHeader,
11            routing_header::RoutingHeader,
12        },
13    },
14    network::{
15        com_hub::{
16            ComHub, InterfacePriority, network_response::ResponseOptions,
17        },
18        com_interfaces::local_loopback_interface::LocalLoopbackInterfaceSetupData,
19    },
20    prelude::*,
21    runtime::{
22        RuntimeConfig, RuntimeConfigInterface,
23        execution::{
24            ExecutionError,
25            context::{
26                ExecutionContext, ExecutionMode, RemoteExecutionContext,
27                ScriptExecutionError,
28            },
29        },
30        memory::Memory,
31    },
32    time::Instant,
33    utils::task_manager::TaskManager,
34    values::{
35        core_values::endpoint::Endpoint, value_container::ValueContainer,
36    },
37};
38use alloc::rc::Rc;
39use core::{cell::RefCell, pin::Pin, slice};
40use log::{debug, error, info};
41
42#[derive(Debug)]
43pub struct RuntimeInternal {
44    pub memory: RefCell<Memory>,
45    pub com_hub: Rc<ComHub>,
46    pub endpoint: Endpoint,
47    pub config: RuntimeConfig,
48
49    pub task_manager: TaskManager,
50
51    // receiver for incoming sections from com hub
52    pub(crate) incoming_sections_receiver:
53        RefCell<UnboundedReceiver<IncomingSection>>,
54
55    /// active execution contexts, stored by context_id
56    pub execution_contexts:
57        RefCell<HashMap<IncomingEndpointContextSectionId, ExecutionContext>>,
58}
59
/// Resolves an `Option<&mut ExecutionContext>` to a usable
/// `&mut ExecutionContext`.
///
/// * `Some(context)`: if the context is a local one, the given runtime
///   (`$self_rc`) is attached to it before it is handed back.
/// * `None`: a fresh local context in `ExecutionMode::Static` is created for
///   the given runtime and borrowed mutably.
///
/// NOTE(review): the `None` arm borrows a temporary, so the macro is meant to
/// be used directly as the initializer of a `let` binding (as every call site
/// in this file does) — confirm before using it in another position.
macro_rules! get_execution_context {
    // parameters: the runtime Rc and the optional execution context
    ($self_rc:expr, $execution_context:expr) => {
        match $execution_context {
            Some(provided) => {
                // local contexts need to know the runtime they are executed in
                if let ExecutionContext::Local(local) = &mut *provided {
                    local.set_runtime_internal($self_rc.clone());
                }
                provided
            },
            None => {
               &mut ExecutionContext::local_with_runtime_internal($self_rc.clone(), ExecutionMode::Static)
            }
        }
    };
}
77
78impl RuntimeInternal {
79    /// Creates all interfaces configured in the runtime config
80    async fn create_configured_interfaces(&self) {
81        if let Some(interfaces) = &self.config.interfaces {
82            for RuntimeConfigInterface {
83                interface_type,
84                setup_data: config,
85                priority,
86            } in interfaces.iter()
87            {
88                let create_future = self
89                    .com_hub
90                    .clone()
91                    .create_interface(interface_type, config.clone(), *priority)
92                    .await;
93                match create_future {
94                    Err(err) => {
95                        error!(
96                            "Failed to create interface {interface_type}: {err:?}"
97                        )
98                    }
99                    Ok((_, ready_receiver)) => {
100                        if let Some(ready_receiver) = ready_receiver {
101                            let _ = ready_receiver.await;
102                        }
103                    }
104                }
105            }
106        }
107    }
108
109    async fn init_local_loopback_interface(&self) {
110        // add default local loopback interface
111        let local_interface_setup_data =
112            LocalLoopbackInterfaceSetupData.create_interface().unwrap();
113
114        let ready_signal = self
115            .com_hub
116            .clone()
117            .add_interface_from_configuration(
118                local_interface_setup_data,
119                InterfacePriority::None,
120            )
121            .expect("Failed to add local loopback interface");
122        // local loopback interface is single socket interface and should always return a ready signal
123        // which should always resolve to Ok
124        ready_signal.unwrap().await.unwrap()
125    }
126
127    /// Performs asynchronous initialization of the runtime
128    pub(crate) async fn init_async(&self) {
129        // create local loopback interface and other configured interfaces
130        self.init_local_loopback_interface().await;
131        self.create_configured_interfaces().await;
132    }
133
134    #[cfg(feature = "compiler")]
135    pub async fn execute(
136        self_rc: Rc<RuntimeInternal>,
137        script: &str,
138        inserted_values: &[ValueContainer],
139        execution_context: Option<&mut ExecutionContext>,
140    ) -> Result<Option<ValueContainer>, ScriptExecutionError> {
141        let execution_context =
142            get_execution_context!(self_rc, execution_context);
143        let compile_start = Instant::now();
144        let dxb = execution_context.compile(script, inserted_values)?;
145        debug!(
146            "[Compilation took {} ms]",
147            compile_start.elapsed().as_millis()
148        );
149        let execute_start = Instant::now();
150        let result = RuntimeInternal::execute_dxb(
151            self_rc,
152            dxb,
153            Some(execution_context),
154            true,
155        )
156        .await
157        .map_err(ScriptExecutionError::from);
158        debug!(
159            "[Execution took {} ms]",
160            execute_start.elapsed().as_millis()
161        );
162        result
163    }
164
165    #[cfg(feature = "compiler")]
166    pub fn execute_sync(
167        self_rc: Rc<RuntimeInternal>,
168        script: &str,
169        inserted_values: &[ValueContainer],
170        execution_context: Option<&mut ExecutionContext>,
171    ) -> Result<Option<ValueContainer>, ScriptExecutionError> {
172        let execution_context =
173            get_execution_context!(self_rc, execution_context);
174        let compile_start = Instant::now();
175        let dxb = execution_context.compile(script, inserted_values)?;
176        debug!(
177            "[Compilation took {} ms]",
178            compile_start.elapsed().as_millis()
179        );
180        let execute_start = Instant::now();
181        let result = RuntimeInternal::execute_dxb_sync(
182            self_rc,
183            &dxb,
184            Some(execution_context),
185            true,
186        )
187        .map_err(ScriptExecutionError::from);
188        debug!(
189            "[Execution took {} ms]",
190            execute_start.elapsed().as_millis()
191        );
192        result
193    }
194
195    pub fn execute_dxb<'a>(
196        self_rc: Rc<RuntimeInternal>,
197        dxb: Vec<u8>,
198        execution_context: Option<&'a mut ExecutionContext>,
199        _end_execution: bool,
200    ) -> Pin<
201        Box<
202            dyn Future<Output = Result<Option<ValueContainer>, ExecutionError>>
203                + 'a,
204        >,
205    > {
206        Box::pin(async move {
207            let execution_context =
208                get_execution_context!(self_rc, execution_context);
209            match execution_context {
210                ExecutionContext::Remote(context) => {
211                    RuntimeInternal::execute_remote(self_rc, context, dxb).await
212                }
213                ExecutionContext::Local(_) => {
214                    execution_context.execute_dxb(&dxb).await
215                }
216            }
217        })
218    }
219
220    pub fn execute_dxb_sync(
221        self_rc: Rc<RuntimeInternal>,
222        dxb: &[u8],
223        execution_context: Option<&mut ExecutionContext>,
224        _end_execution: bool,
225    ) -> Result<Option<ValueContainer>, ExecutionError> {
226        let execution_context =
227            get_execution_context!(self_rc, execution_context);
228        match execution_context {
229            ExecutionContext::Remote(_) => {
230                Err(ExecutionError::RequiresAsyncExecution)
231            }
232            ExecutionContext::Local(_) => {
233                execution_context.execute_dxb_sync(dxb)
234            }
235        }
236    }
237
238    /// Returns the existing execution context for the given context_id,
239    /// or creates a new one if it doesn't exist.
240    /// To reuse the context later, the caller must store it back in the map after use.
241    fn take_execution_context(
242        self_rc: Rc<RuntimeInternal>,
243        context_id: &IncomingEndpointContextSectionId,
244    ) -> ExecutionContext {
245        let mut execution_contexts = self_rc.execution_contexts.borrow_mut();
246        // get execution context by context_id or create a new one if it doesn't exist
247        let execution_context = execution_contexts.remove(context_id);
248        if let Some(context) = execution_context {
249            context
250        } else {
251            ExecutionContext::local_with_runtime_internal(
252                self_rc.clone(),
253                ExecutionMode::unbounded(),
254            )
255        }
256    }
257
258    pub async fn execute_remote(
259        self_rc: Rc<RuntimeInternal>,
260        remote_execution_context: &mut RemoteExecutionContext,
261        dxb: Vec<u8>,
262    ) -> Result<Option<ValueContainer>, ExecutionError> {
263        let routing_header: RoutingHeader = RoutingHeader::default()
264            .with_sender(self_rc.endpoint.clone())
265            .to_owned();
266
267        // get existing context_id for context, or create a new one
268        let context_id =
269            remote_execution_context.context_id.unwrap_or_else(|| {
270                // if the context_id is not set, we create a new one
271                remote_execution_context.context_id =
272                    Some(self_rc.com_hub.block_handler.get_new_context_id());
273                remote_execution_context.context_id.unwrap()
274            });
275
276        let block_header = BlockHeader {
277            context_id,
278            ..BlockHeader::default()
279        };
280        let encrypted_header = EncryptedHeader::default();
281
282        let mut block =
283            DXBBlock::new(routing_header, block_header, encrypted_header, dxb);
284
285        block
286            .set_receivers(slice::from_ref(&remote_execution_context.endpoint));
287
288        let response = self_rc
289            .com_hub
290            .send_own_block_await_response(block, ResponseOptions::default())
291            .await
292            .remove(0)?;
293        let incoming_section = response.take_incoming_section();
294        RuntimeInternal::execute_incoming_section(self_rc, incoming_section)
295            .await
296            .0
297    }
298
299    pub(crate) async fn execute_incoming_section(
300        self_rc: Rc<RuntimeInternal>,
301        mut incoming_section: IncomingSection,
302    ) -> (
303        Result<Option<ValueContainer>, ExecutionError>,
304        Endpoint,
305        OutgoingContextId,
306    ) {
307        let section_context_id =
308            incoming_section.get_section_context_id().clone();
309        let mut context =
310            Self::take_execution_context(self_rc.clone(), &section_context_id);
311        info!(
312            "Executing incoming section with index: {}",
313            incoming_section.get_section_index()
314        );
315
316        let mut result = None;
317        let mut last_block = None;
318
319        // iterate over the blocks in the incoming section
320        loop {
321            let block = incoming_section.next().await;
322            if let Some(block) = block {
323                let res = RuntimeInternal::execute_dxb_block_local(
324                    self_rc.clone(),
325                    block.clone(),
326                    Some(&mut context),
327                )
328                .await;
329                if let Err(err) = res {
330                    return (
331                        Err(err),
332                        block.sender().clone(),
333                        block.block_header.context_id,
334                    );
335                }
336                result = res.unwrap();
337                last_block = Some(block);
338            } else {
339                break;
340            }
341        }
342
343        if last_block.is_none() {
344            unreachable!("Incoming section must contain at least one block");
345        }
346        let last_block = last_block.unwrap();
347        let sender_endpoint = last_block.sender().clone();
348        let context_id = last_block.block_header.context_id;
349
350        // insert the context back into the map for future use
351        // TODO #638: is this needed or can we drop the context after execution here?
352        self_rc
353            .execution_contexts
354            .borrow_mut()
355            .insert(section_context_id, context);
356
357        (Ok(result), sender_endpoint, context_id)
358    }
359
360    async fn execute_dxb_block_local(
361        self_rc: Rc<RuntimeInternal>,
362        block: DXBBlock,
363        execution_context: Option<&mut ExecutionContext>,
364    ) -> Result<Option<ValueContainer>, ExecutionError> {
365        let execution_context =
366            get_execution_context!(self_rc, execution_context);
367        // assert that the execution context is local
368        if !core::matches!(execution_context, ExecutionContext::Local(_)) {
369            unreachable!(
370                "Execution context must be local for executing a DXB block"
371            );
372        }
373        let dxb = block.body;
374        let end_execution =
375            block.block_header.flags_and_timestamp.is_end_of_section();
376        RuntimeInternal::execute_dxb(
377            self_rc,
378            dxb,
379            Some(execution_context),
380            end_execution,
381        )
382        .await
383    }
384
385    pub fn get_env(&self) -> HashMap<String, String> {
386        self.config.env.clone().unwrap_or_default()
387    }
388}