kevy_persist/aof.rs
//! Append-only command log. Split out from `lib.rs` to keep that file
//! under the 500-LOC house rule; the snapshot writer/reader stays there.
3
4use std::fs::{File, OpenOptions};
5use std::io::{self, BufWriter, Seek, SeekFrom, Write};
6use std::path::{Path, PathBuf};
7use std::time::{Duration, Instant};
8
9use kevy_resp::ArgvView;
10use kevy_store::Store;
11
12use crate::{
13 dump_store_to_buf, estimate_multibulk_bytes, write_multibulk,
14};
15
/// File-format header (9 bytes) that kevy stamps at the very start of
/// every AOF it manages, as of v1.2.0.
///
/// `replay_aof` removes the header before it starts parsing RESP. Bytes
/// that did not come from kevy but ended up in the AOF path (for example
/// a deploy pipeline redirecting shell stderr into the file) are therefore
/// rejected just as loudly as any other corrupt frame. AOFs written before
/// 1.2 carry no magic and still replay: the parser consumes the header
/// only when it is actually present.
pub(crate) const AOF_MAGIC: &[u8; 9] = b"KEVYAOF1\n";
23
/// Policy controlling when AOF bytes are forced to disk with an fsync.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Fsync {
    /// Every write is followed by an fsync. Safest option, and the slowest.
    Always,
    /// At most one fsync per second; the owner must call
    /// [`Aof::maybe_sync`] periodically for it to happen.
    EverySec,
    /// No explicit fsync at all; the OS decides when to write back.
    No,
}
34
35/// An append-only command log. Each write command is appended as a RESP
36/// multi-bulk frame; [`crate::replay_aof`] re-applies them on startup.
37///
38/// Durability model (paired with snapshots): a snapshot taken at T0 plus
39/// the AOF of writes in (T0, now] reconstructs the current state. `SAVE`
40/// writes the snapshot then [`Aof::truncate`]s the log, so replay never
41/// double-applies.
42///
43/// Sizes (`size_bytes`, `size_at_last_rewrite`) drive auto-trigger of
44/// [`Aof::rewrite_from`] (BGREWRITEAOF) via the
45/// `auto_aof_rewrite_percentage` + `auto_aof_rewrite_min_size` knobs in
46/// `kevy_config`.
47pub struct Aof {
48 file: BufWriter<File>,
49 path: PathBuf,
50 fsync: Fsync,
51 dirty: bool,
52 last_sync: Instant,
53 /// Estimated bytes currently in the AOF file (existing + appended since
54 /// open). Maintained without fstat() syscalls per append.
55 size_bytes: u64,
56 /// File size right after the most recent [`Self::rewrite_from`] (or
57 /// `Self::open` if never rewritten). Anchor for `auto_aof_rewrite_*`.
58 size_at_last_rewrite: u64,
59 /// Total rewrites successfully completed since open. Surfaced via INFO.
60 rewrites_total: u64,
61 /// Group-commit window: while `true`, an `Fsync::Always` `append` only
62 /// buffers (sets `dirty`) instead of fsyncing per command. The caller
63 /// brackets a batch of writes with [`Self::begin_group`] /
64 /// [`Self::end_group`] and `end_group` does the single fsync **before**
65 /// the batch's replies are sent — preserving "durable before reply"
66 /// while amortizing the per-command `flush()+sync_data()` syscalls.
67 /// Only the multi-command reactor entry points (pipelined socket reads,
68 /// cross-shard request batches) open a group; every other path keeps
69 /// the per-command fsync, so the default is always the safe one.
70 deferred: bool,
71 /// Non-blocking rewrite "diff buffer". While `Some`, every `append` also
72 /// tees its RESP frame here, so writes that land *during* an off-lock
73 /// rewrite are captured and replayed after the compacted snapshot. See
74 /// [`Self::begin_concurrent_rewrite`].
75 rewrite_tee: Option<Vec<u8>>,
76}
77
/// What the first half of a non-blocking rewrite hands to the second
/// half: the serialized keyspace image (built under the store lock) and
/// the temp path it must be spilled to (off-lock). See
/// [`Aof::begin_concurrent_rewrite`].
pub struct RewritePlan {
    /// Compacted AOF image: the magic header followed by one command
    /// stream per key.
    pub body: Vec<u8>,
    /// Temp file, in the same directory as the AOF, that `body` is
    /// spilled to before the final swap.
    pub tmp: PathBuf,
    /// Number of keys captured in `body`; ends up in the resulting
    /// [`RewriteStats`].
    pub keys: u64,
}
89
/// Outcome of one [`Aof::rewrite_from`] call, reported by `BGREWRITEAOF`
/// and `INFO persistence`.
#[derive(Clone, Copy, Debug)]
pub struct RewriteStats {
    /// How many keys were dumped into the new AOF.
    pub keys: u64,
    /// Size of the new AOF, in bytes.
    pub bytes: u64,
}
99
100impl Aof {
101 /// Open (creating if needed) `path` for appending. New files get the
102 /// 9-byte `AOF_MAGIC` header so replays can identify the file as
103 /// kevy-managed. Pre-existing files (legacy bare-RESP or already-
104 /// magic'd) are left untouched.
105 pub fn open(path: &Path, fsync: Fsync) -> io::Result<Self> {
106 let mut file = OpenOptions::new().create(true).append(true).open(path)?;
107 let mut size = file.metadata().map(|m| m.len()).unwrap_or(0);
108 if size == 0 {
109 // Fresh file: stamp the magic header so the replayer can
110 // distinguish kevy-written AOFs from accidental writes.
111 file.write_all(AOF_MAGIC)?;
112 file.sync_data()?;
113 size = AOF_MAGIC.len() as u64;
114 }
115 Ok(Aof {
116 file: BufWriter::new(file),
117 path: path.to_path_buf(),
118 fsync,
119 dirty: false,
120 last_sync: Instant::now(),
121 size_bytes: size,
122 size_at_last_rewrite: size,
123 rewrites_total: 0,
124 deferred: false,
125 rewrite_tee: None,
126 })
127 }
128
129 /// The fsync policy this AOF was opened with (or last switched to).
130 /// Mostly for tests / INFO output; the hot path doesn't read this.
131 #[inline]
132 pub fn fsync_policy(&self) -> Fsync {
133 self.fsync
134 }
135
136 /// Switch the fsync policy at runtime (called by `CONFIG SET
137 /// appendfsync`). When tightening to `Always`, also flushes + fsyncs
138 /// any bytes still in the BufWriter so the new "every write is on
139 /// disk before reply" contract is honoured starting on the next
140 /// append, not after the dirty backlog clears.
141 pub fn set_fsync(&mut self, fsync: Fsync) -> io::Result<()> {
142 let upgrading_to_always = matches!(fsync, Fsync::Always) && !matches!(self.fsync, Fsync::Always);
143 self.fsync = fsync;
144 if upgrading_to_always && self.dirty {
145 self.file.flush()?;
146 self.file.get_ref().sync_data()?;
147 self.dirty = false;
148 self.last_sync = Instant::now();
149 }
150 Ok(())
151 }
152
153 /// Append one command, applying the fsync policy.
154 pub fn append<A: ArgvView + ?Sized>(&mut self, args: &A) -> io::Result<()> {
155 write_multibulk(&mut self.file, args)?;
156 // Tee into the in-flight rewrite's diff buffer (off-lock rewrite in
157 // progress): re-encode the same frame so it survives the swap. Only
158 // active during the rare rewrite window — zero cost otherwise.
159 if let Some(tee) = &mut self.rewrite_tee {
160 write_multibulk(tee, args)?;
161 }
162 self.size_bytes = self
163 .size_bytes
164 .saturating_add(estimate_multibulk_bytes(args));
165 match self.fsync {
166 // Inside a group-commit window, defer the fsync to `end_group`
167 // (one per batch, still before the batch's replies). Outside
168 // one, fsync per command — the safe default for every path.
169 Fsync::Always if self.deferred => self.dirty = true,
170 Fsync::Always => {
171 self.file.flush()?;
172 self.file.get_ref().sync_data()?;
173 }
174 Fsync::EverySec | Fsync::No => self.dirty = true,
175 }
176 Ok(())
177 }
178
179 /// Open a group-commit window (no-op unless the policy is `Always`):
180 /// subsequent `append`s buffer instead of fsyncing per command. Pair
181 /// with [`Self::end_group`] **before** sending the batch's replies.
182 #[inline]
183 pub fn begin_group(&mut self) {
184 if matches!(self.fsync, Fsync::Always) {
185 self.deferred = true;
186 }
187 }
188
189 /// Close the group-commit window: one `flush()+sync_data()` for the
190 /// whole batch (if anything was buffered), then resume per-command
191 /// fsync. Must be called before the batch's replies leave the shard.
192 #[inline]
193 pub fn end_group(&mut self) -> io::Result<()> {
194 if self.deferred {
195 self.deferred = false;
196 if self.dirty {
197 self.file.flush()?;
198 self.file.get_ref().sync_data()?;
199 self.dirty = false;
200 self.last_sync = Instant::now();
201 }
202 }
203 Ok(())
204 }
205
206 /// Flush+fsync if the `EverySec` window has elapsed. Call once per loop tick.
207 pub fn maybe_sync(&mut self) -> io::Result<()> {
208 if matches!(self.fsync, Fsync::EverySec)
209 && self.dirty
210 && self.last_sync.elapsed() >= Duration::from_secs(1)
211 {
212 self.file.flush()?;
213 self.file.get_ref().sync_data()?;
214 self.dirty = false;
215 self.last_sync = Instant::now();
216 }
217 Ok(())
218 }
219
220 /// Empty the log (after a snapshot has captured the full state). The
221 /// post-truncate file keeps the `AOF_MAGIC` header so replays of
222 /// the freshly-trimmed log still identify as kevy-managed.
223 pub fn truncate(&mut self) -> io::Result<()> {
224 self.file.flush()?;
225 let f = self.file.get_mut();
226 f.set_len(0)?;
227 f.seek(SeekFrom::Start(0))?; // harmless under O_APPEND; keeps len/pos coherent
228 f.write_all(AOF_MAGIC)?;
229 f.sync_all()?;
230 self.dirty = false;
231 self.size_bytes = AOF_MAGIC.len() as u64;
232 self.size_at_last_rewrite = AOF_MAGIC.len() as u64;
233 Ok(())
234 }
235
236 /// Estimated current AOF size in bytes (file content as of last append).
237 #[inline]
238 pub fn size_bytes(&self) -> u64 {
239 self.size_bytes
240 }
241
242 /// AOF size at the most recent rewrite (or open). Auto-trigger compares
243 /// `(size_bytes - size_at_last_rewrite) * 100 / size_at_last_rewrite` to
244 /// the `auto_aof_rewrite_percentage` knob.
245 #[inline]
246 pub fn size_at_last_rewrite(&self) -> u64 {
247 self.size_at_last_rewrite
248 }
249
250 /// Successful rewrite count since `Self::open`. Surfaced in INFO.
251 #[inline]
252 pub fn rewrites_total(&self) -> u64 {
253 self.rewrites_total
254 }
255
256 /// BGREWRITEAOF: rebuild a compact AOF from `store`'s current state and
257 /// atomically swap it in.
258 ///
259 /// **v1.0 is synchronous** — the calling shard blocks for the rewrite's
260 /// duration. Each shard owns its own AOF, so the shards' rewrites
261 /// proceed independently; per-shard blocking matches Redis's `BGSAVE`
262 /// cost in a typical single-key-per-shard workload. Concurrent
263 /// (rewrite-during-writes) incrementalisation is a v1.x perf item.
264 ///
265 /// Writes to a `<path>.rewrite` temp file with fsync, then `rename(2)`s
266 /// it over the live AOF. The append handle is reopened against the new
267 /// file before this call returns, so subsequent `append` calls land in
268 /// the rewritten log.
269 pub fn rewrite_from(&mut self, store: &Store) -> io::Result<RewriteStats> {
270 // Flush any pending writes to the OLD file first so the snapshot
271 // accounts for everything the caller intended to durabilise.
272 self.file.flush()?;
273
274 let tmp = rewrite_tmp_path(&self.path);
275 let (keys, bytes) = crate::dump_aof(&tmp, store)?;
276
277 // Atomic replacement. After this, the OLD file descriptor in
278 // `self.file` is open against an unlinked inode; new writes would
279 // go nowhere visible. Reopen against the new path.
280 std::fs::rename(&tmp, &self.path)?;
281 let f = OpenOptions::new().append(true).open(&self.path)?;
282 self.file = BufWriter::new(f);
283 self.size_bytes = bytes;
284 self.size_at_last_rewrite = bytes;
285 self.dirty = false;
286 self.rewrites_total = self.rewrites_total.saturating_add(1);
287 Ok(RewriteStats { keys, bytes })
288 }
289
290 /// Is a non-blocking rewrite mid-flight (between
291 /// [`Self::begin_concurrent_rewrite`] and `finish`/`abort`)? While true,
292 /// don't start another rewrite — `append` is teeing into the diff buffer.
293 #[inline]
294 pub fn is_rewriting(&self) -> bool {
295 self.rewrite_tee.is_some()
296 }
297
298 /// Phase 1 of a **non-blocking** rewrite (Background auto-rewrite). Must be
299 /// called under the store lock: it serializes the keyspace into an
300 /// in-memory image and starts teeing subsequent `append`s into a diff
301 /// buffer — both atomic w.r.t. other writes. The caller then spills
302 /// `plan.body` to `plan.tmp` **with the lock released** (the slow disk
303 /// write), and finally calls [`Self::finish_concurrent_rewrite`] under the
304 /// lock again. Writes that land during the off-lock spill are captured by
305 /// the tee and appended after the snapshot, so nothing is lost.
306 pub fn begin_concurrent_rewrite(&mut self, store: &Store) -> io::Result<RewritePlan> {
307 let (body, keys) = dump_store_to_buf(store);
308 self.rewrite_tee = Some(Vec::new());
309 Ok(RewritePlan {
310 body,
311 tmp: rewrite_tmp_path(&self.path),
312 keys,
313 })
314 }
315
316 /// Phase 2: the `plan.body` is already on disk at `tmp` (spilled off-lock).
317 /// Append the diff buffer (writes since `begin`), fsync, atomically swap
318 /// over the live AOF, and reopen the append handle against it. Call under
319 /// the store lock. `keys` is `plan.keys`.
320 pub fn finish_concurrent_rewrite(&mut self, tmp: &Path, keys: u64) -> io::Result<RewriteStats> {
321 let tee = self.rewrite_tee.take().unwrap_or_default();
322 {
323 let mut f = OpenOptions::new().append(true).open(tmp)?;
324 f.write_all(&tee)?;
325 f.sync_all()?;
326 }
327 std::fs::rename(tmp, &self.path)?;
328 let f = OpenOptions::new().append(true).open(&self.path)?;
329 let bytes = f.metadata().map(|m| m.len()).unwrap_or(0);
330 self.file = BufWriter::new(f);
331 self.size_bytes = bytes;
332 self.size_at_last_rewrite = bytes;
333 self.dirty = false;
334 self.rewrites_total = self.rewrites_total.saturating_add(1);
335 Ok(RewriteStats { keys, bytes })
336 }
337
338 /// Abandon an in-flight non-blocking rewrite (e.g. the off-lock spill
339 /// failed): drop the diff buffer and resume normal appends. The live AOF
340 /// is untouched, so no data is at risk; the caller deletes the temp file.
341 pub fn abort_concurrent_rewrite(&mut self) {
342 self.rewrite_tee = None;
343 }
344
345 /// Phase 1 of a **COW** rewrite: flush pending appends and start teeing
346 /// subsequent ones into the diff buffer. O(1) — the keyspace itself is
347 /// already frozen in the caller's `SnapshotView`. Returns the temp path
348 /// the background serializer must write (via [`crate::dump_aof`]),
349 /// after which [`Self::finish_concurrent_rewrite`] (same thread as the
350 /// appends) swaps it in, or [`Self::abort_concurrent_rewrite`] backs out.
351 ///
352 /// **Atomicity contract**: the `collect_snapshot` and this call must
353 /// happen with no `append` between them (same critical section / same
354 /// thread). A write squeezing in between would either miss the new AOF
355 /// (tee started late) or replay twice (tee started early) — and
356 /// commands like LPUSH are not idempotent.
357 pub fn begin_view_rewrite(&mut self) -> io::Result<std::path::PathBuf> {
358 self.file.flush()?;
359 self.rewrite_tee = Some(Vec::new());
360 Ok(rewrite_tmp_path(&self.path))
361 }
362}
363
364/// Write a fresh AOF base at `path`: just the magic header, fsynced. The
365/// COW background-save's log reset starts from this — the post-collect
366/// tee'd writes are appended by `finish_concurrent_rewrite` and the result
367/// swaps over the live AOF (the snapshot now carries the pre-collect state).
368pub fn write_aof_base(path: &Path) -> io::Result<()> {
369 let mut f = File::create(path)?;
370 f.write_all(AOF_MAGIC)?;
371 f.sync_all()
372}
373
/// Temp path for a rewrite: the AOF's own file name with `.rewrite`
/// appended, in the same directory so that `rename(2)` stays atomic.
/// A path with no final file-name component gets `aof.rewrite` instead.
fn rewrite_tmp_path(path: &Path) -> PathBuf {
    let tmp_name = path.file_name().map_or_else(
        || std::ffi::OsString::from("aof.rewrite"),
        |name| {
            let mut suffixed = name.to_os_string();
            suffixed.push(".rewrite");
            suffixed
        },
    );
    path.with_file_name(tmp_name)
}