use crate::{
db::{
executor::group::{GroupKey, KeyCanonicalError, StableHash, stable_hash_from_digest},
executor::projection::materialize::row_view::RowView,
query::plan::PageSpec,
},
error::InternalError,
value::{Value, ValueHashWriter},
};
use std::collections::HashMap;
/// Pagination window (OFFSET/LIMIT) applied to the stream of distinct
/// projected rows.
#[cfg(feature = "sql")]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(super) struct ProjectionDistinctWindow {
    // Number of distinct rows to skip before emitting any output.
    offset: usize,
    // Maximum number of rows to emit; `None` means unbounded.
    limit: Option<usize>,
}
#[cfg(feature = "sql")]
impl ProjectionDistinctWindow {
    /// Builds a window from an optional page specification; a missing page
    /// means "no offset, no limit".
    pub(super) fn from_page(page: Option<&PageSpec>) -> Self {
        match page {
            None => Self {
                offset: 0,
                limit: None,
            },
            Some(spec) => {
                // Saturate out-of-range counts to `usize::MAX` instead of
                // failing: a window larger than the address space behaves
                // the same as an effectively unbounded one.
                let offset = usize::try_from(spec.offset).unwrap_or(usize::MAX);
                let limit = spec
                    .limit
                    .map(|count| usize::try_from(count).unwrap_or(usize::MAX));
                Self { offset, limit }
            }
        }
    }

    /// `LIMIT 0`: nothing can ever be emitted.
    const fn output_is_empty(self) -> bool {
        matches!(self.limit, Some(0))
    }

    /// Pre-allocation hint for the output buffer (0 when unbounded).
    fn output_capacity(self) -> usize {
        match self.limit {
            Some(limit) => limit,
            None => 0,
        }
    }

    /// Number of distinct rows after which scanning may stop, or `None`
    /// when the window is unbounded.
    fn stop_after_distinct_count(self) -> Option<usize> {
        let offset = self.offset;
        self.limit.map(|limit| offset.saturating_add(limit))
    }
}
/// Set of projected rows already seen, used to drop duplicates for
/// DISTINCT projections. Rows are bucketed by a stable hash; each bucket
/// stores canonical (`GroupKey`-derived) row values so logically equal
/// rows compare equal even when representations differ (e.g. decimals
/// with different scales — see `value_matches_canonical_key`).
#[cfg(feature = "sql")]
struct DistinctProjectionRowSet {
    // stable hash of a canonical row -> canonical rows sharing that hash
    buckets: HashMap<StableHash, Vec<Value>>,
}
#[cfg(feature = "sql")]
impl DistinctProjectionRowSet {
    /// Creates an empty row set.
    fn new() -> Self {
        Self {
            buckets: HashMap::new(),
        }
    }

    /// Records `row` in the set. Returns `Ok(true)` when the row was not
    /// seen before, `Ok(false)` for a duplicate.
    ///
    /// Rows containing maps (directly or nested inside lists) cannot be
    /// compared against stored canonical values in place
    /// (`value_matches_canonical_key` rejects maps), so they take the
    /// owned-canonicalization path instead.
    fn insert_row(&mut self, row: &RowView<'_>) -> Result<bool, KeyCanonicalError> {
        if projected_row_requires_owned_canonical_lookup(row.values()) {
            return self.insert_row_with_owned_canonicalization(row);
        }
        let hash = stable_hash_projected_row(row)?;
        // Single `entry` lookup instead of `get` followed by `entry`: the
        // bucket is needed on both the duplicate and the insert path.
        let bucket = self.buckets.entry(hash).or_default();
        if bucket
            .iter()
            .any(|key| projected_row_matches_key(row, key))
        {
            return Ok(false);
        }
        // Canonicalize only when the row is actually new.
        bucket.push(canonical_projected_row_value(row)?);
        Ok(true)
    }

