1use std::borrow::Cow;
2use std::ptr;
3
4use crate::config::ReplicationCompression;
5use bytes::Bytes as SharedBytes;
6
7use crate::storage::{MutationOp, MutationRecord, StoredEntry, hash_key, hash_key_tag_from_hash};
8use crate::{FastCacheError, Result};
9
/// Four-byte magic that opens every FCRP frame header.
pub const FCRP_MAGIC: &[u8; 4] = b"FCRP";
/// Protocol version byte written right after the magic; the frame decoders
/// in this file reject any other value.
pub const FCRP_VERSION: u8 = 1;

/// Size in bytes of the fixed frame header: magic (4), version (1), kind (1),
/// flags (1), one zero byte, payload length (u32 LE), uncompressed length (u32 LE).
const HEADER_LEN: usize = 16;
/// Crate-visible alias of `HEADER_LEN`.
pub(crate) const FRAME_HEADER_LEN: usize = HEADER_LEN;
/// Header flag bit that marks the payload as zstd-compressed.
const FLAG_COMPRESSED: u8 = 0x01;
/// On-wire sentinel written for an absent expiry; decoders map it back to `None`.
const EXPIRE_NONE: u64 = u64::MAX;
20
21#[derive(Debug, Clone, Copy, PartialEq, Eq)]
22#[repr(u8)]
23pub enum FrameKind {
24 Hello = 1,
25 SnapshotBegin = 2,
26 SnapshotChunk = 3,
27 SnapshotEnd = 4,
28 MutationBatch = 5,
29 Ack = 6,
30 Error = 7,
31}
32
33impl FrameKind {
34 fn from_u8(value: u8) -> Result<Self> {
35 match value {
36 1 => Ok(Self::Hello),
37 2 => Ok(Self::SnapshotBegin),
38 3 => Ok(Self::SnapshotChunk),
39 4 => Ok(Self::SnapshotEnd),
40 5 => Ok(Self::MutationBatch),
41 6 => Ok(Self::Ack),
42 7 => Ok(Self::Error),
43 other => Err(FastCacheError::Protocol(format!(
44 "unsupported FCRP frame kind: {other}"
45 ))),
46 }
47 }
48}
49
/// Compression applied to a frame payload by the encoders in this file.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReplicationCompressionMode {
    /// Payload is written as-is.
    None,
    /// Payload is compressed with zstd and the header's compressed flag is set.
    Zstd,
}

/// One-to-one mapping from the configuration enum to the protocol-level mode.
impl From<ReplicationCompression> for ReplicationCompressionMode {
    fn from(value: ReplicationCompression) -> Self {
        match value {
            ReplicationCompression::None => Self::None,
            ReplicationCompression::Zstd => Self::Zstd,
        }
    }
}
64
/// A decoded frame that owns its payload (see `decode_frame`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReplicationFrame {
    /// Frame type taken from the header.
    pub kind: FrameKind,
    /// Whether the header's compressed flag was set on the wire.
    pub compressed: bool,
    /// Payload bytes, already decompressed when `compressed` is true.
    pub payload: Vec<u8>,
}
71
/// A decoded frame whose payload borrows from the input when possible
/// (see `decode_frame_payload`).
#[derive(Debug)]
pub struct ReplicationFramePayload<'a> {
    /// Frame type taken from the header.
    pub kind: FrameKind,
    /// Whether the header's compressed flag was set on the wire.
    pub compressed: bool,
    /// Borrowed for uncompressed frames; owned (decompressed) otherwise.
    pub payload: Cow<'a, [u8]>,
}
78
/// A decoded frame whose payload is a `bytes::Bytes` handle
/// (see `decode_frame_payload_bytes`).
#[derive(Debug, Clone)]
pub struct ReplicationFrameBytesPayload {
    /// Frame type taken from the header.
    pub kind: FrameKind,
    /// Whether the header's compressed flag was set on the wire.
    pub compressed: bool,
    /// A slice of the input buffer for uncompressed frames; a freshly
    /// decompressed buffer otherwise.
    pub payload: SharedBytes,
}
85
/// Per-shard sequence watermarks, indexed by shard id.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ShardWatermarks {
    values: Vec<u64>,
}

impl ShardWatermarks {
    /// Creates `shard_count` watermarks, all starting at zero.
    pub fn new(shard_count: usize) -> Self {
        Self::from_vec(vec![0; shard_count])
    }

    /// Wraps an existing list of watermarks (index = shard id).
    pub fn from_vec(values: Vec<u64>) -> Self {
        Self { values }
    }

    /// Borrows the watermarks as a slice.
    pub fn as_slice(&self) -> &[u64] {
        self.values.as_slice()
    }

    /// Consumes the wrapper and returns the underlying vector.
    pub fn into_vec(self) -> Vec<u64> {
        let Self { values } = self;
        values
    }

    /// Returns the watermark for `shard_id`, or 0 when the shard is unknown.
    pub fn get(&self, shard_id: usize) -> u64 {
        match self.values.get(shard_id) {
            Some(&watermark) => watermark,
            None => 0,
        }
    }

