foyer_memory/
inflight.rs

1// Copyright 2025 foyer Project Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{
16    any::Any,
17    fmt::Debug,
18    hash::Hash,
19    sync::{
20        atomic::{AtomicBool, Ordering},
21        Arc,
22    },
23};
24
25use equivalent::Equivalent;
26use foyer_common::{
27    code::{HashBuilder, Key},
28    error::Result,
29    properties::Properties,
30};
31use futures_util::future::BoxFuture;
32use hashbrown::hash_table::{Entry, HashTable};
33use tokio::sync::oneshot;
34
35use crate::{indexer::Indexer, raw::RawCacheEntry, Eviction, Piece};
36
/// An optional fetch operation that may return `None` if the entry is not found.
///
/// This is a boxed `'static` future that resolves to `Result<Option<T>>`.
pub type OptionalFetch<T> = BoxFuture<'static, Result<Option<T>>>;
/// A required fetch operation that must return a value.
///
/// This is a boxed `'static` future that resolves to `Result<T>`.
pub type RequiredFetch<T> = BoxFuture<'static, Result<T>>;
41
/// A builder for an optional fetch operation.
///
/// The builder is a boxed, sendable one-shot closure that receives a mutable reference to a context of type `C`
/// and returns the [`OptionalFetch`] future producing a [`FetchTarget`].
pub type OptionalFetchBuilder<K, V, P, C> =
    Box<dyn FnOnce(&mut C) -> OptionalFetch<FetchTarget<K, V, P>> + Send + 'static>;
/// A builder for a required fetch operation.
///
/// The builder is a boxed, sendable one-shot closure that receives a mutable reference to a context of type `C`
/// and returns the [`RequiredFetch`] future producing a [`FetchTarget`].
pub type RequiredFetchBuilder<K, V, P, C> =
    Box<dyn FnOnce(&mut C) -> RequiredFetch<FetchTarget<K, V, P>> + Send + 'static>;
/// A type-erased builder for a required fetch operation.
///
/// Same as [`RequiredFetchBuilder`], except that the context is passed as `&mut dyn Any`, so the builder can be
/// stored without naming the concrete context type.
pub type RequiredFetchBuilderErased<K, V, P> =
    Box<dyn FnOnce(&mut dyn Any) -> RequiredFetch<FetchTarget<K, V, P>> + Send + 'static>;
