rsq/
kdb.rs

1use std::net::TcpStream;
2use std::io::{BufReader, BufWriter};
3use std::io::prelude::*;
4use std::io::{Error, Write};
5use byteorder::{LittleEndian, WriteBytesExt};
6use crate::KObj;
7use super::header::Header;
8use super::ktype::KType;
9
/// Message type codes that [`Kdb::read`] refuses to decode.
///
/// When an incoming message carries one of these codes, `read` discards the
/// rest of the message and returns `KObj::Error("type unsupported by rsq")`.
// NOTE(review): presumably these are the kdb+ function types (lambda,
// projection, composition, ...) — confirm against the kdb+ IPC documentation.
const UNSUPPORTED_TYPES: [i8; 11] = [100, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112];
11
/// Connection handle for a kdb+ process reachable over TCP.
///
/// Constructing the handle does not open a socket: `reader` and `writer` stay
/// `None` until [`Kdb::open`] succeeds, and [`Kdb::close`] resets them to
/// `None` again.
pub struct Kdb {
    /// Host name or address used when connecting.
    host: String,
    /// TCP port used when connecting.
    port: u16,
    /// User name sent in the login handshake.
    user: String,
    /// Password sent in the login handshake.
    pass: String,
    /// Buffered read half of the connection (built from a clone of the socket).
    reader: Option<BufReader<TcpStream>>,
    /// Buffered write half of the connection.
    writer: Option<BufWriter<TcpStream>>,
}
20
21impl Kdb {
22
23    pub fn new(host: &str, port: u16, user: &str, pass: &str) -> Kdb {
24        Kdb {
25            host: host.to_string(),
26            port,
27            user: user.to_string(),
28            pass: pass.to_string(),
29            reader: None,
30            writer: None
31        }
32    }
33
34    pub fn open(&mut self) -> Result<(),Error> {
35        let mut stream = TcpStream::connect(format!("{}:{}",self.host,self.port))?;
36        let response = format!("{}:{}{}",self.user, self.pass, "\x06\x00");
37        stream.write(response.as_bytes())?;
38        stream.read_exact(&mut [0; 1])?;
39        self.reader = Some(BufReader::new(stream.try_clone()?));
40        self.writer = Some(BufWriter::new(stream));
41        Ok(())
42    }
43
44    pub fn reader(&mut self) -> &mut BufReader<TcpStream> {
45        self.reader.as_mut().unwrap()
46    }
47
48    pub fn writer(&mut self) -> &mut BufWriter<TcpStream> {
49        self.writer.as_mut().unwrap()
50    }
51
52    pub fn close(&mut self) -> Result<(), Error> {
53        self.reader = None;
54        self.writer = None;
55        Ok(())
56    }
57
58    pub fn send_async(&mut self, data: &KObj) -> Result<(), Error> {
59        if self.writer.is_none() {
60            self.open()?;
61        };
62        let header_bytes = vec![1, 0, 0, 0];
63        let mut data_bytes = data.serialize();
64        let type_bytes = vec![data.type_as_bytes()];
65        let mut size_bytes = vec![];
66        size_bytes.write_i32::<LittleEndian>((4 + header_bytes.len() + data_bytes.len() + type_bytes.len()) as i32)?;
67        data_bytes.splice(0..0, type_bytes);
68        data_bytes.splice(0..0, size_bytes);
69        data_bytes.splice(0..0, header_bytes);
70        let writer = self.writer();
71        writer.write(&data_bytes)?;
72        Ok(())
73    }
74
75    pub fn read(&mut self) -> KObj {
76        if self.reader.is_none() {
77            match self.open() {
78                Ok(_) => {},
79                Err(e) => {return KObj::Error(format!("{}",e))}
80            };
81        };
82        let msg_header = Header::read(self);
83        let reader = self.reader();
84
85
86        let mut msg_type = [0;1];
87        reader.read_exact(&mut msg_type).unwrap();
88        let msg_type = i8::from_le_bytes(msg_type);
89
90        if UNSUPPORTED_TYPES.contains(&msg_type){
91            // clear the buffer and return error
92            reader.read_exact(&mut vec![0;(msg_header.length - 8) as usize]).unwrap();
93            return KObj::Error(String::from("type unsupported by rsq"))
94        };
95        let data = self.read_data(msg_type);
96
97        if msg_header.protocol == 1 {
98            self.send_response(&KObj::Atom(KType::Boolean(true))).unwrap();
99        };
100
101        data
102
103    }
104
105    fn extract_atom(&mut self, len: usize) -> Vec<u8> {
106        let mut vec = vec![0;len];
107        self.reader().read_exact(&mut vec).unwrap();
108        vec
109    }
110
111    fn extract_string(&mut self) -> Vec<u8> {
112        let stream = self.reader();
113        stream.read_exact(&mut [0;1]).unwrap(); // discard attribute
114
115        let mut len = [0;4];
116        stream.read_exact(&mut len).unwrap();
117        let len = u32::from_le_bytes(len) as usize;
118
119        let mut string = vec![0;len];
120        stream.read_exact(&mut string).unwrap();
121        string
122    }
123
124    fn extract_sym(&mut self) -> Vec<u8> {
125        let stream = self.reader();
126        let mut sym = vec![];
127        let mut bit = [1;1];
128        loop {
129            stream.read_exact(&mut bit).unwrap();
130            if bit[0] == 0 { break };
131            sym.push(bit[0]);
132        }
133        sym
134    }
135
136    fn read_atom(&mut self, ktype: KType) -> KObj {
137        let vec_data = match ktype {
138            KType::Boolean(_)   => self.extract_atom(1),
139            KType::Guid(_)      => self.extract_atom(16),
140            KType::Byte(_)      => self.extract_atom(1),
141            KType::Short(_)     => self.extract_atom(2),
142            KType::Int(_)       => self.extract_atom(4),
143            KType::Long(_)      => self.extract_atom(8),
144            KType::Real(_)      => self.extract_atom(4),
145            KType::Float(_)     => self.extract_atom(8),
146            KType::Char(_)      => self.extract_atom(1),
147            KType::String(_)    => self.extract_string(),
148            KType::Symbol(_)    => self.extract_sym(),
149            KType::Timestamp(_) => self.extract_atom(8),
150            KType::Month(_)     => self.extract_atom(4),
151            KType::Date(_)      => self.extract_atom(4),
152            KType::Datetime(_)  => self.extract_atom(8),
153            KType::Timespan(_)  => self.extract_atom(8),
154            KType::Minute(_)    => self.extract_atom(4),
155            KType::Second(_)    => self.extract_atom(4),
156            KType::Time(_)      => self.extract_atom(4),
157            KType::Unary(_)     => self.extract_atom(1),
158            KType::Operator(_)  => self.extract_atom(1),
159        };
160        KObj::Atom(ktype).deserialize(&vec_data)
161    }
162
163    fn read_uniform_list(&mut self, msg_type: i8, len: u32) -> KObj {
164        let mut list = vec![];
165        for _ in 0..len {
166            let data = self.read_data(-1 * msg_type);
167            list.push(data);
168        };  
169        KObj::List(list)
170    }
171
172    fn read_generic_list(&mut self, len:u32) -> KObj {
173        let mut list = vec![];
174        for _ in 0..len{
175            let mut msg_type = [0;1];
176            self.reader().read_exact(&mut msg_type).unwrap();
177            let msg_code = i8::from_le_bytes(msg_type);
178            list.push(self.read_data(msg_code));
179        };  
180        KObj::List(list)
181    }  
182
183    fn read_list(&mut self, msg_type: i8) -> KObj {
184        let mut attr = [0;1];
185        self.reader().read_exact(&mut attr).unwrap(); // throw away attribute for now
186        let mut len = [0;4];                     // extract vector length
187        self.reader().read_exact(&mut len).unwrap();
188        let len = u32::from_le_bytes(len);
189        if msg_type == 0 {
190            self.read_generic_list(len)
191        } else {
192            self.read_uniform_list(msg_type, len)
193        }
194    }
195
196    fn read_dict(&mut self) -> KObj {
197
198        let mut key_type = [0;1];
199        self.reader().read_exact(&mut key_type).unwrap();
200        let key_type = i8::from_le_bytes(key_type);
201
202        let keys = self.read_data(key_type);
203
204        let mut val_type = [0;1];
205        self.reader().read_exact(&mut val_type).unwrap();
206        let val_type = i8::from_le_bytes(val_type);
207        let vals = self.read_data(val_type);
208
209        let keys: Vec<KObj> = match keys {
210             KObj::List(k) => k,
211            _ => return KObj::Error("keys of dictionary must be a list".to_string()) // this shouldn't happen
212        };
213
214        let vals = match vals {
215            KObj::List(k) => k,
216           _ => return KObj::Error("keys of dictionary must be a list".to_string()) // this shouldn't happen
217        };
218        
219        KObj::Dict(keys, vals)
220        
221    }
222
223    fn read_table(&mut self) -> KObj {
224
225        let mut key_type = [0;1];
226        self.reader().read_exact(&mut key_type).unwrap();
227        let key_type = i8::from_le_bytes(key_type);
228
229        let keys = self.read_data(key_type);
230
231        let mut val_type = [0;1];
232        self.reader().read_exact(&mut val_type).unwrap();
233        let val_type = i8::from_le_bytes(val_type);
234        let vals = self.read_data(val_type);
235
236        let keys: Vec<KObj> = match keys {
237             KObj::List(k) => k,
238            _ => return KObj::Error("keys of dictionary must be a list".to_string()) // this shouldn't happen
239        };
240
241        let vals = match vals {
242            KObj::List(k) => k,
243           _ => return KObj::Error("keys of dictionary must be a list".to_string()) // this shouldn't happen
244        };
245        
246        KObj::Table(keys, vals)
247     
248    }
249
250    fn read_error(&mut self) -> KObj {
251        let error_msg = self.extract_sym();
252        KObj::Error(String::from_utf8(error_msg.to_vec()).unwrap())
253    }
254
255    fn read_data(&mut self, msg_type: i8) -> KObj {
256        let mut kobj = KObj::new(msg_type);
257        kobj = match kobj {
258            KObj::Atom(k) => self.read_atom(k),
259            KObj::List(_) => self.read_list(msg_type),
260            KObj::GenericList(_) => self.read_list(msg_type),
261            KObj::Dict(_,_) => self.read_dict(),
262            KObj::Table(_,_) => {
263                self.reader().read_exact(&mut[0;2]).unwrap();
264                self.read_table()
265            },
266            KObj::Error(_) => {
267                self.read_error()
268            }
269        };
270        kobj
271    }
272
273    pub fn send_sync(&mut self, data: &KObj) -> Result<KObj, Error> {
274        if self.writer.is_none() {
275            self.open()?;
276        };
277        let header_bytes = vec![1, 1, 0, 0];
278        let mut data_bytes = data.serialize();
279        let type_bytes = vec![data.type_as_bytes()];
280        let mut size_bytes = vec![];
281        size_bytes.write_i32::<LittleEndian>((4 + header_bytes.len() + data_bytes.len() + type_bytes.len()) as i32).unwrap();
282        data_bytes.splice(0..0, type_bytes);
283        data_bytes.splice(0..0, size_bytes);
284        data_bytes.splice(0..0, header_bytes);
285        // println!("{:?}", data_bytes);
286        self.writer().write(&data_bytes).unwrap();
287        self.writer().flush().unwrap();    
288        let response = self.read();
289        Ok(response)
290    }
291
292    pub fn send_response(&mut self, data: &KObj) -> Result<(), Error> {
293        if self.writer.is_none() {
294            self.open()?;
295        };
296        let header_bytes = vec![1, 2, 0, 0];
297        let mut data_bytes = data.serialize();
298        let type_bytes = vec![data.type_as_bytes()];
299        let mut size_bytes = vec![];
300        size_bytes.write_i32::<LittleEndian>((4 + header_bytes.len() + data_bytes.len() + type_bytes.len()) as i32).unwrap();
301        data_bytes.splice(0..0, type_bytes);
302        data_bytes.splice(0..0, size_bytes);
303        data_bytes.splice(0..0, header_bytes);
304        self.writer().write(&data_bytes).unwrap();
305        self.writer().flush().unwrap();    
306        Ok(())
307    }
308}