Skip to main content

sochdb_storage/
vectorized_scan.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! SIMD-Accelerated Vectorized Scan Engine (Recommendation 2)
19//!
20//! ## Problem
21//!
22//! Current scan implementation is row-at-a-time:
23//! ```ignore
24//! for entry in self.data.iter() {  // DashMap iteration - pointer chasing
25//!     if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
26//!         && let Some(value) = &v.value  // Option unwrapping
27//!     {
28//!         results.push((key.clone(), value.clone())); // Heap allocs
29//!     }
30//! }
31//! ```
32//!
33//! This has:
34//! - DashMap iterator overhead (~20ns per entry)
35//! - MVCC version chain traversal per row
36//! - Clone allocations in hot path
37//!
38//! ## Solution
39//!
40//! Vectorized execution: process 1024+ rows in a batch, amortizing overhead.
41//!
42//! ## Performance Analysis
43//!
44//! Vectorized execution model:
45//! - **Batch size B = 1024 rows**
46//! - **Per-batch overhead**: O(1) iterator setup + O(B) SIMD operations
47//! - **Amortized cost**: `(setup + B×simd_op) / B ≈ simd_op` for large B
48//!
49//! SIMD comparison (AVX-512):
50//! ```text
51//! Scalar: 100 cycles × 1000 rows = 100,000 cycles
52//! SIMD: 100 cycles × (1000/16) = 6,250 cycles (16x speedup)
53//! ```
54//!
55//! Memory bandwidth calculation:
56//! - Current: 300 bytes/row × 1M rows = 300 MB, random access
57//! - Proposed: 60 bytes/row × 1M rows = 60 MB, sequential scan
58//! - RAM bandwidth: ~50 GB/s → theoretical 833M rows/sec
59//!
60//! ## Expected Improvement
61//!
62//! 10-20x scan throughput improvement
63
64use std::sync::atomic::{AtomicUsize, Ordering};
65
/// Number of rows per batch when the caller does not pick a size.
///
/// At 256 bytes per row, 1024 rows amount to roughly 256 KB, which the
/// original design notes consider a comfortable fit for an L2 cache.
pub const DEFAULT_BATCH_SIZE: usize = 1024;

/// Lower bound applied to requested batch sizes.
///
/// Design note from the original author: below this size scalar processing
/// is expected to be faster than batching.
pub const MIN_BATCH_SIZE: usize = 64;

/// Upper bound applied to requested batch sizes.
///
/// Design note from the original author: larger batches increase memory
/// pressure.
pub const MAX_BATCH_SIZE: usize = 8192;
75
76// =============================================================================
77// Column Vectors for SIMD Operations
78// =============================================================================
79
/// Typed column vector holding one column of a batch in contiguous storage.
///
/// Every variant keeps its values in flat `Vec`s so whole-column operations
/// (sums, comparisons) can run over a dense slice.
#[derive(Debug, Clone)]
pub enum ColumnVector {
    /// Boolean column, stored as one `bool` per row (not bit-packed).
    Bool(Vec<bool>),
    /// 64-bit signed integers.
    Int64(Vec<i64>),
    /// 64-bit unsigned integers.
    UInt64(Vec<u64>),
    /// 64-bit floats.
    Float64(Vec<f64>),
    /// Variable-length strings (Arrow-style: offsets + data).
    ///
    /// [`ColumnVector::len`] reports `offsets.len() - 1` rows.
    String { offsets: Vec<u32>, data: Vec<u8> },
    /// Binary data (Arrow-style: offsets + data), same layout as `String`.
    Binary { offsets: Vec<u32>, data: Vec<u8> },
    /// Null bitmap for any column (packed bits, 64 rows per `u64` word).
    Null(Vec<u64>),
}

impl ColumnVector {
    /// Creates an empty `Int64` column with room reserved for `capacity` rows.
    pub fn new_int64(capacity: usize) -> Self {
        ColumnVector::Int64(Vec::with_capacity(capacity))
    }

    /// Creates an empty `Float64` column with room reserved for `capacity` rows.
    pub fn new_float64(capacity: usize) -> Self {
        ColumnVector::Float64(Vec::with_capacity(capacity))
    }

    /// Creates an empty `String` column.
    ///
    /// Reserves `capacity + 1` offsets and `capacity * 32` data bytes (an
    /// assumed average of 32 bytes per string). No offset is pushed, so the
    /// new column reports a length of zero.
    pub fn new_string(capacity: usize) -> Self {
        ColumnVector::String {
            offsets: Vec::with_capacity(capacity + 1),
            data: Vec::with_capacity(capacity * 32),
        }
    }

    /// Returns the number of rows in the column.
    ///
    /// For `String`/`Binary` this is `offsets.len() - 1` (zero when there are
    /// no offsets); for `Null` it is 64 rows per stored bitmap word.
    pub fn len(&self) -> usize {
        match self {
            ColumnVector::Bool(v) => v.len(),
            ColumnVector::Int64(v) => v.len(),
            ColumnVector::UInt64(v) => v.len(),
            ColumnVector::Float64(v) => v.len(),
            ColumnVector::String { offsets, .. } | ColumnVector::Binary { offsets, .. } => {
                offsets.len().saturating_sub(1)
            }
            ColumnVector::Null(words) => words.len() * 64,
        }
    }

    /// Returns `true` when [`ColumnVector::len`] is zero.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Returns the number of bytes occupied by the stored elements.
    ///
    /// Only elements in use are counted (based on `len()`), not reserved
    /// capacity.
    pub fn memory_size(&self) -> usize {
        match self {
            ColumnVector::Bool(v) => v.len(),
            ColumnVector::Int64(v) => v.len() * 8,
            ColumnVector::UInt64(v) => v.len() * 8,
            ColumnVector::Float64(v) => v.len() * 8,
            ColumnVector::String { offsets, data } | ColumnVector::Binary { offsets, data } => {
                offsets.len() * 4 + data.len()
            }
            ColumnVector::Null(words) => words.len() * 8,
        }
    }

    /// Sums an `Int64` column, returning `None` for any other column type.
    ///
    /// The sum uses wrapping (two's-complement) arithmetic on every path, so
    /// the result does not depend on the build profile, the target features,
    /// or where in the column an overflow occurs. An empty column sums to 0.
    pub fn sum_i64(&self) -> Option<i64> {
        match self {
            ColumnVector::Int64(values) => {
                // Only hand larger inputs to the SIMD helper.
                #[cfg(target_arch = "x86_64")]
                {
                    if values.len() >= 16 {
                        return Some(simd_sum_i64(values));
                    }
                }
                Some(wrapping_sum_i64(values))
            }
            _ => None,
        }
    }

    /// Sums a `Float64` column, returning `None` for any other column type.
    ///
    /// An empty column sums to `0.0`. On x86_64 with AVX enabled, columns of
    /// eight or more values are accumulated in four independent lanes, so the
    /// rounding may differ slightly from a strictly sequential sum.
    pub fn sum_f64(&self) -> Option<f64> {
        match self {
            ColumnVector::Float64(values) => {
                if values.is_empty() {
                    return Some(0.0);
                }
                #[cfg(target_arch = "x86_64")]
                {
                    if values.len() >= 8 {
                        return Some(simd_sum_f64(values));
                    }
                }
                Some(values.iter().sum())
            }
            _ => None,
        }
    }
}

