1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
//! Per-shard replication accept + handshake glue.
//!
//! This module owns the [`ReplicaConn`] state machine and the [`Shard`]
//! methods that drive it: accepting a replica TCP connection on the
//! per-shard listener (`replication_port_base + id`), running the
//! handshake (`REPLICATE FROM <offset> ID <replica-id>` → `+ACK <offset>`),
//! and reacting to read/write readiness for live replicas.
//!
//! The frame format, source backlog, handshake parse, and slot table
//! are stones in `kevy-replicate`; this module is the cement that wires
//! them into the reactor loop.
//!
//! Lifecycle: `HandshakePending` → `AckSent` (after parse + `+ACK`
//! queued) → `Streaming { sent_offset }` (after `+ACK` drains —
//! per-iter pump in [`crate::replication_pump`] fills more frames) →
//! `Closed { replica_id, sent_offset }` (peer EOF / cap exceeded /
//! `TooOld`). While a snapshot ship is in progress the conn is in
//! `SnapshotShipping`; on completion it transitions to `Streaming`.
//! The reactor reaps `Closed` conns once per iter; at reap time, conns
//! whose `replica_id` is `Some` are recorded into [`Shard::slots`]
//! (per T1.15) so a reconnect within `reconnect_window_ms` is
//! correlatable.
use kevy_replicate::handshake::{HandshakeError, encode_ack, parse_replicate_from};
use kevy_resp::{Argv, parse_command_into};
use kevy_sys::Socket;
/// One active replica connection on this shard.
///
/// Lives in `Shard::replicas`. Bundles the owned socket, its buffered
/// input/output, and the [`ReplicaState`] that tracks where the conn is
/// in its lifecycle. Conns built through [`ReplicaConn::with_peer`]
/// start in [`ReplicaState::HandshakePending`]. See the module-level
/// docs for the lifecycle diagram.
pub struct ReplicaConn {
    /// Owning socket. Dropped when the conn is dropped (closes the fd).
    pub sock: Socket,
    /// Cached raw fd for poller bookkeeping. The constructor captures
    /// it from `sock.raw()` before the socket is moved into the struct.
    pub fd: i32,
    /// Input buffer — bytes pulled off the socket waiting to parse.
    /// `advance_handshake` drains the bytes of the command it consumed.
    pub input: Vec<u8>,
    /// Output buffer — bytes queued for write_all (handshake `+ACK`
    /// in this batch; streamed frames in T1.14).
    pub output: Vec<u8>,
    /// Cursor into `output`; the next `Socket::write` writes from
    /// `output[write_off..]` and advances on partial sends.
    /// `advance_handshake` resets it to `0` when it queues the `+ACK`.
    pub write_off: usize,
    /// Lifecycle state — drives the reactor's dispatch decisions.
    pub state: ReplicaState,
    /// Peer's `(IPv4, port)` captured at accept time (T1.28.5).
    /// `(0.0.0.0, 0)` for the fallback path (peer-vanished-pre-
    /// getpeername) and for synthetic conns built in unit tests.
    /// Used by `tick_replication_view` to enrich the per-shard view
    /// the command layer reads for `ROLE` / `INFO replication`.
    pub peer: (std::net::Ipv4Addr, u16),
}
/// Replication conn lifecycle. See the module-level docs for the
/// transition diagram.
///
/// `Debug` only — `SnapshotShipping` carries a `mpsc::Receiver`
/// (T1.23.5 background-serializer handle) which has no
/// `PartialEq`, so the previous `Eq`/`PartialEq` derives are gone.
/// Tests compare via `matches!` instead.
#[derive(Debug)]
pub enum ReplicaState {
    /// Pre-handshake: input accumulates until a full RESP command parses.
    HandshakePending,
    /// `+ACK` queued in `output`; once the buffer drains the conn
    /// transitions to [`Self::Streaming`].
    AckSent {
        /// Replica id from the handshake (passed to Streaming).
        replica_id: String,
        /// Offset the replica asked to resume from (passed to
        /// Streaming as the initial `sent_offset`).
        from_offset: u64,
    },
    /// Live: the streaming pump fills `output` from the source backlog
    /// every reactor iteration; the writability handler drains.
    Streaming {
        /// Replica id (kept for observability / future INFO reporting).
        replica_id: String,
        /// Next offset to send. After encoding a frame at offset K,
        /// advances to K + 1. In v1.18.0 this also serves as the
        /// assumed-acked offset (no replica→primary ACK channel yet;
        /// see module docs + T1.15 wiring + Phase 1.5 kevy-elect).
        sent_offset: u64,
    },
    /// Snapshot ship in progress (T1.23). At trigger time the
    /// reactor freezes a COW [`kevy_store::SnapshotView`] (O(n)
    /// shallow clone — ns/entry) and hands it to a worker thread
    /// that serializes via `kevy_persist::write_snapshot_to`. The
    /// worker `mpsc::send`s the full RDB bytes back via
    /// `serializing`; until the bytes arrive, `pump_snapshot_chunks`
    /// is a no-op (output backpressure handles client wait). Once
    /// received, the pump chunks `snapshot_buf[snapshot_off..]` per
    /// SNAPSHOT_CHUNK_MAX, then on completion pushes
    /// `+SNAPSHOT_END <ack_offset>` and transitions to Streaming
    /// with `sent_offset = ack_offset`.
    SnapshotShipping {
        /// Replica id (carried into Streaming on completion).
        replica_id: String,
        /// Offset the snapshot was taken at; equals the primary's
        /// `source.next_offset()` at snapshot-trigger time. After
        /// snapshot ship completes, becomes the new `sent_offset`.
        ack_offset: u64,
        /// `Some(rx)` while the worker thread is serializing the
        /// SnapshotView; `pump_snapshot_chunks` polls via
        /// `try_recv` each tick. Cleared (set to `None`) once the
        /// bytes arrive.
        serializing: Option<std::sync::mpsc::Receiver<Vec<u8>>>,
        /// Serialized RDB bytes once the worker delivers them.
        /// Empty + `serializing.is_some()` ⇒ still waiting on the
        /// background serializer (T1.23.5).
        snapshot_buf: Vec<u8>,
        /// Cursor into `snapshot_buf` — bytes [0..snapshot_off) have
        /// been chunked into `output` already.
        snapshot_off: usize,
    },
    /// Terminal: handshake failed, output cap exceeded, peer EOF, or
    /// the source can't serve `sent_offset` (TooOld → would need a
    /// snapshot ship, which arrives at T1.22+). Reactor reaps on
    /// next dispatch — at reap time, any conn that had reached
    /// AckSent/Streaming/SnapshotShipping gets recorded in
    /// [`crate::shard::Shard::slots`] (per T1.15) so a reconnect within
    /// `reconnect_window_ms` can be observed/correlated.
    /// `replica_id = None` means the conn closed before handshake
    /// completed (nothing to record).
    ///
    /// NOTE(review): the "arrives at T1.22+" remark looks stale now that
    /// `SnapshotShipping` exists — confirm whether `TooOld` still ends
    /// in `Closed` or triggers a snapshot ship instead.
    Closed {
        /// Handshake's replica id, if the conn ever reached AckSent.
        replica_id: Option<String>,
        /// Offset carried over by [`ReplicaConn::close`] from the state
        /// the conn was in: `sent_offset` from Streaming (== assumed-
        /// acked in v1.18 — no real replica ACK channel yet, see
        /// Phase 1.5 kevy-elect), `from_offset` from AckSent,
        /// `ack_offset` from SnapshotShipping, and `0` when the conn
        /// closed while still HandshakePending.
        sent_offset: u64,
    },
}
impl ReplicaConn {
    /// Wrap a freshly-accepted socket without a known peer address.
    ///
    /// The socket must already be non-blocking (set by the caller
    /// before adding to the poller). The peer defaults to
    /// `(0.0.0.0, 0)` — call [`Self::with_peer`] from the accept path
    /// to record the real peer.
    #[allow(dead_code)] // legacy constructor; the accept path now uses `with_peer`
    pub fn new(sock: Socket) -> Self {
        Self::with_peer(sock, (std::net::Ipv4Addr::UNSPECIFIED, 0))
    }

