Skip to main content

wombatkv_node/
foyer_cache.rs

1#![forbid(unsafe_code)]
2//! Hybrid memory + disk cache backend.
3//!
4//! Opt-in alternative to `nvme_cache::NvmeHotCache`. Selected at runtime via
5//! `WMBT_KV_PUFFER_BACKEND=hybrid`. The cache manages both memory and on-disk
6//! tiers internally:
7//!
8//! - S3-FIFO eviction in memory
9//! - Direct I/O on Linux to bypass the OS page cache
10//! - `io_uring` async backend (when available) for the disk tier
11//! - Block-aligned device layout
12//!
//! The sync `get` fast path probes the memory tier directly to avoid a
//! `block_on` round trip on warm hits, which is where the production
//! perf wins live. In short: this is a foyer hybrid cache made of an
//! in-process RAM tier (L0) in front of an on-disk SSD tier (L1).
17
18use std::path::{Path, PathBuf};
19use std::sync::atomic::{AtomicU64, Ordering};
20use std::sync::Arc;
21
22use bytes::Bytes;
23use foyer::{
24    BlockEngineConfig, DeviceBuilder, FsDeviceBuilder, HybridCache, HybridCacheBuilder,
25    HybridCachePolicy, IoEngineConfig,
26};
27use tokio::runtime::{Builder as RuntimeBuilder, Runtime};
28
/// Identifies the foyer tier that satisfied a lookup.
///
/// `FoyerHybridCache::get_with_tier` returns this next to the payload so
/// callers can charge the observed latency to the correct tier in
/// observability traces.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum FoyerHitTier {
    /// Served by the in-memory tier (the synchronous probe hit).
    Ram,
    /// Served by the on-disk / SSD tier (the memory probe missed and the
    /// async hybrid `get` hit).
    Ssd,
}

impl FoyerHitTier {
    /// Stable lowercase label for the tier: `"ram"` or `"ssd"`.
    #[must_use]
    pub fn as_str(self) -> &'static str {
        // Exhaustive on purpose: adding a tier must force a label decision.
        let label = match self {
            FoyerHitTier::Ssd => "ssd",
            FoyerHitTier::Ram => "ram",
        };
        label
    }
}
49
/// Sizing and I/O settings used to build the hybrid cache.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct FoyerCacheConfig {
    /// Byte budget of the memory tier.
    pub ram_bytes: u64,
    /// Directory that holds the on-disk tier. Created when it does not exist.
    pub ssd_dir: PathBuf,
    /// Byte budget of the on-disk tier.
    pub ssd_bytes: u64,
    /// Block size handed to the on-disk engine. Should be at least as large
    /// as the largest blob the caller will store.
    pub block_size: usize,
    /// Total size in bytes of the I/O buffer pool shared by the flushers.
    ///
    /// Foyer's `BlockEngine` writes through N flushers, and each flusher
    /// owns an in-RAM staging buffer of `buffer_pool_size / flushers` bytes
    /// (foyer 0.22 default: 16 MiB / 1 flusher = 16 MiB per flusher).
    /// An entry whose serialized size exceeds that per-flusher buffer is
    /// silently dropped at `Buffer::push` with
    /// `storage_queue_buffer_overflow` and **never reaches SSD**. That is
    /// the failure mode that broke the `WombatKV` "foyer SSD as a hot tier
    /// across restarts" claim for our 28-42 MB ds4 KV blobs.
    ///
    /// Keep this larger than the biggest blob put through `put_kv`. The
    /// default is **256 MiB** so single 30-60 MB ds4 / vLLM KV payloads
    /// land on SSD without extra tuning. Raise it through
    /// `WMBT_KV_PUFFER_BUFFER_POOL_BYTES` for larger blobs.
    pub buffer_pool_size: usize,
    /// Request `io_uring` on Linux when true; psync is used otherwise.
    pub iouring: bool,
}

