iotdb_client/client/
mod.rs

1//
2// Licensed to the Apache Software Foundation (ASF) under one
3// or more contributor license agreements.  See the NOTICE file
4// distributed with this work for additional information
5// regarding copyright ownership.  The ASF licenses this file
6// to you under the Apache License, Version 2.0 (the
7// "License"); you may not use this file except in compliance
8// with the License.  You may obtain a copy of the License at
9//
10//  http://www.apache.org/licenses/LICENSE-2.0
11//
12// Unless required by applicable law or agreed to in writing,
13// software distributed under the License is distributed on an
14// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15// KIND, either express or implied.  See the License for the
16// specific language governing permissions and limitations
17// under the License.
18//
19
20pub mod remote;
21mod rpc;
22pub mod common;
23
24use crate::protocal::{TSCompressionType, TSDataType, TSEncoding};
25use std::collections::BTreeMap;
26use std::error::Error;
27
28macro_rules! to_be_vec {
29    ($v:expr) => {{
30        $v.to_be_bytes().to_vec()
31    }};
32}
33
34macro_rules! str {
35    ($v:expr) => {{
36        $v.to_string()
37    }};
38}
39
40pub type Result<T> = core::result::Result<T, Box<dyn Error>>;
41
42pub type Dictionary = BTreeMap<String, String>;
43
44#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
45pub struct MeasurementSchema {
46    pub measurement: String,
47    pub data_type: TSDataType,
48    pub encoding: TSEncoding,
49    pub compressor: TSCompressionType,
50    pub properties: Option<Dictionary>,
51}
52
53impl MeasurementSchema {
54    pub fn new(
55        measurement: String,
56        data_type: TSDataType,
57        encoding: TSEncoding,
58        compressor: TSCompressionType,
59        properties: Option<Dictionary>,
60    ) -> Self {
61        Self {
62            measurement,
63            data_type,
64            encoding,
65            compressor,
66            properties,
67        }
68    }
69}
70#[derive(Debug, Clone)]
71pub struct Tablet {
72    prefix_path: String,
73    measurement_schemas: Vec<MeasurementSchema>,
74    timestamps: Vec<i64>,
75    columns: Vec<Vec<Value>>,
76    // bitmaps: Vec<Vec<u8>>,
77}
78
79impl From<&Tablet> for Vec<u8> {
80    fn from(tablet: &Tablet) -> Vec<u8> {
81        let mut buffer: Vec<u8> =
82            Vec::with_capacity(tablet.get_row_count() * tablet.get_column_count() * 8);
83        tablet.columns.iter().for_each(|column| {
84            column.iter().for_each(|value| match value {
85                Value::Bool(v) => match v {
86                    true => buffer.push(1),
87                    false => buffer.push(0),
88                },
89                Value::Int32(v) => buffer.append(&mut to_be_vec!(v)),
90                Value::Int64(v) => buffer.append(&mut to_be_vec!(v)),
91                Value::Float(v) => buffer.append(&mut to_be_vec!(v)),
92                Value::Double(v) => buffer.append(&mut to_be_vec!(v)),
93                Value::Text(t) => {
94                    let len = t.len() as i32;
95                    buffer.append(&mut len.to_be_bytes().to_vec());
96                    buffer.append(&mut t.as_bytes().to_vec());
97                }
98                Value::Null => unimplemented!("null value doesn't implemented for tablet"),
99            });
100        });
101        buffer
102    }
103}
104
105impl Tablet {
106    pub fn new(prefix_path: &str, measurement_schemas: Vec<MeasurementSchema>) -> Self {
107        let mut columns: Vec<Vec<Value>> = Vec::new();
108        measurement_schemas
109            .iter()
110            .for_each(|_| columns.push(Vec::new()));
111        Self {
112            prefix_path: prefix_path.to_string(),
113            timestamps: Vec::new(),
114            columns,
115            measurement_schemas,
116        }
117    }
118
119    pub fn sort(&mut self) {
120        let permutation = permutation::sort(&self.timestamps[..]);
121
122        self.timestamps = permutation.apply_slice(&self.timestamps[..]);
123        for i in 0..self.columns.len() {
124            self.columns[i] = permutation.apply_slice(&self.columns[i][..]);
125        }
126    }
127
128    pub fn get_prefix_path(&self) -> String {
129        self.prefix_path.clone()
130    }
131
132    pub fn get_measurement_schemas(&self) -> Vec<MeasurementSchema> {
133        self.measurement_schemas.clone()
134    }
135
136    pub fn add_row(&mut self, row: Vec<Value>, timestamp: i64) -> Result<()> {
137        if row.len() != self.columns.len() {
138            return Err(format!("row values '{:?}' must match columns", row).into());
139        }
140
141        row.iter().for_each(|v| {
142            assert!(
143                *v != Value::Null,
144                "Null values are currently not supported."
145            )
146        });
147
148        self.timestamps.push(timestamp);
149        self.columns
150            .iter_mut()
151            .zip(row.iter())
152            .for_each(|(column, value)| column.push(value.clone()));
153        Ok(())
154    }
155
156    pub fn get_timestamps_at(&self, row_index: usize) -> i64 {
157        assert!(row_index < self.timestamps.len());
158        self.timestamps[row_index]
159    }
160
161    pub fn get_value_at(&self, colum_index: usize, row_index: usize) -> Value {
162        assert!(colum_index < self.columns.len());
163        assert!(row_index < self.timestamps.len());
164        self.columns[colum_index][row_index].clone()
165    }
166
167    pub fn get_row_count(&self) -> usize {
168        self.timestamps.len()
169    }
170
171    pub fn get_column_count(&self) -> usize {
172        self.columns.len()
173    }
174}
175
176#[derive(Debug, Clone, PartialEq)]
177pub enum Value {
178    Bool(bool),
179    Int32(i32),
180    Int64(i64),
181    Float(f32),
182    Double(f64),
183    Text(String),
184    Null,
185}
186
187impl ToString for Value {
188    fn to_string(&self) -> String {
189        match &self {
190            Value::Bool(v) => str!(v),
191            Value::Int32(v) => str!(v),
192            Value::Int64(v) => str!(v),
193            Value::Float(v) => str!(v),
194            Value::Double(v) => str!(v),
195            Value::Text(v) => str!(v),
196            Value::Null => str!("null"),
197        }
198    }
199}
200
201impl From<Vec<u8>> for Value {
202    fn from(mut bytes: Vec<u8>) -> Self {
203        match bytes.remove(0) {
204            0 => Value::Bool(bytes.remove(0) == 1_u8),
205            1 => Value::Int32(i32::from_be_bytes(bytes.try_into().unwrap())),
206            2 => Value::Int64(i64::from_be_bytes(bytes.try_into().unwrap())),
207            3 => Value::Float(f32::from_be_bytes(bytes.try_into().unwrap())),
208            4 => Value::Double(f64::from_be_bytes(bytes.try_into().unwrap())),
209            5 => Value::Text(String::from_utf8(bytes).unwrap()),
210            _ => Value::Null,
211        }
212    }
213}
214
215impl From<&Value> for Vec<u8> {
216    fn from(v: &Value) -> Vec<u8> {
217        match v {
218            Value::Bool(v) => match v {
219                true => vec![TSDataType::Boolean as u8, 1],
220                false => vec![TSDataType::Boolean as u8, 0],
221            },
222            Value::Int32(v) => {
223                let mut buff: Vec<u8> = Vec::with_capacity(4);
224                buff.push(TSDataType::Int32 as u8);
225                buff.append(&mut to_be_vec!(v));
226                buff
227            }
228            Value::Int64(v) => {
229                let mut buff: Vec<u8> = Vec::with_capacity(8);
230                buff.push(TSDataType::Int64 as u8);
231                buff.append(&mut to_be_vec!(v));
232                buff
233            }
234            Value::Float(v) => {
235                let mut buff: Vec<u8> = Vec::with_capacity(4);
236                buff.push(TSDataType::Float as u8);
237                buff.append(&mut to_be_vec!(v));
238                buff
239            }
240            Value::Double(v) => {
241                let mut buff: Vec<u8> = Vec::with_capacity(8);
242                buff.push(TSDataType::Double as u8);
243                buff.append(&mut to_be_vec!(v));
244                buff
245            }
246            Value::Text(t) => {
247                let mut buff: Vec<u8> = Vec::with_capacity(4 + t.len());
248                let len = t.len() as i32;
249                buff.push(TSDataType::Text as u8);
250                buff.append(&mut to_be_vec!(len));
251                buff.append(&mut t.as_bytes().to_vec());
252                buff
253            }
254            Value::Null => vec![],
255        }
256    }
257}
258
259#[derive(Clone, Debug)]
260pub struct RowRecord {
261    pub timestamp: i64,
262    pub values: Vec<Value>,
263}
264pub trait DataSet: Iterator<Item = RowRecord> {
265    fn get_column_names(&self) -> Vec<String>;
266    fn get_data_types(&self) -> Vec<TSDataType>;
267    fn is_ignore_timestamp(&self) -> bool;
268}
269
270pub trait Session<'a> {
271    fn open(&mut self) -> Result<()>;
272
273    fn close(&mut self) -> Result<()>;
274
275    fn set_storage_group(&mut self, storage_group_id: &str) -> Result<()>;
276
277    fn delete_storage_group(&mut self, storage_group_id: &str) -> Result<()>;
278
279    fn delete_storage_groups(&mut self, storage_group_ids: Vec<&str>) -> Result<()>;
280
281    fn create_timeseries<T>(
282        &mut self,
283        path: &str,
284        data_type: TSDataType,
285        encoding: TSEncoding,
286        compressor: TSCompressionType,
287        props: T,
288        attributes: T,
289        tags: T,
290        measurement_alias: Option<String>,
291    ) -> Result<()>
292    where
293        T: Into<Option<Dictionary>>;
294
295    fn create_multi_timeseries<T>(
296        &mut self,
297        paths: Vec<&str>,
298        data_types: Vec<TSDataType>,
299        encodings: Vec<TSEncoding>,
300        compressors: Vec<TSCompressionType>,
301        props_list: T,
302        attributes_list: T,
303        tags_list: T,
304        measurement_alias_list: Option<Vec<String>>,
305    ) -> Result<()>
306    where
307        T: Into<Option<Vec<Dictionary>>>;
308
309    fn delete_timeseries(&mut self, paths: Vec<&str>) -> Result<()>;
310
311    fn delete_data(&mut self, paths: Vec<&str>, start_time: i64, end_time: i64) -> Result<()>;
312
313    fn insert_string_record<T>(
314        &mut self,
315        device_id: &str,
316        measurements: Vec<&str>,
317        values: Vec<&str>,
318        timestamp: i64,
319        is_aligned: T,
320        timeout: i64
321    ) -> Result<()>
322    where
323        T: Into<Option<bool>>;
324
325    fn get_time_zone(&mut self) -> Result<String>;
326
327    fn set_time_zone(&mut self, time_zone: &str) -> Result<()>;
328
329    fn execute_statement<T>(
330        &'a mut self,
331        statement: &str,
332        timeout_ms: T,
333    ) -> Result<Box<dyn 'a + DataSet>>
334    where
335        T: Into<Option<i64>>;
336
337    fn execute_query_statement<T>(
338        &'a mut self,
339        statement: &str,
340        timeout_ms: T,
341    ) -> Result<Box<dyn 'a + DataSet>>
342    where
343        T: Into<Option<i64>>;
344
345    fn insert_record<T>(
346        &mut self,
347        device_id: &str,
348        measurements: Vec<&str>,
349        values: Vec<Value>,
350        timestamp: i64,
351        is_aligned: T,
352    ) -> Result<()>
353    where
354        T: Into<Option<bool>>;
355
356    fn insert_records_of_one_device(
357        &mut self,
358        device_id: &str,
359        timestamps: Vec<i64>,
360        measurements: Vec<Vec<&str>>,
361        values: Vec<Vec<Value>>,
362        sorted: bool,
363    ) -> Result<()>;
364
365    fn insert_records(
366        &mut self,
367        prefix_path: Vec<&str>,
368        measurements: Vec<Vec<&str>>,
369        values: Vec<Vec<Value>>,
370        timestamps: Vec<i64>,
371    ) -> Result<()>;
372
373    fn insert_tablet(&mut self, tablet: &Tablet) -> Result<()>;
374
375    fn insert_tablets(&mut self, tablets: Vec<&Tablet>) -> Result<()>;
376
377    fn execute_batch_statement(&mut self, statemens: Vec<&str>) -> Result<()>;
378
379    fn execute_raw_data_query(
380        &'a mut self,
381        paths: Vec<&str>,
382        start_time: i64,
383        end_time: i64,
384        timeout_ms: i64
385    ) -> Result<Box<dyn 'a + DataSet>>;
386
387    fn execute_update_statement(
388        &'a mut self,
389        statement: &str,
390    ) -> Result<Option<Box<dyn 'a + DataSet>>>;
391}