1use std::collections::HashMap;
2use std::fmt::Debug;
3use std::pin::Pin;
4use std::sync::{Arc, Mutex};
5use futures::channel::oneshot::Sender;
6#[cfg(feature = "native_crypto")]
7use crate::crypto::crypto_native::CryptoNative;
8use crate::values::core_values::endpoint::Endpoint;
9use crate::logger::{init_logger, init_logger_debug};
10use crate::stdlib::{cell::RefCell, rc::Rc};
11use global_context::{get_global_context, set_global_context, GlobalContext};
12use log::{error, info};
13use serde::{Deserialize, Serialize};
14use datex_core::network::com_interfaces::com_interface::ComInterfaceFactory;
15use datex_core::values::serde::error::SerializationError;
16use crate::global::dxb_block::{DXBBlock, IncomingEndpointContextSectionId, IncomingSection, OutgoingContextId};
17use crate::global::protocol_structures::block_header::BlockHeader;
18use crate::global::protocol_structures::encrypted_header::EncryptedHeader;
19use crate::global::protocol_structures::routing_header;
20use crate::global::protocol_structures::routing_header::RoutingHeader;
21use crate::values::value_container::ValueContainer;
22use crate::network::com_hub::{ComHub, InterfacePriority, ResponseOptions};
23use crate::runtime::execution::ExecutionError;
24use crate::runtime::execution_context::{ExecutionContext, RemoteExecutionContext, ScriptExecutionError};
25use crate::values::serde::serializer::to_value_container;
26
27pub mod execution;
28pub mod global_context;
29pub mod memory;
30mod stack;
31pub mod execution_context;
32mod update_loop;
33
34use self::memory::Memory;
35
36const VERSION: &str = env!("CARGO_PKG_VERSION");
37
38#[derive(Clone)]
39pub struct Runtime {
40 pub version: String,
41 pub internal: Rc<RuntimeInternal>,
42}
43
44impl Debug for Runtime {
45 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
46 f.debug_struct("Runtime")
47 .field("version", &self.version)
48 .finish()
49 }
50}
51
52impl Default for Runtime {
53 fn default() -> Self {
54 Runtime {
55 version: VERSION.to_string(),
56 internal: Rc::new(RuntimeInternal::default()),
57 }
58 }
59}
60
61
62#[derive(Debug)]
63pub struct RuntimeInternal {
64 pub memory: RefCell<Memory>,
65 pub com_hub: ComHub,
66 pub endpoint: Endpoint,
67 pub config: RuntimeConfig,
68 update_loop_running: RefCell<bool>,
71 update_loop_stop_sender: RefCell<Option<Sender<()>>>,
72
73 pub execution_contexts: RefCell<HashMap<IncomingEndpointContextSectionId, ExecutionContext>>,
75}
76
77impl Default for RuntimeInternal {
78 fn default() -> Self {
79 RuntimeInternal {
80 endpoint: Endpoint::default(),
81 config: RuntimeConfig::default(),
82 memory: RefCell::new(Memory::new()),
83 com_hub: ComHub::default(),
84 update_loop_running: RefCell::new(false),
85 update_loop_stop_sender: RefCell::new(None),
86 execution_contexts: RefCell::new(HashMap::new()),
87 }
88 }
89}
90
/// Turns an `Option<&mut ExecutionContext>` into a `&mut ExecutionContext`.
///
/// * `Some(context)`: if the context is the `Local` variant, its runtime
///   internal is set to a clone of `$self_rc`; the same reference is then
///   yielded.
/// * `None`: a new local context is created with
///   `ExecutionContext::local_with_runtime_internal($self_rc.clone(), true)`
///   and a mutable borrow of that temporary is yielded.
///
/// NOTE(review): the `None` arm borrows a temporary, so the macro is meant to be
/// used directly as the initializer of a `let` binding (as every call site in
/// this file does) so that the temporary outlives the expression.
macro_rules! get_execution_context {
    ($self_rc:expr, $execution_context:expr) => {
        match $execution_context {
            Some(provided) => {
                // Reborrow so that `provided` itself can still be yielded below.
                if let ExecutionContext::Local(local) = &mut *provided {
                    local.set_runtime_internal($self_rc.clone());
                }
                provided
            }
            None => {
                &mut ExecutionContext::local_with_runtime_internal($self_rc.clone(), true)
            }
        }
    };
}
108
109
110impl RuntimeInternal {
111
112 pub async fn execute(
113 self_rc: Rc<RuntimeInternal>,
114 script: &str,
115 inserted_values: &[ValueContainer],
116 execution_context: Option<&mut ExecutionContext>,
117 ) -> Result<Option<ValueContainer>, ScriptExecutionError> {
118 let execution_context = get_execution_context!(self_rc, execution_context);
119 let dxb = execution_context.compile(script, inserted_values)?;
120 RuntimeInternal::execute_dxb(self_rc, dxb, Some(execution_context), true)
121 .await
122 .map_err(ScriptExecutionError::from)
123 }
124
125 pub fn execute_sync(
126 self_rc: Rc<RuntimeInternal>,
127 script: &str,
128 inserted_values: &[ValueContainer],
129 execution_context: Option<&mut ExecutionContext>,
130 ) -> Result<Option<ValueContainer>, ScriptExecutionError> {
131 let execution_context = get_execution_context!(self_rc, execution_context);
132 let dxb = execution_context.compile(script, inserted_values)?;
133 RuntimeInternal::execute_dxb_sync(self_rc, &dxb, Some(execution_context), true)
134 .map_err(ScriptExecutionError::from)
135 }
136
137 pub fn execute_dxb<'a>(
138 self_rc: Rc<RuntimeInternal>,
139 dxb: Vec<u8>,
140 execution_context: Option<&'a mut ExecutionContext>,
141 end_execution: bool,
142 ) -> Pin<Box<dyn Future<Output = Result<Option<ValueContainer>, ExecutionError>> + 'a>> {
143 Box::pin(async move {
144 let execution_context = get_execution_context!(self_rc, execution_context);
145 match execution_context {
146 ExecutionContext::Remote(context) => {
147 RuntimeInternal::execute_remote(self_rc ,context, dxb).await
148 },
149 ExecutionContext::Local(_) => {
150 execution_context.execute_dxb(&dxb, end_execution).await
151 }
152 }
153 })
154 }
155
156 pub fn execute_dxb_sync(
157 self_rc: Rc<RuntimeInternal>,
158 dxb: &[u8],
159 execution_context: Option<&mut ExecutionContext>,
160 end_execution: bool,
161 ) -> Result<Option<ValueContainer>, ExecutionError> {
162 let execution_context = get_execution_context!(self_rc, execution_context);
163 match execution_context {
164 ExecutionContext::Remote(_) => {
165 Err(ExecutionError::RequiresAsyncExecution)
166 }
167 ExecutionContext::Local(_) => {
168 execution_context.execute_dxb_sync(dxb, end_execution)
169 }
170 }
171 }
172
173 fn get_execution_context(
176 self_rc: Rc<RuntimeInternal>,
177 context_id: &IncomingEndpointContextSectionId,
178 ) -> ExecutionContext {
179 let mut execution_contexts = self_rc.execution_contexts.borrow_mut();
180 let execution_context = execution_contexts.get(context_id).cloned();
182 if let Some(context) = execution_context {
183 context
184 } else {
185 let new_context = ExecutionContext::local_with_runtime_internal(self_rc.clone(), false);
186 execution_contexts.insert(context_id.clone(), new_context.clone());
188 new_context
189 }
190 }
191
192 pub async fn execute_remote(
193 self_rc: Rc<RuntimeInternal>,
194 remote_execution_context: &mut RemoteExecutionContext,
195 dxb: Vec<u8>
196 ) -> Result<Option<ValueContainer>, ExecutionError> {
197 let routing_header: RoutingHeader = RoutingHeader {
198 version: 2,
199 flags: routing_header::Flags::new(),
200 block_size_u16: Some(0),
201 block_size_u32: None,
202 sender: self_rc.endpoint.clone(),
203 receivers: routing_header::Receivers {
204 flags: routing_header::ReceiverFlags::new()
205 .with_has_endpoints(false)
206 .with_has_pointer_id(false)
207 .with_has_endpoint_keys(false),
208 pointer_id: None,
209 endpoints: None,
210 endpoints_with_keys: None,
211 },
212 ..RoutingHeader::default()
213 };
214
215 let context_id = remote_execution_context.context_id.unwrap_or_else(|| {
217 remote_execution_context.context_id = Some(self_rc.com_hub.block_handler.get_new_context_id());
219 remote_execution_context.context_id.unwrap()
220 });
221
222 let block_header = BlockHeader {
223 context_id,
224 ..BlockHeader::default()
225 };
226 let encrypted_header = EncryptedHeader::default();
227
228 let mut block =
229 DXBBlock::new(routing_header, block_header, encrypted_header, dxb);
230
231 block.set_receivers(std::slice::from_ref(&remote_execution_context.endpoint));
232
233 let response = self_rc.com_hub.send_own_block_await_response(block, ResponseOptions::default()).await.remove(0)?;
234 let incoming_section = response.take_incoming_section();
235 RuntimeInternal::execute_incoming_section(self_rc, incoming_section).await.0
236 }
237
238 async fn execute_incoming_section(
239 self_rc: Rc<RuntimeInternal>,
240 mut incoming_section: IncomingSection,
241 ) -> (Result<Option<ValueContainer>, ExecutionError>, Endpoint, OutgoingContextId) {
242
243 let mut context = Self::get_execution_context(self_rc.clone(), incoming_section.get_section_context_id());
244 info!("Executing incoming section with index: {}", incoming_section.get_section_index());
245
246 let mut result = None;
247 let mut last_block = None;
248
249 loop {
251 let block = incoming_section.next().await;
252 if let Some(block) = block {
253 let res = RuntimeInternal::execute_dxb_block_local(self_rc.clone(), block.clone(), Some(&mut context)).await;
254 if let Err(err) = res {
255 return (Err(err), block.get_sender().clone(), block.block_header.context_id);
256 }
257 result = res.unwrap();
258 last_block = Some(block);
259 }
260 else {
261 break;
262 }
263 }
264
265 if last_block.is_none() {
266 unreachable!("Incoming section must contain at least one block");
267 }
268 let last_block = last_block.unwrap();
269 let sender_endpoint = last_block.get_sender().clone();
270 let context_id = last_block.block_header.context_id;
271 (Ok(result), sender_endpoint, context_id)
272 }
273
274 async fn execute_dxb_block_local(
275 self_rc: Rc<RuntimeInternal>,
276 block: DXBBlock,
277 execution_context: Option<&mut ExecutionContext>,
278 ) -> Result<Option<ValueContainer>, ExecutionError> {
279 let execution_context = get_execution_context!(self_rc, execution_context);
280 if !matches!(execution_context, ExecutionContext::Local(_)) {
282 unreachable!("Execution context must be local for executing a DXB block");
283 }
284 let dxb = block.body;
285 let end_execution = block.block_header.flags_and_timestamp.is_end_of_section();
286 RuntimeInternal::execute_dxb(self_rc, dxb, Some(execution_context), end_execution).await
287 }
288}
289
290#[derive(Debug, Deserialize, Serialize)]
291pub struct RuntimeConfigInterface {
292 r#type: String,
293 config: ValueContainer,
294}
295
296#[derive(Debug, Default, Deserialize, Serialize)]
297pub struct RuntimeConfig {
298 pub endpoint: Option<Endpoint>,
299 pub interfaces: Option<Vec<RuntimeConfigInterface>>,
300 pub debug: Option<bool>,
302}
303
304impl RuntimeConfig {
305 pub fn new_with_endpoint(endpoint: Endpoint) -> Self {
306 RuntimeConfig {
307 endpoint: Some(endpoint),
308 interfaces: None,
309 debug: None,
310 }
311 }
312
313 pub fn add_interface<T: Serialize>(
314 &mut self,
315 r#type: String,
316 config: T,
317 ) -> Result<(), SerializationError> {
318 let config = to_value_container(&config)?;
319 let interface = RuntimeConfigInterface { r#type, config };
320 if let Some(interfaces) = &mut self.interfaces {
321 interfaces.push(interface);
322 } else {
323 self.interfaces = Some(vec![interface]);
324 }
325
326 Ok(())
327 }
328}
329
330impl Runtime {
333 pub fn new(config: RuntimeConfig) -> Runtime {
334 let endpoint = config.endpoint.clone().unwrap_or_else(Endpoint::random);
335 let com_hub = ComHub::new(endpoint.clone());
336 Runtime {
337 version: VERSION.to_string(),
338 internal: Rc::new(RuntimeInternal {
339 endpoint,
340 config,
341 com_hub,
342 ..RuntimeInternal::default()
343 })
344 }
345 }
346 pub fn init(
347 config: RuntimeConfig,
348 global_context: GlobalContext,
349 ) -> Runtime {
350 set_global_context(global_context);
351 if let Some(debug) = config.debug && debug {
352 init_logger_debug();
353 } else {
354 init_logger();
355 }
356 info!(
357 "Runtime initialized - Version {VERSION} Time: {}",
358 get_global_context().time.lock().unwrap().now()
359 );
360 Self::new(config)
361 }
362
363 pub fn com_hub(&self) -> &ComHub {
364 &self.internal.com_hub
365 }
366 pub fn endpoint(&self) -> Endpoint {
367 self.internal.endpoint.clone()
368 }
369
370 pub fn internal(&self) -> Rc<RuntimeInternal> {
371 Rc::clone(&self.internal)
372 }
373
374 pub fn memory(&self) -> &RefCell<Memory> {
375 &self.internal.memory
376 }
377
378 #[cfg(feature = "native_crypto")]
379 pub fn init_native(config: RuntimeConfig) -> Runtime {
380 use crate::utils::time_native::TimeNative;
381
382 Self::init(
383 config,
384 GlobalContext::new(
385 Arc::new(Mutex::new(CryptoNative)),
386 Arc::new(Mutex::new(TimeNative)),
387 ),
388 )
389 }
390
391 pub async fn start(&self) {
395 if *self.internal().update_loop_running.borrow() {
396 info!("runtime update loop already running, skipping start");
397 return;
398 }
399 info!("starting runtime...");
400 self.com_hub()
401 .init()
402 .await
403 .expect("Failed to initialize ComHub");
404
405 self.register_interface_factories();
407
408 if let Some(interfaces) = &self.internal.config.interfaces {
410 for RuntimeConfigInterface {r#type, config} in interfaces.iter() {
411 if let Err(err) = self.com_hub().create_interface(r#type, config.clone(), InterfacePriority::default()).await {
412 error!("Failed to create interface {type}: {err:?}");
413 } else {
414 info!("Created interface: {type}");
415 }
416 }
417 }
418
419 RuntimeInternal::start_update_loop(self.internal());
420 }
421
422 pub async fn create(
424 config: RuntimeConfig,
425 global_context: GlobalContext,
426 ) -> Runtime {
427 let runtime = Self::init(config, global_context);
428 runtime.start().await;
429 runtime
430 }
431
432 #[cfg(feature = "native_crypto")]
434 pub async fn create_native(
435 config: RuntimeConfig,
436 ) -> Runtime {
437 let runtime = Self::init_native(config);
438 runtime.start().await;
439 runtime
440 }
441
442 fn register_interface_factories(&self) {
443 crate::network::com_interfaces::default_com_interfaces::base_interface::BaseInterface::register_on_com_hub(self.com_hub());
444
445 #[cfg(feature = "native_websocket")]
446 crate::network::com_interfaces::default_com_interfaces::websocket::websocket_client_native_interface::WebSocketClientNativeInterface::register_on_com_hub(self.com_hub());
447 #[cfg(feature = "native_websocket")]
448 crate::network::com_interfaces::default_com_interfaces::websocket::websocket_server_native_interface::WebSocketServerNativeInterface::register_on_com_hub(self.com_hub());
449 #[cfg(feature = "native_serial")]
450 crate::network::com_interfaces::default_com_interfaces::serial::serial_native_interface::SerialNativeInterface::register_on_com_hub(self.com_hub());
451 #[cfg(feature = "native_tcp")]
452 crate::network::com_interfaces::default_com_interfaces::tcp::tcp_client_native_interface::TCPClientNativeInterface::register_on_com_hub(self.com_hub());
453 #[cfg(feature = "native_tcp")]
454 crate::network::com_interfaces::default_com_interfaces::tcp::tcp_server_native_interface::TCPServerNativeInterface::register_on_com_hub(self.com_hub());
455 }
459
460 pub async fn execute(
461 &self,
462 script: &str,
463 inserted_values: &[ValueContainer],
464 execution_context: Option<&mut ExecutionContext>,
465 ) -> Result<Option<ValueContainer>, ScriptExecutionError> {
466 RuntimeInternal::execute(self.internal(), script, inserted_values, execution_context).await
467 }
468
469 pub fn execute_sync(
470 &self,
471 script: &str,
472 inserted_values: &[ValueContainer],
473 execution_context: Option<&mut ExecutionContext>,
474 ) -> Result<Option<ValueContainer>, ScriptExecutionError> {
475 RuntimeInternal::execute_sync(self.internal(), script, inserted_values, execution_context)
476 }
477
478 pub async fn execute_dxb<'a>(
479 &'a self,
480 dxb: Vec<u8>,
481 execution_context: Option<&'a mut ExecutionContext>,
482 end_execution: bool,
483 ) -> Result<Option<ValueContainer>, ExecutionError> {
484 RuntimeInternal::execute_dxb(self.internal(), dxb, execution_context, end_execution).await
485 }
486
487 pub fn execute_dxb_sync(
488 &self,
489 dxb: &[u8],
490 execution_context: Option<&mut ExecutionContext>,
491 end_execution: bool,
492 ) -> Result<Option<ValueContainer>, ExecutionError> {
493 RuntimeInternal::execute_dxb_sync(self.internal(), dxb, execution_context, end_execution)
494 }
495
496 async fn execute_remote(
497 &self,
498 remote_execution_context: &mut RemoteExecutionContext,
499 dxb: Vec<u8>
500 ) -> Result<Option<ValueContainer>, ExecutionError> {
501 RuntimeInternal::execute_remote(self.internal(), remote_execution_context, dxb).await
502 }
503}