use serde::{Deserialize, Serialize};
use std::cmp::Ordering;
use std::collections::HashMap;
use crate::Value;
/// Running summary of a set of recorded values: tracked bounds plus null bookkeeping.
///
/// The fields are public, so nothing here enforces that they agree with each
/// other; the methods on this type keep them consistent when used exclusively.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ValueRange {
/// Lower bound tracked over the non-null values recorded; `None` until one is seen.
pub min: Option<Value>,
/// Upper bound tracked over the non-null values recorded; `None` until one is seen.
pub max: Option<Value>,
/// Whether at least one `Value::Null` has been recorded.
pub has_nulls: bool,
/// Total number of values recorded, nulls included.
pub count: u64,
/// Number of recorded values that were `Value::Null`.
pub null_count: u64,
}
impl ValueRange {
    /// Returns a range that has recorded nothing: no bounds, no nulls, zero counts.
    pub fn empty() -> Self {
        Self {
            min: None,
            max: None,
            has_nulls: false,
            count: 0,
            null_count: 0,
        }
    }

    /// Returns the range describing exactly one recorded `value`.
    ///
    /// A `Value::Null` produces a range without bounds whose null bookkeeping is
    /// set; any other value becomes both the lower and the upper bound.
    pub fn from_value(value: Value) -> Self {
        if matches!(value, Value::Null) {
            Self {
                min: None,
                max: None,
                has_nulls: true,
                count: 1,
                null_count: 1,
            }
        } else {
            Self {
                min: Some(value.clone()),
                max: Some(value),
                has_nulls: false,
                count: 1,
                null_count: 0,
            }
        }
    }

    /// Records one more value.
    ///
    /// `count` always grows. A `Value::Null` only touches the null bookkeeping.
    /// Any other value replaces a bound when that bound is unset or when the
    /// value orders strictly outside it; a value that cannot be ordered against
    /// the current bound leaves the bound unchanged.
    pub fn update(&mut self, value: &Value) {
        self.count += 1;
        if matches!(value, Value::Null) {
            self.has_nulls = true;
            self.null_count += 1;
            return;
        }
        Self::widen(&mut self.min, value, Ordering::Less);
        Self::widen(&mut self.max, value, Ordering::Greater);
    }

    /// Folds the statistics of `other` into `self`.
    ///
    /// Counts are added, the null flag is OR-ed, and each bound of `other` is
    /// applied to the matching bound of `self` with the same rule as `update`.
    pub fn merge(&mut self, other: &ValueRange) {
        self.count += other.count;
        self.null_count += other.null_count;
        self.has_nulls |= other.has_nulls;
        if let Some(other_min) = &other.min {
            Self::widen(&mut self.min, other_min, Ordering::Less);
        }
        if let Some(other_max) = &other.max {
            Self::widen(&mut self.max, other_max, Ordering::Greater);
        }
    }

    /// Returns `false` only when `value` can be ruled out of this range.
    ///
    /// * `Value::Null` is answered from `has_nulls`.
    /// * With no bounds recorded there are no non-null values, so the answer is `false`.
    /// * Otherwise `value` must not order below `min` nor above `max`.
    ///
    /// When `value` cannot be ordered against a bound (`compare_values` yields
    /// `None`), that bound is treated as satisfied: an answer of "maybe" only
    /// costs a scan, whereas a wrong "no" would hide matching rows.
    pub fn might_contain(&self, value: &Value) -> bool {
        if matches!(value, Value::Null) {
            return self.has_nulls;
        }
        match (&self.min, &self.max) {
            (Some(min), Some(max)) => {
                Self::ordering_admits(value, min, |o| o != Ordering::Less)
                    && Self::ordering_admits(value, max, |o| o != Ordering::Greater)
            }
            _ => false,
        }
    }

    /// Returns `false` only when no recorded value can be `< value`.
    ///
    /// Without a lower bound the answer is `false`; an indeterminate ordering
    /// between the bound and `value` is answered conservatively with `true`.
    pub fn might_contain_less_than(&self, value: &Value) -> bool {
        self.min.as_ref().map_or(false, |min| {
            Self::ordering_admits(min, value, |o| o == Ordering::Less)
        })
    }

