1use std::sync::atomic::{AtomicUsize, Ordering};
62
/// Batch size used when the caller does not choose one.
///
/// Read by `VectorBatch::new`, `VectorizedScanConfig::default`,
/// `StreamingScanIterator::new` and `SoaScanIterator::new`.
pub const DEFAULT_BATCH_SIZE: usize = 1024;

/// Lower bound that `VectorizedScanConfig::with_batch_size` clamps to.
pub const MIN_BATCH_SIZE: usize = 64;

/// Upper bound that `VectorizedScanConfig::with_batch_size` clamps to.
pub const MAX_BATCH_SIZE: usize = 8192;
72
/// One column of a batch, stored as a contiguous vector.
///
/// The variable-length variants (`String`, `Binary`) keep all bytes in `data` and use
/// `offsets` to describe rows; [`ColumnVector::len`] reports `offsets.len() - 1` rows for
/// them (presumably row `i` spans `data[offsets[i]..offsets[i + 1]]` -- confirm against the
/// code that fills these columns).
#[derive(Debug, Clone)]
pub enum ColumnVector {
    /// Boolean values, one per row.
    Bool(Vec<bool>),
    /// Signed 64-bit integers, one per row.
    Int64(Vec<i64>),
    /// Unsigned 64-bit integers, one per row.
    UInt64(Vec<u64>),
    /// 64-bit floats, one per row.
    Float64(Vec<f64>),
    /// Variable-length string data addressed through `offsets`.
    String {
        offsets: Vec<u32>,
        data: Vec<u8>,
    },
    /// Variable-length binary data addressed through `offsets`.
    Binary {
        offsets: Vec<u32>,
        data: Vec<u8>,
    },
    /// Column without values. `len` counts 64 rows per stored word
    /// (presumably a bitmap -- confirm against the producer).
    Null(Vec<u64>),
}

impl ColumnVector {
    /// Creates an empty `Int64` column with room for `capacity` rows.
    pub fn new_int64(capacity: usize) -> Self {
        ColumnVector::Int64(Vec::with_capacity(capacity))
    }

    /// Creates an empty `Float64` column with room for `capacity` rows.
    pub fn new_float64(capacity: usize) -> Self {
        ColumnVector::Float64(Vec::with_capacity(capacity))
    }

    /// Creates an empty `String` column.
    ///
    /// Reserves `capacity + 1` offsets and `capacity * 32` data bytes. No offset is pushed,
    /// so the new column reports a length of zero.
    pub fn new_string(capacity: usize) -> Self {
        ColumnVector::String {
            offsets: Vec::with_capacity(capacity + 1),
            data: Vec::with_capacity(capacity * 32),
        }
    }

    /// Number of rows in the column.
    ///
    /// For `String`/`Binary` this is `offsets.len() - 1` (zero when there are no offsets);
    /// for `Null` it is 64 rows per stored word.
    pub fn len(&self) -> usize {
        match self {
            ColumnVector::Bool(v) => v.len(),
            ColumnVector::Int64(v) => v.len(),
            ColumnVector::UInt64(v) => v.len(),
            ColumnVector::Float64(v) => v.len(),
            ColumnVector::String { offsets, .. } => offsets.len().saturating_sub(1),
            ColumnVector::Binary { offsets, .. } => offsets.len().saturating_sub(1),
            ColumnVector::Null(v) => v.len() * 64,
        }
    }

    /// Returns `true` when the column has no rows.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Bytes occupied by the stored elements (lengths, not reserved capacity).
    pub fn memory_size(&self) -> usize {
        match self {
            ColumnVector::Bool(v) => v.len(),
            ColumnVector::Int64(v) => v.len() * 8,
            ColumnVector::UInt64(v) => v.len() * 8,
            ColumnVector::Float64(v) => v.len() * 8,
            ColumnVector::String { offsets, data } => offsets.len() * 4 + data.len(),
            ColumnVector::Binary { offsets, data } => offsets.len() * 4 + data.len(),
            ColumnVector::Null(v) => v.len() * 8,
        }
    }

    /// Sums an `Int64` column; returns `None` for every other variant.
    ///
    /// The sum wraps on overflow (two's complement) on every code path, so the result does
    /// not depend on the column length, the build profile, or the CPU features in use.
    /// An empty column sums to `0`.
    pub fn sum_i64(&self) -> Option<i64> {
        // Below this many values the vector setup is not worth it.
        const SIMD_MIN_LEN: usize = 16;

        match self {
            ColumnVector::Int64(values) => {
                #[cfg(target_arch = "x86_64")]
                {
                    if values.len() >= SIMD_MIN_LEN {
                        return Some(simd_sum_i64(values));
                    }
                }
                Some(wrapping_sum_i64(values))
            }
            _ => None,
        }
    }

    /// Sums a `Float64` column; returns `None` for every other variant.
    ///
    /// An empty column sums to `0.0`. On x86_64, columns of eight or more values go through
    /// [`simd_sum_f64`], which adds in a different order than a sequential sum, so the
    /// result may differ from a sequential sum in the last bits.
    pub fn sum_f64(&self) -> Option<f64> {
        // Below this many values the vector setup is not worth it.
        const SIMD_MIN_LEN: usize = 8;

        match self {
            ColumnVector::Float64(values) => {
                if values.is_empty() {
                    return Some(0.0);
                }
                #[cfg(target_arch = "x86_64")]
                {
                    if values.len() >= SIMD_MIN_LEN {
                        return Some(simd_sum_f64(values));
                    }
                }
                Some(values.iter().sum())
            }
            _ => None,
        }
    }
}

/// Sequential wrapping sum; the scalar counterpart of the wrapping vector adds.
fn wrapping_sum_i64(values: &[i64]) -> i64 {
    values.iter().fold(0i64, |acc, &v| acc.wrapping_add(v))
}