    /// Raises the watermark for `shard_id` to `sequence` if it is higher,
    /// growing the list with zeros when the shard has not been seen before.
    pub fn observe(&mut self, shard_id: usize, sequence: u64) {
        if self.values.len() <= shard_id {
            self.values.resize(shard_id + 1, 0);
        }
        let slot = &mut self.values[shard_id];
        *slot = (*slot).max(sequence);
    }
}
121
122#[derive(Debug, Clone, Copy, PartialEq, Eq)]
123pub enum ReplicationMutationOp {
124 Set,
125 Del,
126 Expire,
127}
128
129impl From<&MutationOp> for ReplicationMutationOp {
130 fn from(value: &MutationOp) -> Self {
131 match value {
132 MutationOp::Set => Self::Set,
133 MutationOp::Del => Self::Del,
134 MutationOp::Expire => Self::Expire,
135 }
136 }
137}
138
139impl ReplicationMutationOp {
140 fn to_byte(self) -> u8 {
141 match self {
142 Self::Set => 1,
143 Self::Del => 2,
144 Self::Expire => 3,
145 }
146 }
147
148 fn from_byte(value: u8) -> Result<Self> {
149 match value {
150 1 => Ok(Self::Set),
151 2 => Ok(Self::Del),
152 3 => Ok(Self::Expire),
153 other => Err(FastCacheError::Protocol(format!(
154 "unsupported FCRP mutation op: {other}"
155 ))),
156 }
157 }
158}
159
/// An owned mutation record as encoded into / decoded from a mutation batch.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReplicationMutation {
    /// Shard the mutation belongs to (encoded as a u32 on the wire).
    pub shard_id: usize,
    pub sequence: u64,
    pub timestamp_ms: u64,
    pub op: ReplicationMutationOp,
    /// Hash of `key`; `from_record` computes it with `hash_key`.
    pub key_hash: u64,
    /// Tag derived from `key_hash` via `hash_key_tag_from_hash`.
    pub key_tag: u64,
    pub key: SharedBytes,
    pub value: SharedBytes,
    /// `None` is encoded on the wire as `EXPIRE_NONE`.
    pub expire_at_ms: Option<u64>,
}
172
/// A mutation record whose key and value borrow from another buffer.
///
/// Carries the same fields as `ReplicationMutation`; it is what the record
/// writer consumes and what `visit_mutation_batch_payload` hands to its callback.
#[derive(Debug, Clone, Copy)]
pub struct BorrowedReplicationMutation<'a> {
    pub shard_id: usize,
    pub sequence: u64,
    pub timestamp_ms: u64,
    pub op: ReplicationMutationOp,
    pub key_hash: u64,
    pub key_tag: u64,
    pub key: &'a [u8],
    pub value: &'a [u8],
    /// `None` is encoded on the wire as `EXPIRE_NONE`.
    pub expire_at_ms: Option<u64>,
}
185
/// A mutation record produced by `visit_mutation_batch_payload_bytes`.
///
/// The key borrows from the payload, while the value is a `Bytes` slice of the
/// same payload buffer. The timestamp and key tag present on the wire are not
/// exposed here.
#[derive(Debug, Clone)]
pub struct FrameBackedReplicationMutation<'a> {
    pub shard_id: usize,
    pub sequence: u64,
    pub op: ReplicationMutationOp,
    pub key_hash: u64,
    pub key: &'a [u8],
    pub value: SharedBytes,
    /// `None` is encoded on the wire as `EXPIRE_NONE`.
    pub expire_at_ms: Option<u64>,
}
196
197impl ReplicationMutation {
198 pub fn from_record(record: &MutationRecord) -> Self {
199 let key_hash = hash_key(record.key.as_ref());
200 Self::from_record_with_key_hash(record, key_hash)
201 }
202
203 pub fn from_record_with_key_hash(record: &MutationRecord, key_hash: u64) -> Self {
204 Self {
205 shard_id: record.shard_id,
206 sequence: record.sequence,
207 timestamp_ms: record.timestamp_ms,
208 op: ReplicationMutationOp::from(&record.op),
209 key_hash,
210 key_tag: hash_key_tag_from_hash(key_hash),
211 key: record.key.clone(),
212 value: record.value.clone(),
213 expire_at_ms: record.expire_at_ms,
214 }
215 }
216
217 pub fn estimated_uncompressed_len(&self) -> usize {
218 mutation_record_payload_len(self.key.len(), self.value.len())
219 }
220
221 pub(crate) fn as_borrowed(&self) -> BorrowedReplicationMutation<'_> {
222 BorrowedReplicationMutation {
223 shard_id: self.shard_id,
224 sequence: self.sequence,
225 timestamp_ms: self.timestamp_ms,
226 op: self.op,
227 key_hash: self.key_hash,
228 key_tag: self.key_tag,
229 key: self.key.as_ref(),
230 value: self.value.as_ref(),
231 expire_at_ms: self.expire_at_ms,
232 }
233 }
234}
235
/// A set of stored entries together with the shard watermarks that accompany them.
#[derive(Debug, Clone)]
pub struct ReplicationSnapshot {
    pub entries: Vec<StoredEntry>,
    pub watermarks: ShardWatermarks,
}
241
/// One chunk of a snapshot, as encoded by `encode_snapshot_chunk`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReplicationSnapshotChunk {
    pub watermarks: ShardWatermarks,
    /// Index of this chunk, written as a u64.
    pub chunk_index: u64,
    /// Written as a single byte; any non-zero byte decodes to `true`.
    pub is_last: bool,
    pub entries: Vec<StoredEntry>,
}
249
250pub fn encode_frame(
251 kind: FrameKind,
252 compression: ReplicationCompressionMode,
253 zstd_level: i32,
254 payload: &[u8],
255) -> Result<Vec<u8>> {
256 match compression {
257 ReplicationCompressionMode::None => {
258 let mut out = Vec::with_capacity(HEADER_LEN + payload.len());
259 write_header(
260 &mut out,
261 kind,
262 0,
263 payload.len() as u32,
264 payload.len() as u32,
265 );
266 out.extend_from_slice(payload);
267 Ok(out)
268 }
269 ReplicationCompressionMode::Zstd => {
270 let compressed = zstd::bulk::compress(payload, zstd_level).map_err(|error| {
271 FastCacheError::Protocol(format!("FCRP zstd compression failed: {error}"))
272 })?;
273 let mut out = Vec::with_capacity(HEADER_LEN + compressed.len());
274 write_header(
275 &mut out,
276 kind,
277 FLAG_COMPRESSED,
278 compressed.len() as u32,
279 payload.len() as u32,
280 );
281 out.extend_from_slice(&compressed);
282 Ok(out)
283 }
284 }
285}
286
287fn write_header(
288 out: &mut Vec<u8>,
289 kind: FrameKind,
290 flags: u8,
291 payload_len: u32,
292 uncompressed_len: u32,
293) {
294 out.extend_from_slice(FCRP_MAGIC);
295 out.push(FCRP_VERSION);
296 out.push(kind as u8);
297 out.push(flags);
298 out.push(0);
299 out.extend_from_slice(&payload_len.to_le_bytes());
300 out.extend_from_slice(&uncompressed_len.to_le_bytes());
301}
302
303pub(crate) fn write_uncompressed_frame_header_at(
304 frame: &mut [u8],
305 kind: FrameKind,
306 payload_len: usize,
307) {
308 debug_assert!(frame.len() >= HEADER_LEN);
309 frame[..4].copy_from_slice(FCRP_MAGIC);
310 frame[4] = FCRP_VERSION;
311 frame[5] = kind as u8;
312 frame[6] = 0;
313 frame[7] = 0;
314 frame[8..12].copy_from_slice(&(payload_len as u32).to_le_bytes());
315 frame[12..16].copy_from_slice(&(payload_len as u32).to_le_bytes());
316}
317
318pub fn decode_frame(bytes: &[u8]) -> Result<ReplicationFrame> {
319 let frame = decode_frame_payload(bytes)?;
320 Ok(ReplicationFrame {
321 kind: frame.kind,
322 compressed: frame.compressed,
323 payload: frame.payload.into_owned(),
324 })
325}
326
327pub fn decode_frame_payload(bytes: &[u8]) -> Result<ReplicationFramePayload<'_>> {
328 if bytes.len() < HEADER_LEN {
329 return Err(FastCacheError::Protocol("FCRP frame is truncated".into()));
330 }
331 if &bytes[..4] != FCRP_MAGIC {
332 return Err(FastCacheError::Protocol("FCRP magic mismatch".into()));
333 }
334 if bytes[4] != FCRP_VERSION {
335 return Err(FastCacheError::Protocol(format!(
336 "unsupported FCRP version: {}",
337 bytes[4]
338 )));
339 }
340 let kind = FrameKind::from_u8(bytes[5])?;
341 let flags = bytes[6];
342 let payload_len = u32::from_le_bytes(bytes[8..12].try_into().unwrap()) as usize;
343 let uncompressed_len = u32::from_le_bytes(bytes[12..16].try_into().unwrap()) as usize;
344 if HEADER_LEN + payload_len != bytes.len() {
345 return Err(FastCacheError::Protocol(
346 "FCRP frame length mismatch".into(),
347 ));
348 }
349 let raw = &bytes[HEADER_LEN..];
350 let compressed = flags & FLAG_COMPRESSED != 0;
351 let payload = if compressed {
352 Cow::Owned(
353 zstd::bulk::decompress(raw, uncompressed_len).map_err(|error| {
354 FastCacheError::Protocol(format!("FCRP zstd decompression failed: {error}"))
355 })?,
356 )
357 } else {
358 Cow::Borrowed(raw)
359 };
360 Ok(ReplicationFramePayload {
361 kind,
362 compressed,
363 payload,
364 })
365}
366
367pub fn decode_frame_payload_bytes(bytes: SharedBytes) -> Result<ReplicationFrameBytesPayload> {
368 if bytes.len() < HEADER_LEN {
369 return Err(FastCacheError::Protocol("FCRP frame is truncated".into()));
370 }
371 if &bytes[..4] != FCRP_MAGIC {
372 return Err(FastCacheError::Protocol("FCRP magic mismatch".into()));
373 }
374 if bytes[4] != FCRP_VERSION {
375 return Err(FastCacheError::Protocol(format!(
376 "unsupported FCRP version: {}",
377 bytes[4]
378 )));
379 }
380
381 let kind = FrameKind::from_u8(bytes[5])?;
382 let flags = bytes[6];
383 let payload_len = u32::from_le_bytes(bytes[8..12].try_into().unwrap()) as usize;
384 let uncompressed_len = u32::from_le_bytes(bytes[12..16].try_into().unwrap()) as usize;
385 if HEADER_LEN + payload_len != bytes.len() {
386 return Err(FastCacheError::Protocol(
387 "FCRP frame length mismatch".into(),
388 ));
389 }
390
391 let compressed = flags & FLAG_COMPRESSED != 0;
392 let payload = match compressed {
393 true => {
394 let raw = &bytes[HEADER_LEN..];
395 SharedBytes::from(
396 zstd::bulk::decompress(raw, uncompressed_len).map_err(|error| {
397 FastCacheError::Protocol(format!("FCRP zstd decompression failed: {error}"))
398 })?,
399 )
400 }
401 false => bytes.slice(HEADER_LEN..),
402 };
403 Ok(ReplicationFrameBytesPayload {
404 kind,
405 compressed,
406 payload,
407 })
408}
409
410pub fn encode_mutation_batch(mutations: &[ReplicationMutation]) -> Vec<u8> {
411 let payload_len = mutation_batch_payload_len(mutations);
412 let mut out = Vec::with_capacity(payload_len);
413 write_mutation_batch_payload(&mut out, mutations);
414 out
415}
416
417pub(crate) fn encode_mutation_batch_frame_with_payload_len(
418 mutations: &[ReplicationMutation],
419 payload_len: usize,
420 compression: ReplicationCompressionMode,
421 zstd_level: i32,
422) -> Result<(Vec<u8>, usize)> {
423 match compression {
424 ReplicationCompressionMode::None => {
425 let mut out = Vec::with_capacity(HEADER_LEN + payload_len);
426 write_header(
427 &mut out,
428 FrameKind::MutationBatch,
429 0,
430 payload_len as u32,
431 payload_len as u32,
432 );
433 write_mutation_batch_payload(&mut out, mutations);
434 Ok((out, payload_len))
435 }
436 ReplicationCompressionMode::Zstd => {
437 let mut payload = Vec::with_capacity(payload_len);
438 write_mutation_batch_payload(&mut payload, mutations);
439 encode_frame(FrameKind::MutationBatch, compression, zstd_level, &payload)
440 .map(|frame| (frame, payload_len))
441 }
442 }
443}
444
445fn mutation_batch_payload_len(mutations: &[ReplicationMutation]) -> usize {
446 4 + mutations
447 .iter()
448 .map(ReplicationMutation::estimated_uncompressed_len)
449 .sum::<usize>()
450}
451
/// Encoded size of one mutation record with the given key and value lengths.
///
/// The fixed part is shard id (4), sequence (8), timestamp (8), op (1),
/// key hash (8), key tag (8), expiry (8), key length (4), value length (4).
pub(crate) fn mutation_record_payload_len(key_len: usize, value_len: usize) -> usize {
    const FIXED_FIELDS_LEN: usize = 4 + 8 + 8 + 1 + 8 + 8 + 8 + 4 + 4;
    FIXED_FIELDS_LEN + key_len + value_len
}
455
456pub(crate) fn borrowed_mutation_record_payload_len(
457 mutation: BorrowedReplicationMutation<'_>,
458) -> usize {
459 mutation_record_payload_len(mutation.key.len(), mutation.value.len())
460}
461
462pub(crate) fn mutation_batch_record_count(bytes: &[u8]) -> Result<usize> {
463 let Some(count) = bytes.get(..4) else {
464 return Err(FastCacheError::Protocol("FCRP payload is truncated".into()));
465 };
466 Ok(u32::from_le_bytes(count.try_into().unwrap()) as usize)
467}
468
/// Appends one encoded mutation record to `out`.
///
/// Field order on the wire: shard id (u32), sequence, timestamp, op byte,
/// key hash, key tag, expiry (`EXPIRE_NONE` when absent), key length (u32),
/// value length (u32), key bytes, value bytes — all integers little-endian.
///
/// `shard_id`, the key length and the value length are cast to `u32` with
/// `as`, so values above `u32::MAX` are truncated.
pub(crate) fn write_borrowed_mutation_payload_record(
    out: &mut Vec<u8>,
    mutation: BorrowedReplicationMutation<'_>,
) {
    let start = out.len();
    let record_len = borrowed_mutation_record_payload_len(mutation);
    // Guarantees capacity for `record_len` more bytes past the current length.
    out.reserve(record_len);
    // SAFETY: `reserve` above ensures `start + record_len <= out.capacity()`,
    // so every write below lands inside the vector's allocation. The writes
    // total 4+8+8+1+8+8+8+4+4 + key.len() + value.len() bytes, which is exactly
    // `record_len` as computed by `mutation_record_payload_len` (checked by the
    // debug assertion). `set_len` is called only after all `record_len` bytes
    // have been initialized. `mutation.key` / `mutation.value` are shared
    // borrows, so they cannot alias the spare capacity of the exclusively
    // borrowed `out`.
    unsafe {
        let mut cursor = out.as_mut_ptr().add(start);
        write_u32_le(&mut cursor, mutation.shard_id as u32);
        write_u64_le(&mut cursor, mutation.sequence);
        write_u64_le(&mut cursor, mutation.timestamp_ms);
        write_u8(&mut cursor, mutation.op.to_byte());
        write_u64_le(&mut cursor, mutation.key_hash);
        write_u64_le(&mut cursor, mutation.key_tag);
        write_u64_le(&mut cursor, mutation.expire_at_ms.unwrap_or(EXPIRE_NONE));
        write_u32_le(&mut cursor, mutation.key.len() as u32);
        write_u32_le(&mut cursor, mutation.value.len() as u32);
        write_bytes(&mut cursor, mutation.key);
        write_bytes(&mut cursor, mutation.value);
        // The cursor must have advanced by exactly the precomputed record size.
        debug_assert_eq!(
            cursor.offset_from(out.as_ptr().add(start)),
            record_len as isize
        );
        out.set_len(start + record_len);
    }
}
499
/// Writes one byte at `*cursor` and advances the cursor by 1.
///
/// # Safety
/// `*cursor` must be valid for a 1-byte write, and the advanced pointer must
/// stay within (or one past the end of) the same allocation.
#[inline(always)]
unsafe fn write_u8(cursor: &mut *mut u8, value: u8) {
    unsafe {
        ptr::write(*cursor, value);
        *cursor = (*cursor).add(1);
    }
}
508
/// Writes `value` as 4 little-endian bytes at `*cursor` and advances the
/// cursor by 4. The destination does not need to be aligned.
///
/// # Safety
/// `*cursor` must be valid for a 4-byte write, and the advanced pointer must
/// stay within (or one past the end of) the same allocation.
#[inline(always)]
unsafe fn write_u32_le(cursor: &mut *mut u8, value: u32) {
    unsafe {
        // `to_le` puts the value in little-endian byte order before the raw store.
        ptr::write_unaligned((*cursor).cast::<u32>(), value.to_le());
        *cursor = (*cursor).add(4);
    }
}
518
/// Writes `value` as 8 little-endian bytes at `*cursor` and advances the
/// cursor by 8. The destination does not need to be aligned.
///
/// # Safety
/// `*cursor` must be valid for an 8-byte write, and the advanced pointer must
/// stay within (or one past the end of) the same allocation.
#[inline(always)]
unsafe fn write_u64_le(cursor: &mut *mut u8, value: u64) {
    unsafe {
        // `to_le` puts the value in little-endian byte order before the raw store.
        ptr::write_unaligned((*cursor).cast::<u64>(), value.to_le());
        *cursor = (*cursor).add(8);
    }
}
528
/// Copies `bytes` to `*cursor` and advances the cursor by `bytes.len()`.
///
/// # Safety
/// `*cursor` must be valid for a write of `bytes.len()` bytes, the destination
/// range must not overlap `bytes`, and the advanced pointer must stay within
/// (or one past the end of) the same allocation.
#[inline(always)]
unsafe fn write_bytes(cursor: &mut *mut u8, bytes: &[u8]) {
    unsafe {
        ptr::copy_nonoverlapping(bytes.as_ptr(), *cursor, bytes.len());
        *cursor = (*cursor).add(bytes.len());
    }
}
537
538fn write_mutation_batch_payload(out: &mut Vec<u8>, mutations: &[ReplicationMutation]) {
539 out.extend_from_slice(&(mutations.len() as u32).to_le_bytes());
540 for mutation in mutations {
541 write_borrowed_mutation_payload_record(out, mutation.as_borrowed());
542 }
543}
544
545pub fn decode_mutation_batch(bytes: &[u8]) -> Result<Vec<ReplicationMutation>> {
546 let mut cursor = Cursor::new(bytes);
547 let count = cursor.u32()? as usize;
548 let mut mutations = Vec::with_capacity(count);
549 for _ in 0..count {
550 let shard_id = cursor.u32()? as usize;
551 let sequence = cursor.u64()?;
552 let timestamp_ms = cursor.u64()?;
553 let op = ReplicationMutationOp::from_byte(cursor.u8()?)?;
554 let key_hash = cursor.u64()?;
555 let key_tag = cursor.u64()?;
556 let expire_raw = cursor.u64()?;
557 let key_len = cursor.u32()? as usize;
558 let value_len = cursor.u32()? as usize;
559 let key = SharedBytes::from(cursor.bytes(key_len)?.to_vec());
560 let value = SharedBytes::from(cursor.bytes(value_len)?.to_vec());
561 mutations.push(ReplicationMutation {
562 shard_id,
563 sequence,
564 timestamp_ms,
565 op,
566 key_hash,
567 key_tag,
568 key,
569 value,
570 expire_at_ms: (expire_raw != EXPIRE_NONE).then_some(expire_raw),
571 });
572 }
573 cursor.finish()?;
574 Ok(mutations)
575}
576
577pub fn visit_mutation_batch_payload<F>(bytes: &[u8], mut visit: F) -> Result<()>
578where
579 F: FnMut(BorrowedReplicationMutation<'_>) -> Result<()>,
580{
581 let mut cursor = Cursor::new(bytes);
582 let count = cursor.u32()? as usize;
583 for _ in 0..count {
584 let shard_id = cursor.u32()? as usize;
585 let sequence = cursor.u64()?;
586 let timestamp_ms = cursor.u64()?;
587 let op = ReplicationMutationOp::from_byte(cursor.u8()?)?;
588 let key_hash = cursor.u64()?;
589 let key_tag = cursor.u64()?;
590 let expire_raw = cursor.u64()?;
591 let key_len = cursor.u32()? as usize;
592 let value_len = cursor.u32()? as usize;
593 let key = cursor.bytes(key_len)?;
594 let value = cursor.bytes(value_len)?;
595 visit(BorrowedReplicationMutation {
596 shard_id,
597 sequence,
598 timestamp_ms,
599 op,
600 key_hash,
601 key_tag,
602 key,
603 value,
604 expire_at_ms: (expire_raw != EXPIRE_NONE).then_some(expire_raw),
605 })?;
606 }
607 cursor.finish()
608}
609
610pub fn visit_mutation_batch_payload_bytes<F>(bytes: SharedBytes, mut visit: F) -> Result<()>
611where
612 F: FnMut(FrameBackedReplicationMutation<'_>) -> Result<()>,
613{
614 let mut cursor = Cursor::new(bytes.as_ref());
615 let count = cursor.u32()? as usize;
616 for _ in 0..count {
617 let shard_id = cursor.u32()? as usize;
618 let sequence = cursor.u64()?;
619 let _timestamp_ms = cursor.u64()?;
620 let op = ReplicationMutationOp::from_byte(cursor.u8()?)?;
621 let key_hash = cursor.u64()?;
622 let _key_tag = cursor.u64()?;
623 let expire_raw = cursor.u64()?;
624 let key_len = cursor.u32()? as usize;
625 let value_len = cursor.u32()? as usize;
626 let key = cursor.bytes(key_len)?;
627 let value_start = cursor.pos;
628 let _ = cursor.bytes(value_len)?;
629 let value = bytes.slice(value_start..value_start + value_len);
630 visit(FrameBackedReplicationMutation {
631 shard_id,
632 sequence,
633 op,
634 key_hash,
635 key,
636 value,
637 expire_at_ms: (expire_raw != EXPIRE_NONE).then_some(expire_raw),
638 })?;
639 }
640 cursor.finish()
641}
642
643#[derive(Debug, Clone, Copy, PartialEq, Eq)]
644pub enum HelloRole {
645 Replica = 1,
646 ServiceSubscriber = 2,
647}
648
649impl HelloRole {
650 fn to_byte(self) -> u8 {
651 self as u8
652 }
653
654 fn from_byte(value: u8) -> Result<Self> {
655 match value {
656 1 => Ok(Self::Replica),
657 2 => Ok(Self::ServiceSubscriber),
658 other => Err(FastCacheError::Protocol(format!(
659 "unsupported FCRP hello role: {other}"
660 ))),
661 }
662 }
663}
664
/// Contents of a hello payload (see `encode_hello` / `decode_hello`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReplicationHello {
    /// Version byte carried inside the hello payload.
    pub version: u8,
    pub role: HelloRole,
    /// Encoded as a length-prefixed string; `None` is written as an empty
    /// string, and an empty string decodes back to `None`.
    pub auth_token: Option<String>,
    /// Optional per-shard watermarks, preceded on the wire by a presence byte.
    pub since: Option<ShardWatermarks>,
}
672
673pub fn encode_hello(hello: &ReplicationHello) -> Vec<u8> {
674 let mut out = Vec::new();
675 out.push(hello.version);
676 out.push(hello.role.to_byte());
677 let token = hello.auth_token.as_deref().unwrap_or("");
678 out.extend_from_slice(&(token.len() as u32).to_le_bytes());
679 out.extend_from_slice(token.as_bytes());
680 match &hello.since {
681 Some(watermarks) => {
682 out.push(1);
683 out.extend_from_slice(&(watermarks.as_slice().len() as u32).to_le_bytes());
684 for value in watermarks.as_slice() {
685 out.extend_from_slice(&value.to_le_bytes());
686 }
687 }
688 None => out.push(0),
689 }
690 out
691}
692
693pub fn decode_hello(bytes: &[u8]) -> Result<ReplicationHello> {
694 let mut cursor = Cursor::new(bytes);
695 let version = cursor.u8()?;
696 let role = HelloRole::from_byte(cursor.u8()?)?;
697 let token_len = cursor.u32()? as usize;
698 let token_bytes = cursor.bytes(token_len)?;
699 let auth_token = if token_len == 0 {
700 None
701 } else {
702 Some(
703 std::str::from_utf8(token_bytes)
704 .map_err(|error| {
705 FastCacheError::Protocol(format!("FCRP hello auth token is not UTF-8: {error}"))
706 })?
707 .to_string(),
708 )
709 };
710 let has_since = cursor.u8()? != 0;
711 let since = if has_since {
712 let count = cursor.u32()? as usize;
713 let mut values = Vec::with_capacity(count);
714 for _ in 0..count {
715 values.push(cursor.u64()?);
716 }
717 Some(ShardWatermarks::from_vec(values))
718 } else {
719 None
720 };
721 cursor.finish()?;
722 Ok(ReplicationHello {
723 version,
724 role,
725 auth_token,
726 since,
727 })
728}
729
/// Serializes an error payload: message length (u32 LE) followed by the
/// message's UTF-8 bytes.
pub fn encode_error(message: &str) -> Vec<u8> {
    let body = message.as_bytes();
    let prefix = (body.len() as u32).to_le_bytes();
    [prefix.as_slice(), body].concat()
}
736
737pub fn decode_error(bytes: &[u8]) -> Result<String> {
738 let mut cursor = Cursor::new(bytes);
739 let len = cursor.u32()? as usize;
740 let body = cursor.bytes(len)?;
741 cursor.finish()?;
742 std::str::from_utf8(body)
743 .map(|s| s.to_string())
744 .map_err(|error| FastCacheError::Protocol(format!("FCRP error payload not UTF-8: {error}")))
745}
746
747pub fn encode_ack(watermarks: &ShardWatermarks) -> Vec<u8> {
748 let mut out = Vec::with_capacity(4 + watermarks.as_slice().len() * 8);
749 out.extend_from_slice(&(watermarks.as_slice().len() as u32).to_le_bytes());
750 for value in watermarks.as_slice() {
751 out.extend_from_slice(&value.to_le_bytes());
752 }
753 out
754}
755
756pub fn decode_ack(bytes: &[u8]) -> Result<ShardWatermarks> {
757 let mut cursor = Cursor::new(bytes);
758 let count = cursor.u32()? as usize;
759 let mut values = Vec::with_capacity(count);
760 for _ in 0..count {
761 values.push(cursor.u64()?);
762 }
763 cursor.finish()?;
764 Ok(ShardWatermarks::from_vec(values))
765}
766
767pub fn encode_snapshot_chunk(chunk: &ReplicationSnapshotChunk) -> Vec<u8> {
768 let mut out = Vec::new();
769 out.extend_from_slice(&chunk.chunk_index.to_le_bytes());
770 out.push(u8::from(chunk.is_last));
771 out.extend_from_slice(&(chunk.watermarks.as_slice().len() as u32).to_le_bytes());
772 for watermark in chunk.watermarks.as_slice() {
773 out.extend_from_slice(&watermark.to_le_bytes());
774 }
775 out.extend_from_slice(&(chunk.entries.len() as u32).to_le_bytes());
776 for entry in &chunk.entries {
777 out.extend_from_slice(&(entry.key.len() as u32).to_le_bytes());
778 out.extend_from_slice(&(entry.value.len() as u32).to_le_bytes());
779 out.extend_from_slice(&entry.expire_at_ms.unwrap_or(EXPIRE_NONE).to_le_bytes());
780 out.extend_from_slice(entry.key.as_ref());
781 out.extend_from_slice(entry.value.as_ref());
782 }
783 out
784}
785
786pub fn decode_snapshot_chunk(bytes: &[u8]) -> Result<ReplicationSnapshotChunk> {
787 let mut cursor = Cursor::new(bytes);
788 let chunk_index = cursor.u64()?;
789 let is_last = cursor.u8()? != 0;
790 let watermark_count = cursor.u32()? as usize;
791 let mut watermarks = Vec::with_capacity(watermark_count);
792 for _ in 0..watermark_count {
793 watermarks.push(cursor.u64()?);
794 }
795 let entry_count = cursor.u32()? as usize;
796 let mut entries = Vec::with_capacity(entry_count);
797 for _ in 0..entry_count {
798 let key_len = cursor.u32()? as usize;
799 let value_len = cursor.u32()? as usize;
800 let expire_raw = cursor.u64()?;
801 let key = cursor.bytes(key_len)?.to_vec();
802 let value = cursor.bytes(value_len)?.to_vec();
803 entries.push(StoredEntry {
804 key,
805 value,
806 expire_at_ms: (expire_raw != EXPIRE_NONE).then_some(expire_raw),
807 });
808 }
809 cursor.finish()?;
810 Ok(ReplicationSnapshotChunk {
811 watermarks: ShardWatermarks::from_vec(watermarks),
812 chunk_index,
813 is_last,
814 entries,
815 })
816}
817
818struct Cursor<'a> {
819 bytes: &'a [u8],
820 pos: usize,
821}
822
823impl<'a> Cursor<'a> {
824 fn new(bytes: &'a [u8]) -> Self {
825 Self { bytes, pos: 0 }
826 }
827
828 fn u8(&mut self) -> Result<u8> {
829 let Some(value) = self.bytes.get(self.pos).copied() else {
830 return Err(FastCacheError::Protocol("FCRP payload is truncated".into()));
831 };
832 self.pos += 1;
833 Ok(value)
834 }
835
836 fn u32(&mut self) -> Result<u32> {
837 let bytes = self.bytes(4)?;
838 Ok(u32::from_le_bytes(bytes.try_into().unwrap()))
839 }
840
841 fn u64(&mut self) -> Result<u64> {
842 let bytes = self.bytes(8)?;
843 Ok(u64::from_le_bytes(bytes.try_into().unwrap()))
844 }
845
846 fn bytes(&mut self, len: usize) -> Result<&'a [u8]> {
847 if self.pos + len > self.bytes.len() {
848 return Err(FastCacheError::Protocol("FCRP payload is truncated".into()));
849 }
850 let bytes = &self.bytes[self.pos..self.pos + len];
851 self.pos += len;
852 Ok(bytes)
853 }
854
855 fn finish(&self) -> Result<()> {
856 if self.pos != self.bytes.len() {
857 return Err(FastCacheError::Protocol(
858 "FCRP payload contains trailing bytes".into(),
859 ));
860 }
861 Ok(())
862 }
863}
864
// Round-trip tests for the FCRP encoders and decoders.
#[cfg(test)]
mod tests {
    use crate::storage::hash_key_tag;

