use std::sync::Arc;
use crate::common::SmartString;
use crate::core::{DataType, Value};
/// Typed, column-oriented storage for one column of a table.
///
/// Every variant carries a `nulls` mask: `nulls[i] == true` marks row `i` as
/// SQL NULL (see `is_null` / `get_value`). The per-row vectors of a variant
/// are indexed with the same row index; nothing in this type enforces that
/// they have equal lengths.
#[derive(Clone)]
pub enum ColumnData {
/// 64-bit signed integers; surfaced as `Value::Integer` / `DataType::Integer`.
Int64 { values: Vec<i64>, nulls: Vec<bool> },
/// 64-bit floats; surfaced as `Value::Float` / `DataType::Float`.
Float64 { values: Vec<f64>, nulls: Vec<bool> },
/// Timestamps stored as `i64` nanoseconds; converted to `Value::Timestamp`
/// through chrono's `Utc.timestamp_opt(secs, subsec_nanos)` when read.
TimestampNanos { values: Vec<i64>, nulls: Vec<bool> },
/// Booleans; surfaced as `Value::Boolean` / `DataType::Boolean`.
Boolean { values: Vec<bool>, nulls: Vec<bool> },
/// Dictionary-encoded text; reported as `DataType::Text`.
Dictionary {
// Per-row index into `dictionary`.
ids: Vec<u32>,
// Shared table of the strings referenced by `ids`.
dictionary: Arc<[SmartString]>,
nulls: Vec<bool>,
},
/// Variable-length binary payloads of an extension type.
Bytes {
// Concatenated payload bytes of all rows.
data: Vec<u8>,
// Per-row `(offset, length)` pair addressing a byte range inside `data`.
offsets: Vec<(u64, u64)>,
// Type reported by `data_type()`; its `as u8` value is prepended as a tag
// byte when a row is materialised as `Value::Extension`.
ext_type: DataType,
nulls: Vec<bool>,
},
}
/// Min/max summary of a range of rows of one column, as produced by
/// `ColumnData::zone_map_for_range` and queried by the `may_contain_*` methods.
#[derive(Debug, Clone)]
pub struct ZoneMap {
/// Smallest value seen in the range; `Value::Null(..)` when no bound could be computed.
pub min: Value,
/// Largest value seen in the range; `Value::Null(..)` when no bound could be computed.
pub max: Value,
/// Number of rows in the range whose null flag is set.
pub null_count: u32,
/// Total number of rows in the range (`end - start`), nulls included.
pub row_count: u32,
}
/// Row-group size constant (65,536 = 2^16).
// NOTE(review): not referenced in this part of the file; presumably the number
// of rows summarised by one `RowGroupMeta` — confirm against the callers.
pub const ROW_GROUP_SIZE: usize = 65536;
/// Metadata describing one row group: a row index range plus its zone maps.
#[derive(Debug, Clone)]
pub struct RowGroupMeta {
/// First row index of the group.
pub start_idx: u32,
/// End row index of the group.
// NOTE(review): presumably exclusive, matching the `start..end` convention of
// `zone_map_for_range` — TODO confirm with the code that builds this struct.
pub end_idx: u32,
/// Zone maps for the group.
// NOTE(review): presumably one entry per column, in column order — confirm.
pub zone_maps: Vec<ZoneMap>,
}
impl ColumnData {
/// Estimates the number of bytes held by this column's buffers.
///
/// The estimate is based on `len()` (not `capacity()`) of each vector:
/// 8 bytes per `i64`/`f64`, 4 per dictionary id, 16 per `(offset, length)`
/// pair and 1 per `bool`. Every dictionary entry is charged its string
/// length plus a flat 24 bytes.
pub fn memory_size(&self) -> usize {
    const WIDE_VALUE_BYTES: usize = 8;
    const DICT_ID_BYTES: usize = 4;
    const DICT_ENTRY_OVERHEAD: usize = 24;
    const OFFSET_PAIR_BYTES: usize = 16;

    match self {
        ColumnData::Int64 { values, nulls } | ColumnData::TimestampNanos { values, nulls } => {
            values.len() * WIDE_VALUE_BYTES + nulls.len()
        }
        ColumnData::Float64 { values, nulls } => values.len() * WIDE_VALUE_BYTES + nulls.len(),
        ColumnData::Boolean { values, nulls } => values.len() + nulls.len(),
        ColumnData::Dictionary {
            ids,
            dictionary,
            nulls,
        } => {
            let id_bytes = ids.len() * DICT_ID_BYTES;
            let dict_bytes: usize = dictionary
                .iter()
                .map(|entry| entry.len() + DICT_ENTRY_OVERHEAD)
                .sum();
            id_bytes + dict_bytes + nulls.len()
        }
        ColumnData::Bytes {
            data,
            offsets,
            nulls,
            ..
        } => data.len() + offsets.len() * OFFSET_PAIR_BYTES + nulls.len(),
    }
}
pub fn zone_map_for_range(&self, start: usize, end: usize) -> ZoneMap {
let mut null_count = 0u32;
let row_count = (end - start) as u32;
match self {
ColumnData::Int64 { values, nulls } => {
let mut min_val = i64::MAX;
let mut max_val = i64::MIN;
let mut has_value = false;
for i in start..end {
if nulls[i] {
null_count += 1;
} else {
let v = values[i];
if !has_value || v < min_val {
min_val = v;
}
if !has_value || v > max_val {
max_val = v;
}
has_value = true;
}
}
if has_value {
ZoneMap {
min: Value::Integer(min_val),
max: Value::Integer(max_val),
null_count,
row_count,
}
} else {
ZoneMap {
min: Value::Null(DataType::Integer),
max: Value::Null(DataType::Integer),
null_count,
row_count,
}
}
}
ColumnData::Float64 { values, nulls } => {
let mut min_val = f64::INFINITY;
let mut max_val = f64::NEG_INFINITY;
let mut has_value = false;
for i in start..end {
if nulls[i] {
null_count += 1;
} else {
let v = values[i];
if v.is_nan() {
continue;
}
if !has_value || v < min_val {
min_val = v;
}
if !has_value || v > max_val {
max_val = v;
}
has_value = true;
}
}
if has_value {
ZoneMap {
min: Value::Float(min_val),
max: Value::Float(max_val),
null_count,
row_count,
}
} else {
ZoneMap {
min: Value::Null(DataType::Float),
max: Value::Null(DataType::Float),
null_count,
row_count,
}
}
}
ColumnData::TimestampNanos { values, nulls } => {
let mut min_val = i64::MAX;
let mut max_val = i64::MIN;
let mut has_value = false;
for i in start..end {
if nulls[i] {
null_count += 1;
} else {
let v = values[i];
if !has_value || v < min_val {
min_val = v;
}
if !has_value || v > max_val {
max_val = v;
}
has_value = true;
}
}
if has_value {
let to_ts = |nanos: i64| -> Value {
let secs = nanos.div_euclid(1_000_000_000);
let sub = nanos.rem_euclid(1_000_000_000) as u32;
match chrono::TimeZone::timestamp_opt(&chrono::Utc, secs, sub) {
chrono::LocalResult::Single(dt) => Value::Timestamp(dt),
_ => Value::Null(DataType::Timestamp),
}
};
ZoneMap {
min: to_ts(min_val),
max: to_ts(max_val),
null_count,
row_count,
}
} else {
ZoneMap {
min: Value::Null(DataType::Timestamp),
max: Value::Null(DataType::Timestamp),
null_count,
row_count,
}
}
}
ColumnData::Boolean { values, nulls } => {
let mut has_true = false;
let mut has_false = false;
for i in start..end {
if nulls[i] {
null_count += 1;
} else if values[i] {
has_true = true;
} else {
has_false = true;
}
}
let (min, max) = if has_false && has_true {
(Value::Boolean(false), Value::Boolean(true))
} else if has_false {
(Value::Boolean(false), Value::Boolean(false))
} else if has_true {
(Value::Boolean(true), Value::Boolean(true))
} else {
(
Value::Null(DataType::Boolean),
Value::Null(DataType::Boolean),
)
};
ZoneMap {
min,
max,
null_count,
row_count,
}
}
ColumnData::Dictionary {
ids,
dictionary,
nulls,
} => {
let mut min_str: Option<&str> = None;
let mut max_str: Option<&str> = None;
for i in start..end {
if nulls[i] {
null_count += 1;
} else {
let s = dictionary[ids[i] as usize].as_str();
min_str = Some(match min_str {
Some(cur) if cur <= s => cur,
_ => s,
});
max_str = Some(match max_str {
Some(cur) if cur >= s => cur,
_ => s,
});
}
}
if let (Some(mn), Some(mx)) = (min_str, max_str) {
ZoneMap {
min: Value::text(mn),
max: Value::text(mx),
null_count,
row_count,
}
} else {
ZoneMap {
min: Value::Null(DataType::Text),
max: Value::Null(DataType::Text),
null_count,
row_count,
}
}
}
ColumnData::Bytes {
nulls, ext_type, ..
} => {
for is_null in &nulls[start..end] {
if *is_null {
null_count += 1;
}
}
ZoneMap {
min: Value::Null(*ext_type),
max: Value::Null(*ext_type),
null_count,
row_count,
}
}
}
}
/// Returns the number of rows, taken from the variant's primary per-row
/// vector (`values`, `ids` or `offsets`); the null mask is not consulted.
#[inline]
pub fn len(&self) -> usize {
    match self {
        ColumnData::Int64 { values, .. } | ColumnData::TimestampNanos { values, .. } => {
            values.len()
        }
        ColumnData::Float64 { values, .. } => values.len(),
        ColumnData::Boolean { values, .. } => values.len(),
        ColumnData::Dictionary { ids: codes, .. } => codes.len(),
        ColumnData::Bytes { offsets: spans, .. } => spans.len(),
    }
}
/// Returns the logical [`DataType`] of the column: a fixed type for the
/// built-in variants, and the stored `ext_type` for `Bytes`.
#[inline]
pub fn data_type(&self) -> DataType {
    match *self {
        ColumnData::Bytes { ext_type, .. } => ext_type,
        ColumnData::Dictionary { .. } => DataType::Text,
        ColumnData::Boolean { .. } => DataType::Boolean,
        ColumnData::TimestampNanos { .. } => DataType::Timestamp,
        ColumnData::Float64 { .. } => DataType::Float,
        ColumnData::Int64 { .. } => DataType::Integer,
    }
}
/// Returns `true` when the column holds no rows (i.e. `len()` is zero).
#[inline]
pub fn is_empty(&self) -> bool {
    let rows = self.len();
    rows == 0
}
/// Returns the null flag of row `idx`.
///
/// # Panics
///
/// Panics if `idx` is out of bounds for the null mask.
#[inline]
pub fn is_null(&self, idx: usize) -> bool {
    // Every variant carries the same kind of mask, so pick it out once.
    let mask: &[bool] = match self {
        ColumnData::Int64 { nulls, .. } => nulls,
        ColumnData::Float64 { nulls, .. } => nulls,
        ColumnData::TimestampNanos { nulls, .. } => nulls,
        ColumnData::Boolean { nulls, .. } => nulls,
        ColumnData::Dictionary { nulls, .. } => nulls,
        ColumnData::Bytes { nulls, .. } => nulls,
    };
    mask[idx]
}
/// Materialises row `idx` as a [`Value`].
///
/// A row whose null flag is set yields `Value::Null` of the column's data
/// type; its stored payload is not read. Otherwise:
/// - `Int64` / `Float64` / `Boolean` wrap the stored scalar.
/// - `TimestampNanos` is converted with chrono's `timestamp_opt`; a value
///   that does not convert yields `Value::Null(DataType::Timestamp)`.
/// - `Dictionary` clones the referenced dictionary entry into `Value::Text`.
/// - `Bytes` copies the row's byte range into a new buffer prefixed with
///   `ext_type as u8` and wraps it in `Value::Extension`.
///
/// # Panics
///
/// Panics if `idx`, a dictionary id, or a byte range is out of bounds.
pub fn get_value(&self, idx: usize) -> Value {
    // Null rows share one exit: a typed null matching the column's type.
    if self.is_null(idx) {
        return Value::Null(self.data_type());
    }

    match self {
        ColumnData::Int64 { values, .. } => Value::Integer(values[idx]),
        ColumnData::Float64 { values, .. } => Value::Float(values[idx]),
        ColumnData::Boolean { values, .. } => Value::Boolean(values[idx]),
        ColumnData::TimestampNanos { values, .. } => {
            let raw = values[idx];
            // Euclidean split keeps the sub-second part non-negative.
            let whole_secs = raw.div_euclid(1_000_000_000);
            let frac_nanos = raw.rem_euclid(1_000_000_000) as u32;
            match chrono::TimeZone::timestamp_opt(&chrono::Utc, whole_secs, frac_nanos) {
                chrono::LocalResult::Single(dt) => Value::Timestamp(dt),
                _ => Value::Null(DataType::Timestamp),
            }
        }
        ColumnData::Dictionary {
            ids, dictionary, ..
        } => {
            let slot = ids[idx] as usize;
            Value::Text(dictionary[slot].clone())
        }
        ColumnData::Bytes {
            data,
            offsets,
            ext_type,
            ..
        } => {
            let (off, len) = offsets[idx];
            let payload = &data[off as usize..(off + len) as usize];
            // Layout of the extension buffer: [type tag byte][payload bytes].
            let mut tagged = Vec::with_capacity(payload.len() + 1);
            tagged.push(*ext_type as u8);
            tagged.extend_from_slice(payload);
            Value::Extension(crate::common::CompactArc::from(tagged))
        }
    }
}
/// Returns the raw `i64` stored at `idx` for `Int64` and `TimestampNanos`
/// columns, and `0` for every other variant. The null mask is not consulted.
///
/// # Panics
///
/// Panics if `idx` is out of bounds for an `i64`-backed column.
#[inline]
pub fn get_i64(&self, idx: usize) -> i64 {
    if let ColumnData::Int64 { values, .. } | ColumnData::TimestampNanos { values, .. } = self {
        return values[idx];
    }
    0
}
/// Returns the raw `f64` stored at `idx` for a `Float64` column, and `0.0`
/// for every other variant. The null mask is not consulted.
///
/// # Panics
///
/// Panics if `idx` is out of bounds for a `Float64` column.
#[inline]
pub fn get_f64(&self, idx: usize) -> f64 {
    let ColumnData::Float64 { values, .. } = self else {
        return 0.0;
    };
    values[idx]
}
/// Returns the raw `bool` stored at `idx` for a `Boolean` column, and `false`
/// for every other variant. The null mask is not consulted.
///
/// # Panics
///
/// Panics if `idx` is out of bounds for a `Boolean` column.
#[inline]
pub fn get_bool(&self, idx: usize) -> bool {
    matches!(self, ColumnData::Boolean { values, .. } if values[idx])
}
/// Returns the dictionary string referenced by row `idx` of a `Dictionary`
/// column, and `""` for every other variant. The null mask is not consulted.
///
/// # Panics
///
/// Panics if `idx` or the stored dictionary id is out of bounds.
#[inline]
pub fn get_str(&self, idx: usize) -> &str {
    if let ColumnData::Dictionary {
        ids, dictionary, ..
    } = self
    {
        let slot = ids[idx] as usize;
        return dictionary[slot].as_str();
    }
    ""
}
/// Returns the dictionary id stored at `idx` for a `Dictionary` column, and
/// `u32::MAX` for every other variant. The null mask is not consulted.
///
/// # Panics
///
/// Panics if `idx` is out of bounds for a `Dictionary` column.
#[inline]
pub fn get_dict_id(&self, idx: usize) -> u32 {
    let ColumnData::Dictionary { ids, .. } = self else {
        return u32::MAX;
    };
    ids[idx]
}
/// Returns the index of the first stored value that is `>= target` (or
/// `len()` if there is none) for `Int64` / `TimestampNanos` columns; returns
/// `0` for every other variant.
///
/// Uses `partition_point`, so the result is only meaningful when `values` is
/// sorted ascending. The null mask is not consulted.
pub fn binary_search_ge(&self, target: i64) -> usize {
    let sorted: &[i64] = match self {
        ColumnData::Int64 { values, .. } | ColumnData::TimestampNanos { values, .. } => values,
        _ => return 0,
    };
    sorted.partition_point(|&candidate| candidate < target)
}
/// Returns the index of the first stored value that is `> target` (or
/// `len()` if there is none) for `Int64` / `TimestampNanos` columns; returns
/// `0` for every other variant.
///
/// Uses `partition_point`, so the result is only meaningful when `values` is
/// sorted ascending. The null mask is not consulted.
pub fn binary_search_gt(&self, target: i64) -> usize {
    let sorted: &[i64] = match self {
        ColumnData::Int64 { values, .. } | ColumnData::TimestampNanos { values, .. } => values,
        _ => return 0,
    };
    sorted.partition_point(|&candidate| candidate <= target)
}
/// Looks up `value` in the dictionary of a `Dictionary` column with a linear
/// scan and returns the index of the first matching entry as a `u32`.
///
/// Returns `None` when the string is absent or the column is not a
/// `Dictionary`.
pub fn dict_lookup(&self, value: &str) -> Option<u32> {
    let ColumnData::Dictionary { dictionary, .. } = self else {
        return None;
    };
    dictionary
        .iter()
        .position(|entry| entry.as_str() == value)
        .map(|slot| slot as u32)
}
}
/// Bloom filter over the values of one column.
///
/// Values are hashed with 64-bit FNV-1a over a one-byte type tag followed by
/// the value's bytes, and each hash sets / tests three bit positions.
/// A `false` answer from `might_contain*` means the hash was never inserted;
/// a `true` answer may be a false positive.
#[derive(Debug, Clone)]
pub struct ColumnBloomFilter {
    /// Bit array packed into words: bit `b` is `bits[b / 64] >> (b % 64) & 1`.
    bits: Vec<u64>,
    /// Modulus applied to probe hashes. Both constructors keep it non-zero
    /// and no larger than `bits.len() * 64`.
    num_bits: usize,
}

