Skip to main content

crdt_migrate/
envelope.rs

1use alloc::vec::Vec;
2use core::fmt;
3
/// Leading byte that marks a buffer as crdt-kit serialized data.
pub const MAGIC_BYTE: u8 = 0xCF;

/// Number of bytes the version envelope header occupies ahead of the payload.
// One byte each for the magic marker, the schema version and the CRDT type tag.
pub const ENVELOPE_HEADER_SIZE: usize = 1 + 1 + 1;
9
/// Tag naming which kind of CRDT an envelope's payload holds.
///
/// The enum is `#[repr(u8)]`; every variant has an explicit discriminant, and
/// those discriminants are exactly the bytes accepted by
/// [`CrdtType::from_byte`].
#[repr(u8)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum CrdtType {
    /// Grow-only counter.
    GCounter = 1,
    /// Positive-negative counter.
    PNCounter = 2,
    /// Grow-only set.
    GSet = 3,
    /// Two-phase set.
    TwoPSet = 4,
    /// Last-writer-wins register.
    LWWRegister = 5,
    /// Multi-value register.
    MVRegister = 6,
    /// Observed-remove set.
    ORSet = 7,
    /// Replicated Growable Array.
    Rga = 8,
    /// Collaborative text.
    TextCrdt = 9,
    /// User-defined composite schema.
    Custom = 255,
}

impl CrdtType {
    /// Decode a raw tag byte into the matching [`CrdtType`].
    ///
    /// Returns `None` when `b` is not the discriminant of any variant
    /// (that is, anything other than `1..=9` or `255`).
    pub fn from_byte(b: u8) -> Option<Self> {
        // Pick the variant first and wrap it once at the end; unknown bytes
        // leave early with `None`.
        let kind = match b {
            1 => Self::GCounter,
            2 => Self::PNCounter,
            3 => Self::GSet,
            4 => Self::TwoPSet,
            5 => Self::LWWRegister,
            6 => Self::MVRegister,
            7 => Self::ORSet,
            8 => Self::Rga,
            9 => Self::TextCrdt,
            255 => Self::Custom,
            _ => return None,
        };
        Some(kind)
    }
}
54
55/// A version envelope wrapping serialized CRDT data.
56///
57/// Binary format (3 bytes overhead):
58/// ```text
59/// [MAGIC: 0xCF][VERSION: u8][CRDT_TYPE: u8][PAYLOAD: N bytes]
60/// ```
61///
62/// # Example
63///
64/// ```
65/// use crdt_migrate::{VersionedEnvelope, CrdtType};
66///
67/// let data = b"some serialized crdt state";
68/// let envelope = VersionedEnvelope::new(1, CrdtType::GCounter, data.to_vec());
69///
70/// let bytes = envelope.to_bytes();
71/// let decoded = VersionedEnvelope::from_bytes(&bytes).unwrap();
72///
73/// assert_eq!(decoded.version, 1);
74/// assert_eq!(decoded.crdt_type, CrdtType::GCounter);
75/// assert_eq!(decoded.payload, data);
76/// ```
77#[derive(Debug, Clone, PartialEq)]
78pub struct VersionedEnvelope {
79    /// Schema version of the payload.
80    pub version: u8,
81    /// Type of CRDT contained.
82    pub crdt_type: CrdtType,
83    /// Serialized CRDT data.
84    pub payload: Vec<u8>,
85}
86
/// Reasons a byte buffer can be rejected as a version envelope.
#[derive(Clone, Debug, PartialEq)]
pub enum EnvelopeError {
    /// The buffer has fewer bytes than the part of the header being read.
    TooShort,
    /// The first byte is not the magic byte; carries the byte that was found.
    InvalidMagic(u8),
    /// The CRDT type byte is not a known type; carries the byte that was found.
    UnknownCrdtType(u8),
}

