pub mod alp;
pub mod alp_rd;
pub mod crdt_compress;
pub mod delta;
pub mod detect;
pub mod double_delta;
pub mod error;
pub mod fastlanes;
pub mod fsst;
pub mod gorilla;
pub mod lz4;
pub mod pcodec;
pub mod pipeline;
pub mod rans;
pub mod raw;
pub mod spherical;
pub mod zstd_codec;
/// Publicly exported sample-size constant (1024).
///
/// NOTE(review): no consumer is visible in this file; presumably this is the
/// number of values sampled when auto-detecting a codec — confirm against the
/// `detect` module.
pub const CODEC_SAMPLE_SIZE: usize = 1024;
pub use crdt_compress::CrdtOp;
pub use delta::{DeltaDecoder, DeltaEncoder};
pub use detect::detect_codec;
pub use double_delta::{DoubleDeltaDecoder, DoubleDeltaEncoder};
pub use error::CodecError;
pub use gorilla::{GorillaDecoder, GorillaEncoder};
pub use lz4::{Lz4Decoder, Lz4Encoder};
pub use pipeline::{
decode_bytes_pipeline, decode_f64_pipeline, decode_i64_pipeline, encode_bytes_pipeline,
encode_f64_pipeline, encode_i64_pipeline,
};
pub use raw::{RawDecoder, RawEncoder};
pub use zstd_codec::{ZstdDecoder, ZstdEncoder};
use serde::{Deserialize, Serialize};
use zerompk::{FromMessagePack, ToMessagePack};
/// Identifier of the codec applied to a column.
///
/// The enum is `#[repr(u8)]` with explicit discriminants `0..=15`, and it
/// derives both serde and MessagePack (de)serialization.
///
/// NOTE(review): `#[msgpack(c_enum)]` presumably encodes a variant as its
/// integer discriminant — confirm against the zerompk documentation. If so,
/// the numeric values below are part of the stored format and must not be
/// renumbered.
///
/// NOTE(review): serde names come from `rename_all = "snake_case"`, which
/// inserts an underscore before every uppercase letter. `FastLanesLz4` would
/// therefore serialize as `fast_lanes_lz4`, while [`ColumnCodec::as_str`]
/// returns `fastlanes_lz4`. Confirm that the two spellings are meant to differ.
#[derive(
Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, ToMessagePack, FromMessagePack,
)]
#[serde(rename_all = "snake_case")]
#[repr(u8)]
#[msgpack(c_enum)]
pub enum ColumnCodec {
/// No concrete codec chosen; `is_compressed` reports `false` for it.
/// NOTE(review): presumably resolved to a concrete codec by detection — confirm.
Auto = 0,
// Variants 1..=6: reported as cascading by `is_cascading`, not cold tier.
AlpFastLanesLz4 = 1,
AlpRdLz4 = 2,
PcodecLz4 = 3,
DeltaFastLanesLz4 = 4,
FastLanesLz4 = 5,
FsstLz4 = 6,
// Variants 7..=9: reported as cascading and as cold tier (`is_cold_tier`).
AlpFastLanesRans = 7,
DeltaFastLanesRans = 8,
FsstRans = 9,
// Variants 10..=14: compressed, but not listed as cascading.
Gorilla = 10,
DoubleDelta = 11,
Delta = 12,
Lz4 = 13,
Zstd = 14,
/// Uncompressed storage; `is_compressed` reports `false` for it.
Raw = 15,
}
impl ColumnCodec {
pub fn is_compressed(&self) -> bool {
!matches!(self, Self::Raw | Self::Auto)
}
pub fn is_cascading(&self) -> bool {
matches!(
self,
Self::AlpFastLanesLz4
| Self::AlpRdLz4
| Self::PcodecLz4
| Self::DeltaFastLanesLz4
| Self::FastLanesLz4
| Self::FsstLz4
| Self::AlpFastLanesRans
| Self::DeltaFastLanesRans
| Self::FsstRans
)
}
pub fn is_cold_tier(&self) -> bool {
matches!(
self,
Self::AlpFastLanesRans | Self::DeltaFastLanesRans | Self::FsstRans
)
}
pub fn as_str(&self) -> &'static str {
match self {
Self::Auto => "auto",
Self::AlpFastLanesLz4 => "alp_fastlanes_lz4",
Self::AlpRdLz4 => "alp_rd_lz4",
Self::PcodecLz4 => "pcodec_lz4",
Self::DeltaFastLanesLz4 => "delta_fastlanes_lz4",
Self::FastLanesLz4 => "fastlanes_lz4",
Self::FsstLz4 => "fsst_lz4",
Self::AlpFastLanesRans => "alp_fastlanes_rans",
Self::DeltaFastLanesRans => "delta_fastlanes_rans",
Self::FsstRans => "fsst_rans",
Self::Gorilla => "gorilla",
Self::DoubleDelta => "double_delta",
Self::Delta => "delta",
Self::Lz4 => "lz4",
Self::Zstd => "zstd",
Self::Raw => "raw",
}
}
}
impl std::fmt::Display for ColumnCodec {
    /// Writes the name returned by `as_str` verbatim; width and padding flags
    /// on the formatter are not applied.
    fn fmt(&self, out: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = self.as_str();
        out.write_str(label)
    }
}
/// Coarse classification of a column's logical type.
///
/// NOTE(review): nothing in this file consumes the hint; presumably it guides
/// codec selection elsewhere in the crate (e.g. the `detect` module) — confirm.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ColumnTypeHint {
/// Timestamp column.
Timestamp,
/// 64-bit floating-point column.
Float64,
/// 64-bit integer column.
Int64,
/// Symbol column.
/// NOTE(review): presumably dictionary ids such as the `u32` values taken by
/// `ColumnStatistics::from_symbols` — confirm.
Symbol,
/// String column.
String,
}
/// Summary statistics recorded for one encoded column.
///
/// Serializable through serde. The optional aggregates (`min`, `max`, `sum`,
/// `cardinality`) are left out of the serialized form when they are `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ColumnStatistics {
/// Codec these statistics are associated with.
pub codec: ColumnCodec,
/// Number of values; the `from_*` constructors set it to the input slice length.
pub count: u64,
/// Smallest value, as `f64`. `from_i64` converts with `as f64`, so very large
/// integers may be rounded. `None` for symbol columns and for `new`.
#[serde(skip_serializing_if = "Option::is_none")]
pub min: Option<f64>,
/// Largest value, as `f64`; same conversion caveat as `min`.
#[serde(skip_serializing_if = "Option::is_none")]
pub max: Option<f64>,
/// Sum of all values, as `f64`. `None` for symbol columns and for `new`.
#[serde(skip_serializing_if = "Option::is_none")]
pub sum: Option<f64>,
/// Caller-supplied cardinality; only `from_symbols` sets it.
#[serde(skip_serializing_if = "Option::is_none")]
pub cardinality: Option<u32>,
/// Encoded size in bytes, as passed to the constructor.
pub compressed_bytes: u64,
/// Raw size in bytes: 8 per value for `from_i64`/`from_f64`, 4 per value for
/// `from_symbols`.
pub uncompressed_bytes: u64,
}
impl ColumnStatistics {
    /// Creates empty statistics for `codec`: zero counts and byte sizes, and
    /// every optional aggregate set to `None`.
    pub fn new(codec: ColumnCodec) -> Self {
        Self {
            codec,
            count: 0,
            min: None,
            max: None,
            sum: None,
            cardinality: None,
            compressed_bytes: 0,
            uncompressed_bytes: 0,
        }
    }

