#![allow(dead_code)]
use std::cell::RefCell;
use std::collections::HashMap;
use super::cellvalue::CellValue;
use super::conn_params::ConnParams;
use super::error::Error;
use super::params::Params;
use super::statement_async::StatementAsync;
use super::transaction_async::*;
use super::wireprotocol_async::*;
use super::xsqlvar::XSQLVar;
use super::*;
/// Asynchronous connection to a database server.
///
/// Instances are produced by [`ConnectionAsync::connect`],
/// [`ConnectionAsync::connect_url`] and [`ConnectionAsync::create_database_url`].
pub struct ConnectionAsync {
    /// Wire-protocol state. Wrapped in a `RefCell` so that methods taking
    /// `&self` can still borrow it mutably.
    wp: RefCell<WireProtocolAsync>,
    /// Handle of the transaction started while the connection was being set up;
    /// the public `execute*`, `commit`, `rollback` and `prepare` methods use it.
    trans_handle: i32,
    /// Host passed in when the connection was opened.
    pub host: String,
    /// Port passed in when the connection was opened.
    pub port: u16,
    /// User name passed in when the connection was opened.
    pub username: String,
    /// Password passed in when the connection was opened.
    pub password: String,
    /// Database name passed in when the connection was opened.
    pub db_name: String,
    /// Option map the connection was opened with.
    conn_options: HashMap<String, String>,
}
impl ConnectionAsync {
/// Opens a connection to `db_name` on `host:port` and starts the connection's
/// default transaction.
///
/// The sequence is `op_connect` → `parse_connect_response` → `op_attach` →
/// `op_transaction(true)`. The handle returned in the response to `op_attach`
/// is stored as the database handle, and the handle returned in the response to
/// `op_transaction` becomes the connection's default transaction handle.
///
/// `conn_options` is consulted here for the `"timezone"` and `"role"` entries and
/// is also handed to the wire-protocol layer. A missing `"timezone"` or `"role"`
/// entry is treated as an empty string rather than causing a panic.
///
/// # Errors
///
/// Returns the first error produced by any of the wire-protocol steps.
pub async fn connect(
    host: &str,
    port: u16,
    db_name: &str,
    username: &str,
    password: &str,
    conn_options: &HashMap<String, String>,
) -> Result<ConnectionAsync, Error> {
    // Indexing a `HashMap` panics when the key is absent. Callers of this public
    // function may pass a sparse map, so look the entries up explicitly.
    let timezone = conn_options.get("timezone").cloned().unwrap_or_default();
    let role = conn_options.get("role").cloned().unwrap_or_default();

    let mut wp = WireProtocolAsync::new(host, port, &timezone).await?;
    let (client_public, client_secret) = srp::get_client_seed();
    wp.op_connect(db_name, username, password, conn_options, &client_public)
        .await?;
    wp.parse_connect_response(
        username,
        password,
        conn_options,
        &client_public,
        &client_secret,
    )
    .await?;

    wp.op_attach(db_name, username, password, &role).await?;
    let (db_handle, _, _) = wp.op_response().await?;
    wp.db_handle = db_handle;

    wp.op_transaction(true).await?;
    let (trans_handle, _, _) = wp.op_response().await?;

    Ok(ConnectionAsync {
        wp: RefCell::new(wp),
        trans_handle,
        host: host.to_string(),
        port,
        username: username.to_string(),
        password: password.to_string(),
        db_name: db_name.to_string(),
        conn_options: conn_options.clone(),
    })
}
pub async fn connect_url(conn_string: &str) -> Result<ConnectionAsync, Error> {
let (conn_params, conn_options) = ConnParams::from_url(conn_string)?;
ConnectionAsync::connect(
&conn_params.host,
conn_params.port,
&conn_params.db_name,
&conn_params.username,
&conn_params.password,
&conn_options,
)
.await
}
/// Creates the database described by a connection URL and returns a connection to it.
///
/// The sequence is `op_connect` → `parse_connect_response` → `op_create` →
/// `op_transaction(true)`. The handle from the `op_create` response is stored as
/// the database handle; the handle from the `op_transaction` response becomes the
/// connection's default transaction handle.
///
/// # Errors
///
/// Returns the error from URL parsing or the first failing wire-protocol step.
///
/// # Panics
///
/// Panics if the parsed option map has no `"timezone"`, `"page_size"` or `"role"`
/// entry, or if the `"page_size"` value does not parse as a `u32`.
pub async fn create_database_url(conn_string: &str) -> Result<ConnectionAsync, Error> {
    let (params, options) = ConnParams::from_url(conn_string)?;

    let mut protocol =
        WireProtocolAsync::new(&params.host, params.port, &options["timezone"]).await?;

    let (seed_public, seed_secret) = srp::get_client_seed();
    protocol
        .op_connect(
            &params.db_name,
            &params.username,
            &params.password,
            &options,
            &seed_public,
        )
        .await?;
    protocol
        .parse_connect_response(
            &params.username,
            &params.password,
            &options,
            &seed_public,
            &seed_secret,
        )
        .await?;

    // The page size is only read once the handshake has succeeded.
    let page_size = options["page_size"].parse::<u32>().unwrap();
    protocol
        .op_create(
            &params.db_name,
            &params.username,
            &params.password,
            &options["role"],
            page_size,
        )
        .await?;
    let (db_handle, _, _) = protocol.op_response().await?;
    protocol.db_handle = db_handle;

    protocol.op_transaction(true).await?;
    let (trans_handle, _, _) = protocol.op_response().await?;

    Ok(ConnectionAsync {
        wp: RefCell::new(protocol),
        trans_handle,
        host: params.host,
        port: params.port,
        username: params.username,
        password: params.password,
        db_name: params.db_name,
        conn_options: options,
    })
}
/// Runs `query` through `op_exec_immediate` on `trans_handle`, then issues
/// `op_commit_retaining` on the same transaction.
///
/// Each request is followed by reading one response, whose contents are discarded.
///
/// # Errors
///
/// Returns the first error reported by a request or its response.
pub(crate) async fn _execute_batch(
    &mut self,
    query: &str,
    trans_handle: i32,
) -> Result<(), Error> {
    let mut protocol = self.wp.borrow_mut();

    protocol.op_exec_immediate(trans_handle, query).await?;
    let _ = protocol.op_response().await?;

    protocol.op_commit_retaining(trans_handle).await?;
    let _ = protocol.op_response().await?;

    Ok(())
}
/// Executes `query` immediately on the connection's default transaction.
///
/// # Errors
///
/// Propagates any error from `_execute_batch`.
pub async fn execute_batch(&mut self, query: &str) -> Result<(), Error> {
    let default_trans = self.trans_handle;
    self._execute_batch(query, default_trans).await
}
/// Prepares `query` on `trans_handle` and executes it once with `params`.
///
/// Preparation allocates a statement and sends `op_prepare_statement`. When the
/// accepted protocol type is `PTYPE_LAZY_SEND`, the response to the allocation is
/// not read right away: `-1` is sent as a placeholder handle, and the real handle
/// is taken from the response read after the prepare request has been sent.
///
/// # Errors
///
/// Returns the first error from the wire-protocol steps or from statement execution.
pub(crate) async fn _execute<P: Params>(
    &mut self,
    query: &str,
    params: P,
    trans_handle: i32,
) -> Result<(), Error> {
    let mut stmt = {
        // Scope the `RefCell` borrow so it is released before `stmt.execute`.
        let mut wp = self.wp.borrow_mut();
        wp.op_allocate_statement().await?;
        let mut stmt_handle = if (wp.accept_type & PTYPE_MASK) == PTYPE_LAZY_SEND {
            wp.lazy_response_count += 1;
            -1
        } else {
            let (stmt_handle, _, _) = wp.op_response().await?;
            stmt_handle
        };
        wp.op_prepare_statement(stmt_handle, trans_handle, query)
            .await?;
        if (wp.accept_type & PTYPE_MASK) == PTYPE_LAZY_SEND && wp.lazy_response_count > 0 {
            wp.lazy_response_count -= 1;
            let (h, _, _) = wp.op_response().await?;
            stmt_handle = h;
        }
        // The statement description is the third element of the response tuple,
        // matching what `_prepare` reads; the second element is not the buffer.
        let (_, _, buf) = wp.op_response().await?;
        let (stmt_type, xsqlda) = wp.parse_xsqlda(&buf, stmt_handle).await?;
        StatementAsync::new(self, trans_handle, stmt_handle, stmt_type, xsqlda, true)
    };
    stmt.execute(params).await?;
    Ok(())
}
/// Prepares and executes `query` with `params` on the connection's default transaction.
///
/// # Errors
///
/// Propagates any error from `_execute`.
pub async fn execute<P: Params>(&mut self, query: &str, params: P) -> Result<(), Error> {
    let default_trans = self.trans_handle;
    self._execute(query, params, default_trans).await
}
/// Sends `op_commit_retaining` for `trans_handle` and reads the response.
///
/// # Errors
///
/// Returns the error from the request or from reading its response.
pub(crate) async fn _commit(&self, trans_handle: i32) -> Result<(), Error> {
    let mut protocol = self.wp.borrow_mut();
    protocol.op_commit_retaining(trans_handle).await?;
    let _ = protocol.op_response().await?;
    Ok(())
}
/// Commits the connection's default transaction via `_commit`.
///
/// # Errors
///
/// Propagates any error from `_commit`.
pub async fn commit(&self) -> Result<(), Error> {
    let default_trans = self.trans_handle;
    self._commit(default_trans).await
}
/// Sends `op_transaction(false)` and returns the handle carried by the response.
///
/// # Errors
///
/// Returns the error from the request or from reading its response.
pub(crate) async fn _begin_trans(&mut self) -> Result<i32, Error> {
    let mut protocol = self.wp.borrow_mut();
    protocol.op_transaction(false).await?;
    let response = protocol.op_response().await?;
    // The first tuple element is the handle of the new transaction.
    Ok(response.0)
}
/// Sends `op_rollback_retaining` for `trans_handle` and reads the response.
///
/// # Errors
///
/// Returns the error from the request or from reading its response.
pub(crate) async fn _rollback(&mut self, trans_handle: i32) -> Result<(), Error> {
    let mut protocol = self.wp.borrow_mut();
    protocol.op_rollback_retaining(trans_handle).await?;
    let _ = protocol.op_response().await?;
    Ok(())
}
/// Rolls back the connection's default transaction via `_rollback`.
///
/// # Errors
///
/// Propagates any error from `_rollback`.
pub async fn rollback(&mut self) -> Result<(), Error> {
    let default_trans = self.trans_handle;
    self._rollback(default_trans).await
}
/// Prepares `query` on `trans_handle` and returns the resulting statement.
///
/// A statement is allocated and `op_prepare_statement` is sent. When the accepted
/// protocol type is `PTYPE_LAZY_SEND`, the allocation response is not read up
/// front: `-1` is sent as a placeholder handle and the real handle is taken from
/// the response read after the prepare request. The following response carries the
/// buffer that `parse_xsqlda` turns into the statement type and column descriptors.
///
/// # Errors
///
/// Returns the first error reported by any wire-protocol step.
pub async fn _prepare(
    &mut self,
    query: &str,
    trans_handle: i32,
) -> Result<StatementAsync<'_>, Error> {
    let mut protocol = self.wp.borrow_mut();
    protocol.op_allocate_statement().await?;

