use futures_core::future::BoxFuture;
use futures_core::stream::BoxStream;
use futures_util::StreamExt;
use rbdc::db::{Connection, ExecResult, Row};
use rbdc::try_stream;
use rbdc::Error;
use rbs::Value;
use std::sync::Arc;
use taos::sync::*;
use crate::encode::*;
use crate::options::TaosConnectOptions;
use crate::rows::{TaosColumn, TaosData, TaosRow};
/// An rbdc connection backed by a synchronous `Taos` client.
///
/// The client is held behind an [`Arc`], so cloning a `TaosConnection`
/// yields another handle to the same underlying `Taos` value rather than
/// a new client.
#[derive(Clone)]
pub struct TaosConnection {
/// Shared handle to the underlying `Taos` client.
pub conn: Arc<Taos>,
}
impl Connection for TaosConnection{
fn exec_rows(
&mut self,
sql: &str,
params: Vec<Value>,
) -> BoxFuture<'_, Result<BoxStream<'_, Result<Box<dyn Row>, Error>>, Error>> {
let mut sql = sql.to_string();
let conn = self.conn.clone();
Box::pin(async move {
sql = sql_replacen(sql, params);
log::debug!("将要执行的sql:{}", sql);
let mut results: Vec<Box<dyn Row>> = vec![];
if sql.eq("begin") || sql.eq("commit") || sql.eq("rollback") {
log::warn!("不支持事务相关操作,直接返回");
} else {
let mut q = conn.query(&sql).map_err(|e| Error::from(e.to_string()))?;
if q.fields().len() > 0 {
let fields = q.fields();
let mut columns = vec![];
for field in fields {
columns.push(TaosColumn {
name: field.name().to_string(),
column_type: field.ty(),
});
}
for row in q.rows() {
let row_view = row.map_err(|e| Error::from(e.to_string()))?;
let mut datas = vec![];
for (name, value) in row_view {
datas.push(TaosData {
value: Some(format!("{}", value)),
colunm_name: name.to_string(),
});
}
let taos_row = TaosRow {
columns: Arc::new(columns.clone()),
datas,
};
results.push(Box::new(taos_row));
}
}
}
let stream = try_stream! {
for row in results {
r#yield!(row);
}
Ok(())
}
.boxed();
Ok(stream)
})
}
fn exec(&mut self, sql: &str, params: Vec<Value>) -> BoxFuture<'_, Result<ExecResult, Error>> {
let mut sql=sql.to_string();
Box::pin(async move {
sql=sql_replacen(sql,params);
log::debug!("将要执行sql:{}",sql);
if sql.eq("begin") || sql.eq("commit") || sql.eq("rollback"){
log::warn!("不支持事务相关操作,直接返回");
return Ok(ExecResult {
rows_affected: 0 as u64,
last_insert_id: Value::Null,
})
}
let rows= self.conn.exec(sql).map_err(|e| Error::from(e.to_string()))?;
return Ok(ExecResult {
rows_affected: rows as u64,
last_insert_id: Value::Null,
})
})
}
fn close(&mut self) -> BoxFuture<'_, Result<(), Error>> {
Box::pin(async move {
Ok(())
})
}
fn ping(&mut self) -> BoxFuture<'_, Result<(), Error>> {
Box::pin(async move {
Ok(())
})
}
}
impl TaosConnection {
pub async fn establish(opt: &TaosConnectOptions) -> Result<Self, Error> {
let builder= TaosBuilder::from_dsn(opt.dsn.clone())
.map_err(|e| Error::from(e.to_string()));
match builder.map(|b|b.build()){
Ok(taos) => {
match taos.map_err(|e|Error::from(e.to_string())){
Ok(conn) => {
Ok(Self{ conn: Arc::new(conn) })
}
Err(e) => {Err(e)}
}
},
Err(e) => {Err(e)}
}
}
}