use crate::{
KeyEnDeOrdered, MapxOrd, ValueEnDe,
basic::{mapx_ord::MapxOrdIter as LargeIter, orphan::Orphan},
common::dirty_count as dc,
common::error::Result,
};
use parking_lot::Mutex;
use ruc::eg;
use serde::{Deserialize, Serialize, de};
use std::{
collections::{BTreeMap, BTreeSet, btree_set::Iter as SmallIter},
fmt,
marker::PhantomData,
ops::{Bound, Not},
};
/// Numeric key type usable as a "slot" in [`SlotDex`].
///
/// Implementors are expected to behave like unsigned integers (see the
/// `impl_slot_type!` invocations below): `Not` must invert the ordering
/// (bitwise complement on unsigned ints), which is how `swap_order`
/// storage works, and `MIN`/`MAX` bound all range queries.
pub trait SlotType:
    Clone
    + Ord
    + fmt::Debug
    + Not<Output = Self>
    + KeyEnDeOrdered
    + Serialize
    + de::DeserializeOwned
    + 'static
{
    /// Smallest representable slot value.
    const MIN: Self;
    /// Largest representable slot value.
    const MAX: Self;
    /// Rounds `self` down to the nearest multiple of `base`.
    fn floor_align(&self, base: &Self) -> Self;
    /// `self^exp`, `None` on overflow.
    fn checked_pow(&self, exp: u32) -> Option<Self>;
    /// Maximum of the two values.
    fn max_val(self, other: Self) -> Self;
    /// Addition clamped at `MAX`.
    fn saturating_add(&self, rhs: &Self) -> Self;
    /// Widening conversion for signed distance arithmetic.
    fn as_i128(&self) -> i128;
    /// Narrowing/widening conversion to `u64` (may truncate for `u128`).
    fn as_u64(&self) -> u64;
}
/// Implements [`SlotType`] for the listed unsigned-integer types by
/// delegating to the corresponding inherent/`std` operations.
macro_rules! impl_slot_type {
    ($($t:ty),+) => { $(
        impl SlotType for $t {
            const MIN: Self = <$t>::MIN;
            const MAX: Self = <$t>::MAX;
            #[inline]
            // Integer division truncates, so `self / base * base` is the
            // largest multiple of `base` not exceeding `self`.
            fn floor_align(&self, base: &Self) -> Self { self / base * base }
            #[inline]
            fn checked_pow(&self, exp: u32) -> Option<Self> { <$t>::checked_pow(*self, exp) }
            #[inline]
            fn max_val(self, other: Self) -> Self { Ord::max(self, other) }
            #[inline]
            fn saturating_add(&self, rhs: &Self) -> Self { <$t>::saturating_add(*self, *rhs) }
            #[inline]
            fn as_i128(&self) -> i128 { *self as i128 }
            #[inline]
            // NOTE: truncates for u128 values above u64::MAX.
            fn as_u64(&self) -> u64 { *self as u64 }
        }
    )+ };
}
// The supported slot widths; see the type aliases at the bottom of the file.
impl_slot_type!(u32, u64, u128);
// Count of entries (keys) stored under one or more slots.
type EntryCnt = u64;
// Number of entries to skip from the leftmost stored slot.
type SkipNum = EntryCnt;
// Number of entries to take for a page.
type TakeNum = EntryCnt;
// Signed distance between slots, wide enough to hold u64 differences.
type Distance = i128;
// Entries per page in paged queries.
type PageSize = u16;
// Zero-based page number in paged queries.
type PageIndex = u32;
// A `DataCtner` holding at least this many keys is migrated from the
// inline `BTreeSet` to the storage-backed `MapxOrd` representation.
const INLINE_CAPACITY_THRESHOLD: usize = 8;
/// A slot-indexed multi-key container with tiered count summaries that
/// accelerate paged range queries.
#[derive(Debug)]
pub struct SlotDex<S, K>
where
    S: SlotType,
    K: Clone + Ord + KeyEnDeOrdered,
{
    // slot -> set of keys stored under that slot.
    data: MapxOrd<S, DataCtner<K>>,
    // Total entry count, tagged with a dirty bit via `dc` helpers.
    total: Orphan<EntryCnt>,
    // Aggregation tiers: tiers[i] groups slots by tier_capacity^(i+1);
    // later tiers are coarser. Maintained by `ensure_tier_capacity`.
    tiers: Vec<Tier<S>>,
    // Max distinct aligned slots a tier may hold before a coarser tier
    // is stacked on top.
    tier_capacity: S,
    // When true, slots are stored bit-inverted (`!slot`) so that the
    // natural storage order is the reverse of the logical order.
    swap_order: bool,
}
// Manual impl because the derive would require extra bounds and the
// field order must match the `Deserialize` visitor below exactly.
impl<S, K> Serialize for SlotDex<S, K>
where
    S: SlotType,
    K: Clone + Ord + KeyEnDeOrdered,
{
    /// Serializes as a 5-tuple: (data, total, tiers, tier_capacity, swap_order).
    fn serialize<Ser>(&self, serializer: Ser) -> std::result::Result<Ser::Ok, Ser::Error>
    where
        Ser: serde::Serializer,
    {
        use serde::ser::SerializeTuple;
        let mut t = serializer.serialize_tuple(5)?;
        t.serialize_element(&self.data)?;
        // NOTE: `total` is serialized verbatim, dirty bit included;
        // `save_meta` clears the bit before persisting, and
        // `ensure_count` repairs a dirty value on deserialization.
        t.serialize_element(&self.total)?;
        t.serialize_element(&self.tiers)?;
        t.serialize_element(&self.tier_capacity)?;
        t.serialize_element(&self.swap_order)?;
        t.end()
    }
}
// Mirror of the manual `Serialize` impl above: a 5-tuple decoded by a
// sequence visitor, in the same field order.
impl<'de, S, K> Deserialize<'de> for SlotDex<S, K>
where
    S: SlotType,
    K: Clone + Ord + KeyEnDeOrdered,
{
    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        // Zero-sized visitor; PhantomData carries the generic parameters.
        struct Vis<S, K>(PhantomData<(S, K)>);
        impl<'de, S, K> serde::de::Visitor<'de> for Vis<S, K>
        where
            S: SlotType,
            K: Clone + Ord + KeyEnDeOrdered,
        {
            type Value = SlotDex<S, K>;
            fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
                f.write_str("SlotDex")
            }
            fn visit_seq<A: serde::de::SeqAccess<'de>>(
                self,
                mut seq: A,
            ) -> std::result::Result<SlotDex<S, K>, A::Error> {
                // Each element is mandatory; a short tuple is an error.
                let data = seq
                    .next_element()?
                    .ok_or_else(|| serde::de::Error::invalid_length(0, &self))?;
                let total = seq
                    .next_element()?
                    .ok_or_else(|| serde::de::Error::invalid_length(1, &self))?;
                let tiers = seq
                    .next_element()?
                    .ok_or_else(|| serde::de::Error::invalid_length(2, &self))?;
                let tier_capacity = seq
                    .next_element()?
                    .ok_or_else(|| serde::de::Error::invalid_length(3, &self))?;
                let swap_order = seq
                    .next_element()?
                    .ok_or_else(|| serde::de::Error::invalid_length(4, &self))?;
                let mut me = SlotDex {
                    data,
                    total,
                    tiers,
                    tier_capacity,
                    swap_order,
                };
                // If the persisted total was dirty (crash before
                // `save_meta`), recount it from `data` now.
                me.ensure_count();
                Ok(me)
            }
        }
        deserializer.deserialize_tuple(5, Vis(PhantomData))
    }
}
impl<S, K> SlotDex<S, K>
where
    S: SlotType,
    K: Clone + Ord + KeyEnDeOrdered,
{
    /// Creates an empty index.
    ///
    /// `tier_capacity` bounds how many distinct aligned slots a summary
    /// tier may hold before a coarser tier is stacked on top (must be
    /// strictly greater than `S::MIN`, i.e. non-zero). `swap_order`
    /// stores slots bit-inverted so storage order reverses logical order.
    pub fn new(tier_capacity: S, swap_order: bool) -> Self {
        assert!(tier_capacity > S::MIN, "SlotDex: tier_capacity must be > 0");
        Self {
            data: MapxOrd::new(),
            // The counter is born dirty; see `ensure_count`.
            total: Orphan::new(dc::set_dirty(0)),
            tiers: vec![],
            tier_capacity,
            swap_order,
        }
    }
    /// Stable identifier of the underlying storage instance.
    #[inline(always)]
    pub fn instance_id(&self) -> u64 {
        self.data.instance_id()
    }
    /// Clears the dirty bit on the total counter and persists this
    /// instance's metadata; returns the instance id used for reloading.
    pub fn save_meta(&mut self) -> Result<u64> {
        let raw = self.total.get_value();
        self.total.set_value(&dc::clear_dirty(raw));
        let id = self.instance_id();
        crate::common::save_instance_meta(id, self)?;
        Ok(id)
    }
    /// Reloads an instance previously persisted via `save_meta`.
    pub fn from_meta(instance_id: u64) -> Result<Self> {
        crate::common::load_instance_meta(instance_id)
    }
    /// Repairs the total counter after deserialization.
    ///
    /// A dirty value means the last session did not run `save_meta`, so
    /// the count is recomputed by scanning `data`. In both branches the
    /// counter ends up marked dirty again: the in-memory value is only
    /// considered clean at the instant `save_meta` persists it.
    fn ensure_count(&mut self) {
        let raw = self.total.get_value();
        if dc::is_dirty(raw) {
            let actual: u64 = self.data.iter().map(|(_, d)| d.len() as u64).sum();
            self.total.set_value(&dc::set_dirty(actual));
        } else {
            self.total.set_value(&dc::set_dirty(raw));
        }
    }
    /// Inserts key `k` under `slot`. Idempotent per (slot, key) pair:
    /// counters are only touched when the key was not already present.
    pub fn insert(&mut self, slot: S, k: K) -> Result<()> {
        // Map the logical slot to its storage representation (inverted
        // when `swap_order` is set).
        let slot = self.to_storage_slot(slot);
        // May stack a new, coarser tier if the top one grew too wide.
        self.ensure_tier_capacity();
        let mut ctner = self.data.get(&slot).unwrap_or_default();
        if ctner.insert(k) {
            // Persist the container (its encoded length changed even for
            // the storage-backed variant).
            self.data.insert(&slot, &ctner);
            // Bump the aligned-slot count in every tier.
            self.tiers.iter_mut().for_each(|t| {
                t.ensure_cache();
                let slot_floor = slot.floor_align(&t.floor_base);
                let c = t.cache.get_mut();
                let mut v = c.get(&slot_floor).copied().unwrap_or(0);
                if 0 == v {
                    // First entry under this aligned slot: the tier now
                    // tracks one more distinct slot.
                    *t.entry_count.get_mut() += 1;
                    if let Some(l) = t.len_cache.as_mut() {
                        *l += 1;
                    }
                }
                v += 1;
                c.insert(slot_floor.clone(), v);
                t.store.insert(&slot_floor, &v);
            });
            let t = self.total.get_value();
            self.total.set_value(&dc::inc(t));
        }
        Ok(())
    }
    /// Removes key `k` from `slot`; no-op if absent.
    pub fn remove(&mut self, slot: S, k: &K) {
        let slot = self.to_storage_slot(slot);
        let (exist, empty, d) = match self.data.get(&slot) {
            Some(mut d) => {
                let existed = d.remove(k);
                (existed, d.is_empty(), d)
            }
            _ => {
                // Slot never stored anything.
                return;
            }
        };
        if empty {
            // Last key under this slot: drop the container entirely.
            self.data.remove(&slot);
        } else if exist {
            self.data.insert(&slot, &d);
        }
        if exist {
            // Drop degenerate top tiers (fewer than 2 distinct slots):
            // they summarize nothing useful. Repeats until the new top
            // tier is wide enough or no tiers remain.
            loop {
                let dominated = self.tiers.last_mut().is_some_and(|top| {
                    if top.len() < 2 {
                        top.store.clear();
                        *top.entry_count.get_mut() = 0;
                        top.cache.get_mut().clear();
                        true
                    } else {
                        false
                    }
                });
                if dominated {
                    self.tiers.pop();
                } else {
                    break;
                }
            }
            // Decrement the aligned-slot count in the surviving tiers.
            self.tiers.iter_mut().for_each(|t| {
                t.ensure_cache();
                let slot_floor = slot.floor_align(&t.floor_base);
                let c = t.cache.get_mut();
                let cnt = match c.get(&slot_floor).copied() {
                    Some(n) => n,
                    None => return,
                };
                if 1 == cnt {
                    // Aligned slot emptied: remove it and shrink the tier.
                    c.remove(&slot_floor);
                    t.store.remove(&slot_floor);
                    t.dec_len();
                } else {
                    let new_cnt = cnt - 1;
                    c.insert(slot_floor.clone(), new_cnt);
                    t.store.insert(&slot_floor, &new_cnt);
                }
            });
            let t = self.total.get_value();
            self.total.set_value(&dc::dec(t));
        }
    }
    /// Removes all entries and all tiers; the total is zeroed (dirty
    /// flag handling delegated to `dc::zero`).
    pub fn clear(&mut self) {
        self.total.set_value(&dc::zero(self.total.get_value()));
        self.data.clear();
        self.tiers.iter_mut().for_each(|t| {
            t.store.clear();
            *t.entry_count.get_mut() = 0;
            t.cache.get_mut().clear();
        });
        self.tiers.clear();
    }
    /// Paged query over the full slot range.
    pub fn get_entries_by_page(
        &self,
        page_size: PageSize,
        page_index: PageIndex, reverse_order: bool,
    ) -> Vec<K> {
        self.get_entries_by_page_slot(None, None, page_size, page_index, reverse_order)
    }
    /// Paged query over an inclusive logical slot range; `None` bounds
    /// default to the full range.
    pub fn get_entries_by_page_slot(
        &self,
        slot_left_bound: Option<S>, slot_right_bound: Option<S>, page_size: PageSize,
        page_index: PageIndex, reverse_order: bool,
    ) -> Vec<K> {
        // Convert logical bounds into storage bounds; when swap_order is
        // on, the storage order is already reversed, hence the XOR below.
        let (slot_min, slot_max, storage_is_reversed) =
            self.transform_range(slot_left_bound, slot_right_bound);
        if slot_max < slot_min {
            return vec![];
        }
        if 0 == page_size || 0 == self.total() {
            return vec![];
        }
        self.get_entries(
            slot_min,
            slot_max,
            page_size,
            page_index,
            reverse_order ^ storage_is_reversed,
        )
    }
    /// Number of entries stored under exactly `slot` (storage order).
    fn slot_entry_cnt(&self, slot: &S) -> EntryCnt {
        self.data
            .get(slot)
            .map(|d| d.len() as EntryCnt)
            .unwrap_or(0)
    }
    /// Number of entries stored strictly before `slot` (storage order).
    ///
    /// Walks the tiers coarsest-first: each tier contributes the counts
    /// of its aligned buckets in [left_bound, slot aligned to this
    /// tier), then the next finer tier refines the remaining gap, and
    /// the raw `data` map covers the final, sub-finest-tier stretch.
    fn distance_to_the_leftmost_slot(&self, slot: &S) -> Distance {
        if *slot == S::MIN {
            return 0;
        }
        let mut left_bound = S::MIN;
        let mut ret = 0;
        // `tiers` is ordered finest -> coarsest, so iterate in reverse.
        for t in self.tiers.iter().rev() {
            t.ensure_cache();
            let right_bound = slot.floor_align(&t.floor_base);
            ret += t
                .cache
                .lock()
                .range(left_bound.clone()..right_bound.clone())
                .map(|(_, cnt)| *cnt as Distance)
                .sum::<Distance>();
            left_bound = right_bound
        }
        ret += self
            .data
            .range(left_bound..slot.clone())
            .map(|(_, d)| d.len() as Distance)
            .sum::<Distance>();
        ret
    }
    /// Translates (range, page) into a global (skip, take) pair measured
    /// from the leftmost stored slot.
    fn offsets_from_the_leftmost_slot(
        &self,
        slot_start: &S, slot_end: &S, page_size: PageSize,
        page_index: PageIndex,
        reverse: bool,
    ) -> (SkipNum, TakeNum) {
        if slot_start > slot_end {
            return (0, 0);
        }
        if reverse {
            // Count back from the right edge of the range. `skip_n` may
            // go negative here (page past the beginning) ...
            let mut skip_n = self.distance_to_the_leftmost_slot(slot_end)
                + self.slot_entry_cnt(slot_end) as Distance
                - (page_size as Distance) * (1 + page_index as Distance);
            let distance_of_slot_start = self.distance_to_the_leftmost_slot(slot_start);
            let take_n = if distance_of_slot_start <= skip_n {
                page_size
            } else {
                // ... in which case we clamp at the range start and
                // shrink the page accordingly, so the cast below is
                // always performed on a non-negative value.
                let back_shift = (distance_of_slot_start.saturating_sub(skip_n))
                    .min(PageSize::MAX as Distance);
                skip_n = distance_of_slot_start;
                page_size.saturating_sub(back_shift as PageSize)
            };
            (skip_n as SkipNum, take_n as TakeNum)
        } else {
            let skip_n = self.distance_to_the_leftmost_slot(slot_start)
                + (page_size as Distance) * (page_index as Distance);
            (skip_n as SkipNum, page_size as TakeNum)
        }
    }
    /// Finds where a page begins: returns the bound of the first slot to
    /// scan in `data` and how many entries inside it must still be
    /// skipped. Uses the tiers (coarsest-first) to leap over whole
    /// buckets whose counts fit into `remaining`.
    fn locate_page_start(&self, global_skip_n: EntryCnt) -> (Bound<S>, SkipNum) {
        let mut slot_start = Bound::Included(S::MIN);
        let mut remaining: u64 = global_skip_n;
        for t in self.tiers.iter().rev() {
            t.ensure_cache();
            let c = t.cache.lock();
            let mut hdr = c.range((slot_start.clone(), Bound::Unbounded)).peekable();
            while let Some(entry_cnt) = hdr.next().map(|(_, cnt)| *cnt) {
                if entry_cnt > remaining {
                    // Cannot skip this whole bucket; descend to the next
                    // finer tier for resolution inside it.
                    break;
                } else {
                    // Skip the bucket entirely; continue from the next
                    // one (or past S::MAX if this was the last).
                    slot_start = hdr
                        .peek()
                        .map(|(s, _)| Bound::Included((*s).clone()))
                        .unwrap_or(Bound::Excluded(S::MAX));
                    remaining -= entry_cnt;
                }
            }
        }
        // Final refinement over the raw per-slot containers.
        let mut hdr = self
            .data
            .range((slot_start.clone(), Bound::Unbounded))
            .peekable();
        while let Some(entry_cnt) = hdr.next().map(|(_, entries)| entries.len() as u64) {
            if entry_cnt > remaining {
                break;
            } else {
                slot_start = hdr
                    .peek()
                    .map(|(s, _)| Bound::Included((*s).clone()))
                    .unwrap_or(Bound::Excluded(S::MAX));
                remaining -= entry_cnt;
            }
        }
        (slot_start, remaining)
    }
    /// Collects one page of entries from the storage-order range
    /// [slot_start, slot_end]; `reverse` only flips the output order,
    /// the offsets were already computed for the chosen direction.
    fn get_entries(
        &self,
        slot_start: S, slot_end: S, page_size: PageSize,
        page_index: PageIndex,
        reverse: bool,
    ) -> Vec<K> {
        if slot_end < slot_start {
            return vec![];
        }
        let (global_skip_n, take_n) = self.offsets_from_the_leftmost_slot(
            &slot_start,
            &slot_end,
            page_size,
            page_index,
            reverse,
        );
        let mut ret = Vec::with_capacity(take_n as usize);
        let (slot_start_actual, local_skip_n) = self.locate_page_start(global_skip_n);
        let mut skip_n = local_skip_n as usize;
        let take_n = take_n as usize;
        for (_, entries) in self
            .data
            .range((slot_start_actual, Bound::Included(slot_end)))
        {
            entries
                .iter()
                .skip(skip_n)
                .take(take_n - ret.len())
                .for_each(|entry| ret.push(entry));
            // Only the first visited slot carries an intra-slot skip.
            skip_n = 0;
            if ret.len() >= take_n {
                assert_eq!(ret.len(), take_n);
                break;
            }
        }
        if reverse {
            ret.reverse();
        }
        ret
    }
    /// Inclusive entry count between two logical slots:
    /// entries in [min, max) via the distance difference, plus the
    /// entries sitting exactly at `max`.
    pub fn entry_cnt_within_two_slots(&self, slot_start: S, slot_end: S) -> EntryCnt {
        let (slot_min, slot_max, _) =
            self.transform_range(Some(slot_start), Some(slot_end));
        if slot_min > slot_max {
            0
        } else {
            let cnt = self.distance_to_the_leftmost_slot(&slot_max)
                - self.distance_to_the_leftmost_slot(&slot_min)
                + self.slot_entry_cnt(&slot_max) as Distance;
            cnt as EntryCnt
        }
    }
    /// Entry count in an optional logical range; the full range is a
    /// cheap counter read, partial ranges use the tier walk.
    pub fn total_by_slot(&self, slot_start: Option<S>, slot_end: Option<S>) -> EntryCnt {
        let slot_start = slot_start.unwrap_or(S::MIN);
        let slot_end = slot_end.unwrap_or(S::MAX);
        if S::MIN == slot_start && S::MAX == slot_end {
            dc::count(self.total.get_value())
        } else {
            self.entry_cnt_within_two_slots(slot_start, slot_end)
        }
    }
    /// Total number of entries in the index.
    pub fn total(&self) -> EntryCnt {
        self.total_by_slot(None, None)
    }
    /// Stacks a new, coarser tier when the current top grew past
    /// `tier_capacity` distinct slots. The first tier (no top yet) is
    /// built from `data` directly; subsequent tiers aggregate the
    /// previous top's buckets.
    fn ensure_tier_capacity(&mut self) {
        let tiers_len = self.tiers.len();
        if let Some(top) = self.tiers.last_mut() {
            if (top.len() as i128) <= self.tier_capacity.as_i128() {
                return;
            }
            top.ensure_cache();
            // Snapshot the top tier's buckets before building on top of
            // them (avoids borrowing issues).
            let entries: Vec<(S, EntryCnt)> = top
                .cache
                .get_mut()
                .iter()
                .map(|(k, v)| (k.clone(), *v))
                .collect();
            let mut newtop = Tier::new(tiers_len as u32, &self.tier_capacity);
            for (slot, cnt) in entries {
                let slot_floor = slot.floor_align(&newtop.floor_base);
                let c = newtop.cache.get_mut();
                let v = c.get(&slot_floor).copied().unwrap_or(0);
                if 0 == v {
                    *newtop.entry_count.get_mut() += 1;
                    if let Some(l) = newtop.len_cache.as_mut() {
                        *l += 1;
                    }
                }
                let new_v = v + cnt;
                c.insert(slot_floor.clone(), new_v);
                newtop.store.insert(&slot_floor, &new_v);
            }
            self.tiers.push(newtop);
        } else {
            // No tiers yet: aggregate raw per-slot entry counts.
            let mut newtop = Tier::new(tiers_len as u32, &self.tier_capacity);
            for (slot, entries) in self.data.iter() {
                let slot_floor = slot.floor_align(&newtop.floor_base);
                let c = newtop.cache.get_mut();
                let v = c.get(&slot_floor).copied().unwrap_or(0);
                if 0 == v {
                    *newtop.entry_count.get_mut() += 1;
                    if let Some(l) = newtop.len_cache.as_mut() {
                        *l += 1;
                    }
                }
                let new_v = v + entries.len() as EntryCnt;
                c.insert(slot_floor.clone(), new_v);
                newtop.store.insert(&slot_floor, &new_v);
            }
            self.tiers.push(newtop);
        }
    }
    /// Logical slot -> storage slot. With `swap_order`, bitwise
    /// complement inverts the ordering of unsigned slot values.
    #[inline(always)]
    fn to_storage_slot(&self, logical_slot: S) -> S {
        if self.swap_order {
            !logical_slot
        } else {
            logical_slot
        }
    }
    /// Converts an optional logical range into a storage-order
    /// (min, max, reversed?) triple. With `swap_order`, the complemented
    /// max becomes the storage min and vice versa.
    fn transform_range(
        &self,
        logical_min: Option<S>,
        logical_max: Option<S>,
    ) -> (S, S, bool) {
        let min = logical_min.unwrap_or(S::MIN);
        let max = logical_max.unwrap_or(S::MAX);
        if self.swap_order {
            (self.to_storage_slot(max), self.to_storage_slot(min), true)
        } else {
            (min, max, false)
        }
    }
}
/// `SlotDex` keyed by 32-bit slots.
pub type SlotDex32<K> = SlotDex<u32, K>;
/// `SlotDex` keyed by 64-bit slots.
pub type SlotDex64<K> = SlotDex<u64, K>;
/// `SlotDex` keyed by 128-bit slots.
pub type SlotDex128<K> = SlotDex<u128, K>;
/// Per-slot key container with two representations: an inline
/// `BTreeSet` for few keys, upgraded to a storage-backed `MapxOrd` once
/// it reaches `INLINE_CAPACITY_THRESHOLD` (the `len` field caches the
/// map's size so `len()` stays O(1)).
enum DataCtner<K>
where
    K: Clone + Ord + KeyEnDeOrdered,
{
    Small(BTreeSet<K>),
    Large { map: MapxOrd<K, ()>, len: usize },
}
impl<K> fmt::Debug for DataCtner<K>
where
    K: Clone + Ord + KeyEnDeOrdered,
{
    /// Shows only the variant and its element count, never the keys.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Large { len, .. } => {
                f.debug_struct("Large").field("len", len).finish()
            }
            Self::Small(set) => {
                let count = set.len();
                f.debug_tuple("Small").field(&count).finish()
            }
        }
    }
}
// Leading byte of an encoded `DataCtner::Small`.
const TAG_SMALL: u8 = 0;
// Leading byte of an encoded `DataCtner::Large`.
const TAG_LARGE: u8 = 1;
impl<K> ValueEnDe for DataCtner<K>
where
    K: Clone + Ord + KeyEnDeOrdered,
{
    /// Binary layout:
    /// - `Small`: `[TAG_SMALL | count: u32 LE | (klen: u32 LE | key bytes) * count]`
    /// - `Large`: `[TAG_LARGE | hlen: u32 LE | map handle bytes | len: u64 LE]`
    fn try_encode(&self) -> ruc::Result<Vec<u8>> {
        match self {
            Self::Small(set) => {
                let mut buf = vec![TAG_SMALL];
                let count = set.len() as u32;
                buf.extend_from_slice(&count.to_le_bytes());
                for k in set {
                    let kb = k.to_bytes();
                    buf.extend_from_slice(&(kb.len() as u32).to_le_bytes());
                    buf.extend_from_slice(&kb);
                }
                Ok(buf)
            }
            Self::Large { map, len } => {
                let mut buf = vec![TAG_LARGE];
                let handle_bytes = map.encode();
                buf.extend_from_slice(&(handle_bytes.len() as u32).to_le_bytes());
                buf.extend_from_slice(&handle_bytes);
                buf.extend_from_slice(&(*len as u64).to_le_bytes());
                Ok(buf)
            }
        }
    }
    /// Decodes bytes produced by `try_encode`.
    ///
    /// All offset arithmetic is overflow-checked: previously a corrupted
    /// or malicious length prefix could wrap `off + klen` (on 32-bit
    /// targets) so the bounds check passed and the subsequent slice
    /// panicked; now every malformed input yields `Err`.
    fn decode(bytes: &[u8]) -> ruc::Result<Self> {
        if bytes.is_empty() {
            return Err(eg!("empty DataCtner bytes"));
        }
        match bytes[0] {
            TAG_SMALL => {
                let mut off = 1;
                if bytes.len() < off + 4 {
                    return Err(eg!("truncated count"));
                }
                let count =
                    u32::from_le_bytes(bytes[off..off + 4].try_into().unwrap()) as usize;
                off += 4;
                // Every key costs at least its 4-byte length prefix, so a
                // valid `count` is bounded by the remaining payload size;
                // reject absurd counts early instead of looping over them.
                if count > (bytes.len() - off) / 4 {
                    return Err(eg!("truncated key len"));
                }
                let mut set = BTreeSet::new();
                for _ in 0..count {
                    let end = match off.checked_add(4) {
                        Some(e) if e <= bytes.len() => e,
                        _ => return Err(eg!("truncated key len")),
                    };
                    let klen =
                        u32::from_le_bytes(bytes[off..end].try_into().unwrap())
                            as usize;
                    off = end;
                    // checked_add: `klen` is attacker-controlled, the sum
                    // must not wrap before comparison.
                    let end = match off.checked_add(klen) {
                        Some(e) if e <= bytes.len() => e,
                        _ => return Err(eg!("truncated key data")),
                    };
                    let k = K::from_slice(&bytes[off..end]).map_err(|e| eg!(e))?;
                    off = end;
                    set.insert(k);
                }
                Ok(Self::Small(set))
            }
            TAG_LARGE => {
                let mut off = 1;
                if bytes.len() < off + 4 {
                    return Err(eg!("truncated handle len"));
                }
                let hlen =
                    u32::from_le_bytes(bytes[off..off + 4].try_into().unwrap()) as usize;
                off += 4;
                let end = match off.checked_add(hlen) {
                    Some(e) if e <= bytes.len() => e,
                    _ => return Err(eg!("truncated handle data")),
                };
                let map = MapxOrd::decode(&bytes[off..end]).map_err(|e| eg!(e))?;
                off = end;
                let end = match off.checked_add(8) {
                    Some(e) if e <= bytes.len() => e,
                    _ => return Err(eg!("truncated len")),
                };
                let len =
                    u64::from_le_bytes(bytes[off..end].try_into().unwrap()) as usize;
                Ok(Self::Large { map, len })
            }
            _ => Err(eg!("unknown DataCtner tag")),
        }
    }
}
impl<K> DataCtner<K>
where
    K: Clone + Ord + KeyEnDeOrdered,
{
    /// A fresh, empty container (inline representation).
    fn new() -> Self {
        Self::Small(BTreeSet::new())
    }
    /// Number of keys currently held; O(1) for both representations.
    fn len(&self) -> usize {
        match self {
            Self::Small(set) => set.len(),
            Self::Large { len, .. } => *len,
        }
    }
    fn is_empty(&self) -> bool {
        self.len() == 0
    }
    /// Migrates an inline set that reached `INLINE_CAPACITY_THRESHOLD`
    /// into the storage-backed representation; otherwise does nothing.
    fn try_upgrade(&mut self) {
        if let Self::Small(set) = self {
            if set.len() < INLINE_CAPACITY_THRESHOLD {
                return;
            }
            let total = set.len();
            let mut new_map = MapxOrd::new();
            for k in set.iter() {
                new_map.insert(k, &());
            }
            *self = Self::Large {
                map: new_map,
                len: total,
            };
        }
    }
    /// Inserts `k`; returns `true` when it was not already present.
    fn insert(&mut self, k: K) -> bool {
        match self {
            Self::Small(set) => {
                // A genuinely new key at the threshold forces the upgrade
                // first, then retries on the large representation.
                let at_capacity = set.len() >= INLINE_CAPACITY_THRESHOLD;
                if at_capacity && !set.contains(&k) {
                    self.try_upgrade();
                    return self.insert(k);
                }
                set.insert(k)
            }
            Self::Large { map, len } => {
                let fresh = map.get(&k).is_none();
                map.insert(&k, &());
                if fresh {
                    *len += 1;
                }
                fresh
            }
        }
    }
    /// Removes `target`; returns `true` when it was present.
    fn remove(&mut self, target: &K) -> bool {
        match self {
            Self::Small(set) => set.remove(target),
            Self::Large { map, len } => {
                if map.get(target).is_none() {
                    return false;
                }
                map.remove(target);
                *len -= 1;
                true
            }
        }
    }
    /// Ascending iterator over owned keys, representation-agnostic.
    fn iter(&self) -> DataCtnerIter<'_, K> {
        match self {
            Self::Large { map, .. } => DataCtnerIter::Large(Box::new(map.iter())),
            Self::Small(set) => DataCtnerIter::Small(set.iter()),
        }
    }
}
impl<K> Default for DataCtner<K>
where
K: Clone + Ord + KeyEnDeOrdered,
{
fn default() -> Self {
Self::new()
}
}
/// Unified iterator over a `DataCtner`, wrapping either the inline
/// set's iterator or the (boxed) storage-backed map's iterator.
enum DataCtnerIter<'a, K>
where
    K: Clone + Ord + KeyEnDeOrdered,
{
    Small(SmallIter<'a, K>),
    Large(Box<LargeIter<'a, K, ()>>),
}
impl<K> Iterator for DataCtnerIter<'_, K>
where
    K: Clone + Ord + KeyEnDeOrdered,
{
    type Item = K;
    /// Yields owned keys in ascending order for both representations.
    fn next(&mut self) -> Option<Self::Item> {
        match self {
            Self::Large(inner) => inner.next().map(|(k, _)| k),
            Self::Small(inner) => inner.next().cloned(),
        }
    }
}
impl<K> DoubleEndedIterator for DataCtnerIter<'_, K>
where
    K: Clone + Ord + KeyEnDeOrdered,
{
    /// Yields owned keys in descending order for both representations.
    fn next_back(&mut self) -> Option<Self::Item> {
        match self {
            Self::Large(inner) => inner.next_back().map(|(k, _)| k),
            Self::Small(inner) => inner.next_back().cloned(),
        }
    }
}
/// One aggregation level: maps each aligned slot (multiple of
/// `floor_base`) to the number of entries/buckets it covers.
#[derive(Debug, Deserialize, Serialize)]
#[serde(bound = "S: SlotType + Serialize + de::DeserializeOwned")]
struct Tier<S: SlotType> {
    // Alignment granularity: tier_capacity^(tier_index + 1).
    floor_base: S,
    // Persistent copy of the aligned-slot counts.
    store: MapxOrd<S, EntryCnt>,
    // In-memory mirror of `store`; skipped on (de)serialization and
    // rebuilt lazily by `ensure_cache`.
    #[serde(skip)]
    cache: Mutex<BTreeMap<S, EntryCnt>>,
    // Number of distinct aligned slots in this tier.
    entry_count: Orphan<usize>,
    // Memoized `entry_count` read; None after deserialization.
    #[serde(skip)]
    len_cache: Option<usize>,
}
impl<S: SlotType> Tier<S> {
    /// Builds tier number `tier_idx` (zero-based). Its alignment is
    /// `tier_capacity^(tier_idx + 1)`, saturating to `S::MAX` when the
    /// power overflows or degenerates to `S::MIN`.
    fn new(tier_idx: u32, tier_capacity: &S) -> Self {
        let exp = 1 + tier_idx;
        let base = match tier_capacity.checked_pow(exp) {
            Some(v) if v != S::MIN => v,
            _ => S::MAX,
        };
        Self {
            floor_base: base,
            store: MapxOrd::new(),
            cache: Mutex::new(BTreeMap::new()),
            entry_count: Orphan::new(0),
            len_cache: Some(0),
        }
    }
    /// Rebuilds the in-memory cache from persistent storage when it is
    /// empty (e.g. right after deserialization) yet entries exist.
    fn ensure_cache(&self) {
        let mut cached = self.cache.lock();
        if cached.is_empty() && self.entry_count.get_value() > 0 {
            cached.extend(self.store.iter());
        }
    }
    /// Distinct aligned slots in this tier; memoized after first read.
    #[inline(always)]
    fn len(&mut self) -> usize {
        match self.len_cache {
            Some(cached) => cached,
            None => {
                let current = self.entry_count.get_value();
                self.len_cache = Some(current);
                current
            }
        }
    }
    /// Decrements both the authoritative count and its memo (if primed).
    fn dec_len(&mut self) {
        *self.entry_count.get_mut() -= 1;
        if let Some(cached) = &mut self.len_cache {
            *cached -= 1;
        }
    }
}
/// Compile-time check that `SlotDex` is `Send + Sync`.
fn _assert_send_sync() {
    fn check<T: Send + Sync>() {}
    check::<SlotDex<u64, u64>>();
}
#[cfg(test)]
mod test;