br_email/
pools.rs

1use std::sync::{Arc, mpsc, Mutex};
2use std::thread;
3use std::thread::JoinHandle;
4use json::{JsonValue};
5
6type Job = Box<dyn FnOnce(usize) -> JsonValue + 'static + Send>;
7
8enum Message {
9    End,
10    NewJob(Job),
11}
12
13struct Worker {
14    _id: usize,
15    t: Option<JoinHandle<Vec<JsonValue>>>,
16}
17
18impl Worker {
19    fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {
20
21        // 设置线程栈大小为 4 MB
22        let stack_size = 8 * 1024 * 1024; // 4 MB
23
24        let t = thread::Builder::new()
25            .stack_size(stack_size)
26            .spawn(move || {
27                let mut list = vec![];
28                loop {
29                    let message = receiver.lock().unwrap().recv();
30                    match message {
31                        Ok(Message::NewJob(job)) => {
32                            list.push(job(id));
33                        }
34                        Ok(Message::End) => {
35                            break;
36                        }
37                        Err(_) => {
38                            break;
39                        }
40                    }
41                }
42                list
43            }).unwrap();
44        Worker {
45            _id: id,
46            t: Some(t),
47        }
48    }
49}
50
51pub struct Pool {
52    workers: Vec<Worker>,
53    max_workers: usize,
54    sender: mpsc::Sender<Message>,
55}
56
57impl Pool {
58    pub fn new(max_workers: usize) -> Pool {
59        if max_workers == 0 {
60            println!("max_workers 必须大于0")
61        }
62        let (tx, rx) = mpsc::channel();
63        let mut workers = Vec::with_capacity(max_workers);
64        let receiver = Arc::new(Mutex::new(rx));
65        for i in 0..max_workers {
66            workers.push(Worker::new(i, Arc::clone(&receiver)));
67        }
68        Pool { workers, max_workers, sender: tx }
69    }
70    pub fn execute<F>(&self, f: F)
71    where
72        F: 'static + Send + FnOnce(usize) -> JsonValue,
73    {
74        let job = Message::NewJob(Box::new(f));
75        self.sender.send(job).unwrap();
76    }
77    pub fn end(&mut self) -> JsonValue {
78        for _ in 0..self.max_workers {
79            self.sender.send(Message::End).unwrap();
80        }
81        let mut list = vec![];
82        for w in self.workers.iter_mut() {
83            if let Some(t) = w.t.take() {
84                let data = t.join().unwrap_or_default();
85                list.extend(data);
86            }
87        }
88        JsonValue::from(list)
89    }
90}
91
92impl Drop for Pool {
93    fn drop(&mut self) {
94        self.end();
95    }
96}