rumtk_core/threading.rs
1/*
2 * rumtk attempts to implement HL7 and medical protocols for interoperability in medicine.
3 * This toolkit aims to be reliable, simple, performant, and standards compliant.
4 * Copyright (C) 2025 Luis M. Santos, M.D.
5 * Copyright (C) 2025 MedicalMasses L.L.C.
6 *
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20 */
21
22///
23/// This module provides all the primitives needed to build a multithreaded application.
24///
25pub mod thread_primitives {
26 use crate::core::RUMResult;
27 use crate::strings::rumtk_format;
28 pub use std::sync::Mutex as SyncMutex;
29 pub use std::sync::MutexGuard as SyncMutexGuard;
30 use std::sync::OnceLock;
31 pub use std::sync::RwLock as SyncRwLock;
32 pub use tokio::io;
33 pub use tokio::io::{AsyncReadExt, AsyncWriteExt};
34 use tokio::runtime::Runtime as TokioRuntime;
35 pub use tokio::sync::{
36 Mutex as AsyncMutex, MutexGuard as AsyncMutexGuard, RwLock as AsyncRwLock,
37 RwLockReadGuard as AsyncRwLockReadGuard, RwLockWriteGuard as AsyncRwLockWriteGuard,
38 };
39
40 pub type RuntimeGuard<'a> = SyncMutexGuard<'a, TokioRuntime>;
41 /**************************** Types ***************************************/
42 pub type SafeTokioRuntime = OnceLock<SyncMutex<TokioRuntime>>;
43 /**************************** Globals **************************************/
44 static mut DEFAULT_RUNTIME: SafeTokioRuntime = SafeTokioRuntime::new();
45 /**************************** Helpers ***************************************/
46 pub fn init_runtime(workers: usize) -> RUMResult<RuntimeGuard<'static>> {
47 unsafe {
48 let handle = DEFAULT_RUNTIME.get_or_init(|| {
49 let mut builder = tokio::runtime::Builder::new_multi_thread();
50 builder.worker_threads(workers);
51 builder.enable_all();
52 match builder.build() {
53 Ok(handle) => SyncMutex::new(handle),
54 Err(e) => panic!(
55 "Unable to initialize threading tokio runtime because {}!",
56 &e
57 ),
58 }
59 });
60 match handle.lock() {
61 Ok(guard) => Ok(guard),
62 Err(e) => Err(rumtk_format!("Unable to lock tokio runtime!")),
63 }
64 }
65 }
66}
67
68pub mod threading_manager {
69 use crate::core::{RUMResult, RUMVec};
70 use crate::strings::rumtk_format;
71 use crate::threading::threading_functions::{async_sleep, sleep};
72 use crate::types::{RUMHashMap, RUMID};
73 use crate::{rumtk_init_threads, rumtk_resolve_task, rumtk_spawn_task, threading};
74 use std::future::Future;
75 use std::sync::Arc;
76 pub use std::sync::RwLock as SyncRwLock;
77 use tokio::sync::RwLock as AsyncRwLock;
78 use tokio::task::JoinHandle;
79
80 const DEFAULT_SLEEP_DURATION: f32 = 0.001f32;
81 const DEFAULT_TASK_CAPACITY: usize = 100;
82
83 pub type AsyncHandle<T> = JoinHandle<T>;
84 pub type TaskItems<T> = RUMVec<T>;
85 /// This type aliases a vector of T elements that will be used for passing arguments to the task processor.
86 pub type TaskArgs<T> = TaskItems<T>;
87 /// Function signature defining the interface of task processing logic.
88 pub type SafeTaskArgs<T> = Arc<AsyncRwLock<TaskItems<T>>>;
89 pub type AsyncTaskHandle<R> = AsyncHandle<TaskResult<R>>;
90 pub type AsyncTaskHandles<R> = Vec<AsyncTaskHandle<R>>;
91 //pub type TaskProcessor<T, R, Fut: Future<Output = TaskResult<R>>> = impl FnOnce(&SafeTaskArgs<T>) -> Fut;
92 pub type TaskID = RUMID;
93
94 #[derive(Debug, Clone, Default)]
95 pub struct Task<R> {
96 pub id: TaskID,
97 pub finished: bool,
98 pub result: Option<R>,
99 }
100
101 pub type SafeTask<R> = Arc<Task<R>>;
102 type SafeInternalTask<R> = Arc<SyncRwLock<Task<R>>>;
103 pub type TaskTable<R> = RUMHashMap<TaskID, SafeInternalTask<R>>;
104 pub type SafeAsyncTaskTable<R> = Arc<AsyncRwLock<TaskTable<R>>>;
105 pub type SafeSyncTaskTable<R> = Arc<SyncRwLock<TaskTable<R>>>;
106 pub type TaskBatch = RUMVec<TaskID>;
107 /// Type to use to define how task results are expected to be returned.
108 pub type TaskResult<R> = RUMResult<SafeTask<R>>;
109 pub type TaskResults<R> = TaskItems<TaskResult<R>>;
110
111 ///
112 /// Manages asynchronous tasks submitted as micro jobs from synchronous code. This type essentially
113 /// gives the multithreading, asynchronous superpowers to synchronous logic.
114 ///
115 /// ## Example Usage
116 ///
117 /// ```
118 /// use std::sync::{Arc};
119 /// use tokio::sync::RwLock as AsyncRwLock;
120 /// use rumtk_core::core::RUMResult;
121 /// use rumtk_core::strings::RUMString;
122 /// use rumtk_core::threading::threading_manager::{SafeTaskArgs, TaskItems, TaskManager};
123 /// use rumtk_core::{rumtk_create_task, };
124 ///
125 /// let expected = vec![
126 /// RUMString::from("Hello"),
127 /// RUMString::from("World!"),
128 /// RUMString::from("Overcast"),
129 /// RUMString::from("and"),
130 /// RUMString::from("Sad"),
131 /// ];
132 ///
133 /// type TestResult = RUMResult<Vec<RUMString>>;
134 /// let mut queue: TaskManager<TestResult> = TaskManager::new(&5).unwrap();
135 ///
136 /// let locked_args = AsyncRwLock::new(expected.clone());
137 /// let task_args = SafeTaskArgs::<RUMString>::new(locked_args);
138 /// let processor = rumtk_create_task!(
139 /// async |args: &SafeTaskArgs<RUMString>| -> TestResult {
140 /// let owned_args = Arc::clone(args);
141 /// let locked_args = owned_args.read().await;
142 /// let mut results = TaskItems::<RUMString>::with_capacity(locked_args.len());
143 ///
144 /// for arg in locked_args.iter() {
145 /// results.push(RUMString::new(arg));
146 /// }
147 ///
148 /// Ok(results)
149 /// },
150 /// task_args
151 /// );
152 ///
153 /// queue.add_task::<_>(processor);
154 /// let results = queue.wait();
155 ///
156 /// let mut result_data = Vec::<RUMString>::with_capacity(5);
157 /// for r in results {
158 /// for v in r.unwrap().result.clone().unwrap().iter() {
159 /// for value in v.iter() {
160 /// result_data.push(value.clone());
161 /// }
162 /// }
163 /// }
164 ///
165 /// assert_eq!(result_data, expected, "Results do not match expected!");
166 ///
167 /// ```
168 ///
169 #[derive(Debug, Clone, Default)]
170 pub struct TaskManager<R> {
171 tasks: SafeSyncTaskTable<R>,
172 workers: usize,
173 }
174
175 impl<R> TaskManager<R>
176 where
177 R: Sync + Send + Clone + 'static,
178 {
179 ///
180 /// This method creates a [`TaskManager`] instance using sensible defaults.
181 ///
182 /// The `threads` field is computed from the number of cores present in system.
183 ///
184 pub fn default() -> RUMResult<TaskManager<R>> {
185 Self::new(&threading::threading_functions::get_default_system_thread_count())
186 }
187
188 ///
189 /// Creates an instance of [`TaskManager<R>`](TaskManager<R>).
190 /// Expects you to provide the count of threads to spawn and the microtask queue size
191 /// allocated by each thread.
192 ///
193 /// This method calls [`TaskTable::with_capacity()`](TaskTable::with_capacity) for the actual object creation.
194 /// The main queue capacity is pre-allocated to [`DEFAULT_TASK_CAPACITY`](DEFAULT_TASK_CAPACITY).
195 ///
196 pub fn new(worker_num: &usize) -> RUMResult<TaskManager<R>> {
197 let tasks = SafeSyncTaskTable::<R>::new(SyncRwLock::new(TaskTable::with_capacity(
198 DEFAULT_TASK_CAPACITY,
199 )));
200 Ok(TaskManager::<R> {
201 tasks,
202 workers: worker_num.to_owned(),
203 })
204 }
205
206 ///
207 /// Add a task to the processing queue. The idea is that you can queue a processor function
208 /// and list of args that will be picked up by one of the threads for processing.
209 ///
210 /// This is the async counterpart
211 ///
212 pub async fn add_task_async<F>(&mut self, task: F) -> TaskID
213 where
214 F: Future<Output = R> + Send + Sync + 'static,
215 F::Output: Send + 'static,
216 {
217 let id = TaskID::new_v4();
218 Self::_add_task_async(id.clone(), self.tasks.clone(), task).await
219 }
220
221 ///
222 /// See [`Self::add_task_async`]
223 ///
224 /// Unlike `add_task`, this method does not block which is key to avoiding panicking
225 /// the tokio runtim if trying to add task to queue from a normal function called from an
226 /// async environment.
227 ///
228 /// ## Example
229 ///
230 /// ```
231 /// use rumtk_core::threading::threading_manager::{TaskManager};
232 /// use rumtk_core::{rumtk_init_threads, strings::rumtk_format};
233 /// use std::sync::{Arc};
234 /// use tokio::sync::Mutex;
235 ///
236 /// type JobManager = Arc<Mutex<TaskManager<usize>>>;
237 ///
238 /// async fn called_fn() -> usize {
239 /// 5
240 /// }
241 ///
242 /// fn push_job(manager: &mut TaskManager<usize>) -> usize {
243 /// manager.spawn_task(called_fn());
244 /// 1
245 /// }
246 ///
247 /// async fn call_sync_fn(mut manager: JobManager) -> usize {
248 /// let mut owned = manager.lock().await;
249 /// push_job(&mut owned)
250 /// }
251 ///
252 /// let workers = 5;
253 /// let mut manager = Arc::new(Mutex::new(TaskManager::new(&workers).unwrap()));
254 ///
255 /// manager.blocking_lock().spawn_task(call_sync_fn(manager.clone()));
256 ///
257 /// let result_raw = manager.blocking_lock().wait();
258 ///
259 /// ```
260 ///
261 pub fn spawn_task<F>(&mut self, task: F) -> RUMResult<TaskID>
262 where
263 F: Future<Output = R> + Send + Sync + 'static,
264 F::Output: Send + Sized + 'static,
265 {
266 let id = TaskID::new_v4();
267 let rt = rumtk_init_threads!(self.workers)?;
268 rumtk_spawn_task!(
269 rt,
270 Self::_add_task_async(id.clone(), self.tasks.clone(), task)
271 );
272 Ok(id)
273 }
274
275 ///
276 /// See [add_task_async](Self::add_task_async)
277 ///
278 pub fn add_task<F>(&mut self, task: F) -> RUMResult<TaskID>
279 where
280 F: Future<Output = R> + Send + Sync + 'static,
281 F::Output: Send + Sized + 'static,
282 {
283 let id = TaskID::new_v4();
284 let tasks = self.tasks.clone();
285 rumtk_resolve_task!(Self::_add_task_async(id.clone(), tasks, task))
286 }
287
288 async fn _add_task_async<F>(id: TaskID, tasks: SafeSyncTaskTable<R>, task: F) -> TaskID
289 where
290 F: Future<Output = R> + Send + Sync + 'static,
291 F::Output: Send + Sized + 'static,
292 {
293 let mut safe_task = Arc::new(SyncRwLock::new(Task::<R> {
294 id: id.clone(),
295 finished: false,
296 result: None,
297 }));
298 tasks.write().unwrap().insert(id.clone(), safe_task.clone());
299
300 let task_wrapper = async move || {
301 // Run the task
302 let result = task.await;
303
304 // Cleanup task
305 let mut lock = safe_task.write().unwrap();
306 lock.result = Some(result);
307 lock.finished = true;
308 };
309
310 tokio::spawn(task_wrapper());
311
312 id
313 }
314
315 ///
316 /// See [wait_async](Self::wait_async)
317 ///
318 /// Duplicated here because we can't request the tokio runtime to do a quick exec for us if
319 /// this function happens to be called from the async context.
320 ///
321 pub fn wait(&mut self) -> TaskResults<R> {
322 let task_batch = self
323 .tasks
324 .read()
325 .unwrap()
326 .keys()
327 .cloned()
328 .collect::<Vec<_>>();
329 self.wait_on_batch(&task_batch)
330 }
331
332 ///
333 /// See [wait_on_batch_async](Self::wait_on_batch_async)
334 ///
335 /// Duplicated here because we can't request the tokio runtime to do a quick exec for us if
336 /// this function happens to be called from the async context.
337 ///
338 pub fn wait_on_batch(&mut self, tasks: &TaskBatch) -> TaskResults<R> {
339 let mut results = TaskResults::<R>::default();
340 for task in tasks {
341 results.push(self.wait_on(task));
342 }
343 results
344 }
345
346 ///
347 /// See [wait_on_async](Self::wait_on_async)
348 ///
349 /// Duplicated here because we can't request the tokio runtime to do a quick exec for us if
350 /// this function happens to be called from the async context.
351 ///
352 pub fn wait_on(&mut self, task_id: &TaskID) -> TaskResult<R> {
353 let task = match self.tasks.write().unwrap().remove(task_id) {
354 Some(task) => task.clone(),
355 None => return Err(rumtk_format!("No task with id {}", task_id)),
356 };
357
358 while !task.read().unwrap().finished {
359 sleep(DEFAULT_SLEEP_DURATION);
360 }
361
362 let x = Ok(Arc::new(task.write().unwrap().clone()));
363 x
364 }
365
366 ///
367 /// This method waits until a queued task with [TaskID](TaskID) has been processed from the main queue.
368 ///
369 /// We poll the status of the task every [DEFAULT_SLEEP_DURATION](DEFAULT_SLEEP_DURATION) ms.
370 ///
371 /// Upon completion,
372 ///
373 /// 2. Return the result ([TaskResults<R>](TaskResults)).
374 ///
375 /// This operation consumes the task.
376 ///
377 /// ### Note:
378 /// ```text
379 /// Results returned here are not guaranteed to be in the same order as the order in which
380 /// the tasks were queued for work. You will need to pass a type as T that automatically
381 /// tracks its own id or has a way for you to resort results.
382 /// ```
383 pub async fn wait_on_async(&mut self, task_id: &TaskID) -> TaskResult<R> {
384 let task = match self.tasks.write().unwrap().remove(task_id) {
385 Some(task) => task.clone(),
386 None => return Err(rumtk_format!("No task with id {}", task_id)),
387 };
388
389 while !task.read().unwrap().finished {
390 async_sleep(DEFAULT_SLEEP_DURATION).await;
391 }
392
393 let x = Ok(Arc::new(task.write().unwrap().clone()));
394 x
395 }
396
397 ///
398 /// This method waits until a set of queued tasks with [TaskID](TaskID) has been processed from the main queue.
399 ///
400 /// We poll the status of the task every [DEFAULT_SLEEP_DURATION](DEFAULT_SLEEP_DURATION) ms.
401 ///
402 /// Upon completion,
403 ///
404 /// 1. We collect the results generated (if any).
405 /// 2. Return the list of results ([TaskResults<R>](TaskResults)).
406 ///
407 /// ### Note:
408 /// ```text
409 /// Results returned here are not guaranteed to be in the same order as the order in which
410 /// the tasks were queued for work. You will need to pass a type as T that automatically
411 /// tracks its own id or has a way for you to resort results.
412 /// ```
413 pub async fn wait_on_batch_async(&mut self, tasks: &TaskBatch) -> TaskResults<R> {
414 let mut results = TaskResults::<R>::default();
415 for task in tasks {
416 results.push(self.wait_on_async(task).await);
417 }
418 results
419 }
420
421 ///
422 /// This method waits until all queued tasks have been processed from the main queue.
423 ///
424 /// We poll the status of the main queue every [DEFAULT_SLEEP_DURATION](DEFAULT_SLEEP_DURATION) ms.
425 ///
426 /// Upon completion,
427 ///
428 /// 1. We collect the results generated (if any).
429 /// 2. We reset the main task and result internal queue states.
430 /// 3. Return the list of results ([TaskResults<R>](TaskResults)).
431 ///
432 /// This operation consumes all the tasks.
433 ///
434 /// ### Note:
435 /// ```text
436 /// Results returned here are not guaranteed to be in the same order as the order in which
437 /// the tasks were queued for work. You will need to pass a type as T that automatically
438 /// tracks its own id or has a way for you to resort results.
439 /// ```
440 pub async fn wait_async(&mut self) -> TaskResults<R> {
441 let task_batch = self
442 .tasks
443 .read()
444 .unwrap()
445 .keys()
446 .cloned()
447 .collect::<Vec<_>>();
448 self.wait_on_batch_async(&task_batch).await
449 }
450
451 ///
452 /// Check if all work has been completed from the task queue.
453 ///
454 /// ## Examples
455 ///
456 /// ### Sync Usage
457 ///
458 ///```
459 /// use rumtk_core::threading::threading_manager::TaskManager;
460 ///
461 /// let manager = TaskManager::<usize>::new(&4).unwrap();
462 ///
463 /// let all_done = manager.is_all_completed();
464 ///
465 /// assert_eq!(all_done, true, "Empty TaskManager reports tasks are not completed!");
466 ///
467 /// ```
468 ///
469 pub fn is_all_completed(&self) -> bool {
470 self._is_all_completed_async()
471 }
472
473 pub async fn is_all_completed_async(&self) -> bool {
474 self._is_all_completed_async()
475 }
476
477 fn _is_all_completed_async(&self) -> bool {
478 for (_, task) in self.tasks.read().unwrap().iter() {
479 if !task.read().unwrap().finished {
480 return false;
481 }
482 }
483
484 true
485 }
486
487 ///
488 /// Check if a task completed
489 ///
490 pub fn is_finished(&self, id: &TaskID) -> bool {
491 match self.tasks.read().unwrap().get(id) {
492 Some(t) => t.read().unwrap().finished,
493 None => false,
494 }
495 }
496
497 pub async fn is_finished_async(&self, id: &TaskID) -> bool {
498 match self.tasks.read().unwrap().get(id) {
499 Some(task) => task.read().unwrap().finished,
500 None => true,
501 }
502 }
503
504 ///
505 /// Alias for [wait](TaskManager::wait).
506 ///
507 fn gather(&mut self) -> TaskResults<R> {
508 self.wait()
509 }
510 }
511}
512
513///
514/// This module contains a few helper.
515///
516/// For example, you can find a function for determining number of threads available in system.
517/// The sleep family of functions are also here.
518///
519pub mod threading_functions {
520 use crate::core::RUMResult;
521 use crate::rumtk_sleep;
522 use crate::strings::rumtk_format;
523 pub use crate::threading::thread_primitives::init_runtime;
524 use num_cpus;
525 use std::future::Future;
526 use std::thread::{available_parallelism, sleep as std_sleep};
527 use std::time::Duration;
528 use tokio::time::sleep as tokio_sleep;
529
530 pub const NANOS_PER_SEC: u64 = 1000000000;
531 pub const MILLIS_PER_SEC: u64 = 1000;
532 pub const MICROS_PER_SEC: u64 = 1000000;
533
534 const DEFAULT_SLEEP_DURATION: f32 = 0.001;
535
536 pub fn get_default_system_thread_count() -> usize {
537 let cpus: usize = num_cpus::get();
538 let parallelism = match available_parallelism() {
539 Ok(n) => n.get(),
540 Err(_) => 0,
541 };
542
543 if parallelism >= cpus {
544 parallelism
545 } else {
546 cpus
547 }
548 }
549
550 pub fn sleep(s: f32) {
551 let ns = s * NANOS_PER_SEC as f32;
552 let rounded_ns = ns.round() as u64;
553 let duration = Duration::from_nanos(rounded_ns);
554 std_sleep(duration);
555 }
556
557 pub async fn async_sleep(s: f32) {
558 let ns = s * NANOS_PER_SEC as f32;
559 let rounded_ns = ns.round() as u64;
560 let duration = Duration::from_nanos(rounded_ns);
561 tokio_sleep(duration).await;
562 }
563
564 ///
565 /// Given a closure task, push it onto the current `tokio` runtime for execution.
566 /// Every [DEFAULT_SLEEP_DURATION] seconds, we check if the task has concluded.
567 /// Once the task has concluded, we call [tokio::block_on](tokio::task::block_in_place) to resolve and extract the task
568 /// result.
569 ///
570 /// Because this helper function can fail, the return value is wrapped inside a [RUMResult].
571 ///
572 /// ## Example
573 ///
574 /// ```
575 /// use rumtk_core::threading::threading_functions::{init_runtime, block_on_task};
576 ///
577 /// const Hello: &str = "World!";
578 ///
579 /// init_runtime(5);
580 ///
581 /// let result = block_on_task(async {
582 /// Hello
583 /// }).unwrap();
584 ///
585 /// assert_eq!(Hello, result, "Result mismatches expected! {} vs. {}", Hello, result);
586 /// ```
587 ///
588 pub fn block_on_task<R, F>(task: F) -> RUMResult<R>
589 where
590 F: Future<Output = R> + Send + 'static,
591 F::Output: Send + 'static,
592 {
593 let default_system_thread_count = get_default_system_thread_count();
594
595 let handle = init_runtime(default_system_thread_count)
596 .expect("Failed to initialize runtime")
597 .spawn(task);
598
599 while !handle.is_finished() {
600 rumtk_sleep!(DEFAULT_SLEEP_DURATION);
601 }
602
603 tokio::task::block_in_place(move || {
604 match init_runtime(default_system_thread_count)
605 .expect("Failed to initialize runtime")
606 .block_on(handle)
607 {
608 Ok(r) => Ok(r),
609 Err(e) => Err(rumtk_format!(
610 "Issue peeking into task in the runtime because => {}",
611 &e
612 )),
613 }
614 })
615 }
616}
617
618///
619/// Main API for interacting with the threading back end. Remember, we use tokio as our executor.
620/// This means that by default, all jobs sent to the thread pool have to be async in nature.
621/// These macros make handling of these jobs at the sync/async boundary more convenient.
622///
623pub mod threading_macros {
624 use crate::threading::thread_primitives;
625 use crate::threading::threading_manager::SafeTaskArgs;
626
627 ///
628 /// First, let's make sure we have *tokio* initialized at least once. The runtime created here
629 /// will be saved to the global context so the next call to this macro will simply grab a
630 /// reference to the previously initialized runtime.
631 ///
632 /// Passing nothing will default to initializing a runtime using the default number of threads
633 /// for this system. This is typically equivalent to number of cores/threads for your CPU.
634 ///
635 /// Passing `threads` number will yield a runtime that allocates that many threads.
636 ///
637 ///
638 /// ## Examples
639 ///
640 /// ```
641 /// use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_create_task_args, rumtk_create_task, rumtk_spawn_task};
642 /// use rumtk_core::core::RUMResult;
643 /// use rumtk_core::threading::threading_manager::SafeTaskArgs;
644 ///
645 /// async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
646 /// let mut result = Vec::<i32>::new();
647 /// for arg in args.read().await.iter() {
648 /// result.push(*arg);
649 /// }
650 /// Ok(result)
651 /// }
652 ///
653 /// let args = rumtk_create_task_args!(1); // Creates a vector of i32s
654 /// let task = rumtk_create_task!(test, args); // Creates a standard task which consists of a function or closure accepting a Vec<T>
655 /// let result = rumtk_resolve_task!(rumtk_spawn_task!(task)); // Spawn's task and waits for it to conclude.
656 /// ```
657 ///
658 /// ```
659 /// use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_create_task_args, rumtk_create_task, rumtk_spawn_task};
660 /// use rumtk_core::core::RUMResult;
661 /// use rumtk_core::threading::threading_manager::SafeTaskArgs;
662 ///
663 /// async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
664 /// let mut result = Vec::<i32>::new();
665 /// for arg in args.read().await.iter() {
666 /// result.push(*arg);
667 /// }
668 /// Ok(result)
669 /// }
670 ///
671 /// let thread_count: usize = 10;
672 /// let args = rumtk_create_task_args!(1);
673 /// let task = rumtk_create_task!(test, args);
674 /// let result = rumtk_resolve_task!(rumtk_spawn_task!(task));
675 /// ```
676 #[macro_export]
677 macro_rules! rumtk_init_threads {
678 ( ) => {{
679 use $crate::threading::thread_primitives::init_runtime;
680 use $crate::threading::threading_functions::get_default_system_thread_count;
681 init_runtime(get_default_system_thread_count())
682 }};
683 ( $threads:expr ) => {{
684 use $crate::rumtk_cache_fetch;
685 use $crate::threading::thread_primitives::init_runtime;
686 init_runtime($threads)
687 }};
688 }
689
690 ///
691 /// Puts task onto the runtime queue.
692 ///
693 /// The parameters to this macro are a reference to the runtime (`rt`) and a future (`func`).
694 ///
695 /// The return is a [thread_primitives::JoinHandle<T>] instance. If the task was a standard
696 /// framework task, you will get [thread_primitives::AsyncTaskHandle] instead.
697 ///
698 #[macro_export]
699 macro_rules! rumtk_spawn_task {
700 ( $func:expr ) => {{
701 let rt = rumtk_init_threads!().expect("Runtime is not initialized!");
702 rt.spawn($func)
703 }};
704 ( $rt:expr, $func:expr ) => {{
705 $rt.spawn($func)
706 }};
707 }
708
709 ///
710 /// Using the initialized runtime, wait for the future to resolve in a thread blocking manner!
711 ///
712 /// If you pass a reference to the runtime (`rt`) and an async closure (`func`), we await the
713 /// async closure without passing any arguments.
714 ///
715 /// You can pass a third argument to this macro in the form of any number of arguments (`arg_item`).
716 /// In such a case, we pass those arguments to the call on the async closure and await on results.
717 ///
718 #[macro_export]
719 macro_rules! rumtk_wait_on_task {
720 ( $func:expr ) => {{
721 use $crate::threading::threading_functions::block_on_task;
722 block_on_task(async move {
723 $func().await
724 })
725 }};
726 ( $func:expr, $($arg_items:expr),+ ) => {{
727 use $crate::threading::threading_functions::block_on_task;
728 block_on_task(async move {
729 $func($($arg_items),+).await
730 })
731 }};
732 }
733
734 ///
735 /// This macro awaits a future.
736 ///
737 /// The arguments are a reference to the runtime (`rt) and a future.
738 ///
739 /// If there is a result, you will get the result of the future.
740 ///
741 /// ## Examples
742 ///
743 /// ```
744 /// use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_create_task_args, rumtk_create_task, rumtk_spawn_task};
745 /// use rumtk_core::core::RUMResult;
746 /// use rumtk_core::threading::threading_manager::SafeTaskArgs;
747 ///
748 /// async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
749 /// let mut result = Vec::<i32>::new();
750 /// for arg in args.read().await.iter() {
751 /// result.push(*arg);
752 /// }
753 /// Ok(result)
754 /// }
755 ///
756 /// let args = rumtk_create_task_args!(1);
757 /// let task = rumtk_create_task!(test, args);
758 /// let result = rumtk_resolve_task!(rumtk_spawn_task!(task));
759 /// ```
760 ///
761 #[macro_export]
762 macro_rules! rumtk_resolve_task {
763 ( $future:expr ) => {{
764 use $crate::threading::threading_functions::block_on_task;
765 // Fun tidbit, the expression rumtk_resolve_task!(&rt, rumtk_spawn_task!(&rt, task)), where
766 // rt is the tokio runtime yields async move { { &rt.spawn(task) } }. However, the whole thing
767 // is technically moved into the async closure and captured so things like mutex guards
768 // technically go out of the outer scope. As a result that expression fails to compile even
769 // though the intent is for rumtk_spawn_task to resolve first and its result get moved
770 // into the async closure. To ensure that happens regardless of given expression, we do
771 // a variable assignment below to force the "future" macro expressions to resolve before
772 // moving into the closure. DO NOT REMOVE OR "SIMPLIFY" THE let future = $future LINE!!!
773 //let future = $future;
774 block_on_task(async move { $future.await })
775 }};
776 }
777
778 #[macro_export]
779 macro_rules! rumtk_resolve_task_from_async {
780 ( $rt:expr, $future:expr ) => {{
781 let handle = $rt.spawn_blocking(async move { future.await })
782 }};
783 }
784
785 ///
786 /// This macro creates an async body that calls the async closure and awaits it.
787 ///
788 /// ## Example
789 ///
790 /// ```
791 /// use std::sync::{Arc, RwLock};
792 /// use tokio::sync::RwLock as AsyncRwLock;
793 /// use rumtk_core::strings::RUMString;
794 /// use rumtk_core::threading::threading_manager::{SafeTaskArgs, TaskItems};
795 ///
796 /// pub type SafeTaskArgs2<T> = Arc<RwLock<TaskItems<T>>>;
797 /// let expected = vec![
798 /// RUMString::from("Hello"),
799 /// RUMString::from("World!"),
800 /// RUMString::from("Overcast"),
801 /// RUMString::from("and"),
802 /// RUMString::from("Sad"),
803 /// ];
804 /// let locked_args = AsyncRwLock::new(expected.clone());
805 /// let task_args = SafeTaskArgs::<RUMString>::new(locked_args);
806 ///
807 ///
808 /// ```
809 ///
810 #[macro_export]
811 macro_rules! rumtk_create_task {
812 ( $func:expr ) => {{
813 async move {
814 let f = $func;
815 f().await
816 }
817 }};
818 ( $func:expr, $args:expr ) => {{
819 let f = $func;
820 async move { f(&$args).await }
821 }};
822 }
823
824 ///
825 /// Creates an instance of [SafeTaskArgs](SafeTaskArgs) with the arguments passed.
826 ///
827 /// ## Note
828 ///
829 /// All arguments must be of the same type
830 ///
831 #[macro_export]
832 macro_rules! rumtk_create_task_args {
833 ( ) => {{
834 use $crate::threading::threading_manager::{TaskArgs, SafeTaskArgs, TaskItems};
835 use $crate::threading::thread_primitives::AsyncRwLock;
836 SafeTaskArgs::new(AsyncRwLock::new(vec![]))
837 }};
838 ( $($args:expr),+ ) => {{
839 use $crate::threading::threading_manager::{SafeTaskArgs};
840 use $crate::threading::thread_primitives::AsyncRwLock;
841 SafeTaskArgs::new(AsyncRwLock::new(vec![$($args),+]))
842 }};
843 }
844
845 ///
846 /// Convenience macro for packaging the task components and launching the task in one line.
847 ///
848 /// One of the advantages is that you can generate a new `tokio` runtime by specifying the
849 /// number of threads at the end. This is optional. Meaning, we will default to the system's
850 /// number of threads if that value is not specified.
851 ///
852 /// Between the `func` parameter and the optional `threads` parameter, you can specify a
853 /// variable number of arguments to pass to the task. each argument must be of the same type.
854 /// If you wish to pass different arguments with different types, please define an abstract type
855 /// whose underlying structure is a tuple of items and pass that instead.
856 ///
857 /// ## Examples
858 ///
859 /// ### With Default Thread Count
860 /// ```
861 /// use rumtk_core::{rumtk_exec_task};
862 /// use rumtk_core::core::RUMResult;
863 /// use rumtk_core::threading::threading_manager::SafeTaskArgs;
864 ///
865 /// async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
866 /// let mut result = Vec::<i32>::new();
867 /// for arg in args.read().await.iter() {
868 /// result.push(*arg);
869 /// }
870 /// Ok(result)
871 /// }
872 ///
873 /// let result = rumtk_exec_task!(test, vec![5]).unwrap();
874 /// assert_eq!(&result.clone().unwrap(), &vec![5], "Results mismatch");
875 /// assert_ne!(&result.clone().unwrap(), &vec![5, 10], "Results do not mismatch as expected!");
876 /// ```
877 ///
878 /// ### With Custom Thread Count
879 /// ```
880 /// use rumtk_core::{rumtk_exec_task};
881 /// use rumtk_core::core::RUMResult;
882 /// use rumtk_core::threading::threading_manager::SafeTaskArgs;
883 ///
884 /// async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
885 /// let mut result = Vec::<i32>::new();
886 /// for arg in args.read().await.iter() {
887 /// result.push(*arg);
888 /// }
889 /// Ok(result)
890 /// }
891 ///
892 /// let result = rumtk_exec_task!(test, vec![5], 5).unwrap();
893 /// assert_eq!(&result.clone().unwrap(), &vec![5], "Results mismatch");
894 /// assert_ne!(&result.clone().unwrap(), &vec![5, 10], "Results do not mismatch as expected!");
895 /// ```
896 ///
897 /// ### With Async Function Body
898 /// ```
899 /// use rumtk_core::{rumtk_exec_task};
900 /// use rumtk_core::core::RUMResult;
901 /// use rumtk_core::threading::threading_manager::SafeTaskArgs;
902 ///
903 /// let result = rumtk_exec_task!(
904 /// async move |args: &SafeTaskArgs<i32>| -> RUMResult<Vec<i32>> {
905 /// let mut result = Vec::<i32>::new();
906 /// for arg in args.read().await.iter() {
907 /// result.push(*arg);
908 /// }
909 /// Ok(result)
910 /// },
911 /// vec![5]).unwrap();
912 /// assert_eq!(&result.clone().unwrap(), &vec![5], "Results mismatch");
913 /// assert_ne!(&result.clone().unwrap(), &vec![5, 10], "Results do not mismatch as expected!");
914 /// ```
915 ///
916 /// ### With Async Function Body and No Args
917 /// ```
918 /// use rumtk_core::{rumtk_exec_task};
919 /// use rumtk_core::core::RUMResult;
920 /// use rumtk_core::threading::threading_manager::SafeTaskArgs;
921 ///
922 /// let result = rumtk_exec_task!(
923 /// async || -> RUMResult<Vec<i32>> {
924 /// let mut result = Vec::<i32>::new();
925 /// Ok(result)
926 /// }).unwrap();
927 /// let empty = Vec::<i32>::new();
928 /// assert_eq!(&result.clone().unwrap(), &empty, "Results mismatch");
929 /// assert_ne!(&result.clone().unwrap(), &vec![5, 10], "Results do not mismatch as expected!");
930 /// ```
931 ///
932 /// ## Equivalent To
933 ///
934 /// ```
935 /// use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_create_task_args, rumtk_create_task, rumtk_spawn_task};
936 /// use rumtk_core::core::RUMResult;
937 /// use rumtk_core::threading::threading_manager::SafeTaskArgs;
938 ///
939 /// async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
940 /// let mut result = Vec::<i32>::new();
941 /// for arg in args.read().await.iter() {
942 /// result.push(*arg);
943 /// }
944 /// Ok(result)
945 /// }
946 ///
947 /// let args = rumtk_create_task_args!(1);
948 /// let task = rumtk_create_task!(test, args);
949 /// let result = rumtk_resolve_task!(rumtk_spawn_task!(task));
950 /// ```
951 ///
952 #[macro_export]
953 macro_rules! rumtk_exec_task {
954 ($func:expr ) => {{
955 use $crate::{
956 rumtk_create_task, rumtk_create_task_args, rumtk_init_threads, rumtk_resolve_task,
957 };
958 let task = rumtk_create_task!($func);
959 rumtk_resolve_task!(task)
960 }};
961 ($func:expr, $args:expr ) => {{
962 use $crate::threading::threading_functions::get_default_system_thread_count;
963 rumtk_exec_task!($func, $args, get_default_system_thread_count())
964 }};
965 ($func:expr, $args:expr , $threads:expr ) => {{
966 use $crate::threading::thread_primitives::AsyncRwLock;
967 use $crate::{
968 rumtk_create_task, rumtk_create_task_args, rumtk_init_threads, rumtk_resolve_task,
969 };
970 let args = SafeTaskArgs::new(AsyncRwLock::new($args));
971 let task = rumtk_create_task!($func, args);
972 rumtk_resolve_task!(task)
973 }};
974 }
975
976 ///
977 /// Sleep a duration of time in a sync context, so no await can be call on the result.
978 ///
979 /// You can pass any value that can be cast to f32.
980 ///
981 /// The precision is up to nanoseconds and it is depicted by the number of decimal places.
982 ///
983 /// ## Examples
984 ///
985 /// ```
986 /// use rumtk_core::rumtk_sleep;
987 /// rumtk_sleep!(1); // Sleeps for 1 second.
988 /// rumtk_sleep!(0.001); // Sleeps for 1 millisecond
989 /// rumtk_sleep!(0.000001); // Sleeps for 1 microsecond
990 /// rumtk_sleep!(0.000000001); // Sleeps for 1 nanosecond
991 /// ```
992 ///
993 #[macro_export]
994 macro_rules! rumtk_sleep {
995 ( $dur:expr) => {{
996 use $crate::threading::threading_functions::sleep;
997 sleep($dur as f32)
998 }};
999 }
1000
1001 ///
1002 /// Sleep for some duration of time in an async context. Meaning, we can be awaited.
1003 ///
1004 /// You can pass any value that can be cast to f32.
1005 ///
1006 /// The precision is up to nanoseconds and it is depicted by the number of decimal places.
1007 ///
1008 /// ## Examples
1009 ///
1010 /// ```
1011 /// use rumtk_core::{rumtk_async_sleep, rumtk_exec_task};
1012 /// use rumtk_core::core::RUMResult;
1013 /// rumtk_exec_task!( async || -> RUMResult<()> {
1014 /// rumtk_async_sleep!(1).await; // Sleeps for 1 second.
1015 /// rumtk_async_sleep!(0.001).await; // Sleeps for 1 millisecond
1016 /// rumtk_async_sleep!(0.000001).await; // Sleeps for 1 microsecond
1017 /// rumtk_async_sleep!(0.000000001).await; // Sleeps for 1 nanosecond
1018 /// Ok(())
1019 /// }
1020 /// );
1021 /// ```
1022 ///
1023 #[macro_export]
1024 macro_rules! rumtk_async_sleep {
1025 ( $dur:expr) => {{
1026 use $crate::threading::threading_functions::async_sleep;
1027 async_sleep($dur as f32)
1028 }};
1029 }
1030
1031 ///
1032 ///
1033 ///
1034 #[macro_export]
1035 macro_rules! rumtk_new_task_queue {
1036 ( $worker_num:expr ) => {{
1037 use $crate::threading::threading_manager::TaskManager;
1038 TaskManager::new($worker_num);
1039 }};
1040 }
1041}