1use crate::wire::encode_frame;
23use kevy_resp::ArgvView;
24#[cfg(test)]
25use kevy_resp::Argv;
26
/// One buffered replication record: an encoded mutation together with the
/// offset it was assigned when it was pushed.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct Frame {
    /// Replication offset assigned to this frame by the source.
    pub offset: u64,
    /// The frame in its encoded form, exactly as produced at push time.
    pub bytes: Vec<u8>,
}
35
/// Reasons a request to stream frames starting at a given offset is refused.
#[derive(PartialEq, Eq, Debug)]
pub enum FromOffset {
    /// The requested offset is older than anything still retained.
    TooOld,
    /// The requested offset lies beyond the next offset the source will assign.
    Future,
}
48
49pub struct ReplicationSource {
51 next_offset: u64,
52 bytes_in_buf: usize,
53 max_bytes: usize,
54 buf: std::collections::VecDeque<Frame>,
55}
56
57impl ReplicationSource {
58 pub fn new(max_bytes: usize) -> Self {
63 assert!(max_bytes > 0, "ReplicationSource max_bytes must be > 0");
64 Self {
65 next_offset: 0,
66 bytes_in_buf: 0,
67 max_bytes,
68 buf: std::collections::VecDeque::new(),
69 }
70 }
71
72 pub fn next_offset(&self) -> u64 {
75 self.next_offset
76 }
77
78 pub fn oldest_offset(&self) -> Option<u64> {
80 self.buf.front().map(|f| f.offset)
81 }
82
83 pub fn newest_offset(&self) -> Option<u64> {
85 self.buf.back().map(|f| f.offset)
86 }
87
88 pub fn buffered_bytes(&self) -> usize {
90 self.bytes_in_buf
91 }
92
93 pub fn len(&self) -> usize {
95 self.buf.len()
96 }
97
98 pub fn is_empty(&self) -> bool {
100 self.buf.is_empty()
101 }
102
103 pub fn push_mutation<A: ArgvView + ?Sized>(&mut self, argv: &A) -> u64 {
113 let offset = self.next_offset;
114 let bytes = encode_frame(offset, argv);
115 let frame_len = bytes.len();
116
117 while self.bytes_in_buf + frame_len > self.max_bytes && !self.buf.is_empty() {
120 let dropped = self.buf.pop_front().expect("non-empty checked");
121 self.bytes_in_buf -= dropped.bytes.len();
122 }
123
124 self.bytes_in_buf += frame_len;
125 self.buf.push_back(Frame { offset, bytes });
126 self.next_offset = self
127 .next_offset
128 .checked_add(1)
129 .expect("replication offset wrap — i64::MAX guard tripped");
130 offset
131 }
132
133 pub fn drop_up_to(&mut self, watermark: u64) {
143 while let Some(front) = self.buf.front() {
144 if front.offset >= watermark {
145 break;
146 }
147 let dropped = self.buf.pop_front().expect("front-of-loop");
148 self.bytes_in_buf -= dropped.bytes.len();
149 }
150 }
151
152 pub fn frames_from(&self, from: u64) -> Result<FramesIter<'_>, FromOffset> {
162 if from > self.next_offset {
163 return Err(FromOffset::Future);
164 }
165 if let Some(oldest) = self.oldest_offset()
167 && from < oldest
168 {
169 return Err(FromOffset::TooOld);
170 }
171 let start = self.buf.iter().position(|f| f.offset >= from);
174 Ok(FramesIter {
175 buf: &self.buf,
176 cursor: start.unwrap_or(self.buf.len()),
177 })
178 }
179}
180
181pub struct FramesIter<'a> {
183 buf: &'a std::collections::VecDeque<Frame>,
184 cursor: usize,
185}
186
187impl<'a> Iterator for FramesIter<'a> {
188 type Item = &'a Frame;
189 fn next(&mut self) -> Option<&'a Frame> {
190 let item = self.buf.get(self.cursor)?;
191 self.cursor += 1;
192 Some(item)
193 }
194}
195
#[cfg(test)]
mod tests {
    use super::*;
    use crate::wire::decode_frame;

    /// Builds an owned `Argv` by pushing each byte-string argument in order.
    fn argv(args: &[&[u8]]) -> Argv {
        let mut built = Argv::default();
        for part in args {
            built.push(part);
        }
        built
    }