impl Default for FoyerCacheConfig {
    /// 1 GiB RAM tier, 8 GiB SSD tier under `/tmp/wombatkv-puffer`,
    /// 64 MiB blocks, 256 MiB buffer pool, `io_uring` enabled on Linux only.
    fn default() -> Self {
        const MIB: usize = 1024 * 1024;
        const GIB: u64 = 1024 * 1024 * 1024;

        let ssd_dir = PathBuf::from("/tmp/wombatkv-puffer");
        let iouring = cfg!(target_os = "linux");
        Self {
            ram_bytes: GIB,
            ssd_dir,
            ssd_bytes: 8 * GIB,
            block_size: 64 * MIB,
            buffer_pool_size: 256 * MIB,
            iouring,
        }
    }
}
94
/// Failures the cache reports to its callers. Each variant carries a
/// human-readable detail string.
#[derive(Debug)]
pub enum FoyerCacheError {
    /// Filesystem operation failed (e.g. creating the SSD directory).
    Io(String),
    /// A configuration value was rejected before building the cache.
    InvalidConfig(String),
    /// The foyer builder returned an error.
    Build(String),
    /// The dedicated tokio runtime could not be created.
    RuntimeInit(String),
}

impl std::fmt::Display for FoyerCacheError {
    /// Renders as `WombatKV puffer <kind>: <detail>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let (kind, detail) = match self {
            Self::Io(detail) => ("io error", detail),
            Self::InvalidConfig(detail) => ("invalid config", detail),
            Self::Build(detail) => ("build failed", detail),
            Self::RuntimeInit(detail) => ("runtime init failed", detail),
        };
        write!(f, "WombatKV puffer {kind}: {detail}")
    }
}