    use super::*;

    // Builds a `Set` mutation for key "alpha" / value "value" on shard 2 with
    // the given sequence number and an expiry of 99.
    fn sample_mutation(sequence: u64) -> ReplicationMutation {
        let key = b"alpha".to_vec();
        ReplicationMutation {
            shard_id: 2,
            sequence,
            timestamp_ms: 42,
            op: ReplicationMutationOp::Set,
            key_hash: hash_key(&key),
            key_tag: hash_key_tag(&key),
            key: SharedBytes::from(key),
            value: SharedBytes::from_static(b"value"),
            expire_at_ms: Some(99),
        }
    }

    // Encoding a batch and decoding it again yields the original mutations.
    #[test]
    fn mutation_batch_round_trips() {
        let mutations = vec![sample_mutation(1), sample_mutation(2)];
        let encoded = encode_mutation_batch(&mutations);
        let decoded = decode_mutation_batch(&encoded).expect("decode");
        assert_eq!(decoded, mutations);
    }

    // The direct-to-frame encoder produces the same payload bytes as
    // `encode_mutation_batch`, wrapped in an uncompressed frame.
    #[test]
    fn mutation_batch_frame_round_trips_without_payload_copy() {
        let mutations = vec![sample_mutation(1), sample_mutation(2)];
        let payload = encode_mutation_batch(&mutations);
        let (encoded, uncompressed_len) = encode_mutation_batch_frame_with_payload_len(
            &mutations,
            payload.len(),
            ReplicationCompressionMode::None,
            0,
        )
        .expect("encode");
        let decoded = decode_frame(&encoded).expect("decode frame");
        assert_eq!(decoded.kind, FrameKind::MutationBatch);
        assert!(!decoded.compressed);
        assert_eq!(uncompressed_len, payload.len());
        assert_eq!(decoded.payload, payload);
        assert_eq!(
            decode_mutation_batch(&decoded.payload).expect("decode batch"),
            mutations
        );
    }

