use std::collections::{BTreeMap, HashMap};
use std::error::Error;
use std::vec;
use thrift::transport::TIoChannel;
use thrift::{
protocol::{
TBinaryInputProtocol, TBinaryOutputProtocol, TCompactInputProtocol, TCompactOutputProtocol,
TInputProtocol, TOutputProtocol,
},
transport::{TFramedReadTransport, TFramedWriteTransport, TTcpChannel},
};
use crate::client::rpc::{
TSCreateMultiTimeseriesReq, TSCreateTimeseriesReq, TSIServiceSyncClient, TSInsertRecordsReq,
TSInsertStringRecordReq, TSInsertTabletReq, TSOpenSessionReq, TSProtocolVersion,
TTSIServiceSyncClient,
};
use crate::protocal::{
TSCompressionType, TSDataType, TSEncoding, FLAG, MULTIPLE_ERROR, NEED_REDIRECTION,
SUCCESS_STATUS,
};
use super::rpc::{
TSDeleteDataReq, TSExecuteStatementReq, TSInsertRecordReq, TSInsertRecordsOfOneDeviceReq,
TSInsertTabletsReq, TSQueryDataSet, TSSetTimeZoneReq,
};
use super::{
rpc::{TSCloseSessionReq, TSStatus},
RowRecord,
};
use super::{DataSet, Dictionary, Result, Session, Value};
static DEFAULT_TIME_ZONE: &str = "Asia/Shanghai";
#[derive(Debug, Clone)]
pub struct Config {
pub host: String,
pub port: i32,
pub username: String,
pub password: String,
pub timeout_ms: Option<i64>,
pub fetch_size: i32,
pub timezone: Option<String>,
pub enable_compression: bool,
pub protocol_version: TSProtocolVersion,
pub is_align: bool,
}
impl Default for Config {
fn default() -> Self {
Self {
host: String::from("127.0.0.1"),
port: 6667,
username: String::from("root"),
password: String::from("root"),
timeout_ms: Some(30000),
fetch_size: 1000,
timezone: Some(String::from(DEFAULT_TIME_ZONE)),
enable_compression: false,
protocol_version: TSProtocolVersion::IOTDB_SERVICE_PROTOCOL_V3,
is_align: true,
}
}
}
pub struct RpcSession {
config: Config,
session_id: Option<i64>,
statement_id: i64,
client: TSIServiceSyncClient<Box<dyn TInputProtocol>, Box<dyn TOutputProtocol>>,
}
impl<'a> RpcSession {
pub fn new(config: &'a Config) -> Result<Self> {
let mut tcp_channel = TTcpChannel::new();
let endpint = format!("{}:{}", config.host, config.port);
tcp_channel.open(&endpint).map_err(|err| {
Box::<dyn Error>::from(format!("failed to connect to {}, {:?}", endpint, err))
})?;
let (i_chan, o_chan) = tcp_channel.split()?;
let (i_prot, o_prot) = (
TFramedReadTransport::new(i_chan),
TFramedWriteTransport::new(o_chan),
);
let (input_protocol, output_protocol): (Box<dyn TInputProtocol>, Box<dyn TOutputProtocol>) =
if config.enable_compression {
(
Box::new(TCompactInputProtocol::new(i_prot)),
Box::new(TCompactOutputProtocol::new(o_prot)),
)
} else {
(
Box::new(TBinaryInputProtocol::new(i_prot, true)),
Box::new(TBinaryOutputProtocol::new(o_prot, true)),
)
};
Ok(Self {
config: config.clone(),
session_id: None,
statement_id: -1,
client: TSIServiceSyncClient::new(input_protocol, output_protocol),
})
}
}
impl<'a> Iterator for RpcDataSet<'a> {
type Item = RowRecord;
fn next(&mut self) -> Option<Self::Item> {
if self.has_cached_results() {
let mut values: Vec<Value> = Vec::with_capacity(self.column_names.len());
let ts = self.query_data_set.time.drain(0..8).collect::<Vec<u8>>();
self.timestamp = i64::from_be_bytes(ts.try_into().unwrap());
for (column_index, column_data) in self.query_data_set.value_list.iter_mut().enumerate()
{
if self.row_index % 8 == 0 {
self.bitmaps[column_index] =
self.query_data_set.bitmap_list[column_index].remove(0);
}
let bitmap = self.bitmaps[column_index];
let shift = self.row_index % 8;
let null = ((FLAG >> shift) & (bitmap & 0xff)) == 0;
if !null {
let original_column_index = self.column_index_map.get(&column_index).unwrap();
let data_type = self.data_types.get(*original_column_index).unwrap();
let mut bytes: Vec<u8> = Vec::new();
match data_type {
TSDataType::Boolean => {
bytes.push(TSDataType::Boolean as u8);
bytes.push(column_data.remove(0));
}
TSDataType::Int32 => {
bytes.push(TSDataType::Int32 as u8);
bytes.extend(column_data.drain(0..4));
}
TSDataType::Int64 => {
bytes.push(TSDataType::Int64 as u8);
bytes.extend(column_data.drain(0..8));
}
TSDataType::Float => {
bytes.push(TSDataType::Float as u8);
bytes.extend(column_data.drain(0..4));
}
TSDataType::Double => {
bytes.push(TSDataType::Double as u8);
bytes.extend(column_data.drain(0..8));
}
TSDataType::Text => {
bytes.push(TSDataType::Text as u8);
let len = i32::from_be_bytes(
column_data
.drain(0..4)
.collect::<Vec<u8>>()
.try_into()
.unwrap(),
);
bytes.extend(column_data.drain(0..len as usize).collect::<Vec<u8>>());
}
}
values.push(Value::from(bytes));
} else {
values.push(Value::Null);
}
}
self.row_index += 1;
let mut output_values: Vec<Value> = Vec::with_capacity(if self.is_ignore_timestamp() {
self.get_column_names().len()
} else {
self.get_column_names().len() + 1
});
if !self.is_ignore_timestamp() {
output_values.push(Value::Int64(self.timestamp));
}
output_values.extend(self.column_names.iter().map(|column_name| {
values[*(self.column_name_index_map.get(column_name).unwrap()) as usize].clone()
}));
Some(RowRecord {
timestamp: self.timestamp,
values: output_values,
})
} else {
None
}
}
}
pub struct RpcDataSet<'a> {
session: &'a mut RpcSession,
statement: String,
query_id: i64,
is_ignore_time_stamp: Option<bool>,
timestamp: i64,
column_names: Vec<String>,
data_types: Vec<TSDataType>,
query_data_set: TSQueryDataSet,
column_index_map: HashMap<usize, usize>,
column_name_index_map: BTreeMap<String, i32>,
bitmaps: Vec<u8>,
row_index: usize,
closed: bool,
}
impl<'a> RpcDataSet<'a> {
#[allow(dead_code)]
fn is_null(&self, column_index: usize, row_index: usize) -> bool {
let bitmap = self.bitmaps[column_index];
let shift = row_index % 8;
((FLAG >> shift) & (bitmap & 0xff)) == 0
}
fn has_cached_results(&mut self) -> bool {
if self.closed {
return false;
}
if self.query_data_set.time.is_empty() {
if let Some(session_id) = self.session.session_id {
match self
.session
.client
.fetch_results(super::rpc::TSFetchResultsReq {
session_id,
statement: self.statement.clone(),
fetch_size: self.session.config.fetch_size,
query_id: self.query_id,
is_align: self.session.config.is_align,
timeout: self.session.config.timeout_ms,
}) {
Ok(resp) => {
let status = resp.status;
match check_status(status) {
Ok(_) => {
if resp.has_result_set {
if let Some(query_data_set) = resp.query_data_set {
self.query_data_set = query_data_set;
self.row_index = 0;
}
} else {
self.close();
return false;
}
}
Err(err) => {
eprint!("An error occurred when fetch result: {}", err);
return false;
}
}
}
Err(err) => {
eprint!("An error occurred when fetch result: {}", err);
return false;
}
}
}
}
!self.query_data_set.time.is_empty()
}
pub fn close(&mut self) {
if !self.closed {
if let Some(session_id) = self.session.session_id {
match self
.session
.client
.close_operation(super::rpc::TSCloseOperationReq {
session_id,
query_id: Some(self.query_id),
statement_id: Some(self.session.statement_id),
}) {
Ok(status) => match check_status(status) {
Ok(_) => {
self.closed = true;
}
Err(err) => {
eprint!("An error occurred when closing dataset {:?}", err)
}
},
Err(err) => {
eprint!("An error occurred when closing dataset {:?}", err)
}
}
}
}
}
}
impl<'a> Drop for RpcDataSet<'a> {
fn drop(&mut self) {
self.close();
}
}
impl<'a> DataSet for RpcDataSet<'a> {
fn get_column_names(&self) -> Vec<String> {
if self.is_ignore_timestamp() {
self.column_names.clone()
} else {
let mut column_names = vec![String::from("Time")];
column_names.extend(self.column_names.clone());
column_names
}
}
fn get_data_types(&self) -> Vec<TSDataType> {
if self.is_ignore_timestamp() {
self.data_types.clone()
} else {
let mut column_types = vec![TSDataType::Int64];
column_types.extend(self.data_types.clone());
column_types
}
}
fn is_ignore_timestamp(&self) -> bool {
if let Some(v) = self.is_ignore_time_stamp {
v
} else {
false
}
}
}
fn check_status(status: TSStatus) -> Result<()> {
match status.code {
SUCCESS_STATUS | NEED_REDIRECTION => Ok(()),
MULTIPLE_ERROR => {
let mut messges = String::new();
if let Some(sub_status) = status.sub_status {
for s in sub_status {
if s.code != SUCCESS_STATUS && s.code != NEED_REDIRECTION {
if let Some(msg) = s.message {
messges.push_str(format!("Code: {}, {}", s.code, msg).as_str());
messges.push(';');
}
}
}
}
if !messges.is_empty() {
Err(messges.into())
} else {
Ok(())
}
}
_ => {
if let Some(message) = status.message {
Err(format!("code: {}, {}", status.code, message).into())
} else {
Err(format!("code: {}", status.code).into())
}
}
}
}
fn fire_closed_error() -> Result<()> {
Err("Operation can't be performed, the session is closed.".into())
}
impl<'a> Session<'a> for RpcSession {
fn open(&mut self) -> Result<()> {
let resp = self.client.open_session(TSOpenSessionReq::new(
self.config.protocol_version,
self.config
.timezone
.clone()
.unwrap_or_else(|| DEFAULT_TIME_ZONE.to_string()),
self.config.username.clone(),
self.config.password.clone(),
None,
))?;
let status = resp.status;
match check_status(status) {
Ok(_) => {
self.session_id = resp.session_id;
Ok(())
}
Err(err) => Err(err),
}
}
fn close(&mut self) -> Result<()> {
if let Some(session_id) = self.session_id {
let status = self
.client
.close_session(TSCloseSessionReq::new(session_id))?;
self.session_id = None;
check_status(status)
} else {
fire_closed_error()
}
}
fn set_storage_group(&mut self, storage_group_id: &str) -> Result<()> {
if let Some(session_id) = self.session_id {
let status = self
.client
.set_storage_group(session_id, storage_group_id.into())?;
check_status(status)
} else {
fire_closed_error()
}
}
fn delete_storage_group(&mut self, storage_group_id: &str) -> Result<()> {
self.delete_storage_groups(vec![storage_group_id])
}
fn delete_storage_groups(&mut self, storage_group_ids: Vec<&str>) -> Result<()> {
if let Some(session_id) = self.session_id {
let status = self.client.delete_storage_groups(
session_id,
storage_group_ids.iter().map(ToString::to_string).collect(),
)?;
check_status(status)
} else {
fire_closed_error()
}
}
fn create_timeseries<T>(
&mut self,
path: &str,
data_type: crate::protocal::TSDataType,
encoding: crate::protocal::TSEncoding,
compressor: crate::protocal::TSCompressionType,
props: T,
attributes: T,
tags: T,
measurement_alias: Option<String>,
) -> Result<()>
where
T: Into<Option<Dictionary>>,
{
if let Some(session_id) = self.session_id {
let status = self.client.create_timeseries(TSCreateTimeseriesReq::new(
session_id,
path.to_string(),
data_type.into(),
encoding.into(),
compressor.into(),
props,
tags,
attributes,
measurement_alias,
))?;
check_status(status)
} else {
fire_closed_error()
}
}
fn create_multi_timeseries<T>(
&mut self,
paths: Vec<&str>,
data_types: Vec<crate::protocal::TSDataType>,
encodings: Vec<crate::protocal::TSEncoding>,
compressors: Vec<crate::protocal::TSCompressionType>,
props_list: T,
attributes_list: T,
tags_list: T,
measurement_alias_list: Option<Vec<String>>,
) -> Result<()>
where
T: Into<Option<Vec<Dictionary>>>,
{
if let Some(session_id) = self.session_id {
let status = self
.client
.create_multi_timeseries(TSCreateMultiTimeseriesReq::new(
session_id,
paths.iter().map(ToString::to_string).collect(),
data_types.into_iter().map(TSDataType::into).collect(),
encodings.into_iter().map(TSEncoding::into).collect(),
compressors
.into_iter()
.map(TSCompressionType::into)
.collect(),
props_list,
attributes_list,
tags_list,
measurement_alias_list,
))?;
check_status(status)
} else {
fire_closed_error()
}
}
fn delete_timeseries(&mut self, paths: Vec<&str>) -> Result<()> {
if let Some(session_id) = self.session_id {
let status = self
.client
.delete_timeseries(session_id, paths.iter().map(ToString::to_string).collect())?;
check_status(status)
} else {
fire_closed_error()
}
}
fn delete_data(&mut self, paths: Vec<&str>, start_time: i64, end_time: i64) -> Result<()> {
if let Some(session_id) = self.session_id {
let status = self.client.delete_data(TSDeleteDataReq::new(
session_id,
paths.into_iter().map(ToString::to_string).collect(),
start_time,
end_time,
))?;
check_status(status)
} else {
fire_closed_error()
}
}
fn insert_string_record<T>(
&mut self,
device_id: &str,
measurements: Vec<&str>,
values: Vec<&str>,
timestamp: i64,
is_aligned: T,
) -> Result<()>
where
T: Into<Option<bool>>,
{
if let Some(session_id) = self.session_id {
let status = self
.client
.insert_string_record(TSInsertStringRecordReq::new(
session_id,
device_id.to_string(),
measurements.iter().map(ToString::to_string).collect(),
values.iter().map(ToString::to_string).collect(),
timestamp,
is_aligned,
))?;
check_status(status)
} else {
fire_closed_error()
}
}
fn get_time_zone(&mut self) -> Result<String> {
if let Some(session_id) = self.session_id {
let resp = self.client.get_time_zone(session_id)?;
let status = resp.status;
match check_status(status) {
Ok(_) => Ok(resp.time_zone),
Err(err) => Err(err),
}
} else {
Err("Operation can't be performed, the session is closed.".into())
}
}
fn set_time_zone(&mut self, time_zone: &str) -> Result<()> {
if let Some(session_id) = self.session_id {
let status = self
.client
.set_time_zone(TSSetTimeZoneReq::new(session_id, time_zone.to_string()))?;
check_status(status)
} else {
fire_closed_error()
}
}
fn execute_statement<T>(
&'a mut self,
statement: &str,
timeout_ms: T,
) -> Result<Box<dyn 'a + DataSet>>
where
T: Into<Option<i64>>,
{
if let Some(session_id) = self.session_id {
let resp = self.client.execute_statement(TSExecuteStatementReq {
session_id,
statement: statement.to_string(),
statement_id: self.statement_id,
fetch_size: Some(self.config.fetch_size),
timeout: timeout_ms.into(),
enable_redirect_query: None,
jdbc_query: None,
})?;
let status = resp.status;
let code = status.code;
if code == SUCCESS_STATUS {
{
if let (Some(column_names), Some(data_type_list)) =
(resp.columns, resp.data_type_list)
{
let column_name_index_map = match resp.column_name_index_map {
Some(map) => map,
None => {
let mut map: BTreeMap<String, i32> = BTreeMap::new();
for (index, name) in column_names.iter().enumerate() {
map.insert(name.to_string(), index as i32);
}
map
}
};
let data_types: Vec<TSDataType> =
data_type_list.iter().map(TSDataType::from).collect();
let mut column_index_map: HashMap<usize, usize> = HashMap::new();
let column_count = column_names.len();
for (index, name) in column_names.iter().enumerate() {
column_index_map
.insert(*column_name_index_map.get(name).unwrap() as usize, index);
}
Ok(Box::new(RpcDataSet {
session: self,
statement: statement.to_string(),
query_id: resp.query_id.unwrap(),
timestamp: -1,
is_ignore_time_stamp: resp.ignore_time_stamp,
query_data_set: resp.query_data_set.unwrap(),
column_names,
data_types,
bitmaps: vec![0_u8; column_count],
row_index: 0,
column_index_map,
column_name_index_map,
closed: false,
}))
} else {
Err("Can't get resources on execute_statement".into())
}
}
} else {
if let Err(e) = check_status(status) {
Err(e)
} else {
Err(format!("Unknow, code: {}", code).into())
}
}
} else {
Err("Operation can't be performed, the session is closed.".into())
}
}
fn execute_query_statement<T>(
&'a mut self,
statement: &str,
timeout_ms: T,
) -> Result<Box<dyn 'a + DataSet>>
where
T: Into<Option<i64>>,
{
if let Some(session_id) = self.session_id {
let resp = self.client.execute_query_statement(TSExecuteStatementReq {
session_id,
statement: statement.to_string(),
statement_id: self.statement_id,
fetch_size: Some(self.config.fetch_size),
timeout: timeout_ms.into(),
enable_redirect_query: None,
jdbc_query: None,
})?;
let status = resp.status;
let code = status.code;
if code == SUCCESS_STATUS {
let column_names: Vec<String> = resp.columns.unwrap();
let column_name_index_map = match resp.column_name_index_map {
Some(v) => v,
None => {
let mut map: BTreeMap<String, i32> = BTreeMap::new();
for (index, name) in column_names.iter().enumerate() {
map.insert(name.to_string(), index as i32);
}
map
}
};
let data_types: Vec<TSDataType> = resp
.data_type_list
.unwrap()
.iter()
.map(TSDataType::from)
.collect();
let mut column_index_map: HashMap<usize, usize> = HashMap::new();
let column_count = column_names.len();
for (index, name) in column_names.iter().enumerate() {
column_index_map
.insert(*column_name_index_map.get(name).unwrap() as usize, index);
}
let dataset = RpcDataSet {
session: self,
statement: statement.to_string(),
query_id: resp.query_id.unwrap(),
timestamp: -1,
is_ignore_time_stamp: resp.ignore_time_stamp,
query_data_set: resp.query_data_set.unwrap(),
column_names,
data_types,
bitmaps: vec![0_u8; column_count],
row_index: 0,
column_index_map,
column_name_index_map,
closed: false,
};
Ok(Box::new(dataset))
} else {
match check_status(status) {
Ok(_) => Err(format!("Unknow, code: {}", code).into()),
Err(err) => Err(err),
}
}
} else {
Err("Operation can't be performed, the session is closed.".into())
}
}
fn insert_record<T>(
&mut self,
device_id: &str,
measurements: Vec<&str>,
values: Vec<Value>,
timestamp: i64,
is_aligned: T,
) -> Result<()>
where
T: Into<Option<bool>>,
{
if let Some(session_id) = self.session_id {
let mut values_bytes: Vec<u8> = Vec::new();
values.iter().for_each(|v| {
let mut value_bytes: Vec<u8> = v.into();
values_bytes.append(&mut value_bytes);
});
let status = self.client.insert_record(TSInsertRecordReq::new(
session_id,
device_id.to_string(),
measurements.iter().map(ToString::to_string).collect(),
values_bytes,
timestamp,
is_aligned,
))?;
check_status(status)
} else {
fire_closed_error()
}
}
fn insert_records_of_one_device(
&mut self,
device_id: &str,
timestamps: Vec<i64>,
measurements: Vec<Vec<&str>>,
values: Vec<Vec<super::Value>>,
sorted: bool,
) -> Result<()> {
let mut sorted_timestamps = timestamps;
let mut sorted_measurements = measurements;
let mut sorted_values = values;
if !sorted {
let permutation = permutation::sort(&sorted_timestamps[..]);
sorted_timestamps = permutation.apply_slice(&sorted_timestamps[..]);
sorted_measurements = permutation.apply_slice(&sorted_measurements[..]);
sorted_values = permutation.apply_slice(&sorted_values[..]);
}
if let Some(session_id) = self.session_id {
let values_list = sorted_values
.iter()
.map(|vec| {
let mut values: Vec<u8> = Vec::new();
for value in vec.iter() {
let mut value_data: Vec<u8> = value.into();
values.append(&mut value_data);
}
values
})
.collect();
let status =
self.client
.insert_records_of_one_device(TSInsertRecordsOfOneDeviceReq::new(
session_id,
device_id.to_string(),
sorted_measurements
.iter()
.map(|vec| vec.iter().map(ToString::to_string).collect())
.collect(),
values_list,
sorted_timestamps,
false,
))?;
check_status(status)
} else {
fire_closed_error()
}
}
fn insert_records(
&mut self,
prefix_path: Vec<&str>,
measurements: Vec<Vec<&str>>,
values: Vec<Vec<super::Value>>,
timestamps: Vec<i64>,
) -> Result<()> {
if let Some(session_id) = self.session_id {
let values_list = values
.iter()
.map(|vec| {
let mut values: Vec<u8> = Vec::new();
for value in vec.iter() {
let mut value_data: Vec<u8> = value.into();
values.append(&mut value_data);
}
values
})
.collect();
let status = self.client.insert_records(TSInsertRecordsReq {
session_id,
prefix_paths: prefix_path.iter().map(ToString::to_string).collect(),
measurements_list: measurements
.iter()
.map(|ms| ms.iter().map(ToString::to_string).collect())
.collect(),
values_list,
timestamps,
is_aligned: None,
})?;
check_status(status)
} else {
fire_closed_error()
}
}
fn insert_tablet(&mut self, tablet: &super::Tablet) -> Result<()> {
if let Some(session_id) = self.session_id {
let mut timestamps_list: Vec<u8> = Vec::with_capacity(tablet.timestamps.len() * 8);
tablet
.timestamps
.iter()
.for_each(|ts| timestamps_list.append(&mut ts.to_be_bytes().to_vec()));
let status = self.client.insert_tablet(TSInsertTabletReq {
session_id,
prefix_path: tablet.get_prefix_path(),
measurements: tablet
.measurement_schemas
.iter()
.map(|f| f.measurement.to_string())
.collect(),
values: tablet.into(),
timestamps: timestamps_list,
types: tablet
.get_measurement_schemas()
.into_iter()
.map(|measurement_schema| measurement_schema.data_type.into())
.collect(),
size: tablet.get_row_count() as i32,
is_aligned: Some(false),
})?;
check_status(status)
} else {
fire_closed_error()
}
}
fn insert_tablets(&mut self, tablets: Vec<&super::Tablet>) -> Result<()> {
if let Some(session_id) = self.session_id {
let status = self.client.insert_tablets(TSInsertTabletsReq {
session_id,
prefix_paths: tablets.iter().map(|t| t.get_prefix_path()).collect(),
measurements_list: tablets
.iter()
.map(|tablet| {
tablet
.measurement_schemas
.iter()
.map(|f| f.measurement.to_string())
.collect()
})
.collect(),
values_list: tablets.iter().map(|tablet| Into::into(*tablet)).collect(),
timestamps_list: tablets
.iter()
.map(|tablet| {
let mut ts_item: Vec<u8> = Vec::new();
tablet
.timestamps
.iter()
.for_each(|ts| ts_item.append(&mut ts.to_be_bytes().to_vec()));
ts_item
})
.collect(),
types_list: tablets
.iter()
.map(|tablet| {
tablet
.get_measurement_schemas()
.into_iter()
.map(|f| {
let t: i32 = f.data_type.into();
t
})
.collect()
})
.collect(),
size_list: tablets
.iter()
.map(|tablet| tablet.get_row_count() as i32)
.collect(),
is_aligned: Some(false),
})?;
check_status(status)
} else {
fire_closed_error()
}
}
fn execute_batch_statement(&mut self, statemens: Vec<&str>) -> Result<()> {
if let Some(session_id) = self.session_id {
let status =
self.client
.execute_batch_statement(super::rpc::TSExecuteBatchStatementReq {
session_id,
statements: statemens.iter().map(ToString::to_string).collect(),
})?;
check_status(status)
} else {
fire_closed_error()
}
}
fn execute_raw_data_query(
&'a mut self,
paths: Vec<&str>,
start_time: i64,
end_time: i64,
) -> Result<Box<dyn 'a + DataSet>> {
if let Some(session_id) = self.session_id {
let resp = self
.client
.execute_raw_data_query(super::rpc::TSRawDataQueryReq {
session_id,
paths: paths.iter().map(ToString::to_string).collect(),
fetch_size: Some(self.config.fetch_size),
start_time,
end_time,
statement_id: self.statement_id,
enable_redirect_query: None,
jdbc_query: None,
})?;
let status = resp.status;
let code = status.code;
if code == SUCCESS_STATUS {
if let (Some(query_data_set), Some(column_names), Some(data_types_list)) =
(resp.query_data_set, resp.columns, resp.data_type_list)
{
let column_name_index_map = match resp.column_name_index_map {
Some(v) => v,
None => {
let mut map: BTreeMap<String, i32> = BTreeMap::new();
for (index, name) in column_names.iter().enumerate() {
map.insert(name.to_string(), index as i32);
}
map
}
};
let data_types: Vec<TSDataType> =
data_types_list.iter().map(TSDataType::from).collect();
let mut column_index_map: HashMap<usize, usize> = HashMap::new();
let column_count = column_names.len();
for (index, name) in column_names.iter().enumerate() {
column_index_map
.insert(*column_name_index_map.get(name).unwrap() as usize, index);
}
Ok(Box::new(RpcDataSet {
session: self,
statement: "".to_string(),
query_id: resp.query_id.unwrap(),
timestamp: -1,
is_ignore_time_stamp: resp.ignore_time_stamp,
query_data_set,
column_names,
data_types,
bitmaps: vec![0_u8; column_count],
row_index: 0,
column_index_map,
column_name_index_map,
closed: false,
}))
} else {
Err("Did't get the result.".into())
}
} else {
match check_status(status) {
Ok(_) => Err(format!("Unknow, code: {}", code).into()),
Err(err) => Err(err),
}
}
} else {
Err("Operation can't be performed, the session is closed.".into())
}
}
fn execute_update_statement(
&'a mut self,
statement: &str,
) -> Result<Option<Box<dyn 'a + DataSet>>> {
if let Some(session_id) = self.session_id {
let resp = self
.client
.execute_update_statement(TSExecuteStatementReq {
session_id,
statement: statement.to_string(),
statement_id: self.statement_id,
fetch_size: Some(self.config.fetch_size),
timeout: self.config.timeout_ms,
enable_redirect_query: None,
jdbc_query: None,
})?;
let status = resp.status;
let code = status.code;
if code == SUCCESS_STATUS {
if let (Some(query_data_set), Some(column_names), Some(data_type_list)) =
(resp.query_data_set, resp.columns, resp.data_type_list)
{
let column_name_index_map = match resp.column_name_index_map {
Some(v) => v,
None => {
let mut map: BTreeMap<String, i32> = BTreeMap::new();
for (index, name) in column_names.iter().enumerate() {
map.insert(name.to_string(), index as i32);
}
map
}
};
let data_types: Vec<TSDataType> =
data_type_list.iter().map(TSDataType::from).collect();
let mut column_index_map: HashMap<usize, usize> = HashMap::new();
let column_count = column_names.len();
for (index, name) in column_names.iter().enumerate() {
column_index_map
.insert(*column_name_index_map.get(name).unwrap() as usize, index);
}
Ok(Some(Box::new(RpcDataSet {
session: self,
statement: statement.to_string(),
query_id: resp.query_id.unwrap(),
timestamp: -1,
is_ignore_time_stamp: resp.ignore_time_stamp,
query_data_set,
column_names,
data_types,
bitmaps: vec![0_u8; column_count],
row_index: 0,
column_index_map,
column_name_index_map,
closed: false,
})))
} else {
Ok(None)
}
} else {
match check_status(status) {
Ok(_) => Err(format!("Unknow, code: {}", code).into()),
Err(err) => Err(err),
}
}
} else {
Err("Operation can't be performed, the session is closed.".into())
}
}
}
impl Drop for RpcSession {
fn drop(&mut self) {
if let Some(session_id) = self.session_id {
self.close().unwrap_or_else(|err| {
eprint!("error closing the session {}, reason {}", session_id, err)
});
}
}
}