/// Sums `values` with 256-bit adds when AVX2 is enabled at compile time, otherwise
/// sequentially. Overflow wraps in both cases.
#[cfg(target_arch = "x86_64")]
fn simd_sum_i64(values: &[i64]) -> i64 {
    #[cfg(target_feature = "avx2")]
    {
        use std::arch::x86_64::*;

        let lanes = values.chunks_exact(4);
        let tail = lanes.remainder();

        // SAFETY: AVX2 is statically enabled by the `cfg` above. Every chunk is exactly four
        // `i64` (32 bytes) and `_mm256_loadu_si256` has no alignment requirement. `__m256i`
        // and `[i64; 4]` have the same size, so the transmute is a plain bit copy.
        let partial: [i64; 4] = unsafe {
            let mut acc = _mm256_setzero_si256();
            for chunk in lanes {
                let v = _mm256_loadu_si256(chunk.as_ptr() as *const __m256i);
                acc = _mm256_add_epi64(acc, v);
            }
            std::mem::transmute(acc)
        };

        // Fold the four lane totals and the leftover elements with the same wrapping
        // semantics as the vector adds above.
        partial
            .iter()
            .chain(tail)
            .fold(0i64, |acc, &v| acc.wrapping_add(v))
    }

    #[cfg(not(target_feature = "avx2"))]
    {
        wrapping_sum_i64(values)
    }
}

/// Sums `values` with 256-bit adds when AVX is enabled at compile time, otherwise
/// sequentially.
#[cfg(target_arch = "x86_64")]
fn simd_sum_f64(values: &[f64]) -> f64 {
    #[cfg(target_feature = "avx")]
    {
        use std::arch::x86_64::*;

        let lanes = values.chunks_exact(4);
        let tail = lanes.remainder();

        // SAFETY: AVX is statically enabled by the `cfg` above. Every chunk is exactly four
        // `f64` (32 bytes) and `_mm256_loadu_pd` has no alignment requirement. `__m256d` and
        // `[f64; 4]` have the same size, so the transmute is a plain bit copy.
        let partial: [f64; 4] = unsafe {
            let mut acc = _mm256_setzero_pd();
            for chunk in lanes {
                acc = _mm256_add_pd(acc, _mm256_loadu_pd(chunk.as_ptr()));
            }
            std::mem::transmute(acc)
        };

        let vector_total: f64 = partial.iter().sum();
        let tail_total: f64 = tail.iter().sum();
        vector_total + tail_total
    }

    #[cfg(not(target_feature = "avx"))]
    {
        values.iter().sum()
    }
}
277
278#[derive(Debug)]
288pub struct VectorBatch {
289 columns: Vec<(String, ColumnVector)>,
291 row_count: usize,
293 capacity: usize,
295 selection: Option<Vec<usize>>,
297}
298
299impl VectorBatch {
300 pub fn with_capacity(capacity: usize) -> Self {
302 Self {
303 columns: Vec::new(),
304 row_count: 0,
305 capacity,
306 selection: None,
307 }
308 }
309
310 pub fn new() -> Self {
312 Self::with_capacity(DEFAULT_BATCH_SIZE)
313 }
314
315 pub fn capacity(&self) -> usize {
317 self.capacity
318 }
319
320 pub fn row_count(&self) -> usize {
322 if let Some(ref sel) = self.selection {
323 sel.len()
324 } else {
325 self.row_count
326 }
327 }
328
329 pub fn is_full(&self) -> bool {
331 self.row_count >= self.capacity
332 }
333
334 pub fn is_empty(&self) -> bool {
336 self.row_count == 0
337 }
338
339 pub fn add_column(&mut self, name: impl Into<String>, column: ColumnVector) {
341 self.columns.push((name.into(), column));
342 if self.row_count == 0 {
343 self.row_count = self.columns.last().map(|(_, c)| c.len()).unwrap_or(0);
344 }
345 }
346
347 pub fn column(&self, name: &str) -> Option<&ColumnVector> {
349 self.columns
350 .iter()
351 .find(|(n, _)| n == name)
352 .map(|(_, c)| c)
353 }
354
355 pub fn column_at(&self, idx: usize) -> Option<&ColumnVector> {
357 self.columns.get(idx).map(|(_, c)| c)
358 }
359
360 pub fn column_count(&self) -> usize {
362 self.columns.len()
363 }
364
365 pub fn set_selection(&mut self, selection: Vec<usize>) {
367 self.selection = Some(selection);
368 }
369
370 pub fn clear_selection(&mut self) {
372 self.selection = None;
373 }
374
375 pub fn memory_size(&self) -> usize {
377 self.columns.iter().map(|(_, c)| c.memory_size()).sum()
378 }
379
380 pub fn reset(&mut self) {
382 self.columns.clear();
383 self.row_count = 0;
384 self.selection = None;
385 }
386}
387
388impl Default for VectorBatch {
389 fn default() -> Self {
390 Self::new()
391 }
392}
393
/// Counters describing scan progress, updatable through a shared reference.
///
/// Every counter is read and written with `Ordering::Relaxed`.
#[derive(Debug, Default)]
pub struct VectorizedScanStats {
    /// Total rows passed to `record_batch` as `rows`.
    pub rows_scanned: AtomicUsize,
    /// Number of `record_batch` calls.
    pub batches_processed: AtomicUsize,
    /// Total rows passed to `record_batch` as `passed`.
    pub rows_passed: AtomicUsize,
    /// Total bytes passed to `record_batch` as `bytes`.
    pub bytes_read: AtomicUsize,
}

impl VectorizedScanStats {
    /// Creates a stats object with every counter at zero.
    pub fn new() -> Self {
        Default::default()
    }

    /// Records one processed batch: adds `rows`, `passed` and `bytes` to their counters
    /// and bumps the batch counter by one.
    pub fn record_batch(&self, rows: usize, passed: usize, bytes: usize) {
        let updates = [
            (&self.rows_scanned, rows),
            (&self.batches_processed, 1),
            (&self.rows_passed, passed),
            (&self.bytes_read, bytes),
        ];
        for &(counter, delta) in updates.iter() {
            counter.fetch_add(delta, Ordering::Relaxed);
        }
    }

