1use json::JsonValue;
2use std::sync::{mpsc, Arc, Mutex};
3use std::thread;
4use std::thread::JoinHandle;
5
6type Job = Box<dyn FnOnce(usize) -> JsonValue + 'static + Send>;
7
8enum Message {
9 End,
10 NewJob(Job),
11}
12
13struct Worker {
14 _id: usize,
15 t: Option<JoinHandle<Vec<JsonValue>>>,
16}
17
18impl Worker {
19 fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) -> Worker {
20 let stack_size = 8 * 1024 * 1024; let t = thread::Builder::new()
24 .stack_size(stack_size)
25 .spawn(move || {
26 let mut list = vec![];
27 loop {
28 let message = match receiver.lock() {
29 Ok(guard) => guard.recv(),
30 Err(_) => break,
31 };
32 match message {
33 Ok(Message::NewJob(job)) => {
34 list.push(job(id));
35 }
36 Ok(Message::End) => {
37 break;
38 }
39 Err(_) => {
40 break;
41 }
42 }
43 }
44 list
45 });
46 match t {
47 Ok(handle) => Worker {
48 _id: id,
49 t: Some(handle),
50 },
51 Err(e) => {
52 log::error!("线程创建失败: {e}");
53 Worker { _id: id, t: None }
54 }
55 }
56 }
57}
58
59pub struct Pool {
60 workers: Vec<Worker>,
61 max_workers: usize,
62 sender: Option<mpsc::Sender<Message>>,
63}
64
65impl Pool {
66 pub fn new(max_workers: usize) -> Pool {
67 if max_workers == 0 {
68 log::warn!("max_workers 必须大于0, 线程池将为空");
69 }
70 let (tx, rx) = mpsc::channel();
71 let mut workers = Vec::with_capacity(max_workers);
72 let receiver = Arc::new(Mutex::new(rx));
73 for i in 0..max_workers {
74 workers.push(Worker::new(i, Arc::clone(&receiver)));
75 }
76 Pool {
77 workers,
78 max_workers,
79 sender: Some(tx),
80 }
81 }
82 pub fn execute<F>(&self, f: F)
83 where
84 F: 'static + Send + FnOnce(usize) -> JsonValue,
85 {
86 let job = Message::NewJob(Box::new(f));
87 if let Some(ref sender) = self.sender {
88 let _ = sender.send(job);
89 }
90 }
91 pub fn end(&mut self) -> JsonValue {
92 if let Some(sender) = self.sender.take() {
93 for _ in 0..self.max_workers {
94 let _ = sender.send(Message::End);
95 }
96 }
97 let mut list = vec![];
98 for w in self.workers.iter_mut() {
99 if let Some(t) = w.t.take() {
100 let data = t.join().unwrap_or_default();
101 list.extend(data);
102 }
103 }
104 JsonValue::from(list)
105 }
106}
107
108impl Drop for Pool {
109 fn drop(&mut self) {
110 self.end();
111 }
112}
113
114#[cfg(test)]
115mod tests {
116 use super::*;
117
118 #[test]
119 fn test_pool_new() {
120 let pool = Pool::new(2);
121 assert_eq!(pool.max_workers, 2);
122 assert_eq!(pool.workers.len(), 2);
123 }
124
125 #[test]
126 fn test_pool_new_zero_workers() {
127 let mut pool = Pool::new(0);
128 assert_eq!(pool.max_workers, 0);
129 assert_eq!(pool.workers.len(), 0);
130
131 let result = pool.end();
132 assert_eq!(result.len(), 0);
133 }
134
135 #[test]
136 fn test_pool_execute_single() {
137 let mut pool = Pool::new(2);
138 pool.execute(|id| json::JsonValue::from(format!("worker_{}", id)));
139 let result = pool.end();
140 assert_eq!(result.len(), 1);
141 }
142
143 #[test]
144 fn test_pool_execute_multiple() {
145 let mut pool = Pool::new(4);
146 for i in 0..10 {
147 pool.execute(move |_| json::JsonValue::from(i));
148 }
149 let result = pool.end();
150 assert_eq!(result.len(), 10);
151 }
152
153 #[test]
154 fn test_pool_execute_with_data() {
155 let mut pool = Pool::new(2);
156 pool.execute(|_| {
157 json::object! {
158 "name": "test",
159 "value": 42
160 }
161 });
162 let result = pool.end();
163 assert_eq!(result.len(), 1);
164 assert_eq!(result[0]["name"], "test");
165 assert_eq!(result[0]["value"], 42);
166 }
167
168 #[test]
169 fn test_pool_drop() {
170 let pool = Pool::new(2);
171 pool.execute(|_| json::JsonValue::from(1));
172 pool.execute(|_| json::JsonValue::from(2));
173 drop(pool);
174 }
175}