Skip to main content

br_email/
pools.rs

1use json::JsonValue;
2use std::sync::{mpsc, Arc, Mutex};
3use std::thread;
4use std::thread::JoinHandle;
5
6type Job = Box<dyn FnOnce(usize) -> JsonValue + 'static + Send>;
7
8enum Message {
9    End,
10    NewJob(Job),
11}
12
13struct Worker {
14    _id: usize,
15    t: Option<JoinHandle<Vec<JsonValue>>>,
16}
17
18impl Worker {
19    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) -> Worker {
20        // 设置线程栈大小为 4 MB
21        let stack_size = 8 * 1024 * 1024; // 8 MB
22
23        let t = thread::Builder::new()
24            .stack_size(stack_size)
25            .spawn(move || {
26                let mut list = vec![];
27                loop {
28                    let message = match receiver.lock() {
29                        Ok(guard) => guard.recv(),
30                        Err(_) => break,
31                    };
32                    match message {
33                        Ok(Message::NewJob(job)) => {
34                            list.push(job(id));
35                        }
36                        Ok(Message::End) => {
37                            break;
38                        }
39                        Err(_) => {
40                            break;
41                        }
42                    }
43                }
44                list
45            });
46        match t {
47            Ok(handle) => Worker {
48                _id: id,
49                t: Some(handle),
50            },
51            Err(e) => {
52                log::error!("线程创建失败: {e}");
53                Worker { _id: id, t: None }
54            }
55        }
56    }
57}
58
59pub struct Pool {
60    workers: Vec<Worker>,
61    max_workers: usize,
62    sender: Option<mpsc::Sender<Message>>,
63}
64
65impl Pool {
66    pub fn new(max_workers: usize) -> Pool {
67        if max_workers == 0 {
68            log::warn!("max_workers 必须大于0, 线程池将为空");
69        }
70        let (tx, rx) = mpsc::channel();
71        let mut workers = Vec::with_capacity(max_workers);
72        let receiver = Arc::new(Mutex::new(rx));
73        for i in 0..max_workers {
74            workers.push(Worker::new(i, Arc::clone(&receiver)));
75        }
76        Pool {
77            workers,
78            max_workers,
79            sender: Some(tx),
80        }
81    }
82    pub fn execute<F>(&self, f: F)
83    where
84        F: 'static + Send + FnOnce(usize) -> JsonValue,
85    {
86        let job = Message::NewJob(Box::new(f));
87        if let Some(ref sender) = self.sender {
88            let _ = sender.send(job);
89        }
90    }
91    pub fn end(&mut self) -> JsonValue {
92        if let Some(sender) = self.sender.take() {
93            for _ in 0..self.max_workers {
94                let _ = sender.send(Message::End);
95            }
96        }
97        let mut list = vec![];
98        for w in self.workers.iter_mut() {
99            if let Some(t) = w.t.take() {
100                let data = t.join().unwrap_or_default();
101                list.extend(data);
102            }
103        }
104        JsonValue::from(list)
105    }
106}
107
108impl Drop for Pool {
109    fn drop(&mut self) {
110        self.end();
111    }
112}
113
#[cfg(test)]
mod tests {
    use super::*;

    /// A new pool records the requested size and owns one worker per slot.
    #[test]
    fn test_pool_new() {
        let two_slot_pool = Pool::new(2);
        assert_eq!(two_slot_pool.max_workers, 2);
        assert_eq!(two_slot_pool.workers.len(), 2);
    }

    /// A pool created with zero workers is empty and ends with no results.
    #[test]
    fn test_pool_new_zero_workers() {
        let mut empty_pool = Pool::new(0);
        assert_eq!(empty_pool.max_workers, 0);
        assert_eq!(empty_pool.workers.len(), 0);

        let collected = empty_pool.end();
        assert_eq!(collected.len(), 0);
    }

    /// One submitted job yields exactly one result.
    #[test]
    fn test_pool_execute_single() {
        let mut pool = Pool::new(2);
        let name_worker = |id: usize| json::JsonValue::from(format!("worker_{}", id));
        pool.execute(name_worker);
        let collected = pool.end();
        assert_eq!(collected.len(), 1);
    }

    /// Ten jobs spread over four workers yield ten results.
    #[test]
    fn test_pool_execute_multiple() {
        let mut pool = Pool::new(4);
        (0..10).for_each(|i| pool.execute(move |_| json::JsonValue::from(i)));
        let collected = pool.end();
        assert_eq!(collected.len(), 10);
    }

    /// A job returning a JSON object has that object collected unchanged.
    #[test]
    fn test_pool_execute_with_data() {
        let mut pool = Pool::new(2);
        let build_object = |_: usize| {
            json::object! {
                "name": "test",
                "value": 42
            }
        };
        pool.execute(build_object);
        let collected = pool.end();
        assert_eq!(collected.len(), 1);
        let first = &collected[0];
        assert_eq!(first["name"], "test");
        assert_eq!(first["value"], 42);
    }

    /// Dropping a pool with queued jobs shuts it down without an explicit `end`.
    #[test]
    fn test_pool_drop() {
        let pool = Pool::new(2);
        for n in 1..=2 {
            pool.execute(move |_| json::JsonValue::from(n));
        }
        drop(pool);
    }
}