use std::sync::atomic::{AtomicUsize, Ordering};
/// Default number of rows per vectorized batch.
pub const DEFAULT_BATCH_SIZE: usize = 1024;
/// Lower bound applied by `VectorizedScanConfig::with_batch_size` (smaller requests are clamped up).
pub const MIN_BATCH_SIZE: usize = 64;
/// Upper bound applied by `VectorizedScanConfig::with_batch_size` (larger requests are clamped down).
pub const MAX_BATCH_SIZE: usize = 8192;
/// A single column of values, stored contiguously per type.
#[derive(Debug, Clone)]
pub enum ColumnVector {
/// One `bool` per row.
Bool(Vec<bool>),
/// One `i64` per row.
Int64(Vec<i64>),
/// One `u64` per row.
UInt64(Vec<u64>),
/// One `f64` per row.
Float64(Vec<f64>),
/// Variable-length values: `data` holds the concatenated bytes and `offsets`
/// delimits rows (`len()` reports `offsets.len() - 1` rows).
/// NOTE(review): presumably row `i` is `data[offsets[i]..offsets[i + 1]]` — confirm.
String { offsets: Vec<u32>, data: Vec<u8> },
/// Same layout as `String`, for arbitrary bytes.
Binary { offsets: Vec<u32>, data: Vec<u8> },
/// `len()` counts 64 rows per `u64` word, so this appears to be a packed
/// bitmap. NOTE(review): the meaning of a set bit is not established here — confirm.
Null(Vec<u64>),
}
impl ColumnVector {
    /// Creates an empty `Int64` column with room for `capacity` values.
    pub fn new_int64(capacity: usize) -> Self {
        Self::Int64(Vec::with_capacity(capacity))
    }

    /// Creates an empty `Float64` column with room for `capacity` values.
    pub fn new_float64(capacity: usize) -> Self {
        Self::Float64(Vec::with_capacity(capacity))
    }

    /// Creates an empty `String` column.
    ///
    /// Reserves `capacity + 1` offset slots and an estimated 32 payload bytes per value.
    pub fn new_string(capacity: usize) -> Self {
        let offsets = Vec::with_capacity(capacity + 1);
        let data = Vec::with_capacity(capacity * 32);
        Self::String { offsets, data }
    }

    /// Number of rows in the column.
    ///
    /// Offset-encoded columns report one fewer row than they have offsets
    /// (saturating at 0). `Null` counts 64 rows per `u64` word.
    pub fn len(&self) -> usize {
        match self {
            Self::Bool(bits) => bits.len(),
            Self::Int64(ints) => ints.len(),
            Self::UInt64(uints) => uints.len(),
            Self::Float64(floats) => floats.len(),
            Self::String { offsets, .. } | Self::Binary { offsets, .. } => {
                offsets.len().saturating_sub(1)
            }
            Self::Null(words) => words.len() * 64,
        }
    }

    /// Returns `true` when `len()` is zero.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Approximate payload size in bytes (element bytes only; `Vec` capacity
    /// and headers are not counted).
    pub fn memory_size(&self) -> usize {
        match self {
            Self::Bool(bits) => bits.len(),
            Self::Int64(ints) => ints.len() * std::mem::size_of::<i64>(),
            Self::UInt64(uints) => uints.len() * std::mem::size_of::<u64>(),
            Self::Float64(floats) => floats.len() * std::mem::size_of::<f64>(),
            Self::String { offsets, data } | Self::Binary { offsets, data } => {
                offsets.len() * std::mem::size_of::<u32>() + data.len()
            }
            Self::Null(words) => words.len() * std::mem::size_of::<u64>(),
        }
    }

    /// Sums an `Int64` column; returns `None` for every other variant.
    ///
    /// On x86_64, columns of 16 or more values go through `simd_sum_i64`.
    pub fn sum_i64(&self) -> Option<i64> {
        let values = match self {
            Self::Int64(values) => values,
            _ => return None,
        };
        #[cfg(target_arch = "x86_64")]
        {
            if values.len() >= 16 {
                return Some(simd_sum_i64(values));
            }
        }
        Some(values.iter().sum())
    }

