br_db/
pools.rs

1use std::sync::{Arc, mpsc, Mutex};
2use std::thread;
3use std::thread::JoinHandle;
4use json::{JsonValue};
5use log::warn;
6
7type Job = Box<dyn FnOnce(usize) -> JsonValue + 'static + Send>;
8
9enum Message {
10    End,
11    NewJob(Job),
12}
13
14
15struct Worker {
16    _id: usize,
17    t: Option<JoinHandle<Vec<JsonValue>>>,
18}
19
20impl Worker {
21    fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {
22        let t = thread::spawn(move || {
23            let mut list = vec![];
24            // 计算loop执行次数
25            let mut count_flag: i64 = 0;
26            loop {
27                let message = receiver.lock().unwrap().recv().unwrap();
28                match message {
29                    Message::NewJob(job) => {
30                        list.push(job(id));
31                        // index += 1;
32                        count_flag += 1;
33                        if count_flag == 10000 {
34                            warn!("Worker-new循环次数: 1w,强制退出");
35                            break;
36                        }
37                    }
38                    Message::End => {
39                        // info!("结束 线程[{}] {}", id, index);
40                        break;
41                    }
42                }
43            }
44            list
45        });
46        Worker {
47            _id: id,
48            t: Some(t),
49        }
50    }
51}
52
53pub struct Pool {
54    workers: Vec<Worker>,
55    max_workers: usize,
56    sender: mpsc::Sender<Message>,
57}
58
59impl Pool {
60    pub fn new(max_workers: usize) -> Pool {
61        if max_workers == 0 {
62            println!("max_workers 必须大于0")
63        }
64        let (tx, rx) = mpsc::channel();
65        let mut workers = Vec::with_capacity(max_workers);
66        let receiver = Arc::new(Mutex::new(rx));
67        for i in 0..max_workers {
68            workers.push(Worker::new(i, Arc::clone(&receiver)));
69        }
70        Pool { workers, max_workers, sender: tx }
71    }
72    pub fn execute<F>(&self, f: F)
73    where
74        F: 'static + Send + FnOnce(usize) -> JsonValue,
75    {
76        let job = Message::NewJob(Box::new(f));
77        self.sender.send(job).unwrap();
78    }
79    pub fn end(&mut self) -> JsonValue {
80        for _ in 0..self.max_workers {
81            self.sender.send(Message::End).unwrap();
82        }
83        let mut list = vec![];
84        for w in self.workers.iter_mut() {
85            if let Some(t) = w.t.take() {
86                let data = t.join().unwrap();
87                list.extend(data);
88            }
89        }
90        JsonValue::from(list)
91    }
92    pub fn insert_all(&mut self) -> (Vec<String>, String) {
93        for _ in 0..self.max_workers {
94            self.sender.send(Message::End).unwrap();
95        }
96        let mut list = String::new();
97        let mut id = vec![];
98        for w in self.workers.iter_mut() {
99            if let Some(t) = w.t.take() {
100                let data = t.join().unwrap();
101                for item in data.iter() {
102                    id.push(item[0].to_string());
103                    list = format!("{},{}", list, item[1].clone());
104                }
105            }
106        }
107        (id, list)
108    }
109}