dm-database-sqllog2db 0.2.2

高性能 CLI 工具:流式解析达梦数据库 SQL 日志并导出到 CSV/Parquet/JSONL/SQLite/DuckDB/PostgreSQL/DM
Documentation
use super::util::validate_identifier;
use super::{ExportStats, Exporter};
use crate::error::{Error, ExportError, Result};
use dm_database_parser_sqllog::Sqllog;
use log::{debug, info, warn};

#[cfg(feature = "oracle")]
use oracle::Connection;

/// Oracle 导出器(基于 rust-oracle)
pub struct OracleExporter {
    username: String,
    password: String,
    connect_string: String,
    conn: Option<Connection>,
    table_name: String,
    overwrite: bool,
    append: bool,
    stats: ExportStats,
    pending_records: Vec<OracleRecord>,
}

#[derive(Debug, Clone)]
struct OracleRecord {
    ts: String,
    ep: i32,
    sess_id: String,
    thrd_id: String,
    username: String,
    trx_id: String,
    statement: String,
    appname: String,
    client_ip: String,
    sql: String,
    exec_time_ms: Option<f32>,
    row_count: Option<i32>,
    exec_id: Option<i64>,
}

impl OracleExporter {
    pub fn new(
        username: String,
        password: String,
        connect_string: String,
        table_name: String,
        overwrite: bool,
        append: bool,
    ) -> Self {
        Self {
            username,
            password,
            connect_string,
            conn: None,
            table_name,
            overwrite,
            append,
            stats: ExportStats::new(),
            pending_records: Vec::new(),
        }
    }

    pub fn from_config(config: &crate::config::OracleExporter) -> Self {
        // 从连接字符串解析参数 (简单实现,假设格式为 user/password@host:port/service)
        let parts: Vec<&str> = config.connection_string.split('@').collect();
        let (username, password) = if parts.len() >= 2 {
            let creds: Vec<&str> = parts[0].split('/').collect();
            if creds.len() >= 2 {
                (creds[0].to_string(), creds[1].to_string())
            } else {
                ("user".to_string(), "password".to_string())
            }
        } else {
            ("user".to_string(), "password".to_string())
        };

        let connect_string = if parts.len() >= 2 {
            parts[1].to_string()
        } else {
            "localhost:1521/ORCL".to_string()
        };

        Self::new(
            username,
            password,
            connect_string,
            "sqllog".to_string(), // 默认表名
            false,                // 默认不覆盖
            false,                // 默认不追加
        )
    }

    fn create_table(&mut self) -> Result<()> {
        let conn = self.conn.as_mut().ok_or_else(|| {
            Error::Export(ExportError::DatabaseError {
                reason: "Connection not initialized".to_string(),
            })
        })?;

        // 使用通用类型映射;sql 字段使用 CLOB
        let create_block = format!(
            "BEGIN
                            EXECUTE IMMEDIATE 'CREATE TABLE {} (
                                id NUMBER GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
                                ts VARCHAR2(64) NOT NULL,
                                ep NUMBER(10) NOT NULL,
                                sess_id VARCHAR2(128) NOT NULL,
                                thrd_id VARCHAR2(128) NOT NULL,
                                username VARCHAR2(128) NOT NULL,
                                trx_id VARCHAR2(128) NOT NULL,
                                statement VARCHAR2(128) NOT NULL,
                                appname VARCHAR2(256) NOT NULL,
                                client_ip VARCHAR2(64) NOT NULL,
                                sql CLOB NOT NULL,
                                exec_time_ms BINARY_FLOAT,
                                row_count NUMBER(19),
                                exec_id NUMBER(19)
                            )';
                        EXCEPTION
                            WHEN OTHERS THEN
                                IF SQLCODE != -955 THEN -- ORA-00955
                                    RAISE;
                                END IF;
                        END;",
            self.table_name
        );

        conn.execute(&create_block, &[]).map_err(|e| {
            Error::Export(ExportError::DatabaseError {
                reason: format!("Failed to create table: {}", e),
            })
        })?;

