Skip to main content

nodedb_codec/
crdt_compress.rs

1// SPDX-License-Identifier: Apache-2.0
2
3//! CRDT state compression for Loro deltas.
4//!
5//! CRDT operations have a specific data signature:
6//! - **Lamport timestamps**: monotonically increasing → Delta → FastLanes
7//! - **Actor IDs**: purely entropic, but few unique actors → dictionary dedup
8//! - **Content (text edits, JSON)**: contiguous → RLE + FSST for strings
9//!
10//! This module compresses CRDT operation batches for:
11//! - Sync bandwidth reduction (Pattern B)
12//! - Long-term storage efficiency (Pattern C)
13
14use crate::error::CodecError;
15
/// A single CRDT operation, as fed to [`encode`] and returned by [`decode`].
///
/// Derives `PartialEq`/`Eq` so callers can compare a decoded batch against the
/// original directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CrdtOp {
    /// Lamport timestamp.
    pub lamport: u64,
    /// Actor ID (hash or index into actor dictionary).
    pub actor_id: u64,
    /// Operation payload (text content, JSON fragment, etc.).
    pub content: Vec<u8>,
}
26
27/// Compressed CRDT operation batch.
28///
29/// Wire format:
30/// ```text
31/// [4 bytes] op count (LE u32)
32/// [2 bytes] actor dictionary size (LE u16)
33/// [actor_count × 8 bytes] actor IDs (LE u64)
34/// [N bytes] Delta-encoded Lamport timestamps (nodedb-codec delta format)
35/// [4 bytes] actor_index block size (LE u32)
36/// [M bytes] actor indices (u8 if ≤256 actors, u16 otherwise)
37/// [4 bytes] content block size (LE u32)
38/// [K bytes] FSST-compressed content (newline-delimited)
39/// ```
40pub fn encode(ops: &[CrdtOp]) -> Result<Vec<u8>, CodecError> {
41    if ops.is_empty() {
42        return Ok(0u32.to_le_bytes().to_vec());
43    }
44
45    let count = ops.len() as u32;
46
47    // Build actor dictionary.
48    let mut actor_dict: Vec<u64> = Vec::new();
49    let mut actor_map = std::collections::HashMap::new();
50    for op in ops {
51        actor_map.entry(op.actor_id).or_insert_with(|| {
52            let idx = actor_dict.len() as u16;
53            actor_dict.push(op.actor_id);
54            idx
55        });
56    }
57
58    // Delta-encode Lamport timestamps.
59    let lamports: Vec<i64> = ops.iter().map(|op| op.lamport as i64).collect();
60    let lamport_block = crate::delta::encode(&lamports);
61
62    // Actor indices.
63    let use_u8 = actor_dict.len() <= 256;
64    let actor_indices: Vec<u8> = if use_u8 {
65        ops.iter().map(|op| actor_map[&op.actor_id] as u8).collect()
66    } else {
67        ops.iter()
68            .flat_map(|op| actor_map[&op.actor_id].to_le_bytes())
69            .collect()
70    };
71
72    // FSST-compress content (treat each op's content as a separate string).
73    let content_refs: Vec<&[u8]> = ops.iter().map(|op| op.content.as_slice()).collect();
74    let content_block = crate::fsst::encode(&content_refs);
75
76    // Build output.
77    let mut out = Vec::new();
78    out.extend_from_slice(&count.to_le_bytes());
79    out.extend_from_slice(&(actor_dict.len() as u16).to_le_bytes());
80    for &actor in &actor_dict {
81        out.extend_from_slice(&actor.to_le_bytes());
82    }
83    out.extend_from_slice(&(lamport_block.len() as u32).to_le_bytes());
84    out.extend_from_slice(&lamport_block);
85    out.push(if use_u8 { 1 } else { 2 }); // index width marker
86    out.extend_from_slice(&(actor_indices.len() as u32).to_le_bytes());
87    out.extend_from_slice(&actor_indices);
88    out.extend_from_slice(&(content_block.len() as u32).to_le_bytes());
89    out.extend_from_slice(&content_block);
90
91    Ok(out)
92}
93
94/// Decode compressed CRDT operations.
95pub fn decode(data: &[u8]) -> Result<Vec<CrdtOp>, CodecError> {
96    if data.len() < 4 {
97        return Err(CodecError::Truncated {
98            expected: 4,
99            actual: data.len(),
100        });
101    }
102
103    let count = u32::from_le_bytes([data[0], data[1], data[2], data[3]]) as usize;
104    if count == 0 {
105        return Ok(Vec::new());
106    }
107
108    let mut pos = 4;
109
110    // Actor dictionary.
111    if pos + 2 > data.len() {
112        return Err(CodecError::Truncated {
113            expected: pos + 2,
114            actual: data.len(),
115        });
116    }
117    let actor_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
118    pos += 2;
119
120    let actor_bytes = actor_count * 8;
121    if pos + actor_bytes > data.len() {
122        return Err(CodecError::Truncated {
123            expected: pos + actor_bytes,
124            actual: data.len(),
125        });
126    }
127    let actor_dict: Vec<u64> = data[pos..pos + actor_bytes]
128        .chunks_exact(8)
129        .map(|c| u64::from_le_bytes([c[0], c[1], c[2], c[3], c[4], c[5], c[6], c[7]]))
130        .collect();
131    pos += actor_bytes;
132
133    // Lamport block.
134    if pos + 4 > data.len() {
135        return Err(CodecError::Truncated {
136            expected: pos + 4,
137            actual: data.len(),
138        });
139    }
140    let lamport_size =
141        u32::from_le_bytes([data[pos], data[pos + 1], data[pos + 2], data[pos + 3]]) as usize;
142    pos += 4;
143    if pos + lamport_size > data.len() {
144        return Err(CodecError::Truncated {
145            expected: pos + lamport_size,
146            actual: data.len(),
147        });
148    }
149    let lamports = crate::delta::decode(&data[pos..pos + lamport_size])?;
150    pos += lamport_size;
151
152    // Actor index width + data.
153    if pos >= data.len() {
154        return Err(CodecError::Truncated {
155            expected: pos + 1,
156            actual: data.len(),
157        });
158    }
159    let index_width = data[pos];
160    pos += 1;
161
162    if pos + 4 > data.len() {
163        return Err(CodecError::Truncated {
164            expected: pos + 4,
165            actual: data.len(),
166        });
167    }
168    let index_size =
169        u32::from_le_bytes([data[pos], data[pos + 1], data[pos + 2], data[pos + 3]]) as usize;
170    pos += 4;
171    if pos + index_size > data.len() {
172        return Err(CodecError::Truncated {
173            expected: pos + index_size,
174            actual: data.len(),
175        });
176    }
177    let actor_indices: Vec<usize> = if index_width == 1 {
178        data[pos..pos + index_size]
179            .iter()
180            .map(|&b| b as usize)
181            .collect()
182    } else {
183        data[pos..pos + index_size]
184            .chunks_exact(2)
185            .map(|c| u16::from_le_bytes([c[0], c[1]]) as usize)
186            .collect()
187    };
188    pos += index_size;
189
190    // Content block.
191    if pos + 4 > data.len() {
192        return Err(CodecError::Truncated {
193            expected: pos + 4,
194            actual: data.len(),
195        });
196    }
197    let content_size =
198        u32::from_le_bytes([data[pos], data[pos + 1], data[pos + 2], data[pos + 3]]) as usize;
199    pos += 4;
200    if pos + content_size > data.len() {
201        return Err(CodecError::Truncated {
202            expected: pos + content_size,
203            actual: data.len(),
204        });
205    }
206    let contents = crate::fsst::decode(&data[pos..pos + content_size])?;
207
208    // Reconstruct ops.
209    let mut ops = Vec::with_capacity(count);
210    for i in 0..count {
211        let lamport = if i < lamports.len() {
212            lamports[i] as u64
213        } else {
214            0
215        };
216        let actor_idx = if i < actor_indices.len() {
217            actor_indices[i]
218        } else {
219            0
220        };
221        let actor_id = if actor_idx < actor_dict.len() {
222            actor_dict[actor_idx]
223        } else {
224            0
225        };
226        let content = if i < contents.len() {
227            contents[i].clone()
228        } else {
229            Vec::new()
230        };
231
232        ops.push(CrdtOp {
233            lamport,
234            actor_id,
235            content,
236        });
237    }
238
239    Ok(ops)
240}
241
#[cfg(test)]
mod tests {
    use super::*;

