Skip to main content

kevy_replicate/
source.rs

//! Primary-side replication source — bounded backlog of recent
//! mutations, indexed by monotonic offset.
//!
//! Behaviour at a glance:
//! - [`ReplicationSource::push_mutation`] is called on every applied
//!   write. It assigns the next monotonic offset, encodes the frame
//!   with [`crate::wire::encode_frame`], and appends to the backlog.
//! - The backlog is bounded by a byte budget (`max_bytes`, fed from
//!   `[replication]` `replication_buffer_size` in config). When a new
//!   frame would exceed the budget, the oldest frames are dropped to
//!   make room.
//! - Replicas that disconnect and reconnect within the backlog window
//!   resume via [`ReplicationSource::frames_from`]. Replicas that fall
//!   off the back of the buffer get `Err(FromOffset::TooOld)` and the
//!   caller initiates a full snapshot ship.
//!
//! The source does **not** know about replicas — slot tracking lives
//! in [`crate::slot::SlotTable`]. The source is a passive structure
//! the streaming loop reads; mutation/serialisation lock policy is the
//! wiring layer's concern.
21
22use crate::wire::encode_frame;
23use kevy_resp::ArgvView;
24#[cfg(test)]
25use kevy_resp::Argv;
26
/// A single mutation as it is held in the replication backlog: the
/// offset it was given plus its already-encoded bytes.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct Frame {
    /// Offset the source handed out when this mutation was pushed.
    /// Offsets grow by one per push.
    pub offset: u64,
    /// The frame in wire form (envelope + offset + RESP argv), ready to
    /// be written to a replica as-is.
    pub bytes: Vec<u8>,
}
35
/// Why [`ReplicationSource::frames_from`] refused to serve a replica
/// out of the backlog.
#[derive(Debug, Eq, PartialEq)]
pub enum FromOffset {
    /// The requested offset has already been evicted from the backlog.
    /// Incremental catch-up is impossible; the streaming loop has to
    /// fall back to shipping a snapshot.
    TooOld,
    /// The requested offset lies beyond the next offset this source
    /// would assign, i.e. the peer claims to be ahead of us (data-dir
    /// wipe, epoch confusion, or a bug). The caller should drop the
    /// link.
    Future,
}
48
49/// Bounded backlog of recent replicated mutations.
50pub struct ReplicationSource {
51    next_offset: u64,
52    bytes_in_buf: usize,
53    max_bytes: usize,
54    buf: std::collections::VecDeque<Frame>,
55}
56
57impl ReplicationSource {
58    /// Create a new source with the given byte budget. `max_bytes` must
59    /// be > 0; the source guarantees at most one over-budget frame at
60    /// a time (the most recently pushed) so a single huge command does
61    /// not silently disappear before its replicas even see it.
62    pub fn new(max_bytes: usize) -> Self {
63        assert!(max_bytes > 0, "ReplicationSource max_bytes must be > 0");
64        Self {
65            next_offset: 0,
66            bytes_in_buf: 0,
67            max_bytes,
68            buf: std::collections::VecDeque::new(),
69        }
70    }
71
72    /// Next offset this source would assign. Equal to one past the
73    /// last assigned offset; equals `0` for a fresh source.
74    pub fn next_offset(&self) -> u64 {
75        self.next_offset
76    }
77
78    /// Lowest offset still in the backlog, or `None` if empty.
79    pub fn oldest_offset(&self) -> Option<u64> {
80        self.buf.front().map(|f| f.offset)
81    }
82
83    /// Highest offset still in the backlog, or `None` if empty.
84    pub fn newest_offset(&self) -> Option<u64> {
85        self.buf.back().map(|f| f.offset)
86    }
87
88    /// Total bytes occupied by frames currently in the backlog.
89    pub fn buffered_bytes(&self) -> usize {
90        self.bytes_in_buf
91    }
92
93    /// Number of frames currently in the backlog.
94    pub fn len(&self) -> usize {
95        self.buf.len()
96    }
97
98    /// Whether the backlog has no frames.
99    pub fn is_empty(&self) -> bool {
100        self.buf.is_empty()
101    }
102
103    /// Append one applied mutation. Returns the offset assigned to it.
104    /// Generic over [`ArgvView`] so the dispatcher's borrowed argv can
105    /// flow straight in — no `Argv` materialisation on the write path.
106    ///
107    /// May evict older frames if the new frame would exceed the byte
108    /// budget; the new frame is always retained (even if it is larger
109    /// than `max_bytes` on its own — losing the most recent applied
110    /// write before any replica has had a chance to ack it would be
111    /// a worse failure than briefly running over budget).
112    pub fn push_mutation<A: ArgvView + ?Sized>(&mut self, argv: &A) -> u64 {
113        let offset = self.next_offset;
114        let bytes = encode_frame(offset, argv);
115        let frame_len = bytes.len();
116
117        // Evict from the front until either the new frame fits or
118        // the buffer is empty.
119        while self.bytes_in_buf + frame_len > self.max_bytes && !self.buf.is_empty() {
120            let dropped = self.buf.pop_front().expect("non-empty checked");
121            self.bytes_in_buf -= dropped.bytes.len();
122        }
123
124        self.bytes_in_buf += frame_len;
125        self.buf.push_back(Frame { offset, bytes });
126        self.next_offset = self
127            .next_offset
128            .checked_add(1)
129            .expect("replication offset wrap — i64::MAX guard tripped");
130        offset
131    }
132
133    /// Drop every buffered frame whose offset is `< watermark` —
134    /// i.e. every replica has consumed past it. Used by the per-
135    /// shard tick (T1.22.5) to enforce a retention floor tighter
136    /// than the raw byte budget; lets the backlog reclaim space
137    /// for live frames once all consumers have advanced.
138    ///
139    /// No-op when `watermark <= oldest_offset()` (nothing to drop)
140    /// or when the buffer is empty. Updates the internal byte
141    /// accounting to stay consistent with the live buffer length.
142    pub fn drop_up_to(&mut self, watermark: u64) {
143        while let Some(front) = self.buf.front() {
144            if front.offset >= watermark {
145                break;
146            }
147            let dropped = self.buf.pop_front().expect("front-of-loop");
148            self.bytes_in_buf -= dropped.bytes.len();
149        }
150    }
151
152    /// Borrow the slice of frames with offset ≥ `from`. Suitable for
153    /// the streaming loop to write each frame's `bytes` to a replica
154    /// socket. Returns:
155    /// - `Ok(iter)` — zero or more frames in offset order (empty iter
156    ///   means the replica is caught up).
157    /// - `Err(FromOffset::TooOld)` — `from` is older than the oldest
158    ///   buffered frame; the streaming loop must snapshot-ship.
159    /// - `Err(FromOffset::Future)` — `from > next_offset()`; peer is
160    ///   ahead of us, drop the link.
161    pub fn frames_from(&self, from: u64) -> Result<FramesIter<'_>, FromOffset> {
162        if from > self.next_offset {
163            return Err(FromOffset::Future);
164        }
165        // from == next_offset → replica is exactly caught up; empty slice.
166        if let Some(oldest) = self.oldest_offset()
167            && from < oldest
168        {
169            return Err(FromOffset::TooOld);
170        }
171        // Locate the start index. Offsets are monotonic so binary search
172        // is correct; the deque slices into two parts so we iterate.
173        let start = self.buf.iter().position(|f| f.offset >= from);
174        Ok(FramesIter {
175            buf: &self.buf,
176            cursor: start.unwrap_or(self.buf.len()),
177        })
178    }
179}
180
181/// Iterator over backlog frames returned by [`ReplicationSource::frames_from`].
182pub struct FramesIter<'a> {
183    buf: &'a std::collections::VecDeque<Frame>,
184    cursor: usize,
185}
186
187impl<'a> Iterator for FramesIter<'a> {
188    type Item = &'a Frame;
189    fn next(&mut self) -> Option<&'a Frame> {
190        let item = self.buf.get(self.cursor)?;
191        self.cursor += 1;
192        Some(item)
193    }
194}
195
#[cfg(test)]
mod tests {
    use super::*;
    use crate::wire::decode_frame;

