json_eval_rs/rlogic/evaluator/
array_ops.rs

1use super::{Evaluator, types::*};
2use serde_json::{Value, Map as JsonMap};
3use super::super::compiled::CompiledLogic;
4use super::helpers;
5
6#[cfg(feature = "parallel")]
7use rayon::prelude::*;
8
9#[cfg(feature = "parallel")]
const PARALLEL_THRESHOLD: usize = 1000; // Minimum element count (1000) at which array operations switch to rayon parallel iterators
11
12impl Evaluator {
13    /// Execute array quantifier (all/some/none) - ZERO-COPY
14    pub(super) fn eval_quantifier(&self, quantifier: Quantifier, array_expr: &CompiledLogic, logic_expr: &CompiledLogic, user_data: &Value, internal_context: &Value, depth: usize) -> Result<Value, String> {
15        let array_val = self.evaluate_with_context(array_expr, user_data, internal_context, depth + 1)?;
16        if let Value::Array(arr) = array_val {
17            // Parallelize for large arrays
18            #[cfg(feature = "parallel")]
19            if arr.len() >= PARALLEL_THRESHOLD {
20                let result = match quantifier {
21                    Quantifier::All => {
22                        arr.par_iter()
23                            .try_fold(|| true, |_, item| -> Result<bool, String> {
24                                // Use item as user_data, no internal context needed
25                                let result = self.evaluate_with_context(logic_expr, item, &Value::Null, depth + 1)?;
26                                Ok(helpers::is_truthy(&result))
27                            })
28                            .try_reduce(|| true, |a, b| -> Result<bool, String> { Ok(a && b) })?
29                    },
30                    Quantifier::Some => {
31                        arr.par_iter()
32                            .try_fold(|| false, |_, item| -> Result<bool, String> {
33                                let result = self.evaluate_with_context(logic_expr, item, &Value::Null, depth + 1)?;
34                                Ok(helpers::is_truthy(&result))
35                            })
36                            .try_reduce(|| false, |a, b| -> Result<bool, String> { Ok(a || b) })?
37                    },
38                    Quantifier::None => {
39                        arr.par_iter()
40                            .try_fold(|| true, |_, item| -> Result<bool, String> {
41                                let result = self.evaluate_with_context(logic_expr, item, &Value::Null, depth + 1)?;
42                                Ok(!helpers::is_truthy(&result))
43                            })
44                            .try_reduce(|| true, |a, b| -> Result<bool, String> { Ok(a && b) })?
45                    }
46                };
47                return Ok(Value::Bool(result));
48            }
49            
50            for item in arr {
51                let result = self.evaluate_with_context(logic_expr, &item, &Value::Null, depth + 1)?;
52                let truthy = helpers::is_truthy(&result);
53                match quantifier {
54                    Quantifier::All if !truthy => return Ok(Value::Bool(false)),
55                    Quantifier::Some if truthy => return Ok(Value::Bool(true)),
56                    Quantifier::None if truthy => return Ok(Value::Bool(false)),
57                    _ => {}
58                }
59            }
60            Ok(Value::Bool(match quantifier {
61                Quantifier::All => true,
62                Quantifier::Some => false,
63                Quantifier::None => true,
64            }))
65        } else {
66            Ok(Value::Bool(match quantifier {
67                Quantifier::All | Quantifier::Some => false,
68                Quantifier::None => true,
69            }))
70        }
71    }
72
73    /// Evaluate map operation - ZERO-COPY
74    pub(super) fn eval_map(&self, array_expr: &CompiledLogic, logic_expr: &CompiledLogic, user_data: &Value, internal_context: &Value, depth: usize) -> Result<Value, String> {
75        let array_val = self.evaluate_with_context(array_expr, user_data, internal_context, depth + 1)?;
76        if let Value::Array(arr) = array_val {
77            // Parallelize for large arrays
78            #[cfg(feature = "parallel")]
79            if arr.len() >= PARALLEL_THRESHOLD {
80                let results: Result<Vec<_>, String> = arr.par_iter()
81                    .map(|item| self.evaluate_with_context(logic_expr, item, &Value::Null, depth + 1))
82                    .collect();
83                return Ok(Value::Array(results?));
84            }
85            
86            let mut results = Vec::with_capacity(arr.len());
87            for item in &arr {
88                results.push(self.evaluate_with_context(logic_expr, item, &Value::Null, depth + 1)?);
89            }
90            Ok(Value::Array(results))
91        } else {
92            Ok(Value::Array(vec![]))
93        }
94    }
95
96    /// Evaluate filter operation - ZERO-COPY
97    pub(super) fn eval_filter(&self, array_expr: &CompiledLogic, logic_expr: &CompiledLogic, user_data: &Value, internal_context: &Value, depth: usize) -> Result<Value, String> {
98        let array_val = self.evaluate_with_context(array_expr, user_data, internal_context, depth + 1)?;
99        if let Value::Array(arr) = array_val {
100            // Parallelize for large arrays
101            #[cfg(feature = "parallel")]
102            if arr.len() >= PARALLEL_THRESHOLD {
103                let results: Result<Vec<_>, String> = arr.into_par_iter()
104                    .filter_map(|item| {
105                        match self.evaluate_with_context(logic_expr, &item, &Value::Null, depth + 1) {
106                            Ok(result) if helpers::is_truthy(&result) => Some(Ok(item)),
107                            Ok(_) => None,
108                            Err(e) => Some(Err(e)),
109                        }
110                    })
111                    .collect();
112                return Ok(Value::Array(results?));
113            }
114            
115            let mut results = Vec::with_capacity(arr.len());
116            for item in arr.into_iter() {
117                let result = self.evaluate_with_context(logic_expr, &item, &Value::Null, depth + 1)?;
118                if helpers::is_truthy(&result) {
119                    results.push(item);
120                }
121            }
122            Ok(Value::Array(results))
123        } else {
124            Ok(Value::Array(vec![]))
125        }
126    }
127
128    /// Evaluate reduce operation - ZERO-COPY
129    pub(super) fn eval_reduce(&self, array_expr: &CompiledLogic, logic_expr: &CompiledLogic, initial_expr: &CompiledLogic, user_data: &Value, internal_context: &Value, depth: usize) -> Result<Value, String> {
130        let array_val = self.evaluate_with_context(array_expr, user_data, internal_context, depth + 1)?;
131        let mut accumulator = self.evaluate_with_context(initial_expr, user_data, internal_context, depth + 1)?;
132
133        if let Value::Array(arr) = array_val {
134            for item in arr {
135                // Create small context with current and accumulator
136                let mut context = JsonMap::with_capacity(2);
137                context.insert("current".to_string(), item);
138                context.insert("accumulator".to_string(), accumulator);
139                let combined = Value::Object(context);
140                accumulator = self.evaluate_with_context(logic_expr, &combined, &Value::Null, depth + 1)?;
141            }
142        }
143        Ok(accumulator)
144    }
145
146    /// Evaluate merge operation - ZERO-COPY
147    pub(super) fn eval_merge(&self, items: &[CompiledLogic], user_data: &Value, internal_context: &Value, depth: usize) -> Result<Value, String> {
148        let mut merged = Vec::new();
149        for item in items {
150            let val = self.evaluate_with_context(item, user_data, internal_context, depth + 1)?;
151            if let Value::Array(arr) = val {
152                merged.extend(arr);
153            } else {
154                merged.push(val);
155            }
156        }
157        Ok(Value::Array(merged))
158    }
159
160    /// Evaluate in operation (check if value exists in array) - ZERO-COPY
161    pub(super) fn eval_in(&self, value_expr: &CompiledLogic, array_expr: &CompiledLogic, user_data: &Value, internal_context: &Value, depth: usize) -> Result<Value, String> {
162        use ahash::{AHashSet, RandomState};
163
164        const HASH_SET_THRESHOLD: usize = 32;
165
166        let value = self.evaluate_with_context(value_expr, user_data, internal_context, depth + 1)?;
167        let array_val = self.evaluate_with_context(array_expr, user_data, internal_context, depth + 1)?;
168
169        if let Value::Array(arr) = array_val {
170            if arr.len() > HASH_SET_THRESHOLD {
171                if let Some(key) = helpers::scalar_hash_key(&value) {
172                    let mut set = AHashSet::with_capacity_and_hasher(arr.len(), RandomState::new());
173                    let mut all_scalar = true;
174                    for item in &arr {
175                        if let Some(item_key) = helpers::scalar_hash_key(item) {
176                            set.insert(item_key);
177                        } else {
178                            all_scalar = false;
179                            break;
180                        }
181                    }
182                    if all_scalar {
183                        return Ok(Value::Bool(set.contains(&key)));
184                    }
185                }
186            }
187            for item in arr {
188                if helpers::loose_equal(&value, &item) {
189                    return Ok(Value::Bool(true));
190                }
191            }
192            Ok(Value::Bool(false))
193        } else if let Value::String(s) = array_val {
194            if let Value::String(needle) = value {
195                Ok(Value::Bool(s.contains(&needle)))
196            } else {
197                Ok(Value::Bool(false))
198            }
199        } else {
200            Ok(Value::Bool(false))
201        }
202    }
203
204    /// Evaluate Sum operation with optional indexThreshold - ZERO-COPY
205    pub(super) fn eval_sum(&self, array_expr: &CompiledLogic, field_expr: &Option<Box<CompiledLogic>>, threshold_expr: &Option<Box<CompiledLogic>>, user_data: &Value, internal_context: &Value, depth: usize) -> Result<Value, String> {
206        let array_val = self.evaluate_with_context(array_expr, user_data, internal_context, depth + 1)?;
207        
208        // Evaluate threshold if provided
209        let threshold = if let Some(thresh_e) = threshold_expr {
210            let thresh_val = self.evaluate_with_context(thresh_e, user_data, internal_context, depth + 1)?;
211            helpers::to_f64(&thresh_val) as i64
212        } else {
213            -1  // No threshold
214        };
215
216        let sum = match &array_val {
217            Value::Array(arr) => {
218                // Check if field is provided and is a non-null string
219                let field_name_opt = if let Some(field_e) = field_expr {
220                    let field_val = self.evaluate_with_context(field_e, user_data, internal_context, depth + 1)?;
221                    match field_val {
222                        Value::String(s) => Some(s),
223                        Value::Null => None,  // Treat null as no field
224                        _ => None,
225                    }
226                } else {
227                    None
228                };
229                
230                // Apply threshold if specified
231                let items_to_process = if threshold >= 0 {
232                    let limit = (threshold as usize + 1).min(arr.len());
233                    &arr[..limit]
234                } else {
235                    arr
236                };
237                
238                if let Some(field_name) = field_name_opt {
239                    // Sum with field name
240                    #[cfg(feature = "parallel")]
241                    if items_to_process.len() >= PARALLEL_THRESHOLD {
242                        items_to_process.par_iter()
243                            .filter_map(|item| {
244                                if let Value::Object(obj) = item {
245                                    obj.get(&field_name).map(|val| helpers::to_f64(val))
246                                } else {
247                                    None
248                                }
249                            })
250                            .sum()
251                    } else {
252                        let mut sum = 0.0_f64;
253                        for item in items_to_process {
254                            if let Value::Object(obj) = item {
255                                if let Some(val) = obj.get(&field_name) {
256                                    sum += helpers::to_f64(val);
257                                }
258                            }
259                        }
260                        sum
261                    }
262                    
263                    #[cfg(not(feature = "parallel"))]
264                    {
265                        let mut sum = 0.0_f64;
266                        for item in items_to_process {
267                            if let Value::Object(obj) = item {
268                                if let Some(val) = obj.get(&field_name) {
269                                    sum += helpers::to_f64(val);
270                                }
271                            }
272                        }
273                        sum
274                    }
275                } else {
276                    // Sum without field name (threshold already applied above)
277                    #[cfg(feature = "parallel")]
278                    if items_to_process.len() >= PARALLEL_THRESHOLD {
279                        items_to_process.par_iter().map(|item| helpers::to_f64(item)).sum()
280                    } else {
281                        items_to_process.iter().map(|item| helpers::to_f64(item)).sum()
282                    }
283                    
284                    #[cfg(not(feature = "parallel"))]
285                    items_to_process.iter().map(|item| helpers::to_f64(item)).sum()
286                }
287            }
288            _ => helpers::to_f64(&array_val),
289        };
290
291        Ok(self.f64_to_json(sum))
292    }
293
294    /// Evaluate For loop operation - TRUE ZERO-COPY IMPLEMENTATION
295    /// 
296    /// This is the key optimization: instead of cloning the entire user_data context,
297    /// we create a tiny internal_context with just $loopIteration and pass user_data by reference.
298    pub(super) fn eval_for(&self, start_expr: &CompiledLogic, end_expr: &CompiledLogic, logic_expr: &CompiledLogic, user_data: &Value, internal_context: &Value, depth: usize) -> Result<Value, String> {
299        let next_depth = depth + 1;
300        let start_val = self.evaluate_with_context(start_expr, user_data, internal_context, next_depth)?;
301        let end_val = self.evaluate_with_context(end_expr, user_data, internal_context, next_depth)?;
302        let start = helpers::to_number(&start_val) as i64;
303        let end = helpers::to_number(&end_val) as i64;
304
305        // CRITICAL: FOR returns an ARRAY of all iteration results (for use with MULTIPLIES, etc.)
306        let mut results = Vec::new();
307
308        // ZERO-COPY: Create tiny contexts for each iteration, no cloning of user_data!
309        for i in start..end {
310            // Create minimal internal context with just $loopIteration
311            let loop_context = serde_json::json!({
312                "$loopIteration": i
313            });
314            
315            // Evaluate with loop context as internal_context, user_data remains untouched
316            let result = self.evaluate_with_context(logic_expr, user_data, &loop_context, next_depth)?;
317            results.push(result);
318        }
319
320        Ok(Value::Array(results))
321    }
322
323    /// Evaluate Multiplies operation (product of array values) - ZERO-COPY
324    /// Optimized for MULTIPLIES+FOR pattern
325    pub(super) fn eval_multiplies(&self, items: &[CompiledLogic], user_data: &Value, internal_context: &Value, depth: usize) -> Result<Value, String> {
326        // OPTIMIZATION: Detect MULTIPLIES containing single FOR loop
327        // Pattern: MULTIPLIES([FOR(start, end, body)])
328        // Instead of: FOR creates array -> flatten -> multiply
329        // Optimize to: compute product directly in loop (with parallelization)
330        if items.len() == 1 {
331            if let CompiledLogic::For(start_expr, end_expr, logic_expr) = &items[0] {
332                return self.eval_multiplies_for(start_expr, end_expr, logic_expr, user_data, internal_context, depth);
333            }
334        }
335
336        // Standard path: flatten and multiply
337        let values = self.flatten_array_values(items, user_data, internal_context, depth)?;
338        if values.is_empty() { return Ok(Value::Null); }
339        if values.len() == 1 { return Ok(self.f64_to_json(values[0])); }
340        let result = values.iter().skip(1).fold(values[0], |acc, n| acc * n);
341        Ok(self.f64_to_json(result))
342    }
343
344    /// Optimized MULTIPLIES+FOR: compute product directly without intermediate array
345    /// Uses parallel computation for large ranges (>= PARALLEL_THRESHOLD)
346    fn eval_multiplies_for(&self, start_expr: &CompiledLogic, end_expr: &CompiledLogic, logic_expr: &CompiledLogic, user_data: &Value, internal_context: &Value, depth: usize) -> Result<Value, String> {
347        let next_depth = depth + 1;
348        let start_val = self.evaluate_with_context(start_expr, user_data, internal_context, next_depth)?;
349        let end_val = self.evaluate_with_context(end_expr, user_data, internal_context, next_depth)?;
350        let start = helpers::to_number(&start_val) as i64;
351        let end = helpers::to_number(&end_val) as i64;
352
353        if start >= end {
354            return Ok(Value::Null);
355        }
356
357        #[cfg(feature = "parallel")]
358        let range_size = (end - start) as usize;
359
360        // Parallelize for large ranges
361        #[cfg(feature = "parallel")]
362        if range_size >= PARALLEL_THRESHOLD {
363            let result = (start..end)
364                .into_par_iter()
365                .try_fold(|| 1.0_f64, |product, i| -> Result<f64, String> {
366                    let loop_context = serde_json::json!({
367                        "$loopIteration": i
368                    });
369                    let val = self.evaluate_with_context(logic_expr, user_data, &loop_context, next_depth)?;
370                    Ok(product * helpers::to_f64(&val))
371                })
372                .try_reduce(|| 1.0, |a, b| -> Result<f64, String> { Ok(a * b) })?;
373            
374            return Ok(self.f64_to_json(result));
375        }
376        
377        // Sequential for small ranges or when parallel feature is disabled
378        let mut product = 1.0_f64;
379        for i in start..end {
380            let loop_context = serde_json::json!({
381                "$loopIteration": i
382            });
383            let val = self.evaluate_with_context(logic_expr, user_data, &loop_context, next_depth)?;
384            product *= helpers::to_f64(&val);
385        }
386        Ok(self.f64_to_json(product))
387    }
388
389    /// Evaluate Divides operation (division of array values) - ZERO-COPY
390    pub(super) fn eval_divides(&self, items: &[CompiledLogic], user_data: &Value, internal_context: &Value, depth: usize) -> Result<Value, String> {
391        let values = self.flatten_array_values(items, user_data, internal_context, depth)?;
392        if values.is_empty() { return Ok(Value::Null); }
393        if values.len() == 1 { return Ok(self.f64_to_json(values[0])); }
394        let result = values.iter().skip(1).fold(values[0], |acc, n| {
395            if *n == 0.0 { acc } else { acc / n }
396        });
397        Ok(self.f64_to_json(result))
398    }
399}