1use crate::ExifTool;
6use crate::error::Result;
7use std::fmt;
8use std::io::{self, Read};
9use std::path::Path;
10use std::sync::Arc;
11use std::sync::atomic::{AtomicU64, Ordering};
12
/// Progress callback invoked as `(processed, total)`.
/// Returning `false` requests cancellation of the operation
/// (see `ProgressTracker::update`).
pub type ProgressCallback = Arc<dyn Fn(usize, usize) -> bool + Send + Sync>;

/// Options controlling streaming reads.
#[derive(Clone)]
pub struct StreamOptions {
    /// Read buffer size in bytes (default: 64 KiB).
    pub buffer_size: usize,
    /// Optional progress callback; see [`ProgressCallback`].
    pub progress_callback: Option<ProgressCallback>,
}
24
25impl fmt::Debug for StreamOptions {
26 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
27 f.debug_struct("StreamOptions")
28 .field("buffer_size", &self.buffer_size)
29 .field("has_callback", &self.progress_callback.is_some())
30 .finish()
31 }
32}
33
34impl Default for StreamOptions {
35 fn default() -> Self {
36 Self {
37 buffer_size: 64 * 1024, progress_callback: None,
39 }
40 }
41}
42
43impl StreamOptions {
44 pub fn new() -> Self {
46 Self::default()
47 }
48
49 pub fn buffer_size(mut self, size: usize) -> Self {
51 self.buffer_size = size;
52 self
53 }
54
55 pub fn on_progress<F>(mut self, callback: F) -> Self
57 where
58 F: Fn(usize, usize) -> bool + Send + Sync + 'static,
59 {
60 self.progress_callback = Some(Arc::new(callback));
61 self
62 }
63}
64
/// Thread-safe tracker of processed vs. total work with an optional
/// callback and cooperative cancellation.
pub struct ProgressTracker {
    // Total units of work (bytes or item count, depending on the caller).
    total: AtomicU64,
    // Units processed so far.
    processed: AtomicU64,
    // Invoked on every `update`; returning `false` marks the tracker cancelled.
    callback: Option<ProgressCallback>,
    // Set once a callback returns `false`; observed via `is_cancelled`.
    cancelled: std::sync::atomic::AtomicBool,
}
76
77impl fmt::Debug for ProgressTracker {
78 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
79 f.debug_struct("ProgressTracker")
80 .field("total", &self.total.load(Ordering::SeqCst))
81 .field("processed", &self.processed.load(Ordering::SeqCst))
82 .field("has_callback", &self.callback.is_some())
83 .field("cancelled", &self.cancelled.load(Ordering::SeqCst))
84 .finish()
85 }
86}
87
88impl ProgressTracker {
89 pub fn new(total: usize, callback: Option<ProgressCallback>) -> Self {
91 Self {
92 total: AtomicU64::new(total as u64),
93 processed: AtomicU64::new(0),
94 callback,
95 cancelled: std::sync::atomic::AtomicBool::new(false),
96 }
97 }
98
99 pub fn update(&self, bytes: usize) {
101 let processed = self.processed.fetch_add(bytes as u64, Ordering::SeqCst) + bytes as u64;
102 let total = self.total.load(Ordering::SeqCst);
103
104 if let Some(ref callback) = self.callback
105 && !callback(processed as usize, total as usize)
106 {
107 self.cancelled.store(true, Ordering::SeqCst);
108 }
109 }
110
111 pub fn is_cancelled(&self) -> bool {
113 self.cancelled.load(Ordering::SeqCst)
114 }
115
116 pub fn percentage(&self) -> f64 {
118 let processed = self.processed.load(Ordering::SeqCst);
119 let total = self.total.load(Ordering::SeqCst);
120
121 if total == 0 {
122 0.0
123 } else {
124 (processed as f64 / total as f64) * 100.0
125 }
126 }
127
128 pub fn processed(&self) -> u64 {
130 self.processed.load(Ordering::SeqCst)
131 }
132
133 pub fn total(&self) -> u64 {
135 self.total.load(Ordering::SeqCst)
136 }
137}
138
/// A `Read` adapter that reports every read's byte count to a shared
/// `ProgressTracker` and fails fast once cancellation has been requested.
pub struct ProgressReader<R: Read> {
    // The wrapped byte source.
    inner: R,
    // Shared tracker receiving per-read byte counts.
    tracker: Arc<ProgressTracker>,
}
144
145impl<R: Read> ProgressReader<R> {
146 pub fn new(inner: R, tracker: Arc<ProgressTracker>, _buffer_size: usize) -> Self {
148 Self { inner, tracker }
149 }
150
151 pub fn is_cancelled(&self) -> bool {
153 self.tracker.is_cancelled()
154 }
155}
156
157impl<R: Read> Read for ProgressReader<R> {
158 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
159 if self.is_cancelled() {
160 return Err(io::Error::new(
161 io::ErrorKind::Interrupted,
162 "Operation cancelled",
163 ));
164 }
165
166 let n = self.inner.read(buf)?;
167 self.tracker.update(n);
168 Ok(n)
169 }
170}
171
/// Synchronous streaming helpers (implemented for `ExifTool` below).
pub trait StreamingOperations {
    /// Opens `path` and passes a progress-reporting reader to `processor`.
    /// The callback in `options` is notified as bytes are read and may
    /// cancel the operation by returning `false`.
    fn process_streaming<P, F, R>(
        &self,
        path: P,
        options: &StreamOptions,
        processor: F,
    ) -> Result<R>
    where
        P: AsRef<Path>,
        F: FnMut(&mut dyn Read) -> Result<R>;

