use core::result;
use std::any::Any;
use std::cmp;
use std::collections::BTreeMap;
use std::fmt::{Debug, Error, Formatter};
use std::io::{Cursor, Read, Result};
use std::sync::atomic::{AtomicBool, Ordering};
use byteorder::{BigEndian, LittleEndian, ReadBytesExt};
use log::info;
use crate::cmd::connection::SELECT;
use crate::cmd::Command;
use crate::iter::{
    IntSetIter, Iter, QuickListIter, SortedSetIter, StrValIter, ZipListIter, ZipMapIter,
};
use crate::{lzf, to_string, Event, EventHandler, ModuleParser, RDBParser};
use std::cell::RefCell;
use std::f64::{INFINITY, NAN, NEG_INFINITY};
use std::iter::FromIterator;
use std::rc::Rc;
use std::str::FromStr;
use std::sync::Arc;
pub trait RDBDecode: Read {
    
    fn read_length(&mut self) -> Result<(isize, bool)> {
        let byte = self.read_u8()?;
        let _type = (byte & 0xC0) >> 6;
        let mut result = -1;
        let mut is_encoded = false;
        if _type == RDB_ENCVAL {
            result = (byte & 0x3F) as isize;
            is_encoded = true;
        } else if _type == RDB_6BITLEN {
            result = (byte & 0x3F) as isize;
        } else if _type == RDB_14BITLEN {
            let next_byte = self.read_u8()?;
            result = (((byte as u16 & 0x3F) << 8) | next_byte as u16) as isize;
        } else if byte == RDB_32BITLEN {
            result = self.read_integer(4, true)?;
        } else if byte == RDB_64BITLEN {
            result = self.read_integer(8, true)?;
        };
        Ok((result, is_encoded))
    }
    
    fn read_integer(&mut self, size: isize, is_big_endian: bool) -> Result<isize> {
        let mut buff = vec![0; size as usize];
        self.read_exact(&mut buff)?;
        let mut cursor = Cursor::new(&buff);
        if is_big_endian {
            if size == 2 {
                return Ok(cursor.read_i16::<BigEndian>()? as isize);
            } else if size == 4 {
                return Ok(cursor.read_i32::<BigEndian>()? as isize);
            } else if size == 8 {
                return Ok(cursor.read_i64::<BigEndian>()? as isize);
            };
        } else {
            if size == 2 {
                return Ok(cursor.read_i16::<LittleEndian>()? as isize);
            } else if size == 4 {
                return Ok(cursor.read_i32::<LittleEndian>()? as isize);
            } else if size == 8 {
                return Ok(cursor.read_i64::<LittleEndian>()? as isize);
            };
        }
        panic!("Invalid integer size: {}", size)
    }
    
    fn read_string(&mut self) -> Result<Vec<u8>> {
        let (length, is_encoded) = self.read_length()?;
        if is_encoded {
            match length {
                RDB_ENC_INT8 => {
                    let int = self.read_i8()?;
                    return Ok(int.to_string().into_bytes());
                }
                RDB_ENC_INT16 => {
                    let int = self.read_integer(2, false)?;
                    return Ok(int.to_string().into_bytes());
                }
                RDB_ENC_INT32 => {
                    let int = self.read_integer(4, false)?;
                    return Ok(int.to_string().into_bytes());
                }
                RDB_ENC_LZF => {
                    let (compressed_len, _) = self.read_length()?;
                    let (origin_len, _) = self.read_length()?;
                    let mut compressed = vec![0; compressed_len as usize];
                    self.read_exact(&mut compressed)?;
                    let mut origin = vec![0; origin_len as usize];
                    lzf::decompress(&mut compressed, compressed_len, &mut origin, origin_len);
                    return Ok(origin);
                }
                _ => panic!("Invalid string length: {}", length),
            };
        };
        let mut buff = vec![0; length as usize];
        self.read_exact(&mut buff)?;
        Ok(buff)
    }
    