    /// Assert that `decoded` matches `ops` field by field.
    fn assert_ops_eq(ops: &[CrdtOp], decoded: &[CrdtOp]) {
        assert_eq!(decoded.len(), ops.len());
        for (i, (orig, dec)) in ops.iter().zip(decoded).enumerate() {
            assert_eq!(orig.lamport, dec.lamport, "lamport mismatch at op {i}");
            assert_eq!(orig.actor_id, dec.actor_id, "actor_id mismatch at op {i}");
            assert_eq!(orig.content, dec.content, "content mismatch at op {i}");
        }
    }

    #[test]
    fn empty_roundtrip() {
        let encoded = encode(&[]).unwrap();
        let decoded = decode(&encoded).unwrap();
        assert!(decoded.is_empty());
    }

    #[test]
    fn basic_roundtrip() {
        let ops = vec![
            CrdtOp {
                lamport: 1,
                actor_id: 100,
                content: b"insert 'hello'".to_vec(),
            },
            CrdtOp {
                lamport: 2,
                actor_id: 100,
                content: b"insert ' world'".to_vec(),
            },
            CrdtOp {
                lamport: 3,
                actor_id: 200,
                content: b"delete [0..5]".to_vec(),
            },
        ];
        let encoded = encode(&ops).unwrap();
        let decoded = decode(&encoded).unwrap();

        assert_ops_eq(&ops, &decoded);
    }