    /// Runs `processor` over each path, collecting one `Result` per path.
    /// The shared tracker advances by one unit per file, so the callback
    /// observes files-completed out of `paths.len()`.
    fn process_batch_with_progress<P, F>(
        &self,
        paths: &[P],
        options: &StreamOptions,
        processor: F,
    ) -> Vec<Result<()>>
    where
        P: AsRef<Path>,
        F: FnMut(&ExifTool, &Path, &ProgressTracker) -> Result<()>;
}
196
197impl StreamingOperations for ExifTool {
198 fn process_streaming<P, F, R>(
199 &self,
200 path: P,
201 options: &StreamOptions,
202 mut processor: F,
203 ) -> Result<R>
204 where
205 P: AsRef<Path>,
206 F: FnMut(&mut dyn Read) -> Result<R>,
207 {
208 let file = std::fs::File::open(path.as_ref()).map_err(crate::error::Error::Io)?;
210
211 let tracker = Arc::new(ProgressTracker::new(1, options.progress_callback.clone()));
212
213 let mut reader = ProgressReader::new(file, tracker, options.buffer_size);
214
215 processor(&mut reader)
216 }
217
218 fn process_batch_with_progress<P, F>(
219 &self,
220 paths: &[P],
221 options: &StreamOptions,
222 processor: F,
223 ) -> Vec<Result<()>>
224 where
225 P: AsRef<Path>,
226 F: FnMut(&ExifTool, &Path, &ProgressTracker) -> Result<()>,
227 {
228 let total = paths.len();
229 let tracker = Arc::new(ProgressTracker::new(
230 total,
231 options.progress_callback.clone(),
232 ));
233
234 let mut results = Vec::with_capacity(total);
235 let mut processor = processor;
236
237 for path in paths {
238 let result = processor(self, path.as_ref(), &tracker);
239 tracker.update(1);
240 results.push(result);
241 }
242
243 results
244 }
245}
246
/// Thread-safe LRU cache with hit/miss statistics.
pub struct Cache<K, V> {
    // LRU storage guarded by a mutex for shared access.
    inner: std::sync::Mutex<lru::LruCache<K, V>>,
    // Lookups that found an entry.
    hits: AtomicU64,
    // Lookups that found nothing.
    misses: AtomicU64,
}
256
257impl<K, V> fmt::Debug for Cache<K, V> {
258 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
259 f.debug_struct("Cache")
260 .field("hits", &self.hits.load(Ordering::SeqCst))
261 .field("misses", &self.misses.load(Ordering::SeqCst))
262 .finish()
263 }
264}
265
266impl<K: std::hash::Hash + Eq + Clone, V: Clone> Cache<K, V> {
267 pub fn new(capacity: usize) -> Self {
269 use std::num::NonZeroUsize;
270 let capacity = NonZeroUsize::new(capacity.max(1)).unwrap();
271 Self {
272 inner: std::sync::Mutex::new(lru::LruCache::new(capacity)),
273 hits: AtomicU64::new(0),
274 misses: AtomicU64::new(0),
275 }
276 }
277
278 pub fn get(&self, key: &K) -> Option<V> {
280 let mut cache = self.inner.lock().ok()?;
281
282 if let Some(value) = cache.get(key) {
283 self.hits.fetch_add(1, Ordering::SeqCst);
284 Some(value.clone())
285 } else {
286 self.misses.fetch_add(1, Ordering::SeqCst);
287 None
288 }
289 }
290
291 pub fn put(&self, key: K, value: V) {
293 if let Ok(mut cache) = self.inner.lock() {
294 cache.put(key, value);
295 }
296 }
297
298 pub fn hit_rate(&self) -> f64 {
300 let hits = self.hits.load(Ordering::SeqCst);
301 let misses = self.misses.load(Ordering::SeqCst);
302 let total = hits + misses;
303
304 if total == 0 {
305 0.0
306 } else {
307 (hits as f64 / total as f64) * 100.0
308 }
309 }
310
311 pub fn clear(&self) {
313 if let Ok(mut cache) = self.inner.lock() {
314 cache.clear();
315 }
316 self.hits.store(0, Ordering::SeqCst);
317 self.misses.store(0, Ordering::SeqCst);
318 }
319}
320
/// Lock-free counters summarizing operation outcomes and latency.
#[derive(Debug, Default)]
pub struct PerformanceStats {
    /// Every recorded operation, success or failure.
    pub total_operations: AtomicU64,
    /// Operations recorded with `success == true`.
    pub successful_operations: AtomicU64,
    /// Operations recorded with `success == false`.
    pub failed_operations: AtomicU64,
    /// Sum of elapsed times, in microseconds.
    pub total_time_us: AtomicU64,
}

