use std::sync::Arc;
use crate::core::{Error, Result, Row, Value};
use crate::storage::traits::Scanner;
use super::writer::FrozenVolume;
/// Constant side of a typed predicate, already converted to the representation
/// returned by the column accessors (`get_i64`, `get_f64`, `get_bool`).
/// Timestamp constants are carried as `Int64` nanoseconds.
#[derive(Debug, Clone, Copy)]
enum TypedTarget {
    /// Compared against `get_i64` (integer and timestamp columns).
    Int64(i64),
    /// Compared against `get_f64` (float columns).
    Float64(f64),
    /// Compared against `get_bool` (boolean columns).
    Bool(bool),
}
/// One `column <op> constant` comparison that can be checked directly against
/// raw column values before a row is materialized.
struct ColumnPredicate {
    /// Constant to compare against, in the column's accessor representation.
    target: TypedTarget,
    /// Comparison operator; `Eq`, `Ne`, `Gt`, `Gte`, `Lt` and `Lte` are evaluated.
    op: crate::core::Operator,
    /// Volume column index of the compared column.
    col_idx: usize,
}
/// Decoded columns of a single row group of a compressed volume.
struct GroupColumnCache {
    /// Row group held by this cache. `next` compares it with the group of the
    /// next candidate row to decide whether a reload is needed, so the field
    /// is live and needs no `dead_code` allowance.
    group_idx: usize,
    /// Decoded column per volume column index; `None` for columns that were
    /// not decoded for this group.
    columns: Vec<Option<super::column::ColumnData>>,
    /// Global row index of the group's first row.
    group_start: usize,
}
impl GroupColumnCache {
    /// Returns the decoded column `col_idx` together with the position of the
    /// global row `global_idx` inside this group.
    ///
    /// Returns `None` when that column was not decoded for this group.
    /// Panics if `col_idx` is outside the cached column vector.
    #[inline(always)]
    fn col_and_local(
        &self,
        col_idx: usize,
        global_idx: usize,
    ) -> Option<(&super::column::ColumnData, usize)> {
        let decoded = self.columns[col_idx].as_ref()?;
        let local = global_idx - self.group_start;
        Some((decoded, local))
    }
}
/// Row scanner over a frozen volume, with optional row range, column
/// projection, filter expression, pre-filters and row-hiding skip sets.
pub struct VolumeScanner {
// The volume being scanned.
volume: Arc<FrozenVolume>,
// Volume column indices emitted, in output order.
project_cols: Vec<usize>,
// True when `project_cols` is exactly `0..column_count`, so rows need no re-projection.
is_full_projection: bool,
// Next row index examined by the sequential scan path.
current_idx: usize,
// Exclusive upper bound of the scanned row range.
end_idx: usize,
// Last row produced by `next`.
current_row: Row,
// Row id (from `meta.row_ids`) of `current_row`.
current_rid: i64,
// Set when `next` produced a row; cleared on exhaustion, take and close.
has_current: bool,
// Column mapping used when reading rows; only stored when not an identity mapping.
column_mapping: Option<super::writer::ColumnMapping>,
// First decoding error; once set, `next` returns false.
error: Option<Error>,
// Full filter evaluated on every materialized row.
filter: Option<Box<dyn crate::storage::expression::Expression>>,
// (column index, dictionary id) pairs a row must match: non-NULL and equal id.
dict_filters: Vec<(usize, u32)>,
// Precomputed candidate row indices; when Some, `next` visits only these.
matching_indices: Option<Vec<usize>>,
// Position of the next unvisited entry in `matching_indices`.
match_idx: usize,
// 64 rows per word; a cleared bit hides the row (rows past the end count as visible).
visibility_bitmap: Option<Arc<Vec<u64>>>,
// Row ids hidden unconditionally.
pending_cold_deletes: Option<Arc<rustc_hash::FxHashSet<i64>>>,
// Row id -> tombstone commit sequence; hides the row when no snapshot is set
// or the commit sequence is <= `snapshot_seq`.
committed_tombstones: Option<Arc<rustc_hash::FxHashMap<i64, u64>>>,
// Snapshot sequence bounding which committed tombstones apply.
pub snapshot_seq: Option<u64>,
// Comparisons checked on raw column values before materialization.
typed_predicates: Vec<ColumnPredicate>,
// Mask of column indices to decode; None means decode every column.
needed_cols: Option<Vec<bool>>,
// Per row group: true when zone maps show no row can satisfy a comparison.
row_group_skips: Option<Vec<bool>>,
// Decoded columns of the row group currently being scanned.
group_cache: Option<GroupColumnCache>,
// Row index at which the sequential path re-checks skips and reloads the cache.
next_group_boundary: usize,
}
impl VolumeScanner {
/// Returns `true` when `project_cols` is exactly the identity projection
/// `0, 1, ..., num_cols - 1`, i.e. rows can be emitted without re-projection.
#[inline]
fn compute_is_full_projection(project_cols: &[usize], num_cols: usize) -> bool {
    // `Iterator::eq` checks both the length and every position.
    project_cols.iter().copied().eq(0..num_cols)
}
/// Creates a scanner over every row of `volume`.
///
/// An empty `project_cols` projects all columns. This is exactly
/// `with_range(volume, project_cols, 0, row_count, None)`.
/// `_delete_vector` is ignored.
pub fn new(
    volume: Arc<FrozenVolume>,
    project_cols: Vec<usize>,
    _delete_vector: Option<()>,
) -> Self {
    let row_count = volume.meta.row_count;
    Self::with_range(volume, project_cols, 0, row_count, None)
}
/// Creates a scanner over the row range `[start_idx, end_idx)` of `volume`.
///
/// An empty `project_cols` projects every column in volume order.
///
/// The range is clamped: `end_idx` to the volume's row count, and
/// `start_idx` to the clamped end. An oversized or inverted range therefore
/// yields fewer (or no) rows. Without the clamp, `end_idx > row_count` would
/// index past `meta.row_ids`, and `start_idx > end_idx` would underflow
/// range arithmetic.
///
/// Marks the volume as accessed. `_delete_vector` is ignored.
pub fn with_range(
    volume: Arc<FrozenVolume>,
    project_cols: Vec<usize>,
    start_idx: usize,
    end_idx: usize,
    _delete_vector: Option<()>,
) -> Self {
    let num_cols = volume.columns.len();
    let project: Vec<usize> = if project_cols.is_empty() {
        (0..num_cols).collect()
    } else {
        project_cols
    };
    let is_full_projection = Self::compute_is_full_projection(&project, num_cols);
    let end_idx = end_idx.min(volume.meta.row_count);
    let start_idx = start_idx.min(end_idx);
    volume.mark_accessed();
    // A partial projection over a group-cached volume only needs the
    // projected columns decoded per row group; out-of-range indices are ignored.
    let needed_cols = (!is_full_projection && volume.columns.should_use_group_cache()).then(|| {
        let mut mask = vec![false; num_cols];
        for &ci in &project {
            if let Some(slot) = mask.get_mut(ci) {
                *slot = true;
            }
        }
        mask
    });
    Self {
        volume,
        project_cols: project,
        is_full_projection,
        current_idx: start_idx,
        end_idx,
        current_row: Row::new(),
        current_rid: 0,
        has_current: false,
        error: None,
        filter: None,
        column_mapping: None,
        dict_filters: Vec::new(),
        matching_indices: None,
        match_idx: 0,
        visibility_bitmap: None,
        pending_cold_deletes: None,
        committed_tombstones: None,
        snapshot_seq: None,
        typed_predicates: Vec::new(),
        needed_cols,
        row_group_skips: None,
        group_cache: None,
        next_group_boundary: 0,
    }
}
/// Installs the set of row ids to hide unconditionally, replacing any
/// previously installed set.
pub fn set_pending_cold_deletes(&mut self, pending: Arc<rustc_hash::FxHashSet<i64>>) {
    let installed = Some(pending);
    self.pending_cold_deletes = installed;
}
/// Installs both row-hiding sets at once.
///
/// `committed` maps row ids to the commit sequence of their tombstone; those
/// rows are hidden subject to `snapshot_seq`. `dynamic` lists row ids hidden
/// unconditionally. Both replace any previously installed sets.
pub fn set_skip_sets(
    &mut self,
    committed: Arc<rustc_hash::FxHashMap<i64, u64>>,
    dynamic: Arc<rustc_hash::FxHashSet<i64>>,
) {
    self.committed_tombstones = Some(committed);
    self.set_pending_cold_deletes(dynamic);
}
/// Sets (or with `None`, clears) the per-row visibility bitmap.
///
/// Bit `i % 64` of word `i / 64` being zero hides row `i`.
pub fn set_visibility_bitmap(&mut self, bitmap: Option<Arc<Vec<u64>>>) {
    self.visibility_bitmap = bitmap;
}
/// Returns a scanner that yields no rows.
///
/// It owns a freshly built volume with no rows and no columns. Because
/// `end_idx` is 0 and there are no candidates, `next` returns `false`
/// immediately.
pub fn empty() -> Self {
    let meta = super::writer::VolumeMeta {
        zone_maps: Vec::new(),
        bloom_filters: Vec::new(),
        stats: super::stats::VolumeAggregateStats::new(0),
        row_count: 0,
        column_names: Vec::new(),
        column_types: Vec::new(),
        row_ids: Vec::new(),
        sorted_columns: Vec::new(),
        column_name_map: ahash::AHashMap::new(),
        row_groups: Vec::new(),
    };
    let volume = FrozenVolume {
        columns: super::writer::LazyColumns::empty(),
        meta: Arc::new(meta),
        unique_indices: std::sync::Arc::new(parking_lot::RwLock::new(
            rustc_hash::FxHashMap::default(),
        )),
        last_access_epoch: std::sync::atomic::AtomicU64::new(0),
    };
    Self {
        volume: Arc::new(volume),
        project_cols: Vec::new(),
        is_full_projection: true,
        current_idx: 0,
        end_idx: 0,
        current_row: Row::new(),
        current_rid: 0,
        has_current: false,
        error: None,
        filter: None,
        column_mapping: None,
        dict_filters: Vec::new(),
        matching_indices: None,
        match_idx: 0,
        visibility_bitmap: None,
        pending_cold_deletes: None,
        committed_tombstones: None,
        snapshot_seq: None,
        typed_predicates: Vec::new(),
        needed_cols: None,
        row_group_skips: None,
        group_cache: None,
        next_group_boundary: 0,
    }
}
/// Installs `filter` and derives cheaper pre-filters from its comparisons.
///
/// Derived state; the full `filter` is still evaluated on every row that
/// reaches `materialize_row`:
/// * `dict_filters` — `column = 'text'` comparisons resolved to dictionary
///   ids. If `dict_lookup` finds no id for the text, the scan is emptied
///   immediately (`current_idx = end_idx`).
/// * `matching_indices` — when dictionary filters exist, candidate rows are
///   collected up-front. They are kept only if there are at most
///   `(end_idx - current_idx) / 10` of them; otherwise this stays `None` and
///   the sequential path is used.
/// * `typed_predicates` — comparisons against integer, float, boolean and
///   timestamp columns, checked on raw column values.
/// * `needed_cols` — columns read by the filter or the projection, when the
///   filter can report its columns; otherwise `None` (decode everything).
/// * `row_group_skips` — row groups whose zone maps rule out a comparison.
///
/// NOTE(review): every collected comparison is treated as a condition that
/// each output row must satisfy. This assumes `collect_comparisons` only
/// returns AND-ed terms — confirm.
///
/// A corrupt compressed block met while pre-scanning sets `self.error` and
/// empties the scan.
pub fn set_filter(&mut self, filter: Box<dyn crate::storage::expression::Expression>) {
    let comparisons = filter.collect_comparisons();
    // Per-group decompression is only used for group-cached volumes.
    let store = if self.volume.columns.should_use_group_cache() {
        self.volume.columns.compressed_store()
    } else {
        None
    };
    // 1. Resolve text equalities to dictionary ids.
    for &(col_name, op, value) in &comparisons {
        if op != crate::core::Operator::Eq {
            continue;
        }
        if let Value::Text(s) = value {
            if let Some(col_idx) = self.volume.column_index(col_name) {
                let dict_id = if let Some(st) = store {
                    st.dict_lookup(col_idx, s.as_str())
                } else {
                    self.volume.columns[col_idx].dict_lookup(s.as_str())
                };
                if let Some(id) = dict_id {
                    self.dict_filters.push((col_idx, id));
                } else {
                    // No dictionary id for this text: the scan is emptied.
                    self.current_idx = self.end_idx;
                    self.filter = Some(filter);
                    return;
                }
            }
        }
    }
    // 2. Collect candidate rows up-front when the dictionary filters are selective.
    if !self.dict_filters.is_empty() {
        // saturating_sub: an inverted range (current_idx > end_idx) has nothing
        // to scan and must not underflow.
        let scan_range = self.end_idx.saturating_sub(self.current_idx);
        // Candidates are kept only while they number at most 10% of the range.
        let selectivity_cap = scan_range / 10;
        let matches = if let Some(st) = store {
            let mut m = Vec::new();
            let first_col = self.dict_filters[0].0;
            let num_grp = st.num_groups(first_col);
            let mut exceeded = false;
            for gi in 0..num_grp {
                let gs = gi * super::column::ROW_GROUP_SIZE;
                let ge = ((gi + 1) * super::column::ROW_GROUP_SIZE).min(self.end_idx);
                // Skip groups entirely outside [current_idx, end_idx).
                if gs >= self.end_idx || ge <= self.current_idx {
                    continue;
                }
                // Decode only the dictionary-filtered columns of this group.
                let mut group_cols = Vec::with_capacity(self.dict_filters.len());
                let mut corrupt = false;
                for &(ci, _) in &self.dict_filters {
                    match st.decompress_single_group(ci, gi) {
                        Ok(col) => group_cols.push(col),
                        Err(_) => {
                            corrupt = true;
                            break;
                        }
                    }
                }
                if corrupt {
                    self.current_idx = self.end_idx;
                    self.error =
                        Some(Error::internal("corrupt V4 block during dictionary filter"));
                    self.filter = Some(filter);
                    return;
                }
                for i in gs.max(self.current_idx)..ge {
                    let local = i - gs;
                    let ok = self.dict_filters.iter().zip(group_cols.iter()).all(
                        |(&(_, eid), col)| !col.is_null(local) && col.get_dict_id(local) == eid,
                    );
                    if ok {
                        m.push(i);
                    }
                }
                if m.len() > selectivity_cap {
                    exceeded = true;
                    break;
                }
            }
            if exceeded {
                None
            } else {
                Some(m)
            }
        } else {
            let mut m = Vec::new();
            for i in self.current_idx..self.end_idx {
                let ok = self.dict_filters.iter().all(|&(ci, eid)| {
                    !self.volume.columns[ci].is_null(i)
                        && self.volume.columns[ci].get_dict_id(i) == eid
                });
                if ok {
                    m.push(i);
                }
                if m.len() > selectivity_cap {
                    break;
                }
            }
            if m.len() > selectivity_cap {
                None
            } else {
                Some(m)
            }
        };
        self.matching_indices = matches;
    }
    // 3. Typed predicates on raw column values.
    for &(col_name, op, value) in &comparisons {
        if !matches!(
            op,
            crate::core::Operator::Eq
                | crate::core::Operator::Ne
                | crate::core::Operator::Gt
                | crate::core::Operator::Gte
                | crate::core::Operator::Lt
                | crate::core::Operator::Lte
        ) {
            continue;
        }
        let col_idx = match self.volume.column_index(col_name) {
            Some(idx) => idx,
            None => continue,
        };
        let col_dt = self.volume.columns.data_type(col_idx);
        let target = match (value, col_dt) {
            (Value::Integer(v), crate::core::DataType::Integer) => TypedTarget::Int64(*v),
            (Value::Float(v), crate::core::DataType::Float) => TypedTarget::Float64(*v),
            (Value::Boolean(v), crate::core::DataType::Boolean) => TypedTarget::Bool(*v),
            // Timestamps compare as nanoseconds; saturate when out of i64 range.
            (Value::Timestamp(dt), crate::core::DataType::Timestamp) => {
                TypedTarget::Int64(dt.timestamp_nanos_opt().unwrap_or_else(|| {
                    dt.timestamp()
                        .saturating_mul(1_000_000_000)
                        .saturating_add(dt.timestamp_subsec_nanos() as i64)
                }))
            }
            (Value::Integer(v), crate::core::DataType::Float) => {
                TypedTarget::Float64(*v as f64)
            }
            _ => continue,
        };
        self.typed_predicates.push(ColumnPredicate {
            col_idx,
            op,
            target,
        });
    }
    // 4. Columns to decode: everything the filter reads plus the projection.
    let mut filter_cols = Vec::new();
    if filter.collect_column_indices(&mut filter_cols) {
        let num_cols = self.volume.columns.len();
        let mask_len = if let Some(ref m) = self.column_mapping {
            m.sources.len().max(num_cols)
        } else {
            num_cols
        };
        let mut mask = vec![false; mask_len];
        for &ci in &filter_cols {
            if ci < mask_len {
                mask[ci] = true;
            }
        }
        for &ci in &self.project_cols {
            if ci < mask_len {
                mask[ci] = true;
            }
        }
        self.needed_cols = Some(mask);
    } else {
        self.needed_cols = None;
    }
    // 5. Zone-map pruning: a group is skipped if any comparison is impossible in it.
    if !self.volume.meta.row_groups.is_empty() && !comparisons.is_empty() {
        let skips: Vec<bool> = self
            .volume
            .meta
            .row_groups
            .iter()
            .map(|rg| {
                for &(col_name, op, value) in &comparisons {
                    let col_idx = match self.volume.column_index(col_name) {
                        Some(idx) if idx < rg.zone_maps.len() => idx,
                        _ => continue,
                    };
                    let zm = &rg.zone_maps[col_idx];
                    let dominated = match op {
                        crate::core::Operator::Eq => !zm.may_contain_eq(value),
                        crate::core::Operator::Gt | crate::core::Operator::Gte => {
                            !zm.may_contain_gte(value)
                        }
                        crate::core::Operator::Lt | crate::core::Operator::Lte => {
                            !zm.may_contain_lte(value)
                        }
                        _ => false,
                    };
                    if dominated {
                        return true;
                    }
                }
                false
            })
            .collect();
        if skips.iter().any(|&s| s) {
            self.row_group_skips = Some(skips);
        }
    }
    self.filter = Some(filter);
}
#[inline]
fn evaluate_typed_predicates(&self, idx: usize) -> bool {
for pred in &self.typed_predicates {
let (col, local) = self.col_and_idx(pred.col_idx, idx);
if col.is_null(local) {
continue;
}
let matches = match &pred.target {
TypedTarget::Int64(target) => {
let val = col.get_i64(local);
match pred.op {
crate::core::Operator::Eq => val == *target,
crate::core::Operator::Ne => val != *target,
crate::core::Operator::Gt => val > *target,
crate::core::Operator::Gte => val >= *target,
crate::core::Operator::Lt => val < *target,
crate::core::Operator::Lte => val <= *target,
_ => true,
}
}
TypedTarget::Float64(target) => {
let val = col.get_f64(local);
match pred.op {
crate::core::Operator::Eq => val == *target,
crate::core::Operator::Ne => val != *target,
crate::core::Operator::Gt => val > *target,
crate::core::Operator::Gte => val >= *target,
crate::core::Operator::Lt => val < *target,
crate::core::Operator::Lte => val <= *target,
_ => true,
}
}
TypedTarget::Bool(target) => {
let val = col.get_bool(local);
match pred.op {
crate::core::Operator::Eq => val == *target,
crate::core::Operator::Ne => val != *target,
_ => true,
}
}
};
if !matches {
return false;
}
}
true
}
/// Installs a column mapping used when reading rows.
///
/// Identity mappings are ignored, so the cheaper unmapped read paths stay
/// in use.
pub fn set_column_mapping(&mut self, mapping: super::writer::ColumnMapping) {
    if mapping.is_identity {
        return;
    }
    self.column_mapping = Some(mapping);
}
#[inline(always)]
fn col_and_idx(
&self,
col_idx: usize,
global_idx: usize,
) -> (&super::column::ColumnData, usize) {
if let Some(ref cache) = self.group_cache {
if let Some(pair) = cache.col_and_local(col_idx, global_idx) {
return pair;
}
}
(&self.volume.columns[col_idx], global_idx)
}
fn load_group_cache(&mut self, group_idx: usize) {
let store = match self.volume.columns.compressed_store() {
Some(s) => s,
None => {
self.group_cache = None;
return;
}
};
let col_count = self.volume.columns.len();
let group_start = group_idx * super::column::ROW_GROUP_SIZE;
let mut columns: Vec<Option<super::column::ColumnData>> = vec![None; col_count];
if let Some(ref needed) = self.needed_cols {
for (ci, &need) in needed.iter().enumerate() {
if need && ci < col_count && group_idx < store.num_groups(ci) {
match store.decompress_single_group(ci, group_idx) {
Ok(col) => columns[ci] = Some(col),
Err(e) => {
self.error = Some(Error::internal(format!("corrupt V4 block: {}", e)));
return;
}
}
}
}
} else {
for (ci, slot) in columns.iter_mut().enumerate() {
if group_idx < store.num_groups(ci) {
match store.decompress_single_group(ci, group_idx) {
Ok(col) => *slot = Some(col),
Err(e) => {
self.error = Some(Error::internal(format!("corrupt V4 block: {}", e)));
return;
}
}
}
}
}
self.group_cache = Some(GroupColumnCache {
group_idx,
columns,
group_start,
});
}
#[inline(always)]
fn should_skip_row(&self, idx: usize) -> bool {
if let Some(ref bm) = self.visibility_bitmap {
let word_idx = idx >> 6;
if word_idx < bm.len() && (bm[word_idx] >> (idx & 63)) & 1 == 0 {
return true;
}
}
let rid = self.volume.meta.row_ids[idx];
if let Some(ref ts) = self.committed_tombstones {
if let Some(&commit_seq) = ts.get(&rid) {
if self.snapshot_seq.is_none_or(|ss| commit_seq <= ss) {
return true;
}
}
}
if let Some(ref pending) = self.pending_cold_deletes {
if pending.contains(&rid) {
return true;
}
}
false
}
/// Returns `true` when row `idx` fails any dictionary filter.
///
/// A row fails when the column value is NULL or has a different dictionary
/// id.
#[inline(always)]
fn dict_filters_reject(&self, idx: usize) -> bool {
    self.dict_filters.iter().any(|&(col_idx, expected_id)| {
        let (col, local) = self.col_and_idx(col_idx, idx);
        col.is_null(local) || col.get_dict_id(local) != expected_id
    })
}
#[inline(always)]
fn materialize_row(&mut self, idx: usize) -> bool {
if self.group_cache.is_some() && self.column_mapping.is_none() {
return self.materialize_row_from_cache(idx);
}
if let Some(ref filter) = self.filter {
let full_row = match (&self.needed_cols, &self.column_mapping) {
(Some(mask), Some(mapping)) => {
self.volume.get_row_mapped_needed(idx, mapping, mask)
}
(Some(mask), None) => self.volume.get_row_needed(idx, mask),
(None, Some(mapping)) => self.volume.get_row_mapped(idx, mapping),
(None, None) => self.volume.get_row(idx),
};
if !filter.evaluate_fast(&full_row) {
return false;
}
if self.is_full_projection {
self.current_row = full_row;
} else {
self.current_row = Row::from_values(
self.project_cols
.iter()
.map(|&col| {
full_row
.get(col)
.cloned()
.unwrap_or(Value::Null(crate::core::DataType::Null))
})
.collect(),
);
}
} else if let Some(ref mapping) = self.column_mapping {
if self.is_full_projection {
self.current_row = self.volume.get_row_mapped(idx, mapping);
} else {
self.current_row =
self.volume
.get_row_mapped_projected(idx, mapping, &self.project_cols);
}
} else if self.is_full_projection {
self.current_row = self.volume.get_row(idx);
} else {
self.current_row = self.volume.get_row_projected(idx, &self.project_cols);
}
true
}
fn materialize_row_from_cache(&mut self, idx: usize) -> bool {
let col_count = self.volume.columns.len();
let full_row = if let Some(ref needed) = self.needed_cols {
let values: Vec<Value> = (0..col_count)
.map(|ci| {
if ci < needed.len() && needed[ci] {
let (col, local) = self.col_and_idx(ci, idx);
col.get_value(local)
} else {
Value::Null(self.volume.columns.data_type(ci))
}
})
.collect();
Row::from_values(values)
} else {
let values: Vec<Value> = (0..col_count)
.map(|ci| {
let (col, local) = self.col_and_idx(ci, idx);
col.get_value(local)
})
.collect();
Row::from_values(values)
};
if let Some(ref filter) = self.filter {
if !filter.evaluate_fast(&full_row) {
return false;
}
}
if self.is_full_projection {
self.current_row = full_row;
} else {
self.current_row = Row::from_values(
self.project_cols
.iter()
.map(|&col| {
full_row
.get(col)
.cloned()
.unwrap_or(Value::Null(crate::core::DataType::Null))
})
.collect(),
);
}
true
}
}
impl Scanner for VolumeScanner {
    /// Advances to the next visible row that passes every filter.
    ///
    /// There are two paths:
    /// * Candidate path — when `set_filter` precomputed `matching_indices`,
    ///   only those rows are visited. The group cache is loaded per group
    ///   when the volume uses it.
    /// * Sequential path — walks `current_idx..end_idx`. At each row-group
    ///   boundary it skips groups flagged in `row_group_skips` and reloads
    ///   the group cache.
    ///
    /// Rows are rejected, in this order, by:
    /// 1. the skip sets (`should_skip_row`);
    /// 2. dictionary filters — sequential path only, since candidates were
    ///    already matched against them;
    /// 3. typed predicates;
    /// 4. the full filter in `materialize_row`.
    ///
    /// A decoding error ends the scan and is reported by `err()`.
    fn next(&mut self) -> bool {
        if self.error.is_some() {
            self.has_current = false;
            return false;
        }
        let use_group_cache_fast = self.volume.columns.should_use_group_cache();
        if self.matching_indices.is_some() {
            loop {
                let idx = match self.matching_indices.as_ref() {
                    Some(indices) if self.match_idx < indices.len() => {
                        let i = indices[self.match_idx];
                        self.match_idx += 1;
                        i
                    }
                    _ => {
                        self.has_current = false;
                        return false;
                    }
                };
                if self.should_skip_row(idx) {
                    continue;
                }
                if use_group_cache_fast {
                    // Candidates are ascending, so a reload happens once per group.
                    let gi = idx / super::column::ROW_GROUP_SIZE;
                    let need_load = self.group_cache.as_ref().is_none_or(|c| c.group_idx != gi);
                    if need_load {
                        self.load_group_cache(gi);
                        if self.error.is_some() {
                            self.has_current = false;
                            return false;
                        }
                    }
                }
                if !self.typed_predicates.is_empty() && !self.evaluate_typed_predicates(idx) {
                    continue;
                }
                if !self.materialize_row(idx) {
                    continue;
                }
                self.current_rid = self.volume.meta.row_ids[idx];
                self.has_current = true;
                return true;
            }
        }
        let use_group_cache = self.volume.columns.should_use_group_cache();
        while self.current_idx < self.end_idx {
            if self.current_idx >= self.next_group_boundary {
                // Entering a new row group (or the first row of the scan).
                let group_idx = self.current_idx / super::column::ROW_GROUP_SIZE;
                self.next_group_boundary =
                    ((group_idx + 1) * super::column::ROW_GROUP_SIZE).min(self.end_idx);
                if let Some(ref skips) = self.row_group_skips {
                    if group_idx < skips.len() && skips[group_idx] {
                        self.current_idx = self.next_group_boundary;
                        continue;
                    }
                }
                if use_group_cache {
                    self.load_group_cache(group_idx);
                    if self.error.is_some() {
                        self.has_current = false;
                        return false;
                    }
                }
            }
            if self.should_skip_row(self.current_idx) {
                self.current_idx += 1;
                continue;
            }
            let idx = self.current_idx;
            if !self.dict_filters.is_empty() && self.dict_filters_reject(idx) {
                self.current_idx += 1;
                continue;
            }
            if !self.typed_predicates.is_empty() && !self.evaluate_typed_predicates(idx) {
                self.current_idx += 1;
                continue;
            }
            if !self.materialize_row(idx) {
                self.current_idx += 1;
                continue;
            }
            self.current_rid = self.volume.meta.row_ids[idx];
            self.has_current = true;
            self.current_idx += 1;
            return true;
        }
        self.has_current = false;
        false
    }

