use crate::common::CompactArc;
use crate::core::{Result, Row, RowVec, Value};
use crate::parser::ast::Expression;
use crate::storage::traits::QueryResult;
use rustc_hash::{FxHashMap, FxHasher};
use super::expression::RowFilter;
/// Result of a statement that produces no rows, only bookkeeping numbers.
///
/// The `QueryResult` implementation for this type exposes no columns and
/// never yields a row.
pub struct ExecResult {
    /// Value reported by `rows_affected()`.
    affected: i64,
    /// Value reported by `last_insert_id()`.
    insert_id: i64,
}
/// Empty column list handed out by results that expose no columns.
static EMPTY_COLUMNS: &[String] = &[];

/// Lazily initialised empty row, created at most once.
static EMPTY_ROW: std::sync::OnceLock<Row> = std::sync::OnceLock::new();

/// Returns the shared empty [`Row`], building it with `Row::new` on first use.
#[inline]
fn get_empty_row() -> &'static Row {
    let cell = &EMPTY_ROW;
    cell.get_or_init(Row::new)
}
impl ExecResult {
#[inline]
pub fn new(rows_affected: i64, last_insert_id: i64) -> Self {
Self {
affected: rows_affected,
insert_id: last_insert_id,
}
}
#[inline]
pub fn empty() -> Self {
Self::new(0, 0)
}
#[inline]
pub fn with_rows_affected(rows_affected: i64) -> Self {
Self::new(rows_affected, 0)
}
#[inline]
pub fn with_last_insert_id(rows_affected: i64, last_insert_id: i64) -> Self {
Self::new(rows_affected, last_insert_id)
}
}
impl QueryResult for ExecResult {
fn columns(&self) -> &[String] {
EMPTY_COLUMNS
}
fn next(&mut self) -> bool {
false
}
fn scan(&self, _dest: &mut [Value]) -> Result<()> {
Err(crate::core::Error::internal("scan() called on exec result"))
}
fn row(&self) -> &Row {
get_empty_row()
}
fn close(&mut self) -> Result<()> {
Ok(())
}
fn rows_affected(&self) -> i64 {
self.affected
}
fn last_insert_id(&self) -> i64 {
self.insert_id
}
fn with_aliases(self: Box<Self>, _aliases: FxHashMap<String, String>) -> Box<dyn QueryResult> {
self
}
}
/// Backing storage for the rows of an `ExecutorResult`.
enum RowStorage {
    /// Rows held directly in a `RowVec` of `(id, row)` pairs.
    Owned(RowVec),
    /// Rows held behind a `CompactArc`; `take` clones out of this variant.
    Shared(CompactArc<Vec<Row>>),
}
impl RowStorage {
#[inline]
fn len(&self) -> usize {
match self {
RowStorage::Owned(rv) => rv.len(),
RowStorage::Shared(rows) => rows.len(),
}
}
#[inline]
fn get(&self, index: usize) -> Option<&Row> {
match self {
RowStorage::Owned(rv) => rv.get(index).map(|(_, row)| row),
RowStorage::Shared(rows) => rows.get(index),
}
}
#[inline]
fn take(&mut self, index: usize) -> Row {
match self {
RowStorage::Owned(rv) => std::mem::take(&mut rv[index].1),
RowStorage::Shared(rows) => rows[index].clone(),
}
}
}
/// Fully materialised query result with a cursor over its rows.
pub struct ExecutorResult {
    /// Column names, shared through a `CompactArc`.
    columns: CompactArc<Vec<String>>,
    /// Row storage, either owned or shared.
    rows: RowStorage,
    /// Row count captured at construction and bumped by `add_row`.
    len: usize,
    /// Index of the current row; `None` until `next()` first succeeds.
    current_index: Option<usize>,
    /// Once set, `next()` always returns `false`.
    closed: bool,
    /// Value reported by `rows_affected()`.
    affected: i64,
    /// Value reported by `last_insert_id()`.
    insert_id: i64,
}
impl ExecutorResult {
    /// Creates a result from owned column names and owned rows.
    pub fn new(columns: Vec<String>, rows: RowVec) -> Self {
        Self::with_arc_columns(CompactArc::new(columns), rows)
    }

    /// Creates a result from already-shared column names and owned rows.
    pub fn with_arc_columns(columns: CompactArc<Vec<String>>, rows: RowVec) -> Self {
        Self {
            // `len` is read before `rows` is moved into the storage below.
            len: rows.len(),
            columns,
            rows: RowStorage::Owned(rows),
            current_index: None,
            closed: false,
            affected: 0,
            insert_id: 0,
        }
    }

    /// Creates a result from owned column names and shared rows.
    pub fn with_shared_rows(columns: Vec<String>, rows: CompactArc<Vec<Row>>) -> Self {
        Self::with_arc_columns_shared_rows(CompactArc::new(columns), rows)
    }

    /// Creates a result where both column names and rows are shared.
    pub fn with_arc_columns_shared_rows(
        columns: CompactArc<Vec<String>>,
        rows: CompactArc<Vec<Row>>,
    ) -> Self {
        Self {
            // `len` is read before `rows` is moved into the storage below.
            len: rows.len(),
            columns,
            rows: RowStorage::Shared(rows),
            current_index: None,
            closed: false,
            affected: 0,
            insert_id: 0,
        }
    }

    /// Identical to [`ExecutorResult::with_arc_columns_shared_rows`].
    pub fn with_arc_all(columns: CompactArc<Vec<String>>, rows: CompactArc<Vec<Row>>) -> Self {
        Self::with_arc_columns_shared_rows(columns, rows)
    }

    /// Creates a result with no columns and no rows.
    pub fn empty() -> Self {
        Self::new(Vec::new(), RowVec::new())
    }

    /// Creates a result with the given columns and no rows.
    pub fn with_columns(columns: Vec<String>) -> Self {
        Self::new(columns, RowVec::new())
    }

    /// Same as [`ExecutorResult::new`]; the schema argument is accepted but not used.
    pub fn with_schema(columns: Vec<String>, rows: RowVec, _schema: crate::core::Schema) -> Self {
        Self::new(columns, rows)
    }

    /// Appends `row`, using the current row count as its id.
    ///
    /// Shared storage is first copied into owned storage (cloning every row,
    /// with ids equal to their positions) so that it can grow.
    pub fn add_row(&mut self, row: Row) {
        let next_id = self.len as i64;

        if let RowStorage::Shared(shared) = &self.rows {
            let mut owned = RowVec::with_capacity(shared.len() + 1);
            for (position, existing) in shared.iter().enumerate() {
                owned.push((position as i64, existing.clone()));
            }
            self.rows = RowStorage::Owned(owned);
        }

        match &mut self.rows {
            RowStorage::Owned(owned) => {
                owned.push((next_id, row));
                self.len += 1;
            }
            // Not reachable: shared storage was converted just above.
            RowStorage::Shared(_) => {}
        }
    }

    /// Number of rows recorded for this result.
    #[inline]
    pub fn row_count(&self) -> usize {
        self.len
    }

    /// Borrows the row at `index`, if present.
    #[inline]
    pub fn get_row(&self, index: usize) -> Option<&Row> {
        self.rows.get(index)
    }

    /// Consumes the result and returns its rows as a plain vector.
    ///
    /// Shared rows are unwrapped when `CompactArc::try_unwrap` succeeds and
    /// cloned otherwise.
    pub fn into_rows(self) -> Vec<Row> {
        match self.rows {
            RowStorage::Shared(shared) => {
                CompactArc::try_unwrap(shared).unwrap_or_else(|still_shared| (*still_shared).clone())
            }
            RowStorage::Owned(mut owned) => owned.drain_rows().collect(),
        }
    }

    /// Consumes the result and returns its rows behind a `CompactArc`.
    pub fn into_arc_rows(self) -> CompactArc<Vec<Row>> {
        match self.rows {
            RowStorage::Shared(shared) => shared,
            RowStorage::Owned(mut owned) => {
                let collected: Vec<Row> = owned.drain_rows().collect();
                CompactArc::new(collected)
            }
        }
    }

    /// Rewinds the cursor so the next `next()` starts from the first row.
    pub fn reset(&mut self) {
        self.current_index = None;
    }

    /// Overrides the value reported by `rows_affected()`.
    pub fn set_rows_affected(&mut self, count: i64) {
        self.affected = count;
    }

