kaspa_core/task/
runtime.rs

1use crate::{signals::Shutdown, task::service::AsyncServiceResult};
2use futures_util::future::{select_all, try_join_all};
3use kaspa_core::core::Core;
4use kaspa_core::service::Service;
5use kaspa_core::task::service::AsyncService;
6use kaspa_core::trace;
7use std::{
8    sync::{Arc, Mutex},
9    thread::{self, JoinHandle as ThreadJoinHandle},
10};
11use tokio::task::JoinHandle as TaskJoinHandle;
12
13/// AsyncRuntime registers async services and provides
14/// a tokio Runtime to run them.
15pub struct AsyncRuntime {
16    threads: usize,
17    services: Mutex<Vec<Arc<dyn AsyncService>>>,
18}
19
20impl Default for AsyncRuntime {
21    fn default() -> Self {
22        // TODO
23        Self::new(std::cmp::max(num_cpus::get() / 3, 2))
24    }
25}
26
27impl AsyncRuntime {
28    pub const IDENT: &'static str = "async-runtime";
29
30    pub fn new(threads: usize) -> Self {
31        trace!("Creating the async-runtime service");
32        Self { threads, services: Mutex::new(Vec::new()) }
33    }
34
35    pub fn register<T>(&self, service: Arc<T>)
36    where
37        T: AsyncService,
38    {
39        trace!("async-runtime registering service {}", service.clone().ident());
40        self.services.lock().unwrap().push(service);
41    }
42
43    pub fn find(&self, ident: &'static str) -> Option<Arc<dyn AsyncService>> {
44        self.services.lock().unwrap().iter().find(|s| (*s).clone().ident() == ident).cloned()
45    }
46
47    pub fn init(self: Arc<AsyncRuntime>, core: Arc<Core>) -> Vec<ThreadJoinHandle<()>> {
48        trace!("initializing async-runtime service");
49        vec![thread::Builder::new().name(Self::IDENT.to_string()).spawn(move || self.worker(core)).unwrap()]
50    }
51
52    /// Launch a tokio Runtime and run the top-level async objects
53
54    pub fn worker(self: &Arc<AsyncRuntime>, core: Arc<Core>) {
55        return tokio::runtime::Builder::new_multi_thread()
56            .worker_threads(self.threads)
57            .enable_all()
58            .build()
59            .expect("Failed building the Runtime")
60            .block_on(async { self.worker_impl(core).await });
61    }
62
63    pub async fn worker_impl(self: &Arc<AsyncRuntime>, core: Arc<Core>) {
64        let rt_handle = tokio::runtime::Handle::current();
65        std::thread::spawn(move || loop {
66            // See https://github.com/tokio-rs/tokio/issues/4730 and comment therein referring to
67            // https://gist.github.com/Darksonn/330f2aa771f95b5008ddd4864f5eb9e9#file-main-rs-L6
68            // In our case it's hard to avoid some short blocking i/o calls to the DB so we place this
69            // workaround for now to avoid any rare yet possible system freeze.
70            std::thread::sleep(std::time::Duration::from_secs(2));
71            rt_handle.spawn(std::future::ready(()));
72        });
73
74        // Start all async services
75        // All services futures are spawned as tokio tasks to enable parallelism
76        trace!("async-runtime worker starting");
77        let futures = self
78            .services
79            .lock()
80            .unwrap()
81            .iter()
82            .map(|x| tokio::spawn(x.clone().start()))
83            .collect::<Vec<TaskJoinHandle<AsyncServiceResult<()>>>>();
84
85        // wait for at least one service to return
86        let (result, idx, remaining_futures) = select_all(futures).await;
87        trace!("async-runtime worker had service {} returning", self.services.lock().unwrap()[idx].clone().ident());
88        // if at least one service yields an error, initiate global shutdown
89        // this will cause signal_exit() to be executed externally (by Core invoking `stop()`)
90        match result {
91            Ok(Err(_)) | Err(_) => {
92                trace!("shutting down core due to async-runtime error");
93                core.shutdown()
94            }
95            _ => {}
96        }
97
98        // wait for remaining services to finish
99        trace!("async-runtime worker joining remaining {} services", remaining_futures.len());
100        try_join_all(remaining_futures).await.unwrap();
101
102        // Stop all async services
103        // All services futures are spawned as tokio tasks to enable parallelism
104        let futures = self
105            .services
106            .lock()
107            .unwrap()
108            .iter()
109            .map(|x| tokio::spawn(x.clone().stop()))
110            .collect::<Vec<TaskJoinHandle<AsyncServiceResult<()>>>>();
111        try_join_all(futures).await.unwrap();
112
113        // Drop all services and cleanup
114        self.services.lock().unwrap().clear();
115
116        trace!("async-runtime worker stopped");
117    }
118
119    pub fn signal_exit(self: Arc<AsyncRuntime>) {
120        trace!("Sending an exit signal to all async-runtime services");
121        for service in self.services.lock().unwrap().iter() {
122            service.clone().signal_exit();
123        }
124    }
125}
126
127impl Service for AsyncRuntime {
128    fn ident(self: Arc<AsyncRuntime>) -> &'static str {
129        Self::IDENT
130    }
131
132    fn start(self: Arc<AsyncRuntime>, core: Arc<Core>) -> Vec<ThreadJoinHandle<()>> {
133        self.init(core)
134    }
135
136    fn stop(self: Arc<AsyncRuntime>) {
137        self.signal_exit()
138    }
139}