use crate::core::{Result, Row, Value};
use crate::parser::ast::Expression;
use crate::storage::traits::QueryResult;
use rustc_hash::{FxHashMap, FxHasher};
use super::expression::RowFilter;
/// Result of a statement that yields no rows; it only reports an
/// affected-row count and a last-insert id.
pub struct ExecResult {
    /// Value reported by `rows_affected()`.
    affected: i64,
    /// Value reported by `last_insert_id()`.
    insert_id: i64,
    /// Column list handed out by `columns()`; constructed empty.
    columns: Vec<String>,
    /// Row handed out by `row()`; constructed empty.
    empty_row: Row,
}

impl ExecResult {
    /// Creates a result reporting the given affected-row count and insert id.
    pub fn new(rows_affected: i64, last_insert_id: i64) -> Self {
        let columns = Vec::new();
        let empty_row = Row::new();
        Self {
            affected: rows_affected,
            insert_id: last_insert_id,
            columns,
            empty_row,
        }
    }

    /// Creates a result with both counters set to zero.
    pub fn empty() -> Self {
        Self::with_rows_affected(0)
    }

    /// Creates a result with the given affected-row count and an insert id of zero.
    pub fn with_rows_affected(rows_affected: i64) -> Self {
        Self::with_last_insert_id(rows_affected, 0)
    }

    /// Creates a result with both counters given explicitly (same as [`ExecResult::new`]).
    pub fn with_last_insert_id(rows_affected: i64, last_insert_id: i64) -> Self {
        Self::new(rows_affected, last_insert_id)
    }
}
impl QueryResult for ExecResult {
fn columns(&self) -> &[String] {
&self.columns
}
fn next(&mut self) -> bool {
false
}
fn scan(&self, _dest: &mut [Value]) -> Result<()> {
Err(crate::core::Error::internal("scan() called on exec result"))
}
fn row(&self) -> &Row {
&self.empty_row
}
fn close(&mut self) -> Result<()> {
Ok(())
}
fn rows_affected(&self) -> i64 {
self.affected
}
fn last_insert_id(&self) -> i64 {
self.insert_id
}
fn with_aliases(self: Box<Self>, _aliases: FxHashMap<String, String>) -> Box<dyn QueryResult> {
self
}
}
/// Fully materialized result set that is iterated by index.
pub struct ExecutorMemoryResult {
    /// Column names reported by `columns()`.
    columns: Vec<String>,
    /// All rows of the result.
    rows: Vec<Row>,
    /// Index of the current row; `None` until the first successful `next()`.
    current_index: Option<usize>,
    /// Set by `close()`; once set, `next()` always returns `false`.
    closed: bool,
    /// Value reported by `rows_affected()`.
    affected: i64,
    /// Value reported by `last_insert_id()`.
    insert_id: i64,
}

impl ExecutorMemoryResult {
    /// Creates a result over `rows`, positioned before the first row,
    /// with both counters at zero.
    pub fn new(columns: Vec<String>, rows: Vec<Row>) -> Self {
        Self {
            current_index: None,
            closed: false,
            affected: 0,
            insert_id: 0,
            columns,
            rows,
        }
    }

    /// Creates a result with no columns and no rows.
    pub fn empty() -> Self {
        Self::with_columns(Vec::new())
    }

    /// Creates a result with the given columns and no rows.
    pub fn with_columns(columns: Vec<String>) -> Self {
        Self::new(columns, Vec::new())
    }

    /// Same as [`ExecutorMemoryResult::new`]; the schema argument is not used.
    pub fn with_schema(columns: Vec<String>, rows: Vec<Row>, _schema: crate::core::Schema) -> Self {
        Self::new(columns, rows)
    }

    /// Appends a row at the end of the result.
    pub fn add_row(&mut self, row: Row) {
        self.rows.push(row);
    }

    /// Number of rows currently stored.
    pub fn row_count(&self) -> usize {
        self.rows.len()
    }

    /// Borrows all stored rows.
    pub fn rows(&self) -> &[Row] {
        self.rows.as_slice()
    }

    /// Consumes the result and returns its rows.
    pub fn into_rows(self) -> Vec<Row> {
        let Self { rows, .. } = self;
        rows
    }

    /// Moves the cursor back to before the first row. The `closed` flag is
    /// left untouched.
    pub fn reset(&mut self) {
        self.current_index = None;
    }

    /// Overrides the value reported by `rows_affected()`.
    pub fn set_rows_affected(&mut self, count: i64) {
        self.affected = count;
    }

    /// Overrides the value reported by `last_insert_id()`.
    pub fn set_last_insert_id(&mut self, id: i64) {
        self.insert_id = id;
    }
}
impl QueryResult for ExecutorMemoryResult {
fn columns(&self) -> &[String] {
&self.columns
}
fn next(&mut self) -> bool {
if self.closed {
return false;
}
let next_index = match self.current_index {
None => 0,
Some(i) => i + 1,
};
if next_index < self.rows.len() {
self.current_index = Some(next_index);
true
} else {
false
}
}
fn scan(&self, dest: &mut [Value]) -> Result<()> {
let row = self.row();
if dest.len() != row.len() {
return Err(crate::core::Error::internal(format!(
"scan destination has {} values but row has {} columns",
dest.len(),
row.len()
)));
}
for (i, value) in row.iter().enumerate() {
dest[i] = value.clone();
}
Ok(())
}
fn row(&self) -> &Row {
match self.current_index {
Some(i) if i < self.rows.len() => &self.rows[i],
_ => panic!("row() called without successful next()"),
}
}
fn take_row(&mut self) -> Row {
match self.current_index {
Some(i) if i < self.rows.len() => {
std::mem::take(&mut self.rows[i])
}
_ => panic!("take_row() called without successful next()"),
}
}
fn close(&mut self) -> Result<()> {
self.closed = true;
Ok(())
}
fn rows_affected(&self) -> i64 {
self.affected
}
fn last_insert_id(&self) -> i64 {
self.insert_id
}
fn with_aliases(
mut self: Box<Self>,
aliases: FxHashMap<String, String>,
) -> Box<dyn QueryResult> {
for col in &mut self.columns {
for (alias, original) in &aliases {
if col == original {
*col = alias.clone();
break;
}
}
}
self
}
}
/// Wraps another result and yields only the rows accepted by a predicate.
pub struct FilteredResult {
    /// Source of candidate rows.
    inner: Box<dyn QueryResult>,
    /// Returns `true` for rows that should be yielded.
    predicate: Box<dyn Fn(&Row) -> bool + Send + Sync>,
    /// Row taken from `inner` by the last successful `next()`.
    current_row: Option<Row>,
    /// Copy of the inner column names, captured at construction.
    columns: Vec<String>,
}

