datex_core/runtime/
mod.rs

1#[cfg(feature = "native_crypto")]
2use crate::crypto::crypto_native::CryptoNative;
3use crate::global::dxb_block::{
4    DXBBlock, IncomingEndpointContextSectionId, IncomingSection,
5    OutgoingContextId,
6};
7use crate::global::protocol_structures::block_header::BlockHeader;
8use crate::global::protocol_structures::encrypted_header::EncryptedHeader;
9use crate::global::protocol_structures::routing_header::RoutingHeader;
10use crate::logger::{init_logger, init_logger_debug};
11use crate::network::com_hub::{ComHub, InterfacePriority, ResponseOptions};
12use crate::runtime::execution::ExecutionError;
13use crate::runtime::execution_context::{
14    ExecutionContext, RemoteExecutionContext, ScriptExecutionError,
15};
16use crate::serde::error::SerializationError;
17use crate::serde::serializer::to_value_container;
18use crate::stdlib::{cell::RefCell, rc::Rc};
19use crate::utils::time::Time;
20use crate::values::core_values::endpoint::Endpoint;
21use crate::values::value_container::ValueContainer;
22use datex_core::network::com_interfaces::com_interface::ComInterfaceFactory;
23use futures::channel::oneshot::Sender;
24use global_context::{GlobalContext, set_global_context};
25use log::{error, info};
26use serde::{Deserialize, Serialize};
27use std::collections::HashMap;
28use std::fmt::Debug;
29use std::pin::Pin;
30use std::sync::Arc;
31
32pub mod dif_interface;
33pub mod execution;
34pub mod execution_context;
35pub mod global_context;
36pub mod memory;
37mod stack;
38mod update_loop;
39
40use self::memory::Memory;
41
/// Version of this crate, read from the `CARGO_PKG_VERSION` environment
/// variable at compile time.
const VERSION: &str = env!("CARGO_PKG_VERSION");
43
/// Handle to a runtime instance.
///
/// The handle only holds a version string and a reference-counted pointer to
/// the [`RuntimeInternal`] state, so clones of a `Runtime` refer to the same
/// internal state.
#[derive(Clone)]
pub struct Runtime {
    /// Crate version string; the constructors in this file fill it from
    /// `VERSION`.
    pub version: String,
    /// Shared internal state (memory, com hub, endpoint, config, ...).
    pub internal: Rc<RuntimeInternal>,
}
49
50impl Debug for Runtime {
51    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
52        f.debug_struct("Runtime")
53            .field("version", &self.version)
54            .finish()
55    }
56}
57
58impl Default for Runtime {
59    fn default() -> Self {
60        Runtime {
61            version: VERSION.to_string(),
62            internal: Rc::new(RuntimeInternal::default()),
63        }
64    }
65}
66
/// Internal state of a [`Runtime`], shared between all clones of the handle
/// through an `Rc`.
#[derive(Debug)]
pub struct RuntimeInternal {
    /// Memory of this runtime; `Runtime::new` creates it for the runtime's
    /// endpoint.
    pub memory: RefCell<Memory>,
    /// Communication hub used to send blocks and await responses.
    pub com_hub: ComHub,
    /// Endpoint of this runtime; used as the sender of outgoing blocks.
    pub endpoint: Endpoint,
    /// Configuration the runtime was created with.
    pub config: RuntimeConfig,
    /// set to true if the update loop should be running
    /// when set to false, the update loop will stop
    update_loop_running: RefCell<bool>,
    /// Sender half of a oneshot channel, initially `None`.
    /// NOTE(review): presumably used by the update loop module to signal a
    /// stop - confirm in `update_loop`.
    update_loop_stop_sender: RefCell<Option<Sender<()>>>,