    /// A newly constructed source holds nothing and numbers from offset 0.
    #[test]
    fn fresh_source_is_empty() {
        let source = ReplicationSource::new(1024);
        assert_eq!(source.len(), 0);
        assert!(source.is_empty());
        assert_eq!(source.buffered_bytes(), 0);
        assert_eq!(source.next_offset(), 0);
        assert_eq!(source.oldest_offset(), None);
        assert_eq!(source.newest_offset(), None);
    }

    /// Each push receives the next consecutive offset.
    #[test]
    fn push_assigns_monotonic_offsets() {
        let mut source = ReplicationSource::new(64 * 1024);
        let first = source.push_mutation(&argv(&[b"SET", b"a", b"1"]));
        let second = source.push_mutation(&argv(&[b"SET", b"b", b"2"]));
        let third = source.push_mutation(&argv(&[b"DEL", b"a"]));
        assert_eq!((first, second, third), (0, 1, 2));
        assert_eq!(source.len(), 3);
        assert_eq!(source.next_offset(), 3);
        assert_eq!(source.oldest_offset(), Some(0));
        assert_eq!(source.newest_offset(), Some(2));
    }

    /// The stored bytes decode back to the offset and argv that were pushed.
    #[test]
    fn pushed_frames_decode_back_to_the_pushed_argv() {
        let mut source = ReplicationSource::new(1024);
        let a = argv(&[b"HSET", b"h", b"f", b"v"]);
        let off = source.push_mutation(&a);

        let frame = source.buf.front().expect("one frame");
        assert_eq!(frame.offset, off);

        let (decoded_off, decoded_argv, used) = decode_frame(&frame.bytes).expect("decode");
        assert_eq!(decoded_off, off);
        assert_eq!(decoded_argv, a);
        assert_eq!(used, frame.bytes.len());
    }

    /// With an 80-byte budget the third push evicts the first frame.
    #[test]
    fn eviction_drops_oldest_when_budget_exceeded() {
        let budget = 80;
        let mut source = ReplicationSource::new(budget);
        let _ = source.push_mutation(&argv(&[b"SET", b"a", b"1"]));
        let _ = source.push_mutation(&argv(&[b"SET", b"b", b"2"]));
        assert_eq!(source.oldest_offset(), Some(0));

        let _ = source.push_mutation(&argv(&[b"SET", b"c", b"3"]));
        assert_eq!(source.oldest_offset(), Some(1));
        assert_eq!(source.newest_offset(), Some(2));
        assert!(source.buffered_bytes() <= budget);
        assert_eq!(source.next_offset(), 3);
    }

    /// A frame bigger than the whole budget is kept alone, then replaced by
    /// the next push.
    #[test]
    fn oversized_single_frame_is_retained_against_budget() {
        let mut source = ReplicationSource::new(8);
        let off = source.push_mutation(&argv(&[b"SET", b"k", b"v"]));
        assert_eq!(source.len(), 1);
        assert_eq!(source.oldest_offset(), Some(off));
        assert!(source.buffered_bytes() > 8);

        let off2 = source.push_mutation(&argv(&[b"DEL", b"k"]));
        assert_eq!(source.len(), 1);
        assert_eq!(source.oldest_offset(), Some(off2));
    }

    /// Iteration starts at the requested offset and runs to the newest frame.
    #[test]
    fn frames_from_at_exact_offset_returns_that_frame_first() {
        let mut source = ReplicationSource::new(1024);
        for i in 0..5 {
            let value = format!("{i}");
            let _ = source.push_mutation(&argv(&[b"SET", b"k", value.as_bytes()]));
        }

        let mut frames = source.frames_from(2).unwrap();
        let head = frames.next().expect("frame");
        assert_eq!(head.offset, 2);

        let tail: Vec<u64> = frames.map(|frame| frame.offset).collect();
        assert_eq!(tail, vec![3, 4]);
    }

    /// Asking for exactly `next_offset` is valid and yields nothing.
    #[test]
    fn frames_from_at_next_offset_is_empty_caught_up() {
        let mut source = ReplicationSource::new(1024);
        let _ = source.push_mutation(&argv(&[b"PING"]));
        let _ = source.push_mutation(&argv(&[b"PING"]));

        let frames = source.frames_from(source.next_offset()).unwrap();
        assert_eq!(frames.count(), 0);
    }

