workflow_egui/runtime/
runtime.rs1use super::*;
2use crate::imports::*;
3use workflow_wasm::callback::CallbackMap;
4
5pub use payload::Payload;
6use repaint::RepaintService;
7pub use service::{Service, ServiceResult};
8
9pub struct Inner {
10 egui_ctx: egui::Context,
11 events: ApplicationEventsChannel,
12 services: RwLock<AHashMap<String, Arc<dyn Service>>>,
13 repaint_service: Arc<RepaintService>,
14 is_running: Arc<AtomicBool>,
15 start_time: Instant,
16 #[allow(dead_code)]
17 callbacks: CallbackMap,
18}
19
20#[derive(Clone)]
21pub struct Runtime {
22 inner: Arc<Inner>,
23}
24
25impl Runtime {
26 pub fn new(egui_ctx: &egui::Context, events: Option<ApplicationEventsChannel>) -> Self {
27 let events = events.unwrap_or_else(channel::Channel::unbounded);
28
29 let repaint_service = Arc::new(RepaintService::default());
30
31 let runtime = Self {
32 inner: Arc::new(Inner {
33 services: Default::default(),
34 events,
35 repaint_service: repaint_service.clone(),
36 egui_ctx: egui_ctx.clone(),
37 is_running: Arc::new(AtomicBool::new(false)),
38 start_time: Instant::now(),
39 callbacks: Default::default(),
40 }),
41 };
42 register_global(Some(runtime.clone()));
43
44 runtime.bind(repaint_service);
45
46 #[cfg(target_arch = "wasm32")]
47 runtime.register_visibility_handler();
48
49 runtime
50 }
51
52 pub fn bind(&self, service: Arc<dyn Service>) {
53 self.inner
54 .services
55 .write()
56 .unwrap()
57 .insert(service.name().to_string(), service.clone());
58 let runtime = self.clone();
59 spawn(async move { service.spawn(runtime).await });
60 }
61
62 pub fn uptime(&self) -> Duration {
63 self.inner.start_time.elapsed()
64 }
65
66 pub fn start_services(&self) {
67 let services = self.services();
68 for service in services {
69 let runtime = self.clone();
70 spawn(async move { service.spawn(runtime).await });
72 }
73 }
74
75 pub fn services(&self) -> Vec<Arc<dyn Service>> {
76 self.inner
77 .services
78 .read()
79 .unwrap()
80 .values()
81 .cloned()
82 .collect()
83 }
84
85 pub fn stop_services(&self) {
86 self.services()
87 .into_iter()
88 .for_each(|service| service.terminate());
89 }
90
91 pub async fn join_services(&self) {
92 let futures = self
100 .services()
101 .into_iter()
102 .map(|service| service.join())
103 .collect::<Vec<_>>();
104 join_all(futures).await;
105 }
106
107 pub fn drop(&self) {
108 register_global(None);
109 }
110
111 pub fn start(&self) {
113 self.inner.is_running.store(true, Ordering::SeqCst);
114 self.start_services();
115 }
116
117 pub async fn shutdown(&self) {
119 if self.inner.is_running.load(Ordering::SeqCst) {
120 self.inner.is_running.store(false, Ordering::SeqCst);
121 self.stop_services();
122 self.join_services().await;
123 register_global(None);
124 }
125 }
126
127 pub fn repaint_service(&self) -> &Arc<RepaintService> {
128 &self.inner.repaint_service
129 }
130
131 pub fn events(&self) -> &ApplicationEventsChannel {
133 &self.inner.events
134 }
135
136 pub async fn send<T>(&self, msg: T) -> Result<()>
138 where
139 T: Any + Send + Sync + 'static,
140 {
141 self.inner.events.send(RuntimeEvent::new(msg)).await?;
142 Ok(())
143 }
144
145 pub async fn send_runtime_event(&self, msg: RuntimeEvent) -> Result<()> {
146 self.inner.events.send(msg).await?;
147 Ok(())
148 }
149
150 pub fn try_send<T>(&self, msg: T) -> Result<()>
152 where
153 T: Any + Send + Sync + 'static,
154 {
155 self.inner.events.sender.try_send(RuntimeEvent::new(msg))?;
156 Ok(())
157 }
158
159 pub fn try_send_runtime_event(&self, msg: RuntimeEvent) -> Result<()> {
160 self.inner.events.sender.try_send(msg)?;
161 Ok(())
162 }
163
164 pub fn spawn_task<F>(&self, task: F)
165 where
166 F: Future<Output = Result<()>> + Send + 'static,
167 {
168 let sender = self.events().sender.clone();
169 workflow_core::task::spawn(async move {
170 if let Err(err) = task.await {
171 sender
172 .send(RuntimeEvent::Error(err.to_string()))
173 .await
174 .unwrap();
175 }
176 });
177 }
178
179 pub fn spawn_task_with_result<R, F>(
180 &self,
181 payload: &Payload<std::result::Result<R, Error>>,
182 task: F,
183 ) where
184 R: Clone + Send + 'static,
185 F: Future<Output = Result<R>> + Send + 'static,
186 {
187 let payload = (*payload).clone();
188 payload.mark_pending();
189 workflow_core::task::spawn(async move {
190 let result = task.await;
191 match result {
192 Ok(r) => payload.store(Ok(r)),
193 Err(err) => {
194 payload.store(Err(err));
195 }
196 }
197 });
198 }
199
200 pub fn egui_ctx(&self) -> &egui::Context {
201 &self.inner.egui_ctx
202 }
203
204 pub fn request_repaint(&self) {
205 self.repaint_service().trigger();
206 }
207
208 #[cfg(target_arch = "wasm32")]
209 pub fn register_visibility_handler(&self) {
210 use workflow_dom::utils::*;
211 use workflow_wasm::callback::*;
212
213 let sender = self.events().sender.clone();
214 let callback = callback!(move || {
215 let visibility_state = document().visibility_state();
216 sender
217 .try_send(RuntimeEvent::VisibilityState(visibility_state))
218 .unwrap();
219 runtime().egui_ctx().request_repaint();
220 });
221
222 document().set_onvisibilitychange(Some(callback.as_ref()));
223 self.inner.callbacks.retain(callback).unwrap();
224 }
225}
226
227static RUNTIME: Mutex<Option<Runtime>> = Mutex::new(None);
228
229pub fn runtime() -> Runtime {
230 if let Some(runtime) = RUNTIME.lock().unwrap().as_ref() {
231 runtime.clone()
232 } else {
233 panic!("Error: `Runtime` is not initialized")
234 }
235}
236
237pub fn try_runtime() -> Option<Runtime> {
238 RUNTIME.lock().unwrap().clone()
239}
240
241fn register_global(runtime: Option<Runtime>) {
242 match runtime {
243 Some(runtime) => {
244 let mut global = RUNTIME.lock().unwrap();
245 if global.is_some() {
246 panic!("runtime already initialized");
247 }
248 global.replace(runtime);
249 }
250 None => {
251 RUNTIME.lock().unwrap().take();
252 }
253 };
254}
255
256pub fn spawn<F>(task: F)
259where
260 F: Future<Output = Result<()>> + Send + 'static,
261{
262 runtime().spawn_task(task);
263}
264
265pub fn spawn_with_result<R, F>(payload: &Payload<std::result::Result<R, Error>>, task: F)
270where
271 R: Clone + Send + 'static,
272 F: Future<Output = Result<R>> + Send + 'static,
273{
274 runtime().spawn_task_with_result(payload, task);
275}
276
277#[cfg(not(target_arch = "wasm32"))]
281impl Runtime {
282 pub fn halt() {
283 if let Some(runtime) = try_runtime() {
284 runtime.try_send(RuntimeEvent::Exit).ok();
285 let handle = tokio::spawn(async move { runtime.shutdown().await });
288
289 while !handle.is_finished() {
290 std::thread::sleep(std::time::Duration::from_millis(50));
291 }
292 }
293 }
294
295 pub fn abort() {
300 const TIMEOUT: u128 = 5000;
301 let flag = Arc::new(AtomicBool::new(false));
302 let flag_ = flag.clone();
303 let thread = std::thread::Builder::new()
304 .name("halt".to_string())
305 .spawn(move || {
306 let start = std::time::Instant::now();
307 while !flag_.load(Ordering::SeqCst) {
308 if start.elapsed().as_millis() > TIMEOUT {
309 println!("halting...");
310 std::process::exit(1);
311 }
312 std::thread::sleep(std::time::Duration::from_millis(50));
313 }
314 })
315 .ok();
316
317 Self::halt();
318
319 flag.store(true, Ordering::SeqCst);
320 if let Some(thread) = thread {
321 thread.join().unwrap();
322 }
323
324 #[cfg(feature = "console")]
325 {
326 println!("Press Enter to exit...");
327 let mut input = String::new();
328 let _ = std::io::stdin().read_line(&mut input);
329 }
330
331 std::process::exit(1);
332 }
333}