    /// Overrides the value reported by `last_insert_id()`.
    pub fn set_last_insert_id(&mut self, id: i64) {
        self.insert_id = id;
    }
}
impl QueryResult for ExecutorResult {
fn columns(&self) -> &[String] {
&self.columns
}
fn columns_arc(&self) -> Option<CompactArc<Vec<String>>> {
Some(CompactArc::clone(&self.columns))
}
#[inline]
fn next(&mut self) -> bool {
if self.closed {
return false;
}
let next_index = match self.current_index {
None => 0,
Some(i) => i + 1,
};
if next_index < self.len {
self.current_index = Some(next_index);
true
} else {
false
}
}
fn scan(&self, dest: &mut [Value]) -> Result<()> {
let row = self.row();
if dest.len() != row.len() {
return Err(crate::core::Error::internal(format!(
"scan destination has {} values but row has {} columns",
dest.len(),
row.len()
)));
}
for (i, value) in row.iter().enumerate() {
dest[i] = value.clone();
}
Ok(())
}
fn row(&self) -> &Row {
match self.current_index {
Some(i) => self
.rows
.get(i)
.expect("row() called without successful next()"),
_ => panic!("row() called without successful next()"),
}
}
fn take_row(&mut self) -> Row {
match self.current_index {
Some(i) if i < self.rows.len() => self.rows.take(i),
_ => panic!("take_row() called without successful next()"),
}
}
fn close(&mut self) -> Result<()> {
self.closed = true;
Ok(())
}
fn rows_affected(&self) -> i64 {
self.affected
}
fn last_insert_id(&self) -> i64 {
self.insert_id
}
fn try_into_arc_rows(&mut self) -> Option<CompactArc<Vec<Row>>> {
let rows = std::mem::replace(&mut self.rows, RowStorage::Owned(RowVec::new()));
self.closed = true; match rows {
RowStorage::Owned(mut rv) => Some(CompactArc::new(rv.drain_rows().collect())),
RowStorage::Shared(arc) => Some(arc),
}
}
fn with_aliases(
mut self: Box<Self>,
aliases: FxHashMap<String, String>,
) -> Box<dyn QueryResult> {
let columns = CompactArc::make_mut(&mut self.columns);
for col in columns {
for (alias, original) in &aliases {
if col == original {
*col = alias.clone();
break;
}
}
}
self
}
}
/// Wraps another result and yields only the rows accepted by a `RowFilter`.
pub struct FilteredResult {
    /// Upstream result being filtered.
    inner: Box<dyn QueryResult>,
    /// Predicate evaluated against each upstream row.
    filter: RowFilter,
    /// Row taken from upstream by the last successful `next()`.
    current_row: Option<Row>,
    /// Copy of the upstream column names captured at construction.
    columns: Vec<String>,
    /// Filter evaluation error waiting to be collected via `last_error()`.
    pending_error: Option<crate::core::Error>,
}

// SAFETY: NOTE(review) — nothing in this file demonstrates why this is sound;
// presumably `RowFilter` and the boxed inner result are safe to move to another
// thread. Confirm against their definitions.
unsafe impl Send for FilteredResult {}
impl FilteredResult {
    /// Builds a filter from `filter_expr` against the columns of `inner`.
    ///
    /// # Errors
    ///
    /// Propagates any error from `RowFilter::new`.
    pub fn new(inner: Box<dyn QueryResult>, filter_expr: &Expression) -> Result<Self> {
        let columns = inner.columns().to_vec();
        let filter = RowFilter::new(filter_expr, &columns)?;
        let filtered = Self {
            inner,
            filter,
            current_row: None,
            columns,
            pending_error: None,
        };
        Ok(filtered)
    }

    /// Wraps `inner` with an already constructed filter.
    pub fn from_filter(inner: Box<dyn QueryResult>, filter: RowFilter) -> Self {
        Self {
            columns: inner.columns().to_vec(),
            inner,
            filter,
            current_row: None,
            pending_error: None,
        }
    }

    /// Same as [`FilteredResult::new`], but takes the expression by value.
    pub fn with_defaults(inner: Box<dyn QueryResult>, filter_expr: Expression) -> Result<Self> {
        Self::new(inner, &filter_expr)
    }
}
impl QueryResult for FilteredResult {
fn columns(&self) -> &[String] {
&self.columns
}
fn next(&mut self) -> bool {
while self.inner.next() {
let row = self.inner.row();
match self.filter.matches_checked(row) {
Ok(true) => {
self.current_row = Some(self.inner.take_row());
return true;
}
Ok(false) => continue,
Err(e) => {
self.pending_error = Some(e);
self.current_row = None;
return false;
}
}
}
self.current_row = None;
false
}
fn scan(&self, dest: &mut [Value]) -> Result<()> {
if let Some(ref row) = self.current_row {
if dest.len() != row.len() {
return Err(crate::core::Error::internal(format!(
"scan destination has {} values but row has {} columns",
dest.len(),
row.len()
)));
}
for (i, value) in row.iter().enumerate() {
dest[i] = value.clone();
}
Ok(())
} else {
Err(crate::core::Error::internal(
"scan() called without successful next()",
))
}
}
fn row(&self) -> &Row {
self.current_row
.as_ref()
.expect("row() called without successful next()")
}
fn take_row(&mut self) -> Row {
self.current_row
.take()
.expect("take_row() called without successful next()")
}
fn close(&mut self) -> Result<()> {
self.inner.close()
}
fn rows_affected(&self) -> i64 {
self.inner.rows_affected()
}
fn last_insert_id(&self) -> i64 {
self.inner.last_insert_id()
}
fn last_error(&mut self) -> Option<crate::core::Error> {
self.pending_error
.take()
.or_else(|| self.inner.last_error())
}
fn with_aliases(self: Box<Self>, aliases: FxHashMap<String, String>) -> Box<dyn QueryResult> {
Box::new(AliasedResult::new(self, aliases))
}
}
/// One output item of an `ExprMappedResult`, prepared at construction time.
enum CompiledProjection {
    /// Copies every value of the source row.
    Star,
    /// Copies the source columns whose lower-cased name starts with
    /// `qualifier_lower` followed by a dot.
    QualifiedStar {
        /// Lower-cased qualifier to match against source column names.
        qualifier_lower: String,
    },
    /// Expression compiled to a program that is run against each source row.
    Compiled(super::expression::SharedProgram),
}
/// Maps each row of an inner result through a list of projections.
pub struct ExprMappedResult {
    /// Upstream result supplying source rows.
    inner: Box<dyn QueryResult>,
    /// Projections applied, in order, to build each output row.
    projections: Vec<CompiledProjection>,
    /// VM used to run compiled expression programs.
    vm: super::expression::ExprVM,
    /// Output row built by the last successful `next()`.
    current_row: Row,
    /// Column names reported by `columns()`.
    output_columns: Vec<String>,
    /// Lower-cased source column names, used for qualified-star matching.
    source_columns_lower: Vec<String>,
}

// SAFETY: NOTE(review) — nothing in this file demonstrates why this is sound;
// presumably `ExprVM`, the compiled programs and the boxed inner result are
// safe to move to another thread. Confirm against their definitions.
unsafe impl Send for ExprMappedResult {}
impl ExprMappedResult {
    /// Prepares one projection per expression against the columns of `inner`.
    ///
    /// Star and qualified-star expressions are recorded as such; every other
    /// expression is compiled with `compile_expression`.
    ///
    /// # Errors
    ///
    /// Propagates the first compilation error.
    pub fn new(
        inner: Box<dyn QueryResult>,
        expressions: Vec<Expression>,
        output_columns: Vec<String>,
    ) -> Result<Self> {
        use super::expression::compile_expression;

        let source_columns = inner.columns().to_vec();

        let mut projections = Vec::with_capacity(expressions.len());
        for expr in expressions.iter() {
            let compiled = match expr {
                Expression::QualifiedStar(qs) => CompiledProjection::QualifiedStar {
                    qualifier_lower: qs.qualifier.to_lowercase().to_string(),
                },
                Expression::Star(_) => CompiledProjection::Star,
                other => CompiledProjection::Compiled(compile_expression(other, &source_columns)?),
            };
            projections.push(compiled);
        }

        let mut source_columns_lower: Vec<String> = Vec::with_capacity(source_columns.len());
        for name in &source_columns {
            source_columns_lower.push(name.to_lowercase());
        }

        // The row buffer starts with room for one value per projection.
        let current_row = Row::with_capacity(projections.len());

        Ok(Self {
            inner,
            projections,
            vm: super::expression::ExprVM::new(),
            current_row,
            output_columns,
            source_columns_lower,
        })
    }

