1use std::io;
6use std::path::PathBuf;
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::sync::{Arc, Mutex, MutexGuard};
9use std::thread::JoinHandle;
10use std::time::Duration;
11
12use kevy_persist::{Aof, Argv, RewriteStats, load_snapshot, replay_aof, save_snapshot};
13use kevy_store::{ExpireStats, StoreError};
14
15use crate::config::{Config, TtlReaperMode};
16
17pub struct Store {
42 inner: Arc<Mutex<Inner>>,
43 config: Config,
44 reaper_stop: Option<Arc<AtomicBool>>,
45 reaper_join: Option<JoinHandle<()>>,
46}
47
48struct Inner {
49 store: kevy_store::Store,
50 aof: Option<Aof>,
51}
52
53impl Store {
54 pub fn open(config: Config) -> io::Result<Self> {
63 let mut store = kevy_store::Store::new();
64 store.set_max_memory(config.maxmemory, config.eviction_policy);
65
66 let aof = if let Some(dir) = &config.data_dir {
67 std::fs::create_dir_all(dir)?;
68 let snap_path = dir.join(&config.snapshot_filename);
69 if snap_path.exists() {
70 load_snapshot(&mut store, &snap_path)?;
71 }
72 let aof_path = dir.join(&config.aof_filename);
73 if aof_path.exists() {
74 replay_aof(&aof_path, |args| crate::replay::apply(&mut store, &args))?;
75 }
76 if config.aof {
77 Some(Aof::open(&aof_path, config.appendfsync)?)
78 } else {
79 None
80 }
81 } else {
82 None
83 };
84
85 let inner = Arc::new(Mutex::new(Inner { store, aof }));
86
87 let (reaper_stop, reaper_join) = match config.ttl_reaper {
88 TtlReaperMode::Manual => (None, None),
89 TtlReaperMode::Background => {
90 let stop = Arc::new(AtomicBool::new(false));
91 let stop_t = stop.clone();
92 let inner_t = inner.clone();
93 let interval = config.reaper_interval;
94 let samples = config.reaper_samples;
95 let rounds = config.reaper_max_rounds;
96 let handle = std::thread::Builder::new()
97 .name(String::from("kevy-embedded-reaper"))
98 .spawn(move || reaper_loop(inner_t, stop_t, interval, samples, rounds))?;
99 (Some(stop), Some(handle))
100 }
101 };
102
103 Ok(Store {
104 inner,
105 config,
106 reaper_stop,
107 reaper_join,
108 })
109 }
110
111 pub fn config(&self) -> &Config {
114 &self.config
115 }
116
117 pub fn with<F, R>(&self, f: F) -> R
125 where
126 F: FnOnce(&mut kevy_store::Store) -> R,
127 {
128 let mut g = self.lock();
129 f(&mut g.store)
130 }
131
132 pub fn log(&self, parts: &[&[u8]]) -> io::Result<()> {
136 let mut g = self.lock();
137 if let Some(aof) = &mut g.aof {
138 let argv = Argv::from(parts.iter().map(|p| p.to_vec()).collect::<Vec<_>>());
139 aof.append(&argv)?;
140 }
141 Ok(())
142 }
143
144 pub fn tick(&self) -> ExpireStats {
150 let mut g = self.lock();
151 g.store
152 .tick_expire(self.config.reaper_samples, self.config.reaper_max_rounds)
153 }
154
155 pub fn rewrite_aof(&self) -> io::Result<Option<RewriteStats>> {
159 let mut g = self.lock();
160 let Inner { store, aof } = &mut *g;
164 let Some(aof) = aof else { return Ok(None) };
165 Ok(Some(aof.rewrite_from(store)?))
166 }
167
168 pub fn save_snapshot(&self) -> io::Result<bool> {
172 let g = self.lock();
173 let Some(dir) = self.config.data_dir.as_ref() else {
174 return Ok(false);
175 };
176 let path: PathBuf = dir.join(&self.config.snapshot_filename);
177 save_snapshot(&g.store, &path)?;
178 Ok(true)
179 }
180
181 pub fn set(&self, key: &[u8], value: &[u8]) -> io::Result<bool> {
188 let mut g = self.lock();
189 let ok = g.store.set(key, value.to_vec(), None, false, false);
190 commit_write(&mut g, &[b"SET", key, value])?;
191 Ok(ok)
192 }
193
194 pub fn set_with_ttl(&self, key: &[u8], value: &[u8], ttl: Duration) -> io::Result<bool> {
196 let mut g = self.lock();
197 let ok = g.store.set(key, value.to_vec(), Some(ttl), false, false);
198 let ms = ttl.as_millis().min(u64::MAX as u128) as u64;
199 commit_write(&mut g, &[b"SET", key, value])?;
200 commit_write(&mut g, &[b"PEXPIRE", key, ms.to_string().as_bytes()])?;
201 Ok(ok)
202 }
203
204 pub fn get(&self, key: &[u8]) -> io::Result<Option<Vec<u8>>> {
206 let mut g = self.lock();
207 Ok(g.store.get(key).map_err(store_err)?.map(|v| v.to_vec()))
208 }
209
210 pub fn del(&self, keys: &[&[u8]]) -> io::Result<usize> {
212 let mut g = self.lock();
213 let owned: Vec<Vec<u8>> = keys.iter().map(|k| k.to_vec()).collect();
214 let n = g.store.del(&owned);
215 if n > 0 {
216 let mut parts: Vec<&[u8]> = Vec::with_capacity(keys.len() + 1);
217 parts.push(b"DEL");
218 for k in keys {
219 parts.push(k);
220 }
221 commit_write(&mut g, &parts)?;
222 }
223 Ok(n)
224 }
225
226 pub fn exists(&self, keys: &[&[u8]]) -> io::Result<usize> {
229 let mut g = self.lock();
230 let owned: Vec<Vec<u8>> = keys.iter().map(|k| k.to_vec()).collect();
231 Ok(g.store.exists(&owned))
232 }
233
234 pub fn incr(&self, key: &[u8]) -> io::Result<i64> {
236 self.incr_by(key, 1)
237 }
238
239 pub fn incr_by(&self, key: &[u8], delta: i64) -> io::Result<i64> {
241 let mut g = self.lock();
242 let n = g.store.incr_by(key, delta).map_err(store_err)?;
243 commit_write(&mut g, &[b"INCRBY", key, delta.to_string().as_bytes()])?;
244 Ok(n)
245 }
246
247 pub fn expire(&self, key: &[u8], ttl: Duration) -> io::Result<bool> {
249 let mut g = self.lock();
250 let touched = g.store.expire(key, ttl);
251 if touched {
252 let ms = ttl.as_millis().min(u64::MAX as u128) as u64;
253 commit_write(&mut g, &[b"PEXPIRE", key, ms.to_string().as_bytes()])?;
254 }
255 Ok(touched)
256 }
257
258 pub fn persist(&self, key: &[u8]) -> io::Result<bool> {
260 let mut g = self.lock();
261 let touched = g.store.persist(key);
262 if touched {
263 commit_write(&mut g, &[b"PERSIST", key])?;
264 }
265 Ok(touched)
266 }
267
268 pub fn ttl_ms(&self, key: &[u8]) -> i64 {
270 self.lock().store.pttl(key)
271 }
272
273 pub fn type_of(&self, key: &[u8]) -> &'static str {
275 self.lock().store.type_of(key)
276 }
277
278 pub fn dbsize(&self) -> usize {
280 self.lock().store.dbsize()
281 }
282
283 pub fn flush(&self) -> io::Result<()> {
286 let mut g = self.lock();
287 g.store.flush();
288 commit_write(&mut g, &[b"FLUSHALL"])?;
289 Ok(())
290 }
291
292 pub fn key_bytes(&self, key: &[u8]) -> Option<u64> {
294 self.lock().store.estimate_key_bytes(key)
295 }
296
297 pub fn used_memory(&self) -> u64 {
299 self.lock().store.used_memory()
300 }
301
302 pub fn evictions_total(&self) -> u64 {
304 self.lock().store.evictions_total()
305 }
306
307 pub fn expired_keys_total(&self) -> u64 {
309 self.lock().store.expired_keys_total()
310 }
311
312 pub fn hset(&self, key: &[u8], pairs: &[(&[u8], &[u8])]) -> io::Result<usize> {
315 let mut g = self.lock();
316 let owned: Vec<(Vec<u8>, Vec<u8>)> =
317 pairs.iter().map(|(f, v)| (f.to_vec(), v.to_vec())).collect();
318 let added = g.store.hset(key, &owned).map_err(store_err)?;
319 let mut parts: Vec<&[u8]> = Vec::with_capacity(2 + pairs.len() * 2);
320 parts.push(b"HSET");
321 parts.push(key);
322 for (f, v) in pairs {
323 parts.push(f);
324 parts.push(v);
325 }
326 commit_write(&mut g, &parts)?;
327 Ok(added)
328 }
329
330 pub fn hget(&self, key: &[u8], field: &[u8]) -> io::Result<Option<Vec<u8>>> {
331 let mut g = self.lock();
332 Ok(g.store.hget(key, field).map_err(store_err)?.map(|v| v.to_vec()))
333 }
334
335 pub fn hdel(&self, key: &[u8], fields: &[&[u8]]) -> io::Result<usize> {
336 let mut g = self.lock();
337 let owned: Vec<Vec<u8>> = fields.iter().map(|f| f.to_vec()).collect();
338 let removed = g.store.hdel(key, &owned).map_err(store_err)?;
339 if removed > 0 {
340 let mut parts: Vec<&[u8]> = Vec::with_capacity(2 + fields.len());
341 parts.push(b"HDEL");
342 parts.push(key);
343 for f in fields {
344 parts.push(f);
345 }
346 commit_write(&mut g, &parts)?;
347 }
348 Ok(removed)
349 }
350
351 pub fn lpush(&self, key: &[u8], values: &[&[u8]]) -> io::Result<usize> {
354 push_helper(self, key, values, b"LPUSH", |s, k, vs| s.lpush(k, vs))
355 }
356
357 pub fn rpush(&self, key: &[u8], values: &[&[u8]]) -> io::Result<usize> {
358 push_helper(self, key, values, b"RPUSH", |s, k, vs| s.rpush(k, vs))
359 }
360
361 pub fn lpop(&self, key: &[u8], count: usize) -> io::Result<Vec<Vec<u8>>> {
362 pop_helper(self, key, count, false)
363 }
364
365 pub fn rpop(&self, key: &[u8], count: usize) -> io::Result<Vec<Vec<u8>>> {
366 pop_helper(self, key, count, true)
367 }
368
369 pub fn llen(&self, key: &[u8]) -> io::Result<usize> {
370 self.lock().store.llen(key).map_err(store_err)
371 }
372
373 pub fn sadd(&self, key: &[u8], members: &[&[u8]]) -> io::Result<usize> {
376 push_helper(self, key, members, b"SADD", |s, k, ms| s.sadd(k, ms))
377 }
378
379 pub fn srem(&self, key: &[u8], members: &[&[u8]]) -> io::Result<usize> {
380 let mut g = self.lock();
381 let owned: Vec<Vec<u8>> = members.iter().map(|m| m.to_vec()).collect();
382 let removed = g.store.srem(key, &owned).map_err(store_err)?;
383 if removed > 0 {
384 let mut parts: Vec<&[u8]> = Vec::with_capacity(2 + members.len());
385 parts.push(b"SREM");
386 parts.push(key);
387 for m in members {
388 parts.push(m);
389 }
390 commit_write(&mut g, &parts)?;
391 }
392 Ok(removed)
393 }
394
395 pub fn smembers(&self, key: &[u8]) -> io::Result<Vec<Vec<u8>>> {
396 self.lock().store.smembers(key).map_err(store_err)
397 }
398
399 pub fn scard(&self, key: &[u8]) -> io::Result<usize> {
400 self.lock().store.scard(key).map_err(store_err)
401 }
402
403 pub fn zadd(&self, key: &[u8], pairs: &[(f64, &[u8])]) -> io::Result<usize> {
406 let mut g = self.lock();
407 let owned: Vec<(f64, Vec<u8>)> =
408 pairs.iter().map(|(s, m)| (*s, m.to_vec())).collect();
409 let added = g.store.zadd(key, &owned).map_err(store_err)?;
410 let mut score_strs: Vec<Vec<u8>> = Vec::with_capacity(pairs.len());
412 for (s, _) in pairs {
413 score_strs.push(format!("{s}").into_bytes());
414 }
415 let mut parts: Vec<&[u8]> = Vec::with_capacity(2 + pairs.len() * 2);
416 parts.push(b"ZADD");
417 parts.push(key);
418 for (i, (_, m)) in pairs.iter().enumerate() {
419 parts.push(&score_strs[i]);
420 parts.push(m);
421 }
422 commit_write(&mut g, &parts)?;
423 Ok(added)
424 }
425
426 pub fn zrem(&self, key: &[u8], members: &[&[u8]]) -> io::Result<usize> {
427 let mut g = self.lock();
428 let owned: Vec<Vec<u8>> = members.iter().map(|m| m.to_vec()).collect();
429 let removed = g.store.zrem(key, &owned).map_err(store_err)?;
430 if removed > 0 {
431 let mut parts: Vec<&[u8]> = Vec::with_capacity(2 + members.len());
432 parts.push(b"ZREM");
433 parts.push(key);
434 for m in members {
435 parts.push(m);
436 }
437 commit_write(&mut g, &parts)?;
438 }
439 Ok(removed)
440 }
441
442 pub fn zscore(&self, key: &[u8], member: &[u8]) -> io::Result<Option<f64>> {
443 self.lock().store.zscore(key, member).map_err(store_err)
444 }
445
446 pub fn zcard(&self, key: &[u8]) -> io::Result<usize> {
447 self.lock().store.zcard(key).map_err(store_err)
448 }
449
450 fn lock(&self) -> MutexGuard<'_, Inner> {
453 match self.inner.lock() {
457 Ok(g) => g,
458 Err(poison) => poison.into_inner(),
459 }
460 }
461}
462
463fn push_helper<F>(
464 s: &Store,
465 key: &[u8],
466 values: &[&[u8]],
467 verb: &'static [u8],
468 op: F,
469) -> io::Result<usize>
470where
471 F: FnOnce(&mut kevy_store::Store, &[u8], &[Vec<u8>]) -> Result<usize, StoreError>,
472{
473 let mut g = s.lock();
474 let owned: Vec<Vec<u8>> = values.iter().map(|v| v.to_vec()).collect();
475 let n = op(&mut g.store, key, &owned).map_err(store_err)?;
476 let mut parts: Vec<&[u8]> = Vec::with_capacity(2 + values.len());
477 parts.push(verb);
478 parts.push(key);
479 for v in values {
480 parts.push(v);
481 }
482 commit_write(&mut g, &parts)?;
483 Ok(n)
484}
485
486fn pop_helper(s: &Store, key: &[u8], count: usize, from_tail: bool) -> io::Result<Vec<Vec<u8>>> {
487 let mut g = s.lock();
488 let popped = if from_tail {
489 g.store.rpop(key, count).map_err(store_err)?
490 } else {
491 g.store.lpop(key, count).map_err(store_err)?
492 };
493 if !popped.is_empty() {
494 let verb: &[u8] = if from_tail { b"RPOP" } else { b"LPOP" };
495 let count_str = popped.len().to_string();
496 let parts: [&[u8]; 3] = [verb, key, count_str.as_bytes()];
497 commit_write(&mut g, &parts)?;
498 }
499 Ok(popped)
500}
501
502fn log_argv(aof: &mut Option<Aof>, parts: &[&[u8]]) -> io::Result<()> {
503 if let Some(aof) = aof {
504 let argv = Argv::from(parts.iter().map(|p| p.to_vec()).collect::<Vec<_>>());
505 aof.append(&argv)?;
506 }
507 Ok(())
508}
509
510fn commit_write(inner: &mut Inner, parts: &[&[u8]]) -> io::Result<()> {
515 log_argv(&mut inner.aof, parts)?;
516 inner.store.try_evict_after_write();
517 Ok(())
518}
519
520fn store_err(e: StoreError) -> io::Error {
521 io::Error::new(io::ErrorKind::InvalidInput, format!("kevy-store: {e:?}"))
522}
523
524fn reaper_loop(
525 inner: Arc<Mutex<Inner>>,
526 stop: Arc<AtomicBool>,
527 interval: Duration,
528 samples: usize,
529 rounds: u32,
530) {
531 while !stop.load(Ordering::Relaxed) {
532 std::thread::sleep(interval);
533 if stop.load(Ordering::Relaxed) {
534 break;
535 }
536 let mut g = match inner.lock() {
537 Ok(g) => g,
538 Err(poison) => poison.into_inner(),
539 };
540 let _ = g.store.tick_expire(samples, rounds);
541 if let Some(aof) = &mut g.aof {
544 let _ = aof.maybe_sync();
545 }
546 }
547}
548
549impl Drop for Store {
550 fn drop(&mut self) {
551 if let Some(stop) = &self.reaper_stop {
553 stop.store(true, Ordering::Relaxed);
554 }
555 if let Some(j) = self.reaper_join.take() {
556 let _ = j.join();
557 }
558 let mut g = match self.inner.lock() {
565 Ok(g) => g,
566 Err(poison) => poison.into_inner(),
567 };
568 if let Some(aof) = &mut g.aof {
569 let _ = aof.maybe_sync();
570 }
571 }
572}
573
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{AppendFsync, EvictionPolicy};

    /// Returns a path under the system temp directory named after `name` and
    /// the current time in nanoseconds. The directory itself is not created.
    fn tmp_dir(name: &str) -> PathBuf {
        let nanos = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_nanos();
        std::env::temp_dir().join(format!("kevy-embedded-{name}-{nanos}"))
    }

    /// Set, get and delete work on a purely in-memory store.
    #[test]
    fn in_memory_roundtrip() {
        let store = Store::open(Config::default().with_ttl_reaper_manual()).unwrap();
        store.set(b"k", b"v").unwrap();
        assert_eq!(store.get(b"k").unwrap(), Some(b"v".to_vec()));
        assert_eq!(store.dbsize(), 1);
        store.del(&[b"k"]).unwrap();
        assert_eq!(store.dbsize(), 0);
    }

    /// Data written through one handle is visible after reopening the same
    /// directory.
    #[test]
    fn persistence_round_trip_via_aof() {
        let dir = tmp_dir("aof-rt");
        {
            let cfg = Config::default()
                .with_persist(&dir)
                .with_ttl_reaper_manual()
                .with_appendfsync(AppendFsync::Always);
            let writer = Store::open(cfg).unwrap();
            for i in 0..50 {
                let key = format!("k{i}");
                writer.set(key.as_bytes(), b"v").unwrap();
            }
            writer.incr_by(b"counter", 41).unwrap();
            writer
                .hset(b"h", &[(b"field" as &[u8], b"val" as &[u8])])
                .unwrap();
        }

        let reopened_cfg = Config::default()
            .with_persist(&dir)
            .with_ttl_reaper_manual();
        let reader = Store::open(reopened_cfg).unwrap();
        // 50 string keys + "counter" + "h".
        assert_eq!(reader.dbsize(), 52);
        assert_eq!(reader.get(b"k0").unwrap(), Some(b"v".to_vec()));
        assert_eq!(reader.get(b"k49").unwrap(), Some(b"v".to_vec()));
        assert_eq!(reader.get(b"counter").unwrap(), Some(b"41".to_vec()));
        assert_eq!(reader.hget(b"h", b"field").unwrap(), Some(b"val".to_vec()));
        drop(reader);
        let _ = std::fs::remove_dir_all(&dir);
    }

    /// With a small memory limit, writes trigger evictions and memory stays
    /// within the limit.
    #[test]
    fn eviction_works_under_pressure() {
        let cfg = Config::default()
            .with_ttl_reaper_manual()
            .with_max_memory(800)
            .with_eviction(EvictionPolicy::AllKeysLru);
        let store = Store::open(cfg).unwrap();
        for i in 0..50 {
            let key = format!("k{i:02}");
            store
                .set(key.as_bytes(), b"xxxxxxxxxxxxxxxxxxxx")
                .unwrap();
        }
        assert!(store.used_memory() <= 800, "got {}", store.used_memory());
        assert!(store.evictions_total() > 0);
    }

    /// In manual mode, `tick` (plus a read of the key) accounts for an
    /// expired key and leaves a key without TTL alone.
    #[test]
    fn manual_tick_runs_active_reaper() {
        let store = Store::open(Config::default().with_ttl_reaper_manual()).unwrap();
        store
            .set_with_ttl(b"short", b"v", Duration::from_millis(1))
            .unwrap();
        store.set(b"perm", b"v").unwrap();
        std::thread::sleep(Duration::from_millis(20));
        let _stats = store.tick();
        let _ = store.get(b"short").unwrap();
        assert!(store.expired_keys_total() >= 1);
        assert!(store.get(b"perm").unwrap().is_some());
    }

    /// `with` exposes the underlying store for operations done directly on it.
    #[test]
    fn with_escape_hatch_works() {
        let store = Store::open(Config::default().with_ttl_reaper_manual()).unwrap();
        let members = store.with(|raw| {
            let _ = raw.zadd(b"z", &[(1.0, b"a".to_vec()), (2.0, b"b".to_vec())]);
            raw.zcard(b"z").unwrap()
        });
        assert_eq!(members, 2);
        assert_eq!(store.type_of(b"z"), "zset");
    }

    /// With the default reaper mode and a short interval, an expired key is
    /// gone after waiting.
    #[test]
    fn background_reaper_thread_drops_expired_keys() {
        let cfg = Config::default().with_reaper_interval(Duration::from_millis(20));
        let store = Store::open(cfg).unwrap();
        store
            .set_with_ttl(b"k", b"v", Duration::from_millis(5))
            .unwrap();
        std::thread::sleep(Duration::from_millis(120));
        let _ = store.get(b"k").unwrap();
        assert_eq!(store.dbsize(), 0);
    }

    /// A store behind an `Arc` accepts concurrent writers.
    #[test]
    fn arc_sharing_across_threads() {
        use std::sync::Arc;
        let shared = Arc::new(Store::open(Config::default().with_ttl_reaper_manual()).unwrap());
        let workers: Vec<_> = (0..8)
            .map(|i| {
                let handle = Arc::clone(&shared);
                std::thread::spawn(move || {
                    for j in 0..50 {
                        let key = format!("t{i}-{j}");
                        handle.set(key.as_bytes(), b"v").unwrap();
                    }
                })
            })
            .collect();
        for worker in workers {
            worker.join().unwrap();
        }
        assert_eq!(shared.dbsize(), 8 * 50);
    }

    /// Dropping a store while its reaper thread is running must return.
    #[test]
    fn drop_during_reaper_does_not_deadlock() {
        for _ in 0..4 {
            let cfg = Config::default().with_reaper_interval(Duration::from_millis(5));
            let store = Store::open(cfg).unwrap();
            store.set(b"k", b"v").unwrap();
            std::thread::sleep(Duration::from_millis(40));
            drop(store);
        }
    }

    /// A snapshot saved without an AOF restores all keys on reopen.
    #[test]
    fn save_snapshot_then_restart() {
        let dir = tmp_dir("snap-rt");
        {
            let cfg = Config::default()
                .with_persist(&dir)
                .without_aof()
                .with_ttl_reaper_manual();
            let writer = Store::open(cfg).unwrap();
            for i in 0..10 {
                let key = format!("k{i}");
                writer.set(key.as_bytes(), b"v").unwrap();
            }
            assert!(writer.save_snapshot().unwrap());
        }

        let cfg = Config::default()
            .with_persist(&dir)
            .without_aof()
            .with_ttl_reaper_manual();
        let reader = Store::open(cfg).unwrap();
        assert_eq!(reader.dbsize(), 10);
        let _ = std::fs::remove_dir_all(&dir);
    }
}