1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
//! Per-tick housekeeping for the [`Shard`] reactor — pulled out of
//! [`crate::shard`] to keep that file under the 500-LOC house rule.
//!
//! Called from the reactor's tick branch (once per `tick_interval_ms`,
//! 100 ms by default). Each `Some` value from the embedder's
//! [`crate::Commands::live_runtime_config`] tick is applied to the
//! shard's live state, and the auto-AOF-rewrite check fires if the
//! live AOF has grown past its threshold.
use crate::Commands;
use crate::replication::ReplicaState;
use crate::shard::Shard;
use std::time::Duration;
impl<C: Commands> Shard<C> {
/// Pull the live runtime knobs from the [`crate::Commands`] impl
/// and apply each `Some` to the shard's state. Called from the
/// tick branch (once per `tick_interval_ms`) so the cost is
/// amortised across thousands of commands; embedders that never
/// hot-swap inherit the trait default (all-None → zero work
/// beyond one struct build).
pub(crate) fn apply_live_runtime_config(&mut self, tick_interval: &mut Option<Duration>) {
let live = self.commands.live_runtime_config();
if let Some(f) = live.appendfsync
&& let Some(aof) = &mut self.aof
{
// A failure to flush on policy tighten is logged but doesn't
// bring the shard down — the policy itself still takes effect
// and subsequent appends will retry the sync.
if let Err(e) = aof.set_fsync(f) {
eprintln!("kevy: shard {} set_fsync failed: {e}", self.id);
}
}
if let Some(p) = live.auto_aof_rewrite_pct {
self.auto_aof_rewrite_pct = p;
}
if let Some(m) = live.auto_aof_rewrite_min_size {
self.auto_aof_rewrite_min_size = m;
}
if let Some(ms) = live.tick_interval_ms {
*tick_interval = if ms == 0 {
None
} else {
Some(Duration::from_millis(ms))
};
}
if let Some(flags) = live.notify_flags {
self.notify_flags = flags;
}
if let Some(t) = live.slowlog_slower_than_micros {
self.slowlog.slower_than_micros = t;
}
if let Some(n) = live.slowlog_max_len {
self.slowlog.max_len = n;
let cap = n as usize;
while self.slowlog.buf.len() > cap {
self.slowlog.buf.pop_front();
}
}
}
/// Start an automatic AOF rewrite once the live file has outgrown its
/// size at the last rewrite by the configured percentage.
///
/// Runs on the tick path. Returns without doing anything when:
/// - `auto_aof_rewrite_pct` is `0`,
/// - the shard has no AOF,
/// - the AOF is smaller than `auto_aof_rewrite_min_size`, or
/// - growth since the last rewrite is below `auto_aof_rewrite_pct` percent.
///
/// Otherwise it calls `start_bg_rewrite`.
///
/// NOTE(review): there is no "rewrite already in flight" guard here —
/// presumably `start_bg_rewrite` handles that itself; confirm.
pub(crate) fn maybe_auto_rewrite_aof(&mut self) {
    let pct = self.auto_aof_rewrite_pct;
    if pct == 0 {
        return;
    }
    let Some(aof) = self.aof.as_ref() else { return };

    let current = aof.size_bytes();
    if current < self.auto_aof_rewrite_min_size {
        return;
    }

    // Clamp to 1 so a recorded baseline of zero still gives a usable
    // threshold instead of a zero right-hand side.
    let baseline = aof.size_at_last_rewrite().max(1);

    // The growth test `(current - baseline) * 100 / baseline >= pct` is
    // rearranged to `current * 100 >= baseline * (100 + pct)`, which needs
    // neither subtraction nor division. Saturating arithmetic keeps
    // extreme values from wrapping.
    let scaled_current = current.saturating_mul(100);
    let trigger_factor = 100u64.saturating_add(u64::from(pct));
    let scaled_threshold = baseline.saturating_mul(trigger_factor);

    if scaled_current >= scaled_threshold {
        self.start_bg_rewrite();
    }
}
/// Tick half of background persistence: apply any finished BGSAVE /
/// rewrite (commit or abort — see `poll_persist_done`), then check the
/// auto-rewrite threshold.
pub(crate) fn tick_persist(&mut self) {
self.poll_persist_done();
self.maybe_auto_rewrite_aof();
let in_flight =
self.persist.busy() || self.aof.as_ref().is_some_and(kevy_persist::Aof::is_rewriting);
let rewrites = self.aof.as_ref().map_or(0, kevy_persist::Aof::rewrites_total);
self.commands.on_persist_stats(in_flight, rewrites);
}
/// Publish this shard's replication view (master offset + connected
/// replicas count) to the embedder. No-op when replication is off
/// (the standalone fast path: one Option-discriminant check + an
/// early return). Same per-tick cadence as
/// [`Self::tick_persist`]; the command layer that serves `ROLE` /
/// `INFO replication` reads from the thread-local the embedder
/// stashes in [`crate::Commands::on_replication_view`].
/// T1.22.5: compute the per-shard backlog retention watermark
/// — `min(live sent_offsets, slot.min_acked_offset)` — and tell
/// the source to drop frames every consumer has moved past.
/// No-op when no consumer position exists yet (cold startup,
/// no replicas / no slots) so a brand-new replica still finds
/// the full backlog. Pure win on the steady-state: a slow
/// replica can pin retention via its `sent_offset`, but
/// fast/closed replicas no longer hold bytes the slow one is
/// catching up to.
pub(crate) fn tick_replication_watermark(&mut self) {
let Some(src) = self.replicate.as_mut() else { return };
let mut watermark: Option<u64> = None;
for c in &self.replicas {
let off = match &c.state {
crate::replication::ReplicaState::AckSent { from_offset, .. } => *from_offset,
crate::replication::ReplicaState::Streaming { sent_offset, .. } => *sent_offset,
crate::replication::ReplicaState::SnapshotShipping { ack_offset, .. } => *ack_offset,
_ => continue,
};
watermark = Some(watermark.map_or(off, |w| w.min(off)));
}
if let Some(slot_min) = self.slots.min_acked_offset() {
watermark = Some(watermark.map_or(slot_min, |w| w.min(slot_min)));
}
if let Some(w) = watermark {
src.drop_up_to(w);
}
}
pub(crate) fn tick_replication_view(&mut self) {
let Some(src) = &self.replicate else { return };
let offset = src.next_offset();
// Collect per-replica `(ipv4, port, sent_offset)` from every
// handshake-complete replica conn. `peer` was captured at
// accept time (T1.28.5); `sent_offset` is the live value
// from the state machine. For `SnapshotShipping`, report
// `ack_offset` (the snapshot's frozen-at offset) since
// streaming hasn't started yet.
let mut replicas = Vec::with_capacity(self.replicas.len());
for c in &self.replicas {
let sent = match &c.state {
ReplicaState::AckSent { from_offset, .. } => *from_offset,
ReplicaState::Streaming { sent_offset, .. } => *sent_offset,
ReplicaState::SnapshotShipping { ack_offset, .. } => *ack_offset,
_ => continue,
};
replicas.push((c.peer.0, c.peer.1, sent));
}
self.commands.on_replication_view(offset, replicas);
}
}