    /// Identical to [`ExprMappedResult::new`].
    pub fn with_defaults(
        inner: Box<dyn QueryResult>,
        expressions: Vec<Expression>,
        output_columns: Vec<String>,
    ) -> Result<Self> {
        Self::new(inner, expressions, output_columns)
    }
}
/// Streams projected rows computed from the inner result.
impl QueryResult for ExprMappedResult {
    /// Output column names supplied at construction.
    fn columns(&self) -> &[String] {
        &self.output_columns
    }

    /// Advances the inner result and rebuilds the output row from it.
    fn next(&mut self) -> bool {
        use super::expression::ExecuteContext;

        if !self.inner.next() {
            return false;
        }

        let source_row = self.inner.row();
        // Same call order as before: reserve first, then clear the buffer.
        self.current_row.reserve_inline(self.projections.len());
        self.current_row.clear_inline();

        for projection in self.projections.iter() {
            match projection {
                CompiledProjection::Compiled(program) => {
                    let ctx = ExecuteContext::new(source_row);
                    // When the VM yields no value, the output slot is filled
                    // with `Value::null_unknown()`.
                    let value = self
                        .vm
                        .execute_cow(program, &ctx)
                        .unwrap_or(Value::null_unknown());
                    self.current_row.push_inline(value);
                }
                CompiledProjection::Star => {
                    for value in source_row.iter() {
                        self.current_row.push_inline(value.clone());
                    }
                }
                CompiledProjection::QualifiedStar { qualifier_lower } => {
                    for (idx, col_lower) in self.source_columns_lower.iter().enumerate() {
                        // Matches names of the form "<qualifier>.<anything>".
                        let qualified = col_lower
                            .strip_prefix(qualifier_lower.as_str())
                            .is_some_and(|rest| rest.starts_with('.'));
                        if qualified && idx < source_row.len() {
                            self.current_row.push_inline(source_row[idx].clone());
                        }
                    }
                }
            }
        }
        true
    }

    /// Clones the current output row into `dest`; errors on a length mismatch.
    fn scan(&self, dest: &mut [Value]) -> Result<()> {
        let row = &self.current_row;
        if dest.len() != row.len() {
            return Err(crate::core::Error::internal(format!(
                "scan destination has {} values but row has {} columns",
                dest.len(),
                row.len()
            )));
        }
        for (slot, value) in dest.iter_mut().zip(row.iter()) {
            *slot = value.clone();
        }
        Ok(())
    }

    /// Borrows the current output row.
    fn row(&self) -> &Row {
        &self.current_row
    }

    /// Moves the current output row out, leaving a default row behind.
    fn take_row(&mut self) -> Row {
        std::mem::take(&mut self.current_row)
    }

    /// Closes the inner result.
    fn close(&mut self) -> Result<()> {
        self.inner.close()
    }

    /// Always zero for this adapter.
    fn rows_affected(&self) -> i64 {
        0
    }

    /// Always zero for this adapter.
    fn last_insert_id(&self) -> i64 {
        0
    }

    /// Delegates to the inner result.
    fn last_error(&mut self) -> Option<crate::core::Error> {
        self.inner.last_error()
    }

    /// Wraps this result so its column names are reported under `aliases`.
    fn with_aliases(self: Box<Self>, aliases: FxHashMap<String, String>) -> Box<dyn QueryResult> {
        Box::new(AliasedResult::new(self, aliases))
    }
}
/// Applies an offset and an optional row limit to an inner result.
pub struct LimitedResult {
    /// Upstream result being limited.
    inner: Box<dyn QueryResult>,
    /// Maximum number of rows to yield; `None` means no limit.
    limit: Option<usize>,
    /// Number of upstream rows skipped before the first yielded row.
    offset: usize,
    /// Rows yielded so far.
    returned_count: usize,
    /// Whether the offset rows have already been skipped.
    offset_applied: bool,
    /// Copy of the upstream column names captured at construction.
    columns: Vec<String>,
}
impl LimitedResult {
    /// Wraps `inner`, skipping `offset` rows and yielding at most `limit` rows.
    pub fn new(inner: Box<dyn QueryResult>, limit: Option<usize>, offset: usize) -> Self {
        Self {
            columns: inner.columns().to_vec(),
            inner,
            limit,
            offset,
            returned_count: 0,
            offset_applied: false,
        }
    }

    /// Limit only, no offset.
    pub fn with_limit(inner: Box<dyn QueryResult>, limit: usize) -> Self {
        let no_offset = 0;
        Self::new(inner, Some(limit), no_offset)
    }

    /// Offset only, no limit.
    pub fn with_offset(inner: Box<dyn QueryResult>, offset: usize) -> Self {
        let no_limit = None;
        Self::new(inner, no_limit, offset)
    }
}
/// Streams a window of the inner result's rows.
impl QueryResult for LimitedResult {
    /// Column names captured from the inner result.
    fn columns(&self) -> &[String] {
        &self.columns
    }

    /// Skips the offset on the first call, then yields rows until the limit
    /// is reached or the inner result runs out.
    fn next(&mut self) -> bool {
        if !self.offset_applied {
            // The skip is attempted exactly once, whether or not it completes.
            self.offset_applied = true;
            for _ in 0..self.offset {
                if !self.inner.next() {
                    return false;
                }
            }
        }

        let limit_reached = matches!(self.limit, Some(limit) if self.returned_count >= limit);
        if limit_reached {
            return false;
        }

        let advanced = self.inner.next();
        if advanced {
            self.returned_count += 1;
        }
        advanced
    }

    /// Delegates to the inner result.
    fn scan(&self, dest: &mut [Value]) -> Result<()> {
        self.inner.scan(dest)
    }

    /// Delegates to the inner result.
    fn row(&self) -> &Row {
        self.inner.row()
    }

    /// Delegates to the inner result.
    fn take_row(&mut self) -> Row {
        self.inner.take_row()
    }

    /// Closes the inner result.
    fn close(&mut self) -> Result<()> {
        self.inner.close()
    }

    /// Delegates to the inner result.
    fn rows_affected(&self) -> i64 {
        self.inner.rows_affected()
    }

    /// Delegates to the inner result.
    fn last_insert_id(&self) -> i64 {
        self.inner.last_insert_id()
    }

    /// Delegates to the inner result.
    fn last_error(&mut self) -> Option<crate::core::Error> {
        self.inner.last_error()
    }

    /// Wraps this result so its column names are reported under `aliases`.
    fn with_aliases(self: Box<Self>, aliases: FxHashMap<String, String>) -> Box<dyn QueryResult> {
        Box::new(AliasedResult::new(self, aliases))
    }
}
/// Result whose rows were fully read and sorted at construction time.
pub struct OrderedResult {
    /// In-memory result holding the sorted rows.
    inner: ExecutorResult,
}
/// Describes one ORDER BY key for the radix-sort path of `OrderedResult`.
#[derive(Clone, Copy)]
pub struct RadixOrderSpec {
    /// Index of the column to sort by.
    pub col_idx: usize,
    /// `true` for ascending order, `false` for descending.
    pub ascending: bool,
    /// Explicit NULL placement, if any; when any spec sets this, the radix
    /// path is skipped in favour of the comparator.
    pub nulls_first: Option<bool>,
}
impl OrderedResult {
    /// Reads every row of `inner` and sorts them with `compare`.
    ///
    /// # Errors
    ///
    /// Returns the inner result's last error if reading stopped because of one.
    pub fn new<F>(mut inner: Box<dyn QueryResult>, compare: F) -> Result<Self>
    where
        F: Fn(&Row, &Row) -> std::cmp::Ordering,
    {
        let columns = inner.columns().to_vec();
        let mut rows = Self::drain_rows_from(&mut *inner)?;
        rows.sort_unstable_by(|(_, left), (_, right)| compare(left, right));
        Ok(Self {
            inner: ExecutorResult::new(columns, rows),
        })
    }

    /// Like [`OrderedResult::new`], but first tries an integer radix sort.
    ///
    /// The radix path is attempted only when no spec requests explicit NULL
    /// placement; it applies when every sort column holds only integers or
    /// NULLs. Otherwise `fallback_compare` is used.
    ///
    /// # Errors
    ///
    /// Returns the inner result's last error if reading stopped because of one.
    pub fn new_radix<F>(
        mut inner: Box<dyn QueryResult>,
        order_specs: &[RadixOrderSpec],
        fallback_compare: F,
    ) -> Result<Self>
    where
        F: Fn(&Row, &Row) -> std::cmp::Ordering,
    {
        let columns = inner.columns().to_vec();
        let mut rows = Self::drain_rows_from(&mut *inner)?;

        let has_explicit_nulls_ordering = order_specs.iter().any(|s| s.nulls_first.is_some());

        // Single-key path first, then the multi-key path (at most four keys).
        let radix_sorted = !has_explicit_nulls_ordering
            && ((order_specs.len() == 1
                && Self::try_radix_sort_single_int(
                    &mut rows,
                    order_specs[0].col_idx,
                    order_specs[0].ascending,
                ))
                || (order_specs.len() <= 4
                    && Self::try_radix_sort_multi_int(&mut rows, order_specs)));

        if !radix_sorted {
            rows.sort_unstable_by(|(_, left), (_, right)| fallback_compare(left, right));
        }

        Ok(Self {
            inner: ExecutorResult::new(columns, rows),
        })
    }