    /// Current value of the scanned-rows counter.
    pub fn rows_scanned(&self) -> usize {
        let counter = &self.rows_scanned;
        counter.load(Ordering::Relaxed)
    }

    /// Current value of the processed-batches counter.
    pub fn batches_processed(&self) -> usize {
        let counter = &self.batches_processed;
        counter.load(Ordering::Relaxed)
    }
}
431
/// A predicate evaluated against a whole column at once.
///
/// Implementors must be `Send + Sync`.
pub trait VectorPredicate: Send + Sync {
    /// Evaluates the predicate on `column` and returns the result flags.
    ///
    /// The implementation in this file (`Int64Comparison`) returns one flag per row.
    fn evaluate(&self, column: &ColumnVector) -> Vec<bool>;

    /// Name of the column this predicate refers to.
    fn column_name(&self) -> &str;
}
440
/// Compares every value of an `i64` column against a constant.
#[derive(Debug, Clone)]
pub struct Int64Comparison {
    /// Column the comparison refers to.
    column_name: String,
    /// Comparison operator; the column value is the left-hand side.
    op: ComparisonOp,
    /// Constant on the right-hand side.
    value: i64,
}

/// Operators supported by [`Int64Comparison`].
#[derive(Debug, Clone, Copy)]
pub enum ComparisonOp {
    Equal,
    NotEqual,
    LessThan,
    LessEqual,
    GreaterThan,
    GreaterEqual,
}

impl Int64Comparison {
    /// Builds the comparison `column <op> value`.
    pub fn new(column_name: impl Into<String>, op: ComparisonOp, value: i64) -> Self {
        let column_name = column_name.into();
        Int64Comparison {
            column_name,
            op,
            value,
        }
    }

    /// Shorthand for `column == value`.
    pub fn eq(column_name: impl Into<String>, value: i64) -> Self {
        Int64Comparison::new(column_name, ComparisonOp::Equal, value)
    }

    /// Shorthand for `column > value`.
    pub fn gt(column_name: impl Into<String>, value: i64) -> Self {
        Int64Comparison::new(column_name, ComparisonOp::GreaterThan, value)
    }

    /// Shorthand for `column < value`.
    pub fn lt(column_name: impl Into<String>, value: i64) -> Self {
        Int64Comparison::new(column_name, ComparisonOp::LessThan, value)
    }
}
481
482impl VectorPredicate for Int64Comparison {
483 fn evaluate(&self, column: &ColumnVector) -> Vec<bool> {
484 match column {
485 ColumnVector::Int64(values) => {
486 let cmp_value = self.value;
487 match self.op {
488 ComparisonOp::Equal => values.iter().map(|&v| v == cmp_value).collect(),
489 ComparisonOp::NotEqual => values.iter().map(|&v| v != cmp_value).collect(),
490 ComparisonOp::LessThan => values.iter().map(|&v| v < cmp_value).collect(),
491 ComparisonOp::LessEqual => values.iter().map(|&v| v <= cmp_value).collect(),
492 ComparisonOp::GreaterThan => values.iter().map(|&v| v > cmp_value).collect(),
493 ComparisonOp::GreaterEqual => values.iter().map(|&v| v >= cmp_value).collect(),
494 }
495 }
496 _ => vec![false; column.len()],
497 }
498 }
499
500 fn column_name(&self) -> &str {
501 &self.column_name
502 }
503}
504
/// Returns, for every element of `values`, whether it is strictly greater than `threshold`
/// (signed comparison). AVX2 build: four elements are compared per instruction.
#[cfg(all(target_arch = "x86_64", target_feature = "avx2"))]
pub fn simd_compare_i64_gt(values: &[i64], threshold: i64) -> Vec<bool> {
    use std::arch::x86_64::*;

    const LANES: usize = 4;

    let mut result = vec![false; values.len()];
    let mut src_chunks = values.chunks_exact(LANES);
    let mut dst_chunks = result.chunks_exact_mut(LANES);

    for (src, dst) in src_chunks.by_ref().zip(dst_chunks.by_ref()) {
        // SAFETY: AVX2 is statically enabled by the `cfg` on this function. `src` holds
        // exactly four `i64` (32 bytes) and `_mm256_loadu_si256` tolerates any alignment.
        let byte_mask = unsafe {
            let lanes = _mm256_loadu_si256(src.as_ptr() as *const __m256i);
            let above = _mm256_cmpgt_epi64(lanes, _mm256_set1_epi64x(threshold));
            _mm256_movemask_epi8(above) as u32
        };

        // Each 64-bit lane contributes eight identical mask bits.
        for (lane, slot) in dst.iter_mut().enumerate() {
            *slot = ((byte_mask >> (lane * 8)) & 0xFF) != 0;
        }
    }

    // Elements that did not fill a whole vector are compared one by one.
    let tail_src = src_chunks.remainder();
    let tail_dst = dst_chunks.into_remainder();
    for (slot, &v) in tail_dst.iter_mut().zip(tail_src) {
        *slot = v > threshold;
    }

    result
}

