use std::fmt;
use odbc_api::{Connection as OdbcConnection, Environment, Cursor, ResultSetMetadata, ConnectionOptions as OdbcConnectionOptions, handles::StatementImpl};
use log::{debug, warn};
use crate::error::{Error, Result as SdkResult};
use crate::types::{DamengValue};
use crate::query::QueryBuilder;
/// Converts raw column bytes into an owned `String`.
///
/// Bytes that already form valid UTF-8 are copied as-is. Anything else is
/// decoded with `encoding_rs::GBK`; the decoder's "had errors" flag is
/// discarded, so this function currently always returns `Ok`.
fn gbk_to_utf8(bytes: &[u8]) -> SdkResult<String> {
    use encoding_rs::GBK;

    let text = match std::str::from_utf8(bytes) {
        Ok(valid) => valid.to_owned(),
        // Not UTF-8: fall back to GBK and keep only the decoded text.
        Err(_) => GBK.decode(bytes).0.into_owned(),
    };
    Ok(text)
}
/// Settings used to open a database connection.
///
/// `Debug` is implemented by hand so that the password is never written to
/// logs or panic messages; every other field is printed as usual.
#[derive(Clone)]
pub struct ConnectionOptions {
    /// Host name or address, emitted as `SERVER=` in the connection string.
    pub server: String,
    /// TCP port, emitted as `PORT=`.
    pub port: u16,
    /// Login name, emitted as `UID=`.
    pub username: String,
    /// Login password, emitted as `PWD=`. Redacted in `Debug` output.
    pub password: String,
    /// Schema name, emitted as `DATABASE=`.
    pub schema: String,
    /// Timeout in seconds (rendered with an `s` suffix by `Display`).
    // NOTE(review): no code visible in this file applies this value when connecting.
    pub timeout: u32,
    /// When `true`, `ENCRYPT=yes` is appended to the connection string.
    pub use_tls: bool,
    /// Extra `key=value` attributes appended to the connection string in order.
    pub additional_params: Vec<(String, String)>,
    /// ODBC driver name, emitted as `DRIVER=` (the default is already brace-quoted).
    pub driver: String,
}

