1use log::{debug, info, warn};
6use std::fs;
7use std::path::{Path, PathBuf};
8
9use crate::business::cache::{CacheStats, FileInfoCache};
10use crate::business::config::WriterConfig;
11use crate::business::index::IndexManager;
12use crate::data::file_writer::PcapFileWriter;
13use crate::data::models::{
14 DataPacket, DatasetInfo, FileInfo,
15};
16use crate::foundation::error::{PcapError, PcapResult};
17use crate::foundation::utils::DateTimeExtensions;
18use chrono::Utc;
19
20pub struct PcapWriter {
28 dataset_path: PathBuf,
30 dataset_name: String,
32 index_manager: IndexManager,
34 configuration: WriterConfig,
36 current_writer: Option<PcapFileWriter>,
38 current_file_index: usize,
40 current_file_size: u64,
42 created_files: Vec<PathBuf>,
44 file_info_cache: FileInfoCache,
46 total_packet_count: u64,
48 current_file_packet_count: u64,
50 is_initialized: bool,
52 is_finalized: bool,
54}
55
56impl PcapWriter {
57 pub fn new<P: AsRef<Path>>(
66 base_path: P,
67 dataset_name: &str,
68 ) -> PcapResult<Self> {
69 Self::new_with_config(
70 base_path,
71 dataset_name,
72 WriterConfig::default(),
73 )
74 }
75
76 pub fn new_with_config<P: AsRef<Path>>(
86 base_path: P,
87 dataset_name: &str,
88 configuration: WriterConfig,
89 ) -> PcapResult<Self> {
90 configuration.validate().map_err(|e| {
92 PcapError::InvalidArgument(format!(
93 "写入器配置无效: {e}"
94 ))
95 })?;
96
97 let dataset_path =
98 base_path.as_ref().join(dataset_name);
99
100 if !dataset_path.exists() {
102 fs::create_dir_all(&dataset_path)
103 .map_err(PcapError::Io)?;
104 info!("已创建数据集目录: {dataset_path:?}");
105 }
106
107 if !dataset_path.is_dir() {
108 return Err(PcapError::InvalidArgument(
109 format!(
110 "指定路径不是目录: {dataset_path:?}"
111 ),
112 ));
113 }
114
115 let index_manager =
117 IndexManager::new(base_path, dataset_name)?;
118
119 let cache_size = configuration.index_cache_size;
121
122 info!("PcapWriter已创建 - 数据集: {dataset_name}");
123
124 Ok(Self {
125 dataset_path,
126 dataset_name: dataset_name.to_string(),
127 index_manager,
128 configuration,
129 current_writer: None,
130 current_file_index: 0,
131 current_file_size: 0,
132 created_files: Vec::new(),
133 file_info_cache: FileInfoCache::new(cache_size),
134 total_packet_count: 0,
135 current_file_packet_count: 0,
136 is_initialized: false,
137 is_finalized: false,
138 })
139 }
140
141 pub fn initialize(&mut self) -> PcapResult<()> {
143 if self.is_initialized {
144 return Ok(());
145 }
146
147 info!("初始化PcapWriter...");
148
149 self.create_new_file()?;
151
152 self.is_initialized = true;
153 info!("PcapWriter初始化完成");
154 Ok(())
155 }
156
157 pub fn finalize(&mut self) -> PcapResult<()> {
159 if self.is_finalized {
160 return Ok(());
161 }
162
163 info!("正在完成PcapWriter...");
164
165 if let Some(ref mut writer) = self.current_writer {
167 writer.flush()?;
168 writer.close();
169 }
170 self.current_writer = None;
171
172 self.index_manager.rebuild_index()?;
174
175 self.is_finalized = true;
176 info!(
177 "PcapWriter已完成 - 总文件数: {}, 总数据包数: {}",
178 self.created_files.len(),
179 self.total_packet_count
180 );
181
182 Ok(())
183 }
184
185 pub fn get_dataset_info(&self) -> DatasetInfo {
187 use chrono::Utc;
188
189 let pidx_path = self.dataset_path.join(".pidx");
191 let has_index =
192 pidx_path.exists() && pidx_path.is_file();
193
194 DatasetInfo {
195 name: self.dataset_name.clone(),
196 path: self.dataset_path.clone(),
197 file_count: self.created_files.len(),
198 total_packets: self.total_packet_count,
199 total_size: self.get_total_size(),
200 start_timestamp: None, end_timestamp: None, created_time: Utc::now().to_rfc3339(),
203 modified_time: Utc::now().to_rfc3339(),
204 has_index,
205 }
206 }
207
208 pub fn get_file_info_list(&self) -> Vec<FileInfo> {
210 let mut file_infos = Vec::new();
211
212 use chrono::Utc;
213 let current_time = Utc::now().to_rfc3339();
214
215 let index_info = self.index_manager.get_index();
217
218 for file_path in &self.created_files {
219 if let Ok(metadata) = fs::metadata(file_path) {
220 let file_name = file_path
221 .file_name()
222 .and_then(|name| name.to_str())
223 .unwrap_or("")
224 .to_string();
225
226 let (
228 packet_count,
229 start_timestamp,
230 end_timestamp,
231 file_hash,
232 ) = if let Some(index) = index_info {
233 if let Some(file_index) =
234 index.data_files.files.iter().find(
235 |f| f.file_name == file_name,
236 )
237 {
238 (
239 file_index.packet_count,
240 if file_index.start_timestamp
241 > 0
242 {
243 Some(
244 file_index
245 .start_timestamp,
246 )
247 } else {
248 None
249 },
250 if file_index.end_timestamp > 0
251 {
252 Some(
253 file_index
254 .end_timestamp,
255 )
256 } else {
257 None
258 },
259 Some(
260 file_index
261 .file_hash
262 .clone(),
263 ),
264 )
265 } else {
266 (0, None, None, None)
267 }
268 } else {
269 (0, None, None, None)
270 };
271
272 let file_info = FileInfo {
273 file_name,
274 file_path: file_path.clone(),
275 file_size: metadata.len(),
276 packet_count,
277 start_timestamp,
278 end_timestamp,
279 file_hash,
280 created_time: current_time.clone(),
281 modified_time: current_time.clone(),
282 is_valid: true,
283 };
284 file_infos.push(file_info);
285 }
286 }
287
288 file_infos
289 }
290
291 pub fn index(&self) -> &IndexManager {
294 &self.index_manager
295 }
296
297 pub fn index_mut(&mut self) -> &mut IndexManager {
300 &mut self.index_manager
301 }
302
303 pub fn dataset_path(&self) -> &Path {
305 &self.dataset_path
306 }
307
308 pub fn dataset_name(&self) -> &str {
310 &self.dataset_name
311 }
312
313 pub fn write_packet(
322 &mut self,
323 packet: &DataPacket,
324 ) -> PcapResult<()> {
325 if self.is_finalized {
326 return Err(PcapError::InvalidState(
327 "写入器已完成,无法继续写入".to_string(),
328 ));
329 }
330
331 if !self.is_initialized {
333 self.initialize()?;
334 }
335
336 if self.should_switch_file() {
338 self.switch_to_new_file()?;
339 }
340
341 if let Some(ref mut writer) = self.current_writer {
343 writer.write_packet(packet)?;
344
345 self.current_file_size +=
347 packet.packet_length() as u64 + 16; self.current_file_packet_count += 1;
349 self.total_packet_count += 1;
350
351 debug!(
352 "已写入数据包,当前文件大小: {} 字节",
353 self.current_file_size
354 );
355 } else {
356 return Err(PcapError::InvalidState(
357 "没有可用的写入器".to_string(),
358 ));
359 }
360
361 Ok(())
362 }
363
364 pub fn write_packets(
371 &mut self,
372 packets: &[DataPacket],
373 ) -> PcapResult<()> {
374 for packet in packets {
375 self.write_packet(packet)?;
376 }
377 Ok(())
378 }
379
380 pub fn flush(&mut self) -> PcapResult<()> {
384 if let Some(ref mut writer) = self.current_writer {
385 writer.flush()?;
386 debug!("缓冲区已刷新");
387 }
388 Ok(())
389 }
390
391 pub fn get_cache_stats(&self) -> CacheStats {
393 self.file_info_cache.get_cache_stats()
394 }
395
396 pub fn clear_cache(&mut self) -> PcapResult<()> {
398 let _ = self.file_info_cache.clear();
399 debug!("缓存已清理");
400 Ok(())
401 }
402
403 fn create_new_file(&mut self) -> PcapResult<()> {
409 let time_str = Utc::now().to_filename_string();
411 let filename = if self
412 .configuration
413 .file_name_format
414 .is_empty()
415 {
416 format!("data_{time_str}.pcap")
418 } else if self.configuration.file_name_format == crate::foundation::types::constants::DEFAULT_FILE_NAME_FORMAT {
419 format!("{time_str}.pcap")
421 } else {
422 format!(
424 "{}.pcap",
425 self.configuration
426 .file_name_format
427 .replace("{}", &time_str)
428 )
429 };
430
431 let file_path = self.dataset_path.join(&filename);
432
433 let mut writer =
435 PcapFileWriter::new(self.configuration.clone());
436 writer
437 .create(&self.dataset_path, &filename)
438 .map_err(PcapError::InvalidFormat)?;
439
440 if let Some(ref mut old_writer) =
442 self.current_writer
443 {
444 old_writer
445 .flush()
446 .map_err(PcapError::InvalidFormat)?;
447 old_writer.close();
448 }
449
450 self.current_writer = Some(writer);
452 self.current_file_size = 0;
453 self.current_file_packet_count = 0;
454 self.created_files.push(file_path.clone());
455
456 info!("已创建新文件: {file_path:?}");
457 Ok(())
458 }
459
460 fn should_switch_file(&self) -> bool {
462 if self.current_file_packet_count
464 >= self.configuration.max_packets_per_file
465 as u64
466 {
467 return true;
468 }
469
470 if self.configuration.max_file_size_bytes > 0
472 && self.current_file_size
473 >= self.configuration.max_file_size_bytes
474 {
475 return true;
476 }
477
478 false
479 }
480
481 fn switch_to_new_file(&mut self) -> PcapResult<()> {
483 self.current_file_index += 1;
484 self.create_new_file()
485 }
486
487 fn get_total_size(&self) -> u64 {
489 self.created_files
490 .iter()
491 .map(|path| {
492 fs::metadata(path)
493 .map(|metadata| metadata.len())
494 .unwrap_or(0)
495 })
496 .sum()
497 }
498}
499
500impl Drop for PcapWriter {
501 fn drop(&mut self) {
502 if !self.is_finalized {
503 if let Err(e) = self.finalize() {
504 warn!("完成PcapWriter时出错: {e}");
505 }
506 }
507 }
508}