impl PerformanceStats {
    /// Records one operation's outcome and its elapsed time in microseconds.
    pub fn record(&self, success: bool, elapsed_us: u64) {
        self.total_operations.fetch_add(1, Ordering::SeqCst);
        self.total_time_us.fetch_add(elapsed_us, Ordering::SeqCst);

        let outcome = if success {
            &self.successful_operations
        } else {
            &self.failed_operations
        };
        outcome.fetch_add(1, Ordering::SeqCst);
    }

    /// Mean elapsed time in microseconds (integer division; 0 when empty).
    pub fn avg_time_us(&self) -> u64 {
        let ops = self.total_operations.load(Ordering::SeqCst);
        self.total_time_us
            .load(Ordering::SeqCst)
            .checked_div(ops)
            .unwrap_or(0)
    }

    /// Percentage of successful operations; 0.0 when nothing was recorded.
    pub fn success_rate(&self) -> f64 {
        let ops = self.total_operations.load(Ordering::SeqCst);
        let ok = self.successful_operations.load(Ordering::SeqCst);
        match ops {
            0 => 0.0,
            n => (ok as f64 / n as f64) * 100.0,
        }
    }
}
367
#[cfg(feature = "async")]
pub mod async_stream {
    use crate::error::{Error, Result};
    use crate::types::Metadata;
    use std::path::{Path, PathBuf};
    use tokio::sync::mpsc;
    use tokio::sync::watch;

    /// Events emitted while streaming metadata asynchronously.
    #[derive(Debug, Clone)]
    pub enum StreamEvent {
        /// `(processed, total)` progress report.
        Progress(usize, usize),
        /// A piece of extracted metadata.
        MetadataChunk(Metadata),
        /// The stream finished successfully.
        Complete,
        /// The stream stopped because cancellation was requested.
        Cancelled,
    }

    /// Options for async streaming operations.
    #[derive(Debug, Clone)]
    pub struct AsyncStreamOptions {
        /// Read buffer size in bytes (default: 64 KiB).
        pub buffer_size: usize,
        /// Whether progress events should be emitted.
        pub enable_progress: bool,
        /// Interval between progress reports, in milliseconds.
        pub progress_interval_ms: u64,
    }

    impl Default for AsyncStreamOptions {
        /// 64 KiB buffer, progress enabled, 100 ms reporting interval.
        fn default() -> Self {
            Self {
                buffer_size: 64 * 1024,
                enable_progress: true,
                progress_interval_ms: 100,
            }
        }
    }

    impl AsyncStreamOptions {
        /// Equivalent to `AsyncStreamOptions::default()`.
        pub fn new() -> Self {
            Self::default()
        }

        /// Builder: override the buffer size (bytes).
        pub fn buffer_size(self, size: usize) -> Self {
            Self {
                buffer_size: size,
                ..self
            }
        }

        /// Builder: toggle progress events.
        pub fn enable_progress(self, enable: bool) -> Self {
            Self {
                enable_progress: enable,
                ..self
            }
        }

        /// Builder: set the progress reporting interval (ms).
        pub fn progress_interval_ms(self, interval: u64) -> Self {
            Self {
                progress_interval_ms: interval,
                ..self
            }
        }
    }

