1use crate::ColumnCodec;
17use crate::error::CodecError;
18
19pub fn encode_i64_pipeline(values: &[i64], codec: ColumnCodec) -> Result<Vec<u8>, CodecError> {
28 match codec {
29 ColumnCodec::DeltaFastLanesLz4 => encode_delta_fastlanes_lz4(values),
31 ColumnCodec::FastLanesLz4 => encode_fastlanes_lz4_i64(values),
32 ColumnCodec::PcodecLz4 => {
33 let pco_bytes = crate::pcodec::encode_i64(values)?;
34 Ok(crate::lz4::encode(&pco_bytes))
35 }
36 ColumnCodec::DeltaFastLanesRans => encode_delta_fastlanes_rans(values),
38 ColumnCodec::DoubleDelta => Ok(crate::double_delta::encode(values)),
40 ColumnCodec::Delta => Ok(crate::delta::encode(values)),
41 ColumnCodec::Gorilla => Ok(crate::gorilla::encode_timestamps(values)),
42 ColumnCodec::Raw => {
43 let raw: Vec<u8> = values.iter().flat_map(|v| v.to_le_bytes()).collect();
44 Ok(crate::raw::encode(&raw))
45 }
46 ColumnCodec::Lz4 => {
47 let raw: Vec<u8> = values.iter().flat_map(|v| v.to_le_bytes()).collect();
48 Ok(crate::lz4::encode(&raw))
49 }
50 ColumnCodec::Zstd => {
51 let raw: Vec<u8> = values.iter().flat_map(|v| v.to_le_bytes()).collect();
52 crate::zstd_codec::encode(&raw)
53 }
54 _ => Ok(crate::delta::encode(values)),
55 }
56}
57
58pub fn encode_f64_pipeline(values: &[f64], codec: ColumnCodec) -> Result<Vec<u8>, CodecError> {
60 match codec {
61 ColumnCodec::AlpFastLanesLz4 => encode_alp_fastlanes_lz4(values),
63 ColumnCodec::AlpRdLz4 => {
64 let alp_rd_bytes = crate::alp_rd::encode(values)?;
65 Ok(crate::lz4::encode(&alp_rd_bytes))
66 }
67 ColumnCodec::PcodecLz4 => {
68 let pco_bytes = crate::pcodec::encode_f64(values)?;
69 Ok(crate::lz4::encode(&pco_bytes))
70 }
71 ColumnCodec::AlpFastLanesRans => encode_alp_fastlanes_rans(values),
73 ColumnCodec::Gorilla => Ok(crate::gorilla::encode_f64(values)),
75 ColumnCodec::Raw => {
76 let raw: Vec<u8> = values.iter().flat_map(|v| v.to_le_bytes()).collect();
77 Ok(crate::raw::encode(&raw))
78 }
79 ColumnCodec::Lz4 => {
80 let raw: Vec<u8> = values.iter().flat_map(|v| v.to_le_bytes()).collect();
81 Ok(crate::lz4::encode(&raw))
82 }
83 ColumnCodec::Zstd => {
84 let raw: Vec<u8> = values.iter().flat_map(|v| v.to_le_bytes()).collect();
85 crate::zstd_codec::encode(&raw)
86 }
87 _ => Ok(crate::gorilla::encode_f64(values)),
88 }
89}
90
91pub fn encode_bytes_pipeline(raw: &[u8], codec: ColumnCodec) -> Result<Vec<u8>, CodecError> {
93 match codec {
94 ColumnCodec::FsstLz4 => {
95 let fsst_bytes = crate::fsst::encode(&[raw]);
97 Ok(crate::lz4::encode(&fsst_bytes))
98 }
99 ColumnCodec::FsstRans => {
100 let fsst_bytes = crate::fsst::encode(&[raw]);
101 Ok(crate::rans::encode(&fsst_bytes))
102 }
103 ColumnCodec::Raw => Ok(crate::raw::encode(raw)),
104 ColumnCodec::Lz4 => Ok(crate::lz4::encode(raw)),
105 ColumnCodec::Zstd => crate::zstd_codec::encode(raw),
106 ColumnCodec::FastLanesLz4 => {
107 if raw.len().is_multiple_of(4) {
109 let i64_vals: Vec<i64> = raw
110 .chunks_exact(4)
111 .map(|c| u32::from_le_bytes([c[0], c[1], c[2], c[3]]) as i64)
112 .collect();
113 encode_fastlanes_lz4_i64(&i64_vals)
114 } else {
115 Ok(crate::raw::encode(raw))
116 }
117 }
118 _ => Ok(crate::raw::encode(raw)),
119 }
120}
121
122pub fn decode_i64_pipeline(data: &[u8], codec: ColumnCodec) -> Result<Vec<i64>, CodecError> {
128 match codec {
129 ColumnCodec::DeltaFastLanesLz4 => decode_delta_fastlanes_lz4(data),
131 ColumnCodec::FastLanesLz4 => decode_fastlanes_lz4_i64(data),
132 ColumnCodec::PcodecLz4 => {
133 let pco_bytes = crate::lz4::decode(data)?;
134 crate::pcodec::decode_i64(&pco_bytes)
135 }
136 ColumnCodec::DeltaFastLanesRans => decode_delta_fastlanes_rans(data),
138 ColumnCodec::DoubleDelta => crate::double_delta::decode(data),
140 ColumnCodec::Delta => crate::delta::decode(data),
141 ColumnCodec::Gorilla => crate::gorilla::decode_timestamps(data),
142 ColumnCodec::Raw => {
143 let raw = crate::raw::decode(data)?;
144 raw_to_i64(&raw)
145 }
146 ColumnCodec::Lz4 => {
147 let raw = crate::lz4::decode(data)?;
148 raw_to_i64(&raw)
149 }
150 ColumnCodec::Zstd => {
151 let raw = crate::zstd_codec::decode(data)?;
152 raw_to_i64(&raw)
153 }
154 _ => crate::delta::decode(data),
155 }
156}
157
158pub fn decode_f64_pipeline(data: &[u8], codec: ColumnCodec) -> Result<Vec<f64>, CodecError> {
160 match codec {
161 ColumnCodec::AlpFastLanesLz4 => decode_alp_fastlanes_lz4(data),
163 ColumnCodec::AlpRdLz4 => {
164 let alp_rd_bytes = crate::lz4::decode(data)?;
165 crate::alp_rd::decode(&alp_rd_bytes)
166 }
167 ColumnCodec::PcodecLz4 => {
168 let pco_bytes = crate::lz4::decode(data)?;
169 crate::pcodec::decode_f64(&pco_bytes)
170 }
171 ColumnCodec::AlpFastLanesRans => decode_alp_fastlanes_rans(data),
173 ColumnCodec::Gorilla => crate::gorilla::decode_f64(data),
175 ColumnCodec::Raw => {
176 let raw = crate::raw::decode(data)?;
177 raw_to_f64(&raw)
178 }
179 ColumnCodec::Lz4 => {
180 let raw = crate::lz4::decode(data)?;
181 raw_to_f64(&raw)
182 }
183 ColumnCodec::Zstd => {
184 let raw = crate::zstd_codec::decode(data)?;
185 raw_to_f64(&raw)
186 }
187 _ => crate::gorilla::decode_f64(data),
188 }
189}
190
191pub fn decode_bytes_pipeline(data: &[u8], codec: ColumnCodec) -> Result<Vec<u8>, CodecError> {
193 match codec {
194 ColumnCodec::FsstLz4 => {
195 let fsst_bytes = crate::lz4::decode(data)?;
196 let strings = crate::fsst::decode(&fsst_bytes)?;
197 strings.into_iter().next().ok_or(CodecError::Corrupt {
198 detail: "FSST decode produced no output".into(),
199 })
200 }
201 ColumnCodec::FsstRans => {
202 let fsst_bytes = crate::rans::decode(data)?;
203 let strings = crate::fsst::decode(&fsst_bytes)?;
204 strings.into_iter().next().ok_or(CodecError::Corrupt {
205 detail: "FSST decode produced no output".into(),
206 })
207 }
208 ColumnCodec::Raw => crate::raw::decode(data),
209 ColumnCodec::Lz4 => crate::lz4::decode(data),
210 ColumnCodec::Zstd => crate::zstd_codec::decode(data),
211 ColumnCodec::FastLanesLz4 => {
212 let i64_vals = decode_fastlanes_lz4_i64(data)?;
213 Ok(i64_vals
214 .iter()
215 .flat_map(|&v| (v as u32).to_le_bytes())
216 .collect())
217 }
218 _ => crate::raw::decode(data).or_else(|_| Ok(data.to_vec())),
219 }
220}
221
222fn encode_alp_fastlanes_lz4(values: &[f64]) -> Result<Vec<u8>, CodecError> {
228 let alp_encoded = crate::alp::encode(values);
230 Ok(crate::lz4::encode(&alp_encoded))
232}
233
234fn decode_alp_fastlanes_lz4(data: &[u8]) -> Result<Vec<f64>, CodecError> {
235 let alp_bytes = crate::lz4::decode(data)?;
237 crate::alp::decode(&alp_bytes)
238}
239
240fn encode_delta_fastlanes_lz4(values: &[i64]) -> Result<Vec<u8>, CodecError> {
242 if values.is_empty() {
243 return Ok(crate::lz4::encode(&crate::fastlanes::encode(&[])));
244 }
245
246 let mut deltas = Vec::with_capacity(values.len());
248 deltas.push(values[0]); for i in 1..values.len() {
250 deltas.push(values[i].wrapping_sub(values[i - 1]));
251 }
252
253 let packed = crate::fastlanes::encode(&deltas);
255
256 Ok(crate::lz4::encode(&packed))
258}
259
260fn decode_delta_fastlanes_lz4(data: &[u8]) -> Result<Vec<i64>, CodecError> {
261 let packed = crate::lz4::decode(data)?;
263 let deltas = crate::fastlanes::decode(&packed)?;
264
265 if deltas.is_empty() {
266 return Ok(Vec::new());
267 }
268
269 let mut values = Vec::with_capacity(deltas.len());
271 values.push(deltas[0]); for &d in &deltas[1..] {
273 let prev = values[values.len() - 1];
274 values.push(prev.wrapping_add(d));
275 }
276
277 Ok(values)
278}
279
280fn encode_fastlanes_lz4_i64(values: &[i64]) -> Result<Vec<u8>, CodecError> {
282 let packed = crate::fastlanes::encode(values);
283 Ok(crate::lz4::encode(&packed))
284}
285
286fn decode_fastlanes_lz4_i64(data: &[u8]) -> Result<Vec<i64>, CodecError> {
287 let packed = crate::lz4::decode(data)?;
288 crate::fastlanes::decode(&packed)
289}
290
291fn encode_alp_fastlanes_rans(values: &[f64]) -> Result<Vec<u8>, CodecError> {
293 let alp_encoded = crate::alp::encode(values);
294 Ok(crate::rans::encode(&alp_encoded))
295}
296
297fn decode_alp_fastlanes_rans(data: &[u8]) -> Result<Vec<f64>, CodecError> {
298 let alp_bytes = crate::rans::decode(data)?;
299 crate::alp::decode(&alp_bytes)
300}
301
302fn encode_delta_fastlanes_rans(values: &[i64]) -> Result<Vec<u8>, CodecError> {
304 if values.is_empty() {
305 return Ok(crate::rans::encode(&crate::fastlanes::encode(&[])));
306 }
307
308 let mut deltas = Vec::with_capacity(values.len());
309 deltas.push(values[0]);
310 for i in 1..values.len() {
311 deltas.push(values[i].wrapping_sub(values[i - 1]));
312 }
313
314 let packed = crate::fastlanes::encode(&deltas);
315 Ok(crate::rans::encode(&packed))
316}
317
318fn decode_delta_fastlanes_rans(data: &[u8]) -> Result<Vec<i64>, CodecError> {
319 let packed = crate::rans::decode(data)?;
320 let deltas = crate::fastlanes::decode(&packed)?;
321
322 if deltas.is_empty() {
323 return Ok(Vec::new());
324 }
325
326 let mut values = Vec::with_capacity(deltas.len());
327 values.push(deltas[0]);
328 for &d in &deltas[1..] {
329 let prev = values[values.len() - 1];
330 values.push(prev.wrapping_add(d));
331 }
332
333 Ok(values)
334}
335
336fn raw_to_i64(data: &[u8]) -> Result<Vec<i64>, CodecError> {
341 if !data.len().is_multiple_of(8) {
342 return Err(CodecError::Corrupt {
343 detail: "i64 data not aligned to 8 bytes".into(),
344 });
345 }
346 Ok(data
347 .chunks_exact(8)
348 .map(|c| i64::from_le_bytes([c[0], c[1], c[2], c[3], c[4], c[5], c[6], c[7]]))
349 .collect())
350}
351
352fn raw_to_f64(data: &[u8]) -> Result<Vec<f64>, CodecError> {
353 if !data.len().is_multiple_of(8) {
354 return Err(CodecError::Corrupt {
355 detail: "f64 data not aligned to 8 bytes".into(),
356 });
357 }
358 Ok(data
359 .chunks_exact(8)
360 .map(|c| f64::from_le_bytes([c[0], c[1], c[2], c[3], c[4], c[5], c[6], c[7]]))
361 .collect())
362}
363
#[cfg(test)]
mod tests {
    use super::*;

