1use crate::{
2 error::{DecodeError, DecodeResult},
3 Handshake, Key, KeyLength, KeyValuePairs, MetaData, MetaDataLength, NumKeyValuePairs,
4 ProtocolVersion, ProtocolVersionSegment, RequestPattern, RequestPatternLength, TransactionId,
5 Value, ValueLength, ERROR_CODE_BYTES, HSHK, KEY_LENGTH_BYTES, MESSAGE_LENGTH_BYTES,
6 METADATA_LENGTH_BYTES, MULTI_WILDCARD_BYTES, NUM_KEY_VALUE_PAIRS_BYTES,
7 PROTOCOL_VERSION_SEGMENT_BYTES, REQUEST_PATTERN_LENGTH_BYTES, SEPARATOR_BYTES,
8 TRANSACTION_ID_BYTES, VALUE_LENGTH_BYTES, WILDCARD_BYTES,
9 {Ack, Err, PState, ServerMessage as SM, State, ACK, ERR, PSTA, STA},
10};
11use std::io::Read;
12
13pub fn read_server_message(mut data: impl Read) -> DecodeResult<SM> {
14 let mut buf = [0; MESSAGE_LENGTH_BYTES];
15 data.read_exact(&mut buf)?;
16 let mut buf = [0];
17 data.read_exact(&mut buf)?;
18 match buf[0] {
19 PSTA => read_pstate_message(data).map(SM::PState),
20 ACK => read_ack_message(data).map(SM::Ack),
21 STA => read_state_message(data).map(SM::State),
22 ERR => read_err_message(data).map(SM::Err),
23 HSHK => read_handshake_message(data).map(SM::Handshake),
24 _ => Err(DecodeError::UndefinedType(buf[0])),
25 }
26}
27
28fn read_pstate_message(mut data: impl Read) -> DecodeResult<PState> {
29 let mut buf = [0; TRANSACTION_ID_BYTES];
30 data.read_exact(&mut buf)?;
31 let transaction_id = TransactionId::from_be_bytes(buf);
32
33 let mut buf = [0; REQUEST_PATTERN_LENGTH_BYTES];
34 data.read_exact(&mut buf)?;
35 let request_pattern_length = RequestPatternLength::from_be_bytes(buf);
36
37 let mut buf = [0; NUM_KEY_VALUE_PAIRS_BYTES];
38 data.read_exact(&mut buf)?;
39 let num_key_val_pairs = NumKeyValuePairs::from_be_bytes(buf);
40
41 let mut key_value_lengths = Vec::new();
42
43 for _ in 0..num_key_val_pairs {
44 let mut buf = [0; KEY_LENGTH_BYTES];
45 data.read_exact(&mut buf)?;
46 let key_length = KeyLength::from_be_bytes(buf);
47
48 let mut buf = [0; VALUE_LENGTH_BYTES];
49 data.read_exact(&mut buf)?;
50 let value_length = ValueLength::from_be_bytes(buf);
51
52 key_value_lengths.push((key_length, value_length));
53 }
54
55 let mut buf = vec![0; request_pattern_length as usize];
56 data.read_exact(&mut buf)?;
57 let request_pattern = RequestPattern::from_utf8_lossy(&buf).to_string();
58
59 let mut key_value_pairs = KeyValuePairs::new();
60
61 for (key_length, value_length) in key_value_lengths {
62 let mut buf = vec![0; key_length as usize];
63 data.read_exact(&mut buf)?;
64 let key = Key::from_utf8(buf)?;
65
66 let mut buf = vec![0; value_length as usize];
67 data.read_exact(&mut buf)?;
68 let value = Value::from_utf8_lossy(&buf).to_string();
69
70 key_value_pairs.push((key, value).into());
71 }
72
73 Ok(PState {
74 transaction_id,
75 request_pattern,
76 key_value_pairs,
77 })
78}
79
80fn read_handshake_message(mut data: impl Read) -> DecodeResult<Handshake> {
81 let mut buf = [0; PROTOCOL_VERSION_SEGMENT_BYTES];
82 data.read_exact(&mut buf)?;
83 let major = ProtocolVersionSegment::from_be_bytes(buf);
84
85 let mut buf = [0; PROTOCOL_VERSION_SEGMENT_BYTES];
86 data.read_exact(&mut buf)?;
87 let minor = ProtocolVersionSegment::from_be_bytes(buf);
88
89 let protocol_version = ProtocolVersion { major, minor };
90
91 let mut buf = vec![0; SEPARATOR_BYTES];
92 data.read_exact(&mut buf)?;
93 let separator = buf[0] as char;
94
95 let mut buf = vec![0; WILDCARD_BYTES];
96 data.read_exact(&mut buf)?;
97 let wildcard = buf[0] as char;
98
99 let mut buf = vec![0; MULTI_WILDCARD_BYTES];
100 data.read_exact(&mut buf)?;
101 let multi_wildcard = buf[0] as char;
102
103 Ok(Handshake {
104 protocol_version,
105 separator,
106 wildcard,
107 multi_wildcard,
108 })
109}
110
111fn read_ack_message(mut data: impl Read) -> DecodeResult<Ack> {
112 let mut buf = [0; TRANSACTION_ID_BYTES];
113 data.read_exact(&mut buf)?;
114 let transaction_id = TransactionId::from_be_bytes(buf);
115
116 Ok(Ack { transaction_id })
117}
118
119fn read_state_message(mut data: impl Read) -> DecodeResult<State> {
120 let mut buf = [0; TRANSACTION_ID_BYTES];
121 data.read_exact(&mut buf)?;
122 let transaction_id = TransactionId::from_be_bytes(buf);
123
124 let mut buf = [0; KEY_LENGTH_BYTES];
125 data.read_exact(&mut buf)?;
126 let key_length = KeyLength::from_be_bytes(buf);
127
128 let mut buf = [0; VALUE_LENGTH_BYTES];
129 data.read_exact(&mut buf)?;
130 let value_length = ValueLength::from_be_bytes(buf);
131
132 let mut buf = vec![0; key_length as usize];
133 data.read_exact(&mut buf)?;
134 let key = Key::from_utf8_lossy(&buf).to_string();
135
136 let mut buf = vec![0; value_length as usize];
137 data.read_exact(&mut buf)?;
138 let value = Value::from_utf8_lossy(&buf).to_string();
139
140 Ok(State {
141 transaction_id,
142 key_value: (key, value).into(),
143 })
144}
145
146fn read_err_message(mut data: impl Read) -> DecodeResult<Err> {
147 let mut buf = [0; TRANSACTION_ID_BYTES];
148 data.read_exact(&mut buf)?;
149 let transaction_id = TransactionId::from_be_bytes(buf);
150
151 let mut buf = [0; ERROR_CODE_BYTES];
152 data.read_exact(&mut buf)?;
153 let error_code = buf[0];
154
155 let mut buf = [0; METADATA_LENGTH_BYTES];
156 data.read_exact(&mut buf)?;
157 let metadata_length = MetaDataLength::from_be_bytes(buf);
158
159 let mut buf = vec![0; metadata_length as usize];
160 data.read_exact(&mut buf)?;
161 let metadata = MetaData::from_utf8_lossy(&buf).to_string();
162
163 Ok(Err {
164 transaction_id,
165 error_code,
166 metadata,
167 })
168}
169
#[cfg(test)]
mod test {