    // Placeholder handle, replaced as soon as the allocation response is read.
    let mut handle = -1;
    if (protocol.accept_type & PTYPE_MASK) == PTYPE_LAZY_SEND {
        protocol.lazy_response_count += 1;
    } else {
        handle = protocol.op_response().await?.0;
    }

    protocol
        .op_prepare_statement(handle, trans_handle, query)
        .await?;

    let response_pending = (protocol.accept_type & PTYPE_MASK) == PTYPE_LAZY_SEND
        && protocol.lazy_response_count > 0;
    if response_pending {
        protocol.lazy_response_count -= 1;
        handle = protocol.op_response().await?.0;
    }

    let (_, _, description) = protocol.op_response().await?;
    let (stmt_type, xsqlda) = protocol.parse_xsqlda(&description, handle).await?;

    let statement = StatementAsync::new(self, trans_handle, handle, stmt_type, xsqlda, true);
    Ok(statement)
}
/// Prepares `query` on the connection's default transaction.
///
/// # Errors
///
/// Propagates any error from `_prepare`.
pub async fn prepare(&mut self, query: &str) -> Result<StatementAsync<'_>, Error> {
    let default_trans = self.trans_handle;
    self._prepare(query, default_trans).await
}
/// Creates a `TransactionAsync` bound to this connection.
///
/// # Errors
///
/// Propagates any error from `TransactionAsync::new`.
pub async fn transaction(&mut self) -> Result<TransactionAsync<'_>, Error> {
    let started = TransactionAsync::new(self).await?;
    Ok(started)
}
/// Executes an already prepared statement and returns the value reported by
/// `rowcount` for it.
///
/// Sends `op_execute` with `params`, reads (and discards) the response, then
/// queries `rowcount` for `stmt_handle` and `stmt_type`.
///
/// # Errors
///
/// Returns the first error reported by any of the three steps.
pub(crate) async fn _execute_statement(
    &self,
    trans_handle: i32,
    stmt_handle: i32,
    stmt_type: u32,
    params: &[(Vec<u8>, Vec<u8>, bool)],
) -> Result<usize, Error> {
    let mut protocol = self.wp.borrow_mut();
    protocol
        .op_execute(stmt_handle, trans_handle, params)
        .await?;
    let _ = protocol.op_response().await?;
    let affected = protocol.rowcount(stmt_handle, stmt_type).await?;
    Ok(affected)
}
/// Requests rows for `stmt_handle` with `op_fetch` and decodes the reply with
/// `op_fetch_response`, using `xsqlda` as the column descriptors.
///
/// Returns the decoded rows together with the flag produced by
/// `op_fetch_response`.
// NOTE(review): the flag presumably tells whether more rows remain — confirm
// against `op_fetch_response`.
///
/// # Errors
///
/// Returns the error from the fetch request or from decoding its response.
pub(crate) async fn _fetch(
    &self,
    stmt_handle: i32,
    blr: &Vec<u8>,
    xsqlda: &[XSQLVar],
) -> Result<(Vec<Vec<CellValue>>, bool), Error> {
    let mut protocol = self.wp.borrow_mut();
    protocol.op_fetch(stmt_handle, blr).await?;
    let fetched = protocol.op_fetch_response(xsqlda).await?;
    Ok(fetched)
}
/// Reads the bytes of the blob identified by `blob_id` within `trans_handle`.
///
/// This is a thin wrapper around the wire-protocol `get_blob_segments` call.
///
/// # Errors
///
/// Propagates any error from `get_blob_segments`.
pub(crate) async fn _get_blob_segments(
    &self,
    blob_id: &Vec<u8>,
    trans_handle: i32,
) -> Result<Vec<u8>, Error> {
    let mut protocol = self.wp.borrow_mut();
    let contents = protocol.get_blob_segments(blob_id, trans_handle).await?;
    Ok(contents)
}
/// Sends `op_free_statement` for `stmt_handle` with the given `drop_type`.
///
/// When the accepted protocol type is `PTYPE_LAZY_SEND`, the response is not read
/// here; `lazy_response_count` is incremented instead. Otherwise the response is
/// read immediately.
///
/// # Panics
///
/// Panics if sending the request or reading the response fails.
pub(crate) async fn _free_statement(&self, stmt_handle: i32, drop_type: i32) -> () {
    let mut protocol = self.wp.borrow_mut();
    protocol
        .op_free_statement(stmt_handle, drop_type)
        .await
        .unwrap();

    let lazy_send = (protocol.accept_type & PTYPE_MASK) == PTYPE_LAZY_SEND;
    if !lazy_send {
        protocol.op_response().await.unwrap();
        return;
    }
    protocol.lazy_response_count += 1;
}
/// Sends `op_rollback` for `trans_handle` and reads the response.
///
/// # Panics
///
/// Panics if sending the request or reading the response fails.
pub(crate) async fn drop_transaction(&self, trans_handle: i32) -> () {
    let mut protocol = self.wp.borrow_mut();
    protocol.op_rollback(trans_handle).await.unwrap();
    let _ = protocol.op_response().await.unwrap();
}
}