use crossbeam_epoch::{Atomic, Guard, Owned, Shared, pin};
use dashmap::DashMap;
use std::fmt;
use std::hash::Hash;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
/// A single entry of a [`LinkedList`].
///
/// The head and tail sentinels built by `LinkedList::new` carry `key: None`
/// and `size: 0`; nodes inserted through `LinkedList::push_front` carry
/// `Some(key)` and the size supplied by the caller.
#[derive(Debug)]
pub(crate) struct Node<K> {
/// Key stored in the node; `None` for the two sentinel nodes.
key: Option<K>,
/// Size recorded at insertion time (0 for sentinels).
size: usize,
/// Link to the neighbour on the head side of this node.
prev: Atomic<Node<K>>,
/// Link to the neighbour on the tail side of this node.
next: Atomic<Node<K>>,
}
/// Doubly linked list of [`Node`]s whose links are `crossbeam_epoch` atomic
/// pointers. Real nodes live between two sentinel nodes (`head` and `tail`).
pub(crate) struct LinkedList<K> {
/// Sentinel placed before the first real node; new nodes are linked right after it.
head: Atomic<Node<K>>,
/// Sentinel placed after the last real node; `pop_back` removes the node before it.
tail: Atomic<Node<K>>,
/// Number of real nodes, maintained with relaxed atomic increments/decrements.
len: AtomicUsize,
}
impl<K> fmt::Debug for LinkedList<K> {
    /// Formats the list as `LinkedList { len: .. }`.
    ///
    /// Only the element counter is shown; the nodes themselves are not
    /// traversed, so no bound on `K` is needed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let current_len = self.len.load(Ordering::Relaxed);
        let mut builder = f.debug_struct("LinkedList");
        builder.field("len", &current_len);
        builder.finish()
    }
}
impl<K: Eq> LinkedList<K> {
/// Creates an empty list consisting only of the two sentinel nodes.
///
/// After construction `head.next` points at the tail sentinel and
/// `tail.prev` points at the head sentinel; the outer links
/// (`head.prev`, `tail.next`) stay null.
pub(crate) fn new() -> Self {
    let guard = &pin();

    // Head sentinel first; its `next` link is filled in once the tail exists.
    let head_sentinel = Owned::new(Node {
        key: None,
        size: 0,
        prev: Atomic::null(),
        next: Atomic::null(),
    })
    .into_shared(guard);

    // The tail sentinel can point back at the head right away.
    let tail_sentinel = Owned::new(Node {
        key: None,
        size: 0,
        prev: Atomic::from(head_sentinel),
        next: Atomic::null(),
    })
    .into_shared(guard);

    // SAFETY: `head_sentinel` was allocated just above and has not been
    // shared with any other thread yet.
    unsafe {
        head_sentinel
            .deref()
            .next
            .store(tail_sentinel, Ordering::Relaxed)
    };

    Self {
        head: Atomic::from(head_sentinel),
        tail: Atomic::from(tail_sentinel),
        len: AtomicUsize::new(0),
    }
}
/// Allocates a node for `key`/`size` and links it directly after the head
/// sentinel.
///
/// Returns the shared pointer to the freshly inserted node, valid for the
/// lifetime of `guard`. The insertion is retried until the compare-exchange
/// on `head.next` succeeds.
pub(crate) fn push_front<'guard>(
&self,
key: K,
size: usize,
guard: &'guard Guard,
) -> Shared<'guard, Node<K>> {
// The node is allocated once and reused across retries.
let new_node = Owned::new(Node {
key: Some(key),
size,
prev: Atomic::null(),
next: Atomic::null(),
})
.into_shared(guard);
loop {
let head = self.head.load(Ordering::Acquire, guard);
let first = unsafe { head.deref().next.load(Ordering::Acquire, guard) };
// Prepare the new node's links before it is published; relaxed stores
// are used here because the node is not reachable from the list yet.
unsafe { new_node.deref().prev.store(head, Ordering::Relaxed) };
unsafe { new_node.deref().next.store(first, Ordering::Relaxed) };
// Publish: swing `head.next` from the observed first node to the new one.
// A failure means `head.next` changed in the meantime, so start over.
if unsafe {
head.deref().next.compare_exchange(
first,
new_node,
Ordering::AcqRel,
Ordering::Acquire,
guard,
)
}
.is_ok()
{
// The back link of the old first node is fixed up only after the
// forward link was published, so for a short window `first.prev`
// still refers to the head sentinel.
unsafe { first.deref().prev.store(new_node, Ordering::Release) };
self.len.fetch_add(1, Ordering::Relaxed);
return new_node;
}
}
}
/// Detaches the node right before the tail sentinel and returns it.
///
/// Returns `None` when the node before the tail is the head sentinel, i.e.
/// the list holds no real nodes. When `unlink` reports a failure the tail
/// is re-read and the attempt is repeated. The returned node is only
/// unlinked, not freed; reclaiming it is left to the caller.
pub(crate) fn pop_back<'guard>(&self, guard: &'guard Guard) -> Option<Shared<'guard, Node<K>>> {
    loop {
        let tail_sentinel = self.tail.load(Ordering::Acquire, guard);
        let candidate = unsafe { tail_sentinel.deref().prev.load(Ordering::Acquire, guard) };
        let head_sentinel = self.head.load(Ordering::Relaxed, guard);
        if candidate == head_sentinel {
            return None;
        }
        match self.unlink(candidate, guard) {
            Ok(()) => return Some(candidate),
            // Lost the race for this node; look at the tail again.
            Err(()) => continue,
        }
    }
}
/// Detaches `node` from the list without freeing it.
///
/// Returns `Ok(())` when the forward link of the predecessor was swung past
/// `node`, and `Err(())` when that compare-exchange failed (the predecessor's
/// `next` no longer pointed at `node`). On success the element counter is
/// decremented. The node's memory is not reclaimed here; callers in this
/// file follow a successful unlink with `guard.defer_destroy(node)`.
///
/// NOTE(review): `prev` and `next` are dereferenced without a null check, so
/// this must not be called with the head or tail sentinel, whose outer links
/// are null.
pub(crate) fn unlink<'guard>(
&self,
node: Shared<'guard, Node<K>>,
guard: &'guard Guard,
) -> Result<(), ()> {
let prev = unsafe { node.deref().prev.load(Ordering::Acquire, guard) };
let next = unsafe { node.deref().next.load(Ordering::Acquire, guard) };
// Step 1: bypass `node` in the forward direction. This CAS decides whether
// the unlink as a whole succeeds.
if unsafe {
prev.deref().next.compare_exchange(
node,
next,
Ordering::AcqRel,
Ordering::Acquire,
guard,
)
}
.is_err()
{
return Err(()); }
// Step 2: repair the backward link of the successor. The result is
// deliberately discarded: if `next.prev` no longer points at `node`,
// it is left untouched.
unsafe {
next.deref().prev.compare_exchange(
node,
prev,
Ordering::AcqRel,
Ordering::Acquire,
guard,
)
}
.ok();
self.len.fetch_sub(1, Ordering::Relaxed);
Ok(())
}
/// Moves the entry held by `node` to the front of the list.
///
/// The move is implemented as "unlink, retire, re-insert": on a successful
/// unlink the key and size are copied out, `node` is scheduled for
/// destruction and a *new* node is pushed at the front. The pointer to that
/// new node is not returned, so any pointer to `node` kept elsewhere refers
/// to a retired node after this call. Nothing happens if the unlink fails.
pub(crate) fn move_to_front<'guard>(&self, node: Shared<'guard, Node<K>>, guard: &'guard Guard)
where
    K: Clone,
{
    if self.unlink(node, guard).is_err() {
        return;
    }
    // Copy the payload out before the node is handed to the collector.
    let (key, size) = unsafe {
        let detached = node.deref();
        (detached.key.as_ref().unwrap().clone(), detached.size)
    };
    unsafe { guard.defer_destroy(node) };
    let _ = self.push_front(key, size, guard);
}
/// Returns the current value of the element counter (relaxed load).
///
/// The counter is updated separately from the link operations, so under
/// concurrent modification the value is a snapshot, not an exact count.
pub(crate) fn len(&self) -> usize {
    let counter = &self.len;
    counter.load(Ordering::Relaxed)
}
}
impl<K> Drop for LinkedList<K> {
    /// Retires every node that is still linked into the list.
    ///
    /// Previously only the two sentinels were destroyed, so every real node
    /// still in the list when it was dropped (and the key it owns) leaked.
    /// The list is now walked from the head sentinel along the `next` links
    /// and each node on the way, sentinels included, is handed to the epoch
    /// collector.
    ///
    /// Nodes that were unlinked earlier are not reachable through `next` and
    /// are therefore not touched here; their owners are expected to have
    /// retired them already (as every `unlink` caller in this file does).
    fn drop(&mut self) {
        let guard = &pin();
        let tail = self.tail.load(Ordering::Relaxed, guard);
        let mut tail_retired = false;

        let mut cursor = self.head.load(Ordering::Relaxed, guard);
        while !cursor.is_null() {
            // SAFETY: `&mut self` gives exclusive access to the list, and the
            // pinned guard keeps nodes retired below alive while we read
            // their `next` link.
            let following = unsafe { cursor.deref().next.load(Ordering::Relaxed, guard) };
            if cursor == tail {
                tail_retired = true;
            }
            // SAFETY: the node is reachable only through this list, which is
            // being dropped, and each node is visited exactly once because
            // the chain ends at the tail sentinel whose `next` is null.
            unsafe { guard.defer_destroy(cursor) };
            cursor = following;
        }

        // Keep the original guarantee that the tail sentinel is always
        // released, even if the forward chain did not lead to it.
        if !tail_retired && !tail.is_null() {
            // SAFETY: the tail was not encountered during the walk, so it has
            // not been retired yet.
            unsafe { guard.defer_destroy(tail) };
        }
    }
}
/// Bookkeeping for a size-aware cache replacement policy.
///
/// The layout (two resident lists `t1`/`t2`, two ghost lists `b1`/`b2` and an
/// adaptive target `p`) appears to follow the Adaptive Replacement Cache
/// scheme — NOTE(review): confirm against the design notes. The manager only
/// tracks keys and sizes; it does not store cached values.
#[derive(Debug)]
pub struct ArcManager<K: Hash + Eq + Clone + 'static> {
/// Adaptive target compared against `t1_size` when choosing a victim;
/// raised on `b1` ghost hits (capped at `capacity`) and lowered on `b2` ghost hits.
p: AtomicU64,
/// Threshold for `t1_size + t2_size` below which `find_victim` evicts nothing.
capacity: u64,
/// Resident entries inserted by `miss` when the key was in no ghost list.
t1: LinkedList<K>,
/// Sum of the sizes of the entries accounted to `t1`.
t1_size: AtomicU64,
/// Resident entries promoted by `hit` or by a ghost hit in `miss`.
t2: LinkedList<K>,
/// Sum of the sizes of the entries accounted to `t2`.
t2_size: AtomicU64,
/// Ghost entries (size 0) for keys evicted from `t1`.
b1: LinkedList<K>,
/// Ghost entries (size 0) for keys evicted from `t2`.
b2: LinkedList<K>,
/// Key -> node index for `t1`. The `'static` lifetime is produced by
/// transmuting guard-bound pointers, so these pointers are only valid
/// while the node has not been retired.
t1_map: DashMap<K, Shared<'static, Node<K>>>,
/// Key -> node index for `t2` (same pointer validity caveat as `t1_map`).
t2_map: DashMap<K, Shared<'static, Node<K>>>,
/// Key -> node index for `b1` (same pointer validity caveat as `t1_map`).
b1_map: DashMap<K, Shared<'static, Node<K>>>,
/// Key -> node index for `b2` (same pointer validity caveat as `t1_map`).
b2_map: DashMap<K, Shared<'static, Node<K>>>,
}
impl<K: Hash + Eq + Clone + 'static> ArcManager<K> {
/// Creates an empty manager for the given `capacity`.
///
/// All lists and indexes start empty, both size counters start at zero and
/// the adaptive target `p` starts at zero.
pub fn new(capacity: u64) -> Self {
    let zeroed = || AtomicU64::new(0);
    Self {
        capacity,
        // Adaptive target and size accounting.
        p: zeroed(),
        t1_size: zeroed(),
        t2_size: zeroed(),
        // Resident lists.
        t1: LinkedList::new(),
        t2: LinkedList::new(),
        // Ghost lists.
        b1: LinkedList::new(),
        b2: LinkedList::new(),
        // Key -> node indexes, one per list.
        t1_map: DashMap::new(),
        t2_map: DashMap::new(),
        b1_map: DashMap::new(),
        b2_map: DashMap::new(),
    }
}
pub fn hit(&self, key: &K) {
let guard = &pin();
if let Some(entry) = self.t1_map.remove(key) {
let node_ptr = entry.1;
if self.t1.unlink(node_ptr, guard).is_ok() {
let size = unsafe { node_ptr.deref().size };
let key_clone = unsafe { node_ptr.deref().key.as_ref().unwrap().clone() };
unsafe { guard.defer_destroy(node_ptr) };
self.t1_size.fetch_sub(size as u64, Ordering::Relaxed);
let new_node_ptr = self.t2.push_front(key_clone.clone(), size, guard);
let static_ptr = unsafe { std::mem::transmute(new_node_ptr) };
self.t2_map.insert(key_clone, static_ptr);
self.t2_size.fetch_add(size as u64, Ordering::Relaxed);
}
} else if let Some(entry) = self.t2_map.get(key) {
let node_ptr = *entry.value();
self.t2.move_to_front(node_ptr, guard);
}
}
pub fn miss(&self, key: K, size: usize) {
let guard = &pin();
if let Some(entry) = self.b1_map.remove(&key) {
let b1_len = self.b1.len() as u64;
let b2_len = self.b2.len() as u64;
let delta = if b1_len == 0 {
0
} else if b1_len >= b2_len {
1
} else {
b2_len / b1_len
};
let p = self.p.load(Ordering::Relaxed);
self.p.store(
p.saturating_add(delta).min(self.capacity),
Ordering::Relaxed,
);
let node_ptr = entry.1;
if self.b1.unlink(node_ptr, guard).is_ok() {
let key_clone = unsafe { node_ptr.deref().key.as_ref().unwrap().clone() };
unsafe { guard.defer_destroy(node_ptr) };
let new_node_ptr = self.t2.push_front(key_clone.clone(), size, guard);
let static_ptr = unsafe { std::mem::transmute(new_node_ptr) };
self.t2_map.insert(key_clone, static_ptr);
self.t2_size.fetch_add(size as u64, Ordering::Relaxed);
}
} else if let Some(entry) = self.b2_map.remove(&key) {
let b1_len = self.b1.len() as u64;
let b2_len = self.b2.len() as u64;
let delta = if b2_len == 0 {
0
} else if b2_len >= b1_len {
1
} else {
b1_len / b2_len
};
let p = self.p.load(Ordering::Relaxed);
self.p.store(p.saturating_sub(delta), Ordering::Relaxed);
let node_ptr = entry.1;
if self.b2.unlink(node_ptr, guard).is_ok() {
let key_clone = unsafe { node_ptr.deref().key.as_ref().unwrap().clone() };
unsafe { guard.defer_destroy(node_ptr) };
let new_node_ptr = self.t2.push_front(key_clone.clone(), size, guard);
let static_ptr = unsafe { std::mem::transmute(new_node_ptr) };
self.t2_map.insert(key_clone, static_ptr);
self.t2_size.fetch_add(size as u64, Ordering::Relaxed);
}
}
else {
let new_node_ptr = self.t1.push_front(key.clone(), size, guard);
let static_ptr = unsafe { std::mem::transmute(new_node_ptr) };
self.t1_map.insert(key, static_ptr);
self.t1_size.fetch_add(size as u64, Ordering::Relaxed);
}
}
pub fn find_victim(&self) -> Option<K> {
let guard = &pin();
let t1_size = self.t1_size.load(Ordering::Relaxed);
let t2_size = self.t2_size.load(Ordering::Relaxed);
if t1_size + t2_size < self.capacity {
return None; }
let p = self.p.load(Ordering::Relaxed);
if self.t1.len() > 0 && (t1_size >= p || self.t2.len() == 0) {
self.t1.pop_back(guard).map(|evicted_node| {
let evicted_key = unsafe { evicted_node.deref().key.as_ref().unwrap().clone() };
let evicted_size = unsafe { evicted_node.deref().size };
self.t1_size
.fetch_sub(evicted_size as u64, Ordering::Relaxed);
self.t1_map.remove(&evicted_key);
let ghost_node_ptr = self.b1.push_front(evicted_key.clone(), 0, guard);
let static_ptr = unsafe { std::mem::transmute(ghost_node_ptr) };
self.b1_map.insert(evicted_key.clone(), static_ptr);
if self.b1.len() > self.t2.len() {
if let Some(b1_evicted) = self.b1.pop_back(guard) {
let key = unsafe { b1_evicted.deref().key.as_ref().unwrap() };
self.b1_map.remove(key);
unsafe { guard.defer_destroy(b1_evicted) };
}
}
unsafe { guard.defer_destroy(evicted_node) };
evicted_key
})
} else if self.t2.len() > 0 {
self.t2.pop_back(guard).map(|evicted_node| {
let evicted_key = unsafe { evicted_node.deref().key.as_ref().unwrap().clone() };
let evicted_size = unsafe { evicted_node.deref().size };
self.t2_size
.fetch_sub(evicted_size as u64, Ordering::Relaxed);
self.t2_map.remove(&evicted_key);
let ghost_node_ptr = self.b2.push_front(evicted_key.clone(), 0, guard);
let static_ptr = unsafe { std::mem::transmute(ghost_node_ptr) };
self.b2_map.insert(evicted_key.clone(), static_ptr);
if self.b2.len() > self.t1.len() {
if let Some(b2_evicted) = self.b2.pop_back(guard) {
let key = unsafe { b2_evicted.deref().key.as_ref().unwrap() };
self.b2_map.remove(key);
unsafe { guard.defer_destroy(b2_evicted) };
}
}
unsafe { guard.defer_destroy(evicted_node) };
evicted_key
})
} else {
None
}
}
}
impl<K: Hash + Eq + Clone + 'static> Drop for ArcManager<K> {
/// Performs no work of its own. The fields (including the four
/// `LinkedList`s, which have their own `Drop` impl) are dropped by the
/// compiler-generated field drops after this body returns.
fn drop(&mut self) {
}
}
// Manual thread-safety markers for `ArcManager`.
//
// NOTE(review): these are presumably required because the index maps store
// `Shared<'static, Node<K>>` epoch pointers, which would otherwise prevent the
// auto traits from applying — confirm against the crossbeam_epoch docs.
// SAFETY (to be verified): soundness relies on every stored pointer being
// dereferenced only under a pinned guard and on nodes being retired through
// `Guard::defer_destroy` only after they were unlinked.
unsafe impl<K: Hash + Eq + Clone + Send + 'static> Send for ArcManager<K> {}
// Sharing across threads additionally requires `K: Sync`, since keys are read
// through shared references from several threads.
unsafe impl<K: Hash + Eq + Clone + Send + Sync + 'static> Sync for ArcManager<K> {}