Skip to main content

rumtk_core/
threading.rs

1/*
2 * rumtk attempts to implement HL7 and medical protocols for interoperability in medicine.
3 * This toolkit aims to be reliable, simple, performant, and standards compliant.
4 * Copyright (C) 2025  Luis M. Santos, M.D. <lsantos@medicalmasses.com>
5 * Copyright (C) 2025  MedicalMasses L.L.C. <contact@medicalmasses.com>
6 *
7 * This program is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
19 */
20
21///
22/// This module provides all the primitives needed to build a multithreaded application.
23///
24pub mod thread_primitives {
25    pub use std::sync::Mutex as SyncMutex;
26    pub use std::sync::MutexGuard as SyncMutexGuard;
27    pub use std::sync::RwLock as SyncRwLock;
28    use std::sync::{Arc, OnceLock};
29    pub use tokio::io;
30    pub use tokio::io::{AsyncReadExt, AsyncWriteExt};
31    use tokio::runtime::Runtime as TokioRuntime;
32    pub use tokio::sync::{
33        Mutex as AsyncMutex, MutexGuard as AsyncMutexGuard,
34        OwnedRwLockReadGuard as AsyncOwnedRwLockMappedReadGuard,
35        OwnedRwLockReadGuard as AsyncOwnedRwLockReadGuard,
36        OwnedRwLockWriteGuard as AsyncOwnedRwLockWriteGuard, RwLock as AsyncRwLock,
37        RwLockMappedWriteGuard as AsyncRwLockMappedWriteGuard,
38        RwLockReadGuard as AsyncRwLockReadGuard, RwLockWriteGuard as AsyncRwLockWriteGuard,
39    };
40
41    /**************************** Types ***************************************/
42    pub type SafeLockReadGuard<T> = AsyncOwnedRwLockReadGuard<T>;
43    pub type MappedLockReadGuard<T> = AsyncOwnedRwLockReadGuard<T>;
44    pub type SafeLockWriteGuard<T> = AsyncOwnedRwLockWriteGuard<T>;
45    pub type SafeLock<T> = Arc<AsyncRwLock<T>>;
46    pub type SafeTokioRuntime = OnceLock<TokioRuntime>;
47}
48
49pub mod threading_manager {
50    use crate::core::{RUMResult, RUMVec};
51    use crate::strings::rumtk_format;
52    use crate::threading::thread_primitives::SafeLock;
53    use crate::threading::threading_functions::{async_sleep, sleep};
54    use crate::types::{RUMHashMap, RUMID};
55    use crate::{rumtk_init_threads, rumtk_resolve_task, threading};
56    use std::fmt::Debug;
57    use std::future::Future;
58    use std::sync::Arc;
59    pub use std::sync::RwLock as SyncRwLock;
60    use tokio::io::AsyncReadExt;
61    use tokio::task::JoinHandle;
62
63    const DEFAULT_SLEEP_DURATION: f32 = 0.001f32;
64    const DEFAULT_TASK_CAPACITY: usize = 100;
65
66    pub type AsyncHandle<T> = JoinHandle<T>;
67    pub type TaskItems<T> = RUMVec<T>;
68    /// This type aliases a vector of T elements that will be used for passing arguments to the task processor.
69    pub type TaskArgs<T> = TaskItems<T>;
70    /// Function signature defining the interface of task processing logic.
71    pub type SafeTaskArgs<T> = SafeLock<TaskItems<T>>;
72    pub type AsyncTaskHandle<R> = AsyncHandle<TaskResult<R>>;
73    pub type AsyncTaskHandles<R> = Vec<AsyncTaskHandle<R>>;
74    //pub type TaskProcessor<T, R, Fut: Future<Output = TaskResult<R>>> = impl FnOnce(&SafeTaskArgs<T>) -> Fut;
75    pub type TaskID = RUMID;
76
77    #[derive(Debug, Clone, Default)]
78    pub struct Task<R> {
79        pub id: TaskID,
80        pub finished: bool,
81        pub result: Option<R>,
82    }
83
84    pub type SafeTask<R> = Arc<Task<R>>;
85    type SafeInternalTask<R> = Arc<SyncRwLock<Task<R>>>;
86    pub type TaskTable<R> = RUMHashMap<TaskID, SafeInternalTask<R>>;
87    pub type SafeAsyncTaskTable<R> = SafeLock<TaskTable<R>>;
88    pub type SafeSyncTaskTable<R> = Arc<SyncRwLock<TaskTable<R>>>;
89    pub type TaskBatch = RUMVec<TaskID>;
90    /// Type to use to define how task results are expected to be returned.
91    pub type TaskResult<R> = RUMResult<SafeTask<R>>;
92    pub type TaskResults<R> = TaskItems<TaskResult<R>>;
93
94    ///
95    /// Manages asynchronous tasks submitted as micro jobs from synchronous code. This type essentially
96    /// gives the multithreading, asynchronous superpowers to synchronous logic.
97    ///
98    /// ## Example Usage
99    ///
100    /// ```
101    /// use std::sync::{Arc};
102    /// use tokio::sync::RwLock as AsyncRwLock;
103    /// use rumtk_core::core::RUMResult;
104    /// use rumtk_core::strings::RUMString;
105    /// use rumtk_core::threading::threading_manager::{SafeTaskArgs, TaskItems, TaskManager};
106    /// use rumtk_core::{rumtk_create_task, };
107    ///
108    /// let expected = vec![
109    ///     RUMString::from("Hello"),
110    ///     RUMString::from("World!"),
111    ///     RUMString::from("Overcast"),
112    ///     RUMString::from("and"),
113    ///     RUMString::from("Sad"),
114    ///  ];
115    ///
116    /// type TestResult = RUMResult<Vec<RUMString>>;
117    /// let mut queue: TaskManager<TestResult> = TaskManager::new(&5).unwrap();
118    ///
119    /// let locked_args = AsyncRwLock::new(expected.clone());
120    /// let task_args = SafeTaskArgs::<RUMString>::new(locked_args);
121    /// let processor = rumtk_create_task!(
122    ///     async |args: &SafeTaskArgs<RUMString>| -> TestResult {
123    ///         let owned_args = Arc::clone(args);
124    ///         let locked_args = owned_args.read().await;
125    ///         let mut results = TaskItems::<RUMString>::with_capacity(locked_args.len());
126    ///
127    ///         for arg in locked_args.iter() {
128    ///             results.push(RUMString::from(arg));
129    ///         }
130    ///
131    ///         Ok(results)
132    ///     },
133    ///     task_args
134    /// );
135    ///
136    /// queue.add_task::<_>(processor);
137    /// let results = queue.wait();
138    ///
139    /// let mut result_data = Vec::<RUMString>::with_capacity(5);
140    /// for r in results {
141    ///     for v in r.unwrap().result.clone().unwrap().iter() {
142    ///         for value in v.iter() {
143    ///             result_data.push(value.clone());
144    ///         }
145    ///     }
146    ///  }
147    ///
148    /// assert_eq!(result_data, expected, "Results do not match expected!");
149    ///
150    /// ```
151    ///
152    #[derive(Debug, Clone, Default)]
153    pub struct TaskManager<R> {
154        tasks: SafeSyncTaskTable<R>,
155        workers: usize,
156    }
157
158    impl<R> TaskManager<R>
159    where
160        R: Debug + Sync + Send + Clone + 'static,
161    {
162        ///
163        /// This method creates a [`TaskManager`] instance using sensible defaults.
164        ///
        /// The `workers` field is computed from the number of cores present in the system.
166        ///
167        pub fn default() -> RUMResult<TaskManager<R>> {
168            Self::new(&threading::threading_functions::get_default_system_thread_count())
169        }
170
171        ///
172        /// Creates an instance of [`TaskManager<R>`](TaskManager<R>).
173        /// Expects you to provide the count of threads to spawn and the microtask queue size
174        /// allocated by each thread.
175        ///
176        /// This method calls [`TaskTable::with_capacity()`](TaskTable::with_capacity) for the actual object creation.
177        /// The main queue capacity is pre-allocated to [`DEFAULT_TASK_CAPACITY`](DEFAULT_TASK_CAPACITY).
178        ///
179        pub fn new(worker_num: &usize) -> RUMResult<TaskManager<R>> {
180            let tasks = SafeSyncTaskTable::<R>::new(SyncRwLock::new(TaskTable::with_capacity(
181                DEFAULT_TASK_CAPACITY,
182            )));
183            Ok(TaskManager::<R> {
184                tasks,
185                workers: worker_num.to_owned(),
186            })
187        }
188
189        ///
190        /// Add a task to the processing queue. The idea is that you can queue a processor function
191        /// and list of args that will be picked up by one of the threads for processing.
192        ///
193        /// This is the async counterpart
194        ///
195        pub async fn add_task_async<F>(&mut self, task: F) -> TaskID
196        where
197            F: Future<Output = R> + Send + Sync + 'static,
198            F::Output: Send + 'static,
199        {
200            let id = TaskID::new_v4();
201            Self::_add_task_async(id.clone(), self.tasks.clone(), task).await
202        }
203
204        ///
205        /// See [`Self::add_task_async`]
206        ///
        /// Unlike `add_task`, this method does not block, which is key to avoiding panicking
        /// the tokio runtime when trying to add a task to the queue from a normal function called
        /// from an async environment.
210        ///
211        /// ## Example
212        ///
213        /// ```
214        /// use rumtk_core::threading::threading_manager::{TaskManager};
215        /// use rumtk_core::{rumtk_init_threads, strings::rumtk_format};
216        /// use std::sync::{Arc, LazyLock};
217        ///
218        /// type JobManager = LazyLock<TaskManager<usize>>;
219        /// static mut manager: JobManager = LazyLock::new( || TaskManager::new(&5).unwrap());
220        ///
221        /// async fn called_fn() -> usize {
222        ///     5
223        /// }
224        ///
225        /// fn push_job() -> usize {
226        ///     unsafe {(*manager).spawn_task(called_fn())};
227        ///     1
228        /// }
229        ///
230        /// async fn call_sync_fn() -> usize {
231        ///     push_job()
232        /// }
233        ///
234        /// unsafe {(*manager).spawn_task(call_sync_fn())};
235        ///
236        /// let result_raw = unsafe {(*manager).wait()};
237        ///
238        /// ```
239        ///
240        pub fn spawn_task<F>(&mut self, task: F) -> RUMResult<TaskID>
241        where
242            F: Future<Output = R> + Send + Sync + 'static,
243            F::Output: Send + Sized + 'static,
244        {
245            let id = TaskID::new_v4();
246            let tasks = self.tasks.clone();
247            rumtk_init_threads!(self.workers);
248            Ok(rumtk_resolve_task!(Self::_add_task_async(id.clone(), tasks, task)))
249        }
250
251        ///
252        /// See [add_task_async](Self::add_task_async)
253        ///
254        pub fn add_task<F>(&mut self, task: F) -> RUMResult<TaskID>
255        where
256            F: Future<Output = R> + Send + Sync + 'static,
257            F::Output: Send + Sized + 'static,
258        {
259            self.spawn_task(task)
260        }
261
262        async fn _add_task_async<F>(id: TaskID, tasks: SafeSyncTaskTable<R>, task: F) -> TaskID
263        where
264            F: Future<Output = R> + Send + Sync + 'static,
265            F::Output: Send + Sized + 'static,
266        {
267            let mut safe_task = Arc::new(SyncRwLock::new(Task::<R> {
268                id: id.clone(),
269                finished: false,
270                result: None,
271            }));
272            tasks.write().unwrap().insert(id.clone(), safe_task.clone());
273
274            let task_wrapper = async move || {
275                // Run the task
276                let result = task.await;
277
278                // Cleanup task
279                let mut lock = safe_task.write().unwrap();
280                lock.result = Some(result);
281                lock.finished = true;
282            };
283
284            tokio::spawn(task_wrapper());
285
286            id
287        }
288
289        ///
290        /// See [wait_async](Self::wait_async)
291        ///
292        /// Duplicated here because we can't request the tokio runtime to do a quick exec for us if
293        /// this function happens to be called from the async context.
294        ///
295        pub fn wait(&mut self) -> TaskResults<R> {
296            let task_batch = self
297                .tasks
298                .read()
299                .unwrap()
300                .keys()
301                .cloned()
302                .collect::<Vec<_>>();
303            self.wait_on_batch(&task_batch)
304        }
305
306        ///
307        /// See [wait_on_batch_async](Self::wait_on_batch_async)
308        ///
309        /// Duplicated here because we can't request the tokio runtime to do a quick exec for us if
310        /// this function happens to be called from the async context.
311        ///
312        pub fn wait_on_batch(&mut self, tasks: &TaskBatch) -> TaskResults<R> {
313            let mut results = TaskResults::<R>::default();
314            for task in tasks {
315                results.push(self.wait_on(task));
316            }
317            results
318        }
319
320        ///
321        /// See [wait_on_async](Self::wait_on_async)
322        ///
323        /// Duplicated here because we can't request the tokio runtime to do a quick exec for us if
324        /// this function happens to be called from the async context.
325        ///
326        pub fn wait_on(&mut self, task_id: &TaskID) -> TaskResult<R> {
327            let task = match self.tasks.write().unwrap().remove(task_id) {
328                Some(task) => task.clone(),
329                None => return Err(rumtk_format!("No task with id {}", task_id)),
330            };
331
332            while !task.read().unwrap().finished {
333                sleep(DEFAULT_SLEEP_DURATION);
334            }
335
336            let x = Ok(Arc::new(task.write().unwrap().clone()));
337            x
338        }
339
340        ///
341        /// This method waits until a queued task with [TaskID](TaskID) has been processed from the main queue.
342        ///
        /// We poll the status of the task every [DEFAULT_SLEEP_DURATION](DEFAULT_SLEEP_DURATION) seconds.
        ///
        /// Upon completion,
        ///
        /// 1. Return the result ([TaskResult<R>](TaskResult)).
348        ///
349        /// This operation consumes the task.
350        ///
351        /// ### Note:
352        /// ```text
353        ///     Results returned here are not guaranteed to be in the same order as the order in which
354        ///     the tasks were queued for work. You will need to pass a type as T that automatically
355        ///     tracks its own id or has a way for you to resort results.
356        /// ```
357        pub async fn wait_on_async(&mut self, task_id: &TaskID) -> TaskResult<R> {
358            let task = match self.tasks.write().unwrap().remove(task_id) {
359                Some(task) => task.clone(),
360                None => return Err(rumtk_format!("No task with id {}", task_id)),
361            };
362
363            while !task.read().unwrap().finished {
364                async_sleep(DEFAULT_SLEEP_DURATION).await;
365            }
366
367            let x = Ok(Arc::new(task.write().unwrap().clone()));
368            x
369        }
370
371        ///
372        /// This method waits until a set of queued tasks with [TaskID](TaskID) has been processed from the main queue.
373        ///
        /// We poll the status of the task every [DEFAULT_SLEEP_DURATION](DEFAULT_SLEEP_DURATION) seconds.
375        ///
376        /// Upon completion,
377        ///
378        /// 1. We collect the results generated (if any).
379        /// 2. Return the list of results ([TaskResults<R>](TaskResults)).
380        ///
381        /// ### Note:
382        /// ```text
383        ///     Results returned here are not guaranteed to be in the same order as the order in which
384        ///     the tasks were queued for work. You will need to pass a type as T that automatically
385        ///     tracks its own id or has a way for you to resort results.
386        /// ```
387        pub async fn wait_on_batch_async(&mut self, tasks: &TaskBatch) -> TaskResults<R> {
388            let mut results = TaskResults::<R>::default();
389            for task in tasks {
390                results.push(self.wait_on_async(task).await);
391            }
392            results
393        }
394
395        ///
396        /// This method waits until all queued tasks have been processed from the main queue.
397        ///
        /// We poll the status of the main queue every [DEFAULT_SLEEP_DURATION](DEFAULT_SLEEP_DURATION) seconds.
399        ///
400        /// Upon completion,
401        ///
402        /// 1. We collect the results generated (if any).
403        /// 2. We reset the main task and result internal queue states.
404        /// 3. Return the list of results ([TaskResults<R>](TaskResults)).
405        ///
406        /// This operation consumes all the tasks.
407        ///
408        /// ### Note:
409        /// ```text
410        ///     Results returned here are not guaranteed to be in the same order as the order in which
411        ///     the tasks were queued for work. You will need to pass a type as T that automatically
412        ///     tracks its own id or has a way for you to resort results.
413        /// ```
414        pub async fn wait_async(&mut self) -> TaskResults<R> {
415            let task_batch = self
416                .tasks
417                .read()
418                .unwrap()
419                .keys()
420                .cloned()
421                .collect::<Vec<_>>();
422            self.wait_on_batch_async(&task_batch).await
423        }
424
425        ///
426        /// Check if all work has been completed from the task queue.
427        ///
428        /// ## Examples
429        ///
430        /// ### Sync Usage
431        ///
432        ///```
433        /// use rumtk_core::threading::threading_manager::TaskManager;
434        ///
435        /// let manager = TaskManager::<usize>::new(&4).unwrap();
436        ///
437        /// let all_done = manager.is_all_completed();
438        ///
439        /// assert_eq!(all_done, true, "Empty TaskManager reports tasks are not completed!");
440        ///
441        /// ```
442        ///
443        pub fn is_all_completed(&self) -> bool {
444            self._is_all_completed_async()
445        }
446
447        pub async fn is_all_completed_async(&self) -> bool {
448            self._is_all_completed_async()
449        }
450
451        fn _is_all_completed_async(&self) -> bool {
452            for (_, task) in self.tasks.read().unwrap().iter() {
453                if !task.read().unwrap().finished {
454                    return false;
455                }
456            }
457
458            true
459        }
460
461        ///
462        /// Check if a task completed
463        ///
464        pub fn is_finished(&self, id: &TaskID) -> bool {
465            match self.tasks.read().unwrap().get(id) {
466                Some(t) => t.read().unwrap().finished,
467                None => false,
468            }
469        }
470
471        pub async fn is_finished_async(&self, id: &TaskID) -> bool {
472            match self.tasks.read().unwrap().get(id) {
473                Some(task) => task.read().unwrap().finished,
474                None => true,
475            }
476        }
477
478        ///
479        /// Alias for [wait](TaskManager::wait).
480        ///
481        fn gather(&mut self) -> TaskResults<R> {
482            self.wait()
483        }
484
485        pub fn has_job(&self, id: &TaskID) -> bool {
486            match self.tasks.read().unwrap().get(id) {
487                Some(_) => true,
488                None => false,
489            }
490        }
491    }
492}
493
494///
/// This module contains a few helpers.
496///
497/// For example, you can find a function for determining number of threads available in system.
498/// The sleep family of functions are also here.
499///
500pub mod threading_functions {
501    use crate::core::RUMResult;
502    use crate::net::tcp::{AsyncOwnedRwLockReadGuard, AsyncOwnedRwLockWriteGuard, SafeLockReadGuard, SafeLockWriteGuard, SafeTokioRuntime};
503    use crate::threading::thread_primitives::{AsyncRwLock, SafeLock};
504    use num_cpus;
505    use std::future::Future;
506    use std::sync::Arc;
507    use std::thread::{available_parallelism, sleep as std_sleep};
508    use std::time::Duration;
509    use tokio::runtime::Runtime;
510    use tokio::task::JoinHandle;
511    use tokio::time::sleep as tokio_sleep;
512    /**************************** Globals **************************************/
513    static mut DEFAULT_RUNTIME: SafeTokioRuntime = SafeTokioRuntime::new();
514
515    pub const NANOS_PER_SEC: u64 = 1000000000;
516    pub const MILLIS_PER_SEC: u64 = 1000;
517    pub const MICROS_PER_SEC: u64 = 1000000;
518    const DEFAULT_SLEEP_DURATION: f32 = 0.001;
519    /**************************** Helpers **************************************/
520    pub fn init_runtime<'a>(workers: usize) -> &'a Runtime {
521        unsafe {
522            let runtime = DEFAULT_RUNTIME.get_or_init(|| {
523                let mut builder = tokio::runtime::Builder::new_multi_thread();
524                builder.worker_threads(workers);
525                builder.enable_all();
526                match builder.build() {
527                    Ok(handle) => handle,
528                    Err(e) => panic!(
529                        "Unable to initialize threading tokio runtime because {}!",
530                        &e
531                    ),
532                }
533            });
534            runtime
535        }
536    }
537
538    pub fn get_default_system_thread_count() -> usize {
539        let cpus: usize = num_cpus::get();
540        let parallelism = match available_parallelism() {
541            Ok(n) => n.get(),
542            Err(_) => 0,
543        };
544
545        if parallelism >= cpus {
546            parallelism
547        } else {
548            cpus
549        }
550    }
551
552    pub fn sleep(s: f32) {
553        let ns = s * NANOS_PER_SEC as f32;
554        let rounded_ns = ns.round() as u64;
555        let duration = Duration::from_nanos(rounded_ns);
556        std_sleep(duration);
557    }
558
559    pub async fn async_sleep(s: f32) {
560        let ns = s * NANOS_PER_SEC as f32;
561        let rounded_ns = ns.round() as u64;
562        let duration = Duration::from_nanos(rounded_ns);
563        tokio_sleep(duration).await;
564    }
565
566    ///
567    /// Given a closure task, push it onto the current `tokio` runtime for execution.
568    /// Every [DEFAULT_SLEEP_DURATION] seconds, we check if the task has concluded.
569    /// Once the task has concluded, we call [tokio::block_on](tokio::task::block_in_place) to resolve and extract the task
570    /// result.
571    ///
572    /// Because this helper function can fail, the return value is wrapped inside a [RUMResult].
573    ///
574    /// ## Example
575    ///
576    /// ```
577    /// use rumtk_core::threading::threading_functions::{init_runtime, block_on_task};
578    ///
579    /// const Hello: &str = "World!";
580    ///
581    /// init_runtime(5);
582    ///
583    /// let result = block_on_task(async {
584    ///     Hello
585    /// });
586    ///
587    /// assert_eq!(Hello, result, "Result mismatches expected! {} vs. {}", Hello, result);
588    /// ```
589    ///
590    /// ## Notes
591    /// ```text
592    ///     You need to wrap our call to block_on with a call to tokio::task::block_in_place to force
593    ///     cleanup of async executor and therefore avoid panics from the tokio runtime!
594    ///     Per Tokio's documentation, spawn_blocking would be better since it moves the task to an
595    ///     executor meant for blocking tasks instead of moving tasks out of the current thread and
    ///     converting the thread into a blocking executor. The reason we don't do that is because
597    ///     the call to this function expects to block the current thread until completion and then
598    ///     return the result. If there's an issue with IO, revisit this function.
599    ///
600    ///     https://docs.rs/tokio/latest/tokio/task/fn.block_in_place.html
601    ///     https://docs.rs/tokio/latest/tokio/runtime/struct.Runtime.html#method.block_on
602    ///     https://docs.rs/tokio/latest/tokio/runtime/struct.Handle.html#method.spawn_blocking
603    ///     https://docs.rs/tokio/latest/tokio/runtime/struct.Handle.html#method.spawn_blocking
604    /// ```
605    ///
606    pub fn block_on_task<R, F>(task: F) -> R
607    where
608        F: Future<Output = R> + Send + 'static,
609        F::Output: Send + 'static,
610    {
611        let rt = init_runtime(get_default_system_thread_count());
612        // You need to wrap our call to block_on with a call to tokio::task::block_in_place to force 
613        // cleanup of async executor and therefore avoid panics from the tokio runtime!
614        // Per Tokio's documentation, spawn_blocking would be better since it moves the task to an 
615        // executor meant for blocking tasks instead of moving tasks out of the current thread and 
616        // converting the thread into a clocking executor. The reason we don't do that is because 
617        // the call to this function expects to block the current thread until completion and then 
618        // return the result. If there's an issue with IO, revisit this function.
619        //
620        // https://docs.rs/tokio/latest/tokio/task/fn.block_in_place.html
621        // https://docs.rs/tokio/latest/tokio/runtime/struct.Runtime.html#method.block_on
622        // https://docs.rs/tokio/latest/tokio/runtime/struct.Handle.html#method.spawn_blocking
623        // https://docs.rs/tokio/latest/tokio/runtime/struct.Handle.html#method.spawn_blocking
624        tokio::task::block_in_place(move || {
625            rt.block_on(task)
626        })
627    }
628
629    ///
630    /// This helper should be used for spawning tasks that would normally block the async runtime.
631    /// However, here we use the appropriate `tokio` facilities to signal the runtime on how
632    /// to handle this, potentially blocking, task. For waiting on potentially blocking futures, use
633    /// [block_on_task] instead!
634    ///
635    ///
636    ///
637    /// ## Notes
638    /// ```text
639    ///     You need to wrap our call to block_on with a call to tokio::task::block_in_place to force
640    ///     cleanup of async executor and therefore avoid panics from the tokio runtime!
641    ///     Per Tokio's documentation, spawn_blocking would be better since it moves the task to an
642    ///     executor meant for blocking tasks instead of moving tasks out of the current thread and
    ///     converting the thread into a blocking executor. The reason we don't do that is because
644    ///     the call to this function expects to block the current thread until completion and then
645    ///     return the result. If there's an issue with IO, revisit this function.
646    ///
647    ///     https://docs.rs/tokio/latest/tokio/task/fn.block_in_place.html
648    ///     https://docs.rs/tokio/latest/tokio/runtime/struct.Runtime.html#method.block_on
649    ///     https://docs.rs/tokio/latest/tokio/runtime/struct.Handle.html#method.spawn_blocking
650    ///     https://docs.rs/tokio/latest/tokio/runtime/struct.Handle.html#method.spawn_blocking
651    /// ```
652    ///
653    pub fn spawn_blocking_sync_task<R, F>(task: F) -> JoinHandle<R>
654    where
655        F: FnOnce() -> R + Send + 'static,
656        R: Send + 'static,
657    {
658        let rt = init_runtime(get_default_system_thread_count());
659        rt.spawn_blocking(task)
660    }
661
662    pub fn new_lock<T>(data: T) -> SafeLock<T> {
663        Arc::new(AsyncRwLock::new(data))
664    }
665
666    ///
667    /// This function gives you read access to underlying structure.
668    ///
669    /// Helper function for executing microtask immediately after locking the spin lock. This function
670    /// should be used in situations in which you want to minimize the risk of Time of Check Time of
671    /// Use security bugs.
672    ///
673    /// ## Example
674    /// ```
675    /// use rumtk_core::core::RUMResult;
676    /// use rumtk_core::threading::thread_primitives::SafeLock;
677    /// use rumtk_core::threading::threading_functions::{new_lock, process_read_critical_section};
678    ///
679    /// let data = 5;
680    /// let lock = new_lock(data.clone());
681    /// let result = process_read_critical_section(lock, |guard| -> RUMResult<i32> {
682    ///     Ok(*guard)
683    /// }).unwrap();
684    ///
685    /// assert_eq!(result, data, "Failed to execute critical section through which we retrieve the locked data!");
686    /// ```
687    ///
688    pub fn process_read_critical_section<T, R, F>(
689        lock: SafeLock<T>,
690        critical_section: F,
691    ) -> R
692    where 
693        F: Fn(SafeLockReadGuard<T>) -> R, T: Send + Sync + 'static,
694    {
695        tokio::task::block_in_place(move || {
696            let read_guard = lock_read(lock);
697            critical_section(read_guard)
698        })
699    }
700
701    ///
702    /// This function gives you write access to underlying structure.
703    ///
704    /// Helper function for executing microtask immediately after locking the spin lock. This function
705    /// should be used in situations in which you want to minimize the risk of Time of Check Time of
706    /// Use security bugs.
707    ///
708    /// ## Example
709    /// ```
710    /// use rumtk_core::core::RUMResult;
711    /// use rumtk_core::threading::thread_primitives::SafeLock;
712    /// use rumtk_core::threading::threading_functions::{new_lock, process_write_critical_section};
713    ///
714    /// let data = 5;
715    /// let lock = new_lock(data.clone());
716    /// let new_data = 10;
717    /// let result = process_write_critical_section(lock, |mut guard| -> RUMResult<i32> {
718    ///     *guard = new_data;
719    ///     Ok(*guard)
720    /// }).unwrap();
721    ///
722    /// assert_eq!(result, new_data, "Failed to execute critical section through which we modify the locked data!");
723    /// ```
724    ///
725    pub fn process_write_critical_section<T, R, F>(
726        lock: SafeLock<T>,
727        critical_section: F,
728    ) -> R
729    where
730        F: Fn(SafeLockWriteGuard<T>) -> R, T: Send + Sync + 'static,
731    {
732        tokio::task::block_in_place(move || {
733            let write_guard = lock_write(lock);
734            critical_section(write_guard)
735        })
736    }
737
738    ///
739    /// Obtain read guard to standard spin lock such that you have a more ergonomic interface to
740    /// locked data.
741    ///
742    /// It is preferable to use [process_read_critical_section] when you must process
743    /// critical logic that is sensitive to time of check time of use security bugs!
744    ///
745    /// ## Example
746    /// ```
747    /// use rumtk_core::threading::thread_primitives::SafeLock;
748    /// use rumtk_core::threading::threading_functions::{new_lock, lock_read};
749    ///
750    /// let data = 5;
751    /// let lock = new_lock(data.clone());
752    /// let result = *lock_read(lock);
753    ///
754    /// assert_eq!(result, data, "Failed to access the locked data!");
755    /// ```
756    ///
757    pub fn lock_read<T: Send + Sync + 'static>(lock: SafeLock<T>) -> AsyncOwnedRwLockReadGuard<T> {
758        block_on_task(async move {
759            lock.read_owned().await
760        })
761    }
762
763    ///
764    /// Obtain write guard to standard spin lock such that you have a more ergonomic interface to
765    /// locked data.
766    ///
767    /// It is preferable to use [process_write_critical_section] when you must process
768    /// critical logic that is sensitive to time of check time of use security bugs!
769    ///
770    /// ## Example
771    /// ```
772    /// use rumtk_core::threading::thread_primitives::SafeLock;
773    /// use rumtk_core::threading::threading_functions::{new_lock, lock_read, lock_write};
774    ///
775    /// let data = 5;
776    /// let lock = new_lock(data.clone());
777    /// let new_data = 10;
778    ///
779    /// *lock_write(lock.clone()) = new_data;
780    ///
781    /// let result = *lock_read(lock);
782    ///
783    /// assert_eq!(result, new_data, "Failed to modify the locked data!");
784    /// ```
785    ///
786    pub fn lock_write<T: Send + Sync + 'static>(lock: SafeLock<T>) -> AsyncOwnedRwLockWriteGuard<T> {
787        block_on_task(async move {
788            lock.write_owned().await
789        })
790    }
791}
792
793///
794/// Main API for interacting with the threading back end. Remember, we use tokio as our executor.
795/// This means that by default, all jobs sent to the thread pool have to be async in nature.
796/// These macros make handling of these jobs at the sync/async boundary more convenient.
797///
798pub mod threading_macros {
799    use crate::threading::thread_primitives;
800    use crate::threading::threading_manager::SafeTaskArgs;
801
///
/// Obtain the threading runtime by calling `threading_functions::init_runtime`.
///
/// * `rumtk_init_threads!()` passes the value of `get_default_system_thread_count()` as the
///   thread count. Per the original documentation this typically matches the number of
///   cores/threads of your CPU.
/// * `rumtk_init_threads!(threads)` passes `threads` as the thread count.
///
/// The macro evaluates to whatever `init_runtime` returns. Per the original documentation the
/// runtime is created once and stored globally, so later invocations hand back the previously
/// initialized runtime; that behaviour lives in `init_runtime`, not in this macro.
///
/// The helper functions are referenced through absolute `$crate::` paths instead of block-local
/// `use` items, so nothing is imported into the scope in which the `threads` expression is
/// evaluated.
///
/// ## Examples
///
/// ```
///     use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_create_task_args, rumtk_create_task, rumtk_spawn_task};
///     use rumtk_core::core::RUMResult;
///     use rumtk_core::threading::threading_manager::SafeTaskArgs;
///
///     async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
///         let mut result = Vec::<i32>::new();
///         for arg in args.read().await.iter() {
///             result.push(*arg);
///         }
///         Ok(result)
///     }
///
///     let args = rumtk_create_task_args!(1);                               // Creates a vector of i32s
///     let task = rumtk_create_task!(test, args);                           // Creates a standard task which consists of a function or closure accepting a Vec<T>
///     let result = rumtk_resolve_task!(task); // Spawn's task and waits for it to conclude.
/// ```
///
/// ```
///     use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_create_task_args, rumtk_create_task, rumtk_spawn_task};
///     use rumtk_core::core::RUMResult;
///     use rumtk_core::threading::threading_manager::SafeTaskArgs;
///
///     async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
///         let mut result = Vec::<i32>::new();
///         for arg in args.read().await.iter() {
///             result.push(*arg);
///         }
///         Ok(result)
///     }
///
///     let thread_count: usize = 10;
///     let args = rumtk_create_task_args!(1);
///     let task = rumtk_create_task!(test, args);
///     let result = rumtk_resolve_task!(task);
/// ```
#[macro_export]
macro_rules! rumtk_init_threads {
    ( ) => {{
        $crate::threading::threading_functions::init_runtime(
            $crate::threading::threading_functions::get_default_system_thread_count(),
        )
    }};
    ( $threads:expr ) => {{
        $crate::threading::threading_functions::init_runtime($threads)
    }};
}
865
///
/// Schedule a future on a runtime by calling the runtime's `spawn` method.
///
/// * `rumtk_spawn_task!(func)` obtains the runtime through `rumtk_init_threads!()` (default
///   thread count) and spawns the future `func` on it.
/// * `rumtk_spawn_task!(rt, func)` spawns the future `func` on the runtime expression `rt`.
///
/// The macro evaluates to whatever `spawn` returns: a [thread_primitives::JoinHandle<T>]
/// instance or, for a standard framework task, a [thread_primitives::AsyncTaskHandle].
///
#[macro_export]
macro_rules! rumtk_spawn_task {
    ( $task:expr ) => {{
        let runtime = $crate::rumtk_init_threads!();
        runtime.spawn($task)
    }};
    ( $runtime:expr, $task:expr ) => {{
        $runtime.spawn($task)
    }};
}
885
///
/// Hand a synchronous function or closure to `threading_functions::spawn_blocking_sync_task`.
///
/// The macro evaluates to whatever that function returns. Judging by
/// [rumtk_resolve_sync_task], which awaits the same call and matches on `Ok`/`Err`, the value
/// is an awaitable handle; confirm the exact type against `spawn_blocking_sync_task`.
///
/// The helper is referenced through its absolute `$crate::` path so that no `use` item is
/// introduced into the scope in which `func` is evaluated.
///
#[macro_export]
macro_rules! rumtk_spawn_blocking_task {
    ( $func:expr ) => {{
        $crate::threading::threading_functions::spawn_blocking_sync_task($func)
    }};
}
893
///
/// Call an async function or closure and wait for its future to resolve through
/// `threading_functions::block_on_task`, i.e. from synchronous code.
///
/// * `rumtk_wait_on_task!(func)` calls `func()` and awaits the result.
/// * `rumtk_wait_on_task!(func, arg1, arg2, ...)` calls `func(arg1, arg2, ...)` and awaits the
///   result.
///
/// The call happens inside an `async move` block, so `func` and every argument expression are
/// moved into that block and evaluated when the block is polled. The macro evaluates to
/// whatever `block_on_task` returns for that block.
///
/// No runtime argument is accepted; the runtime is whatever `block_on_task` uses internally.
///
#[macro_export]
macro_rules! rumtk_wait_on_task {
    // A single arm covers both forms: with no extra arguments the repetition expands to
    // nothing and the call becomes `$func()`.
    ( $func:expr $(, $arg_items:expr)* ) => {{
        $crate::threading::threading_functions::block_on_task(async move {
            $func($($arg_items),*).await
        })
    }};
}
918
///
/// Await a future from synchronous code and evaluate to the value `block_on_task` returns for
/// it.
///
/// The only argument is the future (or an expression producing one). No runtime argument is
/// accepted; the runtime is whatever `threading_functions::block_on_task` uses internally.
///
/// ## Examples
///
/// ```
///     use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_create_task_args, rumtk_create_task, rumtk_spawn_task};
///     use rumtk_core::core::RUMResult;
///     use rumtk_core::threading::threading_manager::SafeTaskArgs;
///
///     async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
///         let mut result = Vec::<i32>::new();
///         for arg in args.read().await.iter() {
///             result.push(*arg);
///         }
///         Ok(result)
///     }
///
///     let args = rumtk_create_task_args!(1);
///     let task = rumtk_create_task!(test, args);
///     let result = rumtk_resolve_task!(task);
/// ```
///
#[macro_export]
macro_rules! rumtk_resolve_task {
    ( $future:expr ) => {{
        // `$future` is spliced directly into the async block: the expression is evaluated when
        // the block is polled, and everything it names is moved into the block.
        //
        // NOTE(review): an earlier revision bound the expression first (`let future = $future;`)
        // so that nested macro calls such as `rumtk_spawn_task!` were resolved before the move
        // into the block, and warned against removing that binding. The binding is currently
        // disabled. Confirm that this is intentional before re-enabling or deleting it.
        $crate::threading::threading_functions::block_on_task(async move { $future.await })
    }};
}
962
///
/// Run a synchronous closure through `threading_functions::spawn_blocking_sync_task` and await
/// its outcome.
///
/// The expansion contains `.await`, so this macro can only be used inside an `async` context.
///
/// * If the awaited handle yields `Ok(result)`, the macro evaluates to `result` unchanged. The
///   closure's own return type therefore has to be a `Result` whose error type can hold the
///   formatted message below.
/// * If the awaited handle yields `Err(e)`, the macro evaluates to an `Err` carrying
///   `"Issue with blocking task => {e}"`.
///
#[macro_export]
macro_rules! rumtk_resolve_sync_task {
    ( $closure:expr ) => {{
        // Only the formatting macro is imported; the function is called through its absolute
        // path so that no value-level `use` is visible to the `$closure` expression.
        use $crate::strings::rumtk_format;
        match $crate::threading::threading_functions::spawn_blocking_sync_task($closure).await {
            Ok(outcome) => outcome,
            Err(join_error) => Err(rumtk_format!("Issue with blocking task => {}", join_error)),
        }
    }};
}
978
///
/// Wrap a call to an async function or closure in an `async move` block, producing a future
/// that can later be spawned or resolved.
///
/// * `rumtk_create_task!(func)` yields a block that evaluates `func`, calls it without
///   arguments and awaits the result. `func` is evaluated inside the block, i.e. when the
///   block is polled.
/// * `rumtk_create_task!(func, args)` evaluates `func` immediately, then yields a block that
///   calls it with a reference to `args` (`&args`) and awaits the result. `args` is moved into
///   the block.
///
/// ## Example
///
/// ```
/// use std::sync::{Arc, RwLock};
/// use tokio::sync::RwLock as AsyncRwLock;
/// use rumtk_core::strings::RUMString;
/// use rumtk_core::threading::threading_manager::{SafeTaskArgs, TaskItems};
///
/// pub type SafeTaskArgs2<T> = Arc<RwLock<TaskItems<T>>>;
/// let expected = vec![
///     RUMString::from("Hello"),
///     RUMString::from("World!"),
///     RUMString::from("Overcast"),
///     RUMString::from("and"),
///     RUMString::from("Sad"),
///  ];
/// let locked_args = AsyncRwLock::new(expected.clone());
/// let task_args = SafeTaskArgs::<RUMString>::new(locked_args);
///
///
/// ```
///
#[macro_export]
macro_rules! rumtk_create_task {
    ( $task_fn:expr ) => {{
        async move {
            let callee = $task_fn;
            callee().await
        }
    }};
    ( $task_fn:expr, $task_args:expr ) => {{
        let callee = $task_fn;
        async move { callee(&$task_args).await }
    }};
}
1017
///
/// Creates an instance of [SafeTaskArgs](SafeTaskArgs) holding the arguments passed.
///
/// The arguments are collected with `vec![...]`, wrapped in an `AsyncRwLock` and then in
/// `SafeTaskArgs::new`. Invoking the macro without arguments produces an empty list; the item
/// type must then be inferable from the surrounding code.
///
/// ## Note
///
/// All arguments must be of the same type
///
#[macro_export]
macro_rules! rumtk_create_task_args {
    // One arm handles both the empty and the non-empty form. The types are referenced through
    // absolute `$crate::` paths, so nothing is imported into the scope of the argument
    // expressions and no unused imports are generated.
    ( $($args:expr),* ) => {{
        $crate::threading::threading_manager::SafeTaskArgs::new(
            $crate::threading::thread_primitives::AsyncRwLock::new(vec![$($args),*]),
        )
    }};
}
1038
///
/// Convenience macro for packaging the task components and running the task to completion in
/// one line.
///
/// * `rumtk_exec_task!(func)` creates a task from `func` (called without arguments) and
///   resolves it.
/// * `rumtk_exec_task!(func, args)` wraps `args` in a [SafeTaskArgs](SafeTaskArgs), creates a
///   task that calls `func` with a reference to it, and resolves the task.
/// * `rumtk_exec_task!(func, args, threads)` behaves like the two-argument form.
///
/// `args` is the collection handed to the task; every item must be of the same type. If you
/// wish to pass values of different types, define a type whose underlying structure is a tuple
/// of items and pass that instead.
///
/// NOTE(review): the `threads` expression is accepted for compatibility but this macro does
/// not use it, so it currently has no influence on the runtime. The original documentation
/// described it as selecting the runtime's thread count; confirm the intended behaviour.
///
/// The macro itself names `SafeTaskArgs` and `AsyncRwLock` through absolute `$crate::` paths,
/// so callers only need those names in scope when their own code mentions them (as the
/// examples do in the task signatures).
///
/// ## Examples
///
/// ### With Default Thread Count
/// ```
///     use rumtk_core::{rumtk_exec_task};
///     use rumtk_core::core::RUMResult;
///     use rumtk_core::threading::threading_manager::SafeTaskArgs;
///
///     async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
///         let mut result = Vec::<i32>::new();
///         for arg in args.read().await.iter() {
///             result.push(*arg);
///         }
///         Ok(result)
///     }
///
///     let result = rumtk_exec_task!(test, vec![5]).unwrap();
///     assert_eq!(&result.clone(), &vec![5], "Results mismatch");
///     assert_ne!(&result.clone(), &vec![5, 10], "Results do not mismatch as expected!");
/// ```
///
/// ### With Custom Thread Count
/// ```
///     use rumtk_core::{rumtk_exec_task};
///     use rumtk_core::core::RUMResult;
///     use rumtk_core::threading::threading_manager::SafeTaskArgs;
///
///     async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
///         let mut result = Vec::<i32>::new();
///         for arg in args.read().await.iter() {
///             result.push(*arg);
///         }
///         Ok(result)
///     }
///
///     let result = rumtk_exec_task!(test, vec![5], 5).unwrap();
///     assert_eq!(&result.clone(), &vec![5], "Results mismatch");
///     assert_ne!(&result.clone(), &vec![5, 10], "Results do not mismatch as expected!");
/// ```
///
/// ### With Async Function Body
/// ```
///     use rumtk_core::{rumtk_exec_task};
///     use rumtk_core::core::RUMResult;
///     use rumtk_core::threading::threading_manager::SafeTaskArgs;
///
///     let result = rumtk_exec_task!(
///     async move |args: &SafeTaskArgs<i32>| -> RUMResult<Vec<i32>> {
///         let mut result = Vec::<i32>::new();
///         for arg in args.read().await.iter() {
///             result.push(*arg);
///         }
///         Ok(result)
///     },
///     vec![5]).unwrap();
///     assert_eq!(&result.clone(), &vec![5], "Results mismatch");
///     assert_ne!(&result.clone(), &vec![5, 10], "Results do not mismatch as expected!");
/// ```
///
/// ### With Async Function Body and No Args
/// ```
///     use rumtk_core::{rumtk_exec_task};
///     use rumtk_core::core::RUMResult;
///     use rumtk_core::threading::threading_manager::SafeTaskArgs;
///
///     let result = rumtk_exec_task!(
///     async || -> RUMResult<Vec<i32>> {
///         let mut result = Vec::<i32>::new();
///         Ok(result)
///     }).unwrap();
///     let empty = Vec::<i32>::new();
///     assert_eq!(&result.clone(), &empty, "Results mismatch");
///     assert_ne!(&result.clone(), &vec![5, 10], "Results do not mismatch as expected!");
/// ```
///
/// ## Equivalent To
///
/// ```no_run
///     use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_create_task_args, rumtk_create_task, rumtk_spawn_task};
///     use rumtk_core::core::RUMResult;
///     use rumtk_core::threading::threading_manager::SafeTaskArgs;
///
///     async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
///         let mut result = Vec::<i32>::new();
///         for arg in args.read().await.iter() {
///             result.push(*arg);
///         }
///         Ok(result)
///     }
///
///     let args = rumtk_create_task_args!(1);
///     let task = rumtk_create_task!(test, args);
///     let result = rumtk_resolve_task!(task);
/// ```
///
#[macro_export]
macro_rules! rumtk_exec_task {
    ( $func:expr ) => {{
        let task = $crate::rumtk_create_task!($func);
        $crate::rumtk_resolve_task!(task)
    }};
    ( $func:expr, $args:expr ) => {{
        // Delegate through `$crate::` so the recursion resolves even when the caller invoked
        // the macro by path and never imported it by name.
        $crate::rumtk_exec_task!(
            $func,
            $args,
            $crate::threading::threading_functions::get_default_system_thread_count()
        )
    }};
    ( $func:expr, $args:expr, $threads:expr ) => {{
        // `SafeTaskArgs` is named by absolute path: a bare `SafeTaskArgs` would be looked up
        // at the call site and fail unless the caller happened to import it.
        let args = $crate::threading::threading_manager::SafeTaskArgs::new(
            $crate::threading::thread_primitives::AsyncRwLock::new($args),
        );
        let task = $crate::rumtk_create_task!($func, args);
        $crate::rumtk_resolve_task!(task)
    }};
}
1169
///
/// Sleep for a duration of time in a sync context; the result is not something to `.await`.
///
/// The argument is cast with `as f32` and forwarded to `threading_functions::sleep`, so you can
/// pass any value that can be cast to `f32`. As the examples show, the value is expressed in
/// seconds and fractions select sub-second durations.
///
/// The helper is called through its absolute `$crate::` path, so a caller variable that
/// happens to be named `sleep` can still be passed as the duration.
///
/// ## Examples
///
/// ```
///     use rumtk_core::rumtk_sleep;
///     rumtk_sleep!(1);           // Sleeps for 1 second.
///     rumtk_sleep!(0.001);       // Sleeps for 1 millisecond
///     rumtk_sleep!(0.000001);    // Sleeps for 1 microsecond
///     rumtk_sleep!(0.000000001); // Sleeps for 1 nanosecond
/// ```
///
#[macro_export]
macro_rules! rumtk_sleep {
    ( $dur:expr ) => {{
        $crate::threading::threading_functions::sleep($dur as f32)
    }};
}
1194
///
/// Sleep for some duration of time in an async context: the macro evaluates to the value
/// returned by `threading_functions::async_sleep`, which the caller then awaits.
///
/// The argument is cast with `as f32` before it is forwarded, so you can pass any value that
/// can be cast to `f32`. As the examples show, the value is expressed in seconds and fractions
/// select sub-second durations.
///
/// The helper is called through its absolute `$crate::` path, so a caller variable that
/// happens to be named `async_sleep` can still be passed as the duration.
///
/// ## Examples
///
/// ```
///     use rumtk_core::{rumtk_async_sleep, rumtk_exec_task};
///     use rumtk_core::core::RUMResult;
///     rumtk_exec_task!( async || -> RUMResult<()> {
///             rumtk_async_sleep!(1).await;           // Sleeps for 1 second.
///             rumtk_async_sleep!(0.001).await;       // Sleeps for 1 millisecond
///             rumtk_async_sleep!(0.000001).await;    // Sleeps for 1 microsecond
///             rumtk_async_sleep!(0.000000001).await; // Sleeps for 1 nanosecond
///             Ok(())
///         }
///     );
/// ```
///
#[macro_export]
macro_rules! rumtk_async_sleep {
    ( $dur:expr ) => {{
        $crate::threading::threading_functions::async_sleep($dur as f32)
    }};
}
1224
///
/// Create a new task queue by calling `TaskManager::new(worker_num)` and evaluate to its
/// result.
///
/// `worker_num` is forwarded unchanged; its meaning (presumably the number of workers) and the
/// exact return type are defined by `threading_manager::TaskManager::new`.
///
#[macro_export]
macro_rules! rumtk_new_task_queue {
    ( $worker_num:expr ) => {{
        // No trailing semicolon: the block must evaluate to the value returned by
        // `TaskManager::new`, otherwise the freshly built manager is dropped and callers
        // receive `()`.
        $crate::threading::threading_manager::TaskManager::new($worker_num)
    }};
}
1235
///
/// Creates a new safe lock guarding the given data by forwarding it to
/// `threading_functions::new_lock`. This interface exists to clean up lock management for
/// consumers of the framework.
///
/// The helper is called through its absolute `$crate::` path, so no `use` item is introduced
/// into the scope in which `data` is evaluated.
///
/// ## Example
/// ```
/// use rumtk_core::{rumtk_new_lock};
///
/// let data = 5;
/// let lock = rumtk_new_lock!(data);
/// ```
///
#[macro_export]
macro_rules! rumtk_new_lock {
    ( $data:expr ) => {{
        $crate::threading::threading_functions::new_lock($data)
    }};
}
1255
///
/// Lock a framework [SafeLock](thread_primitives::SafeLock) and execute a critical section on
/// it by forwarding both to `threading_functions::process_read_critical_section`.
///
/// The critical section is a synchronous function or closure that receives the guard. Use this
/// variant when the critical section only retrieves a value from the guarded data. The macro
/// evaluates to whatever `process_read_critical_section` returns.
///
/// The helper is called through its absolute `$crate::` path, so no `use` item is introduced
/// into the scope in which `lock` and `function` are evaluated.
///
/// ## Example
/// ```
/// use rumtk_core::core::RUMResult;
/// use rumtk_core::{rumtk_new_lock, rumtk_critical_section_read};
///
/// let data = 5;
/// let lock = rumtk_new_lock!(data);
/// let result = rumtk_critical_section_read!(
///     lock,
///     |guard| -> RUMResult<i32> {
///         let result: i32 = *guard;
///         Ok(result)
///     }
/// ).expect("No errors locking!");
///
/// assert_eq!(result, data, "Critical section yielded invalid result!");
/// ```
///
#[macro_export]
macro_rules! rumtk_critical_section_read {
    ( $lock:expr, $function:expr ) => {{
        $crate::threading::threading_functions::process_read_critical_section($lock, $function)
    }};
}
1286
///
/// Lock a framework [SafeLock](thread_primitives::SafeLock) and execute a critical section on
/// it by forwarding both to `threading_functions::process_write_critical_section`.
///
/// The critical section is a synchronous function or closure that receives the guard. Use this
/// variant when the critical section modifies the guarded data. The macro evaluates to
/// whatever `process_write_critical_section` returns.
///
/// The helper is called through its absolute `$crate::` path, so no `use` item is introduced
/// into the scope in which `lock` and `function` are evaluated.
///
/// ## Example
/// ```
/// use rumtk_core::{rumtk_new_lock, rumtk_critical_section_write};
///
/// let data = 5;
/// let new_data = 10;
/// let lock = rumtk_new_lock!(data);
/// let result = rumtk_critical_section_write!(
///     lock,
///     |mut guard| {
///         *guard = new_data;
///     }
/// );
///
/// assert_eq!(result, (), "Critical section yielded invalid result!");
/// ```
///
#[macro_export]
macro_rules! rumtk_critical_section_write {
    ( $lock:expr, $function:expr ) => {{
        $crate::threading::threading_functions::process_write_critical_section($lock, $function)
    }};
}
1316
///
/// Framework interface to obtain a `read` guard to the locked data.
/// To access the internal data, you will need to dereference the guard (`*guard`).
///
/// The macro passes `lock.clone()` to `threading_functions::lock_read`, so the lock expression
/// you provide is not consumed and can be used again afterwards.
///
/// It is preferred to use [rumtk_critical_section_read] if you need to avoid `time of check time
/// of use` security bugs.
///
/// The helper is called through its absolute `$crate::` path, so no `use` item is introduced
/// into the scope in which `lock` is evaluated.
///
/// ## Example
/// ```
/// use rumtk_core::{rumtk_new_lock, rumtk_lock_read};
///
/// let data = 5;
/// let lock = rumtk_new_lock!(data.clone());
/// let result = *rumtk_lock_read!(lock);
///
/// assert_eq!(result, data, "Failed to access locked data.");
/// ```
///
#[macro_export]
macro_rules! rumtk_lock_read {
    ( $lock:expr ) => {{
        $crate::threading::threading_functions::lock_read($lock.clone())
    }};
}
1342
///
/// Framework interface to obtain a `write` guard to the locked data.
/// To access the internal data, you will need to dereference the guard (`*guard`).
///
/// The macro passes `lock.clone()` to `threading_functions::lock_write`, so the lock expression
/// you provide is not consumed and can be used again afterwards.
///
/// It is preferred to use [rumtk_critical_section_write] if you need to avoid `time of check time
/// of use` security bugs.
///
/// The helper is called through its absolute `$crate::` path, so no `use` item is introduced
/// into the scope in which `lock` is evaluated.
///
/// ## Example
/// ```
/// use rumtk_core::{rumtk_new_lock, rumtk_lock_read, rumtk_lock_write};
///
/// let data = 5;
/// let lock = rumtk_new_lock!(data.clone());
/// let new_data = 10;
///
/// *rumtk_lock_write!(lock) = new_data;
/// let result = *rumtk_lock_read!(lock);
///
/// assert_eq!(result, new_data, "Failed to modify locked data.");
/// ```
///
#[macro_export]
macro_rules! rumtk_lock_write {
    ( $lock:expr ) => {{
        $crate::threading::threading_functions::lock_write($lock.clone())
    }};
}
1371}