Skip to main content

datex_core/runtime/
runner.rs

1use crate::{
2    channel::mpsc::create_unbounded_channel,
3    collections::HashMap,
4    global::dxb_block::IncomingSection,
5    network::com_hub::ComHub,
6    prelude::*,
7    runtime::{
8        Runtime, RuntimeConfig, RuntimeInternal, VERSION, memory::Memory,
9    },
10    utils::task_manager::TaskManager,
11    values::core_values::endpoint::Endpoint,
12};
13use async_select::select;
14use core::{cell::RefCell, pin::Pin};
15use futures::channel::oneshot;
16use futures_util::{
17    future,
18    future::{Join, join},
19    join,
20};
21use log::info;
22use crate::time::now_ms;
23
24pub struct RuntimeRunner {
25    pub runtime: Runtime,
26    pub task_future: Pin<Box<dyn Future<Output = ()>>>,
27}
28
29impl RuntimeRunner {
30    /// Creates a new runtime instance with the given configuration and global context.
31    /// Note: If the endpoint is not specified in the config, a random endpoint will be generated.
32    pub fn new(config: RuntimeConfig) -> RuntimeRunner {
33        let endpoint = config.endpoint.clone().unwrap_or_else(Endpoint::random);
34
35        let (task_manager, runtime_task_future) = TaskManager::create();
36
37        let (incoming_sections_sender, incoming_sections_receiver) =
38            create_unbounded_channel::<IncomingSection>();
39
40        let (com_hub, com_hub_task_future) =
41            ComHub::create(endpoint.clone(), incoming_sections_sender);
42        let memory = RefCell::new(Memory::new(endpoint.clone()));
43
44        let runtime = Runtime {
45            version: VERSION.to_string(),
46            internal: Rc::new(RuntimeInternal {
47                endpoint,
48                memory,
49                config,
50                com_hub,
51                task_manager,
52                incoming_sections_receiver: RefCell::new(
53                    incoming_sections_receiver,
54                ),
55                execution_contexts: RefCell::new(HashMap::new()),
56            }),
57        };
58
59        let runtime_internal = runtime.internal.clone();
60
61        // await all task futures
62        let task_future = async {
63            join!(
64                // com hub task manager
65                com_hub_task_future,
66                // runtime task manager
67                runtime_task_future,
68                // runtime incoming sections handler
69                runtime_internal.handle_incoming_sections_task()
70            );
71        };
72
73        RuntimeRunner {
74            runtime,
75            task_future: Box::pin(task_future),
76        }
77    }
78
79    // Starts the runtime, runs the provided app logic, and returns its result.
80    // The runtime will exit when the app logic completes.
81    pub async fn run<AppReturn, AppFuture>(
82        self,
83        app_logic: impl FnOnce(Runtime) -> AppFuture,
84    ) -> AppReturn
85    where
86        AppFuture: Future<Output = AppReturn>,
87    {
88        let (runtime_future, app_future) =
89            self.create_runtime_and_app_future(app_logic);
90
91        // run until the app logic completes
92        select! {
93            _ = runtime_future => {
94                unreachable!("Runtime task future exited unexpectedly");
95            },
96            exit_value = app_future => {
97                exit_value
98            },
99        }
100    }
101
102    /// Starts the runtime and runs indefinitely, executing the provided app logic.
103    pub async fn run_forever<AppReturn, AppFuture>(
104        self,
105        app_logic: impl FnOnce(Runtime) -> AppFuture,
106    ) -> !
107    where
108        AppFuture: Future<Output = AppReturn>,
109    {
110        let (runtime_future, app_future) =
111            self.create_runtime_and_app_future(app_logic);
112
113        // run indefinitely (runtime future should never exit)
114        future::join(runtime_future, app_future).await;
115        unreachable!("Both runtime and app logic futures exited unexpectedly");
116    }
117
118    /// Creates the runtime future and the app future, ensuring that the app logic starts executing only after the runtime has completed its initialization.
119    fn create_runtime_and_app_future<AppReturn, AppFuture>(
120        self,
121        app_logic: impl FnOnce(Runtime) -> AppFuture,
122    ) -> (
123        Join<impl Future<Output = ()>, impl Future<Output = ()>>,
124        impl Future<Output = AppReturn>,
125    )
126    where
127        AppFuture: Future<Output = AppReturn>,
128    {
129        let (init_ready_sender, init_ready_receiver) = oneshot::channel();
130        let runtime = self.runtime.clone();
131        let runtime_future = join(
132            // initialize the runtime
133            async move {
134                runtime.internal.init_async().await;
135                init_ready_sender.send(()).unwrap();
136            },
137            // run tasks
138            self.task_future,
139        );
140
141        // start the app logic
142        let app_future = async move {
143            // wait for runtime initialization to complete before starting app logic
144            init_ready_receiver.await.unwrap();
145
146            info!(
147                "Runtime initialized - Version {VERSION} Time: {}",
148                now_ms()
149            );
150
151            app_logic(self.runtime).await
152        };
153
154        (runtime_future, app_future)
155    }
156}
157
158#[cfg(test)]
159mod tests {
160    use super::*;
161
162    #[tokio::test]
163    async fn test_runtime_runner() {
164        let runner = RuntimeRunner::new(RuntimeConfig::default());
165        let mut runtime = None;
166        runner
167            .run(async |runtime_inner| {
168                runtime = Some(runtime_inner);
169            })
170            .await;
171
172        assert!(runtime.is_some());
173
174        // check if local loopback interface was fully initialized and is present in the com hub
175        let com_hub = runtime.unwrap().com_hub();
176        let interface_map = com_hub.interfaces_manager().interfaces.borrow();
177        let local_loopback_interface =
178            interface_map.iter().find(|(uuid, interface)| {
179                interface.properties.interface_type == "local"
180            });
181        let (local_loopback_interface_uuid, _) = local_loopback_interface
182            .expect("Local loopback interface not found in com hub");
183
184        // check if socket for the local loopback interface is present in the com hub
185        let socket_map = com_hub.socket_manager().sockets.borrow();
186        let local_loopback_socket = socket_map
187            .values()
188            .find(|socket| {
189                &socket.interface_uuid == local_loopback_interface_uuid
190            })
191            .expect("Local loopback socket not found in com hub");
192    }
193}