/// Sequential wrapping sum; the reference behavior for every `i64` sum path.
fn wrapping_sum_i64(values: &[i64]) -> i64 {
    values.iter().fold(0i64, |acc, &v| acc.wrapping_add(v))
}

// =============================================================================
// SIMD Implementations (x86_64 with AVX2)
// =============================================================================

/// Wrapping sum of `i64` values, using AVX2 (4 × i64 per vector) when the
/// crate is compiled with that target feature and a scalar loop otherwise.
#[cfg(target_arch = "x86_64")]
fn simd_sum_i64(values: &[i64]) -> i64 {
    #[cfg(target_feature = "avx2")]
    {
        use std::arch::x86_64::*;

        let quads = values.chunks_exact(4);
        let tail = quads.remainder();

        // SAFETY: this block is only compiled when AVX2 is statically enabled,
        // and every unaligned load reads exactly the four `i64`s of `quad`.
        let lanes: [i64; 4] = unsafe {
            let mut acc = _mm256_setzero_si256();
            for quad in quads {
                let v = _mm256_loadu_si256(quad.as_ptr() as *const __m256i);
                acc = _mm256_add_epi64(acc, v);
            }
            std::mem::transmute(acc)
        };

        // Lane additions wrap, so the horizontal add and the tail must wrap too.
        wrapping_sum_i64(&lanes).wrapping_add(wrapping_sum_i64(tail))
    }

    #[cfg(not(target_feature = "avx2"))]
    {
        wrapping_sum_i64(values)
    }
}

/// Sum of `f64` values, using AVX (4 × f64 per vector) when the crate is
/// compiled with that target feature and a scalar loop otherwise.
#[cfg(target_arch = "x86_64")]
fn simd_sum_f64(values: &[f64]) -> f64 {
    #[cfg(target_feature = "avx")]
    {
        use std::arch::x86_64::*;

        let quads = values.chunks_exact(4);
        let tail = quads.remainder();

        // SAFETY: this block is only compiled when AVX is statically enabled,
        // and every unaligned load reads exactly the four `f64`s of `quad`.
        let lanes: [f64; 4] = unsafe {
            let mut acc = _mm256_setzero_pd();
            for quad in quads {
                acc = _mm256_add_pd(acc, _mm256_loadu_pd(quad.as_ptr()));
            }
            std::mem::transmute(acc)
        };

        let simd_total: f64 = lanes.iter().sum();
        let tail_total: f64 = tail.iter().sum();
        simd_total + tail_total
    }

    #[cfg(not(target_feature = "avx"))]
    {
        values.iter().sum()
    }
}
274
275// =============================================================================
276// Vectorized Batch for Processing
277// =============================================================================
278
279/// A batch of rows for vectorized processing
280///
281/// Instead of processing one row at a time, we accumulate rows into
282/// batches and process them together for better cache utilization
283/// and SIMD opportunities.
284#[derive(Debug)]
285pub struct VectorBatch {
286    /// Column data in columnar format
287    columns: Vec<(String, ColumnVector)>,
288    /// Row count in this batch
289    row_count: usize,
290    /// Capacity (pre-allocated)
291    capacity: usize,
292    /// Selection vector (indexes of selected rows after filtering)
293    selection: Option<Vec<usize>>,
294}
295
296impl VectorBatch {
297    /// Create a new batch with specified capacity
298    pub fn with_capacity(capacity: usize) -> Self {
299        Self {
300            columns: Vec::new(),
301            row_count: 0,
302            capacity,
303            selection: None,
304        }
305    }
306
307    /// Create batch with default size
308    pub fn new() -> Self {
309        Self::with_capacity(DEFAULT_BATCH_SIZE)
310    }
311
312    /// Get batch capacity
313    pub fn capacity(&self) -> usize {
314        self.capacity
315    }
316
317    /// Get current row count
318    pub fn row_count(&self) -> usize {
319        if let Some(ref sel) = self.selection {
320            sel.len()
321        } else {
322            self.row_count
323        }
324    }
325
326    /// Check if batch is full
327    pub fn is_full(&self) -> bool {
328        self.row_count >= self.capacity
329    }
330
331    /// Check if batch is empty
332    pub fn is_empty(&self) -> bool {
333        self.row_count == 0
334    }
335
336    /// Add a column to the batch
337    pub fn add_column(&mut self, name: impl Into<String>, column: ColumnVector) {
338        self.columns.push((name.into(), column));
339        if self.row_count == 0 {
340            self.row_count = self.columns.last().map(|(_, c)| c.len()).unwrap_or(0);
341        }
342    }
343
344    /// Get column by name
345    pub fn column(&self, name: &str) -> Option<&ColumnVector> {
346        self.columns.iter().find(|(n, _)| n == name).map(|(_, c)| c)
347    }
348
349    /// Get column by index
350    pub fn column_at(&self, idx: usize) -> Option<&ColumnVector> {
351        self.columns.get(idx).map(|(_, c)| c)
352    }
353
354    /// Get column count
355    pub fn column_count(&self) -> usize {
356        self.columns.len()
357    }
358
359    /// Set selection vector (for filtering)
360    pub fn set_selection(&mut self, selection: Vec<usize>) {
361        self.selection = Some(selection);
362    }
363
364    /// Clear selection vector
365    pub fn clear_selection(&mut self) {
366        self.selection = None;
367    }
368
369    /// Get total memory size
370    pub fn memory_size(&self) -> usize {
371        self.columns.iter().map(|(_, c)| c.memory_size()).sum()
372    }
373
374    /// Reset batch for reuse
375    pub fn reset(&mut self) {
376        self.columns.clear();
377        self.row_count = 0;
378        self.selection = None;
379    }
380}
381
382impl Default for VectorBatch {
383    fn default() -> Self {
384        Self::new()
385    }
386}
387
388// =============================================================================
389// Vectorized Scan Engine
390// =============================================================================
391
/// Running counters for vectorized scan activity.
///
/// All counters are atomics updated with relaxed ordering, so a shared
/// reference is enough to record into them.
#[derive(Debug, Default)]
pub struct VectorizedScanStats {
    /// Total rows scanned.
    pub rows_scanned: AtomicUsize,
    /// Number of batches recorded.
    pub batches_processed: AtomicUsize,
    /// Rows that passed the filter.
    pub rows_passed: AtomicUsize,
    /// Bytes read from storage.
    pub bytes_read: AtomicUsize,
}

impl VectorizedScanStats {
    /// Creates a statistics object with every counter at zero.
    pub fn new() -> Self {
        Self::default()
    }

    /// Records one processed batch: adds `rows`, `passed` and `bytes` to
    /// their counters and bumps the batch counter by one.
    pub fn record_batch(&self, rows: usize, passed: usize, bytes: usize) {
        let updates = [
            (&self.rows_scanned, rows),
            (&self.batches_processed, 1),
            (&self.rows_passed, passed),
            (&self.bytes_read, bytes),
        ];
        for (counter, delta) in updates.iter() {
            counter.fetch_add(*delta, Ordering::Relaxed);
        }
    }

