1#![forbid(unsafe_code)]
2use std::path::{Path, PathBuf};
19use std::sync::atomic::{AtomicU64, Ordering};
20use std::sync::Arc;
21
22use bytes::Bytes;
23use foyer::{
24 BlockEngineConfig, DeviceBuilder, FsDeviceBuilder, HybridCache, HybridCacheBuilder,
25 HybridCachePolicy, IoEngineConfig,
26};
27use tokio::runtime::{Builder as RuntimeBuilder, Runtime};
28
/// Identifies which lookup stage of [`FoyerHybridCache::get_with_tier`] produced a hit.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum FoyerHitTier {
    /// The entry was returned by the direct in-memory probe.
    Ram,
    /// The in-memory probe missed and the entry was returned by the hybrid lookup instead.
    Ssd,
}

impl FoyerHitTier {
    /// Returns the fixed lowercase label for this tier: `"ram"` or `"ssd"`.
    #[must_use]
    pub fn as_str(self) -> &'static str {
        // Exhaustive on purpose: adding a tier must force a decision about its label.
        match self {
            FoyerHitTier::Ram => "ram",
            FoyerHitTier::Ssd => "ssd",
        }
    }
}
49
/// Sizing and placement knobs consumed when a [`FoyerHybridCache`] is built.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct FoyerCacheConfig {
    /// Capacity, in bytes, passed to the in-memory tier. Must be non-zero for `open`.
    pub ram_bytes: u64,
    /// Directory handed to the filesystem device builder for the SSD tier.
    pub ssd_dir: PathBuf,
    /// Capacity, in bytes, passed to the filesystem device. Must be non-zero for `open`.
    pub ssd_bytes: u64,
    /// Block size passed to the block engine. Must be non-zero for `open`.
    pub block_size: usize,
    /// Buffer pool size passed to the block engine. Must be non-zero for `open`.
    pub buffer_pool_size: usize,
    /// Requests the io_uring I/O engine; only honoured on Linux builds, where the
    /// psync engine is used when this is `false`.
    pub iouring: bool,
}

impl Default for FoyerCacheConfig {
    /// 1 GiB of RAM, 8 GiB of SSD under `/tmp/wombatkv-puffer`, 64 MiB blocks, a 256 MiB
    /// buffer pool, and io_uring enabled exactly when compiled for Linux.
    fn default() -> Self {
        const MIB: usize = 1024 * 1024;
        const GIB: u64 = 1024 * 1024 * 1024;

        let ssd_dir = PathBuf::from("/tmp/wombatkv-puffer");
        Self {
            ram_bytes: GIB,
            ssd_dir,
            ssd_bytes: 8 * GIB,
            block_size: 64 * MIB,
            buffer_pool_size: 256 * MIB,
            iouring: cfg!(target_os = "linux"),
        }
    }
}
94
/// Failures surfaced while opening a [`FoyerHybridCache`]. Each variant carries the
/// already-rendered description of the underlying problem.
#[derive(Debug)]
pub enum FoyerCacheError {
    /// A filesystem operation (creating the SSD directory or its parent) failed.
    Io(String),
    /// A configuration field failed validation.
    InvalidConfig(String),
    /// Building the underlying cache failed.
    Build(String),
    /// The async runtime could not be constructed.
    RuntimeInit(String),
}

impl std::fmt::Display for FoyerCacheError {
    /// Renders `WombatKV puffer <category>: <message>` for the variant at hand.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            FoyerCacheError::Io(message) => {
                write!(f, "WombatKV puffer io error: {message}")
            }
            FoyerCacheError::InvalidConfig(message) => {
                write!(f, "WombatKV puffer invalid config: {message}")
            }
            FoyerCacheError::Build(message) => {
                write!(f, "WombatKV puffer build failed: {message}")
            }
            FoyerCacheError::RuntimeInit(message) => {
                write!(f, "WombatKV puffer runtime init failed: {message}")
            }
        }
    }
}