/// Returns, for every element of `values`, whether it is strictly greater than `threshold`
/// (signed comparison). Portable build used when AVX2 is not enabled at compile time.
#[cfg(not(all(target_arch = "x86_64", target_feature = "avx2")))]
pub fn simd_compare_i64_gt(values: &[i64], threshold: i64) -> Vec<bool> {
    let mut result = Vec::with_capacity(values.len());
    for &v in values {
        result.push(v > threshold);
    }
    result
}
541
542#[derive(Debug, Clone)]
548pub struct VectorizedScanConfig {
549 pub batch_size: usize,
551 pub prefetch_enabled: bool,
553 pub prefetch_distance: usize,
555 pub simd_enabled: bool,
557}
558
559impl Default for VectorizedScanConfig {
560 fn default() -> Self {
561 Self {
562 batch_size: DEFAULT_BATCH_SIZE,
563 prefetch_enabled: true,
564 prefetch_distance: 16,
565 simd_enabled: true,
566 }
567 }
568}
569
570impl VectorizedScanConfig {
571 pub fn new() -> Self {
572 Self::default()
573 }
574
575 pub fn with_batch_size(mut self, size: usize) -> Self {
576 self.batch_size = size.clamp(MIN_BATCH_SIZE, MAX_BATCH_SIZE);
577 self
578 }
579
580 pub fn with_prefetch(mut self, enabled: bool) -> Self {
581 self.prefetch_enabled = enabled;
582 self
583 }
584}
585
/// Batch visibility checks over commit timestamps.
///
/// A row is visible when its commit timestamp is non-zero and strictly less than the
/// snapshot timestamp. Timestamps are compared as unsigned 64-bit values on every code
/// path, so the vectorized and scalar implementations agree for the full `u64` range
/// (including `snapshot_ts == u64::MAX`).
pub struct SimdVisibilityFilter;

impl SimdVisibilityFilter {
    /// Returns one visibility flag per entry of `commit_ts`.
    #[inline]
    pub fn filter_batch(commit_ts: &[u64], snapshot_ts: u64) -> Vec<bool> {
        let mut result = vec![false; commit_ts.len()];
        Self::filter_batch_into(commit_ts, snapshot_ts, &mut result);
        result
    }

    /// Writes one visibility flag per entry of `commit_ts` into `out`.
    ///
    /// # Panics
    ///
    /// Panics if `commit_ts` and `out` differ in length.
    #[inline]
    pub fn filter_batch_into(commit_ts: &[u64], snapshot_ts: u64, out: &mut [bool]) {
        assert_eq!(commit_ts.len(), out.len());

        // Exactly one of the following statements survives `cfg` evaluation.
        #[cfg(target_arch = "x86_64")]
        Self::filter_batch_simd_x86(commit_ts, snapshot_ts, out);

        #[cfg(target_arch = "aarch64")]
        Self::filter_batch_simd_neon(commit_ts, snapshot_ts, out);

        #[cfg(not(any(target_arch = "x86_64", target_arch = "aarch64")))]
        Self::filter_batch_scalar(commit_ts, snapshot_ts, out);
    }

    /// Reference implementation: one comparison per element.
    ///
    /// Callers pass slices of equal length.
    #[inline]
    fn filter_batch_scalar(commit_ts: &[u64], snapshot_ts: u64, out: &mut [bool]) {
        for (slot, &ts) in out.iter_mut().zip(commit_ts) {
            *slot = ts != 0 && ts < snapshot_ts;
        }
    }

    /// x86_64 implementation: four timestamps per step when AVX2 is enabled at compile
    /// time, otherwise the scalar loop.
    ///
    /// Callers pass slices of equal length (checked in `filter_batch_into`).
    #[cfg(target_arch = "x86_64")]
    fn filter_batch_simd_x86(commit_ts: &[u64], snapshot_ts: u64, out: &mut [bool]) {
        #[cfg(target_feature = "avx2")]
        {
            use std::arch::x86_64::*;

            const LANES: usize = 4;

            let vector_len = commit_ts.len() - commit_ts.len() % LANES;
            let (ts_body, ts_tail) = commit_ts.split_at(vector_len);
            let (out_body, out_tail) = out.split_at_mut(vector_len);

            // SAFETY: AVX2 is statically enabled by the `cfg` above. Every chunk holds
            // exactly four `u64` (32 bytes) and `_mm256_loadu_si256` tolerates any alignment.
            unsafe {
                let zero = _mm256_setzero_si256();
                // AVX2 only has a *signed* 64-bit compare. Flipping the sign bit of both
                // operands maps unsigned order onto signed order, which keeps timestamps
                // at or above 2^63 (and `snapshot_ts == u64::MAX`) correct.
                let sign_bit = _mm256_set1_epi64x(i64::MIN);
                let snapshot_biased =
                    _mm256_xor_si256(_mm256_set1_epi64x(snapshot_ts as i64), sign_bit);

                for (src, dst) in ts_body
                    .chunks_exact(LANES)
                    .zip(out_body.chunks_exact_mut(LANES))
                {
                    let ts_vec = _mm256_loadu_si256(src.as_ptr() as *const __m256i);
                    let is_zero = _mm256_cmpeq_epi64(ts_vec, zero);
                    let ts_biased = _mm256_xor_si256(ts_vec, sign_bit);
                    // ts < snapshot  <=>  snapshot > ts
                    let less_than = _mm256_cmpgt_epi64(snapshot_biased, ts_biased);
                    // visible = !is_zero & less_than
                    let visible = _mm256_andnot_si256(is_zero, less_than);
                    // One bit per 64-bit lane (its top bit).
                    let bits = _mm256_movemask_pd(_mm256_castsi256_pd(visible));

                    for (lane, slot) in dst.iter_mut().enumerate() {
                        *slot = ((bits >> lane) & 1) != 0;
                    }
                }
            }

            // Up to three leftover elements.
            Self::filter_batch_scalar(ts_tail, snapshot_ts, out_tail);
        }

        #[cfg(not(target_feature = "avx2"))]
        Self::filter_batch_scalar(commit_ts, snapshot_ts, out);
    }

    /// aarch64 implementation; currently forwards to the scalar loop.
    #[cfg(target_arch = "aarch64")]
    fn filter_batch_simd_neon(commit_ts: &[u64], snapshot_ts: u64, out: &mut [bool]) {
        Self::filter_batch_scalar(commit_ts, snapshot_ts, out);
    }

