1use std::sync::atomic::{AtomicUsize, Ordering};
65
/// Number of rows a vectorized batch is sized for when no explicit size is given.
pub const DEFAULT_BATCH_SIZE: usize = 1 << 10;

/// Lower bound applied by `VectorizedScanConfig::with_batch_size`.
pub const MIN_BATCH_SIZE: usize = 1 << 6;

/// Upper bound applied by `VectorizedScanConfig::with_batch_size`.
pub const MAX_BATCH_SIZE: usize = 1 << 13;
75
/// One column of values stored contiguously, with one variant per physical type.
///
/// The variable-length variants (`String`, `Binary`) keep all payload bytes in a
/// single `data` buffer addressed through `offsets`; `ColumnVector::len` treats
/// `offsets` as holding one more entry than there are rows.
#[derive(Debug, Clone)]
pub enum ColumnVector {
    /// One `bool` per row.
    Bool(Vec<bool>),
    /// Signed 64-bit integers.
    Int64(Vec<i64>),
    /// Unsigned 64-bit integers.
    UInt64(Vec<u64>),
    /// 64-bit floating point values.
    Float64(Vec<f64>),
    /// Text column; row `i` is assumed to span `data[offsets[i]..offsets[i + 1]]`
    /// (encoding is not validated here).
    String {
        offsets: Vec<u32>,
        data: Vec<u8>,
    },
    /// Arbitrary byte strings, laid out like `String`.
    Binary {
        offsets: Vec<u32>,
        data: Vec<u8>,
    },
    /// Packed 64-bit words; `ColumnVector::len` counts 64 rows per word
    /// (presumably a null bitmap — TODO confirm).
    Null(Vec<u64>),
}
100
101impl ColumnVector {
102 pub fn new_int64(capacity: usize) -> Self {
104 ColumnVector::Int64(Vec::with_capacity(capacity))
105 }
106
107 pub fn new_float64(capacity: usize) -> Self {
108 ColumnVector::Float64(Vec::with_capacity(capacity))
109 }
110
111 pub fn new_string(capacity: usize) -> Self {
112 ColumnVector::String {
113 offsets: Vec::with_capacity(capacity + 1),
114 data: Vec::with_capacity(capacity * 32), }
116 }
117
118 pub fn len(&self) -> usize {
120 match self {
121 ColumnVector::Bool(v) => v.len(),
122 ColumnVector::Int64(v) => v.len(),
123 ColumnVector::UInt64(v) => v.len(),
124 ColumnVector::Float64(v) => v.len(),
125 ColumnVector::String { offsets, .. } => offsets.len().saturating_sub(1),
126 ColumnVector::Binary { offsets, .. } => offsets.len().saturating_sub(1),
127 ColumnVector::Null(v) => v.len() * 64,
128 }
129 }
130
131 pub fn is_empty(&self) -> bool {
133 self.len() == 0
134 }
135
136 pub fn memory_size(&self) -> usize {
138 match self {
139 ColumnVector::Bool(v) => v.len(),
140 ColumnVector::Int64(v) => v.len() * 8,
141 ColumnVector::UInt64(v) => v.len() * 8,
142 ColumnVector::Float64(v) => v.len() * 8,
143 ColumnVector::String { offsets, data } => offsets.len() * 4 + data.len(),
144 ColumnVector::Binary { offsets, data } => offsets.len() * 4 + data.len(),
145 ColumnVector::Null(v) => v.len() * 8,
146 }
147 }
148
149 #[cfg(target_arch = "x86_64")]
151 pub fn sum_i64(&self) -> Option<i64> {
152 match self {
153 ColumnVector::Int64(values) => {
154 if values.is_empty() {
155 return Some(0);
156 }
157
158 if values.len() >= 16 {
160 Some(simd_sum_i64(values))
161 } else {
162 Some(values.iter().sum())
163 }
164 }
165 _ => None,
166 }
167 }
168
169 #[cfg(not(target_arch = "x86_64"))]
170 pub fn sum_i64(&self) -> Option<i64> {
171 match self {
172 ColumnVector::Int64(values) => Some(values.iter().sum()),
173 _ => None,
174 }
175 }
176
177 #[cfg(target_arch = "x86_64")]
179 pub fn sum_f64(&self) -> Option<f64> {
180 match self {
181 ColumnVector::Float64(values) => {
182 if values.is_empty() {
183 return Some(0.0);
184 }
185
186 if values.len() >= 8 {
187 Some(simd_sum_f64(values))
188 } else {
189 Some(values.iter().sum())
190 }
191 }
192 _ => None,
193 }
194 }
195
196 #[cfg(not(target_arch = "x86_64"))]
197 pub fn sum_f64(&self) -> Option<f64> {
198 match self {
199 ColumnVector::Float64(values) => Some(values.iter().sum()),
200 _ => None,
201 }
202 }
203}
204
/// Wrapping sum of `values`, using AVX2 when it is enabled at compile time.
///
/// Every addition wraps (two's complement): the per-lane AVX2 adds, the
/// horizontal reduction, the scalar tail and the non-AVX2 fallback. The result
/// is therefore the true sum modulo 2^64 and never panics, whatever the build
/// profile or target features.
#[cfg(target_arch = "x86_64")]
fn simd_sum_i64(values: &[i64]) -> i64 {
    #[cfg(target_feature = "avx2")]
    {
        use std::arch::x86_64::*;

        let quads = values.chunks_exact(4);
        let tail = quads.remainder();

        // SAFETY: AVX2 is statically enabled for this build (see the cfg above),
        // and every unaligned load reads exactly the four i64 of a 4-element chunk.
        let lanes: [i64; 4] = unsafe {
            let mut acc = _mm256_setzero_si256();
            for quad in quads {
                acc = _mm256_add_epi64(acc, _mm256_loadu_si256(quad.as_ptr() as *const __m256i));
            }
            std::mem::transmute(acc)
        };

        // Reduce the lanes and the leftover values with the same wrapping rule.
        lanes
            .iter()
            .chain(tail)
            .fold(0i64, |acc, &v| acc.wrapping_add(v))
    }

    #[cfg(not(target_feature = "avx2"))]
    {
        values.iter().fold(0i64, |acc, &v| acc.wrapping_add(v))
    }
}
241
/// Sums `values`, using 256-bit AVX adds when AVX is enabled at compile time.
///
/// The AVX path keeps four running lane sums, reduces them, then adds the
/// sequential sum of the trailing `len % 4` values. Its rounding can therefore
/// differ slightly from a strictly left-to-right sum.
#[cfg(target_arch = "x86_64")]
fn simd_sum_f64(values: &[f64]) -> f64 {
    #[cfg(not(target_feature = "avx"))]
    {
        values.iter().sum()
    }

    #[cfg(target_feature = "avx")]
    {
        use std::arch::x86_64::*;

        let quads = values.chunks_exact(4);
        let tail = quads.remainder();

        // SAFETY: AVX is statically enabled for this build (see the cfg above),
        // and every unaligned load reads exactly the four f64 of a 4-element chunk.
        let lanes: [f64; 4] = unsafe {
            let mut acc = _mm256_setzero_pd();
            for quad in quads {
                acc = _mm256_add_pd(acc, _mm256_loadu_pd(quad.as_ptr()));
            }
            std::mem::transmute(acc)
        };

        let lane_total: f64 = lanes.iter().sum();
        let tail_total: f64 = tail.iter().sum();
        lane_total + tail_total
    }
}
274
275#[derive(Debug)]
285pub struct VectorBatch {
286 columns: Vec<(String, ColumnVector)>,
288 row_count: usize,
290 capacity: usize,
292 selection: Option<Vec<usize>>,
294}
295
296impl VectorBatch {
297 pub fn with_capacity(capacity: usize) -> Self {
299 Self {
300 columns: Vec::new(),
301 row_count: 0,
302 capacity,
303 selection: None,
304 }
305 }
306
307 pub fn new() -> Self {
309 Self::with_capacity(DEFAULT_BATCH_SIZE)
310 }
311
312 pub fn capacity(&self) -> usize {
314 self.capacity
315 }
316
317 pub fn row_count(&self) -> usize {
319 if let Some(ref sel) = self.selection {
320 sel.len()
321 } else {
322 self.row_count
323 }
324 }
325
326 pub fn is_full(&self) -> bool {
328 self.row_count >= self.capacity
329 }
330
331 pub fn is_empty(&self) -> bool {
333 self.row_count == 0
334 }
335
336 pub fn add_column(&mut self, name: impl Into<String>, column: ColumnVector) {
338 self.columns.push((name.into(), column));
339 if self.row_count == 0 {
340 self.row_count = self.columns.last().map(|(_, c)| c.len()).unwrap_or(0);
341 }
342 }
343
344 pub fn column(&self, name: &str) -> Option<&ColumnVector> {
346 self.columns.iter().find(|(n, _)| n == name).map(|(_, c)| c)
347 }
348
349 pub fn column_at(&self, idx: usize) -> Option<&ColumnVector> {
351 self.columns.get(idx).map(|(_, c)| c)
352 }
353
354 pub fn column_count(&self) -> usize {
356 self.columns.len()
357 }
358
359 pub fn set_selection(&mut self, selection: Vec<usize>) {
361 self.selection = Some(selection);
362 }
363
364 pub fn clear_selection(&mut self) {
366 self.selection = None;
367 }
368
369 pub fn memory_size(&self) -> usize {
371 self.columns.iter().map(|(_, c)| c.memory_size()).sum()
372 }
373
374 pub fn reset(&mut self) {
376 self.columns.clear();
377 self.row_count = 0;
378 self.selection = None;
379 }
380}
381
382impl Default for VectorBatch {
383 fn default() -> Self {
384 Self::new()
385 }
386}
387
/// Scan counters that can be updated through a shared reference.
///
/// Every counter uses `Ordering::Relaxed`: each one is individually atomic, but
/// a reader may observe them mid-update relative to one another.
#[derive(Debug, Default)]
pub struct VectorizedScanStats {
    /// Sum of the `rows` counts passed to `record_batch`.
    pub rows_scanned: AtomicUsize,
    /// Number of `record_batch` calls.
    pub batches_processed: AtomicUsize,
    /// Sum of the `passed` counts passed to `record_batch`.
    pub rows_passed: AtomicUsize,
    /// Sum of the `bytes` counts passed to `record_batch`.
    pub bytes_read: AtomicUsize,
}