    /// Builds statistics for a column of `i64` values.
    ///
    /// `min`, `max` and `sum` are converted to `f64` with `as`, so magnitudes
    /// beyond 2^53 may be rounded. The sum is accumulated in `i128` and cannot
    /// overflow before that conversion.
    ///
    /// An empty slice yields `Self::new(codec)`; in that case
    /// `compressed_bytes` is not recorded.
    pub fn from_i64(values: &[i64], codec: ColumnCodec, compressed_bytes: u64) -> Self {
        if values.is_empty() {
            return Self::new(codec);
        }
        // The seeds are the identity elements of min/max, so the first value
        // always replaces them.
        let (min, max, sum) = values.iter().fold(
            (i64::MAX, i64::MIN, 0_i128),
            |(lo, hi, acc), &v| (lo.min(v), hi.max(v), acc + i128::from(v)),
        );
        Self {
            codec,
            count: values.len() as u64,
            min: Some(min as f64),
            max: Some(max as f64),
            sum: Some(sum as f64),
            cardinality: None,
            compressed_bytes,
            uncompressed_bytes: std::mem::size_of_val(values) as u64,
        }
    }

    /// Builds statistics for a column of `f64` values.
    ///
    /// NaN values are skipped when computing `min` and `max`, wherever they
    /// appear in the slice; both are NaN only when every value is NaN. `sum`
    /// is a plain left-to-right floating-point sum, so any NaN makes it NaN.
    ///
    /// An empty slice yields `Self::new(codec)`; in that case
    /// `compressed_bytes` is not recorded.
    pub fn from_f64(values: &[f64], codec: ColumnCodec, compressed_bytes: u64) -> Self {
        if values.is_empty() {
            return Self::new(codec);
        }
        // Seeding with NaN and replacing a NaN bound unconditionally means a
        // leading NaN no longer pins min/max to NaN for the whole column.
        let (min, max, sum) = values.iter().fold(
            (f64::NAN, f64::NAN, 0.0_f64),
            |(lo, hi, acc), &v| {
                let lo = if lo.is_nan() || v < lo { v } else { lo };
                let hi = if hi.is_nan() || v > hi { v } else { hi };
                (lo, hi, acc + v)
            },
        );
        Self {
            codec,
            count: values.len() as u64,
            min: Some(min),
            max: Some(max),
            sum: Some(sum),
            cardinality: None,
            compressed_bytes,
            uncompressed_bytes: std::mem::size_of_val(values) as u64,
        }
    }