impl fmt::Debug for ConnectionOptions {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("ConnectionOptions")
            .field("server", &self.server)
            .field("port", &self.port)
            .field("username", &self.username)
            // Never expose the real password through `{:?}`.
            .field("password", &"*****")
            .field("schema", &self.schema)
            .field("timeout", &self.timeout)
            .field("use_tls", &self.use_tls)
            .field("additional_params", &self.additional_params)
            .field("driver", &self.driver)
            .finish()
    }
}
impl Default for ConnectionOptions {
    /// Builds the stock settings: `SYSDBA` on `localhost:5236`, schema `DMHR`,
    /// the `{DM8 ODBC DRIVER}` driver, an empty password, a 30 second timeout,
    /// TLS off and no additional parameters.
    fn default() -> Self {
        let server = String::from("localhost");
        let username = String::from("SYSDBA");
        let driver = String::from("{DM8 ODBC DRIVER}");
        let schema = String::from("DMHR");

        Self {
            server,
            port: 5236,
            username,
            password: String::new(),
            schema,
            timeout: 30,
            use_tls: false,
            additional_params: Vec::new(),
            driver,
        }
    }
}
impl fmt::Display for ConnectionOptions {
    /// Writes a one-line, log-friendly summary. The password, driver and
    /// additional parameters are intentionally left out of the output.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self {
            server,
            port,
            username,
            schema,
            timeout,
            use_tls,
            ..
        } = self;

        f.write_fmt(format_args!(
            "ConnectionOptions {{ server: {}, port: {}, username: {}, schema: {}, timeout: {}s, use_tls: {} }}",
            server, port, username, schema, timeout, use_tls
        ))
    }
}
/// Marker value handed out by `Connection::begin_transaction`.
///
/// It carries no state and holds no reference to the connection, so the
/// transaction itself is finished through `Connection::commit` or
/// `Connection::rollback`.
#[derive(Debug)]
pub struct Transaction {
    /// Placeholder field; keeps the type from being constructed outside this module.
    _inner: (),
}
pub struct Connection {
_environment: Environment,
_inner: Option<OdbcConnection<'static>>,
options: ConnectionOptions,
}
use odbc_api::IntoParameter;
impl Connection {
pub fn connect() -> SdkResult<Self> {
Self::with_options(ConnectionOptions::default())
}
pub fn with_options(options: ConnectionOptions) -> SdkResult<Self> {
debug!("Creating connection with options: {}", options);
let connection_string = build_connection_string(&options);
debug!("Connection string: {}", mask_password(&connection_string));
let environment = Environment::new()
.map_err(|e| Error::connection(format!("Failed to create ODBC environment: {}", e)))?;
let env_ref: &'static Environment = Box::leak(Box::new(environment));
let connection = env_ref.connect_with_connection_string(&connection_string, OdbcConnectionOptions::default())
.map_err(|e| Error::connection(format!("Failed to connect to database: {}", e)))?;
debug!("Successfully connected to database");
Ok(Self {
_environment: Environment::new().unwrap(), _inner: Some(connection),
options,
})
}
pub fn options(&self) -> &ConnectionOptions {
&self.options
}
pub fn query_with_param<P>(&mut self, sql: &str, params: P) -> SdkResult<QueryResult>
where
P: odbc_api::ParameterCollectionRef,
{
debug!("Executing parametrized query: {}", sql);
let cursor = self._inner.as_mut()
.ok_or_else(|| Error::connection("Connection not established".to_string()))?
.execute(sql, params)
.map_err(|e| Error::query(format!("Failed to execute query with params: {}", e)))?
.ok_or_else(|| Error::query("Query did not return a result set".to_string()))?;
QueryResult::from_cursor(cursor, sql)
}
pub fn execute_with_param<P>(&mut self, sql: &str, params: P) -> SdkResult<bool>
where
P: odbc_api::ParameterCollectionRef,
{
debug!("Executing parametrized statement: {}", sql);
let conn = self._inner.as_mut()
.ok_or_else(|| Error::connection("Connection not established".to_string()))?;
let result = conn.execute(sql, params);
match result {
Ok(_) => Ok(true),
Err(e) => Err(Error::query(format!("Failed to execute statement with params: {}", e))),
}
}
pub fn query(&mut self, sql: &str) -> SdkResult<QueryResult> {
debug!("Executing query: {}", sql);
let cursor = self._inner.as_mut()
.ok_or_else(|| Error::connection("Connection not established".to_string()))?
.execute(sql, ())
.map_err(|e| Error::query(format!("Failed to execute query: {}", e)))?
.ok_or_else(|| Error::query("Query did not return a result set".to_string()))?;
QueryResult::from_cursor(cursor, sql)
}
pub fn execute(&mut self, sql: &str) -> SdkResult<bool> {
debug!("Executing statement: {}", sql);
let conn = self._inner.as_mut()
.ok_or_else(|| Error::connection("Connection not established".to_string()))?;
let cursor = self._inner.as_mut()
.ok_or_else(|| Error::connection("Connection not established".to_string()))?
.execute(sql, ());
Ok(cursor.is_ok())
}
pub fn begin_transaction(&mut self) -> SdkResult<Transaction> {
debug!("Beginning transaction");
let conn = self._inner.as_mut()
.ok_or_else(|| Error::connection("Connection not established".to_string()))?;
conn.set_autocommit(false)
.map_err(|e| Error::transaction(format!("Failed to begin transaction: {}", e)))?;
Ok(Transaction { _inner: () })
}
pub fn commit(&mut self) -> SdkResult<()> {
debug!("Committing transaction");
let conn = self._inner.as_mut()
.ok_or_else(|| Error::connection("Connection not established".to_string()))?;
conn.commit()
.map_err(|e| Error::transaction(format!("Failed to commit transaction: {}", e)))?;
conn.set_autocommit(true)
.map_err(|e| Error::transaction(format!("Failed to set autocommit after commit: {}", e)))?;
Ok(())
}
pub fn rollback(&mut self) -> SdkResult<()> {
debug!("Rolling back transaction");
let conn = self._inner.as_mut()
.ok_or_else(|| Error::connection("Connection not established".to_string()))?;
conn.rollback()
.map_err(|e| Error::transaction(format!("Failed to rollback transaction: {}", e)))?;
conn.set_autocommit(true)
.map_err(|e| Error::transaction(format!("Failed to set autocommit after rollback: {}", e)))?;
Ok(())
}
pub fn query_builder(&self) -> QueryBuilder {
QueryBuilder::new()
}
pub fn database_info(&self) -> SdkResult<DatabaseInfo> {
Ok(DatabaseInfo {
dbms_name: "Dameng DB".to_string(),
db_name: self.options.schema.clone(),
driver_name: "DM8 ODBC DRIVER".to_string(),
driver_version: "8.1.4".to_string(),
})
}
}
/// Descriptive information about a database and the driver used to reach it,
/// as returned by `Connection::database_info`.
#[derive(Debug, Clone)]
pub struct DatabaseInfo {
/// Name of the database management system.
pub dbms_name: String,
/// Database name; `Connection::database_info` fills it with the configured schema.
pub db_name: String,
/// Name of the ODBC driver.
pub driver_name: String,
/// Version string of the ODBC driver.
pub driver_version: String,
}
impl fmt::Display for DatabaseInfo {
    /// Writes a one-line summary containing the DBMS name, the database name
    /// and the driver name followed by its version.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self {
            dbms_name,
            db_name,
            driver_name,
            driver_version,
        } = self;

        f.write_fmt(format_args!(
            "DatabaseInfo {{ dbms: {}, name: {}, driver: {} {} }}",
            dbms_name, db_name, driver_name, driver_version
        ))
    }
}
/// A query result held entirely in memory: the SQL text, the column names
/// and every fetched row.
#[derive(Debug, Clone)]
pub struct QueryResult {
/// SQL text the result was produced from (empty for `QueryResult::new()`).
sql: String,
/// Column names in result-set order.
columns: Vec<String>,
/// Fetched rows; each row holds one `DamengValue` per column.
rows: Vec<Vec<DamengValue>>,
}
impl QueryResult {
fn from_cursor<C>(mut cursor: odbc_api::CursorImpl<C>, sql: &str) -> SdkResult<Self>
where
C: odbc_api::handles::AsStatementRef,
{
use odbc_api::buffers::TextRowSet;
let num_cols = cursor.num_result_cols()
.map_err(|e| Error::query(format!("Failed to get column count: {}", e)))? as usize;
let mut columns = Vec::with_capacity(num_cols);
for col_index in 1..=num_cols as u16 {
let name = cursor.col_name(col_index)
.map_err(|e| Error::query(format!("Failed to get column name: {}", e)))?;
columns.push(name);
}
if num_cols == 0 {
return Ok(Self {
sql: sql.to_string(),
columns,
rows: Vec::new(),
});
}
let batch_size = 250;
let max_str_len = 4000;
let mut buffer = TextRowSet::for_cursor(batch_size, &mut cursor, Some(max_str_len))
.map_err(|e| Error::query(format!("Failed to create text buffer: {}", e)))?;
let mut block_cursor = cursor.bind_buffer(&mut buffer)
.map_err(|e| Error::query(format!("Failed to bind buffer: {}", e)))?;
let mut rows = Vec::new();
while let Some(batch) = block_cursor.fetch()
.map_err(|e| Error::query(format!("Failed to fetch batch: {}", e)))?
{
let num_rows = batch.num_rows();
for row_idx in 0..num_rows {
let mut row = Vec::with_capacity(num_cols);
for col_idx in 0..num_cols {
let text = batch.at(col_idx, row_idx);
if let Some(text_bytes) = text {
let string_value = gbk_to_utf8(text_bytes)?;
row.push(DamengValue::String(string_value));
} else {
row.push(DamengValue::Null);
}
}
rows.push(row);
}
}
Ok(Self {
sql: sql.to_string(),
columns,
rows,
})
}
pub fn new() -> Self {
Self {
sql: String::new(),
columns: Vec::new(),
rows: Vec::new(),
}
}
pub fn num_columns(&self) -> SdkResult<usize> {
Ok(self.columns.len())
}
pub fn column_names(&self) -> SdkResult<Vec<String>> {
Ok(self.columns.clone())
}
pub fn num_rows(&self) -> usize {
self.rows.len()
}
pub fn fetch_all(&mut self) -> SdkResult<Vec<Vec<DamengValue>>> {
Ok(self.rows.clone())
}
pub fn get_row(&self, index: usize) -> Option<&Vec<DamengValue>> {
self.rows.get(index)
}
pub fn get_value(&self, row: usize, col: usize) -> Option<&DamengValue> {
self.rows.get(row)?.get(col)
}
}
/// Builds the ODBC connection string for `options`.
///
/// Attributes are emitted in the order `DRIVER`, `SERVER`, `PORT`, `UID`,
/// `PWD`, `DATABASE`, then `ENCRYPT=yes` when `use_tls` is set, then every
/// entry of `additional_params`, all joined with `;`.
///
/// Values containing `;` are brace-quoted (see `quote_connection_value`) so
/// that, for example, a password with a semicolon cannot end the `PWD`
/// attribute early or smuggle in further attributes. Values without `;` are
/// emitted unchanged.
fn build_connection_string(options: &ConnectionOptions) -> String {
    let mut parts = vec![
        format!("DRIVER={}", quote_connection_value(&options.driver)),
        format!("SERVER={}", quote_connection_value(&options.server)),
        format!("PORT={}", options.port),
        format!("UID={}", quote_connection_value(&options.username)),
        format!("PWD={}", quote_connection_value(&options.password)),
        format!("DATABASE={}", quote_connection_value(&options.schema)),
    ];
    if options.use_tls {
        parts.push("ENCRYPT=yes".to_string());
    }
    parts.extend(
        options
            .additional_params
            .iter()
            .map(|(key, value)| format!("{}={}", key, quote_connection_value(value))),
    );
    parts.join(";")
}