impl std::error::Error for FoyerCacheError {}
118
119/// Hybrid memory + disk cache backed by foyer.
120///
121/// Construction requires an async runtime because foyer's builder is async.
122/// We pin a dedicated multi-thread runtime to the cache instance and reuse
123/// it for the slow path on memory miss.
124pub struct FoyerHybridCache {
125    inner: HybridCache<String, Bytes>,
126    runtime: Arc<Runtime>,
127    ssd_dir: PathBuf,
128    hits: AtomicU64,
129    misses: AtomicU64,
130    inserts: AtomicU64,
131}
132
133impl FoyerHybridCache {
134    /// Build a new hybrid cache. Creates the on-disk directory if missing.
135    /// Allocates a fresh tokio runtime for foyer's async paths.
136    pub fn open(config: FoyerCacheConfig) -> Result<Arc<Self>, FoyerCacheError> {
137        if config.ram_bytes == 0 {
138            return Err(FoyerCacheError::InvalidConfig("ram_bytes must be > 0".to_string()));
139        }
140        if config.ssd_bytes == 0 {
141            return Err(FoyerCacheError::InvalidConfig("ssd_bytes must be > 0".to_string()));
142        }
143        if config.block_size == 0 {
144            return Err(FoyerCacheError::InvalidConfig("block_size must be > 0".to_string()));
145        }
146        if config.buffer_pool_size == 0 {
147            return Err(FoyerCacheError::InvalidConfig("buffer_pool_size must be > 0".to_string()));
148        }
149        if let Some(parent) = config.ssd_dir.parent() {
150            std::fs::create_dir_all(parent).map_err(|err| {
151                FoyerCacheError::Io(format!("create ssd parent {}: {err}", parent.display()))
152            })?;
153        }
154        std::fs::create_dir_all(&config.ssd_dir).map_err(|err| {
155            FoyerCacheError::Io(format!("create ssd dir {}: {err}", config.ssd_dir.display()))
156        })?;
157
158        let runtime = Arc::new(
159            RuntimeBuilder::new_multi_thread()
160                .worker_threads(2)
161                .thread_name("wombatkv-puffer")
162                .enable_all()
163                .build()
164                .map_err(|err| FoyerCacheError::RuntimeInit(err.to_string()))?,
165        );
166        Self::open_with_runtime(config, runtime)
167    }
168
169    /// Build the cache with a caller-provided runtime. Useful for sharing a
170    /// runtime across multiple subsystems in the same process.
171    pub fn open_with_runtime(
172        config: FoyerCacheConfig,
173        runtime: Arc<Runtime>,
174    ) -> Result<Arc<Self>, FoyerCacheError> {
175        let ssd_dir = config.ssd_dir.clone();
176        let runtime_for_build = runtime.clone();
177        let inner = runtime_for_build
178            .block_on(async move { build_cache(config).await })
179            .map_err(|err| FoyerCacheError::Build(err.to_string()))?;
180
181        Ok(Arc::new(Self {
182            inner,
183            runtime,
184            ssd_dir,
185            hits: AtomicU64::new(0),
186            misses: AtomicU64::new(0),
187            inserts: AtomicU64::new(0),
188        }))
189    }
190
191    #[must_use]
192    pub fn ssd_dir(&self) -> &Path {
193        &self.ssd_dir
194    }
195
196    #[must_use]
197    pub fn hits(&self) -> u64 {
198        self.hits.load(Ordering::Relaxed)
199    }
200
201    #[must_use]
202    pub fn misses(&self) -> u64 {
203        self.misses.load(Ordering::Relaxed)
204    }
205
206    #[must_use]
207    pub fn inserts(&self) -> u64 {
208        self.inserts.load(Ordering::Relaxed)
209    }
210
211    /// Sync read. Probes the in-memory tier first to avoid a runtime hop
212    /// on warm hits. Falls through to foyer's async hybrid `get` only when
213    /// the memory tier misses.
214    ///
215    /// Convenience wrapper over `get_with_tier` for callers that don't
216    /// need to distinguish RAM vs SSD hits.
217    pub fn get(&self, key: &str) -> Option<Bytes> {
218        self.get_with_tier(key).map(|(bytes, _)| bytes)
219    }
220
221    /// Sync read with tier attribution. Returns the tier the value was
222    /// served from on hit. Lets callers attribute load latency to the
223    /// right tier, critical for diagnosing the "blob too big for RAM
224    /// tier, always SSD-streamed" case (e.g. qwen3's pre-allocated 4.7
225    /// GiB KV cache against a 2 GiB RAM budget).
226    pub fn get_with_tier(&self, key: &str) -> Option<(Bytes, FoyerHitTier)> {
227        let t0 = std::time::Instant::now();
228        let owned = key.to_string();
229        let t_owned = t0.elapsed().as_micros() as u64;
230        if let Some(entry) = self.inner.memory().get(&owned) {
231            let t_ram = t0.elapsed().as_micros() as u64;
232            self.hits.fetch_add(1, Ordering::Relaxed);
233            let v = entry.value().clone();
234            let t_clone = t0.elapsed().as_micros() as u64;
235            crate::embed::emit_timing(
236                "foyer_get_with_tier",
237                "ram_hit",
238                &[
239                    ("owned_us", t_owned),
240                    ("ram_probe_us", t_ram - t_owned),
241                    ("clone_us", t_clone - t_ram),
242                    ("total_us", t_clone),
243                ],
244            );
245            return Some((v, FoyerHitTier::Ram));
246        }
247        let t_ram_miss = t0.elapsed().as_micros() as u64;
248
249        let result = self.runtime.block_on(self.inner.get(&owned));
250        let t_ssd = t0.elapsed().as_micros() as u64;
251        if let Ok(Some(entry)) = result {
252            self.hits.fetch_add(1, Ordering::Relaxed);
253            let v = entry.value().clone();
254            let t_clone = t0.elapsed().as_micros() as u64;
255            crate::embed::emit_timing(
256                "foyer_get_with_tier",
257                "ssd_hit",
258                &[
259                    ("owned_us", t_owned),
260                    ("ram_miss_us", t_ram_miss - t_owned),
261                    ("ssd_block_on_us", t_ssd - t_ram_miss),
262                    ("clone_us", t_clone - t_ssd),
263                    ("total_us", t_clone),
264                ],
265            );
266            // Sync probe of the memory tier missed but the async
267            // hybrid get hit, value came from the disk/SSD tier.
268            Some((v, FoyerHitTier::Ssd))
269        } else {
270            self.misses.fetch_add(1, Ordering::Relaxed);
271            let t_total = t0.elapsed().as_micros() as u64;
272            crate::embed::emit_timing(
273                "foyer_get_with_tier",
274                "miss",
275                &[
276                    ("ram_miss_us", t_ram_miss - t_owned),
277                    ("ssd_miss_us", t_ssd - t_ram_miss),
278                    ("total_us", t_total),
279                ],
280            );
281            None
282        }
283    }
284
285    /// Sync existence check that does not materialize the cached payload.
286    pub fn contains(&self, key: &str) -> bool {
287        let owned = key.to_string();
288        self.inner.contains(&owned)
289    }
290
291    /// Sync write. Foyer's `insert` is sync; the disk-side flush happens on
292    /// background workers driven by the cache's spawner.
293    pub fn put(&self, key: &str, payload: Bytes) {
294        self.inner.insert(key.to_string(), payload);
295        self.inserts.fetch_add(1, Ordering::Relaxed);
296    }
297
298    /// Drop all entries from both tiers.
299    pub fn clear(&self) {
300        self.runtime.block_on(async {
301            let _ = self.inner.clear().await;
302        });
303    }
304
305    /// Block until the foyer write-queue has drained to SSD.
306    ///
307    /// Foyer's `Drop` impl spawns a close future on the cache's own
308    /// spawner but does NOT wait for it, so process exit immediately
309    /// after the last Arc clone drops can leave SSD entries un-flushed.
310    /// This method is the explicit drain hook callers should invoke
311    /// before tearing down the cache; the [`Drop`] impl on
312    /// [`FoyerHybridCache`] also calls it on a best-effort basis.
313    pub fn close(&self) {
314        self.runtime.block_on(async {
315            let _ = self.inner.close().await;
316        });
317    }
318}
319
320impl Drop for FoyerHybridCache {
321    fn drop(&mut self) {
322        // Block on foyer's close so pending SSD writes flush before the
323        // tokio runtime is torn down. Without this, the write-on-insertion
324        // path enqueues SSD writes to background workers that never get a
325        // chance to run on process exit, entries appear in RAM, never on
326        // disk, and a subsequent process restart recovers zero entries.
327        self.runtime.block_on(async {
328            let _ = self.inner.close().await;
329        });
330    }
331}
332
333type BuildError = Box<dyn std::error::Error + Send + Sync>;
334
335async fn build_cache(config: FoyerCacheConfig) -> Result<HybridCache<String, Bytes>, BuildError> {
336    let mut fs_builder = FsDeviceBuilder::new(&config.ssd_dir);
337    #[cfg(target_os = "linux")]
338    {
339        fs_builder = fs_builder.with_direct(true);
340    }
341    fs_builder = fs_builder.with_capacity(config.ssd_bytes as usize);
342    let device = fs_builder.build().map_err(|err| Box::new(err) as BuildError)?;
343
344    // `with_buffer_pool_size` is critical for large-value workloads. Foyer's
345    // default per-flusher I/O buffer is 16 MiB, so a single `put_kv` with a
346    // value larger than 16 MiB is silently dropped at the flusher buffer
347    // (`storage_queue_buffer_overflow`) and the entry never reaches SSD.
348    let engine = BlockEngineConfig::new(device)
349        .with_block_size(config.block_size)
350        .with_buffer_pool_size(config.buffer_pool_size);
351    let io_engine = io_engine_config(config.iouring);
352
353    // WriteOnInsertion (not WriteOnEviction) so entries land on SSD on
354    // every put, not only after RAM eviction. The previous policy left
355    // the SSD tier empty for typical WombatKV workloads (large single
356    // blobs that fit in RAM), so foyer recovered 0 entries on every
357    // process restart and the "warm SSD tier across restarts" claim was
358    // silently broken. Confirmed via the foyer-tier-bench microbench on
359    // 2026-05-13: under WriteOnEviction the SSD region was 8 GiB of
360    // sparse files with 0 bytes of actual data, and every restart fell
361    // through to S3.
362    let cache = HybridCacheBuilder::new()
363        .with_policy(HybridCachePolicy::WriteOnInsertion)
364        .memory(config.ram_bytes as usize)
365        .with_weighter(|key: &String, value: &Bytes| key.len() + value.len())
366        .storage()
367        .with_engine_config(engine)
368        .with_io_engine_config(io_engine)
369        .build()
370        .await
371        .map_err(|err| Box::new(err) as BuildError)?;
372
373    Ok(cache)
374}
375
376fn io_engine_config(iouring: bool) -> Box<dyn IoEngineConfig> {
377    #[cfg(target_os = "linux")]
378    if iouring {
379        return foyer::UringIoEngineConfig::new().boxed();
380    }
381    #[cfg(not(target_os = "linux"))]
382    let _ = iouring;
383    foyer::PsyncIoEngineConfig::new().boxed()
384}
385
386/// Read [`FoyerCacheConfig`] from a key/value lookup. Tested in isolation
387/// from process env so callers can drive deterministic unit tests without
388/// mutating global state.
389///
390/// Keys: `WMBT_KV_PUFFER_RAM_BYTES`, `WMBT_KV_PUFFER_DIR`, `WMBT_KV_PUFFER_DISK_BYTES`,
391/// `WMBT_KV_PUFFER_BLOCK_SIZE_BYTES`, `WMBT_KV_PUFFER_BUFFER_POOL_BYTES`,
392/// `WMBT_KV_PUFFER_IOURING`.
393#[must_use]
394pub fn config_from_lookup<F>(lookup: F) -> FoyerCacheConfig
395where
396    F: Fn(&str) -> Option<String>,
397{
398    let mut cfg = FoyerCacheConfig::default();
399    if let Some(value) = lookup("WMBT_KV_PUFFER_RAM_BYTES") {
400        if let Ok(parsed) = value.parse::<u64>() {
401            cfg.ram_bytes = parsed;
402        }
403    }
404    if let Some(value) = lookup("WMBT_KV_PUFFER_DIR") {
405        cfg.ssd_dir = PathBuf::from(value);
406    }
407    if let Some(value) = lookup("WMBT_KV_PUFFER_DISK_BYTES") {
408        if let Ok(parsed) = value.parse::<u64>() {
409            cfg.ssd_bytes = parsed;
410        }
411    }
412    if let Some(value) = lookup("WMBT_KV_PUFFER_BLOCK_SIZE_BYTES") {
413        if let Ok(parsed) = value.parse::<usize>() {
414            cfg.block_size = parsed;
415        }
416    }
417    if let Some(value) = lookup("WMBT_KV_PUFFER_BUFFER_POOL_BYTES") {
418        if let Ok(parsed) = value.parse::<usize>() {
419            cfg.buffer_pool_size = parsed;
420        }
421    }
422    if let Some(value) = lookup("WMBT_KV_PUFFER_IOURING") {
423        cfg.iouring = !(value == "0" || value.eq_ignore_ascii_case("false"));
424    }
425    cfg
426}
427
428/// Read [`FoyerCacheConfig`] from process environment variables.
429#[must_use]
430pub fn config_from_env() -> FoyerCacheConfig {
431    config_from_lookup(|key| std::env::var(key).ok())
432}
433
/// Returns true when the supplied lookup yields `hybrid` for `WMBT_KV_PUFFER_BACKEND`.
///
/// The comparison is ASCII case-insensitive and ignores surrounding
/// whitespace, so a value such as `"hybrid\n"` still selects the backend.
/// A missing key or any other value returns false.
#[must_use]
pub fn backend_selected_with<F>(lookup: F) -> bool
where
    F: Fn(&str) -> Option<String>,
{
    lookup("WMBT_KV_PUFFER_BACKEND")
        .is_some_and(|value| value.trim().eq_ignore_ascii_case("hybrid"))
}
442
443/// Returns true when `WMBT_KV_PUFFER_BACKEND=hybrid` in the process environment.
444#[must_use]
445pub fn backend_selected() -> bool {
446    backend_selected_with(|key| std::env::var(key).ok())
447}
448
#[cfg(test)]
mod tests {
    use super::{FoyerCacheConfig, FoyerCacheError, FoyerHitTier, FoyerHybridCache};
    use bytes::Bytes;
    use tempfile::tempdir;