    fn read_double(&mut self) -> Result<f64> {
        let len = self.read_u8()?;
        return match len {
            255 => Ok(NEG_INFINITY),
            254 => Ok(INFINITY),
            253 => Ok(NAN),
            _ => {
                let mut buff = vec![0; len as usize];
                self.read_exact(&mut buff)?;
                let score_str = to_string(buff);
                let score = score_str.parse::<f64>().unwrap();
                Ok(score)
            }
        };
    }
}
impl<R: Read + ?Sized> RDBDecode for R {}
pub(crate) struct DefaultRDBParser {
    pub(crate) running: Arc<AtomicBool>,
    pub(crate) module_parser: Option<Rc<RefCell<dyn ModuleParser>>>,
}
impl RDBParser for DefaultRDBParser {
    fn parse(
        &mut self,
        input: &mut dyn Read,
        _: i64,
        event_handler: &mut dyn EventHandler,
    ) -> Result<()> {
        event_handler.handle(Event::RDB(Object::BOR));
        let mut bytes = vec![0; 5];
        
        input.read_exact(&mut bytes)?;
        
        input.read_exact(&mut bytes[..=3])?;
        let rdb_version = String::from_utf8_lossy(&bytes[..=3]);
        let rdb_version = rdb_version.parse::<isize>().unwrap();
        let mut db = 0;
        while self.running.load(Ordering::Relaxed) {
            let mut meta = Meta {
                db,
                expire: None,
                evict: None,
            };
            let data_type = input.read_u8()?;
            match data_type {
                RDB_OPCODE_AUX => {
                    let field_name = input.read_string()?;
                    let field_val = input.read_string()?;
                    let field_name = to_string(field_name);
                    let field_val = to_string(field_val);
                    info!("{}:{}", field_name, field_val);
                }
                RDB_OPCODE_SELECTDB => {
                    let (_db, _) = input.read_length()?;
                    meta.db = _db;
                    db = _db;
                    let cmd = SELECT { db: _db as i32 };
                    event_handler.handle(Event::AOF(Command::SELECT(&cmd)));
                }
                RDB_OPCODE_RESIZEDB => {
                    let (total, _) = input.read_length()?;
                    info!("db[{}] total keys: {}", db, total);
                    let (expired, _) = input.read_length()?;
                    info!("db[{}] expired keys: {}", db, expired);
                }
                RDB_OPCODE_EXPIRETIME | RDB_OPCODE_EXPIRETIME_MS => {
                    if data_type == RDB_OPCODE_EXPIRETIME_MS {
                        let expired_time = input.read_integer(8, false)?;
                        meta.expire = Option::Some((ExpireType::Millisecond, expired_time as i64));
                    } else {
                        let expired_time = input.read_integer(4, false)?;
                        meta.expire = Option::Some((ExpireType::Second, expired_time as i64));
                    }
                    let value_type = input.read_u8()?;
                    match value_type {
                        RDB_OPCODE_FREQ => {
                            let val = input.read_u8()?;
                            let value_type = input.read_u8()?;
                            meta.evict = Option::Some((EvictType::LFU, val as i64));
                            self.read_object(input, value_type, event_handler, &meta)?;
                        }
                        RDB_OPCODE_IDLE => {
                            let (val, _) = input.read_length()?;
                            let value_type = input.read_u8()?;
                            meta.evict = Option::Some((EvictType::LRU, val as i64));
                            self.read_object(input, value_type, event_handler, &meta)?;
                        }
                        _ => {
                            self.read_object(input, value_type, event_handler, &meta)?;
                        }
                    }
                }
                RDB_OPCODE_FREQ => {
                    let val = input.read_u8()?;
                    let value_type = input.read_u8()?;
                    meta.evict = Option::Some((EvictType::LFU, val as i64));
                    self.read_object(input, value_type, event_handler, &meta)?;
                }
                RDB_OPCODE_IDLE => {
                    let (val, _) = input.read_length()?;
                    meta.evict = Option::Some((EvictType::LRU, val as i64));
                    let value_type = input.read_u8()?;
                    self.read_object(input, value_type, event_handler, &meta)?;
                }
                RDB_OPCODE_MODULE_AUX => {
                    input.read_length()?;
                    self.rdb_load_check_module_value(input)?;
                }
                RDB_OPCODE_EOF => {
                    if rdb_version >= 5 {
                        input.read_integer(8, true)?;
                    }
                    break;
                }
                _ => {
                    self.read_object(input, data_type, event_handler, &meta)?;
                }
            };
        }
        event_handler.handle(Event::RDB(Object::EOR));
        Ok(())
    }
}
impl DefaultRDBParser {
    
