pub use kevy_bytes::SmallBytes;
use kevy_map::{KevyMap, KevySet};
use std::cmp::Ordering;
use std::collections::{BTreeSet, VecDeque};
use std::sync::Arc;
/// Payload of a hash value: a `KevyMap` from `SmallBytes` field names to
/// `Vec<u8>` field values.
pub type HashData = KevyMap<SmallBytes, Vec<u8>>;
/// Payload of a list value: a double-ended queue of `Vec<u8>` items.
pub type ListData = VecDeque<Vec<u8>>;
/// Payload of a set value: a `KevySet` of `SmallBytes` members.
pub type SetData = KevySet<SmallBytes>;
/// An `f64` score wrapped so it can be used as an ordered key (for example in
/// a `BTreeSet`).
///
/// Equality and ordering are both defined by [`f64::total_cmp`], which keeps
/// the `Eq` and `Ord` contracts intact:
///
/// * a NaN score is equal to itself (reflexivity required by `Eq`);
/// * `-0.0` and `0.0` are distinct and `-0.0 < 0.0`, so `a == b` holds exactly
///   when `a.cmp(&b) == Ordering::Equal`.
#[derive(Clone, Copy)]
pub struct Score(pub f64);

impl PartialEq for Score {
    /// Two scores are equal exactly when `cmp` reports `Ordering::Equal`.
    ///
    /// Deriving `PartialEq` would compare with the IEEE `==` operator, which
    /// disagrees with `total_cmp` for NaN and for signed zeros.
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == Ordering::Equal
    }
}

impl Eq for Score {}

impl Ord for Score {
    /// Total order over all `f64` bit patterns, as defined by `f64::total_cmp`.
    fn cmp(&self, other: &Self) -> Ordering {
        self.0.total_cmp(&other.0)
    }
}

impl PartialOrd for Score {
    /// Always `Some`, delegating to the total order in `Ord::cmp`.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
/// One end of a score range: a boundary value plus a flag saying whether the
/// boundary itself is excluded from the range.
pub struct ScoreBound {
    /// The boundary score.
    pub value: f64,
    /// When `true`, a score equal to `value` does not satisfy the bound.
    pub exclusive: bool,
}

impl ScoreBound {
    /// Treats this bound as a lower limit and reports whether `s` satisfies it:
    /// `s > value` when exclusive, `s >= value` otherwise.
    ///
    /// Any comparison involving NaN is false, so a NaN on either side never
    /// satisfies the bound.
    pub(crate) fn ge_ok(&self, s: f64) -> bool {
        let strictly_above = s > self.value;
        let on_boundary = s == self.value;
        strictly_above || (on_boundary && !self.exclusive)
    }

    /// Treats this bound as an upper limit and reports whether `s` satisfies it:
    /// `s < value` when exclusive, `s <= value` otherwise.
    ///
    /// Any comparison involving NaN is false, so a NaN on either side never
    /// satisfies the bound.
    pub(crate) fn le_ok(&self, s: f64) -> bool {
        let strictly_below = s < self.value;
        let on_boundary = s == self.value;
        strictly_below || (on_boundary && !self.exclusive)
    }
}
/// Sorted-set payload kept as two indexes that must always describe the same
/// (member, score) pairs.
#[derive(Default, Clone)]
pub struct ZSetData {
    /// Member -> score lookup.
    pub(crate) by_member: KevyMap<SmallBytes, f64>,
    /// `(score, member)` pairs; the `BTreeSet` keeps them sorted by score
    /// (via `Score`'s ordering) and then by member bytes.
    pub(crate) by_score: BTreeSet<(Score, Vec<u8>)>,
}

impl ZSetData {
    /// Adds `member` with `score`, or replaces the score of an existing member.
    ///
    /// Returns `true` when the member was not present before the call.
    pub(crate) fn insert(&mut self, member: &[u8], score: f64) -> bool {
        let previous = self.by_member.insert(SmallBytes::from_slice(member), score);
        if let Some(stale_score) = previous {
            // The ordered index is keyed by score, so the entry recorded under
            // the old score has to go before the new one is added.
            self.by_score.remove(&(Score(stale_score), member.to_vec()));
        }
        self.by_score.insert((Score(score), member.to_vec()));
        previous.is_none()
    }

