1use core::result;
5use std::any::Any;
6use std::cmp;
7use std::collections::BTreeMap;
8use std::fmt::{Debug, Error, Formatter};
9use std::io::{Cursor, Read, Result};
10use std::sync::atomic::{AtomicBool, Ordering};
11
12use byteorder::{BigEndian, LittleEndian, ReadBytesExt};
13use log::info;
14
15use crate::cmd::connection::SELECT;
16use crate::cmd::Command;
17use crate::iter::{IntSetIter, Iter, QuickListIter, SortedSetIter, StrValIter, ZipListIter, ZipMapIter};
18use crate::{lzf, to_string, Event, EventHandler, ModuleParser, RDBParser};
19use std::cell::RefCell;
20use std::f64::{INFINITY, NAN, NEG_INFINITY};
21use std::iter::FromIterator;
22use std::rc::Rc;
23use std::str::FromStr;
24use std::sync::Arc;
25
26pub trait RDBDecode: Read {
28 fn read_length(&mut self) -> Result<(isize, bool)> {
30 let byte = self.read_u8()?;
31 let _type = (byte & 0xC0) >> 6;
32
33 let mut result = -1;
34 let mut is_encoded = false;
35
36 if _type == RDB_ENCVAL {
37 result = (byte & 0x3F) as isize;
38 is_encoded = true;
39 } else if _type == RDB_6BITLEN {
40 result = (byte & 0x3F) as isize;
41 } else if _type == RDB_14BITLEN {
42 let next_byte = self.read_u8()?;
43 result = (((byte as u16 & 0x3F) << 8) | next_byte as u16) as isize;
44 } else if byte == RDB_32BITLEN {
45 result = self.read_integer(4, true)?;
46 } else if byte == RDB_64BITLEN {
47 result = self.read_integer(8, true)?;
48 };
49 Ok((result, is_encoded))
50 }
51
52 fn read_integer(&mut self, size: isize, is_big_endian: bool) -> Result<isize> {
54 let mut buff = vec![0; size as usize];
55 self.read_exact(&mut buff)?;
56 let mut cursor = Cursor::new(&buff);
57
58 if is_big_endian {
59 if size == 2 {
60 return Ok(cursor.read_i16::<BigEndian>()? as isize);
61 } else if size == 4 {
62 return Ok(cursor.read_i32::<BigEndian>()? as isize);
63 } else if size == 8 {
64 return Ok(cursor.read_i64::<BigEndian>()? as isize);
65 };
66 } else {
67 if size == 2 {
68 return Ok(cursor.read_i16::<LittleEndian>()? as isize);
69 } else if size == 4 {
70 return Ok(cursor.read_i32::<LittleEndian>()? as isize);
71 } else if size == 8 {
72 return Ok(cursor.read_i64::<LittleEndian>()? as isize);
73 };
74 }
75 panic!("Invalid integer size: {}", size)
76 }
77
78 fn read_string(&mut self) -> Result<Vec<u8>> {
80 let (length, is_encoded) = self.read_length()?;
81 if is_encoded {
82 match length {
83 RDB_ENC_INT8 => {
84 let int = self.read_i8()?;
85 return Ok(int.to_string().into_bytes());
86 }
87 RDB_ENC_INT16 => {
88 let int = self.read_integer(2, false)?;
89 return Ok(int.to_string().into_bytes());
90 }
91 RDB_ENC_INT32 => {
92 let int = self.read_integer(4, false)?;
93 return Ok(int.to_string().into_bytes());
94 }
95 RDB_ENC_LZF => {
96 let (compressed_len, _) = self.read_length()?;
97 let (origin_len, _) = self.read_length()?;
98 let mut compressed = vec![0; compressed_len as usize];
99 self.read_exact(&mut compressed)?;
100 let mut origin = vec![0; origin_len as usize];
101 lzf::decompress(&mut compressed, compressed_len, &mut origin, origin_len);
102 return Ok(origin);
103 }
104 _ => panic!("Invalid string length: {}", length),
105 };
106 };
107 let mut buff = vec![0; length as usize];
108 self.read_exact(&mut buff)?;
109 Ok(buff)
110 }
111
112 fn read_double(&mut self) -> Result<f64> {
114 let len = self.read_u8()?;
115 return match len {
116 255 => Ok(NEG_INFINITY),
117 254 => Ok(INFINITY),
118 253 => Ok(NAN),
119 _ => {
120 let mut buff = vec![0; len as usize];
121 self.read_exact(&mut buff)?;
122 let score_str = to_string(buff);
123 let score = score_str.parse::<f64>().unwrap();
124 Ok(score)
125 }
126 };
127 }
128}
129
130impl<R: Read + ?Sized> RDBDecode for R {}
131
/// The crate's built-in [`RDBParser`] implementation.
pub(crate) struct DefaultRDBParser {
    /// Polled before each top-level record is read; parsing stops once it reads `false`.
    pub(crate) running: Arc<AtomicBool>,
    /// Optional decoder for module values. Without it, `RDB_TYPE_MODULE_2` values are
    /// skipped and `RDB_TYPE_MODULE` values cannot be decoded.
    pub(crate) module_parser: Option<Rc<RefCell<dyn ModuleParser>>>,
}
136
137impl RDBParser for DefaultRDBParser {
138 fn parse(&mut self, input: &mut dyn Read, _: i64, event_handler: &mut dyn EventHandler) -> Result<()> {
139 event_handler.handle(Event::RDB(Object::BOR));
140 let mut bytes = vec![0; 5];
141 input.read_exact(&mut bytes)?;
143 input.read_exact(&mut bytes[..=3])?;
145 let rdb_version = String::from_utf8_lossy(&bytes[..=3]);
146 let rdb_version = rdb_version.parse::<isize>().unwrap();
147 let mut db = 0;
148
149 while self.running.load(Ordering::Relaxed) {
150 let mut meta = Meta {
151 db,
152 expire: None,
153 evict: None,
154 };
155
156 let data_type = input.read_u8()?;
157 match data_type {
158 RDB_OPCODE_AUX => {
159 let field_name = input.read_string()?;
160 let field_val = input.read_string()?;
161 let field_name = to_string(field_name);
162 let field_val = to_string(field_val);
163 info!("{}:{}", field_name, field_val);
164 }
165 RDB_OPCODE_SELECTDB => {
166 let (_db, _) = input.read_length()?;
167 meta.db = _db;
168 db = _db;
169 let cmd = SELECT { db: _db as i32 };
170 event_handler.handle(Event::AOF(Command::SELECT(&cmd)));
171 }
172 RDB_OPCODE_RESIZEDB => {
173 let (total, _) = input.read_length()?;
174 info!("db[{}] total keys: {}", db, total);
175 let (expired, _) = input.read_length()?;
176 info!("db[{}] expired keys: {}", db, expired);
177 }
178 RDB_OPCODE_EXPIRETIME | RDB_OPCODE_EXPIRETIME_MS => {
179 if data_type == RDB_OPCODE_EXPIRETIME_MS {
180 let expired_time = input.read_integer(8, false)?;
181 meta.expire = Option::Some((ExpireType::Millisecond, expired_time as i64));
182 } else {
183 let expired_time = input.read_integer(4, false)?;
184 meta.expire = Option::Some((ExpireType::Second, expired_time as i64));
185 }
186 let value_type = input.read_u8()?;
187 match value_type {
188 RDB_OPCODE_FREQ => {
189 let val = input.read_u8()?;
190 let value_type = input.read_u8()?;
191 meta.evict = Option::Some((EvictType::LFU, val as i64));
192 self.read_object(input, value_type, event_handler, &meta)?;
193 }
194 RDB_OPCODE_IDLE => {
195 let (val, _) = input.read_length()?;
196 let value_type = input.read_u8()?;
197 meta.evict = Option::Some((EvictType::LRU, val as i64));
198 self.read_object(input, value_type, event_handler, &meta)?;
199 }
200 _ => {
201 self.read_object(input, value_type, event_handler, &meta)?;
202 }
203 }
204 }
205 RDB_OPCODE_FREQ => {
206 let val = input.read_u8()?;
207 let value_type = input.read_u8()?;
208 meta.evict = Option::Some((EvictType::LFU, val as i64));
209 self.read_object(input, value_type, event_handler, &meta)?;
210 }
211 RDB_OPCODE_IDLE => {
212 let (val, _) = input.read_length()?;
213 meta.evict = Option::Some((EvictType::LRU, val as i64));
214 let value_type = input.read_u8()?;
215 self.read_object(input, value_type, event_handler, &meta)?;
216 }
217 RDB_OPCODE_MODULE_AUX => {
218 input.read_length()?;
219 self.rdb_load_check_module_value(input)?;
220 }
221 RDB_OPCODE_EOF => {
222 if rdb_version >= 5 {
223 input.read_integer(8, true)?;
224 }
225 break;
226 }
227 _ => {
228 self.read_object(input, data_type, event_handler, &meta)?;
229 }
230 };
231 }
232 event_handler.handle(Event::RDB(Object::EOR));
233 Ok(())
234 }
235}
236
237impl DefaultRDBParser {
238 fn read_object(
240 &mut self, input: &mut dyn Read, value_type: u8, event_handler: &mut dyn EventHandler, meta: &Meta,
241 ) -> Result<()> {
242 match value_type {
243 RDB_TYPE_STRING => {
244 let key = input.read_string()?;
245 let value = input.read_string()?;
246 event_handler.handle(Event::RDB(Object::String(KeyValue {
247 key: &key,
248 value: &value,
249 meta,
250 })));
251 }
252 RDB_TYPE_LIST | RDB_TYPE_SET => {
253 let key = input.read_string()?;
254 let (count, _) = input.read_length()?;
255 let mut iter = StrValIter { count, input };
256
257 let mut has_more = true;
258 while has_more {
259 let mut val = Vec::new();
260 for _ in 0..BATCH_SIZE {
261 if let Ok(next_val) = iter.next() {
262 val.push(next_val);
263 } else {
264 has_more = false;
265 break;
266 }
267 }
268 if !val.is_empty() {
269 if value_type == RDB_TYPE_LIST {
270 event_handler.handle(Event::RDB(Object::List(List {
271 key: &key,
272 values: &val,
273 meta,
274 })));
275 } else {
276 event_handler.handle(Event::RDB(Object::Set(Set {
277 key: &key,
278 members: &val,
279 meta,
280 })));
281 }
282 }
283 }
284 }
285 RDB_TYPE_ZSET => {
286 let key = input.read_string()?;
287 let (count, _) = input.read_length()?;
288 let mut iter = SortedSetIter { count, v: 1, input };
289
290 let mut has_more = true;
291 while has_more {
292 let mut val = Vec::new();
293 for _ in 0..BATCH_SIZE {
294 if let Ok(next_val) = iter.next() {
295 val.push(next_val);
296 } else {
297 has_more = false;
298 break;
299 }
300 }
301 if !val.is_empty() {
302 event_handler.handle(Event::RDB(Object::SortedSet(SortedSet {
303 key: &key,
304 items: &val,
305 meta,
306 })));
307 }
308 }
309 }
310 RDB_TYPE_ZSET_2 => {
311 let key = input.read_string()?;
312 let (count, _) = input.read_length()?;
313 let mut iter = SortedSetIter { count, v: 2, input };
314
315 let mut has_more = true;
316 while has_more {
317 let mut val = Vec::new();
318 for _ in 0..BATCH_SIZE {
319 if let Ok(next_val) = iter.next() {
320 val.push(next_val);
321 } else {
322 has_more = false;
323 break;
324 }
325 }
326 if !val.is_empty() {
327 event_handler.handle(Event::RDB(Object::SortedSet(SortedSet {
328 key: &key,
329 items: &val,
330 meta,
331 })));
332 }
333 }
334 }
335 RDB_TYPE_HASH => {
336 let key = input.read_string()?;
337 let (count, _) = input.read_length()?;
338 let mut iter = StrValIter {
339 count: count * 2,
340 input,
341 };
342
343 let mut has_more = true;
344 while has_more {
345 let mut val = Vec::new();
346 for _ in 0..BATCH_SIZE {
347 let name;
348 let value;
349 if let Ok(next_val) = iter.next() {
350 name = next_val;
351 value = iter.next().expect("missing hash field value");
352 val.push(Field { name, value });
353 } else {
354 has_more = false;
355 break;
356 }
357 }
358 if !val.is_empty() {
359 event_handler.handle(Event::RDB(Object::Hash(Hash {
360 key: &key,
361 fields: &val,
362 meta,
363 })));
364 }
365 }
366 }
367 RDB_TYPE_HASH_ZIPMAP => {
368 let key = input.read_string()?;
369 let bytes = input.read_string()?;
370 let cursor = &mut Cursor::new(&bytes);
371 cursor.set_position(1);
372 let mut iter = ZipMapIter { has_more: true, cursor };
373
374 let mut has_more = true;
375 while has_more {
376 let mut fields = Vec::new();
377 for _ in 0..BATCH_SIZE {
378 if let Ok(field) = iter.next() {
379 fields.push(field);
380 } else {
381 has_more = false;
382 break;
383 }
384 }
385 if !fields.is_empty() {
386 event_handler.handle(Event::RDB(Object::Hash(Hash {
387 key: &key,
388 fields: &fields,
389 meta,
390 })));
391 }
392 }
393 }
394 RDB_TYPE_LIST_ZIPLIST => {
395 let key = input.read_string()?;
396 let bytes = input.read_string()?;
397 let cursor = &mut Cursor::new(bytes);
398 cursor.set_position(8);
400 let count = cursor.read_u16::<LittleEndian>()? as isize;
401 let mut iter = ZipListIter { count, cursor };
402
403 let mut has_more = true;
404 while has_more {
405 let mut val = Vec::new();
406 for _ in 0..BATCH_SIZE {
407 if let Ok(next_val) = iter.next() {
408 val.push(next_val);
409 } else {
410 has_more = false;
411 break;
412 }
413 }
414 if !val.is_empty() {
415 event_handler.handle(Event::RDB(Object::List(List {
416 key: &key,
417 values: &val,
418 meta,
419 })));
420 }
421 }
422 }
423 RDB_TYPE_HASH_ZIPLIST => {
424 let key = input.read_string()?;
425 let bytes = input.read_string()?;
426 let cursor = &mut Cursor::new(bytes);
427 cursor.set_position(8);
429 let count = cursor.read_u16::<LittleEndian>()? as isize;
430 let mut iter = ZipListIter { count, cursor };
431
432 let mut has_more = true;
433 while has_more {
434 let mut val = Vec::new();
435 for _ in 0..BATCH_SIZE {
436 let name;
437 let value;
438 if let Ok(next_val) = iter.next() {
439 name = next_val;
440 value = iter.next().expect("missing hash field value");
441 val.push(Field { name, value });
442 } else {
443 has_more = false;
444 break;
445 }
446 }
447 if !val.is_empty() {
448 event_handler.handle(Event::RDB(Object::Hash(Hash {
449 key: &key,
450 fields: &val,
451 meta,
452 })));
453 }
454 }
455 }
456 RDB_TYPE_ZSET_ZIPLIST => {
457 let key = input.read_string()?;
458 let bytes = input.read_string()?;
459 let cursor = &mut Cursor::new(bytes);
460 cursor.set_position(8);
462 let count = cursor.read_u16::<LittleEndian>()? as isize;
463 let mut iter = ZipListIter { count, cursor };
464
465 let mut has_more = true;
466 while has_more {
467 let mut val = Vec::new();
468 for _ in 0..BATCH_SIZE {
469 let member;
470 let score: f64;
471 if let Ok(next_val) = iter.next() {
472 member = next_val;
473 let score_str = to_string(iter.next().expect("missing sorted set element's score"));
474 score = score_str.parse::<f64>().unwrap();
475 val.push(Item { member, score });
476 } else {
477 has_more = false;
478 break;
479 }
480 }
481 if !val.is_empty() {
482 event_handler.handle(Event::RDB(Object::SortedSet(SortedSet {
483 key: &key,
484 items: &val,
485 meta,
486 })));
487 }
488 }
489 }
490 RDB_TYPE_SET_INTSET => {
491 let key = input.read_string()?;
492 let bytes = input.read_string()?;
493 let mut cursor = Cursor::new(&bytes);
494 let encoding = cursor.read_i32::<LittleEndian>()?;
495 let length = cursor.read_u32::<LittleEndian>()?;
496 let mut iter = IntSetIter {
497 encoding,
498 count: length as isize,
499 cursor: &mut cursor,
500 };
501
502 let mut has_more = true;
503 while has_more {
504 let mut val = Vec::new();
505 for _ in 0..BATCH_SIZE {
506 if let Ok(next_val) = iter.next() {
507 val.push(next_val);
508 } else {
509 has_more = false;
510 break;
511 }
512 }
513 if !val.is_empty() {
514 event_handler.handle(Event::RDB(Object::Set(Set {
515 key: &key,
516 members: &val,
517 meta,
518 })));
519 }
520 }
521 }
522 RDB_TYPE_LIST_QUICKLIST => {
523 let key = input.read_string()?;
524 let (count, _) = input.read_length()?;
525 let mut iter = QuickListIter {
526 len: -1,
527 count,
528 input,
529 cursor: Option::None,
530 };
531
532 let mut has_more = true;
533 while has_more {
534 let mut val = Vec::new();
535 for _ in 0..BATCH_SIZE {
536 if let Ok(next_val) = iter.next() {
537 val.push(next_val);
538 } else {
539 has_more = false;
540 break;
541 }
542 }
543 if !val.is_empty() {
544 event_handler.handle(Event::RDB(Object::List(List {
545 key: &key,
546 values: &val,
547 meta,
548 })));
549 }
550 }
551 }
552 RDB_TYPE_MODULE | RDB_TYPE_MODULE_2 => {
553 let key = input.read_string()?;
554 let (module_id, _) = input.read_length()?;
555 let module_id = module_id as usize;
556 let mut array: [char; 9] = [' '; 9];
557 for i in 0..array.len() {
558 let i1 = 10 + (array.len() - 1 - i) * 6;
559 let i2 = (module_id >> i1 as usize) as usize;
560 let i3 = i2 & 63;
561 let chr = MODULE_SET.get(i3).unwrap();
562 array[i] = *chr;
563 }
564 let module_name: String = String::from_iter(array.iter());
565 let module_version: usize = module_id & 1023;
566 if self.module_parser.is_none() && value_type == RDB_TYPE_MODULE {
567 panic!("MODULE {}, version {} 无法解析", module_name, module_version);
568 }
569 if let Some(parser) = &mut self.module_parser {
570 let module: Box<dyn Module>;
571 if value_type == RDB_TYPE_MODULE_2 {
572 module = parser.borrow_mut().parse(input, &module_name, 2);
573 let (len, _) = input.read_length()?;
574 if len != 0 {
575 panic!(
576 "module '{}' that is not terminated by EOF marker, but {}",
577 &module_name, len
578 );
579 }
580 } else {
581 module = parser.borrow_mut().parse(input, &module_name, module_version);
582 }
583 event_handler.handle(Event::RDB(Object::Module(key, module, meta)));
584 } else {
585 self.rdb_load_check_module_value(input)?;
587 }
588 }
589 RDB_TYPE_STREAM_LISTPACKS => {
590 let key = input.read_string()?;
591 let stream = self.read_stream_list_packs(meta, input)?;
592 event_handler.handle(Event::RDB(Object::Stream(key, stream)));
593 }
594 _ => panic!("unknown data type: {}", value_type),
595 }
596 Ok(())
597 }
598
599 fn rdb_load_check_module_value(&mut self, input: &mut dyn Read) -> Result<()> {
600 loop {
601 let (op_code, _) = input.read_length()?;
602 if op_code == RDB_MODULE_OPCODE_EOF {
603 break;
604 }
605 if op_code == RDB_MODULE_OPCODE_SINT || op_code == RDB_MODULE_OPCODE_UINT {
606 input.read_length()?;
607 } else if op_code == RDB_MODULE_OPCODE_STRING {
608 input.read_string()?;
609 } else if op_code == RDB_MODULE_OPCODE_FLOAT {
610 input.read_exact(&mut [0; 4])?;
611 } else if op_code == RDB_MODULE_OPCODE_DOUBLE {
612 input.read_exact(&mut [0; 8])?;
613 }
614 }
615 Ok(())
616 }
617
618 fn read_stream_list_packs<'a>(&mut self, meta: &'a Meta, input: &mut dyn Read) -> Result<Stream<'a>> {
619 let mut entries: BTreeMap<ID, Entry> = BTreeMap::new();
620 let (length, _) = input.read_length()?;
621 for _ in 0..length {
622 let raw_id = input.read_string()?;
623 let mut cursor = Cursor::new(&raw_id);
624 let ms = read_long(&mut cursor, 8, false)?;
625 let seq = read_long(&mut cursor, 8, false)?;
626 let base_id = ID { ms, seq };
627 let raw_list_packs = input.read_string()?;
628 let mut list_pack = Cursor::new(&raw_list_packs);
629 list_pack.set_position(6);
630 let count = i64::from_str(&to_string(read_list_pack_entry(&mut list_pack)?)).unwrap();
631 let deleted = i64::from_str(&to_string(read_list_pack_entry(&mut list_pack)?)).unwrap();
632 let num_fields = i32::from_str(&to_string(read_list_pack_entry(&mut list_pack)?)).unwrap();
633 let mut tmp_fields = Vec::with_capacity(num_fields as usize);
634 for _ in 0..num_fields {
635 tmp_fields.push(read_list_pack_entry(&mut list_pack)?);
636 }
637 read_list_pack_entry(&mut list_pack)?;
638
639 let total = count + deleted;
640 for _ in 0..total {
641 let mut fields = BTreeMap::new();
642 let flag = i32::from_str(&to_string(read_list_pack_entry(&mut list_pack)?)).unwrap();
643 let ms = i64::from_str(&to_string(read_list_pack_entry(&mut list_pack)?)).unwrap();
644 let seq = i64::from_str(&to_string(read_list_pack_entry(&mut list_pack)?)).unwrap();
645 let id = ID {
646 ms: ms + base_id.ms,
647 seq: seq + base_id.seq,
648 };
649 let deleted = (flag & 1) != 0;
650 if (flag & 2) != 0 {
651 for i in 0..num_fields {
652 let value = read_list_pack_entry(&mut list_pack)?;
653 let field = tmp_fields.get(i as usize).unwrap().to_vec();
654 fields.insert(field, value);
655 }
656 entries.insert(id, Entry { id, deleted, fields });
657 } else {
658 let num_fields = i32::from_str(&to_string(read_list_pack_entry(&mut list_pack)?)).unwrap();
659 for _ in 0..num_fields {
660 let field = read_list_pack_entry(&mut list_pack)?;
661 let value = read_list_pack_entry(&mut list_pack)?;
662 fields.insert(field, value);
663 }
664 entries.insert(id, Entry { id, deleted, fields });
665 }
666 read_list_pack_entry(&mut list_pack)?;
667 }
668 let end = list_pack.read_u8()?;
669 if end != 255 {
670 panic!("listpack expect 255 but {}", end);
671 }
672 }
673 input.read_length()?;
674 input.read_length()?;
675 input.read_length()?;
676
677 let mut groups: Vec<Group> = Vec::new();
678 let (count, _) = input.read_length()?;
679 for _ in 0..count {
680 let name = input.read_string()?;
681 let (ms, _) = input.read_length()?;
682 let (seq, _) = input.read_length()?;
683 let group_last_id = ID {
684 ms: ms as i64,
685 seq: seq as i64,
686 };
687 groups.push(Group {
688 name,
689 last_id: group_last_id,
690 });
691
692 let (global_pel, _) = input.read_length()?;
693 for _ in 0..global_pel {
694 read_long(input, 8, false)?;
695 read_long(input, 8, false)?;
696 input.read_integer(8, false)?;
697 input.read_length()?;
698 }
699
700 let (consumer_count, _) = input.read_length()?;
701 for _ in 0..consumer_count {
702 input.read_string()?;
703 input.read_integer(8, false)?;
704
705 let (pel, _) = input.read_length()?;
706 for _ in 0..pel {
707 read_long(input, 8, false)?;
708 read_long(input, 8, false)?;
709 }
710 }
711 }
712 Ok(Stream { entries, groups, meta })
713 }
714}
715
/// Reads `length` bytes (0 to 8) from `input` and assembles them into an `i64`.
///
/// With `little_endian` the first byte read is the least significant one; otherwise it
/// is the most significant. Values shorter than 8 bytes are zero-extended, not
/// sign-extended.
///
/// # Errors
///
/// Returns `InvalidInput` when `length` is outside `0..=8` (more than 8 bytes cannot fit
/// into an `i64`, and previously overflowed the shift), and any I/O error raised by `input`.
fn read_long(input: &mut dyn Read, length: i32, little_endian: bool) -> Result<i64> {
    if length < 0 || length > 8 {
        return Err(std::io::Error::new(
            std::io::ErrorKind::InvalidInput,
            format!("Invalid long length: {}", length),
        ));
    }
    let width = length as usize;
    let mut raw = [0u8; 8];
    if little_endian {
        // Low-order bytes come first; the unread tail stays zero.
        input.read_exact(&mut raw[..width])?;
        Ok(i64::from_le_bytes(raw))
    } else {
        // Right-align the bytes so the unread high-order bytes stay zero.
        input.read_exact(&mut raw[8 - width..])?;
        Ok(i64::from_be_bytes(raw))
    }
}
728
/// Reads one listpack element from `input`, including its trailing back-length bytes.
///
/// String elements are returned verbatim; integer elements are returned as their
/// decimal text.
///
/// # Errors
///
/// Returns `InvalidData` when the first byte is not a known element encoding, and any
/// I/O error raised by `input`.
fn read_list_pack_entry(input: &mut dyn Read) -> Result<Vec<u8>> {
    /// Reads exactly `len` bytes.
    fn take(input: &mut dyn Read, len: usize) -> Result<Vec<u8>> {
        let mut buf = vec![0u8; len];
        input.read_exact(&mut buf)?;
        Ok(buf)
    }

    /// Reads a single byte.
    fn byte(input: &mut dyn Read) -> Result<u8> {
        let mut buf = [0u8; 1];
        input.read_exact(&mut buf)?;
        Ok(buf[0])
    }

    /// Reads a `width`-byte (at most 8) little-endian integer and sign-extends it.
    fn signed_le(input: &mut dyn Read, width: usize) -> Result<i64> {
        let mut raw = [0u8; 8];
        input.read_exact(&mut raw[..width])?;
        let unused_bits = 64 - 8 * width as u32;
        Ok(i64::from_le_bytes(raw) << unused_bits >> unused_bits)
    }

    let special = byte(input)?;

    // `skip` is the encoded size of the element (header plus payload); it decides how
    // many back-length bytes follow. `u64` keeps `5 + len` from overflowing.
    let (bytes, skip): (Vec<u8>, u64) = if special & 0x80 == 0 {
        // 0xxxxxxx: 7-bit unsigned integer stored in the header byte itself.
        (special.to_string().into_bytes(), 1)
    } else if special & 0xC0 == 0x80 {
        // 10xxxxxx: string with a 6-bit length.
        let len = (special & 0x3F) as usize;
        (take(input, len)?, 1 + len as u64)
    } else if special & 0xE0 == 0xC0 {
        // 110xxxxx yyyyyyyy: 13-bit signed integer, sign-extended through the shifts.
        let next = byte(input)?;
        let raw = (i32::from(special & 0x1F) << 8) | i32::from(next);
        let value = raw << 19 >> 19;
        (value.to_string().into_bytes(), 2)
    } else if special & 0xF0 == 0xE0 {
        // 1110xxxx yyyyyyyy: string with a 12-bit length.
        let next = byte(input)?;
        let len = (((special & 0x0F) as usize) << 8) | next as usize;
        (take(input, len)?, 2 + len as u64)
    } else {
        match special {
            0xF0 => {
                // String with a 32-bit length, stored little-endian like every other
                // multi-byte listpack field.
                let mut raw = [0u8; 4];
                input.read_exact(&mut raw)?;
                let len = u32::from_le_bytes(raw) as usize;
                (take(input, len)?, 5 + len as u64)
            }
            0xF1 => (signed_le(input, 2)?.to_string().into_bytes(), 3),
            0xF2 => (signed_le(input, 3)?.to_string().into_bytes(), 4),
            0xF3 => (signed_le(input, 4)?.to_string().into_bytes(), 5),
            0xF4 => (signed_le(input, 8)?.to_string().into_bytes(), 9),
            _ => {
                return Err(std::io::Error::new(
                    std::io::ErrorKind::InvalidData,
                    format!("unknown listpack encoding: {}", special),
                ));
            }
        }
    };

    // Consume the back-length that trails every element; its width depends on `skip`.
    let backlen_width = if skip <= 127 {
        1
    } else if skip < 16383 {
        2
    } else if skip < 2097151 {
        3
    } else if skip < 268435455 {
        4
    } else {
        5
    };
    let mut backlen = [0u8; 5];
    input.read_exact(&mut backlen[..backlen_width])?;
    Ok(bytes)
}
801
/// Reads a zipmap length field at the cursor position.
///
/// A first byte other than 254 is the length itself (255 is passed through unchanged
/// for the caller to interpret). The byte 254 announces a 4-byte length, which zipmaps
/// store in little-endian order.
///
/// # Errors
///
/// Returns an I/O error when the cursor runs out of bytes.
pub(crate) fn read_zm_len(cursor: &mut Cursor<&Vec<u8>>) -> Result<usize> {
    let mut marker = [0u8; 1];
    cursor.read_exact(&mut marker)?;
    if marker[0] != 254 {
        return Ok(marker[0] as usize);
    }
    let mut wide = [0u8; 4];
    cursor.read_exact(&mut wide)?;
    Ok(u32::from_le_bytes(wide) as usize)
}
812
813pub(crate) fn read_zip_list_entry(cursor: &mut Cursor<Vec<u8>>) -> Result<Vec<u8>> {
814 if cursor.read_u8()? >= 254 {
815 cursor.read_u32::<LittleEndian>()?;
816 }
817 let flag = cursor.read_u8()?;
818 match flag >> 6 {
819 0 => {
820 let length = flag & 0x3F;
821 let mut buff = vec![0; length as usize];
822 cursor.read_exact(&mut buff)?;
823 return Ok(buff);
824 }
825 1 => {
826 let next_byte = cursor.read_u8()?;
827 let length = (((flag as u16) & 0x3F) << 8) | (next_byte as u16);
828 let mut buff = vec![0; length as usize];
829 cursor.read_exact(&mut buff)?;
830 return Ok(buff);
831 }
832 2 => {
833 let length = cursor.read_u32::<BigEndian>()?;
834 let mut buff = vec![0; length as usize];
835 cursor.read_exact(&mut buff)?;
836 return Ok(buff);
837 }
838 _ => {}
839 }
840 return match flag {
841 ZIP_INT_8BIT => {
842 let int = cursor.read_i8()?;
843 Ok(int.to_string().into_bytes())
844 }
845 ZIP_INT_16BIT => {
846 let int = cursor.read_i16::<LittleEndian>()?;
847 Ok(int.to_string().into_bytes())
848 }
849 ZIP_INT_24BIT => {
850 let int = cursor.read_i24::<LittleEndian>()?;
851 Ok(int.to_string().into_bytes())
852 }
853 ZIP_INT_32BIT => {
854 let int = cursor.read_i32::<LittleEndian>()?;
855 Ok(int.to_string().into_bytes())
856 }
857 ZIP_INT_64BIT => {
858 let int = cursor.read_i64::<LittleEndian>()?;
859 Ok(int.to_string().into_bytes())
860 }
861 _ => {
862 let result = (flag - 0xF1) as isize;
863 Ok(result.to_string().into_bytes())
864 }
865 };
866}
867
/// A unit of data reported to the event handler while an RDB payload is parsed.
///
/// Collection variants carry one batch of elements; a large key is reported as several
/// events with the same key.
#[derive(Debug)]
pub enum Object<'a> {
    /// A string key and its value.
    String(KeyValue<'a>),
    /// A batch of list values.
    List(List<'a>),
    /// A batch of set members.
    Set(Set<'a>),
    /// A batch of sorted-set items.
    SortedSet(SortedSet<'a>),
    /// A batch of hash fields.
    Hash(Hash<'a>),
    /// A module value: the key, the value produced by the module parser, and the key's metadata.
    Module(Vec<u8>, Box<dyn Module>, &'a Meta),
    /// A stream: the key and its decoded content.
    Stream(Vec<u8>, Stream<'a>),
    /// Emitted once before parsing of the RDB payload starts.
    BOR,
    /// Emitted once after parsing of the RDB payload ends.
    EOR,
}
890
/// A decoded module value.
///
/// The concrete type is supplied by a module parser; consumers recover it by
/// downcasting the reference returned from [`Module::as_any`].
pub trait Module {
    /// Returns `self` as [`Any`] so callers can downcast to the concrete module type.
    fn as_any(&self) -> &dyn Any;
}

impl Debug for dyn Module {
    /// Writes a fixed placeholder.
    ///
    /// The trait exposes nothing printable, but `Object` derives `Debug` and contains a
    /// `Box<dyn Module>`, so this must not panic when an event is debug-formatted.
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
        f.write_str("Module")
    }
}
900
/// Per-key metadata gathered from the opcodes that precede a value.
#[derive(Debug)]
pub struct Meta {
    /// Database index taken from the most recent select-db opcode (0 before any is seen).
    pub db: isize,
    /// Expiry as `(unit, value)`, when an expire opcode preceded the value.
    pub expire: Option<(ExpireType, i64)>,
    /// Eviction hint as `(kind, value)`, when an idle or frequency opcode preceded the value.
    pub evict: Option<(EvictType, i64)>,
}
911
/// Unit of the expiry value stored in [`Meta::expire`].
#[derive(Debug)]
pub enum ExpireType {
    /// The value came from the 4-byte expire-time opcode.
    Second,
    /// The value came from the 8-byte expire-time-ms opcode.
    Millisecond,
}
920
/// Kind of eviction hint stored in [`Meta::evict`].
#[derive(Debug)]
pub enum EvictType {
    /// The value came from the idle opcode.
    LRU,
    /// The value came from the frequency opcode.
    LFU,
}
929
/// A string key together with its value.
#[derive(Debug)]
pub struct KeyValue<'a> {
    /// Raw key bytes.
    pub key: &'a [u8],
    /// Raw value bytes.
    pub value: &'a [u8],
    /// Metadata of the key.
    pub meta: &'a Meta,
}
940
/// One batch of values belonging to a list key.
#[derive(Debug)]
pub struct List<'a> {
    /// Raw key bytes.
    pub key: &'a [u8],
    /// The values in this batch, in the order they were read.
    pub values: &'a [Vec<u8>],
    /// Metadata of the key.
    pub meta: &'a Meta,
}
951
/// One batch of members belonging to a set key.
#[derive(Debug)]
pub struct Set<'a> {
    /// Raw key bytes.
    pub key: &'a [u8],
    /// The members in this batch.
    pub members: &'a [Vec<u8>],
    /// Metadata of the key.
    pub meta: &'a Meta,
}
962
/// One batch of items belonging to a sorted-set key.
#[derive(Debug)]
pub struct SortedSet<'a> {
    /// Raw key bytes.
    pub key: &'a [u8],
    /// The member/score pairs in this batch.
    pub items: &'a [Item],
    /// Metadata of the key.
    pub meta: &'a Meta,
}
973
/// A sorted-set member with its score.
#[derive(Debug)]
pub struct Item {
    /// Raw member bytes.
    pub member: Vec<u8>,
    /// Score associated with the member.
    pub score: f64,
}
982
/// One batch of fields belonging to a hash key.
#[derive(Debug)]
pub struct Hash<'a> {
    /// Raw key bytes.
    pub key: &'a [u8],
    /// The name/value pairs in this batch.
    pub fields: &'a [Field],
    /// Metadata of the key.
    pub meta: &'a Meta,
}
993
/// A hash field: its name and its value.
#[derive(Debug)]
pub struct Field {
    /// Raw field name bytes.
    pub name: Vec<u8>,
    /// Raw field value bytes.
    pub value: Vec<u8>,
}
1002
/// The decoded content of a stream key.
#[derive(Debug)]
pub struct Stream<'a> {
    /// All entries read from the stream, including those flagged as deleted, ordered by id.
    pub entries: BTreeMap<ID, Entry>,
    /// The consumer groups attached to the stream.
    pub groups: Vec<Group>,
    /// Metadata of the key.
    pub meta: &'a Meta,
}
1010
/// Identifier of a stream entry.
///
/// Ids are ordered by `ms` first and by `seq` second. The derived comparison traits
/// compare fields in declaration order, which is exactly that ordering, so the field
/// order below must not be changed.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Copy, Clone)]
pub struct ID {
    pub ms: i64,
    pub seq: i64,
}

