1#![forbid(unsafe_code)]
38
39pub use kevy_resp::Argv;
40use kevy_store::{Store, Value};
41use std::fs::{File, OpenOptions};
43use std::io::{self, BufReader, BufWriter, Read, Seek, SeekFrom, Write};
44use std::path::{Path, PathBuf};
45use std::time::{Duration, Instant};
46
47const MAGIC: &[u8; 8] = b"KEVYSNAP";
49const VERSION: u8 = 2;
50
51const OP_EOF: u8 = 0;
54const OP_STR: u8 = 1;
55const OP_HASH: u8 = 2;
56const OP_LIST: u8 = 3;
57const OP_SET: u8 = 4;
58const OP_ZSET: u8 = 5;
59
60pub fn save_snapshot(store: &Store, path: &Path) -> io::Result<()> {
63 let tmp = tmp_path(path);
64 {
65 let mut w = BufWriter::new(File::create(&tmp)?);
66 w.write_all(MAGIC)?;
67 w.write_all(&[VERSION])?;
68 let mut err: Option<io::Error> = None;
70 store.snapshot_each(|key, value, ttl| {
71 if err.is_none()
72 && let Err(e) = write_entry(&mut w, key, value, ttl)
73 {
74 err = Some(e);
75 }
76 });
77 if let Some(e) = err {
78 return Err(e);
79 }
80 w.write_all(&[OP_EOF])?;
81 w.flush()?;
82 w.get_ref().sync_all()?; }
84 std::fs::rename(&tmp, path)
85}
86
87pub fn load_snapshot(store: &mut Store, path: &Path) -> io::Result<()> {
90 let mut r = BufReader::new(File::open(path)?);
91
92 let mut magic = [0u8; 8];
93 r.read_exact(&mut magic)?;
94 if &magic != MAGIC {
95 return Err(io::Error::new(
96 io::ErrorKind::InvalidData,
97 "kevy snapshot: bad magic",
98 ));
99 }
100 if read_u8(&mut r)? != VERSION {
101 return Err(io::Error::new(
102 io::ErrorKind::InvalidData,
103 "kevy snapshot: bad version",
104 ));
105 }
106
107 loop {
108 let op = read_u8(&mut r)?;
109 if op == OP_EOF {
110 return Ok(());
111 }
112 let ttl = read_ttl(&mut r)?;
113 let key = read_bytes(&mut r)?;
114 match op {
115 OP_STR => {
116 let val = read_bytes(&mut r)?;
117 store.load_str(key, val, ttl);
118 }
119 OP_HASH => {
120 let n = read_u32(&mut r)? as usize;
121 let mut fields = Vec::with_capacity(n);
122 for _ in 0..n {
123 let f = read_bytes(&mut r)?;
124 let v = read_bytes(&mut r)?;
125 fields.push((f, v));
126 }
127 store.load_hash(key, fields, ttl);
128 }
129 OP_LIST => {
130 let n = read_u32(&mut r)? as usize;
131 let mut items = Vec::with_capacity(n);
132 for _ in 0..n {
133 items.push(read_bytes(&mut r)?);
134 }
135 store.load_list(key, items, ttl);
136 }
137 OP_SET => {
138 let n = read_u32(&mut r)? as usize;
139 let mut members = Vec::with_capacity(n);
140 for _ in 0..n {
141 members.push(read_bytes(&mut r)?);
142 }
143 store.load_set(key, members, ttl);
144 }
145 OP_ZSET => {
146 let n = read_u32(&mut r)? as usize;
147 let mut pairs = Vec::with_capacity(n);
148 for _ in 0..n {
149 let m = read_bytes(&mut r)?;
150 let score = f64::from_bits(read_u64(&mut r)?);
151 pairs.push((m, score));
152 }
153 store.load_zset(key, pairs, ttl);
154 }
155 other => {
156 return Err(io::Error::new(
157 io::ErrorKind::InvalidData,
158 format!("kevy snapshot: unknown opcode {other}"),
159 ));
160 }
161 }
162 }
163}
164
165fn write_entry<W: Write>(w: &mut W, key: &[u8], value: &Value, ttl: Option<u64>) -> io::Result<()> {
167 match value {
168 Value::Str(v) => {
169 w.write_all(&[OP_STR])?;
170 write_ttl(w, ttl)?;
171 write_bytes(w, key)?;
172 write_bytes(w, v.as_slice())?;
173 }
174 Value::Hash(h) => {
175 w.write_all(&[OP_HASH])?;
176 write_ttl(w, ttl)?;
177 write_bytes(w, key)?;
178 w.write_all(&(h.len() as u32).to_le_bytes())?;
179 for (f, v) in h.iter() {
180 write_bytes(w, f.as_slice())?;
181 write_bytes(w, v)?;
182 }
183 }
184 Value::List(l) => {
185 w.write_all(&[OP_LIST])?;
186 write_ttl(w, ttl)?;
187 write_bytes(w, key)?;
188 w.write_all(&(l.len() as u32).to_le_bytes())?;
189 for item in l.iter() {
190 write_bytes(w, item)?;
191 }
192 }
193 Value::Set(set) => {
194 w.write_all(&[OP_SET])?;
195 write_ttl(w, ttl)?;
196 write_bytes(w, key)?;
197 w.write_all(&(set.len() as u32).to_le_bytes())?;
198 for m in set.iter() {
199 write_bytes(w, m.as_slice())?;
200 }
201 }
202 Value::ZSet(z) => {
203 w.write_all(&[OP_ZSET])?;
204 write_ttl(w, ttl)?;
205 write_bytes(w, key)?;
206 let entries: Vec<(&[u8], f64)> = z.ordered().collect();
207 w.write_all(&(entries.len() as u32).to_le_bytes())?;
208 for (m, score) in entries {
209 write_bytes(w, m)?;
210 w.write_all(&score.to_bits().to_le_bytes())?;
211 }
212 }
213 }
214 Ok(())
215}
216
217fn write_ttl<W: Write>(w: &mut W, ttl: Option<u64>) -> io::Result<()> {
218 match ttl {
219 Some(ms) => {
220 w.write_all(&[1u8])?;
221 w.write_all(&ms.to_le_bytes())?;
222 }
223 None => w.write_all(&[0u8])?,
224 }
225 Ok(())
226}
227
228fn read_ttl<R: Read>(r: &mut R) -> io::Result<Option<u64>> {
229 if read_u8(r)? == 1 {
230 Ok(Some(read_u64(r)?))
231 } else {
232 Ok(None)
233 }
234}
235
236fn tmp_path(path: &Path) -> std::path::PathBuf {
237 let mut s = path.as_os_str().to_owned();
238 s.push(".tmp");
239 s.into()
240}
241
242fn write_bytes<W: Write>(w: &mut W, b: &[u8]) -> io::Result<()> {
243 w.write_all(&(b.len() as u32).to_le_bytes())?;
244 w.write_all(b)
245}
246
247fn read_bytes<R: Read>(r: &mut R) -> io::Result<Vec<u8>> {
248 let len = read_u32(r)? as usize;
249 let mut buf = vec![0u8; len];
250 r.read_exact(&mut buf)?;
251 Ok(buf)
252}
253
254fn read_u8<R: Read>(r: &mut R) -> io::Result<u8> {
255 let mut b = [0u8; 1];
256 r.read_exact(&mut b)?;
257 Ok(b[0])
258}
259
260fn read_u32<R: Read>(r: &mut R) -> io::Result<u32> {
261 let mut b = [0u8; 4];
262 r.read_exact(&mut b)?;
263 Ok(u32::from_le_bytes(b))
264}
265
266fn read_u64<R: Read>(r: &mut R) -> io::Result<u64> {
267 let mut b = [0u8; 8];
268 r.read_exact(&mut b)?;
269 Ok(u64::from_le_bytes(b))
270}
271
272#[derive(Debug, Clone, Copy, PartialEq, Eq)]
276pub enum Fsync {
277 Always,
279 EverySec,
281 No,
283}
284
285pub struct Aof {
297 file: BufWriter<File>,
298 path: PathBuf,
299 fsync: Fsync,
300 dirty: bool,
301 last_sync: Instant,
302 size_bytes: u64,
305 size_at_last_rewrite: u64,
308 rewrites_total: u64,
310}
311
312#[derive(Debug, Clone, Copy)]
315pub struct RewriteStats {
316 pub keys: u64,
318 pub bytes: u64,
320}
321
322impl Aof {
323 pub fn open(path: &Path, fsync: Fsync) -> io::Result<Self> {
325 let file = OpenOptions::new().create(true).append(true).open(path)?;
326 let size = file.metadata().map(|m| m.len()).unwrap_or(0);
327 Ok(Aof {
328 file: BufWriter::new(file),
329 path: path.to_path_buf(),
330 fsync,
331 dirty: false,
332 last_sync: Instant::now(),
333 size_bytes: size,
334 size_at_last_rewrite: size,
335 rewrites_total: 0,
336 })
337 }
338
339 pub fn append(&mut self, args: &Argv) -> io::Result<()> {
341 write_multibulk(&mut self.file, args)?;
342 self.size_bytes = self.size_bytes.saturating_add(estimate_multibulk_bytes(args));
343 match self.fsync {
344 Fsync::Always => {
345 self.file.flush()?;
346 self.file.get_ref().sync_data()?;
347 }
348 Fsync::EverySec | Fsync::No => self.dirty = true,
349 }
350 Ok(())
351 }
352
353 pub fn maybe_sync(&mut self) -> io::Result<()> {
355 if matches!(self.fsync, Fsync::EverySec)
356 && self.dirty
357 && self.last_sync.elapsed() >= Duration::from_secs(1)
358 {
359 self.file.flush()?;
360 self.file.get_ref().sync_data()?;
361 self.dirty = false;
362 self.last_sync = Instant::now();
363 }
364 Ok(())
365 }
366
367 pub fn truncate(&mut self) -> io::Result<()> {
369 self.file.flush()?;
370 let f = self.file.get_mut();
371 f.set_len(0)?;
372 f.seek(SeekFrom::Start(0))?; f.sync_all()?;
374 self.dirty = false;
375 self.size_bytes = 0;
376 self.size_at_last_rewrite = 0;
377 Ok(())
378 }
379
380 #[inline]
382 pub fn size_bytes(&self) -> u64 {
383 self.size_bytes
384 }
385
386 #[inline]
390 pub fn size_at_last_rewrite(&self) -> u64 {
391 self.size_at_last_rewrite
392 }
393
394 #[inline]
396 pub fn rewrites_total(&self) -> u64 {
397 self.rewrites_total
398 }
399
400 pub fn rewrite_from(&mut self, store: &Store) -> io::Result<RewriteStats> {
414 self.file.flush()?;
417
418 let tmp = rewrite_tmp_path(&self.path);
419 let (keys, bytes) = dump_store_to_aof(&tmp, store)?;
420
421 std::fs::rename(&tmp, &self.path)?;
425 let f = OpenOptions::new().append(true).open(&self.path)?;
426 self.file = BufWriter::new(f);
427 self.size_bytes = bytes;
428 self.size_at_last_rewrite = bytes;
429 self.dirty = false;
430 self.rewrites_total = self.rewrites_total.saturating_add(1);
431 Ok(RewriteStats { keys, bytes })
432 }
433}
434
435fn rewrite_tmp_path(path: &Path) -> PathBuf {
437 let mut p = path.to_path_buf();
438 let new_name = match path.file_name() {
439 Some(n) => {
440 let mut s = n.to_os_string();
441 s.push(".rewrite");
442 s
443 }
444 None => std::ffi::OsString::from("aof.rewrite"),
445 };
446 p.set_file_name(new_name);
447 p
448}
449
450fn dump_store_to_aof(path: &Path, store: &Store) -> io::Result<(u64, u64)> {
453 let f = File::create(path)?;
454 let mut w = BufWriter::new(f);
455 let mut keys = 0u64;
456 let mut err: Option<io::Error> = None;
457 store.snapshot_each(|key, value, ttl_ms| {
458 if err.is_some() {
459 return;
460 }
461 if let Err(e) = write_value_as_commands(&mut w, key, value, ttl_ms) {
462 err = Some(e);
463 } else {
464 keys += 1;
465 }
466 });
467 if let Some(e) = err {
468 return Err(e);
469 }
470 w.flush()?;
471 let inner = w
472 .into_inner()
473 .map_err(|e| io::Error::other(e.to_string()))?;
474 let bytes = inner.metadata().map(|m| m.len()).unwrap_or(0);
475 inner.sync_all()?;
476 Ok((keys, bytes))
477}
478
479fn write_value_as_commands<W: Write>(
482 w: &mut W,
483 key: &[u8],
484 value: &Value,
485 ttl_ms: Option<u64>,
486) -> io::Result<()> {
487 match value {
488 Value::Str(s) => {
489 let argv = Argv::from(vec![b"SET".to_vec(), key.to_vec(), s.to_vec()]);
490 write_multibulk(w, &argv)?;
491 }
492 Value::Hash(h) => {
493 let mut argv: Vec<Vec<u8>> = Vec::with_capacity(2 + h.len() * 2);
494 argv.push(b"HSET".to_vec());
495 argv.push(key.to_vec());
496 for (f, v) in h.iter() {
497 argv.push(f.to_vec());
498 argv.push(v.clone());
499 }
500 write_multibulk(w, &Argv::from(argv))?;
501 }
502 Value::List(l) => {
503 let mut argv: Vec<Vec<u8>> = Vec::with_capacity(2 + l.len());
504 argv.push(b"RPUSH".to_vec());
505 argv.push(key.to_vec());
506 for v in l.iter() {
507 argv.push(v.clone());
508 }
509 write_multibulk(w, &Argv::from(argv))?;
510 }
511 Value::Set(s) => {
512 let mut argv: Vec<Vec<u8>> = Vec::with_capacity(2 + s.len());
513 argv.push(b"SADD".to_vec());
514 argv.push(key.to_vec());
515 for m in s.iter() {
516 argv.push(m.to_vec());
517 }
518 write_multibulk(w, &Argv::from(argv))?;
519 }
520 Value::ZSet(z) => {
521 let mut argv: Vec<Vec<u8>> = Vec::with_capacity(2 + z.ordered().count() * 2);
522 argv.push(b"ZADD".to_vec());
523 argv.push(key.to_vec());
524 for (m, sc) in z.ordered() {
525 argv.push(fmt_zset_score(sc));
526 argv.push(m.to_vec());
527 }
528 write_multibulk(w, &Argv::from(argv))?;
529 }
530 }
531 if let Some(ms) = ttl_ms {
532 let argv = Argv::from(vec![
533 b"PEXPIRE".to_vec(),
534 key.to_vec(),
535 ms.to_string().into_bytes(),
536 ]);
537 write_multibulk(w, &argv)?;
538 }
539 Ok(())
540}
541
542fn fmt_zset_score(s: f64) -> Vec<u8> {
547 if s.is_finite() && s == s.trunc() && s.abs() < 1e17 {
548 format!("{}", s as i64).into_bytes()
549 } else {
550 format!("{s:.17}").into_bytes()
551 }
552}
553
554fn estimate_multibulk_bytes(args: &Argv) -> u64 {
558 let mut n: u64 = 3 + decimal_digits(args.len() as u64) as u64;
559 for a in args.iter() {
560 n += 3 + decimal_digits(a.len() as u64) as u64 + a.len() as u64 + 2;
561 }
562 n
563}
564
565#[inline]
566fn decimal_digits(mut x: u64) -> u32 {
567 if x == 0 {
568 return 1;
569 }
570 let mut d = 0;
571 while x > 0 {
572 d += 1;
573 x /= 10;
574 }
575 d
576}
577
578pub fn replay_aof<F: FnMut(Argv)>(path: &Path, mut apply: F) -> io::Result<()> {
582 let mut data = Vec::new();
583 match File::open(path) {
584 Ok(mut f) => {
585 f.read_to_end(&mut data)?;
586 }
587 Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(()),
588 Err(e) => return Err(e),
589 }
590 let mut pos = 0;
591 while pos < data.len() {
592 match kevy_resp::parse_command(&data[pos..]) {
593 Ok(Some((args, consumed))) => {
594 apply(args);
595 pos += consumed;
596 }
597 Ok(None) | Err(_) => break,
599 }
600 }
601 Ok(())
602}
603
604fn write_multibulk<W: Write>(w: &mut W, args: &Argv) -> io::Result<()> {
605 write!(w, "*{}\r\n", args.len())?;
606 for a in args.iter() {
607 write!(w, "${}\r\n", a.len())?;
608 w.write_all(a)?;
609 w.write_all(b"\r\n")?;
610 }
611 Ok(())
612}
613
614#[cfg(test)]
615mod tests {
616 use super::*;
617 use std::time::Duration;
618
619 fn temp_file(name: &str) -> std::path::PathBuf {
620 let mut p = std::env::temp_dir();
621 let uniq = std::time::SystemTime::now()
622 .duration_since(std::time::UNIX_EPOCH)
623 .unwrap()
624 .as_nanos();
625 p.push(format!("kevy-{name}-{uniq}.rdb"));
626 p
627 }
628
629 #[test]
630 fn snapshot_round_trip() {
631 let path = temp_file("rt");
632
633 let mut src = Store::new();
634 src.set(b"plain", b"value".to_vec(), None, false, false);
635 src.set(b"empty", Vec::new(), None, false, false);
636 src.set(b"binary", vec![0u8, 1, 2, 255, 254], None, false, false);
637 src.set(
638 b"withttl",
639 b"soon".to_vec(),
640 Some(Duration::from_secs(100)),
641 false,
642 false,
643 );
644
645 save_snapshot(&src, &path).unwrap();
646
647 let mut dst = Store::new();
648 load_snapshot(&mut dst, &path).unwrap();
649
650 assert_eq!(dst.dbsize(), 4);
651 assert_eq!(dst.get(b"plain").unwrap(), Some(&b"value"[..]));
652 assert_eq!(dst.get(b"empty").unwrap(), Some(&b""[..]));
653 assert_eq!(
654 dst.get(b"binary").unwrap(),
655 Some(&[0u8, 1, 2, 255, 254][..])
656 );
657 assert_eq!(dst.get(b"withttl").unwrap(), Some(&b"soon"[..]));
658 assert!(dst.pttl(b"withttl") > 90_000);
660
661 let _ = std::fs::remove_file(&path);
662 }
663
664 #[test]
665 fn bad_magic_is_rejected() {
666 let path = temp_file("bad");
667 std::fs::write(&path, b"NOTKEVY!....").unwrap();
668 let mut dst = Store::new();
669 assert!(load_snapshot(&mut dst, &path).is_err());
670 let _ = std::fs::remove_file(&path);
671 }
672
673 #[test]
674 fn expired_keys_are_not_saved() {
675 let path = temp_file("exp");
676 let mut src = Store::new();
677 src.set(b"live", b"1".to_vec(), None, false, false);
678 src.set(
679 b"dead",
680 b"2".to_vec(),
681 Some(Duration::from_millis(1)),
682 false,
683 false,
684 );
685 std::thread::sleep(Duration::from_millis(8));
686
687 save_snapshot(&src, &path).unwrap();
688 let mut dst = Store::new();
689 load_snapshot(&mut dst, &path).unwrap();
690
691 assert_eq!(dst.dbsize(), 1);
692 assert_eq!(dst.get(b"live").unwrap(), Some(&b"1"[..]));
693 assert_eq!(dst.get(b"dead").unwrap(), None);
694 let _ = std::fs::remove_file(&path);
695 }
696
697 #[test]
698 fn hash_snapshot_round_trip() {
699 let path = temp_file("hashrt");
700 let mut src = Store::new();
701 src.hset(
702 b"h",
703 &[
704 (b"a".to_vec(), b"1".to_vec()),
705 (b"b".to_vec(), b"two".to_vec()),
706 ],
707 )
708 .unwrap();
709 src.set(b"s", b"str".to_vec(), None, false, false);
710 save_snapshot(&src, &path).unwrap();
711
712 let mut dst = Store::new();
713 load_snapshot(&mut dst, &path).unwrap();
714 assert_eq!(dst.type_of(b"h"), "hash");
715 assert_eq!(dst.hget(b"h", b"a").unwrap(), Some(&b"1"[..]));
716 assert_eq!(dst.hget(b"h", b"b").unwrap(), Some(&b"two"[..]));
717 assert_eq!(dst.hlen(b"h"), Ok(2));
718 assert_eq!(dst.get(b"s").unwrap(), Some(&b"str"[..]));
719 let _ = std::fs::remove_file(&path);
720 }
721
722 fn cmd(parts: &[&[u8]]) -> Argv {
723 Argv::from(parts.iter().map(|p| p.to_vec()).collect::<Vec<_>>())
724 }
725
726 #[test]
727 fn aof_append_and_replay() {
728 let path = temp_file("aof");
729 {
730 let mut aof = Aof::open(&path, Fsync::Always).unwrap();
731 aof.append(&cmd(&[b"SET", b"a", b"1"])).unwrap();
732 aof.append(&cmd(&[b"INCR", b"a"])).unwrap();
733 aof.append(&cmd(&[b"SET", b"b", b"hello world"])).unwrap();
734 }
735 let mut got: Vec<Argv> = Vec::new();
736 replay_aof(&path, |args| got.push(args)).unwrap();
737 assert_eq!(got.len(), 3);
738 assert_eq!(got[0], cmd(&[b"SET", b"a", b"1"]));
739 assert_eq!(got[1], cmd(&[b"INCR", b"a"]));
740 assert_eq!(got[2], cmd(&[b"SET", b"b", b"hello world"]));
741 let _ = std::fs::remove_file(&path);
742 }
743
744 #[test]
745 fn aof_truncated_tail_ignored() {
746 let path = temp_file("aoftail");
747 {
748 let mut aof = Aof::open(&path, Fsync::No).unwrap();
749 aof.append(&cmd(&[b"SET", b"a", b"1"])).unwrap();
750 }
751 let mut f = OpenOptions::new().append(true).open(&path).unwrap();
753 f.write_all(b"*2\r\n$3\r\nSET\r\n$5\r\nhal").unwrap(); drop(f);
755
756 let mut got: Vec<Argv> = Vec::new();
757 replay_aof(&path, |args| got.push(args)).unwrap();
758 assert_eq!(got, vec![cmd(&[b"SET", b"a", b"1"])]); let _ = std::fs::remove_file(&path);
760 }
761
762 #[test]
763 fn aof_truncate_clears() {
764 let path = temp_file("aoftrunc");
765 let mut aof = Aof::open(&path, Fsync::No).unwrap();
766 aof.append(&cmd(&[b"SET", b"a", b"1"])).unwrap();
767 aof.truncate().unwrap();
768 aof.append(&cmd(&[b"SET", b"b", b"2"])).unwrap();
769 drop(aof);
770
771 let mut got: Vec<Argv> = Vec::new();
772 replay_aof(&path, |args| got.push(args)).unwrap();
773 assert_eq!(got, vec![cmd(&[b"SET", b"b", b"2"])]); let _ = std::fs::remove_file(&path);
775 }
776
777 #[test]
778 fn replay_missing_file_is_ok() {
779 let path = temp_file("nofile");
780 let mut n = 0;
781 replay_aof(&path, |_| n += 1).unwrap();
782 assert_eq!(n, 0);
783 }
784
785 #[test]
786 fn list_snapshot_round_trip() {
787 let path = temp_file("listrt");
788 let mut src = Store::new();
789 src.rpush(b"l", &[b"a".to_vec(), b"b".to_vec(), b"c".to_vec()]).unwrap();
790 save_snapshot(&src, &path).unwrap();
791
792 let mut dst = Store::new();
793 load_snapshot(&mut dst, &path).unwrap();
794 assert_eq!(dst.type_of(b"l"), "list");
795 assert_eq!(dst.llen(b"l"), Ok(3));
796 assert_eq!(dst.lrange(b"l", 0, -1).unwrap(), vec![
797 b"a".to_vec(), b"b".to_vec(), b"c".to_vec()
798 ]);
799 let _ = std::fs::remove_file(&path);
800 }
801
802 #[test]
803 fn set_snapshot_round_trip() {
804 let path = temp_file("setrt");
805 let mut src = Store::new();
806 src.sadd(b"s", &[b"x".to_vec(), b"y".to_vec(), b"z".to_vec()]).unwrap();
807 save_snapshot(&src, &path).unwrap();
808
809 let mut dst = Store::new();
810 load_snapshot(&mut dst, &path).unwrap();
811 assert_eq!(dst.type_of(b"s"), "set");
812 assert_eq!(dst.scard(b"s"), Ok(3));
813 let mut members = dst.smembers(b"s").unwrap();
814 members.sort();
815 assert_eq!(members, vec![b"x".to_vec(), b"y".to_vec(), b"z".to_vec()]);
816 let _ = std::fs::remove_file(&path);
817 }
818
819 #[test]
820 fn zset_snapshot_round_trip() {
821 let path = temp_file("zsetrt");
822 let mut src = Store::new();
823 src.zadd(b"z", &[(1.0, b"a".to_vec()), (2.0, b"b".to_vec()), (0.5, b"c".to_vec())]).unwrap();
824 save_snapshot(&src, &path).unwrap();
825
826 let mut dst = Store::new();
827 load_snapshot(&mut dst, &path).unwrap();
828 assert_eq!(dst.type_of(b"z"), "zset");
829 assert_eq!(dst.zcard(b"z"), Ok(3));
830 let range = dst.zrange(b"z", 0, -1).unwrap();
832 assert_eq!(range, vec![
833 (b"c".to_vec(), 0.5),
834 (b"a".to_vec(), 1.0),
835 (b"b".to_vec(), 2.0),
836 ]);
837 let _ = std::fs::remove_file(&path);
838 }
839
840 #[test]
841 fn all_types_snapshot_round_trip() {
842 let path = temp_file("allrt");
843 let mut src = Store::new();
844 src.set(b"str", b"hello".to_vec(), None, false, false);
845 src.hset(b"hash", &[(b"f".to_vec(), b"v".to_vec())]).unwrap();
846 src.rpush(b"list", &[b"i".to_vec()]).unwrap();
847 src.sadd(b"set", &[b"m".to_vec()]).unwrap();
848 src.zadd(b"zset", &[(1.0, b"k".to_vec())]).unwrap();
849 save_snapshot(&src, &path).unwrap();
850
851 let mut dst = Store::new();
852 load_snapshot(&mut dst, &path).unwrap();
853 assert_eq!(dst.dbsize(), 5);
854 assert_eq!(dst.type_of(b"str"), "string");
855 assert_eq!(dst.type_of(b"hash"), "hash");
856 assert_eq!(dst.type_of(b"list"), "list");
857 assert_eq!(dst.type_of(b"set"), "set");
858 assert_eq!(dst.type_of(b"zset"), "zset");
859 let _ = std::fs::remove_file(&path);
860 }
861
862 fn apply_for_test(store: &mut Store, args: &Argv) {
869 let verb = args[0].to_ascii_uppercase();
870 match verb.as_slice() {
871 b"SET" => {
872 store.set(&args[1], args[2].to_vec(), None, false, false);
873 }
874 b"HSET" => {
875 let mut pairs: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
876 let mut i = 2;
877 while i + 1 < args.len() {
878 pairs.push((args[i].to_vec(), args[i + 1].to_vec()));
879 i += 2;
880 }
881 store.hset(&args[1], &pairs).unwrap();
882 }
883 b"RPUSH" => {
884 let items: Vec<Vec<u8>> = args.iter().skip(2).map(|a| a.to_vec()).collect();
885 store.rpush(&args[1], &items).unwrap();
886 }
887 b"SADD" => {
888 let members: Vec<Vec<u8>> = args.iter().skip(2).map(|a| a.to_vec()).collect();
889 store.sadd(&args[1], &members).unwrap();
890 }
891 b"ZADD" => {
892 let mut pairs: Vec<(f64, Vec<u8>)> = Vec::new();
893 let mut i = 2;
894 while i + 1 < args.len() {
895 let score: f64 = std::str::from_utf8(&args[i]).unwrap().parse().unwrap();
896 pairs.push((score, args[i + 1].to_vec()));
897 i += 2;
898 }
899 store.zadd(&args[1], &pairs).unwrap();
900 }
901 b"PEXPIRE" => {
902 let ms: u64 = std::str::from_utf8(&args[2]).unwrap().parse().unwrap();
903 store.expire(&args[1], Duration::from_millis(ms));
904 }
905 other => panic!("unexpected verb in AOF rewrite: {:?}", String::from_utf8_lossy(other)),
906 }
907 }
908
909 fn temp_aof(name: &str) -> std::path::PathBuf {
910 let mut p = std::env::temp_dir();
911 let uniq = std::time::SystemTime::now()
912 .duration_since(std::time::UNIX_EPOCH)
913 .unwrap()
914 .as_nanos();
915 p.push(format!("kevy-{name}-{uniq}.aof"));
916 p
917 }
918
919 #[test]
920 fn rewrite_reconstructs_full_keyspace() {
921 let path = temp_aof("rewrite-all");
922
923 let mut src = Store::new();
924 src.set(b"str", b"hello".to_vec(), None, false, false);
925 src.set(b"binary", vec![0u8, 1, 2, 255], None, false, false);
926 src.hset(b"hash", &[(b"f1".to_vec(), b"v1".to_vec()), (b"f2".to_vec(), b"v2".to_vec())])
927 .unwrap();
928 src.rpush(b"list", &[b"i1".to_vec(), b"i2".to_vec(), b"i3".to_vec()])
929 .unwrap();
930 src.sadd(b"set", &[b"m1".to_vec(), b"m2".to_vec()]).unwrap();
931 src.zadd(b"zset", &[(1.5, b"a".to_vec()), (2.5, b"b".to_vec())])
932 .unwrap();
933 src.set(
934 b"ttl",
935 b"x".to_vec(),
936 Some(Duration::from_secs(3600)),
937 false,
938 false,
939 );
940
941 let mut aof = Aof::open(&path, Fsync::Always).unwrap();
942 let stats = aof.rewrite_from(&src).unwrap();
943 assert_eq!(stats.keys, 7);
944 assert!(stats.bytes > 0);
945 assert_eq!(aof.size_bytes(), stats.bytes);
946 assert_eq!(aof.size_at_last_rewrite(), stats.bytes);
947 assert_eq!(aof.rewrites_total(), 1);
948 drop(aof);
949
950 let mut dst = Store::new();
952 replay_aof(&path, |args| apply_for_test(&mut dst, &args)).unwrap();
953 assert_eq!(dst.dbsize(), 7);
954 assert_eq!(dst.get(b"str").unwrap(), Some(&b"hello"[..]));
955 assert_eq!(dst.get(b"binary").unwrap(), Some(&[0u8, 1, 2, 255][..]));
956 assert_eq!(dst.hget(b"hash", b"f1").unwrap(), Some(&b"v1"[..]));
957 assert_eq!(dst.hget(b"hash", b"f2").unwrap(), Some(&b"v2"[..]));
958 assert_eq!(dst.llen(b"list").unwrap(), 3);
959 assert_eq!(dst.scard(b"set").unwrap(), 2);
960 assert_eq!(dst.zcard(b"zset").unwrap(), 2);
961 assert!(dst.pttl(b"ttl") > 3_500_000); let _ = std::fs::remove_file(&path);
963 }
964
965 #[test]
966 fn rewrite_replaces_old_log_atomically() {
967 let path = temp_aof("rewrite-swap");
968
969 {
972 let mut aof = Aof::open(&path, Fsync::Always).unwrap();
973 for i in 0..50 {
974 let k = format!("k{i}");
975 let argv = Argv::from(vec![b"SET".to_vec(), k.into_bytes(), b"v".to_vec()]);
976 aof.append(&argv).unwrap();
977 }
978 }
979 let big_size = std::fs::metadata(&path).unwrap().len();
980 assert!(big_size > 0);
981
982 let mut store = Store::new();
984 store.set(b"only", b"value".to_vec(), None, false, false);
985 store.set(b"second", b"v2".to_vec(), None, false, false);
986 let mut aof = Aof::open(&path, Fsync::Always).unwrap();
987 let stats = aof.rewrite_from(&store).unwrap();
988 assert_eq!(stats.keys, 2);
989 let new_size = std::fs::metadata(&path).unwrap().len();
990 assert!(new_size < big_size, "rewrite should shrink: {new_size} vs {big_size}");
991
992 aof.append(&Argv::from(vec![b"SET".to_vec(), b"third".to_vec(), b"v".to_vec()]))
994 .unwrap();
995 drop(aof);
996
997 let mut dst = Store::new();
998 replay_aof(&path, |args| apply_for_test(&mut dst, &args)).unwrap();
999 assert_eq!(dst.dbsize(), 3, "rewrite + append should yield 3 keys");
1000 let _ = std::fs::remove_file(&path);
1001 }
1002
1003 #[test]
1004 fn append_bumps_size_estimate() {
1005 let path = temp_aof("size-est");
1006 let mut aof = Aof::open(&path, Fsync::No).unwrap();
1007 assert_eq!(aof.size_bytes(), 0);
1008 aof.append(&Argv::from(vec![b"SET".to_vec(), b"k".to_vec(), b"v".to_vec()]))
1009 .unwrap();
1010 let after_one = aof.size_bytes();
1011 assert!(after_one > 0);
1012 aof.append(&Argv::from(vec![b"SET".to_vec(), b"k2".to_vec(), b"v".to_vec()]))
1013 .unwrap();
1014 assert!(aof.size_bytes() > after_one);
1015 let _ = std::fs::remove_file(&path);
1016 }
1017
1018 #[test]
1019 fn rewrite_resets_size_anchor() {
1020 let path = temp_aof("size-anchor");
1021 let mut aof = Aof::open(&path, Fsync::Always).unwrap();
1022 for _ in 0..10 {
1023 aof.append(&Argv::from(vec![b"SET".to_vec(), b"k".to_vec(), b"v".to_vec()]))
1024 .unwrap();
1025 }
1026 assert!(aof.size_bytes() > aof.size_at_last_rewrite());
1027 let store = Store::new();
1028 let stats = aof.rewrite_from(&store).unwrap();
1029 assert_eq!(stats.keys, 0);
1031 assert_eq!(aof.size_bytes(), 0);
1032 assert_eq!(aof.size_at_last_rewrite(), 0);
1033 assert_eq!(aof.rewrites_total(), 1);
1034 let _ = std::fs::remove_file(&path);
1035 }
1036}