exiftool_rs_wrapper/
pool.rs1use crate::ExifTool;
6use crate::error::{Error, Result};
7use std::collections::VecDeque;
8use std::sync::{Arc, Mutex};
9use std::time::{Duration, Instant};
10
11#[derive(Debug)]
13pub struct ExifToolPool {
14 connections: Arc<Mutex<VecDeque<ExifTool>>>,
16 size: usize,
18}
19
20impl ExifToolPool {
21 pub fn new(size: usize) -> Result<Self> {
23 if size == 0 {
24 return Err(Error::invalid_arg("Pool size must be greater than 0"));
25 }
26
27 let mut connections = VecDeque::with_capacity(size);
28
29 for _ in 0..size {
30 let exiftool = ExifTool::new()?;
31 connections.push_back(exiftool);
32 }
33
34 Ok(Self {
35 connections: Arc::new(Mutex::new(connections)),
36 size,
37 })
38 }
39
40 pub fn size(&self) -> usize {
42 self.size
43 }
44
45 pub fn available(&self) -> Result<usize> {
47 let connections = self.connections.lock().map_err(|_| Error::MutexPoisoned)?;
48 Ok(connections.len())
49 }
50
51 pub fn acquire(&self) -> Result<PoolConnection> {
53 let mut connections = self.connections.lock().map_err(|_| Error::MutexPoisoned)?;
54
55 if let Some(exiftool) = connections.pop_front() {
56 Ok(PoolConnection {
57 exiftool: Some(exiftool),
58 pool: Arc::clone(&self.connections),
59 })
60 } else {
61 Err(Error::process("No available connections in pool"))
62 }
63 }
64
65 pub fn acquire_timeout(&self, timeout: Duration) -> Result<PoolConnection> {
67 let start = Instant::now();
68
69 loop {
70 if let Some(conn) = self.try_acquire() {
71 return Ok(conn);
72 }
73
74 if start.elapsed() >= timeout {
75 return Err(Error::Timeout);
76 }
77
78 std::thread::sleep(Duration::from_millis(5));
79 }
80 }
81
82 pub fn try_acquire(&self) -> Option<PoolConnection> {
84 self.acquire().ok()
85 }
86
87 pub fn close(&self) -> Result<()> {
89 let mut connections = self.connections.lock().map_err(|_| Error::MutexPoisoned)?;
90
91 while let Some(exiftool) = connections.pop_front() {
92 let _ = exiftool.close();
93 }
94
95 Ok(())
96 }
97}
98
99impl Clone for ExifToolPool {
100 fn clone(&self) -> Self {
101 Self {
102 connections: Arc::clone(&self.connections),
103 size: self.size,
104 }
105 }
106}
107
108pub struct PoolConnection {
112 exiftool: Option<ExifTool>,
113 pool: Arc<Mutex<VecDeque<ExifTool>>>,
114}
115
116impl PoolConnection {
117 pub fn get(&self) -> Option<&ExifTool> {
119 self.exiftool.as_ref()
120 }
121
122 pub fn get_mut(&mut self) -> Option<&mut ExifTool> {
124 self.exiftool.as_mut()
125 }
126}
127
128impl Drop for PoolConnection {
129 fn drop(&mut self) {
130 if let Some(exiftool) = self.exiftool.take()
131 && let Ok(mut pool) = self.pool.lock()
132 {
133 pool.push_back(exiftool);
134 }
135 }
137}
138
139pub fn with_pool<F, R>(pool: &ExifToolPool, f: F) -> Result<R>
143where
144 F: FnOnce(&mut ExifTool) -> Result<R>,
145{
146 let mut conn = pool.acquire()?;
147 let exiftool = conn
148 .get_mut()
149 .ok_or_else(|| Error::process("Failed to get connection"))?;
150 f(exiftool)
151}
152
153pub fn batch_with_pool<P, F, R>(pool: &ExifToolPool, items: Vec<P>, processor: F) -> Vec<Result<R>>
155where
156 P: Send + 'static,
157 F: Fn(&mut ExifTool, P) -> Result<R> + Send + Sync + 'static,
158 R: Send + 'static,
159{
160 use std::thread;
161
162 let total = items.len();
163 if total == 0 {
164 return Vec::new();
165 }
166
167 let processor = Arc::new(processor);
168 let pool = pool.clone();
169 let workers = pool.size().min(total);
170 let queue: Arc<Mutex<VecDeque<(usize, P)>>> =
171 Arc::new(Mutex::new(items.into_iter().enumerate().collect()));
172 let results: Arc<Mutex<Vec<Option<Result<R>>>>> = Arc::new(Mutex::new(
173 std::iter::repeat_with(|| None).take(total).collect(),
174 ));
175 let mut handles = Vec::with_capacity(workers);
176
177 for _ in 0..workers {
178 let pool = pool.clone();
179 let processor = Arc::clone(&processor);
180 let queue = Arc::clone(&queue);
181 let results = Arc::clone(&results);
182
183 let handle = thread::spawn(move || {
184 loop {
185 let next = {
186 let mut queue = match queue.lock() {
187 Ok(q) => q,
188 Err(_) => return,
189 };
190 queue.pop_front()
191 };
192
193 let Some((index, item)) = next else {
194 return;
195 };
196
197 let result = match pool.acquire_timeout(Duration::from_secs(30)) {
198 Ok(mut conn) => match conn.get_mut() {
199 Some(exiftool) => processor(exiftool, item),
200 None => Err(Error::process("Failed to get connection")),
201 },
202 Err(e) => Err(e),
203 };
204
205 if let Ok(mut all_results) = results.lock() {
206 all_results[index] = Some(result);
207 }
208 }
209 });
210
211 handles.push(handle);
212 }
213
214 for handle in handles {
215 if handle.join().is_err() {
216 return vec![Err(Error::process("Thread panicked"))];
217 }
218 }
219
220 let mut out = Vec::with_capacity(total);
221 if let Ok(mut locked) = results.lock() {
222 for item in locked.drain(..) {
223 out.push(item.unwrap_or_else(|| Err(Error::process("Missing batch result"))));
224 }
225 return out;
226 }
227
228 vec![Err(Error::MutexPoisoned)]
229}
230
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a pool of `size` instances, or returns `None` (after printing a
    /// notice) when ExifTool is not found. Any other error fails the test.
    fn pool_or_skip(size: usize) -> Option<ExifToolPool> {
        match ExifToolPool::new(size) {
            Ok(pool) => Some(pool),
            Err(Error::ExifToolNotFound) => {
                println!("⚠ ExifTool not found, skipping test");
                None
            }
            Err(e) => panic!("Unexpected error: {:?}", e),
        }
    }

    /// A fresh pool reports its configured size and has every instance idle.
    #[test]
    fn test_pool_creation() {
        let Some(pool) = pool_or_skip(2) else {
            return;
        };

        assert_eq!(pool.size(), 2);
        assert_eq!(pool.available().unwrap(), 2);
    }

    /// Checking out every instance leaves none idle.
    #[test]
    fn test_pool_acquire() {
        let Some(pool) = pool_or_skip(2) else {
            return;
        };

        let _conn1 = pool.acquire().unwrap();
        let _conn2 = pool.acquire().unwrap();

        assert_eq!(pool.available().unwrap(), 0);
    }

    /// Dropping a connection hands its instance back to the pool.
    #[test]
    fn test_pool_connection_return() {
        let Some(pool) = pool_or_skip(1) else {
            return;
        };

        {
            let _conn = pool.acquire().unwrap();
            assert_eq!(pool.available().unwrap(), 0);
        }
        assert_eq!(pool.available().unwrap(), 1);
    }
}