rumtk_core/threading.rs
1/*
2 * rumtk attempts to implement HL7 and medical protocols for interoperability in medicine.
3 * This toolkit aims to be reliable, simple, performant, and standards compliant.
4 * Copyright (C) 2025 Luis M. Santos, M.D.
5 * Copyright (C) 2025 MedicalMasses L.L.C.
6 *
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20 */
21
22///
23/// This module provides all the primitives needed to build a multithreaded application.
24///
25pub mod thread_primitives {
26 use crate::core::RUMResult;
27 use crate::strings::rumtk_format;
28 pub use std::sync::Mutex as SyncMutex;
29 pub use std::sync::MutexGuard as SyncMutexGuard;
30 use std::sync::OnceLock;
31 pub use std::sync::RwLock as SyncRwLock;
32 pub use tokio::io;
33 pub use tokio::io::{AsyncReadExt, AsyncWriteExt};
34 use tokio::runtime::Runtime as TokioRuntime;
35 pub use tokio::sync::{
36 Mutex as AsyncMutex, MutexGuard as AsyncMutexGuard, RwLock as AsyncRwLock, RwLockReadGuard,
37 RwLockWriteGuard,
38 };
39
40 pub type RuntimeGuard<'a> = SyncMutexGuard<'a, TokioRuntime>;
41 /**************************** Types ***************************************/
42 pub type SafeTokioRuntime = OnceLock<SyncMutex<TokioRuntime>>;
43 /**************************** Globals **************************************/
44 static mut DEFAULT_RUNTIME: SafeTokioRuntime = SafeTokioRuntime::new();
45 /**************************** Helpers ***************************************/
46 pub fn init_runtime(workers: usize) -> RUMResult<RuntimeGuard<'static>> {
47 unsafe {
48 let handle = DEFAULT_RUNTIME.get_or_init(|| {
49 let mut builder = tokio::runtime::Builder::new_multi_thread();
50 builder.worker_threads(workers);
51 builder.enable_all();
52 match builder.build() {
53 Ok(handle) => SyncMutex::new(handle),
54 Err(e) => panic!(
55 "Unable to initialize threading tokio runtime because {}!",
56 &e
57 ),
58 }
59 });
60 match handle.lock() {
61 Ok(guard) => Ok(guard),
62 Err(e) => Err(rumtk_format!("Unable to lock tokio runtime!")),
63 }
64 }
65 }
66}
67
68pub mod threading_manager {
69 use crate::core::{RUMResult, RUMVec};
70 use crate::strings::rumtk_format;
71 use crate::threading::threading_functions::{async_sleep, sleep};
72 use crate::types::{RUMHashMap, RUMID};
73 use crate::{rumtk_init_threads, rumtk_resolve_task, rumtk_spawn_task, threading};
74 use std::future::Future;
75 use std::sync::Arc;
76 pub use std::sync::RwLock as SyncRwLock;
77 use tokio::sync::RwLock as AsyncRwLock;
78 use tokio::task::JoinHandle;
79
/// Polling interval, in seconds, used while waiting for a task to be marked as finished.
const DEFAULT_SLEEP_DURATION: f32 = 0.001f32;
/// Initial capacity reserved for the task table of a freshly created `TaskManager`.
const DEFAULT_TASK_CAPACITY: usize = 100;

/// Generic list of items exchanged with tasks (arguments going in or results coming out).
pub type TaskItems<T> = RUMVec<T>;
/// This type aliases a vector of T elements that will be used for passing arguments to the task processor.
pub type TaskArgs<T> = TaskItems<T>;
/// Task arguments wrapped for shared ownership behind an async read/write lock.
pub type SafeTaskArgs<T> = Arc<AsyncRwLock<TaskItems<T>>>;
/// Join handle of a spawned task whose output is a [`TaskResult`].
pub type AsyncTaskHandle<R> = JoinHandle<TaskResult<R>>;
/// Collection of [`AsyncTaskHandle`]s.
pub type AsyncTaskHandles<R> = Vec<AsyncTaskHandle<R>>;
// Function signature defining the interface of task processing logic (currently disabled).
//pub type TaskProcessor<T, R, Fut: Future<Output = TaskResult<R>>> = impl FnOnce(&SafeTaskArgs<T>) -> Fut;
/// Identifier assigned to every queued task.
pub type TaskID = RUMID;
92
/// Bookkeeping record kept for a single task submitted to a `TaskManager`.
#[derive(Debug, Clone, Default)]
pub struct Task<R> {
    /// Identifier assigned to the task when it was queued.
    pub id: TaskID,
    /// Set to `true` once the task has been processed.
    pub finished: bool,
    /// Output produced by the task's future, when one is available.
    pub result: Option<R>,
}
99
/// Shared snapshot of a [`Task`] as handed back to callers.
pub type SafeTask<R> = Arc<Task<R>>;
/// Shared, lock-protected task record as stored inside the task table.
type SafeInternalTask<R> = Arc<SyncRwLock<Task<R>>>;
/// Table mapping each [`TaskID`] to its task record.
pub type TaskTable<R> = RUMHashMap<TaskID, SafeInternalTask<R>>;
/// [`TaskTable`] shared behind an async read/write lock.
pub type SafeAsyncTaskTable<R> = Arc<AsyncRwLock<TaskTable<R>>>;
/// [`TaskTable`] shared behind a blocking (std) read/write lock.
pub type SafeSyncTaskTable<R> = Arc<SyncRwLock<TaskTable<R>>>;
/// List of task identifiers to wait on together.
pub type TaskBatch = RUMVec<TaskID>;
/// Type to use to define how task results are expected to be returned.
pub type TaskResult<R> = RUMResult<SafeTask<R>>;
/// List of [`TaskResult`]s, one entry per awaited task.
pub type TaskResults<R> = TaskItems<TaskResult<R>>;
109
110 ///
111 /// Manages asynchronous tasks submitted as micro jobs from synchronous code. This type essentially
112 /// gives the multithreading, asynchronous superpowers to synchronous logic.
113 ///
114 /// ## Example Usage
115 ///
116 /// ```
117 /// use std::sync::{Arc};
118 /// use tokio::sync::RwLock as AsyncRwLock;
119 /// use rumtk_core::core::RUMResult;
120 /// use rumtk_core::strings::RUMString;
121 /// use rumtk_core::threading::threading_manager::{SafeTaskArgs, TaskItems, TaskManager};
122 /// use rumtk_core::{rumtk_create_task, };
123 ///
124 /// let expected = vec![
125 /// RUMString::from("Hello"),
126 /// RUMString::from("World!"),
127 /// RUMString::from("Overcast"),
128 /// RUMString::from("and"),
129 /// RUMString::from("Sad"),
130 /// ];
131 ///
132 /// type TestResult = RUMResult<Vec<RUMString>>;
133 /// let mut queue: TaskManager<TestResult> = TaskManager::new(&5).unwrap();
134 ///
135 /// let locked_args = AsyncRwLock::new(expected.clone());
136 /// let task_args = SafeTaskArgs::<RUMString>::new(locked_args);
137 /// let processor = rumtk_create_task!(
138 /// async |args: &SafeTaskArgs<RUMString>| -> TestResult {
139 /// let owned_args = Arc::clone(args);
140 /// let locked_args = owned_args.read().await;
141 /// let mut results = TaskItems::<RUMString>::with_capacity(locked_args.len());
142 ///
143 /// for arg in locked_args.iter() {
144 /// results.push(RUMString::new(arg));
145 /// }
146 ///
147 /// Ok(results)
148 /// },
149 /// task_args
150 /// );
151 ///
152 /// queue.add_task::<_>(processor);
153 /// let results = queue.wait();
154 ///
155 /// let mut result_data = Vec::<RUMString>::with_capacity(5);
156 /// for r in results {
157 /// for v in r.unwrap().result.clone().unwrap().iter() {
158 /// for value in v.iter() {
159 /// result_data.push(value.clone());
160 /// }
161 /// }
162 /// }
163 ///
164 /// assert_eq!(result_data, expected, "Results do not match expected!");
165 ///
166 /// ```
167 ///
#[derive(Debug, Clone, Default)]
pub struct TaskManager<R> {
    /// Table of tasks that were queued and have not been waited on yet.
    tasks: SafeSyncTaskTable<R>,
    /// Worker count forwarded to the runtime initializer when a task is spawned.
    workers: usize,
}
173
174 impl<R> TaskManager<R>
175 where
176 R: Sync + Send + Clone + 'static,
177 {
178 ///
179 /// This method creates a [`TaskManager`] instance using sensible defaults.
180 ///
181 /// The `threads` field is computed from the number of cores present in system.
182 ///
183 pub fn default() -> RUMResult<TaskManager<R>> {
184 Self::new(&threading::threading_functions::get_default_system_thread_count())
185 }
186
187 ///
188 /// Creates an instance of [`TaskManager<R>`](TaskManager<R>).
189 /// Expects you to provide the count of threads to spawn and the microtask queue size
190 /// allocated by each thread.
191 ///
192 /// This method calls [`TaskTable::with_capacity()`](TaskTable::with_capacity) for the actual object creation.
193 /// The main queue capacity is pre-allocated to [`DEFAULT_TASK_CAPACITY`](DEFAULT_TASK_CAPACITY).
194 ///
195 pub fn new(worker_num: &usize) -> RUMResult<TaskManager<R>> {
196 let tasks = SafeSyncTaskTable::<R>::new(SyncRwLock::new(TaskTable::with_capacity(
197 DEFAULT_TASK_CAPACITY,
198 )));
199 Ok(TaskManager::<R> {
200 tasks,
201 workers: worker_num.to_owned(),
202 })
203 }
204
205 ///
206 /// Add a task to the processing queue. The idea is that you can queue a processor function
207 /// and list of args that will be picked up by one of the threads for processing.
208 ///
209 /// This is the async counterpart
210 ///
211 pub async fn add_task_async<F>(&mut self, task: F) -> TaskID
212 where
213 F: Future<Output = R> + Send + Sync + 'static,
214 F::Output: Send + 'static,
215 {
216 let id = TaskID::new_v4();
217 Self::_add_task_async(id.clone(), self.tasks.clone(), task).await
218 }
219
220 ///
221 /// See [`Self::add_task_async`]
222 ///
/// Unlike `add_task`, this method does not block, which is key to avoid panicking
/// the tokio runtime when a task is added to the queue from a normal function that was
/// called from an async environment.
226 ///
227 /// ## Example
228 ///
229 /// ```
230 /// use rumtk_core::threading::threading_manager::{TaskManager};
231 /// use rumtk_core::{rumtk_init_threads, strings::rumtk_format};
232 /// use std::sync::{Arc};
233 /// use tokio::sync::Mutex;
234 ///
235 /// type JobManager = Arc<Mutex<TaskManager<usize>>>;
236 ///
237 /// async fn called_fn() -> usize {
238 /// 5
239 /// }
240 ///
241 /// fn push_job(manager: &mut TaskManager<usize>) -> usize {
242 /// manager.spawn_task(called_fn());
243 /// 1
244 /// }
245 ///
246 /// async fn call_sync_fn(mut manager: JobManager) -> usize {
247 /// let mut owned = manager.lock().await;
248 /// push_job(&mut owned)
249 /// }
250 ///
251 /// let workers = 5;
252 /// let mut manager = Arc::new(Mutex::new(TaskManager::new(&workers).unwrap()));
253 ///
254 /// manager.blocking_lock().spawn_task(call_sync_fn(manager.clone()));
255 ///
256 /// let result_raw = manager.blocking_lock().wait();
257 ///
258 /// ```
259 ///
260 pub fn spawn_task<F>(&mut self, task: F) -> RUMResult<TaskID>
261 where
262 F: Future<Output = R> + Send + Sync + 'static,
263 F::Output: Send + Sized + 'static,
264 {
265 let id = TaskID::new_v4();
266 let rt = rumtk_init_threads!(self.workers)?;
267 rumtk_spawn_task!(
268 rt,
269 Self::_add_task_async(id.clone(), self.tasks.clone(), task)
270 );
271 Ok(id)
272 }
273
274 ///
275 /// See [add_task_async](Self::add_task_async)
276 ///
277 pub fn add_task<F>(&mut self, task: F) -> RUMResult<TaskID>
278 where
279 F: Future<Output = R> + Send + Sync + 'static,
280 F::Output: Send + Sized + 'static,
281 {
282 let id = TaskID::new_v4();
283 let tasks = self.tasks.clone();
284 rumtk_resolve_task!(Self::_add_task_async(id.clone(), tasks, task))
285 }
286
287 async fn _add_task_async<F>(id: TaskID, tasks: SafeSyncTaskTable<R>, task: F) -> TaskID
288 where
289 F: Future<Output = R> + Send + Sync + 'static,
290 F::Output: Send + Sized + 'static,
291 {
292 let mut safe_task = Arc::new(SyncRwLock::new(Task::<R> {
293 id: id.clone(),
294 finished: false,
295 result: None,
296 }));
297 tasks.write().unwrap().insert(id.clone(), safe_task.clone());
298
299 let task_wrapper = async move || {
300 // Run the task
301 let result = task.await;
302
303 // Cleanup task
304 let mut lock = safe_task.write().unwrap();
305 lock.result = Some(result);
306 lock.finished = true;
307 };
308
309 tokio::spawn(task_wrapper());
310
311 id
312 }
313
314 ///
315 /// See [wait_async](Self::wait_async)
316 ///
317 /// Duplicated here because we can't request the tokio runtime to do a quick exec for us if
318 /// this function happens to be called from the async context.
319 ///
320 pub fn wait(&mut self) -> TaskResults<R> {
321 let task_batch = self
322 .tasks
323 .read()
324 .unwrap()
325 .keys()
326 .cloned()
327 .collect::<Vec<_>>();
328 self.wait_on_batch(&task_batch)
329 }
330
331 ///
332 /// See [wait_on_batch_async](Self::wait_on_batch_async)
333 ///
334 /// Duplicated here because we can't request the tokio runtime to do a quick exec for us if
335 /// this function happens to be called from the async context.
336 ///
337 pub fn wait_on_batch(&mut self, tasks: &TaskBatch) -> TaskResults<R> {
338 let mut results = TaskResults::<R>::default();
339 for task in tasks {
340 results.push(self.wait_on(task));
341 }
342 results
343 }
344
345 ///
346 /// See [wait_on_async](Self::wait_on_async)
347 ///
348 /// Duplicated here because we can't request the tokio runtime to do a quick exec for us if
349 /// this function happens to be called from the async context.
350 ///
351 pub fn wait_on(&mut self, task_id: &TaskID) -> TaskResult<R> {
352 let task = match self.tasks.write().unwrap().remove(task_id) {
353 Some(task) => task.clone(),
354 None => return Err(rumtk_format!("No task with id {}", task_id)),
355 };
356
357 while !task.read().unwrap().finished {
358 sleep(DEFAULT_SLEEP_DURATION);
359 }
360
361 let x = Ok(Arc::new(task.write().unwrap().clone()));
362 x
363 }
364
365 ///
366 /// This method waits until a queued task with [TaskID](TaskID) has been processed from the main queue.
367 ///
368 /// We poll the status of the task every [DEFAULT_SLEEP_DURATION](DEFAULT_SLEEP_DURATION) ms.
369 ///
370 /// Upon completion,
371 ///
372 /// 2. Return the result ([TaskResults<R>](TaskResults)).
373 ///
374 /// This operation consumes the task.
375 ///
376 /// ### Note:
377 /// ```text
378 /// Results returned here are not guaranteed to be in the same order as the order in which
379 /// the tasks were queued for work. You will need to pass a type as T that automatically
380 /// tracks its own id or has a way for you to resort results.
381 /// ```
382 pub async fn wait_on_async(&mut self, task_id: &TaskID) -> TaskResult<R> {
383 let task = match self.tasks.write().unwrap().remove(task_id) {
384 Some(task) => task.clone(),
385 None => return Err(rumtk_format!("No task with id {}", task_id)),
386 };
387
388 while !task.read().unwrap().finished {
389 async_sleep(DEFAULT_SLEEP_DURATION).await;
390 }
391
392 let x = Ok(Arc::new(task.write().unwrap().clone()));
393 x
394 }
395
396 ///
397 /// This method waits until a set of queued tasks with [TaskID](TaskID) has been processed from the main queue.
398 ///
399 /// We poll the status of the task every [DEFAULT_SLEEP_DURATION](DEFAULT_SLEEP_DURATION) ms.
400 ///
401 /// Upon completion,
402 ///
403 /// 1. We collect the results generated (if any).
404 /// 2. Return the list of results ([TaskResults<R>](TaskResults)).
405 ///
406 /// ### Note:
407 /// ```text
408 /// Results returned here are not guaranteed to be in the same order as the order in which
409 /// the tasks were queued for work. You will need to pass a type as T that automatically
410 /// tracks its own id or has a way for you to resort results.
411 /// ```
412 pub async fn wait_on_batch_async(&mut self, tasks: &TaskBatch) -> TaskResults<R> {
413 let mut results = TaskResults::<R>::default();
414 for task in tasks {
415 results.push(self.wait_on_async(task).await);
416 }
417 results
418 }
419
420 ///
421 /// This method waits until all queued tasks have been processed from the main queue.
422 ///
423 /// We poll the status of the main queue every [DEFAULT_SLEEP_DURATION](DEFAULT_SLEEP_DURATION) ms.
424 ///
425 /// Upon completion,
426 ///
427 /// 1. We collect the results generated (if any).
428 /// 2. We reset the main task and result internal queue states.
429 /// 3. Return the list of results ([TaskResults<R>](TaskResults)).
430 ///
431 /// This operation consumes all the tasks.
432 ///
433 /// ### Note:
434 /// ```text
435 /// Results returned here are not guaranteed to be in the same order as the order in which
436 /// the tasks were queued for work. You will need to pass a type as T that automatically
437 /// tracks its own id or has a way for you to resort results.
438 /// ```
439 pub async fn wait_async(&mut self) -> TaskResults<R> {
440 let task_batch = self
441 .tasks
442 .read()
443 .unwrap()
444 .keys()
445 .cloned()
446 .collect::<Vec<_>>();
447 self.wait_on_batch_async(&task_batch).await
448 }
449
450 ///
451 /// Check if all work has been completed from the task queue.
452 ///
453 /// ## Examples
454 ///
455 /// ### Sync Usage
456 ///
457 ///```
458 /// use rumtk_core::threading::threading_manager::TaskManager;
459 ///
460 /// let manager = TaskManager::<usize>::new(&4).unwrap();
461 ///
462 /// let all_done = manager.is_all_completed();
463 ///
464 /// assert_eq!(all_done, true, "Empty TaskManager reports tasks are not completed!");
465 ///
466 /// ```
467 ///
468 pub fn is_all_completed(&self) -> bool {
469 self._is_all_completed_async()
470 }
471
472 pub async fn is_all_completed_async(&self) -> bool {
473 self._is_all_completed_async()
474 }
475
476 fn _is_all_completed_async(&self) -> bool {
477 for (_, task) in self.tasks.read().unwrap().iter() {
478 if !task.read().unwrap().finished {
479 return false;
480 }
481 }
482
483 true
484 }
485
486 ///
487 /// Check if a task completed
488 ///
489 pub fn is_finished(&self, id: &TaskID) -> bool {
490 match self.tasks.read().unwrap().get(id) {
491 Some(t) => t.read().unwrap().finished,
492 None => false,
493 }
494 }
495
496 pub async fn is_finished_async(&self, id: &TaskID) -> bool {
497 match self.tasks.read().unwrap().get(id) {
498 Some(task) => task.read().unwrap().finished,
499 None => true,
500 }
501 }
502
503 ///
504 /// Alias for [wait](TaskManager::wait).
505 ///
506 fn gather(&mut self) -> TaskResults<R> {
507 self.wait()
508 }
509 }
510}
511
512///
/// This module contains a few helpers.
514///
515/// For example, you can find a function for determining number of threads available in system.
516/// The sleep family of functions are also here.
517///
518pub mod threading_functions {
519 use crate::core::RUMResult;
520 use crate::rumtk_sleep;
521 use crate::strings::rumtk_format;
522 pub use crate::threading::thread_primitives::init_runtime;
523 use num_cpus;
524 use std::future::Future;
525 use std::thread::{available_parallelism, sleep as std_sleep};
526 use std::time::Duration;
527 use tokio::time::sleep as tokio_sleep;
528
/// Number of nanoseconds in one second.
pub const NANOS_PER_SEC: u64 = 1000000000;
/// Number of milliseconds in one second.
pub const MILLIS_PER_SEC: u64 = 1000;
/// Number of microseconds in one second.
pub const MICROS_PER_SEC: u64 = 1000000;

/// Interval, in seconds, between completion checks while blocking on a task.
const DEFAULT_SLEEP_DURATION: f32 = 0.001;
534
535 pub fn get_default_system_thread_count() -> usize {
536 let cpus: usize = num_cpus::get();
537 let parallelism = match available_parallelism() {
538 Ok(n) => n.get(),
539 Err(_) => 0,
540 };
541
542 if parallelism >= cpus {
543 parallelism
544 } else {
545 cpus
546 }
547 }
548
549 pub fn sleep(s: f32) {
550 let ns = s * NANOS_PER_SEC as f32;
551 let rounded_ns = ns.round() as u64;
552 let duration = Duration::from_nanos(rounded_ns);
553 std_sleep(duration);
554 }
555
556 pub async fn async_sleep(s: f32) {
557 let ns = s * NANOS_PER_SEC as f32;
558 let rounded_ns = ns.round() as u64;
559 let duration = Duration::from_nanos(rounded_ns);
560 tokio_sleep(duration).await;
561 }
562
563 ///
564 /// Given a closure task, push it onto the current `tokio` runtime for execution.
565 /// Every [DEFAULT_SLEEP_DURATION] seconds, we check if the task has concluded.
566 /// Once the task has concluded, we call [tokio::block_on](tokio::task::block_in_place) to resolve and extract the task
567 /// result.
568 ///
569 /// Because this helper function can fail, the return value is wrapped inside a [RUMResult].
570 ///
571 /// ## Example
572 ///
573 /// ```
574 /// use rumtk_core::threading::threading_functions::{init_runtime, block_on_task};
575 ///
576 /// const Hello: &str = "World!";
577 ///
578 /// init_runtime(5);
579 ///
580 /// let result = block_on_task(async {
581 /// Hello
582 /// }).unwrap();
583 ///
584 /// assert_eq!(Hello, result, "Result mismatches expected! {} vs. {}", Hello, result);
585 /// ```
586 ///
587 pub fn block_on_task<R, F>(task: F) -> RUMResult<R>
588 where
589 F: Future<Output = R> + Send + 'static,
590 F::Output: Send + 'static,
591 {
592 let default_system_thread_count = get_default_system_thread_count();
593
594 let handle = init_runtime(default_system_thread_count)
595 .expect("Failed to initialize runtime")
596 .spawn(task);
597
598 while !handle.is_finished() {
599 rumtk_sleep!(DEFAULT_SLEEP_DURATION);
600 }
601
602 tokio::task::block_in_place(move || {
603 match init_runtime(default_system_thread_count)
604 .expect("Failed to initialize runtime")
605 .block_on(handle)
606 {
607 Ok(r) => Ok(r),
608 Err(e) => Err(rumtk_format!(
609 "Issue peeking into task in the runtime because => {}",
610 &e
611 )),
612 }
613 })
614 }
615}
616
617///
618/// Main API for interacting with the threading back end. Remember, we use tokio as our executor.
619/// This means that by default, all jobs sent to the thread pool have to be async in nature.
620/// These macros make handling of these jobs at the sync/async boundary more convenient.
621///
622pub mod threading_macros {
623 use crate::threading::thread_primitives;
624 use crate::threading::threading_manager::SafeTaskArgs;
625
626 ///
627 /// First, let's make sure we have *tokio* initialized at least once. The runtime created here
628 /// will be saved to the global context so the next call to this macro will simply grab a
629 /// reference to the previously initialized runtime.
630 ///
631 /// Passing nothing will default to initializing a runtime using the default number of threads
632 /// for this system. This is typically equivalent to number of cores/threads for your CPU.
633 ///
634 /// Passing `threads` number will yield a runtime that allocates that many threads.
635 ///
636 ///
637 /// ## Examples
638 ///
639 /// ```
640 /// use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_create_task_args, rumtk_create_task, rumtk_spawn_task};
641 /// use rumtk_core::core::RUMResult;
642 /// use rumtk_core::threading::threading_manager::SafeTaskArgs;
643 ///
644 /// async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
645 /// let mut result = Vec::<i32>::new();
646 /// for arg in args.read().await.iter() {
647 /// result.push(*arg);
648 /// }
649 /// Ok(result)
650 /// }
651 ///
652 /// let args = rumtk_create_task_args!(1); // Creates a vector of i32s
653 /// let task = rumtk_create_task!(test, args); // Creates a standard task which consists of a function or closure accepting a Vec<T>
654 /// let result = rumtk_resolve_task!(rumtk_spawn_task!(task)); // Spawn's task and waits for it to conclude.
655 /// ```
656 ///
657 /// ```
658 /// use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_create_task_args, rumtk_create_task, rumtk_spawn_task};
659 /// use rumtk_core::core::RUMResult;
660 /// use rumtk_core::threading::threading_manager::SafeTaskArgs;
661 ///
662 /// async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
663 /// let mut result = Vec::<i32>::new();
664 /// for arg in args.read().await.iter() {
665 /// result.push(*arg);
666 /// }
667 /// Ok(result)
668 /// }
669 ///
670 /// let thread_count: usize = 10;
671 /// let args = rumtk_create_task_args!(1);
672 /// let task = rumtk_create_task!(test, args);
673 /// let result = rumtk_resolve_task!(rumtk_spawn_task!(task));
674 /// ```
#[macro_export]
macro_rules! rumtk_init_threads {
    // No argument: size the runtime from the system's default thread count.
    ( ) => {{
        use $crate::threading::thread_primitives::init_runtime;
        use $crate::threading::threading_functions::get_default_system_thread_count;
        init_runtime(get_default_system_thread_count())
    }};
    // Explicit worker count requested by the caller.
    ( $threads:expr ) => {{
        use $crate::threading::thread_primitives::init_runtime;
        init_runtime($threads)
    }};
}
688
///
/// Puts task onto the runtime queue.
///
/// Two forms are accepted:
///
/// * `rumtk_spawn_task!(func)` spawns the future `func` on the shared runtime, which is
///   initialized with the default thread count if needed.
/// * `rumtk_spawn_task!(rt, func)` spawns the future `func` on the runtime (or runtime guard)
///   `rt` supplied by the caller.
///
/// The return is the value produced by the runtime's `spawn` call, i.e. a tokio `JoinHandle`
/// for the task.
///
/// ## Panics
///
/// The single-argument form panics if the shared runtime cannot be obtained.
///
#[macro_export]
macro_rules! rumtk_spawn_task {
    ( $func:expr ) => {{
        // Build the future before locking the runtime so that the expression itself may use
        // the runtime without contending for the lock held here.
        let task = $func;
        let rt = $crate::rumtk_init_threads!().expect("Runtime is not initialized!");
        rt.spawn(task)
    }};
    ( $rt:expr, $func:expr ) => {{
        $rt.spawn($func)
    }};
}
707
///
/// Using the initialized runtime, wait for the future to resolve in a thread blocking manner!
///
/// The first argument is an async closure or function (`func`). When it is the only argument,
/// it is called without arguments and the resulting future is awaited.
///
/// Any further arguments (`arg_items`) are passed to the call on the async closure, and the
/// resulting future is awaited.
///
/// The result is whatever `block_on_task` returns for that future.
///
#[macro_export]
macro_rules! rumtk_wait_on_task {
    ( $func:expr $(, $arg_items:expr)* ) => {{
        use $crate::threading::threading_functions::block_on_task;
        block_on_task(async move {
            $func($($arg_items),*).await
        })
    }};
}
732
733 ///
734 /// This macro awaits a future.
735 ///
/// The only argument is the future to resolve; the shared runtime is obtained internally.
737 ///
738 /// If there is a result, you will get the result of the future.
739 ///
740 /// ## Examples
741 ///
742 /// ```
743 /// use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_create_task_args, rumtk_create_task, rumtk_spawn_task};
744 /// use rumtk_core::core::RUMResult;
745 /// use rumtk_core::threading::threading_manager::SafeTaskArgs;
746 ///
747 /// async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
748 /// let mut result = Vec::<i32>::new();
749 /// for arg in args.read().await.iter() {
750 /// result.push(*arg);
751 /// }
752 /// Ok(result)
753 /// }
754 ///
755 /// let args = rumtk_create_task_args!(1);
756 /// let task = rumtk_create_task!(test, args);
757 /// let result = rumtk_resolve_task!(rumtk_spawn_task!(task));
758 /// ```
759 ///
#[macro_export]
macro_rules! rumtk_resolve_task {
    ( $future:expr ) => {{
        use $crate::threading::threading_functions::block_on_task;
        // Background: an expression such as
        // rumtk_resolve_task!(rumtk_spawn_task!(task)) expands to
        // async move { { rt.spawn(task) }.await }, so the whole `$future` expression
        // (including anything it creates, e.g. a runtime guard) lives inside the async block
        // and is only evaluated once that block is polled on the runtime.
        //
        // NOTE(review): an earlier note here demanded that `$future` be bound to a local
        // BEFORE the async block (`let future = $future;`) so that nested macro expressions
        // resolve on the calling thread first. That binding is currently disabled (see the
        // commented-out line below), so `$future` is expanded inside the async block.
        // Confirm which behavior is intended before re-enabling or deleting the line.
        //let future = $future;
        block_on_task(async move { $future.await })
    }};
}
776
///
/// Hands a future to the given runtime from code that is already running in an async context.
///
/// The arguments are the runtime (or runtime guard) `rt` and the future to run. The macro
/// evaluates to the handle returned by the runtime's `spawn` call; await that handle to obtain
/// the result of the future.
///
/// NOTE(review): the previous body could not be expanded (it referenced `future` without the
/// `$` sigil, ended in an unterminated `let`, and passed a future to `spawn_blocking`).
/// Spawning the future and returning its handle is the assumed intent; confirm.
///
#[macro_export]
macro_rules! rumtk_resolve_task_from_async {
    ( $rt:expr, $future:expr ) => {{
        let future = $future;
        $rt.spawn(async move { future.await })
    }};
}
783
784 ///
785 /// This macro creates an async body that calls the async closure and awaits it.
786 ///
787 /// ## Example
788 ///
789 /// ```
790 /// use std::sync::{Arc, RwLock};
791 /// use tokio::sync::RwLock as AsyncRwLock;
792 /// use rumtk_core::strings::RUMString;
793 /// use rumtk_core::threading::threading_manager::{SafeTaskArgs, TaskItems};
794 ///
795 /// pub type SafeTaskArgs2<T> = Arc<RwLock<TaskItems<T>>>;
796 /// let expected = vec![
797 /// RUMString::from("Hello"),
798 /// RUMString::from("World!"),
799 /// RUMString::from("Overcast"),
800 /// RUMString::from("and"),
801 /// RUMString::from("Sad"),
802 /// ];
803 /// let locked_args = AsyncRwLock::new(expected.clone());
804 /// let task_args = SafeTaskArgs::<RUMString>::new(locked_args);
805 ///
806 ///
807 /// ```
808 ///
#[macro_export]
macro_rules! rumtk_create_task {
    // No arguments: the callable is evaluated inside the async block, i.e. only once the
    // resulting future is polled, and is then called without arguments.
    ( $func:expr ) => {{
        async move {
            let f = $func;
            f().await
        }
    }};
    // With arguments: the callable is evaluated right away, while `$args` is moved into the
    // async block and passed to the callable by reference when the future is polled.
    ( $func:expr, $args:expr ) => {{
        let f = $func;
        async move { f(&$args).await }
    }};
}
822
///
/// Creates an instance of [SafeTaskArgs](SafeTaskArgs) with the arguments passed.
///
/// Calling the macro without arguments yields an empty argument list.
///
/// ## Note
///
/// All arguments must be of the same type
///
#[macro_export]
macro_rules! rumtk_create_task_args {
    ( $($args:expr),* ) => {{
        use $crate::threading::thread_primitives::AsyncRwLock;
        use $crate::threading::threading_manager::SafeTaskArgs;
        SafeTaskArgs::new(AsyncRwLock::new(vec![$($args),*]))
    }};
}
843
844 ///
845 /// Convenience macro for packaging the task components and launching the task in one line.
846 ///
847 /// One of the advantages is that you can generate a new `tokio` runtime by specifying the
848 /// number of threads at the end. This is optional. Meaning, we will default to the system's
849 /// number of threads if that value is not specified.
850 ///
851 /// Between the `func` parameter and the optional `threads` parameter, you can specify a
852 /// variable number of arguments to pass to the task. each argument must be of the same type.
853 /// If you wish to pass different arguments with different types, please define an abstract type
854 /// whose underlying structure is a tuple of items and pass that instead.
855 ///
856 /// ## Examples
857 ///
858 /// ### With Default Thread Count
859 /// ```
860 /// use rumtk_core::{rumtk_exec_task};
861 /// use rumtk_core::core::RUMResult;
862 /// use rumtk_core::threading::threading_manager::SafeTaskArgs;
863 ///
864 /// async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
865 /// let mut result = Vec::<i32>::new();
866 /// for arg in args.read().await.iter() {
867 /// result.push(*arg);
868 /// }
869 /// Ok(result)
870 /// }
871 ///
872 /// let result = rumtk_exec_task!(test, vec![5]).unwrap();
873 /// assert_eq!(&result.clone().unwrap(), &vec![5], "Results mismatch");
874 /// assert_ne!(&result.clone().unwrap(), &vec![5, 10], "Results do not mismatch as expected!");
875 /// ```
876 ///
877 /// ### With Custom Thread Count
878 /// ```
879 /// use rumtk_core::{rumtk_exec_task};
880 /// use rumtk_core::core::RUMResult;
881 /// use rumtk_core::threading::threading_manager::SafeTaskArgs;
882 ///
883 /// async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
884 /// let mut result = Vec::<i32>::new();
885 /// for arg in args.read().await.iter() {
886 /// result.push(*arg);
887 /// }
888 /// Ok(result)
889 /// }
890 ///
891 /// let result = rumtk_exec_task!(test, vec![5], 5).unwrap();
892 /// assert_eq!(&result.clone().unwrap(), &vec![5], "Results mismatch");
893 /// assert_ne!(&result.clone().unwrap(), &vec![5, 10], "Results do not mismatch as expected!");
894 /// ```
895 ///
896 /// ### With Async Function Body
897 /// ```
898 /// use rumtk_core::{rumtk_exec_task};
899 /// use rumtk_core::core::RUMResult;
900 /// use rumtk_core::threading::threading_manager::SafeTaskArgs;
901 ///
902 /// let result = rumtk_exec_task!(
903 /// async move |args: &SafeTaskArgs<i32>| -> RUMResult<Vec<i32>> {
904 /// let mut result = Vec::<i32>::new();
905 /// for arg in args.read().await.iter() {
906 /// result.push(*arg);
907 /// }
908 /// Ok(result)
909 /// },
910 /// vec![5]).unwrap();
911 /// assert_eq!(&result.clone().unwrap(), &vec![5], "Results mismatch");
912 /// assert_ne!(&result.clone().unwrap(), &vec![5, 10], "Results do not mismatch as expected!");
913 /// ```
914 ///
915 /// ### With Async Function Body and No Args
916 /// ```
917 /// use rumtk_core::{rumtk_exec_task};
918 /// use rumtk_core::core::RUMResult;
919 /// use rumtk_core::threading::threading_manager::SafeTaskArgs;
920 ///
921 /// let result = rumtk_exec_task!(
922 /// async || -> RUMResult<Vec<i32>> {
923 /// let mut result = Vec::<i32>::new();
924 /// Ok(result)
925 /// }).unwrap();
926 /// let empty = Vec::<i32>::new();
927 /// assert_eq!(&result.clone().unwrap(), &empty, "Results mismatch");
928 /// assert_ne!(&result.clone().unwrap(), &vec![5, 10], "Results do not mismatch as expected!");
929 /// ```
930 ///
931 /// ## Equivalent To
932 ///
933 /// ```
934 /// use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_create_task_args, rumtk_create_task, rumtk_spawn_task};
935 /// use rumtk_core::core::RUMResult;
936 /// use rumtk_core::threading::threading_manager::SafeTaskArgs;
937 ///
938 /// async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
939 /// let mut result = Vec::<i32>::new();
940 /// for arg in args.read().await.iter() {
941 /// result.push(*arg);
942 /// }
943 /// Ok(result)
944 /// }
945 ///
946 /// let args = rumtk_create_task_args!(1);
947 /// let task = rumtk_create_task!(test, args);
948 /// let result = rumtk_resolve_task!(rumtk_spawn_task!(task));
949 /// ```
950 ///
#[macro_export]
macro_rules! rumtk_exec_task {
    // Callable without arguments.
    ($func:expr ) => {{
        let task = $crate::rumtk_create_task!($func);
        $crate::rumtk_resolve_task!(task)
    }};
    // Callable plus arguments: fall back to the system's default thread count.
    ($func:expr, $args:expr ) => {{
        use $crate::threading::threading_functions::get_default_system_thread_count;
        $crate::rumtk_exec_task!($func, $args, get_default_system_thread_count())
    }};
    // Callable, arguments and an explicit thread count.
    ($func:expr, $args:expr , $threads:expr ) => {{
        // Make sure the shared runtime exists with the requested worker count before the
        // task is resolved. The count only takes effect if the runtime has not been created
        // yet. The guard is released right away; any failure is reported again when the task
        // is resolved below.
        let _ = $crate::threading::thread_primitives::init_runtime($threads as usize);
        // Fully qualified paths keep the macro usable without extra imports at the call site.
        let args = $crate::threading::threading_manager::SafeTaskArgs::new(
            $crate::threading::thread_primitives::AsyncRwLock::new($args),
        );
        let task = $crate::rumtk_create_task!($func, args);
        $crate::rumtk_resolve_task!(task)
    }};
}
973 }
974
975 ///
/// Sleep for a duration of time in a sync context; the result cannot be awaited.
977 ///
978 /// You can pass any value that can be cast to f32.
979 ///
980 /// The precision is up to nanoseconds and it is depicted by the number of decimal places.
981 ///
982 /// ## Examples
983 ///
984 /// ```
985 /// use rumtk_core::rumtk_sleep;
986 /// rumtk_sleep!(1); // Sleeps for 1 second.
987 /// rumtk_sleep!(0.001); // Sleeps for 1 millisecond
988 /// rumtk_sleep!(0.000001); // Sleeps for 1 microsecond
989 /// rumtk_sleep!(0.000000001); // Sleeps for 1 nanosecond
990 /// ```
991 ///
#[macro_export]
macro_rules! rumtk_sleep {
    // The duration is cast to `f32` seconds and handed to the blocking `sleep` helper.
    ( $dur:expr) => {{
        $crate::threading::threading_functions::sleep($dur as f32)
    }};
}
999
1000 ///
1001 /// Sleep for some duration of time in an async context. Meaning, we can be awaited.
1002 ///
1003 /// You can pass any value that can be cast to f32.
1004 ///
1005 /// The precision is up to nanoseconds and it is depicted by the number of decimal places.
1006 ///
1007 /// ## Examples
1008 ///
1009 /// ```
1010 /// use rumtk_core::{rumtk_async_sleep, rumtk_exec_task};
1011 /// use rumtk_core::core::RUMResult;
1012 /// rumtk_exec_task!( async || -> RUMResult<()> {
1013 /// rumtk_async_sleep!(1).await; // Sleeps for 1 second.
1014 /// rumtk_async_sleep!(0.001).await; // Sleeps for 1 millisecond
1015 /// rumtk_async_sleep!(0.000001).await; // Sleeps for 1 microsecond
1016 /// rumtk_async_sleep!(0.000000001).await; // Sleeps for 1 nanosecond
1017 /// Ok(())
1018 /// }
1019 /// );
1020 /// ```
1021 ///
#[macro_export]
macro_rules! rumtk_async_sleep {
    // The duration is cast to `f32` seconds; the macro evaluates to the future returned by
    // `async_sleep`, which the caller must await.
    ( $dur:expr) => {{
        $crate::threading::threading_functions::async_sleep($dur as f32)
    }};
}
1029
///
/// Creates a new `TaskManager` through `TaskManager::new`.
///
/// `worker_num` is forwarded unchanged, so it must be a reference to the worker count
/// (`&usize`). The macro evaluates to the `RUMResult` returned by `TaskManager::new`; the
/// result type handled by the manager is inferred from how the value is used.
///
#[macro_export]
macro_rules! rumtk_new_task_queue {
    ( $worker_num:expr ) => {{
        use $crate::threading::threading_manager::TaskManager;
        // No trailing semicolon: the block must evaluate to the manager, not to `()`.
        TaskManager::new($worker_num)
    }};
}
1040}