Skip to main content

kevy_persist/
aof.rs

1//! Append-only command log. Split out from `lib.rs` to keep that file
2//! under the 500-LOC house rule; the snapshot writer/reader stays there.
3
4use std::fs::{File, OpenOptions};
5use std::io::{self, BufWriter, Seek, SeekFrom, Write};
6use std::path::{Path, PathBuf};
7use std::time::{Duration, Instant};
8
9use kevy_resp::ArgvView;
10use kevy_store::Store;
11
12use crate::{
13    dump_store_to_aof, estimate_multibulk_bytes, write_multibulk,
14};
15
/// Policy deciding when the append-only log is fsynced to stable storage.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Fsync {
    /// Sync after each appended command: the safest and the slowest option.
    Always,
    /// Sync no more than once per second. Takes effect only if
    /// [`Aof::maybe_sync`] is called periodically.
    EverySec,
    /// Never issue an explicit fsync; writing back to disk is up to the OS.
    No,
}
26
27/// An append-only command log. Each write command is appended as a RESP
28/// multi-bulk frame; [`crate::replay_aof`] re-applies them on startup.
29///
30/// Durability model (paired with snapshots): a snapshot taken at T0 plus
31/// the AOF of writes in (T0, now] reconstructs the current state. `SAVE`
32/// writes the snapshot then [`Aof::truncate`]s the log, so replay never
33/// double-applies.
34///
35/// Sizes (`size_bytes`, `size_at_last_rewrite`) drive auto-trigger of
36/// [`Aof::rewrite_from`] (BGREWRITEAOF) via the
37/// `auto_aof_rewrite_percentage` + `auto_aof_rewrite_min_size` knobs in
38/// `kevy_config`.
39pub struct Aof {
40    file: BufWriter<File>,
41    path: PathBuf,
42    fsync: Fsync,
43    dirty: bool,
44    last_sync: Instant,
45    /// Estimated bytes currently in the AOF file (existing + appended since
46    /// open). Maintained without fstat() syscalls per append.
47    size_bytes: u64,
48    /// File size right after the most recent [`Self::rewrite_from`] (or
49    /// `Self::open` if never rewritten). Anchor for `auto_aof_rewrite_*`.
50    size_at_last_rewrite: u64,
51    /// Total rewrites successfully completed since open. Surfaced via INFO.
52    rewrites_total: u64,
53}
54
/// Outcome of a successful [`Aof::rewrite_from`]. Reported through
/// `BGREWRITEAOF` and `INFO persistence`.
#[derive(Clone, Copy, Debug)]
pub struct RewriteStats {
    /// Number of keys written into the compacted AOF.
    pub keys: u64,
    /// Size of the compacted AOF, in bytes.
    pub bytes: u64,
}
64
65impl Aof {
66    /// Open (creating if needed) `path` for appending.
67    pub fn open(path: &Path, fsync: Fsync) -> io::Result<Self> {
68        let file = OpenOptions::new().create(true).append(true).open(path)?;
69        let size = file.metadata().map(|m| m.len()).unwrap_or(0);
70        Ok(Aof {
71            file: BufWriter::new(file),
72            path: path.to_path_buf(),
73            fsync,
74            dirty: false,
75            last_sync: Instant::now(),
76            size_bytes: size,
77            size_at_last_rewrite: size,
78            rewrites_total: 0,
79        })
80    }
81
82    /// Append one command, applying the fsync policy.
83    pub fn append<A: ArgvView + ?Sized>(&mut self, args: &A) -> io::Result<()> {
84        write_multibulk(&mut self.file, args)?;
85        self.size_bytes = self
86            .size_bytes
87            .saturating_add(estimate_multibulk_bytes(args));
88        match self.fsync {
89            Fsync::Always => {
90                self.file.flush()?;
91                self.file.get_ref().sync_data()?;
92            }
93            Fsync::EverySec | Fsync::No => self.dirty = true,
94        }
95        Ok(())
96    }
97
98    /// Flush+fsync if the `EverySec` window has elapsed. Call once per loop tick.
99    pub fn maybe_sync(&mut self) -> io::Result<()> {
100        if matches!(self.fsync, Fsync::EverySec)
101            && self.dirty
102            && self.last_sync.elapsed() >= Duration::from_secs(1)
103        {
104            self.file.flush()?;
105            self.file.get_ref().sync_data()?;
106            self.dirty = false;
107            self.last_sync = Instant::now();
108        }
109        Ok(())
110    }
111
112    /// Empty the log (after a snapshot has captured the full state).
113    pub fn truncate(&mut self) -> io::Result<()> {
114        self.file.flush()?;
115        let f = self.file.get_mut();
116        f.set_len(0)?;
117        f.seek(SeekFrom::Start(0))?; // harmless under O_APPEND; keeps len/pos coherent
118        f.sync_all()?;
119        self.dirty = false;
120        self.size_bytes = 0;
121        self.size_at_last_rewrite = 0;
122        Ok(())
123    }
124
125    /// Estimated current AOF size in bytes (file content as of last append).
126    #[inline]
127    pub fn size_bytes(&self) -> u64 {
128        self.size_bytes
129    }
130
131    /// AOF size at the most recent rewrite (or open). Auto-trigger compares
132    /// `(size_bytes - size_at_last_rewrite) * 100 / size_at_last_rewrite` to
133    /// the `auto_aof_rewrite_percentage` knob.
134    #[inline]
135    pub fn size_at_last_rewrite(&self) -> u64 {
136        self.size_at_last_rewrite
137    }
138
139    /// Successful rewrite count since `Self::open`. Surfaced in INFO.
140    #[inline]
141    pub fn rewrites_total(&self) -> u64 {
142        self.rewrites_total
143    }
144
145    /// BGREWRITEAOF: rebuild a compact AOF from `store`'s current state and
146    /// atomically swap it in.
147    ///
148    /// **v1.0 is synchronous** — the calling shard blocks for the rewrite's
149    /// duration. Each shard owns its own AOF, so the shards' rewrites
150    /// proceed independently; per-shard blocking matches Redis's `BGSAVE`
151    /// cost in a typical single-key-per-shard workload. Concurrent
152    /// (rewrite-during-writes) incrementalisation is a v1.x perf item.
153    ///
154    /// Writes to a `<path>.rewrite` temp file with fsync, then `rename(2)`s
155    /// it over the live AOF. The append handle is reopened against the new
156    /// file before this call returns, so subsequent `append` calls land in
157    /// the rewritten log.
158    pub fn rewrite_from(&mut self, store: &Store) -> io::Result<RewriteStats> {
159        // Flush any pending writes to the OLD file first so the snapshot
160        // accounts for everything the caller intended to durabilise.
161        self.file.flush()?;
162
163        let tmp = rewrite_tmp_path(&self.path);
164        let (keys, bytes) = dump_store_to_aof(&tmp, store)?;
165
166        // Atomic replacement. After this, the OLD file descriptor in
167        // `self.file` is open against an unlinked inode; new writes would
168        // go nowhere visible. Reopen against the new path.
169        std::fs::rename(&tmp, &self.path)?;
170        let f = OpenOptions::new().append(true).open(&self.path)?;
171        self.file = BufWriter::new(f);
172        self.size_bytes = bytes;
173        self.size_at_last_rewrite = bytes;
174        self.dirty = false;
175        self.rewrites_total = self.rewrites_total.saturating_add(1);
176        Ok(RewriteStats { keys, bytes })
177    }
178}
179
/// Build the rewrite temp path `<aof>.rewrite` next to the live AOF.
///
/// The temp file sits in the same directory as `path`, so the later
/// `rename(2)` stays atomic. If `path` has no file-name component, the
/// name `aof.rewrite` is used instead.
fn rewrite_tmp_path(path: &Path) -> PathBuf {
    let tmp_name = path.file_name().map_or_else(
        || std::ffi::OsString::from("aof.rewrite"),
        |base| {
            let mut name = base.to_os_string();
            name.push(".rewrite");
            name
        },
    );
    path.with_file_name(tmp_name)
}