use std::{
mem::offset_of,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};
use foyer_common::{
code::{Key, Value},
error::Result,
properties::Properties,
};
use intrusive_collections::{intrusive_adapter, LinkedList, LinkedListAtomicLink};
use serde::{Deserialize, Serialize};
use super::{Eviction, Op};
use crate::record::Record;
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct SieveConfig;
#[derive(Debug, Default)]
pub struct SieveState {
link: LinkedListAtomicLink,
visited: AtomicBool,
}
impl SieveState {
#[inline]
fn is_visited(&self) -> bool {
self.visited.load(Ordering::Relaxed)
}
fn set_visited(&self, visited: bool) {
self.visited.store(visited, Ordering::Relaxed);
}
}
intrusive_adapter! {
Adapter<K, V, P> = Arc<Record<Sieve<K, V, P>>>: Record<Sieve<K, V, P>> {
?offset = Record::<Sieve<K, V, P>>::STATE_OFFSET + offset_of!(SieveState, link) => LinkedListAtomicLink
}
where K: Key, V: Value, P: Properties
}
pub struct Sieve<K, V, P>
where
K: Key,
V: Value,
P: Properties,
{
queue: LinkedList<Adapter<K, V, P>>,
hand: Option<Arc<Record<Sieve<K, V, P>>>>,
}
impl<K, V, P> Eviction for Sieve<K, V, P>
where
K: Key,
V: Value,
P: Properties,
{
type Config = SieveConfig;
type Key = K;
type Value = V;
type Properties = P;
type State = SieveState;
fn new(_capacity: usize, _config: &Self::Config) -> Self
where
Self: Sized,
{
Self {
queue: LinkedList::new(Adapter::new()),
hand: None,
}
}
fn update(&mut self, _: usize, _: Option<&Self::Config>) -> Result<()> {
Ok(())
}
fn push(&mut self, record: Arc<Record<Self>>) {
record.set_in_eviction(true);
self.queue.push_back(record);
}
fn pop(&mut self) -> Option<Arc<Record<Self>>> {
let mut candidate = if let Some(ref hand_ptr) = self.hand {
unsafe { self.queue.cursor_mut_from_ptr(Arc::as_ptr(hand_ptr)) }
} else {
self.queue.front_mut()
};
loop {
if let Some(record) = candidate.get() {
let state = unsafe { &*record.state().get() };
if !state.is_visited() {
break;
} else {
state.set_visited(false);
if candidate.peek_next().is_null() {
candidate = self.queue.front_mut();
} else {
candidate.move_next();
}
}
} else {
return None;
}
}
self.hand = candidate.peek_next().clone_pointer();
candidate.remove().inspect(|record| record.set_in_eviction(false))
}
fn remove(&mut self, record: &Arc<Record<Self>>) {
if let Some(ref hand_ptr) = self.hand {
if Arc::ptr_eq(hand_ptr, record) {
self.hand = None;
}
}
unsafe { self.queue.remove_from_ptr(Arc::as_ptr(record)) };
record.set_in_eviction(false);
}
fn acquire() -> Op<Self> {
Op::immutable(|_: &Self, record| {
let state = unsafe { &*record.state().get() };
state.set_visited(true);
})
}
fn release() -> Op<Self> {
Op::noop()
}
}
#[cfg(test)]
pub mod tests {
use itertools::Itertools;
use super::*;
use crate::{
eviction::test_utils::{assert_ptr_eq, assert_ptr_vec_eq, Dump, OpExt, TestProperties},
record::Data,
};
impl<K, V> Dump for Sieve<K, V, TestProperties>
where
K: Key + Clone,
V: Value + Clone,
{
type Output = Vec<Arc<Record<Self>>>;
fn dump(&self) -> Self::Output {
let mut res = vec![];
let mut cursor = self.queue.cursor();
loop {
cursor.move_next();
match cursor.clone_pointer() {
Some(record) => res.push(record),
None => break,
}
}
res
}
}
type TestSieve = Sieve<u64, u64, TestProperties>;
#[test]
fn test_sieve_basic() {
let rs = (0..8)
.map(|i| {
Arc::new(Record::new(Data {
key: i,
value: i,
properties: TestProperties::default(),
hash: i,
weight: 1,
}))
})
.collect_vec();
let r = |i: usize| rs[i].clone();
let mut sieve = TestSieve::new(100, &SieveConfig {});
sieve.push(r(0));
sieve.push(r(1));
sieve.push(r(2));
sieve.push(r(3));
assert_ptr_vec_eq(sieve.dump(), vec![r(0), r(1), r(2), r(3)]);
sieve.acquire_immutable(&r(1));
sieve.acquire_immutable(&r(3));
let r0 = sieve.pop().unwrap();
assert_ptr_eq(&rs[0], &r0);
let r2 = sieve.pop().unwrap();
assert_ptr_eq(&rs[2], &r2);
assert_ptr_vec_eq(sieve.dump(), vec![r(1), r(3)]);
let r1 = sieve.pop().unwrap();
assert_ptr_eq(&rs[1], &r1);
assert_ptr_vec_eq(sieve.dump(), vec![r(3)]);
sieve.remove(&r(3));
assert_ptr_vec_eq(sieve.dump(), vec![]);
sieve.push(r(4));
sieve.push(r(5));
sieve.push(r(6));
sieve.clear();
assert_ptr_vec_eq(sieve.dump(), vec![]);
}
}