pub struct Store { /* private fields */ }
The embedded keyspace.
Store is Clone (since v1.1.0). A clone is a cheap Arc bump:
every clone reaches the same underlying shards + AOF + reaper + pub/sub
bus. The reaper thread is joined and each shard’s AOF is flushed exactly
once, when the last clone is dropped.
use kevy_embedded::{Config, Store};
let s = Store::open(Config::default().with_ttl_reaper_manual())?;
let s2 = s.clone();
std::thread::spawn(move || {
s2.set(b"from-thread", b"v").unwrap();
}).join().unwrap();
assert_eq!(s.get(b"from-thread")?, Some(b"v".to_vec()));

Every method takes &self. Sharding (see Config::with_shards) lets a
multi-threaded consumer scale across cores; pub/sub is process-wide
(handled on shard 0).
Implementations§
Source§impl Store
impl Store
Sourcepub fn info(&self) -> KevyInfo
pub fn info(&self) -> KevyInfo
One-shot snapshot of the store’s introspection counters. See
KevyInfo. Takes the embedded mutex once; safe to call from a
health endpoint.
Sourcepub fn expire_pending_count(&self) -> usize
pub fn expire_pending_count(&self) -> usize
Number of live keys that currently carry a TTL (the expire-set size, summed across shards).
Source§impl Store
impl Store
Sourcepub fn set(&self, key: &[u8], value: &[u8]) -> Result<bool>
pub fn set(&self, key: &[u8], value: &[u8]) -> Result<bool>
SET key value (no TTL, no NX/XX). Returns true always under the
embedded API (Redis semantics: SET overwrites; NX/XX vetoes would
return false but we don’t expose those here — use Store::with
for the full surface).
Examples found in repository?
More examples
16fn bench(label: &str, store: &Store, n: usize, keys: &[Vec<u8>]) {
17 // Warm the keyspace so GET is all hits and allocations are amortized.
18 for k in keys {
19 store.set(k, VAL).unwrap();
20 }
21
22 let t = Instant::now();
23 for i in 0..n {
24 store.set(&keys[i % KEYS], VAL).unwrap();
25 }
26 let set_s = t.elapsed().as_secs_f64();
27
28 let t = Instant::now();
29 let mut hits = 0usize;
30 for i in 0..n {
31 if store.get(&keys[i % KEYS]).unwrap().is_some() {
32 hits += 1;
33 }
34 }
35 let get_s = t.elapsed().as_secs_f64();
36 std::hint::black_box(hits);
37
38 println!(
39 "[{label:<13}] SET {:>10.0} ops/s GET {:>10.0} ops/s",
40 n as f64 / set_s,
41 n as f64 / get_s
42 );
43}
9fn main() -> std::io::Result<()> {
10 let s = Store::open(
11 Config::default()
12 .with_max_memory(200 * 1024)
13 .with_eviction(EvictionPolicy::AllKeysLru),
14 )?;
15
16 for i in 0..10_000 {
17 let key = format!("user:{i:05}");
18 let val = format!("user-payload-{i}");
19 s.set(key.as_bytes(), val.as_bytes())?;
20 }
21
22 println!("dbsize after insert flood: {}", s.dbsize());
23 println!("used_memory: {} bytes (limit 200 KiB)", s.used_memory());
24 println!("evictions_total: {}", s.evictions_total());
25
26 // Touch a recent key — it should still be live.
27 let recent = format!("user:0{}", 9999);
28 println!(
29 "user:09999 → {:?}",
30 s.get(recent.as_bytes())?.as_deref().map(String::from_utf8_lossy)
31 );
32
33 Ok(())
34}
17fn run(label: &str, store: &Store, threads: usize, n_per: usize, keys: &[Vec<u8>], write: bool) {
18 let start = Instant::now();
19 let handles: Vec<_> = (0..threads)
20 .map(|tid| {
21 let s = store.clone(); // Arc bump → same inner / same lock
22 let keys = keys.to_vec();
23 thread::spawn(move || {
24 let mut acc = 0usize;
25 for i in 0..n_per {
26 // De-correlate threads' key streams so they don't all hit
27 // the same key/bucket in lockstep.
28 let k = &keys[(i.wrapping_mul(31).wrapping_add(tid * 7)) % KEYS];
29 if write {
30 s.set(k, VAL).unwrap();
31 acc += 1;
32 } else if s.get(k).unwrap().is_some() {
33 acc += 1;
34 }
35 }
36 acc
37 })
38 })
39 .collect();
40 for h in handles {
41 h.join().unwrap();
42 }
43 let secs = start.elapsed().as_secs_f64();
44 let total = (threads * n_per) as f64;
45 println!(
46 "[{label:<10}] threads={threads:2} {:>10.0} ops/s ({:>5.1}M)",
47 total / secs,
48 total / secs / 1e6
49 );
50}
51
52fn main() {
53 let n_per: usize = std::env::var("KEVY_BENCH_N")
54 .ok()
55 .and_then(|s| s.parse().ok())
56 .unwrap_or(2_000_000);
57 let keys: Vec<Vec<u8>> = (0..KEYS).map(|i| format!("k{i}").into_bytes()).collect();
58 let shards: usize = std::env::var("KEVY_SHARDS")
59 .ok()
60 .and_then(|s| s.parse().ok())
61 .unwrap_or(1);
62
63 // Shared in-memory store (no AOF) — isolate the lock/keyspace from disk.
64 let store = Store::open(Config::default().with_shards(shards).with_ttl_reaper_manual()).unwrap();
65 for k in &keys {
66 store.set(k, VAL).unwrap();
67 }
68
69 println!(
70 "kevy-embedded MULTI-THREAD throughput — in-memory, shards={shards}, {KEYS} keys, {}B val, n={n_per}/thread",
71 VAL.len()
72 );
73 for &t in &[1usize, 2, 4, 8, 10] {
74 run("GET", &store, t, n_per, &keys, false);
75 }
76 for &t in &[1usize, 2, 4, 8, 10] {
77 run("SET", &store, t, n_per, &keys, true);
78 }
79}
Sourcepub fn set_with_ttl(
&self,
key: &[u8],
value: &[u8],
ttl: Duration,
) -> Result<bool>
pub fn set_with_ttl( &self, key: &[u8], value: &[u8], ttl: Duration, ) -> Result<bool>
SET key value PX ms — overwrites + sets TTL. The AOF records an
absolute PEXPIREAT deadline (not the relative ttl) so the key
expires at the same wall-clock instant after a restart — a relative
PEXPIRE would be re-anchored to replay-time, resetting the TTL to a
fresh full duration on every restart (INC-2026-06-09).
Examples found in repository?
97fn bench_ttl_get(label: &str, manual_reaper: bool, n: usize, keys: &[Vec<u8>]) {
98 let cfg = if manual_reaper {
99 Config::default().with_ttl_reaper_manual()
100 } else {
101 Config::default() // background reaper (default) → cached clock trusted
102 };
103 let store = Store::open(cfg).unwrap();
104 let ttl = std::time::Duration::from_secs(3600); // never expires during the run
105 for k in keys {
106 store.set_with_ttl(k, VAL, ttl).unwrap();
107 }
108 let t = Instant::now();
109 let mut hits = 0usize;
110 for i in 0..n {
111 if store.get(&keys[i % KEYS]).unwrap().is_some() {
112 hits += 1;
113 }
114 }
115 std::hint::black_box(hits);
116 println!("[{label}] GET {:>10.0} ops/s", n as f64 / t.elapsed().as_secs_f64());
117}
Sourcepub fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>>
pub fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>>
GET key — Some(bytes) on hit, None on miss or expired.
With eviction off (maxmemory == 0, the default) this takes the read
lock and performs a non-mutating store lookup, so concurrent readers scale across
cores — the path a read-heavy embed cache lives on. With eviction on it
falls back to the exclusive lock + mutating get so each access still
stamps the LRU clock.
Examples found in repository?
More examples
16fn bench(label: &str, store: &Store, n: usize, keys: &[Vec<u8>]) {
17 // Warm the keyspace so GET is all hits and allocations are amortized.
18 for k in keys {
19 store.set(k, VAL).unwrap();
20 }
21
22 let t = Instant::now();
23 for i in 0..n {
24 store.set(&keys[i % KEYS], VAL).unwrap();
25 }
26 let set_s = t.elapsed().as_secs_f64();
27
28 let t = Instant::now();
29 let mut hits = 0usize;
30 for i in 0..n {
31 if store.get(&keys[i % KEYS]).unwrap().is_some() {
32 hits += 1;
33 }
34 }
35 let get_s = t.elapsed().as_secs_f64();
36 std::hint::black_box(hits);
37
38 println!(
39 "[{label:<13}] SET {:>10.0} ops/s GET {:>10.0} ops/s",
40 n as f64 / set_s,
41 n as f64 / get_s
42 );
43}
44
45fn main() {
46 let n: usize = std::env::var("KEVY_BENCH_N")
47 .ok()
48 .and_then(|s| s.parse().ok())
49 .unwrap_or(2_000_000);
50 // Keys precomputed outside the timed loop so `format!`/alloc cost isn't
51 // attributed to kevy.
52 let keys: Vec<Vec<u8>> = (0..KEYS).map(|i| format!("k{i}").into_bytes()).collect();
53
54 println!("kevy-embedded in-process throughput — single thread, n={n}, {KEYS} keys, {}B val", VAL.len());
55
56 let s1 = Store::open(Config::default().with_ttl_reaper_manual()).unwrap();
57 bench("in-memory", &s1, n, &keys);
58
59 let dir2 = std::env::temp_dir().join("kevy_embed_bench_everysec");
60 let _ = std::fs::remove_dir_all(&dir2);
61 let s2 = Store::open(
62 Config::default()
63 .with_persist(&dir2)
64 .with_ttl_reaper_manual()
65 .with_appendfsync(AppendFsync::EverySec),
66 )
67 .unwrap();
68 bench("aof-everysec", &s2, n, &keys);
69
70 let dir3 = std::env::temp_dir().join("kevy_embed_bench_always");
71 let _ = std::fs::remove_dir_all(&dir3);
72 let s3 = Store::open(
73 Config::default()
74 .with_persist(&dir3)
75 .with_ttl_reaper_manual()
76 .with_appendfsync(AppendFsync::Always),
77 )
78 .unwrap();
79 // Always-fsync is one fdatasync per write (no group commit on the embedded
80 // single-op path) — fsync-rate-bound, so run far fewer ops to stay bounded.
81 bench("aof-always", &s3, (n / 20).max(50_000), &keys);
82
83 // TTL'd-key GET: the mailrs path (every cache key has a TTL). With the
84 // background reaper the cached clock is trusted (no per-get Instant::now);
85 // manual mode reads a fresh clock per get — the gap is the cached-clock win.
86 bench_ttl_get("ttl GET (cached clk)", false, n, &keys); // background reaper
87 bench_ttl_get("ttl GET (fresh clk) ", true, n, &keys); // manual reaper
88
89 drop((s1, s2, s3));
90 let _ = std::fs::remove_dir_all(&dir2);
91 let _ = std::fs::remove_dir_all(&dir3);
92}
93
94/// GET throughput over keys that all carry a (long) TTL — the mailrs cache
95/// shape. `manual_reaper` toggles whether the store trusts the cached clock
96/// (background) or reads a fresh clock per get (manual).
97fn bench_ttl_get(label: &str, manual_reaper: bool, n: usize, keys: &[Vec<u8>]) {
98 let cfg = if manual_reaper {
99 Config::default().with_ttl_reaper_manual()
100 } else {
101 Config::default() // background reaper (default) → cached clock trusted
102 };
103 let store = Store::open(cfg).unwrap();
104 let ttl = std::time::Duration::from_secs(3600); // never expires during the run
105 for k in keys {
106 store.set_with_ttl(k, VAL, ttl).unwrap();
107 }
108 let t = Instant::now();
109 let mut hits = 0usize;
110 for i in 0..n {
111 if store.get(&keys[i % KEYS]).unwrap().is_some() {
112 hits += 1;
113 }
114 }
115 std::hint::black_box(hits);
116 println!("[{label}] GET {:>10.0} ops/s", n as f64 / t.elapsed().as_secs_f64());
117}
9fn main() -> std::io::Result<()> {
10 let s = Store::open(
11 Config::default()
12 .with_max_memory(200 * 1024)
13 .with_eviction(EvictionPolicy::AllKeysLru),
14 )?;
15
16 for i in 0..10_000 {
17 let key = format!("user:{i:05}");
18 let val = format!("user-payload-{i}");
19 s.set(key.as_bytes(), val.as_bytes())?;
20 }
21
22 println!("dbsize after insert flood: {}", s.dbsize());
23 println!("used_memory: {} bytes (limit 200 KiB)", s.used_memory());
24 println!("evictions_total: {}", s.evictions_total());
25
26 // Touch a recent key — it should still be live.
27 let recent = format!("user:0{}", 9999);
28 println!(
29 "user:09999 → {:?}",
30 s.get(recent.as_bytes())?.as_deref().map(String::from_utf8_lossy)
31 );
32
33 Ok(())
34}
17fn run(label: &str, store: &Store, threads: usize, n_per: usize, keys: &[Vec<u8>], write: bool) {
18 let start = Instant::now();
19 let handles: Vec<_> = (0..threads)
20 .map(|tid| {
21 let s = store.clone(); // Arc bump → same inner / same lock
22 let keys = keys.to_vec();
23 thread::spawn(move || {
24 let mut acc = 0usize;
25 for i in 0..n_per {
26 // De-correlate threads' key streams so they don't all hit
27 // the same key/bucket in lockstep.
28 let k = &keys[(i.wrapping_mul(31).wrapping_add(tid * 7)) % KEYS];
29 if write {
30 s.set(k, VAL).unwrap();
31 acc += 1;
32 } else if s.get(k).unwrap().is_some() {
33 acc += 1;
34 }
35 }
36 acc
37 })
38 })
39 .collect();
40 for h in handles {
41 h.join().unwrap();
42 }
43 let secs = start.elapsed().as_secs_f64();
44 let total = (threads * n_per) as f64;
45 println!(
46 "[{label:<10}] threads={threads:2} {:>10.0} ops/s ({:>5.1}M)",
47 total / secs,
48 total / secs / 1e6
49 );
50}
Sourcepub fn del(&self, keys: &[&[u8]]) -> Result<usize>
pub fn del(&self, keys: &[&[u8]]) -> Result<usize>
DEL key1 [key2 ...]. Returns the count of keys actually removed.
Keys fan out to their owning shards.
Sourcepub fn exists(&self, keys: &[&[u8]]) -> Result<usize>
pub fn exists(&self, keys: &[&[u8]]) -> Result<usize>
EXISTS key1 [key2 ...]. Count of existing keys (duplicates counted
multiple times, matching Redis).
Sourcepub fn incr_by(&self, key: &[u8], delta: i64) -> Result<i64>
pub fn incr_by(&self, key: &[u8], delta: i64) -> Result<i64>
INCRBY key delta. Negative delta does DECR-style work.
Sourcepub fn expire(&self, key: &[u8], ttl: Duration) -> Result<bool>
pub fn expire(&self, key: &[u8], ttl: Duration) -> Result<bool>
EXPIRE key seconds. Returns true if a key was touched. The AOF
records an absolute PEXPIREAT deadline (see Self::set_with_ttl)
so the TTL survives a restart unchanged.
Sourcepub fn persist(&self, key: &[u8]) -> Result<bool>
pub fn persist(&self, key: &[u8]) -> Result<bool>
PERSIST key. Returns true if a TTL was actually cleared.
Sourcepub fn ttl_ms(&self, key: &[u8]) -> i64
pub fn ttl_ms(&self, key: &[u8]) -> i64
Remaining TTL in ms (or Redis-style -1/-2 for no-TTL/no-key).
Sourcepub fn type_of(&self, key: &[u8]) -> &'static str
pub fn type_of(&self, key: &[u8]) -> &'static str
TYPE key — "string", "hash", "list", "set", "zset", or "none".
Sourcepub fn dbsize(&self) -> usize
pub fn dbsize(&self) -> usize
DBSIZE — total live keys across all shards.
Examples found in repository?
More examples
9fn main() -> std::io::Result<()> {
10 let s = Store::open(
11 Config::default()
12 .with_max_memory(200 * 1024)
13 .with_eviction(EvictionPolicy::AllKeysLru),
14 )?;
15
16 for i in 0..10_000 {
17 let key = format!("user:{i:05}");
18 let val = format!("user-payload-{i}");
19 s.set(key.as_bytes(), val.as_bytes())?;
20 }
21
22 println!("dbsize after insert flood: {}", s.dbsize());
23 println!("used_memory: {} bytes (limit 200 KiB)", s.used_memory());
24 println!("evictions_total: {}", s.evictions_total());
25
26 // Touch a recent key — it should still be live.
27 let recent = format!("user:0{}", 9999);
28 println!(
29 "user:09999 → {:?}",
30 s.get(recent.as_bytes())?.as_deref().map(String::from_utf8_lossy)
31 );
32
33 Ok(())
34}
14fn main() {
15 let path = std::env::args()
16 .nth(1)
17 .expect("usage: replay_real_aof <aof-path>");
18 let src = PathBuf::from(&path);
19 let bytes = std::fs::metadata(&src)
20 .map(|m| m.len())
21 .unwrap_or(0);
22 println!("reproducer: staging {} bytes from {}", bytes, src.display());
23
24 let dir = std::env::temp_dir().join(format!(
25 "kevy-aof-reproducer-{}",
26 std::process::id()
27 ));
28 std::fs::create_dir_all(&dir).expect("create temp dir");
29 let staged = dir.join("aof-0.aof");
30 std::fs::copy(&src, &staged).expect("copy AOF into staging dir");
31
32 println!("reproducer: opening Store from {}", dir.display());
33 let store = Store::open(
34 Config::default()
35 .with_persist(&dir)
36 .with_aof_filename("aof-0.aof"),
37 )
38 .expect("Store::open");
39
40 println!("reproducer: opened OK; dbsize = {}", store.dbsize());
41}
Sourcepub fn flush(&self) -> Result<()>
pub fn flush(&self) -> Result<()>
FLUSHALL — empty every shard (each logs FLUSHALL so a replay reaches
the same empty state).
Sourcepub fn key_bytes(&self, key: &[u8]) -> Option<u64>
pub fn key_bytes(&self, key: &[u8]) -> Option<u64>
MEMORY USAGE for one key — Some(bytes) or None if absent.
Sourcepub fn used_memory(&self) -> u64
pub fn used_memory(&self) -> u64
Live used_memory estimate (summed across shards).
Examples found in repository?
9fn main() -> std::io::Result<()> {
10 let s = Store::open(
11 Config::default()
12 .with_max_memory(200 * 1024)
13 .with_eviction(EvictionPolicy::AllKeysLru),
14 )?;
15
16 for i in 0..10_000 {
17 let key = format!("user:{i:05}");
18 let val = format!("user-payload-{i}");
19 s.set(key.as_bytes(), val.as_bytes())?;
20 }
21
22 println!("dbsize after insert flood: {}", s.dbsize());
23 println!("used_memory: {} bytes (limit 200 KiB)", s.used_memory());
24 println!("evictions_total: {}", s.evictions_total());
25
26 // Touch a recent key — it should still be live.
27 let recent = format!("user:0{}", 9999);
28 println!(
29 "user:09999 → {:?}",
30 s.get(recent.as_bytes())?.as_deref().map(String::from_utf8_lossy)
31 );
32
33 Ok(())
34}
Sourcepub fn evictions_total(&self) -> u64
pub fn evictions_total(&self) -> u64
INFO-style counter: total keys evicted by maxmemory (all shards).
Examples found in repository?
9fn main() -> std::io::Result<()> {
10 let s = Store::open(
11 Config::default()
12 .with_max_memory(200 * 1024)
13 .with_eviction(EvictionPolicy::AllKeysLru),
14 )?;
15
16 for i in 0..10_000 {
17 let key = format!("user:{i:05}");
18 let val = format!("user-payload-{i}");
19 s.set(key.as_bytes(), val.as_bytes())?;
20 }
21
22 println!("dbsize after insert flood: {}", s.dbsize());
23 println!("used_memory: {} bytes (limit 200 KiB)", s.used_memory());
24 println!("evictions_total: {}", s.evictions_total());
25
26 // Touch a recent key — it should still be live.
27 let recent = format!("user:0{}", 9999);
28 println!(
29 "user:09999 → {:?}",
30 s.get(recent.as_bytes())?.as_deref().map(String::from_utf8_lossy)
31 );
32
33 Ok(())
34}
Sourcepub fn expired_keys_total(&self) -> u64
pub fn expired_keys_total(&self) -> u64
INFO-style counter: total keys expired (lazy + active reaper, all shards).
Sourcepub fn hset(&self, key: &[u8], pairs: &[(&[u8], &[u8])]) -> Result<usize>
pub fn hset(&self, key: &[u8], pairs: &[(&[u8], &[u8])]) -> Result<usize>
HSET key field value [field value ...]. Returns count newly added.
Sourcepub fn hget(&self, key: &[u8], field: &[u8]) -> Result<Option<Vec<u8>>>
pub fn hget(&self, key: &[u8], field: &[u8]) -> Result<Option<Vec<u8>>>
HGET key field. None if absent.
Sourcepub fn hdel(&self, key: &[u8], fields: &[&[u8]]) -> Result<usize>
pub fn hdel(&self, key: &[u8], fields: &[&[u8]]) -> Result<usize>
HDEL key field [field ...]. Returns count actually removed.
Sourcepub fn lpush(&self, key: &[u8], values: &[&[u8]]) -> Result<usize>
pub fn lpush(&self, key: &[u8], values: &[&[u8]]) -> Result<usize>
LPUSH key value [value ...]. Returns the new list length.
Sourcepub fn rpush(&self, key: &[u8], values: &[&[u8]]) -> Result<usize>
pub fn rpush(&self, key: &[u8], values: &[&[u8]]) -> Result<usize>
RPUSH key value [value ...]. Returns the new list length.
Sourcepub fn lpop(&self, key: &[u8], count: usize) -> Result<Vec<Vec<u8>>>
pub fn lpop(&self, key: &[u8], count: usize) -> Result<Vec<Vec<u8>>>
LPOP key count. Returns popped values from the head.
Sourcepub fn rpop(&self, key: &[u8], count: usize) -> Result<Vec<Vec<u8>>>
pub fn rpop(&self, key: &[u8], count: usize) -> Result<Vec<Vec<u8>>>
RPOP key count. Symmetric to LPOP from the tail.
Sourcepub fn llen(&self, key: &[u8]) -> Result<usize>
pub fn llen(&self, key: &[u8]) -> Result<usize>
LLEN key. Length of the list at key; 0 if absent.
Sourcepub fn sadd(&self, key: &[u8], members: &[&[u8]]) -> Result<usize>
pub fn sadd(&self, key: &[u8], members: &[&[u8]]) -> Result<usize>
SADD key member [member ...]. Returns count newly added.
Sourcepub fn srem(&self, key: &[u8], members: &[&[u8]]) -> Result<usize>
pub fn srem(&self, key: &[u8], members: &[&[u8]]) -> Result<usize>
SREM key member [member ...]. Returns count actually removed.
Sourcepub fn smembers(&self, key: &[u8]) -> Result<Vec<Vec<u8>>>
pub fn smembers(&self, key: &[u8]) -> Result<Vec<Vec<u8>>>
SMEMBERS key. Order implementation-defined; empty if absent.
Sourcepub fn zadd(&self, key: &[u8], pairs: &[(f64, &[u8])]) -> Result<usize>
pub fn zadd(&self, key: &[u8], pairs: &[(f64, &[u8])]) -> Result<usize>
ZADD key score member [score member ...]. Returns count newly added.
Sourcepub fn zrem(&self, key: &[u8], members: &[&[u8]]) -> Result<usize>
pub fn zrem(&self, key: &[u8], members: &[&[u8]]) -> Result<usize>
ZREM key member [member ...]. Returns count actually removed.
Sourcepub fn zscore(&self, key: &[u8], member: &[u8]) -> Result<Option<f64>>
pub fn zscore(&self, key: &[u8], member: &[u8]) -> Result<Option<f64>>
ZSCORE key member. Some(score) if present.
Sourcepub fn publish(&self, channel: &[u8], payload: &[u8]) -> usize
pub fn publish(&self, channel: &[u8], payload: &[u8]) -> usize
PUBLISH channel payload. Delivers payload to every subscriber on
channel (direct + pattern matches) inside this process. Returns
the count of receivers the message reached.
Sourcepub fn subscribe(&self, channels: &[&[u8]]) -> Subscription
pub fn subscribe(&self, channels: &[&[u8]]) -> Subscription
Open a Subscription subscribed to channels. Drop the handle
to unsubscribe from everything atomically. Pass &[] to start
with no subscriptions and add some later via
Subscription::subscribe / Subscription::psubscribe.
Sourcepub fn psubscribe(&self, patterns: &[&[u8]]) -> Subscription
pub fn psubscribe(&self, patterns: &[&[u8]]) -> Subscription
Convenience: open a Subscription starting on pattern subscriptions.
Source§impl Store
impl Store
Sourcepub fn open(config: Config) -> Result<Self>
pub fn open(config: Config) -> Result<Self>
Open an embedded keyspace per config.
- Pure in-memory when config.data_dir is None.
- With persistence: each shard loads its snapshot then replays its AOF
  (config.shards > 1 re-shards a legacy single AOF on first open).