    #[test]
    fn compression_with_many_ops() {
        let mut ops = Vec::new();
        for i in 0..1000 {
            ops.push(CrdtOp {
                lamport: i,
                actor_id: i % 5, // 5 actors
                content: format!("op-{i}: set key_{} = value_{}", i % 50, i).into_bytes(),
            });
        }
        let encoded = encode(&ops).unwrap();
        let decoded = decode(&encoded).unwrap();

        assert_ops_eq(&ops, &decoded);

        // Should compress well — monotonic lamports + few actors + repetitive content.
        let raw_size: usize = ops.iter().map(|op| 16 + op.content.len()).sum();
        let ratio = raw_size as f64 / encoded.len() as f64;
        assert!(
            ratio > 1.2,
            "CRDT ops should compress >1.2x, got {ratio:.2}x"
        );
    }

    #[test]
    fn actor_dictionary_dedup() {
        let ops: Vec<CrdtOp> = (0..100)
            .map(|i| CrdtOp {
                lamport: i,
                actor_id: 42, // single actor
                content: b"x".to_vec(),
            })
            .collect();
        let encoded = encode(&ops).unwrap();
        let decoded = decode(&encoded).unwrap();

        // Bytes 4..6 hold the actor dictionary size: one entry for one actor.
        assert_eq!(&encoded[4..6], &1u16.to_le_bytes());
        assert_ops_eq(&ops, &decoded);
    }

    #[test]
    fn wide_actor_indices_roundtrip() {
        // More than 256 distinct actors forces the u16 actor-index path.
        let ops: Vec<CrdtOp> = (0..600u64)
            .map(|i| CrdtOp {
                lamport: i,
                actor_id: (i % 300) * 7919,
                content: format!("actor-{}", i % 300).into_bytes(),
            })
            .collect();
        let encoded = encode(&ops).unwrap();
        let decoded = decode(&encoded).unwrap();

        assert_eq!(&encoded[4..6], &300u16.to_le_bytes());
        assert_ops_eq(&ops, &decoded);
    }

    #[test]
    fn truncated_input_is_rejected() {
        let ops = vec![
            CrdtOp {
                lamport: 7,
                actor_id: 1,
                content: b"set a = 1".to_vec(),
            },
            CrdtOp {
                lamport: 8,
                actor_id: 2,
                content: b"set b = 2".to_vec(),
            },
        ];
        let encoded = encode(&ops).unwrap();
        for len in 0..encoded.len() {
            assert!(
                decode(&encoded[..len]).is_err(),
                "prefix of {len} bytes decoded successfully"
            );
        }
    }
}