datex_core/runtime/
update_loop.rs

1use std::rc::Rc;
2use std::time::Duration;
3use futures::channel::oneshot;
4use log::info;
5use crate::compiler::compile_value;
6use crate::global::dxb_block::{DXBBlock, OutgoingContextId};
7use crate::global::protocol_structures::block_header::{BlockHeader, BlockType};
8use crate::global::protocol_structures::encrypted_header::EncryptedHeader;
9use crate::global::protocol_structures::routing_header;
10use crate::global::protocol_structures::routing_header::RoutingHeader;
11use crate::runtime::{RuntimeInternal};
12use crate::runtime::execution::ExecutionError;
13use crate::task::{sleep, spawn_with_panic_notify};
14use crate::values::core_values::endpoint::Endpoint;
15use crate::values::value_container::ValueContainer;
16use crate::global::protocol_structures::block_header::{
17    FlagsAndTimestamp,
18};
19
20impl RuntimeInternal {
21    /// Starts the
22    pub fn start_update_loop(self_rc: Rc<RuntimeInternal>) {
23        info!("starting runtime update loop...");
24
25        // if already running, do nothing
26        if *self_rc.update_loop_running.borrow() {
27            return;
28        }
29
30        // set update loop running flag
31        *self_rc.update_loop_running.borrow_mut() = true;
32
33        spawn_with_panic_notify(async move {
34            while *self_rc.update_loop_running.borrow() {
35                RuntimeInternal::update(self_rc.clone()).await;
36                sleep(Duration::from_millis(1)).await;
37            }
38            if let Some(sender) =
39                self_rc.update_loop_stop_sender.borrow_mut().take()
40            {
41                sender.send(()).expect("Failed to send stop signal");
42            }
43        });
44    }
45
46    /// Stops the update loop for the Runtime, if it is running.
47    pub async fn stop_update_loop(self_rc: Rc<RuntimeInternal>) {
48        info!("Stopping Runtime update loop for {}", self_rc.endpoint);
49        *self_rc.update_loop_running.borrow_mut() = false;
50
51        let (sender, receiver) = oneshot::channel::<()>();
52
53        self_rc.update_loop_stop_sender.borrow_mut().replace(sender);
54
55        receiver.await.unwrap();
56    }
57
58    /// main update loop
59    async fn update(self_rc: Rc<RuntimeInternal>) {
60        // update the ComHub
61        self_rc.com_hub.update();
62        // handle incoming sections
63        RuntimeInternal::handle_incoming_sections(self_rc);
64    }
65
66    /// pops incoming sections from the ComHub and executes them in separate tasks
67    fn handle_incoming_sections(self_rc: Rc<RuntimeInternal>) {
68        let mut sections = self_rc.com_hub.block_handler.incoming_sections_queue.borrow_mut();
69        // get incoming sections from ComHub
70        for section in sections.drain(..) {
71            // execute the section in a separate task
72            let self_rc = self_rc.clone();
73            spawn_with_panic_notify(async move {
74                let (result, endpoint, context_id) = RuntimeInternal::execute_incoming_section(self_rc.clone(), section).await;
75                info!("Execution result (on {} from {}): {result:?}", self_rc.endpoint, endpoint);
76                // send response back to the sender
77                let res = RuntimeInternal::send_response_block(
78                    self_rc.clone(),
79                    result,
80                    endpoint,
81                    context_id,
82                );
83                // TODO #231: handle errors in sending response
84            });
85        }
86    }
87
88    fn send_response_block(
89        self_rc: Rc<RuntimeInternal>,
90        result: Result<Option<ValueContainer>, ExecutionError>,
91        receiver_endpoint: Endpoint,
92        context_id: OutgoingContextId,
93    ) -> Result<(), Vec<Endpoint>> {
94        let routing_header: RoutingHeader = RoutingHeader {
95            version: 2,
96            flags: routing_header::Flags::new(),
97            block_size_u16: Some(0),
98            block_size_u32: None,
99            sender: self_rc.endpoint.clone(),
100            receivers: routing_header::Receivers {
101                flags: routing_header::ReceiverFlags::new()
102                    .with_has_endpoints(false)
103                    .with_has_pointer_id(false)
104                    .with_has_endpoint_keys(false),
105                pointer_id: None,
106                endpoints: None,
107                endpoints_with_keys: None,
108            },
109            ..RoutingHeader::default()
110        };
111
112        let block_header = BlockHeader {
113            context_id,
114            flags_and_timestamp: FlagsAndTimestamp::new()
115                .with_block_type(BlockType::Response)
116                .with_is_end_of_section(true)
117                .with_is_end_of_context(true),
118            ..BlockHeader::default()
119        };
120        let encrypted_header = EncryptedHeader::default();
121
122        info!("send response, context_id: {context_id:?}, receiver: {receiver_endpoint}");
123
124        if let Ok(value) = result {
125
126            let dxb = if let Some(value) = &value {
127                compile_value(value)
128            } else {
129                Ok(vec![])
130            };
131
132            // TODO #232: handle compiler error here
133            let dxb = dxb.unwrap();
134
135            let mut block =
136                DXBBlock::new(routing_header, block_header, encrypted_header, dxb);
137            block.set_receivers(std::slice::from_ref(&receiver_endpoint));
138
139            self_rc.com_hub.send_own_block(block)
140        }
141        else {
142            todo!("#233 Handle returning error response block");
143        }
144    }
145}