1use std::convert::{From, TryFrom};
2use std::str;
3
4use bytes::{Buf, BufMut, BytesMut};
5
6use crate::error::{invalid_data, to_error, Error, Result};
7
/// A protocol value that [`ValueEncoder`] can serialize.
///
/// The decoders in this file only ever produce the `String`, `Error`, `Integer`, `Bulk`,
/// `Null` and `Array` variants; the two `Static*` variants exist for values built in code.
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum Value {
    /// Owned text, encoded with the `+` type byte.
    String(String),
    /// Owned error message, encoded with the `-` type byte.
    Error(String),
    /// Signed 64-bit integer, encoded with the `:` type byte.
    Integer(i64),
    /// Raw bytes, encoded with the `$` type byte followed by a length header.
    Bulk(Vec<u8>),
    /// Absent value, encoded as a `$` header carrying the length `-1`.
    Null,
    /// Sequence of values, encoded with the `*` type byte followed by the element count.
    Array(Vec<Value>),
    /// Error message with `'static` lifetime; encoded exactly like `Error`.
    StaticError(&'static str),
    /// Text with `'static` lifetime; encoded exactly like `String`.
    StaticString(&'static str),
}
19
20impl Value {
21 pub(crate) fn encode(&self, buf: &mut BytesMut) {
22 ValueEncoder::encode(buf, self);
23 }
24
25 pub fn as_integer(&self) -> Result<i64> {
26 match self {
27 Value::Integer(i) => Ok(*i),
28 Value::String(s) => s.parse().map_err(to_error),
29 Value::Bulk(v) => str::from_utf8(v)
30 .map_err(to_error)
31 .and_then(|s| s.parse().map_err(to_error)),
32 _ => inconvertible(self, "Integer"),
33 }
34 }
35
36 pub fn to_integer(self) -> Result<i64> {
37 self.as_integer()
38 }
39
40 pub fn as_float(&self) -> Result<f64> {
41 match self {
42 Value::Integer(i) => Ok((*i) as f64),
43 Value::String(s) => s.parse().map_err(to_error),
44 Value::Bulk(v) => str::from_utf8(v)
45 .map_err(to_error)
46 .and_then(|s| s.parse().map_err(to_error)),
47 _ => inconvertible(self, "Float"),
48 }
49 }
50
51 pub fn to_float(self) -> Result<f64> {
52 self.as_float()
53 }
54
55 pub fn as_str(&self) -> Result<&str> {
56 match self {
57 Value::Bulk(v) => str::from_utf8(v.as_slice()).map_err(to_error),
58 Value::String(s) => Ok(s.as_str()),
59 _ => inconvertible(self, "&str"),
60 }
61 }
62
63 pub fn to_string(self) -> Result<String> {
64 match self {
65 Value::Bulk(b) => Ok(String::from_utf8(b).map_err(to_error)?),
66 Value::Integer(i) => Ok(i.to_string()),
67 Value::String(s) => Ok(s),
68 Value::Null => Ok(String::new()),
69 _ => inconvertible(&self, "String"),
70 }
71 }
72
73 pub fn as_slice(&self) -> Result<&[u8]> {
74 match self {
75 Value::Bulk(v) => Ok(v.as_slice()),
76 Value::String(s) => Ok(s.as_bytes()),
77 _ => inconvertible(self, "&[u8]"),
78 }
79 }
80
81 pub fn to_bytes(self) -> Result<Vec<u8>> {
82 match self {
83 Value::Bulk(b) => Ok(b),
84 Value::String(s) => Ok(s.into_bytes()),
85 Value::Integer(i) => Ok(i.to_string().into_bytes()),
86 Value::Null => Ok(Vec::new()),
87 _ => inconvertible(&self, "Vec<u8>"),
88 }
89 }
90}
91
92fn inconvertible<A>(from: &Value, target: &str) -> Result<A> {
93 invalid_data(format!("'{:?}' is not convertible to '{}'", from, target))
94}
95
96impl TryFrom<&Value> for String {
97 type Error = Error;
98 fn try_from(val: &Value) -> Result<Self> {
99 val.as_str().map(ToOwned::to_owned)
100 }
101}
102
103impl TryFrom<Value> for String {
104 type Error = Error;
105 fn try_from(val: Value) -> Result<Self> {
106 val.to_string()
107 }
108}
109
110impl TryFrom<&Value> for Vec<u8> {
111 type Error = Error;
112
113 fn try_from(val: &Value) -> Result<Self> {
114 val.as_slice().map(ToOwned::to_owned)
115 }
116}
117
118impl TryFrom<Value> for Vec<u8> {
119 type Error = Error;
120
121 fn try_from(val: Value) -> Result<Self> {
122 val.to_bytes()
123 }
124}
125
126impl TryFrom<&Value> for Vec<String> {
127 type Error = Error;
128
129 fn try_from(val: &Value) -> Result<Self> {
130 if let Value::Array(array) = val {
131 array.iter().map(TryInto::try_into).collect()
132 } else {
133 inconvertible(val, "Vec<String>")
134 }
135 }
136}
137
138impl TryFrom<Value> for Vec<String> {
139 type Error = Error;
140
141 fn try_from(val: Value) -> Result<Self> {
142 if let Value::Array(array) = val {
143 array.into_iter().map(Value::to_string).collect()
144 } else {
145 inconvertible(&val, "Vec<String>")
146 }
147 }
148}
149
150impl From<Value> for Vec<Value> {
151 fn from(val: Value) -> Self {
152 if let Value::Array(array) = val {
153 array
154 } else {
155 Vec::from(val)
156 }
157 }
158}
159
160impl From<&Value> for Vec<Value> {
161 fn from(val: &Value) -> Self {
162 if let Value::Array(array) = val {
163 array.iter().map(Clone::clone).collect()
164 } else {
165 Vec::from(val)
166 }
167 }
168}
169
170impl TryFrom<Value> for i64 {
171 type Error = Error;
172
173 fn try_from(val: Value) -> Result<Self> {
174 val.as_integer()
175 }
176}
177
178impl TryFrom<&Value> for i64 {
179 type Error = Error;
180
181 fn try_from(val: &Value) -> Result<Self> {
182 val.as_integer()
183 }
184}
185
186impl TryFrom<Value> for f64 {
187 type Error = Error;
188
189 fn try_from(val: Value) -> Result<Self> {
190 val.as_float()
191 }
192}
193
194impl TryFrom<&Value> for f64 {
195 type Error = Error;
196
197 fn try_from(val: &Value) -> Result<Self> {
198 val.as_float()
199 }
200}
201
202impl TryFrom<Value> for Option<Vec<u8>> {
203 type Error = Error;
204
205 fn try_from(val: Value) -> Result<Self> {
206 if let Value::Null = val {
207 Ok(None)
208 } else {
209 val.to_bytes().map(Some)
210 }
211 }
212}
213
214impl TryFrom<&Value> for Option<Vec<u8>> {
215 type Error = Error;
216
217 fn try_from(val: &Value) -> Result<Self> {
218 if let Value::Null = val {
219 Ok(None)
220 } else {
221 val.as_slice().map(ToOwned::to_owned).map(Some)
222 }
223 }
224}
225
226impl From<i64> for Value {
227 fn from(i: i64) -> Self {
228 Value::Integer(i)
229 }
230}
231
232impl From<i32> for Value {
233 fn from(i: i32) -> Self {
234 Value::Integer(i64::from(i))
235 }
236}
237
238impl From<usize> for Value {
239 fn from(i: usize) -> Self {
240 Value::Integer(i as i64)
241 }
242}
243
244impl From<u32> for Value {
245 fn from(i: u32) -> Self {
246 Value::Integer(i64::from(i))
247 }
248}
249
250impl From<u64> for Value {
251 fn from(i: u64) -> Self {
252 Value::Integer(i as i64)
253 }
254}
255
256impl From<Vec<u8>> for Value {
257 fn from(b: Vec<u8>) -> Self {
258 Value::Bulk(b)
259 }
260}
261
262impl From<Option<Vec<u8>>> for Value {
263 fn from(b: Option<Vec<u8>>) -> Self {
264 match b {
265 Some(b) => Value::Bulk(b),
266 None => Value::Null,
267 }
268 }
269}
270
271impl<T> From<Vec<T>> for Value
272where
273 T: Into<Value>,
274{
275 fn from(a: Vec<T>) -> Self {
276 Value::Array(a.into_iter().map(Into::into).collect())
277 }
278}
279
280pub struct ValueEncoder;
281
282impl ValueEncoder {
283 #[inline]
284 fn ensure_capacity(buf: &mut BytesMut, size: usize) {
285 if buf.remaining_mut() < size {
286 buf.reserve(size)
287 }
288 }
289
290 #[inline]
291 fn write_crlf(buf: &mut BytesMut) {
292 buf.put_slice(b"\r\n");
293 }
294
295 fn write_header_format(buf: &mut [u8], ty: u8, number: i64) -> &[u8] {
296 let s = format!("{}{}\r\n", char::from(ty), number);
297 let s = s.as_bytes();
298 let buf = &mut buf[0..s.len()];
299 buf.copy_from_slice(s);
300 buf
301 }
302
303 fn write_header_fast(buf: &mut [u8], ty: u8, number: i64) -> &[u8] {
304 let len = buf.len();
305 buf[len - 1] = b'\n';
306 buf[len - 2] = b'\r';
307 let mut i = len - 3;
308 let mut number = number;
309 loop {
310 buf[i] = b'0' + (number % 10) as u8;
311 i -= 1;
312 number /= 10;
313 if number == 0 {
314 break;
315 }
316 }
317 buf[i] = ty;
318 &buf[i..]
319 }
320
321 fn write_header(buf: &mut BytesMut, ty: u8, number: i64) {
322 let mut hdr = [0u8; 32];
323 let hdr = if number < 0 {
324 Self::write_header_format(&mut hdr, ty, number)
325 } else {
326 Self::write_header_fast(&mut hdr, ty, number)
327 };
328 Self::ensure_capacity(buf, hdr.len());
329 buf.put(hdr)
330 }
331
332 fn write_bulk(buf: &mut BytesMut, ty: u8, bytes: &[u8]) {
333 Self::ensure_capacity(buf, bytes.len() + 32);
334 Self::write_header(buf, ty, bytes.len() as i64);
335 buf.put(bytes);
336 Self::write_crlf(buf);
337 }
338
339 fn write_string(buf: &mut BytesMut, ty: u8, str: &str) {
340 Self::ensure_capacity(buf, str.len() + 3);
341 buf.put_u8(ty);
342 buf.put(str.as_bytes());
343 Self::write_crlf(buf);
344 }
345
346 pub fn encode(buf: &mut BytesMut, value: &Value) {
347 match value {
348 Value::Null => Self::write_header(buf, b'$', -1),
349 Value::Array(a) => {
350 Self::write_header(buf, b'*', a.len() as i64);
351 for e in a {
352 Self::encode(buf, e);
353 }
354 }
355 Value::Integer(i) => Self::write_header(buf, b':', *i),
356 Value::Bulk(b) => Self::write_bulk(buf, b'$', b),
357 Value::String(s) => Self::write_string(buf, b'+', s),
358 Value::Error(e) => Self::write_string(buf, b'-', e),
359 Value::StaticError(e) => Self::write_string(buf, b'-', e),
360 Value::StaticString(e) => Self::write_string(buf, b'+', e),
361 }
362 }
363}
364
/// Leading byte of a bulk string (`$`).
const RESP_TYPE_BULK_STRING: u8 = b'$';
/// Leading byte of an array (`*`).
const RESP_TYPE_ARRAY: u8 = b'*';
/// Leading byte of an integer (`:`).
const RESP_TYPE_INTEGER: u8 = b':';
/// Leading byte of a simple string (`+`).
const RESP_TYPE_SIMPLE_STRING: u8 = b'+';
/// Leading byte of an error (`-`).
const RESP_TYPE_ERROR: u8 = b'-';
370
371fn split_input(input: &mut BytesMut, at: usize) -> BytesMut {
372 let mut buf = input.split_to(at);
373 let len = buf.len();
374 buf.truncate(len - 2);
375 buf
376}
377
378#[derive(Debug, Default)]
379struct StringDecoder {
380 expect_lf: bool,
381 inspect: usize,
382}
383
384impl StringDecoder {
385 fn decode(&mut self, input: &mut BytesMut) -> Result<String> {
386 Ok(str::from_utf8(&split_input(input, self.inspect))
387 .map_err(to_error)?
388 .into())
389 }
390
391 fn try_decode(&mut self, input: &mut BytesMut) -> Result<Option<String>> {
392 let length = input.len();
393 loop {
394 if length <= self.inspect {
395 return Ok(None);
396 }
397 let inspect = self.inspect;
398 self.inspect += 1;
399 match (self.expect_lf, input[inspect]) {
400 (false, b'\r') => self.expect_lf = true,
401 (false, _) => (),
402 (true, b'\n') => return self.decode(input).map(Some),
403 (true, b) => {
404 return invalid_data(format!("Invalid last tailing line feed: '{}'", b));
405 }
406 }
407 }
408 }
409}
410
411#[derive(Debug)]
412struct BulkDecoder {
413 length_decoder: Option<IntegerDecoder>,
414 length: i64,
415}
416
417impl BulkDecoder {
418 fn try_decode_length(&mut self, input: &mut BytesMut) -> Result<bool> {
419 if let Some(length) = self.length_decoder.as_mut().unwrap().try_decode(input)? {
420 self.length_decoder = None;
421 self.length = length;
422 Ok(true)
423 } else {
424 Ok(false)
425 }
426 }
427
428 fn try_decode_bulk(&mut self, input: &mut BytesMut) -> Result<Option<Value>> {
429 if self.length < 0 {
430 return if self.length == -1 {
431 Ok(Some(Value::Null))
432 } else {
433 invalid_data(format!("Invalid bulk length: '{}'", self.length))
434 };
435 }
436 let length = self.length as usize;
437 let len = input.len();
438 if len < length + 2 {
439 return Ok(None);
440 }
441 if input[length] != b'\r' || input[length + 1] != b'\n' {
442 return invalid_data(format!(
443 "Invalid bulk tailing bytes: '[{}, {}]'",
444 input[length],
445 input[length + 1]
446 ));
447 }
448 let mut bulk = input.split_to(length + 2).to_vec();
449 bulk.truncate(length);
450 Ok(Some(Value::Bulk(bulk)))
451 }
452
453 fn try_decode(&mut self, input: &mut BytesMut) -> Result<Option<Value>> {
454 if self.length_decoder.is_some() && !self.try_decode_length(input)? {
455 return Ok(None);
456 }
457 self.try_decode_bulk(input)
458 }
459}
460
461impl Default for BulkDecoder {
462 fn default() -> Self {
463 BulkDecoder {
464 length_decoder: Some(IntegerDecoder::default()),
465 length: 0,
466 }
467 }
468}
469
470#[derive(Debug)]
471struct ArrayDecoder {
472 size_decoder: Option<IntegerDecoder>,
473 value_decoder: Box<ValueDecoder>,
474 array: Option<Vec<Value>>,
475 size: usize,
476}
477
478impl ArrayDecoder {
479 fn try_decode_size(&mut self, input: &mut BytesMut) -> Result<bool> {
480 if let Some(size) = self.size_decoder.as_mut().unwrap().try_decode(input)? {
481 self.size_decoder = None;
482 if size < 0 {
483 return invalid_data(format!("Invalid array size '{}'", size));
484 }
485 self.size = size as usize;
486 self.array = Some(Vec::with_capacity(self.size));
487 Ok(true)
488 } else {
489 Ok(false)
490 }
491 }
492
493 fn try_decode_element(&mut self, input: &mut BytesMut) -> Result<Option<Value>> {
494 if self.size == 0 {
495 return Ok(Some(Value::Array(Vec::new())));
496 }
497 while !input.is_empty() {
498 if let Some(value) = self.value_decoder.try_decode(input)? {
499 self.array.as_mut().unwrap().push(value);
500 if self.array.as_ref().unwrap().len() == self.size {
501 return Ok(self.array.take().map(Value::Array));
502 }
503 } else {
504 break;
505 }
506 }
507 Ok(None)
508 }
509
510 fn try_decode(&mut self, input: &mut BytesMut) -> Result<Option<Value>> {
511 if self.size_decoder.is_some() && !self.try_decode_size(input)? {
512 return Ok(None);
513 }
514 self.try_decode_element(input)
515 }
516}
517
518impl Default for ArrayDecoder {
519 fn default() -> Self {
520 ArrayDecoder {
521 size_decoder: Some(IntegerDecoder::default()),
522 array: None,
523 value_decoder: Default::default(),
524 size: 0,
525 }
526 }
527}
528
529#[derive(Debug, Default)]
530struct IntegerDecoder {
531 expect_lf: bool,
532 inspect: usize,
533}
534
535impl IntegerDecoder {
536 fn decode(&mut self, input: &mut BytesMut) -> Result<i64> {
537 str::from_utf8(&split_input(input, self.inspect))
538 .map_err(to_error)?
539 .parse()
540 .map_err(to_error)
541 }
542
543 fn try_decode(&mut self, input: &mut BytesMut) -> Result<Option<i64>> {
544 let length = input.len();
545 loop {
546 if length <= self.inspect {
547 return Ok(None);
548 }
549 let inspect = self.inspect;
550 self.inspect += 1;
551 match (self.expect_lf, input[inspect]) {
552 (false, b'0'..=b'9') => (),
553 (false, b'-') => (),
554 (false, b'\r') => self.expect_lf = true,
555 (true, b'\n') => return self.decode(input).map(Some),
556 (_, b) => {
557 return invalid_data(format!(
558 "Invalid byte '{}' when decoding integer {:?}",
559 b, input
560 ));
561 }
562 }
563 }
564 }
565}
566
567#[derive(Debug)]
568enum TypedDecoder {
569 String(StringDecoder),
570 Error(StringDecoder),
571 Integer(IntegerDecoder),
572 Bulk(BulkDecoder),
573 Array(ArrayDecoder),
574}
575
576impl TypedDecoder {
577 fn try_decode(&mut self, input: &mut BytesMut) -> Result<Option<Value>> {
578 match self {
579 TypedDecoder::String(decoder) => {
580 decoder.try_decode(input).map(|x| x.map(Value::String))
581 }
582 TypedDecoder::Error(decoder) => decoder.try_decode(input).map(|x| x.map(Value::Error)),
583 TypedDecoder::Integer(decoder) => {
584 decoder.try_decode(input).map(|x| x.map(Value::Integer))
585 }
586 TypedDecoder::Bulk(decoder) => decoder.try_decode(input),
587 TypedDecoder::Array(decoder) => decoder.try_decode(input),
588 }
589 }
590}
591
592#[derive(Debug, Default)]
593pub struct ValueDecoder {
594 decoder: Option<TypedDecoder>,
595}
596
597impl ValueDecoder {
598 pub fn try_decode(&mut self, input: &mut BytesMut) -> Result<Option<Value>> {
599 if input.is_empty() {
600 return Ok(None);
601 }
602 if self.decoder.is_none() {
603 let decoder = match input[0] {
604 RESP_TYPE_BULK_STRING => TypedDecoder::Bulk(BulkDecoder::default()),
605 RESP_TYPE_ARRAY => TypedDecoder::Array(ArrayDecoder::default()),
606 RESP_TYPE_INTEGER => TypedDecoder::Integer(IntegerDecoder::default()),
607 RESP_TYPE_SIMPLE_STRING => TypedDecoder::String(StringDecoder::default()),
608 RESP_TYPE_ERROR => TypedDecoder::Error(StringDecoder::default()),
609 t => return invalid_data(format!("Invalid value type '{}'", t)),
610 };
611 input.advance(1);
612 self.decoder = Some(decoder);
613 }
614 let result = self.decoder.as_mut().unwrap().try_decode(input)?;
615 if result.is_some() {
616 self.decoder = None;
617 }
618 Ok(result)
619 }
620}
621
#[cfg(test)]
mod tests {
    use super::*;

    /// Copies a byte literal into a fresh `BytesMut`.
    fn wire(raw: &[u8]) -> BytesMut {
        BytesMut::from(raw)
    }

    /// Every proper, non-empty prefix of `input` must decode to "need more data".
    fn test_decode_partially(input: &BytesMut) {
        for end in 1..input.len() {
            let mut prefix = BytesMut::from(&input[..end]);
            let outcome = ValueDecoder::default().try_decode(&mut prefix);
            assert!(outcome.is_ok());
            assert!(outcome.unwrap().is_none());
        }
    }

    /// The complete `input` must decode to `expect`, and no prefix of it may decode.
    fn test_decode(mut input: BytesMut, expect: Value) {
        test_decode_partially(&input);
        let mut decoder = ValueDecoder::default();
        match decoder.try_decode(&mut input) {
            Ok(Some(actual)) => assert_eq!(actual, expect),
            _ => assert!(false),
        }
    }

    /// Encoding `value` must produce exactly `expect`.
    fn test_encode(expect: &BytesMut, value: &Value) {
        let mut encoded = BytesMut::with_capacity(128);
        ValueEncoder::encode(&mut encoded, value);
        assert_eq!(&encoded, expect);
    }

    /// Round trip check: `value` encodes to `serialized` and `serialized` decodes to `value`.
    fn test_codec(serialized: BytesMut, value: Value) {
        test_encode(&serialized, &value);
        test_decode(serialized, value);
    }

    #[test]
    fn test_simple_string() {
        test_codec(wire(b"+OK\r\n"), Value::String(String::from("OK")));
    }

    #[test]
    fn test_error() {
        test_codec(
            wire(b"-Error message\r\n"),
            Value::Error(String::from("Error message")),
        );
    }

    #[test]
    fn test_integer() {
        test_codec(wire(b":1000\r\n"), Value::Integer(1000));
    }

    #[test]
    fn test_bulk_string() {
        test_codec(wire(b"$6\r\nfoobar\r\n"), Value::Bulk(b"foobar".to_vec()));
        test_codec(wire(b"$0\r\n\r\n"), Value::Bulk(Vec::new()));
        test_codec(wire(b"$-1\r\n"), Value::Null);
    }

    #[test]
    fn test_array() {
        test_codec(wire(b"*0\r\n"), Value::Array(Vec::new()));
        test_codec(
            wire(b"*2\r\n$3\r\nfoo\r\n$3\r\nbar\r\n"),
            Value::Array(vec![
                Value::Bulk(b"foo".to_vec()),
                Value::Bulk(b"bar".to_vec()),
            ]),
        );
        test_codec(
            wire(b"*3\r\n:1\r\n:2\r\n:3\r\n"),
            Value::Array(vec![
                Value::Integer(1),
                Value::Integer(2),
                Value::Integer(3),
            ]),
        );
        test_codec(
            wire(b"*5\r\n:1\r\n:2\r\n:3\r\n:4\r\n$6\r\nfoobar\r\n"),
            Value::Array(vec![
                Value::Integer(1),
                Value::Integer(2),
                Value::Integer(3),
                Value::Integer(4),
                Value::Bulk(b"foobar".to_vec()),
            ]),
        );
    }
}