// No `source()` override: the variants only hold strings, so there is no inner error to expose.
impl std::error::Error for FoyerCacheError {}
118
119pub struct FoyerHybridCache {
125 inner: HybridCache<String, Bytes>,
126 runtime: Arc<Runtime>,
127 ssd_dir: PathBuf,
128 hits: AtomicU64,
129 misses: AtomicU64,
130 inserts: AtomicU64,
131}
132
133impl FoyerHybridCache {
134 pub fn open(config: FoyerCacheConfig) -> Result<Arc<Self>, FoyerCacheError> {
137 if config.ram_bytes == 0 {
138 return Err(FoyerCacheError::InvalidConfig("ram_bytes must be > 0".to_string()));
139 }
140 if config.ssd_bytes == 0 {
141 return Err(FoyerCacheError::InvalidConfig("ssd_bytes must be > 0".to_string()));
142 }
143 if config.block_size == 0 {
144 return Err(FoyerCacheError::InvalidConfig("block_size must be > 0".to_string()));
145 }
146 if config.buffer_pool_size == 0 {
147 return Err(FoyerCacheError::InvalidConfig("buffer_pool_size must be > 0".to_string()));
148 }
149 if let Some(parent) = config.ssd_dir.parent() {
150 std::fs::create_dir_all(parent).map_err(|err| {
151 FoyerCacheError::Io(format!("create ssd parent {}: {err}", parent.display()))
152 })?;
153 }
154 std::fs::create_dir_all(&config.ssd_dir).map_err(|err| {
155 FoyerCacheError::Io(format!("create ssd dir {}: {err}", config.ssd_dir.display()))
156 })?;
157
158 let runtime = Arc::new(
159 RuntimeBuilder::new_multi_thread()
160 .worker_threads(2)
161 .thread_name("wombatkv-puffer")
162 .enable_all()
163 .build()
164 .map_err(|err| FoyerCacheError::RuntimeInit(err.to_string()))?,
165 );
166 Self::open_with_runtime(config, runtime)
167 }
168
169 pub fn open_with_runtime(
172 config: FoyerCacheConfig,
173 runtime: Arc<Runtime>,
174 ) -> Result<Arc<Self>, FoyerCacheError> {
175 let ssd_dir = config.ssd_dir.clone();
176 let runtime_for_build = runtime.clone();
177 let inner = runtime_for_build
178 .block_on(async move { build_cache(config).await })
179 .map_err(|err| FoyerCacheError::Build(err.to_string()))?;
180
181 Ok(Arc::new(Self {
182 inner,
183 runtime,
184 ssd_dir,
185 hits: AtomicU64::new(0),
186 misses: AtomicU64::new(0),
187 inserts: AtomicU64::new(0),
188 }))
189 }
190
191 #[must_use]
192 pub fn ssd_dir(&self) -> &Path {
193 &self.ssd_dir
194 }
195
196 #[must_use]
197 pub fn hits(&self) -> u64 {
198 self.hits.load(Ordering::Relaxed)
199 }
200
201 #[must_use]
202 pub fn misses(&self) -> u64 {
203 self.misses.load(Ordering::Relaxed)
204 }
205
206 #[must_use]
207 pub fn inserts(&self) -> u64 {
208 self.inserts.load(Ordering::Relaxed)
209 }
210
211 pub fn get(&self, key: &str) -> Option<Bytes> {
218 self.get_with_tier(key).map(|(bytes, _)| bytes)
219 }
220
221 pub fn get_with_tier(&self, key: &str) -> Option<(Bytes, FoyerHitTier)> {
227 let t0 = std::time::Instant::now();
228 let owned = key.to_string();
229 let t_owned = t0.elapsed().as_micros() as u64;
230 if let Some(entry) = self.inner.memory().get(&owned) {
231 let t_ram = t0.elapsed().as_micros() as u64;
232 self.hits.fetch_add(1, Ordering::Relaxed);
233 let v = entry.value().clone();
234 let t_clone = t0.elapsed().as_micros() as u64;
235 crate::embed::emit_timing(
236 "foyer_get_with_tier",
237 "ram_hit",
238 &[
239 ("owned_us", t_owned),
240 ("ram_probe_us", t_ram - t_owned),
241 ("clone_us", t_clone - t_ram),
242 ("total_us", t_clone),
243 ],
244 );
245 return Some((v, FoyerHitTier::Ram));
246 }
247 let t_ram_miss = t0.elapsed().as_micros() as u64;
248
249 let result = self.runtime.block_on(self.inner.get(&owned));
250 let t_ssd = t0.elapsed().as_micros() as u64;
251 if let Ok(Some(entry)) = result {
252 self.hits.fetch_add(1, Ordering::Relaxed);
253 let v = entry.value().clone();
254 let t_clone = t0.elapsed().as_micros() as u64;
255 crate::embed::emit_timing(
256 "foyer_get_with_tier",
257 "ssd_hit",
258 &[
259 ("owned_us", t_owned),
260 ("ram_miss_us", t_ram_miss - t_owned),
261 ("ssd_block_on_us", t_ssd - t_ram_miss),
262 ("clone_us", t_clone - t_ssd),
263 ("total_us", t_clone),
264 ],
265 );
266 Some((v, FoyerHitTier::Ssd))
269 } else {
270 self.misses.fetch_add(1, Ordering::Relaxed);
271 let t_total = t0.elapsed().as_micros() as u64;
272 crate::embed::emit_timing(
273 "foyer_get_with_tier",
274 "miss",
275 &[
276 ("ram_miss_us", t_ram_miss - t_owned),
277 ("ssd_miss_us", t_ssd - t_ram_miss),
278 ("total_us", t_total),
279 ],
280 );
281 None
282 }
283 }
284
285 pub fn contains(&self, key: &str) -> bool {
287 let owned = key.to_string();
288 self.inner.contains(&owned)
289 }
290
291 pub fn put(&self, key: &str, payload: Bytes) {
294 self.inner.insert(key.to_string(), payload);
295 self.inserts.fetch_add(1, Ordering::Relaxed);
296 }
297
298 pub fn clear(&self) {
300 self.runtime.block_on(async {
301 let _ = self.inner.clear().await;
302 });
303 }
304
305 pub fn close(&self) {
314 self.runtime.block_on(async {
315 let _ = self.inner.close().await;
316 });
317 }
318}
319
320impl Drop for FoyerHybridCache {
321 fn drop(&mut self) {
322 self.runtime.block_on(async {
328 let _ = self.inner.close().await;
329 });
330 }
331}
332
333type BuildError = Box<dyn std::error::Error + Send + Sync>;
334
335async fn build_cache(config: FoyerCacheConfig) -> Result<HybridCache<String, Bytes>, BuildError> {
336 let mut fs_builder = FsDeviceBuilder::new(&config.ssd_dir);
337 #[cfg(target_os = "linux")]
338 {
339 fs_builder = fs_builder.with_direct(true);
340 }
341 fs_builder = fs_builder.with_capacity(config.ssd_bytes as usize);
342 let device = fs_builder.build().map_err(|err| Box::new(err) as BuildError)?;
343
344 let engine = BlockEngineConfig::new(device)
349 .with_block_size(config.block_size)
350 .with_buffer_pool_size(config.buffer_pool_size);
351 let io_engine = io_engine_config(config.iouring);
352
353 let cache = HybridCacheBuilder::new()
363 .with_policy(HybridCachePolicy::WriteOnInsertion)
364 .memory(config.ram_bytes as usize)
365 .with_weighter(|key: &String, value: &Bytes| key.len() + value.len())
366 .storage()
367 .with_engine_config(engine)
368 .with_io_engine_config(io_engine)
369 .build()
370 .await
371 .map_err(|err| Box::new(err) as BuildError)?;
372
373 Ok(cache)
374}
375
376fn io_engine_config(iouring: bool) -> Box<dyn IoEngineConfig> {
377 #[cfg(target_os = "linux")]
378 if iouring {
379 return foyer::UringIoEngineConfig::new().boxed();
380 }
381 #[cfg(not(target_os = "linux"))]
382 let _ = iouring;
383 foyer::PsyncIoEngineConfig::new().boxed()
384}
385
386#[must_use]
394pub fn config_from_lookup<F>(lookup: F) -> FoyerCacheConfig
395where
396 F: Fn(&str) -> Option<String>,
397{
398 let mut cfg = FoyerCacheConfig::default();
399 if let Some(value) = lookup("WMBT_KV_PUFFER_RAM_BYTES") {
400 if let Ok(parsed) = value.parse::<u64>() {
401 cfg.ram_bytes = parsed;
402 }
403 }
404 if let Some(value) = lookup("WMBT_KV_PUFFER_DIR") {
405 cfg.ssd_dir = PathBuf::from(value);
406 }
407 if let Some(value) = lookup("WMBT_KV_PUFFER_DISK_BYTES") {
408 if let Ok(parsed) = value.parse::<u64>() {
409 cfg.ssd_bytes = parsed;
410 }
411 }
412 if let Some(value) = lookup("WMBT_KV_PUFFER_BLOCK_SIZE_BYTES") {
413 if let Ok(parsed) = value.parse::<usize>() {
414 cfg.block_size = parsed;
415 }
416 }
417 if let Some(value) = lookup("WMBT_KV_PUFFER_BUFFER_POOL_BYTES") {
418 if let Ok(parsed) = value.parse::<usize>() {
419 cfg.buffer_pool_size = parsed;
420 }
421 }
422 if let Some(value) = lookup("WMBT_KV_PUFFER_IOURING") {
423 cfg.iouring = !(value == "0" || value.eq_ignore_ascii_case("false"));
424 }
425 cfg
426}
427
428#[must_use]
430pub fn config_from_env() -> FoyerCacheConfig {
431 config_from_lookup(|key| std::env::var(key).ok())
432}
433
/// Reports whether `lookup` yields `hybrid` (compared ASCII case-insensitively) for the
/// key `WMBT_KV_PUFFER_BACKEND`. An absent key, or any other value, gives `false`.
#[must_use]
pub fn backend_selected_with<F>(lookup: F) -> bool
where
    F: Fn(&str) -> Option<String>,
{
    match lookup("WMBT_KV_PUFFER_BACKEND") {
        Some(backend) => backend.eq_ignore_ascii_case("hybrid"),
        None => false,
    }
}
442
443#[must_use]
445pub fn backend_selected() -> bool {
446 backend_selected_with(|key| std::env::var(key).ok())
447}
448
#[cfg(test)]
mod tests {
    use super::{FoyerCacheConfig, FoyerCacheError, FoyerHybridCache};
    use bytes::Bytes;
    use tempfile::tempdir;

