// exiftool_rs_wrapper/stream.rs

//! Streaming processing and performance utilities.
//!
//! Supports large-file streaming, progress callbacks with cancellation,
//! an LRU result cache, and operation performance statistics.
4
5use crate::ExifTool;
6use crate::error::Result;
7use std::fmt;
8use std::io::{self, Read};
9use std::path::Path;
10use std::sync::Arc;
11use std::sync::atomic::{AtomicU64, Ordering};
12
/// Progress callback type.
///
/// Invoked as `callback(processed, total)`. Returning `false` requests
/// cancellation of the operation in progress.
pub type ProgressCallback = Arc<dyn Fn(usize, usize) -> bool + Send + Sync>;
15
/// Options controlling streaming processing.
#[derive(Clone)]
pub struct StreamOptions {
    /// Buffer size in bytes.
    pub buffer_size: usize,
    /// Optional progress callback; return `false` from it to cancel.
    pub progress_callback: Option<ProgressCallback>,
    /// Timeout in seconds (not consumed in this module — presumably
    /// enforced by the caller; TODO confirm).
    pub timeout: Option<u64>,
}
26
27impl fmt::Debug for StreamOptions {
28    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
29        f.debug_struct("StreamOptions")
30            .field("buffer_size", &self.buffer_size)
31            .field("has_callback", &self.progress_callback.is_some())
32            .field("timeout", &self.timeout)
33            .finish()
34    }
35}
36
37impl Default for StreamOptions {
38    fn default() -> Self {
39        Self {
40            buffer_size: 64 * 1024, // 64KB 默认缓冲区
41            progress_callback: None,
42            timeout: None,
43        }
44    }
45}
46
47impl StreamOptions {
48    /// 创建新的流式处理选项
49    pub fn new() -> Self {
50        Self::default()
51    }
52
53    /// 设置缓冲区大小
54    pub fn buffer_size(mut self, size: usize) -> Self {
55        self.buffer_size = size;
56        self
57    }
58
59    /// 设置进度回调
60    pub fn on_progress<F>(mut self, callback: F) -> Self
61    where
62        F: Fn(usize, usize) -> bool + Send + Sync + 'static,
63    {
64        self.progress_callback = Some(Arc::new(callback));
65        self
66    }
67
68    /// 设置超时
69    pub fn timeout(mut self, seconds: u64) -> Self {
70        self.timeout = Some(seconds);
71        self
72    }
73}
74
/// Thread-safe progress tracker.
pub struct ProgressTracker {
    /// Total number of units (bytes or items) expected.
    total: AtomicU64,
    /// Units processed so far.
    processed: AtomicU64,
    /// Optional callback, fired on every `update`.
    callback: Option<ProgressCallback>,
    /// Set once a callback returns `false`; never reset.
    cancelled: std::sync::atomic::AtomicBool,
}
86
87impl fmt::Debug for ProgressTracker {
88    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
89        f.debug_struct("ProgressTracker")
90            .field("total", &self.total.load(Ordering::SeqCst))
91            .field("processed", &self.processed.load(Ordering::SeqCst))
92            .field("has_callback", &self.callback.is_some())
93            .field("cancelled", &self.cancelled.load(Ordering::SeqCst))
94            .finish()
95    }
96}
97
98impl ProgressTracker {
99    /// 创建新的进度追踪器
100    pub fn new(total: usize, callback: Option<ProgressCallback>) -> Self {
101        Self {
102            total: AtomicU64::new(total as u64),
103            processed: AtomicU64::new(0),
104            callback,
105            cancelled: std::sync::atomic::AtomicBool::new(false),
106        }
107    }
108
109    /// 更新进度
110    pub fn update(&self, bytes: usize) {
111        let processed = self.processed.fetch_add(bytes as u64, Ordering::SeqCst) + bytes as u64;
112        let total = self.total.load(Ordering::SeqCst);
113
114        if let Some(ref callback) = self.callback
115            && !callback(processed as usize, total as usize)
116        {
117            self.cancelled.store(true, Ordering::SeqCst);
118        }
119    }
120
121    /// 检查是否已取消
122    pub fn is_cancelled(&self) -> bool {
123        self.cancelled.load(Ordering::SeqCst)
124    }
125
126    /// 获取进度百分比
127    pub fn percentage(&self) -> f64 {
128        let processed = self.processed.load(Ordering::SeqCst);
129        let total = self.total.load(Ordering::SeqCst);
130
131        if total == 0 {
132            0.0
133        } else {
134            (processed as f64 / total as f64) * 100.0
135        }
136    }
137
138    /// 获取已处理字节数
139    pub fn processed(&self) -> u64 {
140        self.processed.load(Ordering::SeqCst)
141    }
142
143    /// 获取总字节数
144    pub fn total(&self) -> u64 {
145        self.total.load(Ordering::SeqCst)
146    }
147}
148
/// Reader wrapper that reports read progress to a [`ProgressTracker`].
pub struct ProgressReader<R: Read> {
    /// The wrapped reader.
    inner: R,
    /// Shared tracker updated on every read.
    tracker: Arc<ProgressTracker>,
    // Stored but never consulted by `read` — presumably reserved for a
    // future chunked-read mode; TODO confirm.
    #[allow(dead_code)]
    buffer_size: usize,
}
156
157impl<R: Read> ProgressReader<R> {
158    /// 创建新的进度读取器
159    pub fn new(inner: R, tracker: Arc<ProgressTracker>, buffer_size: usize) -> Self {
160        Self {
161            inner,
162            tracker,
163            buffer_size,
164        }
165    }
166
167    /// 检查是否已取消
168    pub fn is_cancelled(&self) -> bool {
169        self.tracker.is_cancelled()
170    }
171}
172
173impl<R: Read> Read for ProgressReader<R> {
174    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
175        if self.is_cancelled() {
176            return Err(io::Error::new(
177                io::ErrorKind::Interrupted,
178                "Operation cancelled",
179            ));
180        }
181
182        let n = self.inner.read(buf)?;
183        self.tracker.update(n);
184        Ok(n)
185    }
186}
187
/// Streaming-processing operations.
pub trait StreamingOperations {
    /// Stream-process a large file, handing `processor` a reader over its
    /// contents and returning the processor's result.
    fn process_streaming<P, F, R>(
        &self,
        path: P,
        options: &StreamOptions,
        processor: F,
    ) -> Result<R>
    where
        P: AsRef<Path>,
        F: FnMut(&mut dyn Read) -> Result<R>;

