datex_core/runtime/
update_loop.rs1use std::rc::Rc;
2use std::time::Duration;
3use futures::channel::oneshot;
4use log::info;
5use crate::compiler::compile_value;
6use crate::global::dxb_block::{DXBBlock, OutgoingContextId};
7use crate::global::protocol_structures::block_header::{BlockHeader, BlockType};
8use crate::global::protocol_structures::encrypted_header::EncryptedHeader;
9use crate::global::protocol_structures::routing_header;
10use crate::global::protocol_structures::routing_header::RoutingHeader;
11use crate::runtime::{RuntimeInternal};
12use crate::runtime::execution::ExecutionError;
13use crate::task::{sleep, spawn_with_panic_notify};
14use crate::values::core_values::endpoint::Endpoint;
15use crate::values::value_container::ValueContainer;
16use crate::global::protocol_structures::block_header::{
17 FlagsAndTimestamp,
18};
19
20impl RuntimeInternal {
21 pub fn start_update_loop(self_rc: Rc<RuntimeInternal>) {
23 info!("starting runtime update loop...");
24
25 if *self_rc.update_loop_running.borrow() {
27 return;
28 }
29
30 *self_rc.update_loop_running.borrow_mut() = true;
32
33 spawn_with_panic_notify(async move {
34 while *self_rc.update_loop_running.borrow() {
35 RuntimeInternal::update(self_rc.clone()).await;
36 sleep(Duration::from_millis(1)).await;
37 }
38 if let Some(sender) =
39 self_rc.update_loop_stop_sender.borrow_mut().take()
40 {
41 sender.send(()).expect("Failed to send stop signal");
42 }
43 });
44 }
45
46 pub async fn stop_update_loop(self_rc: Rc<RuntimeInternal>) {
48 info!("Stopping Runtime update loop for {}", self_rc.endpoint);
49 *self_rc.update_loop_running.borrow_mut() = false;
50
51 let (sender, receiver) = oneshot::channel::<()>();
52
53 self_rc.update_loop_stop_sender.borrow_mut().replace(sender);
54
55 receiver.await.unwrap();
56 }
57
58 async fn update(self_rc: Rc<RuntimeInternal>) {
60 self_rc.com_hub.update();
62 RuntimeInternal::handle_incoming_sections(self_rc);
64 }
65
66 fn handle_incoming_sections(self_rc: Rc<RuntimeInternal>) {
68 let mut sections = self_rc.com_hub.block_handler.incoming_sections_queue.borrow_mut();
69 for section in sections.drain(..) {
71 let self_rc = self_rc.clone();
73 spawn_with_panic_notify(async move {
74 let (result, endpoint, context_id) = RuntimeInternal::execute_incoming_section(self_rc.clone(), section).await;
75 info!("Execution result (on {} from {}): {result:?}", self_rc.endpoint, endpoint);
76 let res = RuntimeInternal::send_response_block(
78 self_rc.clone(),
79 result,
80 endpoint,
81 context_id,
82 );
83 });
85 }
86 }
87
88 fn send_response_block(
89 self_rc: Rc<RuntimeInternal>,
90 result: Result<Option<ValueContainer>, ExecutionError>,
91 receiver_endpoint: Endpoint,
92 context_id: OutgoingContextId,
93 ) -> Result<(), Vec<Endpoint>> {
94 let routing_header: RoutingHeader = RoutingHeader {
95 version: 2,
96 flags: routing_header::Flags::new(),
97 block_size_u16: Some(0),
98 block_size_u32: None,
99 sender: self_rc.endpoint.clone(),
100 receivers: routing_header::Receivers {
101 flags: routing_header::ReceiverFlags::new()
102 .with_has_endpoints(false)
103 .with_has_pointer_id(false)
104 .with_has_endpoint_keys(false),
105 pointer_id: None,
106 endpoints: None,
107 endpoints_with_keys: None,
108 },
109 ..RoutingHeader::default()
110 };
111
112 let block_header = BlockHeader {
113 context_id,
114 flags_and_timestamp: FlagsAndTimestamp::new()
115 .with_block_type(BlockType::Response)
116 .with_is_end_of_section(true)
117 .with_is_end_of_context(true),
118 ..BlockHeader::default()
119 };
120 let encrypted_header = EncryptedHeader::default();
121
122 info!("send response, context_id: {context_id:?}, receiver: {receiver_endpoint}");
123
124 if let Ok(value) = result {
125
126 let dxb = if let Some(value) = &value {
127 compile_value(value)
128 } else {
129 Ok(vec![])
130 };
131
132 let dxb = dxb.unwrap();
134
135 let mut block =
136 DXBBlock::new(routing_header, block_header, encrypted_header, dxb);
137 block.set_receivers(std::slice::from_ref(&receiver_endpoint));
138
139 self_rc.com_hub.send_own_block(block)
140 }
141 else {
142 todo!("#233 Handle returning error response block");
143 }
144 }
145}