    /// The row produced by the last successful `next`.
    fn row(&self) -> &Row {
        &self.current_row
    }

    /// The decoding error that stopped the scan, if any.
    fn err(&self) -> Option<&Error> {
        self.error.as_ref()
    }

    /// Marks the scanner as having no current row; always succeeds.
    fn close(&mut self) -> Result<()> {
        self.has_current = false;
        Ok(())
    }

    /// Moves the current row out, leaving a default row behind.
    fn take_row(&mut self) -> Row {
        self.has_current = false;
        std::mem::take(&mut self.current_row)
    }

    /// Moves the current row out together with its row id.
    fn take_row_with_id(&mut self) -> (i64, Row) {
        let rid = self.current_rid;
        self.has_current = false;
        (rid, std::mem::take(&mut self.current_row))
    }

    /// Row id of the current row.
    fn current_row_id(&self) -> i64 {
        self.current_rid
    }

    /// Upper bound on the rows still to be produced.
    ///
    /// On the candidate path `current_idx` never advances, so the remaining
    /// candidates are counted instead of the whole range. Otherwise a
    /// consumed candidate list would still report the full range.
    fn estimated_count(&self) -> Option<usize> {
        if let Some(ref indices) = self.matching_indices {
            return Some(indices.len().saturating_sub(self.match_idx));
        }
        Some(self.end_idx.saturating_sub(self.current_idx))
    }
}
/// Scanner that drains a list of scanners one after another, in order.
pub struct MergingScanner {
    /// Index into `sources` of the scanner currently being drained.
    current_source: usize,
    /// The scanners to drain.
    sources: Vec<Box<dyn Scanner>>,
}
impl MergingScanner {
    /// Creates a scanner that yields every row of `sources[0]`, then of
    /// `sources[1]`, and so on.
    pub fn new(sources: Vec<Box<dyn Scanner>>) -> Self {
        Self {
            current_source: 0,
            sources,
        }
    }
}
impl Scanner for MergingScanner {
    /// Advances through the sources in order.
    ///
    /// Stops (returning `false`) at the first source that reports an error;
    /// that source stays current, so `err()` exposes the failure.
    fn next(&mut self) -> bool {
        while self.current_source < self.sources.len() {
            if self.sources[self.current_source].next() {
                return true;
            }
            if self.sources[self.current_source].err().is_some() {
                return false;
            }
            self.current_source += 1;
        }
        false
    }

