Skip to main content

kevy_embedded/
store.rs

1//! [`Store`] — the embedded entry point. Wraps `kevy_store::Store` with
2//! a mutex (for cross-thread access), optional AOF auto-logging, an
3//! optional background TTL reaper, and an in-process pub/sub bus.
4
5use std::io;
6use std::path::PathBuf;
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::sync::{Arc, Mutex, MutexGuard, Weak};
9use std::thread::JoinHandle;
10use std::time::Duration;
11
12use kevy_persist::{Aof, Argv, RewriteStats, load_snapshot, replay_aof, save_snapshot};
13use kevy_store::{ExpireStats, StoreError};
14
15use crate::config::{Config, TtlReaperMode};
16use crate::pubsub::{PubsubBus, Subscription};
17
/// The embedded keyspace.
///
/// **`Store` is `Clone`** (since v1.1.0). A clone is a cheap `Arc` bump:
/// every clone reaches the same underlying `kevy_store::Store` + AOF +
/// reaper + pub/sub bus. The reaper thread is joined and the AOF is
/// flushed exactly once, when the **last** clone is dropped.
///
/// ```
/// use kevy_embedded::{Config, Store};
///
/// # fn main() -> std::io::Result<()> {
/// let s = Store::open(Config::default().with_ttl_reaper_manual())?;
/// let s2 = s.clone();
/// std::thread::spawn(move || {
///     s2.set(b"from-thread", b"v").unwrap();
/// }).join().unwrap();
/// assert_eq!(s.get(b"from-thread")?, Some(b"v".to_vec()));
/// # Ok(())
/// # }
/// ```
///
/// Every method takes `&self`. The internal `Arc<Mutex<Inner>>` is what
/// makes shared access safe under contention.
#[derive(Clone)]
pub struct Store {
    /// Shared mutable state (keyspace, optional AOF, pub/sub bus) behind
    /// one mutex; every method goes through `Store::lock`.
    inner: Arc<Mutex<Inner>>,
    /// Shared drop guard: signals + joins reaper and flushes AOF when the
    /// LAST `Store` clone (or `Subscription`) holding a strong ref drops.
    guard: Arc<DropGuard>,
    /// Configuration captured by `open`. The derived `Clone` gives every
    /// handle its own copy; it is never written after construction here.
    config: Config,
}
49
/// Weak handle to a `Store` — does not keep the underlying keyspace alive.
///
/// Used by the URL-keyed registry in `kevy-client` so that multiple
/// `Connection::open("mem://name")` calls share the same backing store
/// without leaking it when all strong handles go away.
pub struct WeakStore {
    /// Weak counterpart of `Store::inner`.
    inner: Weak<Mutex<Inner>>,
    /// Weak counterpart of `Store::guard`; must also upgrade for
    /// `upgrade()` to succeed.
    guard: Weak<DropGuard>,
    /// Owned copy of the config, handed to every upgraded `Store`.
    config: Config,
}
60
61impl WeakStore {
62    /// Try to upgrade back to a `Store`. Returns `None` if the last strong
63    /// reference has already been dropped.
64    pub fn upgrade(&self) -> Option<Store> {
65        Some(Store {
66            inner: self.inner.upgrade()?,
67            guard: self.guard.upgrade()?,
68            config: self.config.clone(),
69        })
70    }
71}
72
/// Everything guarded by the embedded mutex. Shared by all `Store`
/// clones, the reaper thread and the drop guard.
pub(crate) struct Inner {
    /// The underlying keyspace.
    pub(crate) store: kevy_store::Store,
    /// Append-only log; `None` when `open` did not create one (no
    /// `data_dir`, or `config.aof` disabled).
    pub(crate) aof: Option<Aof>,
    /// In-process pub/sub bus.
    pub(crate) bus: PubsubBus,
}
78
/// Owns the reaper-thread handle + a back-reference to `Inner` for the
/// final AOF flush. Lives in an `Arc<DropGuard>` shared across every
/// `Store` clone; the actual drop logic fires only when the last clone
/// goes away. `JoinHandle` is wrapped in `Mutex<Option>` so `Drop` can
/// `.take()` it while only having `&self`.
pub(crate) struct DropGuard {
    /// Stop flag shared with the reaper thread; `None` in `Manual` mode,
    /// where no thread is spawned.
    reaper_stop: Option<Arc<AtomicBool>>,
    /// Join handle of the reaper thread (if any); taken and joined in `Drop`.
    reaper_join: Mutex<Option<JoinHandle<()>>>,
    /// Strong reference to the shared state, locked in `Drop` for the
    /// final AOF sync.
    inner_for_flush: Arc<Mutex<Inner>>,
}
89
90impl Store {
91    /// Open an embedded keyspace per `config`.
92    ///
93    /// - Pure in-memory when `config.data_dir` is `None`.
94    /// - With persistence: loads `<data_dir>/<snapshot_filename>` first,
95    ///   then replays `<data_dir>/<aof_filename>`. Both are best-effort —
96    ///   missing files are fine, a truncated AOF tail is silently dropped.
97    /// - Spawns a background TTL reaper thread when
98    ///   `config.ttl_reaper == Background` (the default).
99    pub fn open(config: Config) -> io::Result<Self> {
100        let mut store = kevy_store::Store::new();
101        store.set_max_memory(config.maxmemory, config.eviction_policy);
102
103        let aof = if let Some(dir) = &config.data_dir {
104            std::fs::create_dir_all(dir)?;
105            let snap_path = dir.join(&config.snapshot_filename);
106            if snap_path.exists() {
107                load_snapshot(&mut store, &snap_path)?;
108            }
109            let aof_path = dir.join(&config.aof_filename);
110            if aof_path.exists() {
111                replay_aof(&aof_path, |args| crate::replay::apply(&mut store, &args))?;
112            }
113            if config.aof {
114                Some(Aof::open(&aof_path, config.appendfsync)?)
115            } else {
116                None
117            }
118        } else {
119            None
120        };
121
122        let inner = Arc::new(Mutex::new(Inner {
123            store,
124            aof,
125            bus: PubsubBus::new(),
126        }));
127
128        let (reaper_stop, reaper_join) = match config.ttl_reaper {
129            TtlReaperMode::Manual => (None, None),
130            TtlReaperMode::Background => {
131                let stop = Arc::new(AtomicBool::new(false));
132                let stop_t = stop.clone();
133                let inner_t = inner.clone();
134                let interval = config.reaper_interval;
135                let samples = config.reaper_samples;
136                let rounds = config.reaper_max_rounds;
137                let handle = std::thread::Builder::new()
138                    .name(String::from("kevy-embedded-reaper"))
139                    .spawn(move || reaper_loop(inner_t, stop_t, interval, samples, rounds))?;
140                (Some(stop), Some(handle))
141            }
142        };
143
144        let guard = Arc::new(DropGuard {
145            reaper_stop,
146            reaper_join: Mutex::new(reaper_join),
147            inner_for_flush: inner.clone(),
148        });
149
150        Ok(Store {
151            inner,
152            guard,
153            config,
154        })
155    }
156
157    /// Get a weak handle that does not keep the keyspace alive.
158    /// `upgrade()` returns `None` once the last strong `Store` is dropped.
159    pub fn downgrade(&self) -> WeakStore {
160        WeakStore {
161            inner: Arc::downgrade(&self.inner),
162            guard: Arc::downgrade(&self.guard),
163            config: self.config.clone(),
164        }
165    }
166
    /// The active config, borrowed from this handle. The reference is
    /// read-only, so the running store cannot be reconfigured through it.
    /// Useful for introspection / `INFO`-style telemetry.
    pub fn config(&self) -> &Config {
        &self.config
    }
172
173    // ---- escape hatches -------------------------------------------------
174
175    /// Run `f` against the underlying `kevy_store::Store` under the
176    /// embedded mutex. Use for direct access to methods this crate hasn't
177    /// wrapped (snapshot iteration, ZRANGE, raw collect_keys, …). The
178    /// closure can mutate, but *does not auto-log to the AOF* — call
179    /// [`Self::log`] yourself if the mutation must survive a crash.
180    pub fn with<F, R>(&self, f: F) -> R
181    where
182        F: FnOnce(&mut kevy_store::Store) -> R,
183    {
184        let mut g = self.lock();
185        f(&mut g.store)
186    }
187
188    /// Append a raw RESP-frame argument list to the AOF. Pairs with
189    /// [`Self::with`] when the closure performed a write you want to make
190    /// crash-safe. No-op when persistence is disabled.
191    pub fn log(&self, parts: &[&[u8]]) -> io::Result<()> {
192        let mut g = self.lock();
193        if let Some(aof) = &mut g.aof {
194            let argv = Argv::from(parts.iter().map(|p| p.to_vec()).collect::<Vec<_>>());
195            aof.append(&argv)?;
196        }
197        Ok(())
198    }
199
200    // ---- maintenance ----------------------------------------------------
201
202    /// Run one TTL-reaper tick. Required call cadence in `Manual` mode
203    /// (call ~10× per second to match Redis's `hz=10`); no-op cost is
204    /// one mutex lock + map-emptiness check when nothing has TTL.
205    pub fn tick(&self) -> ExpireStats {
206        let mut g = self.lock();
207        g.store
208            .tick_expire(self.config.reaper_samples, self.config.reaper_max_rounds)
209    }
210
211    /// `BGREWRITEAOF`: rebuild the AOF from current state. Synchronous —
212    /// blocks until the rewrite + atomic rename completes. Returns
213    /// `Ok(None)` when persistence is disabled.
214    pub fn rewrite_aof(&self) -> io::Result<Option<RewriteStats>> {
215        let mut g = self.lock();
216        // Disjoint-field split-borrow: destructure the guard so the borrow
217        // checker sees `store` and `aof` as independent borrows, not two
218        // claims on the same `&mut Inner`.
219        let Inner { store, aof, bus: _ } = &mut *g;
220        let Some(aof) = aof else { return Ok(None) };
221        Ok(Some(aof.rewrite_from(store)?))
222    }
223
224    /// Snapshot the store to `<data_dir>/<snapshot_filename>`, atomically.
225    /// `Ok(false)` when persistence is disabled (caller can decide to
226    /// surface that or no-op).
227    pub fn save_snapshot(&self) -> io::Result<bool> {
228        let g = self.lock();
229        let Some(dir) = self.config.data_dir.as_ref() else {
230            return Ok(false);
231        };
232        let path: PathBuf = dir.join(&self.config.snapshot_filename);
233        save_snapshot(&g.store, &path)?;
234        Ok(true)
235    }
236
237    // ---- string ops -----------------------------------------------------
238
239    /// `SET key value` (no TTL, no NX/XX). Returns `true` always under
240    /// the embedded API (Redis semantics: SET overwrites; NX/XX vetoes
241    /// would return `false` but we don't expose those here — use
242    /// [`Self::with`] for the full surface).
243    pub fn set(&self, key: &[u8], value: &[u8]) -> io::Result<bool> {
244        let mut g = self.lock();
245        let ok = g.store.set(key, value.to_vec(), None, false, false);
246        commit_write(&mut g, &[b"SET", key, value])?;
247        Ok(ok)
248    }
249
250    /// `SET key value PX ms` — overwrites + sets TTL.
251    pub fn set_with_ttl(&self, key: &[u8], value: &[u8], ttl: Duration) -> io::Result<bool> {
252        let mut g = self.lock();
253        let ok = g.store.set(key, value.to_vec(), Some(ttl), false, false);
254        let ms = ttl.as_millis().min(u64::MAX as u128) as u64;
255        commit_write(&mut g, &[b"SET", key, value])?;
256        commit_write(&mut g, &[b"PEXPIRE", key, ms.to_string().as_bytes()])?;
257        Ok(ok)
258    }
259
260    /// `GET key` — `Some(bytes)` on hit, `None` on miss or expired.
261    pub fn get(&self, key: &[u8]) -> io::Result<Option<Vec<u8>>> {
262        let mut g = self.lock();
263        Ok(g.store.get(key).map_err(store_err)?.map(|v| v.to_vec()))
264    }
265
266    /// `DEL key1 [key2 ...]`. Returns the count of keys actually removed.
267    pub fn del(&self, keys: &[&[u8]]) -> io::Result<usize> {
268        let mut g = self.lock();
269        let owned: Vec<Vec<u8>> = keys.iter().map(|k| k.to_vec()).collect();
270        let n = g.store.del(&owned);
271        if n > 0 {
272            let mut parts: Vec<&[u8]> = Vec::with_capacity(keys.len() + 1);
273            parts.push(b"DEL");
274            for k in keys {
275                parts.push(k);
276            }
277            commit_write(&mut g, &parts)?;
278        }
279        Ok(n)
280    }
281
282    /// `EXISTS key1 [key2 ...]`. Returns the count of existing keys
283    /// (duplicates counted multiple times, matching Redis).
284    pub fn exists(&self, keys: &[&[u8]]) -> io::Result<usize> {
285        let mut g = self.lock();
286        let owned: Vec<Vec<u8>> = keys.iter().map(|k| k.to_vec()).collect();
287        Ok(g.store.exists(&owned))
288    }
289
    /// `INCR key`. Shorthand for [`Self::incr_by`] with a delta of `1`;
    /// returns the post-increment value.
    pub fn incr(&self, key: &[u8]) -> io::Result<i64> {
        self.incr_by(key, 1)
    }
294
295    /// `INCRBY key delta`. Negative `delta` does DECR-style work.
296    pub fn incr_by(&self, key: &[u8], delta: i64) -> io::Result<i64> {
297        let mut g = self.lock();
298        let n = g.store.incr_by(key, delta).map_err(store_err)?;
299        commit_write(&mut g, &[b"INCRBY", key, delta.to_string().as_bytes()])?;
300        Ok(n)
301    }
302
303    /// `EXPIRE key seconds`. Returns `true` if a key was touched.
304    pub fn expire(&self, key: &[u8], ttl: Duration) -> io::Result<bool> {
305        let mut g = self.lock();
306        let touched = g.store.expire(key, ttl);
307        if touched {
308            let ms = ttl.as_millis().min(u64::MAX as u128) as u64;
309            commit_write(&mut g, &[b"PEXPIRE", key, ms.to_string().as_bytes()])?;
310        }
311        Ok(touched)
312    }
313
314    /// `PERSIST key`. Returns `true` if a TTL was actually cleared.
315    pub fn persist(&self, key: &[u8]) -> io::Result<bool> {
316        let mut g = self.lock();
317        let touched = g.store.persist(key);
318        if touched {
319            commit_write(&mut g, &[b"PERSIST", key])?;
320        }
321        Ok(touched)
322    }
323
324    /// Remaining TTL in ms (or Redis-style `-1`/`-2` for no-TTL/no-key).
325    pub fn ttl_ms(&self, key: &[u8]) -> i64 {
326        self.lock().store.pttl(key)
327    }
328
329    /// `TYPE key` — `"string"`, `"hash"`, `"list"`, `"set"`, `"zset"`, or `"none"`.
330    pub fn type_of(&self, key: &[u8]) -> &'static str {
331        self.lock().store.type_of(key)
332    }
333
334    /// `DBSIZE` — total live keys.
335    pub fn dbsize(&self) -> usize {
336        self.lock().store.dbsize()
337    }
338
339    /// `FLUSHALL` — empty the keyspace (logged so a replay reaches the
340    /// same empty state).
341    pub fn flush(&self) -> io::Result<()> {
342        let mut g = self.lock();
343        g.store.flush();
344        commit_write(&mut g, &[b"FLUSHALL"])?;
345        Ok(())
346    }
347
348    /// `MEMORY USAGE` for one key — `Some(bytes)` or `None` if absent.
349    pub fn key_bytes(&self, key: &[u8]) -> Option<u64> {
350        self.lock().store.estimate_key_bytes(key)
351    }
352
353    /// Live `used_memory` estimate (matches `INFO memory`'s field).
354    pub fn used_memory(&self) -> u64 {
355        self.lock().store.used_memory()
356    }
357
358    /// `INFO`-style counter: total keys evicted by `maxmemory` so far.
359    pub fn evictions_total(&self) -> u64 {
360        self.lock().store.evictions_total()
361    }
362
363    /// `INFO`-style counter: total keys expired (lazy + active reaper).
364    pub fn expired_keys_total(&self) -> u64 {
365        self.lock().store.expired_keys_total()
366    }
367
368    // ---- hash ops -------------------------------------------------------
369
370    pub fn hset(&self, key: &[u8], pairs: &[(&[u8], &[u8])]) -> io::Result<usize> {
371        let mut g = self.lock();
372        let owned: Vec<(Vec<u8>, Vec<u8>)> =
373            pairs.iter().map(|(f, v)| (f.to_vec(), v.to_vec())).collect();
374        let added = g.store.hset(key, &owned).map_err(store_err)?;
375        let mut parts: Vec<&[u8]> = Vec::with_capacity(2 + pairs.len() * 2);
376        parts.push(b"HSET");
377        parts.push(key);
378        for (f, v) in pairs {
379            parts.push(f);
380            parts.push(v);
381        }
382        commit_write(&mut g, &parts)?;
383        Ok(added)
384    }
385
386    pub fn hget(&self, key: &[u8], field: &[u8]) -> io::Result<Option<Vec<u8>>> {
387        let mut g = self.lock();
388        Ok(g.store.hget(key, field).map_err(store_err)?.map(|v| v.to_vec()))
389    }
390
391    pub fn hdel(&self, key: &[u8], fields: &[&[u8]]) -> io::Result<usize> {
392        let mut g = self.lock();
393        let owned: Vec<Vec<u8>> = fields.iter().map(|f| f.to_vec()).collect();
394        let removed = g.store.hdel(key, &owned).map_err(store_err)?;
395        if removed > 0 {
396            let mut parts: Vec<&[u8]> = Vec::with_capacity(2 + fields.len());
397            parts.push(b"HDEL");
398            parts.push(key);
399            for f in fields {
400                parts.push(f);
401            }
402            commit_write(&mut g, &parts)?;
403        }
404        Ok(removed)
405    }
406
407    // ---- list ops -------------------------------------------------------
408
    /// `LPUSH key value [value ...]`. Runs the store's `lpush` through
    /// `push_helper`, which also logs the command. Returns the count
    /// reported by the store (presumably the resulting list length —
    /// confirm against `kevy_store`).
    pub fn lpush(&self, key: &[u8], values: &[&[u8]]) -> io::Result<usize> {
        push_helper(self, key, values, b"LPUSH", |s, k, vs| s.lpush(k, vs))
    }
