Skip to main content

qubit_metadata/schema/
filter_validation.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 *    SPDX-License-Identifier: Apache-2.0
6 *
7 *    Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10//! Filter validation support for [`MetadataSchema`].
11
12use qubit_datatype::DataType;
13use qubit_value::Value;
14
15use super::metadata_field::MetadataField;
16use super::metadata_schema::MetadataSchema;
17use super::unknown_field_policy::UnknownFieldPolicy;
18use crate::{
19    Condition,
20    MetadataError,
21    MetadataFilter,
22    MetadataResult,
23    MetadataValidationError,
24    MetadataValidationResult,
25    NumberComparisonPolicy,
26};
27
28impl MetadataSchema {
29    /// Validates a metadata filter against this schema.
30    ///
31    /// # Errors
32    ///
33    /// Returns an aggregate error containing every unknown field, invalid range
34    /// operator, and incompatible filter value discovered during this validation
35    /// pass. Unknown filter fields are accepted when the schema's
36    /// [`UnknownFieldPolicy`] is [`UnknownFieldPolicy::Allow`].
37    pub fn validate_filter(&self, filter: &MetadataFilter) -> MetadataValidationResult<()> {
38        let mut issues = Vec::new();
39        let number_comparison_policy = filter.options().number_comparison_policy;
40        if let Err(error) = filter.visit_conditions(|condition| {
41            self.collect_condition_issues(condition, number_comparison_policy, &mut issues);
42            Ok(())
43        }) {
44            issues.push(error);
45        }
46        if let Some(error) = MetadataValidationError::from_issues(issues) {
47            Err(error)
48        } else {
49            Ok(())
50        }
51    }
52
53    /// Collects every issue for one filter condition.
54    fn collect_condition_issues(
55        &self,
56        condition: &Condition,
57        number_comparison_policy: NumberComparisonPolicy,
58        issues: &mut Vec<MetadataError>,
59    ) {
60        match condition {
61            Condition::Equal { key, value } => collect_issue(
62                issues,
63                self.validate_value_condition(key, "eq", value, number_comparison_policy),
64            ),
65            Condition::NotEqual { key, value } => collect_issue(
66                issues,
67                self.validate_value_condition(key, "ne", value, number_comparison_policy),
68            ),
69            Condition::Less { key, value } => collect_issue(
70                issues,
71                self.validate_range_condition(key, "lt", value, number_comparison_policy),
72            ),
73            Condition::LessEqual { key, value } => collect_issue(
74                issues,
75                self.validate_range_condition(key, "le", value, number_comparison_policy),
76            ),
77            Condition::Greater { key, value } => collect_issue(
78                issues,
79                self.validate_range_condition(key, "gt", value, number_comparison_policy),
80            ),
81            Condition::GreaterEqual { key, value } => collect_issue(
82                issues,
83                self.validate_range_condition(key, "ge", value, number_comparison_policy),
84            ),
85            Condition::In { key, values } => {
86                self.collect_set_value_condition_issues(
87                    key,
88                    "in_set",
89                    values,
90                    number_comparison_policy,
91                    issues,
92                );
93            }
94            Condition::NotIn { key, values } => {
95                self.collect_set_value_condition_issues(
96                    key,
97                    "not_in_set",
98                    values,
99                    number_comparison_policy,
100                    issues,
101                );
102            }
103            Condition::Exists { key } | Condition::NotExists { key } => {
104                collect_issue(issues, self.filter_field(key).map(|_| ()));
105            }
106        }
107    }
108
109    /// Collects field and value issues for a set-membership condition.
110    fn collect_set_value_condition_issues(
111        &self,
112        key: &str,
113        operator: &'static str,
114        values: &[Value],
115        number_comparison_policy: NumberComparisonPolicy,
116        issues: &mut Vec<MetadataError>,
117    ) {
118        let field = match self.filter_field(key) {
119            Ok(field) => field,
120            Err(error) => {
121                issues.push(error);
122                return;
123            }
124        };
125        let Some(field) = field else {
126            return;
127        };
128        for value in values {
129            if value_matches_field_type(value, field.data_type(), number_comparison_policy) {
130                continue;
131            }
132            issues.push(MetadataError::InvalidFilterOperator {
133                key: key.to_string(),
134                operator,
135                data_type: field.data_type(),
136                message: format!(
137                    "filter value type {} is not compatible with field type {} under {:?} number comparison policy",
138                    value.data_type(),
139                    field.data_type(),
140                    number_comparison_policy
141                ),
142            });
143        }
144    }
145
146    /// Validates a non-range value condition.
147    fn validate_value_condition(
148        &self,
149        key: &str,
150        operator: &'static str,
151        value: &Value,
152        number_comparison_policy: NumberComparisonPolicy,
153    ) -> MetadataResult<()> {
154        let Some(field) = self.filter_field(key)? else {
155            return Ok(());
156        };
157        if value_matches_field_type(value, field.data_type(), number_comparison_policy) {
158            return Ok(());
159        }
160        Err(MetadataError::InvalidFilterOperator {
161            key: key.to_string(),
162            operator,
163            data_type: field.data_type(),
164            message: format!(
165                "filter value type {} is not compatible with field type {} under {:?} number comparison policy",
166                value.data_type(),
167                field.data_type(),
168                number_comparison_policy
169            ),
170        })
171    }
172
173    /// Validates a range value condition.
174    fn validate_range_condition(
175        &self,
176        key: &str,
177        operator: &'static str,
178        value: &Value,
179        number_comparison_policy: NumberComparisonPolicy,
180    ) -> MetadataResult<()> {
181        let Some(field) = self.filter_field(key)? else {
182            return Ok(());
183        };
184        if !is_range_comparable_type(field.data_type()) {
185            return Err(MetadataError::InvalidFilterOperator {
186                key: key.to_string(),
187                operator,
188                data_type: field.data_type(),
189                message: "range operators require a numeric or string field".to_string(),
190            });
191        }
192        if value_matches_field_type(value, field.data_type(), number_comparison_policy) {
193            return Ok(());
194        }
195        Err(MetadataError::InvalidFilterOperator {
196            key: key.to_string(),
197            operator,
198            data_type: field.data_type(),
199            message: format!(
200                "filter value type {} is not compatible with field type {} under {:?} number comparison policy",
201                value.data_type(),
202                field.data_type(),
203                number_comparison_policy
204            ),
205        })
206    }
207
208    /// Returns the declared filter field, or accepts unknown fields when allowed.
209    fn filter_field(&self, key: &str) -> MetadataResult<Option<&MetadataField>> {
210        match self.field(key) {
211            Some(field) => Ok(Some(field)),
212            None if matches!(self.unknown_field_policy(), UnknownFieldPolicy::Allow) => Ok(None),
213            None => Err(MetadataError::UnknownFilterField {
214                key: key.to_string(),
215            }),
216        }
217    }
218}
219
220/// Appends `result` to the issue list when it contains a validation error.
221#[inline]
222fn collect_issue(issues: &mut Vec<MetadataError>, result: MetadataResult<()>) {
223    if let Err(error) = result {
224        issues.push(error);
225    }
226}
227
228/// Returns `true` when `data_type` is numeric.
229#[inline]
230fn is_numeric_data_type(data_type: DataType) -> bool {
231    matches!(
232        data_type,
233        DataType::Int8
234            | DataType::Int16
235            | DataType::Int32
236            | DataType::Int64
237            | DataType::Int128
238            | DataType::UInt8
239            | DataType::UInt16
240            | DataType::UInt32
241            | DataType::UInt64
242            | DataType::UInt128
243            | DataType::Float32
244            | DataType::Float64
245            | DataType::BigInteger
246            | DataType::BigDecimal
247            | DataType::IntSize
248            | DataType::UIntSize
249    )
250}
251
252/// Returns `true` when `data_type` is a primitive floating-point type.
253#[inline]
254fn is_float_data_type(data_type: DataType) -> bool {
255    matches!(data_type, DataType::Float32 | DataType::Float64)
256}
257
258/// Returns `true` when `data_type` is a big-number type.
259#[inline]
260fn is_big_number_data_type(data_type: DataType) -> bool {
261    matches!(data_type, DataType::BigInteger | DataType::BigDecimal)
262}
263
264/// Returns `true` when `data_type` supports range comparisons.
265#[inline]
266fn is_range_comparable_type(data_type: DataType) -> bool {
267    is_numeric_data_type(data_type) || matches!(data_type, DataType::String)
268}
269
270/// Returns `true` when a filter value is compatible with a schema field type.
271#[inline]
272fn value_matches_field_type(
273    value: &Value,
274    field_type: DataType,
275    number_comparison_policy: NumberComparisonPolicy,
276) -> bool {
277    let value_type = value.data_type();
278    if value_type == field_type {
279        return true;
280    }
281    if !is_numeric_data_type(value_type) || !is_numeric_data_type(field_type) {
282        return false;
283    }
284    if matches!(
285        number_comparison_policy,
286        NumberComparisonPolicy::Approximate
287    ) {
288        return true;
289    }
290    value_matches_numeric_field_conservatively(value, field_type)
291}
292
293/// Returns `true` when conservative runtime numeric comparison can handle the pair.
294fn value_matches_numeric_field_conservatively(value: &Value, field_type: DataType) -> bool {
295    let value_type = value.data_type();
296    if !is_float_data_type(value_type) && !is_float_data_type(field_type) {
297        return true;
298    }
299    if is_float_data_type(value_type) && is_float_data_type(field_type) {
300        return true;
301    }
302    if is_big_number_data_type(value_type) || is_big_number_data_type(field_type) {
303        return false;
304    }
305    if is_float_data_type(value_type) {
306        return float_value_fits_integer_field(value, field_type);
307    }
308    integer_value_is_safe_for_float_field(value)
309}
310
311const MAX_SAFE_INTEGER_F64_U128: u128 = 9_007_199_254_740_992;
312const I64_MIN_F64: f64 = -9_223_372_036_854_775_808.0;
313const I64_EXCLUSIVE_MAX_F64: f64 = 9_223_372_036_854_775_808.0;
314const U64_EXCLUSIVE_MAX_F64: f64 = 18_446_744_073_709_551_616.0;
315
316/// Extracts a finite floating-point literal from a filter value.
317#[inline]
318fn finite_float_value(value: &Value) -> Option<f64> {
319    let number = value.to::<f64>().ok()?;
320    number.is_finite().then_some(number)
321}
322
323/// Returns `true` when a float literal can be compared exactly to an integer field.
324fn float_value_fits_integer_field(value: &Value, field_type: DataType) -> bool {
325    let Some(number) = finite_float_value(value) else {
326        return false;
327    };
328    if number.fract() != 0.0 {
329        return false;
330    }
331    if matches!(field_type, DataType::Int128 | DataType::UInt128) {
332        return false;
333    }
334    if matches!(
335        field_type,
336        DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 | DataType::IntSize
337    ) {
338        return (I64_MIN_F64..I64_EXCLUSIVE_MAX_F64).contains(&number);
339    }
340    matches!(
341        field_type,
342        DataType::UInt8
343            | DataType::UInt16
344            | DataType::UInt32
345            | DataType::UInt64
346            | DataType::UIntSize
347    ) && (0.0..U64_EXCLUSIVE_MAX_F64).contains(&number)
348}
349
350/// Returns `true` when an integer literal can be compared exactly to a float field.
351fn integer_value_is_safe_for_float_field(value: &Value) -> bool {
352    if let Ok(value) = value.to::<i128>() {
353        return value.unsigned_abs() <= MAX_SAFE_INTEGER_F64_U128;
354    }
355    value
356        .to::<u128>()
357        .is_ok_and(|value| value <= MAX_SAFE_INTEGER_F64_U128)
358}