    /// Returns `false` only when no recorded value can be `<= value`.
    ///
    /// Without a lower bound the answer is `false`; an indeterminate ordering
    /// between the bound and `value` is answered conservatively with `true`.
    pub fn might_contain_less_or_equal(&self, value: &Value) -> bool {
        self.min.as_ref().map_or(false, |min| {
            Self::ordering_admits(min, value, |o| o != Ordering::Greater)
        })
    }

    /// Returns `false` only when no recorded value can be `> value`.
    ///
    /// Without an upper bound the answer is `false`; an indeterminate ordering
    /// between the bound and `value` is answered conservatively with `true`.
    pub fn might_contain_greater_than(&self, value: &Value) -> bool {
        self.max.as_ref().map_or(false, |max| {
            Self::ordering_admits(max, value, |o| o == Ordering::Greater)
        })
    }

    /// Returns `false` only when no recorded value can be `>= value`.
    ///
    /// Without an upper bound the answer is `false`; an indeterminate ordering
    /// between the bound and `value` is answered conservatively with `true`.
    pub fn might_contain_greater_or_equal(&self, value: &Value) -> bool {
        self.max.as_ref().map_or(false, |max| {
            Self::ordering_admits(max, value, |o| o != Ordering::Less)
        })
    }

    /// Returns `false` only when this range cannot intersect `[low, high]`.
    ///
    /// Requires `min <= high` and `max >= low`. A missing bound rules the range
    /// out; an indeterminate ordering is answered conservatively with `true`.
    pub fn might_overlap(&self, low: &Value, high: &Value) -> bool {
        let min_le_high = self.min.as_ref().map_or(false, |min| {
            Self::ordering_admits(min, high, |o| o != Ordering::Greater)
        });
        let max_ge_low = self.max.as_ref().map_or(false, |max| {
            Self::ordering_admits(max, low, |o| o != Ordering::Less)
        });
        min_le_high && max_ge_low
    }

    /// Replaces `bound` with a clone of `candidate` when the bound is unset or
    /// when `candidate` compares to it as exactly `replace_on`.
    fn widen(bound: &mut Option<Value>, candidate: &Value, replace_on: Ordering) {
        let replace = match bound.as_ref() {
            None => true,
            Some(current) => Self::compare_values(candidate, current) == Some(replace_on),
        };
        if replace {
            *bound = Some(candidate.clone());
        }
    }

    /// Applies `accept` to the ordering of `lhs` relative to `rhs`.
    ///
    /// Returns `true` when the two values have no defined ordering, because the
    /// recorded bounds say nothing about such a value and it cannot be excluded.
    fn ordering_admits(
        lhs: &Value,
        rhs: &Value,
        accept: impl FnOnce(Ordering) -> bool,
    ) -> bool {
        Self::compare_values(lhs, rhs).map_or(true, accept)
    }

    /// Orders `a` relative to `b`, or returns `None` when no ordering is defined.
    ///
    /// * `Null` equals `Null` and sorts below everything else.
    /// * Integer variants compare across widths after widening the narrower side.
    /// * Float variants use `partial_cmp`, so a NaN operand yields `None`.
    /// * `Numeric` operands go through `compare_numeric_strings`.
    /// * Every other pairing, including any variant without an arm here, is `None`.
    fn compare_values(a: &Value, b: &Value) -> Option<Ordering> {
        match (a, b) {
            (Value::Null, Value::Null) => Some(Ordering::Equal),
            (Value::Null, _) => Some(Ordering::Less),
            (_, Value::Null) => Some(Ordering::Greater),
            (Value::Boolean(a), Value::Boolean(b)) => Some(a.cmp(b)),
            (Value::Int2(a), Value::Int2(b)) => Some(a.cmp(b)),
            (Value::Int4(a), Value::Int4(b)) => Some(a.cmp(b)),
            (Value::Int8(a), Value::Int8(b)) => Some(a.cmp(b)),
            (Value::Int2(a), Value::Int4(b)) => Some((*a as i32).cmp(b)),
            (Value::Int4(a), Value::Int2(b)) => Some(a.cmp(&(*b as i32))),
            (Value::Int2(a), Value::Int8(b)) => Some((*a as i64).cmp(b)),
            (Value::Int8(a), Value::Int2(b)) => Some(a.cmp(&(*b as i64))),
            (Value::Int4(a), Value::Int8(b)) => Some((*a as i64).cmp(b)),
            (Value::Int8(a), Value::Int4(b)) => Some(a.cmp(&(*b as i64))),
            (Value::Float4(a), Value::Float4(b)) => a.partial_cmp(b),
            (Value::Float8(a), Value::Float8(b)) => a.partial_cmp(b),
            (Value::Float4(a), Value::Float8(b)) => (*a as f64).partial_cmp(b),
            (Value::Float8(a), Value::Float4(b)) => a.partial_cmp(&(*b as f64)),
            (Value::String(a), Value::String(b)) => Some(a.cmp(b)),
            (Value::Bytes(a), Value::Bytes(b)) => Some(a.cmp(b)),
            (Value::Uuid(a), Value::Uuid(b)) => Some(a.cmp(b)),
            (Value::Timestamp(a), Value::Timestamp(b)) => Some(a.cmp(b)),
            (Value::Numeric(a), Value::Numeric(b)) => Self::compare_numeric_strings(a, b),
            (Value::Json(a), Value::Json(b)) => Some(a.cmp(b)),
            _ => None,
        }
    }

