1use super::request::Action::{self, *};
2use crate::ext::{ReadKVExt, WriteKVExt};
3use bronzedb_util::status::StatusCode::{self, *};
4use bronzedb_util::status::{Error, Result};
5use bronzedb_util::types::{Entry, Value};
6use byteorder::{ReadBytesExt, WriteBytesExt};
7use std::io::{Read, Write};
8
9pub enum Response<'a> {
10 Status(StatusCode),
11 SingleValue(Value),
12 Scanner(Box<dyn Iterator<Item = Result<Entry>> + 'a>),
13}
14
15impl<'a> Response<'a> {
16 pub fn write_to(self, mut writer: impl Write) -> Result<usize> {
17 let mut counter = 1usize; match self {
19 Response::Status(status) => writer.write_u8(status as u8)?,
20 Response::SingleValue(value) => {
21 writer.write_u8(OK as u8)?;
22 counter += writer.write_value(&value)?;
23 }
24 Response::Scanner(iter) => {
25 writer.write_u8(OK as u8)?;
26 for result in iter {
27 match result {
28 Ok((key, value)) => {
29 writer.write_u8(OK as u8)?;
30 counter += 1 + writer.write_key(&key)? + writer.write_value(&value)?;
31 }
32 Err(err) => {
33 writer.write_u8(err.code as u8)?;
34 Err(err)?;
35 }
36 }
37 }
38 writer.write_u8(Complete as u8)?;
39 counter += 1;
40 }
41 }
42 Ok(counter)
43 }
44
45 pub fn read_from(reader: &'a mut dyn Read, request_action: Action) -> Result<Self> {
46 match reader.read_u8()?.into() {
47 OK => match request_action {
48 Get => Ok(Response::SingleValue(reader.read_value()?)),
49 Delete | Set | Ping => Ok(Response::Status(OK)),
50 Scan => Ok(Response::Scanner(Box::new(ReaderIter::new(reader)))),
51 Unknown => Err(Error::new(
52 UnknownAction,
53 format!("unknown action: {:?}", request_action),
54 )),
55 NoResponse => unreachable!(),
56 },
57 code => Ok(Response::Status(code)),
58 }
59 }
60}
61
/// Iterator state for decoding scan entries one at a time from a reader.
struct ReaderIter<'a> {
    /// Source of the encoded entries.
    reader: &'a mut dyn Read,
    /// Set once the `Complete` marker has been read.
    complete: bool,
    /// Set once an error has been observed; iteration stops afterwards.
    err_occurred: bool,
}
67
68impl<'a> ReaderIter<'a> {
69 fn new(reader: &'a mut dyn Read) -> Self {
70 Self {
71 reader,
72 complete: false,
73 err_occurred: false,
74 }
75 }
76}
77
78impl Iterator for ReaderIter<'_> {
79 type Item = Result<Entry>;
80 fn next(&mut self) -> Option<Self::Item> {
81 if self.complete || self.err_occurred {
82 return None;
83 }
84 match self.read_entry() {
85 Ok(entry) => Some(Ok(entry)),
86 Err(ref err) if err.code == Complete => None,
87 Err(err) => Some(Err(err)),
88 }
89 }
90}
91
92impl ReaderIter<'_> {
93 fn read_entry(&mut self) -> Result<Entry> {
94 match self.reader.read_u8()?.into() {
95 OK => Ok((self.reader.read_key()?.into(), self.reader.read_value()?)),
96 Complete => {
97 self.complete = true;
98 Err(Error::new(Complete, "complete"))
99 }
100 code => {
101 self.err_occurred = true;
102 Err(Error::new(code, "some error"))
103 }
104 }
105 }
106}
107
#[cfg(test)]
mod tests {
    use super::Response::{self, *};
    use crate::request::Action::*;
    use crate::{MAX_KEY_LEN, MAX_VALUE_LEN};
    use bronzedb_util::status::StatusCode::{self, *};
    use bronzedb_util::status::{Error, Result};
    use bronzedb_util::types::Entry;
    use matches::matches;
    use speculate::speculate;
    use std::io::Cursor;

    // Writes `$origin_resp` into a buffer, checks the reported size against
    // `$size`, then parses the buffer back as the reply to `$action` and binds
    // the result to `$new_resp`.
    macro_rules! transfer_move {
        ($new_resp:ident, $origin_resp:expr, $size:expr, $action:expr) => {
            let mut wire: Vec<u8> = Vec::new();
            let written = $origin_resp.write_to(&mut wire).unwrap();
            assert_eq!($size, written);
            let mut cursor = Cursor::new(wire);
            let $new_resp = Response::read_from(&mut cursor, $action).unwrap();
        };
    }

