Skip to main content

rumtk_core/
threading.rs

1/*
2 * rumtk attempts to implement HL7 and medical protocols for interoperability in medicine.
3 * This toolkit aims to be reliable, simple, performant, and standards compliant.
4 * Copyright (C) 2025  Luis M. Santos, M.D.
5 * Copyright (C) 2025  MedicalMasses L.L.C.
6 *
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
20 */
21
22///
23/// This module provides all the primitives needed to build a multithreaded application.
24///
25pub mod thread_primitives {
26    use crate::cache::{new_cache, LazyRUMCache};
27    use std::sync::Arc;
28    use tokio::runtime::Runtime as TokioRuntime;
29    /**************************** Globals **************************************/
30    pub static mut RT_CACHE: TokioRtCache = new_cache();
31    /**************************** Helpers ***************************************/
32    pub fn init_cache(threads: &usize) -> SafeTokioRuntime {
33        let mut builder = tokio::runtime::Builder::new_multi_thread();
34        builder.worker_threads(*threads);
35        builder.enable_all();
36        match builder.build() {
37            Ok(handle) => Arc::new(handle),
38            Err(e) => panic!(
39                "Unable to initialize threading tokio runtime because {}!",
40                &e
41            ),
42        }
43    }
44
45    /**************************** Types ***************************************/
46    pub type SafeTokioRuntime = Arc<TokioRuntime>;
47    pub type TokioRtCache = LazyRUMCache<usize, SafeTokioRuntime>;
48}
49
50pub mod threading_manager {
51    use crate::cache::LazyRUMCacheValue;
52    use crate::core::{RUMResult, RUMVec};
53    use crate::strings::rumtk_format;
54    use crate::threading::thread_primitives::SafeTokioRuntime;
55    use crate::threading::threading_functions::async_sleep;
56    use crate::types::{RUMHashMap, RUMID};
57    use crate::{rumtk_init_threads, rumtk_resolve_task, rumtk_spawn_task, threading};
58    use std::future::Future;
59    use std::sync::Arc;
60    use tokio::sync::RwLock;
61    use tokio::task::JoinHandle;
62
    /// Polling interval handed to `async_sleep` while waiting on a task. `async_sleep` interprets
    /// its argument as seconds, so this value amounts to one millisecond.
    const DEFAULT_SLEEP_DURATION: f32 = 0.001f32;
    /// Initial capacity reserved for a manager's task table.
    const DEFAULT_TASK_CAPACITY: usize = 1024;

    /// Generic list of items exchanged with tasks.
    pub type TaskItems<T> = RUMVec<T>;
    /// This type aliases a vector of T elements that will be used for passing arguments to the task processor.
    pub type TaskArgs<T> = TaskItems<T>;
    /// Task arguments wrapped in `Arc<RwLock<..>>` so they can be shared with, and read from, an
    /// asynchronous task.
    pub type SafeTaskArgs<T> = Arc<RwLock<TaskItems<T>>>;
    /// Join handle of a spawned task that resolves to a [`TaskResult`].
    pub type AsyncTaskHandle<R> = JoinHandle<TaskResult<R>>;
    /// Collection of [`AsyncTaskHandle`]s.
    pub type AsyncTaskHandles<R> = Vec<AsyncTaskHandle<R>>;
    // Disabled alias for the task-processor function signature; kept for reference.
    //pub type TaskProcessor<T, R, Fut: Future<Output = TaskResult<R>>> = impl FnOnce(&SafeTaskArgs<T>) -> Fut;
    /// Identifier assigned to every task registered with a [`TaskManager`].
    pub type TaskID = RUMID;
75
    /// Bookkeeping record for a single task tracked by [`TaskManager`].
    #[derive(Debug, Clone, Default)]
    pub struct Task<R> {
        /// Identifier under which the task is stored in the task table.
        pub id: TaskID,
        /// `false` when the task is registered; set to `true` once its future has resolved.
        pub finished: bool,
        /// `None` until the task's future resolves, then `Some` with the produced value.
        pub result: Option<R>,
    }
82
    /// Immutable, shareable snapshot of a [`Task`] as handed back to callers.
    pub type SafeTask<R> = Arc<Task<R>>;
    /// Internal, lock-protected task record shared between the task table and the running task.
    type SafeInternalTask<R> = Arc<RwLock<Task<R>>>;
    /// Map from [`TaskID`] to the task's internal record.
    pub type TaskTable<R> = RUMHashMap<TaskID, SafeInternalTask<R>>;
    /// [`TaskTable`] behind `Arc<RwLock<..>>` so it can be shared with asynchronous code.
    pub type SafeAsyncTaskTable<R> = Arc<RwLock<TaskTable<R>>>;
    /// List of task ids to wait on together.
    pub type TaskBatch = RUMVec<TaskID>;
    /// Type to use to define how task results are expected to be returned.
    pub type TaskResult<R> = RUMResult<SafeTask<R>>;
    /// List of [`TaskResult`]s, one per awaited task.
    pub type TaskResults<R> = TaskItems<TaskResult<R>>;
    /// Runtime handle as stored in the runtime cache.
    pub type TaskRuntime = LazyRUMCacheValue<SafeTokioRuntime>;
92
93    ///
94    /// Manages asynchronous tasks submitted as micro jobs from synchronous code. This type essentially
95    /// gives the multithreading, asynchronous superpowers to synchronous logic.
96    ///
97    /// ## Example Usage
98    ///
99    /// ```
100    /// use std::sync::{Arc};
101    /// use tokio::sync::RwLock as AsyncRwLock;
102    /// use rumtk_core::core::RUMResult;
103    /// use rumtk_core::strings::RUMString;
104    /// use rumtk_core::threading::threading_manager::{SafeTaskArgs, TaskItems, TaskManager};
105    /// use rumtk_core::{rumtk_create_task, };
106    ///
107    /// let expected = vec![
108    ///     RUMString::from("Hello"),
109    ///     RUMString::from("World!"),
110    ///     RUMString::from("Overcast"),
111    ///     RUMString::from("and"),
112    ///     RUMString::from("Sad"),
113    ///  ];
114    ///
115    /// type TestResult = RUMResult<Vec<RUMString>>;
116    /// let mut queue: TaskManager<TestResult> = TaskManager::new(&5).unwrap();
117    ///
118    /// let locked_args = AsyncRwLock::new(expected.clone());
119    /// let task_args = SafeTaskArgs::<RUMString>::new(locked_args);
120    /// let processor = rumtk_create_task!(
121    ///     async |args: &SafeTaskArgs<RUMString>| -> TestResult {
122    ///         let owned_args = Arc::clone(args);
123    ///         let locked_args = owned_args.read().await;
124    ///         let mut results = TaskItems::<RUMString>::with_capacity(locked_args.len());
125    ///
126    ///         for arg in locked_args.iter() {
127    ///             results.push(RUMString::new(arg));
128    ///         }
129    ///
130    ///         Ok(results)
131    ///     },
132    ///     task_args
133    /// );
134    ///
135    /// queue.add_task::<_>(processor);
136    /// let results = queue.wait();
137    ///
138    /// let mut result_data = Vec::<RUMString>::with_capacity(5);
139    /// for r in results {
140    ///     for v in r.unwrap().result.clone().unwrap().iter() {
141    ///         for value in v.iter() {
142    ///             result_data.push(value.clone());
143    ///         }
144    ///     }
145    ///  }
146    ///
147    /// assert_eq!(result_data, expected, "Results do not match expected!");
148    ///
149    /// ```
150    ///
    #[derive(Debug, Clone, Default)]
    pub struct TaskManager<R> {
        /// Shared table of registered tasks keyed by [`TaskID`]; entries are removed when a task
        /// is waited on.
        tasks: SafeAsyncTaskTable<R>,
        /// Thread count passed to `rumtk_init_threads!` whenever a blocking method needs the
        /// runtime.
        workers: usize,
    }
156
157    impl<R> TaskManager<R>
158    where
159        R: Sync + Send + Clone + 'static,
160    {
161        ///
162        /// This method creates a [`TaskQueue`] instance using sensible defaults.
163        ///
164        /// The `threads` field is computed from the number of cores present in system.
165        ///
166        pub fn default() -> RUMResult<TaskManager<R>> {
167            Self::new(&threading::threading_functions::get_default_system_thread_count())
168        }
169
170        ///
171        /// Creates an instance of [`ThreadedTaskQueue<T, R>`] in the form of [`SafeThreadedTaskQueue<T, R>`].
172        /// Expects you to provide the count of threads to spawn and the microtask queue size
173        /// allocated by each thread.
174        ///
175        /// This method calls [`Self::with_capacity()`] for the actual object creation.
176        /// The main queue capacity is pre-allocated to [`DEFAULT_QUEUE_CAPACITY`].
177        ///
178        pub fn new(worker_num: &usize) -> RUMResult<TaskManager<R>> {
179            let tasks = SafeAsyncTaskTable::<R>::new(RwLock::new(TaskTable::with_capacity(
180                DEFAULT_TASK_CAPACITY,
181            )));
182            Ok(TaskManager::<R> {
183                tasks,
184                workers: worker_num.to_owned(),
185            })
186        }
187
188        ///
189        /// Add a task to the processing queue. The idea is that you can queue a processor function
190        /// and list of args that will be picked up by one of the threads for processing.
191        ///
192        /// This is the async counterpart
193        ///
194        pub async fn add_task_async<F>(&mut self, task: F) -> TaskID
195        where
196            F: Future<Output = R> + Send + Sync + 'static,
197            F::Output: Send + Sized + 'static,
198        {
199            let id = TaskID::new_v4();
200            Self::_add_task_async(id.clone(), self.tasks.clone(), task).await
201        }
202
        ///
        /// Non-blocking variant of [add_task](Self::add_task).
        ///
        /// Unlike `add_task`, this method does not block, which is key to avoiding panicking
        /// the tokio runtime if trying to add a task to the queue from a normal function called
        /// from an async environment.
        ///
        /// The task id is generated up front and returned immediately; the registration itself
        /// (`_add_task_async`) is handed to the runtime as a spawned future.
        ///
        /// NOTE(review): because registration runs inside the spawned future, the returned id may
        /// not be present in the task table yet when this method returns, so a `wait_on` issued
        /// right away could report a missing task. Confirm callers tolerate this.
        ///
        /// ## Example
        ///
        /// ```
        /// use rumtk_core::threading::threading_manager::{TaskManager};
        /// use rumtk_core::{rumtk_init_threads, strings::rumtk_format};
        /// use std::sync::{Arc};
        /// use tokio::sync::Mutex;
        ///
        /// type JobManager = Arc<Mutex<TaskManager<usize>>>;
        ///
        /// async fn called_fn() -> usize {
        ///     5
        /// }
        ///
        /// fn push_job(manager: &mut TaskManager<usize>) -> usize {
        ///     manager.spawn_task(called_fn());
        ///     1
        /// }
        ///
        /// async fn call_sync_fn(mut manager: JobManager) -> usize {
        ///     let mut owned = manager.lock().await;
        ///     push_job(&mut owned)
        /// }
        ///
        /// let workers = 5;
        /// let rt = rumtk_init_threads!(&workers);
        /// let mut manager = Arc::new(Mutex::new(TaskManager::new(&workers).unwrap()));
        ///
        /// manager.blocking_lock().spawn_task(call_sync_fn(manager.clone()));
        ///
        /// let result_raw = manager.blocking_lock().wait();
        ///
        /// ```
        ///
        pub fn spawn_task<F>(&mut self, task: F) -> TaskID
        where
            F: Future<Output = R> + Send + Sync + 'static,
            F::Output: Send + Sized + 'static,
        {
            // The id is created here so it can be returned without waiting for registration.
            let id = TaskID::new_v4();
            let rt = rumtk_init_threads!(&self.workers);
            // Fire and forget: the join handle returned by the spawn is intentionally discarded.
            rumtk_spawn_task!(
                rt,
                Self::_add_task_async(id.clone(), self.tasks.clone(), task)
            );
            id
        }
257
        ///
        /// Blocking counterpart of [add_task_async](Self::add_task_async).
        ///
        /// Fetches the runtime for this manager's worker count and blocks the calling thread
        /// until the task has been registered and spawned, then returns its [`TaskID`].
        ///
        /// Use [spawn_task](Self::spawn_task) instead when calling from code that is already
        /// running inside the runtime.
        ///
        pub fn add_task<F>(&mut self, task: F) -> TaskID
        where
            F: Future<Output = R> + Send + Sync + 'static,
            F::Output: Send + Sized + 'static,
        {
            let rt = rumtk_init_threads!(&self.workers);
            rumtk_resolve_task!(rt, self.add_task_async(task))
        }
269
270        async fn _add_task_async<F>(id: TaskID, tasks: SafeAsyncTaskTable<R>, task: F) -> TaskID
271        where
272            F: Future<Output = R> + Send + Sync + 'static,
273            F::Output: Send + Sized + 'static,
274        {
275            let mut safe_task = Arc::new(RwLock::new(Task::<R> {
276                id: id.clone(),
277                finished: false,
278                result: None,
279            }));
280            tasks.write().await.insert(id.clone(), safe_task.clone());
281
282            let task_wrapper = async move || {
283                // Run the task
284                let result = task.await;
285
286                // Cleanup task
287                safe_task.write().await.result = Some(result);
288                safe_task.write().await.finished = true;
289            };
290
291            tokio::spawn(task_wrapper());
292
293            id
294        }
295
        ///
        /// Blocking counterpart of [wait_async](Self::wait_async).
        ///
        /// Blocks the calling thread on this manager's runtime until every registered task has
        /// finished and returns their results.
        ///
        pub fn wait(&mut self) -> TaskResults<R> {
            let rt = rumtk_init_threads!(&self.workers);
            rumtk_resolve_task!(rt, self.wait_async())
        }
303
        ///
        /// Blocking counterpart of [wait_on_batch_async](Self::wait_on_batch_async).
        ///
        /// Blocks the calling thread on this manager's runtime until every task listed in
        /// `tasks` has been waited on and returns one result per id.
        ///
        pub fn wait_on_batch(&mut self, tasks: &TaskBatch) -> TaskResults<R> {
            let rt = rumtk_init_threads!(&self.workers);
            rumtk_resolve_task!(rt, self.wait_on_batch_async(&tasks))
        }
311
        ///
        /// Blocking counterpart of [wait_on_async](Self::wait_on_async).
        ///
        /// Blocks the calling thread on this manager's runtime until the task identified by
        /// `task_id` has finished, then returns its result (or an error if the id is unknown).
        ///
        pub fn wait_on(&mut self, task_id: &TaskID) -> TaskResult<R> {
            let rt = rumtk_init_threads!(&self.workers);
            rumtk_resolve_task!(rt, self.wait_on_async(&task_id))
        }
319
320        ///
321        /// This method waits until a queued task with [TaskID](TaskID) has been processed from the main queue.
322        ///
323        /// We poll the status of the task every [DEFAULT_SLEEP_DURATION](DEFAULT_SLEEP_DURATION) ms.
324        ///
325        /// Upon completion,
326        ///
327        /// 2. Return the result ([TaskResults<R>](TaskResults)).
328        ///
329        /// This operation consumes the task.
330        ///
331        /// ### Note:
332        /// ```text
333        ///     Results returned here are not guaranteed to be in the same order as the order in which
334        ///     the tasks were queued for work. You will need to pass a type as T that automatically
335        ///     tracks its own id or has a way for you to resort results.
336        /// ```
337        pub async fn wait_on_async(&mut self, task_id: &TaskID) -> TaskResult<R> {
338            let task = match self.tasks.write().await.remove(task_id) {
339                Some(task) => task.clone(),
340                None => return Err(rumtk_format!("No task with id {}", task_id)),
341            };
342
343            while !task.read().await.finished {
344                async_sleep(DEFAULT_SLEEP_DURATION).await;
345            }
346
347            let x = Ok(Arc::new(task.write().await.clone()));
348            x
349        }
350
351        ///
352        /// This method waits until a set of queued tasks with [TaskID](TaskID) has been processed from the main queue.
353        ///
354        /// We poll the status of the task every [DEFAULT_SLEEP_DURATION](DEFAULT_SLEEP_DURATION) ms.
355        ///
356        /// Upon completion,
357        ///
358        /// 1. We collect the results generated (if any).
359        /// 2. Return the list of results ([TaskResults<R>](TaskResults)).
360        ///
361        /// ### Note:
362        /// ```text
363        ///     Results returned here are not guaranteed to be in the same order as the order in which
364        ///     the tasks were queued for work. You will need to pass a type as T that automatically
365        ///     tracks its own id or has a way for you to resort results.
366        /// ```
367        pub async fn wait_on_batch_async(&mut self, tasks: &TaskBatch) -> TaskResults<R> {
368            let mut results = TaskResults::<R>::default();
369            for task in tasks {
370                results.push(self.wait_on_async(task).await);
371            }
372            results
373        }
374
375        ///
376        /// This method waits until all queued tasks have been processed from the main queue.
377        ///
378        /// We poll the status of the main queue every [DEFAULT_SLEEP_DURATION](DEFAULT_SLEEP_DURATION) ms.
379        ///
380        /// Upon completion,
381        ///
382        /// 1. We collect the results generated (if any).
383        /// 2. We reset the main task and result internal queue states.
384        /// 3. Return the list of results ([TaskResults<R>](TaskResults)).
385        ///
386        /// This operation consumes all the tasks.
387        ///
388        /// ### Note:
389        /// ```text
390        ///     Results returned here are not guaranteed to be in the same order as the order in which
391        ///     the tasks were queued for work. You will need to pass a type as T that automatically
392        ///     tracks its own id or has a way for you to resort results.
393        /// ```
394        pub async fn wait_async(&mut self) -> TaskResults<R> {
395            let task_batch = self.tasks.read().await.keys().cloned().collect::<Vec<_>>();
396            self.wait_on_batch_async(&task_batch).await
397        }
398
        ///
        /// Check if all work has been completed from the task queue.
        ///
        /// Blocking counterpart of [is_all_completed_async](Self::is_all_completed_async): the
        /// check is driven to completion on this manager's runtime. An empty manager reports
        /// `true`.
        ///
        /// ## Examples
        ///
        /// ### Sync Usage
        ///
        ///```
        /// use rumtk_core::threading::threading_manager::TaskManager;
        ///
        /// let manager = TaskManager::<usize>::new(&4).unwrap();
        ///
        /// let all_done = manager.is_all_completed();
        ///
        /// assert_eq!(all_done, true, "Empty TaskManager reports tasks are not completed!");
        ///
        /// ```
        ///
        pub fn is_all_completed(&self) -> bool {
            let rt = rumtk_init_threads!(&self.workers);
            rumtk_resolve_task!(rt, TaskManager::<R>::is_all_completed_async(self))
        }
421
422        pub async fn is_all_completed_async(&self) -> bool {
423            for (_, task) in self.tasks.read().await.iter() {
424                if !task.read().await.finished {
425                    return false;
426                }
427            }
428
429            true
430        }
431
        ///
        /// Alias for [wait](TaskManager::wait).
        ///
        /// Private to this type; it simply forwards to `wait` and returns its results.
        ///
        fn gather(&mut self) -> TaskResults<R> {
            self.wait()
        }
438    }
439}
440
441///
442/// This module contains a few helper.
443///
444/// For example, you can find a function for determining number of threads available in system.
445/// The sleep family of functions are also here.
446///
447pub mod threading_functions {
448    use num_cpus;
449    use std::thread::{available_parallelism, sleep as std_sleep};
450    use std::time::Duration;
451    use tokio::time::sleep as tokio_sleep;
452
453    pub const NANOS_PER_SEC: u64 = 1000000000;
454    pub const MILLIS_PER_SEC: u64 = 1000;
455    pub const MICROS_PER_SEC: u64 = 1000000;
456
457    pub fn get_default_system_thread_count() -> usize {
458        let cpus: usize = num_cpus::get();
459        let parallelism = match available_parallelism() {
460            Ok(n) => n.get(),
461            Err(_) => 0,
462        };
463
464        if parallelism >= cpus {
465            parallelism
466        } else {
467            cpus
468        }
469    }
470
471    pub fn sleep(s: f32) {
472        let ns = s * NANOS_PER_SEC as f32;
473        let rounded_ns = ns.round() as u64;
474        let duration = Duration::from_nanos(rounded_ns);
475        std_sleep(duration);
476    }
477
478    pub async fn async_sleep(s: f32) {
479        let ns = s * NANOS_PER_SEC as f32;
480        let rounded_ns = ns.round() as u64;
481        let duration = Duration::from_nanos(rounded_ns);
482        tokio_sleep(duration).await;
483    }
484}
485
486///
487/// Main API for interacting with the threading back end. Remember, we use tokio as our executor.
488/// This means that by default, all jobs sent to the thread pool have to be async in nature.
489/// These macros make handling of these jobs at the sync/async boundary more convenient.
490///
491pub mod threading_macros {
492    use crate::threading::thread_primitives;
493    use crate::threading::threading_manager::SafeTaskArgs;
494
495    ///
496    /// First, let's make sure we have *tokio* initialized at least once. The runtime created here
497    /// will be saved to the global context so the next call to this macro will simply grab a
498    /// reference to the previously initialized runtime.
499    ///
500    /// Passing nothing will default to initializing a runtime using the default number of threads
501    /// for this system. This is typically equivalent to number of cores/threads for your CPU.
502    ///
503    /// Passing `threads` number will yield a runtime that allocates that many threads.
504    ///
505    ///
506    /// ## Examples
507    ///
508    /// ```
509    ///     use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_create_task_args, rumtk_create_task, rumtk_spawn_task};
510    ///     use rumtk_core::core::RUMResult;
511    ///     use rumtk_core::threading::threading_manager::SafeTaskArgs;
512    ///
513    ///     async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
514    ///         let mut result = Vec::<i32>::new();
515    ///         for arg in args.read().await.iter() {
516    ///             result.push(*arg);
517    ///         }
518    ///         Ok(result)
519    ///     }
520    ///
521    ///     let rt = rumtk_init_threads!();                                      // Creates runtime instance
522    ///     let args = rumtk_create_task_args!(1);                               // Creates a vector of i32s
523    ///     let task = rumtk_create_task!(test, args);                           // Creates a standard task which consists of a function or closure accepting a Vec<T>
524    ///     let result = rumtk_resolve_task!(&rt, rumtk_spawn_task!(&rt, task)); // Spawn's task and waits for it to conclude.
525    /// ```
526    ///
527    /// ```
528    ///     use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_create_task_args, rumtk_create_task, rumtk_spawn_task};
529    ///     use rumtk_core::core::RUMResult;
530    ///     use rumtk_core::threading::threading_manager::SafeTaskArgs;
531    ///
532    ///     async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
533    ///         let mut result = Vec::<i32>::new();
534    ///         for arg in args.read().await.iter() {
535    ///             result.push(*arg);
536    ///         }
537    ///         Ok(result)
538    ///     }
539    ///
540    ///     let thread_count: usize = 10;
541    ///     let rt = rumtk_init_threads!(&thread_count);
542    ///     let args = rumtk_create_task_args!(1);
543    ///     let task = rumtk_create_task!(test, args);
544    ///     let result = rumtk_resolve_task!(&rt, rumtk_spawn_task!(&rt, task));
545    /// ```
546    #[macro_export]
547    macro_rules! rumtk_init_threads {
548        ( ) => {{
549            use $crate::rumtk_cache_fetch;
550            use $crate::threading::thread_primitives::{init_cache, RT_CACHE};
551            use $crate::threading::threading_functions::get_default_system_thread_count;
552            let rt = rumtk_cache_fetch!(
553                &mut RT_CACHE,
554                &get_default_system_thread_count(),
555                init_cache
556            );
557            rt
558        }};
559        ( $threads:expr ) => {{
560            use $crate::rumtk_cache_fetch;
561            use $crate::threading::thread_primitives::{init_cache, RT_CACHE};
562            let rt = rumtk_cache_fetch!(&raw mut RT_CACHE, $threads, init_cache);
563            rt
564        }};
565    }
566
567    ///
568    /// Puts task onto the runtime queue.
569    ///
570    /// The parameters to this macro are a reference to the runtime (`rt`) and a future (`func`).
571    ///
572    /// The return is a [thread_primitives::JoinHandle<T>] instance. If the task was a standard
573    /// framework task, you will get [thread_primitives::AsyncTaskHandle] instead.
574    ///
575    #[macro_export]
576    macro_rules! rumtk_spawn_task {
577        ( $func:expr ) => {{
578            let rt = rumtk_init_threads!();
579            rt.spawn($func)
580        }};
581        ( $rt:expr, $func:expr ) => {{
582            $rt.spawn($func)
583        }};
584    }
585
586    ///
587    /// Using the initialized runtime, wait for the future to resolve in a thread blocking manner!
588    ///
589    /// If you pass a reference to the runtime (`rt`) and an async closure (`func`), we await the
590    /// async closure without passing any arguments.
591    ///
592    /// You can pass a third argument to this macro in the form of any number of arguments (`arg_item`).
593    /// In such a case, we pass those arguments to the call on the async closure and await on results.
594    ///
595    #[macro_export]
596    macro_rules! rumtk_wait_on_task {
597        ( $rt:expr, $func:expr ) => {{
598            $rt.block_on(async move {
599                $func().await
600            })
601        }};
602        ( $rt:expr, $func:expr, $($arg_items:expr),+ ) => {{
603            $rt.block_on(async move {
604                $func($($arg_items),+).await
605            })
606        }};
607    }
608
609    ///
610    /// This macro awaits a future.
611    ///
    /// The arguments are a reference to the runtime (`rt`) and a future.
613    ///
614    /// If there is a result, you will get the result of the future.
615    ///
616    /// ## Examples
617    ///
618    /// ```
619    ///     use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_create_task_args, rumtk_create_task, rumtk_spawn_task};
620    ///     use rumtk_core::core::RUMResult;
621    ///     use rumtk_core::threading::threading_manager::SafeTaskArgs;
622    ///
623    ///     async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
624    ///         let mut result = Vec::<i32>::new();
625    ///         for arg in args.read().await.iter() {
626    ///             result.push(*arg);
627    ///         }
628    ///         Ok(result)
629    ///     }
630    ///
631    ///     let rt = rumtk_init_threads!();
632    ///     let args = rumtk_create_task_args!(1);
633    ///     let task = rumtk_create_task!(test, args);
634    ///     let result = rumtk_resolve_task!(&rt, rumtk_spawn_task!(&rt, task));
635    /// ```
636    ///
    #[macro_export]
    macro_rules! rumtk_resolve_task {
        // `$rt` must offer `block_on`; `$future` is any expression evaluating to a future.
        ( $rt:expr, $future:expr ) => {{
            // NOTE(review): `rumtk_format` is not referenced inside this expansion; presumably
            // it is kept in scope for caller expressions. Confirm before removing.
            use $crate::strings::rumtk_format;
            // Fun tidbit, the expression rumtk_resolve_task!(&rt, rumtk_spawn_task!(&rt, task)), where
            // rt is the tokio runtime yields async move { { &rt.spawn(task) } }. However, the whole thing
            // is technically moved into the async closure and captured so things like mutex guards
            // technically go out of the outer scope. As a result that expression fails to compile even
            // though the intent is for rumtk_spawn_task to resolve first and its result get moved
            // into the async closure. To ensure that happens regardless of given expression, we do
            // a variable assignment below to force the "future" macro expressions to resolve before
            // moving into the closure. DO NOT REMOVE OR "SIMPLIFY" THE let future = $future LINE!!!
            let future = $future;
            // Block the calling thread until the already-evaluated future resolves.
            $rt.block_on(async move { future.await })
        }};
    }
653
654    ///
655    /// This macro creates an async body that calls the async closure and awaits it.
656    ///
657    /// ## Example
658    ///
659    /// ```
660    /// use std::sync::{Arc, RwLock};
661    /// use tokio::sync::RwLock as AsyncRwLock;
662    /// use rumtk_core::strings::RUMString;
663    /// use rumtk_core::threading::threading_manager::{SafeTaskArgs, TaskItems};
664    ///
665    /// pub type SafeTaskArgs2<T> = Arc<RwLock<TaskItems<T>>>;
666    /// let expected = vec![
667    ///     RUMString::from("Hello"),
668    ///     RUMString::from("World!"),
669    ///     RUMString::from("Overcast"),
670    ///     RUMString::from("and"),
671    ///     RUMString::from("Sad"),
672    ///  ];
673    /// let locked_args = AsyncRwLock::new(expected.clone());
674    /// let task_args = SafeTaskArgs::<RUMString>::new(locked_args);
675    ///
676    ///
677    /// ```
678    ///
    #[macro_export]
    macro_rules! rumtk_create_task {
        // Callable only: `$func` is evaluated lazily, inside the async block, when the returned
        // future is first polled; it is then called without arguments and awaited.
        ( $func:expr ) => {{
            async move {
                let f = $func;
                f().await
            }
        }};
        // Callable plus arguments: `$func` is evaluated eagerly at the macro call site, while
        // `$args` is moved into the async block and passed to the callable by reference.
        ( $func:expr, $args:expr ) => {{
            let f = $func;
            async move { f(&$args).await }
        }};
    }
692
693    ///
694    /// Creates an instance of [SafeTaskArgs] with the arguments passed.
695    ///
696    /// ## Note
697    ///
698    /// All arguments must be of the same type
699    ///
700    #[macro_export]
701    macro_rules! rumtk_create_task_args {
702        ( ) => {{
703            use $crate::threading::threading_manager::{TaskArgs, SafeTaskArgs, TaskItems};
704            use tokio::sync::RwLock;
705            SafeTaskArgs::new(RwLock::new(vec![]))
706        }};
707        ( $($args:expr),+ ) => {{
708            use $crate::threading::threading_manager::{SafeTaskArgs};
709            use tokio::sync::RwLock;
710            SafeTaskArgs::new(RwLock::new(vec![$($args),+]))
711        }};
712    }
713
714    ///
715    /// Convenience macro for packaging the task components and launching the task in one line.
716    ///
717    /// One of the advantages is that you can generate a new `tokio` runtime by specifying the
718    /// number of threads at the end. This is optional. Meaning, we will default to the system's
719    /// number of threads if that value is not specified.
720    ///
721    /// Between the `func` parameter and the optional `threads` parameter, you can specify a
722    /// variable number of arguments to pass to the task. each argument must be of the same type.
723    /// If you wish to pass different arguments with different types, please define an abstract type
724    /// whose underlying structure is a tuple of items and pass that instead.
725    ///
726    /// ## Examples
727    ///
728    /// ### With Default Thread Count
729    /// ```
730    ///     use rumtk_core::{rumtk_exec_task};
731    ///     use rumtk_core::core::RUMResult;
732    ///     use rumtk_core::threading::threading_manager::SafeTaskArgs;
733    ///
734    ///     async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
735    ///         let mut result = Vec::<i32>::new();
736    ///         for arg in args.read().await.iter() {
737    ///             result.push(*arg);
738    ///         }
739    ///         Ok(result)
740    ///     }
741    ///
742    ///     let result = rumtk_exec_task!(test, vec![5]);
743    ///     assert_eq!(&result.clone().unwrap(), &vec![5], "Results mismatch");
744    ///     assert_ne!(&result.clone().unwrap(), &vec![5, 10], "Results do not mismatch as expected!");
745    /// ```
746    ///
747    /// ### With Custom Thread Count
748    /// ```
749    ///     use rumtk_core::{rumtk_exec_task};
750    ///     use rumtk_core::core::RUMResult;
751    ///     use rumtk_core::threading::threading_manager::SafeTaskArgs;
752    ///
753    ///     async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
754    ///         let mut result = Vec::<i32>::new();
755    ///         for arg in args.read().await.iter() {
756    ///             result.push(*arg);
757    ///         }
758    ///         Ok(result)
759    ///     }
760    ///
761    ///     let result = rumtk_exec_task!(test, vec![5], 5);
762    ///     assert_eq!(&result.clone().unwrap(), &vec![5], "Results mismatch");
763    ///     assert_ne!(&result.clone().unwrap(), &vec![5, 10], "Results do not mismatch as expected!");
764    /// ```
765    ///
766    /// ### With Async Function Body
767    /// ```
768    ///     use rumtk_core::{rumtk_exec_task};
769    ///     use rumtk_core::core::RUMResult;
770    ///     use rumtk_core::threading::threading_manager::SafeTaskArgs;
771    ///
772    ///     let result = rumtk_exec_task!(
773    ///     async move |args: &SafeTaskArgs<i32>| -> RUMResult<Vec<i32>> {
774    ///         let mut result = Vec::<i32>::new();
775    ///         for arg in args.read().await.iter() {
776    ///             result.push(*arg);
777    ///         }
778    ///         Ok(result)
779    ///     },
780    ///     vec![5]);
781    ///     assert_eq!(&result.clone().unwrap(), &vec![5], "Results mismatch");
782    ///     assert_ne!(&result.clone().unwrap(), &vec![5, 10], "Results do not mismatch as expected!");
783    /// ```
784    ///
785    /// ### With Async Function Body and No Args
786    /// ```
787    ///     use rumtk_core::{rumtk_exec_task};
788    ///     use rumtk_core::core::RUMResult;
789    ///     use rumtk_core::threading::threading_manager::SafeTaskArgs;
790    ///
791    ///     let result = rumtk_exec_task!(
792    ///     async || -> RUMResult<Vec<i32>> {
793    ///         let mut result = Vec::<i32>::new();
794    ///         Ok(result)
795    ///     });
796    ///     let empty = Vec::<i32>::new();
797    ///     assert_eq!(&result.clone().unwrap(), &empty, "Results mismatch");
798    ///     assert_ne!(&result.clone().unwrap(), &vec![5, 10], "Results do not mismatch as expected!");
799    /// ```
800    ///
801    /// ## Equivalent To
802    ///
803    /// ```
804    ///     use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_create_task_args, rumtk_create_task, rumtk_spawn_task};
805    ///     use rumtk_core::core::RUMResult;
806    ///     use rumtk_core::threading::threading_manager::SafeTaskArgs;
807    ///
808    ///     async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
809    ///         let mut result = Vec::<i32>::new();
810    ///         for arg in args.read().await.iter() {
811    ///             result.push(*arg);
812    ///         }
813    ///         Ok(result)
814    ///     }
815    ///
816    ///     let rt = rumtk_init_threads!();
817    ///     let args = rumtk_create_task_args!(1);
818    ///     let task = rumtk_create_task!(test, args);
819    ///     let result = rumtk_resolve_task!(&rt, rumtk_spawn_task!(&rt, task));
820    /// ```
821    ///
822    #[macro_export]
823    macro_rules! rumtk_exec_task {
824        ($func:expr ) => {{
825            use tokio::sync::RwLock;
826            use $crate::{
827                rumtk_create_task, rumtk_create_task_args, rumtk_init_threads, rumtk_resolve_task,
828            };
829            let rt = rumtk_init_threads!();
830            let task = rumtk_create_task!($func);
831            rumtk_resolve_task!(&rt, task)
832        }};
833        ($func:expr, $args:expr ) => {{
834            use tokio::sync::RwLock;
835            use $crate::{
836                rumtk_create_task, rumtk_create_task_args, rumtk_init_threads, rumtk_resolve_task,
837            };
838            let rt = rumtk_init_threads!();
839            let args = SafeTaskArgs::new(RwLock::new($args));
840            let task = rumtk_create_task!($func, args);
841            rumtk_resolve_task!(&rt, task)
842        }};
843        ($func:expr, $args:expr , $threads:expr ) => {{
844            use tokio::sync::RwLock;
845            use $crate::{
846                rumtk_create_task, rumtk_create_task_args, rumtk_init_threads, rumtk_resolve_task,
847            };
848            let rt = rumtk_init_threads!(&$threads);
849            let args = SafeTaskArgs::new(RwLock::new($args));
850            let task = rumtk_create_task!($func, args);
851            rumtk_resolve_task!(&rt, task)
852        }};
853    }
854
855    ///
856    /// Sleep a duration of time in a sync context, so no await can be call on the result.
857    ///
858    /// You can pass any value that can be cast to f32.
859    ///
860    /// The precision is up to nanoseconds and it is depicted by the number of decimal places.
861    ///
862    /// ## Examples
863    ///
864    /// ```
865    ///     use rumtk_core::rumtk_sleep;
866    ///     rumtk_sleep!(1);           // Sleeps for 1 second.
867    ///     rumtk_sleep!(0.001);       // Sleeps for 1 millisecond
868    ///     rumtk_sleep!(0.000001);    // Sleeps for 1 microsecond
869    ///     rumtk_sleep!(0.000000001); // Sleeps for 1 nanosecond
870    /// ```
871    ///
872    #[macro_export]
873    macro_rules! rumtk_sleep {
874        ( $dur:expr) => {{
875            use $crate::threading::threading_functions::sleep;
876            sleep($dur as f32)
877        }};
878    }
879
880    ///
881    /// Sleep for some duration of time in an async context. Meaning, we can be awaited.
882    ///
883    /// You can pass any value that can be cast to f32.
884    ///
885    /// The precision is up to nanoseconds and it is depicted by the number of decimal places.
886    ///
887    /// ## Examples
888    ///
889    /// ```
890    ///     use rumtk_core::{rumtk_async_sleep, rumtk_exec_task};
891    ///     use rumtk_core::core::RUMResult;
892    ///     rumtk_exec_task!( async || -> RUMResult<()> {
893    ///             rumtk_async_sleep!(1).await;           // Sleeps for 1 second.
894    ///             rumtk_async_sleep!(0.001).await;       // Sleeps for 1 millisecond
895    ///             rumtk_async_sleep!(0.000001).await;    // Sleeps for 1 microsecond
896    ///             rumtk_async_sleep!(0.000000001).await; // Sleeps for 1 nanosecond
897    ///             Ok(())
898    ///         }
899    ///     );
900    /// ```
901    ///
902    #[macro_export]
903    macro_rules! rumtk_async_sleep {
904        ( $dur:expr) => {{
905            use $crate::threading::threading_functions::async_sleep;
906            async_sleep($dur as f32)
907        }};
908    }
909
910    ///
911    ///
912    ///
913    #[macro_export]
914    macro_rules! rumtk_new_task_queue {
915        ( $worker_num:expr ) => {{
916            use $crate::threading::threading_manager::TaskManager;
917            TaskManager::new($worker_num);
918        }};
919    }
920}