412
    /// `RPUSH key value [value ...]`. Runs the store's `rpush` through
    /// `push_helper`, which also logs the command. Returns the count
    /// reported by the store (presumably the resulting list length —
    /// confirm against `kevy_store`).
    pub fn rpush(&self, key: &[u8], values: &[&[u8]]) -> io::Result<usize> {
        push_helper(self, key, values, b"RPUSH", |s, k, vs| s.rpush(k, vs))
    }
416
    /// `LPOP key count`. Pops via the store's `lpop` (through
    /// `pop_helper`) and returns the popped values; an empty result is
    /// not logged.
    pub fn lpop(&self, key: &[u8], count: usize) -> io::Result<Vec<Vec<u8>>> {
        pop_helper(self, key, count, false)
    }
420
    /// `RPOP key count`. Pops via the store's `rpop` (through
    /// `pop_helper`) and returns the popped values; an empty result is
    /// not logged.
    pub fn rpop(&self, key: &[u8], count: usize) -> io::Result<Vec<Vec<u8>>> {
        pop_helper(self, key, count, true)
    }
424
425    pub fn llen(&self, key: &[u8]) -> io::Result<usize> {
426        self.lock().store.llen(key).map_err(store_err)
427    }
428
429    // ---- set ops --------------------------------------------------------
430
    /// `SADD key member [member ...]`. Runs the store's `sadd` through
    /// `push_helper`, which also logs the command. Returns the count
    /// reported by the store (presumably the number of newly added
    /// members — confirm against `kevy_store`).
    pub fn sadd(&self, key: &[u8], members: &[&[u8]]) -> io::Result<usize> {
        push_helper(self, key, members, b"SADD", |s, k, ms| s.sadd(k, ms))
    }
