1use std::io;
6use std::path::PathBuf;
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::sync::{Arc, Mutex, MutexGuard, Weak};
9use std::thread::JoinHandle;
10use std::time::Duration;
11
12use kevy_persist::{Aof, Argv, RewriteStats, load_snapshot, replay_aof, save_snapshot};
13use kevy_store::{ExpireStats, StoreError};
14
15use crate::config::{Config, TtlReaperMode};
16use crate::pubsub::PubsubBus;
17
18#[derive(Clone)]
42pub struct Store {
43 inner: Arc<Mutex<Inner>>,
44 guard: Arc<DropGuard>,
47 config: Config,
48}
49
50pub struct WeakStore {
56 inner: Weak<Mutex<Inner>>,
57 guard: Weak<DropGuard>,
58 config: Config,
59}
60
61impl WeakStore {
62 pub fn upgrade(&self) -> Option<Store> {
65 Some(Store {
66 inner: self.inner.upgrade()?,
67 guard: self.guard.upgrade()?,
68 config: self.config.clone(),
69 })
70 }
71}
72
73pub(crate) struct Inner {
74 pub(crate) store: kevy_store::Store,
75 pub(crate) aof: Option<Aof>,
76 pub(crate) bus: PubsubBus,
77}
78
79pub(crate) struct DropGuard {
85 reaper_stop: Option<Arc<AtomicBool>>,
86 reaper_join: Mutex<Option<JoinHandle<()>>>,
87 inner_for_flush: Arc<Mutex<Inner>>,
88}
89
90impl Store {
91 pub fn open(config: Config) -> io::Result<Self> {
100 let mut store = kevy_store::Store::new();
101 store.set_max_memory(config.maxmemory, config.eviction_policy);
102
103 let aof = if let Some(dir) = &config.data_dir {
104 std::fs::create_dir_all(dir)?;
105 let snap_path = dir.join(&config.snapshot_filename);
106 if snap_path.exists() {
107 load_snapshot(&mut store, &snap_path)?;
108 }
109 let aof_path = dir.join(&config.aof_filename);
110 if aof_path.exists() {
111 replay_aof(&aof_path, |args| crate::replay::apply(&mut store, &args))?;
112 }
113 if config.aof {
114 Some(Aof::open(&aof_path, config.appendfsync)?)
115 } else {
116 None
117 }
118 } else {
119 None
120 };
121
122 let inner = Arc::new(Mutex::new(Inner {
123 store,
124 aof,
125 bus: PubsubBus::new(),
126 }));
127
128 let (reaper_stop, reaper_join) = match config.ttl_reaper {
129 TtlReaperMode::Manual => (None, None),
130 TtlReaperMode::Background => {
131 let stop = Arc::new(AtomicBool::new(false));
132 let stop_t = stop.clone();
133 let inner_t = inner.clone();
134 let interval = config.reaper_interval;
135 let samples = config.reaper_samples;
136 let rounds = config.reaper_max_rounds;
137 let handle = std::thread::Builder::new()
138 .name(String::from("kevy-embedded-reaper"))
139 .spawn(move || reaper_loop(inner_t, stop_t, interval, samples, rounds))?;
140 (Some(stop), Some(handle))
141 }
142 };
143
144 let guard = Arc::new(DropGuard {
145 reaper_stop,
146 reaper_join: Mutex::new(reaper_join),
147 inner_for_flush: inner.clone(),
148 });
149
150 Ok(Store {
151 inner,
152 guard,
153 config,
154 })
155 }
156
157 pub fn downgrade(&self) -> WeakStore {
160 WeakStore {
161 inner: Arc::downgrade(&self.inner),
162 guard: Arc::downgrade(&self.guard),
163 config: self.config.clone(),
164 }
165 }
166
167 pub fn config(&self) -> &Config {
170 &self.config
171 }
172
173 pub fn with<F, R>(&self, f: F) -> R
181 where
182 F: FnOnce(&mut kevy_store::Store) -> R,
183 {
184 let mut g = self.lock();
185 f(&mut g.store)
186 }
187
188 pub fn log(&self, parts: &[&[u8]]) -> io::Result<()> {
192 let mut g = self.lock();
193 if let Some(aof) = &mut g.aof {
194 let argv = Argv::from(parts.iter().map(|p| p.to_vec()).collect::<Vec<_>>());
195 aof.append(&argv)?;
196 }
197 Ok(())
198 }
199
200 pub fn tick(&self) -> ExpireStats {
206 let mut g = self.lock();
207 g.store
208 .tick_expire(self.config.reaper_samples, self.config.reaper_max_rounds)
209 }
210
211 pub fn rewrite_aof(&self) -> io::Result<Option<RewriteStats>> {
215 let mut g = self.lock();
216 let Inner { store, aof, bus: _ } = &mut *g;
220 let Some(aof) = aof else { return Ok(None) };
221 Ok(Some(aof.rewrite_from(store)?))
222 }
223
224 pub fn save_snapshot(&self) -> io::Result<bool> {
228 let g = self.lock();
229 let Some(dir) = self.config.data_dir.as_ref() else {
230 return Ok(false);
231 };
232 let path: PathBuf = dir.join(&self.config.snapshot_filename);
233 save_snapshot(&g.store, &path)?;
234 Ok(true)
235 }
236
237 pub(crate) fn inner_handle(&self) -> Arc<Mutex<Inner>> {
244 self.inner.clone()
245 }
246
247 pub(crate) fn guard_handle(&self) -> Arc<DropGuard> {
250 self.guard.clone()
251 }
252
253 pub(crate) fn lock(&self) -> MutexGuard<'_, Inner> {
257 match self.inner.lock() {
258 Ok(g) => g,
259 Err(poison) => poison.into_inner(),
260 }
261 }
262}
263
264fn log_argv(aof: &mut Option<Aof>, parts: &[&[u8]]) -> io::Result<()> {
271 if let Some(aof) = aof {
272 let argv = Argv::from(parts.iter().map(|p| p.to_vec()).collect::<Vec<_>>());
273 aof.append(&argv)?;
274 }
275 Ok(())
276}
277
278pub(crate) fn commit_write(inner: &mut Inner, parts: &[&[u8]]) -> io::Result<()> {
283 log_argv(&mut inner.aof, parts)?;
284 inner.store.try_evict_after_write();
285 Ok(())
286}
287
288pub(crate) fn store_err(e: StoreError) -> io::Error {
289 io::Error::new(io::ErrorKind::InvalidInput, format!("kevy-store: {e:?}"))
290}
291
292fn reaper_loop(
293 inner: Arc<Mutex<Inner>>,
294 stop: Arc<AtomicBool>,
295 interval: Duration,
296 samples: usize,
297 rounds: u32,
298) {
299 while !stop.load(Ordering::Relaxed) {
300 std::thread::sleep(interval);
301 if stop.load(Ordering::Relaxed) {
302 break;
303 }
304 let mut g = match inner.lock() {
305 Ok(g) => g,
306 Err(poison) => poison.into_inner(),
307 };
308 let _ = g.store.tick_expire(samples, rounds);
309 if let Some(aof) = &mut g.aof {
312 let _ = aof.maybe_sync();
313 }
314 }
315}
316
317impl Drop for DropGuard {
318 fn drop(&mut self) {
319 if let Some(stop) = &self.reaper_stop {
324 stop.store(true, Ordering::Relaxed);
325 }
326 if let Some(j) = self
327 .reaper_join
328 .lock()
329 .unwrap_or_else(|p| p.into_inner())
330 .take()
331 {
332 let _ = j.join();
333 }
334 let mut g = match self.inner_for_flush.lock() {
335 Ok(g) => g,
336 Err(poison) => poison.into_inner(),
337 };
338 if let Some(aof) = &mut g.aof {
339 let _ = aof.maybe_sync();
340 }
341 }
342}
343
344#[cfg(test)]
345mod tests {
346 use super::*;
347 use crate::config::{AppendFsync, EvictionPolicy};
348
349 fn tmp_dir(name: &str) -> PathBuf {
350 let mut p = std::env::temp_dir();
351 let uniq = std::time::SystemTime::now()
352 .duration_since(std::time::UNIX_EPOCH)
353 .unwrap()
354 .as_nanos();
355 p.push(format!("kevy-embedded-{name}-{uniq}"));
356 p
357 }
358
359 #[test]
360 fn in_memory_roundtrip() {
361 let s = Store::open(Config::default().with_ttl_reaper_manual()).unwrap();
362 s.set(b"k", b"v").unwrap();
363 assert_eq!(s.get(b"k").unwrap(), Some(b"v".to_vec()));
364 assert_eq!(s.dbsize(), 1);
365 s.del(&[b"k"]).unwrap();
366 assert_eq!(s.dbsize(), 0);
367 }
368
369 #[test]
370 fn persistence_round_trip_via_aof() {
371 let dir = tmp_dir("aof-rt");
372 {
373 let s = Store::open(
374 Config::default()
375 .with_persist(&dir)
376 .with_ttl_reaper_manual()
377 .with_appendfsync(AppendFsync::Always),
378 )
379 .unwrap();
380 for i in 0..50 {
381 s.set(format!("k{i}").as_bytes(), b"v").unwrap();
382 }
383 s.incr_by(b"counter", 41).unwrap();
384 s.hset(b"h", &[(b"field" as &[u8], b"val" as &[u8])]).unwrap();
385 }
386 let s2 = Store::open(
388 Config::default()
389 .with_persist(&dir)
390 .with_ttl_reaper_manual(),
391 )
392 .unwrap();
393 assert_eq!(s2.dbsize(), 52); assert_eq!(s2.get(b"k0").unwrap(), Some(b"v".to_vec()));
395 assert_eq!(s2.get(b"k49").unwrap(), Some(b"v".to_vec()));
396 assert_eq!(s2.get(b"counter").unwrap(), Some(b"41".to_vec()));
397 assert_eq!(s2.hget(b"h", b"field").unwrap(), Some(b"val".to_vec()));
398 drop(s2);
399 let _ = std::fs::remove_dir_all(&dir);
400 }
401
402 #[test]
403 fn eviction_works_under_pressure() {
404 let s = Store::open(
405 Config::default()
406 .with_ttl_reaper_manual()
407 .with_max_memory(800)
408 .with_eviction(EvictionPolicy::AllKeysLru),
409 )
410 .unwrap();
411 for i in 0..50 {
412 s.set(format!("k{i:02}").as_bytes(), b"xxxxxxxxxxxxxxxxxxxx")
413 .unwrap();
414 }
415 assert!(s.used_memory() <= 800, "got {}", s.used_memory());
416 assert!(s.evictions_total() > 0);
417 }
418
419 #[test]
420 fn manual_tick_runs_active_reaper() {
421 let s = Store::open(Config::default().with_ttl_reaper_manual()).unwrap();
422 s.set_with_ttl(b"short", b"v", Duration::from_millis(1)).unwrap();
423 s.set(b"perm", b"v").unwrap();
424 std::thread::sleep(Duration::from_millis(20));
425 let stats = s.tick();
426 let _ = stats;
429 let _ = s.get(b"short").unwrap(); assert!(s.expired_keys_total() >= 1);
431 assert!(s.get(b"perm").unwrap().is_some());
432 }
433
434 #[test]
435 fn with_escape_hatch_works() {
436 let s = Store::open(Config::default().with_ttl_reaper_manual()).unwrap();
437 let zsize = s.with(|store| {
438 let _ = store.zadd(b"z", &[(1.0, b"a".to_vec()), (2.0, b"b".to_vec())]);
439 store.zcard(b"z").unwrap()
440 });
441 assert_eq!(zsize, 2);
442 assert_eq!(s.type_of(b"z"), "zset");
445 }
446
447 #[test]
448 fn background_reaper_thread_drops_expired_keys() {
449 let s = Store::open(
450 Config::default().with_reaper_interval(Duration::from_millis(20)),
451 )
452 .unwrap();
453 s.set_with_ttl(b"k", b"v", Duration::from_millis(5)).unwrap();
454 std::thread::sleep(Duration::from_millis(120));
455 let _ = s.get(b"k").unwrap(); assert_eq!(s.dbsize(), 0);
458 }
459
460 #[test]
461 fn arc_sharing_across_threads() {
462 use std::sync::Arc;
463 let s = Arc::new(Store::open(Config::default().with_ttl_reaper_manual()).unwrap());
464 let mut handles = Vec::new();
465 for i in 0..8 {
466 let s = Arc::clone(&s);
467 handles.push(std::thread::spawn(move || {
468 for j in 0..50 {
469 s.set(format!("t{i}-{j}").as_bytes(), b"v").unwrap();
470 }
471 }));
472 }
473 for h in handles {
474 h.join().unwrap();
475 }
476 assert_eq!(s.dbsize(), 8 * 50);
477 }
478
479 #[test]
480 fn drop_during_reaper_does_not_deadlock() {
481 for _ in 0..4 {
485 let s = Store::open(
486 Config::default().with_reaper_interval(Duration::from_millis(5)),
487 )
488 .unwrap();
489 s.set(b"k", b"v").unwrap();
490 std::thread::sleep(Duration::from_millis(40));
492 drop(s); }
494 }
495
496 #[test]
497 fn save_snapshot_then_restart() {
498 let dir = tmp_dir("snap-rt");
499 {
500 let s = Store::open(
501 Config::default()
502 .with_persist(&dir)
503 .without_aof()
504 .with_ttl_reaper_manual(),
505 )
506 .unwrap();
507 for i in 0..10 {
508 s.set(format!("k{i}").as_bytes(), b"v").unwrap();
509 }
510 let saved = s.save_snapshot().unwrap();
511 assert!(saved);
512 }
513 let s2 = Store::open(
514 Config::default()
515 .with_persist(&dir)
516 .without_aof()
517 .with_ttl_reaper_manual(),
518 )
519 .unwrap();
520 assert_eq!(s2.dbsize(), 10);
521 let _ = std::fs::remove_dir_all(&dir);
522 }
523}