Skip to main content

kevy_replicate/
source.rs

//! Primary-side replication source — a bounded backlog of recent
//! mutations, indexed by monotonic offset.
//!
//! Behaviour at a glance:
//! - [`ReplicationSource::push_mutation`] is called on every applied
//!   write. It assigns the next monotonic offset, encodes the frame
//!   with [`crate::wire::encode_frame`], and appends it to the backlog.
//! - The backlog is bounded by a byte budget (`max_bytes`, fed from
//!   `replication_buffer_size` in the `[replication]` config section).
//!   When a new frame would exceed the budget, the oldest frames are
//!   dropped to make room; the newest frame is always kept, even if it
//!   alone is larger than the budget.
//! - Replicas that disconnect and reconnect within the backlog window
//!   resume via [`ReplicationSource::frames_from`]. Replicas that have
//!   fallen off the back of the buffer get `Err(FromOffset::TooOld)`, and
//!   the caller initiates a full snapshot ship.
//!
//! The source does **not** know about replicas — slot tracking lives
//! in [`crate::slot::SlotTable`]. The source is a passive structure
//! that the streaming loop reads; locking policy for mutation and
//! serialisation is the wiring layer's concern.
21
22use crate::wire::encode_frame;
23use kevy_resp::ArgvView;
24#[cfg(test)]
25use kevy_resp::Argv;
26
/// A single replicated mutation, already wire-encoded and parked in
/// the backlog until replicas have streamed it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Frame {
    /// Offset handed out by the source when this mutation was pushed.
    pub offset: u64,
    /// Encoded frame bytes (envelope + offset + RESP argv), ready to be
    /// written verbatim to a replica socket.
    pub bytes: Vec<u8>,
}
35
/// Why [`ReplicationSource::frames_from`] refused to serve a replica
/// out of the backlog.
#[derive(Debug, PartialEq, Eq)]
pub enum FromOffset {
    /// The requested offset has already been evicted from the backlog;
    /// the streaming loop has to fall back to shipping a snapshot.
    TooOld,
    /// The requested offset lies beyond the next offset this source
    /// would assign, i.e. the peer claims to be ahead of us (data-dir
    /// wipe, epoch confusion, or a bug). The caller should drop the link.
    Future,
}
48
49/// Bounded backlog of recent replicated mutations.
50pub struct ReplicationSource {
51    next_offset: u64,
52    bytes_in_buf: usize,
53    max_bytes: usize,
54    buf: std::collections::VecDeque<Frame>,
55}
56
57impl ReplicationSource {
58    /// Create a new source with the given byte budget. `max_bytes` must
59    /// be > 0; the source guarantees at most one over-budget frame at
60    /// a time (the most recently pushed) so a single huge command does
61    /// not silently disappear before its replicas even see it.
62    pub fn new(max_bytes: usize) -> Self {
63        assert!(max_bytes > 0, "ReplicationSource max_bytes must be > 0");
64        Self {
65            next_offset: 0,
66            bytes_in_buf: 0,
67            max_bytes,
68            buf: std::collections::VecDeque::new(),
69        }
70    }
71
72    /// Next offset this source would assign. Equal to one past the
73    /// last assigned offset; equals `0` for a fresh source.
74    pub fn next_offset(&self) -> u64 {
75        self.next_offset
76    }
77
78    /// Lowest offset still in the backlog, or `None` if empty.
79    pub fn oldest_offset(&self) -> Option<u64> {
80        self.buf.front().map(|f| f.offset)
81    }
82
83    /// Highest offset still in the backlog, or `None` if empty.
84    pub fn newest_offset(&self) -> Option<u64> {
85        self.buf.back().map(|f| f.offset)
86    }
87
88    /// Total bytes occupied by frames currently in the backlog.
89    pub fn buffered_bytes(&self) -> usize {
90        self.bytes_in_buf
91    }
92
93    /// Number of frames currently in the backlog.
94    pub fn len(&self) -> usize {
95        self.buf.len()
96    }
97
98    /// Whether the backlog has no frames.
99    pub fn is_empty(&self) -> bool {
100        self.buf.is_empty()
101    }
102
103    /// Append one applied mutation. Returns the offset assigned to it.
104    /// Generic over [`ArgvView`] so the dispatcher's borrowed argv can
105    /// flow straight in — no `Argv` materialisation on the write path.
106    ///
107    /// May evict older frames if the new frame would exceed the byte
108    /// budget; the new frame is always retained (even if it is larger
109    /// than `max_bytes` on its own — losing the most recent applied
110    /// write before any replica has had a chance to ack it would be
111    /// a worse failure than briefly running over budget).
112    pub fn push_mutation<A: ArgvView + ?Sized>(&mut self, argv: &A) -> u64 {
113        let offset = self.next_offset;
114        let bytes = encode_frame(offset, argv);
115        let frame_len = bytes.len();
116
117        // Evict from the front until either the new frame fits or
118        // the buffer is empty.
119        while self.bytes_in_buf + frame_len > self.max_bytes && !self.buf.is_empty() {
120            let dropped = self.buf.pop_front().expect("non-empty checked");
121            self.bytes_in_buf -= dropped.bytes.len();
122        }
123
124        self.bytes_in_buf += frame_len;
125        self.buf.push_back(Frame { offset, bytes });
126        self.next_offset = self
127            .next_offset
128            .checked_add(1)
129            .expect("replication offset wrap — i64::MAX guard tripped");
130        offset
131    }
132
133    /// Drop every buffered frame whose offset is `< watermark` —
134    /// i.e. every replica has consumed past it. Used by the per-
135    /// shard tick (T1.22.5) to enforce a retention floor tighter
136    /// than the raw byte budget; lets the backlog reclaim space
137    /// for live frames once all consumers have advanced.
138    ///
139    /// No-op when `watermark <= oldest_offset()` (nothing to drop)
140    /// or when the buffer is empty. Updates the internal byte
141    /// accounting to stay consistent with the live buffer length.
142    pub fn drop_up_to(&mut self, watermark: u64) {
143        while let Some(front) = self.buf.front() {
144            if front.offset >= watermark {
145                break;
146            }
147            let dropped = self.buf.pop_front().expect("front-of-loop");
148            self.bytes_in_buf -= dropped.bytes.len();
149        }
150    }
151
152    /// Borrow the slice of frames with offset ≥ `from`. Suitable for
153    /// the streaming loop to write each frame's `bytes` to a replica
154    /// socket. Returns:
155    /// - `Ok(iter)` — zero or more frames in offset order (empty iter
156    ///   means the replica is caught up).
157    /// - `Err(FromOffset::TooOld)` — `from` is older than the oldest
158    ///   buffered frame; the streaming loop must snapshot-ship.
159    /// - `Err(FromOffset::Future)` — `from > next_offset()`; peer is
160    ///   ahead of us, drop the link.
161    pub fn frames_from(&self, from: u64) -> Result<FramesIter<'_>, FromOffset> {
162        if from > self.next_offset {
163            return Err(FromOffset::Future);
164        }
165        // from == next_offset → replica is exactly caught up; empty slice.
166        if from == self.next_offset {
167            return Ok(FramesIter {
168                buf: &self.buf,
169                cursor: self.buf.len(),
170            });
171        }
172        // from < next_offset: the requested frame either is still in
173        // the backlog or was evicted. Empty buf with from < next_offset
174        // means every frame ever pushed has been evicted — same TooOld
175        // outcome as `from < oldest`. (Without this branch the function
176        // returns an empty iterator and the streaming pump silently
177        // stalls — the v1.20 embed-replica restart test caught this.)
178        match self.oldest_offset() {
179            Some(oldest) if from < oldest => return Err(FromOffset::TooOld),
180            None => return Err(FromOffset::TooOld),
181            _ => {}
182        }
183        // Locate the start index. Offsets are monotonic so binary search
184        // is correct; the deque slices into two parts so we iterate.
185        let start = self.buf.iter().position(|f| f.offset >= from);
186        Ok(FramesIter {
187            buf: &self.buf,
188            cursor: start.unwrap_or(self.buf.len()),
189        })
190    }
191}
192
193/// Iterator over backlog frames returned by [`ReplicationSource::frames_from`].
194pub struct FramesIter<'a> {
195    buf: &'a std::collections::VecDeque<Frame>,
196    cursor: usize,
197}
198
199impl<'a> Iterator for FramesIter<'a> {
200    type Item = &'a Frame;
201    fn next(&mut self) -> Option<&'a Frame> {
202        let item = self.buf.get(self.cursor)?;
203        self.cursor += 1;
204        Some(item)
205    }
206}
207
#[cfg(test)]
mod tests {
    use super::*;
    use crate::wire::decode_frame;

