1use crossbeam_channel::{Receiver, Sender, unbounded};
7use num_cpus;
8use log::{error, info};
9use std::{sync::{Arc, RwLock}, thread::JoinHandle, time::{Instant, Duration}};
10
11
12#[derive(Clone, Copy, Debug, PartialEq, Eq)]
13pub enum ExecutorError{
14 CouldNotSendTask
15}
16
17pub const TASK_FETCHING_TIMEOUT: u64 = 100;
19
20struct Thread{
22 id: usize,
23 handle: Option<JoinHandle<()>>,
25 stop_sig: Arc<RwLock<bool>>,
26}
27
28impl Thread{
29 fn new(id: usize, task_receiver: Receiver<Task>) -> Self{
31 let stop_sig = Arc::new(RwLock::new(false));
33 let stpsig = stop_sig.clone();
34
35 let handle = std::thread::spawn(move ||{
37
38 while !*stpsig.read().unwrap(){
39
40 if let Ok(t) = task_receiver.recv_timeout(Duration::from_millis(TASK_FETCHING_TIMEOUT)){
42 info!("Exec task in [{}]", id);
43 t.execute();
44 }
45 }
46 info!("Exit thread[{}]", id);
47 });
48
49 Thread{
50 id,
51 handle: Some(handle),
52 stop_sig
53 }
54 }
55}
56
57impl Drop for Thread{
58 fn drop(&mut self){
59 info!("Dropping thread[{}]", self.id);
60 *self.stop_sig.write().unwrap() = true;
62 if let Some(hdl) = self.handle.take(){
63 if let Err(e) = hdl.join(){
64 error!("Failed to join worker thread[{}], might be dangling now! Err: {:?}", self.id, e);
65 }
66 }
67 }
68}
69
70
71pub struct Executor{
76 #[allow(dead_code)] threads: Vec<Thread>,
78 task_sender: Sender<Task>,
79 task_receiver: Receiver<Task>,
81}
82
83impl Executor{
84 pub fn new() -> Arc<Self>{
85 let (task_sender, task_receiver) = unbounded();
86 let threads: Vec<_> = (0..num_cpus::get()).into_iter().map(|i| Thread::new(i, task_receiver.clone())).collect();
88 info!("Created {} threads for executor", threads.len());
89 Arc::new(Executor{
90 task_sender,
91 threads,
92 task_receiver
93 })
94 }
95
96 pub fn new_with_threads(num_threads: usize) -> Arc<Self>{
97 let (task_sender, task_receiver) = unbounded();
98 let threads: Vec<_> = (0..num_threads).into_iter().map(|i| Thread::new(i, task_receiver.clone())).collect();
99 info!("Created {} threads for executor based on input", threads.len());
100 Arc::new(Executor{
101 task_sender,
102 threads,
103 task_receiver
104 })
105 }
106
107 pub fn schedule(&self, new_task: Task) -> Result<(), ExecutorError>{
109 if let Err(e) = self.task_sender.try_send(new_task){
110 error!("Failed to schedule task: Could not send: {}", e);
111 Err(ExecutorError::CouldNotSendTask)
112 }else{
113 Ok(())
114 }
115 }
116
117 pub fn shutdown(&self){
120 while self.task_receiver.len() > 0{
121 std::thread::sleep(Duration::from_millis(TASK_FETCHING_TIMEOUT));
122 }
123 }
124}
125
126pub struct Task{
128 task: Box<dyn FnOnce() + Send>
130}
131
132impl Task{
133 pub fn new<T>(task: T) -> Self where T: FnOnce() + Send + 'static{
134 Task{
135 task: Box::new(task)
136 }
137 }
138
139 pub fn execute(self){
140 (self.task)()
141 }
142}
143
144#[allow(dead_code)]
145fn sec_task(secs: u64) -> Task{
146 Task::new(move || {
147 let start = Instant::now();
148 while start.elapsed() < Duration::from_secs(secs){
149 std::thread::sleep(Duration::from_millis(100));
150 }
151 })
152}
153
154#[test]
155fn test_executor(){
156 let ex = Executor::new();
157
158 let _t1 = sec_task(1);
159 let _t2 = sec_task(2);
160 let _t3 = sec_task(3);
161 let _t4 = sec_task(1);
162
163 ex.shutdown();
164}