impl FilteredResult {
    /// Wraps `inner`, snapshotting its column names.
    pub fn new(
        inner: Box<dyn QueryResult>,
        predicate: Box<dyn Fn(&Row) -> bool + Send + Sync>,
    ) -> Self {
        let columns: Vec<String> = inner.columns().to_vec();
        Self {
            current_row: None,
            columns,
            inner,
            predicate,
        }
    }
}
impl QueryResult for FilteredResult {
fn columns(&self) -> &[String] {
&self.columns
}
fn next(&mut self) -> bool {
while self.inner.next() {
let row = self.inner.row();
if (self.predicate)(row) {
self.current_row = Some(self.inner.take_row());
return true;
}
}
self.current_row = None;
false
}
fn scan(&self, dest: &mut [Value]) -> Result<()> {
if let Some(ref row) = self.current_row {
if dest.len() != row.len() {
return Err(crate::core::Error::internal(format!(
"scan destination has {} values but row has {} columns",
dest.len(),
row.len()
)));
}
for (i, value) in row.iter().enumerate() {
dest[i] = value.clone();
}
Ok(())
} else {
Err(crate::core::Error::internal(
"scan() called without successful next()",
))
}
}
fn row(&self) -> &Row {
self.current_row
.as_ref()
.expect("row() called without successful next()")
}
fn take_row(&mut self) -> Row {
self.current_row
.take()
.expect("take_row() called without successful next()")
}
fn close(&mut self) -> Result<()> {
self.inner.close()
}
fn rows_affected(&self) -> i64 {
self.inner.rows_affected()
}
fn last_insert_id(&self) -> i64 {
self.inner.last_insert_id()
}
fn with_aliases(self: Box<Self>, aliases: FxHashMap<String, String>) -> Box<dyn QueryResult> {
Box::new(AliasedResult::new(self, aliases))
}
}
/// Wraps another result and transforms every row through a mapping closure.
pub struct MappedResult {
    /// Source of input rows.
    inner: Box<dyn QueryResult>,
    /// Produces the output row from an input row.
    mapper: Box<dyn Fn(&Row) -> Row + Send + Sync>,
    /// Output of the mapper for the most recent successful `next()`.
    current_row: Row,
    /// Column names reported by `columns()`, supplied by the caller.
    output_columns: Vec<String>,
}

impl MappedResult {
    /// Wraps `inner`; `output_columns` names the columns of the mapped rows.
    pub fn new(
        inner: Box<dyn QueryResult>,
        output_columns: Vec<String>,
        mapper: Box<dyn Fn(&Row) -> Row + Send + Sync>,
    ) -> Self {
        let current_row = Row::new();
        Self {
            inner,
            mapper,
            current_row,
            output_columns,
        }
    }
}
impl QueryResult for MappedResult {
    /// Output column names supplied at construction.
    fn columns(&self) -> &[String] {
        &self.output_columns
    }

    /// Advances the inner result and stores the mapped row. When the inner
    /// result is exhausted the previously stored row is left as it was.
    fn next(&mut self) -> bool {
        if !self.inner.next() {
            return false;
        }
        let mapped = (self.mapper)(self.inner.row());
        self.current_row = mapped;
        true
    }

    /// Clones the stored row's values into `dest`; errors on a length mismatch.
    fn scan(&self, dest: &mut [Value]) -> Result<()> {
        let row = &self.current_row;
        if dest.len() != row.len() {
            return Err(crate::core::Error::internal(format!(
                "scan destination has {} values but row has {} columns",
                dest.len(),
                row.len()
            )));
        }
        for (slot, value) in dest.iter_mut().zip(row.iter()) {
            *slot = value.clone();
        }
        Ok(())
    }

    /// Borrows the stored row.
    fn row(&self) -> &Row {
        &self.current_row
    }

    /// Moves the stored row out, leaving a default row behind.
    fn take_row(&mut self) -> Row {
        std::mem::take(&mut self.current_row)
    }

    /// Closes the inner result.
    fn close(&mut self) -> Result<()> {
        self.inner.close()
    }

    /// Always zero; not forwarded from the inner result.
    fn rows_affected(&self) -> i64 {
        0
    }

    /// Always zero; not forwarded from the inner result.
    fn last_insert_id(&self) -> i64 {
        0
    }

    /// Wraps this result in an [`AliasedResult`].
    fn with_aliases(self: Box<Self>, aliases: FxHashMap<String, String>) -> Box<dyn QueryResult> {
        let aliased = AliasedResult::new(self, aliases);
        Box::new(aliased)
    }
}
/// Wraps another result and yields only the rows matched by a compiled
/// [`RowFilter`].
pub struct ExprFilteredResult {
    /// Source of candidate rows.
    inner: Box<dyn QueryResult>,
    /// Filter compiled from the expression against the inner column names.
    filter: RowFilter,
    /// Row taken from `inner` by the last successful `next()`.
    current_row: Option<Row>,
    /// Copy of the inner column names, captured at construction.
    columns: Vec<String>,
}

// NOTE(review): this asserts `Send` unconditionally. Soundness depends on
// `RowFilter` and the boxed inner result being safe to move to another
// thread, which is not visible from this file — confirm and record the
// invariant in a `SAFETY:` comment.
unsafe impl Send for ExprFilteredResult {}

impl ExprFilteredResult {
    /// Compiles `filter_expr` against the columns of `inner` and wraps it.
    ///
    /// # Errors
    ///
    /// Propagates the error from `RowFilter::new` when compilation fails.
    pub fn new(inner: Box<dyn QueryResult>, filter_expr: &Expression) -> Result<Self> {
        let columns: Vec<String> = inner.columns().to_vec();
        let filter = RowFilter::new(filter_expr, &columns)?;
        let wrapped = Self {
            current_row: None,
            inner,
            filter,
            columns,
        };
        Ok(wrapped)
    }

