use std::borrow::Cow;
#[cfg(test)]
use std::cell::RefCell;
use std::collections::hash_map::Iter as HashMapIter;
use std::collections::BTreeMap;
use std::sync::Arc;
use crate::catalog::Catalog;
use crate::db::scalar_at_path;
use crate::db::SharedDbState;
use crate::error::{DbError, QueryError, SchemaError};
use crate::file_format::MAX_QUERY_LIMIT;
use crate::index::IndexState;
use crate::record::RowValue;
use crate::schema::{CollectionId, FieldPath, IndexKind};
use crate::storage::{FileStore, Store};
use crate::ScalarValue;
use super::ast::{OrderBy, OrderDirection};
use super::ast::{Predicate, Query};
use super::operators::{LimitOp, RowKey, RowSource};
/// Fetch a clone of the latest row stored under `(collection_id, pk_key)`.
///
/// Callers obtain `pk_key` from an index or a row source. A key with no row in
/// `latest` is therefore reported as an index/row inconsistency rather than
/// skipped.
///
/// # Errors
/// `DbError::Schema(SchemaError::IndexRowMissing)` carrying `collection_id` and
/// `index_name` when `latest` has no entry for the key.
fn row_for_index_pk(
    latest: &crate::db::LatestMap,
    collection_id: u32,
    pk_key: Vec<u8>,
    index_name: &str,
) -> Result<BTreeMap<String, RowValue>, DbError> {
    latest
        .get(&(collection_id, pk_key))
        .cloned()
        // Build the error lazily. `ok_or` would allocate `index_name.to_string()`
        // on every successful lookup, and this runs once per produced row.
        .ok_or_else(|| {
            DbError::Schema(SchemaError::IndexRowMissing {
                collection_id,
                index_name: index_name.to_string(),
            })
        })
}
/// Bounds for an index range lookup over indexed scalar values.
///
/// Both bounds are optional and are passed through as `Option<&ScalarValue>`
/// to the index range lookups. `None` presumably leaves that side unbounded;
/// confirm against `IndexState::*_range_lookup`. Each `*_inclusive` flag picks
/// `>=`/`<=` versus `>`/`<`, which is how `explain_query` renders them.
#[derive(Debug, Clone, PartialEq)]
struct IndexKeyRange {
    // Lower bound, if any.
    lo: Option<ScalarValue>,
    // Whether `lo` itself is part of the range.
    lo_inclusive: bool,
    // Upper bound, if any.
    hi: Option<ScalarValue>,
    // Whether `hi` itself is part of the range.
    hi_inclusive: bool,
}
/// Access path chosen by `plan_query` for a [`Query`].
///
/// Every variant carries the query's `limit` and `order_by`. The executors
/// apply them after the access path has produced rows.
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    /// Point lookup of `key` in index `index_name` of `collection_id`.
    IndexLookup {
        collection_id: u32,
        index_name: String,
        // Unique indexes yield at most one primary key; non-unique ones yield a list.
        kind: IndexKind,
        // Index key bytes handed to `unique_lookup` / `non_unique_lookup`.
        key: Vec<u8>,
        // Remaining predicate evaluated against each fetched row.
        residual: Option<Predicate>,
        limit: Option<usize>,
        order_by: Option<OrderBy>,
    },
    /// Range lookup over index `index_name` bounded by `key_range`.
    IndexRangeLookup {
        collection_id: u32,
        index_name: String,
        kind: IndexKind,
        key_range: IndexKeyRange,
        // Remaining predicate evaluated against each fetched row.
        residual: Option<Predicate>,
        limit: Option<usize>,
        order_by: Option<OrderBy>,
    },
    /// Full scan of every row of `collection_id`, filtered by `predicate`.
    CollectionScan {
        collection_id: u32,
        predicate: Option<Predicate>,
        limit: Option<usize>,
        order_by: Option<OrderBy>,
    },
}
/// Render a textual description of the plan `query` would run with.
///
/// The output begins with `Plan:` and then has one line per component:
/// the access path, any key-range bounds, the residual/scan filter, the limit
/// and the ordering.
///
/// # Errors
/// - `DbError::Query` when the query limit exceeds `MAX_QUERY_LIMIT`.
/// - `DbError::Schema(UnknownCollection)` when `catalog` has no such collection.
pub fn explain_query(catalog: &Catalog, query: &Query) -> Result<String, DbError> {
    validate_query_limit(query)?;
    let Some(col) = catalog.get(query.collection) else {
        return Err(DbError::Schema(SchemaError::UnknownCollection {
            id: query.collection.0,
        }));
    };
    let plan = plan_query(col.id, &col.indexes, query);
    #[cfg(feature = "tracing")]
    tracing::debug!(plan = ?plan, "explain_query");

    let mut text = String::from("Plan:\n");
    // Emit the shape-specific lines, then hand back the parts every shape shares.
    let (limit, order_by) = match plan {
        Plan::IndexLookup {
            index_name,
            kind,
            residual,
            limit,
            order_by,
            ..
        } => {
            text.push_str(&format!(
                " IndexLookup index={index_name:?} kind={kind:?}\n"
            ));
            if let Some(r) = residual {
                text.push_str(&format!(" ResidualFilter {r:?}\n"));
            }
            (limit, order_by)
        }
        Plan::IndexRangeLookup {
            index_name,
            kind,
            key_range,
            residual,
            limit,
            order_by,
            ..
        } => {
            text.push_str(&format!(
                " IndexRangeLookup index={index_name:?} kind={kind:?}\n"
            ));
            if let Some(lo) = key_range.lo.as_ref() {
                let op = if key_range.lo_inclusive { ">=" } else { ">" };
                text.push_str(&format!(" KeyRange lo {op} {lo:?}\n"));
            }
            if let Some(hi) = key_range.hi.as_ref() {
                let op = if key_range.hi_inclusive { "<=" } else { "<" };
                text.push_str(&format!(" KeyRange hi {op} {hi:?}\n"));
            }
            if let Some(r) = residual {
                text.push_str(&format!(" ResidualFilter {r:?}\n"));
            }
            (limit, order_by)
        }
        Plan::CollectionScan {
            predicate,
            limit,
            order_by,
            ..
        } => {
            text.push_str(" CollectionScan\n");
            if let Some(p) = predicate {
                text.push_str(&format!(" Filter {p:?}\n"));
            }
            (limit, order_by)
        }
    };
    if let Some(n) = limit {
        text.push_str(&format!(" Limit {n}\n"));
    }
    if let Some(ob) = order_by {
        text.push_str(&format!(" OrderBy {:?} {:?}\n", ob.path, ob.direction));
    }
    Ok(text)
}
/// Reject queries whose `limit` is above `MAX_QUERY_LIMIT`.
///
/// A missing limit, or a limit at or below the cap, is accepted.
///
/// # Errors
/// `DbError::Query` describing the requested and the maximum limit.
fn validate_query_limit(query: &Query) -> Result<(), DbError> {
    match query.limit {
        Some(n) if n > MAX_QUERY_LIMIT => Err(DbError::Query(QueryError {
            message: format!("query limit {n} exceeds maximum {MAX_QUERY_LIMIT}"),
        })),
        _ => Ok(()),
    }
}
/// Execute `query` eagerly and return every matching row.
///
/// The plan's access path produces the candidate rows, and the residual
/// predicate (if any) filters them. `apply_order_by_and_limit` then orders and
/// truncates the result; it also receives the collection's primary field,
/// presumably as a tie-breaker (verify against that helper). Collection scans
/// visit `latest` in hash-map iteration order, which is unspecified.
///
/// # Errors
/// - `DbError::Query` when the limit exceeds `MAX_QUERY_LIMIT`.
/// - `DbError::Schema(UnknownCollection)` when `catalog` has no such collection.
/// - `DbError::Schema(IndexRowMissing)` when an index yields a primary key with
///   no row in `latest`.
pub fn execute_query(
    catalog: &Catalog,
    indexes: &IndexState,
    latest: &crate::db::LatestMap,
    query: &Query,
) -> Result<Vec<BTreeMap<String, RowValue>>, DbError> {
    validate_query_limit(query)?;
    let Some(col) = catalog.get(query.collection) else {
        return Err(DbError::Schema(SchemaError::UnknownCollection {
            id: query.collection.0,
        }));
    };
    let plan = plan_query(col.id, &col.indexes, query);
    #[cfg(feature = "tracing")]
    tracing::debug!(plan = ?plan, "execute_query");

    // Each arm yields the candidate rows plus what still has to be applied to them.
    let (mut rows, residual, limit, order_by): (Vec<BTreeMap<String, RowValue>>, _, _, _) =
        match plan {
            Plan::IndexLookup {
                collection_id,
                index_name,
                kind,
                key,
                residual,
                limit,
                order_by,
            } => {
                let pks: Vec<Vec<u8>> = match kind {
                    IndexKind::Unique => indexes
                        .unique_lookup(collection_id, &index_name, &key)
                        .map(|pk| pk.to_vec())
                        .into_iter()
                        .collect(),
                    IndexKind::NonUnique => indexes
                        .non_unique_lookup(collection_id, &index_name, &key)
                        .unwrap_or_default(),
                };
                let fetched = pks
                    .into_iter()
                    .map(|pk| row_for_index_pk(latest, collection_id, pk, &index_name))
                    .collect::<Result<Vec<_>, _>>()?;
                (fetched, residual, limit, order_by)
            }
            Plan::IndexRangeLookup {
                collection_id,
                index_name,
                kind,
                key_range,
                residual,
                limit,
                order_by,
            } => {
                let fetched = collect_index_range_rows(
                    indexes,
                    latest,
                    collection_id,
                    &index_name,
                    kind,
                    &key_range,
                )?;
                (fetched, residual, limit, order_by)
            }
            Plan::CollectionScan {
                collection_id,
                predicate,
                limit,
                order_by,
            } => {
                let fetched = latest
                    .iter()
                    .filter(|&(&(cid, _), row)| {
                        cid == collection_id
                            && match &predicate {
                                Some(p) => eval_predicate(row, p),
                                None => true,
                            }
                    })
                    .map(|(_, row)| row.clone())
                    .collect();
                // The scan already applied the whole predicate, so nothing residual remains.
                (fetched, None, limit, order_by)
            }
        };

    if let Some(pred) = residual {
        rows.retain(|row| eval_predicate(row, &pred));
    }
    apply_order_by_and_limit(
        &mut rows,
        order_by.as_ref(),
        limit,
        col.primary_field.as_deref(),
    );
    Ok(rows)
}
/// Iterator over query result rows returned by the `execute_query_iter*` family.
///
/// Each item is either a row map or the first error hit while producing it.
pub struct QueryRowIter<'a> {
    state: QueryRowIterState<'a>,
}
/// Backing strategy of a [`QueryRowIter`].
enum QueryRowIterState<'a> {
    /// Fully materialized result; `pos` is the index of the next row to yield.
    Vec {
        rows: Vec<BTreeMap<String, RowValue>>,
        pos: usize,
    },
    /// Streams row keys from `source` and resolves each one against borrowed `latest`.
    Source {
        latest: &'a crate::db::LatestMap,
        source: Box<dyn RowSource + 'a>,
    },
    /// Like `Source`, but owns an `Arc` snapshot so the iterator can be `'static`
    /// (see `execute_query_iter_owned`).
    Owned {
        snapshot: Arc<SharedDbState>,
        source: Box<dyn RowSource + 'static>,
    },
}
impl<'a> Iterator for QueryRowIter<'a> {
    type Item = Result<BTreeMap<String, RowValue>, DbError>;

