1use std::sync::atomic::{AtomicUsize, Ordering};
65
/// Batch size used when a caller does not pick one explicitly (1024 rows).
pub const DEFAULT_BATCH_SIZE: usize = 1 << 10;

/// Lower bound applied by `VectorizedScanConfig::with_batch_size` (64 rows).
pub const MIN_BATCH_SIZE: usize = 1 << 6;

/// Upper bound applied by `VectorizedScanConfig::with_batch_size` (8192 rows).
pub const MAX_BATCH_SIZE: usize = 1 << 13;
75
76#[derive(Debug, Clone)]
84pub enum ColumnVector {
85 Bool(Vec<bool>),
87 Int64(Vec<i64>),
89 UInt64(Vec<u64>),
91 Float64(Vec<f64>),
93 String {
95 offsets: Vec<u32>,
96 data: Vec<u8>,
97 },
98 Binary {
100 offsets: Vec<u32>,
101 data: Vec<u8>,
102 },
103 Null(Vec<u64>),
105}
106
107impl ColumnVector {
108 pub fn new_int64(capacity: usize) -> Self {
110 ColumnVector::Int64(Vec::with_capacity(capacity))
111 }
112
113 pub fn new_float64(capacity: usize) -> Self {
114 ColumnVector::Float64(Vec::with_capacity(capacity))
115 }
116
117 pub fn new_string(capacity: usize) -> Self {
118 ColumnVector::String {
119 offsets: Vec::with_capacity(capacity + 1),
120 data: Vec::with_capacity(capacity * 32), }
122 }
123
124 pub fn len(&self) -> usize {
126 match self {
127 ColumnVector::Bool(v) => v.len(),
128 ColumnVector::Int64(v) => v.len(),
129 ColumnVector::UInt64(v) => v.len(),
130 ColumnVector::Float64(v) => v.len(),
131 ColumnVector::String { offsets, .. } => offsets.len().saturating_sub(1),
132 ColumnVector::Binary { offsets, .. } => offsets.len().saturating_sub(1),
133 ColumnVector::Null(v) => v.len() * 64,
134 }
135 }
136
137 pub fn is_empty(&self) -> bool {
139 self.len() == 0
140 }
141
142 pub fn memory_size(&self) -> usize {
144 match self {
145 ColumnVector::Bool(v) => v.len(),
146 ColumnVector::Int64(v) => v.len() * 8,
147 ColumnVector::UInt64(v) => v.len() * 8,
148 ColumnVector::Float64(v) => v.len() * 8,
149 ColumnVector::String { offsets, data } => offsets.len() * 4 + data.len(),
150 ColumnVector::Binary { offsets, data } => offsets.len() * 4 + data.len(),
151 ColumnVector::Null(v) => v.len() * 8,
152 }
153 }
154
155 #[cfg(target_arch = "x86_64")]
157 pub fn sum_i64(&self) -> Option<i64> {
158 match self {
159 ColumnVector::Int64(values) => {
160 if values.is_empty() {
161 return Some(0);
162 }
163
164 if values.len() >= 16 {
166 Some(simd_sum_i64(values))
167 } else {
168 Some(values.iter().sum())
169 }
170 }
171 _ => None,
172 }
173 }
174
175 #[cfg(not(target_arch = "x86_64"))]
176 pub fn sum_i64(&self) -> Option<i64> {
177 match self {
178 ColumnVector::Int64(values) => Some(values.iter().sum()),
179 _ => None,
180 }
181 }
182
183 #[cfg(target_arch = "x86_64")]
185 pub fn sum_f64(&self) -> Option<f64> {
186 match self {
187 ColumnVector::Float64(values) => {
188 if values.is_empty() {
189 return Some(0.0);
190 }
191
192 if values.len() >= 8 {
193 Some(simd_sum_f64(values))
194 } else {
195 Some(values.iter().sum())
196 }
197 }
198 _ => None,
199 }
200 }
201
202 #[cfg(not(target_arch = "x86_64"))]
203 pub fn sum_f64(&self) -> Option<f64> {
204 match self {
205 ColumnVector::Float64(values) => Some(values.iter().sum()),
206 _ => None,
207 }
208 }
209}
210
/// Sums a slice of `i64`.
///
/// When the crate is compiled with AVX2 enabled, four lanes are accumulated in
/// a 256-bit register and the up-to-three leftover values are added afterwards;
/// otherwise this is a plain iterator sum.
#[cfg(target_arch = "x86_64")]
fn simd_sum_i64(values: &[i64]) -> i64 {
    #[cfg(target_feature = "avx2")]
    {
        use std::arch::x86_64::*;

        const LANES: usize = 4;
        let mut quads = values.chunks_exact(LANES);

        // SAFETY: AVX2 is statically enabled for this block. Every `quad` is
        // exactly four `i64` (32 readable bytes) and `_mm256_loadu_si256` has
        // no alignment requirement. `__m256i` and `[i64; 4]` are both 32 bytes.
        let lanes: [i64; LANES] = unsafe {
            let mut acc = _mm256_setzero_si256();
            for quad in quads.by_ref() {
                let loaded = _mm256_loadu_si256(quad.as_ptr() as *const __m256i);
                acc = _mm256_add_epi64(acc, loaded);
            }
            std::mem::transmute(acc)
        };

        let vector_part: i64 = lanes.iter().sum();
        let tail_part: i64 = quads.remainder().iter().sum();
        vector_part + tail_part
    }

    #[cfg(not(target_feature = "avx2"))]
    {
        values.iter().sum()
    }
}
247
/// Sums a slice of `f64`.
///
/// When the crate is compiled with AVX enabled, four lanes are accumulated in a
/// 256-bit register, the lanes are then added together, and finally the
/// up-to-three leftover values are added; otherwise this is a plain iterator
/// sum. The two paths add in a different order, so results may differ by
/// floating-point rounding.
#[cfg(target_arch = "x86_64")]
fn simd_sum_f64(values: &[f64]) -> f64 {
    #[cfg(target_feature = "avx")]
    {
        use std::arch::x86_64::*;

        const LANES: usize = 4;
        let mut quads = values.chunks_exact(LANES);

        // SAFETY: AVX is statically enabled for this block. Every `quad` is
        // exactly four `f64` (32 readable bytes) and `_mm256_loadu_pd` has no
        // alignment requirement. `__m256d` and `[f64; 4]` are both 32 bytes.
        let lanes: [f64; LANES] = unsafe {
            let mut acc = _mm256_setzero_pd();
            for quad in quads.by_ref() {
                let loaded = _mm256_loadu_pd(quad.as_ptr());
                acc = _mm256_add_pd(acc, loaded);
            }
            std::mem::transmute(acc)
        };

        let vector_part: f64 = lanes.iter().sum();
        let tail_part: f64 = quads.remainder().iter().sum();
        vector_part + tail_part
    }

    #[cfg(not(target_feature = "avx"))]
    {
        values.iter().sum()
    }
}
280
281#[derive(Debug)]
291pub struct VectorBatch {
292 columns: Vec<(String, ColumnVector)>,
294 row_count: usize,
296 capacity: usize,
298 selection: Option<Vec<usize>>,
300}
301
302impl VectorBatch {
303 pub fn with_capacity(capacity: usize) -> Self {
305 Self {
306 columns: Vec::new(),
307 row_count: 0,
308 capacity,
309 selection: None,
310 }
311 }
312
313 pub fn new() -> Self {
315 Self::with_capacity(DEFAULT_BATCH_SIZE)
316 }
317
318 pub fn capacity(&self) -> usize {
320 self.capacity
321 }
322
323 pub fn row_count(&self) -> usize {
325 if let Some(ref sel) = self.selection {
326 sel.len()
327 } else {
328 self.row_count
329 }
330 }
331
332 pub fn is_full(&self) -> bool {
334 self.row_count >= self.capacity
335 }
336
337 pub fn is_empty(&self) -> bool {
339 self.row_count == 0
340 }
341
342 pub fn add_column(&mut self, name: impl Into<String>, column: ColumnVector) {
344 self.columns.push((name.into(), column));
345 if self.row_count == 0 {
346 self.row_count = self.columns.last().map(|(_, c)| c.len()).unwrap_or(0);
347 }
348 }
349
350 pub fn column(&self, name: &str) -> Option<&ColumnVector> {
352 self.columns
353 .iter()
354 .find(|(n, _)| n == name)
355 .map(|(_, c)| c)
356 }
357
358 pub fn column_at(&self, idx: usize) -> Option<&ColumnVector> {
360 self.columns.get(idx).map(|(_, c)| c)
361 }
362
363 pub fn column_count(&self) -> usize {
365 self.columns.len()
366 }
367
368 pub fn set_selection(&mut self, selection: Vec<usize>) {
370 self.selection = Some(selection);
371 }
372
373 pub fn clear_selection(&mut self) {
375 self.selection = None;
376 }
377
378 pub fn memory_size(&self) -> usize {
380 self.columns.iter().map(|(_, c)| c.memory_size()).sum()
381 }
382
383 pub fn reset(&mut self) {
385 self.columns.clear();
386 self.row_count = 0;
387 self.selection = None;
388 }
389}
390
391impl Default for VectorBatch {
392 fn default() -> Self {
393 Self::new()
394 }
395}
396
/// Counters describing a vectorized scan, updatable through a shared reference.
#[derive(Debug, Default)]
pub struct VectorizedScanStats {
    /// Total rows reported as scanned.
    pub rows_scanned: AtomicUsize,
    /// Number of `record_batch` calls.
    pub batches_processed: AtomicUsize,
    /// Total rows reported as having passed.
    pub rows_passed: AtomicUsize,
    /// Total bytes reported as read.
    pub bytes_read: AtomicUsize,
}

