1pub mod remote;
21mod rpc;
22pub mod common;
23
24use crate::protocal::{TSCompressionType, TSDataType, TSEncoding};
25use std::collections::BTreeMap;
26use std::error::Error;
27
/// Expands to a freshly allocated `Vec<u8>` holding the big-endian bytes of `$value`.
///
/// `$value` may be any expression whose type offers a `to_be_bytes()` method returning
/// a byte array (the primitive integer and float types do).
macro_rules! to_be_vec {
    ($value:expr) => {
        Vec::from($value.to_be_bytes())
    };
}
33
/// Expands to the `String` produced by calling `ToString::to_string` on `$value`.
macro_rules! str {
    ($value:expr) => {
        ::std::string::ToString::to_string(&$value)
    };
}
39
/// Result alias used throughout this module: the error side is a boxed `dyn Error`,
/// so any error type implementing `std::error::Error` can be propagated with `?`.
pub type Result<T> = std::result::Result<T, Box<dyn Error>>;

/// String-to-string map kept sorted by key (it is a `BTreeMap`).
pub type Dictionary = BTreeMap<String, String>;
43
44#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
45pub struct MeasurementSchema {
46 pub measurement: String,
47 pub data_type: TSDataType,
48 pub encoding: TSEncoding,
49 pub compressor: TSCompressionType,
50 pub properties: Option<Dictionary>,
51}
52
53impl MeasurementSchema {
54 pub fn new(
55 measurement: String,
56 data_type: TSDataType,
57 encoding: TSEncoding,
58 compressor: TSCompressionType,
59 properties: Option<Dictionary>,
60 ) -> Self {
61 Self {
62 measurement,
63 data_type,
64 encoding,
65 compressor,
66 properties,
67 }
68 }
69}
70#[derive(Debug, Clone)]
71pub struct Tablet {
72 prefix_path: String,
73 measurement_schemas: Vec<MeasurementSchema>,
74 timestamps: Vec<i64>,
75 columns: Vec<Vec<Value>>,
76 }
78
79impl From<&Tablet> for Vec<u8> {
80 fn from(tablet: &Tablet) -> Vec<u8> {
81 let mut buffer: Vec<u8> =
82 Vec::with_capacity(tablet.get_row_count() * tablet.get_column_count() * 8);
83 tablet.columns.iter().for_each(|column| {
84 column.iter().for_each(|value| match value {
85 Value::Bool(v) => match v {
86 true => buffer.push(1),
87 false => buffer.push(0),
88 },
89 Value::Int32(v) => buffer.append(&mut to_be_vec!(v)),
90 Value::Int64(v) => buffer.append(&mut to_be_vec!(v)),
91 Value::Float(v) => buffer.append(&mut to_be_vec!(v)),
92 Value::Double(v) => buffer.append(&mut to_be_vec!(v)),
93 Value::Text(t) => {
94 let len = t.len() as i32;
95 buffer.append(&mut len.to_be_bytes().to_vec());
96 buffer.append(&mut t.as_bytes().to_vec());
97 }
98 Value::Null => unimplemented!("null value doesn't implemented for tablet"),
99 });
100 });
101 buffer
102 }
103}
104
105impl Tablet {
106 pub fn new(prefix_path: &str, measurement_schemas: Vec<MeasurementSchema>) -> Self {
107 let mut columns: Vec<Vec<Value>> = Vec::new();
108 measurement_schemas
109 .iter()
110 .for_each(|_| columns.push(Vec::new()));
111 Self {
112 prefix_path: prefix_path.to_string(),
113 timestamps: Vec::new(),
114 columns,
115 measurement_schemas,
116 }
117 }
118
119 pub fn sort(&mut self) {
120 let permutation = permutation::sort(&self.timestamps[..]);
121
122 self.timestamps = permutation.apply_slice(&self.timestamps[..]);
123 for i in 0..self.columns.len() {
124 self.columns[i] = permutation.apply_slice(&self.columns[i][..]);
125 }
126 }
127
128 pub fn get_prefix_path(&self) -> String {
129 self.prefix_path.clone()
130 }
131
132 pub fn get_measurement_schemas(&self) -> Vec<MeasurementSchema> {
133 self.measurement_schemas.clone()
134 }
135
136 pub fn add_row(&mut self, row: Vec<Value>, timestamp: i64) -> Result<()> {
137 if row.len() != self.columns.len() {
138 return Err(format!("row values '{:?}' must match columns", row).into());
139 }
140
141 row.iter().for_each(|v| {
142 assert!(
143 *v != Value::Null,
144 "Null values are currently not supported."
145 )
146 });
147
148 self.timestamps.push(timestamp);
149 self.columns
150 .iter_mut()
151 .zip(row.iter())
152 .for_each(|(column, value)| column.push(value.clone()));
153 Ok(())
154 }
155
156 pub fn get_timestamps_at(&self, row_index: usize) -> i64 {
157 assert!(row_index < self.timestamps.len());
158 self.timestamps[row_index]
159 }
160
161 pub fn get_value_at(&self, colum_index: usize, row_index: usize) -> Value {
162 assert!(colum_index < self.columns.len());
163 assert!(row_index < self.timestamps.len());
164 self.columns[colum_index][row_index].clone()
165 }
166
167 pub fn get_row_count(&self) -> usize {
168 self.timestamps.len()
169 }
170
171 pub fn get_column_count(&self) -> usize {
172 self.columns.len()
173 }
174}
175
/// A single cell value: one of the supported scalar kinds, or `Null`.
#[derive(Clone, Debug, PartialEq)]
pub enum Value {
    /// Boolean value.
    Bool(bool),
    /// 32-bit signed integer.
    Int32(i32),
    /// 64-bit signed integer.
    Int64(i64),
    /// 32-bit floating point number.
    Float(f32),
    /// 64-bit floating point number.
    Double(f64),
    /// Text held as an owned UTF-8 string.
    Text(String),
    /// Absent value.
    Null,
}
186
187impl ToString for Value {
188 fn to_string(&self) -> String {
189 match &self {
190 Value::Bool(v) => str!(v),
191 Value::Int32(v) => str!(v),
192 Value::Int64(v) => str!(v),
193 Value::Float(v) => str!(v),
194 Value::Double(v) => str!(v),
195 Value::Text(v) => str!(v),
196 Value::Null => str!("null"),
197 }
198 }
199}
200
201impl From<Vec<u8>> for Value {
202 fn from(mut bytes: Vec<u8>) -> Self {
203 match bytes.remove(0) {
204 0 => Value::Bool(bytes.remove(0) == 1_u8),
205 1 => Value::Int32(i32::from_be_bytes(bytes.try_into().unwrap())),
206 2 => Value::Int64(i64::from_be_bytes(bytes.try_into().unwrap())),
207 3 => Value::Float(f32::from_be_bytes(bytes.try_into().unwrap())),
208 4 => Value::Double(f64::from_be_bytes(bytes.try_into().unwrap())),
209 5 => Value::Text(String::from_utf8(bytes).unwrap()),
210 _ => Value::Null,
211 }
212 }
213}
214
215impl From<&Value> for Vec<u8> {
216 fn from(v: &Value) -> Vec<u8> {
217 match v {
218 Value::Bool(v) => match v {
219 true => vec![TSDataType::Boolean as u8, 1],
220 false => vec![TSDataType::Boolean as u8, 0],
221 },
222 Value::Int32(v) => {
223 let mut buff: Vec<u8> = Vec::with_capacity(4);
224 buff.push(TSDataType::Int32 as u8);
225 buff.append(&mut to_be_vec!(v));
226 buff
227 }
228 Value::Int64(v) => {
229 let mut buff: Vec<u8> = Vec::with_capacity(8);
230 buff.push(TSDataType::Int64 as u8);
231 buff.append(&mut to_be_vec!(v));
232 buff
233 }
234 Value::Float(v) => {
235 let mut buff: Vec<u8> = Vec::with_capacity(4);
236 buff.push(TSDataType::Float as u8);
237 buff.append(&mut to_be_vec!(v));
238 buff
239 }
240 Value::Double(v) => {
241 let mut buff: Vec<u8> = Vec::with_capacity(8);
242 buff.push(TSDataType::Double as u8);
243 buff.append(&mut to_be_vec!(v));
244 buff
245 }
246 Value::Text(t) => {
247 let mut buff: Vec<u8> = Vec::with_capacity(4 + t.len());
248 let len = t.len() as i32;
249 buff.push(TSDataType::Text as u8);
250 buff.append(&mut to_be_vec!(len));
251 buff.append(&mut t.as_bytes().to_vec());
252 buff
253 }
254 Value::Null => vec![],
255 }
256 }
257}
258
259#[derive(Clone, Debug)]
260pub struct RowRecord {
261 pub timestamp: i64,
262 pub values: Vec<Value>,
263}
264pub trait DataSet: Iterator<Item = RowRecord> {
265 fn get_column_names(&self) -> Vec<String>;
266 fn get_data_types(&self) -> Vec<TSDataType>;
267 fn is_ignore_timestamp(&self) -> bool;
268}
269
270pub trait Session<'a> {
271 fn open(&mut self) -> Result<()>;
272
273 fn close(&mut self) -> Result<()>;
274
275 fn set_storage_group(&mut self, storage_group_id: &str) -> Result<()>;
276
277 fn delete_storage_group(&mut self, storage_group_id: &str) -> Result<()>;
278
279 fn delete_storage_groups(&mut self, storage_group_ids: Vec<&str>) -> Result<()>;
280
281 fn create_timeseries<T>(
282 &mut self,
283 path: &str,
284 data_type: TSDataType,
285 encoding: TSEncoding,
286 compressor: TSCompressionType,
287 props: T,
288 attributes: T,
289 tags: T,
290 measurement_alias: Option<String>,
291 ) -> Result<()>
292 where
293 T: Into<Option<Dictionary>>;
294
295 fn create_multi_timeseries<T>(
296 &mut self,
297 paths: Vec<&str>,
298 data_types: Vec<TSDataType>,
299 encodings: Vec<TSEncoding>,
300 compressors: Vec<TSCompressionType>,
301 props_list: T,
302 attributes_list: T,
303 tags_list: T,
304 measurement_alias_list: Option<Vec<String>>,
305 ) -> Result<()>
306 where
307 T: Into<Option<Vec<Dictionary>>>;
308
309 fn delete_timeseries(&mut self, paths: Vec<&str>) -> Result<()>;
310
311 fn delete_data(&mut self, paths: Vec<&str>, start_time: i64, end_time: i64) -> Result<()>;
312
313 fn insert_string_record<T>(
314 &mut self,
315 device_id: &str,
316 measurements: Vec<&str>,
317 values: Vec<&str>,
318 timestamp: i64,
319 is_aligned: T,
320 timeout: i64
321 ) -> Result<()>
322 where
323 T: Into<Option<bool>>;
324
325 fn get_time_zone(&mut self) -> Result<String>;
326
327 fn set_time_zone(&mut self, time_zone: &str) -> Result<()>;
328
329 fn execute_statement<T>(
330 &'a mut self,
331 statement: &str,
332 timeout_ms: T,
333 ) -> Result<Box<dyn 'a + DataSet>>
334 where
335 T: Into<Option<i64>>;
336
337 fn execute_query_statement<T>(
338 &'a mut self,
339 statement: &str,
340 timeout_ms: T,
341 ) -> Result<Box<dyn 'a + DataSet>>
342 where
343 T: Into<Option<i64>>;
344
345 fn insert_record<T>(
346 &mut self,
347 device_id: &str,
348 measurements: Vec<&str>,
349 values: Vec<Value>,
350 timestamp: i64,
351 is_aligned: T,
352 ) -> Result<()>
353 where
354 T: Into<Option<bool>>;
355
356 fn insert_records_of_one_device(
357 &mut self,
358 device_id: &str,
359 timestamps: Vec<i64>,
360 measurements: Vec<Vec<&str>>,
361 values: Vec<Vec<Value>>,
362 sorted: bool,
363 ) -> Result<()>;
364
365 fn insert_records(
366 &mut self,
367 prefix_path: Vec<&str>,
368 measurements: Vec<Vec<&str>>,
369 values: Vec<Vec<Value>>,
370 timestamps: Vec<i64>,
371 ) -> Result<()>;
372
373 fn insert_tablet(&mut self, tablet: &Tablet) -> Result<()>;
374
375 fn insert_tablets(&mut self, tablets: Vec<&Tablet>) -> Result<()>;
376
377 fn execute_batch_statement(&mut self, statemens: Vec<&str>) -> Result<()>;
378
379 fn execute_raw_data_query(
380 &'a mut self,
381 paths: Vec<&str>,
382 start_time: i64,
383 end_time: i64,
384 timeout_ms: i64
385 ) -> Result<Box<dyn 'a + DataSet>>;
386
387 fn execute_update_statement(
388 &'a mut self,
389 statement: &str,
390 ) -> Result<Option<Box<dyn 'a + DataSet>>>;
391}