Skip to main content

nodedb_codec/
crdt_compress.rs

1//! CRDT state compression for Loro deltas.
2//!
3//! CRDT operations have a specific data signature:
4//! - **Lamport timestamps**: monotonically increasing → Delta → FastLanes
5//! - **Actor IDs**: purely entropic, but few unique actors → dictionary dedup
6//! - **Content (text edits, JSON)**: contiguous → RLE + FSST for strings
7//!
8//! This module compresses CRDT operation batches for:
9//! - Sync bandwidth reduction (Pattern B)
10//! - Long-term storage efficiency (Pattern C)
11
12use crate::error::CodecError;
13
/// A single CRDT operation: the unit that [`encode`] compresses and [`decode`] restores.
#[derive(Debug, Clone, Default, PartialEq, Eq, Hash)]
pub struct CrdtOp {
    /// Lamport timestamp of the operation.
    pub lamport: u64,
    /// Actor ID (hash or index into actor dictionary). The encoder deduplicates
    /// these into a per-batch dictionary, so any `u64` value is accepted.
    pub actor_id: u64,
    /// Operation payload (text content, JSON fragment, etc.).
    pub content: Vec<u8>,
}
24
25/// Compressed CRDT operation batch.
26///
27/// Wire format:
28/// ```text
29/// [4 bytes] op count (LE u32)
30/// [2 bytes] actor dictionary size (LE u16)
31/// [actor_count × 8 bytes] actor IDs (LE u64)
32/// [N bytes] Delta-encoded Lamport timestamps (nodedb-codec delta format)
33/// [4 bytes] actor_index block size (LE u32)
34/// [M bytes] actor indices (u8 if ≤256 actors, u16 otherwise)
35/// [4 bytes] content block size (LE u32)
36/// [K bytes] FSST-compressed content (newline-delimited)
37/// ```
38pub fn encode(ops: &[CrdtOp]) -> Result<Vec<u8>, CodecError> {
39    if ops.is_empty() {
40        return Ok(0u32.to_le_bytes().to_vec());
41    }
42
43    let count = ops.len() as u32;
44
45    // Build actor dictionary.
46    let mut actor_dict: Vec<u64> = Vec::new();
47    let mut actor_map = std::collections::HashMap::new();
48    for op in ops {
49        actor_map.entry(op.actor_id).or_insert_with(|| {
50            let idx = actor_dict.len() as u16;
51            actor_dict.push(op.actor_id);
52            idx
53        });
54    }
55
56    // Delta-encode Lamport timestamps.
57    let lamports: Vec<i64> = ops.iter().map(|op| op.lamport as i64).collect();
58    let lamport_block = crate::delta::encode(&lamports);
59
60    // Actor indices.
61    let use_u8 = actor_dict.len() <= 256;
62    let actor_indices: Vec<u8> = if use_u8 {
63        ops.iter().map(|op| actor_map[&op.actor_id] as u8).collect()
64    } else {
65        ops.iter()
66            .flat_map(|op| actor_map[&op.actor_id].to_le_bytes())
67            .collect()
68    };
69
70    // FSST-compress content (treat each op's content as a separate string).
71    let content_refs: Vec<&[u8]> = ops.iter().map(|op| op.content.as_slice()).collect();
72    let content_block = crate::fsst::encode(&content_refs);
73
74    // Build output.
75    let mut out = Vec::new();
76    out.extend_from_slice(&count.to_le_bytes());
77    out.extend_from_slice(&(actor_dict.len() as u16).to_le_bytes());
78    for &actor in &actor_dict {
79        out.extend_from_slice(&actor.to_le_bytes());
80    }
81    out.extend_from_slice(&(lamport_block.len() as u32).to_le_bytes());
82    out.extend_from_slice(&lamport_block);
83    out.push(if use_u8 { 1 } else { 2 }); // index width marker
84    out.extend_from_slice(&(actor_indices.len() as u32).to_le_bytes());
85    out.extend_from_slice(&actor_indices);
86    out.extend_from_slice(&(content_block.len() as u32).to_le_bytes());
87    out.extend_from_slice(&content_block);
88
89    Ok(out)
90}
91
92/// Decode compressed CRDT operations.
93pub fn decode(data: &[u8]) -> Result<Vec<CrdtOp>, CodecError> {
94    if data.len() < 4 {
95        return Err(CodecError::Truncated {
96            expected: 4,
97            actual: data.len(),
98        });
99    }
100
101    let count = u32::from_le_bytes([data[0], data[1], data[2], data[3]]) as usize;
102    if count == 0 {
103        return Ok(Vec::new());
104    }
105
106    let mut pos = 4;
107
108    // Actor dictionary.
109    if pos + 2 > data.len() {
110        return Err(CodecError::Truncated {
111            expected: pos + 2,
112            actual: data.len(),
113        });
114    }
115    let actor_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
116    pos += 2;
117
118    let actor_bytes = actor_count * 8;
119    if pos + actor_bytes > data.len() {
120        return Err(CodecError::Truncated {
121            expected: pos + actor_bytes,
122            actual: data.len(),
123        });
124    }
125    let actor_dict: Vec<u64> = data[pos..pos + actor_bytes]
126        .chunks_exact(8)
127        .map(|c| u64::from_le_bytes([c[0], c[1], c[2], c[3], c[4], c[5], c[6], c[7]]))
128        .collect();
129    pos += actor_bytes;
130
131    // Lamport block.
132    if pos + 4 > data.len() {
133        return Err(CodecError::Truncated {
134            expected: pos + 4,
135            actual: data.len(),
136        });
137    }
138    let lamport_size =
139        u32::from_le_bytes([data[pos], data[pos + 1], data[pos + 2], data[pos + 3]]) as usize;
140    pos += 4;
141    if pos + lamport_size > data.len() {
142        return Err(CodecError::Truncated {
143            expected: pos + lamport_size,
144            actual: data.len(),
145        });
146    }
147    let lamports = crate::delta::decode(&data[pos..pos + lamport_size])?;
148    pos += lamport_size;
149
150    // Actor index width + data.
151    if pos >= data.len() {
152        return Err(CodecError::Truncated {
153            expected: pos + 1,
154            actual: data.len(),
155        });
156    }
157    let index_width = data[pos];
158    pos += 1;
159
160    if pos + 4 > data.len() {
161        return Err(CodecError::Truncated {
162            expected: pos + 4,
163            actual: data.len(),
164        });
165    }
166    let index_size =
167        u32::from_le_bytes([data[pos], data[pos + 1], data[pos + 2], data[pos + 3]]) as usize;
168    pos += 4;
169    if pos + index_size > data.len() {
170        return Err(CodecError::Truncated {
171            expected: pos + index_size,
172            actual: data.len(),
173        });
174    }
175    let actor_indices: Vec<usize> = if index_width == 1 {
176        data[pos..pos + index_size]
177            .iter()
178            .map(|&b| b as usize)
179            .collect()
180    } else {
181        data[pos..pos + index_size]
182            .chunks_exact(2)
183            .map(|c| u16::from_le_bytes([c[0], c[1]]) as usize)
184            .collect()
185    };
186    pos += index_size;
187
188    // Content block.
189    if pos + 4 > data.len() {
190        return Err(CodecError::Truncated {
191            expected: pos + 4,
192            actual: data.len(),
193        });
194    }
195    let content_size =
196        u32::from_le_bytes([data[pos], data[pos + 1], data[pos + 2], data[pos + 3]]) as usize;
197    pos += 4;
198    if pos + content_size > data.len() {
199        return Err(CodecError::Truncated {
200            expected: pos + content_size,
201            actual: data.len(),
202        });
203    }
204    let contents = crate::fsst::decode(&data[pos..pos + content_size])?;
205
206    // Reconstruct ops.
207    let mut ops = Vec::with_capacity(count);
208    for i in 0..count {
209        let lamport = if i < lamports.len() {
210            lamports[i] as u64
211        } else {
212            0
213        };
214        let actor_idx = if i < actor_indices.len() {
215            actor_indices[i]
216        } else {
217            0
218        };
219        let actor_id = if actor_idx < actor_dict.len() {
220            actor_dict[actor_idx]
221        } else {
222            0
223        };
224        let content = if i < contents.len() {
225            contents[i].clone()
226        } else {
227            Vec::new()
228        };
229
230        ops.push(CrdtOp {
231            lamport,
232            actor_id,
233            content,
234        });
235    }
236
237    Ok(ops)
238}
239
#[cfg(test)]
mod tests {
    use super::*;

