Skip to main content

kevy_embedded/
store.rs

1//! [`Store`] — the embedded entry point. Wraps `kevy_store::Store` with
2//! a mutex (for cross-thread access), optional AOF auto-logging, and an
3//! optional background TTL reaper.
4
5use std::io;
6use std::path::PathBuf;
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::sync::{Arc, Mutex, MutexGuard};
9use std::thread::JoinHandle;
10use std::time::Duration;
11
12use kevy_persist::{Aof, Argv, RewriteStats, load_snapshot, replay_aof, save_snapshot};
13use kevy_store::{ExpireStats, StoreError};
14
15use crate::config::{Config, TtlReaperMode};
16
/// The embedded keyspace.
///
/// `Store` itself is **not** `Clone` (the reaper-thread `JoinHandle` is
/// owned uniquely). To share one keyspace across threads, wrap the store
/// in an `Arc`:
///
/// ```
/// use std::sync::Arc;
/// use kevy_embedded::{Config, Store};
///
/// # fn main() -> std::io::Result<()> {
/// let s = Arc::new(Store::open(Config::default().with_ttl_reaper_manual())?);
/// let s2 = Arc::clone(&s);
/// std::thread::spawn(move || {
///     s2.set(b"from-thread", b"v").unwrap();
/// }).join().unwrap();
/// assert_eq!(s.get(b"from-thread")?, Some(b"v".to_vec()));
/// # Ok(())
/// # }
/// ```
///
/// Every method takes `&self`, so an `Arc<Store>` reaches the same
/// underlying `kevy_store::Store` + AOF + reaper. The internal
/// `Arc<Mutex<Inner>>` is what makes that safe under contention.
pub struct Store {
    /// Keyspace + optional AOF behind one mutex. The `Arc` exists because
    /// the background reaper thread holds a second handle to it.
    inner: Arc<Mutex<Inner>>,
    /// The configuration passed to `open`; read later for reaper
    /// parameters and persistence paths.
    config: Config,
    /// Stop flag polled by the reaper thread; `None` in `Manual` mode.
    reaper_stop: Option<Arc<AtomicBool>>,
    /// Handle of the reaper thread, joined on drop; `None` in `Manual` mode.
    reaper_join: Option<JoinHandle<()>>,
}
47
/// Everything the embedded mutex protects: the keyspace and, when enabled,
/// its append-only log. Keeping both under one lock means a store mutation
/// and its AOF record are produced inside the same critical section.
struct Inner {
    /// The in-memory keyspace.
    store: kevy_store::Store,
    /// Open AOF writer; `Some` only when a `data_dir` is configured and
    /// `config.aof` is set.
    aof: Option<Aof>,
}
52
53impl Store {
54    /// Open an embedded keyspace per `config`.
55    ///
56    /// - Pure in-memory when `config.data_dir` is `None`.
57    /// - With persistence: loads `<data_dir>/<snapshot_filename>` first,
58    ///   then replays `<data_dir>/<aof_filename>`. Both are best-effort —
59    ///   missing files are fine, a truncated AOF tail is silently dropped.
60    /// - Spawns a background TTL reaper thread when
61    ///   `config.ttl_reaper == Background` (the default).
62    pub fn open(config: Config) -> io::Result<Self> {
63        let mut store = kevy_store::Store::new();
64        store.set_max_memory(config.maxmemory, config.eviction_policy);
65
66        let aof = if let Some(dir) = &config.data_dir {
67            std::fs::create_dir_all(dir)?;
68            let snap_path = dir.join(&config.snapshot_filename);
69            if snap_path.exists() {
70                load_snapshot(&mut store, &snap_path)?;
71            }
72            let aof_path = dir.join(&config.aof_filename);
73            if aof_path.exists() {
74                replay_aof(&aof_path, |args| crate::replay::apply(&mut store, &args))?;
75            }
76            if config.aof {
77                Some(Aof::open(&aof_path, config.appendfsync)?)
78            } else {
79                None
80            }
81        } else {
82            None
83        };
84
85        let inner = Arc::new(Mutex::new(Inner { store, aof }));
86
87        let (reaper_stop, reaper_join) = match config.ttl_reaper {
88            TtlReaperMode::Manual => (None, None),
89            TtlReaperMode::Background => {
90                let stop = Arc::new(AtomicBool::new(false));
91                let stop_t = stop.clone();
92                let inner_t = inner.clone();
93                let interval = config.reaper_interval;
94                let samples = config.reaper_samples;
95                let rounds = config.reaper_max_rounds;
96                let handle = std::thread::Builder::new()
97                    .name(String::from("kevy-embedded-reaper"))
98                    .spawn(move || reaper_loop(inner_t, stop_t, interval, samples, rounds))?;
99                (Some(stop), Some(handle))
100            }
101        };
102
103        Ok(Store {
104            inner,
105            config,
106            reaper_stop,
107            reaper_join,
108        })
109    }
110
111    /// The active config (a clone — modifying it has no effect on the
112    /// running store). Useful for introspection / `INFO`-style telemetry.
113    pub fn config(&self) -> &Config {
114        &self.config
115    }
116
117    // ---- escape hatches -------------------------------------------------
118
119    /// Run `f` against the underlying `kevy_store::Store` under the
120    /// embedded mutex. Use for direct access to methods this crate hasn't
121    /// wrapped (snapshot iteration, ZRANGE, raw collect_keys, …). The
122    /// closure can mutate, but *does not auto-log to the AOF* — call
123    /// [`Self::log`] yourself if the mutation must survive a crash.
124    pub fn with<F, R>(&self, f: F) -> R
125    where
126        F: FnOnce(&mut kevy_store::Store) -> R,
127    {
128        let mut g = self.lock();
129        f(&mut g.store)
130    }
131
132    /// Append a raw RESP-frame argument list to the AOF. Pairs with
133    /// [`Self::with`] when the closure performed a write you want to make
134    /// crash-safe. No-op when persistence is disabled.
135    pub fn log(&self, parts: &[&[u8]]) -> io::Result<()> {
136        let mut g = self.lock();
137        if let Some(aof) = &mut g.aof {
138            let argv = Argv::from(parts.iter().map(|p| p.to_vec()).collect::<Vec<_>>());
139            aof.append(&argv)?;
140        }
141        Ok(())
142    }
143
144    // ---- maintenance ----------------------------------------------------
145
146    /// Run one TTL-reaper tick. Required call cadence in `Manual` mode
147    /// (call ~10× per second to match Redis's `hz=10`); no-op cost is
148    /// one mutex lock + map-emptiness check when nothing has TTL.
149    pub fn tick(&self) -> ExpireStats {
150        let mut g = self.lock();
151        g.store
152            .tick_expire(self.config.reaper_samples, self.config.reaper_max_rounds)
153    }
154
155    /// `BGREWRITEAOF`: rebuild the AOF from current state. Synchronous —
156    /// blocks until the rewrite + atomic rename completes. Returns
157    /// `Ok(None)` when persistence is disabled.
158    pub fn rewrite_aof(&self) -> io::Result<Option<RewriteStats>> {
159        let mut g = self.lock();
160        // Disjoint-field split-borrow: destructure the guard so the borrow
161        // checker sees `store` and `aof` as independent borrows, not two
162        // claims on the same `&mut Inner`.
163        let Inner { store, aof } = &mut *g;
164        let Some(aof) = aof else { return Ok(None) };
165        Ok(Some(aof.rewrite_from(store)?))
166    }
167
168    /// Snapshot the store to `<data_dir>/<snapshot_filename>`, atomically.
169    /// `Ok(false)` when persistence is disabled (caller can decide to
170    /// surface that or no-op).
171    pub fn save_snapshot(&self) -> io::Result<bool> {
172        let g = self.lock();
173        let Some(dir) = self.config.data_dir.as_ref() else {
174            return Ok(false);
175        };
176        let path: PathBuf = dir.join(&self.config.snapshot_filename);
177        save_snapshot(&g.store, &path)?;
178        Ok(true)
179    }
180
181    // ---- string ops -----------------------------------------------------
182
183    /// `SET key value` (no TTL, no NX/XX). Returns `true` always under
184    /// the embedded API (Redis semantics: SET overwrites; NX/XX vetoes
185    /// would return `false` but we don't expose those here — use
186    /// [`Self::with`] for the full surface).
187    pub fn set(&self, key: &[u8], value: &[u8]) -> io::Result<bool> {
188        let mut g = self.lock();
189        let ok = g.store.set(key, value.to_vec(), None, false, false);
190        commit_write(&mut g, &[b"SET", key, value])?;
191        Ok(ok)
192    }
193
194    /// `SET key value PX ms` — overwrites + sets TTL.
195    pub fn set_with_ttl(&self, key: &[u8], value: &[u8], ttl: Duration) -> io::Result<bool> {
196        let mut g = self.lock();
197        let ok = g.store.set(key, value.to_vec(), Some(ttl), false, false);
198        let ms = ttl.as_millis().min(u64::MAX as u128) as u64;
199        commit_write(&mut g, &[b"SET", key, value])?;
200        commit_write(&mut g, &[b"PEXPIRE", key, ms.to_string().as_bytes()])?;
201        Ok(ok)
202    }
203
204    /// `GET key` — `Some(bytes)` on hit, `None` on miss or expired.
205    pub fn get(&self, key: &[u8]) -> io::Result<Option<Vec<u8>>> {
206        let mut g = self.lock();
207        Ok(g.store.get(key).map_err(store_err)?.map(|v| v.to_vec()))
208    }
209
210    /// `DEL key1 [key2 ...]`. Returns the count of keys actually removed.
211    pub fn del(&self, keys: &[&[u8]]) -> io::Result<usize> {
212        let mut g = self.lock();
213        let owned: Vec<Vec<u8>> = keys.iter().map(|k| k.to_vec()).collect();
214        let n = g.store.del(&owned);
215        if n > 0 {
216            let mut parts: Vec<&[u8]> = Vec::with_capacity(keys.len() + 1);
217            parts.push(b"DEL");
218            for k in keys {
219                parts.push(k);
220            }
221            commit_write(&mut g, &parts)?;
222        }
223        Ok(n)
224    }
225
226    /// `EXISTS key1 [key2 ...]`. Returns the count of existing keys
227    /// (duplicates counted multiple times, matching Redis).
228    pub fn exists(&self, keys: &[&[u8]]) -> io::Result<usize> {
229        let mut g = self.lock();
230        let owned: Vec<Vec<u8>> = keys.iter().map(|k| k.to_vec()).collect();
231        Ok(g.store.exists(&owned))
232    }
233
234    /// `INCR key`. Returns the post-increment value.
235    pub fn incr(&self, key: &[u8]) -> io::Result<i64> {
236        self.incr_by(key, 1)
237    }
238
239    /// `INCRBY key delta`. Negative `delta` does DECR-style work.
240    pub fn incr_by(&self, key: &[u8], delta: i64) -> io::Result<i64> {
241        let mut g = self.lock();
242        let n = g.store.incr_by(key, delta).map_err(store_err)?;
243        commit_write(&mut g, &[b"INCRBY", key, delta.to_string().as_bytes()])?;
244        Ok(n)
245    }
246
247    /// `EXPIRE key seconds`. Returns `true` if a key was touched.
248    pub fn expire(&self, key: &[u8], ttl: Duration) -> io::Result<bool> {
249        let mut g = self.lock();
250        let touched = g.store.expire(key, ttl);
251        if touched {
252            let ms = ttl.as_millis().min(u64::MAX as u128) as u64;
253            commit_write(&mut g, &[b"PEXPIRE", key, ms.to_string().as_bytes()])?;
254        }
255        Ok(touched)
256    }
257
258    /// `PERSIST key`. Returns `true` if a TTL was actually cleared.
259    pub fn persist(&self, key: &[u8]) -> io::Result<bool> {
260        let mut g = self.lock();
261        let touched = g.store.persist(key);
262        if touched {
263            commit_write(&mut g, &[b"PERSIST", key])?;
264        }
265        Ok(touched)
266    }
267
268    /// Remaining TTL in ms (or Redis-style `-1`/`-2` for no-TTL/no-key).
269    pub fn ttl_ms(&self, key: &[u8]) -> i64 {
270        self.lock().store.pttl(key)
271    }
272
273    /// `TYPE key` — `"string"`, `"hash"`, `"list"`, `"set"`, `"zset"`, or `"none"`.
274    pub fn type_of(&self, key: &[u8]) -> &'static str {
275        self.lock().store.type_of(key)
276    }
277
278    /// `DBSIZE` — total live keys.
279    pub fn dbsize(&self) -> usize {
280        self.lock().store.dbsize()
281    }
282
283    /// `FLUSHALL` — empty the keyspace (logged so a replay reaches the
284    /// same empty state).
285    pub fn flush(&self) -> io::Result<()> {
286        let mut g = self.lock();
287        g.store.flush();
288        commit_write(&mut g, &[b"FLUSHALL"])?;
289        Ok(())
290    }
291
292    /// `MEMORY USAGE` for one key — `Some(bytes)` or `None` if absent.
293    pub fn key_bytes(&self, key: &[u8]) -> Option<u64> {
294        self.lock().store.estimate_key_bytes(key)
295    }
296
297    /// Live `used_memory` estimate (matches `INFO memory`'s field).
298    pub fn used_memory(&self) -> u64 {
299        self.lock().store.used_memory()
300    }
301
302    /// `INFO`-style counter: total keys evicted by `maxmemory` so far.
303    pub fn evictions_total(&self) -> u64 {
304        self.lock().store.evictions_total()
305    }
306
307    /// `INFO`-style counter: total keys expired (lazy + active reaper).
308    pub fn expired_keys_total(&self) -> u64 {
309        self.lock().store.expired_keys_total()
310    }
311
312    // ---- hash ops -------------------------------------------------------
313
314    pub fn hset(&self, key: &[u8], pairs: &[(&[u8], &[u8])]) -> io::Result<usize> {
315        let mut g = self.lock();
316        let owned: Vec<(Vec<u8>, Vec<u8>)> =
317            pairs.iter().map(|(f, v)| (f.to_vec(), v.to_vec())).collect();
318        let added = g.store.hset(key, &owned).map_err(store_err)?;
319        let mut parts: Vec<&[u8]> = Vec::with_capacity(2 + pairs.len() * 2);
320        parts.push(b"HSET");
321        parts.push(key);
322        for (f, v) in pairs {
323            parts.push(f);
324            parts.push(v);
325        }
326        commit_write(&mut g, &parts)?;
327        Ok(added)
328    }
329
330    pub fn hget(&self, key: &[u8], field: &[u8]) -> io::Result<Option<Vec<u8>>> {
331        let mut g = self.lock();
332        Ok(g.store.hget(key, field).map_err(store_err)?.map(|v| v.to_vec()))
333    }
334
335    pub fn hdel(&self, key: &[u8], fields: &[&[u8]]) -> io::Result<usize> {
336        let mut g = self.lock();
337        let owned: Vec<Vec<u8>> = fields.iter().map(|f| f.to_vec()).collect();
338        let removed = g.store.hdel(key, &owned).map_err(store_err)?;
339        if removed > 0 {
340            let mut parts: Vec<&[u8]> = Vec::with_capacity(2 + fields.len());
341            parts.push(b"HDEL");
342            parts.push(key);
343            for f in fields {
344                parts.push(f);
345            }
346            commit_write(&mut g, &parts)?;
347        }
348        Ok(removed)
349    }
350
351    // ---- list ops -------------------------------------------------------
352
353    pub fn lpush(&self, key: &[u8], values: &[&[u8]]) -> io::Result<usize> {
354        push_helper(self, key, values, b"LPUSH", |s, k, vs| s.lpush(k, vs))
355    }
356
357    pub fn rpush(&self, key: &[u8], values: &[&[u8]]) -> io::Result<usize> {
358        push_helper(self, key, values, b"RPUSH", |s, k, vs| s.rpush(k, vs))
359    }
360
361    pub fn lpop(&self, key: &[u8], count: usize) -> io::Result<Vec<Vec<u8>>> {
362        pop_helper(self, key, count, false)
363    }
364
365    pub fn rpop(&self, key: &[u8], count: usize) -> io::Result<Vec<Vec<u8>>> {
366        pop_helper(self, key, count, true)
367    }
368
369    pub fn llen(&self, key: &[u8]) -> io::Result<usize> {
370        self.lock().store.llen(key).map_err(store_err)
371    }
372
373    // ---- set ops --------------------------------------------------------
374
375    pub fn sadd(&self, key: &[u8], members: &[&[u8]]) -> io::Result<usize> {
376        push_helper(self, key, members, b"SADD", |s, k, ms| s.sadd(k, ms))
377    }
378
379    pub fn srem(&self, key: &[u8], members: &[&[u8]]) -> io::Result<usize> {
380        let mut g = self.lock();
381        let owned: Vec<Vec<u8>> = members.iter().map(|m| m.to_vec()).collect();
382        let removed = g.store.srem(key, &owned).map_err(store_err)?;
383        if removed > 0 {
384            let mut parts: Vec<&[u8]> = Vec::with_capacity(2 + members.len());
385            parts.push(b"SREM");
386            parts.push(key);
387            for m in members {
388                parts.push(m);
389            }
390            commit_write(&mut g, &parts)?;
391        }
392        Ok(removed)
393    }
394
395    pub fn smembers(&self, key: &[u8]) -> io::Result<Vec<Vec<u8>>> {
396        self.lock().store.smembers(key).map_err(store_err)
397    }
398
399    pub fn scard(&self, key: &[u8]) -> io::Result<usize> {
400        self.lock().store.scard(key).map_err(store_err)
401    }
402
403    // ---- zset ops -------------------------------------------------------
404
405    pub fn zadd(&self, key: &[u8], pairs: &[(f64, &[u8])]) -> io::Result<usize> {
406        let mut g = self.lock();
407        let owned: Vec<(f64, Vec<u8>)> =
408            pairs.iter().map(|(s, m)| (*s, m.to_vec())).collect();
409        let added = g.store.zadd(key, &owned).map_err(store_err)?;
410        // Log as `ZADD key score1 member1 score2 member2 ...`
411        let mut score_strs: Vec<Vec<u8>> = Vec::with_capacity(pairs.len());
412        for (s, _) in pairs {
413            score_strs.push(format!("{s}").into_bytes());
414        }
415        let mut parts: Vec<&[u8]> = Vec::with_capacity(2 + pairs.len() * 2);
416        parts.push(b"ZADD");
417        parts.push(key);
418        for (i, (_, m)) in pairs.iter().enumerate() {
419            parts.push(&score_strs[i]);
420            parts.push(m);
421        }
422        commit_write(&mut g, &parts)?;
423        Ok(added)
424    }
425
426    pub fn zrem(&self, key: &[u8], members: &[&[u8]]) -> io::Result<usize> {
427        let mut g = self.lock();
428        let owned: Vec<Vec<u8>> = members.iter().map(|m| m.to_vec()).collect();
429        let removed = g.store.zrem(key, &owned).map_err(store_err)?;
430        if removed > 0 {
431            let mut parts: Vec<&[u8]> = Vec::with_capacity(2 + members.len());
432            parts.push(b"ZREM");
433            parts.push(key);
434            for m in members {
435                parts.push(m);
436            }
437            commit_write(&mut g, &parts)?;
438        }
439        Ok(removed)
440    }
441
442    pub fn zscore(&self, key: &[u8], member: &[u8]) -> io::Result<Option<f64>> {
443        self.lock().store.zscore(key, member).map_err(store_err)
444    }
445
446    pub fn zcard(&self, key: &[u8]) -> io::Result<usize> {
447        self.lock().store.zcard(key).map_err(store_err)
448    }
449
450    // ---- internal -------------------------------------------------------
451
452    fn lock(&self) -> MutexGuard<'_, Inner> {
453        // Mutex poisoning is recoverable: every method holds the lock only
454        // for its own short critical section, so a panic in one method
455        // doesn't leave the store in a corrupt state. Recover.
456        match self.inner.lock() {
457            Ok(g) => g,
458            Err(poison) => poison.into_inner(),
459        }
460    }
461}
462
463fn push_helper<F>(
464    s: &Store,
465    key: &[u8],
466    values: &[&[u8]],
467    verb: &'static [u8],
468    op: F,
469) -> io::Result<usize>
470where
471    F: FnOnce(&mut kevy_store::Store, &[u8], &[Vec<u8>]) -> Result<usize, StoreError>,
472{
473    let mut g = s.lock();
474    let owned: Vec<Vec<u8>> = values.iter().map(|v| v.to_vec()).collect();
475    let n = op(&mut g.store, key, &owned).map_err(store_err)?;
476    let mut parts: Vec<&[u8]> = Vec::with_capacity(2 + values.len());
477    parts.push(verb);
478    parts.push(key);
479    for v in values {
480        parts.push(v);
481    }
482    commit_write(&mut g, &parts)?;
483    Ok(n)
484}
485
486fn pop_helper(s: &Store, key: &[u8], count: usize, from_tail: bool) -> io::Result<Vec<Vec<u8>>> {
487    let mut g = s.lock();
488    let popped = if from_tail {
489        g.store.rpop(key, count).map_err(store_err)?
490    } else {
491        g.store.lpop(key, count).map_err(store_err)?
492    };
493    if !popped.is_empty() {
494        let verb: &[u8] = if from_tail { b"RPOP" } else { b"LPOP" };
495        let count_str = popped.len().to_string();
496        let parts: [&[u8]; 3] = [verb, key, count_str.as_bytes()];
497        commit_write(&mut g, &parts)?;
498    }
499    Ok(popped)
500}
501
502fn log_argv(aof: &mut Option<Aof>, parts: &[&[u8]]) -> io::Result<()> {
503    if let Some(aof) = aof {
504        let argv = Argv::from(parts.iter().map(|p| p.to_vec()).collect::<Vec<_>>());
505        aof.append(&argv)?;
506    }
507    Ok(())
508}
509
510/// Complete a write: AOF-log the canonical RESP command, then run the
511/// store's post-write eviction sweep. Single helper so every write wrapper
512/// stays in lockstep — forgetting to evict means a maxmemory budget would
513/// grow without bound.
514fn commit_write(inner: &mut Inner, parts: &[&[u8]]) -> io::Result<()> {
515    log_argv(&mut inner.aof, parts)?;
516    inner.store.try_evict_after_write();
517    Ok(())
518}
519
520fn store_err(e: StoreError) -> io::Error {
521    io::Error::new(io::ErrorKind::InvalidInput, format!("kevy-store: {e:?}"))
522}
523
524fn reaper_loop(
525    inner: Arc<Mutex<Inner>>,
526    stop: Arc<AtomicBool>,
527    interval: Duration,
528    samples: usize,
529    rounds: u32,
530) {
531    while !stop.load(Ordering::Relaxed) {
532        std::thread::sleep(interval);
533        if stop.load(Ordering::Relaxed) {
534            break;
535        }
536        let mut g = match inner.lock() {
537            Ok(g) => g,
538            Err(poison) => poison.into_inner(),
539        };
540        let _ = g.store.tick_expire(samples, rounds);
541        // EverySec AOF fsync window check — embedded mode runs this from
542        // the same reaper tick rather than a separate timer.
543        if let Some(aof) = &mut g.aof {
544            let _ = aof.maybe_sync();
545        }
546    }
547}
548
549impl Drop for Store {
550    fn drop(&mut self) {
551        // Signal reaper to stop; it'll exit on next wake-up (≤ interval).
552        if let Some(stop) = &self.reaper_stop {
553            stop.store(true, Ordering::Relaxed);
554        }
555        if let Some(j) = self.reaper_join.take() {
556            let _ = j.join();
557        }
558        // Final AOF flush (BufWriter Drop also handles it, but be explicit
559        // so EverySec users don't lose the last sub-second of writes when
560        // dropping the store cleanly). Recover from poison: a panic in some
561        // method during this session shouldn't strand the AOF unflushed,
562        // since the underlying writes already landed in-memory before the
563        // panic — we just need to push the BufWriter contents out.
564        let mut g = match self.inner.lock() {
565            Ok(g) => g,
566            Err(poison) => poison.into_inner(),
567        };
568        if let Some(aof) = &mut g.aof {
569            let _ = aof.maybe_sync();
570        }
571    }
572}
573
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{AppendFsync, EvictionPolicy};

    /// Path of a scratch directory under the OS temp dir, made unique by
    /// the test `name` plus the current time in nanoseconds. The directory
    /// itself is not created here.
    fn tmp_dir(name: &str) -> PathBuf {
        let nanos = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_nanos();
        std::env::temp_dir().join(format!("kevy-embedded-{name}-{nanos}"))
    }

    /// In-memory store with the reaper in `Manual` mode (no extra thread).
    fn manual_store() -> Store {
        Store::open(Config::default().with_ttl_reaper_manual()).unwrap()
    }

    #[test]
    fn in_memory_roundtrip() {
        let store = manual_store();
        store.set(b"k", b"v").unwrap();
        assert_eq!(store.get(b"k").unwrap(), Some(b"v".to_vec()));
        assert_eq!(store.dbsize(), 1);
        store.del(&[b"k"]).unwrap();
        assert_eq!(store.dbsize(), 0);
    }

    #[test]
    fn persistence_round_trip_via_aof() {
        let dir = tmp_dir("aof-rt");

        // First session: write through the AOF with fsync on every append.
        {
            let cfg = Config::default()
                .with_persist(&dir)
                .with_ttl_reaper_manual()
                .with_appendfsync(AppendFsync::Always);
            let writer = Store::open(cfg).unwrap();
            for i in 0..50 {
                let key = format!("k{i}");
                writer.set(key.as_bytes(), b"v").unwrap();
            }
            writer.incr_by(b"counter", 41).unwrap();
            writer
                .hset(b"h", &[(b"field" as &[u8], b"val" as &[u8])])
                .unwrap();
        }

        // Second session: AOF replay should reconstruct exactly the same state.
        let cfg = Config::default()
            .with_persist(&dir)
            .with_ttl_reaper_manual();
        let reader = Store::open(cfg).unwrap();
        assert_eq!(reader.dbsize(), 52); // 50 + counter + h
        assert_eq!(reader.get(b"k0").unwrap(), Some(b"v".to_vec()));
        assert_eq!(reader.get(b"k49").unwrap(), Some(b"v".to_vec()));
        assert_eq!(reader.get(b"counter").unwrap(), Some(b"41".to_vec()));
        assert_eq!(reader.hget(b"h", b"field").unwrap(), Some(b"val".to_vec()));
        drop(reader);
        let _ = std::fs::remove_dir_all(&dir);
    }

    #[test]
    fn eviction_works_under_pressure() {
        let cfg = Config::default()
            .with_ttl_reaper_manual()
            .with_max_memory(800)
            .with_eviction(EvictionPolicy::AllKeysLru);
        let store = Store::open(cfg).unwrap();
        for i in 0..50 {
            let key = format!("k{i:02}");
            store.set(key.as_bytes(), b"xxxxxxxxxxxxxxxxxxxx").unwrap();
        }
        assert!(store.used_memory() <= 800, "got {}", store.used_memory());
        assert!(store.evictions_total() > 0);
    }

    #[test]
    fn manual_tick_runs_active_reaper() {
        let store = manual_store();
        store
            .set_with_ttl(b"short", b"v", Duration::from_millis(1))
            .unwrap();
        store.set(b"perm", b"v").unwrap();
        std::thread::sleep(Duration::from_millis(20));
        // tick() should at least sample and reap (may take multiple ticks
        // for sparse layouts; the call is idempotent), so its stats are
        // not asserted on.
        let _stats = store.tick();
        let _ = store.get(b"short").unwrap(); // lazy reap path
        assert!(store.expired_keys_total() >= 1);
        assert!(store.get(b"perm").unwrap().is_some());
    }

    #[test]
    fn with_escape_hatch_works() {
        let store = manual_store();
        let zsize = store.with(|raw| {
            let _ = raw.zadd(b"z", &[(1.0, b"a".to_vec()), (2.0, b"b".to_vec())]);
            raw.zcard(b"z").unwrap()
        });
        assert_eq!(zsize, 2);
        // Direct (un-logged) write through `with`: caller may explicitly
        // log if they want it crash-safe. Here we just verify it landed.
        assert_eq!(store.type_of(b"z"), "zset");
    }

    #[test]
    fn background_reaper_thread_drops_expired_keys() {
        let cfg = Config::default().with_reaper_interval(Duration::from_millis(20));
        let store = Store::open(cfg).unwrap();
        store
            .set_with_ttl(b"k", b"v", Duration::from_millis(5))
            .unwrap();
        std::thread::sleep(Duration::from_millis(120));
        // The active reaper should have caught it without anyone reading.
        let _ = store.get(b"k").unwrap(); // either way, key should now be gone
        assert_eq!(store.dbsize(), 0);
    }

    #[test]
    fn arc_sharing_across_threads() {
        use std::sync::Arc;
        let shared = Arc::new(manual_store());
        let workers: Vec<_> = (0..8)
            .map(|i| {
                let s = Arc::clone(&shared);
                std::thread::spawn(move || {
                    for j in 0..50 {
                        s.set(format!("t{i}-{j}").as_bytes(), b"v").unwrap();
                    }
                })
            })
            .collect();
        for worker in workers {
            worker.join().unwrap();
        }
        assert_eq!(shared.dbsize(), 8 * 50);
    }

    #[test]
    fn drop_during_reaper_does_not_deadlock() {
        // Sanity: a Store with a Background reaper must drop cleanly even
        // while the reaper is sleeping. Without the stop-flag + join the
        // drop would either hang or race the reaper holding the mutex.
        for _ in 0..4 {
            let cfg = Config::default().with_reaper_interval(Duration::from_millis(5));
            let store = Store::open(cfg).unwrap();
            store.set(b"k", b"v").unwrap();
            // Let the reaper actually run a couple of times.
            std::thread::sleep(Duration::from_millis(40));
            drop(store); // must return within a few ms
        }
    }

    #[test]
    fn save_snapshot_then_restart() {
        let dir = tmp_dir("snap-rt");

        // First session: snapshot only, no AOF.
        {
            let cfg = Config::default()
                .with_persist(&dir)
                .without_aof()
                .with_ttl_reaper_manual();
            let writer = Store::open(cfg).unwrap();
            for i in 0..10 {
                writer.set(format!("k{i}").as_bytes(), b"v").unwrap();
            }
            let saved = writer.save_snapshot().unwrap();
            assert!(saved);
        }

        // Second session: state must come back from the snapshot alone.
        let cfg = Config::default()
            .with_persist(&dir)
            .without_aof()
            .with_ttl_reaper_manual();
        let reader = Store::open(cfg).unwrap();
        assert_eq!(reader.dbsize(), 10);
        let _ = std::fs::remove_dir_all(&dir);
    }
}
753}