    /// Orders two numeric strings by parsing both as `f64`.
    ///
    /// Returns `None` when either side fails to parse or when the parsed values
    /// have no ordering (NaN).
    // NOTE(review): going through `f64` means distinct numerics beyond f64
    // precision may compare as equal — confirm this is acceptable for pruning.
    fn compare_numeric_strings(a: &str, b: &str) -> Option<Ordering> {
        let a_val: f64 = a.parse().ok()?;
        let b_val: f64 = b.parse().ok()?;
        a_val.partial_cmp(&b_val)
    }
}
/// A `ValueRange` paired with the name of the column it describes.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ColumnZoneMap {
/// Name of the column, as supplied to `ColumnZoneMap::new`.
pub column_name: String,
/// Statistics accumulated for the column's values.
pub range: ValueRange,
}
impl ColumnZoneMap {
    /// Builds a zone map for `column_name` whose range has recorded nothing yet.
    pub fn new(column_name: String) -> Self {
        let range = ValueRange::empty();
        Self { column_name, range }
    }

    /// Records one more value of this column in the underlying range.
    pub fn update(&mut self, value: &Value) {
        ValueRange::update(&mut self.range, value);
    }

    /// Folds the range of `other` into this one.
    ///
    /// Only the ranges are combined; `other.column_name` is not consulted.
    pub fn merge(&mut self, other: &ColumnZoneMap) {
        let incoming = &other.range;
        self.range.merge(incoming);
    }
}
/// Per-block statistics: row bookkeeping plus one `ColumnZoneMap` per column seen.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BlockZoneMap {
/// Identifier of the block, as supplied to `BlockZoneMap::new`.
pub block_id: u64,
/// Number of rows added through `add_row`.
pub row_count: u64,
/// Row id the block starts at, as supplied to `BlockZoneMap::new`.
pub first_row_id: u64,
/// Row id marking the end of the block; starts equal to `first_row_id` and is
/// maintained by `add_row`.
pub last_row_id: u64,
/// Column statistics keyed by column name; a column appears once a value for it
/// has been added.
pub columns: HashMap<String, ColumnZoneMap>,
}
impl BlockZoneMap {
    /// Creates an empty block whose row range is the single id `first_row_id`.
    pub fn new(block_id: u64, first_row_id: u64) -> Self {
        Self {
            block_id,
            row_count: 0,
            first_row_id,
            last_row_id: first_row_id,
            columns: HashMap::new(),
        }
    }

    /// Records one row: bumps the row count, extends the row range and feeds
    /// every `(column, value)` pair into that column's zone map.
    ///
    /// Columns are created lazily the first time a value for them is seen.
    pub fn add_row(&mut self, row_id: u64, values: &[(String, Value)]) {
        self.row_count += 1;
        // Rows may arrive out of order; the upper end of the row range must
        // never move backwards or it would stop covering rows already added.
        self.last_row_id = self.last_row_id.max(row_id);
        for (col_name, value) in values {
            // Look up by reference first so the column name is cloned only when
            // the column is seen for the first time, not once per row.
            if let Some(existing) = self.columns.get_mut(col_name) {
                existing.update(value);
            } else {
                let mut fresh = ColumnZoneMap::new(col_name.clone());
                fresh.update(value);
                self.columns.insert(col_name.clone(), fresh);
            }
        }
    }

    /// Returns `false` only when the block can be ruled out for
    /// `column_name = value`. A column without statistics cannot be ruled out.
    pub fn might_contain(&self, column_name: &str, value: &Value) -> bool {
        self.columns
            .get(column_name)
            .map_or(true, |czm| czm.range.might_contain(value))
    }

