Skip to main content

datex_core/runtime/
internal.rs

1use crate::{
2    channel::mpsc::{UnboundedReceiver, create_unbounded_channel},
3    collections::HashMap,
4    global::{
5        dxb_block::{
6            DXBBlock, IncomingEndpointContextSectionId, IncomingSection,
7            OutgoingContextId,
8        },
9        protocol_structures::{
10            block_header::BlockHeader, encrypted_header::EncryptedHeader,
11            routing_header::RoutingHeader,
12        },
13    },
14    network::{
15        com_hub::{
16            ComHub, InterfacePriority, network_response::ResponseOptions,
17        },
18        com_interfaces::local_loopback_interface::LocalLoopbackInterfaceSetupData,
19    },
20    prelude::*,
21    runtime::{
22        RuntimeConfig, RuntimeConfigInterface,
23        execution::{
24            ExecutionError,
25            context::{
26                ExecutionContext, ExecutionMode, RemoteExecutionContext,
27                ScriptExecutionError,
28            },
29        },
30        memory::Memory,
31    },
32    time::Instant,
33    utils::task_manager::TaskManager,
34    values::{
35        core_values::endpoint::Endpoint, value_container::ValueContainer,
36    },
37};
38use alloc::rc::Rc;
39use core::{cell::RefCell, pin::Pin, slice};
40use log::{debug, error, info};
41
42#[derive(Debug)]
43pub struct RuntimeInternal {
44    pub memory: RefCell<Memory>,
45    pub com_hub: Rc<ComHub>,
46    pub endpoint: Endpoint,
47    pub config: RuntimeConfig,
48
49    pub task_manager: TaskManager,
50
51    // receiver for incoming sections from com hub
52    pub(crate) incoming_sections_receiver:
53        RefCell<UnboundedReceiver<IncomingSection>>,
54
55    /// active execution contexts, stored by context_id
56    pub execution_contexts:
57        RefCell<HashMap<IncomingEndpointContextSectionId, ExecutionContext>>,
58}
59
60macro_rules! get_execution_context {
61    // take context and self_rc as parameters
62    ($self_rc:expr, $execution_context:expr) => {
63        match $execution_context {
64            Some(context) => {
65                // set current runtime in execution context if local execution context
66                if let &mut ExecutionContext::Local(ref mut local_context) = context {
67                    local_context.set_runtime_internal($self_rc.clone());
68                }
69                context
70            },
71            None => {
72               &mut ExecutionContext::local(ExecutionMode::Static, $self_rc.clone())
73            }
74        }
75    };
76}
77
78impl RuntimeInternal {
79    pub(crate) fn new(
80        endpoint: Endpoint,
81        memory: RefCell<Memory>,
82        config: RuntimeConfig,
83        com_hub: Rc<ComHub>,
84        task_manager: TaskManager,
85        incoming_sections_receiver: UnboundedReceiver<IncomingSection>,
86    ) -> RuntimeInternal {
87        RuntimeInternal {
88            endpoint,
89            memory,
90            config,
91            com_hub,
92            task_manager,
93            incoming_sections_receiver: RefCell::new(
94                incoming_sections_receiver,
95            ),
96            execution_contexts: RefCell::new(HashMap::new()),
97        }
98    }
99
100    pub fn stub() -> RuntimeInternal {
101        let (sender, receiver) = create_unbounded_channel();
102        RuntimeInternal::new(
103            Endpoint::default(),
104            RefCell::new(Memory::default()),
105            RuntimeConfig::default(),
106            ComHub::create(Endpoint::default(), sender).0,
107            TaskManager::create().0,
108            receiver,
109        )
110    }
111
112    /// Creates all interfaces configured in the runtime config
113    async fn create_configured_interfaces(&self) {
114        if let Some(interfaces) = &self.config.interfaces {
115            for RuntimeConfigInterface {
116                interface_type,
117                setup_data: config,
118                priority,
119            } in interfaces.iter()
120            {
121                let create_future = self
122                    .com_hub
123                    .clone()
124                    .create_interface(interface_type, config.clone(), *priority)
125                    .await;
126                match create_future {
127                    Err(err) => {
128                        error!(
129                            "Failed to create interface {interface_type}: {err:?}"
130                        )
131                    }
132                    Ok((_, ready_receiver)) => {
133                        if let Some(ready_receiver) = ready_receiver {
134                            let _ = ready_receiver.await;
135                        }
136                    }
137                }
138            }
139        }
140    }
141
142    async fn init_local_loopback_interface(&self) {
143        // add default local loopback interface
144        let local_interface_setup_data =
145            LocalLoopbackInterfaceSetupData.create_interface().unwrap();
146
147        let ready_signal = self
148            .com_hub
149            .clone()
150            .add_interface_from_configuration(
151                local_interface_setup_data,
152                InterfacePriority::None,
153            )
154            .expect("Failed to add local loopback interface");
155        // local loopback interface is single socket interface and should always return a ready signal
156        // which should always resolve to Ok
157        ready_signal.unwrap().await.unwrap()
158    }
159
160    /// Performs asynchronous initialization of the runtime
161    pub(crate) async fn init_async(&self) {
162        // create local loopback interface and other configured interfaces
163        self.init_local_loopback_interface().await;
164        self.create_configured_interfaces().await;
165    }
166
167    #[cfg(feature = "compiler")]
168    pub async fn execute(
169        self_rc: Rc<RuntimeInternal>,
170        script: &str,
171        inserted_values: &[ValueContainer],
172        execution_context: Option<&mut ExecutionContext>,
173    ) -> Result<Option<ValueContainer>, ScriptExecutionError> {
174        let execution_context =
175            get_execution_context!(self_rc, execution_context);
176        let compile_start = Instant::now();
177        let dxb = execution_context.compile(script, inserted_values)?;
178        debug!(
179            "[Compilation took {} ms]",
180            compile_start.elapsed().as_millis()
181        );
182        let execute_start = Instant::now();
183        let result = RuntimeInternal::execute_dxb(
184            self_rc,
185            dxb,
186            Some(execution_context),
187            true,
188        )
189        .await
190        .map_err(ScriptExecutionError::from);
191        debug!(
192            "[Execution took {} ms]",
193            execute_start.elapsed().as_millis()
194        );
195        result
196    }
197
198    #[cfg(feature = "compiler")]
199    pub fn execute_sync(
200        self_rc: Rc<RuntimeInternal>,
201        script: &str,
202        inserted_values: &[ValueContainer],
203        execution_context: Option<&mut ExecutionContext>,
204    ) -> Result<Option<ValueContainer>, ScriptExecutionError> {
205        let execution_context =
206            get_execution_context!(self_rc, execution_context);
207        let compile_start = Instant::now();
208        let dxb = execution_context.compile(script, inserted_values)?;
209        debug!(
210            "[Compilation took {} ms]",
211            compile_start.elapsed().as_millis()
212        );
213        let execute_start = Instant::now();
214        let result = RuntimeInternal::execute_dxb_sync(
215            self_rc,
216            &dxb,
217            Some(execution_context),
218            true,
219        )
220        .map_err(ScriptExecutionError::from);
221        debug!(
222            "[Execution took {} ms]",
223            execute_start.elapsed().as_millis()
224        );
225        result
226    }
227
228    pub fn execute_dxb<'a>(
229        self_rc: Rc<RuntimeInternal>,
230        dxb: Vec<u8>,
231        execution_context: Option<&'a mut ExecutionContext>,
232        _end_execution: bool,
233    ) -> Pin<
234        Box<
235            dyn Future<Output = Result<Option<ValueContainer>, ExecutionError>>
236                + 'a,
237        >,
238    > {
239        Box::pin(async move {
240            let execution_context =
241                get_execution_context!(self_rc, execution_context);
242            match execution_context {
243                ExecutionContext::Remote(context) => {
244                    RuntimeInternal::execute_remote(self_rc, context, dxb).await
245                }
246                ExecutionContext::Local(_) => {
247                    execution_context.execute_dxb(&dxb).await
248                }
249            }
250        })
251    }
252
253    pub fn execute_dxb_sync(
254        self_rc: Rc<RuntimeInternal>,
255        dxb: &[u8],
256        execution_context: Option<&mut ExecutionContext>,
257        _end_execution: bool,
258    ) -> Result<Option<ValueContainer>, ExecutionError> {
259        let execution_context =
260            get_execution_context!(self_rc, execution_context);
261        match execution_context {
262            ExecutionContext::Remote(_) => {
263                Err(ExecutionError::RequiresAsyncExecution)
264            }
265            ExecutionContext::Local(_) => {
266                execution_context.execute_dxb_sync(dxb)
267            }
268        }
269    }
270
271    /// Returns the existing execution context for the given context_id,
272    /// or creates a new one if it doesn't exist.
273    /// To reuse the context later, the caller must store it back in the map after use.
274    fn take_execution_context(
275        self_rc: Rc<RuntimeInternal>,
276        context_id: &IncomingEndpointContextSectionId,
277    ) -> ExecutionContext {
278        let mut execution_contexts = self_rc.execution_contexts.borrow_mut();
279        // get execution context by context_id or create a new one if it doesn't exist
280        let execution_context = execution_contexts.remove(context_id);
281        if let Some(context) = execution_context {
282            context
283        } else {
284            ExecutionContext::local(ExecutionMode::unbounded(), self_rc.clone())
285        }
286    }
287
288    pub async fn execute_remote(
289        self_rc: Rc<RuntimeInternal>,
290        remote_execution_context: &mut RemoteExecutionContext,
291        dxb: Vec<u8>,
292    ) -> Result<Option<ValueContainer>, ExecutionError> {
293        let routing_header: RoutingHeader = RoutingHeader::default()
294            .with_sender(self_rc.endpoint.clone())
295            .to_owned();
296
297        // get existing context_id for context, or create a new one
298        let context_id =
299            remote_execution_context.context_id.unwrap_or_else(|| {
300                // if the context_id is not set, we create a new one
301                remote_execution_context.context_id =
302                    Some(self_rc.com_hub.block_handler.get_new_context_id());
303                remote_execution_context.context_id.unwrap()
304            });
305
306        let block_header = BlockHeader {
307            context_id,
308            ..BlockHeader::default()
309        };
310        let encrypted_header = EncryptedHeader::default();
311
312        let mut block =
313            DXBBlock::new(routing_header, block_header, encrypted_header, dxb);
314
315        block
316            .set_receivers(slice::from_ref(&remote_execution_context.endpoint));
317
318        let response = self_rc
319            .com_hub
320            .send_own_block_await_response(block, ResponseOptions::default())
321            .await
322            .remove(0)?;
323        let incoming_section = response.take_incoming_section();
324        RuntimeInternal::execute_incoming_section(self_rc, incoming_section)
325            .await
326            .0
327    }
328
329    pub(crate) async fn execute_incoming_section(
330        self_rc: Rc<RuntimeInternal>,
331        mut incoming_section: IncomingSection,
332    ) -> (
333        Result<Option<ValueContainer>, ExecutionError>,
334        Endpoint,
335        OutgoingContextId,
336    ) {
337        let section_context_id =
338            incoming_section.get_section_context_id().clone();
339        let mut context =
340            Self::take_execution_context(self_rc.clone(), &section_context_id);
341        info!(
342            "Executing incoming section with index: {}",
343            incoming_section.get_section_index()
344        );
345
346        let mut result = None;
347        let mut last_block = None;
348
349        // iterate over the blocks in the incoming section
350        loop {
351            let block = incoming_section.next().await;
352            if let Some(block) = block {
353                let res = RuntimeInternal::execute_dxb_block_local(
354                    self_rc.clone(),
355                    block.clone(),
356                    Some(&mut context),
357                )
358                .await;
359                if let Err(err) = res {
360                    return (
361                        Err(err),
362                        block.sender().clone(),
363                        block.block_header.context_id,
364                    );
365                }
366                result = res.unwrap();
367                last_block = Some(block);
368            } else {
369                break;
370            }
371        }
372
373        if last_block.is_none() {
374            unreachable!("Incoming section must contain at least one block");
375        }
376        let last_block = last_block.unwrap();
377        let sender_endpoint = last_block.sender().clone();
378        let context_id = last_block.block_header.context_id;
379
380        // insert the context back into the map for future use
381        // TODO #638: is this needed or can we drop the context after execution here?
382        self_rc
383            .execution_contexts
384            .borrow_mut()
385            .insert(section_context_id, context);
386
387        (Ok(result), sender_endpoint, context_id)
388    }
389
390    async fn execute_dxb_block_local(
391        self_rc: Rc<RuntimeInternal>,
392        block: DXBBlock,
393        execution_context: Option<&mut ExecutionContext>,
394    ) -> Result<Option<ValueContainer>, ExecutionError> {
395        let execution_context =
396            get_execution_context!(self_rc, execution_context);
397        // assert that the execution context is local
398        if !core::matches!(execution_context, ExecutionContext::Local(_)) {
399            unreachable!(
400                "Execution context must be local for executing a DXB block"
401            );
402        }
403        let dxb = block.body;
404        let end_execution =
405            block.block_header.flags_and_timestamp.is_end_of_section();
406        RuntimeInternal::execute_dxb(
407            self_rc,
408            dxb,
409            Some(execution_context),
410            end_execution,
411        )
412        .await
413    }
414
415    pub fn get_env(&self) -> HashMap<String, String> {
416        self.config.env.clone().unwrap_or_default()
417    }
418}