Skip to main content

kevy_embedded/
store.rs

1//! [`Store`] — the embedded entry point. Wraps `kevy_store::Store` with
2//! a mutex (for cross-thread access), optional AOF auto-logging, an
3//! optional background TTL reaper, and an in-process pub/sub bus.
4
5use std::io;
6use std::path::PathBuf;
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::sync::{Arc, Mutex, MutexGuard, Weak};
9use std::thread::JoinHandle;
10use std::time::Duration;
11
12use kevy_persist::{Aof, Argv, RewriteStats, load_snapshot, replay_aof, save_snapshot};
13use kevy_store::{ExpireStats, StoreError};
14
15use crate::config::{Config, TtlReaperMode};
16use crate::pubsub::PubsubBus;
17
18/// The embedded keyspace.
19///
20/// **`Store` is `Clone`** (since v1.1.0). A clone is a cheap `Arc` bump:
21/// every clone reaches the same underlying `kevy_store::Store` + AOF +
22/// reaper + pub/sub bus. The reaper thread is joined and the AOF is
23/// flushed exactly once, when the **last** clone is dropped.
24///
25/// ```
26/// use kevy_embedded::{Config, Store};
27///
28/// # fn main() -> std::io::Result<()> {
29/// let s = Store::open(Config::default().with_ttl_reaper_manual())?;
30/// let s2 = s.clone();
31/// std::thread::spawn(move || {
32///     s2.set(b"from-thread", b"v").unwrap();
33/// }).join().unwrap();
34/// assert_eq!(s.get(b"from-thread")?, Some(b"v".to_vec()));
35/// # Ok(())
36/// # }
37/// ```
38///
39/// Every method takes `&self`. The internal `Arc<Mutex<Inner>>` is what
40/// makes shared access safe under contention.
41#[derive(Clone)]
42pub struct Store {
43    inner: Arc<Mutex<Inner>>,
44    /// Shared drop guard: signals + joins reaper and flushes AOF when the
45    /// LAST `Store` clone (or `Subscription`) holding a strong ref drops.
46    guard: Arc<DropGuard>,
47    config: Config,
48}
49
50/// Weak handle to a `Store` — does not keep the underlying keyspace alive.
51///
52/// Used by the URL-keyed registry in `kevy-client` so that multiple
53/// `Connection::open("mem://name")` calls share the same backing store
54/// without leaking it when all strong handles go away.
55pub struct WeakStore {
56    inner: Weak<Mutex<Inner>>,
57    guard: Weak<DropGuard>,
58    config: Config,
59}
60
61impl WeakStore {
62    /// Try to upgrade back to a `Store`. Returns `None` if the last strong
63    /// reference has already been dropped.
64    pub fn upgrade(&self) -> Option<Store> {
65        Some(Store {
66            inner: self.inner.upgrade()?,
67            guard: self.guard.upgrade()?,
68            config: self.config.clone(),
69        })
70    }
71}
72
73pub(crate) struct Inner {
74    pub(crate) store: kevy_store::Store,
75    pub(crate) aof: Option<Aof>,
76    pub(crate) bus: PubsubBus,
77}
78
79/// Owns the reaper-thread handle + a back-reference to `Inner` for the
80/// final AOF flush. Lives in an `Arc<DropGuard>` shared across every
81/// `Store` clone; the actual drop logic fires only when the last clone
82/// goes away. `JoinHandle` is wrapped in `Mutex<Option>` so `Drop` can
83/// `.take()` it while only having `&self`.
84pub(crate) struct DropGuard {
85    reaper_stop: Option<Arc<AtomicBool>>,
86    reaper_join: Mutex<Option<JoinHandle<()>>>,
87    inner_for_flush: Arc<Mutex<Inner>>,
88}
89
90impl Store {
91    /// Open an embedded keyspace per `config`.
92    ///
93    /// - Pure in-memory when `config.data_dir` is `None`.
94    /// - With persistence: loads `<data_dir>/<snapshot_filename>` first,
95    ///   then replays `<data_dir>/<aof_filename>`. Both are best-effort —
96    ///   missing files are fine, a truncated AOF tail is silently dropped.
97    /// - Spawns a background TTL reaper thread when
98    ///   `config.ttl_reaper == Background` (the default).
99    pub fn open(config: Config) -> io::Result<Self> {
100        let mut store = kevy_store::Store::new();
101        store.set_max_memory(config.maxmemory, config.eviction_policy);
102
103        let aof = if let Some(dir) = &config.data_dir {
104            std::fs::create_dir_all(dir)?;
105            let snap_path = dir.join(&config.snapshot_filename);
106            if snap_path.exists() {
107                load_snapshot(&mut store, &snap_path)?;
108            }
109            let aof_path = dir.join(&config.aof_filename);
110            if aof_path.exists() {
111                replay_aof(&aof_path, |args| crate::replay::apply(&mut store, &args))?;
112            }
113            if config.aof {
114                Some(Aof::open(&aof_path, config.appendfsync)?)
115            } else {
116                None
117            }
118        } else {
119            None
120        };
121
122        let inner = Arc::new(Mutex::new(Inner {
123            store,
124            aof,
125            bus: PubsubBus::new(),
126        }));
127
128        let (reaper_stop, reaper_join) = match config.ttl_reaper {
129            TtlReaperMode::Manual => (None, None),
130            TtlReaperMode::Background => {
131                let stop = Arc::new(AtomicBool::new(false));
132                let stop_t = stop.clone();
133                let inner_t = inner.clone();
134                let interval = config.reaper_interval;
135                let samples = config.reaper_samples;
136                let rounds = config.reaper_max_rounds;
137                let handle = std::thread::Builder::new()
138                    .name(String::from("kevy-embedded-reaper"))
139                    .spawn(move || reaper_loop(inner_t, stop_t, interval, samples, rounds))?;
140                (Some(stop), Some(handle))
141            }
142        };
143
144        let guard = Arc::new(DropGuard {
145            reaper_stop,
146            reaper_join: Mutex::new(reaper_join),
147            inner_for_flush: inner.clone(),
148        });
149
150        Ok(Store {
151            inner,
152            guard,
153            config,
154        })
155    }
156
157    /// Get a weak handle that does not keep the keyspace alive.
158    /// `upgrade()` returns `None` once the last strong `Store` is dropped.
159    pub fn downgrade(&self) -> WeakStore {
160        WeakStore {
161            inner: Arc::downgrade(&self.inner),
162            guard: Arc::downgrade(&self.guard),
163            config: self.config.clone(),
164        }
165    }
166
167    /// The active config (a clone — modifying it has no effect on the
168    /// running store). Useful for introspection / `INFO`-style telemetry.
169    pub fn config(&self) -> &Config {
170        &self.config
171    }
172
173    // ---- escape hatches -------------------------------------------------
174
175    /// Run `f` against the underlying `kevy_store::Store` under the
176    /// embedded mutex. Use for direct access to methods this crate hasn't
177    /// wrapped (snapshot iteration, ZRANGE, raw collect_keys, …). The
178    /// closure can mutate, but *does not auto-log to the AOF* — call
179    /// [`Self::log`] yourself if the mutation must survive a crash.
180    pub fn with<F, R>(&self, f: F) -> R
181    where
182        F: FnOnce(&mut kevy_store::Store) -> R,
183    {
184        let mut g = self.lock();
185        f(&mut g.store)
186    }
187
188    /// Append a raw RESP-frame argument list to the AOF. Pairs with
189    /// [`Self::with`] when the closure performed a write you want to make
190    /// crash-safe. No-op when persistence is disabled.
191    pub fn log(&self, parts: &[&[u8]]) -> io::Result<()> {
192        let mut g = self.lock();
193        if let Some(aof) = &mut g.aof {
194            let argv = Argv::from(parts.iter().map(|p| p.to_vec()).collect::<Vec<_>>());
195            aof.append(&argv)?;
196        }
197        Ok(())
198    }
199
200    // ---- maintenance ----------------------------------------------------
201
202    /// Run one TTL-reaper tick. Required call cadence in `Manual` mode
203    /// (call ~10× per second to match Redis's `hz=10`); no-op cost is
204    /// one mutex lock + map-emptiness check when nothing has TTL.
205    pub fn tick(&self) -> ExpireStats {
206        let mut g = self.lock();
207        g.store
208            .tick_expire(self.config.reaper_samples, self.config.reaper_max_rounds)
209    }
210
211    /// `BGREWRITEAOF`: rebuild the AOF from current state. Synchronous —
212    /// blocks until the rewrite + atomic rename completes. Returns
213    /// `Ok(None)` when persistence is disabled.
214    pub fn rewrite_aof(&self) -> io::Result<Option<RewriteStats>> {
215        let mut g = self.lock();
216        // Disjoint-field split-borrow: destructure the guard so the borrow
217        // checker sees `store` and `aof` as independent borrows, not two
218        // claims on the same `&mut Inner`.
219        let Inner { store, aof, bus: _ } = &mut *g;
220        let Some(aof) = aof else { return Ok(None) };
221        Ok(Some(aof.rewrite_from(store)?))
222    }
223
224    /// Snapshot the store to `<data_dir>/<snapshot_filename>`, atomically.
225    /// `Ok(false)` when persistence is disabled (caller can decide to
226    /// surface that or no-op).
227    pub fn save_snapshot(&self) -> io::Result<bool> {
228        let g = self.lock();
229        let Some(dir) = self.config.data_dir.as_ref() else {
230            return Ok(false);
231        };
232        let path: PathBuf = dir.join(&self.config.snapshot_filename);
233        save_snapshot(&g.store, &path)?;
234        Ok(true)
235    }
236
237    // String / hash / list / set / zset / pub-sub data-type methods live
238    // in `crate::ops` (kept under the 500-LOC file cap). Look there for
239    // e.g. `Store::set` / `Store::hset` / `Store::publish`.
240
241    /// Crate-internal: clone the shared `Arc<Mutex<Inner>>` handle, used
242    /// by `ops.rs::Store::subscribe` to hand the bus to a `Subscription`.
243    pub(crate) fn inner_handle(&self) -> Arc<Mutex<Inner>> {
244        self.inner.clone()
245    }
246
247    /// Crate-internal: clone the shared `Arc<DropGuard>` so a live
248    /// `Subscription` keeps the reaper + AOF flush alive until it drops.
249    pub(crate) fn guard_handle(&self) -> Arc<DropGuard> {
250        self.guard.clone()
251    }
252
253    /// Crate-internal: acquire the embedded mutex. Recovers from poison
254    /// because every method's critical section is short — a panic in one
255    /// doesn't corrupt the keyspace.
256    pub(crate) fn lock(&self) -> MutexGuard<'_, Inner> {
257        match self.inner.lock() {
258            Ok(g) => g,
259            Err(poison) => poison.into_inner(),
260        }
261    }
262}
263
264// ─────────────────────────────────────────────────────────────────────────
265// DELETION MARKER (replaced below): everything from the original `set`
266// method through the closing brace of `impl Store` previously lived here.
267// They've been moved to `crate::ops` — see crates/kevy-embedded/src/ops.rs.
268// ─────────────────────────────────────────────────────────────────────────
269
270fn log_argv(aof: &mut Option<Aof>, parts: &[&[u8]]) -> io::Result<()> {
271    if let Some(aof) = aof {
272        let argv = Argv::from(parts.iter().map(|p| p.to_vec()).collect::<Vec<_>>());
273        aof.append(&argv)?;
274    }
275    Ok(())
276}
277
278/// Complete a write: AOF-log the canonical RESP command, then run the
279/// store's post-write eviction sweep. Single helper so every write wrapper
280/// stays in lockstep — forgetting to evict means a maxmemory budget would
281/// grow without bound.
282pub(crate) fn commit_write(inner: &mut Inner, parts: &[&[u8]]) -> io::Result<()> {
283    log_argv(&mut inner.aof, parts)?;
284    inner.store.try_evict_after_write();
285    Ok(())
286}
287
288pub(crate) fn store_err(e: StoreError) -> io::Error {
289    io::Error::new(io::ErrorKind::InvalidInput, format!("kevy-store: {e:?}"))
290}
291
292fn reaper_loop(
293    inner: Arc<Mutex<Inner>>,
294    stop: Arc<AtomicBool>,
295    interval: Duration,
296    samples: usize,
297    rounds: u32,
298) {
299    while !stop.load(Ordering::Relaxed) {
300        std::thread::sleep(interval);
301        if stop.load(Ordering::Relaxed) {
302            break;
303        }
304        let mut g = match inner.lock() {
305            Ok(g) => g,
306            Err(poison) => poison.into_inner(),
307        };
308        let _ = g.store.tick_expire(samples, rounds);
309        // EverySec AOF fsync window check — embedded mode runs this from
310        // the same reaper tick rather than a separate timer.
311        if let Some(aof) = &mut g.aof {
312            let _ = aof.maybe_sync();
313        }
314    }
315}
316
317impl Drop for DropGuard {
318    fn drop(&mut self) {
319        // Last `Store` clone is going away — stop the reaper, join it, then
320        // flush the AOF so EverySec users don't lose the last sub-second of
321        // writes. Poison recovery: a method panic earlier shouldn't strand
322        // the AOF unflushed; the writes already landed in-memory.
323        if let Some(stop) = &self.reaper_stop {
324            stop.store(true, Ordering::Relaxed);
325        }
326        if let Some(j) = self
327            .reaper_join
328            .lock()
329            .unwrap_or_else(|p| p.into_inner())
330            .take()
331        {
332            let _ = j.join();
333        }
334        let mut g = match self.inner_for_flush.lock() {
335            Ok(g) => g,
336            Err(poison) => poison.into_inner(),
337        };
338        if let Some(aof) = &mut g.aof {
339            let _ = aof.maybe_sync();
340        }
341    }
342}
343
344#[cfg(test)]
345mod tests {
346    use super::*;
347    use crate::config::{AppendFsync, EvictionPolicy};
348
349    fn tmp_dir(name: &str) -> PathBuf {
350        let mut p = std::env::temp_dir();
351        let uniq = std::time::SystemTime::now()
352            .duration_since(std::time::UNIX_EPOCH)
353            .unwrap()
354            .as_nanos();
355        p.push(format!("kevy-embedded-{name}-{uniq}"));
356        p
357    }
358
359    #[test]
360    fn in_memory_roundtrip() {
361        let s = Store::open(Config::default().with_ttl_reaper_manual()).unwrap();
362        s.set(b"k", b"v").unwrap();
363        assert_eq!(s.get(b"k").unwrap(), Some(b"v".to_vec()));
364        assert_eq!(s.dbsize(), 1);
365        s.del(&[b"k"]).unwrap();
366        assert_eq!(s.dbsize(), 0);
367    }
368
369    #[test]
370    fn persistence_round_trip_via_aof() {
371        let dir = tmp_dir("aof-rt");
372        {
373            let s = Store::open(
374                Config::default()
375                    .with_persist(&dir)
376                    .with_ttl_reaper_manual()
377                    .with_appendfsync(AppendFsync::Always),
378            )
379            .unwrap();
380            for i in 0..50 {
381                s.set(format!("k{i}").as_bytes(), b"v").unwrap();
382            }
383            s.incr_by(b"counter", 41).unwrap();
384            s.hset(b"h", &[(b"field" as &[u8], b"val" as &[u8])]).unwrap();
385        }
386        // Reopen: AOF replay should reconstruct exactly the same state.
387        let s2 = Store::open(
388            Config::default()
389                .with_persist(&dir)
390                .with_ttl_reaper_manual(),
391        )
392        .unwrap();
393        assert_eq!(s2.dbsize(), 52); // 50 + counter + h
394        assert_eq!(s2.get(b"k0").unwrap(), Some(b"v".to_vec()));
395        assert_eq!(s2.get(b"k49").unwrap(), Some(b"v".to_vec()));
396        assert_eq!(s2.get(b"counter").unwrap(), Some(b"41".to_vec()));
397        assert_eq!(s2.hget(b"h", b"field").unwrap(), Some(b"val".to_vec()));
398        drop(s2);
399        let _ = std::fs::remove_dir_all(&dir);
400    }
401
402    #[test]
403    fn eviction_works_under_pressure() {
404        let s = Store::open(
405            Config::default()
406                .with_ttl_reaper_manual()
407                .with_max_memory(800)
408                .with_eviction(EvictionPolicy::AllKeysLru),
409        )
410        .unwrap();
411        for i in 0..50 {
412            s.set(format!("k{i:02}").as_bytes(), b"xxxxxxxxxxxxxxxxxxxx")
413                .unwrap();
414        }
415        assert!(s.used_memory() <= 800, "got {}", s.used_memory());
416        assert!(s.evictions_total() > 0);
417    }
418
419    #[test]
420    fn manual_tick_runs_active_reaper() {
421        let s = Store::open(Config::default().with_ttl_reaper_manual()).unwrap();
422        s.set_with_ttl(b"short", b"v", Duration::from_millis(1)).unwrap();
423        s.set(b"perm", b"v").unwrap();
424        std::thread::sleep(Duration::from_millis(20));
425        let stats = s.tick();
426        // tick() should at least sample and reap (may take multiple ticks
427        // for sparse layouts; the call is idempotent).
428        let _ = stats;
429        let _ = s.get(b"short").unwrap(); // lazy reap path
430        assert!(s.expired_keys_total() >= 1);
431        assert!(s.get(b"perm").unwrap().is_some());
432    }
433
434    #[test]
435    fn with_escape_hatch_works() {
436        let s = Store::open(Config::default().with_ttl_reaper_manual()).unwrap();
437        let zsize = s.with(|store| {
438            let _ = store.zadd(b"z", &[(1.0, b"a".to_vec()), (2.0, b"b".to_vec())]);
439            store.zcard(b"z").unwrap()
440        });
441        assert_eq!(zsize, 2);
442        // Direct (un-logged) write through `with`: caller may explicitly
443        // log if they want it crash-safe. Here we just verify it landed.
444        assert_eq!(s.type_of(b"z"), "zset");
445    }
446
447    #[test]
448    fn background_reaper_thread_drops_expired_keys() {
449        let s = Store::open(
450            Config::default().with_reaper_interval(Duration::from_millis(20)),
451        )
452        .unwrap();
453        s.set_with_ttl(b"k", b"v", Duration::from_millis(5)).unwrap();
454        std::thread::sleep(Duration::from_millis(120));
455        // The active reaper should have caught it without anyone reading.
456        let _ = s.get(b"k").unwrap(); // either way, key should now be gone
457        assert_eq!(s.dbsize(), 0);
458    }
459
460    #[test]
461    fn arc_sharing_across_threads() {
462        use std::sync::Arc;
463        let s = Arc::new(Store::open(Config::default().with_ttl_reaper_manual()).unwrap());
464        let mut handles = Vec::new();
465        for i in 0..8 {
466            let s = Arc::clone(&s);
467            handles.push(std::thread::spawn(move || {
468                for j in 0..50 {
469                    s.set(format!("t{i}-{j}").as_bytes(), b"v").unwrap();
470                }
471            }));
472        }
473        for h in handles {
474            h.join().unwrap();
475        }
476        assert_eq!(s.dbsize(), 8 * 50);
477    }
478
479    #[test]
480    fn drop_during_reaper_does_not_deadlock() {
481        // Sanity: a Store with a Background reaper must drop cleanly even
482        // while the reaper is sleeping. Without the stop-flag + join the
483        // drop would either hang or race the reaper holding the mutex.
484        for _ in 0..4 {
485            let s = Store::open(
486                Config::default().with_reaper_interval(Duration::from_millis(5)),
487            )
488            .unwrap();
489            s.set(b"k", b"v").unwrap();
490            // Let the reaper actually run a couple of times.
491            std::thread::sleep(Duration::from_millis(40));
492            drop(s); // must return within a few ms
493        }
494    }
495
496    #[test]
497    fn save_snapshot_then_restart() {
498        let dir = tmp_dir("snap-rt");
499        {
500            let s = Store::open(
501                Config::default()
502                    .with_persist(&dir)
503                    .without_aof()
504                    .with_ttl_reaper_manual(),
505            )
506            .unwrap();
507            for i in 0..10 {
508                s.set(format!("k{i}").as_bytes(), b"v").unwrap();
509            }
510            let saved = s.save_snapshot().unwrap();
511            assert!(saved);
512        }
513        let s2 = Store::open(
514            Config::default()
515                .with_persist(&dir)
516                .without_aof()
517                .with_ttl_reaper_manual(),
518        )
519        .unwrap();
520        assert_eq!(s2.dbsize(), 10);
521        let _ = std::fs::remove_dir_all(&dir);
522    }
523}