Skip to main content

Store

Struct Store 

Source
pub struct Store { /* private fields */ }
Expand description

The embedded keyspace.

Store is Clone (since v1.1.0). A clone is a cheap Arc bump: every clone reaches the same underlying shards + AOF + reaper + pub/sub bus. The reaper thread is joined and each shard’s AOF is flushed exactly once, when the last clone is dropped.

use kevy_embedded::{Config, Store};

let s = Store::open(Config::default().with_ttl_reaper_manual())?;
let s2 = s.clone();
std::thread::spawn(move || {
    s2.set(b"from-thread", b"v").unwrap();
}).join().unwrap();
assert_eq!(s.get(b"from-thread")?, Some(b"v".to_vec()));

Every method takes &self. Sharding (see Config::with_shards) lets a multi-threaded consumer scale across cores; pub/sub is process-wide (handled on shard 0).

Implementations§

Source§

impl Store

Source

pub fn info(&self) -> KevyInfo

One-shot snapshot of the store’s introspection counters. See KevyInfo. Takes the embedded mutex once; safe to call from a health endpoint.

Source

pub fn expire_pending_count(&self) -> usize

Number of live keys that currently carry a TTL (the expire-set size, summed across shards).

Source

pub fn ttl(&self, key: &[u8]) -> Option<Duration>

Remaining TTL for key as a Duration, or None when the key is absent or has no TTL (persistent). For the raw Redis PTTL sentinels (-2 no key, -1 no TTL) use Store::ttl_ms.

Source§

impl Store

Source

pub fn set(&self, key: &[u8], value: &[u8]) -> Result<bool>

SET key value (no TTL, no NX/XX). Always returns true under the embedded API: SET unconditionally overwrites, and the NX/XX options that could veto the write (and make it return false) are not exposed here — use Store::with for the full surface.

Examples found in repository?
examples/embedded.rs (line 9)
7fn main() -> std::io::Result<()> {
8    let s = Store::open(Config::default())?;
9    s.set(b"greeting", b"hello, kevy")?;
10    let v = s.get(b"greeting")?;
11    println!("greeting = {}", String::from_utf8_lossy(v.as_deref().unwrap_or(b"<missing>")));
12    println!("dbsize = {}", s.dbsize());
13    Ok(())
14}
More examples
Hide additional examples
examples/bench_embed.rs (line 19)
16fn bench(label: &str, store: &Store, n: usize, keys: &[Vec<u8>]) {
17    // Warm the keyspace so GET is all hits and allocations are amortized.
18    for k in keys {
19        store.set(k, VAL).unwrap();
20    }
21
22    let t = Instant::now();
23    for i in 0..n {
24        store.set(&keys[i % KEYS], VAL).unwrap();
25    }
26    let set_s = t.elapsed().as_secs_f64();
27
28    let t = Instant::now();
29    let mut hits = 0usize;
30    for i in 0..n {
31        if store.get(&keys[i % KEYS]).unwrap().is_some() {
32            hits += 1;
33        }
34    }
35    let get_s = t.elapsed().as_secs_f64();
36    std::hint::black_box(hits);
37
38    println!(
39        "[{label:<13}] SET {:>10.0} ops/s   GET {:>10.0} ops/s",
40        n as f64 / set_s,
41        n as f64 / get_s
42    );
43}
examples/embedded-cache.rs (line 19)
9fn main() -> std::io::Result<()> {
10    let s = Store::open(
11        Config::default()
12            .with_max_memory(200 * 1024)
13            .with_eviction(EvictionPolicy::AllKeysLru),
14    )?;
15
16    for i in 0..10_000 {
17        let key = format!("user:{i:05}");
18        let val = format!("user-payload-{i}");
19        s.set(key.as_bytes(), val.as_bytes())?;
20    }
21
22    println!("dbsize after insert flood: {}", s.dbsize());
23    println!("used_memory: {} bytes (limit 200 KiB)", s.used_memory());
24    println!("evictions_total: {}", s.evictions_total());
25
26    // Touch a recent key — it should still be live.
27    let recent = format!("user:0{}", 9999);
28    println!(
29        "user:09999 → {:?}",
30        s.get(recent.as_bytes())?.as_deref().map(String::from_utf8_lossy)
31    );
32
33    Ok(())
34}
examples/bench_embed_mt.rs (line 30)
17fn run(label: &str, store: &Store, threads: usize, n_per: usize, keys: &[Vec<u8>], write: bool) {
18    let start = Instant::now();
19    let handles: Vec<_> = (0..threads)
20        .map(|tid| {
21            let s = store.clone(); // Arc bump → same inner / same lock
22            let keys = keys.to_vec();
23            thread::spawn(move || {
24                let mut acc = 0usize;
25                for i in 0..n_per {
26                    // De-correlate threads' key streams so they don't all hit
27                    // the same key/bucket in lockstep.
28                    let k = &keys[(i.wrapping_mul(31).wrapping_add(tid * 7)) % KEYS];
29                    if write {
30                        s.set(k, VAL).unwrap();
31                        acc += 1;
32                    } else if s.get(k).unwrap().is_some() {
33                        acc += 1;
34                    }
35                }
36                acc
37            })
38        })
39        .collect();
40    for h in handles {
41        h.join().unwrap();
42    }
43    let secs = start.elapsed().as_secs_f64();
44    let total = (threads * n_per) as f64;
45    println!(
46        "[{label:<10}] threads={threads:2}  {:>10.0} ops/s  ({:>5.1}M)",
47        total / secs,
48        total / secs / 1e6
49    );
50}
51
52fn main() {
53    let n_per: usize = std::env::var("KEVY_BENCH_N")
54        .ok()
55        .and_then(|s| s.parse().ok())
56        .unwrap_or(2_000_000);
57    let keys: Vec<Vec<u8>> = (0..KEYS).map(|i| format!("k{i}").into_bytes()).collect();
58    let shards: usize = std::env::var("KEVY_SHARDS")
59        .ok()
60        .and_then(|s| s.parse().ok())
61        .unwrap_or(1);
62
63    // Shared in-memory store (no AOF) — isolate the lock/keyspace from disk.
64    let store = Store::open(Config::default().with_shards(shards).with_ttl_reaper_manual()).unwrap();
65    for k in &keys {
66        store.set(k, VAL).unwrap();
67    }
68
69    println!(
70        "kevy-embedded MULTI-THREAD throughput — in-memory, shards={shards}, {KEYS} keys, {}B val, n={n_per}/thread",
71        VAL.len()
72    );
73    for &t in &[1usize, 2, 4, 8, 10] {
74        run("GET", &store, t, n_per, &keys, false);
75    }
76    for &t in &[1usize, 2, 4, 8, 10] {
77        run("SET", &store, t, n_per, &keys, true);
78    }
79}
Source