51
/// A waiter for a fetch operation.
///
/// This is the receiving half of a oneshot channel carrying a `Result<T>`.
pub type Waiter<T> = oneshot::Receiver<Result<T>>;
/// A notifier for a fetch operation.
///
/// This is the sending half of a oneshot channel carrying a `Result<T>`.
pub type Notifier<T> = oneshot::Sender<Result<T>>;
56
57fn erase_required_fetch_builder<K, V, P, C, F>(f: F) -> RequiredFetchBuilderErased<K, V, P>
58where
59    C: Any + Send + 'static,
60    F: FnOnce(&mut C) -> RequiredFetch<FetchTarget<K, V, P>> + Send + 'static,
61{
62    Box::new(move |ctx| {
63        let ctx: &mut C = ctx.downcast_mut::<C>().expect("fetch context type mismatch");
64        f(ctx)
65    })
66}
67
68pub fn unerase_required_fetch_builder<K, V, P, C>(
69    f: RequiredFetchBuilderErased<K, V, P>,
70) -> RequiredFetchBuilder<K, V, P, C>
71where
72    K: 'static,
73    V: 'static,
74    P: 'static,
75    C: Any + Send + 'static,
76{
77    Box::new(move |ctx| f(ctx as &mut dyn Any))
78}
79
/// The target of a fetch operation.
///
/// A fetch yields either a plain value together with its properties, or an already built [`Piece`].
pub enum FetchTarget<K, V, P> {
    /// Fetched entry.
    Entry {
        /// Entry value.
        value: V,
        /// Entry properties.
        properties: P,
    },
    /// Fetched piece from disk cache write queue.
    Piece(Piece<K, V, P>),
}
92
93impl<K, V, P> Debug for FetchTarget<K, V, P> {
94    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
95        f.debug_struct("FetchTarget").finish()
96    }
97}
98
99impl<K, V, P> From<V> for FetchTarget<K, V, P>
100where
101    P: Properties,
102{
103    fn from(value: V) -> Self {
104        Self::Entry {
105            value,
106            properties: P::default(),
107        }
108    }
109}
110
111impl<K, V, P> From<(V, P)> for FetchTarget<K, V, P> {
112    fn from((value, properties): (V, P)) -> Self {
113        Self::Entry { value, properties }
114    }
115}
116
117impl<K, V, P> From<Piece<K, V, P>> for FetchTarget<K, V, P> {
118    fn from(piece: Piece<K, V, P>) -> Self {
119        Self::Piece(piece)
120    }
121}
122
/// Bookkeeping for one inflight fetch: its identity, its close flag, the callers waiting on it, and an optional
/// deferred fetch builder.
struct Inflight<E, S, I>
where
    E: Eviction,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
{
    /// Identifier assigned by the manager; `take` and `fetch_or_take` compare caller-supplied ids against it.
    id: usize,
    /// Flag that is set to `true` when this inflight is removed from the manager.
    close: Arc<AtomicBool>,
    /// One notifier per caller enqueued on this inflight, the leader included.
    notifiers: Vec<Notifier<Option<RawCacheEntry<E, S, I>>>>,
    // If a required fetch request comes in while there is already an inflight,
    // we store the fetch builder here to let the leader perform the fetch later.
    f: Option<RequiredFetchBuilderErased<E::Key, E::Value, E::Properties>>,
}
136
/// A hash table slot pairing a key (and its precomputed hash) with the inflight state for that key.
struct InflightEntry<E, S, I>
where
    E: Eviction,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
{
    /// Hash of `key`, kept so the table can obtain an entry's hash without rehashing the key.
    hash: u64,
    /// Owned key this inflight belongs to; lookups compare against it with `Equivalent`.
    key: E::Key,
    /// The inflight state for `key`.
    inflight: Inflight<E, S, I>,
}
147
/// Tracks the inflight fetches, at most one per key, so that concurrent requests for the same key can share a
/// single fetch.
pub struct InflightManager<E, S, I>
where
    E: Eviction,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
{
    /// Inflight entries, indexed by the caller-provided key hash.
    inflights: HashTable<InflightEntry<E, S, I>>,
    /// Id handed to the next inflight that gets created; incremented on every new inflight.
    next_id: usize,
}
157
158impl<E, S, I> Default for InflightManager<E, S, I>
159where
160    E: Eviction,
161    E::Key: Key,
162    S: HashBuilder,
163    I: Indexer<Eviction = E>,
164{
165    fn default() -> Self {
166        Self::new()
167    }
168}
169
170impl<E, S, I> InflightManager<E, S, I>
171where
172    E: Eviction,
173    E::Key: Key,
174    S: HashBuilder,
175    I: Indexer<Eviction = E>,
176{
177    pub fn new() -> Self {
178        Self {
179            inflights: HashTable::new(),
180            next_id: 0,
181        }
182    }
183
184    #[expect(clippy::type_complexity)]
185    pub fn enqueue<Q, C>(
186        &mut self,
187        hash: u64,
188        key: &Q,
189        f: Option<RequiredFetchBuilder<E::Key, E::Value, E::Properties, C>>,
190    ) -> Enqueue<E, S, I, C>
191    where
192        Q: Hash + Equivalent<E::Key> + ?Sized + ToOwned<Owned = E::Key>,
193        C: Any + Send + 'static,
194    {
195        match self.inflights.entry(hash, |e| key.equivalent(&e.key), |e| e.hash) {
196            Entry::Occupied(mut o) => {
197                let entry = o.get_mut();
198                if entry.inflight.f.is_none() && f.is_some() {
199                    entry.inflight.f = f.map(erase_required_fetch_builder);
200                }
201                let (tx, rx) = oneshot::channel();
202                entry.inflight.notifiers.push(tx);
203                Enqueue::Wait(rx)
204            }
205            Entry::Vacant(v) => {
206                let (tx, rx) = oneshot::channel();
207                let id = self.next_id;
208                self.next_id += 1;
209                let entry = InflightEntry {
210                    hash,
211                    key: key.to_owned(),
212                    inflight: Inflight {
213                        id,
214                        close: Arc::new(AtomicBool::new(false)),
215                        notifiers: vec![tx],
216                        f: None,
217                    },
218                };
219                v.insert(entry);
220                let close = Arc::new(AtomicBool::new(false));
221                Enqueue::Lead {
222                    id,
223                    close,
224                    waiter: rx,
225                    required_fetch_builder: f,
226                }
227            }
228        }
229    }
230
231    #[expect(clippy::type_complexity)]
232    pub fn take<Q>(
233        &mut self,
234        hash: u64,
235        key: &Q,
236        id: Option<usize>,
237    ) -> Option<Vec<Notifier<Option<RawCacheEntry<E, S, I>>>>>
238    where
239        Q: Hash + Equivalent<E::Key> + ?Sized,
240    {
241        match self.inflights.entry(hash, |e| key.equivalent(&e.key), |e| e.hash) {
242            Entry::Occupied(o) => match id {
243                Some(id) if id == o.get().inflight.id => Some(o.remove().0.inflight),
244                Some(_) => None,
245                None => Some(o.remove().0.inflight),
246            },
247            Entry::Vacant(..) => None,
248        }
249        .map(|inflight| {
250            inflight.close.store(true, Ordering::Relaxed);
251            inflight.notifiers
252        })
253    }
254
255    pub fn fetch_or_take<Q, C>(&mut self, hash: u64, key: &Q, id: usize) -> Option<FetchOrTake<E, S, I, C>>
256    where
257        Q: Hash + Equivalent<E::Key> + ?Sized,
258        C: Any + Send + 'static,
259    {
260        match self.inflights.entry(hash, |e| key.equivalent(&e.key), |e| e.hash) {
261            Entry::Vacant(..) => None,
262            Entry::Occupied(mut o) => {
263                if o.get().inflight.id != id {
264                    return None;
265                }
266                let f = o.get_mut().inflight.f.take();
267                match f.map(unerase_required_fetch_builder) {
268                    Some(f) => Some(FetchOrTake::Fetch(f)),
269                    None => {
270                        let inflight = o.remove().0.inflight;
271                        inflight.close.store(true, Ordering::Relaxed);
272                        let notifiers = inflight.notifiers;
273                        Some(FetchOrTake::Notifiers(notifiers))
274                    }
275                }
276            }
277        }
278    }
279}
280
/// The outcome of enqueueing a request: the caller either leads the fetch or waits for an existing one.
#[expect(clippy::type_complexity)]
pub enum Enqueue<E, S, I, C>
where
    E: Eviction,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
    C: Any + Send + 'static,
{
    /// No inflight existed for the key; the caller is the leader of a newly created one.
    Lead {
        /// Id of the newly created inflight.
        id: usize,
        /// Close flag handed to the leader.
        close: Arc<AtomicBool>,
        /// Waiter on which the leader receives the result.
        waiter: Waiter<Option<RawCacheEntry<E, S, I>>>,
        /// The required fetch builder the caller passed in, returned unchanged.
        required_fetch_builder: Option<RequiredFetchBuilder<E::Key, E::Value, E::Properties, C>>,
    },
    /// An inflight already existed for the key; the caller waits on this waiter for its result.
    Wait(Waiter<Option<RawCacheEntry<E, S, I>>>),
}
297
/// The result of asking an inflight for its next step: run a deferred fetch, or notify the waiters.
pub enum FetchOrTake<E, S, I, C>
where
    E: Eviction,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
{
    /// A deferred required fetch builder was stored on the inflight; it has been taken out for the caller to run.
    Fetch(RequiredFetchBuilder<E::Key, E::Value, E::Properties, C>),
    /// No deferred builder was stored; the inflight has been removed and these are its notifiers.
    Notifiers(Vec<Notifier<Option<RawCacheEntry<E, S, I>>>>),
}