// exiftool_rs_wrapper/stream.rs

1//! 流式处理和性能优化模块
2//!
3//! 支持大文件流式处理、进度回调、内存池优化
4
5use crate::ExifTool;
6use crate::error::Result;
7use std::fmt;
8use std::io::{self, Read};
9use std::path::Path;
10use std::sync::Arc;
11use std::sync::atomic::{AtomicU64, Ordering};
12
/// Progress callback type: receives `(processed, total)` and returns `false`
/// to request cancellation.
pub type ProgressCallback = Arc<dyn Fn(usize, usize) -> bool + Send + Sync>;

/// Options controlling streaming reads.
#[derive(Clone)]
pub struct StreamOptions {
    /// Read buffer size, in bytes.
    pub buffer_size: usize,
    /// Optional progress callback.
    pub progress_callback: Option<ProgressCallback>,
}

impl fmt::Debug for StreamOptions {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Callbacks are not `Debug`; report only whether one is present.
        let mut dbg = f.debug_struct("StreamOptions");
        dbg.field("buffer_size", &self.buffer_size);
        dbg.field("has_callback", &self.progress_callback.is_some());
        dbg.finish()
    }
}

impl Default for StreamOptions {
    fn default() -> Self {
        StreamOptions {
            // 64 KiB is a reasonable default for buffered file reads.
            buffer_size: 64 * 1024,
            progress_callback: None,
        }
    }
}

impl StreamOptions {
    /// Creates options with default settings.
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the read buffer size (builder style).
    pub fn buffer_size(mut self, size: usize) -> Self {
        self.buffer_size = size;
        self
    }

    /// Installs a progress callback (builder style); returning `false` from
    /// the callback cancels the operation.
    pub fn on_progress<F>(mut self, callback: F) -> Self
    where
        F: Fn(usize, usize) -> bool + Send + Sync + 'static,
    {
        self.progress_callback = Some(Arc::new(callback));
        self
    }
}
64
65/// 进度追踪器
66pub struct ProgressTracker {
67    /// 总字节数
68    total: AtomicU64,
69    /// 已处理字节数
70    processed: AtomicU64,
71    /// 回调函数
72    callback: Option<ProgressCallback>,
73    /// 是否取消
74    cancelled: std::sync::atomic::AtomicBool,
75}
76
77impl fmt::Debug for ProgressTracker {
78    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
79        f.debug_struct("ProgressTracker")
80            .field("total", &self.total.load(Ordering::SeqCst))
81            .field("processed", &self.processed.load(Ordering::SeqCst))
82            .field("has_callback", &self.callback.is_some())
83            .field("cancelled", &self.cancelled.load(Ordering::SeqCst))
84            .finish()
85    }
86}
87
88impl ProgressTracker {
89    /// 创建新的进度追踪器
90    pub fn new(total: usize, callback: Option<ProgressCallback>) -> Self {
91        Self {
92            total: AtomicU64::new(total as u64),
93            processed: AtomicU64::new(0),
94            callback,
95            cancelled: std::sync::atomic::AtomicBool::new(false),
96        }
97    }
98
99    /// 更新进度
100    pub fn update(&self, bytes: usize) {
101        let processed = self.processed.fetch_add(bytes as u64, Ordering::SeqCst) + bytes as u64;
102        let total = self.total.load(Ordering::SeqCst);
103
104        if let Some(ref callback) = self.callback
105            && !callback(processed as usize, total as usize)
106        {
107            self.cancelled.store(true, Ordering::SeqCst);
108        }
109    }
110
111    /// 检查是否已取消
112    pub fn is_cancelled(&self) -> bool {
113        self.cancelled.load(Ordering::SeqCst)
114    }
115
116    /// 获取进度百分比
117    pub fn percentage(&self) -> f64 {
118        let processed = self.processed.load(Ordering::SeqCst);
119        let total = self.total.load(Ordering::SeqCst);
120
121        if total == 0 {
122            0.0
123        } else {
124            (processed as f64 / total as f64) * 100.0
125        }
126    }
127
128    /// 获取已处理字节数
129    pub fn processed(&self) -> u64 {
130        self.processed.load(Ordering::SeqCst)
131    }
132
133    /// 获取总字节数
134    pub fn total(&self) -> u64 {
135        self.total.load(Ordering::SeqCst)
136    }
137}
138
139/// 缓冲读取器(支持进度追踪)
140pub struct ProgressReader<R: Read> {
141    inner: R,
142    tracker: Arc<ProgressTracker>,
143}
144
145impl<R: Read> ProgressReader<R> {
146    /// 创建新的进度读取器
147    pub fn new(inner: R, tracker: Arc<ProgressTracker>, _buffer_size: usize) -> Self {
148        Self { inner, tracker }
149    }
150
151    /// 检查是否已取消
152    pub fn is_cancelled(&self) -> bool {
153        self.tracker.is_cancelled()
154    }
155}
156
157impl<R: Read> Read for ProgressReader<R> {
158    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
159        if self.is_cancelled() {
160            return Err(io::Error::new(
161                io::ErrorKind::Interrupted,
162                "Operation cancelled",
163            ));
164        }
165
166        let n = self.inner.read(buf)?;
167        self.tracker.update(n);
168        Ok(n)
169    }
170}
171
/// Streaming-processing operations for [`ExifTool`].
pub trait StreamingOperations {
    /// Streams a large file through `processor`, wiring up progress tracking
    /// according to `options`. The processor receives a reader over the file's
    /// contents and produces the final result.
    fn process_streaming<P, F, R>(
        &self,
        path: P,
        options: &StreamOptions,
        processor: F,
    ) -> Result<R>
    where
        P: AsRef<Path>,
        F: FnMut(&mut dyn Read) -> Result<R>;