pub fn set_with_ttl( &self, key: &[u8], value: &[u8], ttl: Duration, ) -> Result<bool>

SET key value PX ms — overwrites + sets TTL. The AOF records an absolute PEXPIREAT deadline (not the relative ttl) so the key expires at the same wall-clock instant after a restart — a relative PEXPIRE would be re-anchored to replay-time, resetting the TTL to a fresh full duration on every restart (INC-2026-06-09).

Examples found in repository?
examples/bench_embed.rs (line 106)
97fn bench_ttl_get(label: &str, manual_reaper: bool, n: usize, keys: &[Vec<u8>]) {
98    let cfg = if manual_reaper {
99        Config::default().with_ttl_reaper_manual()
100    } else {
101        Config::default() // background reaper (default) → cached clock trusted
102    };
103    let store = Store::open(cfg).unwrap();
104    let ttl = std::time::Duration::from_secs(3600); // never expires during the run
105    for k in keys {
106        store.set_with_ttl(k, VAL, ttl).unwrap();
107    }
108    let t = Instant::now();
109    let mut hits = 0usize;
110    for i in 0..n {
111        if store.get(&keys[i % KEYS]).unwrap().is_some() {
112            hits += 1;
113        }
114    }
115    std::hint::black_box(hits);
116    println!("[{label}] GET {:>10.0} ops/s", n as f64 / t.elapsed().as_secs_f64());
117}
Source

pub fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>>

GET key — Some(bytes) on hit, None on miss or expired.

With eviction off (maxmemory == 0, the default) this takes the read lock and performs a non-mutating store lookup, so concurrent readers scale across cores — the path a read-heavy embed cache lives on. With eviction on it falls back to the exclusive lock and a mutating get, so each access still stamps the LRU clock.