    /// Returns `false` only when the block can be ruled out for
    /// `column_name <op> value`.
    ///
    /// `RangeOp::NotEq` is never pruned, and a column without statistics cannot
    /// be ruled out.
    pub fn might_match_range(
        &self,
        column_name: &str,
        op: RangeOp,
        value: &Value,
    ) -> bool {
        let range = match self.columns.get(column_name) {
            Some(czm) => &czm.range,
            None => return true,
        };
        match op {
            RangeOp::Eq => range.might_contain(value),
            RangeOp::NotEq => true,
            RangeOp::Lt => range.might_contain_less_than(value),
            RangeOp::LtEq => range.might_contain_less_or_equal(value),
            RangeOp::Gt => range.might_contain_greater_than(value),
            RangeOp::GtEq => range.might_contain_greater_or_equal(value),
        }
    }

    /// Returns `false` only when the block can be ruled out for
    /// `column_name BETWEEN low AND high`. A column without statistics cannot be
    /// ruled out.
    pub fn might_match_between(
        &self,
        column_name: &str,
        low: &Value,
        high: &Value,
    ) -> bool {
        self.columns
            .get(column_name)
            .map_or(true, |czm| czm.range.might_overlap(low, high))
    }

    /// Returns whether the column may hold a null in this block; `true` when the
    /// column has no statistics.
    pub fn might_contain_null(&self, column_name: &str) -> bool {
        self.columns
            .get(column_name)
            .map_or(true, |czm| czm.range.has_nulls)
    }