    /// Like [`ExprFilteredResult::new`] but takes the expression by value.
    ///
    /// # Panics
    ///
    /// Panics when the filter expression fails to compile.
    pub fn with_defaults(inner: Box<dyn QueryResult>, filter_expr: Expression) -> Self {
        Self::new(inner, &filter_expr).expect("Failed to compile filter expression")
    }
}
impl QueryResult for ExprFilteredResult {
fn columns(&self) -> &[String] {
&self.columns
}
fn next(&mut self) -> bool {
while self.inner.next() {
let row = self.inner.row();
if self.filter.matches(row) {
self.current_row = Some(self.inner.take_row());
return true;
}
}
self.current_row = None;
false
}
fn scan(&self, dest: &mut [Value]) -> Result<()> {
if let Some(ref row) = self.current_row {
if dest.len() != row.len() {
return Err(crate::core::Error::internal(format!(
"scan destination has {} values but row has {} columns",
dest.len(),
row.len()
)));
}
for (i, value) in row.iter().enumerate() {
dest[i] = value.clone();
}
Ok(())
} else {
Err(crate::core::Error::internal(
"scan() called without successful next()",
))
}
}
fn row(&self) -> &Row {
self.current_row
.as_ref()
.expect("row() called without successful next()")
}
fn take_row(&mut self) -> Row {
self.current_row
.take()
.expect("take_row() called without successful next()")
}
fn close(&mut self) -> Result<()> {
self.inner.close()
}
fn rows_affected(&self) -> i64 {
self.inner.rows_affected()
}
fn last_insert_id(&self) -> i64 {
self.inner.last_insert_id()
}
fn with_aliases(self: Box<Self>, aliases: FxHashMap<String, String>) -> Box<dyn QueryResult> {
Box::new(AliasedResult::new(self, aliases))
}
}
/// One item of a projection list, prepared once by `ExprMappedResult::new`
/// and evaluated for every row in `ExprMappedResult::next`.
enum CompiledProjection {
/// `*`: every value of the source row is copied to the output row.
Star,
/// `qualifier.*`: copies the values of source columns whose lowercased
/// name starts with `prefix_lower`.
QualifiedStar {
/// Lowercased `"<qualifier>."` prefix used for the column-name match.
prefix_lower: String,
},
/// Any other expression, compiled to a program that the expression VM
/// executes against the source row.
Compiled(super::expression::SharedProgram),
}
/// Wraps another result and computes each output row from a list of
/// projection expressions.
pub struct ExprMappedResult {
    /// Source of input rows.
    inner: Box<dyn QueryResult>,
    /// One prepared projection per input expression, in order.
    projections: Vec<CompiledProjection>,
    /// VM used to run compiled expression programs.
    vm: super::expression::ExprVM,
    /// Output row built by the most recent successful `next()`.
    current_row: Row,
    /// Column names reported by `columns()`, supplied by the caller.
    output_columns: Vec<String>,
    /// Column names of the inner result, captured at construction.
    source_columns: Vec<String>,
}

// NOTE(review): this asserts `Send` unconditionally. Soundness depends on
// the VM, the compiled programs and the boxed inner result being safe to
// move to another thread, which is not visible from this file — confirm and
// record the invariant in a `SAFETY:` comment.
unsafe impl Send for ExprMappedResult {}

impl ExprMappedResult {
    /// Prepares every expression against the columns of `inner`.
    ///
    /// `*` and `qualifier.*` are kept symbolic; every other expression is
    /// compiled with `compile_expression`.
    ///
    /// # Errors
    ///
    /// Returns the first compilation error encountered.
    pub fn new(
        inner: Box<dyn QueryResult>,
        expressions: Vec<Expression>,
        output_columns: Vec<String>,
    ) -> Result<Self> {
        use super::expression::compile_expression;

        let source_columns = inner.columns().to_vec();
        let projections = expressions
            .iter()
            .map(|expr| -> Result<CompiledProjection> {
                let compiled = match expr {
                    Expression::Star(_) => CompiledProjection::Star,
                    Expression::QualifiedStar(qs) => CompiledProjection::QualifiedStar {
                        prefix_lower: format!("{}.", qs.qualifier).to_lowercase(),
                    },
                    _ => CompiledProjection::Compiled(compile_expression(expr, &source_columns)?),
                };
                Ok(compiled)
            })
            .collect::<Result<Vec<_>>>()?;

        Ok(Self {
            inner,
            projections,
            vm: super::expression::ExprVM::new(),
            current_row: Row::new(),
            output_columns,
            source_columns,
        })
    }

    /// Like [`ExprMappedResult::new`].
    ///
    /// # Panics
    ///
    /// Panics when any projection expression fails to compile.
    pub fn with_defaults(
        inner: Box<dyn QueryResult>,
        expressions: Vec<Expression>,
        output_columns: Vec<String>,
    ) -> Self {
        match Self::new(inner, expressions, output_columns) {
            Ok(result) => result,
            Err(err) => panic!("Failed to compile projection expressions: {:?}", err),
        }
    }
}
impl QueryResult for ExprMappedResult {
    /// Output column names supplied at construction.
    fn columns(&self) -> &[String] {
        &self.output_columns
    }

    /// Advances the inner result and builds the output row by evaluating
    /// every projection in order. A compiled expression that fails to execute
    /// contributes `Value::null_unknown()` instead of an error.
    fn next(&mut self) -> bool {
        use super::expression::ExecuteContext;

        if !self.inner.next() {
            return false;
        }

        let source_row = self.inner.row();
        let row_data = source_row.as_slice();
        let mut output = Row::with_capacity(self.projections.len());

        for projection in self.projections.iter() {
            match projection {
                CompiledProjection::Star => {
                    for value in source_row.iter() {
                        output.push(value.clone());
                    }
                }
                CompiledProjection::QualifiedStar { prefix_lower, .. } => {
                    for (idx, col) in self.source_columns.iter().enumerate() {
                        let name_matches = col.to_lowercase().starts_with(prefix_lower);
                        // Guard against a source row shorter than the column list.
                        if name_matches && idx < source_row.len() {
                            output.push(source_row[idx].clone());
                        }
                    }
                }
                CompiledProjection::Compiled(program) => {
                    let ctx = ExecuteContext::new(row_data);
                    let evaluated = self.vm.execute(program, &ctx);
                    output.push(evaluated.unwrap_or(Value::null_unknown()));
                }
            }
        }

        self.current_row = output;
        true
    }

    /// Clones the stored row's values into `dest`; errors on a length mismatch.
    fn scan(&self, dest: &mut [Value]) -> Result<()> {
        let row = &self.current_row;
        if dest.len() != row.len() {
            return Err(crate::core::Error::internal(format!(
                "scan destination has {} values but row has {} columns",
                dest.len(),
                row.len()
            )));
        }
        for (slot, value) in dest.iter_mut().zip(row.iter()) {
            *slot = value.clone();
        }
        Ok(())
    }

    /// Borrows the stored row.
    fn row(&self) -> &Row {
        &self.current_row
    }

    /// Moves the stored row out, leaving a default row behind.
    fn take_row(&mut self) -> Row {
        std::mem::take(&mut self.current_row)
    }

    /// Closes the inner result.
    fn close(&mut self) -> Result<()> {
        self.inner.close()
    }

    /// Always zero; not forwarded from the inner result.
    fn rows_affected(&self) -> i64 {
        0
    }

    /// Always zero; not forwarded from the inner result.
    fn last_insert_id(&self) -> i64 {
        0
    }

