1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
//! Per-iter producer pump that drains the per-shard
//! [`kevy_replicate::source::ReplicationSource`] backlog into the
//! output buffers of streaming replicas. Split out of
//! [`crate::replication`] (which holds the accept + handshake state
//! machine) so each file stays under the 500-LOC house rule.
//!
//! Called from [`crate::shard::Shard::run`] once per reactor iteration.
//! Cost when replication is off = one `Option` check; cost when
//! replication is on but no replica is registered = one extra
//! `Vec::is_empty()` after.
use crate::Commands;
use crate::replication::ReplicaState;
use crate::shard::Shard;
use kevy_replicate::wire::{
SNAPSHOT_CHUNK_MAX, encode_snapshot_begin, encode_snapshot_chunk, encode_snapshot_end,
};
use std::io;
/// Per-iter per-replica byte budget for the streaming pump.
///
/// Limits how many bytes of new backlog frames a streaming replica
/// picks up per reactor iteration. Frames are appended whole (the
/// budget is checked frame by frame); whatever does not fit waits for
/// the next pump. This prevents a single replica from monopolising a
/// shard's loop time when a large backlog drains.
///
/// 256 KiB is roughly 1500 small SET frames per iter.
const PUMP_BYTE_BUDGET_PER_ITER: usize = 256 * 1024;
/// Hard cap on a streaming replica's outbound buffer (bytes appended
/// to `output` but not yet written to the socket).
///
/// Reached when the replica's TCP receive window is full while the
/// primary keeps pushing. v1.18.0 policy on hitting the cap: close the
/// link. A reconnect (within the reconnect window — T1.15 wiring)
/// resumes from the source backlog or falls back to a full snapshot
/// (T1.22+). The alternatives (block, retry, or silently drop frames)
/// all break the "every committed write reaches every replica"
/// invariant; closing the link surfaces the problem instead.
///
/// Half of this value doubles as the backpressure threshold: the pump
/// stops queueing new frames or snapshot chunks for a replica whose
/// pending output has reached `STREAMING_OUTPUT_CAP / 2`.
const STREAMING_OUTPUT_CAP: usize = 4 * 1024 * 1024;
impl<C: Commands> Shard<C> {
/// Per-iter producer pump for this shard's replica links.
///
/// Returns `Ok(())` immediately when replication is not configured
/// (`self.replicate` is `None`) or when `self.replicas` is empty.
/// Otherwise runs two steps:
///
/// 1. Visit every replica and dispatch on its state: `Streaming` is
///    refilled from the backlog ([`Self::fill_streaming_output`]);
///    `SnapshotShipping` gets its next snapshot chunk
///    ([`Self::pump_snapshot_chunks`]); every other state is skipped.
/// 2. [`Self::drain_streaming_outputs`] tries to write each replica's
///    pending output non-blocking; its result is this function's
///    result.
pub(crate) fn pump_replication(&mut self) -> io::Result<()> {
    // Read the primary's next offset once so every replica visited in
    // this pass is filled against the same high-water mark.
    let primary_next = match self.replicate.as_ref() {
        Some(source) if !self.replicas.is_empty() => source.next_offset(),
        _ => return Ok(()),
    };

    for idx in 0..self.replicas.len() {
        if matches!(self.replicas[idx].state, ReplicaState::Streaming { .. }) {
            self.fill_streaming_output(idx, primary_next);
        } else if matches!(
            self.replicas[idx].state,
            ReplicaState::SnapshotShipping { .. }
        ) {
            self.pump_snapshot_chunks(idx);
        }
    }

    self.drain_streaming_outputs()
}
/// Refill one replica's output buffer with backlog frames.
///
/// `idx` indexes `self.replicas`; `primary_next` is the primary's
/// next offset as read by the caller for this pump pass.
///
/// Does nothing when the conn is not in `Streaming`, is already caught
/// up (`sent_offset >= primary_next`), or already has at least
/// `STREAMING_OUTPUT_CAP / 2` bytes of pending output (backpressure).
///
/// Outcomes of asking the source for frames from `sent_offset`:
///
/// * `Ok(frames)` — whole frames are appended to the conn's output
///   until the per-iter budget or the hard output cap would be
///   exceeded, and `sent_offset` advances past the last appended frame.
/// * `TooOld` — the replica fell behind the backlog window; a snapshot
///   ship is triggered via [`Self::start_snapshot_ship`]. The conn is
///   closed only if that trigger fails.
/// * `Future` — the replica's offset is ahead of the primary (corrupt
///   state from a bad peer); the conn is closed.
///
/// The first frame of a pump is exempt from
/// `PUMP_BYTE_BUDGET_PER_ITER`: without that, a single frame larger
/// than the budget could never be appended and the replica would stop
/// making progress. The hard cap is still enforced for every frame.
fn fill_streaming_output(&mut self, idx: usize, primary_next: u64) {
    let ReplicaState::Streaming { sent_offset, .. } = self.replicas[idx].state else {
        return;
    };
    if sent_offset >= primary_next {
        return; // caught up
    }
    let pending = self.replicas[idx].output.len() - self.replicas[idx].write_off;
    if pending >= STREAMING_OUTPUT_CAP / 2 {
        return; // backpressure — let the socket drain first
    }
    let Some(src) = self.replicate.as_ref() else {
        return;
    };
    let frames = match src.frames_from(sent_offset) {
        Ok(it) => it,
        Err(kevy_replicate::source::FromOffset::TooOld) => {
            // Replica fell behind the backlog window. Trigger a
            // snapshot ship (T1.23): the conn transitions to
            // SnapshotShipping and later pump iterations chunk the
            // snapshot out via pump_snapshot_chunks.
            if let Err(e) = self.start_snapshot_ship(idx, primary_next) {
                eprintln!(
                    "kevy: replica fd {} snapshot ship trigger failed: {e}; dropping link",
                    self.replicas[idx].fd,
                );
                self.replicas[idx].close();
            }
            return;
        }
        Err(kevy_replicate::source::FromOffset::Future) => {
            eprintln!(
                "kevy: replica fd {} sent_offset {} > primary next {}; \
                 corrupt state, dropping link",
                self.replicas[idx].fd, sent_offset, primary_next,
            );
            self.replicas[idx].close();
            return;
        }
    };
    // Copy frame bytes into a local Vec first so the mutable
    // borrow of `self.replicas[idx].output` doesn't overlap with
    // the immutable borrow of `src` via `frames`.
    let mut append = Vec::new();
    let mut new_sent = sent_offset;
    let mut bytes_this_pump = 0usize;
    for frame in frames {
        let frame_len = frame.bytes.len();
        // The budget is a fairness limit, not a size limit: always let
        // the first frame of a pump through, otherwise a frame bigger
        // than the budget would stall this replica indefinitely.
        let over_budget =
            bytes_this_pump > 0 && bytes_this_pump + frame_len > PUMP_BYTE_BUDGET_PER_ITER;
        // The hard cap applies to every frame, including the first.
        // NOTE(review): a frame that cannot fit under the cap even with
        // an empty output buffer is still never appended here — decide
        // whether that case should close the link instead.
        let over_cap = pending + bytes_this_pump + frame_len > STREAMING_OUTPUT_CAP;
        if over_budget || over_cap {
            break;
        }
        append.extend_from_slice(&frame.bytes);
        bytes_this_pump += frame_len;
        new_sent = frame.offset + 1;
    }
    if !append.is_empty() {
        let conn = &mut self.replicas[idx];
        conn.output.extend_from_slice(&append);
        if let ReplicaState::Streaming { sent_offset, .. } = &mut conn.state {
            *sent_offset = new_sent;
        }
    }
}
/// Attempt a non-blocking drain of every replica that may have queued
/// output: those in `Streaming`, `AckSent`, or `SnapshotShipping`.
///
/// Replicas with nothing pending (`output.len() <= write_off`) are
/// skipped. For the rest, `replica_writable` is invoked; partial writes
/// wait for the next writability event. After the attempt, a
/// `Streaming` replica whose pending output is still at or above
/// `STREAMING_OUTPUT_CAP` is logged and closed.
///
/// # Errors
///
/// Returns the first error produced by `replica_writable`; replicas
/// after the failing index are not visited in that pass.
fn drain_streaming_outputs(&mut self) -> io::Result<()> {
    for idx in 0..self.replicas.len() {
        let drainable = matches!(
            self.replicas[idx].state,
            ReplicaState::Streaming { .. }
                | ReplicaState::AckSent { .. }
                | ReplicaState::SnapshotShipping { .. }
        );
        if !drainable || self.replicas[idx].output.len() <= self.replicas[idx].write_off {
            continue;
        }

        self.replica_writable(idx)?;

        // Re-inspect the conn: the write attempt may have changed both
        // its state and how much output is still queued.
        let conn = &mut self.replicas[idx];
        let still_at_cap = matches!(conn.state, ReplicaState::Streaming { .. })
            && conn.output.len() - conn.write_off >= STREAMING_OUTPUT_CAP;
        if still_at_cap {
            eprintln!(
                "kevy: streaming replica fd {} output cap ({} B) reached; \
                 dropping link (reconnect will resume from backlog)",
                conn.fd, STREAMING_OUTPUT_CAP,
            );
            conn.close();
        }
    }
    Ok(())
}
/// Trigger a snapshot ship (T1.23) for the replica at `idx`.
///
/// Steps, all on the calling (reactor) thread unless noted:
///
/// 1. Capture a view of the local store with
///    `self.store.collect_snapshot()`. Per the T1.23.5 design notes
///    this is a COW [`kevy_store::SnapshotView`] whose collection is an
///    O(n) shallow clone — much cheaper than full serialization.
/// 2. Spawn a background thread named `kevy-snapshot-<replica_id>` that
///    serializes the view with `kevy_persist::write_snapshot_to` and
///    sends the bytes back over an `mpsc` channel. If serialization
///    fails the sender is dropped without sending, which
///    [`Self::pump_snapshot_chunks`] observes as `Disconnected` and
///    treats as fatal for the conn.
/// 3. Queue the snapshot-begin marker (`encode_snapshot_begin()`) on
///    the conn's output right away so the replica knows the ship
///    started, and move the conn to `SnapshotShipping` holding the
///    receiver. [`Self::pump_snapshot_chunks`] polls it each tick.
///
/// `ack_offset` is the primary's `source.next_offset()` at trigger
/// time. It is encoded into the snapshot-end trailer when the ship
/// completes and becomes the replica's `sent_offset` for live
/// streaming afterwards.
///
/// Returns `Ok(())` without doing anything when the conn is not in
/// `Streaming` (defensive; only `Streaming` replicas should get here).
///
/// # Errors
///
/// Returns the `io::Error` from `std::thread::Builder::spawn` when the
/// serializer thread cannot be created. The conn's state and output
/// are left untouched in that case, so the caller can log and drop the
/// link instead of the reactor thread panicking.
fn start_snapshot_ship(&mut self, idx: usize, ack_offset: u64) -> io::Result<()> {
    let ReplicaState::Streaming { ref replica_id, .. } = self.replicas[idx].state else {
        // Defensive: only Streaming replicas should reach the
        // TooOld branch in fill_streaming_output.
        return Ok(());
    };
    let replica_id = replica_id.clone();
    let view = self.store.collect_snapshot();
    let (tx, rx) = std::sync::mpsc::channel();
    // Thread creation can fail for ordinary OS reasons (thread or
    // memory limits). Propagate that as an error rather than
    // panicking; the returned JoinHandle is dropped, which detaches
    // the worker — completion is observed through the channel.
    std::thread::Builder::new()
        .name(format!("kevy-snapshot-{replica_id}"))
        .spawn(move || {
            let mut buf = Vec::new();
            if kevy_persist::write_snapshot_to(&view, &mut buf).is_ok() {
                // The receiver may already be gone if the conn was
                // closed meanwhile; nothing useful to do then.
                let _ = tx.send(buf);
            }
            // On serialization error, drop tx → receiver-side
            // try_recv returns Disconnected; pump_snapshot_chunks
            // treats that as a fatal error and closes the conn.
        })?;
    // Only mutate the conn once the worker is known to be running.
    let conn = &mut self.replicas[idx];
    conn.output.extend_from_slice(&encode_snapshot_begin());
    conn.state = ReplicaState::SnapshotShipping {
        replica_id,
        ack_offset,
        serializing: Some(rx),
        snapshot_buf: Vec::new(),
        snapshot_off: 0,
    };
    Ok(())
}
/// Advance a snapshot ship for the replica at `idx` by one step.
///
/// Each call does at most one of the following:
///
/// * Nothing, when pending output is at or above half of
///   `STREAMING_OUTPUT_CAP` (backpressure —
///   `drain_streaming_outputs` writes what is queued; the next pump
///   retries).
/// * Polls the background serializer (T1.23.5) with `try_recv`. While
///   the channel is `Empty` the receiver is put back and the call
///   returns. If the worker went away without sending
///   (`Disconnected`), the conn is closed. Once the bytes arrive they
///   become `snapshot_buf` and chunking starts in the same call.
/// * Queues one chunk of at most `SNAPSHOT_CHUNK_MAX` snapshot bytes on
///   the conn's output and advances `snapshot_off`.
/// * When no snapshot bytes remain, queues the snapshot-end trailer
///   carrying `ack_offset` and flips the conn to `Streaming` with
///   `sent_offset = ack_offset`.
///
/// A conn that is not in `SnapshotShipping` is left untouched.
fn pump_snapshot_chunks(&mut self, idx: usize) {
    use std::sync::mpsc::TryRecvError;

    let queued = self.replicas[idx].output.len() - self.replicas[idx].write_off;
    if queued >= STREAMING_OUTPUT_CAP / 2 {
        return;
    }

    // Step 1: while the worker is still outstanding, see whether the
    // serialized snapshot has been delivered yet.
    if let ReplicaState::SnapshotShipping {
        serializing,
        snapshot_buf,
        ..
    } = &mut self.replicas[idx].state
    {
        if let Some(rx) = serializing.take() {
            match rx.try_recv() {
                // Delivered: keep the bytes; the receiver is consumed
                // and dropped at the end of this scope.
                Ok(bytes) => *snapshot_buf = bytes,
                Err(TryRecvError::Empty) => {
                    // Still serializing — restore the receiver and
                    // poll again on the next tick.
                    *serializing = Some(rx);
                    return;
                }
                Err(TryRecvError::Disconnected) => {
                    eprintln!(
                        "kevy: snapshot serializer thread died for replica fd {} — closing",
                        self.replicas[idx].fd,
                    );
                    self.replicas[idx].close();
                    return;
                }
            }
        }
    }

    // Step 2: emit either the next chunk or, once nothing is left,
    // the end marker.
    let conn = &mut self.replicas[idx];
    let ReplicaState::SnapshotShipping {
        replica_id,
        ack_offset,
        snapshot_buf,
        snapshot_off,
        ..
    } = &mut conn.state
    else {
        return;
    };
    let ack_offset = *ack_offset;

    let unsent = &snapshot_buf[*snapshot_off..];
    if !unsent.is_empty() {
        let take = unsent.len().min(SNAPSHOT_CHUNK_MAX);
        let chunk = unsent[..take].to_vec();
        *snapshot_off += chunk.len();
        conn.output.extend_from_slice(&encode_snapshot_chunk(&chunk));
        // The end marker goes out on a later call, once this chunk has
        // been accounted for and nothing remains.
        return;
    }

    // Snapshot fully chunked — emit the end marker and flip state to
    // Streaming so the next pump fills from the backlog at
    // `ack_offset`.
    let rid = replica_id.clone();
    conn.output.extend_from_slice(&encode_snapshot_end(ack_offset));
    conn.state = ReplicaState::Streaming {
        replica_id: rid,
        sent_offset: ack_offset,
    };
}
}