dager 0.1.1

Crate to create and execute a graph of nodes.
Documentation
/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */


use crossbeam_channel::{Receiver, Sender, unbounded};
use num_cpus;
use log::{error, info};
use std::{sync::{Arc, RwLock}, thread::JoinHandle, time::{Instant, Duration}};


///Errors that can occur while handing work to an [`Executor`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ExecutorError{
    ///The task could not be pushed into the executor's task channel.
    CouldNotSendTask
}

impl std::fmt::Display for ExecutorError{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result{
        //Exhaustive match on purpose: adding a variant must force a decision about its message.
        match self{
            ExecutorError::CouldNotSendTask => {
                f.write_str("could not send task to the executor's task queue")
            }
        }
    }
}

//Lets callers use `?` to convert into `Box<dyn std::error::Error>` and similar error types.
impl std::error::Error for ExecutorError{}

///Timeout for in-thread task fetching (in ms).
///
/// A worker waits at most this long for a new task before it re-checks its stop signal.
/// `Executor::shutdown` uses the same value as its polling interval.
pub const TASK_FETCHING_TIMEOUT: u64 = 100;

///A running thread instance that is able to join itself when dropped.
///
/// The `Drop` implementation raises `stop_sig` and then joins `handle`.
struct Thread{
    //Index passed to the constructor; only used to tag log messages.
    id: usize,
    //Handle to this spawned thread. Kept in an `Option` so that `drop` can move it out for joining.
    handle: Option<JoinHandle<()>>,
    //Flag shared with the worker loop; the loop exits once it reads `true`.
    stop_sig: Arc<RwLock<bool>>,
}

impl Thread{
    ///Spawns a worker thread that pulls tasks from `task_receiver` and executes them.
    ///
    /// `id` is only used to tag log output. The worker waits for new tasks in slices of
    /// `TASK_FETCHING_TIMEOUT` milliseconds and checks the stop flag between slices, so a raised
    /// flag is noticed once the current task has finished or the current wait has ended.
    fn new(id: usize, task_receiver: Receiver<Task>) -> Self{
        //Flag that is flipped to `true` when the worker should leave its loop.
        let stop_sig = Arc::new(RwLock::new(false));
        let worker_flag = Arc::clone(&stop_sig);
        let poll_interval = Duration::from_millis(TASK_FETCHING_TIMEOUT);

        //The worker thread
        let handle = std::thread::spawn(move || {
            loop {
                //Copy the flag out so the read guard is released before blocking on the channel.
                let stop_requested = *worker_flag.read().unwrap();
                if stop_requested {
                    break;
                }

                //Wait a bounded time for work; if nothing arrives, go back and re-check the flag.
                match task_receiver.recv_timeout(poll_interval) {
                    Ok(task) => {
                        info!("Exec task in [{}]", id);
                        task.execute();
                    }
                    Err(_) => continue,
                }
            }
            info!("Exit thread[{}]", id);
        });

        Thread{ id, handle: Some(handle), stop_sig }
    }
}

impl Drop for Thread{
    ///Raises the stop flag and then blocks until the worker thread has exited.
    fn drop(&mut self){
        info!("Dropping thread[{}]", self.id);

        //Send stop signal. Scoped so the write guard is released before joining;
        //the worker needs a read lock to see the new value.
        {
            let mut stop_requested = self.stop_sig.write().unwrap();
            *stop_requested = true;
        }

        //`take` moves the handle out (leaving `None`) because `join` consumes it.
        let join_outcome = self.handle.take().map(|worker| worker.join());
        if let Some(Err(e)) = join_outcome{
            error!("Failed to join worker thread[{}], might be dangling now! Err: {:?}", self.id, e);
        }
    }
}


///Takes all submitted jobs and schedules them onto its inner worker threads.
///
/// The constructors hand the executor out inside an `Arc`, so that handle can be cloned and shared.
///
/// When dropped, each worker thread is told to stop and is joined: a job that is already executing is finished first,
/// but jobs still waiting in the queue are not guaranteed to be started. So this object needs
/// to be kept alive as long as you want to execute any jobs.
pub struct Executor{
    #[allow(dead_code)] //Needed to keep the threads alive.
    threads: Vec<Thread>,
    //Producer side of the task queue; `schedule` pushes new tasks through it.
    task_sender: Sender<Task>,
    //Used to check if all tasks are out of the channel.
    task_receiver: Receiver<Task>,
}

