rumtk_core/threading.rs
1/*
2 * rumtk attempts to implement HL7 and medical protocols for interoperability in medicine.
3 * This toolkit aims to be reliable, simple, performant, and standards compliant.
4 * Copyright (C) 2025 Luis M. Santos, M.D.
5 * Copyright (C) 2025 MedicalMasses L.L.C.
6 *
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20 */
21
22///
23/// This module provides all the primitives needed to build a multithreaded application.
24///
25pub mod thread_primitives {
26 use crate::cache::{new_cache, LazyRUMCache};
27 use std::sync::Arc;
28 use tokio::runtime::Runtime as TokioRuntime;
29 /**************************** Globals **************************************/
30 pub static mut RT_CACHE: TokioRtCache = new_cache();
31 /**************************** Helpers ***************************************/
32 pub fn init_cache(threads: &usize) -> SafeTokioRuntime {
33 let mut builder = tokio::runtime::Builder::new_multi_thread();
34 builder.worker_threads(*threads);
35 builder.enable_all();
36 match builder.build() {
37 Ok(handle) => Arc::new(handle),
38 Err(e) => panic!(
39 "Unable to initialize threading tokio runtime because {}!",
40 &e
41 ),
42 }
43 }
44
45 /**************************** Types ***************************************/
46 pub type SafeTokioRuntime = Arc<TokioRuntime>;
47 pub type TokioRtCache = LazyRUMCache<usize, SafeTokioRuntime>;
48}
49
50pub mod threading_manager {
51 use crate::cache::LazyRUMCacheValue;
52 use crate::core::{RUMResult, RUMVec};
53 use crate::strings::rumtk_format;
54 use crate::threading::thread_primitives::SafeTokioRuntime;
55 use crate::threading::threading_functions::async_sleep;
56 use crate::types::{RUMHashMap, RUMID};
57 use crate::{rumtk_init_threads, rumtk_resolve_task, rumtk_spawn_task, threading};
58 use std::future::Future;
59 use std::sync::Arc;
60 use tokio::sync::RwLock;
61 use tokio::task::JoinHandle;
62
63 const DEFAULT_SLEEP_DURATION: f32 = 0.001f32;
64 const DEFAULT_TASK_CAPACITY: usize = 1024;
65
66 pub type TaskItems<T> = RUMVec<T>;
67 /// This type aliases a vector of T elements that will be used for passing arguments to the task processor.
68 pub type TaskArgs<T> = TaskItems<T>;
69 /// Function signature defining the interface of task processing logic.
70 pub type SafeTaskArgs<T> = Arc<RwLock<TaskItems<T>>>;
71 pub type AsyncTaskHandle<R> = JoinHandle<TaskResult<R>>;
72 pub type AsyncTaskHandles<R> = Vec<AsyncTaskHandle<R>>;
73 //pub type TaskProcessor<T, R, Fut: Future<Output = TaskResult<R>>> = impl FnOnce(&SafeTaskArgs<T>) -> Fut;
74 pub type TaskID = RUMID;
75
76 #[derive(Debug, Clone, Default)]
77 pub struct Task<R> {
78 pub id: TaskID,
79 pub finished: bool,
80 pub result: Option<R>,
81 }
82
83 pub type SafeTask<R> = Arc<Task<R>>;
84 type SafeInternalTask<R> = Arc<RwLock<Task<R>>>;
85 pub type TaskTable<R> = RUMHashMap<TaskID, SafeInternalTask<R>>;
86 pub type TaskBatch = RUMVec<TaskID>;
87 /// Type to use to define how task results are expected to be returned.
88 pub type TaskResult<R> = RUMResult<SafeTask<R>>;
89 pub type TaskResults<R> = TaskItems<TaskResult<R>>;
90 pub type TaskRuntime = LazyRUMCacheValue<SafeTokioRuntime>;
91
92 ///
93 /// Manages asynchronous tasks submitted as micro jobs from synchronous code. This type essentially
94 /// gives the multithreading, asynchronous superpowers to synchronous logic.
95 ///
96 /// ## Example Usage
97 ///
98 /// ```
99 /// use std::sync::{Arc};
100 /// use tokio::sync::RwLock as AsyncRwLock;
101 /// use rumtk_core::core::RUMResult;
102 /// use rumtk_core::strings::RUMString;
103 /// use rumtk_core::threading::threading_manager::{SafeTaskArgs, TaskItems, TaskManager};
104 /// use rumtk_core::{rumtk_create_task, };
105 ///
106 /// let expected = vec![
107 /// RUMString::from("Hello"),
108 /// RUMString::from("World!"),
109 /// RUMString::from("Overcast"),
110 /// RUMString::from("and"),
111 /// RUMString::from("Sad"),
112 /// ];
113 ///
114 /// type TestResult = RUMResult<Vec<RUMString>>;
115 /// let mut queue: TaskManager<TestResult> = TaskManager::new(&5).unwrap();
116 ///
117 /// let locked_args = AsyncRwLock::new(expected.clone());
118 /// let task_args = SafeTaskArgs::<RUMString>::new(locked_args);
119 /// let processor = rumtk_create_task!(
120 /// async |args: &SafeTaskArgs<RUMString>| -> TestResult {
121 /// let owned_args = Arc::clone(args);
122 /// let locked_args = owned_args.read().await;
123 /// let mut results = TaskItems::<RUMString>::with_capacity(locked_args.len());
124 ///
125 /// for arg in locked_args.iter() {
126 /// results.push(RUMString::new(arg));
127 /// }
128 ///
129 /// Ok(results)
130 /// },
131 /// task_args
132 /// );
133 ///
134 /// queue.add_task::<_>(processor);
135 /// let results = queue.wait();
136 ///
137 /// let mut result_data = Vec::<RUMString>::with_capacity(5);
138 /// for r in results {
139 /// for v in r.unwrap().result.clone().unwrap().iter() {
140 /// for value in v.iter() {
141 /// result_data.push(value.clone());
142 /// }
143 /// }
144 /// }
145 ///
146 /// assert_eq!(result_data, expected, "Results do not match expected!");
147 ///
148 /// ```
149 ///
150 #[derive(Debug, Clone, Default)]
151 pub struct TaskManager<R> {
152 tasks: TaskTable<R>,
153 workers: usize,
154 }
155
156 impl<R> TaskManager<R>
157 where
158 R: Sync + Send + Clone + 'static,
159 {
160 ///
161 /// This method creates a [`TaskQueue`] instance using sensible defaults.
162 ///
163 /// The `threads` field is computed from the number of cores present in system.
164 ///
165 pub fn default() -> RUMResult<TaskManager<R>> {
166 Self::new(&threading::threading_functions::get_default_system_thread_count())
167 }
168
169 ///
170 /// Creates an instance of [`ThreadedTaskQueue<T, R>`] in the form of [`SafeThreadedTaskQueue<T, R>`].
171 /// Expects you to provide the count of threads to spawn and the microtask queue size
172 /// allocated by each thread.
173 ///
174 /// This method calls [`Self::with_capacity()`] for the actual object creation.
175 /// The main queue capacity is pre-allocated to [`DEFAULT_QUEUE_CAPACITY`].
176 ///
177 pub fn new(worker_num: &usize) -> RUMResult<TaskManager<R>> {
178 let tasks = TaskTable::<R>::with_capacity(DEFAULT_TASK_CAPACITY);
179 Ok(TaskManager::<R> {
180 tasks,
181 workers: worker_num.to_owned(),
182 })
183 }
184
185 ///
186 /// Add a task to the processing queue. The idea is that you can queue a processor function
187 /// and list of args that will be picked up by one of the threads for processing.
188 ///
189 pub fn add_task<F>(&mut self, task: F) -> TaskID
190 where
191 F: Future<Output = R> + Send + Sync + 'static,
192 F::Output: Send + Sized + 'static,
193 {
194 let id = TaskID::new_v4();
195 let mut safe_task = Arc::new(RwLock::new(Task::<R> {
196 id: id.clone(),
197 finished: false,
198 result: None,
199 }));
200 self.tasks.insert(id.clone(), safe_task.clone());
201
202 let task_wrapper = async move || {
203 // Run the task
204 let result = task.await;
205
206 // Cleanup task
207 safe_task.write().await.result = Some(result);
208 safe_task.write().await.finished = true;
209 };
210
211 let rt = rumtk_init_threads!(&self.workers);
212 rumtk_spawn_task!(rt, task_wrapper());
213
214 id
215 }
216
217 ///
218 /// This method waits until all queued tasks have been processed from the main queue.
219 ///
220 /// We poll the status of the main queue every [DEFAULT_SLEEP_DURATION](DEFAULT_SLEEP_DURATION) ms.
221 ///
222 /// Upon completion,
223 ///
224 /// 1. We collect the results generated (if any).
225 /// 2. We reset the main task and result internal queue states.
226 /// 3. Return the list of results ([TaskResults<R>](TaskResults)).
227 ///
228 /// This operation consumes all the tasks.
229 ///
230 /// ### Note:
231 /// ```text
232 /// Results returned here are not guaranteed to be in the same order as the order in which
233 /// the tasks were queued for work. You will need to pass a type as T that automatically
234 /// tracks its own id or has a way for you to resort results.
235 /// ```
236 pub fn wait(&mut self) -> TaskResults<R> {
237 let task_batch = self.tasks.keys().cloned().collect::<Vec<_>>();
238 self.wait_on_batch(&task_batch)
239 }
240
241 pub fn wait_on_batch(&mut self, tasks: &TaskBatch) -> TaskResults<R> {
242 let rt = rumtk_init_threads!(&self.workers);
243 let results = rumtk_resolve_task!(rt, self.wait_on_batch_async(&tasks));
244 results
245 }
246
247 pub fn wait_on(&mut self, task_id: &TaskID) -> TaskResult<R> {
248 let rt = rumtk_init_threads!(&self.workers);
249 let result = rumtk_resolve_task!(rt, self.wait_on_async(&task_id));
250 result
251 }
252
253 ///
254 /// This method waits until a queued task with [TaskID](TaskID) has been processed from the main queue.
255 ///
256 /// We poll the status of the task every [DEFAULT_SLEEP_DURATION](DEFAULT_SLEEP_DURATION) ms.
257 ///
258 /// Upon completion,
259 ///
260 /// 2. Return the result ([TaskResults<R>](TaskResults)).
261 ///
262 /// This operation consumes the task.
263 ///
264 /// ### Note:
265 /// ```text
266 /// Results returned here are not guaranteed to be in the same order as the order in which
267 /// the tasks were queued for work. You will need to pass a type as T that automatically
268 /// tracks its own id or has a way for you to resort results.
269 /// ```
270 pub async fn wait_on_async(&mut self, task_id: &TaskID) -> TaskResult<R> {
271 let task = match self.tasks.remove(task_id) {
272 Some(task) => task.clone(),
273 None => return Err(rumtk_format!("No task with id {}", task_id)),
274 };
275
276 while !task.read().await.finished {
277 async_sleep(DEFAULT_SLEEP_DURATION).await;
278 }
279
280 let x = Ok(Arc::new(task.write().await.clone()));
281 x
282 }
283
284 ///
285 /// This method waits until a set of queued tasks with [TaskID](TaskID) has been processed from the main queue.
286 ///
287 /// We poll the status of the task every [DEFAULT_SLEEP_DURATION](DEFAULT_SLEEP_DURATION) ms.
288 ///
289 /// Upon completion,
290 ///
291 /// 1. We collect the results generated (if any).
292 /// 2. Return the list of results ([TaskResults<R>](TaskResults)).
293 ///
294 /// ### Note:
295 /// ```text
296 /// Results returned here are not guaranteed to be in the same order as the order in which
297 /// the tasks were queued for work. You will need to pass a type as T that automatically
298 /// tracks its own id or has a way for you to resort results.
299 /// ```
300 pub async fn wait_on_batch_async(&mut self, tasks: &TaskBatch) -> TaskResults<R> {
301 let mut results = TaskResults::<R>::default();
302 for task in tasks {
303 results.push(self.wait_on_async(task).await);
304 }
305 results
306 }
307
308 ///
309 /// Check if all work has been completed from the task queue.
310 ///
311 /// ## Examples
312 ///
313 /// ### Sync Usage
314 ///
315 ///```
316 /// use rumtk_core::threading::threading_manager::TaskManager;
317 ///
318 /// let manager = TaskManager::<usize>::new(&4).unwrap();
319 ///
320 /// let all_done = manager.is_all_completed();
321 ///
322 /// assert_eq!(all_done, true, "Empty TaskManager reports tasks are not completed!");
323 ///
324 /// ```
325 ///
326 pub fn is_all_completed(&self) -> bool {
327 let rt = rumtk_init_threads!(&self.workers);
328 rumtk_resolve_task!(rt, TaskManager::<R>::is_all_completed_async(self))
329 }
330
331 pub async fn is_all_completed_async(&self) -> bool {
332 for (_, task) in self.tasks.iter() {
333 if !task.read().await.finished {
334 return false;
335 }
336 }
337
338 true
339 }
340
341 ///
342 /// Reset task queue and results queue states.
343 ///
344 pub fn reset(&mut self) {
345 self.tasks.clear();
346 }
347
348 ///
349 /// Alias for [wait](TaskManager::wait).
350 ///
351 fn gather(&mut self) -> TaskResults<R> {
352 self.wait()
353 }
354 }
355}
356
357///
358/// This module contains a few helper.
359///
360/// For example, you can find a function for determining number of threads available in system.
361/// The sleep family of functions are also here.
362///
363pub mod threading_functions {
364 use num_cpus;
365 use std::thread::{available_parallelism, sleep as std_sleep};
366 use std::time::Duration;
367 use tokio::time::sleep as tokio_sleep;
368
369 pub const NANOS_PER_SEC: u64 = 1000000000;
370 pub const MILLIS_PER_SEC: u64 = 1000;
371 pub const MICROS_PER_SEC: u64 = 1000000;
372
373 pub fn get_default_system_thread_count() -> usize {
374 let cpus: usize = num_cpus::get();
375 let parallelism = match available_parallelism() {
376 Ok(n) => n.get(),
377 Err(_) => 0,
378 };
379
380 if parallelism >= cpus {
381 parallelism
382 } else {
383 cpus
384 }
385 }
386
387 pub fn sleep(s: f32) {
388 let ns = s * NANOS_PER_SEC as f32;
389 let rounded_ns = ns.round() as u64;
390 let duration = Duration::from_nanos(rounded_ns);
391 std_sleep(duration);
392 }
393
394 pub async fn async_sleep(s: f32) {
395 let ns = s * NANOS_PER_SEC as f32;
396 let rounded_ns = ns.round() as u64;
397 let duration = Duration::from_nanos(rounded_ns);
398 tokio_sleep(duration).await;
399 }
400}
401
402///
403/// Main API for interacting with the threading back end. Remember, we use tokio as our executor.
404/// This means that by default, all jobs sent to the thread pool have to be async in nature.
405/// These macros make handling of these jobs at the sync/async boundary more convenient.
406///
407pub mod threading_macros {
408 use crate::threading::thread_primitives;
409 use crate::threading::threading_manager::SafeTaskArgs;
410
411 ///
412 /// First, let's make sure we have *tokio* initialized at least once. The runtime created here
413 /// will be saved to the global context so the next call to this macro will simply grab a
414 /// reference to the previously initialized runtime.
415 ///
416 /// Passing nothing will default to initializing a runtime using the default number of threads
417 /// for this system. This is typically equivalent to number of cores/threads for your CPU.
418 ///
419 /// Passing `threads` number will yield a runtime that allocates that many threads.
420 ///
421 ///
422 /// ## Examples
423 ///
424 /// ```
425 /// use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_create_task_args, rumtk_create_task, rumtk_spawn_task};
426 /// use rumtk_core::core::RUMResult;
427 /// use rumtk_core::threading::threading_manager::SafeTaskArgs;
428 ///
429 /// async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
430 /// let mut result = Vec::<i32>::new();
431 /// for arg in args.read().await.iter() {
432 /// result.push(*arg);
433 /// }
434 /// Ok(result)
435 /// }
436 ///
437 /// let rt = rumtk_init_threads!(); // Creates runtime instance
438 /// let args = rumtk_create_task_args!(1); // Creates a vector of i32s
439 /// let task = rumtk_create_task!(test, args); // Creates a standard task which consists of a function or closure accepting a Vec<T>
440 /// let result = rumtk_resolve_task!(&rt, rumtk_spawn_task!(&rt, task)); // Spawn's task and waits for it to conclude.
441 /// ```
442 ///
443 /// ```
444 /// use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_create_task_args, rumtk_create_task, rumtk_spawn_task};
445 /// use rumtk_core::core::RUMResult;
446 /// use rumtk_core::threading::threading_manager::SafeTaskArgs;
447 ///
448 /// async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
449 /// let mut result = Vec::<i32>::new();
450 /// for arg in args.read().await.iter() {
451 /// result.push(*arg);
452 /// }
453 /// Ok(result)
454 /// }
455 ///
456 /// let thread_count: usize = 10;
457 /// let rt = rumtk_init_threads!(&thread_count);
458 /// let args = rumtk_create_task_args!(1);
459 /// let task = rumtk_create_task!(test, args);
460 /// let result = rumtk_resolve_task!(&rt, rumtk_spawn_task!(&rt, task));
461 /// ```
462 #[macro_export]
463 macro_rules! rumtk_init_threads {
464 ( ) => {{
465 use $crate::rumtk_cache_fetch;
466 use $crate::threading::thread_primitives::{init_cache, RT_CACHE};
467 use $crate::threading::threading_functions::get_default_system_thread_count;
468 let rt = rumtk_cache_fetch!(
469 &mut RT_CACHE,
470 &get_default_system_thread_count(),
471 init_cache
472 );
473 rt
474 }};
475 ( $threads:expr ) => {{
476 use $crate::rumtk_cache_fetch;
477 use $crate::threading::thread_primitives::{init_cache, RT_CACHE};
478 let rt = rumtk_cache_fetch!(&raw mut RT_CACHE, $threads, init_cache);
479 rt
480 }};
481 }
482
483 ///
484 /// Puts task onto the runtime queue.
485 ///
486 /// The parameters to this macro are a reference to the runtime (`rt`) and a future (`func`).
487 ///
488 /// The return is a [thread_primitives::JoinHandle<T>] instance. If the task was a standard
489 /// framework task, you will get [thread_primitives::AsyncTaskHandle] instead.
490 ///
491 #[macro_export]
492 macro_rules! rumtk_spawn_task {
493 ( $func:expr ) => {{
494 let rt = rumtk_init_threads!();
495 rt.spawn($func)
496 }};
497 ( $rt:expr, $func:expr ) => {{
498 $rt.spawn($func)
499 }};
500 }
501
502 ///
503 /// Using the initialized runtime, wait for the future to resolve in a thread blocking manner!
504 ///
505 /// If you pass a reference to the runtime (`rt`) and an async closure (`func`), we await the
506 /// async closure without passing any arguments.
507 ///
508 /// You can pass a third argument to this macro in the form of any number of arguments (`arg_item`).
509 /// In such a case, we pass those arguments to the call on the async closure and await on results.
510 ///
511 #[macro_export]
512 macro_rules! rumtk_wait_on_task {
513 ( $rt:expr, $func:expr ) => {{
514 $rt.block_on(async move {
515 $func().await
516 })
517 }};
518 ( $rt:expr, $func:expr, $($arg_items:expr),+ ) => {{
519 $rt.block_on(async move {
520 $func($($arg_items),+).await
521 })
522 }};
523 }
524
525 ///
526 /// This macro awaits a future.
527 ///
528 /// The arguments are a reference to the runtime (`rt) and a future.
529 ///
530 /// If there is a result, you will get the result of the future.
531 ///
532 /// ## Examples
533 ///
534 /// ```
535 /// use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_create_task_args, rumtk_create_task, rumtk_spawn_task};
536 /// use rumtk_core::core::RUMResult;
537 /// use rumtk_core::threading::threading_manager::SafeTaskArgs;
538 ///
539 /// async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
540 /// let mut result = Vec::<i32>::new();
541 /// for arg in args.read().await.iter() {
542 /// result.push(*arg);
543 /// }
544 /// Ok(result)
545 /// }
546 ///
547 /// let rt = rumtk_init_threads!();
548 /// let args = rumtk_create_task_args!(1);
549 /// let task = rumtk_create_task!(test, args);
550 /// let result = rumtk_resolve_task!(&rt, rumtk_spawn_task!(&rt, task));
551 /// ```
552 ///
553 #[macro_export]
554 macro_rules! rumtk_resolve_task {
555 ( $rt:expr, $future:expr ) => {{
556 use $crate::strings::rumtk_format;
557 // Fun tidbit, the expression rumtk_resolve_task!(&rt, rumtk_spawn_task!(&rt, task)), where
558 // rt is the tokio runtime yields async move { { &rt.spawn(task) } }. However, the whole thing
559 // is technically moved into the async closure and captured so things like mutex guards
560 // technically go out of the outer scope. As a result that expression fails to compile even
561 // though the intent is for rumtk_spawn_task to resolve first and its result get moved
562 // into the async closure. To ensure that happens regardless of given expression, we do
563 // a variable assignment below to force the "future" macro expressions to resolve before
564 // moving into the closure. DO NOT REMOVE OR "SIMPLIFY" THE let future = $future LINE!!!
565 let future = $future;
566 $rt.block_on(async move { future.await })
567 }};
568 }
569
570 ///
571 /// This macro creates an async body that calls the async closure and awaits it.
572 ///
573 /// ## Example
574 ///
575 /// ```
576 /// use std::sync::{Arc, RwLock};
577 /// use tokio::sync::RwLock as AsyncRwLock;
578 /// use rumtk_core::strings::RUMString;
579 /// use rumtk_core::threading::threading_manager::{SafeTaskArgs, TaskItems};
580 ///
581 /// pub type SafeTaskArgs2<T> = Arc<RwLock<TaskItems<T>>>;
582 /// let expected = vec![
583 /// RUMString::from("Hello"),
584 /// RUMString::from("World!"),
585 /// RUMString::from("Overcast"),
586 /// RUMString::from("and"),
587 /// RUMString::from("Sad"),
588 /// ];
589 /// let locked_args = AsyncRwLock::new(expected.clone());
590 /// let task_args = SafeTaskArgs::<RUMString>::new(locked_args);
591 ///
592 ///
593 /// ```
594 ///
595 #[macro_export]
596 macro_rules! rumtk_create_task {
597 ( $func:expr ) => {{
598 async move {
599 let f = $func;
600 f().await
601 }
602 }};
603 ( $func:expr, $args:expr ) => {{
604 let f = $func;
605 async move { f(&$args).await }
606 }};
607 }
608
609 ///
610 /// Creates an instance of [SafeTaskArgs] with the arguments passed.
611 ///
612 /// ## Note
613 ///
614 /// All arguments must be of the same type
615 ///
616 #[macro_export]
617 macro_rules! rumtk_create_task_args {
618 ( ) => {{
619 use $crate::threading::threading_manager::{TaskArgs, SafeTaskArgs, TaskItems};
620 use tokio::sync::RwLock;
621 SafeTaskArgs::new(RwLock::new(vec![]))
622 }};
623 ( $($args:expr),+ ) => {{
624 use $crate::threading::threading_manager::{SafeTaskArgs};
625 use tokio::sync::RwLock;
626 SafeTaskArgs::new(RwLock::new(vec![$($args),+]))
627 }};
628 }
629
630 ///
631 /// Convenience macro for packaging the task components and launching the task in one line.
632 ///
633 /// One of the advantages is that you can generate a new `tokio` runtime by specifying the
634 /// number of threads at the end. This is optional. Meaning, we will default to the system's
635 /// number of threads if that value is not specified.
636 ///
637 /// Between the `func` parameter and the optional `threads` parameter, you can specify a
638 /// variable number of arguments to pass to the task. each argument must be of the same type.
639 /// If you wish to pass different arguments with different types, please define an abstract type
640 /// whose underlying structure is a tuple of items and pass that instead.
641 ///
642 /// ## Examples
643 ///
644 /// ### With Default Thread Count
645 /// ```
646 /// use rumtk_core::{rumtk_exec_task};
647 /// use rumtk_core::core::RUMResult;
648 /// use rumtk_core::threading::threading_manager::SafeTaskArgs;
649 ///
650 /// async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
651 /// let mut result = Vec::<i32>::new();
652 /// for arg in args.read().await.iter() {
653 /// result.push(*arg);
654 /// }
655 /// Ok(result)
656 /// }
657 ///
658 /// let result = rumtk_exec_task!(test, vec![5]);
659 /// assert_eq!(&result.clone().unwrap(), &vec![5], "Results mismatch");
660 /// assert_ne!(&result.clone().unwrap(), &vec![5, 10], "Results do not mismatch as expected!");
661 /// ```
662 ///
663 /// ### With Custom Thread Count
664 /// ```
665 /// use rumtk_core::{rumtk_exec_task};
666 /// use rumtk_core::core::RUMResult;
667 /// use rumtk_core::threading::threading_manager::SafeTaskArgs;
668 ///
669 /// async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
670 /// let mut result = Vec::<i32>::new();
671 /// for arg in args.read().await.iter() {
672 /// result.push(*arg);
673 /// }
674 /// Ok(result)
675 /// }
676 ///
677 /// let result = rumtk_exec_task!(test, vec![5], 5);
678 /// assert_eq!(&result.clone().unwrap(), &vec![5], "Results mismatch");
679 /// assert_ne!(&result.clone().unwrap(), &vec![5, 10], "Results do not mismatch as expected!");
680 /// ```
681 ///
682 /// ### With Async Function Body
683 /// ```
684 /// use rumtk_core::{rumtk_exec_task};
685 /// use rumtk_core::core::RUMResult;
686 /// use rumtk_core::threading::threading_manager::SafeTaskArgs;
687 ///
688 /// let result = rumtk_exec_task!(
689 /// async move |args: &SafeTaskArgs<i32>| -> RUMResult<Vec<i32>> {
690 /// let mut result = Vec::<i32>::new();
691 /// for arg in args.read().await.iter() {
692 /// result.push(*arg);
693 /// }
694 /// Ok(result)
695 /// },
696 /// vec![5]);
697 /// assert_eq!(&result.clone().unwrap(), &vec![5], "Results mismatch");
698 /// assert_ne!(&result.clone().unwrap(), &vec![5, 10], "Results do not mismatch as expected!");
699 /// ```
700 ///
701 /// ### With Async Function Body and No Args
702 /// ```
703 /// use rumtk_core::{rumtk_exec_task};
704 /// use rumtk_core::core::RUMResult;
705 /// use rumtk_core::threading::threading_manager::SafeTaskArgs;
706 ///
707 /// let result = rumtk_exec_task!(
708 /// async || -> RUMResult<Vec<i32>> {
709 /// let mut result = Vec::<i32>::new();
710 /// Ok(result)
711 /// });
712 /// let empty = Vec::<i32>::new();
713 /// assert_eq!(&result.clone().unwrap(), &empty, "Results mismatch");
714 /// assert_ne!(&result.clone().unwrap(), &vec![5, 10], "Results do not mismatch as expected!");
715 /// ```
716 ///
717 /// ## Equivalent To
718 ///
719 /// ```
720 /// use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_create_task_args, rumtk_create_task, rumtk_spawn_task};
721 /// use rumtk_core::core::RUMResult;
722 /// use rumtk_core::threading::threading_manager::SafeTaskArgs;
723 ///
724 /// async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
725 /// let mut result = Vec::<i32>::new();
726 /// for arg in args.read().await.iter() {
727 /// result.push(*arg);
728 /// }
729 /// Ok(result)
730 /// }
731 ///
732 /// let rt = rumtk_init_threads!();
733 /// let args = rumtk_create_task_args!(1);
734 /// let task = rumtk_create_task!(test, args);
735 /// let result = rumtk_resolve_task!(&rt, rumtk_spawn_task!(&rt, task));
736 /// ```
737 ///
738 #[macro_export]
739 macro_rules! rumtk_exec_task {
740 ($func:expr ) => {{
741 use tokio::sync::RwLock;
742 use $crate::{
743 rumtk_create_task, rumtk_create_task_args, rumtk_init_threads, rumtk_resolve_task,
744 };
745 let rt = rumtk_init_threads!();
746 let task = rumtk_create_task!($func);
747 rumtk_resolve_task!(&rt, task)
748 }};
749 ($func:expr, $args:expr ) => {{
750 use tokio::sync::RwLock;
751 use $crate::{
752 rumtk_create_task, rumtk_create_task_args, rumtk_init_threads, rumtk_resolve_task,
753 };
754 let rt = rumtk_init_threads!();
755 let args = SafeTaskArgs::new(RwLock::new($args));
756 let task = rumtk_create_task!($func, args);
757 rumtk_resolve_task!(&rt, task)
758 }};
759 ($func:expr, $args:expr , $threads:expr ) => {{
760 use tokio::sync::RwLock;
761 use $crate::{
762 rumtk_create_task, rumtk_create_task_args, rumtk_init_threads, rumtk_resolve_task,
763 };
764 let rt = rumtk_init_threads!(&$threads);
765 let args = SafeTaskArgs::new(RwLock::new($args));
766 let task = rumtk_create_task!($func, args);
767 rumtk_resolve_task!(&rt, task)
768 }};
769 }
770
771 ///
772 /// Sleep a duration of time in a sync context, so no await can be call on the result.
773 ///
774 /// You can pass any value that can be cast to f32.
775 ///
776 /// The precision is up to nanoseconds and it is depicted by the number of decimal places.
777 ///
778 /// ## Examples
779 ///
780 /// ```
781 /// use rumtk_core::rumtk_sleep;
782 /// rumtk_sleep!(1); // Sleeps for 1 second.
783 /// rumtk_sleep!(0.001); // Sleeps for 1 millisecond
784 /// rumtk_sleep!(0.000001); // Sleeps for 1 microsecond
785 /// rumtk_sleep!(0.000000001); // Sleeps for 1 nanosecond
786 /// ```
787 ///
788 #[macro_export]
789 macro_rules! rumtk_sleep {
790 ( $dur:expr) => {{
791 use $crate::threading::threading_functions::sleep;
792 sleep($dur as f32)
793 }};
794 }
795
796 ///
797 /// Sleep for some duration of time in an async context. Meaning, we can be awaited.
798 ///
799 /// You can pass any value that can be cast to f32.
800 ///
801 /// The precision is up to nanoseconds and it is depicted by the number of decimal places.
802 ///
803 /// ## Examples
804 ///
805 /// ```
806 /// use rumtk_core::{rumtk_async_sleep, rumtk_exec_task};
807 /// use rumtk_core::core::RUMResult;
808 /// rumtk_exec_task!( async || -> RUMResult<()> {
809 /// rumtk_async_sleep!(1).await; // Sleeps for 1 second.
810 /// rumtk_async_sleep!(0.001).await; // Sleeps for 1 millisecond
811 /// rumtk_async_sleep!(0.000001).await; // Sleeps for 1 microsecond
812 /// rumtk_async_sleep!(0.000000001).await; // Sleeps for 1 nanosecond
813 /// Ok(())
814 /// }
815 /// );
816 /// ```
817 ///
818 #[macro_export]
819 macro_rules! rumtk_async_sleep {
820 ( $dur:expr) => {{
821 use $crate::threading::threading_functions::async_sleep;
822 async_sleep($dur as f32)
823 }};
824 }
825
826 ///
827 ///
828 ///
829 #[macro_export]
830 macro_rules! rumtk_new_task_queue {
831 ( $worker_num:expr ) => {{
832 use $crate::threading::threading_manager::TaskManager;
833 TaskManager::new($worker_num);
834 }};
835 }
836}