use super::util::ensure_parent_dir;
use super::{ExportStats, Exporter, util::f32_ms_to_i64};
use crate::config;
use crate::error::{Error, ExportError, Result};
use dm_database_parser_sqllog::Sqllog;
use rayon::prelude::*;
use std::fs::{File, OpenOptions};
use std::io::{BufWriter, Write};
use std::path::{Path, PathBuf};
pub struct CsvExporter {
path: PathBuf,
overwrite: bool,
append: bool,
writer: Option<BufWriter<File>>,
stats: ExportStats,
itoa_buf: itoa::Buffer, line_buf: Vec<u8>, }
impl std::fmt::Debug for CsvExporter {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("CsvExporter")
.field("path", &self.path)
.field("overwrite", &self.overwrite)
.field("append", &self.append)
.field("stats", &self.stats)
.finish_non_exhaustive()
}
}
impl CsvExporter {
pub fn new(path: impl AsRef<Path>) -> Self {
Self {
path: path.as_ref().to_path_buf(),
overwrite: false,
append: false,
writer: None,
stats: ExportStats::new(),
itoa_buf: itoa::Buffer::new(), line_buf: Vec::with_capacity(1024), }
}
#[must_use]
pub fn from_config(config: &config::CsvExporter) -> Self {
let mut exporter = Self::new(&config.file);
if config.append {
exporter.overwrite = false;
exporter.append = true;
}
exporter
}
#[inline]
fn format_csv_line(sqllog: &Sqllog<'_>) -> Vec<u8> {
let meta = sqllog.parse_meta();
let mut buf = Vec::with_capacity(512);
let mut itoa_buf = itoa::Buffer::new();
buf.extend_from_slice(sqllog.ts.as_ref().as_bytes());
buf.push(b',');
buf.extend_from_slice(itoa_buf.format(meta.ep).as_bytes());
buf.push(b',');
buf.extend_from_slice(meta.sess_id.as_ref().as_bytes());
buf.push(b',');
buf.extend_from_slice(meta.thrd_id.as_ref().as_bytes());
buf.push(b',');
buf.extend_from_slice(meta.username.as_ref().as_bytes());
buf.push(b',');
buf.extend_from_slice(meta.trxid.as_ref().as_bytes());
buf.push(b',');
buf.extend_from_slice(meta.statement.as_ref().as_bytes());
buf.push(b',');
buf.extend_from_slice(meta.appname.as_ref().as_bytes());
buf.push(b',');
buf.extend_from_slice(meta.client_ip.as_ref().as_bytes());
buf.push(b',');
if let Some(tag) = &sqllog.tag {
buf.extend_from_slice(tag.as_ref().as_bytes());
}
buf.push(b',');
buf.push(b'"');
for &byte in sqllog.body().as_ref().as_bytes() {
if byte == b'"' {
buf.push(b'"');
buf.push(b'"');
} else {
buf.push(byte);
}
}
buf.push(b'"');
buf.push(b',');
if let Some(indicators) = sqllog.parse_indicators() {
let exec_time_ms = f32_ms_to_i64(indicators.execute_time);
buf.extend_from_slice(itoa_buf.format(exec_time_ms).as_bytes());
buf.push(b',');
buf.extend_from_slice(itoa_buf.format(i64::from(indicators.row_count)).as_bytes());
buf.push(b',');
buf.extend_from_slice(itoa_buf.format(indicators.execute_id).as_bytes());
buf.push(b'\n');
} else {
buf.extend_from_slice(b",,\n");
}
buf
}
}
impl Exporter for CsvExporter {
fn initialize(&mut self) -> Result<()> {
ensure_parent_dir(&self.path).map_err(|e| {
Error::Export(ExportError::CsvExportFailed {
path: self.path.clone(),
reason: format!("Failed to create directory: {e}"),
})
})?;
let append_mode = self.append;
let file_exists = self.path.exists();
let file = if append_mode {
OpenOptions::new()
.create(true)
.append(true)
.open(&self.path)
} else {
OpenOptions::new()
.create(true)
.write(true)
.truncate(self.overwrite)
.open(&self.path)
};
let file = file.map_err(|e| {
Error::Export(ExportError::CsvExportFailed {
path: self.path.clone(),
reason: format!("Failed to open file: {e}"),
})
})?;
let mut writer = BufWriter::with_capacity(16 * 1024 * 1024, file);
if !append_mode || !file_exists {
writer.write_all(b"ts,ep,sess_id,thrd_id,username,trx_id,statement,appname,client_ip,tag,sql,exec_time_ms,row_count,exec_id\n")
.map_err(|e| {
Error::Export(ExportError::CsvExportFailed {
path: self.path.clone(),
reason: format!("Failed to write CSV header: {e}"),
})
})?;
}
self.writer = Some(writer);
Ok(())
}
fn export(&mut self, sqllog: &Sqllog<'_>) -> Result<()> {
let meta = sqllog.parse_meta();
let writer = self.writer.as_mut().ok_or_else(|| {
Error::Export(ExportError::CsvExportFailed {
path: self.path.clone(),
reason: "CSV exporter not initialized".to_string(),
})
})?;
self.line_buf.clear();
let buf = &mut self.line_buf;
buf.extend_from_slice(sqllog.ts.as_ref().as_bytes());
buf.push(b',');
buf.extend_from_slice(self.itoa_buf.format(meta.ep).as_bytes());
buf.push(b',');
buf.extend_from_slice(meta.sess_id.as_ref().as_bytes());
buf.push(b',');
buf.extend_from_slice(meta.thrd_id.as_ref().as_bytes());
buf.push(b',');
buf.extend_from_slice(meta.username.as_ref().as_bytes());
buf.push(b',');
buf.extend_from_slice(meta.trxid.as_ref().as_bytes());
buf.push(b',');
buf.extend_from_slice(meta.statement.as_ref().as_bytes());
buf.push(b',');
buf.extend_from_slice(meta.appname.as_ref().as_bytes());
buf.push(b',');
buf.extend_from_slice(meta.client_ip.as_ref().as_bytes());
buf.push(b',');
if let Some(tag) = &sqllog.tag {
buf.extend_from_slice(tag.as_ref().as_bytes());
}
buf.push(b',');
buf.push(b'"');
for &byte in sqllog.body().as_ref().as_bytes() {
if byte == b'"' {
buf.push(b'"');
buf.push(b'"');
} else {
buf.push(byte);
}
}
buf.push(b'"');
buf.push(b',');
if let Some(indicators) = sqllog.parse_indicators() {
let exec_time_ms = f32_ms_to_i64(indicators.execute_time);
buf.extend_from_slice(self.itoa_buf.format(exec_time_ms).as_bytes());
buf.push(b',');
buf.extend_from_slice(
self.itoa_buf
.format(i64::from(indicators.row_count))
.as_bytes(),
);
buf.push(b',');
buf.extend_from_slice(self.itoa_buf.format(indicators.execute_id).as_bytes());
buf.push(b'\n');
} else {
buf.extend_from_slice(b",,\n");
}
writer.write_all(buf).map_err(|e| {
Error::Export(ExportError::CsvExportFailed {
path: self.path.clone(),
reason: format!("Failed to write CSV line: {e}"),
})
})?;
self.stats.record_success();
Ok(())
}
fn finalize(&mut self) -> Result<()> {
if let Some(mut writer) = self.writer.take() {
writer.flush().map_err(|e| {
Error::Export(ExportError::CsvExportFailed {
path: self.path.clone(),
reason: format!("Failed to flush buffer: {e}"),
})
})?;
} else {
}
Ok(())
}
fn export_batch(&mut self, sqllogs: &[&Sqllog<'_>]) -> Result<()> {
if sqllogs.is_empty() {
return Ok(());
}
let writer = self.writer.as_mut().ok_or_else(|| {
Error::Export(ExportError::CsvExportFailed {
path: self.path.clone(),
reason: "CSV exporter not initialized".to_string(),
})
})?;
const CHUNK_SIZE: usize = 500;
for chunk in sqllogs.chunks(CHUNK_SIZE) {
let lines: Vec<Vec<u8>> = chunk
.par_iter()
.map(|sqllog| Self::format_csv_line(sqllog))
.collect();
for line in lines {
writer.write_all(&line).map_err(|e| {
Error::Export(ExportError::CsvExportFailed {
path: self.path.clone(),
reason: format!("Failed to write CSV line: {e}"),
})
})?;
self.stats.record_success();
}
}
Ok(())
}
fn name(&self) -> &'static str {
"CSV"
}
fn stats_snapshot(&self) -> Option<ExportStats> {
Some(self.stats.clone())
}
}
impl CsvExporter {}
impl Drop for CsvExporter {
fn drop(&mut self) {
if self.writer.is_some() {
let _ = self.finalize();
}
}
}