rumtk_core/threading.rs
1/*
2 * rumtk attempts to implement HL7 and medical protocols for interoperability in medicine.
3 * This toolkit aims to be reliable, simple, performant, and standards compliant.
4 * Copyright (C) 2025 Luis M. Santos, M.D. <lsantos@medicalmasses.com>
5 * Copyright (C) 2025 MedicalMasses L.L.C. <contact@medicalmasses.com>
6 *
7 * This program is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with this program. If not, see <https://www.gnu.org/licenses/>.
19 */
20
21///
22/// This module provides all the primitives needed to build a multithreaded application.
23///
24pub mod thread_primitives {
25 pub use std::sync::Mutex as SyncMutex;
26 pub use std::sync::MutexGuard as SyncMutexGuard;
27 pub use std::sync::RwLock as SyncRwLock;
28 use std::sync::{Arc, OnceLock};
29 pub use tokio::io;
30 pub use tokio::io::{AsyncReadExt, AsyncWriteExt};
31 use tokio::runtime::Runtime as TokioRuntime;
32 pub use tokio::sync::{
33 Mutex as AsyncMutex, MutexGuard as AsyncMutexGuard,
34 OwnedRwLockReadGuard as AsyncOwnedRwLockMappedReadGuard,
35 OwnedRwLockReadGuard as AsyncOwnedRwLockReadGuard,
36 OwnedRwLockWriteGuard as AsyncOwnedRwLockWriteGuard, RwLock as AsyncRwLock,
37 RwLockMappedWriteGuard as AsyncRwLockMappedWriteGuard,
38 RwLockReadGuard as AsyncRwLockReadGuard, RwLockWriteGuard as AsyncRwLockWriteGuard,
39 };
40
/**************************** Types ***************************************/
/// Owned read guard handed out for the data stored behind a [`SafeLock`].
pub type SafeLockReadGuard<T> = AsyncOwnedRwLockReadGuard<T>;
/// Currently resolves to exactly the same type as [`SafeLockReadGuard`].
pub type MappedLockReadGuard<T> = AsyncOwnedRwLockReadGuard<T>;
/// Owned write guard handed out for the data stored behind a [`SafeLock`].
pub type SafeLockWriteGuard<T> = AsyncOwnedRwLockWriteGuard<T>;
/// Reference-counted tokio read/write lock wrapping a value of type `T`.
pub type SafeLock<T> = Arc<AsyncRwLock<T>>;
/// Write-once cell holding a tokio runtime.
pub type SafeTokioRuntime = OnceLock<TokioRuntime>;
47}
48
49pub mod threading_manager {
50 use crate::core::{RUMResult, RUMVec};
51 use crate::strings::rumtk_format;
52 use crate::threading::thread_primitives::SafeLock;
53 use crate::threading::threading_functions::{async_sleep, sleep};
54 use crate::types::{RUMHashMap, RUMID};
55 use crate::{rumtk_init_threads, rumtk_resolve_task, rumtk_spawn_task, threading};
56 use std::future::Future;
57 use std::sync::Arc;
58 pub use std::sync::RwLock as SyncRwLock;
59 use tokio::task::JoinHandle;
60
/// Polling interval, in seconds, used while waiting for a task to be flagged as finished.
const DEFAULT_SLEEP_DURATION: f32 = 0.001f32;
/// Number of entries pre-allocated in the task table built by `TaskManager::new`.
const DEFAULT_TASK_CAPACITY: usize = 100;

/// Join handle of a spawned tokio task that produces a `T`.
pub type AsyncHandle<T> = JoinHandle<T>;
/// Generic list of items exchanged with tasks.
pub type TaskItems<T> = RUMVec<T>;
/// This type aliases a vector of T elements that will be used for passing arguments to the task processor.
pub type TaskArgs<T> = TaskItems<T>;
/// Task arguments placed behind a [`SafeLock`] so they can be shared with a task.
pub type SafeTaskArgs<T> = SafeLock<TaskItems<T>>;
/// Join handle of a spawned task that resolves to a [`TaskResult`].
pub type AsyncTaskHandle<R> = AsyncHandle<TaskResult<R>>;
/// Collection of [`AsyncTaskHandle`]s.
pub type AsyncTaskHandles<R> = Vec<AsyncTaskHandle<R>>;
// Function signature defining the interface of task processing logic (currently disabled).
//pub type TaskProcessor<T, R, Fut: Future<Output = TaskResult<R>>> = impl FnOnce(&SafeTaskArgs<T>) -> Fut;
/// Identifier assigned to every task tracked by the manager.
pub type TaskID = RUMID;
74
/// Bookkeeping record kept for a single submitted task.
#[derive(Debug, Clone, Default)]
pub struct Task<R> {
    /// Identifier assigned to the task when it was queued.
    pub id: TaskID,
    /// Becomes `true` once the task's future has completed and `result` has been stored.
    pub finished: bool,
    /// Output of the task; stays `None` until the task has finished.
    pub result: Option<R>,
}
81
/// Immutable, reference-counted snapshot of a [`Task`] as returned to callers.
pub type SafeTask<R> = Arc<Task<R>>;
/// Shared, lock-protected [`Task`] record updated by the running task itself.
type SafeInternalTask<R> = Arc<SyncRwLock<Task<R>>>;
/// Map from task id to the shared record of that task.
pub type TaskTable<R> = RUMHashMap<TaskID, SafeInternalTask<R>>;
/// [`TaskTable`] guarded by the asynchronous [`SafeLock`].
pub type SafeAsyncTaskTable<R> = SafeLock<TaskTable<R>>;
/// [`TaskTable`] guarded by a standard-library read/write lock.
pub type SafeSyncTaskTable<R> = Arc<SyncRwLock<TaskTable<R>>>;
/// List of task ids that are processed together.
pub type TaskBatch = RUMVec<TaskID>;
/// Type to use to define how task results are expected to be returned.
pub type TaskResult<R> = RUMResult<SafeTask<R>>;
/// List of [`TaskResult`]s, one per awaited task.
pub type TaskResults<R> = TaskItems<TaskResult<R>>;
91
92 ///
93 /// Manages asynchronous tasks submitted as micro jobs from synchronous code. This type essentially
94 /// gives the multithreading, asynchronous superpowers to synchronous logic.
95 ///
96 /// ## Example Usage
97 ///
98 /// ```
99 /// use std::sync::{Arc};
100 /// use tokio::sync::RwLock as AsyncRwLock;
101 /// use rumtk_core::core::RUMResult;
102 /// use rumtk_core::strings::RUMString;
103 /// use rumtk_core::threading::threading_manager::{SafeTaskArgs, TaskItems, TaskManager};
104 /// use rumtk_core::{rumtk_create_task, };
105 ///
106 /// let expected = vec![
107 /// RUMString::from("Hello"),
108 /// RUMString::from("World!"),
109 /// RUMString::from("Overcast"),
110 /// RUMString::from("and"),
111 /// RUMString::from("Sad"),
112 /// ];
113 ///
114 /// type TestResult = RUMResult<Vec<RUMString>>;
115 /// let mut queue: TaskManager<TestResult> = TaskManager::new(&5).unwrap();
116 ///
117 /// let locked_args = AsyncRwLock::new(expected.clone());
118 /// let task_args = SafeTaskArgs::<RUMString>::new(locked_args);
119 /// let processor = rumtk_create_task!(
120 /// async |args: &SafeTaskArgs<RUMString>| -> TestResult {
121 /// let owned_args = Arc::clone(args);
122 /// let locked_args = owned_args.read().await;
123 /// let mut results = TaskItems::<RUMString>::with_capacity(locked_args.len());
124 ///
125 /// for arg in locked_args.iter() {
126 /// results.push(RUMString::new(arg));
127 /// }
128 ///
129 /// Ok(results)
130 /// },
131 /// task_args
132 /// );
133 ///
134 /// queue.add_task::<_>(processor);
135 /// let results = queue.wait();
136 ///
137 /// let mut result_data = Vec::<RUMString>::with_capacity(5);
138 /// for r in results {
139 /// for v in r.unwrap().result.clone().unwrap().iter() {
140 /// for value in v.iter() {
141 /// result_data.push(value.clone());
142 /// }
143 /// }
144 /// }
145 ///
146 /// assert_eq!(result_data, expected, "Results do not match expected!");
147 ///
148 /// ```
149 ///
#[derive(Debug, Clone, Default)]
pub struct TaskManager<R> {
    /// Shared table of the tasks currently tracked by this manager, keyed by task id.
    tasks: SafeSyncTaskTable<R>,
    /// Worker-thread count passed to `rumtk_init_threads!` when `spawn_task` is used.
    workers: usize,
}
155
156 impl<R> TaskManager<R>
157 where
158 R: Sync + Send + Clone + 'static,
159 {
160 ///
161 /// This method creates a [`TaskManager`] instance using sensible defaults.
162 ///
163 /// The `threads` field is computed from the number of cores present in system.
164 ///
165 pub fn default() -> RUMResult<TaskManager<R>> {
166 Self::new(&threading::threading_functions::get_default_system_thread_count())
167 }
168
169 ///
170 /// Creates an instance of [`TaskManager<R>`](TaskManager<R>).
171 /// Expects you to provide the count of threads to spawn and the microtask queue size
172 /// allocated by each thread.
173 ///
174 /// This method calls [`TaskTable::with_capacity()`](TaskTable::with_capacity) for the actual object creation.
175 /// The main queue capacity is pre-allocated to [`DEFAULT_TASK_CAPACITY`](DEFAULT_TASK_CAPACITY).
176 ///
177 pub fn new(worker_num: &usize) -> RUMResult<TaskManager<R>> {
178 let tasks = SafeSyncTaskTable::<R>::new(SyncRwLock::new(TaskTable::with_capacity(
179 DEFAULT_TASK_CAPACITY,
180 )));
181 Ok(TaskManager::<R> {
182 tasks,
183 workers: worker_num.to_owned(),
184 })
185 }
186
187 ///
188 /// Add a task to the processing queue. The idea is that you can queue a processor function
189 /// and list of args that will be picked up by one of the threads for processing.
190 ///
191 /// This is the async counterpart
192 ///
193 pub async fn add_task_async<F>(&mut self, task: F) -> TaskID
194 where
195 F: Future<Output = R> + Send + Sync + 'static,
196 F::Output: Send + 'static,
197 {
198 let id = TaskID::new_v4();
199 Self::_add_task_async(id.clone(), self.tasks.clone(), task).await
200 }
201
202 ///
203 /// See [`Self::add_task_async`]
204 ///
205 /// Unlike `add_task`, this method does not block which is key to avoiding panicking
206 /// the tokio runtim if trying to add task to queue from a normal function called from an
207 /// async environment.
208 ///
209 /// ## Example
210 ///
211 /// ```
212 /// use rumtk_core::threading::threading_manager::{TaskManager};
213 /// use rumtk_core::{rumtk_init_threads, strings::rumtk_format};
214 /// use std::sync::{Arc};
215 /// use tokio::sync::Mutex;
216 ///
217 /// type JobManager = Arc<Mutex<TaskManager<usize>>>;
218 ///
219 /// async fn called_fn() -> usize {
220 /// 5
221 /// }
222 ///
223 /// fn push_job(manager: &mut TaskManager<usize>) -> usize {
224 /// manager.spawn_task(called_fn());
225 /// 1
226 /// }
227 ///
228 /// async fn call_sync_fn(mut manager: JobManager) -> usize {
229 /// let mut owned = manager.lock().await;
230 /// push_job(&mut owned)
231 /// }
232 ///
233 /// let workers = 5;
234 /// let mut manager = Arc::new(Mutex::new(TaskManager::new(&workers).unwrap()));
235 ///
236 /// manager.blocking_lock().spawn_task(call_sync_fn(manager.clone()));
237 ///
238 /// let result_raw = manager.blocking_lock().wait();
239 ///
240 /// ```
241 ///
242 pub fn spawn_task<F>(&mut self, task: F) -> RUMResult<TaskID>
243 where
244 F: Future<Output = R> + Send + Sync + 'static,
245 F::Output: Send + Sized + 'static,
246 {
247 let id = TaskID::new_v4();
248 let rt = rumtk_init_threads!(self.workers);
249 rumtk_spawn_task!(
250 rt,
251 Self::_add_task_async(id.clone(), self.tasks.clone(), task)
252 );
253 Ok(id)
254 }
255
256 ///
257 /// See [add_task_async](Self::add_task_async)
258 ///
259 pub fn add_task<F>(&mut self, task: F) -> RUMResult<TaskID>
260 where
261 F: Future<Output = R> + Send + Sync + 'static,
262 F::Output: Send + Sized + 'static,
263 {
264 let id = TaskID::new_v4();
265 let tasks = self.tasks.clone();
266 Ok(rumtk_resolve_task!(Self::_add_task_async(id.clone(), tasks, task)))
267 }
268
269 async fn _add_task_async<F>(id: TaskID, tasks: SafeSyncTaskTable<R>, task: F) -> TaskID
270 where
271 F: Future<Output = R> + Send + Sync + 'static,
272 F::Output: Send + Sized + 'static,
273 {
274 let mut safe_task = Arc::new(SyncRwLock::new(Task::<R> {
275 id: id.clone(),
276 finished: false,
277 result: None,
278 }));
279 tasks.write().unwrap().insert(id.clone(), safe_task.clone());
280
281 let task_wrapper = async move || {
282 // Run the task
283 let result = task.await;
284
285 // Cleanup task
286 let mut lock = safe_task.write().unwrap();
287 lock.result = Some(result);
288 lock.finished = true;
289 };
290
291 tokio::spawn(task_wrapper());
292
293 id
294 }
295
296 ///
297 /// See [wait_async](Self::wait_async)
298 ///
299 /// Duplicated here because we can't request the tokio runtime to do a quick exec for us if
300 /// this function happens to be called from the async context.
301 ///
302 pub fn wait(&mut self) -> TaskResults<R> {
303 let task_batch = self
304 .tasks
305 .read()
306 .unwrap()
307 .keys()
308 .cloned()
309 .collect::<Vec<_>>();
310 self.wait_on_batch(&task_batch)
311 }
312
313 ///
314 /// See [wait_on_batch_async](Self::wait_on_batch_async)
315 ///
316 /// Duplicated here because we can't request the tokio runtime to do a quick exec for us if
317 /// this function happens to be called from the async context.
318 ///
319 pub fn wait_on_batch(&mut self, tasks: &TaskBatch) -> TaskResults<R> {
320 let mut results = TaskResults::<R>::default();
321 for task in tasks {
322 results.push(self.wait_on(task));
323 }
324 results
325 }
326
327 ///
328 /// See [wait_on_async](Self::wait_on_async)
329 ///
330 /// Duplicated here because we can't request the tokio runtime to do a quick exec for us if
331 /// this function happens to be called from the async context.
332 ///
333 pub fn wait_on(&mut self, task_id: &TaskID) -> TaskResult<R> {
334 let task = match self.tasks.write().unwrap().remove(task_id) {
335 Some(task) => task.clone(),
336 None => return Err(rumtk_format!("No task with id {}", task_id)),
337 };
338
339 while !task.read().unwrap().finished {
340 sleep(DEFAULT_SLEEP_DURATION);
341 }
342
343 let x = Ok(Arc::new(task.write().unwrap().clone()));
344 x
345 }
346
347 ///
348 /// This method waits until a queued task with [TaskID](TaskID) has been processed from the main queue.
349 ///
350 /// We poll the status of the task every [DEFAULT_SLEEP_DURATION](DEFAULT_SLEEP_DURATION) ms.
351 ///
352 /// Upon completion,
353 ///
354 /// 2. Return the result ([TaskResults<R>](TaskResults)).
355 ///
356 /// This operation consumes the task.
357 ///
358 /// ### Note:
359 /// ```text
360 /// Results returned here are not guaranteed to be in the same order as the order in which
361 /// the tasks were queued for work. You will need to pass a type as T that automatically
362 /// tracks its own id or has a way for you to resort results.
363 /// ```
364 pub async fn wait_on_async(&mut self, task_id: &TaskID) -> TaskResult<R> {
365 let task = match self.tasks.write().unwrap().remove(task_id) {
366 Some(task) => task.clone(),
367 None => return Err(rumtk_format!("No task with id {}", task_id)),
368 };
369
370 while !task.read().unwrap().finished {
371 async_sleep(DEFAULT_SLEEP_DURATION).await;
372 }
373
374 let x = Ok(Arc::new(task.write().unwrap().clone()));
375 x
376 }
377
378 ///
379 /// This method waits until a set of queued tasks with [TaskID](TaskID) has been processed from the main queue.
380 ///
381 /// We poll the status of the task every [DEFAULT_SLEEP_DURATION](DEFAULT_SLEEP_DURATION) ms.
382 ///
383 /// Upon completion,
384 ///
385 /// 1. We collect the results generated (if any).
386 /// 2. Return the list of results ([TaskResults<R>](TaskResults)).
387 ///
388 /// ### Note:
389 /// ```text
390 /// Results returned here are not guaranteed to be in the same order as the order in which
391 /// the tasks were queued for work. You will need to pass a type as T that automatically
392 /// tracks its own id or has a way for you to resort results.
393 /// ```
394 pub async fn wait_on_batch_async(&mut self, tasks: &TaskBatch) -> TaskResults<R> {
395 let mut results = TaskResults::<R>::default();
396 for task in tasks {
397 results.push(self.wait_on_async(task).await);
398 }
399 results
400 }
401
402 ///
403 /// This method waits until all queued tasks have been processed from the main queue.
404 ///
405 /// We poll the status of the main queue every [DEFAULT_SLEEP_DURATION](DEFAULT_SLEEP_DURATION) ms.
406 ///
407 /// Upon completion,
408 ///
409 /// 1. We collect the results generated (if any).
410 /// 2. We reset the main task and result internal queue states.
411 /// 3. Return the list of results ([TaskResults<R>](TaskResults)).
412 ///
413 /// This operation consumes all the tasks.
414 ///
415 /// ### Note:
416 /// ```text
417 /// Results returned here are not guaranteed to be in the same order as the order in which
418 /// the tasks were queued for work. You will need to pass a type as T that automatically
419 /// tracks its own id or has a way for you to resort results.
420 /// ```
421 pub async fn wait_async(&mut self) -> TaskResults<R> {
422 let task_batch = self
423 .tasks
424 .read()
425 .unwrap()
426 .keys()
427 .cloned()
428 .collect::<Vec<_>>();
429 self.wait_on_batch_async(&task_batch).await
430 }
431
432 ///
433 /// Check if all work has been completed from the task queue.
434 ///
435 /// ## Examples
436 ///
437 /// ### Sync Usage
438 ///
439 ///```
440 /// use rumtk_core::threading::threading_manager::TaskManager;
441 ///
442 /// let manager = TaskManager::<usize>::new(&4).unwrap();
443 ///
444 /// let all_done = manager.is_all_completed();
445 ///
446 /// assert_eq!(all_done, true, "Empty TaskManager reports tasks are not completed!");
447 ///
448 /// ```
449 ///
450 pub fn is_all_completed(&self) -> bool {
451 self._is_all_completed_async()
452 }
453
454 pub async fn is_all_completed_async(&self) -> bool {
455 self._is_all_completed_async()
456 }
457
458 fn _is_all_completed_async(&self) -> bool {
459 for (_, task) in self.tasks.read().unwrap().iter() {
460 if !task.read().unwrap().finished {
461 return false;
462 }
463 }
464
465 true
466 }
467
468 ///
469 /// Check if a task completed
470 ///
471 pub fn is_finished(&self, id: &TaskID) -> bool {
472 match self.tasks.read().unwrap().get(id) {
473 Some(t) => t.read().unwrap().finished,
474 None => false,
475 }
476 }
477
478 pub async fn is_finished_async(&self, id: &TaskID) -> bool {
479 match self.tasks.read().unwrap().get(id) {
480 Some(task) => task.read().unwrap().finished,
481 None => true,
482 }
483 }
484
485 ///
486 /// Alias for [wait](TaskManager::wait).
487 ///
488 fn gather(&mut self) -> TaskResults<R> {
489 self.wait()
490 }
491 }
492}
493
494///
/// This module contains a few helpers.
496///
497/// For example, you can find a function for determining number of threads available in system.
498/// The sleep family of functions are also here.
499///
500pub mod threading_functions {
501 use crate::core::RUMResult;
502 use crate::net::tcp::{AsyncOwnedRwLockReadGuard, AsyncOwnedRwLockWriteGuard, SafeLockReadGuard, SafeLockWriteGuard, SafeTokioRuntime};
503 use crate::threading::thread_primitives::{AsyncRwLock, SafeLock};
504 use num_cpus;
505 use std::future::Future;
506 use std::sync::Arc;
507 use std::thread::{available_parallelism, sleep as std_sleep};
508 use std::time::Duration;
509 use tokio::runtime::Runtime;
510 use tokio::time::sleep as tokio_sleep;
511 /**************************** Globals **************************************/
512 static mut DEFAULT_RUNTIME: SafeTokioRuntime = SafeTokioRuntime::new();
513
514 pub const NANOS_PER_SEC: u64 = 1000000000;
515 pub const MILLIS_PER_SEC: u64 = 1000;
516 pub const MICROS_PER_SEC: u64 = 1000000;
517 const DEFAULT_SLEEP_DURATION: f32 = 0.001;
518 /**************************** Helpers **************************************/
519 pub fn init_runtime<'a>(workers: usize) -> &'a Runtime {
520 unsafe {
521 let runtime = DEFAULT_RUNTIME.get_or_init(|| {
522 let mut builder = tokio::runtime::Builder::new_multi_thread();
523 builder.worker_threads(workers);
524 builder.enable_all();
525 match builder.build() {
526 Ok(handle) => handle,
527 Err(e) => panic!(
528 "Unable to initialize threading tokio runtime because {}!",
529 &e
530 ),
531 }
532 });
533 runtime
534 }
535 }
536
537 pub fn get_default_system_thread_count() -> usize {
538 let cpus: usize = num_cpus::get();
539 let parallelism = match available_parallelism() {
540 Ok(n) => n.get(),
541 Err(_) => 0,
542 };
543
544 if parallelism >= cpus {
545 parallelism
546 } else {
547 cpus
548 }
549 }
550
551 pub fn sleep(s: f32) {
552 let ns = s * NANOS_PER_SEC as f32;
553 let rounded_ns = ns.round() as u64;
554 let duration = Duration::from_nanos(rounded_ns);
555 std_sleep(duration);
556 }
557
558 pub async fn async_sleep(s: f32) {
559 let ns = s * NANOS_PER_SEC as f32;
560 let rounded_ns = ns.round() as u64;
561 let duration = Duration::from_nanos(rounded_ns);
562 tokio_sleep(duration).await;
563 }
564
565 ///
566 /// Given a closure task, push it onto the current `tokio` runtime for execution.
567 /// Every [DEFAULT_SLEEP_DURATION] seconds, we check if the task has concluded.
568 /// Once the task has concluded, we call [tokio::block_on](tokio::task::block_in_place) to resolve and extract the task
569 /// result.
570 ///
571 /// Because this helper function can fail, the return value is wrapped inside a [RUMResult].
572 ///
573 /// ## Example
574 ///
575 /// ```
576 /// use rumtk_core::threading::threading_functions::{init_runtime, block_on_task};
577 ///
578 /// const Hello: &str = "World!";
579 ///
580 /// init_runtime(5);
581 ///
582 /// let result = block_on_task(async {
583 /// Hello
584 /// });
585 ///
586 /// assert_eq!(Hello, result, "Result mismatches expected! {} vs. {}", Hello, result);
587 /// ```
588 ///
589 pub fn block_on_task<R, F>(task: F) -> R
590 where
591 F: Future<Output = R> + Send + 'static,
592 F::Output: Send + 'static,
593 {
594 let rt = init_runtime(get_default_system_thread_count());
595 rt.block_on(task)
596 }
597
598 pub fn new_lock<T>(data: T) -> SafeLock<T> {
599 Arc::new(AsyncRwLock::new(data))
600 }
601
602 ///
603 /// This function gives you read access to underlying structure.
604 ///
605 /// Helper function for executing microtask immediately after locking the spin lock. This function
606 /// should be used in situations in which you want to minimize the risk of Time of Check Time of
607 /// Use security bugs.
608 ///
609 /// ## Example
610 /// ```
611 /// use rumtk_core::core::RUMResult;
612 /// use rumtk_core::threading::thread_primitives::SafeLock;
613 /// use rumtk_core::threading::threading_functions::{new_lock, process_read_critical_section};
614 ///
615 /// let data = 5;
616 /// let lock = new_lock(data.clone());
617 /// let result = process_read_critical_section(lock, |guard| -> RUMResult<i32> {
618 /// Ok(*guard)
619 /// }).unwrap();
620 ///
621 /// assert_eq!(result, data, "Failed to execute critical section through which we retrieve the locked data!");
622 /// ```
623 ///
624 pub fn process_read_critical_section<T, R, F>(
625 lock: SafeLock<T>,
626 critical_section: F,
627 ) -> R
628 where
629 F: Fn(SafeLockReadGuard<T>) -> R, T: Send + Sync + 'static,
630 {
631 tokio::task::block_in_place(move || {
632 let read_guard = lock_read(lock);
633 critical_section(read_guard)
634 })
635 }
636
637 ///
638 /// This function gives you write access to underlying structure.
639 ///
640 /// Helper function for executing microtask immediately after locking the spin lock. This function
641 /// should be used in situations in which you want to minimize the risk of Time of Check Time of
642 /// Use security bugs.
643 ///
644 /// ## Example
645 /// ```
646 /// use rumtk_core::core::RUMResult;
647 /// use rumtk_core::threading::thread_primitives::SafeLock;
648 /// use rumtk_core::threading::threading_functions::{new_lock, process_write_critical_section};
649 ///
650 /// let data = 5;
651 /// let lock = new_lock(data.clone());
652 /// let new_data = 10;
653 /// let result = process_write_critical_section(lock, |mut guard| -> RUMResult<i32> {
654 /// *guard = new_data;
655 /// Ok(*guard)
656 /// }).unwrap();
657 ///
658 /// assert_eq!(result, new_data, "Failed to execute critical section through which we modify the locked data!");
659 /// ```
660 ///
661 pub fn process_write_critical_section<T, R, F>(
662 lock: SafeLock<T>,
663 critical_section: F,
664 ) -> R
665 where
666 F: Fn(SafeLockWriteGuard<T>) -> R, T: Send + Sync + 'static,
667 {
668 tokio::task::block_in_place(move || {
669 let write_guard = lock_write(lock);
670 critical_section(write_guard)
671 })
672 }
673
674 ///
675 /// Obtain read guard to standard spin lock such that you have a more ergonomic interface to
676 /// locked data.
677 ///
678 /// It is preferable to use [process_read_critical_section] when you must process
679 /// critical logic that is sensitive to time of check time of use security bugs!
680 ///
681 /// ## Example
682 /// ```
683 /// use rumtk_core::threading::thread_primitives::SafeLock;
684 /// use rumtk_core::threading::threading_functions::{new_lock, lock_read};
685 ///
686 /// let data = 5;
687 /// let lock = new_lock(data.clone());
688 /// let result = *lock_read(lock);
689 ///
690 /// assert_eq!(result, data, "Failed to access the locked data!");
691 /// ```
692 ///
693 pub fn lock_read<T: Send + Sync + 'static>(lock: SafeLock<T>) -> AsyncOwnedRwLockReadGuard<T> {
694 block_on_task(async move {
695 lock.read_owned().await
696 })
697 }
698
699 ///
700 /// Obtain write guard to standard spin lock such that you have a more ergonomic interface to
701 /// locked data.
702 ///
703 /// It is preferable to use [process_write_critical_section] when you must process
704 /// critical logic that is sensitive to time of check time of use security bugs!
705 ///
706 /// ## Example
707 /// ```
708 /// use rumtk_core::threading::thread_primitives::SafeLock;
709 /// use rumtk_core::threading::threading_functions::{new_lock, lock_read, lock_write};
710 ///
711 /// let data = 5;
712 /// let lock = new_lock(data.clone());
713 /// let new_data = 10;
714 ///
715 /// *lock_write(lock.clone()) = new_data;
716 ///
717 /// let result = *lock_read(lock);
718 ///
719 /// assert_eq!(result, new_data, "Failed to modify the locked data!");
720 /// ```
721 ///
722 pub fn lock_write<T: Send + Sync + 'static>(lock: SafeLock<T>) -> AsyncOwnedRwLockWriteGuard<T> {
723 block_on_task(async move {
724 lock.write_owned().await
725 })
726 }
727}
728
729///
730/// Main API for interacting with the threading back end. Remember, we use tokio as our executor.
731/// This means that by default, all jobs sent to the thread pool have to be async in nature.
732/// These macros make handling of these jobs at the sync/async boundary more convenient.
733///
734pub mod threading_macros {
735 use crate::threading::thread_primitives;
736 use crate::threading::threading_manager::SafeTaskArgs;
737
738 ///
739 /// First, let's make sure we have *tokio* initialized at least once. The runtime created here
740 /// will be saved to the global context so the next call to this macro will simply grab a
741 /// reference to the previously initialized runtime.
742 ///
743 /// Passing nothing will default to initializing a runtime using the default number of threads
744 /// for this system. This is typically equivalent to number of cores/threads for your CPU.
745 ///
746 /// Passing `threads` number will yield a runtime that allocates that many threads.
747 ///
748 ///
749 /// ## Examples
750 ///
751 /// ```
752 /// use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_create_task_args, rumtk_create_task, rumtk_spawn_task};
753 /// use rumtk_core::core::RUMResult;
754 /// use rumtk_core::threading::threading_manager::SafeTaskArgs;
755 ///
756 /// async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
757 /// let mut result = Vec::<i32>::new();
758 /// for arg in args.read().await.iter() {
759 /// result.push(*arg);
760 /// }
761 /// Ok(result)
762 /// }
763 ///
764 /// let args = rumtk_create_task_args!(1); // Creates a vector of i32s
765 /// let task = rumtk_create_task!(test, args); // Creates a standard task which consists of a function or closure accepting a Vec<T>
766 /// let result = rumtk_resolve_task!(task); // Spawn's task and waits for it to conclude.
767 /// ```
768 ///
769 /// ```
770 /// use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_create_task_args, rumtk_create_task, rumtk_spawn_task};
771 /// use rumtk_core::core::RUMResult;
772 /// use rumtk_core::threading::threading_manager::SafeTaskArgs;
773 ///
774 /// async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
775 /// let mut result = Vec::<i32>::new();
776 /// for arg in args.read().await.iter() {
777 /// result.push(*arg);
778 /// }
779 /// Ok(result)
780 /// }
781 ///
782 /// let thread_count: usize = 10;
783 /// let args = rumtk_create_task_args!(1);
784 /// let task = rumtk_create_task!(test, args);
785 /// let result = rumtk_resolve_task!(task);
786 /// ```
#[macro_export]
macro_rules! rumtk_init_threads {
    // No argument: size the runtime from the default system thread count.
    ( ) => {{
        use $crate::threading::threading_functions::{
            get_default_system_thread_count, init_runtime,
        };
        init_runtime(get_default_system_thread_count())
    }};
    // Explicit worker count requested by the caller.
    ( $threads:expr ) => {{
        use $crate::threading::threading_functions::init_runtime;
        init_runtime($threads)
    }};
}
801
///
/// Puts task onto the runtime queue.
///
/// The parameters to this macro are an optional reference to the runtime (`rt`) and a future
/// (`func`). When `rt` is omitted, the runtime returned by `rumtk_init_threads!()` is used.
///
/// The macro evaluates to whatever the runtime's `spawn` returns, i.e. the join handle of the
/// spawned future.
///
#[macro_export]
macro_rules! rumtk_spawn_task {
    ( $func:expr ) => {{
        use $crate::rumtk_init_threads;
        // `rumtk_init_threads!` evaluates to the runtime reference itself, so there is
        // nothing to unwrap here.
        let rt = rumtk_init_threads!();
        rt.spawn($func)
    }};
    ( $rt:expr, $func:expr ) => {{
        $rt.spawn($func)
    }};
}
821
///
/// Using the initialized runtime, wait for the future to resolve in a thread blocking manner!
///
/// The first parameter is an async closure or function (`func`). When it is the only
/// parameter, it is called without arguments and the resulting future is awaited.
///
/// Any further parameters (`arg_items`) are forwarded, in order, as the arguments of the call
/// before the resulting future is awaited.
///
#[macro_export]
macro_rules! rumtk_wait_on_task {
    ( $func:expr $(, $arg_items:expr)* ) => {{
        use $crate::threading::threading_functions::block_on_task;
        block_on_task(async move { $func($($arg_items),*).await })
    }};
}
846
847 ///
848 /// This macro awaits a future.
849 ///
850 /// The arguments are a reference to the runtime (`rt) and a future.
851 ///
852 /// If there is a result, you will get the result of the future.
853 ///
854 /// ## Examples
855 ///
856 /// ```
857 /// use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_create_task_args, rumtk_create_task, rumtk_spawn_task};
858 /// use rumtk_core::core::RUMResult;
859 /// use rumtk_core::threading::threading_manager::SafeTaskArgs;
860 ///
861 /// async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
862 /// let mut result = Vec::<i32>::new();
863 /// for arg in args.read().await.iter() {
864 /// result.push(*arg);
865 /// }
866 /// Ok(result)
867 /// }
868 ///
869 /// let args = rumtk_create_task_args!(1);
870 /// let task = rumtk_create_task!(test, args);
871 /// let result = rumtk_resolve_task!(task);
872 /// ```
873 ///
#[macro_export]
macro_rules! rumtk_resolve_task {
    ( $future:expr ) => {{
        use $crate::threading::threading_functions::block_on_task;
        // `$future` is expanded *inside* the `async move` block below. The expression is
        // therefore evaluated only once `block_on_task` polls the block, and every variable it
        // names is moved into that block.
        //
        // NOTE(review): the note that used to live here described binding the expression
        // first (`let future = $future;`) so that nested macro calls such as
        // `rumtk_resolve_task!(rumtk_spawn_task!(&rt, task))` are evaluated before the move.
        // That binding is disabled (see the commented line below), so the note no longer
        // matched the code. Confirm which evaluation order is intended before re-enabling it.
        //let future = $future;
        block_on_task(async move { $future.await })
    }};
}
890
///
/// Resolves a future on the runtime's blocking thread pool so async code can await the
/// outcome without blocking a worker thread.
///
/// The arguments are a reference to the runtime (`rt`) and a future.
///
/// The macro evaluates to the join handle returned by the runtime's `spawn_blocking`; await
/// it to obtain the output of the future.
///
#[macro_export]
macro_rules! rumtk_resolve_task_from_async {
    ( $rt:expr, $future:expr ) => {{
        let rt = &$rt;
        // The blocking closure needs its own handle to drive the future.
        let handle = rt.handle().clone();
        rt.spawn_blocking(move || handle.block_on(async move { $future.await }))
    }};
}
897
898 ///
899 /// This macro creates an async body that calls the async closure and awaits it.
900 ///
901 /// ## Example
902 ///
903 /// ```
904 /// use std::sync::{Arc, RwLock};
905 /// use tokio::sync::RwLock as AsyncRwLock;
906 /// use rumtk_core::strings::RUMString;
907 /// use rumtk_core::threading::threading_manager::{SafeTaskArgs, TaskItems};
908 ///
909 /// pub type SafeTaskArgs2<T> = Arc<RwLock<TaskItems<T>>>;
910 /// let expected = vec![
911 /// RUMString::from("Hello"),
912 /// RUMString::from("World!"),
913 /// RUMString::from("Overcast"),
914 /// RUMString::from("and"),
915 /// RUMString::from("Sad"),
916 /// ];
917 /// let locked_args = AsyncRwLock::new(expected.clone());
918 /// let task_args = SafeTaskArgs::<RUMString>::new(locked_args);
919 ///
920 ///
921 /// ```
922 ///
#[macro_export]
macro_rules! rumtk_create_task {
    // Callable only: `$func` is evaluated inside the async block, then called with no
    // arguments and awaited.
    ( $func:expr ) => {{
        async move {
            let f = $func;
            f().await
        }
    }};
    // Callable plus arguments: `$func` is bound before the async block is created, while
    // `$args` is referenced inside the block and passed to the callable by reference.
    ( $func:expr, $args:expr ) => {{
        let f = $func;
        async move { f(&$args).await }
    }};
}
936
///
/// Creates an instance of [SafeTaskArgs](SafeTaskArgs) with the arguments passed.
///
/// The arguments are collected into a `Vec`, wrapped in an `AsyncRwLock`, and handed to
/// `SafeTaskArgs::new`. Calling the macro without arguments produces an empty list (the element
/// type must then be inferable from the surrounding code). A trailing comma is accepted.
///
/// All items are referenced through `$crate` paths, so the caller does not need to import
/// anything and none of the caller's own names can be shadowed by the expansion.
///
/// ## Note
///
/// All arguments must be of the same type
///
#[macro_export]
macro_rules! rumtk_create_task_args {
    ( ) => {{
        $crate::threading::threading_manager::SafeTaskArgs::new(
            $crate::threading::thread_primitives::AsyncRwLock::new(vec![]),
        )
    }};
    ( $($args:expr),+ $(,)? ) => {{
        $crate::threading::threading_manager::SafeTaskArgs::new(
            $crate::threading::thread_primitives::AsyncRwLock::new(vec![$($args),+]),
        )
    }};
}
957
///
/// Convenience macro for packaging the task components and launching the task in one line.
///
/// The macro builds the task with `rumtk_create_task!` and hands it to `rumtk_resolve_task!`,
/// evaluating to whatever the latter yields (see the examples below).
///
/// ## Parameters
///
/// * `func` -- an async function or async closure.
/// * `args` (optional) -- a single expression holding the task's inputs; the examples pass a
///   `Vec`. It is wrapped in an `AsyncRwLock` and a `SafeTaskArgs`, and `func` receives a
///   reference to the result. Because the inputs travel in one collection, every element must be
///   of the same type. If you wish to pass items of different types, define a type whose
///   underlying structure is a tuple of items and pass a collection of that instead.
/// * `threads` (optional) -- accepted for compatibility. The current expansion never references
///   this value, so it has no effect on how the task is executed.
///
/// ## Examples
///
/// ### With Default Thread Count
/// ```
/// use rumtk_core::{rumtk_exec_task};
/// use rumtk_core::core::RUMResult;
/// use rumtk_core::threading::threading_manager::SafeTaskArgs;
///
/// async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
///     let mut result = Vec::<i32>::new();
///     for arg in args.read().await.iter() {
///         result.push(*arg);
///     }
///     Ok(result)
/// }
///
/// let result = rumtk_exec_task!(test, vec![5]).unwrap();
/// assert_eq!(&result.clone(), &vec![5], "Results mismatch");
/// assert_ne!(&result.clone(), &vec![5, 10], "Results do not mismatch as expected!");
/// ```
///
/// ### With Custom Thread Count
/// ```
/// use rumtk_core::{rumtk_exec_task};
/// use rumtk_core::core::RUMResult;
/// use rumtk_core::threading::threading_manager::SafeTaskArgs;
///
/// async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
///     let mut result = Vec::<i32>::new();
///     for arg in args.read().await.iter() {
///         result.push(*arg);
///     }
///     Ok(result)
/// }
///
/// let result = rumtk_exec_task!(test, vec![5], 5).unwrap();
/// assert_eq!(&result.clone(), &vec![5], "Results mismatch");
/// assert_ne!(&result.clone(), &vec![5, 10], "Results do not mismatch as expected!");
/// ```
///
/// ### With Async Function Body
/// ```
/// use rumtk_core::{rumtk_exec_task};
/// use rumtk_core::core::RUMResult;
/// use rumtk_core::threading::threading_manager::SafeTaskArgs;
///
/// let result = rumtk_exec_task!(
///     async move |args: &SafeTaskArgs<i32>| -> RUMResult<Vec<i32>> {
///         let mut result = Vec::<i32>::new();
///         for arg in args.read().await.iter() {
///             result.push(*arg);
///         }
///         Ok(result)
///     },
///     vec![5]).unwrap();
/// assert_eq!(&result.clone(), &vec![5], "Results mismatch");
/// assert_ne!(&result.clone(), &vec![5, 10], "Results do not mismatch as expected!");
/// ```
///
/// ### With Async Function Body and No Args
/// ```
/// use rumtk_core::{rumtk_exec_task};
/// use rumtk_core::core::RUMResult;
/// use rumtk_core::threading::threading_manager::SafeTaskArgs;
///
/// let result = rumtk_exec_task!(
///     async || -> RUMResult<Vec<i32>> {
///         let mut result = Vec::<i32>::new();
///         Ok(result)
///     }).unwrap();
/// let empty = Vec::<i32>::new();
/// assert_eq!(&result.clone(), &empty, "Results mismatch");
/// assert_ne!(&result.clone(), &vec![5, 10], "Results do not mismatch as expected!");
/// ```
///
/// ## Equivalent To
///
/// ```no_run
/// use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_create_task_args, rumtk_create_task, rumtk_spawn_task};
/// use rumtk_core::core::RUMResult;
/// use rumtk_core::threading::threading_manager::SafeTaskArgs;
///
/// async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
///     let mut result = Vec::<i32>::new();
///     for arg in args.read().await.iter() {
///         result.push(*arg);
///     }
///     Ok(result)
/// }
///
/// let args = rumtk_create_task_args!(1);
/// let task = rumtk_create_task!(test, args);
/// let result = rumtk_resolve_task!(task);
/// ```
///
#[macro_export]
macro_rules! rumtk_exec_task {
    ($func:expr ) => {{
        // The whole helper-macro set is kept in scope because the expansion of
        // `rumtk_resolve_task!` is resolved at this site and may rely on them.
        #[allow(unused_imports)]
        use $crate::{
            rumtk_create_task, rumtk_create_task_args, rumtk_init_threads, rumtk_resolve_task,
        };
        let task = rumtk_create_task!($func);
        rumtk_resolve_task!(task)
    }};
    ($func:expr, $args:expr ) => {{
        // Delegate through `$crate` so this arm also works when the macro is invoked by path
        // without having been imported by name.
        $crate::rumtk_exec_task!(
            $func,
            $args,
            $crate::threading::threading_functions::get_default_system_thread_count()
        )
    }};
    ($func:expr, $args:expr , $threads:expr ) => {{
        #[allow(unused_imports)]
        use $crate::{
            rumtk_create_task, rumtk_create_task_args, rumtk_init_threads, rumtk_resolve_task,
        };
        // Fully qualified so the caller is not required to import `SafeTaskArgs` or the lock.
        let args = $crate::threading::threading_manager::SafeTaskArgs::new(
            $crate::threading::thread_primitives::AsyncRwLock::new($args),
        );
        let task = rumtk_create_task!($func, args);
        rumtk_resolve_task!(task)
    }};
}
1088
///
/// Sleeps for a duration of time in a sync context; the result is not something to be awaited.
///
/// The duration is cast to `f32` and forwarded to `threading_functions::sleep`, so any value
/// that supports an `as f32` cast is accepted.
///
/// As the examples show, the value is expressed in seconds and fractions of a second select
/// finer granularity, down to nanoseconds.
///
/// ## Examples
///
/// ```
/// use rumtk_core::rumtk_sleep;
/// rumtk_sleep!(1);           // Sleeps for 1 second.
/// rumtk_sleep!(0.001);       // Sleeps for 1 millisecond
/// rumtk_sleep!(0.000001);    // Sleeps for 1 microsecond
/// rumtk_sleep!(0.000000001); // Sleeps for 1 nanosecond
/// ```
///
#[macro_export]
macro_rules! rumtk_sleep {
    ( $dur:expr) => {{
        $crate::threading::threading_functions::sleep($dur as f32)
    }};
}
1113
///
/// Sleeps for some duration of time in an async context; the macro's result is meant to be
/// awaited, as the example shows.
///
/// The duration is cast to `f32` and forwarded to `threading_functions::async_sleep`, so any
/// value that supports an `as f32` cast is accepted.
///
/// As the examples show, the value is expressed in seconds and fractions of a second select
/// finer granularity, down to nanoseconds.
///
/// ## Examples
///
/// ```
/// use rumtk_core::{rumtk_async_sleep, rumtk_exec_task};
/// use rumtk_core::core::RUMResult;
/// rumtk_exec_task!( async || -> RUMResult<()> {
///         rumtk_async_sleep!(1).await;           // Sleeps for 1 second.
///         rumtk_async_sleep!(0.001).await;       // Sleeps for 1 millisecond
///         rumtk_async_sleep!(0.000001).await;    // Sleeps for 1 microsecond
///         rumtk_async_sleep!(0.000000001).await; // Sleeps for 1 nanosecond
///         Ok(())
///     }
/// );
/// ```
///
#[macro_export]
macro_rules! rumtk_async_sleep {
    ( $dur:expr) => {{
        $crate::threading::threading_functions::async_sleep($dur as f32)
    }};
}
1143
///
/// Creates a new task queue by calling `TaskManager::new` with the requested number of workers.
///
/// The macro evaluates to the value returned by `TaskManager::new($worker_num)`, so the caller
/// can keep and use the manager.
///
#[macro_export]
macro_rules! rumtk_new_task_queue {
    ( $worker_num:expr ) => {{
        // No trailing semicolon: the constructed manager is the value of the macro.
        $crate::threading::threading_manager::TaskManager::new($worker_num)
    }};
}
1154
///
/// Wraps the given data in a new safe lock by forwarding it to `threading_functions::new_lock`.
/// This interface exists so that consumers of the framework do not have to manage lock
/// construction themselves.
///
/// ## Example
/// ```
/// use rumtk_core::{rumtk_new_lock};
///
/// let data = 5;
/// let lock = rumtk_new_lock!(data);
/// ```
///
#[macro_export]
macro_rules! rumtk_new_lock {
    ( $data:expr ) => {{
        $crate::threading::threading_functions::new_lock($data)
    }};
}
1174
///
/// Runs a read-only critical section against a [SafeLock](thread_primitives::SafeLock).
///
/// The lock and the critical section are forwarded to
/// `threading_functions::process_read_critical_section`, and the macro evaluates to its result.
/// The critical section is a synchronous function or closure; in the example below it receives
/// a guard and simply retrieves a value from the guarded data.
///
/// ## Example
/// ```
/// use rumtk_core::core::RUMResult;
/// use rumtk_core::{rumtk_new_lock, rumtk_critical_section_read};
///
/// let data = 5;
/// let lock = rumtk_new_lock!(data);
/// let result = rumtk_critical_section_read!(
///     lock,
///     |guard| -> RUMResult<i32> {
///         let result: i32 = *guard;
///         Ok(result)
///     }
/// ).expect("No errors locking!");
///
/// assert_eq!(result, data, "Critical section yielded invalid result!");
/// ```
///
#[macro_export]
macro_rules! rumtk_critical_section_read {
    ( $lock:expr, $function:expr ) => {{
        $crate::threading::threading_functions::process_read_critical_section($lock, $function)
    }};
}
1205
///
/// Runs a mutating critical section against a [SafeLock](thread_primitives::SafeLock).
///
/// The lock and the critical section are forwarded to
/// `threading_functions::process_write_critical_section`, and the macro evaluates to its result.
/// The critical section is a synchronous function or closure; in the example below it receives
/// a mutable guard and modifies the internal state of the guarded data.
///
/// ## Example
/// ```
/// use rumtk_core::{rumtk_new_lock, rumtk_critical_section_write};
///
/// let data = 5;
/// let new_data = 10;
/// let lock = rumtk_new_lock!(data);
/// let result = rumtk_critical_section_write!(
///     lock,
///     |mut guard| {
///         *guard = new_data;
///     }
/// );
///
/// assert_eq!(result, (), "Critical section yielded invalid result!");
/// ```
///
#[macro_export]
macro_rules! rumtk_critical_section_write {
    ( $lock:expr, $function:expr ) => {{
        $crate::threading::threading_functions::process_write_critical_section($lock, $function)
    }};
}
1235
///
/// Framework interface to obtain a `read` guard to the locked data.
/// To access the internal data, you will need to dereference the guard (`*guard`).
///
/// The lock expression is cloned and the clone is passed to `threading_functions::lock_read`,
/// so the caller keeps its own lock value and can reuse it afterwards.
///
/// Prefer [rumtk_critical_section_read] when you need to avoid `time of check time of use`
/// security bugs.
///
/// ## Example
/// ```
/// use rumtk_core::{rumtk_new_lock, rumtk_lock_read};
///
/// let data = 5;
/// let lock = rumtk_new_lock!(data.clone());
/// let result = *rumtk_lock_read!(lock);
///
/// assert_eq!(result, data, "Failed to access locked data.");
/// ```
///
#[macro_export]
macro_rules! rumtk_lock_read {
    ( $lock:expr ) => {{
        $crate::threading::threading_functions::lock_read($lock.clone())
    }};
}
1261
///
/// Framework interface to obtain a `write` guard to the locked data.
/// To access the internal data, you will need to dereference the guard (`*guard`).
///
/// The lock expression is cloned and the clone is passed to `threading_functions::lock_write`,
/// so the caller keeps its own lock value and can reuse it afterwards.
///
/// Prefer [rumtk_critical_section_write] when you need to avoid `time of check time of use`
/// security bugs.
///
/// ## Example
/// ```
/// use rumtk_core::{rumtk_new_lock, rumtk_lock_read, rumtk_lock_write};
///
/// let data = 5;
/// let lock = rumtk_new_lock!(data.clone());
/// let new_data = 10;
///
/// *rumtk_lock_write!(lock) = new_data;
/// let result = *rumtk_lock_read!(lock);
///
/// assert_eq!(result, new_data, "Failed to modify locked data.");
/// ```
///
#[macro_export]
macro_rules! rumtk_lock_write {
    ( $lock:expr ) => {{
        $crate::threading::threading_functions::lock_write($lock.clone())
    }};
}
1290}