Skip to main content

exiftool_rs_wrapper/
pool.rs

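//! Connection pool support module.
//!
//! Provides a pool of reusable ExifTool instances to improve performance in
//! highly concurrent scenarios.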
4
5use crate::ExifTool;
6use crate::error::{Error, Result};
7use std::collections::VecDeque;
8use std::sync::{Arc, Mutex};
9use std::time::{Duration, Instant};
10
11/// ExifTool 连接池
12#[derive(Debug)]
13pub struct ExifToolPool {
14    /// 连接池
15    connections: Arc<Mutex<VecDeque<ExifTool>>>,
16    /// 池大小
17    size: usize,
18}
19
20impl ExifToolPool {
21    /// 创建新的连接池
22    pub fn new(size: usize) -> Result<Self> {
23        if size == 0 {
24            return Err(Error::invalid_arg("Pool size must be greater than 0"));
25        }
26
27        let mut connections = VecDeque::with_capacity(size);
28
29        for _ in 0..size {
30            let exiftool = ExifTool::new()?;
31            connections.push_back(exiftool);
32        }
33
34        Ok(Self {
35            connections: Arc::new(Mutex::new(connections)),
36            size,
37        })
38    }
39
40    /// 获取池大小
41    pub fn size(&self) -> usize {
42        self.size
43    }
44
45    /// 获取可用连接数
46    pub fn available(&self) -> Result<usize> {
47        let connections = self.connections.lock().map_err(|_| Error::MutexPoisoned)?;
48        Ok(connections.len())
49    }
50
51    /// 获取连接
52    pub fn acquire(&self) -> Result<PoolConnection> {
53        let mut connections = self.connections.lock().map_err(|_| Error::MutexPoisoned)?;
54
55        if let Some(exiftool) = connections.pop_front() {
56            Ok(PoolConnection {
57                exiftool: Some(exiftool),
58                pool: Arc::clone(&self.connections),
59            })
60        } else {
61            Err(Error::process("No available connections in pool"))
62        }
63    }
64
65    /// 在超时时间内等待获取连接
66    pub fn acquire_timeout(&self, timeout: Duration) -> Result<PoolConnection> {
67        let start = Instant::now();
68
69        loop {
70            if let Some(conn) = self.try_acquire() {
71                return Ok(conn);
72            }
73
74            if start.elapsed() >= timeout {
75                return Err(Error::Timeout);
76            }
77
78            std::thread::sleep(Duration::from_millis(5));
79        }
80    }
81
82    /// 尝试获取连接(非阻塞)
83    pub fn try_acquire(&self) -> Option<PoolConnection> {
84        self.acquire().ok()
85    }
86
87    /// 关闭所有连接
88    pub fn close(&self) -> Result<()> {
89        let mut connections = self.connections.lock().map_err(|_| Error::MutexPoisoned)?;
90
91        while let Some(exiftool) = connections.pop_front() {
92            let _ = exiftool.close();
93        }
94
95        Ok(())
96    }
97}
98
99impl Clone for ExifToolPool {
100    fn clone(&self) -> Self {
101        Self {
102            connections: Arc::clone(&self.connections),
103            size: self.size,
104        }
105    }
106}
107
108/// 池连接包装器
109///
110/// 当此对象被丢弃时,连接会自动归还到池中
111pub struct PoolConnection {
112    exiftool: Option<ExifTool>,
113    pool: Arc<Mutex<VecDeque<ExifTool>>>,
114}
115
116impl PoolConnection {
117    /// 获取内部 ExifTool 的引用
118    pub fn get(&self) -> Option<&ExifTool> {
119        self.exiftool.as_ref()
120    }
121
122    /// 获取内部 ExifTool 的可变引用
123    pub fn get_mut(&mut self) -> Option<&mut ExifTool> {
124        self.exiftool.as_mut()
125    }
126}
127
128impl Drop for PoolConnection {
129    fn drop(&mut self) {
130        if let Some(exiftool) = self.exiftool.take()
131            && let Ok(mut pool) = self.pool.lock()
132        {
133            pool.push_back(exiftool);
134        }
135        // 如果锁被污染,连接会被丢弃
136    }
137}
138
139/// 使用连接池的辅助函数
140///
141/// 获取连接,执行操作,自动归还连接
142pub fn with_pool<F, R>(pool: &ExifToolPool, f: F) -> Result<R>
143where
144    F: FnOnce(&mut ExifTool) -> Result<R>,
145{
146    let mut conn = pool.acquire()?;
147    let exiftool = conn
148        .get_mut()
149        .ok_or_else(|| Error::process("Failed to get connection"))?;
150    f(exiftool)
151}
152
153/// 批量处理使用连接池
154pub fn batch_with_pool<P, F, R>(pool: &ExifToolPool, items: Vec<P>, processor: F) -> Vec<Result<R>>
155where
156    P: Send + 'static,
157    F: Fn(&mut ExifTool, P) -> Result<R> + Send + Sync + 'static,
158    R: Send + 'static,
159{
160    use std::thread;
161
162    let total = items.len();
163    if total == 0 {
164        return Vec::new();
165    }
166
167    let processor = Arc::new(processor);
168    let pool = pool.clone();
169    let workers = pool.size().min(total);
170    let queue: Arc<Mutex<VecDeque<(usize, P)>>> =
171        Arc::new(Mutex::new(items.into_iter().enumerate().collect()));
172    let results: Arc<Mutex<Vec<Option<Result<R>>>>> = Arc::new(Mutex::new(
173        std::iter::repeat_with(|| None).take(total).collect(),
174    ));
175    let mut handles = Vec::with_capacity(workers);
176
177    for _ in 0..workers {
178        let pool = pool.clone();
179        let processor = Arc::clone(&processor);
180        let queue = Arc::clone(&queue);
181        let results = Arc::clone(&results);
182
183        let handle = thread::spawn(move || {
184            loop {
185                let next = {
186                    let mut queue = match queue.lock() {
187                        Ok(q) => q,
188                        Err(_) => return,
189                    };
190                    queue.pop_front()
191                };
192
193                let Some((index, item)) = next else {
194                    return;
195                };
196
197                let result = match pool.acquire_timeout(Duration::from_secs(30)) {
198                    Ok(mut conn) => match conn.get_mut() {
199                        Some(exiftool) => processor(exiftool, item),
200                        None => Err(Error::process("Failed to get connection")),
201                    },
202                    Err(e) => Err(e),
203                };
204
205                if let Ok(mut all_results) = results.lock() {
206                    all_results[index] = Some(result);
207                }
208            }
209        });
210
211        handles.push(handle);
212    }
213
214    for handle in handles {
215        if handle.join().is_err() {
216            return vec![Err(Error::process("Thread panicked"))];
217        }
218    }
219
220    let mut out = Vec::with_capacity(total);
221    if let Ok(mut locked) = results.lock() {
222        for item in locked.drain(..) {
223            out.push(item.unwrap_or_else(|| Err(Error::process("Missing batch result"))));
224        }
225        return out;
226    }
227
228    vec![Err(Error::MutexPoisoned)]
229}
230
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a pool with `size` connections and passes it to `check`.
    ///
    /// If pool creation reports `Error::ExifToolNotFound`, the test is skipped
    /// with a notice; any other creation error fails the test.
    fn with_test_pool(size: usize, check: impl FnOnce(&ExifToolPool)) {
        match ExifToolPool::new(size) {
            Ok(pool) => check(&pool),
            Err(Error::ExifToolNotFound) => println!("⚠ ExifTool not found, skipping test"),
            Err(e) => panic!("Unexpected error: {:?}", e),
        }
    }

    #[test]
    fn test_pool_creation() {
        with_test_pool(2, |pool| {
            assert_eq!(pool.size(), 2);
            assert_eq!(pool.available().unwrap(), 2);
        });
    }

    #[test]
    fn test_pool_acquire() {
        with_test_pool(2, |pool| {
            // Check out both connections.
            let _conn1 = pool.acquire().unwrap();
            let _conn2 = pool.acquire().unwrap();

            // The pool should now be empty.
            assert_eq!(pool.available().unwrap(), 0);
        });
    }

    #[test]
    fn test_pool_connection_return() {
        with_test_pool(1, |pool| {
            {
                let _conn = pool.acquire().unwrap();
                assert_eq!(pool.available().unwrap(), 0);
            }
            // Dropping the guard should have returned the connection.
            assert_eq!(pool.available().unwrap(), 1);
        });
    }
}