    fn row(&self) -> &Row {
        debug_assert!(
            self.current_source < self.sources.len(),
            "row() called after iteration completed"
        );
        self.sources[self.current_source].row()
    }

    /// Row id from the current source, or 0 once all sources are exhausted.
    fn current_row_id(&self) -> i64 {
        if self.current_source < self.sources.len() {
            self.sources[self.current_source].current_row_id()
        } else {
            0
        }
    }

    /// Error of the current source; `None` once all sources are exhausted.
    fn err(&self) -> Option<&Error> {
        if self.current_source < self.sources.len() {
            self.sources[self.current_source].err()
        } else {
            None
        }
    }

    /// Closes every source, even when an earlier one fails, so no source is
    /// left open. Returns the first error encountered.
    fn close(&mut self) -> Result<()> {
        let mut first_err = None;
        for source in &mut self.sources {
            if let Err(e) = source.close() {
                if first_err.is_none() {
                    first_err = Some(e);
                }
            }
        }
        match first_err {
            Some(e) => Err(e),
            None => Ok(()),
        }
    }

    fn take_row(&mut self) -> Row {
        debug_assert!(
            self.current_source < self.sources.len(),
            "take_row() called after iteration completed"
        );
        self.sources[self.current_source].take_row()
    }

    fn take_row_with_id(&mut self) -> (i64, Row) {
        debug_assert!(
            self.current_source < self.sources.len(),
            "take_row_with_id() called after iteration completed"
        );
        self.sources[self.current_source].take_row_with_id()
    }

