Skip to main content

rumtk_core/
threading.rs

1/*
2 * rumtk attempts to implement HL7 and medical protocols for interoperability in medicine.
3 * This toolkit aims to be reliable, simple, performant, and standards compliant.
4 * Copyright (C) 2025  Luis M. Santos, M.D. <lsantos@medicalmasses.com>
5 * Copyright (C) 2025  MedicalMasses L.L.C. <contact@medicalmasses.com>
6 *
7 * This program is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
19 */
20
21///
22/// This module provides all the primitives needed to build a multithreaded application.
23///
24pub mod thread_primitives {
25    pub use std::sync::Mutex as SyncMutex;
26    pub use std::sync::MutexGuard as SyncMutexGuard;
27    pub use std::sync::RwLock as SyncRwLock;
28    use std::sync::{Arc, OnceLock};
29    pub use tokio::io;
30    pub use tokio::io::{AsyncReadExt, AsyncWriteExt};
31    use tokio::runtime::Runtime as TokioRuntime;
32    pub use tokio::sync::{
33        Mutex as AsyncMutex, MutexGuard as AsyncMutexGuard,
34        OwnedRwLockReadGuard as AsyncOwnedRwLockMappedReadGuard,
35        OwnedRwLockReadGuard as AsyncOwnedRwLockReadGuard,
36        OwnedRwLockWriteGuard as AsyncOwnedRwLockWriteGuard, RwLock as AsyncRwLock,
37        RwLockMappedWriteGuard as AsyncRwLockMappedWriteGuard,
38        RwLockReadGuard as AsyncRwLockReadGuard, RwLockWriteGuard as AsyncRwLockWriteGuard,
39    };
40
41    /**************************** Types ***************************************/
42    pub type SafeLockReadGuard<T> = AsyncOwnedRwLockReadGuard<T>;
43    pub type MappedLockReadGuard<T> = AsyncOwnedRwLockReadGuard<T>;
44    pub type SafeLockWriteGuard<T> = AsyncOwnedRwLockWriteGuard<T>;
45    pub type SafeLock<T> = Arc<AsyncRwLock<T>>;
46    pub type SafeTokioRuntime = OnceLock<TokioRuntime>;
47}
48
49pub mod threading_manager {
50    use crate::core::{RUMResult, RUMVec};
51    use crate::strings::rumtk_format;
52    use crate::threading::thread_primitives::SafeLock;
53    use crate::threading::threading_functions::{async_sleep, sleep};
54    use crate::types::{RUMHashMap, RUMID};
55    use crate::{rumtk_init_threads, rumtk_resolve_task, rumtk_spawn_task, threading};
56    use std::future::Future;
57    use std::sync::Arc;
58    pub use std::sync::RwLock as SyncRwLock;
59    use tokio::io::AsyncReadExt;
60    use tokio::task::JoinHandle;
61
62    const DEFAULT_SLEEP_DURATION: f32 = 0.001f32;
63    const DEFAULT_TASK_CAPACITY: usize = 100;
64
65    pub type AsyncHandle<T> = JoinHandle<T>;
66    pub type TaskItems<T> = RUMVec<T>;
67    /// This type aliases a vector of T elements that will be used for passing arguments to the task processor.
68    pub type TaskArgs<T> = TaskItems<T>;
69    /// Function signature defining the interface of task processing logic.
70    pub type SafeTaskArgs<T> = SafeLock<TaskItems<T>>;
71    pub type AsyncTaskHandle<R> = AsyncHandle<TaskResult<R>>;
72    pub type AsyncTaskHandles<R> = Vec<AsyncTaskHandle<R>>;
73    //pub type TaskProcessor<T, R, Fut: Future<Output = TaskResult<R>>> = impl FnOnce(&SafeTaskArgs<T>) -> Fut;
74    pub type TaskID = RUMID;
75
76    #[derive(Debug, Clone, Default)]
77    pub struct Task<R> {
78        pub id: TaskID,
79        pub finished: bool,
80        pub result: Option<R>,
81    }
82
83    pub type SafeTask<R> = Arc<Task<R>>;
84    type SafeInternalTask<R> = Arc<SyncRwLock<Task<R>>>;
85    pub type TaskTable<R> = RUMHashMap<TaskID, SafeInternalTask<R>>;
86    pub type SafeAsyncTaskTable<R> = SafeLock<TaskTable<R>>;
87    pub type SafeSyncTaskTable<R> = Arc<SyncRwLock<TaskTable<R>>>;
88    pub type TaskBatch = RUMVec<TaskID>;
89    /// Type to use to define how task results are expected to be returned.
90    pub type TaskResult<R> = RUMResult<SafeTask<R>>;
91    pub type TaskResults<R> = TaskItems<TaskResult<R>>;
92
93    ///
94    /// Manages asynchronous tasks submitted as micro jobs from synchronous code. This type essentially
95    /// gives the multithreading, asynchronous superpowers to synchronous logic.
96    ///
97    /// ## Example Usage
98    ///
99    /// ```
100    /// use std::sync::{Arc};
101    /// use tokio::sync::RwLock as AsyncRwLock;
102    /// use rumtk_core::core::RUMResult;
103    /// use rumtk_core::strings::RUMString;
104    /// use rumtk_core::threading::threading_manager::{SafeTaskArgs, TaskItems, TaskManager};
105    /// use rumtk_core::{rumtk_create_task, };
106    ///
107    /// let expected = vec![
108    ///     RUMString::from("Hello"),
109    ///     RUMString::from("World!"),
110    ///     RUMString::from("Overcast"),
111    ///     RUMString::from("and"),
112    ///     RUMString::from("Sad"),
113    ///  ];
114    ///
115    /// type TestResult = RUMResult<Vec<RUMString>>;
116    /// let mut queue: TaskManager<TestResult> = TaskManager::new(&5).unwrap();
117    ///
118    /// let locked_args = AsyncRwLock::new(expected.clone());
119    /// let task_args = SafeTaskArgs::<RUMString>::new(locked_args);
120    /// let processor = rumtk_create_task!(
121    ///     async |args: &SafeTaskArgs<RUMString>| -> TestResult {
122    ///         let owned_args = Arc::clone(args);
123    ///         let locked_args = owned_args.read().await;
124    ///         let mut results = TaskItems::<RUMString>::with_capacity(locked_args.len());
125    ///
126    ///         for arg in locked_args.iter() {
127    ///             results.push(RUMString::new(arg));
128    ///         }
129    ///
130    ///         Ok(results)
131    ///     },
132    ///     task_args
133    /// );
134    ///
135    /// queue.add_task::<_>(processor);
136    /// let results = queue.wait();
137    ///
138    /// let mut result_data = Vec::<RUMString>::with_capacity(5);
139    /// for r in results {
140    ///     for v in r.unwrap().result.clone().unwrap().iter() {
141    ///         for value in v.iter() {
142    ///             result_data.push(value.clone());
143    ///         }
144    ///     }
145    ///  }
146    ///
147    /// assert_eq!(result_data, expected, "Results do not match expected!");
148    ///
149    /// ```
150    ///
151    #[derive(Debug, Clone, Default)]
152    pub struct TaskManager<R> {
153        tasks: SafeSyncTaskTable<R>,
154        workers: usize,
155    }
156
157    impl<R> TaskManager<R>
158    where
159        R: Sync + Send + Clone + 'static,
160    {
161        ///
162        /// This method creates a [`TaskManager`] instance using sensible defaults.
163        ///
164        /// The `threads` field is computed from the number of cores present in system.
165        ///
166        pub fn default() -> RUMResult<TaskManager<R>> {
167            Self::new(&threading::threading_functions::get_default_system_thread_count())
168        }
169
170        ///
171        /// Creates an instance of [`TaskManager<R>`](TaskManager<R>).
172        /// Expects you to provide the count of threads to spawn and the microtask queue size
173        /// allocated by each thread.
174        ///
175        /// This method calls [`TaskTable::with_capacity()`](TaskTable::with_capacity) for the actual object creation.
176        /// The main queue capacity is pre-allocated to [`DEFAULT_TASK_CAPACITY`](DEFAULT_TASK_CAPACITY).
177        ///
178        pub fn new(worker_num: &usize) -> RUMResult<TaskManager<R>> {
179            let tasks = SafeSyncTaskTable::<R>::new(SyncRwLock::new(TaskTable::with_capacity(
180                DEFAULT_TASK_CAPACITY,
181            )));
182            Ok(TaskManager::<R> {
183                tasks,
184                workers: worker_num.to_owned(),
185            })
186        }
187
188        ///
189        /// Add a task to the processing queue. The idea is that you can queue a processor function
190        /// and list of args that will be picked up by one of the threads for processing.
191        ///
192        /// This is the async counterpart
193        ///
194        pub async fn add_task_async<F>(&mut self, task: F) -> TaskID
195        where
196            F: Future<Output = R> + Send + Sync + 'static,
197            F::Output: Send + 'static,
198        {
199            let id = TaskID::new_v4();
200            Self::_add_task_async(id.clone(), self.tasks.clone(), task).await
201        }
202
203        ///
204        /// See [`Self::add_task_async`]
205        ///
206        /// Unlike `add_task`, this method does not block which is key to avoiding panicking
207        /// the tokio runtim if trying to add task to queue from a normal function called from an
208        /// async environment.
209        ///
210        /// ## Example
211        ///
212        /// ```
213        /// use rumtk_core::threading::threading_manager::{TaskManager};
214        /// use rumtk_core::{rumtk_init_threads, strings::rumtk_format};
215        /// use std::sync::{Arc};
216        /// use tokio::sync::Mutex;
217        ///
218        /// type JobManager = Arc<Mutex<TaskManager<usize>>>;
219        ///
220        /// async fn called_fn() -> usize {
221        ///     5
222        /// }
223        ///
224        /// fn push_job(manager: &mut TaskManager<usize>) -> usize {
225        ///     manager.spawn_task(called_fn());
226        ///     1
227        /// }
228        ///
229        /// async fn call_sync_fn(mut manager: JobManager) -> usize {
230        ///     let mut owned = manager.lock().await;
231        ///     push_job(&mut owned)
232        /// }
233        ///
234        /// let workers = 5;
235        /// let mut manager = Arc::new(Mutex::new(TaskManager::new(&workers).unwrap()));
236        ///
237        /// manager.blocking_lock().spawn_task(call_sync_fn(manager.clone()));
238        ///
239        /// let result_raw = manager.blocking_lock().wait();
240        ///
241        /// ```
242        ///
243        pub fn spawn_task<F>(&mut self, task: F) -> RUMResult<TaskID>
244        where
245            F: Future<Output = R> + Send + Sync + 'static,
246            F::Output: Send + Sized + 'static,
247        {
248            let id = TaskID::new_v4();
249            let rt = rumtk_init_threads!(self.workers);
250            rumtk_spawn_task!(
251                rt,
252                Self::_add_task_async(id.clone(), self.tasks.clone(), task)
253            );
254            Ok(id)
255        }
256
257        ///
258        /// See [add_task_async](Self::add_task_async)
259        ///
260        pub fn add_task<F>(&mut self, task: F) -> RUMResult<TaskID>
261        where
262            F: Future<Output = R> + Send + Sync + 'static,
263            F::Output: Send + Sized + 'static,
264        {
265            let id = TaskID::new_v4();
266            let tasks = self.tasks.clone();
267            Ok(rumtk_resolve_task!(Self::_add_task_async(id.clone(), tasks, task)))
268        }
269
270        async fn _add_task_async<F>(id: TaskID, tasks: SafeSyncTaskTable<R>, task: F) -> TaskID
271        where
272            F: Future<Output = R> + Send + Sync + 'static,
273            F::Output: Send + Sized + 'static,
274        {
275            let mut safe_task = Arc::new(SyncRwLock::new(Task::<R> {
276                id: id.clone(),
277                finished: false,
278                result: None,
279            }));
280            tasks.write().unwrap().insert(id.clone(), safe_task.clone());
281
282            let task_wrapper = async move || {
283                // Run the task
284                let result = task.await;
285
286                // Cleanup task
287                let mut lock = safe_task.write().unwrap();
288                lock.result = Some(result);
289                lock.finished = true;
290            };
291
292            tokio::spawn(task_wrapper());
293
294            id
295        }
296
297        ///
298        /// See [wait_async](Self::wait_async)
299        ///
300        /// Duplicated here because we can't request the tokio runtime to do a quick exec for us if
301        /// this function happens to be called from the async context.
302        ///
303        pub fn wait(&mut self) -> TaskResults<R> {
304            let task_batch = self
305                .tasks
306                .read()
307                .unwrap()
308                .keys()
309                .cloned()
310                .collect::<Vec<_>>();
311            self.wait_on_batch(&task_batch)
312        }
313
314        ///
315        /// See [wait_on_batch_async](Self::wait_on_batch_async)
316        ///
317        /// Duplicated here because we can't request the tokio runtime to do a quick exec for us if
318        /// this function happens to be called from the async context.
319        ///
320        pub fn wait_on_batch(&mut self, tasks: &TaskBatch) -> TaskResults<R> {
321            let mut results = TaskResults::<R>::default();
322            for task in tasks {
323                results.push(self.wait_on(task));
324            }
325            results
326        }
327
328        ///
329        /// See [wait_on_async](Self::wait_on_async)
330        ///
331        /// Duplicated here because we can't request the tokio runtime to do a quick exec for us if
332        /// this function happens to be called from the async context.
333        ///
334        pub fn wait_on(&mut self, task_id: &TaskID) -> TaskResult<R> {
335            let task = match self.tasks.write().unwrap().remove(task_id) {
336                Some(task) => task.clone(),
337                None => return Err(rumtk_format!("No task with id {}", task_id)),
338            };
339
340            while !task.read().unwrap().finished {
341                sleep(DEFAULT_SLEEP_DURATION);
342            }
343
344            let x = Ok(Arc::new(task.write().unwrap().clone()));
345            x
346        }
347
348        ///
349        /// This method waits until a queued task with [TaskID](TaskID) has been processed from the main queue.
350        ///
351        /// We poll the status of the task every [DEFAULT_SLEEP_DURATION](DEFAULT_SLEEP_DURATION) ms.
352        ///
353        /// Upon completion,
354        ///
355        /// 2. Return the result ([TaskResults<R>](TaskResults)).
356        ///
357        /// This operation consumes the task.
358        ///
359        /// ### Note:
360        /// ```text
361        ///     Results returned here are not guaranteed to be in the same order as the order in which
362        ///     the tasks were queued for work. You will need to pass a type as T that automatically
363        ///     tracks its own id or has a way for you to resort results.
364        /// ```
365        pub async fn wait_on_async(&mut self, task_id: &TaskID) -> TaskResult<R> {
366            let task = match self.tasks.write().unwrap().remove(task_id) {
367                Some(task) => task.clone(),
368                None => return Err(rumtk_format!("No task with id {}", task_id)),
369            };
370
371            while !task.read().unwrap().finished {
372                async_sleep(DEFAULT_SLEEP_DURATION).await;
373            }
374
375            let x = Ok(Arc::new(task.write().unwrap().clone()));
376            x
377        }
378
379        ///
380        /// This method waits until a set of queued tasks with [TaskID](TaskID) has been processed from the main queue.
381        ///
382        /// We poll the status of the task every [DEFAULT_SLEEP_DURATION](DEFAULT_SLEEP_DURATION) ms.
383        ///
384        /// Upon completion,
385        ///
386        /// 1. We collect the results generated (if any).
387        /// 2. Return the list of results ([TaskResults<R>](TaskResults)).
388        ///
389        /// ### Note:
390        /// ```text
391        ///     Results returned here are not guaranteed to be in the same order as the order in which
392        ///     the tasks were queued for work. You will need to pass a type as T that automatically
393        ///     tracks its own id or has a way for you to resort results.
394        /// ```
395        pub async fn wait_on_batch_async(&mut self, tasks: &TaskBatch) -> TaskResults<R> {
396            let mut results = TaskResults::<R>::default();
397            for task in tasks {
398                results.push(self.wait_on_async(task).await);
399            }
400            results
401        }
402
403        ///
404        /// This method waits until all queued tasks have been processed from the main queue.
405        ///
406        /// We poll the status of the main queue every [DEFAULT_SLEEP_DURATION](DEFAULT_SLEEP_DURATION) ms.
407        ///
408        /// Upon completion,
409        ///
410        /// 1. We collect the results generated (if any).
411        /// 2. We reset the main task and result internal queue states.
412        /// 3. Return the list of results ([TaskResults<R>](TaskResults)).
413        ///
414        /// This operation consumes all the tasks.
415        ///
416        /// ### Note:
417        /// ```text
418        ///     Results returned here are not guaranteed to be in the same order as the order in which
419        ///     the tasks were queued for work. You will need to pass a type as T that automatically
420        ///     tracks its own id or has a way for you to resort results.
421        /// ```
422        pub async fn wait_async(&mut self) -> TaskResults<R> {
423            let task_batch = self
424                .tasks
425                .read()
426                .unwrap()
427                .keys()
428                .cloned()
429                .collect::<Vec<_>>();
430            self.wait_on_batch_async(&task_batch).await
431        }
432
433        ///
434        /// Check if all work has been completed from the task queue.
435        ///
436        /// ## Examples
437        ///
438        /// ### Sync Usage
439        ///
440        ///```
441        /// use rumtk_core::threading::threading_manager::TaskManager;
442        ///
443        /// let manager = TaskManager::<usize>::new(&4).unwrap();
444        ///
445        /// let all_done = manager.is_all_completed();
446        ///
447        /// assert_eq!(all_done, true, "Empty TaskManager reports tasks are not completed!");
448        ///
449        /// ```
450        ///
451        pub fn is_all_completed(&self) -> bool {
452            self._is_all_completed_async()
453        }
454
455        pub async fn is_all_completed_async(&self) -> bool {
456            self._is_all_completed_async()
457        }
458
459        fn _is_all_completed_async(&self) -> bool {
460            for (_, task) in self.tasks.read().unwrap().iter() {
461                if !task.read().unwrap().finished {
462                    return false;
463                }
464            }
465
466            true
467        }
468
469        ///
470        /// Check if a task completed
471        ///
472        pub fn is_finished(&self, id: &TaskID) -> bool {
473            match self.tasks.read().unwrap().get(id) {
474                Some(t) => t.read().unwrap().finished,
475                None => false,
476            }
477        }
478
479        pub async fn is_finished_async(&self, id: &TaskID) -> bool {
480            match self.tasks.read().unwrap().get(id) {
481                Some(task) => task.read().unwrap().finished,
482                None => true,
483            }
484        }
485
486        ///
487        /// Alias for [wait](TaskManager::wait).
488        ///
489        fn gather(&mut self) -> TaskResults<R> {
490            self.wait()
491        }
492
493        pub fn has_job(&self, id: &TaskID) -> bool {
494            match self.tasks.read().unwrap().get(id) {
495                Some(_) => true,
496                None => false,
497            }
498        }
499    }
500}
501
502///
503/// This module contains a few helpers.
504///
505/// For example, you can find a function for determining number of threads available in system.
506/// The sleep family of functions are also here.
507///
508pub mod threading_functions {
509    use crate::core::RUMResult;
510    use crate::net::tcp::{AsyncOwnedRwLockReadGuard, AsyncOwnedRwLockWriteGuard, SafeLockReadGuard, SafeLockWriteGuard, SafeTokioRuntime};
511    use crate::threading::thread_primitives::{AsyncRwLock, SafeLock};
512    use num_cpus;
513    use std::future::Future;
514    use std::sync::Arc;
515    use std::thread::{available_parallelism, sleep as std_sleep};
516    use std::time::Duration;
517    use tokio::runtime::Runtime;
518    use tokio::task::JoinHandle;
519    use tokio::time::sleep as tokio_sleep;
520    /**************************** Globals **************************************/
521    static mut DEFAULT_RUNTIME: SafeTokioRuntime = SafeTokioRuntime::new();
522
523    pub const NANOS_PER_SEC: u64 = 1000000000;
524    pub const MILLIS_PER_SEC: u64 = 1000;
525    pub const MICROS_PER_SEC: u64 = 1000000;
526    const DEFAULT_SLEEP_DURATION: f32 = 0.001;
527    /**************************** Helpers **************************************/
528    pub fn init_runtime<'a>(workers: usize) -> &'a Runtime {
529        unsafe {
530            let runtime = DEFAULT_RUNTIME.get_or_init(|| {
531                let mut builder = tokio::runtime::Builder::new_multi_thread();
532                builder.worker_threads(workers);
533                builder.enable_all();
534                match builder.build() {
535                    Ok(handle) => handle,
536                    Err(e) => panic!(
537                        "Unable to initialize threading tokio runtime because {}!",
538                        &e
539                    ),
540                }
541            });
542            runtime
543        }
544    }
545
546    pub fn get_default_system_thread_count() -> usize {
547        let cpus: usize = num_cpus::get();
548        let parallelism = match available_parallelism() {
549            Ok(n) => n.get(),
550            Err(_) => 0,
551        };
552
553        if parallelism >= cpus {
554            parallelism
555        } else {
556            cpus
557        }
558    }
559
560    pub fn sleep(s: f32) {
561        let ns = s * NANOS_PER_SEC as f32;
562        let rounded_ns = ns.round() as u64;
563        let duration = Duration::from_nanos(rounded_ns);
564        std_sleep(duration);
565    }
566
567    pub async fn async_sleep(s: f32) {
568        let ns = s * NANOS_PER_SEC as f32;
569        let rounded_ns = ns.round() as u64;
570        let duration = Duration::from_nanos(rounded_ns);
571        tokio_sleep(duration).await;
572    }
573
574    ///
575    /// Given a closure task, push it onto the current `tokio` runtime for execution.
576    /// Every [DEFAULT_SLEEP_DURATION] seconds, we check if the task has concluded.
577    /// Once the task has concluded, we call [tokio::block_on](tokio::task::block_in_place) to resolve and extract the task
578    /// result.
579    ///
580    /// Because this helper function can fail, the return value is wrapped inside a [RUMResult].
581    ///
582    /// ## Example
583    ///
584    /// ```
585    /// use rumtk_core::threading::threading_functions::{init_runtime, block_on_task};
586    ///
587    /// const Hello: &str = "World!";
588    ///
589    /// init_runtime(5);
590    ///
591    /// let result = block_on_task(async {
592    ///     Hello
593    /// });
594    ///
595    /// assert_eq!(Hello, result, "Result mismatches expected! {} vs. {}", Hello, result);
596    /// ```
597    ///
598    /// ## Notes
599    /// ```text
600    ///     You need to wrap our call to block_on with a call to tokio::task::block_in_place to force
601    ///     cleanup of async executor and therefore avoid panics from the tokio runtime!
602    ///     Per Tokio's documentation, spawn_blocking would be better since it moves the task to an
603    ///     executor meant for blocking tasks instead of moving tasks out of the current thread and
604    ///     converting the thread into a clocking executor. The reason we don't do that is because
605    ///     the call to this function expects to block the current thread until completion and then
606    ///     return the result. If there's an issue with IO, revisit this function.
607    ///
608    ///     https://docs.rs/tokio/latest/tokio/task/fn.block_in_place.html
609    ///     https://docs.rs/tokio/latest/tokio/runtime/struct.Runtime.html#method.block_on
610    ///     https://docs.rs/tokio/latest/tokio/runtime/struct.Handle.html#method.spawn_blocking
611    ///     https://docs.rs/tokio/latest/tokio/runtime/struct.Handle.html#method.spawn_blocking
612    /// ```
613    ///
614    pub fn block_on_task<R, F>(task: F) -> R
615    where
616        F: Future<Output = R> + Send + 'static,
617        F::Output: Send + 'static,
618    {
619        let rt = init_runtime(get_default_system_thread_count());
620        // You need to wrap our call to block_on with a call to tokio::task::block_in_place to force 
621        // cleanup of async executor and therefore avoid panics from the tokio runtime!
622        // Per Tokio's documentation, spawn_blocking would be better since it moves the task to an 
623        // executor meant for blocking tasks instead of moving tasks out of the current thread and 
624        // converting the thread into a clocking executor. The reason we don't do that is because 
625        // the call to this function expects to block the current thread until completion and then 
626        // return the result. If there's an issue with IO, revisit this function.
627        //
628        // https://docs.rs/tokio/latest/tokio/task/fn.block_in_place.html
629        // https://docs.rs/tokio/latest/tokio/runtime/struct.Runtime.html#method.block_on
630        // https://docs.rs/tokio/latest/tokio/runtime/struct.Handle.html#method.spawn_blocking
631        // https://docs.rs/tokio/latest/tokio/runtime/struct.Handle.html#method.spawn_blocking
632        tokio::task::block_in_place(move || {
633            rt.block_on(task)
634        })
635    }
636
637    ///
638    /// This helper should be used for spawning tasks that would normally block the async runtime.
639    /// However, here we use the appropriate `tokio` facilities to signal the runtime on how
640    /// to handle this, potentially blocking, task. For waiting on potentially blocking futures, use
641    /// [block_on_task] instead!
642    ///
643    ///
644    ///
645    /// ## Notes
646    /// ```text
647    ///     You need to wrap our call to block_on with a call to tokio::task::block_in_place to force
648    ///     cleanup of async executor and therefore avoid panics from the tokio runtime!
649    ///     Per Tokio's documentation, spawn_blocking would be better since it moves the task to an
650    ///     executor meant for blocking tasks instead of moving tasks out of the current thread and
651    ///     converting the thread into a clocking executor. The reason we don't do that is because
652    ///     the call to this function expects to block the current thread until completion and then
653    ///     return the result. If there's an issue with IO, revisit this function.
654    ///
655    ///     https://docs.rs/tokio/latest/tokio/task/fn.block_in_place.html
656    ///     https://docs.rs/tokio/latest/tokio/runtime/struct.Runtime.html#method.block_on
657    ///     https://docs.rs/tokio/latest/tokio/runtime/struct.Handle.html#method.spawn_blocking
658    ///     https://docs.rs/tokio/latest/tokio/runtime/struct.Handle.html#method.spawn_blocking
659    /// ```
660    ///
661    pub fn spawn_blocking_sync_task<R, F>(task: F) -> JoinHandle<R>
662    where
663        F: FnOnce() -> R + Send + 'static,
664        R: Send + 'static,
665    {
666        let rt = init_runtime(get_default_system_thread_count());
667        rt.spawn_blocking(task)
668    }
669
670    pub fn new_lock<T>(data: T) -> SafeLock<T> {
671        Arc::new(AsyncRwLock::new(data))
672    }
673
674    ///
675    /// This function gives you read access to underlying structure.
676    ///
677    /// Helper function for executing microtask immediately after locking the spin lock. This function
678    /// should be used in situations in which you want to minimize the risk of Time of Check Time of
679    /// Use security bugs.
680    ///
681    /// ## Example
682    /// ```
683    /// use rumtk_core::core::RUMResult;
684    /// use rumtk_core::threading::thread_primitives::SafeLock;
685    /// use rumtk_core::threading::threading_functions::{new_lock, process_read_critical_section};
686    ///
687    /// let data = 5;
688    /// let lock = new_lock(data.clone());
689    /// let result = process_read_critical_section(lock, |guard| -> RUMResult<i32> {
690    ///     Ok(*guard)
691    /// }).unwrap();
692    ///
693    /// assert_eq!(result, data, "Failed to execute critical section through which we retrieve the locked data!");
694    /// ```
695    ///
696    pub fn process_read_critical_section<T, R, F>(
697        lock: SafeLock<T>,
698        critical_section: F,
699    ) -> R
700    where 
701        F: Fn(SafeLockReadGuard<T>) -> R, T: Send + Sync + 'static,
702    {
703        tokio::task::block_in_place(move || {
704            let read_guard = lock_read(lock);
705            critical_section(read_guard)
706        })
707    }
708
709    ///
710    /// This function gives you write access to underlying structure.
711    ///
712    /// Helper function for executing microtask immediately after locking the spin lock. This function
713    /// should be used in situations in which you want to minimize the risk of Time of Check Time of
714    /// Use security bugs.
715    ///
716    /// ## Example
717    /// ```
718    /// use rumtk_core::core::RUMResult;
719    /// use rumtk_core::threading::thread_primitives::SafeLock;
720    /// use rumtk_core::threading::threading_functions::{new_lock, process_write_critical_section};
721    ///
722    /// let data = 5;
723    /// let lock = new_lock(data.clone());
724    /// let new_data = 10;
725    /// let result = process_write_critical_section(lock, |mut guard| -> RUMResult<i32> {
726    ///     *guard = new_data;
727    ///     Ok(*guard)
728    /// }).unwrap();
729    ///
730    /// assert_eq!(result, new_data, "Failed to execute critical section through which we modify the locked data!");
731    /// ```
732    ///
733    pub fn process_write_critical_section<T, R, F>(
734        lock: SafeLock<T>,
735        critical_section: F,
736    ) -> R
737    where
738        F: Fn(SafeLockWriteGuard<T>) -> R, T: Send + Sync + 'static,
739    {
740        tokio::task::block_in_place(move || {
741            let write_guard = lock_write(lock);
742            critical_section(write_guard)
743        })
744    }
745
746    ///
747    /// Obtain read guard to standard spin lock such that you have a more ergonomic interface to
748    /// locked data.
749    ///
750    /// It is preferable to use [process_read_critical_section] when you must process
751    /// critical logic that is sensitive to time of check time of use security bugs!
752    ///
753    /// ## Example
754    /// ```
755    /// use rumtk_core::threading::thread_primitives::SafeLock;
756    /// use rumtk_core::threading::threading_functions::{new_lock, lock_read};
757    ///
758    /// let data = 5;
759    /// let lock = new_lock(data.clone());
760    /// let result = *lock_read(lock);
761    ///
762    /// assert_eq!(result, data, "Failed to access the locked data!");
763    /// ```
764    ///
765    pub fn lock_read<T: Send + Sync + 'static>(lock: SafeLock<T>) -> AsyncOwnedRwLockReadGuard<T> {
766        block_on_task(async move {
767            lock.read_owned().await
768        })
769    }
770
771    ///
772    /// Obtain write guard to standard spin lock such that you have a more ergonomic interface to
773    /// locked data.
774    ///
775    /// It is preferable to use [process_write_critical_section] when you must process
776    /// critical logic that is sensitive to time of check time of use security bugs!
777    ///
778    /// ## Example
779    /// ```
780    /// use rumtk_core::threading::thread_primitives::SafeLock;
781    /// use rumtk_core::threading::threading_functions::{new_lock, lock_read, lock_write};
782    ///
783    /// let data = 5;
784    /// let lock = new_lock(data.clone());
785    /// let new_data = 10;
786    ///
787    /// *lock_write(lock.clone()) = new_data;
788    ///
789    /// let result = *lock_read(lock);
790    ///
791    /// assert_eq!(result, new_data, "Failed to modify the locked data!");
792    /// ```
793    ///
794    pub fn lock_write<T: Send + Sync + 'static>(lock: SafeLock<T>) -> AsyncOwnedRwLockWriteGuard<T> {
795        block_on_task(async move {
796            lock.write_owned().await
797        })
798    }
799}
800
801///
802/// Main API for interacting with the threading back end. Remember, we use tokio as our executor.
803/// This means that by default, all jobs sent to the thread pool have to be async in nature.
804/// These macros make handling of these jobs at the sync/async boundary more convenient.
805///
806pub mod threading_macros {
807    use crate::threading::thread_primitives;
808    use crate::threading::threading_manager::SafeTaskArgs;
809
810    ///
811    /// First, let's make sure we have *tokio* initialized at least once. The runtime created here
812    /// will be saved to the global context so the next call to this macro will simply grab a
813    /// reference to the previously initialized runtime.
814    ///
815    /// Passing nothing will default to initializing a runtime using the default number of threads
816    /// for this system. This is typically equivalent to number of cores/threads for your CPU.
817    ///
818    /// Passing `threads` number will yield a runtime that allocates that many threads.
819    ///
820    ///
821    /// ## Examples
822    ///
823    /// ```
824    ///     use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_create_task_args, rumtk_create_task, rumtk_spawn_task};
825    ///     use rumtk_core::core::RUMResult;
826    ///     use rumtk_core::threading::threading_manager::SafeTaskArgs;
827    ///
828    ///     async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
829    ///         let mut result = Vec::<i32>::new();
830    ///         for arg in args.read().await.iter() {
831    ///             result.push(*arg);
832    ///         }
833    ///         Ok(result)
834    ///     }
835    ///
836    ///     let args = rumtk_create_task_args!(1);                               // Creates a vector of i32s
837    ///     let task = rumtk_create_task!(test, args);                           // Creates a standard task which consists of a function or closure accepting a Vec<T>
838    ///     let result = rumtk_resolve_task!(task); // Spawn's task and waits for it to conclude.
839    /// ```
840    ///
841    /// ```
842    ///     use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_create_task_args, rumtk_create_task, rumtk_spawn_task};
843    ///     use rumtk_core::core::RUMResult;
844    ///     use rumtk_core::threading::threading_manager::SafeTaskArgs;
845    ///
846    ///     async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
847    ///         let mut result = Vec::<i32>::new();
848    ///         for arg in args.read().await.iter() {
849    ///             result.push(*arg);
850    ///         }
851    ///         Ok(result)
852    ///     }
853    ///
854    ///     let thread_count: usize = 10;
855    ///     let args = rumtk_create_task_args!(1);
856    ///     let task = rumtk_create_task!(test, args);
857    ///     let result = rumtk_resolve_task!(task);
858    /// ```
859    #[macro_export]
860    macro_rules! rumtk_init_threads {
861        ( ) => {{
862            use $crate::threading::threading_functions::{
863                get_default_system_thread_count, init_runtime,
864            };
865            init_runtime(get_default_system_thread_count())
866        }};
867        ( $threads:expr ) => {{
868            use $crate::rumtk_cache_fetch;
869            use $crate::threading::threading_functions::init_runtime;
870            init_runtime($threads)
871        }};
872    }
873
874    ///
875    /// Puts task onto the runtime queue.
876    ///
877    /// The parameters to this macro are a reference to the runtime (`rt`) and a future (`func`).
878    ///
879    /// The return is a [thread_primitives::JoinHandle<T>] instance. If the task was a standard
880    /// framework task, you will get [thread_primitives::AsyncTaskHandle] instead.
881    ///
882    #[macro_export]
883    macro_rules! rumtk_spawn_task {
884        ( $func:expr ) => {{
885            use $crate::rumtk_init_threads;
886            let rt = rumtk_init_threads!();
887            rt.spawn($func)
888        }};
889        ( $rt:expr, $func:expr ) => {{
890            $rt.spawn($func)
891        }};
892    }
893
894    #[macro_export]
895    macro_rules! rumtk_spawn_blocking_task {
896        ( $func:expr ) => {{
897            use $crate::threading::threading_functions::spawn_blocking_sync_task;
898            spawn_blocking_sync_task($func)
899        }}
900    }
901
902    ///
903    /// Using the initialized runtime, wait for the future to resolve in a thread blocking manner!
904    ///
905    /// If you pass a reference to the runtime (`rt`) and an async closure (`func`), we await the
906    /// async closure without passing any arguments.
907    ///
908    /// You can pass a third argument to this macro in the form of any number of arguments (`arg_item`).
909    /// In such a case, we pass those arguments to the call on the async closure and await on results.
910    ///
911    #[macro_export]
912    macro_rules! rumtk_wait_on_task {
913        ( $func:expr ) => {{
914            use $crate::threading::threading_functions::block_on_task;
915            block_on_task(async move {
916                $func().await
917            })
918        }};
919        ( $func:expr, $($arg_items:expr),+ ) => {{
920            use $crate::threading::threading_functions::block_on_task;
921            block_on_task(async move {
922                $func($($arg_items),+).await
923            })
924        }};
925    }
926
927    ///
928    /// This macro awaits a future.
929    ///
    /// The only argument is the future to await.
931    ///
932    /// If there is a result, you will get the result of the future.
933    ///
934    /// ## Examples
935    ///
936    /// ```
937    ///     use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_create_task_args, rumtk_create_task, rumtk_spawn_task};
938    ///     use rumtk_core::core::RUMResult;
939    ///     use rumtk_core::threading::threading_manager::SafeTaskArgs;
940    ///
941    ///     async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
942    ///         let mut result = Vec::<i32>::new();
943    ///         for arg in args.read().await.iter() {
944    ///             result.push(*arg);
945    ///         }
946    ///         Ok(result)
947    ///     }
948    ///
949    ///     let args = rumtk_create_task_args!(1);
950    ///     let task = rumtk_create_task!(test, args);
951    ///     let result = rumtk_resolve_task!(task);
952    /// ```
953    ///
954    #[macro_export]
955    macro_rules! rumtk_resolve_task {
956        ( $future:expr ) => {{
957            use $crate::threading::threading_functions::block_on_task;
958            // Fun tidbit, the expression rumtk_resolve_task!(&rt, rumtk_spawn_task!(&rt, task)), where
959            // rt is the tokio runtime yields async move { { &rt.spawn(task) } }. However, the whole thing
960            // is technically moved into the async closure and captured so things like mutex guards
961            // technically go out of the outer scope. As a result that expression fails to compile even
962            // though the intent is for rumtk_spawn_task to resolve first and its result get moved
963            // into the async closure. To ensure that happens regardless of given expression, we do
964            // a variable assignment below to force the "future" macro expressions to resolve before
965            // moving into the closure. DO NOT REMOVE OR "SIMPLIFY" THE let future = $future LINE!!!
966            //let future = $future;
967            block_on_task(async move { $future.await })
968        }};
969    }
970
971    ///
972    /// This macro allows to resolve a `sync` closure that was executed in a safe thread.
973    /// You cannot run this macro outside the `async` context.
974    ///
975    #[macro_export]
976    macro_rules! rumtk_resolve_sync_task {
977        ( $closure:expr ) => {{
978            use $crate::threading::threading_functions::spawn_blocking_sync_task;
979            use $crate::strings::rumtk_format;
980            match spawn_blocking_sync_task($closure).await {
981                Ok(result) => result,
982                Err(e) => Err(rumtk_format!("Issue with blocking task => {}", e))
983            }
984        }};
985    }
986
987    ///
988    /// This macro creates an async body that calls the async closure and awaits it.
989    ///
990    /// ## Example
991    ///
992    /// ```
993    /// use std::sync::{Arc, RwLock};
994    /// use tokio::sync::RwLock as AsyncRwLock;
995    /// use rumtk_core::strings::RUMString;
996    /// use rumtk_core::threading::threading_manager::{SafeTaskArgs, TaskItems};
997    ///
998    /// pub type SafeTaskArgs2<T> = Arc<RwLock<TaskItems<T>>>;
999    /// let expected = vec![
1000    ///     RUMString::from("Hello"),
1001    ///     RUMString::from("World!"),
1002    ///     RUMString::from("Overcast"),
1003    ///     RUMString::from("and"),
1004    ///     RUMString::from("Sad"),
1005    ///  ];
1006    /// let locked_args = AsyncRwLock::new(expected.clone());
1007    /// let task_args = SafeTaskArgs::<RUMString>::new(locked_args);
1008    ///
1009    ///
1010    /// ```
1011    ///
1012    #[macro_export]
1013    macro_rules! rumtk_create_task {
1014        ( $func:expr ) => {{
1015            async move {
1016                let f = $func;
1017                f().await
1018            }
1019        }};
1020        ( $func:expr, $args:expr ) => {{
1021            let f = $func;
1022            async move { f(&$args).await }
1023        }};
1024    }
1025
1026    ///
1027    /// Creates an instance of [SafeTaskArgs](SafeTaskArgs) with the arguments passed.
1028    ///
1029    /// ## Note
1030    ///
1031    /// All arguments must be of the same type
1032    ///
1033    #[macro_export]
1034    macro_rules! rumtk_create_task_args {
1035        ( ) => {{
1036            use $crate::threading::threading_manager::{TaskArgs, SafeTaskArgs, TaskItems};
1037            use $crate::threading::thread_primitives::AsyncRwLock;
1038            SafeTaskArgs::new(AsyncRwLock::new(vec![]))
1039        }};
1040        ( $($args:expr),+ ) => {{
1041            use $crate::threading::threading_manager::{SafeTaskArgs};
1042            use $crate::threading::thread_primitives::AsyncRwLock;
1043            SafeTaskArgs::new(AsyncRwLock::new(vec![$($args),+]))
1044        }};
1045    }
1046
1047    ///
1048    /// Convenience macro for packaging the task components and launching the task in one line.
1049    ///
1050    /// One of the advantages is that you can generate a new `tokio` runtime by specifying the
1051    /// number of threads at the end. This is optional. Meaning, we will default to the system's
1052    /// number of threads if that value is not specified.
1053    ///
1054    /// Between the `func` parameter and the optional `threads` parameter, you can specify a
1055    /// variable number of arguments to pass to the task. each argument must be of the same type.
1056    /// If you wish to pass different arguments with different types, please define an abstract type
1057    /// whose underlying structure is a tuple of items and pass that instead.
1058    ///
1059    /// ## Examples
1060    ///
1061    /// ### With Default Thread Count
1062    /// ```
1063    ///     use rumtk_core::{rumtk_exec_task};
1064    ///     use rumtk_core::core::RUMResult;
1065    ///     use rumtk_core::threading::threading_manager::SafeTaskArgs;
1066    ///
1067    ///     async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
1068    ///         let mut result = Vec::<i32>::new();
1069    ///         for arg in args.read().await.iter() {
1070    ///             result.push(*arg);
1071    ///         }
1072    ///         Ok(result)
1073    ///     }
1074    ///
1075    ///     let result = rumtk_exec_task!(test, vec![5]).unwrap();
1076    ///     assert_eq!(&result.clone(), &vec![5], "Results mismatch");
1077    ///     assert_ne!(&result.clone(), &vec![5, 10], "Results do not mismatch as expected!");
1078    /// ```
1079    ///
1080    /// ### With Custom Thread Count
1081    /// ```
1082    ///     use rumtk_core::{rumtk_exec_task};
1083    ///     use rumtk_core::core::RUMResult;
1084    ///     use rumtk_core::threading::threading_manager::SafeTaskArgs;
1085    ///
1086    ///     async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
1087    ///         let mut result = Vec::<i32>::new();
1088    ///         for arg in args.read().await.iter() {
1089    ///             result.push(*arg);
1090    ///         }
1091    ///         Ok(result)
1092    ///     }
1093    ///
1094    ///     let result = rumtk_exec_task!(test, vec![5], 5).unwrap();
1095    ///     assert_eq!(&result.clone(), &vec![5], "Results mismatch");
1096    ///     assert_ne!(&result.clone(), &vec![5, 10], "Results do not mismatch as expected!");
1097    /// ```
1098    ///
1099    /// ### With Async Function Body
1100    /// ```
1101    ///     use rumtk_core::{rumtk_exec_task};
1102    ///     use rumtk_core::core::RUMResult;
1103    ///     use rumtk_core::threading::threading_manager::SafeTaskArgs;
1104    ///
1105    ///     let result = rumtk_exec_task!(
1106    ///     async move |args: &SafeTaskArgs<i32>| -> RUMResult<Vec<i32>> {
1107    ///         let mut result = Vec::<i32>::new();
1108    ///         for arg in args.read().await.iter() {
1109    ///             result.push(*arg);
1110    ///         }
1111    ///         Ok(result)
1112    ///     },
1113    ///     vec![5]).unwrap();
1114    ///     assert_eq!(&result.clone(), &vec![5], "Results mismatch");
1115    ///     assert_ne!(&result.clone(), &vec![5, 10], "Results do not mismatch as expected!");
1116    /// ```
1117    ///
1118    /// ### With Async Function Body and No Args
1119    /// ```
1120    ///     use rumtk_core::{rumtk_exec_task};
1121    ///     use rumtk_core::core::RUMResult;
1122    ///     use rumtk_core::threading::threading_manager::SafeTaskArgs;
1123    ///
1124    ///     let result = rumtk_exec_task!(
1125    ///     async || -> RUMResult<Vec<i32>> {
1126    ///         let mut result = Vec::<i32>::new();
1127    ///         Ok(result)
1128    ///     }).unwrap();
1129    ///     let empty = Vec::<i32>::new();
1130    ///     assert_eq!(&result.clone(), &empty, "Results mismatch");
1131    ///     assert_ne!(&result.clone(), &vec![5, 10], "Results do not mismatch as expected!");
1132    /// ```
1133    ///
1134    /// ## Equivalent To
1135    ///
1136    /// ```no_run
1137    ///     use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_create_task_args, rumtk_create_task, rumtk_spawn_task};
1138    ///     use rumtk_core::core::RUMResult;
1139    ///     use rumtk_core::threading::threading_manager::SafeTaskArgs;
1140    ///
1141    ///     async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
1142    ///         let mut result = Vec::<i32>::new();
1143    ///         for arg in args.read().await.iter() {
1144    ///             result.push(*arg);
1145    ///         }
1146    ///         Ok(result)
1147    ///     }
1148    ///
1149    ///     let args = rumtk_create_task_args!(1);
1150    ///     let task = rumtk_create_task!(test, args);
1151    ///     let result = rumtk_resolve_task!(task);
1152    /// ```
1153    ///
1154    #[macro_export]
1155    macro_rules! rumtk_exec_task {
1156        ($func:expr ) => {{
1157            use $crate::{
1158                rumtk_create_task, rumtk_create_task_args, rumtk_init_threads, rumtk_resolve_task,
1159            };
1160            let task = rumtk_create_task!($func);
1161            rumtk_resolve_task!(task)
1162        }};
1163        ($func:expr, $args:expr ) => {{
1164            use $crate::threading::threading_functions::get_default_system_thread_count;
1165            rumtk_exec_task!($func, $args, get_default_system_thread_count())
1166        }};
1167        ($func:expr, $args:expr , $threads:expr ) => {{
1168            use $crate::threading::thread_primitives::AsyncRwLock;
1169            use $crate::{
1170                rumtk_create_task, rumtk_create_task_args, rumtk_init_threads, rumtk_resolve_task,
1171            };
1172            let args = SafeTaskArgs::new(AsyncRwLock::new($args));
1173            let task = rumtk_create_task!($func, args);
1174            rumtk_resolve_task!(task)
1175        }};
1176    }
1177
1178    ///
    /// Sleep for a duration of time in a sync context, so the result cannot be awaited.
1180    ///
1181    /// You can pass any value that can be cast to f32.
1182    ///
1183    /// The precision is up to nanoseconds and it is depicted by the number of decimal places.
1184    ///
1185    /// ## Examples
1186    ///
1187    /// ```
1188    ///     use rumtk_core::rumtk_sleep;
1189    ///     rumtk_sleep!(1);           // Sleeps for 1 second.
1190    ///     rumtk_sleep!(0.001);       // Sleeps for 1 millisecond
1191    ///     rumtk_sleep!(0.000001);    // Sleeps for 1 microsecond
1192    ///     rumtk_sleep!(0.000000001); // Sleeps for 1 nanosecond
1193    /// ```
1194    ///
1195    #[macro_export]
1196    macro_rules! rumtk_sleep {
1197        ( $dur:expr) => {{
1198            use $crate::threading::threading_functions::sleep;
1199            sleep($dur as f32)
1200        }};
1201    }
1202
1203    ///
1204    /// Sleep for some duration of time in an async context. Meaning, we can be awaited.
1205    ///
1206    /// You can pass any value that can be cast to f32.
1207    ///
1208    /// The precision is up to nanoseconds and it is depicted by the number of decimal places.
1209    ///
1210    /// ## Examples
1211    ///
1212    /// ```
1213    ///     use rumtk_core::{rumtk_async_sleep, rumtk_exec_task};
1214    ///     use rumtk_core::core::RUMResult;
1215    ///     rumtk_exec_task!( async || -> RUMResult<()> {
1216    ///             rumtk_async_sleep!(1).await;           // Sleeps for 1 second.
1217    ///             rumtk_async_sleep!(0.001).await;       // Sleeps for 1 millisecond
1218    ///             rumtk_async_sleep!(0.000001).await;    // Sleeps for 1 microsecond
1219    ///             rumtk_async_sleep!(0.000000001).await; // Sleeps for 1 nanosecond
1220    ///             Ok(())
1221    ///         }
1222    ///     );
1223    /// ```
1224    ///
1225    #[macro_export]
1226    macro_rules! rumtk_async_sleep {
1227        ( $dur:expr) => {{
1228            use $crate::threading::threading_functions::async_sleep;
1229            async_sleep($dur as f32)
1230        }};
1231    }
1232
1233    ///
1234    ///
1235    ///
1236    #[macro_export]
1237    macro_rules! rumtk_new_task_queue {
1238        ( $worker_num:expr ) => {{
1239            use $crate::threading::threading_manager::TaskManager;
1240            TaskManager::new($worker_num);
1241        }};
1242    }
1243
1244    ///
1245    /// Creates a new safe lock to guard the given data. This interface was created to cleanup lock
1246    /// management for consumers of framework!
1247    ///
1248    /// ## Example
1249    /// ```
1250    /// use rumtk_core::{rumtk_new_lock};
1251    ///
1252    /// let data = 5;
1253    /// let lock = rumtk_new_lock!(data);
1254    /// ```
1255    ///
1256    #[macro_export]
1257    macro_rules! rumtk_new_lock {
1258        ( $data:expr ) => {{
1259            use $crate::threading::threading_functions::new_lock;
1260            new_lock($data)
1261        }};
1262    }
1263
1264    ///
1265    /// Using a standard spin lock [SafeLock](thread_primitives::SafeLock), lock it and execute the
1266    /// critical section. The critical section itself is a synchronous function or closure. In this case,
1267    /// the critical section simply retrieves a value from a guarded dataset.
1268    ///
1269    /// ## Example
1270    /// ```
1271    /// use rumtk_core::core::RUMResult;
1272    /// use rumtk_core::{rumtk_new_lock, rumtk_critical_section_read};
1273    ///
1274    /// let data = 5;
1275    /// let lock = rumtk_new_lock!(data);
1276    /// let result = rumtk_critical_section_read!(
1277    ///     lock,
1278    ///     |guard| -> RUMResult<i32> {
1279    ///         let result: i32 = *guard;
1280    ///         Ok(result)
1281    ///     }
1282    /// ).expect("No errors locking!");
1283    ///
1284    /// assert_eq!(result, data, "Critical section yielded invalid result!");
1285    /// ```
1286    ///
1287    #[macro_export]
1288    macro_rules! rumtk_critical_section_read {
1289        ( $lock:expr, $function:expr ) => {{
1290            use $crate::threading::threading_functions::process_read_critical_section;
1291            process_read_critical_section($lock, $function)
1292        }};
1293    }
1294
1295    ///
1296    /// Using a standard spin lock [SafeLock](thread_primitives::SafeLock), lock it and execute the
1297    /// critical section. The critical section itself is a synchronous function or closure. In this case,
1298    /// the critical section attempts to modify the internal state of a guarded dataset.
1299    ///
1300    /// ## Example
1301    /// ```
1302    /// use rumtk_core::{rumtk_new_lock, rumtk_critical_section_write};
1303    ///
1304    /// let data = 5;
1305    /// let new_data = 10;
1306    /// let lock = rumtk_new_lock!(data);
1307    /// let result = rumtk_critical_section_write!(
1308    ///     lock,
1309    ///     |mut guard| {
1310    ///         *guard = new_data;
1311    ///     }
1312    /// );
1313    ///
1314    /// assert_eq!(result, (), "Critical section yielded invalid result!");
1315    /// ```
1316    ///
1317    #[macro_export]
1318    macro_rules! rumtk_critical_section_write {
1319        ( $lock:expr, $function:expr ) => {{
1320            use $crate::threading::threading_functions::process_write_critical_section;
1321            process_write_critical_section($lock, $function)
1322        }};
1323    }
1324
1325    ///
1326    /// Framework interface to obtain a `read` guard to the locked data.
1327    /// To access the internal data, you will need to dereference the guard (`*guard`).
1328    ///
1329    /// It is preferred to use [rumtk_critical_section_read] if you need to avoid `time of check time
1330    /// of use` security bugs.
1331    ///
1332    /// ## Example
1333    /// ```
1334    /// use rumtk_core::{rumtk_new_lock, rumtk_lock_read};
1335    ///
1336    /// let data = 5;
1337    /// let lock = rumtk_new_lock!(data.clone());
1338    /// let result = *rumtk_lock_read!(lock);
1339    ///
1340    /// assert_eq!(result, data, "Failed to access locked data.");
1341    /// ```
1342    ///
1343    #[macro_export]
1344    macro_rules! rumtk_lock_read {
1345        ( $lock:expr ) => {{
1346            use $crate::threading::threading_functions::lock_read;
1347            lock_read($lock.clone())
1348        }};
1349    }
1350
1351    ///
1352    /// Framework interface to obtain a `write` guard to the locked data.
1353    /// To access the internal data, you will need to dereference the guard (`*guard`).
1354    ///
1355    /// It is preferred to use [rumtk_critical_section_write] if you need to avoid `time of check time
1356    /// of use` security bugs.
1357    ///
1358    /// ## Example
1359    /// ```
1360    /// use rumtk_core::{rumtk_new_lock, rumtk_lock_read, rumtk_lock_write};
1361    ///
1362    /// let data = 5;
1363    /// let lock = rumtk_new_lock!(data.clone());
1364    /// let new_data = 10;
1365    ///
1366    /// *rumtk_lock_write!(lock) = new_data;
1367    /// let result = *rumtk_lock_read!(lock);
1368    ///
1369    /// assert_eq!(result, new_data, "Failed to modify locked data.");
1370    /// ```
1371    ///
1372    #[macro_export]
1373    macro_rules! rumtk_lock_write {
1374        ( $lock:expr ) => {{
1375            use $crate::threading::threading_functions::lock_write;
1376            lock_write($lock.clone())
1377        }};
1378    }
1379}