1use crate::{RedisValue, RedisResult, RedisError, RedisErrorKind, FromRedisValue, from_redis_value};
2use std::num::ParseIntError;
3use std::error::Error;
4use std::fmt;
5use std::collections::HashMap;
6
7#[derive(Clone, PartialEq, PartialOrd)]
8pub struct EntryId((u64, u64));
9
10#[derive(Clone, PartialEq)]
12pub struct StreamEntry {
13 pub stream: String,
15 pub id: EntryId,
17 pub values: HashMap<String, RedisValue>,
21}
22
23#[derive(Clone, PartialEq)]
25pub struct RangeEntry {
26 pub id: EntryId,
28 pub values: HashMap<String, RedisValue>,
32}
33
34impl StreamEntry {
35 pub(crate) fn new(stream: String, id: EntryId, values: HashMap<String, RedisValue>) -> Self {
36 StreamEntry {
37 stream,
38 id,
39 values,
40 }
41 }
42}
43
44impl RangeEntry {
45 pub(crate) fn new(id: EntryId, values: HashMap<String, RedisValue>) -> Self {
46 RangeEntry {
47 id,
48 values,
49 }
50 }
51}
52
53pub(crate) fn parse_stream_entries(value: RedisValue) -> RedisResult<Vec<StreamEntry>> {
55 const LEN_FACTOR: usize = 1;
58
59 let streams: Vec<StreamInfo> = from_redis_value(&value)?;
60
61 let capacity = streams.len() * LEN_FACTOR;
62 let mut stream_entries: Vec<StreamEntry> = Vec::with_capacity(capacity);
63
64 for StreamInfo { id, entries } in streams.into_iter() {
66 for entry in entries.into_iter() {
67 let stream_entry =
68 StreamEntry::new(id.clone(), EntryId::from_string(entry.id)?, entry.key_values);
69
70 stream_entries.push(stream_entry);
71 }
72 }
73
74 Ok(stream_entries)
75}
76
77pub fn parse_range_entries(value: RedisValue) -> RedisResult<Vec<RangeEntry>> {
79 let entries: Vec<EntryInfo> = from_redis_value(&value)?;
80
81 let mut result_entries: Vec<RangeEntry> = Vec::with_capacity(entries.len());
82
83 for entry in entries.into_iter() {
85 let entry =
86 RangeEntry::new(EntryId::from_string(entry.id)?, entry.key_values);
87
88 result_entries.push(entry);
89 }
90
91 Ok(result_entries)
92}
93
94struct StreamInfo {
96 id: String,
97 entries: Vec<EntryInfo>,
98}
99
100#[derive(Debug)]
102struct EntryInfo {
103 id: String,
104 key_values: HashMap<String, RedisValue>,
105}
106
107#[derive(Clone)]
108pub enum RangeType {
109 Any,
110 GreaterThan(EntryId),
111 LessThan(EntryId),
112 GreaterLessThan(EntryId, EntryId),
113}
114
115impl RangeType {
116 pub fn is_valid(&self) -> bool {
118 match self {
119 RangeType::GreaterLessThan(left, right) => left < right,
120 _ => true
121 }
122 }
123
124 pub(crate) fn to_left_right(&self) -> (String, String) {
125 match self {
126 RangeType::Any => ("-".to_string(), "+".to_string()),
127 RangeType::GreaterThan(left) => (left.to_string(), "+".to_string()),
128 RangeType::LessThan(right) => ("-".to_string(), right.to_string()),
129 RangeType::GreaterLessThan(left, right) => (left.to_string(), right.to_string()),
130 }
131 }
132}
133
134impl fmt::Debug for StreamEntry {
135 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
136 write!(f, "(stream={}, id=\"{:?}\", {:?})", self.stream, self.id, self.values)?;
137 Ok(())
138 }
139}
140
141impl fmt::Debug for RangeEntry {
142 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
143 write!(f, "(id=\"{:?}\", {:?})", self.id, self.values)?;
144 Ok(())
145 }
146}
147
148impl fmt::Debug for EntryId {
149 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
150 write!(f, "{}", self.to_string())?;
151 Ok(())
152 }
153}
154
155impl EntryId {
156 pub fn new(ms: u64, id: u64) -> EntryId {
157 EntryId((ms, id))
158 }
159
160 pub(crate) fn from_string(id: String) -> RedisResult<EntryId> {
162 const ENTRY_ID_CHUNK_LEN: usize = 2;
163 const ENTRY_ID_MS_POS: usize = 0;
164 const ENTRY_ID_ID_POS: usize = 1;
165
166 let tokens: Vec<&str>
167 = id.split('-').filter(|token| !token.is_empty()).collect();
168 if tokens.len() != ENTRY_ID_CHUNK_LEN {
169 return Err(
170 RedisError::new(
171 RedisErrorKind::ParseError,
172 format!("Couldn't parse a Redis entry id: {:?}", &id))
173 );
174 }
175
176 let ms = tokens[ENTRY_ID_MS_POS].parse::<u64>().map_err(&to_redis_error)?;
177 let id = tokens[ENTRY_ID_ID_POS].parse::<u64>().map_err(&to_redis_error)?;
178 Ok(Self((ms, id)))
179 }
180
181 pub fn to_string(&self) -> String {
182 format!("{}-{}", (self.0).0, (self.0).1)
183 }
184}
185
186fn to_redis_error(err: ParseIntError) -> RedisError {
187 RedisError::new(RedisErrorKind::ParseError, err.description().to_string())
188}
189
190impl FromRedisValue for EntryInfo {
191 fn from_redis_value(value: &RedisValue) -> RedisResult<Self> {
192 let (id, key_values): (String, HashMap<String, RedisValue>) = from_redis_value(value)?;
193
194 Ok(EntryInfo { id, key_values })
195 }
196}
197
198impl FromRedisValue for StreamInfo {
199 fn from_redis_value(value: &RedisValue) -> RedisResult<Self> {
200 let (id, entries): (String, Vec<EntryInfo>) = from_redis_value(value)?;
201 Ok(Self { id, entries })
202 }
203}
204
205#[cfg(test)]
206mod tests {
207 use super::*;
208
209 #[test]
210 fn common_test_parse_stream_entry() {
211 let entry1 = RedisValue::Array(vec![
212 RedisValue::BulkString(b"1581870410019-0".to_vec()),
213 RedisValue::Array(vec![
214 RedisValue::BulkString(b"1key1".to_vec()),
215 RedisValue::BulkString(b"1value1".to_vec()),
216 RedisValue::BulkString(b"1key2".to_vec()),
217 RedisValue::Int(2)
218 ])
219 ]);
220
221 let entry2 = RedisValue::Array(vec![
222 RedisValue::BulkString(b"1581870414714-0".to_vec()),
223 RedisValue::Array(vec![
224 RedisValue::BulkString(b"2key1".to_vec()),
225 RedisValue::BulkString(b"2value1".to_vec()),
226 RedisValue::BulkString(b"2key2".to_vec()),
227 RedisValue::BulkString(b"2value2".to_vec()),
228 RedisValue::BulkString(b"2key3".to_vec()),
229 RedisValue::BulkString(b"2value3".to_vec())
230 ])
231 ]);
232
233 let entry3 = RedisValue::Array(vec![
234 RedisValue::BulkString(b"1581855076637-0".to_vec()),
235 RedisValue::Array(vec![
236 RedisValue::BulkString(b"3key1".to_vec()),
237 RedisValue::BulkString(b"3value1".to_vec())
238 ])
239 ]);
240
241 let stream1 = RedisValue::Array(vec![
242 RedisValue::BulkString(b"stream1".to_vec()),
243 RedisValue::Array(vec![
244 entry1,
245 entry2
246 ])
247 ]);
248
249 let stream2 = RedisValue::Array(vec![
250 RedisValue::BulkString(b"stream2".to_vec()),
251 RedisValue::Array(vec![entry3])
252 ]);
253
254 let value = RedisValue::Array(vec![stream1, stream2]);
255
256 let result = parse_stream_entries(value).unwrap();
257
258 let mut entry1: HashMap<String, RedisValue> = HashMap::new();
259 entry1.insert("1key1".to_string(), RedisValue::BulkString(b"1value1".to_vec()));
260 entry1.insert("1key2".to_string(), RedisValue::Int(2));
261
262 let mut entry2: HashMap<String, RedisValue> = HashMap::new();
263 entry2.insert("2key1".to_string(), RedisValue::BulkString(b"2value1".to_vec()));
264 entry2.insert("2key2".to_string(), RedisValue::BulkString(b"2value2".to_vec()));
265 entry2.insert("2key3".to_string(), RedisValue::BulkString(b"2value3".to_vec()));
266
267 let mut entry3: HashMap<String, RedisValue> = HashMap::new();
268 entry3.insert("3key1".to_string(), RedisValue::BulkString(b"3value1".to_vec()));
269
270 let origin = vec![
271 StreamEntry::new("stream1".to_string(), EntryId((1581870410019, 0)), entry1),
272 StreamEntry::new("stream1".to_string(), EntryId((1581870414714, 0)), entry2),
273 StreamEntry::new("stream2".to_string(), EntryId((1581855076637, 0)), entry3)
274 ];
275
276 assert_eq!(origin, result);
277 }
278
279 #[test]
280 fn test_invalid_entry_id() {
281 let entry = RedisValue::Array(vec![
282 RedisValue::BulkString(b"1581855076637x0".to_vec()),
284 RedisValue::Array(vec![
285 RedisValue::BulkString(b"key".to_vec()),
286 RedisValue::Int(2)
287 ])
288 ]);
289
290 let stream = RedisValue::Array(vec![
291 RedisValue::BulkString(b"stream".to_vec()),
292 RedisValue::Array(vec![entry])
293 ]);
294
295 let value = RedisValue::Array(vec![stream]);
296
297 assert!(parse_stream_entries(value).is_err(), "Expect an parse error");
298 }
299
300 #[test]
301 fn test_invalid_key_value() {
302 let entry = RedisValue::Array(vec![
303 RedisValue::BulkString(b"1581855076637-0".to_vec()),
304 RedisValue::Array(vec![
305 RedisValue::BulkString(b"key".to_vec())
307 ])
308 ]);
309
310 let stream = RedisValue::Array(vec![
311 RedisValue::BulkString(b"stream".to_vec()),
312 RedisValue::Array(vec![entry])
313 ]);
314
315 let value = RedisValue::Array(vec![stream]);
316
317 assert!(parse_stream_entries(value).is_err(), "Expect an parse error");
318 }
319
320 #[test]
321 fn test_invalid_entry_structure() {
322 let entry = RedisValue::Array(vec![
323 RedisValue::BulkString(b"1581855076637-0".to_vec())
325 ]);
326
327 let stream = RedisValue::Array(vec![
328 RedisValue::BulkString(b"stream".to_vec()),
329 RedisValue::Array(vec![entry])
330 ]);
331
332 let value = RedisValue::Array(vec![stream]);
333
334 assert!(parse_stream_entries(value).is_err(), "Expect an parse error");
335 }
336}