//! br_email/pools.rs
//!
//! A small fixed-size thread pool whose jobs each return a `JsonValue`.

1use json::JsonValue;
2use std::sync::{mpsc, Arc, Mutex};
3use std::thread;
4use std::thread::JoinHandle;
5
6type Job = Box<dyn FnOnce(usize) -> JsonValue + 'static + Send>;
7
8enum Message {
9    End,
10    NewJob(Job),
11}
12
13struct Worker {
14    _id: usize,
15    t: Option<JoinHandle<Vec<JsonValue>>>,
16}
17
18impl Worker {
19    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) -> Worker {
20        // 设置线程栈大小为 4 MB
21        let stack_size = 8 * 1024 * 1024; // 4 MB
22
23        let t = thread::Builder::new()
24            .stack_size(stack_size)
25            .spawn(move || {
26                let mut list = vec![];
27                loop {
28                    let message = receiver.lock().unwrap().recv();
29                    match message {
30                        Ok(Message::NewJob(job)) => {
31                            list.push(job(id));
32                        }
33                        Ok(Message::End) => {
34                            break;
35                        }
36                        Err(_) => {
37                            break;
38                        }
39                    }
40                }
41                list
42            })
43            .unwrap();
44        Worker {
45            _id: id,
46            t: Some(t),
47        }
48    }
49}
50
51pub struct Pool {
52    workers: Vec<Worker>,
53    max_workers: usize,
54    sender: Option<mpsc::Sender<Message>>,
55}
56
57impl Pool {
58    pub fn new(max_workers: usize) -> Pool {
59        if max_workers == 0 {
60            println!("max_workers 必须大于0")
61        }
62        let (tx, rx) = mpsc::channel();
63        let mut workers = Vec::with_capacity(max_workers);
64        let receiver = Arc::new(Mutex::new(rx));
65        for i in 0..max_workers {
66            workers.push(Worker::new(i, Arc::clone(&receiver)));
67        }
68        Pool {
69            workers,
70            max_workers,
71            sender: Some(tx),
72        }
73    }
74    pub fn execute<F>(&self, f: F)
75    where
76        F: 'static + Send + FnOnce(usize) -> JsonValue,
77    {
78        let job = Message::NewJob(Box::new(f));
79        if let Some(ref sender) = self.sender {
80            let _ = sender.send(job);
81        }
82    }
83    pub fn end(&mut self) -> JsonValue {
84        if let Some(sender) = self.sender.take() {
85            for _ in 0..self.max_workers {
86                let _ = sender.send(Message::End);
87            }
88        }
89        let mut list = vec![];
90        for w in self.workers.iter_mut() {
91            if let Some(t) = w.t.take() {
92                let data = t.join().unwrap_or_default();
93                list.extend(data);
94            }
95        }
96        JsonValue::from(list)
97    }
98}
99
100impl Drop for Pool {
101    fn drop(&mut self) {
102        self.end();
103    }
104}
105
#[cfg(test)]
mod tests {
    use super::*;

    /// A new pool records the requested size and spawns that many workers.
    #[test]
    fn test_pool_new() {
        let p = Pool::new(2);
        assert_eq!(p.max_workers, 2);
        assert_eq!(p.workers.len(), 2);
    }

    /// A zero-sized pool is allowed and ends with an empty result.
    #[test]
    fn test_pool_new_zero_workers() {
        let mut p = Pool::new(0);
        assert_eq!(p.max_workers, 0);
        assert_eq!(p.workers.len(), 0);

        let collected = p.end();
        assert_eq!(collected.len(), 0);
    }

    /// A single job yields exactly one result.
    #[test]
    fn test_pool_execute_single() {
        let mut p = Pool::new(2);
        p.execute(|worker_id| json::JsonValue::from(format!("worker_{}", worker_id)));
        let collected = p.end();
        assert_eq!(collected.len(), 1);
    }

    /// Every submitted job contributes one result, however it is scheduled.
    #[test]
    fn test_pool_execute_multiple() {
        let mut p = Pool::new(4);
        (0..10).for_each(|n| p.execute(move |_| json::JsonValue::from(n)));
        let collected = p.end();
        assert_eq!(collected.len(), 10);
    }

    /// Structured values returned by a job come back unchanged.
    #[test]
    fn test_pool_execute_with_data() {
        let mut p = Pool::new(2);
        p.execute(|_| {
            json::object! {
                "name": "test",
                "value": 42
            }
        });
        let collected = p.end();
        assert_eq!(collected.len(), 1);
        assert_eq!(collected[0]["name"], "test");
        assert_eq!(collected[0]["value"], 42);
    }

    /// Dropping a pool with queued jobs shuts it down without hanging.
    #[test]
    fn test_pool_drop() {
        let p = Pool::new(2);
        for n in 1..=2 {
            p.execute(move |_| json::JsonValue::from(n));
        }
        drop(p);
    }
}