impl VectorizedScanStats {
    /// Creates a stats object with all counters at zero.
    pub fn new() -> Self {
        Self::default()
    }

    /// Adds one batch's figures to the running totals.
    pub fn record_batch(&self, rows: usize, passed: usize, bytes: usize) {
        self.rows_scanned.fetch_add(rows, Ordering::Relaxed);
        self.batches_processed.fetch_add(1, Ordering::Relaxed);
        self.rows_passed.fetch_add(passed, Ordering::Relaxed);
        self.bytes_read.fetch_add(bytes, Ordering::Relaxed);
    }

    /// Total rows scanned so far.
    pub fn rows_scanned(&self) -> usize {
        self.rows_scanned.load(Ordering::Relaxed)
    }

    /// Number of batches recorded so far.
    pub fn batches_processed(&self) -> usize {
        self.batches_processed.load(Ordering::Relaxed)
    }

    /// Total rows reported as passing so far. This counter was recorded but had no accessor.
    pub fn rows_passed(&self) -> usize {
        self.rows_passed.load(Ordering::Relaxed)
    }

    /// Total bytes reported as read so far. This counter was recorded but had no accessor.
    pub fn bytes_read(&self) -> usize {
        self.bytes_read.load(Ordering::Relaxed)
    }
}
425
426pub trait VectorPredicate: Send + Sync {
428 fn evaluate(&self, column: &ColumnVector) -> Vec<bool>;
430
431 fn column_name(&self) -> &str;
433}
434
/// A predicate comparing every value of an `Int64` column against a constant.
#[derive(Debug, Clone)]
pub struct Int64Comparison {
    column_name: String,
    op: ComparisonOp,
    value: i64,
}

