Skip to main content

kevy_persist/
aof.rs

1//! Append-only command log. Split out from `lib.rs` to keep that file
2//! under the 500-LOC house rule; the snapshot writer/reader stays there.
3
4use std::fs::{File, OpenOptions};
5use std::io::{self, BufWriter, Seek, SeekFrom, Write};
6use std::path::{Path, PathBuf};
7use std::time::{Duration, Instant};
8
9use kevy_resp::ArgvView;
10use kevy_store::Store;
11
12use crate::{
13    dump_store_to_buf, estimate_multibulk_bytes, write_multibulk,
14};
15
/// 9-byte file-format header written at the start of every kevy-managed
/// AOF as of v1.2.0. `replay_aof` strips it before parsing RESP, so
/// non-kevy bytes accidentally written into the AOF path (e.g. a deploy
/// pipeline redirecting shell stderr into the file) get the same loud
/// rejection as any other corrupt frame. Pre-1.2 AOFs (no magic) still
/// replay — the parser only consumes the magic if it sees it.
///
/// Within this module the header is stamped by [`Aof::open`] (only when the
/// file is empty), by [`Aof::truncate`], and by [`write_aof_base`].
pub(crate) const AOF_MAGIC: &[u8; 9] = b"KEVYAOF1\n";
23
/// When to fsync the AOF to disk. [`Aof::append`] consults this after every
/// appended frame.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Fsync {
    /// fsync after every write — safest, slowest. Inside a group-commit
    /// window ([`Aof::begin_group`] / [`Aof::end_group`]) the per-write
    /// fsync is replaced by a single one issued from `end_group`.
    Always,
    /// fsync at most once per second (call [`Aof::maybe_sync`] periodically).
    EverySec,
    /// Never fsync explicitly; leave it to the OS.
    No,
}
34
/// An append-only command log. Each write command is appended as a RESP
/// multi-bulk frame; [`crate::replay_aof`] re-applies them on startup.
///
/// Durability model (paired with snapshots): a snapshot taken at T0 plus
/// the AOF of writes in (T0, now] reconstructs the current state. `SAVE`
/// writes the snapshot then [`Aof::truncate`]s the log, so replay never
/// double-applies.
///
/// Sizes (`size_bytes`, `size_at_last_rewrite`) drive auto-trigger of
/// [`Aof::rewrite_from`] (BGREWRITEAOF) via the
/// `auto_aof_rewrite_percentage` + `auto_aof_rewrite_min_size` knobs in
/// `kevy_config`.
pub struct Aof {
    /// Buffered append handle on the live AOF. Replaced wholesale whenever a
    /// rewrite swaps a new file into place.
    file: BufWriter<File>,
    /// Path of the live AOF; also the base for the `<aof>.rewrite` temp path.
    path: PathBuf,
    /// Current fsync policy (set at open, changeable via [`Self::set_fsync`]).
    fsync: Fsync,
    /// `true` when `append` has written frames that have not been followed
    /// by a flush + fsync yet. Cleared by every path that syncs or replaces
    /// the file (`set_fsync`, `end_group`, `maybe_sync`, `truncate`, rewrites).
    dirty: bool,
    /// When the last deferred flush + fsync happened (initialised at open).
    /// Read by [`Self::maybe_sync`] to enforce the one-second window.
    last_sync: Instant,
    /// Estimated bytes currently in the AOF file (existing + appended since
    /// open). Maintained without fstat() syscalls per append.
    size_bytes: u64,
    /// File size right after the most recent [`Self::rewrite_from`] (or
    /// `Self::open` if never rewritten). Anchor for `auto_aof_rewrite_*`.
    size_at_last_rewrite: u64,
    /// Total rewrites successfully completed since open. Surfaced via INFO.
    rewrites_total: u64,
    /// Group-commit window: while `true`, an `Fsync::Always` `append` only
    /// buffers (sets `dirty`) instead of fsyncing per command. The caller
    /// brackets a batch of writes with [`Self::begin_group`] /
    /// [`Self::end_group`] and `end_group` does the single fsync **before**
    /// the batch's replies are sent — preserving "durable before reply"
    /// while amortizing the per-command `flush()+sync_data()` syscalls.
    /// Only the multi-command reactor entry points (pipelined socket reads,
    /// cross-shard request batches) open a group; every other path keeps
    /// the per-command fsync, so the default is always the safe one.
    deferred: bool,
    /// Non-blocking rewrite "diff buffer". While `Some`, every `append` also
    /// tees its RESP frame here, so writes that land *during* an off-lock
    /// rewrite are captured and replayed after the compacted snapshot. See
    /// [`Self::begin_concurrent_rewrite`].
    rewrite_tee: Option<Vec<u8>>,
}
77
/// Handoff between the two halves of a non-blocking rewrite: the serialized
/// keyspace image (produced under the store lock) and the temp path to spill
/// it to (off-lock). See [`Aof::begin_concurrent_rewrite`].
///
/// Derives `Debug` like [`RewriteStats`] so a plan can be logged or used in
/// assertion messages. Note that the derived output prints every byte of
/// `body`, so avoid formatting a plan for a large keyspace on a hot path.
#[derive(Debug)]
pub struct RewritePlan {
    /// The compacted AOF image (magic + one command stream per key).
    pub body: Vec<u8>,
    /// Same-directory temp file to spill `body` to before the final swap.
    pub tmp: PathBuf,
    /// Keys captured in `body` (for the resulting [`RewriteStats`]).
    pub keys: u64,
}
89
/// Result of a completed rewrite: returned by [`Aof::rewrite_from`] and by
/// [`Aof::finish_concurrent_rewrite`]. Surfaced by `BGREWRITEAOF` /
/// `INFO persistence`.
#[derive(Debug, Clone, Copy)]
pub struct RewriteStats {
    /// Keys dumped into the new AOF.
    pub keys: u64,
    /// New AOF size in bytes.
    pub bytes: u64,
}
99
100impl Aof {
101    /// Open (creating if needed) `path` for appending. New files get the
102    /// 9-byte `AOF_MAGIC` header so replays can identify the file as
103    /// kevy-managed. Pre-existing files (legacy bare-RESP or already-
104    /// magic'd) are left untouched.
105    pub fn open(path: &Path, fsync: Fsync) -> io::Result<Self> {
106        let mut file = OpenOptions::new().create(true).append(true).open(path)?;
107        let mut size = file.metadata().map(|m| m.len()).unwrap_or(0);
108        if size == 0 {
109            // Fresh file: stamp the magic header so the replayer can
110            // distinguish kevy-written AOFs from accidental writes.
111            file.write_all(AOF_MAGIC)?;
112            file.sync_data()?;
113            size = AOF_MAGIC.len() as u64;
114        }
115        Ok(Aof {
116            file: BufWriter::new(file),
117            path: path.to_path_buf(),
118            fsync,
119            dirty: false,
120            last_sync: Instant::now(),
121            size_bytes: size,
122            size_at_last_rewrite: size,
123            rewrites_total: 0,
124            deferred: false,
125            rewrite_tee: None,
126        })
127    }
128
    /// The fsync policy this AOF was opened with (or last switched to via
    /// [`Self::set_fsync`]). Returned by value — `Fsync` is `Copy`.
    /// Mostly for tests / INFO output; the hot path doesn't read this.
    #[inline]
    pub fn fsync_policy(&self) -> Fsync {
        self.fsync
    }
135
136    /// Switch the fsync policy at runtime (called by `CONFIG SET
137    /// appendfsync`). When tightening to `Always`, also flushes + fsyncs
138    /// any bytes still in the BufWriter so the new "every write is on
139    /// disk before reply" contract is honoured starting on the next
140    /// append, not after the dirty backlog clears.
141    pub fn set_fsync(&mut self, fsync: Fsync) -> io::Result<()> {
142        let upgrading_to_always = matches!(fsync, Fsync::Always) && !matches!(self.fsync, Fsync::Always);
143        self.fsync = fsync;
144        if upgrading_to_always && self.dirty {
145            self.file.flush()?;
146            self.file.get_ref().sync_data()?;
147            self.dirty = false;
148            self.last_sync = Instant::now();
149        }
150        Ok(())
151    }
152
153    /// Append one command, applying the fsync policy.
154    pub fn append<A: ArgvView + ?Sized>(&mut self, args: &A) -> io::Result<()> {
155        write_multibulk(&mut self.file, args)?;
156        // Tee into the in-flight rewrite's diff buffer (off-lock rewrite in
157        // progress): re-encode the same frame so it survives the swap. Only
158        // active during the rare rewrite window — zero cost otherwise.
159        if let Some(tee) = &mut self.rewrite_tee {
160            write_multibulk(tee, args)?;
161        }
162        self.size_bytes = self
163            .size_bytes
164            .saturating_add(estimate_multibulk_bytes(args));
165        match self.fsync {
166            // Inside a group-commit window, defer the fsync to `end_group`
167            // (one per batch, still before the batch's replies). Outside
168            // one, fsync per command — the safe default for every path.
169            Fsync::Always if self.deferred => self.dirty = true,
170            Fsync::Always => {
171                self.file.flush()?;
172                self.file.get_ref().sync_data()?;
173            }
174            Fsync::EverySec | Fsync::No => self.dirty = true,
175        }
176        Ok(())
177    }
178
179    /// Open a group-commit window (no-op unless the policy is `Always`):
180    /// subsequent `append`s buffer instead of fsyncing per command. Pair
181    /// with [`Self::end_group`] **before** sending the batch's replies.
182    #[inline]
183    pub fn begin_group(&mut self) {
184        if matches!(self.fsync, Fsync::Always) {
185            self.deferred = true;
186        }
187    }
188
189    /// Close the group-commit window: one `flush()+sync_data()` for the
190    /// whole batch (if anything was buffered), then resume per-command
191    /// fsync. Must be called before the batch's replies leave the shard.
192    #[inline]
193    pub fn end_group(&mut self) -> io::Result<()> {
194        if self.deferred {
195            self.deferred = false;
196            if self.dirty {
197                self.file.flush()?;
198                self.file.get_ref().sync_data()?;
199                self.dirty = false;
200                self.last_sync = Instant::now();
201            }
202        }
203        Ok(())
204    }
205
206    /// Flush+fsync if the `EverySec` window has elapsed. Call once per loop tick.
207    pub fn maybe_sync(&mut self) -> io::Result<()> {
208        if matches!(self.fsync, Fsync::EverySec)
209            && self.dirty
210            && self.last_sync.elapsed() >= Duration::from_secs(1)
211        {
212            self.file.flush()?;
213            self.file.get_ref().sync_data()?;
214            self.dirty = false;
215            self.last_sync = Instant::now();
216        }
217        Ok(())
218    }
219
220    /// Empty the log (after a snapshot has captured the full state). The
221    /// post-truncate file keeps the `AOF_MAGIC` header so replays of
222    /// the freshly-trimmed log still identify as kevy-managed.
223    pub fn truncate(&mut self) -> io::Result<()> {
224        self.file.flush()?;
225        let f = self.file.get_mut();
226        f.set_len(0)?;
227        f.seek(SeekFrom::Start(0))?; // harmless under O_APPEND; keeps len/pos coherent
228        f.write_all(AOF_MAGIC)?;
229        f.sync_all()?;
230        self.dirty = false;
231        self.size_bytes = AOF_MAGIC.len() as u64;
232        self.size_at_last_rewrite = AOF_MAGIC.len() as u64;
233        Ok(())
234    }
235
    /// Estimated current AOF size in bytes (file content as of last append).
    /// This is a running counter — seeded at open, bumped by each `append`'s
    /// frame-size estimate, and reset by `truncate` / rewrites — not a stat
    /// of the file.
    #[inline]
    pub fn size_bytes(&self) -> u64 {
        self.size_bytes
    }
241
    /// AOF size at the most recent rewrite (or open). Auto-trigger compares
    /// `(size_bytes - size_at_last_rewrite) * 100 / size_at_last_rewrite` to
    /// the `auto_aof_rewrite_percentage` knob.
    ///
    /// [`Self::truncate`] also resets this anchor (to the header length).
    #[inline]
    pub fn size_at_last_rewrite(&self) -> u64 {
        self.size_at_last_rewrite
    }
249
    /// Successful rewrite count since `Self::open`. Surfaced in INFO.
    /// Incremented (saturating) by both [`Self::rewrite_from`] and
    /// [`Self::finish_concurrent_rewrite`].
    #[inline]
    pub fn rewrites_total(&self) -> u64 {
        self.rewrites_total
    }
255
256    /// BGREWRITEAOF: rebuild a compact AOF from `store`'s current state and
257    /// atomically swap it in.
258    ///
259    /// **v1.0 is synchronous** — the calling shard blocks for the rewrite's
260    /// duration. Each shard owns its own AOF, so the shards' rewrites
261    /// proceed independently; per-shard blocking matches Redis's `BGSAVE`
262    /// cost in a typical single-key-per-shard workload. Concurrent
263    /// (rewrite-during-writes) incrementalisation is a v1.x perf item.
264    ///
265    /// Writes to a `<path>.rewrite` temp file with fsync, then `rename(2)`s
266    /// it over the live AOF. The append handle is reopened against the new
267    /// file before this call returns, so subsequent `append` calls land in
268    /// the rewritten log.
269    pub fn rewrite_from(&mut self, store: &Store) -> io::Result<RewriteStats> {
270        // Flush any pending writes to the OLD file first so the snapshot
271        // accounts for everything the caller intended to durabilise.
272        self.file.flush()?;
273
274        let tmp = rewrite_tmp_path(&self.path);
275        let (keys, bytes) = crate::dump_aof(&tmp, store)?;
276
277        // Atomic replacement. After this, the OLD file descriptor in
278        // `self.file` is open against an unlinked inode; new writes would
279        // go nowhere visible. Reopen against the new path.
280        std::fs::rename(&tmp, &self.path)?;
281        let f = OpenOptions::new().append(true).open(&self.path)?;
282        self.file = BufWriter::new(f);
283        self.size_bytes = bytes;
284        self.size_at_last_rewrite = bytes;
285        self.dirty = false;
286        self.rewrites_total = self.rewrites_total.saturating_add(1);
287        Ok(RewriteStats { keys, bytes })
288    }
289
    /// Is a non-blocking rewrite mid-flight (between
    /// [`Self::begin_concurrent_rewrite`] and `finish`/`abort`)? While true,
    /// don't start another rewrite — `append` is teeing into the diff buffer.
    ///
    /// Also true after [`Self::begin_view_rewrite`], which installs the same
    /// diff buffer.
    #[inline]
    pub fn is_rewriting(&self) -> bool {
        self.rewrite_tee.is_some()
    }
297
298    /// Phase 1 of a **non-blocking** rewrite (Background auto-rewrite). Must be
299    /// called under the store lock: it serializes the keyspace into an
300    /// in-memory image and starts teeing subsequent `append`s into a diff
301    /// buffer — both atomic w.r.t. other writes. The caller then spills
302    /// `plan.body` to `plan.tmp` **with the lock released** (the slow disk
303    /// write), and finally calls [`Self::finish_concurrent_rewrite`] under the
304    /// lock again. Writes that land during the off-lock spill are captured by
305    /// the tee and appended after the snapshot, so nothing is lost.
306    pub fn begin_concurrent_rewrite(&mut self, store: &Store) -> io::Result<RewritePlan> {
307        let (body, keys) = dump_store_to_buf(store);
308        self.rewrite_tee = Some(Vec::new());
309        Ok(RewritePlan {
310            body,
311            tmp: rewrite_tmp_path(&self.path),
312            keys,
313        })
314    }
315
316    /// Phase 2: the `plan.body` is already on disk at `tmp` (spilled off-lock).
317    /// Append the diff buffer (writes since `begin`), fsync, atomically swap
318    /// over the live AOF, and reopen the append handle against it. Call under
319    /// the store lock. `keys` is `plan.keys`.
320    pub fn finish_concurrent_rewrite(&mut self, tmp: &Path, keys: u64) -> io::Result<RewriteStats> {
321        let tee = self.rewrite_tee.take().unwrap_or_default();
322        {
323            let mut f = OpenOptions::new().append(true).open(tmp)?;
324            f.write_all(&tee)?;
325            f.sync_all()?;
326        }
327        std::fs::rename(tmp, &self.path)?;
328        let f = OpenOptions::new().append(true).open(&self.path)?;
329        let bytes = f.metadata().map(|m| m.len()).unwrap_or(0);
330        self.file = BufWriter::new(f);
331        self.size_bytes = bytes;
332        self.size_at_last_rewrite = bytes;
333        self.dirty = false;
334        self.rewrites_total = self.rewrites_total.saturating_add(1);
335        Ok(RewriteStats { keys, bytes })
336    }
337
    /// Abandon an in-flight non-blocking rewrite (e.g. the off-lock spill
    /// failed): drop the diff buffer and resume normal appends. The live AOF
    /// is untouched, so no data is at risk; the caller deletes the temp file.
    ///
    /// This only clears the diff buffer — it performs no I/O — and is a
    /// no-op when no rewrite is in flight.
    pub fn abort_concurrent_rewrite(&mut self) {
        self.rewrite_tee = None;
    }
344
345    /// Phase 1 of a **COW** rewrite: flush pending appends and start teeing
346    /// subsequent ones into the diff buffer. O(1) — the keyspace itself is
347    /// already frozen in the caller's `SnapshotView`. Returns the temp path
348    /// the background serializer must write (via [`crate::dump_aof`]),
349    /// after which [`Self::finish_concurrent_rewrite`] (same thread as the
350    /// appends) swaps it in, or [`Self::abort_concurrent_rewrite`] backs out.
351    ///
352    /// **Atomicity contract**: the `collect_snapshot` and this call must
353    /// happen with no `append` between them (same critical section / same
354    /// thread). A write squeezing in between would either miss the new AOF
355    /// (tee started late) or replay twice (tee started early) — and
356    /// commands like LPUSH are not idempotent.
357    pub fn begin_view_rewrite(&mut self) -> io::Result<std::path::PathBuf> {
358        self.file.flush()?;
359        self.rewrite_tee = Some(Vec::new());
360        Ok(rewrite_tmp_path(&self.path))
361    }
362}
363
364/// Write a fresh AOF base at `path`: just the magic header, fsynced. The
365/// COW background-save's log reset starts from this — the post-collect
366/// tee'd writes are appended by `finish_concurrent_rewrite` and the result
367/// swaps over the live AOF (the snapshot now carries the pre-collect state).
368pub fn write_aof_base(path: &Path) -> io::Result<()> {
369    let mut f = File::create(path)?;
370    f.write_all(AOF_MAGIC)?;
371    f.sync_all()
372}
373
/// Temp path for a rewrite: the AOF's own file name with `.rewrite`
/// appended, in the same directory so `rename(2)` stays atomic. A path with
/// no final file-name component (empty, or ending in `..`) gets the fixed
/// name `aof.rewrite` appended instead.
fn rewrite_tmp_path(path: &Path) -> PathBuf {
    let tmp_name = path.file_name().map_or_else(
        || std::ffi::OsString::from("aof.rewrite"),
        |name| {
            let mut suffixed = name.to_os_string();
            suffixed.push(".rewrite");
            suffixed
        },
    );
    path.with_file_name(tmp_name)
}