Skip to main content

sochdb_storage/
vectorized_scan.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! SIMD-Accelerated Vectorized Scan Engine (Recommendation 2)
19//!
20//! ## Problem
21//!
22//! Current scan implementation is row-at-a-time:
23//! ```ignore
24//! for entry in self.data.iter() {  // DashMap iteration - pointer chasing
25//!     if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
26//!         && let Some(value) = &v.value  // Option unwrapping
27//!     {
28//!         results.push((key.clone(), value.clone())); // Heap allocs
29//!     }
30//! }
31//! ```
32//!
33//! This has:
34//! - DashMap iterator overhead (~20ns per entry)
35//! - MVCC version chain traversal per row
36//! - Clone allocations in hot path
37//!
38//! ## Solution
39//!
40//! Vectorized execution: process 1024+ rows in a batch, amortizing overhead.
41//!
42//! ## Performance Analysis
43//!
44//! Vectorized execution model:
45//! - **Batch size B = 1024 rows**
46//! - **Per-batch overhead**: O(1) iterator setup + O(B) SIMD operations
47//! - **Amortized cost**: `(setup + B×simd_op) / B ≈ simd_op` for large B
48//!
49//! SIMD comparison (AVX-512):
50//! ```text
51//! Scalar: 100 cycles × 1000 rows = 100,000 cycles
52//! SIMD: 100 cycles × (1000/16) = 6,250 cycles (16x speedup)
53//! ```
54//!
55//! Memory bandwidth calculation:
56//! - Current: 300 bytes/row × 1M rows = 300 MB, random access
57//! - Proposed: 60 bytes/row × 1M rows = 60 MB, sequential scan
58//! - RAM bandwidth: ~50 GB/s → theoretical 833M rows/sec
59//!
60//! ## Expected Improvement
61//!
62//! 10-20x scan throughput improvement
63
64use std::sync::atomic::{AtomicUsize, Ordering};
65
/// Default number of rows per batch (used by `VectorBatch::new`,
/// `VectorizedScanConfig::default` and `StreamingScanIterator::new`).
///
/// 1024 rows of 256 bytes each come to about 256 KiB, roughly an L2 cache's
/// worth of data on many CPUs.
pub const DEFAULT_BATCH_SIZE: usize = 1 << 10;

/// Lower clamp for `VectorizedScanConfig::with_batch_size`; below this size
/// row-at-a-time (scalar) processing is expected to be faster.
pub const MIN_BATCH_SIZE: usize = 1 << 6;

/// Upper clamp for `VectorizedScanConfig::with_batch_size`; larger batches
/// mainly add memory pressure.
pub const MAX_BATCH_SIZE: usize = 1 << 13;
75
76// =============================================================================
77// Column Vectors for SIMD Operations
78// =============================================================================
79
/// Typed column vector for SIMD-friendly access
///
/// Each variant stores its values contiguously so operators can process a
/// whole column at once.
#[derive(Debug, Clone)]
pub enum ColumnVector {
    /// Boolean column, one byte per value (`Vec<bool>` is not bit-packed)
    Bool(Vec<bool>),
    /// 64-bit signed integers
    Int64(Vec<i64>),
    /// 64-bit unsigned integers
    UInt64(Vec<u64>),
    /// 64-bit floats
    Float64(Vec<f64>),
    /// Variable-length strings (Arrow-style: `n` values use `n + 1` offsets into `data`)
    String {
        offsets: Vec<u32>,
        data: Vec<u8>,
    },
    /// Binary data (Arrow-style: `n` values use `n + 1` offsets into `data`)
    Binary {
        offsets: Vec<u32>,
        data: Vec<u8>,
    },
    /// Packed null bitmap: 64 flags per `u64` word
    Null(Vec<u64>),
}

impl ColumnVector {
    /// Create an empty `Int64` column with room for `capacity` values.
    pub fn new_int64(capacity: usize) -> Self {
        ColumnVector::Int64(Vec::with_capacity(capacity))
    }

    /// Create an empty `Float64` column with room for `capacity` values.
    pub fn new_float64(capacity: usize) -> Self {
        ColumnVector::Float64(Vec::with_capacity(capacity))
    }

    /// Create an empty `String` column sized for `capacity` values.
    ///
    /// Reserves `capacity + 1` offsets and `capacity * 32` data bytes (an
    /// assumed average of 32 bytes per string). No initial `0` offset is written.
    pub fn new_string(capacity: usize) -> Self {
        ColumnVector::String {
            offsets: Vec::with_capacity(capacity + 1),
            data: Vec::with_capacity(capacity * 32), // Assume avg 32 bytes per string
        }
    }

    /// Number of logical values in the column.
    ///
    /// For `String`/`Binary` this is `offsets.len() - 1` (0 when there are no
    /// offsets). For `Null` it is the bitmap size in bits (`words * 64`).
    pub fn len(&self) -> usize {
        match self {
            ColumnVector::Bool(v) => v.len(),
            ColumnVector::Int64(v) => v.len(),
            ColumnVector::UInt64(v) => v.len(),
            ColumnVector::Float64(v) => v.len(),
            ColumnVector::String { offsets, .. } => offsets.len().saturating_sub(1),
            ColumnVector::Binary { offsets, .. } => offsets.len().saturating_sub(1),
            ColumnVector::Null(v) => v.len() * 64,
        }
    }

    /// `true` when [`ColumnVector::len`] is zero.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Payload size in bytes: element count times element size.
    ///
    /// Spare `Vec` capacity is not counted.
    pub fn memory_size(&self) -> usize {
        match self {
            ColumnVector::Bool(v) => v.len(),
            ColumnVector::Int64(v) => v.len() * 8,
            ColumnVector::UInt64(v) => v.len() * 8,
            ColumnVector::Float64(v) => v.len() * 8,
            ColumnVector::String { offsets, data } => offsets.len() * 4 + data.len(),
            ColumnVector::Binary { offsets, data } => offsets.len() * 4 + data.len(),
            ColumnVector::Null(v) => v.len() * 8,
        }
    }

    /// Sum of an `Int64` column; `None` for any other variant.
    ///
    /// Overflow wraps (two's complement) on every path. The result therefore does
    /// not depend on the vector length, the build profile, or whether the AVX2
    /// kernel was compiled in. Vectors of 16 or more values go through
    /// `simd_sum_i64`.
    #[cfg(target_arch = "x86_64")]
    pub fn sum_i64(&self) -> Option<i64> {
        match self {
            ColumnVector::Int64(values) if values.len() >= 16 => Some(simd_sum_i64(values)),
            ColumnVector::Int64(values) => Some(wrapping_sum_i64(values)),
            _ => None,
        }
    }

    /// Sum of an `Int64` column; `None` for any other variant.
    ///
    /// Overflow wraps (two's complement), matching the x86_64 build.
    #[cfg(not(target_arch = "x86_64"))]
    pub fn sum_i64(&self) -> Option<i64> {
        match self {
            ColumnVector::Int64(values) => Some(wrapping_sum_i64(values)),
            _ => None,
        }
    }

    /// Sum of a `Float64` column; `None` for any other variant.
    ///
    /// An empty column sums to `0.0`. Vectors of 8 or more values go through
    /// `simd_sum_f64`, which adds in a different order than a plain loop. The
    /// result can therefore differ in the last bits from a sequential sum.
    #[cfg(target_arch = "x86_64")]
    pub fn sum_f64(&self) -> Option<f64> {
        match self {
            ColumnVector::Float64(values) if values.is_empty() => Some(0.0),
            ColumnVector::Float64(values) if values.len() >= 8 => Some(simd_sum_f64(values)),
            ColumnVector::Float64(values) => Some(values.iter().sum()),
            _ => None,
        }
    }