impl ColumnBloomFilter {
    const FNV_OFFSET: u64 = 0xcbf29ce484222325;
    const FNV_PRIME: u64 = 0x100000001b3;
    /// Number of bit positions set / tested per hash.
    const PROBES: usize = 3;

    // Type tags mixed into the hash ahead of the payload, so equal byte
    // patterns of different types do not collide by construction. The typed
    // `add_*` methods and `hash_value` must use the same tag per type.
    const TAG_OTHER: u8 = 0;
    const TAG_INTEGER: u8 = 1;
    const TAG_FLOAT: u8 = 2;
    const TAG_TEXT: u8 = 3;
    const TAG_BOOLEAN: u8 = 4;
    const TAG_TIMESTAMP: u8 = 5;

    /// Bytes used by the bit array plus a flat 8 bytes
    /// (presumably accounting for the `num_bits` field).
    pub fn memory_size(&self) -> usize {
        self.bits.len() * 8 + 8
    }

    /// Creates an empty filter sized at 10 bits per expected element, with a
    /// minimum of 64 bits.
    pub fn new(expected_elements: usize) -> Self {
        // Saturate rather than wrap: a wrapped product would silently yield a
        // far-too-small filter in release builds.
        let num_bits = expected_elements.saturating_mul(10).max(64);
        let num_words = num_bits.div_ceil(64);
        Self {
            bits: vec![0u64; num_words],
            num_bits,
        }
    }

