Skip to main content

kevy_embedded/
store.rs

1//! [`Store`] — the embedded entry point. Wraps `kevy_store::Store` with
2//! a mutex (for cross-thread access), optional AOF auto-logging, an
3//! optional background TTL reaper, and an in-process pub/sub bus.
4
5use std::io;
6use std::path::PathBuf;
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::sync::{Arc, Mutex, MutexGuard, Weak};
9use std::thread::JoinHandle;
10use std::time::Duration;
11
12use kevy_persist::{Aof, Argv, RewriteStats, load_snapshot, replay_aof, save_snapshot};
13use kevy_store::{ExpireStats, StoreError};
14
15use crate::config::{Config, TtlReaperMode};
16use crate::pubsub::PubsubBus;
17
18/// The embedded keyspace.
19///
20/// **`Store` is `Clone`** (since v1.1.0). A clone is a cheap `Arc` bump:
21/// every clone reaches the same underlying `kevy_store::Store` + AOF +
22/// reaper + pub/sub bus. The reaper thread is joined and the AOF is
23/// flushed exactly once, when the **last** clone is dropped.
24///
25/// ```
26/// use kevy_embedded::{Config, Store};
27///
28/// # fn main() -> std::io::Result<()> {
29/// let s = Store::open(Config::default().with_ttl_reaper_manual())?;
30/// let s2 = s.clone();
31/// std::thread::spawn(move || {
32///     s2.set(b"from-thread", b"v").unwrap();
33/// }).join().unwrap();
34/// assert_eq!(s.get(b"from-thread")?, Some(b"v".to_vec()));
35/// # Ok(())
36/// # }
37/// ```
38///
39/// Every method takes `&self`. The internal `Arc<Mutex<Inner>>` is what
40/// makes shared access safe under contention.
41#[derive(Clone)]
42pub struct Store {
43    inner: Arc<Mutex<Inner>>,
44    /// Shared drop guard: signals + joins reaper and flushes AOF when the
45    /// LAST `Store` clone (or `Subscription`) holding a strong ref drops.
46    guard: Arc<DropGuard>,
47    config: Config,
48}
49
50/// Weak handle to a `Store` — does not keep the underlying keyspace alive.
51///
52/// Used by the URL-keyed registry in `kevy-client` so that multiple
53/// `Connection::open("mem://name")` calls share the same backing store
54/// without leaking it when all strong handles go away.
55pub struct WeakStore {
56    inner: Weak<Mutex<Inner>>,
57    guard: Weak<DropGuard>,
58    config: Config,
59}
60
61impl WeakStore {
62    /// Try to upgrade back to a `Store`. Returns `None` if the last strong
63    /// reference has already been dropped.
64    pub fn upgrade(&self) -> Option<Store> {
65        Some(Store {
66            inner: self.inner.upgrade()?,
67            guard: self.guard.upgrade()?,
68            config: self.config.clone(),
69        })
70    }
71}
72
73pub(crate) struct Inner {
74    pub(crate) store: kevy_store::Store,
75    pub(crate) aof: Option<Aof>,
76    pub(crate) bus: PubsubBus,
77}
78
79/// Owns the reaper-thread handle + a back-reference to `Inner` for the
80/// final AOF flush. Lives in an `Arc<DropGuard>` shared across every
81/// `Store` clone; the actual drop logic fires only when the last clone
82/// goes away. `JoinHandle` is wrapped in `Mutex<Option>` so `Drop` can
83/// `.take()` it while only having `&self`.
84pub(crate) struct DropGuard {
85    reaper_stop: Option<Arc<AtomicBool>>,
86    reaper_join: Mutex<Option<JoinHandle<()>>>,
87    inner_for_flush: Arc<Mutex<Inner>>,
88}
89
90impl Store {
91    /// Open an embedded keyspace per `config`.
92    ///
93    /// - Pure in-memory when `config.data_dir` is `None`.
94    /// - With persistence: loads `<data_dir>/<snapshot_filename>` first,
95    ///   then replays `<data_dir>/<aof_filename>`. Both are best-effort —
96    ///   missing files are fine, a truncated AOF tail is silently dropped.
97    /// - Spawns a background TTL reaper thread when
98    ///   `config.ttl_reaper == Background` (the default).
99    pub fn open(config: Config) -> io::Result<Self> {
100        let (store, aof) = init_persistent_store(&config)?;
101        let inner = Arc::new(Mutex::new(Inner {
102            store,
103            aof,
104            bus: PubsubBus::new(),
105        }));
106        let (reaper_stop, reaper_join) = spawn_reaper(&config, &inner)?;
107        let guard = Arc::new(DropGuard {
108            reaper_stop,
109            reaper_join: Mutex::new(reaper_join),
110            inner_for_flush: inner.clone(),
111        });
112        Ok(Store {
113            inner,
114            guard,
115            config,
116        })
117    }
118
119    /// Get a weak handle that does not keep the keyspace alive.
120    /// `upgrade()` returns `None` once the last strong `Store` is dropped.
121    pub fn downgrade(&self) -> WeakStore {
122        WeakStore {
123            inner: Arc::downgrade(&self.inner),
124            guard: Arc::downgrade(&self.guard),
125            config: self.config.clone(),
126        }
127    }
128
129    /// The active config (a clone — modifying it has no effect on the
130    /// running store). Useful for introspection / `INFO`-style telemetry.
131    pub fn config(&self) -> &Config {
132        &self.config
133    }
134
135    // ---- escape hatches -------------------------------------------------
136
137    /// Run `f` against the underlying `kevy_store::Store` under the
138    /// embedded mutex. Use for direct access to methods this crate hasn't
139    /// wrapped (snapshot iteration, ZRANGE, raw collect_keys, …). The
140    /// closure can mutate, but *does not auto-log to the AOF* — call
141    /// [`Self::log`] yourself if the mutation must survive a crash.
142    pub fn with<F, R>(&self, f: F) -> R
143    where
144        F: FnOnce(&mut kevy_store::Store) -> R,
145    {
146        let mut g = self.lock();
147        f(&mut g.store)
148    }
149
150    /// Append a raw RESP-frame argument list to the AOF. Pairs with
151    /// [`Self::with`] when the closure performed a write you want to make
152    /// crash-safe. No-op when persistence is disabled.
153    pub fn log(&self, parts: &[&[u8]]) -> io::Result<()> {
154        let mut g = self.lock();
155        if let Some(aof) = &mut g.aof {
156            let argv = Argv::from(parts.iter().map(|p| p.to_vec()).collect::<Vec<_>>());
157            aof.append(&argv)?;
158        }
159        Ok(())
160    }
161
162    // ---- maintenance ----------------------------------------------------
163
164    /// Run one TTL-reaper tick. Required call cadence in `Manual` mode
165    /// (call ~10× per second to match Redis's `hz=10`); no-op cost is
166    /// one mutex lock + map-emptiness check when nothing has TTL.
167    pub fn tick(&self) -> ExpireStats {
168        let mut g = self.lock();
169        let stats = g
170            .store
171            .tick_expire(self.config.reaper_samples, self.config.reaper_max_rounds);
172        // Auto-AOF-rewrite check rides the same caller-driven tick in Manual
173        // mode (Background mode runs it from the reaper thread instead).
174        maybe_auto_rewrite(
175            &mut g,
176            self.config.auto_aof_rewrite_pct,
177            self.config.auto_aof_rewrite_min_size,
178        );
179        stats
180    }
181
182    /// `BGREWRITEAOF`: rebuild the AOF from current state. Synchronous —
183    /// blocks until the rewrite + atomic rename completes. Returns
184    /// `Ok(None)` when persistence is disabled.
185    pub fn rewrite_aof(&self) -> io::Result<Option<RewriteStats>> {
186        let mut g = self.lock();
187        // Disjoint-field split-borrow: destructure the guard so the borrow
188        // checker sees `store` and `aof` as independent borrows, not two
189        // claims on the same `&mut Inner`.
190        let Inner { store, aof, bus: _ } = &mut *g;
191        let Some(aof) = aof else { return Ok(None) };
192        Ok(Some(aof.rewrite_from(store)?))
193    }
194
195    /// Snapshot the store to `<data_dir>/<snapshot_filename>`, atomically.
196    /// `Ok(false)` when persistence is disabled (caller can decide to
197    /// surface that or no-op).
198    pub fn save_snapshot(&self) -> io::Result<bool> {
199        let g = self.lock();
200        let Some(dir) = self.config.data_dir.as_ref() else {
201            return Ok(false);
202        };
203        let path: PathBuf = dir.join(&self.config.snapshot_filename);
204        save_snapshot(&g.store, &path)?;
205        Ok(true)
206    }
207
208    // String / hash / list / set / zset / pub-sub data-type methods live
209    // in `crate::ops` (kept under the 500-LOC file cap). Look there for
210    // e.g. `Store::set` / `Store::hset` / `Store::publish`.
211
212    /// Crate-internal: clone the shared `Arc<Mutex<Inner>>` handle, used
213    /// by `ops.rs::Store::subscribe` to hand the bus to a `Subscription`.
214    pub(crate) fn inner_handle(&self) -> Arc<Mutex<Inner>> {
215        self.inner.clone()
216    }
217
218    /// Crate-internal: clone the shared `Arc<DropGuard>` so a live
219    /// `Subscription` keeps the reaper + AOF flush alive until it drops.
220    pub(crate) fn guard_handle(&self) -> Arc<DropGuard> {
221        self.guard.clone()
222    }
223
224    /// Crate-internal: acquire the embedded mutex. Recovers from poison
225    /// because every method's critical section is short — a panic in one
226    /// doesn't corrupt the keyspace.
227    pub(crate) fn lock(&self) -> MutexGuard<'_, Inner> {
228        match self.inner.lock() {
229            Ok(g) => g,
230            Err(poison) => poison.into_inner(),
231        }
232    }
233}
234
235/// Build the `kevy_store::Store` and (optionally) its `Aof`. Loads any
236/// pre-existing snapshot and replays any pre-existing AOF before
237/// returning. `data_dir = None` ⇒ pure in-memory (both return values
238/// are the empty store + `None`).
239fn init_persistent_store(config: &Config) -> io::Result<(kevy_store::Store, Option<Aof>)> {
240    let mut store = kevy_store::Store::new();
241    store.set_max_memory(config.maxmemory, config.eviction_policy);
242    let aof = if let Some(dir) = &config.data_dir {
243        std::fs::create_dir_all(dir)?;
244        let snap_path = dir.join(&config.snapshot_filename);
245        if snap_path.exists() {
246            load_snapshot(&mut store, &snap_path)?;
247        }
248        let aof_path = dir.join(&config.aof_filename);
249        if aof_path.exists() {
250            replay_aof(&aof_path, |args| crate::replay::apply(&mut store, &args))?;
251        }
252        if config.aof {
253            Some(Aof::open(&aof_path, config.appendfsync)?)
254        } else {
255            None
256        }
257    } else {
258        None
259    };
260    Ok((store, aof))
261}
262
263/// Start the background TTL reaper thread, returning its stop signal +
264/// join handle. `TtlReaperMode::Manual` returns `(None, None)` so the
265/// caller-driven reap is in charge instead.
266#[allow(clippy::type_complexity)] // inline tuple keeps the pair colocated
267fn spawn_reaper(
268    config: &Config,
269    inner: &Arc<Mutex<Inner>>,
270) -> io::Result<(Option<Arc<AtomicBool>>, Option<JoinHandle<()>>)> {
271    match config.ttl_reaper {
272        TtlReaperMode::Manual => Ok((None, None)),
273        TtlReaperMode::Background => {
274            let stop = Arc::new(AtomicBool::new(false));
275            let stop_t = stop.clone();
276            let inner_t = inner.clone();
277            let interval = config.reaper_interval;
278            let samples = config.reaper_samples;
279            let rounds = config.reaper_max_rounds;
280            let rw_pct = config.auto_aof_rewrite_pct;
281            let rw_min = config.auto_aof_rewrite_min_size;
282            let handle = std::thread::Builder::new()
283                .name(String::from("kevy-embedded-reaper"))
284                .spawn(move || {
285                    reaper_loop(inner_t, stop_t, interval, samples, rounds, rw_pct, rw_min)
286                })?;
287            Ok((Some(stop), Some(handle)))
288        }
289    }
290}
291
292// ─────────────────────────────────────────────────────────────────────────
293// DELETION MARKER (replaced below): everything from the original `set`
294// method through the closing brace of `impl Store` previously lived here.
295// They've been moved to `crate::ops` — see crates/kevy-embedded/src/ops.rs.
296// ─────────────────────────────────────────────────────────────────────────
297
298fn log_argv(aof: &mut Option<Aof>, parts: &[&[u8]]) -> io::Result<()> {
299    if let Some(aof) = aof {
300        let argv = Argv::from(parts.iter().map(|p| p.to_vec()).collect::<Vec<_>>());
301        aof.append(&argv)?;
302    }
303    Ok(())
304}
305
306/// Complete a write: AOF-log the canonical RESP command, then run the
307/// store's post-write eviction sweep. Single helper so every write wrapper
308/// stays in lockstep — forgetting to evict means a maxmemory budget would
309/// grow without bound.
310pub(crate) fn commit_write(inner: &mut Inner, parts: &[&[u8]]) -> io::Result<()> {
311    log_argv(&mut inner.aof, parts)?;
312    inner.store.try_evict_after_write();
313    Ok(())
314}
315
316pub(crate) fn store_err(e: StoreError) -> io::Error {
317    io::Error::new(io::ErrorKind::InvalidInput, format!("kevy-store: {e:?}"))
318}
319
320#[allow(clippy::too_many_arguments)] // reaper config knobs, all primitives
321fn reaper_loop(
322    inner: Arc<Mutex<Inner>>,
323    stop: Arc<AtomicBool>,
324    interval: Duration,
325    samples: usize,
326    rounds: u32,
327    rewrite_pct: u32,
328    rewrite_min_size: u64,
329) {
330    while !stop.load(Ordering::Relaxed) {
331        std::thread::sleep(interval);
332        if stop.load(Ordering::Relaxed) {
333            break;
334        }
335        let mut g = match inner.lock() {
336            Ok(g) => g,
337            Err(poison) => poison.into_inner(),
338        };
339        let _ = g.store.tick_expire(samples, rounds);
340        // EverySec AOF fsync window check — embedded mode runs this from
341        // the same reaper tick rather than a separate timer.
342        if let Some(aof) = &mut g.aof {
343            let _ = aof.maybe_sync();
344        }
345        maybe_auto_rewrite(&mut g, rewrite_pct, rewrite_min_size);
346    }
347}
348
349/// Auto-`BGREWRITEAOF`: rewrite the AOF when it has grown `pct` percent past
350/// its size at the last rewrite and is at least `min_size` bytes (Redis's
351/// `auto-aof-rewrite-percentage` / `-min-size`). Mirrors the server runtime's
352/// `Shard::maybe_auto_rewrite_aof`. Runs under the held `Inner` lock, so it
353/// briefly blocks writers while it rewrites — acceptable because it fires
354/// rarely (only when the AOF has doubled past the floor). `pct == 0` disables.
355pub(crate) fn maybe_auto_rewrite(g: &mut Inner, pct: u32, min_size: u64) {
356    if pct == 0 {
357        return;
358    }
359    let Inner { store, aof, .. } = g;
360    let Some(aof) = aof else { return };
361    let cur = aof.size_bytes();
362    if cur < min_size {
363        return;
364    }
365    let baseline = aof.size_at_last_rewrite().max(1);
366    // (cur - baseline) * 100 / baseline ≥ pct  ⇔  cur * 100 ≥ baseline * (100 + pct)
367    if cur.saturating_mul(100) < baseline.saturating_mul(100u64.saturating_add(pct as u64)) {
368        return;
369    }
370    if let Err(e) = aof.rewrite_from(store) {
371        eprintln!("kevy: embedded auto AOF rewrite failed: {e}");
372    }
373}
374
375impl Drop for DropGuard {
376    fn drop(&mut self) {
377        // Last `Store` clone is going away — stop the reaper, join it, then
378        // flush the AOF so EverySec users don't lose the last sub-second of
379        // writes. Poison recovery: a method panic earlier shouldn't strand
380        // the AOF unflushed; the writes already landed in-memory.
381        if let Some(stop) = &self.reaper_stop {
382            stop.store(true, Ordering::Relaxed);
383        }
384        if let Some(j) = self
385            .reaper_join
386            .lock()
387            .unwrap_or_else(|p| p.into_inner())
388            .take()
389        {
390            let _ = j.join();
391        }
392        let mut g = match self.inner_for_flush.lock() {
393            Ok(g) => g,
394            Err(poison) => poison.into_inner(),
395        };
396        if let Some(aof) = &mut g.aof {
397            let _ = aof.maybe_sync();
398        }
399    }
400}
401
402#[cfg(test)]
403#[path = "store_tests.rs"]
404mod tests;