    /// Yield the next row, or the error the underlying source produced.
    ///
    /// Streaming states resolve each key via `row_for_index_pk` with an empty
    /// index name, so `IndexRowMissing` errors raised here carry `""`.
    fn next(&mut self) -> Option<Self::Item> {
        match &mut self.state {
            QueryRowIterState::Vec { rows, pos } => {
                let slot = rows.get_mut(*pos)?;
                *pos += 1;
                // `pos` only moves forward, so each slot is yielded exactly once.
                // Move the row out instead of deep-cloning the whole map.
                Some(Ok(std::mem::take(slot)))
            }
            QueryRowIterState::Source { latest, source } => source.next_key().map(|key| {
                key.and_then(|(cid, pk_key)| row_for_index_pk(latest, cid.0, pk_key, ""))
            }),
            QueryRowIterState::Owned { snapshot, source } => source.next_key().map(|key| {
                key.and_then(|(cid, pk_key)| {
                    row_for_index_pk(&snapshot.latest, cid.0, pk_key, "")
                })
            }),
        }
    }
}
/// Streaming source for a unique-index point lookup over borrowed state.
///
/// Yields at most one key: the primary key found by the lookup, provided its
/// row exists in `latest` and passes `residual` (when present).
struct IndexUniqueSource<'a> {
    latest: &'a crate::db::LatestMap,
    collection_id: u32,
    index_name: String,
    // Primary key returned by the index lookup; consumed on the first call.
    pk: Option<Vec<u8>>,
    residual: Option<Predicate>,
    // Flipped on the first call so every later call returns `None`.
    done: bool,
}
impl RowSource for IndexUniqueSource<'_> {
    fn next_key(&mut self) -> Option<Result<RowKey, DbError>> {
        let already_ran = std::mem::replace(&mut self.done, true);
        if already_ran {
            return None;
        }
        let pk_key = self.pk.take()?;
        let row = match row_for_index_pk(
            self.latest,
            self.collection_id,
            pk_key.clone(),
            &self.index_name,
        ) {
            Ok(row) => row,
            Err(err) => return Some(Err(err)),
        };
        let keep = match &self.residual {
            Some(pred) => eval_predicate(&row, pred),
            None => true,
        };
        if keep {
            Some(Ok((CollectionId(self.collection_id), pk_key)))
        } else {
            None
        }
    }
}
/// Streaming source over the primary keys from a non-unique index lookup.
///
/// Keys whose rows fail `residual` are skipped. A key with no row in `latest`
/// surfaces as an error.
struct IndexNonUniqueSource<'a> {
    latest: &'a crate::db::LatestMap,
    collection_id: u32,
    index_name: String,
    pks: std::vec::IntoIter<Vec<u8>>,
    residual: Option<Predicate>,
}
impl RowSource for IndexNonUniqueSource<'_> {
    fn next_key(&mut self) -> Option<Result<RowKey, DbError>> {
        loop {
            let pk_key = self.pks.next()?;
            let row = match row_for_index_pk(
                self.latest,
                self.collection_id,
                pk_key.clone(),
                &self.index_name,
            ) {
                Ok(row) => row,
                Err(err) => return Some(Err(err)),
            };
            let accepted = match &self.residual {
                Some(pred) => eval_predicate(&row, pred),
                None => true,
            };
            if accepted {
                return Some(Ok((CollectionId(self.collection_id), pk_key)));
            }
        }
    }
}
/// Streaming source over the primary keys produced by an index range lookup.
///
/// Behaves like the non-unique point-lookup source: rows failing `residual` are
/// skipped, and a missing row is an error.
struct IndexRangeSource<'a> {
    latest: &'a crate::db::LatestMap,
    collection_id: u32,
    index_name: String,
    pks: std::vec::IntoIter<Vec<u8>>,
    residual: Option<Predicate>,
}
impl RowSource for IndexRangeSource<'_> {
    fn next_key(&mut self) -> Option<Result<RowKey, DbError>> {
        loop {
            let candidate = self.pks.next()?;
            let fetched = row_for_index_pk(
                self.latest,
                self.collection_id,
                candidate.clone(),
                &self.index_name,
            );
            let row = match fetched {
                Ok(row) => row,
                Err(err) => return Some(Err(err)),
            };
            if let Some(pred) = &self.residual {
                if !eval_predicate(&row, pred) {
                    continue;
                }
            }
            return Some(Ok((CollectionId(self.collection_id), candidate)));
        }
    }
}
fn collect_index_range_rows(
indexes: &IndexState,
latest: &crate::db::LatestMap,
collection_id: u32,
index_name: &str,
kind: IndexKind,
key_range: &IndexKeyRange,
) -> Result<Vec<BTreeMap<String, RowValue>>, DbError> {
let lo = key_range.lo.as_ref();
let hi = key_range.hi.as_ref();
let pks = match kind {
IndexKind::Unique => indexes.unique_range_lookup(
collection_id,
index_name,
lo,
key_range.lo_inclusive,
hi,
key_range.hi_inclusive,
),
IndexKind::NonUnique => indexes.non_unique_range_lookup(
collection_id,
index_name,
lo,
key_range.lo_inclusive,
hi,
key_range.hi_inclusive,
),
};
let mut out = Vec::with_capacity(pks.len());
for pk in pks {
out.push(row_for_index_pk(latest, collection_id, pk, index_name)?);
}
Ok(out)
}
fn index_range_source<'a>(
indexes: &'a IndexState,
latest: &'a crate::db::LatestMap,
collection_id: u32,
index_name: String,
kind: IndexKind,
key_range: &IndexKeyRange,
residual: Option<Predicate>,
) -> IndexRangeSource<'a> {
let lo = key_range.lo.as_ref();
let hi = key_range.hi.as_ref();
let pks = match kind {
IndexKind::Unique => indexes.unique_range_lookup(
collection_id,
&index_name,
lo,
key_range.lo_inclusive,
hi,
key_range.hi_inclusive,
),
IndexKind::NonUnique => indexes.non_unique_range_lookup(
collection_id,
&index_name,
lo,
key_range.lo_inclusive,
hi,
key_range.hi_inclusive,
),
};
IndexRangeSource {
latest,
collection_id,
index_name,
pks: pks.into_iter(),
residual,
}
}
/// Streaming full scan over borrowed `latest`.
///
/// Yields the keys of `collection_id` whose rows satisfy `predicate` (when
/// present), in hash-map iteration order, which is unspecified.
struct ScanSource<'a> {
    it: HashMapIter<'a, (u32, Vec<u8>), BTreeMap<String, RowValue>>,
    collection_id: u32,
    predicate: Option<Predicate>,
}
impl RowSource for ScanSource<'_> {
    fn next_key(&mut self) -> Option<Result<RowKey, DbError>> {
        let wanted = self.collection_id;
        let predicate = self.predicate.as_ref();
        let (key, _row) = self.it.find(|&(&(cid, _), row)| {
            cid == wanted
                && match predicate {
                    Some(p) => eval_predicate(row, p),
                    None => true,
                }
        })?;
        Some(Ok((CollectionId(wanted), key.1.clone())))
    }
}
/// Owned (`'static`) counterpart of the unique-index point-lookup source.
///
/// Holds an `Arc` snapshot instead of a borrow and yields at most one key,
/// subject to `residual`.
struct OwnedIndexUniqueSource {
    snapshot: Arc<SharedDbState>,
    collection_id: u32,
    index_name: String,
    // Primary key from the index lookup; consumed on the first call.
    pk: Option<Vec<u8>>,
    residual: Option<Predicate>,
    // Set on the first call so later calls return `None`.
    done: bool,
}
impl RowSource for OwnedIndexUniqueSource {
    fn next_key(&mut self) -> Option<Result<RowKey, DbError>> {
        if std::mem::replace(&mut self.done, true) {
            return None;
        }
        let pk_key = self.pk.take()?;
        let lookup = row_for_index_pk(
            &self.snapshot.latest,
            self.collection_id,
            pk_key.clone(),
            &self.index_name,
        );
        let row = match lookup {
            Ok(row) => row,
            Err(err) => return Some(Err(err)),
        };
        let rejected = match &self.residual {
            Some(pred) => !eval_predicate(&row, pred),
            None => false,
        };
        if rejected {
            return None;
        }
        Some(Ok((CollectionId(self.collection_id), pk_key)))
    }
}
/// Owned (`'static`) source walking a precomputed list of primary keys.
///
/// Used for both non-unique point lookups and index range lookups. Rows
/// failing `residual` are skipped; a missing row is an error.
struct OwnedIndexNonUniqueSource {
    snapshot: Arc<SharedDbState>,
    collection_id: u32,
    index_name: String,
    pks: std::vec::IntoIter<Vec<u8>>,
    residual: Option<Predicate>,
}
impl RowSource for OwnedIndexNonUniqueSource {
    fn next_key(&mut self) -> Option<Result<RowKey, DbError>> {
        loop {
            let pk_key = self.pks.next()?;
            let row = match row_for_index_pk(
                &self.snapshot.latest,
                self.collection_id,
                pk_key.clone(),
                &self.index_name,
            ) {
                Ok(row) => row,
                Err(err) => return Some(Err(err)),
            };
            let accepted = match &self.residual {
                Some(pred) => eval_predicate(&row, pred),
                None => true,
            };
            if accepted {
                return Some(Ok((CollectionId(self.collection_id), pk_key)));
            }
        }
    }
}
/// Owned (`'static`) full-collection scan over an `Arc` snapshot.
///
/// The collection's keys are collected and sorted by primary-key bytes up
/// front, so owned scans yield keys in a deterministic order.
struct OwnedScanSource {
    snapshot: Arc<SharedDbState>,
    collection_id: u32,
    predicate: Option<Predicate>,
    // Index into `keys` of the next candidate.
    pos: usize,
    keys: Vec<(u32, Vec<u8>)>,
}
impl OwnedScanSource {
    /// Snapshot the keys of `collection_id`, sorted by primary-key bytes.
    fn new(snapshot: Arc<SharedDbState>, collection_id: u32, predicate: Option<Predicate>) -> Self {
        let mut keys: Vec<(u32, Vec<u8>)> = snapshot
            .latest
            .keys()
            .filter(|(cid, _)| *cid == collection_id)
            .cloned()
            .collect();
        // All keys share `collection_id` and are distinct map keys, so their pks
        // are unique and an unstable sort produces the same order as a stable one.
        keys.sort_unstable_by(|a, b| a.1.cmp(&b.1));
        Self {
            snapshot,
            collection_id,
            predicate,
            pos: 0,
            keys,
        }
    }
}
impl RowSource for OwnedScanSource {
    fn next_key(&mut self) -> Option<Result<RowKey, DbError>> {
        while let Some(key) = self.keys.get(self.pos) {
            self.pos += 1;
            // `new` already filtered by collection; this guard is a cheap safety net.
            if key.0 != self.collection_id {
                continue;
            }
            // Probe the map by reference. This avoids cloning the key twice per
            // candidate (once out of `keys`, once more for the lookup).
            let Some(row) = self.snapshot.latest.get(key) else {
                continue;
            };
            if let Some(p) = &self.predicate {
                if !eval_predicate(row, p) {
                    continue;
                }
            }
            return Some(Ok((CollectionId(self.collection_id), key.1.clone())));
        }
        None
    }
}
fn owned_row_source_for_plan(
snapshot: Arc<SharedDbState>,
plan: Plan,
) -> Box<dyn RowSource + 'static> {
match plan {
Plan::IndexLookup {
collection_id,
index_name,
kind,
key,
residual,
..
} => match kind {
IndexKind::Unique => {
let pk: Option<Vec<u8>> = snapshot
.indexes
.unique_lookup(collection_id, &index_name, &key)
.map(|p| p.to_vec());
Box::new(OwnedIndexUniqueSource {
snapshot,
collection_id,
index_name,
pk,
residual,
done: false,
})
}
IndexKind::NonUnique => {
let pks = snapshot
.indexes
.non_unique_lookup(collection_id, &index_name, &key)
.unwrap_or_default()
.into_iter();
Box::new(OwnedIndexNonUniqueSource {
snapshot,
collection_id,
index_name,
pks,
residual,
})
}
},
Plan::IndexRangeLookup {
collection_id,
index_name,
kind,
key_range,
residual,
..
} => {
let lo = key_range.lo.as_ref();
let hi = key_range.hi.as_ref();
let pks = match kind {
IndexKind::Unique => snapshot.indexes.unique_range_lookup(
collection_id,
&index_name,
lo,
key_range.lo_inclusive,
hi,
key_range.hi_inclusive,
),
IndexKind::NonUnique => snapshot.indexes.non_unique_range_lookup(
collection_id,
&index_name,
lo,
key_range.lo_inclusive,
hi,
key_range.hi_inclusive,
),
};
Box::new(OwnedIndexNonUniqueSource {
snapshot,
collection_id,
index_name,
pks: pks.into_iter(),
residual,
})
}
Plan::CollectionScan {
collection_id,
predicate,
..
} => Box::new(OwnedScanSource::new(snapshot, collection_id, predicate)),
}
}
/// Run `query` against a shared snapshot and return an iterator that owns the
/// snapshot (`QueryRowIter<'static>`), so it does not borrow the caller's state.
///
/// Strategy:
/// - no `order_by`: stream keys from the plan's owned row source, wrapped in
///   `LimitOp` when the query has a limit;
/// - `order_by` without `db_path`: fall back to the fully materialized
///   [`execute_query`] result;
/// - `order_by` with `db_path`: feed the plan's keys through an external merge
///   sort backed by the store from `open_sorted_query_spill_store`, then apply
///   the limit.
///
/// # Errors
/// Limit validation, `UnknownCollection`, spill-store setup failures, and
/// row-lookup/spill errors hit while building the sorted runs.
pub fn execute_query_iter_owned(
    snapshot: Arc<SharedDbState>,
    query: &Query,
    db_path: Option<&std::path::Path>,
) -> Result<QueryRowIter<'static>, DbError> {
    // Unordered: stream directly from the access path.
    if query.order_by.is_none() {
        validate_query_limit(query)?;
        let col = snapshot
            .catalog
            .get(query.collection)
            .ok_or(DbError::Schema(SchemaError::UnknownCollection {
                id: query.collection.0,
            }))?;
        let plan = plan_query(col.id, &col.indexes, query);
        let mut source = owned_row_source_for_plan(snapshot.clone(), plan);
        if let Some(n) = query.limit {
            source = Box::new(LimitOp::new(source, n));
        }
        return Ok(QueryRowIter {
            state: QueryRowIterState::Owned { snapshot, source },
        });
    }
    let order_by = query
        .order_by
        .clone()
        .expect("order_by is Some when this function continues");
    // Ordered without a spill location: materialize in memory (execute_query validates the limit).
    let Some(path) = db_path else {
        return Ok(QueryRowIter {
            state: QueryRowIterState::Vec {
                rows: execute_query(
                    &snapshot.catalog,
                    &snapshot.indexes,
                    &snapshot.latest,
                    query,
                )?,
                pos: 0,
            },
        });
    };
    // Ordered with a spill location: external merge sort over the plan's keys.
    validate_query_limit(query)?;
    let col = snapshot
        .catalog
        .get(query.collection)
        .ok_or(DbError::Schema(SchemaError::UnknownCollection {
            id: query.collection.0,
        }))?;
    let plan = plan_query(col.id, &col.indexes, query);
    let base = owned_row_source_for_plan(snapshot.clone(), plan.clone());
    let spill_store = open_sorted_query_spill_store(path)?;
    #[cfg(feature = "tracing")]
    tracing::debug!(spill_path = %path.display(), "execute_query_iter_owned_spill");
    let spill = crate::spill::TempSpillFile::new(spill_store)?;
    // Index name reported in IndexRowMissing errors raised while sorting ("" for scans).
    let index_name_for_sort = match &plan {
        Plan::IndexLookup { index_name, .. } | Plan::IndexRangeLookup { index_name, .. } => {
            index_name.as_str()
        }
        Plan::CollectionScan { .. } => "",
    };
    // Drains `base` completely into sorted spill runs before the first key is yielded.
    let sort_source = Box::new(ExternalSortSourceOwned::new(
        spill,
        snapshot.clone(),
        base,
        col.id.0,
        order_by,
        index_name_for_sort,
    )?);
    let mut source: Box<dyn RowSource + 'static> = sort_source;
    // The limit applies to the sorted stream, i.e. after ordering.
    if let Some(n) = query.limit {
        source = Box::new(LimitOp::new(source, n));
    }
    Ok(QueryRowIter {
        state: QueryRowIterState::Owned { snapshot, source },
    })
}
pub fn execute_query_iter<'a>(
catalog: &'a Catalog,
indexes: &'a IndexState,
latest: &'a crate::db::LatestMap,
query: &Query,
) -> Result<QueryRowIter<'a>, DbError> {
if query.order_by.is_some() {
return Ok(QueryRowIter {
state: QueryRowIterState::Vec {
rows: execute_query(catalog, indexes, latest, query)?,
pos: 0,
},
});
}
let col =
catalog
.get(query.collection)
.ok_or(DbError::Schema(SchemaError::UnknownCollection {
id: query.collection.0,
}))?;
let plan = plan_query(col.id, &col.indexes, query);
let mut source: Box<dyn RowSource + 'a> = match plan {
Plan::IndexLookup {
collection_id,
index_name,
kind,
key,
residual,
..
} => match kind {
IndexKind::Unique => {
let pk = indexes
.unique_lookup(collection_id, &index_name, &key)
.map(|p| p.to_vec());
Box::new(IndexUniqueSource {
latest,
collection_id,
index_name,
pk,
residual,
done: false,
})
}
IndexKind::NonUnique => {
let pks = indexes
.non_unique_lookup(collection_id, &index_name, &key)
.unwrap_or_default()
.into_iter();
Box::new(IndexNonUniqueSource {
latest,
collection_id,
index_name,
pks,
residual,
})
}
},
Plan::IndexRangeLookup {
collection_id,
index_name,
kind,
key_range,
residual,
..
} => Box::new(index_range_source(
indexes,
latest,
collection_id,
index_name,
kind,
&key_range,
residual,
)),
Plan::CollectionScan {
collection_id,
predicate,
..
} => Box::new(ScanSource {
it: latest.iter(),
collection_id,
predicate,
}),
};
if let Some(n) = query.limit {
source = Box::new(LimitOp::new(source, n));
}
Ok(QueryRowIter {
state: QueryRowIterState::Source { latest, source },
})
}
/// Test-only hook that replaces opening the `FileStore` used for sorted-query spills.
#[cfg(test)]
type SortedQuerySpillStoreOpenHook = Box<dyn FnMut(&std::path::Path) -> Result<FileStore, DbError>>;
/// Test-only hook that replaces the whole `SortedQuerySpillStore` (e.g. with `FailLen`).
#[cfg(test)]
type SortedQuerySpillStoreOverrideHook =
    Box<dyn FnMut(&std::path::Path) -> Result<SortedQuerySpillStore, DbError>>;
