datex_core/runtime/
mod.rs

1use crate::collections::HashMap;
2#[cfg(all(feature = "native_crypto", feature = "std"))]
3use crate::crypto::crypto_native::CryptoNative;
4use crate::global::dxb_block::{
5    DXBBlock, IncomingEndpointContextSectionId, IncomingSection,
6    OutgoingContextId,
7};
8use crate::global::protocol_structures::block_header::BlockHeader;
9use crate::global::protocol_structures::encrypted_header::EncryptedHeader;
10use crate::global::protocol_structures::routing_header::RoutingHeader;
11use crate::logger::{init_logger, init_logger_debug};
12use crate::network::com_hub::{ComHub, InterfacePriority, ResponseOptions};
13use crate::network::com_interfaces::com_interface::ComInterfaceFactory;
14use crate::runtime::execution::ExecutionError;
15use crate::runtime::execution::context::ExecutionMode;
16use crate::serde::error::SerializationError;
17use crate::serde::serializer::to_value_container;
18use crate::stdlib::borrow::ToOwned;
19use crate::stdlib::boxed::Box;
20use crate::stdlib::pin::Pin;
21use crate::stdlib::string::String;
22use crate::stdlib::string::ToString;
23use crate::stdlib::sync::Arc;
24use crate::stdlib::vec;
25use crate::stdlib::vec::Vec;
26use crate::stdlib::{cell::RefCell, rc::Rc};
27use crate::time::Instant;
28use crate::utils::time::Time;
29use crate::values::core_values::endpoint::Endpoint;
30use crate::values::value_container::ValueContainer;
31use core::fmt::Debug;
32use core::prelude::rust_2024::*;
33use core::result::Result;
34use core::slice;
35use core::unreachable;
36use execution::context::{
37    ExecutionContext, RemoteExecutionContext, ScriptExecutionError,
38};
39use futures::channel::oneshot::Sender;
40use global_context::{GlobalContext, set_global_context};
41use log::{debug, error, info};
42use serde::{Deserialize, Serialize};
43
44pub mod dif_interface;
45pub mod execution;
46pub mod global_context;
47pub mod memory;
48mod update_loop;
49
50use self::memory::Memory;
51
/// Version of this crate, taken from the `CARGO_PKG_VERSION` environment
/// variable that Cargo provides at compile time.
const VERSION: &str = env!("CARGO_PKG_VERSION");
53
54#[derive(Clone)]
55pub struct Runtime {
56    pub version: String,
57    pub internal: Rc<RuntimeInternal>,
58}
59
60impl Debug for Runtime {
61    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
62        f.debug_struct("Runtime")
63            .field("version", &self.version)
64            .finish()
65    }
66}
67
/// Handle to the async environment the runtime is created with.
///
/// With the `embassy_runtime` feature enabled it carries an embassy spawner;
/// without that feature the struct has no fields.
#[derive(Clone)]
pub struct AsyncContext {
    /// Embassy task spawner (only present with the `embassy_runtime` feature).
    #[cfg(feature = "embassy_runtime")]
    pub spawner: embassy_executor::Spawner,
}
73#[cfg(not(feature = "embassy_runtime"))]
74impl Default for AsyncContext {
75    fn default() -> Self {
76        Self::new()
77    }
78}
79
80impl AsyncContext {
81    #[cfg(feature = "embassy_runtime")]
82    pub fn new(spawner: embassy_executor::Spawner) -> Self {
83        Self { spawner }
84    }
85    #[cfg(not(feature = "embassy_runtime"))]
86    pub fn new() -> Self {
87        Self {}
88    }
89}
90
91impl Debug for AsyncContext {
92    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
93        core::write!(f, "AsyncContext")
94    }
95}
96
97#[derive(Debug)]
98pub struct RuntimeInternal {
99    pub memory: RefCell<Memory>,
100    pub com_hub: ComHub,
101    pub endpoint: Endpoint,
102    pub config: RuntimeConfig,
103    /// set to true if the update loop should be running
104    /// when set to false, the update loop will stop
105    update_loop_running: RefCell<bool>,
106    update_loop_stop_sender: RefCell<Option<Sender<()>>>,
107
108    /// active execution contexts, stored by context_id
109    pub execution_contexts:
110        RefCell<HashMap<IncomingEndpointContextSectionId, ExecutionContext>>,
111    pub async_context: AsyncContext,
112}
113
/// Turns an `Option<&mut ExecutionContext>` into a `&mut ExecutionContext`.
///
/// * `Some(ctx)`: when `ctx` is a local context, a clone of the given runtime
///   is set on it first; `ctx` itself is the result.
/// * `None`: the result is a mutable reference to a freshly created local
///   context in `ExecutionMode::Static`, bound to a clone of the given runtime.
///
/// This is a macro rather than a function because the `None` arm hands out a
/// reference to a temporary that has to live in the caller's scope.
macro_rules! get_execution_context {
    // arguments: the `Rc<RuntimeInternal>` expression, then the optional context
    ($runtime:expr, $maybe_context:expr) => {
        match $maybe_context {
            Some(ctx) => {
                // only local contexts hold a runtime; remote contexts are passed through untouched
                if let &mut ExecutionContext::Local(ref mut local) = ctx {
                    local.set_runtime_internal($runtime.clone());
                }
                ctx
            },
            None => {
               &mut ExecutionContext::local_with_runtime_internal($runtime.clone(), ExecutionMode::Static)
            }
        }
    };
}
131
132impl RuntimeInternal {
133    fn new(async_context: AsyncContext) -> Self {
134        RuntimeInternal {
135            endpoint: Endpoint::default(),
136            config: RuntimeConfig::default(),
137            memory: RefCell::new(Memory::new(Endpoint::default())),
138            com_hub: ComHub::new(Endpoint::default(), async_context.clone()),
139            update_loop_running: RefCell::new(false),
140            update_loop_stop_sender: RefCell::new(None),
141            execution_contexts: RefCell::new(HashMap::new()),
142            async_context,
143        }
144    }
145
146    #[cfg(feature = "compiler")]
147    pub async fn execute(
148        self_rc: Rc<RuntimeInternal>,
149        script: &str,
150        inserted_values: &[ValueContainer],
151        execution_context: Option<&mut ExecutionContext>,
152    ) -> Result<Option<ValueContainer>, ScriptExecutionError> {
153        let execution_context =
154            get_execution_context!(self_rc, execution_context);
155        let compile_start = Instant::now();
156        let dxb = execution_context.compile(script, inserted_values)?;
157        debug!(
158            "[Compilation took {} ms]",
159            compile_start.elapsed().as_millis()
160        );
161        let execute_start = Instant::now();
162        let result = RuntimeInternal::execute_dxb(
163            self_rc,
164            dxb,
165            Some(execution_context),
166            true,
167        )
168        .await
169        .map_err(ScriptExecutionError::from);
170        debug!(
171            "[Execution took {} ms]",
172            execute_start.elapsed().as_millis()
173        );
174        result
175    }
176
177    #[cfg(feature = "compiler")]
178    pub fn execute_sync(
179        self_rc: Rc<RuntimeInternal>,
180        script: &str,
181        inserted_values: &[ValueContainer],
182        execution_context: Option<&mut ExecutionContext>,
183    ) -> Result<Option<ValueContainer>, ScriptExecutionError> {
184        let execution_context =
185            get_execution_context!(self_rc, execution_context);
186        let compile_start = Instant::now();
187        let dxb = execution_context.compile(script, inserted_values)?;
188        debug!(
189            "[Compilation took {} ms]",
190            compile_start.elapsed().as_millis()
191        );
192        let execute_start = Instant::now();
193        let result = RuntimeInternal::execute_dxb_sync(
194            self_rc,
195            &dxb,
196            Some(execution_context),
197            true,
198        )
199        .map_err(ScriptExecutionError::from);
200        debug!(
201            "[Execution took {} ms]",
202            execute_start.elapsed().as_millis()
203        );
204        result
205    }
206
207    pub fn execute_dxb<'a>(
208        self_rc: Rc<RuntimeInternal>,
209        dxb: Vec<u8>,
210        execution_context: Option<&'a mut ExecutionContext>,
211        end_execution: bool,
212    ) -> Pin<
213        Box<
214            dyn Future<Output = Result<Option<ValueContainer>, ExecutionError>>
215                + 'a,
216        >,
217    > {
218        Box::pin(async move {
219            let execution_context =
220                get_execution_context!(self_rc, execution_context);
221            match execution_context {
222                ExecutionContext::Remote(context) => {
223                    RuntimeInternal::execute_remote(self_rc, context, dxb).await
224                }
225                ExecutionContext::Local(_) => {
226                    execution_context.execute_dxb(&dxb).await
227                }
228            }
229        })
230    }
231
232    pub fn execute_dxb_sync(
233        self_rc: Rc<RuntimeInternal>,
234        dxb: &[u8],
235        execution_context: Option<&mut ExecutionContext>,
236        end_execution: bool,
237    ) -> Result<Option<ValueContainer>, ExecutionError> {
238        let execution_context =
239            get_execution_context!(self_rc, execution_context);
240        match execution_context {
241            ExecutionContext::Remote(_) => {
242                Err(ExecutionError::RequiresAsyncExecution)
243            }
244            ExecutionContext::Local(_) => {
245                execution_context.execute_dxb_sync(dxb)
246            }
247        }
248    }
249
250    /// Returns the existing execution context for the given context_id,
251    /// or creates a new one if it doesn't exist.
252    /// To reuse the context later, the caller must store it back in the map after use.
253    fn take_execution_context(
254        self_rc: Rc<RuntimeInternal>,
255        context_id: &IncomingEndpointContextSectionId,
256    ) -> ExecutionContext {
257        let mut execution_contexts = self_rc.execution_contexts.borrow_mut();
258        // get execution context by context_id or create a new one if it doesn't exist
259        let execution_context = execution_contexts.remove(context_id);
260        if let Some(context) = execution_context {
261            context
262        } else {
263            ExecutionContext::local_with_runtime_internal(
264                self_rc.clone(),
265                ExecutionMode::unbounded(),
266            )
267        }
268    }
269
270    pub async fn execute_remote(
271        self_rc: Rc<RuntimeInternal>,
272        remote_execution_context: &mut RemoteExecutionContext,
273        dxb: Vec<u8>,
274    ) -> Result<Option<ValueContainer>, ExecutionError> {
275        let routing_header: RoutingHeader = RoutingHeader::default()
276            .with_sender(self_rc.endpoint.clone())
277            .to_owned();
278
279        // get existing context_id for context, or create a new one
280        let context_id =
281            remote_execution_context.context_id.unwrap_or_else(|| {
282                // if the context_id is not set, we create a new one
283                remote_execution_context.context_id =
284                    Some(self_rc.com_hub.block_handler.get_new_context_id());
285                remote_execution_context.context_id.unwrap()
286            });
287
288        let block_header = BlockHeader {
289            context_id,
290            ..BlockHeader::default()
291        };
292        let encrypted_header = EncryptedHeader::default();
293
294        let mut block =
295            DXBBlock::new(routing_header, block_header, encrypted_header, dxb);
296
297        block
298            .set_receivers(slice::from_ref(&remote_execution_context.endpoint));
299
300        let response = self_rc
301            .com_hub
302            .send_own_block_await_response(block, ResponseOptions::default())
303            .await
304            .remove(0)?;
305        let incoming_section = response.take_incoming_section();
306        RuntimeInternal::execute_incoming_section(self_rc, incoming_section)
307            .await
308            .0
309    }
310
311    async fn execute_incoming_section(
312        self_rc: Rc<RuntimeInternal>,
313        mut incoming_section: IncomingSection,
314    ) -> (
315        Result<Option<ValueContainer>, ExecutionError>,
316        Endpoint,
317        OutgoingContextId,
318    ) {
319        let section_context_id =
320            incoming_section.get_section_context_id().clone();
321        let mut context =
322            Self::take_execution_context(self_rc.clone(), &section_context_id);
323        info!(
324            "Executing incoming section with index: {}",
325            incoming_section.get_section_index()
326        );
327
328        let mut result = None;
329        let mut last_block = None;
330
331        // iterate over the blocks in the incoming section
332        loop {
333            let block = incoming_section.next().await;
334            if let Some(block) = block {
335                let res = RuntimeInternal::execute_dxb_block_local(
336                    self_rc.clone(),
337                    block.clone(),
338                    Some(&mut context),
339                )
340                .await;
341                if let Err(err) = res {
342                    return (
343                        Err(err),
344                        block.get_sender().clone(),
345                        block.block_header.context_id,
346                    );
347                }
348                result = res.unwrap();
349                last_block = Some(block);
350            } else {
351                break;
352            }
353        }
354
355        if last_block.is_none() {
356            unreachable!("Incoming section must contain at least one block");
357        }
358        let last_block = last_block.unwrap();
359        let sender_endpoint = last_block.get_sender().clone();
360        let context_id = last_block.block_header.context_id;
361
362        // insert the context back into the map for future use
363        // TODO #638: is this needed or can we drop the context after execution here?
364        self_rc
365            .execution_contexts
366            .borrow_mut()
367            .insert(section_context_id, context);
368
369        (Ok(result), sender_endpoint, context_id)
370    }
371
372    async fn execute_dxb_block_local(
373        self_rc: Rc<RuntimeInternal>,
374        block: DXBBlock,
375        execution_context: Option<&mut ExecutionContext>,
376    ) -> Result<Option<ValueContainer>, ExecutionError> {
377        let execution_context =
378            get_execution_context!(self_rc, execution_context);
379        // assert that the execution context is local
380        if !core::matches!(execution_context, ExecutionContext::Local(_)) {
381            unreachable!(
382                "Execution context must be local for executing a DXB block"
383            );
384        }
385        let dxb = block.body;
386        let end_execution =
387            block.block_header.flags_and_timestamp.is_end_of_section();
388        RuntimeInternal::execute_dxb(
389            self_rc,
390            dxb,
391            Some(execution_context),
392            end_execution,
393        )
394        .await
395    }
396}
397
398#[derive(Debug, Deserialize, Serialize)]
399pub struct RuntimeConfigInterface {
400    #[serde(rename = "type")]
401    pub interface_type: String,
402    pub config: ValueContainer,
403}
404
405impl RuntimeConfigInterface {
406    pub fn new<T: Serialize>(
407        interface_type: &str,
408        config: T,
409    ) -> Result<RuntimeConfigInterface, SerializationError> {
410        Ok(RuntimeConfigInterface {
411            interface_type: interface_type.to_string(),
412            config: to_value_container(&config)?,
413        })
414    }
415
416    pub fn new_from_value_container(
417        interface_type: &str,
418        config: ValueContainer,
419    ) -> RuntimeConfigInterface {
420        RuntimeConfigInterface {
421            interface_type: interface_type.to_string(),
422            config,
423        }
424    }
425}
426
427#[derive(Debug, Default, Deserialize, Serialize)]
428pub struct RuntimeConfig {
429    pub endpoint: Option<Endpoint>,
430    pub interfaces: Option<Vec<RuntimeConfigInterface>>,
431    pub env: Option<HashMap<String, String>>,
432    /// if set to true, the runtime will log debug messages
433    pub debug: Option<bool>,
434}
435
436impl RuntimeConfig {
437    pub fn new_with_endpoint(endpoint: Endpoint) -> Self {
438        RuntimeConfig {
439            endpoint: Some(endpoint),
440            interfaces: None,
441            env: None,
442            debug: None,
443        }
444    }
445
446    pub fn add_interface<T: Serialize>(
447        &mut self,
448        interface_type: String,
449        config: T,
450    ) -> Result<(), SerializationError> {
451        let config = to_value_container(&config)?;
452        let interface = RuntimeConfigInterface {
453            interface_type,
454            config,
455        };
456        if let Some(interfaces) = &mut self.interfaces {
457            interfaces.push(interface);
458        } else {
459            self.interfaces = Some(vec![interface]);
460        }
461
462        Ok(())
463    }
464}
465
466/// publicly exposed wrapper impl for the Runtime
467/// around RuntimeInternal
468impl Runtime {
469    /// Creates a new runtime instance with the given configuration and async context.
470    /// Note: If the endpoint is not specified in the config, a random endpoint will be generated.
471    /// This required setting the global context before using `set_global_context`,
472    /// otherwise the runtime will panic here.
473    pub fn new(config: RuntimeConfig, async_context: AsyncContext) -> Runtime {
474        let endpoint = config.endpoint.clone().unwrap_or_else(Endpoint::random);
475        let com_hub = ComHub::new(endpoint.clone(), async_context.clone());
476        let memory = RefCell::new(Memory::new(endpoint.clone()));
477        Runtime {
478            version: VERSION.to_string(),
479            internal: Rc::new(RuntimeInternal {
480                endpoint,
481                memory,
482                config,
483                com_hub,
484                ..RuntimeInternal::new(async_context)
485            }),
486        }
487    }
488
489    /// Initializes the runtime with the given configuration, global context, and async context.
490    /// This function also sets up logging and logs the initialization time.
491    pub fn init(
492        config: RuntimeConfig,
493        global_context: GlobalContext,
494        async_context: AsyncContext,
495    ) -> Runtime {
496        set_global_context(global_context);
497        if let Some(debug) = config.debug
498            && debug
499        {
500            init_logger_debug();
501        } else {
502            init_logger();
503        }
504        info!(
505            "Runtime initialized - Version {VERSION} Time: {}",
506            Time::now()
507        );
508        Self::new(config, async_context)
509    }
510
511    pub fn com_hub(&self) -> &ComHub {
512        &self.internal.com_hub
513    }
514    pub fn endpoint(&self) -> Endpoint {
515        self.internal.endpoint.clone()
516    }
517
518    pub fn internal(&self) -> Rc<RuntimeInternal> {
519        Rc::clone(&self.internal)
520    }
521
522    pub fn memory(&self) -> &RefCell<Memory> {
523        &self.internal.memory
524    }
525
526    #[cfg(all(
527        feature = "native_crypto",
528        feature = "std",
529        not(feature = "embassy_runtime")
530    ))]
531    pub fn init_native(config: RuntimeConfig) -> Runtime {
532        use crate::utils::time_native::TimeNative;
533
534        Self::init(
535            config,
536            GlobalContext::new(Arc::new(CryptoNative), Arc::new(TimeNative)),
537            AsyncContext::new(),
538        )
539    }
540
541    /// Starts the common update loop:
542    ///  - ComHub
543    ///  - Runtime
544    pub async fn start(&self) {
545        if *self.internal().update_loop_running.borrow() {
546            info!("runtime update loop already running, skipping start");
547            return;
548        }
549        info!("starting runtime...");
550        self.com_hub()
551            .init()
552            .await
553            .expect("Failed to initialize ComHub");
554
555        // register interface factories
556        self.register_interface_factories();
557
558        // create interfaces
559        if let Some(interfaces) = &self.internal.config.interfaces {
560            for RuntimeConfigInterface {
561                interface_type,
562                config,
563            } in interfaces.iter()
564            {
565                if let Err(err) = self
566                    .com_hub()
567                    .create_interface(
568                        interface_type,
569                        config.clone(),
570                        InterfacePriority::default(),
571                    )
572                    .await
573                {
574                    error!(
575                        "Failed to create interface {interface_type}: {err:?}"
576                    );
577                } else {
578                    info!("Created interface: {interface_type}");
579                }
580            }
581        }
582
583        RuntimeInternal::start_update_loop(self.internal());
584    }
585
586    // inits a runtime and starts the update loop
587    pub async fn create(
588        config: RuntimeConfig,
589        global_context: GlobalContext,
590        async_context: AsyncContext,
591    ) -> Runtime {
592        let runtime = Self::init(config, global_context, async_context);
593        runtime.start().await;
594        runtime
595    }
596
597    // inits a native runtime and starts the update loop
598    #[cfg(all(
599        feature = "native_crypto",
600        feature = "std",
601        not(feature = "embassy_runtime")
602    ))]
603    pub async fn create_native(config: RuntimeConfig) -> Runtime {
604        let runtime = Self::init_native(config);
605        runtime.start().await;
606        runtime
607    }
608
609    fn register_interface_factories(&self) {
610        crate::network::com_interfaces::default_com_interfaces::base_interface::BaseInterface::register_on_com_hub(self.com_hub());
611
612        #[cfg(feature = "native_websocket")]
613        crate::network::com_interfaces::default_com_interfaces::websocket::websocket_client_native_interface::WebSocketClientNativeInterface::register_on_com_hub(self.com_hub());
614        #[cfg(feature = "native_websocket")]
615        crate::network::com_interfaces::default_com_interfaces::websocket::websocket_server_native_interface::WebSocketServerNativeInterface::register_on_com_hub(self.com_hub());
616        #[cfg(feature = "native_serial")]
617        crate::network::com_interfaces::default_com_interfaces::serial::serial_native_interface::SerialNativeInterface::register_on_com_hub(self.com_hub());
618        #[cfg(feature = "native_tcp")]
619        crate::network::com_interfaces::default_com_interfaces::tcp::tcp_client_native_interface::TCPClientNativeInterface::register_on_com_hub(self.com_hub());
620        #[cfg(feature = "native_tcp")]
621        crate::network::com_interfaces::default_com_interfaces::tcp::tcp_server_native_interface::TCPServerNativeInterface::register_on_com_hub(self.com_hub());
622        // TODO #234:
623        // #[cfg(feature = "native_webrtc")]
624        // crate::network::com_interfaces::default_com_interfaces::webrtc::webrtc_native_interface::WebRTCNativeInterface::register_on_com_hub(self.com_hub());
625    }
626
627    #[cfg(feature = "compiler")]
628    pub async fn execute(
629        &self,
630        script: &str,
631        inserted_values: &[ValueContainer],
632        execution_context: Option<&mut ExecutionContext>,
633    ) -> Result<Option<ValueContainer>, ScriptExecutionError> {
634        RuntimeInternal::execute(
635            self.internal(),
636            script,
637            inserted_values,
638            execution_context,
639        )
640        .await
641    }
642
643    #[cfg(feature = "compiler")]
644    pub fn execute_sync(
645        &self,
646        script: &str,
647        inserted_values: &[ValueContainer],
648        execution_context: Option<&mut ExecutionContext>,
649    ) -> Result<Option<ValueContainer>, ScriptExecutionError> {
650        RuntimeInternal::execute_sync(
651            self.internal(),
652            script,
653            inserted_values,
654            execution_context,
655        )
656    }
657
658    pub async fn execute_dxb<'a>(
659        &'a self,
660        dxb: Vec<u8>,
661        execution_context: Option<&'a mut ExecutionContext>,
662        end_execution: bool,
663    ) -> Result<Option<ValueContainer>, ExecutionError> {
664        RuntimeInternal::execute_dxb(
665            self.internal(),
666            dxb,
667            execution_context,
668            end_execution,
669        )
670        .await
671    }
672
673    pub fn execute_dxb_sync(
674        &self,
675        dxb: &[u8],
676        execution_context: Option<&mut ExecutionContext>,
677        end_execution: bool,
678    ) -> Result<Option<ValueContainer>, ExecutionError> {
679        RuntimeInternal::execute_dxb_sync(
680            self.internal(),
681            dxb,
682            execution_context,
683            end_execution,
684        )
685    }
686
687    async fn execute_remote(
688        &self,
689        remote_execution_context: &mut RemoteExecutionContext,
690        dxb: Vec<u8>,
691    ) -> Result<Option<ValueContainer>, ExecutionError> {
692        RuntimeInternal::execute_remote(
693            self.internal(),
694            remote_execution_context,
695            dxb,
696        )
697        .await
698    }
699}