impl ID {
    /// Renders the id as `<ms>-<seq>`.
    pub fn to_string(&self) -> String {
        let ID { ms, seq } = *self;
        format!("{}-{}", ms, seq)
    }
}
1081
/// A single stream entry.
#[derive(Debug)]
pub struct Entry {
    /// Id of the entry.
    pub id: ID,
    /// `true` when the entry's flag marks it as deleted.
    pub deleted: bool,
    /// Field names mapped to their values.
    pub fields: BTreeMap<Vec<u8>, Vec<u8>>,
}
1088
/// A consumer group attached to a stream.
#[derive(Debug)]
pub struct Group {
    /// Raw group name bytes.
    pub name: Vec<u8>,
    /// The last id recorded for the group.
    pub last_id: ID,
}
1094
// Value-type tags: the byte in front of each key that selects the decoding branch
// in `read_object`.
pub(crate) const RDB_TYPE_STRING: u8 = 0;
pub(crate) const RDB_TYPE_LIST: u8 = 1;
pub(crate) const RDB_TYPE_SET: u8 = 2;
pub(crate) const RDB_TYPE_ZSET: u8 = 3;
pub(crate) const RDB_TYPE_HASH: u8 = 4;
// Decoded like RDB_TYPE_ZSET, but the sorted-set iterator is created with version 2.
pub(crate) const RDB_TYPE_ZSET_2: u8 = 5;
pub(crate) const RDB_TYPE_MODULE: u8 = 6;
// Module value that must be followed by an EOF marker.
pub(crate) const RDB_TYPE_MODULE_2: u8 = 7;

