use core::result;
use std::any::Any;
use std::cell::RefMut;
use std::cmp;
use std::collections::BTreeMap;
use std::fmt::{Debug, Error, Formatter};
use std::io::{Cursor, Read, Result};
use std::sync::atomic::Ordering;
use byteorder::{BigEndian, LittleEndian, ReadBytesExt};
use log::info;
use crate::{Event, EventHandler, to_string};
use crate::cmd::Command;
use crate::cmd::connection::SELECT;
use crate::io::Conn;
use crate::rdb::Data::Empty;
pub(crate) fn parse(input: &mut Conn,
_: isize,
mut event_handler: &mut RefMut<dyn EventHandler>) -> Result<Data<Vec<u8>, Vec<Vec<u8>>>> {
event_handler.handle(Event::RDB(Object::BOR));
let mut bytes = vec![0; 5];
input.read_exact(&mut bytes)?;
input.read_exact(&mut bytes[..=3])?;
let rdb_version = String::from_utf8_lossy(&bytes[..=3]);
let rdb_version = rdb_version.parse::<isize>().unwrap();
let mut meta = Meta {
db: 0,
expire: None,
evict: None,
};
while input.running.load(Ordering::Relaxed) {
let data_type = input.read_u8()?;
match data_type {
RDB_OPCODE_AUX => {
let field_name = input.read_string()?;
let field_val = input.read_string()?;
let field_name = to_string(field_name);
let field_val = to_string(field_val);
info!("{}:{}", field_name, field_val);
}
RDB_OPCODE_SELECTDB => {
let (db, _) = input.read_length()?;
meta.db = db;
let cmd = SELECT { db: db as i32 };
event_handler.handle(Event::AOF(Command::SELECT(&cmd)));
}
RDB_OPCODE_RESIZEDB => {
let (db, _) = input.read_length()?;
info!("db total keys: {}", db);
let (db, _) = input.read_length()?;
info!("db expired keys: {}", db);
}
RDB_OPCODE_EXPIRETIME | RDB_OPCODE_EXPIRETIME_MS => {
if data_type == RDB_OPCODE_EXPIRETIME_MS {
let expired_time = input.read_integer(8, false)?;
meta.expire = Option::Some((ExpireType::Millisecond, expired_time as i64));
} else {
let expired_time = input.read_integer(4, false)?;
meta.expire = Option::Some((ExpireType::Second, expired_time as i64));
}
let value_type = input.read_u8()?;
match value_type {
RDB_OPCODE_FREQ => {
let val = input.read_u8()?;
let value_type = input.read_u8()?;
meta.evict = Option::Some((EvictType::LFU, val as i64));
input.read_object(value_type, &mut event_handler, &meta)?;
}
RDB_OPCODE_IDLE => {
let (val, _) = input.read_length()?;
let value_type = input.read_u8()?;
meta.evict = Option::Some((EvictType::LRU, val as i64));
input.read_object(value_type, &mut event_handler, &meta)?;
}
_ => {
input.read_object(value_type, &mut event_handler, &meta)?;
}
}
}
RDB_OPCODE_FREQ => {
let val = input.read_u8()?;
let value_type = input.read_u8()?;
meta.evict = Option::Some((EvictType::LFU, val as i64));
input.read_object(value_type, &mut event_handler, &meta)?;
}
RDB_OPCODE_IDLE => {
let (val, _) = input.read_length()?;
meta.evict = Option::Some((EvictType::LRU, val as i64));
let value_type = input.read_u8()?;
input.read_object(value_type, &mut event_handler, &meta)?;
}
RDB_OPCODE_MODULE_AUX => {
input.read_length()?;
input.rdb_load_check_module_value()?;
}
RDB_OPCODE_EOF => {
if rdb_version >= 5 {
input.read_integer(8, true)?;
}
break;
}
_ => {
input.read_object(data_type, &mut event_handler, &meta)?;
}
};
};
event_handler.handle(Event::RDB(Object::EOR));
Ok(Empty)
}
pub(crate) fn read_zm_len(cursor: &mut Cursor<&Vec<u8>>) -> Result<usize> {
let len = cursor.read_u8()?;
if len <= 253 {
return Ok(len as usize);
} else if len == 254 {
let value = cursor.read_u32::<BigEndian>()?;
return Ok(value as usize);
}
Ok(len as usize)
}
pub(crate) fn read_zip_list_entry(cursor: &mut Cursor<Vec<u8>>) -> Result<Vec<u8>> {
if cursor.read_u8()? >= 254 {
cursor.read_u32::<LittleEndian>()?;
}
let flag = cursor.read_u8()?;
match flag >> 6 {
0 => {
let length = flag & 0x3F;
let mut buff = vec![0; length as usize];
cursor.read_exact(&mut buff)?;
return Ok(buff);
}
1 => {
let next_byte = cursor.read_u8()?;
let length = (((flag as u16) & 0x3F) << 8) | (next_byte as u16);
let mut buff = vec![0; length as usize];
cursor.read_exact(&mut buff)?;
return Ok(buff);
}
2 => {
let length = cursor.read_u32::<BigEndian>()?;
let mut buff = vec![0; length as usize];
cursor.read_exact(&mut buff)?;
return Ok(buff);
}
_ => {}
}
return match flag {
ZIP_INT_8BIT => {
let int = cursor.read_i8()?;
Ok(int.to_string().into_bytes())
}
ZIP_INT_16BIT => {
let int = cursor.read_i16::<LittleEndian>()?;
Ok(int.to_string().into_bytes())
}
ZIP_INT_24BIT => {
let int = cursor.read_i24::<LittleEndian>()?;
Ok(int.to_string().into_bytes())
}
ZIP_INT_32BIT => {
let int = cursor.read_i32::<LittleEndian>()?;
Ok(int.to_string().into_bytes())
}
ZIP_INT_64BIT => {
let int = cursor.read_i64::<LittleEndian>()?;
Ok(int.to_string().into_bytes())
}
_ => {
let result = (flag - 0xF1) as isize;
Ok(result.to_string().into_bytes())
}
};
}
#[derive(Debug)]
pub enum Object<'a> {
String(KeyValue<'a>),
List(List<'a>),
Set(Set<'a>),
SortedSet(SortedSet<'a>),
Hash(Hash<'a>),
Module(Vec<u8>, Box<dyn Module>),
Stream(Vec<u8>, Stream),
BOR,
EOR,
}
pub trait Module {
fn as_any(&self) -> &dyn Any;
}
impl Debug for dyn Module {
fn fmt(&self, _: &mut Formatter) -> result::Result<(), Error> {
unimplemented!()
}
}
#[derive(Debug)]
pub struct Meta {
pub db: isize,
pub expire: Option<(ExpireType, i64)>,
pub evict: Option<(EvictType, i64)>,
}
#[derive(Debug)]
pub enum ExpireType {
Second,
Millisecond,
}
#[derive(Debug)]
pub enum EvictType {
LRU,
LFU,
}
#[derive(Debug)]
pub struct KeyValue<'a> {
pub key: &'a [u8],
pub value: &'a [u8],
pub meta: &'a Meta,
}
#[derive(Debug)]
pub struct List<'a> {
pub key: &'a [u8],
pub values: &'a [Vec<u8>],
pub meta: &'a Meta,
}
#[derive(Debug)]
pub struct Set<'a> {
pub key: &'a [u8],
pub members: &'a [Vec<u8>],
pub meta: &'a Meta,
}
#[derive(Debug)]
pub struct SortedSet<'a> {
pub key: &'a [u8],
pub items: &'a [Item],
pub meta: &'a Meta,
}
#[derive(Debug)]
pub struct Item {
pub member: Vec<u8>,
pub score: f64,
}
#[derive(Debug)]
pub struct Hash<'a> {
pub key: &'a [u8],
pub fields: &'a [Field],
pub meta: &'a Meta,
}
#[derive(Debug)]
pub struct Field {
pub name: Vec<u8>,
pub value: Vec<u8>,
}
#[derive(Debug)]
pub struct Stream {
pub entries: BTreeMap<ID, Entry>,
pub groups: Vec<Group>,
}
#[derive(Debug, Eq, Copy, Clone)]
pub struct ID {
pub ms: i64,
pub seq: i64,
}
impl ID {
pub fn to_string(&self) -> String {
format!("{}-{}", self.ms, self.seq)
}
}
impl PartialEq for ID {
fn eq(&self, other: &Self) -> bool {
self.ms == other.ms && self.seq == other.seq
}
fn ne(&self, other: &Self) -> bool {
self.ms != other.ms || self.seq != other.seq
}
}
impl PartialOrd for ID {
fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
Some(self.cmp(other))
}
fn lt(&self, other: &Self) -> bool {
match self.cmp(other) {
cmp::Ordering::Less => true,
cmp::Ordering::Equal => false,
cmp::Ordering::Greater => false
}
}
fn le(&self, other: &Self) -> bool {
match self.cmp(other) {
cmp::Ordering::Less => true,
cmp::Ordering::Equal => true,
cmp::Ordering::Greater => false
}
}
fn gt(&self, other: &Self) -> bool {
match self.cmp(other) {
cmp::Ordering::Less => false,
cmp::Ordering::Equal => false,
cmp::Ordering::Greater => true
}
}
fn ge(&self, other: &Self) -> bool {
match self.cmp(other) {
cmp::Ordering::Less => false,
cmp::Ordering::Equal => true,
cmp::Ordering::Greater => true
}
}
}
impl Ord for ID {
fn cmp(&self, other: &Self) -> cmp::Ordering {
let order = self.ms.cmp(&other.ms);
if order == cmp::Ordering::Equal {
self.seq.cmp(&other.seq)
} else {
order
}
}
}
#[derive(Debug)]
pub struct Entry {
pub id: ID,
pub deleted: bool,
pub fields: BTreeMap<Vec<u8>, Vec<u8>>,
}
#[derive(Debug)]
pub struct Group {
pub name: Vec<u8>,
pub last_id: ID
}
pub(crate) const RDB_TYPE_STRING: u8 = 0;
pub(crate) const RDB_TYPE_LIST: u8 = 1;
pub(crate) const RDB_TYPE_SET: u8 = 2;
pub(crate) const RDB_TYPE_ZSET: u8 = 3;
pub(crate) const RDB_TYPE_HASH: u8 = 4;
pub(crate) const RDB_TYPE_ZSET_2: u8 = 5;
pub(crate) const RDB_TYPE_MODULE: u8 = 6;
pub(crate) const RDB_TYPE_MODULE_2: u8 = 7;
pub(crate) const RDB_TYPE_HASH_ZIPMAP: u8 = 9;
pub(crate) const RDB_TYPE_LIST_ZIPLIST: u8 = 10;
pub(crate) const RDB_TYPE_SET_INTSET: u8 = 11;
pub(crate) const RDB_TYPE_ZSET_ZIPLIST: u8 = 12;
pub(crate) const RDB_TYPE_HASH_ZIPLIST: u8 = 13;
pub(crate) const RDB_TYPE_LIST_QUICKLIST: u8 = 14;
pub(crate) const RDB_TYPE_STREAM_LISTPACKS: u8 = 15;
pub(crate) const RDB_OPCODE_MODULE_AUX: u8 = 247;
pub(crate) const RDB_OPCODE_IDLE: u8 = 248;
pub(crate) const RDB_OPCODE_FREQ: u8 = 249;
pub(crate) const RDB_OPCODE_AUX: u8 = 250;
pub(crate) const RDB_OPCODE_RESIZEDB: u8 = 251;
pub(crate) const RDB_OPCODE_EXPIRETIME_MS: u8 = 252;
pub(crate) const RDB_OPCODE_EXPIRETIME: u8 = 253;
pub(crate) const RDB_OPCODE_SELECTDB: u8 = 254;
pub(crate) const RDB_OPCODE_EOF: u8 = 255;
pub(crate) const RDB_MODULE_OPCODE_EOF: isize = 0;
pub(crate) const RDB_MODULE_OPCODE_SINT: isize = 1;
pub(crate) const RDB_MODULE_OPCODE_UINT: isize = 2;
pub(crate) const RDB_MODULE_OPCODE_STRING: isize = 5;
pub(crate) const RDB_MODULE_OPCODE_FLOAT: isize = 3;
pub(crate) const RDB_MODULE_OPCODE_DOUBLE: isize = 4;
pub(crate) const ZIP_INT_8BIT: u8 = 254;
pub(crate) const ZIP_INT_16BIT: u8 = 192;
pub(crate) const ZIP_INT_24BIT: u8 = 240;
pub(crate) const ZIP_INT_32BIT: u8 = 208;
pub(crate) const ZIP_INT_64BIT: u8 = 224;
pub(crate) const RDB_ENCVAL: u8 = 3;
pub(crate) const RDB_6BITLEN: u8 = 0;
pub(crate) const RDB_14BITLEN: u8 = 1;
pub(crate) const RDB_32BITLEN: u8 = 0x80;
pub(crate) const RDB_64BITLEN: u8 = 0x81;
pub(crate) const RDB_ENC_INT8: isize = 0;
pub(crate) const RDB_ENC_INT16: isize = 1;
pub(crate) const RDB_ENC_INT32: isize = 2;
pub(crate) const RDB_ENC_LZF: isize = 3;
pub(crate) const BATCH_SIZE: usize = 64;
pub(crate) enum Data<B, V> {
Bytes(B),
BytesVec(V),
Empty,
}