// rumtk_core/threading.rs
/*
 * rumtk attempts to implement HL7 and medical protocols for interoperability in medicine.
 * This toolkit aims to be reliable, simple, performant, and standards compliant.
 * Copyright (C) 2025  Luis M. Santos, M.D.
 * Copyright (C) 2025  MedicalMasses L.L.C.
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
 */
21
22///
23/// This module provides all of the primitives needed to build a multithreaded application.
24///
25pub mod thread_primitives {
26 use crate::cache::{new_cache, LazyRUMCache};
27 use crate::core::{RUMResult, RUMVec};
28 use std::sync::Arc;
29 use tokio::runtime::Runtime as TokioRuntime;
30 use tokio::sync::RwLock;
31 use tokio::task::JoinHandle;
32
33 /**************************** Globals **************************************/
34 pub static mut rt_cache: TokioRtCache = new_cache();
35 /**************************** Helpers ***************************************/
36 pub fn init_cache(threads: &usize) -> SafeTokioRuntime {
37 let mut builder = tokio::runtime::Builder::new_multi_thread();
38 builder.worker_threads(*threads);
39 builder.enable_all();
40 match builder.build() {
41 Ok(handle) => Arc::new(handle),
42 Err(e) => panic!(
43 "Unable to initialize threading tokio runtime because {}!",
44 &e
45 ),
46 }
47 }
48
49 /**************************** Types ***************************************/
50 pub type SafeTokioRuntime = Arc<TokioRuntime>;
51 pub type TokioRtCache = LazyRUMCache<usize, SafeTokioRuntime>;
52 pub type TaskItems<T> = RUMVec<T>;
53 /// This type aliases a vector of T elements that will be used for passing arguments to the task processor.
54 pub type TaskArgs<T> = TaskItems<T>;
55 /// Type to use to define how task results are expected to be returned.
56 pub type TaskResult<R> = RUMResult<TaskItems<R>>;
57 pub type TaskResults<R> = TaskItems<TaskResult<R>>;
58 /// Function signature defining the interface of task processing logic.
59 pub type SafeTaskArgs<T> = Arc<RwLock<TaskItems<T>>>;
60 pub type AsyncTaskHandle<R> = JoinHandle<TaskResult<R>>;
61 pub type AsyncTaskHandles<R> = Vec<AsyncTaskHandle<R>>;
62 //pub type TaskProcessor<T, R, Fut: Future<Output = TaskResult<R>>> = impl FnOnce(&SafeTaskArgs<T>) -> Fut;
63}
64
///
/// This module contains a few helpers.
///
/// For example, you can find a function for determining the number of threads available in the system.
/// The sleep family of functions is also here.
///
71pub mod threading_functions {
72 use num_cpus;
73 use std::thread::{available_parallelism, sleep as std_sleep};
74 use std::time::Duration;
75 use tokio::time::sleep as tokio_sleep;
76
/// Number of nanoseconds in one second.
pub const NANOS_PER_SEC: u64 = 1_000_000_000;
/// Number of milliseconds in one second.
pub const MILLIS_PER_SEC: u64 = 1_000;
/// Number of microseconds in one second.
pub const MICROS_PER_SEC: u64 = 1_000_000;
80
81 pub fn get_default_system_thread_count() -> usize {
82 let cpus: usize = num_cpus::get();
83 let parallelism = match available_parallelism() {
84 Ok(n) => n.get(),
85 Err(_) => 0,
86 };
87
88 if parallelism >= cpus {
89 parallelism
90 } else {
91 cpus
92 }
93 }
94
95 pub fn sleep(s: f32) {
96 let ns = s * NANOS_PER_SEC as f32;
97 let rounded_ns = ns.round() as u64;
98 let duration = Duration::from_nanos(rounded_ns);
99 std_sleep(duration);
100 }
101
102 pub async fn async_sleep(s: f32) {
103 let ns = s * NANOS_PER_SEC as f32;
104 let rounded_ns = ns.round() as u64;
105 let duration = Duration::from_nanos(rounded_ns);
106 tokio_sleep(duration).await;
107 }
108}
109
110///
111/// Main API for interacting with the threading back end. Remember, we use tokio as our executor.
112/// This means that by default, all jobs sent to the thread pool have to be async in nature.
113/// These macros make handling of these jobs at the sync/async boundary more convenient.
114///
115pub mod threading_macros {
116 use crate::threading::thread_primitives;
117 use crate::threading::thread_primitives::SafeTaskArgs;
118
///
/// First, let's make sure we have *tokio* initialized at least once. The runtime created here
/// will be saved to the global context so the next call to this macro will simply grab a
/// reference to the previously initialized runtime.
///
/// Passing nothing will default to initializing a runtime using the default number of threads
/// for this system. This is typically equivalent to number of cores/threads for your CPU.
///
/// Passing `threads` (a reference to a `usize`) will yield a runtime that allocates that many
/// threads.
///
/// ## Note
///
/// Both forms hand the global runtime cache to `rumtk_cache_fetch!` as a raw pointer
/// (`&raw mut rt_cache`), so no mutable reference to the `static mut` is ever created here.
///
/// ## Examples
///
/// ```
/// use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_create_task_args, rumtk_create_task, rumtk_spawn_task};
/// use rumtk_core::core::RUMResult;
/// use rumtk_core::threading::thread_primitives::SafeTaskArgs;
///
/// async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
///     let mut result = Vec::<i32>::new();
///     for arg in args.read().await.iter() {
///         result.push(*arg);
///     }
///     Ok(result)
/// }
///
/// let rt = rumtk_init_threads!();                                      // Creates runtime instance
/// let args = rumtk_create_task_args!(1);                               // Creates a vector of i32s
/// let task = rumtk_create_task!(test, args);                           // Creates a standard task which consists of a function or closure accepting a Vec<T>
/// let result = rumtk_resolve_task!(&rt, rumtk_spawn_task!(&rt, task)); // Spawns the task and waits for it to conclude.
/// ```
///
/// ```
/// use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_create_task_args, rumtk_create_task, rumtk_spawn_task};
/// use rumtk_core::core::RUMResult;
/// use rumtk_core::threading::thread_primitives::SafeTaskArgs;
///
/// async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
///     let mut result = Vec::<i32>::new();
///     for arg in args.read().await.iter() {
///         result.push(*arg);
///     }
///     Ok(result)
/// }
///
/// let thread_count: usize = 10;
/// let rt = rumtk_init_threads!(&thread_count);
/// let args = rumtk_create_task_args!(1);
/// let task = rumtk_create_task!(test, args);
/// let result = rumtk_resolve_task!(&rt, rumtk_spawn_task!(&rt, task));
/// ```
#[macro_export]
macro_rules! rumtk_init_threads {
    ( ) => {{
        use $crate::rumtk_cache_fetch;
        use $crate::threading::thread_primitives::{init_cache, rt_cache};
        use $crate::threading::threading_functions::get_default_system_thread_count;
        // Pass the cache as a raw pointer, exactly like the explicit-thread-count arm below.
        // `&mut rt_cache` would create a mutable reference to a `static mut`.
        let rt = rumtk_cache_fetch!(
            &raw mut rt_cache,
            &get_default_system_thread_count(),
            init_cache
        );
        rt
    }};
    ( $threads:expr ) => {{
        use $crate::rumtk_cache_fetch;
        use $crate::threading::thread_primitives::{init_cache, rt_cache};
        let rt = rumtk_cache_fetch!(&raw mut rt_cache, $threads, init_cache);
        rt
    }};
}
190
///
/// Puts task onto the runtime queue.
///
/// The parameters to this macro are a reference to the runtime (`rt`) and a future (`func`).
/// When only the future is given, the default runtime from `rumtk_init_threads!()` is used.
///
/// The return is whatever the runtime's `spawn` returns: a tokio `JoinHandle<T>` instance. If
/// the task was a standard framework task, you will get
/// `thread_primitives::AsyncTaskHandle` instead.
///
#[macro_export]
macro_rules! rumtk_spawn_task {
    ( $func:expr ) => {{
        // Qualified with `$crate` so callers do not have to import `rumtk_init_threads`
        // themselves just to use the one-argument form.
        let rt = $crate::rumtk_init_threads!();
        rt.spawn($func)
    }};
    ( $rt:expr, $func:expr ) => {{
        $rt.spawn($func)
    }};
}
209
///
/// Blocks the calling thread on the given runtime until an async callable has finished.
///
/// With just a runtime (`rt`) and an async closure/function (`func`), the callable is invoked
/// without arguments and its future is awaited inside `rt.block_on(..)`.
///
/// Any further comma-separated expressions (`arg_items`) are forwarded, in order, as the
/// arguments of that call before the resulting future is awaited.
///
#[macro_export]
macro_rules! rumtk_wait_on_task {
    ( $rt:expr, $func:expr $(, $arg_items:expr)* ) => {{
        $rt.block_on(async move { $func($($arg_items),*).await })
    }};
}
232
///
/// Blocks on a future until it resolves and hands back its outcome.
///
/// The arguments are a reference to the runtime (`rt`) and a future whose output is a
/// `Result`.
///
/// An `Ok` value is passed through untouched. An `Err` value is replaced by the message
/// `Task failed with <error>`.
///
/// ## Examples
///
/// ```
/// use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_create_task_args, rumtk_create_task, rumtk_spawn_task};
/// use rumtk_core::core::RUMResult;
/// use rumtk_core::threading::thread_primitives::SafeTaskArgs;
///
/// async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
///     let mut result = Vec::<i32>::new();
///     for arg in args.read().await.iter() {
///         result.push(*arg);
///     }
///     Ok(result)
/// }
///
/// let rt = rumtk_init_threads!();
/// let args = rumtk_create_task_args!(1);
/// let task = rumtk_create_task!(test, args);
/// let result = rumtk_resolve_task!(&rt, rumtk_spawn_task!(&rt, task));
/// ```
///
#[macro_export]
macro_rules! rumtk_resolve_task {
    ( $rt:expr, $future:expr ) => {{
        use $crate::strings::rumtk_format;
        // WARNING: keep the `let future = $future;` binding below exactly as it is.
        //
        // An invocation such as rumtk_resolve_task!(&rt, rumtk_spawn_task!(&rt, task)) would
        // otherwise place the whole inner macro expression inside the `async move` block.
        // Everything it mentions would then be captured by the block, which fails to compile
        // even though the intent is for the inner expression to run first and only its
        // result to be moved in. Binding the expression to a local forces it to be evaluated
        // up front. DO NOT REMOVE OR "SIMPLIFY" THE let future = $future LINE!!!
        let future = $future;
        $rt.block_on(async move { future.await })
            .map_err(|e| rumtk_format!("Task failed with {}", e))
    }};
}
280
///
/// Wraps an async callable in an `async move` block that invokes it and awaits the outcome.
///
/// * `rumtk_create_task!(func)` calls `func()` inside the block.
/// * `rumtk_create_task!(func, args)` moves `args` into the block and calls `func(&args)`.
///
/// Nothing runs until the returned future is polled.
///
#[macro_export]
macro_rules! rumtk_create_task {
    ( $func:expr $(, $args:expr)? ) => {{
        async move {
            let task_fn = $func;
            task_fn($(&$args)?).await
        }
    }};
}
299
///
/// Creates an instance of `SafeTaskArgs` with the arguments passed.
///
/// Passing nothing yields an empty argument list. In that case the element type has to be
/// inferable from how the result is used.
///
/// ## Note
///
/// All arguments must be of the same type.
///
/// The expansion names `tokio::sync::RwLock`, so `tokio` must be resolvable from the calling
/// crate.
///
#[macro_export]
macro_rules! rumtk_create_task_args {
    // A single arm covers both the empty and the non-empty case: `vec![]` and
    // `vec![a, b, ..]` come out of the same repetition, and only the names actually used are
    // imported.
    ( $($args:expr),* ) => {{
        use $crate::threading::thread_primitives::SafeTaskArgs;
        use tokio::sync::RwLock;
        SafeTaskArgs::new(RwLock::new(vec![$($args),*]))
    }};
}
320
///
/// Convenience macro for packaging the task components and launching the task in one line.
///
/// One of the advantages is that you can select the `tokio` runtime by specifying the
/// number of threads at the end. This is optional. Meaning, we will default to the system's
/// number of threads if that value is not specified.
///
/// Between the `func` parameter and the optional `threads` parameter, you pass the task
/// arguments as a **single collection expression** (for example `vec![5]`). It is wrapped in
/// a `SafeTaskArgs` and handed to `func` by reference, so every item must be of the same type.
/// If you wish to pass different arguments with different types, please define an abstract type
/// whose underlying structure is a tuple of items and pass that instead.
///
/// The forms taking arguments name `tokio::sync::RwLock`, so `tokio` must be resolvable from
/// the calling crate. `SafeTaskArgs` itself does not need to be imported by the caller for the
/// macro to expand.
///
/// ## Examples
///
/// ### With Default Thread Count
/// ```
/// use rumtk_core::{rumtk_exec_task};
/// use rumtk_core::core::RUMResult;
/// use rumtk_core::threading::thread_primitives::SafeTaskArgs;
///
/// async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
///     let mut result = Vec::<i32>::new();
///     for arg in args.read().await.iter() {
///         result.push(*arg);
///     }
///     Ok(result)
/// }
///
/// let result = rumtk_exec_task!(test, vec![5]);
/// assert_eq!(&result.clone().unwrap(), &vec![5], "Results mismatch");
/// assert_ne!(&result.clone().unwrap(), &vec![5, 10], "Results do not mismatch as expected!");
/// ```
///
/// ### With Custom Thread Count
/// ```
/// use rumtk_core::{rumtk_exec_task};
/// use rumtk_core::core::RUMResult;
/// use rumtk_core::threading::thread_primitives::SafeTaskArgs;
///
/// async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
///     let mut result = Vec::<i32>::new();
///     for arg in args.read().await.iter() {
///         result.push(*arg);
///     }
///     Ok(result)
/// }
///
/// let result = rumtk_exec_task!(test, vec![5], 5);
/// assert_eq!(&result.clone().unwrap(), &vec![5], "Results mismatch");
/// assert_ne!(&result.clone().unwrap(), &vec![5, 10], "Results do not mismatch as expected!");
/// ```
///
/// ### With Async Function Body
/// ```
/// use rumtk_core::{rumtk_exec_task};
/// use rumtk_core::core::RUMResult;
/// use rumtk_core::threading::thread_primitives::SafeTaskArgs;
///
/// let result = rumtk_exec_task!(
///     async move |args: &SafeTaskArgs<i32>| -> RUMResult<Vec<i32>> {
///         let mut result = Vec::<i32>::new();
///         for arg in args.read().await.iter() {
///             result.push(*arg);
///         }
///         Ok(result)
///     },
///     vec![5]);
/// assert_eq!(&result.clone().unwrap(), &vec![5], "Results mismatch");
/// assert_ne!(&result.clone().unwrap(), &vec![5, 10], "Results do not mismatch as expected!");
/// ```
///
/// ### With Async Function Body and No Args
/// ```
/// use rumtk_core::{rumtk_exec_task};
/// use rumtk_core::core::RUMResult;
/// use rumtk_core::threading::thread_primitives::SafeTaskArgs;
///
/// let result = rumtk_exec_task!(
///     async || -> RUMResult<Vec<i32>> {
///         let mut result = Vec::<i32>::new();
///         Ok(result)
///     });
/// let empty = Vec::<i32>::new();
/// assert_eq!(&result.clone().unwrap(), &empty, "Results mismatch");
/// assert_ne!(&result.clone().unwrap(), &vec![5, 10], "Results do not mismatch as expected!");
/// ```
///
/// ## Roughly Equivalent To
///
/// The manual flow below spawns the task before resolving it. This macro passes the task
/// straight to `rumtk_resolve_task!` without spawning it first, so its result is the task's
/// own `Result` with any error replaced by a `Task failed with ...` message.
///
/// ```
/// use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_create_task_args, rumtk_create_task, rumtk_spawn_task};
/// use rumtk_core::core::RUMResult;
/// use rumtk_core::threading::thread_primitives::SafeTaskArgs;
///
/// async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
///     let mut result = Vec::<i32>::new();
///     for arg in args.read().await.iter() {
///         result.push(*arg);
///     }
///     Ok(result)
/// }
///
/// let rt = rumtk_init_threads!();
/// let args = rumtk_create_task_args!(1);
/// let task = rumtk_create_task!(test, args);
/// let result = rumtk_resolve_task!(&rt, rumtk_spawn_task!(&rt, task));
/// ```
///
#[macro_export]
macro_rules! rumtk_exec_task {
    ($func:expr ) => {{
        use $crate::{rumtk_create_task, rumtk_init_threads, rumtk_resolve_task};
        let rt = rumtk_init_threads!();
        let task = rumtk_create_task!($func);
        rumtk_resolve_task!(&rt, task)
    }};
    ($func:expr, $args:expr ) => {{
        use tokio::sync::RwLock;
        use $crate::{rumtk_create_task, rumtk_init_threads, rumtk_resolve_task};
        let rt = rumtk_init_threads!();
        // `SafeTaskArgs` is reached through `$crate` so the caller does not need it in scope.
        let args = $crate::threading::thread_primitives::SafeTaskArgs::new(RwLock::new($args));
        let task = rumtk_create_task!($func, args);
        rumtk_resolve_task!(&rt, task)
    }};
    ($func:expr, $args:expr , $threads:expr ) => {{
        use tokio::sync::RwLock;
        use $crate::{rumtk_create_task, rumtk_init_threads, rumtk_resolve_task};
        let rt = rumtk_init_threads!(&$threads);
        // `SafeTaskArgs` is reached through `$crate` so the caller does not need it in scope.
        let args = $crate::threading::thread_primitives::SafeTaskArgs::new(RwLock::new($args));
        let task = rumtk_create_task!($func, args);
        rumtk_resolve_task!(&rt, task)
    }};
}
461
///
/// Sleep for a duration of time in a sync context, so no await can be called on the result.
///
/// You can pass any value that can be cast to f32. The value is interpreted as seconds.
///
/// Fractions of a second are expressed through the decimal places, down to nanoseconds.
/// Because the value is cast to `f32`, the precision actually achieved is limited by what an
/// `f32` can represent.
///
/// ## Examples
///
/// ```
/// use rumtk_core::rumtk_sleep;
/// rumtk_sleep!(1);           // Sleeps for 1 second.
/// rumtk_sleep!(0.001);       // Sleeps for 1 millisecond
/// rumtk_sleep!(0.000001);    // Sleeps for 1 microsecond
/// rumtk_sleep!(0.000000001); // Sleeps for 1 nanosecond
/// ```
///
#[macro_export]
macro_rules! rumtk_sleep {
    ( $dur:expr) => {{
        // Called by full path instead of importing `sleep` into this block, so a `sleep`
        // that the caller mentions inside `$dur` is not shadowed by the expansion.
        $crate::threading::threading_functions::sleep($dur as f32)
    }};
}
486
///
/// Sleep for some duration of time in an async context. Meaning, the result can be awaited.
///
/// You can pass any value that can be cast to f32. The value is interpreted as seconds.
///
/// Fractions of a second are expressed through the decimal places, down to nanoseconds.
/// Because the value is cast to `f32`, the precision actually achieved is limited by what an
/// `f32` can represent.
///
/// ## Examples
///
/// ```
/// use rumtk_core::{rumtk_async_sleep, rumtk_exec_task};
/// use rumtk_core::core::RUMResult;
/// rumtk_exec_task!( async || -> RUMResult<()> {
///         rumtk_async_sleep!(1).await;           // Sleeps for 1 second.
///         rumtk_async_sleep!(0.001).await;       // Sleeps for 1 millisecond
///         rumtk_async_sleep!(0.000001).await;    // Sleeps for 1 microsecond
///         rumtk_async_sleep!(0.000000001).await; // Sleeps for 1 nanosecond
///         Ok(())
///     }
/// );
/// ```
///
#[macro_export]
macro_rules! rumtk_async_sleep {
    ( $dur:expr) => {{
        // Called by full path instead of importing `async_sleep` into this block, so an
        // `async_sleep` that the caller mentions inside `$dur` is not shadowed by the expansion.
        $crate::threading::threading_functions::async_sleep($dur as f32)
    }};
}
516}