    // Multiplier of the deterministic LCG used to synthesize test data.
    const LCG_MULTIPLIER: u64 = 6364136223846793005;

    // Advances the LCG state and returns its upper 31 bits.
    fn next_sample(state: &mut u64) -> u64 {
        *state = state.wrapping_mul(LCG_MULTIPLIER).wrapping_add(1);
        *state >> 33
    }

    // Encodes then decodes `values` with `codec`, returning both artefacts.
    fn roundtrip_i64(values: &[i64], codec: ColumnCodec) -> (Vec<u8>, Vec<i64>) {
        let encoded = encode_i64_pipeline(values, codec).unwrap();
        let decoded = decode_i64_pipeline(&encoded, codec).unwrap();
        (encoded, decoded)
    }

    // Encodes then decodes `values` with `codec`, returning both artefacts.
    fn roundtrip_f64(values: &[f64], codec: ColumnCodec) -> (Vec<u8>, Vec<f64>) {
        let encoded = encode_f64_pipeline(values, codec).unwrap();
        let decoded = decode_f64_pipeline(&encoded, codec).unwrap();
        (encoded, decoded)
    }

    // Bit-exact float comparison. The length is checked first because `zip`
    // stops at the shorter side and would otherwise accept truncated output.
    fn assert_same_bits(expected: &[f64], actual: &[f64]) {
        assert_eq!(
            actual.len(),
            expected.len(),
            "decoded length differs from input length"
        );
        for (i, (a, b)) in expected.iter().zip(actual.iter()).enumerate() {
            assert_eq!(a.to_bits(), b.to_bits(), "mismatch at {i}");
        }
    }

