use crate::serde::{Deserializable, DeserializeError, Serializable, SerializeError};
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use std::{
cmp::Reverse,
io::{Read, Write},
sync::Arc,
};
/// User-defined key, stored as a reference-counted byte slice
pub type UserKey = Arc<[u8]>;
/// User-defined value, stored as a reference-counted byte slice
pub type UserValue = Arc<[u8]>;
/// Sequence number of an item; among items with the same key, the `Ord` impls in this file order higher sequence numbers first
pub type SeqNo = u64;
/// Distinguishes a regular value from a tombstone.
///
/// The `u8` conversions in this file encode `Value` as `0` and `Tombstone` as `1`.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
#[allow(clippy::module_name_repetitions)]
pub enum ValueType {
/// A regular item carrying a value
Value,
/// A tombstone marker (presumably records the deletion of a key; confirm against callers)
Tombstone,
}
impl From<u8> for ValueType {
    /// Decodes a type tag byte.
    ///
    /// `0` maps to [`ValueType::Value`]; every other byte is treated as
    /// [`ValueType::Tombstone`], so this conversion never fails.
    fn from(value: u8) -> Self {
        if value == 0 {
            Self::Value
        } else {
            Self::Tombstone
        }
    }
}
impl From<ValueType> for u8 {
    /// Encodes the value type as its one-byte tag: `0` for a value, `1` for a tombstone.
    fn from(value: ValueType) -> Self {
        // Kept as an exhaustive match so that a newly added variant fails to compile here.
        match value {
            ValueType::Tombstone => 1,
            ValueType::Value => 0,
        }
    }
}
/// Key part of an item without its value: user key, sequence number and value type.
///
/// Equality compares all three fields, while the `Ord` impl in this file only looks at
/// the user key (ascending) and the sequence number (descending).
#[derive(Clone, PartialEq, Eq)]
pub struct ParsedInternalKey {
/// Key bytes
pub user_key: UserKey,
/// Sequence number of the item
pub seqno: SeqNo,
/// Whether the item is a regular value or a tombstone
pub value_type: ValueType,
}
impl std::fmt::Debug for ParsedInternalKey {
    /// Renders the key as `<user key bytes>:<seqno>:<type tag>`, where the type tag
    /// is the `u8` encoding of the value type.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            user_key,
            seqno,
            value_type,
        } = self;
        let type_tag = u8::from(*value_type);

        write!(f, "{:?}:{}:{}", user_key, seqno, type_tag)
    }
}
impl ParsedInternalKey {
    /// Builds an internal key from a user key, a sequence number and a value type.
    ///
    /// The user key may be anything convertible into a [`UserKey`].
    pub fn new<K: Into<UserKey>>(user_key: K, seqno: SeqNo, value_type: ValueType) -> Self {
        let user_key = user_key.into();

        Self {
            user_key,
            seqno,
            value_type,
        }
    }

    /// Returns `true` if the value type of this key is [`ValueType::Tombstone`].
    pub fn is_tombstone(&self) -> bool {
        matches!(self.value_type, ValueType::Tombstone)
    }
}
impl PartialOrd for ParsedInternalKey {
    /// Always returns `Some`: the ordering is total and delegates to [`Ord::cmp`].
    fn partial_cmp(&self, rhs: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(rhs))
    }
}
impl Ord for ParsedInternalKey {
    /// Orders by user key ascending, then by sequence number descending.
    ///
    /// The value type is not part of the ordering.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        // `Reverse` flips the seqno comparison so the highest seqno of a key comes first.
        let lhs = (&self.user_key, Reverse(self.seqno));
        let rhs = (&other.user_key, Reverse(other.seqno));