    /// Assemble an owned `Argv` from byte-string arguments, in order.
    fn argv(args: &[&[u8]]) -> Argv {
        let mut built = Argv::default();
        for piece in args {
            built.push(piece);
        }
        built
    }

    #[test]
    fn fresh_source_is_empty() {
        let src = ReplicationSource::new(1024);
        assert_eq!(src.len(), 0);
        assert!(src.is_empty());
        assert_eq!(src.buffered_bytes(), 0);
        assert_eq!(src.next_offset(), 0);
        assert_eq!(src.oldest_offset(), None);
        assert_eq!(src.newest_offset(), None);
    }

    #[test]
    fn push_assigns_monotonic_offsets() {
        let mut src = ReplicationSource::new(64 * 1024);
        let first = src.push_mutation(&argv(&[b"SET", b"a", b"1"]));
        let second = src.push_mutation(&argv(&[b"SET", b"b", b"2"]));
        let third = src.push_mutation(&argv(&[b"DEL", b"a"]));
        assert_eq!([first, second, third], [0, 1, 2]);
        assert_eq!(src.len(), 3);
        assert_eq!(src.next_offset(), 3);
        assert_eq!(src.oldest_offset(), Some(0));
        assert_eq!(src.newest_offset(), Some(2));
    }

    #[test]
    fn pushed_frames_decode_back_to_the_pushed_argv() {
        let mut src = ReplicationSource::new(1024);
        let pushed = argv(&[b"HSET", b"h", b"f", b"v"]);
        let assigned = src.push_mutation(&pushed);

        let stored = src.buf.front().expect("one frame");
        assert_eq!(stored.offset, assigned);

        // The stored bytes must round-trip through the wire decoder and
        // be consumed in full.
        let (wire_off, wire_argv, consumed) = decode_frame(&stored.bytes).expect("decode");
        assert_eq!(wire_off, assigned);
        assert_eq!(wire_argv, pushed);
        assert_eq!(consumed, stored.bytes.len());
    }

