1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
//! Per-shard background persistence — the serialize-and-spill half of COW
//! snapshots/rewrites (`BGSAVE`, `BGREWRITEAOF`, the tick auto-rewrite).
//!
//! The shard thread freezes a [`SnapshotView`] (O(n)-shallow, see
//! `kevy_store::Store::collect_snapshot`) and hands it to a lazily-spawned
//! worker thread that does the actual serialization + disk I/O. Completions
//! come back over a channel and are applied from the shard's tick:
//!
//! - **BgSave**: the worker writes the snapshot's durable `<path>.tmp`
//! (no rename) and the tick renames it *and* swaps in the tee'd AOF
//! reset in one adjacent critical section — the snapshot/AOF commit
//! stays microseconds apart (same exposure as the synchronous `SAVE`),
//! not the seconds the background write takes.
//! - **RewriteAof**: the worker dumps the view as RESP commands to the
//! `.rewrite` temp; the tick appends the tee'd diff and swaps
//! (`Aof::finish_concurrent_rewrite`).
//!
//! One job in flight per shard (the Redis single-bgsave discipline); a
//! request landing while busy is skipped with a log line. A failed job
//! aborts the tee — the live AOF and the previous snapshot are untouched.
use crate::Commands;
use crate::shard::Shard;
use kevy_store::SnapshotView;
use std::io;
use std::path::PathBuf;
use std::sync::mpsc;
/// A unit of background work sent from the shard thread to its persist
/// worker (`PersistWorker::submit`). `run_job` executes it off-thread and
/// answers with the matching `PersistDone` variant.
pub(crate) enum PersistJob {
/// Write `view` to `snap_path`'s durable tmp. `aof_reset` = an AOF tee
/// was started; the completion swaps in a fresh log of post-collect
/// writes (the COW replacement for the old save-then-truncate).
Save {
// Frozen store view to serialize (from `Store::collect_snapshot`).
view: SnapshotView,
// Final snapshot path; the worker writes its tmp, the tick renames onto it.
snap_path: PathBuf,
// `Some(path)` when `begin_view_rewrite` succeeded at submit time; the
// worker does not touch it and only echoes it back in the completion.
aof_reset: Option<PathBuf>,
},
/// Dump `view` as RESP commands at the AOF's `.rewrite` tmp.
// `tmp` is the path returned by `Aof::begin_view_rewrite`.
Rewrite { view: SnapshotView, tmp: PathBuf },
}
/// Completion report for a `PersistJob`, sent back from the worker and
/// applied on the shard thread by `Shard::poll_persist_done`. Each variant
/// echoes the paths of the job it answers so the shard can commit or clean up.
pub(crate) enum PersistDone {
/// Outcome of a `PersistJob::Save`.
Save {
result: io::Result<PathBuf>, // the written snapshot tmp
// Final snapshot path the tmp is renamed onto when `result` is `Ok`.
snap_path: PathBuf,
// Passed through unchanged from the job; `Some` means an AOF tee is
// active and must be finished (success) or aborted (failure).
aof_reset: Option<PathBuf>,
},
/// Outcome of a `PersistJob::Rewrite`.
Rewrite {
result: io::Result<u64>, // keys dumped
// The rewrite tmp the worker dumped into; removed by the shard on failure.
tmp: PathBuf,
},
}
/// Lazily-spawned single-thread persister. Dropping it closes the channel;
/// the worker exits after finishing any in-flight job.
pub(crate) struct PersistWorker {
chans: Option<(mpsc::Sender<PersistJob>, mpsc::Receiver<PersistDone>)>,
in_flight: bool,
}
impl PersistWorker {
pub(crate) fn new() -> Self {
Self { chans: None, in_flight: false }
}
/// One job in flight at a time — callers check before collecting a view.
#[inline]
pub(crate) fn busy(&self) -> bool {
self.in_flight
}
/// Hand a job to the worker (spawning it on first use). Returns `false`
/// (without panicking) if the worker thread died — callers log + abort.
pub(crate) fn submit(&mut self, shard_id: usize, job: PersistJob) -> bool {
let (tx, _) = self.chans.get_or_insert_with(|| {
let (tx, job_rx) = mpsc::channel::<PersistJob>();
let (done_tx, done_rx) = mpsc::channel::<PersistDone>();
std::thread::Builder::new()
.name(format!("kevy-persist-{shard_id}"))
.spawn(move || {
while let Ok(job) = job_rx.recv() {
let done = run_job(job);
if done_tx.send(done).is_err() {
return; // shard gone — nothing to report to
}
}
})
.expect("spawn persist worker");
(tx, done_rx)
});
if tx.send(job).is_err() {
return false;
}
self.in_flight = true;
true
}
/// Non-blocking completion poll (called from the shard tick).
pub(crate) fn try_complete(&mut self) -> Option<PersistDone> {
let (_, rx) = self.chans.as_ref()?;
match rx.try_recv() {
Ok(done) => {
self.in_flight = false;
Some(done)
}
Err(_) => None,
}
}
}
/// Execute one job and package its outcome together with the paths the
/// shard needs to commit or clean up. Called from the worker thread's loop.
fn run_job(job: PersistJob) -> PersistDone {
    match job {
        PersistJob::Save { view, snap_path, aof_reset } => {
            // Yields the path of the written snapshot tmp on success.
            let result = kevy_persist::write_snapshot_tmp(&view, &snap_path);
            PersistDone::Save { result, snap_path, aof_reset }
        }
        PersistJob::Rewrite { view, tmp } => {
            // Only the key count is reported back; the byte count is dropped.
            let dumped = kevy_persist::dump_aof(&tmp, &view);
            let result = dumped.map(|(key_count, _)| key_count);
            PersistDone::Rewrite { result, tmp }
        }
    }
}
impl<C: Commands> Shard<C> {
    /// Start a background `BGSAVE` for this shard.
    ///
    /// Freezes a snapshot view, starts the AOF tee when an AOF is present
    /// (its post-collect writes become the reset log) and submits the job to
    /// the persist worker. Does nothing but log when a persist job or an AOF
    /// rewrite is already running.
    #[cold]
    pub(crate) fn start_bg_save(&mut self) {
        let blocked =
            self.persist.busy() || self.aof.as_ref().is_some_and(|aof| aof.is_rewriting());
        if blocked {
            eprintln!("kevy: shard {} bgsave skipped (persist job in flight)", self.id);
            return;
        }

        // Tee atomicity contract: the view is collected and the tee begun
        // back-to-back on this thread, so no append can slip in between.
        let view = self.store.collect_snapshot();
        let aof_reset = match self.aof.as_mut().map(|aof| aof.begin_view_rewrite()) {
            Some(Ok(tmp)) => Some(tmp),
            Some(Err(e)) => {
                // The snapshot is still taken; the AOF simply is not reset,
                // so the full log keeps being replayed until a later save
                // succeeds in resetting it.
                eprintln!("kevy: shard {} bgsave aof tee failed: {e}", self.id);
                None
            }
            None => None,
        };

        let snap_path = self.snapshot_path();
        let job = PersistJob::Save { view, snap_path, aof_reset };
        if self.persist.submit(self.id, job) {
            return;
        }
        eprintln!("kevy: shard {} persist worker unavailable", self.id);
        if let Some(aof) = self.aof.as_mut() {
            aof.abort_concurrent_rewrite();
        }
    }

