foyer_storage/
store.rs

1// Copyright 2026 foyer Project Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{
16    any::{Any, TypeId},
17    borrow::Cow,
18    fmt::Debug,
19    hash::Hash,
20    sync::Arc,
21    time::Instant,
22};
23
24use equivalent::Equivalent;
25use foyer_common::{
26    code::{HashBuilder, StorageKey, StorageValue},
27    error::Result,
28    metrics::Metrics,
29    properties::{Age, Properties},
30    spawn::Spawner,
31};
32use foyer_memory::{Cache, Piece};
33
34#[cfg(feature = "test_utils")]
35use crate::test_utils::*;
36use crate::{
37    compress::Compression,
38    engine::{
39        noop::{NoopEngine, NoopEngineConfig},
40        Engine, EngineBuildContext, EngineConfig, Load, Populated, RecoverMode,
41    },
42    io::{
43        device::{statistics::Statistics, throttle::Throttle, Device},
44        engine::{monitor::MonitoredIoEngine, psync::PsyncIoEngineConfig, IoEngineBuildContext, IoEngineConfig},
45    },
46    keeper::Keeper,
47    serde::EntrySerializer,
48    StorageFilterResult,
49};
50
51/// The disk cache engine that serves as the storage backend of `foyer`.
/// The disk cache engine that serves as the storage backend of `foyer`.
pub struct Store<K, V, S, P>
where
    K: StorageKey,
    V: StorageValue,
    S: HashBuilder + Debug,
    P: Properties,
{
    // All state lives behind an `Arc`, so cloning a `Store` yields a cheap
    // handle to the same underlying engine and metrics.
    inner: Arc<StoreInner<K, V, S, P>>,
}
61
/// Shared state of a [`Store`]; reference-counted so store handles clone cheaply.
struct StoreInner<K, V, S, P>
where
    K: StorageKey,
    V: StorageValue,
    S: HashBuilder + Debug,
    P: Properties,
{
    // Hash builder shared with the in-memory cache; used to hash keys in
    // `load`, `delete` and `may_contains`.
    hasher: Arc<S>,

    // Holds pieces that were enqueued so `load` can serve them from memory
    // while the engine processes the write queue.
    keeper: Keeper<K, V, P>,
    // The storage engine; a `NoopEngine` when no engine config was provided.
    engine: Arc<dyn Engine<K, V, P>>,

    // Compression algorithm configured for the store. Not read in this file;
    // presumably consumed by the serialization path — confirm.
    compression: Compression,

    // Task spawner handed to the io engine / engine at build time and exposed
    // via `Store::spawner`.
    spawner: Spawner,

    // Metric registry for the storage enqueue/load/delete counters and
    // duration histograms recorded below.
    metrics: Arc<Metrics>,

    // Test-only switch to simulate load throttling.
    // NOTE(review): gated on `any(test, feature = "test_utils")` while the
    // `LoadThrottleSwitch` import above is gated on `feature = "test_utils"`
    // only — presumably the feature is always enabled for tests; confirm.
    #[cfg(any(test, feature = "test_utils"))]
    load_throttle_switch: LoadThrottleSwitch,
}
83
84impl<K, V, S, P> Debug for Store<K, V, S, P>
85where
86    K: StorageKey,
87    V: StorageValue,
88    S: HashBuilder + Debug,
89    P: Properties,
90{
91    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
92        f.debug_struct("Store")
93            .field("keeper", &self.inner.keeper)
94            .field("engine", &self.inner.engine)
95            .field("compression", &self.inner.compression)
96            .field("runtimes", &self.inner.spawner)
97            .finish()
98    }
99}
100
101impl<K, V, S, P> Clone for Store<K, V, S, P>
102where
103    K: StorageKey,
104    V: StorageValue,
105    S: HashBuilder + Debug,
106    P: Properties,
107{
108    fn clone(&self) -> Self {
109        Self {
110            inner: self.inner.clone(),
111        }
112    }
113}
114
impl<K, V, S, P> Store<K, V, S, P>
where
    K: StorageKey,
    V: StorageValue,
    S: HashBuilder + Debug,
    P: Properties,
{
    /// Close the disk cache gracefully.
    ///
    /// `close` will wait for all ongoing flush and reclaim tasks to finish.
    pub async fn close(&self) -> Result<()> {
        self.inner.engine.close().await
    }

    /// Return if the given key can be picked by the admission filter.
    ///
    /// `hash` is the key hash and `estimated_size` the expected serialized
    /// size of the entry; both are forwarded to the engine's filter.
    pub fn filter(&self, hash: u64, estimated_size: usize) -> StorageFilterResult {
        self.inner.engine.filter(hash, estimated_size)
    }

    /// Push a in-memory cache piece to the disk cache write queue.
    ///
    /// When `force` is `true` the admission filter is bypassed and the piece
    /// is always enqueued. The enqueue counter and duration metrics are
    /// recorded regardless of whether the piece was admitted.
    pub fn enqueue(&self, piece: Piece<K, V, P>, force: bool) {
        tracing::trace!(hash = piece.hash(), "[store]: enqueue piece");
        let now = Instant::now();

        if force
            || self
                .filter(
                    piece.hash(),
                    piece.key().estimated_size() + piece.value().estimated_size(),
                )
                .is_admitted()
        {
            let estimated_size = EntrySerializer::estimated_size(piece.key(), piece.value());
            // Register the piece with the keeper first so that `load` can
            // serve it from memory while the engine processes the queue.
            let rpiece = self.inner.keeper.insert(piece);
            self.inner.engine.enqueue(rpiece, estimated_size);
        }

        self.inner.metrics.storage_enqueue.increase(1);
        self.inner
            .metrics
            .storage_enqueue_duration
            .record(now.elapsed().as_secs_f64());
    }

    /// Load a cache entry from the disk cache.
    ///
    /// Lookup order:
    /// 1. the keeper, which holds recently enqueued pieces (see
    ///    [`Store::enqueue`]);
    /// 2. the storage engine.
    ///
    /// Entries are addressed by hash, so the key returned by the engine is
    /// compared to the requested key with [`Equivalent`] to rule out hash
    /// collisions; a mismatch is reported as [`Load::Miss`].
    pub async fn load<Q>(&self, key: &Q) -> Result<Load<K, V, P>>
    where
        Q: Hash + Equivalent<K> + ?Sized,
    {
        let now = Instant::now();

        let hash = self.inner.hasher.hash_one(key);

        if let Some(piece) = self.inner.keeper.get(hash, key) {
            tracing::trace!(hash, "[store]: load from keeper");
            // Served directly from memory without touching the engine.
            // Note: this path records no hit/duration metrics.
            return Ok(Load::Piece {
                piece,
                populated: Populated { age: Age::Young },
            });
        }

        // Test-only switch that simulates a throttled device.
        #[cfg(feature = "test_utils")]
        if self.inner.load_throttle_switch.is_throttled() {
            self.inner.metrics.storage_throttled.increase(1);
            self.inner
                .metrics
                .storage_throttled_duration
                .record(now.elapsed().as_secs_f64());
            return Ok(Load::Throttled);
        }

        match self.inner.engine.load(hash).await {
            // Full entry found and the key verified: a genuine hit.
            Ok(Load::Entry {
                key: k,
                value: v,
                populated: p,
            }) if key.equivalent(&k) => {
                self.inner.metrics.storage_hit.increase(1);
                self.inner
                    .metrics
                    .storage_hit_duration
                    .record(now.elapsed().as_secs_f64());
                Ok(Load::Entry {
                    key: k,
                    value: v,
                    populated: p,
                })
            }
            // In-memory piece found via the engine and the key verified.
            Ok(Load::Piece { piece, populated }) if key.equivalent(piece.key()) => {
                self.inner.metrics.storage_hit.increase(1);
                self.inner
                    .metrics
                    .storage_hit_duration
                    .record(now.elapsed().as_secs_f64());
                Ok(Load::Piece { piece, populated })
            }
            // Either nothing was found, or the stored key differs from the
            // requested one (hash collision) — both count as a miss.
            Ok(Load::Entry { .. }) | Ok(Load::Piece { .. }) | Ok(Load::Miss) => {
                self.inner.metrics.storage_miss.increase(1);
                self.inner
                    .metrics
                    .storage_miss_duration
                    .record(now.elapsed().as_secs_f64());
                Ok(Load::Miss)
            }
            // The engine refused the read (e.g. io throttling).
            Ok(Load::Throttled) => {
                self.inner.metrics.storage_throttled.increase(1);
                self.inner
                    .metrics
                    .storage_throttled_duration
                    .record(now.elapsed().as_secs_f64());
                Ok(Load::Throttled)
            }
            Err(e) => {
                self.inner.metrics.storage_error.increase(1);
                Err(e)
            }
        }
    }

    /// Delete the cache entry with the given key from the disk cache.
    ///
    /// NOTE(review): deletion is addressed by hash only, so an entry whose
    /// key collides with `key` would be affected as well — confirm this is
    /// acceptable for all engine implementations.
    pub fn delete<'a, Q>(&'a self, key: &'a Q)
    where
        Q: Hash + Equivalent<K> + ?Sized,
    {
        let now = Instant::now();

        let hash = self.inner.hasher.hash_one(key);
        self.inner.engine.delete(hash);

        self.inner.metrics.storage_delete.increase(1);
        self.inner
            .metrics
            .storage_delete_duration
            .record(now.elapsed().as_secs_f64());
    }

    /// Check if the disk cache contains a cached entry with the given key.
    ///
    /// `contains` may return a false-positive result if there is a hash collision with the given key.
    pub fn may_contains<Q>(&self, key: &Q) -> bool
    where
        Q: Hash + Equivalent<K> + ?Sized,
    {
        let hash = self.inner.hasher.hash_one(key);
        self.inner.engine.may_contains(hash)
    }

    /// Delete all cached entries of the disk cache.
    pub async fn destroy(&self) -> Result<()> {
        self.inner.engine.destroy().await
    }

    /// Get the device of the disk cache.
    pub fn device(&self) -> &Arc<dyn Device> {
        self.inner.engine.device()
    }

    /// Get the statistics information of the disk cache.
    pub fn statistics(&self) -> &Arc<Statistics> {
        self.inner.engine.device().statistics()
    }

    /// Get the io throttle of the disk cache.
    pub fn throttle(&self) -> &Throttle {
        self.inner.engine.device().statistics().throttle()
    }

    /// Get the spawner.
    pub fn spawner(&self) -> &Spawner {
        &self.inner.spawner
    }

    /// Wait for the ongoing flush and reclaim tasks to finish.
    pub async fn wait(&self) {
        self.inner.engine.wait().await
    }

    /// Return the estimated serialized size of the entry.
    pub fn entry_estimated_size(&self, key: &K, value: &V) -> usize {
        EntrySerializer::estimated_size(key, value)
    }

    /// Get the load throttle switch for the disk cache.
    #[cfg(feature = "test_utils")]
    pub fn load_throttle_switch(&self) -> &LoadThrottleSwitch {
        &self.inner.load_throttle_switch
    }

    /// If the disk cache is enabled.
    ///
    /// The store is considered disabled when it is backed by the no-op
    /// engine, i.e. no engine config was provided to the builder.
    pub fn is_enabled(&self) -> bool {
        // NOTE(review): this assumes `.type_id()` reports the concrete type
        // behind the erased engine (matching `Arc<NoopEngine<..>>` for the
        // no-op engine). Confirm the call does not resolve to
        // `Any::type_id` on the `Arc<dyn Engine<..>>` itself, which would
        // make this comparison always `false` (always "enabled").
        self.inner.engine.type_id() != TypeId::of::<Arc<NoopEngine<K, V, P>>>()
    }
}
308
309/// The builder of the disk cache.
/// The builder of the disk cache.
pub struct StoreBuilder<K, V, S, P>
where
    K: StorageKey,
    V: StorageValue,
    S: HashBuilder + Debug,
    P: Properties,
{
    // Human-readable store name; currently only shown in `Debug` output and
    // not read during `build`.
    name: Cow<'static, str>,
    // The in-memory cache this disk cache backs; supplies the hash builder
    // and shard count at build time.
    memory: Cache<K, V, S, P>,
    // Metric registry shared with the built store.
    metrics: Arc<Metrics>,

    // I/O engine config; `None` falls back to psync in `build`.
    io_engine_config: Option<Box<dyn IoEngineConfig>>,
    // Engine config; `None` means the store is built with the no-op engine.
    engine_config: Option<Box<dyn EngineConfig<K, V, P>>>,

    // Task spawner; `None` falls back to `Spawner::current()` in `build`.
    spawner: Option<Spawner>,

    // Compression algorithm for the disk cache store.
    compression: Compression,
    // How existing on-disk data is recovered at build time.
    recover_mode: RecoverMode,

    // Test-only switch forwarded to the store to simulate load throttling.
    #[cfg(any(test, feature = "test_utils"))]
    load_throttle_switch: LoadThrottleSwitch,
}
332
333impl<K, V, S, P> Debug for StoreBuilder<K, V, S, P>
334where
335    K: StorageKey,
336    V: StorageValue,
337    S: HashBuilder + Debug,
338    P: Properties,
339{
340    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
341        f.debug_struct("StoreBuilder")
342            .field("name", &self.name)
343            .field("memory", &self.memory)
344            .field("metrics", &self.metrics)
345            .field("io_engine_builder", &self.io_engine_config)
346            .field("engine_builder", &self.engine_config)
347            .field("spawner", &self.spawner)
348            .field("compression", &self.compression)
349            .field("recover_mode", &self.recover_mode)
350            .finish()
351    }
352}
353
354impl<K, V, S, P> StoreBuilder<K, V, S, P>
355where
356    K: StorageKey,
357    V: StorageValue,
358    S: HashBuilder + Debug,
359    P: Properties,
360{
361    /// Setup disk cache store for the given in-memory cache.
362    pub fn new(name: impl Into<Cow<'static, str>>, memory: Cache<K, V, S, P>, metrics: Arc<Metrics>) -> Self {
363        Self {
364            name: name.into(),
365            memory,
366            metrics,
367
368            io_engine_config: None,
369            engine_config: None,
370
371            spawner: None,
372
373            compression: Compression::default(),
374            recover_mode: RecoverMode::default(),
375            #[cfg(any(test, feature = "test_utils"))]
376            load_throttle_switch: LoadThrottleSwitch::default(),
377        }
378    }
379
380    /// Set io engine config for the disk cache store.
381    ///
382    /// Default: [`crate::io::engine::psync::PsyncIoEngineConfig`].
383    pub fn with_io_engine_config(mut self, io_engine_builder: impl Into<Box<dyn IoEngineConfig>>) -> Self {
384        self.io_engine_config = Some(io_engine_builder.into());
385        self
386    }
387
388    /// Set engine config for the disk cache store.
389    pub fn with_engine_config(mut self, config: impl Into<Box<dyn EngineConfig<K, V, P>>>) -> Self {
390        self.engine_config = Some(config.into());
391        self
392    }
393
394    /// Set the compression algorithm of the disk cache store.
395    ///
396    /// Default: [`Compression::None`].
397    pub fn with_compression(mut self, compression: Compression) -> Self {
398        self.compression = compression;
399        self
400    }
401
402    /// Set the recover mode for the disk cache store.
403    ///
404    /// See more in [`RecoverMode`].
405    ///
406    /// Default: [`RecoverMode::Quiet`].
407    pub fn with_recover_mode(mut self, recover_mode: RecoverMode) -> Self {
408        self.recover_mode = recover_mode;
409        self
410    }
411
412    /// Configure the task spawner for the disk cache store.
413    ///
414    /// By default, it will use the current spawner that built foyer.
415    ///
416    /// For example, with tokio, it will be `tokio::runtime::Handle::current()`.
417    ///
418    /// FYI: [`Spawner`] and [`Spawner::current()`]
419    pub fn with_spawner(mut self, spawner: Spawner) -> Self {
420        self.spawner = Some(spawner);
421        self
422    }
423
424    /// Set the load throttle switch for the disk cache store.
425    #[cfg(any(test, feature = "test_utils"))]
426    pub fn with_load_throttle_switch(mut self, switch: LoadThrottleSwitch) -> Self {
427        self.load_throttle_switch = switch;
428        self
429    }
430
431    #[doc(hidden)]
432    pub fn is_noop(&self) -> bool {
433        self.engine_config.is_none()
434    }
435
436    /// Build the disk cache store with the given configuration.
437    pub async fn build(self) -> Result<Store<K, V, S, P>> {
438        let memory = self.memory;
439        let metrics = self.metrics;
440
441        let compression = self.compression;
442
443        let spawner = self.spawner.unwrap_or_else(Spawner::current);
444
445        let io_engine_builder = match self.io_engine_config {
446            Some(builder) => builder,
447            None => {
448                tracing::info!("[store builder]: No I/O engine builder is provided, use `PsyncIoEngineConfig` with default parameters as default.");
449                PsyncIoEngineConfig::new().boxed()
450            }
451        };
452        let io_engine = io_engine_builder
453            .build(IoEngineBuildContext {
454                spawner: spawner.clone(),
455            })
456            .await?;
457        let io_engine = MonitoredIoEngine::new(io_engine, metrics.clone());
458
459        let engine_builder = match self.engine_config {
460            Some(eb) => eb,
461            None => {
462                tracing::info!(
463                    "[store builder]: No engine builder is provided, run disk cache in mock mode that do nothing."
464                );
465
466                Box::<NoopEngineConfig<K, V, P>>::default()
467            }
468        };
469
470        let engine = engine_builder
471            .build(EngineBuildContext {
472                io_engine,
473                metrics: metrics.clone(),
474                spawner: spawner.clone(),
475                recover_mode: self.recover_mode,
476            })
477            .await?;
478
479        let keeper = Keeper::new(memory.shards());
480        let hasher = memory.hash_builder().clone();
481        #[cfg(any(test, feature = "test_utils"))]
482        let load_throttle_switch = self.load_throttle_switch;
483        let inner = StoreInner {
484            hasher,
485            keeper,
486            engine,
487            compression,
488            spawner,
489            metrics,
490            #[cfg(any(test, feature = "test_utils"))]
491            load_throttle_switch,
492        };
493        let inner = Arc::new(inner);
494        let store = Store { inner };
495
496        Ok(store)
497    }
498}
499
#[cfg(test)]
mod tests {
    use foyer_common::hasher::ModHasher;
    use foyer_memory::CacheBuilder;