    /// 64-bit FNV-1a over `tag` followed by `payload`.
    fn fnv1a_tagged(tag: u8, payload: &[u8]) -> u64 {
        std::iter::once(&tag)
            .chain(payload)
            .fold(Self::FNV_OFFSET, |h, &byte| {
                (h ^ u64::from(byte)).wrapping_mul(Self::FNV_PRIME)
            })
    }

    /// Bit positions probed for hash `h`: `(h1 + i * h2) % num_bits` for
    /// `i` in `0..PROBES`, where `h1` is the hash and `h2` its upper 32 bits.
    ///
    /// The arithmetic is done in `usize`, so positions depend on the
    /// platform's pointer width. `num_bits` must be non-zero.
    fn probe_positions(h: u64, num_bits: usize) -> impl Iterator<Item = usize> {
        let h1 = h as usize;
        let h2 = (h >> 32) as usize;
        (0..Self::PROBES).map(move |i| h1.wrapping_add(i.wrapping_mul(h2)) % num_bits)
    }

    /// Hashes `tag` + `bytes` and inserts the result.
    fn add_raw(&mut self, tag: u8, bytes: &[u8]) {
        self.insert_hash(Self::fnv1a_tagged(tag, bytes));
    }

    /// Inserts an integer; matches lookups for `Value::Integer`.
    pub fn add_i64(&mut self, val: i64) {
        self.add_raw(Self::TAG_INTEGER, &val.to_le_bytes());
    }

