kevy_replicate/replica.rs
1//! Replica-side client — connect to a primary's replication listener,
2//! perform the handshake, then yield decoded mutation frames in order.
3//!
4//! The client is **synchronous + blocking** by design: it slots into a
5//! dedicated thread on the replica node alongside (but separate from)
6//! the regular kevy reactor. An async surface is a Phase 4 deliverable
7//! (`kevy-client-async`, the only crate carved out of the 0-dep rule).
8//!
9//! Hot loop usage:
10//!
11//! ```no_run
12//! use kevy_replicate::replica::ReplicaClient;
13//!
14//! let mut client = ReplicaClient::connect("127.0.0.1:16004", "replica-a", 0)
15//! .expect("connect ok");
16//! while let Some(result) = client.next() {
17//! let frame = result.expect("decode ok");
18//! // apply frame.argv at frame.offset — caller's responsibility (T1.19)
19//! drop(frame);
20//! }
21//! ```
22//!
23//! Errors map to actionable next steps for the caller:
24//! - [`ReplicaError::HandshakeRejected`] / [`ReplicaError::AckMalformed`]
25//! — primary refused or replied with garbage; drop the link, log,
26//! maybe back off and retry.
27//! - [`ReplicaError::Truncated`] — peer EOF mid-frame; treat as a
28//! disconnect, reconnect later.
//! - [`ReplicaError::OffsetGap`] (fields `expected` / `got`) — frames
//!   arrived out of order or with a skip; per plan T1.20 the caller should
//!   trigger a full snapshot resync. v1.18.0 surfaces the gap; the
//!   snapshot ship machinery itself lands at T1.22.
33//! - [`ReplicaError::Frame`] — wire-level decode error; same
34//! action as Truncated (drop + reconnect).
35
36use crate::wire::WireError;
37use kevy_resp::Argv;
38use std::io::{self, Read, Write};
39use std::net::{TcpStream, ToSocketAddrs};
40use std::time::Duration;
41
42/// A decoded mutation frame the replica should apply to its local
43/// store. Ownership of the [`Argv`] passes to the caller.
44#[derive(Debug)]
45pub struct DecodedFrame {
46 /// Monotonic offset the primary assigned at apply-time.
47 pub offset: u64,
48 /// Wire-decoded argv — feed to the dispatcher the same way AOF
49 /// replay does (cmd name + arg bytes).
50 pub argv: Argv,
51}
52
53/// Event yielded by [`ReplicaClient::next_event`]. A driver loop
54/// pattern-matches and applies each:
55/// - [`Self::Frame`] → run through the local dispatcher.
56/// - [`Self::SnapshotBegin`] → caller should reset / prepare the
57/// local store for a fresh-from-snapshot fill.
58/// - [`Self::SnapshotChunk`] → append the bytes to the caller's
59/// accumulating snapshot buffer.
60/// - [`Self::SnapshotEnd`] → caller hands the accumulated buffer to
61/// `kevy_persist::load_snapshot`; [`ReplicaClient`] has already
62/// advanced `expected_offset` to `ack_offset`, so the next
63/// [`Self::Frame`] arrives at `ack_offset` with no gap.
64#[derive(Debug)]
65pub enum ReplicaEvent {
66 /// A live mutation frame.
67 Frame(DecodedFrame),
68 /// Snapshot ship begin marker (`+SNAPSHOT\r\n`).
69 SnapshotBegin,
70 /// One snapshot chunk's payload bytes (RESP bulk string body).
71 SnapshotChunk(Vec<u8>),
72 /// Snapshot ship end marker carrying the offset the next live
73 /// frame will have.
74 SnapshotEnd {
75 /// The offset the primary's `next_offset` was at when the
76 /// snapshot started. After this event, [`ReplicaClient::expected_offset`]
77 /// equals this value.
78 ack_offset: u64,
79 },
80}
81
82/// Errors a replica client can surface to its driver loop.
83#[derive(Debug)]
84pub enum ReplicaError {
85 /// Primary closed the connection or never replied during the
86 /// handshake / `+ACK` exchange.
87 HandshakeRejected,
88 /// `+ACK` line was malformed (didn't start with `+ACK `, didn't
89 /// parse the offset).
90 AckMalformed,
91 /// Peer closed the connection mid-frame; reconnect to resume.
92 Truncated,
93 /// Wire-level decode error (envelope shape wrong, payload
94 /// malformed, etc.).
95 Frame(WireError),
96 /// Frame arrived with an offset other than the expected next.
97 /// Caller should trigger a full snapshot resync (T1.22).
98 OffsetGap {
99 /// The offset the client expected next (= `last_seen + 1`).
100 expected: u64,
101 /// The offset the primary actually sent.
102 got: u64,
103 },
104 /// While streaming a snapshot, the primary sent bytes that were
105 /// neither a snapshot chunk nor `+SNAPSHOT_END`. v1.18.0 forbids
106 /// interleaving live frames inside a snapshot (see `docs/snapshot.md`).
107 UnexpectedInSnapshot,
108 /// `next_frame` was called but the next event is a snapshot
109 /// marker / chunk. Callers that want the snapshot-aware surface
110 /// must use [`ReplicaClient::next_event`].
111 SnapshotInProgress,
112 /// Underlying socket I/O failure.
113 Io(io::Error),
114}
115
116impl std::fmt::Display for ReplicaError {
117 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
118 match self {
119 Self::HandshakeRejected => write!(f, "primary rejected replication handshake"),
120 Self::AckMalformed => write!(f, "primary sent malformed +ACK"),
121 Self::Truncated => write!(f, "replication stream truncated by peer"),
122 Self::Frame(e) => write!(f, "replication frame decode error: {e}"),
123 Self::OffsetGap { expected, got } => {
124 write!(f, "replication offset gap: expected {expected}, got {got}")
125 }
126 Self::UnexpectedInSnapshot => {
127 write!(f, "primary sent non-chunk bytes mid-snapshot")
128 }
129 Self::SnapshotInProgress => {
130 write!(f, "snapshot in progress; use next_event() to consume")
131 }
132 Self::Io(e) => write!(f, "replication socket I/O error: {e}"),
133 }
134 }
135}
136
137impl std::error::Error for ReplicaError {}
138
139impl From<io::Error> for ReplicaError {
140 fn from(e: io::Error) -> Self {
141 ReplicaError::Io(e)
142 }
143}
144
145impl From<WireError> for ReplicaError {
146 fn from(e: WireError) -> Self {
147 match e {
148 WireError::Truncated => ReplicaError::Truncated,
149 other => ReplicaError::Frame(other),
150 }
151 }
152}
153
/// A single blocking TCP link to the per-shard replication listener of a
/// primary. Once [`Self::connect`] has finished the handshake the client
/// can be driven as an
/// `Iterator<Item = Result<DecodedFrame, ReplicaError>>`, producing frames
/// in offset order until the peer disconnects or a hard error surfaces.
pub struct ReplicaClient {
    /// The connected socket to the primary.
    pub(crate) sock: TcpStream,
    /// Bytes already read from the socket that have not yet been parsed
    /// into a frame.
    pub(crate) buf: Vec<u8>,
    /// Index into `buf` at which the next decode attempt begins. `buf` is
    /// only drained once this passes a high-water mark, which spares each
    /// frame a `Vec::drain` shift.
    pub(crate) cursor: usize,
    /// The `<N>` from the primary's `+ACK <N>` handshake reply. Purely
    /// informational for now; T1.20 / T1.22 use it when deciding between
    /// a re-handshake and a full sync.
    pub(crate) primary_offset_at_handshake: u64,
    /// Offset the next frame on the stream is expected to carry. Starts
    /// out as the `from_offset` that was requested and moves forward by
    /// one for each accepted frame.
    pub(crate) expected_offset: u64,
    /// Set while the stream is between `+SNAPSHOT` and `+SNAPSHOT_END`.
    /// Only chunk and end-marker bytes are legal in that window; a
    /// `*2\r\n` live-frame envelope yields
    /// [`ReplicaError::UnexpectedInSnapshot`] per the v1.18 spec
    /// (`docs/snapshot.md` — interleaving is a T1.25 extension).
    pub(crate) in_snapshot: bool,
}
181
182impl ReplicaClient {
183 /// Connect to `addr`, send `REPLICATE FROM <from_offset> ID <replica_id>`,
184 /// read the `+ACK <offset>` reply, and return a ready-to-iterate
185 /// client. Blocks until the handshake completes or the connect
186 /// times out (`connect_timeout` argument).
187 pub fn connect<A: ToSocketAddrs>(
188 addr: A,
189 replica_id: &str,
190 from_offset: u64,
191 ) -> Result<Self, ReplicaError> {
192 Self::connect_with_timeout(addr, replica_id, from_offset, Duration::from_secs(5))
193 }
194
195 /// [`Self::connect`] with an explicit connect timeout. Useful for
196 /// tests that don't want to wait the default 5 s when a port is
197 /// closed.
198 pub fn connect_with_timeout<A: ToSocketAddrs>(
199 addr: A,
200 replica_id: &str,
201 from_offset: u64,
202 connect_timeout: Duration,
203 ) -> Result<Self, ReplicaError> {
204 // Resolve + connect with timeout. ToSocketAddrs returns an
205 // iterator; we try each address until one succeeds.
206 let mut last_err: Option<io::Error> = None;
207 let mut sock: Option<TcpStream> = None;
208 for sa in addr.to_socket_addrs().map_err(ReplicaError::Io)? {
209 match TcpStream::connect_timeout(&sa, connect_timeout) {
210 Ok(s) => {
211 sock = Some(s);
212 break;
213 }
214 Err(e) => last_err = Some(e),
215 }
216 }
217 let mut sock = sock.ok_or_else(|| {
218 ReplicaError::Io(last_err.unwrap_or_else(|| {
219 io::Error::new(io::ErrorKind::InvalidInput, "no socket address resolved")
220 }))
221 })?;
222
223 // Send the handshake. `encode_replicate_from` is a private
224 // helper so the on-the-wire shape is one place to change.
225 let req = encode_replicate_from(from_offset, replica_id);
226 sock.write_all(&req)?;
227
228 // Read the `+ACK <offset>\r\n` reply. Use a small read timeout
229 // so a primary that opens the socket but never replies doesn't
230 // hang the replica forever.
231 sock.set_read_timeout(Some(connect_timeout))?;
232 let primary_offset = read_ack(&mut sock)?;
233 // Clear the read timeout for normal streaming (replica may sit
234 // for minutes with no frames if the primary is idle).
235 sock.set_read_timeout(None)?;
236 sock.set_nonblocking(false)?; // explicit: blocking reads after handshake.
237
238 Ok(ReplicaClient {
239 sock,
240 buf: Vec::with_capacity(8 * 1024),
241 cursor: 0,
242 primary_offset_at_handshake: primary_offset,
243 expected_offset: from_offset,
244 in_snapshot: false,
245 })
246 }
247
248 /// Offset the primary reported at handshake (`+ACK <N>` value).
249 /// Informational — exposed so callers can log + future T1.22
250 /// snapshot-ship logic can compare against the local applied
251 /// offset to decide resume vs full-sync.
252 pub fn primary_offset_at_handshake(&self) -> u64 {
253 self.primary_offset_at_handshake
254 }
255
256 /// Return a `try_clone`'d handle on the underlying socket. The
257 /// clone shares the same kernel file description, so calling
258 /// `shutdown(Shutdown::Both)` on it unblocks any in-flight
259 /// blocking read on the original (and vice versa). T1.29.5 uses
260 /// this to interrupt a runner thread parked in `next_event` when
261 /// `REPLICAOF` retargets or `REPLICAOF NO ONE` demotes — without
262 /// this handle, the runner stays blocked until the upstream peer
263 /// closes the connection.
264 pub fn socket_handle(&self) -> io::Result<TcpStream> {
265 self.sock.try_clone()
266 }
267
268 /// The offset the next frame should carry. Advances on every
269 /// successful `next()`.
270 pub fn expected_offset(&self) -> u64 {
271 self.expected_offset
272 }
273
274 /// Pull the next frame from the stream. Frame-only convenience —
275 /// returns [`ReplicaError::SnapshotInProgress`] if the primary is
276 /// sending a snapshot. Callers that need the snapshot-aware
277 /// surface (T1.22) must use [`Self::next_event`] instead.
278 /// Returns `None` on clean peer EOF (no buffered bytes left).
279 pub fn next_frame(&mut self) -> Option<Result<DecodedFrame, ReplicaError>> {
280 match self.next_event()? {
281 Ok(ReplicaEvent::Frame(f)) => Some(Ok(f)),
282 Ok(_) => Some(Err(ReplicaError::SnapshotInProgress)),
283 Err(e) => Some(Err(e)),
284 }
285 }
286
287 /// Drop already-consumed prefix when the cursor has walked past
288 /// 4 KiB of buffer (amortises per-frame work without doing a full
289 /// `drain` on every frame). Used by the event-decoding helpers
290 /// in [`crate::replica_decode`].
291 pub(crate) fn maybe_compact_buf(&mut self) {
292 if self.cursor >= 4 * 1024 {
293 self.buf.drain(..self.cursor);
294 self.cursor = 0;
295 }
296 }
297}
298
299impl Iterator for ReplicaClient {
300 type Item = Result<DecodedFrame, ReplicaError>;
301 /// Frame-only iterator. Use [`ReplicaClient::next_event`] for the
302 /// snapshot-aware surface.
303 fn next(&mut self) -> Option<Self::Item> {
304 self.next_frame()
305 }
306}
307
/// Build the handshake request `REPLICATE FROM <offset> ID <id>` as a
/// RESP2 multi-bulk array of five bulk strings. This is the mirror image
/// of `handshake::parse_replicate_from` on the primary side.
///
/// `from_offset` is rendered in decimal; `replica_id` is sent as its raw
/// UTF-8 bytes.
fn encode_replicate_from(from_offset: u64, replica_id: &str) -> Vec<u8> {
    let offset_digits = from_offset.to_string();
    let parts: [&[u8]; 5] = [
        b"REPLICATE",
        b"FROM",
        offset_digits.as_bytes(),
        b"ID",
        replica_id.as_bytes(),
    ];

    let mut out = Vec::with_capacity(64 + replica_id.len());
    out.extend_from_slice(b"*5\r\n");
    for part in parts {
        // Each bulk string is `$<len>\r\n<bytes>\r\n`.
        out.push(b'$');
        out.extend_from_slice(part.len().to_string().as_bytes());
        out.extend_from_slice(b"\r\n");
        out.extend_from_slice(part);
        out.extend_from_slice(b"\r\n");
    }
    out
}
329
330/// Read `+ACK <offset>\r\n` from `sock`, return the parsed offset.
331/// Pulls one byte at a time — the reply is < 30 bytes, so the per-
332/// byte syscall cost is negligible and avoids a buffering surface
333/// we'd have to thread into the client struct just for the handshake.
334fn read_ack(sock: &mut TcpStream) -> Result<u64, ReplicaError> {
335 let mut line = Vec::with_capacity(32);
336 let mut b = [0u8; 1];
337 loop {
338 match sock.read(&mut b) {
339 Ok(0) => return Err(ReplicaError::HandshakeRejected),
340 Ok(_) => {
341 line.push(b[0]);
342 if line.len() >= 2 && line.ends_with(b"\r\n") {
343 break;
344 }
345 if line.len() > 256 {
346 return Err(ReplicaError::AckMalformed);
347 }
348 }
349 Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
350 Err(e) => return Err(ReplicaError::Io(e)),
351 }
352 }
353 parse_ack_line(&line)
354}
355
356fn parse_ack_line(line: &[u8]) -> Result<u64, ReplicaError> {
357 let body = line.strip_suffix(b"\r\n").ok_or(ReplicaError::AckMalformed)?;
358 let body = body.strip_prefix(b"+ACK ").ok_or(ReplicaError::AckMalformed)?;
359 let s = std::str::from_utf8(body).map_err(|_| ReplicaError::AckMalformed)?;
360 s.parse::<u64>().map_err(|_| ReplicaError::AckMalformed)
361}
362
#[cfg(test)]
impl ReplicaClient {
    /// Test-only constructor: wrap a socket that is already connected and
    /// skip the handshake entirely, so unit tests can feed canned bytes
    /// from the other end of a `TcpStream` pair straight into the event
    /// loop. `expected_offset` seeds both the expected offset and the
    /// recorded handshake offset.
    pub(crate) fn from_socket_for_test(sock: TcpStream, expected_offset: u64) -> Self {
        let buf = Vec::with_capacity(8 * 1024);
        ReplicaClient {
            sock,
            buf,
            cursor: 0,
            primary_offset_at_handshake: expected_offset,
            expected_offset,
            in_snapshot: false,
        }
    }
}
379
#[cfg(test)]
mod tests {
    use super::*;

