1use crate::collections::HashMap;
2#[cfg(all(feature = "native_crypto", feature = "std"))]
3use crate::crypto::crypto_native::CryptoNative;
4use crate::global::dxb_block::{
5 DXBBlock, IncomingEndpointContextSectionId, IncomingSection,
6 OutgoingContextId,
7};
8use crate::global::protocol_structures::block_header::BlockHeader;
9use crate::global::protocol_structures::encrypted_header::EncryptedHeader;
10use crate::global::protocol_structures::routing_header::RoutingHeader;
11use crate::logger::{init_logger, init_logger_debug};
12use crate::network::com_hub::{ComHub, InterfacePriority, ResponseOptions};
13use crate::network::com_interfaces::com_interface::ComInterfaceFactory;
14use crate::runtime::execution::ExecutionError;
15use crate::runtime::execution::context::ExecutionMode;
16use crate::serde::error::SerializationError;
17use crate::serde::serializer::to_value_container;
18use crate::stdlib::borrow::ToOwned;
19use crate::stdlib::boxed::Box;
20use crate::stdlib::pin::Pin;
21use crate::stdlib::string::String;
22use crate::stdlib::string::ToString;
23use crate::stdlib::sync::Arc;
24use crate::stdlib::vec;
25use crate::stdlib::vec::Vec;
26use crate::stdlib::{cell::RefCell, rc::Rc};
27use crate::time::Instant;
28use crate::utils::time::Time;
29use crate::values::core_values::endpoint::Endpoint;
30use crate::values::value_container::ValueContainer;
31use core::fmt::Debug;
32use core::prelude::rust_2024::*;
33use core::result::Result;
34use core::slice;
35use core::unreachable;
36use execution::context::{
37 ExecutionContext, RemoteExecutionContext, ScriptExecutionError,
38};
39use futures::channel::oneshot::Sender;
40use global_context::{GlobalContext, set_global_context};
41use log::{debug, error, info};
42use serde::{Deserialize, Serialize};
43
44pub mod dif_interface;
45pub mod execution;
46pub mod global_context;
47pub mod memory;
48mod update_loop;
49
50use self::memory::Memory;
51
/// Version string of this crate, read from the `CARGO_PKG_VERSION`
/// environment variable at compile time.
const VERSION: &str = env!("CARGO_PKG_VERSION");
53
/// Handle to a runtime instance.
///
/// Cloning clones the version string and the `Rc`, so every clone refers to
/// the same [`RuntimeInternal`].
#[derive(Clone)]
pub struct Runtime {
    /// Version string; `Runtime::new` sets it to the crate version.
    pub version: String,
    /// Reference-counted runtime state shared by all clones of this handle.
    pub internal: Rc<RuntimeInternal>,
}
59
60impl Debug for Runtime {
61 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
62 f.debug_struct("Runtime")
63 .field("version", &self.version)
64 .finish()
65 }
66}
67
/// Handle to the async environment the runtime is created with.
///
/// With the `embassy_runtime` feature it stores an embassy spawner; without
/// that feature the struct has no fields.
#[derive(Clone)]
pub struct AsyncContext {
    /// Embassy spawner supplied to [`AsyncContext::new`].
    #[cfg(feature = "embassy_runtime")]
    pub spawner: embassy_executor::Spawner,
}

impl AsyncContext {
    /// Creates a context that stores the given embassy spawner.
    #[cfg(feature = "embassy_runtime")]
    pub fn new(spawner: embassy_executor::Spawner) -> Self {
        AsyncContext { spawner }
    }

    /// Creates the field-less context used when `embassy_runtime` is disabled.
    #[cfg(not(feature = "embassy_runtime"))]
    pub fn new() -> Self {
        AsyncContext {}
    }
}

// `Default` is only provided without `embassy_runtime`, where `new` takes no
// arguments.
#[cfg(not(feature = "embassy_runtime"))]
impl Default for AsyncContext {
    fn default() -> Self {
        AsyncContext::new()
    }
}