    /// Process a batch of paths, reporting progress through
    /// `options.progress_callback` (one progress unit per path).
    fn process_batch_with_progress<P, F>(
        &self,
        paths: &[P],
        options: &StreamOptions,
        processor: F,
    ) -> Vec<Result<()>>
    where
        P: AsRef<Path>,
        F: FnMut(&ExifTool, &Path, &ProgressTracker) -> Result<()>;
}
212
213impl StreamingOperations for ExifTool {
214    fn process_streaming<P, F, R>(
215        &self,
216        _path: P,
217        _options: &StreamOptions,
218        _processor: F,
219    ) -> Result<R>
220    where
221        P: AsRef<Path>,
222        F: FnMut(&mut dyn Read) -> Result<R>,
223    {
224        // 流式处理需要使用 ExifTool 的流式模式
225        // 这里简化处理,实际实现会更复杂
226        todo!("Streaming mode requires special handling with ExifTool process")
227    }
228
229    fn process_batch_with_progress<P, F>(
230        &self,
231        paths: &[P],
232        options: &StreamOptions,
233        processor: F,
234    ) -> Vec<Result<()>>
235    where
236        P: AsRef<Path>,
237        F: FnMut(&ExifTool, &Path, &ProgressTracker) -> Result<()>,
238    {
239        let total = paths.len();
240        let tracker = Arc::new(ProgressTracker::new(
241            total,
242            options.progress_callback.clone(),
243        ));
244
245        let mut results = Vec::with_capacity(total);
246        let mut processor = processor;
247
248        for path in paths {
249            let result = processor(self, path.as_ref(), &tracker);
250            tracker.update(1);
251            results.push(result);
252        }
253
254        results
255    }
256}
257
/// Thread-safe LRU cache with hit/miss statistics.
pub struct Cache<K, V> {
    /// The underlying LRU map, guarded by a mutex.
    inner: std::sync::Mutex<lru::LruCache<K, V>>,
    /// Number of successful lookups.
    hits: AtomicU64,
    /// Number of failed lookups.
    misses: AtomicU64,
}
267
268impl<K, V> fmt::Debug for Cache<K, V> {
269    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
270        f.debug_struct("Cache")
271            .field("hits", &self.hits.load(Ordering::SeqCst))
272            .field("misses", &self.misses.load(Ordering::SeqCst))
273            .finish()
274    }
275}
276
277impl<K: std::hash::Hash + Eq + Clone, V: Clone> Cache<K, V> {
278    /// 创建新的缓存
279    pub fn new(capacity: usize) -> Self {
280        use std::num::NonZeroUsize;
281        let capacity = NonZeroUsize::new(capacity.max(1)).unwrap();
282        Self {
283            inner: std::sync::Mutex::new(lru::LruCache::new(capacity)),
284            hits: AtomicU64::new(0),
285            misses: AtomicU64::new(0),
286        }
287    }
288
289    /// 获取值
290    pub fn get(&self, key: &K) -> Option<V> {
291        let mut cache = self.inner.lock().ok()?;
292
293        if let Some(value) = cache.get(key) {
294            self.hits.fetch_add(1, Ordering::SeqCst);
295            Some(value.clone())
296        } else {
297            self.misses.fetch_add(1, Ordering::SeqCst);
298            None
299        }
300    }
301
302    /// 插入值
303    pub fn put(&self, key: K, value: V) {
304        if let Ok(mut cache) = self.inner.lock() {
305            cache.put(key, value);
306        }
307    }
308
309    /// 获取命中率
310    pub fn hit_rate(&self) -> f64 {
311        let hits = self.hits.load(Ordering::SeqCst);
312        let misses = self.misses.load(Ordering::SeqCst);
313        let total = hits + misses;
314
315        if total == 0 {
316            0.0
317        } else {
318            (hits as f64 / total as f64) * 100.0
319        }
320    }
321
322    /// 清空缓存
323    pub fn clear(&self) {
324        if let Ok(mut cache) = self.inner.lock() {
325            cache.clear();
326        }
327        self.hits.store(0, Ordering::SeqCst);
328        self.misses.store(0, Ordering::SeqCst);
329    }
330}
331
/// Aggregate performance counters (all atomic, safe to share across threads).
#[derive(Debug, Default)]
pub struct PerformanceStats {
    /// Total number of recorded operations.
    pub total_operations: AtomicU64,
    /// Number of successful operations.
    pub successful_operations: AtomicU64,
    /// Number of failed operations.
    pub failed_operations: AtomicU64,
    /// Cumulative elapsed time, in microseconds.
    pub total_time_us: AtomicU64,
}

