Skip to main content

kevy_replicate/
replica.rs

//! Replica-side client — connect to a primary's replication listener,
//! perform the handshake, then yield decoded mutation frames in order.
//!
//! The client is **synchronous + blocking** by design: it slots into a
//! dedicated thread on the replica node alongside (but separate from)
//! the regular kevy reactor. An async surface is a Phase 4 deliverable
//! (`kevy-client-async`, the only crate carved out of the 0-dep rule).
//!
//! Hot loop usage:
//!
//! ```no_run
//! use kevy_replicate::replica::ReplicaClient;
//!
//! let mut client = ReplicaClient::connect("127.0.0.1:16004", "replica-a", 0)
//!     .expect("connect ok");
//! while let Some(result) = client.next() {
//!     let frame = result.expect("decode ok");
//!     // apply frame.argv at frame.offset — caller's responsibility (T1.19)
//!     drop(frame);
//! }
//! ```
//!
//! Errors map to actionable next steps for the caller:
//! - [`ReplicaError::HandshakeRejected`] / [`ReplicaError::AckMalformed`]
//!   — primary refused or replied with garbage; drop the link, log,
//!   maybe back off and retry.
//! - [`ReplicaError::Truncated`] — peer EOF mid-frame; treat as a
//!   disconnect, reconnect later.
//! - [`ReplicaError::OffsetGap`] (fields `expected`, `got`) — frames
//!   arrived out of order or with a skip; per plan T1.20 the caller
//!   should trigger a full snapshot resync. v1.18.0 surfaces the gap;
//!   the snapshot ship machinery itself lands at T1.22.
//! - [`ReplicaError::Frame`] — wire-level decode error; same
//!   action as Truncated (drop + reconnect).
35
36use crate::wire::WireError;
37use kevy_resp::Argv;
38use std::io::{self, Read, Write};
39use std::net::{TcpStream, ToSocketAddrs};
40use std::time::Duration;
41
42/// A decoded mutation frame the replica should apply to its local
43/// store. Ownership of the [`Argv`] passes to the caller.
44#[derive(Debug)]
45pub struct DecodedFrame {
46    /// Monotonic offset the primary assigned at apply-time.
47    pub offset: u64,
48    /// Wire-decoded argv — feed to the dispatcher the same way AOF
49    /// replay does (cmd name + arg bytes).
50    pub argv: Argv,
51}
52
53/// Event yielded by [`ReplicaClient::next_event`]. A driver loop
54/// pattern-matches and applies each:
55/// - [`Self::Frame`] → run through the local dispatcher.
56/// - [`Self::SnapshotBegin`] → caller should reset / prepare the
57///   local store for a fresh-from-snapshot fill.
58/// - [`Self::SnapshotChunk`] → append the bytes to the caller's
59///   accumulating snapshot buffer.
60/// - [`Self::SnapshotEnd`] → caller hands the accumulated buffer to
61///   `kevy_persist::load_snapshot`; [`ReplicaClient`] has already
62///   advanced `expected_offset` to `ack_offset`, so the next
63///   [`Self::Frame`] arrives at `ack_offset` with no gap.
64#[derive(Debug)]
65pub enum ReplicaEvent {
66    /// A live mutation frame.
67    Frame(DecodedFrame),
68    /// Snapshot ship begin marker (`+SNAPSHOT\r\n`).
69    SnapshotBegin,
70    /// One snapshot chunk's payload bytes (RESP bulk string body).
71    SnapshotChunk(Vec<u8>),
72    /// Snapshot ship end marker carrying the offset the next live
73    /// frame will have.
74    SnapshotEnd {
75        /// The offset the primary's `next_offset` was at when the
76        /// snapshot started. After this event, [`ReplicaClient::expected_offset`]
77        /// equals this value.
78        ack_offset: u64,
79    },
80}
81
82/// Errors a replica client can surface to its driver loop.
83#[derive(Debug)]
84pub enum ReplicaError {
85    /// Primary closed the connection or never replied during the
86    /// handshake / `+ACK` exchange.
87    HandshakeRejected,
88    /// `+ACK` line was malformed (didn't start with `+ACK `, didn't
89    /// parse the offset).
90    AckMalformed,
91    /// Peer closed the connection mid-frame; reconnect to resume.
92    Truncated,
93    /// Wire-level decode error (envelope shape wrong, payload
94    /// malformed, etc.).
95    Frame(WireError),
96    /// Frame arrived with an offset other than the expected next.
97    /// Caller should trigger a full snapshot resync (T1.22).
98    OffsetGap {
99        /// The offset the client expected next (= `last_seen + 1`).
100        expected: u64,
101        /// The offset the primary actually sent.
102        got: u64,
103    },
104    /// While streaming a snapshot, the primary sent bytes that were
105    /// neither a snapshot chunk nor `+SNAPSHOT_END`. v1.18.0 forbids
106    /// interleaving live frames inside a snapshot (see `docs/snapshot.md`).
107    UnexpectedInSnapshot,
108    /// `next_frame` was called but the next event is a snapshot
109    /// marker / chunk. Callers that want the snapshot-aware surface
110    /// must use [`ReplicaClient::next_event`].
111    SnapshotInProgress,
112    /// Underlying socket I/O failure.
113    Io(io::Error),
114}
115
116impl std::fmt::Display for ReplicaError {
117    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
118        match self {
119            Self::HandshakeRejected => write!(f, "primary rejected replication handshake"),
120            Self::AckMalformed => write!(f, "primary sent malformed +ACK"),
121            Self::Truncated => write!(f, "replication stream truncated by peer"),
122            Self::Frame(e) => write!(f, "replication frame decode error: {e}"),
123            Self::OffsetGap { expected, got } => {
124                write!(f, "replication offset gap: expected {expected}, got {got}")
125            }
126            Self::UnexpectedInSnapshot => {
127                write!(f, "primary sent non-chunk bytes mid-snapshot")
128            }
129            Self::SnapshotInProgress => {
130                write!(f, "snapshot in progress; use next_event() to consume")
131            }
132            Self::Io(e) => write!(f, "replication socket I/O error: {e}"),
133        }
134    }
135}
136
137impl std::error::Error for ReplicaError {}
138
139impl From<io::Error> for ReplicaError {
140    fn from(e: io::Error) -> Self {
141        ReplicaError::Io(e)
142    }
143}
144
145impl From<WireError> for ReplicaError {
146    fn from(e: WireError) -> Self {
147        match e {
148            WireError::Truncated => ReplicaError::Truncated,
149            other => ReplicaError::Frame(other),
150        }
151    }
152}
153
/// A single blocking TCP connection to a primary's per-shard
/// replication listener. Once [`Self::connect`] has finished the
/// handshake, the client acts as an
/// `Iterator<Item = Result<DecodedFrame, ReplicaError>>` that yields
/// frames in offset order until the peer disconnects or a hard error
/// surfaces.
pub struct ReplicaClient {
    pub(crate) sock: TcpStream,
    /// Bytes already read from the socket that have not been parsed
    /// into a frame yet.
    pub(crate) buf: Vec<u8>,
    /// Index into `buf` at which the next decode attempt begins. `buf`
    /// is drained only once this passes a high-water mark, so
    /// per-frame work avoids repeated `Vec::drain` shifts.
    pub(crate) cursor: usize,
    /// Offset the primary advertised at handshake (the `+ACK <N>`
    /// value). Currently informational; T1.20 / T1.22 use it for
    /// gap-detection decisions (re-handshake vs full sync).
    pub(crate) primary_offset_at_handshake: u64,
    /// Offset the next frame from the stream is expected to carry.
    /// Starts at the requested `from_offset` and advances by 1 on
    /// each accepted frame.
    pub(crate) expected_offset: u64,
    /// `true` between `+SNAPSHOT` and `+SNAPSHOT_END`. While set,
    /// only chunk and end-marker bytes are valid; a `*2\r\n` (live
    /// frame envelope) yields [`ReplicaError::UnexpectedInSnapshot`]
    /// per the v1.18 spec (`docs/snapshot.md` — interleaving is a
    /// T1.25 extension).
    pub(crate) in_snapshot: bool,
}
181
182impl ReplicaClient {
183    /// Connect to `addr`, send `REPLICATE FROM <from_offset> ID <replica_id>`,
184    /// read the `+ACK <offset>` reply, and return a ready-to-iterate
185    /// client. Blocks until the handshake completes or the connect
186    /// times out (`connect_timeout` argument).
187    pub fn connect<A: ToSocketAddrs>(
188        addr: A,
189        replica_id: &str,
190        from_offset: u64,
191    ) -> Result<Self, ReplicaError> {
192        Self::connect_with_timeout(addr, replica_id, from_offset, Duration::from_secs(5))
193    }
194
195    /// [`Self::connect`] with an explicit connect timeout. Useful for
196    /// tests that don't want to wait the default 5 s when a port is
197    /// closed.
198    pub fn connect_with_timeout<A: ToSocketAddrs>(
199        addr: A,
200        replica_id: &str,
201        from_offset: u64,
202        connect_timeout: Duration,
203    ) -> Result<Self, ReplicaError> {
204        // Resolve + connect with timeout. ToSocketAddrs returns an
205        // iterator; we try each address until one succeeds.
206        let mut last_err: Option<io::Error> = None;
207        let mut sock: Option<TcpStream> = None;
208        for sa in addr.to_socket_addrs().map_err(ReplicaError::Io)? {
209            match TcpStream::connect_timeout(&sa, connect_timeout) {
210                Ok(s) => {
211                    sock = Some(s);
212                    break;
213                }
214                Err(e) => last_err = Some(e),
215            }
216        }
217        let mut sock = sock.ok_or_else(|| {
218            ReplicaError::Io(last_err.unwrap_or_else(|| {
219                io::Error::new(io::ErrorKind::InvalidInput, "no socket address resolved")
220            }))
221        })?;
222
223        // Send the handshake. `encode_replicate_from` is a private
224        // helper so the on-the-wire shape is one place to change.
225        let req = encode_replicate_from(from_offset, replica_id);
226        sock.write_all(&req)?;
227
228        // Read the `+ACK <offset>\r\n` reply. Use a small read timeout
229        // so a primary that opens the socket but never replies doesn't
230        // hang the replica forever.
231        sock.set_read_timeout(Some(connect_timeout))?;
232        let primary_offset = read_ack(&mut sock)?;
233        // Clear the read timeout for normal streaming (replica may sit
234        // for minutes with no frames if the primary is idle).
235        sock.set_read_timeout(None)?;
236        sock.set_nonblocking(false)?; // explicit: blocking reads after handshake.
237
238        Ok(ReplicaClient {
239            sock,
240            buf: Vec::with_capacity(8 * 1024),
241            cursor: 0,
242            primary_offset_at_handshake: primary_offset,
243            expected_offset: from_offset,
244            in_snapshot: false,
245        })
246    }
247
248    /// Offset the primary reported at handshake (`+ACK <N>` value).
249    /// Informational — exposed so callers can log + future T1.22
250    /// snapshot-ship logic can compare against the local applied
251    /// offset to decide resume vs full-sync.
252    pub fn primary_offset_at_handshake(&self) -> u64 {
253        self.primary_offset_at_handshake
254    }
255
256    /// Return a `try_clone`'d handle on the underlying socket. The
257    /// clone shares the same kernel file description, so calling
258    /// `shutdown(Shutdown::Both)` on it unblocks any in-flight
259    /// blocking read on the original (and vice versa). T1.29.5 uses
260    /// this to interrupt a runner thread parked in `next_event` when
261    /// `REPLICAOF` retargets or `REPLICAOF NO ONE` demotes — without
262    /// this handle, the runner stays blocked until the upstream peer
263    /// closes the connection.
264    pub fn socket_handle(&self) -> io::Result<TcpStream> {
265        self.sock.try_clone()
266    }
267
268    /// The offset the next frame should carry. Advances on every
269    /// successful `next()`.
270    pub fn expected_offset(&self) -> u64 {
271        self.expected_offset
272    }
273
274    /// Pull the next frame from the stream. Frame-only convenience —
275    /// returns [`ReplicaError::SnapshotInProgress`] if the primary is
276    /// sending a snapshot. Callers that need the snapshot-aware
277    /// surface (T1.22) must use [`Self::next_event`] instead.
278    /// Returns `None` on clean peer EOF (no buffered bytes left).
279    pub fn next_frame(&mut self) -> Option<Result<DecodedFrame, ReplicaError>> {
280        match self.next_event()? {
281            Ok(ReplicaEvent::Frame(f)) => Some(Ok(f)),
282            Ok(_) => Some(Err(ReplicaError::SnapshotInProgress)),
283            Err(e) => Some(Err(e)),
284        }
285    }
286
287    /// Drop already-consumed prefix when the cursor has walked past
288    /// 4 KiB of buffer (amortises per-frame work without doing a full
289    /// `drain` on every frame). Used by the event-decoding helpers
290    /// in [`crate::replica_decode`].
291    pub(crate) fn maybe_compact_buf(&mut self) {
292        if self.cursor >= 4 * 1024 {
293            self.buf.drain(..self.cursor);
294            self.cursor = 0;
295        }
296    }
297}
298
299impl Iterator for ReplicaClient {
300    type Item = Result<DecodedFrame, ReplicaError>;
301    /// Frame-only iterator. Use [`ReplicaClient::next_event`] for the
302    /// snapshot-aware surface.
303    fn next(&mut self) -> Option<Self::Item> {
304        self.next_frame()
305    }
306}
307
/// Compose a `REPLICATE FROM <offset> ID <id>` RESP2 multi-bulk
/// request — symmetric to `handshake::parse_replicate_from` on the
/// primary side.
///
/// Every argument is emitted as a length-prefixed bulk string
/// (`$<len>\r\n<bytes>\r\n`), so `replica_id` may contain any bytes.
/// The `*<n>` array header is derived from the argument list itself,
/// so it cannot drift out of sync if an argument is ever added.
///
/// Returns the encoded request bytes, ready to write to the socket.
fn encode_replicate_from(from_offset: u64, replica_id: &str) -> Vec<u8> {
    let offset = from_offset.to_string();
    let args: [&[u8]; 5] = [
        b"REPLICATE",
        b"FROM",
        offset.as_bytes(),
        b"ID",
        replica_id.as_bytes(),
    ];

    let mut out = Vec::with_capacity(64 + replica_id.len());
    // Headers are formatted straight into `out` (no temporary String
    // per argument). `Vec<u8>`'s `io::Write` impl never fails.
    write!(out, "*{}\r\n", args.len()).expect("writing to a Vec<u8> cannot fail");
    for arg in &args {
        write!(out, "${}\r\n", arg.len()).expect("writing to a Vec<u8> cannot fail");
        out.extend_from_slice(arg);
        out.extend_from_slice(b"\r\n");
    }
    out
}
329
330/// Read `+ACK <offset>\r\n` from `sock`, return the parsed offset.
331/// Pulls one byte at a time — the reply is < 30 bytes, so the per-
332/// byte syscall cost is negligible and avoids a buffering surface
333/// we'd have to thread into the client struct just for the handshake.
334fn read_ack(sock: &mut TcpStream) -> Result<u64, ReplicaError> {
335    let mut line = Vec::with_capacity(32);
336    let mut b = [0u8; 1];
337    loop {
338        match sock.read(&mut b) {
339            Ok(0) => return Err(ReplicaError::HandshakeRejected),
340            Ok(_) => {
341                line.push(b[0]);
342                if line.len() >= 2 && line.ends_with(b"\r\n") {
343                    break;
344                }
345                if line.len() > 256 {
346                    return Err(ReplicaError::AckMalformed);
347                }
348            }
349            Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
350            Err(e) => return Err(ReplicaError::Io(e)),
351        }
352    }
353    parse_ack_line(&line)
354}
355
356fn parse_ack_line(line: &[u8]) -> Result<u64, ReplicaError> {
357    let body = line.strip_suffix(b"\r\n").ok_or(ReplicaError::AckMalformed)?;
358    let body = body.strip_prefix(b"+ACK ").ok_or(ReplicaError::AckMalformed)?;
359    let s = std::str::from_utf8(body).map_err(|_| ReplicaError::AckMalformed)?;
360    s.parse::<u64>().map_err(|_| ReplicaError::AckMalformed)
361}
362
#[cfg(test)]
impl ReplicaClient {
    /// Test-only constructor: wrap an already-connected socket and
    /// skip the handshake entirely, so unit tests can drive the event
    /// loop with canned bytes written from the other end of a
    /// `TcpStream` pair. `expected_offset` seeds both the expected
    /// offset and the recorded handshake offset.
    pub(crate) fn from_socket_for_test(sock: TcpStream, expected_offset: u64) -> Self {
        let buf = Vec::with_capacity(8 * 1024);
        ReplicaClient {
            sock,
            buf,
            cursor: 0,
            primary_offset_at_handshake: expected_offset,
            expected_offset,
            in_snapshot: false,
        }
    }
}
379
#[cfg(test)]
mod tests {
    use super::*;