    /// Sum of a `Float64` column; `None` for any other variant.
    #[cfg(not(target_arch = "x86_64"))]
    pub fn sum_f64(&self) -> Option<f64> {
        match self {
            ColumnVector::Float64(values) => Some(values.iter().sum()),
            _ => None,
        }
    }
}

// =============================================================================
// SIMD Implementations (x86_64 with AVX2)
// =============================================================================

/// Scalar `i64` sum that wraps on overflow, matching AVX2 `_mm256_add_epi64`.
///
/// Modular addition is associative, so any summation order gives the same result.
fn wrapping_sum_i64(values: &[i64]) -> i64 {
    values.iter().fold(0i64, |acc, &v| acc.wrapping_add(v))
}

/// Sum `i64` values four lanes at a time with AVX2, wrapping on overflow.
///
/// The vector kernel is compiled only when AVX2 is enabled at build time
/// (`cfg(target_feature = "avx2")`). Otherwise this is a scalar wrapping sum.
#[cfg(target_arch = "x86_64")]
fn simd_sum_i64(values: &[i64]) -> i64 {
    #[cfg(target_feature = "avx2")]
    {
        use std::arch::x86_64::*;

        let chunks = values.len() / 4;
        let ptr = values.as_ptr();

        // SAFETY: AVX2 is statically enabled (cfg above), so the intrinsics are
        // supported. Each unaligned load reads elements `i*4 .. i*4 + 4` with
        // `i < values.len() / 4`, which is in bounds. `__m256i` and `[i64; 4]` are
        // both 32 bytes and every bit pattern is a valid `i64`.
        let lanes: [i64; 4] = unsafe {
            let mut acc = _mm256_setzero_si256();
            for i in 0..chunks {
                let v = _mm256_loadu_si256(ptr.add(i * 4) as *const __m256i);
                acc = _mm256_add_epi64(acc, v);
            }
            std::mem::transmute(acc)
        };

        // Horizontal add plus the leftover (< 4) elements, all with wrapping.
        wrapping_sum_i64(&lanes).wrapping_add(wrapping_sum_i64(&values[chunks * 4..]))
    }

    #[cfg(not(target_feature = "avx2"))]
    {
        wrapping_sum_i64(values)
    }
}

/// Sum `f64` values four lanes at a time with AVX.
///
/// The vector kernel is compiled only when AVX is enabled at build time.
/// Otherwise this is a plain sequential sum.
#[cfg(target_arch = "x86_64")]
fn simd_sum_f64(values: &[f64]) -> f64 {
    #[cfg(target_feature = "avx")]
    {
        use std::arch::x86_64::*;

        let chunks = values.len() / 4;
        let ptr = values.as_ptr();

        // SAFETY: AVX is statically enabled (cfg above). Each unaligned load reads
        // elements `i*4 .. i*4 + 4` with `i < values.len() / 4`, which is in bounds.
        // `__m256d` and `[f64; 4]` have the same 32-byte size.
        let lanes: [f64; 4] = unsafe {
            let mut acc = _mm256_setzero_pd();
            for i in 0..chunks {
                let v = _mm256_loadu_pd(ptr.add(i * 4));
                acc = _mm256_add_pd(acc, v);
            }
            std::mem::transmute(acc)
        };

        let simd_total: f64 = lanes.iter().sum();
        let remaining: f64 = values[chunks * 4..].iter().sum();
        simd_total + remaining
    }

    #[cfg(not(target_feature = "avx"))]
    {
        values.iter().sum()
    }
}
280
281// =============================================================================
282// Vectorized Batch for Processing
283// =============================================================================
284
285/// A batch of rows for vectorized processing
286/// 
287/// Instead of processing one row at a time, we accumulate rows into
288/// batches and process them together for better cache utilization
289/// and SIMD opportunities.
290#[derive(Debug)]
291pub struct VectorBatch {
292    /// Column data in columnar format
293    columns: Vec<(String, ColumnVector)>,
294    /// Row count in this batch
295    row_count: usize,
296    /// Capacity (pre-allocated)
297    capacity: usize,
298    /// Selection vector (indexes of selected rows after filtering)
299    selection: Option<Vec<usize>>,
300}
301
302impl VectorBatch {
303    /// Create a new batch with specified capacity
304    pub fn with_capacity(capacity: usize) -> Self {
305        Self {
306            columns: Vec::new(),
307            row_count: 0,
308            capacity,
309            selection: None,
310        }
311    }
312
313    /// Create batch with default size
314    pub fn new() -> Self {
315        Self::with_capacity(DEFAULT_BATCH_SIZE)
316    }
317
318    /// Get batch capacity
319    pub fn capacity(&self) -> usize {
320        self.capacity
321    }
322
323    /// Get current row count
324    pub fn row_count(&self) -> usize {
325        if let Some(ref sel) = self.selection {
326            sel.len()
327        } else {
328            self.row_count
329        }
330    }
331
332    /// Check if batch is full
333    pub fn is_full(&self) -> bool {
334        self.row_count >= self.capacity
335    }
336
337    /// Check if batch is empty
338    pub fn is_empty(&self) -> bool {
339        self.row_count == 0
340    }
341
342    /// Add a column to the batch
343    pub fn add_column(&mut self, name: impl Into<String>, column: ColumnVector) {
344        self.columns.push((name.into(), column));
345        if self.row_count == 0 {
346            self.row_count = self.columns.last().map(|(_, c)| c.len()).unwrap_or(0);
347        }
348    }
349
350    /// Get column by name
351    pub fn column(&self, name: &str) -> Option<&ColumnVector> {
352        self.columns
353            .iter()
354            .find(|(n, _)| n == name)
355            .map(|(_, c)| c)
356    }
357
358    /// Get column by index
359    pub fn column_at(&self, idx: usize) -> Option<&ColumnVector> {
360        self.columns.get(idx).map(|(_, c)| c)
361    }
362
363    /// Get column count
364    pub fn column_count(&self) -> usize {
365        self.columns.len()
366    }
367
368    /// Set selection vector (for filtering)
369    pub fn set_selection(&mut self, selection: Vec<usize>) {
370        self.selection = Some(selection);
371    }
372
373    /// Clear selection vector
374    pub fn clear_selection(&mut self) {
375        self.selection = None;
376    }
377
378    /// Get total memory size
379    pub fn memory_size(&self) -> usize {
380        self.columns.iter().map(|(_, c)| c.memory_size()).sum()
381    }
382
383    /// Reset batch for reuse
384    pub fn reset(&mut self) {
385        self.columns.clear();
386        self.row_count = 0;
387        self.selection = None;
388    }
389}
390
391impl Default for VectorBatch {
392    fn default() -> Self {
393        Self::new()
394    }
395}
396
397// =============================================================================
398// Vectorized Scan Engine
399// =============================================================================
400
/// Counters collected while running vectorized scans.
///
/// Every counter is an `AtomicUsize` updated with `Ordering::Relaxed`, so the
/// counters can be bumped through `&self`. No ordering is implied between
/// different counters.
#[derive(Debug, Default)]
pub struct VectorizedScanStats {
    /// Rows examined across all recorded batches
    pub rows_scanned: AtomicUsize,
    /// Number of `record_batch` calls
    pub batches_processed: AtomicUsize,
    /// Rows that passed the filter
    pub rows_passed: AtomicUsize,
    /// Bytes read from storage
    pub bytes_read: AtomicUsize,
}