    /// Build an owned `Argv` from a list of byte-string arguments.
    fn argv(args: &[&[u8]]) -> Argv {
        let mut a = Argv::default();
        for arg in args {
            a.push(arg);
        }
        a
    }

    #[test]
    fn fresh_source_is_empty() {
        let s = ReplicationSource::new(1024);
        assert!(s.is_empty());
        assert_eq!(s.len(), 0);
        assert_eq!(s.next_offset(), 0);
        assert_eq!(s.oldest_offset(), None);
        assert_eq!(s.newest_offset(), None);
        assert_eq!(s.buffered_bytes(), 0);
    }

    #[test]
    fn push_assigns_monotonic_offsets() {
        let mut s = ReplicationSource::new(64 * 1024);
        let o0 = s.push_mutation(&argv(&[b"SET", b"a", b"1"]));
        let o1 = s.push_mutation(&argv(&[b"SET", b"b", b"2"]));
        let o2 = s.push_mutation(&argv(&[b"DEL", b"a"]));
        assert_eq!((o0, o1, o2), (0, 1, 2));
        assert_eq!(s.oldest_offset(), Some(0));
        assert_eq!(s.newest_offset(), Some(2));
        assert_eq!(s.next_offset(), 3);
        assert_eq!(s.len(), 3);
    }