Examples found in repository?
examples/embedded.rs (line 10)
7fn main() -> std::io::Result<()> {
8    let s = Store::open(Config::default())?;
9    s.set(b"greeting", b"hello, kevy")?;
10    let v = s.get(b"greeting")?;
11    println!("greeting = {}", String::from_utf8_lossy(v.as_deref().unwrap_or(b"<missing>")));
12    println!("dbsize = {}", s.dbsize());
13    Ok(())
14}
More examples
Hide additional examples
examples/bench_embed.rs (line 31)
16fn bench(label: &str, store: &Store, n: usize, keys: &[Vec<u8>]) {
17    // Warm the keyspace so GET is all hits and allocations are amortized.
18    for k in keys {
19        store.set(k, VAL).unwrap();
20    }
21
22    let t = Instant::now();
23    for i in 0..n {
24        store.set(&keys[i % KEYS], VAL).unwrap();
25    }
26    let set_s = t.elapsed().as_secs_f64();
27
28    let t = Instant::now();
29    let mut hits = 0usize;
30    for i in 0..n {
31        if store.get(&keys[i % KEYS]).unwrap().is_some() {
32            hits += 1;
33        }
34    }
35    let get_s = t.elapsed().as_secs_f64();
36    std::hint::black_box(hits);
37
38    println!(
39        "[{label:<13}] SET {:>10.0} ops/s   GET {:>10.0} ops/s",
40        n as f64 / set_s,
41        n as f64 / get_s
42    );
43}
44
45fn main() {
46    let n: usize = std::env::var("KEVY_BENCH_N")
47        .ok()
48        .and_then(|s| s.parse().ok())
49        .unwrap_or(2_000_000);
50    // Keys precomputed outside the timed loop so `format!`/alloc cost isn't
51    // attributed to kevy.
52    let keys: Vec<Vec<u8>> = (0..KEYS).map(|i| format!("k{i}").into_bytes()).collect();
53
54    println!("kevy-embedded in-process throughput — single thread, n={n}, {KEYS} keys, {}B val", VAL.len());
55
56    let s1 = Store::open(Config::default().with_ttl_reaper_manual()).unwrap();
57    bench("in-memory", &s1, n, &keys);
58
59    let dir2 = std::env::temp_dir().join("kevy_embed_bench_everysec");
60    let _ = std::fs::remove_dir_all(&dir2);
61    let s2 = Store::open(
62        Config::default()
63            .with_persist(&dir2)
64            .with_ttl_reaper_manual()
65            .with_appendfsync(AppendFsync::EverySec),
66    )
67    .unwrap();
68    bench("aof-everysec", &s2, n, &keys);
69
70    let dir3 = std::env::temp_dir().join("kevy_embed_bench_always");
71    let _ = std::fs::remove_dir_all(&dir3);
72    let s3 = Store::open(
73        Config::default()
74            .with_persist(&dir3)
75            .with_ttl_reaper_manual()
76            .with_appendfsync(AppendFsync::Always),
77    )
78    .unwrap();
79    // Always-fsync is one fdatasync per write (no group commit on the embedded
80    // single-op path) — fsync-rate-bound, so run far fewer ops to stay bounded.
81    bench("aof-always", &s3, (n / 20).max(50_000), &keys);
82
83    // TTL'd-key GET: the mailrs path (every cache key has a TTL). With the
84    // background reaper the cached clock is trusted (no per-get Instant::now);
85    // manual mode reads a fresh clock per get — the gap is the cached-clock win.
86    bench_ttl_get("ttl GET (cached clk)", false, n, &keys); // background reaper
87    bench_ttl_get("ttl GET (fresh clk) ", true, n, &keys); // manual reaper
88
89    drop((s1, s2, s3));
90    let _ = std::fs::remove_dir_all(&dir2);
91    let _ = std::fs::remove_dir_all(&dir3);
92}
93
94/// GET throughput over keys that all carry a (long) TTL — the mailrs cache
95/// shape. `manual_reaper` toggles whether the store trusts the cached clock
96/// (background) or reads a fresh clock per get (manual).
97fn bench_ttl_get(label: &str, manual_reaper: bool, n: usize, keys: &[Vec<u8>]) {
98    let cfg = if manual_reaper {
99        Config::default().with_ttl_reaper_manual()
100    } else {
101        Config::default() // background reaper (default) → cached clock trusted
102    };
103    let store = Store::open(cfg).unwrap();
104    let ttl = std::time::Duration::from_secs(3600); // never expires during the run
105    for k in keys {
106        store.set_with_ttl(k, VAL, ttl).unwrap();
107    }
108    let t = Instant::now();
109    let mut hits = 0usize;
110    for i in 0..n {
111        if store.get(&keys[i % KEYS]).unwrap().is_some() {
112            hits += 1;
113        }
114    }
115    std::hint::black_box(hits);
116    println!("[{label}] GET {:>10.0} ops/s", n as f64 / t.elapsed().as_secs_f64());
117}
examples/embedded-cache.rs (line 30)
9fn main() -> std::io::Result<()> {
10    let s = Store::open(
11        Config::default()
12            .with_max_memory(200 * 1024)
13            .with_eviction(EvictionPolicy::AllKeysLru),
14    )?;
15
16    for i in 0..10_000 {
17        let key = format!("user:{i:05}");
18        let val = format!("user-payload-{i}");
19        s.set(key.as_bytes(), val.as_bytes())?;
20    }
21
22    println!("dbsize after insert flood: {}", s.dbsize());
23    println!("used_memory: {} bytes (limit 200 KiB)", s.used_memory());
24    println!("evictions_total: {}", s.evictions_total());
25
26    // Touch a recent key — it should still be live.
27    let recent = format!("user:0{}", 9999);
28    println!(
29        "user:09999 → {:?}",
30        s.get(recent.as_bytes())?.as_deref().map(String::from_utf8_lossy)
31    );
32
33    Ok(())
34}
examples/bench_embed_mt.rs (line 32)
17fn run(label: &str, store: &Store, threads: usize, n_per: usize, keys: &[Vec<u8>], write: bool) {
18    let start = Instant::now();
19    let handles: Vec<_> = (0..threads)
20        .map(|tid| {
21            let s = store.clone(); // Arc bump → same inner / same lock
22            let keys = keys.to_vec();
23            thread::spawn(move || {
24                let mut acc = 0usize;
25                for i in 0..n_per {
26                    // De-correlate threads' key streams so they don't all hit
27                    // the same key/bucket in lockstep.
28                    let k = &keys[(i.wrapping_mul(31).wrapping_add(tid * 7)) % KEYS];
29                    if write {
30                        s.set(k, VAL).unwrap();
31                        acc += 1;
32                    } else if s.get(k).unwrap().is_some() {
33                        acc += 1;
34                    }
35                }
36                acc
37            })
38        })
39        .collect();
40    for h in handles {
41        h.join().unwrap();
42    }
43    let secs = start.elapsed().as_secs_f64();
44    let total = (threads * n_per) as f64;
45    println!(
46        "[{label:<10}] threads={threads:2}  {:>10.0} ops/s  ({:>5.1}M)",
47        total / secs,
48        total / secs / 1e6
49    );
50}
Source

