Skip to main content

br_email/
pools.rs

1use json::JsonValue;
2use std::sync::{mpsc, Arc, Mutex};
3use std::thread;
4use std::thread::JoinHandle;
5
6type Job = Box<dyn FnOnce(usize) -> JsonValue + 'static + Send>;
7
8enum Message {
9    End,
10    NewJob(Job),
11}
12
13struct Worker {
14    _id: usize,
15    t: Option<JoinHandle<Vec<JsonValue>>>,
16}
17
18impl Worker {
19    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) -> Worker {
20        // 设置线程栈大小为 4 MB
21        let stack_size = 8 * 1024 * 1024; // 4 MB
22
23        let t = thread::Builder::new()
24            .stack_size(stack_size)
25            .spawn(move || {
26                let mut list = vec![];
27                loop {
28                    let message = receiver.lock().unwrap().recv();
29                    match message {
30                        Ok(Message::NewJob(job)) => {
31                            list.push(job(id));
32                        }
33                        Ok(Message::End) => {
34                            break;
35                        }
36                        Err(_) => {
37                            break;
38                        }
39                    }
40                }
41                list
42            })
43            .unwrap();
44        Worker {
45            _id: id,
46            t: Some(t),
47        }
48    }
49}
50
51pub struct Pool {
52    workers: Vec<Worker>,
53    max_workers: usize,
54    sender: mpsc::Sender<Message>,
55}
56
57impl Pool {
58    pub fn new(max_workers: usize) -> Pool {
59        if max_workers == 0 {
60            println!("max_workers 必须大于0")
61        }
62        let (tx, rx) = mpsc::channel();
63        let mut workers = Vec::with_capacity(max_workers);
64        let receiver = Arc::new(Mutex::new(rx));
65        for i in 0..max_workers {
66            workers.push(Worker::new(i, Arc::clone(&receiver)));
67        }
68        Pool {
69            workers,
70            max_workers,
71            sender: tx,
72        }
73    }
74    pub fn execute<F>(&self, f: F)
75    where
76        F: 'static + Send + FnOnce(usize) -> JsonValue,
77    {
78        let job = Message::NewJob(Box::new(f));
79        self.sender.send(job).unwrap();
80    }
81    pub fn end(&mut self) -> JsonValue {
82        for _ in 0..self.max_workers {
83            self.sender.send(Message::End).unwrap();
84        }
85        let mut list = vec![];
86        for w in self.workers.iter_mut() {
87            if let Some(t) = w.t.take() {
88                let data = t.join().unwrap_or_default();
89                list.extend(data);
90            }
91        }
92        JsonValue::from(list)
93    }
94}
95
96impl Drop for Pool {
97    fn drop(&mut self) {
98        self.end();
99    }
100}