// Types whose whole value is stored as a single encoded string blob.
pub(crate) const RDB_TYPE_HASH_ZIPMAP: u8 = 9;
pub(crate) const RDB_TYPE_LIST_ZIPLIST: u8 = 10;
pub(crate) const RDB_TYPE_SET_INTSET: u8 = 11;
pub(crate) const RDB_TYPE_ZSET_ZIPLIST: u8 = 12;
pub(crate) const RDB_TYPE_HASH_ZIPLIST: u8 = 13;
pub(crate) const RDB_TYPE_LIST_QUICKLIST: u8 = 14;
pub(crate) const RDB_TYPE_STREAM_LISTPACKS: u8 = 15;
1118
// Special opcodes handled by the top-level parse loop instead of `read_object`.
// Followed by a length and a module value, both of which are skipped.
pub(crate) const RDB_OPCODE_MODULE_AUX: u8 = 247;
// Followed by a length; recorded as the LRU eviction hint of the next key.
pub(crate) const RDB_OPCODE_IDLE: u8 = 248;
// Followed by one byte; recorded as the LFU eviction hint of the next key.
pub(crate) const RDB_OPCODE_FREQ: u8 = 249;
// Followed by two strings (name and value), which are only logged.
pub(crate) const RDB_OPCODE_AUX: u8 = 250;
// Followed by two lengths, which are only logged.
pub(crate) const RDB_OPCODE_RESIZEDB: u8 = 251;
// Followed by an 8-byte little-endian integer; expiry of the next key.
pub(crate) const RDB_OPCODE_EXPIRETIME_MS: u8 = 252;
// Followed by a 4-byte little-endian integer; expiry of the next key.
pub(crate) const RDB_OPCODE_EXPIRETIME: u8 = 253;
// Followed by a length holding the database index for the keys that follow.
pub(crate) const RDB_OPCODE_SELECTDB: u8 = 254;
// Ends the payload; followed by 8 trailing bytes when the RDB version is 5 or later.
pub(crate) const RDB_OPCODE_EOF: u8 = 255;
1139
// Item tags inside a module value, as consumed by `rdb_load_check_module_value`.
// Terminates the module value.
pub(crate) const RDB_MODULE_OPCODE_EOF: isize = 0;