    /// Takes every remaining row out of `source`, numbering them in arrival
    /// order, and reports the source's last error if it has one.
    fn drain_rows_from(source: &mut dyn QueryResult) -> Result<RowVec> {
        let mut rows = RowVec::new();
        let mut next_id = 0i64;
        while source.next() {
            rows.push((next_id, source.take_row()));
            next_id += 1;
        }
        match source.last_error() {
            Some(err) => Err(err),
            None => Ok(rows),
        }
    }

    /// Radix key for ascending order: the integer itself; anything else maps
    /// to `i64::MIN`.
    #[inline]
    fn ascending_radix_key(row: &Row, col_idx: usize) -> i64 {
        match row.get(col_idx) {
            Some(Value::Integer(i)) => *i,
            _ => i64::MIN,
        }
    }

    /// Radix key for descending order: `-i - 1` with wrapping arithmetic;
    /// anything else maps to `i64::MAX`.
    #[inline]
    fn descending_radix_key(row: &Row, col_idx: usize) -> i64 {
        match row.get(col_idx) {
            Some(Value::Integer(i)) => i.wrapping_neg().wrapping_sub(1),
            _ => i64::MAX,
        }
    }

    /// Sorts `rows` by one column with a radix sort.
    ///
    /// Returns `false` without touching `rows` when the column holds anything
    /// other than integers and NULLs (a missing column also counts).
    // NOTE(review): a NULL and an `i64::MIN` integer get the same key here, so
    // their relative order is whatever the sort leaves it as — confirm this is
    // acceptable.
    fn try_radix_sort_single_int(rows: &mut RowVec, col_idx: usize, ascending: bool) -> bool {
        let sortable = rows.iter().all(|(_, row)| {
            matches!(
                row.get(col_idx),
                Some(Value::Integer(_)) | Some(Value::Null(_))
            )
        });
        if !sortable {
            return false;
        }

        if ascending {
            radsort::sort_by_key(rows, |(_, row)| Self::ascending_radix_key(row, col_idx));
        } else {
            radsort::sort_by_key(rows, |(_, row)| Self::descending_radix_key(row, col_idx));
        }
        true
    }

    /// Sorts `rows` by several columns, one radix pass per key starting from
    /// the last key.
    ///
    /// Returns `false` without touching `rows` when any sort column holds
    /// anything other than integers and NULLs.
    // NOTE(review): sorting key-by-key from the last key relies on each pass
    // keeping equal keys in their previous order — confirm against radsort.
    fn try_radix_sort_multi_int(rows: &mut RowVec, order_specs: &[RadixOrderSpec]) -> bool {
        let sortable = rows.iter().all(|(_, row)| {
            order_specs.iter().all(|spec| {
                matches!(
                    row.get(spec.col_idx),
                    Some(Value::Integer(_)) | Some(Value::Null(_))
                )
            })
        });
        if !sortable {
            return false;
        }

        for spec in order_specs.iter().rev() {
            let col_idx = spec.col_idx;
            if spec.ascending {
                radsort::sort_by_key(rows, |(_, row)| Self::ascending_radix_key(row, col_idx));
            } else {
                radsort::sort_by_key(rows, |(_, row)| Self::descending_radix_key(row, col_idx));
            }
        }
        true
    }
}
/// Iterates the pre-sorted rows by delegating to the in-memory result.
impl QueryResult for OrderedResult {
    /// Delegates to the in-memory result.
    fn columns(&self) -> &[String] {
        self.inner.columns()
    }

    /// Delegates to the in-memory result.
    fn next(&mut self) -> bool {
        self.inner.next()
    }

    /// Delegates to the in-memory result.
    fn scan(&self, dest: &mut [Value]) -> Result<()> {
        self.inner.scan(dest)
    }

    /// Delegates to the in-memory result.
    fn row(&self) -> &Row {
        self.inner.row()
    }

    /// Delegates to the in-memory result.
    fn take_row(&mut self) -> Row {
        self.inner.take_row()
    }

    /// Delegates to the in-memory result.
    fn close(&mut self) -> Result<()> {
        self.inner.close()
    }

    /// Always zero for this adapter.
    fn rows_affected(&self) -> i64 {
        0
    }

    /// Always zero for this adapter.
    fn last_insert_id(&self) -> i64 {
        0
    }

    /// Wraps this result so its column names are reported under `aliases`.
    fn with_aliases(self: Box<Self>, aliases: FxHashMap<String, String>) -> Box<dyn QueryResult> {
        let wrapped = AliasedResult::new(self, aliases);
        Box::new(wrapped)
    }
}
/// Result holding only the first rows of an ordering, selected at
/// construction time with a bounded heap.
pub struct TopNResult {
    /// In-memory result holding the selected, sorted rows.
    inner: ExecutorResult,
}
impl TopNResult {
    /// Reads `inner` and keeps the `limit` rows that follow the first `offset`
    /// rows in `compare` order.
    ///
    /// At most `limit + offset` rows are retained at any time in a max-heap
    /// whose top is the worst retained row. A new row replaces the top only
    /// when it compares strictly `Less` than it.
    ///
    /// # Errors
    ///
    /// Returns the inner result's last error if reading stopped because of one.
    pub fn new<F>(
        mut inner: Box<dyn QueryResult>,
        compare: F,
        limit: usize,
        offset: usize,
    ) -> Result<Self>
    where
        F: Fn(&Row, &Row) -> std::cmp::Ordering + Clone,
    {
        use std::collections::BinaryHeap;

        // Upper bound on the up-front heap allocation. The requested window
        // comes from the query, so it must not drive an unbounded allocation;
        // the heap still grows on demand beyond this.
        const MAX_PREALLOCATED_ROWS: usize = 1024;

        let columns = inner.columns().to_vec();
        let heap_capacity = limit.saturating_add(offset);
        if heap_capacity == 0 {
            return Ok(Self {
                inner: ExecutorResult::new(columns, RowVec::new()),
            });
        }

        let compare = std::sync::Arc::new(compare);

        /// Heap entry ordered by the caller's comparator.
        struct HeapRow<F: Fn(&Row, &Row) -> std::cmp::Ordering> {
            row: Row,
            compare: std::sync::Arc<F>,
        }
        impl<F: Fn(&Row, &Row) -> std::cmp::Ordering> PartialEq for HeapRow<F> {
            fn eq(&self, other: &Self) -> bool {
                (self.compare)(&self.row, &other.row) == std::cmp::Ordering::Equal
            }
        }
        impl<F: Fn(&Row, &Row) -> std::cmp::Ordering> Eq for HeapRow<F> {}
        impl<F: Fn(&Row, &Row) -> std::cmp::Ordering> PartialOrd for HeapRow<F> {
            fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
                Some(self.cmp(other))
            }
        }
        impl<F: Fn(&Row, &Row) -> std::cmp::Ordering> Ord for HeapRow<F> {
            fn cmp(&self, other: &Self) -> std::cmp::Ordering {
                (self.compare)(&self.row, &other.row)
            }
        }

        // The heap never holds more than `heap_capacity` rows because the top
        // is popped before a replacement is pushed. No `+ 1` is needed, which
        // also removes the overflow when `limit + offset` saturates.
        let mut heap: BinaryHeap<HeapRow<F>> =
            BinaryHeap::with_capacity(heap_capacity.min(MAX_PREALLOCATED_ROWS));

        while inner.next() {
            let row = inner.take_row();
            if heap.len() < heap_capacity {
                heap.push(HeapRow {
                    row,
                    compare: std::sync::Arc::clone(&compare),
                });
            } else if let Some(worst) = heap.peek() {
                if compare(&row, &worst.row) == std::cmp::Ordering::Less {
                    heap.pop();
                    heap.push(HeapRow {
                        row,
                        compare: std::sync::Arc::clone(&compare),
                    });
                }
            }
        }
        if let Some(err) = inner.last_error() {
            return Err(err);
        }