    /// Small budgets so each test opens quickly; `io_uring` is off so the
    /// tests do not depend on kernel support.
    fn small_config(dir: std::path::PathBuf) -> FoyerCacheConfig {
        FoyerCacheConfig {
            ram_bytes: 4 * 1024 * 1024,
            ssd_dir: dir,
            ssd_bytes: 16 * 1024 * 1024,
            block_size: 1024 * 1024,
            buffer_pool_size: 4 * 1024 * 1024,
            iouring: false,
        }
    }

    /// Opens a cache after `zero_out` has edited an otherwise valid config
    /// and reports whether `open` answered with `InvalidConfig`.
    fn open_rejects(zero_out: fn(&mut FoyerCacheConfig)) -> bool {
        let dir = tempdir().expect("tempdir");
        let mut cfg = small_config(dir.path().to_path_buf());
        zero_out(&mut cfg);
        matches!(FoyerHybridCache::open(cfg), Err(FoyerCacheError::InvalidConfig(_)))
    }

    #[test]
    fn invalid_config_is_rejected() {
        assert!(open_rejects(|cfg| cfg.ram_bytes = 0), "zero ram_bytes");
        assert!(open_rejects(|cfg| cfg.ssd_bytes = 0), "zero ssd_bytes");
        assert!(open_rejects(|cfg| cfg.block_size = 0), "zero block_size");
        assert!(open_rejects(|cfg| cfg.buffer_pool_size = 0), "zero buffer_pool_size");
    }

