1use std::net::TcpStream;
2use std::io::{BufReader, BufWriter};
3use std::io::prelude::*;
4use std::io::{Error, Write};
5use byteorder::{LittleEndian, WriteBytesExt};
6use crate::KObj;
7use super::header::Header;
8use super::ktype::KType;
9
/// Type codes that `Kdb::read` refuses to decode.
///
/// When the type byte of an incoming message matches one of these codes,
/// `read` reads and discards the rest of the message and returns a
/// "type unsupported by rsq" error object instead of a decoded value.
// NOTE(review): these look like kdb+'s function-like type codes (101 and 102
// are absent, matching the `Unary`/`Operator` atoms handled in `read_atom`) —
// confirm against the kdb+ type table.
const UNSUPPORTED_TYPES: [i8;11] = [100, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112];
11
/// Connection settings and socket handles for one TCP session with a kdb+
/// server.
///
/// `reader` and `writer` are both `None` until `open` succeeds and are set
/// back to `None` by `close`.
pub struct Kdb {
    /// Host part of the `host:port` address passed to `TcpStream::connect`.
    host: String,
    /// Port part of the `host:port` address passed to `TcpStream::connect`.
    port: u16,
    /// User name sent as the first half of the `user:pass` login string.
    user: String,
    /// Password sent as the second half of the `user:pass` login string.
    pass: String,
    /// Buffered read half; wraps a cloned handle to the connected socket.
    reader: Option<BufReader<TcpStream>>,
    /// Buffered write half; wraps the connected socket itself.
    writer: Option<BufWriter<TcpStream>>
}
20
21impl Kdb {
22
23 pub fn new(host: &str, port: u16, user: &str, pass: &str) -> Kdb {
24 Kdb {
25 host: host.to_string(),
26 port,
27 user: user.to_string(),
28 pass: pass.to_string(),
29 reader: None,
30 writer: None
31 }
32 }
33
34 pub fn open(&mut self) -> Result<(),Error> {
35 let mut stream = TcpStream::connect(format!("{}:{}",self.host,self.port))?;
36 let response = format!("{}:{}{}",self.user, self.pass, "\x06\x00");
37 stream.write(response.as_bytes())?;
38 stream.read_exact(&mut [0; 1])?;
39 self.reader = Some(BufReader::new(stream.try_clone()?));
40 self.writer = Some(BufWriter::new(stream));
41 Ok(())
42 }
43
44 pub fn reader(&mut self) -> &mut BufReader<TcpStream> {
45 self.reader.as_mut().unwrap()
46 }
47
48 pub fn writer(&mut self) -> &mut BufWriter<TcpStream> {
49 self.writer.as_mut().unwrap()
50 }
51
52 pub fn close(&mut self) -> Result<(), Error> {
53 self.reader = None;
54 self.writer = None;
55 Ok(())
56 }
57
58 pub fn send_async(&mut self, data: &KObj) -> Result<(), Error> {
59 if self.writer.is_none() {
60 self.open()?;
61 };
62 let header_bytes = vec![1, 0, 0, 0];
63 let mut data_bytes = data.serialize();
64 let type_bytes = vec![data.type_as_bytes()];
65 let mut size_bytes = vec![];
66 size_bytes.write_i32::<LittleEndian>((4 + header_bytes.len() + data_bytes.len() + type_bytes.len()) as i32)?;
67 data_bytes.splice(0..0, type_bytes);
68 data_bytes.splice(0..0, size_bytes);
69 data_bytes.splice(0..0, header_bytes);
70 let writer = self.writer();
71 writer.write(&data_bytes)?;
72 Ok(())
73 }
74
75 pub fn read(&mut self) -> KObj {
76 if self.reader.is_none() {
77 match self.open() {
78 Ok(_) => {},
79 Err(e) => {return KObj::Error(format!("{}",e))}
80 };
81 };
82 let msg_header = Header::read(self);
83 let reader = self.reader();
84
85
86 let mut msg_type = [0;1];
87 reader.read_exact(&mut msg_type).unwrap();
88 let msg_type = i8::from_le_bytes(msg_type);
89
90 if UNSUPPORTED_TYPES.contains(&msg_type){
91 reader.read_exact(&mut vec![0;(msg_header.length - 8) as usize]).unwrap();
93 return KObj::Error(String::from("type unsupported by rsq"))
94 };
95 let data = self.read_data(msg_type);
96
97 if msg_header.protocol == 1 {
98 self.send_response(&KObj::Atom(KType::Boolean(true))).unwrap();
99 };
100
101 data
102
103 }
104
105 fn extract_atom(&mut self, len: usize) -> Vec<u8> {
106 let mut vec = vec![0;len];
107 self.reader().read_exact(&mut vec).unwrap();
108 vec
109 }
110
111 fn extract_string(&mut self) -> Vec<u8> {
112 let stream = self.reader();
113 stream.read_exact(&mut [0;1]).unwrap(); let mut len = [0;4];
116 stream.read_exact(&mut len).unwrap();
117 let len = u32::from_le_bytes(len) as usize;
118
119 let mut string = vec![0;len];
120 stream.read_exact(&mut string).unwrap();
121 string
122 }
123
124 fn extract_sym(&mut self) -> Vec<u8> {
125 let stream = self.reader();
126 let mut sym = vec![];
127 let mut bit = [1;1];
128 loop {
129 stream.read_exact(&mut bit).unwrap();
130 if bit[0] == 0 { break };
131 sym.push(bit[0]);
132 }
133 sym
134 }
135
136 fn read_atom(&mut self, ktype: KType) -> KObj {
137 let vec_data = match ktype {
138 KType::Boolean(_) => self.extract_atom(1),
139 KType::Guid(_) => self.extract_atom(16),
140 KType::Byte(_) => self.extract_atom(1),
141 KType::Short(_) => self.extract_atom(2),
142 KType::Int(_) => self.extract_atom(4),
143 KType::Long(_) => self.extract_atom(8),
144 KType::Real(_) => self.extract_atom(4),
145 KType::Float(_) => self.extract_atom(8),
146 KType::Char(_) => self.extract_atom(1),
147 KType::String(_) => self.extract_string(),
148 KType::Symbol(_) => self.extract_sym(),
149 KType::Timestamp(_) => self.extract_atom(8),
150 KType::Month(_) => self.extract_atom(4),
151 KType::Date(_) => self.extract_atom(4),
152 KType::Datetime(_) => self.extract_atom(8),
153 KType::Timespan(_) => self.extract_atom(8),
154 KType::Minute(_) => self.extract_atom(4),
155 KType::Second(_) => self.extract_atom(4),
156 KType::Time(_) => self.extract_atom(4),
157 KType::Unary(_) => self.extract_atom(1),
158 KType::Operator(_) => self.extract_atom(1),
159 };
160 KObj::Atom(ktype).deserialize(&vec_data)
161 }
162
163 fn read_uniform_list(&mut self, msg_type: i8, len: u32) -> KObj {
164 let mut list = vec![];
165 for _ in 0..len {
166 let data = self.read_data(-1 * msg_type);
167 list.push(data);
168 };
169 KObj::List(list)
170 }
171
172 fn read_generic_list(&mut self, len:u32) -> KObj {
173 let mut list = vec![];
174 for _ in 0..len{
175 let mut msg_type = [0;1];
176 self.reader().read_exact(&mut msg_type).unwrap();
177 let msg_code = i8::from_le_bytes(msg_type);
178 list.push(self.read_data(msg_code));
179 };
180 KObj::List(list)
181 }
182
183 fn read_list(&mut self, msg_type: i8) -> KObj {
184 let mut attr = [0;1];
185 self.reader().read_exact(&mut attr).unwrap(); let mut len = [0;4]; self.reader().read_exact(&mut len).unwrap();
188 let len = u32::from_le_bytes(len);
189 if msg_type == 0 {
190 self.read_generic_list(len)
191 } else {
192 self.read_uniform_list(msg_type, len)
193 }
194 }
195
196 fn read_dict(&mut self) -> KObj {
197
198 let mut key_type = [0;1];
199 self.reader().read_exact(&mut key_type).unwrap();
200 let key_type = i8::from_le_bytes(key_type);
201
202 let keys = self.read_data(key_type);
203
204 let mut val_type = [0;1];
205 self.reader().read_exact(&mut val_type).unwrap();
206 let val_type = i8::from_le_bytes(val_type);
207 let vals = self.read_data(val_type);
208
209 let keys: Vec<KObj> = match keys {
210 KObj::List(k) => k,
211 _ => return KObj::Error("keys of dictionary must be a list".to_string()) };
213
214 let vals = match vals {
215 KObj::List(k) => k,
216 _ => return KObj::Error("keys of dictionary must be a list".to_string()) };
218
219 KObj::Dict(keys, vals)
220
221 }
222
223 fn read_table(&mut self) -> KObj {
224
225 let mut key_type = [0;1];
226 self.reader().read_exact(&mut key_type).unwrap();
227 let key_type = i8::from_le_bytes(key_type);
228
229 let keys = self.read_data(key_type);
230
231 let mut val_type = [0;1];
232 self.reader().read_exact(&mut val_type).unwrap();
233 let val_type = i8::from_le_bytes(val_type);
234 let vals = self.read_data(val_type);
235
236 let keys: Vec<KObj> = match keys {
237 KObj::List(k) => k,
238 _ => return KObj::Error("keys of dictionary must be a list".to_string()) };
240
241 let vals = match vals {
242 KObj::List(k) => k,
243 _ => return KObj::Error("keys of dictionary must be a list".to_string()) };
245
246 KObj::Table(keys, vals)
247
248 }
249
250 fn read_error(&mut self) -> KObj {
251 let error_msg = self.extract_sym();
252 KObj::Error(String::from_utf8(error_msg.to_vec()).unwrap())
253 }
254
255 fn read_data(&mut self, msg_type: i8) -> KObj {
256 let mut kobj = KObj::new(msg_type);
257 kobj = match kobj {
258 KObj::Atom(k) => self.read_atom(k),
259 KObj::List(_) => self.read_list(msg_type),
260 KObj::GenericList(_) => self.read_list(msg_type),
261 KObj::Dict(_,_) => self.read_dict(),
262 KObj::Table(_,_) => {
263 self.reader().read_exact(&mut[0;2]).unwrap();
264 self.read_table()
265 },
266 KObj::Error(_) => {
267 self.read_error()
268 }
269 };
270 kobj
271 }
272
273 pub fn send_sync(&mut self, data: &KObj) -> Result<KObj, Error> {
274 if self.writer.is_none() {
275 self.open()?;
276 };
277 let header_bytes = vec![1, 1, 0, 0];
278 let mut data_bytes = data.serialize();
279 let type_bytes = vec![data.type_as_bytes()];
280 let mut size_bytes = vec![];
281 size_bytes.write_i32::<LittleEndian>((4 + header_bytes.len() + data_bytes.len() + type_bytes.len()) as i32).unwrap();
282 data_bytes.splice(0..0, type_bytes);
283 data_bytes.splice(0..0, size_bytes);
284 data_bytes.splice(0..0, header_bytes);
285 self.writer().write(&data_bytes).unwrap();
287 self.writer().flush().unwrap();
288 let response = self.read();
289 Ok(response)
290 }
291
292 pub fn send_response(&mut self, data: &KObj) -> Result<(), Error> {
293 if self.writer.is_none() {
294 self.open()?;
295 };
296 let header_bytes = vec![1, 2, 0, 0];
297 let mut data_bytes = data.serialize();
298 let type_bytes = vec![data.type_as_bytes()];
299 let mut size_bytes = vec![];
300 size_bytes.write_i32::<LittleEndian>((4 + header_bytes.len() + data_bytes.len() + type_bytes.len()) as i32).unwrap();
301 data_bytes.splice(0..0, type_bytes);
302 data_bytes.splice(0..0, size_bytes);
303 data_bytes.splice(0..0, header_bytes);
304 self.writer().write(&data_bytes).unwrap();
305 self.writer().flush().unwrap();
306 Ok(())
307 }
308}