nodedb_codec/
crdt_compress.rs1use crate::error::CodecError;
15
/// A single CRDT operation as handled by this codec.
///
/// Ops compare by value (`PartialEq`/`Eq`), so an encoded-then-decoded batch
/// can be checked against its source with a plain `==`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CrdtOp {
    /// Lamport clock value attached to the operation.
    pub lamport: u64,
    /// Identifier of the actor that produced the operation.
    pub actor_id: u64,
    /// Opaque operation payload; the codec never interprets these bytes.
    pub content: Vec<u8>,
}
26
27pub fn encode(ops: &[CrdtOp]) -> Result<Vec<u8>, CodecError> {
41 if ops.is_empty() {
42 return Ok(0u32.to_le_bytes().to_vec());
43 }
44
45 let count = ops.len() as u32;
46
47 let mut actor_dict: Vec<u64> = Vec::new();
49 let mut actor_map = std::collections::HashMap::new();
50 for op in ops {
51 actor_map.entry(op.actor_id).or_insert_with(|| {
52 let idx = actor_dict.len() as u16;
53 actor_dict.push(op.actor_id);
54 idx
55 });
56 }
57
58 let lamports: Vec<i64> = ops.iter().map(|op| op.lamport as i64).collect();
60 let lamport_block = crate::delta::encode(&lamports);
61
62 let use_u8 = actor_dict.len() <= 256;
64 let actor_indices: Vec<u8> = if use_u8 {
65 ops.iter().map(|op| actor_map[&op.actor_id] as u8).collect()
66 } else {
67 ops.iter()
68 .flat_map(|op| actor_map[&op.actor_id].to_le_bytes())
69 .collect()
70 };
71
72 let content_refs: Vec<&[u8]> = ops.iter().map(|op| op.content.as_slice()).collect();
74 let content_block = crate::fsst::encode(&content_refs);
75
76 let mut out = Vec::new();
78 out.extend_from_slice(&count.to_le_bytes());
79 out.extend_from_slice(&(actor_dict.len() as u16).to_le_bytes());
80 for &actor in &actor_dict {
81 out.extend_from_slice(&actor.to_le_bytes());
82 }
83 out.extend_from_slice(&(lamport_block.len() as u32).to_le_bytes());
84 out.extend_from_slice(&lamport_block);
85 out.push(if use_u8 { 1 } else { 2 }); out.extend_from_slice(&(actor_indices.len() as u32).to_le_bytes());
87 out.extend_from_slice(&actor_indices);
88 out.extend_from_slice(&(content_block.len() as u32).to_le_bytes());
89 out.extend_from_slice(&content_block);
90
91 Ok(out)
92}
93
94pub fn decode(data: &[u8]) -> Result<Vec<CrdtOp>, CodecError> {
96 if data.len() < 4 {
97 return Err(CodecError::Truncated {
98 expected: 4,
99 actual: data.len(),
100 });
101 }
102
103 let count = u32::from_le_bytes([data[0], data[1], data[2], data[3]]) as usize;
104 if count == 0 {
105 return Ok(Vec::new());
106 }
107
108 let mut pos = 4;
109
110 if pos + 2 > data.len() {
112 return Err(CodecError::Truncated {
113 expected: pos + 2,
114 actual: data.len(),
115 });
116 }
117 let actor_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
118 pos += 2;
119
120 let actor_bytes = actor_count * 8;
121 if pos + actor_bytes > data.len() {
122 return Err(CodecError::Truncated {
123 expected: pos + actor_bytes,
124 actual: data.len(),
125 });
126 }
127 let actor_dict: Vec<u64> = data[pos..pos + actor_bytes]
128 .chunks_exact(8)
129 .map(|c| u64::from_le_bytes([c[0], c[1], c[2], c[3], c[4], c[5], c[6], c[7]]))
130 .collect();
131 pos += actor_bytes;
132
133 if pos + 4 > data.len() {
135 return Err(CodecError::Truncated {
136 expected: pos + 4,
137 actual: data.len(),
138 });
139 }
140 let lamport_size =
141 u32::from_le_bytes([data[pos], data[pos + 1], data[pos + 2], data[pos + 3]]) as usize;
142 pos += 4;
143 if pos + lamport_size > data.len() {
144 return Err(CodecError::Truncated {
145 expected: pos + lamport_size,
146 actual: data.len(),
147 });
148 }
149 let lamports = crate::delta::decode(&data[pos..pos + lamport_size])?;
150 pos += lamport_size;
151
152 if pos >= data.len() {
154 return Err(CodecError::Truncated {
155 expected: pos + 1,
156 actual: data.len(),
157 });
158 }
159 let index_width = data[pos];
160 pos += 1;
161
162 if pos + 4 > data.len() {
163 return Err(CodecError::Truncated {
164 expected: pos + 4,
165 actual: data.len(),
166 });
167 }
168 let index_size =
169 u32::from_le_bytes([data[pos], data[pos + 1], data[pos + 2], data[pos + 3]]) as usize;
170 pos += 4;
171 if pos + index_size > data.len() {
172 return Err(CodecError::Truncated {
173 expected: pos + index_size,
174 actual: data.len(),
175 });
176 }
177 let actor_indices: Vec<usize> = if index_width == 1 {
178 data[pos..pos + index_size]
179 .iter()
180 .map(|&b| b as usize)
181 .collect()
182 } else {
183 data[pos..pos + index_size]
184 .chunks_exact(2)
185 .map(|c| u16::from_le_bytes([c[0], c[1]]) as usize)
186 .collect()
187 };
188 pos += index_size;
189
190 if pos + 4 > data.len() {
192 return Err(CodecError::Truncated {
193 expected: pos + 4,
194 actual: data.len(),
195 });
196 }
197 let content_size =
198 u32::from_le_bytes([data[pos], data[pos + 1], data[pos + 2], data[pos + 3]]) as usize;
199 pos += 4;
200 if pos + content_size > data.len() {
201 return Err(CodecError::Truncated {
202 expected: pos + content_size,
203 actual: data.len(),
204 });
205 }
206 let contents = crate::fsst::decode(&data[pos..pos + content_size])?;
207
208 let mut ops = Vec::with_capacity(count);
210 for i in 0..count {
211 let lamport = if i < lamports.len() {
212 lamports[i] as u64
213 } else {
214 0
215 };
216 let actor_idx = if i < actor_indices.len() {
217 actor_indices[i]
218 } else {
219 0
220 };
221 let actor_id = if actor_idx < actor_dict.len() {
222 actor_dict[actor_idx]
223 } else {
224 0
225 };
226 let content = if i < contents.len() {
227 contents[i].clone()
228 } else {
229 Vec::new()
230 };
231
232 ops.push(CrdtOp {
233 lamport,
234 actor_id,
235 content,
236 });
237 }
238
239 Ok(ops)
240}
241
#[cfg(test)]
mod tests {
    use super::*;

