1use log::{debug, info, warn};
6use std::cell::RefCell;
7use std::path::{Path, PathBuf};
8
9use crate::business::cache::{CacheStats, FileInfoCache};
10use crate::business::config::ReaderConfig;
11use crate::business::index::IndexManager;
12use crate::data::file_reader::PcapFileReader;
13use crate::data::models::{
14 DataPacket, DatasetInfo, FileInfo, ValidatedPacket,
15};
16use crate::foundation::error::{PcapError, PcapResult};
17
/// Error message reported when the dataset directory does not exist.
const ERROR_DATASET_NOT_FOUND: &str = "数据集目录不存在";
/// Error message reported when the dataset path exists but is not a directory.
const ERROR_INVALID_DATASET: &str = "无效的数据集目录";
21
/// Reader over a dataset directory of data files, navigated through the
/// dataset's index.
pub struct PcapReader {
    /// Dataset directory: the base path joined with the dataset name.
    dataset_path: PathBuf,
    /// Name of the dataset.
    dataset_name: String,
    /// Provides access to the dataset index.
    index_manager: IndexManager,
    /// Reader configuration; cloned into each file reader that is opened.
    configuration: ReaderConfig,
    /// Reader for the data file that is currently open, if any.
    current_reader: Option<PcapFileReader>,
    /// Position of the current data file in the index's file list.
    current_file_index: usize,
    /// Global packet position across all files of the dataset.
    current_position: u64,
    /// Cache of `FileInfo` entries, keyed by file path.
    file_info_cache: FileInfoCache,
    /// Lazily computed sum of the file sizes recorded in the index.
    total_size_cache: RefCell<Option<u64>>,
    /// Set once `initialize` has completed successfully.
    is_initialized: bool,
}
51
52impl PcapReader {
53 pub fn new<P: AsRef<Path>>(
62 base_path: P,
63 dataset_name: &str,
64 ) -> PcapResult<Self> {
65 Self::new_with_config(
66 base_path,
67 dataset_name,
68 ReaderConfig::default(),
69 )
70 }
71
72 pub fn new_with_config<P: AsRef<Path>>(
82 base_path: P,
83 dataset_name: &str,
84 configuration: ReaderConfig,
85 ) -> PcapResult<Self> {
86 configuration.validate().map_err(|e| {
88 PcapError::InvalidArgument(format!(
89 "读取器配置无效: {e}"
90 ))
91 })?;
92
93 let dataset_path =
94 base_path.as_ref().join(dataset_name);
95
96 if !dataset_path.exists() {
98 return Err(PcapError::DirectoryNotFound(
99 ERROR_DATASET_NOT_FOUND.to_string(),
100 ));
101 }
102
103 if !dataset_path.is_dir() {
104 return Err(PcapError::InvalidArgument(
105 ERROR_INVALID_DATASET.to_string(),
106 ));
107 }
108
109 let index_manager =
111 IndexManager::new(base_path, dataset_name)?;
112
113 let cache_size = configuration.index_cache_size;
115
116 info!("PcapReader已创建 - 数据集: {dataset_name}");
117
118 Ok(Self {
119 dataset_path,
120 dataset_name: dataset_name.to_string(),
121 index_manager,
122 configuration,
123 current_reader: None,
124 current_file_index: 0,
125 current_position: 0,
126 file_info_cache: FileInfoCache::new(cache_size),
127 total_size_cache: RefCell::new(None),
128 is_initialized: false,
129 })
130 }
131
132 pub fn initialize(&mut self) -> PcapResult<()> {
136 if self.is_initialized {
137 return Ok(());
138 }
139
140 info!("初始化PcapReader...");
141
142 let _index = self.index_manager.ensure_index()?;
144
145 self.is_initialized = true;
146 info!("PcapReader初始化完成");
147 Ok(())
148 }
149
150 pub fn get_dataset_info(
152 &mut self,
153 ) -> PcapResult<DatasetInfo> {
154 self.initialize()?;
155
156 let index = self
157 .index_manager
158 .get_index()
159 .ok_or_else(|| {
160 PcapError::InvalidState(
161 "索引未加载".to_string(),
162 )
163 })?;
164
165 use chrono::Utc;
166
167 Ok(DatasetInfo {
168 name: self.dataset_name.clone(),
169 path: self.dataset_path.clone(),
170 file_count: index.data_files.files.len(),
171 total_packets: index.total_packets,
172 total_size: self.get_total_size()?,
173 start_timestamp: if index.start_timestamp > 0 {
174 Some(index.start_timestamp)
175 } else {
176 None
177 },
178 end_timestamp: if index.end_timestamp > 0 {
179 Some(index.end_timestamp)
180 } else {
181 None
182 },
183 created_time: Utc::now().to_rfc3339(),
184 modified_time: Utc::now().to_rfc3339(),
185 has_index: true,
186 })
187 }
188
189 pub fn get_file_info_list(
191 &mut self,
192 ) -> PcapResult<Vec<FileInfo>> {
193 self.initialize()?;
194
195 let index = self
196 .index_manager
197 .get_index()
198 .ok_or_else(|| {
199 PcapError::InvalidState(
200 "索引未加载".to_string(),
201 )
202 })?;
203
204 use chrono::Utc;
205 let current_time = Utc::now().to_rfc3339();
206
207 let mut file_infos = Vec::new();
208 for file_index in &index.data_files.files {
209 let file_path = self
210 .dataset_path
211 .join(&file_index.file_name);
212
213 let file_info = if let Some(cached_info) =
215 self.file_info_cache.get(&file_path)
216 {
217 cached_info
218 } else {
219 let file_info = FileInfo {
221 file_name: file_index.file_name.clone(),
222 file_path: file_path.clone(),
223 file_size: file_index.file_size,
224 packet_count: file_index.packet_count,
225 start_timestamp: if file_index
226 .start_timestamp
227 > 0
228 {
229 Some(file_index.start_timestamp)
230 } else {
231 None
232 },
233 end_timestamp: if file_index
234 .end_timestamp
235 > 0
236 {
237 Some(file_index.end_timestamp)
238 } else {
239 None
240 },
241 created_time: current_time.clone(),
242 modified_time: current_time.clone(),
243 is_valid: true,
244 };
245
246 self.file_info_cache
248 .insert(&file_path, file_info.clone());
249 file_info
250 };
251
252 file_infos.push(file_info);
253 }
254
255 Ok(file_infos)
256 }
257
258 pub fn dataset_path(&self) -> &Path {
260 &self.dataset_path
261 }
262
263 pub fn dataset_name(&self) -> &str {
265 &self.dataset_name
266 }
267
268 pub fn read_packet(
278 &mut self,
279 ) -> PcapResult<Option<ValidatedPacket>> {
280 self.initialize()?;
281
282 self.ensure_current_file_open()?;
284
285 loop {
286 if let Some(ref mut reader) =
287 self.current_reader
288 {
289 match reader.read_packet() {
290 Ok(Some(result)) => {
291 self.current_position += 1;
292 return Ok(Some(result));
293 }
294 Ok(None) => {
295 if !self.switch_to_next_file()? {
297 return Ok(None);
299 }
300 continue;
301 }
302 Err(e) => return Err(e),
303 }
304 } else {
305 return Ok(None);
307 }
308 }
309 }
310
311 pub fn read_packet_data_only(
321 &mut self,
322 ) -> PcapResult<Option<DataPacket>> {
323 match self.read_packet()? {
324 Some(result) => Ok(Some(result.packet)),
325 None => Ok(None),
326 }
327 }
328
329 pub fn read_packets(
336 &mut self,
337 count: usize,
338 ) -> PcapResult<Vec<ValidatedPacket>> {
339 self.initialize()?;
340
341 let mut results = Vec::with_capacity(count);
342
343 for _ in 0..count {
345 if let Some(result) = self.read_packet()? {
346 results.push(result);
347 } else {
348 break; }
350 }
351
352 Ok(results)
353 }
354
355 pub fn read_packets_data_only(
362 &mut self,
363 count: usize,
364 ) -> PcapResult<Vec<DataPacket>> {
365 self.initialize()?;
366
367 let mut packets = Vec::with_capacity(count);
368
369 for _ in 0..count {
371 if let Some(packet) =
372 self.read_packet_data_only()?
373 {
374 packets.push(packet);
375 } else {
376 break; }
378 }
379
380 Ok(packets)
381 }
382
383 pub fn reset(&mut self) -> PcapResult<()> {
387 self.initialize()?;
388
389 self.current_position = 0;
391 self.current_file_index = 0;
392
393 if let Some(ref mut reader) = self.current_reader {
395 reader.close();
396 }
397 self.current_reader = None;
398
399 let index = self
401 .index_manager
402 .get_index()
403 .ok_or_else(|| {
404 PcapError::InvalidState(
405 "索引未加载".to_string(),
406 )
407 })?;
408
409 if !index.data_files.files.is_empty() {
410 self.open_file(0)?;
411 }
412
413 info!("读取器已重置到数据集开始位置");
414 Ok(())
415 }
416
    /// Returns a shared reference to the reader's [`IndexManager`].
    pub fn index(&self) -> &IndexManager {
        &self.index_manager
    }
422
    /// Returns a mutable reference to the reader's [`IndexManager`].
    ///
    /// NOTE(review): nothing visible here resets `total_size_cache` after the
    /// index is modified through this handle — confirm whether that matters.
    pub fn index_mut(&mut self) -> &mut IndexManager {
        &mut self.index_manager
    }
428
429 pub fn seek_by_timestamp(
437 &mut self,
438 timestamp_ns: u64,
439 ) -> PcapResult<
440 Option<
441 crate::business::index::types::TimestampPointer,
442 >,
443 > {
444 self.initialize()?;
445
446 let index = self
447 .index_manager
448 .get_index()
449 .ok_or_else(|| {
450 PcapError::InvalidState(
451 "索引未加载".to_string(),
452 )
453 })?;
454
455 let mut closest_entry = None;
457 let mut min_diff = u64::MAX;
458
459 for (ts, pointer) in &index.timestamp_index {
460 let diff = (*ts).abs_diff(timestamp_ns);
461
462 if diff < min_diff {
463 min_diff = diff;
464 closest_entry = Some(pointer.clone());
465 }
466 }
467
468 Ok(closest_entry)
469 }
470
471 pub fn read_packets_by_time_range(
480 &mut self,
481 start_timestamp_ns: u64,
482 end_timestamp_ns: u64,
483 ) -> PcapResult<Vec<ValidatedPacket>> {
484 self.initialize()?;
485
486 let pointers = {
487 let index = self
488 .index_manager
489 .get_index()
490 .ok_or_else(|| {
491 PcapError::InvalidState(
492 "索引未加载".to_string(),
493 )
494 })?;
495
496 index
497 .get_packets_in_range(
498 start_timestamp_ns,
499 end_timestamp_ns,
500 )
501 .into_iter()
502 .cloned()
503 .collect::<Vec<_>>()
504 };
505
506 let mut result_packets = Vec::new();
507 let mut current_file_index = None;
508
509 for pointer in pointers {
511 if current_file_index
513 != Some(pointer.file_index)
514 {
515 self.open_file(pointer.file_index)?;
516 current_file_index =
517 Some(pointer.file_index);
518 }
519
520 self.ensure_current_file_open()?;
522
523 let reader = self
525 .current_reader
526 .as_mut()
527 .ok_or_else(|| {
528 PcapError::InvalidState(
529 "当前文件读取器未初始化"
530 .to_string(),
531 )
532 })?;
533 let packet_result = reader
534 .read_packet_at(pointer.entry.byte_offset);
535
536 match packet_result {
537 Ok(packet) => {
538 let packet_timestamp =
540 packet.packet.get_timestamp_ns();
541 if packet_timestamp
542 >= start_timestamp_ns
543 && packet_timestamp
544 <= end_timestamp_ns
545 {
546 result_packets.push(packet);
547 }
548 }
549 Err(e) => {
550 warn!("读取数据包失败: {}", e);
551 }
553 }
554 }
555
556 Ok(result_packets)
557 }
558
    /// Returns the statistics reported by the file-info cache.
    pub fn get_cache_stats(&self) -> CacheStats {
        self.file_info_cache.get_cache_stats()
    }
563
    /// Clears the file-info cache.
    ///
    /// The value returned by the cache's `clear` is deliberately discarded,
    /// so this method always returns `Ok(())`.
    pub fn clear_cache(&mut self) -> PcapResult<()> {
        // NOTE(review): a failure inside `clear` (if it can fail) is ignored
        // here and the debug message below is still logged — confirm intended.
        let _ = self.file_info_cache.clear();
        debug!("缓存已清理");
        Ok(())
    }
570
571 pub fn seek_to_timestamp(
582 &mut self,
583 timestamp_ns: u64,
584 ) -> PcapResult<u64> {
585 self.initialize()?;
586
587 let (
589 actual_ts,
590 file_index,
591 byte_offset,
592 packet_offset,
593 ) = {
594 let index = self
595 .index_manager
596 .get_index()
597 .ok_or_else(|| {
598 PcapError::InvalidState(
599 "索引未加载".to_string(),
600 )
601 })?;
602
603 let (actual_ts, pointer) = if let Some(ptr) =
605 index.find_packet_by_timestamp(timestamp_ns)
606 {
607 (timestamp_ns, ptr.clone())
608 } else {
609 Self::find_timestamp_ge(&index.timestamp_index, timestamp_ns)
611 .ok_or_else(|| PcapError::InvalidArgument(
612 format!("未找到时间戳 >= {timestamp_ns} 的数据包")
613 ))?
614 };
615
616 let file_index_data =
618 &index.data_files.files[pointer.file_index];
619 let packet_offset = file_index_data
620 .data_packets
621 .iter()
622 .position(|p| {
623 p.timestamp_ns
624 == pointer.entry.timestamp_ns
625 })
626 .unwrap_or(0);
627
628 (
629 actual_ts,
630 pointer.file_index,
631 pointer.entry.byte_offset,
632 packet_offset,
633 )
634 };
635
636 self.open_file(file_index)?;
638
639 if let Some(reader) = self.current_reader.as_mut() {
641 reader.seek_to(byte_offset)?;
642 } else {
643 return Err(PcapError::InvalidState(
644 "文件未打开".to_string(),
645 ));
646 }
647
648 self.current_file_index = file_index;
650
651 let index = self
653 .index_manager
654 .get_index()
655 .ok_or_else(|| {
656 PcapError::InvalidState(
657 "索引未加载".to_string(),
658 )
659 })?;
660 self.current_position = self
661 .calculate_global_position(
662 index,
663 file_index,
664 packet_offset,
665 );
666
667 info!("已跳转到时间戳: {timestamp_ns}ns (实际: {actual_ts}ns), 全局位置: {}",
668 self.current_position);
669
670 Ok(actual_ts)
671 }
672
673 pub fn seek_to_packet(
678 &mut self,
679 packet_index: usize,
680 ) -> PcapResult<()> {
681 self.initialize()?;
682
683 let (target_file_idx, byte_offset, packet_offset) = {
685 let index = self
686 .index_manager
687 .get_index()
688 .ok_or_else(|| {
689 PcapError::InvalidState(
690 "索引未加载".to_string(),
691 )
692 })?;
693
694 if packet_index >= index.total_packets as usize
696 {
697 return Err(PcapError::InvalidArgument(
698 format!("数据包索引 {packet_index} 超出范围 (总数: {})", index.total_packets)
699 ));
700 }
701
702 let mut accumulated = 0usize;
704 let mut target_file_idx = 0;
705 let mut packet_offset = 0;
706
707 for (file_idx, file) in
708 index.data_files.files.iter().enumerate()
709 {
710 let next_accumulated = accumulated
711 + file.packet_count as usize;
712 if packet_index < next_accumulated {
713 target_file_idx = file_idx;
714 packet_offset =
715 packet_index - accumulated;
716 break;
717 }
718 accumulated = next_accumulated;
719 }
720
721 let file =
723 &index.data_files.files[target_file_idx];
724 let packet_entry =
725 &file.data_packets[packet_offset];
726 let byte_offset = packet_entry.byte_offset;
727
728 (target_file_idx, byte_offset, packet_offset)
729 };
730
731 self.open_file(target_file_idx)?;
733 if let Some(reader) = self.current_reader.as_mut() {
734 reader.seek_to(byte_offset)?;
735 } else {
736 return Err(PcapError::InvalidState(
737 "文件未打开".to_string(),
738 ));
739 }
740
741 self.current_file_index = target_file_idx;
743 self.current_position = packet_index as u64;
744
745 info!("已跳转到数据包索引: {packet_index}, 文件: {target_file_idx}, 文件内偏移: {packet_offset}");
746
747 Ok(())
748 }
749
750 pub fn is_eof(&self) -> bool {
752 if let Some(index) = self.index_manager.get_index()
753 {
754 self.current_position >= index.total_packets
755 } else {
756 self.current_reader.is_none()
758 }
759 }
760
761 pub fn total_packets(&self) -> Option<usize> {
763 self.index_manager
764 .get_index()
765 .map(|idx| idx.total_packets as usize)
766 }
767
    /// Returns the reader's current global packet position.
    pub fn current_packet_index(&self) -> u64 {
        self.current_position
    }
772
773 pub fn progress(&self) -> Option<f64> {
775 self.total_packets().map(|total| {
776 if total == 0 {
777 1.0
778 } else {
779 (self.current_position as f64
780 / total as f64)
781 .min(1.0)
782 }
783 })
784 }
785
786 pub fn skip_packets(
794 &mut self,
795 count: usize,
796 ) -> PcapResult<usize> {
797 let current_idx = self.current_position as usize;
798 let target_idx = current_idx + count;
799
800 let total = self.total_packets().unwrap_or(0);
801 let actual_target = if total == 0 {
803 0
804 } else {
805 target_idx.min(total - 1)
806 };
807 let actual_skipped =
808 actual_target.saturating_sub(current_idx);
809
810 if actual_skipped > 0 {
811 self.seek_to_packet(actual_target)?;
812 }
813
814 Ok(actual_skipped)
815 }
816
817 fn calculate_global_position(
823 &self,
824 index: &crate::business::index::types::PidxIndex,
825 file_index: usize,
826 packet_offset_in_file: usize,
827 ) -> u64 {
828 let mut position = 0u64;
829 for (idx, file) in
830 index.data_files.files.iter().enumerate()
831 {
832 if idx < file_index {
833 position += file.packet_count;
834 } else if idx == file_index {
835 position += packet_offset_in_file as u64;
836 break;
837 }
838 }
839 position
840 }
841
842 fn find_timestamp_ge(
844 timestamp_index: &std::collections::HashMap<
845 u64,
846 crate::business::index::types::TimestampPointer,
847 >,
848 target_ns: u64,
849 ) -> Option<(
850 u64,
851 crate::business::index::types::TimestampPointer,
852 )> {
853 let mut candidates: Vec<u64> = timestamp_index
854 .keys()
855 .filter(|&&ts| ts >= target_ns)
856 .copied()
857 .collect();
858
859 if candidates.is_empty() {
860 return None;
861 }
862
863 candidates.sort_unstable();
864 let closest_ts = candidates[0];
865 timestamp_index
866 .get(&closest_ts)
867 .map(|ptr| (closest_ts, ptr.clone()))
868 }
869
870 fn get_total_size(&self) -> PcapResult<u64> {
872 if let Some(cached_size) =
873 *self.total_size_cache.borrow()
874 {
875 return Ok(cached_size);
876 }
877
878 let index = self
879 .index_manager
880 .get_index()
881 .ok_or_else(|| {
882 PcapError::InvalidState(
883 "索引未加载".to_string(),
884 )
885 })?;
886
887 let total_size: u64 = index
888 .data_files
889 .files
890 .iter()
891 .map(|f| f.file_size)
892 .sum();
893
894 *self.total_size_cache.borrow_mut() =
895 Some(total_size);
896 Ok(total_size)
897 }
898
899 fn open_file(
901 &mut self,
902 file_index: usize,
903 ) -> PcapResult<()> {
904 let index = self
905 .index_manager
906 .get_index()
907 .ok_or_else(|| {
908 PcapError::InvalidState(
909 "索引未加载".to_string(),
910 )
911 })?;
912
913 if file_index >= index.data_files.files.len() {
914 return Err(PcapError::InvalidArgument(
915 format!("文件索引超出范围: {file_index}"),
916 ));
917 }
918
919 if let Some(ref mut reader) = self.current_reader {
921 reader.close();
922 }
923
924 let file_info = &index.data_files.files[file_index];
926 let file_path =
927 self.dataset_path.join(&file_info.file_name);
928
929 let mut reader =
930 crate::data::file_reader::PcapFileReader::new(
931 self.configuration.clone(),
932 );
933 reader.open(&file_path)?;
934
935 self.current_reader = Some(reader);
936 self.current_file_index = file_index;
937
938 debug!("已打开文件: {file_path:?}");
939 Ok(())
940 }
941
942 fn switch_to_next_file(&mut self) -> PcapResult<bool> {
944 let index = self
945 .index_manager
946 .get_index()
947 .ok_or_else(|| {
948 PcapError::InvalidState(
949 "索引未加载".to_string(),
950 )
951 })?;
952
953 if self.current_file_index + 1
954 >= index.data_files.files.len()
955 {
956 return Ok(false);
958 }
959
960 self.open_file(self.current_file_index + 1)?;
961 Ok(true)
962 }
963
964 fn ensure_current_file_open(
966 &mut self,
967 ) -> PcapResult<()> {
968 if self.current_reader.is_none() {
969 let index = self
970 .index_manager
971 .get_index()
972 .ok_or_else(|| {
973 PcapError::InvalidState(
974 "索引未加载".to_string(),
975 )
976 })?;
977
978 if !index.data_files.files.is_empty() {
979 self.open_file(0)?;
980 }
981 }
982 Ok(())
983 }
984
985 pub fn read_packet_by_timestamp(
987 &mut self,
988 timestamp_ns: u64,
989 ) -> PcapResult<Option<ValidatedPacket>> {
990 let pointer = {
991 let index = self
992 .index_manager
993 .get_index()
994 .ok_or_else(|| {
995 PcapError::InvalidState(
996 "索引未加载".to_string(),
997 )
998 })?;
999
1000 match index
1001 .find_packet_by_timestamp(timestamp_ns)
1002 {
1003 Some(ptr) => ptr.clone(),
1004 None => return Ok(None),
1005 }
1006 };
1007
1008 if pointer.file_index != self.current_file_index {
1010 self.open_file(pointer.file_index)?;
1011 }
1012
1013 self.ensure_current_file_open()?;
1015
1016 let reader = self
1018 .current_reader
1019 .as_mut()
1020 .ok_or_else(|| {
1021 PcapError::InvalidState(
1022 "当前文件读取器未初始化".to_string(),
1023 )
1024 })?;
1025 let packet_result = reader
1026 .read_packet_at(pointer.entry.byte_offset);
1027
1028 match packet_result {
1029 Ok(packet) => {
1030 if packet.packet.get_timestamp_ns()
1032 == timestamp_ns
1033 {
1034 Ok(Some(packet))
1035 } else {
1036 Err(PcapError::InvalidState(
1037 "读取的数据包时间戳不匹配"
1038 .to_string(),
1039 ))
1040 }
1041 }
1042 Err(e) => Err(e),
1043 }
1044 }
1045}
1046
1047impl Drop for PcapReader {
1048 fn drop(&mut self) {
1049 if let Some(ref mut reader) = self.current_reader {
1051 reader.close();
1052 }
1053 debug!("PcapReader已清理");
1054 }
1055}