        let mut rows: Vec<Row> = heap.into_iter().map(|hr| hr.row).collect();
        rows.sort_unstable_by(|a, b| compare(a, b));

        // Drop the first `offset` rows, or everything when fewer were kept.
        rows.drain(..offset.min(rows.len()));

        let result_rows: RowVec = rows
            .into_iter()
            .enumerate()
            .map(|(i, row)| (i as i64, row))
            .collect();
        Ok(Self {
            inner: ExecutorResult::new(columns, result_rows),
        })
    }
}
/// Iterates the selected rows by delegating to the in-memory result.
impl QueryResult for TopNResult {
    /// Delegates to the in-memory result.
    fn columns(&self) -> &[String] {
        self.inner.columns()
    }

    /// Delegates to the in-memory result.
    fn next(&mut self) -> bool {
        self.inner.next()
    }

    /// Delegates to the in-memory result.
    fn scan(&self, dest: &mut [Value]) -> Result<()> {
        self.inner.scan(dest)
    }

    /// Delegates to the in-memory result.
    fn row(&self) -> &Row {
        self.inner.row()
    }

    /// Delegates to the in-memory result.
    fn take_row(&mut self) -> Row {
        self.inner.take_row()
    }

    /// Delegates to the in-memory result.
    fn close(&mut self) -> Result<()> {
        self.inner.close()
    }

    /// Always zero for this adapter.
    fn rows_affected(&self) -> i64 {
        0
    }

    /// Always zero for this adapter.
    fn last_insert_id(&self) -> i64 {
        0
    }

    /// Wraps this result so its column names are reported under `aliases`.
    fn with_aliases(self: Box<Self>, aliases: FxHashMap<String, String>) -> Box<dyn QueryResult> {
        let wrapped = AliasedResult::new(self, aliases);
        Box::new(wrapped)
    }
}
/// Drops rows whose leading columns repeat an earlier row.
pub struct DistinctResult {
    /// Upstream result being de-duplicated.
    inner: Box<dyn QueryResult>,
    /// Copy of the upstream column names captured at construction.
    columns: Vec<String>,
    /// Number of leading columns that form the distinct key.
    distinct_column_count: usize,
    /// Keys already yielded, bucketed by their hash.
    seen: FxHashMap<u64, Vec<Vec<Value>>>,
    /// Row taken from upstream by the last successful `next()`.
    current_row: Row,
    /// Whether `current_row` holds a row that has not been taken.
    has_current: bool,
}
impl DistinctResult {
    /// De-duplicates on all columns of `inner`.
    pub fn new(inner: Box<dyn QueryResult>) -> Self {
        Self::with_column_count(inner, None)
    }

    /// De-duplicates on the first `distinct_columns` columns, or on all
    /// columns when `None`.
    pub fn with_column_count(inner: Box<dyn QueryResult>, distinct_columns: Option<usize>) -> Self {
        let columns = inner.columns().to_vec();
        let distinct_column_count = match distinct_columns {
            Some(count) => count,
            None => columns.len(),
        };
        Self {
            inner,
            columns,
            distinct_column_count,
            seen: FxHashMap::default(),
            current_row: Row::new(),
            has_current: false,
        }
    }

    /// Hashes the key columns of `row` with `FxHasher`.
    fn hash_row(&self, row: &Row) -> u64 {
        use std::hash::{Hash, Hasher};
        let mut hasher = FxHasher::default();
        row.iter()
            .take(self.distinct_column_count)
            .for_each(|value| value.hash(&mut hasher));
        hasher.finish()
    }

    /// Clones the key columns of `row` into a vector.
    fn extract_distinct_values(&self, row: &Row) -> Vec<Value> {
        let key_columns = row.iter().take(self.distinct_column_count);
        key_columns.cloned().collect()
    }
}
/// Streams the rows of the inner result whose key has not been seen before.
impl QueryResult for DistinctResult {
    /// Column names captured from the inner result.
    fn columns(&self) -> &[String] {
        &self.columns
    }

    /// Advances to the next upstream row whose key columns are new.
    fn next(&mut self) -> bool {
        while self.inner.next() {
            let row = self.inner.row();
            let hash = self.hash_row(row);
            let key_len = self.distinct_column_count;

            // Compare against stored keys in place, so rows that turn out to
            // be duplicates never have their key cloned.
            let is_dup = self.seen.get(&hash).is_some_and(|bucket| {
                bucket
                    .iter()
                    .any(|seen_key| seen_key.iter().eq(row.iter().take(key_len)))
            });
            if is_dup {
                continue;
            }

            let values = self.extract_distinct_values(row);
            self.current_row = self.inner.take_row();
            self.seen.entry(hash).or_default().push(values);
            self.has_current = true;
            return true;
        }
        self.has_current = false;
        false
    }

    /// Clones as many values of the current row as fit into `dest`.
    ///
    /// Does nothing (and still returns `Ok`) when there is no current row.
    fn scan(&self, dest: &mut [Value]) -> Result<()> {
        if !self.has_current {
            return Ok(());
        }
        let len = dest.len().min(self.current_row.len());
        for (i, v) in self.current_row.iter().take(len).enumerate() {
            dest[i] = v.clone();
        }
        Ok(())
    }

    /// Borrows the current row buffer.
    fn row(&self) -> &Row {
        &self.current_row
    }

    /// Moves the current row out and marks the buffer as empty.
    fn take_row(&mut self) -> Row {
        self.has_current = false;
        std::mem::take(&mut self.current_row)
    }

    /// Drops the remembered keys and closes the inner result, matching
    /// `DistinctOnResult::close`.
    fn close(&mut self) -> Result<()> {
        self.seen.clear();
        self.inner.close()
    }

    /// Always zero for this adapter.
    fn rows_affected(&self) -> i64 {
        0
    }

    /// Always zero for this adapter.
    fn last_insert_id(&self) -> i64 {
        0
    }

    /// Delegates to the inner result.
    fn last_error(&mut self) -> Option<crate::core::Error> {
        self.inner.last_error()
    }

    /// Wraps this result so its column names are reported under `aliases`.
    fn with_aliases(self: Box<Self>, aliases: FxHashMap<String, String>) -> Box<dyn QueryResult> {
        Box::new(AliasedResult::new(self, aliases))
    }
}
/// Keeps only the first row seen for each value of a set of key columns.
pub struct DistinctOnResult {
    /// Upstream result being de-duplicated.
    inner: Box<dyn QueryResult>,
    /// Copy of the upstream column names captured at construction.
    columns: Vec<String>,
    /// Indices of the columns that form the key.
    key_indices: Vec<usize>,
    /// Keys already yielded, bucketed by their hash.
    seen: FxHashMap<u64, Vec<Vec<Value>>>,
    /// Row taken from upstream by the last successful `next()`.
    current_row: Row,
    /// Whether `current_row` holds a row that has not been taken.
    has_current: bool,
}
impl DistinctOnResult {
    /// Wraps `inner`, de-duplicating on the columns at `key_indices`.
    pub fn new(inner: Box<dyn QueryResult>, key_indices: Vec<usize>) -> Self {
        Self {
            columns: inner.columns().to_vec(),
            inner,
            key_indices,
            seen: FxHashMap::default(),
            current_row: Row::new(),
            has_current: false,
        }
    }

    /// Clones the key columns of `row`; a missing column contributes
    /// `Value::null_unknown()`.
    fn extract_key(&self, row: &Row) -> Vec<Value> {
        let mut key = Vec::with_capacity(self.key_indices.len());
        for &column in &self.key_indices {
            let value = match row.get(column) {
                Some(found) => found.clone(),
                None => Value::null_unknown(),
            };
            key.push(value);
        }
        key
    }

    /// Hashes every value of `key` with `FxHasher`.
    fn hash_key(&self, key: &[Value]) -> u64 {
        use std::hash::{Hash, Hasher};
        let mut hasher = FxHasher::default();
        key.iter().for_each(|value| value.hash(&mut hasher));
        hasher.finish()
    }
}
/// Streams the first row of the inner result for each distinct key.
impl QueryResult for DistinctOnResult {
    /// Column names captured from the inner result.
    fn columns(&self) -> &[String] {
        &self.columns
    }

    /// Advances to the next upstream row whose key has not been seen.
    fn next(&mut self) -> bool {
        while self.inner.next() {
            let key = self.extract_key(self.inner.row());
            let hash = self.hash_key(&key);

            let already_seen = match self.seen.get(&hash) {
                Some(bucket) => bucket.contains(&key),
                None => false,
            };
            if !already_seen {
                self.seen.entry(hash).or_default().push(key);
                self.current_row = self.inner.take_row();
                self.has_current = true;
                return true;
            }
        }
        self.has_current = false;
        false
    }

