1use crate::ExifTool;
6use crate::error::Result;
7use std::fmt;
8use std::io::{self, Read};
9use std::path::Path;
10use std::sync::Arc;
11use std::sync::atomic::{AtomicU64, Ordering};
12
/// Shared, thread-safe progress callback.
///
/// The callback receives two `usize` arguments and returns a `bool`.
/// [`ProgressTracker`] invokes it as `(processed, total)` and treats a
/// `false` return value as a request to cancel.
pub type ProgressCallback = Arc<dyn Fn(usize, usize) -> bool + Send + Sync>;

/// Builder-style options for streaming and batch operations.
///
/// Start from [`StreamOptions::new`] (or `Default`) and chain the setter
/// methods; each setter consumes the value and returns the updated one.
#[derive(Clone)]
pub struct StreamOptions {
    /// Requested buffer size (default `64 * 1024`; presumably bytes — TODO confirm).
    pub buffer_size: usize,
    /// Optional progress callback; see [`ProgressCallback`].
    pub progress_callback: Option<ProgressCallback>,
    /// Optional timeout in seconds (`None` by default).
    pub timeout: Option<u64>,
}

impl fmt::Debug for StreamOptions {
    /// The callback itself is not `Debug`, so only its presence is reported
    /// through the `has_callback` field.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let has_callback = self.progress_callback.is_some();
        let mut out = f.debug_struct("StreamOptions");
        out.field("buffer_size", &self.buffer_size);
        out.field("has_callback", &has_callback);
        out.field("timeout", &self.timeout);
        out.finish()
    }
}

impl Default for StreamOptions {
    /// 64 KiB buffer size, no progress callback, no timeout.
    fn default() -> Self {
        Self {
            buffer_size: Self::DEFAULT_BUFFER_SIZE,
            progress_callback: None,
            timeout: None,
        }
    }
}

impl StreamOptions {
    /// Buffer size used when none is configured explicitly.
    const DEFAULT_BUFFER_SIZE: usize = 64 * 1024;

    /// Creates options with the default values (same as `Default::default()`).
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the buffer size and returns the updated options.
    pub fn buffer_size(self, size: usize) -> Self {
        Self {
            buffer_size: size,
            ..self
        }
    }

    /// Installs a progress callback and returns the updated options.
    ///
    /// The closure is wrapped in an `Arc`, so cloning the options shares it.
    pub fn on_progress<F>(self, callback: F) -> Self
    where
        F: Fn(usize, usize) -> bool + Send + Sync + 'static,
    {
        let shared: ProgressCallback = Arc::new(callback);
        Self {
            progress_callback: Some(shared),
            ..self
        }
    }

