1pub mod remote;
21mod rpc;
22
23use crate::protocal::{TSCompressionType, TSDataType, TSEncoding};
24use std::collections::BTreeMap;
25use std::error::Error;
26
/// Expands to a `Vec<u8>` holding the big-endian bytes of `$value`.
///
/// `$value` may be any expression (or a reference to one) whose type has a
/// `to_be_bytes()` method returning a byte array.
macro_rules! to_be_vec {
    ($value:expr) => {
        Vec::<u8>::from($value.to_be_bytes())
    };
}
32
/// Expands to the `String` produced by the `ToString` implementation of
/// `$value`. The expression is only borrowed, never moved.
macro_rules! str {
    ($value:expr) => {
        ToString::to_string(&$value)
    };
}
38
/// Result alias used throughout this module: the error side is a boxed,
/// dynamically typed [`Error`].
pub type Result<T> = std::result::Result<T, Box<dyn Error>>;

/// String-to-string map backed by a [`BTreeMap`], so entries iterate in key
/// order.
pub type Dictionary = BTreeMap<String, String>;
42
43#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
44pub struct MeasurementSchema {
45 pub measurement: String,
46 pub data_type: TSDataType,
47 pub encoding: TSEncoding,
48 pub compressor: TSCompressionType,
49 pub properties: Option<Dictionary>,
50}
51
52impl MeasurementSchema {
53 pub fn new(
54 measurement: String,
55 data_type: TSDataType,
56 encoding: TSEncoding,
57 compressor: TSCompressionType,
58 properties: Option<Dictionary>,
59 ) -> Self {
60 Self {
61 measurement,
62 data_type,
63 encoding,
64 compressor,
65 properties,
66 }
67 }
68}
69#[derive(Debug, Clone)]
70pub struct Tablet {
71 prefix_path: String,
72 measurement_schemas: Vec<MeasurementSchema>,
73 timestamps: Vec<i64>,
74 columns: Vec<Vec<Value>>,
75 }
77
78impl From<&Tablet> for Vec<u8> {
79 fn from(tablet: &Tablet) -> Vec<u8> {
80 let mut buffer: Vec<u8> =
81 Vec::with_capacity(tablet.get_row_count() * tablet.get_column_count() * 8);
82 tablet.columns.iter().for_each(|column| {
83 column.iter().for_each(|value| match value {
84 Value::Bool(v) => match v {
85 true => buffer.push(1),
86 false => buffer.push(0),
87 },
88 Value::Int32(v) => buffer.append(&mut to_be_vec!(v)),
89 Value::Int64(v) => buffer.append(&mut to_be_vec!(v)),
90 Value::Float(v) => buffer.append(&mut to_be_vec!(v)),
91 Value::Double(v) => buffer.append(&mut to_be_vec!(v)),
92 Value::Text(t) => {
93 let len = t.len() as i32;
94 buffer.append(&mut len.to_be_bytes().to_vec());
95 buffer.append(&mut t.as_bytes().to_vec());
96 }
97 Value::Null => unimplemented!("null value doesn't implemented for tablet"),
98 });
99 });
100 buffer
101 }
102}
103
104impl Tablet {
105 pub fn new(prefix_path: &str, measurement_schemas: Vec<MeasurementSchema>) -> Self {
106 let mut columns: Vec<Vec<Value>> = Vec::new();
107 measurement_schemas
108 .iter()
109 .for_each(|_| columns.push(Vec::new()));
110 Self {
111 prefix_path: prefix_path.to_string(),
112 timestamps: Vec::new(),
113 columns,
114 measurement_schemas,
115 }
116 }
117
118 pub fn sort(&mut self) {
119 let permutation = permutation::sort(&self.timestamps[..]);
120
121 self.timestamps = permutation.apply_slice(&self.timestamps[..]);
122 for i in 0..self.columns.len() {
123 self.columns[i] = permutation.apply_slice(&self.columns[i][..]);
124 }
125 }
126
127 pub fn get_prefix_path(&self) -> String {
128 self.prefix_path.clone()
129 }
130
131 pub fn get_measurement_schemas(&self) -> Vec<MeasurementSchema> {
132 self.measurement_schemas.clone()
133 }
134
135 pub fn add_row(&mut self, row: Vec<Value>, timestamp: i64) -> Result<()> {
136 if row.len() != self.columns.len() {
137 return Err(format!("row values '{:?}' must match columns", row).into());
138 }
139
140 row.iter().for_each(|v| {
141 assert!(
142 *v != Value::Null,
143 "Null values are currently not supported."
144 )
145 });
146
147 self.timestamps.push(timestamp);
148 self.columns
149 .iter_mut()
150 .zip(row.iter())
151 .for_each(|(column, value)| column.push(value.clone()));
152 Ok(())
153 }
154
155 pub fn get_timestamps_at(&self, row_index: usize) -> i64 {
156 assert!(row_index < self.timestamps.len());
157 self.timestamps[row_index]
158 }
159
160 pub fn get_value_at(&self, colum_index: usize, row_index: usize) -> Value {
161 assert!(colum_index < self.columns.len());
162 assert!(row_index < self.timestamps.len());
163 self.columns[colum_index][row_index].clone()
164 }
165
166 pub fn get_row_count(&self) -> usize {
167 self.timestamps.len()
168 }
169
170 pub fn get_column_count(&self) -> usize {
171 self.columns.len()
172 }
173}
174
/// A single dynamically typed value.
#[derive(Debug, Clone, PartialEq)]
pub enum Value {
    /// Boolean value.
    Bool(bool),
    /// 32-bit signed integer.
    Int32(i32),
    /// 64-bit signed integer.
    Int64(i64),
    /// 32-bit floating point number.
    Float(f32),
    /// 64-bit floating point number.
    Double(f64),
    /// UTF-8 text.
    Text(String),
    /// Absence of a value.
    Null,
}

