use super::util::validate_identifier;
use super::{ExportStats, Exporter};
use crate::error::{Error, ExportError, Result};
use dm_database_parser_sqllog::Sqllog;
use log::{debug, info, warn};
#[cfg(feature = "oracle")]
use oracle::Connection;
/// Exporter that writes parsed SQL-log records into an Oracle table.
///
/// The connection is not opened on construction; `conn` stays `None` until
/// `initialize` stores a live connection in it.
pub struct OracleExporter {
    /// User name passed to `Connection::connect`.
    username: String,
    /// Password passed to `Connection::connect`.
    password: String,
    /// Connect string passed to `Connection::connect`.
    connect_string: String,
    /// Live connection; `None` until `initialize` has connected.
    conn: Option<Connection>,
    /// Target table name; interpolated into the DDL and INSERT statements.
    table_name: String,
    /// When true, `initialize` drops the table before creating it.
    overwrite: bool,
    /// When false (and `overwrite` is false), `initialize` fails if the table already exists.
    append: bool,
    /// Running export statistics (flush count, last flush size, success counter).
    stats: ExportStats,
    /// Records queued by `export` that `flush` has not yet written.
    pending_records: Vec<OracleRecord>,
}
/// Owned copy of one log entry, laid out in the column order of the INSERT
/// statement issued by `flush`.
#[derive(Debug, Clone)]
struct OracleRecord {
    /// Value of the log entry's `ts` field, stored as text.
    ts: String,
    /// `ep` value from the parsed meta section.
    ep: i32,
    /// `sess_id` value from the parsed meta section.
    sess_id: String,
    /// `thrd_id` value from the parsed meta section.
    thrd_id: String,
    /// `username` value from the parsed meta section.
    username: String,
    /// `trxid` value from the parsed meta section.
    trx_id: String,
    /// `statement` value from the parsed meta section.
    statement: String,
    /// `appname` value from the parsed meta section.
    appname: String,
    /// `client_ip` value from the parsed meta section.
    client_ip: String,
    /// Body text of the log entry.
    sql: String,
    /// `execute_time` indicator; `None` when the entry has no indicators.
    exec_time_ms: Option<f32>,
    /// `row_count` indicator; `None` when the entry has no indicators.
    // NOTE(review): stored as i32 while the table column is NUMBER(19) — confirm the narrowing is intended.
    row_count: Option<i32>,
    /// `execute_id` indicator; `None` when the entry has no indicators.
    exec_id: Option<i64>,
}
impl OracleExporter {
pub fn new(
username: String,
password: String,
connect_string: String,
table_name: String,
overwrite: bool,
append: bool,
) -> Self {
Self {
username,
password,
connect_string,
conn: None,
table_name,
overwrite,
append,
stats: ExportStats::new(),
pending_records: Vec::new(),
}
}
pub fn from_config(config: &crate::config::OracleExporter) -> Self {
let parts: Vec<&str> = config.connection_string.split('@').collect();
let (username, password) = if parts.len() >= 2 {
let creds: Vec<&str> = parts[0].split('/').collect();
if creds.len() >= 2 {
(creds[0].to_string(), creds[1].to_string())
} else {
("user".to_string(), "password".to_string())
}
} else {
("user".to_string(), "password".to_string())
};
let connect_string = if parts.len() >= 2 {
parts[1].to_string()
} else {
"localhost:1521/ORCL".to_string()
};
Self::new(
username,
password,
connect_string,
"sqllog".to_string(), false, false, )
}
fn create_table(&mut self) -> Result<()> {
let conn = self.conn.as_mut().ok_or_else(|| {
Error::Export(ExportError::DatabaseError {
reason: "Connection not initialized".to_string(),
})
})?;
let create_block = format!(
"BEGIN
EXECUTE IMMEDIATE 'CREATE TABLE {} (
id NUMBER GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
ts VARCHAR2(64) NOT NULL,
ep NUMBER(10) NOT NULL,
sess_id VARCHAR2(128) NOT NULL,
thrd_id VARCHAR2(128) NOT NULL,
username VARCHAR2(128) NOT NULL,
trx_id VARCHAR2(128) NOT NULL,
statement VARCHAR2(128) NOT NULL,
appname VARCHAR2(256) NOT NULL,
client_ip VARCHAR2(64) NOT NULL,
sql CLOB NOT NULL,
exec_time_ms BINARY_FLOAT,
row_count NUMBER(19),
exec_id NUMBER(19)
)';
EXCEPTION
WHEN OTHERS THEN
IF SQLCODE != -955 THEN -- ORA-00955
RAISE;
END IF;
END;",
self.table_name
);
conn.execute(&create_block, &[]).map_err(|e| {
Error::Export(ExportError::DatabaseError {
reason: format!("Failed to create table: {}", e),
})
})?;
info!("Oracle table created or already exists");
Ok(())
}
fn flush(&mut self) -> Result<()> {
if self.pending_records.is_empty() {
return Ok(());
}
let conn = self.conn.as_mut().ok_or_else(|| {
Error::Export(ExportError::DatabaseError {
reason: "Connection not initialized".to_string(),
})
})?;
let count = self.pending_records.len();
let insert_sql = format!(
"INSERT INTO {} (
ts, ep, sess_id, thrd_id, username, trx_id, statement,
appname, client_ip, sql, exec_time_ms, row_count, exec_id
) VALUES (:1, :2, :3, :4, :5, :6, :7, :8, :9, :10, :11, :12, :13)",
self.table_name
);
let mut stmt = conn.statement(&insert_sql).build().map_err(|e| {
Error::Export(ExportError::DatabaseError {
reason: format!("Failed to prepare statement: {}", e),
})
})?;
for r in &self.pending_records {
stmt.execute(&[
&r.ts,
&r.ep.to_string(),
&r.sess_id,
&r.thrd_id,
&r.username,
&r.trx_id,
&r.statement,
&r.appname,
&r.client_ip,
&r.sql,
&r.exec_time_ms,
&r.row_count,
&r.exec_id,
])
.map_err(|e| {
Error::Export(ExportError::DatabaseError {
reason: format!("Failed to execute insert: {}", e),
})
})?;
}
conn.commit().map_err(|e| {
Error::Export(ExportError::DatabaseError {
reason: format!("Failed to commit: {}", e),
})
})?;
debug!("Flushed {} records to Oracle", count);
self.stats.flush_operations += 1;
self.stats.last_flush_size = count;
self.pending_records.clear();
Ok(())
}
fn sqllog_to_record(sqllog: &Sqllog<'_>) -> OracleRecord {
let meta = sqllog.parse_meta();
let ind = sqllog.parse_indicators();
OracleRecord {
ts: sqllog.ts.to_string(),
ep: meta.ep as i32,
sess_id: meta.sess_id.to_string(),
thrd_id: meta.thrd_id.to_string(),
username: meta.username.to_string(),
trx_id: meta.trxid.to_string(),
statement: meta.statement.to_string(),
appname: meta.appname.to_string(),
client_ip: meta.client_ip.to_string(),
sql: sqllog.body().to_string(),
exec_time_ms: ind.as_ref().map(|i| i.execute_time),
row_count: ind.as_ref().map(|i| i.row_count as i32),
exec_id: ind.as_ref().map(|i| i.execute_id),
}
}
}
impl Exporter for OracleExporter {
    /// Connects to Oracle and prepares the target table.
    ///
    /// Steps, in order:
    /// 1. Validate the table name. It is interpolated into SQL text, so it is
    ///    checked before any connection is opened.
    /// 2. Connect with the stored credentials.
    /// 3. If `overwrite` is set, drop the table (a missing table, ORA-00942, is
    ///    tolerated). Otherwise, unless `append` is set, fail if the table
    ///    already exists in `USER_TABLES`.
    /// 4. Create the table.
    ///
    /// # Errors
    /// Propagates the identifier-validation error, or returns
    /// `ExportError::DatabaseError` for connection, drop, existence-check, or
    /// create failures, and when the table exists without `overwrite`/`append`.
    fn initialize(&mut self) -> Result<()> {
        validate_identifier(&self.table_name)?;

        info!("Initializing Oracle exporter: {}", self.connect_string);
        let conn = Connection::connect(&self.username, &self.password, &self.connect_string)
            .map_err(|e| {
                Error::Export(ExportError::DatabaseError {
                    reason: format!("Failed to connect to Oracle: {}", e),
                })
            })?;
        self.conn = Some(conn);
        {
            let conn = self.conn.as_mut().ok_or_else(|| {
                Error::Export(ExportError::DatabaseError {
                    reason: "Connection not initialized".to_string(),
                })
            })?;
            if self.overwrite {
                let drop_block = format!(
                    "BEGIN
EXECUTE IMMEDIATE 'DROP TABLE {}';
EXCEPTION WHEN OTHERS THEN
IF SQLCODE != -942 THEN RAISE; END IF;
END;",
                    self.table_name
                );
                conn.execute(&drop_block, &[]).map_err(|e| {
                    Error::Export(ExportError::DatabaseError {
                        reason: format!("Failed to drop table: {}", e),
                    })
                })?;
            } else if !self.append {
                // The lookup uses the upper-cased name to match USER_TABLES.
                let tname = self.table_name.to_uppercase();
                let mut rows = conn
                    .query("SELECT 1 FROM USER_TABLES WHERE TABLE_NAME = :1", &[&tname])
                    .map_err(|e| {
                        Error::Export(ExportError::DatabaseError {
                            reason: format!("Failed to check table existence: {}", e),
                        })
                    })?;
                match rows.next() {
                    Some(Ok(_)) => {
                        return Err(Error::Export(ExportError::DatabaseError {
                            reason: format!(
                                "Table '{}' already exists (set overwrite=true or append=true)",
                                self.table_name
                            ),
                        }));
                    }
                    // A fetch error is not evidence that the table exists.
                    Some(Err(e)) => {
                        return Err(Error::Export(ExportError::DatabaseError {
                            reason: format!("Failed to check table existence: {}", e),
                        }));
                    }
                    None => {}
                }
            }
        }
        self.create_table()?;
        info!("Oracle exporter initialized");
        Ok(())
    }

