use qubit_datatype::DataType;
use qubit_value::Value;
use super::metadata_field::MetadataField;
use super::metadata_schema::MetadataSchema;
use super::unknown_field_policy::UnknownFieldPolicy;
use crate::{
Condition,
MetadataError,
MetadataFilter,
MetadataResult,
};
impl MetadataSchema {
pub fn validate_filter(&self, filter: &MetadataFilter) -> MetadataResult<()> {
filter.visit_conditions(|condition| self.validate_condition(condition))
}
fn validate_condition(&self, condition: &Condition) -> MetadataResult<()> {
match condition {
Condition::Equal { key, value } => self.validate_value_condition(key, "eq", value),
Condition::NotEqual { key, value } => self.validate_value_condition(key, "ne", value),
Condition::Less { key, value } => self.validate_range_condition(key, "lt", value),
Condition::LessEqual { key, value } => self.validate_range_condition(key, "le", value),
Condition::Greater { key, value } => self.validate_range_condition(key, "gt", value),
Condition::GreaterEqual { key, value } => {
self.validate_range_condition(key, "ge", value)
}
Condition::In { key, values } => {
for value in values {
self.validate_value_condition(key, "in_set", value)?;
}
Ok(())
}
Condition::NotIn { key, values } => {
for value in values {
self.validate_value_condition(key, "not_in_set", value)?;
}
Ok(())
}
Condition::Exists { key } | Condition::NotExists { key } => {
self.filter_field(key)?;
Ok(())
}
}
}
fn validate_value_condition(
&self,
key: &str,
operator: &'static str,
value: &Value,
) -> MetadataResult<()> {
let Some(field) = self.filter_field(key)? else {
return Ok(());
};
if value_matches_field_type(value, field.data_type()) {
return Ok(());
}
Err(MetadataError::InvalidFilterOperator {
key: key.to_string(),
operator,
data_type: field.data_type(),
message: format!(
"filter value type {} is not compatible with field type {}",
value.data_type(),
field.data_type()
),
})
}
fn validate_range_condition(
&self,
key: &str,
operator: &'static str,
value: &Value,
) -> MetadataResult<()> {
let Some(field) = self.filter_field(key)? else {
return Ok(());
};
if !is_range_comparable_type(field.data_type()) {
return Err(MetadataError::InvalidFilterOperator {
key: key.to_string(),
operator,
data_type: field.data_type(),
message: "range operators require a numeric or string field".to_string(),
});
}
if value_matches_field_type(value, field.data_type()) {
return Ok(());
}
Err(MetadataError::InvalidFilterOperator {
key: key.to_string(),
operator,
data_type: field.data_type(),
message: format!(
"filter value type {} is not compatible with field type {}",
value.data_type(),
field.data_type()
),
})
}
fn filter_field(&self, key: &str) -> MetadataResult<Option<&MetadataField>> {
match self.field(key) {
Some(field) => Ok(Some(field)),
None if matches!(self.unknown_field_policy(), UnknownFieldPolicy::Allow) => Ok(None),
None => Err(MetadataError::UnknownFilterField {
key: key.to_string(),
}),
}
}
}
#[inline]
fn is_numeric_data_type(data_type: DataType) -> bool {
matches!(
data_type,
DataType::Int8
| DataType::Int16
| DataType::Int32
| DataType::Int64
| DataType::Int128
| DataType::UInt8
| DataType::UInt16
| DataType::UInt32
| DataType::UInt64
| DataType::UInt128
| DataType::Float32
| DataType::Float64
| DataType::BigInteger
| DataType::BigDecimal
| DataType::IntSize
| DataType::UIntSize
)
}
#[inline]
fn is_range_comparable_type(data_type: DataType) -> bool {
is_numeric_data_type(data_type) || matches!(data_type, DataType::String)
}
#[inline]
fn value_matches_field_type(value: &Value, field_type: DataType) -> bool {
let value_type = value.data_type();
value_type == field_type || is_numeric_data_type(value_type) && is_numeric_data_type(field_type)
}