1use std::collections::{BTreeMap, HashMap};
21use std::error::Error;
22use std::vec;
23use thrift::transport::TIoChannel;
24use crate::client::common::*;
25
26
27use thrift::{
28 protocol::{
29 TBinaryInputProtocol, TBinaryOutputProtocol, TCompactInputProtocol, TCompactOutputProtocol,
30 TInputProtocol, TOutputProtocol,
31 },
32 transport::{TFramedReadTransport, TFramedWriteTransport, TTcpChannel},
33};
34use typed_builder::TypedBuilder;
35
36use crate::client::rpc::{
37 TSCreateMultiTimeseriesReq, TSCreateTimeseriesReq, IClientRPCServiceSyncClient, TSInsertRecordsReq,
38 TSInsertStringRecordReq, TSInsertTabletReq, TSOpenSessionReq, TSProtocolVersion,
39 TIClientRPCServiceSyncClient,
40};
41use crate::protocal::{
42 TSCompressionType, TSDataType, TSEncoding, FLAG, MULTIPLE_ERROR, NEED_REDIRECTION,
43 SUCCESS_STATUS,
44};
45
46use super::rpc::{
47 TSDeleteDataReq, TSExecuteStatementReq, TSInsertRecordReq, TSInsertRecordsOfOneDeviceReq,
48 TSInsertTabletsReq, TSQueryDataSet, TSSetTimeZoneReq,
49};
50use super::{
51 rpc::{TSCloseSessionReq},
52 RowRecord,
53};
54use super::{DataSet, Dictionary, Result, Session, Value};
55
/// Time zone used for `Config::timezone` by default and sent when opening a
/// session if the configuration carries no time zone.
const DEFAULT_TIME_ZONE: &str = "Asia/Shanghai";
57
58impl From<TSStatus> for core::result::Result<(), Box<dyn Error>> {
59 fn from(status: TSStatus) -> Self {
60 match status.code {
61 SUCCESS_STATUS | NEED_REDIRECTION => Ok(()),
62 MULTIPLE_ERROR => {
63 let mut messges = String::new();
64 if let Some(sub_status) = status.sub_status {
65 for s in sub_status {
66 if s.code != SUCCESS_STATUS && s.code != NEED_REDIRECTION {
67 if let Some(msg) = s.message {
68 messges.push_str(format!("Code: {}, {}", s.code, msg).as_str());
69 messges.push(';');
70 }
71 }
72 }
73 }
74 if !messges.is_empty() {
75 Err(messges.into())
76 } else {
77 Ok(())
78 }
79 }
80 _ => {
81 if let Some(message) = status.message {
82 Err(format!("code: {}, {}", status.code, message).into())
83 } else {
84 Err(format!("code: {}", status.code).into())
85 }
86 }
87 }
88 }
89}
90
/// Connection and query settings for an [`RpcSession`].
///
/// Instances can be created through the generated builder (every field has a
/// builder default) or through `Default`.
///
/// NOTE(review): the builder default for `timeout_ms` is `Some(3000)` while
/// `Config::default()` uses `Some(30000)` — confirm which value is intended.
#[derive(Debug, Clone, TypedBuilder)]
#[builder(field_defaults(default, setter(into)))]