/// Comparison operator, applied as `row_value <op> constant`.
#[derive(Debug, Clone, Copy)]
pub enum ComparisonOp {
    /// `row == constant`
    Equal,
    /// `row != constant`
    NotEqual,
    /// `row < constant`
    LessThan,
    /// `row <= constant`
    LessEqual,
    /// `row > constant`
    GreaterThan,
    /// `row >= constant`
    GreaterEqual,
}

impl Int64Comparison {
    /// Builds a comparison `column_name <op> value`.
    pub fn new(column_name: impl Into<String>, op: ComparisonOp, value: i64) -> Self {
        Self {
            column_name: column_name.into(),
            op,
            value,
        }
    }

    /// `column_name == value`
    pub fn eq(column_name: impl Into<String>, value: i64) -> Self {
        Self::new(column_name, ComparisonOp::Equal, value)
    }

    /// `column_name != value`
    pub fn ne(column_name: impl Into<String>, value: i64) -> Self {
        Self::new(column_name, ComparisonOp::NotEqual, value)
    }

    /// `column_name > value`
    pub fn gt(column_name: impl Into<String>, value: i64) -> Self {
        Self::new(column_name, ComparisonOp::GreaterThan, value)
    }

    /// `column_name >= value`
    pub fn ge(column_name: impl Into<String>, value: i64) -> Self {
        Self::new(column_name, ComparisonOp::GreaterEqual, value)
    }

    /// `column_name < value`
    pub fn lt(column_name: impl Into<String>, value: i64) -> Self {
        Self::new(column_name, ComparisonOp::LessThan, value)
    }

    /// `column_name <= value`
    pub fn le(column_name: impl Into<String>, value: i64) -> Self {
        Self::new(column_name, ComparisonOp::LessEqual, value)
    }
}
475
476impl VectorPredicate for Int64Comparison {
477 fn evaluate(&self, column: &ColumnVector) -> Vec<bool> {
478 match column {
479 ColumnVector::Int64(values) => {
480 let cmp_value = self.value;
481 match self.op {
482 ComparisonOp::Equal => values.iter().map(|&v| v == cmp_value).collect(),
483 ComparisonOp::NotEqual => values.iter().map(|&v| v != cmp_value).collect(),
484 ComparisonOp::LessThan => values.iter().map(|&v| v < cmp_value).collect(),
485 ComparisonOp::LessEqual => values.iter().map(|&v| v <= cmp_value).collect(),
486 ComparisonOp::GreaterThan => values.iter().map(|&v| v > cmp_value).collect(),
487 ComparisonOp::GreaterEqual => values.iter().map(|&v| v >= cmp_value).collect(),
488 }
489 }
490 _ => vec![false; column.len()],
491 }
492 }
493
494 fn column_name(&self) -> &str {
495 &self.column_name
496 }
497}
498
/// Returns `values[i] > threshold` for every `i`, four lanes at a time with AVX2.
///
/// `_mm256_cmpgt_epi64` sets all 64 bits of a matching lane, so each lane
/// contributes eight identical bits to the 32-bit byte mask; lane `k` is read
/// from bits `8k..8k + 8`.
#[cfg(all(target_arch = "x86_64", target_feature = "avx2"))]
pub fn simd_compare_i64_gt(values: &[i64], threshold: i64) -> Vec<bool> {
    use std::arch::x86_64::*;

    let mut flags = vec![false; values.len()];
    let mut src = values.chunks_exact(4);
    let mut dst = flags.chunks_exact_mut(4);

    // SAFETY: AVX2 is statically enabled for this build (see the cfg above),
    // and every unaligned load reads exactly the four i64 of a 4-element chunk.
    unsafe {
        let pivot = _mm256_set1_epi64x(threshold);
        for (quad, out) in (&mut src).zip(&mut dst) {
            let loaded = _mm256_loadu_si256(quad.as_ptr() as *const __m256i);
            let byte_mask = _mm256_movemask_epi8(_mm256_cmpgt_epi64(loaded, pivot)) as u32;
            for (lane, slot) in out.iter_mut().enumerate() {
                *slot = ((byte_mask >> (8 * lane)) & 0xFF) != 0;
            }
        }
    }

    // Scalar tail for the last `len % 4` values.
    for (slot, &v) in dst.into_remainder().iter_mut().zip(src.remainder()) {
        *slot = v > threshold;
    }

    flags
}

/// Portable fallback: returns `values[i] > threshold` for every `i`.
#[cfg(not(all(target_arch = "x86_64", target_feature = "avx2")))]
pub fn simd_compare_i64_gt(values: &[i64], threshold: i64) -> Vec<bool> {
    values.iter().copied().map(|v| v > threshold).collect()
}
535
536#[derive(Debug, Clone)]
542pub struct VectorizedScanConfig {
543 pub batch_size: usize,
545 pub prefetch_enabled: bool,
547 pub prefetch_distance: usize,
549 pub simd_enabled: bool,
551}
552
553impl Default for VectorizedScanConfig {
554 fn default() -> Self {
555 Self {
556 batch_size: DEFAULT_BATCH_SIZE,
557 prefetch_enabled: true,
558 prefetch_distance: 16,
559 simd_enabled: true,
560 }
561 }
562}
563
564impl VectorizedScanConfig {
565 pub fn new() -> Self {
566 Self::default()
567 }
568
569 pub fn with_batch_size(mut self, size: usize) -> Self {
570 self.batch_size = size.clamp(MIN_BATCH_SIZE, MAX_BATCH_SIZE);
571 self
572 }
573
574 pub fn with_prefetch(mut self, enabled: bool) -> Self {
575 self.prefetch_enabled = enabled;
576 self
577 }
578}
579
/// Batched snapshot-visibility checks over commit timestamps.
///
/// An entry is visible at `snapshot_ts` when its commit timestamp is non-zero
/// (0 presumably marks "not yet committed") and strictly less than
/// `snapshot_ts`. Timestamps are compared as unsigned 64-bit integers on every
/// code path, including the AVX2 kernel.
pub struct SimdVisibilityFilter;

impl SimdVisibilityFilter {
    /// Returns one visibility flag per entry of `commit_ts`.
    #[inline]
    pub fn filter_batch(commit_ts: &[u64], snapshot_ts: u64) -> Vec<bool> {
        let mut result = vec![false; commit_ts.len()];
        Self::filter_batch_into(commit_ts, snapshot_ts, &mut result);
        result
    }