impl PerformanceStats {
    /// Record one completed operation and its wall time in microseconds.
    pub fn record(&self, success: bool, elapsed_us: u64) {
        self.total_operations.fetch_add(1, Ordering::SeqCst);
        self.total_time_us.fetch_add(elapsed_us, Ordering::SeqCst);

        let bucket = if success {
            &self.successful_operations
        } else {
            &self.failed_operations
        };
        bucket.fetch_add(1, Ordering::SeqCst);
    }

    /// Mean time per operation in microseconds (0 when nothing recorded;
    /// integer division).
    pub fn avg_time_us(&self) -> u64 {
        match self.total_operations.load(Ordering::SeqCst) {
            0 => 0,
            n => self.total_time_us.load(Ordering::SeqCst) / n,
        }
    }

    /// Share of successful operations, as a percentage (0.0 when empty).
    pub fn success_rate(&self) -> f64 {
        let total = self.total_operations.load(Ordering::SeqCst);
        if total == 0 {
            return 0.0;
        }
        let ok = self.successful_operations.load(Ordering::SeqCst);
        (ok as f64 / total as f64) * 100.0
    }
}
378
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    #[test]
    fn test_progress_tracker() {
        let tracker = ProgressTracker::new(100, None);

        // Two incremental updates; processed/percentage accumulate.
        for (step, expect_done, expect_pct) in [(25usize, 25u64, 25.0), (50, 75, 75.0)] {
            tracker.update(step);
            assert_eq!(tracker.processed(), expect_done);
            assert_eq!(tracker.percentage(), expect_pct);
        }
    }

    #[test]
    fn test_progress_tracker_callback() {
        let seen = Arc::new(AtomicU64::new(0));
        let seen_in_cb = Arc::clone(&seen);

        let cb: ProgressCallback = Arc::new(move |done, total| {
            assert_eq!(total, 100);
            seen_in_cb.store(done as u64, Ordering::SeqCst);
            true // keep going
        });

        let tracker = ProgressTracker::new(100, Some(cb));
        tracker.update(50);

        assert_eq!(seen.load(Ordering::SeqCst), 50);
    }

    #[test]
    fn test_progress_reader() {
        let data = b"Hello, World!";
        let tracker = Arc::new(ProgressTracker::new(data.len(), None));
        let mut reader = ProgressReader::new(Cursor::new(data), Arc::clone(&tracker), 1024);

        let mut out = Vec::new();
        reader.read_to_end(&mut out).unwrap();

        assert_eq!(out, data);
        assert_eq!(tracker.processed(), data.len() as u64);
    }

    #[test]
    fn test_performance_stats() {
        let stats = PerformanceStats::default();

        for (ok, us) in [(true, 1000), (true, 2000), (false, 500)] {
            stats.record(ok, us);
        }

        assert_eq!(stats.total_operations.load(Ordering::SeqCst), 3);
        assert_eq!(stats.successful_operations.load(Ordering::SeqCst), 2);
        assert_eq!(stats.failed_operations.load(Ordering::SeqCst), 1);
        // 3500 / 3 with integer division.
        assert_eq!(stats.avg_time_us(), 1166);
    }
}