    /// Clones as many values of the current row as fit into `dest`.
    ///
    /// Does nothing (and still returns `Ok`) when there is no current row.
    fn scan(&self, dest: &mut [Value]) -> Result<()> {
        if self.has_current {
            let copy_len = dest.len().min(self.current_row.len());
            for (slot, value) in dest.iter_mut().zip(self.current_row.iter().take(copy_len)) {
                *slot = value.clone();
            }
        }
        Ok(())
    }

    /// Borrows the current row buffer.
    fn row(&self) -> &Row {
        &self.current_row
    }

    /// Moves the current row out and marks the buffer as empty.
    fn take_row(&mut self) -> Row {
        self.has_current = false;
        std::mem::take(&mut self.current_row)
    }

    /// Drops the remembered keys, then closes the inner result.
    fn close(&mut self) -> Result<()> {
        self.seen.clear();
        self.inner.close()
    }

    /// Always zero for this adapter.
    fn rows_affected(&self) -> i64 {
        0
    }

    /// Always zero for this adapter.
    fn last_insert_id(&self) -> i64 {
        0
    }

    /// Delegates to the inner result.
    fn last_error(&mut self) -> Option<crate::core::Error> {
        self.inner.last_error()
    }

    /// Wraps this result so its column names are reported under `aliases`.
    fn with_aliases(self: Box<Self>, aliases: FxHashMap<String, String>) -> Box<dyn QueryResult> {
        Box::new(AliasedResult::new(self, aliases))
    }
}
/// Presents another result under renamed column names.
pub struct AliasedResult {
    /// Result whose rows are passed through unchanged.
    inner: Box<dyn QueryResult>,
    /// Column names after applying the aliases.
    aliased_columns: Vec<String>,
}
impl AliasedResult {
    /// Wraps `inner`, renaming its columns according to `aliases`.
    ///
    /// `aliases` maps alias → original name. A column equal to some original
    /// name is reported under the first matching alias found while iterating
    /// the map; other columns keep their name.
    pub fn new(inner: Box<dyn QueryResult>, aliases: FxHashMap<String, String>) -> Self {
        let aliased_columns: Vec<String> = inner
            .columns()
            .iter()
            .map(|col| {
                aliases
                    .iter()
                    .find(|(_, original)| *original == col)
                    .map(|(alias, _)| alias.clone())
                    .unwrap_or_else(|| col.clone())
            })
            .collect();

        Self {
            inner,
            aliased_columns,
        }
    }
}
/// Passes everything through to the inner result except the column names.
impl QueryResult for AliasedResult {
    /// Column names after aliasing.
    fn columns(&self) -> &[String] {
        &self.aliased_columns
    }

    /// Delegates to the inner result.
    fn next(&mut self) -> bool {
        self.inner.next()
    }

    /// Delegates to the inner result.
    fn scan(&self, dest: &mut [Value]) -> Result<()> {
        self.inner.scan(dest)
    }

    /// Delegates to the inner result.
    fn row(&self) -> &Row {
        self.inner.row()
    }

    /// Delegates to the inner result.
    fn take_row(&mut self) -> Row {
        self.inner.take_row()
    }

    /// Delegates to the inner result.
    fn close(&mut self) -> Result<()> {
        self.inner.close()
    }

    /// Delegates to the inner result.
    fn rows_affected(&self) -> i64 {
        self.inner.rows_affected()
    }

    /// Delegates to the inner result.
    fn last_insert_id(&self) -> i64 {
        self.inner.last_insert_id()
    }

    /// Delegates to the inner result.
    fn last_error(&mut self) -> Option<crate::core::Error> {
        self.inner.last_error()
    }

    /// Stacks another aliasing layer on top of this one.
    fn with_aliases(self: Box<Self>, aliases: FxHashMap<String, String>) -> Box<dyn QueryResult> {
        let wrapped = AliasedResult::new(self, aliases);
        Box::new(wrapped)
    }
}
/// Keeps only the first `keep_columns` columns of an inner result.
pub struct ProjectedResult {
    /// Upstream result being truncated.
    inner: Box<dyn QueryResult>,
    /// Number of leading columns to keep.
    keep_columns: usize,
    /// Names of the kept columns.
    projected_columns: Vec<String>,
    /// Truncated row built by the last successful `next()`.
    current_row: Row,
}
impl ProjectedResult {
    /// Wraps `inner`, exposing only its first `keep_columns` columns.
    pub fn new(inner: Box<dyn QueryResult>, keep_columns: usize) -> Self {
        let mut projected_columns: Vec<String> = Vec::new();
        for name in inner.columns().iter().take(keep_columns) {
            projected_columns.push(name.clone());
        }
        let current_row = Row::with_capacity(keep_columns);
        Self {
            inner,
            keep_columns,
            projected_columns,
            current_row,
        }
    }
}
/// Streams truncated copies of the inner result's rows.
impl QueryResult for ProjectedResult {
    /// Names of the kept columns.
    fn columns(&self) -> &[String] {
        &self.projected_columns
    }

    /// Advances the inner result and copies its leading columns into the row
    /// buffer; positions the source row lacks become `Value::null_unknown()`.
    fn next(&mut self) -> bool {
        if !self.inner.next() {
            return false;
        }
        // Same call order as before: reserve first, then clear the buffer.
        self.current_row.reserve_inline(self.keep_columns);
        self.current_row.clear_inline();

        let full_row = self.inner.row();
        for position in 0..self.keep_columns {
            let value = full_row
                .get(position)
                .cloned()
                .unwrap_or(Value::null_unknown());
            self.current_row.push_inline(value);
        }
        true
    }

    /// Fills up to `keep_columns` slots of `dest` from the current row, using
    /// `Value::null_unknown()` where the row has no value. Never fails.
    fn scan(&self, dest: &mut [Value]) -> Result<()> {
        for (position, slot) in dest.iter_mut().take(self.keep_columns).enumerate() {
            *slot = self
                .current_row
                .get(position)
                .cloned()
                .unwrap_or(Value::null_unknown());
        }
        Ok(())
    }

    /// Borrows the current truncated row.
    fn row(&self) -> &Row {
        &self.current_row
    }

    /// Moves the current truncated row out, leaving a default row behind.
    fn take_row(&mut self) -> Row {
        std::mem::take(&mut self.current_row)
    }

    /// Closes the inner result.
    fn close(&mut self) -> Result<()> {
        self.inner.close()
    }

    /// Always zero for this adapter.
    fn rows_affected(&self) -> i64 {
        0
    }

    /// Always zero for this adapter.
    fn last_insert_id(&self) -> i64 {
        0
    }

    /// Delegates to the inner result.
    fn last_error(&mut self) -> Option<crate::core::Error> {
        self.inner.last_error()
    }

    /// Wraps this result so its column names are reported under `aliases`.
    fn with_aliases(self: Box<Self>, aliases: FxHashMap<String, String>) -> Box<dyn QueryResult> {
        Box::new(AliasedResult::new(self, aliases))
    }
}
/// Adapts a storage `Scanner` to the `QueryResult` interface.
pub struct ScannerResult {
    /// Scanner supplying the rows.
    scanner: Box<dyn crate::storage::traits::Scanner>,
    /// Column names reported by `columns()`.
    columns: Vec<String>,
    /// Row taken from the scanner by the last successful `next()`.
    current_row: Row,
    /// Whether the last `next()` produced a row.
    has_current: bool,
}
impl ScannerResult {
    /// Wraps `scanner`, reporting `columns` as the column names.
    pub fn new(scanner: Box<dyn crate::storage::traits::Scanner>, columns: Vec<String>) -> Self {
        let current_row = Row::new();
        Self {
            scanner,
            columns,
            current_row,
            has_current: false,
        }
    }
}
/// Streams rows straight from the underlying scanner.
impl QueryResult for ScannerResult {
    /// Column names supplied at construction.
    fn columns(&self) -> &[String] {
        &self.columns
    }

    /// Advances the scanner and takes its row when it has one.
    fn next(&mut self) -> bool {
        let advanced = self.scanner.next();
        if advanced {
            self.current_row = self.scanner.take_row();
        }
        self.has_current = advanced;
        advanced
    }

