pcapfile_io/api/
writer.rs

1//! 数据集写入器模块
2//!
3//! 提供高级的数据集写入功能,支持多文件自动切换、索引生成等功能。
4
5use log::{debug, info, warn};
6use std::fs;
7use std::path::{Path, PathBuf};
8
9use crate::business::cache::{CacheStats, FileInfoCache};
10use crate::business::config::WriterConfig;
11use crate::business::index::IndexManager;
12use crate::data::file_writer::PcapFileWriter;
13use crate::data::models::{
14    DataPacket, DatasetInfo, FileInfo,
15};
16use crate::foundation::error::{PcapError, PcapResult};
17use crate::foundation::utils::DateTimeExtensions;
18use chrono::Utc;
19
20/// PCAP数据集写入器
21///
22/// 提供对PCAP数据集的高性能写入功能,支持:
23/// - 自动文件管理和切换
24/// - 智能索引生成和更新
25/// - 高性能写入优化
26/// - 数据完整性保证
27pub struct PcapWriter {
28    /// 数据集目录路径
29    dataset_path: PathBuf,
30    /// 数据集名称
31    dataset_name: String,
32    /// 索引管理器
33    index_manager: IndexManager,
34    /// 配置信息
35    configuration: WriterConfig,
36    /// 当前文件写入器
37    current_writer: Option<PcapFileWriter>,
38    /// 当前文件索引
39    current_file_index: usize,
40    /// 当前文件大小
41    current_file_size: u64,
42    /// 已创建的文件列表
43    created_files: Vec<PathBuf>,
44    /// 文件信息缓存
45    file_info_cache: FileInfoCache,
46    /// 总数据包计数
47    total_packet_count: u64,
48    /// 当前文件数据包计数
49    current_file_packet_count: u64,
50    /// 是否已初始化
51    is_initialized: bool,
52    /// 是否已完成
53    is_finalized: bool,
54}
55
56impl PcapWriter {
57    /// 创建新的PCAP写入器
58    ///
59    /// # 参数
60    /// - `base_path` - 基础路径
61    /// - `dataset_name` - 数据集名称
62    ///
63    /// # 返回
64    /// 返回初始化后的写入器实例
65    pub fn new<P: AsRef<Path>>(
66        base_path: P,
67        dataset_name: &str,
68    ) -> PcapResult<Self> {
69        Self::new_with_config(
70            base_path,
71            dataset_name,
72            WriterConfig::default(),
73        )
74    }
75
76    /// 创建新的PCAP写入器(带配置)
77    ///
78    /// # 参数
79    /// - `base_path` - 基础路径
80    /// - `dataset_name` - 数据集名称
81    /// - `configuration` - 写入器配置信息
82    ///
83    /// # 返回
84    /// 返回初始化后的写入器实例
85    pub fn new_with_config<P: AsRef<Path>>(
86        base_path: P,
87        dataset_name: &str,
88        configuration: WriterConfig,
89    ) -> PcapResult<Self> {
90        // 验证配置有效性
91        configuration.validate().map_err(|e| {
92            PcapError::InvalidArgument(format!(
93                "写入器配置无效: {e}"
94            ))
95        })?;
96
97        let dataset_path =
98            base_path.as_ref().join(dataset_name);
99
100        // 确保数据集目录存在
101        if !dataset_path.exists() {
102            fs::create_dir_all(&dataset_path)
103                .map_err(PcapError::Io)?;
104            info!("已创建数据集目录: {dataset_path:?}");
105        }
106
107        if !dataset_path.is_dir() {
108            return Err(PcapError::InvalidArgument(
109                format!(
110                    "指定路径不是目录: {dataset_path:?}"
111                ),
112            ));
113        }
114
115        // 创建索引管理器(新签名:base_path + dataset_name)
116        let index_manager =
117            IndexManager::new(base_path, dataset_name)?;
118
119        // 获取缓存大小(在移动 configuration 之前)
120        let cache_size = configuration.index_cache_size;
121
122        info!("PcapWriter已创建 - 数据集: {dataset_name}");
123
124        Ok(Self {
125            dataset_path,
126            dataset_name: dataset_name.to_string(),
127            index_manager,
128            configuration,
129            current_writer: None,
130            current_file_index: 0,
131            current_file_size: 0,
132            created_files: Vec::new(),
133            file_info_cache: FileInfoCache::new(cache_size),
134            total_packet_count: 0,
135            current_file_packet_count: 0,
136            is_initialized: false,
137            is_finalized: false,
138        })
139    }
140
141    /// 初始化写入器
142    pub fn initialize(&mut self) -> PcapResult<()> {
143        if self.is_initialized {
144            return Ok(());
145        }
146
147        info!("初始化PcapWriter...");
148
149        // 创建第一个文件
150        self.create_new_file()?;
151
152        self.is_initialized = true;
153        info!("PcapWriter初始化完成");
154        Ok(())
155    }
156
157    /// 完成写入并生成索引
158    pub fn finalize(&mut self) -> PcapResult<()> {
159        if self.is_finalized {
160            return Ok(());
161        }
162
163        info!("正在完成PcapWriter...");
164
165        // 刷新并关闭当前文件
166        if let Some(ref mut writer) = self.current_writer {
167            writer.flush()?;
168            writer.close();
169        }
170        self.current_writer = None;
171
172        // 生成索引
173        self.index_manager.rebuild_index()?;
174
175        self.is_finalized = true;
176        info!(
177            "PcapWriter已完成 - 总文件数: {}, 总数据包数: {}",
178            self.created_files.len(),
179            self.total_packet_count
180        );
181
182        Ok(())
183    }
184
185    /// 获取数据集信息
186    pub fn get_dataset_info(&self) -> DatasetInfo {
187        use chrono::Utc;
188
189        // 检查索引文件是否存在
190        let pidx_path = self.dataset_path.join(".pidx");
191        let has_index =
192            pidx_path.exists() && pidx_path.is_file();
193
194        DatasetInfo {
195            name: self.dataset_name.clone(),
196            path: self.dataset_path.clone(),
197            file_count: self.created_files.len(),
198            total_packets: self.total_packet_count,
199            total_size: self.get_total_size(),
200            start_timestamp: None, // 需要从实际数据中计算
201            end_timestamp: None,   // 需要从实际数据中计算
202            created_time: Utc::now().to_rfc3339(),
203            modified_time: Utc::now().to_rfc3339(),
204            has_index,
205        }
206    }
207
208    /// 获取文件信息列表
209    pub fn get_file_info_list(&self) -> Vec<FileInfo> {
210        let mut file_infos = Vec::new();
211
212        use chrono::Utc;
213        let current_time = Utc::now().to_rfc3339();
214
215        // 尝试从索引获取详细信息
216        let index_info = self.index_manager.get_index();
217
218        for file_path in &self.created_files {
219            if let Ok(metadata) = fs::metadata(file_path) {
220                let file_name = file_path
221                    .file_name()
222                    .and_then(|name| name.to_str())
223                    .unwrap_or("")
224                    .to_string();
225
226                // 从索引中查找对应的文件信息
227                let (
228                    packet_count,
229                    start_timestamp,
230                    end_timestamp,
231                ) = if let Some(index) = index_info {
232                    if let Some(file_index) =
233                        index.data_files.files.iter().find(
234                            |f| f.file_name == file_name,
235                        )
236                    {
237                        (
238                            file_index.packet_count,
239                            if file_index.start_timestamp
240                                > 0
241                            {
242                                Some(
243                                    file_index
244                                        .start_timestamp,
245                                )
246                            } else {
247                                None
248                            },
249                            if file_index.end_timestamp > 0
250                            {
251                                Some(
252                                    file_index
253                                        .end_timestamp,
254                                )
255                            } else {
256                                None
257                            },
258                        )
259                    } else {
260                        (0, None, None)
261                    }
262                } else {
263                    (0, None, None)
264                };
265
266                let file_info = FileInfo {
267                    file_name,
268                    file_path: file_path.clone(),
269                    file_size: metadata.len(),
270                    packet_count,
271                    start_timestamp,
272                    end_timestamp,
273                    created_time: current_time.clone(),
274                    modified_time: current_time.clone(),
275                    is_valid: true,
276                };
277                file_infos.push(file_info);
278            }
279        }
280
281        file_infos
282    }
283
284    /// 获取索引管理器的引用
285    /// 允许外部通过 writer.index().method() 的方式访问索引功能
286    pub fn index(&self) -> &IndexManager {
287        &self.index_manager
288    }
289
290    /// 获取索引管理器的可变引用
291    /// 允许外部通过 writer.index_mut().method() 的方式访问索引功能
292    pub fn index_mut(&mut self) -> &mut IndexManager {
293        &mut self.index_manager
294    }
295
296    /// 获取数据集路径
297    pub fn dataset_path(&self) -> &Path {
298        &self.dataset_path
299    }
300
301    /// 获取数据集名称
302    pub fn dataset_name(&self) -> &str {
303        &self.dataset_name
304    }
305
306    /// 写入单个数据包
307    ///
308    /// # 参数
309    /// - `packet` - 要写入的数据包
310    ///
311    /// # 返回
312    /// - `Ok(())` - 成功写入数据包
313    /// - `Err(error)` - 写入过程中发生错误
314    pub fn write_packet(
315        &mut self,
316        packet: &DataPacket,
317    ) -> PcapResult<()> {
318        if self.is_finalized {
319            return Err(PcapError::InvalidState(
320                "写入器已完成,无法继续写入".to_string(),
321            ));
322        }
323
324        // 确保初始化
325        if !self.is_initialized {
326            self.initialize()?;
327        }
328
329        // 检查是否需要切换文件
330        if self.should_switch_file() {
331            self.switch_to_new_file()?;
332        }
333
334        // 写入数据包
335        if let Some(ref mut writer) = self.current_writer {
336            writer.write_packet(packet)?;
337
338            // 更新统计信息
339            self.current_file_size +=
340                packet.packet_length() as u64 + 16; // 16字节包头
341            self.current_file_packet_count += 1;
342            self.total_packet_count += 1;
343
344            debug!(
345                "已写入数据包,当前文件大小: {} 字节",
346                self.current_file_size
347            );
348        } else {
349            return Err(PcapError::InvalidState(
350                "没有可用的写入器".to_string(),
351            ));
352        }
353
354        Ok(())
355    }
356
357    /// 批量写入多个数据包
358    ///
359    /// # 参数
360    /// - `packets` - 要写入的数据包列表
361    ///
362    /// # 返回
363    pub fn write_packets(
364        &mut self,
365        packets: &[DataPacket],
366    ) -> PcapResult<()> {
367        for packet in packets {
368            self.write_packet(packet)?;
369        }
370        Ok(())
371    }
372
373    /// 刷新当前文件
374    ///
375    /// 将当前文件的缓冲区数据写入磁盘,确保数据完整性。
376    pub fn flush(&mut self) -> PcapResult<()> {
377        if let Some(ref mut writer) = self.current_writer {
378            writer.flush()?;
379            debug!("缓冲区已刷新");
380        }
381        Ok(())
382    }
383
384    /// 获取缓存统计信息
385    pub fn get_cache_stats(&self) -> CacheStats {
386        self.file_info_cache.get_cache_stats()
387    }
388
389    /// 清理缓存
390    pub fn clear_cache(&mut self) -> PcapResult<()> {
391        let _ = self.file_info_cache.clear();
392        debug!("缓存已清理");
393        Ok(())
394    }
395
396    // =================================================================
397    // 私有方法
398    // =================================================================
399
400    /// 创建新的PCAP文件
401    fn create_new_file(&mut self) -> PcapResult<()> {
402        // 使用配置的文件命名格式生成文件名
403        let time_str = Utc::now().to_filename_string();
404        let filename = if self
405            .configuration
406            .file_name_format
407            .is_empty()
408        {
409            // 默认格式:data_yyMMdd_HHmmss_nnnnnnnnn.pcap
410            format!("data_{time_str}.pcap")
411        } else if self.configuration.file_name_format == crate::foundation::types::constants::DEFAULT_FILE_NAME_FORMAT {
412            // 如果使用默认格式,直接使用时间字符串
413            format!("{time_str}.pcap")
414        } else {
415            // 使用用户配置的格式
416            format!(
417                "{}.pcap",
418                self.configuration
419                    .file_name_format
420                    .replace("{}", &time_str)
421            )
422        };
423
424        let file_path = self.dataset_path.join(&filename);
425
426        // 创建新的写入器
427        let mut writer =
428            PcapFileWriter::new(self.configuration.clone());
429        writer
430            .create(&self.dataset_path, &filename)
431            .map_err(PcapError::InvalidFormat)?;
432
433        // 关闭之前的写入器
434        if let Some(ref mut old_writer) =
435            self.current_writer
436        {
437            old_writer
438                .flush()
439                .map_err(PcapError::InvalidFormat)?;
440            old_writer.close();
441        }
442
443        // 更新状态
444        self.current_writer = Some(writer);
445        self.current_file_size = 0;
446        self.current_file_packet_count = 0;
447        self.created_files.push(file_path.clone());
448
449        info!("已创建新文件: {file_path:?}");
450        Ok(())
451    }
452
453    /// 检查是否需要切换文件
454    fn should_switch_file(&self) -> bool {
455        // 检查数据包数量限制
456        if self.current_file_packet_count
457            >= self.configuration.max_packets_per_file
458                as u64
459        {
460            return true;
461        }
462
463        // 检查文件大小限制
464        if self.configuration.max_file_size_bytes > 0
465            && self.current_file_size
466                >= self.configuration.max_file_size_bytes
467        {
468            return true;
469        }
470
471        false
472    }
473
474    /// 切换到新文件
475    fn switch_to_new_file(&mut self) -> PcapResult<()> {
476        self.current_file_index += 1;
477        self.create_new_file()
478    }
479
480    /// 获取总大小
481    fn get_total_size(&self) -> u64 {
482        self.created_files
483            .iter()
484            .map(|path| {
485                fs::metadata(path)
486                    .map(|metadata| metadata.len())
487                    .unwrap_or(0)
488            })
489            .sum()
490    }
491}
492
493impl Drop for PcapWriter {
494    fn drop(&mut self) {
495        if !self.is_finalized {
496            if let Err(e) = self.finalize() {
497                warn!("完成PcapWriter时出错: {e}");
498            }
499        }
500    }
501}