dm_database_sqllog2db/exporter/
mod.rs1use crate::config::Config;
2use crate::error::{ConfigError, Error, Result};
3use dm_database_parser_sqllog::Sqllog;
4use log::info;
5
6#[cfg(feature = "csv")]
7pub mod csv;
8#[cfg(feature = "jsonl")]
9pub mod jsonl;
10#[cfg(feature = "sqlite")]
11pub mod sqlite;
12mod util;
13
14#[cfg(feature = "csv")]
15pub use csv::CsvExporter;
16#[cfg(feature = "jsonl")]
17pub use jsonl::JsonlExporter;
18#[cfg(feature = "sqlite")]
19pub use sqlite::SqliteExporter;
20
21pub trait Exporter {
23 fn initialize(&mut self) -> Result<()>;
24 fn export(&mut self, sqllog: &Sqllog<'_>) -> Result<()>;
25
26 fn export_batch(&mut self, sqllogs: &[Sqllog<'_>]) -> Result<()> {
28 for sqllog in sqllogs {
29 self.export(sqllog)?;
30 }
31 Ok(())
32 }
33
34 fn finalize(&mut self) -> Result<()>;
35 fn name(&self) -> &str;
36
37 fn stats_snapshot(&self) -> Option<ExportStats> {
38 None
39 }
40}
41
/// Counters describing the outcome of an export run.
///
/// All fields are public. Only `exported` has helper methods on this type;
/// the remaining counters are written directly by whoever owns the value.
#[derive(Debug, Default, Clone)]
pub struct ExportStats {
    /// Records counted as successfully exported.
    pub exported: usize,
    /// Records counted as skipped.
    pub skipped: usize,
    /// Records counted as failed.
    pub failed: usize,
    /// Number of flush operations recorded by the owner of these stats.
    pub flush_operations: usize,
    /// Size recorded for the most recent flush.
    pub last_flush_size: usize,
}

impl ExportStats {
    /// Creates a statistics value with every counter set to zero.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Counts one successfully exported record.
    pub fn record_success(&mut self) {
        self.record_success_batch(1);
    }

    /// Counts `count` successfully exported records at once.
    ///
    /// The counter saturates at `usize::MAX` rather than panicking (debug
    /// builds) or wrapping around (release builds) on overflow.
    pub fn record_success_batch(&mut self, count: usize) {
        self.exported = self.exported.saturating_add(count);
    }

    /// Sum of exported, skipped and failed records.
    ///
    /// The sum saturates at `usize::MAX` so that reporting statistics can
    /// never panic or yield a wrapped-around value.
    #[must_use]
    pub fn total(&self) -> usize {
        self.exported
            .saturating_add(self.skipped)
            .saturating_add(self.failed)
    }
}
71
72pub struct ExporterManager {
74 exporter: Box<dyn Exporter>,
75}
76
77impl std::fmt::Debug for ExporterManager {
78 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
79 f.debug_struct("ExporterManager")
80 .field("exporter", &self.exporter.name())
81 .finish()
82 }
83}
84
85impl ExporterManager {
86 pub fn from_config(config: &Config) -> Result<Self> {
87 info!("Initializing exporter manager...");
88
89 #[cfg(feature = "csv")]
90 if let Some(cfg) = &config.exporter.csv {
91 info!("Using CSV exporter: {}", cfg.file);
92 return Ok(Self {
93 exporter: Box::new(CsvExporter::from_config(cfg)),
94 });
95 }
96
97 #[cfg(feature = "jsonl")]
98 if let Some(cfg) = &config.exporter.jsonl {
99 info!("Using JSONL exporter: {}", cfg.file);
100 return Ok(Self {
101 exporter: Box::new(JsonlExporter::from_config(cfg)),
102 });
103 }
104
105 #[cfg(feature = "sqlite")]
106 if let Some(cfg) = &config.exporter.sqlite {
107 info!("Using SQLite exporter: {}", cfg.database_url);
108 return Ok(Self {
109 exporter: Box::new(SqliteExporter::from_config(cfg)),
110 });
111 }
112
113 Err(Error::Config(ConfigError::NoExporters))
114 }
115
116 pub fn initialize(&mut self) -> Result<()> {
117 info!("Initializing exporters...");
118 self.exporter.initialize()?;
119 info!("Exporters initialized");
120 Ok(())
121 }
122
123 pub fn export_batch(&mut self, sqllogs: &[Sqllog<'_>]) -> Result<()> {
125 self.exporter.export_batch(sqllogs)
126 }
127
128 pub fn finalize(&mut self) -> Result<()> {
129 info!("Finalizing exporters...");
130 self.exporter.finalize()?;
131 info!("Exporters finished");
132 Ok(())
133 }
134
135 #[must_use]
136 pub fn name(&self) -> &str {
137 self.exporter.name()
138 }
139
140 pub fn log_stats(&self) {
141 if let Some(s) = self.exporter.stats_snapshot() {
142 info!(
143 "Export stats: {} => success: {}, failed: {}, skipped: {} (total: {}){}",
144 self.name(),
145 s.exported,
146 s.failed,
147 s.skipped,
148 s.total(),
149 if s.flush_operations > 0 {
150 format!(
151 " | flushed: {} times (recent {} entries)",
152 s.flush_operations, s.last_flush_size
153 )
154 } else {
155 String::new()
156 }
157 );
158 }
159 }
160}