kevy_persist/aof.rs
1//! Append-only command log. Split out from `lib.rs` to keep that file
2//! under the 500-LOC house rule; the snapshot writer/reader stays there.
3
4use std::fs::{File, OpenOptions};
5use std::io::{self, BufWriter, Seek, SeekFrom, Write};
6use std::path::{Path, PathBuf};
7use std::time::{Duration, Instant};
8
9use kevy_resp::ArgvView;
10use kevy_store::Store;
11
12use crate::{
13 dump_store_to_aof, estimate_multibulk_bytes, write_multibulk,
14};
15
/// Policy controlling how often the AOF is forced to stable storage.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Fsync {
    /// `fsync` after each appended command: maximum durability, lowest
    /// throughput.
    Always,
    /// `fsync` no more than once a second; drive it by calling
    /// [`Aof::maybe_sync`] on a regular cadence.
    EverySec,
    /// Never issue an explicit `fsync`; the operating system decides when the
    /// data reaches disk.
    No,
}
26
27/// An append-only command log. Each write command is appended as a RESP
28/// multi-bulk frame; [`crate::replay_aof`] re-applies them on startup.
29///
30/// Durability model (paired with snapshots): a snapshot taken at T0 plus
31/// the AOF of writes in (T0, now] reconstructs the current state. `SAVE`
32/// writes the snapshot then [`Aof::truncate`]s the log, so replay never
33/// double-applies.
34///
35/// Sizes (`size_bytes`, `size_at_last_rewrite`) drive auto-trigger of
36/// [`Aof::rewrite_from`] (BGREWRITEAOF) via the
37/// `auto_aof_rewrite_percentage` + `auto_aof_rewrite_min_size` knobs in
38/// `kevy_config`.
39pub struct Aof {
40 file: BufWriter<File>,
41 path: PathBuf,
42 fsync: Fsync,
43 dirty: bool,
44 last_sync: Instant,
45 /// Estimated bytes currently in the AOF file (existing + appended since
46 /// open). Maintained without fstat() syscalls per append.
47 size_bytes: u64,
48 /// File size right after the most recent [`Self::rewrite_from`] (or
49 /// `Self::open` if never rewritten). Anchor for `auto_aof_rewrite_*`.
50 size_at_last_rewrite: u64,
51 /// Total rewrites successfully completed since open. Surfaced via INFO.
52 rewrites_total: u64,
53}
54
/// Outcome of a successful [`Aof::rewrite_from`] call; reported by
/// `BGREWRITEAOF` and `INFO persistence`.
///
/// A plain value type: it derives `PartialEq`/`Eq` so callers and tests can
/// compare results directly, and `Default` yields the all-zero "nothing
/// rewritten yet" value.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct RewriteStats {
    /// Number of keys written into the rewritten AOF.
    pub keys: u64,
    /// Size of the rewritten AOF, in bytes.
    pub bytes: u64,
}
64
65impl Aof {
66 /// Open (creating if needed) `path` for appending.
67 pub fn open(path: &Path, fsync: Fsync) -> io::Result<Self> {
68 let file = OpenOptions::new().create(true).append(true).open(path)?;
69 let size = file.metadata().map(|m| m.len()).unwrap_or(0);
70 Ok(Aof {
71 file: BufWriter::new(file),
72 path: path.to_path_buf(),
73 fsync,
74 dirty: false,
75 last_sync: Instant::now(),
76 size_bytes: size,
77 size_at_last_rewrite: size,
78 rewrites_total: 0,
79 })
80 }
81
82 /// Append one command, applying the fsync policy.
83 pub fn append<A: ArgvView + ?Sized>(&mut self, args: &A) -> io::Result<()> {
84 write_multibulk(&mut self.file, args)?;
85 self.size_bytes = self
86 .size_bytes
87 .saturating_add(estimate_multibulk_bytes(args));
88 match self.fsync {
89 Fsync::Always => {
90 self.file.flush()?;
91 self.file.get_ref().sync_data()?;
92 }
93 Fsync::EverySec | Fsync::No => self.dirty = true,
94 }
95 Ok(())
96 }
97
98 /// Flush+fsync if the `EverySec` window has elapsed. Call once per loop tick.
99 pub fn maybe_sync(&mut self) -> io::Result<()> {
100 if matches!(self.fsync, Fsync::EverySec)
101 && self.dirty
102 && self.last_sync.elapsed() >= Duration::from_secs(1)
103 {
104 self.file.flush()?;
105 self.file.get_ref().sync_data()?;
106 self.dirty = false;
107 self.last_sync = Instant::now();
108 }
109 Ok(())
110 }
111
112 /// Empty the log (after a snapshot has captured the full state).
113 pub fn truncate(&mut self) -> io::Result<()> {
114 self.file.flush()?;
115 let f = self.file.get_mut();
116 f.set_len(0)?;
117 f.seek(SeekFrom::Start(0))?; // harmless under O_APPEND; keeps len/pos coherent
118 f.sync_all()?;
119 self.dirty = false;
120 self.size_bytes = 0;
121 self.size_at_last_rewrite = 0;
122 Ok(())
123 }
124
125 /// Estimated current AOF size in bytes (file content as of last append).
126 #[inline]
127 pub fn size_bytes(&self) -> u64 {
128 self.size_bytes
129 }
130
131 /// AOF size at the most recent rewrite (or open). Auto-trigger compares
132 /// `(size_bytes - size_at_last_rewrite) * 100 / size_at_last_rewrite` to
133 /// the `auto_aof_rewrite_percentage` knob.
134 #[inline]
135 pub fn size_at_last_rewrite(&self) -> u64 {
136 self.size_at_last_rewrite
137 }
138
139 /// Successful rewrite count since `Self::open`. Surfaced in INFO.
140 #[inline]
141 pub fn rewrites_total(&self) -> u64 {
142 self.rewrites_total
143 }
144
145 /// BGREWRITEAOF: rebuild a compact AOF from `store`'s current state and
146 /// atomically swap it in.
147 ///
148 /// **v1.0 is synchronous** — the calling shard blocks for the rewrite's
149 /// duration. Each shard owns its own AOF, so the shards' rewrites
150 /// proceed independently; per-shard blocking matches Redis's `BGSAVE`
151 /// cost in a typical single-key-per-shard workload. Concurrent
152 /// (rewrite-during-writes) incrementalisation is a v1.x perf item.
153 ///
154 /// Writes to a `<path>.rewrite` temp file with fsync, then `rename(2)`s
155 /// it over the live AOF. The append handle is reopened against the new
156 /// file before this call returns, so subsequent `append` calls land in
157 /// the rewritten log.
158 pub fn rewrite_from(&mut self, store: &Store) -> io::Result<RewriteStats> {
159 // Flush any pending writes to the OLD file first so the snapshot
160 // accounts for everything the caller intended to durabilise.
161 self.file.flush()?;
162
163 let tmp = rewrite_tmp_path(&self.path);
164 let (keys, bytes) = dump_store_to_aof(&tmp, store)?;
165
166 // Atomic replacement. After this, the OLD file descriptor in
167 // `self.file` is open against an unlinked inode; new writes would
168 // go nowhere visible. Reopen against the new path.
169 std::fs::rename(&tmp, &self.path)?;
170 let f = OpenOptions::new().append(true).open(&self.path)?;
171 self.file = BufWriter::new(f);
172 self.size_bytes = bytes;
173 self.size_at_last_rewrite = bytes;
174 self.dirty = false;
175 self.rewrites_total = self.rewrites_total.saturating_add(1);
176 Ok(RewriteStats { keys, bytes })
177 }
178}
179
/// Build the rewrite temp path `<aof>.rewrite` next to the live AOF.
///
/// Keeping it in the same directory means the later `rename(2)` never
/// crosses a filesystem boundary, so it stays atomic. When `path` has no file
/// name component, `aof.rewrite` is appended to it instead.
fn rewrite_tmp_path(path: &Path) -> PathBuf {
    let tmp_name = path.file_name().map_or_else(
        || std::ffi::OsString::from("aof.rewrite"),
        |name| {
            let mut suffixed = name.to_os_string();
            suffixed.push(".rewrite");
            suffixed
        },
    );
    path.with_file_name(tmp_name)
}