    /// Clones the current row into `dest`; errors without a current row or on
    /// a length mismatch.
    fn scan(&self, dest: &mut [Value]) -> Result<()> {
        if !self.has_current {
            return Err(crate::core::Error::internal(
                "scan() called without successful next()",
            ));
        }
        let row = &self.current_row;
        if dest.len() != row.len() {
            return Err(crate::core::Error::internal(format!(
                "scan destination has {} values but row has {} columns",
                dest.len(),
                row.len()
            )));
        }
        for (slot, value) in dest.iter_mut().zip(row.iter()) {
            *slot = value.clone();
        }
        Ok(())
    }

    /// Borrows the current row; panics when the last `next()` produced none.
    fn row(&self) -> &Row {
        assert!(self.has_current, "row() called without successful next()");
        &self.current_row
    }

    /// Moves the current row out; panics when the last `next()` produced none.
    fn take_row(&mut self) -> Row {
        assert!(
            self.has_current,
            "take_row() called without successful next()"
        );
        std::mem::take(&mut self.current_row)
    }

    /// Closes the scanner.
    fn close(&mut self) -> Result<()> {
        self.scanner.close()
    }

    /// Clones the scanner's error, if it reports one.
    fn last_error(&mut self) -> Option<crate::core::Error> {
        self.scanner.err().cloned()
    }

    /// Always zero for this adapter.
    fn rows_affected(&self) -> i64 {
        0
    }

    /// Always zero for this adapter.
    fn last_insert_id(&self) -> i64 {
        0
    }

    /// Delegates to the scanner.
    fn estimated_count(&self) -> Option<usize> {
        self.scanner.estimated_count()
    }

    /// Wraps this result so its column names are reported under `aliases`.
    fn with_aliases(self: Box<Self>, aliases: FxHashMap<String, String>) -> Box<dyn QueryResult> {
        Box::new(AliasedResult::new(self, aliases))
    }
}
/// Alternative name for [`FilteredResult`]; both names refer to the same type.
pub type StreamingFilterResult = FilteredResult;
/// Selects and reorders columns of an inner result by index.
pub struct StreamingProjectionResult {
    /// Upstream result supplying source rows.
    inner: Box<dyn QueryResult>,
    /// Source column index for each output position.
    column_indices: Vec<usize>,
    /// Column names reported by `columns()`.
    output_columns: Vec<String>,
    /// Output row built by the last successful `next()`.
    current_row: Row,
}
impl StreamingProjectionResult {
    /// Wraps `inner`; output position `n` is read from source column
    /// `column_indices[n]`.
    pub fn new(
        inner: Box<dyn QueryResult>,
        column_indices: Vec<usize>,
        output_columns: Vec<String>,
    ) -> Self {
        // The row buffer starts with room for one value per selected column.
        let current_row = Row::with_capacity(column_indices.len());
        Self {
            inner,
            column_indices,
            output_columns,
            current_row,
        }
    }
}
/// Streams index-projected copies of the inner result's rows.
impl QueryResult for StreamingProjectionResult {
    /// Output column names supplied at construction.
    fn columns(&self) -> &[String] {
        &self.output_columns
    }

    /// Advances the inner result and copies the selected columns into the row
    /// buffer; indices the source row lacks become `Value::null_unknown()`.
    fn next(&mut self) -> bool {
        if !self.inner.next() {
            return false;
        }
        // Same call order as before: reserve first, then clear the buffer.
        self.current_row.reserve_inline(self.column_indices.len());
        self.current_row.clear_inline();

        let source_row = self.inner.row();
        for &source_index in self.column_indices.iter() {
            let value = source_row
                .get(source_index)
                .cloned()
                .unwrap_or(Value::null_unknown());
            self.current_row.push_inline(value);
        }
        true
    }

    /// Clones the current output row into `dest`; errors on a length mismatch.
    fn scan(&self, dest: &mut [Value]) -> Result<()> {
        let row = &self.current_row;
        if dest.len() != row.len() {
            return Err(crate::core::Error::internal(format!(
                "scan destination has {} values but row has {} columns",
                dest.len(),
                row.len()
            )));
        }
        for (slot, value) in dest.iter_mut().zip(row.iter()) {
            *slot = value.clone();
        }
        Ok(())
    }

    /// Borrows the current output row.
    fn row(&self) -> &Row {
        &self.current_row
    }

    /// Moves the current output row out, leaving a default row behind.
    fn take_row(&mut self) -> Row {
        std::mem::take(&mut self.current_row)
    }

    /// Closes the inner result.
    fn close(&mut self) -> Result<()> {
        self.inner.close()
    }

    /// Always zero for this adapter.
    fn rows_affected(&self) -> i64 {
        0
    }

    /// Always zero for this adapter.
    fn last_insert_id(&self) -> i64 {
        0
    }

    /// Delegates to the inner result.
    fn last_error(&mut self) -> Option<crate::core::Error> {
        self.inner.last_error()
    }

    /// Wraps this result so its column names are reported under `aliases`.
    fn with_aliases(self: Box<Self>, aliases: FxHashMap<String, String>) -> Box<dyn QueryResult> {
        Box::new(AliasedResult::new(self, aliases))
    }
}
/// Result stored column by column; rows are assembled on demand.
pub struct ColumnarResult {
    /// Column names, shared through a `CompactArc`.
    columns: CompactArc<Vec<String>>,
    /// One vector of values per column.
    data: Vec<Vec<Value>>,
    /// Number of rows available to the cursor.
    num_rows: usize,
    /// Index of the current row; `None` until `next()` first succeeds.
    current_index: Option<usize>,
    /// Row assembled by the last successful `next()`.
    current_row: Row,
    /// Once set, `next()` always returns `false`.
    closed: bool,
}
impl ColumnarResult {
    /// Creates a columnar result from owned column names and per-column data.
    ///
    /// In debug builds this asserts that there is one data vector per column
    /// and that all data vectors have the same length.
    pub fn new(columns: Vec<String>, data: Vec<Vec<Value>>) -> Self {
        debug_assert!(
            columns.len() == data.len(),
            "columns.len() ({}) != data.len() ({})",
            columns.len(),
            data.len()
        );
        let num_rows = Self::shortest_column_len(&data);
        #[cfg(debug_assertions)]
        for (i, col) in data.iter().enumerate() {
            debug_assert!(
                col.len() == num_rows,
                "column {} has {} rows but expected {}",
                i,
                col.len(),
                num_rows
            );
        }
        let num_cols = columns.len();
        Self {
            columns: CompactArc::new(columns),
            data,
            num_rows,
            current_index: None,
            current_row: Row::with_capacity(num_cols),
            closed: false,
        }
    }

    /// Creates a columnar result from already-shared column names.
    ///
    /// Unlike [`ColumnarResult::new`], no shape assertions are made.
    pub fn with_arc_columns(columns: CompactArc<Vec<String>>, data: Vec<Vec<Value>>) -> Self {
        let num_rows = Self::shortest_column_len(&data);
        let num_cols = columns.len();
        Self {
            columns,
            data,
            num_rows,
            current_index: None,
            current_row: Row::with_capacity(num_cols),
            closed: false,
        }
    }

    /// Length of the shortest data column, or zero when there are no columns.
    ///
    /// `next()` indexes every column at the current row index. Bounding the
    /// row count by the shortest column keeps that indexing in range even when
    /// columns have different lengths. For equal-length columns this is the
    /// length of the first column, as before.
    fn shortest_column_len(data: &[Vec<Value>]) -> usize {
        data.iter().map(Vec::len).min().unwrap_or(0)
    }

    /// Number of rows available to the cursor.
    #[inline]
    pub fn row_count(&self) -> usize {
        self.num_rows
    }

    /// Number of data columns currently held.
    #[inline]
    pub fn column_count(&self) -> usize {
        self.data.len()
    }
}
/// Cursor over a column-oriented result, assembling one row per step.
impl QueryResult for ColumnarResult {
    /// Column names of this result.
    fn columns(&self) -> &[String] {
        &self.columns
    }

    /// Shares the column list without copying the names.
    fn columns_arc(&self) -> Option<CompactArc<Vec<String>>> {
        Some(CompactArc::clone(&self.columns))
    }

    /// Advances the cursor and assembles the row at the new index by cloning
    /// one value from every column.
    #[inline]
    fn next(&mut self) -> bool {
        if self.closed {
            return false;
        }

        let next_idx = self.current_index.map_or(0, |current| current + 1);
        if next_idx >= self.num_rows {
            return false;
        }
        self.current_index = Some(next_idx);

        // Same call order as before: reserve first, then clear the buffer.
        self.current_row.reserve_inline(self.data.len());
        self.current_row.clear_inline();
        for column in self.data.iter() {
            self.current_row.push_inline(column[next_idx].clone());
        }
        true
    }