    /// Wraps this result in an [`AliasedResult`].
    fn with_aliases(self: Box<Self>, aliases: FxHashMap<String, String>) -> Box<dyn QueryResult> {
        let aliased = AliasedResult::new(self, aliases);
        Box::new(aliased)
    }
}
/// Wraps another result and applies an offset and an optional row limit.
pub struct LimitedResult {
    /// Source of rows.
    inner: Box<dyn QueryResult>,
    /// Maximum number of rows to yield; `None` means no limit.
    limit: Option<usize>,
    /// Number of leading rows to skip.
    offset: usize,
    /// Rows yielded so far.
    returned_count: usize,
    /// Whether the leading rows have already been skipped.
    offset_applied: bool,
    /// Copy of the inner column names, captured at construction.
    columns: Vec<String>,
}

impl LimitedResult {
    /// Wraps `inner` with the given limit and offset.
    pub fn new(inner: Box<dyn QueryResult>, limit: Option<usize>, offset: usize) -> Self {
        let columns: Vec<String> = inner.columns().to_vec();
        Self {
            returned_count: 0,
            offset_applied: false,
            inner,
            limit,
            offset,
            columns,
        }
    }

    /// Wraps `inner` with a limit and no offset.
    pub fn with_limit(inner: Box<dyn QueryResult>, limit: usize) -> Self {
        let no_offset = 0;
        Self::new(inner, Some(limit), no_offset)
    }

    /// Wraps `inner` with an offset and no limit.
    pub fn with_offset(inner: Box<dyn QueryResult>, offset: usize) -> Self {
        let no_limit = None;
        Self::new(inner, no_limit, offset)
    }
}
impl QueryResult for LimitedResult {
    /// Column names captured from the inner result.
    fn columns(&self) -> &[String] {
        &self.columns
    }

    /// On the first call skips `offset` rows of the inner result, then yields
    /// rows until the limit (if any) is reached or the inner result ends.
    fn next(&mut self) -> bool {
        if !self.offset_applied {
            // The skip happens exactly once, whether or not it completes.
            self.offset_applied = true;
            for _ in 0..self.offset {
                if !self.inner.next() {
                    return false;
                }
            }
        }

        let limit_reached = matches!(self.limit, Some(limit) if self.returned_count >= limit);
        if limit_reached {
            return false;
        }

        let advanced = self.inner.next();
        if advanced {
            self.returned_count += 1;
        }
        advanced
    }

    /// Forwarded to the inner result.
    fn scan(&self, dest: &mut [Value]) -> Result<()> {
        self.inner.scan(dest)
    }

    /// Forwarded to the inner result.
    fn row(&self) -> &Row {
        self.inner.row()
    }

    /// Forwarded to the inner result.
    fn take_row(&mut self) -> Row {
        self.inner.take_row()
    }

    /// Closes the inner result.
    fn close(&mut self) -> Result<()> {
        self.inner.close()
    }

    /// Forwarded from the inner result.
    fn rows_affected(&self) -> i64 {
        self.inner.rows_affected()
    }

    /// Forwarded from the inner result.
    fn last_insert_id(&self) -> i64 {
        self.inner.last_insert_id()
    }

    /// Wraps this result in an [`AliasedResult`].
    fn with_aliases(self: Box<Self>, aliases: FxHashMap<String, String>) -> Box<dyn QueryResult> {
        let aliased = AliasedResult::new(self, aliases);
        Box::new(aliased)
    }
}
/// Result whose rows were fully read from a source and sorted at
/// construction time; iteration is served from the in-memory copy.
pub struct OrderedResult {
// Sorted rows together with the source's column names.
inner: ExecutorMemoryResult,
}
/// One sort key for `OrderedResult::new_radix`.
#[derive(Clone, Copy)]
pub struct RadixOrderSpec {
/// Index of the sort column within each row.
pub col_idx: usize,
/// `true` for ascending order, `false` for descending.
pub ascending: bool,
/// Explicit NULL placement. `new_radix` only attempts the radix path when
/// this is `None` on every spec.
pub nulls_first: Option<bool>,
}
impl OrderedResult {
    /// Reads every row from `inner` and sorts them with `compare` using the
    /// standard library's stable `sort_by`.
    pub fn new<F>(mut inner: Box<dyn QueryResult>, compare: F) -> Self
    where
        F: Fn(&Row, &Row) -> std::cmp::Ordering,
    {
        let columns = inner.columns().to_vec();
        let mut rows = Vec::new();
        while inner.next() {
            rows.push(inner.take_row());
        }
        rows.sort_by(compare);
        Self {
            inner: ExecutorMemoryResult::new(columns, rows),
        }
    }

    /// Reads every row from `inner` and sorts them, using a radix sort on
    /// integer keys when possible and `fallback_compare` otherwise.
    ///
    /// The radix path is taken only when all of the following hold:
    /// * `order_specs` has between one and four entries,
    /// * no spec requests an explicit NULL placement,
    /// * every sort column holds only integers and NULLs, and
    /// * no sort column holds both a NULL and `i64::MIN` (they would share a
    ///   radix key).
    ///
    /// In every other case, including an empty `order_specs`, the rows are
    /// sorted with `fallback_compare`.
    pub fn new_radix<F>(
        mut inner: Box<dyn QueryResult>,
        order_specs: &[RadixOrderSpec],
        fallback_compare: F,
    ) -> Self
    where
        F: Fn(&Row, &Row) -> std::cmp::Ordering,
    {
        let columns = inner.columns().to_vec();
        let mut rows = Vec::new();
        while inner.next() {
            rows.push(inner.take_row());
        }

        // An empty spec list must not count as "sorted by radix": there is
        // nothing to key on, so the comparator has to decide the order.
        let radix_eligible = !order_specs.is_empty()
            && order_specs.len() <= 4
            && order_specs.iter().all(|spec| spec.nulls_first.is_none());

        let radix_sorted = radix_eligible
            && match order_specs {
                // Both helpers validate the same way, so a single spec is
                // only ever scanned once.
                [spec] => Self::try_radix_sort_single_int(&mut rows, spec.col_idx, spec.ascending),
                specs => Self::try_radix_sort_multi_int(&mut rows, specs),
            };

        if !radix_sorted {
            rows.sort_by(fallback_compare);
        }
        Self {
            inner: ExecutorMemoryResult::new(columns, rows),
        }
    }

    /// Returns `true` when column `col_idx` can be radix sorted: every row
    /// has an integer or NULL there, and NULL never coexists with `i64::MIN`.
    ///
    /// NULL is keyed as `i64::MIN` (ascending) or `i64::MAX` (descending),
    /// which is exactly the key of a real `i64::MIN`. When both occur the
    /// keys cannot tell them apart, so the caller must use its comparator.
    fn radix_sortable_column(rows: &[Row], col_idx: usize) -> bool {
        let mut saw_null = false;
        let mut saw_min = false;
        for row in rows {
            match row.get(col_idx) {
                Some(Value::Integer(i)) => saw_min |= *i == i64::MIN,
                Some(Value::Null(_)) => saw_null = true,
                _ => return false,
            }
        }
        !(saw_null && saw_min)
    }