pub fn del(&self, keys: &[&[u8]]) -> Result<usize>

DEL key1 [key2 ...]. Returns the count of keys actually removed. Keys fan out to their owning shards.

Source

pub fn exists(&self, keys: &[&[u8]]) -> Result<usize>

EXISTS key1 [key2 ...]. Count of existing keys (duplicates counted multiple times, matching Redis).

Source

pub fn incr(&self, key: &[u8]) -> Result<i64>

INCR key. Returns the post-increment value.

Source

pub fn incr_by(&self, key: &[u8], delta: i64) -> Result<i64>

INCRBY key delta. Negative delta does DECR-style work.

Source

pub fn expire(&self, key: &[u8], ttl: Duration) -> Result<bool>

EXPIRE key seconds. Returns true if a key was touched. The AOF records an absolute PEXPIREAT deadline (see Self::set_with_ttl) so the TTL survives a restart unchanged.

Source

pub fn persist(&self, key: &[u8]) -> Result<bool>

PERSIST key. Returns true if a TTL was actually cleared.

Source

pub fn ttl_ms(&self, key: &[u8]) -> i64

Remaining TTL in ms (or Redis-style -1/-2 for no-TTL/no-key).

Source

pub fn type_of(&self, key: &[u8]) -> &'static str

TYPE key — "string", "hash", "list", "set", "zset", or "none".

Source

pub fn dbsize(&self) -> usize

DBSIZE — total live keys across all shards.