    /// Current value of the scanned-rows counter.
    pub fn rows_scanned(&self) -> usize {
        self.rows_scanned.load(Ordering::Relaxed)
    }

    /// Current value of the processed-batches counter.
    pub fn batches_processed(&self) -> usize {
        self.batches_processed.load(Ordering::Relaxed)
    }
}
425
426/// Predicate for vectorized filtering
427pub trait VectorPredicate: Send + Sync {
428    /// Apply predicate to a column vector, returning selection bitmap
429    fn evaluate(&self, column: &ColumnVector) -> Vec<bool>;
430
431    /// Get the column name this predicate operates on
432    fn column_name(&self) -> &str;
433}
434
435/// Comparison predicate for i64 columns
436#[derive(Debug, Clone)]
437pub struct Int64Comparison {
438    column_name: String,
439    op: ComparisonOp,
440    value: i64,
441}
442
443/// Comparison operators
444#[derive(Debug, Clone, Copy)]
445pub enum ComparisonOp {
446    Equal,
447    NotEqual,
448    LessThan,
449    LessEqual,
450    GreaterThan,
451    GreaterEqual,
452}
453
454impl Int64Comparison {
455    pub fn new(column_name: impl Into<String>, op: ComparisonOp, value: i64) -> Self {
456        Self {
457            column_name: column_name.into(),
458            op,
459            value,
460        }
461    }
462
463    pub fn eq(column_name: impl Into<String>, value: i64) -> Self {
464        Self::new(column_name, ComparisonOp::Equal, value)
465    }
466
467    pub fn gt(column_name: impl Into<String>, value: i64) -> Self {
468        Self::new(column_name, ComparisonOp::GreaterThan, value)
469    }
470
471    pub fn lt(column_name: impl Into<String>, value: i64) -> Self {
472        Self::new(column_name, ComparisonOp::LessThan, value)
473    }
474}
475
476impl VectorPredicate for Int64Comparison {
477    fn evaluate(&self, column: &ColumnVector) -> Vec<bool> {
478        match column {
479            ColumnVector::Int64(values) => {
480                let cmp_value = self.value;
481                match self.op {
482                    ComparisonOp::Equal => values.iter().map(|&v| v == cmp_value).collect(),
483                    ComparisonOp::NotEqual => values.iter().map(|&v| v != cmp_value).collect(),
484                    ComparisonOp::LessThan => values.iter().map(|&v| v < cmp_value).collect(),
485                    ComparisonOp::LessEqual => values.iter().map(|&v| v <= cmp_value).collect(),
486                    ComparisonOp::GreaterThan => values.iter().map(|&v| v > cmp_value).collect(),
487                    ComparisonOp::GreaterEqual => values.iter().map(|&v| v >= cmp_value).collect(),
488                }
489            }
490            _ => vec![false; column.len()],
491        }
492    }
493
494    fn column_name(&self) -> &str {
495        &self.column_name
496    }
497}
498
/// Returns, for each value, whether it is strictly greater than `threshold`.
///
/// AVX2 build: four values are compared per vector instruction and any
/// leftover values are compared one by one.
#[cfg(all(target_arch = "x86_64", target_feature = "avx2"))]
pub fn simd_compare_i64_gt(values: &[i64], threshold: i64) -> Vec<bool> {
    use std::arch::x86_64::*;

    let mut flags = vec![false; values.len()];
    let quads = values.chunks_exact(4);
    let tail = quads.remainder();
    let tail_start = values.len() - tail.len();

    // SAFETY: this function is only compiled when AVX2 is statically enabled,
    // and every unaligned load reads exactly the four `i64`s of `quad`.
    unsafe {
        let limit = _mm256_set1_epi64x(threshold);

        for (quad, dst) in quads.zip(flags.chunks_exact_mut(4)) {
            let v = _mm256_loadu_si256(quad.as_ptr() as *const __m256i);
            let byte_mask = _mm256_movemask_epi8(_mm256_cmpgt_epi64(v, limit)) as u32;

            // One mask bit per byte: lane `k` owns bits `8k .. 8k + 7`.
            for (lane, slot) in dst.iter_mut().enumerate() {
                *slot = (byte_mask >> (lane * 8)) & 0xFF != 0;
            }
        }
    }

    for (slot, &v) in flags[tail_start..].iter_mut().zip(tail) {
        *slot = v > threshold;
    }

    flags
}

/// Returns, for each value, whether it is strictly greater than `threshold`.
///
/// Portable build used when AVX2 is not statically enabled.
#[cfg(not(all(target_arch = "x86_64", target_feature = "avx2")))]
pub fn simd_compare_i64_gt(values: &[i64], threshold: i64) -> Vec<bool> {
    let mut flags = Vec::with_capacity(values.len());
    flags.extend(values.iter().map(|&v| v > threshold));
    flags
}
535
536// =============================================================================
537// Vectorized Scan Iterator
538// =============================================================================
539
540/// Configuration for vectorized scans
541#[derive(Debug, Clone)]
542pub struct VectorizedScanConfig {
543    /// Batch size for processing
544    pub batch_size: usize,
545    /// Enable prefetching
546    pub prefetch_enabled: bool,
547    /// Prefetch distance in rows
548    pub prefetch_distance: usize,
549    /// Enable SIMD acceleration
550    pub simd_enabled: bool,
551}
552
553impl Default for VectorizedScanConfig {
554    fn default() -> Self {
555        Self {
556            batch_size: DEFAULT_BATCH_SIZE,
557            prefetch_enabled: true,
558            prefetch_distance: 16,
559            simd_enabled: true,
560        }
561    }
562}
563
564impl VectorizedScanConfig {
565    pub fn new() -> Self {
566        Self::default()
567    }
568
569    pub fn with_batch_size(mut self, size: usize) -> Self {
570        self.batch_size = size.clamp(MIN_BATCH_SIZE, MAX_BATCH_SIZE);
571        self
572    }
573
574    pub fn with_prefetch(mut self, enabled: bool) -> Self {
575        self.prefetch_enabled = enabled;
576        self
577    }
578}
579
580// =============================================================================
581// SIMD Visibility Filtering (Task 4: Zero-Copy SIMD Scans)
582// =============================================================================
583
/// SIMD-accelerated MVCC visibility filter.
///
/// For a snapshot timestamp `S`, a row with commit timestamp `C` is visible
/// when it is committed and was committed before the snapshot:
///
/// ```text
/// visible[i] = (commit_ts[i] != 0) & (commit_ts[i] < snapshot_ts)
/// ```
///
/// Timestamps are compared as unsigned 64-bit values on every code path, so
/// values at or above `2^63` (for example `u64::MAX` used as a "latest"
/// snapshot) behave the same with and without SIMD.
///
/// When compiled for x86_64 with AVX2, four timestamps are evaluated per
/// vector instruction. The original design notes estimate a 4-8x speedup over
/// scalar code; that figure is not measured here.
pub struct SimdVisibilityFilter;

impl SimdVisibilityFilter {
    /// Computes visibility for a batch of commit timestamps.
    ///
    /// Returns one `bool` per timestamp, `true` meaning visible.
    #[inline]
    pub fn filter_batch(commit_ts: &[u64], snapshot_ts: u64) -> Vec<bool> {
        let mut result = vec![false; commit_ts.len()];
        Self::filter_batch_into(commit_ts, snapshot_ts, &mut result);
        result
    }

