1pub mod alp;
21pub mod alp_rd;
22pub mod crdt_compress;
23pub mod delta;
24pub mod detect;
25pub mod double_delta;
26pub mod error;
27pub mod fastlanes;
28pub mod fsst;
29pub mod gorilla;
30pub mod lz4;
31pub mod pcodec;
32pub mod pipeline;
33pub mod rans;
34pub mod raw;
35pub mod spherical;
36pub mod zstd_codec;
37
/// Sample size shared by the codec modules, fixed at 1024.
///
/// NOTE(review): presumably the number of leading values inspected when a
/// codec is chosen automatically (see `detect`); the constant is not
/// referenced in this file, so confirm against its users.
pub const CODEC_SAMPLE_SIZE: usize = 1024;
41
42pub use crdt_compress::CrdtOp;
43pub use delta::{DeltaDecoder, DeltaEncoder};
44pub use detect::detect_codec;
45pub use double_delta::{DoubleDeltaDecoder, DoubleDeltaEncoder};
46pub use error::CodecError;
47pub use gorilla::{GorillaDecoder, GorillaEncoder};
48pub use lz4::{Lz4Decoder, Lz4Encoder};
49pub use pipeline::{
50 decode_bytes_pipeline, decode_f64_pipeline, decode_i64_pipeline, encode_bytes_pipeline,
51 encode_f64_pipeline, encode_i64_pipeline,
52};
53pub use raw::{RawDecoder, RawEncoder};
54pub use zstd_codec::{ZstdDecoder, ZstdEncoder};
55
56use serde::{Deserialize, Serialize};
57use zerompk::{FromMessagePack, ToMessagePack};
58
/// Identifies the codec, or chain of codecs, associated with a column.
///
/// The enum is `#[repr(u8)]` with explicit discriminants `0..=15`.
/// NOTE(review): `#[msgpack(c_enum)]` presumably encodes a value by that
/// discriminant, which would make the numbers part of the stored format —
/// confirm before renumbering or reordering variants.
///
/// With serde the variants are renamed via `rename_all = "snake_case"`.
/// NOTE(review): that rule inserts an underscore before every capital letter,
/// so `AlpFastLanesLz4` serializes as `alp_fast_lanes_lz4`, whereas
/// `ColumnCodec::as_str` returns `alp_fastlanes_lz4`. The same applies to every
/// `FastLanes` variant — confirm the two spellings are meant to differ.
#[derive(
    Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, ToMessagePack, FromMessagePack,
)]
#[serde(rename_all = "snake_case")]
#[repr(u8)]
#[msgpack(c_enum)]
pub enum ColumnCodec {
    /// No concrete codec selected; `is_compressed` reports `false` for it.
    /// Presumably resolved to a concrete codec elsewhere (e.g. `detect_codec`)
    /// — TODO confirm.
    Auto = 0,

    // Variants 1..=6: names end in `Lz4`; `is_cascading` is true, `is_cold_tier` is false.
    AlpFastLanesLz4 = 1,
    AlpRdLz4 = 2,
    PcodecLz4 = 3,
    DeltaFastLanesLz4 = 4,
    FastLanesLz4 = 5,
    FsstLz4 = 6,

    // Variants 7..=9: names end in `Rans`; both `is_cascading` and `is_cold_tier` are true.
    AlpFastLanesRans = 7,
    DeltaFastLanesRans = 8,
    FsstRans = 9,