    /// Like [`filter_batch_into`](Self::filter_batch_into), but additionally marks every
    /// row whose `txn_ids` entry equals `current_txn_id` as visible, regardless of its
    /// commit timestamp.
    ///
    /// # Panics
    ///
    /// Panics if `commit_ts`, `txn_ids` and `out` do not all have the same length.
    #[inline]
    pub fn filter_batch_with_txn(
        commit_ts: &[u64],
        txn_ids: &[u64],
        snapshot_ts: u64,
        current_txn_id: u64,
        out: &mut [bool],
    ) {
        assert_eq!(commit_ts.len(), txn_ids.len());
        assert_eq!(commit_ts.len(), out.len());

        Self::filter_batch_into(commit_ts, snapshot_ts, out);

        for (slot, &txn_id) in out.iter_mut().zip(txn_ids) {
            if txn_id == current_txn_id {
                *slot = true;
            }
        }
    }

    /// Counts the entries of `commit_ts` that are visible at `snapshot_ts`.
    #[inline]
    pub fn count_visible(commit_ts: &[u64], snapshot_ts: u64) -> usize {
        commit_ts
            .iter()
            .filter(|&&ts| ts != 0 && ts < snapshot_ts)
            .count()
    }
}
774
/// A borrowed key/value version together with its commit metadata.
#[derive(Debug, Clone)]
pub struct VersionedSlice<'a> {
    /// Key bytes.
    pub key: &'a [u8],
    /// Value bytes, if the version carries any.
    pub value: Option<&'a [u8]>,
    /// Commit timestamp; `0` is never visible through the timestamp rule.
    pub commit_ts: u64,
    /// Id of the transaction that wrote this version.
    pub txn_id: u64,
}

impl<'a> VersionedSlice<'a> {
    /// Whether this version is visible to a reader.
    ///
    /// It is visible when it was written by `current_txn_id` (if one is given), or when
    /// its commit timestamp is non-zero and strictly less than `snapshot_ts`.
    #[inline]
    pub fn is_visible(&self, snapshot_ts: u64, current_txn_id: Option<u64>) -> bool {
        let written_by_reader = current_txn_id == Some(self.txn_id);
        let committed_before_snapshot = self.commit_ts != 0 && self.commit_ts < snapshot_ts;
        written_by_reader || committed_before_snapshot
    }
}
805
806pub struct StreamingScanIterator<'a, I>
817where
818 I: Iterator<Item = VersionedSlice<'a>>,
819{
820 source: I,
822 batch: Vec<VersionedSlice<'a>>,
824 visibility: Vec<bool>,
826 pos: usize,
828 snapshot_ts: u64,
830 current_txn_id: Option<u64>,
832 batch_size: usize,
834}
835
836impl<'a, I> StreamingScanIterator<'a, I>
837where
838 I: Iterator<Item = VersionedSlice<'a>>,
839{
840 pub fn new(source: I, snapshot_ts: u64, current_txn_id: Option<u64>) -> Self {
842 Self::with_batch_size(source, snapshot_ts, current_txn_id, DEFAULT_BATCH_SIZE)
843 }
844
845 pub fn with_batch_size(
847 source: I,
848 snapshot_ts: u64,
849 current_txn_id: Option<u64>,
850 batch_size: usize,
851 ) -> Self {
852 Self {
853 source,
854 batch: Vec::with_capacity(batch_size),
855 visibility: Vec::with_capacity(batch_size),
856 pos: 0,
857 snapshot_ts,
858 current_txn_id,
859 batch_size,
860 }
861 }
862
863 fn fetch_batch(&mut self) -> bool {
865 self.batch.clear();
866 self.visibility.clear();
867 self.pos = 0;
868
869 for entry in self.source.by_ref().take(self.batch_size) {
871 self.batch.push(entry);
872 }
873
874 if self.batch.is_empty() {
875 return false;
876 }
877
878 let commit_ts: Vec<u64> = self.batch.iter().map(|e| e.commit_ts).collect();
880 self.visibility.resize(self.batch.len(), false);
881
882 if let Some(txn_id) = self.current_txn_id {
883 let txn_ids: Vec<u64> = self.batch.iter().map(|e| e.txn_id).collect();
884 SimdVisibilityFilter::filter_batch_with_txn(
885 &commit_ts,
886 &txn_ids,
887 self.snapshot_ts,
888 txn_id,
889 &mut self.visibility,
890 );
891 } else {
892 SimdVisibilityFilter::filter_batch_into(&commit_ts, self.snapshot_ts, &mut self.visibility);
893 }
894
895 true
896 }
897}
898
899impl<'a, I> Iterator for StreamingScanIterator<'a, I>
900where
901 I: Iterator<Item = VersionedSlice<'a>>,
902{
903 type Item = VersionedSlice<'a>;
904
905 fn next(&mut self) -> Option<Self::Item> {
906 loop {
907 while self.pos >= self.batch.len() {
909 if !self.fetch_batch() {
910 return None;
911 }
912 }
913
914 while self.pos < self.batch.len() {
916 let idx = self.pos;
917 self.pos += 1;
918
919 if self.visibility[idx] {
920 return Some(self.batch[idx].clone());
921 }
922 }
923 }
924 }
925}
926
/// A batch of versioned rows stored column-wise (one vector per attribute).
///
/// `commit_ts`, `txn_ids`, `keys` and `value_handles` receive one entry per pushed row.
/// `visibility` and `selection` are filled by `filter_visibility`.
#[derive(Debug)]
pub struct SoaBatch<'a> {
    /// Commit timestamp of each row.
    pub commit_ts: Vec<u64>,
    /// Writing transaction id of each row.
    pub txn_ids: Vec<u64>,
    /// Key bytes of each row.
    pub keys: Vec<&'a [u8]>,
    /// Value handle of each row; `None` when the row was pushed without a value.
    pub value_handles: Vec<Option<ValueHandle<'a>>>,
    /// Visibility flag per row, as computed by the last `filter_visibility` call.
    pub visibility: Vec<bool>,
    /// Indices of the visible rows, in ascending order, from the last `filter_visibility` call.
    pub selection: Vec<usize>,
}
974
/// Reference to a row's value that can be resolved after filtering.
#[derive(Debug, Clone, Copy)]
pub enum ValueHandle<'a> {
    /// The value bytes themselves.
    Direct(&'a [u8]),
    /// Location inside a block; not resolvable by `materialize`.
    BlockOffset { block_id: u32, offset: u32, len: u32 },
    /// Slot inside an arena; not resolvable by `materialize`.
    ArenaSlot { arena_id: u32, slot: u32 },
}