    /// Asserts that two op sequences have the same length and identical
    /// fields, reporting the position of the first difference.
    fn assert_same_ops(expected: &[CrdtOp], actual: &[CrdtOp]) {
        assert_eq!(expected.len(), actual.len());
        for (i, (want, got)) in expected.iter().zip(actual.iter()).enumerate() {
            assert_eq!(want.lamport, got.lamport, "lamport mismatch at op {i}");
            assert_eq!(want.actor_id, got.actor_id, "actor mismatch at op {i}");
            assert_eq!(want.content, got.content, "content mismatch at op {i}");
        }
    }

    /// Three ops from two actors, shared by several tests.
    fn sample_ops() -> Vec<CrdtOp> {
        vec![
            CrdtOp {
                lamport: 1,
                actor_id: 100,
                content: b"insert 'hello'".to_vec(),
            },
            CrdtOp {
                lamport: 2,
                actor_id: 100,
                content: b"insert ' world'".to_vec(),
            },
            CrdtOp {
                lamport: 3,
                actor_id: 200,
                content: b"delete [0..5]".to_vec(),
            },
        ]
    }

    #[test]
    fn empty_roundtrip() {
        let encoded = encode(&[]).unwrap();
        let decoded = decode(&encoded).unwrap();
        assert!(decoded.is_empty());
    }

    #[test]
    fn basic_roundtrip() {
        let ops = sample_ops();
        let encoded = encode(&ops).unwrap();
        let decoded = decode(&encoded).unwrap();

        assert_eq!(decoded.len(), 3);
        assert_same_ops(&ops, &decoded);
    }

    #[test]
    fn compression_with_many_ops() {
        let ops: Vec<CrdtOp> = (0..1000u64)
            .map(|i| CrdtOp {
                lamport: i,
                actor_id: i % 5,
                content: format!("op-{i}: set key_{} = value_{}", i % 50, i).into_bytes(),
            })
            .collect();
        let encoded = encode(&ops).unwrap();
        let decoded = decode(&encoded).unwrap();

        assert_eq!(decoded.len(), 1000);
        assert_same_ops(&ops, &decoded);

        // Raw size counts 16 bytes (lamport + actor id) plus the payload per op.
        let raw_size: usize = ops.iter().map(|op| 16 + op.content.len()).sum();
        let ratio = raw_size as f64 / encoded.len() as f64;
        assert!(
            ratio > 1.2,
            "CRDT ops should compress >1.2x, got {ratio:.2}x"
        );
    }

    #[test]
    fn actor_dictionary_dedup() {
        let ops: Vec<CrdtOp> = (0..100)
            .map(|i| CrdtOp {
                lamport: i,
                actor_id: 42,
                content: b"x".to_vec(),
            })
            .collect();
        let encoded = encode(&ops).unwrap();

        // Bytes 4..6 hold the dictionary length: one shared actor, one entry.
        assert_eq!(&encoded[4..6], &1u16.to_le_bytes()[..]);

        let decoded = decode(&encoded).unwrap();
        assert_eq!(decoded.len(), 100);
        for op in &decoded {
            assert_eq!(op.actor_id, 42);
        }
    }

    #[test]
    fn wide_actor_index_roundtrip() {
        // More than 256 distinct actors forces the 2-byte index encoding.
        let ops: Vec<CrdtOp> = (0..300u64)
            .map(|i| CrdtOp {
                lamport: i,
                actor_id: 10_000 + i,
                content: format!("op-{i}").into_bytes(),
            })
            .collect();
        let encoded = encode(&ops).unwrap();
        let decoded = decode(&encoded).unwrap();

        assert_same_ops(&ops, &decoded);
    }

    #[test]
    fn truncated_input_is_rejected() {
        assert!(decode(&[]).is_err());
        assert!(decode(&[1, 0, 0]).is_err());

        // Every strict prefix of a valid buffer cuts some field or section short.
        let encoded = encode(&sample_ops()).unwrap();
        for cut in 0..encoded.len() {
            assert!(
                decode(&encoded[..cut]).is_err(),
                "prefix of {cut} bytes should be rejected"
            );
        }
    }
}