pub struct Config {
    /// Host part of the `host:port` endpoint the TCP channel connects to.
    #[builder(default = String::from("127.0.0.1"))]
    pub host: String,
    /// Port part of the `host:port` endpoint the TCP channel connects to.
    #[builder(default = 6667)]
    pub port: i32,
    /// User name sent when the session is opened.
    #[builder(default = String::from("root"))]
    pub username: String,
    /// Password sent when the session is opened.
    #[builder(default = String::from("root"))]
    pub password: String,
    /// Timeout forwarded with fetch-result and update-statement requests.
    #[builder(default = Some(3000))]
    pub timeout_ms: Option<i64>,
    /// Fetch size forwarded with query and fetch-result requests.
    #[builder(default = 1000)]
    pub fetch_size: i32,
    /// Time zone sent when the session is opened; `DEFAULT_TIME_ZONE` is used
    /// when this is `None`.
    #[builder(default = Some(String::from(DEFAULT_TIME_ZONE)))]
    pub timezone: Option<String>,
    /// Selects the Thrift compact protocol when `true`, the binary protocol
    /// otherwise.
    #[builder(default = false)]
    pub enable_compression: bool,
    /// Protocol version announced to the server when the session is opened.
    #[builder(default = TSProtocolVersion::IotdbServiceProtocolV3)]
    pub protocol_version: TSProtocolVersion,
    /// Forwarded as `is_align` with fetch-result requests.
    #[builder(default = true)]
    pub is_align: bool,
}
116
117impl Default for Config {
118 fn default() -> Self {
119 Self {
120 host: String::from("127.0.0.1"),
121 port: 6667,
122 username: String::from("root"),
123 password: String::from("root"),
124 timeout_ms: Some(30000),
125 fetch_size: 1000,
126 timezone: Some(String::from(DEFAULT_TIME_ZONE)),
127 enable_compression: false,
128 protocol_version: TSProtocolVersion::IotdbServiceProtocolV3,
129 is_align: true,
130 }
131 }
132}
133
/// A session that talks to the server through the Thrift RPC client.
pub struct RpcSession {
    /// Settings used for connecting, opening the session and issuing queries.
    config: Config,
    /// Id of the open server session; `None` until `open` succeeds and again
    /// after `close`.
    session_id: Option<i64>,
    /// Statement id sent with statement/query requests; starts at `-1` and is
    /// replaced by the id requested from the server in `open`.
    statement_id: i64,
    /// Thrift client, boxed over the input/output protocol chosen in `new`.
    client: IClientRPCServiceSyncClient<Box<dyn TInputProtocol>, Box<dyn TOutputProtocol>>,
}
140
141impl RpcSession {
142 pub fn new(config: Config) -> Result<Self> {
143 let mut tcp_channel = TTcpChannel::new();
144 let endpint = format!("{}:{}", config.host, config.port);
145
146 tcp_channel.open(&endpint).map_err(|err| {
147 Box::<dyn Error>::from(format!("failed to connect to {}, {:?}", endpint, err))
148 })?;
149
150 let (i_chan, o_chan) = tcp_channel.split()?;
151
152 let (i_prot, o_prot) = (
153 TFramedReadTransport::new(i_chan),
154 TFramedWriteTransport::new(o_chan),
155 );
156
157 let (input_protocol, output_protocol): (Box<dyn TInputProtocol>, Box<dyn TOutputProtocol>) =
158 if config.enable_compression {
159 (
160 Box::new(TCompactInputProtocol::new(i_prot)),
161 Box::new(TCompactOutputProtocol::new(o_prot)),
162 )
163 } else {
164 (
165 Box::new(TBinaryInputProtocol::new(i_prot, true)),
166 Box::new(TBinaryOutputProtocol::new(o_prot, true)),
167 )
168 };
169
170 Ok(Self {
171 config: config,
172 session_id: None,
173 statement_id: -1,
174 client: IClientRPCServiceSyncClient::new(input_protocol, output_protocol),
175 })
176 }
177}
178
179impl<'a> Iterator for RpcDataSet<'a> {
180 type Item = RowRecord;
181
182 fn next(&mut self) -> Option<Self::Item> {
183 if self.has_cached_results() {
184 let mut values: Vec<Value> = Vec::with_capacity(self.column_names.len());
185
186 let ts = self.query_data_set.time.drain(0..8).collect::<Vec<u8>>();
187 self.timestamp = i64::from_be_bytes(ts.try_into().unwrap());
188
189 for column_index in 0..self.query_data_set.value_list.len() {
190 if self.row_index % 8 == 0 {
191 self.bitmaps[column_index] =
192 self.query_data_set.bitmap_list[column_index].remove(0);
193 }
194
195 let null = self.is_null(column_index, self.row_index);
196
197 let column_data = self
198 .query_data_set
199 .value_list
200 .get_mut(column_index)
201 .unwrap();
202 if !null {
203 let original_column_index = self.column_index_map.get(&column_index).unwrap();
204 let data_type = self.data_types.get(*original_column_index).unwrap();
205
206 let mut bytes: Vec<u8> = Vec::new();
207 match data_type {
208 TSDataType::Boolean => {
209 bytes.push(TSDataType::Boolean as u8);
210 bytes.push(column_data.remove(0));
211 }
212 TSDataType::Int32 => {
213 bytes.push(TSDataType::Int32 as u8);
214 bytes.extend(column_data.drain(0..4));
215 }
216 TSDataType::Int64 => {
217 bytes.push(TSDataType::Int64 as u8);
218 bytes.extend(column_data.drain(0..8));
219 }
220 TSDataType::Float => {
221 bytes.push(TSDataType::Float as u8);
222 bytes.extend(column_data.drain(0..4));
223 }
224 TSDataType::Double => {
225 bytes.push(TSDataType::Double as u8);
226 bytes.extend(column_data.drain(0..8));
227 }
228 TSDataType::Text => {
229 bytes.push(TSDataType::Text as u8);
230 let len = i32::from_be_bytes(
231 column_data
232 .drain(0..4)
233 .collect::<Vec<u8>>()
234 .try_into()
235 .unwrap(),
236 );
237 bytes.extend(column_data.drain(0..len as usize).collect::<Vec<_>>());
238 }
239 }
240 values.push(Value::from(bytes));
241 } else {
242 values.push(Value::Null);
243 }
244 }
245
246 self.row_index += 1;
247
248 let mut output_values: Vec<Value> = Vec::with_capacity(if self.is_ignore_timestamp() {
249 self.get_column_names().len()
250 } else {
251 self.get_column_names().len() + 1
252 });
253 if !self.is_ignore_timestamp() {
254 output_values.push(Value::Int64(self.timestamp));
255 }
256
257 output_values.extend(self.column_names.iter().map(|column_name| {
258 values[*(self.column_name_index_map.get(column_name).unwrap()) as usize].clone()
259 }));
260 Some(RowRecord {
261 timestamp: self.timestamp,
262 values: output_values,
263 })
264 } else {
265 None
266 }
267 }
268}
269
/// A query result that is decoded lazily and fetched from the server in
/// batches.
pub(crate) struct RpcDataSet<'a> {
    /// Session used to fetch further batches and to close the operation.
    session: &'a mut RpcSession,
    /// Statement text re-sent with every fetch-result request.
    statement: String,
    /// Query id sent with fetch-result and close-operation requests.
    query_id: i64,
    /// When `Some(true)` the timestamp column is left out of names, types and
    /// rows; `None` is treated as `false`.
    is_ignore_time_stamp: Option<bool>,
    /// Timestamp of the most recently decoded row.
    timestamp: i64,
    /// Column names as returned by the server (without the time column).
    column_names: Vec<String>,
    /// Data type of each entry in `column_names`.
    data_types: Vec<TSDataType>,
    /// Currently cached batch; its buffers are consumed from the front as rows
    /// are decoded.
    query_data_set: TSQueryDataSet,
    /// Maps a value-column index to an index into `data_types`.
    column_index_map: HashMap<usize, usize>,
    /// Maps a column name to the index of its value column.
    column_name_index_map: BTreeMap<String, i32>,
    /// Current null-bitmap byte of each value column.
    bitmaps: Vec<u8>,
    /// Number of rows decoded from the current batch; reset to 0 when a new
    /// batch is stored.
    row_index: usize,
    /// Set once the operation was closed successfully on the server.
    closed: bool,
}
285
286impl<'a> RpcDataSet<'a> {
287 fn is_null(&self, column_index: usize, row_index: usize) -> bool {
288 let bitmap = self.bitmaps[column_index];
289 let shift = row_index % 8;
290 ((FLAG >> shift) & (bitmap & 0xff)) == 0
291 }
292
293 fn has_cached_results(&mut self) -> bool {
294 if self.closed {
295 return false;
296 }
297 if self.query_data_set.time.is_empty() {
298 if let Some(session_id) = self.session.session_id {
299 match self
301 .session
302 .client
303 .fetch_results(super::rpc::TSFetchResultsReq {
304 session_id,
305 statement: self.statement.clone(),
306 fetch_size: self.session.config.fetch_size,
307 query_id: self.query_id,
308 is_align: self.session.config.is_align,
309 timeout: self.session.config.timeout_ms,
310 }) {
311 Ok(resp) => {
312 let status = resp.status;
313 let res: Result<()> = status.into();
314
315 match res {
316 Ok(_) => {
317 if resp.has_result_set {
318 if let Some(query_data_set) = resp.query_data_set {
320 self.query_data_set = query_data_set;
321 self.row_index = 0;
322 }
323 } else {
324 self.close();
326 return false;
327 }
328 }
329 Err(err) => {
330 eprint!("An error occurred when fetch result: {}", err);
331 return false;
332 }
333 }
334 }
335 Err(err) => {
336 eprint!("An error occurred when fetch result: {}", err);
337 return false;
338 }
339 }
340 }
341 }
342 !self.query_data_set.time.is_empty()
343 }
344
345 pub fn close(&mut self) {
346 if !self.closed {
347 if let Some(session_id) = self.session.session_id {
348 match self
349 .session
350 .client
351 .close_operation(super::rpc::TSCloseOperationReq {
352 session_id,
353 query_id: Some(self.query_id),
354 statement_id: Some(self.session.statement_id),
355 }) {
356 Ok(status) => {
357 let res: Result<()> = status.into();
358 match res {
359 Ok(_) => {
360 self.closed = true;
361 }
362 Err(err) => {
363 eprint!("An error occurred when closing dataset {:?}", err)
364 }
365 }
366 }
367 Err(err) => {
368 eprint!("An error occurred when closing dataset {:?}", err)
369 }
370 }
371 }
372 }
373 }
374}
375
376impl<'a> Drop for RpcDataSet<'a> {
377 fn drop(&mut self) {
378 self.close();
379 }
380}
381
382impl<'a> DataSet for RpcDataSet<'a> {
383 fn get_column_names(&self) -> Vec<String> {
384 if self.is_ignore_timestamp() {
385 self.column_names.clone()
386 } else {
387 let mut column_names = vec![String::from("Time")];
389 column_names.extend(self.column_names.clone());
390 column_names
391 }
392 }
393
394 fn get_data_types(&self) -> Vec<TSDataType> {
395 if self.is_ignore_timestamp() {
396 self.data_types.clone()
397 } else {
398 let mut column_types = vec![TSDataType::Int64];
400 column_types.extend(self.data_types.clone());
401 column_types
402 }
403 }
404
405 fn is_ignore_timestamp(&self) -> bool {
406 if let Some(v) = self.is_ignore_time_stamp {
407 v
408 } else {
409 false
410 }
411 }
412}
413
414fn fire_closed_error() -> Result<()> {
415 Err("Operation can't be performed, the session is closed.".into())
416}
417
418impl<'a> Session<'a> for RpcSession {
419 fn open(&mut self) -> Result<()> {
420 let resp = self.client.open_session(TSOpenSessionReq::new(
421 self.config.protocol_version,
422 self.config
423 .timezone
424 .clone()
425 .unwrap_or_else(|| DEFAULT_TIME_ZONE.to_string()),
426 self.config.username.clone(),
427 self.config.password.clone(),
428 None,
429 ))?;
430 let res: Result<()> = resp.status.into();
431 res?;
432 self.session_id = resp.session_id;
433
434 let result = self.client.request_statement_id(resp.session_id.unwrap());
436 match result {
437 Ok(stId) => {
438 self.statement_id = stId;
439 }
440 Err(_) => {}
441 }
442
443 Ok(())
444 }
445
446 fn close(&mut self) -> Result<()> {
447 if let Some(session_id) = self.session_id {
448 let status = self
449 .client
450 .close_session(TSCloseSessionReq::new(session_id))?;
451 self.session_id = None;
452 status.into()
453 } else {
454 fire_closed_error()
455 }
456 }
457
458 fn set_storage_group(&mut self, storage_group_id: &str) -> Result<()> {
459 if let Some(session_id) = self.session_id {
460 let status = self
461 .client
462 .set_storage_group(session_id, storage_group_id.into())?;
463 status.into()
464 } else {
465 fire_closed_error()
466 }
467 }
468
469 fn delete_storage_group(&mut self, storage_group_id: &str) -> Result<()> {
470 self.delete_storage_groups(vec![storage_group_id])
471 }
472
473 fn delete_storage_groups(&mut self, storage_group_ids: Vec<&str>) -> Result<()> {
474 if let Some(session_id) = self.session_id {
475 let status = self.client.delete_storage_groups(
476 session_id,
477 storage_group_ids.iter().map(ToString::to_string).collect(),
478 )?;
479 status.into()
480 } else {
481 fire_closed_error()
482 }
483 }
484
485 fn create_timeseries<T>(
486 &mut self,
487 path: &str,
488 data_type: crate::protocal::TSDataType,
489 encoding: crate::protocal::TSEncoding,
490 compressor: crate::protocal::TSCompressionType,
491 props: T,
492 attributes: T,
493 tags: T,
494 measurement_alias: Option<String>,
495 ) -> Result<()>
496 where
497 T: Into<Option<Dictionary>>,
498 {
499 if let Some(session_id) = self.session_id {
500 self.client
501 .create_timeseries(TSCreateTimeseriesReq::new(
502 session_id,
503 path.to_string(),
504 data_type.into(),
505 encoding.into(),
506 compressor.into(),
507 props,
508 tags,
509 attributes,
510 measurement_alias,
511 ))?
512 .into()
513 } else {
514 fire_closed_error()
515 }
516 }
517
518 fn create_multi_timeseries<T>(
519 &mut self,
520 paths: Vec<&str>,
521 data_types: Vec<crate::protocal::TSDataType>,
522 encodings: Vec<crate::protocal::TSEncoding>,
523 compressors: Vec<crate::protocal::TSCompressionType>,
524 props_list: T,
525 attributes_list: T,
526 tags_list: T,
527 measurement_alias_list: Option<Vec<String>>,
528 ) -> Result<()>
529 where
530 T: Into<Option<Vec<Dictionary>>>,
531 {
532 if let Some(session_id) = self.session_id {
533 let status = self
534 .client
535 .create_multi_timeseries(TSCreateMultiTimeseriesReq::new(
536 session_id,
537 paths.iter().map(ToString::to_string).collect(),
538 data_types.into_iter().map(TSDataType::into).collect(),
539 encodings.into_iter().map(TSEncoding::into).collect(),
540 compressors
541 .into_iter()
542 .map(TSCompressionType::into)
543 .collect(),
544 props_list,
545 attributes_list,
546 tags_list,
547 measurement_alias_list,
548 ))?;
549 status.into()
550 } else {
551 fire_closed_error()
552 }
553 }
554
555 fn delete_timeseries(&mut self, paths: Vec<&str>) -> Result<()> {
556 if let Some(session_id) = self.session_id {
557 let status = self
558 .client
559 .delete_timeseries(session_id, paths.iter().map(ToString::to_string).collect())?;
560 status.into()
561 } else {
562 fire_closed_error()
563 }
564 }
565
566 fn delete_data(&mut self, paths: Vec<&str>, start_time: i64, end_time: i64) -> Result<()> {
567 if let Some(session_id) = self.session_id {
568 let status = self.client.delete_data(TSDeleteDataReq::new(
569 session_id,
570 paths.into_iter().map(ToString::to_string).collect(),
571 start_time,
572 end_time,
573 ))?;
574 status.into()
575 } else {
576 fire_closed_error()
577 }
578 }
579
580 fn insert_string_record<T>(
581 &mut self,
582 device_id: &str,
583 measurements: Vec<&str>,
584 values: Vec<&str>,
585 timestamp: i64,
586 is_aligned: T,
587 timeout: i64
588 ) -> Result<()>
589 where
590 T: Into<Option<bool>>,
591 {
592 if let Some(session_id) = self.session_id {
593 let status = self
594 .client
595 .insert_string_record(TSInsertStringRecordReq::new(
596 session_id,
597 device_id.to_string(),
598 measurements.iter().map(ToString::to_string).collect(),
599 values.iter().map(ToString::to_string).collect(),
600 timestamp,
601 is_aligned,
602 timeout
603 ))?;
604 status.into()
605 } else {
606 fire_closed_error()
607 }
608 }
609
610 fn get_time_zone(&mut self) -> Result<String> {
611 if let Some(session_id) = self.session_id {
612 let resp = self.client.get_time_zone(session_id)?;
613 let res: Result<()> = resp.status.into();
614 res?;
615 Ok(resp.time_zone)
616 } else {
617 Err("Operation can't be performed, the session is closed.".into())
618 }
619 }
620
621 fn set_time_zone(&mut self, time_zone: &str) -> Result<()> {
622 if let Some(session_id) = self.session_id {
623 self.client
624 .set_time_zone(TSSetTimeZoneReq::new(session_id, time_zone.to_string()))?
625 .into()
626 } else {
627 fire_closed_error()
628 }
629 }
630
631 fn execute_statement<T>(
632 &'a mut self,
633 statement: &str,
634 timeout_ms: T,
635 ) -> Result<Box<dyn 'a + DataSet>>
636 where
637 T: Into<Option<i64>>,
638 {
639 if let Some(session_id) = self.session_id {
640 let resp = self.client.execute_statement(TSExecuteStatementReq {
641 session_id,
642 statement: statement.to_string(),
643 statement_id: self.statement_id,
644 fetch_size: Some(self.config.fetch_size),
645 timeout: timeout_ms.into(),
646 enable_redirect_query: None,
647 jdbc_query: None,
648 })?;
649 let status = resp.status;
650 let code = status.code;
651 if code == SUCCESS_STATUS {
652 {
653 if let (Some(column_names), Some(data_type_list)) =
654 (resp.columns, resp.data_type_list)
655 {
656 let column_name_index_map = match resp.column_name_index_map {
657 Some(map) => map,
658 None => {
659 let mut map: BTreeMap<String, i32> = BTreeMap::new();
660 for (index, name) in column_names.iter().enumerate() {
661 map.insert(name.to_string(), index as i32);
662 }
663 map
664 }
665 };
666
667 let data_types: Vec<TSDataType> =
668 data_type_list.iter().map(TSDataType::from).collect();
669
670 let mut column_index_map: HashMap<usize, usize> = HashMap::new();
671
672 let column_count = column_names.len();
673 for (index, name) in column_names.iter().enumerate() {
674 column_index_map
675 .insert(*column_name_index_map.get(name).unwrap() as usize, index);
676 }
677
678 Ok(Box::new(RpcDataSet {
679 session: self,
680 statement: statement.to_string(),
681 query_id: resp.query_id.unwrap(),
682 timestamp: -1,
683 is_ignore_time_stamp: resp.ignore_time_stamp,
684 query_data_set: resp.query_data_set.unwrap(),
685 column_names,
686 data_types,
687 bitmaps: vec![0_u8; column_count],
688 row_index: 0,
689 column_index_map,
690 column_name_index_map,
691 closed: false,
692 }))
693 } else {
694 Err("Can't get resources on execute_statement".into())
695 }
696 }
697 } else {
698 let res: Result<()> = status.into();
699 res?;
700 Err(format!("Unknow, code: {}", code).into())
701 }
702 } else {
703 Err("Operation can't be performed, the session is closed.".into())
704 }
705 }
706
707 fn execute_query_statement<T>(
708 &'a mut self,
709 statement: &str,
710 timeout_ms: T,
711 ) -> Result<Box<dyn 'a + DataSet>>
712 where
713 T: Into<Option<i64>>,
714 {
715 if let Some(session_id) = self.session_id {
716 let resp = self.client.execute_query_statement(TSExecuteStatementReq {
717 session_id,
718 statement: statement.to_string(),
719 statement_id: self.statement_id,
720 fetch_size: Some(self.config.fetch_size),
721 timeout: timeout_ms.into(),
722 enable_redirect_query: None,
723 jdbc_query: None,
724 })?;
725 let status = resp.status;
726 let code = status.code;
727 if code == SUCCESS_STATUS {
728 let column_names: Vec<String> = resp.columns.unwrap();
729
730 let column_name_index_map = match resp.column_name_index_map {
731 Some(v) => v,
732 None => {
733 let mut map: BTreeMap<String, i32> = BTreeMap::new();
734 for (index, name) in column_names.iter().enumerate() {
735 map.insert(name.to_string(), index as i32);
736 }
737 map
738 }
739 };
740
741 let data_types: Vec<TSDataType> = resp
742 .data_type_list
743 .unwrap()
744 .iter()
745 .map(TSDataType::from)
746 .collect();
747
748 let mut column_index_map: HashMap<usize, usize> = HashMap::new();
749
750 let column_count = column_names.len();
751 for (index, name) in column_names.iter().enumerate() {
752 column_index_map
753 .insert(*column_name_index_map.get(name).unwrap() as usize, index);
754 }
755 let dataset = RpcDataSet {
756 session: self,
757 statement: statement.to_string(),
758 query_id: resp.query_id.unwrap(),
759 timestamp: -1,
760 is_ignore_time_stamp: resp.ignore_time_stamp,
761 query_data_set: resp.query_data_set.unwrap(),
762 column_names,
763 data_types,
764 bitmaps: vec![0_u8; column_count],
765 row_index: 0,
766 column_index_map,
767 column_name_index_map,
768 closed: false,
769 };
770 Ok(Box::new(dataset))
771 } else {
772 let res: Result<()> = status.into();
773 res?;
774 Err(format!("Unknow, code: {}", code).into())
775 }
776 } else {
777 Err("Operation can't be performed, the session is closed.".into())
778 }
779 }
780
781 fn insert_record<T>(
782 &mut self,
783 device_id: &str,
784 measurements: Vec<&str>,
785 values: Vec<Value>,
786 timestamp: i64,
787 is_aligned: T,
788 ) -> Result<()>
789 where
790 T: Into<Option<bool>>,
791 {
792 if let Some(session_id) = self.session_id {
793 let mut values_bytes: Vec<u8> = Vec::new();
794 values.iter().for_each(|v| {
795 let mut value_bytes: Vec<u8> = v.into();
796 values_bytes.append(&mut value_bytes);
797 });
798 let status = self.client.insert_record(TSInsertRecordReq::new(
799 session_id,
800 device_id.to_string(),
801 measurements.iter().map(ToString::to_string).collect(),
802 values_bytes,
803 timestamp,
804 is_aligned,
805 ))?;
806 status.into()
807 } else {
808 fire_closed_error()
809 }
810 }
811
812 fn insert_records_of_one_device(
813 &mut self,
814 device_id: &str,
815 timestamps: Vec<i64>,
816 measurements: Vec<Vec<&str>>,
817 values: Vec<Vec<super::Value>>,
818 sorted: bool,
819 ) -> Result<()> {
820 let mut sorted_timestamps = timestamps;
821 let mut sorted_measurements = measurements;
822 let mut sorted_values = values;
823
824 if !sorted {
825 let permutation = permutation::sort(&sorted_timestamps[..]);
826 sorted_timestamps = permutation.apply_slice(&sorted_timestamps[..]);
827 sorted_measurements = permutation.apply_slice(&sorted_measurements[..]);
828 sorted_values = permutation.apply_slice(&sorted_values[..]);
829 }
830
831 if let Some(session_id) = self.session_id {
832 let values_list = sorted_values
833 .iter()
834 .map(|vec| {
835 let mut values: Vec<u8> = Vec::new();
836 for value in vec.iter() {
837 let mut value_data: Vec<u8> = value.into();
838 values.append(&mut value_data);
839 }
840 values
841 })
842 .collect();
843 let status =
844 self.client
845 .insert_records_of_one_device(TSInsertRecordsOfOneDeviceReq::new(
846 session_id,
847 device_id.to_string(),
848 sorted_measurements
849 .iter()
850 .map(|vec| vec.iter().map(ToString::to_string).collect())
851 .collect(),
852 values_list,
853 sorted_timestamps,
854 false,
855 ))?;
856 status.into()
857 } else {
858 fire_closed_error()
859 }
860 }
861
862 fn insert_records(
863 &mut self,
864 prefix_path: Vec<&str>,
865 measurements: Vec<Vec<&str>>,
866 values: Vec<Vec<super::Value>>,
867 timestamps: Vec<i64>,
868 ) -> Result<()> {
869 if let Some(session_id) = self.session_id {
870 let values_list = values
871 .iter()
872 .map(|vec| {
873 let mut values: Vec<u8> = Vec::new();
874 for value in vec.iter() {
875 let mut value_data: Vec<u8> = value.into();
876 values.append(&mut value_data);
877 }
878 values
879 })
880 .collect();
881 let status = self.client.insert_records(TSInsertRecordsReq {
882 session_id,
883 prefix_paths: prefix_path.iter().map(ToString::to_string).collect(),
884 measurements_list: measurements
885 .iter()
886 .map(|ms| ms.iter().map(ToString::to_string).collect())
887 .collect(),
888 values_list,
889 timestamps,
890 is_aligned: None,
891 })?;
892 status.into()
893 } else {
894 fire_closed_error()
895 }
896 }
897
898 fn insert_tablet(&mut self, tablet: &super::Tablet) -> Result<()> {
899 if let Some(session_id) = self.session_id {
900 let mut timestamps_list: Vec<u8> = Vec::with_capacity(tablet.timestamps.len() * 8);
901 tablet
902 .timestamps
903 .iter()
904 .for_each(|ts| timestamps_list.append(&mut ts.to_be_bytes().to_vec()));
905
906 let status = self.client.insert_tablet(TSInsertTabletReq {
907 session_id,
908 prefix_path: tablet.get_prefix_path(),
909 measurements: tablet
910 .measurement_schemas
911 .iter()
912 .map(|f| f.measurement.to_string())
913 .collect(),
914 values: tablet.into(),
915 timestamps: timestamps_list,
916 types: tablet
917 .get_measurement_schemas()
918 .into_iter()
919 .map(|measurement_schema| measurement_schema.data_type.into())
920 .collect(),
921 size: tablet.get_row_count() as i32,
922 is_aligned: Some(false),
923 })?;
924 status.into()
925 } else {
926 fire_closed_error()
927 }
928 }
929
930 fn insert_tablets(&mut self, tablets: Vec<&super::Tablet>) -> Result<()> {
931 if let Some(session_id) = self.session_id {
932 let status = self.client.insert_tablets(TSInsertTabletsReq {
933 session_id,
934 prefix_paths: tablets.iter().map(|t| t.get_prefix_path()).collect(),
935 measurements_list: tablets
936 .iter()
937 .map(|tablet| {
938 tablet
939 .measurement_schemas
940 .iter()
941 .map(|f| f.measurement.to_string())
942 .collect()
943 })
944 .collect(),
945 values_list: tablets.iter().map(|tablet| Into::into(*tablet)).collect(),
946 timestamps_list: tablets
947 .iter()
948 .map(|tablet| {
949 let mut ts_item: Vec<u8> = Vec::new();
950 tablet
951 .timestamps
952 .iter()
953 .for_each(|ts| ts_item.append(&mut ts.to_be_bytes().to_vec()));
954 ts_item
955 })
956 .collect(),
957 types_list: tablets
958 .iter()
959 .map(|tablet| {
960 tablet
961 .get_measurement_schemas()
962 .into_iter()
963 .map(|f| {
964 let t: i32 = f.data_type.into();
965 t
966 })
967 .collect()
968 })
969 .collect(),
970 size_list: tablets
971 .iter()
972 .map(|tablet| tablet.get_row_count() as i32)
973 .collect(),
974 is_aligned: Some(false),
975 })?;
976 status.into()
977 } else {
978 fire_closed_error()
979 }
980 }
981
982 fn execute_batch_statement(&mut self, statemens: Vec<&str>) -> Result<()> {
983 if let Some(session_id) = self.session_id {
984 let status =
985 self.client
986 .execute_batch_statement(super::rpc::TSExecuteBatchStatementReq {
987 session_id,
988 statements: statemens.iter().map(ToString::to_string).collect(),
989 })?;
990 status.into()
991 } else {
992 fire_closed_error()
993 }
994 }
995
996 fn execute_raw_data_query(
997 &'a mut self,
998 paths: Vec<&str>,
999 start_time: i64,
1000 end_time: i64,
1001 timeout_ms: i64
1002 ) -> Result<Box<dyn 'a + DataSet>> {
1003 if let Some(session_id) = self.session_id {
1004 let resp = self
1005 .client
1006 .execute_raw_data_query(super::rpc::TSRawDataQueryReq {
1007 session_id,
1008 paths: paths.iter().map(ToString::to_string).collect(),
1009 fetch_size: Some(self.config.fetch_size),
1010 start_time,
1011 end_time,
1012 statement_id: self.statement_id,
1013 enable_redirect_query: None,
1014 jdbc_query: None,
1015 timeout: Option::from(timeout_ms),
1016 legal_path_nodes: Option::from(false)
1017 })?;
1018 let status = resp.status;
1019 let code = status.code;
1020 if code == SUCCESS_STATUS {
1021 if let (Some(query_data_set), Some(column_names), Some(data_types_list)) =
1022 (resp.query_data_set, resp.columns, resp.data_type_list)
1023 {
1024 let column_name_index_map = match resp.column_name_index_map {
1025 Some(v) => v,
1026 None => {
1027 let mut map: BTreeMap<String, i32> = BTreeMap::new();
1028 for (index, name) in column_names.iter().enumerate() {
1029 map.insert(name.to_string(), index as i32);
1030 }
1031 map
1032 }
1033 };
1034
1035 let data_types: Vec<TSDataType> =
1036 data_types_list.iter().map(TSDataType::from).collect();
1037
1038 let mut column_index_map: HashMap<usize, usize> = HashMap::new();
1039
1040 let column_count = column_names.len();
1041 for (index, name) in column_names.iter().enumerate() {
1042 column_index_map
1043 .insert(*column_name_index_map.get(name).unwrap() as usize, index);
1044 }
1045
1046 Ok(Box::new(RpcDataSet {
1047 session: self,
1048 statement: "".to_string(),
1049 query_id: resp.query_id.unwrap(),
1050 timestamp: -1,
1051 is_ignore_time_stamp: resp.ignore_time_stamp,
1052 query_data_set,
1053 column_names,
1054 data_types,
1055 bitmaps: vec![0_u8; column_count],
1056 row_index: 0,
1057 column_index_map,
1058 column_name_index_map,
1059 closed: false,
1060 }))
1061 } else {
1062 Err("Did't get the result.".into())
1063 }
1064 } else {
1065 let res: Result<()> = status.into();
1066 res?;
1067 Err(format!("Unknow, code: {}", code).into())
1068 }
1069 } else {
1070 Err("Operation can't be performed, the session is closed.".into())
1071 }
1072 }
1073
1074 fn execute_update_statement(
1075 &'a mut self,
1076 statement: &str,
1077 ) -> Result<Option<Box<dyn 'a + DataSet>>> {
1078 if let Some(session_id) = self.session_id {
1079 let resp = self
1080 .client
1081 .execute_update_statement(TSExecuteStatementReq {
1082 session_id,
1083 statement: statement.to_string(),
1084 statement_id: self.statement_id,
1085 fetch_size: Some(self.config.fetch_size),
1086 timeout: self.config.timeout_ms,
1087 enable_redirect_query: None,
1088 jdbc_query: None,
1089 })?;
1090 let status = resp.status;
1091 let code = status.code;
1092 if code == SUCCESS_STATUS {
1093 if let (Some(query_data_set), Some(column_names), Some(data_type_list)) =
1094 (resp.query_data_set, resp.columns, resp.data_type_list)
1095 {
1096 let column_name_index_map = match resp.column_name_index_map {
1097 Some(v) => v,
1098 None => {
1099 let mut map: BTreeMap<String, i32> = BTreeMap::new();
1100 for (index, name) in column_names.iter().enumerate() {
1101 map.insert(name.to_string(), index as i32);
1102 }
1103 map
1104 }
1105 };
1106
1107 let data_types: Vec<TSDataType> =
1108 data_type_list.iter().map(TSDataType::from).collect();
1109
1110 let mut column_index_map: HashMap<usize, usize> = HashMap::new();
1111
1112 let column_count = column_names.len();
1113 for (index, name) in column_names.iter().enumerate() {
1114 column_index_map
1115 .insert(*column_name_index_map.get(name).unwrap() as usize, index);
1116 }
1117
1118 Ok(Some(Box::new(RpcDataSet {
1119 session: self,
1120 statement: statement.to_string(),
1121 query_id: resp.query_id.unwrap(),
1122 timestamp: -1,
1123 is_ignore_time_stamp: resp.ignore_time_stamp,
1124 query_data_set,
1125 column_names,
1126 data_types,
1127 bitmaps: vec![0_u8; column_count],
1128 row_index: 0,
1129 column_index_map,
1130 column_name_index_map,
1131 closed: false,
1132 })))
1133 } else {
1134 Ok(None)
1135 }
1136 } else {
1137 let res: Result<()> = status.into();
1138 res?;
1139 Err(format!("Unknow, code: {}", code).into())
1140 }
1141 } else {
1142 Err("Operation can't be performed, the session is closed.".into())
1143 }
1144 }
1145}
1146
1147impl Drop for RpcSession {
1148 fn drop(&mut self) {
1149 if let Some(session_id) = self.session_id {
1150 self.close().unwrap_or_else(|err| {
1151 eprint!("error closing the session {}, reason {}", session_id, err)
1152 });
1153 }
1154 }
1155}