br_db/
pools.rs

1use std::sync::{Arc, mpsc, Mutex};
2use std::thread;
3use std::thread::JoinHandle;
4use json::{JsonValue};
5use log::warn;
6
7type Job = Box<dyn FnOnce(usize) -> JsonValue + 'static + Send>;
8
9enum Message {
10    End,
11    NewJob(Job),
12}
13
14
15struct Worker {
16    _id: usize,
17    t: Option<JoinHandle<Vec<JsonValue>>>,
18}
19
20impl Worker {
21    fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {
22        let t = thread::spawn(move || {
23            // let mut index = 0;
24            let mut list = vec![];
25            // 计算loop执行次数
26            let mut count_flag: i64 = 0;
27            loop {
28                let message = receiver.lock().unwrap().recv().unwrap();
29                match message {
30                    Message::NewJob(job) => {
31                        list.push(job(id));
32                        // index += 1;
33                        count_flag += 1;
34                        if count_flag == 10000 {
35                            warn!("Worker-new循环次数: 1w,强制退出");
36                            break;
37                        }
38                    }
39                    Message::End => {
40                        // info!("结束 线程[{}] {}", id, index);
41                        break;
42                    }
43                }
44            }
45            list
46        });
47        Worker {
48            _id: id,
49            t: Some(t),
50        }
51    }
52}
53
54pub struct Pool {
55    workers: Vec<Worker>,
56    max_workers: usize,
57    sender: mpsc::Sender<Message>,
58}
59
60impl Pool {
61    pub fn new(max_workers: usize) -> Pool {
62        if max_workers == 0 {
63            println!("max_workers 必须大于0")
64        }
65        let (tx, rx) = mpsc::channel();
66        let mut workers = Vec::with_capacity(max_workers);
67        let receiver = Arc::new(Mutex::new(rx));
68        for i in 0..max_workers {
69            workers.push(Worker::new(i, Arc::clone(&receiver)));
70        }
71        Pool { workers, max_workers, sender: tx }
72    }
73    pub fn execute<F>(&self, f: F)
74    where
75        F: 'static + Send + FnOnce(usize) -> JsonValue,
76    {
77        let job = Message::NewJob(Box::new(f));
78        self.sender.send(job).unwrap();
79    }
80    pub fn end(&mut self) -> JsonValue {
81        for _ in 0..self.max_workers {
82            self.sender.send(Message::End).unwrap();
83        }
84        let mut list = vec![];
85        for w in self.workers.iter_mut() {
86            if let Some(t) = w.t.take() {
87                let data = t.join().unwrap();
88                list.extend(data);
89            }
90        }
91        JsonValue::from(list)
92    }
93    pub fn insert_all(&mut self) -> (Vec<String>, String) {
94        for _ in 0..self.max_workers {
95            self.sender.send(Message::End).unwrap();
96        }
97        let mut list = String::new();
98        let mut id = vec![];
99        for w in self.workers.iter_mut() {
100            if let Some(t) = w.t.take() {
101                let data = t.join().unwrap();
102                for item in data.iter() {
103                    id.push(item[0].to_string());
104                    list = format!("{},{}", list, item[1].clone());
105                }
106            }
107        }
108        (id, list)
109    }
110}