use crate::wire::encode_frame;
use kevy_resp::ArgvView;
#[cfg(test)]
use kevy_resp::Argv;
/// One encoded replication frame together with the stream offset it was assigned.
///
/// Frames are created by `ReplicationSource::push_mutation` and handed out by
/// reference through `ReplicationSource::frames_from`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Frame {
/// Offset assigned to this frame when it was pushed (the source's `next_offset` at that time).
pub offset: u64,
/// Bytes returned by `encode_frame(offset, argv)` for the pushed command.
pub bytes: Vec<u8>,
}
/// Reason `ReplicationSource::frames_from` cannot serve a requested offset.
///
/// The enum is fieldless, so it is `Copy`: callers can keep a value around
/// (e.g. for logging after matching on it) without moving it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FromOffset {
    /// The requested offset lies before the oldest buffered frame, or the
    /// backlog is empty while the requester is behind the next offset.
    TooOld,
    /// The requested offset is greater than the next offset the source will assign.
    Future,
}
/// Byte-budgeted backlog of encoded replication frames, ordered by offset.
///
/// New frames are appended at the back; frames are only ever removed from the
/// front (by eviction in `push_mutation` or explicitly via `drop_up_to`).
pub struct ReplicationSource {
// Offset that the next pushed mutation will receive.
next_offset: u64,
// Sum of `bytes.len()` over every frame currently held in `buf`.
bytes_in_buf: usize,
// Byte budget for the backlog; `new` asserts it is non-zero.
max_bytes: usize,
// Buffered frames, oldest at the front and newest at the back.
buf: std::collections::VecDeque<Frame>,
}
impl ReplicationSource {
    /// Creates an empty source whose backlog is budgeted at `max_bytes` bytes
    /// of encoded frames. Offsets start at 0.
    ///
    /// # Panics
    ///
    /// Panics if `max_bytes` is zero.
    pub fn new(max_bytes: usize) -> Self {
        assert!(max_bytes > 0, "ReplicationSource max_bytes must be > 0");
        Self {
            next_offset: 0,
            bytes_in_buf: 0,
            max_bytes,
            buf: std::collections::VecDeque::new(),
        }
    }

    /// Offset that the next call to [`push_mutation`](Self::push_mutation) will assign.
    pub fn next_offset(&self) -> u64 {
        self.next_offset
    }

    /// Offset of the oldest buffered frame, or `None` when the backlog is empty.
    pub fn oldest_offset(&self) -> Option<u64> {
        self.buf.front().map(|f| f.offset)
    }

    /// Offset of the newest buffered frame, or `None` when the backlog is empty.
    pub fn newest_offset(&self) -> Option<u64> {
        self.buf.back().map(|f| f.offset)
    }

    /// Total encoded size, in bytes, of all buffered frames.
    pub fn buffered_bytes(&self) -> usize {
        self.bytes_in_buf
    }

    /// Number of buffered frames.
    pub fn len(&self) -> usize {
        self.buf.len()
    }

    /// Returns `true` when no frames are buffered.
    pub fn is_empty(&self) -> bool {
        self.buf.is_empty()
    }

    /// Encodes `argv` as a frame at the next offset, appends it to the backlog
    /// and returns the offset it was assigned.
    ///
    /// Before appending, the oldest frames are evicted for as long as adding
    /// the new frame would exceed the byte budget. The new frame itself is
    /// always kept, so a single frame larger than the whole budget ends up as
    /// the only entry in the backlog.
    ///
    /// # Panics
    ///
    /// Panics if the offset counter cannot be incremented without overflowing.
    pub fn push_mutation<A: ArgvView + ?Sized>(&mut self, argv: &A) -> u64 {
        let offset = self.next_offset;
        let bytes = encode_frame(offset, argv);
        let frame_len = bytes.len();

        // Make room from the front; stop once the frame fits or nothing is left to evict.
        while self.bytes_in_buf + frame_len > self.max_bytes && !self.buf.is_empty() {
            let dropped = self.buf.pop_front().expect("non-empty checked");
            self.bytes_in_buf -= dropped.bytes.len();
        }

        self.bytes_in_buf += frame_len;
        self.buf.push_back(Frame { offset, bytes });

        // NOTE(review): the message mentions i64::MAX, but `checked_add` on a
        // u64 only fails at u64::MAX — confirm which guard is intended.
        self.next_offset = self
            .next_offset
            .checked_add(1)
            .expect("replication offset wrap — i64::MAX guard tripped");
        offset
    }