impl VectorizedScanStats {
    /// Creates a statistics block with every counter at zero.
    pub fn new() -> Self {
        Self {
            rows_scanned: AtomicUsize::new(0),
            batches_processed: AtomicUsize::new(0),
            rows_passed: AtomicUsize::new(0),
            bytes_read: AtomicUsize::new(0),
        }
    }

    /// Adds one batch's figures to the counters using relaxed atomics.
    pub fn record_batch(&self, rows: usize, passed: usize, bytes: usize) {
        let updates = [
            (&self.rows_scanned, rows),
            (&self.batches_processed, 1),
            (&self.rows_passed, passed),
            (&self.bytes_read, bytes),
        ];
        for &(counter, delta) in &updates {
            counter.fetch_add(delta, Ordering::Relaxed);
        }
    }

    /// Current value of the `rows_scanned` counter.
    pub fn rows_scanned(&self) -> usize {
        let counter = &self.rows_scanned;
        counter.load(Ordering::Relaxed)
    }

    /// Current value of the `batches_processed` counter.
    pub fn batches_processed(&self) -> usize {
        let counter = &self.batches_processed;
        counter.load(Ordering::Relaxed)
    }
}
434
435pub trait VectorPredicate: Send + Sync {
437 fn evaluate(&self, column: &ColumnVector) -> Vec<bool>;
439
440 fn column_name(&self) -> &str;
442}
443
444#[derive(Debug, Clone)]
446pub struct Int64Comparison {
447 column_name: String,
448 op: ComparisonOp,
449 value: i64,
450}
451
452#[derive(Debug, Clone, Copy)]
454pub enum ComparisonOp {
455 Equal,
456 NotEqual,
457 LessThan,
458 LessEqual,
459 GreaterThan,
460 GreaterEqual,
461}
462
463impl Int64Comparison {
464 pub fn new(column_name: impl Into<String>, op: ComparisonOp, value: i64) -> Self {
465 Self {
466 column_name: column_name.into(),
467 op,
468 value,
469 }
470 }
471
472 pub fn eq(column_name: impl Into<String>, value: i64) -> Self {
473 Self::new(column_name, ComparisonOp::Equal, value)
474 }
475
476 pub fn gt(column_name: impl Into<String>, value: i64) -> Self {
477 Self::new(column_name, ComparisonOp::GreaterThan, value)
478 }
479
480 pub fn lt(column_name: impl Into<String>, value: i64) -> Self {
481 Self::new(column_name, ComparisonOp::LessThan, value)
482 }
483}
484
485impl VectorPredicate for Int64Comparison {
486 fn evaluate(&self, column: &ColumnVector) -> Vec<bool> {
487 match column {
488 ColumnVector::Int64(values) => {
489 let cmp_value = self.value;
490 match self.op {
491 ComparisonOp::Equal => values.iter().map(|&v| v == cmp_value).collect(),
492 ComparisonOp::NotEqual => values.iter().map(|&v| v != cmp_value).collect(),
493 ComparisonOp::LessThan => values.iter().map(|&v| v < cmp_value).collect(),
494 ComparisonOp::LessEqual => values.iter().map(|&v| v <= cmp_value).collect(),
495 ComparisonOp::GreaterThan => values.iter().map(|&v| v > cmp_value).collect(),
496 ComparisonOp::GreaterEqual => values.iter().map(|&v| v >= cmp_value).collect(),
497 }
498 }
499 _ => vec![false; column.len()],
500 }
501 }
502
503 fn column_name(&self) -> &str {
504 &self.column_name
505 }
506}
507
/// Returns, for every element of `values`, whether it is greater than
/// `threshold` (signed comparison). AVX2 build: four values per step.
#[cfg(all(target_arch = "x86_64", target_feature = "avx2"))]
pub fn simd_compare_i64_gt(values: &[i64], threshold: i64) -> Vec<bool> {
    use std::arch::x86_64::*;

    const LANES: usize = 4;
    let mut flags = vec![false; values.len()];

    let mut in_quads = values.chunks_exact(LANES);
    let mut out_quads = flags.chunks_exact_mut(LANES);

    // SAFETY: AVX2 is statically enabled for this function.
    let threshold_vec = unsafe { _mm256_set1_epi64x(threshold) };

    for (quad, dst) in in_quads.by_ref().zip(out_quads.by_ref()) {
        // SAFETY: `quad` is exactly four `i64` (32 readable bytes) and
        // `_mm256_loadu_si256` has no alignment requirement.
        let byte_mask = unsafe {
            let lanes = _mm256_loadu_si256(quad.as_ptr() as *const __m256i);
            let greater = _mm256_cmpgt_epi64(lanes, threshold_vec);
            _mm256_movemask_epi8(greater) as u32
        };

        // Each 64-bit lane contributes eight identical bits to the byte mask.
        for (lane, slot) in dst.iter_mut().enumerate() {
            *slot = ((byte_mask >> (lane * 8)) & 0xFF) != 0;
        }
    }

    // Up to three trailing values are compared one by one.
    for (slot, &value) in out_quads.into_remainder().iter_mut().zip(in_quads.remainder()) {
        *slot = value > threshold;
    }

    flags
}

