rumtk_core/
threading.rs

1/*
2 * rumtk attempts to implement HL7 and medical protocols for interoperability in medicine.
3 * This toolkit aims to be reliable, simple, performant, and standards compliant.
4 * Copyright (C) 2025  Luis M. Santos, M.D.
5 * Copyright (C) 2025  MedicalMasses L.L.C.
6 *
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
20 */
21
22///
23/// This module provides all of the primitives needed to build a multithreaded application.
24///
25pub mod thread_primitives {
26    use crate::cache::{new_cache, LazyRUMCache};
27    use crate::core::{RUMResult, RUMVec};
28    use std::sync::Arc;
29    use tokio::runtime::Runtime as TokioRuntime;
30    use tokio::sync::RwLock;
31    use tokio::task::JoinHandle;
32
33    /**************************** Globals **************************************/
    /// Global cache of tokio runtimes. Per [`TokioRtCache`], entries are keyed by a `usize`
    /// (the `rumtk_init_threads!` macro in this file passes the worker-thread count as the key).
    ///
    /// This is a `static mut`: reading or writing it requires `unsafe`.
    pub static mut rt_cache: TokioRtCache = new_cache();
35    /**************************** Helpers ***************************************/
36    pub fn init_cache(threads: &usize) -> SafeTokioRuntime {
37        let mut builder = tokio::runtime::Builder::new_multi_thread();
38        builder.worker_threads(*threads);
39        builder.enable_all();
40        match builder.build() {
41            Ok(handle) => Arc::new(handle),
42            Err(e) => panic!(
43                "Unable to initialize threading tokio runtime because {}!",
44                &e
45            ),
46        }
47    }
48
49    /**************************** Types ***************************************/
    /// Reference-counted handle to a tokio runtime.
    pub type SafeTokioRuntime = Arc<TokioRuntime>;
    /// Cache type mapping a `usize` key to a [`SafeTokioRuntime`].
    pub type TokioRtCache = LazyRUMCache<usize, SafeTokioRuntime>;
    /// Generic container of task items; an alias of [`RUMVec`].
    pub type TaskItems<T> = RUMVec<T>;
    /// This type aliases a vector of T elements that will be used for passing arguments to the task processor.
    pub type TaskArgs<T> = TaskItems<T>;
    /// Type to use to define how task results are expected to be returned.
    pub type TaskResult<R> = RUMResult<TaskItems<R>>;
    /// A collection of [`TaskResult`] values.
    pub type TaskResults<R> = TaskItems<TaskResult<R>>;
    /// Task arguments wrapped in `Arc<RwLock<..>>` (tokio's async `RwLock`) so they can be shared
    /// with a task and read via `.read().await`.
    pub type SafeTaskArgs<T> = Arc<RwLock<TaskItems<T>>>;
    /// Tokio join handle for a task that yields a [`TaskResult`].
    pub type AsyncTaskHandle<R> = JoinHandle<TaskResult<R>>;
    /// A list of [`AsyncTaskHandle`]s.
    pub type AsyncTaskHandles<R> = Vec<AsyncTaskHandle<R>>;
    // Function signature defining the interface of task processing logic (currently disabled):
    //pub type TaskProcessor<T, R, Fut: Future<Output = TaskResult<R>>> = impl FnOnce(&SafeTaskArgs<T>) -> Fut;
63}
64
65///
/// This module contains a few helpers.
67///
68/// For example, you can find a function for determining number of threads available in system.
69/// The sleep family of functions are also here.
70///
71pub mod threading_functions {
72    use num_cpus;
73    use std::thread::{available_parallelism, sleep as std_sleep};
74    use std::time::Duration;
75    use tokio::time::sleep as tokio_sleep;
76
77    pub const NANOS_PER_SEC: u64 = 1000000000;
78    pub const MILLIS_PER_SEC: u64 = 1000;
79    pub const MICROS_PER_SEC: u64 = 1000000;
80
81    pub fn get_default_system_thread_count() -> usize {
82        let cpus: usize = num_cpus::get();
83        let parallelism = match available_parallelism() {
84            Ok(n) => n.get(),
85            Err(_) => 0,
86        };
87
88        if parallelism >= cpus {
89            parallelism
90        } else {
91            cpus
92        }
93    }
94
95    pub fn sleep(s: f32) {
96        let ns = s * NANOS_PER_SEC as f32;
97        let rounded_ns = ns.round() as u64;
98        let duration = Duration::from_nanos(rounded_ns);
99        std_sleep(duration);
100    }
101
102    pub async fn async_sleep(s: f32) {
103        let ns = s * NANOS_PER_SEC as f32;
104        let rounded_ns = ns.round() as u64;
105        let duration = Duration::from_nanos(rounded_ns);
106        tokio_sleep(duration).await;
107    }
108}
109
110///
111/// Main API for interacting with the threading back end. Remember, we use tokio as our executor.
112/// This means that by default, all jobs sent to the thread pool have to be async in nature.
113/// These macros make handling of these jobs at the sync/async boundary more convenient.
114///
115pub mod threading_macros {
116    use crate::threading::thread_primitives;
117    use crate::threading::thread_primitives::SafeTaskArgs;
118
119    ///
120    /// First, let's make sure we have *tokio* initialized at least once. The runtime created here
121    /// will be saved to the global context so the next call to this macro will simply grab a
122    /// reference to the previously initialized runtime.
123    ///
124    /// Passing nothing will default to initializing a runtime using the default number of threads
125    /// for this system. This is typically equivalent to number of cores/threads for your CPU.
126    ///
127    /// Passing `threads` number will yield a runtime that allocates that many threads.
128    ///
129    ///
130    /// ## Examples
131    ///
132    /// ```
133    ///     use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_create_task_args, rumtk_create_task, rumtk_spawn_task};
134    ///     use rumtk_core::core::RUMResult;
135    ///     use rumtk_core::threading::thread_primitives::SafeTaskArgs;
136    ///
137    ///     async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
138    ///         let mut result = Vec::<i32>::new();
139    ///         for arg in args.read().await.iter() {
140    ///             result.push(*arg);
141    ///         }
142    ///         Ok(result)
143    ///     }
144    ///
145    ///     let rt = rumtk_init_threads!();                                      // Creates runtime instance
146    ///     let args = rumtk_create_task_args!(1);                               // Creates a vector of i32s
147    ///     let task = rumtk_create_task!(test, args);                           // Creates a standard task which consists of a function or closure accepting a Vec<T>
    ///     let result = rumtk_resolve_task!(&rt, rumtk_spawn_task!(&rt, task)); // Spawns the task and waits for it to conclude.
149    /// ```
150    ///
151    /// ```
152    ///     use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_create_task_args, rumtk_create_task, rumtk_spawn_task};
153    ///     use rumtk_core::core::RUMResult;
154    ///     use rumtk_core::threading::thread_primitives::SafeTaskArgs;
155    ///
156    ///     async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
157    ///         let mut result = Vec::<i32>::new();
158    ///         for arg in args.read().await.iter() {
159    ///             result.push(*arg);
160    ///         }
161    ///         Ok(result)
162    ///     }
163    ///
164    ///     let thread_count: usize = 10;
165    ///     let rt = rumtk_init_threads!(&thread_count);
166    ///     let args = rumtk_create_task_args!(1);
167    ///     let task = rumtk_create_task!(test, args);
168    ///     let result = rumtk_resolve_task!(&rt, rumtk_spawn_task!(&rt, task));
169    /// ```
170    #[macro_export]
171    macro_rules! rumtk_init_threads {
172        ( ) => {{
173            use $crate::rumtk_cache_fetch;
174            use $crate::threading::thread_primitives::{init_cache, rt_cache};
175            use $crate::threading::threading_functions::get_default_system_thread_count;
176            let rt = rumtk_cache_fetch!(
177                &mut rt_cache,
178                &get_default_system_thread_count(),
179                init_cache
180            );
181            rt
182        }};
183        ( $threads:expr ) => {{
184            use $crate::rumtk_cache_fetch;
185            use $crate::threading::thread_primitives::{init_cache, rt_cache};
186            let rt = rumtk_cache_fetch!(&raw mut rt_cache, $threads, init_cache);
187            rt
188        }};
189    }
190
191    ///
192    /// Puts task onto the runtime queue.
193    ///
194    /// The parameters to this macro are a reference to the runtime (`rt`) and a future (`func`).
195    ///
196    /// The return is a [thread_primitives::JoinHandle<T>] instance. If the task was a standard
197    /// framework task, you will get [thread_primitives::AsyncTaskHandle] instead.
198    ///
199    #[macro_export]
200    macro_rules! rumtk_spawn_task {
201        ( $func:expr ) => {{
202            let rt = rumtk_init_threads!();
203            rt.spawn($func)
204        }};
205        ( $rt:expr, $func:expr ) => {{
206            $rt.spawn($func)
207        }};
208    }
209
210    ///
211    /// Using the initialized runtime, wait for the future to resolve in a thread blocking manner!
212    ///
213    /// If you pass a reference to the runtime (`rt`) and an async closure (`func`), we await the
214    /// async closure without passing any arguments.
215    ///
216    /// You can pass a third argument to this macro in the form of any number of arguments (`arg_item`).
217    /// In such a case, we pass those arguments to the call on the async closure and await on results.
218    ///
219    #[macro_export]
220    macro_rules! rumtk_wait_on_task {
221        ( $rt:expr, $func:expr ) => {{
222            $rt.block_on(async move {
223                $func().await
224            })
225        }};
226        ( $rt:expr, $func:expr, $($arg_items:expr),+ ) => {{
227            $rt.block_on(async move {
228                $func($($arg_items),+).await
229            })
230        }};
231    }
232
233    ///
234    /// This macro awaits a future.
235    ///
    /// The arguments are a reference to the runtime (`rt`) and a future.
237    ///
238    /// If there is a result, you will get the result of the future.
239    ///
240    /// ## Examples
241    ///
242    /// ```
243    ///     use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_create_task_args, rumtk_create_task, rumtk_spawn_task};
244    ///     use rumtk_core::core::RUMResult;
245    ///     use rumtk_core::threading::thread_primitives::SafeTaskArgs;
246    ///
247    ///     async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
248    ///         let mut result = Vec::<i32>::new();
249    ///         for arg in args.read().await.iter() {
250    ///             result.push(*arg);
251    ///         }
252    ///         Ok(result)
253    ///     }
254    ///
255    ///     let rt = rumtk_init_threads!();
256    ///     let args = rumtk_create_task_args!(1);
257    ///     let task = rumtk_create_task!(test, args);
258    ///     let result = rumtk_resolve_task!(&rt, rumtk_spawn_task!(&rt, task));
259    /// ```
260    ///
261    #[macro_export]
262    macro_rules! rumtk_resolve_task {
263        ( $rt:expr, $future:expr ) => {{
264            use $crate::strings::rumtk_format;
265            // Fun tidbit, the expression rumtk_resolve_task!(&rt, rumtk_spawn_task!(&rt, task)), where
266            // rt is the tokio runtime yields async move { { &rt.spawn(task) } }. However, the whole thing
267            // is technically moved into the async closure and captured so things like mutex guards
268            // technically go out of the outer scope. As a result that expression fails to compile even
269            // though the intent is for rumtk_spawn_task to resolve first and its result get moved
270            // into the async closure. To ensure that happens regardless of given expression, we do
271            // a variable assignment below to force the "future" macro expressions to resolve before
272            // moving into the closure. DO NOT REMOVE OR "SIMPLIFY" THE let future = $future LINE!!!
273            let future = $future;
274            match $rt.block_on(async move { future.await }) {
275                Ok(r) => Ok(r),
276                Err(e) => Err(rumtk_format!("Task failed with {}", e)),
277            }
278        }};
279    }
280
281    ///
282    /// This macro creates an async body that calls the async closure and awaits it.
283    ///
284    #[macro_export]
285    macro_rules! rumtk_create_task {
286        ( $func:expr ) => {{
287            async move {
288                let f = $func;
289                f().await
290            }
291        }};
292        ( $func:expr, $args:expr ) => {{
293            async move {
294                let f = $func;
295                f(&$args).await
296            }
297        }};
298    }
299
300    ///
301    /// Creates an instance of [SafeTaskArgs] with the arguments passed.
302    ///
303    /// ## Note
304    ///
305    /// All arguments must be of the same type
306    ///
307    #[macro_export]
308    macro_rules! rumtk_create_task_args {
309        ( ) => {{
310            use $crate::threading::thread_primitives::{TaskArgs, SafeTaskArgs, TaskItems};
311            use tokio::sync::RwLock;
312            SafeTaskArgs::new(RwLock::new(vec![]))
313        }};
314        ( $($args:expr),+ ) => {{
315            use $crate::threading::thread_primitives::{SafeTaskArgs};
316            use tokio::sync::RwLock;
317            SafeTaskArgs::new(RwLock::new(vec![$($args),+]))
318        }};
319    }
320
321    ///
322    /// Convenience macro for packaging the task components and launching the task in one line.
323    ///
324    /// One of the advantages is that you can generate a new `tokio` runtime by specifying the
325    /// number of threads at the end. This is optional. Meaning, we will default to the system's
326    /// number of threads if that value is not specified.
327    ///
    /// Between the `func` parameter and the optional `threads` parameter, you can optionally pass
    /// a single expression evaluating to a vector of arguments for the task (e.g. `vec![5]`).
    /// Each argument must be of the same type. If you wish to pass different arguments with
    /// different types, please define an abstract type whose underlying structure is a tuple of
    /// items and pass that instead.
332    ///
333    /// ## Examples
334    ///
335    /// ### With Default Thread Count
336    /// ```
337    ///     use rumtk_core::{rumtk_exec_task};
338    ///     use rumtk_core::core::RUMResult;
339    ///     use rumtk_core::threading::thread_primitives::SafeTaskArgs;
340    ///
341    ///     async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
342    ///         let mut result = Vec::<i32>::new();
343    ///         for arg in args.read().await.iter() {
344    ///             result.push(*arg);
345    ///         }
346    ///         Ok(result)
347    ///     }
348    ///
349    ///     let result = rumtk_exec_task!(test, vec![5]);
350    ///     assert_eq!(&result.clone().unwrap(), &vec![5], "Results mismatch");
351    ///     assert_ne!(&result.clone().unwrap(), &vec![5, 10], "Results do not mismatch as expected!");
352    /// ```
353    ///
354    /// ### With Custom Thread Count
355    /// ```
356    ///     use rumtk_core::{rumtk_exec_task};
357    ///     use rumtk_core::core::RUMResult;
358    ///     use rumtk_core::threading::thread_primitives::SafeTaskArgs;
359    ///
360    ///     async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
361    ///         let mut result = Vec::<i32>::new();
362    ///         for arg in args.read().await.iter() {
363    ///             result.push(*arg);
364    ///         }
365    ///         Ok(result)
366    ///     }
367    ///
368    ///     let result = rumtk_exec_task!(test, vec![5], 5);
369    ///     assert_eq!(&result.clone().unwrap(), &vec![5], "Results mismatch");
370    ///     assert_ne!(&result.clone().unwrap(), &vec![5, 10], "Results do not mismatch as expected!");
371    /// ```
372    ///
373    /// ### With Async Function Body
374    /// ```
375    ///     use rumtk_core::{rumtk_exec_task};
376    ///     use rumtk_core::core::RUMResult;
377    ///     use rumtk_core::threading::thread_primitives::SafeTaskArgs;
378    ///
379    ///     let result = rumtk_exec_task!(
380    ///     async move |args: &SafeTaskArgs<i32>| -> RUMResult<Vec<i32>> {
381    ///         let mut result = Vec::<i32>::new();
382    ///         for arg in args.read().await.iter() {
383    ///             result.push(*arg);
384    ///         }
385    ///         Ok(result)
386    ///     },
387    ///     vec![5]);
388    ///     assert_eq!(&result.clone().unwrap(), &vec![5], "Results mismatch");
389    ///     assert_ne!(&result.clone().unwrap(), &vec![5, 10], "Results do not mismatch as expected!");
390    /// ```
391    ///
392    /// ### With Async Function Body and No Args
393    /// ```
394    ///     use rumtk_core::{rumtk_exec_task};
395    ///     use rumtk_core::core::RUMResult;
396    ///     use rumtk_core::threading::thread_primitives::SafeTaskArgs;
397    ///
398    ///     let result = rumtk_exec_task!(
399    ///     async || -> RUMResult<Vec<i32>> {
400    ///         let mut result = Vec::<i32>::new();
401    ///         Ok(result)
402    ///     });
403    ///     let empty = Vec::<i32>::new();
404    ///     assert_eq!(&result.clone().unwrap(), &empty, "Results mismatch");
405    ///     assert_ne!(&result.clone().unwrap(), &vec![5, 10], "Results do not mismatch as expected!");
406    /// ```
407    ///
408    /// ## Equivalent To
409    ///
410    /// ```
411    ///     use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_create_task_args, rumtk_create_task, rumtk_spawn_task};
412    ///     use rumtk_core::core::RUMResult;
413    ///     use rumtk_core::threading::thread_primitives::SafeTaskArgs;
414    ///
415    ///     async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
416    ///         let mut result = Vec::<i32>::new();
417    ///         for arg in args.read().await.iter() {
418    ///             result.push(*arg);
419    ///         }
420    ///         Ok(result)
421    ///     }
422    ///
423    ///     let rt = rumtk_init_threads!();
424    ///     let args = rumtk_create_task_args!(1);
425    ///     let task = rumtk_create_task!(test, args);
426    ///     let result = rumtk_resolve_task!(&rt, rumtk_spawn_task!(&rt, task));
427    /// ```
428    ///
429    #[macro_export]
430    macro_rules! rumtk_exec_task {
431        ($func:expr ) => {{
432            use tokio::sync::RwLock;
433            use $crate::{
434                rumtk_create_task, rumtk_create_task_args, rumtk_init_threads, rumtk_resolve_task,
435            };
436            let rt = rumtk_init_threads!();
437            let task = rumtk_create_task!($func);
438            rumtk_resolve_task!(&rt, task)
439        }};
440        ($func:expr, $args:expr ) => {{
441            use tokio::sync::RwLock;
442            use $crate::{
443                rumtk_create_task, rumtk_create_task_args, rumtk_init_threads, rumtk_resolve_task,
444            };
445            let rt = rumtk_init_threads!();
446            let args = SafeTaskArgs::new(RwLock::new($args));
447            let task = rumtk_create_task!($func, args);
448            rumtk_resolve_task!(&rt, task)
449        }};
450        ($func:expr, $args:expr , $threads:expr ) => {{
451            use tokio::sync::RwLock;
452            use $crate::{
453                rumtk_create_task, rumtk_create_task_args, rumtk_init_threads, rumtk_resolve_task,
454            };
455            let rt = rumtk_init_threads!(&$threads);
456            let args = SafeTaskArgs::new(RwLock::new($args));
457            let task = rumtk_create_task!($func, args);
458            rumtk_resolve_task!(&rt, task)
459        }};
460    }
461
462    ///
    /// Sleep for a duration of time in a sync context, so no await can be called on the result.
464    ///
465    /// You can pass any value that can be cast to f32.
466    ///
467    /// The precision is up to nanoseconds and it is depicted by the number of decimal places.
468    ///
469    /// ## Examples
470    ///
471    /// ```
472    ///     use rumtk_core::rumtk_sleep;
473    ///     rumtk_sleep!(1);           // Sleeps for 1 second.
474    ///     rumtk_sleep!(0.001);       // Sleeps for 1 millisecond
475    ///     rumtk_sleep!(0.000001);    // Sleeps for 1 microsecond
476    ///     rumtk_sleep!(0.000000001); // Sleeps for 1 nanosecond
477    /// ```
478    ///
479    #[macro_export]
480    macro_rules! rumtk_sleep {
481        ( $dur:expr) => {{
482            use $crate::threading::threading_functions::sleep;
483            sleep($dur as f32)
484        }};
485    }
486
487    ///
488    /// Sleep for some duration of time in an async context. Meaning, we can be awaited.
489    ///
490    /// You can pass any value that can be cast to f32.
491    ///
492    /// The precision is up to nanoseconds and it is depicted by the number of decimal places.
493    ///
494    /// ## Examples
495    ///
496    /// ```
497    ///     use rumtk_core::{rumtk_async_sleep, rumtk_exec_task};
498    ///     use rumtk_core::core::RUMResult;
499    ///     rumtk_exec_task!( async || -> RUMResult<()> {
500    ///             rumtk_async_sleep!(1).await;           // Sleeps for 1 second.
501    ///             rumtk_async_sleep!(0.001).await;       // Sleeps for 1 millisecond
502    ///             rumtk_async_sleep!(0.000001).await;    // Sleeps for 1 microsecond
503    ///             rumtk_async_sleep!(0.000000001).await; // Sleeps for 1 nanosecond
504    ///             Ok(())
505    ///         }
506    ///     );
507    /// ```
508    ///
509    #[macro_export]
510    macro_rules! rumtk_async_sleep {
511        ( $dur:expr) => {{
512            use $crate::threading::threading_functions::async_sleep;
513            async_sleep($dur as f32)
514        }};
515    }
516}