Skip to main content

kevy_persist/
aof.rs

1//! Append-only command log. Split out from `lib.rs` to keep that file
2//! under the 500-LOC house rule; the snapshot writer/reader stays there.
3
4use std::fs::{File, OpenOptions};
5use std::io::{self, BufWriter, Seek, SeekFrom, Write};
6use std::path::{Path, PathBuf};
7use std::time::{Duration, Instant};
8
9use kevy_resp::ArgvView;
10use kevy_store::Store;
11
12use crate::{
13    dump_store_to_aof, estimate_multibulk_bytes, write_multibulk,
14};
15
/// 9-byte file-format header (`KEVYAOF1` followed by a newline) written
/// at the start of every kevy-managed AOF as of v1.2.0. Both [`Aof::open`]
/// (for a fresh, empty file) and [`Aof::truncate`] stamp it.
///
/// `replay_aof` strips it before parsing RESP, so non-kevy bytes
/// accidentally written into the AOF path (e.g. a deploy pipeline
/// redirecting shell stderr into the file) get the same loud rejection as
/// any other corrupt frame. Pre-1.2 AOFs (no magic) still replay — the
/// parser only consumes the magic if it sees it.
pub(crate) const AOF_MAGIC: &[u8; 9] = b"KEVYAOF1\n";
23
/// Durability policy: how eagerly appended commands are fsynced to disk.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Fsync {
    /// Sync after each write: strongest durability, highest latency.
    Always,
    /// Sync no more often than once per second. The owner must call
    /// [`Aof::maybe_sync`] periodically for the sync to actually happen.
    EverySec,
    /// Issue no explicit fsync; the OS decides when data reaches disk.
    No,
}
34
/// An append-only command log. Each write command is appended as a RESP
/// multi-bulk frame; [`crate::replay_aof`] re-applies them on startup.
///
/// Durability model (paired with snapshots): a snapshot taken at T0 plus
/// the AOF of writes in (T0, now] reconstructs the current state. `SAVE`
/// writes the snapshot then [`Aof::truncate`]s the log, so replay never
/// double-applies.
///
/// Sizes (`size_bytes`, `size_at_last_rewrite`) drive auto-trigger of
/// [`Aof::rewrite_from`] (BGREWRITEAOF) via the
/// `auto_aof_rewrite_percentage` + `auto_aof_rewrite_min_size` knobs in
/// `kevy_config`.
pub struct Aof {
    /// Buffered writer over the log file, which is opened in append mode.
    /// Bytes may sit in this buffer until one of the flush points runs.
    file: BufWriter<File>,
    /// Path the log was opened at. [`Self::rewrite_from`] derives its temp
    /// file name from it and renames the rebuilt log over it.
    path: PathBuf,
    /// Current fsync policy; changeable at runtime via [`Self::set_fsync`].
    fsync: Fsync,
    /// `true` while appended bytes have been buffered without a following
    /// flush+fsync. Cleared whenever the log is synced, truncated or
    /// rewritten.
    dirty: bool,
    /// Time of the most recent tracked flush+fsync (initialised at open).
    /// [`Self::maybe_sync`] measures the `EverySec` window from it.
    last_sync: Instant,
    /// Estimated bytes currently in the AOF file (existing + appended since
    /// open). Maintained without fstat() syscalls per append.
    size_bytes: u64,
    /// File size right after the most recent [`Self::rewrite_from`] (or
    /// `Self::open` if never rewritten). Also reset to the header size by
    /// [`Self::truncate`]. Anchor for `auto_aof_rewrite_*`.
    size_at_last_rewrite: u64,
    /// Total rewrites successfully completed since open. Surfaced via INFO.
    rewrites_total: u64,
    /// Group-commit window: while `true`, an `Fsync::Always` `append` only
    /// buffers (sets `dirty`) instead of fsyncing per command. The caller
    /// brackets a batch of writes with [`Self::begin_group`] /
    /// [`Self::end_group`] and `end_group` does the single fsync **before**
    /// the batch's replies are sent — preserving "durable before reply"
    /// while amortizing the per-command `flush()+sync_data()` syscalls.
    /// Only the multi-command reactor entry points (pipelined socket reads,
    /// cross-shard request batches) open a group; every other path keeps
    /// the per-command fsync, so the default is always the safe one.
    deferred: bool,
}
72
/// Outcome of one [`Aof::rewrite_from`] run, reported through
/// `BGREWRITEAOF` / `INFO persistence`.
#[derive(Clone, Copy, Debug)]
pub struct RewriteStats {
    /// Number of keys written into the rebuilt AOF.
    pub keys: u64,
    /// Size of the rebuilt AOF, in bytes.
    pub bytes: u64,
}
82
83impl Aof {
84    /// Open (creating if needed) `path` for appending. New files get the
85    /// 9-byte [`AOF_MAGIC`] header so replays can identify the file as
86    /// kevy-managed. Pre-existing files (legacy bare-RESP or already-
87    /// magic'd) are left untouched.
88    pub fn open(path: &Path, fsync: Fsync) -> io::Result<Self> {
89        let mut file = OpenOptions::new().create(true).append(true).open(path)?;
90        let mut size = file.metadata().map(|m| m.len()).unwrap_or(0);
91        if size == 0 {
92            // Fresh file: stamp the magic header so the replayer can
93            // distinguish kevy-written AOFs from accidental writes.
94            file.write_all(AOF_MAGIC)?;
95            file.sync_data()?;
96            size = AOF_MAGIC.len() as u64;
97        }
98        Ok(Aof {
99            file: BufWriter::new(file),
100            path: path.to_path_buf(),
101            fsync,
102            dirty: false,
103            last_sync: Instant::now(),
104            size_bytes: size,
105            size_at_last_rewrite: size,
106            rewrites_total: 0,
107            deferred: false,
108        })
109    }
110
111    /// The fsync policy this AOF was opened with (or last switched to).
112    /// Mostly for tests / INFO output; the hot path doesn't read this.
113    #[inline]
114    pub fn fsync_policy(&self) -> Fsync {
115        self.fsync
116    }
117
118    /// Switch the fsync policy at runtime (called by `CONFIG SET
119    /// appendfsync`). When tightening to `Always`, also flushes + fsyncs
120    /// any bytes still in the BufWriter so the new "every write is on
121    /// disk before reply" contract is honoured starting on the next
122    /// append, not after the dirty backlog clears.
123    pub fn set_fsync(&mut self, fsync: Fsync) -> io::Result<()> {
124        let upgrading_to_always = matches!(fsync, Fsync::Always) && !matches!(self.fsync, Fsync::Always);
125        self.fsync = fsync;
126        if upgrading_to_always && self.dirty {
127            self.file.flush()?;
128            self.file.get_ref().sync_data()?;
129            self.dirty = false;
130            self.last_sync = Instant::now();
131        }
132        Ok(())
133    }
134
135    /// Append one command, applying the fsync policy.
136    pub fn append<A: ArgvView + ?Sized>(&mut self, args: &A) -> io::Result<()> {
137        write_multibulk(&mut self.file, args)?;
138        self.size_bytes = self
139            .size_bytes
140            .saturating_add(estimate_multibulk_bytes(args));
141        match self.fsync {
142            // Inside a group-commit window, defer the fsync to `end_group`
143            // (one per batch, still before the batch's replies). Outside
144            // one, fsync per command — the safe default for every path.
145            Fsync::Always if self.deferred => self.dirty = true,
146            Fsync::Always => {
147                self.file.flush()?;
148                self.file.get_ref().sync_data()?;
149            }
150            Fsync::EverySec | Fsync::No => self.dirty = true,
151        }
152        Ok(())
153    }
154
155    /// Open a group-commit window (no-op unless the policy is `Always`):
156    /// subsequent `append`s buffer instead of fsyncing per command. Pair
157    /// with [`Self::end_group`] **before** sending the batch's replies.
158    #[inline]
159    pub fn begin_group(&mut self) {
160        if matches!(self.fsync, Fsync::Always) {
161            self.deferred = true;
162        }
163    }
164
165    /// Close the group-commit window: one `flush()+sync_data()` for the
166    /// whole batch (if anything was buffered), then resume per-command
167    /// fsync. Must be called before the batch's replies leave the shard.
168    #[inline]
169    pub fn end_group(&mut self) -> io::Result<()> {
170        if self.deferred {
171            self.deferred = false;
172            if self.dirty {
173                self.file.flush()?;
174                self.file.get_ref().sync_data()?;
175                self.dirty = false;
176                self.last_sync = Instant::now();
177            }
178        }
179        Ok(())
180    }
181
182    /// Flush+fsync if the `EverySec` window has elapsed. Call once per loop tick.
183    pub fn maybe_sync(&mut self) -> io::Result<()> {
184        if matches!(self.fsync, Fsync::EverySec)
185            && self.dirty
186            && self.last_sync.elapsed() >= Duration::from_secs(1)
187        {
188            self.file.flush()?;
189            self.file.get_ref().sync_data()?;
190            self.dirty = false;
191            self.last_sync = Instant::now();
192        }
193        Ok(())
194    }
195
196    /// Empty the log (after a snapshot has captured the full state). The
197    /// post-truncate file keeps the [`AOF_MAGIC`] header so replays of
198    /// the freshly-trimmed log still identify as kevy-managed.
199    pub fn truncate(&mut self) -> io::Result<()> {
200        self.file.flush()?;
201        let f = self.file.get_mut();
202        f.set_len(0)?;
203        f.seek(SeekFrom::Start(0))?; // harmless under O_APPEND; keeps len/pos coherent
204        f.write_all(AOF_MAGIC)?;
205        f.sync_all()?;
206        self.dirty = false;
207        self.size_bytes = AOF_MAGIC.len() as u64;
208        self.size_at_last_rewrite = AOF_MAGIC.len() as u64;
209        Ok(())
210    }
211
212    /// Estimated current AOF size in bytes (file content as of last append).
213    #[inline]
214    pub fn size_bytes(&self) -> u64 {
215        self.size_bytes
216    }
217
218    /// AOF size at the most recent rewrite (or open). Auto-trigger compares
219    /// `(size_bytes - size_at_last_rewrite) * 100 / size_at_last_rewrite` to
220    /// the `auto_aof_rewrite_percentage` knob.
221    #[inline]
222    pub fn size_at_last_rewrite(&self) -> u64 {
223        self.size_at_last_rewrite
224    }
225
226    /// Successful rewrite count since `Self::open`. Surfaced in INFO.
227    #[inline]
228    pub fn rewrites_total(&self) -> u64 {
229        self.rewrites_total
230    }
231
232    /// BGREWRITEAOF: rebuild a compact AOF from `store`'s current state and
233    /// atomically swap it in.
234    ///
235    /// **v1.0 is synchronous** — the calling shard blocks for the rewrite's
236    /// duration. Each shard owns its own AOF, so the shards' rewrites
237    /// proceed independently; per-shard blocking matches Redis's `BGSAVE`
238    /// cost in a typical single-key-per-shard workload. Concurrent
239    /// (rewrite-during-writes) incrementalisation is a v1.x perf item.
240    ///
241    /// Writes to a `<path>.rewrite` temp file with fsync, then `rename(2)`s
242    /// it over the live AOF. The append handle is reopened against the new
243    /// file before this call returns, so subsequent `append` calls land in
244    /// the rewritten log.
245    pub fn rewrite_from(&mut self, store: &Store) -> io::Result<RewriteStats> {
246        // Flush any pending writes to the OLD file first so the snapshot
247        // accounts for everything the caller intended to durabilise.
248        self.file.flush()?;
249
250        let tmp = rewrite_tmp_path(&self.path);
251        let (keys, bytes) = dump_store_to_aof(&tmp, store)?;
252
253        // Atomic replacement. After this, the OLD file descriptor in
254        // `self.file` is open against an unlinked inode; new writes would
255        // go nowhere visible. Reopen against the new path.
256        std::fs::rename(&tmp, &self.path)?;
257        let f = OpenOptions::new().append(true).open(&self.path)?;
258        self.file = BufWriter::new(f);
259        self.size_bytes = bytes;
260        self.size_at_last_rewrite = bytes;
261        self.dirty = false;
262        self.rewrites_total = self.rewrites_total.saturating_add(1);
263        Ok(RewriteStats { keys, bytes })
264    }
265}
266
/// Sibling temp path for a rewrite: the AOF's own file name with
/// `.rewrite` appended, in the same directory so `rename(2)` stays atomic.
/// A path with no final file-name component gets `aof.rewrite` instead.
fn rewrite_tmp_path(path: &Path) -> PathBuf {
    let tmp_name = path.file_name().map_or_else(
        || std::ffi::OsString::from("aof.rewrite"),
        |name| {
            let mut suffixed = name.to_os_string();
            suffixed.push(".rewrite");
            suffixed
        },
    );
    let mut sibling = path.to_path_buf();
    sibling.set_file_name(tmp_name);
    sibling
}