#[cfg(test)]
thread_local! {
    // Per-thread hook slots; `None` means use the default tempfile-backed store.
    static QUERY_SORT_SPILL_STORE_OPEN_HOOK: RefCell<Option<SortedQuerySpillStoreOpenHook>> =
        RefCell::new(None);
    // Checked before the open hook by `open_sorted_query_spill_store`.
    static QUERY_SORT_SPILL_STORE_OVERRIDE_HOOK: RefCell<Option<SortedQuerySpillStoreOverrideHook>> =
        RefCell::new(None);
}
/// Install (or clear, with `None`) this thread's hook for opening the
/// `FileStore` behind sorted-query spills. Any previous hook is dropped.
#[cfg(test)]
pub(crate) fn test_set_sorted_query_spill_store_open_hook(
    hook: Option<SortedQuerySpillStoreOpenHook>,
) {
    QUERY_SORT_SPILL_STORE_OPEN_HOOK.with(|slot| drop(slot.replace(hook)));
}
/// Install (or clear, with `None`) this thread's hook that replaces the whole
/// sorted-query spill store. Any previous hook is dropped.
#[cfg(test)]
pub(crate) fn test_set_sorted_query_spill_store_override_hook(
    hook: Option<SortedQuerySpillStoreOverrideHook>,
) {
    QUERY_SORT_SPILL_STORE_OVERRIDE_HOOK.with(|slot| drop(slot.replace(hook)));
}
/// Backing store for external-sort spill files.
///
/// In normal builds this is always a `FileStore`. Test builds add `FailLen`, a
/// store whose `len`, reads and writes fail, for exercising error paths.
pub(crate) enum SortedQuerySpillStore {
    File(FileStore),
    #[cfg(test)]
    FailLen,
}
impl Store for SortedQuerySpillStore {
    // Each method delegates to the `FileStore`; `FailLen` returns a synthetic I/O
    // error for len/read/write and succeeds for sync/truncate.
    fn len(&self) -> Result<u64, DbError> {
        match self {
            Self::File(f) => f.len(),
            #[cfg(test)]
            Self::FailLen => Err(DbError::Io(std::io::Error::other(
                "sorted query spill store synthetic len() failure (test override)",
            ))),
        }
    }
    fn read_exact_at(&mut self, offset: u64, buf: &mut [u8]) -> Result<(), DbError> {
        match self {
            Self::File(f) => f.read_exact_at(offset, buf),
            #[cfg(test)]
            Self::FailLen => Err(DbError::Io(std::io::Error::other(
                "sorted query spill store synthetic read failure (test override)",
            ))),
        }
    }
    fn write_all_at(&mut self, offset: u64, buf: &[u8]) -> Result<(), DbError> {
        match self {
            Self::File(f) => f.write_all_at(offset, buf),
            #[cfg(test)]
            Self::FailLen => Err(DbError::Io(std::io::Error::other(
                "sorted query spill store synthetic write failure (test override)",
            ))),
        }
    }
    fn sync(&mut self) -> Result<(), DbError> {
        match self {
            Self::File(f) => f.sync(),
            #[cfg(test)]
            Self::FailLen => Ok(()),
        }
    }
    fn truncate(&mut self, len: u64) -> Result<(), DbError> {
        match self {
            Self::File(f) => f.truncate(len),
            #[cfg(test)]
            Self::FailLen => Ok(()),
        }
    }
}
/// Open the spill store used by an external `ORDER BY` sort.
///
/// Normal builds ignore `path` and use an anonymous `tempfile::tempfile()`. In
/// test builds, a thread-local override hook takes precedence, then an open
/// hook; each receives `path`.
///
/// # Errors
/// `DbError::Io` when the temporary file cannot be created, or whatever an
/// installed test hook returns.
fn open_sorted_query_spill_store(path: &std::path::Path) -> Result<SortedQuerySpillStore, DbError> {
    #[cfg(test)]
    {
        let from_override = QUERY_SORT_SPILL_STORE_OVERRIDE_HOOK
            .with(|cell| cell.borrow_mut().as_mut().map(|hook| hook(path)));
        if let Some(result) = from_override {
            return result;
        }
        let from_open_hook = QUERY_SORT_SPILL_STORE_OPEN_HOOK
            .with(|cell| cell.borrow_mut().as_mut().map(|hook| hook(path)));
        if let Some(result) = from_open_hook {
            return result.map(SortedQuerySpillStore::File);
        }
    }
    // `path` is only consumed by the test hooks above.
    let _ = path;
    tempfile::tempfile()
        .map(|file| SortedQuerySpillStore::File(FileStore::new(file)))
        .map_err(DbError::Io)
}
pub fn execute_query_iter_with_spill_path<'a>(
catalog: &'a Catalog,
indexes: &'a IndexState,
latest: &'a crate::db::LatestMap,
q: &Query,
db_path: Option<&std::path::Path>,
) -> Result<QueryRowIter<'a>, DbError> {
if q.order_by.is_none() {
return execute_query_iter(catalog, indexes, latest, q);
}
let order_by = q
.order_by
.clone()
.expect("order_by is Some when this function continues");
let Some(path) = db_path else {
return Ok(QueryRowIter {
state: QueryRowIterState::Vec {
rows: execute_query(catalog, indexes, latest, q)?,
pos: 0,
},
});
};
let col = catalog
.get(q.collection)
.ok_or(DbError::Schema(SchemaError::UnknownCollection {
id: q.collection.0,
}))?;
let plan = plan_query(col.id, &col.indexes, q);
let base: Box<dyn RowSource + 'a> = match plan.clone() {
Plan::IndexLookup {
collection_id,
index_name,
kind,
key,
residual,
..
} => match kind {
IndexKind::Unique => Box::new(IndexUniqueSource {
latest,
collection_id,
index_name: index_name.clone(),
pk: indexes
.unique_lookup(collection_id, &index_name, &key)
.map(|p| p.to_vec()),
residual,
done: false,
}),
IndexKind::NonUnique => Box::new(IndexNonUniqueSource {
latest,
collection_id,
index_name: index_name.clone(),
pks: indexes
.non_unique_lookup(collection_id, &index_name, &key)
.unwrap_or_default()
.into_iter(),
residual,
}),
},
Plan::IndexRangeLookup {
collection_id,
index_name,
kind,
key_range,
residual,
..
} => Box::new(index_range_source(
indexes,
latest,
collection_id,
index_name,
kind,
&key_range,
residual,
)),
Plan::CollectionScan {
collection_id,
predicate,
..
} => Box::new(ScanSource {
it: latest.iter(),
collection_id,
predicate,
}),
};
let spill_store = open_sorted_query_spill_store(path)?;
#[cfg(feature = "tracing")]
tracing::debug!(spill_path = %path.display(), "execute_query_order_by_spill");
let spill = crate::spill::TempSpillFile::new(spill_store)?;
let index_name_for_sort = match &plan {
Plan::IndexLookup { index_name, .. } | Plan::IndexRangeLookup { index_name, .. } => {
index_name.as_str()
}
Plan::CollectionScan { .. } => "",
};
let sort_source = Box::new(ExternalSortSource::new(
spill,
latest,
base,
col.id.0,
order_by,
index_name_for_sort,
)?);
let mut source: Box<dyn RowSource + 'a> = sort_source;
if let Some(n) = q.limit {
source = Box::new(LimitOp::new(source, n));
}
Ok(QueryRowIter {
state: QueryRowIterState::Source { latest, source },
})
}
/// One row's external-sort record, ordered by `cmp_sort_item`.
#[derive(Clone)]
struct SortItem {
    // 1 when the ORDER BY path yields no scalar for the row, else 0.
    none_flag: u8,
    // Order-preserving encoding from `scalar_sort_key_bytes` (empty when none_flag == 1).
    sort_key: Vec<u8>,
    // Row key; its pk bytes act as the final tie-breaker.
    key: RowKey,
}
/// Test helper: build a [`SortItem`] with an empty index name, mapping any
/// lookup error to `None`.
#[cfg(test)]
fn sort_item_for(
    latest: &crate::db::LatestMap,
    key: &RowKey,
    order_by: &OrderBy,
) -> Option<SortItem> {
    match sort_item_for_result(latest, key, order_by, "") {
        Ok(item) => Some(item),
        Err(_) => None,
    }
}
/// Build the external-sort record for `key`.
///
/// The record holds a "value missing" flag, the order-preserving encoding of
/// the scalar at `order_by.path`, and the key itself. Rows without a scalar at
/// that path get `none_flag = 1` and an empty sort key. Since `cmp_sort_item`
/// compares `none_flag` first, they sort after present values in ascending
/// order.
///
/// # Errors
/// `SchemaError::IndexRowMissing` (naming `index_name`) when `key` has no row
/// in `latest`.
fn sort_item_for_result(
    latest: &crate::db::LatestMap,
    key: &RowKey,
    order_by: &OrderBy,
    index_name: &str,
) -> Result<SortItem, DbError> {
    let (cid, pk) = key;
    // Build the error lazily. This runs once per sorted row, and `ok_or` would
    // allocate `index_name.to_string()` even on success.
    let row = latest.get(&(cid.0, pk.clone())).ok_or_else(|| {
        DbError::Schema(SchemaError::IndexRowMissing {
            collection_id: cid.0,
            index_name: index_name.to_string(),
        })
    })?;
    let (none_flag, sort_key) = match scalar_at_path(row, &order_by.path) {
        None => (1u8, Vec::new()),
        Some(s) => (0u8, scalar_sort_key_bytes(&s)),
    };
    Ok(SortItem {
        none_flag,
        sort_key,
        key: key.clone(),
    })
}
/// Encode a scalar so that plain byte-wise comparison matches sort order.
///
/// The first byte is a per-type tag (Bool=0, Int64=1, Uint64=2, Float64=3,
/// String=4, Bytes=5, Uuid=6, Timestamp=7), so values group by type. The
/// remaining bytes are:
/// - `Int64` / `Timestamp`: sign bit flipped, big-endian (signed order as unsigned bytes);
/// - `Uint64`: big-endian;
/// - `Float64`: IEEE-754 bits remapped so byte order follows numeric order,
///   with `-0.0` folded into `0.0`;
/// - `Bool`: one byte, 0 or 1;
/// - `String` / `Bytes` / `Uuid`: the raw bytes.
fn scalar_sort_key_bytes(s: &ScalarValue) -> Vec<u8> {
    const SIGN_BIT: u64 = 1 << 63;

    // Prefix `body` with its type tag.
    fn tagged(tag: u8, body: &[u8]) -> Vec<u8> {
        let mut key = Vec::with_capacity(1 + body.len());
        key.push(tag);
        key.extend_from_slice(body);
        key
    }

    match s {
        ScalarValue::Bool(b) => tagged(0, &[u8::from(*b)]),
        ScalarValue::Int64(v) => tagged(1, &((*v as u64) ^ SIGN_BIT).to_be_bytes()),
        ScalarValue::Uint64(v) => tagged(2, &v.to_be_bytes()),
        ScalarValue::Float64(v) => {
            // Fold -0.0 into +0.0 so both zeros produce the same key.
            let normalized = if *v == 0.0 { 0.0f64 } else { *v };
            let raw = normalized.to_bits();
            // Negative: invert every bit (larger magnitude sorts first).
            // Non-negative: set the sign bit so it sorts after all negatives.
            let ordered = if raw & SIGN_BIT == 0 { raw ^ SIGN_BIT } else { !raw };
            tagged(3, &ordered.to_be_bytes())
        }
        ScalarValue::String(st) => tagged(4, st.as_bytes()),
        ScalarValue::Bytes(b) => tagged(5, b),
        ScalarValue::Uuid(u) => tagged(6, u),
        ScalarValue::Timestamp(t) => tagged(7, &((*t as u64) ^ SIGN_BIT).to_be_bytes()),
    }
}
/// Order two sort records by `(none_flag, sort_key, pk)` lexicographically,
/// reversing the whole comparison for descending order.
fn cmp_sort_item(a: &SortItem, b: &SortItem, dir: OrderDirection) -> std::cmp::Ordering {
    let lhs = (a.none_flag, &a.sort_key, &a.key.1);
    let rhs = (b.none_flag, &b.sort_key, &b.key.1);
    let ascending = lhs.cmp(&rhs);
    if let OrderDirection::Desc = dir {
        ascending.reverse()
    } else {
        ascending
    }
}
/// External merge-sort row source over borrowed state.
///
/// Construction drains the input into sorted runs in `spill`. `next_key` then
/// k-way merges the runs through `heap`.
struct ExternalSortSource<'a, S: Store = FileStore> {
    spill: crate::spill::TempSpillFile<S>,
    // Collection stamped onto every yielded key.
    collection_id: u32,
    dir: OrderDirection,
    // Holds at most one pending item per run.
    heap: std::collections::BinaryHeap<HeapItem>,
    runs_meta: Vec<RunMeta>,
    // Per-run read offset within each run's payload.
    run_cursors: Vec<usize>,
    // Not read by `next_key`; ties the source to the `'a` borrow of `latest`.
    _latest: &'a crate::db::LatestMap,
}
/// Location of one sorted run inside the spill file.
#[derive(Clone)]
struct RunMeta {
    // Segment offset returned by `append_temp_segment`.
    offset: u64,
    // Length in bytes of the encoded run (see `encode_run`).
    payload_len: u64,
}
/// One decoded run entry: `(none_flag, sort_key, pk)`.
type SpillSortRunItem = (u8, Vec<u8>, Vec<u8>);
/// Test-only cursor over an in-memory encoded run (format of `encode_run`).
#[cfg(test)]
struct RunReader {
    buf: Vec<u8>,
    pos: usize,
}
#[cfg(test)]
impl RunReader {
    fn new(buf: Vec<u8>) -> Self {
        RunReader { pos: 0, buf }
    }
    /// Decode the next `(none_flag, sort_key, pk)` entry, or `None` when the
    /// buffer is exhausted or truncated.
    fn next_item(&mut self) -> Option<(u8, Vec<u8>, Vec<u8>)> {
        let Self { buf, pos } = self;
        read_run_item_from_buf(buf, pos)
    }
}
/// Decode one run entry at `*pos`: `[none_flag][u32 LE key_len][key][u32 LE pk_len][pk]`.
///
/// Advances `*pos` past the bytes consumed. Returns `None` if the buffer ends early.
#[cfg(test)]
fn read_run_item_from_buf(buf: &[u8], pos: &mut usize) -> Option<(u8, Vec<u8>, Vec<u8>)> {
    // Borrow the next `len` bytes and advance the cursor.
    fn take<'b>(buf: &'b [u8], pos: &mut usize, len: usize) -> Option<&'b [u8]> {
        let bytes = buf.get(*pos..*pos + len)?;
        *pos += len;
        Some(bytes)
    }
    // Read a little-endian u32 length prefix.
    fn take_len(buf: &[u8], pos: &mut usize) -> Option<usize> {
        let b = take(buf, pos, 4)?;
        Some(u32::from_le_bytes([b[0], b[1], b[2], b[3]]) as usize)
    }
    let none_flag = *buf.get(*pos)?;
    *pos += 1;
    let key_len = take_len(buf, pos)?;
    let key = take(buf, pos, key_len)?.to_vec();
    let pk_len = take_len(buf, pos)?;
    let pk = take(buf, pos, pk_len)?.to_vec();
    Some((none_flag, key, pk))
}
/// Decode the next entry of a spilled run, advancing `*pos` within its payload.
///
/// Entry layout (written by `encode_run`):
/// `[none_flag: u8][key_len: u32 LE][key][pk_len: u32 LE][pk]`.
///
/// Returns `Ok(None)` once `*pos` reaches `meta.payload_len`.
///
/// # Errors
/// - `DbError::Query("external sort spill segment truncated")` when an entry
///   runs past the payload end.
/// - `DbError::Query` when `check_field_bytes_len` rejects a length (presumably
///   the file format's per-field size cap; confirm in `file_format`).
/// - Any read error from the spill file.
fn read_spill_run_item<S: Store>(
    spill: &mut crate::spill::TempSpillFile<S>,
    meta: &RunMeta,
    pos: &mut usize,
) -> Result<Option<SpillSortRunItem>, DbError> {
    let payload_len = meta.payload_len as usize;
    // End of this run.
    if *pos >= payload_len {
        return Ok(None);
    }
    let mut one = [0u8; 1];
    spill.read_temp_payload_into(meta.offset, *pos as u64, &mut one)?;
    *pos += 1;
    let none_flag = one[0];
    if *pos + 4 > payload_len {
        return Err(DbError::Query(crate::error::QueryError {
            message: "external sort spill segment truncated".into(),
        }));
    }
    let mut len_buf = [0u8; 4];
    spill.read_temp_payload_into(meta.offset, *pos as u64, &mut len_buf)?;
    *pos += 4;
    let key_len = u32::from_le_bytes(len_buf) as usize;
    // Bound the length before allocating `key_len` bytes.
    crate::file_format::check_field_bytes_len(key_len).map_err(|e| match e {
        DbError::Format(fe) => DbError::Query(crate::error::QueryError {
            message: format!("external sort spill key: {fe}"),
        }),
        other => other,
    })?;
    if *pos + key_len > payload_len {
        return Err(DbError::Query(crate::error::QueryError {
            message: "external sort spill segment truncated".into(),
        }));
    }
    let mut key = vec![0u8; key_len];
    spill.read_temp_payload_into(meta.offset, *pos as u64, &mut key)?;
    *pos += key_len;
    if *pos + 4 > payload_len {
        return Err(DbError::Query(crate::error::QueryError {
            message: "external sort spill segment truncated".into(),
        }));
    }
    spill.read_temp_payload_into(meta.offset, *pos as u64, &mut len_buf)?;
    *pos += 4;
    let pk_len = u32::from_le_bytes(len_buf) as usize;
    // Bound the length before allocating `pk_len` bytes.
    crate::file_format::check_field_bytes_len(pk_len).map_err(|e| match e {
        DbError::Format(fe) => DbError::Query(crate::error::QueryError {
            message: format!("external sort spill pk: {fe}"),
        }),
        other => other,
    })?;
    if *pos + pk_len > payload_len {
        return Err(DbError::Query(crate::error::QueryError {
            message: "external sort spill segment truncated".into(),
        }));
    }
    let mut pk = vec![0u8; pk_len];
    spill.read_temp_payload_into(meta.offset, *pos as u64, &mut pk)?;
    *pos += pk_len;
    Ok(Some((none_flag, key, pk)))
}
#[derive(Clone)]
struct HeapItem {
run_idx: usize,
none_flag: u8,
sort_key: Vec<u8>,
pk: Vec<u8>,
dir: OrderDirection,
}
impl PartialEq for HeapItem {
fn eq(&self, other: &Self) -> bool {
(self.none_flag, &self.sort_key, &self.pk) == (other.none_flag, &other.sort_key, &other.pk)
}
}
impl Eq for HeapItem {}
impl PartialOrd for HeapItem {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
impl Ord for HeapItem {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
let a = SortItem {
none_flag: self.none_flag,
sort_key: self.sort_key.clone(),
key: (CollectionId(0), self.pk.clone()),
};
let b = SortItem {
none_flag: other.none_flag,
sort_key: other.sort_key.clone(),
key: (CollectionId(0), other.pk.clone()),
};
cmp_sort_item(&a, &b, self.dir).reverse()
}
}
impl<'a, S: Store> ExternalSortSource<'a, S> {
    /// Sort `run` in direction `dir` and append it to `spill` as one segment.
    ///
    /// Records the segment in `runs_meta` and clears `run`. An empty run is a no-op.
    fn flush_sorted_run(
        spill: &mut crate::spill::TempSpillFile<S>,
        runs_meta: &mut Vec<RunMeta>,
        run: &mut Vec<SortItem>,
        dir: OrderDirection,
    ) -> Result<(), DbError> {
        if run.is_empty() {
            return Ok(());
        }
        run.sort_by(|a, b| cmp_sort_item(a, b, dir));
        let payload = encode_run(run, dir);
        let offset = spill.append_temp_segment(&payload)?;
        runs_meta.push(RunMeta {
            offset,
            payload_len: payload.len() as u64,
        });
        run.clear();
        Ok(())
    }

