Skip to main content

kevy_persist/
lib.rs

1//! kevy-persist — durability for a [`kevy_store::Store`].
2//!
3//! Two mechanisms, both zero-dependency pure Rust over `std::fs`:
4//!
5//! - **Snapshot (RDB-style):** [`save_snapshot`] dumps a whole store to a temp
6//!   file then atomically renames it (fsync before rename); [`load_snapshot`]
7//!   restores it. A compact, type-tagged binary format.
8//! - **AOF:** an [`Aof`] append-only command log with a configurable fsync
9//!   policy; [`replay_aof`] re-applies it on startup, tolerating a truncated
10//!   trailing frame from a crash mid-write.
11//!
12//! In a shared-nothing runtime each shard persists its own store to its own
13//! file, so there is no cross-core coordination. Part of the [kevy] server.
14//!
15//! [kevy]: https://crates.io/crates/kevy
16//!
17//! # Example (AOF)
18//!
19//! ```
20//! use kevy_persist::{Aof, Argv, Fsync, replay_aof};
21//!
22//! # fn main() -> std::io::Result<()> {
23//! let path = std::env::temp_dir().join("kevy-persist-doctest.aof");
24//! # let _ = std::fs::remove_file(&path);
25//! {
26//!     let mut aof = Aof::open(&path, Fsync::No)?;
27//!     aof.append(&Argv::from(vec![b"SET".to_vec(), b"k".to_vec(), b"v".to_vec()]))?;
28//! } // flushed on drop
29//!
30//! let mut replayed: Vec<Argv> = Vec::new();
31//! replay_aof(&path, |args| replayed.push(args))?;
32//! assert_eq!(replayed, vec![vec![b"SET".to_vec(), b"k".to_vec(), b"v".to_vec()]]);
33//! # std::fs::remove_file(&path).ok();
34//! # Ok(())
35//! # }
36//! ```
37#![forbid(unsafe_code)]
38
39mod aof;
40pub mod layout;
41mod replay;
42pub mod reshard;
43mod rewrite_fmt;
44mod shards_meta;
45
46pub use aof::{Aof, Fsync, RewritePlan, RewriteStats, write_aof_base};
47pub use replay::replay_aof;
48pub use shards_meta::{Routing, ShardsMeta, read_shards_meta, write_shards_meta};
49pub use kevy_resp::{Argv, ArgvView};
50pub use rewrite_fmt::dump_aof;
51pub(crate) use rewrite_fmt::{dump_store_to_buf, estimate_multibulk_bytes, write_multibulk};
52use kevy_store::Store;
53use kevy_store::Value;
54// ZSet snapshot iterates ordered (member, score) pairs via `Value::ZSet`.
55use std::fs::File;
56use std::io::{self, BufReader, BufWriter, Read, Write};
57use std::path::Path;
58
/// File magic: the first 8 bytes of every snapshot.
const MAGIC: &[u8; 8] = b"KEVYSNAP";

/// Oldest readable format. Each entry's TTL was stored as **remaining**
/// millis (relative), so a load re-anchored the deadline to load time — a
/// restart handed every key a fresh full TTL (INC-2026-06-09). The loader
/// still accepts it.
const VERSION_RELATIVE_TTL: u8 = 2;
/// First format that stores the **absolute** Unix-ms deadline, so a load
/// reconstructs the original expiry instant. The loader still accepts it.
const VERSION_ABSOLUTE_TTL: u8 = 3;
/// Current format version, written by every save. Bump on any layout change.
/// v4 appends a consumer-group section (groups + consumers plus PEL) to each
/// `OP_STREAM` payload; before that, SAVE/reshard silently dropped group state.
const VERSION: u8 = 4;
72
// Record opcodes: one tag per value type, plus an end-of-stream marker.
// Every non-EOF record is laid out as
//   [op][ttl: u8 flag + optional u64][key][type payload]

