use crossbeam_channel::{Receiver, Sender, unbounded};
use num_cpus;
use log::{error, info};
use std::{sync::{Arc, RwLock}, thread::JoinHandle, time::{Instant, Duration}};
/// Errors reported by the executor.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ExecutorError {
    /// The task could not be handed to the task channel.
    CouldNotSendTask,
}

impl std::fmt::Display for ExecutorError {
    /// Writes a short, human-readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Exhaustive on purpose: adding a variant must force a message here.
        match self {
            ExecutorError::CouldNotSendTask => {
                f.write_str("could not send task to the executor queue")
            }
        }
    }
}

// Lets callers propagate the error with `?` into `Box<dyn Error>` and similar.
impl std::error::Error for ExecutorError {}
/// Polling interval in milliseconds.
///
/// Worker threads wait at most this long for a task before re-checking their
/// stop flag, and `Executor::shutdown` sleeps this long between checks of the
/// queue length.
pub const TASK_FETCHING_TIMEOUT: u64 = 100;
/// Bookkeeping for one worker thread of the executor.
struct Thread {
    /// Index of the worker; only used to label log messages.
    id: usize,
    /// Join handle of the spawned worker. Wrapped in `Option` so that `Drop`
    /// can take ownership of it in order to join the thread.
    handle: Option<JoinHandle<()>>,
    /// Flag shared with the worker loop; once it is `true` the worker stops
    /// fetching tasks and exits.
    stop_sig: Arc<RwLock<bool>>,
}
impl Thread {
    /// Spawns a worker thread that executes tasks pulled from `task_receiver`.
    ///
    /// The worker waits up to [`TASK_FETCHING_TIMEOUT`] milliseconds for a
    /// task, runs it on its own thread, and re-checks the stop flag between
    /// waits. It exits when the stop flag has been set or when the channel is
    /// disconnected.
    ///
    /// # Panics
    ///
    /// The worker thread (not the caller) panics if the stop flag's lock is
    /// poisoned. A panic inside a task is not caught and also ends the worker.
    fn new(id: usize, task_receiver: Receiver<Task>) -> Self {
        let stop_sig = Arc::new(RwLock::new(false));
        let worker_stop = Arc::clone(&stop_sig);
        let handle = std::thread::spawn(move || {
            let poll_interval = Duration::from_millis(TASK_FETCHING_TIMEOUT);
            // The read guard is a temporary of the condition, so the lock is
            // released again before a task starts running.
            while !*worker_stop.read().unwrap() {
                match task_receiver.recv_timeout(poll_interval) {
                    Ok(task) => {
                        info!("Exec task in [{}]", id);
                        task.execute();
                    }
                    // Every sender is gone and the queue is drained: no task
                    // can ever arrive again. Without this exit the call would
                    // return immediately each time and the loop would spin at
                    // full speed until the stop flag is set.
                    Err(err) if err.is_disconnected() => break,
                    // Plain timeout: loop around to re-check the stop flag.
                    Err(_) => {}
                }
            }
            info!("Exit thread[{}]", id);
        });
        Thread {
            id,
            handle: Some(handle),
            stop_sig,
        }
    }
}
impl Drop for Thread {
    /// Asks the worker to stop and blocks until its thread has finished.
    ///
    /// # Panics
    ///
    /// Panics if the stop flag's lock is poisoned.
    fn drop(&mut self) {
        info!("Dropping thread[{}]", self.id);
        {
            // Scoped so the write lock is released before joining; the worker
            // needs a read lock to observe the flag.
            let mut stop_requested = self.stop_sig.write().unwrap();
            *stop_requested = true;
        }
        let join_outcome = match self.handle.take() {
            Some(worker) => worker.join(),
            // Already joined (or never started): nothing left to wait for.
            None => return,
        };
        if let Err(e) = join_outcome {
            error!("Failed to join worker thread[{}], might be dangling now! Err: {:?}", self.id, e);
        }
    }
}
/// A pool of worker threads that all pull tasks from one shared channel.
///
/// Struct fields are dropped in declaration order, so the workers are stopped
/// and joined (through `Thread`'s `Drop`) before the channel ends are released.
pub struct Executor {
    /// The workers. Held so they live as long as the executor; `dead_code` is
    /// allowed because the field may never be read after construction.
    #[allow(dead_code)]
    threads: Vec<Thread>,
    /// Sending end used by `schedule` to enqueue tasks.
    task_sender: Sender<Task>,
    /// Receiving end; the workers get clones of it and `shutdown` uses it to
    /// inspect the queue length.
    task_receiver: Receiver<Task>,
}
impl Executor {
    /// Creates an executor with as many worker threads as `num_cpus::get()`
    /// reports.
    pub fn new() -> Arc<Self> {
        let executor = Self::spawn_workers(num_cpus::get());
        info!("Created {} threads for executor", executor.threads.len());
        Arc::new(executor)
    }

