1use json::JsonValue;
2use log::warn;
3use std::sync::{mpsc, Arc, Mutex};
4use std::thread;
5use std::thread::JoinHandle;
6
7type Job = Box<dyn FnOnce(usize) -> JsonValue + 'static + Send>;
8
9enum Message {
10 End,
11 NewJob(Job),
12}
13
14struct Worker {
15 _id: usize,
16 t: Option<JoinHandle<Vec<JsonValue>>>,
17}
18
19impl Worker {
20 fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) -> Worker {
21 let t = thread::spawn(move || {
22 let mut list = vec![];
23 let mut count_flag: i64 = 0;
24 loop {
25 let message = match receiver.lock() {
26 Ok(guard) => match guard.recv() {
27 Ok(msg) => msg,
28 Err(_) => break,
29 },
30 Err(_) => break,
31 };
32 match message {
33 Message::NewJob(job) => {
34 list.push(job(id));
35 count_flag += 1;
36 if count_flag == 10000 {
37 warn!("Worker-new循环次数: 1w,强制退出");
38 break;
39 }
40 }
41 Message::End => {
42 break;
43 }
44 }
45 }
46 list
47 });
48 Worker {
49 _id: id,
50 t: Some(t),
51 }
52 }
53}
54
55pub struct Pool {
56 workers: Vec<Worker>,
57 max_workers: usize,
58 sender: mpsc::Sender<Message>,
59}
60
61impl Pool {
62 pub fn new(max_workers: usize) -> Pool {
63 if max_workers == 0 {
64 println!("max_workers 必须大于0")
65 }
66 let (tx, rx) = mpsc::channel();
67 let mut workers = Vec::with_capacity(max_workers);
68 let receiver = Arc::new(Mutex::new(rx));
69 for i in 0..max_workers {
70 workers.push(Worker::new(i, Arc::clone(&receiver)));
71 }
72 Pool {
73 workers,
74 max_workers,
75 sender: tx,
76 }
77 }
78 pub fn execute<F>(&self, f: F)
79 where
80 F: 'static + Send + FnOnce(usize) -> JsonValue,
81 {
82 let job = Message::NewJob(Box::new(f));
83 let _ = self.sender.send(job);
84 }
85 pub fn end(&mut self) -> JsonValue {
86 for _ in 0..self.max_workers {
87 let _ = self.sender.send(Message::End);
88 }
89 let mut list = vec![];
90 for w in self.workers.iter_mut() {
91 if let Some(t) = w.t.take() {
92 if let Ok(data) = t.join() {
93 list.extend(data);
94 }
95 }
96 }
97 JsonValue::from(list)
98 }
99 pub fn insert_all(&mut self) -> (Vec<String>, String) {
100 for _ in 0..self.max_workers {
101 let _ = self.sender.send(Message::End);
102 }
103 let mut list = String::new();
104 let mut id = vec![];
105 for w in self.workers.iter_mut() {
106 if let Some(t) = w.t.take() {
107 if let Ok(data) = t.join() {
108 for item in data.iter() {
109 id.push(item[0].to_string());
110 list = format!("{},{}", list, item[1].clone());
111 }
112 }
113 }
114 }
115 (id, list)
116 }
117}
118
#[cfg(test)]
mod tests {
    use super::*;
    use json::{array, object, JsonValue};
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    /// Builds a pool of `n` workers and checks that both the requested count
    /// and the number of spawned workers equal `n`.
    fn assert_new_pool_has(n: usize) {
        let pool = Pool::new(n);
        assert_eq!(pool.max_workers, n);
        assert_eq!(pool.workers.len(), n);
    }

    #[test]
    fn new_with_1_worker() {
        assert_new_pool_has(1);
    }

    #[test]
    fn new_with_2_workers() {
        assert_new_pool_has(2);
    }

    #[test]
    fn new_with_4_workers() {
        assert_new_pool_has(4);
    }

    #[test]
    fn new_with_0_workers_does_not_panic() {
        assert_new_pool_has(0);
    }

    #[test]
    fn execute_single_job_and_end() {
        let mut pool = Pool::new(1);
        pool.execute(|_worker_id| JsonValue::from(42));

        let output = pool.end();
        assert!(output.is_array());
        assert_eq!(output.len(), 1);
        assert_eq!(output[0], 42);
    }