/// Quotes a connection-string attribute value when it needs quoting.
///
/// A value that contains `;` is wrapped in `{...}` with every `}` doubled.
/// Values that are already wrapped in braces (such as the default driver
/// name) and values without `;` are returned untouched.
fn quote_connection_value(value: &str) -> std::borrow::Cow<'_, str> {
    let already_braced = value.starts_with('{') && value.ends_with('}');
    if already_braced || !value.contains(';') {
        std::borrow::Cow::Borrowed(value)
    } else {
        std::borrow::Cow::Owned(format!("{{{}}}", value.replace('}', "}}")))
    }
}
/// Returns a copy of `connection_string` that is safe to log: the value of
/// every `PWD` / `PASSWORD` attribute is replaced by `*****`.
///
/// The keyword is matched case-insensitively and is written back exactly as
/// it appeared. Brace-quoted values (`PWD={a;b}`) are treated as one
/// attribute, so a `;` inside the braces does not leak the rest of the
/// password. All other attributes are copied unchanged.
fn mask_password(connection_string: &str) -> String {
    split_connection_attributes(connection_string)
        .into_iter()
        .map(|attribute| match attribute.split_once('=') {
            Some((key, _)) if is_password_keyword(key) => format!("{}=*****", key),
            _ => attribute.to_string(),
        })
        .collect::<Vec<_>>()
        .join(";")
}

