iotdb/client/
mod.rs

1//
2// Licensed to the Apache Software Foundation (ASF) under one
3// or more contributor license agreements.  See the NOTICE file
4// distributed with this work for additional information
5// regarding copyright ownership.  The ASF licenses this file
6// to you under the Apache License, Version 2.0 (the
7// "License"); you may not use this file except in compliance
8// with the License.  You may obtain a copy of the License at
9//
10//  http://www.apache.org/licenses/LICENSE-2.0
11//
12// Unless required by applicable law or agreed to in writing,
13// software distributed under the License is distributed on an
14// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15// KIND, either express or implied.  See the License for the
16// specific language governing permissions and limitations
17// under the License.
18//
19
20pub mod remote;
21mod rpc;
22
23use crate::protocal::{TSCompressionType, TSDataType, TSEncoding};
24use std::collections::BTreeMap;
25use std::error::Error;
26
27macro_rules! to_be_vec {
28    ($v:expr) => {{
29        $v.to_be_bytes().to_vec()
30    }};
31}
32
33macro_rules! str {
34    ($v:expr) => {{
35        $v.to_string()
36    }};
37}
38
39pub type Result<T> = core::result::Result<T, Box<dyn Error>>;
40
41pub type Dictionary = BTreeMap<String, String>;
42
43#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
44pub struct MeasurementSchema {
45    pub measurement: String,
46    pub data_type: TSDataType,
47    pub encoding: TSEncoding,
48    pub compressor: TSCompressionType,
49    pub properties: Option<Dictionary>,
50}
51
52impl MeasurementSchema {
53    pub fn new(
54        measurement: String,
55        data_type: TSDataType,
56        encoding: TSEncoding,
57        compressor: TSCompressionType,
58        properties: Option<Dictionary>,
59    ) -> Self {
60        Self {
61            measurement,
62            data_type,
63            encoding,
64            compressor,
65            properties,
66        }
67    }
68}
69#[derive(Debug, Clone)]
70pub struct Tablet {
71    prefix_path: String,
72    measurement_schemas: Vec<MeasurementSchema>,
73    timestamps: Vec<i64>,
74    columns: Vec<Vec<Value>>,
75    // bitmaps: Vec<Vec<u8>>,
76}
77
78impl From<&Tablet> for Vec<u8> {
79    fn from(tablet: &Tablet) -> Vec<u8> {
80        let mut buffer: Vec<u8> =
81            Vec::with_capacity(tablet.get_row_count() * tablet.get_column_count() * 8);
82        tablet.columns.iter().for_each(|column| {
83            column.iter().for_each(|value| match value {
84                Value::Bool(v) => match v {
85                    true => buffer.push(1),
86                    false => buffer.push(0),
87                },
88                Value::Int32(v) => buffer.append(&mut to_be_vec!(v)),
89                Value::Int64(v) => buffer.append(&mut to_be_vec!(v)),
90                Value::Float(v) => buffer.append(&mut to_be_vec!(v)),
91                Value::Double(v) => buffer.append(&mut to_be_vec!(v)),
92                Value::Text(t) => {
93                    let len = t.len() as i32;
94                    buffer.append(&mut len.to_be_bytes().to_vec());
95                    buffer.append(&mut t.as_bytes().to_vec());
96                }
97                Value::Null => unimplemented!("null value doesn't implemented for tablet"),
98            });
99        });
100        buffer
101    }
102}
103
104impl Tablet {
105    pub fn new(prefix_path: &str, measurement_schemas: Vec<MeasurementSchema>) -> Self {
106        let mut columns: Vec<Vec<Value>> = Vec::new();
107        measurement_schemas
108            .iter()
109            .for_each(|_| columns.push(Vec::new()));
110        Self {
111            prefix_path: prefix_path.to_string(),
112            timestamps: Vec::new(),
113            columns,
114            measurement_schemas,
115        }
116    }
117
118    pub fn sort(&mut self) {
119        let permutation = permutation::sort(&self.timestamps[..]);
120
121        self.timestamps = permutation.apply_slice(&self.timestamps[..]);
122        for i in 0..self.columns.len() {
123            self.columns[i] = permutation.apply_slice(&self.columns[i][..]);
124        }
125    }
126
127    pub fn get_prefix_path(&self) -> String {
128        self.prefix_path.clone()
129    }
130
131    pub fn get_measurement_schemas(&self) -> Vec<MeasurementSchema> {
132        self.measurement_schemas.clone()
133    }
134
135    pub fn add_row(&mut self, row: Vec<Value>, timestamp: i64) -> Result<()> {
136        if row.len() != self.columns.len() {
137            return Err(format!("row values '{:?}' must match columns", row).into());
138        }
139
140        row.iter().for_each(|v| {
141            assert!(
142                *v != Value::Null,
143                "Null values are currently not supported."
144            )
145        });
146
147        self.timestamps.push(timestamp);
148        self.columns
149            .iter_mut()
150            .zip(row.iter())
151            .for_each(|(column, value)| column.push(value.clone()));
152        Ok(())
153    }
154
155    pub fn get_timestamps_at(&self, row_index: usize) -> i64 {
156        assert!(row_index < self.timestamps.len());
157        self.timestamps[row_index]
158    }
159
160    pub fn get_value_at(&self, colum_index: usize, row_index: usize) -> Value {
161        assert!(colum_index < self.columns.len());
162        assert!(row_index < self.timestamps.len());
163        self.columns[colum_index][row_index].clone()
164    }
165
166    pub fn get_row_count(&self) -> usize {
167        self.timestamps.len()
168    }
169
170    pub fn get_column_count(&self) -> usize {
171        self.columns.len()
172    }
173}
174
175#[derive(Debug, Clone, PartialEq)]
176pub enum Value {
177    Bool(bool),
178    Int32(i32),
179    Int64(i64),
180    Float(f32),
181    Double(f64),
182    Text(String),
183    Null,
184}
185
186impl ToString for Value {
187    fn to_string(&self) -> String {
188        match &self {
189            Value::Bool(v) => str!(v),
190            Value::Int32(v) => str!(v),
191            Value::Int64(v) => str!(v),
192            Value::Float(v) => str!(v),
193            Value::Double(v) => str!(v),
194            Value::Text(v) => str!(v),
195            Value::Null => str!("null"),
196        }
197    }
198}
199
200impl From<Vec<u8>> for Value {
201    fn from(mut bytes: Vec<u8>) -> Self {
202        match bytes.remove(0) {
203            0 => Value::Bool(bytes.remove(0) == 1_u8),
204            1 => Value::Int32(i32::from_be_bytes(bytes.try_into().unwrap())),
205            2 => Value::Int64(i64::from_be_bytes(bytes.try_into().unwrap())),
206            3 => Value::Float(f32::from_be_bytes(bytes.try_into().unwrap())),
207            4 => Value::Double(f64::from_be_bytes(bytes.try_into().unwrap())),
208            5 => Value::Text(String::from_utf8(bytes).unwrap()),
209            _ => Value::Null,
210        }
211    }
212}
213
214impl From<&Value> for Vec<u8> {
215    fn from(v: &Value) -> Vec<u8> {
216        match v {
217            Value::Bool(v) => match v {
218                true => vec![TSDataType::Boolean as u8, 1],
219                false => vec![TSDataType::Boolean as u8, 0],
220            },
221            Value::Int32(v) => {
222                let mut buff: Vec<u8> = Vec::with_capacity(4);
223                buff.push(TSDataType::Int32 as u8);
224                buff.append(&mut to_be_vec!(v));
225                buff
226            }
227            Value::Int64(v) => {
228                let mut buff: Vec<u8> = Vec::with_capacity(8);
229                buff.push(TSDataType::Int64 as u8);
230                buff.append(&mut to_be_vec!(v));
231                buff
232            }
233            Value::Float(v) => {
234                let mut buff: Vec<u8> = Vec::with_capacity(4);
235                buff.push(TSDataType::Float as u8);
236                buff.append(&mut to_be_vec!(v));
237                buff
238            }
239            Value::Double(v) => {
240                let mut buff: Vec<u8> = Vec::with_capacity(8);
241                buff.push(TSDataType::Double as u8);
242                buff.append(&mut to_be_vec!(v));
243                buff
244            }
245            Value::Text(t) => {
246                let mut buff: Vec<u8> = Vec::with_capacity(4 + t.len());
247                let len = t.len() as i32;
248                buff.push(TSDataType::Text as u8);
249                buff.append(&mut to_be_vec!(len));
250                buff.append(&mut t.as_bytes().to_vec());
251                buff
252            }
253            Value::Null => vec![],
254        }
255    }
256}
257
258#[derive(Clone, Debug)]
259pub struct RowRecord {
260    pub timestamp: i64,
261    pub values: Vec<Value>,
262}
263pub trait DataSet: Iterator<Item = RowRecord> {
264    fn get_column_names(&self) -> Vec<String>;
265    fn get_data_types(&self) -> Vec<TSDataType>;
266    fn is_ignore_timestamp(&self) -> bool;
267}
268
269pub trait Session<'a> {
270    fn open(&mut self) -> Result<()>;
271
272    fn close(&mut self) -> Result<()>;
273
274    fn set_storage_group(&mut self, storage_group_id: &str) -> Result<()>;
275
276    fn delete_storage_group(&mut self, storage_group_id: &str) -> Result<()>;
277
278    fn delete_storage_groups(&mut self, storage_group_ids: Vec<&str>) -> Result<()>;
279
280    fn create_timeseries<T>(
281        &mut self,
282        path: &str,
283        data_type: TSDataType,
284        encoding: TSEncoding,
285        compressor: TSCompressionType,
286        props: T,
287        attributes: T,
288        tags: T,
289        measurement_alias: Option<String>,
290    ) -> Result<()>
291    where
292        T: Into<Option<Dictionary>>;
293
294    fn create_multi_timeseries<T>(
295        &mut self,
296        paths: Vec<&str>,
297        data_types: Vec<TSDataType>,
298        encodings: Vec<TSEncoding>,
299        compressors: Vec<TSCompressionType>,
300        props_list: T,
301        attributes_list: T,
302        tags_list: T,
303        measurement_alias_list: Option<Vec<String>>,
304    ) -> Result<()>
305    where
306        T: Into<Option<Vec<Dictionary>>>;
307
308    fn delete_timeseries(&mut self, paths: Vec<&str>) -> Result<()>;
309
310    fn delete_data(&mut self, paths: Vec<&str>, start_time: i64, end_time: i64) -> Result<()>;
311
312    fn insert_string_record<T>(
313        &mut self,
314        device_id: &str,
315        measurements: Vec<&str>,
316        values: Vec<&str>,
317        timestamp: i64,
318        is_aligned: T,
319    ) -> Result<()>
320    where
321        T: Into<Option<bool>>;
322
323    fn get_time_zone(&mut self) -> Result<String>;
324
325    fn set_time_zone(&mut self, time_zone: &str) -> Result<()>;
326
327    fn execute_statement<T>(
328        &'a mut self,
329        statement: &str,
330        timeout_ms: T,
331    ) -> Result<Box<dyn 'a + DataSet>>
332    where
333        T: Into<Option<i64>>;
334
335    fn execute_query_statement<T>(
336        &'a mut self,
337        statement: &str,
338        timeout_ms: T,
339    ) -> Result<Box<dyn 'a + DataSet>>
340    where
341        T: Into<Option<i64>>;
342
343    fn insert_record<T>(
344        &mut self,
345        device_id: &str,
346        measurements: Vec<&str>,
347        values: Vec<Value>,
348        timestamp: i64,
349        is_aligned: T,
350    ) -> Result<()>
351    where
352        T: Into<Option<bool>>;
353
354    fn insert_records_of_one_device(
355        &mut self,
356        device_id: &str,
357        timestamps: Vec<i64>,
358        measurements: Vec<Vec<&str>>,
359        values: Vec<Vec<Value>>,
360        sorted: bool,
361    ) -> Result<()>;
362
363    fn insert_records(
364        &mut self,
365        prefix_path: Vec<&str>,
366        measurements: Vec<Vec<&str>>,
367        values: Vec<Vec<Value>>,
368        timestamps: Vec<i64>,
369    ) -> Result<()>;
370
371    fn insert_tablet(&mut self, tablet: &Tablet) -> Result<()>;
372
373    fn insert_tablets(&mut self, tablets: Vec<&Tablet>) -> Result<()>;
374
375    fn execute_batch_statement(&mut self, statemens: Vec<&str>) -> Result<()>;
376
377    fn execute_raw_data_query(
378        &'a mut self,
379        paths: Vec<&str>,
380        start_time: i64,
381        end_time: i64,
382    ) -> Result<Box<dyn 'a + DataSet>>;
383
384    fn execute_update_statement(
385        &'a mut self,
386        statement: &str,
387    ) -> Result<Option<Box<dyn 'a + DataSet>>>;
388}