pub mod error;
pub mod spawn_policy;
mod pipe;
use std::ops::Index;
use std::thread::{ JoinHandle, Builder };
use std::panic;
use std::panic:: { RefUnwindSafe };
use std::sync::atomic::{ AtomicBool, Ordering };
use std::sync::Arc;
use error::TaskQueueError;
use pipe::Sender;
use pipe::Receiver;
use pipe::ReceiverHandle;
use pipe::Priority;
use spawn_policy::SpawnPolicy;
use spawn_policy::StaticSpawnPolicy;
pub struct TaskQueue {
sender: Sender<Message>,
policy: Box<SpawnPolicy>,
min_threads: usize,
max_threads: usize,
threads: Vec<ThreadInfo>,
closing_threads: Vec<ThreadInfo>
}
impl TaskQueue {
pub fn new() -> Self {
TaskQueue::with_threads(10, 10).expect("10 and 10 satisfy with_threads method validation")
}
pub fn with_threads(min: usize, max: usize) -> Result<Self, TaskQueueError> {
if min <= 0 || max <= 0 || max < min {
return Err(TaskQueueError::illegal_start_threads(min, max));
}
Ok(TaskQueue {
sender: Sender::<Message>::new(),
policy: Box::new(StaticSpawnPolicy::new()),
min_threads: min,
max_threads: max,
threads: Vec::new(),
closing_threads: Vec::new()
})
}
pub fn enqueue<F>(&mut self, f: F) -> Result<(), TaskQueueError> where F: Fn() + Send + 'static, {
let task = Task { value: Box::new(f) };
self.sender.put(Message::Task(task));
let stats = TaskQueueStats::new(self);
let count = self.policy.get_count(stats);
if self.min_threads > count || count > self.max_threads {
return Err(TaskQueueError::illegal_policy_threads(self.min_threads, self.max_threads, count));
}
let mut runned = self.threads.len();
while runned != count {
if runned > count {
let info = self.threads.remove(0);
let receiver = info.receiver.clone();
self.closing_threads.push(info);
self.sender.put_with_priority(Some(receiver), Priority::High, Message::CloseThread);
runned -= 1;
} else {
let info = self.build_and_run()?;
self.threads.push(info);
runned += 1;
}
}
for i in (0..self.closing_threads.len()).rev() {
let is_thread_closed = {
let info = self.closing_threads.index(i);
info.closed.load(Ordering::SeqCst)
};
if is_thread_closed {
self.closing_threads.remove(i);
}
}
Ok(())
}
fn build_and_run(&mut self) -> Result<ThreadInfo, TaskQueueError> {
let receiver = self.sender.create_receiver();
let receiver_handle = receiver.handle();
let name = format!("TaskQueue::thread {}", receiver_handle);
let close_flag = Arc::new(AtomicBool::new(false));
let close_flag_clone = close_flag.clone();
let handle = Builder::new()
.name(name)
.spawn(move || Self::thread_update(close_flag_clone, receiver))?;
Ok(ThreadInfo::new(receiver_handle, handle, close_flag))
}
fn thread_update(close_flag: Arc<AtomicBool>, receiver: Receiver<Message>) {
loop {
let message = receiver.get();
match message {
Message::Task(t) => {
let _ = panic::catch_unwind(|| t.run());
},
Message::CloseThread => {
close_flag.store(true, Ordering::SeqCst);
return;
}
}
}
}
pub fn stop(mut self) -> Vec<JoinHandle<()>> {
self.stop_impl()
}
pub fn stop_wait(mut self) {
let handles = self.stop_impl();
for h in handles {
h.join().expect("Join error");
}
}
fn stop_impl(&mut self) -> Vec<JoinHandle<()>> {
for info in &self.threads {
self.sender.put_with_priority(Some(info.receiver), Priority::Min, Message::CloseThread);
}
self.threads
.drain(..)
.chain(self.closing_threads.drain(..))
.map(|t| t.handle)
.collect()
}
pub fn stop_immediately(mut self) -> Vec<Task> {
for info in &self.threads {
self.sender.put_with_priority(Some(info.receiver), Priority::High, Message::CloseThread);
}
let threads : Vec<ThreadInfo> = self.threads
.drain(..)
.chain(self.closing_threads.drain(..))
.collect();
for info in threads {
info.handle.join().expect("Join error");
}
let not_executed = self.sender.cancel_all();
let mut result = Vec::<Task>::new();
for m in not_executed {
let task = match m {
Message::Task(t) => t,
Message::CloseThread => panic!("This should never happen")
};
result.push(task);
}
result
}
pub fn set_spawn_policy(&mut self, policy: Box<SpawnPolicy>) {
self.policy = policy;
}
pub fn get_threads_count(&self) -> usize {
self.threads.len()
}
pub fn get_threads_max(&self) -> usize {
self.max_threads
}
pub fn get_threads_min(&self) -> usize {
self.min_threads
}
pub fn tasks_count(&self) -> usize {
self.sender.size()
}
}
impl Drop for TaskQueue {
fn drop(&mut self) {
self.stop_impl();
}
}
struct ThreadInfo {
receiver: ReceiverHandle,
handle: JoinHandle<()>,
closed: Arc<AtomicBool>
}
impl ThreadInfo {
fn new(receiver: ReceiverHandle, handle: JoinHandle<()>, close_flag: Arc<AtomicBool>) -> Self {
ThreadInfo {
receiver: receiver,
handle: handle,
closed: close_flag
}
}
}
enum Message {
Task(Task),
CloseThread,
}
pub struct Task {
value: Box<Fn() + Send>,
}
impl Task {
pub fn run(&self) {
(self.value)();
}
}
impl RefUnwindSafe for Task {
}
#[derive(Clone, Copy)]
pub struct TaskQueueStats {
pub threads_count: usize,
pub threads_max: usize,
pub threads_min: usize,
pub tasks_count: usize,
}
impl TaskQueueStats {
fn new(queue: &TaskQueue) -> Self {
TaskQueueStats {
threads_count: queue.get_threads_count(),
threads_max: queue.get_threads_max(),
threads_min: queue.get_threads_min(),
tasks_count: queue.tasks_count(),
}
}
pub fn empty() -> Self {
TaskQueueStats {
threads_count: 0,
threads_max: 0,
threads_min: 0,
tasks_count: 0
}
}
}