json_eval_rs/jsoneval/
evaluate.rs

1use super::JSONEval;
2use crate::jsoneval::json_parser;
3use crate::jsoneval::path_utils;
4use crate::jsoneval::table_evaluate;
5use crate::jsoneval::cancellation::CancellationToken;
6use crate::utils::clean_float_noise;
7use crate::time_block;
8
9use serde_json::Value;
10
11#[cfg(feature = "parallel")]
12use rayon::prelude::*;
13
14#[cfg(feature = "parallel")]
15use std::sync::Mutex;
16
17impl JSONEval {
18    /// Evaluate the schema with the given data and context.
19    ///
20    /// # Arguments
21    ///
22    /// * `data` - The data to evaluate.
23    /// * `context` - The context to evaluate.
24    ///
25    /// # Returns
26    ///
27    /// A `Result` indicating success or an error message.
28    pub fn evaluate(
29        &mut self,
30        data: &str,
31        context: Option<&str>,
32        paths: Option<&[String]>,
33        token: Option<&CancellationToken>,
34    ) -> Result<(), String> {
35        if let Some(t) = token {
36            if t.is_cancelled() {
37                return Err("Cancelled".to_string());
38            }
39        }
40        time_block!("evaluate() [total]", {
41            let context_provided = context.is_some();
42
43            // Use SIMD-accelerated JSON parsing
44            let data: Value = time_block!("  parse data", { json_parser::parse_json_str(data)? });
45            let context: Value = time_block!("  parse context", {
46                json_parser::parse_json_str(context.unwrap_or("{}"))?
47            });
48
49            self.data = data.clone();
50
51            // Collect top-level data keys to selectively purge cache
52            let changed_data_paths: Vec<String> = if let Some(obj) = data.as_object() {
53                obj.keys().map(|k| format!("/{}", k)).collect()
54            } else {
55                Vec::new()
56            };
57
58            // Replace data and context in existing eval_data
59            time_block!("  replace_data_and_context", {
60                self.eval_data.replace_data_and_context(data, context);
61            });
62
63            // Selectively purge cache entries that depend on changed top-level data keys
64            // This is more efficient than clearing entire cache
65            time_block!("  purge_cache", {
66                self.purge_cache_for_changed_data(&changed_data_paths);
67
68                // Also purge context-dependent cache if context was provided
69                if context_provided {
70                    self.purge_cache_for_context_change();
71                }
72            });
73
74            // Call internal evaluate (uses existing data if not provided)
75            self.evaluate_internal(paths, token)
76        })
77    }
78
79    /// Internal evaluate that can be called when data is already set
80    /// This avoids double-locking and unnecessary data cloning for re-evaluation from evaluate_dependents
81    pub(crate) fn evaluate_internal(&mut self, paths: Option<&[String]>, token: Option<&CancellationToken>) -> Result<(), String> {
82        if let Some(t) = token {
83            if t.is_cancelled() {
84                return Err("Cancelled".to_string());
85            }
86        }
87        time_block!("  evaluate_internal() [total]", {
88            // Acquire lock for synchronous execution
89            let _lock = self.eval_lock.lock().unwrap();
90
91            // Normalize paths to schema pointers for correct filtering
92            let normalized_paths_storage; // Keep alive
93            let normalized_paths = if let Some(p_list) = paths {
94                normalized_paths_storage = p_list
95                    .iter()
96                    .flat_map(|p| {
97                        let normalized = if p.starts_with("#/") {
98                            // Case 1: JSON Schema path (e.g. #/properties/foo) - keep as is
99                            p.to_string()
100                        } else if p.starts_with('/') {
101                            // Case 2: Rust Pointer path (e.g. /properties/foo) - ensure # prefix
102                            format!("#{}", p)
103                        } else {
104                            // Case 3: Dot notation (e.g. properties.foo) - replace dots with slashes and add prefix
105                            format!("#/{}", p.replace('.', "/"))
106                        };
107
108                        vec![normalized]
109                    })
110                    .collect::<Vec<_>>();
111                Some(normalized_paths_storage.as_slice())
112            } else {
113                None
114            };
115
116            // Clone sorted_evaluations (Arc clone is cheap, then clone inner Vec)
117            let eval_batches: Vec<Vec<String>> = (*self.sorted_evaluations).clone();
118
119            // Process each batch - parallelize evaluations within each batch
120            // Batches are processed sequentially to maintain dependency order
121            // Process value evaluations (simple computed fields)
122            // These are independent of rule batches and should always run
123            let eval_data_values = self.eval_data.clone();
124            time_block!("      evaluate values", {
125                #[cfg(feature = "parallel")]
126                if self.value_evaluations.len() > 100 {
127                    let value_results: Mutex<Vec<(String, Value)>> =
128                        Mutex::new(Vec::with_capacity(self.value_evaluations.len()));
129
130                    self.value_evaluations.par_iter().for_each(|eval_key| {
131                        // Skip if has dependencies (will be handled in sorted batches)
132                        if let Some(deps) = self.dependencies.get(eval_key) {
133                            if !deps.is_empty() {
134                                return;
135                            }
136                        }
137
138                        // Filter items if paths are provided
139                        if let Some(filter_paths) = normalized_paths {
140                            if !filter_paths.is_empty()
141                                && !filter_paths.iter().any(|p| {
142                                    eval_key.starts_with(p.as_str())
143                                        || p.starts_with(eval_key.as_str())
144                                })
145                            {
146                                return;
147                            }
148                        }
149
150                        // For value evaluations (e.g. /properties/foo/value), we want the value at that path
151                        // The path in eval_key is like "#/properties/foo/value"
152                        let pointer_path = path_utils::normalize_to_json_pointer(eval_key);
153
154                        // Try cache first (thread-safe)
155                        if let Some(_) = self.try_get_cached(eval_key, &eval_data_values) {
156                            return;
157                        }
158
159                        // Cache miss - evaluate
160                        if let Some(logic_id) = self.evaluations.get(eval_key) {
161                            if let Ok(val) = self.engine.run(logic_id, eval_data_values.data()) {
162                                let cleaned_val = clean_float_noise(val);
163                                // Cache result (thread-safe)
164                                self.cache_result(eval_key, Value::Null, &eval_data_values);
165                                value_results
166                                    .lock()
167                                    .unwrap()
168                                    .push((pointer_path, cleaned_val));
169                            }
170                        }
171                    });
172
173                    // Write results to evaluated_schema
174                    for (result_path, value) in value_results.into_inner().unwrap() {
175                        if let Some(pointer_value) = self.evaluated_schema.pointer_mut(&result_path)
176                        {
177                            *pointer_value = value;
178                        }
179                    }
180                }
181
182                // Sequential execution for values (if not parallel or small count)
183                #[cfg(feature = "parallel")]
184                let value_eval_items = if self.value_evaluations.len() > 100 {
185                    &self.value_evaluations[0..0]
186                } else {
187                    &self.value_evaluations
188                };
189
190                #[cfg(not(feature = "parallel"))]
191                let value_eval_items = &self.value_evaluations;
192
193                for eval_key in value_eval_items.iter() {
194                    if let Some(t) = token {
195                        if t.is_cancelled() {
196                            return Err("Cancelled".to_string());
197                        }
198                    }
199                    // Skip if has dependencies (will be handled in sorted batches)
200                    if let Some(deps) = self.dependencies.get(eval_key) {
201                        if !deps.is_empty() {
202                            continue;
203                        }
204                    }
205
206                    // Filter items if paths are provided
207                    if let Some(filter_paths) = normalized_paths {
208                        if !filter_paths.is_empty()
209                            && !filter_paths.iter().any(|p| {
210                                eval_key.starts_with(p.as_str()) || p.starts_with(eval_key.as_str())
211                            })
212                        {
213                            continue;
214                        }
215                    }
216
217                    let pointer_path = path_utils::normalize_to_json_pointer(eval_key);
218
219                    // Try cache first
220                    if let Some(_) = self.try_get_cached(eval_key, &eval_data_values) {
221                        continue;
222                    }
223
224                    // Cache miss - evaluate
225                    if let Some(logic_id) = self.evaluations.get(eval_key) {
226                        if let Ok(val) = self.engine.run(logic_id, eval_data_values.data()) {
227                            let cleaned_val = clean_float_noise(val);
228                            // Cache result
229                            self.cache_result(eval_key, Value::Null, &eval_data_values);
230
231                            if let Some(pointer_value) =
232                                self.evaluated_schema.pointer_mut(&pointer_path)
233                            {
234                                *pointer_value = cleaned_val;
235                            }
236                        }
237                    }
238                }
239            });
240
241            time_block!("    process batches", {
242                for batch in eval_batches {
243                    if let Some(t) = token {
244                        if t.is_cancelled() {
245                            return Err("Cancelled".to_string());
246                        }
247                    }
248                    // Skip empty batches
249                    if batch.is_empty() {
250                        continue;
251                    }
252
253                    // Check if we can skip this entire batch optimization
254                    // If paths are provided, we can check if ANY item in batch matches ANY path
255                    if let Some(filter_paths) = normalized_paths {
256                        if !filter_paths.is_empty() {
257                            let batch_has_match = batch.iter().any(|eval_key| {
258                                filter_paths.iter().any(|p| {
259                                    eval_key.starts_with(p.as_str())
260                                        || p.starts_with(eval_key.as_str())
261                                })
262                            });
263                            if !batch_has_match {
264                                continue;
265                            }
266                        }
267                    }
268
269                    // No pre-checking cache - we'll check inside parallel execution
270                    // This allows thread-safe cache access during parallel evaluation
271
272                    // Parallel execution within batch (no dependencies between items)
273                    // Use Mutex for thread-safe result collection
274                    // Store both eval_key and result for cache storage
275                    let eval_data_snapshot = self.eval_data.clone();
276
277                    // Parallelize only if batch has multiple items (overhead not worth it for single item)
278
279                    #[cfg(feature = "parallel")]
280                    if batch.len() > 1000 {
281                        let results: Mutex<Vec<(String, String, Value)>> =
282                            Mutex::new(Vec::with_capacity(batch.len()));
283                        batch.par_iter().for_each(|eval_key| {
284                            // Filter individual items if paths are provided
285                            if let Some(filter_paths) = normalized_paths {
286                                if !filter_paths.is_empty()
287                                    && !filter_paths.iter().any(|p| {
288                                        eval_key.starts_with(p.as_str())
289                                            || p.starts_with(eval_key.as_str())
290                                    })
291                                {
292                                    return;
293                                }
294                            }
295
296                            let pointer_path = path_utils::normalize_to_json_pointer(eval_key);
297
298                            // Try cache first (thread-safe)
299                            if let Some(_) = self.try_get_cached(eval_key, &eval_data_snapshot) {
300                                return;
301                            }
302
303                            // Cache miss - evaluate
304                            let is_table = self.table_metadata.contains_key(eval_key);
305
306                            if is_table {
307                                // Evaluate table using sandboxed metadata (parallel-safe, immutable parent scope)
308                                if let Ok(rows) = table_evaluate::evaluate_table(
309                                    self,
310                                    eval_key,
311                                    &eval_data_snapshot,
312                                    token
313                                ) {
314                                    let value = Value::Array(rows);
315                                    // Cache result (thread-safe)
316                                    self.cache_result(eval_key, Value::Null, &eval_data_snapshot);
317                                    results.lock().unwrap().push((
318                                        eval_key.clone(),
319                                        pointer_path,
320                                        value,
321                                    ));
322                                }
323                            } else {
324                                if let Some(logic_id) = self.evaluations.get(eval_key) {
325                                    // Evaluate directly with snapshot
326                                    if let Ok(val) =
327                                        self.engine.run(logic_id, eval_data_snapshot.data())
328                                    {
329                                        let cleaned_val = clean_float_noise(val);
330                                        // Cache result (thread-safe)
331                                        self.cache_result(
332                                            eval_key,
333                                            Value::Null,
334                                            &eval_data_snapshot,
335                                        );
336                                        results.lock().unwrap().push((
337                                            eval_key.clone(),
338                                            pointer_path,
339                                            cleaned_val,
340                                        ));
341                                    }
342                                }
343                            }
344                        });
345
346                        // Write all results back sequentially (already cached in parallel execution)
347                        for (_eval_key, path, value) in results.into_inner().unwrap() {
348                            let cleaned_value = clean_float_noise(value);
349
350                            self.eval_data.set(&path, cleaned_value.clone());
351                            // Also write to evaluated_schema
352                            if let Some(schema_value) = self.evaluated_schema.pointer_mut(&path) {
353                                *schema_value = cleaned_value;
354                            }
355                        }
356                        continue;
357                    }
358
359                    // Sequential execution (single item or parallel feature disabled)
360                    #[cfg(not(feature = "parallel"))]
361                    let batch_items = &batch;
362
363                    #[cfg(feature = "parallel")]
364                    let batch_items = if batch.len() > 1000 {
365                        &batch[0..0]
366                    } else {
367                        &batch
368                    }; // Empty slice if already processed in parallel
369
370                    for eval_key in batch_items {
371                        if let Some(t) = token {
372                            if t.is_cancelled() {
373                                return Err("Cancelled".to_string());
374                            }
375                        }
376                        // Filter individual items if paths are provided
377                        if let Some(filter_paths) = normalized_paths {
378                            if !filter_paths.is_empty()
379                                && !filter_paths.iter().any(|p| {
380                                    eval_key.starts_with(p.as_str())
381                                        || p.starts_with(eval_key.as_str())
382                                })
383                            {
384                                continue;
385                            }
386                        }
387
388                        let pointer_path = path_utils::normalize_to_json_pointer(eval_key);
389
390                        // Try cache first
391                        if let Some(_) = self.try_get_cached(eval_key, &eval_data_snapshot) {
392                            continue;
393                        }
394
395                        // Cache miss - evaluate
396                        let is_table = self.table_metadata.contains_key(eval_key);
397
398                        if is_table {
399                            if let Ok(rows) =
400                                table_evaluate::evaluate_table(self, eval_key, &eval_data_snapshot, token)
401                            {
402                                let value = Value::Array(rows);
403                                // Cache result
404                                self.cache_result(eval_key, Value::Null, &eval_data_snapshot);
405
406                                let cleaned_value = clean_float_noise(value);
407                                self.eval_data.set(&pointer_path, cleaned_value.clone());
408                                if let Some(schema_value) =
409                                    self.evaluated_schema.pointer_mut(&pointer_path)
410                                {
411                                    *schema_value = cleaned_value;
412                                }
413                            }
414                        } else {
415                            if let Some(logic_id) = self.evaluations.get(eval_key) {
416                                if let Ok(val) =
417                                    self.engine.run(logic_id, eval_data_snapshot.data())
418                                {
419                                    let cleaned_val = clean_float_noise(val);
420                                    // Cache result
421                                    self.cache_result(eval_key, Value::Null, &eval_data_snapshot);
422
423                                    self.eval_data.set(&pointer_path, cleaned_val.clone());
424                                    if let Some(schema_value) =
425                                        self.evaluated_schema.pointer_mut(&pointer_path)
426                                    {
427                                        *schema_value = cleaned_val;
428                                    }
429                                }
430                            }
431                        }
432                    }
433                }
434            });
435
436            // Drop lock before calling evaluate_others
437            drop(_lock);
438
439            self.evaluate_others(paths, token);
440
441            Ok(())
442        })
443    }
444
445    pub(crate) fn evaluate_others(&mut self, paths: Option<&[String]>, token: Option<&CancellationToken>) {
446        if let Some(t) = token {
447            if t.is_cancelled() {
448                return;
449            }
450        }
451        time_block!("    evaluate_others()", {
452            // Step 1: Evaluate "rules" and "others" categories with caching
453            // Rules are evaluated here so their values are available in evaluated_schema
454            let combined_count = self.rules_evaluations.len() + self.others_evaluations.len();
455            if combined_count > 0 {
456                time_block!("      evaluate rules+others", {
457                    let eval_data_snapshot = self.eval_data.clone();
458
459                    let normalized_paths: Option<Vec<String>> = paths.map(|p_list| {
460                        p_list
461                            .iter()
462                            .flat_map(|p| {
463                                let ptr = path_utils::dot_notation_to_schema_pointer(p);
464                                // Also support version with /properties/ prefix for root match
465                                let with_props = if ptr.starts_with("#/") {
466                                    format!("#/properties/{}", &ptr[2..])
467                                } else {
468                                    ptr.clone()
469                                };
470                                vec![ptr, with_props]
471                            })
472                            .collect()
473                    });
474
475                    #[cfg(feature = "parallel")]
476                    {
477                        let combined_results: Mutex<Vec<(String, Value)>> =
478                            Mutex::new(Vec::with_capacity(combined_count));
479
480                        self.rules_evaluations
481                            .par_iter()
482                            .chain(self.others_evaluations.par_iter())
483                            .for_each(|eval_key| {
484                                // Filter items if paths are provided
485                                if let Some(filter_paths) = normalized_paths.as_ref() {
486                                    if !filter_paths.is_empty()
487                                        && !filter_paths.iter().any(|p| {
488                                            eval_key.starts_with(p.as_str())
489                                                || p.starts_with(eval_key.as_str())
490                                        })
491                                    {
492                                        return;
493                                    }
494                                }
495
496                                let pointer_path = path_utils::normalize_to_json_pointer(eval_key);
497
498                                // Try cache first (thread-safe)
499                                if let Some(_) = self.try_get_cached(eval_key, &eval_data_snapshot)
500                                {
501                                    return;
502                                }
503
504                                // Cache miss - evaluate
505                                if let Some(logic_id) = self.evaluations.get(eval_key) {
506                                    if let Ok(val) =
507                                        self.engine.run(logic_id, eval_data_snapshot.data())
508                                    {
509                                        let cleaned_val = clean_float_noise(val);
510                                        // Cache result (thread-safe)
511                                        self.cache_result(
512                                            eval_key,
513                                            Value::Null,
514                                            &eval_data_snapshot,
515                                        );
516                                        combined_results
517                                            .lock()
518                                            .unwrap()
519                                            .push((pointer_path, cleaned_val));
520                                    }
521                                }
522                            });
523
524                        // Write results to evaluated_schema
525                        for (result_path, value) in combined_results.into_inner().unwrap() {
526                            if let Some(pointer_value) =
527                                self.evaluated_schema.pointer_mut(&result_path)
528                            {
529                                // Special handling for rules with $evaluation
530                                // This includes both direct rules and array items: /rules/evaluation/0/$evaluation
531                                if !result_path.starts_with("$")
532                                    && result_path.contains("/rules/")
533                                    && !result_path.ends_with("/value")
534                                {
535                                    match pointer_value.as_object_mut() {
536                                        Some(pointer_obj) => {
537                                            pointer_obj.remove("$evaluation");
538                                            pointer_obj.insert("value".to_string(), value);
539                                        }
540                                        None => continue,
541                                    }
542                                } else {
543                                    *pointer_value = value;
544                                }
545                            }
546                        }
547                    }
548
549                    #[cfg(not(feature = "parallel"))]
550                    {
551                        // Sequential evaluation
552                        let combined_evals: Vec<&String> = self
553                            .rules_evaluations
554                            .iter()
555                            .chain(self.others_evaluations.iter())
556                            .collect();
557
558                        for eval_key in combined_evals {
559                            if let Some(t) = token {
560                                if t.is_cancelled() {
561                                    return;
562                                }
563                            }
564                            // Filter items if paths are provided
565                            if let Some(filter_paths) = normalized_paths.as_ref() {
566                                if !filter_paths.is_empty()
567                                    && !filter_paths.iter().any(|p| {
568                                        eval_key.starts_with(p.as_str())
569                                            || p.starts_with(eval_key.as_str())
570                                    })
571                                {
572                                    continue;
573                                }
574                            }
575
576                            let pointer_path = path_utils::normalize_to_json_pointer(eval_key);
577
578                            // Try cache first
579                            if let Some(_) = self.try_get_cached(eval_key, &eval_data_snapshot) {
580                                continue;
581                            }
582
583                            // Cache miss - evaluate
584                            if let Some(logic_id) = self.evaluations.get(eval_key) {
585                                if let Ok(val) =
586                                    self.engine.run(logic_id, eval_data_snapshot.data())
587                                {
588                                    let cleaned_val = clean_float_noise(val);
589                                    // Cache result
590                                    self.cache_result(eval_key, Value::Null, &eval_data_snapshot);
591
592                                    if let Some(pointer_value) =
593                                        self.evaluated_schema.pointer_mut(&pointer_path)
594                                    {
595                                        if !pointer_path.starts_with("$")
596                                            && pointer_path.contains("/rules/")
597                                            && !pointer_path.ends_with("/value")
598                                        {
599                                            match pointer_value.as_object_mut() {
600                                                Some(pointer_obj) => {
601                                                    pointer_obj.remove("$evaluation");
602                                                    pointer_obj
603                                                        .insert("value".to_string(), cleaned_val);
604                                                }
605                                                None => continue,
606                                            }
607                                        } else {
608                                            *pointer_value = cleaned_val;
609                                        }
610                                    }
611                                }
612                            }
613                        }
614                    }
615                });
616            }
617        });
618
619        // Step 2: Evaluate options URL templates (handles {variable} patterns)
620        time_block!("      evaluate_options_templates", {
621            self.evaluate_options_templates(paths);
622        });
623
624        // Step 3: Resolve layout logic (metadata injection, hidden propagation)
625        time_block!("      resolve_layout", {
626            let _ = self.resolve_layout(false);
627        });
628    }
629
630    /// Evaluate options URL templates (handles {variable} patterns)
631    fn evaluate_options_templates(&mut self, paths: Option<&[String]>) {
632        // Use pre-collected options templates from parsing (Arc clone is cheap)
633        let templates_to_eval = self.options_templates.clone();
634
635        // Evaluate each template
636        for (path, template_str, params_path) in templates_to_eval.iter() {
637            // Filter items if paths are provided
638            // 'path' here is the schema path to the field (dot notation or similar, need to check)
639            // It seems to be schema pointer based on usage in other methods
640            if let Some(filter_paths) = paths {
641                if !filter_paths.is_empty()
642                    && !filter_paths
643                        .iter()
644                        .any(|p| path.starts_with(p.as_str()) || p.starts_with(path.as_str()))
645                {
646                    continue;
647                }
648            }
649
650            if let Some(params) = self.evaluated_schema.pointer(&params_path) {
651                if let Ok(evaluated) = self.evaluate_template(&template_str, params) {
652                    if let Some(target) = self.evaluated_schema.pointer_mut(&path) {
653                        *target = Value::String(evaluated);
654                    }
655                }
656            }
657        }
658    }
659
660    /// Evaluate a template string like "api/users/{id}" with params
661    fn evaluate_template(&self, template: &str, params: &Value) -> Result<String, String> {
662        let mut result = template.to_string();
663
664        // Simple template evaluation: replace {key} with params.key
665        if let Value::Object(params_map) = params {
666            for (key, value) in params_map {
667                let placeholder = format!("{{{}}}", key);
668                if let Some(str_val) = value.as_str() {
669                    result = result.replace(&placeholder, str_val);
670                } else {
671                    // Convert non-string values to strings
672                    result = result.replace(&placeholder, &value.to_string());
673                }
674            }
675        }
676
677        Ok(result)
678    }
679}