    #[test]
    fn put_get_round_trip_hits_warm_path_and_returns_none_for_missing_keys() {
        let dir = tempdir().expect("tempdir");
        let cache = FoyerHybridCache::open(small_config(dir.path().to_path_buf()))
            .expect("open foyer cache");

        let payload = Bytes::from_static(b"wombatkv-foyer-round-trip");
        cache.put("ns/key-1", payload.clone());
        assert_eq!(cache.inserts(), 1);

        let fetched = cache.get("ns/key-1").expect("warm hit");
        assert_eq!(fetched, payload);
        assert!(cache.hits() >= 1);

        assert!(cache.get("ns/key-missing").is_none());
        assert_eq!(cache.misses(), 1);
    }

    #[test]
    fn clear_drops_all_entries_so_subsequent_get_misses() {
        let dir = tempdir().expect("tempdir");
        let cache = FoyerHybridCache::open(small_config(dir.path().to_path_buf()))
            .expect("open foyer cache");

        cache.put("ns/key", Bytes::from_static(b"payload"));
        assert!(cache.get("ns/key").is_some());

        cache.clear();
        assert!(cache.get("ns/key").is_none());
    }

    #[test]
    fn hit_tier_labels_are_stable() {
        assert_eq!(FoyerHitTier::Ram.as_str(), "ram");
        assert_eq!(FoyerHitTier::Ssd.as_str(), "ssd");
    }