    /// Wrap a freshly-accepted socket together with its peer
    /// `(IPv4, port)` (captured by `Socket::peer_addr` at accept
    /// time; T1.28.5). Used by the replication listener so
    /// `tick_replication_view` can ship the per-replica address
    /// list to the command layer.
    ///
    /// The returned conn starts in [`ReplicaState::HandshakePending`]
    /// with empty, pre-sized input/output buffers.
    pub fn with_peer(sock: Socket, peer: (std::net::Ipv4Addr, u16)) -> Self {
        // Read the raw fd before `sock` is moved into the struct.
        let fd = sock.raw();
        Self {
            sock,
            fd,
            input: Vec::with_capacity(256),
            output: Vec::with_capacity(64),
            write_off: 0,
            state: ReplicaState::HandshakePending,
            peer,
        }
    }

    /// Transition to [`ReplicaState::Closed`] while preserving the
    /// replica id + offset the conn had at the moment of close.
    /// Idempotent: closing an already-`Closed` conn leaves its
    /// recorded fields untouched. The reactor's reap step reads these
    /// fields to record the slot in [`crate::shard::Shard::slots`]
    /// before dropping the conn (T1.15).
    ///
    /// The previous state is taken by value, so the replica id is
    /// moved into the `Closed` state rather than cloned.
    pub fn close(&mut self) {
        // Park a placeholder so the old state can be consumed by value;
        // it is overwritten with the real `Closed` value below.
        let previous = std::mem::replace(
            &mut self.state,
            ReplicaState::Closed {
                replica_id: None,
                sent_offset: 0,
            },
        );
        let (replica_id, sent_offset) = match previous {
            ReplicaState::HandshakePending => (None, 0),
            ReplicaState::AckSent {
                replica_id,
                from_offset,
            } => (Some(replica_id), from_offset),
            ReplicaState::Streaming {
                replica_id,
                sent_offset,
            } => (Some(replica_id), sent_offset),
            // Snapshot was in flight; on reconnect within the window
            // the replica should retry — record the slot at the
            // snapshot's ack_offset so future `INFO replication` /
            // observability can see where we were. The remaining
            // fields (receiver, buffered bytes) are dropped here.
            ReplicaState::SnapshotShipping {
                replica_id,
                ack_offset,
                ..
            } => (Some(replica_id), ack_offset),
            // Already closed: carry the recorded fields through
            // unchanged so a second call is a no-op.
            ReplicaState::Closed {
                replica_id,
                sent_offset,
            } => (replica_id, sent_offset),
        };
        self.state = ReplicaState::Closed {
            replica_id,
            sent_offset,
        };
    }
}
/// Drive the handshake one step for `conn`.
///
/// Does nothing unless the conn is still in
/// [`ReplicaState::HandshakePending`]. Otherwise it tries to parse one
/// complete RESP command from the front of `conn.input`:
///
/// * incomplete command — returns `Ok(())` and leaves the conn
///   untouched so the caller can read more bytes;
/// * complete, valid `REPLICATE FROM` — removes the consumed bytes
///   from `conn.input`, queues `+ACK` in `conn.output`, resets
///   `conn.write_off`, and moves the conn to [`ReplicaState::AckSent`];
/// * anything else — returns the rejection reason without touching the
///   conn's state or buffers; the caller logs it and drops the conn.
///
/// Kept separate from the I/O loop so the state machine can be
/// unit-tested without standing up a Shard (see the module tests).
///
/// # Errors
///
/// [`HandshakeError::BadCommand`] when the RESP parser rejects the
/// input, or whatever error `parse_replicate_from` reports for the
/// parsed argv.
pub(crate) fn advance_handshake(conn: &mut ReplicaConn) -> Result<(), HandshakeError> {
    let pending = matches!(conn.state, ReplicaState::HandshakePending);
    if !pending {
        return Ok(());
    }

    let mut argv = Argv::default();
    let consumed = match parse_command_into(&conn.input, &mut argv) {
        Ok(Some(len)) => len,
        // Not a full command yet — the caller will read more bytes.
        Ok(None) => return Ok(()),
        Err(_) => return Err(HandshakeError::BadCommand),
    };

    let request = parse_replicate_from(&argv)?;
    let from_offset = request.from_offset;

    conn.input.drain(..consumed);
    conn.output.extend_from_slice(&encode_ack(from_offset));
    conn.write_off = 0;
    conn.state = ReplicaState::AckSent {
        replica_id: request.replica_id,
        from_offset,
    };
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Socket-less `ReplicaConn` for state-machine tests. The wrapped
    /// fd is invalid, so any accidental I/O would fail instead of
    /// touching a real descriptor; the tests only call
    /// `advance_handshake` / `close` and inspect state + buffers.
    fn fake_conn() -> ReplicaConn {
        ReplicaConn {
            fd: -1,
            // SAFETY: no test performs socket I/O on this conn.
            // fd = -1 makes any accidental I/O call return EBADF
            // rather than silently corrupting an unrelated descriptor.
            sock: unsafe { Socket::from_raw_fd(-1) },
            peer: (std::net::Ipv4Addr::UNSPECIFIED, 0),
            state: ReplicaState::HandshakePending,
            write_off: 0,
            output: Vec::new(),
            input: Vec::new(),
        }
    }

    /// Encode `REPLICATE FROM <offset> ID <id>` as a RESP multi-bulk
    /// command.
    fn resp_replicate_from(offset: &str, id: &str) -> Vec<u8> {
        let args: [&[u8]; 5] = [
            b"REPLICATE",
            b"FROM",
            offset.as_bytes(),
            b"ID",
            id.as_bytes(),
        ];
        let mut wire = b"*5\r\n".to_vec();
        for arg in args {
            wire.extend_from_slice(format!("${}\r\n", arg.len()).as_bytes());
            wire.extend_from_slice(arg);
            wire.extend_from_slice(b"\r\n");
        }
        wire
    }

    /// Unpack a `Closed` state into `(replica_id, sent_offset)`;
    /// panics on any other state.
    fn closed_parts(state: ReplicaState) -> (Option<String>, u64) {
        match state {
            ReplicaState::Closed {
                replica_id,
                sent_offset,
            } => (replica_id, sent_offset),
            other => panic!("expected Closed, got {other:?}"),
        }
    }

    #[test]
    fn close_from_handshake_pending_carries_no_id() {
        let mut conn = fake_conn();
        conn.close();
        let (replica_id, sent_offset) = closed_parts(conn.state);
        assert_eq!(replica_id, None);
        assert_eq!(sent_offset, 0);
    }

    #[test]
    fn close_from_ack_sent_preserves_id_and_offset() {
        let mut conn = fake_conn();
        conn.input = resp_replicate_from("17", "replica-x");
        advance_handshake(&mut conn).expect("handshake ok");
        conn.close();
        let (replica_id, sent_offset) = closed_parts(conn.state);
        assert_eq!(replica_id.as_deref(), Some("replica-x"));
        assert_eq!(sent_offset, 17);
    }

    #[test]
    fn close_from_streaming_preserves_id_and_offset() {
        let mut conn = fake_conn();
        conn.state = ReplicaState::Streaming {
            replica_id: "replica-z".into(),
            sent_offset: 99,
        };
        conn.close();
        let (replica_id, sent_offset) = closed_parts(conn.state);
        assert_eq!(replica_id.as_deref(), Some("replica-z"));
        assert_eq!(sent_offset, 99);
    }

    #[test]
    fn close_is_idempotent() {
        let mut conn = fake_conn();
        conn.state = ReplicaState::Streaming {
            replica_id: "r".into(),
            sent_offset: 5,
        };
        conn.close();
        let after_first = format!("{:?}", conn.state);
        // A second close must leave the recorded fields alone.
        conn.close();
        let after_second = format!("{:?}", conn.state);
        assert_eq!(after_second, after_first);
    }

    #[test]
    fn handshake_pending_to_ack_sent_on_complete_command() {
        let mut conn = fake_conn();
        conn.input = resp_replicate_from("42", "replica-a");
        advance_handshake(&mut conn).expect("ok");
        match &conn.state {
            ReplicaState::AckSent {
                replica_id,
                from_offset,
            } => {
                assert_eq!(replica_id, "replica-a");
                assert_eq!(*from_offset, 42);
            }
            other => panic!("expected AckSent, got {other:?}"),
        }
        assert_eq!(conn.output, b"+ACK 42\r\n");
        // The whole command was consumed from the input buffer.
        assert!(conn.input.is_empty());
    }

    #[test]
    fn partial_handshake_stays_pending_and_waits_for_more_bytes() {
        let mut conn = fake_conn();
        let full = resp_replicate_from("0", "replica-a");
        let (head, tail) = full.split_at(full.len() / 2);

        // First half only: nothing to parse yet.
        conn.input = head.to_vec();
        advance_handshake(&mut conn).expect("ok");
        assert!(matches!(conn.state, ReplicaState::HandshakePending));
        assert!(conn.output.is_empty());

        // Remaining bytes arrive — the handshake completes.
        conn.input.extend_from_slice(tail);
        advance_handshake(&mut conn).expect("ok");
        assert!(matches!(conn.state, ReplicaState::AckSent { .. }));
    }

    #[test]
    fn wrong_command_is_rejected_at_handshake() {
        let mut conn = fake_conn();
        // Well-formed RESP, but not the handshake verb.
        conn.input = b"*1\r\n$4\r\nPING\r\n".to_vec();
        let err = advance_handshake(&mut conn).unwrap_err();
        assert!(matches!(
            err,
            HandshakeError::WrongArity(_) | HandshakeError::BadCommand
        ));
        // The conn is left pending; marking it Closed is the caller's job.
        assert!(matches!(conn.state, ReplicaState::HandshakePending));
    }

    #[test]
    fn inline_form_parses_then_handshake_rejects_arity() {
        // kevy-resp falls back to inline parsing on a non-`*` first
        // byte, so `!garbage\r\n` parses as a 1-arg argv `["!garbage"]`.
        // The RESP parser does NOT reject; the handshake layer does,
        // via WrongArity. Pins down which layer is responsible.
        let mut conn = fake_conn();
        conn.input = b"!garbage\r\n".to_vec();
        let err = advance_handshake(&mut conn).unwrap_err();
        assert_eq!(err, HandshakeError::WrongArity(1));
    }

    #[test]
    fn resp_level_malformed_input_returns_bad_command() {
        // Force the RESP multi-bulk parser into an error path —
        // `*1\r\n` claims one bulk arg but the body starts with `!`
        // instead of `$`. parse_command_into returns ProtocolError;
        // advance_handshake maps it to BadCommand.
        let mut conn = fake_conn();
        conn.input = b"*1\r\n!nope\r\n".to_vec();
        let err = advance_handshake(&mut conn).unwrap_err();
        assert_eq!(err, HandshakeError::BadCommand);
    }

    #[test]
    fn second_call_after_ack_is_noop() {
        let mut conn = fake_conn();
        conn.input = resp_replicate_from("7", "r");
        advance_handshake(&mut conn).unwrap();
        let queued = conn.output.clone();
        // With the handshake done and no new input, the ACK must not
        // be emitted a second time.
        advance_handshake(&mut conn).unwrap();
        assert_eq!(conn.output, queued);
    }
}