/// Trailer byte: no more records follow.
const OP_EOF: u8 = 0;
/// String value.
const OP_STR: u8 = 1;
/// Hash value (field/value pairs).
const OP_HASH: u8 = 2;
/// List value.
const OP_LIST: u8 = 3;
/// Set value.
const OP_SET: u8 = 4;
/// Sorted set value (member/score pairs).
const OP_ZSET: u8 = 5;
/// Stream value (entries, ids, consumer groups).
const OP_STREAM: u8 = 6;
82
/// Buffer capacity (1 MiB) for bulk snapshot and AOF-rewrite writes.
///
/// With the default 8 KiB `BufWriter`, SAVE reached only ~12 % of disk
/// bandwidth because it issued tens of thousands of small `write(2)` calls.
/// A 1 MiB buffer amortizes those syscalls so throughput approaches disk speed.
pub(crate) const SNAPSHOT_BUF_CAP: usize = 1024 * 1024;
87
88/// Anything that can enumerate `(key, &Value, ttl_ms)` triples for
89/// serialization: a live [`Store`] (its `snapshot_each`, the synchronous
90/// paths) or a frozen [`kevy_store::SnapshotView`] (the COW paths — collect
91/// on the owning thread, serialize on a background one).
92pub trait SnapshotSource {
93    /// Visit every live entry as `(key, &value, remaining_ttl_ms)`.
94    fn for_each_entry(&self, f: impl FnMut(&[u8], &Value, Option<u64>));
95}
96
97impl SnapshotSource for Store {
98    fn for_each_entry(&self, f: impl FnMut(&[u8], &Value, Option<u64>)) {
99        self.snapshot_each(f);
100    }
101}
102
103impl SnapshotSource for kevy_store::SnapshotView {
104    fn for_each_entry(&self, f: impl FnMut(&[u8], &Value, Option<u64>)) {
105        self.each(f);
106    }
107}
108
109/// Write a point-in-time snapshot of `src` (a live [`Store`] or a frozen
110/// [`kevy_store::SnapshotView`]) to `path`, atomically: data is written to
111/// `<path>.tmp`, fsynced, then renamed over `path`.
112pub fn save_snapshot<S: SnapshotSource>(src: &S, path: &Path) -> io::Result<()> {
113    let tmp = write_snapshot_tmp(src, path)?;
114    std::fs::rename(&tmp, path)
115}
116
117/// Serialize a point-in-time snapshot of `src` into any `Write` sink.
118/// Used by both [`write_snapshot_tmp`] (sink = `BufWriter<File>` +
119/// extra fsync after) and the v3-cluster replication path (sink =
120/// `&mut Vec<u8>` for in-memory snapshot ship, see
121/// `kevy-replicate/docs/snapshot.md`).
122///
123/// On-disk bytes are identical regardless of sink — the same magic +
124/// version header, same entry stream, same `OP_EOF` trailer. Callers
125/// that need durability (disk) wrap in `BufWriter<File>` and call
126/// `sync_all` themselves; callers that need bytes (network ship)
127/// pass a `Vec<u8>`.
128pub fn write_snapshot_to<S: SnapshotSource, W: Write>(src: &S, sink: &mut W) -> io::Result<()> {
129    let mut w = BufWriter::with_capacity(SNAPSHOT_BUF_CAP, sink);
130    w.write_all(MAGIC)?;
131    w.write_all(&[VERSION])?;
132    // The source yields *remaining* ms; v3 persists the absolute
133    // Unix-ms deadline (now + remaining) so the TTL survives a restart.
134    let now = kevy_store::now_unix_ms();
135    // Enumeration is infallible; capture the first write error to surface.
136    let mut err: Option<io::Error> = None;
137    src.for_each_entry(|key, value, ttl| {
138        let deadline = ttl.map(|ms| now.saturating_add(ms));
139        if err.is_none()
140            && let Err(e) = write_entry(&mut w, key, value, deadline)
141        {
142            err = Some(e);
143        }
144    });
145    if let Some(e) = err {
146        return Err(e);
147    }
148    w.write_all(&[OP_EOF])?;
149    w.flush()?;
150    Ok(())
151}
152
153/// The write half of [`save_snapshot`]: produce the durable (fsynced)
154/// `<path>.tmp` and return its path **without** the final rename. For the
155/// COW background-save flow: the serializer thread writes the temp file at
156/// leisure, then the store-owning thread renames it in the same critical
157/// section that resets the AOF — keeping the snapshot/AOF commit adjacent
158/// instead of seconds apart.
159pub fn write_snapshot_tmp<S: SnapshotSource>(src: &S, path: &Path) -> io::Result<std::path::PathBuf> {
160    let tmp = tmp_path(path);
161    {
162        let mut file = File::create(&tmp)?;
163        write_snapshot_to(src, &mut file)?;
164        file.sync_all()?; // durably on disk before the rename
165    }
166    Ok(tmp)
167}
168
169/// Load a snapshot from `path` into `store` (entries are inserted, not cleared
170/// first — call on a fresh store). Errors on a bad magic/version or truncation.
171pub fn load_snapshot(store: &mut Store, path: &Path) -> io::Result<()> {
172    let r = BufReader::new(File::open(path)?);
173    load_snapshot_from(store, r)
174}
175
176/// Load a snapshot from any [`std::io::Read`] sink into `store` —
177/// symmetric to [`write_snapshot_to`]. Used by the v3-cluster
178/// replication path (sink = `&[u8]` wrapped in `std::io::Cursor`) to
179/// apply a primary-shipped snapshot to a fresh local store without
180/// touching disk. Entries are inserted, not cleared first — call on
181/// a fresh store. Errors on bad magic/version or truncation.
182pub fn load_snapshot_from<R: Read>(store: &mut Store, mut r: R) -> io::Result<()> {
183    let mut magic = [0u8; 8];
184    r.read_exact(&mut magic)?;
185    if &magic != MAGIC {
186        return Err(io::Error::new(
187            io::ErrorKind::InvalidData,
188            "kevy snapshot: bad magic",
189        ));
190    }
191    let version = read_u8(&mut r)?;
192    if !(VERSION_RELATIVE_TTL..=VERSION).contains(&version) {
193        return Err(io::Error::new(
194            io::ErrorKind::InvalidData,
195            "kevy snapshot: bad version",
196        ));
197    }
198    // v3+ stores absolute Unix-ms deadlines; convert each to remaining ms
199    // against one `now` read so the load is internally consistent. A deadline
200    // already past becomes `Some(0)` → loaded then immediately reaped (lazy
201    // get / active reaper), matching "expired key is gone". v2 ttls are
202    // already remaining, so pass them through.
203    let absolute_ttl = version >= VERSION_ABSOLUTE_TTL;
204    let now = kevy_store::now_unix_ms();
205
206    loop {
207        let op = read_u8(&mut r)?;
208        if op == OP_EOF {
209            return Ok(());
210        }
211        let raw_ttl = read_ttl(&mut r)?;
212        let ttl = if absolute_ttl {
213            raw_ttl.map(|deadline| deadline.saturating_sub(now))
214        } else {
215            raw_ttl
216        };
217        let key = read_bytes(&mut r)?;
218        match op {
219            OP_STR => {
220                let val = read_bytes(&mut r)?;
221                store.load_str(key, val, ttl);
222            }
223            OP_HASH => {
224                let n = read_u32(&mut r)? as usize;
225                let mut fields = Vec::with_capacity(n);
226                for _ in 0..n {
227                    let f = read_bytes(&mut r)?;
228                    let v = read_bytes(&mut r)?;
229                    fields.push((f, v));
230                }
231                store.load_hash(key, fields, ttl);
232            }
233            OP_LIST => {
234                let n = read_u32(&mut r)? as usize;
235                let mut items = Vec::with_capacity(n);
236                for _ in 0..n {
237                    items.push(read_bytes(&mut r)?);
238                }
239                store.load_list(key, items, ttl);
240            }
241            OP_SET => {
242                let n = read_u32(&mut r)? as usize;
243                let mut members = Vec::with_capacity(n);
244                for _ in 0..n {
245                    members.push(read_bytes(&mut r)?);
246                }
247                store.load_set(key, members, ttl);
248            }
249            OP_ZSET => {
250                let n = read_u32(&mut r)? as usize;
251                let mut pairs = Vec::with_capacity(n);
252                for _ in 0..n {
253                    let m = read_bytes(&mut r)?;
254                    let score = f64::from_bits(read_u64(&mut r)?);
255                    pairs.push((m, score));
256                }
257                store.load_zset(key, pairs, ttl);
258            }
259            OP_STREAM => {
260                let last_ms = read_u64(&mut r)?;
261                let last_seq = read_u64(&mut r)?;
262                let mxd_ms = read_u64(&mut r)?;
263                let mxd_seq = read_u64(&mut r)?;
264                let entries_added = read_u64(&mut r)?;
265                let n = read_u32(&mut r)? as usize;
266                let mut entries = Vec::with_capacity(n);
267                for _ in 0..n {
268                    let ms = read_u64(&mut r)?;
269                    let seq = read_u64(&mut r)?;
270                    let nf = read_u32(&mut r)? as usize;
271                    let mut fv = Vec::with_capacity(nf);
272                    for _ in 0..nf {
273                        let f = read_bytes(&mut r)?;
274                        let v = read_bytes(&mut r)?;
275                        fv.push((f, v));
276                    }
277                    entries.push((ms, seq, fv));
278                }
279                // v4 appends the consumer-group section; v2/v3 files
280                // predate groups-in-snapshot, so they load with none.
281                let groups = if version >= VERSION {
282                    read_stream_groups(&mut r)?
283                } else {
284                    Vec::new()
285                };
286                store.load_stream(
287                    key,
288                    entries,
289                    (last_ms, last_seq),
290                    (mxd_ms, mxd_seq),
291                    entries_added,
292                    groups,
293                    ttl,
294                );
295            }
296            other => {
297                return Err(io::Error::new(
298                    io::ErrorKind::InvalidData,
299                    format!("kevy snapshot: unknown opcode {other}"),
300                ));
301            }
302        }
303    }
304}
305
306/// Serialize one entry: `[op][ttl][key][payload]`.
307fn write_entry<W: Write>(w: &mut W, key: &[u8], value: &Value, ttl: Option<u64>) -> io::Result<()> {
308    let op = match value {
309        Value::Str(_) => OP_STR,
310        Value::Hash(_) => OP_HASH,
311        Value::List(_) => OP_LIST,
312        Value::Set(_) => OP_SET,
313        Value::ZSet(_) => OP_ZSET,
314        Value::Stream(_) => OP_STREAM,
315    };
316    w.write_all(&[op])?;
317    write_ttl(w, ttl)?;
318    write_bytes(w, key)?;
319    match value {
320        Value::Str(v) => write_bytes(w, v.as_slice()),
321        Value::Hash(h) => write_hash_payload(w, h),
322        Value::List(l) => write_list_payload(w, l),
323        Value::Set(set) => write_set_payload(w, set),
324        Value::ZSet(z) => write_zset_payload(w, z),
325        Value::Stream(s) => write_stream_payload(w, s),
326    }
327}
328
329fn write_hash_payload<W: Write>(w: &mut W, h: &kevy_store::HashData) -> io::Result<()> {
330    w.write_all(&(h.len() as u32).to_le_bytes())?;
331    for (f, v) in h {
332        write_bytes(w, f.as_slice())?;
333        write_bytes(w, v)?;
334    }
335    Ok(())
336}
337
338fn write_list_payload<W: Write>(w: &mut W, l: &kevy_store::ListData) -> io::Result<()> {
339    w.write_all(&(l.len() as u32).to_le_bytes())?;
340    for item in l {
341        write_bytes(w, item)?;
342    }
343    Ok(())
344}
345
346fn write_set_payload<W: Write>(w: &mut W, set: &kevy_store::SetData) -> io::Result<()> {
347    w.write_all(&(set.len() as u32).to_le_bytes())?;
348    for m in set {
349        write_bytes(w, m.as_slice())?;
350    }
351    Ok(())
352}
353
354fn write_zset_payload<W: Write>(w: &mut W, z: &kevy_store::ZSetData) -> io::Result<()> {
355    let entries: Vec<(&[u8], f64)> = z.ordered().collect();
356    w.write_all(&(entries.len() as u32).to_le_bytes())?;
357    for (m, score) in entries {
358        write_bytes(w, m)?;
359        w.write_all(&score.to_bits().to_le_bytes())?;
360    }
361    Ok(())
362}
363
364fn write_stream_payload<W: Write>(w: &mut W, s: &kevy_store::StreamData) -> io::Result<()> {
365    w.write_all(&s.last_id().ms.to_le_bytes())?;
366    w.write_all(&s.last_id().seq.to_le_bytes())?;
367    w.write_all(&s.max_deleted_id().ms.to_le_bytes())?;
368    w.write_all(&s.max_deleted_id().seq.to_le_bytes())?;
369    w.write_all(&s.entries_added().to_le_bytes())?;
370    w.write_all(&(s.length() as u32).to_le_bytes())?;
371    for (id, fv) in s.iter_entries() {
372        w.write_all(&id.ms.to_le_bytes())?;
373        w.write_all(&id.seq.to_le_bytes())?;
374        w.write_all(&(fv.len() as u32).to_le_bytes())?;
375        for (f, v) in fv {
376            write_bytes(w, f.as_slice())?;
377            write_bytes(w, v.as_slice())?;
378        }
379    }
380    write_stream_groups(w, &s.export_groups())
381}
382
383/// v4 consumer-group section: `[n_groups][per group: name, last_delivered,
384/// consumers (name + last_seen_ms), PEL rows]`. Tombstone PEL rows are kept
385/// — the snapshot path is the full-fidelity one (the AOF rewrite can't
386/// re-create them via XCLAIM, see `rewrite_fmt`).
387fn write_stream_groups<W: Write>(w: &mut W, groups: &[kevy_store::LoadedGroup]) -> io::Result<()> {
388    w.write_all(&(groups.len() as u32).to_le_bytes())?;
389    for g in groups {
390        write_bytes(w, &g.name)?;
391        w.write_all(&g.last_delivered.0.to_le_bytes())?;
392        w.write_all(&g.last_delivered.1.to_le_bytes())?;
393        w.write_all(&(g.consumers.len() as u32).to_le_bytes())?;
394        for (name, last_seen_ms) in &g.consumers {
395            write_bytes(w, name)?;
396            w.write_all(&last_seen_ms.to_le_bytes())?;
397        }
398        w.write_all(&(g.pel.len() as u32).to_le_bytes())?;
399        for (ms, seq, consumer, delivery_time_ms, delivery_count) in &g.pel {
400            w.write_all(&ms.to_le_bytes())?;
401            w.write_all(&seq.to_le_bytes())?;
402            write_bytes(w, consumer)?;
403            w.write_all(&delivery_time_ms.to_le_bytes())?;
404            w.write_all(&delivery_count.to_le_bytes())?;
405        }
406    }
407    Ok(())
408}
409
410/// Loader-side twin of [`write_stream_groups`].
411fn read_stream_groups<R: Read>(r: &mut R) -> io::Result<Vec<kevy_store::LoadedGroup>> {
412    let n = read_u32(r)? as usize;
413    let mut groups = Vec::with_capacity(n);
414    for _ in 0..n {
415        let name = read_bytes(r)?;
416        let last_delivered = (read_u64(r)?, read_u64(r)?);
417        let nc = read_u32(r)? as usize;
418        let mut consumers = Vec::with_capacity(nc);
419        for _ in 0..nc {
420            let cname = read_bytes(r)?;
421            consumers.push((cname, read_u64(r)?));
422        }
423        let np = read_u32(r)? as usize;
424        let mut pel = Vec::with_capacity(np);
425        for _ in 0..np {
426            let ms = read_u64(r)?;
427            let seq = read_u64(r)?;
428            let consumer = read_bytes(r)?;
429            let delivery_time_ms = read_u64(r)?;
430            let delivery_count = read_u32(r)?;
431            pel.push((ms, seq, consumer, delivery_time_ms, delivery_count));
432        }
433        groups.push(kevy_store::LoadedGroup { name, last_delivered, consumers, pel });
434    }
435    Ok(groups)
436}
437
/// Write a TTL field: a flag byte (`0` = no expiry, `1` = has one), followed
/// by the little-endian `u64` value only when the flag is `1`.
fn write_ttl<W: Write>(w: &mut W, ttl: Option<u64>) -> io::Result<()> {
    if let Some(ms) = ttl {
        w.write_all(&[1u8])?;
        return w.write_all(&ms.to_le_bytes());
    }
    w.write_all(&[0u8])
}
448
449fn read_ttl<R: Read>(r: &mut R) -> io::Result<Option<u64>> {
450    if read_u8(r)? == 1 {
451        Ok(Some(read_u64(r)?))
452    } else {
453        Ok(None)
454    }
455}
456
/// Sibling temp path for atomic writes: `path` with `.tmp` appended.
///
/// The suffix is appended to the whole file name rather than replacing the
/// extension, so `dump.snap` becomes `dump.snap.tmp`.
fn tmp_path(path: &Path) -> std::path::PathBuf {
    let mut name = path.as_os_str().to_os_string();
    name.push(".tmp");
    std::path::PathBuf::from(name)
}
462
/// Write `b` as a little-endian `u32` length prefix followed by the raw bytes.
///
/// # Errors
/// - `InvalidInput` if `b` is longer than `u32::MAX` bytes. The prefix cannot
///   represent that length, and a truncating `as u32` cast would silently
///   write a record whose length disagrees with its payload, corrupting
///   every record after it.
/// - Any error from the underlying writer.
fn write_bytes<W: Write>(w: &mut W, b: &[u8]) -> io::Result<()> {
    let len = u32::try_from(b.len()).map_err(|_| {
        io::Error::new(
            io::ErrorKind::InvalidInput,
            format!(
                "kevy snapshot: byte string of {} bytes exceeds the u32 length prefix",
                b.len()
            ),
        )
    })?;
    w.write_all(&len.to_le_bytes())?;
    w.write_all(b)
}
467
468fn read_bytes<R: Read>(r: &mut R) -> io::Result<Vec<u8>> {
469    let len = read_u32(r)? as usize;
470    let mut buf = vec![0u8; len];
471    r.read_exact(&mut buf)?;
472    Ok(buf)
473}
474
/// Read exactly one byte; `UnexpectedEof` if the reader is exhausted.
fn read_u8<R: Read>(r: &mut R) -> io::Result<u8> {
    let mut byte = [0u8];
    r.read_exact(&mut byte).map(|()| byte[0])
}
480
/// Read a little-endian `u32`; `UnexpectedEof` if fewer than 4 bytes remain.
fn read_u32<R: Read>(r: &mut R) -> io::Result<u32> {
    let mut raw = [0u8; std::mem::size_of::<u32>()];
    r.read_exact(&mut raw).map(|()| u32::from_le_bytes(raw))
}
486
/// Read a little-endian `u64`; `UnexpectedEof` if fewer than 8 bytes remain.
fn read_u64<R: Read>(r: &mut R) -> io::Result<u64> {
    let mut raw = [0u8; std::mem::size_of::<u64>()];
    r.read_exact(&mut raw).map(|()| u64::from_le_bytes(raw))
}
492
493#[cfg(test)]
494mod tests;
495#[cfg(test)]
496mod tests_aof;
497#[cfg(test)]
498mod tests_rewrite;