    /// Slow path: canonicalize the row up front via `GroupKey` and compare
    /// canonical values directly for equality.
    fn insert_row_with_owned_canonicalization(
        &mut self,
        row: &RowView<'_>,
    ) -> Result<bool, KeyCanonicalError> {
        let key = GroupKey::from_group_values(row.values().to_vec())?;
        let hash = key.hash();
        let canonical = key.into_canonical_value();
        let bucket = self.buckets.entry(hash).or_default();
        if bucket.iter().any(|existing| existing == &canonical) {
            return Ok(false);
        }
        bucket.push(canonical);
        Ok(true)
    }
}
/// Converts a projected row into its canonical value representation via
/// `GroupKey`.
#[cfg(feature = "sql")]
fn canonical_projected_row_value(row: &RowView<'_>) -> Result<Value, KeyCanonicalError> {
    let key = GroupKey::from_group_values(row.values().to_vec())?;
    Ok(key.into_canonical_value())
}
/// Computes a stable hash for a projected row by writing its values as a
/// length-prefixed list.
#[cfg(feature = "sql")]
fn stable_hash_projected_row(row: &RowView<'_>) -> Result<StableHash, KeyCanonicalError> {
    let values = row.values();
    let mut writer = ValueHashWriter::new();
    writer.write_list_prefix(values.len());
    for value in values {
        // Surface hashing failures as canonicalization errors so callers
        // have a single error type to handle.
        if let Err(err) = writer.write_list_value(value) {
            return Err(KeyCanonicalError::HashingFailed {
                reason: err.display_with_class(),
            });
        }
    }
    Ok(stable_hash_from_digest(writer.finish()))
}
/// True when any value in the row forces the owned-canonicalization path.
#[cfg(feature = "sql")]
fn projected_row_requires_owned_canonical_lookup(row: &[Value]) -> bool {
    row.iter()
        .any(|value| value_requires_owned_canonical_lookup(value))
}
/// True for maps, and for lists that (transitively) contain a map; such
/// values are never compared in place against stored canonical values.
#[cfg(feature = "sql")]
fn value_requires_owned_canonical_lookup(value: &Value) -> bool {
    if let Value::Map(_) = value {
        return true;
    }
    if let Value::List(items) = value {
        return items.iter().any(value_requires_owned_canonical_lookup);
    }
    false
}
/// Compares a projected row against one stored canonical row.
/// Non-list canonical values and length mismatches never match.
#[cfg(feature = "sql")]
fn projected_row_matches_key(row: &RowView<'_>, key: &Value) -> bool {
    let key_values = match key {
        Value::List(values) => values,
        _ => return false,
    };
    let row_len = row.values().len();
    if row_len != key_values.len() {
        return false;
    }
    (0..row_len).all(|index| value_matches_canonical_key(row.get(index), &key_values[index]))
}
/// Checks whether a live row value equals a stored canonical value.
///
/// Decimals are compared after normalizing the live side (the canonical
/// side is presumed already normalized); lists compare element-wise.
/// Maps on the live side never match here — rows containing maps are
/// routed through the owned-canonicalization path instead.
#[cfg(feature = "sql")]
fn value_matches_canonical_key(value: &Value, canonical: &Value) -> bool {
    match (value, canonical) {
        (Value::Decimal(lhs), Value::Decimal(rhs)) => &lhs.normalize() == rhs,
        (Value::List(lhs), Value::List(rhs)) => {
            lhs.len() == rhs.len()
                && lhs
                    .iter()
                    .zip(rhs.iter())
                    .all(|(item, canonical_item)| value_matches_canonical_key(item, canonical_item))
        }
        // No in-place comparison exists for maps.
        (Value::Map(_), _) => false,
        _ => value == canonical,
    }
}
/// Streams projected rows through DISTINCT plus an OFFSET/LIMIT window,
/// keeping only the rows that fall inside the window.
#[cfg(feature = "sql")]
struct DistinctProjectionAccumulator {
    // Deduplication state covering every distinct row seen so far.
    distinct_rows: DistinctProjectionRowSet,
    // Rows that survived both dedup and the offset; at most `limit` rows
    // are pushed because scanning stops once offset + limit is reached.
    output_rows: Vec<RowView<'static>>,
    // The OFFSET/LIMIT window being applied.
    window: ProjectionDistinctWindow,
    // Count of distinct rows seen, including those skipped by the offset.
    distinct_seen: usize,
}
#[cfg(feature = "sql")]
impl DistinctProjectionAccumulator {
    /// Creates an accumulator for the given window, pre-sizing the output
    /// buffer to the limit (if any).
    fn new(window: ProjectionDistinctWindow) -> Self {
        Self {
            distinct_rows: DistinctProjectionRowSet::new(),
            output_rows: Vec::with_capacity(window.output_capacity()),
            window,
            distinct_seen: 0,
        }
    }

    /// Feeds one projected row through the DISTINCT window.
    ///
    /// Returns `Ok(false)` once enough distinct rows have been seen to
    /// satisfy OFFSET + LIMIT (after invoking `record_bounded_stop`),
    /// signalling the caller to stop scanning; `Ok(true)` otherwise.
    fn consider_row(
        &mut self,
        row: RowView<'static>,
        mut record_bounded_stop: impl FnMut(),
    ) -> Result<bool, InternalError> {
        let is_new = self
            .distinct_rows
            .insert_row(&row)
            .map_err(KeyCanonicalError::into_internal_error)?;
        if !is_new {
            // Duplicate rows never advance the window.
            return Ok(true);
        }
        let index_of_this_row = self.distinct_seen;
        self.distinct_seen = index_of_this_row.saturating_add(1);
        // Distinct rows before the offset are counted but not emitted.
        if index_of_this_row >= self.window.offset {
            self.output_rows.push(row);
        }
        match self.window.stop_after_distinct_count() {
            Some(stop_after) if self.distinct_seen >= stop_after => {
                record_bounded_stop();
                Ok(false)
            }
            _ => Ok(true),
        }
    }

    /// Consumes the accumulator, yielding the windowed distinct rows.
    fn into_rows(self) -> Vec<RowView<'static>> {
        self.output_rows
    }
}
/// Projects `rows`, de-duplicates them, and applies the OFFSET/LIMIT
/// window, stopping the scan as soon as the window is satisfied.
///
/// `record_candidate_row` is invoked once per successfully projected row;
/// `record_bounded_stop` is invoked once if scanning ends early because
/// the limit was reached. A projection error aborts the whole collection.
#[cfg(feature = "sql")]
pub(super) fn collect_bounded_distinct_projected_rows<I>(
    window: ProjectionDistinctWindow,
    rows: impl IntoIterator<Item = I>,
    mut record_candidate_row: impl FnMut(),
    mut record_bounded_stop: impl FnMut(),
    mut project_row: impl FnMut(I) -> Result<RowView<'static>, InternalError>,
) -> Result<Vec<RowView<'static>>, InternalError> {
    // LIMIT 0 short-circuits before projecting anything at all.
    if window.output_is_empty() {
        return Ok(Vec::new());
    }
    let mut accumulator = DistinctProjectionAccumulator::new(window);
    for input in rows {
        let projected = project_row(input)?;
        record_candidate_row();
        let keep_scanning = accumulator.consider_row(projected, &mut record_bounded_stop)?;
        if !keep_scanning {
            break;
        }
    }
    Ok(accumulator.into_rows())
}