    /// Radix key of `row` for one sort column.
    ///
    /// Ascending uses the integer itself with NULL keyed as `i64::MIN`.
    /// Descending uses the bitwise complement (`-i - 1` with wrapping, which
    /// reverses the order without overflow) with NULL keyed as `i64::MAX`.
    fn radix_key(row: &Row, col_idx: usize, ascending: bool) -> i64 {
        match (row.get(col_idx), ascending) {
            (Some(Value::Integer(i)), true) => *i,
            (Some(Value::Integer(i)), false) => !*i,
            (_, true) => i64::MIN,
            (_, false) => i64::MAX,
        }
    }

    /// Sorts `rows` by a single integer column with a radix sort.
    ///
    /// Returns `false`, leaving `rows` untouched, when the column is not
    /// radix sortable (see `radix_sortable_column`).
    fn try_radix_sort_single_int(rows: &mut [Row], col_idx: usize, ascending: bool) -> bool {
        if !Self::radix_sortable_column(rows, col_idx) {
            return false;
        }
        radsort::sort_by_key(rows, |row| Self::radix_key(row, col_idx, ascending));
        true
    }

    /// Sorts `rows` by several integer columns with repeated radix sorts.
    ///
    /// Returns `false`, leaving `rows` untouched, when any column is not
    /// radix sortable.
    fn try_radix_sort_multi_int(rows: &mut [Row], order_specs: &[RadixOrderSpec]) -> bool {
        let all_sortable = order_specs
            .iter()
            .all(|spec| Self::radix_sortable_column(rows, spec.col_idx));
        if !all_sortable {
            return false;
        }
        // Least-significant key first: each pass must preserve the order
        // established by the previous ones, so this relies on the sort being
        // stable.
        for spec in order_specs.iter().rev() {
            let (col_idx, ascending) = (spec.col_idx, spec.ascending);
            radsort::sort_by_key(rows, |row| Self::radix_key(row, col_idx, ascending));
        }
        true
    }
}
impl QueryResult for OrderedResult {
fn columns(&self) -> &[String] {
self.inner.columns()
}
fn next(&mut self) -> bool {
self.inner.next()
}
fn scan(&self, dest: &mut [Value]) -> Result<()> {
self.inner.scan(dest)
}
fn row(&self) -> &Row {
self.inner.row()
}
fn take_row(&mut self) -> Row {
self.inner.take_row()
}
fn close(&mut self) -> Result<()> {
self.inner.close()
}
fn rows_affected(&self) -> i64 {
0
}
fn last_insert_id(&self) -> i64 {
0
}
fn with_aliases(self: Box<Self>, aliases: FxHashMap<String, String>) -> Box<dyn QueryResult> {
Box::new(AliasedResult::new(self, aliases))
}
}
/// Result holding the first `limit` rows after `offset` of a source under
/// a given ordering, selected at construction time with a bounded heap.
pub struct TopNResult {
    /// Selected rows in sorted order, with the source's column names.
    inner: ExecutorMemoryResult,
}

impl TopNResult {
    /// Reads every row from `inner`, keeps the `limit + offset` smallest
    /// rows according to `compare`, sorts them, and drops the first `offset`.
    ///
    /// When `limit + offset` is zero the source is not read at all and the
    /// result is empty. The sum saturates instead of overflowing.
    pub fn new<F>(mut inner: Box<dyn QueryResult>, compare: F, limit: usize, offset: usize) -> Self
    where
        F: Fn(&Row, &Row) -> std::cmp::Ordering + Clone,
    {
        use std::collections::BinaryHeap;

        /// Upper bound on the number of heap slots reserved up front.
        const MAX_PREALLOCATED_ROWS: usize = 1024;

        let columns = inner.columns().to_vec();
        let heap_capacity = limit.saturating_add(offset);
        if heap_capacity == 0 {
            return Self {
                inner: ExecutorMemoryResult::new(columns, Vec::new()),
            };
        }

        /// Row paired with the comparator so it can implement `Ord` for the heap.
        struct HeapRow<F: Fn(&Row, &Row) -> std::cmp::Ordering> {
            row: Row,
            compare: F,
        }
        impl<F: Fn(&Row, &Row) -> std::cmp::Ordering> PartialEq for HeapRow<F> {
            fn eq(&self, other: &Self) -> bool {
                (self.compare)(&self.row, &other.row) == std::cmp::Ordering::Equal
            }
        }
        impl<F: Fn(&Row, &Row) -> std::cmp::Ordering> Eq for HeapRow<F> {}
        impl<F: Fn(&Row, &Row) -> std::cmp::Ordering> PartialOrd for HeapRow<F> {
            fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
                Some(self.cmp(other))
            }
        }
        impl<F: Fn(&Row, &Row) -> std::cmp::Ordering> Ord for HeapRow<F> {
            fn cmp(&self, other: &Self) -> std::cmp::Ordering {
                (self.compare)(&self.row, &other.row)
            }
        }

        // The heap never holds more than `heap_capacity` entries because a
        // full heap pops before it pushes, so no extra slot is needed. The
        // reservation is capped: `limit + offset` may be as large as
        // `usize::MAX`, where `+ 1` overflows and an exact reservation cannot
        // be satisfied. The heap still grows on demand beyond the cap.
        let mut heap: BinaryHeap<HeapRow<F>> =
            BinaryHeap::with_capacity(heap_capacity.min(MAX_PREALLOCATED_ROWS));

        while inner.next() {
            let row = inner.take_row();
            if heap.len() < heap_capacity {
                heap.push(HeapRow {
                    row,
                    compare: compare.clone(),
                });
            } else if let Some(worst) = heap.peek() {
                // Max-heap: the top is the largest kept row. Replace it only
                // when the new row sorts strictly before it.
                if compare(&row, &worst.row) == std::cmp::Ordering::Less {
                    heap.pop();
                    heap.push(HeapRow {
                        row,
                        compare: compare.clone(),
                    });
                }
            }
        }

        let mut rows: Vec<Row> = heap.into_iter().map(|hr| hr.row).collect();
        rows.sort_by(|a, b| compare(a, b));

        // Drop the first `offset` rows, or everything when there are fewer.
        let skipped = offset.min(rows.len());
        rows.drain(..skipped);

