// rumtk_core/threading.rs

1/*
2 * rumtk attempts to implement HL7 and medical protocols for interoperability in medicine.
3 * This toolkit aims to be reliable, simple, performant, and standards compliant.
4 * Copyright (C) 2025  Luis M. Santos, M.D. <lsantos@medicalmasses.com>
5 * Copyright (C) 2025  MedicalMasses L.L.C. <contact@medicalmasses.com>
6 *
7 * This program is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
19 */
20
21///
22/// This module provides all the primitives needed to build a multithreaded application.
23///
24pub mod thread_primitives {
25    pub use std::sync::Mutex as SyncMutex;
26    pub use std::sync::MutexGuard as SyncMutexGuard;
27    pub use std::sync::RwLock as SyncRwLock;
28    use std::sync::{Arc, OnceLock};
29    pub use tokio::io;
30    pub use tokio::io::{AsyncReadExt, AsyncWriteExt};
31    use tokio::runtime::Runtime as TokioRuntime;
32    pub use tokio::sync::{
33        Mutex as AsyncMutex, MutexGuard as AsyncMutexGuard,
34        OwnedRwLockReadGuard as AsyncOwnedRwLockMappedReadGuard,
35        OwnedRwLockReadGuard as AsyncOwnedRwLockReadGuard,
36        OwnedRwLockWriteGuard as AsyncOwnedRwLockWriteGuard, RwLock as AsyncRwLock,
37        RwLockMappedWriteGuard as AsyncRwLockMappedWriteGuard,
38        RwLockReadGuard as AsyncRwLockReadGuard, RwLockWriteGuard as AsyncRwLockWriteGuard,
39    };
40
41    /**************************** Types ***************************************/
42    pub type SafeLockReadGuard<T> = AsyncOwnedRwLockReadGuard<T>;
43    pub type MappedLockReadGuard<T> = AsyncOwnedRwLockReadGuard<T>;
44    pub type SafeLockWriteGuard<T> = AsyncOwnedRwLockWriteGuard<T>;
45    pub type SafeLock<T> = Arc<AsyncRwLock<T>>;
46    pub type SafeTokioRuntime = OnceLock<TokioRuntime>;
47}
48
49pub mod threading_manager {
50    use crate::core::{RUMResult, RUMVec};
51    use crate::strings::rumtk_format;
52    use crate::threading::thread_primitives::SafeLock;
53    use crate::threading::threading_functions::{async_sleep, sleep};
54    use crate::types::{RUMHashMap, RUMID};
55    use crate::{rumtk_init_threads, rumtk_resolve_task, rumtk_spawn_task, threading};
56    use std::future::Future;
57    use std::sync::Arc;
58    pub use std::sync::RwLock as SyncRwLock;
59    use tokio::task::JoinHandle;
60
    // Polling interval, in seconds, used while waiting for tasks to finish.
    const DEFAULT_SLEEP_DURATION: f32 = 0.001f32;
    // Initial capacity pre-allocated for the task table.
    const DEFAULT_TASK_CAPACITY: usize = 100;

    /// Handle to a spawned asynchronous job.
    pub type AsyncHandle<T> = JoinHandle<T>;
    /// Generic container for task-related items.
    pub type TaskItems<T> = RUMVec<T>;
    /// This type aliases a vector of T elements that will be used for passing arguments to the task processor.
    pub type TaskArgs<T> = TaskItems<T>;
    /// Shared, lockable argument list handed to task processors.
    // NOTE(review): the previous doc line ("Function signature defining the interface of task
    // processing logic.") described the commented-out TaskProcessor alias below, not this type.
    pub type SafeTaskArgs<T> = SafeLock<TaskItems<T>>;
    /// Handle whose completion yields a [`TaskResult`].
    pub type AsyncTaskHandle<R> = AsyncHandle<TaskResult<R>>;
    /// Collection of [`AsyncTaskHandle`]s.
    pub type AsyncTaskHandles<R> = Vec<AsyncTaskHandle<R>>;
    //pub type TaskProcessor<T, R, Fut: Future<Output = TaskResult<R>>> = impl FnOnce(&SafeTaskArgs<T>) -> Fut;
    /// Unique task identifier (a [`RUMID`], created via `new_v4`).
    pub type TaskID = RUMID;
    /// Bookkeeping record for a queued task.
    #[derive(Debug, Clone, Default)]
    pub struct Task<R> {
        // Unique id assigned when the task is queued.
        pub id: TaskID,
        // Flipped to true by the worker once the future resolves.
        pub finished: bool,
        // Holds the task's output once `finished` is true; None until then.
        pub result: Option<R>,
    }

    /// Shared, immutable view of a finished [`Task`] returned to callers.
    pub type SafeTask<R> = Arc<Task<R>>;
    // Internal, mutable task record shared between the manager and the worker.
    type SafeInternalTask<R> = Arc<SyncRwLock<Task<R>>>;
    /// Map of task id -> internal task record.
    pub type TaskTable<R> = RUMHashMap<TaskID, SafeInternalTask<R>>;
    /// Async-locked task table.
    pub type SafeAsyncTaskTable<R> = SafeLock<TaskTable<R>>;
    /// Sync-locked task table (used by [`TaskManager`]).
    pub type SafeSyncTaskTable<R> = Arc<SyncRwLock<TaskTable<R>>>;
    /// A batch of task ids to wait on.
    pub type TaskBatch = RUMVec<TaskID>;
    /// Type to use to define how task results are expected to be returned.
    pub type TaskResult<R> = RUMResult<SafeTask<R>>;
    /// Collection of [`TaskResult`]s.
    pub type TaskResults<R> = TaskItems<TaskResult<R>>;
91
92    ///
93    /// Manages asynchronous tasks submitted as micro jobs from synchronous code. This type essentially
94    /// gives the multithreading, asynchronous superpowers to synchronous logic.
95    ///
96    /// ## Example Usage
97    ///
98    /// ```
99    /// use std::sync::{Arc};
100    /// use tokio::sync::RwLock as AsyncRwLock;
101    /// use rumtk_core::core::RUMResult;
102    /// use rumtk_core::strings::RUMString;
103    /// use rumtk_core::threading::threading_manager::{SafeTaskArgs, TaskItems, TaskManager};
104    /// use rumtk_core::{rumtk_create_task, };
105    ///
106    /// let expected = vec![
107    ///     RUMString::from("Hello"),
108    ///     RUMString::from("World!"),
109    ///     RUMString::from("Overcast"),
110    ///     RUMString::from("and"),
111    ///     RUMString::from("Sad"),
112    ///  ];
113    ///
114    /// type TestResult = RUMResult<Vec<RUMString>>;
115    /// let mut queue: TaskManager<TestResult> = TaskManager::new(&5).unwrap();
116    ///
117    /// let locked_args = AsyncRwLock::new(expected.clone());
118    /// let task_args = SafeTaskArgs::<RUMString>::new(locked_args);
119    /// let processor = rumtk_create_task!(
120    ///     async |args: &SafeTaskArgs<RUMString>| -> TestResult {
121    ///         let owned_args = Arc::clone(args);
122    ///         let locked_args = owned_args.read().await;
123    ///         let mut results = TaskItems::<RUMString>::with_capacity(locked_args.len());
124    ///
125    ///         for arg in locked_args.iter() {
126    ///             results.push(RUMString::new(arg));
127    ///         }
128    ///
129    ///         Ok(results)
130    ///     },
131    ///     task_args
132    /// );
133    ///
134    /// queue.add_task::<_>(processor);
135    /// let results = queue.wait();
136    ///
137    /// let mut result_data = Vec::<RUMString>::with_capacity(5);
138    /// for r in results {
139    ///     for v in r.unwrap().result.clone().unwrap().iter() {
140    ///         for value in v.iter() {
141    ///             result_data.push(value.clone());
142    ///         }
143    ///     }
144    ///  }
145    ///
146    /// assert_eq!(result_data, expected, "Results do not match expected!");
147    ///
148    /// ```
149    ///
    #[derive(Debug, Clone, Default)]
    pub struct TaskManager<R> {
        // Shared table of queued/finished tasks, keyed by TaskID.
        tasks: SafeSyncTaskTable<R>,
        // Number of worker threads requested when lazily initializing the runtime.
        workers: usize,
    }
155
156    impl<R> TaskManager<R>
157    where
158        R: Sync + Send + Clone + 'static,
159    {
160        ///
161        /// This method creates a [`TaskManager`] instance using sensible defaults.
162        ///
163        /// The `threads` field is computed from the number of cores present in system.
164        ///
165        pub fn default() -> RUMResult<TaskManager<R>> {
166            Self::new(&threading::threading_functions::get_default_system_thread_count())
167        }
168
169        ///
170        /// Creates an instance of [`TaskManager<R>`](TaskManager<R>).
171        /// Expects you to provide the count of threads to spawn and the microtask queue size
172        /// allocated by each thread.
173        ///
174        /// This method calls [`TaskTable::with_capacity()`](TaskTable::with_capacity) for the actual object creation.
175        /// The main queue capacity is pre-allocated to [`DEFAULT_TASK_CAPACITY`](DEFAULT_TASK_CAPACITY).
176        ///
177        pub fn new(worker_num: &usize) -> RUMResult<TaskManager<R>> {
178            let tasks = SafeSyncTaskTable::<R>::new(SyncRwLock::new(TaskTable::with_capacity(
179                DEFAULT_TASK_CAPACITY,
180            )));
181            Ok(TaskManager::<R> {
182                tasks,
183                workers: worker_num.to_owned(),
184            })
185        }
186
187        ///
188        /// Add a task to the processing queue. The idea is that you can queue a processor function
189        /// and list of args that will be picked up by one of the threads for processing.
190        ///
191        /// This is the async counterpart
192        ///
193        pub async fn add_task_async<F>(&mut self, task: F) -> TaskID
194        where
195            F: Future<Output = R> + Send + Sync + 'static,
196            F::Output: Send + 'static,
197        {
198            let id = TaskID::new_v4();
199            Self::_add_task_async(id.clone(), self.tasks.clone(), task).await
200        }
201
        ///
        /// See [`Self::add_task_async`]
        ///
        /// Unlike `add_task`, this method does not block, which is key to avoiding panicking
        /// the tokio runtime if trying to add a task to the queue from a normal function called
        /// from an async environment.
        ///
        /// ## Example
        ///
        /// ```
        /// use rumtk_core::threading::threading_manager::{TaskManager};
        /// use rumtk_core::{rumtk_init_threads, strings::rumtk_format};
        /// use std::sync::{Arc};
        /// use tokio::sync::Mutex;
        ///
        /// type JobManager = Arc<Mutex<TaskManager<usize>>>;
        ///
        /// async fn called_fn() -> usize {
        ///     5
        /// }
        ///
        /// fn push_job(manager: &mut TaskManager<usize>) -> usize {
        ///     manager.spawn_task(called_fn());
        ///     1
        /// }
        ///
        /// async fn call_sync_fn(mut manager: JobManager) -> usize {
        ///     let mut owned = manager.lock().await;
        ///     push_job(&mut owned)
        /// }
        ///
        /// let workers = 5;
        /// let mut manager = Arc::new(Mutex::new(TaskManager::new(&workers).unwrap()));
        ///
        /// manager.blocking_lock().spawn_task(call_sync_fn(manager.clone()));
        ///
        /// let result_raw = manager.blocking_lock().wait();
        ///
        /// ```
        ///
        pub fn spawn_task<F>(&mut self, task: F) -> RUMResult<TaskID>
        where
            F: Future<Output = R> + Send + Sync + 'static,
            F::Output: Send + Sized + 'static,
        {
            let id = TaskID::new_v4();
            // Lazily initialize (or fetch) the shared runtime sized to `self.workers`.
            let rt = rumtk_init_threads!(self.workers);
            // Fire-and-forget: registration happens on the runtime, so this never blocks.
            rumtk_spawn_task!(
                rt,
                Self::_add_task_async(id.clone(), self.tasks.clone(), task)
            );
            Ok(id)
        }
255
256        ///
257        /// See [add_task_async](Self::add_task_async)
258        ///
259        pub fn add_task<F>(&mut self, task: F) -> RUMResult<TaskID>
260        where
261            F: Future<Output = R> + Send + Sync + 'static,
262            F::Output: Send + Sized + 'static,
263        {
264            let id = TaskID::new_v4();
265            let tasks = self.tasks.clone();
266            Ok(rumtk_resolve_task!(Self::_add_task_async(id.clone(), tasks, task)))
267        }
268
269        async fn _add_task_async<F>(id: TaskID, tasks: SafeSyncTaskTable<R>, task: F) -> TaskID
270        where
271            F: Future<Output = R> + Send + Sync + 'static,
272            F::Output: Send + Sized + 'static,
273        {
274            let mut safe_task = Arc::new(SyncRwLock::new(Task::<R> {
275                id: id.clone(),
276                finished: false,
277                result: None,
278            }));
279            tasks.write().unwrap().insert(id.clone(), safe_task.clone());
280
281            let task_wrapper = async move || {
282                // Run the task
283                let result = task.await;
284
285                // Cleanup task
286                let mut lock = safe_task.write().unwrap();
287                lock.result = Some(result);
288                lock.finished = true;
289            };
290
291            tokio::spawn(task_wrapper());
292
293            id
294        }
295
296        ///
297        /// See [wait_async](Self::wait_async)
298        ///
299        /// Duplicated here because we can't request the tokio runtime to do a quick exec for us if
300        /// this function happens to be called from the async context.
301        ///
302        pub fn wait(&mut self) -> TaskResults<R> {
303            let task_batch = self
304                .tasks
305                .read()
306                .unwrap()
307                .keys()
308                .cloned()
309                .collect::<Vec<_>>();
310            self.wait_on_batch(&task_batch)
311        }
312
313        ///
314        /// See [wait_on_batch_async](Self::wait_on_batch_async)
315        ///
316        /// Duplicated here because we can't request the tokio runtime to do a quick exec for us if
317        /// this function happens to be called from the async context.
318        ///
319        pub fn wait_on_batch(&mut self, tasks: &TaskBatch) -> TaskResults<R> {
320            let mut results = TaskResults::<R>::default();
321            for task in tasks {
322                results.push(self.wait_on(task));
323            }
324            results
325        }
326
327        ///
328        /// See [wait_on_async](Self::wait_on_async)
329        ///
330        /// Duplicated here because we can't request the tokio runtime to do a quick exec for us if
331        /// this function happens to be called from the async context.
332        ///
333        pub fn wait_on(&mut self, task_id: &TaskID) -> TaskResult<R> {
334            let task = match self.tasks.write().unwrap().remove(task_id) {
335                Some(task) => task.clone(),
336                None => return Err(rumtk_format!("No task with id {}", task_id)),
337            };
338
339            while !task.read().unwrap().finished {
340                sleep(DEFAULT_SLEEP_DURATION);
341            }
342
343            let x = Ok(Arc::new(task.write().unwrap().clone()));
344            x
345        }
346
347        ///
348        /// This method waits until a queued task with [TaskID](TaskID) has been processed from the main queue.
349        ///
350        /// We poll the status of the task every [DEFAULT_SLEEP_DURATION](DEFAULT_SLEEP_DURATION) ms.
351        ///
352        /// Upon completion,
353        ///
354        /// 2. Return the result ([TaskResults<R>](TaskResults)).
355        ///
356        /// This operation consumes the task.
357        ///
358        /// ### Note:
359        /// ```text
360        ///     Results returned here are not guaranteed to be in the same order as the order in which
361        ///     the tasks were queued for work. You will need to pass a type as T that automatically
362        ///     tracks its own id or has a way for you to resort results.
363        /// ```
364        pub async fn wait_on_async(&mut self, task_id: &TaskID) -> TaskResult<R> {
365            let task = match self.tasks.write().unwrap().remove(task_id) {
366                Some(task) => task.clone(),
367                None => return Err(rumtk_format!("No task with id {}", task_id)),
368            };
369
370            while !task.read().unwrap().finished {
371                async_sleep(DEFAULT_SLEEP_DURATION).await;
372            }
373
374            let x = Ok(Arc::new(task.write().unwrap().clone()));
375            x
376        }
377
378        ///
379        /// This method waits until a set of queued tasks with [TaskID](TaskID) has been processed from the main queue.
380        ///
381        /// We poll the status of the task every [DEFAULT_SLEEP_DURATION](DEFAULT_SLEEP_DURATION) ms.
382        ///
383        /// Upon completion,
384        ///
385        /// 1. We collect the results generated (if any).
386        /// 2. Return the list of results ([TaskResults<R>](TaskResults)).
387        ///
388        /// ### Note:
389        /// ```text
390        ///     Results returned here are not guaranteed to be in the same order as the order in which
391        ///     the tasks were queued for work. You will need to pass a type as T that automatically
392        ///     tracks its own id or has a way for you to resort results.
393        /// ```
394        pub async fn wait_on_batch_async(&mut self, tasks: &TaskBatch) -> TaskResults<R> {
395            let mut results = TaskResults::<R>::default();
396            for task in tasks {
397                results.push(self.wait_on_async(task).await);
398            }
399            results
400        }
401
402        ///
403        /// This method waits until all queued tasks have been processed from the main queue.
404        ///
405        /// We poll the status of the main queue every [DEFAULT_SLEEP_DURATION](DEFAULT_SLEEP_DURATION) ms.
406        ///
407        /// Upon completion,
408        ///
409        /// 1. We collect the results generated (if any).
410        /// 2. We reset the main task and result internal queue states.
411        /// 3. Return the list of results ([TaskResults<R>](TaskResults)).
412        ///
413        /// This operation consumes all the tasks.
414        ///
415        /// ### Note:
416        /// ```text
417        ///     Results returned here are not guaranteed to be in the same order as the order in which
418        ///     the tasks were queued for work. You will need to pass a type as T that automatically
419        ///     tracks its own id or has a way for you to resort results.
420        /// ```
421        pub async fn wait_async(&mut self) -> TaskResults<R> {
422            let task_batch = self
423                .tasks
424                .read()
425                .unwrap()
426                .keys()
427                .cloned()
428                .collect::<Vec<_>>();
429            self.wait_on_batch_async(&task_batch).await
430        }
431
        ///
        /// Check if all work has been completed from the task queue.
        ///
        /// Delegates to the shared synchronous scan in [`Self::_is_all_completed_async`];
        /// returns `true` for an empty table.
        ///
        /// ## Examples
        ///
        /// ### Sync Usage
        ///
        ///```
        /// use rumtk_core::threading::threading_manager::TaskManager;
        ///
        /// let manager = TaskManager::<usize>::new(&4).unwrap();
        ///
        /// let all_done = manager.is_all_completed();
        ///
        /// assert_eq!(all_done, true, "Empty TaskManager reports tasks are not completed!");
        ///
        /// ```
        ///
        pub fn is_all_completed(&self) -> bool {
            self._is_all_completed_async()
        }
453
        /// Async counterpart of [`Self::is_all_completed`]. The check itself is synchronous
        /// and non-blocking, so it is safe to call from async contexts.
        pub async fn is_all_completed_async(&self) -> bool {
            self._is_all_completed_async()
        }
457
458        fn _is_all_completed_async(&self) -> bool {
459            for (_, task) in self.tasks.read().unwrap().iter() {
460                if !task.read().unwrap().finished {
461                    return false;
462                }
463            }
464
465            true
466        }
467
        ///
        /// Check if a task completed
        ///
        /// Returns `false` when the id is unknown (e.g. the task was already consumed by a
        /// `wait*` call, or was never queued).
        /// NOTE(review): the async twin `is_finished_async` returns `true` for an unknown id —
        /// confirm which behavior is intended and align the two.
        pub fn is_finished(&self, id: &TaskID) -> bool {
            match self.tasks.read().unwrap().get(id) {
                Some(t) => t.read().unwrap().finished,
                None => false,
            }
        }
477
        /// Async counterpart of [`Self::is_finished`]; the lookup itself is synchronous.
        ///
        /// NOTE(review): this returns `true` for an unknown id (perhaps meaning "already
        /// consumed"), while the sync `is_finished` returns `false` — confirm which behavior
        /// is intended and align the two.
        pub async fn is_finished_async(&self, id: &TaskID) -> bool {
            match self.tasks.read().unwrap().get(id) {
                Some(task) => task.read().unwrap().finished,
                None => true,
            }
        }
484
        ///
        /// Alias for [wait](TaskManager::wait).
        ///
        /// Consumes and resolves every queued task, returning their results.
        fn gather(&mut self) -> TaskResults<R> {
            self.wait()
        }
491    }
492}
493
494///
/// This module contains a few helpers.
496///
497/// For example, you can find a function for determining number of threads available in system.
498/// The sleep family of functions are also here.
499///
500pub mod threading_functions {
501    use crate::core::RUMResult;
502    use crate::net::tcp::{AsyncOwnedRwLockReadGuard, AsyncOwnedRwLockWriteGuard, SafeLockReadGuard, SafeLockWriteGuard, SafeTokioRuntime};
503    use crate::threading::thread_primitives::{AsyncRwLock, SafeLock};
504    use num_cpus;
505    use std::future::Future;
506    use std::sync::Arc;
507    use std::thread::{available_parallelism, sleep as std_sleep};
508    use std::time::Duration;
509    use tokio::runtime::Runtime;
510    use tokio::time::sleep as tokio_sleep;
511    /**************************** Globals **************************************/
512    static mut DEFAULT_RUNTIME: SafeTokioRuntime = SafeTokioRuntime::new();
513
514    pub const NANOS_PER_SEC: u64 = 1000000000;
515    pub const MILLIS_PER_SEC: u64 = 1000;
516    pub const MICROS_PER_SEC: u64 = 1000000;
517    const DEFAULT_SLEEP_DURATION: f32 = 0.001;
518    /**************************** Helpers **************************************/
519    pub fn init_runtime<'a>(workers: usize) -> &'a Runtime {
520        unsafe {
521            let runtime = DEFAULT_RUNTIME.get_or_init(|| {
522                let mut builder = tokio::runtime::Builder::new_multi_thread();
523                builder.worker_threads(workers);
524                builder.enable_all();
525                match builder.build() {
526                    Ok(handle) => handle,
527                    Err(e) => panic!(
528                        "Unable to initialize threading tokio runtime because {}!",
529                        &e
530                    ),
531                }
532            });
533            runtime
534        }
535    }
536
537    pub fn get_default_system_thread_count() -> usize {
538        let cpus: usize = num_cpus::get();
539        let parallelism = match available_parallelism() {
540            Ok(n) => n.get(),
541            Err(_) => 0,
542        };
543
544        if parallelism >= cpus {
545            parallelism
546        } else {
547            cpus
548        }
549    }
550
551    pub fn sleep(s: f32) {
552        let ns = s * NANOS_PER_SEC as f32;
553        let rounded_ns = ns.round() as u64;
554        let duration = Duration::from_nanos(rounded_ns);
555        std_sleep(duration);
556    }
557
558    pub async fn async_sleep(s: f32) {
559        let ns = s * NANOS_PER_SEC as f32;
560        let rounded_ns = ns.round() as u64;
561        let duration = Duration::from_nanos(rounded_ns);
562        tokio_sleep(duration).await;
563    }
564
    ///
    /// Given a closure task, push it onto the current `tokio` runtime for execution.
    /// Every [DEFAULT_SLEEP_DURATION] seconds, we check if the task has concluded.
    /// Once the task has concluded, we call [tokio::block_on](tokio::task::block_in_place) to resolve and extract the task
    /// result.
    ///
    /// Because this helper function can fail, the return value is wrapped inside a [RUMResult].
    ///
    /// ## Example
    ///
    /// ```
    /// use rumtk_core::threading::threading_functions::{init_runtime, block_on_task};
    ///
    /// const Hello: &str = "World!";
    ///
    /// init_runtime(5);
    ///
    /// let result = block_on_task(async {
    ///     Hello
    /// });
    ///
    /// assert_eq!(Hello, result, "Result mismatches expected! {} vs. {}", Hello, result);
    /// ```
    ///
    pub fn block_on_task<R, F>(task: F) -> R
    where
        F: Future<Output = R> + Send + 'static,
        F::Output: Send + 'static,
    {
        let rt = init_runtime(get_default_system_thread_count());
        // You need to wrap our call to block_on with a call to tokio::task::block_in_place to force
        // cleanup of async executor and therefore avoid panics from the tokio runtime!
        // Per Tokio's documentation, spawn_blocking would be better since it moves the task to an
        // executor meant for blocking tasks instead of moving tasks out of the current thread and
        // converting the thread into a blocking executor. The reason we don't do that is because
        // the call to this function expects to block the current thread until completion and then
        // return the result. If there's an issue with IO, revisit this function.
        //
        // https://docs.rs/tokio/latest/tokio/task/fn.block_in_place.html
        // https://docs.rs/tokio/latest/tokio/runtime/struct.Runtime.html#method.block_on
        // https://docs.rs/tokio/latest/tokio/runtime/struct.Handle.html#method.spawn_blocking
        tokio::task::block_in_place(move || {
            rt.block_on(task)
        })
    }
611
    /// Wrap `data` in a shareable async read/write lock ([`SafeLock`]).
    pub fn new_lock<T>(data: T) -> SafeLock<T> {
        Arc::new(AsyncRwLock::new(data))
    }
615
    ///
    /// This function gives you read access to underlying structure.
    ///
    /// Helper function for executing a microtask immediately after acquiring the read lock.
    /// (The lock is an async RwLock, acquired via a blocking bridge — see [lock_read].)
    /// This function should be used in situations in which you want to minimize the risk of
    /// Time of Check Time of Use security bugs.
    ///
    /// ## Example
    /// ```
    /// use rumtk_core::core::RUMResult;
    /// use rumtk_core::threading::thread_primitives::SafeLock;
    /// use rumtk_core::threading::threading_functions::{new_lock, process_read_critical_section};
    ///
    /// let data = 5;
    /// let lock = new_lock(data.clone());
    /// let result = process_read_critical_section(lock, |guard| -> RUMResult<i32> {
    ///     Ok(*guard)
    /// }).unwrap();
    ///
    /// assert_eq!(result, data, "Failed to execute critical section through which we retrieve the locked data!");
    /// ```
    ///
    pub fn process_read_critical_section<T, R, F>(
        lock: SafeLock<T>,
        critical_section: F,
    ) -> R
    where
        F: Fn(SafeLockReadGuard<T>) -> R, T: Send + Sync + 'static,
    {
        // block_in_place keeps the blocking lock acquisition from starving the runtime.
        tokio::task::block_in_place(move || {
            let read_guard = lock_read(lock);
            critical_section(read_guard)
        })
    }
650
    ///
    /// This function gives you write access to underlying structure.
    ///
    /// Helper function for executing a microtask immediately after acquiring the write lock.
    /// (The lock is an async RwLock, acquired via a blocking bridge — see [lock_write].)
    /// This function should be used in situations in which you want to minimize the risk of
    /// Time of Check Time of Use security bugs.
    ///
    /// ## Example
    /// ```
    /// use rumtk_core::core::RUMResult;
    /// use rumtk_core::threading::thread_primitives::SafeLock;
    /// use rumtk_core::threading::threading_functions::{new_lock, process_write_critical_section};
    ///
    /// let data = 5;
    /// let lock = new_lock(data.clone());
    /// let new_data = 10;
    /// let result = process_write_critical_section(lock, |mut guard| -> RUMResult<i32> {
    ///     *guard = new_data;
    ///     Ok(*guard)
    /// }).unwrap();
    ///
    /// assert_eq!(result, new_data, "Failed to execute critical section through which we modify the locked data!");
    /// ```
    ///
    pub fn process_write_critical_section<T, R, F>(
        lock: SafeLock<T>,
        critical_section: F,
    ) -> R
    where
        F: Fn(SafeLockWriteGuard<T>) -> R, T: Send + Sync + 'static,
    {
        // block_in_place keeps the blocking lock acquisition from starving the runtime.
        tokio::task::block_in_place(move || {
            let write_guard = lock_write(lock);
            critical_section(write_guard)
        })
    }
687
    ///
    /// Obtain an owned read guard to the async lock such that you have a more ergonomic
    /// interface to locked data. Blocks the calling thread until the guard is acquired.
    ///
    /// It is preferable to use [process_read_critical_section] when you must process
    /// critical logic that is sensitive to time of check time of use security bugs!
    ///
    /// ## Example
    /// ```
    /// use rumtk_core::threading::thread_primitives::SafeLock;
    /// use rumtk_core::threading::threading_functions::{new_lock, lock_read};
    ///
    /// let data = 5;
    /// let lock = new_lock(data.clone());
    /// let result = *lock_read(lock);
    ///
    /// assert_eq!(result, data, "Failed to access the locked data!");
    /// ```
    ///
    pub fn lock_read<T: Send + Sync + 'static>(lock: SafeLock<T>) -> AsyncOwnedRwLockReadGuard<T> {
        // Bridge sync -> async: resolve the owned-guard future on the shared runtime.
        block_on_task(async move {
            lock.read_owned().await
        })
    }
