Skip to main content

qubit_metadata/schema/
filter_validation.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 *    SPDX-License-Identifier: Apache-2.0
6 *
7 *    Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10//! Filter validation support for [`MetadataSchema`].
11
12use qubit_datatype::DataType;
13use qubit_value::Value;
14
15use super::metadata_field::MetadataField;
16use super::metadata_schema::MetadataSchema;
17use super::unknown_field_policy::UnknownFieldPolicy;
18use crate::{
19    Condition,
20    MetadataError,
21    MetadataFilter,
22    MetadataResult,
23};
24
25impl MetadataSchema {
26    /// Validates a metadata filter against this schema.
27    ///
28    /// # Errors
29    ///
30    /// Returns an error when the filter references an unknown field and this
31    /// schema rejects unknown fields, uses a range operator on a non-comparable
32    /// declared field, or compares a declared field with an incompatible value
33    /// type. Unknown filter fields are accepted when the schema's
34    /// [`UnknownFieldPolicy`] is [`UnknownFieldPolicy::Allow`].
35    pub fn validate_filter(&self, filter: &MetadataFilter) -> MetadataResult<()> {
36        filter.visit_conditions(|condition| self.validate_condition(condition))
37    }
38
39    /// Validates one filter condition against this schema.
40    fn validate_condition(&self, condition: &Condition) -> MetadataResult<()> {
41        match condition {
42            Condition::Equal { key, value } => self.validate_value_condition(key, "eq", value),
43            Condition::NotEqual { key, value } => self.validate_value_condition(key, "ne", value),
44            Condition::Less { key, value } => self.validate_range_condition(key, "lt", value),
45            Condition::LessEqual { key, value } => self.validate_range_condition(key, "le", value),
46            Condition::Greater { key, value } => self.validate_range_condition(key, "gt", value),
47            Condition::GreaterEqual { key, value } => {
48                self.validate_range_condition(key, "ge", value)
49            }
50            Condition::In { key, values } => {
51                for value in values {
52                    self.validate_value_condition(key, "in_set", value)?;
53                }
54                Ok(())
55            }
56            Condition::NotIn { key, values } => {
57                for value in values {
58                    self.validate_value_condition(key, "not_in_set", value)?;
59                }
60                Ok(())
61            }
62            Condition::Exists { key } | Condition::NotExists { key } => {
63                self.filter_field(key)?;
64                Ok(())
65            }
66        }
67    }
68
69    /// Validates a non-range value condition.
70    fn validate_value_condition(
71        &self,
72        key: &str,
73        operator: &'static str,
74        value: &Value,
75    ) -> MetadataResult<()> {
76        let Some(field) = self.filter_field(key)? else {
77            return Ok(());
78        };
79        if value_matches_field_type(value, field.data_type()) {
80            return Ok(());
81        }
82        Err(MetadataError::InvalidFilterOperator {
83            key: key.to_string(),
84            operator,
85            data_type: field.data_type(),
86            message: format!(
87                "filter value type {} is not compatible with field type {}",
88                value.data_type(),
89                field.data_type()
90            ),
91        })
92    }
93
94    /// Validates a range value condition.
95    fn validate_range_condition(
96        &self,
97        key: &str,
98        operator: &'static str,
99        value: &Value,
100    ) -> MetadataResult<()> {
101        let Some(field) = self.filter_field(key)? else {
102            return Ok(());
103        };
104        if !is_range_comparable_type(field.data_type()) {
105            return Err(MetadataError::InvalidFilterOperator {
106                key: key.to_string(),
107                operator,
108                data_type: field.data_type(),
109                message: "range operators require a numeric or string field".to_string(),
110            });
111        }
112        if value_matches_field_type(value, field.data_type()) {
113            return Ok(());
114        }
115        Err(MetadataError::InvalidFilterOperator {
116            key: key.to_string(),
117            operator,
118            data_type: field.data_type(),
119            message: format!(
120                "filter value type {} is not compatible with field type {}",
121                value.data_type(),
122                field.data_type()
123            ),
124        })
125    }
126
127    /// Returns the declared filter field, or accepts unknown fields when allowed.
128    fn filter_field(&self, key: &str) -> MetadataResult<Option<&MetadataField>> {
129        match self.field(key) {
130            Some(field) => Ok(Some(field)),
131            None if matches!(self.unknown_field_policy(), UnknownFieldPolicy::Allow) => Ok(None),
132            None => Err(MetadataError::UnknownFilterField {
133                key: key.to_string(),
134            }),
135        }
136    }
137}
138
139/// Returns `true` when `data_type` is numeric.
140#[inline]
141fn is_numeric_data_type(data_type: DataType) -> bool {
142    matches!(
143        data_type,
144        DataType::Int8
145            | DataType::Int16
146            | DataType::Int32
147            | DataType::Int64
148            | DataType::Int128
149            | DataType::UInt8
150            | DataType::UInt16
151            | DataType::UInt32
152            | DataType::UInt64
153            | DataType::UInt128
154            | DataType::Float32
155            | DataType::Float64
156            | DataType::BigInteger
157            | DataType::BigDecimal
158            | DataType::IntSize
159            | DataType::UIntSize
160    )
161}
162
163/// Returns `true` when `data_type` supports range comparisons.
164#[inline]
165fn is_range_comparable_type(data_type: DataType) -> bool {
166    is_numeric_data_type(data_type) || matches!(data_type, DataType::String)
167}
168
169/// Returns `true` when a filter value is compatible with a schema field type.
170#[inline]
171fn value_matches_field_type(value: &Value, field_type: DataType) -> bool {
172    let value_type = value.data_type();
173    value_type == field_type || is_numeric_data_type(value_type) && is_numeric_data_type(field_type)
174}