        info!("Oracle table created or already exists");
        Ok(())
    }

    fn flush(&mut self) -> Result<()> {
        if self.pending_records.is_empty() {
            return Ok(());
        }
        let conn = self.conn.as_mut().ok_or_else(|| {
            Error::Export(ExportError::DatabaseError {
                reason: "Connection not initialized".to_string(),
            })
        })?;

        let count = self.pending_records.len();

        let insert_sql = format!(
            "INSERT INTO {} (
                    ts, ep, sess_id, thrd_id, username, trx_id, statement,
                    appname, client_ip, sql, exec_time_ms, row_count, exec_id
                 ) VALUES (:1, :2, :3, :4, :5, :6, :7, :8, :9, :10, :11, :12, :13)",
            self.table_name
        );

        let mut stmt = conn.statement(&insert_sql).build().map_err(|e| {
            Error::Export(ExportError::DatabaseError {
                reason: format!("Failed to prepare statement: {}", e),
            })
        })?;

        for r in &self.pending_records {
            stmt.execute(&[
                &r.ts,
                &r.ep.to_string(),
                &r.sess_id,
                &r.thrd_id,
                &r.username,
                &r.trx_id,
                &r.statement,
                &r.appname,
                &r.client_ip,
                &r.sql,
                &r.exec_time_ms,
                &r.row_count,
                &r.exec_id,
            ])
            .map_err(|e| {
                Error::Export(ExportError::DatabaseError {
                    reason: format!("Failed to execute insert: {}", e),
                })
            })?;
        }

        conn.commit().map_err(|e| {
            Error::Export(ExportError::DatabaseError {
                reason: format!("Failed to commit: {}", e),
            })
        })?;

        debug!("Flushed {} records to Oracle", count);
        self.stats.flush_operations += 1;
        self.stats.last_flush_size = count;
        self.pending_records.clear();
        Ok(())
    }

    fn sqllog_to_record(sqllog: &Sqllog<'_>) -> OracleRecord {
        let meta = sqllog.parse_meta();
        let ind = sqllog.parse_indicators();

        OracleRecord {
            ts: sqllog.ts.to_string(),
            ep: meta.ep as i32,
            sess_id: meta.sess_id.to_string(),
            thrd_id: meta.thrd_id.to_string(),
            username: meta.username.to_string(),
            trx_id: meta.trxid.to_string(),
            statement: meta.statement.to_string(),
            appname: meta.appname.to_string(),
            client_ip: meta.client_ip.to_string(),
            sql: sqllog.body().to_string(),
            exec_time_ms: ind.as_ref().map(|i| i.execute_time),
            row_count: ind.as_ref().map(|i| i.row_count as i32),
            exec_id: ind.as_ref().map(|i| i.execute_id),
        }
    }
}

impl Exporter for OracleExporter {
    fn initialize(&mut self) -> Result<()> {
        info!("Initializing Oracle exporter: {}", self.connect_string);

        // 注意:Oracle 为网络数据库,未创建本地目录;确保 instant client 已在 PATH 中。

        // 创建连接
        let conn = Connection::connect(&self.username, &self.password, &self.connect_string)
            .map_err(|e| {
                Error::Export(ExportError::DatabaseError {
                    reason: format!("Failed to connect to Oracle: {}", e),
                })
            })?;
        self.conn = Some(conn);

        // 校验表名
        validate_identifier(&self.table_name)?;

        // 处理 overwrite/append 逻辑
        {
            let conn = self.conn.as_mut().ok_or_else(|| {
                Error::Export(ExportError::DatabaseError {
                    reason: "Connection not initialized".to_string(),
                })
            })?;
            if self.overwrite {
                let drop_block = format!(
                    "BEGIN
                       EXECUTE IMMEDIATE 'DROP TABLE {}';
                     EXCEPTION WHEN OTHERS THEN
                       IF SQLCODE != -942 THEN RAISE; END IF;
                     END;",
                    self.table_name
                );
                conn.execute(&drop_block, &[]).map_err(|e| {
                    Error::Export(ExportError::DatabaseError {
                        reason: format!("Failed to drop table: {}", e),
                    })
                })?;
            } else if !self.append {
                // 检查是否存在(USER_TABLES 按未加引号的标识符大写存储)
                let tname = self.table_name.to_uppercase();
                let mut rows = conn
                    .query("SELECT 1 FROM USER_TABLES WHERE TABLE_NAME = :1", &[&tname])
                    .map_err(|e| {
                        Error::Export(ExportError::DatabaseError {
                            reason: format!("Failed to check table existence: {}", e),
                        })
                    })?;
                if rows.next().is_some() {
                    return Err(Error::Export(ExportError::DatabaseError {
                        reason: format!(
                            "Table '{}' already exists (set overwrite=true or append=true)",
                            self.table_name
                        ),
                    }));
                }
            }
        }

        // 创建表(若不存在)
        self.create_table()?;

        info!("Oracle exporter initialized");
        Ok(())
    }

    fn export(&mut self, sqllog: &Sqllog<'_>) -> Result<()> {
        let record = Self::sqllog_to_record(sqllog);
        self.pending_records.push(record);
        self.flush()?;
        self.stats.record_success();
        Ok(())
    }

    fn export_batch(&mut self, sqllogs: &[&Sqllog<'_>]) -> Result<()> {
        debug!("Exporting {} records to Oracle in batch", sqllogs.len());
        for s in sqllogs {
            self.export(s)?;
        }
        Ok(())
    }

    fn finalize(&mut self) -> Result<()> {
        self.flush()?;
        info!(
            "Oracle export finished (success: {}, failed: {})",
            self.stats.exported, self.stats.failed
        );
        Ok(())
    }

    fn name(&self) -> &str {
        "Oracle"
    }

    fn stats_snapshot(&self) -> Option<ExportStats> {
        Some(self.stats.clone())
    }
}

impl Drop for OracleExporter {
    fn drop(&mut self) {
        if !self.pending_records.is_empty() {
            if let Err(e) = self.finalize() {
                warn!("Oracle exporter finalization on Drop failed: {}", e);
            }
        }
    }
}