impl<'a> ValueHandle<'a> {
    /// Returns the value bytes for `Direct` handles and `None` for the other variants.
    pub fn materialize(&self) -> Option<&'a [u8]> {
        match *self {
            ValueHandle::Direct(bytes) => Some(bytes),
            ValueHandle::BlockOffset { .. } | ValueHandle::ArenaSlot { .. } => None,
        }
    }
}
1001
1002impl<'a> SoaBatch<'a> {
1003 pub fn with_capacity(capacity: usize) -> Self {
1005 Self {
1006 commit_ts: Vec::with_capacity(capacity),
1007 txn_ids: Vec::with_capacity(capacity),
1008 keys: Vec::with_capacity(capacity),
1009 value_handles: Vec::with_capacity(capacity),
1010 visibility: Vec::with_capacity(capacity),
1011 selection: Vec::with_capacity(capacity),
1012 }
1013 }
1014
1015 #[inline]
1017 pub fn push(&mut self, key: &'a [u8], value: Option<&'a [u8]>, commit_ts: u64, txn_id: u64) {
1018 self.commit_ts.push(commit_ts);
1019 self.txn_ids.push(txn_id);
1020 self.keys.push(key);
1021 self.value_handles.push(value.map(ValueHandle::Direct));
1022 }
1023
1024 #[inline]
1026 pub fn push_deferred(
1027 &mut self,
1028 key: &'a [u8],
1029 handle: Option<ValueHandle<'a>>,
1030 commit_ts: u64,
1031 txn_id: u64,
1032 ) {
1033 self.commit_ts.push(commit_ts);
1034 self.txn_ids.push(txn_id);
1035 self.keys.push(key);
1036 self.value_handles.push(handle);
1037 }
1038
1039 pub fn len(&self) -> usize {
1041 self.commit_ts.len()
1042 }
1043
1044 pub fn is_empty(&self) -> bool {
1046 self.commit_ts.is_empty()
1047 }
1048
1049 pub fn clear(&mut self) {
1051 self.commit_ts.clear();
1052 self.txn_ids.clear();
1053 self.keys.clear();
1054 self.value_handles.clear();
1055 self.visibility.clear();
1056 self.selection.clear();
1057 }
1058
1059 pub fn filter_visibility(&mut self, snapshot_ts: u64, current_txn_id: Option<u64>) {
1064 let n = self.len();
1065 self.visibility.resize(n, false);
1066 self.selection.clear();
1067
1068 if let Some(txn_id) = current_txn_id {
1070 SimdVisibilityFilter::filter_batch_with_txn(
1071 &self.commit_ts,
1072 &self.txn_ids,
1073 snapshot_ts,
1074 txn_id,
1075 &mut self.visibility,
1076 );
1077 } else {
1078 SimdVisibilityFilter::filter_batch_into(&self.commit_ts, snapshot_ts, &mut self.visibility);
1079 }
1080
1081 for (i, &visible) in self.visibility.iter().enumerate() {
1083 if visible {
1084 self.selection.push(i);
1085 }
1086 }
1087 }
1088
1089 pub fn visible_count(&self) -> usize {
1091 self.selection.len()
1092 }
1093
1094 pub fn iter_visible(&self) -> impl Iterator<Item = (&'a [u8], Option<&'a [u8]>)> + '_ {
1098 self.selection.iter().map(move |&idx| {
1099 let key = self.keys[idx];
1100 let value = self.value_handles[idx].and_then(|h| h.materialize());
1101 (key, value)
1102 })
1103 }
1104
1105 pub fn iter_visible_full(
1107 &self,
1108 ) -> impl Iterator<Item = (&'a [u8], Option<&'a [u8]>, u64, u64)> + '_ {
1109 self.selection.iter().map(move |&idx| {
1110 let key = self.keys[idx];
1111 let value = self.value_handles[idx].and_then(|h| h.materialize());
1112 let ts = self.commit_ts[idx];
1113 let txn = self.txn_ids[idx];
1114 (key, value, ts, txn)
1115 })
1116 }
1117}
1118
/// Iterator over the visible `(key, value)` pairs produced by a [`SoaSource`].
///
/// Batches are requested from the source, filtered with `SoaBatch::filter_visibility`,
/// and the selected rows are yielded one at a time.
pub struct SoaScanIterator<'a, S>
where
    S: SoaSource<'a>,
{
    /// Producer of batches.
    source: S,
    /// Batch currently being drained; reused across fetches.
    batch: SoaBatch<'a>,
    /// Position inside `batch.selection` of the next row to yield.
    pos: usize,
    /// Snapshot timestamp used for the visibility check.
    snapshot_ts: u64,
    /// Transaction whose own writes are always visible, if any.
    current_txn_id: Option<u64>,
    // Only used to size `batch` at construction; never read afterwards.
    #[allow(dead_code)]
    batch_size: usize,
    /// Running counters, exposed through `stats()`.
    stats: SoaScanStats,
}
1154
/// Counters collected by a `SoaScanIterator`.
#[derive(Debug, Default, Clone)]
pub struct SoaScanStats {
    /// Rows received from the source.
    pub rows_scanned: usize,
    /// Rows that passed the visibility filter.
    pub rows_visible: usize,
    /// Rows yielded by the iterator.
    pub values_materialized: usize,
    /// Batches received from the source.
    pub batches_processed: usize,
}