    /// Sum of the sources' estimates (saturating on overflow).
    ///
    /// Returns `None` if any source cannot estimate.
    fn estimated_count(&self) -> Option<usize> {
        self.sources.iter().try_fold(0usize, |total, source| {
            source
                .estimated_count()
                .map(|count| total.saturating_add(count))
        })
    }
}
/// Scanner that replays an in-memory list of `(row_id, row)` pairs.
pub struct RowVecScanner {
    /// Number of rows yielded so far; the current row is `rows[index - 1]`.
    index: usize,
    /// The rows to replay, in order.
    rows: crate::core::RowVec,
    /// Returned by `row()` when there is no current row.
    empty_row: Row,
}
impl RowVecScanner {
    /// Creates a scanner positioned before the first of `rows`.
    pub fn new(rows: crate::core::RowVec) -> Self {
        let empty_row = Row::new();
        Self {
            index: 0,
            empty_row,
            rows,
        }
    }
}
impl Scanner for RowVecScanner {
    /// Moves to the next stored row; `false` once all rows were yielded.
    fn next(&mut self) -> bool {
        let more = self.index < self.rows.len();
        if more {
            self.index += 1;
        }
        more
    }

    /// Current row, or an empty row before the first `next`.
    fn row(&self) -> &Row {
        match self.index.checked_sub(1) {
            Some(pos) if pos < self.rows.len() => &self.rows[pos].1,
            _ => &self.empty_row,
        }
    }