    /// `true` when `parse_ack_line` rejects `line` with
    /// `ReplicaError::AckMalformed`.
    fn rejected_as_malformed(line: &[u8]) -> bool {
        matches!(parse_ack_line(line), Err(ReplicaError::AckMalformed))
    }

    #[test]
    fn encoded_replicate_from_matches_what_primary_parses() {
        // Encode with the replica-side helper, then decode with the
        // primary-side parser: both ends must agree on the wire shape.
        let wire = encode_replicate_from(42, "replica-a");
        let mut argv = Argv::default();
        let consumed = kevy_resp::parse_command_into(&wire, &mut argv)
            .expect("parse ok")
            .expect("complete");
        assert_eq!(consumed, wire.len());

        let parsed = crate::handshake::parse_replicate_from(&argv).expect("handshake ok");
        assert_eq!(parsed.from_offset, 42);
        assert_eq!(parsed.replica_id, "replica-a");
    }

    #[test]
    fn ack_line_parses_offsets() {
        let offset_of = |line: &[u8]| parse_ack_line(line).unwrap();
        assert_eq!(offset_of(b"+ACK 0\r\n"), 0);
        assert_eq!(offset_of(b"+ACK 42\r\n"), 42);
        assert_eq!(offset_of(b"+ACK 12345678\r\n"), 12_345_678);
    }

