1use std::io;
6use std::path::PathBuf;
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::sync::{Arc, Mutex, MutexGuard, Weak};
9use std::thread::JoinHandle;
10use std::time::Duration;
11
12use kevy_persist::{Aof, Argv, RewriteStats, load_snapshot, replay_aof, save_snapshot};
13use kevy_store::{ExpireStats, StoreError};
14
15use crate::config::{Config, TtlReaperMode};
16use crate::pubsub::{PubsubBus, Subscription};
17
18#[derive(Clone)]
42pub struct Store {
43 inner: Arc<Mutex<Inner>>,
44 guard: Arc<DropGuard>,
47 config: Config,
48}
49
50pub struct WeakStore {
56 inner: Weak<Mutex<Inner>>,
57 guard: Weak<DropGuard>,
58 config: Config,
59}
60
61impl WeakStore {
62 pub fn upgrade(&self) -> Option<Store> {
65 Some(Store {
66 inner: self.inner.upgrade()?,
67 guard: self.guard.upgrade()?,
68 config: self.config.clone(),
69 })
70 }
71}
72
73pub(crate) struct Inner {
74 pub(crate) store: kevy_store::Store,
75 pub(crate) aof: Option<Aof>,
76 pub(crate) bus: PubsubBus,
77}
78
79pub(crate) struct DropGuard {
85 reaper_stop: Option<Arc<AtomicBool>>,
86 reaper_join: Mutex<Option<JoinHandle<()>>>,
87 inner_for_flush: Arc<Mutex<Inner>>,
88}
89
90impl Store {
91 pub fn open(config: Config) -> io::Result<Self> {
100 let mut store = kevy_store::Store::new();
101 store.set_max_memory(config.maxmemory, config.eviction_policy);
102
103 let aof = if let Some(dir) = &config.data_dir {
104 std::fs::create_dir_all(dir)?;
105 let snap_path = dir.join(&config.snapshot_filename);
106 if snap_path.exists() {
107 load_snapshot(&mut store, &snap_path)?;
108 }
109 let aof_path = dir.join(&config.aof_filename);
110 if aof_path.exists() {
111 replay_aof(&aof_path, |args| crate::replay::apply(&mut store, &args))?;
112 }
113 if config.aof {
114 Some(Aof::open(&aof_path, config.appendfsync)?)
115 } else {
116 None
117 }
118 } else {
119 None
120 };
121
122 let inner = Arc::new(Mutex::new(Inner {
123 store,
124 aof,
125 bus: PubsubBus::new(),
126 }));
127
128 let (reaper_stop, reaper_join) = match config.ttl_reaper {
129 TtlReaperMode::Manual => (None, None),
130 TtlReaperMode::Background => {
131 let stop = Arc::new(AtomicBool::new(false));
132 let stop_t = stop.clone();
133 let inner_t = inner.clone();
134 let interval = config.reaper_interval;
135 let samples = config.reaper_samples;
136 let rounds = config.reaper_max_rounds;
137 let handle = std::thread::Builder::new()
138 .name(String::from("kevy-embedded-reaper"))
139 .spawn(move || reaper_loop(inner_t, stop_t, interval, samples, rounds))?;
140 (Some(stop), Some(handle))
141 }
142 };
143
144 let guard = Arc::new(DropGuard {
145 reaper_stop,
146 reaper_join: Mutex::new(reaper_join),
147 inner_for_flush: inner.clone(),
148 });
149
150 Ok(Store {
151 inner,
152 guard,
153 config,
154 })
155 }
156
157 pub fn downgrade(&self) -> WeakStore {
160 WeakStore {
161 inner: Arc::downgrade(&self.inner),
162 guard: Arc::downgrade(&self.guard),
163 config: self.config.clone(),
164 }
165 }
166
167 pub fn config(&self) -> &Config {
170 &self.config
171 }
172
173 pub fn with<F, R>(&self, f: F) -> R
181 where
182 F: FnOnce(&mut kevy_store::Store) -> R,
183 {
184 let mut g = self.lock();
185 f(&mut g.store)
186 }
187
188 pub fn log(&self, parts: &[&[u8]]) -> io::Result<()> {
192 let mut g = self.lock();
193 if let Some(aof) = &mut g.aof {
194 let argv = Argv::from(parts.iter().map(|p| p.to_vec()).collect::<Vec<_>>());
195 aof.append(&argv)?;
196 }
197 Ok(())
198 }
199
200 pub fn tick(&self) -> ExpireStats {
206 let mut g = self.lock();
207 g.store
208 .tick_expire(self.config.reaper_samples, self.config.reaper_max_rounds)
209 }
210
211 pub fn rewrite_aof(&self) -> io::Result<Option<RewriteStats>> {
215 let mut g = self.lock();
216 let Inner { store, aof, bus: _ } = &mut *g;
220 let Some(aof) = aof else { return Ok(None) };
221 Ok(Some(aof.rewrite_from(store)?))
222 }
223
224 pub fn save_snapshot(&self) -> io::Result<bool> {
228 let g = self.lock();
229 let Some(dir) = self.config.data_dir.as_ref() else {
230 return Ok(false);
231 };
232 let path: PathBuf = dir.join(&self.config.snapshot_filename);
233 save_snapshot(&g.store, &path)?;
234 Ok(true)
235 }
236
237 pub fn set(&self, key: &[u8], value: &[u8]) -> io::Result<bool> {
244 let mut g = self.lock();
245 let ok = g.store.set(key, value.to_vec(), None, false, false);
246 commit_write(&mut g, &[b"SET", key, value])?;
247 Ok(ok)
248 }
249
250 pub fn set_with_ttl(&self, key: &[u8], value: &[u8], ttl: Duration) -> io::Result<bool> {
252 let mut g = self.lock();
253 let ok = g.store.set(key, value.to_vec(), Some(ttl), false, false);
254 let ms = ttl.as_millis().min(u64::MAX as u128) as u64;
255 commit_write(&mut g, &[b"SET", key, value])?;
256 commit_write(&mut g, &[b"PEXPIRE", key, ms.to_string().as_bytes()])?;
257 Ok(ok)
258 }
259
260 pub fn get(&self, key: &[u8]) -> io::Result<Option<Vec<u8>>> {
262 let mut g = self.lock();
263 Ok(g.store.get(key).map_err(store_err)?.map(|v| v.to_vec()))
264 }
265
266 pub fn del(&self, keys: &[&[u8]]) -> io::Result<usize> {
268 let mut g = self.lock();
269 let owned: Vec<Vec<u8>> = keys.iter().map(|k| k.to_vec()).collect();
270 let n = g.store.del(&owned);
271 if n > 0 {
272 let mut parts: Vec<&[u8]> = Vec::with_capacity(keys.len() + 1);
273 parts.push(b"DEL");
274 for k in keys {
275 parts.push(k);
276 }
277 commit_write(&mut g, &parts)?;
278 }
279 Ok(n)
280 }
281
282 pub fn exists(&self, keys: &[&[u8]]) -> io::Result<usize> {
285 let mut g = self.lock();
286 let owned: Vec<Vec<u8>> = keys.iter().map(|k| k.to_vec()).collect();
287 Ok(g.store.exists(&owned))
288 }
289
290 pub fn incr(&self, key: &[u8]) -> io::Result<i64> {
292 self.incr_by(key, 1)
293 }
294
295 pub fn incr_by(&self, key: &[u8], delta: i64) -> io::Result<i64> {
297 let mut g = self.lock();
298 let n = g.store.incr_by(key, delta).map_err(store_err)?;
299 commit_write(&mut g, &[b"INCRBY", key, delta.to_string().as_bytes()])?;
300 Ok(n)
301 }
302
303 pub fn expire(&self, key: &[u8], ttl: Duration) -> io::Result<bool> {
305 let mut g = self.lock();
306 let touched = g.store.expire(key, ttl);
307 if touched {
308 let ms = ttl.as_millis().min(u64::MAX as u128) as u64;
309 commit_write(&mut g, &[b"PEXPIRE", key, ms.to_string().as_bytes()])?;
310 }
311 Ok(touched)
312 }
313
314 pub fn persist(&self, key: &[u8]) -> io::Result<bool> {
316 let mut g = self.lock();
317 let touched = g.store.persist(key);
318 if touched {
319 commit_write(&mut g, &[b"PERSIST", key])?;
320 }
321 Ok(touched)
322 }
323
324 pub fn ttl_ms(&self, key: &[u8]) -> i64 {
326 self.lock().store.pttl(key)
327 }
328
329 pub fn type_of(&self, key: &[u8]) -> &'static str {
331 self.lock().store.type_of(key)
332 }
333
334 pub fn dbsize(&self) -> usize {
336 self.lock().store.dbsize()
337 }
338
339 pub fn flush(&self) -> io::Result<()> {
342 let mut g = self.lock();
343 g.store.flush();
344 commit_write(&mut g, &[b"FLUSHALL"])?;
345 Ok(())
346 }
347
348 pub fn key_bytes(&self, key: &[u8]) -> Option<u64> {
350 self.lock().store.estimate_key_bytes(key)
351 }
352
353 pub fn used_memory(&self) -> u64 {
355 self.lock().store.used_memory()
356 }
357
358 pub fn evictions_total(&self) -> u64 {
360 self.lock().store.evictions_total()
361 }
362
363 pub fn expired_keys_total(&self) -> u64 {
365 self.lock().store.expired_keys_total()
366 }
367
368 pub fn hset(&self, key: &[u8], pairs: &[(&[u8], &[u8])]) -> io::Result<usize> {
371 let mut g = self.lock();
372 let owned: Vec<(Vec<u8>, Vec<u8>)> =
373 pairs.iter().map(|(f, v)| (f.to_vec(), v.to_vec())).collect();
374 let added = g.store.hset(key, &owned).map_err(store_err)?;
375 let mut parts: Vec<&[u8]> = Vec::with_capacity(2 + pairs.len() * 2);
376 parts.push(b"HSET");
377 parts.push(key);
378 for (f, v) in pairs {
379 parts.push(f);
380 parts.push(v);
381 }
382 commit_write(&mut g, &parts)?;
383 Ok(added)
384 }
385
386 pub fn hget(&self, key: &[u8], field: &[u8]) -> io::Result<Option<Vec<u8>>> {
387 let mut g = self.lock();
388 Ok(g.store.hget(key, field).map_err(store_err)?.map(|v| v.to_vec()))
389 }
390
391 pub fn hdel(&self, key: &[u8], fields: &[&[u8]]) -> io::Result<usize> {
392 let mut g = self.lock();
393 let owned: Vec<Vec<u8>> = fields.iter().map(|f| f.to_vec()).collect();
394 let removed = g.store.hdel(key, &owned).map_err(store_err)?;
395 if removed > 0 {
396 let mut parts: Vec<&[u8]> = Vec::with_capacity(2 + fields.len());
397 parts.push(b"HDEL");
398 parts.push(key);
399 for f in fields {
400 parts.push(f);
401 }
402 commit_write(&mut g, &parts)?;
403 }
404 Ok(removed)
405 }
406
407 pub fn lpush(&self, key: &[u8], values: &[&[u8]]) -> io::Result<usize> {
410 push_helper(self, key, values, b"LPUSH", |s, k, vs| s.lpush(k, vs))
411 }
412
413 pub fn rpush(&self, key: &[u8], values: &[&[u8]]) -> io::Result<usize> {
414 push_helper(self, key, values, b"RPUSH", |s, k, vs| s.rpush(k, vs))
415 }
416
417 pub fn lpop(&self, key: &[u8], count: usize) -> io::Result<Vec<Vec<u8>>> {
418 pop_helper(self, key, count, false)
419 }
420
421 pub fn rpop(&self, key: &[u8], count: usize) -> io::Result<Vec<Vec<u8>>> {
422 pop_helper(self, key, count, true)
423 }
424
425 pub fn llen(&self, key: &[u8]) -> io::Result<usize> {
426 self.lock().store.llen(key).map_err(store_err)
427 }
428
429 pub fn sadd(&self, key: &[u8], members: &[&[u8]]) -> io::Result<usize> {
432 push_helper(self, key, members, b"SADD", |s, k, ms| s.sadd(k, ms))
433 }
434
435 pub fn srem(&self, key: &[u8], members: &[&[u8]]) -> io::Result<usize> {
436 let mut g = self.lock();
437 let owned: Vec<Vec<u8>> = members.iter().map(|m| m.to_vec()).collect();
438 let removed = g.store.srem(key, &owned).map_err(store_err)?;
439 if removed > 0 {
440 let mut parts: Vec<&[u8]> = Vec::with_capacity(2 + members.len());
441 parts.push(b"SREM");
442 parts.push(key);
443 for m in members {
444 parts.push(m);
445 }
446 commit_write(&mut g, &parts)?;
447 }
448 Ok(removed)
449 }
450
451 pub fn smembers(&self, key: &[u8]) -> io::Result<Vec<Vec<u8>>> {
452 self.lock().store.smembers(key).map_err(store_err)
453 }
454
455 pub fn scard(&self, key: &[u8]) -> io::Result<usize> {
456 self.lock().store.scard(key).map_err(store_err)
457 }
458
459 pub fn zadd(&self, key: &[u8], pairs: &[(f64, &[u8])]) -> io::Result<usize> {
462 let mut g = self.lock();
463 let owned: Vec<(f64, Vec<u8>)> =
464 pairs.iter().map(|(s, m)| (*s, m.to_vec())).collect();
465 let added = g.store.zadd(key, &owned).map_err(store_err)?;
466 let mut score_strs: Vec<Vec<u8>> = Vec::with_capacity(pairs.len());
468 for (s, _) in pairs {
469 score_strs.push(format!("{s}").into_bytes());
470 }
471 let mut parts: Vec<&[u8]> = Vec::with_capacity(2 + pairs.len() * 2);
472 parts.push(b"ZADD");
473 parts.push(key);
474 for (i, (_, m)) in pairs.iter().enumerate() {
475 parts.push(&score_strs[i]);
476 parts.push(m);
477 }
478 commit_write(&mut g, &parts)?;
479 Ok(added)
480 }
481
482 pub fn zrem(&self, key: &[u8], members: &[&[u8]]) -> io::Result<usize> {
483 let mut g = self.lock();
484 let owned: Vec<Vec<u8>> = members.iter().map(|m| m.to_vec()).collect();
485 let removed = g.store.zrem(key, &owned).map_err(store_err)?;
486 if removed > 0 {
487 let mut parts: Vec<&[u8]> = Vec::with_capacity(2 + members.len());
488 parts.push(b"ZREM");
489 parts.push(key);
490 for m in members {
491 parts.push(m);
492 }
493 commit_write(&mut g, &parts)?;
494 }
495 Ok(removed)
496 }
497
498 pub fn zscore(&self, key: &[u8], member: &[u8]) -> io::Result<Option<f64>> {
499 self.lock().store.zscore(key, member).map_err(store_err)
500 }
501
502 pub fn zcard(&self, key: &[u8]) -> io::Result<usize> {
503 self.lock().store.zcard(key).map_err(store_err)
504 }
505
506 pub fn publish(&self, channel: &[u8], payload: &[u8]) -> usize {
512 let plans = {
516 let g = self.lock();
517 g.bus.collect_delivery(channel, payload)
518 };
519 let mut count = 0;
520 for (frame, sender) in plans {
521 if sender.send(frame).is_ok() {
522 count += 1;
523 }
524 }
525 count
526 }
527
528 pub fn subscribe(&self, channels: &[&[u8]]) -> Subscription {
533 let mut sub = Subscription::new(self.inner.clone(), self.guard.clone());
534 if !channels.is_empty() {
535 sub.subscribe(channels);
536 }
537 sub
538 }
539
540 pub fn psubscribe(&self, patterns: &[&[u8]]) -> Subscription {
542 let mut sub = Subscription::new(self.inner.clone(), self.guard.clone());
543 if !patterns.is_empty() {
544 sub.psubscribe(patterns);
545 }
546 sub
547 }
548
549 fn lock(&self) -> MutexGuard<'_, Inner> {
552 match self.inner.lock() {
556 Ok(g) => g,
557 Err(poison) => poison.into_inner(),
558 }
559 }
560}
561
562fn push_helper<F>(
563 s: &Store,
564 key: &[u8],
565 values: &[&[u8]],
566 verb: &'static [u8],
567 op: F,
568) -> io::Result<usize>
569where
570 F: FnOnce(&mut kevy_store::Store, &[u8], &[Vec<u8>]) -> Result<usize, StoreError>,
571{
572 let mut g = s.lock();
573 let owned: Vec<Vec<u8>> = values.iter().map(|v| v.to_vec()).collect();
574 let n = op(&mut g.store, key, &owned).map_err(store_err)?;
575 let mut parts: Vec<&[u8]> = Vec::with_capacity(2 + values.len());
576 parts.push(verb);
577 parts.push(key);
578 for v in values {
579 parts.push(v);
580 }
581 commit_write(&mut g, &parts)?;
582 Ok(n)
583}
584
585fn pop_helper(s: &Store, key: &[u8], count: usize, from_tail: bool) -> io::Result<Vec<Vec<u8>>> {
586 let mut g = s.lock();
587 let popped = if from_tail {
588 g.store.rpop(key, count).map_err(store_err)?
589 } else {
590 g.store.lpop(key, count).map_err(store_err)?
591 };
592 if !popped.is_empty() {
593 let verb: &[u8] = if from_tail { b"RPOP" } else { b"LPOP" };
594 let count_str = popped.len().to_string();
595 let parts: [&[u8]; 3] = [verb, key, count_str.as_bytes()];
596 commit_write(&mut g, &parts)?;
597 }
598 Ok(popped)
599}
600
601fn log_argv(aof: &mut Option<Aof>, parts: &[&[u8]]) -> io::Result<()> {
602 if let Some(aof) = aof {
603 let argv = Argv::from(parts.iter().map(|p| p.to_vec()).collect::<Vec<_>>());
604 aof.append(&argv)?;
605 }
606 Ok(())
607}
608
609fn commit_write(inner: &mut Inner, parts: &[&[u8]]) -> io::Result<()> {
614 log_argv(&mut inner.aof, parts)?;
615 inner.store.try_evict_after_write();
616 Ok(())
617}
618
619fn store_err(e: StoreError) -> io::Error {
620 io::Error::new(io::ErrorKind::InvalidInput, format!("kevy-store: {e:?}"))
621}
622
623fn reaper_loop(
624 inner: Arc<Mutex<Inner>>,
625 stop: Arc<AtomicBool>,
626 interval: Duration,
627 samples: usize,
628 rounds: u32,
629) {
630 while !stop.load(Ordering::Relaxed) {
631 std::thread::sleep(interval);
632 if stop.load(Ordering::Relaxed) {
633 break;
634 }
635 let mut g = match inner.lock() {
636 Ok(g) => g,
637 Err(poison) => poison.into_inner(),
638 };
639 let _ = g.store.tick_expire(samples, rounds);
640 if let Some(aof) = &mut g.aof {
643 let _ = aof.maybe_sync();
644 }
645 }
646}
647
648impl Drop for DropGuard {
649 fn drop(&mut self) {
650 if let Some(stop) = &self.reaper_stop {
655 stop.store(true, Ordering::Relaxed);
656 }
657 if let Some(j) = self
658 .reaper_join
659 .lock()
660 .unwrap_or_else(|p| p.into_inner())
661 .take()
662 {
663 let _ = j.join();
664 }
665 let mut g = match self.inner_for_flush.lock() {
666 Ok(g) => g,
667 Err(poison) => poison.into_inner(),
668 };
669 if let Some(aof) = &mut g.aof {
670 let _ = aof.maybe_sync();
671 }
672 }
673}
674
675#[cfg(test)]
676mod tests {
677 use super::*;
678 use crate::config::{AppendFsync, EvictionPolicy};
679
680 fn tmp_dir(name: &str) -> PathBuf {
681 let mut p = std::env::temp_dir();
682 let uniq = std::time::SystemTime::now()
683 .duration_since(std::time::UNIX_EPOCH)
684 .unwrap()
685 .as_nanos();
686 p.push(format!("kevy-embedded-{name}-{uniq}"));
687 p
688 }
689
690 #[test]
691 fn in_memory_roundtrip() {
692 let s = Store::open(Config::default().with_ttl_reaper_manual()).unwrap();
693 s.set(b"k", b"v").unwrap();
694 assert_eq!(s.get(b"k").unwrap(), Some(b"v".to_vec()));
695 assert_eq!(s.dbsize(), 1);
696 s.del(&[b"k"]).unwrap();
697 assert_eq!(s.dbsize(), 0);
698 }
699
700 #[test]
701 fn persistence_round_trip_via_aof() {
702 let dir = tmp_dir("aof-rt");
703 {
704 let s = Store::open(
705 Config::default()
706 .with_persist(&dir)
707 .with_ttl_reaper_manual()
708 .with_appendfsync(AppendFsync::Always),
709 )
710 .unwrap();
711 for i in 0..50 {
712 s.set(format!("k{i}").as_bytes(), b"v").unwrap();
713 }
714 s.incr_by(b"counter", 41).unwrap();
715 s.hset(b"h", &[(b"field" as &[u8], b"val" as &[u8])]).unwrap();
716 }
717 let s2 = Store::open(
719 Config::default()
720 .with_persist(&dir)
721 .with_ttl_reaper_manual(),
722 )
723 .unwrap();
724 assert_eq!(s2.dbsize(), 52); assert_eq!(s2.get(b"k0").unwrap(), Some(b"v".to_vec()));
726 assert_eq!(s2.get(b"k49").unwrap(), Some(b"v".to_vec()));
727 assert_eq!(s2.get(b"counter").unwrap(), Some(b"41".to_vec()));
728 assert_eq!(s2.hget(b"h", b"field").unwrap(), Some(b"val".to_vec()));
729 drop(s2);
730 let _ = std::fs::remove_dir_all(&dir);
731 }
732
733 #[test]
734 fn eviction_works_under_pressure() {
735 let s = Store::open(
736 Config::default()
737 .with_ttl_reaper_manual()
738 .with_max_memory(800)
739 .with_eviction(EvictionPolicy::AllKeysLru),
740 )
741 .unwrap();
742 for i in 0..50 {
743 s.set(format!("k{i:02}").as_bytes(), b"xxxxxxxxxxxxxxxxxxxx")
744 .unwrap();
745 }
746 assert!(s.used_memory() <= 800, "got {}", s.used_memory());
747 assert!(s.evictions_total() > 0);
748 }
749
750 #[test]
751 fn manual_tick_runs_active_reaper() {
752 let s = Store::open(Config::default().with_ttl_reaper_manual()).unwrap();
753 s.set_with_ttl(b"short", b"v", Duration::from_millis(1)).unwrap();
754 s.set(b"perm", b"v").unwrap();
755 std::thread::sleep(Duration::from_millis(20));
756 let stats = s.tick();
757 let _ = stats;
760 let _ = s.get(b"short").unwrap(); assert!(s.expired_keys_total() >= 1);
762 assert!(s.get(b"perm").unwrap().is_some());
763 }
764
765 #[test]
766 fn with_escape_hatch_works() {
767 let s = Store::open(Config::default().with_ttl_reaper_manual()).unwrap();
768 let zsize = s.with(|store| {
769 let _ = store.zadd(b"z", &[(1.0, b"a".to_vec()), (2.0, b"b".to_vec())]);
770 store.zcard(b"z").unwrap()
771 });
772 assert_eq!(zsize, 2);
773 assert_eq!(s.type_of(b"z"), "zset");
776 }
777
778 #[test]
779 fn background_reaper_thread_drops_expired_keys() {
780 let s = Store::open(
781 Config::default().with_reaper_interval(Duration::from_millis(20)),
782 )
783 .unwrap();
784 s.set_with_ttl(b"k", b"v", Duration::from_millis(5)).unwrap();
785 std::thread::sleep(Duration::from_millis(120));
786 let _ = s.get(b"k").unwrap(); assert_eq!(s.dbsize(), 0);
789 }
790
791 #[test]
792 fn arc_sharing_across_threads() {
793 use std::sync::Arc;
794 let s = Arc::new(Store::open(Config::default().with_ttl_reaper_manual()).unwrap());
795 let mut handles = Vec::new();
796 for i in 0..8 {
797 let s = Arc::clone(&s);
798 handles.push(std::thread::spawn(move || {
799 for j in 0..50 {
800 s.set(format!("t{i}-{j}").as_bytes(), b"v").unwrap();
801 }
802 }));
803 }
804 for h in handles {
805 h.join().unwrap();
806 }
807 assert_eq!(s.dbsize(), 8 * 50);
808 }
809
810 #[test]
811 fn drop_during_reaper_does_not_deadlock() {
812 for _ in 0..4 {
816 let s = Store::open(
817 Config::default().with_reaper_interval(Duration::from_millis(5)),
818 )
819 .unwrap();
820 s.set(b"k", b"v").unwrap();
821 std::thread::sleep(Duration::from_millis(40));
823 drop(s); }
825 }
826
827 #[test]
828 fn save_snapshot_then_restart() {
829 let dir = tmp_dir("snap-rt");
830 {
831 let s = Store::open(
832 Config::default()
833 .with_persist(&dir)
834 .without_aof()
835 .with_ttl_reaper_manual(),
836 )
837 .unwrap();
838 for i in 0..10 {
839 s.set(format!("k{i}").as_bytes(), b"v").unwrap();
840 }
841 let saved = s.save_snapshot().unwrap();
842 assert!(saved);
843 }
844 let s2 = Store::open(
845 Config::default()
846 .with_persist(&dir)
847 .without_aof()
848 .with_ttl_reaper_manual(),
849 )
850 .unwrap();
851 assert_eq!(s2.dbsize(), 10);
852 let _ = std::fs::remove_dir_all(&dir);
853 }
854}