    /// Returns whether the column may hold a non-null value in this block, i.e.
    /// more values than nulls were recorded; `true` when the column has no
    /// statistics.
    pub fn might_contain_not_null(&self, column_name: &str) -> bool {
        self.columns
            .get(column_name)
            .map_or(true, |czm| czm.range.count > czm.range.null_count)
    }
}
/// Comparison operator of a single-value predicate evaluated by
/// `BlockZoneMap::might_match_range`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RangeOp {
/// Equality; answered with `ValueRange::might_contain`.
Eq,
/// Inequality; `might_match_range` never prunes a block for it.
NotEq,
/// Strictly less than; answered with `ValueRange::might_contain_less_than`.
Lt,
/// Less than or equal; answered with `ValueRange::might_contain_less_or_equal`.
LtEq,
/// Strictly greater than; answered with `ValueRange::might_contain_greater_than`.
Gt,
/// Greater than or equal; answered with `ValueRange::might_contain_greater_or_equal`.
GtEq,
}
/// Counters describing how a `TableZoneMap` has been used for pruning.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ZoneMapStats {
/// Number of blocks; `TableZoneMap` increments it when it creates a block and
/// restores it from the block count in `reset_stats`.
pub total_blocks: u64,
/// Blocks ruled out by the `get_matching_blocks_*` queries.
pub blocks_skipped: u64,
/// Blocks returned as candidates by the `get_matching_blocks_*` queries.
pub blocks_scanned: u64,
/// Number of per-block predicate checks made by the `get_matching_blocks_*` queries.
pub predicate_evaluations: u64,
}
impl ZoneMapStats {
    /// Fraction of decided blocks that were skipped, in `0.0..=1.0`.
    ///
    /// "Decided" means skipped plus scanned; with nothing decided yet the ratio
    /// is reported as `0.0` rather than dividing by zero.
    pub fn skip_ratio(&self) -> f64 {
        match self.blocks_skipped + self.blocks_scanned {
            0 => 0.0,
            decided => self.blocks_skipped as f64 / decided as f64,
        }
    }
}
/// Zone maps for a whole table: one `BlockZoneMap` per fixed-size run of row ids,
/// plus usage counters.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TableZoneMap {
/// Name of the table, as supplied to the constructor.
pub table_name: String,
/// Block statistics; `add_row` keeps each block at the index equal to its `block_id`.
pub blocks: Vec<BlockZoneMap>,
/// Pruning counters updated by the `get_matching_blocks_*` queries.
pub stats: ZoneMapStats,
/// Number of row ids per block, used to map a row id to its block; `new`
/// clamps it to at least 1.
pub block_size: usize,
}
impl TableZoneMap {
pub fn new(table_name: String, block_size: usize) -> Self {
Self {
table_name,
blocks: Vec::new(),
stats: ZoneMapStats::default(),
block_size: block_size.max(1),
}
}
pub fn with_defaults(table_name: String) -> Self {
Self::new(table_name, 1000)
}
pub fn add_row(&mut self, row_id: u64, values: &[(String, Value)]) {
let block_idx = (row_id as usize) / self.block_size;
while self.blocks.len() <= block_idx {
let block_id = self.blocks.len() as u64;
let first_row_id = block_id * self.block_size as u64;
self.blocks.push(BlockZoneMap::new(block_id, first_row_id));
self.stats.total_blocks += 1;
}
if let Some(block) = self.blocks.get_mut(block_idx) {
block.add_row(row_id, values);
}
}
pub fn get_matching_blocks_eq(&mut self, column_name: &str, value: &Value) -> Vec<u64> {
let mut matching = Vec::new();
for block in &self.blocks {
self.stats.predicate_evaluations += 1;
if block.might_contain(column_name, value) {
matching.push(block.block_id);
self.stats.blocks_scanned += 1;
} else {
self.stats.blocks_skipped += 1;
}
}
matching
}
pub fn get_matching_blocks_range(
&mut self,
column_name: &str,
op: RangeOp,
value: &Value,
) -> Vec<u64> {
let mut matching = Vec::new();
for block in &self.blocks {
self.stats.predicate_evaluations += 1;
if block.might_match_range(column_name, op, value) {
matching.push(block.block_id);
self.stats.blocks_scanned += 1;
} else {
self.stats.blocks_skipped += 1;
}
}
matching
}
pub fn build_from_tuples(&mut self, tuples: &[crate::Tuple], schema: &crate::Schema) {
for (row_id, tuple) in tuples.iter().enumerate() {
let values: Vec<(String, Value)> = schema.columns.iter()
.zip(tuple.values.iter())
.map(|(col, val)| (col.name.clone(), val.clone()))
.collect();
self.add_row(row_id as u64, &values);
}
}
pub fn get_matching_blocks_between(
&mut self,
column_name: &str,
low: &Value,
high: &Value,
) -> Vec<u64> {
let mut matching = Vec::new();
for block in &self.blocks {
self.stats.predicate_evaluations += 1;
if block.might_match_between(column_name, low, high) {
matching.push(block.block_id);
self.stats.blocks_scanned += 1;
} else {
self.stats.blocks_skipped += 1;
}
}
matching
}
pub fn get_blocks_with_nulls(&mut self, column_name: &str) -> Vec<u64> {
let mut matching = Vec::new();
for block in &self.blocks {
if block.might_contain_null(column_name) {
matching.push(block.block_id);
}
}
matching
}
pub fn get_blocks_with_not_nulls(&mut self, column_name: &str) -> Vec<u64> {
let mut matching = Vec::new();
for block in &self.blocks {
if block.might_contain_not_null(column_name) {
matching.push(block.block_id);
}
}
matching
}
pub fn get_block_row_range(&self, block_id: u64) -> Option<(u64, u64)> {
self.blocks
.get(block_id as usize)
.map(|b| (b.first_row_id, b.last_row_id))
}
pub fn stats(&self) -> &ZoneMapStats {
&self.stats
}
pub fn reset_stats(&mut self) {
self.stats = ZoneMapStats {
total_blocks: self.blocks.len() as u64,
..Default::default()
};
}
pub fn memory_usage(&self) -> usize {
let mut total = std::mem::size_of::<Self>();
for block in &self.blocks {
total += std::mem::size_of::<BlockZoneMap>();
for (name, czm) in &block.columns {
total += name.len() + std::mem::size_of::<ColumnZoneMap>();
if let Some(min) = &czm.range.min {
total += Self::estimate_value_size(min);
}
if let Some(max) = &czm.range.max {
total += Self::estimate_value_size(max);
}
}
}
total
}
fn estimate_value_size(value: &Value) -> usize {
match value {
Value::Null => 1,
Value::Boolean(_) => 1,
Value::Int2(_) => 2,
Value::Int4(_) => 4,
Value::Int8(_) => 8,
Value::Float4(_) => 4,
Value::Float8(_) => 8,
Value::String(s) => s.len(),
Value::Bytes(b) => b.len(),
Value::Uuid(_) => 16,
Value::Timestamp(_) => 12,
Value::Date(_) => 4,
Value::Time(_) => 8,
Value::Numeric(s) => s.len(),
Value::Json(j) => j.len(),
Value::Array(arr) => arr.iter().map(Self::estimate_value_size).sum(),
Value::Vector(v) => v.len() * 4,
Value::DictRef { .. } => 4,
Value::CasRef { .. } => 32,
Value::ColumnarRef => 1,
Value::Interval(_) => 16, }
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for an `Int8` value.
    fn int(v: i64) -> Value {
        Value::Int8(v)
    }

    /// Shorthand for an owned `String` value.
    fn text(s: &str) -> Value {
        Value::String(s.to_string())
    }

    /// Builds a range by feeding `values` through `update` in order.
    fn range_of(values: &[Value]) -> ValueRange {
        let mut range = ValueRange::empty();
        for value in values {
            range.update(value);
        }
        range
    }

    #[test]
    fn test_value_range_basic() {
        let range = range_of(&[int(10), int(5), int(15)]);

        assert_eq!(range.min, Some(Value::Int8(5)));
        assert_eq!(range.max, Some(Value::Int8(15)));
        assert_eq!(range.count, 3);
    }

    #[test]
    fn test_value_range_nulls() {
        let range = range_of(&[int(10), Value::Null, int(20)]);

        assert!(range.has_nulls);
        assert_eq!(range.null_count, 1);
        assert_eq!(range.min, Some(Value::Int8(10)));
        assert_eq!(range.max, Some(Value::Int8(20)));
    }

    #[test]
    fn test_value_range_might_contain() {
        let range = range_of(&[int(10), int(20)]);

        // Interior point and both inclusive endpoints.
        assert!(range.might_contain(&int(15)));
        assert!(range.might_contain(&int(10)));
        assert!(range.might_contain(&int(20)));
        // Just outside either end.
        assert!(!range.might_contain(&int(5)));
        assert!(!range.might_contain(&int(25)));
    }

    #[test]
    fn test_block_zone_map() {
        let mut bzm = BlockZoneMap::new(0, 0);
        let first_row = [("id".to_string(), int(1)), ("name".to_string(), text("Alice"))];
        let second_row = [("id".to_string(), int(5)), ("name".to_string(), text("Bob"))];
        bzm.add_row(0, &first_row);
        bzm.add_row(1, &second_row);

        assert!(bzm.might_contain("id", &int(3)));
        assert!(!bzm.might_contain("id", &int(10)));
        assert!(bzm.might_match_range("id", RangeOp::Gt, &int(0)));
        assert!(!bzm.might_match_range("id", RangeOp::Gt, &int(10)));
    }

    #[test]
    fn test_table_zone_map() {
        let mut tzm = TableZoneMap::new("users".to_string(), 10);
        for i in 0..25 {
            let status = if i % 2 == 0 { "active" } else { "inactive" };
            let row = [
                ("id".to_string(), int(i as i64)),
                ("status".to_string(), text(status)),
            ];
            tzm.add_row(i, &row);
        }
        assert_eq!(tzm.blocks.len(), 3);

        let matching = tzm.get_matching_blocks_eq("id", &int(5));
        assert!(matching.contains(&0));

        let matching = tzm.get_matching_blocks_range("id", RangeOp::Gt, &int(15));
        assert!(matching.contains(&1));
        assert!(matching.contains(&2));
    }

    #[test]
    fn test_zone_map_between() {
        let mut tzm = TableZoneMap::new("test".to_string(), 10);
        for i in 0..30 {
            let row = [("val".to_string(), int(i as i64))];
            tzm.add_row(i, &row);
        }

        let matching = tzm.get_matching_blocks_between("val", &int(5), &int(15));
        assert!(matching.contains(&0));
        assert!(matching.contains(&1));
        assert!(!matching.contains(&2));
    }

    #[test]
    fn test_zone_map_strings() {
        let mut tzm = TableZoneMap::new("test".to_string(), 5);
        let names = ["Alice", "Bob", "Charlie"];
        for (row_id, name) in names.iter().enumerate() {
            let row = [("name".to_string(), text(name))];
            tzm.add_row(row_id as u64, &row);
        }

        let matching = tzm.get_matching_blocks_eq("name", &text("Bob"));
        assert!(!matching.is_empty());

        let matching = tzm.get_matching_blocks_eq("name", &text("Zoe"));
        assert!(matching.is_empty());
    }
}