#[cfg(test)]
use crate::model::field::FieldModel;
use crate::{
db::{
direction::Direction,
executor::aggregate::capability::{
field_kind_supports_aggregate_ordering, field_kind_supports_numeric_aggregation,
},
numeric::{coerce_numeric_decimal, compare_numeric_or_strict_order},
query::plan::FieldSlot as PlannedFieldSlot,
},
error::InternalError,
model::field::FieldKind,
types::Decimal,
value::Value,
};
use std::cmp::Ordering;
use thiserror::Error as ThisError;
#[derive(Clone, Debug, ThisError)]
pub(in crate::db::executor) enum AggregateFieldValueError {
#[error("unknown aggregate target field: {field}")]
UnknownField { field: String },
#[error("aggregate target field does not support ordering: {field} kind={kind:?}")]
UnsupportedFieldKind { field: String, kind: FieldKind },
#[error("aggregate target field value missing on entity: {field}")]
MissingFieldValue { field: String },
#[error("aggregate target field value type mismatch: {field} kind={kind:?} value={value:?}")]
FieldValueTypeMismatch {
field: String,
kind: FieldKind,
value: Box<Value>,
},
#[error(
"aggregate target field values are incomparable under strict ordering: {field} left={left:?} right={right:?}"
)]
IncomparableFieldValues {
field: String,
left: Box<Value>,
right: Box<Value>,
},
}
impl AggregateFieldValueError {
pub(in crate::db::executor) fn into_internal_error(self) -> InternalError {
let message = self.to_string();
match self {
Self::UnknownField { .. } | Self::UnsupportedFieldKind { .. } => {
InternalError::executor_unsupported(message)
}
Self::MissingFieldValue { .. }
| Self::FieldValueTypeMismatch { .. }
| Self::IncomparableFieldValues { .. } => {
InternalError::query_executor_invariant(message)
}
}
}
}
#[cfg(test)]
fn field_model_with_index<'a>(
fields: &'a [FieldModel],
field: &str,
) -> Option<(usize, &'a FieldModel)> {
fields
.iter()
.enumerate()
.find(|(_, candidate)| candidate.name() == field)
}
#[derive(Clone, Copy, Debug)]
pub(in crate::db::executor) struct FieldSlot {
pub(in crate::db::executor) index: usize,
pub(in crate::db::executor) kind: FieldKind,
}
fn unknown_aggregate_target_field(target_field: &str) -> AggregateFieldValueError {
AggregateFieldValueError::UnknownField {
field: target_field.to_string(),
}
}
fn resolve_aggregate_target_slot(
index: usize,
target_field: &str,
kind: FieldKind,
supports_kind: Option<fn(&FieldKind) -> bool>,
) -> Result<FieldSlot, AggregateFieldValueError> {
if let Some(supports_kind) = supports_kind
&& !supports_kind(&kind)
{
return Err(AggregateFieldValueError::UnsupportedFieldKind {
field: target_field.to_string(),
kind,
});
}
Ok(FieldSlot { index, kind })
}
fn coerce_numeric_field_decimal_owned(
target_field: &str,
field_slot: FieldSlot,
value: Value,
) -> Result<Decimal, AggregateFieldValueError> {
let Some(decimal) = coerce_numeric_decimal(&value) else {
return Err(AggregateFieldValueError::FieldValueTypeMismatch {
field: target_field.to_string(),
kind: field_slot.kind,
value: Box::new(value),
});
};
Ok(decimal)
}
fn field_kind_matches_value(kind: &FieldKind, value: &Value) -> bool {
match (kind, value) {
(FieldKind::Account, Value::Account(_))
| (FieldKind::Blob, Value::Blob(_))
| (FieldKind::Bool, Value::Bool(_))
| (FieldKind::Date, Value::Date(_))
| (FieldKind::Decimal { .. }, Value::Decimal(_))
| (FieldKind::Duration, Value::Duration(_))
| (FieldKind::Enum { .. }, Value::Enum(_))
| (FieldKind::Float32, Value::Float32(_))
| (FieldKind::Float64, Value::Float64(_))
| (FieldKind::Int, Value::Int(_))
| (FieldKind::Int128, Value::Int128(_))
| (FieldKind::IntBig, Value::IntBig(_))
| (FieldKind::Principal, Value::Principal(_))
| (FieldKind::Subaccount, Value::Subaccount(_))
| (FieldKind::Text, Value::Text(_))
| (FieldKind::Timestamp, Value::Timestamp(_))
| (FieldKind::Uint, Value::Uint(_))
| (FieldKind::Uint128, Value::Uint128(_))
| (FieldKind::UintBig, Value::UintBig(_))
| (FieldKind::Ulid, Value::Ulid(_))
| (FieldKind::Unit, Value::Unit)
| (FieldKind::Structured { .. }, Value::List(_) | Value::Map(_)) => true,
(FieldKind::Relation { key_kind, .. }, value) => field_kind_matches_value(key_kind, value),
(FieldKind::List(inner) | FieldKind::Set(inner), Value::List(items)) => items
.iter()
.all(|item| field_kind_matches_value(inner, item)),
(FieldKind::Map { key, value }, Value::Map(entries)) => {
entries.iter().all(|(entry_key, entry_value)| {
field_kind_matches_value(key, entry_key)
&& field_kind_matches_value(value, entry_value)
})
}
_ => false,
}
}
fn direct_compare_orderable_field_values(
kind: &FieldKind,
left: &Value,
right: &Value,
) -> Option<Ordering> {
match (kind, left, right) {
(FieldKind::Decimal { .. }, Value::Decimal(left), Value::Decimal(right)) => {
left.partial_cmp(right)
}
(FieldKind::Float32, Value::Float32(left), Value::Float32(right)) => {
left.get().partial_cmp(&right.get())
}
(FieldKind::Float64, Value::Float64(left), Value::Float64(right)) => {
left.get().partial_cmp(&right.get())
}
(FieldKind::Int, Value::Int(left), Value::Int(right)) => Some(left.cmp(right)),
(FieldKind::Int128, Value::Int128(left), Value::Int128(right)) => {
Some(left.get().cmp(&right.get()))
}
(FieldKind::Uint, Value::Uint(left), Value::Uint(right)) => Some(left.cmp(right)),
(FieldKind::Uint128, Value::Uint128(left), Value::Uint128(right)) => {
Some(left.get().cmp(&right.get()))
}
(FieldKind::Relation { key_kind, .. }, left, right) => {
direct_compare_orderable_field_values(key_kind, left, right)
}
_ => None,
}
}
#[cfg(test)]
pub(in crate::db::executor) fn resolve_orderable_aggregate_target_slot_from_fields(
fields: &[FieldModel],
target_field: &str,
) -> Result<FieldSlot, AggregateFieldValueError> {
let Some((index, field)) = field_model_with_index(fields, target_field) else {
return Err(unknown_aggregate_target_field(target_field));
};
resolve_aggregate_target_slot(
index,
target_field,
field.kind(),
Some(field_kind_supports_aggregate_ordering),
)
}
pub(in crate::db::executor) fn resolve_orderable_aggregate_target_slot_from_planner_slot(
field_slot: &PlannedFieldSlot,
) -> Result<FieldSlot, AggregateFieldValueError> {
let target_field = field_slot.field();
let Some(kind) = field_slot.kind() else {
return Err(unknown_aggregate_target_field(target_field));
};
resolve_aggregate_target_slot(
field_slot.index(),
target_field,
kind,
Some(field_kind_supports_aggregate_ordering),
)
}
#[cfg(test)]
pub(in crate::db::executor) fn resolve_any_aggregate_target_slot_from_fields(
fields: &[FieldModel],
target_field: &str,
) -> Result<FieldSlot, AggregateFieldValueError> {
let Some((index, field)) = field_model_with_index(fields, target_field) else {
return Err(unknown_aggregate_target_field(target_field));
};
resolve_aggregate_target_slot(index, target_field, field.kind(), None)
}
pub(in crate::db::executor) fn resolve_any_aggregate_target_slot_from_planner_slot(
field_slot: &PlannedFieldSlot,
) -> Result<FieldSlot, AggregateFieldValueError> {
let target_field = field_slot.field();
let Some(kind) = field_slot.kind() else {
return Err(unknown_aggregate_target_field(target_field));
};
resolve_aggregate_target_slot(field_slot.index(), target_field, kind, None)
}
#[cfg(test)]
pub(in crate::db::executor) fn resolve_numeric_aggregate_target_slot_from_fields(
fields: &[FieldModel],
target_field: &str,
) -> Result<FieldSlot, AggregateFieldValueError> {
let Some((index, field)) = field_model_with_index(fields, target_field) else {
return Err(unknown_aggregate_target_field(target_field));
};
resolve_aggregate_target_slot(
index,
target_field,
field.kind(),
Some(field_kind_supports_numeric_aggregation),
)
}
pub(in crate::db::executor) fn resolve_numeric_aggregate_target_slot_from_planner_slot(
field_slot: &PlannedFieldSlot,
) -> Result<FieldSlot, AggregateFieldValueError> {
let target_field = field_slot.field();
let Some(kind) = field_slot.kind() else {
return Err(unknown_aggregate_target_field(target_field));
};
resolve_aggregate_target_slot(
field_slot.index(),
target_field,
kind,
Some(field_kind_supports_numeric_aggregation),
)
}
pub(in crate::db::executor) fn extract_orderable_field_value_with_slot_reader(
target_field: &str,
field_slot: FieldSlot,
read_slot: &mut dyn FnMut(usize) -> Option<Value>,
) -> Result<Value, AggregateFieldValueError> {
let Some(value) = read_slot(field_slot.index) else {
return Err(AggregateFieldValueError::MissingFieldValue {
field: target_field.to_string(),
});
};
if !field_kind_matches_value(&field_slot.kind, &value) {
return Err(AggregateFieldValueError::FieldValueTypeMismatch {
field: target_field.to_string(),
kind: field_slot.kind,
value: Box::new(value),
});
}
Ok(value)
}
pub(in crate::db::executor) fn extract_orderable_field_value_with_slot_ref_reader<'a>(
target_field: &str,
field_slot: FieldSlot,
read_slot: &mut dyn FnMut(usize) -> Option<&'a Value>,
) -> Result<&'a Value, AggregateFieldValueError> {
let Some(value) = read_slot(field_slot.index) else {
return Err(AggregateFieldValueError::MissingFieldValue {
field: target_field.to_string(),
});
};
if !field_kind_matches_value(&field_slot.kind, value) {
return Err(AggregateFieldValueError::FieldValueTypeMismatch {
field: target_field.to_string(),
kind: field_slot.kind,
value: Box::new(value.clone()),
});
}
Ok(value)
}
pub(in crate::db::executor) fn extract_orderable_field_value_from_decoded_slot(
target_field: &str,
field_slot: FieldSlot,
decoded_value: Option<Value>,
) -> Result<Value, AggregateFieldValueError> {
let mut decoded_value = decoded_value;
extract_orderable_field_value_with_slot_reader(target_field, field_slot, &mut |_| {
decoded_value.take()
})
}
#[cfg(test)]
pub(in crate::db::executor) fn extract_numeric_field_decimal_with_slot_reader(
target_field: &str,
field_slot: FieldSlot,
read_slot: &mut dyn FnMut(usize) -> Option<Value>,
) -> Result<Decimal, AggregateFieldValueError> {
let value =
extract_orderable_field_value_with_slot_reader(target_field, field_slot, read_slot)?;
coerce_numeric_field_decimal_owned(target_field, field_slot, value)
}
pub(in crate::db::executor) fn extract_numeric_field_decimal_with_slot_ref_reader<'a>(
target_field: &str,
field_slot: FieldSlot,
read_slot: &mut dyn FnMut(usize) -> Option<&'a Value>,
) -> Result<Decimal, AggregateFieldValueError> {
let value =
extract_orderable_field_value_with_slot_ref_reader(target_field, field_slot, read_slot)?;
coerce_numeric_field_decimal_owned(target_field, field_slot, value.clone())
}
pub(in crate::db::executor) fn extract_numeric_field_decimal_from_decoded_slot(
target_field: &str,
field_slot: FieldSlot,
decoded_value: Option<Value>,
) -> Result<Decimal, AggregateFieldValueError> {
let value =
extract_orderable_field_value_from_decoded_slot(target_field, field_slot, decoded_value)?;
coerce_numeric_field_decimal_owned(target_field, field_slot, value)
}
pub(in crate::db::executor) fn compare_orderable_field_values(
target_field: &str,
left: &Value,
right: &Value,
) -> Result<Ordering, AggregateFieldValueError> {
let Some(ordering) = compare_numeric_or_strict_order(left, right) else {
return Err(AggregateFieldValueError::IncomparableFieldValues {
field: target_field.to_string(),
left: Box::new(left.clone()),
right: Box::new(right.clone()),
});
};
Ok(ordering)
}
pub(in crate::db::executor) fn compare_orderable_field_values_with_slot(
target_field: &str,
field_slot: FieldSlot,
left: &Value,
right: &Value,
) -> Result<Ordering, AggregateFieldValueError> {
if let Some(ordering) = direct_compare_orderable_field_values(&field_slot.kind, left, right) {
return Ok(ordering);
}
compare_orderable_field_values(target_field, left, right)
}
#[must_use]
pub(in crate::db::executor) const fn apply_aggregate_direction(
ordering: Ordering,
direction: Direction,
) -> Ordering {
match direction {
Direction::Asc => ordering,
Direction::Desc => ordering.reverse(),
}
}
#[cfg(test)]
mod tests;