rumtk_core/threading.rs
/*
 * rumtk attempts to implement HL7 and medical protocols for interoperability in medicine.
 * This toolkit aims to be reliable, simple, performant, and standards compliant.
 * Copyright (C) 2025 Luis M. Santos, M.D.
 * Copyright (C) 2025 MedicalMasses L.L.C.
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 */

///
/// This module provides all the primitives needed to build a multithreaded application.
///
pub mod thread_primitives {
    use crate::cache::{new_cache, LazyRUMCache};
    use std::sync::Arc;
    use tokio::runtime::Runtime as TokioRuntime;
    /**************************** Globals **************************************/
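    /// Process-wide cache of lazily initialized tokio runtimes, keyed by worker thread count.
    /// Normally accessed through the [`rumtk_init_threads!`](crate::rumtk_init_threads) macro rather than directly.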
    pub static mut RT_CACHE: TokioRtCache = new_cache();
    /**************************** Helpers ***************************************/
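    ///
    /// Builds a multi-threaded tokio runtime with `threads` worker threads and wraps it in an
    /// [`Arc`]. This is the factory function handed to the runtime cache and it panics if the
    /// runtime cannot be constructed.
    ///
    /// ## Example
    ///
    /// A minimal sketch using the returned runtime directly (normally you would go through
    /// [`rumtk_init_threads!`](crate::rumtk_init_threads) instead):
    ///
    /// ```
    /// use rumtk_core::threading::thread_primitives::init_cache;
    ///
    /// // Build a runtime with two worker threads and block on a trivial future.
    /// let rt = init_cache(&2);
    /// let answer = rt.block_on(async { 6 * 7 });
    /// assert_eq!(answer, 42);
    /// ```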
    pub fn init_cache(threads: &usize) -> SafeTokioRuntime {
        let mut builder = tokio::runtime::Builder::new_multi_thread();
        builder.worker_threads(*threads);
        builder.enable_all();
        match builder.build() {
            Ok(handle) => Arc::new(handle),
            Err(e) => panic!(
                "Unable to initialize the tokio threading runtime because {}!",
                &e
            ),
        }
    }

    /**************************** Types ***************************************/
    pub type SafeTokioRuntime = Arc<TokioRuntime>;
    pub type TokioRtCache = LazyRUMCache<usize, SafeTokioRuntime>;
}

pub mod threading_manager {
    use crate::cache::LazyRUMCacheValue;
    use crate::core::{RUMResult, RUMVec};
    use crate::strings::rumtk_format;
    use crate::threading::thread_primitives::SafeTokioRuntime;
    use crate::threading::threading_functions::async_sleep;
    use crate::types::{RUMHashMap, RUMID};
    use crate::{rumtk_init_threads, rumtk_resolve_task, threading};
    use std::future::Future;
    use std::sync::Arc;
    use tokio::sync::RwLock;
    use tokio::task::JoinHandle;

    const DEFAULT_SLEEP_DURATION: f32 = 0.001f32;
    const DEFAULT_TASK_CAPACITY: usize = 1024;

    pub type TaskItems<T> = RUMVec<T>;
    /// This type aliases a vector of T elements used for passing arguments to the task processor.
    pub type TaskArgs<T> = TaskItems<T>;
    /// Thread-safe, shared wrapper around [`TaskItems`] used to hand arguments to task processors.
    pub type SafeTaskArgs<T> = Arc<RwLock<TaskItems<T>>>;
    pub type AsyncTaskHandle<R> = JoinHandle<TaskResult<R>>;
    pub type AsyncTaskHandles<R> = Vec<AsyncTaskHandle<R>>;
    //pub type TaskProcessor<T, R, Fut: Future<Output = TaskResult<R>>> = impl FnOnce(&SafeTaskArgs<T>) -> Fut;
    pub type TaskID = RUMID;

    #[derive(Debug, Clone, Default)]
    pub struct Task<R> {
        pub id: TaskID,
        pub finished: bool,
        pub result: Option<R>,
    }

    pub type SafeTask<R> = Arc<Task<R>>;
    type SafeInternalTask<R> = Arc<RwLock<Task<R>>>;
    pub type TaskTable<R> = RUMHashMap<TaskID, SafeInternalTask<R>>;
    pub type SafeAsyncTaskTable<R> = Arc<RwLock<TaskTable<R>>>;
    pub type TaskBatch = RUMVec<TaskID>;
    /// Type defining how task results are returned to callers.
    pub type TaskResult<R> = RUMResult<SafeTask<R>>;
    pub type TaskResults<R> = TaskItems<TaskResult<R>>;
    pub type TaskRuntime = LazyRUMCacheValue<SafeTokioRuntime>;

    ///
    /// Manages asynchronous tasks submitted as micro jobs from synchronous code. This type essentially
    /// gives multithreaded, asynchronous superpowers to synchronous logic.
    ///
    /// ## Example Usage
    ///
    /// ```
    /// use std::sync::Arc;
    /// use tokio::sync::RwLock as AsyncRwLock;
    /// use rumtk_core::core::RUMResult;
    /// use rumtk_core::strings::RUMString;
    /// use rumtk_core::threading::threading_manager::{SafeTaskArgs, TaskItems, TaskManager};
    /// use rumtk_core::rumtk_create_task;
    ///
    /// let expected = vec![
    ///     RUMString::from("Hello"),
    ///     RUMString::from("World!"),
    ///     RUMString::from("Overcast"),
    ///     RUMString::from("and"),
    ///     RUMString::from("Sad"),
    /// ];
    ///
    /// type TestResult = RUMResult<Vec<RUMString>>;
    /// let mut queue: TaskManager<TestResult> = TaskManager::new(&5).unwrap();
    ///
    /// let locked_args = AsyncRwLock::new(expected.clone());
    /// let task_args = SafeTaskArgs::<RUMString>::new(locked_args);
    /// let processor = rumtk_create_task!(
    ///     async |args: &SafeTaskArgs<RUMString>| -> TestResult {
    ///         let owned_args = Arc::clone(args);
    ///         let locked_args = owned_args.read().await;
    ///         let mut results = TaskItems::<RUMString>::with_capacity(locked_args.len());
    ///
    ///         for arg in locked_args.iter() {
    ///             results.push(RUMString::new(arg));
    ///         }
    ///
    ///         Ok(results)
    ///     },
    ///     task_args
    /// );
    ///
    /// queue.add_task::<_>(processor);
    /// let results = queue.wait();
    ///
    /// let mut result_data = Vec::<RUMString>::with_capacity(5);
    /// for r in results {
    ///     for v in r.unwrap().result.clone().unwrap().iter() {
    ///         for value in v.iter() {
    ///             result_data.push(value.clone());
    ///         }
    ///     }
    /// }
    ///
    /// assert_eq!(result_data, expected, "Results do not match expected!");
    /// ```
    ///
    #[derive(Debug, Clone, Default)]
    pub struct TaskManager<R> {
        tasks: SafeAsyncTaskTable<R>,
        workers: usize,
    }

    impl<R> TaskManager<R>
    where
        R: Sync + Send + Clone + 'static,
    {
        ///
        /// This method creates a [`TaskManager`] instance using sensible defaults.
        ///
        /// The number of workers is computed from the number of cores present in the system.
        ///
        pub fn default() -> RUMResult<TaskManager<R>> {
            Self::new(&threading::threading_functions::get_default_system_thread_count())
        }

        ///
        /// Creates an instance of [`TaskManager<R>`].
        /// Expects you to provide the number of worker threads to use when driving tasks.
        ///
        /// The internal task table is pre-allocated to [`DEFAULT_TASK_CAPACITY`] entries.
        ///
        pub fn new(worker_num: &usize) -> RUMResult<TaskManager<R>> {
            let tasks = SafeAsyncTaskTable::<R>::new(RwLock::new(TaskTable::with_capacity(
                DEFAULT_TASK_CAPACITY,
            )));
            Ok(TaskManager::<R> {
                tasks,
                workers: worker_num.to_owned(),
            })
        }

        ///
        /// Add a task to the processing queue. The idea is that you can queue a processor function
        /// and a list of args that will be picked up by one of the worker threads for processing.
        ///
        /// This is the async counterpart of [`add_task`](Self::add_task).
        ///
        pub async fn add_task_async<F>(&mut self, task: F) -> TaskID
        where
            F: Future<Output = R> + Send + Sync + 'static,
            F::Output: Send + Sized + 'static,
        {
            let id = TaskID::new_v4();
            let safe_task = Arc::new(RwLock::new(Task::<R> {
                id: id.clone(),
                finished: false,
                result: None,
            }));
            self.tasks
                .write()
                .await
                .insert(id.clone(), safe_task.clone());

            let task_wrapper = async move || {
                // Run the task
                let result = task.await;

                // Store the result and mark the task as finished
                safe_task.write().await.result = Some(result);
                safe_task.write().await.finished = true;
            };

            tokio::spawn(task_wrapper());

            id
        }

        ///
        /// See [add_task_async](Self::add_task_async).
        ///
        pub fn add_task<F>(&mut self, task: F) -> TaskID
        where
            F: Future<Output = R> + Send + Sync + 'static,
            F::Output: Send + Sized + 'static,
        {
            let rt = rumtk_init_threads!(&self.workers);
            rumtk_resolve_task!(rt, self.add_task_async(task))
        }

        ///
        /// See [wait_async](Self::wait_async).
        ///
        pub fn wait(&mut self) -> TaskResults<R> {
            let rt = rumtk_init_threads!(&self.workers);
            rumtk_resolve_task!(rt, self.wait_async())
        }

        ///
        /// See [wait_on_batch_async](Self::wait_on_batch_async).
        ///
        pub fn wait_on_batch(&mut self, tasks: &TaskBatch) -> TaskResults<R> {
            let rt = rumtk_init_threads!(&self.workers);
            rumtk_resolve_task!(rt, self.wait_on_batch_async(tasks))
        }

        ///
        /// See [wait_on_async](Self::wait_on_async).
        ///
        pub fn wait_on(&mut self, task_id: &TaskID) -> TaskResult<R> {
            let rt = rumtk_init_threads!(&self.workers);
            rumtk_resolve_task!(rt, self.wait_on_async(task_id))
        }

        ///
        /// This method waits until the queued task with the given [TaskID](TaskID) has been processed.
        ///
        /// We poll the status of the task every [DEFAULT_SLEEP_DURATION](DEFAULT_SLEEP_DURATION) seconds.
        ///
        /// Upon completion, the result ([TaskResult<R>](TaskResult)) is returned.
        ///
        /// This operation consumes the task.
        ///
        /// ### Note:
        /// ```text
        /// Results returned here are not guaranteed to be in the same order as the order in which
        /// the tasks were queued for work. You will need to pass a type as T that automatically
        /// tracks its own id or has a way for you to re-sort results.
        /// ```
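        ///
        /// ## Example
        ///
        /// A minimal sketch using the synchronous wrapper [wait_on](Self::wait_on):
        ///
        /// ```
        /// use rumtk_core::threading::threading_manager::TaskManager;
        ///
        /// // Queue a single task and wait on its specific id.
        /// let mut manager = TaskManager::<usize>::new(&2).unwrap();
        /// let id = manager.add_task(async { 42usize });
        /// let task = manager.wait_on(&id).unwrap();
        /// assert_eq!(task.result, Some(42usize));
        /// ```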
        pub async fn wait_on_async(&mut self, task_id: &TaskID) -> TaskResult<R> {
            let task = match self.tasks.write().await.remove(task_id) {
                Some(task) => task.clone(),
                None => return Err(rumtk_format!("No task with id {}", task_id)),
            };

            while !task.read().await.finished {
                async_sleep(DEFAULT_SLEEP_DURATION).await;
            }

            Ok(Arc::new(task.read().await.clone()))
        }

        ///
        /// This method waits until a set of queued tasks identified by [TaskID](TaskID) have been processed.
        ///
        /// We poll the status of each task every [DEFAULT_SLEEP_DURATION](DEFAULT_SLEEP_DURATION) seconds.
        ///
        /// Upon completion,
        ///
        /// 1. We collect the results generated (if any).
        /// 2. Return the list of results ([TaskResults<R>](TaskResults)).
        ///
        /// ### Note:
        /// ```text
        /// Results returned here are not guaranteed to be in the same order as the order in which
        /// the tasks were queued for work. You will need to pass a type as T that automatically
        /// tracks its own id or has a way for you to re-sort results.
        /// ```
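        ///
        /// ## Example
        ///
        /// A minimal sketch using the synchronous wrapper [wait_on_batch](Self::wait_on_batch):
        ///
        /// ```
        /// use rumtk_core::threading::threading_manager::{TaskBatch, TaskManager};
        ///
        /// // Queue two tasks and wait on them as a batch.
        /// let mut manager = TaskManager::<usize>::new(&2).unwrap();
        /// let batch: TaskBatch = vec![
        ///     manager.add_task(async { 1usize }),
        ///     manager.add_task(async { 2usize }),
        /// ];
        /// let results = manager.wait_on_batch(&batch);
        /// assert_eq!(results.len(), 2);
        /// ```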
        pub async fn wait_on_batch_async(&mut self, tasks: &TaskBatch) -> TaskResults<R> {
            let mut results = TaskResults::<R>::default();
            for task in tasks {
                results.push(self.wait_on_async(task).await);
            }
            results
        }

        ///
        /// This method waits until all queued tasks have been processed.
        ///
        /// We poll the status of each task every [DEFAULT_SLEEP_DURATION](DEFAULT_SLEEP_DURATION) seconds.
        ///
        /// Upon completion,
        ///
        /// 1. We collect the results generated (if any).
        /// 2. The completed tasks are removed from the internal task table.
        /// 3. Return the list of results ([TaskResults<R>](TaskResults)).
        ///
        /// This operation consumes all the tasks.
        ///
        /// ### Note:
        /// ```text
        /// Results returned here are not guaranteed to be in the same order as the order in which
        /// the tasks were queued for work. You will need to pass a type as T that automatically
        /// tracks its own id or has a way for you to re-sort results.
        /// ```
        pub async fn wait_async(&mut self) -> TaskResults<R> {
            let task_batch = self.tasks.read().await.keys().cloned().collect::<Vec<_>>();
            self.wait_on_batch_async(&task_batch).await
        }

        ///
        /// Check if all work has been completed from the task queue.
        ///
        /// ## Examples
        ///
        /// ### Sync Usage
        ///
        /// ```
        /// use rumtk_core::threading::threading_manager::TaskManager;
        ///
        /// let manager = TaskManager::<usize>::new(&4).unwrap();
        ///
        /// let all_done = manager.is_all_completed();
        ///
        /// assert_eq!(all_done, true, "Empty TaskManager reports tasks are not completed!");
        /// ```
        ///
        pub fn is_all_completed(&self) -> bool {
            let rt = rumtk_init_threads!(&self.workers);
            rumtk_resolve_task!(rt, TaskManager::<R>::is_all_completed_async(self))
        }

        pub async fn is_all_completed_async(&self) -> bool {
            for (_, task) in self.tasks.read().await.iter() {
                if !task.read().await.finished {
                    return false;
                }
            }

            true
        }

        ///
        /// Alias for [wait](TaskManager::wait).
        ///
        fn gather(&mut self) -> TaskResults<R> {
            self.wait()
        }
    }
}

///
/// This module contains a few helper functions.
///
/// For example, you can find a function for determining the number of threads available in the system.
/// The sleep family of functions also lives here.
///
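/// ## Example
///
/// A minimal sketch of the helpers:
///
/// ```
/// use rumtk_core::threading::threading_functions::{get_default_system_thread_count, sleep};
///
/// // Pick a worker count based on the machine, then sleep for one millisecond.
/// let threads = get_default_system_thread_count();
/// assert!(threads >= 1);
/// sleep(0.001);
/// ```
///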
pub mod threading_functions {
    use num_cpus;
    use std::thread::{available_parallelism, sleep as std_sleep};
    use std::time::Duration;
    use tokio::time::sleep as tokio_sleep;

    pub const NANOS_PER_SEC: u64 = 1_000_000_000;
    pub const MILLIS_PER_SEC: u64 = 1000;
    pub const MICROS_PER_SEC: u64 = 1_000_000;

    pub fn get_default_system_thread_count() -> usize {
        let cpus: usize = num_cpus::get();
        let parallelism = match available_parallelism() {
            Ok(n) => n.get(),
            Err(_) => 0,
        };

        if parallelism >= cpus {
            parallelism
        } else {
            cpus
        }
    }

    pub fn sleep(s: f32) {
        let ns = s * NANOS_PER_SEC as f32;
        let rounded_ns = ns.round() as u64;
        let duration = Duration::from_nanos(rounded_ns);
        std_sleep(duration);
    }

    pub async fn async_sleep(s: f32) {
        let ns = s * NANOS_PER_SEC as f32;
        let rounded_ns = ns.round() as u64;
        let duration = Duration::from_nanos(rounded_ns);
        tokio_sleep(duration).await;
    }
}

///
/// Main API for interacting with the threading backend. Remember, we use tokio as our executor.
/// This means that by default, all jobs sent to the thread pool have to be async in nature.
/// These macros make handling of these jobs at the sync/async boundary more convenient.
///
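/// ## Example
///
/// A minimal sketch of crossing that boundary with [rumtk_exec_task](crate::rumtk_exec_task):
///
/// ```
/// use rumtk_core::rumtk_exec_task;
/// use rumtk_core::core::RUMResult;
///
/// // Run an async closure to completion from fully synchronous code.
/// let sum = rumtk_exec_task!(async || -> RUMResult<i32> { Ok(20 + 22) }).unwrap();
/// assert_eq!(sum, 42);
/// ```
///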
pub mod threading_macros {
    use crate::threading::thread_primitives;
    use crate::threading::threading_manager::SafeTaskArgs;

    ///
    /// First, let's make sure we have *tokio* initialized at least once. The runtime created here
    /// will be saved to the global context so the next call to this macro will simply grab a
    /// reference to the previously initialized runtime.
    ///
    /// Passing nothing will default to initializing a runtime using the default number of threads
    /// for this system. This is typically equivalent to the number of cores/threads of your CPU.
    ///
    /// Passing a `threads` number will yield a runtime that allocates that many threads.
    ///
    ///
    /// ## Examples
    ///
    /// ```
    /// use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_create_task_args, rumtk_create_task, rumtk_spawn_task};
    /// use rumtk_core::core::RUMResult;
    /// use rumtk_core::threading::threading_manager::SafeTaskArgs;
    ///
    /// async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
    ///     let mut result = Vec::<i32>::new();
    ///     for arg in args.read().await.iter() {
    ///         result.push(*arg);
    ///     }
    ///     Ok(result)
    /// }
    ///
    /// let rt = rumtk_init_threads!(); // Creates runtime instance
    /// let args = rumtk_create_task_args!(1); // Creates a vector of i32s
    /// let task = rumtk_create_task!(test, args); // Creates a standard task which consists of a function or closure accepting a Vec<T>
    /// let result = rumtk_resolve_task!(&rt, rumtk_spawn_task!(&rt, task)); // Spawns the task and waits for it to conclude.
    /// ```
    ///
    /// ```
    /// use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_create_task_args, rumtk_create_task, rumtk_spawn_task};
    /// use rumtk_core::core::RUMResult;
    /// use rumtk_core::threading::threading_manager::SafeTaskArgs;
    ///
    /// async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
    ///     let mut result = Vec::<i32>::new();
    ///     for arg in args.read().await.iter() {
    ///         result.push(*arg);
    ///     }
    ///     Ok(result)
    /// }
    ///
    /// let thread_count: usize = 10;
    /// let rt = rumtk_init_threads!(&thread_count);
    /// let args = rumtk_create_task_args!(1);
    /// let task = rumtk_create_task!(test, args);
    /// let result = rumtk_resolve_task!(&rt, rumtk_spawn_task!(&rt, task));
    /// ```
    #[macro_export]
    macro_rules! rumtk_init_threads {
        ( ) => {{
            use $crate::rumtk_cache_fetch;
            use $crate::threading::thread_primitives::{init_cache, RT_CACHE};
            use $crate::threading::threading_functions::get_default_system_thread_count;
            let rt = rumtk_cache_fetch!(
                &raw mut RT_CACHE,
                &get_default_system_thread_count(),
                init_cache
            );
            rt
        }};
        ( $threads:expr ) => {{
            use $crate::rumtk_cache_fetch;
            use $crate::threading::thread_primitives::{init_cache, RT_CACHE};
            let rt = rumtk_cache_fetch!(&raw mut RT_CACHE, $threads, init_cache);
            rt
        }};
    }

    ///
    /// Puts a task onto the runtime queue.
    ///
    /// The parameters to this macro are a reference to the runtime (`rt`) and a future (`func`).
    ///
    /// The return is a [tokio::task::JoinHandle] instance. If the task was a standard framework task,
    /// you will get an [AsyncTaskHandle](crate::threading::threading_manager::AsyncTaskHandle) instead.
    ///
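    /// ## Example
    ///
    /// A minimal sketch spawning a bare future and resolving its handle:
    ///
    /// ```
    /// use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_spawn_task};
    ///
    /// let rt = rumtk_init_threads!();
    /// let handle = rumtk_spawn_task!(&rt, async { 21 * 2 });
    /// // The handle resolves to a Result wrapping the future's output.
    /// let result = rumtk_resolve_task!(&rt, handle).unwrap();
    /// assert_eq!(result, 42);
    /// ```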
    #[macro_export]
    macro_rules! rumtk_spawn_task {
        ( $func:expr ) => {{
            let rt = $crate::rumtk_init_threads!();
            rt.spawn($func)
        }};
        ( $rt:expr, $func:expr ) => {{
            $rt.spawn($func)
        }};
    }

    ///
    /// Using the initialized runtime, wait for the future to resolve in a thread-blocking manner!
    ///
    /// If you pass a reference to the runtime (`rt`) and an async closure (`func`), we await the
    /// async closure without passing any arguments.
    ///
    /// You can also pass any number of additional arguments (`arg_items`) to this macro.
    /// In that case, we pass those arguments to the call on the async closure and await the result.
    ///
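    /// ## Example
    ///
    /// A minimal sketch blocking on an async function with one argument:
    ///
    /// ```
    /// use rumtk_core::{rumtk_init_threads, rumtk_wait_on_task};
    ///
    /// async fn double(x: i32) -> i32 {
    ///     x * 2
    /// }
    ///
    /// let rt = rumtk_init_threads!();
    /// let result = rumtk_wait_on_task!(&rt, double, 21);
    /// assert_eq!(result, 42);
    /// ```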
    #[macro_export]
    macro_rules! rumtk_wait_on_task {
        ( $rt:expr, $func:expr ) => {{
            $rt.block_on(async move {
                $func().await
            })
        }};
        ( $rt:expr, $func:expr, $($arg_items:expr),+ ) => {{
            $rt.block_on(async move {
                $func($($arg_items),+).await
            })
        }};
    }

    ///
    /// This macro awaits a future.
    ///
    /// The arguments are a reference to the runtime (`rt`) and a future.
    ///
    /// If there is a result, you will get the result of the future.
    ///
    /// ## Examples
    ///
    /// ```
    /// use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_create_task_args, rumtk_create_task, rumtk_spawn_task};
    /// use rumtk_core::core::RUMResult;
    /// use rumtk_core::threading::threading_manager::SafeTaskArgs;
    ///
    /// async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
    ///     let mut result = Vec::<i32>::new();
    ///     for arg in args.read().await.iter() {
    ///         result.push(*arg);
    ///     }
    ///     Ok(result)
    /// }
    ///
    /// let rt = rumtk_init_threads!();
    /// let args = rumtk_create_task_args!(1);
    /// let task = rumtk_create_task!(test, args);
    /// let result = rumtk_resolve_task!(&rt, rumtk_spawn_task!(&rt, task));
    /// ```
    ///
    #[macro_export]
    macro_rules! rumtk_resolve_task {
        ( $rt:expr, $future:expr ) => {{
            use $crate::strings::rumtk_format;
            // Fun tidbit: the expression rumtk_resolve_task!(&rt, rumtk_spawn_task!(&rt, task)), where
            // rt is the tokio runtime, yields async move { { &rt.spawn(task) } }. However, the whole thing
            // is technically moved into the async closure and captured, so things like mutex guards
            // technically go out of the outer scope. As a result, that expression fails to compile even
            // though the intent is for rumtk_spawn_task to resolve first and its result to get moved
            // into the async closure. To ensure that happens regardless of the given expression, we do
            // a variable assignment below to force the "future" macro expressions to resolve before
            // moving into the closure. DO NOT REMOVE OR "SIMPLIFY" THE let future = $future LINE!!!
            let future = $future;
            $rt.block_on(async move { future.await })
        }};
    }

    ///
    /// This macro creates an async body that calls the async closure and awaits it.
    ///
    /// ## Example
    ///
    /// ```
    /// use std::sync::Arc;
    /// use tokio::sync::RwLock as AsyncRwLock;
    /// use rumtk_core::rumtk_create_task;
    /// use rumtk_core::strings::RUMString;
    /// use rumtk_core::threading::threading_manager::{SafeTaskArgs, TaskItems};
    ///
    /// let expected = vec![
    ///     RUMString::from("Hello"),
    ///     RUMString::from("World!"),
    ///     RUMString::from("Overcast"),
    ///     RUMString::from("and"),
    ///     RUMString::from("Sad"),
    /// ];
    /// let locked_args = AsyncRwLock::new(expected.clone());
    /// let task_args = SafeTaskArgs::<RUMString>::new(locked_args);
    ///
    /// // Package the async closure and its arguments into a single awaitable future.
    /// let task = rumtk_create_task!(
    ///     async |args: &SafeTaskArgs<RUMString>| -> TaskItems<RUMString> {
    ///         let owned_args = Arc::clone(args);
    ///         let locked_args = owned_args.read().await;
    ///         let mut results = TaskItems::<RUMString>::with_capacity(locked_args.len());
    ///         for arg in locked_args.iter() {
    ///             results.push(RUMString::new(arg));
    ///         }
    ///         results
    ///     },
    ///     task_args
    /// );
    /// ```
    ///
    #[macro_export]
    macro_rules! rumtk_create_task {
        ( $func:expr ) => {{
            async move {
                let f = $func;
                f().await
            }
        }};
        ( $func:expr, $args:expr ) => {{
            let f = $func;
            async move { f(&$args).await }
        }};
    }

    ///
    /// Creates an instance of [SafeTaskArgs] with the arguments passed.
    ///
    /// ## Note
    ///
    /// All arguments must be of the same type.
    ///
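    /// ## Example
    ///
    /// A minimal sketch bundling three integers into a [SafeTaskArgs]:
    ///
    /// ```
    /// use rumtk_core::rumtk_create_task_args;
    ///
    /// let args = rumtk_create_task_args!(1, 2, 3);
    /// // The arguments live behind an async RwLock; peek at them synchronously here.
    /// assert_eq!(args.try_read().unwrap().len(), 3);
    /// ```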
    #[macro_export]
    macro_rules! rumtk_create_task_args {
        ( ) => {{
            use $crate::threading::threading_manager::{TaskArgs, SafeTaskArgs, TaskItems};
            use tokio::sync::RwLock;
            SafeTaskArgs::new(RwLock::new(vec![]))
        }};
        ( $($args:expr),+ ) => {{
            use $crate::threading::threading_manager::SafeTaskArgs;
            use tokio::sync::RwLock;
            SafeTaskArgs::new(RwLock::new(vec![$($args),+]))
        }};
    }

    ///
    /// Convenience macro for packaging the task components and launching the task in one line.
    ///
    /// One of the advantages is that you can generate a new `tokio` runtime by specifying the
    /// number of threads at the end. This is optional; we will default to the system's
    /// number of threads if that value is not specified.
    ///
    /// Between the `func` parameter and the optional `threads` parameter, you can specify a
    /// variable number of arguments to pass to the task. Each argument must be of the same type.
    /// If you wish to pass different arguments with different types, please define an abstract type
    /// whose underlying structure is a tuple of items and pass that instead.
    ///
    /// ## Examples
    ///
    /// ### With Default Thread Count
    /// ```
    /// use rumtk_core::rumtk_exec_task;
    /// use rumtk_core::core::RUMResult;
    /// use rumtk_core::threading::threading_manager::SafeTaskArgs;
    ///
    /// async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
    ///     let mut result = Vec::<i32>::new();
    ///     for arg in args.read().await.iter() {
    ///         result.push(*arg);
    ///     }
    ///     Ok(result)
    /// }
    ///
    /// let result = rumtk_exec_task!(test, vec![5]);
    /// assert_eq!(&result.clone().unwrap(), &vec![5], "Results mismatch");
    /// assert_ne!(&result.clone().unwrap(), &vec![5, 10], "Results do not mismatch as expected!");
    /// ```
    ///
    /// ### With Custom Thread Count
    /// ```
    /// use rumtk_core::rumtk_exec_task;
    /// use rumtk_core::core::RUMResult;
    /// use rumtk_core::threading::threading_manager::SafeTaskArgs;
    ///
    /// async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
    ///     let mut result = Vec::<i32>::new();
    ///     for arg in args.read().await.iter() {
    ///         result.push(*arg);
    ///     }
    ///     Ok(result)
    /// }
    ///
    /// let result = rumtk_exec_task!(test, vec![5], 5);
    /// assert_eq!(&result.clone().unwrap(), &vec![5], "Results mismatch");
    /// assert_ne!(&result.clone().unwrap(), &vec![5, 10], "Results do not mismatch as expected!");
    /// ```
    ///
    /// ### With Async Function Body
    /// ```
    /// use rumtk_core::rumtk_exec_task;
    /// use rumtk_core::core::RUMResult;
    /// use rumtk_core::threading::threading_manager::SafeTaskArgs;
    ///
    /// let result = rumtk_exec_task!(
    ///     async move |args: &SafeTaskArgs<i32>| -> RUMResult<Vec<i32>> {
    ///         let mut result = Vec::<i32>::new();
    ///         for arg in args.read().await.iter() {
    ///             result.push(*arg);
    ///         }
    ///         Ok(result)
    ///     },
    ///     vec![5]);
    /// assert_eq!(&result.clone().unwrap(), &vec![5], "Results mismatch");
    /// assert_ne!(&result.clone().unwrap(), &vec![5, 10], "Results do not mismatch as expected!");
    /// ```
    ///
    /// ### With Async Function Body and No Args
    /// ```
    /// use rumtk_core::rumtk_exec_task;
    /// use rumtk_core::core::RUMResult;
    /// use rumtk_core::threading::threading_manager::SafeTaskArgs;
    ///
    /// let result = rumtk_exec_task!(
    ///     async || -> RUMResult<Vec<i32>> {
    ///         let mut result = Vec::<i32>::new();
    ///         Ok(result)
    ///     });
    /// let empty = Vec::<i32>::new();
    /// assert_eq!(&result.clone().unwrap(), &empty, "Results mismatch");
    /// assert_ne!(&result.clone().unwrap(), &vec![5, 10], "Results do not mismatch as expected!");
    /// ```
    ///
    /// ## Equivalent To
    ///
    /// ```
    /// use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_create_task_args, rumtk_create_task, rumtk_spawn_task};
    /// use rumtk_core::core::RUMResult;
    /// use rumtk_core::threading::threading_manager::SafeTaskArgs;
    ///
    /// async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
    ///     let mut result = Vec::<i32>::new();
    ///     for arg in args.read().await.iter() {
    ///         result.push(*arg);
    ///     }
    ///     Ok(result)
    /// }
    ///
    /// let rt = rumtk_init_threads!();
    /// let args = rumtk_create_task_args!(1);
    /// let task = rumtk_create_task!(test, args);
    /// let result = rumtk_resolve_task!(&rt, rumtk_spawn_task!(&rt, task));
    /// ```
    ///
    #[macro_export]
    macro_rules! rumtk_exec_task {
        ( $func:expr ) => {{
            use tokio::sync::RwLock;
            use $crate::{
                rumtk_create_task, rumtk_create_task_args, rumtk_init_threads, rumtk_resolve_task,
            };
            let rt = rumtk_init_threads!();
            let task = rumtk_create_task!($func);
            rumtk_resolve_task!(&rt, task)
        }};
        ( $func:expr, $args:expr ) => {{
            use tokio::sync::RwLock;
            use $crate::threading::threading_manager::SafeTaskArgs;
            use $crate::{
                rumtk_create_task, rumtk_create_task_args, rumtk_init_threads, rumtk_resolve_task,
            };
            let rt = rumtk_init_threads!();
            let args = SafeTaskArgs::new(RwLock::new($args));
            let task = rumtk_create_task!($func, args);
            rumtk_resolve_task!(&rt, task)
        }};
        ( $func:expr, $args:expr, $threads:expr ) => {{
            use tokio::sync::RwLock;
            use $crate::threading::threading_manager::SafeTaskArgs;
            use $crate::{
                rumtk_create_task, rumtk_create_task_args, rumtk_init_threads, rumtk_resolve_task,
            };
            let rt = rumtk_init_threads!(&$threads);
            let args = SafeTaskArgs::new(RwLock::new($args));
            let task = rumtk_create_task!($func, args);
            rumtk_resolve_task!(&rt, task)
        }};
    }

    ///
    /// Sleep for a duration of time in a sync context; the call cannot be awaited.
    ///
    /// You can pass any value that can be cast to f32.
    ///
    /// The precision is up to nanoseconds, determined by the number of decimal places.
    ///
    /// ## Examples
    ///
    /// ```
    /// use rumtk_core::rumtk_sleep;
    /// rumtk_sleep!(1); // Sleeps for 1 second.
    /// rumtk_sleep!(0.001); // Sleeps for 1 millisecond
    /// rumtk_sleep!(0.000001); // Sleeps for 1 microsecond
    /// rumtk_sleep!(0.000000001); // Sleeps for 1 nanosecond
    /// ```
    ///
    #[macro_export]
    macro_rules! rumtk_sleep {
        ( $dur:expr ) => {{
            use $crate::threading::threading_functions::sleep;
            sleep($dur as f32)
        }};
    }

    ///
    /// Sleep for some duration of time in an async context, meaning the call can be awaited.
    ///
    /// You can pass any value that can be cast to f32.
    ///
    /// The precision is up to nanoseconds, determined by the number of decimal places.
    ///
    /// ## Examples
    ///
    /// ```
    /// use rumtk_core::{rumtk_async_sleep, rumtk_exec_task};
    /// use rumtk_core::core::RUMResult;
    /// rumtk_exec_task!(async || -> RUMResult<()> {
    ///     rumtk_async_sleep!(1).await; // Sleeps for 1 second.
    ///     rumtk_async_sleep!(0.001).await; // Sleeps for 1 millisecond
    ///     rumtk_async_sleep!(0.000001).await; // Sleeps for 1 microsecond
    ///     rumtk_async_sleep!(0.000000001).await; // Sleeps for 1 nanosecond
    ///     Ok(())
    /// });
    /// ```
    ///
    #[macro_export]
    macro_rules! rumtk_async_sleep {
        ( $dur:expr ) => {{
            use $crate::threading::threading_functions::async_sleep;
            async_sleep($dur as f32)
        }};
    }

    ///
    /// Convenience macro for constructing a new [TaskManager](crate::threading::threading_manager::TaskManager)
    /// with the given number of worker threads.
    ///
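    /// ## Example
    ///
    /// A minimal sketch creating a manager for `usize` results:
    ///
    /// ```
    /// use rumtk_core::rumtk_new_task_queue;
    /// use rumtk_core::threading::threading_manager::TaskManager;
    ///
    /// let manager: TaskManager<usize> = rumtk_new_task_queue!(&4).unwrap();
    /// assert!(manager.is_all_completed());
    /// ```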
    #[macro_export]
    macro_rules! rumtk_new_task_queue {
        ( $worker_num:expr ) => {{
            use $crate::threading::threading_manager::TaskManager;
            TaskManager::new($worker_num)
        }};
    }
}