    /// Start a background AOF rewrite (`BGREWRITEAOF` or the tick
    /// auto-rewrite) for this shard.
    ///
    /// Silently does nothing when the shard has no AOF. With an AOF, the
    /// request is skipped with a log line if a persist job or rewrite is
    /// already in flight; otherwise the view is frozen, the tee is started
    /// and the dump is handed to the worker.
    #[cold]
    pub(crate) fn start_bg_rewrite(&mut self) {
        let Some(aof) = self.aof.as_ref() else {
            return;
        };
        if self.persist.busy() || aof.is_rewriting() {
            eprintln!("kevy: shard {} aof rewrite skipped (persist job in flight)", self.id);
            return;
        }

        let view = self.store.collect_snapshot();
        let begun = self.aof.as_mut().expect("checked above").begin_view_rewrite();
        let tmp = match begun {
            Ok(path) => path,
            Err(e) => {
                eprintln!("kevy: shard {} aof rewrite begin failed: {e}", self.id);
                return;
            }
        };

        let job = PersistJob::Rewrite { view, tmp };
        if self.persist.submit(self.id, job) {
            return;
        }
        eprintln!("kevy: shard {} persist worker unavailable", self.id);
        self.aof.as_mut().expect("checked").abort_concurrent_rewrite();
    }

