redis_event/
rdb.rs

1/*!
2RDB中各项Redis数据相关的结构体定义,以及RDB解析相关的代码在此模块下
3*/
4use core::result;
5use std::any::Any;
6use std::cmp;
7use std::collections::BTreeMap;
8use std::fmt::{Debug, Error, Formatter};
9use std::io::{Cursor, Read, Result};
10use std::sync::atomic::{AtomicBool, Ordering};
11
12use byteorder::{BigEndian, LittleEndian, ReadBytesExt};
13use log::info;
14
15use crate::cmd::connection::SELECT;
16use crate::cmd::Command;
17use crate::iter::{IntSetIter, Iter, QuickListIter, SortedSetIter, StrValIter, ZipListIter, ZipMapIter};
18use crate::{lzf, to_string, Event, EventHandler, ModuleParser, RDBParser};
19use std::cell::RefCell;
20use std::f64::{INFINITY, NAN, NEG_INFINITY};
21use std::iter::FromIterator;
22use std::rc::Rc;
23use std::str::FromStr;
24use std::sync::Arc;
25
26/// 一些解析RDB数据的方法
27pub trait RDBDecode: Read {
28    /// 读取redis响应中下一条数据的长度
29    fn read_length(&mut self) -> Result<(isize, bool)> {
30        let byte = self.read_u8()?;
31        let _type = (byte & 0xC0) >> 6;
32
33        let mut result = -1;
34        let mut is_encoded = false;
35
36        if _type == RDB_ENCVAL {
37            result = (byte & 0x3F) as isize;
38            is_encoded = true;
39        } else if _type == RDB_6BITLEN {
40            result = (byte & 0x3F) as isize;
41        } else if _type == RDB_14BITLEN {
42            let next_byte = self.read_u8()?;
43            result = (((byte as u16 & 0x3F) << 8) | next_byte as u16) as isize;
44        } else if byte == RDB_32BITLEN {
45            result = self.read_integer(4, true)?;
46        } else if byte == RDB_64BITLEN {
47            result = self.read_integer(8, true)?;
48        };
49        Ok((result, is_encoded))
50    }
51
52    /// 从流中读取一个Integer
53    fn read_integer(&mut self, size: isize, is_big_endian: bool) -> Result<isize> {
54        let mut buff = vec![0; size as usize];
55        self.read_exact(&mut buff)?;
56        let mut cursor = Cursor::new(&buff);
57
58        if is_big_endian {
59            if size == 2 {
60                return Ok(cursor.read_i16::<BigEndian>()? as isize);
61            } else if size == 4 {
62                return Ok(cursor.read_i32::<BigEndian>()? as isize);
63            } else if size == 8 {
64                return Ok(cursor.read_i64::<BigEndian>()? as isize);
65            };
66        } else {
67            if size == 2 {
68                return Ok(cursor.read_i16::<LittleEndian>()? as isize);
69            } else if size == 4 {
70                return Ok(cursor.read_i32::<LittleEndian>()? as isize);
71            } else if size == 8 {
72                return Ok(cursor.read_i64::<LittleEndian>()? as isize);
73            };
74        }
75        panic!("Invalid integer size: {}", size)
76    }
77
78    /// 从流中读取一个string
79    fn read_string(&mut self) -> Result<Vec<u8>> {
80        let (length, is_encoded) = self.read_length()?;
81        if is_encoded {
82            match length {
83                RDB_ENC_INT8 => {
84                    let int = self.read_i8()?;
85                    return Ok(int.to_string().into_bytes());
86                }
87                RDB_ENC_INT16 => {
88                    let int = self.read_integer(2, false)?;
89                    return Ok(int.to_string().into_bytes());
90                }
91                RDB_ENC_INT32 => {
92                    let int = self.read_integer(4, false)?;
93                    return Ok(int.to_string().into_bytes());
94                }
95                RDB_ENC_LZF => {
96                    let (compressed_len, _) = self.read_length()?;
97                    let (origin_len, _) = self.read_length()?;
98                    let mut compressed = vec![0; compressed_len as usize];
99                    self.read_exact(&mut compressed)?;
100                    let mut origin = vec![0; origin_len as usize];
101                    lzf::decompress(&mut compressed, compressed_len, &mut origin, origin_len);
102                    return Ok(origin);
103                }
104                _ => panic!("Invalid string length: {}", length),
105            };
106        };
107        let mut buff = vec![0; length as usize];
108        self.read_exact(&mut buff)?;
109        Ok(buff)
110    }
111
112    /// 从流中读取一个double
113    fn read_double(&mut self) -> Result<f64> {
114        let len = self.read_u8()?;
115        return match len {
116            255 => Ok(NEG_INFINITY),
117            254 => Ok(INFINITY),
118            253 => Ok(NAN),
119            _ => {
120                let mut buff = vec![0; len as usize];
121                self.read_exact(&mut buff)?;
122                let score_str = to_string(buff);
123                let score = score_str.parse::<f64>().unwrap();
124                Ok(score)
125            }
126        };
127    }
128}
129
130impl<R: Read + ?Sized> RDBDecode for R {}
131
132pub(crate) struct DefaultRDBParser {
133    pub(crate) running: Arc<AtomicBool>,
134    pub(crate) module_parser: Option<Rc<RefCell<dyn ModuleParser>>>,
135}
136
137impl RDBParser for DefaultRDBParser {
138    fn parse(&mut self, input: &mut dyn Read, _: i64, event_handler: &mut dyn EventHandler) -> Result<()> {
139        event_handler.handle(Event::RDB(Object::BOR));
140        let mut bytes = vec![0; 5];
141        // 开头5个字节: REDIS
142        input.read_exact(&mut bytes)?;
143        // 4个字节: rdb版本
144        input.read_exact(&mut bytes[..=3])?;
145        let rdb_version = String::from_utf8_lossy(&bytes[..=3]);
146        let rdb_version = rdb_version.parse::<isize>().unwrap();
147        let mut db = 0;
148
149        while self.running.load(Ordering::Relaxed) {
150            let mut meta = Meta {
151                db,
152                expire: None,
153                evict: None,
154            };
155
156            let data_type = input.read_u8()?;
157            match data_type {
158                RDB_OPCODE_AUX => {
159                    let field_name = input.read_string()?;
160                    let field_val = input.read_string()?;
161                    let field_name = to_string(field_name);
162                    let field_val = to_string(field_val);
163                    info!("{}:{}", field_name, field_val);
164                }
165                RDB_OPCODE_SELECTDB => {
166                    let (_db, _) = input.read_length()?;
167                    meta.db = _db;
168                    db = _db;
169                    let cmd = SELECT { db: _db as i32 };
170                    event_handler.handle(Event::AOF(Command::SELECT(&cmd)));
171                }
172                RDB_OPCODE_RESIZEDB => {
173                    let (total, _) = input.read_length()?;
174                    info!("db[{}] total keys: {}", db, total);
175                    let (expired, _) = input.read_length()?;
176                    info!("db[{}] expired keys: {}", db, expired);
177                }
178                RDB_OPCODE_EXPIRETIME | RDB_OPCODE_EXPIRETIME_MS => {
179                    if data_type == RDB_OPCODE_EXPIRETIME_MS {
180                        let expired_time = input.read_integer(8, false)?;
181                        meta.expire = Option::Some((ExpireType::Millisecond, expired_time as i64));
182                    } else {
183                        let expired_time = input.read_integer(4, false)?;
184                        meta.expire = Option::Some((ExpireType::Second, expired_time as i64));
185                    }
186                    let value_type = input.read_u8()?;
187                    match value_type {
188                        RDB_OPCODE_FREQ => {
189                            let val = input.read_u8()?;
190                            let value_type = input.read_u8()?;
191                            meta.evict = Option::Some((EvictType::LFU, val as i64));
192                            self.read_object(input, value_type, event_handler, &meta)?;
193                        }
194                        RDB_OPCODE_IDLE => {
195                            let (val, _) = input.read_length()?;
196                            let value_type = input.read_u8()?;
197                            meta.evict = Option::Some((EvictType::LRU, val as i64));
198                            self.read_object(input, value_type, event_handler, &meta)?;
199                        }
200                        _ => {
201                            self.read_object(input, value_type, event_handler, &meta)?;
202                        }
203                    }
204                }
205                RDB_OPCODE_FREQ => {
206                    let val = input.read_u8()?;
207                    let value_type = input.read_u8()?;
208                    meta.evict = Option::Some((EvictType::LFU, val as i64));
209                    self.read_object(input, value_type, event_handler, &meta)?;
210                }
211                RDB_OPCODE_IDLE => {
212                    let (val, _) = input.read_length()?;
213                    meta.evict = Option::Some((EvictType::LRU, val as i64));
214                    let value_type = input.read_u8()?;
215                    self.read_object(input, value_type, event_handler, &meta)?;
216                }
217                RDB_OPCODE_MODULE_AUX => {
218                    input.read_length()?;
219                    self.rdb_load_check_module_value(input)?;
220                }
221                RDB_OPCODE_EOF => {
222                    if rdb_version >= 5 {
223                        input.read_integer(8, true)?;
224                    }
225                    break;
226                }
227                _ => {
228                    self.read_object(input, data_type, event_handler, &meta)?;
229                }
230            };
231        }
232        event_handler.handle(Event::RDB(Object::EOR));
233        Ok(())
234    }
235}
236
237impl DefaultRDBParser {
238    // 根据传入的数据类型,从流中读取对应类型的数据
239    fn read_object(
240        &mut self, input: &mut dyn Read, value_type: u8, event_handler: &mut dyn EventHandler, meta: &Meta,
241    ) -> Result<()> {
242        match value_type {
243            RDB_TYPE_STRING => {
244                let key = input.read_string()?;
245                let value = input.read_string()?;
246                event_handler.handle(Event::RDB(Object::String(KeyValue {
247                    key: &key,
248                    value: &value,
249                    meta,
250                })));
251            }
252            RDB_TYPE_LIST | RDB_TYPE_SET => {
253                let key = input.read_string()?;
254                let (count, _) = input.read_length()?;
255                let mut iter = StrValIter { count, input };
256
257                let mut has_more = true;
258                while has_more {
259                    let mut val = Vec::new();
260                    for _ in 0..BATCH_SIZE {
261                        if let Ok(next_val) = iter.next() {
262                            val.push(next_val);
263                        } else {
264                            has_more = false;
265                            break;
266                        }
267                    }
268                    if !val.is_empty() {
269                        if value_type == RDB_TYPE_LIST {
270                            event_handler.handle(Event::RDB(Object::List(List {
271                                key: &key,
272                                values: &val,
273                                meta,
274                            })));
275                        } else {
276                            event_handler.handle(Event::RDB(Object::Set(Set {
277                                key: &key,
278                                members: &val,
279                                meta,
280                            })));
281                        }
282                    }
283                }
284            }
285            RDB_TYPE_ZSET => {
286                let key = input.read_string()?;
287                let (count, _) = input.read_length()?;
288                let mut iter = SortedSetIter { count, v: 1, input };
289
290                let mut has_more = true;
291                while has_more {
292                    let mut val = Vec::new();
293                    for _ in 0..BATCH_SIZE {
294                        if let Ok(next_val) = iter.next() {
295                            val.push(next_val);
296                        } else {
297                            has_more = false;
298                            break;
299                        }
300                    }
301                    if !val.is_empty() {
302                        event_handler.handle(Event::RDB(Object::SortedSet(SortedSet {
303                            key: &key,
304                            items: &val,
305                            meta,
306                        })));
307                    }
308                }
309            }
310            RDB_TYPE_ZSET_2 => {
311                let key = input.read_string()?;
312                let (count, _) = input.read_length()?;
313                let mut iter = SortedSetIter { count, v: 2, input };
314
315                let mut has_more = true;
316                while has_more {
317                    let mut val = Vec::new();
318                    for _ in 0..BATCH_SIZE {
319                        if let Ok(next_val) = iter.next() {
320                            val.push(next_val);
321                        } else {
322                            has_more = false;
323                            break;
324                        }
325                    }
326                    if !val.is_empty() {
327                        event_handler.handle(Event::RDB(Object::SortedSet(SortedSet {
328                            key: &key,
329                            items: &val,
330                            meta,
331                        })));
332                    }
333                }
334            }
335            RDB_TYPE_HASH => {
336                let key = input.read_string()?;
337                let (count, _) = input.read_length()?;
338                let mut iter = StrValIter {
339                    count: count * 2,
340                    input,
341                };
342
343                let mut has_more = true;
344                while has_more {
345                    let mut val = Vec::new();
346                    for _ in 0..BATCH_SIZE {
347                        let name;
348                        let value;
349                        if let Ok(next_val) = iter.next() {
350                            name = next_val;
351                            value = iter.next().expect("missing hash field value");
352                            val.push(Field { name, value });
353                        } else {
354                            has_more = false;
355                            break;
356                        }
357                    }
358                    if !val.is_empty() {
359                        event_handler.handle(Event::RDB(Object::Hash(Hash {
360                            key: &key,
361                            fields: &val,
362                            meta,
363                        })));
364                    }
365                }
366            }
367            RDB_TYPE_HASH_ZIPMAP => {
368                let key = input.read_string()?;
369                let bytes = input.read_string()?;
370                let cursor = &mut Cursor::new(&bytes);
371                cursor.set_position(1);
372                let mut iter = ZipMapIter { has_more: true, cursor };
373
374                let mut has_more = true;
375                while has_more {
376                    let mut fields = Vec::new();
377                    for _ in 0..BATCH_SIZE {
378                        if let Ok(field) = iter.next() {
379                            fields.push(field);
380                        } else {
381                            has_more = false;
382                            break;
383                        }
384                    }
385                    if !fields.is_empty() {
386                        event_handler.handle(Event::RDB(Object::Hash(Hash {
387                            key: &key,
388                            fields: &fields,
389                            meta,
390                        })));
391                    }
392                }
393            }
394            RDB_TYPE_LIST_ZIPLIST => {
395                let key = input.read_string()?;
396                let bytes = input.read_string()?;
397                let cursor = &mut Cursor::new(bytes);
398                // 跳过ZL_BYTES和ZL_TAIL
399                cursor.set_position(8);
400                let count = cursor.read_u16::<LittleEndian>()? as isize;
401                let mut iter = ZipListIter { count, cursor };
402
403                let mut has_more = true;
404                while has_more {
405                    let mut val = Vec::new();
406                    for _ in 0..BATCH_SIZE {
407                        if let Ok(next_val) = iter.next() {
408                            val.push(next_val);
409                        } else {
410                            has_more = false;
411                            break;
412                        }
413                    }
414                    if !val.is_empty() {
415                        event_handler.handle(Event::RDB(Object::List(List {
416                            key: &key,
417                            values: &val,
418                            meta,
419                        })));
420                    }
421                }
422            }
423            RDB_TYPE_HASH_ZIPLIST => {
424                let key = input.read_string()?;
425                let bytes = input.read_string()?;
426                let cursor = &mut Cursor::new(bytes);
427                // 跳过ZL_BYTES和ZL_TAIL
428                cursor.set_position(8);
429                let count = cursor.read_u16::<LittleEndian>()? as isize;
430                let mut iter = ZipListIter { count, cursor };
431
432                let mut has_more = true;
433                while has_more {
434                    let mut val = Vec::new();
435                    for _ in 0..BATCH_SIZE {
436                        let name;
437                        let value;
438                        if let Ok(next_val) = iter.next() {
439                            name = next_val;
440                            value = iter.next().expect("missing hash field value");
441                            val.push(Field { name, value });
442                        } else {
443                            has_more = false;
444                            break;
445                        }
446                    }
447                    if !val.is_empty() {
448                        event_handler.handle(Event::RDB(Object::Hash(Hash {
449                            key: &key,
450                            fields: &val,
451                            meta,
452                        })));
453                    }
454                }
455            }
456            RDB_TYPE_ZSET_ZIPLIST => {
457                let key = input.read_string()?;
458                let bytes = input.read_string()?;
459                let cursor = &mut Cursor::new(bytes);
460                // 跳过ZL_BYTES和ZL_TAIL
461                cursor.set_position(8);
462                let count = cursor.read_u16::<LittleEndian>()? as isize;
463                let mut iter = ZipListIter { count, cursor };
464
465                let mut has_more = true;
466                while has_more {
467                    let mut val = Vec::new();
468                    for _ in 0..BATCH_SIZE {
469                        let member;
470                        let score: f64;
471                        if let Ok(next_val) = iter.next() {
472                            member = next_val;
473                            let score_str = to_string(iter.next().expect("missing sorted set element's score"));
474                            score = score_str.parse::<f64>().unwrap();
475                            val.push(Item { member, score });
476                        } else {
477                            has_more = false;
478                            break;
479                        }
480                    }
481                    if !val.is_empty() {
482                        event_handler.handle(Event::RDB(Object::SortedSet(SortedSet {
483                            key: &key,
484                            items: &val,
485                            meta,
486                        })));
487                    }
488                }
489            }
490            RDB_TYPE_SET_INTSET => {
491                let key = input.read_string()?;
492                let bytes = input.read_string()?;
493                let mut cursor = Cursor::new(&bytes);
494                let encoding = cursor.read_i32::<LittleEndian>()?;
495                let length = cursor.read_u32::<LittleEndian>()?;
496                let mut iter = IntSetIter {
497                    encoding,
498                    count: length as isize,
499                    cursor: &mut cursor,
500                };
501
502                let mut has_more = true;
503                while has_more {
504                    let mut val = Vec::new();
505                    for _ in 0..BATCH_SIZE {
506                        if let Ok(next_val) = iter.next() {
507                            val.push(next_val);
508                        } else {
509                            has_more = false;
510                            break;
511                        }
512                    }
513                    if !val.is_empty() {
514                        event_handler.handle(Event::RDB(Object::Set(Set {
515                            key: &key,
516                            members: &val,
517                            meta,
518                        })));
519                    }
520                }
521            }
522            RDB_TYPE_LIST_QUICKLIST => {
523                let key = input.read_string()?;
524                let (count, _) = input.read_length()?;
525                let mut iter = QuickListIter {
526                    len: -1,
527                    count,
528                    input,
529                    cursor: Option::None,
530                };
531
532                let mut has_more = true;
533                while has_more {
534                    let mut val = Vec::new();
535                    for _ in 0..BATCH_SIZE {
536                        if let Ok(next_val) = iter.next() {
537                            val.push(next_val);
538                        } else {
539                            has_more = false;
540                            break;
541                        }
542                    }
543                    if !val.is_empty() {
544                        event_handler.handle(Event::RDB(Object::List(List {
545                            key: &key,
546                            values: &val,
547                            meta,
548                        })));
549                    }
550                }
551            }
552            RDB_TYPE_MODULE | RDB_TYPE_MODULE_2 => {
553                let key = input.read_string()?;
554                let (module_id, _) = input.read_length()?;
555                let module_id = module_id as usize;
556                let mut array: [char; 9] = [' '; 9];
557                for i in 0..array.len() {
558                    let i1 = 10 + (array.len() - 1 - i) * 6;
559                    let i2 = (module_id >> i1 as usize) as usize;
560                    let i3 = i2 & 63;
561                    let chr = MODULE_SET.get(i3).unwrap();
562                    array[i] = *chr;
563                }
564                let module_name: String = String::from_iter(array.iter());
565                let module_version: usize = module_id & 1023;
566                if self.module_parser.is_none() && value_type == RDB_TYPE_MODULE {
567                    panic!("MODULE {}, version {} 无法解析", module_name, module_version);
568                }
569                if let Some(parser) = &mut self.module_parser {
570                    let module: Box<dyn Module>;
571                    if value_type == RDB_TYPE_MODULE_2 {
572                        module = parser.borrow_mut().parse(input, &module_name, 2);
573                        let (len, _) = input.read_length()?;
574                        if len != 0 {
575                            panic!(
576                                "module '{}' that is not terminated by EOF marker, but {}",
577                                &module_name, len
578                            );
579                        }
580                    } else {
581                        module = parser.borrow_mut().parse(input, &module_name, module_version);
582                    }
583                    event_handler.handle(Event::RDB(Object::Module(key, module, meta)));
584                } else {
585                    // 没有parser,并且是Module 2类型的值,那就可以直接跳过了
586                    self.rdb_load_check_module_value(input)?;
587                }
588            }
589            RDB_TYPE_STREAM_LISTPACKS => {
590                let key = input.read_string()?;
591                let stream = self.read_stream_list_packs(meta, input)?;
592                event_handler.handle(Event::RDB(Object::Stream(key, stream)));
593            }
594            _ => panic!("unknown data type: {}", value_type),
595        }
596        Ok(())
597    }
598
599    fn rdb_load_check_module_value(&mut self, input: &mut dyn Read) -> Result<()> {
600        loop {
601            let (op_code, _) = input.read_length()?;
602            if op_code == RDB_MODULE_OPCODE_EOF {
603                break;
604            }
605            if op_code == RDB_MODULE_OPCODE_SINT || op_code == RDB_MODULE_OPCODE_UINT {
606                input.read_length()?;
607            } else if op_code == RDB_MODULE_OPCODE_STRING {
608                input.read_string()?;
609            } else if op_code == RDB_MODULE_OPCODE_FLOAT {
610                input.read_exact(&mut [0; 4])?;
611            } else if op_code == RDB_MODULE_OPCODE_DOUBLE {
612                input.read_exact(&mut [0; 8])?;
613            }
614        }
615        Ok(())
616    }
617
618    fn read_stream_list_packs<'a>(&mut self, meta: &'a Meta, input: &mut dyn Read) -> Result<Stream<'a>> {
619        let mut entries: BTreeMap<ID, Entry> = BTreeMap::new();
620        let (length, _) = input.read_length()?;
621        for _ in 0..length {
622            let raw_id = input.read_string()?;
623            let mut cursor = Cursor::new(&raw_id);
624            let ms = read_long(&mut cursor, 8, false)?;
625            let seq = read_long(&mut cursor, 8, false)?;
626            let base_id = ID { ms, seq };
627            let raw_list_packs = input.read_string()?;
628            let mut list_pack = Cursor::new(&raw_list_packs);
629            list_pack.set_position(6);
630            let count = i64::from_str(&to_string(read_list_pack_entry(&mut list_pack)?)).unwrap();
631            let deleted = i64::from_str(&to_string(read_list_pack_entry(&mut list_pack)?)).unwrap();
632            let num_fields = i32::from_str(&to_string(read_list_pack_entry(&mut list_pack)?)).unwrap();
633            let mut tmp_fields = Vec::with_capacity(num_fields as usize);
634            for _ in 0..num_fields {
635                tmp_fields.push(read_list_pack_entry(&mut list_pack)?);
636            }
637            read_list_pack_entry(&mut list_pack)?;
638
639            let total = count + deleted;
640            for _ in 0..total {
641                let mut fields = BTreeMap::new();
642                let flag = i32::from_str(&to_string(read_list_pack_entry(&mut list_pack)?)).unwrap();
643                let ms = i64::from_str(&to_string(read_list_pack_entry(&mut list_pack)?)).unwrap();
644                let seq = i64::from_str(&to_string(read_list_pack_entry(&mut list_pack)?)).unwrap();
645                let id = ID {
646                    ms: ms + base_id.ms,
647                    seq: seq + base_id.seq,
648                };
649                let deleted = (flag & 1) != 0;
650                if (flag & 2) != 0 {
651                    for i in 0..num_fields {
652                        let value = read_list_pack_entry(&mut list_pack)?;
653                        let field = tmp_fields.get(i as usize).unwrap().to_vec();
654                        fields.insert(field, value);
655                    }
656                    entries.insert(id, Entry { id, deleted, fields });
657                } else {
658                    let num_fields = i32::from_str(&to_string(read_list_pack_entry(&mut list_pack)?)).unwrap();
659                    for _ in 0..num_fields {
660                        let field = read_list_pack_entry(&mut list_pack)?;
661                        let value = read_list_pack_entry(&mut list_pack)?;
662                        fields.insert(field, value);
663                    }
664                    entries.insert(id, Entry { id, deleted, fields });
665                }
666                read_list_pack_entry(&mut list_pack)?;
667            }
668            let end = list_pack.read_u8()?;
669            if end != 255 {
670                panic!("listpack expect 255 but {}", end);
671            }
672        }
673        input.read_length()?;
674        input.read_length()?;
675        input.read_length()?;
676
677        let mut groups: Vec<Group> = Vec::new();
678        let (count, _) = input.read_length()?;
679        for _ in 0..count {
680            let name = input.read_string()?;
681            let (ms, _) = input.read_length()?;
682            let (seq, _) = input.read_length()?;
683            let group_last_id = ID {
684                ms: ms as i64,
685                seq: seq as i64,
686            };
687            groups.push(Group {
688                name,
689                last_id: group_last_id,
690            });
691
692            let (global_pel, _) = input.read_length()?;
693            for _ in 0..global_pel {
694                read_long(input, 8, false)?;
695                read_long(input, 8, false)?;
696                input.read_integer(8, false)?;
697                input.read_length()?;
698            }
699
700            let (consumer_count, _) = input.read_length()?;
701            for _ in 0..consumer_count {
702                input.read_string()?;
703                input.read_integer(8, false)?;
704
705                let (pel, _) = input.read_length()?;
706                for _ in 0..pel {
707                    read_long(input, 8, false)?;
708                    read_long(input, 8, false)?;
709                }
710            }
711        }
712        Ok(Stream { entries, groups, meta })
713    }
714}
715
716fn read_long(input: &mut dyn Read, length: i32, little_endian: bool) -> Result<i64> {
717    let mut r: i64 = 0;
718    for i in 0..length {
719        let v: i64 = input.read_u8()? as i64;
720        if little_endian {
721            r |= v << (i << 3) as i64;
722        } else {
723            r = (r << 8) | v;
724        }
725    }
726    Ok(r)
727}
728
729fn read_list_pack_entry(input: &mut dyn Read) -> Result<Vec<u8>> {
730    let special = input.read_u8()? as i32;
731    let skip: i32;
732    let mut bytes;
733    if (special & 0x80) == 0 {
734        skip = 1;
735        let value = special & 0x7F;
736        let value = value.to_string();
737        bytes = value.into_bytes();
738    } else if (special & 0xC0) == 0x80 {
739        let len = special & 0x3F;
740        skip = 1 + len as i32;
741        bytes = vec![0; len as usize];
742        input.read_exact(&mut bytes)?;
743    } else if (special & 0xE0) == 0xC0 {
744        skip = 2;
745        let next = input.read_u8()?;
746        let value = (((special & 0x1F) << 8) | next as i32) << 19 >> 19;
747        let value = value.to_string();
748        bytes = value.into_bytes();
749    } else if (special & 0xFF) == 0xF1 {
750        skip = 3;
751        let value = input.read_i16::<LittleEndian>()?;
752        let value = value.to_string();
753        bytes = value.into_bytes();
754    } else if (special & 0xFF) == 0xF2 {
755        skip = 4;
756        let value = input.read_i24::<LittleEndian>()?;
757        let value = value.to_string();
758        bytes = value.into_bytes();
759    } else if (special & 0xFF) == 0xF3 {
760        skip = 5;
761        let value = input.read_i32::<LittleEndian>()?;
762        let value = value.to_string();
763        bytes = value.into_bytes();
764    } else if (special & 0xFF) == 0xF4 {
765        skip = 9;
766        let value = input.read_i64::<LittleEndian>()?;
767        let value = value.to_string();
768        bytes = value.into_bytes();
769    } else if (special & 0xF0) == 0xE0 {
770        let next = input.read_u8()?;
771        let len = ((special & 0x0F) << 8) | next as i32;
772        skip = 2 + len as i32;
773        bytes = vec![0; len as usize];
774        input.read_exact(&mut bytes)?;
775    } else if (special & 0xFF) == 0xF0 {
776        let len = input.read_u32::<BigEndian>()?;
777        skip = 5 + len as i32;
778        bytes = vec![0; len as usize];
779        input.read_exact(&mut bytes)?;
780    } else {
781        panic!("{}", special)
782    }
783    if skip <= 127 {
784        let mut buf = vec![0; 1];
785        input.read_exact(&mut buf)?;
786    } else if skip < 16383 {
787        let mut buf = vec![0; 2];
788        input.read_exact(&mut buf)?;
789    } else if skip < 2097151 {
790        let mut buf = vec![0; 3];
791        input.read_exact(&mut buf)?;
792    } else if skip < 268435455 {
793        let mut buf = vec![0; 4];
794        input.read_exact(&mut buf)?;
795    } else {
796        let mut buf = vec![0; 5];
797        input.read_exact(&mut buf)?;
798    }
799    Ok(bytes)
800}
801
802pub(crate) fn read_zm_len(cursor: &mut Cursor<&Vec<u8>>) -> Result<usize> {
803    let len = cursor.read_u8()?;
804    if len <= 253 {
805        return Ok(len as usize);
806    } else if len == 254 {
807        let value = cursor.read_u32::<BigEndian>()?;
808        return Ok(value as usize);
809    }
810    Ok(len as usize)
811}
812
813pub(crate) fn read_zip_list_entry(cursor: &mut Cursor<Vec<u8>>) -> Result<Vec<u8>> {
814    if cursor.read_u8()? >= 254 {
815        cursor.read_u32::<LittleEndian>()?;
816    }
817    let flag = cursor.read_u8()?;
818    match flag >> 6 {
819        0 => {
820            let length = flag & 0x3F;
821            let mut buff = vec![0; length as usize];
822            cursor.read_exact(&mut buff)?;
823            return Ok(buff);
824        }
825        1 => {
826            let next_byte = cursor.read_u8()?;
827            let length = (((flag as u16) & 0x3F) << 8) | (next_byte as u16);
828            let mut buff = vec![0; length as usize];
829            cursor.read_exact(&mut buff)?;
830            return Ok(buff);
831        }
832        2 => {
833            let length = cursor.read_u32::<BigEndian>()?;
834            let mut buff = vec![0; length as usize];
835            cursor.read_exact(&mut buff)?;
836            return Ok(buff);
837        }
838        _ => {}
839    }
840    return match flag {
841        ZIP_INT_8BIT => {
842            let int = cursor.read_i8()?;
843            Ok(int.to_string().into_bytes())
844        }
845        ZIP_INT_16BIT => {
846            let int = cursor.read_i16::<LittleEndian>()?;
847            Ok(int.to_string().into_bytes())
848        }
849        ZIP_INT_24BIT => {
850            let int = cursor.read_i24::<LittleEndian>()?;
851            Ok(int.to_string().into_bytes())
852        }
853        ZIP_INT_32BIT => {
854            let int = cursor.read_i32::<LittleEndian>()?;
855            Ok(int.to_string().into_bytes())
856        }
857        ZIP_INT_64BIT => {
858            let int = cursor.read_i64::<LittleEndian>()?;
859            Ok(int.to_string().into_bytes())
860        }
861        _ => {
862            let result = (flag - 0xF1) as isize;
863            Ok(result.to_string().into_bytes())
864        }
865    };
866}
867
868/// 封装Redis中的各种数据类型,由`RdbHandler`统一处理
869#[derive(Debug)]
870pub enum Object<'a> {
871    /// 代表Redis中的String类型数据
872    String(KeyValue<'a>),
873    /// 代表Redis中的List类型数据
874    List(List<'a>),
875    /// 代表Redis中的Set类型数据
876    Set(Set<'a>),
877    /// 代表Redis中的SortedSet类型数据
878    SortedSet(SortedSet<'a>),
879    /// 代表Redis中的Hash类型数据
880    Hash(Hash<'a>),
881    /// 代表Redis中的module, 需要额外实现Module解析器
882    Module(Vec<u8>, Box<dyn Module>, &'a Meta),
883    /// 代表Redis中的Stream类型数据
884    Stream(Vec<u8>, Stream<'a>),
885    /// 代表rdb数据解析开始
886    BOR,
887    /// 代表rdb数据解析完毕
888    EOR,
889}
890
891pub trait Module {
892    fn as_any(&self) -> &dyn Any;
893}
894
895impl Debug for dyn Module {
896    fn fmt(&self, _: &mut Formatter) -> result::Result<(), Error> {
897        unimplemented!()
898    }
899}
900
901/// 数据的元信息, 包括数据过期类型, 内存驱逐类型, 数据所属的db
902#[derive(Debug)]
903pub struct Meta {
904    /// 数据所属的db
905    pub db: isize,
906    /// 左为过期时间类型,右为过期时间
907    pub expire: Option<(ExpireType, i64)>,
908    /// 左为内存驱逐类型,右为被驱逐掉的值
909    pub evict: Option<(EvictType, i64)>,
910}
911
912/// 过期类型
913#[derive(Debug)]
914pub enum ExpireType {
915    /// 以秒计算过期时间
916    Second,
917    /// 以毫秒计算过期时间
918    Millisecond,
919}
920
921/// 内存驱逐类型
922#[derive(Debug)]
923pub enum EvictType {
924    /// Least Recently Used
925    LRU,
926    /// Least Frequently Used
927    LFU,
928}
929
930/// 代表Redis中的String类型数据
931#[derive(Debug)]
932pub struct KeyValue<'a> {
933    /// 数据的key
934    pub key: &'a [u8],
935    /// 数据的值
936    pub value: &'a [u8],
937    /// 数据的元信息
938    pub meta: &'a Meta,
939}
940
941/// 代表Redis中的List类型数据
942#[derive(Debug)]
943pub struct List<'a> {
944    /// 数据的key
945    pub key: &'a [u8],
946    /// Set中所有的元素
947    pub values: &'a [Vec<u8>],
948    /// 数据的元信息
949    pub meta: &'a Meta,
950}
951
952/// 代表Redis中的Set类型数据
953#[derive(Debug)]
954pub struct Set<'a> {
955    /// 数据的key
956    pub key: &'a [u8],
957    /// Set中所有的元素
958    pub members: &'a [Vec<u8>],
959    /// 数据的元信息
960    pub meta: &'a Meta,
961}
962
963/// 代表Redis中的SortedSet类型数据
964#[derive(Debug)]
965pub struct SortedSet<'a> {
966    /// 数据的key
967    pub key: &'a [u8],
968    /// SortedSet中所有的元素
969    pub items: &'a [Item],
970    /// 数据的元信息
971    pub meta: &'a Meta,
972}
973
974/// SortedSet中的一条元素
975#[derive(Debug)]
976pub struct Item {
977    /// 元素值
978    pub member: Vec<u8>,
979    /// 元素的排序分数
980    pub score: f64,
981}
982
983/// 代表Redis中的Hash类型数据
984#[derive(Debug)]
985pub struct Hash<'a> {
986    /// 数据的key
987    pub key: &'a [u8],
988    /// 数据所有的字段
989    pub fields: &'a [Field],
990    /// 数据的元信息
991    pub meta: &'a Meta,
992}
993
994/// Hash类型数据中的一个字段
995#[derive(Debug)]
996pub struct Field {
997    /// 字段名
998    pub name: Vec<u8>,
999    /// 字段值
1000    pub value: Vec<u8>,
1001}
1002
1003#[derive(Debug)]
1004pub struct Stream<'a> {
1005    pub entries: BTreeMap<ID, Entry>,
1006    pub groups: Vec<Group>,
1007    /// 数据的元信息
1008    pub meta: &'a Meta,
1009}
1010
1011#[derive(Debug, Eq, Copy, Clone)]
1012pub struct ID {
1013    pub ms: i64,
1014    pub seq: i64,
1015}
1016
1017impl ID {
1018    pub fn to_string(&self) -> String {
1019        format!("{}-{}", self.ms, self.seq)
1020    }
1021}
1022
1023impl PartialEq for ID {
1024    fn eq(&self, other: &Self) -> bool {
1025        self.ms == other.ms && self.seq == other.seq
1026    }
1027
1028    fn ne(&self, other: &Self) -> bool {
1029        self.ms != other.ms || self.seq != other.seq
1030    }
1031}
1032
1033impl PartialOrd for ID {
1034    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
1035        Some(self.cmp(other))
1036    }
1037
1038    fn lt(&self, other: &Self) -> bool {
1039        match self.cmp(other) {
1040            cmp::Ordering::Less => true,
1041            cmp::Ordering::Equal => false,
1042            cmp::Ordering::Greater => false,
1043        }
1044    }
1045
1046    fn le(&self, other: &Self) -> bool {
1047        match self.cmp(other) {
1048            cmp::Ordering::Less => true,
1049            cmp::Ordering::Equal => true,
1050            cmp::Ordering::Greater => false,
1051        }
1052    }
1053
1054    fn gt(&self, other: &Self) -> bool {
1055        match self.cmp(other) {
1056            cmp::Ordering::Less => false,
1057            cmp::Ordering::Equal => false,
1058            cmp::Ordering::Greater => true,
1059        }
1060    }
1061
1062    fn ge(&self, other: &Self) -> bool {
1063        match self.cmp(other) {
1064            cmp::Ordering::Less => false,
1065            cmp::Ordering::Equal => true,
1066            cmp::Ordering::Greater => true,
1067        }
1068    }
1069}
1070
1071impl Ord for ID {
1072    fn cmp(&self, other: &Self) -> cmp::Ordering {
1073        let order = self.ms.cmp(&other.ms);
1074        if order == cmp::Ordering::Equal {
1075            self.seq.cmp(&other.seq)
1076        } else {
1077            order
1078        }
1079    }
1080}
1081
1082#[derive(Debug)]
1083pub struct Entry {
1084    pub id: ID,
1085    pub deleted: bool,
1086    pub fields: BTreeMap<Vec<u8>, Vec<u8>>,
1087}
1088
1089#[derive(Debug)]
1090pub struct Group {
1091    pub name: Vec<u8>,
1092    pub last_id: ID,
1093}
1094
1095/// Map object types to RDB object types.
1096///
1097pub(crate) const RDB_TYPE_STRING: u8 = 0;
1098pub(crate) const RDB_TYPE_LIST: u8 = 1;
1099pub(crate) const RDB_TYPE_SET: u8 = 2;
1100pub(crate) const RDB_TYPE_ZSET: u8 = 3;
1101pub(crate) const RDB_TYPE_HASH: u8 = 4;
1102/// ZSET version 2 with doubles stored in binary.
1103pub(crate) const RDB_TYPE_ZSET_2: u8 = 5;
1104pub(crate) const RDB_TYPE_MODULE: u8 = 6;
1105/// Module value with annotations for parsing without
1106/// the generating module being loaded.
1107pub(crate) const RDB_TYPE_MODULE_2: u8 = 7;
1108
1109/// Object types for encoded objects.
1110///
1111pub(crate) const RDB_TYPE_HASH_ZIPMAP: u8 = 9;
1112pub(crate) const RDB_TYPE_LIST_ZIPLIST: u8 = 10;
1113pub(crate) const RDB_TYPE_SET_INTSET: u8 = 11;
1114pub(crate) const RDB_TYPE_ZSET_ZIPLIST: u8 = 12;
1115pub(crate) const RDB_TYPE_HASH_ZIPLIST: u8 = 13;
1116pub(crate) const RDB_TYPE_LIST_QUICKLIST: u8 = 14;
1117pub(crate) const RDB_TYPE_STREAM_LISTPACKS: u8 = 15;
1118
1119/// Special RDB opcodes
1120///
1121// Module auxiliary data.
1122pub(crate) const RDB_OPCODE_MODULE_AUX: u8 = 247;
1123// LRU idle time.
1124pub(crate) const RDB_OPCODE_IDLE: u8 = 248;
1125// LFU frequency.
1126pub(crate) const RDB_OPCODE_FREQ: u8 = 249;
1127// RDB aux field.
1128pub(crate) const RDB_OPCODE_AUX: u8 = 250;
1129// Hash table resize hint.
1130pub(crate) const RDB_OPCODE_RESIZEDB: u8 = 251;
1131// Expire time in milliseconds.
1132pub(crate) const RDB_OPCODE_EXPIRETIME_MS: u8 = 252;
1133// Old expire time in seconds.
1134pub(crate) const RDB_OPCODE_EXPIRETIME: u8 = 253;
1135// DB number of the following keys.
1136pub(crate) const RDB_OPCODE_SELECTDB: u8 = 254;
1137// End of the RDB file.
1138pub(crate) const RDB_OPCODE_EOF: u8 = 255;
1139
1140pub(crate) const RDB_MODULE_OPCODE_EOF: isize = 0;
1141
1142pub(crate) const RDB_MODULE_OPCODE_SINT: isize = 1;
1143pub(crate) const RDB_MODULE_OPCODE_UINT: isize = 2;
1144pub(crate) const RDB_MODULE_OPCODE_STRING: isize = 5;
1145pub(crate) const RDB_MODULE_OPCODE_FLOAT: isize = 3;
1146pub(crate) const RDB_MODULE_OPCODE_DOUBLE: isize = 4;
1147
1148pub(crate) const ZIP_INT_8BIT: u8 = 254;
1149pub(crate) const ZIP_INT_16BIT: u8 = 192;
1150pub(crate) const ZIP_INT_24BIT: u8 = 240;
1151pub(crate) const ZIP_INT_32BIT: u8 = 208;
1152pub(crate) const ZIP_INT_64BIT: u8 = 224;
1153
1154/// Defines related to the dump file format. To store 32 bits lengths for short
1155/// keys requires a lot of space, so we check the most significant 2 bits of
1156/// the first byte to interpreter the length:
1157///
1158/// 00|XXXXXX => if the two MSB are 00 the len is the 6 bits of this byte
1159/// 01|XXXXXX XXXXXXXX =>  01, the len is 14 byes, 6 bits + 8 bits of next byte
1160/// 10|000000 [32 bit integer] => A full 32 bit len in net byte order will follow
1161/// 10|000001 [64 bit integer] => A full 64 bit len in net byte order will follow
1162/// 11|OBKIND this means: specially encoded object will follow. The six bits
1163///           number specify the kind of object that follows.
1164///           See the RDB_ENC_* defines.
1165///
1166/// Lengths up to 63 are stored using a single byte, most DB keys, and may
1167/// values, will fit inside.
1168pub(crate) const RDB_ENCVAL: u8 = 3;
1169pub(crate) const RDB_6BITLEN: u8 = 0;
1170pub(crate) const RDB_14BITLEN: u8 = 1;
1171pub(crate) const RDB_32BITLEN: u8 = 0x80;
1172pub(crate) const RDB_64BITLEN: u8 = 0x81;
1173
1174/// When a length of a string object stored on disk has the first two bits
1175/// set, the remaining six bits specify a special encoding for the object
1176/// accordingly to the following defines:
1177///
1178/// 8 bit signed integer
1179pub(crate) const RDB_ENC_INT8: isize = 0;
1180/// 16 bit signed integer
1181pub(crate) const RDB_ENC_INT16: isize = 1;
1182/// 32 bit signed integer
1183pub(crate) const RDB_ENC_INT32: isize = 2;
1184/// string compressed with FASTLZ
1185pub(crate) const RDB_ENC_LZF: isize = 3;
1186pub(crate) const BATCH_SIZE: usize = 64;
1187
1188pub(crate) const MODULE_SET: [char; 64] = [
1189    'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W',
1190    'X', 'Y', 'Z', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't',
1191    'u', 'v', 'w', 'x', 'y', 'z', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '-', '_',
1192];