    /// The encoded handshake must survive a round trip through the
    /// RESP parser and the primary-side handshake parser.
    #[test]
    fn encoded_replicate_from_matches_what_primary_parses() {
        let wire = encode_replicate_from(42, "replica-a");
        let mut parsed = Argv::default();
        let used = kevy_resp::parse_command_into(&wire, &mut parsed)
            .expect("parse ok")
            .expect("complete");
        assert_eq!(used, wire.len());

        let handshake = crate::handshake::parse_replicate_from(&parsed).expect("handshake ok");
        assert_eq!(handshake.from_offset, 42);
        assert_eq!(handshake.replica_id, "replica-a");
    }

    /// Well-formed `+ACK` lines yield their decimal offset.
    #[test]
    fn ack_line_parses_offsets() {
        assert!(matches!(parse_ack_line(b"+ACK 0\r\n"), Ok(0)));
        assert!(matches!(parse_ack_line(b"+ACK 42\r\n"), Ok(42)));
        assert!(matches!(
            parse_ack_line(b"+ACK 12345678\r\n"),
            Ok(12_345_678)
        ));
    }

    /// Wrong prefix, non-numeric offset, error reply and missing CRLF
    /// are all reported as `AckMalformed`.
    #[test]
    fn ack_line_rejects_malformed() {
        let bad_lines: [&[u8]; 4] = [
            b"+PONG\r\n",
            b"+ACK abc\r\n",
            b"-ERR nope\r\n",
            // Missing CRLF.
            b"+ACK 1",
        ];
        for &line in bad_lines.iter() {
            assert!(matches!(
                parse_ack_line(line),
                Err(ReplicaError::AckMalformed)
            ));
        }
    }

    /// An offset with 21+ digits exceeds `u64::MAX`; the failed
    /// `parse::<u64>()` must surface as `AckMalformed`.
    #[test]
    fn ack_line_rejects_offset_overflow() {
        let outcome = parse_ack_line(b"+ACK 99999999999999999999999\r\n");
        assert!(matches!(outcome, Err(ReplicaError::AckMalformed)));
    }

    #[test]
    fn from_io_error_wraps_into_io_variant() {
        let source = io::Error::new(io::ErrorKind::ConnectionRefused, "x");
        assert!(matches!(ReplicaError::from(source), ReplicaError::Io(_)));
    }

    #[test]
    fn from_wire_error_truncated_maps_to_truncated() {
        assert!(matches!(
            ReplicaError::from(WireError::Truncated),
            ReplicaError::Truncated
        ));
    }

    #[test]
    fn from_wire_error_other_maps_to_frame() {
        assert!(matches!(
            ReplicaError::from(WireError::BadEnvelope),
            ReplicaError::Frame(_)
        ));
    }
}