// SINT/UINT items carry a length-encoded value, STRING an RDB string,
// FLOAT four raw bytes and DOUBLE eight raw bytes.
pub(crate) const RDB_MODULE_OPCODE_SINT: isize = 1;
pub(crate) const RDB_MODULE_OPCODE_UINT: isize = 2;
pub(crate) const RDB_MODULE_OPCODE_STRING: isize = 5;
pub(crate) const RDB_MODULE_OPCODE_FLOAT: isize = 3;
pub(crate) const RDB_MODULE_OPCODE_DOUBLE: isize = 4;
1147
// Ziplist encoding bytes for integer entries; `read_zip_list_entry` matches on them
// to pick the width of the little-endian integer that follows.
pub(crate) const ZIP_INT_8BIT: u8 = 254;
pub(crate) const ZIP_INT_16BIT: u8 = 192;
pub(crate) const ZIP_INT_24BIT: u8 = 240;
pub(crate) const ZIP_INT_32BIT: u8 = 208;
pub(crate) const ZIP_INT_64BIT: u8 = 224;
1153
// Length encodings recognised by `read_length`.
// The first three are compared against the top two bits of the first byte.
// Top bits 11: the low six bits are an RDB_ENC_* marker, not a length.
pub(crate) const RDB_ENCVAL: u8 = 3;
// Top bits 00: the low six bits are the length.
pub(crate) const RDB_6BITLEN: u8 = 0;
// Top bits 01: the low six bits plus the next byte form the length.
pub(crate) const RDB_14BITLEN: u8 = 1;
// These two are compared against the whole first byte; a 4-byte or 8-byte
// big-endian length follows.
pub(crate) const RDB_32BITLEN: u8 = 0x80;
pub(crate) const RDB_64BITLEN: u8 = 0x81;

// String encodings reported by `read_length` when the value is marked as encoded.
// The string is an 8-bit, 16-bit or 32-bit integer.
pub(crate) const RDB_ENC_INT8: isize = 0;
pub(crate) const RDB_ENC_INT16: isize = 1;
pub(crate) const RDB_ENC_INT32: isize = 2;
// The string is LZF-compressed.
pub(crate) const RDB_ENC_LZF: isize = 3;
// Maximum number of collection elements delivered in a single event.
pub(crate) const BATCH_SIZE: usize = 64;
1187
// Character table for module names: `read_object` splits a module id into nine
// 6-bit indices and looks each one up here.
pub(crate) const MODULE_SET: [char; 64] = [
    'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W',
    'X', 'Y', 'Z', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't',
    'u', 'v', 'w', 'x', 'y', 'z', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '-', '_',
];