impl VectorizedScanStats {
    /// Create a set of zeroed counters.
    pub fn new() -> Self {
        Self::default()
    }

    /// Record one batch: `rows` scanned, `passed` kept by the filter, `bytes` read.
    pub fn record_batch(&self, rows: usize, passed: usize, bytes: usize) {
        let bump = |counter: &AtomicUsize, amount: usize| {
            counter.fetch_add(amount, Ordering::Relaxed);
        };
        bump(&self.rows_scanned, rows);
        bump(&self.batches_processed, 1);
        bump(&self.rows_passed, passed);
        bump(&self.bytes_read, bytes);
    }

    /// Total rows scanned so far.
    pub fn rows_scanned(&self) -> usize {
        self.rows_scanned.load(Ordering::Relaxed)
    }

    /// Total batches recorded so far.
    pub fn batches_processed(&self) -> usize {
        self.batches_processed.load(Ordering::Relaxed)
    }
}
434
435/// Predicate for vectorized filtering
436pub trait VectorPredicate: Send + Sync {
437    /// Apply predicate to a column vector, returning selection bitmap
438    fn evaluate(&self, column: &ColumnVector) -> Vec<bool>;
439    
440    /// Get the column name this predicate operates on
441    fn column_name(&self) -> &str;
442}
443
/// Comparison predicate for i64 columns: `column <op> value`.
#[derive(Debug, Clone)]
pub struct Int64Comparison {
    column_name: String,
    op: ComparisonOp,
    value: i64,
}

/// Comparison operators
///
/// Fieldless, so it also derives `PartialEq`/`Eq`/`Hash` for use in tests,
/// plan comparison, or as a map key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ComparisonOp {
    Equal,
    NotEqual,
    LessThan,
    LessEqual,
    GreaterThan,
    GreaterEqual,
}

impl Int64Comparison {
    /// Build `column_name <op> value`.
    pub fn new(column_name: impl Into<String>, op: ComparisonOp, value: i64) -> Self {
        Self {
            column_name: column_name.into(),
            op,
            value,
        }
    }

    /// `column == value`
    pub fn eq(column_name: impl Into<String>, value: i64) -> Self {
        Self::new(column_name, ComparisonOp::Equal, value)
    }

    /// `column != value`
    pub fn ne(column_name: impl Into<String>, value: i64) -> Self {
        Self::new(column_name, ComparisonOp::NotEqual, value)
    }

    /// `column > value`
    pub fn gt(column_name: impl Into<String>, value: i64) -> Self {
        Self::new(column_name, ComparisonOp::GreaterThan, value)
    }

    /// `column >= value`
    pub fn ge(column_name: impl Into<String>, value: i64) -> Self {
        Self::new(column_name, ComparisonOp::GreaterEqual, value)
    }

    /// `column < value`
    pub fn lt(column_name: impl Into<String>, value: i64) -> Self {
        Self::new(column_name, ComparisonOp::LessThan, value)
    }

    /// `column <= value`
    pub fn le(column_name: impl Into<String>, value: i64) -> Self {
        Self::new(column_name, ComparisonOp::LessEqual, value)
    }
}
484
485impl VectorPredicate for Int64Comparison {
486    fn evaluate(&self, column: &ColumnVector) -> Vec<bool> {
487        match column {
488            ColumnVector::Int64(values) => {
489                let cmp_value = self.value;
490                match self.op {
491                    ComparisonOp::Equal => values.iter().map(|&v| v == cmp_value).collect(),
492                    ComparisonOp::NotEqual => values.iter().map(|&v| v != cmp_value).collect(),
493                    ComparisonOp::LessThan => values.iter().map(|&v| v < cmp_value).collect(),
494                    ComparisonOp::LessEqual => values.iter().map(|&v| v <= cmp_value).collect(),
495                    ComparisonOp::GreaterThan => values.iter().map(|&v| v > cmp_value).collect(),
496                    ComparisonOp::GreaterEqual => values.iter().map(|&v| v >= cmp_value).collect(),
497                }
498            }
499            _ => vec![false; column.len()],
500        }
501    }
502
503    fn column_name(&self) -> &str {
504        &self.column_name
505    }
506}
507
/// Element-wise `values[i] > threshold`, four lanes at a time with AVX2.
///
/// Compiled only when AVX2 is enabled at build time; other builds get the
/// portable version below.
#[cfg(all(target_arch = "x86_64", target_feature = "avx2"))]
pub fn simd_compare_i64_gt(values: &[i64], threshold: i64) -> Vec<bool> {
    use std::arch::x86_64::*;

    let mut flags = Vec::with_capacity(values.len());
    let quads = values.chunks_exact(4);
    let tail = quads.remainder();

    // SAFETY: AVX2 is statically enabled (cfg above), so these intrinsics are
    // supported. Each load reads exactly one 4-element chunk (32 bytes) from
    // `chunks_exact`, and `loadu` has no alignment requirement.
    unsafe {
        let pivot = _mm256_set1_epi64x(threshold);
        for quad in quads {
            let lanes = _mm256_loadu_si256(quad.as_ptr() as *const __m256i);
            // movemask gives one bit per byte; bits 8k..8k+8 belong to i64 lane k.
            let bits = _mm256_movemask_epi8(_mm256_cmpgt_epi64(lanes, pivot)) as u32;
            flags.extend((0..4u32).map(|lane| ((bits >> (lane * 8)) & 0xFF) != 0));
        }
    }

    flags.extend(tail.iter().map(|&v| v > threshold));
    flags
}

/// Element-wise `values[i] > threshold` (portable scalar version).
#[cfg(not(all(target_arch = "x86_64", target_feature = "avx2")))]
pub fn simd_compare_i64_gt(values: &[i64], threshold: i64) -> Vec<bool> {
    let mut flags = Vec::with_capacity(values.len());
    flags.extend(values.iter().map(|&v| v > threshold));
    flags
}
544
545// =============================================================================
546// Vectorized Scan Iterator
547// =============================================================================
548
549/// Configuration for vectorized scans
550#[derive(Debug, Clone)]
551pub struct VectorizedScanConfig {
552    /// Batch size for processing
553    pub batch_size: usize,
554    /// Enable prefetching
555    pub prefetch_enabled: bool,
556    /// Prefetch distance in rows
557    pub prefetch_distance: usize,
558    /// Enable SIMD acceleration
559    pub simd_enabled: bool,
560}
561
562impl Default for VectorizedScanConfig {
563    fn default() -> Self {
564        Self {
565            batch_size: DEFAULT_BATCH_SIZE,
566            prefetch_enabled: true,
567            prefetch_distance: 16,
568            simd_enabled: true,
569        }
570    }
571}
572
573impl VectorizedScanConfig {
574    pub fn new() -> Self {
575        Self::default()
576    }
577
578    pub fn with_batch_size(mut self, size: usize) -> Self {
579        self.batch_size = size.clamp(MIN_BATCH_SIZE, MAX_BATCH_SIZE);
580        self
581    }
582
583    pub fn with_prefetch(mut self, enabled: bool) -> Self {
584        self.prefetch_enabled = enabled;
585        self
586    }
587}
588
589// =============================================================================
590// SIMD Visibility Filtering (Task 4: Zero-Copy SIMD Scans)
591// =============================================================================
592
/// SIMD-accelerated MVCC visibility filter
///
/// ## Visibility rule
///
/// For a snapshot timestamp `S`, a row with commit timestamp `C` is visible if
/// `C != 0` (committed) AND `C < S` (committed before the snapshot). Both are
/// compared as **unsigned** 64-bit integers:
///
/// ```text
/// visible[i] = (commit_ts[i] != 0) & (commit_ts[i] < snapshot_ts)
/// ```
///
/// ## Implementation
///
/// With AVX2 (selected at compile time via `target_feature = "avx2"`) four
/// timestamps are evaluated per step. AVX2 only provides a *signed* 64-bit
/// compare, so both operands first have their sign bit flipped, which makes the
/// signed compare order values exactly like an unsigned one. Timestamps at or
/// above 2^63 (for example a `u64::MAX` snapshot) therefore get the same answer
/// as the scalar path. All other builds, and any tail shorter than four
/// elements, use the scalar loop.
///
/// Estimated throughput:
/// - Scalar: ~1 billion comparisons/sec
/// - AVX2 (4-way): ~4 billion comparisons/sec
/// - AVX-512 (8-way): ~8 billion comparisons/sec
pub struct SimdVisibilityFilter;