    #[test]
    fn execute_multiple_jobs_single_worker() {
        let mut pool = Pool::new(1);
        for n in 0..5 {
            pool.execute(move |_| JsonValue::from(n));
        }

        let output = pool.end();
        assert_eq!(output.len(), 5);

        // Sort before comparing so the check does not depend on result order.
        let mut seen: Vec<i32> = output
            .members()
            .map(|value| value.as_i32().unwrap())
            .collect();
        seen.sort();
        assert_eq!(seen, vec![0, 1, 2, 3, 4]);
    }

    #[test]
    fn execute_multiple_jobs_multiple_workers() {
        let mut pool = Pool::new(4);
        let runs = Arc::new(AtomicUsize::new(0));
        for _ in 0..20 {
            let runs_in_job = Arc::clone(&runs);
            pool.execute(move |_| {
                runs_in_job.fetch_add(1, Ordering::SeqCst);
                JsonValue::from(1)
            });
        }

        let output = pool.end();
        assert_eq!(output.len(), 20);
        assert_eq!(runs.load(Ordering::SeqCst), 20);
    }

    #[test]
    fn end_with_no_jobs_returns_empty_array() {
        let mut pool = Pool::new(2);
        let output = pool.end();
        assert!(output.is_array());
        assert_eq!(output.len(), 0);
    }

    #[test]
    fn jobs_receive_valid_worker_id() {
        let mut pool = Pool::new(4);
        for _ in 0..20 {
            pool.execute(|worker_id| {
                assert!(worker_id < 4);
                JsonValue::from(worker_id)
            });
        }

        let output = pool.end();
        assert_eq!(output.len(), 20);
        for reported in output.members() {
            let wid = reported.as_usize().unwrap();
            assert!(wid < 4);
        }
    }

    #[test]
    fn execute_returns_complex_json() {
        let mut pool = Pool::new(2);
        pool.execute(|_| object! { "name": "alice", "age": 30 });
        pool.execute(|_| object! { "name": "bob", "age": 25 });

        let output = pool.end();
        assert_eq!(output.len(), 2);

        // Sort before comparing so the check does not depend on result order.
        let mut names: Vec<String> = output
            .members()
            .map(|person| person["name"].as_str().unwrap().to_string())
            .collect();
        names.sort();
        assert_eq!(names, vec!["alice", "bob"]);
    }

    #[test]
    fn insert_all_single_job() {
        let mut pool = Pool::new(1);
        pool.execute(|_| array!["id_1", "(1,'hello')"]);

        let (ids, values) = pool.insert_all();
        assert_eq!(ids, vec!["id_1"]);
        // Every value is preceded by a comma, including the first one.
        assert_eq!(values, ",(1,'hello')");
    }

    #[test]
    fn insert_all_multiple_jobs() {
        let mut pool = Pool::new(1);
        pool.execute(|_| array!["id_a", "(1,'a')"]);
        pool.execute(|_| array!["id_b", "(2,'b')"]);
        pool.execute(|_| array!["id_c", "(3,'c')"]);

        let (ids, values) = pool.insert_all();
        assert_eq!(ids.len(), 3);
        for expected_id in ["id_a", "id_b", "id_c"] {
            assert!(ids.contains(&expected_id.to_string()));
        }
        for expected_value in ["(1,'a')", "(2,'b')", "(3,'c')"] {
            assert!(values.contains(expected_value));
        }
    }

    #[test]
    fn insert_all_no_jobs_returns_empty() {
        let mut pool = Pool::new(2);
        let (ids, values) = pool.insert_all();
        assert!(ids.is_empty());
        assert!(values.is_empty());
    }

    #[test]
    fn insert_all_multiple_workers() {
        let mut pool = Pool::new(3);
        for n in 0..9 {
            let id_text = format!("id_{}", n);
            let value_text = format!("({},'v{}')", n, n);
            pool.execute(move |_| array![id_text.as_str(), value_text.as_str()]);
        }

        let (ids, values) = pool.insert_all();
        assert_eq!(ids.len(), 9);
        for n in 0..9 {
            assert!(ids.contains(&format!("id_{}", n)));
            assert!(values.contains(&format!("({},'v{}')", n, n)));
        }
    }

    #[test]
    fn worker_count_matches_requested() {
        for n in [1, 2, 3, 5, 8] {
            let mut pool = Pool::new(n);
            assert_eq!(pool.workers.len(), n);
            assert_eq!(pool.max_workers, n);
            pool.end();
        }
    }
}