1use std::{
16 any::{Any, TypeId},
17 borrow::Cow,
18 fmt::Debug,
19 hash::Hash,
20 sync::Arc,
21 time::Instant,
22};
23
24use equivalent::Equivalent;
25use foyer_common::{
26 code::{HashBuilder, StorageKey, StorageValue},
27 error::Result,
28 metrics::Metrics,
29 properties::{Age, Properties},
30 spawn::Spawner,
31};
32use foyer_memory::{Cache, Piece};
33
34#[cfg(feature = "test_utils")]
35use crate::test_utils::*;
36use crate::{
37 compress::Compression,
38 engine::{
39 noop::{NoopEngine, NoopEngineConfig},
40 Engine, EngineBuildContext, EngineConfig, Load, Populated, RecoverMode,
41 },
42 io::{
43 device::{statistics::Statistics, throttle::Throttle, Device},
44 engine::{monitor::MonitoredIoEngine, psync::PsyncIoEngineConfig, IoEngineBuildContext, IoEngineConfig},
45 },
46 keeper::Keeper,
47 serde::EntrySerializer,
48 StorageFilterResult,
49};
50
51pub struct Store<K, V, S, P>
53where
54 K: StorageKey,
55 V: StorageValue,
56 S: HashBuilder + Debug,
57 P: Properties,
58{
59 inner: Arc<StoreInner<K, V, S, P>>,
60}
61
62struct StoreInner<K, V, S, P>
63where
64 K: StorageKey,
65 V: StorageValue,
66 S: HashBuilder + Debug,
67 P: Properties,
68{
69 hasher: Arc<S>,
70
71 keeper: Keeper<K, V, P>,
72 engine: Arc<dyn Engine<K, V, P>>,
73
74 compression: Compression,
75
76 spawner: Spawner,
77
78 metrics: Arc<Metrics>,
79
80 #[cfg(any(test, feature = "test_utils"))]
81 load_throttle_switch: LoadThrottleSwitch,
82}
83
84impl<K, V, S, P> Debug for Store<K, V, S, P>
85where
86 K: StorageKey,
87 V: StorageValue,
88 S: HashBuilder + Debug,
89 P: Properties,
90{
91 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
92 f.debug_struct("Store")
93 .field("keeper", &self.inner.keeper)
94 .field("engine", &self.inner.engine)
95 .field("compression", &self.inner.compression)
96 .field("runtimes", &self.inner.spawner)
97 .finish()
98 }
99}
100
101impl<K, V, S, P> Clone for Store<K, V, S, P>
102where
103 K: StorageKey,
104 V: StorageValue,
105 S: HashBuilder + Debug,
106 P: Properties,
107{
108 fn clone(&self) -> Self {
109 Self {
110 inner: self.inner.clone(),
111 }
112 }
113}
114
115impl<K, V, S, P> Store<K, V, S, P>
116where
117 K: StorageKey,
118 V: StorageValue,
119 S: HashBuilder + Debug,
120 P: Properties,
121{
122 pub async fn close(&self) -> Result<()> {
126 self.inner.engine.close().await
127 }
128
129 pub fn filter(&self, hash: u64, estimated_size: usize) -> StorageFilterResult {
131 self.inner.engine.filter(hash, estimated_size)
132 }
133
134 pub fn enqueue(&self, piece: Piece<K, V, P>, force: bool) {
136 tracing::trace!(hash = piece.hash(), "[store]: enqueue piece");
137 let now = Instant::now();
138
139 if force
140 || self
141 .filter(
142 piece.hash(),
143 piece.key().estimated_size() + piece.value().estimated_size(),
144 )
145 .is_admitted()
146 {
147 let estimated_size = EntrySerializer::estimated_size(piece.key(), piece.value());
148 let rpiece = self.inner.keeper.insert(piece);
149 self.inner.engine.enqueue(rpiece, estimated_size);
150 }
151
152 self.inner.metrics.storage_enqueue.increase(1);
153 self.inner
154 .metrics
155 .storage_enqueue_duration
156 .record(now.elapsed().as_secs_f64());
157 }
158
159 pub async fn load<Q>(&self, key: &Q) -> Result<Load<K, V, P>>
161 where
162 Q: Hash + Equivalent<K> + ?Sized,
163 {
164 let now = Instant::now();
165
166 let hash = self.inner.hasher.hash_one(key);
167
168 if let Some(piece) = self.inner.keeper.get(hash, key) {
169 tracing::trace!(hash, "[store]: load from keeper");
170 return Ok(Load::Piece {
171 piece,
172 populated: Populated { age: Age::Young },
173 });
174 }
175
176 #[cfg(feature = "test_utils")]
177 if self.inner.load_throttle_switch.is_throttled() {
178 self.inner.metrics.storage_throttled.increase(1);
179 self.inner
180 .metrics
181 .storage_throttled_duration
182 .record(now.elapsed().as_secs_f64());
183 return Ok(Load::Throttled);
184 }
185
186 match self.inner.engine.load(hash).await {
187 Ok(Load::Entry {
188 key: k,
189 value: v,
190 populated: p,
191 }) if key.equivalent(&k) => {
192 self.inner.metrics.storage_hit.increase(1);
193 self.inner
194 .metrics
195 .storage_hit_duration
196 .record(now.elapsed().as_secs_f64());
197 Ok(Load::Entry {
198 key: k,
199 value: v,
200 populated: p,
201 })
202 }
203 Ok(Load::Piece { piece, populated }) if key.equivalent(piece.key()) => {
204 self.inner.metrics.storage_hit.increase(1);
205 self.inner
206 .metrics
207 .storage_hit_duration
208 .record(now.elapsed().as_secs_f64());
209 Ok(Load::Piece { piece, populated })
210 }
211 Ok(Load::Entry { .. }) | Ok(Load::Piece { .. }) | Ok(Load::Miss) => {
212 self.inner.metrics.storage_miss.increase(1);
213 self.inner
214 .metrics
215 .storage_miss_duration
216 .record(now.elapsed().as_secs_f64());
217 Ok(Load::Miss)
218 }
219 Ok(Load::Throttled) => {
220 self.inner.metrics.storage_throttled.increase(1);
221 self.inner
222 .metrics
223 .storage_throttled_duration
224 .record(now.elapsed().as_secs_f64());
225 Ok(Load::Throttled)
226 }
227 Err(e) => {
228 self.inner.metrics.storage_error.increase(1);
229 Err(e)
230 }
231 }
232 }
233
234 pub fn delete<'a, Q>(&'a self, key: &'a Q)
236 where
237 Q: Hash + Equivalent<K> + ?Sized,
238 {
239 let now = Instant::now();
240
241 let hash = self.inner.hasher.hash_one(key);
242 self.inner.engine.delete(hash);
243
244 self.inner.metrics.storage_delete.increase(1);
245 self.inner
246 .metrics
247 .storage_delete_duration
248 .record(now.elapsed().as_secs_f64());
249 }
250
251 pub fn may_contains<Q>(&self, key: &Q) -> bool
255 where
256 Q: Hash + Equivalent<K> + ?Sized,
257 {
258 let hash = self.inner.hasher.hash_one(key);
259 self.inner.engine.may_contains(hash)
260 }
261
262 pub async fn destroy(&self) -> Result<()> {
264 self.inner.engine.destroy().await
265 }
266
267 pub fn device(&self) -> &Arc<dyn Device> {
269 self.inner.engine.device()
270 }
271
272 pub fn statistics(&self) -> &Arc<Statistics> {
274 self.inner.engine.device().statistics()
275 }
276
277 pub fn throttle(&self) -> &Throttle {
279 self.inner.engine.device().statistics().throttle()
280 }
281
282 pub fn spawner(&self) -> &Spawner {
284 &self.inner.spawner
285 }
286
287 pub async fn wait(&self) {
289 self.inner.engine.wait().await
290 }
291
292 pub fn entry_estimated_size(&self, key: &K, value: &V) -> usize {
294 EntrySerializer::estimated_size(key, value)
295 }
296
297 #[cfg(feature = "test_utils")]
299 pub fn load_throttle_switch(&self) -> &LoadThrottleSwitch {
300 &self.inner.load_throttle_switch
301 }
302
303 pub fn is_enabled(&self) -> bool {
305 self.inner.engine.type_id() != TypeId::of::<Arc<NoopEngine<K, V, P>>>()
306 }
307}
308
309pub struct StoreBuilder<K, V, S, P>
311where
312 K: StorageKey,
313 V: StorageValue,
314 S: HashBuilder + Debug,
315 P: Properties,
316{
317 name: Cow<'static, str>,
318 memory: Cache<K, V, S, P>,
319 metrics: Arc<Metrics>,
320
321 io_engine_config: Option<Box<dyn IoEngineConfig>>,
322 engine_config: Option<Box<dyn EngineConfig<K, V, P>>>,
323
324 spawner: Option<Spawner>,
325
326 compression: Compression,
327 recover_mode: RecoverMode,
328
329 #[cfg(any(test, feature = "test_utils"))]
330 load_throttle_switch: LoadThrottleSwitch,
331}
332
333impl<K, V, S, P> Debug for StoreBuilder<K, V, S, P>
334where
335 K: StorageKey,
336 V: StorageValue,
337 S: HashBuilder + Debug,
338 P: Properties,
339{
340 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
341 f.debug_struct("StoreBuilder")
342 .field("name", &self.name)
343 .field("memory", &self.memory)
344 .field("metrics", &self.metrics)
345 .field("io_engine_builder", &self.io_engine_config)
346 .field("engine_builder", &self.engine_config)
347 .field("spawner", &self.spawner)
348 .field("compression", &self.compression)
349 .field("recover_mode", &self.recover_mode)
350 .finish()
351 }
352}
353
354impl<K, V, S, P> StoreBuilder<K, V, S, P>
355where
356 K: StorageKey,
357 V: StorageValue,
358 S: HashBuilder + Debug,
359 P: Properties,
360{
361 pub fn new(name: impl Into<Cow<'static, str>>, memory: Cache<K, V, S, P>, metrics: Arc<Metrics>) -> Self {
363 Self {
364 name: name.into(),
365 memory,
366 metrics,
367
368 io_engine_config: None,
369 engine_config: None,
370
371 spawner: None,
372
373 compression: Compression::default(),
374 recover_mode: RecoverMode::default(),
375 #[cfg(any(test, feature = "test_utils"))]
376 load_throttle_switch: LoadThrottleSwitch::default(),
377 }
378 }
379
380 pub fn with_io_engine_config(mut self, io_engine_builder: impl Into<Box<dyn IoEngineConfig>>) -> Self {
384 self.io_engine_config = Some(io_engine_builder.into());
385 self
386 }
387
388 pub fn with_engine_config(mut self, config: impl Into<Box<dyn EngineConfig<K, V, P>>>) -> Self {
390 self.engine_config = Some(config.into());
391 self
392 }
393
394 pub fn with_compression(mut self, compression: Compression) -> Self {
398 self.compression = compression;
399 self
400 }
401
402 pub fn with_recover_mode(mut self, recover_mode: RecoverMode) -> Self {
408 self.recover_mode = recover_mode;
409 self
410 }
411
412 pub fn with_spawner(mut self, spawner: Spawner) -> Self {
420 self.spawner = Some(spawner);
421 self
422 }
423
424 #[cfg(any(test, feature = "test_utils"))]
426 pub fn with_load_throttle_switch(mut self, switch: LoadThrottleSwitch) -> Self {
427 self.load_throttle_switch = switch;
428 self
429 }
430
431 #[doc(hidden)]
432 pub fn is_noop(&self) -> bool {
433 self.engine_config.is_none()
434 }
435
436 pub async fn build(self) -> Result<Store<K, V, S, P>> {
438 let memory = self.memory;
439 let metrics = self.metrics;
440
441 let compression = self.compression;
442
443 let spawner = self.spawner.unwrap_or_else(Spawner::current);
444
445 let io_engine_builder = match self.io_engine_config {
446 Some(builder) => builder,
447 None => {
448 tracing::info!("[store builder]: No I/O engine builder is provided, use `PsyncIoEngineConfig` with default parameters as default.");
449 PsyncIoEngineConfig::new().boxed()
450 }
451 };
452 let io_engine = io_engine_builder
453 .build(IoEngineBuildContext {
454 spawner: spawner.clone(),
455 })
456 .await?;
457 let io_engine = MonitoredIoEngine::new(io_engine, metrics.clone());
458
459 let engine_builder = match self.engine_config {
460 Some(eb) => eb,
461 None => {
462 tracing::info!(
463 "[store builder]: No engine builder is provided, run disk cache in mock mode that do nothing."
464 );
465
466 Box::<NoopEngineConfig<K, V, P>>::default()
467 }
468 };
469
470 let engine = engine_builder
471 .build(EngineBuildContext {
472 io_engine,
473 metrics: metrics.clone(),
474 spawner: spawner.clone(),
475 recover_mode: self.recover_mode,
476 })
477 .await?;
478
479 let keeper = Keeper::new(memory.shards());
480 let hasher = memory.hash_builder().clone();
481 #[cfg(any(test, feature = "test_utils"))]
482 let load_throttle_switch = self.load_throttle_switch;
483 let inner = StoreInner {
484 hasher,
485 keeper,
486 engine,
487 compression,
488 spawner,
489 metrics,
490 #[cfg(any(test, feature = "test_utils"))]
491 load_throttle_switch,
492 };
493 let inner = Arc::new(inner);
494 let store = Store { inner };
495
496 Ok(store)
497 }
498}
499
500#[cfg(test)]
501mod tests {
502 use foyer_common::hasher::ModHasher;
503 use foyer_memory::CacheBuilder;
504
505 use super::*;
506 use crate::{
507 engine::block::engine::BlockEngineConfig,
508 io::{device::fs::FsDeviceBuilder, engine::psync::PsyncIoEngineConfig},
509 DeviceBuilder,
510 };
511
512 #[tokio::test]
513 async fn test_build_with_unaligned_buffer_pool_size() {
514 let dir = tempfile::tempdir().unwrap();
515 let metrics = Arc::new(Metrics::noop());
516 let memory: Cache<u64, u64> = CacheBuilder::new(10).build();
517 let _ = StoreBuilder::new("test", memory, metrics)
518 .with_io_engine_config(PsyncIoEngineConfig::new())
519 .with_engine_config(
520 BlockEngineConfig::new(
521 FsDeviceBuilder::new(dir.path())
522 .with_capacity(64 * 1024)
523 .build()
524 .unwrap(),
525 )
526 .with_flushers(3)
527 .with_block_size(16 * 1024)
528 .with_buffer_pool_size(128 * 1024 * 1024),
529 )
530 .build()
531 .await
532 .unwrap();
533 }
534
535 #[tokio::test]
536 async fn test_entry_hash_collision() {
537 let dir = tempfile::tempdir().unwrap();
538 let metrics = Arc::new(Metrics::noop());
539 let memory: Cache<u128, String, ModHasher> =
540 CacheBuilder::new(10).with_hash_builder(ModHasher::default()).build();
541
542 let e1 = memory.insert(1, "foo".to_string());
543 let e2 = memory.insert(1 + 1 + u64::MAX as u128, "bar".to_string());
544
545 assert_eq!(memory.hash(e1.key()), memory.hash(e2.key()));
546
547 let store = StoreBuilder::new("test", memory, metrics)
548 .with_io_engine_config(PsyncIoEngineConfig::new())
549 .with_engine_config(
550 BlockEngineConfig::new(
551 FsDeviceBuilder::new(dir.path())
552 .with_capacity(4 * 1024 * 1024)
553 .build()
554 .unwrap(),
555 )
556 .with_block_size(16 * 1024),
557 )
558 .build()
559 .await
560 .unwrap();
561
562 store.enqueue(e1.piece(), true);
563 store.enqueue(e2.piece(), true);
564 store.wait().await;
565
566 let l1 = store.load(e1.key()).await.unwrap();
567 let l2 = store.load(e2.key()).await.unwrap();
568
569 assert!(matches!(l1, Load::Miss));
570 assert!(matches!(l2, Load::Entry { .. }));
571 assert_eq!(l2.entry().unwrap().1, "bar");
572 }
573}