/// Returns, for every element of `values`, whether it is greater than
/// `threshold` (signed comparison). Portable build.
#[cfg(not(all(target_arch = "x86_64", target_feature = "avx2")))]
pub fn simd_compare_i64_gt(values: &[i64], threshold: i64) -> Vec<bool> {
    let mut flags = Vec::with_capacity(values.len());
    flags.extend(values.iter().map(|value| *value > threshold));
    flags
}
544
545#[derive(Debug, Clone)]
551pub struct VectorizedScanConfig {
552 pub batch_size: usize,
554 pub prefetch_enabled: bool,
556 pub prefetch_distance: usize,
558 pub simd_enabled: bool,
560}
561
562impl Default for VectorizedScanConfig {
563 fn default() -> Self {
564 Self {
565 batch_size: DEFAULT_BATCH_SIZE,
566 prefetch_enabled: true,
567 prefetch_distance: 16,
568 simd_enabled: true,
569 }
570 }
571}
572
573impl VectorizedScanConfig {
574 pub fn new() -> Self {
575 Self::default()
576 }
577
578 pub fn with_batch_size(mut self, size: usize) -> Self {
579 self.batch_size = size.clamp(MIN_BATCH_SIZE, MAX_BATCH_SIZE);
580 self
581 }
582
583 pub fn with_prefetch(mut self, enabled: bool) -> Self {
584 self.prefetch_enabled = enabled;
585 self
586 }
587}
588
/// Batch visibility checks over commit timestamps.
///
/// A row with commit timestamp `ts` is visible at `snapshot_ts` when
/// `ts != 0 && ts < snapshot_ts`, compared as unsigned 64-bit integers. Every
/// code path in this type implements exactly that rule.
pub struct SimdVisibilityFilter;