    /// Row id of the current row, or 0 when there is none.
    fn current_row_id(&self) -> i64 {
        match self.index.checked_sub(1) {
            Some(pos) if pos < self.rows.len() => self.rows[pos].0,
            _ => 0,
        }
    }

    /// Returns a copy of the current row; the stored row is left intact.
    fn take_row(&mut self) -> Row {
        Row::clone(self.row())
    }

    /// Returns the current row id and a copy of the row, or `(0, Row::new())`
    /// when there is no current row.
    fn take_row_with_id(&mut self) -> (i64, Row) {
        match self.index.checked_sub(1) {
            Some(pos) if pos < self.rows.len() => {
                let entry = &self.rows[pos];
                (entry.0, entry.1.clone())
            }
            _ => (0, Row::new()),
        }
    }

    /// In-memory rows never fail.
    fn err(&self) -> Option<&Error> {
        None
    }

    fn close(&mut self) -> Result<()> {
        Ok(())
    }

    /// Total number of stored rows (not only the remaining ones).
    fn estimated_count(&self) -> Option<usize> {
        Some(self.rows.len())
    }
}
#[cfg(test)]
mod tests {
    use super::super::writer::VolumeBuilder;
    use super::*;
    use crate::core::{DataType, SchemaBuilder};

    /// Five-row `(id, name, price)` volume whose row ids equal the `id` column.
    fn make_test_volume() -> Arc<FrozenVolume> {
        let schema = SchemaBuilder::new("test")
            .column("id", DataType::Integer, false, true)
            .column("name", DataType::Text, false, false)
            .column("price", DataType::Float, false, false)
            .build();
        let mut builder = VolumeBuilder::with_capacity(&schema, 5);
        let mut push = |row_id, id: i64, name: &'static str, price: f64| {
            let row = Row::from_values(vec![
                Value::Integer(id),
                Value::text(name),
                Value::Float(price),
            ]);
            builder.add_row(row_id, &row);
        };
        push(1, 1, "apple", 1.50);
        push(2, 2, "banana", 0.75);
        push(3, 3, "cherry", 3.00);
        push(4, 4, "date", 5.00);
        push(5, 5, "elderberry", 8.00);
        Arc::new(builder.finish())
    }

    #[test]
    fn test_full_scan() {
        let mut scanner = VolumeScanner::new(make_test_volume(), vec![], None);
        let mut seen = 0;
        while scanner.next() {
            assert_eq!(scanner.row().len(), 3);
            seen += 1;
        }
        assert_eq!(seen, 5);
        assert!(scanner.err().is_none());
    }

    #[test]
    fn test_projected_scan() {
        let mut scanner = VolumeScanner::new(make_test_volume(), vec![1, 2], None);
        assert!(scanner.next());
        let first = scanner.row();
        assert_eq!(first.len(), 2);
        assert_eq!(first.get(0), Some(&Value::text("apple")));
        assert_eq!(first.get(1), Some(&Value::Float(1.50)));
    }

    #[test]
    fn test_range_scan() {
        let vol = make_test_volume();
        let mut scanner = VolumeScanner::with_range(Arc::clone(&vol), vec![], 2, 4, None);
        let mut seen = 0;
        let mut ids = Vec::new();
        while scanner.next() {
            seen += 1;
            if let Some(Value::Integer(id)) = scanner.row().get(0) {
                ids.push(*id);
            }
        }
        assert_eq!(seen, 2);
        // Row indices 2 and 3 hold ids 3 and 4.
        assert_eq!(ids, vec![3, 4]);
    }

    #[test]
    fn test_empty_scanner() {
        let mut scanner = VolumeScanner::empty();
        assert!(!scanner.next());
        assert!(scanner.err().is_none());
    }

    #[test]
    fn test_take_row() {
        let mut scanner = VolumeScanner::new(make_test_volume(), vec![0], None);
        assert!(scanner.next());
        let taken = scanner.take_row();
        assert_eq!(taken.get(0), Some(&Value::Integer(1)));
    }

    #[test]
    fn test_merging_scanner() {
        let vol = make_test_volume();
        // Index ranges [0, 2) and [3, 5) leave out index 2 (id 3).
        let sources: Vec<Box<dyn Scanner>> = vec![
            Box::new(VolumeScanner::with_range(Arc::clone(&vol), vec![0], 0, 2, None)),
            Box::new(VolumeScanner::with_range(Arc::clone(&vol), vec![0], 3, 5, None)),
        ];
        let mut merger = MergingScanner::new(sources);
        let mut ids = Vec::new();
        while merger.next() {
            if let Some(Value::Integer(id)) = merger.row().get(0) {
                ids.push(*id);
            }
        }
        assert_eq!(ids, vec![1, 2, 4, 5]);
        assert!(merger.err().is_none());
    }

    #[test]
    fn test_estimated_count() {
        let vol = make_test_volume();
        let whole = VolumeScanner::new(Arc::clone(&vol), vec![], None);
        assert_eq!(whole.estimated_count(), Some(5));
        let ranged = VolumeScanner::with_range(vol, vec![], 2, 4, None);
        assert_eq!(ranged.estimated_count(), Some(2));
    }
}