    /// Sums a `Float64` column; returns `None` for every other variant.
    ///
    /// On x86_64, an empty column yields exactly `0.0`, and columns of 8 or more
    /// values go through `simd_sum_f64` (whose lane-wise order may round
    /// differently from a sequential sum).
    pub fn sum_f64(&self) -> Option<f64> {
        let values = match self {
            Self::Float64(values) => values,
            _ => return None,
        };
        #[cfg(target_arch = "x86_64")]
        {
            if values.is_empty() {
                return Some(0.0);
            }
            if values.len() >= 8 {
                return Some(simd_sum_f64(values));
            }
        }
        Some(values.iter().sum())
    }
}
#[cfg(target_arch = "x86_64")]
/// Sums `values`, using 4-lane AVX2 adds when AVX2 is enabled at compile time.
///
/// Lane-wise adds wrap on overflow. The final scalar reductions use ordinary
/// `Sum`, just like the non-AVX2 fallback.
fn simd_sum_i64(values: &[i64]) -> i64 {
    #[cfg(target_feature = "avx2")]
    {
        use std::arch::x86_64::*;
        let quads = values.chunks_exact(4);
        let tail = quads.remainder();
        // SAFETY: AVX2 is statically enabled by the cfg above. Each `quad` holds
        // exactly four i64 values, so the unaligned 256-bit load stays in bounds.
        // `__m256i` -> `[i64; 4]` is a transmute between same-sized plain-data types.
        let lanes: [i64; 4] = unsafe {
            let mut acc = _mm256_setzero_si256();
            for quad in quads {
                acc = _mm256_add_epi64(acc, _mm256_loadu_si256(quad.as_ptr() as *const __m256i));
            }
            std::mem::transmute(acc)
        };
        let lane_total: i64 = lanes.iter().sum();
        let tail_total: i64 = tail.iter().sum();
        lane_total + tail_total
    }
    #[cfg(not(target_feature = "avx2"))]
    {
        values.iter().sum()
    }
}
#[cfg(target_arch = "x86_64")]
/// Sums `values`, using 4-lane AVX adds when AVX is enabled at compile time.
///
/// The SIMD path adds in lane order, so its rounding may differ from a purely
/// sequential sum.
fn simd_sum_f64(values: &[f64]) -> f64 {
    #[cfg(target_feature = "avx")]
    {
        use std::arch::x86_64::*;
        let quads = values.chunks_exact(4);
        let tail = quads.remainder();
        // SAFETY: AVX is statically enabled by the cfg above. Each `quad` holds
        // exactly four f64 values, so the unaligned 256-bit load stays in bounds.
        // `__m256d` -> `[f64; 4]` is a transmute between same-sized plain-data types.
        let lanes: [f64; 4] = unsafe {
            let mut acc = _mm256_setzero_pd();
            for quad in quads {
                acc = _mm256_add_pd(acc, _mm256_loadu_pd(quad.as_ptr()));
            }
            std::mem::transmute(acc)
        };
        let lane_total: f64 = lanes.iter().sum();
        let tail_total: f64 = tail.iter().sum();
        lane_total + tail_total
    }
    #[cfg(not(target_feature = "avx"))]
    {
        values.iter().sum()
    }
}
/// A batch of named columns plus an optional selection vector of row indices.
#[derive(Debug)]
pub struct VectorBatch {
/// `(name, column)` pairs in insertion order.
columns: Vec<(String, ColumnVector)>,
/// Physical row count; `add_column` sets it from the added column whenever it is still 0.
row_count: usize,
/// Physical row count at which `is_full` starts returning true.
capacity: usize,
/// Selected row indices; when set, `row_count()` reports its length instead of `row_count`.
selection: Option<Vec<usize>>,
}
impl VectorBatch {
pub fn with_capacity(capacity: usize) -> Self {
Self {
columns: Vec::new(),
row_count: 0,
capacity,
selection: None,
}
}
pub fn new() -> Self {
Self::with_capacity(DEFAULT_BATCH_SIZE)
}
pub fn capacity(&self) -> usize {
self.capacity
}
pub fn row_count(&self) -> usize {
if let Some(ref sel) = self.selection {
sel.len()
} else {
self.row_count
}
}
pub fn is_full(&self) -> bool {
self.row_count >= self.capacity
}
pub fn is_empty(&self) -> bool {
self.row_count == 0
}
pub fn add_column(&mut self, name: impl Into<String>, column: ColumnVector) {
self.columns.push((name.into(), column));
if self.row_count == 0 {
self.row_count = self.columns.last().map(|(_, c)| c.len()).unwrap_or(0);
}
}
pub fn column(&self, name: &str) -> Option<&ColumnVector> {
self.columns.iter().find(|(n, _)| n == name).map(|(_, c)| c)
}
pub fn column_at(&self, idx: usize) -> Option<&ColumnVector> {
self.columns.get(idx).map(|(_, c)| c)
}
pub fn column_count(&self) -> usize {
self.columns.len()
}
pub fn set_selection(&mut self, selection: Vec<usize>) {
self.selection = Some(selection);
}
pub fn clear_selection(&mut self) {
self.selection = None;
}
pub fn memory_size(&self) -> usize {
self.columns.iter().map(|(_, c)| c.memory_size()).sum()
}
pub fn reset(&mut self) {
self.columns.clear();
self.row_count = 0;
self.selection = None;
}
}
impl Default for VectorBatch {
fn default() -> Self {
Self::new()
}
}
/// Counters accumulated over a vectorized scan; safe to update through `&self`.
///
/// Every update and read uses `Ordering::Relaxed`. Each counter is exact on its
/// own, but a reader may see the four counters of one `record_batch` call
/// partially applied.
#[derive(Debug, Default)]
pub struct VectorizedScanStats {
    /// Total rows examined.
    pub rows_scanned: AtomicUsize,
    /// Number of `record_batch` calls.
    pub batches_processed: AtomicUsize,
    /// Total rows reported as passing.
    pub rows_passed: AtomicUsize,
    /// Total bytes reported as read.
    pub bytes_read: AtomicUsize,
}

impl VectorizedScanStats {
    /// Creates a zeroed set of counters.
    pub fn new() -> Self {
        Self::default()
    }

    /// Adds one batch's figures to the running totals.
    pub fn record_batch(&self, rows: usize, passed: usize, bytes: usize) {
        self.rows_scanned.fetch_add(rows, Ordering::Relaxed);
        self.batches_processed.fetch_add(1, Ordering::Relaxed);
        self.rows_passed.fetch_add(passed, Ordering::Relaxed);
        self.bytes_read.fetch_add(bytes, Ordering::Relaxed);
    }

    /// Total rows examined so far.
    pub fn rows_scanned(&self) -> usize {
        self.rows_scanned.load(Ordering::Relaxed)
    }

    /// Number of batches recorded so far.
    pub fn batches_processed(&self) -> usize {
        self.batches_processed.load(Ordering::Relaxed)
    }

    /// Total rows reported as passing so far.
    pub fn rows_passed(&self) -> usize {
        self.rows_passed.load(Ordering::Relaxed)
    }

    /// Total bytes reported as read so far.
    pub fn bytes_read(&self) -> usize {
        self.bytes_read.load(Ordering::Relaxed)
    }
}
/// A filter evaluated over a whole column at once.
pub trait VectorPredicate: Send + Sync {
/// Evaluates the predicate against `column`. The implementation in this file
/// returns one flag per row, with `true` meaning the row passes.
fn evaluate(&self, column: &ColumnVector) -> Vec<bool>;
/// Name of the column this predicate targets.
fn column_name(&self) -> &str;
}
/// Predicate comparing each value of a named `Int64` column against a constant,
/// as `column_value <op> value`.
#[derive(Debug, Clone)]
pub struct Int64Comparison {
    column_name: String,
    op: ComparisonOp,
    value: i64,
}

