1#[cfg(feature = "native_crypto")]
2use crate::crypto::crypto_native::CryptoNative;
3use crate::global::dxb_block::{
4 DXBBlock, IncomingEndpointContextSectionId, IncomingSection,
5 OutgoingContextId,
6};
7use crate::global::protocol_structures::block_header::BlockHeader;
8use crate::global::protocol_structures::encrypted_header::EncryptedHeader;
9use crate::global::protocol_structures::routing_header::RoutingHeader;
10use crate::logger::{init_logger, init_logger_debug};
11use crate::network::com_hub::{ComHub, InterfacePriority, ResponseOptions};
12use crate::runtime::execution::ExecutionError;
13use crate::runtime::execution_context::{
14 ExecutionContext, RemoteExecutionContext, ScriptExecutionError,
15};
16use crate::serde::error::SerializationError;
17use crate::serde::serializer::to_value_container;
18use crate::stdlib::{cell::RefCell, rc::Rc};
19use crate::utils::time::Time;
20use crate::values::core_values::endpoint::Endpoint;
21use crate::values::value_container::ValueContainer;
22use datex_core::network::com_interfaces::com_interface::ComInterfaceFactory;
23use futures::channel::oneshot::Sender;
24use global_context::{GlobalContext, set_global_context};
25use log::{error, info};
26use serde::{Deserialize, Serialize};
27use std::collections::HashMap;
28use std::fmt::Debug;
29use std::pin::Pin;
30use std::sync::Arc;
31
32pub mod dif_interface;
33pub mod execution;
34pub mod execution_context;
35pub mod global_context;
36pub mod memory;
37mod stack;
38mod update_loop;
39
40use self::memory::Memory;
41
42const VERSION: &str = env!("CARGO_PKG_VERSION");
43
44#[derive(Clone)]
45pub struct Runtime {
46 pub version: String,
47 pub internal: Rc<RuntimeInternal>,
48}
49
50impl Debug for Runtime {
51 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
52 f.debug_struct("Runtime")
53 .field("version", &self.version)
54 .finish()
55 }
56}
57
58impl Default for Runtime {
59 fn default() -> Self {
60 Runtime {
61 version: VERSION.to_string(),
62 internal: Rc::new(RuntimeInternal::default()),
63 }
64 }
65}
66
67#[derive(Debug)]
68pub struct RuntimeInternal {
69 pub memory: RefCell<Memory>,
70 pub com_hub: ComHub,
71 pub endpoint: Endpoint,
72 pub config: RuntimeConfig,
73 update_loop_running: RefCell<bool>,
76 update_loop_stop_sender: RefCell<Option<Sender<()>>>,
77
78 pub execution_contexts:
80 RefCell<HashMap<IncomingEndpointContextSectionId, ExecutionContext>>,
81}
82
83impl Default for RuntimeInternal {
84 fn default() -> Self {
85 RuntimeInternal {
86 endpoint: Endpoint::default(),
87 config: RuntimeConfig::default(),
88 memory: RefCell::new(Memory::new(Endpoint::default())),
89 com_hub: ComHub::default(),
90 update_loop_running: RefCell::new(false),
91 update_loop_stop_sender: RefCell::new(None),
92 execution_contexts: RefCell::new(HashMap::new()),
93 }
94 }
95}
96
97macro_rules! get_execution_context {
98 ($self_rc:expr, $execution_context:expr) => {
100 match $execution_context {
101 Some(context) => {
102 if let &mut ExecutionContext::Local(ref mut local_context) = context {
104 local_context.set_runtime_internal($self_rc.clone());
105 }
106 context
107 },
108 None => {
109 &mut ExecutionContext::local_with_runtime_internal($self_rc.clone(), true)
110 }
111 }
112 };
113}
114
115impl RuntimeInternal {
116 pub async fn execute(
117 self_rc: Rc<RuntimeInternal>,
118 script: &str,
119 inserted_values: &[ValueContainer],
120 execution_context: Option<&mut ExecutionContext>,
121 ) -> Result<Option<ValueContainer>, ScriptExecutionError> {
122 let execution_context =
123 get_execution_context!(self_rc, execution_context);
124 let dxb = execution_context.compile(script, inserted_values)?;
125 RuntimeInternal::execute_dxb(
126 self_rc,
127 dxb,
128 Some(execution_context),
129 true,
130 )
131 .await
132 .map_err(ScriptExecutionError::from)
133 }
134
135 pub fn execute_sync(
136 self_rc: Rc<RuntimeInternal>,
137 script: &str,
138 inserted_values: &[ValueContainer],
139 execution_context: Option<&mut ExecutionContext>,
140 ) -> Result<Option<ValueContainer>, ScriptExecutionError> {
141 let execution_context =
142 get_execution_context!(self_rc, execution_context);
143 let dxb = execution_context.compile(script, inserted_values)?;
144 RuntimeInternal::execute_dxb_sync(
145 self_rc,
146 &dxb,
147 Some(execution_context),
148 true,
149 )
150 .map_err(ScriptExecutionError::from)
151 }
152
153 pub fn execute_dxb<'a>(
154 self_rc: Rc<RuntimeInternal>,
155 dxb: Vec<u8>,
156 execution_context: Option<&'a mut ExecutionContext>,
157 end_execution: bool,
158 ) -> Pin<
159 Box<
160 dyn Future<Output = Result<Option<ValueContainer>, ExecutionError>>
161 + 'a,
162 >,
163 > {
164 Box::pin(async move {
165 let execution_context =
166 get_execution_context!(self_rc, execution_context);
167 match execution_context {
168 ExecutionContext::Remote(context) => {
169 RuntimeInternal::execute_remote(self_rc, context, dxb).await
170 }
171 ExecutionContext::Local(_) => {
172 execution_context.execute_dxb(&dxb, end_execution).await
173 }
174 }
175 })
176 }
177
178 pub fn execute_dxb_sync(
179 self_rc: Rc<RuntimeInternal>,
180 dxb: &[u8],
181 execution_context: Option<&mut ExecutionContext>,
182 end_execution: bool,
183 ) -> Result<Option<ValueContainer>, ExecutionError> {
184 let execution_context =
185 get_execution_context!(self_rc, execution_context);
186 match execution_context {
187 ExecutionContext::Remote(_) => {
188 Err(ExecutionError::RequiresAsyncExecution)
189 }
190 ExecutionContext::Local(_) => {
191 execution_context.execute_dxb_sync(dxb, end_execution)
192 }
193 }
194 }
195
196 fn get_execution_context(
199 self_rc: Rc<RuntimeInternal>,
200 context_id: &IncomingEndpointContextSectionId,
201 ) -> ExecutionContext {
202 let mut execution_contexts = self_rc.execution_contexts.borrow_mut();
203 let execution_context = execution_contexts.get(context_id).cloned();
205 if let Some(context) = execution_context {
206 context
207 } else {
208 let new_context = ExecutionContext::local_with_runtime_internal(
209 self_rc.clone(),
210 false,
211 );
212 execution_contexts.insert(context_id.clone(), new_context.clone());
214 new_context
215 }
216 }
217
218 pub async fn execute_remote(
219 self_rc: Rc<RuntimeInternal>,
220 remote_execution_context: &mut RemoteExecutionContext,
221 dxb: Vec<u8>,
222 ) -> Result<Option<ValueContainer>, ExecutionError> {
223 let routing_header: RoutingHeader = RoutingHeader::default()
224 .with_sender(self_rc.endpoint.clone())
225 .to_owned();
226
227 let context_id =
229 remote_execution_context.context_id.unwrap_or_else(|| {
230 remote_execution_context.context_id =
232 Some(self_rc.com_hub.block_handler.get_new_context_id());
233 remote_execution_context.context_id.unwrap()
234 });
235
236 let block_header = BlockHeader {
237 context_id,
238 ..BlockHeader::default()
239 };
240 let encrypted_header = EncryptedHeader::default();
241
242 let mut block =
243 DXBBlock::new(routing_header, block_header, encrypted_header, dxb);
244
245 block.set_receivers(std::slice::from_ref(
246 &remote_execution_context.endpoint,
247 ));
248
249 let response = self_rc
250 .com_hub
251 .send_own_block_await_response(block, ResponseOptions::default())
252 .await
253 .remove(0)?;
254 let incoming_section = response.take_incoming_section();
255 RuntimeInternal::execute_incoming_section(self_rc, incoming_section)
256 .await
257 .0
258 }
259
260 async fn execute_incoming_section(
261 self_rc: Rc<RuntimeInternal>,
262 mut incoming_section: IncomingSection,
263 ) -> (
264 Result<Option<ValueContainer>, ExecutionError>,
265 Endpoint,
266 OutgoingContextId,
267 ) {
268 let mut context = Self::get_execution_context(
269 self_rc.clone(),
270 incoming_section.get_section_context_id(),
271 );
272 info!(
273 "Executing incoming section with index: {}",
274 incoming_section.get_section_index()
275 );
276
277 let mut result = None;
278 let mut last_block = None;
279
280 loop {
282 let block = incoming_section.next().await;
283 if let Some(block) = block {
284 let res = RuntimeInternal::execute_dxb_block_local(
285 self_rc.clone(),
286 block.clone(),
287 Some(&mut context),
288 )
289 .await;
290 if let Err(err) = res {
291 return (
292 Err(err),
293 block.get_sender().clone(),
294 block.block_header.context_id,
295 );
296 }
297 result = res.unwrap();
298 last_block = Some(block);
299 } else {
300 break;
301 }
302 }
303
304 if last_block.is_none() {
305 unreachable!("Incoming section must contain at least one block");
306 }
307 let last_block = last_block.unwrap();
308 let sender_endpoint = last_block.get_sender().clone();
309 let context_id = last_block.block_header.context_id;
310 (Ok(result), sender_endpoint, context_id)
311 }
312
313 async fn execute_dxb_block_local(
314 self_rc: Rc<RuntimeInternal>,
315 block: DXBBlock,
316 execution_context: Option<&mut ExecutionContext>,
317 ) -> Result<Option<ValueContainer>, ExecutionError> {
318 let execution_context =
319 get_execution_context!(self_rc, execution_context);
320 if !matches!(execution_context, ExecutionContext::Local(_)) {
322 unreachable!(
323 "Execution context must be local for executing a DXB block"
324 );
325 }
326 let dxb = block.body;
327 let end_execution =
328 block.block_header.flags_and_timestamp.is_end_of_section();
329 RuntimeInternal::execute_dxb(
330 self_rc,
331 dxb,
332 Some(execution_context),
333 end_execution,
334 )
335 .await
336 }
337}
338
339#[derive(Debug, Deserialize, Serialize)]
340pub struct RuntimeConfigInterface {
341 r#type: String,
342 config: ValueContainer,
343}
344
345#[derive(Debug, Default, Deserialize, Serialize)]
346pub struct RuntimeConfig {
347 pub endpoint: Option<Endpoint>,
348 pub interfaces: Option<Vec<RuntimeConfigInterface>>,
349 pub debug: Option<bool>,
351}
352
353impl RuntimeConfig {
354 pub fn new_with_endpoint(endpoint: Endpoint) -> Self {
355 RuntimeConfig {
356 endpoint: Some(endpoint),
357 interfaces: None,
358 debug: None,
359 }
360 }
361
362 pub fn add_interface<T: Serialize>(
363 &mut self,
364 r#type: String,
365 config: T,
366 ) -> Result<(), SerializationError> {
367 let config = to_value_container(&config)?;
368 let interface = RuntimeConfigInterface { r#type, config };
369 if let Some(interfaces) = &mut self.interfaces {
370 interfaces.push(interface);
371 } else {
372 self.interfaces = Some(vec![interface]);
373 }
374
375 Ok(())
376 }
377}
378
379impl Runtime {
382 pub fn new(config: RuntimeConfig) -> Runtime {
383 let endpoint = config.endpoint.clone().unwrap_or_else(Endpoint::random);
384 let com_hub = ComHub::new(endpoint.clone());
385 let memory = RefCell::new(Memory::new(endpoint.clone()));
386 Runtime {
387 version: VERSION.to_string(),
388 internal: Rc::new(RuntimeInternal {
389 endpoint,
390 memory,
391 config,
392 com_hub,
393 ..RuntimeInternal::default()
394 }),
395 }
396 }
397
398 pub fn init(
399 config: RuntimeConfig,
400 global_context: GlobalContext,
401 ) -> Runtime {
402 set_global_context(global_context);
403 if let Some(debug) = config.debug
404 && debug
405 {
406 init_logger_debug();
407 } else {
408 init_logger();
409 }
410 info!(
411 "Runtime initialized - Version {VERSION} Time: {}",
412 Time::now()
413 );
414 Self::new(config)
415 }
416
417 pub fn com_hub(&self) -> &ComHub {
418 &self.internal.com_hub
419 }
420 pub fn endpoint(&self) -> Endpoint {
421 self.internal.endpoint.clone()
422 }
423
424 pub fn internal(&self) -> Rc<RuntimeInternal> {
425 Rc::clone(&self.internal)
426 }
427
428 pub fn memory(&self) -> &RefCell<Memory> {
429 &self.internal.memory
430 }
431
432 #[cfg(feature = "native_crypto")]
433 pub fn init_native(config: RuntimeConfig) -> Runtime {
434 use crate::utils::time_native::TimeNative;
435
436 Self::init(
437 config,
438 GlobalContext::new(Arc::new(CryptoNative), Arc::new(TimeNative)),
439 )
440 }
441
442 pub async fn start(&self) {
446 if *self.internal().update_loop_running.borrow() {
447 info!("runtime update loop already running, skipping start");
448 return;
449 }
450 info!("starting runtime...");
451 self.com_hub()
452 .init()
453 .await
454 .expect("Failed to initialize ComHub");
455
456 self.register_interface_factories();
458
459 if let Some(interfaces) = &self.internal.config.interfaces {
461 for RuntimeConfigInterface { r#type, config } in interfaces.iter() {
462 if let Err(err) = self
463 .com_hub()
464 .create_interface(
465 r#type,
466 config.clone(),
467 InterfacePriority::default(),
468 )
469 .await
470 {
471 error!("Failed to create interface {type}: {err:?}");
472 } else {
473 info!("Created interface: {type}");
474 }
475 }
476 }
477
478 RuntimeInternal::start_update_loop(self.internal());
479 }
480
481 pub async fn create(
483 config: RuntimeConfig,
484 global_context: GlobalContext,
485 ) -> Runtime {
486 let runtime = Self::init(config, global_context);
487 runtime.start().await;
488 runtime
489 }
490
491 #[cfg(feature = "native_crypto")]
493 pub async fn create_native(config: RuntimeConfig) -> Runtime {
494 let runtime = Self::init_native(config);
495 runtime.start().await;
496 runtime
497 }
498
499 fn register_interface_factories(&self) {
500 crate::network::com_interfaces::default_com_interfaces::base_interface::BaseInterface::register_on_com_hub(self.com_hub());
501
502 #[cfg(feature = "native_websocket")]
503 crate::network::com_interfaces::default_com_interfaces::websocket::websocket_client_native_interface::WebSocketClientNativeInterface::register_on_com_hub(self.com_hub());
504 #[cfg(feature = "native_websocket")]
505 crate::network::com_interfaces::default_com_interfaces::websocket::websocket_server_native_interface::WebSocketServerNativeInterface::register_on_com_hub(self.com_hub());
506 #[cfg(feature = "native_serial")]
507 crate::network::com_interfaces::default_com_interfaces::serial::serial_native_interface::SerialNativeInterface::register_on_com_hub(self.com_hub());
508 #[cfg(feature = "native_tcp")]
509 crate::network::com_interfaces::default_com_interfaces::tcp::tcp_client_native_interface::TCPClientNativeInterface::register_on_com_hub(self.com_hub());
510 #[cfg(feature = "native_tcp")]
511 crate::network::com_interfaces::default_com_interfaces::tcp::tcp_server_native_interface::TCPServerNativeInterface::register_on_com_hub(self.com_hub());
512 }
516
517 pub async fn execute(
518 &self,
519 script: &str,
520 inserted_values: &[ValueContainer],
521 execution_context: Option<&mut ExecutionContext>,
522 ) -> Result<Option<ValueContainer>, ScriptExecutionError> {
523 RuntimeInternal::execute(
524 self.internal(),
525 script,
526 inserted_values,
527 execution_context,
528 )
529 .await
530 }
531
532 pub fn execute_sync(
533 &self,
534 script: &str,
535 inserted_values: &[ValueContainer],
536 execution_context: Option<&mut ExecutionContext>,
537 ) -> Result<Option<ValueContainer>, ScriptExecutionError> {
538 RuntimeInternal::execute_sync(
539 self.internal(),
540 script,
541 inserted_values,
542 execution_context,
543 )
544 }
545
546 pub async fn execute_dxb<'a>(
547 &'a self,
548 dxb: Vec<u8>,
549 execution_context: Option<&'a mut ExecutionContext>,
550 end_execution: bool,
551 ) -> Result<Option<ValueContainer>, ExecutionError> {
552 RuntimeInternal::execute_dxb(
553 self.internal(),
554 dxb,
555 execution_context,
556 end_execution,
557 )
558 .await
559 }
560
561 pub fn execute_dxb_sync(
562 &self,
563 dxb: &[u8],
564 execution_context: Option<&mut ExecutionContext>,
565 end_execution: bool,
566 ) -> Result<Option<ValueContainer>, ExecutionError> {
567 RuntimeInternal::execute_dxb_sync(
568 self.internal(),
569 dxb,
570 execution_context,
571 end_execution,
572 )
573 }
574
575 async fn execute_remote(
576 &self,
577 remote_execution_context: &mut RemoteExecutionContext,
578 dxb: Vec<u8>,
579 ) -> Result<Option<ValueContainer>, ExecutionError> {
580 RuntimeInternal::execute_remote(
581 self.internal(),
582 remote_execution_context,
583 dxb,
584 )
585 .await
586 }
587}