Skip to main content

kevy_persist/
lib.rs

//! kevy-persist — durability for a [`kevy_store::Store`].
//!
//! Two mechanisms, both zero-dependency pure Rust over `std::fs`:
//!
//! - **Snapshot (RDB-style):** [`save_snapshot`] dumps a whole store to a temp
//!   file, fsyncs it, then atomically renames it into place; [`load_snapshot`]
//!   restores it. A compact, type-tagged binary format.
//! - **AOF:** an [`Aof`] append-only command log with a configurable fsync
//!   policy; [`replay_aof`] re-applies it on startup, tolerating a truncated
//!   trailing frame from a crash mid-write.
//!
//! In a shared-nothing runtime each shard persists its own store to its own
//! file, so there is no cross-core coordination. Part of the [kevy] server.
//!
//! [kevy]: https://crates.io/crates/kevy
//!
//! # Example (AOF)
//!
//! ```
//! use kevy_persist::{Aof, Argv, Fsync, replay_aof};
//!
//! # fn main() -> std::io::Result<()> {
//! let path = std::env::temp_dir().join("kevy-persist-doctest.aof");
//! # let _ = std::fs::remove_file(&path);
//! {
//!     let mut aof = Aof::open(&path, Fsync::No)?;
//!     aof.append(&Argv::from(vec![b"SET".to_vec(), b"k".to_vec(), b"v".to_vec()]))?;
//! } // flushed on drop
//!
//! let mut replayed: Vec<Argv> = Vec::new();
//! replay_aof(&path, |args| replayed.push(args))?;
//! assert_eq!(replayed, vec![vec![b"SET".to_vec(), b"k".to_vec(), b"v".to_vec()]]);
//! # std::fs::remove_file(&path).ok();
//! # Ok(())
//! # }
//! ```
37#![forbid(unsafe_code)]
38
39pub use kevy_resp::Argv;
40use kevy_store::{Store, Value};
41// ZSet snapshot iterates ordered (member, score) pairs via `Value::ZSet`.
42use std::fs::{File, OpenOptions};
43use std::io::{self, BufReader, BufWriter, Read, Seek, SeekFrom, Write};
44use std::path::{Path, PathBuf};
45use std::time::{Duration, Instant};
46
/// Every snapshot file starts with these eight bytes, followed by a single
/// format-version byte. Any change to the on-disk layout must bump `VERSION`.
const MAGIC: &[u8; 8] = b"KEVYSNAP";
const VERSION: u8 = 0x02;

