rumtk_core/threading.rs
1/*
2 * rumtk attempts to implement HL7 and medical protocols for interoperability in medicine.
3 * This toolkit aims to be reliable, simple, performant, and standards compliant.
4 * Copyright (C) 2025 Luis M. Santos, M.D.
5 *
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Lesser General Public
8 * License as published by the Free Software Foundation; either
9 * version 2.1 of the License, or (at your option) any later version.
10 *
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Lesser General Public License for more details.
15 *
16 * You should have received a copy of the GNU Lesser General Public
17 * License along with this library; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
19 */
20
21///
22/// This module provides all of the primitives needed to build a multithreaded application.
23///
24pub mod thread_primitives {
25 use crate::cache::{new_cache, LazyRUMCache};
26 use crate::core::{RUMResult, RUMVec};
27 use std::future::IntoFuture;
28 use std::sync::Arc;
29 use tokio::runtime::Runtime as TokioRuntime;
30 use tokio::sync::RwLock;
31 use tokio::task::JoinHandle;
32
33 /**************************** Globals **************************************/
34 pub static mut rt_cache: TokioRtCache = new_cache();
35 /**************************** Helpers ***************************************/
36 pub fn init_cache(threads: &usize) -> SafeTokioRuntime {
37 let mut builder = tokio::runtime::Builder::new_multi_thread();
38 builder.worker_threads(*threads);
39 builder.enable_all();
40 match builder.build() {
41 Ok(handle) => Arc::new(handle),
42 Err(e) => panic!(
43 "Unable to initialize threading tokio runtime because {}!",
44 &e
45 ),
46 }
47 }
48
49 /**************************** Types ***************************************/
50 pub type SafeTokioRuntime = Arc<TokioRuntime>;
51 pub type TokioRtCache = LazyRUMCache<usize, SafeTokioRuntime>;
52 pub type TaskItems<T> = RUMVec<T>;
53 /// This type aliases a vector of T elements that will be used for passing arguments to the task processor.
54 pub type TaskArgs<T> = TaskItems<T>;
55 /// Type to use to define how task results are expected to be returned.
56 pub type TaskResult<R> = RUMResult<TaskItems<R>>;
57 pub type TaskResults<R> = TaskItems<TaskResult<R>>;
58 /// Function signature defining the interface of task processing logic.
59 pub type SafeTaskArgs<T> = Arc<RwLock<TaskItems<T>>>;
60 pub type AsyncTaskHandle<R> = JoinHandle<TaskResult<R>>;
61 pub type AsyncTaskHandles<R> = Vec<AsyncTaskHandle<R>>;
62 //pub type TaskProcessor<T, R, Fut: Future<Output = TaskResult<R>>> = impl FnOnce(&SafeTaskArgs<T>) -> Fut;
63}
64
65///
66/// This module contains a few helper.
67///
68/// For example, you can find a function for determining number of threads available in system.
69/// The sleep family of functions are also here.
70///
71pub mod threading_functions {
72 use num_cpus;
73 use std::thread::{available_parallelism, sleep as std_sleep};
74 use std::time::Duration;
75 use tokio::time::sleep as tokio_sleep;
76
77 pub const NANOS_PER_SEC: u64 = 1000000000;
78 pub const MILLIS_PER_SEC: u64 = 1000;
79 pub const MICROS_PER_SEC: u64 = 1000000;
80
81 pub fn get_default_system_thread_count() -> usize {
82 let cpus: usize = num_cpus::get();
83 let parallelism = match available_parallelism() {
84 Ok(n) => n.get(),
85 Err(_) => 0,
86 };
87
88 if parallelism >= cpus {
89 parallelism
90 } else {
91 cpus
92 }
93 }
94
95 pub fn sleep(s: f32) {
96 let ns = s * NANOS_PER_SEC as f32;
97 let rounded_ns = ns.round() as u64;
98 let duration = Duration::from_nanos(rounded_ns);
99 std_sleep(duration);
100 }
101
102 pub async fn async_sleep(s: f32) {
103 let ns = s * NANOS_PER_SEC as f32;
104 let rounded_ns = ns.round() as u64;
105 let duration = Duration::from_nanos(rounded_ns);
106 tokio_sleep(duration).await;
107 }
108}
109
110///
111/// Main API for interacting with the threading back end. Remember, we use tokio as our executor.
112/// This means that by default, all jobs sent to the thread pool have to be async in nature.
113/// These macros make handling of these jobs at the sync/async boundary more convenient.
114///
115pub mod threading_macros {
116 use crate::threading::thread_primitives;
117 use crate::threading::thread_primitives::SafeTaskArgs;
118
119 ///
120 /// First, let's make sure we have *tokio* initialized at least once. The runtime created here
121 /// will be saved to the global context so the next call to this macro will simply grab a
122 /// reference to the previously initialized runtime.
123 ///
124 /// Passing nothing will default to initializing a runtime using the default number of threads
125 /// for this system. This is typically equivalent to number of cores/threads for your CPU.
126 ///
127 /// Passing `threads` number will yield a runtime that allocates that many threads.
128 ///
129 ///
130 /// ## Examples
131 ///
132 /// ```
133 /// use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_create_task_args, rumtk_create_task, rumtk_spawn_task};
134 /// use rumtk_core::core::RUMResult;
135 /// use rumtk_core::threading::thread_primitives::SafeTaskArgs;
136 ///
137 /// async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
138 /// let mut result = Vec::<i32>::new();
139 /// for arg in args.read().await.iter() {
140 /// result.push(*arg);
141 /// }
142 /// Ok(result)
143 /// }
144 ///
145 /// let rt = rumtk_init_threads!(); // Creates runtime instance
146 /// let args = rumtk_create_task_args!(1); // Creates a vector of i32s
147 /// let task = rumtk_create_task!(test, args); // Creates a standard task which consists of a function or closure accepting a Vec<T>
148 /// let result = rumtk_resolve_task!(&rt, rumtk_spawn_task!(&rt, task)); // Spawn's task and waits for it to conclude.
149 /// ```
150 ///
151 /// ```
152 /// use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_create_task_args, rumtk_create_task, rumtk_spawn_task};
153 /// use rumtk_core::core::RUMResult;
154 /// use rumtk_core::threading::thread_primitives::SafeTaskArgs;
155 ///
156 /// async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
157 /// let mut result = Vec::<i32>::new();
158 /// for arg in args.read().await.iter() {
159 /// result.push(*arg);
160 /// }
161 /// Ok(result)
162 /// }
163 ///
164 /// let thread_count: usize = 10;
165 /// let rt = rumtk_init_threads!(&thread_count);
166 /// let args = rumtk_create_task_args!(1);
167 /// let task = rumtk_create_task!(test, args);
168 /// let result = rumtk_resolve_task!(&rt, rumtk_spawn_task!(&rt, task));
169 /// ```
170 #[macro_export]
171 macro_rules! rumtk_init_threads {
172 ( ) => {{
173 use $crate::rumtk_cache_fetch;
174 use $crate::threading::thread_primitives::{init_cache, rt_cache};
175 use $crate::threading::threading_functions::get_default_system_thread_count;
176 let rt = rumtk_cache_fetch!(
177 &mut rt_cache,
178 &get_default_system_thread_count(),
179 init_cache
180 );
181 rt
182 }};
183 ( $threads:expr ) => {{
184 use $crate::rumtk_cache_fetch;
185 use $crate::threading::thread_primitives::{init_cache, rt_cache};
186 let rt = rumtk_cache_fetch!(&mut rt_cache, $threads, init_cache);
187 rt
188 }};
189 }
190
191 ///
192 /// Puts task onto the runtime queue.
193 ///
194 /// The parameters to this macro are a reference to the runtime (`rt`) and a future (`func`).
195 ///
196 /// The return is a [thread_primitives::JoinHandle<T>] instance. If the task was a standard
197 /// framework task, you will get [thread_primitives::AsyncTaskHandle] instead.
198 ///
199 #[macro_export]
200 macro_rules! rumtk_spawn_task {
201 ( $func:expr ) => {{
202 let rt = rumtk_init_threads!();
203 rt.spawn($func)
204 }};
205 ( $rt:expr, $func:expr ) => {{
206 $rt.spawn($func)
207 }};
208 }
209
210 ///
211 /// Using the initialized runtime, wait for the future to resolve in a thread blocking manner!
212 ///
213 /// If you pass a reference to the runtime (`rt`) and an async closure (`func`), we await the
214 /// async closure without passing any arguments.
215 ///
216 /// You can pass a third argument to this macro in the form of any number of arguments (`arg_item`).
217 /// In such a case, we pass those arguments to the call on the async closure and await on results.
218 ///
219 #[macro_export]
220 macro_rules! rumtk_wait_on_task {
221 ( $rt:expr, $func:expr ) => {{
222 $rt.block_on(async move {
223 $func().await
224 })
225 }};
226 ( $rt:expr, $func:expr, $($arg_items:expr),+ ) => {{
227 $rt.block_on(async move {
228 $func($($arg_items),+).await
229 })
230 }};
231 }
232
233 ///
234 /// This macro awaits a future.
235 ///
236 /// The arguments are a reference to the runtime (`rt) and a future.
237 ///
238 /// If there is a result, you will get the result of the future.
239 ///
240 /// ## Examples
241 ///
242 /// ```
243 /// use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_create_task_args, rumtk_create_task, rumtk_spawn_task};
244 /// use rumtk_core::core::RUMResult;
245 /// use rumtk_core::threading::thread_primitives::SafeTaskArgs;
246 ///
247 /// async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
248 /// let mut result = Vec::<i32>::new();
249 /// for arg in args.read().await.iter() {
250 /// result.push(*arg);
251 /// }
252 /// Ok(result)
253 /// }
254 ///
255 /// let rt = rumtk_init_threads!();
256 /// let args = rumtk_create_task_args!(1);
257 /// let task = rumtk_create_task!(test, args);
258 /// let result = rumtk_resolve_task!(&rt, rumtk_spawn_task!(&rt, task));
259 /// ```
260 ///
261 #[macro_export]
262 macro_rules! rumtk_resolve_task {
263 ( $rt:expr, $future:expr ) => {{
264 use $crate::strings::format_compact;
265 //$rt.block_on(async move { $future.await }).unwrap()
266 match $rt.block_on(async move { $future.await }) {
267 Ok(r) => Ok(r),
268 Err(e) => Err(format_compact!("Task failed with {}", e)),
269 }
270 }};
271 }
272
273 ///
274 /// This macro creates an async body that calls the async closure and awaits it.
275 ///
276 #[macro_export]
277 macro_rules! rumtk_create_task {
278 ( $func:expr ) => {{
279 async move {
280 let f = $func;
281 f().await
282 }
283 }};
284 ( $func:expr, $args:expr ) => {{
285 async move {
286 let f = $func;
287 f(&$args).await
288 }
289 }};
290 }
291
292 ///
293 /// Creates an instance of [SafeTaskArgs] with the arguments passed.
294 ///
295 /// ## Note
296 ///
297 /// All arguments must be of the same type
298 ///
299 #[macro_export]
300 macro_rules! rumtk_create_task_args {
301 ( ) => {{
302 use $crate::threading::thread_primitives::{TaskArgs, SafeTaskArgs, TaskItems};
303 use tokio::sync::RwLock;
304 SafeTaskArgs::new(RwLock::new(vec![]))
305 }};
306 ( $($args:expr),+ ) => {{
307 use $crate::threading::thread_primitives::{TaskArgs, SafeTaskArgs, TaskItems};
308 use tokio::sync::RwLock;
309 SafeTaskArgs::new(RwLock::new(vec![$($args),+]))
310 }};
311 }
312
313 ///
314 /// Convenience macro for packaging the task components and launching the task in one line.
315 ///
316 /// One of the advantages is that you can generate a new `tokio` runtime by specifying the
317 /// number of threads at the end. This is optional. Meaning, we will default to the system's
318 /// number of threads if that value is not specified.
319 ///
320 /// Between the `func` parameter and the optional `threads` parameter, you can specify a
321 /// variable number of arguments to pass to the task. each argument must be of the same type.
322 /// If you wish to pass different arguments with different types, please define an abstract type
323 /// whose underlying structure is a tuple of items and pass that instead.
324 ///
325 /// ## Examples
326 ///
327 /// ### With Default Thread Count
328 /// ```
329 /// use rumtk_core::{rumtk_exec_task};
330 /// use rumtk_core::core::RUMResult;
331 /// use rumtk_core::threading::thread_primitives::SafeTaskArgs;
332 ///
333 /// async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
334 /// let mut result = Vec::<i32>::new();
335 /// for arg in args.read().await.iter() {
336 /// result.push(*arg);
337 /// }
338 /// Ok(result)
339 /// }
340 ///
341 /// let result = rumtk_exec_task!(test, vec![5]);
342 /// assert_eq!(&result.clone().unwrap(), &vec![5], "Results mismatch");
343 /// assert_ne!(&result.clone().unwrap(), &vec![5, 10], "Results do not mismatch as expected!");
344 /// ```
345 ///
346 /// ### With Custom Thread Count
347 /// ```
348 /// use rumtk_core::{rumtk_exec_task};
349 /// use rumtk_core::core::RUMResult;
350 /// use rumtk_core::threading::thread_primitives::SafeTaskArgs;
351 ///
352 /// async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
353 /// let mut result = Vec::<i32>::new();
354 /// for arg in args.read().await.iter() {
355 /// result.push(*arg);
356 /// }
357 /// Ok(result)
358 /// }
359 ///
360 /// let result = rumtk_exec_task!(test, vec![5], 5);
361 /// assert_eq!(&result.clone().unwrap(), &vec![5], "Results mismatch");
362 /// assert_ne!(&result.clone().unwrap(), &vec![5, 10], "Results do not mismatch as expected!");
363 /// ```
364 ///
365 /// ### With Async Function Body
366 /// ```
367 /// use rumtk_core::{rumtk_exec_task};
368 /// use rumtk_core::core::RUMResult;
369 /// use rumtk_core::threading::thread_primitives::SafeTaskArgs;
370 ///
371 /// let result = rumtk_exec_task!(
372 /// async move |args: &SafeTaskArgs<i32>| -> RUMResult<Vec<i32>> {
373 /// let mut result = Vec::<i32>::new();
374 /// for arg in args.read().await.iter() {
375 /// result.push(*arg);
376 /// }
377 /// Ok(result)
378 /// },
379 /// vec![5]);
380 /// assert_eq!(&result.clone().unwrap(), &vec![5], "Results mismatch");
381 /// assert_ne!(&result.clone().unwrap(), &vec![5, 10], "Results do not mismatch as expected!");
382 /// ```
383 ///
384 /// ### With Async Function Body and No Args
385 /// ```
386 /// use rumtk_core::{rumtk_exec_task};
387 /// use rumtk_core::core::RUMResult;
388 /// use rumtk_core::threading::thread_primitives::SafeTaskArgs;
389 ///
390 /// let result = rumtk_exec_task!(
391 /// async || -> RUMResult<Vec<i32>> {
392 /// let mut result = Vec::<i32>::new();
393 /// Ok(result)
394 /// });
395 /// let empty = Vec::<i32>::new();
396 /// assert_eq!(&result.clone().unwrap(), &empty, "Results mismatch");
397 /// assert_ne!(&result.clone().unwrap(), &vec![5, 10], "Results do not mismatch as expected!");
398 /// ```
399 ///
400 /// ## Equivalent To
401 ///
402 /// ```
403 /// use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_create_task_args, rumtk_create_task, rumtk_spawn_task};
404 /// use rumtk_core::core::RUMResult;
405 /// use rumtk_core::threading::thread_primitives::SafeTaskArgs;
406 ///
407 /// async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
408 /// let mut result = Vec::<i32>::new();
409 /// for arg in args.read().await.iter() {
410 /// result.push(*arg);
411 /// }
412 /// Ok(result)
413 /// }
414 ///
415 /// let rt = rumtk_init_threads!();
416 /// let args = rumtk_create_task_args!(1);
417 /// let task = rumtk_create_task!(test, args);
418 /// let result = rumtk_resolve_task!(&rt, rumtk_spawn_task!(&rt, task));
419 /// ```
420 ///
421 #[macro_export]
422 macro_rules! rumtk_exec_task {
423 ($func:expr ) => {{
424 use tokio::sync::RwLock;
425 use $crate::{
426 rumtk_create_task, rumtk_create_task_args, rumtk_init_threads, rumtk_resolve_task,
427 };
428 let rt = rumtk_init_threads!();
429 let task = rumtk_create_task!($func);
430 rumtk_resolve_task!(&rt, task)
431 }};
432 ($func:expr, $args:expr ) => {{
433 use tokio::sync::RwLock;
434 use $crate::{
435 rumtk_create_task, rumtk_create_task_args, rumtk_init_threads, rumtk_resolve_task,
436 };
437 let rt = rumtk_init_threads!();
438 let args = SafeTaskArgs::new(RwLock::new($args));
439 let task = rumtk_create_task!($func, args);
440 rumtk_resolve_task!(&rt, task)
441 }};
442 ($func:expr, $args:expr , $threads:expr ) => {{
443 use tokio::sync::RwLock;
444 use $crate::{
445 rumtk_create_task, rumtk_create_task_args, rumtk_init_threads, rumtk_resolve_task,
446 };
447 let rt = rumtk_init_threads!(&$threads);
448 let args = SafeTaskArgs::new(RwLock::new($args));
449 let task = rumtk_create_task!($func, args);
450 rumtk_resolve_task!(&rt, task)
451 }};
452 }
453
454 ///
455 /// Sleep a duration of time in a sync context, so no await can be call on the result.
456 ///
457 /// You can pass any value that can be cast to f32.
458 ///
459 /// The precision is up to nanoseconds and it is depicted by the number of decimal places.
460 ///
461 /// ## Examples
462 ///
463 /// ```
464 /// use rumtk_core::rumtk_sleep;
465 /// rumtk_sleep!(1); // Sleeps for 1 second.
466 /// rumtk_sleep!(0.001); // Sleeps for 1 millisecond
467 /// rumtk_sleep!(0.000001); // Sleeps for 1 microsecond
468 /// rumtk_sleep!(0.000000001); // Sleeps for 1 nanosecond
469 /// ```
470 ///
471 #[macro_export]
472 macro_rules! rumtk_sleep {
473 ( $dur:expr) => {{
474 use $crate::threading::threading_functions::sleep;
475 sleep($dur as f32)
476 }};
477 }
478
479 ///
480 /// Sleep for some duration of time in an async context. Meaning, we can be awaited.
481 ///
482 /// You can pass any value that can be cast to f32.
483 ///
484 /// The precision is up to nanoseconds and it is depicted by the number of decimal places.
485 ///
486 /// ## Examples
487 ///
488 /// ```
489 /// use rumtk_core::{rumtk_async_sleep, rumtk_exec_task};
490 /// use rumtk_core::core::RUMResult;
491 /// rumtk_exec_task!( async || -> RUMResult<()> {
492 /// rumtk_async_sleep!(1).await; // Sleeps for 1 second.
493 /// rumtk_async_sleep!(0.001).await; // Sleeps for 1 millisecond
494 /// rumtk_async_sleep!(0.000001).await; // Sleeps for 1 microsecond
495 /// rumtk_async_sleep!(0.000000001).await; // Sleeps for 1 nanosecond
496 /// Ok(())
497 /// }
498 /// );
499 /// ```
500 ///
501 #[macro_export]
502 macro_rules! rumtk_async_sleep {
503 ( $dur:expr) => {{
504 use $crate::threading::threading_functions::async_sleep;
505 async_sleep($dur as f32)
506 }};
507 }
508}