use super::column_type::ColumnType;
use crate::{binlog_error::BinlogError, ext::cursor_ext::CursorExt};
use byteorder::{BigEndian, LittleEndian, ReadBytesExt};
use serde::{Deserialize, Serialize};
use std::io::{Cursor, Read};
#[derive(Debug, Deserialize, Serialize, Clone, PartialEq)]
pub enum ColumnValue {
None,
Tiny(i8),
UnsignedTiny(u8),
Short(i16),
UnsignedShort(u16),
Long(i32),
UnsignedLong(u32),
LongLong(i64),
UnsignedLongLong(u64),
Float(f32),
Double(f64),
Decimal(String),
Time(String),
Date(String),
DateTime(String),
Timestamp(i64),
Year(u16),
String(Vec<u8>),
Blob(Vec<u8>),
Bit(u64),
Set(u64),
Enum(u32),
Json(Vec<u8>),
}
const DIG_PER_DEC: usize = 9;
const COMPRESSED_BYTES: [usize; 10] = [0, 1, 1, 2, 2, 3, 3, 4, 4, 4];
impl ColumnValue {
pub fn parse(
cursor: &mut Cursor<&Vec<u8>>,
column_type: ColumnType,
column_meta: u16,
column_length: u16,
) -> Result<Self, BinlogError> {
let value = match column_type {
ColumnType::Bit => ColumnValue::Bit(Self::parse_bit(cursor, column_meta)?),
ColumnType::Tiny => ColumnValue::Tiny(cursor.read_i8()?),
ColumnType::Short => ColumnValue::Short(cursor.read_i16::<LittleEndian>()?),
ColumnType::Int24 => ColumnValue::Long(cursor.read_i24::<LittleEndian>()?),
ColumnType::Long => ColumnValue::Long(cursor.read_i32::<LittleEndian>()?),
ColumnType::LongLong => ColumnValue::LongLong(cursor.read_i64::<LittleEndian>()?),
ColumnType::Float => ColumnValue::Float(cursor.read_f32::<LittleEndian>()?),
ColumnType::Double => ColumnValue::Double(cursor.read_f64::<LittleEndian>()?),
ColumnType::NewDecimal => {
let precision = (column_meta & 0xFF) as usize;
let scale = (column_meta >> 8) as usize;
ColumnValue::Decimal(Self::parse_decimal(cursor, precision, scale)?)
}
ColumnType::Date => ColumnValue::Date(Self::parse_date(cursor)?),
ColumnType::Time => ColumnValue::Time(Self::parse_time(cursor)?),
ColumnType::Time2 => ColumnValue::Time(Self::parse_time2(cursor, column_meta)?),
ColumnType::TimeStamp => ColumnValue::Timestamp(Self::parse_timestamp(cursor)?),
ColumnType::TimeStamp2 => {
ColumnValue::Timestamp(Self::parse_timestamp2(cursor, column_meta)?)
}
ColumnType::DateTime => ColumnValue::DateTime(Self::parse_datetime(cursor)?),
ColumnType::DateTime2 => {
ColumnValue::DateTime(Self::parse_datetime2(cursor, column_meta)?)
}
ColumnType::Year => ColumnValue::Year(cursor.read_u8()? as u16 + 1900),
ColumnType::VarChar | ColumnType::VarString => {
ColumnValue::String(Self::parse_string(cursor, column_meta)?)
}
ColumnType::String => ColumnValue::String(Self::parse_string(cursor, column_length)?),
ColumnType::Blob
| ColumnType::Geometry
| ColumnType::TinyBlob
| ColumnType::MediumBlob
| ColumnType::LongBlob => ColumnValue::Blob(Self::parse_blob(cursor, column_meta)?),
ColumnType::Enum => {
ColumnValue::Enum(cursor.read_int::<LittleEndian>(column_length as usize)? as u32)
}
ColumnType::Set => {
ColumnValue::Set(cursor.read_int::<LittleEndian>(column_length as usize)? as u64)
}
ColumnType::Json => ColumnValue::Json(Self::parse_blob(cursor, column_meta)?),
_ => {
return Err(BinlogError::UnsupportedColumnType(format!(
"{:?}",
column_type
)))
}
};
Ok(value)
}
fn parse_bit(cursor: &mut Cursor<&Vec<u8>>, column_meta: u16) -> Result<u64, BinlogError> {
let bit_count = (column_meta >> 8) * 8 + (column_meta & 0xFF);
let bytes = cursor.read_bits_as_bytes(bit_count as usize, true)?;
let mut result = 0u64;
for i in 0..bytes.len() {
result |= (bytes[i] as u64) << (i * 8);
}
Ok(result)
}
fn parse_date(cursor: &mut Cursor<&Vec<u8>>) -> Result<String, BinlogError> {
let date_val = cursor.read_u24::<LittleEndian>()?;
let day = date_val % 32;
let month = (date_val >> 5) % 16;
let year = date_val >> 9;
Ok(format!("{}-{:02}-{:02}", year, month, day))
}
fn parse_time(cursor: &mut Cursor<&Vec<u8>>) -> Result<String, BinlogError> {
let time_val = cursor.read_u24::<LittleEndian>()?;
let hour = (time_val / 100) / 100;
let minute = (time_val / 100) % 100;
let second = time_val % 100;
Ok(format!("{:02}:{:02}:{:02}", hour, minute, second))
}
fn parse_time2(cursor: &mut Cursor<&Vec<u8>>, column_meta: u16) -> Result<String, BinlogError> {
let time_val = cursor.read_u24::<BigEndian>()?;
let hour = (time_val >> 12) % (1 << 10);
let minute = (time_val >> 6) % (1 << 6);
let second = time_val % (1 << 6);
let micros = Self::parse_fraction(cursor, column_meta)?;
Ok(format!(
"{:02}:{:02}:{:02}.{:06}",
hour, minute, second, micros
))
}
fn parse_fraction(cursor: &mut Cursor<&Vec<u8>>, column_meta: u16) -> Result<u32, BinlogError> {
let mut fraction = 0;
let length = ((column_meta + 1) / 2) as u32;
if length > 0 {
fraction = cursor.read_uint::<BigEndian>(length as usize)?;
fraction *= u64::pow(100, 3 - length);
}
Ok(fraction as u32)
}
fn parse_timestamp(cursor: &mut Cursor<&Vec<u8>>) -> Result<i64, BinlogError> {
Ok((cursor.read_u32::<LittleEndian>()? * 1000) as i64)
}
fn parse_timestamp2(
cursor: &mut Cursor<&Vec<u8>>,
column_meta: u16,
) -> Result<i64, BinlogError> {
let second = cursor.read_u32::<BigEndian>()?;
let micros = Self::parse_fraction(cursor, column_meta)?;
Ok(1000000 * second as i64 + micros as i64)
}
fn parse_datetime(cursor: &mut Cursor<&Vec<u8>>) -> Result<String, BinlogError> {
let datetime_val = cursor.read_u64::<LittleEndian>()? * 1000;
let date_val = datetime_val / 1000000;
let time_val = datetime_val % 1000000;
let year = ((date_val / 100) / 100) as u32;
let month = ((date_val / 100) % 100) as u32;
let day = (date_val % 100) as u32;
let hour = ((time_val / 100) / 100) as u32;
let minute = ((time_val / 100) % 100) as u32;
let second = (time_val % 100) as u32;
Ok(format!(
"{}-{:02}-{:02} {:02}:{:02}:{:02}",
year, month, day, hour, minute, second,
))
}
fn parse_datetime2(
cursor: &mut Cursor<&Vec<u8>>,
column_meta: u16,
) -> Result<String, BinlogError> {
let val = cursor.read_uint::<BigEndian>(5)? - 0x8000000000;
let micros = Self::parse_fraction(cursor, column_meta)?;
let d_val = val >> 17;
let t_val = val % (1 << 17);
let year = ((d_val >> 5) / 13) as u32;
let month = ((d_val >> 5) % 13) as u32;
let day = (d_val % (1 << 5)) as u32;
let hour = ((val >> 12) % (1 << 5)) as u32;
let minute = ((t_val >> 6) % (1 << 6)) as u32;
let second = (t_val % (1 << 6)) as u32;
Ok(format!(
"{}-{:02}-{:02} {:02}:{:02}:{:02}.{:06}",
year, month, day, hour, minute, second, micros,
))
}
fn parse_string(
cursor: &mut Cursor<&Vec<u8>>,
column_meta: u16,
) -> Result<Vec<u8>, BinlogError> {
let size = if column_meta < 256 {
cursor.read_u8()? as usize
} else {
cursor.read_u16::<LittleEndian>()? as usize
};
cursor.read_bytes(size)
}
fn parse_blob(cursor: &mut Cursor<&Vec<u8>>, column_meta: u16) -> Result<Vec<u8>, BinlogError> {
let size = cursor.read_uint::<LittleEndian>(column_meta as usize)? as usize;
cursor.read_bytes(size)
}
pub fn parse_decimal(
cursor: &mut Cursor<&Vec<u8>>,
precision: usize,
scale: usize,
) -> Result<String, BinlogError> {
let integral = precision - scale;
let uncomp_intg = integral / DIG_PER_DEC;
let uncomp_frac = scale / DIG_PER_DEC;
let comp_intg = integral - (uncomp_intg * DIG_PER_DEC);
let comp_frac = scale - (uncomp_frac * DIG_PER_DEC);
let comp_frac_bytes = COMPRESSED_BYTES[comp_frac];
let comp_intg_bytes = COMPRESSED_BYTES[comp_intg];
let total_bytes = 4 * uncomp_intg + 4 * uncomp_frac + comp_frac_bytes + comp_intg_bytes;
let mut buf = vec![0u8; total_bytes];
cursor.read_exact(&mut buf)?;
let is_negative = (buf[0] & 0x80) == 0;
buf[0] ^= 0x80;
if is_negative {
for i in 0..buf.len() {
buf[i] ^= 0xFF;
}
}
let mut intg_str = String::new();
if is_negative {
intg_str = "-".to_string();
}
let mut decimal_cursor = Cursor::new(buf);
let mut is_intg_empty = true;
if comp_intg_bytes > 0 {
let value = decimal_cursor.read_uint::<BigEndian>(comp_intg_bytes)?;
if value > 0 {
intg_str += value.to_string().as_str();
is_intg_empty = false;
}
}
for _ in 0..uncomp_intg {
let value = decimal_cursor.read_u32::<BigEndian>()?;
if is_intg_empty {
if value > 0 {
intg_str += value.to_string().as_str();
is_intg_empty = false;
}
} else {
intg_str += format!("{value:0size$}", value = value, size = DIG_PER_DEC).as_str();
}
}
if is_intg_empty {
intg_str += "0";
}
let mut frac_str = String::new();
for _ in 0..uncomp_frac {
let value = decimal_cursor.read_u32::<BigEndian>()?;
frac_str += format!("{value:0size$}", value = value, size = DIG_PER_DEC).as_str();
}
if comp_frac_bytes > 0 {
let value = decimal_cursor.read_uint::<BigEndian>(comp_frac_bytes)?;
frac_str += format!("{value:0size$}", value = value, size = comp_frac).as_str();
}
if frac_str.is_empty() {
Ok(intg_str)
} else {
Ok(intg_str + "." + frac_str.as_str())
}
}
}