use std::{
collections::HashSet,
fmt::Debug,
future::ready,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};
use futures_core::future::BoxFuture;
use futures_util::FutureExt;
use parking_lot::Mutex;
use tokio::sync::oneshot;
use crate::{io::device::statistics::Statistics, StorageFilterCondition, StorageFilterResult};
/// Filter condition backed by a fixed allow-list of hashes.
///
/// Hashes present in the list are admitted; everything else is rejected.
#[derive(Debug)]
pub struct Biased {
    /// Hashes that the condition admits.
    admits: HashSet<u64>,
}

impl Biased {
    /// Builds a condition that admits exactly the hashes yielded by `admits`.
    ///
    /// Duplicate hashes are collapsed, since they are stored in a set.
    pub fn new(admits: impl IntoIterator<Item = u64>) -> Self {
        let mut allowed = HashSet::new();
        allowed.extend(admits);
        Self { admits: allowed }
    }
}
impl StorageFilterCondition for Biased {
    /// Admits `hash` only if it is on the allow-list.
    ///
    /// The statistics handle and the `usize` argument are ignored.
    fn filter(&self, _: &Arc<Statistics>, hash: u64, _: usize) -> StorageFilterResult {
        if !self.admits.contains(&hash) {
            return StorageFilterResult::Reject;
        }
        StorageFilterResult::Admit
    }
}
/// A single event appended to the shared recorder log.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Record {
/// Pushed by `AdmitRecorder::filter` with the hash it was called with.
Admit(u64),
/// Pushed by `EvictRecorder::filter` with the hash it was called with.
Evict(u64),
}
/// Shared state behind `Recorder`, `AdmitRecorder` and `EvictRecorder`.
#[derive(Debug, Default)]
struct RecorderInner {
    /// Events in the order they were pushed.
    records: Mutex<Vec<Record>>,
}

impl RecorderInner {
    /// Returns a copy of every record logged so far, in insertion order.
    fn dump(&self) -> Vec<Record> {
        self.records.lock().clone()
    }

    /// Replays the log and returns the hashes whose most recent record is an
    /// [`Record::Admit`].
    ///
    /// An `Admit` inserts the hash into the result and an `Evict` removes it,
    /// so an evicted hash that is admitted again later is reported.
    fn remains(&self) -> HashSet<u64> {
        // Replay directly under the lock instead of going through `dump()`,
        // which would clone the whole log just to iterate it once.
        let records = self.records.lock();
        let mut remains = HashSet::new();
        for record in records.iter() {
            match record {
                Record::Admit(key) => {
                    remains.insert(*key);
                }
                Record::Evict(key) => {
                    remains.remove(key);
                }
            }
        }
        remains
    }
}
/// Filter condition that logs each hash it is asked about as `Record::Admit`
/// and always returns `StorageFilterResult::Admit`.
#[derive(Debug)]
pub struct AdmitRecorder {
/// Record log shared with the `Recorder` that created this value.
inner: Arc<RecorderInner>,
}
/// Filter condition that logs each hash it is asked about as `Record::Evict`
/// and always returns `StorageFilterResult::Reject`.
#[derive(Debug)]
pub struct EvictRecorder {
/// Record log shared with the `Recorder` that created this value.
inner: Arc<RecorderInner>,
}
impl StorageFilterCondition for AdmitRecorder {
    /// Appends `Record::Admit(hash)` to the shared log, then admits.
    ///
    /// The statistics handle and the `usize` argument are ignored.
    fn filter(&self, _: &Arc<Statistics>, hash: u64, _: usize) -> StorageFilterResult {
        let event = Record::Admit(hash);
        self.inner.records.lock().push(event);
        StorageFilterResult::Admit
    }
}
impl StorageFilterCondition for EvictRecorder {
    /// Appends `Record::Evict(hash)` to the shared log, then rejects.
    ///
    /// The statistics handle and the `usize` argument are ignored.
    fn filter(&self, _: &Arc<Statistics>, hash: u64, _: usize) -> StorageFilterResult {
        let event = Record::Evict(hash);
        self.inner.records.lock().push(event);
        StorageFilterResult::Reject
    }
}
#[derive(Debug, Default, Clone)]
pub struct Recorder {
inner: Arc<RecorderInner>,
}
impl Recorder {
pub fn dump(&self) -> Vec<Record> {
self.inner.dump()
}
pub fn remains(&self) -> HashSet<u64> {
self.inner.remains()
}
pub fn admission(&self) -> AdmitRecorder {
AdmitRecorder {
inner: self.inner.clone(),
}
}
pub fn eviction(&self) -> EvictRecorder {
EvictRecorder {
inner: self.inner.clone(),
}
}
}
/// Cloneable boolean flag marking whether loads are throttled.
///
/// Clones share the same flag. The default state is "not throttled". All
/// accesses use `Ordering::Relaxed`.
#[derive(Debug, Clone, Default)]
pub struct LoadThrottleSwitch {
    /// `true` while throttled.
    throttled: Arc<AtomicBool>,
}

