rumtk_core/threading.rs

/*
 * rumtk attempts to implement HL7 and medical protocols for interoperability in medicine.
 * This toolkit aims to be reliable, simple, performant, and standards compliant.
 * Copyright (C) 2025  Luis M. Santos, M.D.
 * Copyright (C) 2025  MedicalMasses L.L.C.
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
 */

///
/// This module provides all the primitives needed to build a multithreaded application.
///
pub mod thread_primitives {
    use crate::cache::{new_cache, LazyRUMCache};
    use std::sync::Arc;
    pub use tokio::io;
    pub use tokio::io::{AsyncReadExt, AsyncWriteExt};
    use tokio::runtime::Runtime as TokioRuntime;
    pub use tokio::sync::{
        Mutex as AsyncMutex, MutexGuard as AsyncMutexGuard, RwLock as AsyncRwLock, RwLockReadGuard,
        RwLockWriteGuard,
    };
    /**************************** Globals **************************************/
    pub static mut RT_CACHE: TokioRtCache = new_cache();
    /**************************** Helpers ***************************************/
    pub fn init_cache(threads: &usize) -> SafeTokioRuntime {
        let mut builder = tokio::runtime::Builder::new_multi_thread();
        builder.worker_threads(*threads);
        builder.enable_all();
        match builder.build() {
            Ok(handle) => Arc::new(handle),
            Err(e) => panic!(
                "Unable to initialize threading tokio runtime because {}!",
                &e
            ),
        }
    }

    /**************************** Types ***************************************/
    pub type SafeTokioRuntime = Arc<TokioRuntime>;
    pub type TokioRtCache = LazyRUMCache<usize, SafeTokioRuntime>;
}

pub mod threading_manager {
    use crate::cache::LazyRUMCacheValue;
    use crate::core::{RUMResult, RUMVec};
    use crate::strings::rumtk_format;
    use crate::threading::thread_primitives::SafeTokioRuntime;
    use crate::threading::threading_functions::{async_sleep, sleep};
    use crate::types::{RUMHashMap, RUMID};
    use crate::{rumtk_init_threads, rumtk_resolve_task, rumtk_spawn_task, threading};
    use std::future::Future;
    use std::sync::Arc;
    pub use std::sync::RwLock as SyncRwLock;
    use tokio::sync::RwLock as AsyncRwLock;
    use tokio::task::JoinHandle;

    const DEFAULT_SLEEP_DURATION: f32 = 0.001f32;
    const DEFAULT_TASK_CAPACITY: usize = 100;

    /// Vector of `T` items used throughout the task machinery.
    pub type TaskItems<T> = RUMVec<T>;
    /// This type aliases a vector of T elements that will be used for passing arguments to the task processor.
    pub type TaskArgs<T> = TaskItems<T>;
    /// Thread-safe, reference-counted wrapper around [`TaskArgs`] used to share arguments with tasks.
    pub type SafeTaskArgs<T> = Arc<AsyncRwLock<TaskItems<T>>>;
    pub type AsyncTaskHandle<R> = JoinHandle<TaskResult<R>>;
    pub type AsyncTaskHandles<R> = Vec<AsyncTaskHandle<R>>;
    // Function signature defining the interface of task processing logic.
    //pub type TaskProcessor<T, R, Fut: Future<Output = TaskResult<R>>> = impl FnOnce(&SafeTaskArgs<T>) -> Fut;
    pub type TaskID = RUMID;

    /// Bookkeeping record for a spawned task: its id, completion flag, and eventual result.
    #[derive(Debug, Clone, Default)]
    pub struct Task<R> {
        pub id: TaskID,
        pub finished: bool,
        pub result: Option<R>,
    }

    pub type SafeTask<R> = Arc<Task<R>>;
    type SafeInternalTask<R> = Arc<SyncRwLock<Task<R>>>;
    pub type TaskTable<R> = RUMHashMap<TaskID, SafeInternalTask<R>>;
    pub type SafeAsyncTaskTable<R> = Arc<AsyncRwLock<TaskTable<R>>>;
    pub type SafeSyncTaskTable<R> = Arc<SyncRwLock<TaskTable<R>>>;
    pub type TaskBatch = RUMVec<TaskID>;
    /// Type defining how task results are returned to callers.
    pub type TaskResult<R> = RUMResult<SafeTask<R>>;
    pub type TaskResults<R> = TaskItems<TaskResult<R>>;
    pub type TaskRuntime = LazyRUMCacheValue<SafeTokioRuntime>;

    ///
    /// Manages asynchronous tasks submitted as micro jobs from synchronous code. This type essentially
    /// gives multithreaded, asynchronous superpowers to synchronous logic.
    ///
    /// ## Example Usage
    ///
    /// ```
    /// use std::sync::{Arc};
    /// use tokio::sync::RwLock as AsyncRwLock;
    /// use rumtk_core::core::RUMResult;
    /// use rumtk_core::strings::RUMString;
    /// use rumtk_core::threading::threading_manager::{SafeTaskArgs, TaskItems, TaskManager};
    /// use rumtk_core::rumtk_create_task;
    ///
    /// let expected = vec![
    ///     RUMString::from("Hello"),
    ///     RUMString::from("World!"),
    ///     RUMString::from("Overcast"),
    ///     RUMString::from("and"),
    ///     RUMString::from("Sad"),
    /// ];
    ///
    /// type TestResult = RUMResult<Vec<RUMString>>;
    /// let mut queue: TaskManager<TestResult> = TaskManager::new(&5).unwrap();
    ///
    /// let locked_args = AsyncRwLock::new(expected.clone());
    /// let task_args = SafeTaskArgs::<RUMString>::new(locked_args);
    /// let processor = rumtk_create_task!(
    ///     async |args: &SafeTaskArgs<RUMString>| -> TestResult {
    ///         let owned_args = Arc::clone(args);
    ///         let locked_args = owned_args.read().await;
    ///         let mut results = TaskItems::<RUMString>::with_capacity(locked_args.len());
    ///
    ///         for arg in locked_args.iter() {
    ///             results.push(RUMString::new(arg));
    ///         }
    ///
    ///         Ok(results)
    ///     },
    ///     task_args
    /// );
    ///
    /// queue.add_task::<_>(processor);
    /// let results = queue.wait();
    ///
    /// let mut result_data = Vec::<RUMString>::with_capacity(5);
    /// for r in results {
    ///     for v in r.unwrap().result.clone().unwrap().iter() {
    ///         for value in v.iter() {
    ///             result_data.push(value.clone());
    ///         }
    ///     }
    /// }
    ///
    /// assert_eq!(result_data, expected, "Results do not match expected!");
    ///
    /// ```
    ///
    #[derive(Debug, Clone, Default)]
    pub struct TaskManager<R> {
        tasks: SafeSyncTaskTable<R>,
        workers: usize,
    }

    impl<R> TaskManager<R>
    where
        R: Sync + Send + Clone + 'static,
    {
        ///
        /// This method creates a [`TaskManager`] instance using sensible defaults.
        ///
        /// The number of workers is computed from the number of cores present in the system.
        ///
        pub fn default() -> RUMResult<TaskManager<R>> {
            Self::new(&threading::threading_functions::get_default_system_thread_count())
        }

        ///
        /// Creates a [`TaskManager<R>`] instance. Expects you to provide the number of worker
        /// threads the manager may use when spawning its runtime.
        ///
        /// The internal task table is pre-allocated with a capacity of [`DEFAULT_TASK_CAPACITY`].
        ///
        pub fn new(worker_num: &usize) -> RUMResult<TaskManager<R>> {
            let tasks = SafeSyncTaskTable::<R>::new(SyncRwLock::new(TaskTable::with_capacity(
                DEFAULT_TASK_CAPACITY,
            )));
            Ok(TaskManager::<R> {
                tasks,
                workers: worker_num.to_owned(),
            })
        }

        ///
        /// Add a task to the processing queue. The idea is that you can queue a processor function
        /// and a list of args that will be picked up by one of the worker threads for processing.
        ///
        /// This is the async counterpart of [add_task](Self::add_task).
        ///
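        /// ## Example
        ///
        /// A minimal sketch driving the async API through the toolkit's own runtime (the async
        /// block below is illustrative only):
        ///
        /// ```
        /// use rumtk_core::threading::threading_manager::TaskManager;
        /// use rumtk_core::{rumtk_init_threads, rumtk_resolve_task};
        ///
        /// let workers: usize = 2;
        /// let rt = rumtk_init_threads!(&workers);
        /// let mut manager = TaskManager::<usize>::new(&workers).unwrap();
        ///
        /// // Drive the async method to completion from sync code.
        /// let id = rumtk_resolve_task!(&rt, manager.add_task_async(async { 3usize }));
        /// let task = manager.wait_on(&id).unwrap();
        /// assert_eq!(task.result, Some(3));
        /// ```
        ///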
        pub async fn add_task_async<F>(&mut self, task: F) -> TaskID
        where
            F: Future<Output = R> + Send + Sync + 'static,
            F::Output: Send + Sized + 'static,
        {
            let id = TaskID::new_v4();
            Self::_add_task_async(id.clone(), self.tasks.clone(), task).await
        }

        ///
        /// See [add_task](Self::add_task)
        ///
        /// Unlike `add_task`, this method does not block, which is key to avoiding a panic in the
        /// tokio runtime when adding a task to the queue from a synchronous function that is itself
        /// called from an async context.
        ///
        /// ## Example
        ///
        /// ```
        /// use rumtk_core::threading::threading_manager::{TaskManager};
        /// use rumtk_core::{rumtk_init_threads, strings::rumtk_format};
        /// use std::sync::{Arc};
        /// use tokio::sync::Mutex;
        ///
        /// type JobManager = Arc<Mutex<TaskManager<usize>>>;
        ///
        /// async fn called_fn() -> usize {
        ///     5
        /// }
        ///
        /// fn push_job(manager: &mut TaskManager<usize>) -> usize {
        ///     manager.spawn_task(called_fn());
        ///     1
        /// }
        ///
        /// async fn call_sync_fn(mut manager: JobManager) -> usize {
        ///     let mut owned = manager.lock().await;
        ///     push_job(&mut owned)
        /// }
        ///
        /// let workers = 5;
        /// let rt = rumtk_init_threads!(&workers);
        /// let manager = Arc::new(Mutex::new(TaskManager::new(&workers).unwrap()));
        ///
        /// manager.blocking_lock().spawn_task(call_sync_fn(manager.clone()));
        ///
        /// let result_raw = manager.blocking_lock().wait();
        ///
        /// ```
        ///
        pub fn spawn_task<F>(&mut self, task: F) -> TaskID
        where
            F: Future<Output = R> + Send + Sync + 'static,
            F::Output: Send + Sized + 'static,
        {
            let id = TaskID::new_v4();
            let rt = rumtk_init_threads!(&self.workers);
            rumtk_spawn_task!(
                rt,
                Self::_add_task_async(id.clone(), self.tasks.clone(), task)
            );
            id
        }

        ///
        /// See [add_task_async](Self::add_task_async)
        ///
        pub fn add_task<F>(&mut self, task: F) -> TaskID
        where
            F: Future<Output = R> + Send + Sync + 'static,
            F::Output: Send + Sized + 'static,
        {
            let rt = rumtk_init_threads!(&self.workers);
            rumtk_resolve_task!(rt, self.add_task_async(task))
        }

        async fn _add_task_async<F>(id: TaskID, tasks: SafeSyncTaskTable<R>, task: F) -> TaskID
        where
            F: Future<Output = R> + Send + Sync + 'static,
            F::Output: Send + Sized + 'static,
        {
            let safe_task = Arc::new(SyncRwLock::new(Task::<R> {
                id: id.clone(),
                finished: false,
                result: None,
            }));
            tasks.write().unwrap().insert(id.clone(), safe_task.clone());

            let task_wrapper = async move || {
                // Run the task
                let result = task.await;

                // Store the result and mark the task as finished.
                let mut lock = safe_task.write().unwrap();
                lock.result = Some(result);
                lock.finished = true;
            };

            tokio::spawn(task_wrapper());

            id
        }

        ///
        /// See [wait_async](Self::wait_async)
        ///
        /// Duplicated here because we cannot ask the tokio runtime for a quick blocking execution
        /// if this function happens to be called from an async context.
        ///
        pub fn wait(&mut self) -> TaskResults<R> {
            let task_batch = self
                .tasks
                .read()
                .unwrap()
                .keys()
                .cloned()
                .collect::<Vec<_>>();
            self.wait_on_batch(&task_batch)
        }

        ///
        /// See [wait_on_batch_async](Self::wait_on_batch_async)
        ///
        /// Duplicated here because we cannot ask the tokio runtime for a quick blocking execution
        /// if this function happens to be called from an async context.
        ///
        pub fn wait_on_batch(&mut self, tasks: &TaskBatch) -> TaskResults<R> {
            let mut results = TaskResults::<R>::default();
            for task in tasks {
                results.push(self.wait_on(task));
            }
            results
        }

        ///
        /// See [wait_on_async](Self::wait_on_async)
        ///
        /// Duplicated here because we cannot ask the tokio runtime for a quick blocking execution
        /// if this function happens to be called from an async context.
        ///
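        /// ## Example
        ///
        /// A minimal sketch (the async block below is illustrative only):
        ///
        /// ```
        /// use rumtk_core::threading::threading_manager::TaskManager;
        ///
        /// let mut manager = TaskManager::<usize>::new(&2).unwrap();
        /// let id = manager.add_task(async { 7usize });
        ///
        /// // Block until that specific task has completed and consume it.
        /// let task = manager.wait_on(&id).unwrap();
        /// assert_eq!(task.result, Some(7));
        /// ```
        ///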
        pub fn wait_on(&mut self, task_id: &TaskID) -> TaskResult<R> {
            let task = match self.tasks.write().unwrap().remove(task_id) {
                Some(task) => task.clone(),
                None => return Err(rumtk_format!("No task with id {}", task_id)),
            };

            while !task.read().unwrap().finished {
                sleep(DEFAULT_SLEEP_DURATION);
            }

            Ok(Arc::new(task.read().unwrap().clone()))
        }

        ///
        /// This method waits until a queued task with [TaskID](TaskID) has been processed from the main queue.
        ///
        /// We poll the status of the task every [DEFAULT_SLEEP_DURATION](DEFAULT_SLEEP_DURATION) seconds.
        ///
        /// Upon completion,
        ///
        /// 1. We collect the result generated (if any).
        /// 2. Return the result ([TaskResult<R>](TaskResult)).
        ///
        /// This operation consumes the task.
        ///
        /// ### Note:
        /// ```text
        ///     Results returned here are not guaranteed to be in the same order as the order in which
        ///     the tasks were queued for work. You will need to pass a type as R that automatically
        ///     tracks its own id or has a way for you to resort results.
        /// ```
        pub async fn wait_on_async(&mut self, task_id: &TaskID) -> TaskResult<R> {
            let task = match self.tasks.write().unwrap().remove(task_id) {
                Some(task) => task.clone(),
                None => return Err(rumtk_format!("No task with id {}", task_id)),
            };

            while !task.read().unwrap().finished {
                async_sleep(DEFAULT_SLEEP_DURATION).await;
            }

            Ok(Arc::new(task.read().unwrap().clone()))
        }

        ///
        /// This method waits until a set of queued tasks with [TaskID](TaskID) has been processed from the main queue.
        ///
        /// We poll the status of each task every [DEFAULT_SLEEP_DURATION](DEFAULT_SLEEP_DURATION) seconds.
        ///
        /// Upon completion,
        ///
        /// 1. We collect the results generated (if any).
        /// 2. Return the list of results ([TaskResults<R>](TaskResults)).
        ///
        /// ### Note:
        /// ```text
        ///     Results returned here are not guaranteed to be in the same order as the order in which
        ///     the tasks were queued for work. You will need to pass a type as R that automatically
        ///     tracks its own id or has a way for you to resort results.
        /// ```
        pub async fn wait_on_batch_async(&mut self, tasks: &TaskBatch) -> TaskResults<R> {
            let mut results = TaskResults::<R>::default();
            for task in tasks {
                results.push(self.wait_on_async(task).await);
            }
            results
        }

        ///
        /// This method waits until all queued tasks have been processed from the main queue.
        ///
        /// We poll the status of the main queue every [DEFAULT_SLEEP_DURATION](DEFAULT_SLEEP_DURATION) seconds.
        ///
        /// Upon completion,
        ///
        /// 1. We collect the results generated (if any).
        /// 2. We reset the main task and result internal queue states.
        /// 3. Return the list of results ([TaskResults<R>](TaskResults)).
        ///
        /// This operation consumes all the tasks.
        ///
        /// ### Note:
        /// ```text
        ///     Results returned here are not guaranteed to be in the same order as the order in which
        ///     the tasks were queued for work. You will need to pass a type as R that automatically
        ///     tracks its own id or has a way for you to resort results.
        /// ```
        pub async fn wait_async(&mut self) -> TaskResults<R> {
            let task_batch = self
                .tasks
                .read()
                .unwrap()
                .keys()
                .cloned()
                .collect::<Vec<_>>();
            self.wait_on_batch_async(&task_batch).await
        }

        ///
        /// Check if all work has been completed from the task queue.
        ///
        /// ## Examples
        ///
        /// ### Sync Usage
        ///
        /// ```
        /// use rumtk_core::threading::threading_manager::TaskManager;
        ///
        /// let manager = TaskManager::<usize>::new(&4).unwrap();
        ///
        /// let all_done = manager.is_all_completed();
        ///
        /// assert_eq!(all_done, true, "Empty TaskManager reports tasks are not completed!");
        ///
        /// ```
        ///
        pub fn is_all_completed(&self) -> bool {
            let rt = rumtk_init_threads!(&self.workers);
            rumtk_resolve_task!(rt, TaskManager::<R>::is_all_completed_async(self))
        }

        pub async fn is_all_completed_async(&self) -> bool {
            for (_, task) in self.tasks.read().unwrap().iter() {
                if !task.read().unwrap().finished {
                    return false;
                }
            }

            true
        }

        ///
        /// Check if a task has completed.
        ///
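        /// ## Example
        ///
        /// A minimal sketch (the async block and polling loop are illustrative only):
        ///
        /// ```
        /// use rumtk_core::rumtk_sleep;
        /// use rumtk_core::threading::threading_manager::TaskManager;
        ///
        /// let mut manager = TaskManager::<usize>::new(&2).unwrap();
        /// let id = manager.add_task(async { 1usize });
        ///
        /// // Poll until the background task flips its finished flag.
        /// while !manager.is_finished(&id) {
        ///     rumtk_sleep!(0.001);
        /// }
        /// assert!(manager.is_finished(&id));
        /// ```
        ///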
        pub fn is_finished(&self, id: &TaskID) -> bool {
            match self.tasks.read().unwrap().get(id) {
                Some(t) => t.read().unwrap().finished,
                None => false,
            }
        }

        ///
        /// Async counterpart of [is_finished](Self::is_finished). Note that, unlike the sync version,
        /// a task that is no longer tracked (e.g., already consumed) is reported here as finished.
        ///
        pub async fn is_finished_async(&self, id: &TaskID) -> bool {
            match self.tasks.read().unwrap().get(id) {
                Some(task) => task.read().unwrap().finished,
                None => true,
            }
        }

        ///
        /// Alias for [wait](TaskManager::wait).
        ///
        fn gather(&mut self) -> TaskResults<R> {
            self.wait()
        }
    }
}

///
/// This module contains a few helpers.
///
/// For example, you can find a function for determining the number of threads available in the system.
/// The sleep family of functions is also here.
///
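///
/// A small sketch of the helpers in this module:
///
/// ```
/// use rumtk_core::threading::threading_functions::{get_default_system_thread_count, sleep};
///
/// let threads = get_default_system_thread_count();
/// assert!(threads >= 1);
/// sleep(0.001); // block the current thread for roughly one millisecond
/// ```
///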
pub mod threading_functions {
    use num_cpus;
    use std::thread::{available_parallelism, sleep as std_sleep};
    use std::time::Duration;
    use tokio::time::sleep as tokio_sleep;

    pub const NANOS_PER_SEC: u64 = 1000000000;
    pub const MILLIS_PER_SEC: u64 = 1000;
    pub const MICROS_PER_SEC: u64 = 1000000;

    pub fn get_default_system_thread_count() -> usize {
        let cpus: usize = num_cpus::get();
        let parallelism = match available_parallelism() {
            Ok(n) => n.get(),
            Err(_) => 0,
        };

        if parallelism >= cpus {
            parallelism
        } else {
            cpus
        }
    }

    pub fn sleep(s: f32) {
        let ns = s * NANOS_PER_SEC as f32;
        let rounded_ns = ns.round() as u64;
        let duration = Duration::from_nanos(rounded_ns);
        std_sleep(duration);
    }

    pub async fn async_sleep(s: f32) {
        let ns = s * NANOS_PER_SEC as f32;
        let rounded_ns = ns.round() as u64;
        let duration = Duration::from_nanos(rounded_ns);
        tokio_sleep(duration).await;
    }
}

///
/// Main API for interacting with the threading back end. Remember, we use tokio as our executor.
/// This means that by default, all jobs sent to the thread pool have to be async in nature.
/// These macros make handling of these jobs at the sync/async boundary more convenient.
///
pub mod threading_macros {
    use crate::threading::thread_primitives;
    use crate::threading::threading_manager::SafeTaskArgs;

    ///
    /// First, let's make sure we have *tokio* initialized at least once. The runtime created here
    /// will be saved to the global context so the next call to this macro will simply grab a
    /// reference to the previously initialized runtime.
    ///
    /// Passing nothing will default to initializing a runtime using the default number of threads
    /// for this system. This is typically equivalent to the number of cores/threads of your CPU.
    ///
    /// Passing a `threads` number will yield a runtime that allocates that many threads.
    ///
    /// ## Examples
    ///
    /// ```
    ///     use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_create_task_args, rumtk_create_task, rumtk_spawn_task};
    ///     use rumtk_core::core::RUMResult;
    ///     use rumtk_core::threading::threading_manager::SafeTaskArgs;
    ///
    ///     async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
    ///         let mut result = Vec::<i32>::new();
    ///         for arg in args.read().await.iter() {
    ///             result.push(*arg);
    ///         }
    ///         Ok(result)
    ///     }
    ///
    ///     let rt = rumtk_init_threads!();                                      // Creates runtime instance
    ///     let args = rumtk_create_task_args!(1);                               // Creates a vector of i32s
    ///     let task = rumtk_create_task!(test, args);                           // Creates a standard task which consists of a function or closure accepting a Vec<T>
    ///     let result = rumtk_resolve_task!(&rt, rumtk_spawn_task!(&rt, task)); // Spawns the task and waits for it to conclude.
    /// ```
    ///
    /// ```
    ///     use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_create_task_args, rumtk_create_task, rumtk_spawn_task};
    ///     use rumtk_core::core::RUMResult;
    ///     use rumtk_core::threading::threading_manager::SafeTaskArgs;
    ///
    ///     async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
    ///         let mut result = Vec::<i32>::new();
    ///         for arg in args.read().await.iter() {
    ///             result.push(*arg);
    ///         }
    ///         Ok(result)
    ///     }
    ///
    ///     let thread_count: usize = 10;
    ///     let rt = rumtk_init_threads!(&thread_count);
    ///     let args = rumtk_create_task_args!(1);
    ///     let task = rumtk_create_task!(test, args);
    ///     let result = rumtk_resolve_task!(&rt, rumtk_spawn_task!(&rt, task));
    /// ```
    #[macro_export]
    macro_rules! rumtk_init_threads {
        ( ) => {{
            use $crate::rumtk_cache_fetch;
            use $crate::threading::thread_primitives::{init_cache, RT_CACHE};
            use $crate::threading::threading_functions::get_default_system_thread_count;
            let rt = rumtk_cache_fetch!(
                &raw mut RT_CACHE,
                &get_default_system_thread_count(),
                init_cache
            );
            rt
        }};
        ( $threads:expr ) => {{
            use $crate::rumtk_cache_fetch;
            use $crate::threading::thread_primitives::{init_cache, RT_CACHE};
            let rt = rumtk_cache_fetch!(&raw mut RT_CACHE, $threads, init_cache);
            rt
        }};
    }


    ///
    /// Puts a task onto the runtime queue.
    ///
    /// The parameters to this macro are a reference to the runtime (`rt`) and a future (`func`).
    ///
    /// The return is a [tokio::task::JoinHandle] instance. If the task was a standard framework
    /// task, you will get an [AsyncTaskHandle](crate::threading::threading_manager::AsyncTaskHandle) instead.
    ///
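    /// ## Example
    ///
    /// A minimal sketch (the arithmetic future is illustrative only):
    ///
    /// ```
    ///     use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_spawn_task};
    ///
    ///     let rt = rumtk_init_threads!();
    ///     let handle = rumtk_spawn_task!(&rt, async { 21 * 2 });
    ///     let result = rumtk_resolve_task!(&rt, handle).unwrap();
    ///     assert_eq!(result, 42);
    /// ```
    ///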
    #[macro_export]
    macro_rules! rumtk_spawn_task {
        ( $func:expr ) => {{
            use $crate::rumtk_init_threads;
            let rt = rumtk_init_threads!();
            rt.spawn($func)
        }};
        ( $rt:expr, $func:expr ) => {{
            $rt.spawn($func)
        }};
    }

    ///
    /// Using the initialized runtime, wait for the future to resolve in a thread-blocking manner!
    ///
    /// If you pass a reference to the runtime (`rt`) and an async closure (`func`), we await the
    /// async closure without passing any arguments.
    ///
    /// You can pass a third argument to this macro in the form of any number of arguments (`arg_item`).
    /// In such a case, we pass those arguments to the call on the async closure and await the result.
    ///
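    /// ## Example
    ///
    /// A minimal sketch (the `add` function is illustrative only):
    ///
    /// ```
    ///     use rumtk_core::{rumtk_init_threads, rumtk_wait_on_task};
    ///
    ///     async fn add(a: i32, b: i32) -> i32 {
    ///         a + b
    ///     }
    ///
    ///     let rt = rumtk_init_threads!();
    ///     let sum = rumtk_wait_on_task!(&rt, add, 1, 2);
    ///     assert_eq!(sum, 3);
    /// ```
    ///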
    #[macro_export]
    macro_rules! rumtk_wait_on_task {
        ( $rt:expr, $func:expr ) => {{
            $rt.block_on(async move {
                $func().await
            })
        }};
        ( $rt:expr, $func:expr, $($arg_items:expr),+ ) => {{
            $rt.block_on(async move {
                $func($($arg_items),+).await
            })
        }};
    }

    ///
    /// This macro awaits a future.
    ///
    /// The arguments are a reference to the runtime (`rt`) and a future.
    ///
    /// If there is a result, you will get the result of the future.
    ///
    /// ## Examples
    ///
    /// ```
    ///     use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_create_task_args, rumtk_create_task, rumtk_spawn_task};
    ///     use rumtk_core::core::RUMResult;
    ///     use rumtk_core::threading::threading_manager::SafeTaskArgs;
    ///
    ///     async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
    ///         let mut result = Vec::<i32>::new();
    ///         for arg in args.read().await.iter() {
    ///             result.push(*arg);
    ///         }
    ///         Ok(result)
    ///     }
    ///
    ///     let rt = rumtk_init_threads!();
    ///     let args = rumtk_create_task_args!(1);
    ///     let task = rumtk_create_task!(test, args);
    ///     let result = rumtk_resolve_task!(&rt, rumtk_spawn_task!(&rt, task));
    /// ```
    ///
    #[macro_export]
    macro_rules! rumtk_resolve_task {
        ( $rt:expr, $future:expr ) => {{
            // Fun tidbit, the expression rumtk_resolve_task!(&rt, rumtk_spawn_task!(&rt, task)), where
            // rt is the tokio runtime, yields async move { { &rt.spawn(task) } }. However, the whole thing
            // is technically moved into the async closure and captured, so things like mutex guards
            // technically go out of the outer scope. As a result, that expression fails to compile even
            // though the intent is for rumtk_spawn_task to resolve first and its result to get moved
            // into the async closure. To ensure that happens regardless of the given expression, we do
            // a variable assignment below to force the "future" macro expressions to resolve before
            // moving into the closure. DO NOT REMOVE OR "SIMPLIFY" THE let future = $future LINE!!!
            let future = $future;
            $rt.block_on(async move { future.await })
        }};
    }

    ///
    /// Resolves a future to its result from inside an async context.
    ///
    /// This wraps the blocking wait in [tokio::task::block_in_place] and blocks on the runtime's
    /// handle, so it assumes the multi-threaded runtime created by
    /// [rumtk_init_threads](crate::rumtk_init_threads).
    ///
    #[macro_export]
    macro_rules! rumtk_resolve_task_from_async {
        ( $rt:expr, $future:expr ) => {{
            let future = $future;
            tokio::task::block_in_place(move || $rt.handle().block_on(async move { future.await }))
        }};
    }

    ///
    /// This macro creates an async body that calls the async closure and awaits it.
    ///
    /// ## Example
    ///
    /// ```
    /// use tokio::sync::RwLock as AsyncRwLock;
    /// use rumtk_core::rumtk_create_task;
    /// use rumtk_core::strings::RUMString;
    /// use rumtk_core::threading::threading_manager::{SafeTaskArgs, TaskItems};
    ///
    /// let expected = vec![
    ///     RUMString::from("Hello"),
    ///     RUMString::from("World!"),
    ///     RUMString::from("Overcast"),
    ///     RUMString::from("and"),
    ///     RUMString::from("Sad"),
    /// ];
    /// let locked_args = AsyncRwLock::new(expected.clone());
    /// let task_args = SafeTaskArgs::<RUMString>::new(locked_args);
    ///
    /// // Package the async closure and its arguments into a single awaitable task body.
    /// let task = rumtk_create_task!(
    ///     async |args: &SafeTaskArgs<RUMString>| -> TaskItems<RUMString> {
    ///         let locked_args = args.read().await;
    ///         let mut copied = TaskItems::<RUMString>::with_capacity(locked_args.len());
    ///         for arg in locked_args.iter() {
    ///             copied.push(arg.clone());
    ///         }
    ///         copied
    ///     },
    ///     task_args
    /// );
    /// ```
    ///
    #[macro_export]
    macro_rules! rumtk_create_task {
        ( $func:expr ) => {{
            async move {
                let f = $func;
                f().await
            }
        }};
        ( $func:expr, $args:expr ) => {{
            let f = $func;
            async move { f(&$args).await }
        }};
    }

    ///
    /// Creates an instance of [SafeTaskArgs] with the arguments passed.
    ///
    /// ## Note
    ///
    /// All arguments must be of the same type.
    ///
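    /// ## Example
    ///
    /// A minimal sketch:
    ///
    /// ```
    ///     use rumtk_core::rumtk_create_task_args;
    ///
    ///     let args = rumtk_create_task_args!(1, 2, 3);
    ///     assert_eq!(args.try_read().unwrap().len(), 3);
    /// ```
    ///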
    #[macro_export]
    macro_rules! rumtk_create_task_args {
        ( ) => {{
            use $crate::threading::threading_manager::SafeTaskArgs;
            use tokio::sync::RwLock;
            SafeTaskArgs::new(RwLock::new(vec![]))
        }};
        ( $($args:expr),+ ) => {{
            use $crate::threading::threading_manager::SafeTaskArgs;
            use tokio::sync::RwLock;
            SafeTaskArgs::new(RwLock::new(vec![$($args),+]))
        }};
    }

    ///
    /// Convenience macro for packaging the task components and launching the task in one line.
    ///
    /// One of the advantages is that you can generate a new `tokio` runtime by specifying the
    /// number of threads at the end. This is optional; we will default to the system's number of
    /// threads if that value is not specified.
    ///
    /// Between the `func` parameter and the optional `threads` parameter, you can specify a
    /// variable number of arguments to pass to the task. Each argument must be of the same type.
    /// If you wish to pass different arguments with different types, please define an abstract type
    /// whose underlying structure is a tuple of items and pass that instead.
    ///
    /// ## Examples
    ///
    /// ### With Default Thread Count
    /// ```
    ///     use rumtk_core::{rumtk_exec_task};
    ///     use rumtk_core::core::RUMResult;
    ///     use rumtk_core::threading::threading_manager::SafeTaskArgs;
    ///
    ///     async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
    ///         let mut result = Vec::<i32>::new();
    ///         for arg in args.read().await.iter() {
    ///             result.push(*arg);
    ///         }
    ///         Ok(result)
    ///     }
    ///
    ///     let result = rumtk_exec_task!(test, vec![5]);
    ///     assert_eq!(&result.clone().unwrap(), &vec![5], "Results mismatch");
    ///     assert_ne!(&result.clone().unwrap(), &vec![5, 10], "Results do not mismatch as expected!");
    /// ```
    ///
    /// ### With Custom Thread Count
    /// ```
    ///     use rumtk_core::{rumtk_exec_task};
    ///     use rumtk_core::core::RUMResult;
    ///     use rumtk_core::threading::threading_manager::SafeTaskArgs;
    ///
    ///     async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
    ///         let mut result = Vec::<i32>::new();
    ///         for arg in args.read().await.iter() {
    ///             result.push(*arg);
    ///         }
    ///         Ok(result)
    ///     }
    ///
    ///     let result = rumtk_exec_task!(test, vec![5], 5);
    ///     assert_eq!(&result.clone().unwrap(), &vec![5], "Results mismatch");
    ///     assert_ne!(&result.clone().unwrap(), &vec![5, 10], "Results do not mismatch as expected!");
    /// ```
    ///
    /// ### With Async Function Body
    /// ```
    ///     use rumtk_core::{rumtk_exec_task};
    ///     use rumtk_core::core::RUMResult;
    ///     use rumtk_core::threading::threading_manager::SafeTaskArgs;
    ///
    ///     let result = rumtk_exec_task!(
    ///     async move |args: &SafeTaskArgs<i32>| -> RUMResult<Vec<i32>> {
    ///         let mut result = Vec::<i32>::new();
    ///         for arg in args.read().await.iter() {
    ///             result.push(*arg);
    ///         }
    ///         Ok(result)
    ///     },
    ///     vec![5]);
    ///     assert_eq!(&result.clone().unwrap(), &vec![5], "Results mismatch");
    ///     assert_ne!(&result.clone().unwrap(), &vec![5, 10], "Results do not mismatch as expected!");
    /// ```
    ///
    /// ### With Async Function Body and No Args
    /// ```
    ///     use rumtk_core::{rumtk_exec_task};
    ///     use rumtk_core::core::RUMResult;
    ///     use rumtk_core::threading::threading_manager::SafeTaskArgs;
    ///
    ///     let result = rumtk_exec_task!(
    ///     async || -> RUMResult<Vec<i32>> {
    ///         let mut result = Vec::<i32>::new();
    ///         Ok(result)
    ///     });
    ///     let empty = Vec::<i32>::new();
    ///     assert_eq!(&result.clone().unwrap(), &empty, "Results mismatch");
    ///     assert_ne!(&result.clone().unwrap(), &vec![5, 10], "Results do not mismatch as expected!");
    /// ```
    ///
    /// ## Equivalent To
    ///
    /// ```
    ///     use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_create_task_args, rumtk_create_task, rumtk_spawn_task};
    ///     use rumtk_core::core::RUMResult;
    ///     use rumtk_core::threading::threading_manager::SafeTaskArgs;
    ///
    ///     async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
    ///         let mut result = Vec::<i32>::new();
    ///         for arg in args.read().await.iter() {
    ///             result.push(*arg);
    ///         }
    ///         Ok(result)
    ///     }
    ///
    ///     let rt = rumtk_init_threads!();
    ///     let args = rumtk_create_task_args!(1);
    ///     let task = rumtk_create_task!(test, args);
    ///     let result = rumtk_resolve_task!(&rt, rumtk_spawn_task!(&rt, task));
    /// ```
    ///
    #[macro_export]
    macro_rules! rumtk_exec_task {
        ($func:expr ) => {{
            use $crate::{rumtk_create_task, rumtk_init_threads, rumtk_resolve_task};
            let rt = rumtk_init_threads!();
            let task = rumtk_create_task!($func);
            rumtk_resolve_task!(&rt, task)
        }};
        ($func:expr, $args:expr ) => {{
            use tokio::sync::RwLock;
            use $crate::threading::threading_manager::SafeTaskArgs;
            use $crate::{rumtk_create_task, rumtk_init_threads, rumtk_resolve_task};
            let rt = rumtk_init_threads!();
            let args = SafeTaskArgs::new(RwLock::new($args));
            let task = rumtk_create_task!($func, args);
            rumtk_resolve_task!(&rt, task)
        }};
        ($func:expr, $args:expr , $threads:expr ) => {{
            use tokio::sync::RwLock;
            use $crate::threading::threading_manager::SafeTaskArgs;
            use $crate::{rumtk_create_task, rumtk_init_threads, rumtk_resolve_task};
            let rt = rumtk_init_threads!(&$threads);
            let args = SafeTaskArgs::new(RwLock::new($args));
            let task = rumtk_create_task!($func, args);
            rumtk_resolve_task!(&rt, task)
        }};
    }

    ///
    /// Sleep for a duration of time in a sync context, so the result cannot be awaited.
    ///
    /// You can pass any value that can be cast to f32.
    ///
    /// The precision is up to nanoseconds, expressed via the number of decimal places.
    ///
    /// ## Examples
    ///
    /// ```
    ///     use rumtk_core::rumtk_sleep;
    ///     rumtk_sleep!(1);           // Sleeps for 1 second.
    ///     rumtk_sleep!(0.001);       // Sleeps for 1 millisecond
    ///     rumtk_sleep!(0.000001);    // Sleeps for 1 microsecond
    ///     rumtk_sleep!(0.000000001); // Sleeps for 1 nanosecond
    /// ```
    ///
    #[macro_export]
    macro_rules! rumtk_sleep {
        ( $dur:expr) => {{
            use $crate::threading::threading_functions::sleep;
            sleep($dur as f32)
        }};
    }

    ///
    /// Sleep for some duration of time in an async context. Meaning, the call can be awaited.
    ///
    /// You can pass any value that can be cast to f32.
    ///
    /// The precision is up to nanoseconds, expressed via the number of decimal places.
    ///
    /// ## Examples
    ///
    /// ```
    ///     use rumtk_core::{rumtk_async_sleep, rumtk_exec_task};
    ///     use rumtk_core::core::RUMResult;
    ///     rumtk_exec_task!( async || -> RUMResult<()> {
    ///             rumtk_async_sleep!(1).await;           // Sleeps for 1 second.
    ///             rumtk_async_sleep!(0.001).await;       // Sleeps for 1 millisecond
    ///             rumtk_async_sleep!(0.000001).await;    // Sleeps for 1 microsecond
    ///             rumtk_async_sleep!(0.000000001).await; // Sleeps for 1 nanosecond
    ///             Ok(())
    ///         }
    ///     );
    /// ```
    ///
    #[macro_export]
    macro_rules! rumtk_async_sleep {
        ( $dur:expr) => {{
            use $crate::threading::threading_functions::async_sleep;
            async_sleep($dur as f32)
        }};
    }

    ///
    /// Convenience macro for constructing a new [TaskManager](crate::threading::threading_manager::TaskManager)
    /// with the given number of worker threads. Forwards to [TaskManager::new](crate::threading::threading_manager::TaskManager::new).
    ///
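    /// ## Example
    ///
    /// A minimal sketch:
    ///
    /// ```
    ///     use rumtk_core::rumtk_new_task_queue;
    ///     use rumtk_core::threading::threading_manager::TaskManager;
    ///
    ///     let workers: usize = 4;
    ///     let manager: TaskManager<usize> = rumtk_new_task_queue!(&workers).unwrap();
    /// ```
    ///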
    #[macro_export]
    macro_rules! rumtk_new_task_queue {
        ( $worker_num:expr ) => {{
            use $crate::threading::threading_manager::TaskManager;
            TaskManager::new($worker_num)
        }};
    }
}