    /// Inserts a float by its exact bit pattern; matches lookups for
    /// `Value::Float` (so `0.0` and `-0.0` are distinct entries).
    pub fn add_f64(&mut self, val: f64) {
        self.add_raw(Self::TAG_FLOAT, &val.to_bits().to_le_bytes());
    }

    /// Inserts a string by its UTF-8 bytes; matches lookups for `Value::Text`.
    pub fn add_str(&mut self, val: &str) {
        self.add_raw(Self::TAG_TEXT, val.as_bytes());
    }

    /// Inserts a boolean; matches lookups for `Value::Boolean`.
    pub fn add_bool(&mut self, val: bool) {
        self.add_raw(Self::TAG_BOOLEAN, &[val as u8]);
    }

    /// Inserts a timestamp given as nanoseconds; matches lookups for
    /// `Value::Timestamp` values with the same nanosecond count.
    pub fn add_timestamp_nanos(&mut self, nanos: i64) {
        self.add_raw(Self::TAG_TIMESTAMP, &nanos.to_le_bytes());
    }

    /// Inserts the single shared hash that `hash_value` assigns to every
    /// value outside the five typed cases (tag 0, empty payload).
    #[inline]
    pub fn add_extension_noop(&mut self) {
        self.add_raw(Self::TAG_OTHER, &[]);
    }

