Skip to main content

dm_database_sqllog2db/exporter/
mod.rs

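/// Exporter module - exports parsed SQL log records to a variety of targets.
///
/// Supported export targets (each one is gated behind a Cargo feature):
/// - CSV files (`csv`)
/// - Parquet files (`parquet`)
/// - JSONL files (`jsonl`)
/// - `SQLite` databases (`sqlite`)
/// - `DuckDB` databases (`duckdb`)
/// - `PostgreSQL` databases (`postgres`)
/// - DM databases (`dm`)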
6use crate::config::Config;
7use crate::error::{ConfigError, Error, Result};
8use dm_database_parser_sqllog::Sqllog;
9use log::info;
10
11#[cfg(feature = "csv")]
12pub mod csv;
13#[cfg(feature = "dm")]
14pub mod dm;
15#[cfg(feature = "duckdb")]
16pub mod duckdb;
17#[cfg(feature = "jsonl")]
18pub mod jsonl;
19#[cfg(feature = "parquet")]
20pub mod parquet;
21#[cfg(feature = "postgres")]
22pub mod postgres;
23#[cfg(feature = "sqlite")]
24pub mod sqlite;
25mod util;
26
27#[cfg(feature = "csv")]
28pub use csv::CsvExporter;
29#[cfg(feature = "dm")]
30pub use dm::DmExporter;
31#[cfg(feature = "duckdb")]
32pub use duckdb::DuckdbExporter;
33#[cfg(feature = "jsonl")]
34pub use jsonl::JsonlExporter;
35#[cfg(feature = "parquet")]
36pub use parquet::ParquetExporter;
37#[cfg(feature = "postgres")]
38pub use postgres::PostgresExporter;
39#[cfg(feature = "sqlite")]
40pub use sqlite::SqliteExporter;
41
42/// Exporter 基础 trait - 所有导出器必须实现此接口
43/// 导出器 trait
44pub trait Exporter {
45    /// 初始化导出器 (例如:创建文件、连接数据库、创建表等)
46    fn initialize(&mut self) -> Result<()>;
47
48    /// 导出单条 SQL 日志记录
49    fn export(&mut self, sqllog: &Sqllog<'_>) -> Result<()>;
50
51    /// 批量导出多条日志记录 (默认实现:逐条调用 export)
52    fn export_batch(&mut self, sqllogs: &[&Sqllog<'_>]) -> Result<()> {
53        for sqllog in sqllogs {
54            self.export(sqllog)?;
55        }
56        Ok(())
57    }
58
59    /// 完成导出 (例如:刷新缓冲区、提交事务、关闭文件等)
60    fn finalize(&mut self) -> Result<()>;
61
62    /// 获取导出器名称 (用于日志记录)
63    fn name(&self) -> &str;
64
65    /// 获取导出统计信息的快照
66    /// 默认返回 None;具体导出器可覆盖此方法以提供统计信息
67    fn stats_snapshot(&self) -> Option<ExportStats> {
68        None
69    }
70}
71
/// Counters describing the outcome of an export run.
#[derive(Debug, Default, Clone)]
pub struct ExportStats {
    /// Number of records exported successfully.
    pub exported: usize,
    /// Number of records that were skipped.
    pub skipped: usize,
    /// Number of records that failed to export.
    pub failed: usize,
    /// Number of flush / batch-write operations (database-style exporters).
    pub flush_operations: usize,
    /// Number of records written by the most recent flush.
    pub last_flush_size: usize,
}

impl ExportStats {
    /// Creates a statistics value with every counter set to zero.
    #[must_use]
    pub fn new() -> Self {
        Self {
            exported: 0,
            skipped: 0,
            failed: 0,
            flush_operations: 0,
            last_flush_size: 0,
        }
    }

    /// Counts one more successfully exported record.
    pub fn record_success(&mut self) {
        self.exported += 1;
    }

    /// Returns the number of records seen so far: exported, skipped and
    /// failed added together.
    #[must_use]
    pub fn total(&self) -> usize {
        let Self {
            exported,
            skipped,
            failed,
            ..
        } = *self;
        exported + skipped + failed
    }
}
102
103/// 导出器管理器 - 管理单个导出器
104pub struct ExporterManager {
105    exporter: Box<dyn Exporter>,
106}
107
108impl std::fmt::Debug for ExporterManager {
109    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
110        f.debug_struct("ExporterManager")
111            .field("exporter_name", &self.exporter.name())
112            .finish()
113    }
114}
115
116impl ExporterManager {
117    /// 从配置创建导出器管理器
118    pub fn from_config(config: &Config) -> Result<Self> {
119        info!("Initializing exporter manager...");
120
121        // 优先级:CSV > Parquet > JSONL > SQLite > DM
122
123        // 1. 尝试创建 CSV 导出器
124        #[cfg(feature = "csv")]
125        if let Some(csv_config) = config.exporter.csv() {
126            let csv_exporter = CsvExporter::from_config(csv_config);
127            info!("Using CSV exporter: {}", csv_config.file);
128            return Ok(Self {
129                exporter: Box::new(csv_exporter),
130            });
131        }
132
133        // 2. 尝试创建 Parquet 导出器
134        #[cfg(feature = "parquet")]
135        if let Some(parquet_config) = config.exporter.parquet() {
136            let parquet_exporter = ParquetExporter::from_config(parquet_config);
137            info!("Using Parquet exporter: {}", parquet_config.file);
138            return Ok(Self {
139                exporter: Box::new(parquet_exporter),
140            });
141        }
142
143        // 3. 尝试创建 JSONL 导出器
144        #[cfg(feature = "jsonl")]
145        if let Some(jsonl_config) = config.exporter.jsonl() {
146            let jsonl_exporter = JsonlExporter::from_config(jsonl_config);
147            info!("Using JSONL exporter: {}", jsonl_config.file);
148            return Ok(Self {
149                exporter: Box::new(jsonl_exporter),
150            });
151        }
152
153        // 4. 尝试创建 SQLite 导出器
154        #[cfg(feature = "sqlite")]
155        if let Some(sqlite_config) = config.exporter.sqlite() {
156            let sqlite_exporter = SqliteExporter::from_config(sqlite_config);
157            info!("Using SQLite exporter: {}", sqlite_config.database_url);
158            return Ok(Self {
159                exporter: Box::new(sqlite_exporter),
160            });
161        }
162
163        // 5. 尝试创建 DuckDB 导出器
164        #[cfg(feature = "duckdb")]
165        if let Some(duckdb_config) = config.exporter.duckdb() {
166            let duckdb_exporter = DuckdbExporter::from_config(duckdb_config);
167            info!("Using DuckDB exporter: {}", duckdb_config.database_url);
168            return Ok(Self {
169                exporter: Box::new(duckdb_exporter),
170            });
171        }
172
173        // 6. 尝试创建 PostgreSQL 导出器
174        #[cfg(feature = "postgres")]
175        if let Some(postgres_config) = config.exporter.postgres() {
176            let postgres_exporter = PostgresExporter::from_config(postgres_config);
177            info!("Using PostgreSQL exporter");
178            return Ok(Self {
179                exporter: Box::new(postgres_exporter),
180            });
181        }
182
183        // 7. 尝试创建 DM 导出器
184        #[cfg(feature = "dm")]
185        if let Some(dm_config) = config.exporter.dm() {
186            let dm_exporter = DmExporter::from_config(dm_config);
187            info!("Using DM exporter: {}", dm_config.userid);
188            return Ok(Self {
189                exporter: Box::new(dm_exporter),
190            });
191        }
192
193        Err(Error::Config(ConfigError::NoExporters))
194    }
195    /// 初始化导出器
196    pub fn initialize(&mut self) -> Result<()> {
197        info!("Initializing exporters...");
198        self.exporter.initialize()?;
199        info!("Exporters initialized");
200        Ok(())
201    }
202
203    /// 批量导出日志记录
204    pub fn export_batch(&mut self, sqllogs: &[Sqllog<'_>]) -> Result<()> {
205        if sqllogs.is_empty() {
206            return Ok(());
207        }
208
209        // 转换为引用的切片
210        let refs: Vec<&Sqllog<'_>> = sqllogs.iter().collect();
211        self.exporter.export_batch(&refs)
212    }
213
214    /// 完成导出器
215    pub fn finalize(&mut self) -> Result<()> {
216        info!("Finalizing exporters...");
217        self.exporter.finalize()?;
218        info!("Exporters finished");
219        Ok(())
220    }
221
222    /// 获取导出器名称
223    #[must_use]
224    pub fn name(&self) -> &str {
225        self.exporter.name()
226    }
227
228    /// 获取导出统计信息
229    #[must_use]
230    pub fn stats(&self) -> Option<ExportStats> {
231        self.exporter.stats_snapshot()
232    }
233
234    /// 记录导出器的统计信息到日志
235    pub fn log_stats(&self) {
236        if let Some(s) = self.stats() {
237            info!(
238                "Export stats: {} => success: {}, failed: {}, skipped: {} (total: {}){}",
239                self.name(),
240                s.exported,
241                s.failed,
242                s.skipped,
243                s.total(),
244                if s.flush_operations > 0 {
245                    format!(
246                        " | flushed:{} times (recent {} entries)",
247                        s.flush_operations, s.last_flush_size
248                    )
249                } else {
250                    String::new()
251                }
252            );
253        } else {
254            info!("No export statistics available");
255        }
256    }
257}