    /// Sets the timeout, in seconds, and returns the updated options.
    pub fn timeout(self, seconds: u64) -> Self {
        Self {
            timeout: Some(seconds),
            ..self
        }
    }
}
74
75pub struct ProgressTracker {
77 total: AtomicU64,
79 processed: AtomicU64,
81 callback: Option<ProgressCallback>,
83 cancelled: std::sync::atomic::AtomicBool,
85}
86
87impl fmt::Debug for ProgressTracker {
88 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
89 f.debug_struct("ProgressTracker")
90 .field("total", &self.total.load(Ordering::SeqCst))
91 .field("processed", &self.processed.load(Ordering::SeqCst))
92 .field("has_callback", &self.callback.is_some())
93 .field("cancelled", &self.cancelled.load(Ordering::SeqCst))
94 .finish()
95 }
96}
97
98impl ProgressTracker {
99 pub fn new(total: usize, callback: Option<ProgressCallback>) -> Self {
101 Self {
102 total: AtomicU64::new(total as u64),
103 processed: AtomicU64::new(0),
104 callback,
105 cancelled: std::sync::atomic::AtomicBool::new(false),
106 }
107 }
108
109 pub fn update(&self, bytes: usize) {
111 let processed = self.processed.fetch_add(bytes as u64, Ordering::SeqCst) + bytes as u64;
112 let total = self.total.load(Ordering::SeqCst);
113
114 if let Some(ref callback) = self.callback
115 && !callback(processed as usize, total as usize)
116 {
117 self.cancelled.store(true, Ordering::SeqCst);
118 }
119 }
120
121 pub fn is_cancelled(&self) -> bool {
123 self.cancelled.load(Ordering::SeqCst)
124 }
125
126 pub fn percentage(&self) -> f64 {
128 let processed = self.processed.load(Ordering::SeqCst);
129 let total = self.total.load(Ordering::SeqCst);
130
131 if total == 0 {
132 0.0
133 } else {
134 (processed as f64 / total as f64) * 100.0
135 }
136 }
137
138 pub fn processed(&self) -> u64 {
140 self.processed.load(Ordering::SeqCst)
141 }
142
143 pub fn total(&self) -> u64 {
145 self.total.load(Ordering::SeqCst)
146 }
147}
148
149pub struct ProgressReader<R: Read> {
151 inner: R,
152 tracker: Arc<ProgressTracker>,
153 #[allow(dead_code)]
154 buffer_size: usize,
155}
156
157impl<R: Read> ProgressReader<R> {
158 pub fn new(inner: R, tracker: Arc<ProgressTracker>, buffer_size: usize) -> Self {
160 Self {
161 inner,
162 tracker,
163 buffer_size,
164 }
165 }
166
167 pub fn is_cancelled(&self) -> bool {
169 self.tracker.is_cancelled()
170 }
171}
172
173impl<R: Read> Read for ProgressReader<R> {
174 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
175 if self.is_cancelled() {
176 return Err(io::Error::new(
177 io::ErrorKind::Interrupted,
178 "Operation cancelled",
179 ));
180 }
181
182 let n = self.inner.read(buf)?;
183 self.tracker.update(n);
184 Ok(n)
185 }
186}
187
188pub trait StreamingOperations {
190 fn process_streaming<P, F, R>(
192 &self,
193 path: P,
194 options: &StreamOptions,
195 processor: F,
196 ) -> Result<R>
197 where
198 P: AsRef<Path>,
199 F: FnMut(&mut dyn Read) -> Result<R>;
200
201 fn process_batch_with_progress<P, F>(
203 &self,
204 paths: &[P],
205 options: &StreamOptions,
206 processor: F,
207 ) -> Vec<Result<()>>
208 where
209 P: AsRef<Path>,
210 F: FnMut(&ExifTool, &Path, &ProgressTracker) -> Result<()>;
211}
212
213impl StreamingOperations for ExifTool {
214 fn process_streaming<P, F, R>(
215 &self,
216 _path: P,
217 _options: &StreamOptions,
218 _processor: F,
219 ) -> Result<R>
220 where
221 P: AsRef<Path>,
222 F: FnMut(&mut dyn Read) -> Result<R>,
223 {
224 todo!("Streaming mode requires special handling with ExifTool process")
227 }
228
229 fn process_batch_with_progress<P, F>(
230 &self,
231 paths: &[P],
232 options: &StreamOptions,
233 processor: F,
234 ) -> Vec<Result<()>>
235 where
236 P: AsRef<Path>,
237 F: FnMut(&ExifTool, &Path, &ProgressTracker) -> Result<()>,
238 {
239 let total = paths.len();
240 let tracker = Arc::new(ProgressTracker::new(
241 total,
242 options.progress_callback.clone(),
243 ));
244
245 let mut results = Vec::with_capacity(total);
246 let mut processor = processor;
247
248 for path in paths {
249 let result = processor(self, path.as_ref(), &tracker);
250 tracker.update(1);
251 results.push(result);
252 }
253
254 results
255 }
256}
257
258pub struct Cache<K, V> {
260 inner: std::sync::Mutex<lru::LruCache<K, V>>,
262 hits: AtomicU64,
264 misses: AtomicU64,
266}
267
268impl<K, V> fmt::Debug for Cache<K, V> {
269 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
270 f.debug_struct("Cache")
271 .field("hits", &self.hits.load(Ordering::SeqCst))
272 .field("misses", &self.misses.load(Ordering::SeqCst))
273 .finish()
274 }
275}
276
277impl<K: std::hash::Hash + Eq + Clone, V: Clone> Cache<K, V> {
278 pub fn new(capacity: usize) -> Self {
280 use std::num::NonZeroUsize;
281 let capacity = NonZeroUsize::new(capacity.max(1)).unwrap();
282 Self {
283 inner: std::sync::Mutex::new(lru::LruCache::new(capacity)),
284 hits: AtomicU64::new(0),
285 misses: AtomicU64::new(0),
286 }
287 }
288
289 pub fn get(&self, key: &K) -> Option<V> {
291 let mut cache = self.inner.lock().ok()?;
292
293 if let Some(value) = cache.get(key) {
294 self.hits.fetch_add(1, Ordering::SeqCst);
295 Some(value.clone())
296 } else {
297 self.misses.fetch_add(1, Ordering::SeqCst);
298 None
299 }
300 }
301
302 pub fn put(&self, key: K, value: V) {
304 if let Ok(mut cache) = self.inner.lock() {
305 cache.put(key, value);
306 }
307 }
308
309 pub fn hit_rate(&self) -> f64 {
311 let hits = self.hits.load(Ordering::SeqCst);
312 let misses = self.misses.load(Ordering::SeqCst);
313 let total = hits + misses;
314
315 if total == 0 {
316 0.0
317 } else {
318 (hits as f64 / total as f64) * 100.0
319 }
320 }
321
322 pub fn clear(&self) {
324 if let Ok(mut cache) = self.inner.lock() {
325 cache.clear();
326 }
327 self.hits.store(0, Ordering::SeqCst);
328 self.misses.store(0, Ordering::SeqCst);
329 }
330}
331
/// Atomic counters describing how many operations ran, how many succeeded
/// or failed, and how long they took in total.
#[derive(Debug, Default)]
pub struct PerformanceStats {
    /// Number of operations recorded.
    pub total_operations: AtomicU64,
    /// Number of operations recorded as successful.
    pub successful_operations: AtomicU64,
    /// Number of operations recorded as failed.
    pub failed_operations: AtomicU64,
    /// Sum of the elapsed times of all recorded operations, in microseconds.
    pub total_time_us: AtomicU64,
}