    /// Sets the probe bits for hash `h`.
    fn insert_hash(&mut self, h: u64) {
        for bit in Self::probe_positions(h, self.num_bits) {
            self.bits[bit / 64] |= 1u64 << (bit % 64);
        }
    }

    /// Inserts `value` using the same hashing as [`Self::might_contain`].
    pub fn add(&mut self, value: &Value) {
        let h = Self::hash_value(value);
        self.insert_hash(h);
    }

    /// Returns `false` only if `value` was definitely never inserted.
    pub fn might_contain(&self, value: &Value) -> bool {
        self.might_contain_hash(Self::hash_value(value))
    }

    /// Like [`Self::might_contain`], for a hash precomputed with
    /// [`Self::hash_value_static`]. Stops at the first unset probe bit.
    #[inline]
    pub fn might_contain_hash(&self, h: u64) -> bool {
        Self::probe_positions(h, self.num_bits)
            .all(|bit| self.bits[bit / 64] & (1u64 << (bit % 64)) != 0)
    }

    /// Public entry point to the filter's value hash, so callers can hash
    /// once and test several filters with `might_contain_hash`.
    pub fn hash_value_static(value: &Value) -> u64 {
        Self::hash_value(value)
    }

    /// Number of addressable bits (the probe modulus).
    #[inline]
    pub fn num_bits(&self) -> usize {
        self.num_bits
    }

