datex_core/runtime/
runner.rs1use crate::{
2 channel::mpsc::create_unbounded_channel,
3 global::dxb_block::IncomingSection,
4 network::com_hub::ComHub,
5 prelude::*,
6 runtime::{
7 Runtime, RuntimeConfig, RuntimeInternal, VERSION, memory::Memory,
8 },
9 time::now_ms,
10 utils::task_manager::TaskManager,
11 values::core_values::endpoint::Endpoint,
12};
13use async_select::select;
14use core::{cell::RefCell, pin::Pin};
15use futures::channel::oneshot;
16use futures_util::{
17 future,
18 future::{Join, join},
19 join,
20};
21use log::info;
22
23pub struct RuntimeRunner {
24 pub runtime: Runtime,
25 pub task_future: Pin<Box<dyn Future<Output = ()>>>,
26}
27
28impl RuntimeRunner {
29 pub fn new(config: RuntimeConfig) -> RuntimeRunner {
32 let endpoint = config.endpoint.clone().unwrap_or_else(Endpoint::random);
33
34 let (task_manager, runtime_task_future) = TaskManager::create();
35
36 let (incoming_sections_sender, incoming_sections_receiver) =
37 create_unbounded_channel::<IncomingSection>();
38
39 let (com_hub, com_hub_task_future) =
40 ComHub::create(endpoint.clone(), incoming_sections_sender);
41 let memory = RefCell::new(Memory::new(endpoint.clone()));
42
43 let runtime = Runtime::new(RuntimeInternal::new(
44 endpoint,
45 memory,
46 config,
47 com_hub,
48 task_manager,
49 incoming_sections_receiver,
50 ));
51
52 let runtime_internal = runtime.internal.clone();
53
54 let task_future = async {
56 join!(
57 com_hub_task_future,
59 runtime_task_future,
61 runtime_internal.handle_incoming_sections_task()
63 );
64 };
65
66 RuntimeRunner {
67 runtime,
68 task_future: Box::pin(task_future),
69 }
70 }
71
72 pub async fn run<AppReturn, AppFuture>(
75 self,
76 app_logic: impl FnOnce(Runtime) -> AppFuture,
77 ) -> AppReturn
78 where
79 AppFuture: Future<Output = AppReturn>,
80 {
81 let (runtime_future, app_future) =
82 self.create_runtime_and_app_future(app_logic);
83
84 select! {
86 _ = runtime_future => {
87 unreachable!("Runtime task future exited unexpectedly");
88 },
89 exit_value = app_future => {
90 exit_value
91 },
92 }
93 }
94
95 pub async fn run_forever<AppReturn, AppFuture>(
97 self,
98 app_logic: impl FnOnce(Runtime) -> AppFuture,
99 ) -> !
100 where
101 AppFuture: Future<Output = AppReturn>,
102 {
103 let (runtime_future, app_future) =
104 self.create_runtime_and_app_future(app_logic);
105
106 future::join(runtime_future, app_future).await;
108 unreachable!("Both runtime and app logic futures exited unexpectedly");
109 }
110
111 fn create_runtime_and_app_future<AppReturn, AppFuture>(
113 self,
114 app_logic: impl FnOnce(Runtime) -> AppFuture,
115 ) -> (
116 Join<impl Future<Output = ()>, impl Future<Output = ()>>,
117 impl Future<Output = AppReturn>,
118 )
119 where
120 AppFuture: Future<Output = AppReturn>,
121 {
122 let (init_ready_sender, init_ready_receiver) = oneshot::channel();
123 let runtime = self.runtime.clone();
124 let runtime_future = join(
125 async move {
127 runtime.internal.init_async().await;
128 init_ready_sender.send(()).unwrap();
129 },
130 self.task_future,
132 );
133
134 let app_future = async move {
136 init_ready_receiver.await.unwrap();
138
139 info!("Runtime initialized - Version {VERSION} Time: {}", now_ms());
140
141 app_logic(self.runtime).await
142 };
143
144 (runtime_future, app_future)
145 }
146}
147
148#[cfg(test)]
149mod tests {
150 use super::*;
151
152 #[tokio::test]
153 async fn test_runtime_runner() {
154 let runner = RuntimeRunner::new(RuntimeConfig::default());
155 let mut runtime = None;
156 runner
157 .run(async |runtime_inner| {
158 runtime = Some(runtime_inner);
159 })
160 .await;
161
162 assert!(runtime.is_some());
163
164 let com_hub = runtime.unwrap().com_hub();
166 let interface_map = com_hub.interfaces_manager().interfaces.borrow();
167 let local_loopback_interface =
168 interface_map.iter().find(|(uuid, interface)| {
169 interface.properties.interface_type == "local"
170 });
171 let (local_loopback_interface_uuid, _) = local_loopback_interface
172 .expect("Local loopback interface not found in com hub");
173
174 let socket_map = com_hub.socket_manager().sockets.borrow();
176 let local_loopback_socket = socket_map
177 .values()
178 .find(|socket| {
179 &socket.interface_uuid == local_loopback_interface_uuid
180 })
181 .expect("Local loopback socket not found in com hub");
182 }
183}