pub mod thread_primitives {
use crate::cache::{new_cache, LazyRUMCache};
use crate::core::{RUMResult, RUMVec};
use std::sync::Arc;
use tokio::runtime::Runtime as TokioRuntime;
use tokio::sync::RwLock;
use tokio::task::JoinHandle;
/// Global cache of Tokio runtimes, typed as [`TokioRtCache`] and created with `new_cache()`.
///
/// The `rumtk_init_threads!` macro hands `&mut rt_cache` to `rumtk_cache_fetch!` together with a
/// thread count and [`init_cache`].
// NOTE(review): this is a `static mut`, so the language itself provides no synchronization for
// accesses to it. Presumably `rumtk_cache_fetch!` supplies the required `unsafe` block and any
// locking — confirm against that macro before touching this from multiple threads.
pub static mut rt_cache: TokioRtCache = new_cache();
/// Builds a multi-threaded Tokio runtime and returns it wrapped in an [`Arc`].
///
/// # Arguments
/// * `threads` - number of worker threads the runtime is configured with.
///
/// # Panics
/// Panics with the builder's error message if the runtime cannot be constructed.
pub fn init_cache(threads: &usize) -> SafeTokioRuntime {
    tokio::runtime::Builder::new_multi_thread()
        .worker_threads(*threads)
        .enable_all()
        .build()
        .map(Arc::new)
        .unwrap_or_else(|e| {
            panic!(
                "Unable to initialize threading tokio runtime because {}!",
                e
            )
        })
}
/// A Tokio runtime behind an [`Arc`] so it can be shared between owners.
pub type SafeTokioRuntime = Arc<TokioRuntime>;
/// Lazy cache mapping a `usize` key to a [`SafeTokioRuntime`].
// NOTE(review): `rumtk_init_threads!` passes a thread count as the lookup argument, so the key is
// presumably the worker-thread count — confirm against `rumtk_cache_fetch!`.
pub type TokioRtCache = LazyRUMCache<usize, SafeTokioRuntime>;
/// Collection of items handled by a task (a [`RUMVec`] of `T`).
pub type TaskItems<T> = RUMVec<T>;
/// Arguments handed to a task; same type as [`TaskItems`].
pub type TaskArgs<T> = TaskItems<T>;
/// Outcome of a single task: a [`RUMResult`] carrying [`TaskItems`] of `R`.
pub type TaskResult<R> = RUMResult<TaskItems<R>>;
/// Collection of [`TaskResult`] values.
pub type TaskResults<R> = TaskItems<TaskResult<R>>;
/// Task arguments wrapped in `Arc<tokio::sync::RwLock<..>>` for shared access.
pub type SafeTaskArgs<T> = Arc<RwLock<TaskItems<T>>>;
/// Tokio [`JoinHandle`] whose output is a [`TaskResult`].
pub type AsyncTaskHandle<R> = JoinHandle<TaskResult<R>>;
/// A `Vec` of [`AsyncTaskHandle`] values.
pub type AsyncTaskHandles<R> = Vec<AsyncTaskHandle<R>>;
}
pub mod threading_functions {
use num_cpus;
use std::thread::{available_parallelism, sleep as std_sleep};
use std::time::Duration;
use tokio::time::sleep as tokio_sleep;
/// Number of nanoseconds in one second.
pub const NANOS_PER_SEC: u64 = 1_000_000_000;
/// Number of milliseconds in one second.
pub const MILLIS_PER_SEC: u64 = 1_000;
/// Number of microseconds in one second.
pub const MICROS_PER_SEC: u64 = 1_000_000;
/// Returns the default number of threads to use on this system.
///
/// The result is the larger of `num_cpus::get()` and
/// `std::thread::available_parallelism()`; a failed parallelism query counts as `0`,
/// so the CPU count wins in that case.
pub fn get_default_system_thread_count() -> usize {
    let cpu_count: usize = num_cpus::get();
    let detected = available_parallelism().map_or(0, |n| n.get());
    cpu_count.max(detected)
}
/// Blocks the current thread for `s` seconds.
///
/// The duration is converted to whole nanoseconds, rounded to the nearest value.
pub fn sleep(s: f32) {
    // Float-to-integer `as` casts saturate, so negative or NaN inputs become 0 ns.
    let nanos = (s * NANOS_PER_SEC as f32).round() as u64;
    std_sleep(Duration::from_nanos(nanos));
}
/// Awaits `tokio::time::sleep` for `s` seconds.
///
/// The duration is converted to whole nanoseconds, rounded to the nearest value.
pub async fn async_sleep(s: f32) {
    // Float-to-integer `as` casts saturate, so negative or NaN inputs become 0 ns.
    let nanos = (s * NANOS_PER_SEC as f32).round() as u64;
    tokio_sleep(Duration::from_nanos(nanos)).await;
}
}
pub mod threading_macros {
use crate::threading::thread_primitives;
use crate::threading::thread_primitives::SafeTaskArgs;
/// Fetches a runtime from the global `rt_cache` through `rumtk_cache_fetch!`, passing
/// `init_cache` as the initializer.
///
/// * With no arguments, the lookup value is a reference to the result of
///   `get_default_system_thread_count()`.
/// * With one argument, `$threads` is forwarded to `rumtk_cache_fetch!` unchanged, so the
///   caller supplies it in whatever form that macro expects (the no-argument arm passes a
///   `&usize`).
#[macro_export]
macro_rules! rumtk_init_threads {
    ( ) => {{
        use $crate::rumtk_cache_fetch;
        use $crate::threading::thread_primitives::{init_cache, rt_cache};
        use $crate::threading::threading_functions::get_default_system_thread_count;
        // Bound to a local before being returned so temporaries created by the fetch
        // expression end their life at this statement.
        let runtime =
            rumtk_cache_fetch!(&mut rt_cache, &get_default_system_thread_count(), init_cache);
        runtime
    }};
    ( $threads:expr ) => {{
        use $crate::rumtk_cache_fetch;
        use $crate::threading::thread_primitives::{init_cache, rt_cache};
        // Bound to a local before being returned so temporaries created by the fetch
        // expression end their life at this statement.
        let runtime = rumtk_cache_fetch!(&mut rt_cache, $threads, init_cache);
        runtime
    }};
}
/// Spawns `$func` (passed straight to `.spawn(..)`) on a runtime and evaluates to whatever
/// `.spawn(..)` returns.
///
/// * `rumtk_spawn_task!(task)` obtains the runtime via `rumtk_init_threads!()`.
/// * `rumtk_spawn_task!(rt, task)` spawns on the runtime expression supplied by the caller.
#[macro_export]
macro_rules! rumtk_spawn_task {
    ( $func:expr ) => {{
        // Invoke through `$crate::` so the helper macro resolves even when the calling
        // crate has not imported `rumtk_init_threads` itself.
        let rt = $crate::rumtk_init_threads!();
        rt.spawn($func)
    }};
    ( $rt:expr, $func:expr ) => {{
        $rt.spawn($func)
    }};
}
/// Calls `$func` with the optional comma-separated arguments, awaits the returned future
/// inside `$rt.block_on(..)`, and evaluates to the future's output.
///
/// `$rt` is any expression exposing `block_on` (expected to be a Tokio runtime). The call is
/// wrapped in an `async move` block, so captured values are moved into it.
#[macro_export]
macro_rules! rumtk_wait_on_task {
    ( $rt:expr, $func:expr $(, $arg_items:expr)* ) => {{
        $rt.block_on(async move { $func($($arg_items),*).await })
    }};
}
/// Drives `$future` to completion with `$rt.block_on(..)` and normalizes its error.
///
/// The future must resolve to a `Result`. `Ok` values pass through untouched; an `Err(e)` is
/// replaced by `Err(rumtk_format!("Task failed with {}", e))`.
#[macro_export]
macro_rules! rumtk_resolve_task {
    ( $rt:expr, $future:expr ) => {{
        use $crate::strings::rumtk_format;
        $rt.block_on(async move { $future.await })
            .map_err(|e| rumtk_format!("Task failed with {}", e))
    }};
}
/// Wraps a call to an async callable in an `async move` block and evaluates to that future.
///
/// * `rumtk_create_task!(func)` awaits `func()`.
/// * `rumtk_create_task!(func, args)` awaits `func(&args)`; `args` is moved into the block and
///   passed by reference.
///
/// Nothing runs until the resulting future is polled.
#[macro_export]
macro_rules! rumtk_create_task {
    ( $func:expr $(, $args:expr)? ) => {{
        async move {
            let task_fn = $func;
            task_fn($(&$args)?).await
        }
    }};
}
/// Builds a `SafeTaskArgs` value from zero or more comma-separated expressions.
///
/// The expressions are collected with `vec![..]`, wrapped in a `tokio::sync::RwLock`, and then
/// passed to `SafeTaskArgs::new`. Calling the macro with no arguments yields an empty list.
#[macro_export]
macro_rules! rumtk_create_task_args {
    ( $($args:expr),* ) => {{
        use $crate::threading::thread_primitives::{TaskArgs, SafeTaskArgs, TaskItems};
        use tokio::sync::RwLock;
        SafeTaskArgs::new(RwLock::new(vec![$($args),*]))
    }};
}
/// Creates a task from `$func`, runs it to completion on a cached runtime, and evaluates to
/// the result produced by `rumtk_resolve_task!`.
///
/// * `rumtk_exec_task!(func)` runs `func()` on the runtime from `rumtk_init_threads!()`.
/// * `rumtk_exec_task!(func, args)` wraps `args` in `SafeTaskArgs` (an `Arc` around a
///   `tokio::sync::RwLock`) and runs `func(&args)`.
/// * `rumtk_exec_task!(func, args, threads)` does the same but requests the runtime with
///   `rumtk_init_threads!(&threads)`.
///
/// The task must resolve to a `Result`; its error is rewritten to a
/// `"Task failed with ..."` message by `rumtk_resolve_task!`.
#[macro_export]
macro_rules! rumtk_exec_task {
    ($func:expr ) => {{
        use tokio::sync::RwLock;
        use $crate::{
            rumtk_create_task, rumtk_create_task_args, rumtk_init_threads, rumtk_resolve_task,
        };
        let rt = rumtk_init_threads!();
        let task = rumtk_create_task!($func);
        rumtk_resolve_task!(&rt, task)
    }};
    ($func:expr, $args:expr ) => {{
        use tokio::sync::RwLock;
        // Imported here so the expansion does not rely on the caller having
        // `SafeTaskArgs` in scope.
        use $crate::threading::thread_primitives::SafeTaskArgs;
        use $crate::{
            rumtk_create_task, rumtk_create_task_args, rumtk_init_threads, rumtk_resolve_task,
        };
        let rt = rumtk_init_threads!();
        let args = SafeTaskArgs::new(RwLock::new($args));
        let task = rumtk_create_task!($func, args);
        rumtk_resolve_task!(&rt, task)
    }};
    ($func:expr, $args:expr , $threads:expr ) => {{
        use tokio::sync::RwLock;
        // Imported here so the expansion does not rely on the caller having
        // `SafeTaskArgs` in scope.
        use $crate::threading::thread_primitives::SafeTaskArgs;
        use $crate::{
            rumtk_create_task, rumtk_create_task_args, rumtk_init_threads, rumtk_resolve_task,
        };
        let rt = rumtk_init_threads!(&$threads);
        let args = SafeTaskArgs::new(RwLock::new($args));
        let task = rumtk_create_task!($func, args);
        rumtk_resolve_task!(&rt, task)
    }};
}
/// Blocking sleep: casts `$dur` to `f32` and passes it to `threading_functions::sleep`,
/// which interprets the value as seconds.
#[macro_export]
macro_rules! rumtk_sleep {
    ( $dur:expr) => {{
        $crate::threading::threading_functions::sleep($dur as f32)
    }};
}
/// Async sleep: casts `$dur` to `f32` and passes it to `threading_functions::async_sleep`,
/// which interprets the value as seconds.
///
/// Evaluates to the future returned by `async_sleep`; the caller must `.await` it for the
/// sleep to take place.
#[macro_export]
macro_rules! rumtk_async_sleep {
    ( $dur:expr) => {{
        $crate::threading::threading_functions::async_sleep($dur as f32)
    }};
}
}