use super::columnar::{Column, ColumnBatch, TypeTag};
use super::zone_map::ColumnStats;
use alloc::vec::Vec;
#[derive(Debug, Clone)]
pub struct ColumnRangePredicate {
pub column_id: u16,
pub lower: Option<Vec<u8>>,
pub upper: Option<Vec<u8>>,
}
impl ColumnRangePredicate {
#[must_use]
pub fn can_skip_block(&self, stats: &[ColumnStats]) -> bool {
let target = u32::from(self.column_id);
let Some(s) = stats.iter().find(|s| s.column_id == target) else {
return false;
};
if let Some(lo) = &self.lower
&& s.max.as_slice() < lo.as_slice()
{
return true;
}
if let Some(hi) = &self.upper
&& s.min.as_slice() > hi.as_slice()
{
return true;
}
false
}
#[must_use]
pub fn matching_rows(&self, batch: &ColumnBatch) -> Vec<bool> {
let rows = batch.row_count as usize;
let Some(col) = batch.columns.iter().find(|c| c.column_id == self.column_id) else {
return alloc::vec![true; rows];
};
if !matches!(col.type_tag, TypeTag::Bytes) {
return alloc::vec![true; rows];
}
(0..rows)
.map(|row| self.row_matches(col, rows, row))
.collect()
}
fn row_matches(&self, col: &Column, rows: usize, row: usize) -> bool {
if !row_valid(col, row) {
return false;
}
let Some(value) = bytes_row(&col.data, rows, row) else {
return false;
};
if let Some(lo) = &self.lower
&& value < lo.as_slice()
{
return false;
}
if let Some(hi) = &self.upper
&& value > hi.as_slice()
{
return false;
}
true
}
}
#[must_use]
pub fn filter_batch(batch: &ColumnBatch, mask: &[bool]) -> ColumnBatch {
let kept = mask.iter().filter(|&&m| m).count();
let rows = batch.row_count as usize;
let columns = batch
.columns
.iter()
.map(|c| filter_column(c, rows, mask, kept))
.collect();
let row_count = u32::try_from(kept).unwrap_or(batch.row_count);
ColumnBatch { row_count, columns }
}
fn filter_column(col: &Column, rows: usize, mask: &[bool], kept: usize) -> Column {
let data = match col.type_tag {
TypeTag::Fixed(width) => {
let width = width as usize;
let mut out = Vec::with_capacity(kept * width);
for (row, &keep) in mask.iter().enumerate() {
if keep
&& let Some(start) = row.checked_mul(width)
&& let Some(end) = start.checked_add(width)
&& let Some(chunk) = col.data.get(start..end)
{
out.extend_from_slice(chunk);
}
}
out
}
TypeTag::Bytes => {
let mut offsets = Vec::with_capacity((kept + 1) * 4);
let mut payload = Vec::new();
let mut acc: u32 = 0;
offsets.extend_from_slice(&acc.to_le_bytes());
for (row, &keep) in mask.iter().enumerate() {
if !keep {
continue;
}
let value = bytes_row(&col.data, rows, row).unwrap_or(&[]);
payload.extend_from_slice(value);
let len = u32::try_from(value.len()).unwrap_or(u32::MAX);
acc = acc.saturating_add(len);
offsets.extend_from_slice(&acc.to_le_bytes());
}
offsets.extend_from_slice(&payload);
offsets
}
};
let validity = col
.validity
.as_ref()
.map(|bits| compact_validity(bits, mask, kept));
Column {
column_id: col.column_id,
type_tag: col.type_tag,
validity,
data,
}
}
fn compact_validity(bits: &[u8], mask: &[bool], kept: usize) -> Vec<u8> {
let mut out = alloc::vec![0u8; kept.div_ceil(8)];
let mut o = 0usize;
for (row, &keep) in mask.iter().enumerate() {
if !keep {
continue;
}
let valid = bits
.get(row / 8)
.is_some_and(|b| b & (1u8 << (row % 8)) != 0);
if valid && let Some(byte) = out.get_mut(o / 8) {
*byte |= 1u8 << (o % 8);
}
o += 1;
}
out
}
fn row_valid(col: &Column, row: usize) -> bool {
match &col.validity {
None => true,
Some(bits) => bits
.get(row / 8)
.is_some_and(|byte| byte & (1u8 << (row % 8)) != 0),
}
}
fn bytes_row(data: &[u8], rows: usize, row: usize) -> Option<&[u8]> {
let table_len = rows.checked_add(1)?.checked_mul(4)?;
let off_i = read_u32_le(data.get(row.checked_mul(4)?..)?)? as usize;
let off_next = read_u32_le(data.get(row.checked_add(1)?.checked_mul(4)?..)?)? as usize;
data.get(table_len.checked_add(off_i)?..table_len.checked_add(off_next)?)
}
fn read_u32_le(bytes: &[u8]) -> Option<u32> {
let head = bytes.get(..4)?;
let mut arr = [0u8; 4];
for (dst, &src) in arr.iter_mut().zip(head) {
*dst = src;
}
Some(u32::from_le_bytes(arr))
}
#[cfg(any(target_arch = "x86_64", target_arch = "x86"))]
cpufeatures::new!(cpu_avx2_byte_eq, "avx2");
#[must_use]
pub fn byte_eq_mask(batch: &ColumnBatch, column_id: u16, value: u8) -> Vec<bool> {
let rows = batch.row_count as usize;
let Some(col) = batch.columns.iter().find(|c| c.column_id == column_id) else {
return alloc::vec![true; rows];
};
if !matches!(col.type_tag, TypeTag::Fixed(1)) {
return alloc::vec![true; rows];
}
byte_eq_dispatch(&col.data, value)
}
fn byte_eq_scalar(data: &[u8], value: u8) -> Vec<bool> {
data.iter().map(|&b| b == value).collect()
}
fn byte_eq_dispatch(data: &[u8], value: u8) -> Vec<bool> {
#[cfg(any(target_arch = "x86_64", target_arch = "x86"))]
{
if cpu_avx2_byte_eq::get() {
return unsafe { byte_eq_avx2(data, value) };
}
byte_eq_scalar(data, value)
}
#[cfg(target_arch = "aarch64")]
{
unsafe { byte_eq_neon(data, value) }
}
#[cfg(not(any(target_arch = "x86_64", target_arch = "x86", target_arch = "aarch64")))]
{
byte_eq_scalar(data, value)
}
}
#[cfg(any(target_arch = "x86_64", target_arch = "x86"))]
#[target_feature(enable = "avx2")]
unsafe fn byte_eq_avx2(data: &[u8], value: u8) -> Vec<bool> {
#[cfg(target_arch = "x86")]
use core::arch::x86::{
_mm256_cmpeq_epi8, _mm256_loadu_si256, _mm256_movemask_epi8, _mm256_set1_epi8,
};
#[cfg(target_arch = "x86_64")]
use core::arch::x86_64::{
_mm256_cmpeq_epi8, _mm256_loadu_si256, _mm256_movemask_epi8, _mm256_set1_epi8,
};
let mut chunks = data.chunks_exact(32);
let mut out = Vec::with_capacity(data.len());
unsafe {
let needle = _mm256_set1_epi8(i8::from_ne_bytes([value]));
for chunk in &mut chunks {
let v = _mm256_loadu_si256(chunk.as_ptr().cast());
let cmp = _mm256_movemask_epi8(_mm256_cmpeq_epi8(v, needle));
let mask = u32::from_ne_bytes(cmp.to_ne_bytes());
for i in 0..32u32 {
out.push((mask >> i) & 1 == 1);
}
}
}
out.extend(byte_eq_scalar(chunks.remainder(), value));
out
}
#[cfg(target_arch = "aarch64")]
#[target_feature(enable = "neon")]
unsafe fn byte_eq_neon(data: &[u8], value: u8) -> Vec<bool> {
use core::arch::aarch64::{vceqq_u8, vdupq_n_u8, vld1q_u8, vst1q_u8};
let mut chunks = data.chunks_exact(16);
let mut out = Vec::with_capacity(data.len());
unsafe {
let needle = vdupq_n_u8(value);
for chunk in &mut chunks {
let eq = vceqq_u8(vld1q_u8(chunk.as_ptr()), needle); let mut lanes = [0u8; 16];
vst1q_u8(lanes.as_mut_ptr(), eq);
for &lane in &lanes {
out.push(lane != 0);
}
}
}
out.extend(byte_eq_scalar(chunks.remainder(), value));
out
}
#[cfg(test)]
#[expect(clippy::expect_used, clippy::indexing_slicing, reason = "test code")]
mod tests;