    #[test]
    fn eviction_drops_oldest_when_budget_exceeded() {
        // A 3-arg SET frame is roughly 37 bytes on the wire, so an
        // 80-byte budget has room for two; the third push must evict
        // the oldest.
        let mut src = ReplicationSource::new(80);
        let _ = src.push_mutation(&argv(&[b"SET", b"a", b"1"]));
        let _ = src.push_mutation(&argv(&[b"SET", b"b", b"2"]));
        assert_eq!(src.oldest_offset(), Some(0));

        let _ = src.push_mutation(&argv(&[b"SET", b"c", b"3"]));
        assert_eq!(src.oldest_offset(), Some(1));
        assert_eq!(src.newest_offset(), Some(2));
        assert!(src.buffered_bytes() <= 80);
        // Eviction never rewinds the offset counter.
        assert_eq!(src.next_offset(), 3);
    }

    #[test]
    fn oversized_single_frame_is_retained_against_budget() {
        // An 8-byte budget is smaller than any real frame. The newest
        // push must survive regardless, so a freshly applied write is
        // never lost before a replica can read it.
        let mut src = ReplicationSource::new(8);
        let first = src.push_mutation(&argv(&[b"SET", b"k", b"v"]));
        assert_eq!(src.len(), 1);
        assert_eq!(src.oldest_offset(), Some(first));
        assert!(src.buffered_bytes() > 8); // over budget by design

        // A second push replaces the first: still exactly one frame.
        let second = src.push_mutation(&argv(&[b"DEL", b"k"]));
        assert_eq!(src.len(), 1);
        assert_eq!(src.oldest_offset(), Some(second));
    }

    #[test]
    fn frames_from_at_exact_offset_returns_that_frame_first() {
        let mut src = ReplicationSource::new(1024);
        for i in 0..5 {
            let val = i.to_string();
            let _ = src.push_mutation(&argv(&[b"SET", b"k", val.as_bytes()]));
        }

        let mut frames = src.frames_from(2).unwrap();
        let head = frames.next().expect("frame");
        assert_eq!(head.offset, 2);
        let tail: Vec<u64> = frames.map(|frame| frame.offset).collect();
        assert_eq!(tail, vec![3, 4]);
    }

    #[test]
    fn frames_from_at_next_offset_is_empty_caught_up() {
        let mut src = ReplicationSource::new(1024);
        let _ = src.push_mutation(&argv(&[b"PING"]));
        let _ = src.push_mutation(&argv(&[b"PING"]));

        let caught_up_at = src.next_offset();
        assert_eq!(src.frames_from(caught_up_at).unwrap().count(), 0);
    }

    #[test]
    fn frames_from_too_old_after_eviction() {
        // Budget is tight enough that five pushes evict offset 0.
        let mut src = ReplicationSource::new(80);
        for _ in 0..5 {
            let _ = src.push_mutation(&argv(&[b"SET", b"k", b"v"]));
        }
        assert!(src.oldest_offset().unwrap() > 0);
        assert_eq!(src.frames_from(0).err(), Some(FromOffset::TooOld));
    }