    #[test]
    fn pushed_frames_decode_back_to_the_pushed_argv() {
        let mut s = ReplicationSource::new(1024);
        let a = argv(&[b"HSET", b"h", b"f", b"v"]);
        let off = s.push_mutation(&a);
        let frame = s.buf.front().expect("one frame");
        assert_eq!(frame.offset, off);
        let (decoded_off, decoded_argv, used) = decode_frame(&frame.bytes).expect("decode");
        assert_eq!(decoded_off, off);
        assert_eq!(decoded_argv, a);
        assert_eq!(used, frame.bytes.len());
    }

    #[test]
    fn eviction_drops_oldest_when_budget_exceeded() {
        // Each frame encodes to roughly 37 bytes (envelope + offset +
        // 3-arg SET). A budget of 80 bytes holds 2 frames; pushing a
        // 3rd evicts the oldest.
        let mut s = ReplicationSource::new(80);
        let _ = s.push_mutation(&argv(&[b"SET", b"a", b"1"]));
        let _ = s.push_mutation(&argv(&[b"SET", b"b", b"2"]));
        assert_eq!(s.oldest_offset(), Some(0));
        let _ = s.push_mutation(&argv(&[b"SET", b"c", b"3"]));
        assert_eq!(s.oldest_offset(), Some(1));
        assert_eq!(s.newest_offset(), Some(2));
        assert!(s.buffered_bytes() <= 80);
        // next_offset keeps climbing even when older frames are evicted.
        assert_eq!(s.next_offset(), 3);
    }

