1use std::collections::{BTreeMap, HashMap};
2
3use chrono::{Local, Utc};
4use log::{debug, error, info, trace, warn};
5use thrift::protocol::{TInputProtocol, TOutputProtocol};
6
7use crate::rpc::{
8 ServerProperties, TSCancelOperationReq, TSCloseSessionReq, TSCreateMultiTimeseriesReq,
9 TSCreateTimeseriesReq, TSDeleteDataReq, TSExecuteStatementReq, TSExecuteStatementResp,
10 TSIServiceSyncClient, TSInsertRecordReq, TSInsertRecordsOfOneDeviceReq, TSInsertRecordsReq,
11 TSInsertStringRecordsReq, TSInsertTabletReq, TSInsertTabletsReq, TSOpenSessionReq,
12 TSProtocolVersion, TSSetTimeZoneReq, TSStatus, TTSIServiceSyncClient,
13};
14use thrift::{ApplicationErrorKind, Error, ProtocolErrorKind, TransportErrorKind};
15
/// Synchronous Thrift service client used by [`Session`], generic over boxed
/// (dynamically dispatched) input and output protocols.
type ClientType = TSIServiceSyncClient<Box<dyn TInputProtocol>, Box<dyn TOutputProtocol>>;
17
/// Status code that `Session::is_success` compares a `TSStatus::code` against
/// to decide whether a request succeeded.
pub const SUCCESS_CODE: i32 = 200;
19
/// Client-side state of one server session, wrapping a synchronous Thrift client.
///
/// A value is built with `Session::new`, optionally adjusted through the
/// chaining setters, and connected with `Session::open`.
pub struct Session {
    /// User name sent in the open-session request (`"root"` after `new`).
    user: String,
    /// Password sent in the open-session request (`"root"` after `new`).
    password: String,
    /// Time-zone identifier sent in the open-session request.
    zone_id: String,
    /// Fetch size passed along with every query statement (`1024` after `new`).
    fetch_size: i32,
    /// Session id returned by the server on `open`; `-1` until then.
    session_id: i64,
    /// Statement id requested from the server during `open`; `-1` until then.
    statement_id: i64,
    /// `true` while no session is open: initially, and again after a successful `close`.
    is_close: bool,
    /// Protocol version sent on `open` and compared with the server's reply.
    protocol_version: TSProtocolVersion,
    /// Extra key/value configuration sent in the open-session request.
    config: BTreeMap<String, String>,
    /// Underlying Thrift client used for every remote call.
    client: ClientType,
}
32
33impl Session {
34 pub fn new(client: ClientType) -> Session {
35 let tz = format!("{}{}", Utc::now().offset(), Local::now().offset());
36 Self {
37 user: "root".to_string(),
38 password: "root".to_string(),
39 zone_id: tz,
40 fetch_size: 1024,
41 session_id: -1,
42 statement_id: -1,
43 is_close: true,
44 protocol_version: TSProtocolVersion::IotdbServiceProtocolV3,
45 config: BTreeMap::new(),
46 client,
47 }
48 }
49
50 pub fn user(&mut self, user: &str) -> &mut Session {
51 self.user = user.to_string();
52 self
53 }
54
55 pub fn password(&mut self, password: &str) -> &mut Session {
56 self.password = password.to_string();
57 self
58 }
59
60 pub fn zone_id(&mut self, zone_id: &str) -> &mut Session {
61 self.zone_id = zone_id.to_string();
62 self
63 }
64
65 pub fn fetch_size(&mut self, fetch_size: i32) -> &mut Session {
66 self.fetch_size = fetch_size;
67 self
68 }
69
70 pub fn protocol_version(&mut self, user: &str) -> &mut Session {
71 self.user = user.to_string();
72 self
73 }
74
75 pub fn config(&mut self, key: &str, value: &str) -> &mut Session {
76 self.config
77 .clone()
78 .insert(key.to_string(), value.to_string());
79 self
80 }
81
82 pub fn config_map(&mut self, map: HashMap<&str, &str>) -> &mut Session {
83 for key in map.keys() {
84 self.config
85 .clone()
86 .insert(key.to_string(), map.get(key).unwrap().to_string());
87 }
88 self
89 }
90
91 pub fn is_success(&self, status: &TSStatus) -> bool {
93 if status.code == SUCCESS_CODE {
94 true
95 } else {
96 false
97 }
98 }
99
100 pub fn open(&mut self) -> thrift::Result<&mut Session> {
102 trace!("Open session");
103 let open_req = TSOpenSessionReq::new(
104 self.protocol_version.clone(),
105 self.zone_id.to_string(),
106 self.user.clone(),
107 self.password.clone(),
108 self.config.clone(),
109 );
110
111 match self.client.open_session(open_req.clone()) {
112 Ok(resp) => {
113 let status = resp.status;
114 if self.is_success(&status) {
115 if self.protocol_version != resp.server_protocol_version {
116 let msg = format!(
117 "Protocol version is different, client is {:?},server is {:?}",
118 self.protocol_version, resp.server_protocol_version
119 );
120 error!("{}", msg.clone());
121 Err(thrift::new_protocol_error(
122 ProtocolErrorKind::BadVersion,
123 msg,
124 ))
125 } else {
126 self.session_id = resp.session_id.unwrap();
127 self.statement_id = self.client.request_statement_id(self.session_id)?;
128 self.is_close = false;
129 debug!("Session opened");
130 Ok(self)
131 }
132 } else {
133 error!("{}", status.message.clone().unwrap());
134 Err(thrift::new_application_error(
135 ApplicationErrorKind::MissingResult,
136 status.message.unwrap(),
137 ))
138 }
139 }
140 Err(error) => Err(thrift::new_transport_error(
141 TransportErrorKind::Unknown,
142 error.to_string(),
143 )),
144 }
145 }
146
147 pub fn is_open(&self) -> bool {
148 !self.is_close.clone()
149 }
150
151 pub fn close(&mut self) -> thrift::Result<()> {
153 trace!("Close session");
154 if self.is_close {
155 Ok(())
156 } else {
157 let req = TSCloseSessionReq::new(self.session_id);
158 match self.client.close_session(req) {
159 Ok(status) => {
160 if self.is_success(&status) {
161 self.is_close = true;
162 debug!("Session closed");
163 Ok(())
164 } else {
165 error!("{}", status.message.clone().unwrap());
166 Err(thrift::new_application_error(
167 ApplicationErrorKind::MissingResult,
168 status.message.unwrap(),
169 ))
170 }
171 }
172 Err(error) => Err(thrift::new_transport_error(
173 TransportErrorKind::Unknown,
174 error.to_string(),
175 )),
176 }
177 }
178 }
179
180 pub fn set_storage_group(&mut self, storage_group: &str) -> thrift::Result<()> {
182 trace!("Set storage group");
183 match self
184 .client
185 .set_storage_group(self.session_id, storage_group.to_string())
186 {
187 Ok(status) => {
188 if self.is_success(&status) {
189 Ok(())
190 } else {
191 error!("{}", status.message.clone().unwrap());
192 Err(thrift::new_application_error(
193 ApplicationErrorKind::MissingResult,
194 status.message.unwrap(),
195 ))
196 }
197 }
198 Err(error) => Err(thrift::new_transport_error(
199 TransportErrorKind::Unknown,
200 error.to_string(),
201 )),
202 }
203 }
204
205 pub fn delete_storage_group(&mut self, storage_group: &str) -> thrift::Result<()> {
207 trace!("Delete a storage group");
208 match self
209 .client
210 .delete_storage_groups(self.session_id, vec![storage_group.to_string()])
211 {
212 Ok(status) => {
213 if self.is_success(&status) {
214 Ok(())
215 } else {
216 error!("{}", status.message.clone().unwrap());
217 Err(thrift::new_application_error(
218 ApplicationErrorKind::MissingResult,
219 status.message.unwrap(),
220 ))
221 }
222 }
223 Err(error) => Err(thrift::new_transport_error(
224 TransportErrorKind::Unknown,
225 error.to_string(),
226 )),
227 }
228 }
229
230 pub fn delete_storage_groups(&mut self, storage_groups: Vec<String>) -> thrift::Result<()> {
232 trace!("Delete storage groups");
233 match self
234 .client
235 .delete_storage_groups(self.session_id, storage_groups)
236 {
237 Ok(status) => {
238 if self.is_success(&status) {
239 Ok(())
240 } else {
241 error!("{}", status.message.clone().unwrap());
242 Err(thrift::new_application_error(
243 ApplicationErrorKind::MissingResult,
244 status.message.unwrap(),
245 ))
246 }
247 }
248 Err(error) => Err(thrift::new_transport_error(
249 TransportErrorKind::Unknown,
250 error.to_string(),
251 )),
252 }
253 }
254
255 pub fn create_time_series(
257 &mut self,
258 ts_path: String,
259 data_type: i32,
260 encoding: i32,
261 compressor: i32,
262 ) -> thrift::Result<()> {
263 trace!("Create single time-series");
264 let req = TSCreateTimeseriesReq::new(
265 self.session_id,
266 ts_path,
267 data_type,
268 encoding,
269 compressor,
270 None,
271 None,
272 None,
273 None,
274 );
275 match self.client.create_timeseries(req) {
276 Ok(status) => {
277 if self.is_success(&status) {
278 Ok(())
279 } else {
280 error!("{}", status.message.clone().unwrap());
281 Err(thrift::new_application_error(
282 ApplicationErrorKind::MissingResult,
283 status.message.unwrap(),
284 ))
285 }
286 }
287 Err(error) => Err(thrift::new_transport_error(
288 TransportErrorKind::Unknown,
289 error.to_string(),
290 )),
291 }
292 }
293
294 pub fn create_multi_time_series(
296 &mut self,
297 ts_path_vec: Vec<String>,
298 data_type_vec: Vec<i32>,
299 encoding_vec: Vec<i32>,
300 compressor_vec: Vec<i32>,
301 ) -> thrift::Result<()> {
302 trace!("Create multiple time-series");
303 let req = TSCreateMultiTimeseriesReq::new(
304 self.session_id,
305 ts_path_vec,
306 data_type_vec,
307 encoding_vec,
308 compressor_vec,
309 None,
310 None,
311 None,
312 None,
313 );
314 match self.client.create_multi_timeseries(req) {
315 Ok(status) => {
316 if self.is_success(&status) {
317 Ok(())
318 } else {
319 error!("{}", status.message.clone().unwrap());
320 Err(thrift::new_application_error(
321 ApplicationErrorKind::MissingResult,
322 status.message.unwrap(),
323 ))
324 }
325 }
326 Err(error) => Err(thrift::new_transport_error(
327 TransportErrorKind::Unknown,
328 error.to_string(),
329 )),
330 }
331 }
332
333 pub fn delete_time_series(&mut self, path_vec: Vec<String>) -> thrift::Result<()> {
335 trace!("Delete multiple time-series");
336 match self.client.delete_timeseries(self.session_id, path_vec) {
337 Ok(status) => {
338 if self.is_success(&status) {
339 Ok(())
340 } else {
341 error!("{}", status.message.clone().unwrap());
342 Err(thrift::new_application_error(
343 ApplicationErrorKind::MissingResult,
344 status.message.unwrap(),
345 ))
346 }
347 }
348 Err(error) => Err(thrift::new_transport_error(
349 TransportErrorKind::Unknown,
350 error.to_string(),
351 )),
352 }
353 }
354
355 pub fn check_time_series_exist(&mut self, path: &str) {
358 trace!("Check time-series exists");
359 self.query(format!("SHOW TIMESERIES {}", path).as_str());
360 }
361
362 pub fn delete_data(&mut self, path_vec: Vec<String>, timestamp: i64) -> thrift::Result<()> {
364 trace!("Delete data");
365 let req = TSDeleteDataReq::new(self.session_id, path_vec, 0, timestamp);
366 match self.client.delete_data(req) {
367 Ok(status) => {
368 if self.is_success(&status) {
369 Ok(())
370 } else {
371 error!("{}", status.message.clone().unwrap());
372 Err(thrift::new_application_error(
373 ApplicationErrorKind::MissingResult,
374 status.message.unwrap(),
375 ))
376 }
377 }
378 Err(error) => Err(thrift::new_transport_error(
379 TransportErrorKind::Unknown,
380 error.to_string(),
381 )),
382 }
383 }
384
385 pub fn insert_string_records(
388 &mut self,
389 device_ids: Vec<String>,
390 measurements_list: Vec<Vec<String>>,
391 values_list: Vec<Vec<String>>,
392 timestamps: Vec<i64>,
393 ) -> thrift::Result<TSStatus> {
394 let req = TSInsertStringRecordsReq::new(
395 self.session_id,
396 device_ids,
397 measurements_list,
398 values_list,
399 timestamps,
400 );
401 self.client.insert_string_records(req)
402 }
403
404 pub fn insert_record(
407 &mut self,
408 device_id: String,
409 measurements: Vec<String>,
410 values: Vec<u8>,
411 timestamp: i64,
412 ) -> thrift::Result<TSStatus> {
413 let req =
414 TSInsertRecordReq::new(self.session_id, device_id, measurements, values, timestamp);
415 self.client.insert_record(req)
416 }
417
418 pub fn insert_records(
421 &mut self,
422 device_ids: Vec<String>,
423 measurements_list: Vec<Vec<String>>,
424 values_list: Vec<Vec<u8>>,
425 timestamps: Vec<i64>,
426 ) -> thrift::Result<TSStatus> {
427 let req = TSInsertRecordsReq::new(
428 self.session_id,
429 device_ids,
430 measurements_list,
431 values_list,
432 timestamps,
433 );
434 self.client.insert_records(req)
435 }
436
437 pub fn insert_records_of_one_device(
440 &mut self,
441 device_id: String,
442 measurements_list: Vec<Vec<String>>,
443 values_list: Vec<Vec<u8>>,
444 timestamps: Vec<i64>,
445 ) -> thrift::Result<TSStatus> {
446 let req = TSInsertRecordsOfOneDeviceReq::new(
447 self.session_id,
448 device_id,
449 measurements_list,
450 values_list,
451 timestamps,
452 );
453 Ok(self.client.insert_records_of_one_device(req)?)
454 }
455
456 pub fn insert_tablet(
459 &mut self,
460 device_id: String,
461 measurements: Vec<String>,
462 values: Vec<u8>,
463 timestamps: Vec<u8>,
464 types: Vec<i32>,
465 size: i32,
466 ) -> thrift::Result<TSStatus> {
467 trace!("Delete data");
468 let req = TSInsertTabletReq::new(
469 self.session_id,
470 device_id,
471 measurements,
472 values,
473 timestamps,
474 types,
475 size,
476 );
477 self.client.insert_tablet(req)
478 }
479
480 pub fn insert_tablets(
483 &mut self,
484 device_ids: Vec<String>,
485 measurements_list: Vec<Vec<String>>,
486 values_list: Vec<Vec<u8>>,
487 timestamps_list: Vec<Vec<u8>>,
488 types_list: Vec<Vec<i32>>,
489 size_list: Vec<i32>,
490 ) -> thrift::Result<TSStatus> {
491 let req = TSInsertTabletsReq::new(
492 self.session_id,
493 device_ids,
494 measurements_list,
495 values_list,
496 timestamps_list,
497 types_list,
498 size_list,
499 );
500 self.client.insert_tablets(req)
501 }
502
503 pub fn set_time_zone(&mut self, time_zone: &str) -> thrift::Result<()> {
505 trace!("Set time zone");
506 let req = TSSetTimeZoneReq::new(self.session_id, time_zone.to_string());
507 match self.client.set_time_zone(req) {
508 Ok(status) => {
509 if status.code == 200 {
510 Ok(())
511 } else {
512 error!("{}", status.message.clone().unwrap());
513 Err(thrift::new_application_error(
514 ApplicationErrorKind::MissingResult,
515 status.message.unwrap(),
516 ))
517 }
518 }
519 Err(error) => Err(thrift::new_transport_error(
520 TransportErrorKind::Unknown,
521 error.to_string(),
522 )),
523 }
524 }
525
526 pub fn query(&mut self, sql: &str) -> thrift::Result<TSExecuteStatementResp> {
528 debug!("Exec query \"{}\"", &sql);
529 let req = TSExecuteStatementReq::new(
530 self.session_id,
531 sql.to_string(),
532 self.statement_id,
533 self.fetch_size,
534 );
535 match self.client.execute_query_statement(req) {
536 Ok(resp) => {
537 if resp.status.code == 200 {
538 Ok(resp)
539 } else {
540 error!("{}", resp.status.message.clone().unwrap());
541 Err(thrift::new_application_error(
542 ApplicationErrorKind::MissingResult,
543 resp.status.message.unwrap(),
544 ))
545 }
546 }
547 Err(error) => Err(thrift::new_transport_error(
548 TransportErrorKind::Unknown,
549 error.to_string(),
550 )),
551 }
552 }
553
554 pub fn get_time_zone(&mut self) -> thrift::Result<String> {
556 trace!("Get time zone");
557 match self.client.get_time_zone(self.session_id.clone()) {
558 Ok(resp) => {
559 if resp.status.code == 200 {
560 Ok(resp.time_zone)
561 } else {
562 error!("{}", resp.status.message.unwrap());
563 Ok(String::new())
564 }
565 }
566 Err(_) => Ok(String::new()),
567 }
568 }
569
570 pub fn get_properties(&mut self) -> thrift::Result<ServerProperties> {
572 trace!("Get properties");
573 Ok(self.client.get_properties()?)
574 }
575
576 pub fn cancel_operation(&mut self, query_id: i64) -> thrift::Result<TSStatus> {
579 let req = TSCancelOperationReq::new(self.session_id, query_id);
580 self.client.cancel_operation(req)
581 }
582}