use std::io::prelude::*;
use serde_json;
use serde;
use std;
use failure::Error;
use FaktoryError;
fn bad(expected: &'static str, got: &RawResponse) -> FaktoryError {
use std;
let stringy = match *got {
RawResponse::String(ref s) => Some(&**s),
RawResponse::Blob(ref b) => if let Ok(s) = std::str::from_utf8(b) {
Some(s)
} else {
None
},
_ => None,
};
match stringy {
Some(s) => FaktoryError::BadType {
expected,
received: s.to_string(),
},
None => FaktoryError::BadType {
expected,
received: format!("{:?}", got),
},
}
}
pub fn read_json<R: BufRead, T: serde::de::DeserializeOwned>(r: R) -> Result<Option<T>, Error> {
let rr = read(r)?;
match rr {
RawResponse::String(ref s) if s == "OK" => {
return Ok(None);
}
RawResponse::String(ref s) => {
return Ok(serde_json::from_str(s).map(Some)?);
}
RawResponse::Blob(ref b) if b == b"OK" => {
return Ok(None);
}
RawResponse::Blob(ref b) => {
if b.is_empty() {
return Ok(None);
}
return Ok(serde_json::from_slice(b).map(Some)?);
}
RawResponse::Null => return Ok(None),
_ => {}
};
Err(bad("json", &rr).into())
}
#[derive(Serialize, Deserialize, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct Hi {
#[serde(rename = "v")]
pub version: usize,
#[serde(rename = "i")]
pub iterations: Option<usize>,
#[serde(rename = "s")]
pub salt: Option<String>,
}
pub fn read_hi<R: BufRead>(r: R) -> Result<Hi, Error> {
let rr = read(r)?;
if let RawResponse::String(ref s) = rr {
if s.starts_with("HI ") {
return Ok(serde_json::from_str(&s[3..])?);
}
}
Err(bad("server hi", &rr).into())
}
pub fn read_ok<R: BufRead>(r: R) -> Result<(), Error> {
let rr = read(r)?;
if let RawResponse::String(ref s) = rr {
if s == "OK" {
return Ok(());
}
}
Err(bad("server ok", &rr).into())
}
#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
enum RawResponse {
String(String),
Blob(Vec<u8>),
Number(isize),
Null,
}
fn read<R: BufRead>(mut r: R) -> Result<RawResponse, Error> {
let mut cmdbuf = [0u8; 1];
r.read_exact(&mut cmdbuf)?;
match cmdbuf[0] {
b'+' => {
let mut s = String::new();
r.read_line(&mut s)?;
let l = s.len() - 2;
s.truncate(l);
Ok(RawResponse::String(s))
}
b'-' => {
let mut s = String::new();
r.read_line(&mut s)?;
let l = s.len() - 2;
s.truncate(l);
Err(FaktoryError::new(s).into())
}
b':' => {
let mut s = String::with_capacity(32);
r.read_line(&mut s)?;
let l = s.len() - 2;
s.truncate(l);
match isize::from_str_radix(&*s, 10) {
Ok(i) => Ok(RawResponse::Number(i)),
Err(_) => Err(FaktoryError::BadResponse {
typed_as: "integer",
error: "invalid integer value",
bytes: s.into_bytes(),
}.into()),
}
}
b'$' => {
let mut bytes = Vec::with_capacity(32);
r.read_until(b'\n', &mut bytes)?;
let s = std::str::from_utf8(&bytes[0..bytes.len() - 2]).map_err(|_| {
FaktoryError::BadResponse {
typed_as: "bulk string",
error: "server bulk response contains non-utf8 size prefix",
bytes: bytes[0..bytes.len() - 2].to_vec(),
}
})?;
let size = isize::from_str_radix(s, 10).map_err(|_| FaktoryError::BadResponse {
typed_as: "bulk string",
error: "server bulk response size prefix is not an integer",
bytes: s.as_bytes().to_vec(),
})?;
if size == -1 {
Ok(RawResponse::Null)
} else {
let size = size as usize;
let mut bytes = Vec::with_capacity(size);
bytes.resize(size, 0u8);
r.read_exact(&mut bytes[..])?;
r.read_exact(&mut [0u8; 2])?;
Ok(RawResponse::Blob(bytes))
}
}
b'*' => {
unimplemented!();
}
c => Err(FaktoryError::BadResponse {
typed_as: "unknown",
error: "invalid response type prefix",
bytes: vec![c],
}.into()),
}
}
impl<'a> From<&'a str> for RawResponse {
fn from(s: &'a str) -> Self {
RawResponse::String(s.to_string())
}
}
impl From<isize> for RawResponse {
fn from(i: isize) -> Self {
RawResponse::Number(i)
}
}
impl From<Vec<u8>> for RawResponse {
fn from(b: Vec<u8>) -> Self {
RawResponse::Blob(b)
}
}
#[cfg(test)]
mod test {
use std::io::{self, Cursor};
use super::{read, RawResponse};
use serde_json::{self, Map, Value};
use failure::Error;
use FaktoryError;
fn read_json<C: io::BufRead>(c: C) -> Result<Option<Value>, Error> {
super::read_json(c)
}
#[test]
fn it_parses_simple_strings() {
let c = Cursor::new(b"+OK\r\n");
assert_eq!(read(c).unwrap(), RawResponse::from("OK"));
}
#[test]
fn it_parses_numbers() {
let c = Cursor::new(b":1024\r\n");
assert_eq!(read(c).unwrap(), RawResponse::from(1024));
}
#[test]
fn it_errors_on_bad_numbers() {
let c = Cursor::new(b":x\r\n");
let r = read(c).unwrap_err();
if let &FaktoryError::BadResponse {
typed_as, error, ..
} = r.downcast_ref().unwrap()
{
assert_eq!(typed_as, "integer");
assert_eq!(error, "invalid integer value");
} else {
unreachable!();
}
}
#[test]
fn it_parses_errors() {
let c = Cursor::new(b"-ERR foo\r\n");
let r = read(c).unwrap_err();
if let &FaktoryError::Internal { ref msg } = r.downcast_ref().unwrap() {
assert_eq!(msg, "foo");
} else {
unreachable!();
}
}
#[test]
#[should_panic]
fn it_cant_do_arrays() {
let c = Cursor::new(b"*\r\n");
read(c).is_err();
}
#[test]
fn it_parses_nills() {
let c = Cursor::new(b"$-1\r\n");
assert_eq!(read(c).unwrap(), RawResponse::Null);
}
#[test]
fn it_errors_on_bad_sizes() {
let c = Cursor::new(b"$x\r\n\r\n");
let r = read(c).unwrap_err();
if let &FaktoryError::BadResponse {
typed_as, error, ..
} = r.downcast_ref().unwrap()
{
assert_eq!(typed_as, "bulk string");
assert_eq!(error, "server bulk response size prefix is not an integer");
} else {
unreachable!();
}
}
#[test]
fn it_parses_empty_bulk() {
let c = Cursor::new(b"$0\r\n\r\n");
assert_eq!(read(c).unwrap(), RawResponse::from(vec![]));
}
#[test]
fn it_parses_non_empty_bulk() {
let c = Cursor::new(b"$11\r\nHELLO WORLD\r\n");
assert_eq!(
read(c).unwrap(),
RawResponse::from(Vec::from(&b"HELLO WORLD"[..]))
);
}
#[test]
fn it_decodes_json_ok_string() {
let c = Cursor::new(b"+OK\r\n");
assert_eq!(read_json(c).unwrap(), None);
}
#[test]
fn it_decodes_json_ok_blob() {
let c = Cursor::new(b"$2\r\nOK\r\n");
assert_eq!(read_json(c).unwrap(), None);
}
#[test]
fn it_decodes_json_nill() {
let c = Cursor::new(b"$-1\r\n");
assert_eq!(read_json(c).unwrap(), None);
}
#[test]
fn it_decodes_json_empty() {
let c = Cursor::new(b"$0\r\n\r\n");
assert_eq!(read_json(c).unwrap(), None);
}
#[test]
fn it_decodes_string_json() {
let c = Cursor::new(b"+{\"hello\":1}\r\n");
let mut m = Map::new();
m.insert("hello".to_string(), Value::from(1));
assert_eq!(read_json(c).unwrap(), Some(Value::Object(m)));
}
#[test]
fn it_decodes_blob_json() {
let c = Cursor::new(b"$11\r\n{\"hello\":1}\r\n");
let mut m = Map::new();
m.insert("hello".to_string(), Value::from(1));
assert_eq!(read_json(c).unwrap(), Some(Value::Object(m)));
}
#[test]
fn it_errors_on_bad_json_blob() {
let c = Cursor::new(b"$9\r\n{\"hello\"}\r\n");
let r = read_json(c).unwrap_err();
let _: &serde_json::Error = r.downcast_ref().unwrap();
}
#[test]
fn it_errors_on_bad_json_string() {
let c = Cursor::new(b"+{\"hello\"}\r\n");
let r = read_json(c).unwrap_err();
let _: &serde_json::Error = r.downcast_ref().unwrap();
}
#[test]
fn json_error_on_number() {
let c = Cursor::new(b":9\r\n");
let r = read_json(c).unwrap_err();
if let &FaktoryError::BadType {
expected,
ref received,
} = r.downcast_ref().unwrap()
{
assert_eq!(expected, "json");
assert_eq!(received, "Number(9)");
} else {
unreachable!();
}
}
#[test]
fn it_errors_on_unknown_resp_type() {
let c = Cursor::new(b"^\r\n");
let r = read_json(c).unwrap_err();
if let &FaktoryError::BadResponse {
typed_as, error, ..
} = r.downcast_ref().unwrap()
{
assert_eq!(typed_as, "unknown");
assert_eq!(error, "invalid response type prefix");
} else {
unreachable!();
}
}
}