    /// Drain `input` into sorted spill runs and prime the merge heap.
    ///
    /// Every key is resolved against `latest` to compute its sort record.
    /// `index_name` is only used in `IndexRowMissing` errors.
    ///
    /// # Errors
    /// Input errors, missing rows, and spill append/read errors.
    fn new(
        mut spill: crate::spill::TempSpillFile<S>,
        latest: &'a crate::db::LatestMap,
        mut input: Box<dyn RowSource + 'a>,
        collection_id: u32,
        order_by: OrderBy,
        index_name: &str,
    ) -> Result<Self, DbError> {
        // Maximum number of sort records buffered in memory before a run is spilled.
        const RUN_KEYS: usize = 2048;
        let dir = order_by.direction;
        let mut runs_meta: Vec<RunMeta> = Vec::new();
        let mut run: Vec<SortItem> = Vec::with_capacity(RUN_KEYS);
        // Phase 1: drain the input, spilling a sorted run every RUN_KEYS records.
        while let Some(rk) = input.next_key() {
            let rk = rk?;
            run.push(sort_item_for_result(latest, &rk, &order_by, index_name)?);
            if run.len() >= RUN_KEYS {
                Self::flush_sorted_run(&mut spill, &mut runs_meta, &mut run, dir)?;
            }
        }
        Self::flush_sorted_run(&mut spill, &mut runs_meta, &mut run, dir)?;
        // Phase 2: seed the merge heap with the first entry of every run.
        let mut run_cursors = vec![0usize; runs_meta.len()];
        let mut heap = std::collections::BinaryHeap::with_capacity(runs_meta.len());
        for (run_idx, meta) in runs_meta.iter().enumerate() {
            if let Some((none_flag, sort_key, pk)) =
                read_spill_run_item(&mut spill, meta, &mut run_cursors[run_idx])?
            {
                heap.push(HeapItem {
                    run_idx,
                    none_flag,
                    sort_key,
                    pk,
                    dir,
                });
            }
        }
        Ok(Self {
            spill,
            collection_id,
            dir,
            heap,
            runs_meta,
            run_cursors,
            _latest: latest,
        })
    }
}
fn encode_run(run: &[SortItem], _dir: OrderDirection) -> Vec<u8> {
let mut out = Vec::new();
for it in run {
out.push(it.none_flag);
out.extend_from_slice(&(it.sort_key.len() as u32).to_le_bytes());
out.extend_from_slice(&it.sort_key);
out.extend_from_slice(&(it.key.1.len() as u32).to_le_bytes());
out.extend_from_slice(&it.key.1);
}
out
}
impl<'a, S: Store> RowSource for ExternalSortSource<'a, S> {
    /// Pop the next key in sort order and refill the heap from the same run.
    ///
    /// A spill read error is returned in place of the popped key.
    fn next_key(&mut self) -> Option<Result<RowKey, DbError>> {
        let top = self.heap.pop()?;
        let run_idx = top.run_idx;
        // Disjoint field borrows; no need to clone the run metadata or the pk.
        match read_spill_run_item(
            &mut self.spill,
            &self.runs_meta[run_idx],
            &mut self.run_cursors[run_idx],
        ) {
            Ok(Some((none_flag, sort_key, pk))) => self.heap.push(HeapItem {
                run_idx,
                none_flag,
                sort_key,
                pk,
                dir: self.dir,
            }),
            Ok(None) => {}
            Err(e) => return Some(Err(e)),
        }
        Some(Ok((CollectionId(self.collection_id), top.pk)))
    }
}
/// Owned (`'static`) external merge-sort source, keeping the snapshot alive via `Arc`.
struct ExternalSortSourceOwned<S: Store = FileStore> {
    spill: crate::spill::TempSpillFile<S>,
    // Collection stamped onto every yielded key.
    collection_id: u32,
    dir: OrderDirection,
    // Holds at most one pending item per run.
    heap: std::collections::BinaryHeap<HeapItem>,
    runs_meta: Vec<RunMeta>,
    // Per-run read offset within each run's payload.
    run_cursors: Vec<usize>,
    // Not read by `next_key`; held so the snapshot outlives the sort.
    _snapshot: Arc<SharedDbState>,
}
impl<S: Store> ExternalSortSourceOwned<S> {
    /// Drain `input` into sorted spill runs (rows resolved against
    /// `snapshot.latest`) and prime the merge heap.
    ///
    /// # Errors
    /// Input errors, missing rows (reported with `index_name`), and spill
    /// append/read errors.
    fn new(
        mut spill: crate::spill::TempSpillFile<S>,
        snapshot: Arc<SharedDbState>,
        mut input: Box<dyn RowSource + 'static>,
        collection_id: u32,
        order_by: OrderBy,
        index_name: &str,
    ) -> Result<Self, DbError> {
        // Maximum number of sort records buffered in memory before a run is spilled.
        const RUN_KEYS: usize = 2048;
        let dir = order_by.direction;
        let mut runs_meta: Vec<RunMeta> = Vec::new();
        let mut run: Vec<SortItem> = Vec::with_capacity(RUN_KEYS);
        let latest = &snapshot.latest;
        // Phase 1: drain the input, spilling a sorted run every RUN_KEYS records.
        while let Some(rk) = input.next_key() {
            let rk = rk?;
            run.push(sort_item_for_result(latest, &rk, &order_by, index_name)?);
            if run.len() >= RUN_KEYS {
                ExternalSortSource::<S>::flush_sorted_run(
                    &mut spill,
                    &mut runs_meta,
                    &mut run,
                    dir,
                )?;
            }
        }
        ExternalSortSource::<S>::flush_sorted_run(&mut spill, &mut runs_meta, &mut run, dir)?;
        // Phase 2: seed the merge heap with the first entry of every run.
        let mut run_cursors = vec![0usize; runs_meta.len()];
        let mut heap = std::collections::BinaryHeap::with_capacity(runs_meta.len());
        for (run_idx, meta) in runs_meta.iter().enumerate() {
            if let Some((none_flag, sort_key, pk)) =
                read_spill_run_item(&mut spill, meta, &mut run_cursors[run_idx])?
            {
                heap.push(HeapItem {
                    run_idx,
                    none_flag,
                    sort_key,
                    pk,
                    dir,
                });
            }
        }
        Ok(Self {
            spill,
            collection_id,
            dir,
            heap,
            runs_meta,
            run_cursors,
            _snapshot: snapshot,
        })
    }
}
impl<S: Store> RowSource for ExternalSortSourceOwned<S> {
    /// Emits the next key of the k-way merge over the spilled runs.
    ///
    /// Pops the heap's top item (the next key in merge order, as defined by
    /// `HeapItem`'s ordering). It then refills the heap with the next item
    /// from the same run before returning the popped primary key tagged with
    /// this source's collection id. Returns `None` once the heap is empty.
    ///
    /// If reading the replacement item fails, that error is returned and the
    /// popped key is discarded.
    fn next_key(&mut self) -> Option<Result<RowKey, DbError>> {
        let top = self.heap.pop()?;
        let run_idx = top.run_idx;
        // Disjoint field borrows: no need to clone the run metadata (or the
        // freshly read pk) for every emitted key.
        match read_spill_run_item(
            &mut self.spill,
            &self.runs_meta[run_idx],
            &mut self.run_cursors[run_idx],
        ) {
            Ok(Some((none_flag, sort_key, pk))) => self.heap.push(HeapItem {
                run_idx,
                none_flag,
                sort_key,
                pk,
                dir: self.dir,
            }),
            // Presumably the run is exhausted; it simply stops contributing.
            Ok(None) => {}
            Err(e) => return Some(Err(e)),
        }
        Some(Ok((CollectionId(self.collection_id), top.pk)))
    }
}
/// Chooses an access path for `query` over `collection`.
///
/// - Without a predicate, the query is a plain `CollectionScan`.
/// - When `choose_index` finds an index, the plan becomes an `IndexLookup`
///   (equality, keyed by the value's canonical key bytes) or an
///   `IndexRangeLookup`. The residual filter is the predicate minus the part
///   the index consumed.
/// - Otherwise the whole predicate is applied during a `CollectionScan`.
///
/// `limit` and `order_by` are carried through unchanged in every case.
fn plan_query(
    collection: CollectionId,
    indexes: &[crate::schema::IndexDef],
    query: &Query,
) -> Plan {
    let collection_id = collection.0;
    let limit = query.limit;
    let Some(pred) = query.predicate.clone() else {
        return Plan::CollectionScan {
            collection_id,
            predicate: None,
            limit,
            order_by: query.order_by.clone(),
        };
    };
    match choose_index(indexes, &pred) {
        Some(IndexChoice::Eq { idx, value, used }) => Plan::IndexLookup {
            collection_id,
            index_name: idx.name.clone(),
            kind: idx.kind,
            key: value.canonical_key_bytes(),
            residual: remove_used_predicate(pred, used),
            limit,
            order_by: query.order_by.clone(),
        },
        Some(IndexChoice::Range {
            idx,
            key_range,
            used,
        }) => Plan::IndexRangeLookup {
            collection_id,
            index_name: idx.name.clone(),
            kind: idx.kind,
            key_range,
            residual: remove_used_predicate(pred, used),
            limit,
            order_by: query.order_by.clone(),
        },
        None => Plan::CollectionScan {
            collection_id,
            predicate: Some(pred),
            limit,
            order_by: query.order_by.clone(),
        },
    }
}
/// Index access path selected by `choose_index` for a predicate.
///
/// `used` is the part of the original predicate attributed to the chosen
/// index. `plan_query` passes it to `remove_used_predicate` to derive the
/// residual filter.
#[derive(Debug, Clone, PartialEq)]
enum IndexChoice<'a> {
    /// Equality lookup. `value` becomes the lookup key (via
    /// `canonical_key_bytes` in `plan_query`).
    Eq {
        idx: &'a crate::schema::IndexDef,
        value: ScalarValue,
        used: Predicate,
    },
    /// Range lookup. `key_range` holds the merged bounds from
    /// `extract_range_on_path`.
    /// NOTE(review): `used` comes from `range_used_predicate`, which does not
    /// apply `is_range_indexable`. It can therefore list comparisons that
    /// `key_range` does not encode.
    Range {
        idx: &'a crate::schema::IndexDef,
        key_range: IndexKeyRange,
        used: Predicate,
    },
}
/// Reports whether `value` may serve as a range-index bound.
///
/// Only `Int64` and `String` scalars qualify. `extract_range_on_path` ignores
/// comparisons against any other kind.
fn is_range_indexable(value: &ScalarValue) -> bool {
    matches!(*value, ScalarValue::String(_) | ScalarValue::Int64(_))
}
/// Folds a candidate bound into `current`, keeping whichever bound is tighter.
///
/// `current` is `(bound, inclusive)`. `tighter` is the ordering of `value`
/// relative to the existing bound that makes `value` the stricter one
/// (`Greater` for lower bounds, `Less` for upper bounds).
/// - With no existing bound, `value` is taken as-is.
/// - A strictly tighter `value` replaces the existing bound, inclusivity included.
/// - An equal `value` that is exclusive turns the bound exclusive. The
///   combination `x >= v AND x > v` is `x > v`.
/// - Looser or incomparable values (per `scalar_partial_cmp`) are ignored.
fn merge_bound_toward(
    current: &mut (Option<ScalarValue>, bool),
    value: ScalarValue,
    inclusive: bool,
    tighter: std::cmp::Ordering,
) {
    let ordering = match &current.0 {
        None => {
            *current = (Some(value), inclusive);
            return;
        }
        Some(existing) => scalar_partial_cmp(&value, existing),
    };
    match ordering {
        Some(o) if o == tighter => *current = (Some(value), inclusive),
        Some(std::cmp::Ordering::Equal) if !inclusive => current.1 = false,
        _ => {}
    }
}