    /// Writes the visibility flag of `commit_ts[i]` into `out[i]`.
    ///
    /// # Panics
    /// Panics if `commit_ts` and `out` differ in length.
    #[inline]
    pub fn filter_batch_into(commit_ts: &[u64], snapshot_ts: u64, out: &mut [bool]) {
        assert_eq!(commit_ts.len(), out.len());

        #[cfg(target_arch = "x86_64")]
        {
            Self::filter_batch_simd_x86(commit_ts, snapshot_ts, out);
        }

        #[cfg(target_arch = "aarch64")]
        {
            Self::filter_batch_simd_neon(commit_ts, snapshot_ts, out);
        }

        #[cfg(not(any(target_arch = "x86_64", target_arch = "aarch64")))]
        {
            Self::filter_batch_scalar(commit_ts, snapshot_ts, out);
        }
    }

    /// Portable reference implementation; every other path must agree with it.
    #[inline]
    fn filter_batch_scalar(commit_ts: &[u64], snapshot_ts: u64, out: &mut [bool]) {
        debug_assert_eq!(commit_ts.len(), out.len());
        for (slot, &ts) in out.iter_mut().zip(commit_ts) {
            *slot = ts != 0 && ts < snapshot_ts;
        }
    }

    /// x86_64 dispatch: the AVX2 kernel when AVX2 is statically enabled,
    /// otherwise the scalar path.
    #[cfg(target_arch = "x86_64")]
    fn filter_batch_simd_x86(commit_ts: &[u64], snapshot_ts: u64, out: &mut [bool]) {
        #[cfg(target_feature = "avx2")]
        {
            Self::filter_batch_avx2(commit_ts, snapshot_ts, out);
        }

        #[cfg(not(target_feature = "avx2"))]
        {
            Self::filter_batch_scalar(commit_ts, snapshot_ts, out);
        }
    }

    /// AVX2 kernel processing four timestamps per iteration, with a scalar tail.
    ///
    /// AVX2 only offers a *signed* 64-bit compare (`_mm256_cmpgt_epi64`).
    /// Flipping the sign bit of both operands maps unsigned order onto signed
    /// order, so timestamps >= 2^63 (e.g. `snapshot_ts == u64::MAX`) are compared
    /// correctly and agree with the scalar path.
    #[cfg(all(target_arch = "x86_64", target_feature = "avx2"))]
    fn filter_batch_avx2(commit_ts: &[u64], snapshot_ts: u64, out: &mut [bool]) {
        use std::arch::x86_64::*;

        debug_assert_eq!(commit_ts.len(), out.len());

        let mut ts_chunks = commit_ts.chunks_exact(4);
        let mut out_chunks = out.chunks_exact_mut(4);

        // SAFETY: AVX2 is statically enabled for this build (see the cfg above),
        // and every unaligned load reads exactly the four u64 of a 4-element chunk.
        unsafe {
            let sign_bit = _mm256_set1_epi64x(i64::MIN);
            let zero = _mm256_setzero_si256();
            let snapshot_biased =
                _mm256_xor_si256(_mm256_set1_epi64x(snapshot_ts as i64), sign_bit);

            for (src, dst) in (&mut ts_chunks).zip(&mut out_chunks) {
                let ts = _mm256_loadu_si256(src.as_ptr() as *const __m256i);
                let is_uncommitted = _mm256_cmpeq_epi64(ts, zero);
                // ts < snapshot (unsigned)  <=>  snapshot_biased > ts_biased (signed).
                let before_snapshot =
                    _mm256_cmpgt_epi64(snapshot_biased, _mm256_xor_si256(ts, sign_bit));
                // visible = !is_uncommitted & before_snapshot
                let visible = _mm256_andnot_si256(is_uncommitted, before_snapshot);

                let lanes: [i64; 4] = std::mem::transmute(visible);
                for (slot, lane) in dst.iter_mut().zip(lanes.iter()) {
                    *slot = *lane != 0;
                }
            }
        }

        Self::filter_batch_scalar(
            ts_chunks.remainder(),
            snapshot_ts,
            out_chunks.into_remainder(),
        );
    }

    /// aarch64 entry point; there is no NEON kernel yet, so it delegates to the
    /// scalar path.
    #[cfg(target_arch = "aarch64")]
    fn filter_batch_simd_neon(commit_ts: &[u64], snapshot_ts: u64, out: &mut [bool]) {
        Self::filter_batch_scalar(commit_ts, snapshot_ts, out);
    }

    /// Like [`Self::filter_batch_into`], but every entry whose `txn_ids[i]`
    /// equals `current_txn_id` is forced visible, regardless of its commit
    /// timestamp.
    ///
    /// # Panics
    /// Panics if `commit_ts`, `txn_ids` and `out` do not all have the same length.
    #[inline]
    pub fn filter_batch_with_txn(
        commit_ts: &[u64],
        txn_ids: &[u64],
        snapshot_ts: u64,
        current_txn_id: u64,
        out: &mut [bool],
    ) {
        assert_eq!(commit_ts.len(), txn_ids.len());
        assert_eq!(commit_ts.len(), out.len());

        Self::filter_batch_into(commit_ts, snapshot_ts, out);

        for (slot, &txn_id) in out.iter_mut().zip(txn_ids) {
            if txn_id == current_txn_id {
                *slot = true;
            }
        }
    }

    /// Counts the entries of `commit_ts` that are visible at `snapshot_ts`,
    /// using the same rule as [`Self::filter_batch`], without allocating.
    #[inline]
    pub fn count_visible(commit_ts: &[u64], snapshot_ts: u64) -> usize {
        commit_ts
            .iter()
            .filter(|&&ts| ts != 0 && ts < snapshot_ts)
            .count()
    }
}
768
/// One version of a key as produced by a scan source, borrowed for `'a`.
#[derive(Debug, Clone)]
pub struct VersionedSlice<'a> {
    /// Key bytes.
    pub key: &'a [u8],
    /// Value bytes; `None` presumably marks a deletion — TODO confirm.
    pub value: Option<&'a [u8]>,
    /// Commit timestamp; 0 is never visible to other transactions (see `is_visible`).
    pub commit_ts: u64,
    /// Transaction id; matched against the reader's id in `is_visible`.
    pub txn_id: u64,
}