impl Executor{
    pub fn new() -> Arc<Self>{
	let (task_sender, task_receiver) = unbounded();
	//Get number of cpus, depending on that spawn an amount of worker threads
	let threads: Vec<_> = (0..num_cpus::get()).into_iter().map(|i| Thread::new(i, task_receiver.clone())).collect();
	info!("Created {} threads for executor", threads.len());
	Arc::new(Executor{
	    task_sender,
	    threads,
	    task_receiver
	})
    }

    pub fn new_with_threads(num_threads: usize) -> Arc<Self>{
	let (task_sender, task_receiver) = unbounded();
	let threads: Vec<_> = (0..num_threads).into_iter().map(|i| Thread::new(i, task_receiver.clone())).collect();
	info!("Created {} threads for executor based on input", threads.len());
	Arc::new(Executor{
	    task_sender,
	    threads,
	    task_receiver
	})
    }

    ///Puts this task into the scheduler queue. The executor works via FIFO principle. 
    pub fn schedule(&self, new_task: Task) -> Result<(), ExecutorError>{
	if let Err(e) = self.task_sender.try_send(new_task){
	    error!("Failed to schedule task: Could not send: {}", e);
	    Err(ExecutorError::CouldNotSendTask)
	}else{
	    Ok(())
	}
    }

    ///Graceful shutdown of the executor, by waiting (and blocking) for all tasks to finish. If you don't need that, just drop the executor, which will kill all
    /// threads as fast as possible.
    pub fn shutdown(&self){
	while self.task_receiver.len() > 0{
	    std::thread::sleep(Duration::from_millis(TASK_FETCHING_TIMEOUT));
	}
    }
}

///Wraps one job so that it can be passed to the executor for scheduling.
pub struct Task{
    ///The boxed closure that makes up the job. Being `FnOnce`, it can run at most once.
    task: Box<dyn FnOnce() + Send>
}

impl Task{
    ///Boxes the closure `task` into a new [`Task`].
    ///
    /// The closure has to be `Send + 'static` so that it can be stored as a `Box<dyn FnOnce() + Send>`.
    pub fn new<T: FnOnce() + Send + 'static>(task: T) -> Self{
        let boxed: Box<dyn FnOnce() + Send> = Box::new(task);
        Self{ task: boxed }
    }

    ///Consumes the task and runs its closure on the calling thread.
    pub fn execute(self){
        let Task{ task: job } = self;
        job()
    }
}

///Builds a task that keeps whoever executes it busy for roughly `secs` seconds.
///
/// The task sleeps in steps of 100 ms until at least `secs` seconds have passed since it started executing.
#[allow(dead_code)] //In this file it is only called from the test.
fn sec_task(secs: u64) -> Task{
    let run_for = Duration::from_secs(secs);
    let nap = Duration::from_millis(100);
    Task::new(move || {
        //The clock starts when the task is executed, not when it is created.
        let started = Instant::now();
        loop {
            if started.elapsed() >= run_for {
                break;
            }
            std::thread::sleep(nap);
        }
    })
}

///Schedules four sleeping tasks and checks that every one of them has run once the executor is gone.
#[test]
fn test_executor(){
    let ex = Executor::new();
    //Counts the tasks that ran to completion.
    let finished = Arc::new(std::sync::atomic::AtomicUsize::new(0));

    //Same durations as before, but the tasks are now actually handed to the executor.
    for &secs in [1u64, 2, 3, 1].iter(){
        let finished = Arc::clone(&finished);
        let task = Task::new(move || {
            sec_task(secs).execute();
            finished.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
        });
        assert_eq!(ex.schedule(task), Ok(()));
    }

    //Wait until every task has been picked up by a worker ...
    ex.shutdown();
    //... then drop the only handle, which joins the workers and so waits for the running tasks.
    drop(ex);

    assert_eq!(finished.load(std::sync::atomic::Ordering::SeqCst), 4);
}