1use std::sync::{Arc, mpsc, Mutex};
2use std::thread;
3use std::thread::JoinHandle;
4use json::{JsonValue};
5use log::warn;
6
7type Job = Box<dyn FnOnce(usize) -> JsonValue + 'static + Send>;
8
9enum Message {
10 End,
11 NewJob(Job),
12}
13
14
15struct Worker {
16 _id: usize,
17 t: Option<JoinHandle<Vec<JsonValue>>>,
18}
19
20impl Worker {
21 fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {
22 let t = thread::spawn(move || {
23 let mut list = vec![];
24 let mut count_flag: i64 = 0;
26 loop {
27 let message = receiver.lock().unwrap().recv().unwrap();
28 match message {
29 Message::NewJob(job) => {
30 list.push(job(id));
31 count_flag += 1;
33 if count_flag == 10000 {
34 warn!("Worker-new循环次数: 1w,强制退出");
35 break;
36 }
37 }
38 Message::End => {
39 break;
41 }
42 }
43 }
44 list
45 });
46 Worker {
47 _id: id,
48 t: Some(t),
49 }
50 }
51}
52
53pub struct Pool {
54 workers: Vec<Worker>,
55 max_workers: usize,
56 sender: mpsc::Sender<Message>,
57}
58
59impl Pool {
60 pub fn new(max_workers: usize) -> Pool {
61 if max_workers == 0 {
62 println!("max_workers 必须大于0")
63 }
64 let (tx, rx) = mpsc::channel();
65 let mut workers = Vec::with_capacity(max_workers);
66 let receiver = Arc::new(Mutex::new(rx));
67 for i in 0..max_workers {
68 workers.push(Worker::new(i, Arc::clone(&receiver)));
69 }
70 Pool { workers, max_workers, sender: tx }
71 }
72 pub fn execute<F>(&self, f: F)
73 where
74 F: 'static + Send + FnOnce(usize) -> JsonValue,
75 {
76 let job = Message::NewJob(Box::new(f));
77 self.sender.send(job).unwrap();
78 }
79 pub fn end(&mut self) -> JsonValue {
80 for _ in 0..self.max_workers {
81 self.sender.send(Message::End).unwrap();
82 }
83 let mut list = vec![];
84 for w in self.workers.iter_mut() {
85 if let Some(t) = w.t.take() {
86 let data = t.join().unwrap();
87 list.extend(data);
88 }
89 }
90 JsonValue::from(list)
91 }
92 pub fn insert_all(&mut self) -> (Vec<String>, String) {
93 for _ in 0..self.max_workers {
94 self.sender.send(Message::End).unwrap();
95 }
96 let mut list = String::new();
97 let mut id = vec![];
98 for w in self.workers.iter_mut() {
99 if let Some(t) = w.t.take() {
100 let data = t.join().unwrap();
101 for item in data.iter() {
102 id.push(item[0].to_string());
103 list = format!("{},{}", list, item[1].clone());
104 }
105 }
106 }
107 (id, list)
108 }
109}