    // Variants 10..=15: neither cascading nor cold-tier according to the predicates.
    Gorilla = 10,
    DoubleDelta = 11,
    Delta = 12,
    Lz4 = 13,
    Zstd = 14,
    /// `is_compressed` reports `false` for this variant.
    Raw = 15,
}
110
111impl ColumnCodec {
112 pub fn is_compressed(&self) -> bool {
113 !matches!(self, Self::Raw | Self::Auto)
114 }
115
116 pub fn is_cascading(&self) -> bool {
118 matches!(
119 self,
120 Self::AlpFastLanesLz4
121 | Self::AlpRdLz4
122 | Self::PcodecLz4
123 | Self::DeltaFastLanesLz4
124 | Self::FastLanesLz4
125 | Self::FsstLz4
126 | Self::AlpFastLanesRans
127 | Self::DeltaFastLanesRans
128 | Self::FsstRans
129 )
130 }
131
132 pub fn is_cold_tier(&self) -> bool {
134 matches!(
135 self,
136 Self::AlpFastLanesRans | Self::DeltaFastLanesRans | Self::FsstRans
137 )
138 }
139
140 pub fn as_str(&self) -> &'static str {
141 match self {
142 Self::Auto => "auto",
143 Self::AlpFastLanesLz4 => "alp_fastlanes_lz4",
144 Self::AlpRdLz4 => "alp_rd_lz4",
145 Self::PcodecLz4 => "pcodec_lz4",
146 Self::DeltaFastLanesLz4 => "delta_fastlanes_lz4",
147 Self::FastLanesLz4 => "fastlanes_lz4",
148 Self::FsstLz4 => "fsst_lz4",
149 Self::AlpFastLanesRans => "alp_fastlanes_rans",
150 Self::DeltaFastLanesRans => "delta_fastlanes_rans",
151 Self::FsstRans => "fsst_rans",
152 Self::Gorilla => "gorilla",
153 Self::DoubleDelta => "double_delta",
154 Self::Delta => "delta",
155 Self::Lz4 => "lz4",
156 Self::Zstd => "zstd",
157 Self::Raw => "raw",
158 }
159 }
160}
161
162impl std::fmt::Display for ColumnCodec {
163 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
164 f.write_str(self.as_str())
165 }
166}
167
/// Coarse classification of a column's logical type.
///
/// NOTE(review): presumably passed to codec detection so a suitable codec can
/// be chosen per column kind; nothing in this file consumes it — confirm
/// against `detect`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ColumnTypeHint {
    Timestamp,
    Float64,
    Int64,
    Symbol,
    String,
}
177
/// Summary statistics and size accounting for one encoded column.
///
/// Built through `new`, `from_i64`, `from_f64`, or `from_symbols`. Optional
/// fields that are `None` are left out of the serialized form
/// (`skip_serializing_if = "Option::is_none"`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ColumnStatistics {
    /// Codec recorded for the column.
    pub codec: ColumnCodec,
    /// Number of values (the input slice length in the `from_*` constructors).
    pub count: u64,
    /// Smallest value as `f64`; set by `from_i64` / `from_f64`, `None` otherwise.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub min: Option<f64>,
    /// Largest value as `f64`; set by `from_i64` / `from_f64`, `None` otherwise.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub max: Option<f64>,
    /// Sum of all values as `f64`; set by `from_i64` / `from_f64`, `None` otherwise.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub sum: Option<f64>,
    /// Caller-supplied distinct-value count; only `from_symbols` sets it.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cardinality: Option<u32>,
    /// Encoded size in bytes, as supplied by the caller.
    pub compressed_bytes: u64,
    /// Unencoded size in bytes: 8 per value for `i64`/`f64` input, 4 per symbol.
    pub uncompressed_bytes: u64,
}
205
206impl ColumnStatistics {
207 pub fn new(codec: ColumnCodec) -> Self {
209 Self {
210 codec,
211 count: 0,
212 min: None,
213 max: None,
214 sum: None,
215 cardinality: None,
216 compressed_bytes: 0,
217 uncompressed_bytes: 0,
218 }
219 }
220
221 pub fn from_i64(values: &[i64], codec: ColumnCodec, compressed_bytes: u64) -> Self {
223 if values.is_empty() {
224 return Self::new(codec);
225 }
226
227 let mut min = values[0];
228 let mut max = values[0];
229 let mut sum: i128 = 0;
230
231 for &v in values {
232 if v < min {
233 min = v;
234 }
235 if v > max {
236 max = v;
237 }
238 sum += v as i128;
239 }
240
241 Self {
242 codec,
243 count: values.len() as u64,
244 min: Some(min as f64),
245 max: Some(max as f64),
246 sum: Some(sum as f64),
247 cardinality: None,
248 compressed_bytes,
249 uncompressed_bytes: (values.len() * 8) as u64,
250 }
251 }
252
253 pub fn from_f64(values: &[f64], codec: ColumnCodec, compressed_bytes: u64) -> Self {
255 if values.is_empty() {
256 return Self::new(codec);
257 }
258
259 let mut min = values[0];
260 let mut max = values[0];
261 let mut sum: f64 = 0.0;
262
263 for &v in values {
264 if v < min {
265 min = v;
266 }
267 if v > max {
268 max = v;
269 }
270 sum += v;
271 }
272
273 Self {
274 codec,
275 count: values.len() as u64,
276 min: Some(min),
277 max: Some(max),
278 sum: Some(sum),
279 cardinality: None,
280 compressed_bytes,
281 uncompressed_bytes: (values.len() * 8) as u64,
282 }
283 }
284
285 pub fn from_symbols(
287 values: &[u32],
288 cardinality: u32,
289 codec: ColumnCodec,
290 compressed_bytes: u64,
291 ) -> Self {
292 Self {
293 codec,
294 count: values.len() as u64,
295 min: None,
296 max: None,
297 sum: None,
298 cardinality: Some(cardinality),
299 compressed_bytes,
300 uncompressed_bytes: (values.len() * 4) as u64,
301 }
302 }
303
304 pub fn compression_ratio(&self) -> f64 {
306 if self.compressed_bytes == 0 {
307 return 1.0;
308 }
309 self.uncompressed_bytes as f64 / self.compressed_bytes as f64
310 }
311}
312
#[cfg(test)]
mod tests {
    use super::*;

