kaspa_core/task/
runtime.rs1use crate::{signals::Shutdown, task::service::AsyncServiceResult};
2use futures_util::future::{select_all, try_join_all};
3use kaspa_core::core::Core;
4use kaspa_core::service::Service;
5use kaspa_core::task::service::AsyncService;
6use kaspa_core::trace;
7use std::{
8 sync::{Arc, Mutex},
9 thread::{self, JoinHandle as ThreadJoinHandle},
10};
11use tokio::task::JoinHandle as TaskJoinHandle;
12
13pub struct AsyncRuntime {
16 threads: usize,
17 services: Mutex<Vec<Arc<dyn AsyncService>>>,
18}
19
20impl Default for AsyncRuntime {
21 fn default() -> Self {
22 Self::new(std::cmp::max(num_cpus::get() / 3, 2))
24 }
25}
26
27impl AsyncRuntime {
28 pub const IDENT: &'static str = "async-runtime";
29
30 pub fn new(threads: usize) -> Self {
31 trace!("Creating the async-runtime service");
32 Self { threads, services: Mutex::new(Vec::new()) }
33 }
34
35 pub fn register<T>(&self, service: Arc<T>)
36 where
37 T: AsyncService,
38 {
39 trace!("async-runtime registering service {}", service.clone().ident());
40 self.services.lock().unwrap().push(service);
41 }
42
43 pub fn find(&self, ident: &'static str) -> Option<Arc<dyn AsyncService>> {
44 self.services.lock().unwrap().iter().find(|s| (*s).clone().ident() == ident).cloned()
45 }
46
47 pub fn init(self: Arc<AsyncRuntime>, core: Arc<Core>) -> Vec<ThreadJoinHandle<()>> {
48 trace!("initializing async-runtime service");
49 vec![thread::Builder::new().name(Self::IDENT.to_string()).spawn(move || self.worker(core)).unwrap()]
50 }
51
52 pub fn worker(self: &Arc<AsyncRuntime>, core: Arc<Core>) {
55 return tokio::runtime::Builder::new_multi_thread()
56 .worker_threads(self.threads)
57 .enable_all()
58 .build()
59 .expect("Failed building the Runtime")
60 .block_on(async { self.worker_impl(core).await });
61 }
62
63 pub async fn worker_impl(self: &Arc<AsyncRuntime>, core: Arc<Core>) {
64 let rt_handle = tokio::runtime::Handle::current();
65 std::thread::spawn(move || loop {
66 std::thread::sleep(std::time::Duration::from_secs(2));
71 rt_handle.spawn(std::future::ready(()));
72 });
73
74 trace!("async-runtime worker starting");
77 let futures = self
78 .services
79 .lock()
80 .unwrap()
81 .iter()
82 .map(|x| tokio::spawn(x.clone().start()))
83 .collect::<Vec<TaskJoinHandle<AsyncServiceResult<()>>>>();
84
85 let (result, idx, remaining_futures) = select_all(futures).await;
87 trace!("async-runtime worker had service {} returning", self.services.lock().unwrap()[idx].clone().ident());
88 match result {
91 Ok(Err(_)) | Err(_) => {
92 trace!("shutting down core due to async-runtime error");
93 core.shutdown()
94 }
95 _ => {}
96 }
97
98 trace!("async-runtime worker joining remaining {} services", remaining_futures.len());
100 try_join_all(remaining_futures).await.unwrap();
101
102 let futures = self
105 .services
106 .lock()
107 .unwrap()
108 .iter()
109 .map(|x| tokio::spawn(x.clone().stop()))
110 .collect::<Vec<TaskJoinHandle<AsyncServiceResult<()>>>>();
111 try_join_all(futures).await.unwrap();
112
113 self.services.lock().unwrap().clear();
115
116 trace!("async-runtime worker stopped");
117 }
118
119 pub fn signal_exit(self: Arc<AsyncRuntime>) {
120 trace!("Sending an exit signal to all async-runtime services");
121 for service in self.services.lock().unwrap().iter() {
122 service.clone().signal_exit();
123 }
124 }
125}
126
127impl Service for AsyncRuntime {
128 fn ident(self: Arc<AsyncRuntime>) -> &'static str {
129 Self::IDENT
130 }
131
132 fn start(self: Arc<AsyncRuntime>, core: Arc<Core>) -> Vec<ThreadJoinHandle<()>> {
133 self.init(core)
134 }
135
136 fn stop(self: Arc<AsyncRuntime>) {
137 self.signal_exit()
138 }
139}