impl fmt::Display for EnvelopeError {
    /// Write a short human-readable description of the error.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match *self {
            // Fixed text, so no formatting machinery is needed.
            EnvelopeError::TooShort => f.write_str("data too short for version envelope"),
            EnvelopeError::InvalidMagic(b) => {
                write!(f, "invalid magic byte: 0x{b:02X}, expected 0xCF")
            }
            EnvelopeError::UnknownCrdtType(b) => write!(f, "unknown CRDT type: {b}"),
        }
    }
}
107
108impl VersionedEnvelope {
109    /// Create a new envelope.
110    pub fn new(version: u8, crdt_type: CrdtType, payload: Vec<u8>) -> Self {
111        Self {
112            version,
113            crdt_type,
114            payload,
115        }
116    }
117
118    /// Serialize the envelope to bytes.
119    pub fn to_bytes(&self) -> Vec<u8> {
120        let mut bytes = Vec::with_capacity(ENVELOPE_HEADER_SIZE + self.payload.len());
121        bytes.push(MAGIC_BYTE);
122        bytes.push(self.version);
123        bytes.push(self.crdt_type as u8);
124        bytes.extend_from_slice(&self.payload);
125        bytes
126    }
127
128    /// Parse an envelope from bytes.
129    pub fn from_bytes(data: &[u8]) -> Result<Self, EnvelopeError> {
130        if data.len() < ENVELOPE_HEADER_SIZE {
131            return Err(EnvelopeError::TooShort);
132        }
133
134        if data[0] != MAGIC_BYTE {
135            return Err(EnvelopeError::InvalidMagic(data[0]));
136        }
137
138        let version = data[1];
139        let crdt_type =
140            CrdtType::from_byte(data[2]).ok_or(EnvelopeError::UnknownCrdtType(data[2]))?;
141        let payload = data[ENVELOPE_HEADER_SIZE..].to_vec();
142
143        Ok(Self {
144            version,
145            crdt_type,
146            payload,
147        })
148    }
149
150    /// Peek at the version without fully parsing the envelope.
151    pub fn peek_version(data: &[u8]) -> Result<u8, EnvelopeError> {
152        if data.len() < 2 {
153            return Err(EnvelopeError::TooShort);
154        }
155        if data[0] != MAGIC_BYTE {
156            return Err(EnvelopeError::InvalidMagic(data[0]));
157        }
158        Ok(data[1])
159    }
160
161    /// Check if bytes look like a versioned envelope (starts with magic byte).
162    pub fn is_versioned(data: &[u8]) -> bool {
163        data.first() == Some(&MAGIC_BYTE)
164    }
165}
166
#[cfg(test)]
mod tests {
    use super::*;

    /// Encoding then decoding reproduces the original envelope.
    #[test]
    fn roundtrip() {
        let sent = VersionedEnvelope::new(3, CrdtType::ORSet, b"test-payload".to_vec());
        let wire = sent.to_bytes();
        let received = VersionedEnvelope::from_bytes(&wire).unwrap();
        assert_eq!(sent, received);
    }

    /// An envelope with an empty payload is exactly one header long.
    #[test]
    fn header_size() {
        let encoded = VersionedEnvelope::new(1, CrdtType::GCounter, Vec::new()).to_bytes();
        assert_eq!(encoded.len(), ENVELOPE_HEADER_SIZE);
    }

    /// The version can be read back without decoding the whole envelope.
    #[test]
    fn peek_version() {
        let encoded = VersionedEnvelope::new(42, CrdtType::TextCrdt, b"data".to_vec()).to_bytes();
        let peeked = VersionedEnvelope::peek_version(&encoded).unwrap();
        assert_eq!(peeked, 42);
    }

    /// Only buffers that start with the magic byte count as versioned.
    #[test]
    fn is_versioned() {
        let with_magic = [MAGIC_BYTE, 1, 1];
        let without_magic = [0x00, 1, 1];
        let empty: [u8; 0] = [];

        assert!(VersionedEnvelope::is_versioned(&with_magic));
        assert!(!VersionedEnvelope::is_versioned(&without_magic));
        assert!(!VersionedEnvelope::is_versioned(&empty));
    }

    /// A buffer shorter than the header is rejected as too short.
    #[test]
    fn error_too_short() {
        let outcome = VersionedEnvelope::from_bytes(&[MAGIC_BYTE]);
        assert_eq!(outcome, Err(EnvelopeError::TooShort));
    }

    /// A wrong first byte is reported together with the byte found.
    #[test]
    fn error_invalid_magic() {
        let outcome = VersionedEnvelope::from_bytes(&[0xAB, 1, 1]);
        assert_eq!(outcome, Err(EnvelopeError::InvalidMagic(0xAB)));
    }

    /// An unrecognised CRDT type byte is reported together with the byte found.
    #[test]
    fn error_unknown_crdt_type() {
        let outcome = VersionedEnvelope::from_bytes(&[MAGIC_BYTE, 1, 200]);
        assert_eq!(outcome, Err(EnvelopeError::UnknownCrdtType(200)));
    }

    /// Every CRDT type keeps its identity through encode + decode.
    #[test]
    fn all_crdt_types_roundtrip() {
        let every_type = [
            CrdtType::GCounter,
            CrdtType::PNCounter,
            CrdtType::GSet,
            CrdtType::TwoPSet,
            CrdtType::LWWRegister,
            CrdtType::MVRegister,
            CrdtType::ORSet,
            CrdtType::Rga,
            CrdtType::TextCrdt,
            CrdtType::Custom,
        ];
        for kind in every_type {
            let wire = VersionedEnvelope::new(1, kind, b"x".to_vec()).to_bytes();
            let decoded = VersionedEnvelope::from_bytes(&wire).unwrap();
            assert_eq!(decoded.crdt_type, kind);
        }
    }
}