/// Merges a lower bound (`>=` / `>`) into `current`; the larger bound wins.
fn merge_lo_bound(current: &mut (Option<ScalarValue>, bool), value: ScalarValue, inclusive: bool) {
    merge_bound_toward(current, value, inclusive, std::cmp::Ordering::Greater);
}

/// Merges an upper bound (`<=` / `<`) into `current`; the smaller bound wins.
fn merge_hi_bound(current: &mut (Option<ScalarValue>, bool), value: ScalarValue, inclusive: bool) {
    merge_bound_toward(current, value, inclusive, std::cmp::Ordering::Less);
}
/// Derives an index key range on `path` from `pred`.
///
/// `pred` must be a single comparison (`Gte`/`Gt`/`Lte`/`Lt`) or an `And`.
/// For an `And`, only its direct children are inspected. Comparisons on
/// `path` with `Int64`/`String` values (see `is_range_indexable`) contribute
/// bounds; several bounds on the same side are merged by `merge_lo_bound` /
/// `merge_hi_bound`.
///
/// Returns `None` when `pred` has another shape, or when no comparison
/// contributed a bound.
fn extract_range_on_path(path: &FieldPath, pred: &Predicate) -> Option<IndexKeyRange> {
    let conjuncts: &[Predicate] = match pred {
        Predicate::Gte { .. }
        | Predicate::Gt { .. }
        | Predicate::Lte { .. }
        | Predicate::Lt { .. } => std::slice::from_ref(pred),
        Predicate::And(items) => items.as_slice(),
        _ => return None,
    };
    let mut lower: (Option<ScalarValue>, bool) = (None, true);
    let mut upper: (Option<ScalarValue>, bool) = (None, true);
    let mut found_bound = false;
    for conjunct in conjuncts {
        // (field, bound value, is lower bound?, inclusive?)
        let (field, value, is_lower, inclusive) = match conjunct {
            Predicate::Gte { path: field, value } => (field, value, true, true),
            Predicate::Gt { path: field, value } => (field, value, true, false),
            Predicate::Lte { path: field, value } => (field, value, false, true),
            Predicate::Lt { path: field, value } => (field, value, false, false),
            _ => continue,
        };
        if field != path || !is_range_indexable(value) {
            continue;
        }
        if is_lower {
            merge_lo_bound(&mut lower, value.clone(), inclusive);
        } else {
            merge_hi_bound(&mut upper, value.clone(), inclusive);
        }
        found_bound = true;
    }
    if !found_bound {
        return None;
    }
    let (lo, lo_inclusive) = lower;
    let (hi, hi_inclusive) = upper;
    Some(IndexKeyRange {
        lo,
        lo_inclusive,
        hi,
        hi_inclusive,
    })
}
/// Returns the comparisons in `pred` that bound `path`, i.e. the part a range
/// lookup on `path` is credited with.
///
/// - A single `Gte`/`Gt`/`Lte`/`Lt` on `path` is returned whole.
/// - For an `And`, the direct children comparing `path` are collected. One
///   match is returned bare; several are wrapped in a new `And`.
/// - Anything else yields `None`.
///
/// Unlike `extract_range_on_path`, the comparison value is not checked with
/// `is_range_indexable`.
fn range_used_predicate(path: &FieldPath, pred: &Predicate) -> Option<Predicate> {
    let bounds_path = |candidate: &Predicate| {
        matches!(
            candidate,
            Predicate::Gte { path: field, .. }
                | Predicate::Gt { path: field, .. }
                | Predicate::Lte { path: field, .. }
                | Predicate::Lt { path: field, .. } if field == path
        )
    };
    if bounds_path(pred) {
        return Some(pred.clone());
    }
    let Predicate::And(items) = pred else {
        return None;
    };
    let mut used: Vec<Predicate> = items
        .iter()
        .filter(|&item| bounds_path(item))
        .cloned()
        .collect();
    match used.len() {
        0 => None,
        1 => used.pop(),
        _ => Some(Predicate::And(used)),
    }
}
/// Picks the first index (in `indexes` order) whose path yields both a key
/// range (`extract_range_on_path`) and a consumed sub-predicate
/// (`range_used_predicate`) from `pred`.
fn try_range_index<'a>(
    indexes: &'a [crate::schema::IndexDef],
    pred: &Predicate,
) -> Option<IndexChoice<'a>> {
    indexes.iter().find_map(|idx| {
        let key_range = extract_range_on_path(&idx.path, pred)?;
        let used = range_used_predicate(&idx.path, pred)?;
        Some(IndexChoice::Range {
            idx,
            key_range,
            used,
        })
    })
}
/// Chooses an index that can serve `pred`, if any.
///
/// - `Eq`: the first index on the same path.
/// - `Lt`/`Lte`/`Gt`/`Gte`: delegated to `try_range_index`.
/// - `Or`: never indexed.
/// - `And`: each child is examined recursively, but only equality choices are
///   kept. The priority is:
///   1. an equality on a `Unique` index (the last such child wins);
///   2. a range over the whole conjunction (`try_range_index`);
///   3. the first equality on a non-unique index.
fn choose_index<'a>(
    indexes: &'a [crate::schema::IndexDef],
    pred: &Predicate,
) -> Option<IndexChoice<'a>> {
    match pred {
        Predicate::Eq { path, value } => {
            let idx = indexes.iter().find(|candidate| &candidate.path == path)?;
            Some(IndexChoice::Eq {
                idx,
                value: value.clone(),
                used: pred.clone(),
            })
        }
        Predicate::Lt { .. }
        | Predicate::Lte { .. }
        | Predicate::Gt { .. }
        | Predicate::Gte { .. } => try_range_index(indexes, pred),
        Predicate::Or(_) => None,
        Predicate::And(items) => {
            let mut unique_hit: Option<IndexChoice<'a>> = None;
            let mut first_plain_hit: Option<IndexChoice<'a>> = None;
            for item in items {
                let Some(choice @ IndexChoice::Eq { .. }) = choose_index(indexes, item) else {
                    continue;
                };
                let is_unique = matches!(
                    &choice,
                    IndexChoice::Eq { idx, .. } if idx.kind == IndexKind::Unique
                );
                if is_unique {
                    unique_hit = Some(choice);
                } else if first_plain_hit.is_none() {
                    first_plain_hit = Some(choice);
                }
            }
            // `try_range_index` is pure, so probing it lazily yields the same choice.
            unique_hit
                .or_else(|| try_range_index(indexes, pred))
                .or(first_plain_hit)
        }
    }
}
/// Returns what remains of `pred` after removing `used`, the part consumed by
/// an index lookup.
///
/// - If `pred` equals `used`, nothing remains (`None`).
/// - For an `And`, every direct child equal to `used` is dropped. An empty
///   remainder is `None`, a single child is returned bare, and more children
///   stay wrapped in `And`.
/// - Any other `pred` is returned unchanged.
fn remove_used_predicate(pred: Predicate, used: Predicate) -> Option<Predicate> {
    if pred == used {
        return None;
    }
    let mut remaining = match pred {
        Predicate::And(items) => items,
        other => return Some(other),
    };
    remaining.retain(|item| *item != used);
    match remaining.len() {
        0 => None,
        1 => remaining.pop(),
        _ => Some(Predicate::And(remaining)),
    }
}
fn eval_predicate(row: &BTreeMap<String, RowValue>, pred: &Predicate) -> bool {
match pred {
Predicate::Eq { path, value } => scalar_at_path(row, path)
.map(|s| &s == value)
.unwrap_or(false),
Predicate::Lt { path, value } => scalar_at_path(row, path)
.and_then(|s| scalar_partial_cmp(&s, value))
.map(|o| o.is_lt())
.unwrap_or(false),
Predicate::Lte { path, value } => scalar_at_path(row, path)
.and_then(|s| scalar_partial_cmp(&s, value))
.map(|o| o.is_lt() || o.is_eq())
.unwrap_or(false),
Predicate::Gt { path, value } => scalar_at_path(row, path)
.and_then(|s| scalar_partial_cmp(&s, value))
.map(|o| o.is_gt())
.unwrap_or(false),
Predicate::Gte { path, value } => scalar_at_path(row, path)
.and_then(|s| scalar_partial_cmp(&s, value))
.map(|o| o.is_gt() || o.is_eq())
.unwrap_or(false),
Predicate::And(items) => items.iter().all(|p| eval_predicate(row, p)),
Predicate::Or(items) => items.iter().any(|p| eval_predicate(row, p)),
}
}
/// Sorts `rows` by `order_by` (if any) and truncates them to `limit` (if any).
///
/// When an ordering is present and a limit of at most `TOPK_ORDER_BY_LIMIT`
/// would actually drop rows, a partial selection (`topk_order_by`) replaces
/// the full sort. `pk_field`, when given, names a top-level field used as the
/// tiebreak in `compare_rows_for_order`.
fn apply_order_by_and_limit(
    rows: &mut Vec<BTreeMap<String, RowValue>>,
    order_by: Option<&OrderBy>,
    limit: Option<usize>,
    pk_field: Option<&str>,
) {
    const TOPK_ORDER_BY_LIMIT: usize = 1024;
    let Some(ob) = order_by else {
        if let Some(n) = limit {
            rows.truncate(n);
        }
        return;
    };
    let pk_path = pk_field.map(|name| FieldPath(vec![Cow::Owned(name.to_string())]));
    match limit {
        Some(n) if n <= TOPK_ORDER_BY_LIMIT && rows.len() > n => {
            topk_order_by(rows, ob, n, pk_path.as_ref());
        }
        _ => {
            rows.sort_by(|a, b| compare_rows_for_order(a, b, ob, pk_path.as_ref()));
            if let Some(n) = limit {
                rows.truncate(n);
            }
        }
    }
}
fn compare_rows_for_order(
a: &BTreeMap<String, RowValue>,
b: &BTreeMap<String, RowValue>,
ob: &OrderBy,
pk_path: Option<&FieldPath>,
) -> std::cmp::Ordering {
let av = scalar_at_path(a, &ob.path);
let bv = scalar_at_path(b, &ob.path);
let mut ord = match (av, bv) {
(None, None) => std::cmp::Ordering::Equal,
(None, Some(_)) => std::cmp::Ordering::Greater,
(Some(_), None) => std::cmp::Ordering::Less,
(Some(x), Some(y)) => scalar_sort_key_bytes(&x).cmp(&scalar_sort_key_bytes(&y)),
};
if ord == std::cmp::Ordering::Equal {
if let Some(path) = pk_path {
let apk = scalar_at_path(a, path);
let bpk = scalar_at_path(b, path);
ord = match (apk, bpk) {
(None, None) => std::cmp::Ordering::Equal,
(None, Some(_)) => std::cmp::Ordering::Greater,
(Some(_), None) => std::cmp::Ordering::Less,
(Some(x), Some(y)) => scalar_sort_key_bytes(&x).cmp(&scalar_sort_key_bytes(&y)),
};
}
}
match ob.direction {
OrderDirection::Asc => ord,
OrderDirection::Desc => ord.reverse(),
}
}
/// Reduces `rows` to the first `k` rows in `ob` order, leaving them sorted.
///
/// When more than `k` rows are present, `select_nth_unstable_by` first moves
/// the `k` smallest rows (per `compare_rows_for_order`) to the front. The
/// rest are truncated, and only the survivors are sorted. With `k == 0`,
/// `rows` is simply cleared.
fn topk_order_by(
    rows: &mut Vec<BTreeMap<String, RowValue>>,
    ob: &OrderBy,
    k: usize,
    pk_path: Option<&FieldPath>,
) {
    let cmp = |a: &BTreeMap<String, RowValue>, b: &BTreeMap<String, RowValue>| {
        compare_rows_for_order(a, b, ob, pk_path)
    };
    if k == 0 {
        // `k - 1` below would underflow, and `select_nth_unstable_by` would
        // then panic on an out-of-range index.
        rows.clear();
        return;
    }
    if rows.len() > k {
        rows.select_nth_unstable_by(k - 1, cmp);
        rows.truncate(k);
    }
    rows.sort_by(cmp);
}
fn scalar_partial_cmp(a: &ScalarValue, b: &ScalarValue) -> Option<std::cmp::Ordering> {
use ScalarValue::*;
match (a, b) {
(Bool(x), Bool(y)) => Some(x.cmp(y)),
(Int64(x), Int64(y)) => Some(x.cmp(y)),
(Uint64(x), Uint64(y)) => Some(x.cmp(y)),
(Float64(x), Float64(y)) => x.partial_cmp(y),
(String(x), String(y)) => Some(x.cmp(y)),
(Bytes(x), Bytes(y)) => Some(x.cmp(y)),
(Uuid(x), Uuid(y)) => Some(x.cmp(y)),
(Timestamp(x), Timestamp(y)) => Some(x.cmp(y)),
_ => None,
}
}
// Unit tests live in a separate file that is spliced in with `include!`. They
// therefore compile as part of this module and can reach its private items.
#[cfg(test)]
mod tests {
    include!(concat!(
        env!("CARGO_MANIFEST_DIR"),
        "/tests/unit/src_query_planner_tests.rs"
    ));
}