    /// Cancellation handle for an in-flight async stream.
    #[derive(Debug, Clone)]
    pub struct AsyncStreamHandle {
        cancel_tx: watch::Sender<bool>,
    }

    impl AsyncStreamHandle {
        /// Creates a handle plus the receiver a worker task should watch.
        pub fn new() -> (Self, watch::Receiver<bool>) {
            let (cancel_tx, cancel_rx) = watch::channel(false);
            (Self { cancel_tx }, cancel_rx)
        }

        /// Requests cancellation; errs if every receiver has been dropped.
        pub fn cancel(&self) -> Result<()> {
            self.cancel_tx
                .send(true)
                .map_err(|e| Error::process(format!("Failed to send cancel signal: {}", e)))
        }

        /// Whether cancellation has already been signalled.
        pub fn is_cancelled(&self) -> bool {
            *self.cancel_tx.borrow()
        }
    }

    impl Default for AsyncStreamHandle {
        fn default() -> Self {
            // NOTE(review): the paired receiver is dropped here, so `cancel()`
            // on a defaulted handle will always return Err (watch::Sender::send
            // fails with no receivers) — confirm this is intended.
            let (handle, _) = Self::new();
            handle
        }
    }

    /// Summary of an async batch run.
    #[derive(Debug, Clone)]
    pub struct AsyncBatchResult {
        /// Paths processed successfully.
        pub success_count: usize,
        /// Paths that failed.
        pub failure_count: usize,
        /// Total paths attempted.
        pub total_count: usize,
    }

    /// Async streaming entry points; each returns an event receiver paired
    /// with a cancellation handle.
    #[async_trait::async_trait]
    pub trait AsyncStreamingOperations {
        /// Streams metadata events for a single file.
        async fn stream_query<P: AsRef<Path> + Send>(
            &self,
            path: P,
        ) -> Result<(mpsc::Receiver<StreamEvent>, AsyncStreamHandle)>;

        /// Streams per-path results for a batch of files.
        async fn stream_batch<P: AsRef<Path> + Send>(
            &self,
            paths: &[P],
            options: &AsyncStreamOptions,
        ) -> Result<(
            mpsc::Receiver<(PathBuf, Result<Metadata>)>,
            AsyncStreamHandle,
        )>;

        /// Streams events while processing a large file.
        async fn stream_large_file<P: AsRef<Path> + Send>(
            &self,
            path: P,
            options: &AsyncStreamOptions,
        ) -> Result<(mpsc::Receiver<StreamEvent>, AsyncStreamHandle)>;
    }
}
552
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    #[test]
    fn test_progress_tracker() {
        let tracker = ProgressTracker::new(100, None);

        tracker.update(25);
        assert_eq!(tracker.processed(), 25);
        assert_eq!(tracker.percentage(), 25.0);

        tracker.update(50);
        assert_eq!(tracker.processed(), 75);
        assert_eq!(tracker.percentage(), 75.0);
    }

    #[test]
    fn test_progress_tracker_callback() {
        // The callback stores the processed count so the test can observe it.
        let seen = Arc::new(AtomicU64::new(0));
        let seen_in_cb = Arc::clone(&seen);

        let callback: ProgressCallback = Arc::new(move |processed, total| {
            seen_in_cb.store(processed as u64, Ordering::SeqCst);
            assert_eq!(total, 100);
            true // keep going — do not request cancellation
        });

        let tracker = ProgressTracker::new(100, Some(callback));
        tracker.update(50);

        assert_eq!(seen.load(Ordering::SeqCst), 50);
    }

    #[test]
    fn test_progress_reader() {
        let data = b"Hello, World!";
        let tracker = Arc::new(ProgressTracker::new(data.len(), None));
        let mut reader = ProgressReader::new(Cursor::new(data), Arc::clone(&tracker), 1024);

        let mut out = Vec::new();
        reader.read_to_end(&mut out).unwrap();

        assert_eq!(out, data);
        assert_eq!(tracker.processed(), data.len() as u64);
    }

    #[test]
    fn test_performance_stats() {
        let stats = PerformanceStats::default();

        stats.record(true, 1000);
        stats.record(true, 2000);
        stats.record(false, 500);

        assert_eq!(stats.total_operations.load(Ordering::SeqCst), 3);
        assert_eq!(stats.successful_operations.load(Ordering::SeqCst), 2);
        assert_eq!(stats.failed_operations.load(Ordering::SeqCst), 1);
        // 3500 / 3 truncates to 1166 under integer division.
        assert_eq!(stats.avg_time_us(), 1166);
    }
}