impl PerformanceStats {
    /// Records one operation that took `elapsed_us` microseconds and either
    /// succeeded or failed.
    pub fn record(&self, success: bool, elapsed_us: u64) {
        self.total_operations.fetch_add(1, Ordering::SeqCst);
        self.total_time_us.fetch_add(elapsed_us, Ordering::SeqCst);

        let outcome = if success {
            &self.successful_operations
        } else {
            &self.failed_operations
        };
        outcome.fetch_add(1, Ordering::SeqCst);
    }

    /// Returns the mean time per operation in microseconds (integer
    /// division), or `0` when nothing has been recorded.
    pub fn avg_time_us(&self) -> u64 {
        let operations = self.total_operations.load(Ordering::SeqCst);
        let elapsed = self.total_time_us.load(Ordering::SeqCst);

        // `checked_div` yields `None` only for a zero divisor.
        elapsed.checked_div(operations).unwrap_or(0)
    }

    /// Returns successful operations as a percentage of all operations, or
    /// `0.0` when nothing has been recorded.
    pub fn success_rate(&self) -> f64 {
        let operations = self.total_operations.load(Ordering::SeqCst);
        let succeeded = self.successful_operations.load(Ordering::SeqCst);

        if operations != 0 {
            (succeeded as f64 / operations as f64) * 100.0
        } else {
            0.0
        }
    }
}
378
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// Successive updates accumulate and are reflected in the percentage.
    #[test]
    fn test_progress_tracker() {
        let tracker = ProgressTracker::new(100, None);

        // (bytes reported, expected running total, expected percentage)
        let steps: [(usize, u64, f64); 2] = [(25, 25, 25.0), (50, 75, 75.0)];

        for (bytes, expected_processed, expected_percentage) in steps {
            tracker.update(bytes);
            assert_eq!(tracker.processed(), expected_processed);
            assert_eq!(tracker.percentage(), expected_percentage);
        }
    }

    /// The callback sees the running total and the configured total.
    #[test]
    fn test_progress_tracker_callback() {
        let last_seen = Arc::new(AtomicU64::new(0));
        let recorder = Arc::clone(&last_seen);

        let callback: ProgressCallback = Arc::new(move |processed, total| {
            recorder.store(processed as u64, Ordering::SeqCst);
            assert_eq!(total, 100);
            true
        });

        let tracker = ProgressTracker::new(100, Some(callback));
        tracker.update(50);

        assert_eq!(last_seen.load(Ordering::SeqCst), 50);
    }

    /// Reading through the wrapper yields the data unchanged and counts
    /// every byte.
    #[test]
    fn test_progress_reader() {
        let payload = b"Hello, World!";
        let tracker = Arc::new(ProgressTracker::new(payload.len(), None));

        let mut reader = ProgressReader::new(Cursor::new(payload), tracker.clone(), 1024);

        let mut collected = Vec::new();
        reader.read_to_end(&mut collected).unwrap();

        assert_eq!(collected, payload);
        assert_eq!(tracker.processed(), payload.len() as u64);
    }

    /// Counters and the integer average follow the recorded operations.
    #[test]
    fn test_performance_stats() {
        let stats = PerformanceStats::default();

        for (success, elapsed_us) in [(true, 1000), (true, 2000), (false, 500)] {
            stats.record(success, elapsed_us);
        }

        assert_eq!(stats.total_operations.load(Ordering::SeqCst), 3);
        assert_eq!(stats.successful_operations.load(Ordering::SeqCst), 2);
        assert_eq!(stats.failed_operations.load(Ordering::SeqCst), 1);
        // 3500 / 3 with integer division.
        assert_eq!(stats.avg_time_us(), 1166);
    }
}