Skip to main content

exiftool_rs_wrapper/
pool.rs

1//! 连接池支持模块
2//!
3//! 用于高并发场景下的性能优化
4
5use crate::ExifTool;
6use crate::error::{Error, Result};
7use std::collections::VecDeque;
8use std::sync::{Arc, Mutex};
9
10/// ExifTool 连接池
11#[derive(Debug)]
12pub struct ExifToolPool {
13    /// 连接池
14    connections: Arc<Mutex<VecDeque<ExifTool>>>,
15    /// 池大小
16    size: usize,
17}
18
19impl ExifToolPool {
20    /// 创建新的连接池
21    pub fn new(size: usize) -> Result<Self> {
22        if size == 0 {
23            return Err(Error::invalid_arg("Pool size must be greater than 0"));
24        }
25
26        let mut connections = VecDeque::with_capacity(size);
27
28        for _ in 0..size {
29            let exiftool = ExifTool::new()?;
30            connections.push_back(exiftool);
31        }
32
33        Ok(Self {
34            connections: Arc::new(Mutex::new(connections)),
35            size,
36        })
37    }
38
39    /// 获取池大小
40    pub fn size(&self) -> usize {
41        self.size
42    }
43
44    /// 获取可用连接数
45    pub fn available(&self) -> Result<usize> {
46        let connections = self.connections.lock().map_err(|_| Error::MutexPoisoned)?;
47        Ok(connections.len())
48    }
49
50    /// 获取连接
51    pub fn acquire(&self) -> Result<PoolConnection> {
52        let mut connections = self.connections.lock().map_err(|_| Error::MutexPoisoned)?;
53
54        if let Some(exiftool) = connections.pop_front() {
55            Ok(PoolConnection {
56                exiftool: Some(exiftool),
57                pool: Arc::clone(&self.connections),
58            })
59        } else {
60            Err(Error::process("No available connections in pool"))
61        }
62    }
63
64    /// 尝试获取连接(非阻塞)
65    pub fn try_acquire(&self) -> Option<PoolConnection> {
66        self.acquire().ok()
67    }
68
69    /// 关闭所有连接
70    pub fn close(&self) -> Result<()> {
71        let mut connections = self.connections.lock().map_err(|_| Error::MutexPoisoned)?;
72
73        while let Some(exiftool) = connections.pop_front() {
74            let _ = exiftool.close();
75        }
76
77        Ok(())
78    }
79}
80
81impl Clone for ExifToolPool {
82    fn clone(&self) -> Self {
83        Self {
84            connections: Arc::clone(&self.connections),
85            size: self.size,
86        }
87    }
88}
89
90/// 池连接包装器
91///
92/// 当此对象被丢弃时,连接会自动归还到池中
93pub struct PoolConnection {
94    exiftool: Option<ExifTool>,
95    pool: Arc<Mutex<VecDeque<ExifTool>>>,
96}
97
98impl PoolConnection {
99    /// 获取内部 ExifTool 的引用
100    pub fn get(&self) -> Option<&ExifTool> {
101        self.exiftool.as_ref()
102    }
103
104    /// 获取内部 ExifTool 的可变引用
105    pub fn get_mut(&mut self) -> Option<&mut ExifTool> {
106        self.exiftool.as_mut()
107    }
108}
109
110impl Drop for PoolConnection {
111    fn drop(&mut self) {
112        if let Some(exiftool) = self.exiftool.take()
113            && let Ok(mut pool) = self.pool.lock()
114        {
115            pool.push_back(exiftool);
116        }
117        // 如果锁被污染,连接会被丢弃
118    }
119}
120
121/// 使用连接池的辅助函数
122///
123/// 获取连接,执行操作,自动归还连接
124pub fn with_pool<F, R>(pool: &ExifToolPool, f: F) -> Result<R>
125where
126    F: FnOnce(&mut ExifTool) -> Result<R>,
127{
128    let mut conn = pool.acquire()?;
129    let exiftool = conn
130        .get_mut()
131        .ok_or_else(|| Error::process("Failed to get connection"))?;
132    f(exiftool)
133}
134
135/// 批量处理使用连接池
136pub fn batch_with_pool<P, F, R>(pool: &ExifToolPool, items: Vec<P>, processor: F) -> Vec<Result<R>>
137where
138    P: Send + 'static,
139    F: Fn(&mut ExifTool, P) -> Result<R> + Send + Sync + 'static,
140    R: Send + 'static,
141{
142    use std::thread;
143
144    let processor = Arc::new(processor);
145    let pool = pool.clone();
146    let mut handles = Vec::with_capacity(items.len());
147
148    for item in items {
149        let pool = pool.clone();
150        let processor = Arc::clone(&processor);
151
152        let handle = thread::spawn(move || with_pool(&pool, |exiftool| processor(exiftool, item)));
153
154        handles.push(handle);
155    }
156
157    handles
158        .into_iter()
159        .map(|h| {
160            h.join()
161                .unwrap_or_else(|_| Err(Error::process("Thread panicked")))
162        })
163        .collect()
164}
165
166#[cfg(test)]
167mod tests {
168    use super::*;
169
170    #[test]
171    fn test_pool_creation() {
172        match ExifToolPool::new(2) {
173            Ok(pool) => {
174                assert_eq!(pool.size(), 2);
175                assert_eq!(pool.available().unwrap(), 2);
176            }
177            Err(Error::ExifToolNotFound) => {
178                println!("⚠ ExifTool not found, skipping test");
179            }
180            Err(e) => panic!("Unexpected error: {:?}", e),
181        }
182    }
183
184    #[test]
185    fn test_pool_acquire() {
186        match ExifToolPool::new(2) {
187            Ok(pool) => {
188                // 获取两个连接
189                let _conn1 = pool.acquire().unwrap();
190                let _conn2 = pool.acquire().unwrap();
191
192                // 池应该空了
193                assert_eq!(pool.available().unwrap(), 0);
194            }
195            Err(Error::ExifToolNotFound) => {
196                println!("⚠ ExifTool not found, skipping test");
197            }
198            Err(e) => panic!("Unexpected error: {:?}", e),
199        }
200    }
201
202    #[test]
203    fn test_pool_connection_return() {
204        match ExifToolPool::new(1) {
205            Ok(pool) => {
206                {
207                    let _conn = pool.acquire().unwrap();
208                    assert_eq!(pool.available().unwrap(), 0);
209                }
210                // 连接应该已归还
211                assert_eq!(pool.available().unwrap(), 1);
212            }
213            Err(Error::ExifToolNotFound) => {
214                println!("⚠ ExifTool not found, skipping test");
215            }
216            Err(e) => panic!("Unexpected error: {:?}", e),
217        }
218    }
219}