    /// Assert that two op slices match field by field, reporting the first mismatch.
    fn assert_ops_eq(expected: &[CrdtOp], actual: &[CrdtOp]) {
        assert_eq!(expected.len(), actual.len(), "op count mismatch");
        for (i, (e, a)) in expected.iter().zip(actual).enumerate() {
            assert_eq!(e.lamport, a.lamport, "lamport mismatch at op {i}");
            assert_eq!(e.actor_id, a.actor_id, "actor_id mismatch at op {i}");
            assert_eq!(e.content, a.content, "content mismatch at op {i}");
        }
    }

    /// Read the actor-index width marker from a non-empty encoded batch.
    /// The offset follows the documented wire format.
    fn index_width_marker(encoded: &[u8]) -> u8 {
        let actor_count = u16::from_le_bytes([encoded[4], encoded[5]]) as usize;
        let lamport_len_pos = 6 + actor_count * 8;
        let lamport_len = u32::from_le_bytes([
            encoded[lamport_len_pos],
            encoded[lamport_len_pos + 1],
            encoded[lamport_len_pos + 2],
            encoded[lamport_len_pos + 3],
        ]) as usize;
        encoded[lamport_len_pos + 4 + lamport_len]
    }

    /// `n` ops, each from a distinct actor.
    fn distinct_actor_ops(n: u64) -> Vec<CrdtOp> {
        (0..n)
            .map(|i| CrdtOp {
                lamport: i + 1,
                actor_id: 0xDEAD_0000 + i,
                content: format!("op {i}").into_bytes(),
            })
            .collect()
    }

