1use qubit_datatype::DataType;
13use qubit_value::Value;
14
15use super::metadata_field::MetadataField;
16use super::metadata_schema::MetadataSchema;
17use super::unknown_field_policy::UnknownFieldPolicy;
18use crate::{
19 Condition,
20 MetadataError,
21 MetadataFilter,
22 MetadataResult,
23 MetadataValidationError,
24 MetadataValidationResult,
25 NumberComparisonPolicy,
26};
27
28impl MetadataSchema {
29 pub fn validate_filter(&self, filter: &MetadataFilter) -> MetadataValidationResult<()> {
38 let mut issues = Vec::new();
39 let number_comparison_policy = filter.options().number_comparison_policy;
40 if let Err(error) = filter.visit_conditions(|condition| {
41 self.collect_condition_issues(condition, number_comparison_policy, &mut issues);
42 Ok(())
43 }) {
44 issues.push(error);
45 }
46 if let Some(error) = MetadataValidationError::from_issues(issues) {
47 Err(error)
48 } else {
49 Ok(())
50 }
51 }
52
53 fn collect_condition_issues(
55 &self,
56 condition: &Condition,
57 number_comparison_policy: NumberComparisonPolicy,
58 issues: &mut Vec<MetadataError>,
59 ) {
60 match condition {
61 Condition::Equal { key, value } => collect_issue(
62 issues,
63 self.validate_value_condition(key, "eq", value, number_comparison_policy),
64 ),
65 Condition::NotEqual { key, value } => collect_issue(
66 issues,
67 self.validate_value_condition(key, "ne", value, number_comparison_policy),
68 ),
69 Condition::Less { key, value } => collect_issue(
70 issues,
71 self.validate_range_condition(key, "lt", value, number_comparison_policy),
72 ),
73 Condition::LessEqual { key, value } => collect_issue(
74 issues,
75 self.validate_range_condition(key, "le", value, number_comparison_policy),
76 ),
77 Condition::Greater { key, value } => collect_issue(
78 issues,
79 self.validate_range_condition(key, "gt", value, number_comparison_policy),
80 ),
81 Condition::GreaterEqual { key, value } => collect_issue(
82 issues,
83 self.validate_range_condition(key, "ge", value, number_comparison_policy),
84 ),
85 Condition::In { key, values } => {
86 self.collect_set_value_condition_issues(
87 key,
88 "in_set",
89 values,
90 number_comparison_policy,
91 issues,
92 );
93 }
94 Condition::NotIn { key, values } => {
95 self.collect_set_value_condition_issues(
96 key,
97 "not_in_set",
98 values,
99 number_comparison_policy,
100 issues,
101 );
102 }
103 Condition::Exists { key } | Condition::NotExists { key } => {
104 collect_issue(issues, self.filter_field(key).map(|_| ()));
105 }
106 }
107 }
108
109 fn collect_set_value_condition_issues(
111 &self,
112 key: &str,
113 operator: &'static str,
114 values: &[Value],
115 number_comparison_policy: NumberComparisonPolicy,
116 issues: &mut Vec<MetadataError>,
117 ) {
118 let field = match self.filter_field(key) {
119 Ok(field) => field,
120 Err(error) => {
121 issues.push(error);
122 return;
123 }
124 };
125 let Some(field) = field else {
126 return;
127 };
128 for value in values {
129 if value_matches_field_type(value, field.data_type(), number_comparison_policy) {
130 continue;
131 }
132 issues.push(MetadataError::InvalidFilterOperator {
133 key: key.to_string(),
134 operator,
135 data_type: field.data_type(),
136 message: format!(
137 "filter value type {} is not compatible with field type {} under {:?} number comparison policy",
138 value.data_type(),
139 field.data_type(),
140 number_comparison_policy
141 ),
142 });
143 }
144 }
145
146 fn validate_value_condition(
148 &self,
149 key: &str,
150 operator: &'static str,
151 value: &Value,
152 number_comparison_policy: NumberComparisonPolicy,
153 ) -> MetadataResult<()> {
154 let Some(field) = self.filter_field(key)? else {
155 return Ok(());
156 };
157 if value_matches_field_type(value, field.data_type(), number_comparison_policy) {
158 return Ok(());
159 }
160 Err(MetadataError::InvalidFilterOperator {
161 key: key.to_string(),
162 operator,
163 data_type: field.data_type(),
164 message: format!(
165 "filter value type {} is not compatible with field type {} under {:?} number comparison policy",
166 value.data_type(),
167 field.data_type(),
168 number_comparison_policy
169 ),
170 })
171 }
172
173 fn validate_range_condition(
175 &self,
176 key: &str,
177 operator: &'static str,
178 value: &Value,
179 number_comparison_policy: NumberComparisonPolicy,
180 ) -> MetadataResult<()> {
181 let Some(field) = self.filter_field(key)? else {
182 return Ok(());
183 };
184 if !is_range_comparable_type(field.data_type()) {
185 return Err(MetadataError::InvalidFilterOperator {
186 key: key.to_string(),
187 operator,
188 data_type: field.data_type(),
189 message: "range operators require a numeric or string field".to_string(),
190 });
191 }
192 if value_matches_field_type(value, field.data_type(), number_comparison_policy) {
193 return Ok(());
194 }
195 Err(MetadataError::InvalidFilterOperator {
196 key: key.to_string(),
197 operator,
198 data_type: field.data_type(),
199 message: format!(
200 "filter value type {} is not compatible with field type {} under {:?} number comparison policy",
201 value.data_type(),
202 field.data_type(),
203 number_comparison_policy
204 ),
205 })
206 }
207
208 fn filter_field(&self, key: &str) -> MetadataResult<Option<&MetadataField>> {
210 match self.field(key) {
211 Some(field) => Ok(Some(field)),
212 None if matches!(self.unknown_field_policy(), UnknownFieldPolicy::Allow) => Ok(None),
213 None => Err(MetadataError::UnknownFilterField {
214 key: key.to_string(),
215 }),
216 }
217 }
218}
219
220#[inline]
222fn collect_issue(issues: &mut Vec<MetadataError>, result: MetadataResult<()>) {
223 if let Err(error) = result {
224 issues.push(error);
225 }
226}
227
228#[inline]
230fn is_numeric_data_type(data_type: DataType) -> bool {
231 matches!(
232 data_type,
233 DataType::Int8
234 | DataType::Int16
235 | DataType::Int32
236 | DataType::Int64
237 | DataType::Int128
238 | DataType::UInt8
239 | DataType::UInt16
240 | DataType::UInt32
241 | DataType::UInt64
242 | DataType::UInt128
243 | DataType::Float32
244 | DataType::Float64
245 | DataType::BigInteger
246 | DataType::BigDecimal
247 | DataType::IntSize
248 | DataType::UIntSize
249 )
250}
251
252#[inline]
254fn is_float_data_type(data_type: DataType) -> bool {
255 matches!(data_type, DataType::Float32 | DataType::Float64)
256}
257
258#[inline]
260fn is_big_number_data_type(data_type: DataType) -> bool {
261 matches!(data_type, DataType::BigInteger | DataType::BigDecimal)
262}
263
264#[inline]
266fn is_range_comparable_type(data_type: DataType) -> bool {
267 is_numeric_data_type(data_type) || matches!(data_type, DataType::String)
268}
269
270#[inline]
272fn value_matches_field_type(
273 value: &Value,
274 field_type: DataType,
275 number_comparison_policy: NumberComparisonPolicy,
276) -> bool {
277 let value_type = value.data_type();
278 if value_type == field_type {
279 return true;
280 }
281 if !is_numeric_data_type(value_type) || !is_numeric_data_type(field_type) {
282 return false;
283 }
284 if matches!(
285 number_comparison_policy,
286 NumberComparisonPolicy::Approximate
287 ) {
288 return true;
289 }
290 value_matches_numeric_field_conservatively(value, field_type)
291}
292
293fn value_matches_numeric_field_conservatively(value: &Value, field_type: DataType) -> bool {
295 let value_type = value.data_type();
296 if !is_float_data_type(value_type) && !is_float_data_type(field_type) {
297 return true;
298 }
299 if is_float_data_type(value_type) && is_float_data_type(field_type) {
300 return true;
301 }
302 if is_big_number_data_type(value_type) || is_big_number_data_type(field_type) {
303 return false;
304 }
305 if is_float_data_type(value_type) {
306 return float_value_fits_integer_field(value, field_type);
307 }
308 integer_value_is_safe_for_float_field(value)
309}
310
/// 2^53: the largest integer magnitude accepted when an integer value is
/// compared with a float field.
const MAX_SAFE_INTEGER_F64_U128: u128 = 1 << 53;
/// -2^63 (`i64::MIN`) as an `f64`; inclusive lower bound for signed integer fields.
// The cast is exact because the value is a power of two.
const I64_MIN_F64: f64 = i64::MIN as f64;
/// 2^63 as an `f64`; exclusive upper bound for signed integer fields.
const I64_EXCLUSIVE_MAX_F64: f64 = (1_u128 << 63) as f64;
/// 2^64 as an `f64`; exclusive upper bound for unsigned integer fields.
const U64_EXCLUSIVE_MAX_F64: f64 = (1_u128 << 64) as f64;
315
316#[inline]
318fn finite_float_value(value: &Value) -> Option<f64> {
319 let number = value.to::<f64>().ok()?;
320 number.is_finite().then_some(number)
321}
322
323fn float_value_fits_integer_field(value: &Value, field_type: DataType) -> bool {
325 let Some(number) = finite_float_value(value) else {
326 return false;
327 };
328 if number.fract() != 0.0 {
329 return false;
330 }
331 if matches!(field_type, DataType::Int128 | DataType::UInt128) {
332 return false;
333 }
334 if matches!(
335 field_type,
336 DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 | DataType::IntSize
337 ) {
338 return (I64_MIN_F64..I64_EXCLUSIVE_MAX_F64).contains(&number);
339 }
340 matches!(
341 field_type,
342 DataType::UInt8
343 | DataType::UInt16
344 | DataType::UInt32
345 | DataType::UInt64
346 | DataType::UIntSize
347 ) && (0.0..U64_EXCLUSIVE_MAX_F64).contains(&number)
348}
349
350fn integer_value_is_safe_for_float_field(value: &Value) -> bool {
352 if let Ok(value) = value.to::<i128>() {
353 return value.unsigned_abs() <= MAX_SAFE_INTEGER_F64_U128;
354 }
355 value
356 .to::<u128>()
357 .is_ok_and(|value| value <= MAX_SAFE_INTEGER_F64_U128)
358}