Examples found in repository?
examples/embedded.rs (line 12)
7fn main() -> std::io::Result<()> {
8    let s = Store::open(Config::default())?;
9    s.set(b"greeting", b"hello, kevy")?;
10    let v = s.get(b"greeting")?;
11    println!("greeting = {}", String::from_utf8_lossy(v.as_deref().unwrap_or(b"<missing>")));
12    println!("dbsize = {}", s.dbsize());
13    Ok(())
14}
More examples
Hide additional examples
examples/embedded-cache.rs (line 22)
9fn main() -> std::io::Result<()> {
10    let s = Store::open(
11        Config::default()
12            .with_max_memory(200 * 1024)
13            .with_eviction(EvictionPolicy::AllKeysLru),
14    )?;
15
16    for i in 0..10_000 {
17        let key = format!("user:{i:05}");
18        let val = format!("user-payload-{i}");
19        s.set(key.as_bytes(), val.as_bytes())?;
20    }
21
22    println!("dbsize after insert flood: {}", s.dbsize());
23    println!("used_memory: {} bytes (limit 200 KiB)", s.used_memory());
24    println!("evictions_total: {}", s.evictions_total());
25
26    // Touch a recent key — it should still be live.
27    let recent = format!("user:0{}", 9999);
28    println!(
29        "user:09999 → {:?}",
30        s.get(recent.as_bytes())?.as_deref().map(String::from_utf8_lossy)
31    );
32
33    Ok(())
34}
examples/replay_real_aof.rs (line 40)
14fn main() {
15    let path = std::env::args()
16        .nth(1)
17        .expect("usage: replay_real_aof <aof-path>");
18    let src = PathBuf::from(&path);
19    let bytes = std::fs::metadata(&src)
20        .map(|m| m.len())
21        .unwrap_or(0);
22    println!("reproducer: staging {} bytes from {}", bytes, src.display());
23
24    let dir = std::env::temp_dir().join(format!(
25        "kevy-aof-reproducer-{}",
26        std::process::id()
27    ));
28    std::fs::create_dir_all(&dir).expect("create temp dir");
29    let staged = dir.join("aof-0.aof");
30    std::fs::copy(&src, &staged).expect("copy AOF into staging dir");
31
32    println!("reproducer: opening Store from {}", dir.display());
33    let store = Store::open(
34        Config::default()
35            .with_persist(&dir)
36            .with_aof_filename("aof-0.aof"),
37    )
38    .expect("Store::open");
39
40    println!("reproducer: opened OK; dbsize = {}", store.dbsize());
41}
Source

pub fn flush(&self) -> Result<()>

FLUSHALL — empty every shard (each logs FLUSHALL so a replay reaches the same empty state).

Source

pub fn key_bytes(&self, key: &[u8]) -> Option<u64>

MEMORY USAGE for one key — Some(bytes) or None if absent.

Source

pub fn used_memory(&self) -> u64

Live used_memory estimate (summed across shards).

Examples found in repository?
examples/embedded-cache.rs (line 23)
9fn main() -> std::io::Result<()> {
10    let s = Store::open(
11        Config::default()
12            .with_max_memory(200 * 1024)
13            .with_eviction(EvictionPolicy::AllKeysLru),
14    )?;
15
16    for i in 0..10_000 {
17        let key = format!("user:{i:05}");
18        let val = format!("user-payload-{i}");
19        s.set(key.as_bytes(), val.as_bytes())?;
20    }
21
22    println!("dbsize after insert flood: {}", s.dbsize());
23    println!("used_memory: {} bytes (limit 200 KiB)", s.used_memory());
24    println!("evictions_total: {}", s.evictions_total());
25
26    // Touch a recent key — it should still be live.
27    let recent = format!("user:0{}", 9999);
28    println!(
29        "user:09999 → {:?}",
30        s.get(recent.as_bytes())?.as_deref().map(String::from_utf8_lossy)
31    );
32
33    Ok(())
34}
Source

pub fn evictions_total(&self) -> u64

INFO-style counter: total keys evicted by maxmemory (all shards).

Examples found in repository?
examples/embedded-cache.rs (line 24)
9fn main() -> std::io::Result<()> {
10    let s = Store::open(
11        Config::default()
12            .with_max_memory(200 * 1024)
13            .with_eviction(EvictionPolicy::AllKeysLru),
14    )?;
15
16    for i in 0..10_000 {
17        let key = format!("user:{i:05}");
18        let val = format!("user-payload-{i}");
19        s.set(key.as_bytes(), val.as_bytes())?;
20    }
21
22    println!("dbsize after insert flood: {}", s.dbsize());
23    println!("used_memory: {} bytes (limit 200 KiB)", s.used_memory());
24    println!("evictions_total: {}", s.evictions_total());
25
26    // Touch a recent key — it should still be live.
27    let recent = format!("user:0{}", 9999);
28    println!(
29        "user:09999 → {:?}",
30        s.get(recent.as_bytes())?.as_deref().map(String::from_utf8_lossy)
31    );
32
33    Ok(())
34}
Source

pub fn expired_keys_total(&self) -> u64

INFO-style counter: total keys expired (lazy + active reaper, all shards).

Source

pub fn hset(&self, key: &[u8], pairs: &[(&[u8], &[u8])]) -> Result<usize>

HSET key field value [field value ...]. Returns count newly added.

Source

pub fn hget(&self, key: &[u8], field: &[u8]) -> Result<Option<Vec<u8>>>

HGET key field. None if absent.

Source

pub fn hdel(&self, key: &[u8], fields: &[&[u8]]) -> Result<usize>

HDEL key field [field ...]. Returns count actually removed.

Source