    #[test]
    fn config_from_lookup_overrides_defaults() {
        let lookup = |key: &str| match key {
            "WMBT_KV_PUFFER_RAM_BYTES" => Some("65536".to_string()),
            "WMBT_KV_PUFFER_DISK_BYTES" => Some("131072".to_string()),
            "WMBT_KV_PUFFER_BLOCK_SIZE_BYTES" => Some("4096".to_string()),
            "WMBT_KV_PUFFER_BUFFER_POOL_BYTES" => Some("8192".to_string()),
            "WMBT_KV_PUFFER_IOURING" => Some("false".to_string()),
            "WMBT_KV_PUFFER_DIR" => Some("/tmp/wombatkv-puffer-env-test".to_string()),
            _ => None,
        };

        let expected = FoyerCacheConfig {
            ram_bytes: 65536,
            ssd_dir: std::path::PathBuf::from("/tmp/wombatkv-puffer-env-test"),
            ssd_bytes: 131072,
            block_size: 4096,
            buffer_pool_size: 8192,
            iouring: false,
        };
        assert_eq!(super::config_from_lookup(lookup), expected);
    }

    #[test]
    fn config_from_lookup_falls_back_to_defaults_when_lookup_returns_none() {
        // Whole-struct comparison so a newly added field cannot be missed.
        assert_eq!(super::config_from_lookup(|_| None), FoyerCacheConfig::default());
    }

    #[test]
    fn config_from_lookup_keeps_defaults_for_unparsable_numbers() {
        let lookup = |key: &str| match key {
            "WMBT_KV_PUFFER_RAM_BYTES" => Some("lots".to_string()),
            "WMBT_KV_PUFFER_DISK_BYTES" => Some("-1".to_string()),
            "WMBT_KV_PUFFER_BLOCK_SIZE_BYTES" => Some("4k".to_string()),
            "WMBT_KV_PUFFER_BUFFER_POOL_BYTES" => Some(String::new()),
            _ => None,
        };
        assert_eq!(super::config_from_lookup(lookup), FoyerCacheConfig::default());
    }

    #[test]
    fn backend_selected_only_when_lookup_says_hybrid() {
        assert!(!super::backend_selected_with(|_| None));
        assert!(!super::backend_selected_with(|_| Some("legacy".to_string())));
        assert!(super::backend_selected_with(|_| Some("hybrid".to_string())));
        assert!(super::backend_selected_with(|_| Some("HYBRID".to_string())));
        assert!(super::backend_selected_with(|_| Some("Hybrid".to_string())));
    }
}