Skip to main content

kevy_persist/
lib.rs

1//! kevy-persist — durability for a [`kevy_store::Store`].
2//!
3//! Two mechanisms, both zero-dependency pure Rust over `std::fs`:
4//!
5//! - **Snapshot (RDB-style):** [`save_snapshot`] dumps a whole store to a temp
6//!   file then atomically renames it (fsync before rename); [`load_snapshot`]
7//!   restores it. A compact, type-tagged binary format.
8//! - **AOF:** an [`Aof`] append-only command log with a configurable fsync
9//!   policy; [`replay_aof`] re-applies it on startup, tolerating a truncated
10//!   trailing frame from a crash mid-write.
11//!
12//! In a shared-nothing runtime each shard persists its own store to its own
13//! file, so there is no cross-core coordination. Part of the [kevy] server.
14//!
15//! [kevy]: https://crates.io/crates/kevy
16//!
17//! # Example (AOF)
18//!
19//! ```
20//! use kevy_persist::{Aof, Argv, Fsync, replay_aof};
21//!
22//! # fn main() -> std::io::Result<()> {
23//! let path = std::env::temp_dir().join("kevy-persist-doctest.aof");
24//! # let _ = std::fs::remove_file(&path);
25//! {
26//!     let mut aof = Aof::open(&path, Fsync::No)?;
27//!     aof.append(&Argv::from(vec![b"SET".to_vec(), b"k".to_vec(), b"v".to_vec()]))?;
28//! } // flushed on drop
29//!
30//! let mut replayed: Vec<Argv> = Vec::new();
31//! replay_aof(&path, |args| replayed.push(args))?;
32//! assert_eq!(replayed, vec![vec![b"SET".to_vec(), b"k".to_vec(), b"v".to_vec()]]);
33//! # std::fs::remove_file(&path).ok();
34//! # Ok(())
35//! # }
36//! ```
37#![forbid(unsafe_code)]
38
39mod aof;
40pub mod layout;
41mod replay;
42pub mod reshard;
43mod rewrite_fmt;
44mod shards_meta;
45mod snapshot_payload;
46
47pub use aof::{Aof, Fsync, RewritePlan, RewriteStats, write_aof_base};
48pub use replay::replay_aof;
49pub use shards_meta::{Routing, ShardsMeta, read_shards_meta, write_shards_meta};
50pub use kevy_resp::{Argv, ArgvView};
51pub use rewrite_fmt::dump_aof;
52pub(crate) use rewrite_fmt::{dump_store_to_buf, estimate_multibulk_bytes, write_multibulk};
53use kevy_store::Store;
54use kevy_store::Value;
55// ZSet snapshot iterates ordered (member, score) pairs via `Value::ZSet`.
56use std::fs::File;
57use std::io::{self, BufReader, BufWriter, Read, Write};
58use std::path::Path;
59
/// Eight-byte file magic that opens every snapshot.
const MAGIC: &[u8; 8] = b"KEVYSNAP";

// Format versions, oldest first. Bump `VERSION` on any layout change.

/// v2 stored each entry's TTL as **remaining millis** (relative), so a load
/// re-anchored the deadline to load-time — a restart reset every key to a
/// fresh full TTL (INC-2026-06-09). Still accepted by the loader.
const VERSION_RELATIVE_TTL: u8 = 2;

/// v3 stores the **absolute** Unix-ms deadline, so a load reconstructs the
/// original instant. Has no consumer-group section; still accepted by the
/// loader.
const VERSION_ABSOLUTE_TTL: u8 = 3;