pub fn lpush(&self, key: &[u8], values: &[&[u8]]) -> Result<usize>

LPUSH key value [value ...]. Returns the new list length.

Source

pub fn rpush(&self, key: &[u8], values: &[&[u8]]) -> Result<usize>

RPUSH key value [value ...]. Returns the new list length.

Source

pub fn lpop(&self, key: &[u8], count: usize) -> Result<Vec<Vec<u8>>>

LPOP key count. Returns popped values from the head.

Source

pub fn rpop(&self, key: &[u8], count: usize) -> Result<Vec<Vec<u8>>>

RPOP key count. Symmetric to LPOP from the tail.

Source

pub fn llen(&self, key: &[u8]) -> Result<usize>

LLEN key. Length of the list at key; 0 if absent.

Source

pub fn sadd(&self, key: &[u8], members: &[&[u8]]) -> Result<usize>

SADD key member [member ...]. Returns count newly added.

Source

pub fn srem(&self, key: &[u8], members: &[&[u8]]) -> Result<usize>

SREM key member [member ...]. Returns count actually removed.

Source

pub fn smembers(&self, key: &[u8]) -> Result<Vec<Vec<u8>>>

SMEMBERS key. Order implementation-defined; empty if absent.

Source

pub fn scard(&self, key: &[u8]) -> Result<usize>

SCARD key. Member count; 0 if absent.

Source

pub fn zadd(&self, key: &[u8], pairs: &[(f64, &[u8])]) -> Result<usize>

ZADD key score member [score member ...]. Returns count newly added.

Source

pub fn zrem(&self, key: &[u8], members: &[&[u8]]) -> Result<usize>

ZREM key member [member ...]. Returns count actually removed.

Source

pub fn zscore(&self, key: &[u8], member: &[u8]) -> Result<Option<f64>>

ZSCORE key member. Some(score) if present.

Source

pub fn zcard(&self, key: &[u8]) -> Result<usize>

ZCARD key. Member count; 0 if absent.

Source

pub fn publish(&self, channel: &[u8], payload: &[u8]) -> usize

PUBLISH channel payload. Delivers payload to every subscriber on channel (direct + pattern matches) inside this process. Returns the count of receivers the message reached.

Source

pub fn subscribe(&self, channels: &[&[u8]]) -> Subscription

Open a Subscription subscribed to channels. Drop the handle to unsubscribe from everything atomically. Pass &[] to start with no subscriptions and add some later via Subscription::subscribe / Subscription::psubscribe.

Source

pub fn psubscribe(&self, patterns: &[&[u8]]) -> Subscription

Convenience: open a Subscription starting on pattern subscriptions.

Source§

impl Store

Source

pub fn open(config: Config) -> Result<Self>

Open an embedded keyspace per config.

  • Pure in-memory when config.data_dir is None.
  • With persistence: each shard loads its snapshot then replays its AOF (config.shards > 1 re-shards a legacy single AOF on first open).
  • Spawns a background TTL reaper thread when config.ttl_reaper == Background (the default).
