1use crate::error::CodecError;
22
/// Block size, in bytes, used when the caller does not choose one (4 KiB).
const DEFAULT_BLOCK_SIZE: usize = 4 * 1024;
25
26pub fn encode(data: &[u8]) -> Vec<u8> {
34 encode_with_block_size(data, DEFAULT_BLOCK_SIZE)
35}
36
37pub fn encode_with_block_size(data: &[u8], block_size: usize) -> Vec<u8> {
39 let block_size = block_size.max(64); let block_count = if data.is_empty() {
41 0
42 } else {
43 data.len().div_ceil(block_size)
44 };
45
46 let mut out = Vec::with_capacity(12 + block_count * 4 + data.len());
48
49 out.extend_from_slice(&(data.len() as u32).to_le_bytes());
51 out.extend_from_slice(&(block_size as u32).to_le_bytes());
52 out.extend_from_slice(&(block_count as u32).to_le_bytes());
53
54 let lengths_offset = out.len();
56 out.resize(lengths_offset + block_count * 4, 0);
57
58 for (i, chunk) in data.chunks(block_size).enumerate() {
60 let compressed = lz4_flex::compress_prepend_size(chunk);
61 let compressed_len = compressed.len() as u32;
62
63 let table_pos = lengths_offset + i * 4;
65 out[table_pos..table_pos + 4].copy_from_slice(&compressed_len.to_le_bytes());
66
67 out.extend_from_slice(&compressed);
69 }
70
71 out
72}
73
74pub fn decode(data: &[u8]) -> Result<Vec<u8>, CodecError> {
76 let header = read_header(data)?;
77
78 if header.block_count == 0 {
79 return Ok(Vec::new());
80 }
81
82 let mut result = Vec::with_capacity(header.uncompressed_size);
83 let mut block_offset = header.data_offset;
84
85 for i in 0..header.block_count {
86 let compressed_len = header.block_lengths[i];
87 let block_end = block_offset + compressed_len;
88
89 if block_end > data.len() {
90 return Err(CodecError::Truncated {
91 expected: block_end,
92 actual: data.len(),
93 });
94 }
95
96 let block_data = &data[block_offset..block_end];
97 let decompressed = lz4_flex::decompress_size_prepended(block_data).map_err(|e| {
98 CodecError::DecompressFailed {
99 detail: format!("LZ4 block {i}: {e}"),
100 }
101 })?;
102
103 result.extend_from_slice(&decompressed);
104 block_offset = block_end;
105 }
106
107 if result.len() != header.uncompressed_size {
108 return Err(CodecError::Corrupt {
109 detail: format!(
110 "uncompressed size mismatch: header says {}, got {}",
111 header.uncompressed_size,
112 result.len()
113 ),
114 });
115 }
116
117 Ok(result)
118}
119
120pub fn decode_block(data: &[u8], block_idx: usize) -> Result<Vec<u8>, CodecError> {
124 let header = read_header(data)?;
125
126 if block_idx >= header.block_count {
127 return Err(CodecError::Corrupt {
128 detail: format!(
129 "block index {block_idx} out of range (block_count={})",
130 header.block_count
131 ),
132 });
133 }
134
135 let mut block_offset = header.data_offset;
137 for i in 0..block_idx {
138 block_offset += header.block_lengths[i];
139 }
140
141 let compressed_len = header.block_lengths[block_idx];
142 let block_end = block_offset + compressed_len;
143
144 if block_end > data.len() {
145 return Err(CodecError::Truncated {
146 expected: block_end,
147 actual: data.len(),
148 });
149 }
150
151 let block_data = &data[block_offset..block_end];
152 lz4_flex::decompress_size_prepended(block_data).map_err(|e| CodecError::DecompressFailed {
153 detail: format!("LZ4 block {block_idx}: {e}"),
154 })
155}
156
157struct Lz4Header {
162 uncompressed_size: usize,
163 block_count: usize,
164 block_lengths: Vec<usize>,
165 data_offset: usize,
167}
168
169fn read_header(data: &[u8]) -> Result<Lz4Header, CodecError> {
170 if data.len() < 12 {
171 return Err(CodecError::Truncated {
172 expected: 12,
173 actual: data.len(),
174 });
175 }
176
177 let uncompressed_size = u32::from_le_bytes([data[0], data[1], data[2], data[3]]) as usize;
178 let _block_size = u32::from_le_bytes([data[4], data[5], data[6], data[7]]) as usize;
179 let block_count = u32::from_le_bytes([data[8], data[9], data[10], data[11]]) as usize;
180
181 let lengths_end = 12 + block_count * 4;
182 if data.len() < lengths_end {
183 return Err(CodecError::Truncated {
184 expected: lengths_end,
185 actual: data.len(),
186 });
187 }
188
189 let block_lengths: Vec<usize> = data[12..lengths_end]
190 .chunks_exact(4)
191 .map(|c| u32::from_le_bytes([c[0], c[1], c[2], c[3]]) as usize)
192 .collect();
193
194 Ok(Lz4Header {
195 uncompressed_size,
196 block_count,
197 block_lengths,
198 data_offset: lengths_end,
199 })
200}
201
202pub struct Lz4Encoder {
208 buf: Vec<u8>,
209 block_size: usize,
210}
211
212impl Lz4Encoder {
213 pub fn new() -> Self {
214 Self {
215 buf: Vec::with_capacity(4096),
216 block_size: DEFAULT_BLOCK_SIZE,
217 }
218 }
219
220 pub fn with_block_size(block_size: usize) -> Self {
221 Self {
222 buf: Vec::with_capacity(block_size),
223 block_size: block_size.max(64),
224 }
225 }
226
227 pub fn push(&mut self, data: &[u8]) {
228 self.buf.extend_from_slice(data);
229 }
230
231 pub fn len(&self) -> usize {
232 self.buf.len()
233 }
234
235 pub fn is_empty(&self) -> bool {
236 self.buf.is_empty()
237 }
238
239 pub fn finish(self) -> Vec<u8> {
240 encode_with_block_size(&self.buf, self.block_size)
241 }
242}
243
244impl Default for Lz4Encoder {
245 fn default() -> Self {
246 Self::new()
247 }
248}
249
250pub struct Lz4Decoder;
252
253impl Lz4Decoder {
254 pub fn decode_all(data: &[u8]) -> Result<Vec<u8>, CodecError> {
256 decode(data)
257 }
258
259 pub fn decode_block(data: &[u8], block_idx: usize) -> Result<Vec<u8>, CodecError> {
261 decode_block(data, block_idx)
262 }
263
264 pub fn block_count(data: &[u8]) -> Result<usize, CodecError> {
266 let header = read_header(data)?;
267 Ok(header.block_count)
268 }
269}
270
#[cfg(test)]
mod tests {
    use super::*;

