libnewbee/
lib.rs

1//! Streamed RDB Rust Parser
2
3extern crate lzf;
4extern crate byteorder;
5
6#[macro_use]
7mod com;
8mod consts;
9mod codec;
10mod types;
11mod fmt;
12
13pub use fmt::{RedisFmt, RedisCmd};
14pub use com::{Result, Error};
15
16use fmt::{RedisFormat, Group};
17use com::*;
18use codec::*;
19use types::*;
20use consts::*;
21
22use std::io::{self, Read};
23use std::mem;
24
25pub struct DefaultRdbParser {
26    local_buf: Vec<u8>,
27    cursor: usize,
28    parsed: Vec<RdbEntry>,
29    state: State,
30    end: Vec<u8>,
31}
32
33impl Default for DefaultRdbParser {
34    fn default() -> Self {
35        DefaultRdbParser {
36            local_buf: Vec::new(),
37            cursor: 0,
38            parsed: Vec::new(),
39            state: State::Header,
40            end: Vec::new(),
41        }
42    }
43}
44
45
46impl DefaultRdbParser {
47    pub fn read_to_cmd<R: Read>(&mut self, read: &mut R) -> Result<Vec<RedisCmd>> {
48        let _readed = self.read_to_local(read)?;
49        loop {
50            match self.state {
51                State::Data => {
52                    let data = match self.data() {
53                        Err(Error::Other) => {
54                            self.state = State::Crc;
55                            continue;
56                        }
57                        other => other?,
58                    };
59                    self.cursor += data.shift();
60                    self.parsed.push(data);
61                }
62                State::Sector => {
63                    let sector = match self.sector() {
64                        Err(Error::Other) => {
65                            self.state = State::Crc;
66                            continue;
67                        }
68                        otherwise => otherwise?,
69                    };
70                    self.cursor += sector.shift();
71                    self.state = State::Data;
72                }
73                State::Header => {
74                    let header = self.header()?;
75                    self.cursor += header.shift();
76                    self.state = State::Sector;
77                }
78                State::Crc => {
79                    self.end = self.crc()?;
80                    self.state = State::End;
81                }
82                State::End => {
83                    break;
84                }
85            };
86        }
87
88        let entries = self.drain_buf();
89        let mut fmts = vec![];
90        for entry in entries {
91            entry.fmt(&mut fmts);
92        }
93        let groups = Group::group(fmts);
94        Ok(groups)
95    }
96
97    fn drain_buf(&mut self) -> Vec<RdbEntry> {
98        let mut entries = vec![];
99        mem::swap(&mut entries, &mut self.parsed);
100        self.cursor = 0;
101        entries
102    }
103}
104
105impl RdbParser for DefaultRdbParser {
106    fn read_to_local<R: Read>(&mut self, read: &mut R) -> Result<usize> {
107        let start_len = self.local_buf.len();
108        let mut len = start_len;
109        let mut new_write_size = 16;
110        let ret;
111        loop {
112            if len == self.local_buf.len() {
113                new_write_size *= 2;
114                self.local_buf.resize(len + new_write_size, 0);
115            }
116
117            match read.read(&mut self.local_buf[len..]) {
118                Ok(0) => {
119                    ret = Ok(len - start_len);
120                    break;
121                }
122                Ok(n) => len += n,
123                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
124                    ret = Ok(len - start_len);
125                    break;
126                }
127                Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
128                Err(e) => {
129                    ret = Err(Error::IoError(e));
130                    break;
131                }
132            }
133        }
134
135        self.local_buf.truncate(len);
136        ret
137    }
138
139    fn local_buf(&self) -> &[u8] {
140        &self.local_buf[min(self.cursor, self.local_buf.len())..]
141    }
142}
143
144
145trait RdbParser {
146    fn read_to_local<R: Read>(&mut self, read: &mut R) -> Result<usize>;
147    fn local_buf(&self) -> &[u8];
148
149    fn crc(&mut self) -> Result<Vec<u8>> {
150        let src = self.local_buf();
151        other!(src[0] != 0xff);
152        Ok(src[1..].to_vec())
153    }
154
155    fn header(&mut self) -> Result<RdbEntry> {
156        let src = self.local_buf();
157        more!(src.len() < REDIS_MAGIC_STRING.len() + 4);
158        let version = &src[REDIS_MAGIC_STRING.len()..REDIS_MAGIC_STRING.len() + 4];
159        let version_str = String::from_utf8_lossy(version);
160        let version_u32 = version_str.parse::<u32>().unwrap();
161        Ok(RdbEntry::Version(version_u32))
162    }
163
164    fn sector(&mut self) -> Result<RdbEntry> {
165        let src = self.local_buf();
166        more!(src.len() < 2);
167        other!(src[0] != REDIS_RDB_OPCODE_SELECTDB);
168        let length = Length::from_buf(&src[1..])?;
169        Ok(RdbEntry::Sector(length))
170    }
171
172    fn data(&mut self) -> Result<RdbEntry> {
173        let src = self.local_buf();
174        // meet EOF
175        if src[0] == 0xff {
176            return Err(Error::Other);
177        }
178        let expire = ExpireTime::from_buf(src)?;
179        let data = RedisData::from_buf(&src[expire.shift()..])?;
180        Ok(RdbEntry::Data {
181            expire: expire,
182            data: data,
183        })
184    }
185}
186
187
188#[derive(Debug)]
189enum State {
190    Header,
191    Sector,
192    Data,
193    Crc,
194    End,
195}
196
197#[derive(Debug)]
198enum RdbEntry {
199    Version(u32),
200    Sector(Length),
201    Data { expire: ExpireTime, data: RedisData },
202}
203
204impl Shift for RdbEntry {
205    #[inline]
206    fn shift(&self) -> usize {
207        match self {
208            // len('REDIS') + version_number
209            &RdbEntry::Version(_) => 5 + 4,
210            // 0xFE + u8
211            &RdbEntry::Sector(_) => 2,
212            &RdbEntry::Data { ref expire, ref data } => expire.shift() + data.shift(),
213        }
214    }
215}
216
217impl RedisFormat for RdbEntry {
218    fn fmt(self, buf: &mut Vec<RedisFmt>) -> usize {
219        match self {
220            RdbEntry::Data { expire, data } => {
221                let key = data.copy_key();
222                let mut count = data.fmt(buf);
223                count += expire.fmt(key, buf);
224                count
225            }
226            _ => 0,
227        }
228    }
229}