1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
//! Per-tick housekeeping for the [`Shard`] reactor — pulled out of
//! [`crate::shard`] to keep that file under the 500-LOC house rule.
//!
//! Called from the reactor's tick branch (once per `tick_interval_ms`,
//! 100 ms by default). On each tick, every `Some` field returned by the
//! embedder's [`crate::Commands::live_runtime_config`] is applied to the
//! shard's live state, and the auto-AOF-rewrite check triggers a rewrite
//! if the live AOF has grown past its threshold.
use crate::Commands;
use crate::shard::Shard;
use std::time::Duration;
impl<C: Commands> Shard<C> {
    /// Refresh the shard's hot-swappable settings from the embedder.
    ///
    /// Asks the [`crate::Commands`] impl for its current live runtime
    /// config and copies every field that is `Some` onto the shard; `None`
    /// fields leave the existing value untouched. This runs on the tick
    /// branch (once per `tick_interval_ms`) rather than per command, so
    /// embedders that never hot-swap and keep the all-`None` trait default
    /// pay only for building one struct.
    ///
    /// `tick_interval` is updated in place: a live `tick_interval_ms` of
    /// `0` sets it to `None`, any other value sets it to that many
    /// milliseconds.
    pub(crate) fn apply_live_runtime_config(&mut self, tick_interval: &mut Option<Duration>) {
        let cfg = self.commands.live_runtime_config();

        // The fsync policy is only relevant when this shard has an AOF.
        if let (Some(policy), Some(aof)) = (cfg.appendfsync, self.aof.as_mut()) {
            // Best effort: an error from switching the policy is reported
            // on stderr and the shard keeps running; the intent is that
            // the new policy still applies and later appends retry the
            // sync.
            if let Err(e) = aof.set_fsync(policy) {
                eprintln!("kevy: shard {} set_fsync failed: {e}", self.id);
            }
        }

        if let Some(pct) = cfg.auto_aof_rewrite_pct {
            self.auto_aof_rewrite_pct = pct;
        }
        if let Some(min_size) = cfg.auto_aof_rewrite_min_size {
            self.auto_aof_rewrite_min_size = min_size;
        }

        if let Some(ms) = cfg.tick_interval_ms {
            // Zero maps to `None`; anything else becomes the new period.
            *tick_interval = match ms {
                0 => None,
                period_ms => Some(Duration::from_millis(period_ms)),
            };
        }

        if let Some(flags) = cfg.notify_flags {
            self.notify_flags = flags;
        }

        if let Some(threshold) = cfg.slowlog_slower_than_micros {
            self.slowlog.slower_than_micros = threshold;
        }
        if let Some(max_len) = cfg.slowlog_max_len {
            self.slowlog.max_len = max_len;
            // A smaller cap takes effect immediately: pop entries off the
            // front (presumably the oldest) until the buffer fits.
            let limit = max_len as usize;
            while self.slowlog.buf.len() > limit {
                self.slowlog.buf.pop_front();
            }
        }
    }

    /// Run an automatic AOF rewrite once the log has outgrown its
    /// last-rewrite size by the configured percentage.
    ///
    /// Invoked from the tick path, so the check happens at most once per
    /// `tick_interval_ms`. Returns without doing anything when:
    ///
    /// - `auto_aof_rewrite_pct` is `0`,
    /// - the shard has no AOF,
    /// - the AOF is currently smaller than `auto_aof_rewrite_min_size`, or
    /// - growth since the last rewrite is below `auto_aof_rewrite_pct`
    ///   percent.
    ///
    /// Otherwise the rewrite runs inline on the calling path. A failed
    /// rewrite is reported on stderr and otherwise ignored.
    pub(crate) fn maybe_auto_rewrite_aof(&mut self) {
        let pct = self.auto_aof_rewrite_pct;
        if pct == 0 {
            return;
        }
        let Some(aof) = self.aof.as_mut() else {
            return;
        };

        let current = aof.size_bytes();
        if current < self.auto_aof_rewrite_min_size {
            return;
        }

        // Clamp to 1 so a zero last-rewrite size still gives a usable
        // threshold.
        let baseline = aof.size_at_last_rewrite().max(1);

        // Growth test rearranged to avoid subtraction and division:
        //   (current - baseline) * 100 / baseline >= pct
        //   <=> current * 100 >= baseline * (100 + pct)
        // Saturating arithmetic keeps the products from wrapping.
        let scaled_current = current.saturating_mul(100);
        let threshold = baseline.saturating_mul(100u64.saturating_add(pct as u64));
        if scaled_current < threshold {
            return;
        }

        if let Err(e) = aof.rewrite_from(&self.store) {
            eprintln!("kevy: shard {} auto AOF rewrite failed: {e}", self.id);
        }
    }
}