Skip to main content

json_eval_rs/jsoneval/
evaluate.rs

1use super::JSONEval;
2use crate::jsoneval::json_parser;
3use crate::jsoneval::path_utils;
4use crate::jsoneval::table_evaluate;
5use crate::jsoneval::cancellation::CancellationToken;
6use crate::utils::clean_float_noise_scalar;
7use crate::time_block;
8
9use serde_json::Value;
10
11#[cfg(feature = "parallel")]
12use rayon::prelude::*;
13
14#[cfg(feature = "parallel")]
15use std::sync::Mutex;
16
17impl JSONEval {
18    /// Evaluate the schema with the given data and context.
19    ///
20    /// # Arguments
21    ///
22    /// * `data` - The data to evaluate.
23    /// * `context` - The context to evaluate.
24    ///
25    /// # Returns
26    ///
27    /// A `Result` indicating success or an error message.
28    pub fn evaluate(
29        &mut self,
30        data: &str,
31        context: Option<&str>,
32        paths: Option<&[String]>,
33        token: Option<&CancellationToken>,
34    ) -> Result<(), String> {
35        if let Some(t) = token {
36            if t.is_cancelled() {
37                return Err("Cancelled".to_string());
38            }
39        }
40        time_block!("evaluate() [total]", {
41            let context_provided = context.is_some();
42
43            // Use SIMD-accelerated JSON parsing
44            let data: Value = time_block!("  parse data", { json_parser::parse_json_str(data)? });
45            let context: Value = time_block!("  parse context", {
46                json_parser::parse_json_str(context.unwrap_or("{}"))?
47            });
48
49            // Collect top-level data keys to selectively purge cache (before move)
50            let changed_data_paths: Vec<String> = if let Some(obj) = data.as_object() {
51                obj.keys().map(|k| format!("/{}", k)).collect()
52            } else {
53                Vec::new()
54            };
55
56            // Store data and replace in eval_data (clone once instead of twice)
57            self.data = data.clone();
58            time_block!("  replace_data_and_context", {
59                self.eval_data.replace_data_and_context(data, context);
60            });
61
62            // Selectively purge cache entries that depend on changed top-level data keys
63            // This is more efficient than clearing entire cache
64            time_block!("  purge_cache", {
65                self.purge_cache_for_changed_data(&changed_data_paths);
66
67                // Also purge context-dependent cache if context was provided
68                if context_provided {
69                    self.purge_cache_for_context_change();
70                }
71            });
72
73            // Call internal evaluate (uses existing data if not provided)
74            self.evaluate_internal(paths, token)
75        })
76    }
77
78    /// Internal evaluate that can be called when data is already set
79    /// This avoids double-locking and unnecessary data cloning for re-evaluation from evaluate_dependents
80    pub(crate) fn evaluate_internal(&mut self, paths: Option<&[String]>, token: Option<&CancellationToken>) -> Result<(), String> {
81        if let Some(t) = token {
82            if t.is_cancelled() {
83                return Err("Cancelled".to_string());
84            }
85        }
86        time_block!("  evaluate_internal() [total]", {
87            // Acquire lock for synchronous execution
88            let _lock = self.eval_lock.lock().unwrap();
89
90            // Normalize paths to schema pointers for correct filtering
91            let normalized_paths_storage; // Keep alive
92            let normalized_paths = if let Some(p_list) = paths {
93                normalized_paths_storage = p_list
94                    .iter()
95                    .flat_map(|p| {
96                        let normalized = if p.starts_with("#/") {
97                            // Case 1: JSON Schema path (e.g. #/properties/foo) - keep as is
98                            p.to_string()
99                        } else if p.starts_with('/') {
100                            // Case 2: Rust Pointer path (e.g. /properties/foo) - ensure # prefix
101                            format!("#{}", p)
102                        } else {
103                            // Case 3: Dot notation (e.g. properties.foo) - replace dots with slashes and add prefix
104                            format!("#/{}", p.replace('.', "/"))
105                        };
106
107                        vec![normalized]
108                    })
109                    .collect::<Vec<_>>();
110                Some(normalized_paths_storage.as_slice())
111            } else {
112                None
113            };
114
115            // Borrow sorted_evaluations via Arc (avoid deep-cloning Vec<Vec<String>>)
116            let eval_batches = self.sorted_evaluations.clone();
117
118            // Process each batch - parallelize evaluations within each batch
119            // Batches are processed sequentially to maintain dependency order
120            // Process value evaluations (simple computed fields)
121            // These are independent of rule batches and should always run
122            let eval_data_values = self.eval_data.clone();
123            time_block!("      evaluate values", {
124                #[cfg(feature = "parallel")]
125                if self.value_evaluations.len() > 100 {
126                    let value_results: Mutex<Vec<(String, Value)>> =
127                        Mutex::new(Vec::with_capacity(self.value_evaluations.len()));
128
129                    self.value_evaluations.par_iter().for_each(|eval_key| {
130                        // Skip if has dependencies (will be handled in sorted batches)
131                        if let Some(deps) = self.dependencies.get(eval_key) {
132                            if !deps.is_empty() {
133                                return;
134                            }
135                        }
136
137                        // Filter items if paths are provided
138                        if let Some(filter_paths) = normalized_paths {
139                            if !filter_paths.is_empty()
140                                && !filter_paths.iter().any(|p| {
141                                    eval_key.starts_with(p.as_str())
142                                        || p.starts_with(eval_key.as_str())
143                                })
144                            {
145                                return;
146                            }
147                        }
148
149                        // For value evaluations (e.g. /properties/foo/value), we want the value at that path
150                        // The path in eval_key is like "#/properties/foo/value"
151                        let pointer_path = path_utils::normalize_to_json_pointer(eval_key).into_owned();
152
153                        // Try cache first (thread-safe)
154                        if let Some(_) = self.try_get_cached(eval_key, &eval_data_values) {
155                            return;
156                        }
157
158                        // Cache miss - evaluate
159                        if let Some(logic_id) = self.evaluations.get(eval_key) {
160                            if let Ok(val) = self.engine.run(logic_id, eval_data_values.data()) {
161                                let cleaned_val = clean_float_noise_scalar(val);
162                                // Cache result (thread-safe)
163                                self.cache_result(eval_key, Value::Null, &eval_data_values);
164                                value_results
165                                    .lock()
166                                    .unwrap()
167                                    .push((pointer_path, cleaned_val));
168                            }
169                        }
170                    });
171
172                    // Write results to evaluated_schema
173                    for (result_path, value) in value_results.into_inner().unwrap() {
174                        if let Some(pointer_value) = self.evaluated_schema.pointer_mut(&result_path)
175                        {
176                            *pointer_value = value;
177                        }
178                    }
179                }
180
181                // Sequential execution for values (if not parallel or small count)
182                #[cfg(feature = "parallel")]
183                let value_eval_items = if self.value_evaluations.len() > 100 {
184                    &self.value_evaluations[0..0]
185                } else {
186                    &self.value_evaluations
187                };
188
189                #[cfg(not(feature = "parallel"))]
190                let value_eval_items = &self.value_evaluations;
191
192                for eval_key in value_eval_items.iter() {
193                    if let Some(t) = token {
194                        if t.is_cancelled() {
195                            return Err("Cancelled".to_string());
196                        }
197                    }
198                    // Skip if has dependencies (will be handled in sorted batches)
199                    if let Some(deps) = self.dependencies.get(eval_key) {
200                        if !deps.is_empty() {
201                            continue;
202                        }
203                    }
204
205                    // Filter items if paths are provided
206                    if let Some(filter_paths) = normalized_paths {
207                        if !filter_paths.is_empty()
208                            && !filter_paths.iter().any(|p| {
209                                eval_key.starts_with(p.as_str()) || p.starts_with(eval_key.as_str())
210                            })
211                        {
212                            continue;
213                        }
214                    }
215
216                    let pointer_path = path_utils::normalize_to_json_pointer(eval_key).into_owned();
217
218                    // Try cache first
219                    if let Some(_) = self.try_get_cached(eval_key, &eval_data_values) {
220                        continue;
221                    }
222
223                    // Cache miss - evaluate
224                    if let Some(logic_id) = self.evaluations.get(eval_key) {
225                        if let Ok(val) = self.engine.run(logic_id, eval_data_values.data()) {
226                            let cleaned_val = clean_float_noise_scalar(val);
227                            // Cache result
228                            self.cache_result(eval_key, Value::Null, &eval_data_values);
229
230                            if let Some(pointer_value) =
231                                self.evaluated_schema.pointer_mut(&pointer_path)
232                            {
233                                *pointer_value = cleaned_val;
234                            }
235                        }
236                    }
237                }
238            });
239
240            time_block!("    process batches", {
241                for batch in eval_batches.iter() {
242                    if let Some(t) = token {
243                        if t.is_cancelled() {
244                            return Err("Cancelled".to_string());
245                        }
246                    }
247                    // Skip empty batches
248                    if batch.is_empty() {
249                        continue;
250                    }
251
252                    // Check if we can skip this entire batch optimization
253                    // If paths are provided, we can check if ANY item in batch matches ANY path
254                    if let Some(filter_paths) = normalized_paths {
255                        if !filter_paths.is_empty() {
256                            let batch_has_match = batch.iter().any(|eval_key| {
257                                filter_paths.iter().any(|p| {
258                                    eval_key.starts_with(p.as_str())
259                                        || p.starts_with(eval_key.as_str())
260                                })
261                            });
262                            if !batch_has_match {
263                                continue;
264                            }
265                        }
266                    }
267
268                    // No pre-checking cache - we'll check inside parallel execution
269                    // This allows thread-safe cache access during parallel evaluation
270
271                    // Parallel execution within batch (no dependencies between items)
272                    // Use Mutex for thread-safe result collection
273                    // Store both eval_key and result for cache storage
274                    let eval_data_snapshot = self.eval_data.clone();
275
276                    // Parallelize only if batch has multiple items (overhead not worth it for single item)
277
278                    #[cfg(feature = "parallel")]
279                    if batch.len() > 1000 {
280                        let results: Mutex<Vec<(String, String, Value)>> =
281                            Mutex::new(Vec::with_capacity(batch.len()));
282                        batch.par_iter().for_each(|eval_key| {
283                            // Filter individual items if paths are provided
284                            if let Some(filter_paths) = normalized_paths {
285                                if !filter_paths.is_empty()
286                                    && !filter_paths.iter().any(|p| {
287                                        eval_key.starts_with(p.as_str())
288                                            || p.starts_with(eval_key.as_str())
289                                    })
290                                {
291                                    return;
292                                }
293                            }
294
295                            let pointer_path = path_utils::normalize_to_json_pointer(eval_key).into_owned();
296
297                            // Try cache first (thread-safe)
298                            if let Some(_) = self.try_get_cached(eval_key, &eval_data_snapshot) {
299                                return;
300                            }
301
302                            // Cache miss - evaluate
303                            let is_table = self.table_metadata.contains_key(eval_key);
304
305                            if is_table {
306                                // Evaluate table using sandboxed metadata (parallel-safe, immutable parent scope)
307                                if let Ok(rows) = table_evaluate::evaluate_table(
308                                    self,
309                                    eval_key,
310                                    &eval_data_snapshot,
311                                    token
312                                ) {
313                                    let value = Value::Array(rows);
314                                    // Cache result (thread-safe)
315                                    self.cache_result(eval_key, Value::Null, &eval_data_snapshot);
316                                    results.lock().unwrap().push((
317                                        eval_key.clone(),
318                                        pointer_path,
319                                        value,
320                                    ));
321                                }
322                            } else {
323                                if let Some(logic_id) = self.evaluations.get(eval_key) {
324                                    // Evaluate directly with snapshot
325                                    if let Ok(val) =
326                                        self.engine.run(logic_id, eval_data_snapshot.data())
327                                    {
328                                        let cleaned_val = clean_float_noise_scalar(val);
329                                        // Cache result (thread-safe)
330                                        self.cache_result(
331                                            eval_key,
332                                            Value::Null,
333                                            &eval_data_snapshot,
334                                        );
335                                        results.lock().unwrap().push((
336                                            eval_key.clone(),
337                                            pointer_path,
338                                            cleaned_val,
339                                        ));
340                                    }
341                                }
342                            }
343                        });
344
345                        // Write all results back sequentially (already cleaned in parallel execution)
346                        for (_eval_key, path, value) in results.into_inner().unwrap() {
347                            let cleaned_value = value;
348
349                            self.eval_data.set(&path, cleaned_value.clone());
350                            // Also write to evaluated_schema
351                            if let Some(schema_value) = self.evaluated_schema.pointer_mut(&path) {
352                                *schema_value = cleaned_value;
353                            }
354                        }
355                        continue;
356                    }
357
358                    // Sequential execution (single item or parallel feature disabled)
359                    #[cfg(not(feature = "parallel"))]
360                    let batch_items: &[String] = batch;
361
362                    #[cfg(feature = "parallel")]
363                    let batch_items: &[String] = if batch.len() > 1000 {
364                        &batch[0..0]
365                    } else {
366                        batch
367                    }; // Empty slice if already processed in parallel
368
369                    for eval_key in batch_items {
370                        if let Some(t) = token {
371                            if t.is_cancelled() {
372                                return Err("Cancelled".to_string());
373                            }
374                        }
375                        // Filter individual items if paths are provided
376                        if let Some(filter_paths) = normalized_paths {
377                            if !filter_paths.is_empty()
378                                && !filter_paths.iter().any(|p| {
379                                    eval_key.starts_with(p.as_str())
380                                        || p.starts_with(eval_key.as_str())
381                                })
382                            {
383                                continue;
384                            }
385                        }
386
387                        let pointer_path = path_utils::normalize_to_json_pointer(eval_key).into_owned();
388
389                        // Try cache first
390                        if let Some(_) = self.try_get_cached(eval_key, &eval_data_snapshot) {
391                            continue;
392                        }
393
394                        // Cache miss - evaluate
395                        let is_table = self.table_metadata.contains_key(eval_key);
396
397                        if is_table {
398                            if let Ok(rows) =
399                                table_evaluate::evaluate_table(self, eval_key, &eval_data_snapshot, token)
400                            {
401                                let value = Value::Array(rows);
402                                self.cache_result(eval_key, Value::Null, &eval_data_snapshot);
403
404                                self.eval_data.set(&pointer_path, value.clone());
405                                if let Some(schema_value) =
406                                    self.evaluated_schema.pointer_mut(&pointer_path)
407                                {
408                                    *schema_value = value;
409                                }
410                            }
411                        } else {
412                            if let Some(logic_id) = self.evaluations.get(eval_key) {
413                                if let Ok(val) =
414                                    self.engine.run(logic_id, eval_data_snapshot.data())
415                                {
416                                    let cleaned_val = clean_float_noise_scalar(val);
417                                    // Cache result
418                                    self.cache_result(eval_key, Value::Null, &eval_data_snapshot);
419
420                                    self.eval_data.set(&pointer_path, cleaned_val.clone());
421                                    if let Some(schema_value) =
422                                        self.evaluated_schema.pointer_mut(&pointer_path)
423                                    {
424                                        *schema_value = cleaned_val;
425                                    }
426                                }
427                            }
428                        }
429                    }
430                }
431            });
432
433            // Drop lock before calling evaluate_others
434            drop(_lock);
435
436            self.evaluate_others(paths, token);
437
438            Ok(())
439        })
440    }
441
442    pub(crate) fn evaluate_others(&mut self, paths: Option<&[String]>, token: Option<&CancellationToken>) {
443        if let Some(t) = token {
444            if t.is_cancelled() {
445                return;
446            }
447        }
448        time_block!("    evaluate_others()", {
449            // Step 1: Evaluate "rules" and "others" categories with caching
450            // Rules are evaluated here so their values are available in evaluated_schema
451            let combined_count = self.rules_evaluations.len() + self.others_evaluations.len();
452            if combined_count > 0 {
453                time_block!("      evaluate rules+others", {
454                    let eval_data_snapshot = self.eval_data.clone();
455
456                    let normalized_paths: Option<Vec<String>> = paths.map(|p_list| {
457                        p_list
458                            .iter()
459                            .flat_map(|p| {
460                                let ptr = path_utils::dot_notation_to_schema_pointer(p);
461                                // Also support version with /properties/ prefix for root match
462                                let with_props = if ptr.starts_with("#/") {
463                                    format!("#/properties/{}", &ptr[2..])
464                                } else {
465                                    ptr.clone()
466                                };
467                                vec![ptr, with_props]
468                            })
469                            .collect()
470                    });
471
472                    #[cfg(feature = "parallel")]
473                    {
474                        let combined_results: Mutex<Vec<(String, Value)>> =
475                            Mutex::new(Vec::with_capacity(combined_count));
476
477                        self.rules_evaluations
478                            .par_iter()
479                            .chain(self.others_evaluations.par_iter())
480                            .for_each(|eval_key| {
481                                // Filter items if paths are provided
482                                if let Some(filter_paths) = normalized_paths.as_ref() {
483                                    if !filter_paths.is_empty()
484                                        && !filter_paths.iter().any(|p| {
485                                            eval_key.starts_with(p.as_str())
486                                                || p.starts_with(eval_key.as_str())
487                                        })
488                                    {
489                                        return;
490                                    }
491                                }
492
493                                let pointer_path = path_utils::normalize_to_json_pointer(eval_key).into_owned();
494
495                                // Try cache first (thread-safe)
496                                if let Some(_) = self.try_get_cached(eval_key, &eval_data_snapshot)
497                                {
498                                    return;
499                                }
500
501                                // Cache miss - evaluate
502                                if let Some(logic_id) = self.evaluations.get(eval_key) {
503                                    if let Ok(val) =
504                                        self.engine.run(logic_id, eval_data_snapshot.data())
505                                    {
506                                        let cleaned_val = clean_float_noise_scalar(val);
507                                        // Cache result (thread-safe)
508                                        self.cache_result(
509                                            eval_key,
510                                            Value::Null,
511                                            &eval_data_snapshot,
512                                        );
513                                        combined_results
514                                            .lock()
515                                            .unwrap()
516                                            .push((pointer_path, cleaned_val));
517                                    }
518                                }
519                            });
520
521                        // Write results to evaluated_schema
522                        for (result_path, value) in combined_results.into_inner().unwrap() {
523                            if let Some(pointer_value) =
524                                self.evaluated_schema.pointer_mut(&result_path)
525                            {
526                                // Special handling for rules with $evaluation
527                                // This includes both direct rules and array items: /rules/evaluation/0/$evaluation
528                                if !result_path.starts_with("$")
529                                    && result_path.contains("/rules/")
530                                    && !result_path.ends_with("/value")
531                                {
532                                    match pointer_value.as_object_mut() {
533                                        Some(pointer_obj) => {
534                                            pointer_obj.remove("$evaluation");
535                                            pointer_obj.insert("value".to_string(), value);
536                                        }
537                                        None => continue,
538                                    }
539                                } else {
540                                    *pointer_value = value;
541                                }
542                            }
543                        }
544                    }
545
546                    #[cfg(not(feature = "parallel"))]
547                    {
548                        // Sequential evaluation
549                        let combined_evals: Vec<&String> = self
550                            .rules_evaluations
551                            .iter()
552                            .chain(self.others_evaluations.iter())
553                            .collect();
554
555                        for eval_key in combined_evals {
556                            if let Some(t) = token {
557                                if t.is_cancelled() {
558                                    return;
559                                }
560                            }
561                            // Filter items if paths are provided
562                            if let Some(filter_paths) = normalized_paths.as_ref() {
563                                if !filter_paths.is_empty()
564                                    && !filter_paths.iter().any(|p| {
565                                        eval_key.starts_with(p.as_str())
566                                            || p.starts_with(eval_key.as_str())
567                                    })
568                                {
569                                    continue;
570                                }
571                            }
572
573                            let pointer_path = path_utils::normalize_to_json_pointer(eval_key).into_owned();
574
575                            // Try cache first
576                            if let Some(_) = self.try_get_cached(eval_key, &eval_data_snapshot) {
577                                continue;
578                            }
579
580                            // Cache miss - evaluate
581                            if let Some(logic_id) = self.evaluations.get(eval_key) {
582                                if let Ok(val) =
583                                    self.engine.run(logic_id, eval_data_snapshot.data())
584                                {
585                                    let cleaned_val = clean_float_noise_scalar(val);
586                                    // Cache result
587                                    self.cache_result(eval_key, Value::Null, &eval_data_snapshot);
588
589                                    if let Some(pointer_value) =
590                                        self.evaluated_schema.pointer_mut(&pointer_path)
591                                    {
592                                        if !pointer_path.starts_with("$")
593                                            && pointer_path.contains("/rules/")
594                                            && !pointer_path.ends_with("/value")
595                                        {
596                                            match pointer_value.as_object_mut() {
597                                                Some(pointer_obj) => {
598                                                    pointer_obj.remove("$evaluation");
599                                                    pointer_obj
600                                                        .insert("value".to_string(), cleaned_val);
601                                                }
602                                                None => continue,
603                                            }
604                                        } else {
605                                            *pointer_value = cleaned_val;
606                                        }
607                                    }
608                                }
609                            }
610                        }
611                    }
612                });
613            }
614        });
615
616        // Step 2: Evaluate options URL templates (handles {variable} patterns)
617        time_block!("      evaluate_options_templates", {
618            self.evaluate_options_templates(paths);
619        });
620
621        // Step 3: Resolve layout logic (metadata injection, hidden propagation)
622        time_block!("      resolve_layout", {
623            let _ = self.resolve_layout(false);
624        });
625    }
626
627    /// Evaluate options URL templates (handles {variable} patterns)
628    fn evaluate_options_templates(&mut self, paths: Option<&[String]>) {
629        // Use pre-collected options templates from parsing (Arc clone is cheap)
630        let templates_to_eval = self.options_templates.clone();
631
632        // Evaluate each template
633        for (path, template_str, params_path) in templates_to_eval.iter() {
634            // Filter items if paths are provided
635            // 'path' here is the schema path to the field (dot notation or similar, need to check)
636            // It seems to be schema pointer based on usage in other methods
637            if let Some(filter_paths) = paths {
638                if !filter_paths.is_empty()
639                    && !filter_paths
640                        .iter()
641                        .any(|p| path.starts_with(p.as_str()) || p.starts_with(path.as_str()))
642                {
643                    continue;
644                }
645            }
646
647            if let Some(params) = self.evaluated_schema.pointer(&params_path) {
648                if let Ok(evaluated) = self.evaluate_template(&template_str, params) {
649                    if let Some(target) = self.evaluated_schema.pointer_mut(&path) {
650                        *target = Value::String(evaluated);
651                    }
652                }
653            }
654        }
655    }
656
657    /// Evaluate a template string like "api/users/{id}" with params
658    fn evaluate_template(&self, template: &str, params: &Value) -> Result<String, String> {
659        let mut result = template.to_string();
660
661        // Simple template evaluation: replace {key} with params.key
662        if let Value::Object(params_map) = params {
663            for (key, value) in params_map {
664                let placeholder = format!("{{{}}}", key);
665                if let Some(str_val) = value.as_str() {
666                    result = result.replace(&placeholder, str_val);
667                } else {
668                    // Convert non-string values to strings
669                    result = result.replace(&placeholder, &value.to_string());
670                }
671            }
672        }
673
674        Ok(result)
675    }
676}