1use log::{debug, info, warn};
6use std::cell::RefCell;
7use std::path::{Path, PathBuf};
8
9use crate::business::cache::{CacheStats, FileInfoCache};
10use crate::business::config::ReaderConfig;
11use crate::business::index::IndexManager;
12use crate::data::file_reader::PcapFileReader;
13use crate::data::models::{
14 DataPacket, DatasetInfo, FileInfo, ValidatedPacket,
15};
16use crate::foundation::error::{PcapError, PcapResult};
17
/// Message for `PcapError::InvalidArgument` when the dataset path exists but is not a directory.
const ERROR_INVALID_DATASET: &str = "无效的数据集目录";
/// Message for `PcapError::DirectoryNotFound` when the dataset path does not exist.
const ERROR_DATASET_NOT_FOUND: &str = "数据集目录不存在";
21
/// Reads packets from the data files of one dataset directory
/// (`base_path/dataset_name`), either sequentially across files or at
/// positions looked up in the dataset index.
///
/// NOTE: fields are dropped in declaration order (after the `Drop` impl for
/// `PcapReader` has run), so their order is intentionally left unchanged.
pub struct PcapReader {
    /// Dataset directory, `base_path.join(dataset_name)`; data file names taken
    /// from the index are joined onto it.
    dataset_path: PathBuf,
    /// Dataset name as passed to the constructor.
    dataset_name: String,
    /// Owns the dataset index; `initialize` calls `ensure_index` on it and the
    /// read paths access the index through `get_index`.
    index_manager: IndexManager,
    /// Reader configuration, validated at construction; a clone is passed to
    /// every `PcapFileReader` that gets opened.
    configuration: ReaderConfig,
    /// Reader for the currently open data file, or `None` while no file is open.
    current_reader: Option<PcapFileReader>,
    /// Position of the open file within the index's `data_files.files` list.
    current_file_index: usize,
    /// Number of packets returned by sequential `read_packet` calls since
    /// construction or the last `reset`.
    current_position: u64,
    /// `FileInfo` cache keyed by file path, constructed with
    /// `configuration.index_cache_size`.
    file_info_cache: FileInfoCache,
    /// Memoized sum of all data file sizes; the `RefCell` lets
    /// `get_total_size` fill it through `&self`.
    total_size_cache: RefCell<Option<u64>>,
    /// Set once `initialize` has succeeded; later `initialize` calls are no-ops.
    is_initialized: bool,
}
51
52impl PcapReader {
53 pub fn new<P: AsRef<Path>>(
62 base_path: P,
63 dataset_name: &str,
64 ) -> PcapResult<Self> {
65 Self::new_with_config(
66 base_path,
67 dataset_name,
68 ReaderConfig::default(),
69 )
70 }
71
72 pub fn new_with_config<P: AsRef<Path>>(
82 base_path: P,
83 dataset_name: &str,
84 configuration: ReaderConfig,
85 ) -> PcapResult<Self> {
86 configuration.validate().map_err(|e| {
88 PcapError::InvalidArgument(format!(
89 "读取器配置无效: {e}"
90 ))
91 })?;
92
93 let dataset_path =
94 base_path.as_ref().join(dataset_name);
95
96 if !dataset_path.exists() {
98 return Err(PcapError::DirectoryNotFound(
99 ERROR_DATASET_NOT_FOUND.to_string(),
100 ));
101 }
102
103 if !dataset_path.is_dir() {
104 return Err(PcapError::InvalidArgument(
105 ERROR_INVALID_DATASET.to_string(),
106 ));
107 }
108
109 let index_manager =
111 IndexManager::new(base_path, dataset_name)?;
112
113 let cache_size = configuration.index_cache_size;
115
116 info!("PcapReader已创建 - 数据集: {dataset_name}");
117
118 Ok(Self {
119 dataset_path,
120 dataset_name: dataset_name.to_string(),
121 index_manager,
122 configuration,
123 current_reader: None,
124 current_file_index: 0,
125 current_position: 0,
126 file_info_cache: FileInfoCache::new(cache_size),
127 total_size_cache: RefCell::new(None),
128 is_initialized: false,
129 })
130 }
131
132 pub fn initialize(&mut self) -> PcapResult<()> {
136 if self.is_initialized {
137 return Ok(());
138 }
139
140 info!("初始化PcapReader...");
141
142 let _index = self.index_manager.ensure_index()?;
144
145 self.is_initialized = true;
146 info!("PcapReader初始化完成");
147 Ok(())
148 }
149
150 pub fn get_dataset_info(
152 &mut self,
153 ) -> PcapResult<DatasetInfo> {
154 self.initialize()?;
155
156 let index = self
157 .index_manager
158 .get_index()
159 .ok_or_else(|| {
160 PcapError::InvalidState(
161 "索引未加载".to_string(),
162 )
163 })?;
164
165 use chrono::Utc;
166
167 Ok(DatasetInfo {
168 name: self.dataset_name.clone(),
169 path: self.dataset_path.clone(),
170 file_count: index.data_files.files.len(),
171 total_packets: index.total_packets,
172 total_size: self.get_total_size()?,
173 start_timestamp: if index.start_timestamp > 0 {
174 Some(index.start_timestamp)
175 } else {
176 None
177 },
178 end_timestamp: if index.end_timestamp > 0 {
179 Some(index.end_timestamp)
180 } else {
181 None
182 },
183 created_time: Utc::now().to_rfc3339(),
184 modified_time: Utc::now().to_rfc3339(),
185 has_index: true,
186 })
187 }
188
189 pub fn get_file_info_list(
191 &mut self,
192 ) -> PcapResult<Vec<FileInfo>> {
193 self.initialize()?;
194
195 let index = self
196 .index_manager
197 .get_index()
198 .ok_or_else(|| {
199 PcapError::InvalidState(
200 "索引未加载".to_string(),
201 )
202 })?;
203
204 use chrono::Utc;
205 let current_time = Utc::now().to_rfc3339();
206
207 let mut file_infos = Vec::new();
208 for file_index in &index.data_files.files {
209 let file_path = self
210 .dataset_path
211 .join(&file_index.file_name);
212
213 let file_info = if let Some(cached_info) =
215 self.file_info_cache.get(&file_path)
216 {
217 cached_info
218 } else {
219 let file_info = FileInfo {
221 file_name: file_index.file_name.clone(),
222 file_path: file_path.clone(),
223 file_size: file_index.file_size,
224 packet_count: file_index.packet_count,
225 start_timestamp: if file_index
226 .start_timestamp
227 > 0
228 {
229 Some(file_index.start_timestamp)
230 } else {
231 None
232 },
233 end_timestamp: if file_index
234 .end_timestamp
235 > 0
236 {
237 Some(file_index.end_timestamp)
238 } else {
239 None
240 },
241 file_hash: Some(
242 file_index.file_hash.clone(),
243 ),
244 created_time: current_time.clone(),
245 modified_time: current_time.clone(),
246 is_valid: true,
247 };
248
249 self.file_info_cache
251 .insert(&file_path, file_info.clone());
252 file_info
253 };
254
255 file_infos.push(file_info);
256 }
257
258 Ok(file_infos)
259 }
260
261 pub fn dataset_path(&self) -> &Path {
263 &self.dataset_path
264 }
265
266 pub fn dataset_name(&self) -> &str {
268 &self.dataset_name
269 }
270
271 pub fn read_packet(
281 &mut self,
282 ) -> PcapResult<Option<ValidatedPacket>> {
283 self.initialize()?;
284
285 self.ensure_current_file_open()?;
287
288 loop {
289 if let Some(ref mut reader) =
290 self.current_reader
291 {
292 match reader.read_packet() {
293 Ok(Some(result)) => {
294 self.current_position += 1;
295 return Ok(Some(result));
296 }
297 Ok(None) => {
298 if !self.switch_to_next_file()? {
300 return Ok(None);
302 }
303 continue;
304 }
305 Err(e) => return Err(e),
306 }
307 } else {
308 return Ok(None);
310 }
311 }
312 }
313
314 pub fn read_packet_data_only(
324 &mut self,
325 ) -> PcapResult<Option<DataPacket>> {
326 match self.read_packet()? {
327 Some(result) => Ok(Some(result.packet)),
328 None => Ok(None),
329 }
330 }
331
332 pub fn read_packets(
339 &mut self,
340 count: usize,
341 ) -> PcapResult<Vec<ValidatedPacket>> {
342 self.initialize()?;
343
344 let mut results = Vec::with_capacity(count);
345
346 for _ in 0..count {
348 if let Some(result) = self.read_packet()? {
349 results.push(result);
350 } else {
351 break; }
353 }
354
355 Ok(results)
356 }
357
358 pub fn read_packets_data_only(
365 &mut self,
366 count: usize,
367 ) -> PcapResult<Vec<DataPacket>> {
368 self.initialize()?;
369
370 let mut packets = Vec::with_capacity(count);
371
372 for _ in 0..count {
374 if let Some(packet) =
375 self.read_packet_data_only()?
376 {
377 packets.push(packet);
378 } else {
379 break; }
381 }
382
383 Ok(packets)
384 }
385
386 pub fn reset(&mut self) -> PcapResult<()> {
390 self.initialize()?;
391
392 self.current_position = 0;
394 self.current_file_index = 0;
395
396 if let Some(ref mut reader) = self.current_reader {
398 reader.close();
399 }
400 self.current_reader = None;
401
402 let index = self
404 .index_manager
405 .get_index()
406 .ok_or_else(|| {
407 PcapError::InvalidState(
408 "索引未加载".to_string(),
409 )
410 })?;
411
412 if !index.data_files.files.is_empty() {
413 self.open_file(0)?;
414 }
415
416 info!("读取器已重置到数据集开始位置");
417 Ok(())
418 }
419
420 pub fn index(&self) -> &IndexManager {
423 &self.index_manager
424 }
425
426 pub fn index_mut(&mut self) -> &mut IndexManager {
429 &mut self.index_manager
430 }
431
432 pub fn seek_by_timestamp(
440 &mut self,
441 timestamp_ns: u64,
442 ) -> PcapResult<
443 Option<
444 crate::business::index::types::TimestampPointer,
445 >,
446 > {
447 self.initialize()?;
448
449 let index = self
450 .index_manager
451 .get_index()
452 .ok_or_else(|| {
453 PcapError::InvalidState(
454 "索引未加载".to_string(),
455 )
456 })?;
457
458 let mut closest_entry = None;
460 let mut min_diff = u64::MAX;
461
462 for (ts, pointer) in &index.timestamp_index {
463 let diff = (*ts).abs_diff(timestamp_ns);
464
465 if diff < min_diff {
466 min_diff = diff;
467 closest_entry = Some(pointer.clone());
468 }
469 }
470
471 Ok(closest_entry)
472 }
473
474 pub fn read_packets_by_time_range(
483 &mut self,
484 start_timestamp_ns: u64,
485 end_timestamp_ns: u64,
486 ) -> PcapResult<Vec<ValidatedPacket>> {
487 self.initialize()?;
488
489 let pointers = {
490 let index = self
491 .index_manager
492 .get_index()
493 .ok_or_else(|| {
494 PcapError::InvalidState(
495 "索引未加载".to_string(),
496 )
497 })?;
498
499 index
500 .get_packets_in_range(
501 start_timestamp_ns,
502 end_timestamp_ns,
503 )
504 .into_iter()
505 .cloned()
506 .collect::<Vec<_>>()
507 };
508
509 let mut result_packets = Vec::new();
510 let mut current_file_index = None;
511
512 for pointer in pointers {
514 if current_file_index
516 != Some(pointer.file_index)
517 {
518 self.open_file(pointer.file_index)?;
519 current_file_index =
520 Some(pointer.file_index);
521 }
522
523 self.ensure_current_file_open()?;
525
526 let reader = self
528 .current_reader
529 .as_mut()
530 .ok_or_else(|| {
531 PcapError::InvalidState(
532 "当前文件读取器未初始化"
533 .to_string(),
534 )
535 })?;
536 let packet_result = reader
537 .read_packet_at(pointer.entry.byte_offset);
538
539 match packet_result {
540 Ok(packet) => {
541 let packet_timestamp =
543 packet.packet.get_timestamp_ns();
544 if packet_timestamp
545 >= start_timestamp_ns
546 && packet_timestamp
547 <= end_timestamp_ns
548 {
549 result_packets.push(packet);
550 }
551 }
552 Err(e) => {
553 warn!("读取数据包失败: {}", e);
554 }
556 }
557 }
558
559 Ok(result_packets)
560 }
561
562 pub fn get_cache_stats(&self) -> CacheStats {
564 self.file_info_cache.get_cache_stats()
565 }
566
567 pub fn clear_cache(&mut self) -> PcapResult<()> {
569 let _ = self.file_info_cache.clear();
570 debug!("缓存已清理");
571 Ok(())
572 }
573
574 fn get_total_size(&self) -> PcapResult<u64> {
580 if let Some(cached_size) =
581 *self.total_size_cache.borrow()
582 {
583 return Ok(cached_size);
584 }
585
586 let index = self
587 .index_manager
588 .get_index()
589 .ok_or_else(|| {
590 PcapError::InvalidState(
591 "索引未加载".to_string(),
592 )
593 })?;
594
595 let total_size: u64 = index
596 .data_files
597 .files
598 .iter()
599 .map(|f| f.file_size)
600 .sum();
601
602 *self.total_size_cache.borrow_mut() =
603 Some(total_size);
604 Ok(total_size)
605 }
606
607 fn open_file(
609 &mut self,
610 file_index: usize,
611 ) -> PcapResult<()> {
612 let index = self
613 .index_manager
614 .get_index()
615 .ok_or_else(|| {
616 PcapError::InvalidState(
617 "索引未加载".to_string(),
618 )
619 })?;
620
621 if file_index >= index.data_files.files.len() {
622 return Err(PcapError::InvalidArgument(
623 format!("文件索引超出范围: {file_index}"),
624 ));
625 }
626
627 if let Some(ref mut reader) = self.current_reader {
629 reader.close();
630 }
631
632 let file_info = &index.data_files.files[file_index];
634 let file_path =
635 self.dataset_path.join(&file_info.file_name);
636
637 let mut reader =
638 crate::data::file_reader::PcapFileReader::new(
639 self.configuration.clone(),
640 );
641 reader.open(&file_path)?;
642
643 self.current_reader = Some(reader);
644 self.current_file_index = file_index;
645
646 debug!("已打开文件: {file_path:?}");
647 Ok(())
648 }
649
650 fn switch_to_next_file(&mut self) -> PcapResult<bool> {
652 let index = self
653 .index_manager
654 .get_index()
655 .ok_or_else(|| {
656 PcapError::InvalidState(
657 "索引未加载".to_string(),
658 )
659 })?;
660
661 if self.current_file_index + 1
662 >= index.data_files.files.len()
663 {
664 return Ok(false);
666 }
667
668 self.open_file(self.current_file_index + 1)?;
669 Ok(true)
670 }
671
672 fn ensure_current_file_open(
674 &mut self,
675 ) -> PcapResult<()> {
676 if self.current_reader.is_none() {
677 let index = self
678 .index_manager
679 .get_index()
680 .ok_or_else(|| {
681 PcapError::InvalidState(
682 "索引未加载".to_string(),
683 )
684 })?;
685
686 if !index.data_files.files.is_empty() {
687 self.open_file(0)?;
688 }
689 }
690 Ok(())
691 }
692
693 pub fn read_packet_by_timestamp(
695 &mut self,
696 timestamp_ns: u64,
697 ) -> PcapResult<Option<ValidatedPacket>> {
698 let pointer = {
699 let index = self
700 .index_manager
701 .get_index()
702 .ok_or_else(|| {
703 PcapError::InvalidState(
704 "索引未加载".to_string(),
705 )
706 })?;
707
708 match index
709 .find_packet_by_timestamp(timestamp_ns)
710 {
711 Some(ptr) => ptr.clone(),
712 None => return Ok(None),
713 }
714 };
715
716 if pointer.file_index != self.current_file_index {
718 self.open_file(pointer.file_index)?;
719 }
720
721 self.ensure_current_file_open()?;
723
724 let reader = self
726 .current_reader
727 .as_mut()
728 .ok_or_else(|| {
729 PcapError::InvalidState(
730 "当前文件读取器未初始化".to_string(),
731 )
732 })?;
733 let packet_result = reader
734 .read_packet_at(pointer.entry.byte_offset);
735
736 match packet_result {
737 Ok(packet) => {
738 if packet.packet.get_timestamp_ns()
740 == timestamp_ns
741 {
742 Ok(Some(packet))
743 } else {
744 Err(PcapError::InvalidState(
745 "读取的数据包时间戳不匹配"
746 .to_string(),
747 ))
748 }
749 }
750 Err(e) => Err(e),
751 }
752 }
753}
754
755impl Drop for PcapReader {
756 fn drop(&mut self) {
757 if let Some(ref mut reader) = self.current_reader {
759 reader.close();
760 }
761 debug!("PcapReader已清理");
762 }
763}