    /// Queues one record and writes it immediately.
    ///
    /// The success counter is bumped only after the flush has succeeded.
    ///
    /// # Errors
    /// Propagates any error from `flush`.
    fn export(&mut self, sqllog: &Sqllog<'_>) -> Result<()> {
        let record = Self::sqllog_to_record(sqllog);
        self.pending_records.push(record);
        self.flush()?;
        self.stats.record_success();
        Ok(())
    }

    /// Queues every record of the batch and writes them with a single flush.
    ///
    /// One flush per batch means one prepared statement and one commit for the
    /// whole slice instead of one per record. The success counter is bumped
    /// once per record after the flush has succeeded. An empty slice is a no-op
    /// apart from the debug log line.
    ///
    /// # Errors
    /// Propagates any error from `flush`; in that case no record of the batch
    /// is counted as exported.
    fn export_batch(&mut self, sqllogs: &[&Sqllog<'_>]) -> Result<()> {
        debug!("Exporting {} records to Oracle in batch", sqllogs.len());
        self.pending_records
            .extend(sqllogs.iter().map(|s| Self::sqllog_to_record(s)));
        self.flush()?;
        for _ in sqllogs {
            self.stats.record_success();
        }
        Ok(())
    }

    /// Writes any records still pending and logs the final counters.
    ///
    /// # Errors
    /// Propagates any error from `flush`.
    fn finalize(&mut self) -> Result<()> {
        self.flush()?;
        info!(
            "Oracle export finished (success: {}, failed: {})",
            self.stats.exported, self.stats.failed
        );
        Ok(())
    }

    /// Returns the fixed exporter name, `"Oracle"`.
    fn name(&self) -> &str {
        "Oracle"
    }

    /// Returns a clone of the current statistics; always `Some`.
    fn stats_snapshot(&self) -> Option<ExportStats> {
        Some(self.stats.clone())
    }
}
impl Drop for OracleExporter {
    /// Makes a last attempt to write records that are still queued.
    ///
    /// Nothing happens when the queue is empty. A failing `finalize` is only
    /// logged as a warning, since `drop` has no way to return an error.
    fn drop(&mut self) {
        if self.pending_records.is_empty() {
            return;
        }
        match self.finalize() {
            Ok(()) => {}
            Err(e) => warn!("Oracle exporter finalization on Drop failed: {}", e),
        }
    }
}