datex_core/runtime/
runner.rs1use crate::{
2 channel::mpsc::create_unbounded_channel,
3 collections::HashMap,
4 global::dxb_block::IncomingSection,
5 network::com_hub::ComHub,
6 prelude::*,
7 runtime::{
8 Runtime, RuntimeConfig, RuntimeInternal, VERSION, memory::Memory,
9 },
10 utils::task_manager::TaskManager,
11 values::core_values::endpoint::Endpoint,
12};
13use async_select::select;
14use core::{cell::RefCell, pin::Pin};
15use futures::channel::oneshot;
16use futures_util::{
17 future,
18 future::{Join, join},
19 join,
20};
21use log::info;
22use crate::time::now_ms;
23
24pub struct RuntimeRunner {
25 pub runtime: Runtime,
26 pub task_future: Pin<Box<dyn Future<Output = ()>>>,
27}
28
29impl RuntimeRunner {
30 pub fn new(config: RuntimeConfig) -> RuntimeRunner {
33 let endpoint = config.endpoint.clone().unwrap_or_else(Endpoint::random);
34
35 let (task_manager, runtime_task_future) = TaskManager::create();
36
37 let (incoming_sections_sender, incoming_sections_receiver) =
38 create_unbounded_channel::<IncomingSection>();
39
40 let (com_hub, com_hub_task_future) =
41 ComHub::create(endpoint.clone(), incoming_sections_sender);
42 let memory = RefCell::new(Memory::new(endpoint.clone()));
43
44 let runtime = Runtime {
45 version: VERSION.to_string(),
46 internal: Rc::new(RuntimeInternal {
47 endpoint,
48 memory,
49 config,
50 com_hub,
51 task_manager,
52 incoming_sections_receiver: RefCell::new(
53 incoming_sections_receiver,
54 ),
55 execution_contexts: RefCell::new(HashMap::new()),
56 }),
57 };
58
59 let runtime_internal = runtime.internal.clone();
60
61 let task_future = async {
63 join!(
64 com_hub_task_future,
66 runtime_task_future,
68 runtime_internal.handle_incoming_sections_task()
70 );
71 };
72
73 RuntimeRunner {
74 runtime,
75 task_future: Box::pin(task_future),
76 }
77 }
78
79 pub async fn run<AppReturn, AppFuture>(
82 self,
83 app_logic: impl FnOnce(Runtime) -> AppFuture,
84 ) -> AppReturn
85 where
86 AppFuture: Future<Output = AppReturn>,
87 {
88 let (runtime_future, app_future) =
89 self.create_runtime_and_app_future(app_logic);
90
91 select! {
93 _ = runtime_future => {
94 unreachable!("Runtime task future exited unexpectedly");
95 },
96 exit_value = app_future => {
97 exit_value
98 },
99 }
100 }
101
102 pub async fn run_forever<AppReturn, AppFuture>(
104 self,
105 app_logic: impl FnOnce(Runtime) -> AppFuture,
106 ) -> !
107 where
108 AppFuture: Future<Output = AppReturn>,
109 {
110 let (runtime_future, app_future) =
111 self.create_runtime_and_app_future(app_logic);
112
113 future::join(runtime_future, app_future).await;
115 unreachable!("Both runtime and app logic futures exited unexpectedly");
116 }
117
118 fn create_runtime_and_app_future<AppReturn, AppFuture>(
120 self,
121 app_logic: impl FnOnce(Runtime) -> AppFuture,
122 ) -> (
123 Join<impl Future<Output = ()>, impl Future<Output = ()>>,
124 impl Future<Output = AppReturn>,
125 )
126 where
127 AppFuture: Future<Output = AppReturn>,
128 {
129 let (init_ready_sender, init_ready_receiver) = oneshot::channel();
130 let runtime = self.runtime.clone();
131 let runtime_future = join(
132 async move {
134 runtime.internal.init_async().await;
135 init_ready_sender.send(()).unwrap();
136 },
137 self.task_future,
139 );
140
141 let app_future = async move {
143 init_ready_receiver.await.unwrap();
145
146 info!(
147 "Runtime initialized - Version {VERSION} Time: {}",
148 now_ms()
149 );
150
151 app_logic(self.runtime).await
152 };
153
154 (runtime_future, app_future)
155 }
156}
157
#[cfg(test)]
mod tests {
    use super::*;

    /// Runs a default-configured runner, captures the runtime handed to the
    /// app logic, and checks that its com hub holds an interface of type
    /// `"local"` plus a socket attached to that interface.
    #[tokio::test]
    async fn test_runtime_runner() {
        let mut runtime = None;
        RuntimeRunner::new(RuntimeConfig::default())
            .run(async |handle| {
                runtime = Some(handle);
            })
            .await;

        assert!(runtime.is_some());

        let com_hub = runtime.unwrap().com_hub();

        // Locate the interface whose type is "local".
        let interfaces = com_hub.interfaces_manager().interfaces.borrow();
        let (loopback_uuid, _) = interfaces
            .iter()
            .find(|(_, interface)| {
                interface.properties.interface_type == "local"
            })
            .expect("Local loopback interface not found in com hub");

        // A socket referencing that interface must exist as well.
        let sockets = com_hub.socket_manager().sockets.borrow();
        let _loopback_socket = sockets
            .values()
            .find(|socket| &socket.interface_uuid == loopback_uuid)
            .expect("Local loopback socket not found in com hub");
    }
}