    #[test]
    fn oversized_single_frame_is_retained_against_budget() {
        // Budget of 8 bytes — smaller than any real frame. The most
        // recent push always survives so a freshly-applied write is
        // never lost before any replica can see it.
        let mut s = ReplicationSource::new(8);
        let off = s.push_mutation(&argv(&[b"SET", b"k", b"v"]));
        assert_eq!(s.len(), 1);
        assert_eq!(s.oldest_offset(), Some(off));
        assert!(s.buffered_bytes() > 8); // ran over budget; expected.
        // Pushing again still keeps only the newest (older is evicted).
        let off2 = s.push_mutation(&argv(&[b"DEL", b"k"]));
        assert_eq!(s.len(), 1);
        assert_eq!(s.oldest_offset(), Some(off2));
    }

    #[test]
    fn frames_from_at_exact_offset_returns_that_frame_first() {
        let mut s = ReplicationSource::new(1024);
        for i in 0..5 {
            let _ = s.push_mutation(&argv(&[b"SET", b"k", format!("{i}").as_bytes()]));
        }
        let mut it = s.frames_from(2).unwrap();
        let f = it.next().expect("frame");
        assert_eq!(f.offset, 2);
        let remaining: Vec<u64> = it.map(|f| f.offset).collect();
        assert_eq!(remaining, vec![3, 4]);
    }

    #[test]
    fn frames_from_at_next_offset_is_empty_caught_up() {
        let mut s = ReplicationSource::new(1024);
        let _ = s.push_mutation(&argv(&[b"PING"]));
        let _ = s.push_mutation(&argv(&[b"PING"]));
        let it = s.frames_from(s.next_offset()).unwrap();
        assert_eq!(it.count(), 0);
    }

    #[test]
    fn frames_from_too_old_after_eviction() {
        // Tight budget; push enough to evict offset 0.
        let mut s = ReplicationSource::new(80);
        for _ in 0..5 {
            let _ = s.push_mutation(&argv(&[b"SET", b"k", b"v"]));
        }
        // Offset 0 was evicted.
        assert!(s.oldest_offset().unwrap() > 0);
        assert!(matches!(s.frames_from(0), Err(FromOffset::TooOld)));
    }

    #[test]
    fn frames_from_future_offset_rejected() {
        let mut s = ReplicationSource::new(1024);
        let _ = s.push_mutation(&argv(&[b"PING"]));
        // next_offset is 1; asking for 2 is a future-offset peer.
        assert!(matches!(s.frames_from(2), Err(FromOffset::Future)));
    }

    #[test]
    fn frames_from_empty_source_at_zero_is_caught_up_not_too_old() {
        // A fresh source has nothing buffered but is at offset 0; a
        // replica asking from 0 is up-to-date (the source has nothing
        // to send yet), not too-old.
        let s = ReplicationSource::new(1024);
        assert_eq!(s.frames_from(0).unwrap().count(), 0);
        // Asking for offset 1 (one past empty next_offset 0) = Future.
        assert!(matches!(s.frames_from(1), Err(FromOffset::Future)));
    }

    #[test]
    fn frames_from_fully_drained_backlog_is_too_old_below_next_offset() {
        // Regression guard: an empty backlog with next_offset > 0 must
        // report TooOld for any older offset rather than an empty
        // "caught up" iterator (which would silently stall the pump).
        // Asking exactly at next_offset is still caught up.
        let mut s = ReplicationSource::new(64 * 1024);
        for _ in 0..3 {
            let _ = s.push_mutation(&argv(&[b"SET", b"k", b"v"]));
        }
        let next = s.next_offset();
        s.drop_up_to(next);
        assert!(s.is_empty());
        assert!(matches!(s.frames_from(0), Err(FromOffset::TooOld)));
        assert!(matches!(s.frames_from(next - 1), Err(FromOffset::TooOld)));
        assert_eq!(s.frames_from(next).unwrap().count(), 0);
    }