    /// A deliberately tiny configuration rooted at `dir`, using the psync engine so the
    /// tests do not depend on io_uring support.
    fn small_config(dir: std::path::PathBuf) -> FoyerCacheConfig {
        FoyerCacheConfig {
            ram_bytes: 4 * 1024 * 1024,
            ssd_dir: dir,
            ssd_bytes: 16 * 1024 * 1024,
            block_size: 1024 * 1024,
            buffer_pool_size: 4 * 1024 * 1024,
            iouring: false,
        }
    }

    #[test]
    fn invalid_config_is_rejected() {
        let dir = tempdir().expect("tempdir");

        // One mutation per size field that `open` validates, including `buffer_pool_size`.
        let zero_out: [(&str, fn(&mut FoyerCacheConfig)); 4] = [
            ("ram_bytes", |cfg: &mut FoyerCacheConfig| cfg.ram_bytes = 0),
            ("ssd_bytes", |cfg: &mut FoyerCacheConfig| cfg.ssd_bytes = 0),
            ("block_size", |cfg: &mut FoyerCacheConfig| cfg.block_size = 0),
            ("buffer_pool_size", |cfg: &mut FoyerCacheConfig| cfg.buffer_pool_size = 0),
        ];

        for (field, zero) in zero_out {
            let mut cfg = small_config(dir.path().to_path_buf());
            zero(&mut cfg);
            assert!(
                matches!(FoyerHybridCache::open(cfg), Err(FoyerCacheError::InvalidConfig(_))),
                "a zero {field} must be rejected as InvalidConfig"
            );
        }
    }