    /// Builds statistics for a column of `u32` symbol ids.
    ///
    /// `cardinality` is stored as supplied by the caller; it is not derived
    /// from `values`. `min`, `max` and `sum` are left as `None`.
    pub fn from_symbols(
        values: &[u32],
        cardinality: u32,
        codec: ColumnCodec,
        compressed_bytes: u64,
    ) -> Self {
        Self {
            codec,
            count: values.len() as u64,
            min: None,
            max: None,
            sum: None,
            cardinality: Some(cardinality),
            compressed_bytes,
            uncompressed_bytes: std::mem::size_of_val(values) as u64,
        }
    }

    /// Returns `uncompressed_bytes / compressed_bytes`, or `1.0` when
    /// `compressed_bytes` is zero (avoids a division by zero).
    pub fn compression_ratio(&self) -> f64 {
        if self.compressed_bytes == 0 {
            return 1.0;
        }
        self.uncompressed_bytes as f64 / self.compressed_bytes as f64
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Every `ColumnCodec` variant, in declaration order.
    const ALL_CODECS: [ColumnCodec; 16] = [
        ColumnCodec::Auto,
        ColumnCodec::AlpFastLanesLz4,
        ColumnCodec::AlpRdLz4,
        ColumnCodec::PcodecLz4,
        ColumnCodec::DeltaFastLanesLz4,
        ColumnCodec::FastLanesLz4,
        ColumnCodec::FsstLz4,
        ColumnCodec::AlpFastLanesRans,
        ColumnCodec::DeltaFastLanesRans,
        ColumnCodec::FsstRans,
        ColumnCodec::Gorilla,
        ColumnCodec::DoubleDelta,
        ColumnCodec::Delta,
        ColumnCodec::Lz4,
        ColumnCodec::Zstd,
        ColumnCodec::Raw,
    ];

    /// Each codec must survive a JSON encode/decode cycle unchanged.
    #[test]
    fn column_codec_serde_roundtrip() {
        for codec in ALL_CODECS {
            let encoded = sonic_rs::to_string(&codec).unwrap();
            let back = sonic_rs::from_str::<ColumnCodec>(&encoded).unwrap();
            assert_eq!(codec, back, "serde roundtrip failed for {codec}");
        }
    }

    /// Integer statistics: count, min/max/sum and byte sizes.
    #[test]
    fn column_statistics_i64() {
        let input: [i64; 5] = [10, 20, 30, 40, 50];
        let stats = ColumnStatistics::from_i64(&input, ColumnCodec::Delta, 12);

        assert_eq!(stats.count, 5);
        assert_eq!(stats.min, Some(10.0));
        assert_eq!(stats.max, Some(50.0));
        assert_eq!(stats.sum, Some(150.0));
        assert_eq!(stats.uncompressed_bytes, 40);
        assert_eq!(stats.compressed_bytes, 12);
    }

    /// Floating-point statistics: count and min/max/sum.
    #[test]
    fn column_statistics_f64() {
        let input: [f64; 3] = [1.5, 2.5, 3.5];
        let stats = ColumnStatistics::from_f64(&input, ColumnCodec::Gorilla, 8);

        assert_eq!(stats.count, 3);
        assert_eq!(stats.min, Some(1.5));
        assert_eq!(stats.max, Some(3.5));
        assert_eq!(stats.sum, Some(7.5));
    }

    /// Symbol statistics carry a cardinality and no numeric aggregates.
    #[test]
    fn column_statistics_symbols() {
        let input: [u32; 5] = [0, 1, 2, 0, 1];
        let stats = ColumnStatistics::from_symbols(&input, 3, ColumnCodec::Raw, 20);

        assert_eq!(stats.count, 5);
        assert_eq!(stats.cardinality, Some(3));
        assert!(stats.min.is_none());
    }

    /// 800 uncompressed bytes over 200 compressed bytes is a ratio of 4.
    #[test]
    fn compression_ratio_calculation() {
        let stats = ColumnStatistics {
            count: 100,
            compressed_bytes: 200,
            uncompressed_bytes: 800,
            // Remaining fields (all optional aggregates) stay `None`.
            ..ColumnStatistics::new(ColumnCodec::Delta)
        };
        assert!((stats.compression_ratio() - 4.0).abs() < f64::EPSILON);
    }
}