dm-database-sqllog2db 0.4.2

高性能 CLI 工具:流式解析达梦数据库 SQL 日志并导出到 CSV/JSONL/SQLite
Documentation
use super::util::{ensure_parent_dir, f32_ms_to_i64, strip_ip_prefix};
use super::{ExportStats, Exporter};
use crate::config;
use crate::error::{Error, ExportError, Result};
use dm_database_parser_sqllog::Sqllog;
use std::fs::{File, OpenOptions};
use std::io::{BufWriter, Write};
use std::path::{Path, PathBuf};

pub struct CsvExporter {
    path: PathBuf,
    overwrite: bool,
    append: bool,
    writer: Option<BufWriter<File>>,
    stats: ExportStats,
    itoa_buf: itoa::Buffer,
    line_buf: Vec<u8>,
}

impl std::fmt::Debug for CsvExporter {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("CsvExporter")
            .field("path", &self.path)
            .field("stats", &self.stats)
            .finish_non_exhaustive()
    }
}

impl CsvExporter {
    #[must_use]
    pub fn new(path: impl AsRef<Path>) -> Self {
        Self {
            path: path.as_ref().to_path_buf(),
            overwrite: false,
            append: false,
            writer: None,
            stats: ExportStats::new(),
            itoa_buf: itoa::Buffer::new(),
            line_buf: Vec::with_capacity(512),
        }
    }

    #[must_use]
    pub fn from_config(config: &config::CsvExporter) -> Self {
        let mut e = Self::new(&config.file);
        if config.append {
            e.append = true;
        } else {
            e.overwrite = config.overwrite;
        }
        e
    }

    /// 将单条记录格式化并写入 writer
    /// 接收各字段的独立可变引用,允许 Rust 同时分开借用 self 的多个字段
    #[inline]
    fn write_record(
        itoa_buf: &mut itoa::Buffer,
        line_buf: &mut Vec<u8>,
        sqllog: &Sqllog<'_>,
        writer: &mut BufWriter<File>,
        path: &Path,
    ) -> Result<()> {
        let meta = sqllog.parse_meta();
        let pm = sqllog.parse_performance_metrics();

        line_buf.clear();

        line_buf.extend_from_slice(sqllog.ts.as_ref().as_bytes());
        line_buf.push(b',');
        line_buf.extend_from_slice(itoa_buf.format(meta.ep).as_bytes());
        line_buf.push(b',');
        line_buf.extend_from_slice(meta.sess_id.as_ref().as_bytes());
        line_buf.push(b',');
        line_buf.extend_from_slice(meta.thrd_id.as_ref().as_bytes());
        line_buf.push(b',');
        line_buf.extend_from_slice(meta.username.as_ref().as_bytes());
        line_buf.push(b',');
        line_buf.extend_from_slice(meta.trxid.as_ref().as_bytes());
        line_buf.push(b',');
        line_buf.extend_from_slice(meta.statement.as_ref().as_bytes());
        line_buf.push(b',');
        line_buf.extend_from_slice(meta.appname.as_ref().as_bytes());
        line_buf.push(b',');
        line_buf.extend_from_slice(strip_ip_prefix(meta.client_ip.as_ref()).as_bytes());
        line_buf.push(b',');
        if let Some(tag) = &sqllog.tag {
            line_buf.extend_from_slice(tag.as_ref().as_bytes());
        }
        line_buf.push(b',');
        line_buf.push(b'"');
        for &byte in pm.sql.as_bytes() {
            if byte == b'"' {
                line_buf.push(b'"');
            }
            line_buf.push(byte);
        }
        line_buf.push(b'"');
        line_buf.push(b',');
        if pm.exec_id != 0 || pm.exectime > 0.0 {
            line_buf.extend_from_slice(itoa_buf.format(f32_ms_to_i64(pm.exectime)).as_bytes());
            line_buf.push(b',');
            line_buf.extend_from_slice(itoa_buf.format(i64::from(pm.rowcount)).as_bytes());
            line_buf.push(b',');
            line_buf.extend_from_slice(itoa_buf.format(pm.exec_id).as_bytes());
            line_buf.push(b'\n');
        } else {
            line_buf.extend_from_slice(b",,\n");
        }

        writer.write_all(line_buf).map_err(|e| {
            Error::Export(ExportError::WriteError {
                path: path.to_path_buf(),
                reason: format!("write failed: {e}"),
            })
        })
    }
}

impl Exporter for CsvExporter {
    fn initialize(&mut self) -> Result<()> {
        ensure_parent_dir(&self.path).map_err(|e| {
            Error::Export(ExportError::WriteError {
                path: self.path.clone(),
                reason: format!("create dir failed: {e}"),
            })
        })?;

        let append_mode = self.append;
        let file_exists = self.path.exists();

        let file = if append_mode {
            OpenOptions::new()
                .create(true)
                .append(true)
                .open(&self.path)
        } else {
            OpenOptions::new()
                .create(true)
                .write(true)
                .truncate(self.overwrite)
                .open(&self.path)
        }
        .map_err(|e| {
            Error::Export(ExportError::WriteError {
                path: self.path.clone(),
                reason: format!("open failed: {e}"),
            })
        })?;

        let mut writer = BufWriter::with_capacity(16 * 1024 * 1024, file);

        if !append_mode || !file_exists {
            writer
                .write_all(b"ts,ep,sess_id,thrd_id,username,trx_id,statement,appname,client_ip,tag,sql,exec_time_ms,row_count,exec_id\n")
                .map_err(|e| {
                    Error::Export(ExportError::WriteError {
                        path: self.path.clone(),
                        reason: format!("write header failed: {e}"),
                    })
                })?;
        }

        self.writer = Some(writer);
        Ok(())
    }

    fn export(&mut self, sqllog: &Sqllog<'_>) -> Result<()> {
        let writer = self.writer.as_mut().ok_or_else(|| {
            Error::Export(ExportError::WriteError {
                path: self.path.clone(),
                reason: "not initialized".to_string(),
            })
        })?;
        Self::write_record(
            &mut self.itoa_buf,
            &mut self.line_buf,
            sqllog,
            writer,
            &self.path,
        )?;
        self.stats.record_success();
        Ok(())
    }

    fn export_batch(&mut self, sqllogs: &[Sqllog<'_>]) -> Result<()> {
        if sqllogs.is_empty() {
            return Ok(());
        }
        let writer = self.writer.as_mut().ok_or_else(|| {
            Error::Export(ExportError::WriteError {
                path: self.path.clone(),
                reason: "not initialized".to_string(),
            })
        })?;
        for sqllog in sqllogs {
            Self::write_record(
                &mut self.itoa_buf,
                &mut self.line_buf,
                sqllog,
                writer,
                &self.path,
            )?;
        }
        self.stats.record_success_batch(sqllogs.len());
        Ok(())
    }

    fn finalize(&mut self) -> Result<()> {
        if let Some(mut writer) = self.writer.take() {
            writer.flush().map_err(|e| {
                Error::Export(ExportError::WriteError {
                    path: self.path.clone(),
                    reason: format!("flush failed: {e}"),
                })
            })?;
        }
        Ok(())
    }

    fn name(&self) -> &'static str {
        "CSV"
    }

    fn stats_snapshot(&self) -> Option<ExportStats> {
        Some(self.stats.clone())
    }
}

impl Drop for CsvExporter {
    fn drop(&mut self) {
        if self.writer.is_some() {
            let _ = self.finalize();
        }
    }
}