pcapfile_io/api/
writer.rs

1//! 数据集写入器模块
2//!
3//! 提供高级的数据集写入功能,支持多文件自动切换、索引生成等功能。
4
5use log::{debug, info, warn};
6use std::fs;
7use std::path::{Path, PathBuf};
8
9use crate::business::cache::{CacheStats, FileInfoCache};
10use crate::business::config::WriterConfig;
11use crate::business::index::IndexManager;
12use crate::data::file_writer::PcapFileWriter;
13use crate::data::models::{
14    DataPacket, DatasetInfo, FileInfo,
15};
16use crate::foundation::error::{PcapError, PcapResult};
17use crate::foundation::utils::DateTimeExtensions;
18use chrono::Utc;
19
20/// PCAP数据集写入器
21///
22/// 提供对PCAP数据集的高性能写入功能,支持:
23/// - 自动文件管理和切换
24/// - 智能索引生成和更新
25/// - 高性能写入优化
26/// - 数据完整性保证
27pub struct PcapWriter {
28    /// 数据集目录路径
29    dataset_path: PathBuf,
30    /// 数据集名称
31    dataset_name: String,
32    /// 索引管理器
33    index_manager: IndexManager,
34    /// 配置信息
35    configuration: WriterConfig,
36    /// 当前文件写入器
37    current_writer: Option<PcapFileWriter>,
38    /// 当前文件索引
39    current_file_index: usize,
40    /// 当前文件大小
41    current_file_size: u64,
42    /// 已创建的文件列表
43    created_files: Vec<PathBuf>,
44    /// 文件信息缓存
45    file_info_cache: FileInfoCache,
46    /// 总数据包计数
47    total_packet_count: u64,
48    /// 当前文件数据包计数
49    current_file_packet_count: u64,
50    /// 是否已初始化
51    is_initialized: bool,
52    /// 是否已完成
53    is_finalized: bool,
54}
55
56impl PcapWriter {
57    /// 创建新的PCAP写入器
58    ///
59    /// # 参数
60    /// - `base_path` - 基础路径
61    /// - `dataset_name` - 数据集名称
62    ///
63    /// # 返回
64    /// 返回初始化后的写入器实例
65    pub fn new<P: AsRef<Path>>(
66        base_path: P,
67        dataset_name: &str,
68    ) -> PcapResult<Self> {
69        Self::new_with_config(
70            base_path,
71            dataset_name,
72            WriterConfig::default(),
73        )
74    }
75
76    /// 创建新的PCAP写入器(带配置)
77    ///
78    /// # 参数
79    /// - `base_path` - 基础路径
80    /// - `dataset_name` - 数据集名称
81    /// - `configuration` - 写入器配置信息
82    ///
83    /// # 返回
84    /// 返回初始化后的写入器实例
85    pub fn new_with_config<P: AsRef<Path>>(
86        base_path: P,
87        dataset_name: &str,
88        configuration: WriterConfig,
89    ) -> PcapResult<Self> {
90        let dataset_path =
91            base_path.as_ref().join(dataset_name);
92
93        // 确保数据集目录存在
94        if !dataset_path.exists() {
95            fs::create_dir_all(&dataset_path)
96                .map_err(PcapError::Io)?;
97            info!("已创建数据集目录: {dataset_path:?}");
98        }
99
100        if !dataset_path.is_dir() {
101            return Err(PcapError::InvalidArgument(
102                format!(
103                    "指定路径不是目录: {dataset_path:?}"
104                ),
105            ));
106        }
107
108        // 创建索引管理器(新签名:base_path + dataset_name)
109        let index_manager =
110            IndexManager::new(base_path, dataset_name)?;
111
112        // 获取缓存大小(在移动 configuration 之前)
113        let cache_size = configuration.index_cache_size;
114
115        info!("PcapWriter已创建 - 数据集: {dataset_name}");
116
117        Ok(Self {
118            dataset_path,
119            dataset_name: dataset_name.to_string(),
120            index_manager,
121            configuration,
122            current_writer: None,
123            current_file_index: 0,
124            current_file_size: 0,
125            created_files: Vec::new(),
126            file_info_cache: FileInfoCache::new(cache_size),
127            total_packet_count: 0,
128            current_file_packet_count: 0,
129            is_initialized: false,
130            is_finalized: false,
131        })
132    }
133
134    /// 初始化写入器
135    pub fn initialize(&mut self) -> PcapResult<()> {
136        if self.is_initialized {
137            return Ok(());
138        }
139
140        info!("初始化PcapWriter...");
141
142        // 创建第一个文件
143        self.create_new_file()?;
144
145        self.is_initialized = true;
146        info!("PcapWriter初始化完成");
147        Ok(())
148    }
149
150    /// 完成写入并生成索引
151    pub fn finalize(&mut self) -> PcapResult<()> {
152        if self.is_finalized {
153            return Ok(());
154        }
155
156        info!("正在完成PcapWriter...");
157
158        // 刷新并关闭当前文件
159        if let Some(ref mut writer) = self.current_writer {
160            writer.flush()?;
161            writer.close();
162        }
163        self.current_writer = None;
164
165        // 生成索引
166        self.index_manager.regenerate_index()?;
167
168        self.is_finalized = true;
169        info!(
170            "PcapWriter已完成 - 总文件数: {}, 总数据包数: {}",
171            self.created_files.len(),
172            self.total_packet_count
173        );
174
175        Ok(())
176    }
177
178    /// 获取数据集信息
179    pub fn get_dataset_info(&self) -> DatasetInfo {
180        use chrono::Utc;
181
182        DatasetInfo {
183            name: self.dataset_name.clone(),
184            path: self.dataset_path.clone(),
185            file_count: self.created_files.len(),
186            total_packets: self.total_packet_count,
187            total_size: self.get_total_size(),
188            start_timestamp: None, // 需要从实际数据中计算
189            end_timestamp: None,   // 需要从实际数据中计算
190            created_time: Utc::now().to_rfc3339(),
191            modified_time: Utc::now().to_rfc3339(),
192            has_index: true,
193        }
194    }
195
196    /// 获取文件信息列表
197    pub fn get_file_info_list(&self) -> Vec<FileInfo> {
198        let mut file_infos = Vec::new();
199
200        use chrono::Utc;
201        let current_time = Utc::now().to_rfc3339();
202
203        for file_path in &self.created_files {
204            if let Ok(metadata) = fs::metadata(file_path) {
205                let file_info = FileInfo {
206                    file_name: file_path
207                        .file_name()
208                        .and_then(|name| name.to_str())
209                        .unwrap_or("")
210                        .to_string(),
211                    file_path: file_path.clone(),
212                    file_size: metadata.len(),
213                    packet_count: 0, // 需要从索引中获取
214                    start_timestamp: None,
215                    end_timestamp: None,
216                    file_hash: None,
217                    created_time: current_time.clone(),
218                    modified_time: current_time.clone(),
219                    is_valid: true,
220                };
221                file_infos.push(file_info);
222            }
223        }
224
225        file_infos
226    }
227
228    /// 获取索引管理器的引用
229    /// 允许外部通过 writer.index().method() 的方式访问索引功能
230    pub fn index(&self) -> &IndexManager {
231        &self.index_manager
232    }
233
234    /// 获取索引管理器的可变引用
235    /// 允许外部通过 writer.index_mut().method() 的方式访问索引功能
236    pub fn index_mut(&mut self) -> &mut IndexManager {
237        &mut self.index_manager
238    }
239
240    /// 获取数据集路径
241    pub fn dataset_path(&self) -> &Path {
242        &self.dataset_path
243    }
244
245    /// 获取数据集名称
246    pub fn dataset_name(&self) -> &str {
247        &self.dataset_name
248    }
249
250    /// 写入单个数据包
251    ///
252    /// # 参数
253    /// - `packet` - 要写入的数据包
254    ///
255    /// # 返回
256    /// - `Ok(())` - 成功写入数据包
257    /// - `Err(error)` - 写入过程中发生错误
258    pub fn write_packet(
259        &mut self,
260        packet: &DataPacket,
261    ) -> PcapResult<()> {
262        if self.is_finalized {
263            return Err(PcapError::InvalidState(
264                "写入器已完成,无法继续写入".to_string(),
265            ));
266        }
267
268        // 确保初始化
269        if !self.is_initialized {
270            self.initialize()?;
271        }
272
273        // 检查是否需要切换文件
274        if self.should_switch_file() {
275            self.switch_to_new_file()?;
276        }
277
278        // 写入数据包
279        if let Some(ref mut writer) = self.current_writer {
280            writer.write_packet(packet)?;
281
282            // 更新统计信息
283            self.current_file_size +=
284                packet.packet_length() as u64 + 16; // 16字节包头
285            self.current_file_packet_count += 1;
286            self.total_packet_count += 1;
287
288            debug!(
289                "已写入数据包,当前文件大小: {} 字节",
290                self.current_file_size
291            );
292        } else {
293            return Err(PcapError::InvalidState(
294                "没有可用的写入器".to_string(),
295            ));
296        }
297
298        Ok(())
299    }
300
301    /// 批量写入多个数据包
302    ///
303    /// # 参数
304    /// - `packets` - 要写入的数据包列表
305    ///
306    /// # 返回
307    pub fn write_packets(
308        &mut self,
309        packets: &[DataPacket],
310    ) -> PcapResult<()> {
311        for packet in packets {
312            self.write_packet(packet)?;
313        }
314        Ok(())
315    }
316
317    /// 刷新当前文件
318    ///
319    /// 将当前文件的缓冲区数据写入磁盘,确保数据完整性。
320    pub fn flush(&mut self) -> PcapResult<()> {
321        if let Some(ref mut writer) = self.current_writer {
322            writer.flush()?;
323            debug!("缓冲区已刷新");
324        }
325        Ok(())
326    }
327
328    /// 获取缓存统计信息
329    pub fn get_cache_stats(&self) -> CacheStats {
330        self.file_info_cache.get_cache_stats()
331    }
332
333    /// 清理缓存
334    pub fn clear_cache(&mut self) -> PcapResult<()> {
335        let _ = self.file_info_cache.clear();
336        debug!("缓存已清理");
337        Ok(())
338    }
339
340    // =================================================================
341    // 私有方法
342    // =================================================================
343
344    /// 创建新的PCAP文件
345    fn create_new_file(&mut self) -> PcapResult<()> {
346        // 生成符合标准的文件名: data_yyMMdd_HHmmss_nnnnnnnnn.pcap
347        let time_str = Utc::now().to_filename_string();
348        let filename = format!("data_{time_str}.pcap");
349
350        let file_path = self.dataset_path.join(&filename);
351
352        // 创建新的写入器
353        let mut writer =
354            PcapFileWriter::new(self.configuration.clone());
355        writer
356            .create(&self.dataset_path, &filename)
357            .map_err(PcapError::InvalidFormat)?;
358
359        // 关闭之前的写入器
360        if let Some(ref mut old_writer) =
361            self.current_writer
362        {
363            old_writer
364                .flush()
365                .map_err(PcapError::InvalidFormat)?;
366            old_writer.close();
367        }
368
369        // 更新状态
370        self.current_writer = Some(writer);
371        self.current_file_size = 0;
372        self.current_file_packet_count = 0;
373        self.created_files.push(file_path.clone());
374
375        info!("已创建新文件: {file_path:?}");
376        Ok(())
377    }
378
379    /// 检查是否需要切换文件
380    fn should_switch_file(&self) -> bool {
381        // 检查数据包数量限制
382        if self.current_file_packet_count
383            >= self.configuration.max_packets_per_file
384                as u64
385        {
386            return true;
387        }
388
389        false
390    }
391
392    /// 切换到新文件
393    fn switch_to_new_file(&mut self) -> PcapResult<()> {
394        self.current_file_index += 1;
395        self.create_new_file()
396    }
397
398    /// 获取总大小
399    fn get_total_size(&self) -> u64 {
400        self.created_files
401            .iter()
402            .map(|path| {
403                fs::metadata(path)
404                    .map(|metadata| metadata.len())
405                    .unwrap_or(0)
406            })
407            .sum()
408    }
409}
410
411impl Drop for PcapWriter {
412    fn drop(&mut self) {
413        if !self.is_finalized {
414            if let Err(e) = self.finalize() {
415                warn!("完成PcapWriter时出错: {e}");
416            }
417        }
418    }
419}