    /// Clones the current row into `dest`; errors on a length mismatch.
    fn scan(&self, dest: &mut [Value]) -> Result<()> {
        let row = &self.current_row;
        if dest.len() != row.len() {
            return Err(crate::core::Error::internal(format!(
                "scan destination has {} values but row has {} columns",
                dest.len(),
                row.len()
            )));
        }
        for (slot, value) in dest.iter_mut().zip(row.iter()) {
            *slot = value.clone();
        }
        Ok(())
    }

    /// Borrows the row assembled by the last `next()`.
    #[inline]
    fn row(&self) -> &Row {
        &self.current_row
    }

    /// Moves the assembled row out, leaving a default row behind.
    fn take_row(&mut self) -> Row {
        std::mem::take(&mut self.current_row)
    }

    /// Marks the result closed and drops the column data.
    fn close(&mut self) -> Result<()> {
        self.closed = true;
        self.data.clear();
        Ok(())
    }

    /// Always zero for this result.
    fn rows_affected(&self) -> i64 {
        0
    }

    /// Always zero for this result.
    fn last_insert_id(&self) -> i64 {
        0
    }

    /// The row count recorded at construction.
    fn estimated_count(&self) -> Option<usize> {
        Some(self.num_rows)
    }

    /// Wraps this result so its column names are reported under `aliases`.
    fn with_aliases(self: Box<Self>, aliases: FxHashMap<String, String>) -> Box<dyn QueryResult> {
        Box::new(AliasedResult::new(self, aliases))
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::row_vec::RowVec;

    /// Packs plain rows into a `RowVec`, using each row's position as its id.
    fn make_rows(rows: Vec<Row>) -> RowVec {
        let mut packed = RowVec::with_capacity(rows.len());
        for (position, row) in rows.into_iter().enumerate() {
            packed.push((position as i64, row));
        }
        packed
    }

    /// `ExecResult` reports the counters it was built with and has no columns.
    #[test]
    fn test_exec_result() {
        let exec = ExecResult::new(5, 10);
        assert_eq!(exec.rows_affected(), 5);
        assert_eq!(exec.last_insert_id(), 10);
        assert_eq!(exec.columns().len(), 0);
    }

    /// `ExecResult::empty()` reports zero for both counters.
    #[test]
    fn test_exec_result_empty() {
        let exec = ExecResult::empty();
        assert_eq!(exec.rows_affected(), 0);
        assert_eq!(exec.last_insert_id(), 0);
    }

    /// `ExecutorResult` yields its rows in insertion order, then stops.
    #[test]
    fn test_memory_result() {
        let columns = vec![String::from("id"), String::from("name")];
        let rows = make_rows(vec![
            Row::from_values(vec![Value::Integer(1), Value::text("Alice")]),
            Row::from_values(vec![Value::Integer(2), Value::text("Bob")]),
        ]);

        let mut result = ExecutorResult::new(columns, rows);
        assert_eq!(result.columns().len(), 2);
        assert_eq!(result.row_count(), 2);

        for expected_id in [1, 2] {
            assert!(result.next());
            assert_eq!(result.row().get(0), Some(&Value::Integer(expected_id)));
        }
        assert!(!result.next());
    }

    /// `FilteredResult` with `value > 15` keeps only the rows with ids 2 and 3.
    #[test]
    fn test_filtered_result() {
        use crate::executor::utils::dummy_token;
        use crate::parser::ast::{Identifier, InfixExpression, InfixOperator, IntegerLiteral};
        use crate::parser::token::TokenType;

        let columns = vec![String::from("id"), String::from("value")];
        let rows = make_rows(vec![
            Row::from_values(vec![Value::Integer(1), Value::Integer(10)]),
            Row::from_values(vec![Value::Integer(2), Value::Integer(20)]),
            Row::from_values(vec![Value::Integer(3), Value::Integer(30)]),
        ]);
        let inner = Box::new(ExecutorResult::new(columns, rows));

        // Build the AST for `value > 15` piece by piece.
        let column_ref = Expression::Identifier(Identifier {
            token: dummy_token("value", TokenType::Identifier),
            value: "value".into(),
            value_lower: "value".into(),
        });
        let threshold = Expression::IntegerLiteral(IntegerLiteral {
            token: dummy_token("15", TokenType::Integer),
            value: 15,
        });
        let filter_expr = Expression::Infix(InfixExpression {
            token: dummy_token(">", TokenType::Operator),
            left: Box::new(column_ref),
            operator: ">".into(),
            op_type: InfixOperator::GreaterThan,
            right: Box::new(threshold),
        });

        let mut result = FilteredResult::with_defaults(inner, filter_expr).unwrap();
        for expected_id in [2, 3] {
            assert!(result.next());
            assert_eq!(result.row().get(0), Some(&Value::Integer(expected_id)));
        }
        assert!(!result.next());
    }

    /// `LimitedResult::new(inner, Some(2), 1)` over ids 1..=5 yields ids 2 and 3.
    #[test]
    fn test_limited_result() {
        let columns = vec![String::from("id")];
        let rows = make_rows(vec![
            Row::from_values(vec![Value::Integer(1)]),
            Row::from_values(vec![Value::Integer(2)]),
            Row::from_values(vec![Value::Integer(3)]),
            Row::from_values(vec![Value::Integer(4)]),
            Row::from_values(vec![Value::Integer(5)]),
        ]);
        let inner = Box::new(ExecutorResult::new(columns, rows));

        let mut result = LimitedResult::new(inner, Some(2), 1);
        for expected_id in [2, 3] {
            assert!(result.next());
            assert_eq!(result.row().get(0), Some(&Value::Integer(expected_id)));
        }
        assert!(!result.next());
    }

    /// `OrderedResult` sorted by the first column yields ids 1, 2, 3.
    #[test]
    fn test_ordered_result() {
        let columns = vec![String::from("id"), String::from("value")];
        let rows = make_rows(vec![
            Row::from_values(vec![Value::Integer(3), Value::Integer(30)]),
            Row::from_values(vec![Value::Integer(1), Value::Integer(10)]),
            Row::from_values(vec![Value::Integer(2), Value::Integer(20)]),
        ]);
        let inner = Box::new(ExecutorResult::new(columns, rows));

        let mut result = OrderedResult::new(inner, |lhs, rhs| {
            let lhs_id = lhs.get(0).and_then(|v| v.as_int64()).unwrap_or(0);
            let rhs_id = rhs.get(0).and_then(|v| v.as_int64()).unwrap_or(0);
            lhs_id.cmp(&rhs_id)
        })
        .unwrap();

        for expected_id in [1, 2, 3] {
            assert!(result.next());
            assert_eq!(result.row().get(0), Some(&Value::Integer(expected_id)));
        }
        assert!(!result.next());
    }

    /// `DistinctResult` collapses the repeated "Alice" row, leaving three names.
    #[test]
    fn test_distinct_result() {
        let columns = vec![String::from("name")];
        let rows = make_rows(vec![
            Row::from_values(vec![Value::text("Alice")]),
            Row::from_values(vec![Value::text("Bob")]),
            Row::from_values(vec![Value::text("Alice")]),
            Row::from_values(vec![Value::text("Charlie")]),
        ]);
        let inner = Box::new(ExecutorResult::new(columns, rows));

        let mut result = DistinctResult::new(inner);
        let mut seen: Vec<String> = Vec::new();
        while result.next() {
            if let Some(Value::Text(name)) = result.row().get(0) {
                seen.push(name.to_string());
            }
        }

        assert_eq!(seen.len(), 3);
        for expected in ["Alice", "Bob", "Charlie"] {
            assert!(seen.contains(&expected.to_string()));
        }
    }

    /// `AliasedResult` renames `name` to `user_name` in the column list while
    /// leaving the row values untouched.
    #[test]
    fn test_aliased_result() {
        let columns = vec![String::from("id"), String::from("name")];
        let only_row = Row::from_values(vec![Value::Integer(1), Value::text("Alice")]);
        let rows = make_rows(vec![only_row]);
        let inner = Box::new(ExecutorResult::new(columns, rows));

        let mut aliases = FxHashMap::default();
        aliases.insert(String::from("user_name"), String::from("name"));

        let mut result = AliasedResult::new(inner, aliases);
        assert_eq!(result.columns(), &["id", "user_name"]);
        assert!(result.next());
        assert_eq!(result.row().get(1), Some(&Value::text("Alice")));
    }
}