impl LoadThrottleSwitch {
    /// Returns whether the switch is currently in the throttled state.
    pub fn is_throttled(&self) -> bool {
        let flag: &AtomicBool = &self.throttled;
        flag.load(Ordering::Relaxed)
    }

    /// Puts the switch into the throttled state.
    pub fn throttle(&self) {
        self.set(true);
    }

    /// Takes the switch out of the throttled state.
    pub fn unthrottle(&self) {
        self.set(false);
    }

    /// Writes `throttled` into the shared flag.
    fn set(&self, throttled: bool) {
        self.throttled.store(throttled, Ordering::Relaxed);
    }
}
/// Cloneable on/off flag.
///
/// Clones share the same flag. The default state is off. All accesses use
/// `Ordering::Relaxed`.
#[derive(Debug, Clone, Default)]
pub struct Switch {
    /// `true` while the switch is on.
    hold: Arc<AtomicBool>,
}

impl Switch {
    /// Returns whether the switch is currently on.
    pub fn is_on(&self) -> bool {
        let flag: &AtomicBool = &self.hold;
        flag.load(Ordering::Relaxed)
    }

    /// Turns the switch on.
    pub fn on(&self) {
        self.set(true);
    }

    /// Turns the switch off.
    pub fn off(&self) {
        self.set(false);
    }

    /// Writes `state` into the shared flag.
    fn set(&self, state: bool) {
        self.hold.store(state, Ordering::Relaxed);
    }
}
/// State shared by all clones of `Holder`, guarded by the mutex in `Holder::inner`.
#[derive(Debug, Default)]
struct HolderInner {
/// Senders registered by `Holder::wait` while `holding` was true; drained and signalled by `Holder::unhold`.
holdees: Vec<oneshot::Sender<()>>,
/// Set by `Holder::hold` and cleared by `Holder::unhold`; while true, `Holder::wait` returns a pending future.
holding: bool,
}
/// Cloneable gate that lets callers park asynchronously until released.
///
/// While the gate is held, futures returned by [`Holder::wait`] stay pending;
/// [`Holder::unhold`] releases every waiter registered so far. Clones share
/// the same gate. The default state is "not held".
#[derive(Debug, Clone, Default)]
pub struct Holder {
    /// Gate state shared between clones.
    inner: Arc<Mutex<HolderInner>>,
}

impl Holder {
    /// Returns whether the gate is currently held.
    pub fn is_held(&self) -> bool {
        let state = self.inner.lock();
        state.holding
    }

    /// Holds the gate: subsequent [`Holder::wait`] calls return a future that
    /// stays pending until the gate is released.
    pub fn hold(&self) {
        let mut state = self.inner.lock();
        state.holding = true;
    }

    /// Releases the gate and signals every registered waiter.
    pub fn unhold(&self) {
        let mut state = self.inner.lock();
        state.holding = false;
        state.holdees.drain(..).for_each(|waiter| {
            // A send error only means the waiting future was already dropped.
            let _ = waiter.send(());
        });
    }

    /// Returns a future that resolves once the gate is not held.
    ///
    /// The gate state is sampled when this method is called, not when the
    /// future is polled: if the gate is open, the future is immediately
    /// ready; otherwise a waiter is registered and the future resolves when
    /// its sender is signalled or dropped.
    pub fn wait(&self) -> BoxFuture<'static, ()> {
        let mut state = self.inner.lock();
        if state.holding {
            let (tx, rx) = oneshot::channel();
            state.holdees.push(tx);
            async move {
                // Both a signal and a dropped sender release the waiter.
                let _ = rx.await;
            }
            .boxed()
        } else {
            ready(()).boxed()
        }
    }
}