1extern crate lzf;
4extern crate byteorder;
5
6#[macro_use]
7mod com;
8mod consts;
9mod codec;
10mod types;
11mod fmt;
12
13pub use fmt::{RedisFmt, RedisCmd};
14pub use com::{Result, Error};
15
16use fmt::{RedisFormat, Group};
17use com::*;
18use codec::*;
19use types::*;
20use consts::*;
21
22use std::io::{self, Read};
23use std::mem;
24
25pub struct DefaultRdbParser {
26 local_buf: Vec<u8>,
27 cursor: usize,
28 parsed: Vec<RdbEntry>,
29 state: State,
30 end: Vec<u8>,
31}
32
33impl Default for DefaultRdbParser {
34 fn default() -> Self {
35 DefaultRdbParser {
36 local_buf: Vec::new(),
37 cursor: 0,
38 parsed: Vec::new(),
39 state: State::Header,
40 end: Vec::new(),
41 }
42 }
43}
44
45
46impl DefaultRdbParser {
47 pub fn read_to_cmd<R: Read>(&mut self, read: &mut R) -> Result<Vec<RedisCmd>> {
48 let _readed = self.read_to_local(read)?;
49 loop {
50 match self.state {
51 State::Data => {
52 let data = match self.data() {
53 Err(Error::Other) => {
54 self.state = State::Crc;
55 continue;
56 }
57 other => other?,
58 };
59 self.cursor += data.shift();
60 self.parsed.push(data);
61 }
62 State::Sector => {
63 let sector = match self.sector() {
64 Err(Error::Other) => {
65 self.state = State::Crc;
66 continue;
67 }
68 otherwise => otherwise?,
69 };
70 self.cursor += sector.shift();
71 self.state = State::Data;
72 }
73 State::Header => {
74 let header = self.header()?;
75 self.cursor += header.shift();
76 self.state = State::Sector;
77 }
78 State::Crc => {
79 self.end = self.crc()?;
80 self.state = State::End;
81 }
82 State::End => {
83 break;
84 }
85 };
86 }
87
88 let entries = self.drain_buf();
89 let mut fmts = vec![];
90 for entry in entries {
91 entry.fmt(&mut fmts);
92 }
93 let groups = Group::group(fmts);
94 Ok(groups)
95 }
96
97 fn drain_buf(&mut self) -> Vec<RdbEntry> {
98 let mut entries = vec![];
99 mem::swap(&mut entries, &mut self.parsed);
100 self.cursor = 0;
101 entries
102 }
103}
104
105impl RdbParser for DefaultRdbParser {
106 fn read_to_local<R: Read>(&mut self, read: &mut R) -> Result<usize> {
107 let start_len = self.local_buf.len();
108 let mut len = start_len;
109 let mut new_write_size = 16;
110 let ret;
111 loop {
112 if len == self.local_buf.len() {
113 new_write_size *= 2;
114 self.local_buf.resize(len + new_write_size, 0);
115 }
116
117 match read.read(&mut self.local_buf[len..]) {
118 Ok(0) => {
119 ret = Ok(len - start_len);
120 break;
121 }
122 Ok(n) => len += n,
123 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
124 ret = Ok(len - start_len);
125 break;
126 }
127 Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
128 Err(e) => {
129 ret = Err(Error::IoError(e));
130 break;
131 }
132 }
133 }
134
135 self.local_buf.truncate(len);
136 ret
137 }
138
139 fn local_buf(&self) -> &[u8] {
140 &self.local_buf[min(self.cursor, self.local_buf.len())..]
141 }
142}
143
144
145trait RdbParser {
146 fn read_to_local<R: Read>(&mut self, read: &mut R) -> Result<usize>;
147 fn local_buf(&self) -> &[u8];
148
149 fn crc(&mut self) -> Result<Vec<u8>> {
150 let src = self.local_buf();
151 other!(src[0] != 0xff);
152 Ok(src[1..].to_vec())
153 }
154
155 fn header(&mut self) -> Result<RdbEntry> {
156 let src = self.local_buf();
157 more!(src.len() < REDIS_MAGIC_STRING.len() + 4);
158 let version = &src[REDIS_MAGIC_STRING.len()..REDIS_MAGIC_STRING.len() + 4];
159 let version_str = String::from_utf8_lossy(version);
160 let version_u32 = version_str.parse::<u32>().unwrap();
161 Ok(RdbEntry::Version(version_u32))
162 }
163
164 fn sector(&mut self) -> Result<RdbEntry> {
165 let src = self.local_buf();
166 more!(src.len() < 2);
167 other!(src[0] != REDIS_RDB_OPCODE_SELECTDB);
168 let length = Length::from_buf(&src[1..])?;
169 Ok(RdbEntry::Sector(length))
170 }
171
172 fn data(&mut self) -> Result<RdbEntry> {
173 let src = self.local_buf();
174 if src[0] == 0xff {
176 return Err(Error::Other);
177 }
178 let expire = ExpireTime::from_buf(src)?;
179 let data = RedisData::from_buf(&src[expire.shift()..])?;
180 Ok(RdbEntry::Data {
181 expire: expire,
182 data: data,
183 })
184 }
185}
186
187
/// The part of the input the parser expects next.
#[derive(Debug)]
enum State {
    /// Waiting for the file header; this is where a fresh parser starts.
    Header,
    /// Waiting for a database selector.
    Sector,
    /// Reading data entries.
    Data,
    /// Waiting for the trailer that starts with `0xff`.
    Crc,
    /// The trailer has been read; nothing more is parsed.
    End,
}
196
197#[derive(Debug)]
198enum RdbEntry {
199 Version(u32),
200 Sector(Length),
201 Data { expire: ExpireTime, data: RedisData },
202}
203
204impl Shift for RdbEntry {
205 #[inline]
206 fn shift(&self) -> usize {
207 match self {
208 &RdbEntry::Version(_) => 5 + 4,
210 &RdbEntry::Sector(_) => 2,
212 &RdbEntry::Data { ref expire, ref data } => expire.shift() + data.shift(),
213 }
214 }
215}
216
217impl RedisFormat for RdbEntry {
218 fn fmt(self, buf: &mut Vec<RedisFmt>) -> usize {
219 match self {
220 RdbEntry::Data { expire, data } => {
221 let key = data.copy_key();
222 let mut count = data.fmt(buf);
223 count += expire.fmt(key, buf);
224 count
225 }
226 _ => 0,
227 }
228 }
229}