rumtk_core/
threading.rs

1/*
2 * rumtk attempts to implement HL7 and medical protocols for interoperability in medicine.
3 * This toolkit aims to be reliable, simple, performant, and standards compliant.
4 * Copyright (C) 2025  Luis M. Santos, M.D.
5 * Copyright (C) 2025  MedicalMasses L.L.C.
6 *
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
20 */
21
22///
23/// This module provides all the primitives needed to build a multithreaded application.
24///
25pub mod thread_primitives {
26    use crate::cache::{new_cache, LazyRUMCache};
27    use std::sync::Arc;
28    use tokio::runtime::Runtime as TokioRuntime;
29    /**************************** Globals **************************************/
30    pub static mut RT_CACHE: TokioRtCache = new_cache();
31    /**************************** Helpers ***************************************/
32    pub fn init_cache(threads: &usize) -> SafeTokioRuntime {
33        let mut builder = tokio::runtime::Builder::new_multi_thread();
34        builder.worker_threads(*threads);
35        builder.enable_all();
36        match builder.build() {
37            Ok(handle) => Arc::new(handle),
38            Err(e) => panic!(
39                "Unable to initialize threading tokio runtime because {}!",
40                &e
41            ),
42        }
43    }
44
45    /**************************** Types ***************************************/
46    pub type SafeTokioRuntime = Arc<TokioRuntime>;
47    pub type TokioRtCache = LazyRUMCache<usize, SafeTokioRuntime>;
48}
49
50pub mod threading_manager {
51    use crate::cache::LazyRUMCacheValue;
52    use crate::core::{RUMResult, RUMVec};
53    use crate::strings::rumtk_format;
54    use crate::threading::thread_primitives::SafeTokioRuntime;
55    use crate::threading::threading_functions::async_sleep;
56    use crate::types::{RUMHashMap, RUMID};
57    use crate::{rumtk_init_threads, rumtk_resolve_task, rumtk_spawn_task, threading};
58    use std::future::Future;
59    use std::sync::Arc;
60    use tokio::sync::RwLock;
61    use tokio::task::JoinHandle;
62
    /// Polling interval handed to `async_sleep` while waiting on a task. The value is expressed
    /// in seconds (0.001 s, i.e. one millisecond).
    const DEFAULT_SLEEP_DURATION: f32 = 0.001f32;
    /// Initial capacity reserved for the task table created by `TaskManager::new`.
    const DEFAULT_TASK_CAPACITY: usize = 1024;

    /// Generic vector of task-related items (arguments or results).
    pub type TaskItems<T> = RUMVec<T>;
    /// This type aliases a vector of T elements that will be used for passing arguments to the task processor.
    pub type TaskArgs<T> = TaskItems<T>;
    /// Task arguments shared through an [`Arc`] and guarded by an async [`RwLock`].
    pub type SafeTaskArgs<T> = Arc<RwLock<TaskItems<T>>>;
    /// Join handle of a spawned task that resolves to a [`TaskResult`].
    pub type AsyncTaskHandle<R> = JoinHandle<TaskResult<R>>;
    /// Collection of [`AsyncTaskHandle`]s.
    pub type AsyncTaskHandles<R> = Vec<AsyncTaskHandle<R>>;
    // Function signature defining the interface of task processing logic (currently disabled).
    //pub type TaskProcessor<T, R, Fut: Future<Output = TaskResult<R>>> = impl FnOnce(&SafeTaskArgs<T>) -> Fut;
    /// Identifier assigned to every task registered with the manager.
    pub type TaskID = RUMID;
75
    /// Bookkeeping record for a single task tracked by the task manager.
    #[derive(Debug, Clone, Default)]
    pub struct Task<R> {
        /// Identifier under which the record is stored in the task table.
        pub id: TaskID,
        /// Set to `true` by the spawned wrapper once the task's future has resolved.
        pub finished: bool,
        /// Output of the task's future; `None` until the wrapper stores it.
        pub result: Option<R>,
    }
82
    /// Shareable snapshot of a [`Task`] handed back to callers.
    pub type SafeTask<R> = Arc<Task<R>>;
    /// Lock-guarded task record kept in the table and mutated by the spawned wrapper.
    type SafeInternalTask<R> = Arc<RwLock<Task<R>>>;
    /// Table mapping each [`TaskID`] to its internal record.
    pub type TaskTable<R> = RUMHashMap<TaskID, SafeInternalTask<R>>;
    /// [`TaskTable`] shared through an [`Arc`] and guarded by an async [`RwLock`].
    pub type SafeAsyncTaskTable<R> = Arc<RwLock<TaskTable<R>>>;
    /// List of task ids that are waited on together.
    pub type TaskBatch = RUMVec<TaskID>;
    /// Type to use to define how task results are expected to be returned.
    pub type TaskResult<R> = RUMResult<SafeTask<R>>;
    /// Collection of [`TaskResult`]s.
    pub type TaskResults<R> = TaskItems<TaskResult<R>>;
    /// NOTE(review): presumably the value type handed out by the runtime cache for a
    /// [`SafeTokioRuntime`]; confirm against `crate::cache`.
    pub type TaskRuntime = LazyRUMCacheValue<SafeTokioRuntime>;
92
93    ///
94    /// Manages asynchronous tasks submitted as micro jobs from synchronous code. This type essentially
95    /// gives the multithreading, asynchronous superpowers to synchronous logic.
96    ///
97    /// ## Example Usage
98    ///
99    /// ```
100    /// use std::sync::{Arc};
101    /// use tokio::sync::RwLock as AsyncRwLock;
102    /// use rumtk_core::core::RUMResult;
103    /// use rumtk_core::strings::RUMString;
104    /// use rumtk_core::threading::threading_manager::{SafeTaskArgs, TaskItems, TaskManager};
105    /// use rumtk_core::{rumtk_create_task, };
106    ///
107    /// let expected = vec![
108    ///     RUMString::from("Hello"),
109    ///     RUMString::from("World!"),
110    ///     RUMString::from("Overcast"),
111    ///     RUMString::from("and"),
112    ///     RUMString::from("Sad"),
113    ///  ];
114    ///
115    /// type TestResult = RUMResult<Vec<RUMString>>;
116    /// let mut queue: TaskManager<TestResult> = TaskManager::new(&5).unwrap();
117    ///
118    /// let locked_args = AsyncRwLock::new(expected.clone());
119    /// let task_args = SafeTaskArgs::<RUMString>::new(locked_args);
120    /// let processor = rumtk_create_task!(
121    ///     async |args: &SafeTaskArgs<RUMString>| -> TestResult {
122    ///         let owned_args = Arc::clone(args);
123    ///         let locked_args = owned_args.read().await;
124    ///         let mut results = TaskItems::<RUMString>::with_capacity(locked_args.len());
125    ///
126    ///         for arg in locked_args.iter() {
127    ///             results.push(RUMString::new(arg));
128    ///         }
129    ///
130    ///         Ok(results)
131    ///     },
132    ///     task_args
133    /// );
134    ///
135    /// queue.add_task::<_>(processor);
136    /// let results = queue.wait();
137    ///
138    /// let mut result_data = Vec::<RUMString>::with_capacity(5);
139    /// for r in results {
140    ///     for v in r.unwrap().result.clone().unwrap().iter() {
141    ///         for value in v.iter() {
142    ///             result_data.push(value.clone());
143    ///         }
144    ///     }
145    ///  }
146    ///
147    /// assert_eq!(result_data, expected, "Results do not match expected!");
148    ///
149    /// ```
150    ///
    #[derive(Debug, Clone, Default)]
    pub struct TaskManager<R> {
        // Shared table holding every registered task that has not yet been consumed by a wait call.
        tasks: SafeAsyncTaskTable<R>,
        // Thread count passed to `rumtk_init_threads!` whenever a method needs the tokio runtime.
        workers: usize,
    }
156
157    impl<R> TaskManager<R>
158    where
159        R: Sync + Send + Clone + 'static,
160    {
161        ///
162        /// This method creates a [`TaskQueue`] instance using sensible defaults.
163        ///
164        /// The `threads` field is computed from the number of cores present in system.
165        ///
166        pub fn default() -> RUMResult<TaskManager<R>> {
167            Self::new(&threading::threading_functions::get_default_system_thread_count())
168        }
169
170        ///
171        /// Creates an instance of [`ThreadedTaskQueue<T, R>`] in the form of [`SafeThreadedTaskQueue<T, R>`].
172        /// Expects you to provide the count of threads to spawn and the microtask queue size
173        /// allocated by each thread.
174        ///
175        /// This method calls [`Self::with_capacity()`] for the actual object creation.
176        /// The main queue capacity is pre-allocated to [`DEFAULT_QUEUE_CAPACITY`].
177        ///
178        pub fn new(worker_num: &usize) -> RUMResult<TaskManager<R>> {
179            let tasks = SafeAsyncTaskTable::<R>::new(RwLock::new(TaskTable::with_capacity(
180                DEFAULT_TASK_CAPACITY,
181            )));
182            Ok(TaskManager::<R> {
183                tasks,
184                workers: worker_num.to_owned(),
185            })
186        }
187
188        ///
189        /// Add a task to the processing queue. The idea is that you can queue a processor function
190        /// and list of args that will be picked up by one of the threads for processing.
191        ///
192        /// This is the async counterpart
193        ///
194        pub async fn add_task_async<F>(&mut self, task: F) -> TaskID
195        where
196            F: Future<Output = R> + Send + Sync + 'static,
197            F::Output: Send + Sized + 'static,
198        {
199            let id = TaskID::new_v4();
200            Self::_add_task_async(id.clone(), self.tasks.clone(), task).await
201        }
202
203        ///
204        /// See [add_task](Self::add_task)
205        ///
        /// Unlike `add_task`, this method does not block, which is key to avoid panicking
        /// the tokio runtime when adding a task to the queue from a normal function that is called
        /// from an async environment.
209        ///
210        /// ## Example
211        ///
212        /// ```
213        /// use rumtk_core::threading::threading_manager::{TaskManager};
214        /// use rumtk_core::{rumtk_init_threads, strings::rumtk_format};
215        /// use std::sync::{Arc};
216        /// use tokio::sync::Mutex;
217        ///
218        /// type JobManager = Arc<Mutex<TaskManager<usize>>>;
219        ///
220        /// async fn called_fn() -> usize {
221        ///     5
222        /// }
223        ///
224        /// fn push_job(manager: &mut TaskManager<usize>) -> usize {
225        ///     manager.spawn_task(called_fn());
226        ///     1
227        /// }
228        ///
229        /// async fn call_sync_fn(mut manager: JobManager) -> usize {
230        ///     let mut owned = manager.lock().await;
231        ///     push_job(&mut owned)
232        /// }
233        ///
234        /// let workers = 5;
235        /// let rt = rumtk_init_threads!(&workers);
236        /// let mut manager = Arc::new(Mutex::new(TaskManager::new(&workers).unwrap()));
237        ///
238        /// manager.blocking_lock().spawn_task(call_sync_fn(manager.clone()));
239        ///
240        /// let result_raw = manager.blocking_lock().wait();
241        ///
242        /// ```
243        ///
244        pub fn spawn_task<F>(&mut self, task: F) -> TaskID
245        where
246            F: Future<Output = R> + Send + Sync + 'static,
247            F::Output: Send + Sized + 'static,
248        {
249            let id = TaskID::new_v4();
250            let rt = rumtk_init_threads!(&self.workers);
251            rumtk_spawn_task!(
252                rt,
253                Self::_add_task_async(id.clone(), self.tasks.clone(), task)
254            );
255            id
256        }
257
258        ///
259        /// See [add_task](Self::add_task)
260        ///
261        pub fn add_task<F>(&mut self, task: F) -> TaskID
262        where
263            F: Future<Output = R> + Send + Sync + 'static,
264            F::Output: Send + Sized + 'static,
265        {
266            let rt = rumtk_init_threads!(&self.workers);
267            rumtk_resolve_task!(rt, self.add_task_async(task))
268        }
269
270        async fn _add_task_async<F>(id: TaskID, tasks: SafeAsyncTaskTable<R>, task: F) -> TaskID
271        where
272            F: Future<Output = R> + Send + Sync + 'static,
273            F::Output: Send + Sized + 'static,
274        {
275            let mut safe_task = Arc::new(RwLock::new(Task::<R> {
276                id: id.clone(),
277                finished: false,
278                result: None,
279            }));
280            tasks.write().await.insert(id.clone(), safe_task.clone());
281
282            let task_wrapper = async move || {
283                // Run the task
284                let result = task.await;
285
286                // Cleanup task
287                safe_task.write().await.result = Some(result);
288                safe_task.write().await.finished = true;
289            };
290
291            tokio::spawn(task_wrapper());
292
293            id
294        }
295
296        ///
297        /// See [wait_async](Self::wait_async)
298        ///
299        pub fn wait(&mut self) -> TaskResults<R> {
300            let rt = rumtk_init_threads!(&self.workers);
301            rumtk_resolve_task!(rt, self.wait_async())
302        }
303
304        ///
305        /// See [wait_on_batch_async](Self::wait_on_batch_async)
306        ///
307        pub fn wait_on_batch(&mut self, tasks: &TaskBatch) -> TaskResults<R> {
308            let rt = rumtk_init_threads!(&self.workers);
309            rumtk_resolve_task!(rt, self.wait_on_batch_async(&tasks))
310        }
311
312        ///
313        /// See [wait_on_async](Self::wait_on_async)
314        ///
315        pub fn wait_on(&mut self, task_id: &TaskID) -> TaskResult<R> {
316            let rt = rumtk_init_threads!(&self.workers);
317            rumtk_resolve_task!(rt, self.wait_on_async(&task_id))
318        }
319
320        ///
321        /// This method waits until a queued task with [TaskID](TaskID) has been processed from the main queue.
322        ///
323        /// We poll the status of the task every [DEFAULT_SLEEP_DURATION](DEFAULT_SLEEP_DURATION) ms.
324        ///
325        /// Upon completion,
326        ///
327        /// 2. Return the result ([TaskResults<R>](TaskResults)).
328        ///
329        /// This operation consumes the task.
330        ///
331        /// ### Note:
332        /// ```text
333        ///     Results returned here are not guaranteed to be in the same order as the order in which
334        ///     the tasks were queued for work. You will need to pass a type as T that automatically
335        ///     tracks its own id or has a way for you to resort results.
336        /// ```
337        pub async fn wait_on_async(&mut self, task_id: &TaskID) -> TaskResult<R> {
338            let task = match self.tasks.write().await.remove(task_id) {
339                Some(task) => task.clone(),
340                None => return Err(rumtk_format!("No task with id {}", task_id)),
341            };
342
343            while !task.read().await.finished {
344                async_sleep(DEFAULT_SLEEP_DURATION).await;
345            }
346
347            let x = Ok(Arc::new(task.write().await.clone()));
348            x
349        }
350
351        ///
352        /// This method waits until a set of queued tasks with [TaskID](TaskID) has been processed from the main queue.
353        ///
354        /// We poll the status of the task every [DEFAULT_SLEEP_DURATION](DEFAULT_SLEEP_DURATION) ms.
355        ///
356        /// Upon completion,
357        ///
358        /// 1. We collect the results generated (if any).
359        /// 2. Return the list of results ([TaskResults<R>](TaskResults)).
360        ///
361        /// ### Note:
362        /// ```text
363        ///     Results returned here are not guaranteed to be in the same order as the order in which
364        ///     the tasks were queued for work. You will need to pass a type as T that automatically
365        ///     tracks its own id or has a way for you to resort results.
366        /// ```
367        pub async fn wait_on_batch_async(&mut self, tasks: &TaskBatch) -> TaskResults<R> {
368            let mut results = TaskResults::<R>::default();
369            for task in tasks {
370                results.push(self.wait_on_async(task).await);
371            }
372            results
373        }
374
375        ///
376        /// This method waits until all queued tasks have been processed from the main queue.
377        ///
378        /// We poll the status of the main queue every [DEFAULT_SLEEP_DURATION](DEFAULT_SLEEP_DURATION) ms.
379        ///
380        /// Upon completion,
381        ///
382        /// 1. We collect the results generated (if any).
383        /// 2. We reset the main task and result internal queue states.
384        /// 3. Return the list of results ([TaskResults<R>](TaskResults)).
385        ///
386        /// This operation consumes all the tasks.
387        ///
388        /// ### Note:
389        /// ```text
390        ///     Results returned here are not guaranteed to be in the same order as the order in which
391        ///     the tasks were queued for work. You will need to pass a type as T that automatically
392        ///     tracks its own id or has a way for you to resort results.
393        /// ```
394        pub async fn wait_async(&mut self) -> TaskResults<R> {
395            let task_batch = self.tasks.read().await.keys().cloned().collect::<Vec<_>>();
396            self.wait_on_batch_async(&task_batch).await
397        }
398
399        ///
400        /// Check if all work has been completed from the task queue.
401        ///
402        /// ## Examples
403        ///
404        /// ### Sync Usage
405        ///
406        ///```
407        /// use rumtk_core::threading::threading_manager::TaskManager;
408        ///
409        /// let manager = TaskManager::<usize>::new(&4).unwrap();
410        ///
411        /// let all_done = manager.is_all_completed();
412        ///
413        /// assert_eq!(all_done, true, "Empty TaskManager reports tasks are not completed!");
414        ///
415        /// ```
416        ///
417        pub fn is_all_completed(&self) -> bool {
418            let rt = rumtk_init_threads!(&self.workers);
419            rumtk_resolve_task!(rt, TaskManager::<R>::is_all_completed_async(self))
420        }
421
422        pub async fn is_all_completed_async(&self) -> bool {
423            for (_, task) in self.tasks.read().await.iter() {
424                if !task.read().await.finished {
425                    return false;
426                }
427            }
428
429            true
430        }
431
432        ///
433        /// Check if a task completed
434        ///
435        pub fn is_finished(&self, id: &TaskID) -> bool {
436            match self.tasks.blocking_read().get(id) {
437                Some(t) => t.blocking_read().finished,
438                None => false,
439            }
440        }
441
442        pub async fn is_finished_async(&self, id: &TaskID) -> bool {
443            match self.tasks.read().await.get(id) {
444                Some(task) => task.read().await.finished,
445                None => true,
446            }
447        }
448
449        ///
450        /// Alias for [wait](TaskManager::wait).
451        ///
452        fn gather(&mut self) -> TaskResults<R> {
453            self.wait()
454        }
455    }
456}
457
458///
/// This module contains a few helpers.
460///
461/// For example, you can find a function for determining number of threads available in system.
462/// The sleep family of functions are also here.
463///
464pub mod threading_functions {
465    use num_cpus;
466    use std::thread::{available_parallelism, sleep as std_sleep};
467    use std::time::Duration;
468    use tokio::time::sleep as tokio_sleep;
469
    /// Number of nanoseconds in one second.
    pub const NANOS_PER_SEC: u64 = 1000000000;
    /// Number of milliseconds in one second.
    pub const MILLIS_PER_SEC: u64 = 1000;
    /// Number of microseconds in one second.
    pub const MICROS_PER_SEC: u64 = 1000000;
473
474    pub fn get_default_system_thread_count() -> usize {
475        let cpus: usize = num_cpus::get();
476        let parallelism = match available_parallelism() {
477            Ok(n) => n.get(),
478            Err(_) => 0,
479        };
480
481        if parallelism >= cpus {
482            parallelism
483        } else {
484            cpus
485        }
486    }
487
488    pub fn sleep(s: f32) {
489        let ns = s * NANOS_PER_SEC as f32;
490        let rounded_ns = ns.round() as u64;
491        let duration = Duration::from_nanos(rounded_ns);
492        std_sleep(duration);
493    }
494
495    pub async fn async_sleep(s: f32) {
496        let ns = s * NANOS_PER_SEC as f32;
497        let rounded_ns = ns.round() as u64;
498        let duration = Duration::from_nanos(rounded_ns);
499        tokio_sleep(duration).await;
500    }
501}
502
503///
504/// Main API for interacting with the threading back end. Remember, we use tokio as our executor.
505/// This means that by default, all jobs sent to the thread pool have to be async in nature.
506/// These macros make handling of these jobs at the sync/async boundary more convenient.
507///
508pub mod threading_macros {
509    use crate::threading::thread_primitives;
510    use crate::threading::threading_manager::SafeTaskArgs;
511
512    ///
513    /// First, let's make sure we have *tokio* initialized at least once. The runtime created here
514    /// will be saved to the global context so the next call to this macro will simply grab a
515    /// reference to the previously initialized runtime.
516    ///
517    /// Passing nothing will default to initializing a runtime using the default number of threads
518    /// for this system. This is typically equivalent to number of cores/threads for your CPU.
519    ///
520    /// Passing `threads` number will yield a runtime that allocates that many threads.
521    ///
522    ///
523    /// ## Examples
524    ///
525    /// ```
526    ///     use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_create_task_args, rumtk_create_task, rumtk_spawn_task};
527    ///     use rumtk_core::core::RUMResult;
528    ///     use rumtk_core::threading::threading_manager::SafeTaskArgs;
529    ///
530    ///     async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
531    ///         let mut result = Vec::<i32>::new();
532    ///         for arg in args.read().await.iter() {
533    ///             result.push(*arg);
534    ///         }
535    ///         Ok(result)
536    ///     }
537    ///
538    ///     let rt = rumtk_init_threads!();                                      // Creates runtime instance
539    ///     let args = rumtk_create_task_args!(1);                               // Creates a vector of i32s
540    ///     let task = rumtk_create_task!(test, args);                           // Creates a standard task which consists of a function or closure accepting a Vec<T>
541    ///     let result = rumtk_resolve_task!(&rt, rumtk_spawn_task!(&rt, task)); // Spawn's task and waits for it to conclude.
542    /// ```
543    ///
544    /// ```
545    ///     use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_create_task_args, rumtk_create_task, rumtk_spawn_task};
546    ///     use rumtk_core::core::RUMResult;
547    ///     use rumtk_core::threading::threading_manager::SafeTaskArgs;
548    ///
549    ///     async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
550    ///         let mut result = Vec::<i32>::new();
551    ///         for arg in args.read().await.iter() {
552    ///             result.push(*arg);
553    ///         }
554    ///         Ok(result)
555    ///     }
556    ///
557    ///     let thread_count: usize = 10;
558    ///     let rt = rumtk_init_threads!(&thread_count);
559    ///     let args = rumtk_create_task_args!(1);
560    ///     let task = rumtk_create_task!(test, args);
561    ///     let result = rumtk_resolve_task!(&rt, rumtk_spawn_task!(&rt, task));
562    /// ```
563    #[macro_export]
564    macro_rules! rumtk_init_threads {
565        ( ) => {{
566            use $crate::rumtk_cache_fetch;
567            use $crate::threading::thread_primitives::{init_cache, RT_CACHE};
568            use $crate::threading::threading_functions::get_default_system_thread_count;
569            let rt = rumtk_cache_fetch!(
570                &mut RT_CACHE,
571                &get_default_system_thread_count(),
572                init_cache
573            );
574            rt
575        }};
576        ( $threads:expr ) => {{
577            use $crate::rumtk_cache_fetch;
578            use $crate::threading::thread_primitives::{init_cache, RT_CACHE};
579            let rt = rumtk_cache_fetch!(&raw mut RT_CACHE, $threads, init_cache);
580            rt
581        }};
582    }
583
584    ///
585    /// Puts task onto the runtime queue.
586    ///
587    /// The parameters to this macro are a reference to the runtime (`rt`) and a future (`func`).
588    ///
589    /// The return is a [thread_primitives::JoinHandle<T>] instance. If the task was a standard
590    /// framework task, you will get [thread_primitives::AsyncTaskHandle] instead.
591    ///
592    #[macro_export]
593    macro_rules! rumtk_spawn_task {
594        ( $func:expr ) => {{
595            let rt = rumtk_init_threads!();
596            rt.spawn($func)
597        }};
598        ( $rt:expr, $func:expr ) => {{
599            $rt.spawn($func)
600        }};
601    }
602
603    ///
604    /// Using the initialized runtime, wait for the future to resolve in a thread blocking manner!
605    ///
606    /// If you pass a reference to the runtime (`rt`) and an async closure (`func`), we await the
607    /// async closure without passing any arguments.
608    ///
609    /// You can pass a third argument to this macro in the form of any number of arguments (`arg_item`).
610    /// In such a case, we pass those arguments to the call on the async closure and await on results.
611    ///
612    #[macro_export]
613    macro_rules! rumtk_wait_on_task {
614        ( $rt:expr, $func:expr ) => {{
615            $rt.block_on(async move {
616                $func().await
617            })
618        }};
619        ( $rt:expr, $func:expr, $($arg_items:expr),+ ) => {{
620            $rt.block_on(async move {
621                $func($($arg_items),+).await
622            })
623        }};
624    }
625
626    ///
627    /// This macro awaits a future.
628    ///
    /// The arguments are a reference to the runtime (`rt`) and a future.
630    ///
631    /// If there is a result, you will get the result of the future.
632    ///
633    /// ## Examples
634    ///
635    /// ```
636    ///     use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_create_task_args, rumtk_create_task, rumtk_spawn_task};
637    ///     use rumtk_core::core::RUMResult;
638    ///     use rumtk_core::threading::threading_manager::SafeTaskArgs;
639    ///
640    ///     async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
641    ///         let mut result = Vec::<i32>::new();
642    ///         for arg in args.read().await.iter() {
643    ///             result.push(*arg);
644    ///         }
645    ///         Ok(result)
646    ///     }
647    ///
648    ///     let rt = rumtk_init_threads!();
649    ///     let args = rumtk_create_task_args!(1);
650    ///     let task = rumtk_create_task!(test, args);
651    ///     let result = rumtk_resolve_task!(&rt, rumtk_spawn_task!(&rt, task));
652    /// ```
653    ///
    #[macro_export]
    macro_rules! rumtk_resolve_task {
        ( $rt:expr, $future:expr ) => {{
            // Fun tidbit: the expression rumtk_resolve_task!(&rt, rumtk_spawn_task!(&rt, task)), where
            // rt is the tokio runtime, would otherwise yield async move { { &rt.spawn(task) } }. In
            // that form the whole expression is moved into the async block and captured, so things
            // like mutex guards leave the outer scope and the expression fails to compile, even
            // though the intent is for rumtk_spawn_task to resolve first and only its result to be
            // moved into the async block. To ensure that happens regardless of the given expression,
            // the variable assignment below forces the "future" expression to be evaluated before
            // anything moves into the async block. DO NOT REMOVE OR "SIMPLIFY" THE let future = $future LINE!!!
            let future = $future;
            // Block the calling thread on the runtime until the already-evaluated future resolves.
            $rt.block_on(async move { future.await })
        }};
    }
669
670    #[macro_export]
671    macro_rules! rumtk_resolve_task_from_async {
672        ( $rt:expr, $future:expr ) => {{
673            let handle = $rt.spawn_blocking(async move { future.await })
674        }};
675    }
676
677    ///
678    /// This macro creates an async body that calls the async closure and awaits it.
679    ///
680    /// ## Example
681    ///
682    /// ```
683    /// use std::sync::{Arc, RwLock};
684    /// use tokio::sync::RwLock as AsyncRwLock;
685    /// use rumtk_core::strings::RUMString;
686    /// use rumtk_core::threading::threading_manager::{SafeTaskArgs, TaskItems};
687    ///
688    /// pub type SafeTaskArgs2<T> = Arc<RwLock<TaskItems<T>>>;
689    /// let expected = vec![
690    ///     RUMString::from("Hello"),
691    ///     RUMString::from("World!"),
692    ///     RUMString::from("Overcast"),
693    ///     RUMString::from("and"),
694    ///     RUMString::from("Sad"),
695    ///  ];
696    /// let locked_args = AsyncRwLock::new(expected.clone());
697    /// let task_args = SafeTaskArgs::<RUMString>::new(locked_args);
698    ///
699    ///
700    /// ```
701    ///
702    #[macro_export]
703    macro_rules! rumtk_create_task {
704        ( $func:expr ) => {{
705            async move {
706                let f = $func;
707                f().await
708            }
709        }};
710        ( $func:expr, $args:expr ) => {{
711            let f = $func;
712            async move { f(&$args).await }
713        }};
714    }
715
716    ///
717    /// Creates an instance of [SafeTaskArgs] with the arguments passed.
718    ///
719    /// ## Note
720    ///
721    /// All arguments must be of the same type
722    ///
723    #[macro_export]
724    macro_rules! rumtk_create_task_args {
725        ( ) => {{
726            use $crate::threading::threading_manager::{TaskArgs, SafeTaskArgs, TaskItems};
727            use tokio::sync::RwLock;
728            SafeTaskArgs::new(RwLock::new(vec![]))
729        }};
730        ( $($args:expr),+ ) => {{
731            use $crate::threading::threading_manager::{SafeTaskArgs};
732            use tokio::sync::RwLock;
733            SafeTaskArgs::new(RwLock::new(vec![$($args),+]))
734        }};
735    }
736
737    ///
738    /// Convenience macro for packaging the task components and launching the task in one line.
739    ///
740    /// One of the advantages is that you can generate a new `tokio` runtime by specifying the
741    /// number of threads at the end. This is optional. Meaning, we will default to the system's
742    /// number of threads if that value is not specified.
743    ///
744    /// Between the `func` parameter and the optional `threads` parameter, you can specify a
745    /// variable number of arguments to pass to the task. each argument must be of the same type.
746    /// If you wish to pass different arguments with different types, please define an abstract type
747    /// whose underlying structure is a tuple of items and pass that instead.
748    ///
749    /// ## Examples
750    ///
751    /// ### With Default Thread Count
752    /// ```
753    ///     use rumtk_core::{rumtk_exec_task};
754    ///     use rumtk_core::core::RUMResult;
755    ///     use rumtk_core::threading::threading_manager::SafeTaskArgs;
756    ///
757    ///     async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
758    ///         let mut result = Vec::<i32>::new();
759    ///         for arg in args.read().await.iter() {
760    ///             result.push(*arg);
761    ///         }
762    ///         Ok(result)
763    ///     }
764    ///
765    ///     let result = rumtk_exec_task!(test, vec![5]);
766    ///     assert_eq!(&result.clone().unwrap(), &vec![5], "Results mismatch");
767    ///     assert_ne!(&result.clone().unwrap(), &vec![5, 10], "Results do not mismatch as expected!");
768    /// ```
769    ///
770    /// ### With Custom Thread Count
771    /// ```
772    ///     use rumtk_core::{rumtk_exec_task};
773    ///     use rumtk_core::core::RUMResult;
774    ///     use rumtk_core::threading::threading_manager::SafeTaskArgs;
775    ///
776    ///     async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
777    ///         let mut result = Vec::<i32>::new();
778    ///         for arg in args.read().await.iter() {
779    ///             result.push(*arg);
780    ///         }
781    ///         Ok(result)
782    ///     }
783    ///
784    ///     let result = rumtk_exec_task!(test, vec![5], 5);
785    ///     assert_eq!(&result.clone().unwrap(), &vec![5], "Results mismatch");
786    ///     assert_ne!(&result.clone().unwrap(), &vec![5, 10], "Results do not mismatch as expected!");
787    /// ```
788    ///
789    /// ### With Async Function Body
790    /// ```
791    ///     use rumtk_core::{rumtk_exec_task};
792    ///     use rumtk_core::core::RUMResult;
793    ///     use rumtk_core::threading::threading_manager::SafeTaskArgs;
794    ///
795    ///     let result = rumtk_exec_task!(
796    ///     async move |args: &SafeTaskArgs<i32>| -> RUMResult<Vec<i32>> {
797    ///         let mut result = Vec::<i32>::new();
798    ///         for arg in args.read().await.iter() {
799    ///             result.push(*arg);
800    ///         }
801    ///         Ok(result)
802    ///     },
803    ///     vec![5]);
804    ///     assert_eq!(&result.clone().unwrap(), &vec![5], "Results mismatch");
805    ///     assert_ne!(&result.clone().unwrap(), &vec![5, 10], "Results do not mismatch as expected!");
806    /// ```
807    ///
808    /// ### With Async Function Body and No Args
809    /// ```
810    ///     use rumtk_core::{rumtk_exec_task};
811    ///     use rumtk_core::core::RUMResult;
812    ///     use rumtk_core::threading::threading_manager::SafeTaskArgs;
813    ///
814    ///     let result = rumtk_exec_task!(
815    ///     async || -> RUMResult<Vec<i32>> {
816    ///         let mut result = Vec::<i32>::new();
817    ///         Ok(result)
818    ///     });
819    ///     let empty = Vec::<i32>::new();
820    ///     assert_eq!(&result.clone().unwrap(), &empty, "Results mismatch");
821    ///     assert_ne!(&result.clone().unwrap(), &vec![5, 10], "Results do not mismatch as expected!");
822    /// ```
823    ///
824    /// ## Equivalent To
825    ///
826    /// ```
827    ///     use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_create_task_args, rumtk_create_task, rumtk_spawn_task};
828    ///     use rumtk_core::core::RUMResult;
829    ///     use rumtk_core::threading::threading_manager::SafeTaskArgs;
830    ///
831    ///     async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
832    ///         let mut result = Vec::<i32>::new();
833    ///         for arg in args.read().await.iter() {
834    ///             result.push(*arg);
835    ///         }
836    ///         Ok(result)
837    ///     }
838    ///
839    ///     let rt = rumtk_init_threads!();
840    ///     let args = rumtk_create_task_args!(1);
841    ///     let task = rumtk_create_task!(test, args);
842    ///     let result = rumtk_resolve_task!(&rt, rumtk_spawn_task!(&rt, task));
843    /// ```
844    ///
845    #[macro_export]
846    macro_rules! rumtk_exec_task {
847        ($func:expr ) => {{
848            use tokio::sync::RwLock;
849            use $crate::{
850                rumtk_create_task, rumtk_create_task_args, rumtk_init_threads, rumtk_resolve_task,
851            };
852            let rt = rumtk_init_threads!();
853            let task = rumtk_create_task!($func);
854            rumtk_resolve_task!(&rt, task)
855        }};
856        ($func:expr, $args:expr ) => {{
857            use tokio::sync::RwLock;
858            use $crate::{
859                rumtk_create_task, rumtk_create_task_args, rumtk_init_threads, rumtk_resolve_task,
860            };
861            let rt = rumtk_init_threads!();
862            let args = SafeTaskArgs::new(RwLock::new($args));
863            let task = rumtk_create_task!($func, args);
864            rumtk_resolve_task!(&rt, task)
865        }};
866        ($func:expr, $args:expr , $threads:expr ) => {{
867            use tokio::sync::RwLock;
868            use $crate::{
869                rumtk_create_task, rumtk_create_task_args, rumtk_init_threads, rumtk_resolve_task,
870            };
871            let rt = rumtk_init_threads!(&$threads);
872            let args = SafeTaskArgs::new(RwLock::new($args));
873            let task = rumtk_create_task!($func, args);
874            rumtk_resolve_task!(&rt, task)
875        }};
876    }
877
878    ///
879    /// Sleep a duration of time in a sync context, so no await can be call on the result.
880    ///
881    /// You can pass any value that can be cast to f32.
882    ///
883    /// The precision is up to nanoseconds and it is depicted by the number of decimal places.
884    ///
885    /// ## Examples
886    ///
887    /// ```
888    ///     use rumtk_core::rumtk_sleep;
889    ///     rumtk_sleep!(1);           // Sleeps for 1 second.
890    ///     rumtk_sleep!(0.001);       // Sleeps for 1 millisecond
891    ///     rumtk_sleep!(0.000001);    // Sleeps for 1 microsecond
892    ///     rumtk_sleep!(0.000000001); // Sleeps for 1 nanosecond
893    /// ```
894    ///
895    #[macro_export]
896    macro_rules! rumtk_sleep {
897        ( $dur:expr) => {{
898            use $crate::threading::threading_functions::sleep;
899            sleep($dur as f32)
900        }};
901    }
902
903    ///
904    /// Sleep for some duration of time in an async context. Meaning, we can be awaited.
905    ///
906    /// You can pass any value that can be cast to f32.
907    ///
908    /// The precision is up to nanoseconds and it is depicted by the number of decimal places.
909    ///
910    /// ## Examples
911    ///
912    /// ```
913    ///     use rumtk_core::{rumtk_async_sleep, rumtk_exec_task};
914    ///     use rumtk_core::core::RUMResult;
915    ///     rumtk_exec_task!( async || -> RUMResult<()> {
916    ///             rumtk_async_sleep!(1).await;           // Sleeps for 1 second.
917    ///             rumtk_async_sleep!(0.001).await;       // Sleeps for 1 millisecond
918    ///             rumtk_async_sleep!(0.000001).await;    // Sleeps for 1 microsecond
919    ///             rumtk_async_sleep!(0.000000001).await; // Sleeps for 1 nanosecond
920    ///             Ok(())
921    ///         }
922    ///     );
923    /// ```
924    ///
925    #[macro_export]
926    macro_rules! rumtk_async_sleep {
927        ( $dur:expr) => {{
928            use $crate::threading::threading_functions::async_sleep;
929            async_sleep($dur as f32)
930        }};
931    }
932
933    ///
934    ///
935    ///
936    #[macro_export]
937    macro_rules! rumtk_new_task_queue {
938        ( $worker_num:expr ) => {{
939            use $crate::threading::threading_manager::TaskManager;
940            TaskManager::new($worker_num);
941        }};
942    }
943}