pcapfile_io/api/
reader.rs

1//! 数据集读取器模块
2//!
3//! 提供高级的数据集读取功能,支持多文件PCAP数据集的统一读取接口。
4
5use log::{debug, info, warn};
6use std::cell::RefCell;
7use std::path::{Path, PathBuf};
8
9use crate::business::cache::{CacheStats, FileInfoCache};
10use crate::business::config::ReaderConfig;
11use crate::business::index::IndexManager;
12use crate::data::file_reader::PcapFileReader;
13use crate::data::models::{
14    DataPacket, DatasetInfo, FileInfo, ValidatedPacket,
15};
16use crate::foundation::error::{PcapError, PcapResult};
17
18// 错误消息常量
19const ERROR_DATASET_NOT_FOUND: &str = "数据集目录不存在";
20const ERROR_INVALID_DATASET: &str = "无效的数据集目录";
21
22/// PCAP数据集读取器
23///
24/// 提供对PCAP数据集的高性能读取功能,支持:
25/// - 自动索引管理和验证
26/// - 顺序读取和文件切换
27/// - 智能缓存和性能优化
28/// - 多文件数据集统一访问
29pub struct PcapReader {
30    /// 数据集目录路径
31    dataset_path: PathBuf,
32    /// 数据集名称
33    dataset_name: String,
34    /// 索引管理器
35    index_manager: IndexManager,
36    /// 配置信息
37    configuration: ReaderConfig,
38    /// 当前文件读取器
39    current_reader: Option<PcapFileReader>,
40    /// 当前文件索引
41    current_file_index: usize,
42    /// 当前读取位置(全局数据包索引)
43    current_position: u64,
44    /// 文件信息缓存
45    file_info_cache: FileInfoCache,
46    /// 总大小缓存
47    total_size_cache: RefCell<Option<u64>>,
48    /// 是否已初始化
49    is_initialized: bool,
50}
51
52impl PcapReader {
53    /// 创建新的PCAP读取器
54    ///
55    /// # 参数
56    /// - `base_path` - 基础路径
57    /// - `dataset_name` - 数据集名称
58    ///
59    /// # 返回
60    /// 返回初始化后的读取器实例
61    pub fn new<P: AsRef<Path>>(
62        base_path: P,
63        dataset_name: &str,
64    ) -> PcapResult<Self> {
65        Self::new_with_config(
66            base_path,
67            dataset_name,
68            ReaderConfig::default(),
69        )
70    }
71
72    /// 创建新的PCAP读取器(带配置)
73    ///
74    /// # 参数
75    /// - `base_path` - 基础路径
76    /// - `dataset_name` - 数据集名称
77    /// - `configuration` - 读取器配置信息
78    ///
79    /// # 返回
80    /// 返回初始化后的读取器实例
81    pub fn new_with_config<P: AsRef<Path>>(
82        base_path: P,
83        dataset_name: &str,
84        configuration: ReaderConfig,
85    ) -> PcapResult<Self> {
86        // 验证配置有效性
87        configuration.validate().map_err(|e| {
88            PcapError::InvalidArgument(format!(
89                "读取器配置无效: {e}"
90            ))
91        })?;
92
93        let dataset_path =
94            base_path.as_ref().join(dataset_name);
95
96        // 验证数据集目录
97        if !dataset_path.exists() {
98            return Err(PcapError::DirectoryNotFound(
99                ERROR_DATASET_NOT_FOUND.to_string(),
100            ));
101        }
102
103        if !dataset_path.is_dir() {
104            return Err(PcapError::InvalidArgument(
105                ERROR_INVALID_DATASET.to_string(),
106            ));
107        }
108
109        // 创建索引管理器
110        let index_manager =
111            IndexManager::new(base_path, dataset_name)?;
112
113        // 获取缓存大小(在移动 configuration 之前)
114        let cache_size = configuration.index_cache_size;
115
116        info!("PcapReader已创建 - 数据集: {dataset_name}");
117
118        Ok(Self {
119            dataset_path,
120            dataset_name: dataset_name.to_string(),
121            index_manager,
122            configuration,
123            current_reader: None,
124            current_file_index: 0,
125            current_position: 0,
126            file_info_cache: FileInfoCache::new(cache_size),
127            total_size_cache: RefCell::new(None),
128            is_initialized: false,
129        })
130    }
131
132    /// 初始化读取器
133    ///
134    /// 确保索引可用并准备好读取操作
135    pub fn initialize(&mut self) -> PcapResult<()> {
136        if self.is_initialized {
137            return Ok(());
138        }
139
140        info!("初始化PcapReader...");
141
142        // 确保索引可用
143        let _index = self.index_manager.ensure_index()?;
144
145        self.is_initialized = true;
146        info!("PcapReader初始化完成");
147        Ok(())
148    }
149
150    /// 获取数据集信息
151    pub fn get_dataset_info(
152        &mut self,
153    ) -> PcapResult<DatasetInfo> {
154        self.initialize()?;
155
156        let index = self
157            .index_manager
158            .get_index()
159            .ok_or_else(|| {
160                PcapError::InvalidState(
161                    "索引未加载".to_string(),
162                )
163            })?;
164
165        use chrono::Utc;
166
167        Ok(DatasetInfo {
168            name: self.dataset_name.clone(),
169            path: self.dataset_path.clone(),
170            file_count: index.data_files.files.len(),
171            total_packets: index.total_packets,
172            total_size: self.get_total_size()?,
173            start_timestamp: if index.start_timestamp > 0 {
174                Some(index.start_timestamp)
175            } else {
176                None
177            },
178            end_timestamp: if index.end_timestamp > 0 {
179                Some(index.end_timestamp)
180            } else {
181                None
182            },
183            created_time: Utc::now().to_rfc3339(),
184            modified_time: Utc::now().to_rfc3339(),
185            has_index: true,
186        })
187    }
188
189    /// 获取文件信息列表
190    pub fn get_file_info_list(
191        &mut self,
192    ) -> PcapResult<Vec<FileInfo>> {
193        self.initialize()?;
194
195        let index = self
196            .index_manager
197            .get_index()
198            .ok_or_else(|| {
199                PcapError::InvalidState(
200                    "索引未加载".to_string(),
201                )
202            })?;
203
204        use chrono::Utc;
205        let current_time = Utc::now().to_rfc3339();
206
207        let mut file_infos = Vec::new();
208        for file_index in &index.data_files.files {
209            let file_path = self
210                .dataset_path
211                .join(&file_index.file_name);
212
213            // 尝试从缓存获取文件信息
214            let file_info = if let Some(cached_info) =
215                self.file_info_cache.get(&file_path)
216            {
217                cached_info
218            } else {
219                // 缓存未命中,创建新的文件信息并缓存
220                let file_info = FileInfo {
221                    file_name: file_index.file_name.clone(),
222                    file_path: file_path.clone(),
223                    file_size: file_index.file_size,
224                    packet_count: file_index.packet_count,
225                    start_timestamp: if file_index
226                        .start_timestamp
227                        > 0
228                    {
229                        Some(file_index.start_timestamp)
230                    } else {
231                        None
232                    },
233                    end_timestamp: if file_index
234                        .end_timestamp
235                        > 0
236                    {
237                        Some(file_index.end_timestamp)
238                    } else {
239                        None
240                    },
241                    created_time: current_time.clone(),
242                    modified_time: current_time.clone(),
243                    is_valid: true,
244                };
245
246                // 将文件信息加入缓存
247                self.file_info_cache
248                    .insert(&file_path, file_info.clone());
249                file_info
250            };
251
252            file_infos.push(file_info);
253        }
254
255        Ok(file_infos)
256    }
257
258    /// 获取数据集路径
259    pub fn dataset_path(&self) -> &Path {
260        &self.dataset_path
261    }
262
263    /// 获取数据集名称
264    pub fn dataset_name(&self) -> &str {
265        &self.dataset_name
266    }
267
268    /// 读取下一个数据包(默认方法,带校验结果)
269    ///
270    /// 从当前位置读取下一个数据包,包含校验状态信息。如果当前文件读取完毕,
271    /// 会自动切换到下一个文件。
272    ///
273    /// # 返回
274    /// - `Ok(Some(result))` - 成功读取到数据包和校验结果
275    /// - `Ok(None)` - 到达文件末尾,无更多数据包
276    /// - `Err(error)` - 读取过程中发生错误
277    pub fn read_packet(
278        &mut self,
279    ) -> PcapResult<Option<ValidatedPacket>> {
280        self.initialize()?;
281
282        // 确保当前文件已打开
283        self.ensure_current_file_open()?;
284
285        loop {
286            if let Some(ref mut reader) =
287                self.current_reader
288            {
289                match reader.read_packet() {
290                    Ok(Some(result)) => {
291                        self.current_position += 1;
292                        return Ok(Some(result));
293                    }
294                    Ok(None) => {
295                        // 当前文件读取完毕,尝试切换到下一个文件
296                        if !self.switch_to_next_file()? {
297                            // 没有更多文件
298                            return Ok(None);
299                        }
300                        continue;
301                    }
302                    Err(e) => return Err(e),
303                }
304            } else {
305                // 没有可读取的文件
306                return Ok(None);
307            }
308        }
309    }
310
311    /// 读取下一个数据包(仅返回数据,不返回校验信息)
312    ///
313    /// 从当前位置读取下一个数据包,仅返回数据包本身。如果当前文件读取完毕,
314    /// 会自动切换到下一个文件。
315    ///
316    /// # 返回
317    /// - `Ok(Some(packet))` - 成功读取到数据包
318    /// - `Ok(None)` - 到达文件末尾,无更多数据包
319    /// - `Err(error)` - 读取过程中发生错误
320    pub fn read_packet_data_only(
321        &mut self,
322    ) -> PcapResult<Option<DataPacket>> {
323        match self.read_packet()? {
324            Some(result) => Ok(Some(result.packet)),
325            None => Ok(None),
326        }
327    }
328
329    /// 批量读取多个数据包(默认方法,带校验结果)
330    ///
331    /// # 参数
332    /// - `count` - 要读取的数据包数量
333    ///
334    /// # 返回
335    pub fn read_packets(
336        &mut self,
337        count: usize,
338    ) -> PcapResult<Vec<ValidatedPacket>> {
339        self.initialize()?;
340
341        let mut results = Vec::with_capacity(count);
342
343        // 批量读取指定数量的数据包
344        for _ in 0..count {
345            if let Some(result) = self.read_packet()? {
346                results.push(result);
347            } else {
348                break; // 没有更多数据包
349            }
350        }
351
352        Ok(results)
353    }
354
355    /// 批量读取多个数据包(仅返回数据,不返回校验信息)
356    ///
357    /// # 参数
358    /// - `count` - 要读取的数据包数量
359    ///
360    /// # 返回
361    pub fn read_packets_data_only(
362        &mut self,
363        count: usize,
364    ) -> PcapResult<Vec<DataPacket>> {
365        self.initialize()?;
366
367        let mut packets = Vec::with_capacity(count);
368
369        // 批量读取指定数量的数据包
370        for _ in 0..count {
371            if let Some(packet) =
372                self.read_packet_data_only()?
373            {
374                packets.push(packet);
375            } else {
376                break; // 没有更多数据包
377            }
378        }
379
380        Ok(packets)
381    }
382
383    /// 重置读取器到数据集开始位置
384    ///
385    /// 将读取器重置到数据集的开始位置,后续读取将从第一个数据包开始。
386    pub fn reset(&mut self) -> PcapResult<()> {
387        self.initialize()?;
388
389        // 重置当前读取位置到数据集开始
390        self.current_position = 0;
391        self.current_file_index = 0;
392
393        // 关闭当前文件
394        if let Some(ref mut reader) = self.current_reader {
395            reader.close();
396        }
397        self.current_reader = None;
398
399        // 重新打开第一个文件(如果存在)
400        let index = self
401            .index_manager
402            .get_index()
403            .ok_or_else(|| {
404                PcapError::InvalidState(
405                    "索引未加载".to_string(),
406                )
407            })?;
408
409        if !index.data_files.files.is_empty() {
410            self.open_file(0)?;
411        }
412
413        info!("读取器已重置到数据集开始位置");
414        Ok(())
415    }
416
417    /// 获取索引管理器的引用
418    /// 允许外部通过 reader.index().method() 的方式访问索引功能
419    pub fn index(&self) -> &IndexManager {
420        &self.index_manager
421    }
422
423    /// 获取索引管理器的可变引用
424    /// 允许外部通过 reader.index_mut().method() 的方式访问索引功能
425    pub fn index_mut(&mut self) -> &mut IndexManager {
426        &mut self.index_manager
427    }
428
429    /// 按时间戳查找数据包位置
430    ///
431    /// # 参数
432    /// - `timestamp_ns` - 目标时间戳(纳秒)
433    ///
434    /// # 返回
435    /// 返回最接近指定时间戳的数据包索引条目,如果未找到则返回None
436    pub fn seek_by_timestamp(
437        &mut self,
438        timestamp_ns: u64,
439    ) -> PcapResult<
440        Option<
441            crate::business::index::types::TimestampPointer,
442        >,
443    > {
444        self.initialize()?;
445
446        let index = self
447            .index_manager
448            .get_index()
449            .ok_or_else(|| {
450                PcapError::InvalidState(
451                    "索引未加载".to_string(),
452                )
453            })?;
454
455        // 在时间戳索引中查找最接近的条目
456        let mut closest_entry = None;
457        let mut min_diff = u64::MAX;
458
459        for (ts, pointer) in &index.timestamp_index {
460            let diff = (*ts).abs_diff(timestamp_ns);
461
462            if diff < min_diff {
463                min_diff = diff;
464                closest_entry = Some(pointer.clone());
465            }
466        }
467
468        Ok(closest_entry)
469    }
470
471    /// 按时间范围读取数据包
472    ///
473    /// # 参数
474    /// - `start_timestamp_ns` - 开始时间戳(纳秒)
475    /// - `end_timestamp_ns` - 结束时间戳(纳秒)
476    ///
477    /// # 返回
478    /// 返回指定时间范围内的所有数据包
479    pub fn read_packets_by_time_range(
480        &mut self,
481        start_timestamp_ns: u64,
482        end_timestamp_ns: u64,
483    ) -> PcapResult<Vec<ValidatedPacket>> {
484        self.initialize()?;
485
486        let pointers = {
487            let index = self
488                .index_manager
489                .get_index()
490                .ok_or_else(|| {
491                    PcapError::InvalidState(
492                        "索引未加载".to_string(),
493                    )
494                })?;
495
496            index
497                .get_packets_in_range(
498                    start_timestamp_ns,
499                    end_timestamp_ns,
500                )
501                .into_iter()
502                .cloned()
503                .collect::<Vec<_>>()
504        };
505
506        let mut result_packets = Vec::new();
507        let mut current_file_index = None;
508
509        // 按时间顺序读取数据包
510        for pointer in pointers {
511            // 检查是否需要切换文件
512            if current_file_index
513                != Some(pointer.file_index)
514            {
515                self.open_file(pointer.file_index)?;
516                current_file_index =
517                    Some(pointer.file_index);
518            }
519
520            // 确保文件已打开
521            self.ensure_current_file_open()?;
522
523            // 读取指定位置的数据包
524            let reader = self
525                .current_reader
526                .as_mut()
527                .ok_or_else(|| {
528                    PcapError::InvalidState(
529                        "当前文件读取器未初始化"
530                            .to_string(),
531                    )
532                })?;
533            let packet_result = reader
534                .read_packet_at(pointer.entry.byte_offset);
535
536            match packet_result {
537                Ok(packet) => {
538                    // 验证时间戳是否在范围内
539                    let packet_timestamp =
540                        packet.packet.get_timestamp_ns();
541                    if packet_timestamp
542                        >= start_timestamp_ns
543                        && packet_timestamp
544                            <= end_timestamp_ns
545                    {
546                        result_packets.push(packet);
547                    }
548                }
549                Err(e) => {
550                    warn!("读取数据包失败: {}", e);
551                    // 继续处理其他数据包
552                }
553            }
554        }
555
556        Ok(result_packets)
557    }
558
559    /// 获取缓存统计信息
560    pub fn get_cache_stats(&self) -> CacheStats {
561        self.file_info_cache.get_cache_stats()
562    }
563
564    /// 清理缓存
565    pub fn clear_cache(&mut self) -> PcapResult<()> {
566        let _ = self.file_info_cache.clear();
567        debug!("缓存已清理");
568        Ok(())
569    }
570
571    /// 跳转到指定时间戳(纳秒)
572    ///
573    /// 返回实际定位到的时间戳。如果精确匹配不存在,返回时间戳后面最接近的数据包。
574    ///
575    /// # 参数
576    /// - `timestamp_ns` - 目标时间戳(纳秒)
577    ///
578    /// # 返回
579    /// - `Ok(actual_timestamp)` - 成功跳转,返回实际定位到的时间戳
580    /// - `Err(error)` - 未找到数据包或发生错误
581    pub fn seek_to_timestamp(
582        &mut self,
583        timestamp_ns: u64,
584    ) -> PcapResult<u64> {
585        self.initialize()?;
586
587        // 1. 先提取所需信息,避免借用冲突
588        let (
589            actual_ts,
590            file_index,
591            byte_offset,
592            packet_offset,
593        ) = {
594            let index = self
595                .index_manager
596                .get_index()
597                .ok_or_else(|| {
598                    PcapError::InvalidState(
599                        "索引未加载".to_string(),
600                    )
601                })?;
602
603            // 尝试精确匹配
604            let (actual_ts, pointer) = if let Some(ptr) =
605                index.find_packet_by_timestamp(timestamp_ns)
606            {
607                (timestamp_ns, ptr.clone())
608            } else {
609                // 查找 >= target 的最小时间戳
610                Self::find_timestamp_ge(&index.timestamp_index, timestamp_ns)
611                    .ok_or_else(|| PcapError::InvalidArgument(
612                        format!("未找到时间戳 >= {timestamp_ns} 的数据包")
613                    ))?
614            };
615
616            // 计算文件内的序号
617            let file_index_data =
618                &index.data_files.files[pointer.file_index];
619            let packet_offset = file_index_data
620                .data_packets
621                .iter()
622                .position(|p| {
623                    p.timestamp_ns
624                        == pointer.entry.timestamp_ns
625                })
626                .unwrap_or(0);
627
628            (
629                actual_ts,
630                pointer.file_index,
631                pointer.entry.byte_offset,
632                packet_offset,
633            )
634        };
635
636        // 2. 打开对应文件
637        self.open_file(file_index)?;
638
639        // 3. seek 到字节偏移
640        if let Some(reader) = self.current_reader.as_mut() {
641            reader.seek_to(byte_offset)?;
642        } else {
643            return Err(PcapError::InvalidState(
644                "文件未打开".to_string(),
645            ));
646        }
647
648        // 4. 更新状态
649        self.current_file_index = file_index;
650
651        // 计算全局位置
652        let index = self
653            .index_manager
654            .get_index()
655            .ok_or_else(|| {
656                PcapError::InvalidState(
657                    "索引未加载".to_string(),
658                )
659            })?;
660        self.current_position = self
661            .calculate_global_position(
662                index,
663                file_index,
664                packet_offset,
665            );
666
667        info!("已跳转到时间戳: {timestamp_ns}ns (实际: {actual_ts}ns), 全局位置: {}", 
668            self.current_position);
669
670        Ok(actual_ts)
671    }
672
673    /// 跳转到指定索引的数据包(从0开始)
674    ///
675    /// # 参数
676    /// - `packet_index` - 目标数据包的全局索引(从0开始)
677    pub fn seek_to_packet(
678        &mut self,
679        packet_index: usize,
680    ) -> PcapResult<()> {
681        self.initialize()?;
682
683        // 1. 先提取所需信息,避免借用冲突
684        let (target_file_idx, byte_offset, packet_offset) = {
685            let index = self
686                .index_manager
687                .get_index()
688                .ok_or_else(|| {
689                    PcapError::InvalidState(
690                        "索引未加载".to_string(),
691                    )
692                })?;
693
694            // 检查索引范围
695            if packet_index >= index.total_packets as usize
696            {
697                return Err(PcapError::InvalidArgument(
698                    format!("数据包索引 {packet_index} 超出范围 (总数: {})", index.total_packets)
699                ));
700            }
701
702            // 遍历文件,找到目标文件和文件内偏移
703            let mut accumulated = 0usize;
704            let mut target_file_idx = 0;
705            let mut packet_offset = 0;
706
707            for (file_idx, file) in
708                index.data_files.files.iter().enumerate()
709            {
710                let next_accumulated = accumulated
711                    + file.packet_count as usize;
712                if packet_index < next_accumulated {
713                    target_file_idx = file_idx;
714                    packet_offset =
715                        packet_index - accumulated;
716                    break;
717                }
718                accumulated = next_accumulated;
719            }
720
721            // 获取数据包条目
722            let file =
723                &index.data_files.files[target_file_idx];
724            let packet_entry =
725                &file.data_packets[packet_offset];
726            let byte_offset = packet_entry.byte_offset;
727
728            (target_file_idx, byte_offset, packet_offset)
729        };
730
731        // 2. 打开文件并 seek
732        self.open_file(target_file_idx)?;
733        if let Some(reader) = self.current_reader.as_mut() {
734            reader.seek_to(byte_offset)?;
735        } else {
736            return Err(PcapError::InvalidState(
737                "文件未打开".to_string(),
738            ));
739        }
740
741        // 3. 更新状态
742        self.current_file_index = target_file_idx;
743        self.current_position = packet_index as u64;
744
745        info!("已跳转到数据包索引: {packet_index}, 文件: {target_file_idx}, 文件内偏移: {packet_offset}");
746
747        Ok(())
748    }
749
750    /// 检查是否已到达文件末尾
751    pub fn is_eof(&self) -> bool {
752        if let Some(index) = self.index_manager.get_index()
753        {
754            self.current_position >= index.total_packets
755        } else {
756            // 未初始化时,检查是否有可读取的文件
757            self.current_reader.is_none()
758        }
759    }
760
761    /// 获取总数据包数量(如果索引可用)
762    pub fn total_packets(&self) -> Option<usize> {
763        self.index_manager
764            .get_index()
765            .map(|idx| idx.total_packets as usize)
766    }
767
768    /// 获取当前数据包索引位置(全局序号,从0开始)
769    pub fn current_packet_index(&self) -> u64 {
770        self.current_position
771    }
772
773    /// 获取当前读取进度(百分比:0.0 - 1.0)
774    pub fn progress(&self) -> Option<f64> {
775        self.total_packets().map(|total| {
776            if total == 0 {
777                1.0
778            } else {
779                (self.current_position as f64
780                    / total as f64)
781                    .min(1.0)
782            }
783        })
784    }
785
786    /// 跳过指定数量的数据包
787    ///
788    /// # 参数
789    /// - `count` - 要跳过的数据包数量
790    ///
791    /// # 返回
792    /// 实际跳过的数据包数量(可能小于请求数量,如果到达末尾)
793    pub fn skip_packets(
794        &mut self,
795        count: usize,
796    ) -> PcapResult<usize> {
797        let current_idx = self.current_position as usize;
798        let target_idx = current_idx + count;
799
800        let total = self.total_packets().unwrap_or(0);
801        // 限制目标索引到有效范围(0 到 total-1)
802        let actual_target = if total == 0 {
803            0
804        } else {
805            target_idx.min(total - 1)
806        };
807        let actual_skipped =
808            actual_target.saturating_sub(current_idx);
809
810        if actual_skipped > 0 {
811            self.seek_to_packet(actual_target)?;
812        }
813
814        Ok(actual_skipped)
815    }
816
817    // =================================================================
818    // 私有方法
819    // =================================================================
820
821    /// 计算指定文件索引和文件内数据包偏移对应的全局数据包位置
822    fn calculate_global_position(
823        &self,
824        index: &crate::business::index::types::PidxIndex,
825        file_index: usize,
826        packet_offset_in_file: usize,
827    ) -> u64 {
828        let mut position = 0u64;
829        for (idx, file) in
830            index.data_files.files.iter().enumerate()
831        {
832            if idx < file_index {
833                position += file.packet_count;
834            } else if idx == file_index {
835                position += packet_offset_in_file as u64;
836                break;
837            }
838        }
839        position
840    }
841
842    /// 查找大于等于指定时间戳的最接近时间戳及其指针
843    fn find_timestamp_ge(
844        timestamp_index: &std::collections::HashMap<
845            u64,
846            crate::business::index::types::TimestampPointer,
847        >,
848        target_ns: u64,
849    ) -> Option<(
850        u64,
851        crate::business::index::types::TimestampPointer,
852    )> {
853        let mut candidates: Vec<u64> = timestamp_index
854            .keys()
855            .filter(|&&ts| ts >= target_ns)
856            .copied()
857            .collect();
858
859        if candidates.is_empty() {
860            return None;
861        }
862
863        candidates.sort_unstable();
864        let closest_ts = candidates[0];
865        timestamp_index
866            .get(&closest_ts)
867            .map(|ptr| (closest_ts, ptr.clone()))
868    }
869
870    /// 获取数据集总大小
871    fn get_total_size(&self) -> PcapResult<u64> {
872        if let Some(cached_size) =
873            *self.total_size_cache.borrow()
874        {
875            return Ok(cached_size);
876        }
877
878        let index = self
879            .index_manager
880            .get_index()
881            .ok_or_else(|| {
882                PcapError::InvalidState(
883                    "索引未加载".to_string(),
884                )
885            })?;
886
887        let total_size: u64 = index
888            .data_files
889            .files
890            .iter()
891            .map(|f| f.file_size)
892            .sum();
893
894        *self.total_size_cache.borrow_mut() =
895            Some(total_size);
896        Ok(total_size)
897    }
898
899    /// 打开指定索引的文件
900    fn open_file(
901        &mut self,
902        file_index: usize,
903    ) -> PcapResult<()> {
904        let index = self
905            .index_manager
906            .get_index()
907            .ok_or_else(|| {
908                PcapError::InvalidState(
909                    "索引未加载".to_string(),
910                )
911            })?;
912
913        if file_index >= index.data_files.files.len() {
914            return Err(PcapError::InvalidArgument(
915                format!("文件索引超出范围: {file_index}"),
916            ));
917        }
918
919        // 关闭当前文件
920        if let Some(ref mut reader) = self.current_reader {
921            reader.close();
922        }
923
924        // 打开新文件
925        let file_info = &index.data_files.files[file_index];
926        let file_path =
927            self.dataset_path.join(&file_info.file_name);
928
929        let mut reader =
930            crate::data::file_reader::PcapFileReader::new(
931                self.configuration.clone(),
932            );
933        reader.open(&file_path)?;
934
935        self.current_reader = Some(reader);
936        self.current_file_index = file_index;
937
938        debug!("已打开文件: {file_path:?}");
939        Ok(())
940    }
941
942    /// 切换到下一个文件
943    fn switch_to_next_file(&mut self) -> PcapResult<bool> {
944        let index = self
945            .index_manager
946            .get_index()
947            .ok_or_else(|| {
948                PcapError::InvalidState(
949                    "索引未加载".to_string(),
950                )
951            })?;
952
953        if self.current_file_index + 1
954            >= index.data_files.files.len()
955        {
956            // 没有更多文件
957            return Ok(false);
958        }
959
960        self.open_file(self.current_file_index + 1)?;
961        Ok(true)
962    }
963
964    /// 确保当前文件已打开
965    fn ensure_current_file_open(
966        &mut self,
967    ) -> PcapResult<()> {
968        if self.current_reader.is_none() {
969            let index = self
970                .index_manager
971                .get_index()
972                .ok_or_else(|| {
973                    PcapError::InvalidState(
974                        "索引未加载".to_string(),
975                    )
976                })?;
977
978            if !index.data_files.files.is_empty() {
979                self.open_file(0)?;
980            }
981        }
982        Ok(())
983    }
984
985    /// 根据时间戳读取数据包
986    pub fn read_packet_by_timestamp(
987        &mut self,
988        timestamp_ns: u64,
989    ) -> PcapResult<Option<ValidatedPacket>> {
990        let pointer = {
991            let index = self
992                .index_manager
993                .get_index()
994                .ok_or_else(|| {
995                    PcapError::InvalidState(
996                        "索引未加载".to_string(),
997                    )
998                })?;
999
1000            match index
1001                .find_packet_by_timestamp(timestamp_ns)
1002            {
1003                Some(ptr) => ptr.clone(),
1004                None => return Ok(None),
1005            }
1006        };
1007
1008        // 检查是否需要切换文件
1009        if pointer.file_index != self.current_file_index {
1010            self.open_file(pointer.file_index)?;
1011        }
1012
1013        // 确保文件已打开
1014        self.ensure_current_file_open()?;
1015
1016        // 读取指定位置的数据包
1017        let reader = self
1018            .current_reader
1019            .as_mut()
1020            .ok_or_else(|| {
1021                PcapError::InvalidState(
1022                    "当前文件读取器未初始化".to_string(),
1023                )
1024            })?;
1025        let packet_result = reader
1026            .read_packet_at(pointer.entry.byte_offset);
1027
1028        match packet_result {
1029            Ok(packet) => {
1030                // 验证时间戳是否匹配
1031                if packet.packet.get_timestamp_ns()
1032                    == timestamp_ns
1033                {
1034                    Ok(Some(packet))
1035                } else {
1036                    Err(PcapError::InvalidState(
1037                        "读取的数据包时间戳不匹配"
1038                            .to_string(),
1039                    ))
1040                }
1041            }
1042            Err(e) => Err(e),
1043        }
1044    }
1045}
1046
1047impl Drop for PcapReader {
1048    fn drop(&mut self) {
1049        // 关闭当前文件读取器
1050        if let Some(ref mut reader) = self.current_reader {
1051            reader.close();
1052        }
1053        debug!("PcapReader已清理");
1054    }
1055}