use bytes::Bytes;
use super::file::RedexFile;
use super::replication::{ChannelId, SyncEvent, SyncNackError, SyncRequest, SyncResponse};
pub const CHUNK_MAX_HARD_CEILING_BYTES: u32 = 64 * 1024 * 1024;
pub const CHUNK_MAX_BACKGROUND_SOFT_CAP_BYTES: u32 = 4 * 1024 * 1024;
#[derive(Debug)]
pub enum SyncRequestOutcome {
Response(SyncResponse),
Nack {
error_code: SyncNackError,
leader_first_retained_seq: u64,
detail: String,
},
}
#[derive(Debug, thiserror::Error, PartialEq, Eq)]
pub enum ApplyError {
#[error("channel mismatch: response carried {got:?}, expected {expected:?}")]
ChannelMismatch {
got: ChannelId,
expected: ChannelId,
},
#[error("chunk is not seq-monotonic at event index {index}")]
NonMonotonic {
index: usize,
},
#[error("first_seq mismatch: response declared {declared}, events[0]={observed}")]
FirstSeqMismatch {
declared: u64,
observed: u64,
},
#[error("chunk first_seq {first_seq} below local next_seq {local_next}")]
StaleChunk {
first_seq: u64,
local_next: u64,
},
#[error("chunk first_seq {first_seq} leaves a gap above local next_seq {local_next}")]
GapBeforeChunk {
first_seq: u64,
local_next: u64,
divergence_suspected: bool,
},
#[error("append failed: {0}")]
AppendFailed(String),
}
pub fn handle_sync_request(
file: &RedexFile,
request: &SyncRequest,
expected_channel: ChannelId,
) -> SyncRequestOutcome {
let leader_first_retained_seq = file.lowest_retained_seq().unwrap_or(0);
if request.channel_id != expected_channel {
return SyncRequestOutcome::Nack {
error_code: SyncNackError::ChannelClosed,
leader_first_retained_seq,
detail: format!(
"channel mismatch: request {:?} vs expected {:?}",
request.channel_id, expected_channel,
),
};
}
let local_next = file.next_seq();
if request.since_seq >= local_next {
return SyncRequestOutcome::Response(SyncResponse {
channel_id: expected_channel,
first_seq: request.since_seq,
leader_first_retained_seq,
request_id: request.request_id,
events: Vec::new(),
});
}
let mut effective_budget = if request.chunk_max == 0 {
CHUNK_MAX_HARD_CEILING_BYTES
} else {
request.chunk_max.min(CHUNK_MAX_HARD_CEILING_BYTES)
};
if matches!(request.class, super::bandwidth::BandwidthClass::Background) {
effective_budget = effective_budget.min(CHUNK_MAX_BACKGROUND_SOFT_CAP_BYTES);
}
let events = file.read_range(request.since_seq, local_next);
if events.is_empty() {
return SyncRequestOutcome::Nack {
error_code: SyncNackError::BadRange,
leader_first_retained_seq,
detail: format!("since_seq {} below first retained event", request.since_seq,),
};
}
let first_seq = events
.first()
.map(|e| e.entry.seq)
.unwrap_or(request.since_seq);
let mut acc: u64 = 0;
let mut out: Vec<SyncEvent> = Vec::new();
for ev in events {
let cost = 8u64 + 4 + ev.payload.len() as u64;
if out.is_empty() && cost > CHUNK_MAX_HARD_CEILING_BYTES as u64 {
return SyncRequestOutcome::Nack {
error_code: SyncNackError::BadRange,
leader_first_retained_seq,
detail: format!(
"event at seq {} exceeds hard ceiling ({} bytes > {})",
ev.entry.seq, cost, CHUNK_MAX_HARD_CEILING_BYTES,
),
};
}
if !out.is_empty() && acc.saturating_add(cost) > effective_budget as u64 {
break;
}
acc = acc.saturating_add(cost);
out.push(SyncEvent {
event_seq: ev.entry.seq,
payload: ev.payload.to_vec(),
});
}
SyncRequestOutcome::Response(SyncResponse {
channel_id: expected_channel,
first_seq,
leader_first_retained_seq,
request_id: request.request_id,
events: out,
})
}
pub fn apply_sync_response(
file: &RedexFile,
response: &SyncResponse,
expected_channel: ChannelId,
) -> Result<u64, ApplyError> {
if response.channel_id != expected_channel {
return Err(ApplyError::ChannelMismatch {
got: response.channel_id,
expected: expected_channel,
});
}
if response.events.is_empty() {
let local_next = file.next_seq();
if response.first_seq < local_next {
return Err(ApplyError::StaleChunk {
first_seq: response.first_seq,
local_next,
});
}
return Ok(local_next);
}
let first = &response.events[0];
if first.event_seq != response.first_seq {
return Err(ApplyError::FirstSeqMismatch {
declared: response.first_seq,
observed: first.event_seq,
});
}
let mut prev = first.event_seq;
for (i, ev) in response.events.iter().enumerate().skip(1) {
let expected = match prev.checked_add(1) {
Some(n) => n,
None => return Err(ApplyError::NonMonotonic { index: i }),
};
if ev.event_seq != expected {
return Err(ApplyError::NonMonotonic { index: i });
}
prev = ev.event_seq;
}
let local_next = file.next_seq();
if response.first_seq < local_next {
return Err(ApplyError::StaleChunk {
first_seq: response.first_seq,
local_next,
});
}
if response.first_seq > local_next {
let divergence_suspected = local_next > response.leader_first_retained_seq
&& local_next > 0
&& response.leader_first_retained_seq < response.first_seq;
return Err(ApplyError::GapBeforeChunk {
first_seq: response.first_seq,
local_next,
divergence_suspected,
});
}
let payloads: Vec<Bytes> = response
.events
.iter()
.map(|e| Bytes::from(e.payload.clone()))
.collect();
let appended = payloads.len() as u64;
file.append_batch(&payloads)
.map_err(|e| ApplyError::AppendFailed(format!("{e:?}")))?;
#[cfg(feature = "redex-disk")]
{
file.sync()
.map_err(|e| ApplyError::AppendFailed(format!("durable-sync: {e:?}")))?;
}
let new_tail = local_next
.checked_add(appended)
.unwrap_or_else(|| file.next_seq());
Ok(new_tail)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::adapter::net::channel::ChannelName;
use crate::adapter::net::redex::config::RedexFileConfig;
use crate::adapter::net::redex::manager::Redex;
fn channel_id_for(name: &str) -> ChannelId {
let cn = ChannelName::new(name).unwrap();
ChannelId::from_name(&cn)
}
fn build_file(name: &str) -> RedexFile {
let r = Redex::new();
let cn = ChannelName::new(name).unwrap();
r.open_file(&cn, RedexFileConfig::default()).unwrap()
}
fn append_n(file: &RedexFile, n: usize, prefix: &str) {
for i in 0..n {
let payload = format!("{prefix}-{i}");
file.append(payload.as_bytes()).unwrap();
}
}
#[test]
fn empty_file_returns_empty_chunk() {
let f = build_file("redex/empty");
let cid = channel_id_for("redex/empty");
let req = SyncRequest {
channel_id: cid,
since_seq: 0,
chunk_max: 4096,
request_id: 0,
class: Default::default(),
};
let SyncRequestOutcome::Response(resp) = handle_sync_request(&f, &req, cid) else {
panic!("expected Response");
};
assert_eq!(resp.channel_id, cid);
assert_eq!(resp.first_seq, 0);
assert!(resp.events.is_empty());
}
#[test]
fn caught_up_replica_gets_empty_chunk() {
let f = build_file("redex/caught_up");
append_n(&f, 5, "evt");
let cid = channel_id_for("redex/caught_up");
let req = SyncRequest {
channel_id: cid,
since_seq: f.next_seq(),
chunk_max: 4096,
request_id: 0,
class: Default::default(),
};
let SyncRequestOutcome::Response(resp) = handle_sync_request(&f, &req, cid) else {
panic!("expected Response");
};
assert!(resp.events.is_empty());
assert_eq!(resp.first_seq, f.next_seq());
}
#[test]
fn full_range_assembled_into_chunk() {
let f = build_file("redex/full_range");
append_n(&f, 5, "evt");
let cid = channel_id_for("redex/full_range");
let req = SyncRequest {
channel_id: cid,
since_seq: 0,
chunk_max: 4096,
request_id: 0,
class: Default::default(),
};
let SyncRequestOutcome::Response(resp) = handle_sync_request(&f, &req, cid) else {
panic!("expected Response");
};
assert_eq!(resp.events.len(), 5);
assert_eq!(resp.first_seq, 0);
assert_eq!(resp.events[0].event_seq, 0);
assert_eq!(resp.events[4].event_seq, 4);
assert_eq!(resp.events[0].payload, b"evt-0");
}
#[test]
fn channel_mismatch_returns_nack() {
let f = build_file("redex/channel_mismatch");
append_n(&f, 1, "x");
let expected = channel_id_for("redex/channel_mismatch");
let wrong = channel_id_for("redex/different_channel");
let req = SyncRequest {
channel_id: wrong,
since_seq: 0,
chunk_max: 4096,
request_id: 0,
class: Default::default(),
};
let SyncRequestOutcome::Nack { error_code, .. } = handle_sync_request(&f, &req, expected)
else {
panic!("expected Nack");
};
assert_eq!(error_code, SyncNackError::ChannelClosed);
}
#[test]
fn chunk_max_zero_uses_hard_ceiling() {
let f = build_file("redex/chunk_zero");
append_n(&f, 3, "evt");
let cid = channel_id_for("redex/chunk_zero");
let req = SyncRequest {
channel_id: cid,
since_seq: 0,
chunk_max: 0,
request_id: 0,
class: Default::default(),
};
let SyncRequestOutcome::Response(resp) = handle_sync_request(&f, &req, cid) else {
panic!("expected Response");
};
assert_eq!(resp.events.len(), 3);
}
#[test]
fn chunk_max_byte_budget_truncates() {
let f = build_file("redex/chunk_truncate");
for _ in 0..10 {
let payload = b"sixteenbytepayl";
f.append(payload).unwrap();
}
let cid = channel_id_for("redex/chunk_truncate");
let req = SyncRequest {
channel_id: cid,
since_seq: 0,
chunk_max: 60,
request_id: 0,
class: Default::default(),
};
let SyncRequestOutcome::Response(resp) = handle_sync_request(&f, &req, cid) else {
panic!("expected Response");
};
assert_eq!(
resp.events.len(),
2,
"expected 2 events under the 60-byte budget; got {} events",
resp.events.len(),
);
}
#[test]
fn chunk_max_always_admits_first_event_even_if_oversize() {
let f = build_file("redex/chunk_first");
let big = vec![0xAB; 200];
f.append(&big).unwrap();
f.append(b"second").unwrap();
let cid = channel_id_for("redex/chunk_first");
let req = SyncRequest {
channel_id: cid,
since_seq: 0,
chunk_max: 50, request_id: 0,
class: Default::default(),
};
let SyncRequestOutcome::Response(resp) = handle_sync_request(&f, &req, cid) else {
panic!("expected Response");
};
assert_eq!(resp.events.len(), 1);
assert_eq!(resp.events[0].event_seq, 0);
assert_eq!(resp.events[0].payload, big);
}
#[test]
fn oversize_first_event_above_hard_ceiling_nacks_badrange() {
let f = build_file("redex/oversize_first");
f.append(b"normal").unwrap();
let cid = channel_id_for("redex/oversize_first");
let req = SyncRequest {
channel_id: cid,
since_seq: 0,
chunk_max: 100,
request_id: 0,
class: Default::default(),
};
match handle_sync_request(&f, &req, cid) {
SyncRequestOutcome::Response(resp) => {
assert_eq!(resp.events.len(), 1);
}
SyncRequestOutcome::Nack { .. } => panic!("normal payload must not nack"),
}
}
#[test]
fn background_class_chunk_max_capped_at_soft_cap() {
use super::super::bandwidth::BandwidthClass;
let f = build_file("redex/bg_cap");
for _ in 0..2000 {
f.append(b"sixteenbytepayl").unwrap();
}
let cid = channel_id_for("redex/bg_cap");
let fg = SyncRequest {
channel_id: cid,
since_seq: 0,
chunk_max: 16 * 1024 * 1024,
request_id: 0,
class: BandwidthClass::Foreground,
};
let SyncRequestOutcome::Response(fg_resp) = handle_sync_request(&f, &fg, cid) else {
panic!("expected Response");
};
let bg = SyncRequest {
channel_id: cid,
since_seq: 0,
chunk_max: 16 * 1024 * 1024,
request_id: 0,
class: BandwidthClass::Background,
};
let SyncRequestOutcome::Response(bg_resp) = handle_sync_request(&f, &bg, cid) else {
panic!("expected Response");
};
assert_eq!(fg_resp.events.len(), bg_resp.events.len());
}
#[test]
fn since_seq_beyond_tail_returns_empty() {
let f = build_file("redex/beyond");
append_n(&f, 3, "evt");
let cid = channel_id_for("redex/beyond");
let req = SyncRequest {
channel_id: cid,
since_seq: 100, chunk_max: 4096,
request_id: 0,
class: Default::default(),
};
let SyncRequestOutcome::Response(resp) = handle_sync_request(&f, &req, cid) else {
panic!("expected Response");
};
assert!(resp.events.is_empty());
assert_eq!(resp.first_seq, 100);
}
#[test]
fn apply_returned_tail_matches_file_next_seq_across_sizes() {
for chunk_size in [1usize, 3, 17] {
let dst = build_file(&format!("redex/tail_match_{chunk_size}"));
append_n(&dst, 4, "preload");
let cid = channel_id_for(&format!("redex/tail_match_{chunk_size}"));
let events: Vec<SyncEvent> = (0..chunk_size)
.map(|i| SyncEvent {
event_seq: 4 + i as u64,
payload: format!("e-{i}").into_bytes(),
})
.collect();
let response = SyncResponse {
channel_id: cid,
first_seq: 4,
leader_first_retained_seq: 0,
events,
request_id: 0,
};
let returned_tail = apply_sync_response(&dst, &response, cid).expect("apply");
assert_eq!(
returned_tail,
dst.next_seq(),
"returned tail must equal file.next_seq() for chunk_size={chunk_size}",
);
assert_eq!(
returned_tail,
4 + chunk_size as u64,
"returned tail must equal local_next + chunk.len()",
);
}
}
#[test]
fn applies_chunk_advances_tail() {
let dst = build_file("redex/dst");
let cid = channel_id_for("redex/dst");
let response = SyncResponse {
channel_id: cid,
first_seq: 0,
leader_first_retained_seq: 0,
events: vec![
SyncEvent {
event_seq: 0,
payload: b"first".to_vec(),
},
SyncEvent {
event_seq: 1,
payload: b"second".to_vec(),
},
SyncEvent {
event_seq: 2,
payload: b"third".to_vec(),
},
],
request_id: 0,
};
let new_tail = apply_sync_response(&dst, &response, cid).expect("apply");
assert_eq!(new_tail, 3);
assert_eq!(dst.next_seq(), 3);
}
#[test]
fn empty_chunk_is_noop() {
let dst = build_file("redex/empty_chunk");
append_n(&dst, 2, "x");
let cid = channel_id_for("redex/empty_chunk");
let response = SyncResponse {
channel_id: cid,
first_seq: 100,
leader_first_retained_seq: 0,
events: vec![],
request_id: 0,
};
let new_tail = apply_sync_response(&dst, &response, cid).expect("apply");
assert_eq!(new_tail, 2);
}
#[test]
fn channel_mismatch_rejected() {
let dst = build_file("redex/replica");
let local_cid = channel_id_for("redex/replica");
let foreign_cid = channel_id_for("redex/foreign");
let response = SyncResponse {
channel_id: foreign_cid,
first_seq: 0,
leader_first_retained_seq: 0,
events: vec![SyncEvent {
event_seq: 0,
payload: b"x".to_vec(),
}],
request_id: 0,
};
let err = apply_sync_response(&dst, &response, local_cid).expect_err("mismatch");
assert!(matches!(err, ApplyError::ChannelMismatch { .. }));
}
#[test]
fn first_seq_mismatch_rejected() {
let dst = build_file("redex/first_mismatch");
let cid = channel_id_for("redex/first_mismatch");
let response = SyncResponse {
channel_id: cid,
first_seq: 0,
leader_first_retained_seq: 0,
events: vec![SyncEvent {
event_seq: 5, payload: b"x".to_vec(),
}],
request_id: 0,
};
let err = apply_sync_response(&dst, &response, cid).expect_err("mismatch");
assert!(matches!(err, ApplyError::FirstSeqMismatch { .. }));
}
#[test]
fn non_monotonic_chunk_rejected() {
let dst = build_file("redex/non_mono");
let cid = channel_id_for("redex/non_mono");
let response = SyncResponse {
channel_id: cid,
first_seq: 0,
leader_first_retained_seq: 0,
events: vec![
SyncEvent {
event_seq: 0,
payload: b"a".to_vec(),
},
SyncEvent {
event_seq: 1,
payload: b"b".to_vec(),
},
SyncEvent {
event_seq: 3, payload: b"c".to_vec(),
},
],
request_id: 0,
};
let err = apply_sync_response(&dst, &response, cid).expect_err("must reject");
assert!(matches!(err, ApplyError::NonMonotonic { index: 2 }));
}
#[test]
fn non_monotonic_wrap_at_u64_max_does_not_panic() {
let dst = build_file("redex/wrap");
let cid = channel_id_for("redex/wrap");
let response = SyncResponse {
channel_id: cid,
first_seq: u64::MAX,
leader_first_retained_seq: 0,
events: vec![
SyncEvent {
event_seq: u64::MAX,
payload: b"last".to_vec(),
},
SyncEvent {
event_seq: 0,
payload: b"wraparound".to_vec(),
},
],
request_id: 0,
};
let err = apply_sync_response(&dst, &response, cid).expect_err("must reject");
assert!(
matches!(err, ApplyError::NonMonotonic { index: 1 }),
"u64::MAX wrap must surface NonMonotonic, got {err:?}"
);
}
#[test]
fn duplicate_seq_rejected_as_non_monotonic() {
let dst = build_file("redex/dup");
let cid = channel_id_for("redex/dup");
let response = SyncResponse {
channel_id: cid,
first_seq: 0,
leader_first_retained_seq: 0,
events: vec![
SyncEvent {
event_seq: 0,
payload: b"a".to_vec(),
},
SyncEvent {
event_seq: 0, payload: b"a-dup".to_vec(),
},
],
request_id: 0,
};
let err = apply_sync_response(&dst, &response, cid).expect_err("must reject");
assert!(matches!(err, ApplyError::NonMonotonic { index: 1 }));
}
#[test]
fn stale_chunk_rejected() {
let dst = build_file("redex/stale");
append_n(&dst, 5, "preload");
let cid = channel_id_for("redex/stale");
let response = SyncResponse {
channel_id: cid,
first_seq: 2,
leader_first_retained_seq: 0,
events: vec![SyncEvent {
event_seq: 2,
payload: b"stale".to_vec(),
}],
request_id: 0,
};
let err = apply_sync_response(&dst, &response, cid).expect_err("must reject");
assert!(matches!(
err,
ApplyError::StaleChunk {
first_seq: 2,
local_next: 5,
}
));
}
#[test]
fn gap_before_chunk_rejected() {
let dst = build_file("redex/gap");
append_n(&dst, 2, "x");
let cid = channel_id_for("redex/gap");
let response = SyncResponse {
channel_id: cid,
first_seq: 5,
leader_first_retained_seq: 0,
events: vec![SyncEvent {
event_seq: 5,
payload: b"future".to_vec(),
}],
request_id: 0,
};
let err = apply_sync_response(&dst, &response, cid).expect_err("must reject");
match err {
ApplyError::GapBeforeChunk {
first_seq,
local_next,
divergence_suspected,
} => {
assert_eq!(first_seq, 5);
assert_eq!(local_next, 2);
assert!(divergence_suspected);
}
other => panic!("expected GapBeforeChunk, got {other:?}"),
}
}
#[test]
fn gap_before_chunk_legitimate_retention_trim_not_divergence() {
let dst = build_file("redex/legit_trim");
append_n(&dst, 2, "x");
let cid = channel_id_for("redex/legit_trim");
let response = SyncResponse {
channel_id: cid,
first_seq: 5,
leader_first_retained_seq: 5,
events: vec![SyncEvent {
event_seq: 5,
payload: b"future".to_vec(),
}],
request_id: 0,
};
let err = apply_sync_response(&dst, &response, cid).expect_err("must gap");
match err {
ApplyError::GapBeforeChunk {
divergence_suspected,
..
} => assert!(
!divergence_suspected,
"legitimate retention trim must not flag divergence"
),
other => panic!("expected GapBeforeChunk, got {other:?}"),
}
}
#[test]
fn empty_chunk_with_stale_first_seq_rejected() {
let dst = build_file("redex/empty_stale");
append_n(&dst, 5, "preload");
let cid = channel_id_for("redex/empty_stale");
let response = SyncResponse {
channel_id: cid,
first_seq: 2,
leader_first_retained_seq: 0,
events: vec![],
request_id: 0,
};
let err = apply_sync_response(&dst, &response, cid).expect_err("must reject");
assert!(matches!(
err,
ApplyError::StaleChunk {
first_seq: 2,
local_next: 5,
}
));
}
#[test]
fn gap_before_chunk_empty_replica_not_divergence() {
let dst = build_file("redex/fresh");
let cid = channel_id_for("redex/fresh");
let response = SyncResponse {
channel_id: cid,
first_seq: 5,
leader_first_retained_seq: 3,
events: vec![SyncEvent {
event_seq: 5,
payload: b"future".to_vec(),
}],
request_id: 0,
};
let err = apply_sync_response(&dst, &response, cid).expect_err("must gap");
match err {
ApplyError::GapBeforeChunk {
divergence_suspected,
..
} => assert!(
!divergence_suspected,
"empty replica catching up must not flag divergence"
),
other => panic!("expected GapBeforeChunk, got {other:?}"),
}
}
#[test]
fn leader_to_replica_round_trip() {
let leader = build_file("redex/leader");
let replica = build_file("redex/leader"); for i in 0..5 {
let payload = format!("evt-{i}");
leader.append(payload.as_bytes()).unwrap();
}
let cid = channel_id_for("redex/leader");
let req = SyncRequest {
channel_id: cid,
since_seq: 0,
chunk_max: 4096,
request_id: 0,
class: Default::default(),
};
let SyncRequestOutcome::Response(resp) = handle_sync_request(&leader, &req, cid) else {
panic!("expected Response");
};
let new_tail = apply_sync_response(&replica, &resp, cid).expect("apply");
assert_eq!(new_tail, leader.next_seq());
for i in 0..5 {
let ev = replica.read_range(i, i + 1).remove(0);
assert_eq!(
std::str::from_utf8(&ev.payload).unwrap(),
format!("evt-{i}"),
);
}
}
#[test]
fn chunked_catch_up_drains_in_two_rounds() {
let leader = build_file("redex/two_rounds");
let replica = build_file("redex/two_rounds");
for i in 0..4 {
let payload = format!("16-byte-evt-{i:02}"); leader.append(payload.as_bytes()).unwrap();
}
let cid = channel_id_for("redex/two_rounds");
let req1 = SyncRequest {
channel_id: cid,
since_seq: 0,
chunk_max: 60,
request_id: 0,
class: Default::default(),
};
let SyncRequestOutcome::Response(r1) = handle_sync_request(&leader, &req1, cid) else {
panic!();
};
assert_eq!(r1.events.len(), 2);
apply_sync_response(&replica, &r1, cid).unwrap();
assert_eq!(replica.next_seq(), 2);
let req2 = SyncRequest {
channel_id: cid,
since_seq: replica.next_seq(),
chunk_max: 60,
request_id: 0,
class: Default::default(),
};
let SyncRequestOutcome::Response(r2) = handle_sync_request(&leader, &req2, cid) else {
panic!();
};
assert_eq!(r2.events.len(), 2);
apply_sync_response(&replica, &r2, cid).unwrap();
assert_eq!(replica.next_seq(), 4);
assert_eq!(replica.next_seq(), leader.next_seq());
}
#[test]
fn replica_skip_ahead_then_apply_succeeds() {
let leader = build_file("redex/skip");
let replica = build_file("redex/skip");
for _ in 0..2 {
replica.append(b"old").unwrap();
}
for _ in 0..12 {
leader.append(b"x").unwrap();
}
let cid = ChannelId::from_name(&ChannelName::new("redex/skip").unwrap());
let response = SyncResponse {
channel_id: cid,
first_seq: 10,
leader_first_retained_seq: 10,
events: vec![
SyncEvent {
event_seq: 10,
payload: vec![b'A'],
},
SyncEvent {
event_seq: 11,
payload: vec![b'B'],
},
],
request_id: 0,
};
let err = apply_sync_response(&replica, &response, cid).expect_err("must gap");
let first_seq = match err {
ApplyError::GapBeforeChunk { first_seq, .. } => first_seq,
other => panic!("expected GapBeforeChunk, got {other:?}"),
};
assert_eq!(first_seq, 10);
replica.skip_to(first_seq).unwrap();
assert_eq!(replica.len(), 0);
assert_eq!(replica.next_seq(), 10);
let new_tail = apply_sync_response(&replica, &response, cid).unwrap();
assert_eq!(new_tail, 12);
assert_eq!(replica.next_seq(), 12);
let events = replica.read_range(10, 12);
assert_eq!(events.len(), 2);
assert_eq!(events[0].entry.seq, 10);
assert_eq!(events[0].payload.as_ref(), b"A");
assert_eq!(events[1].entry.seq, 11);
assert_eq!(events[1].payload.as_ref(), b"B");
}
}