        lhs.cmp(&rhs)
    }
}
/// An item consisting of a user key, a user value, a sequence number and a value type.
///
/// Equality compares all four fields, while the `Ord` impl in this file only looks at
/// the key (ascending) and the sequence number (descending).
#[derive(Clone, PartialEq, Eq)]
pub struct Value {
/// Key bytes; `Value::new` rejects empty keys and keys longer than `u16::MAX` bytes
pub key: UserKey,
/// Value bytes; `Value::new` requires the length to fit into a `u32`
pub value: UserValue,
/// Sequence number of the item
pub seqno: SeqNo,
/// Whether the item is a regular value or a tombstone
pub value_type: ValueType,
}
impl std::fmt::Debug for Value {
    /// Renders the item as `<key bytes>:<seqno>:<V|T> => <value bytes>`, where `V`
    /// stands for a regular value and `T` for a tombstone.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let type_marker = match self.value_type {
            ValueType::Tombstone => "T",
            ValueType::Value => "V",
        };

        write!(
            f,
            "{:?}:{}:{} => {:?}",
            self.key, self.seqno, type_marker, self.value
        )
    }
}
impl From<(ParsedInternalKey, UserValue)> for Value {
fn from(val: (ParsedInternalKey, UserValue)) -> Self {
let key = val.0;
Self {
key: key.user_key,
seqno: key.seqno,
value_type: key.value_type,
value: val.1,
}
}
}
impl PartialOrd for Value {
    /// Always returns `Some`: the ordering is total and delegates to [`Ord::cmp`].
    fn partial_cmp(&self, rhs: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(rhs))
    }
}
impl Ord for Value {
    /// Orders by key ascending, then by sequence number descending.
    ///
    /// Neither the value bytes nor the value type are part of the ordering.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.key
            .cmp(&other.key)
            // Only consulted for equal keys; `Reverse` puts the higher seqno first.
            .then_with(|| Reverse(self.seqno).cmp(&Reverse(other.seqno)))
    }
}
impl Value {
    /// Creates a new item from its parts.
    ///
    /// Key and value may be anything convertible into a [`UserKey`] / [`UserValue`].
    ///
    /// # Panics
    ///
    /// Panics if the key is empty, if the key is longer than `u16::MAX` bytes,
    /// or if the value length does not fit into a `u32`.
    pub fn new<K: Into<UserKey>, V: Into<UserValue>>(
        key: K,
        value: V,
        seqno: u64,
        value_type: ValueType,
    ) -> Self {
        let k = key.into();
        let v = value.into();

        // The limits match the width of the length prefixes written by `serialize`
        // (u16 for the key, u32 for the value).
        assert!(!k.is_empty());
        assert!(k.len() <= u16::MAX.into());
        assert!(u32::try_from(v.len()).is_ok());

        Self {
            key: k,
            value: v,
            seqno,
            value_type,
        }
    }

    /// Size of this item in bytes: the size of the struct itself plus the
    /// lengths of the key and value slices.
    #[doc(hidden)]
    #[must_use]
    pub fn size(&self) -> usize {
        std::mem::size_of::<Self>() + self.key.len() + self.value.len()
    }

    /// Returns `true` if the value type of this item is [`ValueType::Tombstone`].
    #[doc(hidden)]
    #[must_use]
    pub fn is_tombstone(&self) -> bool {
        matches!(self.value_type, ValueType::Tombstone)
    }
}
impl From<Value> for ParsedInternalKey {
fn from(val: Value) -> Self {
Self {
user_key: val.key,
seqno: val.seqno,
value_type: val.value_type,
}
}
}
impl Serializable for Value {
fn serialize<W: Write>(&self, writer: &mut W) -> Result<(), SerializeError> {
writer.write_u64::<BigEndian>(self.seqno)?;
writer.write_u8(u8::from(self.value_type))?;
#[allow(clippy::cast_possible_truncation)]
writer.write_u16::<BigEndian>(self.key.len() as u16)?;
writer.write_all(&self.key)?;
#[allow(clippy::cast_possible_truncation)]
writer.write_u32::<BigEndian>(self.value.len() as u32)?;
writer.write_all(&self.value)?;
Ok(())
}
}
impl Deserializable for Value {
    /// Reads an item from `reader`, expecting the layout produced by `serialize`:
    /// seqno (`u64`), value type tag (`u8`), key length (`u16`), key bytes,
    /// value length (`u32`), value bytes; all integers are big-endian.
    ///
    /// Any non-zero type tag is decoded as a tombstone (see `From<u8> for ValueType`).
    ///
    /// # Errors
    ///
    /// Returns an error if reading fails (including a premature end of input),
    /// or if the stored key length is zero.
    fn deserialize<R: Read>(reader: &mut R) -> Result<Self, DeserializeError> {
        let seqno = reader.read_u64::<BigEndian>()?;
        let value_type = reader.read_u8()?.into();

        let key_len = reader.read_u16::<BigEndian>()?;
        // `Value::new` asserts that the key is not empty. Input read from outside
        // must not be able to trigger that panic, so an empty key is reported as
        // invalid data instead.
        if key_len == 0 {
            return Err(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                "key length must not be zero",
            )
            .into());
        }
        let mut key = vec![0; key_len.into()];
        reader.read_exact(&mut key)?;

        let value_len = reader.read_u32::<BigEndian>()?;
        let mut value = vec![0; value_len as usize];
        reader.read_exact(&mut value)?;

        // The remaining size checks of `Value::new` cannot fail here: the lengths
        // were read as `u16` and `u32`, and the key is known to be non-empty.
        Ok(Self::new(key, value, seqno, value_type))
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use test_log::test;

    /// An item with an empty value must survive a serialize/deserialize roundtrip unchanged.
    #[test]
    fn test_empty_value() -> crate::Result<()> {
        let original = Value::new(vec![1, 2, 3], vec![], 42, ValueType::Value);

        let mut buf = vec![];
        original.serialize(&mut buf)?;

        let mut reader = buf.as_slice();
        let roundtripped = Value::deserialize(&mut reader)?;

        assert_eq!(original, roundtripped);
        Ok(())
    }
}