use crate::ast::*;
use powdb_storage::row::{decode_column, RowLayout, ROW_MAGIC, ROW_PREFIX_SIZE};
use powdb_storage::types::*;
#[derive(Clone, Copy)]
pub(super) struct FastPatch {
pub(super) field_off: usize,
pub(super) bitmap_byte_off: usize,
pub(super) bit_mask: u8,
pub(super) bytes: FixedBytes,
}
#[derive(Clone, Copy)]
pub(super) enum FixedBytes {
I64([u8; 8]),
F64([u8; 8]),
Bool(u8),
Uuid([u8; 16]),
}
impl FixedBytes {
#[inline]
pub(super) fn as_slice(&self) -> &[u8] {
match self {
FixedBytes::I64(b) => b.as_slice(),
FixedBytes::F64(b) => b.as_slice(),
FixedBytes::Bool(b) => std::slice::from_ref(b),
FixedBytes::Uuid(b) => b.as_slice(),
}
}
}
pub(super) fn type_name_to_id(name: &str) -> Result<TypeId, String> {
match name.to_ascii_lowercase().as_str() {
"str" | "string" => Ok(TypeId::Str),
"int" => Ok(TypeId::Int),
"float" => Ok(TypeId::Float),
"bool" | "boolean" => Ok(TypeId::Bool),
"datetime" => Ok(TypeId::DateTime),
"uuid" => Ok(TypeId::Uuid),
"bytes" => Ok(TypeId::Bytes),
_ => Err(format!("unknown type name: '{name}'")),
}
}
pub(super) struct FastLayout {
pub(super) bitmap_size: usize,
pub(super) fixed_offsets: Vec<Option<usize>>,
pub(super) fixed_region_size: usize,
pub(super) var_indices: Vec<Option<usize>>,
pub(super) n_var: usize,
}
impl FastLayout {
pub(super) fn new(schema: &Schema) -> Self {
let n_cols = schema.columns.len();
let bitmap_size = n_cols.div_ceil(8);
let mut fixed_offsets = vec![None; n_cols];
let mut var_indices = vec![None; n_cols];
let mut fixed_pos: usize = 0;
let mut var_count: usize = 0;
for (i, col) in schema.columns.iter().enumerate() {
if is_fixed_size(col.type_id) {
fixed_offsets[i] = Some(fixed_pos);
fixed_pos += fixed_size(col.type_id)
.expect("is_fixed_size guard ensures fixed_size returns Some");
} else {
var_indices[i] = Some(var_count);
var_count += 1;
}
}
FastLayout {
bitmap_size,
fixed_offsets,
fixed_region_size: fixed_pos,
var_indices,
n_var: var_count,
}
}
#[inline]
fn var_offset_table_start(&self) -> usize {
2 + self.bitmap_size + self.fixed_region_size
}
#[inline]
fn var_data_start(&self) -> usize {
self.var_offset_table_start() + (self.n_var + 1) * 2
}
}
pub(super) type CompiledPredicate = Box<dyn Fn(&[u8]) -> bool>;
#[inline]
pub(super) fn f64_bits_to_sortable_u64(bits: u64) -> u64 {
if bits & 0x8000_0000_0000_0000 == 0 {
bits ^ 0x8000_0000_0000_0000
} else {
!bits
}
}
enum CompiledLeaf {
Int {
data_offset: usize,
bitmap_byte: usize,
bitmap_bit: u8,
op: BinOp,
literal: i64,
},
Float {
data_offset: usize,
bitmap_byte: usize,
bitmap_bit: u8,
op: BinOp,
literal: f64,
},
IsNull {
bitmap_byte: usize,
bitmap_bit: u8,
want_null: bool,
},
StrEq {
var_offset_table_start: usize,
var_data_start: usize,
var_idx: usize,
bitmap_byte: usize,
bitmap_bit: u8,
negate: bool,
needle: Vec<u8>,
},
}
impl CompiledLeaf {
#[inline]
fn eval(&self, data: &[u8]) -> bool {
let base = if data.len() >= ROW_PREFIX_SIZE && &data[0..4] == ROW_MAGIC {
ROW_PREFIX_SIZE
} else {
0
};
let data = &data[base..];
match self {
CompiledLeaf::Int {
data_offset,
bitmap_byte,
bitmap_bit,
op,
literal,
} => {
if data.len() < *data_offset + 8 || data.len() < 3 + bitmap_byte {
return false;
}
let is_null = (data[2 + bitmap_byte] >> bitmap_bit) & 1 == 1;
if is_null {
return false;
}
let val = i64::from_le_bytes(
data[*data_offset..*data_offset + 8]
.try_into()
.unwrap_or_else(|_| unreachable!()),
);
match op {
BinOp::Eq => val == *literal,
BinOp::Neq => val != *literal,
BinOp::Lt => val < *literal,
BinOp::Gt => val > *literal,
BinOp::Lte => val <= *literal,
BinOp::Gte => val >= *literal,
_ => false,
}
}
CompiledLeaf::Float {
data_offset,
bitmap_byte,
bitmap_bit,
op,
literal,
} => {
if data.len() < *data_offset + 8 || data.len() < 3 + bitmap_byte {
return false;
}
let is_null = (data[2 + bitmap_byte] >> bitmap_bit) & 1 == 1;
if is_null {
return false;
}
let val = f64::from_le_bytes(
data[*data_offset..*data_offset + 8]
.try_into()
.unwrap_or_else(|_| unreachable!()),
);
let ord = val.total_cmp(literal);
match op {
BinOp::Eq => ord.is_eq(),
BinOp::Neq => !ord.is_eq(),
BinOp::Lt => ord.is_lt(),
BinOp::Gt => ord.is_gt(),
BinOp::Lte => !ord.is_gt(),
BinOp::Gte => !ord.is_lt(),
_ => false,
}
}
CompiledLeaf::IsNull {
bitmap_byte,
bitmap_bit,
want_null,
} => {
let is_null = (data[2 + bitmap_byte] >> bitmap_bit) & 1 == 1;
if *want_null {
is_null
} else {
!is_null
}
}
CompiledLeaf::StrEq {
var_offset_table_start,
var_data_start,
var_idx,
bitmap_byte,
bitmap_bit,
negate,
needle,
} => {
if data.len() < 3 + bitmap_byte {
return false;
}
let is_null = (data[2 + bitmap_byte] >> bitmap_bit) & 1 == 1;
if is_null {
return false;
}
let off_pos = var_offset_table_start + var_idx * 2;
let next_pos = var_offset_table_start + (var_idx + 1) * 2;
if data.len() < next_pos + 2 {
return false;
}
let start = u16::from_le_bytes(
data[off_pos..off_pos + 2]
.try_into()
.unwrap_or_else(|_| unreachable!()),
) as usize;
let end = u16::from_le_bytes(
data[next_pos..next_pos + 2]
.try_into()
.unwrap_or_else(|_| unreachable!()),
) as usize;
let abs_start = var_data_start + start;
let abs_end = var_data_start + end;
if abs_end > data.len() || abs_start > abs_end {
return false;
}
let slice = &data[abs_start..abs_end];
let eq = slice == needle.as_slice();
if *negate {
!eq
} else {
eq
}
}
}
}
}
pub(super) fn compile_predicate(
expr: &Expr,
columns: &[String],
layout: &FastLayout,
schema: &Schema,
) -> Option<CompiledPredicate> {
let mut leaves: Vec<CompiledLeaf> = Vec::new();
flatten_and_compile(expr, columns, layout, schema, &mut leaves)?;
if leaves.is_empty() {
return None;
}
if leaves.len() == 1 {
let leaf = leaves
.into_iter()
.next()
.expect("leaves.len() == 1 checked above");
return Some(Box::new(move |data: &[u8]| leaf.eval(data)));
}
Some(Box::new(move |data: &[u8]| {
for leaf in &leaves {
if !leaf.eval(data) {
return false;
}
}
true
}))
}
fn flatten_and_compile(
expr: &Expr,
columns: &[String],
layout: &FastLayout,
schema: &Schema,
out: &mut Vec<CompiledLeaf>,
) -> Option<()> {
match expr {
Expr::BinaryOp(left, BinOp::And, right) => {
flatten_and_compile(left, columns, layout, schema, out)?;
flatten_and_compile(right, columns, layout, schema, out)?;
Some(())
}
Expr::BinaryOp(left, op, right) => {
if let Some(leaf) = build_int_leaf(left, *op, right, columns, layout, schema) {
out.push(leaf);
return Some(());
}
if let Some(leaf) = build_float_leaf(left, *op, right, columns, layout, schema) {
out.push(leaf);
return Some(());
}
if let Some(leaf) = build_str_eq_leaf(left, *op, right, columns, layout, schema) {
out.push(leaf);
return Some(());
}
None
}
Expr::UnaryOp(op, inner) if *op == UnaryOp::IsNull || *op == UnaryOp::IsNotNull => {
if let Expr::Field(name) = inner.as_ref() {
let col_idx = columns.iter().position(|c| c == name)?;
let bitmap_byte = col_idx / 8;
let bitmap_bit = (col_idx % 8) as u8;
let want_null = *op == UnaryOp::IsNull;
out.push(CompiledLeaf::IsNull {
bitmap_byte,
bitmap_bit,
want_null,
});
Some(())
} else {
None
}
}
_ => None,
}
}
fn build_int_leaf(
left: &Expr,
op: BinOp,
right: &Expr,
columns: &[String],
layout: &FastLayout,
schema: &Schema,
) -> Option<CompiledLeaf> {
let (field_name, literal_val, op) = match (left, right) {
(Expr::Field(name), Expr::Literal(Literal::Int(v))) => (name, *v, op),
(Expr::Literal(Literal::Int(v)), Expr::Field(name)) => {
let flipped = match op {
BinOp::Lt => BinOp::Gt,
BinOp::Gt => BinOp::Lt,
BinOp::Lte => BinOp::Gte,
BinOp::Gte => BinOp::Lte,
other => other, };
(name, *v, flipped)
}
_ => return None,
};
let col_idx = columns.iter().position(|c| c == field_name)?;
if schema.columns[col_idx].type_id != TypeId::Int {
return None;
}
let byte_offset = layout.fixed_offsets[col_idx]?;
let bitmap_byte = col_idx / 8;
let bitmap_bit = (col_idx % 8) as u8;
let data_offset = 2 + layout.bitmap_size + byte_offset;
Some(CompiledLeaf::Int {
data_offset,
bitmap_byte,
bitmap_bit,
op,
literal: literal_val,
})
}
fn build_float_leaf(
left: &Expr,
op: BinOp,
right: &Expr,
columns: &[String],
layout: &FastLayout,
schema: &Schema,
) -> Option<CompiledLeaf> {
let (field_name, literal_val, op) = match (left, right) {
(Expr::Field(name), Expr::Literal(Literal::Float(v))) => (name, *v, op),
(Expr::Field(name), Expr::Literal(Literal::Int(v))) => (name, *v as f64, op),
(Expr::Literal(Literal::Float(v)), Expr::Field(name)) => {
let flipped = match op {
BinOp::Lt => BinOp::Gt,
BinOp::Gt => BinOp::Lt,
BinOp::Lte => BinOp::Gte,
BinOp::Gte => BinOp::Lte,
other => other,
};
(name, *v, flipped)
}
(Expr::Literal(Literal::Int(v)), Expr::Field(name)) => {
let flipped = match op {
BinOp::Lt => BinOp::Gt,
BinOp::Gt => BinOp::Lt,
BinOp::Lte => BinOp::Gte,
BinOp::Gte => BinOp::Lte,
other => other,
};
(name, *v as f64, flipped)
}
_ => return None,
};
let col_idx = columns.iter().position(|c| c == field_name)?;
if schema.columns[col_idx].type_id != TypeId::Float {
return None;
}
let byte_offset = layout.fixed_offsets[col_idx]?;
let bitmap_byte = col_idx / 8;
let bitmap_bit = (col_idx % 8) as u8;
let data_offset = 2 + layout.bitmap_size + byte_offset;
Some(CompiledLeaf::Float {
data_offset,
bitmap_byte,
bitmap_bit,
op,
literal: literal_val,
})
}
fn build_str_eq_leaf(
left: &Expr,
op: BinOp,
right: &Expr,
columns: &[String],
layout: &FastLayout,
schema: &Schema,
) -> Option<CompiledLeaf> {
if op != BinOp::Eq && op != BinOp::Neq {
return None;
}
let (field_name, literal_str) = match (left, right) {
(Expr::Field(name), Expr::Literal(Literal::String(s))) => (name, s.clone()),
(Expr::Literal(Literal::String(s)), Expr::Field(name)) => (name, s.clone()),
_ => return None,
};
let col_idx = columns.iter().position(|c| c == field_name)?;
if schema.columns[col_idx].type_id != TypeId::Str {
return None;
}
let var_idx = layout.var_indices[col_idx]?;
let var_offset_table_start = layout.var_offset_table_start();
let var_data_start = layout.var_data_start();
let bitmap_byte = col_idx / 8;
let bitmap_bit = (col_idx % 8) as u8;
let negate = op == BinOp::Neq;
Some(CompiledLeaf::StrEq {
var_offset_table_start,
var_data_start,
var_idx,
bitmap_byte,
bitmap_bit,
negate,
needle: literal_str.into_bytes(),
})
}
pub(super) fn predicate_column_indices(expr: &Expr, columns: &[String]) -> Vec<usize> {
let mut indices = Vec::new();
collect_field_indices(expr, columns, &mut indices);
indices.sort_unstable();
indices.dedup();
indices
}
fn collect_field_indices(expr: &Expr, columns: &[String], out: &mut Vec<usize>) {
match expr {
Expr::Field(name) => {
if let Some(idx) = columns.iter().position(|c| c == name) {
out.push(idx);
}
}
Expr::BinaryOp(left, _, right) => {
collect_field_indices(left, columns, out);
collect_field_indices(right, columns, out);
}
Expr::Coalesce(left, right) => {
collect_field_indices(left, columns, out);
collect_field_indices(right, columns, out);
}
Expr::UnaryOp(_, inner) => {
collect_field_indices(inner, columns, out);
}
Expr::FunctionCall(_, inner) => {
collect_field_indices(inner, columns, out);
}
Expr::ScalarFunc(_, args) => {
for arg in args {
collect_field_indices(arg, columns, out);
}
}
Expr::Cast(inner, _) => {
collect_field_indices(inner, columns, out);
}
Expr::Case { whens, else_expr } => {
for (cond, result) in whens {
collect_field_indices(cond, columns, out);
collect_field_indices(result, columns, out);
}
if let Some(e) = else_expr {
collect_field_indices(e, columns, out);
}
}
Expr::InList { expr, list, .. } => {
collect_field_indices(expr, columns, out);
for item in list {
collect_field_indices(item, columns, out);
}
}
Expr::InSubquery { expr, .. } => {
collect_field_indices(expr, columns, out);
}
_ => {}
}
}
pub(super) fn decode_selective(
schema: &Schema,
layout: &RowLayout,
data: &[u8],
col_indices: &[usize],
) -> Vec<Value> {
let n_cols = schema.columns.len();
let mut values = vec![Value::Empty; n_cols];
for &ci in col_indices {
values[ci] = decode_column(schema, layout, data, ci);
}
values
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn type_name_to_id_lowercase() {
assert_eq!(type_name_to_id("int").unwrap(), TypeId::Int);
assert_eq!(type_name_to_id("str").unwrap(), TypeId::Str);
assert_eq!(type_name_to_id("float").unwrap(), TypeId::Float);
assert_eq!(type_name_to_id("bool").unwrap(), TypeId::Bool);
assert_eq!(type_name_to_id("datetime").unwrap(), TypeId::DateTime);
assert_eq!(type_name_to_id("uuid").unwrap(), TypeId::Uuid);
assert_eq!(type_name_to_id("bytes").unwrap(), TypeId::Bytes);
}
#[test]
fn type_name_to_id_case_insensitive() {
assert_eq!(type_name_to_id("Int").unwrap(), TypeId::Int);
assert_eq!(type_name_to_id("INT").unwrap(), TypeId::Int);
assert_eq!(type_name_to_id("Str").unwrap(), TypeId::Str);
assert_eq!(type_name_to_id("Float").unwrap(), TypeId::Float);
assert_eq!(type_name_to_id("Bool").unwrap(), TypeId::Bool);
assert_eq!(type_name_to_id("DateTime").unwrap(), TypeId::DateTime);
assert_eq!(type_name_to_id("DATETIME").unwrap(), TypeId::DateTime);
assert_eq!(type_name_to_id("Uuid").unwrap(), TypeId::Uuid);
assert_eq!(type_name_to_id("UUID").unwrap(), TypeId::Uuid);
assert_eq!(type_name_to_id("Bytes").unwrap(), TypeId::Bytes);
assert_eq!(type_name_to_id("String").unwrap(), TypeId::Str);
assert_eq!(type_name_to_id("Boolean").unwrap(), TypeId::Bool);
}
#[test]
fn type_name_to_id_unknown_returns_error() {
let err = type_name_to_id("Foo").unwrap_err();
assert!(err.contains("unknown type name"), "got: {err}");
assert!(err.contains("Foo"), "got: {err}");
}
}