impl Debug for AsyncContext {
    /// Writes just the type name.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        f.write_str("AsyncContext")
    }
}
96
/// State behind a [`Runtime`] handle, shared via `Rc<RuntimeInternal>`.
///
/// Parts that are mutated after construction are wrapped in `RefCell`.
#[derive(Debug)]
pub struct RuntimeInternal {
    /// Memory instance; `Runtime::new` creates it for the runtime's endpoint.
    pub memory: RefCell<Memory>,
    /// Communication hub used to send blocks and await their responses.
    pub com_hub: ComHub,
    /// Endpoint of this runtime; set as the sender of outgoing blocks.
    pub endpoint: Endpoint,
    /// Configuration the runtime was created with.
    pub config: RuntimeConfig,
    /// Read by `Runtime::start` to skip a second start.
    /// NOTE(review): presumably written by the `update_loop` module - confirm.
    update_loop_running: RefCell<bool>,
    /// One-shot sender slot, initially `None`.
    /// NOTE(review): presumably used to stop the update loop - confirm in `update_loop`.
    update_loop_stop_sender: RefCell<Option<Sender<()>>>,

    /// Execution contexts keyed by incoming section context id. An entry is
    /// removed while its section executes and inserted again afterwards.
    pub execution_contexts:
        RefCell<HashMap<IncomingEndpointContextSectionId, ExecutionContext>>,
    /// Async environment handle passed in at construction.
    pub async_context: AsyncContext,
}
113
/// Resolves the execution context a call should run in and evaluates to a
/// `&mut ExecutionContext`.
///
/// * `Some(context)`: the caller's context is used. If it is a local context,
///   a clone of `$self_rc` is attached to it first.
/// * `None`: a new local context in `ExecutionMode::Static` is created with a
///   clone of `$self_rc`.
///
/// The `None` arm borrows a temporary. Every use in this file binds the result
/// directly with `let`; keep it that way so the temporary outlives the borrow.
macro_rules! get_execution_context {
    ($self_rc:expr, $execution_context:expr) => {
        match $execution_context {
            None => {
                &mut ExecutionContext::local_with_runtime_internal(
                    $self_rc.clone(),
                    ExecutionMode::Static,
                )
            }
            Some(provided) => {
                if let ExecutionContext::Local(local) = &mut *provided {
                    local.set_runtime_internal($self_rc.clone());
                }
                provided
            }
        }
    };
}
131
132impl RuntimeInternal {
133 fn new(async_context: AsyncContext) -> Self {
134 RuntimeInternal {
135 endpoint: Endpoint::default(),
136 config: RuntimeConfig::default(),
137 memory: RefCell::new(Memory::new(Endpoint::default())),
138 com_hub: ComHub::new(Endpoint::default(), async_context.clone()),
139 update_loop_running: RefCell::new(false),
140 update_loop_stop_sender: RefCell::new(None),
141 execution_contexts: RefCell::new(HashMap::new()),
142 async_context,
143 }
144 }
145
146 #[cfg(feature = "compiler")]
147 pub async fn execute(
148 self_rc: Rc<RuntimeInternal>,
149 script: &str,
150 inserted_values: &[ValueContainer],
151 execution_context: Option<&mut ExecutionContext>,
152 ) -> Result<Option<ValueContainer>, ScriptExecutionError> {
153 let execution_context =
154 get_execution_context!(self_rc, execution_context);
155 let compile_start = Instant::now();
156 let dxb = execution_context.compile(script, inserted_values)?;
157 debug!(
158 "[Compilation took {} ms]",
159 compile_start.elapsed().as_millis()
160 );
161 let execute_start = Instant::now();
162 let result = RuntimeInternal::execute_dxb(
163 self_rc,
164 dxb,
165 Some(execution_context),
166 true,
167 )
168 .await
169 .map_err(ScriptExecutionError::from);
170 debug!(
171 "[Execution took {} ms]",
172 execute_start.elapsed().as_millis()
173 );
174 result
175 }
176
177 #[cfg(feature = "compiler")]
178 pub fn execute_sync(
179 self_rc: Rc<RuntimeInternal>,
180 script: &str,
181 inserted_values: &[ValueContainer],
182 execution_context: Option<&mut ExecutionContext>,
183 ) -> Result<Option<ValueContainer>, ScriptExecutionError> {
184 let execution_context =
185 get_execution_context!(self_rc, execution_context);
186 let compile_start = Instant::now();
187 let dxb = execution_context.compile(script, inserted_values)?;
188 debug!(
189 "[Compilation took {} ms]",
190 compile_start.elapsed().as_millis()
191 );
192 let execute_start = Instant::now();
193 let result = RuntimeInternal::execute_dxb_sync(
194 self_rc,
195 &dxb,
196 Some(execution_context),
197 true,
198 )
199 .map_err(ScriptExecutionError::from);
200 debug!(
201 "[Execution took {} ms]",
202 execute_start.elapsed().as_millis()
203 );
204 result
205 }
206
207 pub fn execute_dxb<'a>(
208 self_rc: Rc<RuntimeInternal>,
209 dxb: Vec<u8>,
210 execution_context: Option<&'a mut ExecutionContext>,
211 end_execution: bool,
212 ) -> Pin<
213 Box<
214 dyn Future<Output = Result<Option<ValueContainer>, ExecutionError>>
215 + 'a,
216 >,
217 > {
218 Box::pin(async move {
219 let execution_context =
220 get_execution_context!(self_rc, execution_context);
221 match execution_context {
222 ExecutionContext::Remote(context) => {
223 RuntimeInternal::execute_remote(self_rc, context, dxb).await
224 }
225 ExecutionContext::Local(_) => {
226 execution_context.execute_dxb(&dxb).await
227 }
228 }
229 })
230 }
231
232 pub fn execute_dxb_sync(
233 self_rc: Rc<RuntimeInternal>,
234 dxb: &[u8],
235 execution_context: Option<&mut ExecutionContext>,
236 end_execution: bool,
237 ) -> Result<Option<ValueContainer>, ExecutionError> {
238 let execution_context =
239 get_execution_context!(self_rc, execution_context);
240 match execution_context {
241 ExecutionContext::Remote(_) => {
242 Err(ExecutionError::RequiresAsyncExecution)
243 }
244 ExecutionContext::Local(_) => {
245 execution_context.execute_dxb_sync(dxb)
246 }
247 }
248 }
249
250 fn take_execution_context(
254 self_rc: Rc<RuntimeInternal>,
255 context_id: &IncomingEndpointContextSectionId,
256 ) -> ExecutionContext {
257 let mut execution_contexts = self_rc.execution_contexts.borrow_mut();
258 let execution_context = execution_contexts.remove(context_id);
260 if let Some(context) = execution_context {
261 context
262 } else {
263 ExecutionContext::local_with_runtime_internal(
264 self_rc.clone(),
265 ExecutionMode::unbounded(),
266 )
267 }
268 }
269
270 pub async fn execute_remote(
271 self_rc: Rc<RuntimeInternal>,
272 remote_execution_context: &mut RemoteExecutionContext,
273 dxb: Vec<u8>,
274 ) -> Result<Option<ValueContainer>, ExecutionError> {
275 let routing_header: RoutingHeader = RoutingHeader::default()
276 .with_sender(self_rc.endpoint.clone())
277 .to_owned();
278
279 let context_id =
281 remote_execution_context.context_id.unwrap_or_else(|| {
282 remote_execution_context.context_id =
284 Some(self_rc.com_hub.block_handler.get_new_context_id());
285 remote_execution_context.context_id.unwrap()
286 });
287
288 let block_header = BlockHeader {
289 context_id,
290 ..BlockHeader::default()
291 };
292 let encrypted_header = EncryptedHeader::default();
293
294 let mut block =
295 DXBBlock::new(routing_header, block_header, encrypted_header, dxb);
296
297 block
298 .set_receivers(slice::from_ref(&remote_execution_context.endpoint));
299
300 let response = self_rc
301 .com_hub
302 .send_own_block_await_response(block, ResponseOptions::default())
303 .await
304 .remove(0)?;
305 let incoming_section = response.take_incoming_section();
306 RuntimeInternal::execute_incoming_section(self_rc, incoming_section)
307 .await
308 .0
309 }
310
311 async fn execute_incoming_section(
312 self_rc: Rc<RuntimeInternal>,
313 mut incoming_section: IncomingSection,
314 ) -> (
315 Result<Option<ValueContainer>, ExecutionError>,
316 Endpoint,
317 OutgoingContextId,
318 ) {
319 let section_context_id =
320 incoming_section.get_section_context_id().clone();
321 let mut context =
322 Self::take_execution_context(self_rc.clone(), §ion_context_id);
323 info!(
324 "Executing incoming section with index: {}",
325 incoming_section.get_section_index()
326 );
327
328 let mut result = None;
329 let mut last_block = None;
330
331 loop {
333 let block = incoming_section.next().await;
334 if let Some(block) = block {
335 let res = RuntimeInternal::execute_dxb_block_local(
336 self_rc.clone(),
337 block.clone(),
338 Some(&mut context),
339 )
340 .await;
341 if let Err(err) = res {
342 return (
343 Err(err),
344 block.get_sender().clone(),
345 block.block_header.context_id,
346 );
347 }
348 result = res.unwrap();
349 last_block = Some(block);
350 } else {
351 break;
352 }
353 }
354
355 if last_block.is_none() {
356 unreachable!("Incoming section must contain at least one block");
357 }
358 let last_block = last_block.unwrap();
359 let sender_endpoint = last_block.get_sender().clone();
360 let context_id = last_block.block_header.context_id;
361
362 self_rc
365 .execution_contexts
366 .borrow_mut()
367 .insert(section_context_id, context);
368
369 (Ok(result), sender_endpoint, context_id)
370 }
371
372 async fn execute_dxb_block_local(
373 self_rc: Rc<RuntimeInternal>,
374 block: DXBBlock,
375 execution_context: Option<&mut ExecutionContext>,
376 ) -> Result<Option<ValueContainer>, ExecutionError> {
377 let execution_context =
378 get_execution_context!(self_rc, execution_context);
379 if !core::matches!(execution_context, ExecutionContext::Local(_)) {
381 unreachable!(
382 "Execution context must be local for executing a DXB block"
383 );
384 }
385 let dxb = block.body;
386 let end_execution =
387 block.block_header.flags_and_timestamp.is_end_of_section();
388 RuntimeInternal::execute_dxb(
389 self_rc,
390 dxb,
391 Some(execution_context),
392 end_execution,
393 )
394 .await
395 }
396}
397
/// One communication interface entry of a [`RuntimeConfig`].
#[derive(Debug, Deserialize, Serialize)]
pub struct RuntimeConfigInterface {
    /// Interface type name; (de)serialized under the key `type`. It is passed
    /// to the com hub when `Runtime::start` creates the interface.
    #[serde(rename = "type")]
    pub interface_type: String,
    /// Interface-specific configuration as a value container.
    pub config: ValueContainer,
}
404
405impl RuntimeConfigInterface {
406 pub fn new<T: Serialize>(
407 interface_type: &str,
408 config: T,
409 ) -> Result<RuntimeConfigInterface, SerializationError> {
410 Ok(RuntimeConfigInterface {
411 interface_type: interface_type.to_string(),
412 config: to_value_container(&config)?,
413 })
414 }
415
416 pub fn new_from_value_container(
417 interface_type: &str,
418 config: ValueContainer,
419 ) -> RuntimeConfigInterface {
420 RuntimeConfigInterface {
421 interface_type: interface_type.to_string(),
422 config,
423 }
424 }
425}
426
/// Configuration a [`Runtime`] is created from. Every field is optional and
/// defaults to `None`.
#[derive(Debug, Default, Deserialize, Serialize)]
pub struct RuntimeConfig {
    /// Endpoint of the runtime; `Runtime::new` picks a random endpoint when
    /// this is `None`.
    pub endpoint: Option<Endpoint>,
    /// Interfaces that `Runtime::start` creates on the com hub.
    pub interfaces: Option<Vec<RuntimeConfigInterface>>,
    /// String key/value pairs.
    /// NOTE(review): presumably environment values exposed to the runtime -
    /// not read anywhere in this file, confirm.
    pub env: Option<HashMap<String, String>>,
    /// When `Some(true)`, `Runtime::init` initialises the debug logger
    /// instead of the default logger.
    pub debug: Option<bool>,
}
435
436impl RuntimeConfig {
437 pub fn new_with_endpoint(endpoint: Endpoint) -> Self {
438 RuntimeConfig {
439 endpoint: Some(endpoint),
440 interfaces: None,
441 env: None,
442 debug: None,
443 }
444 }
445
446 pub fn add_interface<T: Serialize>(
447 &mut self,
448 interface_type: String,
449 config: T,
450 ) -> Result<(), SerializationError> {
451 let config = to_value_container(&config)?;
452 let interface = RuntimeConfigInterface {
453 interface_type,
454 config,
455 };
456 if let Some(interfaces) = &mut self.interfaces {
457 interfaces.push(interface);
458 } else {
459 self.interfaces = Some(vec![interface]);
460 }
461
462 Ok(())
463 }
464}
465
466impl Runtime {
469 pub fn new(config: RuntimeConfig, async_context: AsyncContext) -> Runtime {
474 let endpoint = config.endpoint.clone().unwrap_or_else(Endpoint::random);
475 let com_hub = ComHub::new(endpoint.clone(), async_context.clone());
476 let memory = RefCell::new(Memory::new(endpoint.clone()));
477 Runtime {
478 version: VERSION.to_string(),
479 internal: Rc::new(RuntimeInternal {
480 endpoint,
481 memory,
482 config,
483 com_hub,
484 ..RuntimeInternal::new(async_context)
485 }),
486 }
487 }
488
489 pub fn init(
492 config: RuntimeConfig,
493 global_context: GlobalContext,
494 async_context: AsyncContext,
495 ) -> Runtime {
496 set_global_context(global_context);
497 if let Some(debug) = config.debug
498 && debug
499 {
500 init_logger_debug();
501 } else {
502 init_logger();
503 }
504 info!(
505 "Runtime initialized - Version {VERSION} Time: {}",
506 Time::now()
507 );
508 Self::new(config, async_context)
509 }
510
511 pub fn com_hub(&self) -> &ComHub {
512 &self.internal.com_hub
513 }
514 pub fn endpoint(&self) -> Endpoint {
515 self.internal.endpoint.clone()
516 }
517
518 pub fn internal(&self) -> Rc<RuntimeInternal> {
519 Rc::clone(&self.internal)
520 }
521
522 pub fn memory(&self) -> &RefCell<Memory> {
523 &self.internal.memory
524 }
525
526 #[cfg(all(
527 feature = "native_crypto",
528 feature = "std",
529 not(feature = "embassy_runtime")
530 ))]
531 pub fn init_native(config: RuntimeConfig) -> Runtime {
532 use crate::utils::time_native::TimeNative;
533
534 Self::init(
535 config,
536 GlobalContext::new(Arc::new(CryptoNative), Arc::new(TimeNative)),
537 AsyncContext::new(),
538 )
539 }
540
541 pub async fn start(&self) {
545 if *self.internal().update_loop_running.borrow() {
546 info!("runtime update loop already running, skipping start");
547 return;
548 }
549 info!("starting runtime...");
550 self.com_hub()
551 .init()
552 .await
553 .expect("Failed to initialize ComHub");
554
555 self.register_interface_factories();
557
558 if let Some(interfaces) = &self.internal.config.interfaces {
560 for RuntimeConfigInterface {
561 interface_type,
562 config,
563 } in interfaces.iter()
564 {
565 if let Err(err) = self
566 .com_hub()
567 .create_interface(
568 interface_type,
569 config.clone(),
570 InterfacePriority::default(),
571 )
572 .await
573 {
574 error!(
575 "Failed to create interface {interface_type}: {err:?}"
576 );
577 } else {
578 info!("Created interface: {interface_type}");
579 }
580 }
581 }
582
583 RuntimeInternal::start_update_loop(self.internal());
584 }
585
586 pub async fn create(
588 config: RuntimeConfig,
589 global_context: GlobalContext,
590 async_context: AsyncContext,
591 ) -> Runtime {
592 let runtime = Self::init(config, global_context, async_context);
593 runtime.start().await;
594 runtime
595 }
596
597 #[cfg(all(
599 feature = "native_crypto",
600 feature = "std",
601 not(feature = "embassy_runtime")
602 ))]
603 pub async fn create_native(config: RuntimeConfig) -> Runtime {
604 let runtime = Self::init_native(config);
605 runtime.start().await;
606 runtime
607 }
608
609 fn register_interface_factories(&self) {
610 crate::network::com_interfaces::default_com_interfaces::base_interface::BaseInterface::register_on_com_hub(self.com_hub());
611
612 #[cfg(feature = "native_websocket")]
613 crate::network::com_interfaces::default_com_interfaces::websocket::websocket_client_native_interface::WebSocketClientNativeInterface::register_on_com_hub(self.com_hub());
614 #[cfg(feature = "native_websocket")]
615 crate::network::com_interfaces::default_com_interfaces::websocket::websocket_server_native_interface::WebSocketServerNativeInterface::register_on_com_hub(self.com_hub());
616 #[cfg(feature = "native_serial")]
617 crate::network::com_interfaces::default_com_interfaces::serial::serial_native_interface::SerialNativeInterface::register_on_com_hub(self.com_hub());
618 #[cfg(feature = "native_tcp")]
619 crate::network::com_interfaces::default_com_interfaces::tcp::tcp_client_native_interface::TCPClientNativeInterface::register_on_com_hub(self.com_hub());
620 #[cfg(feature = "native_tcp")]
621 crate::network::com_interfaces::default_com_interfaces::tcp::tcp_server_native_interface::TCPServerNativeInterface::register_on_com_hub(self.com_hub());
622 }
626
627 #[cfg(feature = "compiler")]
628 pub async fn execute(
629 &self,
630 script: &str,
631 inserted_values: &[ValueContainer],
632 execution_context: Option<&mut ExecutionContext>,
633 ) -> Result<Option<ValueContainer>, ScriptExecutionError> {
634 RuntimeInternal::execute(
635 self.internal(),
636 script,
637 inserted_values,
638 execution_context,
639 )
640 .await
641 }
642
643 #[cfg(feature = "compiler")]
644 pub fn execute_sync(
645 &self,
646 script: &str,
647 inserted_values: &[ValueContainer],
648 execution_context: Option<&mut ExecutionContext>,
649 ) -> Result<Option<ValueContainer>, ScriptExecutionError> {
650 RuntimeInternal::execute_sync(
651 self.internal(),
652 script,
653 inserted_values,
654 execution_context,
655 )
656 }
657
658 pub async fn execute_dxb<'a>(
659 &'a self,
660 dxb: Vec<u8>,
661 execution_context: Option<&'a mut ExecutionContext>,
662 end_execution: bool,
663 ) -> Result<Option<ValueContainer>, ExecutionError> {
664 RuntimeInternal::execute_dxb(
665 self.internal(),
666 dxb,
667 execution_context,
668 end_execution,
669 )
670 .await
671 }
672
673 pub fn execute_dxb_sync(
674 &self,
675 dxb: &[u8],
676 execution_context: Option<&mut ExecutionContext>,
677 end_execution: bool,
678 ) -> Result<Option<ValueContainer>, ExecutionError> {
679 RuntimeInternal::execute_dxb_sync(
680 self.internal(),
681 dxb,
682 execution_context,
683 end_execution,
684 )
685 }
686
687 async fn execute_remote(
688 &self,
689 remote_execution_context: &mut RemoteExecutionContext,
690 dxb: Vec<u8>,
691 ) -> Result<Option<ValueContainer>, ExecutionError> {
692 RuntimeInternal::execute_remote(
693 self.internal(),
694 remote_execution_context,
695 dxb,
696 )
697 .await
698 }
699}