Examples found in repository?
examples/embedded.rs (line 8)
7fn main() -> std::io::Result<()> {
8    let s = Store::open(Config::default())?;
9    s.set(b"greeting", b"hello, kevy")?;
10    let v = s.get(b"greeting")?;
11    println!("greeting = {}", String::from_utf8_lossy(v.as_deref().unwrap_or(b"<missing>")));
12    println!("dbsize = {}", s.dbsize());
13    Ok(())
14}
More examples
Hide additional examples
examples/embedded-cache.rs (lines 10-14)
9fn main() -> std::io::Result<()> {
10    let s = Store::open(
11        Config::default()
12            .with_max_memory(200 * 1024)
13            .with_eviction(EvictionPolicy::AllKeysLru),
14    )?;
15
16    for i in 0..10_000 {
17        let key = format!("user:{i:05}");
18        let val = format!("user-payload-{i}");
19        s.set(key.as_bytes(), val.as_bytes())?;
20    }
21
22    println!("dbsize after insert flood: {}", s.dbsize());
23    println!("used_memory: {} bytes (limit 200 KiB)", s.used_memory());
24    println!("evictions_total: {}", s.evictions_total());
25
26    // Touch a recent key — it should still be live.
27    let recent = format!("user:0{}", 9999);
28    println!(
29        "user:09999 → {:?}",
30        s.get(recent.as_bytes())?.as_deref().map(String::from_utf8_lossy)
31    );
32
33    Ok(())
34}
examples/replay_real_aof.rs (lines 33-37)
14fn main() {
15    let path = std::env::args()
16        .nth(1)
17        .expect("usage: replay_real_aof <aof-path>");
18    let src = PathBuf::from(&path);
19    let bytes = std::fs::metadata(&src)
20        .map(|m| m.len())
21        .unwrap_or(0);
22    println!("reproducer: staging {} bytes from {}", bytes, src.display());
23
24    let dir = std::env::temp_dir().join(format!(
25        "kevy-aof-reproducer-{}",
26        std::process::id()
27    ));
28    std::fs::create_dir_all(&dir).expect("create temp dir");
29    let staged = dir.join("aof-0.aof");
30    std::fs::copy(&src, &staged).expect("copy AOF into staging dir");
31
32    println!("reproducer: opening Store from {}", dir.display());
33    let store = Store::open(
34        Config::default()
35            .with_persist(&dir)
36            .with_aof_filename("aof-0.aof"),
37    )
38    .expect("Store::open");
39
40    println!("reproducer: opened OK; dbsize = {}", store.dbsize());
41}
examples/bench_embed_mt.rs (line 64)
52fn main() {
53    let n_per: usize = std::env::var("KEVY_BENCH_N")
54        .ok()
55        .and_then(|s| s.parse().ok())
56        .unwrap_or(2_000_000);
57    let keys: Vec<Vec<u8>> = (0..KEYS).map(|i| format!("k{i}").into_bytes()).collect();
58    let shards: usize = std::env::var("KEVY_SHARDS")
59        .ok()
60        .and_then(|s| s.parse().ok())
61        .unwrap_or(1);
62
63    // Shared in-memory store (no AOF) — isolate the lock/keyspace from disk.
64    let store = Store::open(Config::default().with_shards(shards).with_ttl_reaper_manual()).unwrap();
65    for k in &keys {
66        store.set(k, VAL).unwrap();
67    }
68
69    println!(
70        "kevy-embedded MULTI-THREAD throughput — in-memory, shards={shards}, {KEYS} keys, {}B val, n={n_per}/thread",
71        VAL.len()
72    );
73    for &t in &[1usize, 2, 4, 8, 10] {
74        run("GET", &store, t, n_per, &keys, false);
75    }
76    for &t in &[1usize, 2, 4, 8, 10] {
77        run("SET", &store, t, n_per, &keys, true);
78    }
79}
examples/bench_embed.rs (line 56)
45fn main() {
46    let n: usize = std::env::var("KEVY_BENCH_N")
47        .ok()
48        .and_then(|s| s.parse().ok())
49        .unwrap_or(2_000_000);
50    // Keys precomputed outside the timed loop so `format!`/alloc cost isn't
51    // attributed to kevy.
52    let keys: Vec<Vec<u8>> = (0..KEYS).map(|i| format!("k{i}").into_bytes()).collect();
53
54    println!("kevy-embedded in-process throughput — single thread, n={n}, {KEYS} keys, {}B val", VAL.len());
55
56    let s1 = Store::open(Config::default().with_ttl_reaper_manual()).unwrap();
57    bench("in-memory", &s1, n, &keys);
58
59    let dir2 = std::env::temp_dir().join("kevy_embed_bench_everysec");
60    let _ = std::fs::remove_dir_all(&dir2);
61    let s2 = Store::open(
62        Config::default()
63            .with_persist(&dir2)
64            .with_ttl_reaper_manual()
65            .with_appendfsync(AppendFsync::EverySec),
66    )
67    .unwrap();
68    bench("aof-everysec", &s2, n, &keys);
69
70    let dir3 = std::env::temp_dir().join("kevy_embed_bench_always");
71    let _ = std::fs::remove_dir_all(&dir3);
72    let s3 = Store::open(
73        Config::default()
74            .with_persist(&dir3)
75            .with_ttl_reaper_manual()
76            .with_appendfsync(AppendFsync::Always),
77    )
78    .unwrap();
79    // Always-fsync is one fdatasync per write (no group commit on the embedded
80    // single-op path) — fsync-rate-bound, so run far fewer ops to stay bounded.
81    bench("aof-always", &s3, (n / 20).max(50_000), &keys);
82
83    // TTL'd-key GET: the mailrs path (every cache key has a TTL). With the
84    // background reaper the cached clock is trusted (no per-get Instant::now);
85    // manual mode reads a fresh clock per get — the gap is the cached-clock win.
86    bench_ttl_get("ttl GET (cached clk)", false, n, &keys); // background reaper
87    bench_ttl_get("ttl GET (fresh clk) ", true, n, &keys); // manual reaper
88
89    drop((s1, s2, s3));
90    let _ = std::fs::remove_dir_all(&dir2);
91    let _ = std::fs::remove_dir_all(&dir3);
92}
93
94/// GET throughput over keys that all carry a (long) TTL — the mailrs cache
95/// shape. `manual_reaper` toggles whether the store trusts the cached clock
96/// (background) or reads a fresh clock per get (manual).
97fn bench_ttl_get(label: &str, manual_reaper: bool, n: usize, keys: &[Vec<u8>]) {
98    let cfg = if manual_reaper {
99        Config::default().with_ttl_reaper_manual()
100    } else {
101        Config::default() // background reaper (default) → cached clock trusted
102    };
103    let store = Store::open(cfg).unwrap();
104    let ttl = std::time::Duration::from_secs(3600); // never expires during the run
105    for k in keys {
106        store.set_with_ttl(k, VAL, ttl).unwrap();
107    }
108    let t = Instant::now();
109    let mut hits = 0usize;
110    for i in 0..n {
111        if store.get(&keys[i % KEYS]).unwrap().is_some() {
112            hits += 1;
113        }
114    }
115    std::hint::black_box(hits);
116    println!("[{label}] GET {:>10.0} ops/s", n as f64 / t.elapsed().as_secs_f64());
117}
Source

