Skip to main content

dm_database_sqllog2db/exporter/
mod.rs

1use crate::config::Config;
2use crate::error::{ConfigError, Error, Result};
3use dm_database_parser_sqllog::Sqllog;
4use log::info;
5
6#[cfg(feature = "csv")]
7pub mod csv;
8#[cfg(feature = "jsonl")]
9pub mod jsonl;
10#[cfg(feature = "sqlite")]
11pub mod sqlite;
12mod util;
13
14#[cfg(feature = "csv")]
15pub use csv::CsvExporter;
16#[cfg(feature = "jsonl")]
17pub use jsonl::JsonlExporter;
18#[cfg(feature = "sqlite")]
19pub use sqlite::SqliteExporter;
20
21/// 所有导出器必须实现的接口
22pub trait Exporter {
23    fn initialize(&mut self) -> Result<()>;
24    fn export(&mut self, sqllog: &Sqllog<'_>) -> Result<()>;
25
26    /// 批量导出(默认逐条调用 export)
27    fn export_batch(&mut self, sqllogs: &[Sqllog<'_>]) -> Result<()> {
28        for sqllog in sqllogs {
29            self.export(sqllog)?;
30        }
31        Ok(())
32    }
33
34    fn finalize(&mut self) -> Result<()>;
35    fn name(&self) -> &str;
36
37    fn stats_snapshot(&self) -> Option<ExportStats> {
38        None
39    }
40}
41
/// Export statistics.
///
/// Every counter starts at zero (see [`ExportStats::new`]). Counter updates
/// performed through the methods below saturate at `usize::MAX` instead of
/// panicking (debug builds) or silently wrapping (release builds).
#[derive(Debug, Default, Clone)]
pub struct ExportStats {
    /// Records exported successfully.
    pub exported: usize,
    /// Records that were skipped.
    pub skipped: usize,
    /// Records that failed to export.
    pub failed: usize,
    /// Number of flush operations performed.
    pub flush_operations: usize,
    /// Size of the most recent flush.
    pub last_flush_size: usize,
}

impl ExportStats {
    /// Creates a statistics record with every counter set to zero.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Counts one successfully exported record.
    pub fn record_success(&mut self) {
        self.record_success_batch(1);
    }

    /// Counts `count` successfully exported records.
    ///
    /// Saturates at `usize::MAX` rather than overflowing.
    pub fn record_success_batch(&mut self, count: usize) {
        self.exported = self.exported.saturating_add(count);
    }

    /// Sum of exported, skipped and failed records.
    ///
    /// Saturates at `usize::MAX` rather than overflowing.
    #[must_use]
    pub fn total(&self) -> usize {
        self.exported
            .saturating_add(self.skipped)
            .saturating_add(self.failed)
    }
}
71
72/// 导出器管理器
73pub struct ExporterManager {
74    exporter: Box<dyn Exporter>,
75}
76
77impl std::fmt::Debug for ExporterManager {
78    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
79        f.debug_struct("ExporterManager")
80            .field("exporter", &self.exporter.name())
81            .finish()
82    }
83}
84
85impl ExporterManager {
86    pub fn from_config(config: &Config) -> Result<Self> {
87        info!("Initializing exporter manager...");
88
89        #[cfg(feature = "csv")]
90        if let Some(cfg) = &config.exporter.csv {
91            info!("Using CSV exporter: {}", cfg.file);
92            return Ok(Self {
93                exporter: Box::new(CsvExporter::from_config(cfg)),
94            });
95        }
96
97        #[cfg(feature = "jsonl")]
98        if let Some(cfg) = &config.exporter.jsonl {
99            info!("Using JSONL exporter: {}", cfg.file);
100            return Ok(Self {
101                exporter: Box::new(JsonlExporter::from_config(cfg)),
102            });
103        }
104
105        #[cfg(feature = "sqlite")]
106        if let Some(cfg) = &config.exporter.sqlite {
107            info!("Using SQLite exporter: {}", cfg.database_url);
108            return Ok(Self {
109                exporter: Box::new(SqliteExporter::from_config(cfg)),
110            });
111        }
112
113        Err(Error::Config(ConfigError::NoExporters))
114    }
115
116    pub fn initialize(&mut self) -> Result<()> {
117        info!("Initializing exporters...");
118        self.exporter.initialize()?;
119        info!("Exporters initialized");
120        Ok(())
121    }
122
123    /// 批量导出,直接传 slice,零额外分配
124    pub fn export_batch(&mut self, sqllogs: &[Sqllog<'_>]) -> Result<()> {
125        self.exporter.export_batch(sqllogs)
126    }
127
128    pub fn finalize(&mut self) -> Result<()> {
129        info!("Finalizing exporters...");
130        self.exporter.finalize()?;
131        info!("Exporters finished");
132        Ok(())
133    }
134
135    #[must_use]
136    pub fn name(&self) -> &str {
137        self.exporter.name()
138    }
139
140    pub fn log_stats(&self) {
141        if let Some(s) = self.exporter.stats_snapshot() {
142            info!(
143                "Export stats: {} => success: {}, failed: {}, skipped: {} (total: {}){}",
144                self.name(),
145                s.exported,
146                s.failed,
147                s.skipped,
148                s.total(),
149                if s.flush_operations > 0 {
150                    format!(
151                        " | flushed: {} times (recent {} entries)",
152                        s.flush_operations, s.last_flush_size
153                    )
154                } else {
155                    String::new()
156                }
157            );
158        }
159    }
160}