// datex_core/runtime/runner.rs

1use crate::{
2    channel::mpsc::create_unbounded_channel,
3    global::dxb_block::IncomingSection,
4    network::com_hub::ComHub,
5    prelude::*,
6    runtime::{
7        Runtime, RuntimeConfig, RuntimeInternal, VERSION, memory::Memory,
8    },
9    time::now_ms,
10    utils::task_manager::TaskManager,
11    values::core_values::endpoint::Endpoint,
12};
13use async_select::select;
14use core::{cell::RefCell, pin::Pin};
15use futures::channel::oneshot;
16use futures_util::{
17    future,
18    future::{Join, join},
19    join,
20};
21use log::info;
22
23pub struct RuntimeRunner {
24    pub runtime: Runtime,
25    pub task_future: Pin<Box<dyn Future<Output = ()>>>,
26}
27
28impl RuntimeRunner {
29    /// Creates a new runtime instance with the given configuration and global context.
30    /// Note: If the endpoint is not specified in the config, a random endpoint will be generated.
31    pub fn new(config: RuntimeConfig) -> RuntimeRunner {
32        let endpoint = config.endpoint.clone().unwrap_or_else(Endpoint::random);
33
34        let (task_manager, runtime_task_future) = TaskManager::create();
35
36        let (incoming_sections_sender, incoming_sections_receiver) =
37            create_unbounded_channel::<IncomingSection>();
38
39        let (com_hub, com_hub_task_future) =
40            ComHub::create(endpoint.clone(), incoming_sections_sender);
41        let memory = RefCell::new(Memory::new(endpoint.clone()));
42
43        let runtime = Runtime::new(RuntimeInternal::new(
44            endpoint,
45            memory,
46            config,
47            com_hub,
48            task_manager,
49            incoming_sections_receiver,
50        ));
51
52        let runtime_internal = runtime.internal.clone();
53
54        // await all task futures
55        let task_future = async {
56            join!(
57                // com hub task manager
58                com_hub_task_future,
59                // runtime task manager
60                runtime_task_future,
61                // runtime incoming sections handler
62                runtime_internal.handle_incoming_sections_task()
63            );
64        };
65
66        RuntimeRunner {
67            runtime,
68            task_future: Box::pin(task_future),
69        }
70    }
71
72    // Starts the runtime, runs the provided app logic, and returns its result.
73    // The runtime will exit when the app logic completes.
74    pub async fn run<AppReturn, AppFuture>(
75        self,
76        app_logic: impl FnOnce(Runtime) -> AppFuture,
77    ) -> AppReturn
78    where
79        AppFuture: Future<Output = AppReturn>,
80    {
81        let (runtime_future, app_future) =
82            self.create_runtime_and_app_future(app_logic);
83
84        // run until the app logic completes
85        select! {
86            _ = runtime_future => {
87                unreachable!("Runtime task future exited unexpectedly");
88            },
89            exit_value = app_future => {
90                exit_value
91            },
92        }
93    }
94
95    /// Starts the runtime and runs indefinitely, executing the provided app logic.
96    pub async fn run_forever<AppReturn, AppFuture>(
97        self,
98        app_logic: impl FnOnce(Runtime) -> AppFuture,
99    ) -> !
100    where
101        AppFuture: Future<Output = AppReturn>,
102    {
103        let (runtime_future, app_future) =
104            self.create_runtime_and_app_future(app_logic);
105
106        // run indefinitely (runtime future should never exit)
107        future::join(runtime_future, app_future).await;
108        unreachable!("Both runtime and app logic futures exited unexpectedly");
109    }
110
111    /// Creates the runtime future and the app future, ensuring that the app logic starts executing only after the runtime has completed its initialization.
112    fn create_runtime_and_app_future<AppReturn, AppFuture>(
113        self,
114        app_logic: impl FnOnce(Runtime) -> AppFuture,
115    ) -> (
116        Join<impl Future<Output = ()>, impl Future<Output = ()>>,
117        impl Future<Output = AppReturn>,
118    )
119    where
120        AppFuture: Future<Output = AppReturn>,
121    {
122        let (init_ready_sender, init_ready_receiver) = oneshot::channel();
123        let runtime = self.runtime.clone();
124        let runtime_future = join(
125            // initialize the runtime
126            async move {
127                runtime.internal.init_async().await;
128                init_ready_sender.send(()).unwrap();
129            },
130            // run tasks
131            self.task_future,
132        );
133
134        // start the app logic
135        let app_future = async move {
136            // wait for runtime initialization to complete before starting app logic
137            init_ready_receiver.await.unwrap();
138
139            info!("Runtime initialized - Version {VERSION} Time: {}", now_ms());
140
141            app_logic(self.runtime).await
142        };
143
144        (runtime_future, app_future)
145    }
146}
147
#[cfg(test)]
mod tests {
    use super::*;

    /// Runs a runner built from the default config and checks that the app logic
    /// receives the runtime, and that the com hub contains an interface of type
    /// `"local"` together with a socket registered for that interface.
    #[tokio::test]
    async fn test_runtime_runner() {
        let runner = RuntimeRunner::new(RuntimeConfig::default());
        let mut runtime = None;
        runner
            .run(async |runtime_inner| {
                runtime = Some(runtime_inner);
            })
            .await;

        let runtime = runtime.expect("App logic did not receive the runtime");

        // check if local loopback interface was fully initialized and is present in the com hub
        let com_hub = runtime.com_hub();
        let interface_map = com_hub.interfaces_manager().interfaces.borrow();
        let (local_loopback_interface_uuid, _) = interface_map
            .iter()
            .find(|(_, interface)| {
                interface.properties.interface_type == "local"
            })
            .expect("Local loopback interface not found in com hub");

        // check if socket for the local loopback interface is present in the com hub
        let socket_map = com_hub.socket_manager().sockets.borrow();
        assert!(
            socket_map.values().any(|socket| {
                &socket.interface_uuid == local_loopback_interface_uuid
            }),
            "Local loopback socket not found in com hub"
        );
    }
}