// dm_database_sqllog2db/exporter/mod.rs

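//! Exporter module - exports parsed SQL log records to various targets.
//!
//! Supported export targets (each compiled in only when the Cargo feature of
//! the same name is enabled):
//! - CSV files (`csv`)
//! - Parquet (`parquet`)
//! - JSONL (`jsonl`)
//! - SQLite databases (`sqlite`)
//! - DuckDB (`duckdb`)
//! - PostgreSQL (`postgres`)
//! - DM (`dm`)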
6use crate::config::Config;
7use crate::error::{ConfigError, Error, Result};
8use dm_database_parser_sqllog::Sqllog;
9use log::info;
10
11#[cfg(feature = "csv")]
12pub mod csv;
13#[cfg(feature = "dm")]
14pub mod dm;
15#[cfg(feature = "duckdb")]
16pub mod duckdb;
17#[cfg(feature = "jsonl")]
18pub mod jsonl;
19#[cfg(feature = "parquet")]
20pub mod parquet;
21#[cfg(feature = "postgres")]
22pub mod postgres;
23#[cfg(feature = "sqlite")]
24pub mod sqlite;
25mod util;
26
27#[cfg(feature = "csv")]
28pub use csv::CsvExporter;
29#[cfg(feature = "dm")]
30pub use dm::DmExporter;
31#[cfg(feature = "duckdb")]
32pub use duckdb::DuckdbExporter;
33#[cfg(feature = "jsonl")]
34pub use jsonl::JsonlExporter;
35#[cfg(feature = "parquet")]
36pub use parquet::ParquetExporter;
37#[cfg(feature = "postgres")]
38pub use postgres::PostgresExporter;
39#[cfg(feature = "sqlite")]
40pub use sqlite::SqliteExporter;
41
42/// Exporter 基础 trait - 所有导出器必须实现此接口
43/// 导出器 trait
44pub trait Exporter {
45    /// 初始化导出器 (例如:创建文件、连接数据库、创建表等)
46    fn initialize(&mut self) -> Result<()>;
47
48    /// 导出单条 SQL 日志记录
49    fn export(&mut self, sqllog: &Sqllog<'_>) -> Result<()>;
50
51    /// 批量导出多条日志记录 (默认实现:逐条调用 export)
52    fn export_batch(&mut self, sqllogs: &[&Sqllog<'_>]) -> Result<()> {
53        for sqllog in sqllogs {
54            self.export(sqllog)?;
55        }
56        Ok(())
57    }
58
59    /// 完成导出 (例如:刷新缓冲区、提交事务、关闭文件等)
60    fn finalize(&mut self) -> Result<()>;
61
62    /// 获取导出器名称 (用于日志记录)
63    fn name(&self) -> &str;
64
65    /// 获取导出统计信息的快照
66    /// 默认返回 None;具体导出器可覆盖此方法以提供统计信息
67    fn stats_snapshot(&self) -> Option<ExportStats> {
68        None
69    }
70}
71
/// Export statistics.
#[derive(Debug, Default, Clone)]
pub struct ExportStats {
    /// Number of records exported successfully.
    pub exported: usize,
    /// Number of records skipped.
    pub skipped: usize,
    /// Number of records that failed to export.
    pub failed: usize,
    /// Number of flush / batch-write operations (database-style exporters).
    pub flush_operations: usize,
    /// Number of records written by the most recent flush.
    pub last_flush_size: usize,
}

impl ExportStats {
    /// Creates a statistics value with every counter set to zero.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Counts one successfully exported record.
    ///
    /// The counter saturates at `usize::MAX` instead of overflowing, so a
    /// statistics update can never panic (debug builds) or wrap around to a
    /// small number (release builds).
    pub fn record_success(&mut self) {
        self.exported = self.exported.saturating_add(1);
    }

    /// Returns the number of records seen: exported + skipped + failed.
    ///
    /// The flush counters are not part of the total. The sum saturates at
    /// `usize::MAX` rather than overflowing.
    #[must_use]
    pub fn total(&self) -> usize {
        self.exported
            .saturating_add(self.skipped)
            .saturating_add(self.failed)
    }
}
100
101/// 导出器管理器 - 管理单个导出器
102pub struct ExporterManager {
103    exporter: Box<dyn Exporter>,
104}
105
106impl ExporterManager {
107    /// 从配置创建导出器管理器
108    pub fn from_config(config: &Config) -> Result<Self> {
109        info!("Initializing exporter manager...");
110
111        // 优先级:CSV > Parquet > JSONL > SQLite > DM
112
113        // 1. 尝试创建 CSV 导出器
114        #[cfg(feature = "csv")]
115        if let Some(csv_config) = config.exporter.csv() {
116            let csv_exporter = CsvExporter::from_config(csv_config);
117            info!("Using CSV exporter: {}", csv_config.file);
118            return Ok(Self {
119                exporter: Box::new(csv_exporter),
120            });
121        }
122
123        // 2. 尝试创建 Parquet 导出器
124        #[cfg(feature = "parquet")]
125        if let Some(parquet_config) = config.exporter.parquet() {
126            let parquet_exporter = ParquetExporter::from_config(parquet_config);
127            info!("Using Parquet exporter: {}", parquet_config.file);
128            return Ok(Self {
129                exporter: Box::new(parquet_exporter),
130            });
131        }
132
133        // 3. 尝试创建 JSONL 导出器
134        #[cfg(feature = "jsonl")]
135        if let Some(jsonl_config) = config.exporter.jsonl() {
136            let jsonl_exporter = JsonlExporter::from_config(jsonl_config);
137            info!("Using JSONL exporter: {}", jsonl_config.file);
138            return Ok(Self {
139                exporter: Box::new(jsonl_exporter),
140            });
141        }
142
143        // 4. 尝试创建 SQLite 导出器
144        #[cfg(feature = "sqlite")]
145        if let Some(sqlite_config) = config.exporter.sqlite() {
146            let sqlite_exporter = SqliteExporter::from_config(sqlite_config);
147            info!("Using SQLite exporter: {}", sqlite_config.database_url);
148            return Ok(Self {
149                exporter: Box::new(sqlite_exporter),
150            });
151        }
152
153        // 5. 尝试创建 DuckDB 导出器
154        #[cfg(feature = "duckdb")]
155        if let Some(duckdb_config) = config.exporter.duckdb() {
156            let duckdb_exporter = DuckdbExporter::from_config(duckdb_config);
157            info!("Using DuckDB exporter: {}", duckdb_config.database_url);
158            return Ok(Self {
159                exporter: Box::new(duckdb_exporter),
160            });
161        }
162
163        // 6. 尝试创建 PostgreSQL 导出器
164        #[cfg(feature = "postgres")]
165        if let Some(postgres_config) = config.exporter.postgres() {
166            let postgres_exporter = PostgresExporter::from_config(postgres_config);
167            info!("Using PostgreSQL exporter");
168            return Ok(Self {
169                exporter: Box::new(postgres_exporter),
170            });
171        }
172
173        // 7. 尝试创建 DM 导出器
174        #[cfg(feature = "dm")]
175        if let Some(dm_config) = config.exporter.dm() {
176            let dm_exporter = DmExporter::from_config(dm_config);
177            info!("Using DM exporter: {}", dm_config.userid);
178            return Ok(Self {
179                exporter: Box::new(dm_exporter),
180            });
181        }
182
183        Err(Error::Config(ConfigError::NoExporters))
184    }
185    /// 初始化导出器
186    pub fn initialize(&mut self) -> Result<()> {
187        info!("Initializing exporters...");
188        self.exporter.initialize()?;
189        info!("Exporters initialized");
190        Ok(())
191    }
192
193    /// 批量导出日志记录
194    pub fn export_batch(&mut self, sqllogs: &[Sqllog<'_>]) -> Result<()> {
195        if sqllogs.is_empty() {
196            return Ok(());
197        }
198
199        // 转换为引用的切片
200        let refs: Vec<&Sqllog<'_>> = sqllogs.iter().collect();
201        self.exporter.export_batch(&refs)
202    }
203
204    /// 完成导出器
205    pub fn finalize(&mut self) -> Result<()> {
206        info!("Finalizing exporters...");
207        self.exporter.finalize()?;
208        info!("Exporters finished");
209        Ok(())
210    }
211
212    /// 获取导出器名称
213    pub fn name(&self) -> &str {
214        self.exporter.name()
215    }
216
217    /// 获取导出统计信息
218    pub fn stats(&self) -> Option<ExportStats> {
219        self.exporter.stats_snapshot()
220    }
221
222    /// 记录导出器的统计信息到日志
223    pub fn log_stats(&self) {
224        if let Some(s) = self.stats() {
225            info!(
226                "Export stats: {} => success: {}, failed: {}, skipped: {} (total: {}){}",
227                self.name(),
228                s.exported,
229                s.failed,
230                s.skipped,
231                s.total(),
232                if s.flush_operations > 0 {
233                    format!(
234                        " | flushed:{} times (recent {} entries)",
235                        s.flush_operations, s.last_flush_size
236                    )
237                } else {
238                    String::new()
239                }
240            );
241        } else {
242            info!("No export statistics available");
243        }
244    }
245}