    #[test]
    fn put_get_round_trip_hits_warm_path_and_returns_none_for_missing_keys() {
        let dir = tempdir().expect("tempdir");
        let cache = FoyerHybridCache::open(small_config(dir.path().to_path_buf()))
            .expect("open foyer cache");

        let payload = Bytes::from_static(b"wombatkv-foyer-round-trip");
        cache.put("ns/key-1", payload.clone());
        assert_eq!(cache.inserts(), 1);

        let fetched = cache.get("ns/key-1").expect("warm hit");
        assert_eq!(fetched, payload);
        assert!(cache.hits() >= 1);

        assert!(cache.get("ns/key-missing").is_none());
        assert_eq!(cache.misses(), 1);
    }

    #[test]
    fn clear_drops_all_entries_so_subsequent_get_misses() {
        let dir = tempdir().expect("tempdir");
        let cache = FoyerHybridCache::open(small_config(dir.path().to_path_buf()))
            .expect("open foyer cache");

        cache.put("ns/key", Bytes::from_static(b"payload"));
        assert!(cache.get("ns/key").is_some());

        cache.clear();
        assert!(cache.get("ns/key").is_none());
    }

    #[test]
    fn config_from_lookup_overrides_defaults() {
        let lookup = |key: &str| match key {
            "WMBT_KV_PUFFER_RAM_BYTES" => Some("65536".to_string()),
            "WMBT_KV_PUFFER_DISK_BYTES" => Some("131072".to_string()),
            "WMBT_KV_PUFFER_BLOCK_SIZE_BYTES" => Some("4096".to_string()),
            "WMBT_KV_PUFFER_BUFFER_POOL_BYTES" => Some("8192".to_string()),
            "WMBT_KV_PUFFER_IOURING" => Some("false".to_string()),
            "WMBT_KV_PUFFER_DIR" => Some("/tmp/wombatkv-puffer-env-test".to_string()),
            _ => None,
        };

        let cfg = super::config_from_lookup(lookup);
        assert_eq!(cfg.ram_bytes, 65536);
        assert_eq!(cfg.ssd_bytes, 131072);
        assert_eq!(cfg.block_size, 4096);
        assert_eq!(cfg.buffer_pool_size, 8192);
        assert!(!cfg.iouring);
        assert_eq!(cfg.ssd_dir.to_string_lossy(), "/tmp/wombatkv-puffer-env-test");
    }

    #[test]
    fn config_from_lookup_falls_back_to_defaults_when_lookup_returns_none() {
        // Whole-struct comparison so no field (e.g. `buffer_pool_size`) is left unchecked.
        let cfg = super::config_from_lookup(|_| None);
        assert_eq!(cfg, FoyerCacheConfig::default());
    }

    #[test]
    fn config_from_lookup_keeps_defaults_for_unparsable_numbers() {
        let cfg = super::config_from_lookup(|key| match key {
            "WMBT_KV_PUFFER_RAM_BYTES" => Some("not-a-number".to_string()),
            "WMBT_KV_PUFFER_BLOCK_SIZE_BYTES" => Some("-1".to_string()),
            _ => None,
        });
        assert_eq!(cfg, FoyerCacheConfig::default());
    }

    #[test]
    fn backend_selected_only_when_lookup_says_hybrid() {
        assert!(!super::backend_selected_with(|_| None));
        assert!(!super::backend_selected_with(|_| Some("legacy".to_string())));
        assert!(super::backend_selected_with(|_| Some("hybrid".to_string())));
        assert!(super::backend_selected_with(|_| Some("HYBRID".to_string())));
        assert!(super::backend_selected_with(|_| Some("Hybrid".to_string())));
    }
}