/// Current format, the one the writer emits. v4 appends a consumer-group
/// section to each `OP_STREAM` payload (groups + consumers plus PEL) —
/// before that, SAVE/reshard silently dropped group state.
const VERSION: u8 = 4;
73
// Record opcodes: one per value type, plus the end-of-stream marker.
// On the wire a record is laid out as:
//   [op][ttl: u8 flag + optional u64][key][type payload]
const OP_EOF: u8 = 0x00;
const OP_STR: u8 = 0x01;
const OP_HASH: u8 = 0x02;
const OP_LIST: u8 = 0x03;
const OP_SET: u8 = 0x04;
const OP_ZSET: u8 = 0x05;
const OP_STREAM: u8 = 0x06;
83
/// Capacity of the `BufWriter` used for bulk snapshot / AOF-rewrite writes.
///
/// With the 8 KiB default, SAVE reached only ~12 % of disk bandwidth (tens
/// of thousands of small `write(2)`s); 1 MiB amortizes the syscalls toward
/// disk speed.
pub(crate) const SNAPSHOT_BUF_CAP: usize = 1024 * 1024;
88
89/// Anything that can enumerate `(key, &Value, ttl_ms)` triples for
90/// serialization: a live [`Store`] (its `snapshot_each`, the synchronous
91/// paths) or a frozen [`kevy_store::SnapshotView`] (the COW paths — collect
92/// on the owning thread, serialize on a background one).
93pub trait SnapshotSource {
94    /// Visit every live entry as `(key, &value, remaining_ttl_ms)`.
95    fn for_each_entry(&self, f: impl FnMut(&[u8], &Value, Option<u64>));
96}
97
98impl SnapshotSource for Store {
99    fn for_each_entry(&self, f: impl FnMut(&[u8], &Value, Option<u64>)) {
100        self.snapshot_each(f);
101    }
102}
103
104impl SnapshotSource for kevy_store::SnapshotView {
105    fn for_each_entry(&self, f: impl FnMut(&[u8], &Value, Option<u64>)) {
106        self.each(f);
107    }
108}
109
110/// Write a point-in-time snapshot of `src` (a live [`Store`] or a frozen
111/// [`kevy_store::SnapshotView`]) to `path`, atomically: data is written to
112/// `<path>.tmp`, fsynced, then renamed over `path`.
113pub fn save_snapshot<S: SnapshotSource>(src: &S, path: &Path) -> io::Result<()> {
114    let tmp = write_snapshot_tmp(src, path)?;
115    std::fs::rename(&tmp, path)
116}
117
118/// Serialize a point-in-time snapshot of `src` into any `Write` sink.
119/// Used by both [`write_snapshot_tmp`] (sink = `BufWriter<File>` +
120/// extra fsync after) and the v3-cluster replication path (sink =
121/// `&mut Vec<u8>` for in-memory snapshot ship, see
122/// `kevy-replicate/docs/snapshot.md`).
123///
124/// On-disk bytes are identical regardless of sink — the same magic +
125/// version header, same entry stream, same `OP_EOF` trailer. Callers
126/// that need durability (disk) wrap in `BufWriter<File>` and call
127/// `sync_all` themselves; callers that need bytes (network ship)
128/// pass a `Vec<u8>`.
129pub fn write_snapshot_to<S: SnapshotSource, W: Write>(src: &S, sink: &mut W) -> io::Result<()> {
130    let mut w = BufWriter::with_capacity(SNAPSHOT_BUF_CAP, sink);
131    w.write_all(MAGIC)?;
132    w.write_all(&[VERSION])?;
133    // The source yields *remaining* ms; v3 persists the absolute
134    // Unix-ms deadline (now + remaining) so the TTL survives a restart.
135    let now = kevy_store::now_unix_ms();
136    // Enumeration is infallible; capture the first write error to surface.
137    let mut err: Option<io::Error> = None;
138    src.for_each_entry(|key, value, ttl| {
139        let deadline = ttl.map(|ms| now.saturating_add(ms));
140        if err.is_none()
141            && let Err(e) = write_entry(&mut w, key, value, deadline)
142        {
143            err = Some(e);
144        }
145    });
146    if let Some(e) = err {
147        return Err(e);
148    }
149    w.write_all(&[OP_EOF])?;
150    w.flush()?;
151    Ok(())
152}
153
154/// The write half of [`save_snapshot`]: produce the durable (fsynced)
155/// `<path>.tmp` and return its path **without** the final rename. For the
156/// COW background-save flow: the serializer thread writes the temp file at
157/// leisure, then the store-owning thread renames it in the same critical
158/// section that resets the AOF — keeping the snapshot/AOF commit adjacent
159/// instead of seconds apart.
160pub fn write_snapshot_tmp<S: SnapshotSource>(src: &S, path: &Path) -> io::Result<std::path::PathBuf> {
161    let tmp = tmp_path(path);
162    {
163        let mut file = File::create(&tmp)?;
164        write_snapshot_to(src, &mut file)?;
165        file.sync_all()?; // durably on disk before the rename
166    }
167    Ok(tmp)
168}
169
170/// Load a snapshot from `path` into `store` (entries are inserted, not cleared
171/// first — call on a fresh store). Errors on a bad magic/version or truncation.
172pub fn load_snapshot(store: &mut Store, path: &Path) -> io::Result<()> {
173    let r = BufReader::new(File::open(path)?);
174    load_snapshot_from(store, r)
175}
176
177/// Load a snapshot from any [`std::io::Read`] sink into `store` —
178/// symmetric to [`write_snapshot_to`]. Used by the v3-cluster
179/// replication path (sink = `&[u8]` wrapped in `std::io::Cursor`) to
180/// apply a primary-shipped snapshot to a fresh local store without
181/// touching disk. Entries are inserted, not cleared first — call on
182/// a fresh store. Errors on bad magic/version or truncation.
183pub fn load_snapshot_from<R: Read>(store: &mut Store, mut r: R) -> io::Result<()> {
184    let mut magic = [0u8; 8];
185    r.read_exact(&mut magic)?;
186    if &magic != MAGIC {
187        return Err(io::Error::new(
188            io::ErrorKind::InvalidData,
189            "kevy snapshot: bad magic",
190        ));
191    }
192    let version = read_u8(&mut r)?;
193    if !(VERSION_RELATIVE_TTL..=VERSION).contains(&version) {
194        return Err(io::Error::new(
195            io::ErrorKind::InvalidData,
196            "kevy snapshot: bad version",
197        ));
198    }
199    // v3+ stores absolute Unix-ms deadlines; convert each to remaining ms
200    // against one `now` read so the load is internally consistent. A deadline
201    // already past becomes `Some(0)` → loaded then immediately reaped (lazy
202    // get / active reaper), matching "expired key is gone". v2 ttls are
203    // already remaining, so pass them through.
204    let absolute_ttl = version >= VERSION_ABSOLUTE_TTL;
205    let now = kevy_store::now_unix_ms();
206
207    loop {
208        let op = read_u8(&mut r)?;
209        if op == OP_EOF {
210            return Ok(());
211        }
212        let raw_ttl = read_ttl(&mut r)?;
213        let ttl = if absolute_ttl {
214            raw_ttl.map(|deadline| deadline.saturating_sub(now))
215        } else {
216            raw_ttl
217        };
218        let key = read_bytes(&mut r)?;
219        match op {
220            OP_STR => {
221                let val = read_bytes(&mut r)?;
222                store.load_str(key, val, ttl);
223            }
224            OP_HASH => {
225                let n = read_u32(&mut r)? as usize;
226                let mut fields = Vec::with_capacity(n);
227                for _ in 0..n {
228                    let f = read_bytes(&mut r)?;
229                    let v = read_bytes(&mut r)?;
230                    fields.push((f, v));
231                }
232                store.load_hash(key, fields, ttl);
233            }
234            OP_LIST => {
235                let n = read_u32(&mut r)? as usize;
236                let mut items = Vec::with_capacity(n);
237                for _ in 0..n {
238                    items.push(read_bytes(&mut r)?);
239                }
240                store.load_list(key, items, ttl);
241            }
242            OP_SET => {
243                let n = read_u32(&mut r)? as usize;
244                let mut members = Vec::with_capacity(n);
245                for _ in 0..n {
246                    members.push(read_bytes(&mut r)?);
247                }
248                store.load_set(key, members, ttl);
249            }
250            OP_ZSET => {
251                let n = read_u32(&mut r)? as usize;
252                let mut pairs = Vec::with_capacity(n);
253                for _ in 0..n {
254                    let m = read_bytes(&mut r)?;
255                    let score = f64::from_bits(read_u64(&mut r)?);
256                    pairs.push((m, score));
257                }
258                store.load_zset(key, pairs, ttl);
259            }
260            OP_STREAM => {
261                let last_ms = read_u64(&mut r)?;
262                let last_seq = read_u64(&mut r)?;
263                let mxd_ms = read_u64(&mut r)?;
264                let mxd_seq = read_u64(&mut r)?;
265                let entries_added = read_u64(&mut r)?;
266                let n = read_u32(&mut r)? as usize;
267                let mut entries = Vec::with_capacity(n);
268                for _ in 0..n {
269                    let ms = read_u64(&mut r)?;
270                    let seq = read_u64(&mut r)?;
271                    let nf = read_u32(&mut r)? as usize;
272                    let mut fv = Vec::with_capacity(nf);
273                    for _ in 0..nf {
274                        let f = read_bytes(&mut r)?;
275                        let v = read_bytes(&mut r)?;
276                        fv.push((f, v));
277                    }
278                    entries.push((ms, seq, fv));
279                }
280                // v4 appends the consumer-group section; v2/v3 files
281                // predate groups-in-snapshot, so they load with none.
282                let groups = if version >= VERSION {
283                    read_stream_groups(&mut r)?
284                } else {
285                    Vec::new()
286                };
287                store.load_stream(
288                    key,
289                    entries,
290                    (last_ms, last_seq),
291                    (mxd_ms, mxd_seq),
292                    entries_added,
293                    groups,
294                    ttl,
295                );
296            }
297            other => {
298                return Err(io::Error::new(
299                    io::ErrorKind::InvalidData,
300                    format!("kevy snapshot: unknown opcode {other}"),
301                ));
302            }
303        }
304    }
305}
306
307/// Serialize one entry: `[op][ttl][key][payload]`.
308fn write_entry<W: Write>(w: &mut W, key: &[u8], value: &Value, ttl: Option<u64>) -> io::Result<()> {
309    let op = match value {
310        Value::Str(_) | Value::Int(_) | Value::ArcBulk(_) => OP_STR, // L1/L2: all reuse OP_STR.
311
312        Value::Hash(_) | Value::SmallHashInline(_) => OP_HASH,
313        Value::List(_) | Value::SmallListInline(_) => OP_LIST,
314        // A.7 O5: both Set encodings share the OP_SET wire format —
315        // payload is `[len: u32 LE][bulk: len-prefixed bytes]*`, agnostic
316        // of whether the in-memory representation is `SmallSetInline` or
317        // `Arc<KevySet>`.
318        Value::Set(_) | Value::SmallSetInline(_) => OP_SET,
319        Value::ZSet(_) | Value::SmallZSetInline(_) => OP_ZSET,
320        Value::Stream(_) => OP_STREAM,
321    };
322    w.write_all(&[op])?;
323    write_ttl(w, ttl)?;
324    write_bytes(w, key)?;
325    match value {
326        Value::Str(v) => write_bytes(w, v.as_slice()),
327        Value::Int(n) => write_bytes(w, n.to_string().as_bytes()),
328        Value::ArcBulk(a) => write_bytes(w, a.as_ref()),
329        Value::Hash(h) => snapshot_payload::write_hash_payload(w, h),
330        Value::SmallHashInline(h) => snapshot_payload::write_small_hash_payload(w, h),
331        Value::List(l) => snapshot_payload::write_list_payload(w, l),
332        Value::SmallListInline(l) => snapshot_payload::write_small_list_payload(w, l),
333        Value::Set(set) => snapshot_payload::write_set_payload(w, set),
334        Value::SmallSetInline(s) => snapshot_payload::write_small_set_payload(w, s),
335        Value::ZSet(z) => snapshot_payload::write_zset_payload(w, z),
336        Value::SmallZSetInline(z) => snapshot_payload::write_small_zset_payload(w, z),
337        Value::Stream(s) => snapshot_payload::write_stream_payload(w, s),
338    }
339}
340
341/// v4 consumer-group section: `[n_groups][per group: name, last_delivered,
342/// consumers (name + last_seen_ms), PEL rows]`. Tombstone PEL rows are kept
343/// — the snapshot path is the full-fidelity one (the AOF rewrite can't
344/// re-create them via XCLAIM, see `rewrite_fmt`).
345pub(crate) fn write_stream_groups<W: Write>(w: &mut W, groups: &[kevy_store::LoadedGroup]) -> io::Result<()> {
346    w.write_all(&(groups.len() as u32).to_le_bytes())?;
347    for g in groups {
348        write_bytes(w, &g.name)?;
349        w.write_all(&g.last_delivered.0.to_le_bytes())?;
350        w.write_all(&g.last_delivered.1.to_le_bytes())?;
351        w.write_all(&(g.consumers.len() as u32).to_le_bytes())?;
352        for (name, last_seen_ms) in &g.consumers {
353            write_bytes(w, name)?;
354            w.write_all(&last_seen_ms.to_le_bytes())?;
355        }
356        w.write_all(&(g.pel.len() as u32).to_le_bytes())?;
357        for (ms, seq, consumer, delivery_time_ms, delivery_count) in &g.pel {
358            w.write_all(&ms.to_le_bytes())?;
359            w.write_all(&seq.to_le_bytes())?;
360            write_bytes(w, consumer)?;
361            w.write_all(&delivery_time_ms.to_le_bytes())?;
362            w.write_all(&delivery_count.to_le_bytes())?;
363        }
364    }
365    Ok(())
366}
367
368/// Loader-side twin of [`write_stream_groups`].
369fn read_stream_groups<R: Read>(r: &mut R) -> io::Result<Vec<kevy_store::LoadedGroup>> {
370    let n = read_u32(r)? as usize;
371    let mut groups = Vec::with_capacity(n);
372    for _ in 0..n {
373        let name = read_bytes(r)?;
374        let last_delivered = (read_u64(r)?, read_u64(r)?);
375        let nc = read_u32(r)? as usize;
376        let mut consumers = Vec::with_capacity(nc);
377        for _ in 0..nc {
378            let cname = read_bytes(r)?;
379            consumers.push((cname, read_u64(r)?));
380        }
381        let np = read_u32(r)? as usize;
382        let mut pel = Vec::with_capacity(np);
383        for _ in 0..np {
384            let ms = read_u64(r)?;
385            let seq = read_u64(r)?;
386            let consumer = read_bytes(r)?;
387            let delivery_time_ms = read_u64(r)?;
388            let delivery_count = read_u32(r)?;
389            pel.push((ms, seq, consumer, delivery_time_ms, delivery_count));
390        }
391        groups.push(kevy_store::LoadedGroup { name, last_delivered, consumers, pel });
392    }
393    Ok(groups)
394}
395
/// Encode an optional TTL: a one-byte presence flag (`0` = none, `1` =
/// present), followed by the `u64` little-endian value when present.
fn write_ttl<W: Write>(w: &mut W, ttl: Option<u64>) -> io::Result<()> {
    let Some(ms) = ttl else {
        return w.write_all(&[0u8]);
    };
    w.write_all(&[1u8])?;
    w.write_all(&ms.to_le_bytes())
}
406
407fn read_ttl<R: Read>(r: &mut R) -> io::Result<Option<u64>> {
408    if read_u8(r)? == 1 {
409        Ok(Some(read_u64(r)?))
410    } else {
411        Ok(None)
412    }
413}
414
/// Staging path for `path`: the same path with `.tmp` appended to its full
/// text (the existing extension, if any, is kept).
fn tmp_path(path: &Path) -> std::path::PathBuf {
    let mut with_suffix = path.as_os_str().to_os_string();
    with_suffix.push(".tmp");
    std::path::PathBuf::from(with_suffix)
}
420
/// Write a length-prefixed byte string: `[len: u32 LE][bytes]`.
///
/// # Errors
///
/// Returns the first write error, or `InvalidInput` — before anything is
/// written — when `b` is longer than `u32::MAX` bytes. Such a length cannot
/// be represented in the prefix; truncating it (as a plain `as u32` cast
/// would) writes a frame whose prefix disagrees with its body and corrupts
/// everything after it.
pub(crate) fn write_bytes<W: Write>(w: &mut W, b: &[u8]) -> io::Result<()> {
    let len = u32::try_from(b.len()).map_err(|_| {
        io::Error::new(
            io::ErrorKind::InvalidInput,
            "kevy-persist: byte string exceeds u32 length prefix",
        )
    })?;
    w.write_all(&len.to_le_bytes())?;
    w.write_all(b)
}
425
/// Read a length-prefixed byte string: `[len: u32 LE][bytes]`.
///
/// The length prefix comes from the input and is not trusted for
/// allocation. Strings up to `MAX_EAGER_ALLOC` bytes are read into an
/// exactly-sized buffer in one go; longer ones are read incrementally, so a
/// corrupt or truncated input claiming gigabytes fails with `UnexpectedEof`
/// after consuming what is actually there instead of first allocating (and
/// zeroing) the claimed size.
///
/// # Errors
///
/// Propagates read errors; `UnexpectedEof` when fewer than `len` bytes
/// follow the prefix.
fn read_bytes<R: Read>(r: &mut R) -> io::Result<Vec<u8>> {
    /// Largest buffer allocated purely on the say-so of the length prefix.
    const MAX_EAGER_ALLOC: usize = 1 << 20;

    let len = read_u32(r)? as usize;
    if len <= MAX_EAGER_ALLOC {
        let mut buf = vec![0u8; len];
        r.read_exact(&mut buf)?;
        return Ok(buf);
    }

    // Large claim: let the buffer grow only as data really arrives.
    let mut buf = Vec::with_capacity(MAX_EAGER_ALLOC);
    let got = r.by_ref().take(len as u64).read_to_end(&mut buf)?;
    if got != len {
        return Err(io::Error::new(
            io::ErrorKind::UnexpectedEof,
            "kevy snapshot: truncated byte string",
        ));
    }
    Ok(buf)
}

/// Read exactly `N` bytes into a fixed-size array.
fn read_fixed<R: Read, const N: usize>(r: &mut R) -> io::Result<[u8; N]> {
    let mut raw = [0u8; N];
    r.read_exact(&mut raw)?;
    Ok(raw)
}

/// Read a single byte.
fn read_u8<R: Read>(r: &mut R) -> io::Result<u8> {
    let [byte] = read_fixed::<R, 1>(r)?;
    Ok(byte)
}

/// Read a little-endian `u32`.
fn read_u32<R: Read>(r: &mut R) -> io::Result<u32> {
    read_fixed::<R, 4>(r).map(u32::from_le_bytes)
}

/// Read a little-endian `u64`.
fn read_u64<R: Read>(r: &mut R) -> io::Result<u64> {
    read_fixed::<R, 8>(r).map(u64::from_le_bytes)
}
450
451#[cfg(test)]
452mod tests;
453#[cfg(test)]
454mod tests_aof;
455#[cfg(test)]
456mod tests_rewrite;