redis_client/
reader.rs

1use errors::{ParsingError, RedisError};
2use results::RedisResult;
3use std::io::BufReader;
4use std::io::prelude::*;
5use std::net::TcpStream;
6
7/// Reads the TcpStream buffer and parse the result according to the redis protocol specification 
8/// building either RedisResult or RedisError.
9pub struct Reader;
10
11impl Reader {
12    /// Read the stream expecting one response.
13    /// Determine the type of the response
14    pub fn read(buffer: &mut BufReader<TcpStream>) -> Result<RedisResult, RedisError> {
15        
16        let mut head_line = String::new();
17        try!(buffer.read_line(&mut head_line));
18
19        let identifier = head_line.remove(0);
20
21        match identifier{
22            '$' => Reader::read_bulk_string(&head_line, buffer),
23            '*' => Reader::read_array(&head_line, buffer),
24            '+' => Reader::read_string(&head_line),
25            ':' => Reader::read_integer(&head_line),
26            '-' => Reader::read_error(&head_line),
27            _ => Err(RedisError::Parse(ParsingError::BadIdentifier(identifier.to_string()))),
28        }
29    }
30
31    /// Read the stream and expect several responses
32    pub fn read_pipeline(buffer: &mut BufReader<TcpStream>, cmd_nb: usize) -> Result<Vec<RedisResult>, RedisError> {
33        let mut results: Vec<RedisResult> = Vec::with_capacity(cmd_nb);
34        let mut remaining_cmd = cmd_nb;
35        loop {
36            if remaining_cmd == 0 {
37                break;
38            }
39            remaining_cmd -= 1;
40
41            match Reader::read(buffer) {
42                Ok(value) => results.push(value),
43                Err(RedisError::Response(err)) => results.push(RedisResult::String(err)),
44                Err(err) => return Err(err),
45            };
46        }
47        Ok(results)
48    }
49
50    /// Read a bulk string response
51    fn read_bulk_string(head_line: & String, buffer: &mut BufReader<TcpStream>) -> Result<RedisResult, RedisError> {
52        let read_byte_nb: i64 = try!(head_line.trim().parse());
53
54        if read_byte_nb < 0 {
55            Ok(RedisResult::Nil)
56        } else {
57            let mut result: Vec<u8> = Vec::with_capacity((read_byte_nb + 2) as usize);
58            loop {
59                let length = {
60                    let buf = try!(buffer.fill_buf());
61                    result.extend(buf.iter().cloned());
62
63                    buf.len()
64                };
65                
66
67                if result.len() >= (read_byte_nb + 2) as usize {
68                    buffer.consume(length - (result.len() - (read_byte_nb + 2) as usize));
69                    break;
70                } else {
71                    buffer.consume(length);
72                }
73            }
74            result.truncate(read_byte_nb as usize);
75
76            Ok(RedisResult::Bytes(result))
77        }
78    }
79
80    /// Read a simple string response
81    fn read_string(simple_str: & String) -> Result<RedisResult, RedisError> {
82        Ok(RedisResult::String(simple_str.trim().to_string()))
83    }
84
85    /// Read an integer response
86    fn read_integer(integer_str: & String) -> Result<RedisResult, RedisError> {
87        Ok(RedisResult::Int(try!(integer_str.trim().parse::<i64>())))
88    }
89
90    /// Read an error response
91    fn read_error(error_str: & String) -> Result<RedisResult, RedisError> {
92        Err(RedisError::Response(error_str.to_string()))
93    }
94
95    /// Read an array response
96    fn read_array(array_str: & String, buffer: &mut BufReader<TcpStream>) -> Result<RedisResult, RedisError> {
97        let mut read_elmt_nb: i64 = try!(array_str.trim().parse());
98
99        if read_elmt_nb < 0 {
100            Ok(RedisResult::Nil)
101        } else if read_elmt_nb == 0 {
102            Ok(RedisResult::Array(Vec::new()))
103        }else {
104            let mut result: Vec<RedisResult> = Vec::with_capacity(read_elmt_nb as usize);
105
106            loop {
107                match Reader::read(buffer) {
108                    Ok(value) => result.push(value),
109                    Err(RedisError::Response(err)) => result.push(RedisResult::String(err)),
110                    Err(err) => return Err(err),
111                };
112
113                read_elmt_nb -= 1;
114                if read_elmt_nb == 0 {
115                    break;
116                }
117            }
118            Ok(RedisResult::Array(result))
119        }
120    }
121
122}