    /// Apply at most one finished background job; called from the tick.
    ///
    /// A successful save renames the snapshot tmp into place and then swaps
    /// in the AOF reset log immediately afterwards. A successful rewrite
    /// finishes the concurrent rewrite. Any failure aborts the tee, leaving
    /// the previous snapshot and the live AOF as they were.
    #[cold]
    pub(crate) fn poll_persist_done(&mut self) {
        let done = match self.persist.try_complete() {
            Some(done) => done,
            None => return,
        };

        match done {
            PersistDone::Save { result, snap_path, aof_reset } => match result {
                Err(e) => {
                    eprintln!("kevy: shard {} bgsave failed: {e}", self.id);
                    self.abort_persist_tee(aof_reset);
                }
                Ok(tmp) => {
                    if let Err(e) = std::fs::rename(&tmp, &snap_path) {
                        eprintln!("kevy: shard {} bgsave rename failed: {e}", self.id);
                        self.abort_persist_tee(aof_reset);
                        return;
                    }
                    // Nothing more to do unless a tee was started and the
                    // AOF is still there to receive the reset log.
                    let Some(reset_tmp) = aof_reset else {
                        return;
                    };
                    let Some(aof) = self.aof.as_mut() else {
                        return;
                    };
                    // The reset log starts as just the base header; finish
                    // appends the tee'd post-collect writes and swaps it in.
                    let swapped = match kevy_persist::write_aof_base(&reset_tmp) {
                        Ok(()) => aof.finish_concurrent_rewrite(&reset_tmp, 0),
                        Err(e) => Err(e),
                    };
                    if let Err(e) = swapped {
                        eprintln!("kevy: shard {} bgsave aof reset failed: {e}", self.id);
                        aof.abort_concurrent_rewrite();
                        let _ = std::fs::remove_file(&reset_tmp);
                    }
                }
            },
            PersistDone::Rewrite { result, tmp } => match result {
                Ok(keys) => {
                    let Some(aof) = self.aof.as_mut() else {
                        return;
                    };
                    if let Err(e) = aof.finish_concurrent_rewrite(&tmp, keys) {
                        eprintln!("kevy: shard {} aof rewrite swap failed: {e}", self.id);
                        aof.abort_concurrent_rewrite();
                        let _ = std::fs::remove_file(&tmp);
                    }
                }
                Err(e) => {
                    eprintln!("kevy: shard {} aof rewrite failed: {e}", self.id);
                    if let Some(aof) = self.aof.as_mut() {
                        aof.abort_concurrent_rewrite();
                    }
                    let _ = std::fs::remove_file(&tmp);
                }
            },
        }
    }

    /// Abort the AOF tee belonging to a save job, but only if that job
    /// actually started one (`aof_reset` is `Some`) and an AOF is present.
    fn abort_persist_tee(&mut self, aof_reset: Option<PathBuf>) {
        if aof_reset.is_none() {
            return;
        }
        if let Some(aof) = self.aof.as_mut() {
            aof.abort_concurrent_rewrite();
        }
    }
}