Skip to main content

kevy_persist/
lib.rs

1//! kevy-persist — durability for a [`kevy_store::Store`].
2//!
3//! Two mechanisms, both zero-dependency pure Rust over `std::fs`:
4//!
5//! - **Snapshot (RDB-style):** [`save_snapshot`] dumps a whole store to a temp
6//!   file then atomically renames it (fsync before rename); [`load_snapshot`]
7//!   restores it. A compact, type-tagged binary format.
8//! - **AOF:** an [`Aof`] append-only command log with a configurable fsync
9//!   policy; [`replay_aof`] re-applies it on startup, tolerating a truncated
10//!   trailing frame from a crash mid-write.
11//!
12//! In a shared-nothing runtime each shard persists its own store to its own
13//! file, so there is no cross-core coordination. Part of the [kevy] server.
14//!
15//! [kevy]: https://crates.io/crates/kevy
16//!
17//! # Example (AOF)
18//!
19//! ```
20//! use kevy_persist::{Aof, Argv, Fsync, replay_aof};
21//!
22//! # fn main() -> std::io::Result<()> {
23//! let path = std::env::temp_dir().join("kevy-persist-doctest.aof");
24//! # let _ = std::fs::remove_file(&path);
25//! {
26//!     let mut aof = Aof::open(&path, Fsync::No)?;
27//!     aof.append(&Argv::from(vec![b"SET".to_vec(), b"k".to_vec(), b"v".to_vec()]))?;
28//! } // flushed on drop
29//!
30//! let mut replayed: Vec<Argv> = Vec::new();
31//! replay_aof(&path, |args| replayed.push(args))?;
32//! assert_eq!(replayed, vec![vec![b"SET".to_vec(), b"k".to_vec(), b"v".to_vec()]]);
33//! # std::fs::remove_file(&path).ok();
34//! # Ok(())
35//! # }
36//! ```
37#![forbid(unsafe_code)]
38
39mod aof;
40pub mod layout;
41mod replay;
42pub mod reshard;
43mod rewrite_fmt;
44mod shards_meta;
45
46pub use aof::{Aof, Fsync, RewritePlan, RewriteStats, write_aof_base};
47pub use replay::replay_aof;
48pub use shards_meta::{Routing, ShardsMeta, read_shards_meta, write_shards_meta};
49pub use kevy_resp::{Argv, ArgvView};
50pub use rewrite_fmt::dump_aof;
51pub(crate) use rewrite_fmt::{dump_store_to_buf, estimate_multibulk_bytes, write_multibulk};
52use kevy_store::Store;
53use kevy_store::Value;
54// ZSet snapshot iterates ordered (member, score) pairs via `Value::ZSet`.
55use std::fs::File;
56use std::io::{self, BufReader, BufWriter, Read, Write};
57use std::path::Path;
58
/// File magic + format version. Bump `VERSION` on any layout change.
///
/// v2 stored each entry's TTL as **remaining millis** (relative), so a load
/// re-anchored the deadline to load-time — a restart reset every key to a
/// fresh full TTL (INC-2026-06-09). v3 stores the **absolute** Unix-ms
/// deadline, so a load reconstructs the original instant. v4 appends a
/// consumer-group section to each `OP_STREAM` payload (groups + consumers
/// plus PEL) — before that, SAVE/reshard silently dropped group state. The
/// loader still accepts v2 (relative TTL) and v3 (no group section).
const MAGIC: &[u8; 8] = b"KEVYSNAP";
/// Format version the serializer writes as the single byte right after
/// `MAGIC`; also the newest version the loader accepts.
const VERSION: u8 = 4;
/// Oldest version the loader accepts. Its TTL fields hold remaining
/// milliseconds rather than an absolute deadline.
const VERSION_RELATIVE_TTL: u8 = 2;
/// First version whose TTL fields hold an absolute Unix-ms deadline.
const VERSION_ABSOLUTE_TTL: u8 = 3;

// Record opcodes (one per value type). Each record is:
//   [op][ttl: u8 flag + optional u64][key][type payload]
// `OP_EOF` is not a record: it is the lone byte written after the last entry,
// and the loader stops successfully when it reads it.
const OP_EOF: u8 = 0;
const OP_STR: u8 = 1;
const OP_HASH: u8 = 2;
const OP_LIST: u8 = 3;
const OP_SET: u8 = 4;
const OP_ZSET: u8 = 5;
const OP_STREAM: u8 = 6;