impl<'a> VersionedSlice<'a> {
    /// Whether this version is visible to a reader at `snapshot_ts`.
    ///
    /// A reader whose `current_txn_id` equals `txn_id` always sees the version.
    /// Any other reader sees it only if it is committed (`commit_ts != 0`)
    /// strictly before the snapshot.
    #[inline]
    pub fn is_visible(&self, snapshot_ts: u64, current_txn_id: Option<u64>) -> bool {
        let written_by_reader = current_txn_id == Some(self.txn_id);
        let committed_before_snapshot = self.commit_ts != 0 && self.commit_ts < snapshot_ts;
        written_by_reader || committed_before_snapshot
    }
}
799
800pub struct StreamingScanIterator<'a, I>
811where
812 I: Iterator<Item = VersionedSlice<'a>>,
813{
814 source: I,
816 batch: Vec<VersionedSlice<'a>>,
818 visibility: Vec<bool>,
820 pos: usize,
822 snapshot_ts: u64,
824 current_txn_id: Option<u64>,
826 batch_size: usize,
828}
829
830impl<'a, I> StreamingScanIterator<'a, I>
831where
832 I: Iterator<Item = VersionedSlice<'a>>,
833{
834 pub fn new(source: I, snapshot_ts: u64, current_txn_id: Option<u64>) -> Self {
836 Self::with_batch_size(source, snapshot_ts, current_txn_id, DEFAULT_BATCH_SIZE)
837 }
838
839 pub fn with_batch_size(
841 source: I,
842 snapshot_ts: u64,
843 current_txn_id: Option<u64>,
844 batch_size: usize,
845 ) -> Self {
846 Self {
847 source,
848 batch: Vec::with_capacity(batch_size),
849 visibility: Vec::with_capacity(batch_size),
850 pos: 0,
851 snapshot_ts,
852 current_txn_id,
853 batch_size,
854 }
855 }
856
857 fn fetch_batch(&mut self) -> bool {
859 self.batch.clear();
860 self.visibility.clear();
861 self.pos = 0;
862
863 for entry in self.source.by_ref().take(self.batch_size) {
865 self.batch.push(entry);
866 }
867
868 if self.batch.is_empty() {
869 return false;
870 }
871
872 let commit_ts: Vec<u64> = self.batch.iter().map(|e| e.commit_ts).collect();
874 self.visibility.resize(self.batch.len(), false);
875
876 if let Some(txn_id) = self.current_txn_id {
877 let txn_ids: Vec<u64> = self.batch.iter().map(|e| e.txn_id).collect();
878 SimdVisibilityFilter::filter_batch_with_txn(
879 &commit_ts,
880 &txn_ids,
881 self.snapshot_ts,
882 txn_id,
883 &mut self.visibility,
884 );
885 } else {
886 SimdVisibilityFilter::filter_batch_into(
887 &commit_ts,
888 self.snapshot_ts,
889 &mut self.visibility,
890 );
891 }
892
893 true
894 }
895}
896
897impl<'a, I> Iterator for StreamingScanIterator<'a, I>
898where
899 I: Iterator<Item = VersionedSlice<'a>>,
900{
901 type Item = VersionedSlice<'a>;
902
903 fn next(&mut self) -> Option<Self::Item> {
904 loop {
905 while self.pos >= self.batch.len() {
907 if !self.fetch_batch() {
908 return None;
909 }
910 }
911
912 while self.pos < self.batch.len() {
914 let idx = self.pos;
915 self.pos += 1;
916
917 if self.visibility[idx] {
918 return Some(self.batch[idx].clone());
919 }
920 }
921 }
922 }
923}
924
/// Structure-of-arrays batch of row versions: each per-row attribute lives in
/// its own vector, and index `i` in every vector describes the same row.
///
/// `visibility` and `selection` are filled by `SoaBatch::filter_visibility`.
#[derive(Debug)]
pub struct SoaBatch<'a> {
    /// Commit timestamp of each row (0 is never visible to other transactions).
    pub commit_ts: Vec<u64>,
    /// Transaction id of each row; compared with the reader's id for own-write visibility.
    pub txn_ids: Vec<u64>,
    /// Row keys, borrowed for lifetime `'a`.
    pub keys: Vec<&'a [u8]>,
    /// Per-row value handle; `None` when the row was pushed without a value.
    pub value_handles: Vec<Option<ValueHandle<'a>>>,
    /// Per-row visibility flags from the last `filter_visibility` call.
    pub visibility: Vec<bool>,
    /// Indices of rows whose `visibility` flag is set, in ascending order.
    pub selection: Vec<usize>,
}
972
/// Reference to a row's value that may be resolved lazily.
#[derive(Debug, Clone, Copy)]
pub enum ValueHandle<'a> {
    /// Value bytes that are already in memory.
    Direct(&'a [u8]),
    /// Location of the value inside a storage block (block semantics not visible
    /// here — TODO confirm); `materialize` cannot resolve it.
    BlockOffset {
        block_id: u32,
        offset: u32,
        len: u32,
    },
    /// Slot inside an arena; `materialize` cannot resolve it.
    ArenaSlot { arena_id: u32, slot: u32 },
}

