rumtk_core/threading.rs
1/*
2 * rumtk attempts to implement HL7 and medical protocols for interoperability in medicine.
3 * This toolkit aims to be reliable, simple, performant, and standards compliant.
4 * Copyright (C) 2025 Luis M. Santos, M.D.
5 * Copyright (C) 2025 MedicalMasses L.L.C.
6 *
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20 */
21
22///
23/// This module provides all the primitives needed to build a multithreaded application.
24///
25pub mod thread_primitives {
26 use crate::cache::{new_cache, LazyRUMCache};
27 use std::sync::Arc;
28 pub use tokio::io;
29 pub use tokio::io::{AsyncReadExt, AsyncWriteExt};
30 use tokio::runtime::Runtime as TokioRuntime;
31 pub use tokio::sync::{
32 Mutex as AsyncMutex, MutexGuard as AsyncMutexGuard, RwLock as AsyncRwLock, RwLockReadGuard,
33 RwLockWriteGuard,
34 };
35 /**************************** Globals **************************************/
36 pub static mut RT_CACHE: TokioRtCache = new_cache();
37 /**************************** Helpers ***************************************/
38 pub fn init_cache(threads: &usize) -> SafeTokioRuntime {
39 let mut builder = tokio::runtime::Builder::new_multi_thread();
40 builder.worker_threads(*threads);
41 builder.enable_all();
42 match builder.build() {
43 Ok(handle) => Arc::new(handle),
44 Err(e) => panic!(
45 "Unable to initialize threading tokio runtime because {}!",
46 &e
47 ),
48 }
49 }
50
51 /**************************** Types ***************************************/
52 pub type SafeTokioRuntime = Arc<TokioRuntime>;
53 pub type TokioRtCache = LazyRUMCache<usize, SafeTokioRuntime>;
54}
55
56pub mod threading_manager {
57 use crate::cache::LazyRUMCacheValue;
58 use crate::core::{RUMResult, RUMVec};
59 use crate::strings::rumtk_format;
60 use crate::threading::thread_primitives::SafeTokioRuntime;
61 use crate::threading::threading_functions::{async_sleep, sleep};
62 use crate::types::{RUMHashMap, RUMID};
63 use crate::{rumtk_init_threads, rumtk_resolve_task, rumtk_spawn_task, threading};
64 use std::future::Future;
65 use std::sync::Arc;
66 pub use std::sync::RwLock as SyncRwLock;
67 use tokio::sync::RwLock as AsyncRwLock;
68 use tokio::task::JoinHandle;
69
70 const DEFAULT_SLEEP_DURATION: f32 = 0.001f32;
71 const DEFAULT_TASK_CAPACITY: usize = 100;
72
73 pub type TaskItems<T> = RUMVec<T>;
74 /// This type aliases a vector of T elements that will be used for passing arguments to the task processor.
75 pub type TaskArgs<T> = TaskItems<T>;
76 /// Function signature defining the interface of task processing logic.
77 pub type SafeTaskArgs<T> = Arc<AsyncRwLock<TaskItems<T>>>;
78 pub type AsyncTaskHandle<R> = JoinHandle<TaskResult<R>>;
79 pub type AsyncTaskHandles<R> = Vec<AsyncTaskHandle<R>>;
80 //pub type TaskProcessor<T, R, Fut: Future<Output = TaskResult<R>>> = impl FnOnce(&SafeTaskArgs<T>) -> Fut;
81 pub type TaskID = RUMID;
82
83 #[derive(Debug, Clone, Default)]
84 pub struct Task<R> {
85 pub id: TaskID,
86 pub finished: bool,
87 pub result: Option<R>,
88 }
89
90 pub type SafeTask<R> = Arc<Task<R>>;
91 type SafeInternalTask<R> = Arc<SyncRwLock<Task<R>>>;
92 pub type TaskTable<R> = RUMHashMap<TaskID, SafeInternalTask<R>>;
93 pub type SafeAsyncTaskTable<R> = Arc<AsyncRwLock<TaskTable<R>>>;
94 pub type SafeSyncTaskTable<R> = Arc<SyncRwLock<TaskTable<R>>>;
95 pub type TaskBatch = RUMVec<TaskID>;
96 /// Type to use to define how task results are expected to be returned.
97 pub type TaskResult<R> = RUMResult<SafeTask<R>>;
98 pub type TaskResults<R> = TaskItems<TaskResult<R>>;
99 pub type TaskRuntime = LazyRUMCacheValue<SafeTokioRuntime>;
100
101 ///
102 /// Manages asynchronous tasks submitted as micro jobs from synchronous code. This type essentially
103 /// gives the multithreading, asynchronous superpowers to synchronous logic.
104 ///
105 /// ## Example Usage
106 ///
107 /// ```
108 /// use std::sync::{Arc};
109 /// use tokio::sync::RwLock as AsyncRwLock;
110 /// use rumtk_core::core::RUMResult;
111 /// use rumtk_core::strings::RUMString;
112 /// use rumtk_core::threading::threading_manager::{SafeTaskArgs, TaskItems, TaskManager};
113 /// use rumtk_core::{rumtk_create_task, };
114 ///
115 /// let expected = vec![
116 /// RUMString::from("Hello"),
117 /// RUMString::from("World!"),
118 /// RUMString::from("Overcast"),
119 /// RUMString::from("and"),
120 /// RUMString::from("Sad"),
121 /// ];
122 ///
123 /// type TestResult = RUMResult<Vec<RUMString>>;
124 /// let mut queue: TaskManager<TestResult> = TaskManager::new(&5).unwrap();
125 ///
126 /// let locked_args = AsyncRwLock::new(expected.clone());
127 /// let task_args = SafeTaskArgs::<RUMString>::new(locked_args);
128 /// let processor = rumtk_create_task!(
129 /// async |args: &SafeTaskArgs<RUMString>| -> TestResult {
130 /// let owned_args = Arc::clone(args);
131 /// let locked_args = owned_args.read().await;
132 /// let mut results = TaskItems::<RUMString>::with_capacity(locked_args.len());
133 ///
134 /// for arg in locked_args.iter() {
135 /// results.push(RUMString::new(arg));
136 /// }
137 ///
138 /// Ok(results)
139 /// },
140 /// task_args
141 /// );
142 ///
143 /// queue.add_task::<_>(processor);
144 /// let results = queue.wait();
145 ///
146 /// let mut result_data = Vec::<RUMString>::with_capacity(5);
147 /// for r in results {
148 /// for v in r.unwrap().result.clone().unwrap().iter() {
149 /// for value in v.iter() {
150 /// result_data.push(value.clone());
151 /// }
152 /// }
153 /// }
154 ///
155 /// assert_eq!(result_data, expected, "Results do not match expected!");
156 ///
157 /// ```
158 ///
159 #[derive(Debug, Clone, Default)]
160 pub struct TaskManager<R> {
161 tasks: SafeSyncTaskTable<R>,
162 workers: usize,
163 }
164
165 impl<R> TaskManager<R>
166 where
167 R: Sync + Send + Clone + 'static,
168 {
169 ///
170 /// This method creates a [`TaskQueue`] instance using sensible defaults.
171 ///
172 /// The `threads` field is computed from the number of cores present in system.
173 ///
174 pub fn default() -> RUMResult<TaskManager<R>> {
175 Self::new(&threading::threading_functions::get_default_system_thread_count())
176 }
177
178 ///
179 /// Creates an instance of [`ThreadedTaskQueue<T, R>`] in the form of [`SafeThreadedTaskQueue<T, R>`].
180 /// Expects you to provide the count of threads to spawn and the microtask queue size
181 /// allocated by each thread.
182 ///
183 /// This method calls [`Self::with_capacity()`] for the actual object creation.
184 /// The main queue capacity is pre-allocated to [`DEFAULT_QUEUE_CAPACITY`].
185 ///
186 pub fn new(worker_num: &usize) -> RUMResult<TaskManager<R>> {
187 let tasks = SafeSyncTaskTable::<R>::new(SyncRwLock::new(TaskTable::with_capacity(
188 DEFAULT_TASK_CAPACITY,
189 )));
190 Ok(TaskManager::<R> {
191 tasks,
192 workers: worker_num.to_owned(),
193 })
194 }
195
196 ///
197 /// Add a task to the processing queue. The idea is that you can queue a processor function
198 /// and list of args that will be picked up by one of the threads for processing.
199 ///
200 /// This is the async counterpart
201 ///
202 pub async fn add_task_async<F>(&mut self, task: F) -> TaskID
203 where
204 F: Future<Output = R> + Send + Sync + 'static,
205 F::Output: Send + Sized + 'static,
206 {
207 let id = TaskID::new_v4();
208 Self::_add_task_async(id.clone(), self.tasks.clone(), task).await
209 }
210
211 ///
212 /// See [add_task](Self::add_task)
213 ///
214 /// Unlike `add_task`, this method does not block which is key to avoiding panicking
215 /// the tokio runtim if trying to add task to queue from a normal function called from an
216 /// async environment.
217 ///
218 /// ## Example
219 ///
220 /// ```
221 /// use rumtk_core::threading::threading_manager::{TaskManager};
222 /// use rumtk_core::{rumtk_init_threads, strings::rumtk_format};
223 /// use std::sync::{Arc};
224 /// use tokio::sync::Mutex;
225 ///
226 /// type JobManager = Arc<Mutex<TaskManager<usize>>>;
227 ///
228 /// async fn called_fn() -> usize {
229 /// 5
230 /// }
231 ///
232 /// fn push_job(manager: &mut TaskManager<usize>) -> usize {
233 /// manager.spawn_task(called_fn());
234 /// 1
235 /// }
236 ///
237 /// async fn call_sync_fn(mut manager: JobManager) -> usize {
238 /// let mut owned = manager.lock().await;
239 /// push_job(&mut owned)
240 /// }
241 ///
242 /// let workers = 5;
243 /// let rt = rumtk_init_threads!(&workers);
244 /// let mut manager = Arc::new(Mutex::new(TaskManager::new(&workers).unwrap()));
245 ///
246 /// manager.blocking_lock().spawn_task(call_sync_fn(manager.clone()));
247 ///
248 /// let result_raw = manager.blocking_lock().wait();
249 ///
250 /// ```
251 ///
252 pub fn spawn_task<F>(&mut self, task: F) -> TaskID
253 where
254 F: Future<Output = R> + Send + Sync + 'static,
255 F::Output: Send + Sized + 'static,
256 {
257 let id = TaskID::new_v4();
258 let rt = rumtk_init_threads!(&self.workers);
259 rumtk_spawn_task!(
260 rt,
261 Self::_add_task_async(id.clone(), self.tasks.clone(), task)
262 );
263 id
264 }
265
266 ///
267 /// See [add_task](Self::add_task)
268 ///
269 pub fn add_task<F>(&mut self, task: F) -> TaskID
270 where
271 F: Future<Output = R> + Send + Sync + 'static,
272 F::Output: Send + Sized + 'static,
273 {
274 let rt = rumtk_init_threads!(&self.workers);
275 rumtk_resolve_task!(rt, self.add_task_async(task))
276 }
277
278 async fn _add_task_async<F>(id: TaskID, tasks: SafeSyncTaskTable<R>, task: F) -> TaskID
279 where
280 F: Future<Output = R> + Send + Sync + 'static,
281 F::Output: Send + Sized + 'static,
282 {
283 let mut safe_task = Arc::new(SyncRwLock::new(Task::<R> {
284 id: id.clone(),
285 finished: false,
286 result: None,
287 }));
288 tasks.write().unwrap().insert(id.clone(), safe_task.clone());
289
290 let task_wrapper = async move || {
291 // Run the task
292 let result = task.await;
293
294 // Cleanup task
295 let mut lock = safe_task.write().unwrap();
296 lock.result = Some(result);
297 lock.finished = true;
298 };
299
300 tokio::spawn(task_wrapper());
301
302 id
303 }
304
305 ///
306 /// See [wait_async](Self::wait_async)
307 ///
308 /// Duplicated here because we can't request the tokio runtime to do a quick exec for us if
309 /// this function happens to be called from the async context.
310 ///
311 pub fn wait(&mut self) -> TaskResults<R> {
312 let task_batch = self
313 .tasks
314 .read()
315 .unwrap()
316 .keys()
317 .cloned()
318 .collect::<Vec<_>>();
319 self.wait_on_batch(&task_batch)
320 }
321
322 ///
323 /// See [wait_on_batch_async](Self::wait_on_batch_async)
324 ///
325 /// Duplicated here because we can't request the tokio runtime to do a quick exec for us if
326 /// this function happens to be called from the async context.
327 ///
328 pub fn wait_on_batch(&mut self, tasks: &TaskBatch) -> TaskResults<R> {
329 let mut results = TaskResults::<R>::default();
330 for task in tasks {
331 results.push(self.wait_on(task));
332 }
333 results
334 }
335
336 ///
337 /// See [wait_on_async](Self::wait_on_async)
338 ///
339 /// Duplicated here because we can't request the tokio runtime to do a quick exec for us if
340 /// this function happens to be called from the async context.
341 ///
342 pub fn wait_on(&mut self, task_id: &TaskID) -> TaskResult<R> {
343 let task = match self.tasks.write().unwrap().remove(task_id) {
344 Some(task) => task.clone(),
345 None => return Err(rumtk_format!("No task with id {}", task_id)),
346 };
347
348 while !task.read().unwrap().finished {
349 sleep(DEFAULT_SLEEP_DURATION);
350 }
351
352 let x = Ok(Arc::new(task.write().unwrap().clone()));
353 x
354 }
355
356 ///
357 /// This method waits until a queued task with [TaskID](TaskID) has been processed from the main queue.
358 ///
359 /// We poll the status of the task every [DEFAULT_SLEEP_DURATION](DEFAULT_SLEEP_DURATION) ms.
360 ///
361 /// Upon completion,
362 ///
363 /// 2. Return the result ([TaskResults<R>](TaskResults)).
364 ///
365 /// This operation consumes the task.
366 ///
367 /// ### Note:
368 /// ```text
369 /// Results returned here are not guaranteed to be in the same order as the order in which
370 /// the tasks were queued for work. You will need to pass a type as T that automatically
371 /// tracks its own id or has a way for you to resort results.
372 /// ```
373 pub async fn wait_on_async(&mut self, task_id: &TaskID) -> TaskResult<R> {
374 let task = match self.tasks.write().unwrap().remove(task_id) {
375 Some(task) => task.clone(),
376 None => return Err(rumtk_format!("No task with id {}", task_id)),
377 };
378
379 while !task.read().unwrap().finished {
380 async_sleep(DEFAULT_SLEEP_DURATION).await;
381 }
382
383 let x = Ok(Arc::new(task.write().unwrap().clone()));
384 x
385 }
386
387 ///
388 /// This method waits until a set of queued tasks with [TaskID](TaskID) has been processed from the main queue.
389 ///
390 /// We poll the status of the task every [DEFAULT_SLEEP_DURATION](DEFAULT_SLEEP_DURATION) ms.
391 ///
392 /// Upon completion,
393 ///
394 /// 1. We collect the results generated (if any).
395 /// 2. Return the list of results ([TaskResults<R>](TaskResults)).
396 ///
397 /// ### Note:
398 /// ```text
399 /// Results returned here are not guaranteed to be in the same order as the order in which
400 /// the tasks were queued for work. You will need to pass a type as T that automatically
401 /// tracks its own id or has a way for you to resort results.
402 /// ```
403 pub async fn wait_on_batch_async(&mut self, tasks: &TaskBatch) -> TaskResults<R> {
404 let mut results = TaskResults::<R>::default();
405 for task in tasks {
406 results.push(self.wait_on_async(task).await);
407 }
408 results
409 }
410
411 ///
412 /// This method waits until all queued tasks have been processed from the main queue.
413 ///
414 /// We poll the status of the main queue every [DEFAULT_SLEEP_DURATION](DEFAULT_SLEEP_DURATION) ms.
415 ///
416 /// Upon completion,
417 ///
418 /// 1. We collect the results generated (if any).
419 /// 2. We reset the main task and result internal queue states.
420 /// 3. Return the list of results ([TaskResults<R>](TaskResults)).
421 ///
422 /// This operation consumes all the tasks.
423 ///
424 /// ### Note:
425 /// ```text
426 /// Results returned here are not guaranteed to be in the same order as the order in which
427 /// the tasks were queued for work. You will need to pass a type as T that automatically
428 /// tracks its own id or has a way for you to resort results.
429 /// ```
430 pub async fn wait_async(&mut self) -> TaskResults<R> {
431 let task_batch = self
432 .tasks
433 .read()
434 .unwrap()
435 .keys()
436 .cloned()
437 .collect::<Vec<_>>();
438 self.wait_on_batch_async(&task_batch).await
439 }
440
441 ///
442 /// Check if all work has been completed from the task queue.
443 ///
444 /// ## Examples
445 ///
446 /// ### Sync Usage
447 ///
448 ///```
449 /// use rumtk_core::threading::threading_manager::TaskManager;
450 ///
451 /// let manager = TaskManager::<usize>::new(&4).unwrap();
452 ///
453 /// let all_done = manager.is_all_completed();
454 ///
455 /// assert_eq!(all_done, true, "Empty TaskManager reports tasks are not completed!");
456 ///
457 /// ```
458 ///
459 pub fn is_all_completed(&self) -> bool {
460 let rt = rumtk_init_threads!(&self.workers);
461 rumtk_resolve_task!(rt, TaskManager::<R>::is_all_completed_async(self))
462 }
463
464 pub async fn is_all_completed_async(&self) -> bool {
465 for (_, task) in self.tasks.read().unwrap().iter() {
466 if !task.read().unwrap().finished {
467 return false;
468 }
469 }
470
471 true
472 }
473
474 ///
475 /// Check if a task completed
476 ///
477 pub fn is_finished(&self, id: &TaskID) -> bool {
478 match self.tasks.read().unwrap().get(id) {
479 Some(t) => t.read().unwrap().finished,
480 None => false,
481 }
482 }
483
484 pub async fn is_finished_async(&self, id: &TaskID) -> bool {
485 match self.tasks.read().unwrap().get(id) {
486 Some(task) => task.read().unwrap().finished,
487 None => true,
488 }
489 }
490
491 ///
492 /// Alias for [wait](TaskManager::wait).
493 ///
494 fn gather(&mut self) -> TaskResults<R> {
495 self.wait()
496 }
497 }
498}
499
500///
501/// This module contains a few helper.
502///
503/// For example, you can find a function for determining number of threads available in system.
504/// The sleep family of functions are also here.
505///
506pub mod threading_functions {
507 use num_cpus;
508 use std::thread::{available_parallelism, sleep as std_sleep};
509 use std::time::Duration;
510 use tokio::time::sleep as tokio_sleep;
511
512 pub const NANOS_PER_SEC: u64 = 1000000000;
513 pub const MILLIS_PER_SEC: u64 = 1000;
514 pub const MICROS_PER_SEC: u64 = 1000000;
515
516 pub fn get_default_system_thread_count() -> usize {
517 let cpus: usize = num_cpus::get();
518 let parallelism = match available_parallelism() {
519 Ok(n) => n.get(),
520 Err(_) => 0,
521 };
522
523 if parallelism >= cpus {
524 parallelism
525 } else {
526 cpus
527 }
528 }
529
530 pub fn sleep(s: f32) {
531 let ns = s * NANOS_PER_SEC as f32;
532 let rounded_ns = ns.round() as u64;
533 let duration = Duration::from_nanos(rounded_ns);
534 std_sleep(duration);
535 }
536
537 pub async fn async_sleep(s: f32) {
538 let ns = s * NANOS_PER_SEC as f32;
539 let rounded_ns = ns.round() as u64;
540 let duration = Duration::from_nanos(rounded_ns);
541 tokio_sleep(duration).await;
542 }
543}
544
545///
546/// Main API for interacting with the threading back end. Remember, we use tokio as our executor.
547/// This means that by default, all jobs sent to the thread pool have to be async in nature.
548/// These macros make handling of these jobs at the sync/async boundary more convenient.
549///
550pub mod threading_macros {
551 use crate::threading::thread_primitives;
552 use crate::threading::threading_manager::SafeTaskArgs;
553
554 ///
555 /// First, let's make sure we have *tokio* initialized at least once. The runtime created here
556 /// will be saved to the global context so the next call to this macro will simply grab a
557 /// reference to the previously initialized runtime.
558 ///
559 /// Passing nothing will default to initializing a runtime using the default number of threads
560 /// for this system. This is typically equivalent to number of cores/threads for your CPU.
561 ///
562 /// Passing `threads` number will yield a runtime that allocates that many threads.
563 ///
564 ///
565 /// ## Examples
566 ///
567 /// ```
568 /// use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_create_task_args, rumtk_create_task, rumtk_spawn_task};
569 /// use rumtk_core::core::RUMResult;
570 /// use rumtk_core::threading::threading_manager::SafeTaskArgs;
571 ///
572 /// async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
573 /// let mut result = Vec::<i32>::new();
574 /// for arg in args.read().await.iter() {
575 /// result.push(*arg);
576 /// }
577 /// Ok(result)
578 /// }
579 ///
580 /// let rt = rumtk_init_threads!(); // Creates runtime instance
581 /// let args = rumtk_create_task_args!(1); // Creates a vector of i32s
582 /// let task = rumtk_create_task!(test, args); // Creates a standard task which consists of a function or closure accepting a Vec<T>
583 /// let result = rumtk_resolve_task!(&rt, rumtk_spawn_task!(&rt, task)); // Spawn's task and waits for it to conclude.
584 /// ```
585 ///
586 /// ```
587 /// use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_create_task_args, rumtk_create_task, rumtk_spawn_task};
588 /// use rumtk_core::core::RUMResult;
589 /// use rumtk_core::threading::threading_manager::SafeTaskArgs;
590 ///
591 /// async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
592 /// let mut result = Vec::<i32>::new();
593 /// for arg in args.read().await.iter() {
594 /// result.push(*arg);
595 /// }
596 /// Ok(result)
597 /// }
598 ///
599 /// let thread_count: usize = 10;
600 /// let rt = rumtk_init_threads!(&thread_count);
601 /// let args = rumtk_create_task_args!(1);
602 /// let task = rumtk_create_task!(test, args);
603 /// let result = rumtk_resolve_task!(&rt, rumtk_spawn_task!(&rt, task));
604 /// ```
605 #[macro_export]
606 macro_rules! rumtk_init_threads {
607 ( ) => {{
608 use $crate::rumtk_cache_fetch;
609 use $crate::threading::thread_primitives::{init_cache, RT_CACHE};
610 use $crate::threading::threading_functions::get_default_system_thread_count;
611 let rt = rumtk_cache_fetch!(
612 &mut RT_CACHE,
613 &get_default_system_thread_count(),
614 init_cache
615 );
616 rt
617 }};
618 ( $threads:expr ) => {{
619 use $crate::rumtk_cache_fetch;
620 use $crate::threading::thread_primitives::{init_cache, RT_CACHE};
621 let rt = rumtk_cache_fetch!(&raw mut RT_CACHE, $threads, init_cache);
622 rt
623 }};
624 }
625
626 ///
627 /// Puts task onto the runtime queue.
628 ///
629 /// The parameters to this macro are a reference to the runtime (`rt`) and a future (`func`).
630 ///
631 /// The return is a [thread_primitives::JoinHandle<T>] instance. If the task was a standard
632 /// framework task, you will get [thread_primitives::AsyncTaskHandle] instead.
633 ///
634 #[macro_export]
635 macro_rules! rumtk_spawn_task {
636 ( $func:expr ) => {{
637 let rt = rumtk_init_threads!();
638 rt.spawn($func)
639 }};
640 ( $rt:expr, $func:expr ) => {{
641 $rt.spawn($func)
642 }};
643 }
644
645 ///
646 /// Using the initialized runtime, wait for the future to resolve in a thread blocking manner!
647 ///
648 /// If you pass a reference to the runtime (`rt`) and an async closure (`func`), we await the
649 /// async closure without passing any arguments.
650 ///
651 /// You can pass a third argument to this macro in the form of any number of arguments (`arg_item`).
652 /// In such a case, we pass those arguments to the call on the async closure and await on results.
653 ///
654 #[macro_export]
655 macro_rules! rumtk_wait_on_task {
656 ( $rt:expr, $func:expr ) => {{
657 $rt.block_on(async move {
658 $func().await
659 })
660 }};
661 ( $rt:expr, $func:expr, $($arg_items:expr),+ ) => {{
662 $rt.block_on(async move {
663 $func($($arg_items),+).await
664 })
665 }};
666 }
667
668 ///
669 /// This macro awaits a future.
670 ///
671 /// The arguments are a reference to the runtime (`rt) and a future.
672 ///
673 /// If there is a result, you will get the result of the future.
674 ///
675 /// ## Examples
676 ///
677 /// ```
678 /// use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_create_task_args, rumtk_create_task, rumtk_spawn_task};
679 /// use rumtk_core::core::RUMResult;
680 /// use rumtk_core::threading::threading_manager::SafeTaskArgs;
681 ///
682 /// async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
683 /// let mut result = Vec::<i32>::new();
684 /// for arg in args.read().await.iter() {
685 /// result.push(*arg);
686 /// }
687 /// Ok(result)
688 /// }
689 ///
690 /// let rt = rumtk_init_threads!();
691 /// let args = rumtk_create_task_args!(1);
692 /// let task = rumtk_create_task!(test, args);
693 /// let result = rumtk_resolve_task!(&rt, rumtk_spawn_task!(&rt, task));
694 /// ```
695 ///
696 #[macro_export]
697 macro_rules! rumtk_resolve_task {
698 ( $rt:expr, $future:expr ) => {{
699 // Fun tidbit, the expression rumtk_resolve_task!(&rt, rumtk_spawn_task!(&rt, task)), where
700 // rt is the tokio runtime yields async move { { &rt.spawn(task) } }. However, the whole thing
701 // is technically moved into the async closure and captured so things like mutex guards
702 // technically go out of the outer scope. As a result that expression fails to compile even
703 // though the intent is for rumtk_spawn_task to resolve first and its result get moved
704 // into the async closure. To ensure that happens regardless of given expression, we do
705 // a variable assignment below to force the "future" macro expressions to resolve before
706 // moving into the closure. DO NOT REMOVE OR "SIMPLIFY" THE let future = $future LINE!!!
707 let future = $future;
708 $rt.block_on(async move { future.await })
709 }};
710 }
711
712 #[macro_export]
713 macro_rules! rumtk_resolve_task_from_async {
714 ( $rt:expr, $future:expr ) => {{
715 let handle = $rt.spawn_blocking(async move { future.await })
716 }};
717 }
718
719 ///
720 /// This macro creates an async body that calls the async closure and awaits it.
721 ///
722 /// ## Example
723 ///
724 /// ```
725 /// use std::sync::{Arc, RwLock};
726 /// use tokio::sync::RwLock as AsyncRwLock;
727 /// use rumtk_core::strings::RUMString;
728 /// use rumtk_core::threading::threading_manager::{SafeTaskArgs, TaskItems};
729 ///
730 /// pub type SafeTaskArgs2<T> = Arc<RwLock<TaskItems<T>>>;
731 /// let expected = vec![
732 /// RUMString::from("Hello"),
733 /// RUMString::from("World!"),
734 /// RUMString::from("Overcast"),
735 /// RUMString::from("and"),
736 /// RUMString::from("Sad"),
737 /// ];
738 /// let locked_args = AsyncRwLock::new(expected.clone());
739 /// let task_args = SafeTaskArgs::<RUMString>::new(locked_args);
740 ///
741 ///
742 /// ```
743 ///
744 #[macro_export]
745 macro_rules! rumtk_create_task {
746 ( $func:expr ) => {{
747 async move {
748 let f = $func;
749 f().await
750 }
751 }};
752 ( $func:expr, $args:expr ) => {{
753 let f = $func;
754 async move { f(&$args).await }
755 }};
756 }
757
758 ///
759 /// Creates an instance of [SafeTaskArgs] with the arguments passed.
760 ///
761 /// ## Note
762 ///
763 /// All arguments must be of the same type
764 ///
765 #[macro_export]
766 macro_rules! rumtk_create_task_args {
767 ( ) => {{
768 use $crate::threading::threading_manager::{TaskArgs, SafeTaskArgs, TaskItems};
769 use tokio::sync::RwLock;
770 SafeTaskArgs::new(RwLock::new(vec![]))
771 }};
772 ( $($args:expr),+ ) => {{
773 use $crate::threading::threading_manager::{SafeTaskArgs};
774 use tokio::sync::RwLock;
775 SafeTaskArgs::new(RwLock::new(vec![$($args),+]))
776 }};
777 }
778
779 ///
780 /// Convenience macro for packaging the task components and launching the task in one line.
781 ///
782 /// One of the advantages is that you can generate a new `tokio` runtime by specifying the
783 /// number of threads at the end. This is optional. Meaning, we will default to the system's
784 /// number of threads if that value is not specified.
785 ///
786 /// Between the `func` parameter and the optional `threads` parameter, you can specify a
787 /// variable number of arguments to pass to the task. each argument must be of the same type.
788 /// If you wish to pass different arguments with different types, please define an abstract type
789 /// whose underlying structure is a tuple of items and pass that instead.
790 ///
791 /// ## Examples
792 ///
793 /// ### With Default Thread Count
794 /// ```
795 /// use rumtk_core::{rumtk_exec_task};
796 /// use rumtk_core::core::RUMResult;
797 /// use rumtk_core::threading::threading_manager::SafeTaskArgs;
798 ///
799 /// async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
800 /// let mut result = Vec::<i32>::new();
801 /// for arg in args.read().await.iter() {
802 /// result.push(*arg);
803 /// }
804 /// Ok(result)
805 /// }
806 ///
807 /// let result = rumtk_exec_task!(test, vec![5]);
808 /// assert_eq!(&result.clone().unwrap(), &vec![5], "Results mismatch");
809 /// assert_ne!(&result.clone().unwrap(), &vec![5, 10], "Results do not mismatch as expected!");
810 /// ```
811 ///
812 /// ### With Custom Thread Count
813 /// ```
814 /// use rumtk_core::{rumtk_exec_task};
815 /// use rumtk_core::core::RUMResult;
816 /// use rumtk_core::threading::threading_manager::SafeTaskArgs;
817 ///
818 /// async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
819 /// let mut result = Vec::<i32>::new();
820 /// for arg in args.read().await.iter() {
821 /// result.push(*arg);
822 /// }
823 /// Ok(result)
824 /// }
825 ///
826 /// let result = rumtk_exec_task!(test, vec![5], 5);
827 /// assert_eq!(&result.clone().unwrap(), &vec![5], "Results mismatch");
828 /// assert_ne!(&result.clone().unwrap(), &vec![5, 10], "Results do not mismatch as expected!");
829 /// ```
830 ///
831 /// ### With Async Function Body
832 /// ```
833 /// use rumtk_core::{rumtk_exec_task};
834 /// use rumtk_core::core::RUMResult;
835 /// use rumtk_core::threading::threading_manager::SafeTaskArgs;
836 ///
837 /// let result = rumtk_exec_task!(
838 /// async move |args: &SafeTaskArgs<i32>| -> RUMResult<Vec<i32>> {
839 /// let mut result = Vec::<i32>::new();
840 /// for arg in args.read().await.iter() {
841 /// result.push(*arg);
842 /// }
843 /// Ok(result)
844 /// },
845 /// vec![5]);
846 /// assert_eq!(&result.clone().unwrap(), &vec![5], "Results mismatch");
847 /// assert_ne!(&result.clone().unwrap(), &vec![5, 10], "Results do not mismatch as expected!");
848 /// ```
849 ///
850 /// ### With Async Function Body and No Args
851 /// ```
852 /// use rumtk_core::{rumtk_exec_task};
853 /// use rumtk_core::core::RUMResult;
854 /// use rumtk_core::threading::threading_manager::SafeTaskArgs;
855 ///
856 /// let result = rumtk_exec_task!(
857 /// async || -> RUMResult<Vec<i32>> {
858 /// let mut result = Vec::<i32>::new();
859 /// Ok(result)
860 /// });
861 /// let empty = Vec::<i32>::new();
862 /// assert_eq!(&result.clone().unwrap(), &empty, "Results mismatch");
863 /// assert_ne!(&result.clone().unwrap(), &vec![5, 10], "Results do not mismatch as expected!");
864 /// ```
865 ///
866 /// ## Equivalent To
867 ///
868 /// ```
869 /// use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_create_task_args, rumtk_create_task, rumtk_spawn_task};
870 /// use rumtk_core::core::RUMResult;
871 /// use rumtk_core::threading::threading_manager::SafeTaskArgs;
872 ///
873 /// async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
874 /// let mut result = Vec::<i32>::new();
875 /// for arg in args.read().await.iter() {
876 /// result.push(*arg);
877 /// }
878 /// Ok(result)
879 /// }
880 ///
881 /// let rt = rumtk_init_threads!();
882 /// let args = rumtk_create_task_args!(1);
883 /// let task = rumtk_create_task!(test, args);
884 /// let result = rumtk_resolve_task!(&rt, rumtk_spawn_task!(&rt, task));
885 /// ```
886 ///
887 #[macro_export]
888 macro_rules! rumtk_exec_task {
889 ($func:expr ) => {{
890 use tokio::sync::RwLock;
891 use $crate::{
892 rumtk_create_task, rumtk_create_task_args, rumtk_init_threads, rumtk_resolve_task,
893 };
894 let rt = rumtk_init_threads!();
895 let task = rumtk_create_task!($func);
896 rumtk_resolve_task!(&rt, task)
897 }};
898 ($func:expr, $args:expr ) => {{
899 use tokio::sync::RwLock;
900 use $crate::{
901 rumtk_create_task, rumtk_create_task_args, rumtk_init_threads, rumtk_resolve_task,
902 };
903 let rt = rumtk_init_threads!();
904 let args = SafeTaskArgs::new(RwLock::new($args));
905 let task = rumtk_create_task!($func, args);
906 rumtk_resolve_task!(&rt, task)
907 }};
908 ($func:expr, $args:expr , $threads:expr ) => {{
909 use tokio::sync::RwLock;
910 use $crate::{
911 rumtk_create_task, rumtk_create_task_args, rumtk_init_threads, rumtk_resolve_task,
912 };
913 let rt = rumtk_init_threads!(&$threads);
914 let args = SafeTaskArgs::new(RwLock::new($args));
915 let task = rumtk_create_task!($func, args);
916 rumtk_resolve_task!(&rt, task)
917 }};
918 }
919
920 ///
921 /// Sleep a duration of time in a sync context, so no await can be call on the result.
922 ///
923 /// You can pass any value that can be cast to f32.
924 ///
925 /// The precision is up to nanoseconds and it is depicted by the number of decimal places.
926 ///
927 /// ## Examples
928 ///
929 /// ```
930 /// use rumtk_core::rumtk_sleep;
931 /// rumtk_sleep!(1); // Sleeps for 1 second.
932 /// rumtk_sleep!(0.001); // Sleeps for 1 millisecond
933 /// rumtk_sleep!(0.000001); // Sleeps for 1 microsecond
934 /// rumtk_sleep!(0.000000001); // Sleeps for 1 nanosecond
935 /// ```
936 ///
937 #[macro_export]
938 macro_rules! rumtk_sleep {
939 ( $dur:expr) => {{
940 use $crate::threading::threading_functions::sleep;
941 sleep($dur as f32)
942 }};
943 }
944
945 ///
946 /// Sleep for some duration of time in an async context. Meaning, we can be awaited.
947 ///
948 /// You can pass any value that can be cast to f32.
949 ///
950 /// The precision is up to nanoseconds and it is depicted by the number of decimal places.
951 ///
952 /// ## Examples
953 ///
954 /// ```
955 /// use rumtk_core::{rumtk_async_sleep, rumtk_exec_task};
956 /// use rumtk_core::core::RUMResult;
957 /// rumtk_exec_task!( async || -> RUMResult<()> {
958 /// rumtk_async_sleep!(1).await; // Sleeps for 1 second.
959 /// rumtk_async_sleep!(0.001).await; // Sleeps for 1 millisecond
960 /// rumtk_async_sleep!(0.000001).await; // Sleeps for 1 microsecond
961 /// rumtk_async_sleep!(0.000000001).await; // Sleeps for 1 nanosecond
962 /// Ok(())
963 /// }
964 /// );
965 /// ```
966 ///
967 #[macro_export]
968 macro_rules! rumtk_async_sleep {
969 ( $dur:expr) => {{
970 use $crate::threading::threading_functions::async_sleep;
971 async_sleep($dur as f32)
972 }};
973 }
974
975 ///
976 ///
977 ///
978 #[macro_export]
979 macro_rules! rumtk_new_task_queue {
980 ( $worker_num:expr ) => {{
981 use $crate::threading::threading_manager::TaskManager;
982 TaskManager::new($worker_num);
983 }};
984 }
985}