dm_database_sqllog2db/exporter/
mod.rs1use crate::config::Config;
7use crate::error::{ConfigError, Error, Result};
8use dm_database_parser_sqllog::Sqllog;
9use log::info;
10
11#[cfg(feature = "csv")]
12pub mod csv;
13#[cfg(feature = "jsonl")]
14pub mod jsonl;
15#[cfg(feature = "sqlite")]
16pub mod sqlite;
17mod util;
18
19#[cfg(feature = "csv")]
20pub use csv::CsvExporter;
21#[cfg(feature = "jsonl")]
22pub use jsonl::JsonlExporter;
23#[cfg(feature = "sqlite")]
24pub use sqlite::SqliteExporter;
25
26pub trait Exporter {
29 fn initialize(&mut self) -> Result<()>;
31
32 fn export(&mut self, sqllog: &Sqllog<'_>) -> Result<()>;
34
35 fn export_batch(&mut self, sqllogs: &[&Sqllog<'_>]) -> Result<()> {
37 for sqllog in sqllogs {
38 self.export(sqllog)?;
39 }
40 Ok(())
41 }
42
43 fn finalize(&mut self) -> Result<()>;
45
46 fn name(&self) -> &str;
48
49 fn stats_snapshot(&self) -> Option<ExportStats> {
52 None
53 }
54}
55
/// Counters describing the outcome of an export run.
#[derive(Debug, Default, Clone)]
pub struct ExportStats {
    /// Records counted as exported; bumped by [`ExportStats::record_success`].
    pub exported: usize,
    /// Records counted as skipped.
    pub skipped: usize,
    /// Records counted as failed.
    pub failed: usize,
    /// Number of flush operations performed.
    pub flush_operations: usize,
    /// Size recorded for the most recent flush.
    pub last_flush_size: usize,
}

impl ExportStats {
    /// Creates a statistics value with every counter at zero.
    #[must_use]
    pub fn new() -> Self {
        Default::default()
    }

    /// Counts one more exported record.
    pub fn record_success(&mut self) {
        self.exported += 1;
    }

    /// Returns the sum of the exported, skipped and failed counters.
    ///
    /// The flush counters are not part of the total.
    #[must_use]
    pub fn total(&self) -> usize {
        let Self {
            exported,
            skipped,
            failed,
            ..
        } = *self;
        exported + skipped + failed
    }
}
86
87pub struct ExporterManager {
89 exporter: Box<dyn Exporter>,
90}
91
92impl std::fmt::Debug for ExporterManager {
93 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
94 f.debug_struct("ExporterManager")
95 .field("exporter_name", &self.exporter.name())
96 .finish()
97 }
98}
99
100impl ExporterManager {
101 pub fn from_config(config: &Config) -> Result<Self> {
103 info!("Initializing exporter manager...");
104
105 #[cfg(feature = "csv")]
109 if let Some(csv_config) = config.exporter.csv() {
110 let csv_exporter = CsvExporter::from_config(csv_config);
111 info!("Using CSV exporter: {}", csv_config.file);
112 return Ok(Self {
113 exporter: Box::new(csv_exporter),
114 });
115 }
116
117 #[cfg(feature = "jsonl")]
119 if let Some(jsonl_config) = config.exporter.jsonl() {
120 let jsonl_exporter = JsonlExporter::from_config(jsonl_config);
121 info!("Using JSONL exporter: {}", jsonl_config.file);
122 return Ok(Self {
123 exporter: Box::new(jsonl_exporter),
124 });
125 }
126
127 #[cfg(feature = "sqlite")]
129 if let Some(sqlite_config) = config.exporter.sqlite() {
130 let sqlite_exporter = SqliteExporter::from_config(sqlite_config);
131 info!("Using SQLite exporter: {}", sqlite_config.database_url);
132 return Ok(Self {
133 exporter: Box::new(sqlite_exporter),
134 });
135 }
136
137 Err(Error::Config(ConfigError::NoExporters))
138 }
139 pub fn initialize(&mut self) -> Result<()> {
141 info!("Initializing exporters...");
142 self.exporter.initialize()?;
143 info!("Exporters initialized");
144 Ok(())
145 }
146
147 pub fn export_batch(&mut self, sqllogs: &[Sqllog<'_>]) -> Result<()> {
149 if sqllogs.is_empty() {
150 return Ok(());
151 }
152
153 let refs: Vec<&Sqllog<'_>> = sqllogs.iter().collect();
155 self.exporter.export_batch(&refs)
156 }
157
158 pub fn finalize(&mut self) -> Result<()> {
160 info!("Finalizing exporters...");
161 self.exporter.finalize()?;
162 info!("Exporters finished");
163 Ok(())
164 }
165
166 #[must_use]
168 pub fn name(&self) -> &str {
169 self.exporter.name()
170 }
171
172 #[must_use]
174 pub fn stats(&self) -> Option<ExportStats> {
175 self.exporter.stats_snapshot()
176 }
177
178 pub fn log_stats(&self) {
180 if let Some(s) = self.stats() {
181 info!(
182 "Export stats: {} => success: {}, failed: {}, skipped: {} (total: {}){}",
183 self.name(),
184 s.exported,
185 s.failed,
186 s.skipped,
187 s.total(),
188 if s.flush_operations > 0 {
189 format!(
190 " | flushed:{} times (recent {} entries)",
191 s.flush_operations, s.last_flush_size
192 )
193 } else {
194 String::new()
195 }
196 );
197 } else {
198 info!("No export statistics available");
199 }
200 }
201}