1use crate::ColumnCodec;
15use crate::error::CodecError;
16
17pub fn encode_i64_pipeline(values: &[i64], codec: ColumnCodec) -> Result<Vec<u8>, CodecError> {
26 match codec {
27 ColumnCodec::DeltaFastLanesLz4 => encode_delta_fastlanes_lz4(values),
29 ColumnCodec::FastLanesLz4 => encode_fastlanes_lz4_i64(values),
30 ColumnCodec::PcodecLz4 => {
31 let pco_bytes = crate::pcodec::encode_i64(values)?;
32 Ok(crate::lz4::encode(&pco_bytes))
33 }
34 ColumnCodec::DeltaFastLanesRans => encode_delta_fastlanes_rans(values),
36 ColumnCodec::DoubleDelta => Ok(crate::double_delta::encode(values)),
38 ColumnCodec::Delta => Ok(crate::delta::encode(values)),
39 ColumnCodec::Gorilla => Ok(crate::gorilla::encode_timestamps(values)),
40 ColumnCodec::Raw => {
41 let raw: Vec<u8> = values.iter().flat_map(|v| v.to_le_bytes()).collect();
42 Ok(crate::raw::encode(&raw))
43 }
44 ColumnCodec::Lz4 => {
45 let raw: Vec<u8> = values.iter().flat_map(|v| v.to_le_bytes()).collect();
46 Ok(crate::lz4::encode(&raw))
47 }
48 ColumnCodec::Zstd => {
49 let raw: Vec<u8> = values.iter().flat_map(|v| v.to_le_bytes()).collect();
50 crate::zstd_codec::encode(&raw)
51 }
52 _ => Ok(crate::delta::encode(values)),
53 }
54}
55
56pub fn encode_f64_pipeline(values: &[f64], codec: ColumnCodec) -> Result<Vec<u8>, CodecError> {
58 match codec {
59 ColumnCodec::AlpFastLanesLz4 => encode_alp_fastlanes_lz4(values),
61 ColumnCodec::AlpRdLz4 => {
62 let alp_rd_bytes = crate::alp_rd::encode(values)?;
63 Ok(crate::lz4::encode(&alp_rd_bytes))
64 }
65 ColumnCodec::PcodecLz4 => {
66 let pco_bytes = crate::pcodec::encode_f64(values)?;
67 Ok(crate::lz4::encode(&pco_bytes))
68 }
69 ColumnCodec::AlpFastLanesRans => encode_alp_fastlanes_rans(values),
71 ColumnCodec::Gorilla => Ok(crate::gorilla::encode_f64(values)),
73 ColumnCodec::Raw => {
74 let raw: Vec<u8> = values.iter().flat_map(|v| v.to_le_bytes()).collect();
75 Ok(crate::raw::encode(&raw))
76 }
77 ColumnCodec::Lz4 => {
78 let raw: Vec<u8> = values.iter().flat_map(|v| v.to_le_bytes()).collect();
79 Ok(crate::lz4::encode(&raw))
80 }
81 ColumnCodec::Zstd => {
82 let raw: Vec<u8> = values.iter().flat_map(|v| v.to_le_bytes()).collect();
83 crate::zstd_codec::encode(&raw)
84 }
85 _ => Ok(crate::gorilla::encode_f64(values)),
86 }
87}
88
89pub fn encode_bytes_pipeline(raw: &[u8], codec: ColumnCodec) -> Result<Vec<u8>, CodecError> {
91 match codec {
92 ColumnCodec::FsstLz4 => {
93 let fsst_bytes = crate::fsst::encode(&[raw]);
95 Ok(crate::lz4::encode(&fsst_bytes))
96 }
97 ColumnCodec::FsstRans => {
98 let fsst_bytes = crate::fsst::encode(&[raw]);
99 Ok(crate::rans::encode(&fsst_bytes))
100 }
101 ColumnCodec::Raw => Ok(crate::raw::encode(raw)),
102 ColumnCodec::Lz4 => Ok(crate::lz4::encode(raw)),
103 ColumnCodec::Zstd => crate::zstd_codec::encode(raw),
104 ColumnCodec::FastLanesLz4 => {
105 if raw.len().is_multiple_of(4) {
107 let i64_vals: Vec<i64> = raw
108 .chunks_exact(4)
109 .map(|c| u32::from_le_bytes([c[0], c[1], c[2], c[3]]) as i64)
110 .collect();
111 encode_fastlanes_lz4_i64(&i64_vals)
112 } else {
113 Ok(crate::raw::encode(raw))
114 }
115 }
116 _ => Ok(crate::raw::encode(raw)),
117 }
118}
119
120pub fn decode_i64_pipeline(data: &[u8], codec: ColumnCodec) -> Result<Vec<i64>, CodecError> {
126 match codec {
127 ColumnCodec::DeltaFastLanesLz4 => decode_delta_fastlanes_lz4(data),
129 ColumnCodec::FastLanesLz4 => decode_fastlanes_lz4_i64(data),
130 ColumnCodec::PcodecLz4 => {
131 let pco_bytes = crate::lz4::decode(data)?;
132 crate::pcodec::decode_i64(&pco_bytes)
133 }
134 ColumnCodec::DeltaFastLanesRans => decode_delta_fastlanes_rans(data),
136 ColumnCodec::DoubleDelta => crate::double_delta::decode(data),
138 ColumnCodec::Delta => crate::delta::decode(data),
139 ColumnCodec::Gorilla => crate::gorilla::decode_timestamps(data),
140 ColumnCodec::Raw => {
141 let raw = crate::raw::decode(data)?;
142 raw_to_i64(&raw)
143 }
144 ColumnCodec::Lz4 => {
145 let raw = crate::lz4::decode(data)?;
146 raw_to_i64(&raw)
147 }
148 ColumnCodec::Zstd => {
149 let raw = crate::zstd_codec::decode(data)?;
150 raw_to_i64(&raw)
151 }
152 _ => crate::delta::decode(data),
153 }
154}
155
156pub fn decode_f64_pipeline(data: &[u8], codec: ColumnCodec) -> Result<Vec<f64>, CodecError> {
158 match codec {
159 ColumnCodec::AlpFastLanesLz4 => decode_alp_fastlanes_lz4(data),
161 ColumnCodec::AlpRdLz4 => {
162 let alp_rd_bytes = crate::lz4::decode(data)?;
163 crate::alp_rd::decode(&alp_rd_bytes)
164 }
165 ColumnCodec::PcodecLz4 => {
166 let pco_bytes = crate::lz4::decode(data)?;
167 crate::pcodec::decode_f64(&pco_bytes)
168 }
169 ColumnCodec::AlpFastLanesRans => decode_alp_fastlanes_rans(data),
171 ColumnCodec::Gorilla => crate::gorilla::decode_f64(data),
173 ColumnCodec::Raw => {
174 let raw = crate::raw::decode(data)?;
175 raw_to_f64(&raw)
176 }
177 ColumnCodec::Lz4 => {
178 let raw = crate::lz4::decode(data)?;
179 raw_to_f64(&raw)
180 }
181 ColumnCodec::Zstd => {
182 let raw = crate::zstd_codec::decode(data)?;
183 raw_to_f64(&raw)
184 }
185 _ => crate::gorilla::decode_f64(data),
186 }
187}
188
189pub fn decode_bytes_pipeline(data: &[u8], codec: ColumnCodec) -> Result<Vec<u8>, CodecError> {
191 match codec {
192 ColumnCodec::FsstLz4 => {
193 let fsst_bytes = crate::lz4::decode(data)?;
194 let strings = crate::fsst::decode(&fsst_bytes)?;
195 strings.into_iter().next().ok_or(CodecError::Corrupt {
196 detail: "FSST decode produced no output".into(),
197 })
198 }
199 ColumnCodec::FsstRans => {
200 let fsst_bytes = crate::rans::decode(data)?;
201 let strings = crate::fsst::decode(&fsst_bytes)?;
202 strings.into_iter().next().ok_or(CodecError::Corrupt {
203 detail: "FSST decode produced no output".into(),
204 })
205 }
206 ColumnCodec::Raw => crate::raw::decode(data),
207 ColumnCodec::Lz4 => crate::lz4::decode(data),
208 ColumnCodec::Zstd => crate::zstd_codec::decode(data),
209 ColumnCodec::FastLanesLz4 => {
210 let i64_vals = decode_fastlanes_lz4_i64(data)?;
211 Ok(i64_vals
212 .iter()
213 .flat_map(|&v| (v as u32).to_le_bytes())
214 .collect())
215 }
216 _ => crate::raw::decode(data).or_else(|_| Ok(data.to_vec())),
217 }
218}
219
220fn encode_alp_fastlanes_lz4(values: &[f64]) -> Result<Vec<u8>, CodecError> {
226 let alp_encoded = crate::alp::encode(values);
228 Ok(crate::lz4::encode(&alp_encoded))
230}
231
232fn decode_alp_fastlanes_lz4(data: &[u8]) -> Result<Vec<f64>, CodecError> {
233 let alp_bytes = crate::lz4::decode(data)?;
235 crate::alp::decode(&alp_bytes)
236}
237
238fn encode_delta_fastlanes_lz4(values: &[i64]) -> Result<Vec<u8>, CodecError> {
240 if values.is_empty() {
241 return Ok(crate::lz4::encode(&crate::fastlanes::encode(&[])));
242 }
243
244 let mut deltas = Vec::with_capacity(values.len());
246 deltas.push(values[0]); for i in 1..values.len() {
248 deltas.push(values[i].wrapping_sub(values[i - 1]));
249 }
250
251 let packed = crate::fastlanes::encode(&deltas);
253
254 Ok(crate::lz4::encode(&packed))
256}
257
258fn decode_delta_fastlanes_lz4(data: &[u8]) -> Result<Vec<i64>, CodecError> {
259 let packed = crate::lz4::decode(data)?;
261 let deltas = crate::fastlanes::decode(&packed)?;
262
263 if deltas.is_empty() {
264 return Ok(Vec::new());
265 }
266
267 let mut values = Vec::with_capacity(deltas.len());
269 values.push(deltas[0]); for &d in &deltas[1..] {
271 let prev = values[values.len() - 1];
272 values.push(prev.wrapping_add(d));
273 }
274
275 Ok(values)
276}
277
278fn encode_fastlanes_lz4_i64(values: &[i64]) -> Result<Vec<u8>, CodecError> {
280 let packed = crate::fastlanes::encode(values);
281 Ok(crate::lz4::encode(&packed))
282}
283
284fn decode_fastlanes_lz4_i64(data: &[u8]) -> Result<Vec<i64>, CodecError> {
285 let packed = crate::lz4::decode(data)?;
286 crate::fastlanes::decode(&packed)
287}
288
289fn encode_alp_fastlanes_rans(values: &[f64]) -> Result<Vec<u8>, CodecError> {
291 let alp_encoded = crate::alp::encode(values);
292 Ok(crate::rans::encode(&alp_encoded))
293}
294
295fn decode_alp_fastlanes_rans(data: &[u8]) -> Result<Vec<f64>, CodecError> {
296 let alp_bytes = crate::rans::decode(data)?;
297 crate::alp::decode(&alp_bytes)
298}
299
300fn encode_delta_fastlanes_rans(values: &[i64]) -> Result<Vec<u8>, CodecError> {
302 if values.is_empty() {
303 return Ok(crate::rans::encode(&crate::fastlanes::encode(&[])));
304 }
305
306 let mut deltas = Vec::with_capacity(values.len());
307 deltas.push(values[0]);
308 for i in 1..values.len() {
309 deltas.push(values[i].wrapping_sub(values[i - 1]));
310 }
311
312 let packed = crate::fastlanes::encode(&deltas);
313 Ok(crate::rans::encode(&packed))
314}
315
316fn decode_delta_fastlanes_rans(data: &[u8]) -> Result<Vec<i64>, CodecError> {
317 let packed = crate::rans::decode(data)?;
318 let deltas = crate::fastlanes::decode(&packed)?;
319
320 if deltas.is_empty() {
321 return Ok(Vec::new());
322 }
323
324 let mut values = Vec::with_capacity(deltas.len());
325 values.push(deltas[0]);
326 for &d in &deltas[1..] {
327 let prev = values[values.len() - 1];
328 values.push(prev.wrapping_add(d));
329 }
330
331 Ok(values)
332}
333
334fn raw_to_i64(data: &[u8]) -> Result<Vec<i64>, CodecError> {
339 if !data.len().is_multiple_of(8) {
340 return Err(CodecError::Corrupt {
341 detail: "i64 data not aligned to 8 bytes".into(),
342 });
343 }
344 Ok(data
345 .chunks_exact(8)
346 .map(|c| i64::from_le_bytes([c[0], c[1], c[2], c[3], c[4], c[5], c[6], c[7]]))
347 .collect())
348}
349
350fn raw_to_f64(data: &[u8]) -> Result<Vec<f64>, CodecError> {
351 if !data.len().is_multiple_of(8) {
352 return Err(CodecError::Corrupt {
353 detail: "f64 data not aligned to 8 bytes".into(),
354 });
355 }
356 Ok(data
357 .chunks_exact(8)
358 .map(|c| f64::from_le_bytes([c[0], c[1], c[2], c[3], c[4], c[5], c[6], c[7]]))
359 .collect())
360}
361
362#[cfg(test)]
363mod tests {
364 use super::*;
365
366 #[test]
367 fn alp_fastlanes_lz4_decimal_metrics() {
368 let values: Vec<f64> = (0..10_000).map(|i| i as f64 * 0.1).collect();
369 let encoded = encode_f64_pipeline(&values, ColumnCodec::AlpFastLanesLz4).unwrap();
370 let decoded = decode_f64_pipeline(&encoded, ColumnCodec::AlpFastLanesLz4).unwrap();
371
372 for (i, (a, b)) in values.iter().zip(decoded.iter()).enumerate() {
373 assert_eq!(a.to_bits(), b.to_bits(), "mismatch at {i}");
374 }
375
376 let raw_size = values.len() * 8;
377 let ratio = raw_size as f64 / encoded.len() as f64;
378 assert!(
379 ratio > 3.0,
380 "ALP+FL+LZ4 should compress decimals >3x, got {ratio:.1}x"
381 );
382 }
383
384 #[test]
385 fn alp_beats_gorilla_on_decimals() {
386 let mut values = Vec::with_capacity(10_000);
387 let mut rng: u64 = 42;
388 for _ in 0..10_000 {
389 rng = rng.wrapping_mul(6364136223846793005).wrapping_add(1);
390 let cpu = ((rng >> 33) as f64 / (u32::MAX as f64)) * 100.0;
391 values.push((cpu * 10.0).round() / 10.0);
392 }
393
394 let alp_size = encode_f64_pipeline(&values, ColumnCodec::AlpFastLanesLz4)
395 .unwrap()
396 .len();
397 let gorilla_size = encode_f64_pipeline(&values, ColumnCodec::Gorilla)
398 .unwrap()
399 .len();
400
401 assert!(
402 alp_size < gorilla_size,
403 "ALP ({alp_size}) should beat Gorilla ({gorilla_size}) on decimal metrics"
404 );
405 }
406
407 #[test]
408 fn delta_fastlanes_lz4_timestamps() {
409 let values: Vec<i64> = (0..10_000)
410 .map(|i| 1_700_000_000_000 + i * 10_000)
411 .collect();
412 let encoded = encode_i64_pipeline(&values, ColumnCodec::DeltaFastLanesLz4).unwrap();
413 let decoded = decode_i64_pipeline(&encoded, ColumnCodec::DeltaFastLanesLz4).unwrap();
414 assert_eq!(decoded, values);
415
416 let raw_size = values.len() * 8;
417 let ratio = raw_size as f64 / encoded.len() as f64;
418 assert!(
419 ratio > 5.0,
420 "Delta+FL+LZ4 should compress timestamps >5x, got {ratio:.1}x"
421 );
422 }
423
424 #[test]
425 fn delta_fastlanes_lz4_jittered_timestamps() {
426 let mut values = Vec::with_capacity(10_000);
427 let mut ts = 1_700_000_000_000i64;
428 let mut rng: u64 = 42;
429 for _ in 0..10_000 {
430 values.push(ts);
431 rng = rng.wrapping_mul(6364136223846793005).wrapping_add(1);
432 let jitter = ((rng >> 33) as i64 % 101) - 50;
433 ts += 10_000 + jitter;
434 }
435 let encoded = encode_i64_pipeline(&values, ColumnCodec::DeltaFastLanesLz4).unwrap();
436 let decoded = decode_i64_pipeline(&encoded, ColumnCodec::DeltaFastLanesLz4).unwrap();
437 assert_eq!(decoded, values);
438 }
439
440 #[test]
441 fn delta_fastlanes_lz4_counters() {
442 let values: Vec<i64> = (0..10_000).map(|i| i * 1000).collect();
443 let encoded = encode_i64_pipeline(&values, ColumnCodec::DeltaFastLanesLz4).unwrap();
444 let decoded = decode_i64_pipeline(&encoded, ColumnCodec::DeltaFastLanesLz4).unwrap();
445 assert_eq!(decoded, values);
446 }
447
448 #[test]
449 fn fastlanes_lz4_symbol_ids() {
450 let values: Vec<i64> = (0..5000).map(|i| i % 150).collect();
451 let encoded = encode_i64_pipeline(&values, ColumnCodec::FastLanesLz4).unwrap();
452 let decoded = decode_i64_pipeline(&encoded, ColumnCodec::FastLanesLz4).unwrap();
453 assert_eq!(decoded, values);
454 }
455
456 #[test]
457 fn legacy_codecs_still_work() {
458 let i64_vals: Vec<i64> = (0..1000).collect();
459 for codec in [
460 ColumnCodec::DoubleDelta,
461 ColumnCodec::Delta,
462 ColumnCodec::Gorilla,
463 ] {
464 let encoded = encode_i64_pipeline(&i64_vals, codec).unwrap();
465 let decoded = decode_i64_pipeline(&encoded, codec).unwrap();
466 assert_eq!(decoded, i64_vals, "legacy i64 codec {codec} failed");
467 }
468
469 let f64_vals: Vec<f64> = (0..1000).map(|i| i as f64 * 0.5).collect();
470 let encoded = encode_f64_pipeline(&f64_vals, ColumnCodec::Gorilla).unwrap();
471 let decoded = decode_f64_pipeline(&encoded, ColumnCodec::Gorilla).unwrap();
472 for (a, b) in f64_vals.iter().zip(decoded.iter()) {
473 assert_eq!(a.to_bits(), b.to_bits());
474 }
475 }
476
477 #[test]
478 fn empty_values() {
479 let empty_i64: Vec<i64> = vec![];
480 let empty_f64: Vec<f64> = vec![];
481
482 for codec in [
483 ColumnCodec::DeltaFastLanesLz4,
484 ColumnCodec::FastLanesLz4,
485 ColumnCodec::AlpFastLanesLz4,
486 ] {
487 if matches!(codec, ColumnCodec::AlpFastLanesLz4) {
488 let enc = encode_f64_pipeline(&empty_f64, codec).unwrap();
489 let dec = decode_f64_pipeline(&enc, codec).unwrap();
490 assert!(dec.is_empty());
491 } else {
492 let enc = encode_i64_pipeline(&empty_i64, codec).unwrap();
493 let dec = decode_i64_pipeline(&enc, codec).unwrap();
494 assert!(dec.is_empty());
495 }
496 }
497 }
498
499 #[test]
500 fn bytes_pipeline_roundtrip() {
501 let raw: Vec<u8> = (0..1000u32).flat_map(|i| i.to_le_bytes()).collect();
502 for codec in [ColumnCodec::Raw, ColumnCodec::Lz4] {
503 let encoded = encode_bytes_pipeline(&raw, codec).unwrap();
504 let decoded = decode_bytes_pipeline(&encoded, codec).unwrap();
505 assert_eq!(decoded, raw, "bytes pipeline {codec} failed");
506 }
507 }
508}