rumtk_core/
threading.rs

1/*
2 * rumtk attempts to implement HL7 and medical protocols for interoperability in medicine.
3 * This toolkit aims to be reliable, simple, performant, and standards compliant.
4 * Copyright (C) 2025  Luis M. Santos, M.D.
5 *
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Lesser General Public
8 * License as published by the Free Software Foundation; either
9 * version 2.1 of the License, or (at your option) any later version.
10 *
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 * Lesser General Public License for more details.
15 *
16 * You should have received a copy of the GNU Lesser General Public
17 * License along with this library; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
19 */
20
21///
22/// This module provides all of the primitives needed to build a multithreaded application.
23///
24pub mod thread_primitives {
25    use crate::cache::{new_cache, LazyRUMCache};
26    use crate::core::{RUMResult, RUMVec};
27    use std::future::IntoFuture;
28    use std::sync::Arc;
29    use tokio::runtime::Runtime as TokioRuntime;
30    use tokio::sync::RwLock;
31    use tokio::task::JoinHandle;
32
33    /**************************** Globals **************************************/
/// Global cache of tokio runtimes. `rumtk_init_threads!` hands this cache to
/// `rumtk_cache_fetch!` together with a worker-thread count (the `usize` key) and
/// [`init_cache`].
///
/// NOTE(review): this is a `static mut`, so every access requires `unsafe` and the compiler
/// does not guard against concurrent access — presumably `rumtk_cache_fetch!` takes care of
/// that; confirm against the cache module.
pub static mut rt_cache: TokioRtCache = new_cache();
35    /**************************** Helpers ***************************************/
36    pub fn init_cache(threads: &usize) -> SafeTokioRuntime {
37        let mut builder = tokio::runtime::Builder::new_multi_thread();
38        builder.worker_threads(*threads);
39        builder.enable_all();
40        match builder.build() {
41            Ok(handle) => Arc::new(handle),
42            Err(e) => panic!(
43                "Unable to initialize threading tokio runtime because {}!",
44                &e
45            ),
46        }
47    }
48
49    /**************************** Types ***************************************/
/// Atomically reference-counted handle to a tokio runtime.
pub type SafeTokioRuntime = Arc<TokioRuntime>;
/// Cache of [`SafeTokioRuntime`] instances indexed by a `usize` key
/// (`rumtk_init_threads!` uses the worker-thread count as that key).
pub type TokioRtCache = LazyRUMCache<usize, SafeTokioRuntime>;
/// Generic list of items that is passed to, or produced by, a task.
pub type TaskItems<T> = RUMVec<T>;
/// This type aliases a vector of T elements that will be used for passing arguments to the task processor.
pub type TaskArgs<T> = TaskItems<T>;
/// Type to use to define how task results are expected to be returned.
pub type TaskResult<R> = RUMResult<TaskItems<R>>;
/// List of [`TaskResult`] values.
pub type TaskResults<R> = TaskItems<TaskResult<R>>;
/// Task arguments wrapped in `Arc<RwLock<..>>`. Tasks receive a reference to this value and
/// read the items through the async lock (`args.read().await`).
pub type SafeTaskArgs<T> = Arc<RwLock<TaskItems<T>>>;
/// tokio join handle whose output is a [`TaskResult`].
pub type AsyncTaskHandle<R> = JoinHandle<TaskResult<R>>;
/// List of [`AsyncTaskHandle`] values.
pub type AsyncTaskHandles<R> = Vec<AsyncTaskHandle<R>>;
// Function signature defining the interface of task processing logic (currently disabled):
//pub type TaskProcessor<T, R, Fut: Future<Output = TaskResult<R>>> = impl FnOnce(&SafeTaskArgs<T>) -> Fut;
63}
64
///
/// This module contains a few helpers.
///
/// For example, you can find a function for determining the number of threads available in the
/// system. The sleep family of functions is also here.
///
71pub mod threading_functions {
72    use num_cpus;
73    use std::thread::{available_parallelism, sleep as std_sleep};
74    use std::time::Duration;
75    use tokio::time::sleep as tokio_sleep;
76
/// Number of nanoseconds in one second.
pub const NANOS_PER_SEC: u64 = 1_000_000_000;
/// Number of milliseconds in one second.
pub const MILLIS_PER_SEC: u64 = 1_000;
/// Number of microseconds in one second.
pub const MICROS_PER_SEC: u64 = 1_000_000;
80
81    pub fn get_default_system_thread_count() -> usize {
82        let cpus: usize = num_cpus::get();
83        let parallelism = match available_parallelism() {
84            Ok(n) => n.get(),
85            Err(_) => 0,
86        };
87
88        if parallelism >= cpus {
89            parallelism
90        } else {
91            cpus
92        }
93    }
94
95    pub fn sleep(s: f32) {
96        let ns = s * NANOS_PER_SEC as f32;
97        let rounded_ns = ns.round() as u64;
98        let duration = Duration::from_nanos(rounded_ns);
99        std_sleep(duration);
100    }
101
102    pub async fn async_sleep(s: f32) {
103        let ns = s * NANOS_PER_SEC as f32;
104        let rounded_ns = ns.round() as u64;
105        let duration = Duration::from_nanos(rounded_ns);
106        tokio_sleep(duration).await;
107    }
108}
109
110///
111/// Main API for interacting with the threading back end. Remember, we use tokio as our executor.
112/// This means that by default, all jobs sent to the thread pool have to be async in nature.
113/// These macros make handling of these jobs at the sync/async boundary more convenient.
114///
115pub mod threading_macros {
116    use crate::threading::thread_primitives;
117    use crate::threading::thread_primitives::SafeTaskArgs;
118
///
/// Fetches the shared *tokio* runtime, making sure it has been initialized at least once.
/// The runtime created here is saved to the global context, so the next call to this macro
/// simply grabs a reference to the previously initialized runtime.
///
/// Called without arguments, the runtime is requested with the default number of threads for
/// this system (see `get_default_system_thread_count`). This is typically equivalent to the
/// number of cores/threads of your CPU.
///
/// Called with `threads`, the runtime is requested with that many worker threads. `threads`
/// must be a *reference* to the count (`&usize`), as shown in the second example.
///
/// ## Examples
///
/// ```
///     use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_create_task_args, rumtk_create_task, rumtk_spawn_task};
///     use rumtk_core::core::RUMResult;
///     use rumtk_core::threading::thread_primitives::SafeTaskArgs;
///
///     async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
///         let mut result = Vec::<i32>::new();
///         for arg in args.read().await.iter() {
///             result.push(*arg);
///         }
///         Ok(result)
///     }
///
///     let rt = rumtk_init_threads!();                                      // Creates runtime instance
///     let args = rumtk_create_task_args!(1);                               // Creates a vector of i32s
///     let task = rumtk_create_task!(test, args);                           // Creates a standard task which consists of a function or closure accepting a Vec<T>
///     let result = rumtk_resolve_task!(&rt, rumtk_spawn_task!(&rt, task)); // Spawn's task and waits for it to conclude.
/// ```
///
/// ```
///     use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_create_task_args, rumtk_create_task, rumtk_spawn_task};
///     use rumtk_core::core::RUMResult;
///     use rumtk_core::threading::thread_primitives::SafeTaskArgs;
///
///     async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
///         let mut result = Vec::<i32>::new();
///         for arg in args.read().await.iter() {
///             result.push(*arg);
///         }
///         Ok(result)
///     }
///
///     let thread_count: usize = 10;
///     let rt = rumtk_init_threads!(&thread_count);
///     let args = rumtk_create_task_args!(1);
///     let task = rumtk_create_task!(test, args);
///     let result = rumtk_resolve_task!(&rt, rumtk_spawn_task!(&rt, task));
/// ```
#[macro_export]
macro_rules! rumtk_init_threads {
    // No thread count given: defer to the explicit arm with the system default.
    () => {
        $crate::rumtk_init_threads!(
            &$crate::threading::threading_functions::get_default_system_thread_count()
        )
    };
    ( $threads:expr ) => {{
        use $crate::rumtk_cache_fetch;
        use $crate::threading::thread_primitives::{init_cache, rt_cache};
        let runtime = rumtk_cache_fetch!(&mut rt_cache, $threads, init_cache);
        runtime
    }};
}
190
///
/// Puts a task onto the runtime queue.
///
/// Two forms are accepted:
///
/// * `rumtk_spawn_task!(func)` – obtains the default runtime through
///   [`rumtk_init_threads!`](crate::rumtk_init_threads) and spawns the future `func` on it.
/// * `rumtk_spawn_task!(rt, func)` – spawns the future `func` on the runtime `rt`
///   (a reference to the runtime works as well).
///
/// The result is whatever the runtime's `spawn` returns; for a tokio runtime that is a
/// `tokio::task::JoinHandle<T>`. If the task was a standard framework task, this corresponds to
/// [thread_primitives::AsyncTaskHandle].
///
#[macro_export]
macro_rules! rumtk_spawn_task {
    ( $func:expr ) => {{
        // Path-qualified so that callers do not have to import `rumtk_init_threads` themselves.
        let rt = $crate::rumtk_init_threads!();
        rt.spawn($func)
    }};
    ( $rt:expr, $func:expr ) => {{
        $rt.spawn($func)
    }};
}
209
///
/// Blocks the current thread on the given runtime until an async call has resolved.
///
/// `rt` is the runtime (or a reference to it) and `func` is an async function or closure.
/// With only these two arguments, `func` is called without arguments and its future is awaited
/// inside `rt.block_on(..)`.
///
/// Any further comma-separated expressions (`arg`) are forwarded, in order, as the arguments
/// of the call to `func` before the resulting future is awaited.
///
/// The macro evaluates to the output of the awaited future.
///
#[macro_export]
macro_rules! rumtk_wait_on_task {
    ( $rt:expr, $func:expr $(, $arg:expr)* ) => {{
        $rt.block_on(async move { $func($($arg),*).await })
    }};
}
232
///
/// Blocks on the runtime until a future has resolved and normalizes its error.
///
/// The arguments are a reference to the runtime (`rt`) and a future.
///
/// The future's output must be a `Result`. An `Ok` value is passed through untouched, while
/// an `Err(e)` is replaced by an error message of the form `Task failed with {e}`.
///
/// ## Examples
///
/// ```
///     use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_create_task_args, rumtk_create_task, rumtk_spawn_task};
///     use rumtk_core::core::RUMResult;
///     use rumtk_core::threading::thread_primitives::SafeTaskArgs;
///
///     async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
///         let mut result = Vec::<i32>::new();
///         for arg in args.read().await.iter() {
///             result.push(*arg);
///         }
///         Ok(result)
///     }
///
///     let rt = rumtk_init_threads!();
///     let args = rumtk_create_task_args!(1);
///     let task = rumtk_create_task!(test, args);
///     let result = rumtk_resolve_task!(&rt, rumtk_spawn_task!(&rt, task));
/// ```
///
#[macro_export]
macro_rules! rumtk_resolve_task {
    ( $rt:expr, $future:expr ) => {{
        use $crate::strings::format_compact;
        $rt.block_on(async move { $future.await })
            .map_err(|e| format_compact!("Task failed with {}", e))
    }};
}
272
///
/// Wraps a call to an async function or closure into an `async move` block.
///
/// * `rumtk_create_task!(func)` produces a future that calls `func()` and awaits it.
/// * `rumtk_create_task!(func, args)` produces a future that calls `func(&args)` and awaits
///   it; `args` is moved into the future and handed to `func` by reference.
///
/// Nothing runs until the returned future is polled, e.g. by spawning it or blocking on it.
///
#[macro_export]
macro_rules! rumtk_create_task {
    ( $func:expr $(, $args:expr)? ) => {{
        async move {
            let task_fn = $func;
            task_fn($(&$args)?).await
        }
    }};
}
291
///
/// Creates an instance of [SafeTaskArgs] with the arguments passed.
///
/// The arguments are collected into a vector, which is wrapped in a tokio `RwLock` and then
/// in the shared pointer behind [SafeTaskArgs]. Calling the macro without arguments yields an
/// empty argument list.
///
/// ## Note
///
/// All arguments must be of the same type.
///
/// The expansion names `tokio::sync::RwLock`, so the calling crate needs `tokio` as a
/// dependency.
///
#[macro_export]
macro_rules! rumtk_create_task_args {
    // Fully qualified paths are used instead of `use` items so that nothing is brought into
    // scope around the caller-supplied expressions.
    ( $($args:expr),* ) => {{
        $crate::threading::thread_primitives::SafeTaskArgs::new(
            tokio::sync::RwLock::new(vec![$($args),*]),
        )
    }};
}
312
///
/// Convenience macro for packaging the task components and running the task in one line.
///
/// Accepted forms:
///
/// * `rumtk_exec_task!(func)` – runs an async function/closure that takes no arguments.
/// * `rumtk_exec_task!(func, args)` – wraps `args` into a [SafeTaskArgs] and passes a reference
///   to it to `func`.
/// * `rumtk_exec_task!(func, args, threads)` – same as above, but requests the `tokio` runtime
///   for `threads` worker threads instead of the system default.
///
/// `args` is a *single* expression holding all the items for the task (for example
/// `vec![5]`), so every item must be of the same type. If you wish to pass different arguments
/// with different types, please define an abstract type whose underlying structure is a tuple
/// of items and pass that instead.
///
/// The macro blocks until the task has finished and evaluates to the output of
/// [`rumtk_resolve_task!`](crate::rumtk_resolve_task): the task's `Ok` value, or an error
/// message if the task returned `Err`.
///
/// The expansion names `tokio::sync::RwLock`, so the calling crate needs `tokio` as a
/// dependency.
///
/// ## Examples
///
/// ### With Default Thread Count
/// ```
///     use rumtk_core::{rumtk_exec_task};
///     use rumtk_core::core::RUMResult;
///     use rumtk_core::threading::thread_primitives::SafeTaskArgs;
///
///     async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
///         let mut result = Vec::<i32>::new();
///         for arg in args.read().await.iter() {
///             result.push(*arg);
///         }
///         Ok(result)
///     }
///
///     let result = rumtk_exec_task!(test, vec![5]);
///     assert_eq!(&result.clone().unwrap(), &vec![5], "Results mismatch");
///     assert_ne!(&result.clone().unwrap(), &vec![5, 10], "Results do not mismatch as expected!");
/// ```
///
/// ### With Custom Thread Count
/// ```
///     use rumtk_core::{rumtk_exec_task};
///     use rumtk_core::core::RUMResult;
///     use rumtk_core::threading::thread_primitives::SafeTaskArgs;
///
///     async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
///         let mut result = Vec::<i32>::new();
///         for arg in args.read().await.iter() {
///             result.push(*arg);
///         }
///         Ok(result)
///     }
///
///     let result = rumtk_exec_task!(test, vec![5], 5);
///     assert_eq!(&result.clone().unwrap(), &vec![5], "Results mismatch");
///     assert_ne!(&result.clone().unwrap(), &vec![5, 10], "Results do not mismatch as expected!");
/// ```
///
/// ### With Async Function Body
/// ```
///     use rumtk_core::{rumtk_exec_task};
///     use rumtk_core::core::RUMResult;
///     use rumtk_core::threading::thread_primitives::SafeTaskArgs;
///
///     let result = rumtk_exec_task!(
///     async move |args: &SafeTaskArgs<i32>| -> RUMResult<Vec<i32>> {
///         let mut result = Vec::<i32>::new();
///         for arg in args.read().await.iter() {
///             result.push(*arg);
///         }
///         Ok(result)
///     },
///     vec![5]);
///     assert_eq!(&result.clone().unwrap(), &vec![5], "Results mismatch");
///     assert_ne!(&result.clone().unwrap(), &vec![5, 10], "Results do not mismatch as expected!");
/// ```
///
/// ### With Async Function Body and No Args
/// ```
///     use rumtk_core::{rumtk_exec_task};
///     use rumtk_core::core::RUMResult;
///     use rumtk_core::threading::thread_primitives::SafeTaskArgs;
///
///     let result = rumtk_exec_task!(
///     async || -> RUMResult<Vec<i32>> {
///         let mut result = Vec::<i32>::new();
///         Ok(result)
///     });
///     let empty = Vec::<i32>::new();
///     assert_eq!(&result.clone().unwrap(), &empty, "Results mismatch");
///     assert_ne!(&result.clone().unwrap(), &vec![5, 10], "Results do not mismatch as expected!");
/// ```
///
/// ## Related Lower-Level Workflow
///
/// The same building blocks can be combined by hand. Note that the snippet below spawns the
/// task onto the runtime first, whereas this macro hands the task directly to
/// `rumtk_resolve_task!`.
///
/// ```
///     use rumtk_core::{rumtk_init_threads, rumtk_resolve_task, rumtk_create_task_args, rumtk_create_task, rumtk_spawn_task};
///     use rumtk_core::core::RUMResult;
///     use rumtk_core::threading::thread_primitives::SafeTaskArgs;
///
///     async fn test(args: &SafeTaskArgs<i32>) -> RUMResult<Vec<i32>> {
///         let mut result = Vec::<i32>::new();
///         for arg in args.read().await.iter() {
///             result.push(*arg);
///         }
///         Ok(result)
///     }
///
///     let rt = rumtk_init_threads!();
///     let args = rumtk_create_task_args!(1);
///     let task = rumtk_create_task!(test, args);
///     let result = rumtk_resolve_task!(&rt, rumtk_spawn_task!(&rt, task));
/// ```
///
#[macro_export]
macro_rules! rumtk_exec_task {
    // Every helper macro and type is referenced through a `$crate`-qualified (or full) path:
    // callers need no extra imports and nothing is brought into scope around `$func`/`$args`.
    ( $func:expr ) => {{
        let rt = $crate::rumtk_init_threads!();
        let task = $crate::rumtk_create_task!($func);
        $crate::rumtk_resolve_task!(&rt, task)
    }};
    ( $func:expr, $args:expr ) => {{
        let rt = $crate::rumtk_init_threads!();
        let args = $crate::threading::thread_primitives::SafeTaskArgs::new(
            tokio::sync::RwLock::new($args),
        );
        let task = $crate::rumtk_create_task!($func, args);
        $crate::rumtk_resolve_task!(&rt, task)
    }};
    ( $func:expr, $args:expr, $threads:expr ) => {{
        let rt = $crate::rumtk_init_threads!(&$threads);
        let args = $crate::threading::thread_primitives::SafeTaskArgs::new(
            tokio::sync::RwLock::new($args),
        );
        let task = $crate::rumtk_create_task!($func, args);
        $crate::rumtk_resolve_task!(&rt, task)
    }};
}
453
///
/// Sleeps for a duration of time in a sync context; the call blocks the current thread and
/// there is nothing to await.
///
/// The duration is given in seconds. You can pass any value that can be cast to `f32`.
///
/// Fractions of a second are expressed through decimal places, down to nanoseconds. Keep in
/// mind that the value is cast to `f32`, which only carries about seven significant decimal
/// digits.
///
/// ## Examples
///
/// ```
///     use rumtk_core::rumtk_sleep;
///     rumtk_sleep!(1);           // Sleeps for 1 second.
///     rumtk_sleep!(0.001);       // Sleeps for 1 millisecond
///     rumtk_sleep!(0.000001);    // Sleeps for 1 microsecond
///     rumtk_sleep!(0.000000001); // Sleeps for 1 nanosecond
/// ```
///
#[macro_export]
macro_rules! rumtk_sleep {
    ( $dur:expr ) => {
        $crate::threading::threading_functions::sleep($dur as f32)
    };
}
478
///
/// Sleeps for some duration of time in an async context. The macro evaluates to a future, so
/// it has to be awaited for the sleep to take place.
///
/// The duration is given in seconds. You can pass any value that can be cast to `f32`.
///
/// Fractions of a second are expressed through decimal places, down to nanoseconds. Keep in
/// mind that the value is cast to `f32`, which only carries about seven significant decimal
/// digits.
///
/// ## Examples
///
/// ```
///     use rumtk_core::{rumtk_async_sleep, rumtk_exec_task};
///     use rumtk_core::core::RUMResult;
///     rumtk_exec_task!( async || -> RUMResult<()> {
///             rumtk_async_sleep!(1).await;           // Sleeps for 1 second.
///             rumtk_async_sleep!(0.001).await;       // Sleeps for 1 millisecond
///             rumtk_async_sleep!(0.000001).await;    // Sleeps for 1 microsecond
///             rumtk_async_sleep!(0.000000001).await; // Sleeps for 1 nanosecond
///             Ok(())
///         }
///     );
/// ```
///
#[macro_export]
macro_rules! rumtk_async_sleep {
    ( $dur:expr ) => {
        $crate::threading::threading_functions::async_sleep($dur as f32)
    };
}
508}