use std::{
borrow::Borrow,
ops::{Range, RangeBounds},
sync::Arc,
};
use dashmap::DashMap;
use parking_lot::RwLock;
use skiplist::ordered_skiplist::OrderedSkipList;
use crate::{Api, Key, Member, Score, score_member::ScoreMember};
/// Sorted set that keeps every entry in two structures: a key-indexed map and
/// an ordered skip list used for rank-based access.
pub struct Zset<K, M, S>
where
    K: Key,
    M: Member<K>,
    S: Score,
{
    /// Key -> entry lookup; holds a copy of the same `ScoreMember` stored in `list`.
    map: DashMap<K, ScoreMember<K, M, S>>,
    /// Entries in `ScoreMember` order, guarded by a read/write lock.
    list: RwLock<OrderedSkipList<ScoreMember<K, M, S>>>,
}
/// Converts arbitrary index bounds into a concrete half-open `start..end`
/// range whose endpoints are both clamped to `len`.
///
/// * `range` - any `RangeBounds<usize>` (`a..b`, `a..=b`, `..`, bound tuples, ...).
/// * `len`   - number of addressable elements; neither endpoint exceeds it.
///
/// Exclusive-start and inclusive-end bounds are advanced by one with
/// saturating arithmetic, so a bound of `usize::MAX` clamps to `len` instead
/// of overflowing. If the input bounds are inverted the returned range is
/// inverted too (`start > end`); `Range::is_empty` reports such a range as empty.
fn resolve_range(range: impl RangeBounds<usize>, len: usize) -> Range<usize> {
    use std::ops::Bound;

    let start = match range.start_bound() {
        Bound::Included(&s) => s,
        // `s + 1` would overflow for `usize::MAX`; saturate and let `min(len)` clamp.
        Bound::Excluded(&s) => s.saturating_add(1),
        Bound::Unbounded => 0,
    };
    let end = match range.end_bound() {
        // Same overflow hazard for `..=usize::MAX`.
        Bound::Included(&e) => e.saturating_add(1),
        Bound::Excluded(&e) => e,
        Bound::Unbounded => len,
    };
    start.min(len)..end.min(len)
}
// Takes the read lock on `$self.list`, clamps `$range` against the current
// list length via `resolve_range`, and passes the resulting `index_range`
// iterator to `$closure`. The read guard stays alive until the closure returns.
macro_rules! with_range_iter {
    ($self:expr, $range:expr, $closure:expr) => {{
        let guard = $self.list.read();
        let bounds = resolve_range($range, guard.len());
        $closure(guard.index_range(bounds))
    }};
}
impl<K, M, S> Zset<K, M, S>
where
    K: Key,
    M: Member<K>,
    S: Score,
{
    /// Creates a sorted set with an empty map and an empty skip list.
    pub fn new() -> Self {
        let map = DashMap::new();
        let list = RwLock::new(OrderedSkipList::new());
        Self { map, list }
    }

    /// Applies `f` to the entry at position `rank` in the skip list while the
    /// read lock is held, returning `None` when `rank` has no entry.
    fn with_item_by_rank<F, R>(&self, rank: usize, f: F) -> Option<R>
    where
        F: FnOnce(&ScoreMember<K, M, S>) -> R,
    {
        let list = self.list.read();
        let item = list.get(rank)?;
        Some(f(item))
    }
}
impl<K, M, S> Default for Zset<K, M, S>
where
K: Key,
M: Member<K>,
S: Score,
{
fn default() -> Self {
Self::new()
}
}
/// `Api` implementation backed by the key map and the ordered skip list.
///
/// Locking discipline: every `&self` method that needs both structures at the
/// same time takes the skip-list lock first and a map shard second. Keeping a
/// single order prevents two threads from each holding one lock while waiting
/// for the other.
impl<K, M, S> Api<M, S> for Zset<K, M, S>
where
    K: Key,
    M: Member<K>,
    S: Score,
{
    type K = K;
    type Item = crate::ArcM<K, M>;

    /// Inserts `member` with `score`, or re-scores it if its key is already present.
    ///
    /// Returns `true` only when an existing entry's score was changed. Returns
    /// `false` when the stored score already equals `score` and also when the
    /// member was not present and has just been inserted.
    ///
    /// NOTE(review): a fresh insert reports `false`; confirm that this matches
    /// the contract documented on `Api::add`.
    fn add(&self, member: impl Into<Self::Item>, score: S) -> bool {
        let member = member.into();
        let key: &Self::K = (*member).borrow();
        let key = key.clone();
        // Skip list first, map shard second (see the impl-level note).
        let mut list = self.list.write();
        {
            // Scoped so the shard guard is gone before `insert` touches the map below.
            if let Some(mut item) = self.map.get_mut(&key) {
                if item.score == score {
                    return false;
                }
                // The skip list is ordered by entry, so the entry is taken out,
                // re-scored, and put back rather than edited in place.
                list.remove(&*item);
                item.score = score;
                list.insert(item.clone());
                return true;
            }
        }
        let sm = ScoreMember { score, member };
        self.map.insert(key, sm.clone());
        list.insert(sm);
        false
    }

    /// Removes the entry for `member`.
    ///
    /// Returns `true` when the key was in the map and the matching entry was
    /// also found in the skip list; `false` otherwise.
    fn rm(&self, member: impl Borrow<Self::K>) -> bool {
        if let Some((_, sm)) = self.map.remove(member.borrow()) {
            let mut list = self.list.write();
            list.remove(&sm).is_some()
        } else {
            false
        }
    }

    /// Removes every entry whose rank falls inside `range` (clamped to the
    /// current length) and returns how many entries were removed.
    fn rm_range_by_rank(&self, range: impl RangeBounds<usize>) -> usize {
        let mut list = self.list.write();
        let len = list.len();
        let range = resolve_range(range, len);
        if range.is_empty() {
            return 0;
        }
        let mut removed_count = 0;
        // Highest rank first, so a removal never shifts a rank still to be visited.
        for i in (range.start..range.end).rev() {
            let item = list.remove_index(i);
            self.map.remove(item.member.borrow());
            removed_count += 1;
        }
        removed_count
    }

    /// Returns a clone of the score stored for `member`, if the key is present.
    fn score(&self, member: impl Borrow<Self::K>) -> Option<S> {
        self.map.get(member.borrow()).map(|r| r.score.clone())
    }

    /// Returns the position of `member`'s entry in the skip list, or `None`
    /// when the key is not in the map or the entry is not found in the list.
    fn rank(&self, member: impl Borrow<Self::K>) -> Option<usize> {
        // Take the list lock BEFORE the map shard guard. `add` and
        // `rm_range_by_rank` acquire in that order; doing it the other way
        // round here could leave this thread holding the shard while waiting
        // for the list, and a writer holding the list while waiting for the shard.
        let list = self.list.read();
        let entry = self.map.get(member.borrow())?;
        list.index_of(entry.value())
    }

    /// Number of entries, as reported by the map.
    fn len(&self) -> usize {
        self.map.len()
    }

    /// `true` when the map holds no entries.
    fn is_empty(&self) -> bool {
        self.map.is_empty()
    }

    /// `true` when the map contains `member`'s key.
    fn contains(&self, member: impl Borrow<Self::K>) -> bool {
        self.map.contains_key(member.borrow())
    }

    /// Collects the members at the ranks covered by `range` (clamped to the
    /// current length), in skip-list order.
    fn range(&self, range: impl RangeBounds<usize>) -> Vec<Arc<M>> {
        with_range_iter!(self, range, |iter: skiplist::ordered_skiplist::Iter<
            ScoreMember<K, M, S>,
        >| {
            iter.map(|item| item.member.inner.clone()).collect()
        })
    }

    /// Like [`range`](Self::range), but pairs each member with a clone of its score.
    fn range_with_scores(&self, range: impl RangeBounds<usize>) -> Vec<(Arc<M>, S)> {
        with_range_iter!(self, range, |iter: skiplist::ordered_skiplist::Iter<
            ScoreMember<K, M, S>,
        >| {
            iter.map(|item| (item.member.inner.clone(), item.score.clone()))
                .collect()
        })
    }

    /// Returns the member at position `rank`, or `None` when there is no such entry.
    fn get(&self, rank: usize) -> Option<Arc<M>> {
        self.with_item_by_rank(rank, |item| item.member.inner.clone())
    }

    /// Returns the member at position `rank` together with a clone of its score.
    fn get_with_score(&self, rank: usize) -> Option<(Arc<M>, S)> {
        self.with_item_by_rank(rank, |item| (item.member.inner.clone(), item.score.clone()))
    }
}
use std::ops::{Add, BitAnd, BitAndAssign, BitOr, BitOrAssign};
/// `&a | &b`: builds a new set containing the members of both operands.
/// A member present in both ends up with the sum of its two scores.
impl<'b, K, M, S> BitOr<&'b Zset<K, M, S>> for &Zset<K, M, S>
where
    K: Key,
    M: Member<K>,
    S: Score + Add<Output = S>,
{
    type Output = Zset<K, M, S>;

    fn bitor(self, rhs: &'b Zset<K, M, S>) -> Self::Output {
        // Copy the bigger operand verbatim; only the smaller one goes through
        // the look-up-and-sum merge performed by `|=`.
        let (larger, smaller) = if rhs.len() > self.len() {
            (rhs, self)
        } else {
            (self, rhs)
        };

        let mut merged: Zset<K, M, S> = Zset::new();
        larger
            .range_with_scores(..)
            .into_iter()
            .for_each(|(member, score)| {
                merged.add(member, score);
            });
        merged |= smaller;
        merged
    }
}
// The `+` inside this `|=` impl is intentional: scores of shared members are summed.
#[allow(clippy::suspicious_op_assign_impl)]
/// `a |= &b`: merges every member of `rhs` into `self`. A member already in
/// `self` gets `self_score + rhs_score`; a new member keeps its `rhs` score.
impl<'a, K, M, S> BitOrAssign<&'a Zset<K, M, S>> for Zset<K, M, S>
where
    K: Key,
    M: Member<K>,
    S: Score + Add<Output = S>,
{
    fn bitor_assign(&mut self, rhs: &'a Zset<K, M, S>) {
        let incoming = rhs.range_with_scores(0..rhs.len());
        for (member, score) in incoming {
            let combined = match self.score(member.as_ref().borrow()) {
                Some(current) => current + score,
                None => score,
            };
            self.add(member, combined);
        }
    }
}
/// `&a & &b`: builds a new set holding only the members present in both
/// operands, each scored with the sum of its two scores.
impl<'b, K, M, S> BitAnd<&'b Zset<K, M, S>> for &Zset<K, M, S>
where
    K: Key,
    M: Member<K>,
    S: Score + Add<Output = S>,
{
    type Output = Zset<K, M, S>;

    fn bitand(self, rhs: &'b Zset<K, M, S>) -> Self::Output {
        // Walk the operand with fewer members and probe the other one by key.
        // On a tie `self` is the one that gets walked.
        let (walked, probed) = if rhs.len() < self.len() {
            (rhs, self)
        } else {
            (self, rhs)
        };

        let common: Zset<K, M, S> = Zset::new();
        let candidates = walked.range_with_scores(0..walked.len());
        for (member, walked_score) in candidates {
            let probed_score = probed.score(member.as_ref().borrow());
            if let Some(probed_score) = probed_score {
                common.add(member, walked_score + probed_score);
            }
        }
        common
    }
}
// The `+` inside this `&=` impl is intentional: scores of shared members are summed.
#[allow(clippy::suspicious_op_assign_impl)]
/// `a &= &b`: keeps only the members of `self` whose key is also in `rhs`,
/// and replaces each kept member's score with `self_score + rhs_score`.
impl<'a, K, M, S> BitAndAssign<&'a Zset<K, M, S>> for Zset<K, M, S>
where
    K: Key,
    M: Member<K>,
    S: Score + Add<Output = S>,
{
    fn bitand_assign(&mut self, rhs: &'a Zset<K, M, S>) {
        // Keys that are absent from `rhs`. They are removed after the pass
        // below, because `rm` takes the list lock itself and entries cannot be
        // dropped from the map while it is being iterated.
        let mut absent: Vec<K> = Vec::new();
        {
            // One write lock for the whole pass instead of one per entry.
            let mut list = self.list.write();
            for mut item in self.map.iter_mut() {
                // Single `rhs` lookup per entry decides between re-score and removal.
                match rhs.score(item.key()) {
                    Some(score2) => {
                        // Take the entry out, re-score it, and put it back so
                        // the skip list stays ordered.
                        list.remove(&*item);
                        item.score = item.score.clone() + score2;
                        list.insert(item.clone());
                    }
                    None => absent.push(item.key().clone()),
                }
            }
        }
        for k in absent {
            self.rm(&k);
        }
    }
}