    /// Discards every buffered frame whose offset is strictly below `watermark`.
    ///
    /// Frames at or above the watermark are kept; `next_offset` is not affected.
    pub fn drop_up_to(&mut self, watermark: u64) {
        while let Some(front) = self.buf.front() {
            if front.offset >= watermark {
                break;
            }
            let dropped = self.buf.pop_front().expect("front-of-loop");
            self.bytes_in_buf -= dropped.bytes.len();
        }
    }

    /// Returns an iterator over the buffered frames with offset `>= from`,
    /// oldest first.
    ///
    /// Asking for exactly `next_offset` means the caller is caught up and
    /// yields an empty iterator, even when the backlog itself is empty.
    ///
    /// # Errors
    ///
    /// * [`FromOffset::Future`] — `from` is greater than `next_offset`.
    /// * [`FromOffset::TooOld`] — `from` is behind `next_offset` but either
    ///   precedes the oldest buffered frame or the backlog is empty.
    pub fn frames_from(&self, from: u64) -> Result<FramesIter<'_>, FromOffset> {
        if from > self.next_offset {
            return Err(FromOffset::Future);
        }

        let cursor = if from == self.next_offset {
            // Caught up: nothing to replay.
            self.buf.len()
        } else {
            match self.oldest_offset() {
                // Frames are appended with strictly increasing offsets and only
                // removed from the front, so `buf` is sorted by offset and a
                // binary search finds the first frame at or after `from`.
                Some(oldest) if from >= oldest => self.buf.partition_point(|f| f.offset < from),
                _ => return Err(FromOffset::TooOld),
            }
        };

        Ok(FramesIter {
            buf: &self.buf,
            cursor,
        })
    }
}
/// Borrowing iterator over a suffix of a `ReplicationSource` backlog.
///
/// Returned by `ReplicationSource::frames_from`; yields `&Frame` items in buffer order.
pub struct FramesIter<'a> {
// The backlog being walked; borrowed immutably for the iterator's lifetime.
buf: &'a std::collections::VecDeque<Frame>,
// Index into `buf` of the next frame to yield; `buf.len()` means exhausted.
cursor: usize,
}
/// Walks the borrowed backlog front-to-back, starting at the stored cursor.
impl<'a> Iterator for FramesIter<'a> {
    type Item = &'a Frame;

    /// Yields the frame at the cursor and advances it; returns `None` once
    /// the cursor has reached the end of the backlog.
    fn next(&mut self) -> Option<&'a Frame> {
        let item = self.buf.get(self.cursor)?;
        self.cursor += 1;
        Some(item)
    }

    /// Reports the exact number of frames left, so `collect()` can reserve
    /// once instead of growing repeatedly.
    fn size_hint(&self) -> (usize, Option<usize>) {
        // The cursor only advances after a successful `get`, so it never
        // exceeds `buf.len()`; `saturating_sub` just keeps this panic-free.
        let remaining = self.buf.len().saturating_sub(self.cursor);
        (remaining, Some(remaining))
    }
}