    /// Computes visibility into a caller-provided buffer, avoiding allocation.
    ///
    /// # Panics
    ///
    /// Panics if `commit_ts` and `out` have different lengths.
    #[inline]
    pub fn filter_batch_into(commit_ts: &[u64], snapshot_ts: u64, out: &mut [bool]) {
        assert_eq!(commit_ts.len(), out.len());

        #[cfg(target_arch = "x86_64")]
        {
            Self::filter_batch_simd_x86(commit_ts, snapshot_ts, out);
        }

        #[cfg(target_arch = "aarch64")]
        {
            Self::filter_batch_simd_neon(commit_ts, snapshot_ts, out);
        }

        #[cfg(not(any(target_arch = "x86_64", target_arch = "aarch64")))]
        {
            Self::filter_batch_scalar(commit_ts, snapshot_ts, out);
        }
    }

    /// Scalar reference implementation; also used for SIMD tails and as the
    /// fallback when no vector path is compiled in.
    #[inline]
    fn filter_batch_scalar(commit_ts: &[u64], snapshot_ts: u64, out: &mut [bool]) {
        for (visible, &ts) in out.iter_mut().zip(commit_ts) {
            // Visible if committed (ts != 0) and committed before the snapshot.
            *visible = ts != 0 && ts < snapshot_ts;
        }
    }

    /// x86_64 implementation: AVX2 (4 × u64 per iteration) when that target
    /// feature is statically enabled, scalar otherwise.
    #[cfg(target_arch = "x86_64")]
    fn filter_batch_simd_x86(commit_ts: &[u64], snapshot_ts: u64, out: &mut [bool]) {
        #[cfg(target_feature = "avx2")]
        {
            use std::arch::x86_64::*;

            let quads = commit_ts.chunks_exact(4);
            let tail = quads.remainder();
            let tail_start = commit_ts.len() - tail.len();

            // SAFETY: this block is only compiled when AVX2 is statically
            // enabled, and every unaligned load reads exactly the four `u64`s
            // of `quad`.
            unsafe {
                let zero = _mm256_setzero_si256();
                // AVX2 only offers a signed 64-bit compare. Flipping the sign
                // bit of both operands maps unsigned order onto signed order.
                let sign_bit = _mm256_set1_epi64x(i64::MIN);
                let snapshot_biased = _mm256_set1_epi64x((snapshot_ts ^ (1u64 << 63)) as i64);

                for (quad, flags) in quads.zip(out.chunks_exact_mut(4)) {
                    let ts = _mm256_loadu_si256(quad.as_ptr() as *const __m256i);

                    let is_zero = _mm256_cmpeq_epi64(ts, zero);
                    let ts_biased = _mm256_xor_si256(ts, sign_bit);
                    // ts < snapshot  <=>  snapshot > ts
                    let before_snapshot = _mm256_cmpgt_epi64(snapshot_biased, ts_biased);

                    // andnot(a, b) = !a & b  =>  (ts != 0) & (ts < snapshot)
                    let visible = _mm256_andnot_si256(is_zero, before_snapshot);

                    // Each 64-bit lane is either all ones or all zeros.
                    let lanes: [i64; 4] = std::mem::transmute(visible);
                    for (flag, lane) in flags.iter_mut().zip(lanes.iter()) {
                        *flag = *lane != 0;
                    }
                }
            }

            Self::filter_batch_scalar(tail, snapshot_ts, &mut out[tail_start..]);
        }

        #[cfg(not(target_feature = "avx2"))]
        {
            Self::filter_batch_scalar(commit_ts, snapshot_ts, out);
        }
    }

    /// aarch64 implementation; currently delegates to the scalar loop.
    #[cfg(target_arch = "aarch64")]
    fn filter_batch_simd_neon(commit_ts: &[u64], snapshot_ts: u64, out: &mut [bool]) {
        Self::filter_batch_scalar(commit_ts, snapshot_ts, out);
    }

    /// Computes visibility including the reader's own writes.
    ///
    /// A row is visible if:
    /// - `commit_ts != 0 && commit_ts < snapshot_ts`, or
    /// - its `txn_id` equals `current_txn_id` (own writes, committed or not).
    ///
    /// # Panics
    ///
    /// Panics if `commit_ts`, `txn_ids` and `out` do not all have the same
    /// length.
    #[inline]
    pub fn filter_batch_with_txn(
        commit_ts: &[u64],
        txn_ids: &[u64],
        snapshot_ts: u64,
        current_txn_id: u64,
        out: &mut [bool],
    ) {
        assert_eq!(commit_ts.len(), txn_ids.len());
        assert_eq!(commit_ts.len(), out.len());

        // First pass: standard snapshot visibility.
        Self::filter_batch_into(commit_ts, snapshot_ts, out);

        // Second pass: rows written by the current transaction are visible.
        for (visible, &txn_id) in out.iter_mut().zip(txn_ids) {
            if txn_id == current_txn_id {
                *visible = true;
            }
        }
    }

    /// Counts visible rows without allocating a result vector.
    #[inline]
    pub fn count_visible(commit_ts: &[u64], snapshot_ts: u64) -> usize {
        commit_ts
            .iter()
            .filter(|&&ts| ts != 0 && ts < snapshot_ts)
            .count()
    }
}
768
/// One version of a key, borrowed rather than copied.
///
/// Carries references to the key and value bytes together with the metadata
/// needed to decide visibility during a scan.
#[derive(Debug, Clone)]
pub struct VersionedSlice<'a> {
    /// Key bytes (borrowed).
    pub key: &'a [u8],
    /// Value bytes (borrowed); `None` marks a tombstone.
    pub value: Option<&'a [u8]>,
    /// Commit timestamp; `0` means uncommitted.
    pub commit_ts: u64,
    /// ID of the transaction that wrote this version.
    pub txn_id: u64,
}