712
    ///
    /// Obtain an owned write guard to the async lock such that you have a more ergonomic
    /// interface to locked data. Blocks the calling thread until the guard is acquired.
    ///
    /// It is preferable to use [process_write_critical_section] when you must process
    /// critical logic that is sensitive to time of check time of use security bugs!
    ///
    /// ## Example
    /// ```
    /// use rumtk_core::threading::thread_primitives::SafeLock;
    /// use rumtk_core::threading::threading_functions::{new_lock, lock_read, lock_write};
    ///
    /// let data = 5;
    /// let lock = new_lock(data.clone());
    /// let new_data = 10;
    ///
    /// *lock_write(lock.clone()) = new_data;
    ///
    /// let result = *lock_read(lock);
    ///
    /// assert_eq!(result, new_data, "Failed to modify the locked data!");
    /// ```
    ///
    pub fn lock_write<T: Send + Sync + 'static>(lock: SafeLock<T>) -> AsyncOwnedRwLockWriteGuard<T> {
        // Bridge sync -> async: resolve the owned-guard future on the shared runtime.
        block_on_task(async move {
            lock.write_owned().await
        })
    }
741}
742
743///
744/// Main API for interacting with the threading back end. Remember, we use tokio as our executor.
745/// This means that by default, all jobs sent to the thread pool have to be async in nature.
746/// These macros make handling of these jobs at the sync/async boundary more convenient.
747///
748pub mod threading_macros {
749    use crate::threading::thread_primitives;
750    use crate::threading::threading_manager::SafeTaskArgs;
751
752    ///
753    /// First, let's make sure we have *tokio* initialized at least once. The runtime created here
754    /// will be saved to the global context so the next call to this macro will simply grab a
755    /// reference to the previously initialized runtime.
756    ///
757    /// Passing nothing will default to initializing a runtime using the default number of threads
758    /// for this system. This is typically equivalent to number of cores/threads for your CPU.
759    ///
760    /// Passing `threads` number will yield a runtime that allocates that many threads.
761    ///
762    ///
763    /// ## Examples
764    ///
765    /// ```
766    ///     use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_create_task_args, rumtk_create_task, rumtk_spawn_task};
767    ///     use rumtk_core::core::RUMResult;
768    ///     use rumtk_core::threading::threading_manager::SafeTaskArgs;
769    ///
770    ///     async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
771    ///         let mut result = Vec::<i32>::new();
772    ///         for arg in args.read().await.iter() {
773    ///             result.push(*arg);
774    ///         }
775    ///         Ok(result)
776    ///     }
777    ///
778    ///     let args = rumtk_create_task_args!(1);                               // Creates a vector of i32s
779    ///     let task = rumtk_create_task!(test, args);                           // Creates a standard task which consists of a function or closure accepting a Vec<T>
780    ///     let result = rumtk_resolve_task!(task); // Spawn's task and waits for it to conclude.
781    /// ```
782    ///
783    /// ```
784    ///     use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_create_task_args, rumtk_create_task, rumtk_spawn_task};
785    ///     use rumtk_core::core::RUMResult;
786    ///     use rumtk_core::threading::threading_manager::SafeTaskArgs;
787    ///
788    ///     async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
789    ///         let mut result = Vec::<i32>::new();
790    ///         for arg in args.read().await.iter() {
791    ///             result.push(*arg);
792    ///         }
793    ///         Ok(result)
794    ///     }
795    ///
796    ///     let thread_count: usize = 10;
797    ///     let args = rumtk_create_task_args!(1);
798    ///     let task = rumtk_create_task!(test, args);
799    ///     let result = rumtk_resolve_task!(task);
800    /// ```
801    #[macro_export]
802    macro_rules! rumtk_init_threads {
803        ( ) => {{
804            use $crate::threading::threading_functions::{
805                get_default_system_thread_count, init_runtime,
806            };
807            init_runtime(get_default_system_thread_count())
808        }};
809        ( $threads:expr ) => {{
810            use $crate::rumtk_cache_fetch;
811            use $crate::threading::threading_functions::init_runtime;
812            init_runtime($threads)
813        }};
814    }
815
816    ///
817    /// Puts task onto the runtime queue.
818    ///
819    /// The parameters to this macro are a reference to the runtime (`rt`) and a future (`func`).
820    ///
821    /// The return is a [thread_primitives::JoinHandle<T>] instance. If the task was a standard
822    /// framework task, you will get [thread_primitives::AsyncTaskHandle] instead.
823    ///
824    #[macro_export]
825    macro_rules! rumtk_spawn_task {
826        ( $func:expr ) => {{
827            use $crate::rumtk_init_threads;
828            let rt = rumtk_init_threads!().expect("Runtime is not initialized!");
829            rt.spawn($func)
830        }};
831        ( $rt:expr, $func:expr ) => {{
832            $rt.spawn($func)
833        }};
834    }
835
836    ///
837    /// Using the initialized runtime, wait for the future to resolve in a thread blocking manner!
838    ///
839    /// If you pass a reference to the runtime (`rt`) and an async closure (`func`), we await the
840    /// async closure without passing any arguments.
841    ///
842    /// You can pass a third argument to this macro in the form of any number of arguments (`arg_item`).
843    /// In such a case, we pass those arguments to the call on the async closure and await on results.
844    ///
845    #[macro_export]
846    macro_rules! rumtk_wait_on_task {
847        ( $func:expr ) => {{
848            use $crate::threading::threading_functions::block_on_task;
849            block_on_task(async move {
850                $func().await
851            })
852        }};
853        ( $func:expr, $($arg_items:expr),+ ) => {{
854            use $crate::threading::threading_functions::block_on_task;
855            block_on_task(async move {
856                $func($($arg_items),+).await
857            })
858        }};
859    }
860
861    ///
862    /// This macro awaits a future.
863    ///
864    /// The arguments are a reference to the runtime (`rt) and a future.
865    ///
866    /// If there is a result, you will get the result of the future.
867    ///
868    /// ## Examples
869    ///
870    /// ```
871    ///     use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_create_task_args, rumtk_create_task, rumtk_spawn_task};
872    ///     use rumtk_core::core::RUMResult;
873    ///     use rumtk_core::threading::threading_manager::SafeTaskArgs;
874    ///
875    ///     async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
876    ///         let mut result = Vec::<i32>::new();
877    ///         for arg in args.read().await.iter() {
878    ///             result.push(*arg);
879    ///         }
880    ///         Ok(result)
881    ///     }
882    ///
883    ///     let args = rumtk_create_task_args!(1);
884    ///     let task = rumtk_create_task!(test, args);
885    ///     let result = rumtk_resolve_task!(task);
886    /// ```
887    ///
888    #[macro_export]
889    macro_rules! rumtk_resolve_task {
890        ( $future:expr ) => {{
891            use $crate::threading::threading_functions::block_on_task;
892            // Fun tidbit, the expression rumtk_resolve_task!(&rt, rumtk_spawn_task!(&rt, task)), where
893            // rt is the tokio runtime yields async move { { &rt.spawn(task) } }. However, the whole thing
894            // is technically moved into the async closure and captured so things like mutex guards
895            // technically go out of the outer scope. As a result that expression fails to compile even
896            // though the intent is for rumtk_spawn_task to resolve first and its result get moved
897            // into the async closure. To ensure that happens regardless of given expression, we do
898            // a variable assignment below to force the "future" macro expressions to resolve before
899            // moving into the closure. DO NOT REMOVE OR "SIMPLIFY" THE let future = $future LINE!!!
900            //let future = $future;
901            block_on_task(async move { $future.await })
902        }};
903    }
904
905    #[macro_export]
906    macro_rules! rumtk_resolve_task_from_async {
907        ( $rt:expr, $future:expr ) => {{
908            let handle = $rt.spawn_blocking(async move { future.await })
909        }};
910    }
911
    ///
    /// This macro creates an async body that calls the async closure and awaits it.
    ///
    /// Passing only a function/closure (`func`) yields a future that calls it with no arguments.
    /// Passing a second expression (`args`) yields a future that calls `func` with a reference to
    /// `args`; note that `args` itself is moved into the returned future.
    ///
    /// ## Example
    ///
    /// ```
    /// use rumtk_core::{rumtk_create_task, rumtk_create_task_args, rumtk_resolve_task};
    /// use rumtk_core::core::RUMResult;
    /// use rumtk_core::threading::threading_manager::SafeTaskArgs;
    ///
    /// async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
    ///     let mut result = Vec::<i32>::new();
    ///     for arg in args.read().await.iter() {
    ///         result.push(*arg);
    ///     }
    ///     Ok(result)
    /// }
    ///
    /// let args = rumtk_create_task_args!(1);
    /// let task = rumtk_create_task!(test, args); // Future that calls `test(&args)` when awaited.
    /// let result = rumtk_resolve_task!(task);
    /// ```
    ///
    #[macro_export]
    macro_rules! rumtk_create_task {
        ( $func:expr ) => {{
            // No-argument form: the closure is bound inside the future and called when polled.
            async move {
                let f = $func;
                f().await
            }
        }};
        ( $func:expr, $args:expr ) => {{
            // With-arguments form: `f` is bound eagerly; `$args` is captured by the future and
            // passed by reference on each call.
            let f = $func;
            async move { f(&$args).await }
        }};
    }
950
951    ///
952    /// Creates an instance of [SafeTaskArgs](SafeTaskArgs) with the arguments passed.
953    ///
954    /// ## Note
955    ///
956    /// All arguments must be of the same type
957    ///
958    #[macro_export]
959    macro_rules! rumtk_create_task_args {
960        ( ) => {{
961            use $crate::threading::threading_manager::{TaskArgs, SafeTaskArgs, TaskItems};
962            use $crate::threading::thread_primitives::AsyncRwLock;
963            SafeTaskArgs::new(AsyncRwLock::new(vec![]))
964        }};
965        ( $($args:expr),+ ) => {{
966            use $crate::threading::threading_manager::{SafeTaskArgs};
967            use $crate::threading::thread_primitives::AsyncRwLock;
968            SafeTaskArgs::new(AsyncRwLock::new(vec![$($args),+]))
969        }};
970    }
971
972    ///
973    /// Convenience macro for packaging the task components and launching the task in one line.
974    ///
975    /// One of the advantages is that you can generate a new `tokio` runtime by specifying the
976    /// number of threads at the end. This is optional. Meaning, we will default to the system's
977    /// number of threads if that value is not specified.
978    ///
979    /// Between the `func` parameter and the optional `threads` parameter, you can specify a
980    /// variable number of arguments to pass to the task. each argument must be of the same type.
981    /// If you wish to pass different arguments with different types, please define an abstract type
982    /// whose underlying structure is a tuple of items and pass that instead.
983    ///
984    /// ## Examples
985    ///
986    /// ### With Default Thread Count
987    /// ```
988    ///     use rumtk_core::{rumtk_exec_task};
989    ///     use rumtk_core::core::RUMResult;
990    ///     use rumtk_core::threading::threading_manager::SafeTaskArgs;
991    ///
992    ///     async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
993    ///         let mut result = Vec::<i32>::new();
994    ///         for arg in args.read().await.iter() {
995    ///             result.push(*arg);
996    ///         }
997    ///         Ok(result)
998    ///     }
999    ///
1000    ///     let result = rumtk_exec_task!(test, vec![5]).unwrap();
1001    ///     assert_eq!(&result.clone(), &vec![5], "Results mismatch");
1002    ///     assert_ne!(&result.clone(), &vec![5, 10], "Results do not mismatch as expected!");
1003    /// ```
1004    ///
1005    /// ### With Custom Thread Count
1006    /// ```
1007    ///     use rumtk_core::{rumtk_exec_task};
1008    ///     use rumtk_core::core::RUMResult;
1009    ///     use rumtk_core::threading::threading_manager::SafeTaskArgs;
1010    ///
1011    ///     async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
1012    ///         let mut result = Vec::<i32>::new();
1013    ///         for arg in args.read().await.iter() {
1014    ///             result.push(*arg);
1015    ///         }
1016    ///         Ok(result)
1017    ///     }
1018    ///
1019    ///     let result = rumtk_exec_task!(test, vec![5], 5).unwrap();
1020    ///     assert_eq!(&result.clone(), &vec![5], "Results mismatch");
1021    ///     assert_ne!(&result.clone(), &vec![5, 10], "Results do not mismatch as expected!");
1022    /// ```
1023    ///
1024    /// ### With Async Function Body
1025    /// ```
1026    ///     use rumtk_core::{rumtk_exec_task};
1027    ///     use rumtk_core::core::RUMResult;
1028    ///     use rumtk_core::threading::threading_manager::SafeTaskArgs;
1029    ///
1030    ///     let result = rumtk_exec_task!(
1031    ///     async move |args: &SafeTaskArgs<i32>| -> RUMResult<Vec<i32>> {
1032    ///         let mut result = Vec::<i32>::new();
1033    ///         for arg in args.read().await.iter() {
1034    ///             result.push(*arg);
1035    ///         }
1036    ///         Ok(result)
1037    ///     },
1038    ///     vec![5]).unwrap();
1039    ///     assert_eq!(&result.clone(), &vec![5], "Results mismatch");
1040    ///     assert_ne!(&result.clone(), &vec![5, 10], "Results do not mismatch as expected!");
1041    /// ```
1042    ///
1043    /// ### With Async Function Body and No Args
1044    /// ```
1045    ///     use rumtk_core::{rumtk_exec_task};
1046    ///     use rumtk_core::core::RUMResult;
1047    ///     use rumtk_core::threading::threading_manager::SafeTaskArgs;
1048    ///
1049    ///     let result = rumtk_exec_task!(
1050    ///     async || -> RUMResult<Vec<i32>> {
1051    ///         let mut result = Vec::<i32>::new();
1052    ///         Ok(result)
1053    ///     }).unwrap();
1054    ///     let empty = Vec::<i32>::new();
1055    ///     assert_eq!(&result.clone(), &empty, "Results mismatch");
1056    ///     assert_ne!(&result.clone(), &vec![5, 10], "Results do not mismatch as expected!");
1057    /// ```
1058    ///
1059    /// ## Equivalent To
1060    ///
1061    /// ```no_run
1062    ///     use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_create_task_args, rumtk_create_task, rumtk_spawn_task};
1063    ///     use rumtk_core::core::RUMResult;
1064    ///     use rumtk_core::threading::threading_manager::SafeTaskArgs;
1065    ///
1066    ///     async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
1067    ///         let mut result = Vec::<i32>::new();
1068    ///         for arg in args.read().await.iter() {
1069    ///             result.push(*arg);
1070    ///         }
1071    ///         Ok(result)
1072    ///     }
1073    ///
1074    ///     let args = rumtk_create_task_args!(1);
1075    ///     let task = rumtk_create_task!(test, args);
1076    ///     let result = rumtk_resolve_task!(task);
1077    /// ```
1078    ///
1079    #[macro_export]
1080    macro_rules! rumtk_exec_task {
1081        ($func:expr ) => {{
1082            use $crate::{
1083                rumtk_create_task, rumtk_create_task_args, rumtk_init_threads, rumtk_resolve_task,
1084            };
1085            let task = rumtk_create_task!($func);
1086            rumtk_resolve_task!(task)
1087        }};
1088        ($func:expr, $args:expr ) => {{
1089            use $crate::threading::threading_functions::get_default_system_thread_count;
1090            rumtk_exec_task!($func, $args, get_default_system_thread_count())
1091        }};
1092        ($func:expr, $args:expr , $threads:expr ) => {{
1093            use $crate::threading::thread_primitives::AsyncRwLock;
1094            use $crate::{
1095                rumtk_create_task, rumtk_create_task_args, rumtk_init_threads, rumtk_resolve_task,
1096            };
1097            let args = SafeTaskArgs::new(AsyncRwLock::new($args));
1098            let task = rumtk_create_task!($func, args);
1099            rumtk_resolve_task!(task)
1100        }};
1101    }
1102
1103    ///
1104    /// Sleep a duration of time in a sync context, so no await can be call on the result.
1105    ///
1106    /// You can pass any value that can be cast to f32.
1107    ///
1108    /// The precision is up to nanoseconds and it is depicted by the number of decimal places.
1109    ///
1110    /// ## Examples
1111    ///
1112    /// ```
1113    ///     use rumtk_core::rumtk_sleep;
1114    ///     rumtk_sleep!(1);           // Sleeps for 1 second.
1115    ///     rumtk_sleep!(0.001);       // Sleeps for 1 millisecond
1116    ///     rumtk_sleep!(0.000001);    // Sleeps for 1 microsecond
1117    ///     rumtk_sleep!(0.000000001); // Sleeps for 1 nanosecond
1118    /// ```
1119    ///
1120    #[macro_export]
1121    macro_rules! rumtk_sleep {
1122        ( $dur:expr) => {{
1123            use $crate::threading::threading_functions::sleep;
1124            sleep($dur as f32)
1125        }};
1126    }
1127
1128    ///
1129    /// Sleep for some duration of time in an async context. Meaning, we can be awaited.
1130    ///
1131    /// You can pass any value that can be cast to f32.
1132    ///
1133    /// The precision is up to nanoseconds and it is depicted by the number of decimal places.
1134    ///
1135    /// ## Examples
1136    ///
1137    /// ```
1138    ///     use rumtk_core::{rumtk_async_sleep, rumtk_exec_task};
1139    ///     use rumtk_core::core::RUMResult;
1140    ///     rumtk_exec_task!( async || -> RUMResult<()> {
1141    ///             rumtk_async_sleep!(1).await;           // Sleeps for 1 second.
1142    ///             rumtk_async_sleep!(0.001).await;       // Sleeps for 1 millisecond
1143    ///             rumtk_async_sleep!(0.000001).await;    // Sleeps for 1 microsecond
1144    ///             rumtk_async_sleep!(0.000000001).await; // Sleeps for 1 nanosecond
1145    ///             Ok(())
1146    ///         }
1147    ///     );
1148    /// ```
1149    ///
1150    #[macro_export]
1151    macro_rules! rumtk_async_sleep {
1152        ( $dur:expr) => {{
1153            use $crate::threading::threading_functions::async_sleep;
1154            async_sleep($dur as f32)
1155        }};
1156    }
1157
1158    ///
1159    ///
1160    ///
1161    #[macro_export]
1162    macro_rules! rumtk_new_task_queue {
1163        ( $worker_num:expr ) => {{
1164            use $crate::threading::threading_manager::TaskManager;
1165            TaskManager::new($worker_num);
1166        }};
1167    }
1168
1169    ///
1170    /// Creates a new safe lock to guard the given data. This interface was created to cleanup lock
1171    /// management for consumers of framework!
1172    ///
1173    /// ## Example
1174    /// ```
1175    /// use rumtk_core::{rumtk_new_lock};
1176    ///
1177    /// let data = 5;
1178    /// let lock = rumtk_new_lock!(data);
1179    /// ```
1180    ///
1181    #[macro_export]
1182    macro_rules! rumtk_new_lock {
1183        ( $data:expr ) => {{
1184            use $crate::threading::threading_functions::new_lock;
1185            new_lock($data)
1186        }};
1187    }
1188
1189    ///
1190    /// Using a standard spin lock [SafeLock](thread_primitives::SafeLock), lock it and execute the
1191    /// critical section. The critical section itself is a synchronous function or closure. In this case,
1192    /// the critical section simply retrieves a value from a guarded dataset.
1193    ///
1194    /// ## Example
1195    /// ```
1196    /// use rumtk_core::core::RUMResult;
1197    /// use rumtk_core::{rumtk_new_lock, rumtk_critical_section_read};
1198    ///
1199    /// let data = 5;
1200    /// let lock = rumtk_new_lock!(data);
1201    /// let result = rumtk_critical_section_read!(
1202    ///     lock,
1203    ///     |guard| -> RUMResult<i32> {
1204    ///         let result: i32 = *guard;
1205    ///         Ok(result)
1206    ///     }
1207    /// ).expect("No errors locking!");
1208    ///
1209    /// assert_eq!(result, data, "Critical section yielded invalid result!");
1210    /// ```
1211    ///
1212    #[macro_export]
1213    macro_rules! rumtk_critical_section_read {
1214        ( $lock:expr, $function:expr ) => {{
1215            use $crate::threading::threading_functions::process_read_critical_section;
1216            process_read_critical_section($lock, $function)
1217        }};
1218    }
1219
1220    ///
1221    /// Using a standard spin lock [SafeLock](thread_primitives::SafeLock), lock it and execute the
1222    /// critical section. The critical section itself is a synchronous function or closure. In this case,
1223    /// the critical section attempts to modify the internal state of a guarded dataset.
1224    ///
1225    /// ## Example
1226    /// ```
1227    /// use rumtk_core::{rumtk_new_lock, rumtk_critical_section_write};
1228    ///
1229    /// let data = 5;
1230    /// let new_data = 10;
1231    /// let lock = rumtk_new_lock!(data);
1232    /// let result = rumtk_critical_section_write!(
1233    ///     lock,
1234    ///     |mut guard| {
1235    ///         *guard = new_data;
1236    ///     }
1237    /// );
1238    ///
1239    /// assert_eq!(result, (), "Critical section yielded invalid result!");
1240    /// ```
1241    ///
1242    #[macro_export]
1243    macro_rules! rumtk_critical_section_write {
1244        ( $lock:expr, $function:expr ) => {{
1245            use $crate::threading::threading_functions::process_write_critical_section;
1246            process_write_critical_section($lock, $function)
1247        }};
1248    }
1249
1250    ///
1251    /// Framework interface to obtain a `read` guard to the locked data.
1252    /// To access the internal data, you will need to dereference the guard (`*guard`).
1253    ///
1254    /// It is preferred to use [rumtk_critical_section_read] if you need to avoid `time of check time
1255    /// of use` security bugs.
1256    ///
1257    /// ## Example
1258    /// ```
1259    /// use rumtk_core::{rumtk_new_lock, rumtk_lock_read};
1260    ///
1261    /// let data = 5;
1262    /// let lock = rumtk_new_lock!(data.clone());
1263    /// let result = *rumtk_lock_read!(lock);
1264    ///
1265    /// assert_eq!(result, data, "Failed to access locked data.");
1266    /// ```
1267    ///
1268    #[macro_export]
1269    macro_rules! rumtk_lock_read {
1270        ( $lock:expr ) => {{
1271            use $crate::threading::threading_functions::lock_read;
1272            lock_read($lock.clone())
1273        }};
1274    }
1275
1276    ///
1277    /// Framework interface to obtain a `write` guard to the locked data.
1278    /// To access the internal data, you will need to dereference the guard (`*guard`).
1279    ///
1280    /// It is preferred to use [rumtk_critical_section_write] if you need to avoid `time of check time
1281    /// of use` security bugs.
1282    ///
1283    /// ## Example
1284    /// ```
1285    /// use rumtk_core::{rumtk_new_lock, rumtk_lock_read, rumtk_lock_write};
1286    ///
1287    /// let data = 5;
1288    /// let lock = rumtk_new_lock!(data.clone());
1289    /// let new_data = 10;
1290    ///
1291    /// *rumtk_lock_write!(lock) = new_data;
1292    /// let result = *rumtk_lock_read!(lock);
1293    ///
1294    /// assert_eq!(result, new_data, "Failed to modify locked data.");
1295    /// ```
1296    ///
1297    #[macro_export]
1298    macro_rules! rumtk_lock_write {
1299        ( $lock:expr ) => {{
1300            use $crate::threading::threading_functions::lock_write;
1301            lock_write($lock.clone())
1302        }};
1303    }
1304}