434
435    pub fn srem(&self, key: &[u8], members: &[&[u8]]) -> io::Result<usize> {
436        let mut g = self.lock();
437        let owned: Vec<Vec<u8>> = members.iter().map(|m| m.to_vec()).collect();
438        let removed = g.store.srem(key, &owned).map_err(store_err)?;
439        if removed > 0 {
440            let mut parts: Vec<&[u8]> = Vec::with_capacity(2 + members.len());
441            parts.push(b"SREM");
442            parts.push(key);
443            for m in members {
444                parts.push(m);
445            }
446            commit_write(&mut g, &parts)?;
447        }
448        Ok(removed)
449    }
450
451    pub fn smembers(&self, key: &[u8]) -> io::Result<Vec<Vec<u8>>> {
452        self.lock().store.smembers(key).map_err(store_err)
453    }
454
455    pub fn scard(&self, key: &[u8]) -> io::Result<usize> {
456        self.lock().store.scard(key).map_err(store_err)
457    }
458
459    // ---- zset ops -------------------------------------------------------
460
461    pub fn zadd(&self, key: &[u8], pairs: &[(f64, &[u8])]) -> io::Result<usize> {
462        let mut g = self.lock();
463        let owned: Vec<(f64, Vec<u8>)> =
464            pairs.iter().map(|(s, m)| (*s, m.to_vec())).collect();
465        let added = g.store.zadd(key, &owned).map_err(store_err)?;
466        // Log as `ZADD key score1 member1 score2 member2 ...`
467        let mut score_strs: Vec<Vec<u8>> = Vec::with_capacity(pairs.len());
468        for (s, _) in pairs {
469            score_strs.push(format!("{s}").into_bytes());
470        }
471        let mut parts: Vec<&[u8]> = Vec::with_capacity(2 + pairs.len() * 2);
472        parts.push(b"ZADD");
473        parts.push(key);
474        for (i, (_, m)) in pairs.iter().enumerate() {
475            parts.push(&score_strs[i]);
476            parts.push(m);
477        }
478        commit_write(&mut g, &parts)?;
479        Ok(added)
480    }
481
482    pub fn zrem(&self, key: &[u8], members: &[&[u8]]) -> io::Result<usize> {
483        let mut g = self.lock();
484        let owned: Vec<Vec<u8>> = members.iter().map(|m| m.to_vec()).collect();
485        let removed = g.store.zrem(key, &owned).map_err(store_err)?;
486        if removed > 0 {
487            let mut parts: Vec<&[u8]> = Vec::with_capacity(2 + members.len());
488            parts.push(b"ZREM");
489            parts.push(key);
490            for m in members {
491                parts.push(m);
492            }
493            commit_write(&mut g, &parts)?;
494        }
495        Ok(removed)
496    }
497
498    pub fn zscore(&self, key: &[u8], member: &[u8]) -> io::Result<Option<f64>> {
499        self.lock().store.zscore(key, member).map_err(store_err)
500    }
501
502    pub fn zcard(&self, key: &[u8]) -> io::Result<usize> {
503        self.lock().store.zcard(key).map_err(store_err)
504    }
505
506    // ---- pub/sub --------------------------------------------------------
507
508    /// `PUBLISH channel payload`. Delivers `payload` to every subscriber on
509    /// `channel` (direct + pattern matches) running inside this same
510    /// process. Returns the count of receivers that the message reached.
511    pub fn publish(&self, channel: &[u8], payload: &[u8]) -> usize {
512        // Clone out the matching senders under the lock, then release the
513        // lock before send() so a slow receiver can't stall every other
514        // shard of the bus.
515        let plans = {
516            let g = self.lock();
517            g.bus.collect_delivery(channel, payload)
518        };
519        let mut count = 0;
520        for (frame, sender) in plans {
521            if sender.send(frame).is_ok() {
522                count += 1;
523            }
524        }
525        count
526    }
527
528    /// Open a [`Subscription`] subscribed to `channels`. The subscription
529    /// owns its receive end; drop it to unsubscribe from everything.
530    /// Pass `&[]` to start with no subscriptions and add some later via
531    /// [`Subscription::subscribe`] / [`Subscription::psubscribe`].
532    pub fn subscribe(&self, channels: &[&[u8]]) -> Subscription {
533        let mut sub = Subscription::new(self.inner.clone(), self.guard.clone());
534        if !channels.is_empty() {
535            sub.subscribe(channels);
536        }
537        sub
538    }
539
540    /// Convenience: open a [`Subscription`] starting on pattern subscriptions.
541    pub fn psubscribe(&self, patterns: &[&[u8]]) -> Subscription {
542        let mut sub = Subscription::new(self.inner.clone(), self.guard.clone());
543        if !patterns.is_empty() {
544            sub.psubscribe(patterns);
545        }
546        sub
547    }
548
549    // ---- internal -------------------------------------------------------
550
551    fn lock(&self) -> MutexGuard<'_, Inner> {
552        // Mutex poisoning is recoverable: every method holds the lock only
553        // for its own short critical section, so a panic in one method
554        // doesn't leave the store in a corrupt state. Recover.
555        match self.inner.lock() {
556            Ok(g) => g,
557            Err(poison) => poison.into_inner(),
558        }
559    }
560}
561
562fn push_helper<F>(
563    s: &Store,
564    key: &[u8],
565    values: &[&[u8]],
566    verb: &'static [u8],
567    op: F,
568) -> io::Result<usize>
569where
570    F: FnOnce(&mut kevy_store::Store, &[u8], &[Vec<u8>]) -> Result<usize, StoreError>,
571{
572    let mut g = s.lock();
573    let owned: Vec<Vec<u8>> = values.iter().map(|v| v.to_vec()).collect();
574    let n = op(&mut g.store, key, &owned).map_err(store_err)?;
575    let mut parts: Vec<&[u8]> = Vec::with_capacity(2 + values.len());
576    parts.push(verb);
577    parts.push(key);
578    for v in values {
579        parts.push(v);
580    }
581    commit_write(&mut g, &parts)?;
582    Ok(n)
583}
584
585fn pop_helper(s: &Store, key: &[u8], count: usize, from_tail: bool) -> io::Result<Vec<Vec<u8>>> {
586    let mut g = s.lock();
587    let popped = if from_tail {
588        g.store.rpop(key, count).map_err(store_err)?
589    } else {
590        g.store.lpop(key, count).map_err(store_err)?
591    };
592    if !popped.is_empty() {
593        let verb: &[u8] = if from_tail { b"RPOP" } else { b"LPOP" };
594        let count_str = popped.len().to_string();
595        let parts: [&[u8]; 3] = [verb, key, count_str.as_bytes()];
596        commit_write(&mut g, &parts)?;
597    }
598    Ok(popped)
599}
600
601fn log_argv(aof: &mut Option<Aof>, parts: &[&[u8]]) -> io::Result<()> {
602    if let Some(aof) = aof {
603        let argv = Argv::from(parts.iter().map(|p| p.to_vec()).collect::<Vec<_>>());
604        aof.append(&argv)?;
605    }
606    Ok(())
607}
608
609/// Complete a write: AOF-log the canonical RESP command, then run the
610/// store's post-write eviction sweep. Single helper so every write wrapper
611/// stays in lockstep — forgetting to evict means a maxmemory budget would
612/// grow without bound.
613fn commit_write(inner: &mut Inner, parts: &[&[u8]]) -> io::Result<()> {
614    log_argv(&mut inner.aof, parts)?;
615    inner.store.try_evict_after_write();
616    Ok(())
617}
618
619fn store_err(e: StoreError) -> io::Error {
620    io::Error::new(io::ErrorKind::InvalidInput, format!("kevy-store: {e:?}"))
621}
622
623fn reaper_loop(
624    inner: Arc<Mutex<Inner>>,
625    stop: Arc<AtomicBool>,
626    interval: Duration,
627    samples: usize,
628    rounds: u32,
629) {
630    while !stop.load(Ordering::Relaxed) {
631        std::thread::sleep(interval);
632        if stop.load(Ordering::Relaxed) {
633            break;
634        }
635        let mut g = match inner.lock() {
636            Ok(g) => g,
637            Err(poison) => poison.into_inner(),
638        };
639        let _ = g.store.tick_expire(samples, rounds);
640        // EverySec AOF fsync window check — embedded mode runs this from
641        // the same reaper tick rather than a separate timer.
642        if let Some(aof) = &mut g.aof {
643            let _ = aof.maybe_sync();
644        }
645    }
646}
647
648impl Drop for DropGuard {
649    fn drop(&mut self) {
650        // Last `Store` clone is going away — stop the reaper, join it, then
651        // flush the AOF so EverySec users don't lose the last sub-second of
652        // writes. Poison recovery: a method panic earlier shouldn't strand
653        // the AOF unflushed; the writes already landed in-memory.
654        if let Some(stop) = &self.reaper_stop {
655            stop.store(true, Ordering::Relaxed);
656        }
657        if let Some(j) = self
658            .reaper_join
659            .lock()
660            .unwrap_or_else(|p| p.into_inner())
661            .take()
662        {
663            let _ = j.join();
664        }
665        let mut g = match self.inner_for_flush.lock() {
666            Ok(g) => g,
667            Err(poison) => poison.into_inner(),
668        };
669        if let Some(aof) = &mut g.aof {
670            let _ = aof.maybe_sync();
671        }
672    }
673}
674
// Unit tests for the embedded `Store`: in-memory round trips, persistence
// via AOF and snapshot, eviction, manual and background TTL reaping,
// cross-thread sharing, and clean shutdown.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{AppendFsync, EvictionPolicy};

    /// Build a per-test path under the system temp dir; the nanosecond
    /// timestamp keeps repeated runs from reusing the same directory.
    /// The directory itself is not created here.
    fn tmp_dir(name: &str) -> PathBuf {
        let mut p = std::env::temp_dir();
        let uniq = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_nanos();
        p.push(format!("kevy-embedded-{name}-{uniq}"));
        p
    }

    /// SET / GET / DEL on a purely in-memory store.
    #[test]
    fn in_memory_roundtrip() {
        let s = Store::open(Config::default().with_ttl_reaper_manual()).unwrap();
        s.set(b"k", b"v").unwrap();
        assert_eq!(s.get(b"k").unwrap(), Some(b"v".to_vec()));
        assert_eq!(s.dbsize(), 1);
        s.del(&[b"k"]).unwrap();
        assert_eq!(s.dbsize(), 0);
    }

    /// Writes logged to the AOF are visible after dropping and reopening
    /// the store on the same directory.
    #[test]
    fn persistence_round_trip_via_aof() {
        let dir = tmp_dir("aof-rt");
        {
            let s = Store::open(
                Config::default()
                    .with_persist(&dir)
                    .with_ttl_reaper_manual()
                    .with_appendfsync(AppendFsync::Always),
            )
            .unwrap();
            for i in 0..50 {
                s.set(format!("k{i}").as_bytes(), b"v").unwrap();
            }
            s.incr_by(b"counter", 41).unwrap();
            s.hset(b"h", &[(b"field" as &[u8], b"val" as &[u8])]).unwrap();
        }
        // Reopen: AOF replay should reconstruct exactly the same state.
        let s2 = Store::open(
            Config::default()
                .with_persist(&dir)
                .with_ttl_reaper_manual(),
        )
        .unwrap();
        assert_eq!(s2.dbsize(), 52); // 50 + counter + h
        assert_eq!(s2.get(b"k0").unwrap(), Some(b"v".to_vec()));
        assert_eq!(s2.get(b"k49").unwrap(), Some(b"v".to_vec()));
        assert_eq!(s2.get(b"counter").unwrap(), Some(b"41".to_vec()));
        assert_eq!(s2.hget(b"h", b"field").unwrap(), Some(b"val".to_vec()));
        // Drop the store before removing its directory.
        drop(s2);
        let _ = std::fs::remove_dir_all(&dir);
    }

    /// With a small `maxmemory` budget, writes trigger eviction and the
    /// reported memory use stays within the budget.
    #[test]
    fn eviction_works_under_pressure() {
        let s = Store::open(
            Config::default()
                .with_ttl_reaper_manual()
                .with_max_memory(800)
                .with_eviction(EvictionPolicy::AllKeysLru),
        )
        .unwrap();
        for i in 0..50 {
            s.set(format!("k{i:02}").as_bytes(), b"xxxxxxxxxxxxxxxxxxxx")
                .unwrap();
        }
        assert!(s.used_memory() <= 800, "got {}", s.used_memory());
        assert!(s.evictions_total() > 0);
    }

    /// In `Manual` mode an expired key is reaped by `tick()` and/or the
    /// following read, while a key without TTL survives.
    #[test]
    fn manual_tick_runs_active_reaper() {
        let s = Store::open(Config::default().with_ttl_reaper_manual()).unwrap();
        s.set_with_ttl(b"short", b"v", Duration::from_millis(1)).unwrap();
        s.set(b"perm", b"v").unwrap();
        std::thread::sleep(Duration::from_millis(20));
        let stats = s.tick();
        // tick() should at least sample and reap (may take multiple ticks
        // for sparse layouts; the call is idempotent).
        let _ = stats;
        let _ = s.get(b"short").unwrap(); // lazy reap path
        assert!(s.expired_keys_total() >= 1);
        assert!(s.get(b"perm").unwrap().is_some());
    }

    /// `with` gives direct access to store methods that are not wrapped.
    #[test]
    fn with_escape_hatch_works() {
        let s = Store::open(Config::default().with_ttl_reaper_manual()).unwrap();
        let zsize = s.with(|store| {
            let _ = store.zadd(b"z", &[(1.0, b"a".to_vec()), (2.0, b"b".to_vec())]);
            store.zcard(b"z").unwrap()
        });
        assert_eq!(zsize, 2);
        // Direct (un-logged) write through `with`: caller may explicitly
        // log if they want it crash-safe. Here we just verify it landed.
        assert_eq!(s.type_of(b"z"), "zset");
    }

    /// With the default (background) reaper, an expired key is gone after
    /// a few reaper intervals.
    #[test]
    fn background_reaper_thread_drops_expired_keys() {
        let s = Store::open(
            Config::default().with_reaper_interval(Duration::from_millis(20)),
        )
        .unwrap();
        s.set_with_ttl(b"k", b"v", Duration::from_millis(5)).unwrap();
        std::thread::sleep(Duration::from_millis(120));
        // The active reaper should have caught it without anyone reading.
        let _ = s.get(b"k").unwrap(); // either way, key should now be gone
        assert_eq!(s.dbsize(), 0);
    }

    /// Eight threads writing disjoint keys through a shared handle all
    /// land their writes.
    #[test]
    fn arc_sharing_across_threads() {
        use std::sync::Arc;
        let s = Arc::new(Store::open(Config::default().with_ttl_reaper_manual()).unwrap());
        let mut handles = Vec::new();
        for i in 0..8 {
            let s = Arc::clone(&s);
            handles.push(std::thread::spawn(move || {
                for j in 0..50 {
                    s.set(format!("t{i}-{j}").as_bytes(), b"v").unwrap();
                }
            }));
        }
        for h in handles {
            h.join().unwrap();
        }
        assert_eq!(s.dbsize(), 8 * 50);
    }

    #[test]
    fn drop_during_reaper_does_not_deadlock() {
        // Sanity: a Store with a Background reaper must drop cleanly even
        // while the reaper is sleeping. Without the stop-flag + join the
        // drop would either hang or race the reaper holding the mutex.
        for _ in 0..4 {
            let s = Store::open(
                Config::default().with_reaper_interval(Duration::from_millis(5)),
            )
            .unwrap();
            s.set(b"k", b"v").unwrap();
            // Let the reaper actually run a couple of times.
            std::thread::sleep(Duration::from_millis(40));
            drop(s); // must return within a few ms
        }
    }

    /// With the AOF disabled, state saved by `save_snapshot` is restored
    /// when the store is reopened on the same directory.
    #[test]
    fn save_snapshot_then_restart() {
        let dir = tmp_dir("snap-rt");
        {
            let s = Store::open(
                Config::default()
                    .with_persist(&dir)
                    .without_aof()
                    .with_ttl_reaper_manual(),
            )
            .unwrap();
            for i in 0..10 {
                s.set(format!("k{i}").as_bytes(), b"v").unwrap();
            }
            let saved = s.save_snapshot().unwrap();
            assert!(saved);
        }
        let s2 = Store::open(
            Config::default()
                .with_persist(&dir)
                .without_aof()
                .with_ttl_reaper_manual(),
        )
        .unwrap();
        assert_eq!(s2.dbsize(), 10);
        let _ = std::fs::remove_dir_all(&dir);
    }
}
854}