impl VersionedSlice<'_> {
    /// Reports whether this version is visible to a reader.
    ///
    /// It is visible when it was written by `current_txn_id` (if one is
    /// given), or when it is committed with a timestamp strictly below
    /// `snapshot_ts`.
    #[inline]
    pub fn is_visible(&self, snapshot_ts: u64, current_txn_id: Option<u64>) -> bool {
        let written_by_reader = current_txn_id == Some(self.txn_id);
        let committed_before_snapshot = self.commit_ts != 0 && self.commit_ts < snapshot_ts;
        written_by_reader || committed_before_snapshot
    }
}
799
800/// Streaming scan iterator with SIMD visibility filtering
801///
802/// ## Design
803///
804/// Instead of materializing all results upfront, this iterator:
805/// 1. Fetches batches of entries from the underlying source
806/// 2. Applies SIMD visibility filtering to the batch
807/// 3. Yields visible entries one at a time
808///
809/// This reduces memory allocation and leverages SIMD for batch filtering.
810pub struct StreamingScanIterator<'a, I>
811where
812    I: Iterator<Item = VersionedSlice<'a>>,
813{
814    /// Underlying iterator
815    source: I,
816    /// Current batch of entries
817    batch: Vec<VersionedSlice<'a>>,
818    /// Visibility mask for current batch
819    visibility: Vec<bool>,
820    /// Current position in batch
821    pos: usize,
822    /// Snapshot timestamp for visibility
823    snapshot_ts: u64,
824    /// Current transaction ID for self-visibility
825    current_txn_id: Option<u64>,
826    /// Batch size for prefetching
827    batch_size: usize,
828}
829
830impl<'a, I> StreamingScanIterator<'a, I>
831where
832    I: Iterator<Item = VersionedSlice<'a>>,
833{
834    /// Create a new streaming scan iterator
835    pub fn new(source: I, snapshot_ts: u64, current_txn_id: Option<u64>) -> Self {
836        Self::with_batch_size(source, snapshot_ts, current_txn_id, DEFAULT_BATCH_SIZE)
837    }
838
839    /// Create with custom batch size
840    pub fn with_batch_size(
841        source: I,
842        snapshot_ts: u64,
843        current_txn_id: Option<u64>,
844        batch_size: usize,
845    ) -> Self {
846        Self {
847            source,
848            batch: Vec::with_capacity(batch_size),
849            visibility: Vec::with_capacity(batch_size),
850            pos: 0,
851            snapshot_ts,
852            current_txn_id,
853            batch_size,
854        }
855    }
856
857    /// Fetch next batch and compute visibility
858    fn fetch_batch(&mut self) -> bool {
859        self.batch.clear();
860        self.visibility.clear();
861        self.pos = 0;
862
863        // Collect batch
864        for entry in self.source.by_ref().take(self.batch_size) {
865            self.batch.push(entry);
866        }
867
868        if self.batch.is_empty() {
869            return false;
870        }
871
872        // Extract commit timestamps for SIMD filtering
873        let commit_ts: Vec<u64> = self.batch.iter().map(|e| e.commit_ts).collect();
874        self.visibility.resize(self.batch.len(), false);
875
876        if let Some(txn_id) = self.current_txn_id {
877            let txn_ids: Vec<u64> = self.batch.iter().map(|e| e.txn_id).collect();
878            SimdVisibilityFilter::filter_batch_with_txn(
879                &commit_ts,
880                &txn_ids,
881                self.snapshot_ts,
882                txn_id,
883                &mut self.visibility,
884            );
885        } else {
886            SimdVisibilityFilter::filter_batch_into(
887                &commit_ts,
888                self.snapshot_ts,
889                &mut self.visibility,
890            );
891        }
892
893        true
894    }
895}
896
897impl<'a, I> Iterator for StreamingScanIterator<'a, I>
898where
899    I: Iterator<Item = VersionedSlice<'a>>,
900{
901    type Item = VersionedSlice<'a>;
902
903    fn next(&mut self) -> Option<Self::Item> {
904        loop {
905            // Exhausted current batch?
906            while self.pos >= self.batch.len() {
907                if !self.fetch_batch() {
908                    return None;
909                }
910            }
911
912            // Find next visible entry in current batch
913            while self.pos < self.batch.len() {
914                let idx = self.pos;
915                self.pos += 1;
916
917                if self.visibility[idx] {
918                    return Some(self.batch[idx].clone());
919                }
920            }
921        }
922    }
923}
924
925// =============================================================================
926// SoA Batch + Late Materialization (80/20 Optimization)
927// =============================================================================
928
929/// Structure of Arrays (SoA) batch for optimal SIMD visibility filtering
930///
931/// ## Why SoA?
932///
933/// AoS (Array of Structures) layout:
934/// ```text
935/// [key, value, commit_ts, txn_id], [key, value, commit_ts, txn_id], ...
936/// ```
937/// - Poor cache utilization for visibility checks
938/// - SIMD loads scatter data across cache lines
939///
940/// SoA (Structure of Arrays) layout:
941/// ```text
942/// commit_ts: [ts1, ts2, ts3, ts4, ...]  // Contiguous for SIMD
943/// txn_ids:   [id1, id2, id3, id4, ...]  // Contiguous for SIMD
944/// keys:      [k1, k2, k3, k4, ...]      // Only accessed for visible rows
945/// values:    [v1, v2, v3, v4, ...]      // Late materialized
946/// ```
947///
948/// SIMD can process 4-8 timestamps per cycle with perfect cache utilization.
949///
950/// ## Late Materialization
951///
952/// Values are NOT copied into the batch. Instead, we store offsets/handles
953/// and only materialize values for rows that pass visibility filtering.
954///
955/// For scans where 90% of rows are filtered out, this saves ~90% of value copies.
956#[derive(Debug)]
957pub struct SoaBatch<'a> {
958    /// Contiguous commit timestamps for SIMD visibility filtering
959    pub commit_ts: Vec<u64>,
960    /// Contiguous transaction IDs for self-visibility checking
961    pub txn_ids: Vec<u64>,
962    /// Key references (zero-copy)
963    pub keys: Vec<&'a [u8]>,
964    /// Value materializer handles (late binding)
965    /// None = tombstone, Some = handle to materialize value
966    pub value_handles: Vec<Option<ValueHandle<'a>>>,
967    /// Pre-computed visibility mask (after SIMD filtering)
968    pub visibility: Vec<bool>,
969    /// Selection vector: indices of visible rows for late materialization
970    pub selection: Vec<usize>,
971}
972
/// Deferred reference to a row's value.
///
/// A handle is cheap to copy and is stored in place of the value itself, so
/// the value is only produced when a caller asks for it via
/// [`ValueHandle::materialize`].
#[derive(Debug, Clone, Copy)]
pub enum ValueHandle<'a> {
    /// Borrowed bytes that are already in memory (zero-copy).
    Direct(&'a [u8]),
    /// Location of the value inside a data block (disk-resident data).
    BlockOffset {
        block_id: u32,
        offset: u32,
        len: u32,
    },
    /// Slot inside an arena from which the value is loaded later.
    ArenaSlot { arena_id: u32, slot: u32 },
}

