br_db/
pools.rs

1use std::sync::{Arc, mpsc, Mutex};
2use std::thread;
3use std::thread::JoinHandle;
4use json::{JsonValue};
5use log::warn;
6
7type Job = Box<dyn FnOnce(usize) -> JsonValue + 'static + Send>;
8
/// Messages sent from the pool to its worker threads over the shared channel.
enum Message {
    /// Tells the worker that receives it to leave its loop and return its results.
    End,
    /// Carries a job for the receiving worker to run.
    NewJob(Job),
}
13
14
/// One worker thread of the pool together with the handle used to collect its results.
struct Worker {
    /// Index assigned at construction; stored but not read in this file.
    _id: usize,
    /// Join handle of the worker thread, yielding the results of the jobs it ran.
    /// Held in an `Option` so the handle can be moved out with `take()` when joining.
    t: Option<JoinHandle<Vec<JsonValue>>>,
}
19
20impl Worker {
21    fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {
22        let t = thread::spawn(move || {
23            let mut list = vec![];
24            // 计算loop执行次数
25            let mut count_flag: i64 = 0;
26            loop {
27                // 改进错误处理,避免 unwrap
28                let message = match receiver.lock() {
29                    Ok(guard) => match guard.recv() {
30                        Ok(msg) => msg,
31                        Err(e) => {
32                            warn!("Worker {} 接收消息失败: {}", id, e);
33                            break;
34                        }
35                    }
36                    Err(e) => {
37                        warn!("Worker {} 获取锁失败: {}", id, e);
38                        break;
39                    }
40                };
41                
42                match message {
43                    Message::NewJob(job) => {
44                        list.push(job(id));
45                        count_flag += 1;
46                        if count_flag == 10000 {
47                            warn!("Worker {} 循环次数: 1w,强制退出", id);
48                            break;
49                        }
50                    }
51                    Message::End => {
52                        break;
53                    }
54                }
55            }
56            list
57        });
58        Worker {
59            _id: id,
60            t: Some(t),
61        }
62    }
63}
64
/// A fixed-size thread pool whose jobs each return a `JsonValue`.
pub struct Pool {
    /// The worker threads owned by this pool.
    workers: Vec<Worker>,
    /// Number of workers the pool was built with.
    max_workers: usize,
    /// Sending half of the channel that feeds jobs and shutdown messages to the workers.
    sender: mpsc::Sender<Message>,
}
70
71impl Pool {
72    pub fn new(max_workers: usize) -> Pool {
73        if max_workers == 0 {
74            println!("max_workers 必须大于0")
75        }
76        let (tx, rx) = mpsc::channel();
77        let mut workers = Vec::with_capacity(max_workers);
78        let receiver = Arc::new(Mutex::new(rx));
79        for i in 0..max_workers {
80            workers.push(Worker::new(i, Arc::clone(&receiver)));
81        }
82        Pool { workers, max_workers, sender: tx }
83    }
84    pub fn execute<F>(&self, f: F)
85    where
86        F: 'static + Send + FnOnce(usize) -> JsonValue,
87    {
88        let job = Message::NewJob(Box::new(f));
89        if let Err(e) = self.sender.send(job) {
90            warn!("发送任务失败: {}", e);
91        }
92    }
93    pub fn end(&mut self) -> JsonValue {
94        // 改进错误处理
95        for _ in 0..self.max_workers {
96            if let Err(e) = self.sender.send(Message::End) {
97                warn!("发送结束消息失败: {}", e);
98            }
99        }
100        let mut list = Vec::new();
101        for w in self.workers.iter_mut() {
102            if let Some(t) = w.t.take() {
103                match t.join() {
104                    Ok(data) => {
105                        list.extend(data);
106                    }
107                    Err(e) => {
108                        warn!("线程连接失败: {:?}", e);
109                    }
110                }
111            }
112        }
113        JsonValue::from(list)
114    }
115    pub fn insert_all(&mut self) -> (Vec<String>, String) {
116        for _ in 0..self.max_workers {
117            if let Err(e) = self.sender.send(Message::End) {
118                warn!("发送结束消息失败: {}", e);
119            }
120        }
121        // 预分配容量,减少重新分配
122        let mut id = Vec::new();
123        let mut list_parts = Vec::new();
124        
125        for w in self.workers.iter_mut() {
126            if let Some(t) = w.t.take() {
127                match t.join() {
128                    Ok(data) => {
129                        for item in data.iter() {
130                            id.push(item[0].to_string());
131                            list_parts.push(item[1].to_string());
132                        }
133                    }
134                    Err(e) => {
135                        warn!("线程连接失败: {:?}", e);
136                    }
137                }
138            }
139        }
140        // 使用 join 而不是 format!,更高效
141        let list = list_parts.join(",");
142        (id, list)
143    }
144}