        Self {
            inner: ExecutorMemoryResult::new(columns, rows),
        }
    }
}
impl QueryResult for TopNResult {
fn columns(&self) -> &[String] {
self.inner.columns()
}
fn next(&mut self) -> bool {
self.inner.next()
}
fn scan(&self, dest: &mut [Value]) -> Result<()> {
self.inner.scan(dest)
}
fn row(&self) -> &Row {
self.inner.row()
}
fn take_row(&mut self) -> Row {
self.inner.take_row()
}
fn close(&mut self) -> Result<()> {
self.inner.close()
}
fn rows_affected(&self) -> i64 {
0
}
fn last_insert_id(&self) -> i64 {
0
}
fn with_aliases(self: Box<Self>, aliases: FxHashMap<String, String>) -> Box<dyn QueryResult> {
Box::new(AliasedResult::new(self, aliases))
}
}
/// Result with duplicate rows removed at construction time; iteration is
/// served from the in-memory copy.
pub struct DistinctResult {
    /// De-duplicated rows, in first-seen order, with the source's columns.
    inner: ExecutorMemoryResult,
}

impl DistinctResult {
    /// De-duplicates `inner` comparing all of its columns.
    pub fn new(inner: Box<dyn QueryResult>) -> Self {
        Self::with_column_count(inner, None)
    }

    /// De-duplicates `inner` comparing only the first `distinct_columns`
    /// values of each row (all columns when `None`). The first row of each
    /// group of duplicates is kept, in source order.
    pub fn with_column_count(
        mut inner: Box<dyn QueryResult>,
        distinct_columns: Option<usize>,
    ) -> Self {
        use std::hash::{Hash, Hasher};

        let columns = inner.columns().to_vec();
        let key_width = distinct_columns.unwrap_or(columns.len());

        // Hash of the key prefix -> indices (into `kept`) of rows with that hash.
        let mut buckets: FxHashMap<u64, Vec<usize>> = FxHashMap::default();
        let mut kept: Vec<Row> = Vec::new();

        while inner.next() {
            let candidate = inner.row();

            let mut hasher = FxHasher::default();
            candidate
                .as_slice()
                .iter()
                .take(key_width)
                .for_each(|value| value.hash(&mut hasher));
            let bucket = buckets.entry(hasher.finish()).or_default();

            // Equal hashes may still be different rows, so compare the key
            // prefix value by value. Two missing values count as equal.
            let already_seen = bucket.iter().any(|&idx| {
                let existing: &Row = &kept[idx];
                (0..key_width).all(|i| match (candidate.get(i), existing.get(i)) {
                    (Some(left), Some(right)) => left == right,
                    (None, None) => true,
                    _ => false,
                })
            });

            if !already_seen {
                bucket.push(kept.len());
                kept.push(inner.take_row());
            }
        }

        Self {
            inner: ExecutorMemoryResult::new(columns, kept),
        }
    }
}
impl QueryResult for DistinctResult {
fn columns(&self) -> &[String] {
self.inner.columns()
}
fn next(&mut self) -> bool {
self.inner.next()
}
fn scan(&self, dest: &mut [Value]) -> Result<()> {
self.inner.scan(dest)
}
fn row(&self) -> &Row {
self.inner.row()
}
fn take_row(&mut self) -> Row {
self.inner.take_row()
}
fn close(&mut self) -> Result<()> {
self.inner.close()
}
fn rows_affected(&self) -> i64 {
0
}
fn last_insert_id(&self) -> i64 {
0
}
fn with_aliases(self: Box<Self>, aliases: FxHashMap<String, String>) -> Box<dyn QueryResult> {
Box::new(AliasedResult::new(self, aliases))
}
}
/// Wraps another result and reports renamed column names; rows are passed
/// through untouched.
pub struct AliasedResult {
    /// Source of rows.
    inner: Box<dyn QueryResult>,
    /// Column names after applying the aliases, computed at construction.
    aliased_columns: Vec<String>,
}

impl AliasedResult {
    /// Wraps `inner`. `aliases` maps alias -> original name; each inner
    /// column equal to an original name is reported under the first matching
    /// alias found while iterating the map, other columns keep their name.
    pub fn new(inner: Box<dyn QueryResult>, aliases: FxHashMap<String, String>) -> Self {
        let mut aliased_columns: Vec<String> = Vec::with_capacity(inner.columns().len());
        for col in inner.columns() {
            let mut renamed = None;
            for (alias, original) in &aliases {
                if col == original {
                    renamed = Some(alias.clone());
                    break;
                }
            }
            aliased_columns.push(match renamed {
                Some(alias) => alias,
                None => col.clone(),
            });
        }
        Self {
            inner,
            aliased_columns,
        }
    }
}
impl QueryResult for AliasedResult {
    /// Column names after aliasing.
    fn columns(&self) -> &[String] {
        self.aliased_columns.as_slice()
    }

    /// Forwarded to the inner result.
    fn next(&mut self) -> bool {
        let advanced = self.inner.next();
        advanced
    }

    /// Forwarded to the inner result.
    fn scan(&self, dest: &mut [Value]) -> Result<()> {
        self.inner.scan(dest)
    }

    /// Forwarded to the inner result.
    fn row(&self) -> &Row {
        self.inner.row()
    }

    /// Forwarded to the inner result.
    fn take_row(&mut self) -> Row {
        self.inner.take_row()
    }

    /// Closes the inner result.
    fn close(&mut self) -> Result<()> {
        self.inner.close()
    }

    /// Forwarded from the inner result.
    fn rows_affected(&self) -> i64 {
        self.inner.rows_affected()
    }

    /// Forwarded from the inner result.
    fn last_insert_id(&self) -> i64 {
        self.inner.last_insert_id()
    }

    /// Stacks another [`AliasedResult`] on top of this one.
    fn with_aliases(self: Box<Self>, aliases: FxHashMap<String, String>) -> Box<dyn QueryResult> {
        let aliased = AliasedResult::new(self, aliases);
        Box::new(aliased)
    }
}
/// Wraps another result and exposes only its first `keep_columns` columns.
pub struct ProjectedResult {
    /// Source of full-width rows.
    inner: Box<dyn QueryResult>,
    /// Number of leading columns to keep.
    keep_columns: usize,
    /// Names of the kept columns, captured at construction.
    projected_columns: Vec<String>,
    /// Truncated copy of the inner result's current row.
    current_row: Row,
}

impl ProjectedResult {
    /// Wraps `inner`, keeping its first `keep_columns` columns (fewer names
    /// are recorded when the inner result has fewer columns).
    pub fn new(inner: Box<dyn QueryResult>, keep_columns: usize) -> Self {
        let mut projected_columns: Vec<String> = Vec::new();
        for name in inner.columns().iter().take(keep_columns) {
            projected_columns.push(name.clone());
        }
        Self {
            inner,
            keep_columns,
            projected_columns,
            current_row: Row::new(),
        }
    }
}
impl QueryResult for ProjectedResult {
    /// Names of the kept columns.
    fn columns(&self) -> &[String] {
        &self.projected_columns
    }

