datex_core/runtime/
update_loop.rs

1use crate::compiler::compile_value;
2use crate::global::dxb_block::{DXBBlock, OutgoingContextId};
3use crate::global::protocol_structures::block_header::FlagsAndTimestamp;
4use crate::global::protocol_structures::block_header::{
5    BlockHeader, BlockType,
6};
7use crate::global::protocol_structures::encrypted_header::EncryptedHeader;
8use crate::global::protocol_structures::routing_header::RoutingHeader;
9use crate::runtime::RuntimeInternal;
10use crate::runtime::execution::ExecutionError;
11use crate::task::{sleep, spawn_with_panic_notify};
12use crate::values::core_values::endpoint::Endpoint;
13use crate::values::value_container::ValueContainer;
14use futures::channel::oneshot;
15use log::info;
16use std::rc::Rc;
17use std::time::Duration;
18
19impl RuntimeInternal {
20    /// Starts the
21    pub fn start_update_loop(self_rc: Rc<RuntimeInternal>) {
22        info!("starting runtime update loop...");
23
24        // if already running, do nothing
25        if *self_rc.update_loop_running.borrow() {
26            return;
27        }
28
29        // set update loop running flag
30        *self_rc.update_loop_running.borrow_mut() = true;
31
32        spawn_with_panic_notify(async move {
33            while *self_rc.update_loop_running.borrow() {
34                RuntimeInternal::update(self_rc.clone()).await;
35                sleep(Duration::from_millis(1)).await;
36            }
37            if let Some(sender) =
38                self_rc.update_loop_stop_sender.borrow_mut().take()
39            {
40                sender.send(()).expect("Failed to send stop signal");
41            }
42        });
43    }
44
45    /// Stops the update loop for the Runtime, if it is running.
46    pub async fn stop_update_loop(self_rc: Rc<RuntimeInternal>) {
47        info!("Stopping Runtime update loop for {}", self_rc.endpoint);
48        *self_rc.update_loop_running.borrow_mut() = false;
49
50        let (sender, receiver) = oneshot::channel::<()>();
51
52        self_rc.update_loop_stop_sender.borrow_mut().replace(sender);
53
54        receiver.await.unwrap();
55    }
56
57    /// main update loop
58    async fn update(self_rc: Rc<RuntimeInternal>) {
59        // update the ComHub
60        self_rc.com_hub.update();
61        // handle incoming sections
62        RuntimeInternal::handle_incoming_sections(self_rc);
63    }
64
65    /// pops incoming sections from the ComHub and executes them in separate tasks
66    fn handle_incoming_sections(self_rc: Rc<RuntimeInternal>) {
67        let mut sections = self_rc
68            .com_hub
69            .block_handler
70            .incoming_sections_queue
71            .borrow_mut();
72        // get incoming sections from ComHub
73        for section in sections.drain(..) {
74            // execute the section in a separate task
75            let self_rc = self_rc.clone();
76            spawn_with_panic_notify(async move {
77                let (result, endpoint, context_id) =
78                    RuntimeInternal::execute_incoming_section(
79                        self_rc.clone(),
80                        section,
81                    )
82                    .await;
83                info!(
84                    "Execution result (on {} from {}): {result:?}",
85                    self_rc.endpoint, endpoint
86                );
87                // send response back to the sender
88                let res = RuntimeInternal::send_response_block(
89                    self_rc.clone(),
90                    result,
91                    endpoint,
92                    context_id,
93                );
94                // TODO #231: handle errors in sending response
95            });
96        }
97    }
98
99    fn send_response_block(
100        self_rc: Rc<RuntimeInternal>,
101        result: Result<Option<ValueContainer>, ExecutionError>,
102        receiver_endpoint: Endpoint,
103        context_id: OutgoingContextId,
104    ) -> Result<(), Vec<Endpoint>> {
105        let routing_header: RoutingHeader = RoutingHeader::default()
106            .with_sender(self_rc.endpoint.clone())
107            .to_owned();
108
109        let block_header = BlockHeader {
110            context_id,
111            flags_and_timestamp: FlagsAndTimestamp::new()
112                .with_block_type(BlockType::Response)
113                .with_is_end_of_section(true)
114                .with_is_end_of_context(true),
115            ..BlockHeader::default()
116        };
117        let encrypted_header = EncryptedHeader::default();
118
119        info!(
120            "send response, context_id: {context_id:?}, receiver: {receiver_endpoint}"
121        );
122
123        if let Ok(value) = result {
124            let dxb = if let Some(value) = &value {
125                compile_value(value)
126            } else {
127                Ok(vec![])
128            };
129
130            // TODO #232: handle compiler error here
131            let dxb = dxb.unwrap();
132
133            let mut block = DXBBlock::new(
134                routing_header,
135                block_header,
136                encrypted_header,
137                dxb,
138            );
139            block.set_receivers(std::slice::from_ref(&receiver_endpoint));
140
141            self_rc.com_hub.send_own_block(block)
142        } else {
143            todo!("#233 Handle returning error response block");
144        }
145    }
146}