impl SimdVisibilityFilter {
    /// Returns one visibility flag per entry of `commit_ts`.
    #[inline]
    pub fn filter_batch(commit_ts: &[u64], snapshot_ts: u64) -> Vec<bool> {
        let mut result = vec![false; commit_ts.len()];
        Self::filter_batch_into(commit_ts, snapshot_ts, &mut result);
        result
    }

    /// Writes one visibility flag per entry of `commit_ts` into `out`.
    ///
    /// # Panics
    ///
    /// Panics if `commit_ts` and `out` differ in length.
    #[inline]
    pub fn filter_batch_into(commit_ts: &[u64], snapshot_ts: u64, out: &mut [bool]) {
        assert_eq!(commit_ts.len(), out.len());

        // Exactly one of the following blocks survives conditional compilation.
        #[cfg(target_arch = "x86_64")]
        {
            Self::filter_batch_simd_x86(commit_ts, snapshot_ts, out);
        }

        #[cfg(target_arch = "aarch64")]
        {
            Self::filter_batch_simd_neon(commit_ts, snapshot_ts, out);
        }

        #[cfg(not(any(target_arch = "x86_64", target_arch = "aarch64")))]
        {
            Self::filter_batch_scalar(commit_ts, snapshot_ts, out);
        }
    }

    /// Reference implementation: applies the visibility rule element by element.
    ///
    /// Callers pass slices of equal length.
    #[inline]
    fn filter_batch_scalar(commit_ts: &[u64], snapshot_ts: u64, out: &mut [bool]) {
        for (flag, &ts) in out.iter_mut().zip(commit_ts) {
            *flag = ts != 0 && ts < snapshot_ts;
        }
    }

    /// x86-64 implementation: AVX2 when statically enabled, scalar otherwise.
    #[cfg(target_arch = "x86_64")]
    fn filter_batch_simd_x86(commit_ts: &[u64], snapshot_ts: u64, out: &mut [bool]) {
        #[cfg(target_feature = "avx2")]
        {
            use std::arch::x86_64::*;

            const LANES: usize = 4;
            const SIGN_BIT: u64 = 1 << 63;

            let mut ts_quads = commit_ts.chunks_exact(LANES);
            let mut out_quads = out.chunks_exact_mut(LANES);

            // AVX2 only offers a *signed* 64-bit compare. Flipping the sign bit
            // of both operands maps unsigned order onto signed order, so values
            // with bit 63 set (for example a `u64::MAX` snapshot) compare
            // exactly as the scalar rule does.
            //
            // SAFETY: AVX2 is statically enabled for this block.
            let (zero, bias, snapshot_biased) = unsafe {
                (
                    _mm256_setzero_si256(),
                    _mm256_set1_epi64x(SIGN_BIT as i64),
                    _mm256_set1_epi64x((snapshot_ts ^ SIGN_BIT) as i64),
                )
            };

            for (quad, flags) in ts_quads.by_ref().zip(out_quads.by_ref()) {
                // SAFETY: `quad` is exactly four `u64` (32 readable bytes) and
                // `_mm256_loadu_si256` has no alignment requirement. `__m256i`
                // and `[i64; 4]` are both 32 bytes.
                let lanes: [i64; LANES] = unsafe {
                    let ts = _mm256_loadu_si256(quad.as_ptr() as *const __m256i);
                    let is_zero = _mm256_cmpeq_epi64(ts, zero);
                    // ts < snapshot  <=>  biased(snapshot) > biased(ts), signed.
                    let below = _mm256_cmpgt_epi64(snapshot_biased, _mm256_xor_si256(ts, bias));
                    // andnot(a, b) == !a & b  ->  non-zero AND below the snapshot.
                    std::mem::transmute(_mm256_andnot_si256(is_zero, below))
                };
                for (flag, lane) in flags.iter_mut().zip(lanes.iter()) {
                    *flag = *lane != 0;
                }
            }

            // Up to three trailing entries.
            Self::filter_batch_scalar(
                ts_quads.remainder(),
                snapshot_ts,
                out_quads.into_remainder(),
            );
        }

        #[cfg(not(target_feature = "avx2"))]
        {
            Self::filter_batch_scalar(commit_ts, snapshot_ts, out);
        }
    }

    /// aarch64 implementation; currently delegates to the scalar routine.
    #[cfg(target_arch = "aarch64")]
    fn filter_batch_simd_neon(commit_ts: &[u64], snapshot_ts: u64, out: &mut [bool]) {
        Self::filter_batch_scalar(commit_ts, snapshot_ts, out);
    }

    /// Like `filter_batch_into`, but additionally marks every row whose
    /// `txn_ids` entry equals `current_txn_id` as visible, whatever its commit
    /// timestamp.
    ///
    /// # Panics
    ///
    /// Panics if `commit_ts`, `txn_ids` and `out` do not all have the same length.
    #[inline]
    pub fn filter_batch_with_txn(
        commit_ts: &[u64],
        txn_ids: &[u64],
        snapshot_ts: u64,
        current_txn_id: u64,
        out: &mut [bool],
    ) {
        assert_eq!(commit_ts.len(), txn_ids.len());
        assert_eq!(commit_ts.len(), out.len());

        Self::filter_batch_into(commit_ts, snapshot_ts, out);

        for (flag, &txn_id) in out.iter_mut().zip(txn_ids) {
            if txn_id == current_txn_id {
                *flag = true;
            }
        }
    }

