rumtk_core/threading.rs
1/*
2 * rumtk attempts to implement HL7 and medical protocols for interoperability in medicine.
3 * This toolkit aims to be reliable, simple, performant, and standards compliant.
4 * Copyright (C) 2025 Luis M. Santos, M.D.
5 * Copyright (C) 2025 MedicalMasses L.L.C.
6 *
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20 */
21
22///
23/// This module provides all the primitives needed to build a multithreaded application.
24///
25pub mod thread_primitives {
26 use crate::cache::{new_cache, LazyRUMCache};
27 use std::sync::Arc;
28 use tokio::runtime::Runtime as TokioRuntime;
29 /**************************** Globals **************************************/
30 pub static mut RT_CACHE: TokioRtCache = new_cache();
31 /**************************** Helpers ***************************************/
32 pub fn init_cache(threads: &usize) -> SafeTokioRuntime {
33 let mut builder = tokio::runtime::Builder::new_multi_thread();
34 builder.worker_threads(*threads);
35 builder.enable_all();
36 match builder.build() {
37 Ok(handle) => Arc::new(handle),
38 Err(e) => panic!(
39 "Unable to initialize threading tokio runtime because {}!",
40 &e
41 ),
42 }
43 }
44
45 /**************************** Types ***************************************/
46 pub type SafeTokioRuntime = Arc<TokioRuntime>;
47 pub type TokioRtCache = LazyRUMCache<usize, SafeTokioRuntime>;
48}
49
50pub mod threading_manager {
51 use crate::cache::LazyRUMCacheValue;
52 use crate::core::{RUMResult, RUMVec};
53 use crate::strings::rumtk_format;
54 use crate::threading::thread_primitives::SafeTokioRuntime;
55 use crate::threading::threading_functions::async_sleep;
56 use crate::types::{RUMHashMap, RUMID};
57 use crate::{rumtk_init_threads, rumtk_resolve_task, rumtk_spawn_task, threading};
58 use std::future::Future;
59 use std::sync::Arc;
60 use tokio::sync::RwLock;
61 use tokio::task::JoinHandle;
62
63 const DEFAULT_SLEEP_DURATION: f32 = 0.001f32;
64 const DEFAULT_TASK_CAPACITY: usize = 1024;
65
66 pub type TaskItems<T> = RUMVec<T>;
67 /// This type aliases a vector of T elements that will be used for passing arguments to the task processor.
68 pub type TaskArgs<T> = TaskItems<T>;
69 /// Function signature defining the interface of task processing logic.
70 pub type SafeTaskArgs<T> = Arc<RwLock<TaskItems<T>>>;
71 pub type AsyncTaskHandle<R> = JoinHandle<TaskResult<R>>;
72 pub type AsyncTaskHandles<R> = Vec<AsyncTaskHandle<R>>;
73 //pub type TaskProcessor<T, R, Fut: Future<Output = TaskResult<R>>> = impl FnOnce(&SafeTaskArgs<T>) -> Fut;
74 pub type TaskID = RUMID;
75
76 #[derive(Debug, Clone, Default)]
77 pub struct Task<R> {
78 pub id: TaskID,
79 pub finished: bool,
80 pub result: Option<R>,
81 }
82
83 pub type SafeTask<R> = Arc<Task<R>>;
84 type SafeInternalTask<R> = Arc<RwLock<Task<R>>>;
85 pub type TaskTable<R> = RUMHashMap<TaskID, SafeInternalTask<R>>;
86 pub type SafeAsyncTaskTable<R> = Arc<RwLock<TaskTable<R>>>;
87 pub type TaskBatch = RUMVec<TaskID>;
88 /// Type to use to define how task results are expected to be returned.
89 pub type TaskResult<R> = RUMResult<SafeTask<R>>;
90 pub type TaskResults<R> = TaskItems<TaskResult<R>>;
91 pub type TaskRuntime = LazyRUMCacheValue<SafeTokioRuntime>;
92
93 ///
94 /// Manages asynchronous tasks submitted as micro jobs from synchronous code. This type essentially
95 /// gives the multithreading, asynchronous superpowers to synchronous logic.
96 ///
97 /// ## Example Usage
98 ///
99 /// ```
100 /// use std::sync::{Arc};
101 /// use tokio::sync::RwLock as AsyncRwLock;
102 /// use rumtk_core::core::RUMResult;
103 /// use rumtk_core::strings::RUMString;
104 /// use rumtk_core::threading::threading_manager::{SafeTaskArgs, TaskItems, TaskManager};
105 /// use rumtk_core::{rumtk_create_task, };
106 ///
107 /// let expected = vec![
108 /// RUMString::from("Hello"),
109 /// RUMString::from("World!"),
110 /// RUMString::from("Overcast"),
111 /// RUMString::from("and"),
112 /// RUMString::from("Sad"),
113 /// ];
114 ///
115 /// type TestResult = RUMResult<Vec<RUMString>>;
116 /// let mut queue: TaskManager<TestResult> = TaskManager::new(&5).unwrap();
117 ///
118 /// let locked_args = AsyncRwLock::new(expected.clone());
119 /// let task_args = SafeTaskArgs::<RUMString>::new(locked_args);
120 /// let processor = rumtk_create_task!(
121 /// async |args: &SafeTaskArgs<RUMString>| -> TestResult {
122 /// let owned_args = Arc::clone(args);
123 /// let locked_args = owned_args.read().await;
124 /// let mut results = TaskItems::<RUMString>::with_capacity(locked_args.len());
125 ///
126 /// for arg in locked_args.iter() {
127 /// results.push(RUMString::new(arg));
128 /// }
129 ///
130 /// Ok(results)
131 /// },
132 /// task_args
133 /// );
134 ///
135 /// queue.add_task::<_>(processor);
136 /// let results = queue.wait();
137 ///
138 /// let mut result_data = Vec::<RUMString>::with_capacity(5);
139 /// for r in results {
140 /// for v in r.unwrap().result.clone().unwrap().iter() {
141 /// for value in v.iter() {
142 /// result_data.push(value.clone());
143 /// }
144 /// }
145 /// }
146 ///
147 /// assert_eq!(result_data, expected, "Results do not match expected!");
148 ///
149 /// ```
150 ///
151 #[derive(Debug, Clone, Default)]
152 pub struct TaskManager<R> {
153 tasks: SafeAsyncTaskTable<R>,
154 workers: usize,
155 }
156
157 impl<R> TaskManager<R>
158 where
159 R: Sync + Send + Clone + 'static,
160 {
161 ///
162 /// This method creates a [`TaskQueue`] instance using sensible defaults.
163 ///
164 /// The `threads` field is computed from the number of cores present in system.
165 ///
166 pub fn default() -> RUMResult<TaskManager<R>> {
167 Self::new(&threading::threading_functions::get_default_system_thread_count())
168 }
169
170 ///
171 /// Creates an instance of [`ThreadedTaskQueue<T, R>`] in the form of [`SafeThreadedTaskQueue<T, R>`].
172 /// Expects you to provide the count of threads to spawn and the microtask queue size
173 /// allocated by each thread.
174 ///
175 /// This method calls [`Self::with_capacity()`] for the actual object creation.
176 /// The main queue capacity is pre-allocated to [`DEFAULT_QUEUE_CAPACITY`].
177 ///
178 pub fn new(worker_num: &usize) -> RUMResult<TaskManager<R>> {
179 let tasks = SafeAsyncTaskTable::<R>::new(RwLock::new(TaskTable::with_capacity(
180 DEFAULT_TASK_CAPACITY,
181 )));
182 Ok(TaskManager::<R> {
183 tasks,
184 workers: worker_num.to_owned(),
185 })
186 }
187
188 ///
189 /// Add a task to the processing queue. The idea is that you can queue a processor function
190 /// and list of args that will be picked up by one of the threads for processing.
191 ///
192 /// This is the async counterpart
193 ///
194 pub async fn add_task_async<F>(&mut self, task: F) -> TaskID
195 where
196 F: Future<Output = R> + Send + Sync + 'static,
197 F::Output: Send + Sized + 'static,
198 {
199 let id = TaskID::new_v4();
200 Self::_add_task_async(id.clone(), self.tasks.clone(), task).await
201 }
202
203 ///
204 /// See [add_task](Self::add_task)
205 ///
/// Unlike `add_task`, this method does not block, which is key to avoiding a panic in
/// the tokio runtime when adding a task to the queue from a normal function called from an
/// async environment.
209 ///
210 /// ## Example
211 ///
212 /// ```
213 /// use rumtk_core::threading::threading_manager::{TaskManager};
214 /// use rumtk_core::{rumtk_init_threads, strings::rumtk_format};
215 /// use std::sync::{Arc};
216 /// use tokio::sync::Mutex;
217 ///
218 /// type JobManager = Arc<Mutex<TaskManager<usize>>>;
219 ///
220 /// async fn called_fn() -> usize {
221 /// 5
222 /// }
223 ///
224 /// fn push_job(manager: &mut TaskManager<usize>) -> usize {
225 /// manager.spawn_task(called_fn());
226 /// 1
227 /// }
228 ///
229 /// async fn call_sync_fn(mut manager: JobManager) -> usize {
230 /// let mut owned = manager.lock().await;
231 /// push_job(&mut owned)
232 /// }
233 ///
234 /// let workers = 5;
235 /// let rt = rumtk_init_threads!(&workers);
236 /// let mut manager = Arc::new(Mutex::new(TaskManager::new(&workers).unwrap()));
237 ///
238 /// manager.blocking_lock().spawn_task(call_sync_fn(manager.clone()));
239 ///
240 /// let result_raw = manager.blocking_lock().wait();
241 ///
242 /// ```
243 ///
244 pub fn spawn_task<F>(&mut self, task: F) -> TaskID
245 where
246 F: Future<Output = R> + Send + Sync + 'static,
247 F::Output: Send + Sized + 'static,
248 {
249 let id = TaskID::new_v4();
250 let rt = rumtk_init_threads!(&self.workers);
251 rumtk_spawn_task!(
252 rt,
253 Self::_add_task_async(id.clone(), self.tasks.clone(), task)
254 );
255 id
256 }
257
258 ///
259 /// See [add_task](Self::add_task)
260 ///
261 pub fn add_task<F>(&mut self, task: F) -> TaskID
262 where
263 F: Future<Output = R> + Send + Sync + 'static,
264 F::Output: Send + Sized + 'static,
265 {
266 let rt = rumtk_init_threads!(&self.workers);
267 rumtk_resolve_task!(rt, self.add_task_async(task))
268 }
269
270 async fn _add_task_async<F>(id: TaskID, tasks: SafeAsyncTaskTable<R>, task: F) -> TaskID
271 where
272 F: Future<Output = R> + Send + Sync + 'static,
273 F::Output: Send + Sized + 'static,
274 {
275 let mut safe_task = Arc::new(RwLock::new(Task::<R> {
276 id: id.clone(),
277 finished: false,
278 result: None,
279 }));
280 tasks.write().await.insert(id.clone(), safe_task.clone());
281
282 let task_wrapper = async move || {
283 // Run the task
284 let result = task.await;
285
286 // Cleanup task
287 safe_task.write().await.result = Some(result);
288 safe_task.write().await.finished = true;
289 };
290
291 tokio::spawn(task_wrapper());
292
293 id
294 }
295
296 ///
297 /// See [wait_async](Self::wait_async)
298 ///
299 pub fn wait(&mut self) -> TaskResults<R> {
300 let rt = rumtk_init_threads!(&self.workers);
301 rumtk_resolve_task!(rt, self.wait_async())
302 }
303
304 ///
305 /// See [wait_on_batch_async](Self::wait_on_batch_async)
306 ///
307 pub fn wait_on_batch(&mut self, tasks: &TaskBatch) -> TaskResults<R> {
308 let rt = rumtk_init_threads!(&self.workers);
309 rumtk_resolve_task!(rt, self.wait_on_batch_async(&tasks))
310 }
311
312 ///
313 /// See [wait_on_async](Self::wait_on_async)
314 ///
315 pub fn wait_on(&mut self, task_id: &TaskID) -> TaskResult<R> {
316 let rt = rumtk_init_threads!(&self.workers);
317 rumtk_resolve_task!(rt, self.wait_on_async(&task_id))
318 }
319
320 ///
321 /// This method waits until a queued task with [TaskID](TaskID) has been processed from the main queue.
322 ///
323 /// We poll the status of the task every [DEFAULT_SLEEP_DURATION](DEFAULT_SLEEP_DURATION) ms.
324 ///
325 /// Upon completion,
326 ///
327 /// 2. Return the result ([TaskResults<R>](TaskResults)).
328 ///
329 /// This operation consumes the task.
330 ///
331 /// ### Note:
332 /// ```text
333 /// Results returned here are not guaranteed to be in the same order as the order in which
334 /// the tasks were queued for work. You will need to pass a type as T that automatically
335 /// tracks its own id or has a way for you to resort results.
336 /// ```
337 pub async fn wait_on_async(&mut self, task_id: &TaskID) -> TaskResult<R> {
338 let task = match self.tasks.write().await.remove(task_id) {
339 Some(task) => task.clone(),
340 None => return Err(rumtk_format!("No task with id {}", task_id)),
341 };
342
343 while !task.read().await.finished {
344 async_sleep(DEFAULT_SLEEP_DURATION).await;
345 }
346
347 let x = Ok(Arc::new(task.write().await.clone()));
348 x
349 }
350
351 ///
352 /// This method waits until a set of queued tasks with [TaskID](TaskID) has been processed from the main queue.
353 ///
354 /// We poll the status of the task every [DEFAULT_SLEEP_DURATION](DEFAULT_SLEEP_DURATION) ms.
355 ///
356 /// Upon completion,
357 ///
358 /// 1. We collect the results generated (if any).
359 /// 2. Return the list of results ([TaskResults<R>](TaskResults)).
360 ///
361 /// ### Note:
362 /// ```text
363 /// Results returned here are not guaranteed to be in the same order as the order in which
364 /// the tasks were queued for work. You will need to pass a type as T that automatically
365 /// tracks its own id or has a way for you to resort results.
366 /// ```
367 pub async fn wait_on_batch_async(&mut self, tasks: &TaskBatch) -> TaskResults<R> {
368 let mut results = TaskResults::<R>::default();
369 for task in tasks {
370 results.push(self.wait_on_async(task).await);
371 }
372 results
373 }
374
375 ///
376 /// This method waits until all queued tasks have been processed from the main queue.
377 ///
378 /// We poll the status of the main queue every [DEFAULT_SLEEP_DURATION](DEFAULT_SLEEP_DURATION) ms.
379 ///
380 /// Upon completion,
381 ///
382 /// 1. We collect the results generated (if any).
383 /// 2. We reset the main task and result internal queue states.
384 /// 3. Return the list of results ([TaskResults<R>](TaskResults)).
385 ///
386 /// This operation consumes all the tasks.
387 ///
388 /// ### Note:
389 /// ```text
390 /// Results returned here are not guaranteed to be in the same order as the order in which
391 /// the tasks were queued for work. You will need to pass a type as T that automatically
392 /// tracks its own id or has a way for you to resort results.
393 /// ```
394 pub async fn wait_async(&mut self) -> TaskResults<R> {
395 let task_batch = self.tasks.read().await.keys().cloned().collect::<Vec<_>>();
396 self.wait_on_batch_async(&task_batch).await
397 }
398
399 ///
400 /// Check if all work has been completed from the task queue.
401 ///
402 /// ## Examples
403 ///
404 /// ### Sync Usage
405 ///
406 ///```
407 /// use rumtk_core::threading::threading_manager::TaskManager;
408 ///
409 /// let manager = TaskManager::<usize>::new(&4).unwrap();
410 ///
411 /// let all_done = manager.is_all_completed();
412 ///
413 /// assert_eq!(all_done, true, "Empty TaskManager reports tasks are not completed!");
414 ///
415 /// ```
416 ///
417 pub fn is_all_completed(&self) -> bool {
418 let rt = rumtk_init_threads!(&self.workers);
419 rumtk_resolve_task!(rt, TaskManager::<R>::is_all_completed_async(self))
420 }
421
422 pub async fn is_all_completed_async(&self) -> bool {
423 for (_, task) in self.tasks.read().await.iter() {
424 if !task.read().await.finished {
425 return false;
426 }
427 }
428
429 true
430 }
431
432 ///
433 /// Check if a task completed
434 ///
435 pub fn is_finished(&self, id: &TaskID) -> bool {
436 match self.tasks.blocking_read().get(id) {
437 Some(t) => t.blocking_read().finished,
438 None => false,
439 }
440 }
441
442 pub async fn is_finished_async(&self, id: &TaskID) -> bool {
443 match self.tasks.read().await.get(id) {
444 Some(task) => task.read().await.finished,
445 None => true,
446 }
447 }
448
449 ///
450 /// Alias for [wait](TaskManager::wait).
451 ///
452 fn gather(&mut self) -> TaskResults<R> {
453 self.wait()
454 }
455 }
456}
457
458///
459/// This module contains a few helper.
460///
461/// For example, you can find a function for determining number of threads available in system.
462/// The sleep family of functions are also here.
463///
464pub mod threading_functions {
465 use num_cpus;
466 use std::thread::{available_parallelism, sleep as std_sleep};
467 use std::time::Duration;
468 use tokio::time::sleep as tokio_sleep;
469
470 pub const NANOS_PER_SEC: u64 = 1000000000;
471 pub const MILLIS_PER_SEC: u64 = 1000;
472 pub const MICROS_PER_SEC: u64 = 1000000;
473
474 pub fn get_default_system_thread_count() -> usize {
475 let cpus: usize = num_cpus::get();
476 let parallelism = match available_parallelism() {
477 Ok(n) => n.get(),
478 Err(_) => 0,
479 };
480
481 if parallelism >= cpus {
482 parallelism
483 } else {
484 cpus
485 }
486 }
487
488 pub fn sleep(s: f32) {
489 let ns = s * NANOS_PER_SEC as f32;
490 let rounded_ns = ns.round() as u64;
491 let duration = Duration::from_nanos(rounded_ns);
492 std_sleep(duration);
493 }
494
495 pub async fn async_sleep(s: f32) {
496 let ns = s * NANOS_PER_SEC as f32;
497 let rounded_ns = ns.round() as u64;
498 let duration = Duration::from_nanos(rounded_ns);
499 tokio_sleep(duration).await;
500 }
501}
502
503///
504/// Main API for interacting with the threading back end. Remember, we use tokio as our executor.
505/// This means that by default, all jobs sent to the thread pool have to be async in nature.
506/// These macros make handling of these jobs at the sync/async boundary more convenient.
507///
508pub mod threading_macros {
509 use crate::threading::thread_primitives;
510 use crate::threading::threading_manager::SafeTaskArgs;
511
512 ///
513 /// First, let's make sure we have *tokio* initialized at least once. The runtime created here
514 /// will be saved to the global context so the next call to this macro will simply grab a
515 /// reference to the previously initialized runtime.
516 ///
517 /// Passing nothing will default to initializing a runtime using the default number of threads
518 /// for this system. This is typically equivalent to number of cores/threads for your CPU.
519 ///
520 /// Passing `threads` number will yield a runtime that allocates that many threads.
521 ///
522 ///
523 /// ## Examples
524 ///
525 /// ```
526 /// use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_create_task_args, rumtk_create_task, rumtk_spawn_task};
527 /// use rumtk_core::core::RUMResult;
528 /// use rumtk_core::threading::threading_manager::SafeTaskArgs;
529 ///
530 /// async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
531 /// let mut result = Vec::<i32>::new();
532 /// for arg in args.read().await.iter() {
533 /// result.push(*arg);
534 /// }
535 /// Ok(result)
536 /// }
537 ///
538 /// let rt = rumtk_init_threads!(); // Creates runtime instance
539 /// let args = rumtk_create_task_args!(1); // Creates a vector of i32s
540 /// let task = rumtk_create_task!(test, args); // Creates a standard task which consists of a function or closure accepting a Vec<T>
541 /// let result = rumtk_resolve_task!(&rt, rumtk_spawn_task!(&rt, task)); // Spawn's task and waits for it to conclude.
542 /// ```
543 ///
544 /// ```
545 /// use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_create_task_args, rumtk_create_task, rumtk_spawn_task};
546 /// use rumtk_core::core::RUMResult;
547 /// use rumtk_core::threading::threading_manager::SafeTaskArgs;
548 ///
549 /// async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
550 /// let mut result = Vec::<i32>::new();
551 /// for arg in args.read().await.iter() {
552 /// result.push(*arg);
553 /// }
554 /// Ok(result)
555 /// }
556 ///
557 /// let thread_count: usize = 10;
558 /// let rt = rumtk_init_threads!(&thread_count);
559 /// let args = rumtk_create_task_args!(1);
560 /// let task = rumtk_create_task!(test, args);
561 /// let result = rumtk_resolve_task!(&rt, rumtk_spawn_task!(&rt, task));
562 /// ```
#[macro_export]
macro_rules! rumtk_init_threads {
    ( ) => {{
        use $crate::rumtk_cache_fetch;
        use $crate::threading::thread_primitives::{init_cache, RT_CACHE};
        use $crate::threading::threading_functions::get_default_system_thread_count;
        // Take a raw pointer to the `static mut` cache, exactly like the arm below, instead
        // of creating a `&mut` reference to it.
        let rt = rumtk_cache_fetch!(
            &raw mut RT_CACHE,
            &get_default_system_thread_count(),
            init_cache
        );
        rt
    }};
    ( $threads:expr ) => {{
        use $crate::rumtk_cache_fetch;
        use $crate::threading::thread_primitives::{init_cache, RT_CACHE};
        // `$threads` is the cache key; `init_cache` builds the runtime on a cache miss.
        let rt = rumtk_cache_fetch!(&raw mut RT_CACHE, $threads, init_cache);
        rt
    }};
}
583
///
/// Puts task onto the runtime queue.
///
/// The parameters to this macro are a reference to the runtime (`rt`) and a future (`func`).
/// When only the future is given, the runtime is obtained through `rumtk_init_threads!()`.
///
/// The macro evaluates to whatever `rt.spawn(func)` returns; for a tokio runtime that is a
/// `tokio::task::JoinHandle<T>`. If the task was a standard framework task, this matches
/// the `AsyncTaskHandle` alias in `threading_manager`.
///
#[macro_export]
macro_rules! rumtk_spawn_task {
    ( $func:expr ) => {{
        // Resolved through `$crate` so callers do not need to import `rumtk_init_threads`.
        let rt = $crate::rumtk_init_threads!();
        rt.spawn($func)
    }};
    ( $rt:expr, $func:expr ) => {{
        $rt.spawn($func)
    }};
}
602
///
/// Blocks the current thread on the runtime until the given async callable has resolved.
///
/// With a runtime and a callable, the callable is invoked without arguments and its future
/// is awaited inside `block_on`.
///
/// Any further comma-separated expressions are forwarded as the arguments of that call.
///
#[macro_export]
macro_rules! rumtk_wait_on_task {
    ( $runtime:expr, $callable:expr ) => {{
        $runtime.block_on(async move {
            $callable().await
        })
    }};
    ( $runtime:expr, $callable:expr, $($call_args:expr),+ ) => {{
        $runtime.block_on(async move {
            $callable($($call_args),+).await
        })
    }};
}
625
626 ///
627 /// This macro awaits a future.
628 ///
/// The arguments are a reference to the runtime (`rt`) and a future.
630 ///
631 /// If there is a result, you will get the result of the future.
632 ///
633 /// ## Examples
634 ///
635 /// ```
636 /// use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_create_task_args, rumtk_create_task, rumtk_spawn_task};
637 /// use rumtk_core::core::RUMResult;
638 /// use rumtk_core::threading::threading_manager::SafeTaskArgs;
639 ///
640 /// async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
641 /// let mut result = Vec::<i32>::new();
642 /// for arg in args.read().await.iter() {
643 /// result.push(*arg);
644 /// }
645 /// Ok(result)
646 /// }
647 ///
648 /// let rt = rumtk_init_threads!();
649 /// let args = rumtk_create_task_args!(1);
650 /// let task = rumtk_create_task!(test, args);
651 /// let result = rumtk_resolve_task!(&rt, rumtk_spawn_task!(&rt, task));
652 /// ```
653 ///
#[macro_export]
macro_rules! rumtk_resolve_task {
    ( $runtime:expr, $task:expr ) => {{
        // In an expression such as rumtk_resolve_task!(&rt, rumtk_spawn_task!(&rt, task)) the
        // inner macro would otherwise be expanded inside the `async move` block below and be
        // captured by it. Values like mutex guards then leave the outer scope and the
        // expression fails to compile, although the intent is for the inner expression to be
        // evaluated first and only its result moved into the async block. Binding the
        // expression to a local forces that evaluation order for any given expression.
        // DO NOT REMOVE OR "SIMPLIFY" THE let pending = $task LINE!!!
        let pending = $task;
        $runtime.block_on(async move { pending.await })
    }};
}
669
///
/// Resolves a future on the runtime's blocking thread pool so that code already running
/// inside the async runtime does not call `block_on` on the current thread.
///
/// The arguments are a reference to the runtime (`rt`) and a future.
///
/// The future is driven to completion with `Handle::block_on` inside `spawn_blocking`. The
/// macro evaluates to the value returned by `spawn_blocking` (a join handle for a tokio
/// runtime), which the caller can await to obtain the future's output.
///
/// NOTE(review): the previous body could not be expanded (unbound `future`, missing
/// semicolon, a future passed to `spawn_blocking`), so this behaviour is a best-effort
/// reading of the intent — confirm.
///
#[macro_export]
macro_rules! rumtk_resolve_task_from_async {
    ( $rt:expr, $future:expr ) => {{
        // Evaluate the caller's expressions exactly once, before anything is moved.
        let future = $future;
        let rt = &$rt;
        let handle = rt.handle().clone();
        rt.spawn_blocking(move || handle.block_on(future))
    }};
}
676
677 ///
678 /// This macro creates an async body that calls the async closure and awaits it.
679 ///
680 /// ## Example
681 ///
682 /// ```
683 /// use std::sync::{Arc, RwLock};
684 /// use tokio::sync::RwLock as AsyncRwLock;
685 /// use rumtk_core::strings::RUMString;
686 /// use rumtk_core::threading::threading_manager::{SafeTaskArgs, TaskItems};
687 ///
688 /// pub type SafeTaskArgs2<T> = Arc<RwLock<TaskItems<T>>>;
689 /// let expected = vec![
690 /// RUMString::from("Hello"),
691 /// RUMString::from("World!"),
692 /// RUMString::from("Overcast"),
693 /// RUMString::from("and"),
694 /// RUMString::from("Sad"),
695 /// ];
696 /// let locked_args = AsyncRwLock::new(expected.clone());
697 /// let task_args = SafeTaskArgs::<RUMString>::new(locked_args);
698 ///
699 ///
700 /// ```
701 ///
#[macro_export]
macro_rules! rumtk_create_task {
    // No arguments: the callable expression is evaluated inside the async block, then the
    // future it returns is awaited.
    ( $callable:expr ) => {{
        async move {
            let task_fn = $callable;
            task_fn().await
        }
    }};
    // With arguments: the callable is bound first; the arguments move into the async block
    // and are passed to the callable by reference.
    ( $callable:expr, $task_args:expr ) => {{
        let task_fn = $callable;
        async move { task_fn(&$task_args).await }
    }};
}
715
///
/// Builds a [SafeTaskArgs] value holding the expressions passed to the macro.
///
/// With no arguments the wrapped vector is empty.
///
/// ## Note
///
/// All arguments must be of the same type, because they are collected with `vec![...]`.
///
#[macro_export]
macro_rules! rumtk_create_task_args {
    ( ) => {{
        use $crate::threading::threading_manager::SafeTaskArgs;
        use tokio::sync::RwLock;
        SafeTaskArgs::new(RwLock::new(vec![]))
    }};
    ( $($items:expr),+ ) => {{
        use $crate::threading::threading_manager::SafeTaskArgs;
        use tokio::sync::RwLock;
        SafeTaskArgs::new(RwLock::new(vec![$($items),+]))
    }};
}
736
737 ///
738 /// Convenience macro for packaging the task components and launching the task in one line.
739 ///
740 /// One of the advantages is that you can generate a new `tokio` runtime by specifying the
741 /// number of threads at the end. This is optional. Meaning, we will default to the system's
742 /// number of threads if that value is not specified.
743 ///
/// Between the `func` parameter and the optional `threads` parameter, you pass the task's
/// arguments as a single collection (for example `vec![5]`). Each element must be of the same type.
/// If you wish to pass different arguments with different types, please define an abstract type
/// whose underlying structure is a tuple of items and pass that instead.
748 ///
749 /// ## Examples
750 ///
751 /// ### With Default Thread Count
752 /// ```
753 /// use rumtk_core::{rumtk_exec_task};
754 /// use rumtk_core::core::RUMResult;
755 /// use rumtk_core::threading::threading_manager::SafeTaskArgs;
756 ///
757 /// async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
758 /// let mut result = Vec::<i32>::new();
759 /// for arg in args.read().await.iter() {
760 /// result.push(*arg);
761 /// }
762 /// Ok(result)
763 /// }
764 ///
765 /// let result = rumtk_exec_task!(test, vec![5]);
766 /// assert_eq!(&result.clone().unwrap(), &vec![5], "Results mismatch");
767 /// assert_ne!(&result.clone().unwrap(), &vec![5, 10], "Results do not mismatch as expected!");
768 /// ```
769 ///
770 /// ### With Custom Thread Count
771 /// ```
772 /// use rumtk_core::{rumtk_exec_task};
773 /// use rumtk_core::core::RUMResult;
774 /// use rumtk_core::threading::threading_manager::SafeTaskArgs;
775 ///
776 /// async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
777 /// let mut result = Vec::<i32>::new();
778 /// for arg in args.read().await.iter() {
779 /// result.push(*arg);
780 /// }
781 /// Ok(result)
782 /// }
783 ///
784 /// let result = rumtk_exec_task!(test, vec![5], 5);
785 /// assert_eq!(&result.clone().unwrap(), &vec![5], "Results mismatch");
786 /// assert_ne!(&result.clone().unwrap(), &vec![5, 10], "Results do not mismatch as expected!");
787 /// ```
788 ///
789 /// ### With Async Function Body
790 /// ```
791 /// use rumtk_core::{rumtk_exec_task};
792 /// use rumtk_core::core::RUMResult;
793 /// use rumtk_core::threading::threading_manager::SafeTaskArgs;
794 ///
795 /// let result = rumtk_exec_task!(
796 /// async move |args: &SafeTaskArgs<i32>| -> RUMResult<Vec<i32>> {
797 /// let mut result = Vec::<i32>::new();
798 /// for arg in args.read().await.iter() {
799 /// result.push(*arg);
800 /// }
801 /// Ok(result)
802 /// },
803 /// vec![5]);
804 /// assert_eq!(&result.clone().unwrap(), &vec![5], "Results mismatch");
805 /// assert_ne!(&result.clone().unwrap(), &vec![5, 10], "Results do not mismatch as expected!");
806 /// ```
807 ///
808 /// ### With Async Function Body and No Args
809 /// ```
810 /// use rumtk_core::{rumtk_exec_task};
811 /// use rumtk_core::core::RUMResult;
812 /// use rumtk_core::threading::threading_manager::SafeTaskArgs;
813 ///
814 /// let result = rumtk_exec_task!(
815 /// async || -> RUMResult<Vec<i32>> {
816 /// let mut result = Vec::<i32>::new();
817 /// Ok(result)
818 /// });
819 /// let empty = Vec::<i32>::new();
820 /// assert_eq!(&result.clone().unwrap(), &empty, "Results mismatch");
821 /// assert_ne!(&result.clone().unwrap(), &vec![5, 10], "Results do not mismatch as expected!");
822 /// ```
823 ///
824 /// ## Equivalent To
825 ///
826 /// ```
827 /// use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_create_task_args, rumtk_create_task, rumtk_spawn_task};
828 /// use rumtk_core::core::RUMResult;
829 /// use rumtk_core::threading::threading_manager::SafeTaskArgs;
830 ///
831 /// async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
832 /// let mut result = Vec::<i32>::new();
833 /// for arg in args.read().await.iter() {
834 /// result.push(*arg);
835 /// }
836 /// Ok(result)
837 /// }
838 ///
839 /// let rt = rumtk_init_threads!();
840 /// let args = rumtk_create_task_args!(1);
841 /// let task = rumtk_create_task!(test, args);
842 /// let result = rumtk_resolve_task!(&rt, rumtk_spawn_task!(&rt, task));
843 /// ```
844 ///
#[macro_export]
macro_rules! rumtk_exec_task {
    // Task without arguments, default runtime.
    ($func:expr ) => {{
        let rt = $crate::rumtk_init_threads!();
        let task = $crate::rumtk_create_task!($func);
        $crate::rumtk_resolve_task!(&rt, task)
    }};
    // Task with arguments, default runtime.
    ($func:expr, $args:expr ) => {{
        use tokio::sync::RwLock;
        let rt = $crate::rumtk_init_threads!();
        // Fully qualified through `$crate` so the caller does not have to import `SafeTaskArgs`.
        let args = $crate::threading::threading_manager::SafeTaskArgs::new(RwLock::new($args));
        let task = $crate::rumtk_create_task!($func, args);
        $crate::rumtk_resolve_task!(&rt, task)
    }};
    // Task with arguments, runtime fetched for an explicit thread count.
    ($func:expr, $args:expr , $threads:expr ) => {{
        use tokio::sync::RwLock;
        let rt = $crate::rumtk_init_threads!(&$threads);
        let args = $crate::threading::threading_manager::SafeTaskArgs::new(RwLock::new($args));
        let task = $crate::rumtk_create_task!($func, args);
        $crate::rumtk_resolve_task!(&rt, task)
    }};
}
877
///
/// Sleep for a duration of time in a sync context; the call blocks and there is nothing to await.
///
/// You can pass any value that can be cast to f32. The value is interpreted as seconds.
///
/// Fractions of a second are expressed through decimal places; the `sleep` helper converts
/// the value to whole nanoseconds.
///
/// ## Examples
///
/// ```
/// use rumtk_core::rumtk_sleep;
/// rumtk_sleep!(1);           // Sleeps for 1 second.
/// rumtk_sleep!(0.001);       // Sleeps for 1 millisecond
/// rumtk_sleep!(0.000001);    // Sleeps for 1 microsecond
/// rumtk_sleep!(0.000000001); // Sleeps for 1 nanosecond
/// ```
///
#[macro_export]
macro_rules! rumtk_sleep {
    ( $seconds:expr) => {{
        use $crate::threading::threading_functions::sleep;
        sleep($seconds as f32)
    }};
}
902
///
/// Sleep for some duration of time in an async context. The macro evaluates to a future
/// that must be awaited.
///
/// You can pass any value that can be cast to f32. The value is interpreted as seconds.
///
/// Fractions of a second are expressed through decimal places; the `async_sleep` helper
/// converts the value to whole nanoseconds.
///
/// ## Examples
///
/// ```
/// use rumtk_core::{rumtk_async_sleep, rumtk_exec_task};
/// use rumtk_core::core::RUMResult;
/// rumtk_exec_task!( async || -> RUMResult<()> {
///         rumtk_async_sleep!(1).await;           // Sleeps for 1 second.
///         rumtk_async_sleep!(0.001).await;       // Sleeps for 1 millisecond
///         rumtk_async_sleep!(0.000001).await;    // Sleeps for 1 microsecond
///         rumtk_async_sleep!(0.000000001).await; // Sleeps for 1 nanosecond
///         Ok(())
///     }
/// );
/// ```
///
#[macro_export]
macro_rules! rumtk_async_sleep {
    ( $seconds:expr) => {{
        use $crate::threading::threading_functions::async_sleep;
        async_sleep($seconds as f32)
    }};
}
932
///
/// Creates a new `TaskManager` for the given worker count.
///
/// `worker_num` is passed straight to `TaskManager::new`, so it must be a `&usize`. The macro
/// evaluates to the `RUMResult<TaskManager<R>>` returned by that constructor.
///
#[macro_export]
macro_rules! rumtk_new_task_queue {
    ( $worker_num:expr ) => {{
        use $crate::threading::threading_manager::TaskManager;
        // No trailing semicolon: the constructed manager is the value of the macro.
        TaskManager::new($worker_num)
    }};
}
943}