1use std::sync::{Arc, mpsc, Mutex};
2use std::thread;
3use std::thread::JoinHandle;
4use json::{JsonValue};
5use log::warn;
6
7type Job = Box<dyn FnOnce(usize) -> JsonValue + 'static + Send>;
8
9enum Message {
10 End,
11 NewJob(Job),
12}
13
14
15struct Worker {
16 _id: usize,
17 t: Option<JoinHandle<Vec<JsonValue>>>,
18}
19
20impl Worker {
21 fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {
22 let t = thread::spawn(move || {
23 let mut list = vec![];
24 let mut count_flag: i64 = 0;
26 loop {
27 let message = match receiver.lock() {
29 Ok(guard) => match guard.recv() {
30 Ok(msg) => msg,
31 Err(e) => {
32 warn!("Worker {} 接收消息失败: {}", id, e);
33 break;
34 }
35 }
36 Err(e) => {
37 warn!("Worker {} 获取锁失败: {}", id, e);
38 break;
39 }
40 };
41
42 match message {
43 Message::NewJob(job) => {
44 list.push(job(id));
45 count_flag += 1;
46 if count_flag == 10000 {
47 warn!("Worker {} 循环次数: 1w,强制退出", id);
48 break;
49 }
50 }
51 Message::End => {
52 break;
53 }
54 }
55 }
56 list
57 });
58 Worker {
59 _id: id,
60 t: Some(t),
61 }
62 }
63}
64
65pub struct Pool {
66 workers: Vec<Worker>,
67 max_workers: usize,
68 sender: mpsc::Sender<Message>,
69}
70
71impl Pool {
72 pub fn new(max_workers: usize) -> Pool {
73 if max_workers == 0 {
74 println!("max_workers 必须大于0")
75 }
76 let (tx, rx) = mpsc::channel();
77 let mut workers = Vec::with_capacity(max_workers);
78 let receiver = Arc::new(Mutex::new(rx));
79 for i in 0..max_workers {
80 workers.push(Worker::new(i, Arc::clone(&receiver)));
81 }
82 Pool { workers, max_workers, sender: tx }
83 }
84 pub fn execute<F>(&self, f: F)
85 where
86 F: 'static + Send + FnOnce(usize) -> JsonValue,
87 {
88 let job = Message::NewJob(Box::new(f));
89 if let Err(e) = self.sender.send(job) {
90 warn!("发送任务失败: {}", e);
91 }
92 }
93 pub fn end(&mut self) -> JsonValue {
94 for _ in 0..self.max_workers {
96 if let Err(e) = self.sender.send(Message::End) {
97 warn!("发送结束消息失败: {}", e);
98 }
99 }
100 let mut list = Vec::new();
101 for w in self.workers.iter_mut() {
102 if let Some(t) = w.t.take() {
103 match t.join() {
104 Ok(data) => {
105 list.extend(data);
106 }
107 Err(e) => {
108 warn!("线程连接失败: {:?}", e);
109 }
110 }
111 }
112 }
113 JsonValue::from(list)
114 }
115 pub fn insert_all(&mut self) -> (Vec<String>, String) {
116 for _ in 0..self.max_workers {
117 if let Err(e) = self.sender.send(Message::End) {
118 warn!("发送结束消息失败: {}", e);
119 }
120 }
121 let mut id = Vec::new();
123 let mut list_parts = Vec::new();
124
125 for w in self.workers.iter_mut() {
126 if let Some(t) = w.t.take() {
127 match t.join() {
128 Ok(data) => {
129 for item in data.iter() {
130 id.push(item[0].to_string());
131 list_parts.push(item[1].to_string());
132 }
133 }
134 Err(e) => {
135 warn!("线程连接失败: {:?}", e);
136 }
137 }
138 }
139 }
140 let list = list_parts.join(",");
142 (id, list)
143 }
144}