    /// Serialises the bit array as little-endian 64-bit words.
    pub fn bits_as_bytes(&self) -> Vec<u8> {
        let mut out = Vec::with_capacity(self.bits.len() * 8);
        for &word in &self.bits {
            out.extend_from_slice(&word.to_le_bytes());
        }
        out
    }

    /// Rebuilds a filter from `num_bits` and the output of
    /// [`Self::bits_as_bytes`].
    ///
    /// `bytes` is read as little-endian 64-bit words; a trailing partial word
    /// is ignored. If `num_bits` is zero or exceeds the number of bits
    /// actually supplied, it is clamped to the supplied bit count.
    ///
    /// If `bytes` holds no complete word, there is no bit array to probe. The
    /// result is then a saturated one-word filter (every bit set), so
    /// `might_contain*` answers `true` for everything: a filter whose payload
    /// is missing must never rule a value out.
    pub fn from_parts(num_bits: usize, bytes: &[u8]) -> Self {
        let bits: Vec<u64> = bytes
            .chunks_exact(8)
            .map(|chunk| {
                let mut word = [0u8; 8];
                word.copy_from_slice(chunk);
                u64::from_le_bytes(word)
            })
            .collect();

        if bits.is_empty() {
            // Previously this produced `bits = []` with `num_bits = 1`, and the
            // first probe indexed out of bounds.
            return Self {
                bits: vec![u64::MAX],
                num_bits: 64,
            };
        }

        let max_bits = bits.len() * 64;
        let num_bits = if num_bits == 0 || num_bits > max_bits {
            max_bits
        } else {
            num_bits
        };
        Self { bits, num_bits }
    }

