pcapfile_io/api/
writer.rs1use log::{debug, info, warn};
6use std::fs;
7use std::path::{Path, PathBuf};
8
9use crate::business::cache::{CacheStats, FileInfoCache};
10use crate::business::config::WriterConfig;
11use crate::business::index::IndexManager;
12use crate::data::file_writer::PcapFileWriter;
13use crate::data::models::{
14 DataPacket, DatasetInfo, FileInfo,
15};
16use crate::foundation::error::{PcapError, PcapResult};
17use crate::foundation::utils::DateTimeExtensions;
18use chrono::Utc;
19
20pub struct PcapWriter {
28 dataset_path: PathBuf,
30 dataset_name: String,
32 index_manager: IndexManager,
34 configuration: WriterConfig,
36 current_writer: Option<PcapFileWriter>,
38 current_file_index: usize,
40 current_file_size: u64,
42 created_files: Vec<PathBuf>,
44 file_info_cache: FileInfoCache,
46 total_packet_count: u64,
48 current_file_packet_count: u64,
50 is_initialized: bool,
52 is_finalized: bool,
54}
55
56impl PcapWriter {
57 pub fn new<P: AsRef<Path>>(
66 base_path: P,
67 dataset_name: &str,
68 ) -> PcapResult<Self> {
69 Self::new_with_config(
70 base_path,
71 dataset_name,
72 WriterConfig::default(),
73 )
74 }
75
76 pub fn new_with_config<P: AsRef<Path>>(
86 base_path: P,
87 dataset_name: &str,
88 configuration: WriterConfig,
89 ) -> PcapResult<Self> {
90 let dataset_path =
91 base_path.as_ref().join(dataset_name);
92
93 if !dataset_path.exists() {
95 fs::create_dir_all(&dataset_path)
96 .map_err(PcapError::Io)?;
97 info!("已创建数据集目录: {dataset_path:?}");
98 }
99
100 if !dataset_path.is_dir() {
101 return Err(PcapError::InvalidArgument(
102 format!(
103 "指定路径不是目录: {dataset_path:?}"
104 ),
105 ));
106 }
107
108 let index_manager =
110 IndexManager::new(base_path, dataset_name)?;
111
112 let cache_size = configuration.index_cache_size;
114
115 info!("PcapWriter已创建 - 数据集: {dataset_name}");
116
117 Ok(Self {
118 dataset_path,
119 dataset_name: dataset_name.to_string(),
120 index_manager,
121 configuration,
122 current_writer: None,
123 current_file_index: 0,
124 current_file_size: 0,
125 created_files: Vec::new(),
126 file_info_cache: FileInfoCache::new(cache_size),
127 total_packet_count: 0,
128 current_file_packet_count: 0,
129 is_initialized: false,
130 is_finalized: false,
131 })
132 }
133
134 pub fn initialize(&mut self) -> PcapResult<()> {
136 if self.is_initialized {
137 return Ok(());
138 }
139
140 info!("初始化PcapWriter...");
141
142 self.create_new_file()?;
144
145 self.is_initialized = true;
146 info!("PcapWriter初始化完成");
147 Ok(())
148 }
149
150 pub fn finalize(&mut self) -> PcapResult<()> {
152 if self.is_finalized {
153 return Ok(());
154 }
155
156 info!("正在完成PcapWriter...");
157
158 if let Some(ref mut writer) = self.current_writer {
160 writer.flush()?;
161 writer.close();
162 }
163 self.current_writer = None;
164
165 self.index_manager.regenerate_index()?;
167
168 self.is_finalized = true;
169 info!(
170 "PcapWriter已完成 - 总文件数: {}, 总数据包数: {}",
171 self.created_files.len(),
172 self.total_packet_count
173 );
174
175 Ok(())
176 }
177
178 pub fn get_dataset_info(&self) -> DatasetInfo {
180 use chrono::Utc;
181
182 DatasetInfo {
183 name: self.dataset_name.clone(),
184 path: self.dataset_path.clone(),
185 file_count: self.created_files.len(),
186 total_packets: self.total_packet_count,
187 total_size: self.get_total_size(),
188 start_timestamp: None, end_timestamp: None, created_time: Utc::now().to_rfc3339(),
191 modified_time: Utc::now().to_rfc3339(),
192 has_index: true,
193 }
194 }
195
196 pub fn get_file_info_list(&self) -> Vec<FileInfo> {
198 let mut file_infos = Vec::new();
199
200 use chrono::Utc;
201 let current_time = Utc::now().to_rfc3339();
202
203 for file_path in &self.created_files {
204 if let Ok(metadata) = fs::metadata(file_path) {
205 let file_info = FileInfo {
206 file_name: file_path
207 .file_name()
208 .and_then(|name| name.to_str())
209 .unwrap_or("")
210 .to_string(),
211 file_path: file_path.clone(),
212 file_size: metadata.len(),
213 packet_count: 0, start_timestamp: None,
215 end_timestamp: None,
216 file_hash: None,
217 created_time: current_time.clone(),
218 modified_time: current_time.clone(),
219 is_valid: true,
220 };
221 file_infos.push(file_info);
222 }
223 }
224
225 file_infos
226 }
227
228 pub fn index(&self) -> &IndexManager {
231 &self.index_manager
232 }
233
234 pub fn index_mut(&mut self) -> &mut IndexManager {
237 &mut self.index_manager
238 }
239
240 pub fn dataset_path(&self) -> &Path {
242 &self.dataset_path
243 }
244
245 pub fn dataset_name(&self) -> &str {
247 &self.dataset_name
248 }
249
250 pub fn write_packet(
259 &mut self,
260 packet: &DataPacket,
261 ) -> PcapResult<()> {
262 if self.is_finalized {
263 return Err(PcapError::InvalidState(
264 "写入器已完成,无法继续写入".to_string(),
265 ));
266 }
267
268 if !self.is_initialized {
270 self.initialize()?;
271 }
272
273 if self.should_switch_file() {
275 self.switch_to_new_file()?;
276 }
277
278 if let Some(ref mut writer) = self.current_writer {
280 writer.write_packet(packet)?;
281
282 self.current_file_size +=
284 packet.packet_length() as u64 + 16; self.current_file_packet_count += 1;
286 self.total_packet_count += 1;
287
288 debug!(
289 "已写入数据包,当前文件大小: {} 字节",
290 self.current_file_size
291 );
292 } else {
293 return Err(PcapError::InvalidState(
294 "没有可用的写入器".to_string(),
295 ));
296 }
297
298 Ok(())
299 }
300
301 pub fn write_packets(
308 &mut self,
309 packets: &[DataPacket],
310 ) -> PcapResult<()> {
311 for packet in packets {
312 self.write_packet(packet)?;
313 }
314 Ok(())
315 }
316
317 pub fn flush(&mut self) -> PcapResult<()> {
321 if let Some(ref mut writer) = self.current_writer {
322 writer.flush()?;
323 debug!("缓冲区已刷新");
324 }
325 Ok(())
326 }
327
328 pub fn get_cache_stats(&self) -> CacheStats {
330 self.file_info_cache.get_cache_stats()
331 }
332
333 pub fn clear_cache(&mut self) -> PcapResult<()> {
335 let _ = self.file_info_cache.clear();
336 debug!("缓存已清理");
337 Ok(())
338 }
339
340 fn create_new_file(&mut self) -> PcapResult<()> {
346 let time_str = Utc::now().to_filename_string();
348 let filename = format!("data_{time_str}.pcap");
349
350 let file_path = self.dataset_path.join(&filename);
351
352 let mut writer =
354 PcapFileWriter::new(self.configuration.clone());
355 writer
356 .create(&self.dataset_path, &filename)
357 .map_err(PcapError::InvalidFormat)?;
358
359 if let Some(ref mut old_writer) =
361 self.current_writer
362 {
363 old_writer
364 .flush()
365 .map_err(PcapError::InvalidFormat)?;
366 old_writer.close();
367 }
368
369 self.current_writer = Some(writer);
371 self.current_file_size = 0;
372 self.current_file_packet_count = 0;
373 self.created_files.push(file_path.clone());
374
375 info!("已创建新文件: {file_path:?}");
376 Ok(())
377 }
378
379 fn should_switch_file(&self) -> bool {
381 if self.current_file_packet_count
383 >= self.configuration.max_packets_per_file
384 as u64
385 {
386 return true;
387 }
388
389 false
390 }
391
392 fn switch_to_new_file(&mut self) -> PcapResult<()> {
394 self.current_file_index += 1;
395 self.create_new_file()
396 }
397
398 fn get_total_size(&self) -> u64 {
400 self.created_files
401 .iter()
402 .map(|path| {
403 fs::metadata(path)
404 .map(|metadata| metadata.len())
405 .unwrap_or(0)
406 })
407 .sum()
408 }
409}
410
411impl Drop for PcapWriter {
412 fn drop(&mut self) {
413 if !self.is_finalized {
414 if let Err(e) = self.finalize() {
415 warn!("完成PcapWriter时出错: {e}");
416 }
417 }
418 }
419}