    /// Counts the entries of `commit_ts` that are visible at `snapshot_ts`.
    #[inline]
    pub fn count_visible(commit_ts: &[u64], snapshot_ts: u64) -> usize {
        commit_ts
            .iter()
            .filter(|&&ts| ts != 0 && ts < snapshot_ts)
            .count()
    }
}
777
/// A borrowed key/value version together with its commit metadata.
#[derive(Debug, Clone)]
pub struct VersionedSlice<'a> {
    /// Borrowed key bytes.
    pub key: &'a [u8],
    /// Borrowed value bytes, if any.
    pub value: Option<&'a [u8]>,
    /// Commit timestamp; `0` is never visible through the timestamp rule.
    pub commit_ts: u64,
    /// Id of the transaction that wrote this version.
    pub txn_id: u64,
}

impl<'a> VersionedSlice<'a> {
    /// Whether this version is visible to a reader.
    ///
    /// Visible when it was written by `current_txn_id`, or when its commit
    /// timestamp is non-zero and strictly below `snapshot_ts`.
    #[inline]
    pub fn is_visible(&self, snapshot_ts: u64, current_txn_id: Option<u64>) -> bool {
        let own_write = current_txn_id == Some(self.txn_id);
        let committed_before_snapshot = self.commit_ts != 0 && self.commit_ts < snapshot_ts;
        own_write || committed_before_snapshot
    }
}
808
809pub struct StreamingScanIterator<'a, I>
820where
821 I: Iterator<Item = VersionedSlice<'a>>,
822{
823 source: I,
825 batch: Vec<VersionedSlice<'a>>,
827 visibility: Vec<bool>,
829 pos: usize,
831 snapshot_ts: u64,
833 current_txn_id: Option<u64>,
835 batch_size: usize,
837}
838
839impl<'a, I> StreamingScanIterator<'a, I>
840where
841 I: Iterator<Item = VersionedSlice<'a>>,
842{
843 pub fn new(source: I, snapshot_ts: u64, current_txn_id: Option<u64>) -> Self {
845 Self::with_batch_size(source, snapshot_ts, current_txn_id, DEFAULT_BATCH_SIZE)
846 }
847
848 pub fn with_batch_size(
850 source: I,
851 snapshot_ts: u64,
852 current_txn_id: Option<u64>,
853 batch_size: usize,
854 ) -> Self {
855 Self {
856 source,
857 batch: Vec::with_capacity(batch_size),
858 visibility: Vec::with_capacity(batch_size),
859 pos: 0,
860 snapshot_ts,
861 current_txn_id,
862 batch_size,
863 }
864 }
865
866 fn fetch_batch(&mut self) -> bool {
868 self.batch.clear();
869 self.visibility.clear();
870 self.pos = 0;
871
872 for entry in self.source.by_ref().take(self.batch_size) {
874 self.batch.push(entry);
875 }
876
877 if self.batch.is_empty() {
878 return false;
879 }
880
881 let commit_ts: Vec<u64> = self.batch.iter().map(|e| e.commit_ts).collect();
883 self.visibility.resize(self.batch.len(), false);
884
885 if let Some(txn_id) = self.current_txn_id {
886 let txn_ids: Vec<u64> = self.batch.iter().map(|e| e.txn_id).collect();
887 SimdVisibilityFilter::filter_batch_with_txn(
888 &commit_ts,
889 &txn_ids,
890 self.snapshot_ts,
891 txn_id,
892 &mut self.visibility,
893 );
894 } else {
895 SimdVisibilityFilter::filter_batch_into(&commit_ts, self.snapshot_ts, &mut self.visibility);
896 }
897
898 true
899 }
900}
901
902impl<'a, I> Iterator for StreamingScanIterator<'a, I>
903where
904 I: Iterator<Item = VersionedSlice<'a>>,
905{
906 type Item = VersionedSlice<'a>;
907
908 fn next(&mut self) -> Option<Self::Item> {
909 loop {
910 while self.pos >= self.batch.len() {
912 if !self.fetch_batch() {
913 return None;
914 }
915 }
916
917 while self.pos < self.batch.len() {
919 let idx = self.pos;
920 self.pos += 1;
921
922 if self.visibility[idx] {
923 return Some(self.batch[idx].clone());
924 }
925 }
926 }
927 }
928}
929
930#[derive(Debug)]
962pub struct SoaBatch<'a> {
963 pub commit_ts: Vec<u64>,
965 pub txn_ids: Vec<u64>,
967 pub keys: Vec<&'a [u8]>,
969 pub value_handles: Vec<Option<ValueHandle<'a>>>,
972 pub visibility: Vec<bool>,
974 pub selection: Vec<usize>,
976}
977
978#[derive(Debug, Clone, Copy)]
983pub enum ValueHandle<'a> {
984 Direct(&'a [u8]),
986 BlockOffset { block_id: u32, offset: u32, len: u32 },
988 ArenaSlot { arena_id: u32, slot: u32 },
990}
991
992impl<'a> ValueHandle<'a> {
993 pub fn materialize(&self) -> Option<&'a [u8]> {
995 match self {
996 ValueHandle::Direct(data) => Some(*data),
997 ValueHandle::BlockOffset { .. } => None,
1000 ValueHandle::ArenaSlot { .. } => None,
1001 }
1002 }
1003}
1004
1005impl<'a> SoaBatch<'a> {
1006 pub fn with_capacity(capacity: usize) -> Self {
1008 Self {
1009 commit_ts: Vec::with_capacity(capacity),
1010 txn_ids: Vec::with_capacity(capacity),
1011 keys: Vec::with_capacity(capacity),
1012 value_handles: Vec::with_capacity(capacity),
1013 visibility: Vec::with_capacity(capacity),
1014 selection: Vec::with_capacity(capacity),
1015 }
1016 }
1017
1018 #[inline]
1020 pub fn push(&mut self, key: &'a [u8], value: Option<&'a [u8]>, commit_ts: u64, txn_id: u64) {
1021 self.commit_ts.push(commit_ts);
1022 self.txn_ids.push(txn_id);
1023 self.keys.push(key);
1024 self.value_handles.push(value.map(ValueHandle::Direct));
1025 }
1026
1027 #[inline]
1029 pub fn push_deferred(
1030 &mut self,
1031 key: &'a [u8],
1032 handle: Option<ValueHandle<'a>>,
1033 commit_ts: u64,
1034 txn_id: u64,
1035 ) {
1036 self.commit_ts.push(commit_ts);
1037 self.txn_ids.push(txn_id);
1038 self.keys.push(key);
1039 self.value_handles.push(handle);
1040 }
1041
1042 pub fn len(&self) -> usize {
1044 self.commit_ts.len()
1045 }
1046
1047 pub fn is_empty(&self) -> bool {
1049 self.commit_ts.is_empty()
1050 }
1051
1052 pub fn clear(&mut self) {
1054 self.commit_ts.clear();
1055 self.txn_ids.clear();
1056 self.keys.clear();
1057 self.value_handles.clear();
1058 self.visibility.clear();
1059 self.selection.clear();
1060 }
1061
1062 pub fn filter_visibility(&mut self, snapshot_ts: u64, current_txn_id: Option<u64>) {
1067 let n = self.len();
1068 self.visibility.resize(n, false);
1069 self.selection.clear();
1070
1071 if let Some(txn_id) = current_txn_id {
1073 SimdVisibilityFilter::filter_batch_with_txn(
1074 &self.commit_ts,
1075 &self.txn_ids,
1076 snapshot_ts,
1077 txn_id,
1078 &mut self.visibility,
1079 );
1080 } else {
1081 SimdVisibilityFilter::filter_batch_into(&self.commit_ts, snapshot_ts, &mut self.visibility);
1082 }
1083
1084 for (i, &visible) in self.visibility.iter().enumerate() {
1086 if visible {
1087 self.selection.push(i);
1088 }
1089 }
1090 }
1091
1092 pub fn visible_count(&self) -> usize {
1094 self.selection.len()
1095 }
1096
1097 pub fn iter_visible(&self) -> impl Iterator<Item = (&'a [u8], Option<&'a [u8]>)> + '_ {
1101 self.selection.iter().map(move |&idx| {
1102 let key = self.keys[idx];
1103 let value = self.value_handles[idx].and_then(|h| h.materialize());
1104 (key, value)
1105 })
1106 }
1107
1108 pub fn iter_visible_full(
1110 &self,
1111 ) -> impl Iterator<Item = (&'a [u8], Option<&'a [u8]>, u64, u64)> + '_ {
1112 self.selection.iter().map(move |&idx| {
1113 let key = self.keys[idx];
1114 let value = self.value_handles[idx].and_then(|h| h.materialize());
1115 let ts = self.commit_ts[idx];
1116 let txn = self.txn_ids[idx];
1117 (key, value, ts, txn)
1118 })
1119 }
1120}
1121
1122pub struct SoaScanIterator<'a, S>
1138where
1139 S: SoaSource<'a>,
1140{
1141 source: S,
1143 batch: SoaBatch<'a>,
1145 pos: usize,
1147 snapshot_ts: u64,
1149 current_txn_id: Option<u64>,
1151 #[allow(dead_code)]
1153 batch_size: usize,
1154 stats: SoaScanStats,
1156}
1157
1158#[derive(Debug, Default, Clone)]
1160pub struct SoaScanStats {
1161 pub rows_scanned: usize,
1163 pub rows_visible: usize,
1165 pub values_materialized: usize,
1167 pub batches_processed: usize,
1169}
1170
1171impl SoaScanStats {
1172 pub fn selectivity(&self) -> f64 {
1174 if self.rows_scanned == 0 {
1175 0.0
1176 } else {
1177 self.rows_visible as f64 / self.rows_scanned as f64
1178 }
1179 }
1180
1181 pub fn materialization_efficiency(&self) -> f64 {
1184 if self.rows_visible == 0 {
1185 1.0
1186 } else {
1187 self.values_materialized as f64 / self.rows_visible as f64
1188 }
1189 }
1190}
1191
1192pub trait SoaSource<'a> {
1194 fn fill_batch(&mut self, batch: &mut SoaBatch<'a>) -> bool;
1197}
1198
1199impl<'a, S> SoaScanIterator<'a, S>
1200where
1201 S: SoaSource<'a>,
1202{
1203 pub fn new(source: S, snapshot_ts: u64, current_txn_id: Option<u64>) -> Self {
1205 Self::with_batch_size(source, snapshot_ts, current_txn_id, DEFAULT_BATCH_SIZE)
1206 }
1207
1208 pub fn with_batch_size(
1210 source: S,
1211 snapshot_ts: u64,
1212 current_txn_id: Option<u64>,
1213 batch_size: usize,
1214 ) -> Self {
1215 Self {
1216 source,
1217 batch: SoaBatch::with_capacity(batch_size),
1218 pos: 0,
1219 snapshot_ts,
1220 current_txn_id,
1221 batch_size,
1222 stats: SoaScanStats::default(),
1223 }
1224 }
1225
1226 fn fetch_batch(&mut self) -> bool {
1228 self.batch.clear();
1229 self.pos = 0;
1230
1231 if !self.source.fill_batch(&mut self.batch) {
1233 return false;
1234 }
1235
1236 self.stats.rows_scanned += self.batch.len();
1237 self.stats.batches_processed += 1;
1238
1239 self.batch.filter_visibility(self.snapshot_ts, self.current_txn_id);
1241 self.stats.rows_visible += self.batch.visible_count();
1242
1243 true
1244 }
1245
1246 pub fn stats(&self) -> &SoaScanStats {
1248 &self.stats
1249 }
1250}
1251
1252impl<'a, S> Iterator for SoaScanIterator<'a, S>
1253where
1254 S: SoaSource<'a>,
1255{
1256 type Item = (&'a [u8], Option<&'a [u8]>);
1257
1258 fn next(&mut self) -> Option<Self::Item> {
1259 loop {
1260 while self.pos >= self.batch.selection.len() {
1262 if !self.fetch_batch() {
1263 return None;
1264 }
1265 }
1266
1267 let sel_idx = self.pos;
1269 self.pos += 1;
1270 let row_idx = self.batch.selection[sel_idx];
1271
1272 let key = self.batch.keys[row_idx];
1273 let value = self.batch.value_handles[row_idx].and_then(|h| h.materialize());
1274 self.stats.values_materialized += 1;
1275
1276 return Some((key, value));
1277 }
1278 }
1279}
1280
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_column_vector_int64() {
        // Short column: takes the plain iterator path of `sum_i64`.
        let v = ColumnVector::Int64(vec![1, 2, 3, 4, 5]);
        assert_eq!(v.len(), 5);
        assert_eq!(v.sum_i64(), Some(15));
    }

    #[test]
    fn test_column_vector_float64() {
        let v = ColumnVector::Float64(vec![1.0, 2.0, 3.0, 4.0]);
        assert_eq!(v.len(), 4);
        assert_eq!(v.sum_f64(), Some(10.0));
    }

    #[test]
    fn test_vector_batch() {
        let mut batch = VectorBatch::with_capacity(1024);
        batch.add_column("id", ColumnVector::Int64(vec![1, 2, 3]));
        batch.add_column("value", ColumnVector::Float64(vec![1.5, 2.5, 3.5]));

        assert_eq!(batch.row_count(), 3);
        assert_eq!(batch.column_count(), 2);
        assert!(batch.column("id").is_some());
    }

    #[test]
    fn test_int64_comparison() {
        let col = ColumnVector::Int64(vec![1, 5, 10, 15, 20]);
        let pred = Int64Comparison::gt("test", 10);
        let result = pred.evaluate(&col);

        // Strictly greater: the element equal to 10 does not match.
        assert_eq!(result, vec![false, false, false, true, true]);
    }

    #[test]
    fn test_simd_sum_i64_large() {
        // 1000 values is above the 16-element threshold for `simd_sum_i64`.
        let values: Vec<i64> = (0..1000).collect();
        let expected: i64 = (0..1000).sum();

        let col = ColumnVector::Int64(values);
        assert_eq!(col.sum_i64(), Some(expected));
    }

    #[test]
    fn test_simd_compare_gt() {
        let values: Vec<i64> = vec![1, 5, 10, 15, 20, 25, 30, 35];
        let result = simd_compare_i64_gt(&values, 12);
        assert_eq!(result, vec![false, false, false, true, true, true, true, true]);
    }

    #[test]
    fn test_vectorized_scan_config() {
        let config = VectorizedScanConfig::new()
            .with_batch_size(2048)
            .with_prefetch(true);

        assert_eq!(config.batch_size, 2048);
        assert!(config.prefetch_enabled);
    }

    #[test]
    fn test_simd_visibility_filter_basic() {
        // Timestamp 0 is never visible; the others must be below the snapshot.
        let commit_ts = vec![0, 10, 20, 30, 40];
        let snapshot_ts = 25;

        let result = SimdVisibilityFilter::filter_batch(&commit_ts, snapshot_ts);

        assert_eq!(result, vec![false, true, true, false, false]);
    }

    #[test]
    fn test_simd_visibility_filter_with_txn() {
        let commit_ts = vec![0, 10, 0, 30, 40];
        let txn_ids = vec![1, 2, 1, 4, 5];
        let snapshot_ts = 25;
        let current_txn_id = 1;

        let mut result = vec![false; 5];
        SimdVisibilityFilter::filter_batch_with_txn(
            &commit_ts,
            &txn_ids,
            snapshot_ts,
            current_txn_id,
            &mut result,
        );

        // Rows 0 and 2 belong to transaction 1 and are visible despite ts == 0.
        assert_eq!(result, vec![true, true, true, false, false]);
    }

    #[test]
    fn test_simd_visibility_filter_large() {
        let n = 1000;
        let commit_ts: Vec<u64> = (1..=n as u64).collect();
        let snapshot_ts = 500;

        let result = SimdVisibilityFilter::filter_batch(&commit_ts, snapshot_ts);

        // Timestamps 1..=499 are strictly below the snapshot.
        let visible_count = result.iter().filter(|&&v| v).count();
        assert_eq!(visible_count, 499);
    }

    #[test]
    fn test_versioned_slice_visibility() {
        let slice = VersionedSlice {
            key: b"test",
            value: Some(b"value"),
            commit_ts: 100,
            txn_id: 1,
        };

        assert!(slice.is_visible(200, None));
        assert!(!slice.is_visible(50, None));
        // Written by the reading transaction: visible regardless of snapshot.
        assert!(slice.is_visible(50, Some(1)));
    }

    #[test]
    fn test_streaming_scan_iterator() {
        let entries: Vec<VersionedSlice<'static>> = vec![
            VersionedSlice { key: b"a", value: Some(b"1"), commit_ts: 10, txn_id: 1 },
            // commit_ts == 0: filtered out.
            VersionedSlice { key: b"b", value: Some(b"2"), commit_ts: 0, txn_id: 2 },
            // commit_ts above the snapshot: filtered out.
            VersionedSlice { key: b"c", value: Some(b"3"), commit_ts: 30, txn_id: 3 },
            VersionedSlice { key: b"d", value: Some(b"4"), commit_ts: 15, txn_id: 4 },
        ];

        let iter = StreamingScanIterator::new(entries.into_iter(), 25, None);
        let visible: Vec<_> = iter.collect();

        assert_eq!(visible.len(), 2);
        assert_eq!(visible[0].key, b"a");
        assert_eq!(visible[1].key, b"d");
    }

    #[test]
    fn test_soa_batch_basic() {
        let mut batch = SoaBatch::with_capacity(100);

        batch.push(b"key1", Some(b"value1"), 10, 1);
        batch.push(b"key2", Some(b"value2"), 20, 2);
        batch.push(b"key3", None, 30, 3);
        batch.push(b"key4", Some(b"value4"), 0, 4);

        assert_eq!(batch.len(), 4);
        assert_eq!(batch.commit_ts, vec![10, 20, 30, 0]);
        assert_eq!(batch.txn_ids, vec![1, 2, 3, 4]);
    }

    #[test]
    fn test_soa_batch_visibility_filter() {
        let mut batch = SoaBatch::with_capacity(100);

        batch.push(b"k1", Some(b"v1"), 10, 1);
        batch.push(b"k2", Some(b"v2"), 0, 2);
        batch.push(b"k3", Some(b"v3"), 20, 3);
        batch.push(b"k4", Some(b"v4"), 30, 4);
        batch.push(b"k5", Some(b"v5"), 0, 5);

        batch.filter_visibility(25, None);

        assert_eq!(batch.visibility, vec![true, false, true, false, false]);
        assert_eq!(batch.selection, vec![0, 2]);
        assert_eq!(batch.visible_count(), 2);
    }

    #[test]
    fn test_soa_batch_self_visibility() {
        let mut batch = SoaBatch::with_capacity(100);

        // Row 0 has ts == 0 but was written by transaction 42, the reader.
        batch.push(b"k1", Some(b"v1"), 0, 42);
        batch.push(b"k2", Some(b"v2"), 10, 1);
        batch.push(b"k3", Some(b"v3"), 0, 99);

        batch.filter_visibility(25, Some(42));

        assert_eq!(batch.visibility, vec![true, true, false]);
        assert_eq!(batch.selection, vec![0, 1]);
    }

    #[test]
    fn test_soa_batch_late_materialization() {
        let mut batch = SoaBatch::with_capacity(100);

        batch.push(b"key1", Some(b"val1"), 10, 1);
        batch.push(b"key2", Some(b"val2"), 0, 2);
        batch.push(b"key3", Some(b"val3"), 15, 3);

        batch.filter_visibility(25, None);

        let visible: Vec<_> = batch.iter_visible().collect();

        assert_eq!(visible.len(), 2);
        assert_eq!(visible[0], (b"key1".as_slice(), Some(b"val1".as_slice())));
        assert_eq!(visible[1], (b"key3".as_slice(), Some(b"val3".as_slice())));
    }

    #[test]
    fn test_soa_scan_stats() {
        let mut batch = SoaBatch::with_capacity(100);

        for i in 0..10u64 {
            // Only the first three rows get a non-zero commit timestamp.
            let ts = if i < 3 { 10 } else { 0 };
            batch.push(b"key", Some(b"val"), ts, i);
        }

        batch.filter_visibility(25, None);

        let selectivity = batch.visible_count() as f64 / batch.len() as f64;
        assert!((selectivity - 0.3).abs() < 0.01);
    }

    #[test]
    fn test_soa_batch_simd_large() {
        let mut batch = SoaBatch::with_capacity(2000);

        for i in 0..1000u64 {
            // Even rows are below the snapshot (25), odd rows above it.
            let ts = if i % 2 == 0 { 10 } else { 50 };
            batch.push(b"k", Some(b"v"), ts, i);
        }

        batch.filter_visibility(25, None);

        assert_eq!(batch.visible_count(), 500);

        for (i, &idx) in batch.selection.iter().enumerate() {
            assert_eq!(idx, i * 2);
        }
    }
}