use crate::config::Config;
use crate::error::{ConfigError, Error, Result};
use dm_database_parser_sqllog::Sqllog;
use log::info;
#[cfg(feature = "csv")]
pub mod csv;
#[cfg(feature = "jsonl")]
pub mod jsonl;
#[cfg(feature = "sqlite")]
pub mod sqlite;
mod util;
#[cfg(feature = "csv")]
pub use csv::CsvExporter;
#[cfg(feature = "jsonl")]
pub use jsonl::JsonlExporter;
#[cfg(feature = "sqlite")]
pub use sqlite::SqliteExporter;
pub trait Exporter {
fn initialize(&mut self) -> Result<()>;
fn export(&mut self, sqllog: &Sqllog<'_>) -> Result<()>;
fn export_batch(&mut self, sqllogs: &[&Sqllog<'_>]) -> Result<()> {
for sqllog in sqllogs {
self.export(sqllog)?;
}
Ok(())
}
fn finalize(&mut self) -> Result<()>;
fn name(&self) -> &str;
fn stats_snapshot(&self) -> Option<ExportStats> {
None
}
}
#[derive(Debug, Default, Clone)]
pub struct ExportStats {
pub exported: usize,
pub skipped: usize,
pub failed: usize,
pub flush_operations: usize,
pub last_flush_size: usize,
}
impl ExportStats {
#[must_use]
pub fn new() -> Self {
Self::default()
}
pub fn record_success(&mut self) {
self.exported += 1;
}
#[must_use]
pub fn total(&self) -> usize {
self.exported + self.skipped + self.failed
}
}
pub struct ExporterManager {
exporter: Box<dyn Exporter>,
}
impl std::fmt::Debug for ExporterManager {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ExporterManager")
.field("exporter_name", &self.exporter.name())
.finish()
}
}
impl ExporterManager {
pub fn from_config(config: &Config) -> Result<Self> {
info!("Initializing exporter manager...");
#[cfg(feature = "csv")]
if let Some(csv_config) = config.exporter.csv() {
let csv_exporter = CsvExporter::from_config(csv_config);
info!("Using CSV exporter: {}", csv_config.file);
return Ok(Self {
exporter: Box::new(csv_exporter),
});
}
#[cfg(feature = "jsonl")]
if let Some(jsonl_config) = config.exporter.jsonl() {
let jsonl_exporter = JsonlExporter::from_config(jsonl_config);
info!("Using JSONL exporter: {}", jsonl_config.file);
return Ok(Self {
exporter: Box::new(jsonl_exporter),
});
}
#[cfg(feature = "sqlite")]
if let Some(sqlite_config) = config.exporter.sqlite() {
let sqlite_exporter = SqliteExporter::from_config(sqlite_config);
info!("Using SQLite exporter: {}", sqlite_config.database_url);
return Ok(Self {
exporter: Box::new(sqlite_exporter),
});
}
Err(Error::Config(ConfigError::NoExporters))
}
pub fn initialize(&mut self) -> Result<()> {
info!("Initializing exporters...");
self.exporter.initialize()?;
info!("Exporters initialized");
Ok(())
}
pub fn export_batch(&mut self, sqllogs: &[Sqllog<'_>]) -> Result<()> {
if sqllogs.is_empty() {
return Ok(());
}
let refs: Vec<&Sqllog<'_>> = sqllogs.iter().collect();
self.exporter.export_batch(&refs)
}
pub fn finalize(&mut self) -> Result<()> {
info!("Finalizing exporters...");
self.exporter.finalize()?;
info!("Exporters finished");
Ok(())
}
#[must_use]
pub fn name(&self) -> &str {
self.exporter.name()
}
#[must_use]
pub fn stats(&self) -> Option<ExportStats> {
self.exporter.stats_snapshot()
}
pub fn log_stats(&self) {
if let Some(s) = self.stats() {
info!(
"Export stats: {} => success: {}, failed: {}, skipped: {} (total: {}){}",
self.name(),
s.exported,
s.failed,
s.skipped,
s.total(),
if s.flush_operations > 0 {
format!(
" | flushed:{} times (recent {} entries)",
s.flush_operations, s.last_flush_size
)
} else {
String::new()
}
);
} else {
info!("No export statistics available");
}
}
}