impl<'a> ValueHandle<'a> {
    /// Returns the value bytes when they are directly available.
    ///
    /// Only `Direct` resolves. `BlockOffset` and `ArenaSlot` yield `None`, which
    /// callers cannot distinguish from a row without a value.
    pub fn materialize(&self) -> Option<&'a [u8]> {
        match *self {
            ValueHandle::Direct(bytes) => Some(bytes),
            ValueHandle::BlockOffset { .. } | ValueHandle::ArenaSlot { .. } => None,
        }
    }
}
1003
1004impl<'a> SoaBatch<'a> {
1005 pub fn with_capacity(capacity: usize) -> Self {
1007 Self {
1008 commit_ts: Vec::with_capacity(capacity),
1009 txn_ids: Vec::with_capacity(capacity),
1010 keys: Vec::with_capacity(capacity),
1011 value_handles: Vec::with_capacity(capacity),
1012 visibility: Vec::with_capacity(capacity),
1013 selection: Vec::with_capacity(capacity),
1014 }
1015 }
1016
1017 #[inline]
1019 pub fn push(&mut self, key: &'a [u8], value: Option<&'a [u8]>, commit_ts: u64, txn_id: u64) {
1020 self.commit_ts.push(commit_ts);
1021 self.txn_ids.push(txn_id);
1022 self.keys.push(key);
1023 self.value_handles.push(value.map(ValueHandle::Direct));
1024 }
1025
1026 #[inline]
1028 pub fn push_deferred(
1029 &mut self,
1030 key: &'a [u8],
1031 handle: Option<ValueHandle<'a>>,
1032 commit_ts: u64,
1033 txn_id: u64,
1034 ) {
1035 self.commit_ts.push(commit_ts);
1036 self.txn_ids.push(txn_id);
1037 self.keys.push(key);
1038 self.value_handles.push(handle);
1039 }
1040
1041 pub fn len(&self) -> usize {
1043 self.commit_ts.len()
1044 }
1045
1046 pub fn is_empty(&self) -> bool {
1048 self.commit_ts.is_empty()
1049 }
1050
1051 pub fn clear(&mut self) {
1053 self.commit_ts.clear();
1054 self.txn_ids.clear();
1055 self.keys.clear();
1056 self.value_handles.clear();
1057 self.visibility.clear();
1058 self.selection.clear();
1059 }
1060
1061 pub fn filter_visibility(&mut self, snapshot_ts: u64, current_txn_id: Option<u64>) {
1066 let n = self.len();
1067 self.visibility.resize(n, false);
1068 self.selection.clear();
1069
1070 if let Some(txn_id) = current_txn_id {
1072 SimdVisibilityFilter::filter_batch_with_txn(
1073 &self.commit_ts,
1074 &self.txn_ids,
1075 snapshot_ts,
1076 txn_id,
1077 &mut self.visibility,
1078 );
1079 } else {
1080 SimdVisibilityFilter::filter_batch_into(
1081 &self.commit_ts,
1082 snapshot_ts,
1083 &mut self.visibility,
1084 );
1085 }
1086
1087 for (i, &visible) in self.visibility.iter().enumerate() {
1089 if visible {
1090 self.selection.push(i);
1091 }
1092 }
1093 }
1094
1095 pub fn visible_count(&self) -> usize {
1097 self.selection.len()
1098 }
1099
1100 pub fn iter_visible(&self) -> impl Iterator<Item = (&'a [u8], Option<&'a [u8]>)> + '_ {
1104 self.selection.iter().map(move |&idx| {
1105 let key = self.keys[idx];
1106 let value = self.value_handles[idx].and_then(|h| h.materialize());
1107 (key, value)
1108 })
1109 }
1110
1111 pub fn iter_visible_full(
1113 &self,
1114 ) -> impl Iterator<Item = (&'a [u8], Option<&'a [u8]>, u64, u64)> + '_ {
1115 self.selection.iter().map(move |&idx| {
1116 let key = self.keys[idx];
1117 let value = self.value_handles[idx].and_then(|h| h.materialize());
1118 let ts = self.commit_ts[idx];
1119 let txn = self.txn_ids[idx];
1120 (key, value, ts, txn)
1121 })
1122 }
1123}
1124
/// Iterator over the visible `(key, value)` pairs produced by a `SoaSource`,
/// refilled one `SoaBatch` at a time and filtered for snapshot visibility.
pub struct SoaScanIterator<'a, S>
where
    S: SoaSource<'a>,
{
    /// Producer that refills `batch`.
    source: S,
    /// Current batch; its `selection` lists the rows still to be yielded.
    batch: SoaBatch<'a>,
    /// Index into `batch.selection` of the next row to yield.
    pos: usize,
    /// Snapshot timestamp used for visibility filtering.
    snapshot_ts: u64,
    /// Reader's transaction id; rows it wrote are always visible.
    current_txn_id: Option<u64>,
    /// Batch size given at construction; never read afterwards (hence the allow).
    #[allow(dead_code)]
    batch_size: usize,
    /// Running counters, exposed through `stats()`.
    stats: SoaScanStats,
}
1160
/// Counters collected by `SoaScanIterator`.
#[derive(Debug, Default, Clone)]
pub struct SoaScanStats {
    /// Rows delivered by the source.
    pub rows_scanned: usize,
    /// Rows that passed visibility filtering.
    pub rows_visible: usize,
    /// Rows whose value handle was resolved while iterating.
    pub values_materialized: usize,
    /// Batches delivered by the source.
    pub batches_processed: usize,
}

impl SoaScanStats {
    /// Fraction of scanned rows that were visible; 0.0 before any row is scanned.
    pub fn selectivity(&self) -> f64 {
        match self.rows_scanned {
            0 => 0.0,
            scanned => self.rows_visible as f64 / scanned as f64,
        }
    }