    /// Encodes `input` with the default block size, checks that decoding
    /// restores it exactly, and returns the encoded bytes for further checks.
    fn roundtrip(input: &[u8]) -> Vec<u8> {
        let encoded = encode(input);
        let decoded = decode(&encoded).unwrap();
        assert_eq!(decoded, input);
        encoded
    }

    #[test]
    fn empty_roundtrip() {
        let decoded = decode(&encode(&[])).unwrap();
        assert!(decoded.is_empty());
    }

    #[test]
    fn small_data_roundtrip() {
        roundtrip(b"hello world, this is a log message");
    }

    #[test]
    fn large_data_multiple_blocks() {
        // 1000 structured log lines: long enough to span many 4 KiB blocks.
        let data: Vec<u8> = (0..1000)
            .map(|i| {
                format!(
                    "2024-01-15T10:30:{:02}.000Z INFO server.handler request_id={} method=GET path=/api/v1/metrics status=200 duration_ms={}\n",
                    i % 60,
                    10000 + i,
                    i * 3 + 1
                )
            })
            .collect::<String>()
            .into_bytes();

        let encoded = roundtrip(&data);

        let ratio = data.len() as f64 / encoded.len() as f64;
        assert!(
            ratio > 2.0,
            "expected >2x compression for structured logs, got {ratio:.1}x"
        );
    }

    #[test]
    fn random_access_block() {
        let block_size = 4096;
        let data: Vec<u8> = (0..20000).map(|n| (n % 256) as u8).collect();
        let encoded = encode_with_block_size(&data, block_size);

        let block_count = Lz4Decoder::block_count(&encoded).unwrap();
        assert_eq!(block_count, data.len().div_ceil(block_size));

        // Decoding every block on its own and concatenating must give back
        // the original input.
        let reassembled: Vec<u8> = (0..block_count)
            .flat_map(|idx| decode_block(&encoded, idx).unwrap())
            .collect();
        assert_eq!(reassembled, data);
    }

    #[test]
    fn out_of_range_block_index() {
        let encoded = encode(b"some data here");
        assert!(decode_block(&encoded, 999).is_err());
    }

    #[test]
    fn compressible_log_data() {
        let line = "2024-01-15 ERROR database connection timeout host=db-prod-01 retry=3\n";
        let data: Vec<u8> = line.as_bytes().repeat(500);

        let encoded = roundtrip(&data);

        let ratio = data.len() as f64 / encoded.len() as f64;
        assert!(
            ratio > 3.0,
            "highly repetitive logs should compress >3x, got {ratio:.1}x"
        );
    }

    #[test]
    fn incompressible_data() {
        // Deterministic pseudo-random bytes from a simple LCG.
        let mut state: u64 = 9999;
        let data: Vec<u8> = (0..10_000)
            .map(|_| {
                state = state.wrapping_mul(6364136223846793005).wrapping_add(1);
                (state >> 33) as u8
            })
            .collect();

        roundtrip(&data);
    }

    #[test]
    fn streaming_encoder() {
        let parts: [&[u8]; 3] = [b"hello ", b"world", b" test"];
        let full: Vec<u8> = parts.concat();

        let mut enc = Lz4Encoder::new();
        parts.iter().for_each(|part| enc.push(part));

        let decoded = decode(&enc.finish()).unwrap();
        assert_eq!(decoded, full);
    }

    #[test]
    fn custom_block_size() {
        let data = vec![42u8; 10_000];
        let encoded = encode_with_block_size(&data, 1024);

        assert_eq!(decode(&encoded).unwrap(), data);
        // 10_000 bytes in 1024-byte blocks: nine full blocks plus a partial one.
        assert_eq!(Lz4Decoder::block_count(&encoded).unwrap(), 10);
    }

    #[test]
    fn truncated_input_errors() {
        // Both inputs are shorter than the 12-byte fixed header.
        for input in [&[][..], &[0u8; 8][..]] {
            assert!(decode(input).is_err());
        }
    }
}