    // Ratio of the uncompressed size (8 bytes per value) to the encoded size.
    fn compression_ratio(value_count: usize, encoded: &[u8]) -> f64 {
        let raw_size = value_count * 8;
        raw_size as f64 / encoded.len() as f64
    }

    #[test]
    fn alp_fastlanes_lz4_decimal_metrics() {
        let values: Vec<f64> = (0..10_000).map(|i| i as f64 * 0.1).collect();
        let (encoded, decoded) = roundtrip_f64(&values, ColumnCodec::AlpFastLanesLz4);

        assert_same_bits(&values, &decoded);

        let ratio = compression_ratio(values.len(), &encoded);
        assert!(
            ratio > 3.0,
            "ALP+FL+LZ4 should compress decimals >3x, got {ratio:.1}x"
        );
    }

    #[test]
    fn alp_beats_gorilla_on_decimals() {
        // Pseudo-random percentages in [0, 100] rounded to one decimal place.
        let mut rng: u64 = 42;
        let values: Vec<f64> = (0..10_000)
            .map(|_| {
                let cpu = (next_sample(&mut rng) as f64 / (u32::MAX as f64)) * 100.0;
                (cpu * 10.0).round() / 10.0
            })
            .collect();

        let alp_size = encode_f64_pipeline(&values, ColumnCodec::AlpFastLanesLz4)
            .unwrap()
            .len();
        let gorilla_size = encode_f64_pipeline(&values, ColumnCodec::Gorilla)
            .unwrap()
            .len();

        assert!(
            alp_size < gorilla_size,
            "ALP ({alp_size}) should beat Gorilla ({gorilla_size}) on decimal metrics"
        );
    }