impl SoaScanStats {
    /// Fraction of scanned rows that were visible; `0.0` when nothing was scanned.
    pub fn selectivity(&self) -> f64 {
        match self.rows_scanned {
            0 => 0.0,
            scanned => self.rows_visible as f64 / scanned as f64,
        }
    }

    /// Ratio of materialized values to visible rows; `1.0` when no row was visible.
    pub fn materialization_efficiency(&self) -> f64 {
        match self.rows_visible {
            0 => 1.0,
            visible => self.values_materialized as f64 / visible as f64,
        }
    }
}
1188
/// Producer of row batches for a `SoaScanIterator`.
pub trait SoaSource<'a> {
    /// Fills `batch` with the next rows.
    ///
    /// `SoaScanIterator` clears `batch` before each call and treats a `false` return as
    /// "no more data": it stops iterating. A `true` return is followed by visibility
    /// filtering of whatever rows were pushed.
    fn fill_batch(&mut self, batch: &mut SoaBatch<'a>) -> bool;
}
1195
1196impl<'a, S> SoaScanIterator<'a, S>
1197where
1198 S: SoaSource<'a>,
1199{
1200 pub fn new(source: S, snapshot_ts: u64, current_txn_id: Option<u64>) -> Self {
1202 Self::with_batch_size(source, snapshot_ts, current_txn_id, DEFAULT_BATCH_SIZE)
1203 }
1204
1205 pub fn with_batch_size(
1207 source: S,
1208 snapshot_ts: u64,
1209 current_txn_id: Option<u64>,
1210 batch_size: usize,
1211 ) -> Self {
1212 Self {
1213 source,
1214 batch: SoaBatch::with_capacity(batch_size),
1215 pos: 0,
1216 snapshot_ts,
1217 current_txn_id,
1218 batch_size,
1219 stats: SoaScanStats::default(),
1220 }
1221 }
1222
1223 fn fetch_batch(&mut self) -> bool {
1225 self.batch.clear();
1226 self.pos = 0;
1227
1228 if !self.source.fill_batch(&mut self.batch) {
1230 return false;
1231 }
1232
1233 self.stats.rows_scanned += self.batch.len();
1234 self.stats.batches_processed += 1;
1235
1236 self.batch.filter_visibility(self.snapshot_ts, self.current_txn_id);
1238 self.stats.rows_visible += self.batch.visible_count();
1239
1240 true
1241 }
1242
1243 pub fn stats(&self) -> &SoaScanStats {
1245 &self.stats
1246 }
1247}
1248
1249impl<'a, S> Iterator for SoaScanIterator<'a, S>
1250where
1251 S: SoaSource<'a>,
1252{
1253 type Item = (&'a [u8], Option<&'a [u8]>);
1254
1255 fn next(&mut self) -> Option<Self::Item> {
1256 loop {
1257 while self.pos >= self.batch.selection.len() {
1259 if !self.fetch_batch() {
1260 return None;
1261 }
1262 }
1263
1264 let sel_idx = self.pos;
1266 self.pos += 1;
1267 let row_idx = self.batch.selection[sel_idx];
1268
1269 let key = self.batch.keys[row_idx];
1270 let value = self.batch.value_handles[row_idx].and_then(|h| h.materialize());
1271 self.stats.values_materialized += 1;
1272
1273 return Some((key, value));
1274 }
1275 }
1276}
1277
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_column_vector_int64() {
        let v = ColumnVector::Int64(vec![1, 2, 3, 4, 5]);
        assert_eq!(v.len(), 5);
        assert_eq!(v.sum_i64(), Some(15));
    }

    #[test]
    fn test_column_vector_float64() {
        let v = ColumnVector::Float64(vec![1.0, 2.0, 3.0, 4.0]);
        assert_eq!(v.len(), 4);
        assert_eq!(v.sum_f64(), Some(10.0));
    }

    #[test]
    fn test_vector_batch() {
        let mut batch = VectorBatch::with_capacity(1024);
        batch.add_column("id", ColumnVector::Int64(vec![1, 2, 3]));
        batch.add_column("value", ColumnVector::Float64(vec![1.5, 2.5, 3.5]));

        assert_eq!(batch.row_count(), 3);
        assert_eq!(batch.column_count(), 2);
        assert!(batch.column("id").is_some());
    }

    #[test]
    fn test_int64_comparison() {
        let col = ColumnVector::Int64(vec![1, 5, 10, 15, 20]);
        let pred = Int64Comparison::gt("test", 10);
        let result = pred.evaluate(&col);

        assert_eq!(result, vec![false, false, false, true, true]);
    }

    #[test]
    fn test_simd_sum_i64_large() {
        // Long enough to take the vectorized path where it is available.
        let values: Vec<i64> = (0..1000).collect();
        let expected: i64 = (0..1000).sum();

        let col = ColumnVector::Int64(values);
        assert_eq!(col.sum_i64(), Some(expected));
    }

    #[test]
    fn test_simd_compare_gt() {
        let values: Vec<i64> = vec![1, 5, 10, 15, 20, 25, 30, 35];
        let result = simd_compare_i64_gt(&values, 12);
        assert_eq!(result, vec![false, false, false, true, true, true, true, true]);
    }

    #[test]
    fn test_vectorized_scan_config() {
        let config = VectorizedScanConfig::new()
            .with_batch_size(2048)
            .with_prefetch(true);

        assert_eq!(config.batch_size, 2048);
        assert!(config.prefetch_enabled);
    }

    #[test]
    fn test_simd_visibility_filter_basic() {
        let commit_ts = vec![0, 10, 20, 30, 40];
        let snapshot_ts = 25;

        let result = SimdVisibilityFilter::filter_batch(&commit_ts, snapshot_ts);

        // 0 is never visible; 10 and 20 are below the snapshot; 30 and 40 are not.
        assert_eq!(result, vec![false, true, true, false, false]);
    }

    #[test]
    fn test_simd_visibility_filter_with_txn() {
        let commit_ts = vec![0, 10, 0, 30, 40];
        let txn_ids = vec![1, 2, 1, 4, 5];
        let snapshot_ts = 25;
        let current_txn_id = 1;

        let mut result = vec![false; 5];
        SimdVisibilityFilter::filter_batch_with_txn(
            &commit_ts,
            &txn_ids,
            snapshot_ts,
            current_txn_id,
            &mut result,
        );

        // Rows 0 and 2 belong to the current transaction; row 1 is below the snapshot.
        assert_eq!(result, vec![true, true, true, false, false]);
    }

    #[test]
    fn test_simd_visibility_filter_large() {
        let n = 1000;
        let commit_ts: Vec<u64> = (1..=n as u64).collect();
        let snapshot_ts = 500;

        let result = SimdVisibilityFilter::filter_batch(&commit_ts, snapshot_ts);

        // Timestamps 1..=499 are strictly below the snapshot.
        let visible_count = result.iter().filter(|&&v| v).count();
        assert_eq!(visible_count, 499);
    }

    #[test]
    fn test_versioned_slice_visibility() {
        let slice = VersionedSlice {
            key: b"test",
            value: Some(b"value"),
            commit_ts: 100,
            txn_id: 1,
        };

        assert!(slice.is_visible(200, None));
        assert!(!slice.is_visible(50, None));
        // Written by the reading transaction, so visible despite the snapshot.
        assert!(slice.is_visible(50, Some(1)));
    }

    #[test]
    fn test_streaming_scan_iterator() {
        let entries: Vec<VersionedSlice<'static>> = vec![
            VersionedSlice { key: b"a", value: Some(b"1"), commit_ts: 10, txn_id: 1 },
            VersionedSlice { key: b"b", value: Some(b"2"), commit_ts: 0, txn_id: 2 },
            VersionedSlice { key: b"c", value: Some(b"3"), commit_ts: 30, txn_id: 3 },
            VersionedSlice { key: b"d", value: Some(b"4"), commit_ts: 15, txn_id: 4 },
        ];

        let iter = StreamingScanIterator::new(entries.into_iter(), 25, None);
        let visible: Vec<_> = iter.collect();

        // Only "a" (ts 10) and "d" (ts 15) are non-zero and below the snapshot.
        assert_eq!(visible.len(), 2);
        assert_eq!(visible[0].key, b"a");
        assert_eq!(visible[1].key, b"d");
    }

    #[test]
    fn test_soa_batch_basic() {
        let mut batch = SoaBatch::with_capacity(100);

        batch.push(b"key1", Some(b"value1"), 10, 1);
        batch.push(b"key2", Some(b"value2"), 20, 2);
        batch.push(b"key3", None, 30, 3);
        batch.push(b"key4", Some(b"value4"), 0, 4);

        assert_eq!(batch.len(), 4);
        assert_eq!(batch.commit_ts, vec![10, 20, 30, 0]);
        assert_eq!(batch.txn_ids, vec![1, 2, 3, 4]);
    }

    #[test]
    fn test_soa_batch_visibility_filter() {
        let mut batch = SoaBatch::with_capacity(100);

        batch.push(b"k1", Some(b"v1"), 10, 1);
        batch.push(b"k2", Some(b"v2"), 0, 2);
        batch.push(b"k3", Some(b"v3"), 20, 3);
        batch.push(b"k4", Some(b"v4"), 30, 4);
        batch.push(b"k5", Some(b"v5"), 0, 5);

        batch.filter_visibility(25, None);

        assert_eq!(batch.visibility, vec![true, false, true, false, false]);
        assert_eq!(batch.selection, vec![0, 2]);
        assert_eq!(batch.visible_count(), 2);
    }

    #[test]
    fn test_soa_batch_self_visibility() {
        let mut batch = SoaBatch::with_capacity(100);

        batch.push(b"k1", Some(b"v1"), 0, 42);
        batch.push(b"k2", Some(b"v2"), 10, 1);
        batch.push(b"k3", Some(b"v3"), 0, 99);

        batch.filter_visibility(25, Some(42));

        // Row 0 belongs to transaction 42; row 1 is below the snapshot; row 2 is neither.
        assert_eq!(batch.visibility, vec![true, true, false]);
        assert_eq!(batch.selection, vec![0, 1]);
    }

    #[test]
    fn test_soa_batch_late_materialization() {
        let mut batch = SoaBatch::with_capacity(100);

        batch.push(b"key1", Some(b"val1"), 10, 1);
        batch.push(b"key2", Some(b"val2"), 0, 2);
        batch.push(b"key3", Some(b"val3"), 15, 3);

        batch.filter_visibility(25, None);

        let visible: Vec<_> = batch.iter_visible().collect();

        assert_eq!(visible.len(), 2);
        assert_eq!(visible[0], (b"key1".as_slice(), Some(b"val1".as_slice())));
        assert_eq!(visible[1], (b"key3".as_slice(), Some(b"val3".as_slice())));
    }

    #[test]
    fn test_soa_scan_stats() {
        let mut batch = SoaBatch::with_capacity(100);

        for i in 0..10u64 {
            // The first three rows get a visible timestamp; the rest get 0.
            let ts = if i < 3 { 10 } else { 0 };
            batch.push(b"key", Some(b"val"), ts, i);
        }

        batch.filter_visibility(25, None);

        let selectivity = batch.visible_count() as f64 / batch.len() as f64;
        assert!((selectivity - 0.3).abs() < 0.01);
    }

    #[test]
    fn test_soa_batch_simd_large() {
        let mut batch = SoaBatch::with_capacity(2000);

        for i in 0..1000u64 {
            // Even rows are below the snapshot, odd rows are above it.
            let ts = if i % 2 == 0 { 10 } else { 50 };
            batch.push(b"k", Some(b"v"), ts, i);
        }

        batch.filter_visibility(25, None);

        assert_eq!(batch.visible_count(), 500);

        for (i, &idx) in batch.selection.iter().enumerate() {
            assert_eq!(idx, i * 2);
        }
    }
}