1use std::{
16 any::Any,
17 fmt::Debug,
18 hash::Hash,
19 sync::{
20 atomic::{AtomicBool, Ordering},
21 Arc,
22 },
23};
24
25use equivalent::Equivalent;
26use foyer_common::{
27 code::{HashBuilder, Key},
28 error::Result,
29 properties::Properties,
30};
31use futures_util::future::BoxFuture;
32use hashbrown::hash_table::{Entry, HashTable};
33use tokio::sync::oneshot;
34
35use crate::{indexer::Indexer, raw::RawCacheEntry, Eviction, Piece};
36
/// A boxed, `Send`, `'static` future that resolves to `Result<Option<T>>`.
///
/// NOTE(review): `None` presumably means the fetch finished without producing a value — confirm with callers.
pub type OptionalFetch<T> = BoxFuture<'static, Result<Option<T>>>;
/// A boxed, `Send`, `'static` future that resolves to `Result<T>`.
pub type RequiredFetch<T> = BoxFuture<'static, Result<T>>;
41
/// A boxed, one-shot closure that receives a mutable context `C` and builds an [`OptionalFetch`]
/// yielding a [`FetchTarget`].
pub type OptionalFetchBuilder<K, V, P, C> =
    Box<dyn FnOnce(&mut C) -> OptionalFetch<FetchTarget<K, V, P>> + Send + 'static>;
/// A boxed, one-shot closure that receives a mutable context `C` and builds a [`RequiredFetch`]
/// yielding a [`FetchTarget`].
pub type RequiredFetchBuilder<K, V, P, C> =
    Box<dyn FnOnce(&mut C) -> RequiredFetch<FetchTarget<K, V, P>> + Send + 'static>;
/// Same shape as [`RequiredFetchBuilder`], except the context is passed as `&mut dyn Any`, so the alias
/// carries no context type parameter.
pub type RequiredFetchBuilderErased<K, V, P> =
    Box<dyn FnOnce(&mut dyn Any) -> RequiredFetch<FetchTarget<K, V, P>> + Send + 'static>;