    /// Processes a batch of paths with per-file progress reporting.
    ///
    /// Returns one result per input path, in the same order as `paths`.
    fn process_batch_with_progress<P, F>(
        &self,
        paths: &[P],
        options: &StreamOptions,
        processor: F,
    ) -> Vec<Result<()>>
    where
        P: AsRef<Path>,
        F: FnMut(&ExifTool, &Path, &ProgressTracker) -> Result<()>;
}
196
197impl StreamingOperations for ExifTool {
198    fn process_streaming<P, F, R>(
199        &self,
200        path: P,
201        options: &StreamOptions,
202        mut processor: F,
203    ) -> Result<R>
204    where
205        P: AsRef<Path>,
206        F: FnMut(&mut dyn Read) -> Result<R>,
207    {
208        // 使用标准文件读取实现流式处理
209        let file = std::fs::File::open(path.as_ref()).map_err(crate::error::Error::Io)?;
210
211        let tracker = Arc::new(ProgressTracker::new(1, options.progress_callback.clone()));
212
213        let mut reader = ProgressReader::new(file, tracker, options.buffer_size);
214
215        processor(&mut reader)
216    }
217
218    fn process_batch_with_progress<P, F>(
219        &self,
220        paths: &[P],
221        options: &StreamOptions,
222        processor: F,
223    ) -> Vec<Result<()>>
224    where
225        P: AsRef<Path>,
226        F: FnMut(&ExifTool, &Path, &ProgressTracker) -> Result<()>,
227    {
228        let total = paths.len();
229        let tracker = Arc::new(ProgressTracker::new(
230            total,
231            options.progress_callback.clone(),
232        ));
233
234        let mut results = Vec::with_capacity(total);
235        let mut processor = processor;
236
237        for path in paths {
238            let result = processor(self, path.as_ref(), &tracker);
239            tracker.update(1);
240            results.push(result);
241        }
242
243        results
244    }
245}
246
/// A thread-safe LRU cache with hit/miss statistics.
pub struct Cache<K, V> {
    /// The underlying LRU store, guarded by a mutex for shared access.
    inner: std::sync::Mutex<lru::LruCache<K, V>>,
    /// Number of successful lookups.
    hits: AtomicU64,
    /// Number of failed lookups.
    misses: AtomicU64,
}