impl<'a> ValueHandle<'a> {
    /// Resolves the handle to the value bytes, if that is possible here.
    ///
    /// Returns `Some(bytes)` for [`ValueHandle::Direct`]. The block and
    /// arena variants return `None`: resolving them needs a storage context
    /// that this method does not have yet.
    pub fn materialize(&self) -> Option<&'a [u8]> {
        match *self {
            ValueHandle::Direct(bytes) => Some(bytes),
            // Placeholder until a storage-layer lookup is wired in.
            ValueHandle::BlockOffset { .. } | ValueHandle::ArenaSlot { .. } => None,
        }
    }
}
1003
1004impl<'a> SoaBatch<'a> {
1005    /// Create a new SoA batch with capacity
1006    pub fn with_capacity(capacity: usize) -> Self {
1007        Self {
1008            commit_ts: Vec::with_capacity(capacity),
1009            txn_ids: Vec::with_capacity(capacity),
1010            keys: Vec::with_capacity(capacity),
1011            value_handles: Vec::with_capacity(capacity),
1012            visibility: Vec::with_capacity(capacity),
1013            selection: Vec::with_capacity(capacity),
1014        }
1015    }
1016
1017    /// Add an entry to the batch (SoA decomposition)
1018    #[inline]
1019    pub fn push(&mut self, key: &'a [u8], value: Option<&'a [u8]>, commit_ts: u64, txn_id: u64) {
1020        self.commit_ts.push(commit_ts);
1021        self.txn_ids.push(txn_id);
1022        self.keys.push(key);
1023        self.value_handles.push(value.map(ValueHandle::Direct));
1024    }
1025
1026    /// Add with block handle (for disk-resident values)
1027    #[inline]
1028    pub fn push_deferred(
1029        &mut self,
1030        key: &'a [u8],
1031        handle: Option<ValueHandle<'a>>,
1032        commit_ts: u64,
1033        txn_id: u64,
1034    ) {
1035        self.commit_ts.push(commit_ts);
1036        self.txn_ids.push(txn_id);
1037        self.keys.push(key);
1038        self.value_handles.push(handle);
1039    }
1040
1041    /// Get batch size
1042    pub fn len(&self) -> usize {
1043        self.commit_ts.len()
1044    }
1045
1046    /// Check if empty
1047    pub fn is_empty(&self) -> bool {
1048        self.commit_ts.is_empty()
1049    }
1050
1051    /// Clear the batch for reuse
1052    pub fn clear(&mut self) {
1053        self.commit_ts.clear();
1054        self.txn_ids.clear();
1055        self.keys.clear();
1056        self.value_handles.clear();
1057        self.visibility.clear();
1058        self.selection.clear();
1059    }
1060
1061    /// Apply SIMD visibility filtering and build selection vector
1062    ///
1063    /// This is the hot path - SIMD processes commit_ts array directly
1064    /// without touching keys/values until we know what's visible.
1065    pub fn filter_visibility(&mut self, snapshot_ts: u64, current_txn_id: Option<u64>) {
1066        let n = self.len();
1067        self.visibility.resize(n, false);
1068        self.selection.clear();
1069
1070        // SIMD filter on contiguous commit_ts array
1071        if let Some(txn_id) = current_txn_id {
1072            SimdVisibilityFilter::filter_batch_with_txn(
1073                &self.commit_ts,
1074                &self.txn_ids,
1075                snapshot_ts,
1076                txn_id,
1077                &mut self.visibility,
1078            );
1079        } else {
1080            SimdVisibilityFilter::filter_batch_into(
1081                &self.commit_ts,
1082                snapshot_ts,
1083                &mut self.visibility,
1084            );
1085        }
1086
1087        // Build selection vector (indices of visible rows)
1088        for (i, &visible) in self.visibility.iter().enumerate() {
1089            if visible {
1090                self.selection.push(i);
1091            }
1092        }
1093    }
1094
1095    /// Get visible row count (after filtering)
1096    pub fn visible_count(&self) -> usize {
1097        self.selection.len()
1098    }
1099
1100    /// Iterate over visible rows with late materialization
1101    ///
1102    /// Values are only materialized for rows in the selection vector.
1103    pub fn iter_visible(&self) -> impl Iterator<Item = (&'a [u8], Option<&'a [u8]>)> + '_ {
1104        self.selection.iter().map(move |&idx| {
1105            let key = self.keys[idx];
1106            let value = self.value_handles[idx].and_then(|h| h.materialize());
1107            (key, value)
1108        })
1109    }
1110
1111    /// Iterate visible rows with full metadata
1112    pub fn iter_visible_full(
1113        &self,
1114    ) -> impl Iterator<Item = (&'a [u8], Option<&'a [u8]>, u64, u64)> + '_ {
1115        self.selection.iter().map(move |&idx| {
1116            let key = self.keys[idx];
1117            let value = self.value_handles[idx].and_then(|h| h.materialize());
1118            let ts = self.commit_ts[idx];
1119            let txn = self.txn_ids[idx];
1120            (key, value, ts, txn)
1121        })
1122    }
1123}
1124
1125/// High-performance SoA scan iterator with SIMD + late materialization
1126///
1127/// ## Performance Characteristics
1128///
1129/// | Phase              | Cache Behavior           | SIMD Usage    |
1130/// |--------------------|--------------------------|---------------|
1131/// | Load batch         | Sequential (SoA arrays)  | N/A           |
1132/// | Visibility filter  | L1-hot (commit_ts only)  | 4-8× speedup  |
1133/// | Build selection    | Sequential (visibility)  | Auto-vectorized |
1134/// | Materialize values | Random (visible only)    | N/A           |
1135///
1136/// For 1M rows with 10% selectivity:
1137/// - Old: Process 1M rows with random access = ~50ms
1138/// - New: SIMD filter 1M × 8 bytes = ~1ms, materialize 100K values = ~5ms
1139/// - **~8× speedup** from SoA + SIMD + late materialization
1140pub struct SoaScanIterator<'a, S>
1141where
1142    S: SoaSource<'a>,
1143{
1144    /// Source that provides SoA batches
1145    source: S,
1146    /// Current batch
1147    batch: SoaBatch<'a>,
1148    /// Position in selection vector
1149    pos: usize,
1150    /// Snapshot timestamp
1151    snapshot_ts: u64,
1152    /// Current transaction ID
1153    current_txn_id: Option<u64>,
1154    /// Batch size
1155    #[allow(dead_code)]
1156    batch_size: usize,
1157    /// Statistics
1158    stats: SoaScanStats,
1159}
1160
/// Counters describing how much work an SoA scan has done so far.
#[derive(Debug, Default, Clone)]
pub struct SoaScanStats {
    /// Rows pulled from the source, visible or not.
    pub rows_scanned: usize,
    /// Rows that survived the visibility filter.
    pub rows_visible: usize,
    /// Rows whose value handle was resolved.
    pub values_materialized: usize,
    /// Batches pulled from the source.
    pub batches_processed: usize,
}

impl SoaScanStats {
    /// Fraction of scanned rows that were visible.
    ///
    /// Returns `0.0` when nothing has been scanned yet.
    pub fn selectivity(&self) -> f64 {
        match self.rows_scanned {
            0 => 0.0,
            scanned => self.rows_visible as f64 / scanned as f64,
        }
    }

