use std::{
any::Any,
fmt::Debug,
future::IntoFuture,
hash::Hash,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};
use equivalent::Equivalent;
use foyer_common::{
code::{HashBuilder, Key},
error::Result,
properties::Properties,
};
use futures_util::future::BoxFuture;
use hashbrown::hash_table::{Entry, HashTable};
use mea::oneshot;
use crate::{indexer::Indexer, raw::RawCacheEntry, Eviction, Piece};
/// A boxed `'static` future that resolves to `Result<Option<T>>`.
pub type OptionalFetch<T> = BoxFuture<'static, Result<Option<T>>>;
/// A boxed `'static` future that resolves to `Result<T>`.
pub type RequiredFetch<T> = BoxFuture<'static, Result<T>>;
/// A boxed, sendable, one-shot closure that receives a mutable context `C`
/// and returns an [`OptionalFetch`] producing a [`FetchTarget`].
pub type OptionalFetchBuilder<K, V, P, C> =
Box<dyn FnOnce(&mut C) -> OptionalFetch<FetchTarget<K, V, P>> + Send + 'static>;
/// A boxed, sendable, one-shot closure that receives a mutable context `C`
/// and returns a [`RequiredFetch`] producing a [`FetchTarget`].
pub type RequiredFetchBuilder<K, V, P, C> =
Box<dyn FnOnce(&mut C) -> RequiredFetch<FetchTarget<K, V, P>> + Send + 'static>;
/// Same as [`RequiredFetchBuilder`], but the context is passed as `&mut dyn Any`
/// so the concrete context type does not appear in the alias.
pub type RequiredFetchBuilderErased<K, V, P> =
Box<dyn FnOnce(&mut dyn Any) -> RequiredFetch<FetchTarget<K, V, P>> + Send + 'static>;
/// Receiving side of a oneshot channel that carries a `Result<T>`.
pub type Waiter<T> = oneshot::Recv<Result<T>>;
/// Sending side of a oneshot channel that carries a `Result<T>`.
pub type Notifier<T> = oneshot::Sender<Result<T>>;
/// Hides the concrete context type `C` of a required fetch builder behind `dyn Any`.
///
/// The returned closure downcasts the context it receives back to `C` before
/// delegating to `f`.
///
/// # Panics
///
/// The returned closure panics with `"fetch context type mismatch"` when it is
/// invoked with a context whose concrete type is not `C`.
fn erase_required_fetch_builder<K, V, P, C, F>(f: F) -> RequiredFetchBuilderErased<K, V, P>
where
    C: Any + Send + 'static,
    F: FnOnce(&mut C) -> RequiredFetch<FetchTarget<K, V, P>> + Send + 'static,
{
    Box::new(move |erased_ctx| {
        // Recover the statically typed context the wrapped builder expects.
        let typed_ctx = erased_ctx
            .downcast_mut::<C>()
            .expect("fetch context type mismatch");
        f(typed_ctx)
    })
}
/// Turns a type-erased required fetch builder back into one that accepts a
/// concrete context type `C`.
///
/// The returned closure simply forwards its `&mut C` argument to `f` as
/// `&mut dyn Any`; whether `C` is the type `f` expects is decided by `f` itself
/// when it is invoked.
pub fn unerase_required_fetch_builder<K, V, P, C>(
    f: RequiredFetchBuilderErased<K, V, P>,
) -> RequiredFetchBuilder<K, V, P, C>
where
    K: 'static,
    V: 'static,
    P: 'static,
    C: Any + Send + 'static,
{
    Box::new(move |ctx| {
        // Unsize the concrete context so it matches the erased signature.
        let erased: &mut dyn Any = ctx;
        f(erased)
    })
}
/// What a fetch future produces: either a value with its properties, or an
/// already built [`Piece`].
pub enum FetchTarget<K, V, P> {
/// A value together with the properties that accompany it.
Entry {
/// The produced value.
value: V,
/// The properties attached to the value.
properties: P,
},
/// An existing [`Piece`].
Piece(Piece<K, V, P>),
}
impl<K, V, P> Debug for FetchTarget<K, V, P> {
    /// Writes only the type name. No fields are printed, so `K`, `V` and `P`
    /// need no `Debug` bound.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut repr = f.debug_struct("FetchTarget");
        repr.finish()
    }
}
impl<K, V, P> From<V> for FetchTarget<K, V, P>
where
P: Properties,
{
fn from(value: V) -> Self {
Self::Entry {
value,
properties: P::default(),
}
}
}
impl<K, V, P> From<(V, P)> for FetchTarget<K, V, P> {
fn from((value, properties): (V, P)) -> Self {
Self::Entry { value, properties }
}
}
impl<K, V, P> From<Piece<K, V, P>> for FetchTarget<K, V, P> {
fn from(piece: Piece<K, V, P>) -> Self {
Self::Piece(piece)
}
}
/// Bookkeeping for one in-flight request tracked by [`InflightManager`].
struct Inflight<E, S, I>
where
E: Eviction,
S: HashBuilder,
I: Indexer<Eviction = E>,
{
/// Identifier taken from the manager's `next_id` counter when the record is
/// created; `take` and `fetch_or_take` compare against it.
id: usize,
/// Flag that is set to `true` when this record is removed from the manager.
close: Arc<AtomicBool>,
/// One sender per `enqueue` call that joined this in-flight request.
notifiers: Vec<Notifier<Option<RawCacheEntry<E, S, I>>>>,
/// Type-erased required fetch builder. It starts as `None` and is filled in
/// by a later `enqueue` call that supplies a builder; `fetch_or_take` takes
/// it out again.
f: Option<RequiredFetchBuilderErased<E::Key, E::Value, E::Properties>>,
}
/// An element of the manager's hash table: the key, its hash, and the
/// in-flight record for that key.
struct InflightEntry<E, S, I>
where
E: Eviction,
S: HashBuilder,
I: Indexer<Eviction = E>,
{
/// Hash supplied by the caller of `enqueue`; returned by the hasher closure
/// that is passed to the hash table.
hash: u64,
/// Owned key, compared against lookup keys via `Equivalent`.
key: E::Key,
/// The in-flight record for `key`.
inflight: Inflight<E, S, I>,
}
/// Tracks in-flight requests per key so that callers asking for the same key
/// can be attached to a single record instead of each creating their own.
pub struct InflightManager<E, S, I>
where
E: Eviction,
S: HashBuilder,
I: Indexer<Eviction = E>,
{
/// Table of in-flight entries, looked up by caller-provided hash and key.
inflights: HashTable<InflightEntry<E, S, I>>,
/// Identifier that will be assigned to the next newly created in-flight record.
next_id: usize,
}
impl<E, S, I> Default for InflightManager<E, S, I>
where
E: Eviction,
E::Key: Key,
S: HashBuilder,
I: Indexer<Eviction = E>,
{
fn default() -> Self {
Self::new()
}
}
impl<E, S, I> InflightManager<E, S, I>
where
    E: Eviction,
    E::Key: Key,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
{
    /// Creates an empty manager whose first assigned id is `0`.
    pub fn new() -> Self {
        Self {
            inflights: HashTable::new(),
            next_id: 0,
        }
    }

    /// Registers interest in `key`.
    ///
    /// * If an in-flight record for `key` already exists, a new notifier is
    ///   appended to it and [`Enqueue::Wait`] is returned. When the record has no
    ///   stored fetch builder yet and `f` is `Some`, `f` is type-erased and
    ///   stored on the record.
    /// * Otherwise a new record is inserted with a fresh id and a single
    ///   notifier, and [`Enqueue::Lead`] is returned. The builder `f` is handed
    ///   back to the caller untouched and is not stored on the record.
    ///
    /// The `close` flag returned in [`Enqueue::Lead`] is the same flag that is
    /// stored on the record, so the leader observes `true` once the record is
    /// removed by [`Self::take`] or [`Self::fetch_or_take`].
    #[expect(clippy::type_complexity)]
    pub fn enqueue<Q, C>(
        &mut self,
        hash: u64,
        key: &Q,
        f: Option<RequiredFetchBuilder<E::Key, E::Value, E::Properties, C>>,
    ) -> Enqueue<E, S, I, C>
    where
        Q: Hash + Equivalent<E::Key> + ?Sized + ToOwned<Owned = E::Key>,
        C: Any + Send + 'static,
    {
        match self.inflights.entry(hash, |e| key.equivalent(&e.key), |e| e.hash) {
            Entry::Occupied(mut o) => {
                let entry = o.get_mut();
                // Keep the first builder offered by a follower; later ones are dropped.
                if entry.inflight.f.is_none() && f.is_some() {
                    entry.inflight.f = f.map(erase_required_fetch_builder);
                }
                let (tx, rx) = oneshot::channel();
                entry.inflight.notifiers.push(tx);
                Enqueue::Wait(rx.into_future())
            }
            Entry::Vacant(v) => {
                let (tx, rx) = oneshot::channel();
                let id = self.next_id;
                self.next_id += 1;
                // One flag, two handles: the record keeps one and the leader gets
                // the other. Creating two independent flags would make the
                // `store(true)` done on removal invisible to the leader.
                let close = Arc::new(AtomicBool::new(false));
                let entry = InflightEntry {
                    hash,
                    key: key.to_owned(),
                    inflight: Inflight {
                        id,
                        close: Arc::clone(&close),
                        notifiers: vec![tx],
                        f: None,
                    },
                };
                v.insert(entry);
                Enqueue::Lead {
                    id,
                    close,
                    waiter: rx.into_future(),
                    required_fetch_builder: f,
                }
            }
        }
    }

    /// Removes the in-flight record for `key` and returns its notifiers.
    ///
    /// When `id` is `Some`, the record is only removed if its id matches;
    /// when `id` is `None`, any record for `key` is removed. Returns `None`
    /// when there is no record for `key` or the id does not match.
    ///
    /// The record's `close` flag is set to `true` before the notifiers are returned.
    #[expect(clippy::type_complexity)]
    pub fn take<Q>(
        &mut self,
        hash: u64,
        key: &Q,
        id: Option<usize>,
    ) -> Option<Vec<Notifier<Option<RawCacheEntry<E, S, I>>>>>
    where
        Q: Hash + Equivalent<E::Key> + ?Sized,
    {
        match self.inflights.entry(hash, |e| key.equivalent(&e.key), |e| e.hash) {
            Entry::Occupied(o) => match id {
                Some(id) if id == o.get().inflight.id => Some(o.remove().0.inflight),
                // A record exists but belongs to a different id; leave it alone.
                Some(_) => None,
                None => Some(o.remove().0.inflight),
            },
            Entry::Vacant(..) => None,
        }
        .map(|inflight| {
            inflight.close.store(true, Ordering::Relaxed);
            inflight.notifiers
        })
    }

    /// Either hands out the fetch builder stored on the record for `key`, or
    /// removes the record and returns its notifiers.
    ///
    /// Returns `None` when there is no record for `key` or its id differs from `id`.
    /// Otherwise:
    ///
    /// * if a builder is stored, it is taken out of the record (the record itself
    ///   stays in the table) and returned as [`FetchOrTake::Fetch`];
    /// * if no builder is stored, the record is removed, its `close` flag is set
    ///   to `true`, and its notifiers are returned as [`FetchOrTake::Notifiers`].
    ///
    /// # Panics
    ///
    /// This method does not panic itself, but the builder returned in
    /// [`FetchOrTake::Fetch`] panics with `"fetch context type mismatch"` when
    /// invoked if `C` is not the context type the builder was enqueued with.
    pub fn fetch_or_take<Q, C>(&mut self, hash: u64, key: &Q, id: usize) -> Option<FetchOrTake<E, S, I, C>>
    where
        Q: Hash + Equivalent<E::Key> + ?Sized,
        C: Any + Send + 'static,
    {
        match self.inflights.entry(hash, |e| key.equivalent(&e.key), |e| e.hash) {
            Entry::Vacant(..) => None,
            Entry::Occupied(mut o) => {
                if o.get().inflight.id != id {
                    return None;
                }
                let f = o.get_mut().inflight.f.take();
                match f.map(unerase_required_fetch_builder) {
                    Some(f) => Some(FetchOrTake::Fetch(f)),
                    None => {
                        let inflight = o.remove().0.inflight;
                        inflight.close.store(true, Ordering::Relaxed);
                        let notifiers = inflight.notifiers;
                        Some(FetchOrTake::Notifiers(notifiers))
                    }
                }
            }
        }
    }
}
/// Result of [`InflightManager::enqueue`].
#[expect(clippy::type_complexity)]
pub enum Enqueue<E, S, I, C>
where
E: Eviction,
S: HashBuilder,
I: Indexer<Eviction = E>,
C: Any + Send + 'static,
{
/// No in-flight record existed for the key; a new one was inserted for this caller.
Lead {
/// Identifier assigned to the newly inserted record.
id: usize,
/// An `Arc<AtomicBool>` close flag created by `enqueue`, initially `false`.
close: Arc<AtomicBool>,
/// Waiter paired with the notifier stored on the new record.
waiter: Waiter<Option<RawCacheEntry<E, S, I>>>,
/// The builder that was passed to `enqueue`, handed back unchanged.
required_fetch_builder: Option<RequiredFetchBuilder<E::Key, E::Value, E::Properties, C>>,
},
/// An in-flight record already existed; the waiter is paired with the
/// notifier that was appended to it.
Wait(Waiter<Option<RawCacheEntry<E, S, I>>>),
}
/// Result of [`InflightManager::fetch_or_take`].
pub enum FetchOrTake<E, S, I, C>
where
E: Eviction,
S: HashBuilder,
I: Indexer<Eviction = E>,
{
/// A fetch builder was stored on the record; it has been taken out and is
/// returned here, typed for context `C`.
Fetch(RequiredFetchBuilder<E::Key, E::Value, E::Properties, C>),
/// No fetch builder was stored; the record has been removed and these are
/// its notifiers.
Notifiers(Vec<Notifier<Option<RawCacheEntry<E, S, I>>>>),
}