impl<K, V> fmt::Debug for Cache<K, V> {
    // Only the counters are shown: keys/values need not implement `Debug`,
    // and dumping entries would require taking the lock.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Cache")
            .field("hits", &self.hits.load(Ordering::SeqCst))
            .field("misses", &self.misses.load(Ordering::SeqCst))
            .finish()
    }
}
265
266impl<K: std::hash::Hash + Eq + Clone, V: Clone> Cache<K, V> {
267    /// 创建新的缓存
268    pub fn new(capacity: usize) -> Self {
269        use std::num::NonZeroUsize;
270        let capacity = NonZeroUsize::new(capacity.max(1)).unwrap();
271        Self {
272            inner: std::sync::Mutex::new(lru::LruCache::new(capacity)),
273            hits: AtomicU64::new(0),
274            misses: AtomicU64::new(0),
275        }
276    }
277
278    /// 获取值
279    pub fn get(&self, key: &K) -> Option<V> {
280        let mut cache = self.inner.lock().ok()?;
281
282        if let Some(value) = cache.get(key) {
283            self.hits.fetch_add(1, Ordering::SeqCst);
284            Some(value.clone())
285        } else {
286            self.misses.fetch_add(1, Ordering::SeqCst);
287            None
288        }
289    }
290
291    /// 插入值
292    pub fn put(&self, key: K, value: V) {
293        if let Ok(mut cache) = self.inner.lock() {
294            cache.put(key, value);
295        }
296    }
297
298    /// 获取命中率
299    pub fn hit_rate(&self) -> f64 {
300        let hits = self.hits.load(Ordering::SeqCst);
301        let misses = self.misses.load(Ordering::SeqCst);
302        let total = hits + misses;
303
304        if total == 0 {
305            0.0
306        } else {
307            (hits as f64 / total as f64) * 100.0
308        }
309    }
310
311    /// 清空缓存
312    pub fn clear(&self) {
313        if let Ok(mut cache) = self.inner.lock() {
314            cache.clear();
315        }
316        self.hits.store(0, Ordering::SeqCst);
317        self.misses.store(0, Ordering::SeqCst);
318    }
319}
320
/// Aggregate operation statistics: counts plus cumulative latency.
#[derive(Debug, Default)]
pub struct PerformanceStats {
    /// Operations recorded in total.
    pub total_operations: AtomicU64,
    /// Operations that succeeded.
    pub successful_operations: AtomicU64,
    /// Operations that failed.
    pub failed_operations: AtomicU64,
    /// Cumulative elapsed time, in microseconds.
    pub total_time_us: AtomicU64,
}

impl PerformanceStats {
    /// Records one operation outcome along with its elapsed time (µs).
    pub fn record(&self, success: bool, elapsed_us: u64) {
        self.total_operations.fetch_add(1, Ordering::SeqCst);
        self.total_time_us.fetch_add(elapsed_us, Ordering::SeqCst);

        let bucket = if success {
            &self.successful_operations
        } else {
            &self.failed_operations
        };
        bucket.fetch_add(1, Ordering::SeqCst);
    }

    /// Mean latency in microseconds (integer division; 0 when empty).
    pub fn avg_time_us(&self) -> u64 {
        match self.total_operations.load(Ordering::SeqCst) {
            0 => 0,
            ops => self.total_time_us.load(Ordering::SeqCst) / ops,
        }
    }

