Skip to main content

br_db/
pools.rs

1use json::JsonValue;
2use log::warn;
3use std::sync::{mpsc, Arc, Mutex};
4use std::thread;
5use std::thread::JoinHandle;
6
7type Job = Box<dyn FnOnce(usize) -> JsonValue + 'static + Send>;
8
9enum Message {
10    End,
11    NewJob(Job),
12}
13
14struct Worker {
15    _id: usize,
16    t: Option<JoinHandle<Vec<JsonValue>>>,
17}
18
19impl Worker {
20    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) -> Worker {
21        let t = thread::spawn(move || {
22            let mut list = vec![];
23            let mut count_flag: i64 = 0;
24            loop {
25                let message = match receiver.lock() {
26                    Ok(guard) => match guard.recv() {
27                        Ok(msg) => msg,
28                        Err(_) => break,
29                    },
30                    Err(_) => break,
31                };
32                match message {
33                    Message::NewJob(job) => {
34                        list.push(job(id));
35                        count_flag += 1;
36                        if count_flag == 10000 {
37                            warn!("Worker-new循环次数: 1w,强制退出");
38                            break;
39                        }
40                    }
41                    Message::End => {
42                        break;
43                    }
44                }
45            }
46            list
47        });
48        Worker {
49            _id: id,
50            t: Some(t),
51        }
52    }
53}
54
55pub struct Pool {
56    workers: Vec<Worker>,
57    max_workers: usize,
58    sender: mpsc::Sender<Message>,
59}
60
61impl Pool {
62    pub fn new(max_workers: usize) -> Pool {
63        if max_workers == 0 {
64            println!("max_workers 必须大于0")
65        }
66        let (tx, rx) = mpsc::channel();
67        let mut workers = Vec::with_capacity(max_workers);
68        let receiver = Arc::new(Mutex::new(rx));
69        for i in 0..max_workers {
70            workers.push(Worker::new(i, Arc::clone(&receiver)));
71        }
72        Pool {
73            workers,
74            max_workers,
75            sender: tx,
76        }
77    }
78    pub fn execute<F>(&self, f: F)
79    where
80        F: 'static + Send + FnOnce(usize) -> JsonValue,
81    {
82        let job = Message::NewJob(Box::new(f));
83        let _ = self.sender.send(job);
84    }
85    pub fn end(&mut self) -> JsonValue {
86        for _ in 0..self.max_workers {
87            let _ = self.sender.send(Message::End);
88        }
89        let mut list = vec![];
90        for w in self.workers.iter_mut() {
91            if let Some(t) = w.t.take() {
92                if let Ok(data) = t.join() {
93                    list.extend(data);
94                }
95            }
96        }
97        JsonValue::from(list)
98    }
99    pub fn insert_all(&mut self) -> (Vec<String>, String) {
100        for _ in 0..self.max_workers {
101            let _ = self.sender.send(Message::End);
102        }
103        let mut list = String::new();
104        let mut id = vec![];
105        for w in self.workers.iter_mut() {
106            if let Some(t) = w.t.take() {
107                if let Ok(data) = t.join() {
108                    for item in data.iter() {
109                        id.push(item[0].to_string());
110                        list = format!("{},{}", list, item[1].clone());
111                    }
112                }
113            }
114        }
115        (id, list)
116    }
117}
118
119#[cfg(test)]
120mod tests {
121    use super::*;
122    use json::{array, object, JsonValue};
123    use std::sync::atomic::{AtomicUsize, Ordering};
124    use std::sync::Arc;
125
126    #[test]
127    fn new_with_1_worker() {
128        let pool = Pool::new(1);
129        assert_eq!(pool.max_workers, 1);
130        assert_eq!(pool.workers.len(), 1);
131    }
132
133    #[test]
134    fn new_with_2_workers() {
135        let pool = Pool::new(2);
136        assert_eq!(pool.max_workers, 2);
137        assert_eq!(pool.workers.len(), 2);
138    }
139
140    #[test]
141    fn new_with_4_workers() {
142        let pool = Pool::new(4);
143        assert_eq!(pool.max_workers, 4);
144        assert_eq!(pool.workers.len(), 4);
145    }
146
147    #[test]
148    fn new_with_0_workers_does_not_panic() {
149        let pool = Pool::new(0);
150        assert_eq!(pool.max_workers, 0);
151        assert_eq!(pool.workers.len(), 0);
152    }
153
154    #[test]
155    fn execute_single_job_and_end() {
156        let mut pool = Pool::new(1);
157        pool.execute(|_worker_id| JsonValue::from(42));
158        let result = pool.end();
159        assert!(result.is_array());
160        assert_eq!(result.len(), 1);
161        assert_eq!(result[0], 42);
162    }
163
164    #[test]
165    fn execute_multiple_jobs_single_worker() {
166        let mut pool = Pool::new(1);
167        for i in 0..5 {
168            pool.execute(move |_| JsonValue::from(i));
169        }
170        let result = pool.end();
171        assert_eq!(result.len(), 5);
172        let mut values: Vec<i32> = (0..5).map(|i| result[i].as_i32().unwrap()).collect();
173        values.sort();
174        assert_eq!(values, vec![0, 1, 2, 3, 4]);
175    }
176
177    #[test]
178    fn execute_multiple_jobs_multiple_workers() {
179        let mut pool = Pool::new(4);
180        let counter = Arc::new(AtomicUsize::new(0));
181        for _ in 0..20 {
182            let c = Arc::clone(&counter);
183            pool.execute(move |_| {
184                c.fetch_add(1, Ordering::SeqCst);
185                JsonValue::from(1)
186            });
187        }
188        let result = pool.end();
189        assert_eq!(result.len(), 20);
190        assert_eq!(counter.load(Ordering::SeqCst), 20);
191    }
192
193    #[test]
194    fn end_with_no_jobs_returns_empty_array() {
195        let mut pool = Pool::new(2);
196        let result = pool.end();
197        assert!(result.is_array());
198        assert_eq!(result.len(), 0);
199    }
200
201    #[test]
202    fn jobs_receive_valid_worker_id() {
203        let mut pool = Pool::new(4);
204        for _ in 0..20 {
205            pool.execute(|worker_id| {
206                assert!(worker_id < 4);
207                JsonValue::from(worker_id)
208            });
209        }
210        let result = pool.end();
211        assert_eq!(result.len(), 20);
212        for i in 0..result.len() {
213            let wid = result[i].as_usize().unwrap();
214            assert!(wid < 4);
215        }
216    }
217
218    #[test]
219    fn execute_returns_complex_json() {
220        let mut pool = Pool::new(2);
221        pool.execute(|_| object! { "name": "alice", "age": 30 });
222        pool.execute(|_| object! { "name": "bob", "age": 25 });
223        let result = pool.end();
224        assert_eq!(result.len(), 2);
225        let mut names: Vec<String> = (0..2)
226            .map(|i| result[i]["name"].as_str().unwrap().to_string())
227            .collect();
228        names.sort();
229        assert_eq!(names, vec!["alice", "bob"]);
230    }
231
232    #[test]
233    fn insert_all_single_job() {
234        let mut pool = Pool::new(1);
235        pool.execute(|_| array!["id_1", "(1,'hello')"]);
236        let (ids, values) = pool.insert_all();
237        assert_eq!(ids, vec!["id_1"]);
238        assert_eq!(values, ",(1,'hello')");
239    }
240
241    #[test]
242    fn insert_all_multiple_jobs() {
243        let mut pool = Pool::new(1);
244        pool.execute(|_| array!["id_a", "(1,'a')"]);
245        pool.execute(|_| array!["id_b", "(2,'b')"]);
246        pool.execute(|_| array!["id_c", "(3,'c')"]);
247        let (ids, values) = pool.insert_all();
248        assert_eq!(ids.len(), 3);
249        assert!(ids.contains(&"id_a".to_string()));
250        assert!(ids.contains(&"id_b".to_string()));
251        assert!(ids.contains(&"id_c".to_string()));
252        assert!(values.contains("(1,'a')"));
253        assert!(values.contains("(2,'b')"));
254        assert!(values.contains("(3,'c')"));
255    }
256
257    #[test]
258    fn insert_all_no_jobs_returns_empty() {
259        let mut pool = Pool::new(2);
260        let (ids, values) = pool.insert_all();
261        assert!(ids.is_empty());
262        assert!(values.is_empty());
263    }
264
265    #[test]
266    fn insert_all_multiple_workers() {
267        let mut pool = Pool::new(3);
268        for i in 0..9 {
269            let id_str = format!("id_{}", i);
270            let val_str = format!("({},'v{}')", i, i);
271            pool.execute(move |_| array![id_str.as_str(), val_str.as_str()]);
272        }
273        let (ids, values) = pool.insert_all();
274        assert_eq!(ids.len(), 9);
275        for i in 0..9 {
276            assert!(ids.contains(&format!("id_{}", i)));
277            assert!(values.contains(&format!("({},'v{}')", i, i)));
278        }
279    }
280
281    #[test]
282    fn worker_count_matches_requested() {
283        for n in [1, 2, 3, 5, 8] {
284            let mut pool = Pool::new(n);
285            assert_eq!(pool.workers.len(), n);
286            assert_eq!(pool.max_workers, n);
287            pool.end();
288        }
289    }
290}