Skip to main content

kevy_embedded/
store.rs

1//! [`Store`] — the embedded entry point. Wraps `kevy_store::Store` with
2//! per-shard locks (for cross-thread access), optional AOF auto-logging, an
3//! optional background TTL reaper, and an in-process pub/sub bus.
4
5use std::io;
6use std::sync::atomic::{AtomicBool, Ordering};
7use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard, Weak};
8use std::thread::JoinHandle;
9use std::time::Instant;
10
11use crate::metric::KevyMetric;
12
13use kevy_persist::{Aof, Argv, RewriteStats};
14use kevy_store::{ExpireStats, StoreError};
15
16use crate::config::Config;
17use crate::pubsub::PubsubBus;
18use crate::shard::{build_shards, shard_idx};
19
20/// The keyspace shards (`hash(key) % n`), each a fully independent
21/// `kevy_store::Store` + AOF behind its own lock. `n == 1` (the default) is a
22/// one-element vec = the original single-lock store.
23pub(crate) type Shards = Arc<Vec<Arc<RwLock<Inner>>>>;
24
25/// The embedded keyspace.
26///
27/// **`Store` is `Clone`** (since v1.1.0). A clone is a cheap `Arc` bump:
28/// every clone reaches the same underlying shards + AOF + reaper + pub/sub
29/// bus. The reaper thread is joined and each shard's AOF is flushed exactly
30/// once, when the **last** clone is dropped.
31///
32/// ```
33/// use kevy_embedded::{Config, Store};
34///
35/// # fn main() -> std::io::Result<()> {
36/// let s = Store::open(Config::default().with_ttl_reaper_manual())?;
37/// let s2 = s.clone();
38/// std::thread::spawn(move || {
39///     s2.set(b"from-thread", b"v").unwrap();
40/// }).join().unwrap();
41/// assert_eq!(s.get(b"from-thread")?, Some(b"v".to_vec()));
42/// # Ok(())
43/// # }
44/// ```
45///
46/// Every method takes `&self`. Sharding (see [`Config::with_shards`]) lets a
47/// multi-threaded consumer scale across cores; pub/sub is process-wide
48/// (handled on shard 0).
49#[derive(Clone)]
50pub struct Store {
51    shards: Shards,
52    /// Shared drop guard: signals + joins reaper and flushes AOFs when the
53    /// LAST `Store` clone (or `Subscription`) holding a strong ref drops.
54    guard: Arc<DropGuard>,
55    config: Config,
56}
57
58/// Weak handle to a `Store` — does not keep the underlying keyspace alive.
59///
60/// Used by the URL-keyed registry in `kevy-client` so that multiple
61/// `Connection::open("mem://name")` calls share the same backing store
62/// without leaking it when all strong handles go away.
63pub struct WeakStore {
64    shards: Weak<Vec<Arc<RwLock<Inner>>>>,
65    guard: Weak<DropGuard>,
66    config: Config,
67}
68
69impl WeakStore {
70    /// Try to upgrade back to a `Store`. Returns `None` if the last strong
71    /// reference has already been dropped.
72    pub fn upgrade(&self) -> Option<Store> {
73        Some(Store {
74            shards: self.shards.upgrade()?,
75            guard: self.guard.upgrade()?,
76            config: self.config.clone(),
77        })
78    }
79}
80
81pub(crate) struct Inner {
82    pub(crate) store: kevy_store::Store,
83    pub(crate) aof: Option<Aof>,
84    /// Pub/sub bus. Only shard 0's is ever used (pub/sub is process-wide);
85    /// other shards carry an idle one (cheap).
86    pub(crate) bus: PubsubBus,
87}
88
89impl Inner {
90    pub(crate) fn new(store: kevy_store::Store, aof: Option<Aof>) -> Self {
91        Inner { store, aof, bus: PubsubBus::new() }
92    }
93}
94
95/// Owns the reaper-thread handle + the shards for the final AOF flush. Lives
96/// in an `Arc<DropGuard>` shared across every `Store` clone; the drop logic
97/// fires only when the last clone goes away.
98pub(crate) struct DropGuard {
99    reaper_stop: Option<Arc<AtomicBool>>,
100    reaper_join: Mutex<Option<JoinHandle<()>>>,
101    shards_for_flush: Shards,
102}
103
104impl Store {
105    /// Open an embedded keyspace per `config`.
106    ///
107    /// - Pure in-memory when `config.data_dir` is `None`.
108    /// - With persistence: each shard loads its snapshot then replays its AOF
109    ///   (`config.shards > 1` re-shards a legacy single AOF on first open).
110    /// - Spawns a background TTL reaper thread when
111    ///   `config.ttl_reaper == Background` (the default).
112    pub fn open(config: Config) -> io::Result<Self> {
113        let shards: Shards = Arc::new(build_shards(&config)?);
114        let (reaper_stop, reaper_join) = crate::reaper::spawn_reaper(&config, &shards)?;
115        let guard = Arc::new(DropGuard {
116            reaper_stop,
117            reaper_join: Mutex::new(reaper_join),
118            shards_for_flush: shards.clone(),
119        });
120        Ok(Store { shards, guard, config })
121    }
122
123    /// Get a weak handle that does not keep the keyspace alive.
124    pub fn downgrade(&self) -> WeakStore {
125        WeakStore {
126            shards: Arc::downgrade(&self.shards),
127            guard: Arc::downgrade(&self.guard),
128            config: self.config.clone(),
129        }
130    }
131
132    /// The active config (a clone — modifying it has no effect on the
133    /// running store). Useful for introspection / `INFO`-style telemetry.
134    pub fn config(&self) -> &Config {
135        &self.config
136    }
137
138    // ---- escape hatches -------------------------------------------------
139
140    /// Run `f` against the underlying `kevy_store::Store` under its lock. Use
141    /// for direct access to methods this crate hasn't wrapped. The closure can
142    /// mutate, but *does not auto-log to the AOF* — call [`Self::log`] yourself
143    /// if the mutation must survive a crash.
144    ///
145    /// **Sharded stores:** this targets shard 0 only. Use [`Self::with_key`]
146    /// to reach the shard owning a specific key.
147    pub fn with<F, R>(&self, f: F) -> R
148    where
149        F: FnOnce(&mut kevy_store::Store) -> R,
150    {
151        let mut g = self.lock();
152        f(&mut g.store)
153    }
154
155    /// Like [`Self::with`] but targets the shard that owns `key`.
156    pub fn with_key<F, R>(&self, key: &[u8], f: F) -> R
157    where
158        F: FnOnce(&mut kevy_store::Store) -> R,
159    {
160        let mut g = self.wshard(key);
161        f(&mut g.store)
162    }
163
164    /// `KEYS` / `SCAN`-glob across **every shard** — the cross-shard
165    /// replacement for `with(|s| s.collect_keys(pat, lim))`, which only sees
166    /// shard 0 once sharding is on. Behaves identically to `with(...)` when
167    /// `shard_count() == 1`. `limit` bounds the *total* returned across shards.
168    /// Takes a read lock per shard (concurrent-safe).
169    pub fn collect_keys(&self, pattern: Option<&[u8]>, limit: Option<usize>) -> Vec<Vec<u8>> {
170        let mut out = Vec::new();
171        for shard in self.shards.iter() {
172            if limit.is_some_and(|l| out.len() >= l) {
173                break;
174            }
175            let remaining = limit.map(|l| l - out.len());
176            out.extend(lock_read(shard).store.collect_keys(pattern, remaining));
177        }
178        out
179    }
180
181    /// Run `f` against **each shard's** underlying `kevy_store::Store` (in
182    /// shard-index order) — the cross-shard escape hatch. The caller assembles
183    /// the merged result. Pairs with [`Self::shard_count`]. For a single key,
184    /// prefer [`Self::with_key`]; for a glob scan, prefer [`Self::collect_keys`].
185    pub fn for_each_shard<F: FnMut(&mut kevy_store::Store)>(&self, mut f: F) {
186        for shard in self.shards.iter() {
187            f(&mut lock_write(shard).store);
188        }
189    }
190
191    /// Number of keyspace shards (`== Config::shards`).
192    #[inline]
193    pub fn shard_count(&self) -> usize {
194        self.shards.len()
195    }
196
197    /// Append a raw RESP-frame argument list to the shard owning its key's
198    /// AOF. No-op when persistence is disabled.
199    pub fn log(&self, parts: &[&[u8]]) -> io::Result<()> {
200        let mut g = match parts.get(1) {
201            Some(key) => self.wshard(key),
202            None => self.lock(),
203        };
204        if let Some(aof) = &mut g.aof {
205            let argv = Argv::from(parts.iter().map(|p| p.to_vec()).collect::<Vec<_>>());
206            aof.append(&argv)?;
207        }
208        Ok(())
209    }
210
211    // ---- maintenance ----------------------------------------------------
212
213    /// Run one TTL-reaper tick across every shard. Required call cadence in
214    /// `Manual` mode (~10×/s to match Redis `hz=10`). Returns the summed stats.
215    pub fn tick(&self) -> ExpireStats {
216        let mut total = ExpireStats::default();
217        for shard in self.shards.iter() {
218            let stats = {
219                let mut g = lock_write(shard);
220                g.store.tick_expire(self.config.reaper_samples, self.config.reaper_max_rounds)
221            };
222            total.sampled += stats.sampled;
223            total.expired += stats.expired;
224            // Auto-rewrite rides the caller-driven tick in Manual mode; the
225            // non-blocking path releases the lock for the disk spill.
226            crate::reaper::concurrent_auto_rewrite(
227                shard,
228                self.config.auto_aof_rewrite_pct,
229                self.config.auto_aof_rewrite_min_size,
230                self.config.metric_sink.as_ref(),
231            );
232        }
233        total
234    }
235
236    /// `BGREWRITEAOF`: rebuild every shard's AOF from current state.
237    /// Synchronous. Returns the summed stats (`None` if persistence is off /
238    /// no shard rewrote).
239    pub fn rewrite_aof(&self) -> io::Result<Option<RewriteStats>> {
240        let mut agg: Option<RewriteStats> = None;
241        for shard in self.shards.iter() {
242            let start = Instant::now();
243            // Phase 1 (locked): freeze the COW view + start the tee —
244            // O(n)-shallow, no serialization under the lock.
245            let (view, tmp, before_bytes) = {
246                let mut g = lock_write(shard);
247                let Inner { store, aof, bus: _ } = &mut *g;
248                let Some(aof) = aof else { continue };
249                if aof.is_rewriting() {
250                    continue;
251                }
252                let before = aof.size_bytes();
253                let view = store.collect_snapshot();
254                (view, aof.begin_view_rewrite()?, before)
255            };
256            // Phase 2 (unlocked): serialize + fsync the compacted log.
257            let keys = match kevy_persist::dump_aof(&tmp, &view) {
258                Ok((keys, _)) => keys,
259                Err(e) => {
260                    let mut g = lock_write(shard);
261                    if let Some(aof) = &mut g.aof {
262                        aof.abort_concurrent_rewrite();
263                    }
264                    let _ = std::fs::remove_file(&tmp);
265                    return Err(e);
266                }
267            };
268            // Phase 3 (locked): append the tee'd diff and swap.
269            let mut g = lock_write(shard);
270            let Some(aof) = &mut g.aof else { continue };
271            let stats = match aof.finish_concurrent_rewrite(&tmp, keys) {
272                Ok(s) => s,
273                Err(e) => {
274                    aof.abort_concurrent_rewrite();
275                    let _ = std::fs::remove_file(&tmp);
276                    return Err(e);
277                }
278            };
279            if let Some(sink) = &self.config.metric_sink {
280                sink.emit(KevyMetric::Rewrite {
281                    keys: stats.keys,
282                    before_bytes,
283                    after_bytes: stats.bytes,
284                    elapsed_ms: start.elapsed().as_millis() as u64,
285                });
286            }
287            let acc = agg.get_or_insert(RewriteStats { keys: 0, bytes: 0 });
288            acc.keys += stats.keys;
289            acc.bytes += stats.bytes;
290        }
291        Ok(agg)
292    }
293
294    /// Snapshot every shard to its `dump-{i}.rdb` (single shard: the configured
295    /// name), atomically. `Ok(false)` when persistence is disabled.
296    pub fn save_snapshot(&self) -> io::Result<bool> {
297        let Some(dir) = self.config.data_dir.as_ref() else {
298            return Ok(false);
299        };
300        let n = self.shards.len();
301        for (i, shard) in self.shards.iter().enumerate() {
302            let name = if n == 1 {
303                self.config.snapshot_filename.clone()
304            } else {
305                kevy_persist::layout::snapshot_file(i)
306            };
307            save_shard_snapshot(shard, &dir.join(name))?;
308        }
309        Ok(true)
310    }
311
312    // Data-type methods live in `crate::ops` / `crate::info`.
313
314    /// Crate-internal: clone shard 0's handle for a `Subscription`'s bus.
315    pub(crate) fn inner_handle(&self) -> Arc<RwLock<Inner>> {
316        self.shards[0].clone()
317    }
318
319    /// Crate-internal: clone the shared `Arc<DropGuard>`.
320    pub(crate) fn guard_handle(&self) -> Arc<DropGuard> {
321        self.guard.clone()
322    }
323
324    fn shard_for(&self, key: &[u8]) -> &Arc<RwLock<Inner>> {
325        &self.shards[shard_idx(key, self.shards.len())]
326    }
327
328    /// Write-lock the shard owning `key`.
329    pub(crate) fn wshard(&self, key: &[u8]) -> RwLockWriteGuard<'_, Inner> {
330        lock_write(self.shard_for(key))
331    }
332
333    /// Read-lock the shard owning `key` (GET fast path — concurrent readers
334    /// across shards run in parallel).
335    pub(crate) fn rshard(&self, key: &[u8]) -> RwLockReadGuard<'_, Inner> {
336        lock_read(self.shard_for(key))
337    }
338
339    /// Write-lock shard 0 — pub/sub bus + keyless escape hatches.
340    pub(crate) fn lock(&self) -> RwLockWriteGuard<'_, Inner> {
341        lock_write(&self.shards[0])
342    }
343
344    /// Run `f` over every shard's write guard, summing a `usize` (DBSIZE etc.).
345    pub(crate) fn sum_shards<F: Fn(&mut Inner) -> usize>(&self, f: F) -> usize {
346        self.shards.iter().map(|s| f(&mut lock_write(s))).sum()
347    }
348
349    /// Run `f` over every shard's write guard, summing a `u64`.
350    pub(crate) fn sum_shards_u64<F: Fn(&mut Inner) -> u64>(&self, f: F) -> u64 {
351        self.shards.iter().map(|s| f(&mut lock_write(s))).sum()
352    }
353
354    /// Run a fallible `f` over every shard (mutating, e.g. FLUSHALL).
355    pub(crate) fn try_for_each_shard<F: FnMut(&mut Inner) -> io::Result<()>>(
356        &self,
357        mut f: F,
358    ) -> io::Result<()> {
359        for s in self.shards.iter() {
360            f(&mut lock_write(s))?;
361        }
362        Ok(())
363    }
364}
365
366/// Write-lock an `Inner`, recovering from poison (short critical sections; a
367/// panic in one doesn't corrupt the keyspace).
368/// Save one shard's snapshot with the snapshot+log contract intact: after
369/// a successful save the AOF holds **only post-collect writes**, so a
370/// restart replays them over the snapshot without double-applying history
371/// (non-idempotent commands like RPUSH duplicated before this).
372///
373/// Phase 1 (write lock): freeze the COW view + start the AOF tee — no
374/// write may land between the two (the tee atomicity contract). Phase 2
375/// (unlocked): serialize the view to the snapshot's durable tmp. Phase 3
376/// (write lock): commit — snapshot rename and tee'd AOF reset adjacent,
377/// so the snapshot/log commit window stays microseconds.
378fn save_shard_snapshot(shard: &RwLock<Inner>, path: &std::path::Path) -> io::Result<()> {
379    let (view, reset_tmp) = freeze_for_save(shard)?;
380    let tmp = match kevy_persist::write_snapshot_tmp(&view, path) {
381        Ok(t) => t,
382        Err(e) => {
383            if reset_tmp.is_some()
384                && let Some(aof) = &mut lock_write(shard).aof
385            {
386                aof.abort_concurrent_rewrite();
387            }
388            return Err(e);
389        }
390    };
391    let mut g = lock_write(shard);
392    std::fs::rename(&tmp, path)?;
393    if let (Some(reset), Some(aof)) = (reset_tmp, &mut g.aof) {
394        let swap = kevy_persist::write_aof_base(&reset)
395            .and_then(|()| aof.finish_concurrent_rewrite(&reset, 0));
396        if let Err(e) = swap {
397            aof.abort_concurrent_rewrite();
398            let _ = std::fs::remove_file(&reset);
399            return Err(e);
400        }
401    }
402    Ok(())
403}
404
405/// Phase-1 helper: collect the view and start the tee under one write
406/// lock. A racing background auto-rewrite owns the tee; it runs its slow
407/// half off-lock and finishes in milliseconds, so wait it out (bounded)
408/// rather than saving a snapshot whose log would double-apply on replay.
409fn freeze_for_save(
410    shard: &RwLock<Inner>,
411) -> io::Result<(kevy_store::SnapshotView, Option<std::path::PathBuf>)> {
412    for _ in 0..2000 {
413        {
414            let mut g = lock_write(shard);
415            let Inner { store, aof, .. } = &mut *g;
416            match aof {
417                Some(a) if a.is_rewriting() => {} // racing rewrite — retry
418                Some(a) => {
419                    let view = store.collect_snapshot();
420                    return Ok((view, Some(a.begin_view_rewrite()?)));
421                }
422                None => return Ok((store.collect_snapshot(), None)),
423            }
424        }
425        std::thread::sleep(std::time::Duration::from_millis(5));
426    }
427    Err(io::Error::new(
428        io::ErrorKind::TimedOut,
429        "kevy-embedded: AOF rewrite still in flight after 10s; snapshot aborted",
430    ))
431}
432
433pub(crate) fn lock_write(shard: &RwLock<Inner>) -> RwLockWriteGuard<'_, Inner> {
434    shard.write().unwrap_or_else(|p| p.into_inner())
435}
436
437/// Read-lock an `Inner`, recovering from poison.
438pub(crate) fn lock_read(shard: &RwLock<Inner>) -> RwLockReadGuard<'_, Inner> {
439    shard.read().unwrap_or_else(|p| p.into_inner())
440}
441
442fn log_argv(aof: &mut Option<Aof>, parts: &[&[u8]]) -> io::Result<()> {
443    if let Some(aof) = aof {
444        let argv = Argv::from(parts.iter().map(|p| p.to_vec()).collect::<Vec<_>>());
445        aof.append(&argv)?;
446    }
447    Ok(())
448}
449
450/// Complete a write on one shard: AOF-log the canonical RESP command, then run
451/// that shard's post-write eviction sweep.
452pub(crate) fn commit_write(inner: &mut Inner, parts: &[&[u8]]) -> io::Result<()> {
453    log_argv(&mut inner.aof, parts)?;
454    inner.store.try_evict_after_write();
455    Ok(())
456}
457
458pub(crate) fn store_err(e: StoreError) -> io::Error {
459    io::Error::new(io::ErrorKind::InvalidInput, format!("kevy-store: {e:?}"))
460}
461
462impl Drop for DropGuard {
463    fn drop(&mut self) {
464        // Stop + join the reaper, then flush every shard's AOF so EverySec
465        // users don't lose the last sub-second of writes.
466        if let Some(stop) = &self.reaper_stop {
467            stop.store(true, Ordering::Relaxed);
468        }
469        if let Some(j) = self
470            .reaper_join
471            .lock()
472            .unwrap_or_else(|p| p.into_inner())
473            .take()
474        {
475            let _ = j.join();
476        }
477        for shard in self.shards_for_flush.iter() {
478            let mut g = lock_write(shard);
479            if let Some(aof) = &mut g.aof {
480                let _ = aof.maybe_sync();
481            }
482        }
483    }
484}
485
486#[cfg(test)]
487#[path = "store_tests.rs"]
488mod tests;
489#[cfg(test)]
490#[path = "store_tests_shard.rs"]
491mod tests_shard;