rumtk_core/queue.rs

/*
 * rumtk attempts to implement HL7 and medical protocols for interoperability in medicine.
 * This toolkit aims to be reliable, simple, performant, and standards compliant.
 * Copyright (C) 2025  Luis M. Santos, M.D.
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
 */
pub mod queue {
    use std::future::Future;
    use std::sync::Mutex;
    use std::time::Duration;
    use std::thread::sleep;
    use crate::core::RUMResult;
    use crate::threading;
    pub use crate::threading::thread_primitives::*;

    /// Interval at which [`TaskQueue::wait`] polls the main queue for completion.
    pub const DEFAULT_SLEEP_DURATION: Duration = Duration::from_millis(1);
    /// Default capacity pre-allocated for the main task handle queue.
    pub const DEFAULT_QUEUE_CAPACITY: usize = 10;
    /// Default capacity reserved for each worker's microtask queue.
    pub const DEFAULT_MICROTASK_QUEUE_CAPACITY: usize = 5;


    /// A task queue that dispatches queued tasks onto a [`ThreadPool`] and keeps their
    /// async handles so results can be gathered once all tasks complete.
    pub struct TaskQueue<R> {
        tasks: AsyncTaskHandles<R>,
        threads: ThreadPool,
    }

    impl<R> TaskQueue<R>
    where
        R: Send + Clone + 'static,
    {
        ///
        /// This method creates a [`TaskQueue`] instance using sensible defaults.
        ///
        /// The number of worker threads is derived from the number of cores present in the system.
        ///
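        /// A minimal usage sketch (the result type `String` here is only an illustration;
        /// any `R: Send + Clone + 'static` works):
        ///
        /// ```ignore
        /// let mut queue: TaskQueue<String> = TaskQueue::default()?;
        /// ```
        ///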
        pub fn default() -> RUMResult<TaskQueue<R>> {
            Self::new(threading::threading_functions::get_default_system_thread_count())
        }

        ///
        /// Creates a [`TaskQueue<R>`] backed by a [`ThreadPool`] with `worker_num` worker threads.
        ///
        /// The main task handle queue is pre-allocated with a capacity of [`DEFAULT_QUEUE_CAPACITY`].
        ///
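        /// A minimal usage sketch (`4` is an arbitrary worker count chosen for illustration):
        ///
        /// ```ignore
        /// let mut queue: TaskQueue<String> = TaskQueue::new(4)?;
        /// ```
        ///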
        pub fn new(worker_num: usize) -> RUMResult<TaskQueue<R>> {
            let tasks = AsyncTaskHandles::with_capacity(DEFAULT_QUEUE_CAPACITY);
            let threads = ThreadPool::new(worker_num)?;
            Ok(TaskQueue{tasks, threads})
        }

        ///
        /// Adds a task to the processing queue. You provide a processor function and a list of
        /// arguments; one of the worker threads will pick the task up for processing.
        ///
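        /// A usage sketch; the exact construction of the processor and its arguments depends on
        /// the [`TaskProcessor<T, R>`] and [`SafeTaskArgs<T>`] definitions in
        /// `thread_primitives`, so `processor` and `args` below are placeholders:
        ///
        /// ```ignore
        /// queue.add_task(processor, args);
        /// ```
        ///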
        pub fn add_task<T: Send + Sync + Clone + 'static>(&mut self, processor: TaskProcessor<T, R>, args: SafeTaskArgs<T>) {
            let task = Task::new(processor, args);
            let safe_task = SafeTask::new(Mutex::new(task));
            self.tasks.push(self.threads.execute(safe_task));
        }

        ///
        /// This method waits until all queued tasks have been processed from the main queue.
        ///
        /// We poll the status of the main queue every [`DEFAULT_SLEEP_DURATION`].
        ///
        /// Upon completion, we:
        ///
        /// 1. Collect the results generated (if any).
        /// 2. Reset the internal task queue state.
        /// 3. Return the list of results ([`TaskResults<R>`]).
        ///
        /// ### Note
        ///
        /// Results are not guaranteed to be returned in the order in which the tasks were
        /// queued. If ordering matters, use a result type `R` that carries its own id or some
        /// other key that lets you re-sort the results.
        ///
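        /// A minimal usage sketch, continuing from a queue with tasks already added:
        ///
        /// ```ignore
        /// let results: TaskResults<String> = queue.wait();
        /// ```
        ///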
        pub fn wait(&mut self) -> TaskResults<R> {
            while !self.is_completed() {
                sleep(DEFAULT_SLEEP_DURATION);
            }

            let results = self.gather();
            self.reset();
            results
        }

        ///
        /// Check if all work has been completed from the task queue.
        ///
        /// This implementation is branchless.
        ///
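        /// A minimal polling sketch mirroring what [`Self::wait`] does internally:
        ///
        /// ```ignore
        /// while !queue.is_completed() {
        ///     std::thread::sleep(DEFAULT_SLEEP_DURATION);
        /// }
        /// ```
        ///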
        pub fn is_completed(&self) -> bool {
            let mut accumulator: usize = 0;
            for task in self.tasks.iter() {
                accumulator += task.is_finished() as usize;
            }
            // All tasks are done once every handle reports finished; this also holds
            // trivially for an empty queue.
            accumulator >= self.tasks.len()
        }

        ///
        /// Reset the internal task queue state.
        ///
        pub fn reset(&mut self) {
            self.tasks.clear();
        }

        fn gather(&mut self) -> TaskResults<R> {
            let mut result_queue = TaskResults::<R>::with_capacity(self.tasks.len());
            // Drain the handle queue, resolving each task into its result.
            while let Some(task) = self.tasks.pop() {
                result_queue.push(self.threads.resolve_task(task));
            }
            result_queue
        }
    }
}

pub mod queue_macros {
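    ///
    /// Convenience macro for constructing a [`crate::queue::queue::TaskQueue`] with the given
    /// number of worker threads.
    ///
    /// A minimal usage sketch (the worker count `4` is arbitrary; the queue's result type is
    /// inferred from how the returned queue is used):
    ///
    /// ```ignore
    /// let queue = rumtk_new_task_queue!(4)?; // yields a TaskQueue once the RUMResult is unwrapped
    /// ```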
    #[macro_export]
    macro_rules! rumtk_new_task_queue {
        ( $worker_num:expr ) => {{
            use $crate::queue::queue::TaskQueue;
            // No trailing semicolon: the block evaluates to the construction result, so the
            // caller receives the `RUMResult<TaskQueue<R>>`.
            TaskQueue::new($worker_num)
        }};
    }
}