    fn sample_ops() -> Vec<CrdtOp> {
        vec![
            CrdtOp {
                lamport: 1,
                actor_id: 100,
                content: b"insert 'hello'".to_vec(),
            },
            CrdtOp {
                lamport: 2,
                actor_id: 100,
                content: b"insert ' world'".to_vec(),
            },
            CrdtOp {
                lamport: 3,
                actor_id: 200,
                content: b"delete [0..5]".to_vec(),
            },
        ]
    }

    #[test]
    fn empty_roundtrip() {
        let encoded = encode(&[]).unwrap();
        assert_eq!(encoded, 0u32.to_le_bytes());
        let decoded = decode(&encoded).unwrap();
        assert!(decoded.is_empty());
    }

    #[test]
    fn basic_roundtrip() {
        let ops = sample_ops();
        let encoded = encode(&ops).unwrap();
        let decoded = decode(&encoded).unwrap();
        assert_ops_eq(&ops, &decoded);
    }

    #[test]
    fn compression_with_many_ops() {
        let ops: Vec<CrdtOp> = (0..1000)
            .map(|i| CrdtOp {
                lamport: i,
                actor_id: i % 5, // 5 actors
                content: format!("op-{i}: set key_{} = value_{}", i % 50, i).into_bytes(),
            })
            .collect();
        let encoded = encode(&ops).unwrap();
        let decoded = decode(&encoded).unwrap();
        assert_ops_eq(&ops, &decoded);

        // Should compress well: monotonic lamports, few actors, repetitive content.
        let raw_size: usize = ops.iter().map(|op| 16 + op.content.len()).sum();
        let ratio = raw_size as f64 / encoded.len() as f64;
        assert!(
            ratio > 1.2,
            "CRDT ops should compress >1.2x, got {ratio:.2}x"
        );
    }

    #[test]
    fn actor_dictionary_dedup() {
        let ops: Vec<CrdtOp> = (0..100)
            .map(|i| CrdtOp {
                lamport: i,
                actor_id: 42, // single actor
                content: b"x".to_vec(),
            })
            .collect();
        let encoded = encode(&ops).unwrap();
        // Exactly one dictionary entry despite 100 ops.
        assert_eq!(u16::from_le_bytes([encoded[4], encoded[5]]), 1);

        let decoded = decode(&encoded).unwrap();
        assert_ops_eq(&ops, &decoded);
    }

    #[test]
    fn u8_indices_up_to_256_actors() {
        let ops = distinct_actor_ops(256);
        let encoded = encode(&ops).unwrap();
        assert_eq!(index_width_marker(&encoded), 1);
        assert_ops_eq(&ops, &decode(&encoded).unwrap());
    }

    #[test]
    fn u16_indices_above_256_actors() {
        let ops = distinct_actor_ops(300);
        let encoded = encode(&ops).unwrap();
        assert_eq!(u16::from_le_bytes([encoded[4], encoded[5]]), 300);
        assert_eq!(index_width_marker(&encoded), 2);
        assert_ops_eq(&ops, &decode(&encoded).unwrap());
    }

    #[test]
    fn truncated_input_is_rejected() {
        let encoded = encode(&sample_ops()).unwrap();
        for len in 0..encoded.len() {
            assert!(
                decode(&encoded[..len]).is_err(),
                "prefix of {len} bytes decoded successfully"
            );
        }
    }
}