datex_core/runtime/
update_loop.rs1use crate::compiler::compile_value;
2use crate::global::dxb_block::{DXBBlock, OutgoingContextId};
3use crate::global::protocol_structures::block_header::FlagsAndTimestamp;
4use crate::global::protocol_structures::block_header::{
5 BlockHeader, BlockType,
6};
7use crate::global::protocol_structures::encrypted_header::EncryptedHeader;
8use crate::global::protocol_structures::routing_header::RoutingHeader;
9use crate::runtime::RuntimeInternal;
10use crate::runtime::execution::ExecutionError;
11use crate::task::{sleep, spawn_with_panic_notify};
12use crate::values::core_values::endpoint::Endpoint;
13use crate::values::value_container::ValueContainer;
14use futures::channel::oneshot;
15use log::info;
16use std::rc::Rc;
17use std::time::Duration;
18
19impl RuntimeInternal {
20 pub fn start_update_loop(self_rc: Rc<RuntimeInternal>) {
22 info!("starting runtime update loop...");
23
24 if *self_rc.update_loop_running.borrow() {
26 return;
27 }
28
29 *self_rc.update_loop_running.borrow_mut() = true;
31
32 spawn_with_panic_notify(async move {
33 while *self_rc.update_loop_running.borrow() {
34 RuntimeInternal::update(self_rc.clone()).await;
35 sleep(Duration::from_millis(1)).await;
36 }
37 if let Some(sender) =
38 self_rc.update_loop_stop_sender.borrow_mut().take()
39 {
40 sender.send(()).expect("Failed to send stop signal");
41 }
42 });
43 }
44
45 pub async fn stop_update_loop(self_rc: Rc<RuntimeInternal>) {
47 info!("Stopping Runtime update loop for {}", self_rc.endpoint);
48 *self_rc.update_loop_running.borrow_mut() = false;
49
50 let (sender, receiver) = oneshot::channel::<()>();
51
52 self_rc.update_loop_stop_sender.borrow_mut().replace(sender);
53
54 receiver.await.unwrap();
55 }
56
57 async fn update(self_rc: Rc<RuntimeInternal>) {
59 self_rc.com_hub.update();
61 RuntimeInternal::handle_incoming_sections(self_rc);
63 }
64
65 fn handle_incoming_sections(self_rc: Rc<RuntimeInternal>) {
67 let mut sections = self_rc
68 .com_hub
69 .block_handler
70 .incoming_sections_queue
71 .borrow_mut();
72 for section in sections.drain(..) {
74 let self_rc = self_rc.clone();
76 spawn_with_panic_notify(async move {
77 let (result, endpoint, context_id) =
78 RuntimeInternal::execute_incoming_section(
79 self_rc.clone(),
80 section,
81 )
82 .await;
83 info!(
84 "Execution result (on {} from {}): {result:?}",
85 self_rc.endpoint, endpoint
86 );
87 let res = RuntimeInternal::send_response_block(
89 self_rc.clone(),
90 result,
91 endpoint,
92 context_id,
93 );
94 });
96 }
97 }
98
99 fn send_response_block(
100 self_rc: Rc<RuntimeInternal>,
101 result: Result<Option<ValueContainer>, ExecutionError>,
102 receiver_endpoint: Endpoint,
103 context_id: OutgoingContextId,
104 ) -> Result<(), Vec<Endpoint>> {
105 let routing_header: RoutingHeader = RoutingHeader::default()
106 .with_sender(self_rc.endpoint.clone())
107 .to_owned();
108
109 let block_header = BlockHeader {
110 context_id,
111 flags_and_timestamp: FlagsAndTimestamp::new()
112 .with_block_type(BlockType::Response)
113 .with_is_end_of_section(true)
114 .with_is_end_of_context(true),
115 ..BlockHeader::default()
116 };
117 let encrypted_header = EncryptedHeader::default();
118
119 info!(
120 "send response, context_id: {context_id:?}, receiver: {receiver_endpoint}"
121 );
122
123 if let Ok(value) = result {
124 let dxb = if let Some(value) = &value {
125 compile_value(value)
126 } else {
127 Ok(vec![])
128 };
129
130 let dxb = dxb.unwrap();
132
133 let mut block = DXBBlock::new(
134 routing_header,
135 block_header,
136 encrypted_header,
137 dxb,
138 );
139 block.set_receivers(std::slice::from_ref(&receiver_endpoint));
140
141 self_rc.com_hub.send_own_block(block)
142 } else {
143 todo!("#233 Handle returning error response block");
144 }
145 }
146}