    #[test]
    fn frames_from_future_offset_rejected() {
        let mut src = ReplicationSource::new(1024);
        let _ = src.push_mutation(&argv(&[b"PING"]));
        // One push → next_offset is 1, so a request for 2 is from a
        // peer that is ahead of us.
        assert_eq!(src.frames_from(2).err(), Some(FromOffset::Future));
    }

    #[test]
    fn frames_from_empty_source_at_zero_is_caught_up_not_too_old() {
        // Nothing has ever been pushed, so a replica asking from 0 has
        // missed nothing: that is "caught up", not "too old".
        let src = ReplicationSource::new(1024);
        assert_eq!(src.frames_from(0).unwrap().count(), 0);
        // One past the (zero) next offset is a future request.
        assert_eq!(src.frames_from(1).err(), Some(FromOffset::Future));
    }

    #[test]
    fn push_mutation_accepts_argv_borrowed_from_dispatcher_hot_path() {
        // `push_mutation` must take a borrowed argv view directly,
        // without forcing an owned `Argv` to be built first. Parse a
        // command with the public borrowed parser, push it, and check
        // that the stored frame decodes to the same command as a
        // hand-built `Argv`.
        let resp = b"*3\r\n$3\r\nSET\r\n$3\r\nfoo\r\n$3\r\nbar\r\n";
        let (view, used) = kevy_resp::parse_command_borrowed(resp)
            .expect("parse ok")
            .expect("complete frame");
        assert_eq!(used, resp.len());

        let mut src = ReplicationSource::new(1024);
        assert_eq!(src.push_mutation(&view), 0);

        let stored = src.buf.front().expect("one frame");
        let (wire_off, wire_argv, _) = decode_frame(&stored.bytes).expect("decode");
        assert_eq!(wire_off, 0);
        assert_eq!(wire_argv, argv(&[b"SET", b"foo", b"bar"]));
    }

    #[test]
    fn buffered_bytes_tracks_actual_frame_total() {
        let mut src = ReplicationSource::new(1024);
        let _ = src.push_mutation(&argv(&[b"SET", b"k", b"v"]));
        let _ = src.push_mutation(&argv(&[b"DEL", b"k"]));

        let mut summed = 0usize;
        for frame in &src.buf {
            summed += frame.bytes.len();
        }
        assert_eq!(src.buffered_bytes(), summed);
    }

    #[test]
    fn drop_up_to_evicts_below_watermark() {
        // drop_up_to(w) must evict every frame with offset < w.
        let mut src = ReplicationSource::new(64 * 1024);
        for i in 0..5 {
            let val = format!("v{i}");
            let _ = src.push_mutation(&argv(&[b"SET", b"k", val.as_bytes()]));
        }
        assert_eq!(src.len(), 5);
        let before = src.buffered_bytes();

        // Watermark 3 removes offsets 0..=2 and leaves 3 and 4.
        src.drop_up_to(3);
        assert_eq!(src.len(), 2);
        assert_eq!(src.oldest_offset(), Some(3));
        assert_eq!(src.newest_offset(), Some(4));
        // The byte counter has to go down with the dropped frames.
        assert!(src.buffered_bytes() < before);
        // Reading from the watermark itself is still served.
        let survivors: Vec<_> = src.frames_from(3).unwrap().collect();
        assert_eq!(survivors.len(), 2);
    }

    #[test]
    fn drop_up_to_below_oldest_is_noop() {
        let mut src = ReplicationSource::new(64 * 1024);
        let _ = src.push_mutation(&argv(&[b"SET", b"k", b"v"]));
        let _ = src.push_mutation(&argv(&[b"SET", b"k", b"v"]));
        assert_eq!(src.oldest_offset(), Some(0));

        // Watermark equal to the oldest offset: nothing is below it.
        src.drop_up_to(0);
        assert_eq!(src.len(), 2);
    }

    #[test]
    fn drop_up_to_at_or_past_newest_drops_everything() {
        let mut src = ReplicationSource::new(64 * 1024);
        for _ in 0..3 {
            let _ = src.push_mutation(&argv(&[b"SET", b"k", b"v"]));
        }

        src.drop_up_to(99);
        assert!(src.is_empty());
        assert_eq!(src.buffered_bytes(), 0);
    }
}