    /// active execution contexts, stored by context_id
    pub execution_contexts:
        RefCell<HashMap<IncomingEndpointContextSectionId, ExecutionContext>>,
}
82
83impl Default for RuntimeInternal {
84    fn default() -> Self {
85        RuntimeInternal {
86            endpoint: Endpoint::default(),
87            config: RuntimeConfig::default(),
88            memory: RefCell::new(Memory::new(Endpoint::default())),
89            com_hub: ComHub::default(),
90            update_loop_running: RefCell::new(false),
91            update_loop_stop_sender: RefCell::new(None),
92            execution_contexts: RefCell::new(HashMap::new()),
93        }
94    }
95}
96
/// Resolves the execution context to work with.
///
/// Expands to a `match` on `$execution_context` (an
/// `Option<&mut ExecutionContext>`):
/// - `Some(context)`: if the context is a local one, the runtime internal
///   (`$self_rc`) is set on it; the same `&mut ExecutionContext` is yielded.
/// - `None`: a fresh local context bound to `$self_rc` is created and a
///   mutable reference to that temporary is yielded.
///
/// Because the `None` arm borrows a temporary, the expansion is meant to be
/// used directly as the initializer of a `let` binding, as all call sites in
/// this file do.
macro_rules! get_execution_context {
    // take context and self_rc as parameters
    ($self_rc:expr, $execution_context:expr) => {
        match $execution_context {
            Some(context) => {
                // set current runtime in execution context if local execution context
                if let &mut ExecutionContext::Local(ref mut local_context) = context {
                    local_context.set_runtime_internal($self_rc.clone());
                }
                context
            },
            None => {
               // no context supplied by the caller: fall back to a new local context
               &mut ExecutionContext::local_with_runtime_internal($self_rc.clone(), true)
            }
        }
    };
}
114
115impl RuntimeInternal {
116    pub async fn execute(
117        self_rc: Rc<RuntimeInternal>,
118        script: &str,
119        inserted_values: &[ValueContainer],
120        execution_context: Option<&mut ExecutionContext>,
121    ) -> Result<Option<ValueContainer>, ScriptExecutionError> {
122        let execution_context =
123            get_execution_context!(self_rc, execution_context);
124        let dxb = execution_context.compile(script, inserted_values)?;
125        RuntimeInternal::execute_dxb(
126            self_rc,
127            dxb,
128            Some(execution_context),
129            true,
130        )
131        .await
132        .map_err(ScriptExecutionError::from)
133    }
134
135    pub fn execute_sync(
136        self_rc: Rc<RuntimeInternal>,
137        script: &str,
138        inserted_values: &[ValueContainer],
139        execution_context: Option<&mut ExecutionContext>,
140    ) -> Result<Option<ValueContainer>, ScriptExecutionError> {
141        let execution_context =
142            get_execution_context!(self_rc, execution_context);
143        let dxb = execution_context.compile(script, inserted_values)?;
144        RuntimeInternal::execute_dxb_sync(
145            self_rc,
146            &dxb,
147            Some(execution_context),
148            true,
149        )
150        .map_err(ScriptExecutionError::from)
151    }
152
153    pub fn execute_dxb<'a>(
154        self_rc: Rc<RuntimeInternal>,
155        dxb: Vec<u8>,
156        execution_context: Option<&'a mut ExecutionContext>,
157        end_execution: bool,
158    ) -> Pin<
159        Box<
160            dyn Future<Output = Result<Option<ValueContainer>, ExecutionError>>
161                + 'a,
162        >,
163    > {
164        Box::pin(async move {
165            let execution_context =
166                get_execution_context!(self_rc, execution_context);
167            match execution_context {
168                ExecutionContext::Remote(context) => {
169                    RuntimeInternal::execute_remote(self_rc, context, dxb).await
170                }
171                ExecutionContext::Local(_) => {
172                    execution_context.execute_dxb(&dxb, end_execution).await
173                }
174            }
175        })
176    }
177
178    pub fn execute_dxb_sync(
179        self_rc: Rc<RuntimeInternal>,
180        dxb: &[u8],
181        execution_context: Option<&mut ExecutionContext>,
182        end_execution: bool,
183    ) -> Result<Option<ValueContainer>, ExecutionError> {
184        let execution_context =
185            get_execution_context!(self_rc, execution_context);
186        match execution_context {
187            ExecutionContext::Remote(_) => {
188                Err(ExecutionError::RequiresAsyncExecution)
189            }
190            ExecutionContext::Local(_) => {
191                execution_context.execute_dxb_sync(dxb, end_execution)
192            }
193        }
194    }
195
196    /// Returns the existing execution context for the given context_id,
197    /// or creates a new one if it doesn't exist.
198    fn get_execution_context(
199        self_rc: Rc<RuntimeInternal>,
200        context_id: &IncomingEndpointContextSectionId,
201    ) -> ExecutionContext {
202        let mut execution_contexts = self_rc.execution_contexts.borrow_mut();
203        // get execution context by context_id or create a new one if it doesn't exist
204        let execution_context = execution_contexts.get(context_id).cloned();
205        if let Some(context) = execution_context {
206            context
207        } else {
208            let new_context = ExecutionContext::local_with_runtime_internal(
209                self_rc.clone(),
210                false,
211            );
212            // insert the new context into the map
213            execution_contexts.insert(context_id.clone(), new_context.clone());
214            new_context
215        }
216    }
217
218    pub async fn execute_remote(
219        self_rc: Rc<RuntimeInternal>,
220        remote_execution_context: &mut RemoteExecutionContext,
221        dxb: Vec<u8>,
222    ) -> Result<Option<ValueContainer>, ExecutionError> {
223        let routing_header: RoutingHeader = RoutingHeader::default()
224            .with_sender(self_rc.endpoint.clone())
225            .to_owned();
226
227        // get existing context_id for context, or create a new one
228        let context_id =
229            remote_execution_context.context_id.unwrap_or_else(|| {
230                // if the context_id is not set, we create a new one
231                remote_execution_context.context_id =
232                    Some(self_rc.com_hub.block_handler.get_new_context_id());
233                remote_execution_context.context_id.unwrap()
234            });
235
236        let block_header = BlockHeader {
237            context_id,
238            ..BlockHeader::default()
239        };
240        let encrypted_header = EncryptedHeader::default();
241
242        let mut block =
243            DXBBlock::new(routing_header, block_header, encrypted_header, dxb);
244
245        block.set_receivers(std::slice::from_ref(
246            &remote_execution_context.endpoint,
247        ));
248
249        let response = self_rc
250            .com_hub
251            .send_own_block_await_response(block, ResponseOptions::default())
252            .await
253            .remove(0)?;
254        let incoming_section = response.take_incoming_section();
255        RuntimeInternal::execute_incoming_section(self_rc, incoming_section)
256            .await
257            .0
258    }
259
260    async fn execute_incoming_section(
261        self_rc: Rc<RuntimeInternal>,
262        mut incoming_section: IncomingSection,
263    ) -> (
264        Result<Option<ValueContainer>, ExecutionError>,
265        Endpoint,
266        OutgoingContextId,
267    ) {
268        let mut context = Self::get_execution_context(
269            self_rc.clone(),
270            incoming_section.get_section_context_id(),
271        );
272        info!(
273            "Executing incoming section with index: {}",
274            incoming_section.get_section_index()
275        );
276
277        let mut result = None;
278        let mut last_block = None;
279
280        // iterate over the blocks in the incoming section
281        loop {
282            let block = incoming_section.next().await;
283            if let Some(block) = block {
284                let res = RuntimeInternal::execute_dxb_block_local(
285                    self_rc.clone(),
286                    block.clone(),
287                    Some(&mut context),
288                )
289                .await;
290                if let Err(err) = res {
291                    return (
292                        Err(err),
293                        block.get_sender().clone(),
294                        block.block_header.context_id,
295                    );
296                }
297                result = res.unwrap();
298                last_block = Some(block);
299            } else {
300                break;
301            }
302        }
303
304        if last_block.is_none() {
305            unreachable!("Incoming section must contain at least one block");
306        }
307        let last_block = last_block.unwrap();
308        let sender_endpoint = last_block.get_sender().clone();
309        let context_id = last_block.block_header.context_id;
310        (Ok(result), sender_endpoint, context_id)
311    }
312
313    async fn execute_dxb_block_local(
314        self_rc: Rc<RuntimeInternal>,
315        block: DXBBlock,
316        execution_context: Option<&mut ExecutionContext>,
317    ) -> Result<Option<ValueContainer>, ExecutionError> {
318        let execution_context =
319            get_execution_context!(self_rc, execution_context);
320        // assert that the execution context is local
321        if !matches!(execution_context, ExecutionContext::Local(_)) {
322            unreachable!(
323                "Execution context must be local for executing a DXB block"
324            );
325        }
326        let dxb = block.body;
327        let end_execution =
328            block.block_header.flags_and_timestamp.is_end_of_section();
329        RuntimeInternal::execute_dxb(
330            self_rc,
331            dxb,
332            Some(execution_context),
333            end_execution,
334        )
335        .await
336    }
337}
338
/// Description of one communication interface to create when the runtime
/// starts: `Runtime::start` passes `type` and a clone of `config` to the com
/// hub's `create_interface`.
#[derive(Debug, Deserialize, Serialize)]
pub struct RuntimeConfigInterface {
    /// Interface type name handed to the com hub when creating the interface.
    r#type: String,
    /// Interface-specific configuration, stored as a serialized value.
    config: ValueContainer,
}
344
/// Configuration used to construct a [`Runtime`].
#[derive(Debug, Default, Deserialize, Serialize)]
pub struct RuntimeConfig {
    /// Endpoint of the runtime; if `None`, `Runtime::new` uses a random
    /// endpoint.
    pub endpoint: Option<Endpoint>,
    /// Interfaces that `Runtime::start` tries to create on the com hub.
    pub interfaces: Option<Vec<RuntimeConfigInterface>>,
    /// if set to true, the runtime will log debug messages
    pub debug: Option<bool>,
}
352
353impl RuntimeConfig {
354    pub fn new_with_endpoint(endpoint: Endpoint) -> Self {
355        RuntimeConfig {
356            endpoint: Some(endpoint),
357            interfaces: None,
358            debug: None,
359        }
360    }
361
362    pub fn add_interface<T: Serialize>(
363        &mut self,
364        r#type: String,
365        config: T,
366    ) -> Result<(), SerializationError> {
367        let config = to_value_container(&config)?;
368        let interface = RuntimeConfigInterface { r#type, config };
369        if let Some(interfaces) = &mut self.interfaces {
370            interfaces.push(interface);
371        } else {
372            self.interfaces = Some(vec![interface]);
373        }
374
375        Ok(())
376    }
377}
378
379/// publicly exposed wrapper impl for the Runtime
380/// around RuntimeInternal
381impl Runtime {
382    pub fn new(config: RuntimeConfig) -> Runtime {
383        let endpoint = config.endpoint.clone().unwrap_or_else(Endpoint::random);
384        let com_hub = ComHub::new(endpoint.clone());
385        let memory = RefCell::new(Memory::new(endpoint.clone()));
386        Runtime {
387            version: VERSION.to_string(),
388            internal: Rc::new(RuntimeInternal {
389                endpoint,
390                memory,
391                config,
392                com_hub,
393                ..RuntimeInternal::default()
394            }),
395        }
396    }
397
398    pub fn init(
399        config: RuntimeConfig,
400        global_context: GlobalContext,
401    ) -> Runtime {
402        set_global_context(global_context);
403        if let Some(debug) = config.debug
404            && debug
405        {
406            init_logger_debug();
407        } else {
408            init_logger();
409        }
410        info!(
411            "Runtime initialized - Version {VERSION} Time: {}",
412            Time::now()
413        );
414        Self::new(config)
415    }
416
417    pub fn com_hub(&self) -> &ComHub {
418        &self.internal.com_hub
419    }
420    pub fn endpoint(&self) -> Endpoint {
421        self.internal.endpoint.clone()
422    }
423
424    pub fn internal(&self) -> Rc<RuntimeInternal> {
425        Rc::clone(&self.internal)
426    }
427
428    pub fn memory(&self) -> &RefCell<Memory> {
429        &self.internal.memory
430    }
431
432    #[cfg(feature = "native_crypto")]
433    pub fn init_native(config: RuntimeConfig) -> Runtime {
434        use crate::utils::time_native::TimeNative;
435
436        Self::init(
437            config,
438            GlobalContext::new(Arc::new(CryptoNative), Arc::new(TimeNative)),
439        )
440    }
441
442    /// Starts the common update loop:
443    ///  - ComHub
444    ///  - Runtime
445    pub async fn start(&self) {
446        if *self.internal().update_loop_running.borrow() {
447            info!("runtime update loop already running, skipping start");
448            return;
449        }
450        info!("starting runtime...");
451        self.com_hub()
452            .init()
453            .await
454            .expect("Failed to initialize ComHub");
455
456        // register interface factories
457        self.register_interface_factories();
458
459        // create interfaces
460        if let Some(interfaces) = &self.internal.config.interfaces {
461            for RuntimeConfigInterface { r#type, config } in interfaces.iter() {
462                if let Err(err) = self
463                    .com_hub()
464                    .create_interface(
465                        r#type,
466                        config.clone(),
467                        InterfacePriority::default(),
468                    )
469                    .await
470                {
471                    error!("Failed to create interface {type}: {err:?}");
472                } else {
473                    info!("Created interface: {type}");
474                }
475            }
476        }
477
478        RuntimeInternal::start_update_loop(self.internal());
479    }
480
481    // inits a runtime and starts the update loop
482    pub async fn create(
483        config: RuntimeConfig,
484        global_context: GlobalContext,
485    ) -> Runtime {
486        let runtime = Self::init(config, global_context);
487        runtime.start().await;
488        runtime
489    }
490
491    // inits a native runtime and starts the update loop
492    #[cfg(feature = "native_crypto")]
493    pub async fn create_native(config: RuntimeConfig) -> Runtime {
494        let runtime = Self::init_native(config);
495        runtime.start().await;
496        runtime
497    }
498
499    fn register_interface_factories(&self) {
500        crate::network::com_interfaces::default_com_interfaces::base_interface::BaseInterface::register_on_com_hub(self.com_hub());
501
502        #[cfg(feature = "native_websocket")]
503        crate::network::com_interfaces::default_com_interfaces::websocket::websocket_client_native_interface::WebSocketClientNativeInterface::register_on_com_hub(self.com_hub());
504        #[cfg(feature = "native_websocket")]
505        crate::network::com_interfaces::default_com_interfaces::websocket::websocket_server_native_interface::WebSocketServerNativeInterface::register_on_com_hub(self.com_hub());
506        #[cfg(feature = "native_serial")]
507        crate::network::com_interfaces::default_com_interfaces::serial::serial_native_interface::SerialNativeInterface::register_on_com_hub(self.com_hub());
508        #[cfg(feature = "native_tcp")]
509        crate::network::com_interfaces::default_com_interfaces::tcp::tcp_client_native_interface::TCPClientNativeInterface::register_on_com_hub(self.com_hub());
510        #[cfg(feature = "native_tcp")]
511        crate::network::com_interfaces::default_com_interfaces::tcp::tcp_server_native_interface::TCPServerNativeInterface::register_on_com_hub(self.com_hub());
512        // TODO #234:
513        // #[cfg(feature = "native_webrtc")]
514        // crate::network::com_interfaces::default_com_interfaces::webrtc::webrtc_native_interface::WebRTCNativeInterface::register_on_com_hub(self.com_hub());
515    }
516
517    pub async fn execute(
518        &self,
519        script: &str,
520        inserted_values: &[ValueContainer],
521        execution_context: Option<&mut ExecutionContext>,
522    ) -> Result<Option<ValueContainer>, ScriptExecutionError> {
523        RuntimeInternal::execute(
524            self.internal(),
525            script,
526            inserted_values,
527            execution_context,
528        )
529        .await
530    }
531
532    pub fn execute_sync(
533        &self,
534        script: &str,
535        inserted_values: &[ValueContainer],
536        execution_context: Option<&mut ExecutionContext>,
537    ) -> Result<Option<ValueContainer>, ScriptExecutionError> {
538        RuntimeInternal::execute_sync(
539            self.internal(),
540            script,
541            inserted_values,
542            execution_context,
543        )
544    }
545
546    pub async fn execute_dxb<'a>(
547        &'a self,
548        dxb: Vec<u8>,
549        execution_context: Option<&'a mut ExecutionContext>,
550        end_execution: bool,
551    ) -> Result<Option<ValueContainer>, ExecutionError> {
552        RuntimeInternal::execute_dxb(
553            self.internal(),
554            dxb,
555            execution_context,
556            end_execution,
557        )
558        .await
559    }
560
561    pub fn execute_dxb_sync(
562        &self,
563        dxb: &[u8],
564        execution_context: Option<&mut ExecutionContext>,
565        end_execution: bool,
566    ) -> Result<Option<ValueContainer>, ExecutionError> {
567        RuntimeInternal::execute_dxb_sync(
568            self.internal(),
569            dxb,
570            execution_context,
571            end_execution,
572        )
573    }
574
575    async fn execute_remote(
576        &self,
577        remote_execution_context: &mut RemoteExecutionContext,
578        dxb: Vec<u8>,
579    ) -> Result<Option<ValueContainer>, ExecutionError> {
580        RuntimeInternal::execute_remote(
581            self.internal(),
582            remote_execution_context,
583            dxb,
584        )
585        .await
586    }
587}