workflow_egui/runtime/
runtime.rs

1use super::*;
2use crate::imports::*;
3use workflow_wasm::callback::CallbackMap;
4
5pub use payload::Payload;
6use repaint::RepaintService;
7pub use service::{Service, ServiceResult};
8
/// Shared state behind every [`Runtime`] handle.
pub struct Inner {
    /// Clone of the egui context handed to `Runtime::new`; exposed via `Runtime::egui_ctx`.
    egui_ctx: egui::Context,
    /// Application events channel (caller-supplied, or an unbounded one created in `Runtime::new`).
    events: ApplicationEventsChannel,
    /// Registered services keyed by `Service::name()`; populated by `Runtime::bind`.
    services: RwLock<AHashMap<String, Arc<dyn Service>>>,
    /// Built-in repaint service; it is also bound into `services` by `Runtime::new`.
    repaint_service: Arc<RepaintService>,
    /// Set by `Runtime::start`, cleared by `Runtime::shutdown`.
    is_running: Arc<AtomicBool>,
    /// Captured when the runtime is constructed; basis for `Runtime::uptime`.
    start_time: Instant,
    // Only touched by the wasm32-only visibility handler in this file,
    // hence the lint exemption on other targets.
    #[allow(dead_code)]
    callbacks: CallbackMap,
}
19
/// Handle to the runtime. Cloning only copies the `Arc`, so all clones
/// share the same [`Inner`] state.
#[derive(Clone)]
pub struct Runtime {
    inner: Arc<Inner>,
}
24
25impl Runtime {
26    pub fn new(egui_ctx: &egui::Context, events: Option<ApplicationEventsChannel>) -> Self {
27        let events = events.unwrap_or_else(channel::Channel::unbounded);
28
29        let repaint_service = Arc::new(RepaintService::default());
30
31        let runtime = Self {
32            inner: Arc::new(Inner {
33                services: Default::default(),
34                events,
35                repaint_service: repaint_service.clone(),
36                egui_ctx: egui_ctx.clone(),
37                is_running: Arc::new(AtomicBool::new(false)),
38                start_time: Instant::now(),
39                callbacks: Default::default(),
40            }),
41        };
42        register_global(Some(runtime.clone()));
43
44        runtime.bind(repaint_service);
45
46        #[cfg(target_arch = "wasm32")]
47        runtime.register_visibility_handler();
48
49        runtime
50    }
51
52    pub fn bind(&self, service: Arc<dyn Service>) {
53        self.inner
54            .services
55            .write()
56            .unwrap()
57            .insert(service.name().to_string(), service.clone());
58        let runtime = self.clone();
59        spawn(async move { service.spawn(runtime).await });
60    }
61
62    pub fn uptime(&self) -> Duration {
63        self.inner.start_time.elapsed()
64    }
65
66    pub fn start_services(&self) {
67        let services = self.services();
68        for service in services {
69            let runtime = self.clone();
70            // service.spawn().await?;
71            spawn(async move { service.spawn(runtime).await });
72        }
73    }
74
75    pub fn services(&self) -> Vec<Arc<dyn Service>> {
76        self.inner
77            .services
78            .read()
79            .unwrap()
80            .values()
81            .cloned()
82            .collect()
83    }
84
85    pub fn stop_services(&self) {
86        self.services()
87            .into_iter()
88            .for_each(|service| service.terminate());
89    }
90
91    pub async fn join_services(&self) {
92        // for service in self.services() {
93        //  let name = service.name();
94        //  println!("⚡ {name}");
95        //  service.join().await.expect("service join failure");
96        //  println!("💀 {name}");
97        // }
98
99        let futures = self
100            .services()
101            .into_iter()
102            .map(|service| service.join())
103            .collect::<Vec<_>>();
104        join_all(futures).await;
105    }
106
    /// Clears the process-wide runtime registration.
    ///
    /// NOTE(review): despite the name this is an ordinary inherent method; no
    /// `Drop` impl is visible in this file, so it only runs when called
    /// explicitly.
    pub fn drop(&self) {
        register_global(None);
    }
110
    /// Start the runtime.
    ///
    /// Marks the runtime as running and then spawns all registered services.
    pub fn start(&self) {
        self.inner.is_running.store(true, Ordering::SeqCst);
        self.start_services();
    }
116
117    /// Shutdown runtime runtime.
118    pub async fn shutdown(&self) {
119        if self.inner.is_running.load(Ordering::SeqCst) {
120            self.inner.is_running.store(false, Ordering::SeqCst);
121            self.stop_services();
122            self.join_services().await;
123            register_global(None);
124        }
125    }
126
    /// Returns the built-in repaint service created in `Runtime::new`.
    pub fn repaint_service(&self) -> &Arc<RepaintService> {
        &self.inner.repaint_service
    }
130
    /// Returns a reference to the application events channel used by this runtime.
    pub fn events(&self) -> &ApplicationEventsChannel {
        &self.inner.events
    }
135
136    /// Send an application even to the UI asynchronously.
137    pub async fn send<T>(&self, msg: T) -> Result<()>
138    where
139        T: Any + Send + Sync + 'static,
140    {
141        self.inner.events.send(RuntimeEvent::new(msg)).await?;
142        Ok(())
143    }
144
145    pub async fn send_runtime_event(&self, msg: RuntimeEvent) -> Result<()> {
146        self.inner.events.send(msg).await?;
147        Ok(())
148    }
149
150    /// Send an application event to the UI synchronously.
151    pub fn try_send<T>(&self, msg: T) -> Result<()>
152    where
153        T: Any + Send + Sync + 'static,
154    {
155        self.inner.events.sender.try_send(RuntimeEvent::new(msg))?;
156        Ok(())
157    }
158
159    pub fn try_send_runtime_event(&self, msg: RuntimeEvent) -> Result<()> {
160        self.inner.events.sender.try_send(msg)?;
161        Ok(())
162    }
163
164    pub fn spawn_task<F>(&self, task: F)
165    where
166        F: Future<Output = Result<()>> + Send + 'static,
167    {
168        let sender = self.events().sender.clone();
169        workflow_core::task::spawn(async move {
170            if let Err(err) = task.await {
171                sender
172                    .send(RuntimeEvent::Error(err.to_string()))
173                    .await
174                    .unwrap();
175            }
176        });
177    }
178
179    pub fn spawn_task_with_result<R, F>(
180        &self,
181        payload: &Payload<std::result::Result<R, Error>>,
182        task: F,
183    ) where
184        R: Clone + Send + 'static,
185        F: Future<Output = Result<R>> + Send + 'static,
186    {
187        let payload = (*payload).clone();
188        payload.mark_pending();
189        workflow_core::task::spawn(async move {
190            let result = task.await;
191            match result {
192                Ok(r) => payload.store(Ok(r)),
193                Err(err) => {
194                    payload.store(Err(err));
195                }
196            }
197        });
198    }
199
    /// Returns the egui context stored in this runtime.
    pub fn egui_ctx(&self) -> &egui::Context {
        &self.inner.egui_ctx
    }
203
204    pub fn request_repaint(&self) {
205        self.repaint_service().trigger();
206    }
207
208    #[cfg(target_arch = "wasm32")]
209    pub fn register_visibility_handler(&self) {
210        use workflow_dom::utils::*;
211        use workflow_wasm::callback::*;
212
213        let sender = self.events().sender.clone();
214        let callback = callback!(move || {
215            let visibility_state = document().visibility_state();
216            sender
217                .try_send(RuntimeEvent::VisibilityState(visibility_state))
218                .unwrap();
219            runtime().egui_ctx().request_repaint();
220        });
221
222        document().set_onvisibilitychange(Some(callback.as_ref()));
223        self.inner.callbacks.retain(callback).unwrap();
224    }
225}
226
// Process-wide runtime slot: written by `register_global`, read by
// `runtime()` and `try_runtime()`.
static RUNTIME: Mutex<Option<Runtime>> = Mutex::new(None);
228
229pub fn runtime() -> Runtime {
230    if let Some(runtime) = RUNTIME.lock().unwrap().as_ref() {
231        runtime.clone()
232    } else {
233        panic!("Error: `Runtime` is not initialized")
234    }
235}
236
237pub fn try_runtime() -> Option<Runtime> {
238    RUNTIME.lock().unwrap().clone()
239}
240
241fn register_global(runtime: Option<Runtime>) {
242    match runtime {
243        Some(runtime) => {
244            let mut global = RUNTIME.lock().unwrap();
245            if global.is_some() {
246                panic!("runtime already initialized");
247            }
248            global.replace(runtime);
249        }
250        None => {
251            RUNTIME.lock().unwrap().take();
252        }
253    };
254}
255
/// Spawn an async task through the process-wide [`Runtime`]
/// (see `Runtime::spawn_task`); a task error is reported on the
/// application events channel.
///
/// # Panics
/// Panics if no runtime is registered, because the handle is obtained
/// via `runtime()`.
///
/// NOTE(review): earlier documentation said the task "will result in
/// egui redraw upon its completion"; no repaint request is visible on
/// this path in this file - confirm where the redraw is driven from.
pub fn spawn<F>(task: F)
where
    F: Future<Output = Result<()>> + Send + 'static,
{
    runtime().spawn_task(task);
}
264
/// Spawn an async task through the process-wide [`Runtime`]
/// (see `Runtime::spawn_task_with_result`). Upon
/// the task completion, the supplied [`Payload`]
/// will be populated with the task result.
///
/// # Panics
/// Panics if no runtime is registered, because the handle is obtained
/// via `runtime()`.
///
/// NOTE(review): earlier documentation said the task "will result in
/// egui redraw upon its completion"; no repaint request is visible on
/// this path in this file - confirm where the redraw is driven from.
pub fn spawn_with_result<R, F>(payload: &Payload<std::result::Result<R, Error>>, task: F)
where
    R: Clone + Send + 'static,
    F: Future<Output = Result<R>> + Send + 'static,
{
    runtime().spawn_task_with_result(payload, task);
}
276
/// Gracefully halt the runtime. This is used
/// to shut down kaspad when the kaspa-ng process exit
/// is an inevitable eventuality.
280#[cfg(not(target_arch = "wasm32"))]
281impl Runtime {
282    pub fn halt() {
283        if let Some(runtime) = try_runtime() {
284            runtime.try_send(RuntimeEvent::Exit).ok();
285            // runtime.kaspa_service().clone().terminate();
286
287            let handle = tokio::spawn(async move { runtime.shutdown().await });
288
289            while !handle.is_finished() {
290                std::thread::sleep(std::time::Duration::from_millis(50));
291            }
292        }
293    }
294
295    /// Attempt to halt the runtime runtime but exit the process
296    /// if it takes too long. This is used in attempt to shutdown
297    /// kaspad if the kaspa-ng process panics, which can result
298    /// in a still functioning zombie child process on unix systems.
299    pub fn abort() {
300        const TIMEOUT: u128 = 5000;
301        let flag = Arc::new(AtomicBool::new(false));
302        let flag_ = flag.clone();
303        let thread = std::thread::Builder::new()
304            .name("halt".to_string())
305            .spawn(move || {
306                let start = std::time::Instant::now();
307                while !flag_.load(Ordering::SeqCst) {
308                    if start.elapsed().as_millis() > TIMEOUT {
309                        println!("halting...");
310                        std::process::exit(1);
311                    }
312                    std::thread::sleep(std::time::Duration::from_millis(50));
313                }
314            })
315            .ok();
316
317        Self::halt();
318
319        flag.store(true, Ordering::SeqCst);
320        if let Some(thread) = thread {
321            thread.join().unwrap();
322        }
323
324        #[cfg(feature = "console")]
325        {
326            println!("Press Enter to exit...");
327            let mut input = String::new();
328            let _ = std::io::stdin().read_line(&mut input);
329        }
330
331        std::process::exit(1);
332    }
333}