51
/// Receiving half of a oneshot channel that carries a `Result<T>`.
pub type Waiter<T> = oneshot::Receiver<Result<T>>;
/// Sending half of a oneshot channel that carries a `Result<T>`.
pub type Notifier<T> = oneshot::Sender<Result<T>>;
56
57fn erase_required_fetch_builder<K, V, P, C, F>(f: F) -> RequiredFetchBuilderErased<K, V, P>
58where
59 C: Any + Send + 'static,
60 F: FnOnce(&mut C) -> RequiredFetch<FetchTarget<K, V, P>> + Send + 'static,
61{
62 Box::new(move |ctx| {
63 let ctx: &mut C = ctx.downcast_mut::<C>().expect("fetch context type mismatch");
64 f(ctx)
65 })
66}
67
68pub fn unerase_required_fetch_builder<K, V, P, C>(
69 f: RequiredFetchBuilderErased<K, V, P>,
70) -> RequiredFetchBuilder<K, V, P, C>
71where
72 K: 'static,
73 V: 'static,
74 P: 'static,
75 C: Any + Send + 'static,
76{
77 Box::new(move |ctx| f(ctx as &mut dyn Any))
78}
79
/// What a fetch resolves to: either a raw value with its properties, or an already-built [`Piece`].
pub enum FetchTarget<K, V, P> {
    /// A fetched value together with its properties.
    Entry {
        /// The fetched value.
        value: V,
        /// The properties that accompany the value.
        properties: P,
    },
    /// A [`Piece`] supplied directly by the fetch.
    Piece(Piece<K, V, P>),
}
92
93impl<K, V, P> Debug for FetchTarget<K, V, P> {
94 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
95 f.debug_struct("FetchTarget").finish()
96 }
97}
98
99impl<K, V, P> From<V> for FetchTarget<K, V, P>
100where
101 P: Properties,
102{
103 fn from(value: V) -> Self {
104 Self::Entry {
105 value,
106 properties: P::default(),
107 }
108 }
109}
110
111impl<K, V, P> From<(V, P)> for FetchTarget<K, V, P> {
112 fn from((value, properties): (V, P)) -> Self {
113 Self::Entry { value, properties }
114 }
115}
116
117impl<K, V, P> From<Piece<K, V, P>> for FetchTarget<K, V, P> {
118 fn from(piece: Piece<K, V, P>) -> Self {
119 Self::Piece(piece)
120 }
121}
122
/// Bookkeeping for one in-flight fetch.
struct Inflight<E, S, I>
where
    E: Eviction,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
{
    /// Identifier taken from the manager's counter when the inflight was created; `take` and
    /// `fetch_or_take` compare against it before acting on the entry.
    id: usize,
    /// Set to `true` when this inflight is removed from the manager.
    close: Arc<AtomicBool>,
    /// One sender per caller that enqueued on this key.
    notifiers: Vec<Notifier<Option<RawCacheEntry<E, S, I>>>>,
    /// Type-erased fetch builder stored by a later `enqueue` call while the slot is empty;
    /// consumed by `fetch_or_take`.
    f: Option<RequiredFetchBuilderErased<E::Key, E::Value, E::Properties>>,
}
136
/// A hash-table slot: the key, its precomputed hash, and the inflight state for that key.
struct InflightEntry<E, S, I>
where
    E: Eviction,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
{
    /// Hash supplied by the caller of `enqueue`; returned by the table's hasher callback.
    hash: u64,
    /// Owned key, compared via `Equivalent` during lookups.
    key: E::Key,
    /// The inflight state tracked for `key`.
    inflight: Inflight<E, S, I>,
}
147
/// Tracks in-flight fetches by key.
pub struct InflightManager<E, S, I>
where
    E: Eviction,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
{
    /// In-flight entries, looked up by caller-provided hash plus key equivalence.
    inflights: HashTable<InflightEntry<E, S, I>>,
    /// Id that will be assigned to the next newly created inflight; incremented on each assignment.
    next_id: usize,
}
157
158impl<E, S, I> Default for InflightManager<E, S, I>
159where
160 E: Eviction,
161 E::Key: Key,
162 S: HashBuilder,
163 I: Indexer<Eviction = E>,
164{
165 fn default() -> Self {
166 Self::new()
167 }
168}
169
170impl<E, S, I> InflightManager<E, S, I>
171where
172 E: Eviction,
173 E::Key: Key,
174 S: HashBuilder,
175 I: Indexer<Eviction = E>,
176{
177 pub fn new() -> Self {
178 Self {
179 inflights: HashTable::new(),
180 next_id: 0,
181 }
182 }
183
184 #[expect(clippy::type_complexity)]
185 pub fn enqueue<Q, C>(
186 &mut self,
187 hash: u64,
188 key: &Q,
189 f: Option<RequiredFetchBuilder<E::Key, E::Value, E::Properties, C>>,
190 ) -> Enqueue<E, S, I, C>
191 where
192 Q: Hash + Equivalent<E::Key> + ?Sized + ToOwned<Owned = E::Key>,
193 C: Any + Send + 'static,
194 {
195 match self.inflights.entry(hash, |e| key.equivalent(&e.key), |e| e.hash) {
196 Entry::Occupied(mut o) => {
197 let entry = o.get_mut();
198 if entry.inflight.f.is_none() && f.is_some() {
199 entry.inflight.f = f.map(erase_required_fetch_builder);
200 }
201 let (tx, rx) = oneshot::channel();
202 entry.inflight.notifiers.push(tx);
203 Enqueue::Wait(rx)
204 }
205 Entry::Vacant(v) => {
206 let (tx, rx) = oneshot::channel();
207 let id = self.next_id;
208 self.next_id += 1;
209 let entry = InflightEntry {
210 hash,
211 key: key.to_owned(),
212 inflight: Inflight {
213 id,
214 close: Arc::new(AtomicBool::new(false)),
215 notifiers: vec![tx],
216 f: None,
217 },
218 };
219 v.insert(entry);
220 let close = Arc::new(AtomicBool::new(false));
221 Enqueue::Lead {
222 id,
223 close,
224 waiter: rx,
225 required_fetch_builder: f,
226 }
227 }
228 }
229 }
230
231 #[expect(clippy::type_complexity)]
232 pub fn take<Q>(
233 &mut self,
234 hash: u64,
235 key: &Q,
236 id: Option<usize>,
237 ) -> Option<Vec<Notifier<Option<RawCacheEntry<E, S, I>>>>>
238 where
239 Q: Hash + Equivalent<E::Key> + ?Sized,
240 {
241 match self.inflights.entry(hash, |e| key.equivalent(&e.key), |e| e.hash) {
242 Entry::Occupied(o) => match id {
243 Some(id) if id == o.get().inflight.id => Some(o.remove().0.inflight),
244 Some(_) => None,
245 None => Some(o.remove().0.inflight),
246 },
247 Entry::Vacant(..) => None,
248 }
249 .map(|inflight| {
250 inflight.close.store(true, Ordering::Relaxed);
251 inflight.notifiers
252 })
253 }
254
255 pub fn fetch_or_take<Q, C>(&mut self, hash: u64, key: &Q, id: usize) -> Option<FetchOrTake<E, S, I, C>>
256 where
257 Q: Hash + Equivalent<E::Key> + ?Sized,
258 C: Any + Send + 'static,
259 {
260 match self.inflights.entry(hash, |e| key.equivalent(&e.key), |e| e.hash) {
261 Entry::Vacant(..) => None,
262 Entry::Occupied(mut o) => {
263 if o.get().inflight.id != id {
264 return None;
265 }
266 let f = o.get_mut().inflight.f.take();
267 match f.map(unerase_required_fetch_builder) {
268 Some(f) => Some(FetchOrTake::Fetch(f)),
269 None => {
270 let inflight = o.remove().0.inflight;
271 inflight.close.store(true, Ordering::Relaxed);
272 let notifiers = inflight.notifiers;
273 Some(FetchOrTake::Notifiers(notifiers))
274 }
275 }
276 }
277 }
278 }
279}
280
/// Result of `InflightManager::enqueue`.
#[expect(clippy::type_complexity)]
pub enum Enqueue<E, S, I, C>
where
    E: Eviction,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
    C: Any + Send + 'static,
{
    /// No inflight existed for the key; the caller created it.
    Lead {
        /// Id assigned to the newly created inflight.
        id: usize,
        /// `Arc<AtomicBool>` close flag returned to the leader by `enqueue`.
        close: Arc<AtomicBool>,
        /// Receiver paired with the leader's own notifier stored in the inflight.
        waiter: Waiter<Option<RawCacheEntry<E, S, I>>>,
        /// The builder the caller passed to `enqueue`, handed back unchanged.
        required_fetch_builder: Option<RequiredFetchBuilder<E::Key, E::Value, E::Properties, C>>,
    },
    /// An inflight already existed for the key; the caller was added as a waiter.
    Wait(Waiter<Option<RawCacheEntry<E, S, I>>>),
}
297
/// Result of `InflightManager::fetch_or_take`.
pub enum FetchOrTake<E, S, I, C>
where
    E: Eviction,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
{
    /// A fetch builder that had been stored on the inflight; the inflight remains registered.
    Fetch(RequiredFetchBuilder<E::Key, E::Value, E::Properties, C>),
    /// No builder was stored; the inflight was removed and these are its notifiers.
    Notifiers(Vec<Notifier<Option<RawCacheEntry<E, S, I>>>>),
}