    // Values handed to the visitor must point inside the decoded payload
    // buffer, i.e. they are slices of the frame rather than copies.
    #[test]
    fn mutation_batch_payload_bytes_visits_frame_backed_values() {
        let mutations = vec![sample_mutation(1), sample_mutation(2)];
        let payload = encode_mutation_batch(&mutations);
        let (encoded, _) = encode_mutation_batch_frame_with_payload_len(
            &mutations,
            payload.len(),
            ReplicationCompressionMode::None,
            0,
        )
        .expect("encode");

        let decoded = decode_frame_payload_bytes(SharedBytes::from(encoded)).expect("decode");
        // Address range of the payload buffer, used for the containment checks below.
        let payload_start = decoded.payload.as_ptr() as usize;
        let payload_end = payload_start + decoded.payload.len();
        let mut visited = Vec::new();
        visit_mutation_batch_payload_bytes(decoded.payload, |mutation| {
            let value_start = mutation.value.as_ptr() as usize;
            let value_end = value_start + mutation.value.len();
            assert!(value_start >= payload_start);
            assert!(value_end <= payload_end);
            visited.push(mutation.value);
            Ok(())
        })
        .expect("visit");

        assert_eq!(
            visited,
            vec![mutations[0].value.clone(), mutations[1].value.clone()]
        );
    }

    // A zstd-compressed frame decodes back to the original payload and
    // reports the compressed flag.
    #[test]
    fn zstd_frame_round_trips() {
        let payload = encode_mutation_batch(&[sample_mutation(1)]);
        let encoded = encode_frame(
            FrameKind::MutationBatch,
            ReplicationCompressionMode::Zstd,
            3,
            &payload,
        )
        .expect("encode");
        let decoded = decode_frame(&encoded).expect("decode");
        assert_eq!(decoded.kind, FrameKind::MutationBatch);
        assert!(decoded.compressed);
        assert_eq!(decoded.payload, payload);
    }

    // A snapshot chunk with one entry and no expiry survives encode/decode.
    #[test]
    fn snapshot_chunk_round_trips() {
        let chunk = ReplicationSnapshotChunk {
            watermarks: ShardWatermarks::from_vec(vec![1, 2]),
            chunk_index: 3,
            is_last: true,
            entries: vec![StoredEntry {
                key: b"k".to_vec(),
                value: b"v".to_vec(),
                expire_at_ms: None,
            }],
        };
        let encoded = encode_snapshot_chunk(&chunk);
        let decoded = decode_snapshot_chunk(&encoded).expect("decode");
        assert_eq!(decoded, chunk);
    }
}