// Record opcodes, one per value type. After the header, each record is
//   [op][ttl: u8 flag + optional u64][key][type payload]
// and the record stream ends with a lone `OP_EOF` byte.
const OP_EOF: u8 = 0x00;
const OP_STR: u8 = 0x01;
const OP_HASH: u8 = 0x02;
const OP_LIST: u8 = 0x03;
const OP_SET: u8 = 0x04;
const OP_ZSET: u8 = 0x05;
59
60/// Write a point-in-time snapshot of `store` to `path`, atomically: data is
61/// written to `<path>.tmp`, fsynced, then renamed over `path`.
62pub fn save_snapshot(store: &Store, path: &Path) -> io::Result<()> {
63    let tmp = tmp_path(path);
64    {
65        let mut w = BufWriter::new(File::create(&tmp)?);
66        w.write_all(MAGIC)?;
67        w.write_all(&[VERSION])?;
68        // `snapshot_each` is infallible; capture the first write error to surface.
69        let mut err: Option<io::Error> = None;
70        store.snapshot_each(|key, value, ttl| {
71            if err.is_none()
72                && let Err(e) = write_entry(&mut w, key, value, ttl)
73            {
74                err = Some(e);
75            }
76        });
77        if let Some(e) = err {
78            return Err(e);
79        }
80        w.write_all(&[OP_EOF])?;
81        w.flush()?;
82        w.get_ref().sync_all()?; // durably on disk before the rename
83    }
84    std::fs::rename(&tmp, path)
85}
86
87/// Load a snapshot from `path` into `store` (entries are inserted, not cleared
88/// first — call on a fresh store). Errors on a bad magic/version or truncation.
89pub fn load_snapshot(store: &mut Store, path: &Path) -> io::Result<()> {
90    let mut r = BufReader::new(File::open(path)?);
91
92    let mut magic = [0u8; 8];
93    r.read_exact(&mut magic)?;
94    if &magic != MAGIC {
95        return Err(io::Error::new(
96            io::ErrorKind::InvalidData,
97            "kevy snapshot: bad magic",
98        ));
99    }
100    if read_u8(&mut r)? != VERSION {
101        return Err(io::Error::new(
102            io::ErrorKind::InvalidData,
103            "kevy snapshot: bad version",
104        ));
105    }
106
107    loop {
108        let op = read_u8(&mut r)?;
109        if op == OP_EOF {
110            return Ok(());
111        }
112        let ttl = read_ttl(&mut r)?;
113        let key = read_bytes(&mut r)?;
114        match op {
115            OP_STR => {
116                let val = read_bytes(&mut r)?;
117                store.load_str(key, val, ttl);
118            }
119            OP_HASH => {
120                let n = read_u32(&mut r)? as usize;
121                let mut fields = Vec::with_capacity(n);
122                for _ in 0..n {
123                    let f = read_bytes(&mut r)?;
124                    let v = read_bytes(&mut r)?;
125                    fields.push((f, v));
126                }
127                store.load_hash(key, fields, ttl);
128            }
129            OP_LIST => {
130                let n = read_u32(&mut r)? as usize;
131                let mut items = Vec::with_capacity(n);
132                for _ in 0..n {
133                    items.push(read_bytes(&mut r)?);
134                }
135                store.load_list(key, items, ttl);
136            }
137            OP_SET => {
138                let n = read_u32(&mut r)? as usize;
139                let mut members = Vec::with_capacity(n);
140                for _ in 0..n {
141                    members.push(read_bytes(&mut r)?);
142                }
143                store.load_set(key, members, ttl);
144            }
145            OP_ZSET => {
146                let n = read_u32(&mut r)? as usize;
147                let mut pairs = Vec::with_capacity(n);
148                for _ in 0..n {
149                    let m = read_bytes(&mut r)?;
150                    let score = f64::from_bits(read_u64(&mut r)?);
151                    pairs.push((m, score));
152                }
153                store.load_zset(key, pairs, ttl);
154            }
155            other => {
156                return Err(io::Error::new(
157                    io::ErrorKind::InvalidData,
158                    format!("kevy snapshot: unknown opcode {other}"),
159                ));
160            }
161        }
162    }
163}
164
165/// Serialize one entry: `[op][ttl][key][payload]`.
166fn write_entry<W: Write>(w: &mut W, key: &[u8], value: &Value, ttl: Option<u64>) -> io::Result<()> {
167    match value {
168        Value::Str(v) => {
169            w.write_all(&[OP_STR])?;
170            write_ttl(w, ttl)?;
171            write_bytes(w, key)?;
172            write_bytes(w, v.as_slice())?;
173        }
174        Value::Hash(h) => {
175            w.write_all(&[OP_HASH])?;
176            write_ttl(w, ttl)?;
177            write_bytes(w, key)?;
178            w.write_all(&(h.len() as u32).to_le_bytes())?;
179            for (f, v) in h.iter() {
180                write_bytes(w, f.as_slice())?;
181                write_bytes(w, v)?;
182            }
183        }
184        Value::List(l) => {
185            w.write_all(&[OP_LIST])?;
186            write_ttl(w, ttl)?;
187            write_bytes(w, key)?;
188            w.write_all(&(l.len() as u32).to_le_bytes())?;
189            for item in l.iter() {
190                write_bytes(w, item)?;
191            }
192        }
193        Value::Set(set) => {
194            w.write_all(&[OP_SET])?;
195            write_ttl(w, ttl)?;
196            write_bytes(w, key)?;
197            w.write_all(&(set.len() as u32).to_le_bytes())?;
198            for m in set.iter() {
199                write_bytes(w, m.as_slice())?;
200            }
201        }
202        Value::ZSet(z) => {
203            w.write_all(&[OP_ZSET])?;
204            write_ttl(w, ttl)?;
205            write_bytes(w, key)?;
206            let entries: Vec<(&[u8], f64)> = z.ordered().collect();
207            w.write_all(&(entries.len() as u32).to_le_bytes())?;
208            for (m, score) in entries {
209                write_bytes(w, m)?;
210                w.write_all(&score.to_bits().to_le_bytes())?;
211            }
212        }
213    }
214    Ok(())
215}
216
/// Encode an optional TTL (milliseconds, going by the callers' naming).
///
/// `None` is written as a single `0` byte. `Some(ms)` is written as a `1` byte
/// followed by `ms` as a little-endian u64.
fn write_ttl<W: Write>(w: &mut W, ttl: Option<u64>) -> io::Result<()> {
    let Some(ms) = ttl else {
        return w.write_all(&[0u8]);
    };
    w.write_all(&[1u8])?;
    w.write_all(&ms.to_le_bytes())
}
227
228fn read_ttl<R: Read>(r: &mut R) -> io::Result<Option<u64>> {
229    if read_u8(r)? == 1 {
230        Ok(Some(read_u64(r)?))
231    } else {
232        Ok(None)
233    }
234}
235
/// Sibling temp path used for atomic snapshot writes: `path` with `.tmp`
/// appended to its full OS string (e.g. `dump.rdb` → `dump.rdb.tmp`).
fn tmp_path(path: &Path) -> std::path::PathBuf {
    let mut name = std::ffi::OsString::from(path.as_os_str());
    name.push(".tmp");
    PathBuf::from(name)
}
241
/// Write a length-prefixed byte string: the length as a u32 LE, then the bytes.
///
/// # Errors
///
/// Returns `InvalidInput` if `b` is longer than `u32::MAX` bytes. The length
/// would not fit the prefix, and truncating it (as a bare `as u32` would)
/// produces a file that cannot be read back. Write errors are propagated.
fn write_bytes<W: Write>(w: &mut W, b: &[u8]) -> io::Result<()> {
    let len = u32::try_from(b.len()).map_err(|_| {
        io::Error::new(
            io::ErrorKind::InvalidInput,
            "kevy snapshot: byte string longer than u32::MAX bytes",
        )
    })?;
    w.write_all(&len.to_le_bytes())?;
    w.write_all(b)
}
246
247fn read_bytes<R: Read>(r: &mut R) -> io::Result<Vec<u8>> {
248    let len = read_u32(r)? as usize;
249    let mut buf = vec![0u8; len];
250    r.read_exact(&mut buf)?;
251    Ok(buf)
252}
253
/// Read exactly `N` bytes into a fixed-size array.
fn read_le_array<R: Read, const N: usize>(r: &mut R) -> io::Result<[u8; N]> {
    let mut bytes = [0u8; N];
    r.read_exact(&mut bytes)?;
    Ok(bytes)
}

/// Read one byte.
fn read_u8<R: Read>(r: &mut R) -> io::Result<u8> {
    read_le_array(r).map(u8::from_le_bytes)
}

