rumtk_core/
threading.rs

1/*
2 * rumtk attempts to implement HL7 and medical protocols for interoperability in medicine.
3 * This toolkit aims to be reliable, simple, performant, and standards compliant.
4 * Copyright (C) 2025  Luis M. Santos, M.D.
5 * Copyright (C) 2025  MedicalMasses L.L.C.
6 *
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
20 */
21
22///
23/// This module provides all the primitives needed to build a multithreaded application.
24///
25pub mod thread_primitives {
26    use crate::cache::{new_cache, LazyRUMCache};
27    use std::sync::Arc;
28    use tokio::runtime::Runtime as TokioRuntime;
29    /**************************** Globals **************************************/
30    pub static mut RT_CACHE: TokioRtCache = new_cache();
31    /**************************** Helpers ***************************************/
32    pub fn init_cache(threads: &usize) -> SafeTokioRuntime {
33        let mut builder = tokio::runtime::Builder::new_multi_thread();
34        builder.worker_threads(*threads);
35        builder.enable_all();
36        match builder.build() {
37            Ok(handle) => Arc::new(handle),
38            Err(e) => panic!(
39                "Unable to initialize threading tokio runtime because {}!",
40                &e
41            ),
42        }
43    }
44
45    /**************************** Types ***************************************/
46    pub type SafeTokioRuntime = Arc<TokioRuntime>;
47    pub type TokioRtCache = LazyRUMCache<usize, SafeTokioRuntime>;
48}
49
50pub mod threading_manager {
51    use crate::cache::LazyRUMCacheValue;
52    use crate::core::{RUMResult, RUMVec};
53    use crate::strings::rumtk_format;
54    use crate::threading::thread_primitives::SafeTokioRuntime;
55    use crate::threading::threading_functions::async_sleep;
56    use crate::types::{RUMHashMap, RUMID};
57    use crate::{rumtk_init_threads, rumtk_resolve_task, threading};
58    use std::future::Future;
59    use std::sync::Arc;
60    use tokio::sync::RwLock;
61    use tokio::task::JoinHandle;
62
    /// Polling interval handed to `async_sleep` while waiting on a task. `async_sleep` takes
    /// seconds, so this is 0.001 s (1 millisecond).
    const DEFAULT_SLEEP_DURATION: f32 = 0.001f32;
    /// Initial capacity reserved for the task table of a new `TaskManager`.
    const DEFAULT_TASK_CAPACITY: usize = 1024;

    /// Generic container used for task-related items (arguments, results).
    pub type TaskItems<T> = RUMVec<T>;
    /// This type aliases a vector of T elements that will be used for passing arguments to the task processor.
    pub type TaskArgs<T> = TaskItems<T>;
    /// Task arguments wrapped in an `Arc` and an async `RwLock` so they can be shared with a task.
    pub type SafeTaskArgs<T> = Arc<RwLock<TaskItems<T>>>;
    /// `tokio` join handle whose output is a [`TaskResult`].
    pub type AsyncTaskHandle<R> = JoinHandle<TaskResult<R>>;
    /// Collection of [`AsyncTaskHandle`]s.
    pub type AsyncTaskHandles<R> = Vec<AsyncTaskHandle<R>>;
    // Disabled draft of the function signature defining the interface of task processing logic:
    //pub type TaskProcessor<T, R, Fut: Future<Output = TaskResult<R>>> = impl FnOnce(&SafeTaskArgs<T>) -> Fut;
    /// Identifier assigned to every task handed to the manager.
    pub type TaskID = RUMID;
75
    /// Book-keeping record for one task handled by `TaskManager`.
    #[derive(Debug, Clone, Default)]
    pub struct Task<R> {
        /// Identifier under which the task is stored in the task table.
        pub id: TaskID,
        /// `false` when the task is registered; set to `true` once its result has been stored.
        pub finished: bool,
        /// `None` until the task's future resolves, then `Some` with the future's output.
        pub result: Option<R>,
    }
82
    /// Shared, read-only snapshot of a [`Task`], as handed back to callers.
    pub type SafeTask<R> = Arc<Task<R>>;
    /// Shared, lockable [`Task`] used internally while the task may still be updated.
    type SafeInternalTask<R> = Arc<RwLock<Task<R>>>;
    /// Map from [`TaskID`] to the internal task record.
    pub type TaskTable<R> = RUMHashMap<TaskID, SafeInternalTask<R>>;
    /// [`TaskTable`] behind an `Arc` and an async `RwLock`.
    pub type SafeAsyncTaskTable<R> = Arc<RwLock<TaskTable<R>>>;
    /// List of task ids to wait on together.
    pub type TaskBatch = RUMVec<TaskID>;
    /// Type to use to define how task results are expected to be returned.
    pub type TaskResult<R> = RUMResult<SafeTask<R>>;
    /// Collection of [`TaskResult`]s, one per awaited task.
    pub type TaskResults<R> = TaskItems<TaskResult<R>>;
    /// Cache value wrapping a [`SafeTokioRuntime`] (presumably what the runtime cache hands
    /// out; confirm against `crate::cache`).
    pub type TaskRuntime = LazyRUMCacheValue<SafeTokioRuntime>;
92
93    ///
94    /// Manages asynchronous tasks submitted as micro jobs from synchronous code. This type essentially
95    /// gives the multithreading, asynchronous superpowers to synchronous logic.
96    ///
97    /// ## Example Usage
98    ///
99    /// ```
100    /// use std::sync::{Arc};
101    /// use tokio::sync::RwLock as AsyncRwLock;
102    /// use rumtk_core::core::RUMResult;
103    /// use rumtk_core::strings::RUMString;
104    /// use rumtk_core::threading::threading_manager::{SafeTaskArgs, TaskItems, TaskManager};
105    /// use rumtk_core::{rumtk_create_task, };
106    ///
107    /// let expected = vec![
108    ///     RUMString::from("Hello"),
109    ///     RUMString::from("World!"),
110    ///     RUMString::from("Overcast"),
111    ///     RUMString::from("and"),
112    ///     RUMString::from("Sad"),
113    ///  ];
114    ///
115    /// type TestResult = RUMResult<Vec<RUMString>>;
116    /// let mut queue: TaskManager<TestResult> = TaskManager::new(&5).unwrap();
117    ///
118    /// let locked_args = AsyncRwLock::new(expected.clone());
119    /// let task_args = SafeTaskArgs::<RUMString>::new(locked_args);
120    /// let processor = rumtk_create_task!(
121    ///     async |args: &SafeTaskArgs<RUMString>| -> TestResult {
122    ///         let owned_args = Arc::clone(args);
123    ///         let locked_args = owned_args.read().await;
124    ///         let mut results = TaskItems::<RUMString>::with_capacity(locked_args.len());
125    ///
126    ///         for arg in locked_args.iter() {
127    ///             results.push(RUMString::new(arg));
128    ///         }
129    ///
130    ///         Ok(results)
131    ///     },
132    ///     task_args
133    /// );
134    ///
135    /// queue.add_task::<_>(processor);
136    /// let results = queue.wait();
137    ///
138    /// let mut result_data = Vec::<RUMString>::with_capacity(5);
139    /// for r in results {
140    ///     for v in r.unwrap().result.clone().unwrap().iter() {
141    ///         for value in v.iter() {
142    ///             result_data.push(value.clone());
143    ///         }
144    ///     }
145    ///  }
146    ///
147    /// assert_eq!(result_data, expected, "Results do not match expected!");
148    ///
149    /// ```
150    ///
    #[derive(Debug, Clone, Default)]
    pub struct TaskManager<R> {
        /// Table of registered tasks keyed by `TaskID`; entries are removed when waited on.
        tasks: SafeAsyncTaskTable<R>,
        /// Thread count passed to `rumtk_init_threads!` by the synchronous methods.
        workers: usize,
    }
156
157    impl<R> TaskManager<R>
158    where
159        R: Sync + Send + Clone + 'static,
160    {
161        ///
162        /// This method creates a [`TaskQueue`] instance using sensible defaults.
163        ///
164        /// The `threads` field is computed from the number of cores present in system.
165        ///
166        pub fn default() -> RUMResult<TaskManager<R>> {
167            Self::new(&threading::threading_functions::get_default_system_thread_count())
168        }
169
170        ///
171        /// Creates an instance of [`ThreadedTaskQueue<T, R>`] in the form of [`SafeThreadedTaskQueue<T, R>`].
172        /// Expects you to provide the count of threads to spawn and the microtask queue size
173        /// allocated by each thread.
174        ///
175        /// This method calls [`Self::with_capacity()`] for the actual object creation.
176        /// The main queue capacity is pre-allocated to [`DEFAULT_QUEUE_CAPACITY`].
177        ///
178        pub fn new(worker_num: &usize) -> RUMResult<TaskManager<R>> {
179            let tasks = SafeAsyncTaskTable::<R>::new(RwLock::new(TaskTable::with_capacity(
180                DEFAULT_TASK_CAPACITY,
181            )));
182            Ok(TaskManager::<R> {
183                tasks,
184                workers: worker_num.to_owned(),
185            })
186        }
187
188        ///
189        /// Add a task to the processing queue. The idea is that you can queue a processor function
190        /// and list of args that will be picked up by one of the threads for processing.
191        ///
192        /// This is the async counterpart
193        ///
194        pub async fn add_task_async<F>(&mut self, task: F) -> TaskID
195        where
196            F: Future<Output = R> + Send + Sync + 'static,
197            F::Output: Send + Sized + 'static,
198        {
199            let id = TaskID::new_v4();
200            let mut safe_task = Arc::new(RwLock::new(Task::<R> {
201                id: id.clone(),
202                finished: false,
203                result: None,
204            }));
205            self.tasks
206                .write()
207                .await
208                .insert(id.clone(), safe_task.clone());
209
210            let task_wrapper = async move || {
211                // Run the task
212                let result = task.await;
213
214                // Cleanup task
215                safe_task.write().await.result = Some(result);
216                safe_task.write().await.finished = true;
217            };
218
219            tokio::spawn(task_wrapper());
220
221            id
222        }
223
224        ///
225        /// See [add_task](Self::add_task)
226        ///
227        pub fn add_task<F>(&mut self, task: F) -> TaskID
228        where
229            F: Future<Output = R> + Send + Sync + 'static,
230            F::Output: Send + Sized + 'static,
231        {
232            let rt = rumtk_init_threads!(&self.workers);
233            rumtk_resolve_task!(rt, self.add_task_async(task))
234        }
235
236        ///
237        /// See [wait_async](Self::wait_async)
238        ///
239        pub fn wait(&mut self) -> TaskResults<R> {
240            let rt = rumtk_init_threads!(&self.workers);
241            rumtk_resolve_task!(rt, self.wait_async())
242        }
243
244        ///
245        /// See [wait_on_batch_async](Self::wait_on_batch_async)
246        ///
247        pub fn wait_on_batch(&mut self, tasks: &TaskBatch) -> TaskResults<R> {
248            let rt = rumtk_init_threads!(&self.workers);
249            rumtk_resolve_task!(rt, self.wait_on_batch_async(&tasks))
250        }
251
252        ///
253        /// See [wait_on_async](Self::wait_on_async)
254        ///
255        pub fn wait_on(&mut self, task_id: &TaskID) -> TaskResult<R> {
256            let rt = rumtk_init_threads!(&self.workers);
257            rumtk_resolve_task!(rt, self.wait_on_async(&task_id))
258        }
259
260        ///
261        /// This method waits until a queued task with [TaskID](TaskID) has been processed from the main queue.
262        ///
263        /// We poll the status of the task every [DEFAULT_SLEEP_DURATION](DEFAULT_SLEEP_DURATION) ms.
264        ///
265        /// Upon completion,
266        ///
267        /// 2. Return the result ([TaskResults<R>](TaskResults)).
268        ///
269        /// This operation consumes the task.
270        ///
271        /// ### Note:
272        /// ```text
273        ///     Results returned here are not guaranteed to be in the same order as the order in which
274        ///     the tasks were queued for work. You will need to pass a type as T that automatically
275        ///     tracks its own id or has a way for you to resort results.
276        /// ```
277        pub async fn wait_on_async(&mut self, task_id: &TaskID) -> TaskResult<R> {
278            let task = match self.tasks.write().await.remove(task_id) {
279                Some(task) => task.clone(),
280                None => return Err(rumtk_format!("No task with id {}", task_id)),
281            };
282
283            while !task.read().await.finished {
284                async_sleep(DEFAULT_SLEEP_DURATION).await;
285            }
286
287            let x = Ok(Arc::new(task.write().await.clone()));
288            x
289        }
290
291        ///
292        /// This method waits until a set of queued tasks with [TaskID](TaskID) has been processed from the main queue.
293        ///
294        /// We poll the status of the task every [DEFAULT_SLEEP_DURATION](DEFAULT_SLEEP_DURATION) ms.
295        ///
296        /// Upon completion,
297        ///
298        /// 1. We collect the results generated (if any).
299        /// 2. Return the list of results ([TaskResults<R>](TaskResults)).
300        ///
301        /// ### Note:
302        /// ```text
303        ///     Results returned here are not guaranteed to be in the same order as the order in which
304        ///     the tasks were queued for work. You will need to pass a type as T that automatically
305        ///     tracks its own id or has a way for you to resort results.
306        /// ```
307        pub async fn wait_on_batch_async(&mut self, tasks: &TaskBatch) -> TaskResults<R> {
308            let mut results = TaskResults::<R>::default();
309            for task in tasks {
310                results.push(self.wait_on_async(task).await);
311            }
312            results
313        }
314
315        ///
316        /// This method waits until all queued tasks have been processed from the main queue.
317        ///
318        /// We poll the status of the main queue every [DEFAULT_SLEEP_DURATION](DEFAULT_SLEEP_DURATION) ms.
319        ///
320        /// Upon completion,
321        ///
322        /// 1. We collect the results generated (if any).
323        /// 2. We reset the main task and result internal queue states.
324        /// 3. Return the list of results ([TaskResults<R>](TaskResults)).
325        ///
326        /// This operation consumes all the tasks.
327        ///
328        /// ### Note:
329        /// ```text
330        ///     Results returned here are not guaranteed to be in the same order as the order in which
331        ///     the tasks were queued for work. You will need to pass a type as T that automatically
332        ///     tracks its own id or has a way for you to resort results.
333        /// ```
334        pub async fn wait_async(&mut self) -> TaskResults<R> {
335            let task_batch = self.tasks.read().await.keys().cloned().collect::<Vec<_>>();
336            self.wait_on_batch_async(&task_batch).await
337        }
338
339        ///
340        /// Check if all work has been completed from the task queue.
341        ///
342        /// ## Examples
343        ///
344        /// ### Sync Usage
345        ///
346        ///```
347        /// use rumtk_core::threading::threading_manager::TaskManager;
348        ///
349        /// let manager = TaskManager::<usize>::new(&4).unwrap();
350        ///
351        /// let all_done = manager.is_all_completed();
352        ///
353        /// assert_eq!(all_done, true, "Empty TaskManager reports tasks are not completed!");
354        ///
355        /// ```
356        ///
357        pub fn is_all_completed(&self) -> bool {
358            let rt = rumtk_init_threads!(&self.workers);
359            rumtk_resolve_task!(rt, TaskManager::<R>::is_all_completed_async(self))
360        }
361
362        pub async fn is_all_completed_async(&self) -> bool {
363            for (_, task) in self.tasks.read().await.iter() {
364                if !task.read().await.finished {
365                    return false;
366                }
367            }
368
369            true
370        }
371
372        ///
373        /// Alias for [wait](TaskManager::wait).
374        ///
375        fn gather(&mut self) -> TaskResults<R> {
376            self.wait()
377        }
378    }
379}
380
381///
/// This module contains a few helpers.
383///
384/// For example, you can find a function for determining number of threads available in system.
385/// The sleep family of functions are also here.
386///
387pub mod threading_functions {
388    use num_cpus;
389    use std::thread::{available_parallelism, sleep as std_sleep};
390    use std::time::Duration;
391    use tokio::time::sleep as tokio_sleep;
392
    /// Number of nanoseconds in one second.
    pub const NANOS_PER_SEC: u64 = 1000000000;
    /// Number of milliseconds in one second.
    pub const MILLIS_PER_SEC: u64 = 1000;
    /// Number of microseconds in one second.
    pub const MICROS_PER_SEC: u64 = 1000000;
396
397    pub fn get_default_system_thread_count() -> usize {
398        let cpus: usize = num_cpus::get();
399        let parallelism = match available_parallelism() {
400            Ok(n) => n.get(),
401            Err(_) => 0,
402        };
403
404        if parallelism >= cpus {
405            parallelism
406        } else {
407            cpus
408        }
409    }
410
411    pub fn sleep(s: f32) {
412        let ns = s * NANOS_PER_SEC as f32;
413        let rounded_ns = ns.round() as u64;
414        let duration = Duration::from_nanos(rounded_ns);
415        std_sleep(duration);
416    }
417
418    pub async fn async_sleep(s: f32) {
419        let ns = s * NANOS_PER_SEC as f32;
420        let rounded_ns = ns.round() as u64;
421        let duration = Duration::from_nanos(rounded_ns);
422        tokio_sleep(duration).await;
423    }
424}
425
426///
427/// Main API for interacting with the threading back end. Remember, we use tokio as our executor.
428/// This means that by default, all jobs sent to the thread pool have to be async in nature.
429/// These macros make handling of these jobs at the sync/async boundary more convenient.
430///
431pub mod threading_macros {
432    use crate::threading::thread_primitives;
433    use crate::threading::threading_manager::SafeTaskArgs;
434
435    ///
436    /// First, let's make sure we have *tokio* initialized at least once. The runtime created here
437    /// will be saved to the global context so the next call to this macro will simply grab a
438    /// reference to the previously initialized runtime.
439    ///
440    /// Passing nothing will default to initializing a runtime using the default number of threads
441    /// for this system. This is typically equivalent to number of cores/threads for your CPU.
442    ///
443    /// Passing `threads` number will yield a runtime that allocates that many threads.
444    ///
445    ///
446    /// ## Examples
447    ///
448    /// ```
449    ///     use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_create_task_args, rumtk_create_task, rumtk_spawn_task};
450    ///     use rumtk_core::core::RUMResult;
451    ///     use rumtk_core::threading::threading_manager::SafeTaskArgs;
452    ///
453    ///     async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
454    ///         let mut result = Vec::<i32>::new();
455    ///         for arg in args.read().await.iter() {
456    ///             result.push(*arg);
457    ///         }
458    ///         Ok(result)
459    ///     }
460    ///
461    ///     let rt = rumtk_init_threads!();                                      // Creates runtime instance
462    ///     let args = rumtk_create_task_args!(1);                               // Creates a vector of i32s
463    ///     let task = rumtk_create_task!(test, args);                           // Creates a standard task which consists of a function or closure accepting a Vec<T>
464    ///     let result = rumtk_resolve_task!(&rt, rumtk_spawn_task!(&rt, task)); // Spawn's task and waits for it to conclude.
465    /// ```
466    ///
467    /// ```
468    ///     use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_create_task_args, rumtk_create_task, rumtk_spawn_task};
469    ///     use rumtk_core::core::RUMResult;
470    ///     use rumtk_core::threading::threading_manager::SafeTaskArgs;
471    ///
472    ///     async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
473    ///         let mut result = Vec::<i32>::new();
474    ///         for arg in args.read().await.iter() {
475    ///             result.push(*arg);
476    ///         }
477    ///         Ok(result)
478    ///     }
479    ///
480    ///     let thread_count: usize = 10;
481    ///     let rt = rumtk_init_threads!(&thread_count);
482    ///     let args = rumtk_create_task_args!(1);
483    ///     let task = rumtk_create_task!(test, args);
484    ///     let result = rumtk_resolve_task!(&rt, rumtk_spawn_task!(&rt, task));
485    /// ```
486    #[macro_export]
487    macro_rules! rumtk_init_threads {
488        ( ) => {{
489            use $crate::rumtk_cache_fetch;
490            use $crate::threading::thread_primitives::{init_cache, RT_CACHE};
491            use $crate::threading::threading_functions::get_default_system_thread_count;
492            let rt = rumtk_cache_fetch!(
493                &mut RT_CACHE,
494                &get_default_system_thread_count(),
495                init_cache
496            );
497            rt
498        }};
499        ( $threads:expr ) => {{
500            use $crate::rumtk_cache_fetch;
501            use $crate::threading::thread_primitives::{init_cache, RT_CACHE};
502            let rt = rumtk_cache_fetch!(&raw mut RT_CACHE, $threads, init_cache);
503            rt
504        }};
505    }
506
507    ///
508    /// Puts task onto the runtime queue.
509    ///
510    /// The parameters to this macro are a reference to the runtime (`rt`) and a future (`func`).
511    ///
512    /// The return is a [thread_primitives::JoinHandle<T>] instance. If the task was a standard
513    /// framework task, you will get [thread_primitives::AsyncTaskHandle] instead.
514    ///
515    #[macro_export]
516    macro_rules! rumtk_spawn_task {
517        ( $func:expr ) => {{
518            let rt = rumtk_init_threads!();
519            rt.spawn($func)
520        }};
521        ( $rt:expr, $func:expr ) => {{
522            $rt.spawn($func)
523        }};
524    }
525
526    ///
527    /// Using the initialized runtime, wait for the future to resolve in a thread blocking manner!
528    ///
529    /// If you pass a reference to the runtime (`rt`) and an async closure (`func`), we await the
530    /// async closure without passing any arguments.
531    ///
532    /// You can pass a third argument to this macro in the form of any number of arguments (`arg_item`).
533    /// In such a case, we pass those arguments to the call on the async closure and await on results.
534    ///
535    #[macro_export]
536    macro_rules! rumtk_wait_on_task {
537        ( $rt:expr, $func:expr ) => {{
538            $rt.block_on(async move {
539                $func().await
540            })
541        }};
542        ( $rt:expr, $func:expr, $($arg_items:expr),+ ) => {{
543            $rt.block_on(async move {
544                $func($($arg_items),+).await
545            })
546        }};
547    }
548
549    ///
550    /// This macro awaits a future.
551    ///
    /// The arguments are a reference to the runtime (`rt`) and a future.
553    ///
554    /// If there is a result, you will get the result of the future.
555    ///
556    /// ## Examples
557    ///
558    /// ```
559    ///     use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_create_task_args, rumtk_create_task, rumtk_spawn_task};
560    ///     use rumtk_core::core::RUMResult;
561    ///     use rumtk_core::threading::threading_manager::SafeTaskArgs;
562    ///
563    ///     async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
564    ///         let mut result = Vec::<i32>::new();
565    ///         for arg in args.read().await.iter() {
566    ///             result.push(*arg);
567    ///         }
568    ///         Ok(result)
569    ///     }
570    ///
571    ///     let rt = rumtk_init_threads!();
572    ///     let args = rumtk_create_task_args!(1);
573    ///     let task = rumtk_create_task!(test, args);
574    ///     let result = rumtk_resolve_task!(&rt, rumtk_spawn_task!(&rt, task));
575    /// ```
576    ///
577    #[macro_export]
578    macro_rules! rumtk_resolve_task {
579        ( $rt:expr, $future:expr ) => {{
580            use $crate::strings::rumtk_format;
581            // Fun tidbit, the expression rumtk_resolve_task!(&rt, rumtk_spawn_task!(&rt, task)), where
582            // rt is the tokio runtime yields async move { { &rt.spawn(task) } }. However, the whole thing
583            // is technically moved into the async closure and captured so things like mutex guards
584            // technically go out of the outer scope. As a result that expression fails to compile even
585            // though the intent is for rumtk_spawn_task to resolve first and its result get moved
586            // into the async closure. To ensure that happens regardless of given expression, we do
587            // a variable assignment below to force the "future" macro expressions to resolve before
588            // moving into the closure. DO NOT REMOVE OR "SIMPLIFY" THE let future = $future LINE!!!
589            let future = $future;
590            $rt.block_on(async move { future.await })
591        }};
592    }
593
594    ///
595    /// This macro creates an async body that calls the async closure and awaits it.
596    ///
597    /// ## Example
598    ///
599    /// ```
600    /// use std::sync::{Arc, RwLock};
601    /// use tokio::sync::RwLock as AsyncRwLock;
602    /// use rumtk_core::strings::RUMString;
603    /// use rumtk_core::threading::threading_manager::{SafeTaskArgs, TaskItems};
604    ///
605    /// pub type SafeTaskArgs2<T> = Arc<RwLock<TaskItems<T>>>;
606    /// let expected = vec![
607    ///     RUMString::from("Hello"),
608    ///     RUMString::from("World!"),
609    ///     RUMString::from("Overcast"),
610    ///     RUMString::from("and"),
611    ///     RUMString::from("Sad"),
612    ///  ];
613    /// let locked_args = AsyncRwLock::new(expected.clone());
614    /// let task_args = SafeTaskArgs::<RUMString>::new(locked_args);
615    ///
616    ///
617    /// ```
618    ///
619    #[macro_export]
620    macro_rules! rumtk_create_task {
621        ( $func:expr ) => {{
622            async move {
623                let f = $func;
624                f().await
625            }
626        }};
627        ( $func:expr, $args:expr ) => {{
628            let f = $func;
629            async move { f(&$args).await }
630        }};
631    }
632
633    ///
634    /// Creates an instance of [SafeTaskArgs] with the arguments passed.
635    ///
636    /// ## Note
637    ///
638    /// All arguments must be of the same type
639    ///
640    #[macro_export]
641    macro_rules! rumtk_create_task_args {
642        ( ) => {{
643            use $crate::threading::threading_manager::{TaskArgs, SafeTaskArgs, TaskItems};
644            use tokio::sync::RwLock;
645            SafeTaskArgs::new(RwLock::new(vec![]))
646        }};
647        ( $($args:expr),+ ) => {{
648            use $crate::threading::threading_manager::{SafeTaskArgs};
649            use tokio::sync::RwLock;
650            SafeTaskArgs::new(RwLock::new(vec![$($args),+]))
651        }};
652    }
653
654    ///
655    /// Convenience macro for packaging the task components and launching the task in one line.
656    ///
657    /// One of the advantages is that you can generate a new `tokio` runtime by specifying the
658    /// number of threads at the end. This is optional. Meaning, we will default to the system's
659    /// number of threads if that value is not specified.
660    ///
    /// Between the `func` parameter and the optional `threads` parameter, you pass a single
    /// collection expression (for example `vec![5]`) holding the arguments for the task. Every
    /// element must be of the same type. If you wish to pass arguments of different types, please
    /// define a type whose underlying structure is a tuple of items and pass a collection of that.
665    ///
666    /// ## Examples
667    ///
668    /// ### With Default Thread Count
669    /// ```
670    ///     use rumtk_core::{rumtk_exec_task};
671    ///     use rumtk_core::core::RUMResult;
672    ///     use rumtk_core::threading::threading_manager::SafeTaskArgs;
673    ///
674    ///     async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
675    ///         let mut result = Vec::<i32>::new();
676    ///         for arg in args.read().await.iter() {
677    ///             result.push(*arg);
678    ///         }
679    ///         Ok(result)
680    ///     }
681    ///
682    ///     let result = rumtk_exec_task!(test, vec![5]);
683    ///     assert_eq!(&result.clone().unwrap(), &vec![5], "Results mismatch");
684    ///     assert_ne!(&result.clone().unwrap(), &vec![5, 10], "Results do not mismatch as expected!");
685    /// ```
686    ///
687    /// ### With Custom Thread Count
688    /// ```
689    ///     use rumtk_core::{rumtk_exec_task};
690    ///     use rumtk_core::core::RUMResult;
691    ///     use rumtk_core::threading::threading_manager::SafeTaskArgs;
692    ///
693    ///     async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
694    ///         let mut result = Vec::<i32>::new();
695    ///         for arg in args.read().await.iter() {
696    ///             result.push(*arg);
697    ///         }
698    ///         Ok(result)
699    ///     }
700    ///
701    ///     let result = rumtk_exec_task!(test, vec![5], 5);
702    ///     assert_eq!(&result.clone().unwrap(), &vec![5], "Results mismatch");
703    ///     assert_ne!(&result.clone().unwrap(), &vec![5, 10], "Results do not mismatch as expected!");
704    /// ```
705    ///
706    /// ### With Async Function Body
707    /// ```
708    ///     use rumtk_core::{rumtk_exec_task};
709    ///     use rumtk_core::core::RUMResult;
710    ///     use rumtk_core::threading::threading_manager::SafeTaskArgs;
711    ///
712    ///     let result = rumtk_exec_task!(
713    ///     async move |args: &SafeTaskArgs<i32>| -> RUMResult<Vec<i32>> {
714    ///         let mut result = Vec::<i32>::new();
715    ///         for arg in args.read().await.iter() {
716    ///             result.push(*arg);
717    ///         }
718    ///         Ok(result)
719    ///     },
720    ///     vec![5]);
721    ///     assert_eq!(&result.clone().unwrap(), &vec![5], "Results mismatch");
722    ///     assert_ne!(&result.clone().unwrap(), &vec![5, 10], "Results do not mismatch as expected!");
723    /// ```
724    ///
725    /// ### With Async Function Body and No Args
726    /// ```
727    ///     use rumtk_core::{rumtk_exec_task};
728    ///     use rumtk_core::core::RUMResult;
729    ///     use rumtk_core::threading::threading_manager::SafeTaskArgs;
730    ///
731    ///     let result = rumtk_exec_task!(
732    ///     async || -> RUMResult<Vec<i32>> {
733    ///         let mut result = Vec::<i32>::new();
734    ///         Ok(result)
735    ///     });
736    ///     let empty = Vec::<i32>::new();
737    ///     assert_eq!(&result.clone().unwrap(), &empty, "Results mismatch");
738    ///     assert_ne!(&result.clone().unwrap(), &vec![5, 10], "Results do not mismatch as expected!");
739    /// ```
740    ///
741    /// ## Equivalent To
742    ///
743    /// ```
744    ///     use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_create_task_args, rumtk_create_task, rumtk_spawn_task};
745    ///     use rumtk_core::core::RUMResult;
746    ///     use rumtk_core::threading::threading_manager::SafeTaskArgs;
747    ///
748    ///     async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
749    ///         let mut result = Vec::<i32>::new();
750    ///         for arg in args.read().await.iter() {
751    ///             result.push(*arg);
752    ///         }
753    ///         Ok(result)
754    ///     }
755    ///
756    ///     let rt = rumtk_init_threads!();
757    ///     let args = rumtk_create_task_args!(1);
758    ///     let task = rumtk_create_task!(test, args);
759    ///     let result = rumtk_resolve_task!(&rt, rumtk_spawn_task!(&rt, task));
760    /// ```
761    ///
762    #[macro_export]
763    macro_rules! rumtk_exec_task {
764        ($func:expr ) => {{
765            use tokio::sync::RwLock;
766            use $crate::{
767                rumtk_create_task, rumtk_create_task_args, rumtk_init_threads, rumtk_resolve_task,
768            };
769            let rt = rumtk_init_threads!();
770            let task = rumtk_create_task!($func);
771            rumtk_resolve_task!(&rt, task)
772        }};
773        ($func:expr, $args:expr ) => {{
774            use tokio::sync::RwLock;
775            use $crate::{
776                rumtk_create_task, rumtk_create_task_args, rumtk_init_threads, rumtk_resolve_task,
777            };
778            let rt = rumtk_init_threads!();
779            let args = SafeTaskArgs::new(RwLock::new($args));
780            let task = rumtk_create_task!($func, args);
781            rumtk_resolve_task!(&rt, task)
782        }};
783        ($func:expr, $args:expr , $threads:expr ) => {{
784            use tokio::sync::RwLock;
785            use $crate::{
786                rumtk_create_task, rumtk_create_task_args, rumtk_init_threads, rumtk_resolve_task,
787            };
788            let rt = rumtk_init_threads!(&$threads);
789            let args = SafeTaskArgs::new(RwLock::new($args));
790            let task = rumtk_create_task!($func, args);
791            rumtk_resolve_task!(&rt, task)
792        }};
793    }
794
795    ///
    /// Sleep for a duration of time in a sync context; nothing is returned, so there is nothing to await.
797    ///
798    /// You can pass any value that can be cast to f32.
799    ///
800    /// The precision is up to nanoseconds and it is depicted by the number of decimal places.
801    ///
802    /// ## Examples
803    ///
804    /// ```
805    ///     use rumtk_core::rumtk_sleep;
806    ///     rumtk_sleep!(1);           // Sleeps for 1 second.
807    ///     rumtk_sleep!(0.001);       // Sleeps for 1 millisecond
808    ///     rumtk_sleep!(0.000001);    // Sleeps for 1 microsecond
809    ///     rumtk_sleep!(0.000000001); // Sleeps for 1 nanosecond
810    /// ```
811    ///
812    #[macro_export]
813    macro_rules! rumtk_sleep {
814        ( $dur:expr) => {{
815            use $crate::threading::threading_functions::sleep;
816            sleep($dur as f32)
817        }};
818    }
819
820    ///
821    /// Sleep for some duration of time in an async context. Meaning, we can be awaited.
822    ///
823    /// You can pass any value that can be cast to f32.
824    ///
825    /// The precision is up to nanoseconds and it is depicted by the number of decimal places.
826    ///
827    /// ## Examples
828    ///
829    /// ```
830    ///     use rumtk_core::{rumtk_async_sleep, rumtk_exec_task};
831    ///     use rumtk_core::core::RUMResult;
832    ///     rumtk_exec_task!( async || -> RUMResult<()> {
833    ///             rumtk_async_sleep!(1).await;           // Sleeps for 1 second.
834    ///             rumtk_async_sleep!(0.001).await;       // Sleeps for 1 millisecond
835    ///             rumtk_async_sleep!(0.000001).await;    // Sleeps for 1 microsecond
836    ///             rumtk_async_sleep!(0.000000001).await; // Sleeps for 1 nanosecond
837    ///             Ok(())
838    ///         }
839    ///     );
840    /// ```
841    ///
842    #[macro_export]
843    macro_rules! rumtk_async_sleep {
844        ( $dur:expr) => {{
845            use $crate::threading::threading_functions::async_sleep;
846            async_sleep($dur as f32)
847        }};
848    }
849
850    ///
851    ///
852    ///
853    #[macro_export]
854    macro_rules! rumtk_new_task_queue {
855        ( $worker_num:expr ) => {{
856            use $crate::threading::threading_manager::TaskManager;
857            TaskManager::new($worker_num);
858        }};
859    }
860}