/// Formats the payload with its own `Display` implementation; `Null` is
/// rendered as `null`.
///
/// `Display` is implemented instead of `ToString` so that `to_string()`
/// remains available through the standard blanket implementation while
/// values can also be used directly in `format!`-style macros.
impl std::fmt::Display for Value {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Value::Bool(v) => std::fmt::Display::fmt(v, f),
            Value::Int32(v) => std::fmt::Display::fmt(v, f),
            Value::Int64(v) => std::fmt::Display::fmt(v, f),
            Value::Float(v) => std::fmt::Display::fmt(v, f),
            Value::Double(v) => std::fmt::Display::fmt(v, f),
            Value::Text(v) => std::fmt::Display::fmt(v, f),
            Value::Null => f.write_str("null"),
        }
    }
}

/// Decodes a value from `[tag, payload...]`.
///
/// | tag | variant  | payload                                   |
/// |-----|----------|-------------------------------------------|
/// | 0   | `Bool`   | first byte; `1` means `true`              |
/// | 1   | `Int32`  | exactly 4 big-endian bytes                |
/// | 2   | `Int64`  | exactly 8 big-endian bytes                |
/// | 3   | `Float`  | exactly 4 big-endian bytes                |
/// | 4   | `Double` | exactly 8 big-endian bytes                |
/// | 5   | `Text`   | all remaining bytes, interpreted as UTF-8 |
///
/// Any other tag, and an empty input, decode to `Value::Null`. Empty input
/// is accepted because the `From<&Value> for Vec<u8>` encoder in this file
/// writes `Null` as an empty vector.
///
/// NOTE(review): the text payload is read without a length prefix, whereas
/// the encoder writes a 4-byte length before the text, so the two
/// conversions are not inverses for `Text`. The tag numbers presumably
/// mirror the `TSDataType` discriminants; confirm against that enum.
///
/// # Panics
///
/// Panics when the payload of a known tag is malformed: missing boolean
/// byte, wrong width for a numeric type, or invalid UTF-8 text.
impl From<Vec<u8>> for Value {
    fn from(bytes: Vec<u8>) -> Self {
        // Copies `payload` into an `N`-byte array; the width must match exactly.
        fn fixed<const N: usize>(payload: &[u8]) -> [u8; N] {
            assert_eq!(
                payload.len(),
                N,
                "value payload must be exactly {} bytes, got {}",
                N,
                payload.len()
            );
            let mut raw = [0u8; N];
            raw.copy_from_slice(payload);
            raw
        }

        let (tag, payload) = match bytes.split_first() {
            Some((tag, payload)) => (*tag, payload),
            // An encoded `Null` is empty, so empty input decodes to `Null`.
            None => return Value::Null,
        };

        match tag {
            0 => {
                let byte = payload
                    .first()
                    .expect("boolean value is missing its payload byte");
                Value::Bool(*byte == 1_u8)
            }
            1 => Value::Int32(i32::from_be_bytes(fixed(payload))),
            2 => Value::Int64(i64::from_be_bytes(fixed(payload))),
            3 => Value::Float(f32::from_be_bytes(fixed(payload))),
            4 => Value::Double(f64::from_be_bytes(fixed(payload))),
            5 => Value::Text(
                String::from_utf8(payload.to_vec()).expect("text value payload is not valid UTF-8"),
            ),
            _ => Value::Null,
        }
    }
}
213
214impl From<&Value> for Vec<u8> {
215 fn from(v: &Value) -> Vec<u8> {
216 match v {
217 Value::Bool(v) => match v {
218 true => vec![TSDataType::Boolean as u8, 1],
219 false => vec![TSDataType::Boolean as u8, 0],
220 },
221 Value::Int32(v) => {
222 let mut buff: Vec<u8> = Vec::with_capacity(4);
223 buff.push(TSDataType::Int32 as u8);
224 buff.append(&mut to_be_vec!(v));
225 buff
226 }
227 Value::Int64(v) => {
228 let mut buff: Vec<u8> = Vec::with_capacity(8);
229 buff.push(TSDataType::Int64 as u8);
230 buff.append(&mut to_be_vec!(v));
231 buff
232 }
233 Value::Float(v) => {
234 let mut buff: Vec<u8> = Vec::with_capacity(4);
235 buff.push(TSDataType::Float as u8);
236 buff.append(&mut to_be_vec!(v));
237 buff
238 }
239 Value::Double(v) => {
240 let mut buff: Vec<u8> = Vec::with_capacity(8);
241 buff.push(TSDataType::Double as u8);
242 buff.append(&mut to_be_vec!(v));
243 buff
244 }
245 Value::Text(t) => {
246 let mut buff: Vec<u8> = Vec::with_capacity(4 + t.len());
247 let len = t.len() as i32;
248 buff.push(TSDataType::Text as u8);
249 buff.append(&mut to_be_vec!(len));
250 buff.append(&mut t.as_bytes().to_vec());
251 buff
252 }
253 Value::Null => vec![],
254 }
255 }
256}
257
258#[derive(Clone, Debug)]
259pub struct RowRecord {
260 pub timestamp: i64,
261 pub values: Vec<Value>,
262}
263pub trait DataSet: Iterator<Item = RowRecord> {
264 fn get_column_names(&self) -> Vec<String>;
265 fn get_data_types(&self) -> Vec<TSDataType>;
266 fn is_ignore_timestamp(&self) -> bool;
267}
268
269pub trait Session<'a> {
270 fn open(&mut self) -> Result<()>;
271
272 fn close(&mut self) -> Result<()>;
273
274 fn set_storage_group(&mut self, storage_group_id: &str) -> Result<()>;
275
276 fn delete_storage_group(&mut self, storage_group_id: &str) -> Result<()>;
277
278 fn delete_storage_groups(&mut self, storage_group_ids: Vec<&str>) -> Result<()>;
279
280 fn create_timeseries<T>(
281 &mut self,
282 path: &str,
283 data_type: TSDataType,
284 encoding: TSEncoding,
285 compressor: TSCompressionType,
286 props: T,
287 attributes: T,
288 tags: T,
289 measurement_alias: Option<String>,
290 ) -> Result<()>
291 where
292 T: Into<Option<Dictionary>>;
293
294 fn create_multi_timeseries<T>(
295 &mut self,
296 paths: Vec<&str>,
297 data_types: Vec<TSDataType>,
298 encodings: Vec<TSEncoding>,
299 compressors: Vec<TSCompressionType>,
300 props_list: T,
301 attributes_list: T,
302 tags_list: T,
303 measurement_alias_list: Option<Vec<String>>,
304 ) -> Result<()>
305 where
306 T: Into<Option<Vec<Dictionary>>>;
307
308 fn delete_timeseries(&mut self, paths: Vec<&str>) -> Result<()>;
309
310 fn delete_data(&mut self, paths: Vec<&str>, start_time: i64, end_time: i64) -> Result<()>;
311
312 fn insert_string_record<T>(
313 &mut self,
314 device_id: &str,
315 measurements: Vec<&str>,
316 values: Vec<&str>,
317 timestamp: i64,
318 is_aligned: T,
319 ) -> Result<()>
320 where
321 T: Into<Option<bool>>;
322
323 fn get_time_zone(&mut self) -> Result<String>;
324
325 fn set_time_zone(&mut self, time_zone: &str) -> Result<()>;
326
327 fn execute_statement<T>(
328 &'a mut self,
329 statement: &str,
330 timeout_ms: T,
331 ) -> Result<Box<dyn 'a + DataSet>>
332 where
333 T: Into<Option<i64>>;
334
335 fn execute_query_statement<T>(
336 &'a mut self,
337 statement: &str,
338 timeout_ms: T,
339 ) -> Result<Box<dyn 'a + DataSet>>
340 where
341 T: Into<Option<i64>>;
342
343 fn insert_record<T>(
344 &mut self,
345 device_id: &str,
346 measurements: Vec<&str>,
347 values: Vec<Value>,
348 timestamp: i64,
349 is_aligned: T,
350 ) -> Result<()>
351 where
352 T: Into<Option<bool>>;
353
354 fn insert_records_of_one_device(
355 &mut self,
356 device_id: &str,
357 timestamps: Vec<i64>,
358 measurements: Vec<Vec<&str>>,
359 values: Vec<Vec<Value>>,
360 sorted: bool,
361 ) -> Result<()>;
362
363 fn insert_records(
364 &mut self,
365 prefix_path: Vec<&str>,
366 measurements: Vec<Vec<&str>>,
367 values: Vec<Vec<Value>>,
368 timestamps: Vec<i64>,
369 ) -> Result<()>;
370
371 fn insert_tablet(&mut self, tablet: &Tablet) -> Result<()>;
372
373 fn insert_tablets(&mut self, tablets: Vec<&Tablet>) -> Result<()>;
374
375 fn execute_batch_statement(&mut self, statemens: Vec<&str>) -> Result<()>;
376
377 fn execute_raw_data_query(
378 &'a mut self,
379 paths: Vec<&str>,
380 start_time: i64,
381 end_time: i64,
382 ) -> Result<Box<dyn 'a + DataSet>>;
383
384 fn execute_update_statement(
385 &'a mut self,
386 statement: &str,
387 ) -> Result<Option<Box<dyn 'a + DataSet>>>;
388}