1use std::collections::{BTreeMap, HashMap};
21use std::error::Error;
22use std::vec;
23use thrift::transport::TIoChannel;
24
25use thrift::{
26 protocol::{
27 TBinaryInputProtocol, TBinaryOutputProtocol, TCompactInputProtocol, TCompactOutputProtocol,
28 TInputProtocol, TOutputProtocol,
29 },
30 transport::{TFramedReadTransport, TFramedWriteTransport, TTcpChannel},
31};
32use typed_builder::TypedBuilder;
33
34use crate::client::rpc::{
35 TSCreateMultiTimeseriesReq, TSCreateTimeseriesReq, TSIServiceSyncClient, TSInsertRecordsReq,
36 TSInsertStringRecordReq, TSInsertTabletReq, TSOpenSessionReq, TSProtocolVersion,
37 TTSIServiceSyncClient,
38};
39use crate::protocal::{
40 TSCompressionType, TSDataType, TSEncoding, FLAG, MULTIPLE_ERROR, NEED_REDIRECTION,
41 SUCCESS_STATUS,
42};
43
44use super::rpc::{
45 TSDeleteDataReq, TSExecuteStatementReq, TSInsertRecordReq, TSInsertRecordsOfOneDeviceReq,
46 TSInsertTabletsReq, TSQueryDataSet, TSSetTimeZoneReq,
47};
48use super::{
49 rpc::{TSCloseSessionReq, TSStatus},
50 RowRecord,
51};
52use super::{DataSet, Dictionary, Result, Session, Value};
53
54const DEFAULT_TIME_ZONE: &str = "Asia/Shanghai";
55
56impl From<TSStatus> for core::result::Result<(), Box<dyn Error>> {
57 fn from(status: TSStatus) -> Self {
58 match status.code {
59 SUCCESS_STATUS | NEED_REDIRECTION => Ok(()),
60 MULTIPLE_ERROR => {
61 let mut messges = String::new();
62 if let Some(sub_status) = status.sub_status {
63 for s in sub_status {
64 if s.code != SUCCESS_STATUS && s.code != NEED_REDIRECTION {
65 if let Some(msg) = s.message {
66 messges.push_str(format!("Code: {}, {}", s.code, msg).as_str());
67 messges.push(';');
68 }
69 }
70 }
71 }
72 if !messges.is_empty() {
73 Err(messges.into())
74 } else {
75 Ok(())
76 }
77 }
78 _ => {
79 if let Some(message) = status.message {
80 Err(format!("code: {}, {}", status.code, message).into())
81 } else {
82 Err(format!("code: {}", status.code).into())
83 }
84 }
85 }
86 }
87}
88
/// Connection and query settings consumed by `RpcSession`.
///
/// Every field has a builder default, so `Config::builder().build()` yields a usable value.
///
/// NOTE(review): the builder default for `timeout_ms` is `Some(3000)` while the
/// `Default` impl in this file uses `Some(30000)` — confirm which value is intended.
#[derive(Debug, Clone, TypedBuilder)]
#[builder(field_defaults(default, setter(into)))]