    /// Every `ColumnCodec` variant, in discriminant order.
    const ALL_CODECS: [ColumnCodec; 16] = [
        ColumnCodec::Auto,
        ColumnCodec::AlpFastLanesLz4,
        ColumnCodec::AlpRdLz4,
        ColumnCodec::PcodecLz4,
        ColumnCodec::DeltaFastLanesLz4,
        ColumnCodec::FastLanesLz4,
        ColumnCodec::FsstLz4,
        ColumnCodec::AlpFastLanesRans,
        ColumnCodec::DeltaFastLanesRans,
        ColumnCodec::FsstRans,
        ColumnCodec::Gorilla,
        ColumnCodec::DoubleDelta,
        ColumnCodec::Delta,
        ColumnCodec::Lz4,
        ColumnCodec::Zstd,
        ColumnCodec::Raw,
    ];

    /// Each codec must survive a JSON encode/decode cycle unchanged.
    #[test]
    fn column_codec_serde_roundtrip() {
        for codec in ALL_CODECS {
            let encoded = sonic_rs::to_string(&codec).unwrap();
            let decoded: ColumnCodec = sonic_rs::from_str(&encoded).unwrap();
            assert_eq!(codec, decoded, "serde roundtrip failed for {codec}");
        }
    }

    /// Integer columns report count, bounds, sum, and both byte sizes.
    #[test]
    fn column_statistics_i64() {
        let input: Vec<i64> = vec![10, 20, 30, 40, 50];
        let stats = ColumnStatistics::from_i64(&input, ColumnCodec::Delta, 12);

        assert_eq!(stats.count, 5);
        assert_eq!(stats.min, Some(10.0));
        assert_eq!(stats.max, Some(50.0));
        assert_eq!(stats.sum, Some(150.0));
        assert_eq!(stats.uncompressed_bytes, 40);
        assert_eq!(stats.compressed_bytes, 12);
    }

    /// Float columns report count, bounds, and sum.
    #[test]
    fn column_statistics_f64() {
        let input: Vec<f64> = vec![1.5, 2.5, 3.5];
        let stats = ColumnStatistics::from_f64(&input, ColumnCodec::Gorilla, 8);

        assert_eq!(stats.count, 3);
        assert_eq!(stats.min, Some(1.5));
        assert_eq!(stats.max, Some(3.5));
        assert_eq!(stats.sum, Some(7.5));
    }

    /// Symbol columns carry the supplied cardinality and no numeric bounds.
    #[test]
    fn column_statistics_symbols() {
        let input: Vec<u32> = vec![0, 1, 2, 0, 1];
        let stats = ColumnStatistics::from_symbols(&input, 3, ColumnCodec::Raw, 20);

        assert_eq!(stats.count, 5);
        assert_eq!(stats.cardinality, Some(3));
        assert_eq!(stats.min, None);
    }

    /// 800 uncompressed bytes over 200 compressed bytes is a 4x ratio.
    #[test]
    fn compression_ratio_calculation() {
        let stats = ColumnStatistics {
            count: 100,
            compressed_bytes: 200,
            uncompressed_bytes: 800,
            ..ColumnStatistics::new(ColumnCodec::Delta)
        };

        let deviation = (stats.compression_ratio() - 4.0).abs();
        assert!(deviation < f64::EPSILON);
    }
}