    use super::*;
    use crate::{
        engine::block::engine::BlockEngineConfig,
        io::{device::fs::FsDeviceBuilder, engine::psync::PsyncIoEngineConfig},
        DeviceBuilder,
    };

    /// Regression test: building the store must succeed even when the
    /// configured buffer pool size (128 MiB) far exceeds the device
    /// capacity (64 KiB).
    #[tokio::test]
    async fn test_build_with_unaligned_buffer_pool_size() {
        let dir = tempfile::tempdir().unwrap();
        let metrics = Arc::new(Metrics::noop());
        let memory: Cache<u64, u64> = CacheBuilder::new(10).build();
        let _ = StoreBuilder::new("test", memory, metrics)
            .with_io_engine_config(PsyncIoEngineConfig::new())
            .with_engine_config(
                BlockEngineConfig::new(
                    FsDeviceBuilder::new(dir.path())
                        .with_capacity(64 * 1024)
                        .build()
                        .unwrap(),
                )
                .with_flushers(3)
                .with_block_size(16 * 1024)
                .with_buffer_pool_size(128 * 1024 * 1024),
            )
            .build()
            .await
            .unwrap();
    }

    /// Two keys with identical hashes (via `ModHasher`) are written in
    /// order; loading afterwards must verify keys so that the overwritten
    /// first entry reports a miss while the second loads its own value.
    #[tokio::test]
    async fn test_entry_hash_collision() {
        let dir = tempfile::tempdir().unwrap();
        let metrics = Arc::new(Metrics::noop());
        let memory: Cache<u128, String, ModHasher> =
            CacheBuilder::new(10).with_hash_builder(ModHasher::default()).build();

        let e1 = memory.insert(1, "foo".to_string());
        // `1 + 1 + u64::MAX` wraps to the same hash as `1` under `ModHasher`.
        let e2 = memory.insert(1 + 1 + u64::MAX as u128, "bar".to_string());

        assert_eq!(memory.hash(e1.key()), memory.hash(e2.key()));

        let store = StoreBuilder::new("test", memory, metrics)
            .with_io_engine_config(PsyncIoEngineConfig::new())
            .with_engine_config(
                BlockEngineConfig::new(
                    FsDeviceBuilder::new(dir.path())
                        .with_capacity(4 * 1024 * 1024)
                        .build()
                        .unwrap(),
                )
                .with_block_size(16 * 1024),
            )
            .build()
            .await
            .unwrap();

        store.enqueue(e1.piece(), true);
        store.enqueue(e2.piece(), true);
        store.wait().await;

        let l1 = store.load(e1.key()).await.unwrap();
        let l2 = store.load(e2.key()).await.unwrap();

        // The colliding write wins: e1 is gone, e2's value is served.
        assert!(matches!(l1, Load::Miss));
        assert!(matches!(l2, Load::Entry { .. }));
        assert_eq!(l2.entry().unwrap().1, "bar");
    }
}