Skip to main content

qubit_metadata/schema/
filter_validation.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 *    SPDX-License-Identifier: Apache-2.0
6 *
7 *    Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10//! Filter validation support for [`MetadataSchema`].
11
12use qubit_datatype::DataType;
13use qubit_value::Value;
14
15use super::metadata_field::MetadataField;
16use super::metadata_schema::MetadataSchema;
17use super::unknown_field_policy::UnknownFieldPolicy;
18use crate::{Condition, MetadataError, MetadataFilter, MetadataResult};
19
20impl MetadataSchema {
21    /// Validates a metadata filter against this schema.
22    ///
23    /// # Errors
24    ///
25    /// Returns an error when the filter references an unknown field and this
26    /// schema rejects unknown fields, uses a range operator on a non-comparable
27    /// declared field, or compares a declared field with an incompatible value
28    /// type. Unknown filter fields are accepted when the schema's
29    /// [`UnknownFieldPolicy`] is [`UnknownFieldPolicy::Allow`].
30    pub fn validate_filter(&self, filter: &MetadataFilter) -> MetadataResult<()> {
31        filter.visit_conditions(|condition| self.validate_condition(condition))
32    }
33
34    /// Validates one filter condition against this schema.
35    fn validate_condition(&self, condition: &Condition) -> MetadataResult<()> {
36        match condition {
37            Condition::Equal { key, value } => self.validate_value_condition(key, "eq", value),
38            Condition::NotEqual { key, value } => self.validate_value_condition(key, "ne", value),
39            Condition::Less { key, value } => self.validate_range_condition(key, "lt", value),
40            Condition::LessEqual { key, value } => self.validate_range_condition(key, "le", value),
41            Condition::Greater { key, value } => self.validate_range_condition(key, "gt", value),
42            Condition::GreaterEqual { key, value } => {
43                self.validate_range_condition(key, "ge", value)
44            }
45            Condition::In { key, values } => {
46                for value in values {
47                    self.validate_value_condition(key, "in_set", value)?;
48                }
49                Ok(())
50            }
51            Condition::NotIn { key, values } => {
52                for value in values {
53                    self.validate_value_condition(key, "not_in_set", value)?;
54                }
55                Ok(())
56            }
57            Condition::Exists { key } | Condition::NotExists { key } => {
58                self.filter_field(key)?;
59                Ok(())
60            }
61        }
62    }
63
64    /// Validates a non-range value condition.
65    fn validate_value_condition(
66        &self,
67        key: &str,
68        operator: &'static str,
69        value: &Value,
70    ) -> MetadataResult<()> {
71        let Some(field) = self.filter_field(key)? else {
72            return Ok(());
73        };
74        if value_matches_field_type(value, field.data_type()) {
75            return Ok(());
76        }
77        Err(MetadataError::InvalidFilterOperator {
78            key: key.to_string(),
79            operator,
80            data_type: field.data_type(),
81            message: format!(
82                "filter value type {} is not compatible with field type {}",
83                value.data_type(),
84                field.data_type()
85            ),
86        })
87    }
88
89    /// Validates a range value condition.
90    fn validate_range_condition(
91        &self,
92        key: &str,
93        operator: &'static str,
94        value: &Value,
95    ) -> MetadataResult<()> {
96        let Some(field) = self.filter_field(key)? else {
97            return Ok(());
98        };
99        if !is_range_comparable_type(field.data_type()) {
100            return Err(MetadataError::InvalidFilterOperator {
101                key: key.to_string(),
102                operator,
103                data_type: field.data_type(),
104                message: "range operators require a numeric or string field".to_string(),
105            });
106        }
107        if value_matches_field_type(value, field.data_type()) {
108            return Ok(());
109        }
110        Err(MetadataError::InvalidFilterOperator {
111            key: key.to_string(),
112            operator,
113            data_type: field.data_type(),
114            message: format!(
115                "filter value type {} is not compatible with field type {}",
116                value.data_type(),
117                field.data_type()
118            ),
119        })
120    }
121
122    /// Returns the declared filter field, or accepts unknown fields when allowed.
123    fn filter_field(&self, key: &str) -> MetadataResult<Option<&MetadataField>> {
124        match self.field(key) {
125            Some(field) => Ok(Some(field)),
126            None if matches!(self.unknown_field_policy(), UnknownFieldPolicy::Allow) => Ok(None),
127            None => Err(MetadataError::UnknownFilterField {
128                key: key.to_string(),
129            }),
130        }
131    }
132}
133
134/// Returns `true` when `data_type` is numeric.
135#[inline]
136fn is_numeric_data_type(data_type: DataType) -> bool {
137    matches!(
138        data_type,
139        DataType::Int8
140            | DataType::Int16
141            | DataType::Int32
142            | DataType::Int64
143            | DataType::Int128
144            | DataType::UInt8
145            | DataType::UInt16
146            | DataType::UInt32
147            | DataType::UInt64
148            | DataType::UInt128
149            | DataType::Float32
150            | DataType::Float64
151            | DataType::BigInteger
152            | DataType::BigDecimal
153            | DataType::IntSize
154            | DataType::UIntSize
155    )
156}
157
158/// Returns `true` when `data_type` supports range comparisons.
159#[inline]
160fn is_range_comparable_type(data_type: DataType) -> bool {
161    is_numeric_data_type(data_type) || matches!(data_type, DataType::String)
162}
163
164/// Returns `true` when a filter value is compatible with a schema field type.
165#[inline]
166fn value_matches_field_type(value: &Value, field_type: DataType) -> bool {
167    let value_type = value.data_type();
168    value_type == field_type || is_numeric_data_type(value_type) && is_numeric_data_type(field_type)
169}