    /// Success rate as a percentage (0.0 when nothing has been recorded).
    pub fn success_rate(&self) -> f64 {
        let total = self.total_operations.load(Ordering::SeqCst);
        let success = self.successful_operations.load(Ordering::SeqCst);

        match total {
            0 => 0.0,
            _ => (success as f64 / total as f64) * 100.0,
        }
    }
}
367
368// ============================================================================
369// 异步流支持(需要 async feature)
370// ============================================================================
371
372#[cfg(feature = "async")]
373pub mod async_stream {
374    //! 异步流式处理支持模块
375    //!
376    //! 提供基于流的异步元数据处理,支持进度跟踪和取消操作。
377
378    use crate::error::{Error, Result};
379    use crate::types::Metadata;
380    use std::path::{Path, PathBuf};
381    use tokio::sync::mpsc;
382    use tokio::sync::watch;
383
    /// Events emitted by an asynchronous processing stream.
    #[derive(Debug, Clone)]
    pub enum StreamEvent {
        /// Progress update: `(bytes processed, total bytes)`.
        Progress(usize, usize),
        /// A chunk of parsed metadata (for streaming parsing).
        MetadataChunk(Metadata),
        /// Processing finished successfully.
        Complete,
        /// Processing was cancelled before completion.
        Cancelled,
    }
396
397    /// 异步流选项
398    #[derive(Debug, Clone)]
399    pub struct AsyncStreamOptions {
400        /// 缓冲区大小(字节)
401        pub buffer_size: usize,
402        /// 是否启用进度报告
403        pub enable_progress: bool,
404        /// 进度报告间隔(毫秒)
405        pub progress_interval_ms: u64,
406    }
407
408    impl Default for AsyncStreamOptions {
409        fn default() -> Self {
410            Self {
411                buffer_size: 64 * 1024, // 64KB
412                enable_progress: true,
413                progress_interval_ms: 100,
414            }
415        }
416    }
417
418    impl AsyncStreamOptions {
419        /// 创建新的异步流选项
420        pub fn new() -> Self {
421            Self::default()
422        }
423
424        /// 设置缓冲区大小
425        pub fn buffer_size(mut self, size: usize) -> Self {
426            self.buffer_size = size;
427            self
428        }
429
430        /// 设置是否启用进度报告
431        pub fn enable_progress(mut self, enable: bool) -> Self {
432            self.enable_progress = enable;
433            self
434        }
435
436        /// 设置进度报告间隔
437        pub fn progress_interval_ms(mut self, interval: u64) -> Self {
438            self.progress_interval_ms = interval;
439            self
440        }
441    }
442
    /// Handle for controlling an in-flight asynchronous stream.
    ///
    /// Supports cancellation; clones share the same cancellation channel.
    #[derive(Debug, Clone)]
    pub struct AsyncStreamHandle {
        // Watch-channel sender; broadcasting `true` signals cancellation.
        cancel_tx: watch::Sender<bool>,
    }

    impl AsyncStreamHandle {
        /// Creates a handle together with the receiver the worker task should
        /// watch for the cancellation signal.
        pub fn new() -> (Self, watch::Receiver<bool>) {
            let (cancel_tx, cancel_rx) = watch::channel(false);
            (Self { cancel_tx }, cancel_rx)
        }

        /// Requests cancellation of the stream.
        ///
        /// # Errors
        ///
        /// Fails when the signal cannot be delivered, e.g. when every
        /// receiver has already been dropped.
        pub fn cancel(&self) -> Result<()> {
            self.cancel_tx
                .send(true)
                .map_err(|e| Error::process(format!("Failed to send cancel signal: {}", e)))
        }

        /// Returns `true` once cancellation has been requested.
        pub fn is_cancelled(&self) -> bool {
            *self.cancel_tx.borrow()
        }
    }

    impl Default for AsyncStreamHandle {
        // NOTE(review): the receiver returned by `new()` is dropped here, so
        // `cancel()` on a default-constructed handle appears doomed to error
        // (watch senders fail with no live receivers) — confirm this is the
        // intended semantics for `Default`.
        fn default() -> Self {
            let (handle, _) = Self::new();
            handle
        }
    }
477
    /// Summary of an asynchronous batch-processing run.
    #[derive(Debug, Clone)]
    pub struct AsyncBatchResult {
        /// Number of files processed successfully.
        pub success_count: usize,
        /// Number of files that failed.
        pub failure_count: usize,
        /// Total number of files submitted.
        pub total_count: usize,
    }
488
    /// Asynchronous streaming operations.
    ///
    /// Provides stream-based asynchronous processing capabilities for ExifTool.
    #[async_trait::async_trait]
    pub trait AsyncStreamingOperations {
        /// Asynchronously streams metadata extraction for a single file.
        ///
        /// Returns a channel of [`StreamEvent`]s for real-time progress
        /// tracking, plus a handle for cancelling the operation.
        ///
        /// # Examples
        ///
        /// ```rust,ignore
        /// use exiftool_rs_wrapper::AsyncExifTool;
        /// use exiftool_rs_wrapper::stream::async_stream::AsyncStreamingOperations;
        ///
        /// async fn example() -> Result<(), Box<dyn std::error::Error>> {
        ///     let exiftool = AsyncExifTool::new()?;
        ///
        ///     let (mut rx, _handle) = exiftool.stream_query("photo.jpg").await?;
        ///
        ///     while let Some(event) = rx.recv().await {
        ///         match event {
        ///             StreamEvent::Progress(processed, total) => {
        ///                 println!("Progress: {}/{} bytes", processed, total);
        ///             }
        ///             StreamEvent::Complete => {
        ///                 println!("Processing complete");
        ///                 break;
        ///             }
        ///             _ => {}
        ///         }
        ///     }
        ///
        ///     Ok(())
        /// }
        /// ```
        async fn stream_query<P: AsRef<Path> + Send>(
            &self,
            path: P,
        ) -> Result<(mpsc::Receiver<StreamEvent>, AsyncStreamHandle)>;

        /// Asynchronously processes multiple files.
        ///
        /// Processes files concurrently, yielding `(path, result)` pairs
        /// through the returned channel as each file completes.
        async fn stream_batch<P: AsRef<Path> + Send>(
            &self,
            paths: &[P],
            options: &AsyncStreamOptions,
        ) -> Result<(
            mpsc::Receiver<(PathBuf, Result<Metadata>)>,
            AsyncStreamHandle,
        )>;

        /// Asynchronously processes a large file.
        ///
        /// Suited to large inputs such as video: supports chunked reads and
        /// progress tracking via the returned event channel.
        async fn stream_large_file<P: AsRef<Path> + Send>(
            &self,
            path: P,
            options: &AsyncStreamOptions,
        ) -> Result<(mpsc::Receiver<StreamEvent>, AsyncStreamHandle)>;
    }
551}
552
553#[cfg(test)]
554mod tests {
555    use super::*;
556    use std::io::Cursor;
557
558    #[test]
559    fn test_progress_tracker() {
560        let tracker = ProgressTracker::new(100, None);
561
562        tracker.update(25);
563        assert_eq!(tracker.processed(), 25);
564        assert_eq!(tracker.percentage(), 25.0);
565
566        tracker.update(50);
567        assert_eq!(tracker.processed(), 75);
568        assert_eq!(tracker.percentage(), 75.0);
569    }
570
571    #[test]
572    fn test_progress_tracker_callback() {
573        let called = Arc::new(AtomicU64::new(0));
574        let called_clone = Arc::clone(&called);
575
576        let callback: ProgressCallback = Arc::new(move |processed, total| {
577            called_clone.store(processed as u64, Ordering::SeqCst);
578            assert_eq!(total, 100);
579            true
580        });
581
582        let tracker = ProgressTracker::new(100, Some(callback));
583        tracker.update(50);
584
585        assert_eq!(called.load(Ordering::SeqCst), 50);
586    }
587
588    #[test]
589    fn test_progress_reader() {
590        let data = b"Hello, World!";
591        let tracker = Arc::new(ProgressTracker::new(data.len(), None));
592
593        let mut reader = ProgressReader::new(Cursor::new(data), tracker.clone(), 1024);
594
595        let mut buf = Vec::new();
596        reader.read_to_end(&mut buf).unwrap();
597
598        assert_eq!(buf, data);
599        assert_eq!(tracker.processed(), data.len() as u64);
600    }
601
602    #[test]
603    fn test_performance_stats() {
604        let stats = PerformanceStats::default();
605
606        stats.record(true, 1000);
607        stats.record(true, 2000);
608        stats.record(false, 500);
609
610        assert_eq!(stats.total_operations.load(Ordering::SeqCst), 3);
611        assert_eq!(stats.successful_operations.load(Ordering::SeqCst), 2);
612        assert_eq!(stats.failed_operations.load(Ordering::SeqCst), 1);
613        assert_eq!(stats.avg_time_us(), 1166); // (1000+2000+500)/3
614    }
615}