iotdb/
session.rs

1use std::collections::{BTreeMap, HashMap};
2
3use chrono::{Local, Utc};
4use log::{debug, error, info, trace, warn};
5use thrift::protocol::{TInputProtocol, TOutputProtocol};
6
7use crate::rpc::{
8    ServerProperties, TSCancelOperationReq, TSCloseSessionReq, TSCreateMultiTimeseriesReq,
9    TSCreateTimeseriesReq, TSDeleteDataReq, TSExecuteStatementReq, TSExecuteStatementResp,
10    TSIServiceSyncClient, TSInsertRecordReq, TSInsertRecordsOfOneDeviceReq, TSInsertRecordsReq,
11    TSInsertStringRecordsReq, TSInsertTabletReq, TSInsertTabletsReq, TSOpenSessionReq,
12    TSProtocolVersion, TSSetTimeZoneReq, TSStatus, TTSIServiceSyncClient,
13};
14use thrift::{ApplicationErrorKind, Error, ProtocolErrorKind, TransportErrorKind};
15
/// Synchronous Thrift service client whose input and output protocols are
/// boxed trait objects, so the concrete protocol is chosen by the caller.
type ClientType = TSIServiceSyncClient<Box<dyn TInputProtocol>, Box<dyn TOutputProtocol>>;
17
/// Status code that `Session::is_success` treats as a successful server response.
pub const SUCCESS_CODE: i32 = 200;
19
/// Client-side state of one server session: the credentials and options sent
/// when the session is opened, the ids handed back by the server, and the
/// Thrift client every request goes through.
pub struct Session {
    /// User name sent in the open-session request (`"root"` by default).
    user: String,
    /// Password sent in the open-session request (`"root"` by default).
    password: String,
    /// Zone id sent in the open-session request.
    zone_id: String,
    /// Fetch size attached to query statements (`1024` by default).
    fetch_size: i32,
    /// Session id returned by the server; `-1` until `open` succeeds.
    session_id: i64,
    /// Statement id requested right after opening; `-1` until `open` succeeds.
    statement_id: i64,
    /// `true` while the session is not open (initial state and after `close`).
    is_close: bool,
    /// Protocol version announced to the server and compared with its answer.
    protocol_version: TSProtocolVersion,
    /// Extra key/value configuration sent in the open-session request.
    config: BTreeMap<String, String>,
    /// Thrift client used for all RPC calls.
    client: ClientType,
}
32
33impl Session {
34    pub fn new(client: ClientType) -> Session {
35        let tz = format!("{}{}", Utc::now().offset(), Local::now().offset());
36        Self {
37            user: "root".to_string(),
38            password: "root".to_string(),
39            zone_id: tz,
40            fetch_size: 1024,
41            session_id: -1,
42            statement_id: -1,
43            is_close: true,
44            protocol_version: TSProtocolVersion::IotdbServiceProtocolV3,
45            config: BTreeMap::new(),
46            client,
47        }
48    }
49
50    pub fn user(&mut self, user: &str) -> &mut Session {
51        self.user = user.to_string();
52        self
53    }
54
55    pub fn password(&mut self, password: &str) -> &mut Session {
56        self.password = password.to_string();
57        self
58    }
59
60    pub fn zone_id(&mut self, zone_id: &str) -> &mut Session {
61        self.zone_id = zone_id.to_string();
62        self
63    }
64
65    pub fn fetch_size(&mut self, fetch_size: i32) -> &mut Session {
66        self.fetch_size = fetch_size;
67        self
68    }
69
70    pub fn protocol_version(&mut self, user: &str) -> &mut Session {
71        self.user = user.to_string();
72        self
73    }
74
75    pub fn config(&mut self, key: &str, value: &str) -> &mut Session {
76        self.config
77            .clone()
78            .insert(key.to_string(), value.to_string());
79        self
80    }
81
82    pub fn config_map(&mut self, map: HashMap<&str, &str>) -> &mut Session {
83        for key in map.keys() {
84            self.config
85                .clone()
86                .insert(key.to_string(), map.get(key).unwrap().to_string());
87        }
88        self
89    }
90
91    // Verify success status of operation
92    pub fn is_success(&self, status: &TSStatus) -> bool {
93        if status.code == SUCCESS_CODE {
94            true
95        } else {
96            false
97        }
98    }
99
100    // Open Session
101    pub fn open(&mut self) -> thrift::Result<&mut Session> {
102        trace!("Open session");
103        let open_req = TSOpenSessionReq::new(
104            self.protocol_version.clone(),
105            self.zone_id.to_string(),
106            self.user.clone(),
107            self.password.clone(),
108            self.config.clone(),
109        );
110
111        match self.client.open_session(open_req.clone()) {
112            Ok(resp) => {
113                let status = resp.status;
114                if self.is_success(&status) {
115                    if self.protocol_version != resp.server_protocol_version {
116                        let msg = format!(
117                            "Protocol version is different, client is {:?},server is {:?}",
118                            self.protocol_version, resp.server_protocol_version
119                        );
120                        error!("{}", msg.clone());
121                        Err(thrift::new_protocol_error(
122                            ProtocolErrorKind::BadVersion,
123                            msg,
124                        ))
125                    } else {
126                        self.session_id = resp.session_id.unwrap();
127                        self.statement_id = self.client.request_statement_id(self.session_id)?;
128                        self.is_close = false;
129                        debug!("Session opened");
130                        Ok(self)
131                    }
132                } else {
133                    error!("{}", status.message.clone().unwrap());
134                    Err(thrift::new_application_error(
135                        ApplicationErrorKind::MissingResult,
136                        status.message.unwrap(),
137                    ))
138                }
139            }
140            Err(error) => Err(thrift::new_transport_error(
141                TransportErrorKind::Unknown,
142                error.to_string(),
143            )),
144        }
145    }
146
147    pub fn is_open(&self) -> bool {
148        !self.is_close.clone()
149    }
150
151    // Close Session
152    pub fn close(&mut self) -> thrift::Result<()> {
153        trace!("Close session");
154        if self.is_close {
155            Ok(())
156        } else {
157            let req = TSCloseSessionReq::new(self.session_id);
158            match self.client.close_session(req) {
159                Ok(status) => {
160                    if self.is_success(&status) {
161                        self.is_close = true;
162                        debug!("Session closed");
163                        Ok(())
164                    } else {
165                        error!("{}", status.message.clone().unwrap());
166                        Err(thrift::new_application_error(
167                            ApplicationErrorKind::MissingResult,
168                            status.message.unwrap(),
169                        ))
170                    }
171                }
172                Err(error) => Err(thrift::new_transport_error(
173                    TransportErrorKind::Unknown,
174                    error.to_string(),
175                )),
176            }
177        }
178    }
179
180    /// Set a storage group
181    pub fn set_storage_group(&mut self, storage_group: &str) -> thrift::Result<()> {
182        trace!("Set storage group");
183        match self
184            .client
185            .set_storage_group(self.session_id, storage_group.to_string())
186        {
187            Ok(status) => {
188                if self.is_success(&status) {
189                    Ok(())
190                } else {
191                    error!("{}", status.message.clone().unwrap());
192                    Err(thrift::new_application_error(
193                        ApplicationErrorKind::MissingResult,
194                        status.message.unwrap(),
195                    ))
196                }
197            }
198            Err(error) => Err(thrift::new_transport_error(
199                TransportErrorKind::Unknown,
200                error.to_string(),
201            )),
202        }
203    }
204
205    /// Delete a storage group.
206    pub fn delete_storage_group(&mut self, storage_group: &str) -> thrift::Result<()> {
207        trace!("Delete a storage group");
208        match self
209            .client
210            .delete_storage_groups(self.session_id, vec![storage_group.to_string()])
211        {
212            Ok(status) => {
213                if self.is_success(&status) {
214                    Ok(())
215                } else {
216                    error!("{}", status.message.clone().unwrap());
217                    Err(thrift::new_application_error(
218                        ApplicationErrorKind::MissingResult,
219                        status.message.unwrap(),
220                    ))
221                }
222            }
223            Err(error) => Err(thrift::new_transport_error(
224                TransportErrorKind::Unknown,
225                error.to_string(),
226            )),
227        }
228    }
229
230    /// Delete storage groups.
231    pub fn delete_storage_groups(&mut self, storage_groups: Vec<String>) -> thrift::Result<()> {
232        trace!("Delete storage groups");
233        match self
234            .client
235            .delete_storage_groups(self.session_id, storage_groups)
236        {
237            Ok(status) => {
238                if self.is_success(&status) {
239                    Ok(())
240                } else {
241                    error!("{}", status.message.clone().unwrap());
242                    Err(thrift::new_application_error(
243                        ApplicationErrorKind::MissingResult,
244                        status.message.unwrap(),
245                    ))
246                }
247            }
248            Err(error) => Err(thrift::new_transport_error(
249                TransportErrorKind::Unknown,
250                error.to_string(),
251            )),
252        }
253    }
254
255    /// Create single time-series
256    pub fn create_time_series(
257        &mut self,
258        ts_path: String,
259        data_type: i32,
260        encoding: i32,
261        compressor: i32,
262    ) -> thrift::Result<()> {
263        trace!("Create single time-series");
264        let req = TSCreateTimeseriesReq::new(
265            self.session_id,
266            ts_path,
267            data_type,
268            encoding,
269            compressor,
270            None,
271            None,
272            None,
273            None,
274        );
275        match self.client.create_timeseries(req) {
276            Ok(status) => {
277                if self.is_success(&status) {
278                    Ok(())
279                } else {
280                    error!("{}", status.message.clone().unwrap());
281                    Err(thrift::new_application_error(
282                        ApplicationErrorKind::MissingResult,
283                        status.message.unwrap(),
284                    ))
285                }
286            }
287            Err(error) => Err(thrift::new_transport_error(
288                TransportErrorKind::Unknown,
289                error.to_string(),
290            )),
291        }
292    }
293
294    /// Create multiple time-series
295    pub fn create_multi_time_series(
296        &mut self,
297        ts_path_vec: Vec<String>,
298        data_type_vec: Vec<i32>,
299        encoding_vec: Vec<i32>,
300        compressor_vec: Vec<i32>,
301    ) -> thrift::Result<()> {
302        trace!("Create multiple time-series");
303        let req = TSCreateMultiTimeseriesReq::new(
304            self.session_id,
305            ts_path_vec,
306            data_type_vec,
307            encoding_vec,
308            compressor_vec,
309            None,
310            None,
311            None,
312            None,
313        );
314        match self.client.create_multi_timeseries(req) {
315            Ok(status) => {
316                if self.is_success(&status) {
317                    Ok(())
318                } else {
319                    error!("{}", status.message.clone().unwrap());
320                    Err(thrift::new_application_error(
321                        ApplicationErrorKind::MissingResult,
322                        status.message.unwrap(),
323                    ))
324                }
325            }
326            Err(error) => Err(thrift::new_transport_error(
327                TransportErrorKind::Unknown,
328                error.to_string(),
329            )),
330        }
331    }
332
333    /// Delete multiple time series
334    pub fn delete_time_series(&mut self, path_vec: Vec<String>) -> thrift::Result<()> {
335        trace!("Delete multiple time-series");
336        match self.client.delete_timeseries(self.session_id, path_vec) {
337            Ok(status) => {
338                if self.is_success(&status) {
339                    Ok(())
340                } else {
341                    error!("{}", status.message.clone().unwrap());
342                    Err(thrift::new_application_error(
343                        ApplicationErrorKind::MissingResult,
344                        status.message.unwrap(),
345                    ))
346                }
347            }
348            Err(error) => Err(thrift::new_transport_error(
349                TransportErrorKind::Unknown,
350                error.to_string(),
351            )),
352        }
353    }
354
355    /// Check whether a specific time-series exists
356    // TODO
357    pub fn check_time_series_exist(&mut self, path: &str) {
358        trace!("Check time-series exists");
359        self.query(format!("SHOW TIMESERIES {}", path).as_str());
360    }
361
362    /// Delete all data <= time in multiple time-series
363    pub fn delete_data(&mut self, path_vec: Vec<String>, timestamp: i64) -> thrift::Result<()> {
364        trace!("Delete data");
365        let req = TSDeleteDataReq::new(self.session_id, path_vec, 0, timestamp);
366        match self.client.delete_data(req) {
367            Ok(status) => {
368                if self.is_success(&status) {
369                    Ok(())
370                } else {
371                    error!("{}", status.message.clone().unwrap());
372                    Err(thrift::new_application_error(
373                        ApplicationErrorKind::MissingResult,
374                        status.message.unwrap(),
375                    ))
376                }
377            }
378            Err(error) => Err(thrift::new_transport_error(
379                TransportErrorKind::Unknown,
380                error.to_string(),
381            )),
382        }
383    }
384
385    /// Insert string records
386    // TODO
387    pub fn insert_string_records(
388        &mut self,
389        device_ids: Vec<String>,
390        measurements_list: Vec<Vec<String>>,
391        values_list: Vec<Vec<String>>,
392        timestamps: Vec<i64>,
393    ) -> thrift::Result<TSStatus> {
394        let req = TSInsertStringRecordsReq::new(
395            self.session_id,
396            device_ids,
397            measurements_list,
398            values_list,
399            timestamps,
400        );
401        self.client.insert_string_records(req)
402    }
403
404    /// Insert record
405    // TODO
406    pub fn insert_record(
407        &mut self,
408        device_id: String,
409        measurements: Vec<String>,
410        values: Vec<u8>,
411        timestamp: i64,
412    ) -> thrift::Result<TSStatus> {
413        let req =
414            TSInsertRecordReq::new(self.session_id, device_id, measurements, values, timestamp);
415        self.client.insert_record(req)
416    }
417
418    /// Insert records
419    // TODO
420    pub fn insert_records(
421        &mut self,
422        device_ids: Vec<String>,
423        measurements_list: Vec<Vec<String>>,
424        values_list: Vec<Vec<u8>>,
425        timestamps: Vec<i64>,
426    ) -> thrift::Result<TSStatus> {
427        let req = TSInsertRecordsReq::new(
428            self.session_id,
429            device_ids,
430            measurements_list,
431            values_list,
432            timestamps,
433        );
434        self.client.insert_records(req)
435    }
436
437    /// Insert records of one device
438    // TODO
439    pub fn insert_records_of_one_device(
440        &mut self,
441        device_id: String,
442        measurements_list: Vec<Vec<String>>,
443        values_list: Vec<Vec<u8>>,
444        timestamps: Vec<i64>,
445    ) -> thrift::Result<TSStatus> {
446        let req = TSInsertRecordsOfOneDeviceReq::new(
447            self.session_id,
448            device_id,
449            measurements_list,
450            values_list,
451            timestamps,
452        );
453        Ok(self.client.insert_records_of_one_device(req)?)
454    }
455
456    /// Insert tablet
457    // TODO
458    pub fn insert_tablet(
459        &mut self,
460        device_id: String,
461        measurements: Vec<String>,
462        values: Vec<u8>,
463        timestamps: Vec<u8>,
464        types: Vec<i32>,
465        size: i32,
466    ) -> thrift::Result<TSStatus> {
467        trace!("Delete data");
468        let req = TSInsertTabletReq::new(
469            self.session_id,
470            device_id,
471            measurements,
472            values,
473            timestamps,
474            types,
475            size,
476        );
477        self.client.insert_tablet(req)
478    }
479
480    /// Insert tablets
481    // TODO
482    pub fn insert_tablets(
483        &mut self,
484        device_ids: Vec<String>,
485        measurements_list: Vec<Vec<String>>,
486        values_list: Vec<Vec<u8>>,
487        timestamps_list: Vec<Vec<u8>>,
488        types_list: Vec<Vec<i32>>,
489        size_list: Vec<i32>,
490    ) -> thrift::Result<TSStatus> {
491        let req = TSInsertTabletsReq::new(
492            self.session_id,
493            device_ids,
494            measurements_list,
495            values_list,
496            timestamps_list,
497            types_list,
498            size_list,
499        );
500        self.client.insert_tablets(req)
501    }
502
503    /// Set time zone
504    pub fn set_time_zone(&mut self, time_zone: &str) -> thrift::Result<()> {
505        trace!("Set time zone");
506        let req = TSSetTimeZoneReq::new(self.session_id, time_zone.to_string());
507        match self.client.set_time_zone(req) {
508            Ok(status) => {
509                if status.code == 200 {
510                    Ok(())
511                } else {
512                    error!("{}", status.message.clone().unwrap());
513                    Err(thrift::new_application_error(
514                        ApplicationErrorKind::MissingResult,
515                        status.message.unwrap(),
516                    ))
517                }
518            }
519            Err(error) => Err(thrift::new_transport_error(
520                TransportErrorKind::Unknown,
521                error.to_string(),
522            )),
523        }
524    }
525
526    // Exec Query
527    pub fn query(&mut self, sql: &str) -> thrift::Result<TSExecuteStatementResp> {
528        debug!("Exec query \"{}\"", &sql);
529        let req = TSExecuteStatementReq::new(
530            self.session_id,
531            sql.to_string(),
532            self.statement_id,
533            self.fetch_size,
534        );
535        match self.client.execute_query_statement(req) {
536            Ok(resp) => {
537                if resp.status.code == 200 {
538                    Ok(resp)
539                } else {
540                    error!("{}", resp.status.message.clone().unwrap());
541                    Err(thrift::new_application_error(
542                        ApplicationErrorKind::MissingResult,
543                        resp.status.message.unwrap(),
544                    ))
545                }
546            }
547            Err(error) => Err(thrift::new_transport_error(
548                TransportErrorKind::Unknown,
549                error.to_string(),
550            )),
551        }
552    }
553
554    /// Get time zone
555    pub fn get_time_zone(&mut self) -> thrift::Result<String> {
556        trace!("Get time zone");
557        match self.client.get_time_zone(self.session_id.clone()) {
558            Ok(resp) => {
559                if resp.status.code == 200 {
560                    Ok(resp.time_zone)
561                } else {
562                    error!("{}", resp.status.message.unwrap());
563                    Ok(String::new())
564                }
565            }
566            Err(_) => Ok(String::new()),
567        }
568    }
569
570    /// Get properties
571    pub fn get_properties(&mut self) -> thrift::Result<ServerProperties> {
572        trace!("Get properties");
573        Ok(self.client.get_properties()?)
574    }
575
576    /// Cancel operation
577    //TODO
578    pub fn cancel_operation(&mut self, query_id: i64) -> thrift::Result<TSStatus> {
579        let req = TSCancelOperationReq::new(self.session_id, query_id);
580        self.client.cancel_operation(req)
581    }
582}