nodedb_codec/
crdt_compress.rs1use crate::error::CodecError;
13
/// A single CRDT operation as stored by this codec.
///
/// Deriving `PartialEq`/`Eq` lets callers compare whole ops (e.g. after a
/// round-trip) instead of checking every field by hand.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CrdtOp {
    /// Lamport timestamp. The encoder hands it to the delta codec as an `i64`
    /// bit pattern and the decoder casts it back to `u64`.
    pub lamport: u64,
    /// Identifier of the producing actor; deduplicated into a per-batch actor
    /// dictionary when encoded.
    pub actor_id: u64,
    /// Raw operation payload, compressed with the FSST codec when encoded.
    pub content: Vec<u8>,
}
24
25pub fn encode(ops: &[CrdtOp]) -> Result<Vec<u8>, CodecError> {
39 if ops.is_empty() {
40 return Ok(0u32.to_le_bytes().to_vec());
41 }
42
43 let count = ops.len() as u32;
44
45 let mut actor_dict: Vec<u64> = Vec::new();
47 let mut actor_map = std::collections::HashMap::new();
48 for op in ops {
49 actor_map.entry(op.actor_id).or_insert_with(|| {
50 let idx = actor_dict.len() as u16;
51 actor_dict.push(op.actor_id);
52 idx
53 });
54 }
55
56 let lamports: Vec<i64> = ops.iter().map(|op| op.lamport as i64).collect();
58 let lamport_block = crate::delta::encode(&lamports);
59
60 let use_u8 = actor_dict.len() <= 256;
62 let actor_indices: Vec<u8> = if use_u8 {
63 ops.iter().map(|op| actor_map[&op.actor_id] as u8).collect()
64 } else {
65 ops.iter()
66 .flat_map(|op| actor_map[&op.actor_id].to_le_bytes())
67 .collect()
68 };
69
70 let content_refs: Vec<&[u8]> = ops.iter().map(|op| op.content.as_slice()).collect();
72 let content_block = crate::fsst::encode(&content_refs);
73
74 let mut out = Vec::new();
76 out.extend_from_slice(&count.to_le_bytes());
77 out.extend_from_slice(&(actor_dict.len() as u16).to_le_bytes());
78 for &actor in &actor_dict {
79 out.extend_from_slice(&actor.to_le_bytes());
80 }
81 out.extend_from_slice(&(lamport_block.len() as u32).to_le_bytes());
82 out.extend_from_slice(&lamport_block);
83 out.push(if use_u8 { 1 } else { 2 }); out.extend_from_slice(&(actor_indices.len() as u32).to_le_bytes());
85 out.extend_from_slice(&actor_indices);
86 out.extend_from_slice(&(content_block.len() as u32).to_le_bytes());
87 out.extend_from_slice(&content_block);
88
89 Ok(out)
90}
91
92pub fn decode(data: &[u8]) -> Result<Vec<CrdtOp>, CodecError> {
94 if data.len() < 4 {
95 return Err(CodecError::Truncated {
96 expected: 4,
97 actual: data.len(),
98 });
99 }
100
101 let count = u32::from_le_bytes([data[0], data[1], data[2], data[3]]) as usize;
102 if count == 0 {
103 return Ok(Vec::new());
104 }
105
106 let mut pos = 4;
107
108 if pos + 2 > data.len() {
110 return Err(CodecError::Truncated {
111 expected: pos + 2,
112 actual: data.len(),
113 });
114 }
115 let actor_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
116 pos += 2;
117
118 let actor_bytes = actor_count * 8;
119 if pos + actor_bytes > data.len() {
120 return Err(CodecError::Truncated {
121 expected: pos + actor_bytes,
122 actual: data.len(),
123 });
124 }
125 let actor_dict: Vec<u64> = data[pos..pos + actor_bytes]
126 .chunks_exact(8)
127 .map(|c| u64::from_le_bytes([c[0], c[1], c[2], c[3], c[4], c[5], c[6], c[7]]))
128 .collect();
129 pos += actor_bytes;
130
131 if pos + 4 > data.len() {
133 return Err(CodecError::Truncated {
134 expected: pos + 4,
135 actual: data.len(),
136 });
137 }
138 let lamport_size =
139 u32::from_le_bytes([data[pos], data[pos + 1], data[pos + 2], data[pos + 3]]) as usize;
140 pos += 4;
141 if pos + lamport_size > data.len() {
142 return Err(CodecError::Truncated {
143 expected: pos + lamport_size,
144 actual: data.len(),
145 });
146 }
147 let lamports = crate::delta::decode(&data[pos..pos + lamport_size])?;
148 pos += lamport_size;
149
150 if pos >= data.len() {
152 return Err(CodecError::Truncated {
153 expected: pos + 1,
154 actual: data.len(),
155 });
156 }
157 let index_width = data[pos];
158 pos += 1;
159
160 if pos + 4 > data.len() {
161 return Err(CodecError::Truncated {
162 expected: pos + 4,
163 actual: data.len(),
164 });
165 }
166 let index_size =
167 u32::from_le_bytes([data[pos], data[pos + 1], data[pos + 2], data[pos + 3]]) as usize;
168 pos += 4;
169 if pos + index_size > data.len() {
170 return Err(CodecError::Truncated {
171 expected: pos + index_size,
172 actual: data.len(),
173 });
174 }
175 let actor_indices: Vec<usize> = if index_width == 1 {
176 data[pos..pos + index_size]
177 .iter()
178 .map(|&b| b as usize)
179 .collect()
180 } else {
181 data[pos..pos + index_size]
182 .chunks_exact(2)
183 .map(|c| u16::from_le_bytes([c[0], c[1]]) as usize)
184 .collect()
185 };
186 pos += index_size;
187
188 if pos + 4 > data.len() {
190 return Err(CodecError::Truncated {
191 expected: pos + 4,
192 actual: data.len(),
193 });
194 }
195 let content_size =
196 u32::from_le_bytes([data[pos], data[pos + 1], data[pos + 2], data[pos + 3]]) as usize;
197 pos += 4;
198 if pos + content_size > data.len() {
199 return Err(CodecError::Truncated {
200 expected: pos + content_size,
201 actual: data.len(),
202 });
203 }
204 let contents = crate::fsst::decode(&data[pos..pos + content_size])?;
205
206 let mut ops = Vec::with_capacity(count);
208 for i in 0..count {
209 let lamport = if i < lamports.len() {
210 lamports[i] as u64
211 } else {
212 0
213 };
214 let actor_idx = if i < actor_indices.len() {
215 actor_indices[i]
216 } else {
217 0
218 };
219 let actor_id = if actor_idx < actor_dict.len() {
220 actor_dict[actor_idx]
221 } else {
222 0
223 };
224 let content = if i < contents.len() {
225 contents[i].clone()
226 } else {
227 Vec::new()
228 };
229
230 ops.push(CrdtOp {
231 lamport,
232 actor_id,
233 content,
234 });
235 }
236
237 Ok(ops)
238}
239
#[cfg(test)]
mod tests {
    use super::*;