/// Comparison operator applied as `lhs <op> rhs`.
#[derive(Debug, Clone, Copy)]
pub enum ComparisonOp {
    Equal,
    NotEqual,
    LessThan,
    LessEqual,
    GreaterThan,
    GreaterEqual,
}

impl Int64Comparison {
    /// Builds a comparison of column `column_name` against `value` using `op`.
    pub fn new(column_name: impl Into<String>, op: ComparisonOp, value: i64) -> Self {
        Self {
            column_name: column_name.into(),
            op,
            value,
        }
    }

    /// `column == value`
    pub fn eq(column_name: impl Into<String>, value: i64) -> Self {
        Self::new(column_name, ComparisonOp::Equal, value)
    }

    /// `column != value`
    pub fn ne(column_name: impl Into<String>, value: i64) -> Self {
        Self::new(column_name, ComparisonOp::NotEqual, value)
    }

    /// `column > value`
    pub fn gt(column_name: impl Into<String>, value: i64) -> Self {
        Self::new(column_name, ComparisonOp::GreaterThan, value)
    }

    /// `column >= value`
    pub fn ge(column_name: impl Into<String>, value: i64) -> Self {
        Self::new(column_name, ComparisonOp::GreaterEqual, value)
    }

    /// `column < value`
    pub fn lt(column_name: impl Into<String>, value: i64) -> Self {
        Self::new(column_name, ComparisonOp::LessThan, value)
    }

    /// `column <= value`
    pub fn le(column_name: impl Into<String>, value: i64) -> Self {
        Self::new(column_name, ComparisonOp::LessEqual, value)
    }
}
impl VectorPredicate for Int64Comparison {
    /// Applies the comparison to every value of an `Int64` column.
    ///
    /// Any other column type yields all-`false` flags, one per row.
    fn evaluate(&self, column: &ColumnVector) -> Vec<bool> {
        let values = match column {
            ColumnVector::Int64(values) => values,
            other => return vec![false; other.len()],
        };
        let rhs = self.value;
        // Choose the operator once instead of re-dispatching per element.
        let compare: fn(&i64, &i64) -> bool = match self.op {
            ComparisonOp::Equal => i64::eq,
            ComparisonOp::NotEqual => i64::ne,
            ComparisonOp::LessThan => i64::lt,
            ComparisonOp::LessEqual => i64::le,
            ComparisonOp::GreaterThan => i64::gt,
            ComparisonOp::GreaterEqual => i64::ge,
        };
        values.iter().map(|lhs| compare(lhs, &rhs)).collect()
    }

    /// Name of the column this comparison targets.
    fn column_name(&self) -> &str {
        &self.column_name
    }
}
/// Returns `values[i] > threshold` (signed) for every element, four lanes at a time.
///
/// Only compiled when AVX2 is enabled at compile time; otherwise the scalar
/// definition with the same signature is used.
#[cfg(all(target_arch = "x86_64", target_feature = "avx2"))]
pub fn simd_compare_i64_gt(values: &[i64], threshold: i64) -> Vec<bool> {
use std::arch::x86_64::*;
let mut result = vec![false; values.len()];
// Number of complete 4-lane groups; leftover elements are handled by the scalar loop below.
let chunks = values.len() / 4;
// SAFETY: AVX2 is statically enabled by the cfg above, and each load reads
// `values[i * 4..i * 4 + 4]` with `i < chunks`, which is in bounds.
unsafe {
let threshold_vec = _mm256_set1_epi64x(threshold);
for i in 0..chunks {
let v = _mm256_loadu_si256(values.as_ptr().add(i * 4) as *const __m256i);
// Signed 64-bit compare: a lane is all-ones where v > threshold.
let cmp = _mm256_cmpgt_epi64(v, threshold_vec);
// movemask_epi8 produces one bit per byte, so 64-bit lane k owns mask bits 8k..8k+7.
let mask = _mm256_movemask_epi8(cmp) as u32;
result[i * 4] = (mask & 0xFF) != 0;
result[i * 4 + 1] = (mask & 0xFF00) != 0;
result[i * 4 + 2] = (mask & 0xFF0000) != 0;
result[i * 4 + 3] = (mask & 0xFF000000) != 0;
}
// Scalar tail for the final `len % 4` elements.
for i in (chunks * 4)..values.len() {
result[i] = values[i] > threshold;
}
}
result
}
/// Portable fallback: returns `values[i] > threshold` for every element.
#[cfg(not(all(target_arch = "x86_64", target_feature = "avx2")))]
pub fn simd_compare_i64_gt(values: &[i64], threshold: i64) -> Vec<bool> {
    let mut flags = Vec::with_capacity(values.len());
    for &v in values {
        flags.push(threshold < v);
    }
    flags
}
/// Tuning knobs for a vectorized scan, built with the `with_*` methods.
#[derive(Debug, Clone)]
pub struct VectorizedScanConfig {
    /// Rows per batch. `with_batch_size` clamps it to `[MIN_BATCH_SIZE, MAX_BATCH_SIZE]`.
    pub batch_size: usize,
    /// Whether prefetching is requested.
    pub prefetch_enabled: bool,
    /// Prefetch distance. NOTE(review): its unit (rows vs. batches) is not established in this file.
    pub prefetch_distance: usize,
    /// Whether SIMD kernels are requested.
    pub simd_enabled: bool,
}

impl Default for VectorizedScanConfig {
    /// `DEFAULT_BATCH_SIZE` rows, prefetch on (distance 16), SIMD on.
    fn default() -> Self {
        Self {
            batch_size: DEFAULT_BATCH_SIZE,
            prefetch_enabled: true,
            prefetch_distance: 16,
            simd_enabled: true,
        }
    }
}

