1use qubit_datatype::DataType;
13use qubit_value::Value;
14
15use super::metadata_field::MetadataField;
16use super::metadata_schema::MetadataSchema;
17use super::unknown_field_policy::UnknownFieldPolicy;
18use crate::{
19 Condition,
20 MetadataError,
21 MetadataFilter,
22 MetadataResult,
23 MetadataValidationError,
24 MetadataValidationResult,
25 NumberComparisonPolicy,
26};
27
28impl MetadataSchema {
29 pub fn validate_filter(&self, filter: &MetadataFilter) -> MetadataValidationResult<()> {
38 let mut issues = Vec::new();
39 let number_comparison_policy = filter.options().number_comparison_policy;
40 if let Err(error) = filter.visit_conditions(|condition| {
41 self.collect_condition_issues(condition, number_comparison_policy, &mut issues);
42 Ok(())
43 }) {
44 issues.push(error);
45 }
46 if let Some(error) = MetadataValidationError::from_issues(issues) {
47 Err(error)
48 } else {
49 Ok(())
50 }
51 }
52
53 fn collect_condition_issues(
55 &self,
56 condition: &Condition,
57 number_comparison_policy: NumberComparisonPolicy,
58 issues: &mut Vec<MetadataError>,
59 ) {
60 match condition {
61 Condition::Equal { key, value } => collect_issue(
62 issues,
63 self.validate_value_condition(key, "eq", value, number_comparison_policy),
64 ),
65 Condition::NotEqual { key, value } => collect_issue(
66 issues,
67 self.validate_value_condition(key, "ne", value, number_comparison_policy),
68 ),
69 Condition::Less { key, value } => collect_issue(
70 issues,
71 self.validate_range_condition(key, "lt", value, number_comparison_policy),
72 ),
73 Condition::LessEqual { key, value } => collect_issue(
74 issues,
75 self.validate_range_condition(key, "le", value, number_comparison_policy),
76 ),
77 Condition::Greater { key, value } => collect_issue(
78 issues,
79 self.validate_range_condition(key, "gt", value, number_comparison_policy),
80 ),
81 Condition::GreaterEqual { key, value } => collect_issue(
82 issues,
83 self.validate_range_condition(key, "ge", value, number_comparison_policy),
84 ),
85 Condition::In { key, values } => {
86 self.collect_set_value_condition_issues(key, "in_set", values, number_comparison_policy, issues);
87 }
88 Condition::NotIn { key, values } => {
89 self.collect_set_value_condition_issues(key, "not_in_set", values, number_comparison_policy, issues);
90 }
91 Condition::Exists { key } | Condition::NotExists { key } => {
92 collect_issue(issues, self.filter_field(key).map(|_| ()));
93 }
94 }
95 }
96
97 fn collect_set_value_condition_issues(
99 &self,
100 key: &str,
101 operator: &'static str,
102 values: &[Value],
103 number_comparison_policy: NumberComparisonPolicy,
104 issues: &mut Vec<MetadataError>,
105 ) {
106 let field = match self.filter_field(key) {
107 Ok(field) => field,
108 Err(error) => {
109 issues.push(error);
110 return;
111 }
112 };
113 let Some(field) = field else {
114 return;
115 };
116 for value in values {
117 if value_matches_field_type(value, field.data_type(), number_comparison_policy) {
118 continue;
119 }
120 issues.push(MetadataError::InvalidFilterOperator {
121 key: key.to_string(),
122 operator,
123 data_type: field.data_type(),
124 message: format!(
125 "filter value type {} is not compatible with field type {} under {:?} number comparison policy",
126 value.data_type(),
127 field.data_type(),
128 number_comparison_policy
129 ),
130 });
131 }
132 }
133
134 fn validate_value_condition(
136 &self,
137 key: &str,
138 operator: &'static str,
139 value: &Value,
140 number_comparison_policy: NumberComparisonPolicy,
141 ) -> MetadataResult<()> {
142 let Some(field) = self.filter_field(key)? else {
143 return Ok(());
144 };
145 if value_matches_field_type(value, field.data_type(), number_comparison_policy) {
146 return Ok(());
147 }
148 Err(MetadataError::InvalidFilterOperator {
149 key: key.to_string(),
150 operator,
151 data_type: field.data_type(),
152 message: format!(
153 "filter value type {} is not compatible with field type {} under {:?} number comparison policy",
154 value.data_type(),
155 field.data_type(),
156 number_comparison_policy
157 ),
158 })
159 }
160
161 fn validate_range_condition(
163 &self,
164 key: &str,
165 operator: &'static str,
166 value: &Value,
167 number_comparison_policy: NumberComparisonPolicy,
168 ) -> MetadataResult<()> {
169 let Some(field) = self.filter_field(key)? else {
170 return Ok(());
171 };
172 if !is_range_comparable_type(field.data_type()) {
173 return Err(MetadataError::InvalidFilterOperator {
174 key: key.to_string(),
175 operator,
176 data_type: field.data_type(),
177 message: "range operators require a numeric or string field".to_string(),
178 });
179 }
180 if value_matches_field_type(value, field.data_type(), number_comparison_policy) {
181 return Ok(());
182 }
183 Err(MetadataError::InvalidFilterOperator {
184 key: key.to_string(),
185 operator,
186 data_type: field.data_type(),
187 message: format!(
188 "filter value type {} is not compatible with field type {} under {:?} number comparison policy",
189 value.data_type(),
190 field.data_type(),
191 number_comparison_policy
192 ),
193 })
194 }
195
196 fn filter_field(&self, key: &str) -> MetadataResult<Option<&MetadataField>> {
198 match self.field(key) {
199 Some(field) => Ok(Some(field)),
200 None if matches!(self.unknown_field_policy(), UnknownFieldPolicy::Allow) => Ok(None),
201 None => Err(MetadataError::UnknownFilterField { key: key.to_string() }),
202 }
203 }
204}
205
206#[inline]
208fn collect_issue(issues: &mut Vec<MetadataError>, result: MetadataResult<()>) {
209 if let Err(error) = result {
210 issues.push(error);
211 }
212}
213
214#[inline]
216fn is_numeric_data_type(data_type: DataType) -> bool {
217 matches!(
218 data_type,
219 DataType::Int8
220 | DataType::Int16
221 | DataType::Int32
222 | DataType::Int64
223 | DataType::Int128
224 | DataType::UInt8
225 | DataType::UInt16
226 | DataType::UInt32
227 | DataType::UInt64
228 | DataType::UInt128
229 | DataType::Float32
230 | DataType::Float64
231 | DataType::BigInteger
232 | DataType::BigDecimal
233 | DataType::IntSize
234 | DataType::UIntSize
235 )
236}
237
238#[inline]
240fn is_float_data_type(data_type: DataType) -> bool {
241 matches!(data_type, DataType::Float32 | DataType::Float64)
242}
243
244#[inline]
246fn is_big_number_data_type(data_type: DataType) -> bool {
247 matches!(data_type, DataType::BigInteger | DataType::BigDecimal)
248}
249
250#[inline]
252fn is_range_comparable_type(data_type: DataType) -> bool {
253 is_numeric_data_type(data_type) || matches!(data_type, DataType::String)
254}
255
256#[inline]
258fn value_matches_field_type(
259 value: &Value,
260 field_type: DataType,
261 number_comparison_policy: NumberComparisonPolicy,
262) -> bool {
263 let value_type = value.data_type();
264 if value_type == field_type {
265 return true;
266 }
267 if !is_numeric_data_type(value_type) || !is_numeric_data_type(field_type) {
268 return false;
269 }
270 if matches!(number_comparison_policy, NumberComparisonPolicy::Approximate) {
271 return true;
272 }
273 value_matches_numeric_field_conservatively(value, field_type)
274}
275
276fn value_matches_numeric_field_conservatively(value: &Value, field_type: DataType) -> bool {
278 let value_type = value.data_type();
279 if !is_float_data_type(value_type) && !is_float_data_type(field_type) {
280 return true;
281 }
282 if is_float_data_type(value_type) && is_float_data_type(field_type) {
283 return true;
284 }
285 if is_big_number_data_type(value_type) || is_big_number_data_type(field_type) {
286 return false;
287 }
288 if is_float_data_type(value_type) {
289 return float_value_fits_integer_field(value, field_type);
290 }
291 integer_value_is_safe_for_float_field(value)
292}
293
294const MAX_SAFE_INTEGER_F64_U128: u128 = 9_007_199_254_740_992;
295const I64_MIN_F64: f64 = -9_223_372_036_854_775_808.0;
296const I64_EXCLUSIVE_MAX_F64: f64 = 9_223_372_036_854_775_808.0;
297const U64_EXCLUSIVE_MAX_F64: f64 = 18_446_744_073_709_551_616.0;
298
299#[inline]
301fn finite_float_value(value: &Value) -> Option<f64> {
302 let number = value.to::<f64>().ok()?;
303 number.is_finite().then_some(number)
304}
305
306fn float_value_fits_integer_field(value: &Value, field_type: DataType) -> bool {
308 let Some(number) = finite_float_value(value) else {
309 return false;
310 };
311 if number.fract() != 0.0 {
312 return false;
313 }
314 if matches!(field_type, DataType::Int128 | DataType::UInt128) {
315 return false;
316 }
317 if matches!(
318 field_type,
319 DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 | DataType::IntSize
320 ) {
321 return (I64_MIN_F64..I64_EXCLUSIVE_MAX_F64).contains(&number);
322 }
323 matches!(
324 field_type,
325 DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 | DataType::UIntSize
326 ) && (0.0..U64_EXCLUSIVE_MAX_F64).contains(&number)
327}
328
329fn integer_value_is_safe_for_float_field(value: &Value) -> bool {
331 if let Ok(value) = value.to::<i128>() {
332 return value.unsigned_abs() <= MAX_SAFE_INTEGER_F64_U128;
333 }
334 value.to::<u128>().is_ok_and(|value| value <= MAX_SAFE_INTEGER_F64_U128)
335}