dm_database_sqllog2db/exporter/
mod.rs1use crate::config::Config;
7use crate::error::{ConfigError, Error, Result};
8use dm_database_parser_sqllog::Sqllog;
9use log::info;
10
11#[cfg(feature = "csv")]
12pub mod csv;
13#[cfg(feature = "dm")]
14pub mod dm;
15#[cfg(feature = "duckdb")]
16pub mod duckdb;
17#[cfg(feature = "jsonl")]
18pub mod jsonl;
19#[cfg(feature = "parquet")]
20pub mod parquet;
21#[cfg(feature = "postgres")]
22pub mod postgres;
23#[cfg(feature = "sqlite")]
24pub mod sqlite;
25mod util;
26
27#[cfg(feature = "csv")]
28pub use csv::CsvExporter;
29#[cfg(feature = "dm")]
30pub use dm::DmExporter;
31#[cfg(feature = "duckdb")]
32pub use duckdb::DuckdbExporter;
33#[cfg(feature = "jsonl")]
34pub use jsonl::JsonlExporter;
35#[cfg(feature = "parquet")]
36pub use parquet::ParquetExporter;
37#[cfg(feature = "postgres")]
38pub use postgres::PostgresExporter;
39#[cfg(feature = "sqlite")]
40pub use sqlite::SqliteExporter;
41
42pub trait Exporter {
45 fn initialize(&mut self) -> Result<()>;
47
48 fn export(&mut self, sqllog: &Sqllog<'_>) -> Result<()>;
50
51 fn export_batch(&mut self, sqllogs: &[&Sqllog<'_>]) -> Result<()> {
53 for sqllog in sqllogs {
54 self.export(sqllog)?;
55 }
56 Ok(())
57 }
58
59 fn finalize(&mut self) -> Result<()>;
61
62 fn name(&self) -> &str;
64
65 fn stats_snapshot(&self) -> Option<ExportStats> {
68 None
69 }
70}
71
/// Counters describing the outcome of an export run.
#[derive(Debug, Default, Clone)]
pub struct ExportStats {
    /// Number of records exported successfully.
    pub exported: usize,
    /// Number of records that were skipped.
    pub skipped: usize,
    /// Number of records that failed to export.
    pub failed: usize,
    /// Number of flush operations performed.
    pub flush_operations: usize,
    /// Number of entries in the most recent flush.
    pub last_flush_size: usize,
}

impl ExportStats {
    /// Creates a statistics record with every counter set to zero.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Counts one more successfully exported record.
    ///
    /// The counter saturates at `usize::MAX` instead of overflowing.
    pub fn record_success(&mut self) {
        self.exported = self.exported.saturating_add(1);
    }

    /// Returns the number of records seen: exported + skipped + failed.
    ///
    /// The sum saturates at `usize::MAX`, so reporting statistics can never
    /// panic (debug builds) or wrap around to a small number (release builds).
    #[must_use]
    pub fn total(&self) -> usize {
        self.exported
            .saturating_add(self.skipped)
            .saturating_add(self.failed)
    }
}
102
103pub struct ExporterManager {
105 exporter: Box<dyn Exporter>,
106}
107
108impl std::fmt::Debug for ExporterManager {
109 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
110 f.debug_struct("ExporterManager")
111 .field("exporter_name", &self.exporter.name())
112 .finish()
113 }
114}
115
116impl ExporterManager {
117 pub fn from_config(config: &Config) -> Result<Self> {
119 info!("Initializing exporter manager...");
120
121 #[cfg(feature = "csv")]
125 if let Some(csv_config) = config.exporter.csv() {
126 let csv_exporter = CsvExporter::from_config(csv_config);
127 info!("Using CSV exporter: {}", csv_config.file);
128 return Ok(Self {
129 exporter: Box::new(csv_exporter),
130 });
131 }
132
133 #[cfg(feature = "parquet")]
135 if let Some(parquet_config) = config.exporter.parquet() {
136 let parquet_exporter = ParquetExporter::from_config(parquet_config);
137 info!("Using Parquet exporter: {}", parquet_config.file);
138 return Ok(Self {
139 exporter: Box::new(parquet_exporter),
140 });
141 }
142
143 #[cfg(feature = "jsonl")]
145 if let Some(jsonl_config) = config.exporter.jsonl() {
146 let jsonl_exporter = JsonlExporter::from_config(jsonl_config);
147 info!("Using JSONL exporter: {}", jsonl_config.file);
148 return Ok(Self {
149 exporter: Box::new(jsonl_exporter),
150 });
151 }
152
153 #[cfg(feature = "sqlite")]
155 if let Some(sqlite_config) = config.exporter.sqlite() {
156 let sqlite_exporter = SqliteExporter::from_config(sqlite_config);
157 info!("Using SQLite exporter: {}", sqlite_config.database_url);
158 return Ok(Self {
159 exporter: Box::new(sqlite_exporter),
160 });
161 }
162
163 #[cfg(feature = "duckdb")]
165 if let Some(duckdb_config) = config.exporter.duckdb() {
166 let duckdb_exporter = DuckdbExporter::from_config(duckdb_config);
167 info!("Using DuckDB exporter: {}", duckdb_config.database_url);
168 return Ok(Self {
169 exporter: Box::new(duckdb_exporter),
170 });
171 }
172
173 #[cfg(feature = "postgres")]
175 if let Some(postgres_config) = config.exporter.postgres() {
176 let postgres_exporter = PostgresExporter::from_config(postgres_config);
177 info!("Using PostgreSQL exporter");
178 return Ok(Self {
179 exporter: Box::new(postgres_exporter),
180 });
181 }
182
183 #[cfg(feature = "dm")]
185 if let Some(dm_config) = config.exporter.dm() {
186 let dm_exporter = DmExporter::from_config(dm_config);
187 info!("Using DM exporter: {}", dm_config.userid);
188 return Ok(Self {
189 exporter: Box::new(dm_exporter),
190 });
191 }
192
193 Err(Error::Config(ConfigError::NoExporters))
194 }
195 pub fn initialize(&mut self) -> Result<()> {
197 info!("Initializing exporters...");
198 self.exporter.initialize()?;
199 info!("Exporters initialized");
200 Ok(())
201 }
202
203 pub fn export_batch(&mut self, sqllogs: &[Sqllog<'_>]) -> Result<()> {
205 if sqllogs.is_empty() {
206 return Ok(());
207 }
208
209 let refs: Vec<&Sqllog<'_>> = sqllogs.iter().collect();
211 self.exporter.export_batch(&refs)
212 }
213
214 pub fn finalize(&mut self) -> Result<()> {
216 info!("Finalizing exporters...");
217 self.exporter.finalize()?;
218 info!("Exporters finished");
219 Ok(())
220 }
221
222 #[must_use]
224 pub fn name(&self) -> &str {
225 self.exporter.name()
226 }
227
228 #[must_use]
230 pub fn stats(&self) -> Option<ExportStats> {
231 self.exporter.stats_snapshot()
232 }
233
234 pub fn log_stats(&self) {
236 if let Some(s) = self.stats() {
237 info!(
238 "Export stats: {} => success: {}, failed: {}, skipped: {} (total: {}){}",
239 self.name(),
240 s.exported,
241 s.failed,
242 s.skipped,
243 s.total(),
244 if s.flush_operations > 0 {
245 format!(
246 " | flushed:{} times (recent {} entries)",
247 s.flush_operations, s.last_flush_size
248 )
249 } else {
250 String::new()
251 }
252 );
253 } else {
254 info!("No export statistics available");
255 }
256 }
257}