    // Like `transfer_move!`, but expects writing to fail; whatever was written
    // before the failure is still parsed back and bound to `$new_resp`.
    macro_rules! transfer_err {
        ($new_resp:ident, $origin_resp:expr, $action:expr) => {
            let mut wire: Vec<u8> = Vec::new();
            let outcome = $origin_resp.write_to(&mut wire);
            assert!(matches!(outcome, Err(_err)));
            let mut cursor = Cursor::new(wire);
            let $new_resp = Response::read_from(&mut cursor, $action).unwrap();
        };
    }

    // A non-OK status must survive a round trip as a one-byte `Status`.
    macro_rules! assert_status_not_ok {
        ($status:expr) => {
            transfer_move!(new_resp, Response::Status($status), 1usize, Get);
            match new_resp {
                Status(status) => assert_eq!($status, status),
                _ => panic!("expected a Status response"),
            }
        };
    }

    speculate! {
        describe "status not ok" {
            it "io error" {
                assert_status_not_ok!(IOError);
            }

            it "unknown action" {
                assert_status_not_ok!(UnknownAction);
            }

            it "engine error" {
                assert_status_not_ok!(EngineError);
            }

            it "not found" {
                assert_status_not_ok!(NotFound);
            }
        }
    }

    #[test]
    fn set_ok() {
        transfer_move!(new_resp, Status(StatusCode::OK), 1usize, Set);
        match new_resp {
            Status(code) => assert_eq!(StatusCode::OK, code),
            _ => panic!("expected a Status response"),
        }
    }

    #[test]
    fn delete_ok() {
        transfer_move!(new_resp, Status(StatusCode::OK), 1usize, Delete);
        match new_resp {
            Status(code) => assert_eq!(StatusCode::OK, code),
            _ => panic!("expected a Status response"),
        }
    }

    // A value must survive a `Get` round trip; the encoded size is the value
    // length plus three bytes of framing.
    macro_rules! assert_get_ok {
        ($value:expr) => {
            transfer_move!(
                new_resp,
                SingleValue($value.to_vec()),
                $value.len() + 3,
                Get
            );
            match new_resp {
                SingleValue(value) => assert_eq!(&$value[..], value.as_slice()),
                _ => panic!("expected a SingleValue response"),
            }
        };
    }

    speculate! {
        describe "get ok" {
            it "normal" {
                assert_get_ok!(b"Hexi");
            }

            it "zero" {
                assert_get_ok!([0; 0]);
            }

            it "max length" {
                assert_get_ok!([0; MAX_VALUE_LEN]);
            }

            #[should_panic]
            it "overflow" {
                assert_get_ok!([0; MAX_VALUE_LEN + 1]);
            }
        }
    }

    #[test]
    fn scan_ok() {
        let origin_data: Vec<Entry> = vec![
            (b"name"[..].to_vec().into(), b"Hexi"[..].into()),
            (b""[..].to_vec().into(), b""[..].into()),
            (
                [0; MAX_KEY_LEN][..].to_vec().into(),
                [0; MAX_VALUE_LEN][..].into(),
            ),
        ];

        // Two bytes frame the whole scan, five bytes frame each entry, and the
        // rest is the raw key/value payload.
        let payload_len: usize = origin_data
            .iter()
            .map(|(key, value)| key.len() + value.len())
            .sum();
        let expected_len = 2 + origin_data.len() * 5 + payload_len;

        transfer_move!(
            new_resp,
            Scanner(Box::new(origin_data.iter().map(|entry| Ok(entry.clone())))),
            expected_len,
            Scan
        );
        match new_resp {
            Scanner(iter) => {
                let transferred_data: Vec<Entry> = iter.map(|item| item.unwrap()).collect();
                assert_eq!(origin_data, transferred_data);
            }
            _ => panic!("expected a Scanner response"),
        }
    }

    #[test]
    fn scan_err() {
        let origin_data: Vec<Result<Entry>> = vec![
            Ok((b"name"[..].to_vec().into(), b"Hexi"[..].into())),
            Err(Error::new(StatusCode::IOError, "Some IO Error")),
            Ok((b"last_name"[..].to_vec().into(), b"Lee"[..].into())),
        ];

        transfer_err!(
            new_resp,
            Scanner(Box::new(origin_data.clone().into_iter())),
            Scan
        );
        match new_resp {
            Scanner(mut iter) => {
                // The entry before the failure arrives intact...
                let first = iter.next().unwrap().unwrap();
                assert_eq!(origin_data[0].as_ref().unwrap(), &first);
                // ...then the error itself, and nothing after it.
                assert!(iter.next().unwrap().is_err());
                assert!(iter.next().is_none());
            }
            _ => panic!("expected a Scanner response"),
        }
    }
}