/// Tells whether `key` names a password attribute, ignoring ASCII case and
/// surrounding whitespace.
fn is_password_keyword(key: &str) -> bool {
    let key = key.trim();
    key.eq_ignore_ascii_case("PWD") || key.eq_ignore_ascii_case("PASSWORD")
}

/// Splits a connection string at every `;` that is not inside a brace-quoted
/// value. A value counts as brace-quoted when `{` directly follows the
/// attribute's first `=`; inside it `}}` stands for a literal `}`. An
/// unterminated brace swallows the remainder of the string.
fn split_connection_attributes(connection_string: &str) -> Vec<&str> {
    let mut attributes = Vec::new();
    let mut start = 0;
    let mut seen_equals = false;
    let mut at_value_start = false;
    let mut in_braces = false;
    let mut chars = connection_string.char_indices().peekable();

    while let Some((index, ch)) = chars.next() {
        if in_braces {
            if ch == '}' {
                if matches!(chars.peek(), Some((_, '}'))) {
                    // Escaped closing brace: consume its twin and stay inside.
                    chars.next();
                } else {
                    in_braces = false;
                }
            }
            continue;
        }
        match ch {
            ';' => {
                attributes.push(&connection_string[start..index]);
                // `;` is one byte, so the next attribute starts right after it.
                start = index + 1;
                seen_equals = false;
                at_value_start = false;
            }
            '=' if !seen_equals => {
                seen_equals = true;
                at_value_start = true;
            }
            '{' if at_value_start => {
                in_braces = true;
                at_value_start = false;
            }
            _ => at_value_start = false,
        }
    }
    attributes.push(&connection_string[start..]);
    attributes
}
impl Transaction {
/// Does nothing and returns `Ok(())`.
///
/// This method does not talk to the database; nothing is committed by
/// calling it. Use `Connection::commit()` to commit the transaction.
#[deprecated(note = "Use Connection::commit() instead")]
pub fn commit(&mut self) -> SdkResult<()> {
Ok(())
}
/// Does nothing and returns `Ok(())`.
///
/// This method does not talk to the database; nothing is rolled back by
/// calling it. Use `Connection::rollback()` to roll the transaction back.
#[deprecated(note = "Use Connection::rollback() instead")]
pub fn rollback(&mut self) -> SdkResult<()> {
Ok(())
}
}