foyer_memory/
inflight.rs

1// Copyright 2026 foyer Project Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{
16    any::Any,
17    fmt::Debug,
18    future::IntoFuture,
19    hash::Hash,
20    sync::{
21        atomic::{AtomicBool, Ordering},
22        Arc,
23    },
24};
25
26use equivalent::Equivalent;
27use foyer_common::{
28    code::{HashBuilder, Key},
29    error::Result,
30    properties::Properties,
31};
32use futures_util::future::BoxFuture;
33use hashbrown::hash_table::{Entry, HashTable};
34use mea::oneshot;
35
36use crate::{indexer::Indexer, raw::RawCacheEntry, Eviction, Piece};
37
38/// An optional fetch operation that may return `None` if the entry is not found.
39pub type OptionalFetch<T> = BoxFuture<'static, Result<Option<T>>>;
40/// A required fetch operation that must return a value.
41pub type RequiredFetch<T> = BoxFuture<'static, Result<T>>;
42
43/// A builder for an optional fetch operation.
44pub type OptionalFetchBuilder<K, V, P, C> =
45    Box<dyn FnOnce(&mut C) -> OptionalFetch<FetchTarget<K, V, P>> + Send + 'static>;
46/// A builder for a required fetch operation.
47pub type RequiredFetchBuilder<K, V, P, C> =
48    Box<dyn FnOnce(&mut C) -> RequiredFetch<FetchTarget<K, V, P>> + Send + 'static>;
49/// A type-erased builder for a required fetch operation.
50pub type RequiredFetchBuilderErased<K, V, P> =
51    Box<dyn FnOnce(&mut dyn Any) -> RequiredFetch<FetchTarget<K, V, P>> + Send + 'static>;
52
53/// A waiter for a fetch operation.
54pub type Waiter<T> = oneshot::Recv<Result<T>>;
55/// A notifier for a fetch operation.
56pub type Notifier<T> = oneshot::Sender<Result<T>>;
57
58fn erase_required_fetch_builder<K, V, P, C, F>(f: F) -> RequiredFetchBuilderErased<K, V, P>
59where
60    C: Any + Send + 'static,
61    F: FnOnce(&mut C) -> RequiredFetch<FetchTarget<K, V, P>> + Send + 'static,
62{
63    Box::new(move |ctx| {
64        let ctx: &mut C = ctx.downcast_mut::<C>().expect("fetch context type mismatch");
65        f(ctx)
66    })
67}
68
69pub fn unerase_required_fetch_builder<K, V, P, C>(
70    f: RequiredFetchBuilderErased<K, V, P>,
71) -> RequiredFetchBuilder<K, V, P, C>
72where
73    K: 'static,
74    V: 'static,
75    P: 'static,
76    C: Any + Send + 'static,
77{
78    Box::new(move |ctx| f(ctx as &mut dyn Any))
79}
80
81/// The target of a fetch operation.
82pub enum FetchTarget<K, V, P> {
83    /// Fetched entry.
84    Entry {
85        /// Entry value.
86        value: V,
87        /// Entry properties.
88        properties: P,
89    },
90    /// Fetched piece from disk cache write queue.
91    Piece(Piece<K, V, P>),
92}
93
94impl<K, V, P> Debug for FetchTarget<K, V, P> {
95    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
96        f.debug_struct("FetchTarget").finish()
97    }
98}
99
100impl<K, V, P> From<V> for FetchTarget<K, V, P>
101where
102    P: Properties,
103{
104    fn from(value: V) -> Self {
105        Self::Entry {
106            value,
107            properties: P::default(),
108        }
109    }
110}
111
112impl<K, V, P> From<(V, P)> for FetchTarget<K, V, P> {
113    fn from((value, properties): (V, P)) -> Self {
114        Self::Entry { value, properties }
115    }
116}
117
118impl<K, V, P> From<Piece<K, V, P>> for FetchTarget<K, V, P> {
119    fn from(piece: Piece<K, V, P>) -> Self {
120        Self::Piece(piece)
121    }
122}
123
124struct Inflight<E, S, I>
125where
126    E: Eviction,
127    S: HashBuilder,
128    I: Indexer<Eviction = E>,
129{
130    id: usize,
131    close: Arc<AtomicBool>,
132    notifiers: Vec<Notifier<Option<RawCacheEntry<E, S, I>>>>,
133    // If a required fetch request comes in while there is already an inflight,
134    // we store the fetch builder here to let the leader perform the fetch later.
135    f: Option<RequiredFetchBuilderErased<E::Key, E::Value, E::Properties>>,
136}
137
138struct InflightEntry<E, S, I>
139where
140    E: Eviction,
141    S: HashBuilder,
142    I: Indexer<Eviction = E>,
143{
144    hash: u64,
145    key: E::Key,
146    inflight: Inflight<E, S, I>,
147}
148
149pub struct InflightManager<E, S, I>
150where
151    E: Eviction,
152    S: HashBuilder,
153    I: Indexer<Eviction = E>,
154{
155    inflights: HashTable<InflightEntry<E, S, I>>,
156    next_id: usize,
157}
158
159impl<E, S, I> Default for InflightManager<E, S, I>
160where
161    E: Eviction,
162    E::Key: Key,
163    S: HashBuilder,
164    I: Indexer<Eviction = E>,
165{
166    fn default() -> Self {
167        Self::new()
168    }
169}
170
171impl<E, S, I> InflightManager<E, S, I>
172where
173    E: Eviction,
174    E::Key: Key,
175    S: HashBuilder,
176    I: Indexer<Eviction = E>,
177{
178    pub fn new() -> Self {
179        Self {
180            inflights: HashTable::new(),
181            next_id: 0,
182        }
183    }
184
185    #[expect(clippy::type_complexity)]
186    pub fn enqueue<Q, C>(
187        &mut self,
188        hash: u64,
189        key: &Q,
190        f: Option<RequiredFetchBuilder<E::Key, E::Value, E::Properties, C>>,
191    ) -> Enqueue<E, S, I, C>
192    where
193        Q: Hash + Equivalent<E::Key> + ?Sized + ToOwned<Owned = E::Key>,
194        C: Any + Send + 'static,
195    {
196        match self.inflights.entry(hash, |e| key.equivalent(&e.key), |e| e.hash) {
197            Entry::Occupied(mut o) => {
198                let entry = o.get_mut();
199                if entry.inflight.f.is_none() && f.is_some() {
200                    entry.inflight.f = f.map(erase_required_fetch_builder);
201                }
202                let (tx, rx) = oneshot::channel();
203                entry.inflight.notifiers.push(tx);
204                Enqueue::Wait(rx.into_future())
205            }
206            Entry::Vacant(v) => {
207                let (tx, rx) = oneshot::channel();
208                let id = self.next_id;
209                self.next_id += 1;
210                let entry = InflightEntry {
211                    hash,
212                    key: key.to_owned(),
213                    inflight: Inflight {
214                        id,
215                        close: Arc::new(AtomicBool::new(false)),
216                        notifiers: vec![tx],
217                        f: None,
218                    },
219                };
220                v.insert(entry);
221                let close = Arc::new(AtomicBool::new(false));
222                Enqueue::Lead {
223                    id,
224                    close,
225                    waiter: rx.into_future(),
226                    required_fetch_builder: f,
227                }
228            }
229        }
230    }
231
232    #[expect(clippy::type_complexity)]
233    pub fn take<Q>(
234        &mut self,
235        hash: u64,
236        key: &Q,
237        id: Option<usize>,
238    ) -> Option<Vec<Notifier<Option<RawCacheEntry<E, S, I>>>>>
239    where
240        Q: Hash + Equivalent<E::Key> + ?Sized,
241    {
242        match self.inflights.entry(hash, |e| key.equivalent(&e.key), |e| e.hash) {
243            Entry::Occupied(o) => match id {
244                Some(id) if id == o.get().inflight.id => Some(o.remove().0.inflight),
245                Some(_) => None,
246                None => Some(o.remove().0.inflight),
247            },
248            Entry::Vacant(..) => None,
249        }
250        .map(|inflight| {
251            inflight.close.store(true, Ordering::Relaxed);
252            inflight.notifiers
253        })
254    }
255
256    pub fn fetch_or_take<Q, C>(&mut self, hash: u64, key: &Q, id: usize) -> Option<FetchOrTake<E, S, I, C>>
257    where
258        Q: Hash + Equivalent<E::Key> + ?Sized,
259        C: Any + Send + 'static,
260    {
261        match self.inflights.entry(hash, |e| key.equivalent(&e.key), |e| e.hash) {
262            Entry::Vacant(..) => None,
263            Entry::Occupied(mut o) => {
264                if o.get().inflight.id != id {
265                    return None;
266                }
267                let f = o.get_mut().inflight.f.take();
268                match f.map(unerase_required_fetch_builder) {
269                    Some(f) => Some(FetchOrTake::Fetch(f)),
270                    None => {
271                        let inflight = o.remove().0.inflight;
272                        inflight.close.store(true, Ordering::Relaxed);
273                        let notifiers = inflight.notifiers;
274                        Some(FetchOrTake::Notifiers(notifiers))
275                    }
276                }
277            }
278        }
279    }
280}
281
282#[expect(clippy::type_complexity)]
283pub enum Enqueue<E, S, I, C>
284where
285    E: Eviction,
286    S: HashBuilder,
287    I: Indexer<Eviction = E>,
288    C: Any + Send + 'static,
289{
290    Lead {
291        id: usize,
292        close: Arc<AtomicBool>,
293        waiter: Waiter<Option<RawCacheEntry<E, S, I>>>,
294        required_fetch_builder: Option<RequiredFetchBuilder<E::Key, E::Value, E::Properties, C>>,
295    },
296    Wait(Waiter<Option<RawCacheEntry<E, S, I>>>),
297}
298
299pub enum FetchOrTake<E, S, I, C>
300where
301    E: Eviction,
302    S: HashBuilder,
303    I: Indexer<Eviction = E>,
304{
305    Fetch(RequiredFetchBuilder<E::Key, E::Value, E::Properties, C>),
306    Notifiers(Vec<Notifier<Option<RawCacheEntry<E, S, I>>>>),
307}