Skip to main content

rusty_tarantool/tarantool/
packets.rs

1#![allow(non_camel_case_types)]
2use std::io;
3use std::io::Cursor;
4use std::str;
5
6use crate::tarantool::tools;
7use rmpv::Value;
8use serde::{Deserialize, Serialize};
9
10use bytes::Bytes;
11use std::collections::HashMap;
12
13/// tarantool auth packet
14#[derive(Debug, Clone)]
15pub struct AuthPacket {
16    pub login: String,
17    pub password: String,
18}
19
20/// tarantool packet intended for serialize and cross thread send
21#[derive(Debug, Clone)]
22pub struct CommandPacket {
23    pub code: Code,
24    pub internal_fields: Vec<(Key, Value)>,
25    pub command_field: Vec<(Key, Vec<u8>)>,
26}
27
28/// Tarantool request enum (auth or ordinary packet)
29#[derive(Debug, Clone)]
30pub enum TarantoolRequest {
31    Auth(AuthPacket),
32    Command(CommandPacket),
33}
34
35/// Tarantool response struct
36///
37/// use any decode method to decode tarantool response to custom struct by serde
38/// please look examples
39/// https://github.com/zheludkovm/RustyTarantool/tree/master/examples
40///
41#[derive(Debug)]
42pub struct TarantoolResponse {
43    pub code: u64,
44    pub data: Bytes,
45    pub sql_metadata: Option<Bytes>,
46    pub sql_info: Option<Bytes>,
47}
48
49pub struct TarantoolSqlResponse {
50   response: TarantoolResponse,
51}
52
53pub type UntypedRow = Vec<Value>;
54
55#[derive(Debug, PartialEq)]
56pub enum SqlMetaType {
57    boolean,
58    integer,
59 	unsigned,
60 	number,
61 	string,
62 	varbinary,
63 	scalar,
64    unknown(String)
65}
66
67impl<'de> Deserialize<'de> for SqlMetaType {
68    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
69        where D: serde::de::Deserializer<'de>
70    {
71        let s = String::deserialize(deserializer)?;
72        Ok(match s.as_str() {
73            "boolean" => SqlMetaType::boolean,
74            "integer" => SqlMetaType::integer,
75            "unsigned" => SqlMetaType::unsigned,
76            "number" => SqlMetaType::number,
77            "string" => SqlMetaType::string,
78            "varbinary" => SqlMetaType::varbinary,
79            "scalar" => SqlMetaType::scalar,
80
81            v => SqlMetaType::unknown(String::from( v))
82        })
83    }
84}
85
86#[derive(Debug, Deserialize, PartialEq)]
87pub struct SqlResultMetadataFieldInfo{
88    name:String,
89    sql_type:SqlMetaType
90}
91
92#[derive(Debug, Deserialize, PartialEq)]
93pub struct SqlResultMetadata {
94    pub fields : Option<Vec<SqlResultMetadataFieldInfo>>,
95    pub row_count: Option<u64>,
96    pub auto_increment_ids: Option<Vec<u64>>,
97}
98
99#[derive(Debug, Clone)]
100pub enum Code {
101    SELECT = 0x01,
102    INSERT = 0x02,
103    REPLACE = 0x03,
104    UPDATE = 0x04,
105    DELETE = 0x05,
106    OLD_CALL = 0x06,
107    AUTH = 0x07,
108    EVAL = 0x08,
109    UPSERT = 0x09,
110    CALL = 0x0a,
111    PING = 0x040,
112    SUBSCRIBE = 0x066,
113    EXECUTE = 0x0b,
114    // PREPARE = 0x0d,
115}
116
117#[derive(Debug, Clone)]
118pub enum SqlInfo {
119    SQL_INFO_ROW_COUNT = 0x00,
120    SQL_INFO_AUTO_INCREMENT_IDS = 0x01,
121}
122
123#[derive(Debug, Copy, Clone)]
124pub enum Key {
125    //header
126    CODE = 0x00,
127    SYNC = 0x01,
128    SCHEMA_ID = 0x05,
129
130    //body
131    SPACE = 0x10,
132    INDEX = 0x11,
133    LIMIT = 0x12,
134    OFFSET = 0x13,
135    ITERATOR = 0x14,
136    KEY = 0x20,
137    TUPLE = 0x21,
138    FUNCTION = 0x22,
139    USER_NAME = 0x23,
140    EXPRESSION = 0x27,
141    UPSERT_OPS = 0x28,
142    DATA = 0x30,
143    ERROR = 0x31,
144    METADATA = 0x32,
145    SQL_INFO = 0x42,
146
147    STMT_ID = 0x43,
148    SQL_TEXT = 0x40,
149    SQL_BIND = 0x41,
150    OPTIONS = 0x2b,
151}
152
153impl TarantoolResponse {
154    pub fn new_short_response(code: u64, data: Bytes) -> TarantoolResponse {
155        TarantoolResponse { code, data, sql_info: None, sql_metadata: None }
156    }
157
158    pub fn new_full_response(code: u64, data: Bytes, sql_metadata: Option<Bytes>, sql_info: Option<Bytes>) -> TarantoolResponse {
159        TarantoolResponse { code, data, sql_info, sql_metadata }
160    }
161
162    /// decode tarantool response to any serder deserializable struct
163    pub fn decode<'de, T>(self) -> io::Result<T>
164    where
165        T: Deserialize<'de>,
166    {
167        tools::decode_serde(Cursor::new(self.data))
168    }
169
170    /// decode tarantool response to any serder deserializable struct
171    pub fn decode_result_set<'de, T>(self) -> io::Result<Vec<T>>
172        where
173            T: Deserialize<'de>,
174    {
175        tools::decode_serde(Cursor::new(self.data))
176    }
177
178    /// decode tarantool response to tuple wih one element and return this element
179    pub fn decode_single<'de, T>(self) -> io::Result<T>
180    where
181        T: Deserialize<'de>,
182    {
183        let (res,) = tools::decode_serde(Cursor::new(self.data))?;
184        Ok(res)
185    }
186
187    /// decode tarantool response to tuple of two elements
188    pub fn decode_pair<'de, T1, T2>(self) -> io::Result<(T1, T2)>
189    where
190        T1: Deserialize<'de>,
191        T2: Deserialize<'de>,
192    {
193        Ok(tools::decode_serde(Cursor::new(self.data))?)
194    }
195
196    ///decode tarantool response to three elements
197    pub fn decode_trio<'de, T1, T2, T3>(self) -> io::Result<(T1, T2, T3)>
198    where
199        T1: Deserialize<'de>,
200        T2: Deserialize<'de>,
201        T3: Deserialize<'de>,
202    {
203        let (r1, r2, r3) = tools::decode_serde(Cursor::new(self.data))?;
204        Ok((r1, r2, r3))
205    }
206}
207
208impl Into<TarantoolSqlResponse> for TarantoolResponse {
209    fn into(self) -> TarantoolSqlResponse {
210        TarantoolSqlResponse{ response:self}
211    }
212}
213
214
215
216impl  TarantoolSqlResponse {
217    /// decode tarantool response to any serder deserializable struct
218    pub fn decode_result_set<'de, T>(self) -> io::Result<Vec<T>>
219        where
220            T: Deserialize<'de>,
221    {
222        tools::decode_serde(Cursor::new(self.response.data))
223    }
224
225    ///decode rows to vec of columns
226    pub fn decode_untyped_result_set(self) -> io::Result<Vec<UntypedRow>> {
227        tools::decode_serde(Cursor::new(self.response.data))
228    }
229
230    ///result set metadata
231    pub fn metadata(&self) -> SqlResultMetadata {
232        let sql_info : Option<HashMap<u8, Value>> = tools::decode_serde_optional(&self.response.sql_info);
233        println!("sql_info={:?}", sql_info);
234        let sql_info_fields = sql_info
235            .map(|mut info| {
236                (info.remove(&(SqlInfo::SQL_INFO_ROW_COUNT as u8)).and_then(|v|v.as_u64()),
237                 info.remove(&(SqlInfo::SQL_INFO_AUTO_INCREMENT_IDS as u8))
238                     .and_then(|val|val.as_array().map(|arr|{
239                         arr.iter().flat_map(|e|e.as_u64()).collect()
240                     }))
241                )
242            }).unwrap_or((None, None));
243
244        let fields : Option<Vec<SqlResultMetadataFieldInfo>> = tools::decode_serde_optional( &self.response.sql_metadata);
245        SqlResultMetadata {
246            fields,
247            row_count:sql_info_fields.0,
248            auto_increment_ids:sql_info_fields.1
249        }
250    }
251
252}
253
254
255
256impl CommandPacket {
257    pub fn call<T>(function: &str, params: &T) -> io::Result<CommandPacket>
258    where
259        T: Serialize,
260    {
261        CommandPacket::call_raw(function, tools::serialize_to_vec_u8(params)?)
262    }
263
264    pub fn call_raw(function: &str, params: Vec<u8>) -> io::Result<CommandPacket>
265    {
266        Ok(CommandPacket {
267            code: Code::CALL,
268            internal_fields: vec![(Key::FUNCTION, Value::from(function))],
269            command_field: vec![(Key::TUPLE, params)],
270        })
271    }
272
273    pub fn select<T>(
274        space: i32,
275        index: i32,
276        key: &T,
277        offset: i32,
278        limit: i32,
279        iterator: i32,
280    ) -> io::Result<CommandPacket>
281    where
282        T: Serialize,
283    {
284        Ok(CommandPacket {
285            code: Code::SELECT,
286            internal_fields: vec![
287                (Key::SPACE, Value::from(space)),
288                (Key::INDEX, Value::from(index)),
289                (Key::ITERATOR, Value::from(iterator)),
290                (Key::LIMIT, Value::from(limit)),
291                (Key::OFFSET, Value::from(offset)),
292            ],
293            command_field: vec![(Key::KEY, tools::serialize_to_vec_u8(key)?)],
294        })
295    }
296
297    pub fn insert<T>(space: i32, tuple: &T) -> io::Result<CommandPacket>
298    where
299        T: Serialize,
300    {
301        Ok(CommandPacket {
302            code: Code::INSERT,
303            internal_fields: vec![(Key::SPACE, Value::from(space))],
304            command_field: vec![(Key::TUPLE, tools::serialize_to_vec_u8(tuple)?)],
305        })
306    }
307
308    pub fn replace<T>(space: i32, tuple: &T) -> io::Result<CommandPacket>
309    where
310        T: Serialize,
311    {
312        Ok(CommandPacket {
313            code: Code::REPLACE,
314            internal_fields: vec![(Key::SPACE, Value::from(space))],
315            command_field: vec![(Key::TUPLE, tools::serialize_to_vec_u8(tuple)?)],
316        })
317    }
318
319    pub fn replace_raw(space: i32, tuple_raw: Vec<u8>) -> io::Result<CommandPacket> {
320        Ok(CommandPacket {
321            code: Code::REPLACE,
322            internal_fields: vec![(Key::SPACE, Value::from(space))],
323            command_field: vec![(Key::TUPLE, tuple_raw)],
324        })
325    }
326
327    pub fn update<T, T2>(space: i32, key: &T2, args: &T) -> io::Result<CommandPacket>
328    where
329        T: Serialize,
330        T2: Serialize,
331    {
332        Ok(CommandPacket {
333            code: Code::UPDATE,
334            internal_fields: vec![(Key::SPACE, Value::from(space))],
335            command_field: vec![
336                (Key::KEY, tools::serialize_to_vec_u8(key)?),
337                (Key::TUPLE, tools::serialize_to_vec_u8(args)?),
338            ],
339        })
340    }
341
342    pub fn upsert<T, T2, T3>(space: i32, key: &T2, def: &T3, args: &T) -> io::Result<CommandPacket>
343    where
344        T: Serialize,
345        T2: Serialize,
346        T3: Serialize,
347    {
348        Ok(CommandPacket {
349            code: Code::UPSERT,
350            internal_fields: vec![(Key::SPACE, Value::from(space))],
351            command_field: vec![
352                (Key::KEY, tools::serialize_to_vec_u8(key)?),
353                (Key::TUPLE, tools::serialize_to_vec_u8(def)?),
354                (Key::UPSERT_OPS, tools::serialize_to_vec_u8(args)?),
355            ],
356        })
357    }
358
359    pub fn delete<T>(space: i32, key: &T) -> io::Result<CommandPacket>
360    where
361        T: Serialize,
362    {
363        Ok(CommandPacket {
364            code: Code::DELETE,
365            internal_fields: vec![(Key::SPACE, Value::from(space))],
366            command_field: vec![(Key::KEY, tools::serialize_to_vec_u8(key)?)],
367        })
368    }
369
370    pub fn eval<T>(expression: String, args: &T) -> io::Result<CommandPacket>
371    where
372        T: Serialize,
373    {
374        Ok(CommandPacket {
375            code: Code::EVAL,
376            internal_fields: vec![(Key::EXPRESSION, Value::from(expression))],
377            command_field: vec![(Key::TUPLE, tools::serialize_to_vec_u8(args)?)],
378        })
379    }
380
381    pub fn ping() -> io::Result<CommandPacket> {
382        Ok(CommandPacket {
383            code: Code::PING,
384            internal_fields: vec![],
385            command_field: vec![],
386        })
387    }
388
389    pub fn exec_sql<T>(sql: &str, args: &T) -> io::Result<CommandPacket>
390        where
391            T: Serialize,
392    {
393        CommandPacket::exec_sql_raw(sql, tools::serialize_to_vec_u8(args)?)
394    }
395
396    pub fn exec_sql_raw(sql: &str, args_raw: Vec<u8>) -> io::Result<CommandPacket>
397    {
398        Ok(CommandPacket {
399            code: Code::EXECUTE,
400            internal_fields: vec![(Key::SQL_TEXT, Value::from(sql))],
401            command_field: vec![
402                (Key::SQL_BIND, args_raw),
403                (Key::OPTIONS, tools::serialize_to_vec_u8(&())?),
404            ],
405        })
406    }
407
408    // pub fn prepare_stmt(sql: String) -> io::Result<CommandPacket>
409    // {
410    //     Ok(CommandPacket {
411    //         code: Code::PREPARE,
412    //         internal_fields: vec![(Key::SQL_TEXT, Value::from(sql))],
413    //         command_field: vec![],
414    //     })
415    // }
416}