    /// Hashes a [`Value`] with the same tag + payload scheme as the typed
    /// `add_*` methods. Every variant outside the five typed cases hashes to
    /// the same value as [`Self::add_extension_noop`].
    fn hash_value(value: &Value) -> u64 {
        match value {
            Value::Integer(i) => Self::fnv1a_tagged(Self::TAG_INTEGER, &i.to_le_bytes()),
            Value::Float(f) => Self::fnv1a_tagged(Self::TAG_FLOAT, &f.to_bits().to_le_bytes()),
            Value::Text(s) => Self::fnv1a_tagged(Self::TAG_TEXT, s.as_bytes()),
            Value::Boolean(b) => Self::fnv1a_tagged(Self::TAG_BOOLEAN, &[*b as u8]),
            Value::Timestamp(ts) => {
                // When the nanosecond count is not representable, rebuild it
                // with wrapping arithmetic so hashing never fails.
                let nanos = ts.timestamp_nanos_opt().unwrap_or_else(|| {
                    ts.timestamp()
                        .wrapping_mul(1_000_000_000)
                        .wrapping_add(ts.timestamp_subsec_nanos() as i64)
                });
                Self::fnv1a_tagged(Self::TAG_TIMESTAMP, &nanos.to_le_bytes())
            }
            _ => Self::fnv1a_tagged(Self::TAG_OTHER, &[]),
        }
    }
}
impl ZoneMap {
    /// Returns `true` if the summarised range may hold a value `>= value`,
    /// i.e. unless `max` is null or compares strictly below `value`.
    ///
    /// Conservative: answers `true` when `compare` yields no ordering.
    #[inline]
    pub fn may_contain_gte(&self, value: &Value) -> bool {
        use std::cmp::Ordering;

        // A null upper bound means no bound was recorded, so nothing can match.
        !self.max.is_null()
            && self
                .max
                .compare(value)
                .map_or(true, |ord| ord != Ordering::Less)
    }

    /// Returns `true` if the summarised range may hold a value `<= value`,
    /// i.e. unless `min` is null or compares strictly above `value`.
    ///
    /// Conservative: answers `true` when `compare` yields no ordering.
    #[inline]
    pub fn may_contain_lte(&self, value: &Value) -> bool {
        use std::cmp::Ordering;

        !self.min.is_null()
            && self
                .min
                .compare(value)
                .map_or(true, |ord| ord != Ordering::Greater)
    }

    /// Returns `true` if `value` may be present, i.e. `min <= value <= max`.
    ///
    /// Only `min` is checked for null. Each bound comparison is conservative:
    /// it counts as satisfied when `compare` yields no ordering.
    #[inline]
    pub fn may_contain_eq(&self, value: &Value) -> bool {
        use std::cmp::Ordering;

        if self.min.is_null() {
            return false;
        }
        let not_below_min = self
            .min
            .compare(value)
            .map_or(true, |ord| ord != Ordering::Greater);
        let not_above_max = self
            .max
            .compare(value)
            .map_or(true, |ord| ord != Ordering::Less);
        not_below_min && not_above_max
    }