    /// Deletes `member` from both indexes.
    ///
    /// Returns `true` when the member was present.
    pub(crate) fn remove(&mut self, member: &[u8]) -> bool {
        let removed = self.by_member.remove(member);
        if let Some(stale_score) = removed {
            self.by_score.remove(&(Score(stale_score), member.to_vec()));
        }
        removed.is_some()
    }

    /// Number of members, read from the member index.
    pub(crate) fn len(&self) -> usize {
        self.by_member.len()
    }

    /// Iterates `(member, score)` pairs in the order of the score index.
    pub fn ordered(&self) -> impl Iterator<Item = (&[u8], f64)> {
        self.by_score
            .iter()
            .map(|entry| (entry.1.as_slice(), (entry.0).0))
    }
}
/// A stored value. Each variant is one in-memory representation of a data type.
///
/// Several variants share a user-visible type: `Value::type_name` reports
/// `Str`, `Int` and `ArcBulk` as "string", and reports each `Small*Inline`
/// variant under the same name as its `Arc`-backed counterpart.
#[derive(Clone)]
pub enum Value {
/// String bytes held in a `SmallBytes`.
Str(SmallBytes),
/// A string value stored as a 64-bit signed integer.
Int(i64),
/// String bytes held in a reference-counted byte slice.
ArcBulk(Arc<[u8]>),
/// Hash backed by a reference-counted `HashData`.
Hash(Arc<HashData>),
/// List backed by a reference-counted `ListData`.
List(Arc<ListData>),
/// Set backed by a reference-counted `SetData`.
Set(Arc<SetData>),
/// Sorted set backed by a reference-counted `ZSetData`.
ZSet(Arc<ZSetData>),
/// Stream backed by a reference-counted `StreamData`.
Stream(Arc<crate::stream::StreamData>),
/// Set stored directly in the variant (no `Arc`); `Value::weight` counts it as 0.
SmallSetInline(crate::small_set::SmallSetData),
/// Hash stored directly in the variant (no `Arc`); `Value::weight` counts it as 0.
SmallHashInline(crate::small_hash::SmallHashData),
/// List stored directly in the variant (no `Arc`); `Value::weight` counts it as 0.
SmallListInline(crate::small_list::SmallListData),
/// Sorted set stored directly in the variant (no `Arc`); `Value::weight` counts it as 0.
SmallZSetInline(crate::small_zset::SmallZSetData),
}
/// Size threshold of 64 (bytes, judging by the name).
/// NOTE(review): not referenced in this part of the file; presumably the cutoff
/// above which a string is stored as `Value::ArcBulk` — confirm against callers.
pub const BULK_THRESHOLD: usize = 64;
// Compile-time guard: the build fails if `Value` ever grows beyond 32 bytes.
const _: () = {
assert!(std::mem::size_of::<Value>() <= 32);
};
/// Minimum `ArcBulk` length, in bytes (4 KiB), at which `Value::is_heap_heavy`
/// reports the value as heap heavy.
pub const HEAP_HEAVY_BYTES: usize = 4 * 1024;
/// Sending half of a std `mpsc` channel that carries batches of boxed `Value`s.
/// NOTE(review): the name suggests the receiver drops the values on a
/// background thread — confirm against the receiving side.
pub type BioDropSender = std::sync::mpsc::Sender<Vec<Box<Value>>>;
impl Value {
pub fn type_name(&self) -> &'static str {
match self {
Value::Str(_) | Value::Int(_) | Value::ArcBulk(_) => "string",
Value::Hash(_) | Value::SmallHashInline(_) => "hash",
Value::List(_) | Value::SmallListInline(_) => "list",
Value::Set(_) | Value::SmallSetInline(_) => "set",
Value::ZSet(_) | Value::SmallZSetInline(_) => "zset",
Value::Stream(_) => "stream",
}
}
pub fn weight(&self) -> u64 {
match self {
Value::Str(s) => s.heap_bytes() as u64,
Value::Int(_) => 0,
Value::ArcBulk(a) => a.len() as u64,
Value::Hash(h) => collection_overhead(h.capacity(), HASH_SLOT_BYTES) + h
.iter()
.map(|(f, v)| f.heap_bytes() as u64 + v.capacity() as u64)
.sum::<u64>(),
Value::List(l) => (l.capacity() as u64).saturating_mul(LIST_SLOT_BYTES)
+ l.iter().map(|v| v.capacity() as u64).sum::<u64>(),
Value::Set(s) => collection_overhead(s.capacity(), SET_SLOT_BYTES) + s
.iter()
.map(|m| m.heap_bytes() as u64)
.sum::<u64>(),
Value::SmallSetInline(_)
| Value::SmallHashInline(_)
| Value::SmallListInline(_)
| Value::SmallZSetInline(_) => 0,
Value::ZSet(z) => collection_overhead(z.by_member.capacity(), HASH_SLOT_BYTES)
+ z.by_member
.iter()
.map(|(m, _)| m.heap_bytes() as u64)
.sum::<u64>()
+ (z.by_score.len() as u64).saturating_mul(BTREE_SLOT_BYTES),
Value::Stream(s) => s.weight(),
}
}
#[inline]
pub fn is_heap_heavy(&self) -> bool {
match self {
Value::Str(_)
| Value::Int(_)
| Value::SmallSetInline(_)
| Value::SmallHashInline(_)
| Value::SmallListInline(_)
| Value::SmallZSetInline(_) => false,
Value::ArcBulk(a) => a.len() >= HEAP_HEAVY_BYTES,
Value::Hash(a) => std::sync::Arc::strong_count(a) == 1 && !a.is_empty(),
Value::List(a) => std::sync::Arc::strong_count(a) == 1 && !a.is_empty(),
Value::Set(a) => std::sync::Arc::strong_count(a) == 1 && !a.is_empty(),
Value::ZSet(a) => {
std::sync::Arc::strong_count(a) == 1 && a.by_member.len() > 0
}
Value::Stream(a) => {
std::sync::Arc::strong_count(a) == 1 && a.length() > 0
}
}
}
}
// Compile-time guard: the build fails if `Value` ever stops being `Send`.
// The closure is never called; it only has to type-check.
const _: fn() = || {
    fn require_send<T: Send>() {}
    require_send::<Value>();
};
/// Bytes charged per slot of a hash map; `Value::weight` applies it to hash
/// capacity and to the sorted set's member index.
pub(crate) const HASH_SLOT_BYTES: u64 = 32;
/// Bytes charged per slot of a set; `Value::weight` applies it to set capacity.
pub(crate) const SET_SLOT_BYTES: u64 = 24;
/// Bytes charged per slot of a list; `Value::weight` applies it to list capacity.
pub(crate) const LIST_SLOT_BYTES: u64 = 24;
/// Bytes charged per entry of the sorted set's ordered (`BTreeSet`) index.
pub(crate) const BTREE_SLOT_BYTES: u64 = 40;
/// Fixed overhead of 96 bytes per entry.
/// NOTE(review): not referenced in this part of the file; presumably the
/// per-key bookkeeping cost added by the keyspace — confirm against callers.
pub const ENTRY_OVERHEAD: u64 = 96;
/// Fixed cost of a container with `capacity` slots at `per_slot` bytes each.
///
/// The product is clamped to `u64::MAX` instead of overflowing.
#[inline]
fn collection_overhead(capacity: usize, per_slot: u64) -> u64 {
    let slots = capacity as u64;
    slots.checked_mul(per_slot).unwrap_or(u64::MAX)
}
/// Weight of one hash entry: the field's heap bytes, the value's capacity
/// (`value_cap`), and one hash slot.
#[inline]
pub fn hash_field_weight(field: &SmallBytes, value_cap: usize) -> u64 {
    let field_bytes = field.heap_bytes() as u64;
    let value_bytes = value_cap as u64;
    field_bytes + value_bytes + HASH_SLOT_BYTES
}
/// Weight of one set member: its heap bytes plus one set slot.
#[inline]
pub fn set_member_weight(member: &SmallBytes) -> u64 {
    let member_bytes = member.heap_bytes() as u64;
    member_bytes + SET_SLOT_BYTES
}
/// Weight of one list item: one list slot plus the item's capacity
/// (`value_cap`).
#[inline]
pub fn list_item_weight(value_cap: usize) -> u64 {
    let payload_bytes = value_cap as u64;
    LIST_SLOT_BYTES + payload_bytes
}
/// Weight of one sorted-set member: its heap bytes plus one hash slot (member
/// index) and one btree slot (score index).
#[inline]
pub fn zset_member_weight(member: &SmallBytes) -> u64 {
    let member_bytes = member.heap_bytes() as u64;
    member_bytes + HASH_SLOT_BYTES + BTREE_SLOT_BYTES
}