    /// Creates an executor with `num_threads` worker threads.
    ///
    /// A request for zero threads is raised to one: with no workers, scheduled
    /// tasks would never run and [`Executor::shutdown`] would wait forever.
    pub fn new_with_threads(num_threads: usize) -> Arc<Self> {
        let executor = Self::spawn_workers(num_threads.max(1));
        info!("Created {} threads for executor based on input", executor.threads.len());
        Arc::new(executor)
    }

    /// Builds the task channel and spawns `num_threads` workers listening on it.
    fn spawn_workers(num_threads: usize) -> Self {
        let (task_sender, task_receiver) = unbounded();
        let threads = (0..num_threads)
            .map(|id| Thread::new(id, task_receiver.clone()))
            .collect();
        Executor {
            threads,
            task_sender,
            task_receiver,
        }
    }

    /// Queues `new_task` for execution on one of the worker threads.
    ///
    /// # Errors
    ///
    /// Returns [`ExecutorError::CouldNotSendTask`] (after logging the cause)
    /// if the task could not be placed on the channel.
    pub fn schedule(&self, new_task: Task) -> Result<(), ExecutorError> {
        self.task_sender.try_send(new_task).map_err(|e| {
            error!("Failed to schedule task: Could not send: {}", e);
            ExecutorError::CouldNotSendTask
        })
    }

    /// Blocks until the task queue is empty, polling every
    /// [`TASK_FETCHING_TIMEOUT`] milliseconds.
    ///
    /// This only waits for queued tasks to be picked up. It does not wait for
    /// tasks that are still running and it does not stop the worker threads;
    /// the workers are stopped and joined when the executor is dropped.
    pub fn shutdown(&self) {
        while !self.task_receiver.is_empty() {
            std::thread::sleep(Duration::from_millis(TASK_FETCHING_TIMEOUT));
        }
    }
}
/// A unit of work for the executor: a boxed closure that can be sent to
/// another thread and run exactly once.
pub struct Task {
    task: Box<dyn FnOnce() + Send>,
}

impl Task {
    /// Wraps the closure `task` so it can be queued and executed later.
    pub fn new<T>(task: T) -> Self
    where
        T: FnOnce() + Send + 'static,
    {
        let boxed: Box<dyn FnOnce() + Send> = Box::new(task);
        Self { task: boxed }
    }

    /// Consumes the task and runs its closure on the calling thread.
    pub fn execute(self) {
        let Task { task: job } = self;
        job()
    }
}
/// Builds a task that keeps its worker busy for about `secs` seconds by
/// sleeping in 100 ms slices until that much time has elapsed.
#[allow(dead_code)]
fn sec_task(secs: u64) -> Task {
    let busy_work = move || {
        let started = Instant::now();
        loop {
            if started.elapsed() >= Duration::from_secs(secs) {
                break;
            }
            std::thread::sleep(Duration::from_millis(100));
        }
    };
    Task::new(busy_work)
}
/// Schedules four sleeping tasks, waits for the queue to drain, and verifies
/// that every task ran to completion.
#[test]
fn test_executor() {
    let ex = Executor::new();
    let completed = Arc::new(std::sync::atomic::AtomicUsize::new(0));
    let durations = [1, 2, 3, 1];

    for &secs in durations.iter() {
        let completed = Arc::clone(&completed);
        let task = Task::new(move || {
            sec_task(secs).execute();
            completed.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
        });
        assert_eq!(ex.schedule(task), Ok(()));
    }

    // `shutdown` returns once every task has been picked up by a worker...
    ex.shutdown();
    // ...and dropping the only handle joins the workers, which finish the
    // tasks they are currently running before they exit.
    drop(ex);

    assert_eq!(
        completed.load(std::sync::atomic::Ordering::SeqCst),
        durations.len()
    );
}