Skip to main content

dm_database_sqllog2db/exporter/
mod.rs

1/// Exporter 模块 - 负责将解析后的 SQL 日志导出到各种目标
2///
3/// 支持的导出目标:
4/// - CSV 文件
5/// - `SQLite` 数据库
6use crate::config::Config;
7use crate::error::{ConfigError, Error, Result};
8use dm_database_parser_sqllog::Sqllog;
9use log::info;
10
11#[cfg(feature = "csv")]
12pub mod csv;
13#[cfg(feature = "jsonl")]
14pub mod jsonl;
15#[cfg(feature = "sqlite")]
16pub mod sqlite;
17mod util;
18
19#[cfg(feature = "csv")]
20pub use csv::CsvExporter;
21#[cfg(feature = "jsonl")]
22pub use jsonl::JsonlExporter;
23#[cfg(feature = "sqlite")]
24pub use sqlite::SqliteExporter;
25
26/// Exporter 基础 trait - 所有导出器必须实现此接口
27/// 导出器 trait
28pub trait Exporter {
29    /// 初始化导出器 (例如:创建文件、连接数据库、创建表等)
30    fn initialize(&mut self) -> Result<()>;
31
32    /// 导出单条 SQL 日志记录
33    fn export(&mut self, sqllog: &Sqllog<'_>) -> Result<()>;
34
35    /// 批量导出多条日志记录 (默认实现:逐条调用 export)
36    fn export_batch(&mut self, sqllogs: &[&Sqllog<'_>]) -> Result<()> {
37        for sqllog in sqllogs {
38            self.export(sqllog)?;
39        }
40        Ok(())
41    }
42
43    /// 完成导出 (例如:刷新缓冲区、提交事务、关闭文件等)
44    fn finalize(&mut self) -> Result<()>;
45
46    /// 获取导出器名称 (用于日志记录)
47    fn name(&self) -> &str;
48
49    /// 获取导出统计信息的快照
50    /// 默认返回 None;具体导出器可覆盖此方法以提供统计信息
51    fn stats_snapshot(&self) -> Option<ExportStats> {
52        None
53    }
54}
55
56/// 导出统计信息
57#[derive(Debug, Default, Clone)]
58pub struct ExportStats {
59    /// 成功导出的记录数
60    pub exported: usize,
61    /// 跳过的记录数
62    pub skipped: usize,
63    /// 失败的记录数
64    pub failed: usize,
65    /// 刷新/批量写入操作次数(数据库类导出器)
66    pub flush_operations: usize,
67    /// 最近一次刷新写入的记录数
68    pub last_flush_size: usize,
69}
70
71impl ExportStats {
72    #[must_use]
73    pub fn new() -> Self {
74        Self::default()
75    }
76
77    pub fn record_success(&mut self) {
78        self.exported += 1;
79    }
80
81    #[must_use]
82    pub fn total(&self) -> usize {
83        self.exported + self.skipped + self.failed
84    }
85}
86
87/// 导出器管理器 - 管理单个导出器
88pub struct ExporterManager {
89    exporter: Box<dyn Exporter>,
90}
91
92impl std::fmt::Debug for ExporterManager {
93    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
94        f.debug_struct("ExporterManager")
95            .field("exporter_name", &self.exporter.name())
96            .finish()
97    }
98}
99
100impl ExporterManager {
101    /// 从配置创建导出器管理器
102    pub fn from_config(config: &Config) -> Result<Self> {
103        info!("Initializing exporter manager...");
104
105        // 优先级:CSV > JSONL > SQLite
106
107        // 1. 尝试创建 CSV 导出器
108        #[cfg(feature = "csv")]
109        if let Some(csv_config) = config.exporter.csv() {
110            let csv_exporter = CsvExporter::from_config(csv_config);
111            info!("Using CSV exporter: {}", csv_config.file);
112            return Ok(Self {
113                exporter: Box::new(csv_exporter),
114            });
115        }
116
117        // 2. 尝试创建 JSONL 导出器
118        #[cfg(feature = "jsonl")]
119        if let Some(jsonl_config) = config.exporter.jsonl() {
120            let jsonl_exporter = JsonlExporter::from_config(jsonl_config);
121            info!("Using JSONL exporter: {}", jsonl_config.file);
122            return Ok(Self {
123                exporter: Box::new(jsonl_exporter),
124            });
125        }
126
127        // 3. 尝试创建 SQLite 导出器
128        #[cfg(feature = "sqlite")]
129        if let Some(sqlite_config) = config.exporter.sqlite() {
130            let sqlite_exporter = SqliteExporter::from_config(sqlite_config);
131            info!("Using SQLite exporter: {}", sqlite_config.database_url);
132            return Ok(Self {
133                exporter: Box::new(sqlite_exporter),
134            });
135        }
136
137        Err(Error::Config(ConfigError::NoExporters))
138    }
139    /// 初始化导出器
140    pub fn initialize(&mut self) -> Result<()> {
141        info!("Initializing exporters...");
142        self.exporter.initialize()?;
143        info!("Exporters initialized");
144        Ok(())
145    }
146
147    /// 批量导出日志记录
148    pub fn export_batch(&mut self, sqllogs: &[Sqllog<'_>]) -> Result<()> {
149        if sqllogs.is_empty() {
150            return Ok(());
151        }
152
153        // 转换为引用的切片
154        let refs: Vec<&Sqllog<'_>> = sqllogs.iter().collect();
155        self.exporter.export_batch(&refs)
156    }
157
158    /// 完成导出器
159    pub fn finalize(&mut self) -> Result<()> {
160        info!("Finalizing exporters...");
161        self.exporter.finalize()?;
162        info!("Exporters finished");
163        Ok(())
164    }
165
166    /// 获取导出器名称
167    #[must_use]
168    pub fn name(&self) -> &str {
169        self.exporter.name()
170    }
171
172    /// 获取导出统计信息
173    #[must_use]
174    pub fn stats(&self) -> Option<ExportStats> {
175        self.exporter.stats_snapshot()
176    }
177
178    /// 记录导出器的统计信息到日志
179    pub fn log_stats(&self) {
180        if let Some(s) = self.stats() {
181            info!(
182                "Export stats: {} => success: {}, failed: {}, skipped: {} (total: {}){}",
183                self.name(),
184                s.exported,
185                s.failed,
186                s.skipped,
187                s.total(),
188                if s.flush_operations > 0 {
189                    format!(
190                        " | flushed:{} times (recent {} entries)",
191                        s.flush_operations, s.last_flush_size
192                    )
193                } else {
194                    String::new()
195                }
196            );
197        } else {
198            info!("No export statistics available");
199        }
200    }
201}