df_db/
pools.rs

1use std::sync::{Arc, mpsc, Mutex};
2use std::thread;
3use std::thread::JoinHandle;
4use json::{JsonValue};
5
6
7type Job = Box<dyn FnOnce(usize) -> JsonValue + 'static + Send>;
8
9enum Message {
10    End,
11    NewJob(Job),
12}
13
14
15struct Worker {
16    _id: usize,
17    t: Option<JoinHandle<Vec<JsonValue>>>,
18}
19
20impl Worker {
21    fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {
22        let t = thread::spawn(move || {
23            // let mut index = 0;
24            let mut list = vec![];
25            loop {
26                let message = receiver.lock().unwrap().recv().unwrap();
27                match message {
28                    Message::NewJob(job) => {
29                        list.push(job(id));
30                        // index += 1;
31                    }
32                    Message::End => {
33                        // info!("结束 线程[{}] {}", id, index);
34                        break;
35                    }
36                }
37            }
38            list
39        });
40        Worker {
41            _id: id,
42            t: Some(t),
43        }
44    }
45}
46
47pub struct Pool {
48    workers: Vec<Worker>,
49    max_workers: usize,
50    sender: mpsc::Sender<Message>,
51}
52
53impl Pool {
54    pub fn new(max_workers: usize) -> Pool {
55        if max_workers <= 0 {
56            panic!("max_workers 必须大于0")
57        }
58        let (tx, rx) = mpsc::channel();
59        let mut workers = Vec::with_capacity(max_workers);
60        let receiver = Arc::new(Mutex::new(rx));
61        for i in 0..max_workers {
62            workers.push(Worker::new(i, Arc::clone(&receiver)));
63        }
64        Pool { workers, max_workers, sender: tx }
65    }
66    pub fn execute<F>(&self, f: F) where F: 'static + Send + FnOnce(usize) -> JsonValue {
67        let job = Message::NewJob(Box::new(f));
68        self.sender.send(job).unwrap();
69    }
70    pub fn end(&mut self) -> JsonValue {
71        for _ in 0..self.max_workers {
72            self.sender.send(Message::End).unwrap();
73        }
74        let mut list = vec![];
75        for w in self.workers.iter_mut() {
76            if let Some(t) = w.t.take() {
77                let data = t.join().unwrap();
78                list.extend(data);
79            }
80        }
81        return JsonValue::from(list);
82    }
83    pub fn insert_all(&mut self) -> (Vec<String>, String) {
84        for _ in 0..self.max_workers {
85            self.sender.send(Message::End).unwrap();
86        }
87        let mut list = String::new();
88        let mut id = vec![];
89        for w in self.workers.iter_mut() {
90            if let Some(t) = w.t.take() {
91                let data = t.join().unwrap();
92                for item in data.iter() {
93                    id.push(item[0].to_string());
94                    list = format!("{},{}", list, item[1].clone());
95                }
96            }
97        }
98        return (id, list);
99    }
100}