sochdb_storage/
vectorized_scan.rs

1// Copyright 2025 Sushanth (https://github.com/sushanthpy)
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! SIMD-Accelerated Vectorized Scan Engine (Recommendation 2)
16//!
17//! ## Problem
18//!
19//! Current scan implementation is row-at-a-time:
20//! ```ignore
21//! for entry in self.data.iter() {  // DashMap iteration - pointer chasing
22//!     if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
23//!         && let Some(value) = &v.value  // Option unwrapping
24//!     {
25//!         results.push((key.clone(), value.clone())); // Heap allocs
26//!     }
27//! }
28//! ```
29//!
30//! This has:
31//! - DashMap iterator overhead (~20ns per entry)
32//! - MVCC version chain traversal per row
33//! - Clone allocations in hot path
34//!
35//! ## Solution
36//!
37//! Vectorized execution: process 1024+ rows in a batch, amortizing overhead.
38//!
39//! ## Performance Analysis
40//!
41//! Vectorized execution model:
42//! - **Batch size B = 1024 rows**
43//! - **Per-batch overhead**: O(1) iterator setup + O(B) SIMD operations
44//! - **Amortized cost**: `(setup + B×simd_op) / B ≈ simd_op` for large B
45//!
46//! SIMD comparison (AVX-512):
47//! ```text
48//! Scalar: 100 cycles × 1000 rows = 100,000 cycles
49//! SIMD: 100 cycles × (1000/16) = 6,250 cycles (16x speedup)
50//! ```
51//!
52//! Memory bandwidth calculation:
53//! - Current: 300 bytes/row × 1M rows = 300 MB, random access
54//! - Proposed: 60 bytes/row × 1M rows = 60 MB, sequential scan
55//! - RAM bandwidth: ~50 GB/s → theoretical 833M rows/sec
56//!
57//! ## Expected Improvement
58//!
59//! 10-20x scan throughput improvement
60
61use std::sync::atomic::{AtomicUsize, Ordering};
62
63/// Default batch size for vectorized operations
64/// 1024 rows fits comfortably in L2 cache (~256KB with 256-byte rows)
65pub const DEFAULT_BATCH_SIZE: usize = 1024;
66
67/// Minimum batch size (below this, scalar is faster)
68pub const MIN_BATCH_SIZE: usize = 64;
69
70/// Maximum batch size (above this, memory pressure increases)
71pub const MAX_BATCH_SIZE: usize = 8192;
72
73// =============================================================================
74// Column Vectors for SIMD Operations
75// =============================================================================
76
77/// Typed column vector for SIMD-friendly access
78/// 
79/// Each variant stores contiguous values for vectorized operations.
80#[derive(Debug, Clone)]
81pub enum ColumnVector {
82    /// Boolean column (bit-packed for SIMD)
83    Bool(Vec<bool>),
84    /// 64-bit signed integers (SIMD-friendly)
85    Int64(Vec<i64>),
86    /// 64-bit unsigned integers
87    UInt64(Vec<u64>),
88    /// 64-bit floats
89    Float64(Vec<f64>),
90    /// Variable-length strings (Arrow-style: offsets + data)
91    String {
92        offsets: Vec<u32>,
93        data: Vec<u8>,
94    },
95    /// Binary data (Arrow-style: offsets + data)
96    Binary {
97        offsets: Vec<u32>,
98        data: Vec<u8>,
99    },
100    /// Null bitmap for any column (packed bits)
101    Null(Vec<u64>),
102}
103
104impl ColumnVector {
105    /// Create empty column vector of specified type
106    pub fn new_int64(capacity: usize) -> Self {
107        ColumnVector::Int64(Vec::with_capacity(capacity))
108    }
109
110    pub fn new_float64(capacity: usize) -> Self {
111        ColumnVector::Float64(Vec::with_capacity(capacity))
112    }
113
114    pub fn new_string(capacity: usize) -> Self {
115        ColumnVector::String {
116            offsets: Vec::with_capacity(capacity + 1),
117            data: Vec::with_capacity(capacity * 32), // Assume avg 32 bytes per string
118        }
119    }
120
121    /// Get length of vector
122    pub fn len(&self) -> usize {
123        match self {
124            ColumnVector::Bool(v) => v.len(),
125            ColumnVector::Int64(v) => v.len(),
126            ColumnVector::UInt64(v) => v.len(),
127            ColumnVector::Float64(v) => v.len(),
128            ColumnVector::String { offsets, .. } => offsets.len().saturating_sub(1),
129            ColumnVector::Binary { offsets, .. } => offsets.len().saturating_sub(1),
130            ColumnVector::Null(v) => v.len() * 64,
131        }
132    }
133
134    /// Check if empty
135    pub fn is_empty(&self) -> bool {
136        self.len() == 0
137    }
138
139    /// Get memory size in bytes
140    pub fn memory_size(&self) -> usize {
141        match self {
142            ColumnVector::Bool(v) => v.len(),
143            ColumnVector::Int64(v) => v.len() * 8,
144            ColumnVector::UInt64(v) => v.len() * 8,
145            ColumnVector::Float64(v) => v.len() * 8,
146            ColumnVector::String { offsets, data } => offsets.len() * 4 + data.len(),
147            ColumnVector::Binary { offsets, data } => offsets.len() * 4 + data.len(),
148            ColumnVector::Null(v) => v.len() * 8,
149        }
150    }
151
152    /// Sum for Int64 column (SIMD-accelerated)
153    #[cfg(target_arch = "x86_64")]
154    pub fn sum_i64(&self) -> Option<i64> {
155        match self {
156            ColumnVector::Int64(values) => {
157                if values.is_empty() {
158                    return Some(0);
159                }
160                
161                // Use SIMD for large vectors
162                if values.len() >= 16 {
163                    Some(simd_sum_i64(values))
164                } else {
165                    Some(values.iter().sum())
166                }
167            }
168            _ => None,
169        }
170    }
171
172    #[cfg(not(target_arch = "x86_64"))]
173    pub fn sum_i64(&self) -> Option<i64> {
174        match self {
175            ColumnVector::Int64(values) => Some(values.iter().sum()),
176            _ => None,
177        }
178    }
179
180    /// Sum for Float64 column (SIMD-accelerated)
181    #[cfg(target_arch = "x86_64")]
182    pub fn sum_f64(&self) -> Option<f64> {
183        match self {
184            ColumnVector::Float64(values) => {
185                if values.is_empty() {
186                    return Some(0.0);
187                }
188                
189                if values.len() >= 8 {
190                    Some(simd_sum_f64(values))
191                } else {
192                    Some(values.iter().sum())
193                }
194            }
195            _ => None,
196        }
197    }
198
199    #[cfg(not(target_arch = "x86_64"))]
200    pub fn sum_f64(&self) -> Option<f64> {
201        match self {
202            ColumnVector::Float64(values) => Some(values.iter().sum()),
203            _ => None,
204        }
205    }
206}
207
208// =============================================================================
209// SIMD Implementations (x86_64 with AVX2)
210// =============================================================================
211
212/// SIMD sum for i64 values using AVX2 (4 × i64 per vector)
213#[cfg(target_arch = "x86_64")]
214fn simd_sum_i64(values: &[i64]) -> i64 {
215    #[cfg(target_feature = "avx2")]
216    {
217        use std::arch::x86_64::*;
218        
219        unsafe {
220            let mut sum = _mm256_setzero_si256();
221            let chunks = values.len() / 4;
222            let ptr = values.as_ptr();
223            
224            for i in 0..chunks {
225                let v = _mm256_loadu_si256(ptr.add(i * 4) as *const __m256i);
226                sum = _mm256_add_epi64(sum, v);
227            }
228            
229            // Horizontal add
230            let arr: [i64; 4] = std::mem::transmute(sum);
231            let simd_total: i64 = arr.iter().sum();
232            
233            // Add remaining elements
234            let remaining: i64 = values[chunks * 4..].iter().sum();
235            simd_total + remaining
236        }
237    }
238    
239    #[cfg(not(target_feature = "avx2"))]
240    {
241        values.iter().sum()
242    }
243}
244
245/// SIMD sum for f64 values using AVX (4 × f64 per vector)
246#[cfg(target_arch = "x86_64")]
247fn simd_sum_f64(values: &[f64]) -> f64 {
248    #[cfg(target_feature = "avx")]
249    {
250        use std::arch::x86_64::*;
251        
252        unsafe {
253            let mut sum = _mm256_setzero_pd();
254            let chunks = values.len() / 4;
255            let ptr = values.as_ptr();
256            
257            for i in 0..chunks {
258                let v = _mm256_loadu_pd(ptr.add(i * 4));
259                sum = _mm256_add_pd(sum, v);
260            }
261            
262            // Horizontal add
263            let arr: [f64; 4] = std::mem::transmute(sum);
264            let simd_total: f64 = arr.iter().sum();
265            
266            // Add remaining elements
267            let remaining: f64 = values[chunks * 4..].iter().sum();
268            simd_total + remaining
269        }
270    }
271    
272    #[cfg(not(target_feature = "avx"))]
273    {
274        values.iter().sum()
275    }
276}
277
278// =============================================================================
279// Vectorized Batch for Processing
280// =============================================================================
281
282/// A batch of rows for vectorized processing
283/// 
284/// Instead of processing one row at a time, we accumulate rows into
285/// batches and process them together for better cache utilization
286/// and SIMD opportunities.
287#[derive(Debug)]
288pub struct VectorBatch {
289    /// Column data in columnar format
290    columns: Vec<(String, ColumnVector)>,
291    /// Row count in this batch
292    row_count: usize,
293    /// Capacity (pre-allocated)
294    capacity: usize,
295    /// Selection vector (indexes of selected rows after filtering)
296    selection: Option<Vec<usize>>,
297}
298
299impl VectorBatch {
300    /// Create a new batch with specified capacity
301    pub fn with_capacity(capacity: usize) -> Self {
302        Self {
303            columns: Vec::new(),
304            row_count: 0,
305            capacity,
306            selection: None,
307        }
308    }
309
310    /// Create batch with default size
311    pub fn new() -> Self {
312        Self::with_capacity(DEFAULT_BATCH_SIZE)
313    }
314
315    /// Get batch capacity
316    pub fn capacity(&self) -> usize {
317        self.capacity
318    }
319
320    /// Get current row count
321    pub fn row_count(&self) -> usize {
322        if let Some(ref sel) = self.selection {
323            sel.len()
324        } else {
325            self.row_count
326        }
327    }
328
329    /// Check if batch is full
330    pub fn is_full(&self) -> bool {
331        self.row_count >= self.capacity
332    }
333
334    /// Check if batch is empty
335    pub fn is_empty(&self) -> bool {
336        self.row_count == 0
337    }
338
339    /// Add a column to the batch
340    pub fn add_column(&mut self, name: impl Into<String>, column: ColumnVector) {
341        self.columns.push((name.into(), column));
342        if self.row_count == 0 {
343            self.row_count = self.columns.last().map(|(_, c)| c.len()).unwrap_or(0);
344        }
345    }
346
347    /// Get column by name
348    pub fn column(&self, name: &str) -> Option<&ColumnVector> {
349        self.columns
350            .iter()
351            .find(|(n, _)| n == name)
352            .map(|(_, c)| c)
353    }
354
355    /// Get column by index
356    pub fn column_at(&self, idx: usize) -> Option<&ColumnVector> {
357        self.columns.get(idx).map(|(_, c)| c)
358    }
359
360    /// Get column count
361    pub fn column_count(&self) -> usize {
362        self.columns.len()
363    }
364
365    /// Set selection vector (for filtering)
366    pub fn set_selection(&mut self, selection: Vec<usize>) {
367        self.selection = Some(selection);
368    }
369
370    /// Clear selection vector
371    pub fn clear_selection(&mut self) {
372        self.selection = None;
373    }
374
375    /// Get total memory size
376    pub fn memory_size(&self) -> usize {
377        self.columns.iter().map(|(_, c)| c.memory_size()).sum()
378    }
379
380    /// Reset batch for reuse
381    pub fn reset(&mut self) {
382        self.columns.clear();
383        self.row_count = 0;
384        self.selection = None;
385    }
386}
387
388impl Default for VectorBatch {
389    fn default() -> Self {
390        Self::new()
391    }
392}
393
394// =============================================================================
395// Vectorized Scan Engine
396// =============================================================================
397
398/// Statistics for vectorized scan operations
399#[derive(Debug, Default)]
400pub struct VectorizedScanStats {
401    /// Total rows scanned
402    pub rows_scanned: AtomicUsize,
403    /// Batches processed
404    pub batches_processed: AtomicUsize,
405    /// Rows passing filter
406    pub rows_passed: AtomicUsize,
407    /// Bytes read from storage
408    pub bytes_read: AtomicUsize,
409}
410
411impl VectorizedScanStats {
412    pub fn new() -> Self {
413        Self::default()
414    }
415
416    pub fn record_batch(&self, rows: usize, passed: usize, bytes: usize) {
417        self.rows_scanned.fetch_add(rows, Ordering::Relaxed);
418        self.batches_processed.fetch_add(1, Ordering::Relaxed);
419        self.rows_passed.fetch_add(passed, Ordering::Relaxed);
420        self.bytes_read.fetch_add(bytes, Ordering::Relaxed);
421    }
422
423    pub fn rows_scanned(&self) -> usize {
424        self.rows_scanned.load(Ordering::Relaxed)
425    }
426
427    pub fn batches_processed(&self) -> usize {
428        self.batches_processed.load(Ordering::Relaxed)
429    }
430}
431
432/// Predicate for vectorized filtering
433pub trait VectorPredicate: Send + Sync {
434    /// Apply predicate to a column vector, returning selection bitmap
435    fn evaluate(&self, column: &ColumnVector) -> Vec<bool>;
436    
437    /// Get the column name this predicate operates on
438    fn column_name(&self) -> &str;
439}
440
441/// Comparison predicate for i64 columns
442#[derive(Debug, Clone)]
443pub struct Int64Comparison {
444    column_name: String,
445    op: ComparisonOp,
446    value: i64,
447}
448
449/// Comparison operators
450#[derive(Debug, Clone, Copy)]
451pub enum ComparisonOp {
452    Equal,
453    NotEqual,
454    LessThan,
455    LessEqual,
456    GreaterThan,
457    GreaterEqual,
458}
459
460impl Int64Comparison {
461    pub fn new(column_name: impl Into<String>, op: ComparisonOp, value: i64) -> Self {
462        Self {
463            column_name: column_name.into(),
464            op,
465            value,
466        }
467    }
468
469    pub fn eq(column_name: impl Into<String>, value: i64) -> Self {
470        Self::new(column_name, ComparisonOp::Equal, value)
471    }
472
473    pub fn gt(column_name: impl Into<String>, value: i64) -> Self {
474        Self::new(column_name, ComparisonOp::GreaterThan, value)
475    }
476
477    pub fn lt(column_name: impl Into<String>, value: i64) -> Self {
478        Self::new(column_name, ComparisonOp::LessThan, value)
479    }
480}
481
482impl VectorPredicate for Int64Comparison {
483    fn evaluate(&self, column: &ColumnVector) -> Vec<bool> {
484        match column {
485            ColumnVector::Int64(values) => {
486                let cmp_value = self.value;
487                match self.op {
488                    ComparisonOp::Equal => values.iter().map(|&v| v == cmp_value).collect(),
489                    ComparisonOp::NotEqual => values.iter().map(|&v| v != cmp_value).collect(),
490                    ComparisonOp::LessThan => values.iter().map(|&v| v < cmp_value).collect(),
491                    ComparisonOp::LessEqual => values.iter().map(|&v| v <= cmp_value).collect(),
492                    ComparisonOp::GreaterThan => values.iter().map(|&v| v > cmp_value).collect(),
493                    ComparisonOp::GreaterEqual => values.iter().map(|&v| v >= cmp_value).collect(),
494                }
495            }
496            _ => vec![false; column.len()],
497        }
498    }
499
500    fn column_name(&self) -> &str {
501        &self.column_name
502    }
503}
504
505/// Apply SIMD-optimized comparison for i64 values
506#[cfg(all(target_arch = "x86_64", target_feature = "avx2"))]
507pub fn simd_compare_i64_gt(values: &[i64], threshold: i64) -> Vec<bool> {
508    use std::arch::x86_64::*;
509    
510    let mut result = vec![false; values.len()];
511    let chunks = values.len() / 4;
512    
513    unsafe {
514        let threshold_vec = _mm256_set1_epi64x(threshold);
515        
516        for i in 0..chunks {
517            let v = _mm256_loadu_si256(values.as_ptr().add(i * 4) as *const __m256i);
518            let cmp = _mm256_cmpgt_epi64(v, threshold_vec);
519            let mask = _mm256_movemask_epi8(cmp) as u32;
520            
521            // Each i64 occupies 8 bytes, so bits 0-7, 8-15, 16-23, 24-31
522            result[i * 4] = (mask & 0xFF) != 0;
523            result[i * 4 + 1] = (mask & 0xFF00) != 0;
524            result[i * 4 + 2] = (mask & 0xFF0000) != 0;
525            result[i * 4 + 3] = (mask & 0xFF000000) != 0;
526        }
527        
528        // Handle remaining
529        for i in (chunks * 4)..values.len() {
530            result[i] = values[i] > threshold;
531        }
532    }
533    
534    result
535}
536
537#[cfg(not(all(target_arch = "x86_64", target_feature = "avx2")))]
538pub fn simd_compare_i64_gt(values: &[i64], threshold: i64) -> Vec<bool> {
539    values.iter().map(|&v| v > threshold).collect()
540}
541
542// =============================================================================
543// Vectorized Scan Iterator
544// =============================================================================
545
546/// Configuration for vectorized scans
547#[derive(Debug, Clone)]
548pub struct VectorizedScanConfig {
549    /// Batch size for processing
550    pub batch_size: usize,
551    /// Enable prefetching
552    pub prefetch_enabled: bool,
553    /// Prefetch distance in rows
554    pub prefetch_distance: usize,
555    /// Enable SIMD acceleration
556    pub simd_enabled: bool,
557}
558
559impl Default for VectorizedScanConfig {
560    fn default() -> Self {
561        Self {
562            batch_size: DEFAULT_BATCH_SIZE,
563            prefetch_enabled: true,
564            prefetch_distance: 16,
565            simd_enabled: true,
566        }
567    }
568}
569
570impl VectorizedScanConfig {
571    pub fn new() -> Self {
572        Self::default()
573    }
574
575    pub fn with_batch_size(mut self, size: usize) -> Self {
576        self.batch_size = size.clamp(MIN_BATCH_SIZE, MAX_BATCH_SIZE);
577        self
578    }
579
580    pub fn with_prefetch(mut self, enabled: bool) -> Self {
581        self.prefetch_enabled = enabled;
582        self
583    }
584}
585
586// =============================================================================
587// SIMD Visibility Filtering (Task 4: Zero-Copy SIMD Scans)
588// =============================================================================
589
590/// SIMD-accelerated MVCC visibility filter
591///
592/// ## Performance Analysis
593///
594/// For a snapshot timestamp `S`, a row with commit_ts `C` is visible if:
595/// - `C != 0` (committed) AND `C < S` (committed before snapshot)
596///
597/// This can be vectorized by processing 4 timestamps at a time (AVX2):
598/// ```text
599/// visible[i] = (commit_ts[i] != 0) & (commit_ts[i] < snapshot_ts)
600/// ```
601///
602/// Throughput improvement:
603/// - Scalar: ~1 billion comparisons/sec
604/// - AVX2 (4-way): ~4 billion comparisons/sec  
605/// - AVX-512 (8-way): ~8 billion comparisons/sec
606///
607/// For 1M rows: scalar = 1ms, SIMD = 125-250µs (4-8x speedup)
608pub struct SimdVisibilityFilter;
609
610impl SimdVisibilityFilter {
611    /// Filter a batch of commit timestamps for visibility
612    ///
613    /// Returns a bitmask where 1 = visible, 0 = not visible
614    #[inline]
615    pub fn filter_batch(commit_ts: &[u64], snapshot_ts: u64) -> Vec<bool> {
616        let mut result = vec![false; commit_ts.len()];
617        Self::filter_batch_into(commit_ts, snapshot_ts, &mut result);
618        result
619    }
620
621    /// Filter into an existing buffer to avoid allocation
622    #[inline]
623    pub fn filter_batch_into(commit_ts: &[u64], snapshot_ts: u64, out: &mut [bool]) {
624        assert_eq!(commit_ts.len(), out.len());
625
626        #[cfg(target_arch = "x86_64")]
627        {
628            Self::filter_batch_simd_x86(commit_ts, snapshot_ts, out);
629            return;
630        }
631
632        #[cfg(target_arch = "aarch64")]
633        {
634            Self::filter_batch_simd_neon(commit_ts, snapshot_ts, out);
635            return;
636        }
637
638        #[cfg(not(any(target_arch = "x86_64", target_arch = "aarch64")))]
639        {
640            Self::filter_batch_scalar(commit_ts, snapshot_ts, out);
641        }
642    }
643
644    /// Scalar fallback implementation
645    #[inline]
646    fn filter_batch_scalar(commit_ts: &[u64], snapshot_ts: u64, out: &mut [bool]) {
647        for (i, &ts) in commit_ts.iter().enumerate() {
648            // Visible if: committed (ts != 0) AND committed before snapshot (ts < snapshot_ts)
649            out[i] = ts != 0 && ts < snapshot_ts;
650        }
651    }
652
653    /// x86_64 AVX2 implementation (4 u64s per iteration)
654    #[cfg(target_arch = "x86_64")]
655    fn filter_batch_simd_x86(commit_ts: &[u64], snapshot_ts: u64, out: &mut [bool]) {
656        let n = commit_ts.len();
657        if n == 0 {
658            return;
659        }
660
661        // Process in chunks of 4 (AVX2 processes 4 × u64)
662        let chunks = n / 4;
663        let remainder = n % 4;
664
665        // Use wide comparison on x86_64
666        // Note: AVX2 doesn't have native u64 comparison, so we use signed comparison
667        // which works for our use case (timestamps are positive)
668        #[cfg(target_feature = "avx2")]
669        unsafe {
670            use std::arch::x86_64::*;
671
672            let zero = _mm256_setzero_si256();
673            let snapshot_vec = _mm256_set1_epi64x(snapshot_ts as i64);
674
675            for chunk in 0..chunks {
676                let ptr = commit_ts.as_ptr().add(chunk * 4) as *const __m256i;
677                let ts_vec = _mm256_loadu_si256(ptr);
678
679                // Check ts != 0 using PCMPEQ and inverting
680                let not_zero = _mm256_xor_si256(
681                    _mm256_cmpeq_epi64(ts_vec, zero),
682                    _mm256_set1_epi64x(-1), // All 1s
683                );
684
685                // Check ts < snapshot using PCMPGT and inverting
686                // (a < b) == !(a >= b) == !(a > b || a == b)
687                let less_than = _mm256_xor_si256(
688                    _mm256_or_si256(
689                        _mm256_cmpgt_epi64(ts_vec, snapshot_vec),
690                        _mm256_cmpeq_epi64(ts_vec, snapshot_vec),
691                    ),
692                    _mm256_set1_epi64x(-1),
693                );
694
695                // Combine: visible = not_zero AND less_than
696                let visible = _mm256_and_si256(not_zero, less_than);
697
698                // Extract results - each 64-bit lane is either all 1s or all 0s
699                let mask: [i64; 4] = std::mem::transmute(visible);
700                for j in 0..4 {
701                    out[chunk * 4 + j] = mask[j] != 0;
702                }
703            }
704        }
705
706        #[cfg(not(target_feature = "avx2"))]
707        {
708            // SSE2 fallback (2 × u64 per iteration)
709            let chunks = n / 2;
710            for chunk in 0..chunks {
711                let base = chunk * 2;
712                for j in 0..2 {
713                    let ts = commit_ts[base + j];
714                    out[base + j] = ts != 0 && ts < snapshot_ts;
715                }
716            }
717        }
718
719        // Handle remainder
720        let base = chunks * 4;
721        for i in 0..remainder {
722            let ts = commit_ts[base + i];
723            out[base + i] = ts != 0 && ts < snapshot_ts;
724        }
725    }
726
727    /// ARM NEON implementation (2 u64s per iteration)
728    #[cfg(target_arch = "aarch64")]
729    fn filter_batch_simd_neon(commit_ts: &[u64], snapshot_ts: u64, out: &mut [bool]) {
730        // NEON doesn't have all the u64 operations we need, so use scalar on ARM
731        // In a production system, we'd use assembly or wait for better NEON intrinsics
732        Self::filter_batch_scalar(commit_ts, snapshot_ts, out);
733    }
734
735    /// Filter with transaction ID for self-visibility
736    ///
737    /// A row is visible if:
738    /// - (commit_ts != 0 AND commit_ts < snapshot_ts), OR
739    /// - txn_id == current_txn_id (own uncommitted writes)
740    #[inline]
741    pub fn filter_batch_with_txn(
742        commit_ts: &[u64],
743        txn_ids: &[u64],
744        snapshot_ts: u64,
745        current_txn_id: u64,
746        out: &mut [bool],
747    ) {
748        assert_eq!(commit_ts.len(), txn_ids.len());
749        assert_eq!(commit_ts.len(), out.len());
750
751        // First pass: standard visibility
752        Self::filter_batch_into(commit_ts, snapshot_ts, out);
753
754        // Second pass: add self-visibility
755        for (i, &txn_id) in txn_ids.iter().enumerate() {
756            if txn_id == current_txn_id {
757                out[i] = true;
758            }
759        }
760    }
761
762    /// Count visible rows without allocating a result vector
763    #[inline]
764    pub fn count_visible(commit_ts: &[u64], snapshot_ts: u64) -> usize {
765        let mut count = 0;
766        for &ts in commit_ts {
767            if ts != 0 && ts < snapshot_ts {
768                count += 1;
769            }
770        }
771        count
772    }
773}
774
775/// Versioned slice for zero-copy access
776///
777/// Holds a reference to data along with visibility metadata,
778/// avoiding copies during scan iteration.
779#[derive(Debug, Clone)]
780pub struct VersionedSlice<'a> {
781    /// Key bytes (zero-copy reference)
782    pub key: &'a [u8],
783    /// Value bytes (zero-copy reference, None = tombstone)
784    pub value: Option<&'a [u8]>,
785    /// Commit timestamp (0 = uncommitted)
786    pub commit_ts: u64,
787    /// Transaction ID that wrote this version
788    pub txn_id: u64,
789}
790
791impl<'a> VersionedSlice<'a> {
792    /// Check visibility at a snapshot
793    #[inline]
794    pub fn is_visible(&self, snapshot_ts: u64, current_txn_id: Option<u64>) -> bool {
795        // Self-visibility
796        if let Some(my_txn) = current_txn_id {
797            if self.txn_id == my_txn {
798                return true;
799            }
800        }
801        // Standard visibility: committed before snapshot
802        self.commit_ts != 0 && self.commit_ts < snapshot_ts
803    }
804}
805
806/// Streaming scan iterator with SIMD visibility filtering
807///
808/// ## Design
809///
810/// Instead of materializing all results upfront, this iterator:
811/// 1. Fetches batches of entries from the underlying source
812/// 2. Applies SIMD visibility filtering to the batch
813/// 3. Yields visible entries one at a time
814///
815/// This reduces memory allocation and leverages SIMD for batch filtering.
816pub struct StreamingScanIterator<'a, I>
817where
818    I: Iterator<Item = VersionedSlice<'a>>,
819{
820    /// Underlying iterator
821    source: I,
822    /// Current batch of entries
823    batch: Vec<VersionedSlice<'a>>,
824    /// Visibility mask for current batch
825    visibility: Vec<bool>,
826    /// Current position in batch
827    pos: usize,
828    /// Snapshot timestamp for visibility
829    snapshot_ts: u64,
830    /// Current transaction ID for self-visibility
831    current_txn_id: Option<u64>,
832    /// Batch size for prefetching
833    batch_size: usize,
834}
835
836impl<'a, I> StreamingScanIterator<'a, I>
837where
838    I: Iterator<Item = VersionedSlice<'a>>,
839{
840    /// Create a new streaming scan iterator
841    pub fn new(source: I, snapshot_ts: u64, current_txn_id: Option<u64>) -> Self {
842        Self::with_batch_size(source, snapshot_ts, current_txn_id, DEFAULT_BATCH_SIZE)
843    }
844
845    /// Create with custom batch size
846    pub fn with_batch_size(
847        source: I,
848        snapshot_ts: u64,
849        current_txn_id: Option<u64>,
850        batch_size: usize,
851    ) -> Self {
852        Self {
853            source,
854            batch: Vec::with_capacity(batch_size),
855            visibility: Vec::with_capacity(batch_size),
856            pos: 0,
857            snapshot_ts,
858            current_txn_id,
859            batch_size,
860        }
861    }
862
863    /// Fetch next batch and compute visibility
864    fn fetch_batch(&mut self) -> bool {
865        self.batch.clear();
866        self.visibility.clear();
867        self.pos = 0;
868
869        // Collect batch
870        for entry in self.source.by_ref().take(self.batch_size) {
871            self.batch.push(entry);
872        }
873
874        if self.batch.is_empty() {
875            return false;
876        }
877
878        // Extract commit timestamps for SIMD filtering
879        let commit_ts: Vec<u64> = self.batch.iter().map(|e| e.commit_ts).collect();
880        self.visibility.resize(self.batch.len(), false);
881
882        if let Some(txn_id) = self.current_txn_id {
883            let txn_ids: Vec<u64> = self.batch.iter().map(|e| e.txn_id).collect();
884            SimdVisibilityFilter::filter_batch_with_txn(
885                &commit_ts,
886                &txn_ids,
887                self.snapshot_ts,
888                txn_id,
889                &mut self.visibility,
890            );
891        } else {
892            SimdVisibilityFilter::filter_batch_into(&commit_ts, self.snapshot_ts, &mut self.visibility);
893        }
894
895        true
896    }
897}
898
899impl<'a, I> Iterator for StreamingScanIterator<'a, I>
900where
901    I: Iterator<Item = VersionedSlice<'a>>,
902{
903    type Item = VersionedSlice<'a>;
904
905    fn next(&mut self) -> Option<Self::Item> {
906        loop {
907            // Exhausted current batch?
908            while self.pos >= self.batch.len() {
909                if !self.fetch_batch() {
910                    return None;
911                }
912            }
913
914            // Find next visible entry in current batch
915            while self.pos < self.batch.len() {
916                let idx = self.pos;
917                self.pos += 1;
918
919                if self.visibility[idx] {
920                    return Some(self.batch[idx].clone());
921                }
922            }
923        }
924    }
925}
926
927// =============================================================================
928// SoA Batch + Late Materialization (80/20 Optimization)
929// =============================================================================
930
931/// Structure of Arrays (SoA) batch for optimal SIMD visibility filtering
932///
933/// ## Why SoA?
934///
935/// AoS (Array of Structures) layout:
936/// ```text
937/// [key, value, commit_ts, txn_id], [key, value, commit_ts, txn_id], ...
938/// ```
939/// - Poor cache utilization for visibility checks
940/// - SIMD loads scatter data across cache lines
941///
942/// SoA (Structure of Arrays) layout:
943/// ```text
944/// commit_ts: [ts1, ts2, ts3, ts4, ...]  // Contiguous for SIMD
945/// txn_ids:   [id1, id2, id3, id4, ...]  // Contiguous for SIMD
946/// keys:      [k1, k2, k3, k4, ...]      // Only accessed for visible rows
947/// values:    [v1, v2, v3, v4, ...]      // Late materialized
948/// ```
949///
950/// SIMD can process 4-8 timestamps per cycle with perfect cache utilization.
951///
952/// ## Late Materialization
953///
954/// Values are NOT copied into the batch. Instead, we store offsets/handles
955/// and only materialize values for rows that pass visibility filtering.
956///
957/// For scans where 90% of rows are filtered out, this saves ~90% of value copies.
958#[derive(Debug)]
959pub struct SoaBatch<'a> {
960    /// Contiguous commit timestamps for SIMD visibility filtering
961    pub commit_ts: Vec<u64>,
962    /// Contiguous transaction IDs for self-visibility checking
963    pub txn_ids: Vec<u64>,
964    /// Key references (zero-copy)
965    pub keys: Vec<&'a [u8]>,
966    /// Value materializer handles (late binding)
967    /// None = tombstone, Some = handle to materialize value
968    pub value_handles: Vec<Option<ValueHandle<'a>>>,
969    /// Pre-computed visibility mask (after SIMD filtering)
970    pub visibility: Vec<bool>,
971    /// Selection vector: indices of visible rows for late materialization
972    pub selection: Vec<usize>,
973}
974
975/// Handle for late value materialization
976///
977/// Instead of copying values upfront, we store a handle that can
978/// materialize the value on-demand when needed.
979#[derive(Debug, Clone, Copy)]
980pub enum ValueHandle<'a> {
981    /// Direct reference (zero-copy for inmemory data)
982    Direct(&'a [u8]),
983    /// Offset in a data block (for disk-resident data)
984    BlockOffset { block_id: u32, offset: u32, len: u32 },
985    /// Deferred load from arena
986    ArenaSlot { arena_id: u32, slot: u32 },
987}
988
989impl<'a> ValueHandle<'a> {
990    /// Materialize the value (called only for visible rows)
991    pub fn materialize(&self) -> Option<&'a [u8]> {
992        match self {
993            ValueHandle::Direct(data) => Some(*data),
994            // For block/arena handles, would call into storage layer
995            // For now, return None (would be implemented with storage context)
996            ValueHandle::BlockOffset { .. } => None,
997            ValueHandle::ArenaSlot { .. } => None,
998        }
999    }
1000}
1001
1002impl<'a> SoaBatch<'a> {
1003    /// Create a new SoA batch with capacity
1004    pub fn with_capacity(capacity: usize) -> Self {
1005        Self {
1006            commit_ts: Vec::with_capacity(capacity),
1007            txn_ids: Vec::with_capacity(capacity),
1008            keys: Vec::with_capacity(capacity),
1009            value_handles: Vec::with_capacity(capacity),
1010            visibility: Vec::with_capacity(capacity),
1011            selection: Vec::with_capacity(capacity),
1012        }
1013    }
1014
1015    /// Add an entry to the batch (SoA decomposition)
1016    #[inline]
1017    pub fn push(&mut self, key: &'a [u8], value: Option<&'a [u8]>, commit_ts: u64, txn_id: u64) {
1018        self.commit_ts.push(commit_ts);
1019        self.txn_ids.push(txn_id);
1020        self.keys.push(key);
1021        self.value_handles.push(value.map(ValueHandle::Direct));
1022    }
1023
1024    /// Add with block handle (for disk-resident values)
1025    #[inline]
1026    pub fn push_deferred(
1027        &mut self,
1028        key: &'a [u8],
1029        handle: Option<ValueHandle<'a>>,
1030        commit_ts: u64,
1031        txn_id: u64,
1032    ) {
1033        self.commit_ts.push(commit_ts);
1034        self.txn_ids.push(txn_id);
1035        self.keys.push(key);
1036        self.value_handles.push(handle);
1037    }
1038
1039    /// Get batch size
1040    pub fn len(&self) -> usize {
1041        self.commit_ts.len()
1042    }
1043
1044    /// Check if empty
1045    pub fn is_empty(&self) -> bool {
1046        self.commit_ts.is_empty()
1047    }
1048
1049    /// Clear the batch for reuse
1050    pub fn clear(&mut self) {
1051        self.commit_ts.clear();
1052        self.txn_ids.clear();
1053        self.keys.clear();
1054        self.value_handles.clear();
1055        self.visibility.clear();
1056        self.selection.clear();
1057    }
1058
1059    /// Apply SIMD visibility filtering and build selection vector
1060    ///
1061    /// This is the hot path - SIMD processes commit_ts array directly
1062    /// without touching keys/values until we know what's visible.
1063    pub fn filter_visibility(&mut self, snapshot_ts: u64, current_txn_id: Option<u64>) {
1064        let n = self.len();
1065        self.visibility.resize(n, false);
1066        self.selection.clear();
1067
1068        // SIMD filter on contiguous commit_ts array
1069        if let Some(txn_id) = current_txn_id {
1070            SimdVisibilityFilter::filter_batch_with_txn(
1071                &self.commit_ts,
1072                &self.txn_ids,
1073                snapshot_ts,
1074                txn_id,
1075                &mut self.visibility,
1076            );
1077        } else {
1078            SimdVisibilityFilter::filter_batch_into(&self.commit_ts, snapshot_ts, &mut self.visibility);
1079        }
1080
1081        // Build selection vector (indices of visible rows)
1082        for (i, &visible) in self.visibility.iter().enumerate() {
1083            if visible {
1084                self.selection.push(i);
1085            }
1086        }
1087    }
1088
1089    /// Get visible row count (after filtering)
1090    pub fn visible_count(&self) -> usize {
1091        self.selection.len()
1092    }
1093
1094    /// Iterate over visible rows with late materialization
1095    ///
1096    /// Values are only materialized for rows in the selection vector.
1097    pub fn iter_visible(&self) -> impl Iterator<Item = (&'a [u8], Option<&'a [u8]>)> + '_ {
1098        self.selection.iter().map(move |&idx| {
1099            let key = self.keys[idx];
1100            let value = self.value_handles[idx].and_then(|h| h.materialize());
1101            (key, value)
1102        })
1103    }
1104
1105    /// Iterate visible rows with full metadata
1106    pub fn iter_visible_full(
1107        &self,
1108    ) -> impl Iterator<Item = (&'a [u8], Option<&'a [u8]>, u64, u64)> + '_ {
1109        self.selection.iter().map(move |&idx| {
1110            let key = self.keys[idx];
1111            let value = self.value_handles[idx].and_then(|h| h.materialize());
1112            let ts = self.commit_ts[idx];
1113            let txn = self.txn_ids[idx];
1114            (key, value, ts, txn)
1115        })
1116    }
1117}
1118
1119/// High-performance SoA scan iterator with SIMD + late materialization
1120///
1121/// ## Performance Characteristics
1122///
1123/// | Phase              | Cache Behavior           | SIMD Usage    |
1124/// |--------------------|--------------------------|---------------|
1125/// | Load batch         | Sequential (SoA arrays)  | N/A           |
1126/// | Visibility filter  | L1-hot (commit_ts only)  | 4-8× speedup  |
1127/// | Build selection    | Sequential (visibility)  | Auto-vectorized |
1128/// | Materialize values | Random (visible only)    | N/A           |
1129///
1130/// For 1M rows with 10% selectivity:
1131/// - Old: Process 1M rows with random access = ~50ms
1132/// - New: SIMD filter 1M × 8 bytes = ~1ms, materialize 100K values = ~5ms
1133/// - **~8× speedup** from SoA + SIMD + late materialization
1134pub struct SoaScanIterator<'a, S>
1135where
1136    S: SoaSource<'a>,
1137{
1138    /// Source that provides SoA batches
1139    source: S,
1140    /// Current batch
1141    batch: SoaBatch<'a>,
1142    /// Position in selection vector
1143    pos: usize,
1144    /// Snapshot timestamp
1145    snapshot_ts: u64,
1146    /// Current transaction ID
1147    current_txn_id: Option<u64>,
1148    /// Batch size
1149    #[allow(dead_code)]
1150    batch_size: usize,
1151    /// Statistics
1152    stats: SoaScanStats,
1153}
1154
1155/// Statistics for SoA scan performance monitoring
1156#[derive(Debug, Default, Clone)]
1157pub struct SoaScanStats {
1158    /// Total rows scanned
1159    pub rows_scanned: usize,
1160    /// Rows that passed visibility filter
1161    pub rows_visible: usize,
1162    /// Rows where values were materialized
1163    pub values_materialized: usize,
1164    /// Number of batches processed
1165    pub batches_processed: usize,
1166}
1167
1168impl SoaScanStats {
1169    /// Get selectivity ratio
1170    pub fn selectivity(&self) -> f64 {
1171        if self.rows_scanned == 0 {
1172            0.0
1173        } else {
1174            self.rows_visible as f64 / self.rows_scanned as f64
1175        }
1176    }
1177
1178    /// Get value materialization efficiency
1179    /// (1.0 = all visible rows materialized, lower = some skipped)
1180    pub fn materialization_efficiency(&self) -> f64 {
1181        if self.rows_visible == 0 {
1182            1.0
1183        } else {
1184            self.values_materialized as f64 / self.rows_visible as f64
1185        }
1186    }
1187}
1188
1189/// Trait for sources that provide SoA batches
1190pub trait SoaSource<'a> {
1191    /// Fill a batch with entries from the source
1192    /// Returns false if source is exhausted
1193    fn fill_batch(&mut self, batch: &mut SoaBatch<'a>) -> bool;
1194}
1195
1196impl<'a, S> SoaScanIterator<'a, S>
1197where
1198    S: SoaSource<'a>,
1199{
1200    /// Create new SoA scan iterator
1201    pub fn new(source: S, snapshot_ts: u64, current_txn_id: Option<u64>) -> Self {
1202        Self::with_batch_size(source, snapshot_ts, current_txn_id, DEFAULT_BATCH_SIZE)
1203    }
1204
1205    /// Create with custom batch size
1206    pub fn with_batch_size(
1207        source: S,
1208        snapshot_ts: u64,
1209        current_txn_id: Option<u64>,
1210        batch_size: usize,
1211    ) -> Self {
1212        Self {
1213            source,
1214            batch: SoaBatch::with_capacity(batch_size),
1215            pos: 0,
1216            snapshot_ts,
1217            current_txn_id,
1218            batch_size,
1219            stats: SoaScanStats::default(),
1220        }
1221    }
1222
1223    /// Fetch next batch with visibility filtering
1224    fn fetch_batch(&mut self) -> bool {
1225        self.batch.clear();
1226        self.pos = 0;
1227
1228        // Fill batch from source (SoA format)
1229        if !self.source.fill_batch(&mut self.batch) {
1230            return false;
1231        }
1232
1233        self.stats.rows_scanned += self.batch.len();
1234        self.stats.batches_processed += 1;
1235
1236        // SIMD visibility filtering on contiguous arrays
1237        self.batch.filter_visibility(self.snapshot_ts, self.current_txn_id);
1238        self.stats.rows_visible += self.batch.visible_count();
1239
1240        true
1241    }
1242
1243    /// Get scan statistics
1244    pub fn stats(&self) -> &SoaScanStats {
1245        &self.stats
1246    }
1247}
1248
1249impl<'a, S> Iterator for SoaScanIterator<'a, S>
1250where
1251    S: SoaSource<'a>,
1252{
1253    type Item = (&'a [u8], Option<&'a [u8]>);
1254
1255    fn next(&mut self) -> Option<Self::Item> {
1256        loop {
1257            // Need new batch?
1258            while self.pos >= self.batch.selection.len() {
1259                if !self.fetch_batch() {
1260                    return None;
1261                }
1262            }
1263
1264            // Get next visible row (late materialization)
1265            let sel_idx = self.pos;
1266            self.pos += 1;
1267            let row_idx = self.batch.selection[sel_idx];
1268
1269            let key = self.batch.keys[row_idx];
1270            let value = self.batch.value_handles[row_idx].and_then(|h| h.materialize());
1271            self.stats.values_materialized += 1;
1272
1273            return Some((key, value));
1274        }
1275    }
1276}
1277
1278#[cfg(test)]
1279mod tests {
1280    use super::*;
1281
1282    #[test]
1283    fn test_column_vector_int64() {
1284        let mut v = ColumnVector::Int64(vec![1, 2, 3, 4, 5]);
1285        assert_eq!(v.len(), 5);
1286        assert_eq!(v.sum_i64(), Some(15));
1287    }
1288
1289    #[test]
1290    fn test_column_vector_float64() {
1291        let v = ColumnVector::Float64(vec![1.0, 2.0, 3.0, 4.0]);
1292        assert_eq!(v.len(), 4);
1293        assert_eq!(v.sum_f64(), Some(10.0));
1294    }
1295
1296    #[test]
1297    fn test_vector_batch() {
1298        let mut batch = VectorBatch::with_capacity(1024);
1299        batch.add_column("id", ColumnVector::Int64(vec![1, 2, 3]));
1300        batch.add_column("value", ColumnVector::Float64(vec![1.5, 2.5, 3.5]));
1301        
1302        assert_eq!(batch.row_count(), 3);
1303        assert_eq!(batch.column_count(), 2);
1304        assert!(batch.column("id").is_some());
1305    }
1306
1307    #[test]
1308    fn test_int64_comparison() {
1309        let col = ColumnVector::Int64(vec![1, 5, 10, 15, 20]);
1310        let pred = Int64Comparison::gt("test", 10);
1311        let result = pred.evaluate(&col);
1312        
1313        assert_eq!(result, vec![false, false, false, true, true]);
1314    }
1315
1316    #[test]
1317    fn test_simd_sum_i64_large() {
1318        // Test with enough elements to trigger SIMD path
1319        let values: Vec<i64> = (0..1000).collect();
1320        let expected: i64 = (0..1000).sum();
1321        
1322        let col = ColumnVector::Int64(values);
1323        assert_eq!(col.sum_i64(), Some(expected));
1324    }
1325
1326    #[test]
1327    fn test_simd_compare_gt() {
1328        let values: Vec<i64> = vec![1, 5, 10, 15, 20, 25, 30, 35];
1329        let result = simd_compare_i64_gt(&values, 12);
1330        assert_eq!(result, vec![false, false, false, true, true, true, true, true]);
1331    }
1332
1333    #[test]
1334    fn test_vectorized_scan_config() {
1335        let config = VectorizedScanConfig::new()
1336            .with_batch_size(2048)
1337            .with_prefetch(true);
1338        
1339        assert_eq!(config.batch_size, 2048);
1340        assert!(config.prefetch_enabled);
1341    }
1342
1343    #[test]
1344    fn test_simd_visibility_filter_basic() {
1345        // commit_ts: 0 = uncommitted, others = commit time
1346        let commit_ts = vec![0, 10, 20, 30, 40];
1347        let snapshot_ts = 25;
1348        
1349        let result = SimdVisibilityFilter::filter_batch(&commit_ts, snapshot_ts);
1350        
1351        // Visible: ts != 0 AND ts < 25
1352        // ts=0: not visible (uncommitted)
1353        // ts=10: visible (10 < 25)
1354        // ts=20: visible (20 < 25)
1355        // ts=30: not visible (30 >= 25)
1356        // ts=40: not visible (40 >= 25)
1357        assert_eq!(result, vec![false, true, true, false, false]);
1358    }
1359
1360    #[test]
1361    fn test_simd_visibility_filter_with_txn() {
1362        let commit_ts = vec![0, 10, 0, 30, 40];
1363        let txn_ids = vec![1, 2, 1, 4, 5];  // txn_id 1 appears twice
1364        let snapshot_ts = 25;
1365        let current_txn_id = 1;
1366        
1367        let mut result = vec![false; 5];
1368        SimdVisibilityFilter::filter_batch_with_txn(
1369            &commit_ts,
1370            &txn_ids,
1371            snapshot_ts,
1372            current_txn_id,
1373            &mut result,
1374        );
1375        
1376        // Visible:
1377        // [0]: uncommitted but own txn -> visible
1378        // [1]: committed at 10 < 25 -> visible
1379        // [2]: uncommitted but own txn -> visible
1380        // [3]: committed at 30 >= 25 -> not visible
1381        // [4]: committed at 40 >= 25 -> not visible
1382        assert_eq!(result, vec![true, true, true, false, false]);
1383    }
1384
1385    #[test]
1386    fn test_simd_visibility_filter_large() {
1387        // Test with enough elements to trigger SIMD path
1388        let n = 1000;
1389        let commit_ts: Vec<u64> = (1..=n as u64).collect();
1390        let snapshot_ts = 500;
1391        
1392        let result = SimdVisibilityFilter::filter_batch(&commit_ts, snapshot_ts);
1393        
1394        // First 499 should be visible (1..500 < 500)
1395        let visible_count = result.iter().filter(|&&v| v).count();
1396        assert_eq!(visible_count, 499);
1397    }
1398
1399    #[test]
1400    fn test_versioned_slice_visibility() {
1401        let slice = VersionedSlice {
1402            key: b"test",
1403            value: Some(b"value"),
1404            commit_ts: 100,
1405            txn_id: 1,
1406        };
1407        
1408        assert!(slice.is_visible(200, None));
1409        assert!(!slice.is_visible(50, None));
1410        assert!(slice.is_visible(50, Some(1))); // Self-visibility
1411    }
1412
1413    #[test]
1414    fn test_streaming_scan_iterator() {
1415        let entries: Vec<VersionedSlice<'static>> = vec![
1416            VersionedSlice { key: b"a", value: Some(b"1"), commit_ts: 10, txn_id: 1 },
1417            VersionedSlice { key: b"b", value: Some(b"2"), commit_ts: 0, txn_id: 2 },  // Uncommitted
1418            VersionedSlice { key: b"c", value: Some(b"3"), commit_ts: 30, txn_id: 3 },  // After snapshot
1419            VersionedSlice { key: b"d", value: Some(b"4"), commit_ts: 15, txn_id: 4 },
1420        ];
1421        
1422        let iter = StreamingScanIterator::new(entries.into_iter(), 25, None);
1423        let visible: Vec<_> = iter.collect();
1424        
1425        // Only entries with commit_ts 10 and 15 should be visible
1426        assert_eq!(visible.len(), 2);
1427        assert_eq!(visible[0].key, b"a");
1428        assert_eq!(visible[1].key, b"d");
1429    }
1430
1431    #[test]
1432    fn test_soa_batch_basic() {
1433        let mut batch = SoaBatch::with_capacity(100);
1434        
1435        batch.push(b"key1", Some(b"value1"), 10, 1);
1436        batch.push(b"key2", Some(b"value2"), 20, 2);
1437        batch.push(b"key3", None, 30, 3);  // Tombstone
1438        batch.push(b"key4", Some(b"value4"), 0, 4);  // Uncommitted
1439        
1440        assert_eq!(batch.len(), 4);
1441        assert_eq!(batch.commit_ts, vec![10, 20, 30, 0]);
1442        assert_eq!(batch.txn_ids, vec![1, 2, 3, 4]);
1443    }
1444
1445    #[test]
1446    fn test_soa_batch_visibility_filter() {
1447        let mut batch = SoaBatch::with_capacity(100);
1448        
1449        batch.push(b"k1", Some(b"v1"), 10, 1);  // Visible (10 < 25)
1450        batch.push(b"k2", Some(b"v2"), 0, 2);   // Not visible (uncommitted)
1451        batch.push(b"k3", Some(b"v3"), 20, 3);  // Visible (20 < 25)
1452        batch.push(b"k4", Some(b"v4"), 30, 4);  // Not visible (30 >= 25)
1453        batch.push(b"k5", Some(b"v5"), 0, 5);   // Not visible (uncommitted)
1454        
1455        batch.filter_visibility(25, None);
1456        
1457        assert_eq!(batch.visibility, vec![true, false, true, false, false]);
1458        assert_eq!(batch.selection, vec![0, 2]);  // Indices of visible rows
1459        assert_eq!(batch.visible_count(), 2);
1460    }
1461
1462    #[test]
1463    fn test_soa_batch_self_visibility() {
1464        let mut batch = SoaBatch::with_capacity(100);
1465        
1466        batch.push(b"k1", Some(b"v1"), 0, 42);  // Own uncommitted -> visible
1467        batch.push(b"k2", Some(b"v2"), 10, 1);  // Committed -> visible
1468        batch.push(b"k3", Some(b"v3"), 0, 99);  // Other's uncommitted -> not visible
1469        
1470        batch.filter_visibility(25, Some(42));
1471        
1472        assert_eq!(batch.visibility, vec![true, true, false]);
1473        assert_eq!(batch.selection, vec![0, 1]);
1474    }
1475
1476    #[test]
1477    fn test_soa_batch_late_materialization() {
1478        let mut batch = SoaBatch::with_capacity(100);
1479        
1480        batch.push(b"key1", Some(b"val1"), 10, 1);
1481        batch.push(b"key2", Some(b"val2"), 0, 2);   // Filtered out
1482        batch.push(b"key3", Some(b"val3"), 15, 3);
1483        
1484        batch.filter_visibility(25, None);
1485        
1486        // Iterate visible - values materialized only now
1487        let visible: Vec<_> = batch.iter_visible().collect();
1488        
1489        assert_eq!(visible.len(), 2);
1490        assert_eq!(visible[0], (b"key1".as_slice(), Some(b"val1".as_slice())));
1491        assert_eq!(visible[1], (b"key3".as_slice(), Some(b"val3".as_slice())));
1492    }
1493
1494    #[test]
1495    fn test_soa_scan_stats() {
1496        let mut batch = SoaBatch::with_capacity(100);
1497        
1498        // 10 rows, 3 visible
1499        for i in 0..10u64 {
1500            let ts = if i < 3 { 10 } else { 0 };  // First 3 committed, rest uncommitted
1501            batch.push(b"key", Some(b"val"), ts, i);
1502        }
1503        
1504        batch.filter_visibility(25, None);
1505        
1506        let selectivity = batch.visible_count() as f64 / batch.len() as f64;
1507        assert!((selectivity - 0.3).abs() < 0.01);  // 30% selectivity
1508    }
1509
1510    #[test]
1511    fn test_soa_batch_simd_large() {
1512        // Test with enough entries to trigger SIMD paths
1513        let mut batch = SoaBatch::with_capacity(2000);
1514        
1515        for i in 0..1000u64 {
1516            // Alternating visible/not visible
1517            let ts = if i % 2 == 0 { 10 } else { 50 };
1518            batch.push(b"k", Some(b"v"), ts, i);
1519        }
1520        
1521        batch.filter_visibility(25, None);
1522        
1523        // Should have 500 visible (even indices with ts=10)
1524        assert_eq!(batch.visible_count(), 500);
1525        
1526        // Verify selection indices are correct
1527        for (i, &idx) in batch.selection.iter().enumerate() {
1528            assert_eq!(idx, i * 2);  // 0, 2, 4, 6, ...
1529        }
1530    }
1531}