Skip to main content

fast_cache/replication/
protocol.rs

1use std::borrow::Cow;
2use std::ptr;
3
4use crate::config::ReplicationCompression;
5use bytes::Bytes as SharedBytes;
6
7use crate::storage::{MutationOp, MutationRecord, StoredEntry, hash_key, hash_key_tag_from_hash};
8use crate::{FastCacheError, Result};
9
/// Magic bytes that open every FCRP frame header.
pub const FCRP_MAGIC: &[u8; 4] = b"FCRP";
/// Protocol version written into, and required from, every frame header.
pub const FCRP_VERSION: u8 = 1;

// Header layout: magic (4) + version (1) + kind (1) + flags (1) + one zero
// byte (1) + payload length (4) + uncompressed length (4).
const HEADER_LEN: usize = 4 + 1 + 1 + 1 + 1 + 4 + 4;
/// Crate-visible alias for the fixed frame header size.
pub(crate) const FRAME_HEADER_LEN: usize = HEADER_LEN;
/// Header flag bit set when the payload is zstd-compressed.
const FLAG_COMPRESSED: u8 = 1 << 0;
// The all-ones value is the wire sentinel for "no expiry". An actual
// `expire_at_ms` equal to u64::MAX would be roughly 584 million years away,
// so reserving it cannot collide with a real deadline.
const EXPIRE_NONE: u64 = u64::MAX;
20
21#[derive(Debug, Clone, Copy, PartialEq, Eq)]
22#[repr(u8)]
23pub enum FrameKind {
24    Hello = 1,
25    SnapshotBegin = 2,
26    SnapshotChunk = 3,
27    SnapshotEnd = 4,
28    MutationBatch = 5,
29    Ack = 6,
30    Error = 7,
31}
32
33impl FrameKind {
34    fn from_u8(value: u8) -> Result<Self> {
35        match value {
36            1 => Ok(Self::Hello),
37            2 => Ok(Self::SnapshotBegin),
38            3 => Ok(Self::SnapshotChunk),
39            4 => Ok(Self::SnapshotEnd),
40            5 => Ok(Self::MutationBatch),
41            6 => Ok(Self::Ack),
42            7 => Ok(Self::Error),
43            other => Err(FastCacheError::Protocol(format!(
44                "unsupported FCRP frame kind: {other}"
45            ))),
46        }
47    }
48}
49
50#[derive(Debug, Clone, Copy, PartialEq, Eq)]
51pub enum ReplicationCompressionMode {
52    None,
53    Zstd,
54}
55
56impl From<ReplicationCompression> for ReplicationCompressionMode {
57    fn from(value: ReplicationCompression) -> Self {
58        match value {
59            ReplicationCompression::None => Self::None,
60            ReplicationCompression::Zstd => Self::Zstd,
61        }
62    }
63}
64
65#[derive(Debug, Clone, PartialEq, Eq)]
66pub struct ReplicationFrame {
67    pub kind: FrameKind,
68    pub compressed: bool,
69    pub payload: Vec<u8>,
70}
71
72#[derive(Debug)]
73pub struct ReplicationFramePayload<'a> {
74    pub kind: FrameKind,
75    pub compressed: bool,
76    pub payload: Cow<'a, [u8]>,
77}
78
79#[derive(Debug, Clone)]
80pub struct ReplicationFrameBytesPayload {
81    pub kind: FrameKind,
82    pub compressed: bool,
83    pub payload: SharedBytes,
84}
85
/// Per-shard sequence watermarks, indexed by shard id.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ShardWatermarks {
    values: Vec<u64>,
}

impl ShardWatermarks {
    /// Creates `shard_count` watermarks, all starting at zero.
    pub fn new(shard_count: usize) -> Self {
        Self::from_vec(vec![0; shard_count])
    }

    /// Wraps an existing list of watermarks without copying it.
    pub fn from_vec(values: Vec<u64>) -> Self {
        Self { values }
    }

    /// Borrows the watermarks in shard order.
    pub fn as_slice(&self) -> &[u64] {
        self.values.as_slice()
    }

    /// Consumes the wrapper and returns the underlying vector.
    pub fn into_vec(self) -> Vec<u64> {
        let Self { values } = self;
        values
    }

    /// Returns the watermark for `shard_id`, or 0 for a shard that is not tracked.
    pub fn get(&self, shard_id: usize) -> u64 {
        match self.values.get(shard_id) {
            Some(&watermark) => watermark,
            None => 0,
        }
    }

