1use json::JsonValue;
2use std::sync::{mpsc, Arc, Mutex};
3use std::thread;
4use std::thread::JoinHandle;
5
6type Job = Box<dyn FnOnce(usize) -> JsonValue + 'static + Send>;
7
8enum Message {
9 End,
10 NewJob(Job),
11}
12
13struct Worker {
14 _id: usize,
15 t: Option<JoinHandle<Vec<JsonValue>>>,
16}
17
18impl Worker {
19 fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) -> Worker {
20 let stack_size = 8 * 1024 * 1024; let t = thread::Builder::new()
24 .stack_size(stack_size)
25 .spawn(move || {
26 let mut list = vec![];
27 loop {
28 let message = receiver.lock().unwrap().recv();
29 match message {
30 Ok(Message::NewJob(job)) => {
31 list.push(job(id));
32 }
33 Ok(Message::End) => {
34 break;
35 }
36 Err(_) => {
37 break;
38 }
39 }
40 }
41 list
42 })
43 .unwrap();
44 Worker {
45 _id: id,
46 t: Some(t),
47 }
48 }
49}
50
51pub struct Pool {
52 workers: Vec<Worker>,
53 max_workers: usize,
54 sender: Option<mpsc::Sender<Message>>,
55}
56
57impl Pool {
58 pub fn new(max_workers: usize) -> Pool {
59 if max_workers == 0 {
60 println!("max_workers 必须大于0")
61 }
62 let (tx, rx) = mpsc::channel();
63 let mut workers = Vec::with_capacity(max_workers);
64 let receiver = Arc::new(Mutex::new(rx));
65 for i in 0..max_workers {
66 workers.push(Worker::new(i, Arc::clone(&receiver)));
67 }
68 Pool {
69 workers,
70 max_workers,
71 sender: Some(tx),
72 }
73 }
74 pub fn execute<F>(&self, f: F)
75 where
76 F: 'static + Send + FnOnce(usize) -> JsonValue,
77 {
78 let job = Message::NewJob(Box::new(f));
79 if let Some(ref sender) = self.sender {
80 let _ = sender.send(job);
81 }
82 }
83 pub fn end(&mut self) -> JsonValue {
84 if let Some(sender) = self.sender.take() {
85 for _ in 0..self.max_workers {
86 let _ = sender.send(Message::End);
87 }
88 }
89 let mut list = vec![];
90 for w in self.workers.iter_mut() {
91 if let Some(t) = w.t.take() {
92 let data = t.join().unwrap_or_default();
93 list.extend(data);
94 }
95 }
96 JsonValue::from(list)
97 }
98}
99
100impl Drop for Pool {
101 fn drop(&mut self) {
102 self.end();
103 }
104}
105
#[cfg(test)]
mod tests {
    use super::*;

    /// A new pool records the requested size and owns that many workers.
    #[test]
    fn test_pool_new() {
        let pool = Pool::new(2);
        let worker_count = pool.workers.len();
        assert_eq!(pool.max_workers, 2);
        assert_eq!(worker_count, 2);
    }

    /// A zero-sized pool is still constructed and ends with no results.
    #[test]
    fn test_pool_new_zero_workers() {
        let mut pool = Pool::new(0);
        assert_eq!(pool.max_workers, 0);
        assert_eq!(pool.workers.len(), 0);

        let result = pool.end();
        assert_eq!(result.len(), 0);
    }

    /// One submitted job yields exactly one result.
    #[test]
    fn test_pool_execute_single() {
        let mut pool = Pool::new(2);
        pool.execute(|id| JsonValue::from(format!("worker_{}", id)));

        let result = pool.end();
        assert_eq!(result.len(), 1);
    }

    /// Every job submitted to the pool shows up in the collected results.
    #[test]
    fn test_pool_execute_multiple() {
        let mut pool = Pool::new(4);
        (0..10).for_each(|i| pool.execute(move |_| JsonValue::from(i)));

        let result = pool.end();
        assert_eq!(result.len(), 10);
    }

    /// A structured JSON object returned by a job comes back intact.
    #[test]
    fn test_pool_execute_with_data() {
        let mut pool = Pool::new(2);
        pool.execute(|_| {
            json::object! {
                "name": "test",
                "value": 42
            }
        });

        let result = pool.end();
        assert_eq!(result.len(), 1);

        let entry = &result[0];
        assert_eq!(entry["name"], "test");
        assert_eq!(entry["value"], 42);
    }

    /// Dropping a pool with queued jobs completes without calling `end`
    /// explicitly.
    #[test]
    fn test_pool_drop() {
        let pool = Pool::new(2);
        for value in 1..=2 {
            pool.execute(move |_| JsonValue::from(value));
        }
        drop(pool);
    }
}