datex_core/runtime/
mod.rs

1use std::collections::HashMap;
2use std::fmt::Debug;
3use std::pin::Pin;
4use std::sync::{Arc, Mutex};
5use futures::channel::oneshot::Sender;
6#[cfg(feature = "native_crypto")]
7use crate::crypto::crypto_native::CryptoNative;
8use crate::values::core_values::endpoint::Endpoint;
9use crate::logger::{init_logger, init_logger_debug};
10use crate::stdlib::{cell::RefCell, rc::Rc};
11use global_context::{get_global_context, set_global_context, GlobalContext};
12use log::{error, info};
13use serde::{Deserialize, Serialize};
14use datex_core::network::com_interfaces::com_interface::ComInterfaceFactory;
15use datex_core::values::serde::error::SerializationError;
16use crate::global::dxb_block::{DXBBlock, IncomingEndpointContextSectionId, IncomingSection, OutgoingContextId};
17use crate::global::protocol_structures::block_header::BlockHeader;
18use crate::global::protocol_structures::encrypted_header::EncryptedHeader;
19use crate::global::protocol_structures::routing_header;
20use crate::global::protocol_structures::routing_header::RoutingHeader;
21use crate::values::value_container::ValueContainer;
22use crate::network::com_hub::{ComHub, InterfacePriority, ResponseOptions};
23use crate::runtime::execution::ExecutionError;
24use crate::runtime::execution_context::{ExecutionContext, RemoteExecutionContext, ScriptExecutionError};
25use crate::values::serde::serializer::to_value_container;
26
27pub mod execution;
28pub mod global_context;
29pub mod memory;
30mod stack;
31pub mod execution_context;
32mod update_loop;
33
34use self::memory::Memory;
35
36const VERSION: &str = env!("CARGO_PKG_VERSION");
37
38#[derive(Clone)]
39pub struct Runtime {
40    pub version: String,
41    pub internal: Rc<RuntimeInternal>,
42}
43
44impl Debug for Runtime {
45    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
46        f.debug_struct("Runtime")
47            .field("version", &self.version)
48            .finish()
49    }
50}
51
52impl Default for Runtime {
53    fn default() -> Self {
54        Runtime {
55            version: VERSION.to_string(),
56            internal: Rc::new(RuntimeInternal::default()),
57        }
58    }
59}
60
61
62#[derive(Debug)]
63pub struct RuntimeInternal {
64    pub memory: RefCell<Memory>,
65    pub com_hub: ComHub,
66    pub endpoint: Endpoint,
67    pub config: RuntimeConfig,
68    /// set to true if the update loop should be running
69    /// when set to false, the update loop will stop
70    update_loop_running: RefCell<bool>,
71    update_loop_stop_sender: RefCell<Option<Sender<()>>>,
72
73    /// active execution contexts, stored by context_id
74    pub execution_contexts: RefCell<HashMap<IncomingEndpointContextSectionId, ExecutionContext>>,
75}
76
77impl Default for RuntimeInternal {
78    fn default() -> Self {
79        RuntimeInternal {
80            endpoint: Endpoint::default(),
81            config: RuntimeConfig::default(),
82            memory: RefCell::new(Memory::new()),
83            com_hub: ComHub::default(),
84            update_loop_running: RefCell::new(false),
85            update_loop_stop_sender: RefCell::new(None),
86            execution_contexts: RefCell::new(HashMap::new()),
87        }
88    }
89}
90
91macro_rules! get_execution_context {
92    // take context and self_rc as parameters
93    ($self_rc:expr, $execution_context:expr) => {
94        match $execution_context {
95            Some(context) => {
96                // set current runtime in execution context if local execution context
97                if let &mut ExecutionContext::Local(ref mut local_context) = context {
98                    local_context.set_runtime_internal($self_rc.clone());
99                }
100                context
101            },
102            None => {
103               &mut ExecutionContext::local_with_runtime_internal($self_rc.clone(), true)
104            }
105        }
106    };
107}
108
109
110impl RuntimeInternal {
111
112    pub async fn execute(
113        self_rc: Rc<RuntimeInternal>,
114        script: &str,
115        inserted_values: &[ValueContainer],
116        execution_context: Option<&mut ExecutionContext>,
117    ) -> Result<Option<ValueContainer>, ScriptExecutionError> {
118        let execution_context = get_execution_context!(self_rc, execution_context);
119        let dxb = execution_context.compile(script, inserted_values)?;
120        RuntimeInternal::execute_dxb(self_rc, dxb, Some(execution_context), true)
121            .await
122            .map_err(ScriptExecutionError::from)
123    }
124
125    pub fn execute_sync(
126        self_rc: Rc<RuntimeInternal>,
127        script: &str,
128        inserted_values: &[ValueContainer],
129        execution_context: Option<&mut ExecutionContext>,
130    ) -> Result<Option<ValueContainer>, ScriptExecutionError> {
131        let execution_context = get_execution_context!(self_rc, execution_context);
132        let dxb = execution_context.compile(script, inserted_values)?;
133        RuntimeInternal::execute_dxb_sync(self_rc, &dxb, Some(execution_context), true)
134            .map_err(ScriptExecutionError::from)
135    }
136
137    pub fn execute_dxb<'a>(
138        self_rc: Rc<RuntimeInternal>,
139        dxb: Vec<u8>,
140        execution_context: Option<&'a mut ExecutionContext>,
141        end_execution: bool,
142    ) -> Pin<Box<dyn Future<Output = Result<Option<ValueContainer>, ExecutionError>> + 'a>> {
143        Box::pin(async move {
144            let execution_context = get_execution_context!(self_rc, execution_context);
145            match execution_context {
146                ExecutionContext::Remote(context) => {
147                    RuntimeInternal::execute_remote(self_rc ,context, dxb).await
148                },
149                ExecutionContext::Local(_) => {
150                    execution_context.execute_dxb(&dxb, end_execution).await
151                }
152            }
153        })
154    }
155
156    pub fn execute_dxb_sync(
157        self_rc: Rc<RuntimeInternal>,
158        dxb: &[u8],
159        execution_context: Option<&mut ExecutionContext>,
160        end_execution: bool,
161    ) -> Result<Option<ValueContainer>, ExecutionError> {
162        let execution_context = get_execution_context!(self_rc, execution_context);
163        match execution_context {
164            ExecutionContext::Remote(_) => {
165                Err(ExecutionError::RequiresAsyncExecution)
166            }
167            ExecutionContext::Local(_)  => {
168                execution_context.execute_dxb_sync(dxb, end_execution)
169            }
170        }
171    }
172
173    /// Returns the existing execution context for the given context_id,
174    /// or creates a new one if it doesn't exist.
175    fn get_execution_context(
176        self_rc: Rc<RuntimeInternal>,
177        context_id: &IncomingEndpointContextSectionId,
178    ) -> ExecutionContext {
179        let mut execution_contexts = self_rc.execution_contexts.borrow_mut();
180        // get execution context by context_id or create a new one if it doesn't exist
181        let execution_context = execution_contexts.get(context_id).cloned();
182        if let Some(context) = execution_context {
183            context
184        } else {
185            let new_context = ExecutionContext::local_with_runtime_internal(self_rc.clone(), false);
186            // insert the new context into the map
187            execution_contexts.insert(context_id.clone(), new_context.clone());
188            new_context
189        }
190    }
191
192    pub async fn execute_remote(
193        self_rc: Rc<RuntimeInternal>,
194        remote_execution_context: &mut RemoteExecutionContext,
195        dxb: Vec<u8>
196    ) -> Result<Option<ValueContainer>, ExecutionError> {
197        let routing_header: RoutingHeader = RoutingHeader {
198            version: 2,
199            flags: routing_header::Flags::new(),
200            block_size_u16: Some(0),
201            block_size_u32: None,
202            sender: self_rc.endpoint.clone(),
203            receivers: routing_header::Receivers {
204                flags: routing_header::ReceiverFlags::new()
205                    .with_has_endpoints(false)
206                    .with_has_pointer_id(false)
207                    .with_has_endpoint_keys(false),
208                pointer_id: None,
209                endpoints: None,
210                endpoints_with_keys: None,
211            },
212            ..RoutingHeader::default()
213        };
214
215        // get existing context_id for context, or create a new one
216        let context_id = remote_execution_context.context_id.unwrap_or_else(|| {
217            // if the context_id is not set, we create a new one
218            remote_execution_context.context_id = Some(self_rc.com_hub.block_handler.get_new_context_id());
219            remote_execution_context.context_id.unwrap()
220        });
221
222        let block_header = BlockHeader {
223            context_id,
224            ..BlockHeader::default()
225        };
226        let encrypted_header = EncryptedHeader::default();
227
228        let mut block =
229            DXBBlock::new(routing_header, block_header, encrypted_header, dxb);
230
231        block.set_receivers(std::slice::from_ref(&remote_execution_context.endpoint));
232
233        let response = self_rc.com_hub.send_own_block_await_response(block, ResponseOptions::default()).await.remove(0)?;
234        let incoming_section = response.take_incoming_section();
235        RuntimeInternal::execute_incoming_section(self_rc, incoming_section).await.0
236    }
237
238    async fn execute_incoming_section(
239        self_rc: Rc<RuntimeInternal>,
240        mut incoming_section: IncomingSection,
241    ) -> (Result<Option<ValueContainer>, ExecutionError>, Endpoint, OutgoingContextId) {
242
243        let mut context = Self::get_execution_context(self_rc.clone(), incoming_section.get_section_context_id());
244        info!("Executing incoming section with index: {}", incoming_section.get_section_index());
245
246        let mut result = None;
247        let mut last_block = None;
248
249        // iterate over the blocks in the incoming section
250        loop {
251            let block = incoming_section.next().await;
252            if let Some(block) = block {
253                let res = RuntimeInternal::execute_dxb_block_local(self_rc.clone(), block.clone(), Some(&mut context)).await;
254                if let Err(err) = res {
255                    return (Err(err), block.get_sender().clone(), block.block_header.context_id);
256                }
257                result = res.unwrap();
258                last_block = Some(block);
259            }
260            else {
261                break;
262            }
263        }
264
265        if last_block.is_none() {
266            unreachable!("Incoming section must contain at least one block");
267        }
268        let last_block = last_block.unwrap();
269        let sender_endpoint = last_block.get_sender().clone();
270        let context_id = last_block.block_header.context_id;
271        (Ok(result), sender_endpoint, context_id)
272    }
273
274    async fn execute_dxb_block_local(
275        self_rc: Rc<RuntimeInternal>,
276        block: DXBBlock,
277        execution_context: Option<&mut ExecutionContext>,
278    ) -> Result<Option<ValueContainer>, ExecutionError> {
279        let execution_context = get_execution_context!(self_rc, execution_context);
280        // assert that the execution context is local
281        if !matches!(execution_context, ExecutionContext::Local(_)) {
282            unreachable!("Execution context must be local for executing a DXB block");
283        }
284        let dxb = block.body;
285        let end_execution = block.block_header.flags_and_timestamp.is_end_of_section();
286        RuntimeInternal::execute_dxb(self_rc, dxb, Some(execution_context), end_execution).await
287    }
288}
289
290#[derive(Debug, Deserialize, Serialize)]
291pub struct RuntimeConfigInterface {
292    r#type: String,
293    config: ValueContainer,
294}
295
296#[derive(Debug, Default, Deserialize, Serialize)]
297pub struct RuntimeConfig {
298    pub endpoint: Option<Endpoint>,
299    pub interfaces: Option<Vec<RuntimeConfigInterface>>,
300    /// if set to true, the runtime will log debug messages
301    pub debug: Option<bool>,
302}
303
304impl RuntimeConfig {
305    pub fn new_with_endpoint(endpoint: Endpoint) -> Self {
306        RuntimeConfig {
307            endpoint: Some(endpoint),
308            interfaces: None,
309            debug: None,
310        }
311    }
312    
313    pub fn add_interface<T: Serialize>(
314        &mut self,
315        r#type: String,
316        config: T,
317    ) -> Result<(), SerializationError> {
318        let config = to_value_container(&config)?;
319        let interface = RuntimeConfigInterface { r#type, config };
320        if let Some(interfaces) = &mut self.interfaces {
321            interfaces.push(interface);
322        } else {
323            self.interfaces = Some(vec![interface]);
324        }
325        
326        Ok(())
327    }
328}
329
330/// publicly exposed wrapper impl for the Runtime
331/// around RuntimeInternal
332impl Runtime {
333    pub fn new(config: RuntimeConfig) -> Runtime {
334        let endpoint = config.endpoint.clone().unwrap_or_else(Endpoint::random);
335        let com_hub = ComHub::new(endpoint.clone());
336        Runtime {
337            version: VERSION.to_string(),
338            internal: Rc::new(RuntimeInternal {
339                endpoint,
340                config,
341                com_hub,
342                ..RuntimeInternal::default()
343            })
344        }
345    }
346    pub fn init(
347        config: RuntimeConfig,
348        global_context: GlobalContext,
349    ) -> Runtime {
350        set_global_context(global_context);
351        if let Some(debug) = config.debug && debug {
352            init_logger_debug();
353        } else {
354            init_logger();
355        }
356        info!(
357            "Runtime initialized - Version {VERSION} Time: {}",
358            get_global_context().time.lock().unwrap().now()
359        );
360        Self::new(config)
361    }
362
363    pub fn com_hub(&self) -> &ComHub {
364       &self.internal.com_hub
365    }
366    pub fn endpoint(&self) -> Endpoint {
367        self.internal.endpoint.clone()
368    }
369
370    pub fn internal(&self) -> Rc<RuntimeInternal> {
371        Rc::clone(&self.internal)
372    }
373
374    pub fn memory(&self) -> &RefCell<Memory> {
375        &self.internal.memory
376    }
377
378    #[cfg(feature = "native_crypto")]
379    pub fn init_native(config: RuntimeConfig) -> Runtime {
380        use crate::utils::time_native::TimeNative;
381
382        Self::init(
383            config,
384            GlobalContext::new(
385                Arc::new(Mutex::new(CryptoNative)),
386                Arc::new(Mutex::new(TimeNative)),
387            ),
388        )
389    }
390
391    /// Starts the common update loop:
392    ///  - ComHub
393    ///  - Runtime
394    pub async fn start(&self) {
395        if *self.internal().update_loop_running.borrow() {
396            info!("runtime update loop already running, skipping start");
397            return;
398        }
399        info!("starting runtime...");
400        self.com_hub()
401            .init()
402            .await
403            .expect("Failed to initialize ComHub");
404
405        // register interface factories
406        self.register_interface_factories();
407
408        // create interfaces
409        if let Some(interfaces) = &self.internal.config.interfaces {
410            for RuntimeConfigInterface {r#type, config} in interfaces.iter() {
411                if let Err(err) = self.com_hub().create_interface(r#type, config.clone(), InterfacePriority::default()).await {
412                    error!("Failed to create interface {type}: {err:?}");
413                } else {
414                    info!("Created interface: {type}");
415                }
416            }
417        }
418
419        RuntimeInternal::start_update_loop(self.internal());
420    }
421
422    // inits a runtime and starts the update loop
423    pub async fn create(
424        config: RuntimeConfig,
425        global_context: GlobalContext,
426    ) -> Runtime {
427        let runtime = Self::init(config, global_context);
428        runtime.start().await;
429        runtime
430    }
431
432    // inits a native runtime and starts the update loop
433    #[cfg(feature = "native_crypto")]
434    pub async fn create_native(
435        config: RuntimeConfig,
436    ) -> Runtime {
437        let runtime = Self::init_native(config);
438        runtime.start().await;
439        runtime
440    }
441
442    fn register_interface_factories(&self) {
443        crate::network::com_interfaces::default_com_interfaces::base_interface::BaseInterface::register_on_com_hub(self.com_hub());
444
445        #[cfg(feature = "native_websocket")]
446        crate::network::com_interfaces::default_com_interfaces::websocket::websocket_client_native_interface::WebSocketClientNativeInterface::register_on_com_hub(self.com_hub());
447        #[cfg(feature = "native_websocket")]
448        crate::network::com_interfaces::default_com_interfaces::websocket::websocket_server_native_interface::WebSocketServerNativeInterface::register_on_com_hub(self.com_hub());
449        #[cfg(feature = "native_serial")]
450        crate::network::com_interfaces::default_com_interfaces::serial::serial_native_interface::SerialNativeInterface::register_on_com_hub(self.com_hub());
451        #[cfg(feature = "native_tcp")]
452        crate::network::com_interfaces::default_com_interfaces::tcp::tcp_client_native_interface::TCPClientNativeInterface::register_on_com_hub(self.com_hub());
453        #[cfg(feature = "native_tcp")]
454        crate::network::com_interfaces::default_com_interfaces::tcp::tcp_server_native_interface::TCPServerNativeInterface::register_on_com_hub(self.com_hub());
455        // TODO #234:
456        // #[cfg(feature = "native_webrtc")]
457        // crate::network::com_interfaces::default_com_interfaces::webrtc::webrtc_native_interface::WebRTCNativeInterface::register_on_com_hub(self.com_hub());
458    }
459    
460    pub async fn execute(
461        &self,
462        script: &str,
463        inserted_values: &[ValueContainer],
464        execution_context: Option<&mut ExecutionContext>,
465    ) -> Result<Option<ValueContainer>, ScriptExecutionError> {
466        RuntimeInternal::execute(self.internal(), script, inserted_values, execution_context).await
467    }
468
469    pub fn execute_sync(
470        &self,
471        script: &str,
472        inserted_values: &[ValueContainer],
473        execution_context: Option<&mut ExecutionContext>,
474    ) -> Result<Option<ValueContainer>, ScriptExecutionError> {
475        RuntimeInternal::execute_sync(self.internal(), script, inserted_values, execution_context)
476    }
477
478    pub async fn execute_dxb<'a>(
479        &'a self,
480        dxb: Vec<u8>,
481        execution_context: Option<&'a mut ExecutionContext>,
482        end_execution: bool,
483    ) -> Result<Option<ValueContainer>, ExecutionError> {
484        RuntimeInternal::execute_dxb(self.internal(), dxb, execution_context, end_execution).await
485    }
486
487    pub fn execute_dxb_sync(
488        &self,
489        dxb: &[u8],
490        execution_context: Option<&mut ExecutionContext>,
491        end_execution: bool,
492    ) -> Result<Option<ValueContainer>, ExecutionError> {
493        RuntimeInternal::execute_dxb_sync(self.internal(), dxb, execution_context, end_execution)
494    }
495
496    async fn execute_remote(
497        &self,
498        remote_execution_context: &mut RemoteExecutionContext,
499        dxb: Vec<u8>
500    ) -> Result<Option<ValueContainer>, ExecutionError> {
501        RuntimeInternal::execute_remote(self.internal(), remote_execution_context, dxb).await
502    }
503}