pub mod error;
pub mod spawn_policy;
mod pipe;
use std::thread::{ JoinHandle, Builder };
use error::TaskQueueError;
use pipe::Sender;
use pipe::Reciver;
use pipe::ReciverHandle;
use spawn_policy::SpawnPolicy;
use spawn_policy::StaticSpawnPolicy;
pub struct TaskQueue {
sender: Sender<Message>,
policy: Box<SpawnPolicy>,
min_threads: usize,
max_threads: usize,
threads: Vec<ThreadInfo>,
last_thread_id: i64
}
impl TaskQueue {
pub fn new() -> TaskQueue {
TaskQueue::with_threads(10, 10)
}
pub fn with_threads(min: usize, max: usize) -> TaskQueue {
TaskQueue {
sender: Sender::<Message>::new(),
policy: Box::new(StaticSpawnPolicy::new()),
min_threads: min,
max_threads: max,
threads: Vec::new(),
last_thread_id: 0
}
}
pub fn enqueue<F>(&mut self, f: F) -> Result<(), TaskQueueError> where F: Fn() + Send + 'static, {
let task = Task { value: Box::new(f) };
self.sender.put(Message::Task(task));
let count = self.policy.get_count(self);
let mut runned = self.threads.len();
if count > runned {
for _ in runned..count {
let info = try!(self.build_and_run());
self.threads.push(info);
}
} else {
loop {
if runned == count {
break;
}
let info = self.threads.remove(0);
self.sender.put_for(info.reciver, Message::CloseThread);
runned -= 1;
}
}
Ok(())
}
fn build_and_run(&mut self) -> Result<ThreadInfo, TaskQueueError> {
self.last_thread_id += 1;
let name = format!("TaskQueue::thread {}", self.last_thread_id);
let reciver = self.sender.create_reciver();
let reciver_handle = reciver.handle();
let handle = try!(Builder::new()
.name(name)
.spawn(move || TaskQueue::thread_update(reciver)));
Ok(ThreadInfo::new(reciver_handle, handle))
}
fn thread_update(reciver: Reciver<Message>) {
loop {
let message = reciver.get();
match message {
Message::Task(t) => t.run(),
Message::CloseThread => return,
}
}
}
pub fn stop(self) -> Result<Vec<Task>, TaskQueueError> {
for info in &self.threads {
self.sender.put_for(info.reciver.clone(), Message::CloseThread);
}
for info in self.threads {
if let Err(_) = info.handle.join() {
return Err(TaskQueueError::Join);
}
}
let not_executed = self.sender.cancel_all();
let mut result = Vec::<Task>::new();
for m in not_executed {
let task = match m {
Message::Task(t) => t,
Message::CloseThread => panic!("This should never happen")
};
result.push(task);
}
Ok(result)
}
pub fn get_threads_count(&self) -> usize {
self.threads.len()
}
pub fn get_max_threads(&self) -> usize {
self.max_threads
}
pub fn get_min_threads(&self) -> usize {
self.min_threads
}
pub fn set_spawn_policy(&mut self, policy: Box<SpawnPolicy>) {
self.policy = policy;
}
}
struct ThreadInfo {
reciver: ReciverHandle,
handle: JoinHandle<()>,
}
impl ThreadInfo {
fn new(reciver: ReciverHandle, handle: JoinHandle<()>) -> ThreadInfo {
ThreadInfo {
reciver: reciver,
handle: handle
}
}
}
enum Message {
Task(Task),
CloseThread,
}
pub struct Task {
value: Box<Fn() + Send + 'static>,
}
impl Task {
pub fn run(&self) {
(self.value)();
}
}