pcapfile_io/api/
writer.rs

1//! 数据集写入器模块
2//!
3//! 提供高级的数据集写入功能,支持多文件自动切换、索引生成等功能。
4
5use log::{debug, info, warn};
6use std::fs;
7use std::path::{Path, PathBuf};
8
9use crate::business::cache::{CacheStats, FileInfoCache};
10use crate::business::config::WriterConfig;
11use crate::business::index::IndexManager;
12use crate::data::file_writer::PcapFileWriter;
13use crate::data::models::{
14    DataPacket, DatasetInfo, FileInfo,
15};
16use crate::foundation::error::{PcapError, PcapResult};
17use crate::foundation::utils::DateTimeExtensions;
18use chrono::Utc;
19
20/// PCAP数据集写入器
21///
22/// 提供对PCAP数据集的高性能写入功能,支持:
23/// - 自动文件管理和切换
24/// - 智能索引生成和更新
25/// - 高性能写入优化
26/// - 数据完整性保证
27pub struct PcapWriter {
28    /// 数据集目录路径
29    dataset_path: PathBuf,
30    /// 数据集名称
31    dataset_name: String,
32    /// 索引管理器
33    index_manager: IndexManager,
34    /// 配置信息
35    configuration: WriterConfig,
36    /// 当前文件写入器
37    current_writer: Option<PcapFileWriter>,
38    /// 当前文件索引
39    current_file_index: usize,
40    /// 当前文件大小
41    current_file_size: u64,
42    /// 已创建的文件列表
43    created_files: Vec<PathBuf>,
44    /// 文件信息缓存
45    file_info_cache: FileInfoCache,
46    /// 总数据包计数
47    total_packet_count: u64,
48    /// 当前文件数据包计数
49    current_file_packet_count: u64,
50    /// 是否已初始化
51    is_initialized: bool,
52    /// 是否已完成
53    is_finalized: bool,
54}
55
56impl PcapWriter {
57    /// 创建新的PCAP写入器
58    ///
59    /// # 参数
60    /// - `base_path` - 基础路径
61    /// - `dataset_name` - 数据集名称
62    ///
63    /// # 返回
64    /// 返回初始化后的写入器实例
65    pub fn new<P: AsRef<Path>>(
66        base_path: P,
67        dataset_name: &str,
68    ) -> PcapResult<Self> {
69        Self::new_with_config(
70            base_path,
71            dataset_name,
72            WriterConfig::default(),
73        )
74    }
75
76    /// 创建新的PCAP写入器(带配置)
77    ///
78    /// # 参数
79    /// - `base_path` - 基础路径
80    /// - `dataset_name` - 数据集名称
81    /// - `configuration` - 写入器配置信息
82    ///
83    /// # 返回
84    /// 返回初始化后的写入器实例
85    pub fn new_with_config<P: AsRef<Path>>(
86        base_path: P,
87        dataset_name: &str,
88        configuration: WriterConfig,
89    ) -> PcapResult<Self> {
90        // 验证配置有效性
91        configuration.validate().map_err(|e| {
92            PcapError::InvalidArgument(format!(
93                "写入器配置无效: {e}"
94            ))
95        })?;
96
97        let dataset_path =
98            base_path.as_ref().join(dataset_name);
99
100        // 确保数据集目录存在
101        if !dataset_path.exists() {
102            fs::create_dir_all(&dataset_path)
103                .map_err(PcapError::Io)?;
104            info!("已创建数据集目录: {dataset_path:?}");
105        }
106
107        if !dataset_path.is_dir() {
108            return Err(PcapError::InvalidArgument(
109                format!(
110                    "指定路径不是目录: {dataset_path:?}"
111                ),
112            ));
113        }
114
115        // 创建索引管理器(新签名:base_path + dataset_name)
116        let index_manager =
117            IndexManager::new(base_path, dataset_name)?;
118
119        // 获取缓存大小(在移动 configuration 之前)
120        let cache_size = configuration.index_cache_size;
121
122        info!("PcapWriter已创建 - 数据集: {dataset_name}");
123
124        Ok(Self {
125            dataset_path,
126            dataset_name: dataset_name.to_string(),
127            index_manager,
128            configuration,
129            current_writer: None,
130            current_file_index: 0,
131            current_file_size: 0,
132            created_files: Vec::new(),
133            file_info_cache: FileInfoCache::new(cache_size),
134            total_packet_count: 0,
135            current_file_packet_count: 0,
136            is_initialized: false,
137            is_finalized: false,
138        })
139    }
140
141    /// 初始化写入器
142    pub fn initialize(&mut self) -> PcapResult<()> {
143        if self.is_initialized {
144            return Ok(());
145        }
146
147        info!("初始化PcapWriter...");
148
149        // 创建第一个文件
150        self.create_new_file()?;
151
152        self.is_initialized = true;
153        info!("PcapWriter初始化完成");
154        Ok(())
155    }
156
157    /// 完成写入并生成索引
158    pub fn finalize(&mut self) -> PcapResult<()> {
159        if self.is_finalized {
160            return Ok(());
161        }
162
163        info!("正在完成PcapWriter...");
164
165        // 刷新并关闭当前文件
166        if let Some(ref mut writer) = self.current_writer {
167            writer.flush()?;
168            writer.close();
169        }
170        self.current_writer = None;
171
172        // 生成索引
173        self.index_manager.rebuild_index()?;
174
175        self.is_finalized = true;
176        info!(
177            "PcapWriter已完成 - 总文件数: {}, 总数据包数: {}",
178            self.created_files.len(),
179            self.total_packet_count
180        );
181
182        Ok(())
183    }
184
185    /// 获取数据集信息
186    pub fn get_dataset_info(&self) -> DatasetInfo {
187        use chrono::Utc;
188
189        // 检查索引文件是否存在
190        let pidx_path = self.dataset_path.join(".pidx");
191        let has_index =
192            pidx_path.exists() && pidx_path.is_file();
193
194        DatasetInfo {
195            name: self.dataset_name.clone(),
196            path: self.dataset_path.clone(),
197            file_count: self.created_files.len(),
198            total_packets: self.total_packet_count,
199            total_size: self.get_total_size(),
200            start_timestamp: None, // 需要从实际数据中计算
201            end_timestamp: None,   // 需要从实际数据中计算
202            created_time: Utc::now().to_rfc3339(),
203            modified_time: Utc::now().to_rfc3339(),
204            has_index,
205        }
206    }
207
208    /// 获取文件信息列表
209    pub fn get_file_info_list(&self) -> Vec<FileInfo> {
210        let mut file_infos = Vec::new();
211
212        use chrono::Utc;
213        let current_time = Utc::now().to_rfc3339();
214
215        // 尝试从索引获取详细信息
216        let index_info = self.index_manager.get_index();
217
218        for file_path in &self.created_files {
219            if let Ok(metadata) = fs::metadata(file_path) {
220                let file_name = file_path
221                    .file_name()
222                    .and_then(|name| name.to_str())
223                    .unwrap_or("")
224                    .to_string();
225
226                // 从索引中查找对应的文件信息
227                let (
228                    packet_count,
229                    start_timestamp,
230                    end_timestamp,
231                    file_hash,
232                ) = if let Some(index) = index_info {
233                    if let Some(file_index) =
234                        index.data_files.files.iter().find(
235                            |f| f.file_name == file_name,
236                        )
237                    {
238                        (
239                            file_index.packet_count,
240                            if file_index.start_timestamp
241                                > 0
242                            {
243                                Some(
244                                    file_index
245                                        .start_timestamp,
246                                )
247                            } else {
248                                None
249                            },
250                            if file_index.end_timestamp > 0
251                            {
252                                Some(
253                                    file_index
254                                        .end_timestamp,
255                                )
256                            } else {
257                                None
258                            },
259                            Some(
260                                file_index
261                                    .file_hash
262                                    .clone(),
263                            ),
264                        )
265                    } else {
266                        (0, None, None, None)
267                    }
268                } else {
269                    (0, None, None, None)
270                };
271
272                let file_info = FileInfo {
273                    file_name,
274                    file_path: file_path.clone(),
275                    file_size: metadata.len(),
276                    packet_count,
277                    start_timestamp,
278                    end_timestamp,
279                    file_hash,
280                    created_time: current_time.clone(),
281                    modified_time: current_time.clone(),
282                    is_valid: true,
283                };
284                file_infos.push(file_info);
285            }
286        }
287
288        file_infos
289    }
290
291    /// 获取索引管理器的引用
292    /// 允许外部通过 writer.index().method() 的方式访问索引功能
293    pub fn index(&self) -> &IndexManager {
294        &self.index_manager
295    }
296
297    /// 获取索引管理器的可变引用
298    /// 允许外部通过 writer.index_mut().method() 的方式访问索引功能
299    pub fn index_mut(&mut self) -> &mut IndexManager {
300        &mut self.index_manager
301    }
302
303    /// 获取数据集路径
304    pub fn dataset_path(&self) -> &Path {
305        &self.dataset_path
306    }
307
308    /// 获取数据集名称
309    pub fn dataset_name(&self) -> &str {
310        &self.dataset_name
311    }
312
313    /// 写入单个数据包
314    ///
315    /// # 参数
316    /// - `packet` - 要写入的数据包
317    ///
318    /// # 返回
319    /// - `Ok(())` - 成功写入数据包
320    /// - `Err(error)` - 写入过程中发生错误
321    pub fn write_packet(
322        &mut self,
323        packet: &DataPacket,
324    ) -> PcapResult<()> {
325        if self.is_finalized {
326            return Err(PcapError::InvalidState(
327                "写入器已完成,无法继续写入".to_string(),
328            ));
329        }
330
331        // 确保初始化
332        if !self.is_initialized {
333            self.initialize()?;
334        }
335
336        // 检查是否需要切换文件
337        if self.should_switch_file() {
338            self.switch_to_new_file()?;
339        }
340
341        // 写入数据包
342        if let Some(ref mut writer) = self.current_writer {
343            writer.write_packet(packet)?;
344
345            // 更新统计信息
346            self.current_file_size +=
347                packet.packet_length() as u64 + 16; // 16字节包头
348            self.current_file_packet_count += 1;
349            self.total_packet_count += 1;
350
351            debug!(
352                "已写入数据包,当前文件大小: {} 字节",
353                self.current_file_size
354            );
355        } else {
356            return Err(PcapError::InvalidState(
357                "没有可用的写入器".to_string(),
358            ));
359        }
360
361        Ok(())
362    }
363
364    /// 批量写入多个数据包
365    ///
366    /// # 参数
367    /// - `packets` - 要写入的数据包列表
368    ///
369    /// # 返回
370    pub fn write_packets(
371        &mut self,
372        packets: &[DataPacket],
373    ) -> PcapResult<()> {
374        for packet in packets {
375            self.write_packet(packet)?;
376        }
377        Ok(())
378    }
379
380    /// 刷新当前文件
381    ///
382    /// 将当前文件的缓冲区数据写入磁盘,确保数据完整性。
383    pub fn flush(&mut self) -> PcapResult<()> {
384        if let Some(ref mut writer) = self.current_writer {
385            writer.flush()?;
386            debug!("缓冲区已刷新");
387        }
388        Ok(())
389    }
390
391    /// 获取缓存统计信息
392    pub fn get_cache_stats(&self) -> CacheStats {
393        self.file_info_cache.get_cache_stats()
394    }
395
396    /// 清理缓存
397    pub fn clear_cache(&mut self) -> PcapResult<()> {
398        let _ = self.file_info_cache.clear();
399        debug!("缓存已清理");
400        Ok(())
401    }
402
403    // =================================================================
404    // 私有方法
405    // =================================================================
406
407    /// 创建新的PCAP文件
408    fn create_new_file(&mut self) -> PcapResult<()> {
409        // 使用配置的文件命名格式生成文件名
410        let time_str = Utc::now().to_filename_string();
411        let filename = if self
412            .configuration
413            .file_name_format
414            .is_empty()
415        {
416            // 默认格式:data_yyMMdd_HHmmss_nnnnnnnnn.pcap
417            format!("data_{time_str}.pcap")
418        } else if self.configuration.file_name_format == crate::foundation::types::constants::DEFAULT_FILE_NAME_FORMAT {
419            // 如果使用默认格式,直接使用时间字符串
420            format!("{time_str}.pcap")
421        } else {
422            // 使用用户配置的格式
423            format!(
424                "{}.pcap",
425                self.configuration
426                    .file_name_format
427                    .replace("{}", &time_str)
428            )
429        };
430
431        let file_path = self.dataset_path.join(&filename);
432
433        // 创建新的写入器
434        let mut writer =
435            PcapFileWriter::new(self.configuration.clone());
436        writer
437            .create(&self.dataset_path, &filename)
438            .map_err(PcapError::InvalidFormat)?;
439
440        // 关闭之前的写入器
441        if let Some(ref mut old_writer) =
442            self.current_writer
443        {
444            old_writer
445                .flush()
446                .map_err(PcapError::InvalidFormat)?;
447            old_writer.close();
448        }
449
450        // 更新状态
451        self.current_writer = Some(writer);
452        self.current_file_size = 0;
453        self.current_file_packet_count = 0;
454        self.created_files.push(file_path.clone());
455
456        info!("已创建新文件: {file_path:?}");
457        Ok(())
458    }
459
460    /// 检查是否需要切换文件
461    fn should_switch_file(&self) -> bool {
462        // 检查数据包数量限制
463        if self.current_file_packet_count
464            >= self.configuration.max_packets_per_file
465                as u64
466        {
467            return true;
468        }
469
470        // 检查文件大小限制
471        if self.configuration.max_file_size_bytes > 0
472            && self.current_file_size
473                >= self.configuration.max_file_size_bytes
474        {
475            return true;
476        }
477
478        false
479    }
480
481    /// 切换到新文件
482    fn switch_to_new_file(&mut self) -> PcapResult<()> {
483        self.current_file_index += 1;
484        self.create_new_file()
485    }
486
487    /// 获取总大小
488    fn get_total_size(&self) -> u64 {
489        self.created_files
490            .iter()
491            .map(|path| {
492                fs::metadata(path)
493                    .map(|metadata| metadata.len())
494                    .unwrap_or(0)
495            })
496            .sum()
497    }
498}
499
500impl Drop for PcapWriter {
501    fn drop(&mut self) {
502        if !self.is_finalized {
503            if let Err(e) = self.finalize() {
504                warn!("完成PcapWriter时出错: {e}");
505            }
506        }
507    }
508}