pcapfile_io/api/
reader.rs

1//! 数据集读取器模块
2//!
3//! 提供高级的数据集读取功能,支持多文件PCAP数据集的统一读取接口。
4
5use log::{debug, info, warn};
6use std::cell::RefCell;
7use std::path::{Path, PathBuf};
8
9use crate::business::cache::{CacheStats, FileInfoCache};
10use crate::business::config::ReaderConfig;
11use crate::business::index::IndexManager;
12use crate::data::file_reader::PcapFileReader;
13use crate::data::models::{
14    DataPacket, DatasetInfo, FileInfo, ValidatedPacket,
15};
16use crate::foundation::error::{PcapError, PcapResult};
17
18// 错误消息常量
19const ERROR_DATASET_NOT_FOUND: &str = "数据集目录不存在";
20const ERROR_INVALID_DATASET: &str = "无效的数据集目录";
21
22/// PCAP数据集读取器
23///
24/// 提供对PCAP数据集的高性能读取功能,支持:
25/// - 自动索引管理和验证
26/// - 顺序读取和文件切换
27/// - 智能缓存和性能优化
28/// - 多文件数据集统一访问
29pub struct PcapReader {
30    /// 数据集目录路径
31    dataset_path: PathBuf,
32    /// 数据集名称
33    dataset_name: String,
34    /// 索引管理器
35    index_manager: IndexManager,
36    /// 配置信息
37    configuration: ReaderConfig,
38    /// 当前文件读取器
39    current_reader: Option<PcapFileReader>,
40    /// 当前文件索引
41    current_file_index: usize,
42    /// 当前读取位置(全局数据包索引)
43    current_position: u64,
44    /// 文件信息缓存
45    file_info_cache: FileInfoCache,
46    /// 总大小缓存
47    total_size_cache: RefCell<Option<u64>>,
48    /// 是否已初始化
49    is_initialized: bool,
50}
51
52impl PcapReader {
53    /// 创建新的PCAP读取器
54    ///
55    /// # 参数
56    /// - `base_path` - 基础路径
57    /// - `dataset_name` - 数据集名称
58    ///
59    /// # 返回
60    /// 返回初始化后的读取器实例
61    pub fn new<P: AsRef<Path>>(
62        base_path: P,
63        dataset_name: &str,
64    ) -> PcapResult<Self> {
65        Self::new_with_config(
66            base_path,
67            dataset_name,
68            ReaderConfig::default(),
69        )
70    }
71
72    /// 创建新的PCAP读取器(带配置)
73    ///
74    /// # 参数
75    /// - `base_path` - 基础路径
76    /// - `dataset_name` - 数据集名称
77    /// - `configuration` - 读取器配置信息
78    ///
79    /// # 返回
80    /// 返回初始化后的读取器实例
81    pub fn new_with_config<P: AsRef<Path>>(
82        base_path: P,
83        dataset_name: &str,
84        configuration: ReaderConfig,
85    ) -> PcapResult<Self> {
86        // 验证配置有效性
87        configuration.validate().map_err(|e| {
88            PcapError::InvalidArgument(format!(
89                "读取器配置无效: {e}"
90            ))
91        })?;
92
93        let dataset_path =
94            base_path.as_ref().join(dataset_name);
95
96        // 验证数据集目录
97        if !dataset_path.exists() {
98            return Err(PcapError::DirectoryNotFound(
99                ERROR_DATASET_NOT_FOUND.to_string(),
100            ));
101        }
102
103        if !dataset_path.is_dir() {
104            return Err(PcapError::InvalidArgument(
105                ERROR_INVALID_DATASET.to_string(),
106            ));
107        }
108
109        // 创建索引管理器
110        let index_manager =
111            IndexManager::new(base_path, dataset_name)?;
112
113        // 获取缓存大小(在移动 configuration 之前)
114        let cache_size = configuration.index_cache_size;
115
116        info!("PcapReader已创建 - 数据集: {dataset_name}");
117
118        Ok(Self {
119            dataset_path,
120            dataset_name: dataset_name.to_string(),
121            index_manager,
122            configuration,
123            current_reader: None,
124            current_file_index: 0,
125            current_position: 0,
126            file_info_cache: FileInfoCache::new(cache_size),
127            total_size_cache: RefCell::new(None),
128            is_initialized: false,
129        })
130    }
131
132    /// 初始化读取器
133    ///
134    /// 确保索引可用并准备好读取操作
135    pub fn initialize(&mut self) -> PcapResult<()> {
136        if self.is_initialized {
137            return Ok(());
138        }
139
140        info!("初始化PcapReader...");
141
142        // 确保索引可用
143        let _index = self.index_manager.ensure_index()?;
144
145        self.is_initialized = true;
146        info!("PcapReader初始化完成");
147        Ok(())
148    }
149
150    /// 获取数据集信息
151    pub fn get_dataset_info(
152        &mut self,
153    ) -> PcapResult<DatasetInfo> {
154        self.initialize()?;
155
156        let index = self
157            .index_manager
158            .get_index()
159            .ok_or_else(|| {
160                PcapError::InvalidState(
161                    "索引未加载".to_string(),
162                )
163            })?;
164
165        use chrono::Utc;
166
167        Ok(DatasetInfo {
168            name: self.dataset_name.clone(),
169            path: self.dataset_path.clone(),
170            file_count: index.data_files.files.len(),
171            total_packets: index.total_packets,
172            total_size: self.get_total_size()?,
173            start_timestamp: if index.start_timestamp > 0 {
174                Some(index.start_timestamp)
175            } else {
176                None
177            },
178            end_timestamp: if index.end_timestamp > 0 {
179                Some(index.end_timestamp)
180            } else {
181                None
182            },
183            created_time: Utc::now().to_rfc3339(),
184            modified_time: Utc::now().to_rfc3339(),
185            has_index: true,
186        })
187    }
188
189    /// 获取文件信息列表
190    pub fn get_file_info_list(
191        &mut self,
192    ) -> PcapResult<Vec<FileInfo>> {
193        self.initialize()?;
194
195        let index = self
196            .index_manager
197            .get_index()
198            .ok_or_else(|| {
199                PcapError::InvalidState(
200                    "索引未加载".to_string(),
201                )
202            })?;
203
204        use chrono::Utc;
205        let current_time = Utc::now().to_rfc3339();
206
207        let mut file_infos = Vec::new();
208        for file_index in &index.data_files.files {
209            let file_path = self
210                .dataset_path
211                .join(&file_index.file_name);
212
213            // 尝试从缓存获取文件信息
214            let file_info = if let Some(cached_info) =
215                self.file_info_cache.get(&file_path)
216            {
217                cached_info
218            } else {
219                // 缓存未命中,创建新的文件信息并缓存
220                let file_info = FileInfo {
221                    file_name: file_index.file_name.clone(),
222                    file_path: file_path.clone(),
223                    file_size: file_index.file_size,
224                    packet_count: file_index.packet_count,
225                    start_timestamp: if file_index
226                        .start_timestamp
227                        > 0
228                    {
229                        Some(file_index.start_timestamp)
230                    } else {
231                        None
232                    },
233                    end_timestamp: if file_index
234                        .end_timestamp
235                        > 0
236                    {
237                        Some(file_index.end_timestamp)
238                    } else {
239                        None
240                    },
241                    file_hash: Some(
242                        file_index.file_hash.clone(),
243                    ),
244                    created_time: current_time.clone(),
245                    modified_time: current_time.clone(),
246                    is_valid: true,
247                };
248
249                // 将文件信息加入缓存
250                self.file_info_cache
251                    .insert(&file_path, file_info.clone());
252                file_info
253            };
254
255            file_infos.push(file_info);
256        }
257
258        Ok(file_infos)
259    }
260
261    /// 获取数据集路径
262    pub fn dataset_path(&self) -> &Path {
263        &self.dataset_path
264    }
265
266    /// 获取数据集名称
267    pub fn dataset_name(&self) -> &str {
268        &self.dataset_name
269    }
270
271    /// 读取下一个数据包(默认方法,带校验结果)
272    ///
273    /// 从当前位置读取下一个数据包,包含校验状态信息。如果当前文件读取完毕,
274    /// 会自动切换到下一个文件。
275    ///
276    /// # 返回
277    /// - `Ok(Some(result))` - 成功读取到数据包和校验结果
278    /// - `Ok(None)` - 到达文件末尾,无更多数据包
279    /// - `Err(error)` - 读取过程中发生错误
280    pub fn read_packet(
281        &mut self,
282    ) -> PcapResult<Option<ValidatedPacket>> {
283        self.initialize()?;
284
285        // 确保当前文件已打开
286        self.ensure_current_file_open()?;
287
288        loop {
289            if let Some(ref mut reader) =
290                self.current_reader
291            {
292                match reader.read_packet() {
293                    Ok(Some(result)) => {
294                        self.current_position += 1;
295                        return Ok(Some(result));
296                    }
297                    Ok(None) => {
298                        // 当前文件读取完毕,尝试切换到下一个文件
299                        if !self.switch_to_next_file()? {
300                            // 没有更多文件
301                            return Ok(None);
302                        }
303                        continue;
304                    }
305                    Err(e) => return Err(e),
306                }
307            } else {
308                // 没有可读取的文件
309                return Ok(None);
310            }
311        }
312    }
313
314    /// 读取下一个数据包(仅返回数据,不返回校验信息)
315    ///
316    /// 从当前位置读取下一个数据包,仅返回数据包本身。如果当前文件读取完毕,
317    /// 会自动切换到下一个文件。
318    ///
319    /// # 返回
320    /// - `Ok(Some(packet))` - 成功读取到数据包
321    /// - `Ok(None)` - 到达文件末尾,无更多数据包
322    /// - `Err(error)` - 读取过程中发生错误
323    pub fn read_packet_data_only(
324        &mut self,
325    ) -> PcapResult<Option<DataPacket>> {
326        match self.read_packet()? {
327            Some(result) => Ok(Some(result.packet)),
328            None => Ok(None),
329        }
330    }
331
332    /// 批量读取多个数据包(默认方法,带校验结果)
333    ///
334    /// # 参数
335    /// - `count` - 要读取的数据包数量
336    ///
337    /// # 返回
338    pub fn read_packets(
339        &mut self,
340        count: usize,
341    ) -> PcapResult<Vec<ValidatedPacket>> {
342        self.initialize()?;
343
344        let mut results = Vec::with_capacity(count);
345
346        // 批量读取指定数量的数据包
347        for _ in 0..count {
348            if let Some(result) = self.read_packet()? {
349                results.push(result);
350            } else {
351                break; // 没有更多数据包
352            }
353        }
354
355        Ok(results)
356    }
357
358    /// 批量读取多个数据包(仅返回数据,不返回校验信息)
359    ///
360    /// # 参数
361    /// - `count` - 要读取的数据包数量
362    ///
363    /// # 返回
364    pub fn read_packets_data_only(
365        &mut self,
366        count: usize,
367    ) -> PcapResult<Vec<DataPacket>> {
368        self.initialize()?;
369
370        let mut packets = Vec::with_capacity(count);
371
372        // 批量读取指定数量的数据包
373        for _ in 0..count {
374            if let Some(packet) =
375                self.read_packet_data_only()?
376            {
377                packets.push(packet);
378            } else {
379                break; // 没有更多数据包
380            }
381        }
382
383        Ok(packets)
384    }
385
386    /// 重置读取器到数据集开始位置
387    ///
388    /// 将读取器重置到数据集的开始位置,后续读取将从第一个数据包开始。
389    pub fn reset(&mut self) -> PcapResult<()> {
390        self.initialize()?;
391
392        // 重置当前读取位置到数据集开始
393        self.current_position = 0;
394        self.current_file_index = 0;
395
396        // 关闭当前文件
397        if let Some(ref mut reader) = self.current_reader {
398            reader.close();
399        }
400        self.current_reader = None;
401
402        // 重新打开第一个文件(如果存在)
403        let index = self
404            .index_manager
405            .get_index()
406            .ok_or_else(|| {
407                PcapError::InvalidState(
408                    "索引未加载".to_string(),
409                )
410            })?;
411
412        if !index.data_files.files.is_empty() {
413            self.open_file(0)?;
414        }
415
416        info!("读取器已重置到数据集开始位置");
417        Ok(())
418    }
419
420    /// 获取索引管理器的引用
421    /// 允许外部通过 reader.index().method() 的方式访问索引功能
422    pub fn index(&self) -> &IndexManager {
423        &self.index_manager
424    }
425
426    /// 获取索引管理器的可变引用
427    /// 允许外部通过 reader.index_mut().method() 的方式访问索引功能
428    pub fn index_mut(&mut self) -> &mut IndexManager {
429        &mut self.index_manager
430    }
431
432    /// 按时间戳查找数据包位置
433    ///
434    /// # 参数
435    /// - `timestamp_ns` - 目标时间戳(纳秒)
436    ///
437    /// # 返回
438    /// 返回最接近指定时间戳的数据包索引条目,如果未找到则返回None
439    pub fn seek_by_timestamp(
440        &mut self,
441        timestamp_ns: u64,
442    ) -> PcapResult<
443        Option<
444            crate::business::index::types::TimestampPointer,
445        >,
446    > {
447        self.initialize()?;
448
449        let index = self
450            .index_manager
451            .get_index()
452            .ok_or_else(|| {
453                PcapError::InvalidState(
454                    "索引未加载".to_string(),
455                )
456            })?;
457
458        // 在时间戳索引中查找最接近的条目
459        let mut closest_entry = None;
460        let mut min_diff = u64::MAX;
461
462        for (ts, pointer) in &index.timestamp_index {
463            let diff = (*ts).abs_diff(timestamp_ns);
464
465            if diff < min_diff {
466                min_diff = diff;
467                closest_entry = Some(pointer.clone());
468            }
469        }
470
471        Ok(closest_entry)
472    }
473
474    /// 按时间范围读取数据包
475    ///
476    /// # 参数
477    /// - `start_timestamp_ns` - 开始时间戳(纳秒)
478    /// - `end_timestamp_ns` - 结束时间戳(纳秒)
479    ///
480    /// # 返回
481    /// 返回指定时间范围内的所有数据包
482    pub fn read_packets_by_time_range(
483        &mut self,
484        start_timestamp_ns: u64,
485        end_timestamp_ns: u64,
486    ) -> PcapResult<Vec<ValidatedPacket>> {
487        self.initialize()?;
488
489        let pointers = {
490            let index = self
491                .index_manager
492                .get_index()
493                .ok_or_else(|| {
494                    PcapError::InvalidState(
495                        "索引未加载".to_string(),
496                    )
497                })?;
498
499            index
500                .get_packets_in_range(
501                    start_timestamp_ns,
502                    end_timestamp_ns,
503                )
504                .into_iter()
505                .cloned()
506                .collect::<Vec<_>>()
507        };
508
509        let mut result_packets = Vec::new();
510        let mut current_file_index = None;
511
512        // 按时间顺序读取数据包
513        for pointer in pointers {
514            // 检查是否需要切换文件
515            if current_file_index
516                != Some(pointer.file_index)
517            {
518                self.open_file(pointer.file_index)?;
519                current_file_index =
520                    Some(pointer.file_index);
521            }
522
523            // 确保文件已打开
524            self.ensure_current_file_open()?;
525
526            // 读取指定位置的数据包
527            let reader = self
528                .current_reader
529                .as_mut()
530                .ok_or_else(|| {
531                    PcapError::InvalidState(
532                        "当前文件读取器未初始化"
533                            .to_string(),
534                    )
535                })?;
536            let packet_result = reader
537                .read_packet_at(pointer.entry.byte_offset);
538
539            match packet_result {
540                Ok(packet) => {
541                    // 验证时间戳是否在范围内
542                    let packet_timestamp =
543                        packet.packet.get_timestamp_ns();
544                    if packet_timestamp
545                        >= start_timestamp_ns
546                        && packet_timestamp
547                            <= end_timestamp_ns
548                    {
549                        result_packets.push(packet);
550                    }
551                }
552                Err(e) => {
553                    warn!("读取数据包失败: {}", e);
554                    // 继续处理其他数据包
555                }
556            }
557        }
558
559        Ok(result_packets)
560    }
561
562    /// 获取缓存统计信息
563    pub fn get_cache_stats(&self) -> CacheStats {
564        self.file_info_cache.get_cache_stats()
565    }
566
567    /// 清理缓存
568    pub fn clear_cache(&mut self) -> PcapResult<()> {
569        let _ = self.file_info_cache.clear();
570        debug!("缓存已清理");
571        Ok(())
572    }
573
574    /// 跳转到指定时间戳(纳秒)
575    ///
576    /// 返回实际定位到的时间戳。如果精确匹配不存在,返回时间戳后面最接近的数据包。
577    ///
578    /// # 参数
579    /// - `timestamp_ns` - 目标时间戳(纳秒)
580    ///
581    /// # 返回
582    /// - `Ok(actual_timestamp)` - 成功跳转,返回实际定位到的时间戳
583    /// - `Err(error)` - 未找到数据包或发生错误
584    pub fn seek_to_timestamp(
585        &mut self,
586        timestamp_ns: u64,
587    ) -> PcapResult<u64> {
588        self.initialize()?;
589
590        // 1. 先提取所需信息,避免借用冲突
591        let (
592            actual_ts,
593            file_index,
594            byte_offset,
595            packet_offset,
596        ) = {
597            let index = self
598                .index_manager
599                .get_index()
600                .ok_or_else(|| {
601                    PcapError::InvalidState(
602                        "索引未加载".to_string(),
603                    )
604                })?;
605
606            // 尝试精确匹配
607            let (actual_ts, pointer) = if let Some(ptr) =
608                index.find_packet_by_timestamp(timestamp_ns)
609            {
610                (timestamp_ns, ptr.clone())
611            } else {
612                // 查找 >= target 的最小时间戳
613                Self::find_timestamp_ge(&index.timestamp_index, timestamp_ns)
614                    .ok_or_else(|| PcapError::InvalidArgument(
615                        format!("未找到时间戳 >= {timestamp_ns} 的数据包")
616                    ))?
617            };
618
619            // 计算文件内的序号
620            let file_index_data =
621                &index.data_files.files[pointer.file_index];
622            let packet_offset = file_index_data
623                .data_packets
624                .iter()
625                .position(|p| {
626                    p.timestamp_ns
627                        == pointer.entry.timestamp_ns
628                })
629                .unwrap_or(0);
630
631            (
632                actual_ts,
633                pointer.file_index,
634                pointer.entry.byte_offset,
635                packet_offset,
636            )
637        };
638
639        // 2. 打开对应文件
640        self.open_file(file_index)?;
641
642        // 3. seek 到字节偏移
643        if let Some(reader) = self.current_reader.as_mut() {
644            reader.seek_to(byte_offset)?;
645        } else {
646            return Err(PcapError::InvalidState(
647                "文件未打开".to_string(),
648            ));
649        }
650
651        // 4. 更新状态
652        self.current_file_index = file_index;
653
654        // 计算全局位置
655        let index = self
656            .index_manager
657            .get_index()
658            .ok_or_else(|| {
659                PcapError::InvalidState(
660                    "索引未加载".to_string(),
661                )
662            })?;
663        self.current_position = self
664            .calculate_global_position(
665                index,
666                file_index,
667                packet_offset,
668            );
669
670        info!("已跳转到时间戳: {timestamp_ns}ns (实际: {actual_ts}ns), 全局位置: {}", 
671            self.current_position);
672
673        Ok(actual_ts)
674    }
675
676    /// 跳转到指定索引的数据包(从0开始)
677    ///
678    /// # 参数
679    /// - `packet_index` - 目标数据包的全局索引(从0开始)
680    pub fn seek_to_packet(
681        &mut self,
682        packet_index: usize,
683    ) -> PcapResult<()> {
684        self.initialize()?;
685
686        // 1. 先提取所需信息,避免借用冲突
687        let (target_file_idx, byte_offset, packet_offset) = {
688            let index = self
689                .index_manager
690                .get_index()
691                .ok_or_else(|| {
692                    PcapError::InvalidState(
693                        "索引未加载".to_string(),
694                    )
695                })?;
696
697            // 检查索引范围
698            if packet_index >= index.total_packets as usize
699            {
700                return Err(PcapError::InvalidArgument(
701                    format!("数据包索引 {packet_index} 超出范围 (总数: {})", index.total_packets)
702                ));
703            }
704
705            // 遍历文件,找到目标文件和文件内偏移
706            let mut accumulated = 0usize;
707            let mut target_file_idx = 0;
708            let mut packet_offset = 0;
709
710            for (file_idx, file) in
711                index.data_files.files.iter().enumerate()
712            {
713                let next_accumulated = accumulated
714                    + file.packet_count as usize;
715                if packet_index < next_accumulated {
716                    target_file_idx = file_idx;
717                    packet_offset =
718                        packet_index - accumulated;
719                    break;
720                }
721                accumulated = next_accumulated;
722            }
723
724            // 获取数据包条目
725            let file =
726                &index.data_files.files[target_file_idx];
727            let packet_entry =
728                &file.data_packets[packet_offset];
729            let byte_offset = packet_entry.byte_offset;
730
731            (target_file_idx, byte_offset, packet_offset)
732        };
733
734        // 2. 打开文件并 seek
735        self.open_file(target_file_idx)?;
736        if let Some(reader) = self.current_reader.as_mut() {
737            reader.seek_to(byte_offset)?;
738        } else {
739            return Err(PcapError::InvalidState(
740                "文件未打开".to_string(),
741            ));
742        }
743
744        // 3. 更新状态
745        self.current_file_index = target_file_idx;
746        self.current_position = packet_index as u64;
747
748        info!("已跳转到数据包索引: {packet_index}, 文件: {target_file_idx}, 文件内偏移: {packet_offset}");
749
750        Ok(())
751    }
752
753    /// 检查是否已到达文件末尾
754    pub fn is_eof(&self) -> bool {
755        if let Some(index) = self.index_manager.get_index()
756        {
757            self.current_position >= index.total_packets
758        } else {
759            // 未初始化时,检查是否有可读取的文件
760            self.current_reader.is_none()
761        }
762    }
763
764    /// 获取总数据包数量(如果索引可用)
765    pub fn total_packets(&self) -> Option<usize> {
766        self.index_manager
767            .get_index()
768            .map(|idx| idx.total_packets as usize)
769    }
770
771    /// 获取当前数据包索引位置(全局序号,从0开始)
772    pub fn current_packet_index(&self) -> u64 {
773        self.current_position
774    }
775
776    /// 获取当前读取进度(百分比:0.0 - 1.0)
777    pub fn progress(&self) -> Option<f64> {
778        self.total_packets().map(|total| {
779            if total == 0 {
780                1.0
781            } else {
782                (self.current_position as f64
783                    / total as f64)
784                    .min(1.0)
785            }
786        })
787    }
788
789    /// 跳过指定数量的数据包
790    ///
791    /// # 参数
792    /// - `count` - 要跳过的数据包数量
793    ///
794    /// # 返回
795    /// 实际跳过的数据包数量(可能小于请求数量,如果到达末尾)
796    pub fn skip_packets(
797        &mut self,
798        count: usize,
799    ) -> PcapResult<usize> {
800        let current_idx = self.current_position as usize;
801        let target_idx = current_idx + count;
802
803        let total = self.total_packets().unwrap_or(0);
804        // 限制目标索引到有效范围(0 到 total-1)
805        let actual_target = if total == 0 {
806            0
807        } else {
808            target_idx.min(total - 1)
809        };
810        let actual_skipped =
811            actual_target.saturating_sub(current_idx);
812
813        if actual_skipped > 0 {
814            self.seek_to_packet(actual_target)?;
815        }
816
817        Ok(actual_skipped)
818    }
819
820    // =================================================================
821    // 私有方法
822    // =================================================================
823
824    /// 计算指定文件索引和文件内数据包偏移对应的全局数据包位置
825    fn calculate_global_position(
826        &self,
827        index: &crate::business::index::types::PidxIndex,
828        file_index: usize,
829        packet_offset_in_file: usize,
830    ) -> u64 {
831        let mut position = 0u64;
832        for (idx, file) in
833            index.data_files.files.iter().enumerate()
834        {
835            if idx < file_index {
836                position += file.packet_count;
837            } else if idx == file_index {
838                position += packet_offset_in_file as u64;
839                break;
840            }
841        }
842        position
843    }
844
845    /// 查找大于等于指定时间戳的最接近时间戳及其指针
846    fn find_timestamp_ge(
847        timestamp_index: &std::collections::HashMap<
848            u64,
849            crate::business::index::types::TimestampPointer,
850        >,
851        target_ns: u64,
852    ) -> Option<(
853        u64,
854        crate::business::index::types::TimestampPointer,
855    )> {
856        let mut candidates: Vec<u64> = timestamp_index
857            .keys()
858            .filter(|&&ts| ts >= target_ns)
859            .copied()
860            .collect();
861
862        if candidates.is_empty() {
863            return None;
864        }
865
866        candidates.sort_unstable();
867        let closest_ts = candidates[0];
868        timestamp_index
869            .get(&closest_ts)
870            .map(|ptr| (closest_ts, ptr.clone()))
871    }
872
873    /// 获取数据集总大小
874    fn get_total_size(&self) -> PcapResult<u64> {
875        if let Some(cached_size) =
876            *self.total_size_cache.borrow()
877        {
878            return Ok(cached_size);
879        }
880
881        let index = self
882            .index_manager
883            .get_index()
884            .ok_or_else(|| {
885                PcapError::InvalidState(
886                    "索引未加载".to_string(),
887                )
888            })?;
889
890        let total_size: u64 = index
891            .data_files
892            .files
893            .iter()
894            .map(|f| f.file_size)
895            .sum();
896
897        *self.total_size_cache.borrow_mut() =
898            Some(total_size);
899        Ok(total_size)
900    }
901
902    /// 打开指定索引的文件
903    fn open_file(
904        &mut self,
905        file_index: usize,
906    ) -> PcapResult<()> {
907        let index = self
908            .index_manager
909            .get_index()
910            .ok_or_else(|| {
911                PcapError::InvalidState(
912                    "索引未加载".to_string(),
913                )
914            })?;
915
916        if file_index >= index.data_files.files.len() {
917            return Err(PcapError::InvalidArgument(
918                format!("文件索引超出范围: {file_index}"),
919            ));
920        }
921
922        // 关闭当前文件
923        if let Some(ref mut reader) = self.current_reader {
924            reader.close();
925        }
926
927        // 打开新文件
928        let file_info = &index.data_files.files[file_index];
929        let file_path =
930            self.dataset_path.join(&file_info.file_name);
931
932        let mut reader =
933            crate::data::file_reader::PcapFileReader::new(
934                self.configuration.clone(),
935            );
936        reader.open(&file_path)?;
937
938        self.current_reader = Some(reader);
939        self.current_file_index = file_index;
940
941        debug!("已打开文件: {file_path:?}");
942        Ok(())
943    }
944
945    /// 切换到下一个文件
946    fn switch_to_next_file(&mut self) -> PcapResult<bool> {
947        let index = self
948            .index_manager
949            .get_index()
950            .ok_or_else(|| {
951                PcapError::InvalidState(
952                    "索引未加载".to_string(),
953                )
954            })?;
955
956        if self.current_file_index + 1
957            >= index.data_files.files.len()
958        {
959            // 没有更多文件
960            return Ok(false);
961        }
962
963        self.open_file(self.current_file_index + 1)?;
964        Ok(true)
965    }
966
967    /// 确保当前文件已打开
968    fn ensure_current_file_open(
969        &mut self,
970    ) -> PcapResult<()> {
971        if self.current_reader.is_none() {
972            let index = self
973                .index_manager
974                .get_index()
975                .ok_or_else(|| {
976                    PcapError::InvalidState(
977                        "索引未加载".to_string(),
978                    )
979                })?;
980
981            if !index.data_files.files.is_empty() {
982                self.open_file(0)?;
983            }
984        }
985        Ok(())
986    }
987
988    /// 根据时间戳读取数据包
989    pub fn read_packet_by_timestamp(
990        &mut self,
991        timestamp_ns: u64,
992    ) -> PcapResult<Option<ValidatedPacket>> {
993        let pointer = {
994            let index = self
995                .index_manager
996                .get_index()
997                .ok_or_else(|| {
998                    PcapError::InvalidState(
999                        "索引未加载".to_string(),
1000                    )
1001                })?;
1002
1003            match index
1004                .find_packet_by_timestamp(timestamp_ns)
1005            {
1006                Some(ptr) => ptr.clone(),
1007                None => return Ok(None),
1008            }
1009        };
1010
1011        // 检查是否需要切换文件
1012        if pointer.file_index != self.current_file_index {
1013            self.open_file(pointer.file_index)?;
1014        }
1015
1016        // 确保文件已打开
1017        self.ensure_current_file_open()?;
1018
1019        // 读取指定位置的数据包
1020        let reader = self
1021            .current_reader
1022            .as_mut()
1023            .ok_or_else(|| {
1024                PcapError::InvalidState(
1025                    "当前文件读取器未初始化".to_string(),
1026                )
1027            })?;
1028        let packet_result = reader
1029            .read_packet_at(pointer.entry.byte_offset);
1030
1031        match packet_result {
1032            Ok(packet) => {
1033                // 验证时间戳是否匹配
1034                if packet.packet.get_timestamp_ns()
1035                    == timestamp_ns
1036                {
1037                    Ok(Some(packet))
1038                } else {
1039                    Err(PcapError::InvalidState(
1040                        "读取的数据包时间戳不匹配"
1041                            .to_string(),
1042                    ))
1043                }
1044            }
1045            Err(e) => Err(e),
1046        }
1047    }
1048}
1049
1050impl Drop for PcapReader {
1051    fn drop(&mut self) {
1052        // 关闭当前文件读取器
1053        if let Some(ref mut reader) = self.current_reader {
1054            reader.close();
1055        }
1056        debug!("PcapReader已清理");
1057    }
1058}