    #[test]
    fn delta_fastlanes_lz4_timestamps() {
        let values: Vec<i64> = (0..10_000)
            .map(|i| 1_700_000_000_000 + i * 10_000)
            .collect();
        let (encoded, decoded) = roundtrip_i64(&values, ColumnCodec::DeltaFastLanesLz4);
        assert_eq!(decoded, values);

        let ratio = compression_ratio(values.len(), &encoded);
        assert!(
            ratio > 5.0,
            "Delta+FL+LZ4 should compress timestamps >5x, got {ratio:.1}x"
        );
    }

    #[test]
    fn delta_fastlanes_lz4_jittered_timestamps() {
        // Fixed 10_000 step with a pseudo-random jitter in [-50, 50].
        let mut values = Vec::with_capacity(10_000);
        let mut ts = 1_700_000_000_000i64;
        let mut rng: u64 = 42;
        for _ in 0..10_000 {
            values.push(ts);
            let jitter = (next_sample(&mut rng) as i64 % 101) - 50;
            ts += 10_000 + jitter;
        }

        let (_, decoded) = roundtrip_i64(&values, ColumnCodec::DeltaFastLanesLz4);
        assert_eq!(decoded, values);
    }

    #[test]
    fn delta_fastlanes_lz4_counters() {
        let values: Vec<i64> = (0..10_000).map(|i| i * 1000).collect();
        let (_, decoded) = roundtrip_i64(&values, ColumnCodec::DeltaFastLanesLz4);
        assert_eq!(decoded, values);
    }