impl VectorizedScanConfig {
    /// Same as `VectorizedScanConfig::default()`.
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the batch size, silently clamped to `[MIN_BATCH_SIZE, MAX_BATCH_SIZE]`.
    #[must_use]
    pub fn with_batch_size(mut self, size: usize) -> Self {
        self.batch_size = size.clamp(MIN_BATCH_SIZE, MAX_BATCH_SIZE);
        self
    }

    /// Enables or disables prefetching.
    #[must_use]
    pub fn with_prefetch(mut self, enabled: bool) -> Self {
        self.prefetch_enabled = enabled;
        self
    }

    /// Sets the prefetch distance.
    #[must_use]
    pub fn with_prefetch_distance(mut self, distance: usize) -> Self {
        self.prefetch_distance = distance;
        self
    }

    /// Enables or disables SIMD kernels.
    #[must_use]
    pub fn with_simd(mut self, enabled: bool) -> Self {
        self.simd_enabled = enabled;
        self
    }
}
/// Batch MVCC visibility filter over commit timestamps.
///
/// A row is visible to a snapshot when `ts != 0 && ts < snapshot_ts`. All code
/// paths compare timestamps as **unsigned** 64-bit integers, so values >= 2^63
/// (e.g. a `u64::MAX` snapshot) behave the same with or without SIMD.
pub struct SimdVisibilityFilter;

impl SimdVisibilityFilter {
    /// Computes visibility for every entry of `commit_ts` into a new vector.
    #[inline]
    pub fn filter_batch(commit_ts: &[u64], snapshot_ts: u64) -> Vec<bool> {
        let mut result = vec![false; commit_ts.len()];
        Self::filter_batch_into(commit_ts, snapshot_ts, &mut result);
        result
    }

    /// Writes the visibility of `commit_ts[i]` into `out[i]`.
    ///
    /// # Panics
    /// Panics if `commit_ts.len() != out.len()`.
    #[inline]
    pub fn filter_batch_into(commit_ts: &[u64], snapshot_ts: u64, out: &mut [bool]) {
        assert_eq!(commit_ts.len(), out.len());
        // Exactly one of these cfg blocks is compiled for any target.
        #[cfg(target_arch = "x86_64")]
        {
            Self::filter_batch_simd_x86(commit_ts, snapshot_ts, out);
        }
        #[cfg(target_arch = "aarch64")]
        {
            Self::filter_batch_simd_neon(commit_ts, snapshot_ts, out);
        }
        #[cfg(not(any(target_arch = "x86_64", target_arch = "aarch64")))]
        {
            Self::filter_batch_scalar(commit_ts, snapshot_ts, out);
        }
    }

    /// Portable reference implementation, also used for SIMD tails and fallbacks.
    #[inline]
    fn filter_batch_scalar(commit_ts: &[u64], snapshot_ts: u64, out: &mut [bool]) {
        for (slot, &ts) in out.iter_mut().zip(commit_ts) {
            *slot = ts != 0 && ts < snapshot_ts;
        }
    }

    /// x86_64 path: four lanes per step with AVX2, otherwise the scalar loop.
    #[cfg(target_arch = "x86_64")]
    fn filter_batch_simd_x86(commit_ts: &[u64], snapshot_ts: u64, out: &mut [bool]) {
        #[cfg(target_feature = "avx2")]
        {
            use std::arch::x86_64::*;
            let chunks = commit_ts.chunks_exact(4);
            let tail_start = commit_ts.len() - chunks.remainder().len();
            // SAFETY: AVX2 is statically enabled by the cfg above. Each `ts4` chunk
            // holds exactly four u64 values, so the unaligned 256-bit load is in
            // bounds. `__m256i` -> `[i64; 4]` is a transmute between same-sized
            // plain-data types.
            unsafe {
                // AVX2 only has a *signed* 64-bit greater-than. XOR-ing the sign bit
                // into both operands maps unsigned order onto signed order, so the
                // compare below computes `ts < snapshot_ts` as unsigned integers.
                let sign_bit = _mm256_set1_epi64x(i64::MIN);
                let zero = _mm256_setzero_si256();
                let snapshot_biased =
                    _mm256_xor_si256(_mm256_set1_epi64x(snapshot_ts as i64), sign_bit);
                for (ts4, out4) in chunks.zip(out.chunks_exact_mut(4)) {
                    let ts_vec = _mm256_loadu_si256(ts4.as_ptr() as *const __m256i);
                    let ts_biased = _mm256_xor_si256(ts_vec, sign_bit);
                    // All-ones lanes where snapshot_ts > ts (unsigned).
                    let below_snapshot = _mm256_cmpgt_epi64(snapshot_biased, ts_biased);
                    // All-ones lanes where ts == 0.
                    let zero_ts = _mm256_cmpeq_epi64(ts_vec, zero);
                    // andnot(a, b) == !a & b  =>  ts != 0 && ts < snapshot_ts
                    let visible = _mm256_andnot_si256(zero_ts, below_snapshot);
                    let lanes: [i64; 4] = std::mem::transmute(visible);
                    for (slot, &lane) in out4.iter_mut().zip(lanes.iter()) {
                        *slot = lane != 0;
                    }
                }
            }
            Self::filter_batch_scalar(
                &commit_ts[tail_start..],
                snapshot_ts,
                &mut out[tail_start..],
            );
        }
        #[cfg(not(target_feature = "avx2"))]
        {
            Self::filter_batch_scalar(commit_ts, snapshot_ts, out);
        }
    }

    /// aarch64 entry point. It currently delegates to the scalar loop; no NEON kernel exists yet.
    #[cfg(target_arch = "aarch64")]
    fn filter_batch_simd_neon(commit_ts: &[u64], snapshot_ts: u64, out: &mut [bool]) {
        Self::filter_batch_scalar(commit_ts, snapshot_ts, out);
    }

