kevy_persist/aof.rs
1//! Append-only command log. Split out from `lib.rs` to keep that file
2//! under the 500-LOC house rule; the snapshot writer/reader stays there.
3
4use std::fs::{File, OpenOptions};
5use std::io::{self, BufWriter, Seek, SeekFrom, Write};
6use std::path::{Path, PathBuf};
7use std::time::{Duration, Instant};
8
9use kevy_resp::ArgvView;
10use kevy_store::Store;
11
12use crate::{
13 dump_store_to_aof, estimate_multibulk_bytes, write_multibulk,
14};
15
/// File-format marker (`KEVYAOF1` followed by a newline, 9 bytes total)
/// written at the very start of every AOF that kevy creates, as of v1.2.0.
///
/// `replay_aof` strips this marker before parsing RESP. As a result, foreign
/// bytes that end up at the AOF path (for example a deploy pipeline that
/// redirects shell stderr into the file) are rejected as loudly as any other
/// corrupt frame. Logs written before 1.2 carry no marker and still replay,
/// because the parser consumes the marker only when it is present.
pub(crate) const AOF_MAGIC: &[u8; 9] = b"KEVYAOF1\n";
23
/// Policy deciding how often the AOF is fsync'd to stable storage.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Fsync {
    /// Sync after each appended command: maximum durability, lowest throughput.
    Always,
    /// Sync no more than once per second; the owner must call
    /// [`Aof::maybe_sync`] on a regular cadence for this to happen.
    EverySec,
    /// Issue no explicit fsync at all; the operating system decides.
    No,
}
34
35/// An append-only command log. Each write command is appended as a RESP
36/// multi-bulk frame; [`crate::replay_aof`] re-applies them on startup.
37///
38/// Durability model (paired with snapshots): a snapshot taken at T0 plus
39/// the AOF of writes in (T0, now] reconstructs the current state. `SAVE`
40/// writes the snapshot then [`Aof::truncate`]s the log, so replay never
41/// double-applies.
42///
43/// Sizes (`size_bytes`, `size_at_last_rewrite`) drive auto-trigger of
44/// [`Aof::rewrite_from`] (BGREWRITEAOF) via the
45/// `auto_aof_rewrite_percentage` + `auto_aof_rewrite_min_size` knobs in
46/// `kevy_config`.
47pub struct Aof {
48 file: BufWriter<File>,
49 path: PathBuf,
50 fsync: Fsync,
51 dirty: bool,
52 last_sync: Instant,
53 /// Estimated bytes currently in the AOF file (existing + appended since
54 /// open). Maintained without fstat() syscalls per append.
55 size_bytes: u64,
56 /// File size right after the most recent [`Self::rewrite_from`] (or
57 /// `Self::open` if never rewritten). Anchor for `auto_aof_rewrite_*`.
58 size_at_last_rewrite: u64,
59 /// Total rewrites successfully completed since open. Surfaced via INFO.
60 rewrites_total: u64,
61}
62
/// Outcome of one [`Aof::rewrite_from`] pass. Reported back through
/// `BGREWRITEAOF` and `INFO persistence`.
#[derive(Clone, Copy, Debug)]
pub struct RewriteStats {
    /// Number of keys written into the rebuilt AOF.
    pub keys: u64,
    /// Byte length of the rebuilt AOF.
    pub bytes: u64,
}
72
73impl Aof {
74 /// Open (creating if needed) `path` for appending. New files get the
75 /// 9-byte [`AOF_MAGIC`] header so replays can identify the file as
76 /// kevy-managed. Pre-existing files (legacy bare-RESP or already-
77 /// magic'd) are left untouched.
78 pub fn open(path: &Path, fsync: Fsync) -> io::Result<Self> {
79 let mut file = OpenOptions::new().create(true).append(true).open(path)?;
80 let mut size = file.metadata().map(|m| m.len()).unwrap_or(0);
81 if size == 0 {
82 // Fresh file: stamp the magic header so the replayer can
83 // distinguish kevy-written AOFs from accidental writes.
84 file.write_all(AOF_MAGIC)?;
85 file.sync_data()?;
86 size = AOF_MAGIC.len() as u64;
87 }
88 Ok(Aof {
89 file: BufWriter::new(file),
90 path: path.to_path_buf(),
91 fsync,
92 dirty: false,
93 last_sync: Instant::now(),
94 size_bytes: size,
95 size_at_last_rewrite: size,
96 rewrites_total: 0,
97 })
98 }
99
100 /// Append one command, applying the fsync policy.
101 pub fn append<A: ArgvView + ?Sized>(&mut self, args: &A) -> io::Result<()> {
102 write_multibulk(&mut self.file, args)?;
103 self.size_bytes = self
104 .size_bytes
105 .saturating_add(estimate_multibulk_bytes(args));
106 match self.fsync {
107 Fsync::Always => {
108 self.file.flush()?;
109 self.file.get_ref().sync_data()?;
110 }
111 Fsync::EverySec | Fsync::No => self.dirty = true,
112 }
113 Ok(())
114 }
115
116 /// Flush+fsync if the `EverySec` window has elapsed. Call once per loop tick.
117 pub fn maybe_sync(&mut self) -> io::Result<()> {
118 if matches!(self.fsync, Fsync::EverySec)
119 && self.dirty
120 && self.last_sync.elapsed() >= Duration::from_secs(1)
121 {
122 self.file.flush()?;
123 self.file.get_ref().sync_data()?;
124 self.dirty = false;
125 self.last_sync = Instant::now();
126 }
127 Ok(())
128 }
129
130 /// Empty the log (after a snapshot has captured the full state). The
131 /// post-truncate file keeps the [`AOF_MAGIC`] header so replays of
132 /// the freshly-trimmed log still identify as kevy-managed.
133 pub fn truncate(&mut self) -> io::Result<()> {
134 self.file.flush()?;
135 let f = self.file.get_mut();
136 f.set_len(0)?;
137 f.seek(SeekFrom::Start(0))?; // harmless under O_APPEND; keeps len/pos coherent
138 f.write_all(AOF_MAGIC)?;
139 f.sync_all()?;
140 self.dirty = false;
141 self.size_bytes = AOF_MAGIC.len() as u64;
142 self.size_at_last_rewrite = AOF_MAGIC.len() as u64;
143 Ok(())
144 }
145
146 /// Estimated current AOF size in bytes (file content as of last append).
147 #[inline]
148 pub fn size_bytes(&self) -> u64 {
149 self.size_bytes
150 }
151
152 /// AOF size at the most recent rewrite (or open). Auto-trigger compares
153 /// `(size_bytes - size_at_last_rewrite) * 100 / size_at_last_rewrite` to
154 /// the `auto_aof_rewrite_percentage` knob.
155 #[inline]
156 pub fn size_at_last_rewrite(&self) -> u64 {
157 self.size_at_last_rewrite
158 }
159
160 /// Successful rewrite count since `Self::open`. Surfaced in INFO.
161 #[inline]
162 pub fn rewrites_total(&self) -> u64 {
163 self.rewrites_total
164 }
165
166 /// BGREWRITEAOF: rebuild a compact AOF from `store`'s current state and
167 /// atomically swap it in.
168 ///
169 /// **v1.0 is synchronous** — the calling shard blocks for the rewrite's
170 /// duration. Each shard owns its own AOF, so the shards' rewrites
171 /// proceed independently; per-shard blocking matches Redis's `BGSAVE`
172 /// cost in a typical single-key-per-shard workload. Concurrent
173 /// (rewrite-during-writes) incrementalisation is a v1.x perf item.
174 ///
175 /// Writes to a `<path>.rewrite` temp file with fsync, then `rename(2)`s
176 /// it over the live AOF. The append handle is reopened against the new
177 /// file before this call returns, so subsequent `append` calls land in
178 /// the rewritten log.
179 pub fn rewrite_from(&mut self, store: &Store) -> io::Result<RewriteStats> {
180 // Flush any pending writes to the OLD file first so the snapshot
181 // accounts for everything the caller intended to durabilise.
182 self.file.flush()?;
183
184 let tmp = rewrite_tmp_path(&self.path);
185 let (keys, bytes) = dump_store_to_aof(&tmp, store)?;
186
187 // Atomic replacement. After this, the OLD file descriptor in
188 // `self.file` is open against an unlinked inode; new writes would
189 // go nowhere visible. Reopen against the new path.
190 std::fs::rename(&tmp, &self.path)?;
191 let f = OpenOptions::new().append(true).open(&self.path)?;
192 self.file = BufWriter::new(f);
193 self.size_bytes = bytes;
194 self.size_at_last_rewrite = bytes;
195 self.dirty = false;
196 self.rewrites_total = self.rewrites_total.saturating_add(1);
197 Ok(RewriteStats { keys, bytes })
198 }
199}
200
/// Build the temp path used while rewriting: the AOF's own file name with
/// `.rewrite` appended, in the same directory so `rename(2)` stays atomic.
/// When `path` has no file-name component, `aof.rewrite` is used instead.
fn rewrite_tmp_path(path: &Path) -> PathBuf {
    let tmp_name = path.file_name().map_or_else(
        || std::ffi::OsString::from("aof.rewrite"),
        |name| {
            let mut with_suffix = name.to_os_string();
            with_suffix.push(".rewrite");
            with_suffix
        },
    );
    path.with_file_name(tmp_name)
}