    use super::*;

    // Each fixture is assembled field by field: length prefix and type byte
    // first, then the message body in wire order.

    #[test]
    fn handshake_message_is_read_correctly() {
        // Length prefix (8) followed by the type byte.
        let mut data = vec![0, 0, 0, 8, HSHK];
        // Major version 1, minor version 0.
        data.extend_from_slice(&[0, 1, 0, 0]);
        // Separator, wildcard and multi-wildcard.
        data.extend_from_slice(b"/?#");

        let expected = SM::Handshake(Handshake {
            protocol_version: ProtocolVersion { major: 1, minor: 0 },
            separator: '/',
            wildcard: '?',
            multi_wildcard: '#',
        });

        let result = read_server_message(&data[..]).unwrap();

        assert_eq!(result, expected);
    }

    #[test]
    fn pstate_message_is_read_correctly() {
        // Length prefix (146) followed by the type byte.
        let mut data = vec![0, 0, 0, 146, PSTA];
        // Transaction id.
        data.extend_from_slice(&u64::MAX.to_be_bytes());
        // Request pattern length.
        data.extend_from_slice(&15u16.to_be_bytes());
        // Number of key/value pairs.
        data.extend_from_slice(&2u32.to_be_bytes());
        // Key and value length of the first pair.
        data.extend_from_slice(&34u16.to_be_bytes());
        data.extend_from_slice(&26u32.to_be_bytes());
        // Key and value length of the second pair.
        data.extend_from_slice(&20u16.to_be_bytes());
        data.extend_from_slice(&24u32.to_be_bytes());
        // Request pattern, then the key and value of each pair.
        data.extend_from_slice(b"who/let/the/?/#");
        data.extend_from_slice(b"who/let/the/chicken/cross/the/road");
        data.extend_from_slice(b"yeah, that was me, I guess");
        data.extend_from_slice(b"who/let/the/dogs/out");
        data.extend_from_slice(b"Who? Who? Who? Who? Who?");

        let expected = SM::PState(PState {
            transaction_id: u64::MAX,
            request_pattern: "who/let/the/?/#".to_owned(),
            key_value_pairs: vec![
                (
                    "who/let/the/chicken/cross/the/road".to_owned(),
                    "yeah, that was me, I guess".to_owned(),
                )
                    .into(),
                (
                    "who/let/the/dogs/out".to_owned(),
                    "Who? Who? Who? Who? Who?".to_owned(),
                )
                    .into(),
            ],
        });

        let result = read_server_message(&data[..]).unwrap();

        assert_eq!(result, expected);
    }

