use blake2::{
digest::{generic_array::typenum::U32, FixedOutput, KeyInit, Mac},
Blake2sMac,
};
use bytes::{Buf, BufMut};
use super::super::channel::ChannelName;
pub const SUBPROTOCOL_REDEX: u16 = 0x0E00;
pub const DISPATCH_SYNC_REQUEST: u8 = 0x20;
pub const DISPATCH_SYNC_RESPONSE: u8 = 0x21;
pub const DISPATCH_SYNC_HEARTBEAT: u8 = 0x22;
pub const DISPATCH_SYNC_NACK: u8 = 0x23;
pub const DISPATCH_REPLICA_SYNC_RESERVED_END: u8 = 0x30;
pub const SYNC_REQUEST_SIZE: usize = 3 + 32 + 8 + 4 + 8;
pub const SYNC_REQUEST_SIZE_V2_CLASS: usize = SYNC_REQUEST_SIZE + 1;
pub const SYNC_HEARTBEAT_SIZE: usize = 3 + 32 + 8 + 1 + 8;
const CHANNEL_ID_LABEL: &[u8] = b"redex-channel-id-v1";
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct ChannelId([u8; 32]);
impl ChannelId {
pub fn from_name(name: &ChannelName) -> Self {
Self::from_str_internal(name.as_str())
}
#[expect(
clippy::expect_used,
reason = "Blake2sMac::new_from_slice rejects only keys longer than 32 bytes; CHANNEL_ID_LABEL is a short compile-time-constant label"
)]
fn from_str_internal(s: &str) -> Self {
let mut mac = <Blake2sMac<U32> as KeyInit>::new_from_slice(CHANNEL_ID_LABEL)
.expect("BLAKE2s accepts variable-length keys");
Mac::update(&mut mac, s.as_bytes());
let bytes = mac.finalize_fixed();
let mut out = [0u8; 32];
out.copy_from_slice(&bytes);
Self(out)
}
pub const fn from_bytes(bytes: [u8; 32]) -> Self {
Self(bytes)
}
pub fn as_bytes(&self) -> &[u8; 32] {
&self.0
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReplicaRole {
Leader = 0,
Replica = 1,
Candidate = 2,
Idle = 3,
}
impl ReplicaRole {
fn from_wire(byte: u8) -> Option<Self> {
match byte {
0 => Some(Self::Leader),
1 => Some(Self::Replica),
2 => Some(Self::Candidate),
3 => Some(Self::Idle),
_ => None,
}
}
fn to_wire(self) -> u8 {
self as u8
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SyncNackError {
NotLeader = 1,
BadRange = 2,
Backpressure = 3,
ChannelClosed = 4,
}
impl SyncNackError {
fn from_wire(byte: u8) -> Option<Self> {
match byte {
1 => Some(Self::NotLeader),
2 => Some(Self::BadRange),
3 => Some(Self::Backpressure),
4 => Some(Self::ChannelClosed),
_ => None,
}
}
fn to_wire(self) -> u8 {
self as u8
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SyncEvent {
pub event_seq: u64,
pub payload: Vec<u8>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SyncRequest {
pub channel_id: ChannelId,
pub since_seq: u64,
pub chunk_max: u32,
pub request_id: u64,
pub class: super::bandwidth::BandwidthClass,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SyncResponse {
pub channel_id: ChannelId,
pub first_seq: u64,
pub leader_first_retained_seq: u64,
pub request_id: u64,
pub events: Vec<SyncEvent>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SyncHeartbeat {
pub channel_id: ChannelId,
pub tail_seq: u64,
pub role: ReplicaRole,
pub wall_clock_ms: u64,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SyncNack {
pub channel_id: ChannelId,
pub since_seq: u64,
pub error_code: SyncNackError,
pub leader_first_retained_seq: u64,
pub request_id: u64,
pub detail: String,
}
#[derive(Debug, thiserror::Error, PartialEq, Eq)]
pub enum WireError {
#[error("redex wire truncated: need {need} bytes, have {have}")]
Truncated {
need: usize,
have: usize,
},
#[error("redex wire subprotocol mismatch: got {got:#06x}, expected {SUBPROTOCOL_REDEX:#06x}")]
SubprotocolMismatch {
got: u16,
},
#[error("redex wire dispatch code {got:#04x} does not match expected {expected:#04x}")]
DispatchMismatch {
got: u8,
expected: u8,
},
#[error("redex wire role byte {0} is not a valid ReplicaRole (0..=3)")]
BadRole(u8),
#[error("redex wire error_code {0} is not a valid SyncNackError (1..=4)")]
BadErrorCode(u8),
#[error("redex wire NACK detail is not valid UTF-8")]
InvalidUtf8,
}
fn put_header(buf: &mut Vec<u8>, dispatch: u8) {
buf.put_u16_le(SUBPROTOCOL_REDEX);
buf.put_u8(dispatch);
}
fn check_header(data: &[u8], expected_dispatch: u8) -> Result<&[u8], WireError> {
if data.len() < 3 {
return Err(WireError::Truncated {
need: 3,
have: data.len(),
});
}
let mut cursor = &data[..3];
let subprotocol = cursor.get_u16_le();
let dispatch = cursor.get_u8();
if subprotocol != SUBPROTOCOL_REDEX {
return Err(WireError::SubprotocolMismatch { got: subprotocol });
}
if dispatch != expected_dispatch {
return Err(WireError::DispatchMismatch {
got: dispatch,
expected: expected_dispatch,
});
}
Ok(&data[3..])
}
fn get_channel_id(cursor: &mut &[u8]) -> ChannelId {
let mut id = [0u8; 32];
id.copy_from_slice(&cursor[..32]);
cursor.advance(32);
ChannelId::from_bytes(id)
}
impl SyncRequest {
pub fn to_bytes(&self) -> Vec<u8> {
let mut buf = Vec::with_capacity(SYNC_REQUEST_SIZE_V2_CLASS);
put_header(&mut buf, DISPATCH_SYNC_REQUEST);
buf.put_slice(self.channel_id.as_bytes());
buf.put_u64_le(self.since_seq);
buf.put_u32_le(self.chunk_max);
buf.put_u64_le(self.request_id);
buf.put_u8(self.class.as_u8());
debug_assert_eq!(buf.len(), SYNC_REQUEST_SIZE_V2_CLASS);
buf
}
pub fn from_bytes(data: &[u8]) -> Result<Self, WireError> {
let payload = check_header(data, DISPATCH_SYNC_REQUEST)?;
if payload.len() < SYNC_REQUEST_SIZE - 3 {
return Err(WireError::Truncated {
need: SYNC_REQUEST_SIZE,
have: data.len(),
});
}
let mut cursor = payload;
let channel_id = get_channel_id(&mut cursor);
let since_seq = cursor.get_u64_le();
let chunk_max = cursor.get_u32_le();
let request_id = cursor.get_u64_le();
let class = if cursor.has_remaining() {
super::bandwidth::BandwidthClass::from_wire_or_default(cursor.get_u8())
} else {
super::bandwidth::BandwidthClass::default()
};
Ok(Self {
channel_id,
since_seq,
chunk_max,
request_id,
class,
})
}
}
impl SyncResponse {
pub fn to_bytes(&self) -> Vec<u8> {
let events_size: usize = self.events.iter().map(|e| 8 + 4 + e.payload.len()).sum();
let mut buf = Vec::with_capacity(3 + 32 + 8 + 8 + 8 + 4 + events_size);
put_header(&mut buf, DISPATCH_SYNC_RESPONSE);
buf.put_slice(self.channel_id.as_bytes());
buf.put_u64_le(self.first_seq);
buf.put_u64_le(self.leader_first_retained_seq);
buf.put_u64_le(self.request_id);
debug_assert!(
self.events.len() <= u32::MAX as usize,
"events.len() {} exceeds u32::MAX",
self.events.len()
);
let event_count = u32::try_from(self.events.len()).unwrap_or(u32::MAX);
buf.put_u32_le(event_count);
for event in &self.events {
buf.put_u64_le(event.event_seq);
debug_assert!(event.payload.len() <= u32::MAX as usize);
let payload_len = u32::try_from(event.payload.len()).unwrap_or(u32::MAX);
buf.put_u32_le(payload_len);
buf.put_slice(&event.payload);
}
buf
}
pub fn from_bytes(data: &[u8]) -> Result<Self, WireError> {
let payload = check_header(data, DISPATCH_SYNC_RESPONSE)?;
let prefix_needed = 32 + 8 + 8 + 8 + 4;
if payload.len() < prefix_needed {
return Err(WireError::Truncated {
need: 3 + prefix_needed,
have: data.len(),
});
}
let mut cursor = payload;
let channel_id = get_channel_id(&mut cursor);
let first_seq = cursor.get_u64_le();
let leader_first_retained_seq = cursor.get_u64_le();
let request_id = cursor.get_u64_le();
let event_count = cursor.get_u32_le() as usize;
let mut events = Vec::with_capacity(event_count.min(4096));
for _ in 0..event_count {
if cursor.remaining() < 8 + 4 {
let consumed = data.len() - cursor.remaining();
return Err(WireError::Truncated {
need: consumed + (8 + 4),
have: data.len(),
});
}
let event_seq = cursor.get_u64_le();
let payload_len = cursor.get_u32_le() as usize;
if cursor.remaining() < payload_len {
let consumed = data.len() - cursor.remaining();
let need = consumed
.checked_add(payload_len)
.ok_or(WireError::Truncated {
need: usize::MAX,
have: data.len(),
})?;
return Err(WireError::Truncated {
need,
have: data.len(),
});
}
let event_payload = cursor[..payload_len].to_vec();
cursor.advance(payload_len);
events.push(SyncEvent {
event_seq,
payload: event_payload,
});
}
Ok(Self {
channel_id,
first_seq,
leader_first_retained_seq,
request_id,
events,
})
}
}
impl SyncHeartbeat {
pub fn to_bytes(&self) -> Vec<u8> {
let mut buf = Vec::with_capacity(SYNC_HEARTBEAT_SIZE);
put_header(&mut buf, DISPATCH_SYNC_HEARTBEAT);
buf.put_slice(self.channel_id.as_bytes());
buf.put_u64_le(self.tail_seq);
buf.put_u8(self.role.to_wire());
buf.put_u64_le(self.wall_clock_ms);
debug_assert_eq!(buf.len(), SYNC_HEARTBEAT_SIZE);
buf
}
pub fn from_bytes(data: &[u8]) -> Result<Self, WireError> {
let payload = check_header(data, DISPATCH_SYNC_HEARTBEAT)?;
if payload.len() < SYNC_HEARTBEAT_SIZE - 3 {
return Err(WireError::Truncated {
need: SYNC_HEARTBEAT_SIZE,
have: data.len(),
});
}
let mut cursor = payload;
let channel_id = get_channel_id(&mut cursor);
let tail_seq = cursor.get_u64_le();
let role_byte = cursor.get_u8();
let role = ReplicaRole::from_wire(role_byte).ok_or(WireError::BadRole(role_byte))?;
let wall_clock_ms = cursor.get_u64_le();
Ok(Self {
channel_id,
tail_seq,
role,
wall_clock_ms,
})
}
}
pub const SYNC_NACK_DETAIL_MAX: usize = u16::MAX as usize;
impl SyncNack {
pub fn to_bytes(&self) -> Vec<u8> {
let detail_str = if self.detail.len() <= SYNC_NACK_DETAIL_MAX {
self.detail.as_str()
} else {
let mut cut = SYNC_NACK_DETAIL_MAX;
while cut > 0 && !self.detail.is_char_boundary(cut) {
cut -= 1;
}
&self.detail[..cut]
};
let detail_bytes = detail_str.as_bytes();
let detail_len = detail_bytes.len();
let mut buf = Vec::with_capacity(3 + 32 + 8 + 1 + 8 + 8 + 2 + detail_len);
put_header(&mut buf, DISPATCH_SYNC_NACK);
buf.put_slice(self.channel_id.as_bytes());
buf.put_u64_le(self.since_seq);
buf.put_u8(self.error_code.to_wire());
buf.put_u64_le(self.leader_first_retained_seq);
buf.put_u64_le(self.request_id);
buf.put_u16_le(detail_len as u16);
buf.put_slice(detail_bytes);
buf
}
pub fn from_bytes(data: &[u8]) -> Result<Self, WireError> {
let payload = check_header(data, DISPATCH_SYNC_NACK)?;
let prefix_needed = 32 + 8 + 1 + 8 + 8 + 2;
if payload.len() < prefix_needed {
return Err(WireError::Truncated {
need: 3 + prefix_needed,
have: data.len(),
});
}
let mut cursor = payload;
let channel_id = get_channel_id(&mut cursor);
let since_seq = cursor.get_u64_le();
let code_byte = cursor.get_u8();
let error_code =
SyncNackError::from_wire(code_byte).ok_or(WireError::BadErrorCode(code_byte))?;
let leader_first_retained_seq = cursor.get_u64_le();
let request_id = cursor.get_u64_le();
let detail_len = cursor.get_u16_le() as usize;
if cursor.remaining() < detail_len {
let consumed = data.len() - cursor.remaining();
return Err(WireError::Truncated {
need: consumed + detail_len,
have: data.len(),
});
}
let detail_bytes = &cursor[..detail_len];
let detail = std::str::from_utf8(detail_bytes)
.map_err(|_| WireError::InvalidUtf8)?
.to_string();
Ok(Self {
channel_id,
since_seq,
error_code,
leader_first_retained_seq,
request_id,
detail,
})
}
}
#[cfg(test)]
mod tests {
use super::*;
fn sample_channel_id() -> ChannelId {
ChannelId::from_str_internal("net/redex/example")
}
#[test]
fn channel_id_is_deterministic() {
let a = ChannelId::from_str_internal("payments/settlements");
let b = ChannelId::from_str_internal("payments/settlements");
assert_eq!(a, b);
}
#[test]
fn channel_id_is_unique_per_name() {
let a = ChannelId::from_str_internal("payments/settlements");
let b = ChannelId::from_str_internal("payments/refunds");
assert_ne!(a, b);
}
#[test]
fn sync_request_round_trip() {
let original = SyncRequest {
channel_id: sample_channel_id(),
since_seq: 0xDEAD_BEEF_CAFE_BABE,
chunk_max: 1_048_576,
request_id: 0xAA55_AA55_AA55_AA55,
class: super::super::bandwidth::BandwidthClass::Background,
};
let bytes = original.to_bytes();
assert_eq!(bytes.len(), SYNC_REQUEST_SIZE_V2_CLASS);
let decoded = SyncRequest::from_bytes(&bytes).expect("decode");
assert_eq!(decoded, original);
}
#[test]
fn sync_request_decodes_legacy_55_byte_frame_as_foreground() {
let original = SyncRequest {
channel_id: sample_channel_id(),
since_seq: 7,
chunk_max: 1024,
request_id: 0xBEEF,
class: super::super::bandwidth::BandwidthClass::Realtime,
};
let mut bytes = original.to_bytes();
bytes.pop(); assert_eq!(bytes.len(), SYNC_REQUEST_SIZE);
let decoded = SyncRequest::from_bytes(&bytes).expect("legacy decode");
assert_eq!(decoded.channel_id, original.channel_id);
assert_eq!(decoded.since_seq, original.since_seq);
assert_eq!(decoded.chunk_max, original.chunk_max);
assert_eq!(decoded.request_id, original.request_id);
assert_eq!(
decoded.class,
super::super::bandwidth::BandwidthClass::Foreground
);
}
#[test]
fn sync_request_byte_layout_pinned() {
let req = SyncRequest {
channel_id: ChannelId::from_bytes([0xAB; 32]),
since_seq: 0x0102_0304_0506_0708,
chunk_max: 0x1122_3344,
request_id: 0,
class: Default::default(),
};
let bytes = req.to_bytes();
assert_eq!(&bytes[..3], &[0x00, 0x0E, 0x20]);
assert_eq!(&bytes[3..35], &[0xAB; 32]);
assert_eq!(
&bytes[35..43],
&[0x08, 0x07, 0x06, 0x05, 0x04, 0x03, 0x02, 0x01]
);
assert_eq!(&bytes[43..47], &[0x44, 0x33, 0x22, 0x11]);
}
#[test]
fn sync_request_rejects_wrong_dispatch() {
let mut bytes = SyncRequest {
channel_id: sample_channel_id(),
since_seq: 0,
chunk_max: 1,
request_id: 0,
class: Default::default(),
}
.to_bytes();
bytes[2] = DISPATCH_SYNC_RESPONSE; let err = SyncRequest::from_bytes(&bytes).expect_err("must reject");
assert!(matches!(err, WireError::DispatchMismatch { .. }));
}
#[test]
fn sync_request_rejects_wrong_subprotocol() {
let mut bytes = SyncRequest {
channel_id: sample_channel_id(),
since_seq: 0,
chunk_max: 1,
request_id: 0,
class: Default::default(),
}
.to_bytes();
bytes[0] = 0x00;
bytes[1] = 0x05; let err = SyncRequest::from_bytes(&bytes).expect_err("must reject");
assert!(matches!(
err,
WireError::SubprotocolMismatch { got: 0x0500 }
));
}
#[test]
fn sync_request_rejects_truncation() {
let bytes = SyncRequest {
channel_id: sample_channel_id(),
since_seq: 0,
chunk_max: 1,
request_id: 0,
class: Default::default(),
}
.to_bytes();
for cut in 0..SYNC_REQUEST_SIZE {
let err = SyncRequest::from_bytes(&bytes[..cut]).expect_err("must reject");
assert!(matches!(err, WireError::Truncated { .. }));
}
let _legacy =
SyncRequest::from_bytes(&bytes[..SYNC_REQUEST_SIZE]).expect("legacy length decodes");
}
#[test]
fn sync_response_round_trip_empty_chunk() {
let original = SyncResponse {
channel_id: sample_channel_id(),
first_seq: 42,
leader_first_retained_seq: 42,
events: vec![],
request_id: 7,
};
let bytes = original.to_bytes();
let decoded = SyncResponse::from_bytes(&bytes).expect("decode");
assert_eq!(decoded, original);
}
#[test]
fn sync_response_round_trip_with_events() {
let original = SyncResponse {
channel_id: sample_channel_id(),
first_seq: 100,
leader_first_retained_seq: 50,
events: vec![
SyncEvent {
event_seq: 100,
payload: b"hello".to_vec(),
},
SyncEvent {
event_seq: 101,
payload: b"world".to_vec(),
},
SyncEvent {
event_seq: 102,
payload: vec![], },
],
request_id: 123,
};
let bytes = original.to_bytes();
let decoded = SyncResponse::from_bytes(&bytes).expect("decode");
assert_eq!(decoded, original);
}
#[test]
fn sync_response_leader_first_retained_seq_byte_offset() {
let original = SyncResponse {
channel_id: sample_channel_id(),
first_seq: 0x0102_0304_0506_0708,
leader_first_retained_seq: 0x1112_1314_1516_1718,
events: vec![],
request_id: 0,
};
let bytes = original.to_bytes();
assert_eq!(
&bytes[43..51],
&0x1112_1314_1516_1718_u64.to_le_bytes(),
"leader_first_retained_seq must be at offset 43..51 in LE form"
);
}
#[test]
fn sync_response_rejects_truncated_event_record() {
let bytes = SyncResponse {
channel_id: sample_channel_id(),
first_seq: 1,
leader_first_retained_seq: 0,
events: vec![SyncEvent {
event_seq: 1,
payload: b"truncated".to_vec(),
}],
request_id: 0,
}
.to_bytes();
let err = SyncResponse::from_bytes(&bytes[..bytes.len() - 3]).expect_err("must reject");
assert!(matches!(err, WireError::Truncated { .. }));
}
#[test]
fn sync_heartbeat_round_trip_each_role() {
for role in [
ReplicaRole::Leader,
ReplicaRole::Replica,
ReplicaRole::Candidate,
ReplicaRole::Idle,
] {
let original = SyncHeartbeat {
channel_id: sample_channel_id(),
tail_seq: 0xCAFE,
role,
wall_clock_ms: 1_700_000_000_000,
};
let bytes = original.to_bytes();
assert_eq!(bytes.len(), SYNC_HEARTBEAT_SIZE);
let decoded = SyncHeartbeat::from_bytes(&bytes).expect("decode");
assert_eq!(decoded, original);
}
}
#[test]
fn sync_heartbeat_rejects_unknown_role() {
let mut bytes = SyncHeartbeat {
channel_id: sample_channel_id(),
tail_seq: 0,
role: ReplicaRole::Leader,
wall_clock_ms: 0,
}
.to_bytes();
bytes[43] = 99;
let err = SyncHeartbeat::from_bytes(&bytes).expect_err("must reject");
assert!(matches!(err, WireError::BadRole(99)));
}
#[test]
fn sync_nack_round_trip_each_error() {
for error_code in [
SyncNackError::NotLeader,
SyncNackError::BadRange,
SyncNackError::Backpressure,
SyncNackError::ChannelClosed,
] {
let original = SyncNack {
channel_id: sample_channel_id(),
since_seq: 12345,
error_code,
leader_first_retained_seq: 9999,
detail: format!("test detail for {:?}", error_code),
request_id: 0,
};
let bytes = original.to_bytes();
let decoded = SyncNack::from_bytes(&bytes).expect("decode");
assert_eq!(decoded, original);
}
}
#[test]
fn sync_nack_empty_detail_round_trips() {
let original = SyncNack {
channel_id: sample_channel_id(),
since_seq: 0,
error_code: SyncNackError::NotLeader,
leader_first_retained_seq: 0,
detail: String::new(),
request_id: 0,
};
let bytes = original.to_bytes();
let decoded = SyncNack::from_bytes(&bytes).expect("decode");
assert_eq!(decoded, original);
}
#[test]
fn sync_nack_truncates_oversized_detail() {
let huge = "x".repeat(SYNC_NACK_DETAIL_MAX + 1000);
let original = SyncNack {
channel_id: sample_channel_id(),
since_seq: 0,
error_code: SyncNackError::Backpressure,
leader_first_retained_seq: 0,
detail: huge.clone(),
request_id: 0,
};
let bytes = original.to_bytes();
let decoded = SyncNack::from_bytes(&bytes).expect("decode");
assert_eq!(decoded.detail.len(), SYNC_NACK_DETAIL_MAX);
assert!(huge.starts_with(&decoded.detail));
}
#[test]
fn sync_nack_rejects_unknown_error_code() {
let mut bytes = SyncNack {
channel_id: sample_channel_id(),
since_seq: 0,
error_code: SyncNackError::NotLeader,
leader_first_retained_seq: 0,
detail: String::new(),
request_id: 0,
}
.to_bytes();
bytes[43] = 0;
let err = SyncNack::from_bytes(&bytes).expect_err("must reject");
assert!(matches!(err, WireError::BadErrorCode(0)));
}
#[test]
fn sync_nack_truncated_payload_reports_correct_need() {
let original = SyncNack {
channel_id: sample_channel_id(),
since_seq: 0x4242_4242_4242_4242,
error_code: SyncNackError::BadRange,
leader_first_retained_seq: 100,
detail: "hello".to_string(),
request_id: 0,
};
let full = original.to_bytes();
let truncated = &full[..full.len() - 1];
match SyncNack::from_bytes(truncated).expect_err("must error") {
WireError::Truncated { need, have } => {
assert_eq!(need, full.len());
assert_eq!(have, truncated.len());
}
other => panic!("unexpected error: {other:?}"),
}
}
#[test]
fn sync_nack_truncates_oversized_detail_at_char_boundary() {
let pad_len = SYNC_NACK_DETAIL_MAX - 1;
let mut detail = "a".repeat(pad_len);
detail.push('é'); debug_assert!(detail.len() > SYNC_NACK_DETAIL_MAX);
let original = SyncNack {
channel_id: sample_channel_id(),
since_seq: 0,
error_code: SyncNackError::Backpressure,
leader_first_retained_seq: 0,
detail,
request_id: 0,
};
let bytes = original.to_bytes();
let decoded = SyncNack::from_bytes(&bytes).expect("decode after truncate");
assert_eq!(decoded.detail.len(), pad_len);
}
#[test]
fn sync_nack_rejects_invalid_utf8() {
let mut bytes = SyncNack {
channel_id: sample_channel_id(),
since_seq: 0,
error_code: SyncNackError::BadRange,
leader_first_retained_seq: 0,
detail: "ascii".to_string(),
request_id: 0,
}
.to_bytes();
let detail_start = 62;
let detail_len = bytes.len() - detail_start;
for i in 0..detail_len {
bytes[detail_start + i] = 0xC0; }
let err = SyncNack::from_bytes(&bytes).expect_err("must reject");
assert!(matches!(err, WireError::InvalidUtf8));
}
#[test]
fn dispatch_codes_pinned() {
assert_eq!(DISPATCH_SYNC_REQUEST, 0x20);
assert_eq!(DISPATCH_SYNC_RESPONSE, 0x21);
assert_eq!(DISPATCH_SYNC_HEARTBEAT, 0x22);
assert_eq!(DISPATCH_SYNC_NACK, 0x23);
assert_eq!(DISPATCH_REPLICA_SYNC_RESERVED_END, 0x30);
}
#[test]
fn subprotocol_id_pinned() {
assert_eq!(SUBPROTOCOL_REDEX, 0x0E00);
}
}