1use std::{
16 any::Any,
17 fmt::Debug,
18 future::IntoFuture,
19 hash::Hash,
20 sync::{
21 atomic::{AtomicBool, Ordering},
22 Arc,
23 },
24};
25
26use equivalent::Equivalent;
27use foyer_common::{
28 code::{HashBuilder, Key},
29 error::Result,
30 properties::Properties,
31};
32use futures_util::future::BoxFuture;
33use hashbrown::hash_table::{Entry, HashTable};
34use mea::oneshot;
35
36use crate::{indexer::Indexer, raw::RawCacheEntry, Eviction, Piece};
37
38pub type OptionalFetch<T> = BoxFuture<'static, Result<Option<T>>>;
40pub type RequiredFetch<T> = BoxFuture<'static, Result<T>>;
42
43pub type OptionalFetchBuilder<K, V, P, C> =
45 Box<dyn FnOnce(&mut C) -> OptionalFetch<FetchTarget<K, V, P>> + Send + 'static>;
46pub type RequiredFetchBuilder<K, V, P, C> =
48 Box<dyn FnOnce(&mut C) -> RequiredFetch<FetchTarget<K, V, P>> + Send + 'static>;
49pub type RequiredFetchBuilderErased<K, V, P> =
51 Box<dyn FnOnce(&mut dyn Any) -> RequiredFetch<FetchTarget<K, V, P>> + Send + 'static>;
52
53pub type Waiter<T> = oneshot::Recv<Result<T>>;
55pub type Notifier<T> = oneshot::Sender<Result<T>>;
57
58fn erase_required_fetch_builder<K, V, P, C, F>(f: F) -> RequiredFetchBuilderErased<K, V, P>
59where
60 C: Any + Send + 'static,
61 F: FnOnce(&mut C) -> RequiredFetch<FetchTarget<K, V, P>> + Send + 'static,
62{
63 Box::new(move |ctx| {
64 let ctx: &mut C = ctx.downcast_mut::<C>().expect("fetch context type mismatch");
65 f(ctx)
66 })
67}
68
69pub fn unerase_required_fetch_builder<K, V, P, C>(
70 f: RequiredFetchBuilderErased<K, V, P>,
71) -> RequiredFetchBuilder<K, V, P, C>
72where
73 K: 'static,
74 V: 'static,
75 P: 'static,
76 C: Any + Send + 'static,
77{
78 Box::new(move |ctx| f(ctx as &mut dyn Any))
79}
80
81pub enum FetchTarget<K, V, P> {
83 Entry {
85 value: V,
87 properties: P,
89 },
90 Piece(Piece<K, V, P>),
92}
93
94impl<K, V, P> Debug for FetchTarget<K, V, P> {
95 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
96 f.debug_struct("FetchTarget").finish()
97 }
98}
99
100impl<K, V, P> From<V> for FetchTarget<K, V, P>
101where
102 P: Properties,
103{
104 fn from(value: V) -> Self {
105 Self::Entry {
106 value,
107 properties: P::default(),
108 }
109 }
110}
111
112impl<K, V, P> From<(V, P)> for FetchTarget<K, V, P> {
113 fn from((value, properties): (V, P)) -> Self {
114 Self::Entry { value, properties }
115 }
116}
117
118impl<K, V, P> From<Piece<K, V, P>> for FetchTarget<K, V, P> {
119 fn from(piece: Piece<K, V, P>) -> Self {
120 Self::Piece(piece)
121 }
122}
123
124struct Inflight<E, S, I>
125where
126 E: Eviction,
127 S: HashBuilder,
128 I: Indexer<Eviction = E>,
129{
130 id: usize,
131 close: Arc<AtomicBool>,
132 notifiers: Vec<Notifier<Option<RawCacheEntry<E, S, I>>>>,
133 f: Option<RequiredFetchBuilderErased<E::Key, E::Value, E::Properties>>,
136}
137
138struct InflightEntry<E, S, I>
139where
140 E: Eviction,
141 S: HashBuilder,
142 I: Indexer<Eviction = E>,
143{
144 hash: u64,
145 key: E::Key,
146 inflight: Inflight<E, S, I>,
147}
148
149pub struct InflightManager<E, S, I>
150where
151 E: Eviction,
152 S: HashBuilder,
153 I: Indexer<Eviction = E>,
154{
155 inflights: HashTable<InflightEntry<E, S, I>>,
156 next_id: usize,
157}
158
159impl<E, S, I> Default for InflightManager<E, S, I>
160where
161 E: Eviction,
162 E::Key: Key,
163 S: HashBuilder,
164 I: Indexer<Eviction = E>,
165{
166 fn default() -> Self {
167 Self::new()
168 }
169}
170
171impl<E, S, I> InflightManager<E, S, I>
172where
173 E: Eviction,
174 E::Key: Key,
175 S: HashBuilder,
176 I: Indexer<Eviction = E>,
177{
178 pub fn new() -> Self {
179 Self {
180 inflights: HashTable::new(),
181 next_id: 0,
182 }
183 }
184
185 #[expect(clippy::type_complexity)]
186 pub fn enqueue<Q, C>(
187 &mut self,
188 hash: u64,
189 key: &Q,
190 f: Option<RequiredFetchBuilder<E::Key, E::Value, E::Properties, C>>,
191 ) -> Enqueue<E, S, I, C>
192 where
193 Q: Hash + Equivalent<E::Key> + ?Sized + ToOwned<Owned = E::Key>,
194 C: Any + Send + 'static,
195 {
196 match self.inflights.entry(hash, |e| key.equivalent(&e.key), |e| e.hash) {
197 Entry::Occupied(mut o) => {
198 let entry = o.get_mut();
199 if entry.inflight.f.is_none() && f.is_some() {
200 entry.inflight.f = f.map(erase_required_fetch_builder);
201 }
202 let (tx, rx) = oneshot::channel();
203 entry.inflight.notifiers.push(tx);
204 Enqueue::Wait(rx.into_future())
205 }
206 Entry::Vacant(v) => {
207 let (tx, rx) = oneshot::channel();
208 let id = self.next_id;
209 self.next_id += 1;
210 let entry = InflightEntry {
211 hash,
212 key: key.to_owned(),
213 inflight: Inflight {
214 id,
215 close: Arc::new(AtomicBool::new(false)),
216 notifiers: vec![tx],
217 f: None,
218 },
219 };
220 v.insert(entry);
221 let close = Arc::new(AtomicBool::new(false));
222 Enqueue::Lead {
223 id,
224 close,
225 waiter: rx.into_future(),
226 required_fetch_builder: f,
227 }
228 }
229 }
230 }
231
232 #[expect(clippy::type_complexity)]
233 pub fn take<Q>(
234 &mut self,
235 hash: u64,
236 key: &Q,
237 id: Option<usize>,
238 ) -> Option<Vec<Notifier<Option<RawCacheEntry<E, S, I>>>>>
239 where
240 Q: Hash + Equivalent<E::Key> + ?Sized,
241 {
242 match self.inflights.entry(hash, |e| key.equivalent(&e.key), |e| e.hash) {
243 Entry::Occupied(o) => match id {
244 Some(id) if id == o.get().inflight.id => Some(o.remove().0.inflight),
245 Some(_) => None,
246 None => Some(o.remove().0.inflight),
247 },
248 Entry::Vacant(..) => None,
249 }
250 .map(|inflight| {
251 inflight.close.store(true, Ordering::Relaxed);
252 inflight.notifiers
253 })
254 }
255
256 pub fn fetch_or_take<Q, C>(&mut self, hash: u64, key: &Q, id: usize) -> Option<FetchOrTake<E, S, I, C>>
257 where
258 Q: Hash + Equivalent<E::Key> + ?Sized,
259 C: Any + Send + 'static,
260 {
261 match self.inflights.entry(hash, |e| key.equivalent(&e.key), |e| e.hash) {
262 Entry::Vacant(..) => None,
263 Entry::Occupied(mut o) => {
264 if o.get().inflight.id != id {
265 return None;
266 }
267 let f = o.get_mut().inflight.f.take();
268 match f.map(unerase_required_fetch_builder) {
269 Some(f) => Some(FetchOrTake::Fetch(f)),
270 None => {
271 let inflight = o.remove().0.inflight;
272 inflight.close.store(true, Ordering::Relaxed);
273 let notifiers = inflight.notifiers;
274 Some(FetchOrTake::Notifiers(notifiers))
275 }
276 }
277 }
278 }
279 }
280}
281
282#[expect(clippy::type_complexity)]
283pub enum Enqueue<E, S, I, C>
284where
285 E: Eviction,
286 S: HashBuilder,
287 I: Indexer<Eviction = E>,
288 C: Any + Send + 'static,
289{
290 Lead {
291 id: usize,
292 close: Arc<AtomicBool>,
293 waiter: Waiter<Option<RawCacheEntry<E, S, I>>>,
294 required_fetch_builder: Option<RequiredFetchBuilder<E::Key, E::Value, E::Properties, C>>,
295 },
296 Wait(Waiter<Option<RawCacheEntry<E, S, I>>>),
297}
298
299pub enum FetchOrTake<E, S, I, C>
300where
301 E: Eviction,
302 S: HashBuilder,
303 I: Indexer<Eviction = E>,
304{
305 Fetch(RequiredFetchBuilder<E::Key, E::Value, E::Properties, C>),
306 Notifiers(Vec<Notifier<Option<RawCacheEntry<E, S, I>>>>),
307}