impl SimdVisibilityFilter {
    /// Compute visibility for a batch of commit timestamps.
    ///
    /// Returns one `bool` per timestamp (`true` = visible at `snapshot_ts`).
    #[inline]
    pub fn filter_batch(commit_ts: &[u64], snapshot_ts: u64) -> Vec<bool> {
        let mut result = vec![false; commit_ts.len()];
        Self::filter_batch_into(commit_ts, snapshot_ts, &mut result);
        result
    }

    /// Like [`SimdVisibilityFilter::filter_batch`], but writes into `out` to avoid allocating.
    ///
    /// # Panics
    ///
    /// Panics if `out.len() != commit_ts.len()`.
    #[inline]
    pub fn filter_batch_into(commit_ts: &[u64], snapshot_ts: u64, out: &mut [bool]) {
        assert_eq!(commit_ts.len(), out.len());

        #[cfg(target_arch = "x86_64")]
        Self::filter_batch_simd_x86(commit_ts, snapshot_ts, out);

        #[cfg(target_arch = "aarch64")]
        Self::filter_batch_simd_neon(commit_ts, snapshot_ts, out);

        #[cfg(not(any(target_arch = "x86_64", target_arch = "aarch64")))]
        Self::filter_batch_scalar(commit_ts, snapshot_ts, out);
    }

    /// Portable implementation: one comparison per timestamp.
    ///
    /// Callers guarantee `out.len() == commit_ts.len()`.
    #[inline]
    fn filter_batch_scalar(commit_ts: &[u64], snapshot_ts: u64, out: &mut [bool]) {
        for (slot, &ts) in out.iter_mut().zip(commit_ts) {
            // Visible if committed (ts != 0) AND committed before the snapshot.
            *slot = ts != 0 && ts < snapshot_ts;
        }
    }

    /// x86_64 entry point.
    ///
    /// Runs the AVX2 kernel over the largest multiple-of-4 prefix when AVX2 is
    /// compiled in, then finishes the rest with the scalar loop.
    #[cfg(target_arch = "x86_64")]
    fn filter_batch_simd_x86(commit_ts: &[u64], snapshot_ts: u64, out: &mut [bool]) {
        #[cfg(target_feature = "avx2")]
        let vectorized = Self::filter_prefix_avx2(commit_ts, snapshot_ts, out);

        #[cfg(not(target_feature = "avx2"))]
        let vectorized = 0;

        Self::filter_batch_scalar(&commit_ts[vectorized..], snapshot_ts, &mut out[vectorized..]);
    }

    /// AVX2 kernel (4 × u64 per step).
    ///
    /// Fills `out` for the longest prefix whose length is a multiple of 4 and
    /// returns that prefix length; the caller handles the remaining tail.
    #[cfg(all(target_arch = "x86_64", target_feature = "avx2"))]
    fn filter_prefix_avx2(commit_ts: &[u64], snapshot_ts: u64, out: &mut [bool]) -> usize {
        use std::arch::x86_64::*;

        let prefix = commit_ts.len() / 4 * 4;

        // SAFETY: this function exists only when AVX2 is enabled for the whole
        // build, so the intrinsics are supported. Every load reads one 4-element
        // chunk produced by `chunks_exact(4)` (32 bytes, in bounds), and `loadu`
        // has no alignment requirement. `__m256i` and `[i64; 4]` are both 32 bytes
        // and every bit pattern is a valid `i64`, so the transmute is sound.
        unsafe {
            let zero = _mm256_setzero_si256();
            let all_ones = _mm256_set1_epi64x(-1);
            // XOR with the sign bit maps unsigned order onto signed order.
            let sign_bit = _mm256_set1_epi64x(i64::MIN);
            let snapshot_biased =
                _mm256_xor_si256(_mm256_set1_epi64x(snapshot_ts as i64), sign_bit);

            for (src, dst) in commit_ts[..prefix]
                .chunks_exact(4)
                .zip(out[..prefix].chunks_exact_mut(4))
            {
                let ts_vec = _mm256_loadu_si256(src.as_ptr() as *const __m256i);

                // ts != 0: compare-equal with zero, then invert.
                let not_zero = _mm256_xor_si256(_mm256_cmpeq_epi64(ts_vec, zero), all_ones);

                // ts < snapshot (unsigned)  <=>  biased(snapshot) > biased(ts) (signed)
                let before_snapshot =
                    _mm256_cmpgt_epi64(snapshot_biased, _mm256_xor_si256(ts_vec, sign_bit));

                // Each 64-bit lane is now all ones (visible) or all zeros.
                let lanes: [i64; 4] =
                    std::mem::transmute(_mm256_and_si256(not_zero, before_snapshot));
                for (slot, lane) in dst.iter_mut().zip(lanes) {
                    *slot = lane != 0;
                }
            }
        }

        prefix
    }

    /// aarch64 entry point.
    ///
    /// No NEON kernel is implemented yet; this delegates to the scalar loop.
    #[cfg(target_arch = "aarch64")]
    fn filter_batch_simd_neon(commit_ts: &[u64], snapshot_ts: u64, out: &mut [bool]) {
        Self::filter_batch_scalar(commit_ts, snapshot_ts, out);
    }

    /// Visibility including the reader's own writes.
    ///
    /// A row is visible if:
    /// - `commit_ts != 0 && commit_ts < snapshot_ts` (committed before the snapshot), or
    /// - `txn_id == current_txn_id` (written by the reading transaction)
    ///
    /// # Panics
    ///
    /// Panics unless `commit_ts`, `txn_ids` and `out` all have the same length.
    #[inline]
    pub fn filter_batch_with_txn(
        commit_ts: &[u64],
        txn_ids: &[u64],
        snapshot_ts: u64,
        current_txn_id: u64,
        out: &mut [bool],
    ) {
        assert_eq!(commit_ts.len(), txn_ids.len());
        assert_eq!(commit_ts.len(), out.len());

        // First pass: snapshot visibility.
        Self::filter_batch_into(commit_ts, snapshot_ts, out);

        // Second pass: OR in self-visibility.
        for (slot, &txn_id) in out.iter_mut().zip(txn_ids) {
            *slot |= txn_id == current_txn_id;
        }
    }

