1use std::sync::{Arc, mpsc, Mutex};
2use std::thread;
3use std::thread::JoinHandle;
4use json::{JsonValue};
5
6
7type Job = Box<dyn FnOnce(usize) -> JsonValue + 'static + Send>;
8
9enum Message {
10 End,
11 NewJob(Job),
12}
13
14
15struct Worker {
16 _id: usize,
17 t: Option<JoinHandle<Vec<JsonValue>>>,
18}
19
20impl Worker {
21 fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {
22 let t = thread::spawn(move || {
23 let mut list = vec![];
25 loop {
26 let message = receiver.lock().unwrap().recv().unwrap();
27 match message {
28 Message::NewJob(job) => {
29 list.push(job(id));
30 }
32 Message::End => {
33 break;
35 }
36 }
37 }
38 list
39 });
40 Worker {
41 _id: id,
42 t: Some(t),
43 }
44 }
45}
46
47pub struct Pool {
48 workers: Vec<Worker>,
49 max_workers: usize,
50 sender: mpsc::Sender<Message>,
51}
52
53impl Pool {
54 pub fn new(max_workers: usize) -> Pool {
55 if max_workers <= 0 {
56 panic!("max_workers 必须大于0")
57 }
58 let (tx, rx) = mpsc::channel();
59 let mut workers = Vec::with_capacity(max_workers);
60 let receiver = Arc::new(Mutex::new(rx));
61 for i in 0..max_workers {
62 workers.push(Worker::new(i, Arc::clone(&receiver)));
63 }
64 Pool { workers, max_workers, sender: tx }
65 }
66 pub fn execute<F>(&self, f: F) where F: 'static + Send + FnOnce(usize) -> JsonValue {
67 let job = Message::NewJob(Box::new(f));
68 self.sender.send(job).unwrap();
69 }
70 pub fn end(&mut self) -> JsonValue {
71 for _ in 0..self.max_workers {
72 self.sender.send(Message::End).unwrap();
73 }
74 let mut list = vec![];
75 for w in self.workers.iter_mut() {
76 if let Some(t) = w.t.take() {
77 let data = t.join().unwrap();
78 list.extend(data);
79 }
80 }
81 return JsonValue::from(list);
82 }
83 pub fn insert_all(&mut self) -> (Vec<String>, String) {
84 for _ in 0..self.max_workers {
85 self.sender.send(Message::End).unwrap();
86 }
87 let mut list = String::new();
88 let mut id = vec![];
89 for w in self.workers.iter_mut() {
90 if let Some(t) = w.t.take() {
91 let data = t.join().unwrap();
92 for item in data.iter() {
93 id.push(item[0].to_string());
94 list = format!("{},{}", list, item[1].clone());
95 }
96 }
97 }
98 return (id, list);
99 }
100}