    /// Like `filter_batch_into`, but every row whose `txn_ids[i]` equals
    /// `current_txn_id` is also marked visible, regardless of its commit timestamp.
    ///
    /// # Panics
    /// Panics if `commit_ts`, `txn_ids`, and `out` differ in length.
    #[inline]
    pub fn filter_batch_with_txn(
        commit_ts: &[u64],
        txn_ids: &[u64],
        snapshot_ts: u64,
        current_txn_id: u64,
        out: &mut [bool],
    ) {
        assert_eq!(commit_ts.len(), txn_ids.len());
        assert_eq!(commit_ts.len(), out.len());
        Self::filter_batch_into(commit_ts, snapshot_ts, out);
        // A transaction always sees its own writes, even when commit_ts is 0.
        for (slot, &txn_id) in out.iter_mut().zip(txn_ids) {
            if txn_id == current_txn_id {
                *slot = true;
            }
        }
    }

    /// Counts entries of `commit_ts` visible to `snapshot_ts`, without allocating.
    #[inline]
    pub fn count_visible(commit_ts: &[u64], snapshot_ts: u64) -> usize {
        commit_ts
            .iter()
            .filter(|&&ts| ts != 0 && ts < snapshot_ts)
            .count()
    }
}
/// One version of a key as seen by a scan: borrowed key/value bytes plus MVCC metadata.
#[derive(Debug, Clone)]
pub struct VersionedSlice<'a> {
    /// Key bytes.
    pub key: &'a [u8],
    /// Value bytes. NOTE(review): `None` presumably marks a deletion — confirm.
    pub value: Option<&'a [u8]>,
    /// Commit timestamp. A value of 0 never passes the snapshot check.
    pub commit_ts: u64,
    /// Id of the transaction that wrote this version.
    pub txn_id: u64,
}

impl<'a> VersionedSlice<'a> {
    /// Whether this version is visible to a reader at `snapshot_ts`.
    ///
    /// The reader's own writes (`current_txn_id == Some(self.txn_id)`) are
    /// always visible. Otherwise the version must have a non-zero commit
    /// timestamp strictly below the snapshot.
    #[inline]
    pub fn is_visible(&self, snapshot_ts: u64, current_txn_id: Option<u64>) -> bool {
        let own_write = current_txn_id == Some(self.txn_id);
        let committed_before_snapshot = self.commit_ts != 0 && self.commit_ts < snapshot_ts;
        own_write || committed_before_snapshot
    }
}
/// Iterator adapter that pulls `VersionedSlice`s from `source` in batches,
/// computes their visibility with `SimdVisibilityFilter`, and yields only the
/// visible entries, in source order.
pub struct StreamingScanIterator<'a, I>
where
I: Iterator<Item = VersionedSlice<'a>>,
{
/// Upstream entries.
source: I,
/// Entries of the current batch.
batch: Vec<VersionedSlice<'a>>,
/// `visibility[i]` tells whether `batch[i]` is visible; same length as `batch`.
visibility: Vec<bool>,
/// Index of the next entry in `batch` to examine.
pos: usize,
/// Snapshot timestamp used for the visibility check.
snapshot_ts: u64,
/// When set, entries written by this transaction are visible regardless of commit timestamp.
current_txn_id: Option<u64>,
/// Maximum number of entries pulled from `source` per batch.
batch_size: usize,
}
impl<'a, I> StreamingScanIterator<'a, I>
where
    I: Iterator<Item = VersionedSlice<'a>>,
{
    /// Creates a scan over `source` that pulls `DEFAULT_BATCH_SIZE` entries per batch.
    pub fn new(source: I, snapshot_ts: u64, current_txn_id: Option<u64>) -> Self {
        Self::with_batch_size(source, snapshot_ts, current_txn_id, DEFAULT_BATCH_SIZE)
    }

    /// Creates a scan over `source` that pulls up to `batch_size` entries per batch.
    ///
    /// A `batch_size` of 0 is treated as 1. A zero-sized batch would always come
    /// back empty, `fetch_batch` would report that as end of input, and the
    /// iterator would silently yield nothing.
    pub fn with_batch_size(
        source: I,
        snapshot_ts: u64,
        current_txn_id: Option<u64>,
        batch_size: usize,
    ) -> Self {
        let batch_size = batch_size.max(1);
        Self {
            source,
            batch: Vec::with_capacity(batch_size),
            visibility: Vec::with_capacity(batch_size),
            pos: 0,
            snapshot_ts,
            current_txn_id,
            batch_size,
        }
    }

    /// Refills `batch` and `visibility` from `source` and rewinds `pos`.
    ///
    /// Returns `false`, leaving both buffers empty, once `source` yields nothing.
    /// On success, `visibility.len() == batch.len()`.
    fn fetch_batch(&mut self) -> bool {
        self.batch.clear();
        self.visibility.clear();
        self.pos = 0;
        self.batch
            .extend(self.source.by_ref().take(self.batch_size));
        if self.batch.is_empty() {
            return false;
        }
        let commit_ts: Vec<u64> = self.batch.iter().map(|e| e.commit_ts).collect();
        self.visibility.resize(self.batch.len(), false);
        match self.current_txn_id {
            Some(txn_id) => {
                // Transaction ids are only gathered when own-write visibility applies.
                let txn_ids: Vec<u64> = self.batch.iter().map(|e| e.txn_id).collect();
                SimdVisibilityFilter::filter_batch_with_txn(
                    &commit_ts,
                    &txn_ids,
                    self.snapshot_ts,
                    txn_id,
                    &mut self.visibility,
                );
            }
            None => {
                SimdVisibilityFilter::filter_batch_into(
                    &commit_ts,
                    self.snapshot_ts,
                    &mut self.visibility,
                );
            }
        }
        true
    }
}
impl<'a, I> Iterator for StreamingScanIterator<'a, I>
where
    I: Iterator<Item = VersionedSlice<'a>>,
{
    type Item = VersionedSlice<'a>;

    /// Returns the next visible entry. When the current batch is exhausted, it
    /// pulls further batches until one is found or `source` runs dry.
    fn next(&mut self) -> Option<Self::Item> {
        loop {
            let end = self.batch.len();
            if self.pos >= end {
                if !self.fetch_batch() {
                    return None;
                }
                continue;
            }
            match (self.pos..end).find(|&i| self.visibility[i]) {
                Some(i) => {
                    self.pos = i + 1;
                    return Some(self.batch[i].clone());
                }
                None => self.pos = end,
            }
        }
    }
}
/// Structure-of-arrays batch of versioned rows: row `i` is spread across
/// `commit_ts[i]`, `txn_ids[i]`, `keys[i]`, and `value_handles[i]`.
#[derive(Debug)]
pub struct SoaBatch<'a> {
/// Commit timestamp per row.
pub commit_ts: Vec<u64>,
/// Writing transaction id per row.
pub txn_ids: Vec<u64>,
/// Key bytes per row.
pub keys: Vec<&'a [u8]>,
/// Value handle per row; `None` when the row was pushed without a value.
pub value_handles: Vec<Option<ValueHandle<'a>>>,
/// Per-row visibility, computed by `filter_visibility` (not maintained by `push`).
pub visibility: Vec<bool>,
/// Indices of visible rows in ascending order, rebuilt by `filter_visibility`.
pub selection: Vec<usize>,
}
/// Reference to a row's value bytes, possibly resolved only when needed.
#[derive(Debug, Clone, Copy)]
pub enum ValueHandle<'a> {
    /// Bytes already available in memory.
    Direct(&'a [u8]),
    /// Location inside a block; `materialize` does not resolve this yet.
    BlockOffset { block_id: u32, offset: u32, len: u32 },
    /// Slot inside an arena; `materialize` does not resolve this yet.
    ArenaSlot { arena_id: u32, slot: u32 },
}

