Skip to main content

qubit_metadata/schema/
filter_validation.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 *    SPDX-License-Identifier: Apache-2.0
6 *
7 *    Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10//! Filter validation support for [`MetadataSchema`].
11
12use qubit_datatype::DataType;
13use qubit_value::Value;
14
15use super::metadata_field::MetadataField;
16use super::metadata_schema::MetadataSchema;
17use super::unknown_field_policy::UnknownFieldPolicy;
18use crate::{
19    Condition,
20    MetadataError,
21    MetadataFilter,
22    MetadataResult,
23    MetadataValidationError,
24    MetadataValidationResult,
25    NumberComparisonPolicy,
26};
27
28impl MetadataSchema {
29    /// Validates a metadata filter against this schema.
30    ///
31    /// # Errors
32    ///
33    /// Returns an aggregate error containing every unknown field, invalid range
34    /// operator, and incompatible filter value discovered during this validation
35    /// pass. Unknown filter fields are accepted when the schema's
36    /// [`UnknownFieldPolicy`] is [`UnknownFieldPolicy::Allow`].
37    pub fn validate_filter(&self, filter: &MetadataFilter) -> MetadataValidationResult<()> {
38        let mut issues = Vec::new();
39        let number_comparison_policy = filter.options().number_comparison_policy;
40        if let Err(error) = filter.visit_conditions(|condition| {
41            self.collect_condition_issues(condition, number_comparison_policy, &mut issues);
42            Ok(())
43        }) {
44            issues.push(error);
45        }
46        if let Some(error) = MetadataValidationError::from_issues(issues) {
47            Err(error)
48        } else {
49            Ok(())
50        }
51    }
52
53    /// Collects every issue for one filter condition.
54    fn collect_condition_issues(
55        &self,
56        condition: &Condition,
57        number_comparison_policy: NumberComparisonPolicy,
58        issues: &mut Vec<MetadataError>,
59    ) {
60        match condition {
61            Condition::Equal { key, value } => collect_issue(
62                issues,
63                self.validate_value_condition(key, "eq", value, number_comparison_policy),
64            ),
65            Condition::NotEqual { key, value } => collect_issue(
66                issues,
67                self.validate_value_condition(key, "ne", value, number_comparison_policy),
68            ),
69            Condition::Less { key, value } => collect_issue(
70                issues,
71                self.validate_range_condition(key, "lt", value, number_comparison_policy),
72            ),
73            Condition::LessEqual { key, value } => collect_issue(
74                issues,
75                self.validate_range_condition(key, "le", value, number_comparison_policy),
76            ),
77            Condition::Greater { key, value } => collect_issue(
78                issues,
79                self.validate_range_condition(key, "gt", value, number_comparison_policy),
80            ),
81            Condition::GreaterEqual { key, value } => collect_issue(
82                issues,
83                self.validate_range_condition(key, "ge", value, number_comparison_policy),
84            ),
85            Condition::In { key, values } => {
86                self.collect_set_value_condition_issues(key, "in_set", values, number_comparison_policy, issues);
87            }
88            Condition::NotIn { key, values } => {
89                self.collect_set_value_condition_issues(key, "not_in_set", values, number_comparison_policy, issues);
90            }
91            Condition::Exists { key } | Condition::NotExists { key } => {
92                collect_issue(issues, self.filter_field(key).map(|_| ()));
93            }
94        }
95    }
96
97    /// Collects field and value issues for a set-membership condition.
98    fn collect_set_value_condition_issues(
99        &self,
100        key: &str,
101        operator: &'static str,
102        values: &[Value],
103        number_comparison_policy: NumberComparisonPolicy,
104        issues: &mut Vec<MetadataError>,
105    ) {
106        let field = match self.filter_field(key) {
107            Ok(field) => field,
108            Err(error) => {
109                issues.push(error);
110                return;
111            }
112        };
113        let Some(field) = field else {
114            return;
115        };
116        for value in values {
117            if value_matches_field_type(value, field.data_type(), number_comparison_policy) {
118                continue;
119            }
120            issues.push(MetadataError::InvalidFilterOperator {
121                key: key.to_string(),
122                operator,
123                data_type: field.data_type(),
124                message: format!(
125                    "filter value type {} is not compatible with field type {} under {:?} number comparison policy",
126                    value.data_type(),
127                    field.data_type(),
128                    number_comparison_policy
129                ),
130            });
131        }
132    }
133
134    /// Validates a non-range value condition.
135    fn validate_value_condition(
136        &self,
137        key: &str,
138        operator: &'static str,
139        value: &Value,
140        number_comparison_policy: NumberComparisonPolicy,
141    ) -> MetadataResult<()> {
142        let Some(field) = self.filter_field(key)? else {
143            return Ok(());
144        };
145        if value_matches_field_type(value, field.data_type(), number_comparison_policy) {
146            return Ok(());
147        }
148        Err(MetadataError::InvalidFilterOperator {
149            key: key.to_string(),
150            operator,
151            data_type: field.data_type(),
152            message: format!(
153                "filter value type {} is not compatible with field type {} under {:?} number comparison policy",
154                value.data_type(),
155                field.data_type(),
156                number_comparison_policy
157            ),
158        })
159    }
160
161    /// Validates a range value condition.
162    fn validate_range_condition(
163        &self,
164        key: &str,
165        operator: &'static str,
166        value: &Value,
167        number_comparison_policy: NumberComparisonPolicy,
168    ) -> MetadataResult<()> {
169        let Some(field) = self.filter_field(key)? else {
170            return Ok(());
171        };
172        if !is_range_comparable_type(field.data_type()) {
173            return Err(MetadataError::InvalidFilterOperator {
174                key: key.to_string(),
175                operator,
176                data_type: field.data_type(),
177                message: "range operators require a numeric or string field".to_string(),
178            });
179        }
180        if value_matches_field_type(value, field.data_type(), number_comparison_policy) {
181            return Ok(());
182        }
183        Err(MetadataError::InvalidFilterOperator {
184            key: key.to_string(),
185            operator,
186            data_type: field.data_type(),
187            message: format!(
188                "filter value type {} is not compatible with field type {} under {:?} number comparison policy",
189                value.data_type(),
190                field.data_type(),
191                number_comparison_policy
192            ),
193        })
194    }
195
196    /// Returns the declared filter field, or accepts unknown fields when allowed.
197    fn filter_field(&self, key: &str) -> MetadataResult<Option<&MetadataField>> {
198        match self.field(key) {
199            Some(field) => Ok(Some(field)),
200            None if matches!(self.unknown_field_policy(), UnknownFieldPolicy::Allow) => Ok(None),
201            None => Err(MetadataError::UnknownFilterField { key: key.to_string() }),
202        }
203    }
204}
205
206/// Appends `result` to the issue list when it contains a validation error.
207#[inline]
208fn collect_issue(issues: &mut Vec<MetadataError>, result: MetadataResult<()>) {
209    if let Err(error) = result {
210        issues.push(error);
211    }
212}
213
214/// Returns `true` when `data_type` is numeric.
215#[inline]
216fn is_numeric_data_type(data_type: DataType) -> bool {
217    matches!(
218        data_type,
219        DataType::Int8
220            | DataType::Int16
221            | DataType::Int32
222            | DataType::Int64
223            | DataType::Int128
224            | DataType::UInt8
225            | DataType::UInt16
226            | DataType::UInt32
227            | DataType::UInt64
228            | DataType::UInt128
229            | DataType::Float32
230            | DataType::Float64
231            | DataType::BigInteger
232            | DataType::BigDecimal
233            | DataType::IntSize
234            | DataType::UIntSize
235    )
236}
237
238/// Returns `true` when `data_type` is a primitive floating-point type.
239#[inline]
240fn is_float_data_type(data_type: DataType) -> bool {
241    matches!(data_type, DataType::Float32 | DataType::Float64)
242}
243
244/// Returns `true` when `data_type` is a big-number type.
245#[inline]
246fn is_big_number_data_type(data_type: DataType) -> bool {
247    matches!(data_type, DataType::BigInteger | DataType::BigDecimal)
248}
249
250/// Returns `true` when `data_type` supports range comparisons.
251#[inline]
252fn is_range_comparable_type(data_type: DataType) -> bool {
253    is_numeric_data_type(data_type) || matches!(data_type, DataType::String)
254}
255
256/// Returns `true` when a filter value is compatible with a schema field type.
257#[inline]
258fn value_matches_field_type(
259    value: &Value,
260    field_type: DataType,
261    number_comparison_policy: NumberComparisonPolicy,
262) -> bool {
263    let value_type = value.data_type();
264    if value_type == field_type {
265        return true;
266    }
267    if !is_numeric_data_type(value_type) || !is_numeric_data_type(field_type) {
268        return false;
269    }
270    if matches!(number_comparison_policy, NumberComparisonPolicy::Approximate) {
271        return true;
272    }
273    value_matches_numeric_field_conservatively(value, field_type)
274}
275
276/// Returns `true` when conservative runtime numeric comparison can handle the pair.
277fn value_matches_numeric_field_conservatively(value: &Value, field_type: DataType) -> bool {
278    let value_type = value.data_type();
279    if !is_float_data_type(value_type) && !is_float_data_type(field_type) {
280        return true;
281    }
282    if is_float_data_type(value_type) && is_float_data_type(field_type) {
283        return true;
284    }
285    if is_big_number_data_type(value_type) || is_big_number_data_type(field_type) {
286        return false;
287    }
288    if is_float_data_type(value_type) {
289        return float_value_fits_integer_field(value, field_type);
290    }
291    integer_value_is_safe_for_float_field(value)
292}
293
294const MAX_SAFE_INTEGER_F64_U128: u128 = 9_007_199_254_740_992;
295const I64_MIN_F64: f64 = -9_223_372_036_854_775_808.0;
296const I64_EXCLUSIVE_MAX_F64: f64 = 9_223_372_036_854_775_808.0;
297const U64_EXCLUSIVE_MAX_F64: f64 = 18_446_744_073_709_551_616.0;
298
299/// Extracts a finite floating-point literal from a filter value.
300#[inline]
301fn finite_float_value(value: &Value) -> Option<f64> {
302    let number = value.to::<f64>().ok()?;
303    number.is_finite().then_some(number)
304}
305
306/// Returns `true` when a float literal can be compared exactly to an integer field.
307fn float_value_fits_integer_field(value: &Value, field_type: DataType) -> bool {
308    let Some(number) = finite_float_value(value) else {
309        return false;
310    };
311    if number.fract() != 0.0 {
312        return false;
313    }
314    if matches!(field_type, DataType::Int128 | DataType::UInt128) {
315        return false;
316    }
317    if matches!(
318        field_type,
319        DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 | DataType::IntSize
320    ) {
321        return (I64_MIN_F64..I64_EXCLUSIVE_MAX_F64).contains(&number);
322    }
323    matches!(
324        field_type,
325        DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 | DataType::UIntSize
326    ) && (0.0..U64_EXCLUSIVE_MAX_F64).contains(&number)
327}
328
329/// Returns `true` when an integer literal can be compared exactly to a float field.
330fn integer_value_is_safe_for_float_field(value: &Value) -> bool {
331    if let Ok(value) = value.to::<i128>() {
332        return value.unsigned_abs() <= MAX_SAFE_INTEGER_F64_U128;
333    }
334    value.to::<u128>().is_ok_and(|value| value <= MAX_SAFE_INTEGER_F64_U128)
335}