datex_core/runtime/
update_loop.rs1use crate::core_compiler::value_compiler::compile_value_container;
2use crate::global::dxb_block::{DXBBlock, IncomingSection, OutgoingContextId};
3use crate::global::protocol_structures::block_header::FlagsAndTimestamp;
4use crate::global::protocol_structures::block_header::{
5 BlockHeader, BlockType,
6};
7use crate::global::protocol_structures::encrypted_header::EncryptedHeader;
8use crate::global::protocol_structures::routing_header::RoutingHeader;
9use crate::runtime::RuntimeInternal;
10use crate::runtime::execution::ExecutionError;
11use crate::stdlib::borrow::ToOwned;
12use crate::stdlib::rc::Rc;
13use crate::stdlib::vec;
14use crate::stdlib::vec::Vec;
15use crate::task::{sleep, spawn_with_panic_notify};
16use crate::values::core_values::endpoint::Endpoint;
17use crate::values::value_container::ValueContainer;
18use core::prelude::rust_2024::*;
19use core::result::Result;
20use core::time::Duration;
21use futures::channel::oneshot;
22use log::info;
23
24#[cfg_attr(feature = "embassy_runtime", embassy_executor::task)]
25async fn handle_incoming_section_task(
26 runtime_rc: Rc<RuntimeInternal>,
27 section: IncomingSection,
28) {
29 let (result, endpoint, context_id) =
30 RuntimeInternal::execute_incoming_section(runtime_rc.clone(), section)
31 .await;
32 info!(
33 "Execution result (on {} from {}): {result:?}",
34 runtime_rc.endpoint, endpoint
35 );
36 let res = RuntimeInternal::send_response_block(
38 runtime_rc.clone(),
39 result,
40 endpoint,
41 context_id,
42 )
43 .await;
44 }
46
47#[cfg_attr(feature = "embassy_runtime", embassy_executor::task)]
48async fn update_loop_task(runtime_rc: Rc<RuntimeInternal>) {
49 while *runtime_rc.update_loop_running.borrow() {
50 RuntimeInternal::update(runtime_rc.clone()).await;
51 sleep(Duration::from_millis(1)).await;
52 }
53 if let Some(sender) = runtime_rc.update_loop_stop_sender.borrow_mut().take()
54 {
55 sender.send(()).expect("Failed to send stop signal");
56 }
57}
58
59impl RuntimeInternal {
60 pub fn start_update_loop(self_rc: Rc<RuntimeInternal>) {
62 info!("starting runtime update loop...");
63
64 if *self_rc.update_loop_running.borrow() {
66 return;
67 }
68
69 *self_rc.update_loop_running.borrow_mut() = true;
71
72 spawn_with_panic_notify(
73 &self_rc.clone().async_context,
74 update_loop_task(self_rc),
75 );
76 }
77
78 pub async fn stop_update_loop(self_rc: Rc<RuntimeInternal>) {
80 info!("Stopping Runtime update loop for {}", self_rc.endpoint);
81 *self_rc.update_loop_running.borrow_mut() = false;
82
83 let (sender, receiver) = oneshot::channel::<()>();
84
85 self_rc.update_loop_stop_sender.borrow_mut().replace(sender);
86
87 receiver.await.unwrap();
88 }
89
90 async fn update(self_rc: Rc<RuntimeInternal>) {
92 self_rc.com_hub.update().await;
94 RuntimeInternal::handle_incoming_sections(self_rc);
96 }
97
98 fn handle_incoming_sections(self_rc: Rc<RuntimeInternal>) {
100 let mut sections = self_rc
101 .com_hub
102 .block_handler
103 .incoming_sections_queue
104 .borrow_mut();
105 let async_context = &self_rc.async_context;
106 for section in sections.drain(..) {
108 let self_rc = self_rc.clone();
110 spawn_with_panic_notify(
111 async_context,
112 handle_incoming_section_task(self_rc, section),
113 );
114 }
115 }
116
117 async fn send_response_block(
118 self_rc: Rc<RuntimeInternal>,
119 result: Result<Option<ValueContainer>, ExecutionError>,
120 receiver_endpoint: Endpoint,
121 context_id: OutgoingContextId,
122 ) -> Result<(), Vec<Endpoint>> {
123 let routing_header: RoutingHeader = RoutingHeader::default()
124 .with_sender(self_rc.endpoint.clone())
125 .to_owned();
126 let block_header = BlockHeader {
127 context_id,
128 flags_and_timestamp: FlagsAndTimestamp::new()
129 .with_block_type(BlockType::Response)
130 .with_is_end_of_section(true)
131 .with_is_end_of_context(true),
132 ..BlockHeader::default()
133 };
134 let encrypted_header = EncryptedHeader::default();
135
136 info!(
137 "send response, context_id: {context_id:?}, receiver: {receiver_endpoint}"
138 );
139
140 if let Ok(value) = result {
141 let dxb = if let Some(value) = &value {
142 compile_value_container(value)
143 } else {
144 vec![]
145 };
146
147 let mut block = DXBBlock::new(
148 routing_header,
149 block_header,
150 encrypted_header,
151 dxb,
152 );
153 block.set_receivers(core::slice::from_ref(&receiver_endpoint));
154
155 self_rc.com_hub.send_own_block(block).await
156 } else {
157 core::todo!("#233 Handle returning error response block");
158 }
159 }
160}