pub mod queue {
use crate::core::RUMResult;
use crate::threading::thread_primitives::*;
use crate::{rumtk_init_threads, rumtk_resolve_task, rumtk_spawn_task, threading};
use std::future::Future;
use std::thread::sleep;
use std::time::Duration;
/// Pause between successive completion checks while `TaskQueue::wait` polls its tasks.
pub const DEFAULT_SLEEP_DURATION: Duration = Duration::from_millis(1);
/// Initial capacity reserved for the task-handle collection in `TaskQueue::new`.
pub const DEFAULT_QUEUE_CAPACITY: usize = 10;
/// Capacity constant for microtask queues.
/// NOTE(review): not referenced anywhere in this section; confirm the intended consumer.
pub const DEFAULT_MICROTASK_QUEUE_CAPACITY: usize = 5;
/// A batch of spawned asynchronous tasks that are tracked and resolved together.
///
/// `R` is the type parameter of the `TaskResult<R>` each queued future produces.
pub struct TaskQueue<R> {
/// Handles of the tasks spawned through `add_task` that have not been gathered yet.
tasks: AsyncTaskHandles<R>,
/// Runtime obtained from `rumtk_init_threads!`; tasks are spawned on and resolved with it.
runtime: &'static SafeTokioRuntime,
}
impl<R> TaskQueue<R>
where
R: Sync + Send + Clone + 'static,
{
/// Creates a queue using the worker count reported by
/// `threading::threading_functions::get_default_system_thread_count()`.
///
/// Equivalent to calling [`TaskQueue::new`] with that count.
pub fn default() -> RUMResult<TaskQueue<R>> {
    let system_threads = threading::threading_functions::get_default_system_thread_count();
    Self::new(&system_threads)
}
/// Creates an empty queue.
///
/// `worker_num` is forwarded to `rumtk_init_threads!`, which supplies the runtime the
/// queue spawns its tasks on. The handle list starts with room for
/// `DEFAULT_QUEUE_CAPACITY` entries.
pub fn new(worker_num: &usize) -> RUMResult<TaskQueue<R>> {
    // Field order matters here: the handle list is built before the runtime is requested.
    let queue = TaskQueue {
        tasks: AsyncTaskHandles::with_capacity(DEFAULT_QUEUE_CAPACITY),
        runtime: rumtk_init_threads!(&worker_num),
    };
    Ok(queue)
}
/// Spawns `task` on the queue's runtime via `rumtk_spawn_task!` and records the
/// resulting handle so that `wait` can later collect its result.
pub fn add_task<F>(&mut self, task: F)
where
    F: Future<Output = TaskResult<R>> + Send + Sync + 'static,
    F::Output: Send + 'static,
{
    self.tasks.push(rumtk_spawn_task!(&self.runtime, task));
}
/// Blocks the calling thread until every queued task has finished, then returns the
/// gathered results and leaves the queue empty.
///
/// Completion is polled, sleeping `DEFAULT_SLEEP_DURATION` between checks.
///
/// Calling this on a queue with no tasks returns an empty result set immediately.
/// `is_completed` reports `false` for an empty queue, so polling it without this guard
/// would never terminate.
///
/// # Panics
///
/// Panics if resolving any task fails, because `gather` unwraps each resolved task.
pub fn wait(&mut self) -> TaskResults<R> {
    // Only poll when there is something to wait for; see the note on empty queues above.
    while !self.tasks.is_empty() && !self.is_completed() {
        sleep(DEFAULT_SLEEP_DURATION);
    }
    let results = self.gather();
    self.reset();
    results
}
/// Reports whether every queued task has finished.
///
/// Returns `false` when the queue holds no tasks at all.
pub fn is_completed(&self) -> bool {
    if self.tasks.is_empty() {
        return false;
    }
    // Every handle is inspected; the queue is complete only when all of them are finished.
    let finished = self
        .tasks
        .iter()
        .filter(|handle| handle.is_finished())
        .count();
    finished == self.tasks.len()
}
/// Removes every stored task handle from the queue.
///
/// `wait` calls this after it has gathered the results.
pub fn reset(&mut self) {
self.tasks.clear();
}
/// Drains the stored handles and resolves each one on the queue's runtime.
///
/// Handles are popped from the back, so the results come out in reverse order of
/// insertion. The handle list is empty afterwards.
///
/// # Panics
///
/// Panics if `rumtk_resolve_task!` yields an error for any task, since the resolved
/// value is unwrapped.
fn gather(&mut self) -> TaskResults<R> {
    let mut collected = TaskResults::<R>::with_capacity(self.tasks.len());
    while let Some(handle) = self.tasks.pop() {
        let resolved = rumtk_resolve_task!(&self.runtime, handle).unwrap();
        collected.push(resolved);
    }
    collected
}
}
}
pub mod queue_macros {
/// Creates a new `TaskQueue` by calling `TaskQueue::new` with the given worker count
/// expression and evaluates to that call's result.
///
/// `$worker_num` is passed through unchanged, so it must be whatever `TaskQueue::new`
/// accepts (a `&usize`).
#[macro_export]
macro_rules! rumtk_new_task_queue {
    ( $worker_num:expr ) => {{
        use $crate::queue::queue::TaskQueue;
        // No trailing semicolon: the block must evaluate to the constructor's result
        // rather than `()`, otherwise the freshly built queue is discarded.
        TaskQueue::new($worker_num)
    }};
}
}