1use std::sync::{Arc, mpsc, Mutex};
2use std::thread;
3use std::thread::JoinHandle;
4use json::{JsonValue};
5use log::warn;
6
7type Job = Box<dyn FnOnce(usize) -> JsonValue + 'static + Send>;
8
9enum Message {
10 End,
11 NewJob(Job),
12}
13
14
15struct Worker {
16 _id: usize,
17 t: Option<JoinHandle<Vec<JsonValue>>>,
18}
19
20impl Worker {
21 fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {
22 let t = thread::spawn(move || {
23 let mut list = vec![];
25 let mut count_flag: i64 = 0;
27 loop {
28 let message = receiver.lock().unwrap().recv().unwrap();
29 match message {
30 Message::NewJob(job) => {
31 list.push(job(id));
32 count_flag += 1;
34 if count_flag == 10000 {
35 warn!("Worker-new循环次数: 1w,强制退出");
36 break;
37 }
38 }
39 Message::End => {
40 break;
42 }
43 }
44 }
45 list
46 });
47 Worker {
48 _id: id,
49 t: Some(t),
50 }
51 }
52}
53
54pub struct Pool {
55 workers: Vec<Worker>,
56 max_workers: usize,
57 sender: mpsc::Sender<Message>,
58}
59
60impl Pool {
61 pub fn new(max_workers: usize) -> Pool {
62 if max_workers == 0 {
63 println!("max_workers 必须大于0")
64 }
65 let (tx, rx) = mpsc::channel();
66 let mut workers = Vec::with_capacity(max_workers);
67 let receiver = Arc::new(Mutex::new(rx));
68 for i in 0..max_workers {
69 workers.push(Worker::new(i, Arc::clone(&receiver)));
70 }
71 Pool { workers, max_workers, sender: tx }
72 }
73 pub fn execute<F>(&self, f: F)
74 where
75 F: 'static + Send + FnOnce(usize) -> JsonValue,
76 {
77 let job = Message::NewJob(Box::new(f));
78 self.sender.send(job).unwrap();
79 }
80 pub fn end(&mut self) -> JsonValue {
81 for _ in 0..self.max_workers {
82 self.sender.send(Message::End).unwrap();
83 }
84 let mut list = vec![];
85 for w in self.workers.iter_mut() {
86 if let Some(t) = w.t.take() {
87 let data = t.join().unwrap();
88 list.extend(data);
89 }
90 }
91 JsonValue::from(list)
92 }
93 pub fn insert_all(&mut self) -> (Vec<String>, String) {
94 for _ in 0..self.max_workers {
95 self.sender.send(Message::End).unwrap();
96 }
97 let mut list = String::new();
98 let mut id = vec![];
99 for w in self.workers.iter_mut() {
100 if let Some(t) = w.t.take() {
101 let data = t.join().unwrap();
102 for item in data.iter() {
103 id.push(item[0].to_string());
104 list = format!("{},{}", list, item[1].clone());
105 }
106 }
107 }
108 (id, list)
109 }
110}