pcapfile_io/api/
reader.rs

1//! 数据集读取器模块
2//!
3//! 提供高级的数据集读取功能,支持多文件PCAP数据集的统一读取接口。
4
5use log::{debug, info, warn};
6use std::cell::RefCell;
7use std::path::{Path, PathBuf};
8
9use crate::business::cache::{CacheStats, FileInfoCache};
10use crate::business::config::ReaderConfig;
11use crate::business::index::IndexManager;
12use crate::data::file_reader::PcapFileReader;
13use crate::data::models::{
14    DataPacket, DatasetInfo, FileInfo, ValidatedPacket,
15};
16use crate::foundation::error::{PcapError, PcapResult};
17
18// 错误消息常量
19const ERROR_DATASET_NOT_FOUND: &str = "数据集目录不存在";
20const ERROR_INVALID_DATASET: &str = "无效的数据集目录";
21
22/// PCAP数据集读取器
23///
24/// 提供对PCAP数据集的高性能读取功能,支持:
25/// - 自动索引管理和验证
26/// - 顺序读取和文件切换
27/// - 智能缓存和性能优化
28/// - 多文件数据集统一访问
29pub struct PcapReader {
30    /// 数据集目录路径
31    dataset_path: PathBuf,
32    /// 数据集名称
33    dataset_name: String,
34    /// 索引管理器
35    index_manager: IndexManager,
36    /// 配置信息
37    configuration: ReaderConfig,
38    /// 当前文件读取器
39    current_reader: Option<PcapFileReader>,
40    /// 当前文件索引
41    current_file_index: usize,
42    /// 当前读取位置(全局数据包索引)
43    current_position: u64,
44    /// 文件信息缓存
45    file_info_cache: FileInfoCache,
46    /// 总大小缓存
47    total_size_cache: RefCell<Option<u64>>,
48    /// 是否已初始化
49    is_initialized: bool,
50}
51
52impl PcapReader {
53    /// 创建新的PCAP读取器
54    ///
55    /// # 参数
56    /// - `base_path` - 基础路径
57    /// - `dataset_name` - 数据集名称
58    ///
59    /// # 返回
60    /// 返回初始化后的读取器实例
61    pub fn new<P: AsRef<Path>>(
62        base_path: P,
63        dataset_name: &str,
64    ) -> PcapResult<Self> {
65        Self::new_with_config(
66            base_path,
67            dataset_name,
68            ReaderConfig::default(),
69        )
70    }
71
72    /// 创建新的PCAP读取器(带配置)
73    ///
74    /// # 参数
75    /// - `base_path` - 基础路径
76    /// - `dataset_name` - 数据集名称
77    /// - `configuration` - 读取器配置信息
78    ///
79    /// # 返回
80    /// 返回初始化后的读取器实例
81    pub fn new_with_config<P: AsRef<Path>>(
82        base_path: P,
83        dataset_name: &str,
84        configuration: ReaderConfig,
85    ) -> PcapResult<Self> {
86        // 验证配置有效性
87        configuration.validate().map_err(|e| {
88            PcapError::InvalidArgument(format!(
89                "读取器配置无效: {e}"
90            ))
91        })?;
92
93        let dataset_path =
94            base_path.as_ref().join(dataset_name);
95
96        // 验证数据集目录
97        if !dataset_path.exists() {
98            return Err(PcapError::DirectoryNotFound(
99                ERROR_DATASET_NOT_FOUND.to_string(),
100            ));
101        }
102
103        if !dataset_path.is_dir() {
104            return Err(PcapError::InvalidArgument(
105                ERROR_INVALID_DATASET.to_string(),
106            ));
107        }
108
109        // 创建索引管理器
110        let index_manager =
111            IndexManager::new(base_path, dataset_name)?;
112
113        // 获取缓存大小(在移动 configuration 之前)
114        let cache_size = configuration.index_cache_size;
115
116        info!("PcapReader已创建 - 数据集: {dataset_name}");
117
118        Ok(Self {
119            dataset_path,
120            dataset_name: dataset_name.to_string(),
121            index_manager,
122            configuration,
123            current_reader: None,
124            current_file_index: 0,
125            current_position: 0,
126            file_info_cache: FileInfoCache::new(cache_size),
127            total_size_cache: RefCell::new(None),
128            is_initialized: false,
129        })
130    }
131
132    /// 初始化读取器
133    ///
134    /// 确保索引可用并准备好读取操作
135    pub fn initialize(&mut self) -> PcapResult<()> {
136        if self.is_initialized {
137            return Ok(());
138        }
139
140        info!("初始化PcapReader...");
141
142        // 确保索引可用
143        let _index = self.index_manager.ensure_index()?;
144
145        self.is_initialized = true;
146        info!("PcapReader初始化完成");
147        Ok(())
148    }
149
150    /// 获取数据集信息
151    pub fn get_dataset_info(
152        &mut self,
153    ) -> PcapResult<DatasetInfo> {
154        self.initialize()?;
155
156        let index = self
157            .index_manager
158            .get_index()
159            .ok_or_else(|| {
160                PcapError::InvalidState(
161                    "索引未加载".to_string(),
162                )
163            })?;
164
165        use chrono::Utc;
166
167        Ok(DatasetInfo {
168            name: self.dataset_name.clone(),
169            path: self.dataset_path.clone(),
170            file_count: index.data_files.files.len(),
171            total_packets: index.total_packets,
172            total_size: self.get_total_size()?,
173            start_timestamp: if index.start_timestamp > 0 {
174                Some(index.start_timestamp)
175            } else {
176                None
177            },
178            end_timestamp: if index.end_timestamp > 0 {
179                Some(index.end_timestamp)
180            } else {
181                None
182            },
183            created_time: Utc::now().to_rfc3339(),
184            modified_time: Utc::now().to_rfc3339(),
185            has_index: true,
186        })
187    }
188
189    /// 获取文件信息列表
190    pub fn get_file_info_list(
191        &mut self,
192    ) -> PcapResult<Vec<FileInfo>> {
193        self.initialize()?;
194
195        let index = self
196            .index_manager
197            .get_index()
198            .ok_or_else(|| {
199                PcapError::InvalidState(
200                    "索引未加载".to_string(),
201                )
202            })?;
203
204        use chrono::Utc;
205        let current_time = Utc::now().to_rfc3339();
206
207        let mut file_infos = Vec::new();
208        for file_index in &index.data_files.files {
209            let file_path = self
210                .dataset_path
211                .join(&file_index.file_name);
212
213            // 尝试从缓存获取文件信息
214            let file_info = if let Some(cached_info) =
215                self.file_info_cache.get(&file_path)
216            {
217                cached_info
218            } else {
219                // 缓存未命中,创建新的文件信息并缓存
220                let file_info = FileInfo {
221                    file_name: file_index.file_name.clone(),
222                    file_path: file_path.clone(),
223                    file_size: file_index.file_size,
224                    packet_count: file_index.packet_count,
225                    start_timestamp: if file_index
226                        .start_timestamp
227                        > 0
228                    {
229                        Some(file_index.start_timestamp)
230                    } else {
231                        None
232                    },
233                    end_timestamp: if file_index
234                        .end_timestamp
235                        > 0
236                    {
237                        Some(file_index.end_timestamp)
238                    } else {
239                        None
240                    },
241                    file_hash: Some(
242                        file_index.file_hash.clone(),
243                    ),
244                    created_time: current_time.clone(),
245                    modified_time: current_time.clone(),
246                    is_valid: true,
247                };
248
249                // 将文件信息加入缓存
250                self.file_info_cache
251                    .insert(&file_path, file_info.clone());
252                file_info
253            };
254
255            file_infos.push(file_info);
256        }
257
258        Ok(file_infos)
259    }
260
261    /// 获取数据集路径
262    pub fn dataset_path(&self) -> &Path {
263        &self.dataset_path
264    }
265
266    /// 获取数据集名称
267    pub fn dataset_name(&self) -> &str {
268        &self.dataset_name
269    }
270
271    /// 读取下一个数据包(默认方法,带校验结果)
272    ///
273    /// 从当前位置读取下一个数据包,包含校验状态信息。如果当前文件读取完毕,
274    /// 会自动切换到下一个文件。
275    ///
276    /// # 返回
277    /// - `Ok(Some(result))` - 成功读取到数据包和校验结果
278    /// - `Ok(None)` - 到达文件末尾,无更多数据包
279    /// - `Err(error)` - 读取过程中发生错误
280    pub fn read_packet(
281        &mut self,
282    ) -> PcapResult<Option<ValidatedPacket>> {
283        self.initialize()?;
284
285        // 确保当前文件已打开
286        self.ensure_current_file_open()?;
287
288        loop {
289            if let Some(ref mut reader) =
290                self.current_reader
291            {
292                match reader.read_packet() {
293                    Ok(Some(result)) => {
294                        self.current_position += 1;
295                        return Ok(Some(result));
296                    }
297                    Ok(None) => {
298                        // 当前文件读取完毕,尝试切换到下一个文件
299                        if !self.switch_to_next_file()? {
300                            // 没有更多文件
301                            return Ok(None);
302                        }
303                        continue;
304                    }
305                    Err(e) => return Err(e),
306                }
307            } else {
308                // 没有可读取的文件
309                return Ok(None);
310            }
311        }
312    }
313
314    /// 读取下一个数据包(仅返回数据,不返回校验信息)
315    ///
316    /// 从当前位置读取下一个数据包,仅返回数据包本身。如果当前文件读取完毕,
317    /// 会自动切换到下一个文件。
318    ///
319    /// # 返回
320    /// - `Ok(Some(packet))` - 成功读取到数据包
321    /// - `Ok(None)` - 到达文件末尾,无更多数据包
322    /// - `Err(error)` - 读取过程中发生错误
323    pub fn read_packet_data_only(
324        &mut self,
325    ) -> PcapResult<Option<DataPacket>> {
326        match self.read_packet()? {
327            Some(result) => Ok(Some(result.packet)),
328            None => Ok(None),
329        }
330    }
331
332    /// 批量读取多个数据包(默认方法,带校验结果)
333    ///
334    /// # 参数
335    /// - `count` - 要读取的数据包数量
336    ///
337    /// # 返回
338    pub fn read_packets(
339        &mut self,
340        count: usize,
341    ) -> PcapResult<Vec<ValidatedPacket>> {
342        self.initialize()?;
343
344        let mut results = Vec::with_capacity(count);
345
346        // 批量读取指定数量的数据包
347        for _ in 0..count {
348            if let Some(result) = self.read_packet()? {
349                results.push(result);
350            } else {
351                break; // 没有更多数据包
352            }
353        }
354
355        Ok(results)
356    }
357
358    /// 批量读取多个数据包(仅返回数据,不返回校验信息)
359    ///
360    /// # 参数
361    /// - `count` - 要读取的数据包数量
362    ///
363    /// # 返回
364    pub fn read_packets_data_only(
365        &mut self,
366        count: usize,
367    ) -> PcapResult<Vec<DataPacket>> {
368        self.initialize()?;
369
370        let mut packets = Vec::with_capacity(count);
371
372        // 批量读取指定数量的数据包
373        for _ in 0..count {
374            if let Some(packet) =
375                self.read_packet_data_only()?
376            {
377                packets.push(packet);
378            } else {
379                break; // 没有更多数据包
380            }
381        }
382
383        Ok(packets)
384    }
385
386    /// 重置读取器到数据集开始位置
387    ///
388    /// 将读取器重置到数据集的开始位置,后续读取将从第一个数据包开始。
389    pub fn reset(&mut self) -> PcapResult<()> {
390        self.initialize()?;
391
392        // 重置当前读取位置到数据集开始
393        self.current_position = 0;
394        self.current_file_index = 0;
395
396        // 关闭当前文件
397        if let Some(ref mut reader) = self.current_reader {
398            reader.close();
399        }
400        self.current_reader = None;
401
402        // 重新打开第一个文件(如果存在)
403        let index = self
404            .index_manager
405            .get_index()
406            .ok_or_else(|| {
407                PcapError::InvalidState(
408                    "索引未加载".to_string(),
409                )
410            })?;
411
412        if !index.data_files.files.is_empty() {
413            self.open_file(0)?;
414        }
415
416        info!("读取器已重置到数据集开始位置");
417        Ok(())
418    }
419
420    /// 获取索引管理器的引用
421    /// 允许外部通过 reader.index().method() 的方式访问索引功能
422    pub fn index(&self) -> &IndexManager {
423        &self.index_manager
424    }
425
426    /// 获取索引管理器的可变引用
427    /// 允许外部通过 reader.index_mut().method() 的方式访问索引功能
428    pub fn index_mut(&mut self) -> &mut IndexManager {
429        &mut self.index_manager
430    }
431
432    /// 按时间戳查找数据包位置
433    ///
434    /// # 参数
435    /// - `timestamp_ns` - 目标时间戳(纳秒)
436    ///
437    /// # 返回
438    /// 返回最接近指定时间戳的数据包索引条目,如果未找到则返回None
439    pub fn seek_by_timestamp(
440        &mut self,
441        timestamp_ns: u64,
442    ) -> PcapResult<
443        Option<
444            crate::business::index::types::TimestampPointer,
445        >,
446    > {
447        self.initialize()?;
448
449        let index = self
450            .index_manager
451            .get_index()
452            .ok_or_else(|| {
453                PcapError::InvalidState(
454                    "索引未加载".to_string(),
455                )
456            })?;
457
458        // 在时间戳索引中查找最接近的条目
459        let mut closest_entry = None;
460        let mut min_diff = u64::MAX;
461
462        for (ts, pointer) in &index.timestamp_index {
463            let diff = (*ts).abs_diff(timestamp_ns);
464
465            if diff < min_diff {
466                min_diff = diff;
467                closest_entry = Some(pointer.clone());
468            }
469        }
470
471        Ok(closest_entry)
472    }
473
474    /// 按时间范围读取数据包
475    ///
476    /// # 参数
477    /// - `start_timestamp_ns` - 开始时间戳(纳秒)
478    /// - `end_timestamp_ns` - 结束时间戳(纳秒)
479    ///
480    /// # 返回
481    /// 返回指定时间范围内的所有数据包
482    pub fn read_packets_by_time_range(
483        &mut self,
484        start_timestamp_ns: u64,
485        end_timestamp_ns: u64,
486    ) -> PcapResult<Vec<ValidatedPacket>> {
487        self.initialize()?;
488
489        let pointers = {
490            let index = self
491                .index_manager
492                .get_index()
493                .ok_or_else(|| {
494                    PcapError::InvalidState(
495                        "索引未加载".to_string(),
496                    )
497                })?;
498
499            index
500                .get_packets_in_range(
501                    start_timestamp_ns,
502                    end_timestamp_ns,
503                )
504                .into_iter()
505                .cloned()
506                .collect::<Vec<_>>()
507        };
508
509        let mut result_packets = Vec::new();
510        let mut current_file_index = None;
511
512        // 按时间顺序读取数据包
513        for pointer in pointers {
514            // 检查是否需要切换文件
515            if current_file_index
516                != Some(pointer.file_index)
517            {
518                self.open_file(pointer.file_index)?;
519                current_file_index =
520                    Some(pointer.file_index);
521            }
522
523            // 确保文件已打开
524            self.ensure_current_file_open()?;
525
526            // 读取指定位置的数据包
527            let reader = self
528                .current_reader
529                .as_mut()
530                .ok_or_else(|| {
531                    PcapError::InvalidState(
532                        "当前文件读取器未初始化"
533                            .to_string(),
534                    )
535                })?;
536            let packet_result = reader
537                .read_packet_at(pointer.entry.byte_offset);
538
539            match packet_result {
540                Ok(packet) => {
541                    // 验证时间戳是否在范围内
542                    let packet_timestamp =
543                        packet.packet.get_timestamp_ns();
544                    if packet_timestamp
545                        >= start_timestamp_ns
546                        && packet_timestamp
547                            <= end_timestamp_ns
548                    {
549                        result_packets.push(packet);
550                    }
551                }
552                Err(e) => {
553                    warn!("读取数据包失败: {}", e);
554                    // 继续处理其他数据包
555                }
556            }
557        }
558
559        Ok(result_packets)
560    }
561
562    /// 获取缓存统计信息
563    pub fn get_cache_stats(&self) -> CacheStats {
564        self.file_info_cache.get_cache_stats()
565    }
566
567    /// 清理缓存
568    pub fn clear_cache(&mut self) -> PcapResult<()> {
569        let _ = self.file_info_cache.clear();
570        debug!("缓存已清理");
571        Ok(())
572    }
573
574    // =================================================================
575    // 私有方法
576    // =================================================================
577
578    /// 获取数据集总大小
579    fn get_total_size(&self) -> PcapResult<u64> {
580        if let Some(cached_size) =
581            *self.total_size_cache.borrow()
582        {
583            return Ok(cached_size);
584        }
585
586        let index = self
587            .index_manager
588            .get_index()
589            .ok_or_else(|| {
590                PcapError::InvalidState(
591                    "索引未加载".to_string(),
592                )
593            })?;
594
595        let total_size: u64 = index
596            .data_files
597            .files
598            .iter()
599            .map(|f| f.file_size)
600            .sum();
601
602        *self.total_size_cache.borrow_mut() =
603            Some(total_size);
604        Ok(total_size)
605    }
606
607    /// 打开指定索引的文件
608    fn open_file(
609        &mut self,
610        file_index: usize,
611    ) -> PcapResult<()> {
612        let index = self
613            .index_manager
614            .get_index()
615            .ok_or_else(|| {
616                PcapError::InvalidState(
617                    "索引未加载".to_string(),
618                )
619            })?;
620
621        if file_index >= index.data_files.files.len() {
622            return Err(PcapError::InvalidArgument(
623                format!("文件索引超出范围: {file_index}"),
624            ));
625        }
626
627        // 关闭当前文件
628        if let Some(ref mut reader) = self.current_reader {
629            reader.close();
630        }
631
632        // 打开新文件
633        let file_info = &index.data_files.files[file_index];
634        let file_path =
635            self.dataset_path.join(&file_info.file_name);
636
637        let mut reader =
638            crate::data::file_reader::PcapFileReader::new(
639                self.configuration.clone(),
640            );
641        reader.open(&file_path)?;
642
643        self.current_reader = Some(reader);
644        self.current_file_index = file_index;
645
646        debug!("已打开文件: {file_path:?}");
647        Ok(())
648    }
649
650    /// 切换到下一个文件
651    fn switch_to_next_file(&mut self) -> PcapResult<bool> {
652        let index = self
653            .index_manager
654            .get_index()
655            .ok_or_else(|| {
656                PcapError::InvalidState(
657                    "索引未加载".to_string(),
658                )
659            })?;
660
661        if self.current_file_index + 1
662            >= index.data_files.files.len()
663        {
664            // 没有更多文件
665            return Ok(false);
666        }
667
668        self.open_file(self.current_file_index + 1)?;
669        Ok(true)
670    }
671
672    /// 确保当前文件已打开
673    fn ensure_current_file_open(
674        &mut self,
675    ) -> PcapResult<()> {
676        if self.current_reader.is_none() {
677            let index = self
678                .index_manager
679                .get_index()
680                .ok_or_else(|| {
681                    PcapError::InvalidState(
682                        "索引未加载".to_string(),
683                    )
684                })?;
685
686            if !index.data_files.files.is_empty() {
687                self.open_file(0)?;
688            }
689        }
690        Ok(())
691    }
692
693    /// 根据时间戳读取数据包
694    pub fn read_packet_by_timestamp(
695        &mut self,
696        timestamp_ns: u64,
697    ) -> PcapResult<Option<ValidatedPacket>> {
698        let pointer = {
699            let index = self
700                .index_manager
701                .get_index()
702                .ok_or_else(|| {
703                    PcapError::InvalidState(
704                        "索引未加载".to_string(),
705                    )
706                })?;
707
708            match index
709                .find_packet_by_timestamp(timestamp_ns)
710            {
711                Some(ptr) => ptr.clone(),
712                None => return Ok(None),
713            }
714        };
715
716        // 检查是否需要切换文件
717        if pointer.file_index != self.current_file_index {
718            self.open_file(pointer.file_index)?;
719        }
720
721        // 确保文件已打开
722        self.ensure_current_file_open()?;
723
724        // 读取指定位置的数据包
725        let reader = self
726            .current_reader
727            .as_mut()
728            .ok_or_else(|| {
729                PcapError::InvalidState(
730                    "当前文件读取器未初始化".to_string(),
731                )
732            })?;
733        let packet_result = reader
734            .read_packet_at(pointer.entry.byte_offset);
735
736        match packet_result {
737            Ok(packet) => {
738                // 验证时间戳是否匹配
739                if packet.packet.get_timestamp_ns()
740                    == timestamp_ns
741                {
742                    Ok(Some(packet))
743                } else {
744                    Err(PcapError::InvalidState(
745                        "读取的数据包时间戳不匹配"
746                            .to_string(),
747                    ))
748                }
749            }
750            Err(e) => Err(e),
751        }
752    }
753}
754
755impl Drop for PcapReader {
756    fn drop(&mut self) {
757        // 关闭当前文件读取器
758        if let Some(ref mut reader) = self.current_reader {
759            reader.close();
760        }
761        debug!("PcapReader已清理");
762    }
763}