use crate::btree::var_len::{decode_var_int_from_slice, encode_var_int};
use crate::spec::{DataType, Datum};
use std::cmp::Ordering;
const TIMESTAMP_COMPACT_PRECISION: u32 = 3;
const DECIMAL_COMPACT_PRECISION: u32 = 18;
pub type KeyComparator = Box<dyn Fn(&[u8], &[u8]) -> Ordering + Send + Sync>;
pub fn make_key_comparator(data_type: &DataType) -> KeyComparator {
match data_type {
DataType::TinyInt(_) => Box::new(|a: &[u8], b: &[u8]| (a[0] as i8).cmp(&(b[0] as i8))),
DataType::SmallInt(_) => Box::new(|a: &[u8], b: &[u8]| {
let av = i16::from_le_bytes(a[..2].try_into().unwrap());
let bv = i16::from_le_bytes(b[..2].try_into().unwrap());
av.cmp(&bv)
}),
DataType::Int(_) | DataType::Date(_) | DataType::Time(_) => {
Box::new(|a: &[u8], b: &[u8]| {
let av = i32::from_le_bytes(a[..4].try_into().unwrap());
let bv = i32::from_le_bytes(b[..4].try_into().unwrap());
av.cmp(&bv)
})
}
DataType::BigInt(_) => Box::new(|a: &[u8], b: &[u8]| {
let av = i64::from_le_bytes(a[..8].try_into().unwrap());
let bv = i64::from_le_bytes(b[..8].try_into().unwrap());
av.cmp(&bv)
}),
DataType::Float(_) => Box::new(|a: &[u8], b: &[u8]| {
let av = f32::from_le_bytes(a[..4].try_into().unwrap());
let bv = f32::from_le_bytes(b[..4].try_into().unwrap());
av.total_cmp(&bv)
}),
DataType::Double(_) => Box::new(|a: &[u8], b: &[u8]| {
let av = f64::from_le_bytes(a[..8].try_into().unwrap());
let bv = f64::from_le_bytes(b[..8].try_into().unwrap());
av.total_cmp(&bv)
}),
DataType::Timestamp(t) if t.precision() > TIMESTAMP_COMPACT_PRECISION => {
Box::new(|a: &[u8], b: &[u8]| {
let a_millis = i64::from_le_bytes(a[..8].try_into().unwrap());
let b_millis = i64::from_le_bytes(b[..8].try_into().unwrap());
let (a_nanos, _) = decode_var_int_from_slice(a, 8);
let (b_nanos, _) = decode_var_int_from_slice(b, 8);
a_millis.cmp(&b_millis).then_with(|| a_nanos.cmp(&b_nanos))
})
}
DataType::LocalZonedTimestamp(t) if t.precision() > TIMESTAMP_COMPACT_PRECISION => {
Box::new(|a: &[u8], b: &[u8]| {
let a_millis = i64::from_le_bytes(a[..8].try_into().unwrap());
let b_millis = i64::from_le_bytes(b[..8].try_into().unwrap());
let (a_nanos, _) = decode_var_int_from_slice(a, 8);
let (b_nanos, _) = decode_var_int_from_slice(b, 8);
a_millis.cmp(&b_millis).then_with(|| a_nanos.cmp(&b_nanos))
})
}
DataType::Decimal(d) if d.precision() > DECIMAL_COMPACT_PRECISION => {
Box::new(|a: &[u8], b: &[u8]| {
let a_neg = !a.is_empty() && (a[0] & 0x80) != 0;
let b_neg = !b.is_empty() && (b[0] & 0x80) != 0;
match (a_neg, b_neg) {
(true, false) => Ordering::Less,
(false, true) => Ordering::Greater,
_ => a.cmp(b),
}
})
}
DataType::Timestamp(_) | DataType::LocalZonedTimestamp(_) => {
Box::new(|a: &[u8], b: &[u8]| {
let av = i64::from_le_bytes(a[..8].try_into().unwrap());
let bv = i64::from_le_bytes(b[..8].try_into().unwrap());
av.cmp(&bv)
})
}
DataType::Decimal(_) => Box::new(|a: &[u8], b: &[u8]| {
let av = i64::from_le_bytes(a[..8].try_into().unwrap());
let bv = i64::from_le_bytes(b[..8].try_into().unwrap());
av.cmp(&bv)
}),
_ => Box::new(|a: &[u8], b: &[u8]| a.cmp(b)),
}
}
pub fn serialize_datum(datum: &Datum, data_type: &DataType) -> Vec<u8> {
match datum {
Datum::Bool(v) => vec![*v as u8],
Datum::TinyInt(v) => vec![*v as u8],
Datum::SmallInt(v) => v.to_le_bytes().to_vec(),
Datum::Int(v) | Datum::Date(v) | Datum::Time(v) => v.to_le_bytes().to_vec(),
Datum::Long(v) => v.to_le_bytes().to_vec(),
Datum::Float(v) => v.to_le_bytes().to_vec(),
Datum::Double(v) => v.to_le_bytes().to_vec(),
Datum::String(v) => v.as_bytes().to_vec(),
Datum::Timestamp { millis, nanos } | Datum::LocalZonedTimestamp { millis, nanos } => {
let precision = match data_type {
DataType::Timestamp(t) => t.precision(),
DataType::LocalZonedTimestamp(t) => t.precision(),
_ => 3,
};
let mut buf = millis.to_le_bytes().to_vec();
if precision > TIMESTAMP_COMPACT_PRECISION {
encode_var_int(&mut buf, *nanos).unwrap();
}
buf
}
Datum::Decimal {
unscaled,
precision,
..
} => {
if *precision <= DECIMAL_COMPACT_PRECISION {
(*unscaled as i64).to_le_bytes().to_vec()
} else {
unscaled.to_be_bytes().to_vec()
}
}
Datum::Bytes(v) => v.clone(),
}
}