rumtk_core/
threading.rs

1/*
2 * rumtk attempts to implement HL7 and medical protocols for interoperability in medicine.
3 * This toolkit aims to be reliable, simple, performant, and standards compliant.
4 * Copyright (C) 2025  Luis M. Santos, M.D.
5 * Copyright (C) 2025  MedicalMasses L.L.C.
6 *
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
20 */
21
22///
23/// This module provides all the primitives needed to build a multithreaded application.
24///
25pub mod thread_primitives {
26    use crate::cache::{new_cache, LazyRUMCache};
27    use std::sync::Arc;
28    use tokio::runtime::Runtime as TokioRuntime;
29    /**************************** Globals **************************************/
30    pub static mut RT_CACHE: TokioRtCache = new_cache();
31    /**************************** Helpers ***************************************/
32    pub fn init_cache(threads: &usize) -> SafeTokioRuntime {
33        let mut builder = tokio::runtime::Builder::new_multi_thread();
34        builder.worker_threads(*threads);
35        builder.enable_all();
36        match builder.build() {
37            Ok(handle) => Arc::new(handle),
38            Err(e) => panic!(
39                "Unable to initialize threading tokio runtime because {}!",
40                &e
41            ),
42        }
43    }
44
45    /**************************** Types ***************************************/
46    pub type SafeTokioRuntime = Arc<TokioRuntime>;
47    pub type TokioRtCache = LazyRUMCache<usize, SafeTokioRuntime>;
48}
49
50pub mod threading_manager {
51    use crate::cache::LazyRUMCacheValue;
52    use crate::core::{RUMResult, RUMVec};
53    use crate::strings::rumtk_format;
54    use crate::threading::thread_primitives::SafeTokioRuntime;
55    use crate::threading::threading_functions::async_sleep;
56    use crate::types::{RUMHashMap, RUMID};
57    use crate::{rumtk_init_threads, rumtk_resolve_task, rumtk_spawn_task, threading};
58    use std::future::Future;
59    use std::sync::Arc;
60    use tokio::sync::RwLock;
61    use tokio::task::JoinHandle;
62
63    const DEFAULT_SLEEP_DURATION: f32 = 0.001f32;
64    const DEFAULT_TASK_CAPACITY: usize = 1024;
65
66    pub type TaskItems<T> = RUMVec<T>;
67    /// This type aliases a vector of T elements that will be used for passing arguments to the task processor.
68    pub type TaskArgs<T> = TaskItems<T>;
69    /// Function signature defining the interface of task processing logic.
70    pub type SafeTaskArgs<T> = Arc<RwLock<TaskItems<T>>>;
71    pub type AsyncTaskHandle<R> = JoinHandle<TaskResult<R>>;
72    pub type AsyncTaskHandles<R> = Vec<AsyncTaskHandle<R>>;
73    //pub type TaskProcessor<T, R, Fut: Future<Output = TaskResult<R>>> = impl FnOnce(&SafeTaskArgs<T>) -> Fut;
74    pub type TaskID = RUMID;
75
76    #[derive(Debug, Clone, Default)]
77    pub struct Task<R> {
78        pub id: TaskID,
79        pub finished: bool,
80        pub result: Option<R>,
81    }
82
83    pub type SafeTask<R> = Arc<Task<R>>;
84    type SafeInternalTask<R> = Arc<RwLock<Task<R>>>;
85    pub type TaskTable<R> = RUMHashMap<TaskID, SafeInternalTask<R>>;
86    pub type TaskBatch = RUMVec<TaskID>;
87    /// Type to use to define how task results are expected to be returned.
88    pub type TaskResult<R> = RUMResult<SafeTask<R>>;
89    pub type TaskResults<R> = TaskItems<TaskResult<R>>;
90    pub type TaskRuntime = LazyRUMCacheValue<SafeTokioRuntime>;
91
92    ///
93    /// Manages asynchronous tasks submitted as micro jobs from synchronous code. This type essentially
94    /// gives the multithreading, asynchronous superpowers to synchronous logic.
95    ///
96    /// ## Example Usage
97    ///
98    /// ```
99    /// use std::sync::{Arc};
100    /// use tokio::sync::RwLock as AsyncRwLock;
101    /// use rumtk_core::core::RUMResult;
102    /// use rumtk_core::strings::RUMString;
103    /// use rumtk_core::threading::threading_manager::{SafeTaskArgs, TaskItems, TaskManager};
104    /// use rumtk_core::{rumtk_create_task, };
105    ///
106    /// let expected = vec![
107    ///     RUMString::from("Hello"),
108    ///     RUMString::from("World!"),
109    ///     RUMString::from("Overcast"),
110    ///     RUMString::from("and"),
111    ///     RUMString::from("Sad"),
112    ///  ];
113    ///
114    /// type TestResult = RUMResult<Vec<RUMString>>;
115    /// let mut queue: TaskManager<TestResult> = TaskManager::new(&5).unwrap();
116    ///
117    /// let locked_args = AsyncRwLock::new(expected.clone());
118    /// let task_args = SafeTaskArgs::<RUMString>::new(locked_args);
119    /// let processor = rumtk_create_task!(
120    ///     async |args: &SafeTaskArgs<RUMString>| -> TestResult {
121    ///         let owned_args = Arc::clone(args);
122    ///         let locked_args = owned_args.read().await;
123    ///         let mut results = TaskItems::<RUMString>::with_capacity(locked_args.len());
124    ///
125    ///         for arg in locked_args.iter() {
126    ///             results.push(RUMString::new(arg));
127    ///         }
128    ///
129    ///         Ok(results)
130    ///     },
131    ///     task_args
132    /// );
133    ///
134    /// queue.add_task::<_>(processor);
135    /// let results = queue.wait();
136    ///
137    /// let mut result_data = Vec::<RUMString>::with_capacity(5);
138    /// for r in results {
139    ///     for v in r.unwrap().result.clone().unwrap().iter() {
140    ///         for value in v.iter() {
141    ///             result_data.push(value.clone());
142    ///         }
143    ///     }
144    ///  }
145    ///
146    /// assert_eq!(result_data, expected, "Results do not match expected!");
147    ///
148    /// ```
149    ///
150    #[derive(Debug, Clone, Default)]
151    pub struct TaskManager<R> {
152        tasks: TaskTable<R>,
153        workers: usize,
154    }
155
156    impl<R> TaskManager<R>
157    where
158        R: Sync + Send + Clone + 'static,
159    {
160        ///
161        /// This method creates a [`TaskQueue`] instance using sensible defaults.
162        ///
163        /// The `threads` field is computed from the number of cores present in system.
164        ///
165        pub fn default() -> RUMResult<TaskManager<R>> {
166            Self::new(&threading::threading_functions::get_default_system_thread_count())
167        }
168
169        ///
170        /// Creates an instance of [`ThreadedTaskQueue<T, R>`] in the form of [`SafeThreadedTaskQueue<T, R>`].
171        /// Expects you to provide the count of threads to spawn and the microtask queue size
172        /// allocated by each thread.
173        ///
174        /// This method calls [`Self::with_capacity()`] for the actual object creation.
175        /// The main queue capacity is pre-allocated to [`DEFAULT_QUEUE_CAPACITY`].
176        ///
177        pub fn new(worker_num: &usize) -> RUMResult<TaskManager<R>> {
178            let tasks = TaskTable::<R>::with_capacity(DEFAULT_TASK_CAPACITY);
179            Ok(TaskManager::<R> {
180                tasks,
181                workers: worker_num.to_owned(),
182            })
183        }
184
185        ///
186        /// Add a task to the processing queue. The idea is that you can queue a processor function
187        /// and list of args that will be picked up by one of the threads for processing.
188        ///
189        pub fn add_task<F>(&mut self, task: F) -> TaskID
190        where
191            F: Future<Output = R> + Send + Sync + 'static,
192            F::Output: Send + Sized + 'static,
193        {
194            let id = TaskID::new_v4();
195            let mut safe_task = Arc::new(RwLock::new(Task::<R> {
196                id: id.clone(),
197                finished: false,
198                result: None,
199            }));
200            self.tasks.insert(id.clone(), safe_task.clone());
201
202            let task_wrapper = async move || {
203                // Run the task
204                let result = task.await;
205
206                // Cleanup task
207                safe_task.write().await.result = Some(result);
208                safe_task.write().await.finished = true;
209            };
210
211            let rt = rumtk_init_threads!(&self.workers);
212            rumtk_spawn_task!(rt, task_wrapper());
213
214            id
215        }
216
217        ///
218        /// This method waits until all queued tasks have been processed from the main queue.
219        ///
220        /// We poll the status of the main queue every [DEFAULT_SLEEP_DURATION](DEFAULT_SLEEP_DURATION) ms.
221        ///
222        /// Upon completion,
223        ///
224        /// 1. We collect the results generated (if any).
225        /// 2. We reset the main task and result internal queue states.
226        /// 3. Return the list of results ([TaskResults<R>](TaskResults)).
227        ///
228        /// This operation consumes all the tasks.
229        ///
230        /// ### Note:
231        /// ```text
232        ///     Results returned here are not guaranteed to be in the same order as the order in which
233        ///     the tasks were queued for work. You will need to pass a type as T that automatically
234        ///     tracks its own id or has a way for you to resort results.
235        /// ```
236        pub fn wait(&mut self) -> TaskResults<R> {
237            let task_batch = self.tasks.keys().cloned().collect::<Vec<_>>();
238            self.wait_on_batch(&task_batch)
239        }
240
241        pub fn wait_on_batch(&mut self, tasks: &TaskBatch) -> TaskResults<R> {
242            let rt = rumtk_init_threads!(&self.workers);
243            let results = rumtk_resolve_task!(rt, self.wait_on_batch_async(&tasks));
244            results
245        }
246
247        pub fn wait_on(&mut self, task_id: &TaskID) -> TaskResult<R> {
248            let rt = rumtk_init_threads!(&self.workers);
249            let result = rumtk_resolve_task!(rt, self.wait_on_async(&task_id));
250            result
251        }
252
253        ///
254        /// This method waits until a queued task with [TaskID](TaskID) has been processed from the main queue.
255        ///
256        /// We poll the status of the task every [DEFAULT_SLEEP_DURATION](DEFAULT_SLEEP_DURATION) ms.
257        ///
258        /// Upon completion,
259        ///
260        /// 2. Return the result ([TaskResults<R>](TaskResults)).
261        ///
262        /// This operation consumes the task.
263        ///
264        /// ### Note:
265        /// ```text
266        ///     Results returned here are not guaranteed to be in the same order as the order in which
267        ///     the tasks were queued for work. You will need to pass a type as T that automatically
268        ///     tracks its own id or has a way for you to resort results.
269        /// ```
270        pub async fn wait_on_async(&mut self, task_id: &TaskID) -> TaskResult<R> {
271            let task = match self.tasks.remove(task_id) {
272                Some(task) => task.clone(),
273                None => return Err(rumtk_format!("No task with id {}", task_id)),
274            };
275
276            while !task.read().await.finished {
277                async_sleep(DEFAULT_SLEEP_DURATION).await;
278            }
279
280            let x = Ok(Arc::new(task.write().await.clone()));
281            x
282        }
283
284        ///
285        /// This method waits until a set of queued tasks with [TaskID](TaskID) has been processed from the main queue.
286        ///
287        /// We poll the status of the task every [DEFAULT_SLEEP_DURATION](DEFAULT_SLEEP_DURATION) ms.
288        ///
289        /// Upon completion,
290        ///
291        /// 1. We collect the results generated (if any).
292        /// 2. Return the list of results ([TaskResults<R>](TaskResults)).
293        ///
294        /// ### Note:
295        /// ```text
296        ///     Results returned here are not guaranteed to be in the same order as the order in which
297        ///     the tasks were queued for work. You will need to pass a type as T that automatically
298        ///     tracks its own id or has a way for you to resort results.
299        /// ```
300        pub async fn wait_on_batch_async(&mut self, tasks: &TaskBatch) -> TaskResults<R> {
301            let mut results = TaskResults::<R>::default();
302            for task in tasks {
303                results.push(self.wait_on_async(task).await);
304            }
305            results
306        }
307
308        ///
309        /// Check if all work has been completed from the task queue.
310        ///
311        /// ## Examples
312        ///
313        /// ### Sync Usage
314        ///
315        ///```
316        /// use rumtk_core::threading::threading_manager::TaskManager;
317        ///
318        /// let manager = TaskManager::<usize>::new(&4).unwrap();
319        ///
320        /// let all_done = manager.is_all_completed();
321        ///
322        /// assert_eq!(all_done, true, "Empty TaskManager reports tasks are not completed!");
323        ///
324        /// ```
325        ///
326        pub fn is_all_completed(&self) -> bool {
327            let rt = rumtk_init_threads!(&self.workers);
328            rumtk_resolve_task!(rt, TaskManager::<R>::is_all_completed_async(self))
329        }
330
331        pub async fn is_all_completed_async(&self) -> bool {
332            for (_, task) in self.tasks.iter() {
333                if !task.read().await.finished {
334                    return false;
335                }
336            }
337
338            true
339        }
340
341        ///
342        /// Reset task queue and results queue states.
343        ///
344        pub fn reset(&mut self) {
345            self.tasks.clear();
346        }
347
348        ///
349        /// Alias for [wait](TaskManager::wait).
350        ///
351        fn gather(&mut self) -> TaskResults<R> {
352            self.wait()
353        }
354    }
355}
356
357///
358/// This module contains a few helper.
359///
360/// For example, you can find a function for determining number of threads available in system.
361/// The sleep family of functions are also here.
362///
363pub mod threading_functions {
364    use num_cpus;
365    use std::thread::{available_parallelism, sleep as std_sleep};
366    use std::time::Duration;
367    use tokio::time::sleep as tokio_sleep;
368
369    pub const NANOS_PER_SEC: u64 = 1000000000;
370    pub const MILLIS_PER_SEC: u64 = 1000;
371    pub const MICROS_PER_SEC: u64 = 1000000;
372
373    pub fn get_default_system_thread_count() -> usize {
374        let cpus: usize = num_cpus::get();
375        let parallelism = match available_parallelism() {
376            Ok(n) => n.get(),
377            Err(_) => 0,
378        };
379
380        if parallelism >= cpus {
381            parallelism
382        } else {
383            cpus
384        }
385    }
386
387    pub fn sleep(s: f32) {
388        let ns = s * NANOS_PER_SEC as f32;
389        let rounded_ns = ns.round() as u64;
390        let duration = Duration::from_nanos(rounded_ns);
391        std_sleep(duration);
392    }
393
394    pub async fn async_sleep(s: f32) {
395        let ns = s * NANOS_PER_SEC as f32;
396        let rounded_ns = ns.round() as u64;
397        let duration = Duration::from_nanos(rounded_ns);
398        tokio_sleep(duration).await;
399    }
400}
401
402///
403/// Main API for interacting with the threading back end. Remember, we use tokio as our executor.
404/// This means that by default, all jobs sent to the thread pool have to be async in nature.
405/// These macros make handling of these jobs at the sync/async boundary more convenient.
406///
407pub mod threading_macros {
408    use crate::threading::thread_primitives;
409    use crate::threading::threading_manager::SafeTaskArgs;
410
411    ///
412    /// First, let's make sure we have *tokio* initialized at least once. The runtime created here
413    /// will be saved to the global context so the next call to this macro will simply grab a
414    /// reference to the previously initialized runtime.
415    ///
416    /// Passing nothing will default to initializing a runtime using the default number of threads
417    /// for this system. This is typically equivalent to number of cores/threads for your CPU.
418    ///
419    /// Passing `threads` number will yield a runtime that allocates that many threads.
420    ///
421    ///
422    /// ## Examples
423    ///
424    /// ```
425    ///     use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_create_task_args, rumtk_create_task, rumtk_spawn_task};
426    ///     use rumtk_core::core::RUMResult;
427    ///     use rumtk_core::threading::threading_manager::SafeTaskArgs;
428    ///
429    ///     async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
430    ///         let mut result = Vec::<i32>::new();
431    ///         for arg in args.read().await.iter() {
432    ///             result.push(*arg);
433    ///         }
434    ///         Ok(result)
435    ///     }
436    ///
437    ///     let rt = rumtk_init_threads!();                                      // Creates runtime instance
438    ///     let args = rumtk_create_task_args!(1);                               // Creates a vector of i32s
439    ///     let task = rumtk_create_task!(test, args);                           // Creates a standard task which consists of a function or closure accepting a Vec<T>
440    ///     let result = rumtk_resolve_task!(&rt, rumtk_spawn_task!(&rt, task)); // Spawn's task and waits for it to conclude.
441    /// ```
442    ///
443    /// ```
444    ///     use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_create_task_args, rumtk_create_task, rumtk_spawn_task};
445    ///     use rumtk_core::core::RUMResult;
446    ///     use rumtk_core::threading::threading_manager::SafeTaskArgs;
447    ///
448    ///     async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
449    ///         let mut result = Vec::<i32>::new();
450    ///         for arg in args.read().await.iter() {
451    ///             result.push(*arg);
452    ///         }
453    ///         Ok(result)
454    ///     }
455    ///
456    ///     let thread_count: usize = 10;
457    ///     let rt = rumtk_init_threads!(&thread_count);
458    ///     let args = rumtk_create_task_args!(1);
459    ///     let task = rumtk_create_task!(test, args);
460    ///     let result = rumtk_resolve_task!(&rt, rumtk_spawn_task!(&rt, task));
461    /// ```
462    #[macro_export]
463    macro_rules! rumtk_init_threads {
464        ( ) => {{
465            use $crate::rumtk_cache_fetch;
466            use $crate::threading::thread_primitives::{init_cache, RT_CACHE};
467            use $crate::threading::threading_functions::get_default_system_thread_count;
468            let rt = rumtk_cache_fetch!(
469                &mut RT_CACHE,
470                &get_default_system_thread_count(),
471                init_cache
472            );
473            rt
474        }};
475        ( $threads:expr ) => {{
476            use $crate::rumtk_cache_fetch;
477            use $crate::threading::thread_primitives::{init_cache, RT_CACHE};
478            let rt = rumtk_cache_fetch!(&raw mut RT_CACHE, $threads, init_cache);
479            rt
480        }};
481    }
482
483    ///
484    /// Puts task onto the runtime queue.
485    ///
486    /// The parameters to this macro are a reference to the runtime (`rt`) and a future (`func`).
487    ///
488    /// The return is a [thread_primitives::JoinHandle<T>] instance. If the task was a standard
489    /// framework task, you will get [thread_primitives::AsyncTaskHandle] instead.
490    ///
491    #[macro_export]
492    macro_rules! rumtk_spawn_task {
493        ( $func:expr ) => {{
494            let rt = rumtk_init_threads!();
495            rt.spawn($func)
496        }};
497        ( $rt:expr, $func:expr ) => {{
498            $rt.spawn($func)
499        }};
500    }
501
502    ///
503    /// Using the initialized runtime, wait for the future to resolve in a thread blocking manner!
504    ///
505    /// If you pass a reference to the runtime (`rt`) and an async closure (`func`), we await the
506    /// async closure without passing any arguments.
507    ///
508    /// You can pass a third argument to this macro in the form of any number of arguments (`arg_item`).
509    /// In such a case, we pass those arguments to the call on the async closure and await on results.
510    ///
511    #[macro_export]
512    macro_rules! rumtk_wait_on_task {
513        ( $rt:expr, $func:expr ) => {{
514            $rt.block_on(async move {
515                $func().await
516            })
517        }};
518        ( $rt:expr, $func:expr, $($arg_items:expr),+ ) => {{
519            $rt.block_on(async move {
520                $func($($arg_items),+).await
521            })
522        }};
523    }
524
525    ///
526    /// This macro awaits a future.
527    ///
528    /// The arguments are a reference to the runtime (`rt) and a future.
529    ///
530    /// If there is a result, you will get the result of the future.
531    ///
532    /// ## Examples
533    ///
534    /// ```
535    ///     use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_create_task_args, rumtk_create_task, rumtk_spawn_task};
536    ///     use rumtk_core::core::RUMResult;
537    ///     use rumtk_core::threading::threading_manager::SafeTaskArgs;
538    ///
539    ///     async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
540    ///         let mut result = Vec::<i32>::new();
541    ///         for arg in args.read().await.iter() {
542    ///             result.push(*arg);
543    ///         }
544    ///         Ok(result)
545    ///     }
546    ///
547    ///     let rt = rumtk_init_threads!();
548    ///     let args = rumtk_create_task_args!(1);
549    ///     let task = rumtk_create_task!(test, args);
550    ///     let result = rumtk_resolve_task!(&rt, rumtk_spawn_task!(&rt, task));
551    /// ```
552    ///
553    #[macro_export]
554    macro_rules! rumtk_resolve_task {
555        ( $rt:expr, $future:expr ) => {{
556            use $crate::strings::rumtk_format;
557            // Fun tidbit, the expression rumtk_resolve_task!(&rt, rumtk_spawn_task!(&rt, task)), where
558            // rt is the tokio runtime yields async move { { &rt.spawn(task) } }. However, the whole thing
559            // is technically moved into the async closure and captured so things like mutex guards
560            // technically go out of the outer scope. As a result that expression fails to compile even
561            // though the intent is for rumtk_spawn_task to resolve first and its result get moved
562            // into the async closure. To ensure that happens regardless of given expression, we do
563            // a variable assignment below to force the "future" macro expressions to resolve before
564            // moving into the closure. DO NOT REMOVE OR "SIMPLIFY" THE let future = $future LINE!!!
565            let future = $future;
566            $rt.block_on(async move { future.await })
567        }};
568    }
569
570    ///
571    /// This macro creates an async body that calls the async closure and awaits it.
572    ///
573    /// ## Example
574    ///
575    /// ```
576    /// use std::sync::{Arc, RwLock};
577    /// use tokio::sync::RwLock as AsyncRwLock;
578    /// use rumtk_core::strings::RUMString;
579    /// use rumtk_core::threading::threading_manager::{SafeTaskArgs, TaskItems};
580    ///
581    /// pub type SafeTaskArgs2<T> = Arc<RwLock<TaskItems<T>>>;
582    /// let expected = vec![
583    ///     RUMString::from("Hello"),
584    ///     RUMString::from("World!"),
585    ///     RUMString::from("Overcast"),
586    ///     RUMString::from("and"),
587    ///     RUMString::from("Sad"),
588    ///  ];
589    /// let locked_args = AsyncRwLock::new(expected.clone());
590    /// let task_args = SafeTaskArgs::<RUMString>::new(locked_args);
591    ///
592    ///
593    /// ```
594    ///
595    #[macro_export]
596    macro_rules! rumtk_create_task {
597        ( $func:expr ) => {{
598            async move {
599                let f = $func;
600                f().await
601            }
602        }};
603        ( $func:expr, $args:expr ) => {{
604            let f = $func;
605            async move { f(&$args).await }
606        }};
607    }
608
609    ///
610    /// Creates an instance of [SafeTaskArgs] with the arguments passed.
611    ///
612    /// ## Note
613    ///
614    /// All arguments must be of the same type
615    ///
616    #[macro_export]
617    macro_rules! rumtk_create_task_args {
618        ( ) => {{
619            use $crate::threading::threading_manager::{TaskArgs, SafeTaskArgs, TaskItems};
620            use tokio::sync::RwLock;
621            SafeTaskArgs::new(RwLock::new(vec![]))
622        }};
623        ( $($args:expr),+ ) => {{
624            use $crate::threading::threading_manager::{SafeTaskArgs};
625            use tokio::sync::RwLock;
626            SafeTaskArgs::new(RwLock::new(vec![$($args),+]))
627        }};
628    }
629
630    ///
631    /// Convenience macro for packaging the task components and launching the task in one line.
632    ///
633    /// One of the advantages is that you can generate a new `tokio` runtime by specifying the
634    /// number of threads at the end. This is optional. Meaning, we will default to the system's
635    /// number of threads if that value is not specified.
636    ///
637    /// Between the `func` parameter and the optional `threads` parameter, you can specify a
638    /// variable number of arguments to pass to the task. each argument must be of the same type.
639    /// If you wish to pass different arguments with different types, please define an abstract type
640    /// whose underlying structure is a tuple of items and pass that instead.
641    ///
642    /// ## Examples
643    ///
644    /// ### With Default Thread Count
645    /// ```
646    ///     use rumtk_core::{rumtk_exec_task};
647    ///     use rumtk_core::core::RUMResult;
648    ///     use rumtk_core::threading::threading_manager::SafeTaskArgs;
649    ///
650    ///     async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
651    ///         let mut result = Vec::<i32>::new();
652    ///         for arg in args.read().await.iter() {
653    ///             result.push(*arg);
654    ///         }
655    ///         Ok(result)
656    ///     }
657    ///
658    ///     let result = rumtk_exec_task!(test, vec![5]);
659    ///     assert_eq!(&result.clone().unwrap(), &vec![5], "Results mismatch");
660    ///     assert_ne!(&result.clone().unwrap(), &vec![5, 10], "Results do not mismatch as expected!");
661    /// ```
662    ///
663    /// ### With Custom Thread Count
664    /// ```
665    ///     use rumtk_core::{rumtk_exec_task};
666    ///     use rumtk_core::core::RUMResult;
667    ///     use rumtk_core::threading::threading_manager::SafeTaskArgs;
668    ///
669    ///     async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
670    ///         let mut result = Vec::<i32>::new();
671    ///         for arg in args.read().await.iter() {
672    ///             result.push(*arg);
673    ///         }
674    ///         Ok(result)
675    ///     }
676    ///
677    ///     let result = rumtk_exec_task!(test, vec![5], 5);
678    ///     assert_eq!(&result.clone().unwrap(), &vec![5], "Results mismatch");
679    ///     assert_ne!(&result.clone().unwrap(), &vec![5, 10], "Results do not mismatch as expected!");
680    /// ```
681    ///
682    /// ### With Async Function Body
683    /// ```
684    ///     use rumtk_core::{rumtk_exec_task};
685    ///     use rumtk_core::core::RUMResult;
686    ///     use rumtk_core::threading::threading_manager::SafeTaskArgs;
687    ///
688    ///     let result = rumtk_exec_task!(
689    ///     async move |args: &SafeTaskArgs<i32>| -> RUMResult<Vec<i32>> {
690    ///         let mut result = Vec::<i32>::new();
691    ///         for arg in args.read().await.iter() {
692    ///             result.push(*arg);
693    ///         }
694    ///         Ok(result)
695    ///     },
696    ///     vec![5]);
697    ///     assert_eq!(&result.clone().unwrap(), &vec![5], "Results mismatch");
698    ///     assert_ne!(&result.clone().unwrap(), &vec![5, 10], "Results do not mismatch as expected!");
699    /// ```
700    ///
701    /// ### With Async Function Body and No Args
702    /// ```
703    ///     use rumtk_core::{rumtk_exec_task};
704    ///     use rumtk_core::core::RUMResult;
705    ///     use rumtk_core::threading::threading_manager::SafeTaskArgs;
706    ///
707    ///     let result = rumtk_exec_task!(
708    ///     async || -> RUMResult<Vec<i32>> {
709    ///         let mut result = Vec::<i32>::new();
710    ///         Ok(result)
711    ///     });
712    ///     let empty = Vec::<i32>::new();
713    ///     assert_eq!(&result.clone().unwrap(), &empty, "Results mismatch");
714    ///     assert_ne!(&result.clone().unwrap(), &vec![5, 10], "Results do not mismatch as expected!");
715    /// ```
716    ///
717    /// ## Equivalent To
718    ///
719    /// ```
720    ///     use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_create_task_args, rumtk_create_task, rumtk_spawn_task};
721    ///     use rumtk_core::core::RUMResult;
722    ///     use rumtk_core::threading::threading_manager::SafeTaskArgs;
723    ///
724    ///     async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
725    ///         let mut result = Vec::<i32>::new();
726    ///         for arg in args.read().await.iter() {
727    ///             result.push(*arg);
728    ///         }
729    ///         Ok(result)
730    ///     }
731    ///
732    ///     let rt = rumtk_init_threads!();
733    ///     let args = rumtk_create_task_args!(1);
734    ///     let task = rumtk_create_task!(test, args);
735    ///     let result = rumtk_resolve_task!(&rt, rumtk_spawn_task!(&rt, task));
736    /// ```
737    ///
738    #[macro_export]
739    macro_rules! rumtk_exec_task {
740        ($func:expr ) => {{
741            use tokio::sync::RwLock;
742            use $crate::{
743                rumtk_create_task, rumtk_create_task_args, rumtk_init_threads, rumtk_resolve_task,
744            };
745            let rt = rumtk_init_threads!();
746            let task = rumtk_create_task!($func);
747            rumtk_resolve_task!(&rt, task)
748        }};
749        ($func:expr, $args:expr ) => {{
750            use tokio::sync::RwLock;
751            use $crate::{
752                rumtk_create_task, rumtk_create_task_args, rumtk_init_threads, rumtk_resolve_task,
753            };
754            let rt = rumtk_init_threads!();
755            let args = SafeTaskArgs::new(RwLock::new($args));
756            let task = rumtk_create_task!($func, args);
757            rumtk_resolve_task!(&rt, task)
758        }};
759        ($func:expr, $args:expr , $threads:expr ) => {{
760            use tokio::sync::RwLock;
761            use $crate::{
762                rumtk_create_task, rumtk_create_task_args, rumtk_init_threads, rumtk_resolve_task,
763            };
764            let rt = rumtk_init_threads!(&$threads);
765            let args = SafeTaskArgs::new(RwLock::new($args));
766            let task = rumtk_create_task!($func, args);
767            rumtk_resolve_task!(&rt, task)
768        }};
769    }
770
771    ///
772    /// Sleep a duration of time in a sync context, so no await can be call on the result.
773    ///
774    /// You can pass any value that can be cast to f32.
775    ///
776    /// The precision is up to nanoseconds and it is depicted by the number of decimal places.
777    ///
778    /// ## Examples
779    ///
780    /// ```
781    ///     use rumtk_core::rumtk_sleep;
782    ///     rumtk_sleep!(1);           // Sleeps for 1 second.
783    ///     rumtk_sleep!(0.001);       // Sleeps for 1 millisecond
784    ///     rumtk_sleep!(0.000001);    // Sleeps for 1 microsecond
785    ///     rumtk_sleep!(0.000000001); // Sleeps for 1 nanosecond
786    /// ```
787    ///
788    #[macro_export]
789    macro_rules! rumtk_sleep {
790        ( $dur:expr) => {{
791            use $crate::threading::threading_functions::sleep;
792            sleep($dur as f32)
793        }};
794    }
795
796    ///
797    /// Sleep for some duration of time in an async context. Meaning, we can be awaited.
798    ///
799    /// You can pass any value that can be cast to f32.
800    ///
801    /// The precision is up to nanoseconds and it is depicted by the number of decimal places.
802    ///
803    /// ## Examples
804    ///
805    /// ```
806    ///     use rumtk_core::{rumtk_async_sleep, rumtk_exec_task};
807    ///     use rumtk_core::core::RUMResult;
808    ///     rumtk_exec_task!( async || -> RUMResult<()> {
809    ///             rumtk_async_sleep!(1).await;           // Sleeps for 1 second.
810    ///             rumtk_async_sleep!(0.001).await;       // Sleeps for 1 millisecond
811    ///             rumtk_async_sleep!(0.000001).await;    // Sleeps for 1 microsecond
812    ///             rumtk_async_sleep!(0.000000001).await; // Sleeps for 1 nanosecond
813    ///             Ok(())
814    ///         }
815    ///     );
816    /// ```
817    ///
818    #[macro_export]
819    macro_rules! rumtk_async_sleep {
820        ( $dur:expr) => {{
821            use $crate::threading::threading_functions::async_sleep;
822            async_sleep($dur as f32)
823        }};
824    }
825
826    ///
827    ///
828    ///
829    #[macro_export]
830    macro_rules! rumtk_new_task_queue {
831        ( $worker_num:expr ) => {{
832            use $crate::threading::threading_manager::TaskManager;
833            TaskManager::new($worker_num);
834        }};
835    }
836}