    /// Advances the inner result and rebuilds the truncated row, reusing the
    /// existing row buffer. Positions missing from the inner row are filled
    /// with `Value::null_unknown()`.
    fn next(&mut self) -> bool {
        if !self.inner.next() {
            return false;
        }
        let full_row = self.inner.row();
        self.current_row.clear();
        self.current_row.reserve(self.keep_columns);
        for position in 0..self.keep_columns {
            let value = full_row
                .get(position)
                .cloned()
                .unwrap_or(Value::null_unknown());
            self.current_row.push(value);
        }
        true
    }

    /// Fills at most `keep_columns` leading slots of `dest` from the stored
    /// row, using `Value::null_unknown()` for positions the row lacks. The
    /// length of `dest` is not validated.
    fn scan(&self, dest: &mut [Value]) -> Result<()> {
        for (position, slot) in dest.iter_mut().take(self.keep_columns).enumerate() {
            let value = self
                .current_row
                .get(position)
                .cloned()
                .unwrap_or(Value::null_unknown());
            *slot = value;
        }
        Ok(())
    }

    /// Borrows the stored row.
    fn row(&self) -> &Row {
        &self.current_row
    }

    /// Moves the stored row out, leaving a default row behind.
    fn take_row(&mut self) -> Row {
        std::mem::take(&mut self.current_row)
    }

    /// Closes the inner result.
    fn close(&mut self) -> Result<()> {
        self.inner.close()
    }

    /// Always zero; not forwarded from the inner result.
    fn rows_affected(&self) -> i64 {
        0
    }

    /// Always zero; not forwarded from the inner result.
    fn last_insert_id(&self) -> i64 {
        0
    }

    /// Wraps this result in an [`AliasedResult`].
    fn with_aliases(self: Box<Self>, aliases: FxHashMap<String, String>) -> Box<dyn QueryResult> {
        let aliased = AliasedResult::new(self, aliases);
        Box::new(aliased)
    }
}
/// Adapts a storage `Scanner` to the `QueryResult` interface.
pub struct ScannerResult {
    /// Underlying scanner that produces the rows.
    scanner: Box<dyn crate::storage::traits::Scanner>,
    /// Column names reported by `columns()`, supplied by the caller.
    columns: Vec<String>,
    /// Row taken from the scanner by the last successful `next()`.
    current_row: Row,
    /// Whether the last `next()` call produced a row.
    has_current: bool,
}

impl ScannerResult {
    /// Wraps `scanner`, positioned before the first row.
    pub fn new(scanner: Box<dyn crate::storage::traits::Scanner>, columns: Vec<String>) -> Self {
        let current_row = Row::new();
        Self {
            has_current: false,
            scanner,
            columns,
            current_row,
        }
    }
}
impl QueryResult for ScannerResult {
    /// Column names supplied at construction.
    fn columns(&self) -> &[String] {
        &self.columns
    }

    /// Advances the scanner; on success takes its row as the current row.
    /// `has_current` always reflects the outcome of the latest call.
    fn next(&mut self) -> bool {
        let advanced = self.scanner.next();
        if advanced {
            self.current_row = self.scanner.take_row();
        }
        self.has_current = advanced;
        advanced
    }

    /// Clones the current row's values into `dest`; errors when there is no
    /// current row or the lengths differ.
    fn scan(&self, dest: &mut [Value]) -> Result<()> {
        if !self.has_current {
            return Err(crate::core::Error::internal(
                "scan() called without successful next()",
            ));
        }
        let row = &self.current_row;
        if dest.len() != row.len() {
            return Err(crate::core::Error::internal(format!(
                "scan destination has {} values but row has {} columns",
                dest.len(),
                row.len()
            )));
        }
        for (slot, value) in dest.iter_mut().zip(row.iter()) {
            *slot = value.clone();
        }
        Ok(())
    }

    /// Borrows the current row; panics when the last `next()` did not
    /// produce one.
    fn row(&self) -> &Row {
        assert!(self.has_current, "row() called without successful next()");
        &self.current_row
    }

    /// Moves the current row out, leaving a default row behind; panics when
    /// the last `next()` did not produce one. `has_current` is not reset.
    fn take_row(&mut self) -> Row {
        assert!(
            self.has_current,
            "take_row() called without successful next()"
        );
        std::mem::take(&mut self.current_row)
    }

    /// Closes the underlying scanner.
    fn close(&mut self) -> Result<()> {
        self.scanner.close()
    }

    /// Always zero.
    fn rows_affected(&self) -> i64 {
        0
    }

    /// Always zero.
    fn last_insert_id(&self) -> i64 {
        0
    }

    /// Wraps this result in an [`AliasedResult`].
    fn with_aliases(self: Box<Self>, aliases: FxHashMap<String, String>) -> Box<dyn QueryResult> {
        let aliased = AliasedResult::new(self, aliases);
        Box::new(aliased)
    }
}
/// Alias for [`FilteredResult`]; both names refer to the same type.
pub type StreamingFilterResult = FilteredResult;
/// Wraps another result and emits, for each row, the values at a fixed list
/// of column indices.
pub struct StreamingProjectionResult {
    /// Source of input rows.
    inner: Box<dyn QueryResult>,
    /// Indices into the input row, in output order.
    column_indices: Vec<usize>,
    /// Column names reported by `columns()`, supplied by the caller.
    output_columns: Vec<String>,
    /// Projected copy of the inner result's current row.
    current_row: Row,
}

impl StreamingProjectionResult {
    /// Wraps `inner`; `column_indices` selects the values and
    /// `output_columns` names them.
    pub fn new(
        inner: Box<dyn QueryResult>,
        column_indices: Vec<usize>,
        output_columns: Vec<String>,
    ) -> Self {
        let current_row = Row::new();
        Self {
            inner,
            column_indices,
            output_columns,
            current_row,
        }
    }
}
impl QueryResult for StreamingProjectionResult {
    /// Output column names supplied at construction.
    fn columns(&self) -> &[String] {
        &self.output_columns
    }

    /// Advances the inner result and rebuilds the projected row, reusing the
    /// existing row buffer. Indices missing from the inner row yield
    /// `Value::null_unknown()`.
    fn next(&mut self) -> bool {
        if !self.inner.next() {
            return false;
        }
        let source_row = self.inner.row();
        self.current_row.clear();
        self.current_row.reserve(self.column_indices.len());
        for idx in self.column_indices.iter().copied() {
            let value = source_row
                .get(idx)
                .cloned()
                .unwrap_or(Value::null_unknown());
            self.current_row.push(value);
        }
        true
    }

    /// Clones the stored row's values into `dest`; errors on a length mismatch.
    fn scan(&self, dest: &mut [Value]) -> Result<()> {
        let row = &self.current_row;
        if dest.len() != row.len() {
            return Err(crate::core::Error::internal(format!(
                "scan destination has {} values but row has {} columns",
                dest.len(),
                row.len()
            )));
        }
        for (slot, value) in dest.iter_mut().zip(row.iter()) {
            *slot = value.clone();
        }
        Ok(())
    }