impl<'a> ValueHandle<'a> {
    /// Returns the bytes of a `Direct` handle.
    ///
    /// `BlockOffset` and `ArenaSlot` handles currently yield `None`.
    pub fn materialize(&self) -> Option<&'a [u8]> {
        if let ValueHandle::Direct(bytes) = *self {
            Some(bytes)
        } else {
            None
        }
    }
}
impl<'a> SoaBatch<'a> {
pub fn with_capacity(capacity: usize) -> Self {
Self {
commit_ts: Vec::with_capacity(capacity),
txn_ids: Vec::with_capacity(capacity),
keys: Vec::with_capacity(capacity),
value_handles: Vec::with_capacity(capacity),
visibility: Vec::with_capacity(capacity),
selection: Vec::with_capacity(capacity),
}
}
#[inline]
pub fn push(&mut self, key: &'a [u8], value: Option<&'a [u8]>, commit_ts: u64, txn_id: u64) {
self.commit_ts.push(commit_ts);
self.txn_ids.push(txn_id);
self.keys.push(key);
self.value_handles.push(value.map(ValueHandle::Direct));
}
#[inline]
pub fn push_deferred(
&mut self,
key: &'a [u8],
handle: Option<ValueHandle<'a>>,
commit_ts: u64,
txn_id: u64,
) {
self.commit_ts.push(commit_ts);
self.txn_ids.push(txn_id);
self.keys.push(key);
self.value_handles.push(handle);
}
pub fn len(&self) -> usize {
self.commit_ts.len()
}
pub fn is_empty(&self) -> bool {
self.commit_ts.is_empty()
}
pub fn clear(&mut self) {
self.commit_ts.clear();
self.txn_ids.clear();
self.keys.clear();
self.value_handles.clear();
self.visibility.clear();
self.selection.clear();
}
pub fn filter_visibility(&mut self, snapshot_ts: u64, current_txn_id: Option<u64>) {
let n = self.len();
self.visibility.resize(n, false);
self.selection.clear();
if let Some(txn_id) = current_txn_id {
SimdVisibilityFilter::filter_batch_with_txn(
&self.commit_ts,
&self.txn_ids,
snapshot_ts,
txn_id,
&mut self.visibility,
);
} else {
SimdVisibilityFilter::filter_batch_into(
&self.commit_ts,
snapshot_ts,
&mut self.visibility,
);
}
for (i, &visible) in self.visibility.iter().enumerate() {
if visible {
self.selection.push(i);
}
}
}
pub fn visible_count(&self) -> usize {
self.selection.len()
}
pub fn iter_visible(&self) -> impl Iterator<Item = (&'a [u8], Option<&'a [u8]>)> + '_ {
self.selection.iter().map(move |&idx| {
let key = self.keys[idx];
let value = self.value_handles[idx].and_then(|h| h.materialize());
(key, value)
})
}
pub fn iter_visible_full(
&self,
) -> impl Iterator<Item = (&'a [u8], Option<&'a [u8]>, u64, u64)> + '_ {
self.selection.iter().map(move |&idx| {
let key = self.keys[idx];
let value = self.value_handles[idx].and_then(|h| h.materialize());
let ts = self.commit_ts[idx];
let txn = self.txn_ids[idx];
(key, value, ts, txn)
})
}
}
/// Iterator that repeatedly fills a `SoaBatch` from a `SoaSource`, filters it
/// for visibility, and yields `(key, value)` for each visible row.
pub struct SoaScanIterator<'a, S>
where
S: SoaSource<'a>,
{
/// Upstream row producer.
source: S,
/// Reused batch buffer.
batch: SoaBatch<'a>,
/// Position in `batch.selection` of the next row to yield.
pos: usize,
/// Snapshot timestamp used for visibility.
snapshot_ts: u64,
/// When set, rows written by this transaction are visible regardless of commit timestamp.
current_txn_id: Option<u64>,
/// Stored but never read (hence the allow); `SoaSource::fill_batch` takes no
/// size limit, so the source decides how many rows each batch receives.
#[allow(dead_code)]
batch_size: usize,
/// Running scan statistics.
stats: SoaScanStats,
}
/// Running counters collected by a structure-of-arrays scan.
#[derive(Debug, Default, Clone)]
pub struct SoaScanStats {
    /// Rows pulled from the source.
    pub rows_scanned: usize,
    /// Rows that passed the visibility filter.
    pub rows_visible: usize,
    /// Rows handed to the consumer.
    pub values_materialized: usize,
    /// Batches pulled from the source.
    pub batches_processed: usize,
}

