use crate::{signals::Shutdown, task::service::AsyncServiceResult};
use futures_util::future::{select_all, try_join_all};
use kaspa_core::core::Core;
use kaspa_core::service::Service;
use kaspa_core::task::service::AsyncService;
use kaspa_core::trace;
use std::{
sync::{Arc, Mutex},
thread::{self, JoinHandle as ThreadJoinHandle},
};
use tokio::task::JoinHandle as TaskJoinHandle;
pub struct AsyncRuntime {
threads: usize,
services: Mutex<Vec<Arc<dyn AsyncService>>>,
}
impl Default for AsyncRuntime {
fn default() -> Self {
Self::new(std::cmp::max(num_cpus::get() / 3, 2))
}
}
impl AsyncRuntime {
pub const IDENT: &'static str = "async-runtime";
pub fn new(threads: usize) -> Self {
trace!("Creating the async-runtime service");
Self { threads, services: Mutex::new(Vec::new()) }
}
pub fn register<T>(&self, service: Arc<T>)
where
T: AsyncService,
{
trace!("async-runtime registering service {}", service.clone().ident());
self.services.lock().unwrap().push(service);
}
pub fn init(self: Arc<AsyncRuntime>, core: Arc<Core>) -> Vec<ThreadJoinHandle<()>> {
trace!("initializing async-runtime service");
vec![thread::Builder::new().name(Self::IDENT.to_string()).spawn(move || self.worker(core)).unwrap()]
}
pub fn worker(self: &Arc<AsyncRuntime>, core: Arc<Core>) {
return tokio::runtime::Builder::new_multi_thread()
.worker_threads(self.threads)
.enable_all()
.build()
.expect("Failed building the Runtime")
.block_on(async { self.worker_impl(core).await });
}
pub async fn worker_impl(self: &Arc<AsyncRuntime>, core: Arc<Core>) {
let rt_handle = tokio::runtime::Handle::current();
std::thread::spawn(move || loop {
std::thread::sleep(std::time::Duration::from_secs(2));
rt_handle.spawn(std::future::ready(()));
});
trace!("async-runtime worker starting");
let futures = self
.services
.lock()
.unwrap()
.iter()
.map(|x| tokio::spawn(x.clone().start()))
.collect::<Vec<TaskJoinHandle<AsyncServiceResult<()>>>>();
let (result, idx, remaining_futures) = select_all(futures).await;
trace!("async-runtime worker had service {} returning", self.services.lock().unwrap()[idx].clone().ident());
match result {
Ok(Err(_)) | Err(_) => {
trace!("shutting down core due to async-runtime error");
core.shutdown()
}
_ => {}
}
trace!("async-runtime worker joining remaining {} services", remaining_futures.len());
try_join_all(remaining_futures).await.unwrap();
let futures = self
.services
.lock()
.unwrap()
.iter()
.map(|x| tokio::spawn(x.clone().stop()))
.collect::<Vec<TaskJoinHandle<AsyncServiceResult<()>>>>();
try_join_all(futures).await.unwrap();
self.services.lock().unwrap().clear();
trace!("async-runtime worker stopped");
}
pub fn signal_exit(self: Arc<AsyncRuntime>) {
trace!("Sending an exit signal to all async-runtime services");
for service in self.services.lock().unwrap().iter() {
service.clone().signal_exit();
}
}
}
impl Service for AsyncRuntime {
fn ident(self: Arc<AsyncRuntime>) -> &'static str {
Self::IDENT
}
fn start(self: Arc<AsyncRuntime>, core: Arc<Core>) -> Vec<ThreadJoinHandle<()>> {
self.init(core)
}
fn stop(self: Arc<AsyncRuntime>) {
self.signal_exit()
}
}