    /// Raises the watermark for `shard_id` to `sequence` if it is higher,
    /// growing the list with zeros when the shard was not tracked yet.
    pub fn observe(&mut self, shard_id: usize, sequence: u64) {
        if self.values.len() <= shard_id {
            self.values.resize(shard_id + 1, 0);
        }
        let slot = &mut self.values[shard_id];
        if sequence > *slot {
            *slot = sequence;
        }
    }
}
121
122#[derive(Debug, Clone, Copy, PartialEq, Eq)]
123pub enum ReplicationMutationOp {
124    Set,
125    Del,
126    Expire,
127}
128
129impl From<&MutationOp> for ReplicationMutationOp {
130    fn from(value: &MutationOp) -> Self {
131        match value {
132            MutationOp::Set => Self::Set,
133            MutationOp::Del => Self::Del,
134            MutationOp::Expire => Self::Expire,
135        }
136    }
137}
138
139impl ReplicationMutationOp {
140    fn to_byte(self) -> u8 {
141        match self {
142            Self::Set => 1,
143            Self::Del => 2,
144            Self::Expire => 3,
145        }
146    }
147
148    fn from_byte(value: u8) -> Result<Self> {
149        match value {
150            1 => Ok(Self::Set),
151            2 => Ok(Self::Del),
152            3 => Ok(Self::Expire),
153            other => Err(FastCacheError::Protocol(format!(
154                "unsupported FCRP mutation op: {other}"
155            ))),
156        }
157    }
158}
159
160#[derive(Debug, Clone, PartialEq, Eq)]
161pub struct ReplicationMutation {
162    pub shard_id: usize,
163    pub sequence: u64,
164    pub timestamp_ms: u64,
165    pub op: ReplicationMutationOp,
166    pub key_hash: u64,
167    pub key_tag: u64,
168    pub key: SharedBytes,
169    pub value: SharedBytes,
170    pub expire_at_ms: Option<u64>,
171}
172
173#[derive(Debug, Clone, Copy)]
174pub struct BorrowedReplicationMutation<'a> {
175    pub shard_id: usize,
176    pub sequence: u64,
177    pub timestamp_ms: u64,
178    pub op: ReplicationMutationOp,
179    pub key_hash: u64,
180    pub key_tag: u64,
181    pub key: &'a [u8],
182    pub value: &'a [u8],
183    pub expire_at_ms: Option<u64>,
184}
185
186#[derive(Debug, Clone)]
187pub struct FrameBackedReplicationMutation<'a> {
188    pub shard_id: usize,
189    pub sequence: u64,
190    pub op: ReplicationMutationOp,
191    pub key_hash: u64,
192    pub key: &'a [u8],
193    pub value: SharedBytes,
194    pub expire_at_ms: Option<u64>,
195}
196
197impl ReplicationMutation {
198    pub fn from_record(record: &MutationRecord) -> Self {
199        let key_hash = hash_key(record.key.as_ref());
200        Self::from_record_with_key_hash(record, key_hash)
201    }
202
203    pub fn from_record_with_key_hash(record: &MutationRecord, key_hash: u64) -> Self {
204        Self {
205            shard_id: record.shard_id,
206            sequence: record.sequence,
207            timestamp_ms: record.timestamp_ms,
208            op: ReplicationMutationOp::from(&record.op),
209            key_hash,
210            key_tag: hash_key_tag_from_hash(key_hash),
211            key: record.key.clone(),
212            value: record.value.clone(),
213            expire_at_ms: record.expire_at_ms,
214        }
215    }
216
217    pub fn estimated_uncompressed_len(&self) -> usize {
218        mutation_record_payload_len(self.key.len(), self.value.len())
219    }
220
221    pub(crate) fn as_borrowed(&self) -> BorrowedReplicationMutation<'_> {
222        BorrowedReplicationMutation {
223            shard_id: self.shard_id,
224            sequence: self.sequence,
225            timestamp_ms: self.timestamp_ms,
226            op: self.op,
227            key_hash: self.key_hash,
228            key_tag: self.key_tag,
229            key: self.key.as_ref(),
230            value: self.value.as_ref(),
231            expire_at_ms: self.expire_at_ms,
232        }
233    }
234}
235
236#[derive(Debug, Clone)]
237pub struct ReplicationSnapshot {
238    pub entries: Vec<StoredEntry>,
239    pub watermarks: ShardWatermarks,
240}
241
242#[derive(Debug, Clone, PartialEq, Eq)]
243pub struct ReplicationSnapshotChunk {
244    pub watermarks: ShardWatermarks,
245    pub chunk_index: u64,
246    pub is_last: bool,
247    pub entries: Vec<StoredEntry>,
248}
249
250pub fn encode_frame(
251    kind: FrameKind,
252    compression: ReplicationCompressionMode,
253    zstd_level: i32,
254    payload: &[u8],
255) -> Result<Vec<u8>> {
256    match compression {
257        ReplicationCompressionMode::None => {
258            let mut out = Vec::with_capacity(HEADER_LEN + payload.len());
259            write_header(
260                &mut out,
261                kind,
262                0,
263                payload.len() as u32,
264                payload.len() as u32,
265            );
266            out.extend_from_slice(payload);
267            Ok(out)
268        }
269        ReplicationCompressionMode::Zstd => {
270            let compressed = zstd::bulk::compress(payload, zstd_level).map_err(|error| {
271                FastCacheError::Protocol(format!("FCRP zstd compression failed: {error}"))
272            })?;
273            let mut out = Vec::with_capacity(HEADER_LEN + compressed.len());
274            write_header(
275                &mut out,
276                kind,
277                FLAG_COMPRESSED,
278                compressed.len() as u32,
279                payload.len() as u32,
280            );
281            out.extend_from_slice(&compressed);
282            Ok(out)
283        }
284    }
285}
286
287fn write_header(
288    out: &mut Vec<u8>,
289    kind: FrameKind,
290    flags: u8,
291    payload_len: u32,
292    uncompressed_len: u32,
293) {
294    out.extend_from_slice(FCRP_MAGIC);
295    out.push(FCRP_VERSION);
296    out.push(kind as u8);
297    out.push(flags);
298    out.push(0);
299    out.extend_from_slice(&payload_len.to_le_bytes());
300    out.extend_from_slice(&uncompressed_len.to_le_bytes());
301}
302
303pub(crate) fn write_uncompressed_frame_header_at(
304    frame: &mut [u8],
305    kind: FrameKind,
306    payload_len: usize,
307) {
308    debug_assert!(frame.len() >= HEADER_LEN);
309    frame[..4].copy_from_slice(FCRP_MAGIC);
310    frame[4] = FCRP_VERSION;
311    frame[5] = kind as u8;
312    frame[6] = 0;
313    frame[7] = 0;
314    frame[8..12].copy_from_slice(&(payload_len as u32).to_le_bytes());
315    frame[12..16].copy_from_slice(&(payload_len as u32).to_le_bytes());
316}
317
318pub fn decode_frame(bytes: &[u8]) -> Result<ReplicationFrame> {
319    let frame = decode_frame_payload(bytes)?;
320    Ok(ReplicationFrame {
321        kind: frame.kind,
322        compressed: frame.compressed,
323        payload: frame.payload.into_owned(),
324    })
325}
326
327pub fn decode_frame_payload(bytes: &[u8]) -> Result<ReplicationFramePayload<'_>> {
328    if bytes.len() < HEADER_LEN {
329        return Err(FastCacheError::Protocol("FCRP frame is truncated".into()));
330    }
331    if &bytes[..4] != FCRP_MAGIC {
332        return Err(FastCacheError::Protocol("FCRP magic mismatch".into()));
333    }
334    if bytes[4] != FCRP_VERSION {
335        return Err(FastCacheError::Protocol(format!(
336            "unsupported FCRP version: {}",
337            bytes[4]
338        )));
339    }
340    let kind = FrameKind::from_u8(bytes[5])?;
341    let flags = bytes[6];
342    let payload_len = u32::from_le_bytes(bytes[8..12].try_into().unwrap()) as usize;
343    let uncompressed_len = u32::from_le_bytes(bytes[12..16].try_into().unwrap()) as usize;
344    if HEADER_LEN + payload_len != bytes.len() {
345        return Err(FastCacheError::Protocol(
346            "FCRP frame length mismatch".into(),
347        ));
348    }
349    let raw = &bytes[HEADER_LEN..];
350    let compressed = flags & FLAG_COMPRESSED != 0;
351    let payload = if compressed {
352        Cow::Owned(
353            zstd::bulk::decompress(raw, uncompressed_len).map_err(|error| {
354                FastCacheError::Protocol(format!("FCRP zstd decompression failed: {error}"))
355            })?,
356        )
357    } else {
358        Cow::Borrowed(raw)
359    };
360    Ok(ReplicationFramePayload {
361        kind,
362        compressed,
363        payload,
364    })
365}
366
367pub fn decode_frame_payload_bytes(bytes: SharedBytes) -> Result<ReplicationFrameBytesPayload> {
368    if bytes.len() < HEADER_LEN {
369        return Err(FastCacheError::Protocol("FCRP frame is truncated".into()));
370    }
371    if &bytes[..4] != FCRP_MAGIC {
372        return Err(FastCacheError::Protocol("FCRP magic mismatch".into()));
373    }
374    if bytes[4] != FCRP_VERSION {
375        return Err(FastCacheError::Protocol(format!(
376            "unsupported FCRP version: {}",
377            bytes[4]
378        )));
379    }
380
381    let kind = FrameKind::from_u8(bytes[5])?;
382    let flags = bytes[6];
383    let payload_len = u32::from_le_bytes(bytes[8..12].try_into().unwrap()) as usize;
384    let uncompressed_len = u32::from_le_bytes(bytes[12..16].try_into().unwrap()) as usize;
385    if HEADER_LEN + payload_len != bytes.len() {
386        return Err(FastCacheError::Protocol(
387            "FCRP frame length mismatch".into(),
388        ));
389    }
390
391    let compressed = flags & FLAG_COMPRESSED != 0;
392    let payload = match compressed {
393        true => {
394            let raw = &bytes[HEADER_LEN..];
395            SharedBytes::from(
396                zstd::bulk::decompress(raw, uncompressed_len).map_err(|error| {
397                    FastCacheError::Protocol(format!("FCRP zstd decompression failed: {error}"))
398                })?,
399            )
400        }
401        false => bytes.slice(HEADER_LEN..),
402    };
403    Ok(ReplicationFrameBytesPayload {
404        kind,
405        compressed,
406        payload,
407    })
408}
409
410pub fn encode_mutation_batch(mutations: &[ReplicationMutation]) -> Vec<u8> {
411    let payload_len = mutation_batch_payload_len(mutations);
412    let mut out = Vec::with_capacity(payload_len);
413    write_mutation_batch_payload(&mut out, mutations);
414    out
415}
416
417pub(crate) fn encode_mutation_batch_frame_with_payload_len(
418    mutations: &[ReplicationMutation],
419    payload_len: usize,
420    compression: ReplicationCompressionMode,
421    zstd_level: i32,
422) -> Result<(Vec<u8>, usize)> {
423    match compression {
424        ReplicationCompressionMode::None => {
425            let mut out = Vec::with_capacity(HEADER_LEN + payload_len);
426            write_header(
427                &mut out,
428                FrameKind::MutationBatch,
429                0,
430                payload_len as u32,
431                payload_len as u32,
432            );
433            write_mutation_batch_payload(&mut out, mutations);
434            Ok((out, payload_len))
435        }
436        ReplicationCompressionMode::Zstd => {
437            let mut payload = Vec::with_capacity(payload_len);
438            write_mutation_batch_payload(&mut payload, mutations);
439            encode_frame(FrameKind::MutationBatch, compression, zstd_level, &payload)
440                .map(|frame| (frame, payload_len))
441        }
442    }
443}
444
445fn mutation_batch_payload_len(mutations: &[ReplicationMutation]) -> usize {
446    4 + mutations
447        .iter()
448        .map(ReplicationMutation::estimated_uncompressed_len)
449        .sum::<usize>()
450}
451
/// Encoded size of one mutation record with the given key and value lengths.
pub(crate) fn mutation_record_payload_len(key_len: usize, value_len: usize) -> usize {
    // shard id (u32), sequence (u64), timestamp (u64), op (u8), key hash (u64),
    // key tag (u64), expiry (u64), key length (u32), value length (u32).
    const FIXED_FIELDS_LEN: usize = 4 + 8 + 8 + 1 + 8 + 8 + 8 + 4 + 4;
    FIXED_FIELDS_LEN + key_len + value_len
}
455
456pub(crate) fn borrowed_mutation_record_payload_len(
457    mutation: BorrowedReplicationMutation<'_>,
458) -> usize {
459    mutation_record_payload_len(mutation.key.len(), mutation.value.len())
460}
461
462pub(crate) fn mutation_batch_record_count(bytes: &[u8]) -> Result<usize> {
463    let Some(count) = bytes.get(..4) else {
464        return Err(FastCacheError::Protocol("FCRP payload is truncated".into()));
465    };
466    Ok(u32::from_le_bytes(count.try_into().unwrap()) as usize)
467}
468
469pub(crate) fn write_borrowed_mutation_payload_record(
470    out: &mut Vec<u8>,
471    mutation: BorrowedReplicationMutation<'_>,
472) {
473    let start = out.len();
474    let record_len = borrowed_mutation_record_payload_len(mutation);
475    out.reserve(record_len);
476    // The frame builder just reserved `record_len`, and every write below
477    // advances by exactly the encoded field width. This avoids one bounds and
478    // capacity check per scalar field on the replication hot path.
479    unsafe {
480        let mut cursor = out.as_mut_ptr().add(start);
481        write_u32_le(&mut cursor, mutation.shard_id as u32);
482        write_u64_le(&mut cursor, mutation.sequence);
483        write_u64_le(&mut cursor, mutation.timestamp_ms);
484        write_u8(&mut cursor, mutation.op.to_byte());
485        write_u64_le(&mut cursor, mutation.key_hash);
486        write_u64_le(&mut cursor, mutation.key_tag);
487        write_u64_le(&mut cursor, mutation.expire_at_ms.unwrap_or(EXPIRE_NONE));
488        write_u32_le(&mut cursor, mutation.key.len() as u32);
489        write_u32_le(&mut cursor, mutation.value.len() as u32);
490        write_bytes(&mut cursor, mutation.key);
491        write_bytes(&mut cursor, mutation.value);
492        debug_assert_eq!(
493            cursor.offset_from(out.as_ptr().add(start)),
494            record_len as isize
495        );
496        out.set_len(start + record_len);
497    }
498}
499
/// Writes one byte at `*cursor` and advances the cursor past it.
///
/// # Safety
/// `*cursor` must be valid for a one-byte write.
#[inline(always)]
unsafe fn write_u8(cursor: &mut *mut u8, value: u8) {
    // SAFETY: the caller reserved enough capacity for the whole encoded record.
    unsafe {
        let dst = *cursor;
        dst.write(value);
        *cursor = dst.add(1);
    }
}
508
/// Writes `value` as four little-endian bytes at `*cursor` and advances the
/// cursor past them. The destination does not need to be aligned.
///
/// # Safety
/// `*cursor` must be valid for a four-byte write.
#[inline(always)]
unsafe fn write_u32_le(cursor: &mut *mut u8, value: u32) {
    let encoded = value.to_le_bytes();
    // SAFETY: the caller reserved enough capacity for the whole encoded record;
    // copying byte-wise keeps the write valid at any alignment because the
    // frame is byte packed.
    unsafe {
        let dst = *cursor;
        ptr::copy_nonoverlapping(encoded.as_ptr(), dst, encoded.len());
        *cursor = dst.add(encoded.len());
    }
}
518
/// Writes `value` as eight little-endian bytes at `*cursor` and advances the
/// cursor past them. The destination does not need to be aligned.
///
/// # Safety
/// `*cursor` must be valid for an eight-byte write.
#[inline(always)]
unsafe fn write_u64_le(cursor: &mut *mut u8, value: u64) {
    let encoded = value.to_le_bytes();
    // SAFETY: the caller reserved enough capacity for the whole encoded record;
    // copying byte-wise keeps the write valid at any alignment because the
    // frame is byte packed.
    unsafe {
        let dst = *cursor;
        ptr::copy_nonoverlapping(encoded.as_ptr(), dst, encoded.len());
        *cursor = dst.add(encoded.len());
    }
}
528
/// Copies `bytes` to `*cursor` and advances the cursor past them.
///
/// # Safety
/// `*cursor` must be valid for a write of `bytes.len()` bytes and must not
/// overlap `bytes`.
#[inline(always)]
unsafe fn write_bytes(cursor: &mut *mut u8, bytes: &[u8]) {
    let len = bytes.len();
    // SAFETY: the caller reserved enough capacity for the whole encoded record.
    unsafe {
        let dst = *cursor;
        bytes.as_ptr().copy_to_nonoverlapping(dst, len);
        *cursor = dst.add(len);
    }
}
537
538fn write_mutation_batch_payload(out: &mut Vec<u8>, mutations: &[ReplicationMutation]) {
539    out.extend_from_slice(&(mutations.len() as u32).to_le_bytes());
540    for mutation in mutations {
541        write_borrowed_mutation_payload_record(out, mutation.as_borrowed());
542    }
543}
544
545pub fn decode_mutation_batch(bytes: &[u8]) -> Result<Vec<ReplicationMutation>> {
546    let mut cursor = Cursor::new(bytes);
547    let count = cursor.u32()? as usize;
548    let mut mutations = Vec::with_capacity(count);
549    for _ in 0..count {
550        let shard_id = cursor.u32()? as usize;
551        let sequence = cursor.u64()?;
552        let timestamp_ms = cursor.u64()?;
553        let op = ReplicationMutationOp::from_byte(cursor.u8()?)?;
554        let key_hash = cursor.u64()?;
555        let key_tag = cursor.u64()?;
556        let expire_raw = cursor.u64()?;
557        let key_len = cursor.u32()? as usize;
558        let value_len = cursor.u32()? as usize;
559        let key = SharedBytes::from(cursor.bytes(key_len)?.to_vec());
560        let value = SharedBytes::from(cursor.bytes(value_len)?.to_vec());
561        mutations.push(ReplicationMutation {
562            shard_id,
563            sequence,
564            timestamp_ms,
565            op,
566            key_hash,
567            key_tag,
568            key,
569            value,
570            expire_at_ms: (expire_raw != EXPIRE_NONE).then_some(expire_raw),
571        });
572    }
573    cursor.finish()?;
574    Ok(mutations)
575}
576
577pub fn visit_mutation_batch_payload<F>(bytes: &[u8], mut visit: F) -> Result<()>
578where
579    F: FnMut(BorrowedReplicationMutation<'_>) -> Result<()>,
580{
581    let mut cursor = Cursor::new(bytes);
582    let count = cursor.u32()? as usize;
583    for _ in 0..count {
584        let shard_id = cursor.u32()? as usize;
585        let sequence = cursor.u64()?;
586        let timestamp_ms = cursor.u64()?;
587        let op = ReplicationMutationOp::from_byte(cursor.u8()?)?;
588        let key_hash = cursor.u64()?;
589        let key_tag = cursor.u64()?;
590        let expire_raw = cursor.u64()?;
591        let key_len = cursor.u32()? as usize;
592        let value_len = cursor.u32()? as usize;
593        let key = cursor.bytes(key_len)?;
594        let value = cursor.bytes(value_len)?;
595        visit(BorrowedReplicationMutation {
596            shard_id,
597            sequence,
598            timestamp_ms,
599            op,
600            key_hash,
601            key_tag,
602            key,
603            value,
604            expire_at_ms: (expire_raw != EXPIRE_NONE).then_some(expire_raw),
605        })?;
606    }
607    cursor.finish()
608}
609
610pub fn visit_mutation_batch_payload_bytes<F>(bytes: SharedBytes, mut visit: F) -> Result<()>
611where
612    F: FnMut(FrameBackedReplicationMutation<'_>) -> Result<()>,
613{
614    let mut cursor = Cursor::new(bytes.as_ref());
615    let count = cursor.u32()? as usize;
616    for _ in 0..count {
617        let shard_id = cursor.u32()? as usize;
618        let sequence = cursor.u64()?;
619        let _timestamp_ms = cursor.u64()?;
620        let op = ReplicationMutationOp::from_byte(cursor.u8()?)?;
621        let key_hash = cursor.u64()?;
622        let _key_tag = cursor.u64()?;
623        let expire_raw = cursor.u64()?;
624        let key_len = cursor.u32()? as usize;
625        let value_len = cursor.u32()? as usize;
626        let key = cursor.bytes(key_len)?;
627        let value_start = cursor.pos;
628        let _ = cursor.bytes(value_len)?;
629        let value = bytes.slice(value_start..value_start + value_len);
630        visit(FrameBackedReplicationMutation {
631            shard_id,
632            sequence,
633            op,
634            key_hash,
635            key,
636            value,
637            expire_at_ms: (expire_raw != EXPIRE_NONE).then_some(expire_raw),
638        })?;
639    }
640    cursor.finish()
641}
642
643#[derive(Debug, Clone, Copy, PartialEq, Eq)]
644pub enum HelloRole {
645    Replica = 1,
646    ServiceSubscriber = 2,
647}
648
649impl HelloRole {
650    fn to_byte(self) -> u8 {
651        self as u8
652    }
653
654    fn from_byte(value: u8) -> Result<Self> {
655        match value {
656            1 => Ok(Self::Replica),
657            2 => Ok(Self::ServiceSubscriber),
658            other => Err(FastCacheError::Protocol(format!(
659                "unsupported FCRP hello role: {other}"
660            ))),
661        }
662    }
663}
664
665#[derive(Debug, Clone, PartialEq, Eq)]
666pub struct ReplicationHello {
667    pub version: u8,
668    pub role: HelloRole,
669    pub auth_token: Option<String>,
670    pub since: Option<ShardWatermarks>,
671}
672
673pub fn encode_hello(hello: &ReplicationHello) -> Vec<u8> {
674    let mut out = Vec::new();
675    out.push(hello.version);
676    out.push(hello.role.to_byte());
677    let token = hello.auth_token.as_deref().unwrap_or("");
678    out.extend_from_slice(&(token.len() as u32).to_le_bytes());
679    out.extend_from_slice(token.as_bytes());
680    match &hello.since {
681        Some(watermarks) => {
682            out.push(1);
683            out.extend_from_slice(&(watermarks.as_slice().len() as u32).to_le_bytes());
684            for value in watermarks.as_slice() {
685                out.extend_from_slice(&value.to_le_bytes());
686            }
687        }
688        None => out.push(0),
689    }
690    out
691}
692
693pub fn decode_hello(bytes: &[u8]) -> Result<ReplicationHello> {
694    let mut cursor = Cursor::new(bytes);
695    let version = cursor.u8()?;
696    let role = HelloRole::from_byte(cursor.u8()?)?;
697    let token_len = cursor.u32()? as usize;
698    let token_bytes = cursor.bytes(token_len)?;
699    let auth_token = if token_len == 0 {
700        None
701    } else {
702        Some(
703            std::str::from_utf8(token_bytes)
704                .map_err(|error| {
705                    FastCacheError::Protocol(format!("FCRP hello auth token is not UTF-8: {error}"))
706                })?
707                .to_string(),
708        )
709    };
710    let has_since = cursor.u8()? != 0;
711    let since = if has_since {
712        let count = cursor.u32()? as usize;
713        let mut values = Vec::with_capacity(count);
714        for _ in 0..count {
715            values.push(cursor.u64()?);
716        }
717        Some(ShardWatermarks::from_vec(values))
718    } else {
719        None
720    };
721    cursor.finish()?;
722    Ok(ReplicationHello {
723        version,
724        role,
725        auth_token,
726        since,
727    })
728}
729
/// Encodes an error payload: the message length as a little-endian u32
/// followed by the message's UTF-8 bytes.
pub fn encode_error(message: &str) -> Vec<u8> {
    let body = message.as_bytes();
    let length_prefix = (body.len() as u32).to_le_bytes();
    let mut out = Vec::with_capacity(length_prefix.len() + body.len());
    out.extend_from_slice(&length_prefix);
    out.extend_from_slice(body);
    out
}
736
737pub fn decode_error(bytes: &[u8]) -> Result<String> {
738    let mut cursor = Cursor::new(bytes);
739    let len = cursor.u32()? as usize;
740    let body = cursor.bytes(len)?;
741    cursor.finish()?;
742    std::str::from_utf8(body)
743        .map(|s| s.to_string())
744        .map_err(|error| FastCacheError::Protocol(format!("FCRP error payload not UTF-8: {error}")))
745}
746
747pub fn encode_ack(watermarks: &ShardWatermarks) -> Vec<u8> {
748    let mut out = Vec::with_capacity(4 + watermarks.as_slice().len() * 8);
749    out.extend_from_slice(&(watermarks.as_slice().len() as u32).to_le_bytes());
750    for value in watermarks.as_slice() {
751        out.extend_from_slice(&value.to_le_bytes());
752    }
753    out
754}
755
756pub fn decode_ack(bytes: &[u8]) -> Result<ShardWatermarks> {
757    let mut cursor = Cursor::new(bytes);
758    let count = cursor.u32()? as usize;
759    let mut values = Vec::with_capacity(count);
760    for _ in 0..count {
761        values.push(cursor.u64()?);
762    }
763    cursor.finish()?;
764    Ok(ShardWatermarks::from_vec(values))
765}
766
767pub fn encode_snapshot_chunk(chunk: &ReplicationSnapshotChunk) -> Vec<u8> {
768    let mut out = Vec::new();
769    out.extend_from_slice(&chunk.chunk_index.to_le_bytes());
770    out.push(u8::from(chunk.is_last));
771    out.extend_from_slice(&(chunk.watermarks.as_slice().len() as u32).to_le_bytes());
772    for watermark in chunk.watermarks.as_slice() {
773        out.extend_from_slice(&watermark.to_le_bytes());
774    }
775    out.extend_from_slice(&(chunk.entries.len() as u32).to_le_bytes());
776    for entry in &chunk.entries {
777        out.extend_from_slice(&(entry.key.len() as u32).to_le_bytes());
778        out.extend_from_slice(&(entry.value.len() as u32).to_le_bytes());
779        out.extend_from_slice(&entry.expire_at_ms.unwrap_or(EXPIRE_NONE).to_le_bytes());
780        out.extend_from_slice(entry.key.as_ref());
781        out.extend_from_slice(entry.value.as_ref());
782    }
783    out
784}
785
786pub fn decode_snapshot_chunk(bytes: &[u8]) -> Result<ReplicationSnapshotChunk> {
787    let mut cursor = Cursor::new(bytes);
788    let chunk_index = cursor.u64()?;
789    let is_last = cursor.u8()? != 0;
790    let watermark_count = cursor.u32()? as usize;
791    let mut watermarks = Vec::with_capacity(watermark_count);
792    for _ in 0..watermark_count {
793        watermarks.push(cursor.u64()?);
794    }
795    let entry_count = cursor.u32()? as usize;
796    let mut entries = Vec::with_capacity(entry_count);
797    for _ in 0..entry_count {
798        let key_len = cursor.u32()? as usize;
799        let value_len = cursor.u32()? as usize;
800        let expire_raw = cursor.u64()?;
801        let key = cursor.bytes(key_len)?.to_vec();
802        let value = cursor.bytes(value_len)?.to_vec();
803        entries.push(StoredEntry {
804            key,
805            value,
806            expire_at_ms: (expire_raw != EXPIRE_NONE).then_some(expire_raw),
807        });
808    }
809    cursor.finish()?;
810    Ok(ReplicationSnapshotChunk {
811        watermarks: ShardWatermarks::from_vec(watermarks),
812        chunk_index,
813        is_last,
814        entries,
815    })
816}
817
818struct Cursor<'a> {
819    bytes: &'a [u8],
820    pos: usize,
821}
822
823impl<'a> Cursor<'a> {
824    fn new(bytes: &'a [u8]) -> Self {
825        Self { bytes, pos: 0 }
826    }
827
828    fn u8(&mut self) -> Result<u8> {
829        let Some(value) = self.bytes.get(self.pos).copied() else {
830            return Err(FastCacheError::Protocol("FCRP payload is truncated".into()));
831        };
832        self.pos += 1;
833        Ok(value)
834    }
835
836    fn u32(&mut self) -> Result<u32> {
837        let bytes = self.bytes(4)?;
838        Ok(u32::from_le_bytes(bytes.try_into().unwrap()))
839    }
840
841    fn u64(&mut self) -> Result<u64> {
842        let bytes = self.bytes(8)?;
843        Ok(u64::from_le_bytes(bytes.try_into().unwrap()))
844    }
845
846    fn bytes(&mut self, len: usize) -> Result<&'a [u8]> {
847        if self.pos + len > self.bytes.len() {
848            return Err(FastCacheError::Protocol("FCRP payload is truncated".into()));
849        }
850        let bytes = &self.bytes[self.pos..self.pos + len];
851        self.pos += len;
852        Ok(bytes)
853    }
854
855    fn finish(&self) -> Result<()> {
856        if self.pos != self.bytes.len() {
857            return Err(FastCacheError::Protocol(
858                "FCRP payload contains trailing bytes".into(),
859            ));
860        }
861        Ok(())
862    }
863}
864
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::hash_key_tag;

    /// Builds a `Set` mutation for key `alpha` on shard 2 with the given sequence.
    fn sample_mutation(sequence: u64) -> ReplicationMutation {
        let key = b"alpha".to_vec();
        let key_hash = hash_key(&key);
        let key_tag = hash_key_tag(&key);
        ReplicationMutation {
            shard_id: 2,
            sequence,
            timestamp_ms: 42,
            op: ReplicationMutationOp::Set,
            key_hash,
            key_tag,
            key: SharedBytes::from(key),
            value: SharedBytes::from_static(b"value"),
            expire_at_ms: Some(99),
        }
    }

    /// Two sample mutations with consecutive sequence numbers.
    fn sample_batch() -> Vec<ReplicationMutation> {
        vec![sample_mutation(1), sample_mutation(2)]
    }

    /// Encoding then decoding a batch payload yields the original mutations.
    #[test]
    fn mutation_batch_round_trips() {
        let original = sample_batch();
        let wire = encode_mutation_batch(&original);
        let restored = decode_mutation_batch(&wire).expect("decode");
        assert_eq!(restored, original);
    }

    /// The direct-to-frame encoder produces the same payload as the two-step
    /// path and survives a frame decode.
    #[test]
    fn mutation_batch_frame_round_trips_without_payload_copy() {
        let original = sample_batch();
        let expected_payload = encode_mutation_batch(&original);
        let (frame_bytes, reported_len) = encode_mutation_batch_frame_with_payload_len(
            &original,
            expected_payload.len(),
            ReplicationCompressionMode::None,
            0,
        )
        .expect("encode");
        let frame = decode_frame(&frame_bytes).expect("decode frame");
        assert_eq!(frame.kind, FrameKind::MutationBatch);
        assert!(!frame.compressed);
        assert_eq!(reported_len, expected_payload.len());
        assert_eq!(frame.payload, expected_payload);
        assert_eq!(
            decode_mutation_batch(&frame.payload).expect("decode batch"),
            original
        );
    }

    /// Values handed to the visitor point into the decoded frame's buffer
    /// rather than into fresh allocations.
    #[test]
    fn mutation_batch_payload_bytes_visits_frame_backed_values() {
        let original = sample_batch();
        let expected_payload = encode_mutation_batch(&original);
        let (frame_bytes, _) = encode_mutation_batch_frame_with_payload_len(
            &original,
            expected_payload.len(),
            ReplicationCompressionMode::None,
            0,
        )
        .expect("encode");

        let frame = decode_frame_payload_bytes(SharedBytes::from(frame_bytes)).expect("decode");
        let buffer_start = frame.payload.as_ptr() as usize;
        let buffer_end = buffer_start + frame.payload.len();
        let mut seen_values = Vec::new();
        visit_mutation_batch_payload_bytes(frame.payload, |record| {
            let value_start = record.value.as_ptr() as usize;
            let value_end = value_start + record.value.len();
            assert!(value_start >= buffer_start);
            assert!(value_end <= buffer_end);
            seen_values.push(record.value);
            Ok(())
        })
        .expect("visit");

        assert_eq!(
            seen_values,
            vec![original[0].value.clone(), original[1].value.clone()]
        );
    }

    /// A zstd-compressed frame decodes back to the original payload and is
    /// reported as compressed.
    #[test]
    fn zstd_frame_round_trips() {
        let expected_payload = encode_mutation_batch(&[sample_mutation(1)]);
        let frame_bytes = encode_frame(
            FrameKind::MutationBatch,
            ReplicationCompressionMode::Zstd,
            3,
            &expected_payload,
        )
        .expect("encode");
        let frame = decode_frame(&frame_bytes).expect("decode");
        assert_eq!(frame.kind, FrameKind::MutationBatch);
        assert!(frame.compressed);
        assert_eq!(frame.payload, expected_payload);
    }

    /// A snapshot chunk with one non-expiring entry round-trips unchanged.
    #[test]
    fn snapshot_chunk_round_trips() {
        let entry = StoredEntry {
            key: b"k".to_vec(),
            value: b"v".to_vec(),
            expire_at_ms: None,
        };
        let original = ReplicationSnapshotChunk {
            watermarks: ShardWatermarks::from_vec(vec![1, 2]),
            chunk_index: 3,
            is_last: true,
            entries: vec![entry],
        };
        let wire = encode_snapshot_chunk(&original);
        let restored = decode_snapshot_chunk(&wire).expect("decode");
        assert_eq!(restored, original);
    }
}