/// `size_hint` is exact, so callers may use `len()` on the iterator.
impl<'a> ExactSizeIterator for FramesIter<'a> {}
// Unit tests for `ReplicationSource`: offset assignment, byte-budget eviction,
// explicit trimming via `drop_up_to`, and the `frames_from` resume contract.
#[cfg(test)]
mod tests {
use super::*;
use crate::wire::decode_frame;
// Builds an owned `Argv` by pushing each byte slice in order.
fn argv(args: &[&[u8]]) -> Argv {
let mut a = Argv::default();
for arg in args {
a.push(arg);
}
a
}
// A new source reports an empty backlog through every accessor.
#[test]
fn fresh_source_is_empty() {
let s = ReplicationSource::new(1024);
assert!(s.is_empty());
assert_eq!(s.len(), 0);
assert_eq!(s.next_offset(), 0);
assert_eq!(s.oldest_offset(), None);
assert_eq!(s.newest_offset(), None);
assert_eq!(s.buffered_bytes(), 0);
}
// Offsets start at 0 and increase by one per push.
#[test]
fn push_assigns_monotonic_offsets() {
let mut s = ReplicationSource::new(64 * 1024);
let o0 = s.push_mutation(&argv(&[b"SET", b"a", b"1"]));
let o1 = s.push_mutation(&argv(&[b"SET", b"b", b"2"]));
let o2 = s.push_mutation(&argv(&[b"DEL", b"a"]));
assert_eq!((o0, o1, o2), (0, 1, 2));
assert_eq!(s.oldest_offset(), Some(0));
assert_eq!(s.newest_offset(), Some(2));
assert_eq!(s.next_offset(), 3);
assert_eq!(s.len(), 3);
}
// Round trip: the stored bytes decode to the same offset and argv, consuming the whole frame.
#[test]
fn pushed_frames_decode_back_to_the_pushed_argv() {
let mut s = ReplicationSource::new(1024);
let a = argv(&[b"HSET", b"h", b"f", b"v"]);
let off = s.push_mutation(&a);
let frame = s.buf.front().expect("one frame");
assert_eq!(frame.offset, off);
let (decoded_off, decoded_argv, used) = decode_frame(&frame.bytes).expect("decode");
assert_eq!(decoded_off, off);
assert_eq!(decoded_argv, a);
assert_eq!(used, frame.bytes.len());
}
// With an 80-byte budget the test expects two frames to fit and the third push
// to evict offset 0. (Relies on the encoded size produced by `encode_frame`.)
#[test]
fn eviction_drops_oldest_when_budget_exceeded() {
let mut s = ReplicationSource::new(80);
let _ = s.push_mutation(&argv(&[b"SET", b"a", b"1"]));
let _ = s.push_mutation(&argv(&[b"SET", b"b", b"2"]));
assert_eq!(s.oldest_offset(), Some(0));
let _ = s.push_mutation(&argv(&[b"SET", b"c", b"3"]));
assert_eq!(s.oldest_offset(), Some(1));
assert_eq!(s.newest_offset(), Some(2));
assert!(s.buffered_bytes() <= 80);
assert_eq!(s.next_offset(), 3);
}
// A frame bigger than the whole budget is still kept (as the only entry),
// and the next push replaces it.
#[test]
fn oversized_single_frame_is_retained_against_budget() {
let mut s = ReplicationSource::new(8);
let off = s.push_mutation(&argv(&[b"SET", b"k", b"v"]));
assert_eq!(s.len(), 1);
assert_eq!(s.oldest_offset(), Some(off));
assert!(s.buffered_bytes() > 8); let off2 = s.push_mutation(&argv(&[b"DEL", b"k"]));
assert_eq!(s.len(), 1);
assert_eq!(s.oldest_offset(), Some(off2));
}
// Resuming at a buffered offset yields that frame first, then the rest in order.
#[test]
fn frames_from_at_exact_offset_returns_that_frame_first() {
let mut s = ReplicationSource::new(1024);
for i in 0..5 {
let _ = s.push_mutation(&argv(&[b"SET", b"k", format!("{i}").as_bytes()]));
}
let mut it = s.frames_from(2).unwrap();
let f = it.next().expect("frame");
assert_eq!(f.offset, 2);
let remaining: Vec<u64> = it.map(|f| f.offset).collect();
assert_eq!(remaining, vec![3, 4]);
}
// Resuming at `next_offset` means caught up: Ok with nothing to replay.
#[test]
fn frames_from_at_next_offset_is_empty_caught_up() {
let mut s = ReplicationSource::new(1024);
let _ = s.push_mutation(&argv(&[b"PING"]));
let _ = s.push_mutation(&argv(&[b"PING"]));
let it = s.frames_from(s.next_offset()).unwrap();
assert_eq!(it.count(), 0);
}
// Once eviction has moved the oldest offset past 0, resuming from 0 is rejected as TooOld.
#[test]
fn frames_from_too_old_after_eviction() {
let mut s = ReplicationSource::new(80);
for _ in 0..5 {
let _ = s.push_mutation(&argv(&[b"SET", b"k", b"v"]));
}
assert!(s.oldest_offset().unwrap() > 0);
assert!(matches!(s.frames_from(0), Err(FromOffset::TooOld)));
}
// An offset beyond `next_offset` is rejected as Future.
#[test]
fn frames_from_future_offset_rejected() {
let mut s = ReplicationSource::new(1024);
let _ = s.push_mutation(&argv(&[b"PING"]));
assert!(matches!(s.frames_from(2), Err(FromOffset::Future)));
}
// On a brand-new source, offset 0 equals `next_offset`, so it is "caught up" rather than TooOld.
#[test]
fn frames_from_empty_source_at_zero_is_caught_up_not_too_old() {
let s = ReplicationSource::new(1024);
assert_eq!(s.frames_from(0).unwrap().count(), 0);
assert!(matches!(s.frames_from(1), Err(FromOffset::Future)));
}
// `push_mutation` is generic over `ArgvView`, so the value returned by
// `parse_command_borrowed` can be pushed directly.
#[test]
fn push_mutation_accepts_argv_borrowed_from_dispatcher_hot_path() {
let resp = b"*3\r\n$3\r\nSET\r\n$3\r\nfoo\r\n$3\r\nbar\r\n";
let (borrowed, consumed) = kevy_resp::parse_command_borrowed(resp)
.expect("parse ok")
.expect("complete frame");
assert_eq!(consumed, resp.len());
let mut s = ReplicationSource::new(1024);
let off = s.push_mutation(&borrowed);
assert_eq!(off, 0);
let frame = s.buf.front().expect("one frame");
let (decoded_off, decoded_argv, _) =
crate::wire::decode_frame(&frame.bytes).expect("decode");
assert_eq!(decoded_off, 0);
assert_eq!(decoded_argv, argv(&[b"SET", b"foo", b"bar"]));
}
// The running byte counter must equal the sum of the stored frame lengths.
#[test]
fn buffered_bytes_tracks_actual_frame_total() {
let mut s = ReplicationSource::new(1024);
let _ = s.push_mutation(&argv(&[b"SET", b"k", b"v"]));
let _ = s.push_mutation(&argv(&[b"DEL", b"k"]));
let actual: usize = s.buf.iter().map(|f| f.bytes.len()).sum();
assert_eq!(s.buffered_bytes(), actual);
}
// `drop_up_to(3)` removes offsets 0..=2 and keeps 3 and 4; the byte counter shrinks.
#[test]
fn drop_up_to_evicts_below_watermark() {
let mut s = ReplicationSource::new(64 * 1024);
for i in 0..5 {
let v = format!("v{i}");
let _ = s.push_mutation(&argv(&[b"SET", b"k", v.as_bytes()]));
}
assert_eq!(s.len(), 5);
let bytes_before = s.buffered_bytes();
s.drop_up_to(3);
assert_eq!(s.len(), 2);
assert_eq!(s.oldest_offset(), Some(3));
assert_eq!(s.newest_offset(), Some(4));
assert!(s.buffered_bytes() < bytes_before);
let kept: Vec<_> = s.frames_from(3).unwrap().collect();
assert_eq!(kept.len(), 2);
}
// A watermark equal to the oldest offset drops nothing (the comparison is strict).
#[test]
fn drop_up_to_below_oldest_is_noop() {
let mut s = ReplicationSource::new(64 * 1024);
let _ = s.push_mutation(&argv(&[b"SET", b"k", b"v"]));
let _ = s.push_mutation(&argv(&[b"SET", b"k", b"v"]));
assert_eq!(s.oldest_offset(), Some(0));
s.drop_up_to(0); assert_eq!(s.len(), 2);
}
// A watermark above every buffered offset empties the backlog and zeroes the byte counter.
#[test]
fn drop_up_to_at_or_past_newest_drops_everything() {
let mut s = ReplicationSource::new(64 * 1024);
for _ in 0..3 {
let _ = s.push_mutation(&argv(&[b"SET", b"k", b"v"]));
}
s.drop_up_to(99);
assert!(s.is_empty());
assert_eq!(s.buffered_bytes(), 0);
}
}