bronzedb_protocol/
response.rs

1use super::request::Action::{self, *};
2use crate::ext::{ReadKVExt, WriteKVExt};
3use bronzedb_util::status::StatusCode::{self, *};
4use bronzedb_util::status::{Error, Result};
5use bronzedb_util::types::{Entry, Value};
6use byteorder::{ReadBytesExt, WriteBytesExt};
7use std::io::{Read, Write};
8
9pub enum Response<'a> {
10    Status(StatusCode),
11    SingleValue(Value),
12    Scanner(Box<dyn Iterator<Item = Result<Entry>> + 'a>),
13}
14
15impl<'a> Response<'a> {
16    pub fn write_to(self, mut writer: impl Write) -> Result<usize> {
17        let mut counter = 1usize; // for StatusCode
18        match self {
19            Response::Status(status) => writer.write_u8(status as u8)?,
20            Response::SingleValue(value) => {
21                writer.write_u8(OK as u8)?;
22                counter += writer.write_value(&value)?;
23            }
24            Response::Scanner(iter) => {
25                writer.write_u8(OK as u8)?;
26                for result in iter {
27                    match result {
28                        Ok((key, value)) => {
29                            writer.write_u8(OK as u8)?;
30                            counter += 1 + writer.write_key(&key)? + writer.write_value(&value)?;
31                        }
32                        Err(err) => {
33                            writer.write_u8(err.code as u8)?;
34                            Err(err)?;
35                        }
36                    }
37                }
38                writer.write_u8(Complete as u8)?;
39                counter += 1;
40            }
41        }
42        Ok(counter)
43    }
44
45    pub fn read_from(reader: &'a mut dyn Read, request_action: Action) -> Result<Self> {
46        match reader.read_u8()?.into() {
47            OK => match request_action {
48                Get => Ok(Response::SingleValue(reader.read_value()?)),
49                Delete | Set | Ping => Ok(Response::Status(OK)),
50                Scan => Ok(Response::Scanner(Box::new(ReaderIter::new(reader)))),
51                Unknown => Err(Error::new(
52                    UnknownAction,
53                    format!("unknown action: {:?}", request_action),
54                )),
55                NoResponse => unreachable!(),
56            },
57            code => Ok(Response::Status(code)),
58        }
59    }
60}
61
62struct ReaderIter<'a> {
63    reader: &'a mut dyn Read,
64    complete: bool,
65    err_occurred: bool,
66}
67
68impl<'a> ReaderIter<'a> {
69    fn new(reader: &'a mut dyn Read) -> Self {
70        Self {
71            reader,
72            complete: false,
73            err_occurred: false,
74        }
75    }
76}
77
78impl Iterator for ReaderIter<'_> {
79    type Item = Result<Entry>;
80    fn next(&mut self) -> Option<Self::Item> {
81        if self.complete || self.err_occurred {
82            return None;
83        }
84        match self.read_entry() {
85            Ok(entry) => Some(Ok(entry)),
86            Err(ref err) if err.code == Complete => None,
87            Err(err) => Some(Err(err)),
88        }
89    }
90}
91
92impl ReaderIter<'_> {
93    fn read_entry(&mut self) -> Result<Entry> {
94        match self.reader.read_u8()?.into() {
95            OK => Ok((self.reader.read_key()?.into(), self.reader.read_value()?)),
96            Complete => {
97                self.complete = true;
98                Err(Error::new(Complete, "complete"))
99            }
100            code => {
101                self.err_occurred = true;
102                Err(Error::new(code, "some error"))
103            }
104        }
105    }
106}
107
108#[cfg(test)]
109mod tests {
110    use super::Response::{self, *};
111    use crate::request::Action::*;
112    use crate::{MAX_KEY_LEN, MAX_VALUE_LEN};
113    use matches::matches;
114    use speculate::speculate;
115    use std::io::Cursor;
116    use bronzedb_util::status::StatusCode::{self, *};
117    use bronzedb_util::status::{Error, Result};
118    use bronzedb_util::types::Entry;
119
120    macro_rules! transfer_move {
121        ($new_resp:ident, $origin_resp:expr, $size:expr, $action:expr) => {
122            let mut buffer = Vec::new();
123            assert_eq!($size, $origin_resp.write_to(&mut buffer).unwrap());
124            let mut reader = Cursor::new(buffer);
125            let $new_resp = Response::read_from(&mut reader, $action).unwrap();
126        };
127    }
128
129    macro_rules! transfer_err {
130        ($new_resp:ident, $origin_resp:expr, $action:expr) => {
131            let mut buffer = Vec::new();
132            assert!(matches!($origin_resp.write_to(&mut buffer), Err(_err)));
133            let mut reader = Cursor::new(buffer);
134            let $new_resp = Response::read_from(&mut reader, $action).unwrap();
135        };
136    }
137
138    macro_rules! assert_status_not_ok {
139        ($status:expr) => {
140            transfer_move!(new_resp, Response::Status($status), 1usize, Get);
141            assert!(matches!(new_resp, Status(ref _x)));
142            if let Status(status) = new_resp {
143                assert_eq!($status, status);
144            }
145        };
146    }
147
148    speculate! {
149        describe "status not ok" {
150            it "io error" {
151                assert_status_not_ok!(IOError);
152            }
153
154            it "unknown action" {
155                assert_status_not_ok!(UnknownAction);
156            }
157
158            it "engine error" {
159                assert_status_not_ok!(EngineError);
160            }
161
162            it "not found" {
163                assert_status_not_ok!(NotFound);
164            }
165        }
166    }
167
168    #[test]
169    fn set_ok() {
170        transfer_move!(new_resp, Status(StatusCode::OK), 1usize, Set);
171        assert!(matches!(new_resp, Status(ref _x)));
172        if let Status(code) = new_resp {
173            assert_eq!(StatusCode::OK, code);
174        }
175    }
176
177    #[test]
178    fn delete_ok() {
179        transfer_move!(new_resp, Status(StatusCode::OK), 1usize, Delete);
180        assert!(matches!(new_resp, Status(ref _x)));
181        if let Status(code) = new_resp {
182            assert_eq!(StatusCode::OK, code);
183        }
184    }
185
186    macro_rules! assert_get_ok {
187        ($value:expr) => {
188            transfer_move!(
189                new_resp,
190                SingleValue($value.to_vec()),
191                $value.len() + 3,
192                Get
193            );
194            assert!(matches!(new_resp, SingleValue(_)));
195            if let SingleValue(value) = new_resp {
196                assert_eq!(&$value[..], value.as_slice());
197            }
198        };
199    }
200
201    speculate! {
202        describe "get ok" {
203            it "normal" {
204                assert_get_ok!(b"Hexi");
205            }
206
207            it "zero" {
208                assert_get_ok!([0; 0]);
209            }
210
211            it "max length" {
212                assert_get_ok!([0; MAX_VALUE_LEN]);
213            }
214
215            #[should_panic]
216            it "overflow" {
217                assert_get_ok!([0; MAX_VALUE_LEN + 1]);
218            }
219        }
220    }
221
222    #[test]
223    fn scan_ok() {
224        let origin_data: Vec<Entry> = vec![
225            (b"name"[..].to_vec().into(), b"Hexi"[..].into()),
226            (b""[..].to_vec().into(), b""[..].into()),
227            (
228                [0; MAX_KEY_LEN][..].to_vec().into(),
229                [0; MAX_VALUE_LEN][..].into(),
230            ),
231        ];
232
233        transfer_move!(
234            new_resp,
235            Scanner(Box::new(origin_data.iter().map(|entry| Ok(entry.clone())))),
236            2 + origin_data.len() * 5
237                + origin_data
238                    .iter()
239                    .fold(0, |size, (key, value)| size + key.len() + value.len()),
240            Scan
241        );
242        assert!(matches!(new_resp, Scanner(_)));
243        if let Scanner(iter) = new_resp {
244            let transferred_data = iter.map(|ret| ret.unwrap()).collect::<Vec<Entry>>();
245            assert_eq!(origin_data, transferred_data);
246        }
247    }
248
249    #[test]
250    fn scan_err() {
251        let origin_data: Vec<Result<Entry>> = vec![
252            Ok((b"name"[..].to_vec().into(), b"Hexi"[..].into())),
253            Err(Error::new(StatusCode::IOError, "Some IO Error")),
254            Ok((b"last_name"[..].to_vec().into(), b"Lee"[..].into())),
255        ];
256
257        transfer_err!(
258            new_resp,
259            Scanner(Box::new(origin_data.clone().into_iter())),
260            Scan
261        );
262        assert!(matches!(new_resp, Scanner(_)));
263        if let Scanner(mut iter) = new_resp {
264            assert_eq!(
265                origin_data[0].as_ref().unwrap(),
266                &iter.next().unwrap().unwrap()
267            );
268            assert!(matches!(iter.next().unwrap(), Err(_ref)));
269            assert!(matches!(iter.next(), None));
270        }
271    }
272}