    /// Count visible timestamps without allocating a mask.
    #[inline]
    pub fn count_visible(commit_ts: &[u64], snapshot_ts: u64) -> usize {
        commit_ts
            .iter()
            .filter(|&&ts| ts != 0 && ts < snapshot_ts)
            .count()
    }
}
777
/// One version of a key/value pair, borrowed rather than copied.
///
/// Holds slices into existing storage plus the MVCC metadata needed to decide
/// visibility, so a scan can inspect versions without copying keys or values.
#[derive(Debug, Clone)]
pub struct VersionedSlice<'a> {
    /// Key bytes (borrowed)
    pub key: &'a [u8],
    /// Value bytes (borrowed); `None` marks a tombstone
    pub value: Option<&'a [u8]>,
    /// Commit timestamp; `0` means not yet committed
    pub commit_ts: u64,
    /// ID of the transaction that wrote this version
    pub txn_id: u64,
}

impl VersionedSlice<'_> {
    /// Whether this version is visible to a reader at `snapshot_ts`.
    ///
    /// A reader always sees its own writes (`current_txn_id == Some(txn_id)`).
    /// Otherwise the version must be committed (`commit_ts != 0`) strictly
    /// before the snapshot.
    #[inline]
    pub fn is_visible(&self, snapshot_ts: u64, current_txn_id: Option<u64>) -> bool {
        let written_by_reader = current_txn_id == Some(self.txn_id);
        let committed_before_snapshot = self.commit_ts != 0 && self.commit_ts < snapshot_ts;
        written_by_reader || committed_before_snapshot
    }
}
808
809/// Streaming scan iterator with SIMD visibility filtering
810///
811/// ## Design
812///
813/// Instead of materializing all results upfront, this iterator:
814/// 1. Fetches batches of entries from the underlying source
815/// 2. Applies SIMD visibility filtering to the batch
816/// 3. Yields visible entries one at a time
817///
818/// This reduces memory allocation and leverages SIMD for batch filtering.
819pub struct StreamingScanIterator<'a, I>
820where
821    I: Iterator<Item = VersionedSlice<'a>>,
822{
823    /// Underlying iterator
824    source: I,
825    /// Current batch of entries
826    batch: Vec<VersionedSlice<'a>>,
827    /// Visibility mask for current batch
828    visibility: Vec<bool>,
829    /// Current position in batch
830    pos: usize,
831    /// Snapshot timestamp for visibility
832    snapshot_ts: u64,
833    /// Current transaction ID for self-visibility
834    current_txn_id: Option<u64>,
835    /// Batch size for prefetching
836    batch_size: usize,
837}
838
839impl<'a, I> StreamingScanIterator<'a, I>
840where
841    I: Iterator<Item = VersionedSlice<'a>>,
842{
843    /// Create a new streaming scan iterator
844    pub fn new(source: I, snapshot_ts: u64, current_txn_id: Option<u64>) -> Self {
845        Self::with_batch_size(source, snapshot_ts, current_txn_id, DEFAULT_BATCH_SIZE)
846    }
847
848    /// Create with custom batch size
849    pub fn with_batch_size(
850        source: I,
851        snapshot_ts: u64,
852        current_txn_id: Option<u64>,
853        batch_size: usize,
854    ) -> Self {
855        Self {
856            source,
857            batch: Vec::with_capacity(batch_size),
858            visibility: Vec::with_capacity(batch_size),
859            pos: 0,
860            snapshot_ts,
861            current_txn_id,
862            batch_size,
863        }
864    }
865
866    /// Fetch next batch and compute visibility
867    fn fetch_batch(&mut self) -> bool {
868        self.batch.clear();
869        self.visibility.clear();
870        self.pos = 0;
871
872        // Collect batch
873        for entry in self.source.by_ref().take(self.batch_size) {
874            self.batch.push(entry);
875        }
876
877        if self.batch.is_empty() {
878            return false;
879        }
880
881        // Extract commit timestamps for SIMD filtering
882        let commit_ts: Vec<u64> = self.batch.iter().map(|e| e.commit_ts).collect();
883        self.visibility.resize(self.batch.len(), false);
884
885        if let Some(txn_id) = self.current_txn_id {
886            let txn_ids: Vec<u64> = self.batch.iter().map(|e| e.txn_id).collect();
887            SimdVisibilityFilter::filter_batch_with_txn(
888                &commit_ts,
889                &txn_ids,
890                self.snapshot_ts,
891                txn_id,
892                &mut self.visibility,
893            );
894        } else {
895            SimdVisibilityFilter::filter_batch_into(&commit_ts, self.snapshot_ts, &mut self.visibility);
896        }
897
898        true
899    }
900}
901
902impl<'a, I> Iterator for StreamingScanIterator<'a, I>
903where
904    I: Iterator<Item = VersionedSlice<'a>>,
905{
906    type Item = VersionedSlice<'a>;
907
908    fn next(&mut self) -> Option<Self::Item> {
909        loop {
910            // Exhausted current batch?
911            while self.pos >= self.batch.len() {
912                if !self.fetch_batch() {
913                    return None;
914                }
915            }
916
917            // Find next visible entry in current batch
918            while self.pos < self.batch.len() {
919                let idx = self.pos;
920                self.pos += 1;
921
922                if self.visibility[idx] {
923                    return Some(self.batch[idx].clone());
924                }
925            }
926        }
927    }
928}
929
930// =============================================================================
931// SoA Batch + Late Materialization (80/20 Optimization)
932// =============================================================================
933
934/// Structure of Arrays (SoA) batch for optimal SIMD visibility filtering
935///
936/// ## Why SoA?
937///
938/// AoS (Array of Structures) layout:
939/// ```text
940/// [key, value, commit_ts, txn_id], [key, value, commit_ts, txn_id], ...
941/// ```
942/// - Poor cache utilization for visibility checks
943/// - SIMD loads scatter data across cache lines
944///
945/// SoA (Structure of Arrays) layout:
946/// ```text
947/// commit_ts: [ts1, ts2, ts3, ts4, ...]  // Contiguous for SIMD
948/// txn_ids:   [id1, id2, id3, id4, ...]  // Contiguous for SIMD
949/// keys:      [k1, k2, k3, k4, ...]      // Only accessed for visible rows
950/// values:    [v1, v2, v3, v4, ...]      // Late materialized
951/// ```
952///
953/// SIMD can process 4-8 timestamps per cycle with perfect cache utilization.
954///
955/// ## Late Materialization
956///
957/// Values are NOT copied into the batch. Instead, we store offsets/handles
958/// and only materialize values for rows that pass visibility filtering.
959///
960/// For scans where 90% of rows are filtered out, this saves ~90% of value copies.
961#[derive(Debug)]
962pub struct SoaBatch<'a> {
963    /// Contiguous commit timestamps for SIMD visibility filtering
964    pub commit_ts: Vec<u64>,
965    /// Contiguous transaction IDs for self-visibility checking
966    pub txn_ids: Vec<u64>,
967    /// Key references (zero-copy)
968    pub keys: Vec<&'a [u8]>,
969    /// Value materializer handles (late binding)
970    /// None = tombstone, Some = handle to materialize value
971    pub value_handles: Vec<Option<ValueHandle<'a>>>,
972    /// Pre-computed visibility mask (after SIMD filtering)
973    pub visibility: Vec<bool>,
974    /// Selection vector: indices of visible rows for late materialization
975    pub selection: Vec<usize>,
976}
977
978/// Handle for late value materialization
979///
980/// Instead of copying values upfront, we store a handle that can
981/// materialize the value on-demand when needed.
982#[derive(Debug, Clone, Copy)]
983pub enum ValueHandle<'a> {
984    /// Direct reference (zero-copy for inmemory data)
985    Direct(&'a [u8]),
986    /// Offset in a data block (for disk-resident data)
987    BlockOffset { block_id: u32, offset: u32, len: u32 },
988    /// Deferred load from arena
989    ArenaSlot { arena_id: u32, slot: u32 },
990}
991
992impl<'a> ValueHandle<'a> {
993    /// Materialize the value (called only for visible rows)
994    pub fn materialize(&self) -> Option<&'a [u8]> {
995        match self {
996            ValueHandle::Direct(data) => Some(*data),
997            // For block/arena handles, would call into storage layer
998            // For now, return None (would be implemented with storage context)
999            ValueHandle::BlockOffset { .. } => None,
1000            ValueHandle::ArenaSlot { .. } => None,
1001        }
1002    }
1003}
1004
1005impl<'a> SoaBatch<'a> {
1006    /// Create a new SoA batch with capacity
1007    pub fn with_capacity(capacity: usize) -> Self {
1008        Self {
1009            commit_ts: Vec::with_capacity(capacity),
1010            txn_ids: Vec::with_capacity(capacity),
1011            keys: Vec::with_capacity(capacity),
1012            value_handles: Vec::with_capacity(capacity),
1013            visibility: Vec::with_capacity(capacity),
1014            selection: Vec::with_capacity(capacity),
1015        }
1016    }
1017
1018    /// Add an entry to the batch (SoA decomposition)
1019    #[inline]
1020    pub fn push(&mut self, key: &'a [u8], value: Option<&'a [u8]>, commit_ts: u64, txn_id: u64) {
1021        self.commit_ts.push(commit_ts);
1022        self.txn_ids.push(txn_id);
1023        self.keys.push(key);
1024        self.value_handles.push(value.map(ValueHandle::Direct));
1025    }
1026
1027    /// Add with block handle (for disk-resident values)
1028    #[inline]
1029    pub fn push_deferred(
1030        &mut self,
1031        key: &'a [u8],
1032        handle: Option<ValueHandle<'a>>,
1033        commit_ts: u64,
1034        txn_id: u64,
1035    ) {
1036        self.commit_ts.push(commit_ts);
1037        self.txn_ids.push(txn_id);
1038        self.keys.push(key);
1039        self.value_handles.push(handle);
1040    }
1041
1042    /// Get batch size
1043    pub fn len(&self) -> usize {
1044        self.commit_ts.len()
1045    }
1046
1047    /// Check if empty
1048    pub fn is_empty(&self) -> bool {
1049        self.commit_ts.is_empty()
1050    }
1051
1052    /// Clear the batch for reuse
1053    pub fn clear(&mut self) {
1054        self.commit_ts.clear();
1055        self.txn_ids.clear();
1056        self.keys.clear();
1057        self.value_handles.clear();
1058        self.visibility.clear();
1059        self.selection.clear();
1060    }
1061
1062    /// Apply SIMD visibility filtering and build selection vector
1063    ///
1064    /// This is the hot path - SIMD processes commit_ts array directly
1065    /// without touching keys/values until we know what's visible.
1066    pub fn filter_visibility(&mut self, snapshot_ts: u64, current_txn_id: Option<u64>) {
1067        let n = self.len();
1068        self.visibility.resize(n, false);
1069        self.selection.clear();
1070
1071        // SIMD filter on contiguous commit_ts array
1072        if let Some(txn_id) = current_txn_id {
1073            SimdVisibilityFilter::filter_batch_with_txn(
1074                &self.commit_ts,
1075                &self.txn_ids,
1076                snapshot_ts,
1077                txn_id,
1078                &mut self.visibility,
1079            );
1080        } else {
1081            SimdVisibilityFilter::filter_batch_into(&self.commit_ts, snapshot_ts, &mut self.visibility);
1082        }
1083
1084        // Build selection vector (indices of visible rows)
1085        for (i, &visible) in self.visibility.iter().enumerate() {
1086            if visible {
1087                self.selection.push(i);
1088            }
1089        }
1090    }
1091
1092    /// Get visible row count (after filtering)
1093    pub fn visible_count(&self) -> usize {
1094        self.selection.len()
1095    }
1096
1097    /// Iterate over visible rows with late materialization
1098    ///
1099    /// Values are only materialized for rows in the selection vector.
1100    pub fn iter_visible(&self) -> impl Iterator<Item = (&'a [u8], Option<&'a [u8]>)> + '_ {
1101        self.selection.iter().map(move |&idx| {
1102            let key = self.keys[idx];
1103            let value = self.value_handles[idx].and_then(|h| h.materialize());
1104            (key, value)
1105        })
1106    }
1107
1108    /// Iterate visible rows with full metadata
1109    pub fn iter_visible_full(
1110        &self,
1111    ) -> impl Iterator<Item = (&'a [u8], Option<&'a [u8]>, u64, u64)> + '_ {
1112        self.selection.iter().map(move |&idx| {
1113            let key = self.keys[idx];
1114            let value = self.value_handles[idx].and_then(|h| h.materialize());
1115            let ts = self.commit_ts[idx];
1116            let txn = self.txn_ids[idx];
1117            (key, value, ts, txn)
1118        })
1119    }
1120}
1121
1122/// High-performance SoA scan iterator with SIMD + late materialization
1123///
1124/// ## Performance Characteristics
1125///
1126/// | Phase              | Cache Behavior           | SIMD Usage    |
1127/// |--------------------|--------------------------|---------------|
1128/// | Load batch         | Sequential (SoA arrays)  | N/A           |
1129/// | Visibility filter  | L1-hot (commit_ts only)  | 4-8× speedup  |
1130/// | Build selection    | Sequential (visibility)  | Auto-vectorized |
1131/// | Materialize values | Random (visible only)    | N/A           |
1132///
1133/// For 1M rows with 10% selectivity:
1134/// - Old: Process 1M rows with random access = ~50ms
1135/// - New: SIMD filter 1M × 8 bytes = ~1ms, materialize 100K values = ~5ms
1136/// - **~8× speedup** from SoA + SIMD + late materialization
1137pub struct SoaScanIterator<'a, S>
1138where
1139    S: SoaSource<'a>,
1140{
1141    /// Source that provides SoA batches
1142    source: S,
1143    /// Current batch
1144    batch: SoaBatch<'a>,
1145    /// Position in selection vector
1146    pos: usize,
1147    /// Snapshot timestamp
1148    snapshot_ts: u64,
1149    /// Current transaction ID
1150    current_txn_id: Option<u64>,
1151    /// Batch size
1152    #[allow(dead_code)]
1153    batch_size: usize,
1154    /// Statistics
1155    stats: SoaScanStats,
1156}
1157
1158/// Statistics for SoA scan performance monitoring
1159#[derive(Debug, Default, Clone)]
1160pub struct SoaScanStats {
1161    /// Total rows scanned
1162    pub rows_scanned: usize,
1163    /// Rows that passed visibility filter
1164    pub rows_visible: usize,
1165    /// Rows where values were materialized
1166    pub values_materialized: usize,
1167    /// Number of batches processed
1168    pub batches_processed: usize,
1169}
1170
1171impl SoaScanStats {
1172    /// Get selectivity ratio
1173    pub fn selectivity(&self) -> f64 {
1174        if self.rows_scanned == 0 {
1175            0.0
1176        } else {
1177            self.rows_visible as f64 / self.rows_scanned as f64
1178        }
1179    }
1180
1181    /// Get value materialization efficiency
1182    /// (1.0 = all visible rows materialized, lower = some skipped)
1183    pub fn materialization_efficiency(&self) -> f64 {
1184        if self.rows_visible == 0 {
1185            1.0
1186        } else {
1187            self.values_materialized as f64 / self.rows_visible as f64
1188        }
1189    }
1190}
1191
1192/// Trait for sources that provide SoA batches
1193pub trait SoaSource<'a> {
1194    /// Fill a batch with entries from the source
1195    /// Returns false if source is exhausted
1196    fn fill_batch(&mut self, batch: &mut SoaBatch<'a>) -> bool;
1197}
1198
1199impl<'a, S> SoaScanIterator<'a, S>
1200where
1201    S: SoaSource<'a>,
1202{
1203    /// Create new SoA scan iterator
1204    pub fn new(source: S, snapshot_ts: u64, current_txn_id: Option<u64>) -> Self {
1205        Self::with_batch_size(source, snapshot_ts, current_txn_id, DEFAULT_BATCH_SIZE)
1206    }
1207
1208    /// Create with custom batch size
1209    pub fn with_batch_size(
1210        source: S,
1211        snapshot_ts: u64,
1212        current_txn_id: Option<u64>,
1213        batch_size: usize,
1214    ) -> Self {
1215        Self {
1216            source,
1217            batch: SoaBatch::with_capacity(batch_size),
1218            pos: 0,
1219            snapshot_ts,
1220            current_txn_id,
1221            batch_size,
1222            stats: SoaScanStats::default(),
1223        }
1224    }
1225
1226    /// Fetch next batch with visibility filtering
1227    fn fetch_batch(&mut self) -> bool {
1228        self.batch.clear();
1229        self.pos = 0;
1230
1231        // Fill batch from source (SoA format)
1232        if !self.source.fill_batch(&mut self.batch) {
1233            return false;
1234        }
1235
1236        self.stats.rows_scanned += self.batch.len();
1237        self.stats.batches_processed += 1;
1238
1239        // SIMD visibility filtering on contiguous arrays
1240        self.batch.filter_visibility(self.snapshot_ts, self.current_txn_id);
1241        self.stats.rows_visible += self.batch.visible_count();
1242
1243        true
1244    }
1245
1246    /// Get scan statistics
1247    pub fn stats(&self) -> &SoaScanStats {
1248        &self.stats
1249    }
1250}
1251
1252impl<'a, S> Iterator for SoaScanIterator<'a, S>
1253where
1254    S: SoaSource<'a>,
1255{
1256    type Item = (&'a [u8], Option<&'a [u8]>);
1257
1258    fn next(&mut self) -> Option<Self::Item> {
1259        loop {
1260            // Need new batch?
1261            while self.pos >= self.batch.selection.len() {
1262                if !self.fetch_batch() {
1263                    return None;
1264                }
1265            }
1266
1267            // Get next visible row (late materialization)
1268            let sel_idx = self.pos;
1269            self.pos += 1;
1270            let row_idx = self.batch.selection[sel_idx];
1271
1272            let key = self.batch.keys[row_idx];
1273            let value = self.batch.value_handles[row_idx].and_then(|h| h.materialize());
1274            self.stats.values_materialized += 1;
1275
1276            return Some((key, value));
1277        }
1278    }
1279}
1280
1281#[cfg(test)]
1282mod tests {
1283    use super::*;
1284
1285    #[test]
1286    fn test_column_vector_int64() {
1287        let mut v = ColumnVector::Int64(vec![1, 2, 3, 4, 5]);
1288        assert_eq!(v.len(), 5);
1289        assert_eq!(v.sum_i64(), Some(15));
1290    }
1291
1292    #[test]
1293    fn test_column_vector_float64() {
1294        let v = ColumnVector::Float64(vec![1.0, 2.0, 3.0, 4.0]);
1295        assert_eq!(v.len(), 4);
1296        assert_eq!(v.sum_f64(), Some(10.0));
1297    }
1298
1299    #[test]
1300    fn test_vector_batch() {
1301        let mut batch = VectorBatch::with_capacity(1024);
1302        batch.add_column("id", ColumnVector::Int64(vec![1, 2, 3]));
1303        batch.add_column("value", ColumnVector::Float64(vec![1.5, 2.5, 3.5]));
1304        
1305        assert_eq!(batch.row_count(), 3);
1306        assert_eq!(batch.column_count(), 2);
1307        assert!(batch.column("id").is_some());
1308    }
1309
1310    #[test]
1311    fn test_int64_comparison() {
1312        let col = ColumnVector::Int64(vec![1, 5, 10, 15, 20]);
1313        let pred = Int64Comparison::gt("test", 10);
1314        let result = pred.evaluate(&col);
1315        
1316        assert_eq!(result, vec![false, false, false, true, true]);
1317    }
1318
1319    #[test]
1320    fn test_simd_sum_i64_large() {
1321        // Test with enough elements to trigger SIMD path
1322        let values: Vec<i64> = (0..1000).collect();
1323        let expected: i64 = (0..1000).sum();
1324        
1325        let col = ColumnVector::Int64(values);
1326        assert_eq!(col.sum_i64(), Some(expected));
1327    }
1328
1329    #[test]
1330    fn test_simd_compare_gt() {
1331        let values: Vec<i64> = vec![1, 5, 10, 15, 20, 25, 30, 35];
1332        let result = simd_compare_i64_gt(&values, 12);
1333        assert_eq!(result, vec![false, false, false, true, true, true, true, true]);
1334    }
1335
1336    #[test]
1337    fn test_vectorized_scan_config() {
1338        let config = VectorizedScanConfig::new()
1339            .with_batch_size(2048)
1340            .with_prefetch(true);
1341        
1342        assert_eq!(config.batch_size, 2048);
1343        assert!(config.prefetch_enabled);
1344    }
1345
1346    #[test]
1347    fn test_simd_visibility_filter_basic() {
1348        // commit_ts: 0 = uncommitted, others = commit time
1349        let commit_ts = vec![0, 10, 20, 30, 40];
1350        let snapshot_ts = 25;
1351        
1352        let result = SimdVisibilityFilter::filter_batch(&commit_ts, snapshot_ts);
1353        
1354        // Visible: ts != 0 AND ts < 25
1355        // ts=0: not visible (uncommitted)
1356        // ts=10: visible (10 < 25)
1357        // ts=20: visible (20 < 25)
1358        // ts=30: not visible (30 >= 25)
1359        // ts=40: not visible (40 >= 25)
1360        assert_eq!(result, vec![false, true, true, false, false]);
1361    }
1362
1363    #[test]
1364    fn test_simd_visibility_filter_with_txn() {
1365        let commit_ts = vec![0, 10, 0, 30, 40];
1366        let txn_ids = vec![1, 2, 1, 4, 5];  // txn_id 1 appears twice
1367        let snapshot_ts = 25;
1368        let current_txn_id = 1;
1369        
1370        let mut result = vec![false; 5];
1371        SimdVisibilityFilter::filter_batch_with_txn(
1372            &commit_ts,
1373            &txn_ids,
1374            snapshot_ts,
1375            current_txn_id,
1376            &mut result,
1377        );
1378        
1379        // Visible:
1380        // [0]: uncommitted but own txn -> visible
1381        // [1]: committed at 10 < 25 -> visible
1382        // [2]: uncommitted but own txn -> visible
1383        // [3]: committed at 30 >= 25 -> not visible
1384        // [4]: committed at 40 >= 25 -> not visible
1385        assert_eq!(result, vec![true, true, true, false, false]);
1386    }
1387
1388    #[test]
1389    fn test_simd_visibility_filter_large() {
1390        // Test with enough elements to trigger SIMD path
1391        let n = 1000;
1392        let commit_ts: Vec<u64> = (1..=n as u64).collect();
1393        let snapshot_ts = 500;
1394        
1395        let result = SimdVisibilityFilter::filter_batch(&commit_ts, snapshot_ts);
1396        
1397        // First 499 should be visible (1..500 < 500)
1398        let visible_count = result.iter().filter(|&&v| v).count();
1399        assert_eq!(visible_count, 499);
1400    }
1401
1402    #[test]
1403    fn test_versioned_slice_visibility() {
1404        let slice = VersionedSlice {
1405            key: b"test",
1406            value: Some(b"value"),
1407            commit_ts: 100,
1408            txn_id: 1,
1409        };
1410        
1411        assert!(slice.is_visible(200, None));
1412        assert!(!slice.is_visible(50, None));
1413        assert!(slice.is_visible(50, Some(1))); // Self-visibility
1414    }
1415
1416    #[test]
1417    fn test_streaming_scan_iterator() {
1418        let entries: Vec<VersionedSlice<'static>> = vec![
1419            VersionedSlice { key: b"a", value: Some(b"1"), commit_ts: 10, txn_id: 1 },
1420            VersionedSlice { key: b"b", value: Some(b"2"), commit_ts: 0, txn_id: 2 },  // Uncommitted
1421            VersionedSlice { key: b"c", value: Some(b"3"), commit_ts: 30, txn_id: 3 },  // After snapshot
1422            VersionedSlice { key: b"d", value: Some(b"4"), commit_ts: 15, txn_id: 4 },
1423        ];
1424        
1425        let iter = StreamingScanIterator::new(entries.into_iter(), 25, None);
1426        let visible: Vec<_> = iter.collect();
1427        
1428        // Only entries with commit_ts 10 and 15 should be visible
1429        assert_eq!(visible.len(), 2);
1430        assert_eq!(visible[0].key, b"a");
1431        assert_eq!(visible[1].key, b"d");
1432    }
1433
1434    #[test]
1435    fn test_soa_batch_basic() {
1436        let mut batch = SoaBatch::with_capacity(100);
1437        
1438        batch.push(b"key1", Some(b"value1"), 10, 1);
1439        batch.push(b"key2", Some(b"value2"), 20, 2);
1440        batch.push(b"key3", None, 30, 3);  // Tombstone
1441        batch.push(b"key4", Some(b"value4"), 0, 4);  // Uncommitted
1442        
1443        assert_eq!(batch.len(), 4);
1444        assert_eq!(batch.commit_ts, vec![10, 20, 30, 0]);
1445        assert_eq!(batch.txn_ids, vec![1, 2, 3, 4]);
1446    }
1447
1448    #[test]
1449    fn test_soa_batch_visibility_filter() {
1450        let mut batch = SoaBatch::with_capacity(100);
1451        
1452        batch.push(b"k1", Some(b"v1"), 10, 1);  // Visible (10 < 25)
1453        batch.push(b"k2", Some(b"v2"), 0, 2);   // Not visible (uncommitted)
1454        batch.push(b"k3", Some(b"v3"), 20, 3);  // Visible (20 < 25)
1455        batch.push(b"k4", Some(b"v4"), 30, 4);  // Not visible (30 >= 25)
1456        batch.push(b"k5", Some(b"v5"), 0, 5);   // Not visible (uncommitted)
1457        
1458        batch.filter_visibility(25, None);
1459        
1460        assert_eq!(batch.visibility, vec![true, false, true, false, false]);
1461        assert_eq!(batch.selection, vec![0, 2]);  // Indices of visible rows
1462        assert_eq!(batch.visible_count(), 2);
1463    }
1464
1465    #[test]
1466    fn test_soa_batch_self_visibility() {
1467        let mut batch = SoaBatch::with_capacity(100);
1468        
1469        batch.push(b"k1", Some(b"v1"), 0, 42);  // Own uncommitted -> visible
1470        batch.push(b"k2", Some(b"v2"), 10, 1);  // Committed -> visible
1471        batch.push(b"k3", Some(b"v3"), 0, 99);  // Other's uncommitted -> not visible
1472        
1473        batch.filter_visibility(25, Some(42));
1474        
1475        assert_eq!(batch.visibility, vec![true, true, false]);
1476        assert_eq!(batch.selection, vec![0, 1]);
1477    }
1478
1479    #[test]
1480    fn test_soa_batch_late_materialization() {
1481        let mut batch = SoaBatch::with_capacity(100);
1482        
1483        batch.push(b"key1", Some(b"val1"), 10, 1);
1484        batch.push(b"key2", Some(b"val2"), 0, 2);   // Filtered out
1485        batch.push(b"key3", Some(b"val3"), 15, 3);
1486        
1487        batch.filter_visibility(25, None);
1488        
1489        // Iterate visible - values materialized only now
1490        let visible: Vec<_> = batch.iter_visible().collect();
1491        
1492        assert_eq!(visible.len(), 2);
1493        assert_eq!(visible[0], (b"key1".as_slice(), Some(b"val1".as_slice())));
1494        assert_eq!(visible[1], (b"key3".as_slice(), Some(b"val3".as_slice())));
1495    }
1496
1497    #[test]
1498    fn test_soa_scan_stats() {
1499        let mut batch = SoaBatch::with_capacity(100);
1500        
1501        // 10 rows, 3 visible
1502        for i in 0..10u64 {
1503            let ts = if i < 3 { 10 } else { 0 };  // First 3 committed, rest uncommitted
1504            batch.push(b"key", Some(b"val"), ts, i);
1505        }
1506        
1507        batch.filter_visibility(25, None);
1508        
1509        let selectivity = batch.visible_count() as f64 / batch.len() as f64;
1510        assert!((selectivity - 0.3).abs() < 0.01);  // 30% selectivity
1511    }
1512
1513    #[test]
1514    fn test_soa_batch_simd_large() {
1515        // Test with enough entries to trigger SIMD paths
1516        let mut batch = SoaBatch::with_capacity(2000);
1517        
1518        for i in 0..1000u64 {
1519            // Alternating visible/not visible
1520            let ts = if i % 2 == 0 { 10 } else { 50 };
1521            batch.push(b"k", Some(b"v"), ts, i);
1522        }
1523        
1524        batch.filter_visibility(25, None);
1525        
1526        // Should have 500 visible (even indices with ts=10)
1527        assert_eq!(batch.visible_count(), 500);
1528        
1529        // Verify selection indices are correct
1530        for (i, &idx) in batch.selection.iter().enumerate() {
1531            assert_eq!(idx, i * 2);  // 0, 2, 4, 6, ...
1532        }
1533    }
1534}