    /// Asserts that `decoded` matches `expected` field by field.
    fn assert_ops_eq(expected: &[CrdtOp], decoded: &[CrdtOp]) {
        assert_eq!(expected.len(), decoded.len(), "op count mismatch");
        for (i, (orig, dec)) in expected.iter().zip(decoded).enumerate() {
            assert_eq!(orig.lamport, dec.lamport, "lamport mismatch at op {i}");
            assert_eq!(orig.actor_id, dec.actor_id, "actor_id mismatch at op {i}");
            assert_eq!(orig.content, dec.content, "content mismatch at op {i}");
        }
    }

    #[test]
    fn empty_roundtrip() {
        let encoded = encode(&[]).unwrap();
        let decoded = decode(&encoded).unwrap();
        assert!(decoded.is_empty());
    }

    #[test]
    fn basic_roundtrip() {
        let ops = vec![
            CrdtOp {
                lamport: 1,
                actor_id: 100,
                content: b"insert 'hello'".to_vec(),
            },
            CrdtOp {
                lamport: 2,
                actor_id: 100,
                content: b"insert ' world'".to_vec(),
            },
            CrdtOp {
                lamport: 3,
                actor_id: 200,
                content: b"delete [0..5]".to_vec(),
            },
        ];
        let encoded = encode(&ops).unwrap();
        let decoded = decode(&encoded).unwrap();

        assert_eq!(decoded.len(), 3);
        assert_eq!(decoded[0].lamport, 1);
        assert_eq!(decoded[0].actor_id, 100);
        assert_eq!(decoded[0].content, b"insert 'hello'");
        assert_eq!(decoded[2].actor_id, 200);
        assert_ops_eq(&ops, &decoded);
    }

    #[test]
    fn compression_with_many_ops() {
        let mut ops = Vec::new();
        for i in 0..1000 {
            ops.push(CrdtOp {
                lamport: i,
                actor_id: i % 5,
                content: format!("op-{i}: set key_{} = value_{}", i % 50, i).into_bytes(),
            });
        }
        let encoded = encode(&ops).unwrap();
        let decoded = decode(&encoded).unwrap();

        assert_ops_eq(&ops, &decoded);

        let raw_size: usize = ops.iter().map(|op| 16 + op.content.len()).sum();
        let ratio = raw_size as f64 / encoded.len() as f64;
        assert!(
            ratio > 1.2,
            "CRDT ops should compress >1.2x, got {ratio:.2}x"
        );
    }

    #[test]
    fn actor_dictionary_dedup() {
        let ops: Vec<CrdtOp> = (0..100)
            .map(|i| CrdtOp {
                lamport: i,
                actor_id: 42,
                content: b"x".to_vec(),
            })
            .collect();
        let encoded = encode(&ops).unwrap();
        // Header: u32 op count, then the u16 actor-dictionary size.
        assert_eq!(u16::from_le_bytes([encoded[4], encoded[5]]), 1);

        let decoded = decode(&encoded).unwrap();
        assert_eq!(decoded.len(), 100);
        for op in &decoded {
            assert_eq!(op.actor_id, 42);
        }
    }

    #[test]
    fn wide_actor_indices_roundtrip() {
        // More than 256 distinct actors forces the two-byte index encoding.
        let ops: Vec<CrdtOp> = (0..600u64)
            .map(|i| CrdtOp {
                lamport: i * 3,
                actor_id: 1_000_000 + i % 300,
                content: format!("op {i}").into_bytes(),
            })
            .collect();
        let encoded = encode(&ops).unwrap();
        assert_eq!(u16::from_le_bytes([encoded[4], encoded[5]]), 300);

        let decoded = decode(&encoded).unwrap();
        assert_ops_eq(&ops, &decoded);
    }

    #[test]
    fn truncated_input_is_rejected() {
        let ops: Vec<CrdtOp> = (0..20u64)
            .map(|i| CrdtOp {
                lamport: i,
                actor_id: i % 3,
                content: vec![b'a' + (i % 26) as u8; 4],
            })
            .collect();
        let encoded = encode(&ops).unwrap();
        for cut in 0..encoded.len() {
            assert!(
                decode(&encoded[..cut]).is_err(),
                "{cut}-byte prefix of a {}-byte buffer decoded successfully",
                encoded.len()
            );
        }
    }
}