1use crate::wire::encode_frame;
23use kevy_resp::ArgvView;
24#[cfg(test)]
25use kevy_resp::Argv;
26
27#[derive(Debug, Clone, PartialEq, Eq)]
29pub struct Frame {
30 pub offset: u64,
32 pub bytes: Vec<u8>,
34}
35
36#[derive(Debug, PartialEq, Eq)]
39pub enum FromOffset {
40 TooOld,
43 Future,
47}
48
49pub struct ReplicationSource {
51 next_offset: u64,
52 bytes_in_buf: usize,
53 max_bytes: usize,
54 buf: std::collections::VecDeque<Frame>,
55}
56
57impl ReplicationSource {
58 pub fn new(max_bytes: usize) -> Self {
63 assert!(max_bytes > 0, "ReplicationSource max_bytes must be > 0");
64 Self {
65 next_offset: 0,
66 bytes_in_buf: 0,
67 max_bytes,
68 buf: std::collections::VecDeque::new(),
69 }
70 }
71
72 pub fn next_offset(&self) -> u64 {
75 self.next_offset
76 }
77
78 pub fn oldest_offset(&self) -> Option<u64> {
80 self.buf.front().map(|f| f.offset)
81 }
82
83 pub fn newest_offset(&self) -> Option<u64> {
85 self.buf.back().map(|f| f.offset)
86 }
87
88 pub fn buffered_bytes(&self) -> usize {
90 self.bytes_in_buf
91 }
92
93 pub fn len(&self) -> usize {
95 self.buf.len()
96 }
97
98 pub fn is_empty(&self) -> bool {
100 self.buf.is_empty()
101 }
102
103 pub fn push_mutation<A: ArgvView + ?Sized>(&mut self, argv: &A) -> u64 {
113 let offset = self.next_offset;
114 let bytes = encode_frame(offset, argv);
115 let frame_len = bytes.len();
116
117 while self.bytes_in_buf + frame_len > self.max_bytes && !self.buf.is_empty() {
120 let dropped = self.buf.pop_front().expect("non-empty checked");
121 self.bytes_in_buf -= dropped.bytes.len();
122 }
123
124 self.bytes_in_buf += frame_len;
125 self.buf.push_back(Frame { offset, bytes });
126 self.next_offset = self
127 .next_offset
128 .checked_add(1)
129 .expect("replication offset wrap — i64::MAX guard tripped");
130 offset
131 }
132
133 pub fn drop_up_to(&mut self, watermark: u64) {
143 while let Some(front) = self.buf.front() {
144 if front.offset >= watermark {
145 break;
146 }
147 let dropped = self.buf.pop_front().expect("front-of-loop");
148 self.bytes_in_buf -= dropped.bytes.len();
149 }
150 }
151
152 pub fn frames_from(&self, from: u64) -> Result<FramesIter<'_>, FromOffset> {
162 if from > self.next_offset {
163 return Err(FromOffset::Future);
164 }
165 if from == self.next_offset {
167 return Ok(FramesIter {
168 buf: &self.buf,
169 cursor: self.buf.len(),
170 });
171 }
172 match self.oldest_offset() {
179 Some(oldest) if from < oldest => return Err(FromOffset::TooOld),
180 None => return Err(FromOffset::TooOld),
181 _ => {}
182 }
183 let start = self.buf.iter().position(|f| f.offset >= from);
186 Ok(FramesIter {
187 buf: &self.buf,
188 cursor: start.unwrap_or(self.buf.len()),
189 })
190 }
191}
192
193pub struct FramesIter<'a> {
195 buf: &'a std::collections::VecDeque<Frame>,
196 cursor: usize,
197}
198
199impl<'a> Iterator for FramesIter<'a> {
200 type Item = &'a Frame;
201 fn next(&mut self) -> Option<&'a Frame> {
202 let item = self.buf.get(self.cursor)?;
203 self.cursor += 1;
204 Some(item)
205 }
206}
207
208#[cfg(test)]
209mod tests {
210 use super::*;
211 use crate::wire::decode_frame;
212
213 fn argv(args: &[&[u8]]) -> Argv {
214 let mut a = Argv::default();
215 for arg in args {
216 a.push(arg);
217 }
218 a
219 }
220
221 #[test]
222 fn fresh_source_is_empty() {
223 let s = ReplicationSource::new(1024);
224 assert!(s.is_empty());
225 assert_eq!(s.len(), 0);
226 assert_eq!(s.next_offset(), 0);
227 assert_eq!(s.oldest_offset(), None);
228 assert_eq!(s.newest_offset(), None);
229 assert_eq!(s.buffered_bytes(), 0);
230 }
231
232 #[test]
233 fn push_assigns_monotonic_offsets() {
234 let mut s = ReplicationSource::new(64 * 1024);
235 let o0 = s.push_mutation(&argv(&[b"SET", b"a", b"1"]));
236 let o1 = s.push_mutation(&argv(&[b"SET", b"b", b"2"]));
237 let o2 = s.push_mutation(&argv(&[b"DEL", b"a"]));
238 assert_eq!((o0, o1, o2), (0, 1, 2));
239 assert_eq!(s.oldest_offset(), Some(0));
240 assert_eq!(s.newest_offset(), Some(2));
241 assert_eq!(s.next_offset(), 3);
242 assert_eq!(s.len(), 3);
243 }
244
245 #[test]
246 fn pushed_frames_decode_back_to_the_pushed_argv() {
247 let mut s = ReplicationSource::new(1024);
248 let a = argv(&[b"HSET", b"h", b"f", b"v"]);
249 let off = s.push_mutation(&a);
250 let frame = s.buf.front().expect("one frame");
251 assert_eq!(frame.offset, off);
252 let (decoded_off, decoded_argv, used) = decode_frame(&frame.bytes).expect("decode");
253 assert_eq!(decoded_off, off);
254 assert_eq!(decoded_argv, a);
255 assert_eq!(used, frame.bytes.len());
256 }
257
258 #[test]
259 fn eviction_drops_oldest_when_budget_exceeded() {
260 let mut s = ReplicationSource::new(80);
263 let _ = s.push_mutation(&argv(&[b"SET", b"a", b"1"]));
264 let _ = s.push_mutation(&argv(&[b"SET", b"b", b"2"]));
265 assert_eq!(s.oldest_offset(), Some(0));
266 let _ = s.push_mutation(&argv(&[b"SET", b"c", b"3"]));
267 assert_eq!(s.oldest_offset(), Some(1));
268 assert_eq!(s.newest_offset(), Some(2));
269 assert!(s.buffered_bytes() <= 80);
270 assert_eq!(s.next_offset(), 3);
272 }
273
274 #[test]
275 fn oversized_single_frame_is_retained_against_budget() {
276 let mut s = ReplicationSource::new(8);
280 let off = s.push_mutation(&argv(&[b"SET", b"k", b"v"]));
281 assert_eq!(s.len(), 1);
282 assert_eq!(s.oldest_offset(), Some(off));
283 assert!(s.buffered_bytes() > 8); let off2 = s.push_mutation(&argv(&[b"DEL", b"k"]));
286 assert_eq!(s.len(), 1);
287 assert_eq!(s.oldest_offset(), Some(off2));
288 }
289
290 #[test]
291 fn frames_from_at_exact_offset_returns_that_frame_first() {
292 let mut s = ReplicationSource::new(1024);
293 for i in 0..5 {
294 let _ = s.push_mutation(&argv(&[b"SET", b"k", format!("{i}").as_bytes()]));
295 }
296 let mut it = s.frames_from(2).unwrap();
297 let f = it.next().expect("frame");
298 assert_eq!(f.offset, 2);
299 let remaining: Vec<u64> = it.map(|f| f.offset).collect();
300 assert_eq!(remaining, vec![3, 4]);
301 }
302
303 #[test]
304 fn frames_from_at_next_offset_is_empty_caught_up() {
305 let mut s = ReplicationSource::new(1024);
306 let _ = s.push_mutation(&argv(&[b"PING"]));
307 let _ = s.push_mutation(&argv(&[b"PING"]));
308 let it = s.frames_from(s.next_offset()).unwrap();
309 assert_eq!(it.count(), 0);
310 }
311
312 #[test]
313 fn frames_from_too_old_after_eviction() {
314 let mut s = ReplicationSource::new(80);
316 for _ in 0..5 {
317 let _ = s.push_mutation(&argv(&[b"SET", b"k", b"v"]));
318 }
319 assert!(s.oldest_offset().unwrap() > 0);
321 assert!(matches!(s.frames_from(0), Err(FromOffset::TooOld)));
322 }
323
324 #[test]
325 fn frames_from_future_offset_rejected() {
326 let mut s = ReplicationSource::new(1024);
327 let _ = s.push_mutation(&argv(&[b"PING"]));
328 assert!(matches!(s.frames_from(2), Err(FromOffset::Future)));
330 }
331
332 #[test]
333 fn frames_from_empty_source_at_zero_is_caught_up_not_too_old() {
334 let s = ReplicationSource::new(1024);
338 assert_eq!(s.frames_from(0).unwrap().count(), 0);
339 assert!(matches!(s.frames_from(1), Err(FromOffset::Future)));
341 }
342
343 #[test]
344 fn push_mutation_accepts_argv_borrowed_from_dispatcher_hot_path() {
345 let resp = b"*3\r\n$3\r\nSET\r\n$3\r\nfoo\r\n$3\r\nbar\r\n";
352 let (borrowed, consumed) = kevy_resp::parse_command_borrowed(resp)
353 .expect("parse ok")
354 .expect("complete frame");
355 assert_eq!(consumed, resp.len());
356
357 let mut s = ReplicationSource::new(1024);
358 let off = s.push_mutation(&borrowed);
359 assert_eq!(off, 0);
360
361 let frame = s.buf.front().expect("one frame");
362 let (decoded_off, decoded_argv, _) =
363 crate::wire::decode_frame(&frame.bytes).expect("decode");
364 assert_eq!(decoded_off, 0);
365 assert_eq!(decoded_argv, argv(&[b"SET", b"foo", b"bar"]));
366 }
367
368 #[test]
369 fn buffered_bytes_tracks_actual_frame_total() {
370 let mut s = ReplicationSource::new(1024);
371 let _ = s.push_mutation(&argv(&[b"SET", b"k", b"v"]));
372 let _ = s.push_mutation(&argv(&[b"DEL", b"k"]));
373 let actual: usize = s.buf.iter().map(|f| f.bytes.len()).sum();
374 assert_eq!(s.buffered_bytes(), actual);
375 }
376
377 #[test]
378 fn drop_up_to_evicts_below_watermark() {
379 let mut s = ReplicationSource::new(64 * 1024);
381 for i in 0..5 {
382 let v = format!("v{i}");
383 let _ = s.push_mutation(&argv(&[b"SET", b"k", v.as_bytes()]));
384 }
385 assert_eq!(s.len(), 5);
386 let bytes_before = s.buffered_bytes();
387 s.drop_up_to(3);
389 assert_eq!(s.len(), 2);
390 assert_eq!(s.oldest_offset(), Some(3));
391 assert_eq!(s.newest_offset(), Some(4));
392 assert!(s.buffered_bytes() < bytes_before);
394 let kept: Vec<_> = s.frames_from(3).unwrap().collect();
396 assert_eq!(kept.len(), 2);
397 }
398
399 #[test]
400 fn drop_up_to_below_oldest_is_noop() {
401 let mut s = ReplicationSource::new(64 * 1024);
402 let _ = s.push_mutation(&argv(&[b"SET", b"k", b"v"]));
403 let _ = s.push_mutation(&argv(&[b"SET", b"k", b"v"]));
404 assert_eq!(s.oldest_offset(), Some(0));
405 s.drop_up_to(0); assert_eq!(s.len(), 2);
407 }
408
409 #[test]
410 fn drop_up_to_at_or_past_newest_drops_everything() {
411 let mut s = ReplicationSource::new(64 * 1024);
412 for _ in 0..3 {
413 let _ = s.push_mutation(&argv(&[b"SET", b"k", b"v"]));
414 }
415 s.drop_up_to(99);
416 assert!(s.is_empty());
417 assert_eq!(s.buffered_bytes(), 0);
418 }
419}