iotdb_client/client/
remote.rs

1//
2// Licensed to the Apache Software Foundation (ASF) under one
3// or more contributor license agreements.  See the NOTICE file
4// distributed with this work for additional information
5// regarding copyright ownership.  The ASF licenses this file
6// to you under the Apache License, Version 2.0 (the
7// "License"); you may not use this file except in compliance
8// with the License.  You may obtain a copy of the License at
9//
10//  http://www.apache.org/licenses/LICENSE-2.0
11//
12// Unless required by applicable law or agreed to in writing,
13// software distributed under the License is distributed on an
14// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15// KIND, either express or implied.  See the License for the
16// specific language governing permissions and limitations
17// under the License.
18//
19
20use std::collections::{BTreeMap, HashMap};
21use std::error::Error;
22use std::vec;
23use thrift::transport::TIoChannel;
24use crate::client::common::*;
25
26
27use thrift::{
28    protocol::{
29        TBinaryInputProtocol, TBinaryOutputProtocol, TCompactInputProtocol, TCompactOutputProtocol,
30        TInputProtocol, TOutputProtocol,
31    },
32    transport::{TFramedReadTransport, TFramedWriteTransport, TTcpChannel},
33};
34use typed_builder::TypedBuilder;
35
36use crate::client::rpc::{
37    TSCreateMultiTimeseriesReq, TSCreateTimeseriesReq, IClientRPCServiceSyncClient, TSInsertRecordsReq,
38    TSInsertStringRecordReq, TSInsertTabletReq, TSOpenSessionReq, TSProtocolVersion,
39    TIClientRPCServiceSyncClient,
40};
41use crate::protocal::{
42    TSCompressionType, TSDataType, TSEncoding, FLAG, MULTIPLE_ERROR, NEED_REDIRECTION,
43    SUCCESS_STATUS,
44};
45
46use super::rpc::{
47    TSDeleteDataReq, TSExecuteStatementReq, TSInsertRecordReq, TSInsertRecordsOfOneDeviceReq,
48    TSInsertTabletsReq, TSQueryDataSet, TSSetTimeZoneReq,
49};
50use super::{
51    rpc::{TSCloseSessionReq},
52    RowRecord,
53};
54use super::{DataSet, Dictionary, Result, Session, Value};
55
56const DEFAULT_TIME_ZONE: &str = "Asia/Shanghai";
57
58impl From<TSStatus> for core::result::Result<(), Box<dyn Error>> {
59    fn from(status: TSStatus) -> Self {
60        match status.code {
61            SUCCESS_STATUS | NEED_REDIRECTION => Ok(()),
62            MULTIPLE_ERROR => {
63                let mut messges = String::new();
64                if let Some(sub_status) = status.sub_status {
65                    for s in sub_status {
66                        if s.code != SUCCESS_STATUS && s.code != NEED_REDIRECTION {
67                            if let Some(msg) = s.message {
68                                messges.push_str(format!("Code: {}, {}", s.code, msg).as_str());
69                                messges.push(';');
70                            }
71                        }
72                    }
73                }
74                if !messges.is_empty() {
75                    Err(messges.into())
76                } else {
77                    Ok(())
78                }
79            }
80            _ => {
81                if let Some(message) = status.message {
82                    Err(format!("code: {}, {}", status.code, message).into())
83                } else {
84                    Err(format!("code: {}", status.code).into())
85                }
86            }
87        }
88    }
89}
90
91#[derive(Debug, Clone, TypedBuilder)]
92#[builder(field_defaults(default, setter(into)))]
93
94pub struct Config {
95    #[builder(default = String::from("127.0.0.1"))]
96    pub host: String,
97    #[builder(default = 6667)]
98    pub port: i32,
99    #[builder(default = String::from("root"))]
100    pub username: String,
101    #[builder(default = String::from("root"))]
102    pub password: String,
103    #[builder(default = Some(3000))]
104    pub timeout_ms: Option<i64>,
105    #[builder(default = 1000)]
106    pub fetch_size: i32,
107    #[builder(default = Some(String::from(DEFAULT_TIME_ZONE)))]
108    pub timezone: Option<String>,
109    #[builder(default = false)]
110    pub enable_compression: bool,
111    #[builder(default = TSProtocolVersion::IotdbServiceProtocolV3)]
112    pub protocol_version: TSProtocolVersion,
113    #[builder(default = true)]
114    pub is_align: bool,
115}
116
117impl Default for Config {
118    fn default() -> Self {
119        Self {
120            host: String::from("127.0.0.1"),
121            port: 6667,
122            username: String::from("root"),
123            password: String::from("root"),
124            timeout_ms: Some(30000),
125            fetch_size: 1000,
126            timezone: Some(String::from(DEFAULT_TIME_ZONE)),
127            enable_compression: false,
128            protocol_version: TSProtocolVersion::IotdbServiceProtocolV3,
129            is_align: true,
130        }
131    }
132}
133
134pub struct RpcSession {
135    config: Config,
136    session_id: Option<i64>,
137    statement_id: i64,
138    client: IClientRPCServiceSyncClient<Box<dyn TInputProtocol>, Box<dyn TOutputProtocol>>,
139}
140
141impl RpcSession {
142    pub fn new(config: Config) -> Result<Self> {
143        let mut tcp_channel = TTcpChannel::new();
144        let endpint = format!("{}:{}", config.host, config.port);
145
146        tcp_channel.open(&endpint).map_err(|err| {
147            Box::<dyn Error>::from(format!("failed to connect to {}, {:?}", endpint, err))
148        })?;
149
150        let (i_chan, o_chan) = tcp_channel.split()?;
151
152        let (i_prot, o_prot) = (
153            TFramedReadTransport::new(i_chan),
154            TFramedWriteTransport::new(o_chan),
155        );
156
157        let (input_protocol, output_protocol): (Box<dyn TInputProtocol>, Box<dyn TOutputProtocol>) =
158            if config.enable_compression {
159                (
160                    Box::new(TCompactInputProtocol::new(i_prot)),
161                    Box::new(TCompactOutputProtocol::new(o_prot)),
162                )
163            } else {
164                (
165                    Box::new(TBinaryInputProtocol::new(i_prot, true)),
166                    Box::new(TBinaryOutputProtocol::new(o_prot, true)),
167                )
168            };
169
170        Ok(Self {
171            config: config,
172            session_id: None,
173            statement_id: -1,
174            client: IClientRPCServiceSyncClient::new(input_protocol, output_protocol),
175        })
176    }
177}
178
179impl<'a> Iterator for RpcDataSet<'a> {
180    type Item = RowRecord;
181
182    fn next(&mut self) -> Option<Self::Item> {
183        if self.has_cached_results() {
184            let mut values: Vec<Value> = Vec::with_capacity(self.column_names.len());
185
186            let ts = self.query_data_set.time.drain(0..8).collect::<Vec<u8>>();
187            self.timestamp = i64::from_be_bytes(ts.try_into().unwrap());
188
189            for column_index in 0..self.query_data_set.value_list.len() {
190                if self.row_index % 8 == 0 {
191                    self.bitmaps[column_index] =
192                        self.query_data_set.bitmap_list[column_index].remove(0);
193                }
194
195                let null = self.is_null(column_index, self.row_index);
196
197                let column_data = self
198                    .query_data_set
199                    .value_list
200                    .get_mut(column_index)
201                    .unwrap();
202                if !null {
203                    let original_column_index = self.column_index_map.get(&column_index).unwrap();
204                    let data_type = self.data_types.get(*original_column_index).unwrap();
205
206                    let mut bytes: Vec<u8> = Vec::new();
207                    match data_type {
208                        TSDataType::Boolean => {
209                            bytes.push(TSDataType::Boolean as u8);
210                            bytes.push(column_data.remove(0));
211                        }
212                        TSDataType::Int32 => {
213                            bytes.push(TSDataType::Int32 as u8);
214                            bytes.extend(column_data.drain(0..4));
215                        }
216                        TSDataType::Int64 => {
217                            bytes.push(TSDataType::Int64 as u8);
218                            bytes.extend(column_data.drain(0..8));
219                        }
220                        TSDataType::Float => {
221                            bytes.push(TSDataType::Float as u8);
222                            bytes.extend(column_data.drain(0..4));
223                        }
224                        TSDataType::Double => {
225                            bytes.push(TSDataType::Double as u8);
226                            bytes.extend(column_data.drain(0..8));
227                        }
228                        TSDataType::Text => {
229                            bytes.push(TSDataType::Text as u8);
230                            let len = i32::from_be_bytes(
231                                column_data
232                                    .drain(0..4)
233                                    .collect::<Vec<u8>>()
234                                    .try_into()
235                                    .unwrap(),
236                            );
237                            bytes.extend(column_data.drain(0..len as usize).collect::<Vec<_>>());
238                        }
239                    }
240                    values.push(Value::from(bytes));
241                } else {
242                    values.push(Value::Null);
243                }
244            }
245
246            self.row_index += 1;
247
248            let mut output_values: Vec<Value> = Vec::with_capacity(if self.is_ignore_timestamp() {
249                self.get_column_names().len()
250            } else {
251                self.get_column_names().len() + 1
252            });
253            if !self.is_ignore_timestamp() {
254                output_values.push(Value::Int64(self.timestamp));
255            }
256
257            output_values.extend(self.column_names.iter().map(|column_name| {
258                values[*(self.column_name_index_map.get(column_name).unwrap()) as usize].clone()
259            }));
260            Some(RowRecord {
261                timestamp: self.timestamp,
262                values: output_values,
263            })
264        } else {
265            None
266        }
267    }
268}
269
270pub(crate) struct RpcDataSet<'a> {
271    session: &'a mut RpcSession,
272    statement: String,
273    query_id: i64,
274    is_ignore_time_stamp: Option<bool>,
275    timestamp: i64,
276    column_names: Vec<String>,
277    data_types: Vec<TSDataType>,
278    query_data_set: TSQueryDataSet,
279    column_index_map: HashMap<usize, usize>,
280    column_name_index_map: BTreeMap<String, i32>,
281    bitmaps: Vec<u8>,
282    row_index: usize,
283    closed: bool,
284}
285
286impl<'a> RpcDataSet<'a> {
287    fn is_null(&self, column_index: usize, row_index: usize) -> bool {
288        let bitmap = self.bitmaps[column_index];
289        let shift = row_index % 8;
290        ((FLAG >> shift) & (bitmap & 0xff)) == 0
291    }
292
293    fn has_cached_results(&mut self) -> bool {
294        if self.closed {
295            return false;
296        }
297        if self.query_data_set.time.is_empty() {
298            if let Some(session_id) = self.session.session_id {
299                //Fetching result from iotdb server
300                match self
301                    .session
302                    .client
303                    .fetch_results(super::rpc::TSFetchResultsReq {
304                        session_id,
305                        statement: self.statement.clone(),
306                        fetch_size: self.session.config.fetch_size,
307                        query_id: self.query_id,
308                        is_align: self.session.config.is_align,
309                        timeout: self.session.config.timeout_ms,
310                    }) {
311                    Ok(resp) => {
312                        let status = resp.status;
313                        let res: Result<()> = status.into();
314
315                        match res {
316                            Ok(_) => {
317                                if resp.has_result_set {
318                                    //update query_data_set and release row_index
319                                    if let Some(query_data_set) = resp.query_data_set {
320                                        self.query_data_set = query_data_set;
321                                        self.row_index = 0;
322                                    }
323                                } else {
324                                    //Auto close the dataset when it doesn't have any results on the server.
325                                    self.close();
326                                    return false;
327                                }
328                            }
329                            Err(err) => {
330                                eprint!("An error occurred when fetch result: {}", err);
331                                return false;
332                            }
333                        }
334                    }
335                    Err(err) => {
336                        eprint!("An error occurred when fetch result: {}", err);
337                        return false;
338                    }
339                }
340            }
341        }
342        !self.query_data_set.time.is_empty()
343    }
344
345    pub fn close(&mut self) {
346        if !self.closed {
347            if let Some(session_id) = self.session.session_id {
348                match self
349                    .session
350                    .client
351                    .close_operation(super::rpc::TSCloseOperationReq {
352                        session_id,
353                        query_id: Some(self.query_id),
354                        statement_id: Some(self.session.statement_id),
355                    }) {
356                    Ok(status) => {
357                        let res: Result<()> = status.into();
358                        match res {
359                            Ok(_) => {
360                                self.closed = true;
361                            }
362                            Err(err) => {
363                                eprint!("An error occurred when closing dataset {:?}", err)
364                            }
365                        }
366                    }
367                    Err(err) => {
368                        eprint!("An error occurred when closing dataset {:?}", err)
369                    }
370                }
371            }
372        }
373    }
374}
375
376impl<'a> Drop for RpcDataSet<'a> {
377    fn drop(&mut self) {
378        self.close();
379    }
380}
381
382impl<'a> DataSet for RpcDataSet<'a> {
383    fn get_column_names(&self) -> Vec<String> {
384        if self.is_ignore_timestamp() {
385            self.column_names.clone()
386        } else {
387            //Include the time column
388            let mut column_names = vec![String::from("Time")];
389            column_names.extend(self.column_names.clone());
390            column_names
391        }
392    }
393
394    fn get_data_types(&self) -> Vec<TSDataType> {
395        if self.is_ignore_timestamp() {
396            self.data_types.clone()
397        } else {
398            //Include the time column
399            let mut column_types = vec![TSDataType::Int64];
400            column_types.extend(self.data_types.clone());
401            column_types
402        }
403    }
404
405    fn is_ignore_timestamp(&self) -> bool {
406        if let Some(v) = self.is_ignore_time_stamp {
407            v
408        } else {
409            false
410        }
411    }
412}
413
414fn fire_closed_error() -> Result<()> {
415    Err("Operation can't be performed, the session is closed.".into())
416}
417
418impl<'a> Session<'a> for RpcSession {
419    fn open(&mut self) -> Result<()> {
420        let resp = self.client.open_session(TSOpenSessionReq::new(
421            self.config.protocol_version,
422            self.config
423                .timezone
424                .clone()
425                .unwrap_or_else(|| DEFAULT_TIME_ZONE.to_string()),
426            self.config.username.clone(),
427            self.config.password.clone(),
428            None,
429        ))?;
430        let res: Result<()> = resp.status.into();
431        res?;
432        self.session_id = resp.session_id;
433
434        // request statement id
435        let result = self.client.request_statement_id(resp.session_id.unwrap());
436        match result {
437            Ok(stId) => {
438                self.statement_id = stId;
439            }
440            Err(_) => {}
441        }
442
443        Ok(())
444    }
445
446    fn close(&mut self) -> Result<()> {
447        if let Some(session_id) = self.session_id {
448            let status = self
449                .client
450                .close_session(TSCloseSessionReq::new(session_id))?;
451            self.session_id = None;
452            status.into()
453        } else {
454            fire_closed_error()
455        }
456    }
457
458    fn set_storage_group(&mut self, storage_group_id: &str) -> Result<()> {
459        if let Some(session_id) = self.session_id {
460            let status = self
461                .client
462                .set_storage_group(session_id, storage_group_id.into())?;
463            status.into()
464        } else {
465            fire_closed_error()
466        }
467    }
468
469    fn delete_storage_group(&mut self, storage_group_id: &str) -> Result<()> {
470        self.delete_storage_groups(vec![storage_group_id])
471    }
472
473    fn delete_storage_groups(&mut self, storage_group_ids: Vec<&str>) -> Result<()> {
474        if let Some(session_id) = self.session_id {
475            let status = self.client.delete_storage_groups(
476                session_id,
477                storage_group_ids.iter().map(ToString::to_string).collect(),
478            )?;
479            status.into()
480        } else {
481            fire_closed_error()
482        }
483    }
484
485    fn create_timeseries<T>(
486        &mut self,
487        path: &str,
488        data_type: crate::protocal::TSDataType,
489        encoding: crate::protocal::TSEncoding,
490        compressor: crate::protocal::TSCompressionType,
491        props: T,
492        attributes: T,
493        tags: T,
494        measurement_alias: Option<String>,
495    ) -> Result<()>
496    where
497        T: Into<Option<Dictionary>>,
498    {
499        if let Some(session_id) = self.session_id {
500            self.client
501                .create_timeseries(TSCreateTimeseriesReq::new(
502                    session_id,
503                    path.to_string(),
504                    data_type.into(),
505                    encoding.into(),
506                    compressor.into(),
507                    props,
508                    tags,
509                    attributes,
510                    measurement_alias,
511                ))?
512                .into()
513        } else {
514            fire_closed_error()
515        }
516    }
517
518    fn create_multi_timeseries<T>(
519        &mut self,
520        paths: Vec<&str>,
521        data_types: Vec<crate::protocal::TSDataType>,
522        encodings: Vec<crate::protocal::TSEncoding>,
523        compressors: Vec<crate::protocal::TSCompressionType>,
524        props_list: T,
525        attributes_list: T,
526        tags_list: T,
527        measurement_alias_list: Option<Vec<String>>,
528    ) -> Result<()>
529    where
530        T: Into<Option<Vec<Dictionary>>>,
531    {
532        if let Some(session_id) = self.session_id {
533            let status = self
534                .client
535                .create_multi_timeseries(TSCreateMultiTimeseriesReq::new(
536                    session_id,
537                    paths.iter().map(ToString::to_string).collect(),
538                    data_types.into_iter().map(TSDataType::into).collect(),
539                    encodings.into_iter().map(TSEncoding::into).collect(),
540                    compressors
541                        .into_iter()
542                        .map(TSCompressionType::into)
543                        .collect(),
544                    props_list,
545                    attributes_list,
546                    tags_list,
547                    measurement_alias_list,
548                ))?;
549            status.into()
550        } else {
551            fire_closed_error()
552        }
553    }
554
555    fn delete_timeseries(&mut self, paths: Vec<&str>) -> Result<()> {
556        if let Some(session_id) = self.session_id {
557            let status = self
558                .client
559                .delete_timeseries(session_id, paths.iter().map(ToString::to_string).collect())?;
560            status.into()
561        } else {
562            fire_closed_error()
563        }
564    }
565
566    fn delete_data(&mut self, paths: Vec<&str>, start_time: i64, end_time: i64) -> Result<()> {
567        if let Some(session_id) = self.session_id {
568            let status = self.client.delete_data(TSDeleteDataReq::new(
569                session_id,
570                paths.into_iter().map(ToString::to_string).collect(),
571                start_time,
572                end_time,
573            ))?;
574            status.into()
575        } else {
576            fire_closed_error()
577        }
578    }
579
580    fn insert_string_record<T>(
581        &mut self,
582        device_id: &str,
583        measurements: Vec<&str>,
584        values: Vec<&str>,
585        timestamp: i64,
586        is_aligned: T,
587        timeout: i64
588    ) -> Result<()>
589    where
590        T: Into<Option<bool>>,
591    {
592        if let Some(session_id) = self.session_id {
593            let status = self
594                .client
595                .insert_string_record(TSInsertStringRecordReq::new(
596                    session_id,
597                    device_id.to_string(),
598                    measurements.iter().map(ToString::to_string).collect(),
599                    values.iter().map(ToString::to_string).collect(),
600                    timestamp,
601                    is_aligned,
602                    timeout
603                ))?;
604            status.into()
605        } else {
606            fire_closed_error()
607        }
608    }
609
610    fn get_time_zone(&mut self) -> Result<String> {
611        if let Some(session_id) = self.session_id {
612            let resp = self.client.get_time_zone(session_id)?;
613            let res: Result<()> = resp.status.into();
614            res?;
615            Ok(resp.time_zone)
616        } else {
617            Err("Operation can't be performed, the session is closed.".into())
618        }
619    }
620
621    fn set_time_zone(&mut self, time_zone: &str) -> Result<()> {
622        if let Some(session_id) = self.session_id {
623            self.client
624                .set_time_zone(TSSetTimeZoneReq::new(session_id, time_zone.to_string()))?
625                .into()
626        } else {
627            fire_closed_error()
628        }
629    }
630
631    fn execute_statement<T>(
632        &'a mut self,
633        statement: &str,
634        timeout_ms: T,
635    ) -> Result<Box<dyn 'a + DataSet>>
636    where
637        T: Into<Option<i64>>,
638    {
639        if let Some(session_id) = self.session_id {
640            let resp = self.client.execute_statement(TSExecuteStatementReq {
641                session_id,
642                statement: statement.to_string(),
643                statement_id: self.statement_id,
644                fetch_size: Some(self.config.fetch_size),
645                timeout: timeout_ms.into(),
646                enable_redirect_query: None,
647                jdbc_query: None,
648            })?;
649            let status = resp.status;
650            let code = status.code;
651            if code == SUCCESS_STATUS {
652                {
653                    if let (Some(column_names), Some(data_type_list)) =
654                        (resp.columns, resp.data_type_list)
655                    {
656                        let column_name_index_map = match resp.column_name_index_map {
657                            Some(map) => map,
658                            None => {
659                                let mut map: BTreeMap<String, i32> = BTreeMap::new();
660                                for (index, name) in column_names.iter().enumerate() {
661                                    map.insert(name.to_string(), index as i32);
662                                }
663                                map
664                            }
665                        };
666
667                        let data_types: Vec<TSDataType> =
668                            data_type_list.iter().map(TSDataType::from).collect();
669
670                        let mut column_index_map: HashMap<usize, usize> = HashMap::new();
671
672                        let column_count = column_names.len();
673                        for (index, name) in column_names.iter().enumerate() {
674                            column_index_map
675                                .insert(*column_name_index_map.get(name).unwrap() as usize, index);
676                        }
677
678                        Ok(Box::new(RpcDataSet {
679                            session: self,
680                            statement: statement.to_string(),
681                            query_id: resp.query_id.unwrap(),
682                            timestamp: -1,
683                            is_ignore_time_stamp: resp.ignore_time_stamp,
684                            query_data_set: resp.query_data_set.unwrap(),
685                            column_names,
686                            data_types,
687                            bitmaps: vec![0_u8; column_count],
688                            row_index: 0,
689                            column_index_map,
690                            column_name_index_map,
691                            closed: false,
692                        }))
693                    } else {
694                        Err("Can't get resources on execute_statement".into())
695                    }
696                }
697            } else {
698                let res: Result<()> = status.into();
699                res?;
700                Err(format!("Unknow, code: {}", code).into())
701            }
702        } else {
703            Err("Operation can't be performed, the session is closed.".into())
704        }
705    }
706
707    fn execute_query_statement<T>(
708        &'a mut self,
709        statement: &str,
710        timeout_ms: T,
711    ) -> Result<Box<dyn 'a + DataSet>>
712    where
713        T: Into<Option<i64>>,
714    {
715        if let Some(session_id) = self.session_id {
716            let resp = self.client.execute_query_statement(TSExecuteStatementReq {
717                session_id,
718                statement: statement.to_string(),
719                statement_id: self.statement_id,
720                fetch_size: Some(self.config.fetch_size),
721                timeout: timeout_ms.into(),
722                enable_redirect_query: None,
723                jdbc_query: None,
724            })?;
725            let status = resp.status;
726            let code = status.code;
727            if code == SUCCESS_STATUS {
728                let column_names: Vec<String> = resp.columns.unwrap();
729
730                let column_name_index_map = match resp.column_name_index_map {
731                    Some(v) => v,
732                    None => {
733                        let mut map: BTreeMap<String, i32> = BTreeMap::new();
734                        for (index, name) in column_names.iter().enumerate() {
735                            map.insert(name.to_string(), index as i32);
736                        }
737                        map
738                    }
739                };
740
741                let data_types: Vec<TSDataType> = resp
742                    .data_type_list
743                    .unwrap()
744                    .iter()
745                    .map(TSDataType::from)
746                    .collect();
747
748                let mut column_index_map: HashMap<usize, usize> = HashMap::new();
749
750                let column_count = column_names.len();
751                for (index, name) in column_names.iter().enumerate() {
752                    column_index_map
753                        .insert(*column_name_index_map.get(name).unwrap() as usize, index);
754                }
755                let dataset = RpcDataSet {
756                    session: self,
757                    statement: statement.to_string(),
758                    query_id: resp.query_id.unwrap(),
759                    timestamp: -1,
760                    is_ignore_time_stamp: resp.ignore_time_stamp,
761                    query_data_set: resp.query_data_set.unwrap(),
762                    column_names,
763                    data_types,
764                    bitmaps: vec![0_u8; column_count],
765                    row_index: 0,
766                    column_index_map,
767                    column_name_index_map,
768                    closed: false,
769                };
770                Ok(Box::new(dataset))
771            } else {
772                let res: Result<()> = status.into();
773                res?;
774                Err(format!("Unknow, code: {}", code).into())
775            }
776        } else {
777            Err("Operation can't be performed, the session is closed.".into())
778        }
779    }
780
781    fn insert_record<T>(
782        &mut self,
783        device_id: &str,
784        measurements: Vec<&str>,
785        values: Vec<Value>,
786        timestamp: i64,
787        is_aligned: T,
788    ) -> Result<()>
789    where
790        T: Into<Option<bool>>,
791    {
792        if let Some(session_id) = self.session_id {
793            let mut values_bytes: Vec<u8> = Vec::new();
794            values.iter().for_each(|v| {
795                let mut value_bytes: Vec<u8> = v.into();
796                values_bytes.append(&mut value_bytes);
797            });
798            let status = self.client.insert_record(TSInsertRecordReq::new(
799                session_id,
800                device_id.to_string(),
801                measurements.iter().map(ToString::to_string).collect(),
802                values_bytes,
803                timestamp,
804                is_aligned,
805            ))?;
806            status.into()
807        } else {
808            fire_closed_error()
809        }
810    }
811
812    fn insert_records_of_one_device(
813        &mut self,
814        device_id: &str,
815        timestamps: Vec<i64>,
816        measurements: Vec<Vec<&str>>,
817        values: Vec<Vec<super::Value>>,
818        sorted: bool,
819    ) -> Result<()> {
820        let mut sorted_timestamps = timestamps;
821        let mut sorted_measurements = measurements;
822        let mut sorted_values = values;
823
824        if !sorted {
825            let permutation = permutation::sort(&sorted_timestamps[..]);
826            sorted_timestamps = permutation.apply_slice(&sorted_timestamps[..]);
827            sorted_measurements = permutation.apply_slice(&sorted_measurements[..]);
828            sorted_values = permutation.apply_slice(&sorted_values[..]);
829        }
830
831        if let Some(session_id) = self.session_id {
832            let values_list = sorted_values
833                .iter()
834                .map(|vec| {
835                    let mut values: Vec<u8> = Vec::new();
836                    for value in vec.iter() {
837                        let mut value_data: Vec<u8> = value.into();
838                        values.append(&mut value_data);
839                    }
840                    values
841                })
842                .collect();
843            let status =
844                self.client
845                    .insert_records_of_one_device(TSInsertRecordsOfOneDeviceReq::new(
846                        session_id,
847                        device_id.to_string(),
848                        sorted_measurements
849                            .iter()
850                            .map(|vec| vec.iter().map(ToString::to_string).collect())
851                            .collect(),
852                        values_list,
853                        sorted_timestamps,
854                        false,
855                    ))?;
856            status.into()
857        } else {
858            fire_closed_error()
859        }
860    }
861
862    fn insert_records(
863        &mut self,
864        prefix_path: Vec<&str>,
865        measurements: Vec<Vec<&str>>,
866        values: Vec<Vec<super::Value>>,
867        timestamps: Vec<i64>,
868    ) -> Result<()> {
869        if let Some(session_id) = self.session_id {
870            let values_list = values
871                .iter()
872                .map(|vec| {
873                    let mut values: Vec<u8> = Vec::new();
874                    for value in vec.iter() {
875                        let mut value_data: Vec<u8> = value.into();
876                        values.append(&mut value_data);
877                    }
878                    values
879                })
880                .collect();
881            let status = self.client.insert_records(TSInsertRecordsReq {
882                session_id,
883                prefix_paths: prefix_path.iter().map(ToString::to_string).collect(),
884                measurements_list: measurements
885                    .iter()
886                    .map(|ms| ms.iter().map(ToString::to_string).collect())
887                    .collect(),
888                values_list,
889                timestamps,
890                is_aligned: None,
891            })?;
892            status.into()
893        } else {
894            fire_closed_error()
895        }
896    }
897
898    fn insert_tablet(&mut self, tablet: &super::Tablet) -> Result<()> {
899        if let Some(session_id) = self.session_id {
900            let mut timestamps_list: Vec<u8> = Vec::with_capacity(tablet.timestamps.len() * 8);
901            tablet
902                .timestamps
903                .iter()
904                .for_each(|ts| timestamps_list.append(&mut ts.to_be_bytes().to_vec()));
905
906            let status = self.client.insert_tablet(TSInsertTabletReq {
907                session_id,
908                prefix_path: tablet.get_prefix_path(),
909                measurements: tablet
910                    .measurement_schemas
911                    .iter()
912                    .map(|f| f.measurement.to_string())
913                    .collect(),
914                values: tablet.into(),
915                timestamps: timestamps_list,
916                types: tablet
917                    .get_measurement_schemas()
918                    .into_iter()
919                    .map(|measurement_schema| measurement_schema.data_type.into())
920                    .collect(),
921                size: tablet.get_row_count() as i32,
922                is_aligned: Some(false),
923            })?;
924            status.into()
925        } else {
926            fire_closed_error()
927        }
928    }
929
930    fn insert_tablets(&mut self, tablets: Vec<&super::Tablet>) -> Result<()> {
931        if let Some(session_id) = self.session_id {
932            let status = self.client.insert_tablets(TSInsertTabletsReq {
933                session_id,
934                prefix_paths: tablets.iter().map(|t| t.get_prefix_path()).collect(),
935                measurements_list: tablets
936                    .iter()
937                    .map(|tablet| {
938                        tablet
939                            .measurement_schemas
940                            .iter()
941                            .map(|f| f.measurement.to_string())
942                            .collect()
943                    })
944                    .collect(),
945                values_list: tablets.iter().map(|tablet| Into::into(*tablet)).collect(),
946                timestamps_list: tablets
947                    .iter()
948                    .map(|tablet| {
949                        let mut ts_item: Vec<u8> = Vec::new();
950                        tablet
951                            .timestamps
952                            .iter()
953                            .for_each(|ts| ts_item.append(&mut ts.to_be_bytes().to_vec()));
954                        ts_item
955                    })
956                    .collect(),
957                types_list: tablets
958                    .iter()
959                    .map(|tablet| {
960                        tablet
961                            .get_measurement_schemas()
962                            .into_iter()
963                            .map(|f| {
964                                let t: i32 = f.data_type.into();
965                                t
966                            })
967                            .collect()
968                    })
969                    .collect(),
970                size_list: tablets
971                    .iter()
972                    .map(|tablet| tablet.get_row_count() as i32)
973                    .collect(),
974                is_aligned: Some(false),
975            })?;
976            status.into()
977        } else {
978            fire_closed_error()
979        }
980    }
981
982    fn execute_batch_statement(&mut self, statemens: Vec<&str>) -> Result<()> {
983        if let Some(session_id) = self.session_id {
984            let status =
985                self.client
986                    .execute_batch_statement(super::rpc::TSExecuteBatchStatementReq {
987                        session_id,
988                        statements: statemens.iter().map(ToString::to_string).collect(),
989                    })?;
990            status.into()
991        } else {
992            fire_closed_error()
993        }
994    }
995
996    fn execute_raw_data_query(
997        &'a mut self,
998        paths: Vec<&str>,
999        start_time: i64,
1000        end_time: i64,
1001        timeout_ms: i64
1002    ) -> Result<Box<dyn 'a + DataSet>> {
1003        if let Some(session_id) = self.session_id {
1004            let resp = self
1005                .client
1006                .execute_raw_data_query(super::rpc::TSRawDataQueryReq {
1007                    session_id,
1008                    paths: paths.iter().map(ToString::to_string).collect(),
1009                    fetch_size: Some(self.config.fetch_size),
1010                    start_time,
1011                    end_time,
1012                    statement_id: self.statement_id,
1013                    enable_redirect_query: None,
1014                    jdbc_query: None,
1015                    timeout: Option::from(timeout_ms),
1016                    legal_path_nodes: Option::from(false)
1017                })?;
1018            let status = resp.status;
1019            let code = status.code;
1020            if code == SUCCESS_STATUS {
1021                if let (Some(query_data_set), Some(column_names), Some(data_types_list)) =
1022                    (resp.query_data_set, resp.columns, resp.data_type_list)
1023                {
1024                    let column_name_index_map = match resp.column_name_index_map {
1025                        Some(v) => v,
1026                        None => {
1027                            let mut map: BTreeMap<String, i32> = BTreeMap::new();
1028                            for (index, name) in column_names.iter().enumerate() {
1029                                map.insert(name.to_string(), index as i32);
1030                            }
1031                            map
1032                        }
1033                    };
1034
1035                    let data_types: Vec<TSDataType> =
1036                        data_types_list.iter().map(TSDataType::from).collect();
1037
1038                    let mut column_index_map: HashMap<usize, usize> = HashMap::new();
1039
1040                    let column_count = column_names.len();
1041                    for (index, name) in column_names.iter().enumerate() {
1042                        column_index_map
1043                            .insert(*column_name_index_map.get(name).unwrap() as usize, index);
1044                    }
1045
1046                    Ok(Box::new(RpcDataSet {
1047                        session: self,
1048                        statement: "".to_string(),
1049                        query_id: resp.query_id.unwrap(),
1050                        timestamp: -1,
1051                        is_ignore_time_stamp: resp.ignore_time_stamp,
1052                        query_data_set,
1053                        column_names,
1054                        data_types,
1055                        bitmaps: vec![0_u8; column_count],
1056                        row_index: 0,
1057                        column_index_map,
1058                        column_name_index_map,
1059                        closed: false,
1060                    }))
1061                } else {
1062                    Err("Did't get the result.".into())
1063                }
1064            } else {
1065                let res: Result<()> = status.into();
1066                res?;
1067                Err(format!("Unknow, code: {}", code).into())
1068            }
1069        } else {
1070            Err("Operation can't be performed, the session is closed.".into())
1071        }
1072    }
1073
1074    fn execute_update_statement(
1075        &'a mut self,
1076        statement: &str,
1077    ) -> Result<Option<Box<dyn 'a + DataSet>>> {
1078        if let Some(session_id) = self.session_id {
1079            let resp = self
1080                .client
1081                .execute_update_statement(TSExecuteStatementReq {
1082                    session_id,
1083                    statement: statement.to_string(),
1084                    statement_id: self.statement_id,
1085                    fetch_size: Some(self.config.fetch_size),
1086                    timeout: self.config.timeout_ms,
1087                    enable_redirect_query: None,
1088                    jdbc_query: None,
1089                })?;
1090            let status = resp.status;
1091            let code = status.code;
1092            if code == SUCCESS_STATUS {
1093                if let (Some(query_data_set), Some(column_names), Some(data_type_list)) =
1094                    (resp.query_data_set, resp.columns, resp.data_type_list)
1095                {
1096                    let column_name_index_map = match resp.column_name_index_map {
1097                        Some(v) => v,
1098                        None => {
1099                            let mut map: BTreeMap<String, i32> = BTreeMap::new();
1100                            for (index, name) in column_names.iter().enumerate() {
1101                                map.insert(name.to_string(), index as i32);
1102                            }
1103                            map
1104                        }
1105                    };
1106
1107                    let data_types: Vec<TSDataType> =
1108                        data_type_list.iter().map(TSDataType::from).collect();
1109
1110                    let mut column_index_map: HashMap<usize, usize> = HashMap::new();
1111
1112                    let column_count = column_names.len();
1113                    for (index, name) in column_names.iter().enumerate() {
1114                        column_index_map
1115                            .insert(*column_name_index_map.get(name).unwrap() as usize, index);
1116                    }
1117
1118                    Ok(Some(Box::new(RpcDataSet {
1119                        session: self,
1120                        statement: statement.to_string(),
1121                        query_id: resp.query_id.unwrap(),
1122                        timestamp: -1,
1123                        is_ignore_time_stamp: resp.ignore_time_stamp,
1124                        query_data_set,
1125                        column_names,
1126                        data_types,
1127                        bitmaps: vec![0_u8; column_count],
1128                        row_index: 0,
1129                        column_index_map,
1130                        column_name_index_map,
1131                        closed: false,
1132                    })))
1133                } else {
1134                    Ok(None)
1135                }
1136            } else {
1137                let res: Result<()> = status.into();
1138                res?;
1139                Err(format!("Unknow, code: {}", code).into())
1140            }
1141        } else {
1142            Err("Operation can't be performed, the session is closed.".into())
1143        }
1144    }
1145}
1146
1147impl Drop for RpcSession {
1148    fn drop(&mut self) {
1149        if let Some(session_id) = self.session_id {
1150            self.close().unwrap_or_else(|err| {
1151                eprint!("error closing the session {}, reason {}", session_id, err)
1152            });
1153        }
1154    }
1155}