- Spawns a background TTL reaper thread when
  config.ttl_reaper == Background (the default).
Examples found in repository?
More examples
9fn main() -> std::io::Result<()> {
10 let s = Store::open(
11 Config::default()
12 .with_max_memory(200 * 1024)
13 .with_eviction(EvictionPolicy::AllKeysLru),
14 )?;
15
16 for i in 0..10_000 {
17 let key = format!("user:{i:05}");
18 let val = format!("user-payload-{i}");
19 s.set(key.as_bytes(), val.as_bytes())?;
20 }
21
22 println!("dbsize after insert flood: {}", s.dbsize());
23 println!("used_memory: {} bytes (limit 200 KiB)", s.used_memory());
24 println!("evictions_total: {}", s.evictions_total());
25
26 // Touch a recent key — it should still be live.
27 let recent = format!("user:0{}", 9999);
28 println!(
29 "user:09999 → {:?}",
30 s.get(recent.as_bytes())?.as_deref().map(String::from_utf8_lossy)
31 );
32
33 Ok(())
34}
14fn main() {
15 let path = std::env::args()
16 .nth(1)
17 .expect("usage: replay_real_aof <aof-path>");
18 let src = PathBuf::from(&path);
19 let bytes = std::fs::metadata(&src)
20 .map(|m| m.len())
21 .unwrap_or(0);
22 println!("reproducer: staging {} bytes from {}", bytes, src.display());
23
24 let dir = std::env::temp_dir().join(format!(
25 "kevy-aof-reproducer-{}",
26 std::process::id()
27 ));
28 std::fs::create_dir_all(&dir).expect("create temp dir");
29 let staged = dir.join("aof-0.aof");
30 std::fs::copy(&src, &staged).expect("copy AOF into staging dir");
31
32 println!("reproducer: opening Store from {}", dir.display());
33 let store = Store::open(
34 Config::default()
35 .with_persist(&dir)
36 .with_aof_filename("aof-0.aof"),
37 )
38 .expect("Store::open");
39
40 println!("reproducer: opened OK; dbsize = {}", store.dbsize());
41}
52fn main() {
53 let n_per: usize = std::env::var("KEVY_BENCH_N")
54 .ok()
55 .and_then(|s| s.parse().ok())
56 .unwrap_or(2_000_000);
57 let keys: Vec<Vec<u8>> = (0..KEYS).map(|i| format!("k{i}").into_bytes()).collect();
58 let shards: usize = std::env::var("KEVY_SHARDS")
59 .ok()
60 .and_then(|s| s.parse().ok())
61 .unwrap_or(1);
62
63 // Shared in-memory store (no AOF) — isolate the lock/keyspace from disk.
64 let store = Store::open(Config::default().with_shards(shards).with_ttl_reaper_manual()).unwrap();
65 for k in &keys {
66 store.set(k, VAL).unwrap();
67 }
68
69 println!(
70 "kevy-embedded MULTI-THREAD throughput — in-memory, shards={shards}, {KEYS} keys, {}B val, n={n_per}/thread",
71 VAL.len()
72 );
73 for &t in &[1usize, 2, 4, 8, 10] {
74 run("GET", &store, t, n_per, &keys, false);
75 }
76 for &t in &[1usize, 2, 4, 8, 10] {
77 run("SET", &store, t, n_per, &keys, true);
78 }
79}
45fn main() {
46 let n: usize = std::env::var("KEVY_BENCH_N")
47 .ok()
48 .and_then(|s| s.parse().ok())
49 .unwrap_or(2_000_000);
50 // Keys precomputed outside the timed loop so `format!`/alloc cost isn't
51 // attributed to kevy.
52 let keys: Vec<Vec<u8>> = (0..KEYS).map(|i| format!("k{i}").into_bytes()).collect();
53
54 println!("kevy-embedded in-process throughput — single thread, n={n}, {KEYS} keys, {}B val", VAL.len());
55
56 let s1 = Store::open(Config::default().with_ttl_reaper_manual()).unwrap();
57 bench("in-memory", &s1, n, &keys);
58
59 let dir2 = std::env::temp_dir().join("kevy_embed_bench_everysec");
60 let _ = std::fs::remove_dir_all(&dir2);
61 let s2 = Store::open(
62 Config::default()
63 .with_persist(&dir2)
64 .with_ttl_reaper_manual()
65 .with_appendfsync(AppendFsync::EverySec),
66 )
67 .unwrap();
68 bench("aof-everysec", &s2, n, &keys);
69
70 let dir3 = std::env::temp_dir().join("kevy_embed_bench_always");
71 let _ = std::fs::remove_dir_all(&dir3);
72 let s3 = Store::open(
73 Config::default()
74 .with_persist(&dir3)
75 .with_ttl_reaper_manual()
76 .with_appendfsync(AppendFsync::Always),
77 )
78 .unwrap();
79 // Always-fsync is one fdatasync per write (no group commit on the embedded
80 // single-op path) — fsync-rate-bound, so run far fewer ops to stay bounded.
81 bench("aof-always", &s3, (n / 20).max(50_000), &keys);
82
83 // TTL'd-key GET: the mailrs path (every cache key has a TTL). With the
84 // background reaper the cached clock is trusted (no per-get Instant::now);
85 // manual mode reads a fresh clock per get — the gap is the cached-clock win.
86 bench_ttl_get("ttl GET (cached clk)", false, n, &keys); // background reaper
87 bench_ttl_get("ttl GET (fresh clk) ", true, n, &keys); // manual reaper
88
89 drop((s1, s2, s3));
90 let _ = std::fs::remove_dir_all(&dir2);
91 let _ = std::fs::remove_dir_all(&dir3);
92}
93
94/// GET throughput over keys that all carry a (long) TTL — the mailrs cache
95/// shape. `manual_reaper` toggles whether the store trusts the cached clock
96/// (background) or reads a fresh clock per get (manual).
97fn bench_ttl_get(label: &str, manual_reaper: bool, n: usize, keys: &[Vec<u8>]) {
98 let cfg = if manual_reaper {
99 Config::default().with_ttl_reaper_manual()
100 } else {
101 Config::default() // background reaper (default) → cached clock trusted
102 };
103 let store = Store::open(cfg).unwrap();
104 let ttl = std::time::Duration::from_secs(3600); // never expires during the run
105 for k in keys {
106 store.set_with_ttl(k, VAL, ttl).unwrap();
107 }
108 let t = Instant::now();
109 let mut hits = 0usize;
110 for i in 0..n {
111 if store.get(&keys[i % KEYS]).unwrap().is_some() {
112 hits += 1;
113 }
114 }
115 std::hint::black_box(hits);
116 println!("[{label}] GET {:>10.0} ops/s", n as f64 / t.elapsed().as_secs_f64());
117}
Sourcepub fn config(&self) -> &Config
pub fn config(&self) -> &Config
The active config, returned by shared reference (read-only). Cloning it
and modifying the clone has no effect on the running store. Useful for
introspection / INFO-style telemetry.
Sourcepub fn with<F, R>(&self, f: F) -> R
pub fn with<F, R>(&self, f: F) -> R
Run f against the underlying kevy_store::Store under its lock. Use
for direct access to methods this crate hasn’t wrapped. The closure can
mutate, but does not auto-log to the AOF — call Self::log yourself
if the mutation must survive a crash.
Sharded stores: this targets shard 0 only. Use Self::with_key
to reach the shard owning a specific key.
Sourcepub fn with_key<F, R>(&self, key: &[u8], f: F) -> R
pub fn with_key<F, R>(&self, key: &[u8], f: F) -> R
Like Self::with but targets the shard that owns key.
Sourcepub fn collect_keys(
&self,
pattern: Option<&[u8]>,
limit: Option<usize>,
) -> Vec<Vec<u8>>
pub fn collect_keys( &self, pattern: Option<&[u8]>, limit: Option<usize>, ) -> Vec<Vec<u8>>
KEYS / SCAN-glob across every shard — the cross-shard
replacement for with(|s| s.collect_keys(pat, lim)), which only sees
shard 0 once sharding is on. Behaves identically to with(...) when
shard_count() == 1. limit bounds the total returned across shards.
Takes a read lock per shard (concurrent-safe).
Sourcepub fn for_each_shard<F: FnMut(&mut Store)>(&self, f: F)
pub fn for_each_shard<F: FnMut(&mut Store)>(&self, f: F)
Run f against each shard’s underlying kevy_store::Store (in
shard-index order) — the cross-shard escape hatch. The caller assembles
the merged result. Pairs with Self::shard_count. For a single key,
prefer Self::with_key; for a glob scan, prefer Self::collect_keys.
Sourcepub fn shard_count(&self) -> usize
pub fn shard_count(&self) -> usize
Number of keyspace shards (== Config::shards).
Sourcepub fn log(&self, parts: &[&[u8]]) -> Result<()>
pub fn log(&self, parts: &[&[u8]]) -> Result<()>
Append a raw RESP-frame argument list to the AOF of the shard that owns its key. No-op when persistence is disabled.
Sourcepub fn tick(&self) -> ExpireStats
pub fn tick(&self) -> ExpireStats
Run one TTL-reaper tick across every shard. Required call cadence in
Manual mode (~10×/s to match Redis hz=10). Returns the summed stats.
Sourcepub fn rewrite_aof(&self) -> Result<Option<RewriteStats>>
pub fn rewrite_aof(&self) -> Result<Option<RewriteStats>>
BGREWRITEAOF: rebuild every shard’s AOF from current state.
Synchronous. Returns the summed stats (None if persistence is off /
no shard rewrote).
Sourcepub fn save_snapshot(&self) -> Result<bool>
pub fn save_snapshot(&self) -> Result<bool>
Snapshot every shard to its dump-{i}.rdb (single shard: the configured
name), atomically. Ok(false) when persistence is disabled.