1use super::{Evaluator, types::*};
2use serde_json::{Value, Map as JsonMap};
3use super::super::compiled::CompiledLogic;
4use super::helpers;
5
6#[cfg(feature = "parallel")]
7use rayon::prelude::*;
8
9#[cfg(feature = "parallel")]
10const PARALLEL_THRESHOLD: usize = 1000; impl Evaluator {
13 pub(super) fn eval_quantifier(&self, quantifier: Quantifier, array_expr: &CompiledLogic, logic_expr: &CompiledLogic, user_data: &Value, internal_context: &Value, depth: usize) -> Result<Value, String> {
15 let array_val = self.evaluate_with_context(array_expr, user_data, internal_context, depth + 1)?;
16 if let Value::Array(arr) = array_val {
17 #[cfg(feature = "parallel")]
19 if arr.len() >= PARALLEL_THRESHOLD {
20 let result = match quantifier {
21 Quantifier::All => {
22 arr.par_iter()
23 .try_fold(|| true, |_, item| -> Result<bool, String> {
24 let result = self.evaluate_with_context(logic_expr, item, &Value::Null, depth + 1)?;
26 Ok(helpers::is_truthy(&result))
27 })
28 .try_reduce(|| true, |a, b| -> Result<bool, String> { Ok(a && b) })?
29 },
30 Quantifier::Some => {
31 arr.par_iter()
32 .try_fold(|| false, |_, item| -> Result<bool, String> {
33 let result = self.evaluate_with_context(logic_expr, item, &Value::Null, depth + 1)?;
34 Ok(helpers::is_truthy(&result))
35 })
36 .try_reduce(|| false, |a, b| -> Result<bool, String> { Ok(a || b) })?
37 },
38 Quantifier::None => {
39 arr.par_iter()
40 .try_fold(|| true, |_, item| -> Result<bool, String> {
41 let result = self.evaluate_with_context(logic_expr, item, &Value::Null, depth + 1)?;
42 Ok(!helpers::is_truthy(&result))
43 })
44 .try_reduce(|| true, |a, b| -> Result<bool, String> { Ok(a && b) })?
45 }
46 };
47 return Ok(Value::Bool(result));
48 }
49
50 for item in arr {
51 let result = self.evaluate_with_context(logic_expr, &item, &Value::Null, depth + 1)?;
52 let truthy = helpers::is_truthy(&result);
53 match quantifier {
54 Quantifier::All if !truthy => return Ok(Value::Bool(false)),
55 Quantifier::Some if truthy => return Ok(Value::Bool(true)),
56 Quantifier::None if truthy => return Ok(Value::Bool(false)),
57 _ => {}
58 }
59 }
60 Ok(Value::Bool(match quantifier {
61 Quantifier::All => true,
62 Quantifier::Some => false,
63 Quantifier::None => true,
64 }))
65 } else {
66 Ok(Value::Bool(match quantifier {
67 Quantifier::All | Quantifier::Some => false,
68 Quantifier::None => true,
69 }))
70 }
71 }
72
73 pub(super) fn eval_map(&self, array_expr: &CompiledLogic, logic_expr: &CompiledLogic, user_data: &Value, internal_context: &Value, depth: usize) -> Result<Value, String> {
75 let array_val = self.evaluate_with_context(array_expr, user_data, internal_context, depth + 1)?;
76 if let Value::Array(arr) = array_val {
77 #[cfg(feature = "parallel")]
79 if arr.len() >= PARALLEL_THRESHOLD {
80 let results: Result<Vec<_>, String> = arr.par_iter()
81 .map(|item| self.evaluate_with_context(logic_expr, item, &Value::Null, depth + 1))
82 .collect();
83 return Ok(Value::Array(results?));
84 }
85
86 let mut results = Vec::with_capacity(arr.len());
87 for item in &arr {
88 results.push(self.evaluate_with_context(logic_expr, item, &Value::Null, depth + 1)?);
89 }
90 Ok(Value::Array(results))
91 } else {
92 Ok(Value::Array(vec![]))
93 }
94 }
95
96 pub(super) fn eval_filter(&self, array_expr: &CompiledLogic, logic_expr: &CompiledLogic, user_data: &Value, internal_context: &Value, depth: usize) -> Result<Value, String> {
98 let array_val = self.evaluate_with_context(array_expr, user_data, internal_context, depth + 1)?;
99 if let Value::Array(arr) = array_val {
100 #[cfg(feature = "parallel")]
102 if arr.len() >= PARALLEL_THRESHOLD {
103 let results: Result<Vec<_>, String> = arr.into_par_iter()
104 .filter_map(|item| {
105 match self.evaluate_with_context(logic_expr, &item, &Value::Null, depth + 1) {
106 Ok(result) if helpers::is_truthy(&result) => Some(Ok(item)),
107 Ok(_) => None,
108 Err(e) => Some(Err(e)),
109 }
110 })
111 .collect();
112 return Ok(Value::Array(results?));
113 }
114
115 let mut results = Vec::with_capacity(arr.len());
116 for item in arr.into_iter() {
117 let result = self.evaluate_with_context(logic_expr, &item, &Value::Null, depth + 1)?;
118 if helpers::is_truthy(&result) {
119 results.push(item);
120 }
121 }
122 Ok(Value::Array(results))
123 } else {
124 Ok(Value::Array(vec![]))
125 }
126 }
127
128 pub(super) fn eval_reduce(&self, array_expr: &CompiledLogic, logic_expr: &CompiledLogic, initial_expr: &CompiledLogic, user_data: &Value, internal_context: &Value, depth: usize) -> Result<Value, String> {
130 let array_val = self.evaluate_with_context(array_expr, user_data, internal_context, depth + 1)?;
131 let mut accumulator = self.evaluate_with_context(initial_expr, user_data, internal_context, depth + 1)?;
132
133 if let Value::Array(arr) = array_val {
134 for item in arr {
135 let mut context = JsonMap::with_capacity(2);
137 context.insert("current".to_string(), item);
138 context.insert("accumulator".to_string(), accumulator);
139 let combined = Value::Object(context);
140 accumulator = self.evaluate_with_context(logic_expr, &combined, &Value::Null, depth + 1)?;
141 }
142 }
143 Ok(accumulator)
144 }
145
146 pub(super) fn eval_merge(&self, items: &[CompiledLogic], user_data: &Value, internal_context: &Value, depth: usize) -> Result<Value, String> {
148 let mut merged = Vec::new();
149 for item in items {
150 let val = self.evaluate_with_context(item, user_data, internal_context, depth + 1)?;
151 if let Value::Array(arr) = val {
152 merged.extend(arr);
153 } else {
154 merged.push(val);
155 }
156 }
157 Ok(Value::Array(merged))
158 }
159
160 pub(super) fn eval_in(&self, value_expr: &CompiledLogic, array_expr: &CompiledLogic, user_data: &Value, internal_context: &Value, depth: usize) -> Result<Value, String> {
162 use ahash::{AHashSet, RandomState};
163
164 const HASH_SET_THRESHOLD: usize = 32;
165
166 let value = self.evaluate_with_context(value_expr, user_data, internal_context, depth + 1)?;
167 let array_val = self.evaluate_with_context(array_expr, user_data, internal_context, depth + 1)?;
168
169 if let Value::Array(arr) = array_val {
170 if arr.len() > HASH_SET_THRESHOLD {
171 if let Some(key) = helpers::scalar_hash_key(&value) {
172 let mut set = AHashSet::with_capacity_and_hasher(arr.len(), RandomState::new());
173 let mut all_scalar = true;
174 for item in &arr {
175 if let Some(item_key) = helpers::scalar_hash_key(item) {
176 set.insert(item_key);
177 } else {
178 all_scalar = false;
179 break;
180 }
181 }
182 if all_scalar {
183 return Ok(Value::Bool(set.contains(&key)));
184 }
185 }
186 }
187 for item in arr {
188 if helpers::loose_equal(&value, &item) {
189 return Ok(Value::Bool(true));
190 }
191 }
192 Ok(Value::Bool(false))
193 } else if let Value::String(s) = array_val {
194 if let Value::String(needle) = value {
195 Ok(Value::Bool(s.contains(&needle)))
196 } else {
197 Ok(Value::Bool(false))
198 }
199 } else {
200 Ok(Value::Bool(false))
201 }
202 }
203
204 pub(super) fn eval_sum(&self, array_expr: &CompiledLogic, field_expr: &Option<Box<CompiledLogic>>, threshold_expr: &Option<Box<CompiledLogic>>, user_data: &Value, internal_context: &Value, depth: usize) -> Result<Value, String> {
206 let array_val = self.evaluate_with_context(array_expr, user_data, internal_context, depth + 1)?;
207
208 let threshold = if let Some(thresh_e) = threshold_expr {
210 let thresh_val = self.evaluate_with_context(thresh_e, user_data, internal_context, depth + 1)?;
211 helpers::to_f64(&thresh_val) as i64
212 } else {
213 -1 };
215
216 let sum = match &array_val {
217 Value::Array(arr) => {
218 let field_name_opt = if let Some(field_e) = field_expr {
220 let field_val = self.evaluate_with_context(field_e, user_data, internal_context, depth + 1)?;
221 match field_val {
222 Value::String(s) => Some(s),
223 Value::Null => None, _ => None,
225 }
226 } else {
227 None
228 };
229
230 let items_to_process = if threshold >= 0 {
232 let limit = (threshold as usize + 1).min(arr.len());
233 &arr[..limit]
234 } else {
235 arr
236 };
237
238 if let Some(field_name) = field_name_opt {
239 #[cfg(feature = "parallel")]
241 if items_to_process.len() >= PARALLEL_THRESHOLD {
242 items_to_process.par_iter()
243 .filter_map(|item| {
244 if let Value::Object(obj) = item {
245 obj.get(&field_name).map(|val| helpers::to_f64(val))
246 } else {
247 None
248 }
249 })
250 .sum()
251 } else {
252 let mut sum = 0.0_f64;
253 for item in items_to_process {
254 if let Value::Object(obj) = item {
255 if let Some(val) = obj.get(&field_name) {
256 sum += helpers::to_f64(val);
257 }
258 }
259 }
260 sum
261 }
262
263 #[cfg(not(feature = "parallel"))]
264 {
265 let mut sum = 0.0_f64;
266 for item in items_to_process {
267 if let Value::Object(obj) = item {
268 if let Some(val) = obj.get(&field_name) {
269 sum += helpers::to_f64(val);
270 }
271 }
272 }
273 sum
274 }
275 } else {
276 #[cfg(feature = "parallel")]
278 if items_to_process.len() >= PARALLEL_THRESHOLD {
279 items_to_process.par_iter().map(|item| helpers::to_f64(item)).sum()
280 } else {
281 items_to_process.iter().map(|item| helpers::to_f64(item)).sum()
282 }
283
284 #[cfg(not(feature = "parallel"))]
285 items_to_process.iter().map(|item| helpers::to_f64(item)).sum()
286 }
287 }
288 _ => helpers::to_f64(&array_val),
289 };
290
291 Ok(self.f64_to_json(sum))
292 }
293
294 pub(super) fn eval_for(&self, start_expr: &CompiledLogic, end_expr: &CompiledLogic, logic_expr: &CompiledLogic, user_data: &Value, internal_context: &Value, depth: usize) -> Result<Value, String> {
299 let next_depth = depth + 1;
300 let start_val = self.evaluate_with_context(start_expr, user_data, internal_context, next_depth)?;
301 let end_val = self.evaluate_with_context(end_expr, user_data, internal_context, next_depth)?;
302 let start = helpers::to_number(&start_val) as i64;
303 let end = helpers::to_number(&end_val) as i64;
304
305 let mut results = Vec::new();
307
308 for i in start..end {
310 let loop_context = serde_json::json!({
312 "$loopIteration": i
313 });
314
315 let result = self.evaluate_with_context(logic_expr, user_data, &loop_context, next_depth)?;
317 results.push(result);
318 }
319
320 Ok(Value::Array(results))
321 }
322
323 pub(super) fn eval_multiplies(&self, items: &[CompiledLogic], user_data: &Value, internal_context: &Value, depth: usize) -> Result<Value, String> {
326 if items.len() == 1 {
331 if let CompiledLogic::For(start_expr, end_expr, logic_expr) = &items[0] {
332 return self.eval_multiplies_for(start_expr, end_expr, logic_expr, user_data, internal_context, depth);
333 }
334 }
335
336 let values = self.flatten_array_values(items, user_data, internal_context, depth)?;
338 if values.is_empty() { return Ok(Value::Null); }
339 if values.len() == 1 { return Ok(self.f64_to_json(values[0])); }
340 let result = values.iter().skip(1).fold(values[0], |acc, n| acc * n);
341 Ok(self.f64_to_json(result))
342 }
343
344 fn eval_multiplies_for(&self, start_expr: &CompiledLogic, end_expr: &CompiledLogic, logic_expr: &CompiledLogic, user_data: &Value, internal_context: &Value, depth: usize) -> Result<Value, String> {
347 let next_depth = depth + 1;
348 let start_val = self.evaluate_with_context(start_expr, user_data, internal_context, next_depth)?;
349 let end_val = self.evaluate_with_context(end_expr, user_data, internal_context, next_depth)?;
350 let start = helpers::to_number(&start_val) as i64;
351 let end = helpers::to_number(&end_val) as i64;
352
353 if start >= end {
354 return Ok(Value::Null);
355 }
356
357 #[cfg(feature = "parallel")]
358 let range_size = (end - start) as usize;
359
360 #[cfg(feature = "parallel")]
362 if range_size >= PARALLEL_THRESHOLD {
363 let result = (start..end)
364 .into_par_iter()
365 .try_fold(|| 1.0_f64, |product, i| -> Result<f64, String> {
366 let loop_context = serde_json::json!({
367 "$loopIteration": i
368 });
369 let val = self.evaluate_with_context(logic_expr, user_data, &loop_context, next_depth)?;
370 Ok(product * helpers::to_f64(&val))
371 })
372 .try_reduce(|| 1.0, |a, b| -> Result<f64, String> { Ok(a * b) })?;
373
374 return Ok(self.f64_to_json(result));
375 }
376
377 let mut product = 1.0_f64;
379 for i in start..end {
380 let loop_context = serde_json::json!({
381 "$loopIteration": i
382 });
383 let val = self.evaluate_with_context(logic_expr, user_data, &loop_context, next_depth)?;
384 product *= helpers::to_f64(&val);
385 }
386 Ok(self.f64_to_json(product))
387 }
388
389 pub(super) fn eval_divides(&self, items: &[CompiledLogic], user_data: &Value, internal_context: &Value, depth: usize) -> Result<Value, String> {
391 let values = self.flatten_array_values(items, user_data, internal_context, depth)?;
392 if values.is_empty() { return Ok(Value::Null); }
393 if values.len() == 1 { return Ok(self.f64_to_json(values[0])); }
394 let result = values.iter().skip(1).fold(values[0], |acc, n| {
395 if *n == 0.0 { acc } else { acc / n }
396 });
397 Ok(self.f64_to_json(result))
398 }
399}