    /// Offsets that were evicted by the byte budget are reported as too old.
    #[test]
    fn frames_from_too_old_after_eviction() {
        let mut source = ReplicationSource::new(80);
        for _ in 0..5 {
            let _ = source.push_mutation(&argv(&[b"SET", b"k", b"v"]));
        }
        assert!(source.oldest_offset().unwrap() > 0);
        assert_eq!(source.frames_from(0).err(), Some(FromOffset::TooOld));
    }

    /// Offsets past `next_offset` are rejected as being in the future.
    #[test]
    fn frames_from_future_offset_rejected() {
        let mut source = ReplicationSource::new(1024);
        let _ = source.push_mutation(&argv(&[b"PING"]));
        assert_eq!(source.frames_from(2).err(), Some(FromOffset::Future));
    }

    /// On a never-used source, offset 0 is "caught up" rather than too old.
    #[test]
    fn frames_from_empty_source_at_zero_is_caught_up_not_too_old() {
        let source = ReplicationSource::new(1024);
        let caught_up = source.frames_from(0).unwrap();
        assert_eq!(caught_up.count(), 0);
        assert_eq!(source.frames_from(1).err(), Some(FromOffset::Future));
    }

    /// `push_mutation` accepts the borrowed argv view produced by the parser,
    /// not just an owned `Argv`.
    #[test]
    fn push_mutation_accepts_argv_borrowed_from_dispatcher_hot_path() {
        let resp = b"*3\r\n$3\r\nSET\r\n$3\r\nfoo\r\n$3\r\nbar\r\n";
        let (borrowed, consumed) = kevy_resp::parse_command_borrowed(resp)
            .expect("parse ok")
            .expect("complete frame");
        assert_eq!(consumed, resp.len());

        let mut source = ReplicationSource::new(1024);
        let off = source.push_mutation(&borrowed);
        assert_eq!(off, 0);

        let frame = source.buf.front().expect("one frame");
        let (decoded_off, decoded_argv, _) = decode_frame(&frame.bytes).expect("decode");
        assert_eq!(decoded_off, 0);
        assert_eq!(decoded_argv, argv(&[b"SET", b"foo", b"bar"]));
    }

    /// The running byte counter equals the sum of the buffered frame sizes.
    #[test]
    fn buffered_bytes_tracks_actual_frame_total() {
        let mut source = ReplicationSource::new(1024);
        let _ = source.push_mutation(&argv(&[b"SET", b"k", b"v"]));
        let _ = source.push_mutation(&argv(&[b"DEL", b"k"]));

        let actual: usize = source.buf.iter().map(|frame| frame.bytes.len()).sum();
        assert_eq!(source.buffered_bytes(), actual);
    }

    /// Frames strictly below the watermark are dropped; the rest survive.
    #[test]
    fn drop_up_to_evicts_below_watermark() {
        let mut source = ReplicationSource::new(64 * 1024);
        for i in 0..5 {
            let v = format!("v{i}");
            let _ = source.push_mutation(&argv(&[b"SET", b"k", v.as_bytes()]));
        }
        assert_eq!(source.len(), 5);
        let bytes_before = source.buffered_bytes();

        source.drop_up_to(3);

        assert_eq!(source.len(), 2);
        assert_eq!(source.oldest_offset(), Some(3));
        assert_eq!(source.newest_offset(), Some(4));
        assert!(source.buffered_bytes() < bytes_before);

        let kept = source.frames_from(3).unwrap().count();
        assert_eq!(kept, 2);
    }

    /// A watermark equal to the oldest offset removes nothing.
    #[test]
    fn drop_up_to_below_oldest_is_noop() {
        let mut source = ReplicationSource::new(64 * 1024);
        let _ = source.push_mutation(&argv(&[b"SET", b"k", b"v"]));
        let _ = source.push_mutation(&argv(&[b"SET", b"k", b"v"]));
        assert_eq!(source.oldest_offset(), Some(0));

        source.drop_up_to(0);
        assert_eq!(source.len(), 2);
    }

    /// A watermark beyond the newest frame empties the buffer and zeroes the
    /// byte counter.
    #[test]
    fn drop_up_to_at_or_past_newest_drops_everything() {
        let mut source = ReplicationSource::new(64 * 1024);
        for _ in 0..3 {
            let _ = source.push_mutation(&argv(&[b"SET", b"k", b"v"]));
        }

        source.drop_up_to(99);
        assert!(source.is_empty());
        assert_eq!(source.buffered_bytes(), 0);
    }
}