pcapfile_io/api/
reader.rs1use log::{debug, info};
6use std::cell::RefCell;
7use std::path::{Path, PathBuf};
8
9use crate::business::cache::{CacheStats, FileInfoCache};
10use crate::business::config::ReaderConfig;
11use crate::business::index::IndexManager;
12use crate::data::file_reader::PcapFileReader;
13use crate::data::models::{
14 DataPacket, DatasetInfo, FileInfo, ValidatedPacket,
15};
16use crate::foundation::error::{PcapError, PcapResult};
17
/// Message carried by `PcapError::DirectoryNotFound` when `base_path/dataset_name` does not exist.
const ERROR_DATASET_NOT_FOUND: &str = "数据集目录不存在";
/// Message carried by `PcapError::InvalidArgument` when the dataset path exists but is not a directory.
const ERROR_INVALID_DATASET: &str = "无效的数据集目录";
21
22pub struct PcapReader {
30 dataset_path: PathBuf,
32 dataset_name: String,
34 index_manager: IndexManager,
36 configuration: ReaderConfig,
38 current_reader: Option<PcapFileReader>,
40 current_file_index: usize,
42 current_position: u64,
44 file_info_cache: FileInfoCache,
46 total_size_cache: RefCell<Option<u64>>,
48 is_initialized: bool,
50}
51
52impl PcapReader {
53 pub fn new<P: AsRef<Path>>(
62 base_path: P,
63 dataset_name: &str,
64 ) -> PcapResult<Self> {
65 Self::new_with_config(
66 base_path,
67 dataset_name,
68 ReaderConfig::default(),
69 )
70 }
71
72 pub fn new_with_config<P: AsRef<Path>>(
82 base_path: P,
83 dataset_name: &str,
84 configuration: ReaderConfig,
85 ) -> PcapResult<Self> {
86 let dataset_path =
87 base_path.as_ref().join(dataset_name);
88
89 if !dataset_path.exists() {
91 return Err(PcapError::DirectoryNotFound(
92 ERROR_DATASET_NOT_FOUND.to_string(),
93 ));
94 }
95
96 if !dataset_path.is_dir() {
97 return Err(PcapError::InvalidArgument(
98 ERROR_INVALID_DATASET.to_string(),
99 ));
100 }
101
102 let index_manager =
104 IndexManager::new(base_path, dataset_name)?;
105
106 let cache_size = configuration.index_cache_size;
108
109 info!("PcapReader已创建 - 数据集: {dataset_name}");
110
111 Ok(Self {
112 dataset_path,
113 dataset_name: dataset_name.to_string(),
114 index_manager,
115 configuration,
116 current_reader: None,
117 current_file_index: 0,
118 current_position: 0,
119 file_info_cache: FileInfoCache::new(cache_size),
120 total_size_cache: RefCell::new(None),
121 is_initialized: false,
122 })
123 }
124
125 pub fn initialize(&mut self) -> PcapResult<()> {
129 if self.is_initialized {
130 return Ok(());
131 }
132
133 info!("初始化PcapReader...");
134
135 let _index = self.index_manager.ensure_index()?;
137
138 self.is_initialized = true;
139 info!("PcapReader初始化完成");
140 Ok(())
141 }
142
143 pub fn get_dataset_info(
145 &mut self,
146 ) -> PcapResult<DatasetInfo> {
147 self.initialize()?;
148
149 let index = self
150 .index_manager
151 .get_index()
152 .ok_or_else(|| {
153 PcapError::InvalidState(
154 "索引未加载".to_string(),
155 )
156 })?;
157
158 use chrono::Utc;
159
160 Ok(DatasetInfo {
161 name: self.dataset_name.clone(),
162 path: self.dataset_path.clone(),
163 file_count: index.data_files.files.len(),
164 total_packets: index.total_packets,
165 total_size: self.get_total_size()?,
166 start_timestamp: if index.start_timestamp > 0 {
167 Some(index.start_timestamp)
168 } else {
169 None
170 },
171 end_timestamp: if index.end_timestamp > 0 {
172 Some(index.end_timestamp)
173 } else {
174 None
175 },
176 created_time: Utc::now().to_rfc3339(),
177 modified_time: Utc::now().to_rfc3339(),
178 has_index: true,
179 })
180 }
181
182 pub fn get_file_info_list(
184 &mut self,
185 ) -> PcapResult<Vec<FileInfo>> {
186 self.initialize()?;
187
188 let index = self
189 .index_manager
190 .get_index()
191 .ok_or_else(|| {
192 PcapError::InvalidState(
193 "索引未加载".to_string(),
194 )
195 })?;
196
197 use chrono::Utc;
198 let current_time = Utc::now().to_rfc3339();
199
200 let mut file_infos = Vec::new();
201 for file_index in &index.data_files.files {
202 let file_info = FileInfo {
203 file_name: file_index.file_name.clone(),
204 file_path: self
205 .dataset_path
206 .join(&file_index.file_name),
207 file_size: file_index.file_size,
208 packet_count: file_index.packet_count,
209 start_timestamp: if file_index
210 .start_timestamp
211 > 0
212 {
213 Some(file_index.start_timestamp)
214 } else {
215 None
216 },
217 end_timestamp: if file_index.end_timestamp
218 > 0
219 {
220 Some(file_index.end_timestamp)
221 } else {
222 None
223 },
224 file_hash: Some(
225 file_index.file_hash.clone(),
226 ),
227 created_time: current_time.clone(),
228 modified_time: current_time.clone(),
229 is_valid: true,
230 };
231 file_infos.push(file_info);
232 }
233
234 Ok(file_infos)
235 }
236
237 pub fn dataset_path(&self) -> &Path {
239 &self.dataset_path
240 }
241
242 pub fn dataset_name(&self) -> &str {
244 &self.dataset_name
245 }
246
247 pub fn read_packet(
257 &mut self,
258 ) -> PcapResult<Option<ValidatedPacket>> {
259 self.initialize()?;
260
261 self.ensure_current_file_open()?;
263
264 loop {
265 if let Some(ref mut reader) =
266 self.current_reader
267 {
268 match reader.read_packet() {
269 Ok(Some(result)) => {
270 self.current_position += 1;
271 return Ok(Some(result));
272 }
273 Ok(None) => {
274 if !self.switch_to_next_file()? {
276 return Ok(None);
278 }
279 continue;
280 }
281 Err(e) => return Err(e),
282 }
283 } else {
284 return Ok(None);
286 }
287 }
288 }
289
290 pub fn read_packet_data_only(
300 &mut self,
301 ) -> PcapResult<Option<DataPacket>> {
302 match self.read_packet()? {
303 Some(result) => Ok(Some(result.packet)),
304 None => Ok(None),
305 }
306 }
307
308 pub fn read_packets(
315 &mut self,
316 count: usize,
317 ) -> PcapResult<Vec<ValidatedPacket>> {
318 self.initialize()?;
319
320 let mut results = Vec::with_capacity(count);
321
322 for _ in 0..count {
324 if let Some(result) = self.read_packet()? {
325 results.push(result);
326 } else {
327 break; }
329 }
330
331 Ok(results)
332 }
333
334 pub fn read_packets_data_only(
341 &mut self,
342 count: usize,
343 ) -> PcapResult<Vec<DataPacket>> {
344 self.initialize()?;
345
346 let mut packets = Vec::with_capacity(count);
347
348 for _ in 0..count {
350 if let Some(packet) =
351 self.read_packet_data_only()?
352 {
353 packets.push(packet);
354 } else {
355 break; }
357 }
358
359 Ok(packets)
360 }
361
362 pub fn reset(&mut self) -> PcapResult<()> {
366 self.initialize()?;
367
368 self.current_position = 0;
370 self.current_file_index = 0;
371
372 if let Some(ref mut reader) = self.current_reader {
374 reader.close();
375 }
376 self.current_reader = None;
377
378 let index = self
380 .index_manager
381 .get_index()
382 .ok_or_else(|| {
383 PcapError::InvalidState(
384 "索引未加载".to_string(),
385 )
386 })?;
387
388 if !index.data_files.files.is_empty() {
389 self.open_file(0)?;
390 }
391
392 info!("读取器已重置到数据集开始位置");
393 Ok(())
394 }
395
396 pub fn index(&self) -> &IndexManager {
399 &self.index_manager
400 }
401
402 pub fn index_mut(&mut self) -> &mut IndexManager {
405 &mut self.index_manager
406 }
407
408 pub fn get_cache_stats(&self) -> CacheStats {
410 self.file_info_cache.get_cache_stats()
411 }
412
413 pub fn clear_cache(&mut self) -> PcapResult<()> {
415 let _ = self.file_info_cache.clear();
416 debug!("缓存已清理");
417 Ok(())
418 }
419
420 fn get_total_size(&self) -> PcapResult<u64> {
426 if let Some(cached_size) =
427 *self.total_size_cache.borrow()
428 {
429 return Ok(cached_size);
430 }
431
432 let index = self
433 .index_manager
434 .get_index()
435 .ok_or_else(|| {
436 PcapError::InvalidState(
437 "索引未加载".to_string(),
438 )
439 })?;
440
441 let total_size: u64 = index
442 .data_files
443 .files
444 .iter()
445 .map(|f| f.file_size)
446 .sum();
447
448 *self.total_size_cache.borrow_mut() =
449 Some(total_size);
450 Ok(total_size)
451 }
452
453 fn open_file(
455 &mut self,
456 file_index: usize,
457 ) -> PcapResult<()> {
458 let index = self
459 .index_manager
460 .get_index()
461 .ok_or_else(|| {
462 PcapError::InvalidState(
463 "索引未加载".to_string(),
464 )
465 })?;
466
467 if file_index >= index.data_files.files.len() {
468 return Err(PcapError::InvalidArgument(
469 format!("文件索引超出范围: {file_index}"),
470 ));
471 }
472
473 if let Some(ref mut reader) = self.current_reader {
475 reader.close();
476 }
477
478 let file_info = &index.data_files.files[file_index];
480 let file_path =
481 self.dataset_path.join(&file_info.file_name);
482
483 let mut reader =
484 crate::data::file_reader::PcapFileReader::new(
485 self.configuration.clone(),
486 );
487 reader.open(&file_path)?;
488
489 self.current_reader = Some(reader);
490 self.current_file_index = file_index;
491
492 debug!("已打开文件: {file_path:?}");
493 Ok(())
494 }
495
496 fn switch_to_next_file(&mut self) -> PcapResult<bool> {
498 let index = self
499 .index_manager
500 .get_index()
501 .ok_or_else(|| {
502 PcapError::InvalidState(
503 "索引未加载".to_string(),
504 )
505 })?;
506
507 if self.current_file_index + 1
508 >= index.data_files.files.len()
509 {
510 return Ok(false);
512 }
513
514 self.open_file(self.current_file_index + 1)?;
515 Ok(true)
516 }
517
518 fn ensure_current_file_open(
520 &mut self,
521 ) -> PcapResult<()> {
522 if self.current_reader.is_none() {
523 let index = self
524 .index_manager
525 .get_index()
526 .ok_or_else(|| {
527 PcapError::InvalidState(
528 "索引未加载".to_string(),
529 )
530 })?;
531
532 if !index.data_files.files.is_empty() {
533 self.open_file(0)?;
534 }
535 }
536 Ok(())
537 }
538}
539
540impl Drop for PcapReader {
541 fn drop(&mut self) {
542 if let Some(ref mut reader) = self.current_reader {
544 reader.close();
545 }
546 debug!("PcapReader已清理");
547 }
548}