    #[test]
    fn fastlanes_lz4_symbol_ids() {
        let values: Vec<i64> = (0..5000).map(|i| i % 150).collect();
        let (_, decoded) = roundtrip_i64(&values, ColumnCodec::FastLanesLz4);
        assert_eq!(decoded, values);
    }

    #[test]
    fn legacy_codecs_still_work() {
        let i64_vals: Vec<i64> = (0..1000).collect();
        for codec in [
            ColumnCodec::DoubleDelta,
            ColumnCodec::Delta,
            ColumnCodec::Gorilla,
        ] {
            let (_, decoded) = roundtrip_i64(&i64_vals, codec);
            assert_eq!(decoded, i64_vals, "legacy i64 codec {codec} failed");
        }

        let f64_vals: Vec<f64> = (0..1000).map(|i| i as f64 * 0.5).collect();
        let (_, decoded) = roundtrip_f64(&f64_vals, ColumnCodec::Gorilla);
        assert_same_bits(&f64_vals, &decoded);
    }

    #[test]
    fn empty_values() {
        for codec in [ColumnCodec::DeltaFastLanesLz4, ColumnCodec::FastLanesLz4] {
            let (_, decoded) = roundtrip_i64(&[], codec);
            assert!(decoded.is_empty());
        }

        let (_, decoded) = roundtrip_f64(&[], ColumnCodec::AlpFastLanesLz4);
        assert!(decoded.is_empty());
    }

    #[test]
    fn bytes_pipeline_roundtrip() {
        let raw: Vec<u8> = (0..1000u32).flat_map(|i| i.to_le_bytes()).collect();
        for codec in [ColumnCodec::Raw, ColumnCodec::Lz4] {
            let encoded = encode_bytes_pipeline(&raw, codec).unwrap();
            let decoded = decode_bytes_pipeline(&encoded, codec).unwrap();
            assert_eq!(decoded, raw, "bytes pipeline {codec} failed");
        }
    }
}