    #[test]
    fn ack_line_rejects_malformed() {
        assert!(rejected_as_malformed(b"+PONG\r\n"));
        assert!(rejected_as_malformed(b"+ACK abc\r\n"));
        assert!(rejected_as_malformed(b"-ERR nope\r\n"));
        // No CRLF terminator.
        assert!(rejected_as_malformed(b"+ACK 1"));
    }

    #[test]
    fn ack_line_rejects_offset_overflow() {
        // More than 20 digits cannot fit in a u64, so the numeric parse
        // fails and the line is reported as malformed.
        assert!(rejected_as_malformed(b"+ACK 99999999999999999999999\r\n"));
    }

    #[test]
    fn from_io_error_wraps_into_io_variant() {
        let cause = io::Error::new(io::ErrorKind::ConnectionRefused, "x");
        assert!(matches!(ReplicaError::from(cause), ReplicaError::Io(_)));
    }

    #[test]
    fn from_wire_error_truncated_maps_to_truncated() {
        assert!(matches!(
            ReplicaError::from(WireError::Truncated),
            ReplicaError::Truncated
        ));
    }

    #[test]
    fn from_wire_error_other_maps_to_frame() {
        assert!(matches!(
            ReplicaError::from(WireError::BadEnvelope),
            ReplicaError::Frame(_)
        ));
    }
}
454}