dager/
executor.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5
6use crossbeam_channel::{Receiver, Sender, unbounded};
7use num_cpus;
8use log::{error, info};
9use std::{sync::{Arc, RwLock}, thread::JoinHandle, time::{Instant, Duration}};
10
11
/// Errors reported by the executor API.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ExecutorError {
    /// The task could not be handed over to the worker queue.
    CouldNotSendTask,
}

impl std::fmt::Display for ExecutorError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            ExecutorError::CouldNotSendTask => {
                f.write_str("could not send task to the executor queue")
            }
        }
    }
}

// Lets callers use `?` into `Box<dyn Error>` and similar error containers.
impl std::error::Error for ExecutorError {}
16
/// Upper bound, in milliseconds, on how long a worker thread blocks waiting
/// for a task before it re-checks its stop signal. `Executor::shutdown` polls
/// the queue at the same interval.
pub const TASK_FETCHING_TIMEOUT: u64 = 100;
19
/// Handle to one spawned worker thread.
///
/// Dropping the handle raises the stop signal and joins the thread.
struct Thread {
    /// Index of the worker; only used in log messages.
    id: usize,
    /// Join handle of the spawned thread. It lives in an `Option` so that it
    /// can be taken out and joined from `drop`, which only has `&mut self`.
    handle: Option<JoinHandle<()>>,
    /// Flag shared with the worker loop; `true` asks the worker to exit.
    stop_sig: Arc<RwLock<bool>>,
}
27
28impl Thread{
29    ///Creates a new self handling thread
30    fn new(id: usize, task_receiver: Receiver<Task>) -> Self{
31	//Stop signal that gets set to true if the thread should exit.
32	let stop_sig = Arc::new(RwLock::new(false));
33	let stpsig = stop_sig.clone();
34
35	//The worker thread 
36	let handle = std::thread::spawn(move ||{
37
38	    while !*stpsig.read().unwrap(){
39
40		//Try to get a new task within some time, otherwise check stopping criteria again.
41		if let Ok(t) = task_receiver.recv_timeout(Duration::from_millis(TASK_FETCHING_TIMEOUT)){
42		    info!("Exec task in [{}]", id);
43		    t.execute();
44		}
45	    }
46	    info!("Exit thread[{}]", id);
47	});
48
49	Thread{
50	    id,
51	    handle: Some(handle),
52	    stop_sig
53	}
54    }
55}
56
57impl Drop for Thread{
58    fn drop(&mut self){
59	info!("Dropping thread[{}]", self.id);
60	//Send stop signal
61	*self.stop_sig.write().unwrap() = true;
62	if let Some(hdl) = self.handle.take(){
63	    if let Err(e) = hdl.join(){
64		error!("Failed to join worker thread[{}], might be dangling now! Err: {:?}", self.id, e);
65	    }   
66	}
67    }
68}
69
70
71///Gets all the jobs an schedules them onto its inner threads. Is save to be cloned since it is only a managing reference to the threads.
72///
73/// When dropped, all executing jobs are finshed. However, jobs that are in line are not started. So this obj needs
74/// to be kept alive as long as you want to execute any jobs.
75pub struct Executor{
76    #[allow(dead_code)] //Needed to keep the threads alive.
77    threads: Vec<Thread>,
78    task_sender: Sender<Task>,
79    //used to check if all tasks are out of the channel.
80    task_receiver: Receiver<Task>,
81}
82
83impl Executor{
84    pub fn new() -> Arc<Self>{
85	let (task_sender, task_receiver) = unbounded();
86	//Get number of cpus, depending on that spawn an amount of worker threads
87	let threads: Vec<_> = (0..num_cpus::get()).into_iter().map(|i| Thread::new(i, task_receiver.clone())).collect();
88	info!("Created {} threads for executor", threads.len());
89	Arc::new(Executor{
90	    task_sender,
91	    threads,
92	    task_receiver
93	})
94    }
95
96    pub fn new_with_threads(num_threads: usize) -> Arc<Self>{
97	let (task_sender, task_receiver) = unbounded();
98	let threads: Vec<_> = (0..num_threads).into_iter().map(|i| Thread::new(i, task_receiver.clone())).collect();
99	info!("Created {} threads for executor based on input", threads.len());
100	Arc::new(Executor{
101	    task_sender,
102	    threads,
103	    task_receiver
104	})
105    }
106
107    ///Puts this task into the scheduler queue. The executor works via FIFO principle. 
108    pub fn schedule(&self, new_task: Task) -> Result<(), ExecutorError>{
109	if let Err(e) = self.task_sender.try_send(new_task){
110	    error!("Failed to schedule task: Could not send: {}", e);
111	    Err(ExecutorError::CouldNotSendTask)
112	}else{
113	    Ok(())
114	}
115    }
116
117    ///Graceful shutdown of the executor, by waiting (and blocking) for all tasks to finish. If you don't need that, just drop the executor, which will kill all
118    /// threads as fast as possible.
119    pub fn shutdown(&self){
120	while self.task_receiver.len() > 0{
121	    std::thread::sleep(Duration::from_millis(TASK_FETCHING_TIMEOUT));
122	}
123    }
124}
125
/// One job for the executor: a boxed closure that can be run at most once.
pub struct Task {
    /// The closure that is invoked by `execute`.
    task: Box<dyn FnOnce() + Send>,
}

impl Task {
    /// Wraps `task` so that it can be passed to the executor.
    pub fn new<T>(task: T) -> Self
    where
        T: FnOnce() + Send + 'static,
    {
        let boxed: Box<dyn FnOnce() + Send> = Box::new(task);
        Self { task: boxed }
    }

    /// Consumes the task and runs its closure on the calling thread.
    pub fn execute(self) {
        let Task { task: job } = self;
        job()
    }
}
143
144#[allow(dead_code)]
145fn sec_task(secs: u64) -> Task{
146    Task::new(move || {
147	let start = Instant::now();
148	while start.elapsed() < Duration::from_secs(secs){
149	    std::thread::sleep(Duration::from_millis(100));
150	}
151    })
152}
153
/// Schedules a handful of sleeping tasks and checks that every one of them
/// has run to completion once the executor is shut down and dropped.
#[test]
fn test_executor() {
    use std::sync::atomic::{AtomicUsize, Ordering};

    let ex = Executor::new();
    let finished = Arc::new(AtomicUsize::new(0));

    let durations: [u64; 4] = [1, 2, 3, 1];
    for &secs in durations.iter() {
        let finished = Arc::clone(&finished);
        let task = Task::new(move || {
            sec_task(secs).execute();
            finished.fetch_add(1, Ordering::SeqCst);
        });
        assert_eq!(ex.schedule(task), Ok(()));
    }

    // Returns once the queue is empty, i.e. every task has been picked up.
    ex.shutdown();
    // This is the only `Arc`, so dropping it drops the executor, which joins
    // the workers and thereby waits for the tasks that are still running.
    drop(ex);

    assert_eq!(finished.load(Ordering::SeqCst), durations.len());
}