impl SoaScanStats {
    /// Fraction of scanned rows that were visible (0.0 when nothing was scanned).
    pub fn selectivity(&self) -> f64 {
        match self.rows_scanned {
            0 => 0.0,
            scanned => self.rows_visible as f64 / scanned as f64,
        }
    }

    /// Fraction of visible rows that were materialized (1.0 when none were visible).
    pub fn materialization_efficiency(&self) -> f64 {
        match self.rows_visible {
            0 => 1.0,
            visible => self.values_materialized as f64 / visible as f64,
        }
    }
}
/// Producer of rows for `SoaScanIterator`.
pub trait SoaSource<'a> {
/// Appends the next rows to `batch` (which `SoaScanIterator` clears before each
/// call). Returning `false` ends the scan.
fn fill_batch(&mut self, batch: &mut SoaBatch<'a>) -> bool;
}
impl<'a, S> SoaScanIterator<'a, S>
where
    S: SoaSource<'a>,
{
    /// Creates a scan whose batch buffer is preallocated for `DEFAULT_BATCH_SIZE` rows.
    pub fn new(source: S, snapshot_ts: u64, current_txn_id: Option<u64>) -> Self {
        Self::with_batch_size(source, snapshot_ts, current_txn_id, DEFAULT_BATCH_SIZE)
    }

    /// Creates a scan whose batch buffer is preallocated for `batch_size` rows.
    pub fn with_batch_size(
        source: S,
        snapshot_ts: u64,
        current_txn_id: Option<u64>,
        batch_size: usize,
    ) -> Self {
        let batch = SoaBatch::with_capacity(batch_size);
        Self {
            source,
            batch,
            pos: 0,
            snapshot_ts,
            current_txn_id,
            batch_size,
            stats: SoaScanStats::default(),
        }
    }

    /// Clears the buffer, asks the source for more rows, and filters them.
    ///
    /// Returns `false` when the source reports exhaustion.
    fn fetch_batch(&mut self) -> bool {
        self.pos = 0;
        self.batch.clear();
        let filled = self.source.fill_batch(&mut self.batch);
        if !filled {
            return false;
        }
        self.stats.batches_processed += 1;
        self.stats.rows_scanned += self.batch.len();
        self.batch.filter_visibility(self.snapshot_ts, self.current_txn_id);
        self.stats.rows_visible += self.batch.visible_count();
        true
    }

    /// Statistics accumulated so far.
    pub fn stats(&self) -> &SoaScanStats {
        &self.stats
    }
}
impl<'a, S> Iterator for SoaScanIterator<'a, S>
where
    S: SoaSource<'a>,
{
    type Item = (&'a [u8], Option<&'a [u8]>);

    /// Yields the next visible `(key, value)`, refilling from the source whenever
    /// the current batch's selection is used up.
    fn next(&mut self) -> Option<Self::Item> {
        loop {
            if let Some(&row) = self.batch.selection.get(self.pos) {
                self.pos += 1;
                let key = self.batch.keys[row];
                let value = self.batch.value_handles[row].and_then(|h| h.materialize());
                // Counts every row handed out, whether or not it carries a value.
                self.stats.values_materialized += 1;
                return Some((key, value));
            }
            if !self.fetch_batch() {
                return None;
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_column_vector_int64() {
        let v = ColumnVector::Int64(vec![1, 2, 3, 4, 5]);
        assert_eq!(v.len(), 5);
        assert_eq!(v.sum_i64(), Some(15));
    }

    #[test]
    fn test_column_vector_float64() {
        let v = ColumnVector::Float64(vec![1.0, 2.0, 3.0, 4.0]);
        assert_eq!(v.len(), 4);
        assert_eq!(v.sum_f64(), Some(10.0));
    }

    #[test]
    fn test_vector_batch() {
        let mut batch = VectorBatch::with_capacity(1024);
        batch.add_column("id", ColumnVector::Int64(vec![1, 2, 3]));
        batch.add_column("value", ColumnVector::Float64(vec![1.5, 2.5, 3.5]));
        assert_eq!(batch.row_count(), 3);
        assert_eq!(batch.column_count(), 2);
        assert!(batch.column("id").is_some());
    }

    #[test]
    fn test_int64_comparison() {
        let col = ColumnVector::Int64(vec![1, 5, 10, 15, 20]);
        let pred = Int64Comparison::gt("test", 10);
        let result = pred.evaluate(&col);
        assert_eq!(result, vec![false, false, false, true, true]);
    }

    #[test]
    fn test_simd_sum_i64_large() {
        let values: Vec<i64> = (0..1000).collect();
        let expected: i64 = (0..1000).sum();
        let col = ColumnVector::Int64(values);
        assert_eq!(col.sum_i64(), Some(expected));
    }

    #[test]
    fn test_simd_compare_gt() {
        let values: Vec<i64> = vec![1, 5, 10, 15, 20, 25, 30, 35];
        let result = simd_compare_i64_gt(&values, 12);
        assert_eq!(
            result,
            vec![false, false, false, true, true, true, true, true]
        );
    }

    #[test]
    fn test_vectorized_scan_config() {
        let config = VectorizedScanConfig::new()
            .with_batch_size(2048)
            .with_prefetch(true);
        assert_eq!(config.batch_size, 2048);
        assert!(config.prefetch_enabled);
    }

    #[test]
    fn test_simd_visibility_filter_basic() {
        let commit_ts = vec![0, 10, 20, 30, 40];
        let snapshot_ts = 25;
        let result = SimdVisibilityFilter::filter_batch(&commit_ts, snapshot_ts);
        assert_eq!(result, vec![false, true, true, false, false]);
    }

    #[test]
    fn test_simd_visibility_filter_with_txn() {
        let commit_ts = vec![0, 10, 0, 30, 40];
        // Rows 0 and 2 were written by the current transaction (txn 1).
        let txn_ids = vec![1, 2, 1, 4, 5];
        let snapshot_ts = 25;
        let current_txn_id = 1;
        let mut result = vec![false; 5];
        SimdVisibilityFilter::filter_batch_with_txn(
            &commit_ts,
            &txn_ids,
            snapshot_ts,
            current_txn_id,
            &mut result,
        );
        assert_eq!(result, vec![true, true, true, false, false]);
    }

    #[test]
    fn test_simd_visibility_filter_large() {
        let n = 1000;
        let commit_ts: Vec<u64> = (1..=n as u64).collect();
        let snapshot_ts = 500;
        let result = SimdVisibilityFilter::filter_batch(&commit_ts, snapshot_ts);
        let visible_count = result.iter().filter(|&&v| v).count();
        assert_eq!(visible_count, 499);
    }

    #[test]
    fn test_versioned_slice_visibility() {
        let slice = VersionedSlice {
            key: b"test",
            value: Some(b"value"),
            commit_ts: 100,
            txn_id: 1,
        };
        assert!(slice.is_visible(200, None));
        assert!(!slice.is_visible(50, None));
        // Own writes are visible even before the snapshot reaches them.
        assert!(slice.is_visible(50, Some(1)));
    }

    #[test]
    fn test_streaming_scan_iterator() {
        let entries: Vec<VersionedSlice<'static>> = vec![
            VersionedSlice {
                key: b"a",
                value: Some(b"1"),
                commit_ts: 10,
                txn_id: 1,
            },
            VersionedSlice {
                key: b"b",
                value: Some(b"2"),
                commit_ts: 0,
                txn_id: 2,
            },
            VersionedSlice {
                key: b"c",
                value: Some(b"3"),
                commit_ts: 30,
                txn_id: 3,
            },
            VersionedSlice {
                key: b"d",
                value: Some(b"4"),
                commit_ts: 15,
                txn_id: 4,
            },
        ];
        let iter = StreamingScanIterator::new(entries.into_iter(), 25, None);
        let visible: Vec<_> = iter.collect();
        assert_eq!(visible.len(), 2);
        assert_eq!(visible[0].key, b"a");
        assert_eq!(visible[1].key, b"d");
    }

    #[test]
    fn test_soa_batch_basic() {
        let mut batch = SoaBatch::with_capacity(100);
        batch.push(b"key1", Some(b"value1"), 10, 1);
        batch.push(b"key2", Some(b"value2"), 20, 2);
        batch.push(b"key3", None, 30, 3);
        batch.push(b"key4", Some(b"value4"), 0, 4);
        assert_eq!(batch.len(), 4);
        assert_eq!(batch.commit_ts, vec![10, 20, 30, 0]);
        assert_eq!(batch.txn_ids, vec![1, 2, 3, 4]);
    }

    #[test]
    fn test_soa_batch_visibility_filter() {
        let mut batch = SoaBatch::with_capacity(100);
        batch.push(b"k1", Some(b"v1"), 10, 1);
        batch.push(b"k2", Some(b"v2"), 0, 2);
        batch.push(b"k3", Some(b"v3"), 20, 3);
        batch.push(b"k4", Some(b"v4"), 30, 4);
        batch.push(b"k5", Some(b"v5"), 0, 5);
        batch.filter_visibility(25, None);
        assert_eq!(batch.visibility, vec![true, false, true, false, false]);
        assert_eq!(batch.selection, vec![0, 2]);
        assert_eq!(batch.visible_count(), 2);
    }

    #[test]
    fn test_soa_batch_self_visibility() {
        let mut batch = SoaBatch::with_capacity(100);
        batch.push(b"k1", Some(b"v1"), 0, 42);
        batch.push(b"k2", Some(b"v2"), 10, 1);
        batch.push(b"k3", Some(b"v3"), 0, 99);
        batch.filter_visibility(25, Some(42));
        assert_eq!(batch.visibility, vec![true, true, false]);
        assert_eq!(batch.selection, vec![0, 1]);
    }

    #[test]
    fn test_soa_batch_late_materialization() {
        let mut batch = SoaBatch::with_capacity(100);
        batch.push(b"key1", Some(b"val1"), 10, 1);
        batch.push(b"key2", Some(b"val2"), 0, 2);
        batch.push(b"key3", Some(b"val3"), 15, 3);
        batch.filter_visibility(25, None);
        let visible: Vec<_> = batch.iter_visible().collect();
        assert_eq!(visible.len(), 2);
        assert_eq!(visible[0], (b"key1".as_slice(), Some(b"val1".as_slice())));
        assert_eq!(visible[1], (b"key3".as_slice(), Some(b"val3".as_slice())));
    }

    #[test]
    fn test_soa_scan_stats() {
        let mut batch = SoaBatch::with_capacity(100);
        for i in 0..10u64 {
            // First three rows are committed, the rest are not.
            let ts = if i < 3 { 10 } else { 0 };
            batch.push(b"key", Some(b"val"), ts, i);
        }
        batch.filter_visibility(25, None);
        let selectivity = batch.visible_count() as f64 / batch.len() as f64;
        assert!((selectivity - 0.3).abs() < 0.01);
    }

    #[test]
    fn test_soa_batch_simd_large() {
        let mut batch = SoaBatch::with_capacity(2000);
        for i in 0..1000u64 {
            let ts = if i % 2 == 0 { 10 } else { 50 };
            batch.push(b"k", Some(b"v"), ts, i);
        }
        batch.filter_visibility(25, None);
        assert_eq!(batch.visible_count(), 500);
        for (i, &idx) in batch.selection.iter().enumerate() {
            // Only even rows (ts = 10) are visible.
            assert_eq!(idx, i * 2);
        }
    }
}