/// Read a little-endian u32.
fn read_u32<R: Read>(r: &mut R) -> io::Result<u32> {
    read_le_array(r).map(u32::from_le_bytes)
}

/// Read a little-endian u64.
fn read_u64<R: Read>(r: &mut R) -> io::Result<u64> {
    read_le_array(r).map(u64::from_le_bytes)
}
271
// ---- AOF (append-only file) ------------------------------------------------
273
/// Fsync policy for the AOF: how eagerly appended commands are forced to disk.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Fsync {
    /// Sync after every append. Most durable, slowest.
    Always,
    /// Sync at most once per second. Drive it by calling [`Aof::maybe_sync`]
    /// regularly.
    EverySec,
    /// Never sync explicitly; the OS decides when data reaches disk.
    No,
}
284
285/// An append-only command log. Each write command is appended as a RESP
286/// multi-bulk frame; [`replay_aof`] re-applies them on startup.
287///
288/// Durability model (paired with snapshots): a snapshot taken at T0 plus the
289/// AOF of writes in (T0, now] reconstructs the current state. `SAVE` writes the
290/// snapshot then [`Aof::truncate`]s the log, so replay never double-applies.
291///
292/// Sizes (`size_bytes`, `size_at_last_rewrite`) drive auto-trigger of
293/// [`Aof::rewrite_from`] (BGREWRITEAOF) via the
294/// `auto_aof_rewrite_percentage` + `auto_aof_rewrite_min_size` knobs in
295/// `kevy_config`.
296pub struct Aof {
297    file: BufWriter<File>,
298    path: PathBuf,
299    fsync: Fsync,
300    dirty: bool,
301    last_sync: Instant,
302    /// Estimated bytes currently in the AOF file (existing + appended since
303    /// open). Maintained without fstat() syscalls per append.
304    size_bytes: u64,
305    /// File size right after the most recent [`Self::rewrite_from`] (or
306    /// `Self::open` if never rewritten). Anchor for `auto_aof_rewrite_*`.
307    size_at_last_rewrite: u64,
308    /// Total rewrites successfully completed since open. Surfaced via INFO.
309    rewrites_total: u64,
310}
311
/// Outcome of a successful [`Aof::rewrite_from`], reported by `BGREWRITEAOF`
/// and `INFO persistence`.
#[derive(Debug, Clone, Copy)]
pub struct RewriteStats {
    /// Number of keys written to the rewritten AOF.
    pub keys: u64,
    /// Size of the rewritten AOF, in bytes.
    pub bytes: u64,
}
321
322impl Aof {
323    /// Open (creating if needed) `path` for appending.
324    pub fn open(path: &Path, fsync: Fsync) -> io::Result<Self> {
325        let file = OpenOptions::new().create(true).append(true).open(path)?;
326        let size = file.metadata().map(|m| m.len()).unwrap_or(0);
327        Ok(Aof {
328            file: BufWriter::new(file),
329            path: path.to_path_buf(),
330            fsync,
331            dirty: false,
332            last_sync: Instant::now(),
333            size_bytes: size,
334            size_at_last_rewrite: size,
335            rewrites_total: 0,
336        })
337    }
338
339    /// Append one command, applying the fsync policy.
340    pub fn append(&mut self, args: &Argv) -> io::Result<()> {
341        write_multibulk(&mut self.file, args)?;
342        self.size_bytes = self.size_bytes.saturating_add(estimate_multibulk_bytes(args));
343        match self.fsync {
344            Fsync::Always => {
345                self.file.flush()?;
346                self.file.get_ref().sync_data()?;
347            }
348            Fsync::EverySec | Fsync::No => self.dirty = true,
349        }
350        Ok(())
351    }
352
353    /// Flush+fsync if the `EverySec` window has elapsed. Call once per loop tick.
354    pub fn maybe_sync(&mut self) -> io::Result<()> {
355        if matches!(self.fsync, Fsync::EverySec)
356            && self.dirty
357            && self.last_sync.elapsed() >= Duration::from_secs(1)
358        {
359            self.file.flush()?;
360            self.file.get_ref().sync_data()?;
361            self.dirty = false;
362            self.last_sync = Instant::now();
363        }
364        Ok(())
365    }
366
367    /// Empty the log (after a snapshot has captured the full state).
368    pub fn truncate(&mut self) -> io::Result<()> {
369        self.file.flush()?;
370        let f = self.file.get_mut();
371        f.set_len(0)?;
372        f.seek(SeekFrom::Start(0))?; // harmless under O_APPEND; keeps len/pos coherent
373        f.sync_all()?;
374        self.dirty = false;
375        self.size_bytes = 0;
376        self.size_at_last_rewrite = 0;
377        Ok(())
378    }
379
380    /// Estimated current AOF size in bytes (file content as of last append).
381    #[inline]
382    pub fn size_bytes(&self) -> u64 {
383        self.size_bytes
384    }
385
386    /// AOF size at the most recent rewrite (or open). Auto-trigger compares
387    /// `(size_bytes - size_at_last_rewrite) * 100 / size_at_last_rewrite` to
388    /// the `auto_aof_rewrite_percentage` knob.
389    #[inline]
390    pub fn size_at_last_rewrite(&self) -> u64 {
391        self.size_at_last_rewrite
392    }
393
394    /// Successful rewrite count since `Self::open`. Surfaced in INFO.
395    #[inline]
396    pub fn rewrites_total(&self) -> u64 {
397        self.rewrites_total
398    }
399
400    /// BGREWRITEAOF: rebuild a compact AOF from `store`'s current state and
401    /// atomically swap it in.
402    ///
403    /// **v1.0 is synchronous** — the calling shard blocks for the rewrite's
404    /// duration. Each shard owns its own AOF, so the shards' rewrites
405    /// proceed independently; per-shard blocking matches Redis's `BGSAVE`
406    /// cost in a typical single-key-per-shard workload. Concurrent
407    /// (rewrite-during-writes) incrementalisation is a v1.x perf item.
408    ///
409    /// Writes to a `<path>.rewrite` temp file with fsync, then `rename(2)`s
410    /// it over the live AOF. The append handle is reopened against the new
411    /// file before this call returns, so subsequent `append` calls land in
412    /// the rewritten log.
413    pub fn rewrite_from(&mut self, store: &Store) -> io::Result<RewriteStats> {
414        // Flush any pending writes to the OLD file first so the snapshot
415        // accounts for everything the caller intended to durabilise.
416        self.file.flush()?;
417
418        let tmp = rewrite_tmp_path(&self.path);
419        let (keys, bytes) = dump_store_to_aof(&tmp, store)?;
420
421        // Atomic replacement. After this, the OLD file descriptor in
422        // `self.file` is open against an unlinked inode; new writes would
423        // go nowhere visible. Reopen against the new path.
424        std::fs::rename(&tmp, &self.path)?;
425        let f = OpenOptions::new().append(true).open(&self.path)?;
426        self.file = BufWriter::new(f);
427        self.size_bytes = bytes;
428        self.size_at_last_rewrite = bytes;
429        self.dirty = false;
430        self.rewrites_total = self.rewrites_total.saturating_add(1);
431        Ok(RewriteStats { keys, bytes })
432    }
433}
434
/// Temp path for an AOF rewrite: `<aof file name>.rewrite` in the same
/// directory as `path`, so the final `rename(2)` stays atomic. If `path` has
/// no file name, the name `aof.rewrite` is used instead.
fn rewrite_tmp_path(path: &Path) -> PathBuf {
    let name = path.file_name().map_or_else(
        || std::ffi::OsString::from("aof.rewrite"),
        |base| {
            let mut renamed = base.to_os_string();
            renamed.push(".rewrite");
            renamed
        },
    );
    path.with_file_name(name)
}
449
450/// Write `store`'s current state to `path` as a sequence of mutating RESP
451/// commands; flush + fsync before returning. Returns `(keys, bytes)`.
452fn dump_store_to_aof(path: &Path, store: &Store) -> io::Result<(u64, u64)> {
453    let f = File::create(path)?;
454    let mut w = BufWriter::new(f);
455    let mut keys = 0u64;
456    let mut err: Option<io::Error> = None;
457    store.snapshot_each(|key, value, ttl_ms| {
458        if err.is_some() {
459            return;
460        }
461        if let Err(e) = write_value_as_commands(&mut w, key, value, ttl_ms) {
462            err = Some(e);
463        } else {
464            keys += 1;
465        }
466    });
467    if let Some(e) = err {
468        return Err(e);
469    }
470    w.flush()?;
471    let inner = w
472        .into_inner()
473        .map_err(|e| io::Error::other(e.to_string()))?;
474    let bytes = inner.metadata().map(|m| m.len()).unwrap_or(0);
475    inner.sync_all()?;
476    Ok((keys, bytes))
477}
478
479/// Emit one (or two, if TTL'd) RESP write commands that, when replayed,
480/// reconstruct `key`'s `value` and TTL exactly.
481fn write_value_as_commands<W: Write>(
482    w: &mut W,
483    key: &[u8],
484    value: &Value,
485    ttl_ms: Option<u64>,
486) -> io::Result<()> {
487    match value {
488        Value::Str(s) => {
489            let argv = Argv::from(vec![b"SET".to_vec(), key.to_vec(), s.to_vec()]);
490            write_multibulk(w, &argv)?;
491        }
492        Value::Hash(h) => {
493            let mut argv: Vec<Vec<u8>> = Vec::with_capacity(2 + h.len() * 2);
494            argv.push(b"HSET".to_vec());
495            argv.push(key.to_vec());
496            for (f, v) in h.iter() {
497                argv.push(f.to_vec());
498                argv.push(v.clone());
499            }
500            write_multibulk(w, &Argv::from(argv))?;
501        }
502        Value::List(l) => {
503            let mut argv: Vec<Vec<u8>> = Vec::with_capacity(2 + l.len());
504            argv.push(b"RPUSH".to_vec());
505            argv.push(key.to_vec());
506            for v in l.iter() {
507                argv.push(v.clone());
508            }
509            write_multibulk(w, &Argv::from(argv))?;
510        }
511        Value::Set(s) => {
512            let mut argv: Vec<Vec<u8>> = Vec::with_capacity(2 + s.len());
513            argv.push(b"SADD".to_vec());
514            argv.push(key.to_vec());
515            for m in s.iter() {
516                argv.push(m.to_vec());
517            }
518            write_multibulk(w, &Argv::from(argv))?;
519        }
520        Value::ZSet(z) => {
521            let mut argv: Vec<Vec<u8>> = Vec::with_capacity(2 + z.ordered().count() * 2);
522            argv.push(b"ZADD".to_vec());
523            argv.push(key.to_vec());
524            for (m, sc) in z.ordered() {
525                argv.push(fmt_zset_score(sc));
526                argv.push(m.to_vec());
527            }
528            write_multibulk(w, &Argv::from(argv))?;
529        }
530    }
531    if let Some(ms) = ttl_ms {
532        let argv = Argv::from(vec![
533            b"PEXPIRE".to_vec(),
534            key.to_vec(),
535            ms.to_string().into_bytes(),
536        ]);
537        write_multibulk(w, &argv)?;
538    }
539    Ok(())
540}
541
/// Format a sorted-set score as the `ZADD` argument emitted by AOF rewrite.
///
/// - Integral values with magnitude below 1e17 are written without a
///   fractional part (`3`, not `3.0`; `-0.0` becomes `0`).
/// - Everything else uses `f64`'s `Display`. It prints the shortest plain
///   decimal string that parses back to exactly the same value (`0.1`,
///   `0.00000000000000000001`, `inf`, `-inf`).
///
/// A fixed `{:.17}` precision would be wrong here: it means 17 digits after
/// the decimal point, not 17 significant figures, so small scores such as
/// `1e-20` would be rounded (to `0`) and break the replay round-trip. Rust's
/// formatting is locale-independent, so output is byte-stable.
fn fmt_zset_score(s: f64) -> Vec<u8> {
    if s.is_finite() && s == s.trunc() && s.abs() < 1e17 {
        format!("{}", s as i64).into_bytes()
    } else {
        format!("{s}").into_bytes()
    }
}
553
554/// Cheap byte-count estimator for a single multi-bulk frame:
555/// `*<n>\r\n` + per-arg `$<len>\r\n<bytes>\r\n`. No allocation, no
556/// double-pass — accurate to within a couple of bytes per arg.
557fn estimate_multibulk_bytes(args: &Argv) -> u64 {
558    let mut n: u64 = 3 + decimal_digits(args.len() as u64) as u64;
559    for a in args.iter() {
560        n += 3 + decimal_digits(a.len() as u64) as u64 + a.len() as u64 + 2;
561    }
562    n
563}
564
/// Number of decimal digits needed to print `x` (`0` takes one digit).
#[inline]
fn decimal_digits(x: u64) -> u32 {
    // ilog10 is undefined for 0, which still prints as a single "0".
    x.checked_ilog10().map_or(1, |exp| exp + 1)
}
577
578/// Replay the command log at `path`, calling `apply` for each complete command.
579/// A truncated or corrupt trailing frame (e.g. a crash mid-append) is ignored.
580/// A missing file is treated as an empty log.
581pub fn replay_aof<F: FnMut(Argv)>(path: &Path, mut apply: F) -> io::Result<()> {
582    let mut data = Vec::new();
583    match File::open(path) {
584        Ok(mut f) => {
585            f.read_to_end(&mut data)?;
586        }
587        Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(()),
588        Err(e) => return Err(e),
589    }
590    let mut pos = 0;
591    while pos < data.len() {
592        match kevy_resp::parse_command(&data[pos..]) {
593            Ok(Some((args, consumed))) => {
594                apply(args);
595                pos += consumed;
596            }
597            // Incomplete or corrupt tail — stop; the prefix is intact.
598            Ok(None) | Err(_) => break,
599        }
600    }
601    Ok(())
602}
603
604fn write_multibulk<W: Write>(w: &mut W, args: &Argv) -> io::Result<()> {
605    write!(w, "*{}\r\n", args.len())?;
606    for a in args.iter() {
607        write!(w, "${}\r\n", a.len())?;
608        w.write_all(a)?;
609        w.write_all(b"\r\n")?;
610    }
611    Ok(())
612}
613
614#[cfg(test)]
615mod tests {
616    use super::*;
617    use std::time::Duration;
618
619    fn temp_file(name: &str) -> std::path::PathBuf {
620        let mut p = std::env::temp_dir();
621        let uniq = std::time::SystemTime::now()
622            .duration_since(std::time::UNIX_EPOCH)
623            .unwrap()
624            .as_nanos();
625        p.push(format!("kevy-{name}-{uniq}.rdb"));
626        p
627    }
628
629    #[test]
630    fn snapshot_round_trip() {
631        let path = temp_file("rt");
632
633        let mut src = Store::new();
634        src.set(b"plain", b"value".to_vec(), None, false, false);
635        src.set(b"empty", Vec::new(), None, false, false);
636        src.set(b"binary", vec![0u8, 1, 2, 255, 254], None, false, false);
637        src.set(
638            b"withttl",
639            b"soon".to_vec(),
640            Some(Duration::from_secs(100)),
641            false,
642            false,
643        );
644
645        save_snapshot(&src, &path).unwrap();
646
647        let mut dst = Store::new();
648        load_snapshot(&mut dst, &path).unwrap();
649
650        assert_eq!(dst.dbsize(), 4);
651        assert_eq!(dst.get(b"plain").unwrap(), Some(&b"value"[..]));
652        assert_eq!(dst.get(b"empty").unwrap(), Some(&b""[..]));
653        assert_eq!(
654            dst.get(b"binary").unwrap(),
655            Some(&[0u8, 1, 2, 255, 254][..])
656        );
657        assert_eq!(dst.get(b"withttl").unwrap(), Some(&b"soon"[..]));
658        // TTL survived (stored as remaining-ms, restored as a fresh deadline).
659        assert!(dst.pttl(b"withttl") > 90_000);
660
661        let _ = std::fs::remove_file(&path);
662    }
663
664    #[test]
665    fn bad_magic_is_rejected() {
666        let path = temp_file("bad");
667        std::fs::write(&path, b"NOTKEVY!....").unwrap();
668        let mut dst = Store::new();
669        assert!(load_snapshot(&mut dst, &path).is_err());
670        let _ = std::fs::remove_file(&path);
671    }
672
673    #[test]
674    fn expired_keys_are_not_saved() {
675        let path = temp_file("exp");
676        let mut src = Store::new();
677        src.set(b"live", b"1".to_vec(), None, false, false);
678        src.set(
679            b"dead",
680            b"2".to_vec(),
681            Some(Duration::from_millis(1)),
682            false,
683            false,
684        );
685        std::thread::sleep(Duration::from_millis(8));
686
687        save_snapshot(&src, &path).unwrap();
688        let mut dst = Store::new();
689        load_snapshot(&mut dst, &path).unwrap();
690
691        assert_eq!(dst.dbsize(), 1);
692        assert_eq!(dst.get(b"live").unwrap(), Some(&b"1"[..]));
693        assert_eq!(dst.get(b"dead").unwrap(), None);
694        let _ = std::fs::remove_file(&path);
695    }
696
697    #[test]
698    fn hash_snapshot_round_trip() {
699        let path = temp_file("hashrt");
700        let mut src = Store::new();
701        src.hset(
702            b"h",
703            &[
704                (b"a".to_vec(), b"1".to_vec()),
705                (b"b".to_vec(), b"two".to_vec()),
706            ],
707        )
708        .unwrap();
709        src.set(b"s", b"str".to_vec(), None, false, false);
710        save_snapshot(&src, &path).unwrap();
711
712        let mut dst = Store::new();
713        load_snapshot(&mut dst, &path).unwrap();
714        assert_eq!(dst.type_of(b"h"), "hash");
715        assert_eq!(dst.hget(b"h", b"a").unwrap(), Some(&b"1"[..]));
716        assert_eq!(dst.hget(b"h", b"b").unwrap(), Some(&b"two"[..]));
717        assert_eq!(dst.hlen(b"h"), Ok(2));
718        assert_eq!(dst.get(b"s").unwrap(), Some(&b"str"[..]));
719        let _ = std::fs::remove_file(&path);
720    }
721
722    fn cmd(parts: &[&[u8]]) -> Argv {
723        Argv::from(parts.iter().map(|p| p.to_vec()).collect::<Vec<_>>())
724    }
725
726    #[test]
727    fn aof_append_and_replay() {
728        let path = temp_file("aof");
729        {
730            let mut aof = Aof::open(&path, Fsync::Always).unwrap();
731            aof.append(&cmd(&[b"SET", b"a", b"1"])).unwrap();
732            aof.append(&cmd(&[b"INCR", b"a"])).unwrap();
733            aof.append(&cmd(&[b"SET", b"b", b"hello world"])).unwrap();
734        }
735        let mut got: Vec<Argv> = Vec::new();
736        replay_aof(&path, |args| got.push(args)).unwrap();
737        assert_eq!(got.len(), 3);
738        assert_eq!(got[0], cmd(&[b"SET", b"a", b"1"]));
739        assert_eq!(got[1], cmd(&[b"INCR", b"a"]));
740        assert_eq!(got[2], cmd(&[b"SET", b"b", b"hello world"]));
741        let _ = std::fs::remove_file(&path);
742    }
743
744    #[test]
745    fn aof_truncated_tail_ignored() {
746        let path = temp_file("aoftail");
747        {
748            let mut aof = Aof::open(&path, Fsync::No).unwrap();
749            aof.append(&cmd(&[b"SET", b"a", b"1"])).unwrap();
750        }
751        // Simulate a crash mid-append: a partial frame at the end.
752        let mut f = OpenOptions::new().append(true).open(&path).unwrap();
753        f.write_all(b"*2\r\n$3\r\nSET\r\n$5\r\nhal").unwrap(); // truncated
754        drop(f);
755
756        let mut got: Vec<Argv> = Vec::new();
757        replay_aof(&path, |args| got.push(args)).unwrap();
758        assert_eq!(got, vec![cmd(&[b"SET", b"a", b"1"])]); // only the complete frame
759        let _ = std::fs::remove_file(&path);
760    }
761
762    #[test]
763    fn aof_truncate_clears() {
764        let path = temp_file("aoftrunc");
765        let mut aof = Aof::open(&path, Fsync::No).unwrap();
766        aof.append(&cmd(&[b"SET", b"a", b"1"])).unwrap();
767        aof.truncate().unwrap();
768        aof.append(&cmd(&[b"SET", b"b", b"2"])).unwrap();
769        drop(aof);
770
771        let mut got: Vec<Argv> = Vec::new();
772        replay_aof(&path, |args| got.push(args)).unwrap();
773        assert_eq!(got, vec![cmd(&[b"SET", b"b", b"2"])]); // pre-truncate write gone
774        let _ = std::fs::remove_file(&path);
775    }
776
777    #[test]
778    fn replay_missing_file_is_ok() {
779        let path = temp_file("nofile");
780        let mut n = 0;
781        replay_aof(&path, |_| n += 1).unwrap();
782        assert_eq!(n, 0);
783    }
784
785    #[test]
786    fn list_snapshot_round_trip() {
787        let path = temp_file("listrt");
788        let mut src = Store::new();
789        src.rpush(b"l", &[b"a".to_vec(), b"b".to_vec(), b"c".to_vec()]).unwrap();
790        save_snapshot(&src, &path).unwrap();
791
792        let mut dst = Store::new();
793        load_snapshot(&mut dst, &path).unwrap();
794        assert_eq!(dst.type_of(b"l"), "list");
795        assert_eq!(dst.llen(b"l"), Ok(3));
796        assert_eq!(dst.lrange(b"l", 0, -1).unwrap(), vec![
797            b"a".to_vec(), b"b".to_vec(), b"c".to_vec()
798        ]);
799        let _ = std::fs::remove_file(&path);
800    }
801
802    #[test]
803    fn set_snapshot_round_trip() {
804        let path = temp_file("setrt");
805        let mut src = Store::new();
806        src.sadd(b"s", &[b"x".to_vec(), b"y".to_vec(), b"z".to_vec()]).unwrap();
807        save_snapshot(&src, &path).unwrap();
808
809        let mut dst = Store::new();
810        load_snapshot(&mut dst, &path).unwrap();
811        assert_eq!(dst.type_of(b"s"), "set");
812        assert_eq!(dst.scard(b"s"), Ok(3));
813        let mut members = dst.smembers(b"s").unwrap();
814        members.sort();
815        assert_eq!(members, vec![b"x".to_vec(), b"y".to_vec(), b"z".to_vec()]);
816        let _ = std::fs::remove_file(&path);
817    }
818
819    #[test]
820    fn zset_snapshot_round_trip() {
821        let path = temp_file("zsetrt");
822        let mut src = Store::new();
823        src.zadd(b"z", &[(1.0, b"a".to_vec()), (2.0, b"b".to_vec()), (0.5, b"c".to_vec())]).unwrap();
824        save_snapshot(&src, &path).unwrap();
825
826        let mut dst = Store::new();
827        load_snapshot(&mut dst, &path).unwrap();
828        assert_eq!(dst.type_of(b"z"), "zset");
829        assert_eq!(dst.zcard(b"z"), Ok(3));
830        // Ascending score order: c(0.5), a(1.0), b(2.0)
831        let range = dst.zrange(b"z", 0, -1).unwrap();
832        assert_eq!(range, vec![
833            (b"c".to_vec(), 0.5),
834            (b"a".to_vec(), 1.0),
835            (b"b".to_vec(), 2.0),
836        ]);
837        let _ = std::fs::remove_file(&path);
838    }
839
840    #[test]
841    fn all_types_snapshot_round_trip() {
842        let path = temp_file("allrt");
843        let mut src = Store::new();
844        src.set(b"str", b"hello".to_vec(), None, false, false);
845        src.hset(b"hash", &[(b"f".to_vec(), b"v".to_vec())]).unwrap();
846        src.rpush(b"list", &[b"i".to_vec()]).unwrap();
847        src.sadd(b"set", &[b"m".to_vec()]).unwrap();
848        src.zadd(b"zset", &[(1.0, b"k".to_vec())]).unwrap();
849        save_snapshot(&src, &path).unwrap();
850
851        let mut dst = Store::new();
852        load_snapshot(&mut dst, &path).unwrap();
853        assert_eq!(dst.dbsize(), 5);
854        assert_eq!(dst.type_of(b"str"), "string");
855        assert_eq!(dst.type_of(b"hash"), "hash");
856        assert_eq!(dst.type_of(b"list"), "list");
857        assert_eq!(dst.type_of(b"set"), "set");
858        assert_eq!(dst.type_of(b"zset"), "zset");
859        let _ = std::fs::remove_file(&path);
860    }
861
862    // ───────────── AOF rewrite (Wave 2 #3) ─────────────
863
864    /// Tiny dispatch helper for AOF-rewrite roundtrip tests: turn the
865    /// canonical mutating verbs the rewriter emits back into Store mutations.
866    /// Mirrors a subset of `kevy::dispatch` — enough for the verbs
867    /// `dump_store_to_aof` actually emits.
868    fn apply_for_test(store: &mut Store, args: &Argv) {
869        let verb = args[0].to_ascii_uppercase();
870        match verb.as_slice() {
871            b"SET" => {
872                store.set(&args[1], args[2].to_vec(), None, false, false);
873            }
874            b"HSET" => {
875                let mut pairs: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
876                let mut i = 2;
877                while i + 1 < args.len() {
878                    pairs.push((args[i].to_vec(), args[i + 1].to_vec()));
879                    i += 2;
880                }
881                store.hset(&args[1], &pairs).unwrap();
882            }
883            b"RPUSH" => {
884                let items: Vec<Vec<u8>> = args.iter().skip(2).map(|a| a.to_vec()).collect();
885                store.rpush(&args[1], &items).unwrap();
886            }
887            b"SADD" => {
888                let members: Vec<Vec<u8>> = args.iter().skip(2).map(|a| a.to_vec()).collect();
889                store.sadd(&args[1], &members).unwrap();
890            }
891            b"ZADD" => {
892                let mut pairs: Vec<(f64, Vec<u8>)> = Vec::new();
893                let mut i = 2;
894                while i + 1 < args.len() {
895                    let score: f64 = std::str::from_utf8(&args[i]).unwrap().parse().unwrap();
896                    pairs.push((score, args[i + 1].to_vec()));
897                    i += 2;
898                }
899                store.zadd(&args[1], &pairs).unwrap();
900            }
901            b"PEXPIRE" => {
902                let ms: u64 = std::str::from_utf8(&args[2]).unwrap().parse().unwrap();
903                store.expire(&args[1], Duration::from_millis(ms));
904            }
905            other => panic!("unexpected verb in AOF rewrite: {:?}", String::from_utf8_lossy(other)),
906        }
907    }
908
909    fn temp_aof(name: &str) -> std::path::PathBuf {
910        let mut p = std::env::temp_dir();
911        let uniq = std::time::SystemTime::now()
912            .duration_since(std::time::UNIX_EPOCH)
913            .unwrap()
914            .as_nanos();
915        p.push(format!("kevy-{name}-{uniq}.aof"));
916        p
917    }
918
919    #[test]
920    fn rewrite_reconstructs_full_keyspace() {
921        let path = temp_aof("rewrite-all");
922
923        let mut src = Store::new();
924        src.set(b"str", b"hello".to_vec(), None, false, false);
925        src.set(b"binary", vec![0u8, 1, 2, 255], None, false, false);
926        src.hset(b"hash", &[(b"f1".to_vec(), b"v1".to_vec()), (b"f2".to_vec(), b"v2".to_vec())])
927            .unwrap();
928        src.rpush(b"list", &[b"i1".to_vec(), b"i2".to_vec(), b"i3".to_vec()])
929            .unwrap();
930        src.sadd(b"set", &[b"m1".to_vec(), b"m2".to_vec()]).unwrap();
931        src.zadd(b"zset", &[(1.5, b"a".to_vec()), (2.5, b"b".to_vec())])
932            .unwrap();
933        src.set(
934            b"ttl",
935            b"x".to_vec(),
936            Some(Duration::from_secs(3600)),
937            false,
938            false,
939        );
940
941        let mut aof = Aof::open(&path, Fsync::Always).unwrap();
942        let stats = aof.rewrite_from(&src).unwrap();
943        assert_eq!(stats.keys, 7);
944        assert!(stats.bytes > 0);
945        assert_eq!(aof.size_bytes(), stats.bytes);
946        assert_eq!(aof.size_at_last_rewrite(), stats.bytes);
947        assert_eq!(aof.rewrites_total(), 1);
948        drop(aof);
949
950        // Replay into a fresh store; both should match.
951        let mut dst = Store::new();
952        replay_aof(&path, |args| apply_for_test(&mut dst, &args)).unwrap();
953        assert_eq!(dst.dbsize(), 7);
954        assert_eq!(dst.get(b"str").unwrap(), Some(&b"hello"[..]));
955        assert_eq!(dst.get(b"binary").unwrap(), Some(&[0u8, 1, 2, 255][..]));
956        assert_eq!(dst.hget(b"hash", b"f1").unwrap(), Some(&b"v1"[..]));
957        assert_eq!(dst.hget(b"hash", b"f2").unwrap(), Some(&b"v2"[..]));
958        assert_eq!(dst.llen(b"list").unwrap(), 3);
959        assert_eq!(dst.scard(b"set").unwrap(), 2);
960        assert_eq!(dst.zcard(b"zset").unwrap(), 2);
961        assert!(dst.pttl(b"ttl") > 3_500_000); // TTL survived
962        let _ = std::fs::remove_file(&path);
963    }
964
965    #[test]
966    fn rewrite_replaces_old_log_atomically() {
967        let path = temp_aof("rewrite-swap");
968
969        // Step 1: a stale AOF with many entries (simulating long-running
970        // history). After rewrite the new AOF must NOT carry these.
971        {
972            let mut aof = Aof::open(&path, Fsync::Always).unwrap();
973            for i in 0..50 {
974                let k = format!("k{i}");
975                let argv = Argv::from(vec![b"SET".to_vec(), k.into_bytes(), b"v".to_vec()]);
976                aof.append(&argv).unwrap();
977            }
978        }
979        let big_size = std::fs::metadata(&path).unwrap().len();
980        assert!(big_size > 0);
981
982        // Step 2: in-memory state is small (only 2 keys).
983        let mut store = Store::new();
984        store.set(b"only", b"value".to_vec(), None, false, false);
985        store.set(b"second", b"v2".to_vec(), None, false, false);
986        let mut aof = Aof::open(&path, Fsync::Always).unwrap();
987        let stats = aof.rewrite_from(&store).unwrap();
988        assert_eq!(stats.keys, 2);
989        let new_size = std::fs::metadata(&path).unwrap().len();
990        assert!(new_size < big_size, "rewrite should shrink: {new_size} vs {big_size}");
991
992        // Step 3: appending after rewrite lands in the new file.
993        aof.append(&Argv::from(vec![b"SET".to_vec(), b"third".to_vec(), b"v".to_vec()]))
994            .unwrap();
995        drop(aof);
996
997        let mut dst = Store::new();
998        replay_aof(&path, |args| apply_for_test(&mut dst, &args)).unwrap();
999        assert_eq!(dst.dbsize(), 3, "rewrite + append should yield 3 keys");
1000        let _ = std::fs::remove_file(&path);
1001    }
1002
1003    #[test]
1004    fn append_bumps_size_estimate() {
1005        let path = temp_aof("size-est");
1006        let mut aof = Aof::open(&path, Fsync::No).unwrap();
1007        assert_eq!(aof.size_bytes(), 0);
1008        aof.append(&Argv::from(vec![b"SET".to_vec(), b"k".to_vec(), b"v".to_vec()]))
1009            .unwrap();
1010        let after_one = aof.size_bytes();
1011        assert!(after_one > 0);
1012        aof.append(&Argv::from(vec![b"SET".to_vec(), b"k2".to_vec(), b"v".to_vec()]))
1013            .unwrap();
1014        assert!(aof.size_bytes() > after_one);
1015        let _ = std::fs::remove_file(&path);
1016    }
1017
1018    #[test]
1019    fn rewrite_resets_size_anchor() {
1020        let path = temp_aof("size-anchor");
1021        let mut aof = Aof::open(&path, Fsync::Always).unwrap();
1022        for _ in 0..10 {
1023            aof.append(&Argv::from(vec![b"SET".to_vec(), b"k".to_vec(), b"v".to_vec()]))
1024                .unwrap();
1025        }
1026        assert!(aof.size_bytes() > aof.size_at_last_rewrite());
1027        let store = Store::new();
1028        let stats = aof.rewrite_from(&store).unwrap();
1029        // empty store ⇒ empty rewrite
1030        assert_eq!(stats.keys, 0);
1031        assert_eq!(aof.size_bytes(), 0);
1032        assert_eq!(aof.size_at_last_rewrite(), 0);
1033        assert_eq!(aof.rewrites_total(), 1);
1034        let _ = std::fs::remove_file(&path);
1035    }
1036}