    #[test]
    fn frames_from_mid_window_after_front_eviction_starts_at_requested_offset() {
        let mut s = ReplicationSource::new(64 * 1024);
        for i in 0..6 {
            let v = format!("v{i}");
            let _ = s.push_mutation(&argv(&[b"SET", b"k", v.as_bytes()]));
        }
        s.drop_up_to(2);
        assert_eq!(s.oldest_offset(), Some(2));
        let from_mid: Vec<u64> = s.frames_from(4).unwrap().map(|f| f.offset).collect();
        assert_eq!(from_mid, vec![4, 5]);
        let from_oldest: Vec<u64> = s.frames_from(2).unwrap().map(|f| f.offset).collect();
        assert_eq!(from_oldest, vec![2, 3, 4, 5]);
        assert!(matches!(s.frames_from(1), Err(FromOffset::TooOld)));
    }

    #[test]
    fn push_mutation_accepts_argv_borrowed_from_dispatcher_hot_path() {
        // The reactor's local fast path holds the parsed argv as an
        // `ArgvBorrowed` over the connection read buffer (zero-copy);
        // `push_mutation` must accept that view directly, not force
        // a materialised `Argv`. Parse one with the public parser and
        // push it; the decoded round-trip must match a hand-built Argv
        // of the same command.
        let resp = b"*3\r\n$3\r\nSET\r\n$3\r\nfoo\r\n$3\r\nbar\r\n";
        let (borrowed, consumed) = kevy_resp::parse_command_borrowed(resp)
            .expect("parse ok")
            .expect("complete frame");
        assert_eq!(consumed, resp.len());

        let mut s = ReplicationSource::new(1024);
        let off = s.push_mutation(&borrowed);
        assert_eq!(off, 0);

        let frame = s.buf.front().expect("one frame");
        let (decoded_off, decoded_argv, _) = decode_frame(&frame.bytes).expect("decode");
        assert_eq!(decoded_off, 0);
        assert_eq!(decoded_argv, argv(&[b"SET", b"foo", b"bar"]));
    }

    #[test]
    fn buffered_bytes_tracks_actual_frame_total() {
        let mut s = ReplicationSource::new(1024);
        let _ = s.push_mutation(&argv(&[b"SET", b"k", b"v"]));
        let _ = s.push_mutation(&argv(&[b"DEL", b"k"]));
        let actual: usize = s.buf.iter().map(|f| f.bytes.len()).sum();
        assert_eq!(s.buffered_bytes(), actual);
    }

    #[test]
    fn drop_up_to_evicts_below_watermark() {
        // T1.22.5: drop_up_to(w) evicts every frame with offset < w.
        let mut s = ReplicationSource::new(64 * 1024);
        for i in 0..5 {
            let v = format!("v{i}");
            let _ = s.push_mutation(&argv(&[b"SET", b"k", v.as_bytes()]));
        }
        assert_eq!(s.len(), 5);
        let bytes_before = s.buffered_bytes();
        // Watermark = 3 → drop offsets 0, 1, 2; keep 3, 4.
        s.drop_up_to(3);
        assert_eq!(s.len(), 2);
        assert_eq!(s.oldest_offset(), Some(3));
        assert_eq!(s.newest_offset(), Some(4));
        // bytes accounting must shrink.
        assert!(s.buffered_bytes() < bytes_before);
        // Frames-from at the watermark works without TooOld.
        let kept: Vec<_> = s.frames_from(3).unwrap().collect();
        assert_eq!(kept.len(), 2);
    }

    #[test]
    fn drop_up_to_below_oldest_is_noop() {
        let mut s = ReplicationSource::new(64 * 1024);
        let _ = s.push_mutation(&argv(&[b"SET", b"k", b"v"]));
        let _ = s.push_mutation(&argv(&[b"SET", b"k", b"v"]));
        assert_eq!(s.oldest_offset(), Some(0));
        s.drop_up_to(0); // watermark equals oldest: nothing to drop
        assert_eq!(s.len(), 2);
    }

    #[test]
    fn drop_up_to_at_or_past_newest_drops_everything() {
        let mut s = ReplicationSource::new(64 * 1024);
        for _ in 0..3 {
            let _ = s.push_mutation(&argv(&[b"SET", b"k", b"v"]));
        }
        s.drop_up_to(99);
        assert!(s.is_empty());
        assert_eq!(s.buffered_bytes(), 0);
    }
}