/// BufWriter capacity for bulk snapshot / AOF-rewrite writes. The 8 KiB
/// default made SAVE ~12 % of disk bandwidth (tens of thousands of small
/// `write(2)`s); 1 MiB amortizes the syscalls toward disk speed.
pub(crate) const SNAPSHOT_BUF_CAP: usize = 1 << 20;
87
/// Anything that can enumerate `(key, &Value, ttl_ms)` triples for
/// serialization: a live [`Store`] (its `snapshot_each`, the synchronous
/// paths) or a frozen [`kevy_store::SnapshotView`] (the COW paths — collect
/// on the owning thread, serialize on a background one).
///
/// The snapshot writer in this file is generic over this trait, so both
/// sources share one serializer.
pub trait SnapshotSource {
    /// Visit every live entry as `(key, &value, remaining_ttl_ms)`.
    ///
    /// The TTL is the *remaining* time in milliseconds (the snapshot writer
    /// adds the current Unix-ms clock to it to obtain an absolute deadline);
    /// `None` is passed for an entry that carries no TTL. The callback has no
    /// return value, so a caller that can fail must record its own error and
    /// ignore the remaining calls.
    fn for_each_entry(&self, f: impl FnMut(&[u8], &Value, Option<u64>));
}
96
97impl SnapshotSource for Store {
98    fn for_each_entry(&self, f: impl FnMut(&[u8], &Value, Option<u64>)) {
99        self.snapshot_each(f);
100    }
101}
102
103impl SnapshotSource for kevy_store::SnapshotView {
104    fn for_each_entry(&self, f: impl FnMut(&[u8], &Value, Option<u64>)) {
105        self.each(f);
106    }
107}
108
109/// Write a point-in-time snapshot of `src` (a live [`Store`] or a frozen
110/// [`kevy_store::SnapshotView`]) to `path`, atomically: data is written to
111/// `<path>.tmp`, fsynced, then renamed over `path`.
112pub fn save_snapshot<S: SnapshotSource>(src: &S, path: &Path) -> io::Result<()> {
113    let tmp = write_snapshot_tmp(src, path)?;
114    std::fs::rename(&tmp, path)
115}
116
117/// The write half of [`save_snapshot`]: produce the durable (fsynced)
118/// `<path>.tmp` and return its path **without** the final rename. For the
119/// COW background-save flow: the serializer thread writes the temp file at
120/// leisure, then the store-owning thread renames it in the same critical
121/// section that resets the AOF — keeping the snapshot/AOF commit adjacent
122/// instead of seconds apart.
123pub fn write_snapshot_tmp<S: SnapshotSource>(src: &S, path: &Path) -> io::Result<std::path::PathBuf> {
124    let tmp = tmp_path(path);
125    {
126        let mut w = BufWriter::with_capacity(SNAPSHOT_BUF_CAP, File::create(&tmp)?);
127        w.write_all(MAGIC)?;
128        w.write_all(&[VERSION])?;
129        // The source yields *remaining* ms; v3 persists the absolute
130        // Unix-ms deadline (now + remaining) so the TTL survives a restart.
131        let now = kevy_store::now_unix_ms();
132        // Enumeration is infallible; capture the first write error to surface.
133        let mut err: Option<io::Error> = None;
134        src.for_each_entry(|key, value, ttl| {
135            let deadline = ttl.map(|ms| now.saturating_add(ms));
136            if err.is_none()
137                && let Err(e) = write_entry(&mut w, key, value, deadline)
138            {
139                err = Some(e);
140            }
141        });
142        if let Some(e) = err {
143            return Err(e);
144        }
145        w.write_all(&[OP_EOF])?;
146        w.flush()?;
147        w.get_ref().sync_all()?; // durably on disk before the rename
148    }
149    Ok(tmp)
150}
151
152/// Load a snapshot from `path` into `store` (entries are inserted, not cleared
153/// first — call on a fresh store). Errors on a bad magic/version or truncation.
154pub fn load_snapshot(store: &mut Store, path: &Path) -> io::Result<()> {
155    let mut r = BufReader::new(File::open(path)?);
156
157    let mut magic = [0u8; 8];
158    r.read_exact(&mut magic)?;
159    if &magic != MAGIC {
160        return Err(io::Error::new(
161            io::ErrorKind::InvalidData,
162            "kevy snapshot: bad magic",
163        ));
164    }
165    let version = read_u8(&mut r)?;
166    if !(VERSION_RELATIVE_TTL..=VERSION).contains(&version) {
167        return Err(io::Error::new(
168            io::ErrorKind::InvalidData,
169            "kevy snapshot: bad version",
170        ));
171    }
172    // v3+ stores absolute Unix-ms deadlines; convert each to remaining ms
173    // against one `now` read so the load is internally consistent. A deadline
174    // already past becomes `Some(0)` → loaded then immediately reaped (lazy
175    // get / active reaper), matching "expired key is gone". v2 ttls are
176    // already remaining, so pass them through.
177    let absolute_ttl = version >= VERSION_ABSOLUTE_TTL;
178    let now = kevy_store::now_unix_ms();
179
180    loop {
181        let op = read_u8(&mut r)?;
182        if op == OP_EOF {
183            return Ok(());
184        }
185        let raw_ttl = read_ttl(&mut r)?;
186        let ttl = if absolute_ttl {
187            raw_ttl.map(|deadline| deadline.saturating_sub(now))
188        } else {
189            raw_ttl
190        };
191        let key = read_bytes(&mut r)?;
192        match op {
193            OP_STR => {
194                let val = read_bytes(&mut r)?;
195                store.load_str(key, val, ttl);
196            }
197            OP_HASH => {
198                let n = read_u32(&mut r)? as usize;
199                let mut fields = Vec::with_capacity(n);
200                for _ in 0..n {
201                    let f = read_bytes(&mut r)?;
202                    let v = read_bytes(&mut r)?;
203                    fields.push((f, v));
204                }
205                store.load_hash(key, fields, ttl);
206            }
207            OP_LIST => {
208                let n = read_u32(&mut r)? as usize;
209                let mut items = Vec::with_capacity(n);
210                for _ in 0..n {
211                    items.push(read_bytes(&mut r)?);
212                }
213                store.load_list(key, items, ttl);
214            }
215            OP_SET => {
216                let n = read_u32(&mut r)? as usize;
217                let mut members = Vec::with_capacity(n);
218                for _ in 0..n {
219                    members.push(read_bytes(&mut r)?);
220                }
221                store.load_set(key, members, ttl);
222            }
223            OP_ZSET => {
224                let n = read_u32(&mut r)? as usize;
225                let mut pairs = Vec::with_capacity(n);
226                for _ in 0..n {
227                    let m = read_bytes(&mut r)?;
228                    let score = f64::from_bits(read_u64(&mut r)?);
229                    pairs.push((m, score));
230                }
231                store.load_zset(key, pairs, ttl);
232            }
233            OP_STREAM => {
234                let last_ms = read_u64(&mut r)?;
235                let last_seq = read_u64(&mut r)?;
236                let mxd_ms = read_u64(&mut r)?;
237                let mxd_seq = read_u64(&mut r)?;
238                let entries_added = read_u64(&mut r)?;
239                let n = read_u32(&mut r)? as usize;
240                let mut entries = Vec::with_capacity(n);
241                for _ in 0..n {
242                    let ms = read_u64(&mut r)?;
243                    let seq = read_u64(&mut r)?;
244                    let nf = read_u32(&mut r)? as usize;
245                    let mut fv = Vec::with_capacity(nf);
246                    for _ in 0..nf {
247                        let f = read_bytes(&mut r)?;
248                        let v = read_bytes(&mut r)?;
249                        fv.push((f, v));
250                    }
251                    entries.push((ms, seq, fv));
252                }
253                // v4 appends the consumer-group section; v2/v3 files
254                // predate groups-in-snapshot, so they load with none.
255                let groups = if version >= VERSION {
256                    read_stream_groups(&mut r)?
257                } else {
258                    Vec::new()
259                };
260                store.load_stream(
261                    key,
262                    entries,
263                    (last_ms, last_seq),
264                    (mxd_ms, mxd_seq),
265                    entries_added,
266                    groups,
267                    ttl,
268                );
269            }
270            other => {
271                return Err(io::Error::new(
272                    io::ErrorKind::InvalidData,
273                    format!("kevy snapshot: unknown opcode {other}"),
274                ));
275            }
276        }
277    }
278}
279
280/// Serialize one entry: `[op][ttl][key][payload]`.
281fn write_entry<W: Write>(w: &mut W, key: &[u8], value: &Value, ttl: Option<u64>) -> io::Result<()> {
282    match value {
283        Value::Str(v) => {
284            w.write_all(&[OP_STR])?;
285            write_ttl(w, ttl)?;
286            write_bytes(w, key)?;
287            write_bytes(w, v.as_slice())?;
288        }
289        Value::Hash(h) => {
290            w.write_all(&[OP_HASH])?;
291            write_ttl(w, ttl)?;
292            write_bytes(w, key)?;
293            w.write_all(&(h.len() as u32).to_le_bytes())?;
294            for (f, v) in h.iter() {
295                write_bytes(w, f.as_slice())?;
296                write_bytes(w, v)?;
297            }
298        }
299        Value::List(l) => {
300            w.write_all(&[OP_LIST])?;
301            write_ttl(w, ttl)?;
302            write_bytes(w, key)?;
303            w.write_all(&(l.len() as u32).to_le_bytes())?;
304            for item in l.iter() {
305                write_bytes(w, item)?;
306            }
307        }
308        Value::Set(set) => {
309            w.write_all(&[OP_SET])?;
310            write_ttl(w, ttl)?;
311            write_bytes(w, key)?;
312            w.write_all(&(set.len() as u32).to_le_bytes())?;
313            for m in set.iter() {
314                write_bytes(w, m.as_slice())?;
315            }
316        }
317        Value::ZSet(z) => {
318            w.write_all(&[OP_ZSET])?;
319            write_ttl(w, ttl)?;
320            write_bytes(w, key)?;
321            let entries: Vec<(&[u8], f64)> = z.ordered().collect();
322            w.write_all(&(entries.len() as u32).to_le_bytes())?;
323            for (m, score) in entries {
324                write_bytes(w, m)?;
325                w.write_all(&score.to_bits().to_le_bytes())?;
326            }
327        }
328        Value::Stream(s) => {
329            w.write_all(&[OP_STREAM])?;
330            write_ttl(w, ttl)?;
331            write_bytes(w, key)?;
332            w.write_all(&s.last_id().ms.to_le_bytes())?;
333            w.write_all(&s.last_id().seq.to_le_bytes())?;
334            w.write_all(&s.max_deleted_id().ms.to_le_bytes())?;
335            w.write_all(&s.max_deleted_id().seq.to_le_bytes())?;
336            w.write_all(&s.entries_added().to_le_bytes())?;
337            let len = s.length() as u32;
338            w.write_all(&len.to_le_bytes())?;
339            for (id, fv) in s.iter_entries() {
340                w.write_all(&id.ms.to_le_bytes())?;
341                w.write_all(&id.seq.to_le_bytes())?;
342                w.write_all(&(fv.len() as u32).to_le_bytes())?;
343                for (f, v) in fv {
344                    write_bytes(w, f.as_slice())?;
345                    write_bytes(w, v.as_slice())?;
346                }
347            }
348            write_stream_groups(w, &s.export_groups())?;
349        }
350    }
351    Ok(())
352}
353
354/// v4 consumer-group section: `[n_groups][per group: name, last_delivered,
355/// consumers (name + last_seen_ms), PEL rows]`. Tombstone PEL rows are kept
356/// — the snapshot path is the full-fidelity one (the AOF rewrite can't
357/// re-create them via XCLAIM, see `rewrite_fmt`).
358fn write_stream_groups<W: Write>(w: &mut W, groups: &[kevy_store::LoadedGroup]) -> io::Result<()> {
359    w.write_all(&(groups.len() as u32).to_le_bytes())?;
360    for g in groups {
361        write_bytes(w, &g.name)?;
362        w.write_all(&g.last_delivered.0.to_le_bytes())?;
363        w.write_all(&g.last_delivered.1.to_le_bytes())?;
364        w.write_all(&(g.consumers.len() as u32).to_le_bytes())?;
365        for (name, last_seen_ms) in &g.consumers {
366            write_bytes(w, name)?;
367            w.write_all(&last_seen_ms.to_le_bytes())?;
368        }
369        w.write_all(&(g.pel.len() as u32).to_le_bytes())?;
370        for (ms, seq, consumer, delivery_time_ms, delivery_count) in &g.pel {
371            w.write_all(&ms.to_le_bytes())?;
372            w.write_all(&seq.to_le_bytes())?;
373            write_bytes(w, consumer)?;
374            w.write_all(&delivery_time_ms.to_le_bytes())?;
375            w.write_all(&delivery_count.to_le_bytes())?;
376        }
377    }
378    Ok(())
379}
380
381/// Loader-side twin of [`write_stream_groups`].
382fn read_stream_groups<R: Read>(r: &mut R) -> io::Result<Vec<kevy_store::LoadedGroup>> {
383    let n = read_u32(r)? as usize;
384    let mut groups = Vec::with_capacity(n);
385    for _ in 0..n {
386        let name = read_bytes(r)?;
387        let last_delivered = (read_u64(r)?, read_u64(r)?);
388        let nc = read_u32(r)? as usize;
389        let mut consumers = Vec::with_capacity(nc);
390        for _ in 0..nc {
391            let cname = read_bytes(r)?;
392            consumers.push((cname, read_u64(r)?));
393        }
394        let np = read_u32(r)? as usize;
395        let mut pel = Vec::with_capacity(np);
396        for _ in 0..np {
397            let ms = read_u64(r)?;
398            let seq = read_u64(r)?;
399            let consumer = read_bytes(r)?;
400            let delivery_time_ms = read_u64(r)?;
401            let delivery_count = read_u32(r)?;
402            pel.push((ms, seq, consumer, delivery_time_ms, delivery_count));
403        }
404        groups.push(kevy_store::LoadedGroup { name, last_delivered, consumers, pel });
405    }
406    Ok(groups)
407}
408
/// Write the TTL field of a record: a flag byte, followed by the value only
/// when one is present.
///
/// - `None` → the single byte `0`
/// - `Some(ms)` → the byte `1`, then `ms` as a little-endian `u64`
fn write_ttl<W: Write>(w: &mut W, ttl: Option<u64>) -> io::Result<()> {
    let Some(ms) = ttl else {
        return w.write_all(&[0u8]);
    };
    w.write_all(&[1u8])?;
    w.write_all(&ms.to_le_bytes())
}
419
420fn read_ttl<R: Read>(r: &mut R) -> io::Result<Option<u64>> {
421    if read_u8(r)? == 1 {
422        Ok(Some(read_u64(r)?))
423    } else {
424        Ok(None)
425    }
426}
427
/// Derive the staging path for `path` by appending the literal suffix `.tmp`
/// to the whole path (an existing extension is kept, not replaced).
fn tmp_path(path: &Path) -> std::path::PathBuf {
    let mut staged = path.to_path_buf().into_os_string();
    staged.push(".tmp");
    std::path::PathBuf::from(staged)
}
433
/// Write a length-prefixed byte string: the length as a little-endian `u32`,
/// then the bytes themselves.
///
/// # Errors
///
/// Returns `InvalidInput` when `b` is longer than `u32::MAX` bytes — the
/// prefix cannot represent it, and a wrapped length would corrupt everything
/// that follows in the file. Otherwise propagates any write error.
fn write_bytes<W: Write>(w: &mut W, b: &[u8]) -> io::Result<()> {
    let len = u32::try_from(b.len()).map_err(|_| {
        io::Error::new(
            io::ErrorKind::InvalidInput,
            "kevy snapshot: byte string longer than u32::MAX",
        )
    })?;
    w.write_all(&len.to_le_bytes())?;
    w.write_all(b)
}
438
439fn read_bytes<R: Read>(r: &mut R) -> io::Result<Vec<u8>> {
440    let len = read_u32(r)? as usize;
441    let mut buf = vec![0u8; len];
442    r.read_exact(&mut buf)?;
443    Ok(buf)
444}
445
/// Read exactly one byte from `r`; fails with the reader's EOF error when no
/// byte is available.
fn read_u8<R: Read>(r: &mut R) -> io::Result<u8> {
    let mut byte = 0u8;
    r.read_exact(std::slice::from_mut(&mut byte))?;
    Ok(byte)
}
451
/// Read four bytes from `r` and decode them as a little-endian `u32`.
fn read_u32<R: Read>(r: &mut R) -> io::Result<u32> {
    let mut le = [0u8; 4];
    r.read_exact(&mut le).map(|()| u32::from_le_bytes(le))
}
457
/// Read eight bytes from `r` and decode them as a little-endian `u64`.
fn read_u64<R: Read>(r: &mut R) -> io::Result<u64> {
    let mut raw = [0u8; std::mem::size_of::<u64>()];
    match r.read_exact(&mut raw) {
        Ok(()) => Ok(u64::from_le_bytes(raw)),
        Err(e) => Err(e),
    }
}
463
464#[cfg(test)]
465mod tests;
466#[cfg(test)]
467mod tests_aof;
468#[cfg(test)]
469mod tests_rewrite;