1use log::{debug, info, warn};
6use std::fs;
7use std::path::{Path, PathBuf};
8
9use crate::business::cache::{CacheStats, FileInfoCache};
10use crate::business::config::WriterConfig;
11use crate::business::index::IndexManager;
12use crate::data::file_writer::PcapFileWriter;
13use crate::data::models::{
14 DataPacket, DatasetInfo, FileInfo,
15};
16use crate::foundation::error::{PcapError, PcapResult};
17use crate::foundation::utils::DateTimeExtensions;
18use chrono::Utc;
19
/// Dataset-level packet writer.
///
/// Owns the dataset directory, the file currently being written and the
/// bookkeeping (counters, created files) needed to roll over to new files
/// and to rebuild the dataset index when the writer is finalized.
pub struct PcapWriter {
    /// Dataset directory: the base path joined with the dataset name.
    dataset_path: PathBuf,
    /// Name of the dataset, as passed to the constructor.
    dataset_name: String,
    /// Index manager of the dataset; `finalize` rebuilds the index through it.
    index_manager: IndexManager,
    /// Writer configuration (validated in `new_with_config`).
    configuration: WriterConfig,
    /// Writer of the file currently being written. `None` until the first
    /// file is created and again after `finalize`.
    current_writer: Option<PcapFileWriter>,
    /// Starts at 0 and is incremented each time the writer switches files.
    current_file_index: usize,
    /// Running byte estimate for the current file (packet length + 16 per
    /// packet); reset to 0 whenever a new file is created.
    current_file_size: u64,
    /// Paths of every file created by this writer, in creation order.
    created_files: Vec<PathBuf>,
    /// File-info cache, sized from `configuration.index_cache_size`.
    file_info_cache: FileInfoCache,
    /// Number of packets written across all files.
    total_packet_count: u64,
    /// Number of packets written to the current file; reset on file creation.
    current_file_packet_count: u64,
    /// Set once `initialize` has created the first file.
    is_initialized: bool,
    /// Set once `finalize` has completed successfully.
    is_finalized: bool,
}
55
56impl PcapWriter {
57 pub fn new<P: AsRef<Path>>(
66 base_path: P,
67 dataset_name: &str,
68 ) -> PcapResult<Self> {
69 Self::new_with_config(
70 base_path,
71 dataset_name,
72 WriterConfig::default(),
73 )
74 }
75
76 pub fn new_with_config<P: AsRef<Path>>(
86 base_path: P,
87 dataset_name: &str,
88 configuration: WriterConfig,
89 ) -> PcapResult<Self> {
90 configuration.validate().map_err(|e| {
92 PcapError::InvalidArgument(format!(
93 "写入器配置无效: {e}"
94 ))
95 })?;
96
97 let dataset_path =
98 base_path.as_ref().join(dataset_name);
99
100 if !dataset_path.exists() {
102 fs::create_dir_all(&dataset_path)
103 .map_err(PcapError::Io)?;
104 info!("已创建数据集目录: {dataset_path:?}");
105 }
106
107 if !dataset_path.is_dir() {
108 return Err(PcapError::InvalidArgument(
109 format!(
110 "指定路径不是目录: {dataset_path:?}"
111 ),
112 ));
113 }
114
115 let index_manager =
117 IndexManager::new(base_path, dataset_name)?;
118
119 let cache_size = configuration.index_cache_size;
121
122 info!("PcapWriter已创建 - 数据集: {dataset_name}");
123
124 Ok(Self {
125 dataset_path,
126 dataset_name: dataset_name.to_string(),
127 index_manager,
128 configuration,
129 current_writer: None,
130 current_file_index: 0,
131 current_file_size: 0,
132 created_files: Vec::new(),
133 file_info_cache: FileInfoCache::new(cache_size),
134 total_packet_count: 0,
135 current_file_packet_count: 0,
136 is_initialized: false,
137 is_finalized: false,
138 })
139 }
140
141 pub fn initialize(&mut self) -> PcapResult<()> {
143 if self.is_initialized {
144 return Ok(());
145 }
146
147 info!("初始化PcapWriter...");
148
149 self.create_new_file()?;
151
152 self.is_initialized = true;
153 info!("PcapWriter初始化完成");
154 Ok(())
155 }
156
157 pub fn finalize(&mut self) -> PcapResult<()> {
159 if self.is_finalized {
160 return Ok(());
161 }
162
163 info!("正在完成PcapWriter...");
164
165 if let Some(ref mut writer) = self.current_writer {
167 writer.flush()?;
168 writer.close();
169 }
170 self.current_writer = None;
171
172 self.index_manager.rebuild_index()?;
174
175 self.is_finalized = true;
176 info!(
177 "PcapWriter已完成 - 总文件数: {}, 总数据包数: {}",
178 self.created_files.len(),
179 self.total_packet_count
180 );
181
182 Ok(())
183 }
184
185 pub fn get_dataset_info(&self) -> DatasetInfo {
187 use chrono::Utc;
188
189 let pidx_path = self.dataset_path.join(".pidx");
191 let has_index =
192 pidx_path.exists() && pidx_path.is_file();
193
194 DatasetInfo {
195 name: self.dataset_name.clone(),
196 path: self.dataset_path.clone(),
197 file_count: self.created_files.len(),
198 total_packets: self.total_packet_count,
199 total_size: self.get_total_size(),
200 start_timestamp: None, end_timestamp: None, created_time: Utc::now().to_rfc3339(),
203 modified_time: Utc::now().to_rfc3339(),
204 has_index,
205 }
206 }
207
208 pub fn get_file_info_list(&self) -> Vec<FileInfo> {
210 let mut file_infos = Vec::new();
211
212 use chrono::Utc;
213 let current_time = Utc::now().to_rfc3339();
214
215 let index_info = self.index_manager.get_index();
217
218 for file_path in &self.created_files {
219 if let Ok(metadata) = fs::metadata(file_path) {
220 let file_name = file_path
221 .file_name()
222 .and_then(|name| name.to_str())
223 .unwrap_or("")
224 .to_string();
225
226 let (
228 packet_count,
229 start_timestamp,
230 end_timestamp,
231 ) = if let Some(index) = index_info {
232 if let Some(file_index) =
233 index.data_files.files.iter().find(
234 |f| f.file_name == file_name,
235 )
236 {
237 (
238 file_index.packet_count,
239 if file_index.start_timestamp
240 > 0
241 {
242 Some(
243 file_index
244 .start_timestamp,
245 )
246 } else {
247 None
248 },
249 if file_index.end_timestamp > 0
250 {
251 Some(
252 file_index
253 .end_timestamp,
254 )
255 } else {
256 None
257 },
258 )
259 } else {
260 (0, None, None)
261 }
262 } else {
263 (0, None, None)
264 };
265
266 let file_info = FileInfo {
267 file_name,
268 file_path: file_path.clone(),
269 file_size: metadata.len(),
270 packet_count,
271 start_timestamp,
272 end_timestamp,
273 created_time: current_time.clone(),
274 modified_time: current_time.clone(),
275 is_valid: true,
276 };
277 file_infos.push(file_info);
278 }
279 }
280
281 file_infos
282 }
283
/// Returns a shared reference to the writer's index manager.
pub fn index(&self) -> &IndexManager {
    &self.index_manager
}
289
/// Returns a mutable reference to the writer's index manager.
pub fn index_mut(&mut self) -> &mut IndexManager {
    &mut self.index_manager
}
295
296 pub fn dataset_path(&self) -> &Path {
298 &self.dataset_path
299 }
300
301 pub fn dataset_name(&self) -> &str {
303 &self.dataset_name
304 }
305
306 pub fn write_packet(
315 &mut self,
316 packet: &DataPacket,
317 ) -> PcapResult<()> {
318 if self.is_finalized {
319 return Err(PcapError::InvalidState(
320 "写入器已完成,无法继续写入".to_string(),
321 ));
322 }
323
324 if !self.is_initialized {
326 self.initialize()?;
327 }
328
329 if self.should_switch_file() {
331 self.switch_to_new_file()?;
332 }
333
334 if let Some(ref mut writer) = self.current_writer {
336 writer.write_packet(packet)?;
337
338 self.current_file_size +=
340 packet.packet_length() as u64 + 16; self.current_file_packet_count += 1;
342 self.total_packet_count += 1;
343
344 debug!(
345 "已写入数据包,当前文件大小: {} 字节",
346 self.current_file_size
347 );
348 } else {
349 return Err(PcapError::InvalidState(
350 "没有可用的写入器".to_string(),
351 ));
352 }
353
354 Ok(())
355 }
356
357 pub fn write_packets(
364 &mut self,
365 packets: &[DataPacket],
366 ) -> PcapResult<()> {
367 for packet in packets {
368 self.write_packet(packet)?;
369 }
370 Ok(())
371 }
372
373 pub fn flush(&mut self) -> PcapResult<()> {
377 if let Some(ref mut writer) = self.current_writer {
378 writer.flush()?;
379 debug!("缓冲区已刷新");
380 }
381 Ok(())
382 }
383
/// Returns the statistics reported by the file-info cache.
pub fn get_cache_stats(&self) -> CacheStats {
    self.file_info_cache.get_cache_stats()
}
388
/// Clears the file-info cache.
///
/// Always returns `Ok(())`: whatever `FileInfoCache::clear` returns is
/// discarded.
pub fn clear_cache(&mut self) -> PcapResult<()> {
    // NOTE(review): the result of `clear()` is intentionally ignored here; if
    // it can report a failure, consider logging or propagating it.
    let _ = self.file_info_cache.clear();
    debug!("缓存已清理");
    Ok(())
}
395
396 fn create_new_file(&mut self) -> PcapResult<()> {
402 let time_str = Utc::now().to_filename_string();
404 let filename = if self
405 .configuration
406 .file_name_format
407 .is_empty()
408 {
409 format!("data_{time_str}.pcap")
411 } else if self.configuration.file_name_format == crate::foundation::types::constants::DEFAULT_FILE_NAME_FORMAT {
412 format!("{time_str}.pcap")
414 } else {
415 format!(
417 "{}.pcap",
418 self.configuration
419 .file_name_format
420 .replace("{}", &time_str)
421 )
422 };
423
424 let file_path = self.dataset_path.join(&filename);
425
426 let mut writer =
428 PcapFileWriter::new(self.configuration.clone());
429 writer
430 .create(&self.dataset_path, &filename)
431 .map_err(PcapError::InvalidFormat)?;
432
433 if let Some(ref mut old_writer) =
435 self.current_writer
436 {
437 old_writer
438 .flush()
439 .map_err(PcapError::InvalidFormat)?;
440 old_writer.close();
441 }
442
443 self.current_writer = Some(writer);
445 self.current_file_size = 0;
446 self.current_file_packet_count = 0;
447 self.created_files.push(file_path.clone());
448
449 info!("已创建新文件: {file_path:?}");
450 Ok(())
451 }
452
453 fn should_switch_file(&self) -> bool {
455 if self.current_file_packet_count
457 >= self.configuration.max_packets_per_file
458 as u64
459 {
460 return true;
461 }
462
463 if self.configuration.max_file_size_bytes > 0
465 && self.current_file_size
466 >= self.configuration.max_file_size_bytes
467 {
468 return true;
469 }
470
471 false
472 }
473
/// Advances the file index and starts a new data file.
///
/// The index is incremented before the file is created, so it stays
/// incremented even when `create_new_file` fails.
fn switch_to_new_file(&mut self) -> PcapResult<()> {
    self.current_file_index += 1;
    self.create_new_file()
}
479
480 fn get_total_size(&self) -> u64 {
482 self.created_files
483 .iter()
484 .map(|path| {
485 fs::metadata(path)
486 .map(|metadata| metadata.len())
487 .unwrap_or(0)
488 })
489 .sum()
490 }
491}
492
493impl Drop for PcapWriter {
494 fn drop(&mut self) {
495 if !self.is_finalized {
496 if let Err(e) = self.finalize() {
497 warn!("完成PcapWriter时出错: {e}");
498 }
499 }
500 }
501}