    /// Materialized values per visible row; 1.0 when no row was visible.
    pub fn materialization_efficiency(&self) -> f64 {
        match self.rows_visible {
            0 => 1.0,
            visible => self.values_materialized as f64 / visible as f64,
        }
    }
}
1193}
1194
/// Producer of row batches for `SoaScanIterator`.
pub trait SoaSource<'a> {
    /// Appends the next rows to `batch`; `SoaScanIterator` clears `batch` before
    /// each call.
    ///
    /// Returns `false` when the source is exhausted. `SoaScanIterator` stops on a
    /// `false` return without filtering or yielding rows pushed in that call, so
    /// implementations presumably should not push rows when returning `false`
    /// (TODO confirm contract).
    fn fill_batch(&mut self, batch: &mut SoaBatch<'a>) -> bool;
}
1201
1202impl<'a, S> SoaScanIterator<'a, S>
1203where
1204 S: SoaSource<'a>,
1205{
1206 pub fn new(source: S, snapshot_ts: u64, current_txn_id: Option<u64>) -> Self {
1208 Self::with_batch_size(source, snapshot_ts, current_txn_id, DEFAULT_BATCH_SIZE)
1209 }
1210
1211 pub fn with_batch_size(
1213 source: S,
1214 snapshot_ts: u64,
1215 current_txn_id: Option<u64>,
1216 batch_size: usize,
1217 ) -> Self {
1218 Self {
1219 source,
1220 batch: SoaBatch::with_capacity(batch_size),
1221 pos: 0,
1222 snapshot_ts,
1223 current_txn_id,
1224 batch_size,
1225 stats: SoaScanStats::default(),
1226 }
1227 }
1228
1229 fn fetch_batch(&mut self) -> bool {
1231 self.batch.clear();
1232 self.pos = 0;
1233
1234 if !self.source.fill_batch(&mut self.batch) {
1236 return false;
1237 }
1238
1239 self.stats.rows_scanned += self.batch.len();
1240 self.stats.batches_processed += 1;
1241
1242 self.batch
1244 .filter_visibility(self.snapshot_ts, self.current_txn_id);
1245 self.stats.rows_visible += self.batch.visible_count();
1246
1247 true
1248 }
1249
1250 pub fn stats(&self) -> &SoaScanStats {
1252 &self.stats
1253 }
1254}
1255
1256impl<'a, S> Iterator for SoaScanIterator<'a, S>
1257where
1258 S: SoaSource<'a>,
1259{
1260 type Item = (&'a [u8], Option<&'a [u8]>);
1261
1262 fn next(&mut self) -> Option<Self::Item> {
1263 while self.pos >= self.batch.selection.len() {
1265 if !self.fetch_batch() {
1266 return None;
1267 }
1268 }
1269
1270 let sel_idx = self.pos;
1272 self.pos += 1;
1273 let row_idx = self.batch.selection[sel_idx];
1274
1275 let key = self.batch.keys[row_idx];
1276 let value = self.batch.value_handles[row_idx].and_then(|h| h.materialize());
1277 self.stats.values_materialized += 1;
1278
1279 Some((key, value))
1280 }
1281}
1282
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_column_vector_int64() {
        // `v` is never mutated; the previous `let mut` triggered `unused_mut`.
        let v = ColumnVector::Int64(vec![1, 2, 3, 4, 5]);
        assert_eq!(v.len(), 5);
        assert_eq!(v.sum_i64(), Some(15));
    }

    #[test]
    fn test_column_vector_float64() {
        let v = ColumnVector::Float64(vec![1.0, 2.0, 3.0, 4.0]);
        assert_eq!(v.len(), 4);
        assert_eq!(v.sum_f64(), Some(10.0));
    }

    #[test]
    fn test_vector_batch() {
        let mut batch = VectorBatch::with_capacity(1024);
        batch.add_column("id", ColumnVector::Int64(vec![1, 2, 3]));
        batch.add_column("value", ColumnVector::Float64(vec![1.5, 2.5, 3.5]));

        assert_eq!(batch.row_count(), 3);
        assert_eq!(batch.column_count(), 2);
        assert!(batch.column("id").is_some());
    }

    #[test]
    fn test_int64_comparison() {
        let col = ColumnVector::Int64(vec![1, 5, 10, 15, 20]);
        let pred = Int64Comparison::gt("test", 10);
        let result = pred.evaluate(&col);

        assert_eq!(result, vec![false, false, false, true, true]);
    }

    #[test]
    fn test_simd_sum_i64_large() {
        let values: Vec<i64> = (0..1000).collect();
        let expected: i64 = (0..1000).sum();

        let col = ColumnVector::Int64(values);
        assert_eq!(col.sum_i64(), Some(expected));
    }

    #[test]
    fn test_simd_compare_gt() {
        let values: Vec<i64> = vec![1, 5, 10, 15, 20, 25, 30, 35];
        let result = simd_compare_i64_gt(&values, 12);
        assert_eq!(
            result,
            vec![false, false, false, true, true, true, true, true]
        );
    }

    #[test]
    fn test_vectorized_scan_config() {
        let config = VectorizedScanConfig::new()
            .with_batch_size(2048)
            .with_prefetch(true);

        assert_eq!(config.batch_size, 2048);
        assert!(config.prefetch_enabled);
    }

    #[test]
    fn test_vectorized_scan_config_clamps_batch_size() {
        assert_eq!(
            VectorizedScanConfig::new().with_batch_size(1).batch_size,
            MIN_BATCH_SIZE
        );
        assert_eq!(
            VectorizedScanConfig::new().with_batch_size(usize::MAX).batch_size,
            MAX_BATCH_SIZE
        );
    }

    #[test]
    fn test_simd_visibility_filter_basic() {
        let commit_ts = vec![0, 10, 20, 30, 40];
        let snapshot_ts = 25;

        let result = SimdVisibilityFilter::filter_batch(&commit_ts, snapshot_ts);

        assert_eq!(result, vec![false, true, true, false, false]);
    }

    #[test]
    fn test_simd_visibility_filter_with_txn() {
        let commit_ts = vec![0, 10, 0, 30, 40];
        let txn_ids = vec![1, 2, 1, 4, 5];
        let snapshot_ts = 25;
        let current_txn_id = 1;

        let mut result = vec![false; 5];
        SimdVisibilityFilter::filter_batch_with_txn(
            &commit_ts,
            &txn_ids,
            snapshot_ts,
            current_txn_id,
            &mut result,
        );

        assert_eq!(result, vec![true, true, true, false, false]);
    }

    #[test]
    fn test_simd_visibility_filter_large() {
        let n = 1000;
        let commit_ts: Vec<u64> = (1..=n as u64).collect();
        let snapshot_ts = 500;

        let result = SimdVisibilityFilter::filter_batch(&commit_ts, snapshot_ts);

        let visible_count = result.iter().filter(|&&v| v).count();
        assert_eq!(visible_count, 499);
    }

    #[test]
    fn test_count_visible_matches_filter_batch() {
        let commit_ts: Vec<u64> = (0..37).map(|i| (i * 7) % 50).collect();
        let snapshot_ts = 25;

        let flags = SimdVisibilityFilter::filter_batch(&commit_ts, snapshot_ts);
        assert_eq!(
            SimdVisibilityFilter::count_visible(&commit_ts, snapshot_ts),
            flags.iter().filter(|&&v| v).count()
        );
    }

    #[test]
    fn test_versioned_slice_visibility() {
        let slice = VersionedSlice {
            key: b"test",
            value: Some(b"value"),
            commit_ts: 100,
            txn_id: 1,
        };

        assert!(slice.is_visible(200, None));
        assert!(!slice.is_visible(50, None));
        assert!(slice.is_visible(50, Some(1)));
    }

    #[test]
    fn test_streaming_scan_iterator() {
        let entries: Vec<VersionedSlice<'static>> = vec![
            VersionedSlice {
                key: b"a",
                value: Some(b"1"),
                commit_ts: 10,
                txn_id: 1,
            },
            VersionedSlice {
                key: b"b",
                value: Some(b"2"),
                commit_ts: 0,
                txn_id: 2,
            },
            VersionedSlice {
                key: b"c",
                value: Some(b"3"),
                commit_ts: 30,
                txn_id: 3,
            },
            VersionedSlice {
                key: b"d",
                value: Some(b"4"),
                commit_ts: 15,
                txn_id: 4,
            },
        ];

        let iter = StreamingScanIterator::new(entries.into_iter(), 25, None);
        let visible: Vec<_> = iter.collect();

        assert_eq!(visible.len(), 2);
        assert_eq!(visible[0].key, b"a");
        assert_eq!(visible[1].key, b"d");
    }

    #[test]
    fn test_soa_batch_basic() {
        let mut batch = SoaBatch::with_capacity(100);

        batch.push(b"key1", Some(b"value1"), 10, 1);
        batch.push(b"key2", Some(b"value2"), 20, 2);
        batch.push(b"key3", None, 30, 3);
        batch.push(b"key4", Some(b"value4"), 0, 4);

        assert_eq!(batch.len(), 4);
        assert_eq!(batch.commit_ts, vec![10, 20, 30, 0]);
        assert_eq!(batch.txn_ids, vec![1, 2, 3, 4]);
    }

    #[test]
    fn test_soa_batch_visibility_filter() {
        let mut batch = SoaBatch::with_capacity(100);

        batch.push(b"k1", Some(b"v1"), 10, 1);
        batch.push(b"k2", Some(b"v2"), 0, 2);
        batch.push(b"k3", Some(b"v3"), 20, 3);
        batch.push(b"k4", Some(b"v4"), 30, 4);
        batch.push(b"k5", Some(b"v5"), 0, 5);
        batch.filter_visibility(25, None);

        assert_eq!(batch.visibility, vec![true, false, true, false, false]);
        assert_eq!(batch.selection, vec![0, 2]);
        assert_eq!(batch.visible_count(), 2);
    }

    #[test]
    fn test_soa_batch_self_visibility() {
        let mut batch = SoaBatch::with_capacity(100);

        batch.push(b"k1", Some(b"v1"), 0, 42);
        batch.push(b"k2", Some(b"v2"), 10, 1);
        batch.push(b"k3", Some(b"v3"), 0, 99);
        batch.filter_visibility(25, Some(42));

        assert_eq!(batch.visibility, vec![true, true, false]);
        assert_eq!(batch.selection, vec![0, 1]);
    }

    #[test]
    fn test_soa_batch_late_materialization() {
        let mut batch = SoaBatch::with_capacity(100);

        batch.push(b"key1", Some(b"val1"), 10, 1);
        batch.push(b"key2", Some(b"val2"), 0, 2);
        batch.push(b"key3", Some(b"val3"), 15, 3);

        batch.filter_visibility(25, None);

        let visible: Vec<_> = batch.iter_visible().collect();

        assert_eq!(visible.len(), 2);
        assert_eq!(visible[0], (b"key1".as_slice(), Some(b"val1".as_slice())));
        assert_eq!(visible[1], (b"key3".as_slice(), Some(b"val3".as_slice())));
    }

    #[test]
    fn test_soa_scan_stats() {
        let mut batch = SoaBatch::with_capacity(100);

        for i in 0..10u64 {
            let ts = if i < 3 { 10 } else { 0 };
            batch.push(b"key", Some(b"val"), ts, i);
        }

        batch.filter_visibility(25, None);

        let selectivity = batch.visible_count() as f64 / batch.len() as f64;
        assert!((selectivity - 0.3).abs() < 0.01);
    }

    #[test]
    fn test_soa_batch_simd_large() {
        let mut batch = SoaBatch::with_capacity(2000);

        for i in 0..1000u64 {
            let ts = if i % 2 == 0 { 10 } else { 50 };
            batch.push(b"k", Some(b"v"), ts, i);
        }

        batch.filter_visibility(25, None);

        assert_eq!(batch.visible_count(), 500);

        for (i, &idx) in batch.selection.iter().enumerate() {
            assert_eq!(idx, i * 2);
        }
    }

    /// Minimal in-memory `SoaSource` that hands out `chunk` rows per batch.
    struct VecSource<'a> {
        rows: Vec<(&'a [u8], u64)>,
        chunk: usize,
    }

    impl<'a> SoaSource<'a> for VecSource<'a> {
        fn fill_batch(&mut self, batch: &mut SoaBatch<'a>) -> bool {
            if self.rows.is_empty() {
                return false;
            }
            let take = self.chunk.min(self.rows.len());
            for (key, ts) in self.rows.drain(..take) {
                batch.push(key, Some(key), ts, 0);
            }
            true
        }
    }

    #[test]
    fn test_soa_scan_iterator_end_to_end() {
        let source = VecSource {
            rows: vec![
                (&b"a"[..], 10),
                (&b"b"[..], 0),
                (&b"c"[..], 30),
                (&b"d"[..], 5),
                (&b"e"[..], 1),
            ],
            chunk: 2,
        };

        let mut iter = SoaScanIterator::new(source, 25, None);
        let keys: Vec<&[u8]> = iter.by_ref().map(|(k, _)| k).collect();
        assert_eq!(keys, vec![&b"a"[..], &b"d"[..], &b"e"[..]]);

        let stats = iter.stats();
        assert_eq!(stats.rows_scanned, 5);
        assert_eq!(stats.batches_processed, 3);
        assert_eq!(stats.rows_visible, 3);
        assert_eq!(stats.values_materialized, 3);
    }
}