    /// Returns `true` if the range may overlap `[low, high]`: something may be
    /// `>= low` and something may be `<= high`.
    #[inline]
    pub fn may_contain_between(&self, low: &Value, high: &Value) -> bool {
        if !self.may_contain_gte(low) {
            return false;
        }
        self.may_contain_lte(high)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Typed and generic accessors on an `Int64` column with one null row.
    #[test]
    fn test_int64_column() {
        let col = ColumnData::Int64 {
            values: vec![10, 20, 30, 0, 50],
            nulls: vec![false, false, false, true, false],
        };

        assert_eq!(col.len(), 5);

        assert_eq!(col.get_i64(0), 10);
        assert_eq!(col.get_i64(2), 30);

        assert!(col.is_null(3));
        assert!(!col.is_null(0));

        assert_eq!(col.get_value(0), Value::Integer(10));
        assert!(col.get_value(3).is_null());
    }

    /// A null `Float64` row materialises as a typed null.
    #[test]
    fn test_float64_column() {
        let col = ColumnData::Float64 {
            values: vec![1.5, 2.5, 0.0],
            nulls: vec![false, false, true],
        };

        assert_eq!(col.get_f64(0), 1.5);
        assert_eq!(col.get_value(2), Value::Null(DataType::Float));
    }

    /// String access, id access and reverse lookup on a dictionary column.
    #[test]
    fn test_dictionary_column() {
        let col = ColumnData::Dictionary {
            ids: vec![0, 1, 0, 1, 0],
            dictionary: Arc::from(vec![
                SmartString::from("binance"),
                SmartString::from("coinbase"),
            ]),
            nulls: vec![false, false, false, false, false],
        };

        assert_eq!(col.get_str(0), "binance");
        assert_eq!(col.get_str(1), "coinbase");
        assert_eq!(col.get_str(2), "binance");

        assert_eq!(col.get_dict_id(0), 0);
        assert_eq!(col.get_dict_id(1), 1);

        assert_eq!(col.dict_lookup("binance"), Some(0));
        assert_eq!(col.dict_lookup("unknown"), None);
    }

    /// Lower-bound (`ge`) and upper-bound (`gt`) searches on sorted timestamps.
    #[test]
    fn test_binary_search() {
        let col = ColumnData::TimestampNanos {
            values: vec![100, 200, 300, 400, 500],
            nulls: vec![false; 5],
        };

        // Between two entries: lands on the next larger one.
        assert_eq!(col.binary_search_ge(250), 2);
        // Exact hit: lands on the entry itself.
        assert_eq!(col.binary_search_ge(300), 2);
        // First entry.
        assert_eq!(col.binary_search_ge(100), 0);
        // Past the end.
        assert_eq!(col.binary_search_ge(600), 5);
        // Strictly greater: skips the exact hit.
        assert_eq!(col.binary_search_gt(300), 3);
    }

    /// Boundary behaviour of the zone-map predicates on the range [10, 100].
    #[test]
    fn test_zone_map_pruning() {
        let zm = ZoneMap {
            min: Value::Integer(10),
            max: Value::Integer(100),
            null_count: 0,
            row_count: 50,
        };

        // `>=` is bounded by max.
        assert!(zm.may_contain_gte(&Value::Integer(50)));
        assert!(zm.may_contain_gte(&Value::Integer(100)));
        assert!(!zm.may_contain_gte(&Value::Integer(101)));

        // `<=` is bounded by min.
        assert!(zm.may_contain_lte(&Value::Integer(50)));
        assert!(zm.may_contain_lte(&Value::Integer(10)));
        assert!(!zm.may_contain_lte(&Value::Integer(9)));

        // Equality needs the value inside [min, max].
        assert!(zm.may_contain_eq(&Value::Integer(50)));
        assert!(!zm.may_contain_eq(&Value::Integer(5)));
        assert!(!zm.may_contain_eq(&Value::Integer(101)));
    }
}