pub struct Config {
    /// Server host; joined with `port` as `host:port` to form the TCP endpoint.
    #[builder(default = String::from("127.0.0.1"))]
    pub host: String,
    /// Server port; joined with `host` to form the TCP endpoint.
    #[builder(default = 6667)]
    pub port: i32,
    /// User name sent when the session is opened.
    #[builder(default = String::from("root"))]
    pub username: String,
    /// Password sent when the session is opened.
    #[builder(default = String::from("root"))]
    pub password: String,
    /// Timeout forwarded to fetch-results and update-statement requests
    /// (milliseconds, judging by the name — TODO confirm).
    #[builder(default = Some(3000))]
    pub timeout_ms: Option<i64>,
    /// Fetch size forwarded with statement, raw-data-query and fetch-results requests.
    #[builder(default = 1000)]
    pub fetch_size: i32,
    /// Time zone sent when the session is opened; `DEFAULT_TIME_ZONE` is used when `None`.
    #[builder(default = Some(String::from(DEFAULT_TIME_ZONE)))]
    pub timezone: Option<String>,
    /// When `true` the client uses the Thrift compact protocol, otherwise the binary protocol.
    #[builder(default = false)]
    pub enable_compression: bool,
    /// Protocol version sent when the session is opened.
    #[builder(default = TSProtocolVersion::IOTDB_SERVICE_PROTOCOL_V3)]
    pub protocol_version: TSProtocolVersion,
    /// `is_align` flag forwarded with fetch-results requests.
    #[builder(default = true)]
    pub is_align: bool,
}
114
115impl Default for Config {
116 fn default() -> Self {
117 Self {
118 host: String::from("127.0.0.1"),
119 port: 6667,
120 username: String::from("root"),
121 password: String::from("root"),
122 timeout_ms: Some(30000),
123 fetch_size: 1000,
124 timezone: Some(String::from(DEFAULT_TIME_ZONE)),
125 enable_compression: false,
126 protocol_version: TSProtocolVersion::IOTDB_SERVICE_PROTOCOL_V3,
127 is_align: true,
128 }
129 }
130}
131
/// A session that talks to the server through a synchronous Thrift client.
pub struct RpcSession {
    /// Settings supplied at construction time.
    config: Config,
    /// Server-assigned session id; `None` until `open` succeeds and again after `close`.
    session_id: Option<i64>,
    /// Statement id sent with statement requests; initialised to `-1` by `new`.
    statement_id: i64,
    /// Generated Thrift client over boxed input/output protocols.
    client: TSIServiceSyncClient<Box<dyn TInputProtocol>, Box<dyn TOutputProtocol>>,
}
138
139impl RpcSession {
140 pub fn new(config: Config) -> Result<Self> {
141 let mut tcp_channel = TTcpChannel::new();
142 let endpint = format!("{}:{}", config.host, config.port);
143
144 tcp_channel.open(&endpint).map_err(|err| {
145 Box::<dyn Error>::from(format!("failed to connect to {}, {:?}", endpint, err))
146 })?;
147
148 let (i_chan, o_chan) = tcp_channel.split()?;
149
150 let (i_prot, o_prot) = (
151 TFramedReadTransport::new(i_chan),
152 TFramedWriteTransport::new(o_chan),
153 );
154
155 let (input_protocol, output_protocol): (Box<dyn TInputProtocol>, Box<dyn TOutputProtocol>) =
156 if config.enable_compression {
157 (
158 Box::new(TCompactInputProtocol::new(i_prot)),
159 Box::new(TCompactOutputProtocol::new(o_prot)),
160 )
161 } else {
162 (
163 Box::new(TBinaryInputProtocol::new(i_prot, true)),
164 Box::new(TBinaryOutputProtocol::new(o_prot, true)),
165 )
166 };
167
168 Ok(Self {
169 config: config,
170 session_id: None,
171 statement_id: -1,
172 client: TSIServiceSyncClient::new(input_protocol, output_protocol),
173 })
174 }
175}
176
177impl<'a> Iterator for RpcDataSet<'a> {
178 type Item = RowRecord;
179
180 fn next(&mut self) -> Option<Self::Item> {
181 if self.has_cached_results() {
182 let mut values: Vec<Value> = Vec::with_capacity(self.column_names.len());
183
184 let ts = self.query_data_set.time.drain(0..8).collect::<Vec<u8>>();
185 self.timestamp = i64::from_be_bytes(ts.try_into().unwrap());
186
187 for column_index in 0..self.query_data_set.value_list.len() {
188 if self.row_index % 8 == 0 {
189 self.bitmaps[column_index] =
190 self.query_data_set.bitmap_list[column_index].remove(0);
191 }
192
193 let null = self.is_null(column_index, self.row_index);
194
195 let column_data = self
196 .query_data_set
197 .value_list
198 .get_mut(column_index)
199 .unwrap();
200 if !null {
201 let original_column_index = self.column_index_map.get(&column_index).unwrap();
202 let data_type = self.data_types.get(*original_column_index).unwrap();
203
204 let mut bytes: Vec<u8> = Vec::new();
205 match data_type {
206 TSDataType::Boolean => {
207 bytes.push(TSDataType::Boolean as u8);
208 bytes.push(column_data.remove(0));
209 }
210 TSDataType::Int32 => {
211 bytes.push(TSDataType::Int32 as u8);
212 bytes.extend(column_data.drain(0..4));
213 }
214 TSDataType::Int64 => {
215 bytes.push(TSDataType::Int64 as u8);
216 bytes.extend(column_data.drain(0..8));
217 }
218 TSDataType::Float => {
219 bytes.push(TSDataType::Float as u8);
220 bytes.extend(column_data.drain(0..4));
221 }
222 TSDataType::Double => {
223 bytes.push(TSDataType::Double as u8);
224 bytes.extend(column_data.drain(0..8));
225 }
226 TSDataType::Text => {
227 bytes.push(TSDataType::Text as u8);
228 let len = i32::from_be_bytes(
229 column_data
230 .drain(0..4)
231 .collect::<Vec<u8>>()
232 .try_into()
233 .unwrap(),
234 );
235 bytes.extend(column_data.drain(0..len as usize).collect::<Vec<_>>());
236 }
237 }
238 values.push(Value::from(bytes));
239 } else {
240 values.push(Value::Null);
241 }
242 }
243
244 self.row_index += 1;
245
246 let mut output_values: Vec<Value> = Vec::with_capacity(if self.is_ignore_timestamp() {
247 self.get_column_names().len()
248 } else {
249 self.get_column_names().len() + 1
250 });
251 if !self.is_ignore_timestamp() {
252 output_values.push(Value::Int64(self.timestamp));
253 }
254
255 output_values.extend(self.column_names.iter().map(|column_name| {
256 values[*(self.column_name_index_map.get(column_name).unwrap()) as usize].clone()
257 }));
258 Some(RowRecord {
259 timestamp: self.timestamp,
260 values: output_values,
261 })
262 } else {
263 None
264 }
265 }
266}
267
/// Result set of a query, decoded lazily and refilled from the server on demand.
pub(crate) struct RpcDataSet<'a> {
    /// Session used to fetch further batches and to close the query.
    session: &'a mut RpcSession,
    /// Statement text resent with each fetch-results request.
    statement: String,
    /// Server-side query id, used for fetching and for closing the operation.
    query_id: i64,
    /// Server flag: when `Some(true)` rows carry no leading time column.
    is_ignore_time_stamp: Option<bool>,
    /// Timestamp of the most recently decoded row (`-1` before the first row).
    timestamp: i64,
    /// Column names as returned by the server (without the synthetic `Time` column).
    column_names: Vec<String>,
    /// Data types, indexed like `column_names`.
    data_types: Vec<TSDataType>,
    /// Currently buffered batch; bytes are consumed from the front as rows are decoded.
    query_data_set: TSQueryDataSet,
    /// Maps an index into the value list to an index into `column_names`/`data_types`.
    column_index_map: HashMap<usize, usize>,
    /// Maps a column name to its index into the value list.
    column_name_index_map: BTreeMap<String, i32>,
    /// Current null-bitmap byte for each value column.
    bitmaps: Vec<u8>,
    /// Number of rows decoded from the current batch; reset when a new batch is stored.
    row_index: usize,
    /// Set once the server-side operation has been closed successfully.
    closed: bool,
}
283
284impl<'a> RpcDataSet<'a> {
285 fn is_null(&self, column_index: usize, row_index: usize) -> bool {
286 let bitmap = self.bitmaps[column_index];
287 let shift = row_index % 8;
288 ((FLAG >> shift) & (bitmap & 0xff)) == 0
289 }
290
291 fn has_cached_results(&mut self) -> bool {
292 if self.closed {
293 return false;
294 }
295 if self.query_data_set.time.is_empty() {
296 if let Some(session_id) = self.session.session_id {
297 match self
299 .session
300 .client
301 .fetch_results(super::rpc::TSFetchResultsReq {
302 session_id,
303 statement: self.statement.clone(),
304 fetch_size: self.session.config.fetch_size,
305 query_id: self.query_id,
306 is_align: self.session.config.is_align,
307 timeout: self.session.config.timeout_ms,
308 }) {
309 Ok(resp) => {
310 let status = resp.status;
311 let res: Result<()> = status.into();
312
313 match res {
314 Ok(_) => {
315 if resp.has_result_set {
316 if let Some(query_data_set) = resp.query_data_set {
318 self.query_data_set = query_data_set;
319 self.row_index = 0;
320 }
321 } else {
322 self.close();
324 return false;
325 }
326 }
327 Err(err) => {
328 eprint!("An error occurred when fetch result: {}", err);
329 return false;
330 }
331 }
332 }
333 Err(err) => {
334 eprint!("An error occurred when fetch result: {}", err);
335 return false;
336 }
337 }
338 }
339 }
340 !self.query_data_set.time.is_empty()
341 }
342
343 pub fn close(&mut self) {
344 if !self.closed {
345 if let Some(session_id) = self.session.session_id {
346 match self
347 .session
348 .client
349 .close_operation(super::rpc::TSCloseOperationReq {
350 session_id,
351 query_id: Some(self.query_id),
352 statement_id: Some(self.session.statement_id),
353 }) {
354 Ok(status) => {
355 let res: Result<()> = status.into();
356 match res {
357 Ok(_) => {
358 self.closed = true;
359 }
360 Err(err) => {
361 eprint!("An error occurred when closing dataset {:?}", err)
362 }
363 }
364 }
365 Err(err) => {
366 eprint!("An error occurred when closing dataset {:?}", err)
367 }
368 }
369 }
370 }
371 }
372}
373
374impl<'a> Drop for RpcDataSet<'a> {
375 fn drop(&mut self) {
376 self.close();
377 }
378}
379
380impl<'a> DataSet for RpcDataSet<'a> {
381 fn get_column_names(&self) -> Vec<String> {
382 if self.is_ignore_timestamp() {
383 self.column_names.clone()
384 } else {
385 let mut column_names = vec![String::from("Time")];
387 column_names.extend(self.column_names.clone());
388 column_names
389 }
390 }
391
392 fn get_data_types(&self) -> Vec<TSDataType> {
393 if self.is_ignore_timestamp() {
394 self.data_types.clone()
395 } else {
396 let mut column_types = vec![TSDataType::Int64];
398 column_types.extend(self.data_types.clone());
399 column_types
400 }
401 }
402
403 fn is_ignore_timestamp(&self) -> bool {
404 if let Some(v) = self.is_ignore_time_stamp {
405 v
406 } else {
407 false
408 }
409 }
410}
411
412fn fire_closed_error() -> Result<()> {
413 Err("Operation can't be performed, the session is closed.".into())
414}
415
416impl<'a> Session<'a> for RpcSession {
417 fn open(&mut self) -> Result<()> {
418 let resp = self.client.open_session(TSOpenSessionReq::new(
419 self.config.protocol_version,
420 self.config
421 .timezone
422 .clone()
423 .unwrap_or_else(|| DEFAULT_TIME_ZONE.to_string()),
424 self.config.username.clone(),
425 self.config.password.clone(),
426 None,
427 ))?;
428 let res: Result<()> = resp.status.into();
429 res?;
430 self.session_id = resp.session_id;
431 Ok(())
432 }
433
434 fn close(&mut self) -> Result<()> {
435 if let Some(session_id) = self.session_id {
436 let status = self
437 .client
438 .close_session(TSCloseSessionReq::new(session_id))?;
439 self.session_id = None;
440 status.into()
441 } else {
442 fire_closed_error()
443 }
444 }
445
446 fn set_storage_group(&mut self, storage_group_id: &str) -> Result<()> {
447 if let Some(session_id) = self.session_id {
448 let status = self
449 .client
450 .set_storage_group(session_id, storage_group_id.into())?;
451 status.into()
452 } else {
453 fire_closed_error()
454 }
455 }
456
457 fn delete_storage_group(&mut self, storage_group_id: &str) -> Result<()> {
458 self.delete_storage_groups(vec![storage_group_id])
459 }
460
461 fn delete_storage_groups(&mut self, storage_group_ids: Vec<&str>) -> Result<()> {
462 if let Some(session_id) = self.session_id {
463 let status = self.client.delete_storage_groups(
464 session_id,
465 storage_group_ids.iter().map(ToString::to_string).collect(),
466 )?;
467 status.into()
468 } else {
469 fire_closed_error()
470 }
471 }
472
473 fn create_timeseries<T>(
474 &mut self,
475 path: &str,
476 data_type: crate::protocal::TSDataType,
477 encoding: crate::protocal::TSEncoding,
478 compressor: crate::protocal::TSCompressionType,
479 props: T,
480 attributes: T,
481 tags: T,
482 measurement_alias: Option<String>,
483 ) -> Result<()>
484 where
485 T: Into<Option<Dictionary>>,
486 {
487 if let Some(session_id) = self.session_id {
488 self.client
489 .create_timeseries(TSCreateTimeseriesReq::new(
490 session_id,
491 path.to_string(),
492 data_type.into(),
493 encoding.into(),
494 compressor.into(),
495 props,
496 tags,
497 attributes,
498 measurement_alias,
499 ))?
500 .into()
501 } else {
502 fire_closed_error()
503 }
504 }
505
506 fn create_multi_timeseries<T>(
507 &mut self,
508 paths: Vec<&str>,
509 data_types: Vec<crate::protocal::TSDataType>,
510 encodings: Vec<crate::protocal::TSEncoding>,
511 compressors: Vec<crate::protocal::TSCompressionType>,
512 props_list: T,
513 attributes_list: T,
514 tags_list: T,
515 measurement_alias_list: Option<Vec<String>>,
516 ) -> Result<()>
517 where
518 T: Into<Option<Vec<Dictionary>>>,
519 {
520 if let Some(session_id) = self.session_id {
521 let status = self
522 .client
523 .create_multi_timeseries(TSCreateMultiTimeseriesReq::new(
524 session_id,
525 paths.iter().map(ToString::to_string).collect(),
526 data_types.into_iter().map(TSDataType::into).collect(),
527 encodings.into_iter().map(TSEncoding::into).collect(),
528 compressors
529 .into_iter()
530 .map(TSCompressionType::into)
531 .collect(),
532 props_list,
533 attributes_list,
534 tags_list,
535 measurement_alias_list,
536 ))?;
537 status.into()
538 } else {
539 fire_closed_error()
540 }
541 }
542
543 fn delete_timeseries(&mut self, paths: Vec<&str>) -> Result<()> {
544 if let Some(session_id) = self.session_id {
545 let status = self
546 .client
547 .delete_timeseries(session_id, paths.iter().map(ToString::to_string).collect())?;
548 status.into()
549 } else {
550 fire_closed_error()
551 }
552 }
553
554 fn delete_data(&mut self, paths: Vec<&str>, start_time: i64, end_time: i64) -> Result<()> {
555 if let Some(session_id) = self.session_id {
556 let status = self.client.delete_data(TSDeleteDataReq::new(
557 session_id,
558 paths.into_iter().map(ToString::to_string).collect(),
559 start_time,
560 end_time,
561 ))?;
562 status.into()
563 } else {
564 fire_closed_error()
565 }
566 }
567
568 fn insert_string_record<T>(
569 &mut self,
570 device_id: &str,
571 measurements: Vec<&str>,
572 values: Vec<&str>,
573 timestamp: i64,
574 is_aligned: T,
575 ) -> Result<()>
576 where
577 T: Into<Option<bool>>,
578 {
579 if let Some(session_id) = self.session_id {
580 let status = self
581 .client
582 .insert_string_record(TSInsertStringRecordReq::new(
583 session_id,
584 device_id.to_string(),
585 measurements.iter().map(ToString::to_string).collect(),
586 values.iter().map(ToString::to_string).collect(),
587 timestamp,
588 is_aligned,
589 ))?;
590 status.into()
591 } else {
592 fire_closed_error()
593 }
594 }
595
596 fn get_time_zone(&mut self) -> Result<String> {
597 if let Some(session_id) = self.session_id {
598 let resp = self.client.get_time_zone(session_id)?;
599 let res: Result<()> = resp.status.into();
600 res?;
601 Ok(resp.time_zone)
602 } else {
603 Err("Operation can't be performed, the session is closed.".into())
604 }
605 }
606
607 fn set_time_zone(&mut self, time_zone: &str) -> Result<()> {
608 if let Some(session_id) = self.session_id {
609 self.client
610 .set_time_zone(TSSetTimeZoneReq::new(session_id, time_zone.to_string()))?
611 .into()
612 } else {
613 fire_closed_error()
614 }
615 }
616
617 fn execute_statement<T>(
618 &'a mut self,
619 statement: &str,
620 timeout_ms: T,
621 ) -> Result<Box<dyn 'a + DataSet>>
622 where
623 T: Into<Option<i64>>,
624 {
625 if let Some(session_id) = self.session_id {
626 let resp = self.client.execute_statement(TSExecuteStatementReq {
627 session_id,
628 statement: statement.to_string(),
629 statement_id: self.statement_id,
630 fetch_size: Some(self.config.fetch_size),
631 timeout: timeout_ms.into(),
632 enable_redirect_query: None,
633 jdbc_query: None,
634 })?;
635 let status = resp.status;
636 let code = status.code;
637 if code == SUCCESS_STATUS {
638 {
639 if let (Some(column_names), Some(data_type_list)) =
640 (resp.columns, resp.data_type_list)
641 {
642 let column_name_index_map = match resp.column_name_index_map {
643 Some(map) => map,
644 None => {
645 let mut map: BTreeMap<String, i32> = BTreeMap::new();
646 for (index, name) in column_names.iter().enumerate() {
647 map.insert(name.to_string(), index as i32);
648 }
649 map
650 }
651 };
652
653 let data_types: Vec<TSDataType> =
654 data_type_list.iter().map(TSDataType::from).collect();
655
656 let mut column_index_map: HashMap<usize, usize> = HashMap::new();
657
658 let column_count = column_names.len();
659 for (index, name) in column_names.iter().enumerate() {
660 column_index_map
661 .insert(*column_name_index_map.get(name).unwrap() as usize, index);
662 }
663
664 Ok(Box::new(RpcDataSet {
665 session: self,
666 statement: statement.to_string(),
667 query_id: resp.query_id.unwrap(),
668 timestamp: -1,
669 is_ignore_time_stamp: resp.ignore_time_stamp,
670 query_data_set: resp.query_data_set.unwrap(),
671 column_names,
672 data_types,
673 bitmaps: vec![0_u8; column_count],
674 row_index: 0,
675 column_index_map,
676 column_name_index_map,
677 closed: false,
678 }))
679 } else {
680 Err("Can't get resources on execute_statement".into())
681 }
682 }
683 } else {
684 let res: Result<()> = status.into();
685 res?;
686 Err(format!("Unknow, code: {}", code).into())
687 }
688 } else {
689 Err("Operation can't be performed, the session is closed.".into())
690 }
691 }
692
693 fn execute_query_statement<T>(
694 &'a mut self,
695 statement: &str,
696 timeout_ms: T,
697 ) -> Result<Box<dyn 'a + DataSet>>
698 where
699 T: Into<Option<i64>>,
700 {
701 if let Some(session_id) = self.session_id {
702 let resp = self.client.execute_query_statement(TSExecuteStatementReq {
703 session_id,
704 statement: statement.to_string(),
705 statement_id: self.statement_id,
706 fetch_size: Some(self.config.fetch_size),
707 timeout: timeout_ms.into(),
708 enable_redirect_query: None,
709 jdbc_query: None,
710 })?;
711 let status = resp.status;
712 let code = status.code;
713 if code == SUCCESS_STATUS {
714 let column_names: Vec<String> = resp.columns.unwrap();
715
716 let column_name_index_map = match resp.column_name_index_map {
717 Some(v) => v,
718 None => {
719 let mut map: BTreeMap<String, i32> = BTreeMap::new();
720 for (index, name) in column_names.iter().enumerate() {
721 map.insert(name.to_string(), index as i32);
722 }
723 map
724 }
725 };
726
727 let data_types: Vec<TSDataType> = resp
728 .data_type_list
729 .unwrap()
730 .iter()
731 .map(TSDataType::from)
732 .collect();
733
734 let mut column_index_map: HashMap<usize, usize> = HashMap::new();
735
736 let column_count = column_names.len();
737 for (index, name) in column_names.iter().enumerate() {
738 column_index_map
739 .insert(*column_name_index_map.get(name).unwrap() as usize, index);
740 }
741 let dataset = RpcDataSet {
742 session: self,
743 statement: statement.to_string(),
744 query_id: resp.query_id.unwrap(),
745 timestamp: -1,
746 is_ignore_time_stamp: resp.ignore_time_stamp,
747 query_data_set: resp.query_data_set.unwrap(),
748 column_names,
749 data_types,
750 bitmaps: vec![0_u8; column_count],
751 row_index: 0,
752 column_index_map,
753 column_name_index_map,
754 closed: false,
755 };
756 Ok(Box::new(dataset))
757 } else {
758 let res: Result<()> = status.into();
759 res?;
760 Err(format!("Unknow, code: {}", code).into())
761 }
762 } else {
763 Err("Operation can't be performed, the session is closed.".into())
764 }
765 }
766
767 fn insert_record<T>(
768 &mut self,
769 device_id: &str,
770 measurements: Vec<&str>,
771 values: Vec<Value>,
772 timestamp: i64,
773 is_aligned: T,
774 ) -> Result<()>
775 where
776 T: Into<Option<bool>>,
777 {
778 if let Some(session_id) = self.session_id {
779 let mut values_bytes: Vec<u8> = Vec::new();
780 values.iter().for_each(|v| {
781 let mut value_bytes: Vec<u8> = v.into();
782 values_bytes.append(&mut value_bytes);
783 });
784 let status = self.client.insert_record(TSInsertRecordReq::new(
785 session_id,
786 device_id.to_string(),
787 measurements.iter().map(ToString::to_string).collect(),
788 values_bytes,
789 timestamp,
790 is_aligned,
791 ))?;
792 status.into()
793 } else {
794 fire_closed_error()
795 }
796 }
797
798 fn insert_records_of_one_device(
799 &mut self,
800 device_id: &str,
801 timestamps: Vec<i64>,
802 measurements: Vec<Vec<&str>>,
803 values: Vec<Vec<super::Value>>,
804 sorted: bool,
805 ) -> Result<()> {
806 let mut sorted_timestamps = timestamps;
807 let mut sorted_measurements = measurements;
808 let mut sorted_values = values;
809
810 if !sorted {
811 let permutation = permutation::sort(&sorted_timestamps[..]);
812 sorted_timestamps = permutation.apply_slice(&sorted_timestamps[..]);
813 sorted_measurements = permutation.apply_slice(&sorted_measurements[..]);
814 sorted_values = permutation.apply_slice(&sorted_values[..]);
815 }
816
817 if let Some(session_id) = self.session_id {
818 let values_list = sorted_values
819 .iter()
820 .map(|vec| {
821 let mut values: Vec<u8> = Vec::new();
822 for value in vec.iter() {
823 let mut value_data: Vec<u8> = value.into();
824 values.append(&mut value_data);
825 }
826 values
827 })
828 .collect();
829 let status =
830 self.client
831 .insert_records_of_one_device(TSInsertRecordsOfOneDeviceReq::new(
832 session_id,
833 device_id.to_string(),
834 sorted_measurements
835 .iter()
836 .map(|vec| vec.iter().map(ToString::to_string).collect())
837 .collect(),
838 values_list,
839 sorted_timestamps,
840 false,
841 ))?;
842 status.into()
843 } else {
844 fire_closed_error()
845 }
846 }
847
848 fn insert_records(
849 &mut self,
850 prefix_path: Vec<&str>,
851 measurements: Vec<Vec<&str>>,
852 values: Vec<Vec<super::Value>>,
853 timestamps: Vec<i64>,
854 ) -> Result<()> {
855 if let Some(session_id) = self.session_id {
856 let values_list = values
857 .iter()
858 .map(|vec| {
859 let mut values: Vec<u8> = Vec::new();
860 for value in vec.iter() {
861 let mut value_data: Vec<u8> = value.into();
862 values.append(&mut value_data);
863 }
864 values
865 })
866 .collect();
867 let status = self.client.insert_records(TSInsertRecordsReq {
868 session_id,
869 prefix_paths: prefix_path.iter().map(ToString::to_string).collect(),
870 measurements_list: measurements
871 .iter()
872 .map(|ms| ms.iter().map(ToString::to_string).collect())
873 .collect(),
874 values_list,
875 timestamps,
876 is_aligned: None,
877 })?;
878 status.into()
879 } else {
880 fire_closed_error()
881 }
882 }
883
884 fn insert_tablet(&mut self, tablet: &super::Tablet) -> Result<()> {
885 if let Some(session_id) = self.session_id {
886 let mut timestamps_list: Vec<u8> = Vec::with_capacity(tablet.timestamps.len() * 8);
887 tablet
888 .timestamps
889 .iter()
890 .for_each(|ts| timestamps_list.append(&mut ts.to_be_bytes().to_vec()));
891
892 let status = self.client.insert_tablet(TSInsertTabletReq {
893 session_id,
894 prefix_path: tablet.get_prefix_path(),
895 measurements: tablet
896 .measurement_schemas
897 .iter()
898 .map(|f| f.measurement.to_string())
899 .collect(),
900 values: tablet.into(),
901 timestamps: timestamps_list,
902 types: tablet
903 .get_measurement_schemas()
904 .into_iter()
905 .map(|measurement_schema| measurement_schema.data_type.into())
906 .collect(),
907 size: tablet.get_row_count() as i32,
908 is_aligned: Some(false),
909 })?;
910 status.into()
911 } else {
912 fire_closed_error()
913 }
914 }
915
916 fn insert_tablets(&mut self, tablets: Vec<&super::Tablet>) -> Result<()> {
917 if let Some(session_id) = self.session_id {
918 let status = self.client.insert_tablets(TSInsertTabletsReq {
919 session_id,
920 prefix_paths: tablets.iter().map(|t| t.get_prefix_path()).collect(),
921 measurements_list: tablets
922 .iter()
923 .map(|tablet| {
924 tablet
925 .measurement_schemas
926 .iter()
927 .map(|f| f.measurement.to_string())
928 .collect()
929 })
930 .collect(),
931 values_list: tablets.iter().map(|tablet| Into::into(*tablet)).collect(),
932 timestamps_list: tablets
933 .iter()
934 .map(|tablet| {
935 let mut ts_item: Vec<u8> = Vec::new();
936 tablet
937 .timestamps
938 .iter()
939 .for_each(|ts| ts_item.append(&mut ts.to_be_bytes().to_vec()));
940 ts_item
941 })
942 .collect(),
943 types_list: tablets
944 .iter()
945 .map(|tablet| {
946 tablet
947 .get_measurement_schemas()
948 .into_iter()
949 .map(|f| {
950 let t: i32 = f.data_type.into();
951 t
952 })
953 .collect()
954 })
955 .collect(),
956 size_list: tablets
957 .iter()
958 .map(|tablet| tablet.get_row_count() as i32)
959 .collect(),
960 is_aligned: Some(false),
961 })?;
962 status.into()
963 } else {
964 fire_closed_error()
965 }
966 }
967
968 fn execute_batch_statement(&mut self, statemens: Vec<&str>) -> Result<()> {
969 if let Some(session_id) = self.session_id {
970 let status =
971 self.client
972 .execute_batch_statement(super::rpc::TSExecuteBatchStatementReq {
973 session_id,
974 statements: statemens.iter().map(ToString::to_string).collect(),
975 })?;
976 status.into()
977 } else {
978 fire_closed_error()
979 }
980 }
981
982 fn execute_raw_data_query(
983 &'a mut self,
984 paths: Vec<&str>,
985 start_time: i64,
986 end_time: i64,
987 ) -> Result<Box<dyn 'a + DataSet>> {
988 if let Some(session_id) = self.session_id {
989 let resp = self
990 .client
991 .execute_raw_data_query(super::rpc::TSRawDataQueryReq {
992 session_id,
993 paths: paths.iter().map(ToString::to_string).collect(),
994 fetch_size: Some(self.config.fetch_size),
995 start_time,
996 end_time,
997 statement_id: self.statement_id,
998 enable_redirect_query: None,
999 jdbc_query: None,
1000 })?;
1001 let status = resp.status;
1002 let code = status.code;
1003 if code == SUCCESS_STATUS {
1004 if let (Some(query_data_set), Some(column_names), Some(data_types_list)) =
1005 (resp.query_data_set, resp.columns, resp.data_type_list)
1006 {
1007 let column_name_index_map = match resp.column_name_index_map {
1008 Some(v) => v,
1009 None => {
1010 let mut map: BTreeMap<String, i32> = BTreeMap::new();
1011 for (index, name) in column_names.iter().enumerate() {
1012 map.insert(name.to_string(), index as i32);
1013 }
1014 map
1015 }
1016 };
1017
1018 let data_types: Vec<TSDataType> =
1019 data_types_list.iter().map(TSDataType::from).collect();
1020
1021 let mut column_index_map: HashMap<usize, usize> = HashMap::new();
1022
1023 let column_count = column_names.len();
1024 for (index, name) in column_names.iter().enumerate() {
1025 column_index_map
1026 .insert(*column_name_index_map.get(name).unwrap() as usize, index);
1027 }
1028
1029 Ok(Box::new(RpcDataSet {
1030 session: self,
1031 statement: "".to_string(),
1032 query_id: resp.query_id.unwrap(),
1033 timestamp: -1,
1034 is_ignore_time_stamp: resp.ignore_time_stamp,
1035 query_data_set,
1036 column_names,
1037 data_types,
1038 bitmaps: vec![0_u8; column_count],
1039 row_index: 0,
1040 column_index_map,
1041 column_name_index_map,
1042 closed: false,
1043 }))
1044 } else {
1045 Err("Did't get the result.".into())
1046 }
1047 } else {
1048 let res: Result<()> = status.into();
1049 res?;
1050 Err(format!("Unknow, code: {}", code).into())
1051 }
1052 } else {
1053 Err("Operation can't be performed, the session is closed.".into())
1054 }
1055 }
1056
1057 fn execute_update_statement(
1058 &'a mut self,
1059 statement: &str,
1060 ) -> Result<Option<Box<dyn 'a + DataSet>>> {
1061 if let Some(session_id) = self.session_id {
1062 let resp = self
1063 .client
1064 .execute_update_statement(TSExecuteStatementReq {
1065 session_id,
1066 statement: statement.to_string(),
1067 statement_id: self.statement_id,
1068 fetch_size: Some(self.config.fetch_size),
1069 timeout: self.config.timeout_ms,
1070 enable_redirect_query: None,
1071 jdbc_query: None,
1072 })?;
1073 let status = resp.status;
1074 let code = status.code;
1075 if code == SUCCESS_STATUS {
1076 if let (Some(query_data_set), Some(column_names), Some(data_type_list)) =
1077 (resp.query_data_set, resp.columns, resp.data_type_list)
1078 {
1079 let column_name_index_map = match resp.column_name_index_map {
1080 Some(v) => v,
1081 None => {
1082 let mut map: BTreeMap<String, i32> = BTreeMap::new();
1083 for (index, name) in column_names.iter().enumerate() {
1084 map.insert(name.to_string(), index as i32);
1085 }
1086 map
1087 }
1088 };
1089
1090 let data_types: Vec<TSDataType> =
1091 data_type_list.iter().map(TSDataType::from).collect();
1092
1093 let mut column_index_map: HashMap<usize, usize> = HashMap::new();
1094
1095 let column_count = column_names.len();
1096 for (index, name) in column_names.iter().enumerate() {
1097 column_index_map
1098 .insert(*column_name_index_map.get(name).unwrap() as usize, index);
1099 }
1100
1101 Ok(Some(Box::new(RpcDataSet {
1102 session: self,
1103 statement: statement.to_string(),
1104 query_id: resp.query_id.unwrap(),
1105 timestamp: -1,
1106 is_ignore_time_stamp: resp.ignore_time_stamp,
1107 query_data_set,
1108 column_names,
1109 data_types,
1110 bitmaps: vec![0_u8; column_count],
1111 row_index: 0,
1112 column_index_map,
1113 column_name_index_map,
1114 closed: false,
1115 })))
1116 } else {
1117 Ok(None)
1118 }
1119 } else {
1120 let res: Result<()> = status.into();
1121 res?;
1122 Err(format!("Unknow, code: {}", code).into())
1123 }
1124 } else {
1125 Err("Operation can't be performed, the session is closed.".into())
1126 }
1127 }
1128}
1129
1130impl Drop for RpcSession {
1131 fn drop(&mut self) {
1132 if let Some(session_id) = self.session_id {
1133 self.close().unwrap_or_else(|err| {
1134 eprint!("error closing the session {}, reason {}", session_id, err)
1135 });
1136 }
1137 }
1138}