Skip to main content

rumtk_core/
threading.rs

1/*
2 * rumtk attempts to implement HL7 and medical protocols for interoperability in medicine.
3 * This toolkit aims to be reliable, simple, performant, and standards compliant.
4 * Copyright (C) 2025  Luis M. Santos, M.D.
5 * Copyright (C) 2025  MedicalMasses L.L.C.
6 *
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
20 */
21
22///
23/// This module provides all the primitives needed to build a multithreaded application.
24///
25pub mod thread_primitives {
26    use crate::core::RUMResult;
27    use crate::strings::rumtk_format;
28    pub use std::sync::Mutex as SyncMutex;
29    pub use std::sync::MutexGuard as SyncMutexGuard;
30    use std::sync::OnceLock;
31    pub use std::sync::RwLock as SyncRwLock;
32    pub use tokio::io;
33    pub use tokio::io::{AsyncReadExt, AsyncWriteExt};
34    use tokio::runtime::Runtime as TokioRuntime;
35    pub use tokio::sync::{
36        Mutex as AsyncMutex, MutexGuard as AsyncMutexGuard, RwLock as AsyncRwLock,
37        RwLockReadGuard as AsyncRwLockReadGuard, RwLockWriteGuard as AsyncRwLockWriteGuard,
38    };
39
40    pub type RuntimeGuard<'a> = SyncMutexGuard<'a, TokioRuntime>;
41    /**************************** Types ***************************************/
42    pub type SafeTokioRuntime = OnceLock<SyncMutex<TokioRuntime>>;
43    /**************************** Globals **************************************/
44    static mut DEFAULT_RUNTIME: SafeTokioRuntime = SafeTokioRuntime::new();
45    /**************************** Helpers ***************************************/
46    pub fn init_runtime(workers: usize) -> RUMResult<RuntimeGuard<'static>> {
47        unsafe {
48            let handle = DEFAULT_RUNTIME.get_or_init(|| {
49                let mut builder = tokio::runtime::Builder::new_multi_thread();
50                builder.worker_threads(workers);
51                builder.enable_all();
52                match builder.build() {
53                    Ok(handle) => SyncMutex::new(handle),
54                    Err(e) => panic!(
55                        "Unable to initialize threading tokio runtime because {}!",
56                        &e
57                    ),
58                }
59            });
60            match handle.lock() {
61                Ok(guard) => Ok(guard),
62                Err(e) => Err(rumtk_format!("Unable to lock tokio runtime!")),
63            }
64        }
65    }
66}
67
68pub mod threading_manager {
69    use crate::core::{RUMResult, RUMVec};
70    use crate::strings::rumtk_format;
71    use crate::threading::threading_functions::{async_sleep, sleep};
72    use crate::types::{RUMHashMap, RUMID};
73    use crate::{rumtk_init_threads, rumtk_resolve_task, rumtk_spawn_task, threading};
74    use std::future::Future;
75    use std::sync::Arc;
76    pub use std::sync::RwLock as SyncRwLock;
77    use tokio::sync::RwLock as AsyncRwLock;
78    use tokio::task::JoinHandle;
79
80    const DEFAULT_SLEEP_DURATION: f32 = 0.001f32;
81    const DEFAULT_TASK_CAPACITY: usize = 100;
82
83    pub type AsyncHandle<T> = JoinHandle<T>;
84    pub type TaskItems<T> = RUMVec<T>;
85    /// This type aliases a vector of T elements that will be used for passing arguments to the task processor.
86    pub type TaskArgs<T> = TaskItems<T>;
    /// Shared, asynchronously lockable list of arguments handed to a task.
88    pub type SafeTaskArgs<T> = Arc<AsyncRwLock<TaskItems<T>>>;
89    pub type AsyncTaskHandle<R> = AsyncHandle<TaskResult<R>>;
90    pub type AsyncTaskHandles<R> = Vec<AsyncTaskHandle<R>>;
91    //pub type TaskProcessor<T, R, Fut: Future<Output = TaskResult<R>>> = impl FnOnce(&SafeTaskArgs<T>) -> Fut;
92    pub type TaskID = RUMID;
93
    /// Bookkeeping record for a single unit of work tracked in the task table.
    #[derive(Debug, Clone, Default)]
    pub struct Task<R> {
        /// Identifier under which the task is registered in the table.
        pub id: TaskID,
        /// Set to `true` once the task's future has completed and `result` has been stored.
        pub finished: bool,
        /// Output of the task's future; `None` until the task has finished.
        pub result: Option<R>,
    }
100
101    pub type SafeTask<R> = Arc<Task<R>>;
102    type SafeInternalTask<R> = Arc<SyncRwLock<Task<R>>>;
103    pub type TaskTable<R> = RUMHashMap<TaskID, SafeInternalTask<R>>;
104    pub type SafeAsyncTaskTable<R> = Arc<AsyncRwLock<TaskTable<R>>>;
105    pub type SafeSyncTaskTable<R> = Arc<SyncRwLock<TaskTable<R>>>;
106    pub type TaskBatch = RUMVec<TaskID>;
107    /// Type to use to define how task results are expected to be returned.
108    pub type TaskResult<R> = RUMResult<SafeTask<R>>;
109    pub type TaskResults<R> = TaskItems<TaskResult<R>>;
110
111    ///
112    /// Manages asynchronous tasks submitted as micro jobs from synchronous code. This type essentially
113    /// gives the multithreading, asynchronous superpowers to synchronous logic.
114    ///
115    /// ## Example Usage
116    ///
117    /// ```
118    /// use std::sync::{Arc};
119    /// use tokio::sync::RwLock as AsyncRwLock;
120    /// use rumtk_core::core::RUMResult;
121    /// use rumtk_core::strings::RUMString;
122    /// use rumtk_core::threading::threading_manager::{SafeTaskArgs, TaskItems, TaskManager};
123    /// use rumtk_core::{rumtk_create_task, };
124    ///
125    /// let expected = vec![
126    ///     RUMString::from("Hello"),
127    ///     RUMString::from("World!"),
128    ///     RUMString::from("Overcast"),
129    ///     RUMString::from("and"),
130    ///     RUMString::from("Sad"),
131    ///  ];
132    ///
133    /// type TestResult = RUMResult<Vec<RUMString>>;
134    /// let mut queue: TaskManager<TestResult> = TaskManager::new(&5).unwrap();
135    ///
136    /// let locked_args = AsyncRwLock::new(expected.clone());
137    /// let task_args = SafeTaskArgs::<RUMString>::new(locked_args);
138    /// let processor = rumtk_create_task!(
139    ///     async |args: &SafeTaskArgs<RUMString>| -> TestResult {
140    ///         let owned_args = Arc::clone(args);
141    ///         let locked_args = owned_args.read().await;
142    ///         let mut results = TaskItems::<RUMString>::with_capacity(locked_args.len());
143    ///
144    ///         for arg in locked_args.iter() {
145    ///             results.push(RUMString::new(arg));
146    ///         }
147    ///
148    ///         Ok(results)
149    ///     },
150    ///     task_args
151    /// );
152    ///
153    /// queue.add_task::<_>(processor);
154    /// let results = queue.wait();
155    ///
156    /// let mut result_data = Vec::<RUMString>::with_capacity(5);
157    /// for r in results {
158    ///     for v in r.unwrap().result.clone().unwrap().iter() {
159    ///         for value in v.iter() {
160    ///             result_data.push(value.clone());
161    ///         }
162    ///     }
163    ///  }
164    ///
165    /// assert_eq!(result_data, expected, "Results do not match expected!");
166    ///
167    /// ```
168    ///
    #[derive(Debug, Clone, Default)]
    pub struct TaskManager<R> {
        // Shared table of tracked tasks keyed by task id; a clone of this handle is passed to
        // the code that registers each task.
        tasks: SafeSyncTaskTable<R>,
        // Worker count passed to the runtime initializer when a task is spawned from sync code.
        workers: usize,
    }
174
175    impl<R> TaskManager<R>
176    where
177        R: Sync + Send + Clone + 'static,
178    {
179        ///
180        /// This method creates a [`TaskManager`] instance using sensible defaults.
181        ///
182        /// The `threads` field is computed from the number of cores present in system.
183        ///
184        pub fn default() -> RUMResult<TaskManager<R>> {
185            Self::new(&threading::threading_functions::get_default_system_thread_count())
186        }
187
188        ///
189        /// Creates an instance of [`TaskManager<R>`](TaskManager<R>).
190        /// Expects you to provide the count of threads to spawn and the microtask queue size
191        /// allocated by each thread.
192        ///
193        /// This method calls [`TaskTable::with_capacity()`](TaskTable::with_capacity) for the actual object creation.
194        /// The main queue capacity is pre-allocated to [`DEFAULT_TASK_CAPACITY`](DEFAULT_TASK_CAPACITY).
195        ///
196        pub fn new(worker_num: &usize) -> RUMResult<TaskManager<R>> {
197            let tasks = SafeSyncTaskTable::<R>::new(SyncRwLock::new(TaskTable::with_capacity(
198                DEFAULT_TASK_CAPACITY,
199            )));
200            Ok(TaskManager::<R> {
201                tasks,
202                workers: worker_num.to_owned(),
203            })
204        }
205
206        ///
207        /// Add a task to the processing queue. The idea is that you can queue a processor function
208        /// and list of args that will be picked up by one of the threads for processing.
209        ///
210        /// This is the async counterpart
211        ///
212        pub async fn add_task_async<F>(&mut self, task: F) -> TaskID
213        where
214            F: Future<Output = R> + Send + Sync + 'static,
215            F::Output: Send + 'static,
216        {
217            let id = TaskID::new_v4();
218            Self::_add_task_async(id.clone(), self.tasks.clone(), task).await
219        }
220
221        ///
222        /// See [`Self::add_task_async`]
223        ///
224        /// Unlike `add_task`, this method does not block which is key to avoiding panicking
        /// the tokio runtime if trying to add task to queue from a normal function called from an
226        /// async environment.
227        ///
228        /// ## Example
229        ///
230        /// ```
231        /// use rumtk_core::threading::threading_manager::{TaskManager};
232        /// use rumtk_core::{rumtk_init_threads, strings::rumtk_format};
233        /// use std::sync::{Arc};
234        /// use tokio::sync::Mutex;
235        ///
236        /// type JobManager = Arc<Mutex<TaskManager<usize>>>;
237        ///
238        /// async fn called_fn() -> usize {
239        ///     5
240        /// }
241        ///
242        /// fn push_job(manager: &mut TaskManager<usize>) -> usize {
243        ///     manager.spawn_task(called_fn());
244        ///     1
245        /// }
246        ///
247        /// async fn call_sync_fn(mut manager: JobManager) -> usize {
248        ///     let mut owned = manager.lock().await;
249        ///     push_job(&mut owned)
250        /// }
251        ///
252        /// let workers = 5;
253        /// let mut manager = Arc::new(Mutex::new(TaskManager::new(&workers).unwrap()));
254        ///
255        /// manager.blocking_lock().spawn_task(call_sync_fn(manager.clone()));
256        ///
257        /// let result_raw = manager.blocking_lock().wait();
258        ///
259        /// ```
260        ///
261        pub fn spawn_task<F>(&mut self, task: F) -> RUMResult<TaskID>
262        where
263            F: Future<Output = R> + Send + Sync + 'static,
264            F::Output: Send + Sized + 'static,
265        {
266            let id = TaskID::new_v4();
267            let rt = rumtk_init_threads!(self.workers)?;
268            rumtk_spawn_task!(
269                rt,
270                Self::_add_task_async(id.clone(), self.tasks.clone(), task)
271            );
272            Ok(id)
273        }
274
275        ///
276        /// See [add_task_async](Self::add_task_async)
277        ///
278        pub fn add_task<F>(&mut self, task: F) -> RUMResult<TaskID>
279        where
280            F: Future<Output = R> + Send + Sync + 'static,
281            F::Output: Send + Sized + 'static,
282        {
283            let id = TaskID::new_v4();
284            let tasks = self.tasks.clone();
285            rumtk_resolve_task!(Self::_add_task_async(id.clone(), tasks, task))
286        }
287
288        async fn _add_task_async<F>(id: TaskID, tasks: SafeSyncTaskTable<R>, task: F) -> TaskID
289        where
290            F: Future<Output = R> + Send + Sync + 'static,
291            F::Output: Send + Sized + 'static,
292        {
293            let mut safe_task = Arc::new(SyncRwLock::new(Task::<R> {
294                id: id.clone(),
295                finished: false,
296                result: None,
297            }));
298            tasks.write().unwrap().insert(id.clone(), safe_task.clone());
299
300            let task_wrapper = async move || {
301                // Run the task
302                let result = task.await;
303
304                // Cleanup task
305                let mut lock = safe_task.write().unwrap();
306                lock.result = Some(result);
307                lock.finished = true;
308            };
309
310            tokio::spawn(task_wrapper());
311
312            id
313        }
314
315        ///
316        /// See [wait_async](Self::wait_async)
317        ///
318        /// Duplicated here because we can't request the tokio runtime to do a quick exec for us if
319        /// this function happens to be called from the async context.
320        ///
321        pub fn wait(&mut self) -> TaskResults<R> {
322            let task_batch = self
323                .tasks
324                .read()
325                .unwrap()
326                .keys()
327                .cloned()
328                .collect::<Vec<_>>();
329            self.wait_on_batch(&task_batch)
330        }
331
332        ///
333        /// See [wait_on_batch_async](Self::wait_on_batch_async)
334        ///
335        /// Duplicated here because we can't request the tokio runtime to do a quick exec for us if
336        /// this function happens to be called from the async context.
337        ///
338        pub fn wait_on_batch(&mut self, tasks: &TaskBatch) -> TaskResults<R> {
339            let mut results = TaskResults::<R>::default();
340            for task in tasks {
341                results.push(self.wait_on(task));
342            }
343            results
344        }
345
346        ///
347        /// See [wait_on_async](Self::wait_on_async)
348        ///
349        /// Duplicated here because we can't request the tokio runtime to do a quick exec for us if
350        /// this function happens to be called from the async context.
351        ///
352        pub fn wait_on(&mut self, task_id: &TaskID) -> TaskResult<R> {
353            let task = match self.tasks.write().unwrap().remove(task_id) {
354                Some(task) => task.clone(),
355                None => return Err(rumtk_format!("No task with id {}", task_id)),
356            };
357
358            while !task.read().unwrap().finished {
359                sleep(DEFAULT_SLEEP_DURATION);
360            }
361
362            let x = Ok(Arc::new(task.write().unwrap().clone()));
363            x
364        }
365
366        ///
367        /// This method waits until a queued task with [TaskID](TaskID) has been processed from the main queue.
368        ///
369        /// We poll the status of the task every [DEFAULT_SLEEP_DURATION](DEFAULT_SLEEP_DURATION) ms.
370        ///
371        /// Upon completion,
372        ///
373        /// 2. Return the result ([TaskResults<R>](TaskResults)).
374        ///
375        /// This operation consumes the task.
376        ///
377        /// ### Note:
378        /// ```text
379        ///     Results returned here are not guaranteed to be in the same order as the order in which
380        ///     the tasks were queued for work. You will need to pass a type as T that automatically
381        ///     tracks its own id or has a way for you to resort results.
382        /// ```
383        pub async fn wait_on_async(&mut self, task_id: &TaskID) -> TaskResult<R> {
384            let task = match self.tasks.write().unwrap().remove(task_id) {
385                Some(task) => task.clone(),
386                None => return Err(rumtk_format!("No task with id {}", task_id)),
387            };
388
389            while !task.read().unwrap().finished {
390                async_sleep(DEFAULT_SLEEP_DURATION).await;
391            }
392
393            let x = Ok(Arc::new(task.write().unwrap().clone()));
394            x
395        }
396
397        ///
398        /// This method waits until a set of queued tasks with [TaskID](TaskID) has been processed from the main queue.
399        ///
400        /// We poll the status of the task every [DEFAULT_SLEEP_DURATION](DEFAULT_SLEEP_DURATION) ms.
401        ///
402        /// Upon completion,
403        ///
404        /// 1. We collect the results generated (if any).
405        /// 2. Return the list of results ([TaskResults<R>](TaskResults)).
406        ///
407        /// ### Note:
408        /// ```text
409        ///     Results returned here are not guaranteed to be in the same order as the order in which
410        ///     the tasks were queued for work. You will need to pass a type as T that automatically
411        ///     tracks its own id or has a way for you to resort results.
412        /// ```
413        pub async fn wait_on_batch_async(&mut self, tasks: &TaskBatch) -> TaskResults<R> {
414            let mut results = TaskResults::<R>::default();
415            for task in tasks {
416                results.push(self.wait_on_async(task).await);
417            }
418            results
419        }
420
421        ///
422        /// This method waits until all queued tasks have been processed from the main queue.
423        ///
424        /// We poll the status of the main queue every [DEFAULT_SLEEP_DURATION](DEFAULT_SLEEP_DURATION) ms.
425        ///
426        /// Upon completion,
427        ///
428        /// 1. We collect the results generated (if any).
429        /// 2. We reset the main task and result internal queue states.
430        /// 3. Return the list of results ([TaskResults<R>](TaskResults)).
431        ///
432        /// This operation consumes all the tasks.
433        ///
434        /// ### Note:
435        /// ```text
436        ///     Results returned here are not guaranteed to be in the same order as the order in which
437        ///     the tasks were queued for work. You will need to pass a type as T that automatically
438        ///     tracks its own id or has a way for you to resort results.
439        /// ```
440        pub async fn wait_async(&mut self) -> TaskResults<R> {
441            let task_batch = self
442                .tasks
443                .read()
444                .unwrap()
445                .keys()
446                .cloned()
447                .collect::<Vec<_>>();
448            self.wait_on_batch_async(&task_batch).await
449        }
450
451        ///
452        /// Check if all work has been completed from the task queue.
453        ///
454        /// ## Examples
455        ///
456        /// ### Sync Usage
457        ///
458        ///```
459        /// use rumtk_core::threading::threading_manager::TaskManager;
460        ///
461        /// let manager = TaskManager::<usize>::new(&4).unwrap();
462        ///
463        /// let all_done = manager.is_all_completed();
464        ///
465        /// assert_eq!(all_done, true, "Empty TaskManager reports tasks are not completed!");
466        ///
467        /// ```
468        ///
469        pub fn is_all_completed(&self) -> bool {
470            self._is_all_completed_async()
471        }
472
473        pub async fn is_all_completed_async(&self) -> bool {
474            self._is_all_completed_async()
475        }
476
477        fn _is_all_completed_async(&self) -> bool {
478            for (_, task) in self.tasks.read().unwrap().iter() {
479                if !task.read().unwrap().finished {
480                    return false;
481                }
482            }
483
484            true
485        }
486
487        ///
488        /// Check if a task completed
489        ///
490        pub fn is_finished(&self, id: &TaskID) -> bool {
491            match self.tasks.read().unwrap().get(id) {
492                Some(t) => t.read().unwrap().finished,
493                None => false,
494            }
495        }
496
497        pub async fn is_finished_async(&self, id: &TaskID) -> bool {
498            match self.tasks.read().unwrap().get(id) {
499                Some(task) => task.read().unwrap().finished,
500                None => true,
501            }
502        }
503
504        ///
505        /// Alias for [wait](TaskManager::wait).
506        ///
507        fn gather(&mut self) -> TaskResults<R> {
508            self.wait()
509        }
510    }
511}
512
513///
/// This module contains a few helpers.
515///
516/// For example, you can find a function for determining number of threads available in system.
517/// The sleep family of functions are also here.
518///
519pub mod threading_functions {
520    use crate::core::RUMResult;
521    use crate::rumtk_sleep;
522    use crate::strings::rumtk_format;
523    pub use crate::threading::thread_primitives::init_runtime;
524    use num_cpus;
525    use std::future::Future;
526    use std::thread::{available_parallelism, sleep as std_sleep};
527    use std::time::Duration;
528    use tokio::time::sleep as tokio_sleep;
529
530    pub const NANOS_PER_SEC: u64 = 1000000000;
531    pub const MILLIS_PER_SEC: u64 = 1000;
532    pub const MICROS_PER_SEC: u64 = 1000000;
533
534    const DEFAULT_SLEEP_DURATION: f32 = 0.001;
535
536    pub fn get_default_system_thread_count() -> usize {
537        let cpus: usize = num_cpus::get();
538        let parallelism = match available_parallelism() {
539            Ok(n) => n.get(),
540            Err(_) => 0,
541        };
542
543        if parallelism >= cpus {
544            parallelism
545        } else {
546            cpus
547        }
548    }
549
550    pub fn sleep(s: f32) {
551        let ns = s * NANOS_PER_SEC as f32;
552        let rounded_ns = ns.round() as u64;
553        let duration = Duration::from_nanos(rounded_ns);
554        std_sleep(duration);
555    }
556
557    pub async fn async_sleep(s: f32) {
558        let ns = s * NANOS_PER_SEC as f32;
559        let rounded_ns = ns.round() as u64;
560        let duration = Duration::from_nanos(rounded_ns);
561        tokio_sleep(duration).await;
562    }
563
564    ///
565    /// Given a closure task, push it onto the current `tokio` runtime for execution.
566    /// Every [DEFAULT_SLEEP_DURATION] seconds, we check if the task has concluded.
567    /// Once the task has concluded, we call [tokio::block_on](tokio::task::block_in_place) to resolve and extract the task
568    /// result.
569    ///
570    /// Because this helper function can fail, the return value is wrapped inside a [RUMResult].
571    ///
572    /// ## Example
573    ///
574    /// ```
575    /// use rumtk_core::threading::threading_functions::{init_runtime, block_on_task};
576    ///
577    /// const Hello: &str = "World!";
578    ///
579    /// init_runtime(5);
580    ///
581    /// let result = block_on_task(async {
582    ///     Hello
583    /// }).unwrap();
584    ///
585    /// assert_eq!(Hello, result, "Result mismatches expected! {} vs. {}", Hello, result);
586    /// ```
587    ///
588    pub fn block_on_task<R, F>(task: F) -> RUMResult<R>
589    where
590        F: Future<Output = R> + Send + 'static,
591        F::Output: Send + 'static,
592    {
593        let default_system_thread_count = get_default_system_thread_count();
594
595        let handle = init_runtime(default_system_thread_count)
596            .expect("Failed to initialize runtime")
597            .spawn(task);
598
599        while !handle.is_finished() {
600            rumtk_sleep!(DEFAULT_SLEEP_DURATION);
601        }
602
603        tokio::task::block_in_place(move || {
604            match init_runtime(default_system_thread_count)
605                .expect("Failed to initialize runtime")
606                .block_on(handle)
607            {
608                Ok(r) => Ok(r),
609                Err(e) => Err(rumtk_format!(
610                    "Issue peeking into task in the runtime because => {}",
611                    &e
612                )),
613            }
614        })
615    }
616}
617
618///
619/// Main API for interacting with the threading back end. Remember, we use tokio as our executor.
620/// This means that by default, all jobs sent to the thread pool have to be async in nature.
621/// These macros make handling of these jobs at the sync/async boundary more convenient.
622///
623pub mod threading_macros {
624    use crate::threading::thread_primitives;
625    use crate::threading::threading_manager::SafeTaskArgs;
626
627    ///
628    /// First, let's make sure we have *tokio* initialized at least once. The runtime created here
629    /// will be saved to the global context so the next call to this macro will simply grab a
630    /// reference to the previously initialized runtime.
631    ///
632    /// Passing nothing will default to initializing a runtime using the default number of threads
633    /// for this system. This is typically equivalent to number of cores/threads for your CPU.
634    ///
635    /// Passing `threads` number will yield a runtime that allocates that many threads.
636    ///
637    ///
638    /// ## Examples
639    ///
640    /// ```
641    ///     use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_create_task_args, rumtk_create_task, rumtk_spawn_task};
642    ///     use rumtk_core::core::RUMResult;
643    ///     use rumtk_core::threading::threading_manager::SafeTaskArgs;
644    ///
645    ///     async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
646    ///         let mut result = Vec::<i32>::new();
647    ///         for arg in args.read().await.iter() {
648    ///             result.push(*arg);
649    ///         }
650    ///         Ok(result)
651    ///     }
652    ///
653    ///     let args = rumtk_create_task_args!(1);                               // Creates a vector of i32s
654    ///     let task = rumtk_create_task!(test, args);                           // Creates a standard task which consists of a function or closure accepting a Vec<T>
655    ///     let result = rumtk_resolve_task!(rumtk_spawn_task!(task)); // Spawn's task and waits for it to conclude.
656    /// ```
657    ///
658    /// ```
659    ///     use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_create_task_args, rumtk_create_task, rumtk_spawn_task};
660    ///     use rumtk_core::core::RUMResult;
661    ///     use rumtk_core::threading::threading_manager::SafeTaskArgs;
662    ///
663    ///     async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
664    ///         let mut result = Vec::<i32>::new();
665    ///         for arg in args.read().await.iter() {
666    ///             result.push(*arg);
667    ///         }
668    ///         Ok(result)
669    ///     }
670    ///
671    ///     let thread_count: usize = 10;
672    ///     let args = rumtk_create_task_args!(1);
673    ///     let task = rumtk_create_task!(test, args);
674    ///     let result = rumtk_resolve_task!(rumtk_spawn_task!(task));
675    /// ```
676    #[macro_export]
677    macro_rules! rumtk_init_threads {
678        ( ) => {{
679            use $crate::threading::thread_primitives::init_runtime;
680            use $crate::threading::threading_functions::get_default_system_thread_count;
681            init_runtime(get_default_system_thread_count())
682        }};
683        ( $threads:expr ) => {{
684            use $crate::rumtk_cache_fetch;
685            use $crate::threading::thread_primitives::init_runtime;
686            init_runtime($threads)
687        }};
688    }
689
690    ///
691    /// Puts task onto the runtime queue.
692    ///
693    /// The parameters to this macro are a reference to the runtime (`rt`) and a future (`func`).
694    ///
695    /// The return is a [thread_primitives::JoinHandle<T>] instance. If the task was a standard
696    /// framework task, you will get [thread_primitives::AsyncTaskHandle] instead.
697    ///
698    #[macro_export]
699    macro_rules! rumtk_spawn_task {
700        ( $func:expr ) => {{
701            let rt = rumtk_init_threads!().expect("Runtime is not initialized!");
702            rt.spawn($func)
703        }};
704        ( $rt:expr, $func:expr ) => {{
705            $rt.spawn($func)
706        }};
707    }
708
709    ///
710    /// Using the initialized runtime, wait for the future to resolve in a thread blocking manner!
711    ///
712    /// If you pass a reference to the runtime (`rt`) and an async closure (`func`), we await the
713    /// async closure without passing any arguments.
714    ///
715    /// You can pass a third argument to this macro in the form of any number of arguments (`arg_item`).
716    /// In such a case, we pass those arguments to the call on the async closure and await on results.
717    ///
718    #[macro_export]
719    macro_rules! rumtk_wait_on_task {
720        ( $func:expr ) => {{
721            use $crate::threading::threading_functions::block_on_task;
722            block_on_task(async move {
723                $func().await
724            })
725        }};
726        ( $func:expr, $($arg_items:expr),+ ) => {{
727            use $crate::threading::threading_functions::block_on_task;
728            block_on_task(async move {
729                $func($($arg_items),+).await
730            })
731        }};
732    }
733
734    ///
735    /// This macro awaits a future.
736    ///
    /// The only argument is the future to resolve.
738    ///
739    /// If there is a result, you will get the result of the future.
740    ///
741    /// ## Examples
742    ///
743    /// ```
744    ///     use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_create_task_args, rumtk_create_task, rumtk_spawn_task};
745    ///     use rumtk_core::core::RUMResult;
746    ///     use rumtk_core::threading::threading_manager::SafeTaskArgs;
747    ///
748    ///     async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
749    ///         let mut result = Vec::<i32>::new();
750    ///         for arg in args.read().await.iter() {
751    ///             result.push(*arg);
752    ///         }
753    ///         Ok(result)
754    ///     }
755    ///
756    ///     let args = rumtk_create_task_args!(1);
757    ///     let task = rumtk_create_task!(test, args);
758    ///     let result = rumtk_resolve_task!(rumtk_spawn_task!(task));
759    /// ```
760    ///
761    #[macro_export]
762    macro_rules! rumtk_resolve_task {
763        ( $future:expr ) => {{
764            use $crate::threading::threading_functions::block_on_task;
765            // Fun tidbit, the expression rumtk_resolve_task!(&rt, rumtk_spawn_task!(&rt, task)), where
766            // rt is the tokio runtime yields async move { { &rt.spawn(task) } }. However, the whole thing
767            // is technically moved into the async closure and captured so things like mutex guards
768            // technically go out of the outer scope. As a result that expression fails to compile even
769            // though the intent is for rumtk_spawn_task to resolve first and its result get moved
770            // into the async closure. To ensure that happens regardless of given expression, we do
771            // a variable assignment below to force the "future" macro expressions to resolve before
772            // moving into the closure. DO NOT REMOVE OR "SIMPLIFY" THE let future = $future LINE!!!
773            //let future = $future;
774            block_on_task(async move { $future.await })
775        }};
776    }
777
778    #[macro_export]
779    macro_rules! rumtk_resolve_task_from_async {
780        ( $rt:expr, $future:expr ) => {{
781            let handle = $rt.spawn_blocking(async move { future.await })
782        }};
783    }
784
785    ///
786    /// This macro creates an async body that calls the async closure and awaits it.
787    ///
788    /// ## Example
789    ///
790    /// ```
791    /// use std::sync::{Arc, RwLock};
792    /// use tokio::sync::RwLock as AsyncRwLock;
793    /// use rumtk_core::strings::RUMString;
794    /// use rumtk_core::threading::threading_manager::{SafeTaskArgs, TaskItems};
795    ///
796    /// pub type SafeTaskArgs2<T> = Arc<RwLock<TaskItems<T>>>;
797    /// let expected = vec![
798    ///     RUMString::from("Hello"),
799    ///     RUMString::from("World!"),
800    ///     RUMString::from("Overcast"),
801    ///     RUMString::from("and"),
802    ///     RUMString::from("Sad"),
803    ///  ];
804    /// let locked_args = AsyncRwLock::new(expected.clone());
805    /// let task_args = SafeTaskArgs::<RUMString>::new(locked_args);
806    ///
807    ///
808    /// ```
809    ///
810    #[macro_export]
811    macro_rules! rumtk_create_task {
812        ( $func:expr ) => {{
813            async move {
814                let f = $func;
815                f().await
816            }
817        }};
818        ( $func:expr, $args:expr ) => {{
819            let f = $func;
820            async move { f(&$args).await }
821        }};
822    }
823
824    ///
825    /// Creates an instance of [SafeTaskArgs](SafeTaskArgs) with the arguments passed.
826    ///
827    /// ## Note
828    ///
829    /// All arguments must be of the same type
830    ///
831    #[macro_export]
832    macro_rules! rumtk_create_task_args {
833        ( ) => {{
834            use $crate::threading::threading_manager::{TaskArgs, SafeTaskArgs, TaskItems};
835            use $crate::threading::thread_primitives::AsyncRwLock;
836            SafeTaskArgs::new(AsyncRwLock::new(vec![]))
837        }};
838        ( $($args:expr),+ ) => {{
839            use $crate::threading::threading_manager::{SafeTaskArgs};
840            use $crate::threading::thread_primitives::AsyncRwLock;
841            SafeTaskArgs::new(AsyncRwLock::new(vec![$($args),+]))
842        }};
843    }
844
845    ///
846    /// Convenience macro for packaging the task components and launching the task in one line.
847    ///
848    /// One of the advantages is that you can generate a new `tokio` runtime by specifying the
849    /// number of threads at the end. This is optional. Meaning, we will default to the system's
850    /// number of threads if that value is not specified.
851    ///
852    /// Between the `func` parameter and the optional `threads` parameter, you can specify a
853    /// variable number of arguments to pass to the task. each argument must be of the same type.
854    /// If you wish to pass different arguments with different types, please define an abstract type
855    /// whose underlying structure is a tuple of items and pass that instead.
856    ///
857    /// ## Examples
858    ///
859    /// ### With Default Thread Count
860    /// ```
861    ///     use rumtk_core::{rumtk_exec_task};
862    ///     use rumtk_core::core::RUMResult;
863    ///     use rumtk_core::threading::threading_manager::SafeTaskArgs;
864    ///
865    ///     async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
866    ///         let mut result = Vec::<i32>::new();
867    ///         for arg in args.read().await.iter() {
868    ///             result.push(*arg);
869    ///         }
870    ///         Ok(result)
871    ///     }
872    ///
873    ///     let result = rumtk_exec_task!(test, vec![5]).unwrap();
874    ///     assert_eq!(&result.clone().unwrap(), &vec![5], "Results mismatch");
875    ///     assert_ne!(&result.clone().unwrap(), &vec![5, 10], "Results do not mismatch as expected!");
876    /// ```
877    ///
878    /// ### With Custom Thread Count
879    /// ```
880    ///     use rumtk_core::{rumtk_exec_task};
881    ///     use rumtk_core::core::RUMResult;
882    ///     use rumtk_core::threading::threading_manager::SafeTaskArgs;
883    ///
884    ///     async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
885    ///         let mut result = Vec::<i32>::new();
886    ///         for arg in args.read().await.iter() {
887    ///             result.push(*arg);
888    ///         }
889    ///         Ok(result)
890    ///     }
891    ///
892    ///     let result = rumtk_exec_task!(test, vec![5], 5).unwrap();
893    ///     assert_eq!(&result.clone().unwrap(), &vec![5], "Results mismatch");
894    ///     assert_ne!(&result.clone().unwrap(), &vec![5, 10], "Results do not mismatch as expected!");
895    /// ```
896    ///
897    /// ### With Async Function Body
898    /// ```
899    ///     use rumtk_core::{rumtk_exec_task};
900    ///     use rumtk_core::core::RUMResult;
901    ///     use rumtk_core::threading::threading_manager::SafeTaskArgs;
902    ///
903    ///     let result = rumtk_exec_task!(
904    ///     async move |args: &SafeTaskArgs<i32>| -> RUMResult<Vec<i32>> {
905    ///         let mut result = Vec::<i32>::new();
906    ///         for arg in args.read().await.iter() {
907    ///             result.push(*arg);
908    ///         }
909    ///         Ok(result)
910    ///     },
911    ///     vec![5]).unwrap();
912    ///     assert_eq!(&result.clone().unwrap(), &vec![5], "Results mismatch");
913    ///     assert_ne!(&result.clone().unwrap(), &vec![5, 10], "Results do not mismatch as expected!");
914    /// ```
915    ///
916    /// ### With Async Function Body and No Args
917    /// ```
918    ///     use rumtk_core::{rumtk_exec_task};
919    ///     use rumtk_core::core::RUMResult;
920    ///     use rumtk_core::threading::threading_manager::SafeTaskArgs;
921    ///
922    ///     let result = rumtk_exec_task!(
923    ///     async || -> RUMResult<Vec<i32>> {
924    ///         let mut result = Vec::<i32>::new();
925    ///         Ok(result)
926    ///     }).unwrap();
927    ///     let empty = Vec::<i32>::new();
928    ///     assert_eq!(&result.clone().unwrap(), &empty, "Results mismatch");
929    ///     assert_ne!(&result.clone().unwrap(), &vec![5, 10], "Results do not mismatch as expected!");
930    /// ```
931    ///
932    /// ## Equivalent To
933    ///
934    /// ```
935    ///     use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_create_task_args, rumtk_create_task, rumtk_spawn_task};
936    ///     use rumtk_core::core::RUMResult;
937    ///     use rumtk_core::threading::threading_manager::SafeTaskArgs;
938    ///
939    ///     async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
940    ///         let mut result = Vec::<i32>::new();
941    ///         for arg in args.read().await.iter() {
942    ///             result.push(*arg);
943    ///         }
944    ///         Ok(result)
945    ///     }
946    ///
947    ///     let args = rumtk_create_task_args!(1);
948    ///     let task = rumtk_create_task!(test, args);
949    ///     let result = rumtk_resolve_task!(rumtk_spawn_task!(task));
950    /// ```
951    ///
952    #[macro_export]
953    macro_rules! rumtk_exec_task {
954        ($func:expr ) => {{
955            use $crate::{
956                rumtk_create_task, rumtk_create_task_args, rumtk_init_threads, rumtk_resolve_task,
957            };
958            let task = rumtk_create_task!($func);
959            rumtk_resolve_task!(task)
960        }};
961        ($func:expr, $args:expr ) => {{
962            use $crate::threading::threading_functions::get_default_system_thread_count;
963            rumtk_exec_task!($func, $args, get_default_system_thread_count())
964        }};
965        ($func:expr, $args:expr , $threads:expr ) => {{
966            use $crate::threading::thread_primitives::AsyncRwLock;
967            use $crate::{
968                rumtk_create_task, rumtk_create_task_args, rumtk_init_threads, rumtk_resolve_task,
969            };
970            let args = SafeTaskArgs::new(AsyncRwLock::new($args));
971            let task = rumtk_create_task!($func, args);
972            rumtk_resolve_task!(task)
973        }};
974    }
975
976    ///
977    /// Sleep a duration of time in a sync context, so no await can be call on the result.
978    ///
979    /// You can pass any value that can be cast to f32.
980    ///
981    /// The precision is up to nanoseconds and it is depicted by the number of decimal places.
982    ///
983    /// ## Examples
984    ///
985    /// ```
986    ///     use rumtk_core::rumtk_sleep;
987    ///     rumtk_sleep!(1);           // Sleeps for 1 second.
988    ///     rumtk_sleep!(0.001);       // Sleeps for 1 millisecond
989    ///     rumtk_sleep!(0.000001);    // Sleeps for 1 microsecond
990    ///     rumtk_sleep!(0.000000001); // Sleeps for 1 nanosecond
991    /// ```
992    ///
993    #[macro_export]
994    macro_rules! rumtk_sleep {
995        ( $dur:expr) => {{
996            use $crate::threading::threading_functions::sleep;
997            sleep($dur as f32)
998        }};
999    }
1000
1001    ///
1002    /// Sleep for some duration of time in an async context. Meaning, we can be awaited.
1003    ///
1004    /// You can pass any value that can be cast to f32.
1005    ///
1006    /// The precision is up to nanoseconds and it is depicted by the number of decimal places.
1007    ///
1008    /// ## Examples
1009    ///
1010    /// ```
1011    ///     use rumtk_core::{rumtk_async_sleep, rumtk_exec_task};
1012    ///     use rumtk_core::core::RUMResult;
1013    ///     rumtk_exec_task!( async || -> RUMResult<()> {
1014    ///             rumtk_async_sleep!(1).await;           // Sleeps for 1 second.
1015    ///             rumtk_async_sleep!(0.001).await;       // Sleeps for 1 millisecond
1016    ///             rumtk_async_sleep!(0.000001).await;    // Sleeps for 1 microsecond
1017    ///             rumtk_async_sleep!(0.000000001).await; // Sleeps for 1 nanosecond
1018    ///             Ok(())
1019    ///         }
1020    ///     );
1021    /// ```
1022    ///
1023    #[macro_export]
1024    macro_rules! rumtk_async_sleep {
1025        ( $dur:expr) => {{
1026            use $crate::threading::threading_functions::async_sleep;
1027            async_sleep($dur as f32)
1028        }};
1029    }
1030
1031    ///
1032    ///
1033    ///
1034    #[macro_export]
1035    macro_rules! rumtk_new_task_queue {
1036        ( $worker_num:expr ) => {{
1037            use $crate::threading::threading_manager::TaskManager;
1038            TaskManager::new($worker_num);
1039        }};
1040    }
1041}