pcapfile_io/api/
reader.rs

1//! 数据集读取器模块
2//!
3//! 提供高级的数据集读取功能,支持多文件PCAP数据集的统一读取接口。
4
5use log::{debug, info};
6use std::cell::RefCell;
7use std::path::{Path, PathBuf};
8
9use crate::business::cache::{CacheStats, FileInfoCache};
10use crate::business::config::ReaderConfig;
11use crate::business::index::IndexManager;
12use crate::data::file_reader::PcapFileReader;
13use crate::data::models::{
14    DataPacket, DatasetInfo, FileInfo, ValidatedPacket,
15};
16use crate::foundation::error::{PcapError, PcapResult};
17
18// 错误消息常量
19const ERROR_DATASET_NOT_FOUND: &str = "数据集目录不存在";
20const ERROR_INVALID_DATASET: &str = "无效的数据集目录";
21
22/// PCAP数据集读取器
23///
24/// 提供对PCAP数据集的高性能读取功能,支持:
25/// - 自动索引管理和验证
26/// - 顺序读取和文件切换
27/// - 智能缓存和性能优化
28/// - 多文件数据集统一访问
29pub struct PcapReader {
30    /// 数据集目录路径
31    dataset_path: PathBuf,
32    /// 数据集名称
33    dataset_name: String,
34    /// 索引管理器
35    index_manager: IndexManager,
36    /// 配置信息
37    configuration: ReaderConfig,
38    /// 当前文件读取器
39    current_reader: Option<PcapFileReader>,
40    /// 当前文件索引
41    current_file_index: usize,
42    /// 当前读取位置(全局数据包索引)
43    current_position: u64,
44    /// 文件信息缓存
45    file_info_cache: FileInfoCache,
46    /// 总大小缓存
47    total_size_cache: RefCell<Option<u64>>,
48    /// 是否已初始化
49    is_initialized: bool,
50}
51
52impl PcapReader {
53    /// 创建新的PCAP读取器
54    ///
55    /// # 参数
56    /// - `base_path` - 基础路径
57    /// - `dataset_name` - 数据集名称
58    ///
59    /// # 返回
60    /// 返回初始化后的读取器实例
61    pub fn new<P: AsRef<Path>>(
62        base_path: P,
63        dataset_name: &str,
64    ) -> PcapResult<Self> {
65        Self::new_with_config(
66            base_path,
67            dataset_name,
68            ReaderConfig::default(),
69        )
70    }
71
72    /// 创建新的PCAP读取器(带配置)
73    ///
74    /// # 参数
75    /// - `base_path` - 基础路径
76    /// - `dataset_name` - 数据集名称
77    /// - `configuration` - 读取器配置信息
78    ///
79    /// # 返回
80    /// 返回初始化后的读取器实例
81    pub fn new_with_config<P: AsRef<Path>>(
82        base_path: P,
83        dataset_name: &str,
84        configuration: ReaderConfig,
85    ) -> PcapResult<Self> {
86        let dataset_path =
87            base_path.as_ref().join(dataset_name);
88
89        // 验证数据集目录
90        if !dataset_path.exists() {
91            return Err(PcapError::DirectoryNotFound(
92                ERROR_DATASET_NOT_FOUND.to_string(),
93            ));
94        }
95
96        if !dataset_path.is_dir() {
97            return Err(PcapError::InvalidArgument(
98                ERROR_INVALID_DATASET.to_string(),
99            ));
100        }
101
102        // 创建索引管理器
103        let index_manager =
104            IndexManager::new(base_path, dataset_name)?;
105
106        // 获取缓存大小(在移动 configuration 之前)
107        let cache_size = configuration.index_cache_size;
108
109        info!("PcapReader已创建 - 数据集: {dataset_name}");
110
111        Ok(Self {
112            dataset_path,
113            dataset_name: dataset_name.to_string(),
114            index_manager,
115            configuration,
116            current_reader: None,
117            current_file_index: 0,
118            current_position: 0,
119            file_info_cache: FileInfoCache::new(cache_size),
120            total_size_cache: RefCell::new(None),
121            is_initialized: false,
122        })
123    }
124
125    /// 初始化读取器
126    ///
127    /// 确保索引可用并准备好读取操作
128    pub fn initialize(&mut self) -> PcapResult<()> {
129        if self.is_initialized {
130            return Ok(());
131        }
132
133        info!("初始化PcapReader...");
134
135        // 确保索引可用
136        let _index = self.index_manager.ensure_index()?;
137
138        self.is_initialized = true;
139        info!("PcapReader初始化完成");
140        Ok(())
141    }
142
143    /// 获取数据集信息
144    pub fn get_dataset_info(
145        &mut self,
146    ) -> PcapResult<DatasetInfo> {
147        self.initialize()?;
148
149        let index = self
150            .index_manager
151            .get_index()
152            .ok_or_else(|| {
153                PcapError::InvalidState(
154                    "索引未加载".to_string(),
155                )
156            })?;
157
158        use chrono::Utc;
159
160        Ok(DatasetInfo {
161            name: self.dataset_name.clone(),
162            path: self.dataset_path.clone(),
163            file_count: index.data_files.files.len(),
164            total_packets: index.total_packets,
165            total_size: self.get_total_size()?,
166            start_timestamp: if index.start_timestamp > 0 {
167                Some(index.start_timestamp)
168            } else {
169                None
170            },
171            end_timestamp: if index.end_timestamp > 0 {
172                Some(index.end_timestamp)
173            } else {
174                None
175            },
176            created_time: Utc::now().to_rfc3339(),
177            modified_time: Utc::now().to_rfc3339(),
178            has_index: true,
179        })
180    }
181
182    /// 获取文件信息列表
183    pub fn get_file_info_list(
184        &mut self,
185    ) -> PcapResult<Vec<FileInfo>> {
186        self.initialize()?;
187
188        let index = self
189            .index_manager
190            .get_index()
191            .ok_or_else(|| {
192                PcapError::InvalidState(
193                    "索引未加载".to_string(),
194                )
195            })?;
196
197        use chrono::Utc;
198        let current_time = Utc::now().to_rfc3339();
199
200        let mut file_infos = Vec::new();
201        for file_index in &index.data_files.files {
202            let file_info = FileInfo {
203                file_name: file_index.file_name.clone(),
204                file_path: self
205                    .dataset_path
206                    .join(&file_index.file_name),
207                file_size: file_index.file_size,
208                packet_count: file_index.packet_count,
209                start_timestamp: if file_index
210                    .start_timestamp
211                    > 0
212                {
213                    Some(file_index.start_timestamp)
214                } else {
215                    None
216                },
217                end_timestamp: if file_index.end_timestamp
218                    > 0
219                {
220                    Some(file_index.end_timestamp)
221                } else {
222                    None
223                },
224                file_hash: Some(
225                    file_index.file_hash.clone(),
226                ),
227                created_time: current_time.clone(),
228                modified_time: current_time.clone(),
229                is_valid: true,
230            };
231            file_infos.push(file_info);
232        }
233
234        Ok(file_infos)
235    }
236
237    /// 获取数据集路径
238    pub fn dataset_path(&self) -> &Path {
239        &self.dataset_path
240    }
241
242    /// 获取数据集名称
243    pub fn dataset_name(&self) -> &str {
244        &self.dataset_name
245    }
246
247    /// 读取下一个数据包(默认方法,带校验结果)
248    ///
249    /// 从当前位置读取下一个数据包,包含校验状态信息。如果当前文件读取完毕,
250    /// 会自动切换到下一个文件。
251    ///
252    /// # 返回
253    /// - `Ok(Some(result))` - 成功读取到数据包和校验结果
254    /// - `Ok(None)` - 到达文件末尾,无更多数据包
255    /// - `Err(error)` - 读取过程中发生错误
256    pub fn read_packet(
257        &mut self,
258    ) -> PcapResult<Option<ValidatedPacket>> {
259        self.initialize()?;
260
261        // 确保当前文件已打开
262        self.ensure_current_file_open()?;
263
264        loop {
265            if let Some(ref mut reader) =
266                self.current_reader
267            {
268                match reader.read_packet() {
269                    Ok(Some(result)) => {
270                        self.current_position += 1;
271                        return Ok(Some(result));
272                    }
273                    Ok(None) => {
274                        // 当前文件读取完毕,尝试切换到下一个文件
275                        if !self.switch_to_next_file()? {
276                            // 没有更多文件
277                            return Ok(None);
278                        }
279                        continue;
280                    }
281                    Err(e) => return Err(e),
282                }
283            } else {
284                // 没有可读取的文件
285                return Ok(None);
286            }
287        }
288    }
289
290    /// 读取下一个数据包(仅返回数据,不返回校验信息)
291    ///
292    /// 从当前位置读取下一个数据包,仅返回数据包本身。如果当前文件读取完毕,
293    /// 会自动切换到下一个文件。
294    ///
295    /// # 返回
296    /// - `Ok(Some(packet))` - 成功读取到数据包
297    /// - `Ok(None)` - 到达文件末尾,无更多数据包
298    /// - `Err(error)` - 读取过程中发生错误
299    pub fn read_packet_data_only(
300        &mut self,
301    ) -> PcapResult<Option<DataPacket>> {
302        match self.read_packet()? {
303            Some(result) => Ok(Some(result.packet)),
304            None => Ok(None),
305        }
306    }
307
308    /// 批量读取多个数据包(默认方法,带校验结果)
309    ///
310    /// # 参数
311    /// - `count` - 要读取的数据包数量
312    ///
313    /// # 返回
314    pub fn read_packets(
315        &mut self,
316        count: usize,
317    ) -> PcapResult<Vec<ValidatedPacket>> {
318        self.initialize()?;
319
320        let mut results = Vec::with_capacity(count);
321
322        // 批量读取指定数量的数据包
323        for _ in 0..count {
324            if let Some(result) = self.read_packet()? {
325                results.push(result);
326            } else {
327                break; // 没有更多数据包
328            }
329        }
330
331        Ok(results)
332    }
333
334    /// 批量读取多个数据包(仅返回数据,不返回校验信息)
335    ///
336    /// # 参数
337    /// - `count` - 要读取的数据包数量
338    ///
339    /// # 返回
340    pub fn read_packets_data_only(
341        &mut self,
342        count: usize,
343    ) -> PcapResult<Vec<DataPacket>> {
344        self.initialize()?;
345
346        let mut packets = Vec::with_capacity(count);
347
348        // 批量读取指定数量的数据包
349        for _ in 0..count {
350            if let Some(packet) =
351                self.read_packet_data_only()?
352            {
353                packets.push(packet);
354            } else {
355                break; // 没有更多数据包
356            }
357        }
358
359        Ok(packets)
360    }
361
362    /// 重置读取器到数据集开始位置
363    ///
364    /// 将读取器重置到数据集的开始位置,后续读取将从第一个数据包开始。
365    pub fn reset(&mut self) -> PcapResult<()> {
366        self.initialize()?;
367
368        // 重置当前读取位置到数据集开始
369        self.current_position = 0;
370        self.current_file_index = 0;
371
372        // 关闭当前文件
373        if let Some(ref mut reader) = self.current_reader {
374            reader.close();
375        }
376        self.current_reader = None;
377
378        // 重新打开第一个文件(如果存在)
379        let index = self
380            .index_manager
381            .get_index()
382            .ok_or_else(|| {
383                PcapError::InvalidState(
384                    "索引未加载".to_string(),
385                )
386            })?;
387
388        if !index.data_files.files.is_empty() {
389            self.open_file(0)?;
390        }
391
392        info!("读取器已重置到数据集开始位置");
393        Ok(())
394    }
395
396    /// 获取索引管理器的引用
397    /// 允许外部通过 reader.index().method() 的方式访问索引功能
398    pub fn index(&self) -> &IndexManager {
399        &self.index_manager
400    }
401
402    /// 获取索引管理器的可变引用
403    /// 允许外部通过 reader.index_mut().method() 的方式访问索引功能
404    pub fn index_mut(&mut self) -> &mut IndexManager {
405        &mut self.index_manager
406    }
407
408    /// 获取缓存统计信息
409    pub fn get_cache_stats(&self) -> CacheStats {
410        self.file_info_cache.get_cache_stats()
411    }
412
413    /// 清理缓存
414    pub fn clear_cache(&mut self) -> PcapResult<()> {
415        let _ = self.file_info_cache.clear();
416        debug!("缓存已清理");
417        Ok(())
418    }
419
420    // =================================================================
421    // 私有方法
422    // =================================================================
423
424    /// 获取数据集总大小
425    fn get_total_size(&self) -> PcapResult<u64> {
426        if let Some(cached_size) =
427            *self.total_size_cache.borrow()
428        {
429            return Ok(cached_size);
430        }
431
432        let index = self
433            .index_manager
434            .get_index()
435            .ok_or_else(|| {
436                PcapError::InvalidState(
437                    "索引未加载".to_string(),
438                )
439            })?;
440
441        let total_size: u64 = index
442            .data_files
443            .files
444            .iter()
445            .map(|f| f.file_size)
446            .sum();
447
448        *self.total_size_cache.borrow_mut() =
449            Some(total_size);
450        Ok(total_size)
451    }
452
453    /// 打开指定索引的文件
454    fn open_file(
455        &mut self,
456        file_index: usize,
457    ) -> PcapResult<()> {
458        let index = self
459            .index_manager
460            .get_index()
461            .ok_or_else(|| {
462                PcapError::InvalidState(
463                    "索引未加载".to_string(),
464                )
465            })?;
466
467        if file_index >= index.data_files.files.len() {
468            return Err(PcapError::InvalidArgument(
469                format!("文件索引超出范围: {file_index}"),
470            ));
471        }
472
473        // 关闭当前文件
474        if let Some(ref mut reader) = self.current_reader {
475            reader.close();
476        }
477
478        // 打开新文件
479        let file_info = &index.data_files.files[file_index];
480        let file_path =
481            self.dataset_path.join(&file_info.file_name);
482
483        let mut reader =
484            crate::data::file_reader::PcapFileReader::new(
485                self.configuration.clone(),
486            );
487        reader.open(&file_path)?;
488
489        self.current_reader = Some(reader);
490        self.current_file_index = file_index;
491
492        debug!("已打开文件: {file_path:?}");
493        Ok(())
494    }
495
496    /// 切换到下一个文件
497    fn switch_to_next_file(&mut self) -> PcapResult<bool> {
498        let index = self
499            .index_manager
500            .get_index()
501            .ok_or_else(|| {
502                PcapError::InvalidState(
503                    "索引未加载".to_string(),
504                )
505            })?;
506
507        if self.current_file_index + 1
508            >= index.data_files.files.len()
509        {
510            // 没有更多文件
511            return Ok(false);
512        }
513
514        self.open_file(self.current_file_index + 1)?;
515        Ok(true)
516    }
517
518    /// 确保当前文件已打开
519    fn ensure_current_file_open(
520        &mut self,
521    ) -> PcapResult<()> {
522        if self.current_reader.is_none() {
523            let index = self
524                .index_manager
525                .get_index()
526                .ok_or_else(|| {
527                    PcapError::InvalidState(
528                        "索引未加载".to_string(),
529                    )
530                })?;
531
532            if !index.data_files.files.is_empty() {
533                self.open_file(0)?;
534            }
535        }
536        Ok(())
537    }
538}
539
540impl Drop for PcapReader {
541    fn drop(&mut self) {
542        // 关闭当前文件读取器
543        if let Some(ref mut reader) = self.current_reader {
544            reader.close();
545        }
546        debug!("PcapReader已清理");
547    }
548}