1use log::{debug, info, warn};
6use std::cell::RefCell;
7use std::path::{Path, PathBuf};
8
9use crate::business::cache::{CacheStats, FileInfoCache};
10use crate::business::config::ReaderConfig;
11use crate::business::index::IndexManager;
12use crate::data::file_reader::PcapFileReader;
13use crate::data::models::{
14 DataPacket, DatasetInfo, FileInfo, ValidatedPacket,
15};
16use crate::foundation::error::{PcapError, PcapResult};
17
/// Error message returned when the dataset directory does not exist
/// (text: "dataset directory does not exist").
const ERROR_DATASET_NOT_FOUND: &str = "数据集目录不存在";
/// Error message returned when the dataset path exists but is not a directory
/// (text: "invalid dataset directory").
const ERROR_INVALID_DATASET: &str = "无效的数据集目录";
21
/// Sequential and random-access reader over all PCAP data files of one dataset.
///
/// The dataset lives in the directory `base_path/dataset_name`. Its index is loaded
/// lazily through [`IndexManager`] the first time a reading method calls `initialize`.
pub struct PcapReader {
    /// Dataset directory, i.e. `base_path.join(dataset_name)`.
    dataset_path: PathBuf,
    /// Dataset name as passed to the constructor.
    dataset_name: String,
    /// Owner of the dataset index; `initialize` calls `ensure_index` on it.
    index_manager: IndexManager,
    /// Validated reader configuration, cloned into every `PcapFileReader` that is opened.
    configuration: ReaderConfig,
    /// Reader for the currently open data file. It is `None` before the first file is
    /// opened and right after a reset clears it.
    current_reader: Option<PcapFileReader>,
    /// Position of the currently open file within the index's `data_files.files`.
    current_file_index: usize,
    /// Global 0-based index of the next packet a sequential read will return.
    current_position: u64,
    /// Cache of `FileInfo` values keyed by file path; capacity is `configuration.index_cache_size`.
    file_info_cache: FileInfoCache,
    /// Memoized sum of all data file sizes. It sits in a `RefCell` so it can be filled from `&self`.
    total_size_cache: RefCell<Option<u64>>,
    /// Set once the index has been ensured by `initialize`.
    is_initialized: bool,
}
51
52impl PcapReader {
53 pub fn new<P: AsRef<Path>>(
62 base_path: P,
63 dataset_name: &str,
64 ) -> PcapResult<Self> {
65 Self::new_with_config(
66 base_path,
67 dataset_name,
68 ReaderConfig::default(),
69 )
70 }
71
72 pub fn new_with_config<P: AsRef<Path>>(
82 base_path: P,
83 dataset_name: &str,
84 configuration: ReaderConfig,
85 ) -> PcapResult<Self> {
86 configuration.validate().map_err(|e| {
88 PcapError::InvalidArgument(format!(
89 "读取器配置无效: {e}"
90 ))
91 })?;
92
93 let dataset_path =
94 base_path.as_ref().join(dataset_name);
95
96 if !dataset_path.exists() {
98 return Err(PcapError::DirectoryNotFound(
99 ERROR_DATASET_NOT_FOUND.to_string(),
100 ));
101 }
102
103 if !dataset_path.is_dir() {
104 return Err(PcapError::InvalidArgument(
105 ERROR_INVALID_DATASET.to_string(),
106 ));
107 }
108
109 let index_manager =
111 IndexManager::new(base_path, dataset_name)?;
112
113 let cache_size = configuration.index_cache_size;
115
116 info!("PcapReader已创建 - 数据集: {dataset_name}");
117
118 Ok(Self {
119 dataset_path,
120 dataset_name: dataset_name.to_string(),
121 index_manager,
122 configuration,
123 current_reader: None,
124 current_file_index: 0,
125 current_position: 0,
126 file_info_cache: FileInfoCache::new(cache_size),
127 total_size_cache: RefCell::new(None),
128 is_initialized: false,
129 })
130 }
131
132 pub fn initialize(&mut self) -> PcapResult<()> {
136 if self.is_initialized {
137 return Ok(());
138 }
139
140 info!("初始化PcapReader...");
141
142 let _index = self.index_manager.ensure_index()?;
144
145 self.is_initialized = true;
146 info!("PcapReader初始化完成");
147 Ok(())
148 }
149
150 pub fn get_dataset_info(
152 &mut self,
153 ) -> PcapResult<DatasetInfo> {
154 self.initialize()?;
155
156 let index = self
157 .index_manager
158 .get_index()
159 .ok_or_else(|| {
160 PcapError::InvalidState(
161 "索引未加载".to_string(),
162 )
163 })?;
164
165 use chrono::Utc;
166
167 Ok(DatasetInfo {
168 name: self.dataset_name.clone(),
169 path: self.dataset_path.clone(),
170 file_count: index.data_files.files.len(),
171 total_packets: index.total_packets,
172 total_size: self.get_total_size()?,
173 start_timestamp: if index.start_timestamp > 0 {
174 Some(index.start_timestamp)
175 } else {
176 None
177 },
178 end_timestamp: if index.end_timestamp > 0 {
179 Some(index.end_timestamp)
180 } else {
181 None
182 },
183 created_time: Utc::now().to_rfc3339(),
184 modified_time: Utc::now().to_rfc3339(),
185 has_index: true,
186 })
187 }
188
189 pub fn get_file_info_list(
191 &mut self,
192 ) -> PcapResult<Vec<FileInfo>> {
193 self.initialize()?;
194
195 let index = self
196 .index_manager
197 .get_index()
198 .ok_or_else(|| {
199 PcapError::InvalidState(
200 "索引未加载".to_string(),
201 )
202 })?;
203
204 use chrono::Utc;
205 let current_time = Utc::now().to_rfc3339();
206
207 let mut file_infos = Vec::new();
208 for file_index in &index.data_files.files {
209 let file_path = self
210 .dataset_path
211 .join(&file_index.file_name);
212
213 let file_info = if let Some(cached_info) =
215 self.file_info_cache.get(&file_path)
216 {
217 cached_info
218 } else {
219 let file_info = FileInfo {
221 file_name: file_index.file_name.clone(),
222 file_path: file_path.clone(),
223 file_size: file_index.file_size,
224 packet_count: file_index.packet_count,
225 start_timestamp: if file_index
226 .start_timestamp
227 > 0
228 {
229 Some(file_index.start_timestamp)
230 } else {
231 None
232 },
233 end_timestamp: if file_index
234 .end_timestamp
235 > 0
236 {
237 Some(file_index.end_timestamp)
238 } else {
239 None
240 },
241 file_hash: Some(
242 file_index.file_hash.clone(),
243 ),
244 created_time: current_time.clone(),
245 modified_time: current_time.clone(),
246 is_valid: true,
247 };
248
249 self.file_info_cache
251 .insert(&file_path, file_info.clone());
252 file_info
253 };
254
255 file_infos.push(file_info);
256 }
257
258 Ok(file_infos)
259 }
260
261 pub fn dataset_path(&self) -> &Path {
263 &self.dataset_path
264 }
265
266 pub fn dataset_name(&self) -> &str {
268 &self.dataset_name
269 }
270
271 pub fn read_packet(
281 &mut self,
282 ) -> PcapResult<Option<ValidatedPacket>> {
283 self.initialize()?;
284
285 self.ensure_current_file_open()?;
287
288 loop {
289 if let Some(ref mut reader) =
290 self.current_reader
291 {
292 match reader.read_packet() {
293 Ok(Some(result)) => {
294 self.current_position += 1;
295 return Ok(Some(result));
296 }
297 Ok(None) => {
298 if !self.switch_to_next_file()? {
300 return Ok(None);
302 }
303 continue;
304 }
305 Err(e) => return Err(e),
306 }
307 } else {
308 return Ok(None);
310 }
311 }
312 }
313
314 pub fn read_packet_data_only(
324 &mut self,
325 ) -> PcapResult<Option<DataPacket>> {
326 match self.read_packet()? {
327 Some(result) => Ok(Some(result.packet)),
328 None => Ok(None),
329 }
330 }
331
332 pub fn read_packets(
339 &mut self,
340 count: usize,
341 ) -> PcapResult<Vec<ValidatedPacket>> {
342 self.initialize()?;
343
344 let mut results = Vec::with_capacity(count);
345
346 for _ in 0..count {
348 if let Some(result) = self.read_packet()? {
349 results.push(result);
350 } else {
351 break; }
353 }
354
355 Ok(results)
356 }
357
358 pub fn read_packets_data_only(
365 &mut self,
366 count: usize,
367 ) -> PcapResult<Vec<DataPacket>> {
368 self.initialize()?;
369
370 let mut packets = Vec::with_capacity(count);
371
372 for _ in 0..count {
374 if let Some(packet) =
375 self.read_packet_data_only()?
376 {
377 packets.push(packet);
378 } else {
379 break; }
381 }
382
383 Ok(packets)
384 }
385
386 pub fn reset(&mut self) -> PcapResult<()> {
390 self.initialize()?;
391
392 self.current_position = 0;
394 self.current_file_index = 0;
395
396 if let Some(ref mut reader) = self.current_reader {
398 reader.close();
399 }
400 self.current_reader = None;
401
402 let index = self
404 .index_manager
405 .get_index()
406 .ok_or_else(|| {
407 PcapError::InvalidState(
408 "索引未加载".to_string(),
409 )
410 })?;
411
412 if !index.data_files.files.is_empty() {
413 self.open_file(0)?;
414 }
415
416 info!("读取器已重置到数据集开始位置");
417 Ok(())
418 }
419
420 pub fn index(&self) -> &IndexManager {
423 &self.index_manager
424 }
425
426 pub fn index_mut(&mut self) -> &mut IndexManager {
429 &mut self.index_manager
430 }
431
432 pub fn seek_by_timestamp(
440 &mut self,
441 timestamp_ns: u64,
442 ) -> PcapResult<
443 Option<
444 crate::business::index::types::TimestampPointer,
445 >,
446 > {
447 self.initialize()?;
448
449 let index = self
450 .index_manager
451 .get_index()
452 .ok_or_else(|| {
453 PcapError::InvalidState(
454 "索引未加载".to_string(),
455 )
456 })?;
457
458 let mut closest_entry = None;
460 let mut min_diff = u64::MAX;
461
462 for (ts, pointer) in &index.timestamp_index {
463 let diff = (*ts).abs_diff(timestamp_ns);
464
465 if diff < min_diff {
466 min_diff = diff;
467 closest_entry = Some(pointer.clone());
468 }
469 }
470
471 Ok(closest_entry)
472 }
473
474 pub fn read_packets_by_time_range(
483 &mut self,
484 start_timestamp_ns: u64,
485 end_timestamp_ns: u64,
486 ) -> PcapResult<Vec<ValidatedPacket>> {
487 self.initialize()?;
488
489 let pointers = {
490 let index = self
491 .index_manager
492 .get_index()
493 .ok_or_else(|| {
494 PcapError::InvalidState(
495 "索引未加载".to_string(),
496 )
497 })?;
498
499 index
500 .get_packets_in_range(
501 start_timestamp_ns,
502 end_timestamp_ns,
503 )
504 .into_iter()
505 .cloned()
506 .collect::<Vec<_>>()
507 };
508
509 let mut result_packets = Vec::new();
510 let mut current_file_index = None;
511
512 for pointer in pointers {
514 if current_file_index
516 != Some(pointer.file_index)
517 {
518 self.open_file(pointer.file_index)?;
519 current_file_index =
520 Some(pointer.file_index);
521 }
522
523 self.ensure_current_file_open()?;
525
526 let reader = self
528 .current_reader
529 .as_mut()
530 .ok_or_else(|| {
531 PcapError::InvalidState(
532 "当前文件读取器未初始化"
533 .to_string(),
534 )
535 })?;
536 let packet_result = reader
537 .read_packet_at(pointer.entry.byte_offset);
538
539 match packet_result {
540 Ok(packet) => {
541 let packet_timestamp =
543 packet.packet.get_timestamp_ns();
544 if packet_timestamp
545 >= start_timestamp_ns
546 && packet_timestamp
547 <= end_timestamp_ns
548 {
549 result_packets.push(packet);
550 }
551 }
552 Err(e) => {
553 warn!("读取数据包失败: {}", e);
554 }
556 }
557 }
558
559 Ok(result_packets)
560 }
561
562 pub fn get_cache_stats(&self) -> CacheStats {
564 self.file_info_cache.get_cache_stats()
565 }
566
567 pub fn clear_cache(&mut self) -> PcapResult<()> {
569 let _ = self.file_info_cache.clear();
570 debug!("缓存已清理");
571 Ok(())
572 }
573
574 pub fn seek_to_timestamp(
585 &mut self,
586 timestamp_ns: u64,
587 ) -> PcapResult<u64> {
588 self.initialize()?;
589
590 let (
592 actual_ts,
593 file_index,
594 byte_offset,
595 packet_offset,
596 ) = {
597 let index = self
598 .index_manager
599 .get_index()
600 .ok_or_else(|| {
601 PcapError::InvalidState(
602 "索引未加载".to_string(),
603 )
604 })?;
605
606 let (actual_ts, pointer) = if let Some(ptr) =
608 index.find_packet_by_timestamp(timestamp_ns)
609 {
610 (timestamp_ns, ptr.clone())
611 } else {
612 Self::find_timestamp_ge(&index.timestamp_index, timestamp_ns)
614 .ok_or_else(|| PcapError::InvalidArgument(
615 format!("未找到时间戳 >= {timestamp_ns} 的数据包")
616 ))?
617 };
618
619 let file_index_data =
621 &index.data_files.files[pointer.file_index];
622 let packet_offset = file_index_data
623 .data_packets
624 .iter()
625 .position(|p| {
626 p.timestamp_ns
627 == pointer.entry.timestamp_ns
628 })
629 .unwrap_or(0);
630
631 (
632 actual_ts,
633 pointer.file_index,
634 pointer.entry.byte_offset,
635 packet_offset,
636 )
637 };
638
639 self.open_file(file_index)?;
641
642 if let Some(reader) = self.current_reader.as_mut() {
644 reader.seek_to(byte_offset)?;
645 } else {
646 return Err(PcapError::InvalidState(
647 "文件未打开".to_string(),
648 ));
649 }
650
651 self.current_file_index = file_index;
653
654 let index = self
656 .index_manager
657 .get_index()
658 .ok_or_else(|| {
659 PcapError::InvalidState(
660 "索引未加载".to_string(),
661 )
662 })?;
663 self.current_position = self
664 .calculate_global_position(
665 index,
666 file_index,
667 packet_offset,
668 );
669
670 info!("已跳转到时间戳: {timestamp_ns}ns (实际: {actual_ts}ns), 全局位置: {}",
671 self.current_position);
672
673 Ok(actual_ts)
674 }
675
676 pub fn seek_to_packet(
681 &mut self,
682 packet_index: usize,
683 ) -> PcapResult<()> {
684 self.initialize()?;
685
686 let (target_file_idx, byte_offset, packet_offset) = {
688 let index = self
689 .index_manager
690 .get_index()
691 .ok_or_else(|| {
692 PcapError::InvalidState(
693 "索引未加载".to_string(),
694 )
695 })?;
696
697 if packet_index >= index.total_packets as usize
699 {
700 return Err(PcapError::InvalidArgument(
701 format!("数据包索引 {packet_index} 超出范围 (总数: {})", index.total_packets)
702 ));
703 }
704
705 let mut accumulated = 0usize;
707 let mut target_file_idx = 0;
708 let mut packet_offset = 0;
709
710 for (file_idx, file) in
711 index.data_files.files.iter().enumerate()
712 {
713 let next_accumulated = accumulated
714 + file.packet_count as usize;
715 if packet_index < next_accumulated {
716 target_file_idx = file_idx;
717 packet_offset =
718 packet_index - accumulated;
719 break;
720 }
721 accumulated = next_accumulated;
722 }
723
724 let file =
726 &index.data_files.files[target_file_idx];
727 let packet_entry =
728 &file.data_packets[packet_offset];
729 let byte_offset = packet_entry.byte_offset;
730
731 (target_file_idx, byte_offset, packet_offset)
732 };
733
734 self.open_file(target_file_idx)?;
736 if let Some(reader) = self.current_reader.as_mut() {
737 reader.seek_to(byte_offset)?;
738 } else {
739 return Err(PcapError::InvalidState(
740 "文件未打开".to_string(),
741 ));
742 }
743
744 self.current_file_index = target_file_idx;
746 self.current_position = packet_index as u64;
747
748 info!("已跳转到数据包索引: {packet_index}, 文件: {target_file_idx}, 文件内偏移: {packet_offset}");
749
750 Ok(())
751 }
752
753 pub fn is_eof(&self) -> bool {
755 if let Some(index) = self.index_manager.get_index()
756 {
757 self.current_position >= index.total_packets
758 } else {
759 self.current_reader.is_none()
761 }
762 }
763
764 pub fn total_packets(&self) -> Option<usize> {
766 self.index_manager
767 .get_index()
768 .map(|idx| idx.total_packets as usize)
769 }
770
771 pub fn current_packet_index(&self) -> u64 {
773 self.current_position
774 }
775
776 pub fn progress(&self) -> Option<f64> {
778 self.total_packets().map(|total| {
779 if total == 0 {
780 1.0
781 } else {
782 (self.current_position as f64
783 / total as f64)
784 .min(1.0)
785 }
786 })
787 }
788
789 pub fn skip_packets(
797 &mut self,
798 count: usize,
799 ) -> PcapResult<usize> {
800 let current_idx = self.current_position as usize;
801 let target_idx = current_idx + count;
802
803 let total = self.total_packets().unwrap_or(0);
804 let actual_target = if total == 0 {
806 0
807 } else {
808 target_idx.min(total - 1)
809 };
810 let actual_skipped =
811 actual_target.saturating_sub(current_idx);
812
813 if actual_skipped > 0 {
814 self.seek_to_packet(actual_target)?;
815 }
816
817 Ok(actual_skipped)
818 }
819
820 fn calculate_global_position(
826 &self,
827 index: &crate::business::index::types::PidxIndex,
828 file_index: usize,
829 packet_offset_in_file: usize,
830 ) -> u64 {
831 let mut position = 0u64;
832 for (idx, file) in
833 index.data_files.files.iter().enumerate()
834 {
835 if idx < file_index {
836 position += file.packet_count;
837 } else if idx == file_index {
838 position += packet_offset_in_file as u64;
839 break;
840 }
841 }
842 position
843 }
844
845 fn find_timestamp_ge(
847 timestamp_index: &std::collections::HashMap<
848 u64,
849 crate::business::index::types::TimestampPointer,
850 >,
851 target_ns: u64,
852 ) -> Option<(
853 u64,
854 crate::business::index::types::TimestampPointer,
855 )> {
856 let mut candidates: Vec<u64> = timestamp_index
857 .keys()
858 .filter(|&&ts| ts >= target_ns)
859 .copied()
860 .collect();
861
862 if candidates.is_empty() {
863 return None;
864 }
865
866 candidates.sort_unstable();
867 let closest_ts = candidates[0];
868 timestamp_index
869 .get(&closest_ts)
870 .map(|ptr| (closest_ts, ptr.clone()))
871 }
872
873 fn get_total_size(&self) -> PcapResult<u64> {
875 if let Some(cached_size) =
876 *self.total_size_cache.borrow()
877 {
878 return Ok(cached_size);
879 }
880
881 let index = self
882 .index_manager
883 .get_index()
884 .ok_or_else(|| {
885 PcapError::InvalidState(
886 "索引未加载".to_string(),
887 )
888 })?;
889
890 let total_size: u64 = index
891 .data_files
892 .files
893 .iter()
894 .map(|f| f.file_size)
895 .sum();
896
897 *self.total_size_cache.borrow_mut() =
898 Some(total_size);
899 Ok(total_size)
900 }
901
902 fn open_file(
904 &mut self,
905 file_index: usize,
906 ) -> PcapResult<()> {
907 let index = self
908 .index_manager
909 .get_index()
910 .ok_or_else(|| {
911 PcapError::InvalidState(
912 "索引未加载".to_string(),
913 )
914 })?;
915
916 if file_index >= index.data_files.files.len() {
917 return Err(PcapError::InvalidArgument(
918 format!("文件索引超出范围: {file_index}"),
919 ));
920 }
921
922 if let Some(ref mut reader) = self.current_reader {
924 reader.close();
925 }
926
927 let file_info = &index.data_files.files[file_index];
929 let file_path =
930 self.dataset_path.join(&file_info.file_name);
931
932 let mut reader =
933 crate::data::file_reader::PcapFileReader::new(
934 self.configuration.clone(),
935 );
936 reader.open(&file_path)?;
937
938 self.current_reader = Some(reader);
939 self.current_file_index = file_index;
940
941 debug!("已打开文件: {file_path:?}");
942 Ok(())
943 }
944
945 fn switch_to_next_file(&mut self) -> PcapResult<bool> {
947 let index = self
948 .index_manager
949 .get_index()
950 .ok_or_else(|| {
951 PcapError::InvalidState(
952 "索引未加载".to_string(),
953 )
954 })?;
955
956 if self.current_file_index + 1
957 >= index.data_files.files.len()
958 {
959 return Ok(false);
961 }
962
963 self.open_file(self.current_file_index + 1)?;
964 Ok(true)
965 }
966
967 fn ensure_current_file_open(
969 &mut self,
970 ) -> PcapResult<()> {
971 if self.current_reader.is_none() {
972 let index = self
973 .index_manager
974 .get_index()
975 .ok_or_else(|| {
976 PcapError::InvalidState(
977 "索引未加载".to_string(),
978 )
979 })?;
980
981 if !index.data_files.files.is_empty() {
982 self.open_file(0)?;
983 }
984 }
985 Ok(())
986 }
987
988 pub fn read_packet_by_timestamp(
990 &mut self,
991 timestamp_ns: u64,
992 ) -> PcapResult<Option<ValidatedPacket>> {
993 let pointer = {
994 let index = self
995 .index_manager
996 .get_index()
997 .ok_or_else(|| {
998 PcapError::InvalidState(
999 "索引未加载".to_string(),
1000 )
1001 })?;
1002
1003 match index
1004 .find_packet_by_timestamp(timestamp_ns)
1005 {
1006 Some(ptr) => ptr.clone(),
1007 None => return Ok(None),
1008 }
1009 };
1010
1011 if pointer.file_index != self.current_file_index {
1013 self.open_file(pointer.file_index)?;
1014 }
1015
1016 self.ensure_current_file_open()?;
1018
1019 let reader = self
1021 .current_reader
1022 .as_mut()
1023 .ok_or_else(|| {
1024 PcapError::InvalidState(
1025 "当前文件读取器未初始化".to_string(),
1026 )
1027 })?;
1028 let packet_result = reader
1029 .read_packet_at(pointer.entry.byte_offset);
1030
1031 match packet_result {
1032 Ok(packet) => {
1033 if packet.packet.get_timestamp_ns()
1035 == timestamp_ns
1036 {
1037 Ok(Some(packet))
1038 } else {
1039 Err(PcapError::InvalidState(
1040 "读取的数据包时间戳不匹配"
1041 .to_string(),
1042 ))
1043 }
1044 }
1045 Err(e) => Err(e),
1046 }
1047 }
1048}
1049
1050impl Drop for PcapReader {
1051 fn drop(&mut self) {
1052 if let Some(ref mut reader) = self.current_reader {
1054 reader.close();
1055 }
1056 debug!("PcapReader已清理");
1057 }
1058}