datex_core/runtime/
update_loop.rs

1use crate::core_compiler::value_compiler::compile_value_container;
2use crate::global::dxb_block::{DXBBlock, IncomingSection, OutgoingContextId};
3use crate::global::protocol_structures::block_header::FlagsAndTimestamp;
4use crate::global::protocol_structures::block_header::{
5    BlockHeader, BlockType,
6};
7use crate::global::protocol_structures::encrypted_header::EncryptedHeader;
8use crate::global::protocol_structures::routing_header::RoutingHeader;
9use crate::runtime::RuntimeInternal;
10use crate::runtime::execution::ExecutionError;
11use crate::stdlib::borrow::ToOwned;
12use crate::stdlib::rc::Rc;
13use crate::stdlib::vec;
14use crate::stdlib::vec::Vec;
15use crate::task::{sleep, spawn_with_panic_notify};
16use crate::values::core_values::endpoint::Endpoint;
17use crate::values::value_container::ValueContainer;
18use core::prelude::rust_2024::*;
19use core::result::Result;
20use core::time::Duration;
21use futures::channel::oneshot;
22use log::info;
23
24#[cfg_attr(feature = "embassy_runtime", embassy_executor::task)]
25async fn handle_incoming_section_task(
26    runtime_rc: Rc<RuntimeInternal>,
27    section: IncomingSection,
28) {
29    let (result, endpoint, context_id) =
30        RuntimeInternal::execute_incoming_section(runtime_rc.clone(), section)
31            .await;
32    info!(
33        "Execution result (on {} from {}): {result:?}",
34        runtime_rc.endpoint, endpoint
35    );
36    // send response back to the sender
37    let res = RuntimeInternal::send_response_block(
38        runtime_rc.clone(),
39        result,
40        endpoint,
41        context_id,
42    )
43    .await;
44    // TODO #231: handle errors in sending response
45}
46
47#[cfg_attr(feature = "embassy_runtime", embassy_executor::task)]
48async fn update_loop_task(runtime_rc: Rc<RuntimeInternal>) {
49    while *runtime_rc.update_loop_running.borrow() {
50        RuntimeInternal::update(runtime_rc.clone()).await;
51        sleep(Duration::from_millis(1)).await;
52    }
53    if let Some(sender) = runtime_rc.update_loop_stop_sender.borrow_mut().take()
54    {
55        sender.send(()).expect("Failed to send stop signal");
56    }
57}
58
59impl RuntimeInternal {
60    /// Starts the
61    pub fn start_update_loop(self_rc: Rc<RuntimeInternal>) {
62        info!("starting runtime update loop...");
63
64        // if already running, do nothing
65        if *self_rc.update_loop_running.borrow() {
66            return;
67        }
68
69        // set update loop running flag
70        *self_rc.update_loop_running.borrow_mut() = true;
71
72        spawn_with_panic_notify(
73            &self_rc.clone().async_context,
74            update_loop_task(self_rc),
75        );
76    }
77
78    /// Stops the update loop for the Runtime, if it is running.
79    pub async fn stop_update_loop(self_rc: Rc<RuntimeInternal>) {
80        info!("Stopping Runtime update loop for {}", self_rc.endpoint);
81        *self_rc.update_loop_running.borrow_mut() = false;
82
83        let (sender, receiver) = oneshot::channel::<()>();
84
85        self_rc.update_loop_stop_sender.borrow_mut().replace(sender);
86
87        receiver.await.unwrap();
88    }
89
90    /// main update loop
91    async fn update(self_rc: Rc<RuntimeInternal>) {
92        // update the ComHub
93        self_rc.com_hub.update().await;
94        // handle incoming sections
95        RuntimeInternal::handle_incoming_sections(self_rc);
96    }
97
98    /// pops incoming sections from the ComHub and executes them in separate tasks
99    fn handle_incoming_sections(self_rc: Rc<RuntimeInternal>) {
100        let mut sections = self_rc
101            .com_hub
102            .block_handler
103            .incoming_sections_queue
104            .borrow_mut();
105        let async_context = &self_rc.async_context;
106        // get incoming sections from ComHub
107        for section in sections.drain(..) {
108            // execute the section in a separate task
109            let self_rc = self_rc.clone();
110            spawn_with_panic_notify(
111                async_context,
112                handle_incoming_section_task(self_rc, section),
113            );
114        }
115    }
116
117    async fn send_response_block(
118        self_rc: Rc<RuntimeInternal>,
119        result: Result<Option<ValueContainer>, ExecutionError>,
120        receiver_endpoint: Endpoint,
121        context_id: OutgoingContextId,
122    ) -> Result<(), Vec<Endpoint>> {
123        let routing_header: RoutingHeader = RoutingHeader::default()
124            .with_sender(self_rc.endpoint.clone())
125            .to_owned();
126        let block_header = BlockHeader {
127            context_id,
128            flags_and_timestamp: FlagsAndTimestamp::new()
129                .with_block_type(BlockType::Response)
130                .with_is_end_of_section(true)
131                .with_is_end_of_context(true),
132            ..BlockHeader::default()
133        };
134        let encrypted_header = EncryptedHeader::default();
135
136        info!(
137            "send response, context_id: {context_id:?}, receiver: {receiver_endpoint}"
138        );
139
140        if let Ok(value) = result {
141            let dxb = if let Some(value) = &value {
142                compile_value_container(value)
143            } else {
144                vec![]
145            };
146
147            let mut block = DXBBlock::new(
148                routing_header,
149                block_header,
150                encrypted_header,
151                dxb,
152            );
153            block.set_receivers(core::slice::from_ref(&receiver_endpoint));
154
155            self_rc.com_hub.send_own_block(block).await
156        } else {
157            core::todo!("#233 Handle returning error response block");
158        }
159    }
160}