    fn read_object(
        &mut self,
        input: &mut dyn Read,
        value_type: u8,
        event_handler: &mut dyn EventHandler,
        meta: &Meta,
    ) -> Result<()> {
        match value_type {
            RDB_TYPE_STRING => {
                let key = input.read_string()?;
                let value = input.read_string()?;
                event_handler.handle(Event::RDB(Object::String(KeyValue {
                    key: &key,
                    value: &value,
                    meta,
                })));
            }
            RDB_TYPE_LIST | RDB_TYPE_SET => {
                let key = input.read_string()?;
                let (count, _) = input.read_length()?;
                let mut iter = StrValIter { count, input };
                let mut has_more = true;
                while has_more {
                    let mut val = Vec::new();
                    for _ in 0..BATCH_SIZE {
                        if let Ok(next_val) = iter.next() {
                            val.push(next_val);
                        } else {
                            has_more = false;
                            break;
                        }
                    }
                    if !val.is_empty() {
                        if value_type == RDB_TYPE_LIST {
                            event_handler.handle(Event::RDB(Object::List(List {
                                key: &key,
                                values: &val,
                                meta,
                            })));
                        } else {
                            event_handler.handle(Event::RDB(Object::Set(Set {
                                key: &key,
                                members: &val,
                                meta,
                            })));
                        }
                    }
                }
            }
            RDB_TYPE_ZSET => {
                let key = input.read_string()?;
                let (count, _) = input.read_length()?;
                let mut iter = SortedSetIter { count, v: 1, input };
                let mut has_more = true;
                while has_more {
                    let mut val = Vec::new();
                    for _ in 0..BATCH_SIZE {
                        if let Ok(next_val) = iter.next() {
                            val.push(next_val);
                        } else {
                            has_more = false;
                            break;
                        }
                    }
                    if !val.is_empty() {
                        event_handler.handle(Event::RDB(Object::SortedSet(SortedSet {
                            key: &key,
                            items: &val,
                            meta,
                        })));
                    }
                }
            }
            RDB_TYPE_ZSET_2 => {
                let key = input.read_string()?;
                let (count, _) = input.read_length()?;
                let mut iter = SortedSetIter { count, v: 2, input };
                let mut has_more = true;
                while has_more {
                    let mut val = Vec::new();
                    for _ in 0..BATCH_SIZE {
                        if let Ok(next_val) = iter.next() {
                            val.push(next_val);
                        } else {
                            has_more = false;
                            break;
                        }
                    }
                    if !val.is_empty() {
                        event_handler.handle(Event::RDB(Object::SortedSet(SortedSet {
                            key: &key,
                            items: &val,
                            meta,
                        })));
                    }
                }
            }
            RDB_TYPE_HASH => {
                let key = input.read_string()?;
                let (count, _) = input.read_length()?;
                let mut iter = StrValIter {
                    count: count * 2,
                    input,
                };
                let mut has_more = true;
                while has_more {
                    let mut val = Vec::new();
                    for _ in 0..BATCH_SIZE {
                        let name;
                        let value;
                        if let Ok(next_val) = iter.next() {
                            name = next_val;
                            value = iter.next().expect("missing hash field value");
                            val.push(Field { name, value });
                        } else {
                            has_more = false;
                            break;
                        }
                    }
                    if !val.is_empty() {
                        event_handler.handle(Event::RDB(Object::Hash(Hash {
                            key: &key,
                            fields: &val,
                            meta,
                        })));
                    }
                }
            }
            RDB_TYPE_HASH_ZIPMAP => {
                let key = input.read_string()?;
                let bytes = input.read_string()?;
                let cursor = &mut Cursor::new(&bytes);
                cursor.set_position(1);
                let mut iter = ZipMapIter {
                    has_more: true,
                    cursor,
                };
                let mut has_more = true;
                while has_more {
                    let mut fields = Vec::new();
                    for _ in 0..BATCH_SIZE {
                        if let Ok(field) = iter.next() {
                            fields.push(field);
                        } else {
                            has_more = false;
                            break;
                        }
                    }
                    if !fields.is_empty() {
                        event_handler.handle(Event::RDB(Object::Hash(Hash {
                            key: &key,
                            fields: &fields,
                            meta,
                        })));
                    }
                }
            }
            RDB_TYPE_LIST_ZIPLIST => {
                let key = input.read_string()?;
                let bytes = input.read_string()?;
                let cursor = &mut Cursor::new(bytes);
                
                cursor.set_position(8);
                let count = cursor.read_u16::<LittleEndian>()? as isize;
                let mut iter = ZipListIter { count, cursor };
                let mut has_more = true;
                while has_more {
                    let mut val = Vec::new();
                    for _ in 0..BATCH_SIZE {
                        if let Ok(next_val) = iter.next() {
                            val.push(next_val);
                        } else {
                            has_more = false;
                            break;
                        }
                    }
                    if !val.is_empty() {
                        event_handler.handle(Event::RDB(Object::List(List {
                            key: &key,
                            values: &val,
                            meta,
                        })));
                    }
                }
            }
            RDB_TYPE_HASH_ZIPLIST => {
                let key = input.read_string()?;
                let bytes = input.read_string()?;
                let cursor = &mut Cursor::new(bytes);
                
                cursor.set_position(8);
                let count = cursor.read_u16::<LittleEndian>()? as isize;
                let mut iter = ZipListIter { count, cursor };
                let mut has_more = true;
                while has_more {
                    let mut val = Vec::new();
                    for _ in 0..BATCH_SIZE {
                        let name;
                        let value;
                        if let Ok(next_val) = iter.next() {
                            name = next_val;
                            value = iter.next().expect("missing hash field value");
                            val.push(Field { name, value });
                        } else {
                            has_more = false;
                            break;
                        }
                    }
                    if !val.is_empty() {
                        event_handler.handle(Event::RDB(Object::Hash(Hash {
                            key: &key,
                            fields: &val,
                            meta,
                        })));
                    }
                }
            }
            RDB_TYPE_ZSET_ZIPLIST => {
                let key = input.read_string()?;
                let bytes = input.read_string()?;
                let cursor = &mut Cursor::new(bytes);
                
                cursor.set_position(8);
                let count = cursor.read_u16::<LittleEndian>()? as isize;
                let mut iter = ZipListIter { count, cursor };
                let mut has_more = true;
                while has_more {
                    let mut val = Vec::new();
                    for _ in 0..BATCH_SIZE {
                        let member;
                        let score: f64;
                        if let Ok(next_val) = iter.next() {
                            member = next_val;
                            let score_str =
                                to_string(iter.next().expect("missing sorted set element's score"));
                            score = score_str.parse::<f64>().unwrap();
                            val.push(Item { member, score });
                        } else {
                            has_more = false;
                            break;
                        }
                    }
                    if !val.is_empty() {
                        event_handler.handle(Event::RDB(Object::SortedSet(SortedSet {
                            key: &key,
                            items: &val,
                            meta,
                        })));
                    }
                }
            }
            RDB_TYPE_SET_INTSET => {
                let key = input.read_string()?;
                let bytes = input.read_string()?;
                let mut cursor = Cursor::new(&bytes);
                let encoding = cursor.read_i32::<LittleEndian>()?;
                let length = cursor.read_u32::<LittleEndian>()?;
                let mut iter = IntSetIter {
                    encoding,
                    count: length as isize,
                    cursor: &mut cursor,
                };
                let mut has_more = true;
                while has_more {
                    let mut val = Vec::new();
                    for _ in 0..BATCH_SIZE {
                        if let Ok(next_val) = iter.next() {
                            val.push(next_val);
                        } else {
                            has_more = false;
                            break;
                        }
                    }
                    if !val.is_empty() {
                        event_handler.handle(Event::RDB(Object::Set(Set {
                            key: &key,
                            members: &val,
                            meta,
                        })));
                    }
                }
            }
            RDB_TYPE_LIST_QUICKLIST => {
                let key = input.read_string()?;
                let (count, _) = input.read_length()?;
                let mut iter = QuickListIter {
                    len: -1,
                    count,
                    input,
                    cursor: Option::None,
                };
                let mut has_more = true;
                while has_more {
                    let mut val = Vec::new();
                    for _ in 0..BATCH_SIZE {
                        if let Ok(next_val) = iter.next() {
                            val.push(next_val);
                        } else {
                            has_more = false;
                            break;
                        }
                    }
                    if !val.is_empty() {
                        event_handler.handle(Event::RDB(Object::List(List {
                            key: &key,
                            values: &val,
                            meta,
                        })));
                    }
                }
            }
            RDB_TYPE_MODULE | RDB_TYPE_MODULE_2 => {
                let key = input.read_string()?;
                let (module_id, _) = input.read_length()?;
                let module_id = module_id as usize;
                let mut array: [char; 9] = [' '; 9];
                for i in 0..array.len() {
                    let i1 = 10 + (array.len() - 1 - i) * 6;
                    let i2 = (module_id >> i1 as usize) as usize;
                    let i3 = i2 & 63;
                    let chr = MODULE_SET.get(i3).unwrap();
                    array[i] = *chr;
                }
                let module_name: String = String::from_iter(array.iter());
                let module_version: usize = module_id & 1023;
                if self.module_parser.is_none() && value_type == RDB_TYPE_MODULE {
                    panic!(
                        "MODULE {}, version {} 无法解析",
                        module_name, module_version
                    );
                }
                if let Some(parser) = &mut self.module_parser {
                    let module: Box<dyn Module>;
                    if value_type == RDB_TYPE_MODULE_2 {
                        module = parser.borrow_mut().parse(input, &module_name, 2);
                        let (len, _) = input.read_length()?;
                        if len != 0 {
                            panic!(
                                "module '{}' that is not terminated by EOF marker, but {}",
                                &module_name, len
                            );
                        }
                    } else {
                        module = parser
                            .borrow_mut()
                            .parse(input, &module_name, module_version);
                    }
                    event_handler.handle(Event::RDB(Object::Module(key, module, meta)));
                } else {
                    
                    self.rdb_load_check_module_value(input)?;
                }
            }
            RDB_TYPE_STREAM_LISTPACKS => {
                let key = input.read_string()?;
                let stream = self.read_stream_list_packs(meta, input)?;
                event_handler.handle(Event::RDB(Object::Stream(key, stream)));
            }
            _ => panic!("unknown data type: {}", value_type),
        }
        Ok(())
    }
    fn rdb_load_check_module_value(&mut self, input: &mut dyn Read) -> Result<()> {
        loop {
            let (op_code, _) = input.read_length()?;
            if op_code == RDB_MODULE_OPCODE_EOF {
                break;
            }
            if op_code == RDB_MODULE_OPCODE_SINT || op_code == RDB_MODULE_OPCODE_UINT {
                input.read_length()?;
            } else if op_code == RDB_MODULE_OPCODE_STRING {
                input.read_string()?;
            } else if op_code == RDB_MODULE_OPCODE_FLOAT {
                input.read_exact(&mut [0; 4])?;
            } else if op_code == RDB_MODULE_OPCODE_DOUBLE {
                input.read_exact(&mut [0; 8])?;
            }
        }
        Ok(())
    }
    fn read_stream_list_packs<'a>(
        &mut self,
        meta: &'a Meta,
        input: &mut dyn Read,
    ) -> Result<Stream<'a>> {
        let mut entries: BTreeMap<ID, Entry> = BTreeMap::new();
        let (length, _) = input.read_length()?;
        for _ in 0..length {
            let raw_id = input.read_string()?;
            let mut cursor = Cursor::new(&raw_id);
            let ms = read_long(&mut cursor, 8, false)?;
            let seq = read_long(&mut cursor, 8, false)?;
            let base_id = ID { ms, seq };
            let raw_list_packs = input.read_string()?;
            let mut list_pack = Cursor::new(&raw_list_packs);
            list_pack.set_position(6);
            let count = i64::from_str(&to_string(read_list_pack_entry(&mut list_pack)?)).unwrap();
            let deleted = i64::from_str(&to_string(read_list_pack_entry(&mut list_pack)?)).unwrap();
            let num_fields =
                i32::from_str(&to_string(read_list_pack_entry(&mut list_pack)?)).unwrap();
            let mut tmp_fields = Vec::with_capacity(num_fields as usize);
            for _ in 0..num_fields {
                tmp_fields.push(read_list_pack_entry(&mut list_pack)?);
            }
            read_list_pack_entry(&mut list_pack)?;
            let total = count + deleted;
            for _ in 0..total {
                let mut fields = BTreeMap::new();
                let flag =
                    i32::from_str(&to_string(read_list_pack_entry(&mut list_pack)?)).unwrap();
                let ms = i64::from_str(&to_string(read_list_pack_entry(&mut list_pack)?)).unwrap();
                let seq = i64::from_str(&to_string(read_list_pack_entry(&mut list_pack)?)).unwrap();
                let id = ID {
                    ms: ms + base_id.ms,
                    seq: seq + base_id.seq,
                };
                let deleted = (flag & 1) != 0;
                if (flag & 2) != 0 {
                    for i in 0..num_fields {
                        let value = read_list_pack_entry(&mut list_pack)?;
                        let field = tmp_fields.get(i as usize).unwrap().to_vec();
                        fields.insert(field, value);
                    }
                    entries.insert(
                        id,
                        Entry {
                            id,
                            deleted,
                            fields,
                        },
                    );
                } else {
                    let num_fields =
                        i32::from_str(&to_string(read_list_pack_entry(&mut list_pack)?)).unwrap();
                    for _ in 0..num_fields {
                        let field = read_list_pack_entry(&mut list_pack)?;
                        let value = read_list_pack_entry(&mut list_pack)?;
                        fields.insert(field, value);
                    }
                    entries.insert(
                        id,
                        Entry {
                            id,
                            deleted,
                            fields,
                        },
                    );
                }
                read_list_pack_entry(&mut list_pack)?;
            }
            let end = list_pack.read_u8()?;
            if end != 255 {
                panic!("listpack expect 255 but {}", end);
            }
        }
        input.read_length()?;
        input.read_length()?;
        input.read_length()?;
        let mut groups: Vec<Group> = Vec::new();
        let (count, _) = input.read_length()?;
        for _ in 0..count {
            let name = input.read_string()?;
            let (ms, _) = input.read_length()?;
            let (seq, _) = input.read_length()?;
            let group_last_id = ID {
                ms: ms as i64,
                seq: seq as i64,
            };
            groups.push(Group {
                name,
                last_id: group_last_id,
            });
            let (global_pel, _) = input.read_length()?;
            for _ in 0..global_pel {
                read_long(input, 8, false)?;
                read_long(input, 8, false)?;
                input.read_integer(8, false)?;
                input.read_length()?;
            }
            let (consumer_count, _) = input.read_length()?;
            for _ in 0..consumer_count {
                input.read_string()?;
                input.read_integer(8, false)?;
                let (pel, _) = input.read_length()?;
                for _ in 0..pel {
                    read_long(input, 8, false)?;
                    read_long(input, 8, false)?;
                }
            }
        }
        Ok(Stream {
            entries,
            groups,
            meta,
        })
    }
}
fn read_long(input: &mut dyn Read, length: i32, little_endian: bool) -> Result<i64> {
    let mut r: i64 = 0;
    for i in 0..length {
        let v: i64 = input.read_u8()? as i64;
        if little_endian {
            r |= v << (i << 3) as i64;
        } else {
            r = (r << 8) | v;
        }
    }
    Ok(r)
}
fn read_list_pack_entry(input: &mut dyn Read) -> Result<Vec<u8>> {
    let special = input.read_u8()? as i32;
    let skip: i32;
    let mut bytes;
    if (special & 0x80) == 0 {
        skip = 1;
        let value = special & 0x7F;
        let value = value.to_string();
        bytes = value.into_bytes();
    } else if (special & 0xC0) == 0x80 {
        let len = special & 0x3F;
        skip = 1 + len as i32;
        bytes = vec![0; len as usize];
        input.read_exact(&mut bytes)?;
    } else if (special & 0xE0) == 0xC0 {
        skip = 2;
        let next = input.read_u8()?;
        let value = (((special & 0x1F) << 8) | next as i32) << 19 >> 19;
        let value = value.to_string();
        bytes = value.into_bytes();
    } else if (special & 0xFF) == 0xF1 {
        skip = 3;
        let value = input.read_i16::<LittleEndian>()?;
        let value = value.to_string();
        bytes = value.into_bytes();
    } else if (special & 0xFF) == 0xF2 {
        skip = 4;
        let value = input.read_i24::<LittleEndian>()?;
        let value = value.to_string();
        bytes = value.into_bytes();
    } else if (special & 0xFF) == 0xF3 {
        skip = 5;
        let value = input.read_i32::<LittleEndian>()?;
        let value = value.to_string();
        bytes = value.into_bytes();
    } else if (special & 0xFF) == 0xF4 {
        skip = 9;
        let value = input.read_i64::<LittleEndian>()?;
        let value = value.to_string();
        bytes = value.into_bytes();
    } else if (special & 0xF0) == 0xE0 {
        let next = input.read_u8()?;
        let len = ((special & 0x0F) << 8) | next as i32;
        skip = 2 + len as i32;
        bytes = vec![0; len as usize];
        input.read_exact(&mut bytes)?;
    } else if (special & 0xFF) == 0xF0 {
        let len = input.read_u32::<BigEndian>()?;
        skip = 5 + len as i32;
        bytes = vec![0; len as usize];
        input.read_exact(&mut bytes)?;
    } else {
        panic!("{}", special)
    }
    if skip <= 127 {
        let mut buf = vec![0; 1];
        input.read_exact(&mut buf)?;
    } else if skip < 16383 {
        let mut buf = vec![0; 2];
        input.read_exact(&mut buf)?;
    } else if skip < 2097151 {
        let mut buf = vec![0; 3];
        input.read_exact(&mut buf)?;
    } else if skip < 268435455 {
        let mut buf = vec![0; 4];
        input.read_exact(&mut buf)?;
    } else {
        let mut buf = vec![0; 5];
        input.read_exact(&mut buf)?;
    }
    Ok(bytes)
}
pub(crate) fn read_zm_len(cursor: &mut Cursor<&Vec<u8>>) -> Result<usize> {
    let len = cursor.read_u8()?;
    if len <= 253 {
        return Ok(len as usize);
    } else if len == 254 {
        let value = cursor.read_u32::<BigEndian>()?;
        return Ok(value as usize);
    }
    Ok(len as usize)
}
pub(crate) fn read_zip_list_entry(cursor: &mut Cursor<Vec<u8>>) -> Result<Vec<u8>> {
    if cursor.read_u8()? >= 254 {
        cursor.read_u32::<LittleEndian>()?;
    }
    let flag = cursor.read_u8()?;
    match flag >> 6 {
        0 => {
            let length = flag & 0x3F;
            let mut buff = vec![0; length as usize];
            cursor.read_exact(&mut buff)?;
            return Ok(buff);
        }
        1 => {
            let next_byte = cursor.read_u8()?;
            let length = (((flag as u16) & 0x3F) << 8) | (next_byte as u16);
            let mut buff = vec![0; length as usize];
            cursor.read_exact(&mut buff)?;
            return Ok(buff);
        }
        2 => {
            let length = cursor.read_u32::<BigEndian>()?;
            let mut buff = vec![0; length as usize];
            cursor.read_exact(&mut buff)?;
            return Ok(buff);
        }
        _ => {}
    }
    return match flag {
        ZIP_INT_8BIT => {
            let int = cursor.read_i8()?;
            Ok(int.to_string().into_bytes())
        }
        ZIP_INT_16BIT => {
            let int = cursor.read_i16::<LittleEndian>()?;
            Ok(int.to_string().into_bytes())
        }
        ZIP_INT_24BIT => {
            let int = cursor.read_i24::<LittleEndian>()?;
            Ok(int.to_string().into_bytes())
        }
        ZIP_INT_32BIT => {
            let int = cursor.read_i32::<LittleEndian>()?;
            Ok(int.to_string().into_bytes())
        }
        ZIP_INT_64BIT => {
            let int = cursor.read_i64::<LittleEndian>()?;
            Ok(int.to_string().into_bytes())
        }
        _ => {
            let result = (flag - 0xF1) as isize;
            Ok(result.to_string().into_bytes())
        }
    };
}
#[derive(Debug)]
pub enum Object<'a> {
    
    String(KeyValue<'a>),
    
    List(List<'a>),
    
    Set(Set<'a>),
    
    SortedSet(SortedSet<'a>),
    
    Hash(Hash<'a>),
    
    Module(Vec<u8>, Box<dyn Module>, &'a Meta),
    
    Stream(Vec<u8>, Stream<'a>),
    
    BOR,
    
    EOR,
}
pub trait Module {
    fn as_any(&self) -> &dyn Any;
}
impl Debug for dyn Module {
    fn fmt(&self, _: &mut Formatter) -> result::Result<(), Error> {
        unimplemented!()
    }
}
#[derive(Debug)]
pub struct Meta {
    
    pub db: isize,
    
    pub expire: Option<(ExpireType, i64)>,
    
    pub evict: Option<(EvictType, i64)>,
}
#[derive(Debug)]
pub enum ExpireType {
    
    Second,
    
    Millisecond,
}
#[derive(Debug)]
pub enum EvictType {
    
    LRU,
    
    LFU,
}
#[derive(Debug)]
pub struct KeyValue<'a> {
    
    pub key: &'a [u8],
    
    pub value: &'a [u8],
    
    pub meta: &'a Meta,
}
#[derive(Debug)]
pub struct List<'a> {
    
    pub key: &'a [u8],
    
    pub values: &'a [Vec<u8>],
    
    pub meta: &'a Meta,
}
#[derive(Debug)]
pub struct Set<'a> {
    
    pub key: &'a [u8],
    
    pub members: &'a [Vec<u8>],
    
    pub meta: &'a Meta,
}
#[derive(Debug)]
pub struct SortedSet<'a> {
    
    pub key: &'a [u8],
    
    pub items: &'a [Item],
    
    pub meta: &'a Meta,
}
#[derive(Debug)]
pub struct Item {
    
    pub member: Vec<u8>,
    
    pub score: f64,
}
#[derive(Debug)]
pub struct Hash<'a> {
    
    pub key: &'a [u8],
    
    pub fields: &'a [Field],
    
    pub meta: &'a Meta,
}
#[derive(Debug)]
pub struct Field {
    
    pub name: Vec<u8>,
    
    pub value: Vec<u8>,
}
#[derive(Debug)]
pub struct Stream<'a> {
    pub entries: BTreeMap<ID, Entry>,
    pub groups: Vec<Group>,
    
    pub meta: &'a Meta,
}
#[derive(Debug, Eq, Copy, Clone)]
pub struct ID {
    pub ms: i64,
    pub seq: i64,
}
impl ID {
    pub fn to_string(&self) -> String {
        format!("{}-{}", self.ms, self.seq)
    }
}
impl PartialEq for ID {
    fn eq(&self, other: &Self) -> bool {
        self.ms == other.ms && self.seq == other.seq
    }
    fn ne(&self, other: &Self) -> bool {
        self.ms != other.ms || self.seq != other.seq
    }
}
impl PartialOrd for ID {
    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
        Some(self.cmp(other))
    }
    fn lt(&self, other: &Self) -> bool {
        match self.cmp(other) {
            cmp::Ordering::Less => true,
            cmp::Ordering::Equal => false,
            cmp::Ordering::Greater => false,
        }
    }
    fn le(&self, other: &Self) -> bool {
        match self.cmp(other) {
            cmp::Ordering::Less => true,
            cmp::Ordering::Equal => true,
            cmp::Ordering::Greater => false,
        }
    }
    fn gt(&self, other: &Self) -> bool {
        match self.cmp(other) {
            cmp::Ordering::Less => false,
            cmp::Ordering::Equal => false,
            cmp::Ordering::Greater => true,
        }
    }
    fn ge(&self, other: &Self) -> bool {
        match self.cmp(other) {
            cmp::Ordering::Less => false,
            cmp::Ordering::Equal => true,
            cmp::Ordering::Greater => true,
        }
    }
}
impl Ord for ID {
    fn cmp(&self, other: &Self) -> cmp::Ordering {
        let order = self.ms.cmp(&other.ms);
        if order == cmp::Ordering::Equal {
            self.seq.cmp(&other.seq)
        } else {
            order
        }
    }
}
#[derive(Debug)]
pub struct Entry {
    pub id: ID,
    pub deleted: bool,
    pub fields: BTreeMap<Vec<u8>, Vec<u8>>,
}
#[derive(Debug)]
pub struct Group {
    pub name: Vec<u8>,
    pub last_id: ID,
}
pub(crate) const RDB_TYPE_STRING: u8 = 0;
pub(crate) const RDB_TYPE_LIST: u8 = 1;
pub(crate) const RDB_TYPE_SET: u8 = 2;
pub(crate) const RDB_TYPE_ZSET: u8 = 3;
pub(crate) const RDB_TYPE_HASH: u8 = 4;
pub(crate) const RDB_TYPE_ZSET_2: u8 = 5;
pub(crate) const RDB_TYPE_MODULE: u8 = 6;
pub(crate) const RDB_TYPE_MODULE_2: u8 = 7;
pub(crate) const RDB_TYPE_HASH_ZIPMAP: u8 = 9;
pub(crate) const RDB_TYPE_LIST_ZIPLIST: u8 = 10;
pub(crate) const RDB_TYPE_SET_INTSET: u8 = 11;
pub(crate) const RDB_TYPE_ZSET_ZIPLIST: u8 = 12;
pub(crate) const RDB_TYPE_HASH_ZIPLIST: u8 = 13;
pub(crate) const RDB_TYPE_LIST_QUICKLIST: u8 = 14;
pub(crate) const RDB_TYPE_STREAM_LISTPACKS: u8 = 15;
pub(crate) const RDB_OPCODE_MODULE_AUX: u8 = 247;
pub(crate) const RDB_OPCODE_IDLE: u8 = 248;
pub(crate) const RDB_OPCODE_FREQ: u8 = 249;
pub(crate) const RDB_OPCODE_AUX: u8 = 250;
pub(crate) const RDB_OPCODE_RESIZEDB: u8 = 251;
pub(crate) const RDB_OPCODE_EXPIRETIME_MS: u8 = 252;
pub(crate) const RDB_OPCODE_EXPIRETIME: u8 = 253;
pub(crate) const RDB_OPCODE_SELECTDB: u8 = 254;
pub(crate) const RDB_OPCODE_EOF: u8 = 255;
pub(crate) const RDB_MODULE_OPCODE_EOF: isize = 0;
pub(crate) const RDB_MODULE_OPCODE_SINT: isize = 1;
pub(crate) const RDB_MODULE_OPCODE_UINT: isize = 2;
pub(crate) const RDB_MODULE_OPCODE_STRING: isize = 5;
pub(crate) const RDB_MODULE_OPCODE_FLOAT: isize = 3;
pub(crate) const RDB_MODULE_OPCODE_DOUBLE: isize = 4;
pub(crate) const ZIP_INT_8BIT: u8 = 254;
pub(crate) const ZIP_INT_16BIT: u8 = 192;
pub(crate) const ZIP_INT_24BIT: u8 = 240;
pub(crate) const ZIP_INT_32BIT: u8 = 208;
pub(crate) const ZIP_INT_64BIT: u8 = 224;
pub(crate) const RDB_ENCVAL: u8 = 3;
pub(crate) const RDB_6BITLEN: u8 = 0;
pub(crate) const RDB_14BITLEN: u8 = 1;
pub(crate) const RDB_32BITLEN: u8 = 0x80;
pub(crate) const RDB_64BITLEN: u8 = 0x81;
pub(crate) const RDB_ENC_INT8: isize = 0;
pub(crate) const RDB_ENC_INT16: isize = 1;
pub(crate) const RDB_ENC_INT32: isize = 2;
pub(crate) const RDB_ENC_LZF: isize = 3;
pub(crate) const BATCH_SIZE: usize = 64;
pub(crate) const MODULE_SET: [char; 64] = [
    'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S',
    'T', 'U', 'V', 'W', 'X', 'Y', 'Z', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l',
    'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', '0', '1', '2', '3', '4',
    '5', '6', '7', '8', '9', '-', '_',
];