iotdb/client/
remote.rs

1//
2// Licensed to the Apache Software Foundation (ASF) under one
3// or more contributor license agreements.  See the NOTICE file
4// distributed with this work for additional information
5// regarding copyright ownership.  The ASF licenses this file
6// to you under the Apache License, Version 2.0 (the
7// "License"); you may not use this file except in compliance
8// with the License.  You may obtain a copy of the License at
9//
10//  http://www.apache.org/licenses/LICENSE-2.0
11//
12// Unless required by applicable law or agreed to in writing,
13// software distributed under the License is distributed on an
14// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15// KIND, either express or implied.  See the License for the
16// specific language governing permissions and limitations
17// under the License.
18//
19
20use std::collections::{BTreeMap, HashMap};
21use std::error::Error;
22use std::vec;
23use thrift::transport::TIoChannel;
24
25use thrift::{
26    protocol::{
27        TBinaryInputProtocol, TBinaryOutputProtocol, TCompactInputProtocol, TCompactOutputProtocol,
28        TInputProtocol, TOutputProtocol,
29    },
30    transport::{TFramedReadTransport, TFramedWriteTransport, TTcpChannel},
31};
32use typed_builder::TypedBuilder;
33
34use crate::client::rpc::{
35    TSCreateMultiTimeseriesReq, TSCreateTimeseriesReq, TSIServiceSyncClient, TSInsertRecordsReq,
36    TSInsertStringRecordReq, TSInsertTabletReq, TSOpenSessionReq, TSProtocolVersion,
37    TTSIServiceSyncClient,
38};
39use crate::protocal::{
40    TSCompressionType, TSDataType, TSEncoding, FLAG, MULTIPLE_ERROR, NEED_REDIRECTION,
41    SUCCESS_STATUS,
42};
43
44use super::rpc::{
45    TSDeleteDataReq, TSExecuteStatementReq, TSInsertRecordReq, TSInsertRecordsOfOneDeviceReq,
46    TSInsertTabletsReq, TSQueryDataSet, TSSetTimeZoneReq,
47};
48use super::{
49    rpc::{TSCloseSessionReq, TSStatus},
50    RowRecord,
51};
52use super::{DataSet, Dictionary, Result, Session, Value};
53
54const DEFAULT_TIME_ZONE: &str = "Asia/Shanghai";
55
56impl From<TSStatus> for core::result::Result<(), Box<dyn Error>> {
57    fn from(status: TSStatus) -> Self {
58        match status.code {
59            SUCCESS_STATUS | NEED_REDIRECTION => Ok(()),
60            MULTIPLE_ERROR => {
61                let mut messges = String::new();
62                if let Some(sub_status) = status.sub_status {
63                    for s in sub_status {
64                        if s.code != SUCCESS_STATUS && s.code != NEED_REDIRECTION {
65                            if let Some(msg) = s.message {
66                                messges.push_str(format!("Code: {}, {}", s.code, msg).as_str());
67                                messges.push(';');
68                            }
69                        }
70                    }
71                }
72                if !messges.is_empty() {
73                    Err(messges.into())
74                } else {
75                    Ok(())
76                }
77            }
78            _ => {
79                if let Some(message) = status.message {
80                    Err(format!("code: {}, {}", status.code, message).into())
81                } else {
82                    Err(format!("code: {}", status.code).into())
83                }
84            }
85        }
86    }
87}
88
89#[derive(Debug, Clone, TypedBuilder)]
90#[builder(field_defaults(default, setter(into)))]
91
92pub struct Config {
93    #[builder(default = String::from("127.0.0.1"))]
94    pub host: String,
95    #[builder(default = 6667)]
96    pub port: i32,
97    #[builder(default = String::from("root"))]
98    pub username: String,
99    #[builder(default = String::from("root"))]
100    pub password: String,
101    #[builder(default = Some(3000))]
102    pub timeout_ms: Option<i64>,
103    #[builder(default = 1000)]
104    pub fetch_size: i32,
105    #[builder(default = Some(String::from(DEFAULT_TIME_ZONE)))]
106    pub timezone: Option<String>,
107    #[builder(default = false)]
108    pub enable_compression: bool,
109    #[builder(default = TSProtocolVersion::IOTDB_SERVICE_PROTOCOL_V3)]
110    pub protocol_version: TSProtocolVersion,
111    #[builder(default = true)]
112    pub is_align: bool,
113}
114
115impl Default for Config {
116    fn default() -> Self {
117        Self {
118            host: String::from("127.0.0.1"),
119            port: 6667,
120            username: String::from("root"),
121            password: String::from("root"),
122            timeout_ms: Some(30000),
123            fetch_size: 1000,
124            timezone: Some(String::from(DEFAULT_TIME_ZONE)),
125            enable_compression: false,
126            protocol_version: TSProtocolVersion::IOTDB_SERVICE_PROTOCOL_V3,
127            is_align: true,
128        }
129    }
130}
131
132pub struct RpcSession {
133    config: Config,
134    session_id: Option<i64>,
135    statement_id: i64,
136    client: TSIServiceSyncClient<Box<dyn TInputProtocol>, Box<dyn TOutputProtocol>>,
137}
138
139impl RpcSession {
140    pub fn new(config: Config) -> Result<Self> {
141        let mut tcp_channel = TTcpChannel::new();
142        let endpint = format!("{}:{}", config.host, config.port);
143
144        tcp_channel.open(&endpint).map_err(|err| {
145            Box::<dyn Error>::from(format!("failed to connect to {}, {:?}", endpint, err))
146        })?;
147
148        let (i_chan, o_chan) = tcp_channel.split()?;
149
150        let (i_prot, o_prot) = (
151            TFramedReadTransport::new(i_chan),
152            TFramedWriteTransport::new(o_chan),
153        );
154
155        let (input_protocol, output_protocol): (Box<dyn TInputProtocol>, Box<dyn TOutputProtocol>) =
156            if config.enable_compression {
157                (
158                    Box::new(TCompactInputProtocol::new(i_prot)),
159                    Box::new(TCompactOutputProtocol::new(o_prot)),
160                )
161            } else {
162                (
163                    Box::new(TBinaryInputProtocol::new(i_prot, true)),
164                    Box::new(TBinaryOutputProtocol::new(o_prot, true)),
165                )
166            };
167
168        Ok(Self {
169            config: config,
170            session_id: None,
171            statement_id: -1,
172            client: TSIServiceSyncClient::new(input_protocol, output_protocol),
173        })
174    }
175}
176
177impl<'a> Iterator for RpcDataSet<'a> {
178    type Item = RowRecord;
179
180    fn next(&mut self) -> Option<Self::Item> {
181        if self.has_cached_results() {
182            let mut values: Vec<Value> = Vec::with_capacity(self.column_names.len());
183
184            let ts = self.query_data_set.time.drain(0..8).collect::<Vec<u8>>();
185            self.timestamp = i64::from_be_bytes(ts.try_into().unwrap());
186
187            for column_index in 0..self.query_data_set.value_list.len() {
188                if self.row_index % 8 == 0 {
189                    self.bitmaps[column_index] =
190                        self.query_data_set.bitmap_list[column_index].remove(0);
191                }
192
193                let null = self.is_null(column_index, self.row_index);
194
195                let column_data = self
196                    .query_data_set
197                    .value_list
198                    .get_mut(column_index)
199                    .unwrap();
200                if !null {
201                    let original_column_index = self.column_index_map.get(&column_index).unwrap();
202                    let data_type = self.data_types.get(*original_column_index).unwrap();
203
204                    let mut bytes: Vec<u8> = Vec::new();
205                    match data_type {
206                        TSDataType::Boolean => {
207                            bytes.push(TSDataType::Boolean as u8);
208                            bytes.push(column_data.remove(0));
209                        }
210                        TSDataType::Int32 => {
211                            bytes.push(TSDataType::Int32 as u8);
212                            bytes.extend(column_data.drain(0..4));
213                        }
214                        TSDataType::Int64 => {
215                            bytes.push(TSDataType::Int64 as u8);
216                            bytes.extend(column_data.drain(0..8));
217                        }
218                        TSDataType::Float => {
219                            bytes.push(TSDataType::Float as u8);
220                            bytes.extend(column_data.drain(0..4));
221                        }
222                        TSDataType::Double => {
223                            bytes.push(TSDataType::Double as u8);
224                            bytes.extend(column_data.drain(0..8));
225                        }
226                        TSDataType::Text => {
227                            bytes.push(TSDataType::Text as u8);
228                            let len = i32::from_be_bytes(
229                                column_data
230                                    .drain(0..4)
231                                    .collect::<Vec<u8>>()
232                                    .try_into()
233                                    .unwrap(),
234                            );
235                            bytes.extend(column_data.drain(0..len as usize).collect::<Vec<_>>());
236                        }
237                    }
238                    values.push(Value::from(bytes));
239                } else {
240                    values.push(Value::Null);
241                }
242            }
243
244            self.row_index += 1;
245
246            let mut output_values: Vec<Value> = Vec::with_capacity(if self.is_ignore_timestamp() {
247                self.get_column_names().len()
248            } else {
249                self.get_column_names().len() + 1
250            });
251            if !self.is_ignore_timestamp() {
252                output_values.push(Value::Int64(self.timestamp));
253            }
254
255            output_values.extend(self.column_names.iter().map(|column_name| {
256                values[*(self.column_name_index_map.get(column_name).unwrap()) as usize].clone()
257            }));
258            Some(RowRecord {
259                timestamp: self.timestamp,
260                values: output_values,
261            })
262        } else {
263            None
264        }
265    }
266}
267
268pub(crate) struct RpcDataSet<'a> {
269    session: &'a mut RpcSession,
270    statement: String,
271    query_id: i64,
272    is_ignore_time_stamp: Option<bool>,
273    timestamp: i64,
274    column_names: Vec<String>,
275    data_types: Vec<TSDataType>,
276    query_data_set: TSQueryDataSet,
277    column_index_map: HashMap<usize, usize>,
278    column_name_index_map: BTreeMap<String, i32>,
279    bitmaps: Vec<u8>,
280    row_index: usize,
281    closed: bool,
282}
283
284impl<'a> RpcDataSet<'a> {
285    fn is_null(&self, column_index: usize, row_index: usize) -> bool {
286        let bitmap = self.bitmaps[column_index];
287        let shift = row_index % 8;
288        ((FLAG >> shift) & (bitmap & 0xff)) == 0
289    }
290
291    fn has_cached_results(&mut self) -> bool {
292        if self.closed {
293            return false;
294        }
295        if self.query_data_set.time.is_empty() {
296            if let Some(session_id) = self.session.session_id {
297                //Fetching result from iotdb server
298                match self
299                    .session
300                    .client
301                    .fetch_results(super::rpc::TSFetchResultsReq {
302                        session_id,
303                        statement: self.statement.clone(),
304                        fetch_size: self.session.config.fetch_size,
305                        query_id: self.query_id,
306                        is_align: self.session.config.is_align,
307                        timeout: self.session.config.timeout_ms,
308                    }) {
309                    Ok(resp) => {
310                        let status = resp.status;
311                        let res: Result<()> = status.into();
312
313                        match res {
314                            Ok(_) => {
315                                if resp.has_result_set {
316                                    //update query_data_set and release row_index
317                                    if let Some(query_data_set) = resp.query_data_set {
318                                        self.query_data_set = query_data_set;
319                                        self.row_index = 0;
320                                    }
321                                } else {
322                                    //Auto close the dataset when it doesn't have any results on the server.
323                                    self.close();
324                                    return false;
325                                }
326                            }
327                            Err(err) => {
328                                eprint!("An error occurred when fetch result: {}", err);
329                                return false;
330                            }
331                        }
332                    }
333                    Err(err) => {
334                        eprint!("An error occurred when fetch result: {}", err);
335                        return false;
336                    }
337                }
338            }
339        }
340        !self.query_data_set.time.is_empty()
341    }
342
343    pub fn close(&mut self) {
344        if !self.closed {
345            if let Some(session_id) = self.session.session_id {
346                match self
347                    .session
348                    .client
349                    .close_operation(super::rpc::TSCloseOperationReq {
350                        session_id,
351                        query_id: Some(self.query_id),
352                        statement_id: Some(self.session.statement_id),
353                    }) {
354                    Ok(status) => {
355                        let res: Result<()> = status.into();
356                        match res {
357                            Ok(_) => {
358                                self.closed = true;
359                            }
360                            Err(err) => {
361                                eprint!("An error occurred when closing dataset {:?}", err)
362                            }
363                        }
364                    }
365                    Err(err) => {
366                        eprint!("An error occurred when closing dataset {:?}", err)
367                    }
368                }
369            }
370        }
371    }
372}
373
374impl<'a> Drop for RpcDataSet<'a> {
375    fn drop(&mut self) {
376        self.close();
377    }
378}
379
380impl<'a> DataSet for RpcDataSet<'a> {
381    fn get_column_names(&self) -> Vec<String> {
382        if self.is_ignore_timestamp() {
383            self.column_names.clone()
384        } else {
385            //Include the time column
386            let mut column_names = vec![String::from("Time")];
387            column_names.extend(self.column_names.clone());
388            column_names
389        }
390    }
391
392    fn get_data_types(&self) -> Vec<TSDataType> {
393        if self.is_ignore_timestamp() {
394            self.data_types.clone()
395        } else {
396            //Include the time column
397            let mut column_types = vec![TSDataType::Int64];
398            column_types.extend(self.data_types.clone());
399            column_types
400        }
401    }
402
403    fn is_ignore_timestamp(&self) -> bool {
404        if let Some(v) = self.is_ignore_time_stamp {
405            v
406        } else {
407            false
408        }
409    }
410}
411
412fn fire_closed_error() -> Result<()> {
413    Err("Operation can't be performed, the session is closed.".into())
414}
415
416impl<'a> Session<'a> for RpcSession {
417    fn open(&mut self) -> Result<()> {
418        let resp = self.client.open_session(TSOpenSessionReq::new(
419            self.config.protocol_version,
420            self.config
421                .timezone
422                .clone()
423                .unwrap_or_else(|| DEFAULT_TIME_ZONE.to_string()),
424            self.config.username.clone(),
425            self.config.password.clone(),
426            None,
427        ))?;
428        let res: Result<()> = resp.status.into();
429        res?;
430        self.session_id = resp.session_id;
431        Ok(())
432    }
433
434    fn close(&mut self) -> Result<()> {
435        if let Some(session_id) = self.session_id {
436            let status = self
437                .client
438                .close_session(TSCloseSessionReq::new(session_id))?;
439            self.session_id = None;
440            status.into()
441        } else {
442            fire_closed_error()
443        }
444    }
445
446    fn set_storage_group(&mut self, storage_group_id: &str) -> Result<()> {
447        if let Some(session_id) = self.session_id {
448            let status = self
449                .client
450                .set_storage_group(session_id, storage_group_id.into())?;
451            status.into()
452        } else {
453            fire_closed_error()
454        }
455    }
456
457    fn delete_storage_group(&mut self, storage_group_id: &str) -> Result<()> {
458        self.delete_storage_groups(vec![storage_group_id])
459    }
460
461    fn delete_storage_groups(&mut self, storage_group_ids: Vec<&str>) -> Result<()> {
462        if let Some(session_id) = self.session_id {
463            let status = self.client.delete_storage_groups(
464                session_id,
465                storage_group_ids.iter().map(ToString::to_string).collect(),
466            )?;
467            status.into()
468        } else {
469            fire_closed_error()
470        }
471    }
472
473    fn create_timeseries<T>(
474        &mut self,
475        path: &str,
476        data_type: crate::protocal::TSDataType,
477        encoding: crate::protocal::TSEncoding,
478        compressor: crate::protocal::TSCompressionType,
479        props: T,
480        attributes: T,
481        tags: T,
482        measurement_alias: Option<String>,
483    ) -> Result<()>
484    where
485        T: Into<Option<Dictionary>>,
486    {
487        if let Some(session_id) = self.session_id {
488            self.client
489                .create_timeseries(TSCreateTimeseriesReq::new(
490                    session_id,
491                    path.to_string(),
492                    data_type.into(),
493                    encoding.into(),
494                    compressor.into(),
495                    props,
496                    tags,
497                    attributes,
498                    measurement_alias,
499                ))?
500                .into()
501        } else {
502            fire_closed_error()
503        }
504    }
505
506    fn create_multi_timeseries<T>(
507        &mut self,
508        paths: Vec<&str>,
509        data_types: Vec<crate::protocal::TSDataType>,
510        encodings: Vec<crate::protocal::TSEncoding>,
511        compressors: Vec<crate::protocal::TSCompressionType>,
512        props_list: T,
513        attributes_list: T,
514        tags_list: T,
515        measurement_alias_list: Option<Vec<String>>,
516    ) -> Result<()>
517    where
518        T: Into<Option<Vec<Dictionary>>>,
519    {
520        if let Some(session_id) = self.session_id {
521            let status = self
522                .client
523                .create_multi_timeseries(TSCreateMultiTimeseriesReq::new(
524                    session_id,
525                    paths.iter().map(ToString::to_string).collect(),
526                    data_types.into_iter().map(TSDataType::into).collect(),
527                    encodings.into_iter().map(TSEncoding::into).collect(),
528                    compressors
529                        .into_iter()
530                        .map(TSCompressionType::into)
531                        .collect(),
532                    props_list,
533                    attributes_list,
534                    tags_list,
535                    measurement_alias_list,
536                ))?;
537            status.into()
538        } else {
539            fire_closed_error()
540        }
541    }
542
543    fn delete_timeseries(&mut self, paths: Vec<&str>) -> Result<()> {
544        if let Some(session_id) = self.session_id {
545            let status = self
546                .client
547                .delete_timeseries(session_id, paths.iter().map(ToString::to_string).collect())?;
548            status.into()
549        } else {
550            fire_closed_error()
551        }
552    }
553
554    fn delete_data(&mut self, paths: Vec<&str>, start_time: i64, end_time: i64) -> Result<()> {
555        if let Some(session_id) = self.session_id {
556            let status = self.client.delete_data(TSDeleteDataReq::new(
557                session_id,
558                paths.into_iter().map(ToString::to_string).collect(),
559                start_time,
560                end_time,
561            ))?;
562            status.into()
563        } else {
564            fire_closed_error()
565        }
566    }
567
568    fn insert_string_record<T>(
569        &mut self,
570        device_id: &str,
571        measurements: Vec<&str>,
572        values: Vec<&str>,
573        timestamp: i64,
574        is_aligned: T,
575    ) -> Result<()>
576    where
577        T: Into<Option<bool>>,
578    {
579        if let Some(session_id) = self.session_id {
580            let status = self
581                .client
582                .insert_string_record(TSInsertStringRecordReq::new(
583                    session_id,
584                    device_id.to_string(),
585                    measurements.iter().map(ToString::to_string).collect(),
586                    values.iter().map(ToString::to_string).collect(),
587                    timestamp,
588                    is_aligned,
589                ))?;
590            status.into()
591        } else {
592            fire_closed_error()
593        }
594    }
595
596    fn get_time_zone(&mut self) -> Result<String> {
597        if let Some(session_id) = self.session_id {
598            let resp = self.client.get_time_zone(session_id)?;
599            let res: Result<()> = resp.status.into();
600            res?;
601            Ok(resp.time_zone)
602        } else {
603            Err("Operation can't be performed, the session is closed.".into())
604        }
605    }
606
607    fn set_time_zone(&mut self, time_zone: &str) -> Result<()> {
608        if let Some(session_id) = self.session_id {
609            self.client
610                .set_time_zone(TSSetTimeZoneReq::new(session_id, time_zone.to_string()))?
611                .into()
612        } else {
613            fire_closed_error()
614        }
615    }
616
617    fn execute_statement<T>(
618        &'a mut self,
619        statement: &str,
620        timeout_ms: T,
621    ) -> Result<Box<dyn 'a + DataSet>>
622    where
623        T: Into<Option<i64>>,
624    {
625        if let Some(session_id) = self.session_id {
626            let resp = self.client.execute_statement(TSExecuteStatementReq {
627                session_id,
628                statement: statement.to_string(),
629                statement_id: self.statement_id,
630                fetch_size: Some(self.config.fetch_size),
631                timeout: timeout_ms.into(),
632                enable_redirect_query: None,
633                jdbc_query: None,
634            })?;
635            let status = resp.status;
636            let code = status.code;
637            if code == SUCCESS_STATUS {
638                {
639                    if let (Some(column_names), Some(data_type_list)) =
640                        (resp.columns, resp.data_type_list)
641                    {
642                        let column_name_index_map = match resp.column_name_index_map {
643                            Some(map) => map,
644                            None => {
645                                let mut map: BTreeMap<String, i32> = BTreeMap::new();
646                                for (index, name) in column_names.iter().enumerate() {
647                                    map.insert(name.to_string(), index as i32);
648                                }
649                                map
650                            }
651                        };
652
653                        let data_types: Vec<TSDataType> =
654                            data_type_list.iter().map(TSDataType::from).collect();
655
656                        let mut column_index_map: HashMap<usize, usize> = HashMap::new();
657
658                        let column_count = column_names.len();
659                        for (index, name) in column_names.iter().enumerate() {
660                            column_index_map
661                                .insert(*column_name_index_map.get(name).unwrap() as usize, index);
662                        }
663
664                        Ok(Box::new(RpcDataSet {
665                            session: self,
666                            statement: statement.to_string(),
667                            query_id: resp.query_id.unwrap(),
668                            timestamp: -1,
669                            is_ignore_time_stamp: resp.ignore_time_stamp,
670                            query_data_set: resp.query_data_set.unwrap(),
671                            column_names,
672                            data_types,
673                            bitmaps: vec![0_u8; column_count],
674                            row_index: 0,
675                            column_index_map,
676                            column_name_index_map,
677                            closed: false,
678                        }))
679                    } else {
680                        Err("Can't get resources on execute_statement".into())
681                    }
682                }
683            } else {
684                let res: Result<()> = status.into();
685                res?;
686                Err(format!("Unknow, code: {}", code).into())
687            }
688        } else {
689            Err("Operation can't be performed, the session is closed.".into())
690        }
691    }
692
693    fn execute_query_statement<T>(
694        &'a mut self,
695        statement: &str,
696        timeout_ms: T,
697    ) -> Result<Box<dyn 'a + DataSet>>
698    where
699        T: Into<Option<i64>>,
700    {
701        if let Some(session_id) = self.session_id {
702            let resp = self.client.execute_query_statement(TSExecuteStatementReq {
703                session_id,
704                statement: statement.to_string(),
705                statement_id: self.statement_id,
706                fetch_size: Some(self.config.fetch_size),
707                timeout: timeout_ms.into(),
708                enable_redirect_query: None,
709                jdbc_query: None,
710            })?;
711            let status = resp.status;
712            let code = status.code;
713            if code == SUCCESS_STATUS {
714                let column_names: Vec<String> = resp.columns.unwrap();
715
716                let column_name_index_map = match resp.column_name_index_map {
717                    Some(v) => v,
718                    None => {
719                        let mut map: BTreeMap<String, i32> = BTreeMap::new();
720                        for (index, name) in column_names.iter().enumerate() {
721                            map.insert(name.to_string(), index as i32);
722                        }
723                        map
724                    }
725                };
726
727                let data_types: Vec<TSDataType> = resp
728                    .data_type_list
729                    .unwrap()
730                    .iter()
731                    .map(TSDataType::from)
732                    .collect();
733
734                let mut column_index_map: HashMap<usize, usize> = HashMap::new();
735
736                let column_count = column_names.len();
737                for (index, name) in column_names.iter().enumerate() {
738                    column_index_map
739                        .insert(*column_name_index_map.get(name).unwrap() as usize, index);
740                }
741                let dataset = RpcDataSet {
742                    session: self,
743                    statement: statement.to_string(),
744                    query_id: resp.query_id.unwrap(),
745                    timestamp: -1,
746                    is_ignore_time_stamp: resp.ignore_time_stamp,
747                    query_data_set: resp.query_data_set.unwrap(),
748                    column_names,
749                    data_types,
750                    bitmaps: vec![0_u8; column_count],
751                    row_index: 0,
752                    column_index_map,
753                    column_name_index_map,
754                    closed: false,
755                };
756                Ok(Box::new(dataset))
757            } else {
758                let res: Result<()> = status.into();
759                res?;
760                Err(format!("Unknow, code: {}", code).into())
761            }
762        } else {
763            Err("Operation can't be performed, the session is closed.".into())
764        }
765    }
766
767    fn insert_record<T>(
768        &mut self,
769        device_id: &str,
770        measurements: Vec<&str>,
771        values: Vec<Value>,
772        timestamp: i64,
773        is_aligned: T,
774    ) -> Result<()>
775    where
776        T: Into<Option<bool>>,
777    {
778        if let Some(session_id) = self.session_id {
779            let mut values_bytes: Vec<u8> = Vec::new();
780            values.iter().for_each(|v| {
781                let mut value_bytes: Vec<u8> = v.into();
782                values_bytes.append(&mut value_bytes);
783            });
784            let status = self.client.insert_record(TSInsertRecordReq::new(
785                session_id,
786                device_id.to_string(),
787                measurements.iter().map(ToString::to_string).collect(),
788                values_bytes,
789                timestamp,
790                is_aligned,
791            ))?;
792            status.into()
793        } else {
794            fire_closed_error()
795        }
796    }
797
798    fn insert_records_of_one_device(
799        &mut self,
800        device_id: &str,
801        timestamps: Vec<i64>,
802        measurements: Vec<Vec<&str>>,
803        values: Vec<Vec<super::Value>>,
804        sorted: bool,
805    ) -> Result<()> {
806        let mut sorted_timestamps = timestamps;
807        let mut sorted_measurements = measurements;
808        let mut sorted_values = values;
809
810        if !sorted {
811            let permutation = permutation::sort(&sorted_timestamps[..]);
812            sorted_timestamps = permutation.apply_slice(&sorted_timestamps[..]);
813            sorted_measurements = permutation.apply_slice(&sorted_measurements[..]);
814            sorted_values = permutation.apply_slice(&sorted_values[..]);
815        }
816
817        if let Some(session_id) = self.session_id {
818            let values_list = sorted_values
819                .iter()
820                .map(|vec| {
821                    let mut values: Vec<u8> = Vec::new();
822                    for value in vec.iter() {
823                        let mut value_data: Vec<u8> = value.into();
824                        values.append(&mut value_data);
825                    }
826                    values
827                })
828                .collect();
829            let status =
830                self.client
831                    .insert_records_of_one_device(TSInsertRecordsOfOneDeviceReq::new(
832                        session_id,
833                        device_id.to_string(),
834                        sorted_measurements
835                            .iter()
836                            .map(|vec| vec.iter().map(ToString::to_string).collect())
837                            .collect(),
838                        values_list,
839                        sorted_timestamps,
840                        false,
841                    ))?;
842            status.into()
843        } else {
844            fire_closed_error()
845        }
846    }
847
848    fn insert_records(
849        &mut self,
850        prefix_path: Vec<&str>,
851        measurements: Vec<Vec<&str>>,
852        values: Vec<Vec<super::Value>>,
853        timestamps: Vec<i64>,
854    ) -> Result<()> {
855        if let Some(session_id) = self.session_id {
856            let values_list = values
857                .iter()
858                .map(|vec| {
859                    let mut values: Vec<u8> = Vec::new();
860                    for value in vec.iter() {
861                        let mut value_data: Vec<u8> = value.into();
862                        values.append(&mut value_data);
863                    }
864                    values
865                })
866                .collect();
867            let status = self.client.insert_records(TSInsertRecordsReq {
868                session_id,
869                prefix_paths: prefix_path.iter().map(ToString::to_string).collect(),
870                measurements_list: measurements
871                    .iter()
872                    .map(|ms| ms.iter().map(ToString::to_string).collect())
873                    .collect(),
874                values_list,
875                timestamps,
876                is_aligned: None,
877            })?;
878            status.into()
879        } else {
880            fire_closed_error()
881        }
882    }
883
884    fn insert_tablet(&mut self, tablet: &super::Tablet) -> Result<()> {
885        if let Some(session_id) = self.session_id {
886            let mut timestamps_list: Vec<u8> = Vec::with_capacity(tablet.timestamps.len() * 8);
887            tablet
888                .timestamps
889                .iter()
890                .for_each(|ts| timestamps_list.append(&mut ts.to_be_bytes().to_vec()));
891
892            let status = self.client.insert_tablet(TSInsertTabletReq {
893                session_id,
894                prefix_path: tablet.get_prefix_path(),
895                measurements: tablet
896                    .measurement_schemas
897                    .iter()
898                    .map(|f| f.measurement.to_string())
899                    .collect(),
900                values: tablet.into(),
901                timestamps: timestamps_list,
902                types: tablet
903                    .get_measurement_schemas()
904                    .into_iter()
905                    .map(|measurement_schema| measurement_schema.data_type.into())
906                    .collect(),
907                size: tablet.get_row_count() as i32,
908                is_aligned: Some(false),
909            })?;
910            status.into()
911        } else {
912            fire_closed_error()
913        }
914    }
915
916    fn insert_tablets(&mut self, tablets: Vec<&super::Tablet>) -> Result<()> {
917        if let Some(session_id) = self.session_id {
918            let status = self.client.insert_tablets(TSInsertTabletsReq {
919                session_id,
920                prefix_paths: tablets.iter().map(|t| t.get_prefix_path()).collect(),
921                measurements_list: tablets
922                    .iter()
923                    .map(|tablet| {
924                        tablet
925                            .measurement_schemas
926                            .iter()
927                            .map(|f| f.measurement.to_string())
928                            .collect()
929                    })
930                    .collect(),
931                values_list: tablets.iter().map(|tablet| Into::into(*tablet)).collect(),
932                timestamps_list: tablets
933                    .iter()
934                    .map(|tablet| {
935                        let mut ts_item: Vec<u8> = Vec::new();
936                        tablet
937                            .timestamps
938                            .iter()
939                            .for_each(|ts| ts_item.append(&mut ts.to_be_bytes().to_vec()));
940                        ts_item
941                    })
942                    .collect(),
943                types_list: tablets
944                    .iter()
945                    .map(|tablet| {
946                        tablet
947                            .get_measurement_schemas()
948                            .into_iter()
949                            .map(|f| {
950                                let t: i32 = f.data_type.into();
951                                t
952                            })
953                            .collect()
954                    })
955                    .collect(),
956                size_list: tablets
957                    .iter()
958                    .map(|tablet| tablet.get_row_count() as i32)
959                    .collect(),
960                is_aligned: Some(false),
961            })?;
962            status.into()
963        } else {
964            fire_closed_error()
965        }
966    }
967
968    fn execute_batch_statement(&mut self, statemens: Vec<&str>) -> Result<()> {
969        if let Some(session_id) = self.session_id {
970            let status =
971                self.client
972                    .execute_batch_statement(super::rpc::TSExecuteBatchStatementReq {
973                        session_id,
974                        statements: statemens.iter().map(ToString::to_string).collect(),
975                    })?;
976            status.into()
977        } else {
978            fire_closed_error()
979        }
980    }
981
982    fn execute_raw_data_query(
983        &'a mut self,
984        paths: Vec<&str>,
985        start_time: i64,
986        end_time: i64,
987    ) -> Result<Box<dyn 'a + DataSet>> {
988        if let Some(session_id) = self.session_id {
989            let resp = self
990                .client
991                .execute_raw_data_query(super::rpc::TSRawDataQueryReq {
992                    session_id,
993                    paths: paths.iter().map(ToString::to_string).collect(),
994                    fetch_size: Some(self.config.fetch_size),
995                    start_time,
996                    end_time,
997                    statement_id: self.statement_id,
998                    enable_redirect_query: None,
999                    jdbc_query: None,
1000                })?;
1001            let status = resp.status;
1002            let code = status.code;
1003            if code == SUCCESS_STATUS {
1004                if let (Some(query_data_set), Some(column_names), Some(data_types_list)) =
1005                    (resp.query_data_set, resp.columns, resp.data_type_list)
1006                {
1007                    let column_name_index_map = match resp.column_name_index_map {
1008                        Some(v) => v,
1009                        None => {
1010                            let mut map: BTreeMap<String, i32> = BTreeMap::new();
1011                            for (index, name) in column_names.iter().enumerate() {
1012                                map.insert(name.to_string(), index as i32);
1013                            }
1014                            map
1015                        }
1016                    };
1017
1018                    let data_types: Vec<TSDataType> =
1019                        data_types_list.iter().map(TSDataType::from).collect();
1020
1021                    let mut column_index_map: HashMap<usize, usize> = HashMap::new();
1022
1023                    let column_count = column_names.len();
1024                    for (index, name) in column_names.iter().enumerate() {
1025                        column_index_map
1026                            .insert(*column_name_index_map.get(name).unwrap() as usize, index);
1027                    }
1028
1029                    Ok(Box::new(RpcDataSet {
1030                        session: self,
1031                        statement: "".to_string(),
1032                        query_id: resp.query_id.unwrap(),
1033                        timestamp: -1,
1034                        is_ignore_time_stamp: resp.ignore_time_stamp,
1035                        query_data_set,
1036                        column_names,
1037                        data_types,
1038                        bitmaps: vec![0_u8; column_count],
1039                        row_index: 0,
1040                        column_index_map,
1041                        column_name_index_map,
1042                        closed: false,
1043                    }))
1044                } else {
1045                    Err("Did't get the result.".into())
1046                }
1047            } else {
1048                let res: Result<()> = status.into();
1049                res?;
1050                Err(format!("Unknow, code: {}", code).into())
1051            }
1052        } else {
1053            Err("Operation can't be performed, the session is closed.".into())
1054        }
1055    }
1056
1057    fn execute_update_statement(
1058        &'a mut self,
1059        statement: &str,
1060    ) -> Result<Option<Box<dyn 'a + DataSet>>> {
1061        if let Some(session_id) = self.session_id {
1062            let resp = self
1063                .client
1064                .execute_update_statement(TSExecuteStatementReq {
1065                    session_id,
1066                    statement: statement.to_string(),
1067                    statement_id: self.statement_id,
1068                    fetch_size: Some(self.config.fetch_size),
1069                    timeout: self.config.timeout_ms,
1070                    enable_redirect_query: None,
1071                    jdbc_query: None,
1072                })?;
1073            let status = resp.status;
1074            let code = status.code;
1075            if code == SUCCESS_STATUS {
1076                if let (Some(query_data_set), Some(column_names), Some(data_type_list)) =
1077                    (resp.query_data_set, resp.columns, resp.data_type_list)
1078                {
1079                    let column_name_index_map = match resp.column_name_index_map {
1080                        Some(v) => v,
1081                        None => {
1082                            let mut map: BTreeMap<String, i32> = BTreeMap::new();
1083                            for (index, name) in column_names.iter().enumerate() {
1084                                map.insert(name.to_string(), index as i32);
1085                            }
1086                            map
1087                        }
1088                    };
1089
1090                    let data_types: Vec<TSDataType> =
1091                        data_type_list.iter().map(TSDataType::from).collect();
1092
1093                    let mut column_index_map: HashMap<usize, usize> = HashMap::new();
1094
1095                    let column_count = column_names.len();
1096                    for (index, name) in column_names.iter().enumerate() {
1097                        column_index_map
1098                            .insert(*column_name_index_map.get(name).unwrap() as usize, index);
1099                    }
1100
1101                    Ok(Some(Box::new(RpcDataSet {
1102                        session: self,
1103                        statement: statement.to_string(),
1104                        query_id: resp.query_id.unwrap(),
1105                        timestamp: -1,
1106                        is_ignore_time_stamp: resp.ignore_time_stamp,
1107                        query_data_set,
1108                        column_names,
1109                        data_types,
1110                        bitmaps: vec![0_u8; column_count],
1111                        row_index: 0,
1112                        column_index_map,
1113                        column_name_index_map,
1114                        closed: false,
1115                    })))
1116                } else {
1117                    Ok(None)
1118                }
1119            } else {
1120                let res: Result<()> = status.into();
1121                res?;
1122                Err(format!("Unknow, code: {}", code).into())
1123            }
1124        } else {
1125            Err("Operation can't be performed, the session is closed.".into())
1126        }
1127    }
1128}
1129
1130impl Drop for RpcSession {
1131    fn drop(&mut self) {
1132        if let Some(session_id) = self.session_id {
1133            self.close().unwrap_or_else(|err| {
1134                eprint!("error closing the session {}, reason {}", session_id, err)
1135            });
1136        }
1137    }
1138}