rumtk_core/queue.rs
1/*
2 * rumtk attempts to implement HL7 and medical protocols for interoperability in medicine.
3 * This toolkit aims to be reliable, simple, performant, and standards compliant.
4 * Copyright (C) 2025 Luis M. Santos, M.D.
5 * Copyright (C) 2025 MedicalMasses L.L.C.
6 *
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20 */
21pub mod queue {
22 use crate::core::RUMResult;
23 use crate::threading::thread_primitives::*;
24 use crate::{rumtk_init_threads, rumtk_resolve_task, rumtk_spawn_task, threading};
25 use std::future::Future;
26 use std::thread::sleep;
27 use std::time::Duration;
28
29 pub const DEFAULT_SLEEP_DURATION: Duration = Duration::from_millis(1);
30 pub const DEFAULT_QUEUE_CAPACITY: usize = 10;
31 pub const DEFAULT_MICROTASK_QUEUE_CAPACITY: usize = 5;
32
33 pub struct TaskQueue<R> {
34 tasks: AsyncTaskHandles<R>,
35 runtime: SafeTokioRuntime,
36 }
37
38 impl<R> TaskQueue<R>
39 where
40 R: Sync + Send + Clone + 'static,
41 {
42 ///
43 /// This method creates a [`TaskQueue`] instance using sensible defaults.
44 ///
45 /// The `threads` field is computed from the number of cores present in system.
46 ///
47 pub fn default() -> RUMResult<TaskQueue<R>> {
48 Self::new(&threading::threading_functions::get_default_system_thread_count())
49 }
50
51 ///
52 /// Creates an instance of [`ThreadedTaskQueue<T, R>`] in the form of [`SafeThreadedTaskQueue<T, R>`].
53 /// Expects you to provide the count of threads to spawn and the microtask queue size
54 /// allocated by each thread.
55 ///
56 /// This method calls [`Self::with_capacity()`] for the actual object creation.
57 /// The main queue capacity is pre-allocated to [`DEFAULT_QUEUE_CAPACITY`].
58 ///
59 pub fn new(worker_num: &usize) -> RUMResult<TaskQueue<R>> {
60 let tasks = AsyncTaskHandles::with_capacity(DEFAULT_QUEUE_CAPACITY);
61 let runtime = rumtk_init_threads!(&worker_num);
62 Ok(TaskQueue {
63 tasks,
64 runtime: runtime.clone(),
65 })
66 }
67
68 ///
69 /// Add a task to the processing queue. The idea is that you can queue a processor function
70 /// and list of args that will be picked up by one of the threads for processing.
71 ///
72 pub fn add_task<F>(&mut self, task: F)
73 where
74 F: Future<Output = TaskResult<R>> + Send + Sync + 'static,
75 F::Output: Send + 'static,
76 {
77 let handle = rumtk_spawn_task!(&self.runtime, task);
78 self.tasks.push(handle);
79 }
80
81 ///
82 /// This method waits until all queued tasks have been processed from the main queue.
83 ///
84 /// We poll the status of the main queue every [`DEFAULT_SLEEP_DURATION`] ms.
85 ///
86 /// Upon completion,
87 ///
88 /// 1. We collect the results generated (if any).
89 /// 2. We reset the main task and result internal queue states.
90 /// 3. Return the list of results ([`TaskResults<R>`]).
91 ///
92 /// ### Note:
93 /// ```text
94 /// Results returned here are not guaranteed to be in the same order as the order in which
95 /// the tasks were queued for work. You will need to pass a type as T that automatically
96 /// tracks its own id or has a way for you to resort results.
97 /// ```
98 pub fn wait(&mut self) -> TaskResults<R> {
99 while !self.is_completed() {
100 sleep(DEFAULT_SLEEP_DURATION);
101 }
102
103 let results = self.gather();
104 self.reset();
105 results
106 }
107
108 ///
109 /// Check if all work has been completed from the task queue.
110 ///
111 /// This implementation is branchless.
112 ///
113 pub fn is_completed(&self) -> bool {
114 let mut accumulator: usize = 0;
115
116 if self.tasks.is_empty() {
117 return false;
118 }
119
120 for task in self.tasks.iter() {
121 accumulator += task.is_finished() as usize;
122 }
123 (accumulator / self.tasks.len()) > 0
124 }
125
126 ///
127 /// Reset task queue and results queue states.
128 ///
129 pub fn reset(&mut self) {
130 self.tasks.clear();
131 }
132
133 fn gather(&mut self) -> TaskResults<R> {
134 let mut result_queue = TaskResults::<R>::with_capacity(self.tasks.len());
135 for i in 0..self.tasks.len() {
136 let task = self.tasks.pop().unwrap();
137 result_queue.push(rumtk_resolve_task!(&self.runtime, task).unwrap());
138 }
139 result_queue
140 }
141 }
142}
143
144pub mod queue_macros {
145 #[macro_export]
146 macro_rules! rumtk_new_task_queue {
147 ( $worker_num:expr ) => {{
148 use $crate::queue::queue::TaskQueue;
149 TaskQueue::new($worker_num);
150 }};
151 }
152}