Skip to main content

rumtk_core/
threading.rs

1/*
2 * rumtk attempts to implement HL7 and medical protocols for interoperability in medicine.
3 * This toolkit aims to be reliable, simple, performant, and standards compliant.
4 * Copyright (C) 2025  Luis M. Santos, M.D.
5 * Copyright (C) 2025  MedicalMasses L.L.C.
6 *
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
20 */
21
22///
23/// This module provides all the primitives needed to build a multithreaded application.
24///
25pub mod thread_primitives {
26    use crate::core::RUMResult;
27    use crate::strings::rumtk_format;
28    pub use std::sync::Mutex as SyncMutex;
29    pub use std::sync::MutexGuard as SyncMutexGuard;
30    use std::sync::OnceLock;
31    pub use std::sync::RwLock as SyncRwLock;
32    pub use tokio::io;
33    pub use tokio::io::{AsyncReadExt, AsyncWriteExt};
34    use tokio::runtime::Runtime as TokioRuntime;
35    pub use tokio::sync::{
36        Mutex as AsyncMutex, MutexGuard as AsyncMutexGuard, RwLock as AsyncRwLock, RwLockReadGuard,
37        RwLockWriteGuard,
38    };
39
40    pub type RuntimeGuard<'a> = SyncMutexGuard<'a, TokioRuntime>;
41    /**************************** Types ***************************************/
42    pub type SafeTokioRuntime = OnceLock<SyncMutex<TokioRuntime>>;
43    /**************************** Globals **************************************/
44    static mut DEFAULT_RUNTIME: SafeTokioRuntime = SafeTokioRuntime::new();
45    /**************************** Helpers ***************************************/
46    pub fn init_runtime(workers: usize) -> RUMResult<RuntimeGuard<'static>> {
47        unsafe {
48            let handle = DEFAULT_RUNTIME.get_or_init(|| {
49                let mut builder = tokio::runtime::Builder::new_multi_thread();
50                builder.worker_threads(workers);
51                builder.enable_all();
52                match builder.build() {
53                    Ok(handle) => SyncMutex::new(handle),
54                    Err(e) => panic!(
55                        "Unable to initialize threading tokio runtime because {}!",
56                        &e
57                    ),
58                }
59            });
60            match handle.lock() {
61                Ok(guard) => Ok(guard),
62                Err(e) => Err(rumtk_format!("Unable to lock tokio runtime!")),
63            }
64        }
65    }
66}
67
68pub mod threading_manager {
69    use crate::core::{RUMResult, RUMVec};
70    use crate::strings::rumtk_format;
71    use crate::threading::threading_functions::{async_sleep, sleep};
72    use crate::types::{RUMHashMap, RUMID};
73    use crate::{rumtk_init_threads, rumtk_resolve_task, rumtk_spawn_task, threading};
74    use std::future::Future;
75    use std::sync::Arc;
76    pub use std::sync::RwLock as SyncRwLock;
77    use tokio::sync::RwLock as AsyncRwLock;
78    use tokio::task::JoinHandle;
79
    // Polling interval, in seconds, handed to `sleep`/`async_sleep` while waiting on a task.
    const DEFAULT_SLEEP_DURATION: f32 = 0.001f32;
    // Initial capacity of the task table allocated by `TaskManager::new`.
    const DEFAULT_TASK_CAPACITY: usize = 100;

    /// Generic list of items; the base alias the other task containers are built on.
    pub type TaskItems<T> = RUMVec<T>;
    /// This type aliases a vector of T elements that will be used for passing arguments to the task processor.
    pub type TaskArgs<T> = TaskItems<T>;
    /// Reference-counted argument list guarded by an async read/write lock so it can be shared
    /// with a task processor.
    pub type SafeTaskArgs<T> = Arc<AsyncRwLock<TaskItems<T>>>;
    /// Join handle of a spawned task whose output is a [`TaskResult`].
    pub type AsyncTaskHandle<R> = JoinHandle<TaskResult<R>>;
    /// Collection of [`AsyncTaskHandle`]s.
    pub type AsyncTaskHandles<R> = Vec<AsyncTaskHandle<R>>;
    // Disabled alias that was meant to describe the signature of the task processing logic.
    //pub type TaskProcessor<T, R, Fut: Future<Output = TaskResult<R>>> = impl FnOnce(&SafeTaskArgs<T>) -> Fut;
    /// Identifier assigned to every task registered with a [`TaskManager`].
    pub type TaskID = RUMID;

    /// Bookkeeping record for one task tracked by a [`TaskManager`].
    #[derive(Debug, Clone, Default)]
    pub struct Task<R> {
        // Identifier under which the task is stored in the task table.
        pub id: TaskID,
        // Set to `true` by the task wrapper once the task's result has been stored.
        pub finished: bool,
        // Output of the task; `None` until the wrapper stores a value.
        pub result: Option<R>,
    }

    /// Immutable, shareable snapshot of a [`Task`] as returned to callers.
    pub type SafeTask<R> = Arc<Task<R>>;
    // Mutable task record shared between the task table and the running task wrapper.
    type SafeInternalTask<R> = Arc<SyncRwLock<Task<R>>>;
    /// Map from [`TaskID`] to the shared record of that task.
    pub type TaskTable<R> = RUMHashMap<TaskID, SafeInternalTask<R>>;
    /// [`TaskTable`] shared behind an async read/write lock.
    pub type SafeAsyncTaskTable<R> = Arc<AsyncRwLock<TaskTable<R>>>;
    /// [`TaskTable`] shared behind a synchronous read/write lock.
    pub type SafeSyncTaskTable<R> = Arc<SyncRwLock<TaskTable<R>>>;
    /// List of task ids to wait on together.
    pub type TaskBatch = RUMVec<TaskID>;
    /// Type to use to define how task results are expected to be returned.
    pub type TaskResult<R> = RUMResult<SafeTask<R>>;
    /// List of [`TaskResult`]s, one per awaited task.
    pub type TaskResults<R> = TaskItems<TaskResult<R>>;
109
110    ///
111    /// Manages asynchronous tasks submitted as micro jobs from synchronous code. This type essentially
112    /// gives the multithreading, asynchronous superpowers to synchronous logic.
113    ///
114    /// ## Example Usage
115    ///
116    /// ```
117    /// use std::sync::{Arc};
118    /// use tokio::sync::RwLock as AsyncRwLock;
119    /// use rumtk_core::core::RUMResult;
120    /// use rumtk_core::strings::RUMString;
121    /// use rumtk_core::threading::threading_manager::{SafeTaskArgs, TaskItems, TaskManager};
122    /// use rumtk_core::{rumtk_create_task, };
123    ///
124    /// let expected = vec![
125    ///     RUMString::from("Hello"),
126    ///     RUMString::from("World!"),
127    ///     RUMString::from("Overcast"),
128    ///     RUMString::from("and"),
129    ///     RUMString::from("Sad"),
130    ///  ];
131    ///
132    /// type TestResult = RUMResult<Vec<RUMString>>;
133    /// let mut queue: TaskManager<TestResult> = TaskManager::new(&5).unwrap();
134    ///
135    /// let locked_args = AsyncRwLock::new(expected.clone());
136    /// let task_args = SafeTaskArgs::<RUMString>::new(locked_args);
137    /// let processor = rumtk_create_task!(
138    ///     async |args: &SafeTaskArgs<RUMString>| -> TestResult {
139    ///         let owned_args = Arc::clone(args);
140    ///         let locked_args = owned_args.read().await;
141    ///         let mut results = TaskItems::<RUMString>::with_capacity(locked_args.len());
142    ///
143    ///         for arg in locked_args.iter() {
144    ///             results.push(RUMString::new(arg));
145    ///         }
146    ///
147    ///         Ok(results)
148    ///     },
149    ///     task_args
150    /// );
151    ///
152    /// queue.add_task::<_>(processor);
153    /// let results = queue.wait();
154    ///
155    /// let mut result_data = Vec::<RUMString>::with_capacity(5);
156    /// for r in results {
157    ///     for v in r.unwrap().result.clone().unwrap().iter() {
158    ///         for value in v.iter() {
159    ///             result_data.push(value.clone());
160    ///         }
161    ///     }
162    ///  }
163    ///
164    /// assert_eq!(result_data, expected, "Results do not match expected!");
165    ///
166    /// ```
167    ///
    #[derive(Debug, Clone, Default)]
    pub struct TaskManager<R> {
        // Shared table of tracked tasks keyed by `TaskID`; an entry is inserted when a task is
        // registered and removed when the task is waited on.
        tasks: SafeSyncTaskTable<R>,
        // Worker-thread count handed to `rumtk_init_threads!` when `spawn_task` needs the runtime.
        workers: usize,
    }
173
174    impl<R> TaskManager<R>
175    where
176        R: Sync + Send + Clone + 'static,
177    {
178        ///
179        /// This method creates a [`TaskManager`] instance using sensible defaults.
180        ///
181        /// The `threads` field is computed from the number of cores present in system.
182        ///
183        pub fn default() -> RUMResult<TaskManager<R>> {
184            Self::new(&threading::threading_functions::get_default_system_thread_count())
185        }
186
187        ///
188        /// Creates an instance of [`TaskManager<R>`](TaskManager<R>).
189        /// Expects you to provide the count of threads to spawn and the microtask queue size
190        /// allocated by each thread.
191        ///
192        /// This method calls [`TaskTable::with_capacity()`](TaskTable::with_capacity) for the actual object creation.
193        /// The main queue capacity is pre-allocated to [`DEFAULT_TASK_CAPACITY`](DEFAULT_TASK_CAPACITY).
194        ///
195        pub fn new(worker_num: &usize) -> RUMResult<TaskManager<R>> {
196            let tasks = SafeSyncTaskTable::<R>::new(SyncRwLock::new(TaskTable::with_capacity(
197                DEFAULT_TASK_CAPACITY,
198            )));
199            Ok(TaskManager::<R> {
200                tasks,
201                workers: worker_num.to_owned(),
202            })
203        }
204
205        ///
206        /// Add a task to the processing queue. The idea is that you can queue a processor function
207        /// and list of args that will be picked up by one of the threads for processing.
208        ///
209        /// This is the async counterpart
210        ///
211        pub async fn add_task_async<F>(&mut self, task: F) -> TaskID
212        where
213            F: Future<Output = R> + Send + Sync + 'static,
214            F::Output: Send + 'static,
215        {
216            let id = TaskID::new_v4();
217            Self::_add_task_async(id.clone(), self.tasks.clone(), task).await
218        }
219
220        ///
221        /// See [`Self::add_task_async`]
222        ///
223        /// Unlike `add_task`, this method does not block which is key to avoiding panicking
224        /// the tokio runtim if trying to add task to queue from a normal function called from an
225        /// async environment.
226        ///
227        /// ## Example
228        ///
229        /// ```
230        /// use rumtk_core::threading::threading_manager::{TaskManager};
231        /// use rumtk_core::{rumtk_init_threads, strings::rumtk_format};
232        /// use std::sync::{Arc};
233        /// use tokio::sync::Mutex;
234        ///
235        /// type JobManager = Arc<Mutex<TaskManager<usize>>>;
236        ///
237        /// async fn called_fn() -> usize {
238        ///     5
239        /// }
240        ///
241        /// fn push_job(manager: &mut TaskManager<usize>) -> usize {
242        ///     manager.spawn_task(called_fn());
243        ///     1
244        /// }
245        ///
246        /// async fn call_sync_fn(mut manager: JobManager) -> usize {
247        ///     let mut owned = manager.lock().await;
248        ///     push_job(&mut owned)
249        /// }
250        ///
251        /// let workers = 5;
252        /// let mut manager = Arc::new(Mutex::new(TaskManager::new(&workers).unwrap()));
253        ///
254        /// manager.blocking_lock().spawn_task(call_sync_fn(manager.clone()));
255        ///
256        /// let result_raw = manager.blocking_lock().wait();
257        ///
258        /// ```
259        ///
260        pub fn spawn_task<F>(&mut self, task: F) -> RUMResult<TaskID>
261        where
262            F: Future<Output = R> + Send + Sync + 'static,
263            F::Output: Send + Sized + 'static,
264        {
265            let id = TaskID::new_v4();
266            let rt = rumtk_init_threads!(self.workers)?;
267            rumtk_spawn_task!(
268                rt,
269                Self::_add_task_async(id.clone(), self.tasks.clone(), task)
270            );
271            Ok(id)
272        }
273
274        ///
275        /// See [add_task_async](Self::add_task_async)
276        ///
277        pub fn add_task<F>(&mut self, task: F) -> RUMResult<TaskID>
278        where
279            F: Future<Output = R> + Send + Sync + 'static,
280            F::Output: Send + Sized + 'static,
281        {
282            let id = TaskID::new_v4();
283            let tasks = self.tasks.clone();
284            rumtk_resolve_task!(Self::_add_task_async(id.clone(), tasks, task))
285        }
286
287        async fn _add_task_async<F>(id: TaskID, tasks: SafeSyncTaskTable<R>, task: F) -> TaskID
288        where
289            F: Future<Output = R> + Send + Sync + 'static,
290            F::Output: Send + Sized + 'static,
291        {
292            let mut safe_task = Arc::new(SyncRwLock::new(Task::<R> {
293                id: id.clone(),
294                finished: false,
295                result: None,
296            }));
297            tasks.write().unwrap().insert(id.clone(), safe_task.clone());
298
299            let task_wrapper = async move || {
300                // Run the task
301                let result = task.await;
302
303                // Cleanup task
304                let mut lock = safe_task.write().unwrap();
305                lock.result = Some(result);
306                lock.finished = true;
307            };
308
309            tokio::spawn(task_wrapper());
310
311            id
312        }
313
314        ///
315        /// See [wait_async](Self::wait_async)
316        ///
317        /// Duplicated here because we can't request the tokio runtime to do a quick exec for us if
318        /// this function happens to be called from the async context.
319        ///
320        pub fn wait(&mut self) -> TaskResults<R> {
321            let task_batch = self
322                .tasks
323                .read()
324                .unwrap()
325                .keys()
326                .cloned()
327                .collect::<Vec<_>>();
328            self.wait_on_batch(&task_batch)
329        }
330
331        ///
332        /// See [wait_on_batch_async](Self::wait_on_batch_async)
333        ///
334        /// Duplicated here because we can't request the tokio runtime to do a quick exec for us if
335        /// this function happens to be called from the async context.
336        ///
337        pub fn wait_on_batch(&mut self, tasks: &TaskBatch) -> TaskResults<R> {
338            let mut results = TaskResults::<R>::default();
339            for task in tasks {
340                results.push(self.wait_on(task));
341            }
342            results
343        }
344
345        ///
346        /// See [wait_on_async](Self::wait_on_async)
347        ///
348        /// Duplicated here because we can't request the tokio runtime to do a quick exec for us if
349        /// this function happens to be called from the async context.
350        ///
351        pub fn wait_on(&mut self, task_id: &TaskID) -> TaskResult<R> {
352            let task = match self.tasks.write().unwrap().remove(task_id) {
353                Some(task) => task.clone(),
354                None => return Err(rumtk_format!("No task with id {}", task_id)),
355            };
356
357            while !task.read().unwrap().finished {
358                sleep(DEFAULT_SLEEP_DURATION);
359            }
360
361            let x = Ok(Arc::new(task.write().unwrap().clone()));
362            x
363        }
364
365        ///
366        /// This method waits until a queued task with [TaskID](TaskID) has been processed from the main queue.
367        ///
368        /// We poll the status of the task every [DEFAULT_SLEEP_DURATION](DEFAULT_SLEEP_DURATION) ms.
369        ///
370        /// Upon completion,
371        ///
372        /// 2. Return the result ([TaskResults<R>](TaskResults)).
373        ///
374        /// This operation consumes the task.
375        ///
376        /// ### Note:
377        /// ```text
378        ///     Results returned here are not guaranteed to be in the same order as the order in which
379        ///     the tasks were queued for work. You will need to pass a type as T that automatically
380        ///     tracks its own id or has a way for you to resort results.
381        /// ```
382        pub async fn wait_on_async(&mut self, task_id: &TaskID) -> TaskResult<R> {
383            let task = match self.tasks.write().unwrap().remove(task_id) {
384                Some(task) => task.clone(),
385                None => return Err(rumtk_format!("No task with id {}", task_id)),
386            };
387
388            while !task.read().unwrap().finished {
389                async_sleep(DEFAULT_SLEEP_DURATION).await;
390            }
391
392            let x = Ok(Arc::new(task.write().unwrap().clone()));
393            x
394        }
395
396        ///
397        /// This method waits until a set of queued tasks with [TaskID](TaskID) has been processed from the main queue.
398        ///
399        /// We poll the status of the task every [DEFAULT_SLEEP_DURATION](DEFAULT_SLEEP_DURATION) ms.
400        ///
401        /// Upon completion,
402        ///
403        /// 1. We collect the results generated (if any).
404        /// 2. Return the list of results ([TaskResults<R>](TaskResults)).
405        ///
406        /// ### Note:
407        /// ```text
408        ///     Results returned here are not guaranteed to be in the same order as the order in which
409        ///     the tasks were queued for work. You will need to pass a type as T that automatically
410        ///     tracks its own id or has a way for you to resort results.
411        /// ```
412        pub async fn wait_on_batch_async(&mut self, tasks: &TaskBatch) -> TaskResults<R> {
413            let mut results = TaskResults::<R>::default();
414            for task in tasks {
415                results.push(self.wait_on_async(task).await);
416            }
417            results
418        }
419
420        ///
421        /// This method waits until all queued tasks have been processed from the main queue.
422        ///
423        /// We poll the status of the main queue every [DEFAULT_SLEEP_DURATION](DEFAULT_SLEEP_DURATION) ms.
424        ///
425        /// Upon completion,
426        ///
427        /// 1. We collect the results generated (if any).
428        /// 2. We reset the main task and result internal queue states.
429        /// 3. Return the list of results ([TaskResults<R>](TaskResults)).
430        ///
431        /// This operation consumes all the tasks.
432        ///
433        /// ### Note:
434        /// ```text
435        ///     Results returned here are not guaranteed to be in the same order as the order in which
436        ///     the tasks were queued for work. You will need to pass a type as T that automatically
437        ///     tracks its own id or has a way for you to resort results.
438        /// ```
439        pub async fn wait_async(&mut self) -> TaskResults<R> {
440            let task_batch = self
441                .tasks
442                .read()
443                .unwrap()
444                .keys()
445                .cloned()
446                .collect::<Vec<_>>();
447            self.wait_on_batch_async(&task_batch).await
448        }
449
450        ///
451        /// Check if all work has been completed from the task queue.
452        ///
453        /// ## Examples
454        ///
455        /// ### Sync Usage
456        ///
457        ///```
458        /// use rumtk_core::threading::threading_manager::TaskManager;
459        ///
460        /// let manager = TaskManager::<usize>::new(&4).unwrap();
461        ///
462        /// let all_done = manager.is_all_completed();
463        ///
464        /// assert_eq!(all_done, true, "Empty TaskManager reports tasks are not completed!");
465        ///
466        /// ```
467        ///
468        pub fn is_all_completed(&self) -> bool {
469            self._is_all_completed_async()
470        }
471
472        pub async fn is_all_completed_async(&self) -> bool {
473            self._is_all_completed_async()
474        }
475
476        fn _is_all_completed_async(&self) -> bool {
477            for (_, task) in self.tasks.read().unwrap().iter() {
478                if !task.read().unwrap().finished {
479                    return false;
480                }
481            }
482
483            true
484        }
485
486        ///
487        /// Check if a task completed
488        ///
489        pub fn is_finished(&self, id: &TaskID) -> bool {
490            match self.tasks.read().unwrap().get(id) {
491                Some(t) => t.read().unwrap().finished,
492                None => false,
493            }
494        }
495
496        pub async fn is_finished_async(&self, id: &TaskID) -> bool {
497            match self.tasks.read().unwrap().get(id) {
498                Some(task) => task.read().unwrap().finished,
499                None => true,
500            }
501        }
502
503        ///
504        /// Alias for [wait](TaskManager::wait).
505        ///
506        fn gather(&mut self) -> TaskResults<R> {
507            self.wait()
508        }
509    }
510}
511
512///
/// This module contains a few helpers.
514///
515/// For example, you can find a function for determining number of threads available in system.
516/// The sleep family of functions are also here.
517///
518pub mod threading_functions {
519    use crate::core::RUMResult;
520    use crate::rumtk_sleep;
521    use crate::strings::rumtk_format;
522    pub use crate::threading::thread_primitives::init_runtime;
523    use num_cpus;
524    use std::future::Future;
525    use std::thread::{available_parallelism, sleep as std_sleep};
526    use std::time::Duration;
527    use tokio::time::sleep as tokio_sleep;
528
529    pub const NANOS_PER_SEC: u64 = 1000000000;
530    pub const MILLIS_PER_SEC: u64 = 1000;
531    pub const MICROS_PER_SEC: u64 = 1000000;
532
533    const DEFAULT_SLEEP_DURATION: f32 = 0.001;
534
535    pub fn get_default_system_thread_count() -> usize {
536        let cpus: usize = num_cpus::get();
537        let parallelism = match available_parallelism() {
538            Ok(n) => n.get(),
539            Err(_) => 0,
540        };
541
542        if parallelism >= cpus {
543            parallelism
544        } else {
545            cpus
546        }
547    }
548
549    pub fn sleep(s: f32) {
550        let ns = s * NANOS_PER_SEC as f32;
551        let rounded_ns = ns.round() as u64;
552        let duration = Duration::from_nanos(rounded_ns);
553        std_sleep(duration);
554    }
555
556    pub async fn async_sleep(s: f32) {
557        let ns = s * NANOS_PER_SEC as f32;
558        let rounded_ns = ns.round() as u64;
559        let duration = Duration::from_nanos(rounded_ns);
560        tokio_sleep(duration).await;
561    }
562
563    ///
564    /// Given a closure task, push it onto the current `tokio` runtime for execution.
565    /// Every [DEFAULT_SLEEP_DURATION] seconds, we check if the task has concluded.
566    /// Once the task has concluded, we call [tokio::block_on](tokio::task::block_in_place) to resolve and extract the task
567    /// result.
568    ///
569    /// Because this helper function can fail, the return value is wrapped inside a [RUMResult].
570    ///
571    /// ## Example
572    ///
573    /// ```
574    /// use rumtk_core::threading::threading_functions::{init_runtime, block_on_task};
575    ///
576    /// const Hello: &str = "World!";
577    ///
578    /// init_runtime(5);
579    ///
580    /// let result = block_on_task(async {
581    ///     Hello
582    /// }).unwrap();
583    ///
584    /// assert_eq!(Hello, result, "Result mismatches expected! {} vs. {}", Hello, result);
585    /// ```
586    ///
587    pub fn block_on_task<R, F>(task: F) -> RUMResult<R>
588    where
589        F: Future<Output = R> + Send + 'static,
590        F::Output: Send + 'static,
591    {
592        let default_system_thread_count = get_default_system_thread_count();
593
594        let handle = init_runtime(default_system_thread_count)
595            .expect("Failed to initialize runtime")
596            .spawn(task);
597
598        while !handle.is_finished() {
599            rumtk_sleep!(DEFAULT_SLEEP_DURATION);
600        }
601
602        tokio::task::block_in_place(move || {
603            match init_runtime(default_system_thread_count)
604                .expect("Failed to initialize runtime")
605                .block_on(handle)
606            {
607                Ok(r) => Ok(r),
608                Err(e) => Err(rumtk_format!(
609                    "Issue peeking into task in the runtime because => {}",
610                    &e
611                )),
612            }
613        })
614    }
615}
616
617///
618/// Main API for interacting with the threading back end. Remember, we use tokio as our executor.
619/// This means that by default, all jobs sent to the thread pool have to be async in nature.
620/// These macros make handling of these jobs at the sync/async boundary more convenient.
621///
622pub mod threading_macros {
623    use crate::threading::thread_primitives;
624    use crate::threading::threading_manager::SafeTaskArgs;
625
626    ///
627    /// First, let's make sure we have *tokio* initialized at least once. The runtime created here
628    /// will be saved to the global context so the next call to this macro will simply grab a
629    /// reference to the previously initialized runtime.
630    ///
631    /// Passing nothing will default to initializing a runtime using the default number of threads
632    /// for this system. This is typically equivalent to number of cores/threads for your CPU.
633    ///
634    /// Passing `threads` number will yield a runtime that allocates that many threads.
635    ///
636    ///
637    /// ## Examples
638    ///
639    /// ```
640    ///     use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_create_task_args, rumtk_create_task, rumtk_spawn_task};
641    ///     use rumtk_core::core::RUMResult;
642    ///     use rumtk_core::threading::threading_manager::SafeTaskArgs;
643    ///
644    ///     async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
645    ///         let mut result = Vec::<i32>::new();
646    ///         for arg in args.read().await.iter() {
647    ///             result.push(*arg);
648    ///         }
649    ///         Ok(result)
650    ///     }
651    ///
652    ///     let args = rumtk_create_task_args!(1);                               // Creates a vector of i32s
653    ///     let task = rumtk_create_task!(test, args);                           // Creates a standard task which consists of a function or closure accepting a Vec<T>
654    ///     let result = rumtk_resolve_task!(rumtk_spawn_task!(task)); // Spawn's task and waits for it to conclude.
655    /// ```
656    ///
657    /// ```
658    ///     use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_create_task_args, rumtk_create_task, rumtk_spawn_task};
659    ///     use rumtk_core::core::RUMResult;
660    ///     use rumtk_core::threading::threading_manager::SafeTaskArgs;
661    ///
662    ///     async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
663    ///         let mut result = Vec::<i32>::new();
664    ///         for arg in args.read().await.iter() {
665    ///             result.push(*arg);
666    ///         }
667    ///         Ok(result)
668    ///     }
669    ///
670    ///     let thread_count: usize = 10;
671    ///     let args = rumtk_create_task_args!(1);
672    ///     let task = rumtk_create_task!(test, args);
673    ///     let result = rumtk_resolve_task!(rumtk_spawn_task!(task));
674    /// ```
675    #[macro_export]
676    macro_rules! rumtk_init_threads {
677        ( ) => {{
678            use $crate::threading::thread_primitives::init_runtime;
679            use $crate::threading::threading_functions::get_default_system_thread_count;
680            init_runtime(get_default_system_thread_count())
681        }};
682        ( $threads:expr ) => {{
683            use $crate::rumtk_cache_fetch;
684            use $crate::threading::thread_primitives::init_runtime;
685            init_runtime($threads)
686        }};
687    }
688
689    ///
690    /// Puts task onto the runtime queue.
691    ///
692    /// The parameters to this macro are a reference to the runtime (`rt`) and a future (`func`).
693    ///
694    /// The return is a [thread_primitives::JoinHandle<T>] instance. If the task was a standard
695    /// framework task, you will get [thread_primitives::AsyncTaskHandle] instead.
696    ///
697    #[macro_export]
698    macro_rules! rumtk_spawn_task {
699        ( $func:expr ) => {{
700            let rt = rumtk_init_threads!().expect("Runtime is not initialized!");
701            rt.spawn($func)
702        }};
703        ( $rt:expr, $func:expr ) => {{
704            $rt.spawn($func)
705        }};
706    }
707
708    ///
709    /// Using the initialized runtime, wait for the future to resolve in a thread blocking manner!
710    ///
711    /// If you pass a reference to the runtime (`rt`) and an async closure (`func`), we await the
712    /// async closure without passing any arguments.
713    ///
714    /// You can pass a third argument to this macro in the form of any number of arguments (`arg_item`).
715    /// In such a case, we pass those arguments to the call on the async closure and await on results.
716    ///
717    #[macro_export]
718    macro_rules! rumtk_wait_on_task {
719        ( $func:expr ) => {{
720            use $crate::threading::threading_functions::block_on_task;
721            block_on_task(async move {
722                $func().await
723            })
724        }};
725        ( $func:expr, $($arg_items:expr),+ ) => {{
726            use $crate::threading::threading_functions::block_on_task;
727            block_on_task(async move {
728                $func($($arg_items),+).await
729            })
730        }};
731    }
732
733    ///
734    /// This macro awaits a future.
735    ///
736    /// The arguments are a reference to the runtime (`rt) and a future.
737    ///
738    /// If there is a result, you will get the result of the future.
739    ///
740    /// ## Examples
741    ///
742    /// ```
743    ///     use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_create_task_args, rumtk_create_task, rumtk_spawn_task};
744    ///     use rumtk_core::core::RUMResult;
745    ///     use rumtk_core::threading::threading_manager::SafeTaskArgs;
746    ///
747    ///     async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
748    ///         let mut result = Vec::<i32>::new();
749    ///         for arg in args.read().await.iter() {
750    ///             result.push(*arg);
751    ///         }
752    ///         Ok(result)
753    ///     }
754    ///
755    ///     let args = rumtk_create_task_args!(1);
756    ///     let task = rumtk_create_task!(test, args);
757    ///     let result = rumtk_resolve_task!(rumtk_spawn_task!(task));
758    /// ```
759    ///
760    #[macro_export]
761    macro_rules! rumtk_resolve_task {
762        ( $future:expr ) => {{
763            use $crate::threading::threading_functions::block_on_task;
764            // Fun tidbit, the expression rumtk_resolve_task!(&rt, rumtk_spawn_task!(&rt, task)), where
765            // rt is the tokio runtime yields async move { { &rt.spawn(task) } }. However, the whole thing
766            // is technically moved into the async closure and captured so things like mutex guards
767            // technically go out of the outer scope. As a result that expression fails to compile even
768            // though the intent is for rumtk_spawn_task to resolve first and its result get moved
769            // into the async closure. To ensure that happens regardless of given expression, we do
770            // a variable assignment below to force the "future" macro expressions to resolve before
771            // moving into the closure. DO NOT REMOVE OR "SIMPLIFY" THE let future = $future LINE!!!
772            //let future = $future;
773            block_on_task(async move { $future.await })
774        }};
775    }
776
777    #[macro_export]
778    macro_rules! rumtk_resolve_task_from_async {
779        ( $rt:expr, $future:expr ) => {{
780            let handle = $rt.spawn_blocking(async move { future.await })
781        }};
782    }
783
    ///
    /// This macro creates an async body that calls the async closure and awaits it.
    ///
    /// With a single argument (`func`), the callable is invoked without arguments inside the
    /// async body.
    ///
    /// With two arguments (`func`, `args`), the callable is invoked with a reference to `args`.
    /// `args` is moved into the async body.
    ///
    /// ## Example
    ///
    /// ```
    /// use std::sync::{Arc, RwLock};
    /// use tokio::sync::RwLock as AsyncRwLock;
    /// use rumtk_core::strings::RUMString;
    /// use rumtk_core::threading::threading_manager::{SafeTaskArgs, TaskItems};
    ///
    /// pub type SafeTaskArgs2<T> = Arc<RwLock<TaskItems<T>>>;
    /// let expected = vec![
    ///     RUMString::from("Hello"),
    ///     RUMString::from("World!"),
    ///     RUMString::from("Overcast"),
    ///     RUMString::from("and"),
    ///     RUMString::from("Sad"),
    ///  ];
    /// let locked_args = AsyncRwLock::new(expected.clone());
    /// let task_args = SafeTaskArgs::<RUMString>::new(locked_args);
    ///
    ///
    /// ```
    ///
    #[macro_export]
    macro_rules! rumtk_create_task {
        ( $func:expr ) => {{
            async move {
                // `$func` is evaluated when the async body is first polled.
                let f = $func;
                f().await
            }
        }};
        ( $func:expr, $args:expr ) => {{
            // `$func` is evaluated immediately, before the async body is created.
            let f = $func;
            async move { f(&$args).await }
        }};
    }
822
823    ///
824    /// Creates an instance of [SafeTaskArgs](SafeTaskArgs) with the arguments passed.
825    ///
826    /// ## Note
827    ///
828    /// All arguments must be of the same type
829    ///
830    #[macro_export]
831    macro_rules! rumtk_create_task_args {
832        ( ) => {{
833            use $crate::threading::threading_manager::{TaskArgs, SafeTaskArgs, TaskItems};
834            use $crate::threading::thread_primitives::AsyncRwLock;
835            SafeTaskArgs::new(AsyncRwLock::new(vec![]))
836        }};
837        ( $($args:expr),+ ) => {{
838            use $crate::threading::threading_manager::{SafeTaskArgs};
839            use $crate::threading::thread_primitives::AsyncRwLock;
840            SafeTaskArgs::new(AsyncRwLock::new(vec![$($args),+]))
841        }};
842    }
843
844    ///
845    /// Convenience macro for packaging the task components and launching the task in one line.
846    ///
847    /// One of the advantages is that you can generate a new `tokio` runtime by specifying the
848    /// number of threads at the end. This is optional. Meaning, we will default to the system's
849    /// number of threads if that value is not specified.
850    ///
851    /// Between the `func` parameter and the optional `threads` parameter, you can specify a
852    /// variable number of arguments to pass to the task. each argument must be of the same type.
853    /// If you wish to pass different arguments with different types, please define an abstract type
854    /// whose underlying structure is a tuple of items and pass that instead.
855    ///
856    /// ## Examples
857    ///
858    /// ### With Default Thread Count
859    /// ```
860    ///     use rumtk_core::{rumtk_exec_task};
861    ///     use rumtk_core::core::RUMResult;
862    ///     use rumtk_core::threading::threading_manager::SafeTaskArgs;
863    ///
864    ///     async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
865    ///         let mut result = Vec::<i32>::new();
866    ///         for arg in args.read().await.iter() {
867    ///             result.push(*arg);
868    ///         }
869    ///         Ok(result)
870    ///     }
871    ///
872    ///     let result = rumtk_exec_task!(test, vec![5]).unwrap();
873    ///     assert_eq!(&result.clone().unwrap(), &vec![5], "Results mismatch");
874    ///     assert_ne!(&result.clone().unwrap(), &vec![5, 10], "Results do not mismatch as expected!");
875    /// ```
876    ///
877    /// ### With Custom Thread Count
878    /// ```
879    ///     use rumtk_core::{rumtk_exec_task};
880    ///     use rumtk_core::core::RUMResult;
881    ///     use rumtk_core::threading::threading_manager::SafeTaskArgs;
882    ///
883    ///     async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
884    ///         let mut result = Vec::<i32>::new();
885    ///         for arg in args.read().await.iter() {
886    ///             result.push(*arg);
887    ///         }
888    ///         Ok(result)
889    ///     }
890    ///
891    ///     let result = rumtk_exec_task!(test, vec![5], 5).unwrap();
892    ///     assert_eq!(&result.clone().unwrap(), &vec![5], "Results mismatch");
893    ///     assert_ne!(&result.clone().unwrap(), &vec![5, 10], "Results do not mismatch as expected!");
894    /// ```
895    ///
896    /// ### With Async Function Body
897    /// ```
898    ///     use rumtk_core::{rumtk_exec_task};
899    ///     use rumtk_core::core::RUMResult;
900    ///     use rumtk_core::threading::threading_manager::SafeTaskArgs;
901    ///
902    ///     let result = rumtk_exec_task!(
903    ///     async move |args: &SafeTaskArgs<i32>| -> RUMResult<Vec<i32>> {
904    ///         let mut result = Vec::<i32>::new();
905    ///         for arg in args.read().await.iter() {
906    ///             result.push(*arg);
907    ///         }
908    ///         Ok(result)
909    ///     },
910    ///     vec![5]).unwrap();
911    ///     assert_eq!(&result.clone().unwrap(), &vec![5], "Results mismatch");
912    ///     assert_ne!(&result.clone().unwrap(), &vec![5, 10], "Results do not mismatch as expected!");
913    /// ```
914    ///
915    /// ### With Async Function Body and No Args
916    /// ```
917    ///     use rumtk_core::{rumtk_exec_task};
918    ///     use rumtk_core::core::RUMResult;
919    ///     use rumtk_core::threading::threading_manager::SafeTaskArgs;
920    ///
921    ///     let result = rumtk_exec_task!(
922    ///     async || -> RUMResult<Vec<i32>> {
923    ///         let mut result = Vec::<i32>::new();
924    ///         Ok(result)
925    ///     }).unwrap();
926    ///     let empty = Vec::<i32>::new();
927    ///     assert_eq!(&result.clone().unwrap(), &empty, "Results mismatch");
928    ///     assert_ne!(&result.clone().unwrap(), &vec![5, 10], "Results do not mismatch as expected!");
929    /// ```
930    ///
931    /// ## Equivalent To
932    ///
933    /// ```
934    ///     use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_create_task_args, rumtk_create_task, rumtk_spawn_task};
935    ///     use rumtk_core::core::RUMResult;
936    ///     use rumtk_core::threading::threading_manager::SafeTaskArgs;
937    ///
938    ///     async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
939    ///         let mut result = Vec::<i32>::new();
940    ///         for arg in args.read().await.iter() {
941    ///             result.push(*arg);
942    ///         }
943    ///         Ok(result)
944    ///     }
945    ///
946    ///     let args = rumtk_create_task_args!(1);
947    ///     let task = rumtk_create_task!(test, args);
948    ///     let result = rumtk_resolve_task!(rumtk_spawn_task!(task));
949    /// ```
950    ///
951    #[macro_export]
952    macro_rules! rumtk_exec_task {
953        ($func:expr ) => {{
954            use $crate::{
955                rumtk_create_task, rumtk_create_task_args, rumtk_init_threads, rumtk_resolve_task,
956            };
957            let task = rumtk_create_task!($func);
958            rumtk_resolve_task!(task)
959        }};
960        ($func:expr, $args:expr ) => {{
961            use $crate::threading::threading_functions::get_default_system_thread_count;
962            rumtk_exec_task!($func, $args, get_default_system_thread_count())
963        }};
964        ($func:expr, $args:expr , $threads:expr ) => {{
965            use $crate::threading::thread_primitives::AsyncRwLock;
966            use $crate::{
967                rumtk_create_task, rumtk_create_task_args, rumtk_init_threads, rumtk_resolve_task,
968            };
969            let args = SafeTaskArgs::new(AsyncRwLock::new($args));
970            let task = rumtk_create_task!($func, args);
971            rumtk_resolve_task!(task)
972        }};
973    }
974
975    ///
976    /// Sleep a duration of time in a sync context, so no await can be call on the result.
977    ///
978    /// You can pass any value that can be cast to f32.
979    ///
980    /// The precision is up to nanoseconds and it is depicted by the number of decimal places.
981    ///
982    /// ## Examples
983    ///
984    /// ```
985    ///     use rumtk_core::rumtk_sleep;
986    ///     rumtk_sleep!(1);           // Sleeps for 1 second.
987    ///     rumtk_sleep!(0.001);       // Sleeps for 1 millisecond
988    ///     rumtk_sleep!(0.000001);    // Sleeps for 1 microsecond
989    ///     rumtk_sleep!(0.000000001); // Sleeps for 1 nanosecond
990    /// ```
991    ///
992    #[macro_export]
993    macro_rules! rumtk_sleep {
994        ( $dur:expr) => {{
995            use $crate::threading::threading_functions::sleep;
996            sleep($dur as f32)
997        }};
998    }
999
1000    ///
1001    /// Sleep for some duration of time in an async context. Meaning, we can be awaited.
1002    ///
1003    /// You can pass any value that can be cast to f32.
1004    ///
1005    /// The precision is up to nanoseconds and it is depicted by the number of decimal places.
1006    ///
1007    /// ## Examples
1008    ///
1009    /// ```
1010    ///     use rumtk_core::{rumtk_async_sleep, rumtk_exec_task};
1011    ///     use rumtk_core::core::RUMResult;
1012    ///     rumtk_exec_task!( async || -> RUMResult<()> {
1013    ///             rumtk_async_sleep!(1).await;           // Sleeps for 1 second.
1014    ///             rumtk_async_sleep!(0.001).await;       // Sleeps for 1 millisecond
1015    ///             rumtk_async_sleep!(0.000001).await;    // Sleeps for 1 microsecond
1016    ///             rumtk_async_sleep!(0.000000001).await; // Sleeps for 1 nanosecond
1017    ///             Ok(())
1018    ///         }
1019    ///     );
1020    /// ```
1021    ///
1022    #[macro_export]
1023    macro_rules! rumtk_async_sleep {
1024        ( $dur:expr) => {{
1025            use $crate::threading::threading_functions::async_sleep;
1026            async_sleep($dur as f32)
1027        }};
1028    }
1029
1030    ///
1031    ///
1032    ///
1033    #[macro_export]
1034    macro_rules! rumtk_new_task_queue {
1035        ( $worker_num:expr ) => {{
1036            use $crate::threading::threading_manager::TaskManager;
1037            TaskManager::new($worker_num);
1038        }};
1039    }
1040}