    /// Fraction of visible rows whose value was materialized.
    ///
    /// `1.0` means every visible row was materialized; lower values mean
    /// some were skipped. Returns `1.0` when there are no visible rows.
    pub fn materialization_efficiency(&self) -> f64 {
        if self.rows_visible == 0 {
            return 1.0;
        }
        self.values_materialized as f64 / self.rows_visible as f64
    }
}
1194
1195/// Trait for sources that provide SoA batches
1196pub trait SoaSource<'a> {
1197    /// Fill a batch with entries from the source
1198    /// Returns false if source is exhausted
1199    fn fill_batch(&mut self, batch: &mut SoaBatch<'a>) -> bool;
1200}
1201
1202impl<'a, S> SoaScanIterator<'a, S>
1203where
1204    S: SoaSource<'a>,
1205{
1206    /// Create new SoA scan iterator
1207    pub fn new(source: S, snapshot_ts: u64, current_txn_id: Option<u64>) -> Self {
1208        Self::with_batch_size(source, snapshot_ts, current_txn_id, DEFAULT_BATCH_SIZE)
1209    }
1210
1211    /// Create with custom batch size
1212    pub fn with_batch_size(
1213        source: S,
1214        snapshot_ts: u64,
1215        current_txn_id: Option<u64>,
1216        batch_size: usize,
1217    ) -> Self {
1218        Self {
1219            source,
1220            batch: SoaBatch::with_capacity(batch_size),
1221            pos: 0,
1222            snapshot_ts,
1223            current_txn_id,
1224            batch_size,
1225            stats: SoaScanStats::default(),
1226        }
1227    }
1228
1229    /// Fetch next batch with visibility filtering
1230    fn fetch_batch(&mut self) -> bool {
1231        self.batch.clear();
1232        self.pos = 0;
1233
1234        // Fill batch from source (SoA format)
1235        if !self.source.fill_batch(&mut self.batch) {
1236            return false;
1237        }
1238
1239        self.stats.rows_scanned += self.batch.len();
1240        self.stats.batches_processed += 1;
1241
1242        // SIMD visibility filtering on contiguous arrays
1243        self.batch
1244            .filter_visibility(self.snapshot_ts, self.current_txn_id);
1245        self.stats.rows_visible += self.batch.visible_count();
1246
1247        true
1248    }
1249
1250    /// Get scan statistics
1251    pub fn stats(&self) -> &SoaScanStats {
1252        &self.stats
1253    }
1254}
1255
1256impl<'a, S> Iterator for SoaScanIterator<'a, S>
1257where
1258    S: SoaSource<'a>,
1259{
1260    type Item = (&'a [u8], Option<&'a [u8]>);
1261
1262    fn next(&mut self) -> Option<Self::Item> {
1263        // Refill batches until we have a visible row (or the source is drained).
1264        while self.pos >= self.batch.selection.len() {
1265            if !self.fetch_batch() {
1266                return None;
1267            }
1268        }
1269
1270        // Get next visible row (late materialization)
1271        let sel_idx = self.pos;
1272        self.pos += 1;
1273        let row_idx = self.batch.selection[sel_idx];
1274
1275        let key = self.batch.keys[row_idx];
1276        let value = self.batch.value_handles[row_idx].and_then(|h| h.materialize());
1277        self.stats.values_materialized += 1;
1278
1279        Some((key, value))
1280    }
1281}
1282
#[cfg(test)]
mod tests {
    use super::*;

    /// Row shape used by the mock SoA source: key, value, commit_ts, txn_id.
    type TestRow = (&'static [u8], Option<&'static [u8]>, u64, u64);

    /// In-memory `SoaSource` that hands out its rows `chunk` at a time and
    /// returns `false` only from a call that appended nothing.
    struct VecSource {
        rows: Vec<TestRow>,
        pos: usize,
        chunk: usize,
    }