    /// Borrows the stored row.
    fn row(&self) -> &Row {
        &self.current_row
    }

    /// Moves the stored row out, leaving a default row behind.
    fn take_row(&mut self) -> Row {
        std::mem::take(&mut self.current_row)
    }

    /// Closes the inner result.
    fn close(&mut self) -> Result<()> {
        self.inner.close()
    }

    /// Always zero; not forwarded from the inner result.
    fn rows_affected(&self) -> i64 {
        0
    }

    /// Always zero; not forwarded from the inner result.
    fn last_insert_id(&self) -> i64 {
        0
    }

    /// Wraps this result in an [`AliasedResult`].
    fn with_aliases(self: Box<Self>, aliases: FxHashMap<String, String>) -> Box<dyn QueryResult> {
        let aliased = AliasedResult::new(self, aliases);
        Box::new(aliased)
    }
}
// Unit tests for the result wrappers defined in this file. Each test builds
// a small in-memory result and checks the wrapper's observable behaviour.
#[cfg(test)]
mod tests {
use super::*;
// An exec result reports the counters it was built with and has no columns.
#[test]
fn test_exec_result() {
let result = ExecResult::new(5, 10);
assert_eq!(result.rows_affected(), 5);
assert_eq!(result.last_insert_id(), 10);
assert_eq!(result.columns().len(), 0);
}
// `ExecResult::empty()` reports zero for both counters.
#[test]
fn test_exec_result_empty() {
let result = ExecResult::empty();
assert_eq!(result.rows_affected(), 0);
assert_eq!(result.last_insert_id(), 0);
}
// The in-memory result yields its rows in order and then stops.
#[test]
fn test_memory_result() {
let columns = vec!["id".to_string(), "name".to_string()];
let rows = vec![
Row::from_values(vec![Value::Integer(1), Value::text("Alice")]),
Row::from_values(vec![Value::Integer(2), Value::text("Bob")]),
];
let mut result = ExecutorMemoryResult::new(columns, rows);
assert_eq!(result.columns().len(), 2);
assert_eq!(result.row_count(), 2);
assert!(result.next());
assert_eq!(result.row().get(0), Some(&Value::Integer(1)));
assert!(result.next());
assert_eq!(result.row().get(0), Some(&Value::Integer(2)));
assert!(!result.next());
}
// Only rows whose second value is greater than 15 pass the predicate.
#[test]
fn test_filtered_result() {
let columns = vec!["id".to_string(), "value".to_string()];
let rows = vec![
Row::from_values(vec![Value::Integer(1), Value::Integer(10)]),
Row::from_values(vec![Value::Integer(2), Value::Integer(20)]),
Row::from_values(vec![Value::Integer(3), Value::Integer(30)]),
];
let inner = Box::new(ExecutorMemoryResult::new(columns, rows));
let predicate = Box::new(|row: &Row| {
if let Some(Value::Integer(v)) = row.get(1) {
*v > 15
} else {
false
}
});
let mut result = FilteredResult::new(inner, predicate);
assert!(result.next());
assert_eq!(result.row().get(0), Some(&Value::Integer(2)));
assert!(result.next());
assert_eq!(result.row().get(0), Some(&Value::Integer(3)));
assert!(!result.next());
}
// Limit 2 with offset 1 over five rows yields the second and third rows.
#[test]
fn test_limited_result() {
let columns = vec!["id".to_string()];
let rows = vec![
Row::from_values(vec![Value::Integer(1)]),
Row::from_values(vec![Value::Integer(2)]),
Row::from_values(vec![Value::Integer(3)]),
Row::from_values(vec![Value::Integer(4)]),
Row::from_values(vec![Value::Integer(5)]),
];
let inner = Box::new(ExecutorMemoryResult::new(columns.clone(), rows.clone()));
let mut result = LimitedResult::new(inner, Some(2), 1);
assert!(result.next());
assert_eq!(result.row().get(0), Some(&Value::Integer(2)));
assert!(result.next());
assert_eq!(result.row().get(0), Some(&Value::Integer(3)));
assert!(!result.next()); }
// Rows come back ordered by the first column after sorting.
#[test]
fn test_ordered_result() {
let columns = vec!["id".to_string(), "value".to_string()];
let rows = vec![
Row::from_values(vec![Value::Integer(3), Value::Integer(30)]),
Row::from_values(vec![Value::Integer(1), Value::Integer(10)]),
Row::from_values(vec![Value::Integer(2), Value::Integer(20)]),
];
let inner = Box::new(ExecutorMemoryResult::new(columns, rows));
let mut result = OrderedResult::new(inner, |a, b| {
let a_id = a.get(0).and_then(|v| v.as_int64()).unwrap_or(0);
let b_id = b.get(0).and_then(|v| v.as_int64()).unwrap_or(0);
a_id.cmp(&b_id)
});
assert!(result.next());
assert_eq!(result.row().get(0), Some(&Value::Integer(1)));
assert!(result.next());
assert_eq!(result.row().get(0), Some(&Value::Integer(2)));
assert!(result.next());
assert_eq!(result.row().get(0), Some(&Value::Integer(3)));
assert!(!result.next());
}
// The repeated "Alice" row is dropped, leaving three distinct names.
#[test]
fn test_distinct_result() {
let columns = vec!["name".to_string()];
let rows = vec![
Row::from_values(vec![Value::text("Alice")]),
Row::from_values(vec![Value::text("Bob")]),
Row::from_values(vec![Value::text("Alice")]), Row::from_values(vec![Value::text("Charlie")]),
];
let inner = Box::new(ExecutorMemoryResult::new(columns, rows));
let mut result = DistinctResult::new(inner);
let mut names = Vec::new();
while result.next() {
if let Some(Value::Text(name)) = result.row().get(0) {
names.push(name.to_string());
}
}
assert_eq!(names.len(), 3);
assert!(names.contains(&"Alice".to_string()));
assert!(names.contains(&"Bob".to_string()));
assert!(names.contains(&"Charlie".to_string()));
}
// The alias map is alias -> original: "name" is reported as "user_name"
// while the row data is unchanged.
#[test]
fn test_aliased_result() {
let columns = vec!["id".to_string(), "name".to_string()];
let rows = vec![Row::from_values(vec![
Value::Integer(1),
Value::text("Alice"),
])];
let inner = Box::new(ExecutorMemoryResult::new(columns, rows));
let mut aliases = FxHashMap::default();
aliases.insert("user_name".to_string(), "name".to_string());
let mut result = AliasedResult::new(inner, aliases);
assert_eq!(result.columns(), &["id", "user_name"]);
assert!(result.next());
assert_eq!(result.row().get(1), Some(&Value::text("Alice")));
}
}