use bytes::Bytes;
use super::error::RedexError;
use super::file::RedexFile;
use super::replication::{ChannelId, SyncEvent, SyncNackError, SyncRequest, SyncResponse};
/// Absolute upper bound (64 MiB) on a sync chunk's byte budget.
///
/// `handle_sync_request` uses it as the budget when a request carries
/// `chunk_max == 0`, clamps larger requested budgets down to it, and NACKs
/// when the very first event alone would cost more than this.
pub const CHUNK_MAX_HARD_CEILING_BYTES: u32 = 64 * 1024 * 1024;
/// Tighter budget cap (4 MiB) that `handle_sync_request` applies to requests
/// whose class is `BandwidthClass::Background`.
pub const CHUNK_MAX_BACKGROUND_SOFT_CAP_BYTES: u32 = 4 * 1024 * 1024;
// Fixed per-event overhead added to the payload length when an event is
// costed against the chunk budget.
// NOTE(review): the `8 + 4` split presumably mirrors a u64 sequence number
// plus a u32 length prefix on the wire — confirm against the codec.
const SYNC_EVENT_WIRE_OVERHEAD_BYTES: u64 = 8 + 4;
/// Result of `handle_sync_request`: either a chunk to ship back or a refusal.
#[derive(Debug)]
pub enum SyncRequestOutcome {
/// The request was served; the response may carry zero events when the
/// requester is already at (or beyond) the local tail.
Response(SyncResponse),
/// The request was refused.
Nack {
/// Machine-readable refusal reason.
error_code: SyncNackError,
/// Lowest sequence number the serving file still retains (0 when the
/// file reports none), sampled at the start of request handling.
leader_first_retained_seq: u64,
/// Human-readable explanation of the refusal.
detail: String,
},
}
/// Reasons `apply_sync_response` rejects a chunk or fails while applying it.
#[derive(Debug, thiserror::Error, PartialEq, Eq)]
pub enum ApplyError {
/// The response's `channel_id` differs from the channel the caller expected.
#[error("channel mismatch: response carried {got:?}, expected {expected:?}")]
ChannelMismatch {
/// Channel id carried by the response.
got: ChannelId,
/// Channel id the caller passed as `expected_channel`.
expected: ChannelId,
},
/// An event's sequence number is not exactly its predecessor's plus one
/// (this also covers duplicates and a wrap past `u64::MAX`).
#[error("chunk is not seq-monotonic at event index {index}")]
NonMonotonic {
/// Position within `events` of the first offending event.
index: usize,
},
/// The declared `first_seq` disagrees with the first event's sequence number.
#[error("first_seq mismatch: response declared {declared}, events[0]={observed}")]
FirstSeqMismatch {
/// `first_seq` as declared by the response.
declared: u64,
/// Sequence number actually found on `events[0]`.
observed: u64,
},
/// The chunk starts below the local tail, i.e. it covers already-held sequence numbers.
#[error("chunk first_seq {first_seq} below local next_seq {local_next}")]
StaleChunk {
/// `first_seq` as declared by the response.
first_seq: u64,
/// Local `next_seq` at the time of the check.
local_next: u64,
},
/// The chunk starts above the local tail, so applying it would leave a hole.
#[error("chunk first_seq {first_seq} leaves a gap above local next_seq {local_next}")]
GapBeforeChunk {
/// `first_seq` as declared by the response.
first_seq: u64,
/// Local `next_seq` observed when the gap was detected.
local_next: u64,
/// Set when the local tail sits above the leader's first retained
/// sequence number, and always set when the guarded append itself
/// reports a sequence mismatch.
divergence_suspected: bool,
},
/// The append (or, with the `redex-disk` feature, the follow-up sync) failed;
/// the async wrapper also uses this for a failed blocking task.
#[error("append failed: {0}")]
AppendFailed(String),
}
pub fn handle_sync_request(
file: &RedexFile,
request: &SyncRequest,
expected_channel: ChannelId,
) -> SyncRequestOutcome {
let leader_first_retained_seq = file.lowest_retained_seq().unwrap_or(0);
if request.channel_id != expected_channel {
return SyncRequestOutcome::Nack {
error_code: SyncNackError::ChannelClosed,
leader_first_retained_seq,
detail: format!(
"channel mismatch: request {:?} vs expected {:?}",
request.channel_id, expected_channel,
),
};
}
let local_next = file.next_seq();
if request.since_seq >= local_next {
return SyncRequestOutcome::Response(SyncResponse {
channel_id: expected_channel,
first_seq: request.since_seq,
leader_first_retained_seq,
request_id: request.request_id,
events: Vec::new(),
});
}
let mut effective_budget = if request.chunk_max == 0 {
CHUNK_MAX_HARD_CEILING_BYTES
} else {
request.chunk_max.min(CHUNK_MAX_HARD_CEILING_BYTES)
};
if matches!(request.class, super::bandwidth::BandwidthClass::Background) {
effective_budget = effective_budget.min(CHUNK_MAX_BACKGROUND_SOFT_CAP_BYTES);
}
let events = file.read_range_limited(
request.since_seq,
local_next,
effective_budget as u64,
SYNC_EVENT_WIRE_OVERHEAD_BYTES,
);
if events.is_empty() {
return SyncRequestOutcome::Nack {
error_code: SyncNackError::BadRange,
leader_first_retained_seq,
detail: format!("since_seq {} below first retained event", request.since_seq,),
};
}
let first_seq = events
.first()
.map(|e| e.entry.seq)
.unwrap_or(request.since_seq);
let mut acc: u64 = 0;
let mut out: Vec<SyncEvent> = Vec::new();
for ev in events {
let cost = SYNC_EVENT_WIRE_OVERHEAD_BYTES + ev.payload.len() as u64;
if out.is_empty() && cost > CHUNK_MAX_HARD_CEILING_BYTES as u64 {
return SyncRequestOutcome::Nack {
error_code: SyncNackError::BadRange,
leader_first_retained_seq,
detail: format!(
"event at seq {} exceeds hard ceiling ({} bytes > {})",
ev.entry.seq, cost, CHUNK_MAX_HARD_CEILING_BYTES,
),
};
}
if !out.is_empty() && acc.saturating_add(cost) > effective_budget as u64 {
break;
}
acc = acc.saturating_add(cost);
out.push(SyncEvent {
event_seq: ev.entry.seq,
payload: ev.payload.clone(),
});
}
SyncRequestOutcome::Response(SyncResponse {
channel_id: expected_channel,
first_seq,
leader_first_retained_seq,
request_id: request.request_id,
events: out,
})
}
/// Async wrapper around [`apply_sync_response`] that runs the apply on
/// tokio's blocking pool and resolves once it has finished.
///
/// Takes `file` and `response` by value because the blocking closure must own
/// its captures.
///
/// # Errors
///
/// Returns whatever [`apply_sync_response`] returns. If the blocking task
/// itself does not complete, the join failure is surfaced as
/// [`ApplyError::AppendFailed`]; the message distinguishes a panic inside the
/// task from a task that was cancelled before it could finish (for example
/// during runtime shutdown) instead of labelling both as a panic.
pub async fn apply_sync_response_async(
    file: RedexFile,
    response: SyncResponse,
    expected_channel: ChannelId,
) -> Result<u64, ApplyError> {
    let joined =
        tokio::task::spawn_blocking(move || apply_sync_response(&file, &response, expected_channel))
            .await;
    match joined {
        Ok(applied) => applied,
        Err(join_err) => {
            // A JoinError is either a panic or a cancellation; report which one.
            let what = if join_err.is_panic() {
                "panicked"
            } else {
                "cancelled"
            };
            Err(ApplyError::AppendFailed(format!(
                "apply_sync_response_async: spawn_blocking {what}: {join_err}"
            )))
        }
    }
}
/// Validates `response` and, if it lines up exactly with the local tail,
/// appends its events to `file`.
///
/// Checks, in order:
/// 1. the response is for `expected_channel`;
/// 2. an empty chunk is accepted as a no-op unless its `first_seq` is below
///    the local tail (then it is stale);
/// 3. `events[0]` carries the declared `first_seq`;
/// 4. every later event's sequence number is its predecessor's plus one;
/// 5. `first_seq` equals the local `next_seq` (below is stale, above is a gap).
///
/// Returns the tail after the apply: `local_next + events.len()`, or the
/// unchanged local tail for an empty chunk.
///
/// # Errors
///
/// One of the [`ApplyError`] variants matching the failed check, or
/// `AppendFailed` when the guarded append (or, with the `redex-disk`
/// feature, the follow-up sync) fails. A sequence mismatch reported by the
/// append itself is surfaced as `GapBeforeChunk` with
/// `divergence_suspected: true`.
pub fn apply_sync_response(
    file: &RedexFile,
    response: &SyncResponse,
    expected_channel: ChannelId,
) -> Result<u64, ApplyError> {
    if response.channel_id != expected_channel {
        return Err(ApplyError::ChannelMismatch {
            got: response.channel_id,
            expected: expected_channel,
        });
    }

    let Some(head) = response.events.first() else {
        // Empty chunk: nothing to append, but a start below our tail is still stale.
        let local_next = file.next_seq();
        return if response.first_seq < local_next {
            Err(ApplyError::StaleChunk {
                first_seq: response.first_seq,
                local_next,
            })
        } else {
            Ok(local_next)
        };
    };

    if head.event_seq != response.first_seq {
        return Err(ApplyError::FirstSeqMismatch {
            declared: response.first_seq,
            observed: head.event_seq,
        });
    }

    // Each adjacent pair must be consecutive; `checked_add` turns a wrap past
    // u64::MAX into a rejection instead of an overflow.
    for (pair_idx, pair) in response.events.windows(2).enumerate() {
        if pair[0].event_seq.checked_add(1) != Some(pair[1].event_seq) {
            return Err(ApplyError::NonMonotonic {
                index: pair_idx + 1,
            });
        }
    }

    let local_next = file.next_seq();
    match response.first_seq.cmp(&local_next) {
        std::cmp::Ordering::Less => {
            return Err(ApplyError::StaleChunk {
                first_seq: response.first_seq,
                local_next,
            });
        }
        std::cmp::Ordering::Greater => {
            // In this branch first_seq > local_next, so "local tail above the
            // leader's first retained seq" already implies both a non-zero
            // local tail and a retained seq below first_seq.
            let divergence_suspected = local_next > response.leader_first_retained_seq;
            return Err(ApplyError::GapBeforeChunk {
                first_seq: response.first_seq,
                local_next,
                divergence_suspected,
            });
        }
        std::cmp::Ordering::Equal => {}
    }

    let payloads: Vec<Bytes> = response
        .events
        .iter()
        .map(|event| event.payload.clone())
        .collect();
    let appended = payloads.len() as u64;

    // The append re-checks the tail itself, so a concurrent writer shows up
    // as a sequence mismatch here rather than as interleaved events.
    if let Err(append_err) = file.append_batch_if_next_seq(response.first_seq, &payloads) {
        return Err(match append_err {
            RedexError::SeqMismatch { actual, .. } => ApplyError::GapBeforeChunk {
                first_seq: response.first_seq,
                local_next: actual,
                divergence_suspected: true,
            },
            other => ApplyError::AppendFailed(format!("{other:?}")),
        });
    }

    #[cfg(feature = "redex-disk")]
    {
        file.sync()
            .map_err(|e| ApplyError::AppendFailed(format!("durable-sync: {e:?}")))?;
    }

    match local_next.checked_add(appended) {
        Some(new_tail) => Ok(new_tail),
        // Arithmetic overflow: fall back to whatever the file reports.
        None => Ok(file.next_seq()),
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::adapter::net::channel::ChannelName;
use crate::adapter::net::redex::config::RedexFileConfig;
use crate::adapter::net::redex::manager::Redex;
/// Derives the `ChannelId` for a channel name; panics if the name is rejected.
fn channel_id_for(name: &str) -> ChannelId {
    ChannelId::from_name(&ChannelName::new(name).unwrap())
}
/// Opens a file for `name` with the default config on a brand-new `Redex`
/// manager, so every call yields an independent file even for equal names.
fn build_file(name: &str) -> RedexFile {
    let manager = Redex::new();
    let channel = ChannelName::new(name).unwrap();
    let opened = manager.open_file(&channel, RedexFileConfig::default());
    opened.unwrap()
}
/// Appends `n` events to `file` with payloads `"{prefix}-0"`, `"{prefix}-1"`, ….
fn append_n(file: &RedexFile, n: usize, prefix: &str) {
    (0..n).for_each(|i| {
        file.append(format!("{prefix}-{i}").as_bytes()).unwrap();
    });
}
/// A file with no appended events answers a from-zero request with an empty chunk.
#[test]
fn empty_file_returns_empty_chunk() {
    let cid = channel_id_for("redex/empty");
    let leader = build_file("redex/empty");
    let request = SyncRequest {
        channel_id: cid,
        since_seq: 0,
        chunk_max: 4096,
        request_id: 0,
        class: Default::default(),
    };
    match handle_sync_request(&leader, &request, cid) {
        SyncRequestOutcome::Response(resp) => {
            assert_eq!(resp.channel_id, cid);
            assert_eq!(resp.first_seq, 0);
            assert!(resp.events.is_empty());
        }
        SyncRequestOutcome::Nack { .. } => panic!("expected Response"),
    }
}
/// A request whose `since_seq` equals the leader's tail yields an empty chunk
/// whose `first_seq` echoes that tail.
#[test]
fn caught_up_replica_gets_empty_chunk() {
    let leader = build_file("redex/caught_up");
    append_n(&leader, 5, "evt");
    let cid = channel_id_for("redex/caught_up");
    let tail = leader.next_seq();
    let request = SyncRequest {
        channel_id: cid,
        since_seq: tail,
        chunk_max: 4096,
        request_id: 0,
        class: Default::default(),
    };
    let resp = match handle_sync_request(&leader, &request, cid) {
        SyncRequestOutcome::Response(resp) => resp,
        SyncRequestOutcome::Nack { .. } => panic!("expected Response"),
    };
    assert!(resp.events.is_empty());
    assert_eq!(resp.first_seq, leader.next_seq());
}
/// With a roomy budget the whole backlog ships in one chunk, in order.
#[test]
fn full_range_assembled_into_chunk() {
    let leader = build_file("redex/full_range");
    append_n(&leader, 5, "evt");
    let cid = channel_id_for("redex/full_range");
    let request = SyncRequest {
        channel_id: cid,
        since_seq: 0,
        chunk_max: 4096,
        request_id: 0,
        class: Default::default(),
    };
    let resp = match handle_sync_request(&leader, &request, cid) {
        SyncRequestOutcome::Response(resp) => resp,
        SyncRequestOutcome::Nack { .. } => panic!("expected Response"),
    };
    let shipped = &resp.events;
    assert_eq!(shipped.len(), 5);
    assert_eq!(resp.first_seq, 0);
    assert_eq!(shipped[0].event_seq, 0);
    assert_eq!(shipped[4].event_seq, 4);
    assert_eq!(shipped[0].payload.as_ref(), b"evt-0");
}
/// A request addressed to a different channel is NACKed with `ChannelClosed`.
#[test]
fn channel_mismatch_returns_nack() {
    let leader = build_file("redex/channel_mismatch");
    append_n(&leader, 1, "x");
    let expected = channel_id_for("redex/channel_mismatch");
    let request = SyncRequest {
        channel_id: channel_id_for("redex/different_channel"),
        since_seq: 0,
        chunk_max: 4096,
        request_id: 0,
        class: Default::default(),
    };
    match handle_sync_request(&leader, &request, expected) {
        SyncRequestOutcome::Nack { error_code, .. } => {
            assert_eq!(error_code, SyncNackError::ChannelClosed);
        }
        SyncRequestOutcome::Response(_) => panic!("expected Nack"),
    }
}
/// `chunk_max == 0` is not "zero bytes": the small backlog still ships in full.
#[test]
fn chunk_max_zero_uses_hard_ceiling() {
    let leader = build_file("redex/chunk_zero");
    append_n(&leader, 3, "evt");
    let cid = channel_id_for("redex/chunk_zero");
    let request = SyncRequest {
        channel_id: cid,
        since_seq: 0,
        chunk_max: 0,
        request_id: 0,
        class: Default::default(),
    };
    match handle_sync_request(&leader, &request, cid) {
        SyncRequestOutcome::Response(resp) => assert_eq!(resp.events.len(), 3),
        SyncRequestOutcome::Nack { .. } => panic!("expected Response"),
    }
}
/// Each event costs 12 bytes of overhead plus a 15-byte payload (27 bytes),
/// so a 60-byte budget admits two events and stops before the third.
#[test]
fn chunk_max_byte_budget_truncates() {
    let leader = build_file("redex/chunk_truncate");
    (0..10).for_each(|_| {
        leader.append(b"sixteenbytepayl").unwrap();
    });
    let cid = channel_id_for("redex/chunk_truncate");
    let request = SyncRequest {
        channel_id: cid,
        since_seq: 0,
        chunk_max: 60,
        request_id: 0,
        class: Default::default(),
    };
    let resp = match handle_sync_request(&leader, &request, cid) {
        SyncRequestOutcome::Response(resp) => resp,
        SyncRequestOutcome::Nack { .. } => panic!("expected Response"),
    };
    let shipped = resp.events.len();
    assert_eq!(
        shipped, 2,
        "expected 2 events under the 60-byte budget; got {} events",
        shipped,
    );
}
/// A first event larger than the requested budget (but under the hard
/// ceiling) still ships, alone, so the requester can make progress.
#[test]
fn chunk_max_always_admits_first_event_even_if_oversize() {
    let leader = build_file("redex/chunk_first");
    let big = vec![0xAB; 200];
    leader.append(&big).unwrap();
    leader.append(b"second").unwrap();
    let cid = channel_id_for("redex/chunk_first");
    let request = SyncRequest {
        channel_id: cid,
        since_seq: 0,
        chunk_max: 50,
        request_id: 0,
        class: Default::default(),
    };
    let resp = match handle_sync_request(&leader, &request, cid) {
        SyncRequestOutcome::Response(resp) => resp,
        SyncRequestOutcome::Nack { .. } => panic!("expected Response"),
    };
    assert_eq!(resp.events.len(), 1);
    let only = &resp.events[0];
    assert_eq!(only.event_seq, 0);
    assert_eq!(only.payload, big);
}
/// Guards the hard-ceiling check against false positives: an ordinary small
/// payload must be served, not NACKed.
// NOTE(review): despite the name, this body never appends an event above the
// hard ceiling, so the BadRange NACK path itself is not exercised here.
#[test]
fn oversize_first_event_above_hard_ceiling_nacks_badrange() {
    let leader = build_file("redex/oversize_first");
    leader.append(b"normal").unwrap();
    let cid = channel_id_for("redex/oversize_first");
    let request = SyncRequest {
        channel_id: cid,
        since_seq: 0,
        chunk_max: 100,
        request_id: 0,
        class: Default::default(),
    };
    let SyncRequestOutcome::Response(resp) = handle_sync_request(&leader, &request, cid) else {
        panic!("normal payload must not nack");
    };
    assert_eq!(resp.events.len(), 1);
}
/// Foreground and background requests with the same `chunk_max` return the
/// same number of events for this backlog.
// NOTE(review): the backlog is 2000 events of 12 + 15 bytes (54,000 bytes),
// far below the 4 MiB background cap, so this checks that the cap does not
// truncate small backlogs rather than that it truncates large ones.
#[test]
fn background_class_chunk_max_capped_at_soft_cap() {
    use super::super::bandwidth::BandwidthClass;
    let leader = build_file("redex/bg_cap");
    (0..2000).for_each(|_| {
        leader.append(b"sixteenbytepayl").unwrap();
    });
    let cid = channel_id_for("redex/bg_cap");
    let request_for = |class: BandwidthClass| SyncRequest {
        channel_id: cid,
        since_seq: 0,
        chunk_max: 16 * 1024 * 1024,
        request_id: 0,
        class,
    };

    let fg = request_for(BandwidthClass::Foreground);
    let fg_resp = match handle_sync_request(&leader, &fg, cid) {
        SyncRequestOutcome::Response(resp) => resp,
        SyncRequestOutcome::Nack { .. } => panic!("expected Response"),
    };

    let bg = request_for(BandwidthClass::Background);
    let bg_resp = match handle_sync_request(&leader, &bg, cid) {
        SyncRequestOutcome::Response(resp) => resp,
        SyncRequestOutcome::Nack { .. } => panic!("expected Response"),
    };

    assert_eq!(fg_resp.events.len(), bg_resp.events.len());
}
/// A `since_seq` far past the leader's tail is answered with an empty chunk
/// that echoes the requested `since_seq`.
#[test]
fn since_seq_beyond_tail_returns_empty() {
    let leader = build_file("redex/beyond");
    append_n(&leader, 3, "evt");
    let cid = channel_id_for("redex/beyond");
    let request = SyncRequest {
        channel_id: cid,
        since_seq: 100,
        chunk_max: 4096,
        request_id: 0,
        class: Default::default(),
    };
    match handle_sync_request(&leader, &request, cid) {
        SyncRequestOutcome::Response(resp) => {
            assert!(resp.events.is_empty());
            assert_eq!(resp.first_seq, 100);
        }
        SyncRequestOutcome::Nack { .. } => panic!("expected Response"),
    }
}
/// For several chunk sizes, the tail returned by the apply equals both the
/// file's `next_seq()` and `local_next + chunk length`.
#[test]
fn apply_returned_tail_matches_file_next_seq_across_sizes() {
    for chunk_size in [1usize, 3, 17] {
        let name = format!("redex/tail_match_{chunk_size}");
        let dst = build_file(&name);
        append_n(&dst, 4, "preload");
        let cid = channel_id_for(&name);

        let mut events: Vec<SyncEvent> = Vec::with_capacity(chunk_size);
        for i in 0..chunk_size {
            events.push(SyncEvent {
                event_seq: 4 + i as u64,
                payload: bytes::Bytes::from(format!("e-{i}").into_bytes()),
            });
        }
        let response = SyncResponse {
            channel_id: cid,
            first_seq: 4,
            leader_first_retained_seq: 0,
            events,
            request_id: 0,
        };

        let returned_tail = apply_sync_response(&dst, &response, cid).expect("apply");
        assert_eq!(
            returned_tail,
            dst.next_seq(),
            "returned tail must equal file.next_seq() for chunk_size={chunk_size}",
        );
        assert_eq!(
            returned_tail,
            4 + chunk_size as u64,
            "returned tail must equal local_next + chunk.len()",
        );
    }
}
/// Applying a three-event chunk to an empty file moves the tail to 3.
#[test]
fn applies_chunk_advances_tail() {
    let replica = build_file("redex/dst");
    let cid = channel_id_for("redex/dst");
    let events = vec![
        SyncEvent {
            event_seq: 0,
            payload: bytes::Bytes::from_static(b"first"),
        },
        SyncEvent {
            event_seq: 1,
            payload: bytes::Bytes::from_static(b"second"),
        },
        SyncEvent {
            event_seq: 2,
            payload: bytes::Bytes::from_static(b"third"),
        },
    ];
    let response = SyncResponse {
        channel_id: cid,
        first_seq: 0,
        leader_first_retained_seq: 0,
        events,
        request_id: 0,
    };
    let tail_after = apply_sync_response(&replica, &response, cid).expect("apply");
    assert_eq!(tail_after, 3);
    assert_eq!(replica.next_seq(), 3);
}
/// An empty chunk whose `first_seq` is ahead of the local tail applies
/// nothing and reports the unchanged tail.
#[test]
fn empty_chunk_is_noop() {
    let replica = build_file("redex/empty_chunk");
    append_n(&replica, 2, "x");
    let cid = channel_id_for("redex/empty_chunk");
    let response = SyncResponse {
        channel_id: cid,
        first_seq: 100,
        leader_first_retained_seq: 0,
        events: Vec::new(),
        request_id: 0,
    };
    let tail_after = apply_sync_response(&replica, &response, cid).expect("apply");
    assert_eq!(tail_after, 2);
}
/// A response stamped with a foreign channel id is refused before anything else.
#[test]
fn channel_mismatch_rejected() {
    let replica = build_file("redex/replica");
    let local_cid = channel_id_for("redex/replica");
    let lone_event = SyncEvent {
        event_seq: 0,
        payload: bytes::Bytes::from_static(b"x"),
    };
    let response = SyncResponse {
        channel_id: channel_id_for("redex/foreign"),
        first_seq: 0,
        leader_first_retained_seq: 0,
        events: vec![lone_event],
        request_id: 0,
    };
    let err = apply_sync_response(&replica, &response, local_cid).expect_err("mismatch");
    assert!(matches!(err, ApplyError::ChannelMismatch { .. }));
}
/// A chunk declaring `first_seq = 0` while its first event carries seq 5 is rejected.
#[test]
fn first_seq_mismatch_rejected() {
    let replica = build_file("redex/first_mismatch");
    let cid = channel_id_for("redex/first_mismatch");
    let misnumbered = SyncEvent {
        event_seq: 5,
        payload: bytes::Bytes::from_static(b"x"),
    };
    let response = SyncResponse {
        channel_id: cid,
        first_seq: 0,
        leader_first_retained_seq: 0,
        events: vec![misnumbered],
        request_id: 0,
    };
    let err = apply_sync_response(&replica, &response, cid).expect_err("mismatch");
    assert!(matches!(err, ApplyError::FirstSeqMismatch { .. }));
}
/// A hole inside the chunk (0, 1, 3) is reported at the index of the event
/// that breaks the run.
#[test]
fn non_monotonic_chunk_rejected() {
    let replica = build_file("redex/non_mono");
    let cid = channel_id_for("redex/non_mono");
    let events = vec![
        SyncEvent {
            event_seq: 0,
            payload: bytes::Bytes::from_static(b"a"),
        },
        SyncEvent {
            event_seq: 1,
            payload: bytes::Bytes::from_static(b"b"),
        },
        SyncEvent {
            event_seq: 3,
            payload: bytes::Bytes::from_static(b"c"),
        },
    ];
    let response = SyncResponse {
        channel_id: cid,
        first_seq: 0,
        leader_first_retained_seq: 0,
        events,
        request_id: 0,
    };
    let err = apply_sync_response(&replica, &response, cid).expect_err("must reject");
    assert!(matches!(err, ApplyError::NonMonotonic { index: 2 }));
}
/// A chunk that runs from `u64::MAX` to 0 must be rejected as non-monotonic
/// rather than overflowing while computing the expected successor.
#[test]
fn non_monotonic_wrap_at_u64_max_does_not_panic() {
    let replica = build_file("redex/wrap");
    let cid = channel_id_for("redex/wrap");
    let events = vec![
        SyncEvent {
            event_seq: u64::MAX,
            payload: bytes::Bytes::from_static(b"last"),
        },
        SyncEvent {
            event_seq: 0,
            payload: bytes::Bytes::from_static(b"wraparound"),
        },
    ];
    let response = SyncResponse {
        channel_id: cid,
        first_seq: u64::MAX,
        leader_first_retained_seq: 0,
        events,
        request_id: 0,
    };
    let err = apply_sync_response(&replica, &response, cid).expect_err("must reject");
    assert!(
        matches!(err, ApplyError::NonMonotonic { index: 1 }),
        "u64::MAX wrap must surface NonMonotonic, got {err:?}"
    );
}
/// Two events carrying the same sequence number fail the contiguity check at index 1.
#[test]
fn duplicate_seq_rejected_as_non_monotonic() {
    let replica = build_file("redex/dup");
    let cid = channel_id_for("redex/dup");
    let events = vec![
        SyncEvent {
            event_seq: 0,
            payload: bytes::Bytes::from_static(b"a"),
        },
        SyncEvent {
            event_seq: 0,
            payload: bytes::Bytes::from_static(b"a-dup"),
        },
    ];
    let response = SyncResponse {
        channel_id: cid,
        first_seq: 0,
        leader_first_retained_seq: 0,
        events,
        request_id: 0,
    };
    let err = apply_sync_response(&replica, &response, cid).expect_err("must reject");
    assert!(matches!(err, ApplyError::NonMonotonic { index: 1 }));
}
/// A chunk starting at seq 2 is stale for a replica whose tail is already 5.
#[test]
fn stale_chunk_rejected() {
    let replica = build_file("redex/stale");
    append_n(&replica, 5, "preload");
    let cid = channel_id_for("redex/stale");
    let old_event = SyncEvent {
        event_seq: 2,
        payload: bytes::Bytes::from_static(b"stale"),
    };
    let response = SyncResponse {
        channel_id: cid,
        first_seq: 2,
        leader_first_retained_seq: 0,
        events: vec![old_event],
        request_id: 0,
    };
    let err = apply_sync_response(&replica, &response, cid).expect_err("must reject");
    let is_expected_stale = matches!(
        err,
        ApplyError::StaleChunk {
            first_seq: 2,
            local_next: 5,
        }
    );
    assert!(is_expected_stale);
}
/// A chunk starting at 5 against a local tail of 2 is a gap; because the
/// leader still retains from 0, the gap is flagged as suspected divergence.
#[test]
fn gap_before_chunk_rejected() {
    let replica = build_file("redex/gap");
    append_n(&replica, 2, "x");
    let cid = channel_id_for("redex/gap");
    let future_event = SyncEvent {
        event_seq: 5,
        payload: bytes::Bytes::from_static(b"future"),
    };
    let response = SyncResponse {
        channel_id: cid,
        first_seq: 5,
        leader_first_retained_seq: 0,
        events: vec![future_event],
        request_id: 0,
    };
    match apply_sync_response(&replica, &response, cid).expect_err("must reject") {
        ApplyError::GapBeforeChunk {
            first_seq,
            local_next,
            divergence_suspected,
        } => {
            assert_eq!(first_seq, 5);
            assert_eq!(local_next, 2);
            assert!(divergence_suspected);
        }
        other => panic!("expected GapBeforeChunk, got {other:?}"),
    }
}
/// When the leader's first retained seq (5) is above the local tail (2), the
/// gap is explained by retention trimming and must not be flagged as divergence.
#[test]
fn gap_before_chunk_legitimate_retention_trim_not_divergence() {
    let replica = build_file("redex/legit_trim");
    append_n(&replica, 2, "x");
    let cid = channel_id_for("redex/legit_trim");
    let future_event = SyncEvent {
        event_seq: 5,
        payload: bytes::Bytes::from_static(b"future"),
    };
    let response = SyncResponse {
        channel_id: cid,
        first_seq: 5,
        leader_first_retained_seq: 5,
        events: vec![future_event],
        request_id: 0,
    };
    match apply_sync_response(&replica, &response, cid).expect_err("must gap") {
        ApplyError::GapBeforeChunk {
            divergence_suspected,
            ..
        } => {
            assert!(
                !divergence_suspected,
                "legitimate retention trim must not flag divergence"
            );
        }
        other => panic!("expected GapBeforeChunk, got {other:?}"),
    }
}
/// Even an empty chunk is rejected as stale when its `first_seq` (2) is below
/// the local tail (5).
#[test]
fn empty_chunk_with_stale_first_seq_rejected() {
    let replica = build_file("redex/empty_stale");
    append_n(&replica, 5, "preload");
    let cid = channel_id_for("redex/empty_stale");
    let response = SyncResponse {
        channel_id: cid,
        first_seq: 2,
        leader_first_retained_seq: 0,
        events: Vec::new(),
        request_id: 0,
    };
    let err = apply_sync_response(&replica, &response, cid).expect_err("must reject");
    let is_expected_stale = matches!(
        err,
        ApplyError::StaleChunk {
            first_seq: 2,
            local_next: 5,
        }
    );
    assert!(is_expected_stale);
}
/// A replica with no events sees a gap before a chunk starting at 5, but its
/// tail of 0 is not above the leader's first retained seq, so no divergence is flagged.
#[test]
fn gap_before_chunk_empty_replica_not_divergence() {
    let replica = build_file("redex/fresh");
    let cid = channel_id_for("redex/fresh");
    let future_event = SyncEvent {
        event_seq: 5,
        payload: bytes::Bytes::from_static(b"future"),
    };
    let response = SyncResponse {
        channel_id: cid,
        first_seq: 5,
        leader_first_retained_seq: 3,
        events: vec![future_event],
        request_id: 0,
    };
    match apply_sync_response(&replica, &response, cid).expect_err("must gap") {
        ApplyError::GapBeforeChunk {
            divergence_suspected,
            ..
        } => {
            assert!(
                !divergence_suspected,
                "empty replica catching up must not flag divergence"
            );
        }
        other => panic!("expected GapBeforeChunk, got {other:?}"),
    }
}
/// End to end: a chunk produced from the leader applies cleanly on a separate
/// replica file and the replica then holds the same payloads.
#[test]
fn leader_to_replica_round_trip() {
    // Same channel name, but each `build_file` call uses its own manager, so
    // these are two independent files.
    let leader = build_file("redex/leader");
    let replica = build_file("redex/leader");
    for i in 0..5 {
        leader.append(format!("evt-{i}").as_bytes()).unwrap();
    }
    let cid = channel_id_for("redex/leader");
    let request = SyncRequest {
        channel_id: cid,
        since_seq: 0,
        chunk_max: 4096,
        request_id: 0,
        class: Default::default(),
    };
    let resp = match handle_sync_request(&leader, &request, cid) {
        SyncRequestOutcome::Response(resp) => resp,
        SyncRequestOutcome::Nack { .. } => panic!("expected Response"),
    };
    let tail_after = apply_sync_response(&replica, &resp, cid).expect("apply");
    assert_eq!(tail_after, leader.next_seq());
    for i in 0..5 {
        let stored = replica.read_range(i, i + 1).remove(0);
        let text = std::str::from_utf8(&stored.payload).unwrap();
        assert_eq!(text, format!("evt-{i}"),);
    }
}
/// Four 14-byte events cost 26 bytes each, so a 60-byte budget ships two per
/// round and the replica catches up in exactly two request/apply rounds.
#[test]
fn chunked_catch_up_drains_in_two_rounds() {
    let leader = build_file("redex/two_rounds");
    let replica = build_file("redex/two_rounds");
    for i in 0..4 {
        leader
            .append(format!("16-byte-evt-{i:02}").as_bytes())
            .unwrap();
    }
    let cid = channel_id_for("redex/two_rounds");

    // Round one: from the very beginning.
    let first_request = SyncRequest {
        channel_id: cid,
        since_seq: 0,
        chunk_max: 60,
        request_id: 0,
        class: Default::default(),
    };
    let first_chunk = match handle_sync_request(&leader, &first_request, cid) {
        SyncRequestOutcome::Response(resp) => resp,
        SyncRequestOutcome::Nack { .. } => panic!(),
    };
    assert_eq!(first_chunk.events.len(), 2);
    apply_sync_response(&replica, &first_chunk, cid).unwrap();
    assert_eq!(replica.next_seq(), 2);

    // Round two: resume from the replica's new tail.
    let second_request = SyncRequest {
        channel_id: cid,
        since_seq: replica.next_seq(),
        chunk_max: 60,
        request_id: 0,
        class: Default::default(),
    };
    let second_chunk = match handle_sync_request(&leader, &second_request, cid) {
        SyncRequestOutcome::Response(resp) => resp,
        SyncRequestOutcome::Nack { .. } => panic!(),
    };
    assert_eq!(second_chunk.events.len(), 2);
    apply_sync_response(&replica, &second_chunk, cid).unwrap();
    assert_eq!(replica.next_seq(), 4);
    assert_eq!(replica.next_seq(), leader.next_seq());
}
/// Recovery path: the first apply reports a gap, the replica skips ahead to
/// the chunk's `first_seq`, and re-applying the same chunk then succeeds.
#[test]
fn replica_skip_ahead_then_apply_succeeds() {
    let leader = build_file("redex/skip");
    let replica = build_file("redex/skip");
    (0..2).for_each(|_| {
        replica.append(b"old").unwrap();
    });
    (0..12).for_each(|_| {
        leader.append(b"x").unwrap();
    });
    let cid = channel_id_for("redex/skip");
    let events = vec![
        SyncEvent {
            event_seq: 10,
            payload: bytes::Bytes::from_static(b"A"),
        },
        SyncEvent {
            event_seq: 11,
            payload: bytes::Bytes::from_static(b"B"),
        },
    ];
    let response = SyncResponse {
        channel_id: cid,
        first_seq: 10,
        leader_first_retained_seq: 10,
        events,
        request_id: 0,
    };

    // First attempt: local tail is 2, chunk starts at 10.
    let first_seq = match apply_sync_response(&replica, &response, cid).expect_err("must gap") {
        ApplyError::GapBeforeChunk { first_seq, .. } => first_seq,
        other => panic!("expected GapBeforeChunk, got {other:?}"),
    };
    assert_eq!(first_seq, 10);

    replica.skip_to(first_seq).unwrap();
    assert_eq!(replica.len(), 0);
    assert_eq!(replica.next_seq(), 10);

    // Second attempt with the very same chunk now lines up with the tail.
    let tail_after = apply_sync_response(&replica, &response, cid).unwrap();
    assert_eq!(tail_after, 12);
    assert_eq!(replica.next_seq(), 12);

    let stored = replica.read_range(10, 12);
    assert_eq!(stored.len(), 2);
    assert_eq!(stored[0].entry.seq, 10);
    assert_eq!(stored[0].payload.as_ref(), b"A");
    assert_eq!(stored[1].entry.seq, 11);
    assert_eq!(stored[1].payload.as_ref(), b"B");
}
/// A 256-byte first event under a 16-byte budget ships on its own, and the
/// follow-up request (resuming right after it) picks up the remaining events.
#[test]
fn oversized_first_event_under_hard_ceiling_still_ships_alone() {
    let leader = build_file("redex/oversize_progress");
    leader.append(&vec![b'L'; 256]).unwrap();
    append_n(&leader, 3, "small");
    let cid = channel_id_for("redex/oversize_progress");

    let tight_request = SyncRequest {
        channel_id: cid,
        since_seq: 0,
        chunk_max: 16,
        request_id: 7,
        class: Default::default(),
    };
    let resp = match handle_sync_request(&leader, &tight_request, cid) {
        SyncRequestOutcome::Response(resp) => resp,
        SyncRequestOutcome::Nack { .. } => {
            panic!("oversize-but-under-ceiling first event must ship, not NACK")
        }
    };
    assert_eq!(resp.first_seq, 0);
    assert_eq!(
        resp.events.len(),
        1,
        "exactly the oversize event ships; nothing rides along"
    );
    assert_eq!(resp.events[0].event_seq, 0);
    assert_eq!(resp.events[0].payload.len(), 256);

    let resume_from = resp.first_seq + resp.events.len() as u64;
    let next_req = SyncRequest {
        channel_id: cid,
        since_seq: resume_from,
        chunk_max: 4096,
        request_id: 8,
        class: Default::default(),
    };
    let next = match handle_sync_request(&leader, &next_req, cid) {
        SyncRequestOutcome::Response(resp) => resp,
        SyncRequestOutcome::Nack { .. } => panic!("expected Response"),
    };
    assert_eq!(next.first_seq, 1);
    assert_eq!(next.events.len(), 3);
    assert_eq!(next.events[0].event_seq, 1);
    assert_eq!(next.events[0].payload.as_ref(), b"small-0");
}
/// Empty payloads still cost the 12-byte per-event overhead, so a 50-byte
/// budget admits four of them (48 bytes) and not a fifth.
#[test]
fn zero_length_payload_backlog_is_still_budget_bounded() {
    let leader = build_file("redex/zero_len_budget");
    (0..100).for_each(|_| {
        leader.append(b"").unwrap();
    });
    let cid = channel_id_for("redex/zero_len_budget");
    let request = SyncRequest {
        channel_id: cid,
        since_seq: 0,
        chunk_max: 50,
        request_id: 0,
        class: Default::default(),
    };
    match handle_sync_request(&leader, &request, cid) {
        SyncRequestOutcome::Response(resp) => {
            assert_eq!(resp.events.len(), 4);
            assert_eq!(resp.first_seq, 0);
        }
        SyncRequestOutcome::Nack { .. } => panic!("expected Response"),
    }
}
/// Once the async apply resolves, the appended events are already readable
/// through another handle to the same file.
#[tokio::test]
async fn apply_sync_response_async_resolves_only_after_apply_is_visible() {
    let replica = build_file("redex/async_apply");
    let cid = channel_id_for("redex/async_apply");
    let events = vec![
        SyncEvent {
            event_seq: 0,
            payload: bytes::Bytes::from_static(b"a"),
        },
        SyncEvent {
            event_seq: 1,
            payload: bytes::Bytes::from_static(b"b"),
        },
    ];
    let response = SyncResponse {
        channel_id: cid,
        first_seq: 0,
        leader_first_retained_seq: 0,
        events,
        request_id: 0,
    };

    let applied = apply_sync_response_async(replica.clone(), response, cid).await;
    let tail_after = applied.unwrap();
    assert_eq!(tail_after, 2);
    assert_eq!(replica.next_seq(), 2);

    let stored = replica.read_range(0, 2);
    assert_eq!(stored.len(), 2);
    assert_eq!(stored[0].payload.as_ref(), b"a");
    assert_eq!(stored[1].payload.as_ref(), b"b");
}
/// Validation errors from the synchronous apply pass through the async
/// wrapper unchanged, and nothing is appended.
#[tokio::test]
async fn apply_sync_response_async_propagates_apply_errors() {
    let replica = build_file("redex/async_apply_err");
    let expected = channel_id_for("redex/async_apply_err");
    let lone_event = SyncEvent {
        event_seq: 0,
        payload: bytes::Bytes::from_static(b"x"),
    };
    let response = SyncResponse {
        channel_id: channel_id_for("redex/async_apply_err_other"),
        first_seq: 0,
        leader_first_retained_seq: 0,
        events: vec![lone_event],
        request_id: 0,
    };

    let outcome = apply_sync_response_async(replica.clone(), response, expected).await;
    let err = outcome.expect_err("channel mismatch must propagate");
    assert!(matches!(err, ApplyError::ChannelMismatch { .. }));
    assert_eq!(replica.next_seq(), 0, "nothing applied on error");
}
/// A chunk starting exactly at the local tail (3) passes the guarded append
/// and its payloads land at seqs 3 and 4.
#[test]
fn apply_sync_response_happy_path_through_seq_guard() {
    let replica = build_file("redex/guarded_apply");
    append_n(&replica, 3, "pre");
    let cid = channel_id_for("redex/guarded_apply");
    let events = vec![
        SyncEvent {
            event_seq: 3,
            payload: bytes::Bytes::from_static(b"d"),
        },
        SyncEvent {
            event_seq: 4,
            payload: bytes::Bytes::from_static(b"e"),
        },
    ];
    let response = SyncResponse {
        channel_id: cid,
        first_seq: 3,
        leader_first_retained_seq: 0,
        events,
        request_id: 0,
    };
    let tail_after = apply_sync_response(&replica, &response, cid).expect("guarded apply");
    assert_eq!(tail_after, 5);
    assert_eq!(replica.next_seq(), 5);
    assert_eq!(&replica.read_one(3).unwrap().payload[..], b"d");
    assert_eq!(&replica.read_one(4).unwrap().payload[..], b"e");
}
}