pub fn downgrade(&self) -> WeakStore

Get a weak handle that does not keep the keyspace alive.

Source

pub fn config(&self) -> &Config

The active config, returned by shared reference. Cloning it and modifying the clone has no effect on the running store. Useful for introspection / INFO-style telemetry.

Source

pub fn with<F, R>(&self, f: F) -> R
where F: FnOnce(&mut Store) -> R,

Run f against the underlying kevy_store::Store under its lock. Use for direct access to methods this crate hasn’t wrapped. The closure can mutate, but does not auto-log to the AOF — call Self::log yourself if the mutation must survive a crash.

Sharded stores: this targets shard 0 only. Use Self::with_key to reach the shard owning a specific key.

Source

pub fn with_key<F, R>(&self, key: &[u8], f: F) -> R
where F: FnOnce(&mut Store) -> R,

Like Self::with but targets the shard that owns key.

Source

pub fn collect_keys( &self, pattern: Option<&[u8]>, limit: Option<usize>, ) -> Vec<Vec<u8>>

KEYS / SCAN-glob across every shard — the cross-shard replacement for with(|s| s.collect_keys(pat, lim)), which only sees shard 0 once sharding is on. Behaves identically to with(...) when shard_count() == 1. limit bounds the total returned across shards. Takes a read lock per shard (concurrent-safe).

Source

pub fn for_each_shard<F: FnMut(&mut Store)>(&self, f: F)

Run f against each shard’s underlying kevy_store::Store (in shard-index order) — the cross-shard escape hatch. The caller assembles the merged result. Pairs with Self::shard_count. For a single key, prefer Self::with_key; for a glob scan, prefer Self::collect_keys.

Source

pub fn shard_count(&self) -> usize

Number of keyspace shards (== Config::shards).

Source

pub fn log(&self, parts: &[&[u8]]) -> Result<()>

Append a raw RESP-frame argument list to the AOF of the shard that owns its key. No-op when persistence is disabled.

Source

pub fn tick(&self) -> ExpireStats

Run one TTL-reaper tick across every shard and return the summed stats. In Manual mode the caller is responsible for invoking this periodically (~10×/s to match Redis hz=10).

Source

pub fn rewrite_aof(&self) -> Result<Option<RewriteStats>>

BGREWRITEAOF: rebuild every shard’s AOF from current state. Synchronous. Returns the summed stats (None if persistence is off / no shard rewrote).

Source

pub fn save_snapshot(&self) -> Result<bool>

Snapshot every shard to its dump-{i}.rdb (single shard: the configured name), atomically. Ok(false) when persistence is disabled.

Trait Implementations§

Source§

impl Clone for Store

Source§

fn clone(&self) -> Store

Returns a duplicate of the value. Read more
1.0.0 (const: unstable) · Source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more

Auto Trait Implementations§

§

impl !RefUnwindSafe for Store

§

impl !UnwindSafe for Store

§

impl Freeze for Store

§

impl Send for Store

§

impl Sync for Store

§

impl Unpin for Store

§

impl UnsafeUnpin for Store

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> CloneToUninit for T
where T: Clone,

Source§

unsafe fn clone_to_uninit(&self, dest: *mut u8)

🔬This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dest. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> ToOwned for T
where T: Clone,

Source§

type Owned = T

The resulting type after obtaining ownership.
Source§

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning. Read more
Source§

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning. Read more
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.