    impl SoaSource<'static> for VecSource {
        fn fill_batch(&mut self, batch: &mut SoaBatch<'static>) -> bool {
            let end = (self.pos + self.chunk).min(self.rows.len());
            for &(key, value, commit_ts, txn_id) in &self.rows[self.pos..end] {
                batch.push(key, value, commit_ts, txn_id);
            }
            let appended = end > self.pos;
            self.pos = end;
            appended
        }
    }

    #[test]
    fn test_column_vector_int64() {
        let v = ColumnVector::Int64(vec![1, 2, 3, 4, 5]);
        assert_eq!(v.len(), 5);
        assert_eq!(v.sum_i64(), Some(15));
    }

    #[test]
    fn test_column_vector_float64() {
        let v = ColumnVector::Float64(vec![1.0, 2.0, 3.0, 4.0]);
        assert_eq!(v.len(), 4);
        assert_eq!(v.sum_f64(), Some(10.0));
    }

    #[test]
    fn test_vector_batch() {
        let mut batch = VectorBatch::with_capacity(1024);
        batch.add_column("id", ColumnVector::Int64(vec![1, 2, 3]));
        batch.add_column("value", ColumnVector::Float64(vec![1.5, 2.5, 3.5]));

        assert_eq!(batch.row_count(), 3);
        assert_eq!(batch.column_count(), 2);
        assert!(batch.column("id").is_some());
    }

    #[test]
    fn test_int64_comparison() {
        let col = ColumnVector::Int64(vec![1, 5, 10, 15, 20]);
        let pred = Int64Comparison::gt("test", 10);
        let result = pred.evaluate(&col);

        assert_eq!(result, vec![false, false, false, true, true]);
    }

    #[test]
    fn test_simd_sum_i64_large() {
        // Test with enough elements to trigger SIMD path
        let values: Vec<i64> = (0..1000).collect();
        let expected: i64 = (0..1000).sum();

        let col = ColumnVector::Int64(values);
        assert_eq!(col.sum_i64(), Some(expected));
    }

    #[test]
    fn test_simd_compare_gt() {
        let values: Vec<i64> = vec![1, 5, 10, 15, 20, 25, 30, 35];
        let result = simd_compare_i64_gt(&values, 12);
        assert_eq!(
            result,
            vec![false, false, false, true, true, true, true, true]
        );
    }

    #[test]
    fn test_vectorized_scan_config() {
        let config = VectorizedScanConfig::new()
            .with_batch_size(2048)
            .with_prefetch(true);

        assert_eq!(config.batch_size, 2048);
        assert!(config.prefetch_enabled);
    }

    #[test]
    fn test_simd_visibility_filter_basic() {
        // commit_ts: 0 = uncommitted, others = commit time
        let commit_ts = vec![0, 10, 20, 30, 40];
        let snapshot_ts = 25;

        let result = SimdVisibilityFilter::filter_batch(&commit_ts, snapshot_ts);

        // Visible: ts != 0 AND ts < 25
        // ts=0: not visible (uncommitted)
        // ts=10: visible (10 < 25)
        // ts=20: visible (20 < 25)
        // ts=30: not visible (30 >= 25)
        // ts=40: not visible (40 >= 25)
        assert_eq!(result, vec![false, true, true, false, false]);
    }

    #[test]
    fn test_simd_visibility_filter_with_txn() {
        let commit_ts = vec![0, 10, 0, 30, 40];
        let txn_ids = vec![1, 2, 1, 4, 5]; // txn_id 1 appears twice
        let snapshot_ts = 25;
        let current_txn_id = 1;

        let mut result = vec![false; 5];
        SimdVisibilityFilter::filter_batch_with_txn(
            &commit_ts,
            &txn_ids,
            snapshot_ts,
            current_txn_id,
            &mut result,
        );

        // Visible:
        // [0]: uncommitted but own txn -> visible
        // [1]: committed at 10 < 25 -> visible
        // [2]: uncommitted but own txn -> visible
        // [3]: committed at 30 >= 25 -> not visible
        // [4]: committed at 40 >= 25 -> not visible
        assert_eq!(result, vec![true, true, true, false, false]);
    }

    #[test]
    fn test_simd_visibility_filter_large() {
        // Test with enough elements to trigger SIMD path
        let n = 1000;
        let commit_ts: Vec<u64> = (1..=n as u64).collect();
        let snapshot_ts = 500;

        let result = SimdVisibilityFilter::filter_batch(&commit_ts, snapshot_ts);

        // First 499 should be visible (1..500 < 500)
        let visible_count = result.iter().filter(|&&v| v).count();
        assert_eq!(visible_count, 499);
    }

    #[test]
    fn test_versioned_slice_visibility() {
        let slice = VersionedSlice {
            key: b"test",
            value: Some(b"value"),
            commit_ts: 100,
            txn_id: 1,
        };

        assert!(slice.is_visible(200, None));
        assert!(!slice.is_visible(50, None));
        assert!(slice.is_visible(50, Some(1))); // Self-visibility
    }

    #[test]
    fn test_streaming_scan_iterator() {
        let entries: Vec<VersionedSlice<'static>> = vec![
            VersionedSlice {
                key: b"a",
                value: Some(b"1"),
                commit_ts: 10,
                txn_id: 1,
            },
            VersionedSlice {
                key: b"b",
                value: Some(b"2"),
                commit_ts: 0,
                txn_id: 2,
            }, // Uncommitted
            VersionedSlice {
                key: b"c",
                value: Some(b"3"),
                commit_ts: 30,
                txn_id: 3,
            }, // After snapshot
            VersionedSlice {
                key: b"d",
                value: Some(b"4"),
                commit_ts: 15,
                txn_id: 4,
            },
        ];

        let iter = StreamingScanIterator::new(entries.into_iter(), 25, None);
        let visible: Vec<_> = iter.collect();

        // Only entries with commit_ts 10 and 15 should be visible
        assert_eq!(visible.len(), 2);
        assert_eq!(visible[0].key, b"a");
        assert_eq!(visible[1].key, b"d");
    }

    #[test]
    fn test_soa_batch_basic() {
        let mut batch = SoaBatch::with_capacity(100);

        batch.push(b"key1", Some(b"value1"), 10, 1);
        batch.push(b"key2", Some(b"value2"), 20, 2);
        batch.push(b"key3", None, 30, 3); // Tombstone
        batch.push(b"key4", Some(b"value4"), 0, 4); // Uncommitted

        assert_eq!(batch.len(), 4);
        assert_eq!(batch.commit_ts, vec![10, 20, 30, 0]);
        assert_eq!(batch.txn_ids, vec![1, 2, 3, 4]);
    }

    #[test]
    fn test_soa_batch_visibility_filter() {
        let mut batch = SoaBatch::with_capacity(100);

        batch.push(b"k1", Some(b"v1"), 10, 1); // Visible (10 < 25)
        batch.push(b"k2", Some(b"v2"), 0, 2); // Not visible (uncommitted)
        batch.push(b"k3", Some(b"v3"), 20, 3); // Visible (20 < 25)
        batch.push(b"k4", Some(b"v4"), 30, 4); // Not visible (30 >= 25)
        batch.push(b"k5", Some(b"v5"), 0, 5); // Not visible (uncommitted)

        batch.filter_visibility(25, None);

        assert_eq!(batch.visibility, vec![true, false, true, false, false]);
        assert_eq!(batch.selection, vec![0, 2]); // Indices of visible rows
        assert_eq!(batch.visible_count(), 2);
    }

    #[test]
    fn test_soa_batch_self_visibility() {
        let mut batch = SoaBatch::with_capacity(100);

        batch.push(b"k1", Some(b"v1"), 0, 42); // Own uncommitted -> visible
        batch.push(b"k2", Some(b"v2"), 10, 1); // Committed -> visible
        batch.push(b"k3", Some(b"v3"), 0, 99); // Other's uncommitted -> not visible

        batch.filter_visibility(25, Some(42));

        assert_eq!(batch.visibility, vec![true, true, false]);
        assert_eq!(batch.selection, vec![0, 1]);
    }

    #[test]
    fn test_soa_batch_late_materialization() {
        let mut batch = SoaBatch::with_capacity(100);

        batch.push(b"key1", Some(b"val1"), 10, 1);
        batch.push(b"key2", Some(b"val2"), 0, 2); // Filtered out
        batch.push(b"key3", Some(b"val3"), 15, 3);

        batch.filter_visibility(25, None);

        // Iterate visible - values materialized only now
        let visible: Vec<_> = batch.iter_visible().collect();

        assert_eq!(visible.len(), 2);
        assert_eq!(visible[0], (b"key1".as_slice(), Some(b"val1".as_slice())));
        assert_eq!(visible[1], (b"key3".as_slice(), Some(b"val3".as_slice())));
    }

    #[test]
    fn test_soa_scan_stats() {
        let mut batch = SoaBatch::with_capacity(100);

        // 10 rows, 3 visible
        for i in 0..10u64 {
            let ts = if i < 3 { 10 } else { 0 }; // First 3 committed, rest uncommitted
            batch.push(b"key", Some(b"val"), ts, i);
        }

        batch.filter_visibility(25, None);

        let selectivity = batch.visible_count() as f64 / batch.len() as f64;
        assert!((selectivity - 0.3).abs() < 0.01); // 30% selectivity

        // The stats type must report the same ratio for the same counters.
        let stats = SoaScanStats {
            rows_scanned: batch.len(),
            rows_visible: batch.visible_count(),
            values_materialized: batch.iter_visible().count(),
            batches_processed: 1,
        };
        assert!((stats.selectivity() - 0.3).abs() < 0.01);
        assert!((stats.materialization_efficiency() - 1.0).abs() < f64::EPSILON);

        // Zero-row edge cases are defined rather than NaN.
        let empty = SoaScanStats::default();
        assert!(empty.selectivity().abs() < f64::EPSILON);
        assert!((empty.materialization_efficiency() - 1.0).abs() < f64::EPSILON);
    }

    #[test]
    fn test_soa_scan_iterator() {
        let source = VecSource {
            rows: vec![
                (b"a".as_slice(), Some(b"1".as_slice()), 10, 1),
                (b"b".as_slice(), Some(b"2".as_slice()), 0, 2), // Uncommitted
                (b"c".as_slice(), Some(b"3".as_slice()), 30, 3), // After snapshot
                (b"d".as_slice(), None, 15, 4),                 // Visible tombstone
            ],
            pos: 0,
            chunk: 2, // Force the scan to cross a batch boundary
        };

        let mut iter = SoaScanIterator::new(source, 25, None);
        let rows: Vec<_> = iter.by_ref().collect();

        assert_eq!(
            rows,
            vec![
                (b"a".as_slice(), Some(b"1".as_slice())),
                (b"d".as_slice(), None),
            ]
        );

        let stats = iter.stats();
        assert_eq!(stats.rows_scanned, 4);
        assert_eq!(stats.rows_visible, 2);
        assert_eq!(stats.values_materialized, 2);
        assert_eq!(stats.batches_processed, 2);
    }

    #[test]
    fn test_soa_batch_simd_large() {
        // Test with enough entries to trigger SIMD paths
        let mut batch = SoaBatch::with_capacity(2000);

        for i in 0..1000u64 {
            // Alternating visible/not visible
            let ts = if i % 2 == 0 { 10 } else { 50 };
            batch.push(b"k", Some(b"v"), ts, i);
        }

        batch.filter_visibility(25, None);

        // Should have 500 visible (even indices with ts=10)
        assert_eq!(batch.visible_count(), 500);

        // Verify selection indices are correct
        for (i, &idx) in batch.selection.iter().enumerate() {
            assert_eq!(idx, i * 2); // 0, 2, 4, 6, ...
        }
    }
}