    #[test]
    fn ack_message_is_read_correctly() {
        // Length prefix (9) followed by the type byte.
        let mut data = vec![0, 0, 0, 9, ACK];
        // Transaction id.
        data.extend_from_slice(&42u64.to_be_bytes());

        let expected = SM::Ack(Ack { transaction_id: 42 });

        let result = read_server_message(&data[..]).unwrap();

        assert_eq!(result, expected);
    }

    #[test]
    fn state_message_is_read_correctly() {
        // Length prefix (21) followed by the type byte.
        let mut data = vec![0, 0, 0, 21, STA];
        // Transaction id.
        data.extend_from_slice(&42u64.to_be_bytes());
        // Key length, then value length.
        data.extend_from_slice(&5u16.to_be_bytes());
        data.extend_from_slice(&1u32.to_be_bytes());
        // Key, then value.
        data.extend_from_slice(b"1/2/3");
        data.extend_from_slice(b"4");

        let expected = SM::State(State {
            transaction_id: 42,
            key_value: ("1/2/3", "4").into(),
        });

        let result = read_server_message(&data[..]).unwrap();

        assert_eq!(result, expected);
    }

    #[test]
    fn err_message_is_read_correctly() {
        // Length prefix (31) followed by the type byte.
        let mut data = vec![0, 0, 0, 31, ERR];
        // Transaction id.
        data.extend_from_slice(&42u64.to_be_bytes());
        // Error code.
        data.push(5);
        // Metadata length, then metadata.
        data.extend_from_slice(&17u32.to_be_bytes());
        data.extend_from_slice(b"THIS IS METAAA!!!");

        let expected = SM::Err(Err {
            transaction_id: 42,
            error_code: 5,
            metadata: "THIS IS METAAA!!!".to_owned(),
        });

        let result = read_server_message(&data[..]).unwrap();

        assert_eq!(result, expected);
    }
}