// json_eval_rs/jsoneval/evaluate.rs
1use super::JSONEval;
2use crate::jsoneval::json_parser;
3use crate::jsoneval::path_utils;
4use crate::jsoneval::table_evaluate;
5use crate::jsoneval::cancellation::CancellationToken;
6use crate::utils::clean_float_noise_scalar;
7use crate::time_block;
8
9use serde_json::Value;
10
11#[cfg(feature = "parallel")]
12use rayon::prelude::*;
13
14#[cfg(feature = "parallel")]
15use std::sync::Mutex;
16
17impl JSONEval {
    /// Evaluate the schema with the given data and context.
    ///
    /// Parses both JSON strings, stores them on `self`, selectively purges the
    /// evaluation cache based on what actually changed, then runs the full
    /// internal evaluation pipeline.
    ///
    /// # Arguments
    ///
    /// * `data` - The data to evaluate, as a JSON string.
    /// * `context` - Optional context JSON string; treated as `{}` when absent.
    /// * `paths` - Optional list of schema paths to restrict evaluation to.
    /// * `token` - Optional cancellation token, checked before work starts.
    ///
    /// # Returns
    ///
    /// A `Result` indicating success or an error message (including
    /// `"Cancelled"` when the token was triggered, or a parse error).
    pub fn evaluate(
        &mut self,
        data: &str,
        context: Option<&str>,
        paths: Option<&[String]>,
        token: Option<&CancellationToken>,
    ) -> Result<(), String> {
        // Fast-fail before doing any parsing if the caller already cancelled.
        if let Some(t) = token {
            if t.is_cancelled() {
                return Err("Cancelled".to_string());
            }
        }
        time_block!("evaluate() [total]", {
            // Remembered so the context-cache purge below only runs when the
            // caller explicitly passed a context.
            let context_provided = context.is_some();

            // Use SIMD-accelerated JSON parsing
            let data: Value = time_block!(" parse data", { json_parser::parse_json_str(data)? });
            let context: Value = time_block!(" parse context", {
                json_parser::parse_json_str(context.unwrap_or("{}"))?
            });

            // Capture old data and context before overwriting, to allow precise value-based cache invalidation
            let old_data = self.data.clone();
            let old_context = self.context.clone();

            // Store data, context and replace in eval_data (clone once instead of twice)
            // NOTE(review): when `context` is None this still overwrites
            // `self.context` (and eval_data's context) with `{}` — confirm that
            // resetting a previously-set context is the intended behavior.
            self.data = data.clone();
            self.context = context.clone();
            time_block!(" replace_data_and_context", {
                self.eval_data.replace_data_and_context(data, context);
            });

            // Selectively purge cache entries by deeply diffing data
            // This is more efficient than clearing entire cache
            time_block!(" purge_cache", {
                self.purge_cache_for_changed_data_with_comparison(&old_data, &self.data);

                // Only purge context-dependent cache if context actually changed
                if context_provided && old_context != self.context {
                    self.purge_cache_for_context_change();
                }
            });

            // Call internal evaluate (uses existing data if not provided)
            self.evaluate_internal(paths, token)
        })
    }
75
    /// Internal evaluate that can be called when data is already set.
    /// This avoids double-locking and unnecessary data cloning for re-evaluation from evaluate_dependents.
    ///
    /// Pipeline: normalize the optional path filter → evaluate dependency-free
    /// "value" evaluations → process dependency-sorted rule batches in order
    /// (parallelizing within a batch when the `parallel` feature is on and the
    /// batch is large) → release the lock → run `evaluate_others`.
    pub(crate) fn evaluate_internal(&mut self, paths: Option<&[String]>, token: Option<&CancellationToken>) -> Result<(), String> {
        if let Some(t) = token {
            if t.is_cancelled() {
                return Err("Cancelled".to_string());
            }
        }
        time_block!(" evaluate_internal() [total]", {
            // Acquire lock for synchronous execution
            // NOTE(review): `.unwrap()` panics on a poisoned mutex (i.e. if a
            // previous evaluation panicked while holding it) — confirm this is
            // the desired failure mode.
            let _lock = self.eval_lock.lock().unwrap();

            // Normalize paths to schema pointers for correct filtering.
            // The storage binding must outlive `normalized_paths`, which
            // borrows it as a slice.
            let normalized_paths_storage; // Keep alive
            let normalized_paths = if let Some(p_list) = paths {
                normalized_paths_storage = p_list
                    .iter()
                    .flat_map(|p| {
                        let normalized = if p.starts_with("#/") {
                            // Case 1: JSON Schema path (e.g. #/properties/foo) - keep as is
                            p.to_string()
                        } else if p.starts_with('/') {
                            // Case 2: Rust Pointer path (e.g. /properties/foo) - ensure # prefix
                            format!("#{}", p)
                        } else {
                            // Case 3: Dot notation (e.g. properties.foo) - replace dots with slashes and add prefix
                            format!("#/{}", p.replace('.', "/"))
                        };

                        vec![normalized]
                    })
                    .collect::<Vec<_>>();
                Some(normalized_paths_storage.as_slice())
            } else {
                None
            };

            // Borrow sorted_evaluations via Arc (avoid deep-cloning Vec<Vec<String>>)
            let eval_batches = self.sorted_evaluations.clone();

            // Track cache misses across batches to prevent false hits from large skipped arrays
            let missed_keys = dashmap::DashSet::new();

            // Process each batch - parallelize evaluations within each batch
            // Batches are processed sequentially to maintain dependency order
            // Process value evaluations (simple computed fields)
            // These are independent of rule batches and should always run
            let eval_data_values = self.eval_data.clone();
            time_block!(" evaluate values", {
                // Heuristic: parallelism only pays off above ~100 items.
                #[cfg(feature = "parallel")]
                if self.value_evaluations.len() > 100 {
                    let value_results: Mutex<Vec<(String, Value)>> =
                        Mutex::new(Vec::with_capacity(self.value_evaluations.len()));

                    self.value_evaluations.par_iter().for_each(|eval_key| {
                        // Skip if has dependencies (will be handled in sorted batches)
                        if let Some(deps) = self.dependencies.get(eval_key) {
                            if !deps.is_empty() {
                                return;
                            }
                        }

                        // Filter items if paths are provided
                        if let Some(filter_paths) = normalized_paths {
                            if !filter_paths.is_empty()
                                && !filter_paths.iter().any(|p| {
                                    eval_key.starts_with(p.as_str())
                                        || p.starts_with(eval_key.as_str())
                                })
                            {
                                return;
                            }
                        }

                        // For value evaluations (e.g. /properties/foo/value), we want the value at that path
                        // The path in eval_key is like "#/properties/foo/value"
                        let pointer_path = path_utils::normalize_to_json_pointer(eval_key).into_owned();

                        // Try cache first (thread-safe)
                        if let Some(_) = self.try_get_cached(eval_key, &eval_data_values, &missed_keys) {
                            return;
                        }

                        // Cache miss - evaluate
                        if let Some(logic_id) = self.evaluations.get(eval_key) {
                            if let Ok(val) = self.engine.run(logic_id, eval_data_values.data()) {
                                let cleaned_val = clean_float_noise_scalar(val);
                                // Cache result (thread-safe)
                                // NOTE(review): Value::Null is passed instead of the computed
                                // value — presumably the cache keys on a data fingerprint
                                // rather than storing the result itself; confirm.
                                self.cache_result(eval_key, Value::Null, &eval_data_values);
                                value_results
                                    .lock()
                                    .unwrap()
                                    .push((pointer_path, cleaned_val));
                            }
                        }
                    });

                    // Write results to evaluated_schema
                    for (result_path, value) in value_results.into_inner().unwrap() {
                        if let Some(pointer_value) = self.evaluated_schema.pointer_mut(&result_path)
                        {
                            *pointer_value = value;
                        }
                    }
                }

                // Sequential execution for values (if not parallel or small count).
                // When the parallel branch above already handled everything, this
                // selects an empty slice so the loop below is a no-op.
                #[cfg(feature = "parallel")]
                let value_eval_items = if self.value_evaluations.len() > 100 {
                    &self.value_evaluations[0..0]
                } else {
                    &self.value_evaluations
                };

                #[cfg(not(feature = "parallel"))]
                let value_eval_items = &self.value_evaluations;

                for eval_key in value_eval_items.iter() {
                    // Cancellation is only checked on the sequential path; the
                    // parallel closure above cannot early-return the outer fn.
                    if let Some(t) = token {
                        if t.is_cancelled() {
                            return Err("Cancelled".to_string());
                        }
                    }
                    // Skip if has dependencies (will be handled in sorted batches)
                    if let Some(deps) = self.dependencies.get(eval_key) {
                        if !deps.is_empty() {
                            continue;
                        }
                    }

                    // Filter items if paths are provided
                    if let Some(filter_paths) = normalized_paths {
                        if !filter_paths.is_empty()
                            && !filter_paths.iter().any(|p| {
                                eval_key.starts_with(p.as_str()) || p.starts_with(eval_key.as_str())
                            })
                        {
                            continue;
                        }
                    }

                    let pointer_path = path_utils::normalize_to_json_pointer(eval_key).into_owned();

                    // Try cache first
                    if let Some(_) = self.try_get_cached(eval_key, &eval_data_values, &missed_keys) {
                        continue;
                    }

                    // Cache miss - evaluate
                    if let Some(logic_id) = self.evaluations.get(eval_key) {
                        if let Ok(val) = self.engine.run(logic_id, eval_data_values.data()) {
                            let cleaned_val = clean_float_noise_scalar(val);
                            // Cache result
                            self.cache_result(eval_key, Value::Null, &eval_data_values);

                            if let Some(pointer_value) =
                                self.evaluated_schema.pointer_mut(&pointer_path)
                            {
                                *pointer_value = cleaned_val;
                            }
                        }
                    }
                }
            });

            time_block!(" process batches", {
                // Batches must run in order: items in batch N may depend on
                // results written by batch N-1.
                for batch in eval_batches.iter() {
                    if let Some(t) = token {
                        if t.is_cancelled() {
                            return Err("Cancelled".to_string());
                        }
                    }
                    // Skip empty batches
                    if batch.is_empty() {
                        continue;
                    }

                    // Check if we can skip this entire batch optimization
                    // If paths are provided, we can check if ANY item in batch matches ANY path
                    if let Some(filter_paths) = normalized_paths {
                        if !filter_paths.is_empty() {
                            let batch_has_match = batch.iter().any(|eval_key| {
                                filter_paths.iter().any(|p| {
                                    eval_key.starts_with(p.as_str())
                                        || p.starts_with(eval_key.as_str())
                                })
                            });
                            if !batch_has_match {
                                continue;
                            }
                        }
                    }

                    // No pre-checking cache - we'll check inside parallel execution
                    // This allows thread-safe cache access during parallel evaluation

                    // Parallel execution within batch (no dependencies between items)
                    // Use Mutex for thread-safe result collection
                    // Store both eval_key and result for cache storage
                    let eval_data_snapshot = self.eval_data.clone();

                    // Parallelize only if batch has multiple items (overhead not worth it for single item)

                    #[cfg(feature = "parallel")]
                    if batch.len() > 1000 {
                        // Tuples are (eval_key, pointer_path, value).
                        let results: Mutex<Vec<(String, String, Value)>> =
                            Mutex::new(Vec::with_capacity(batch.len()));
                        batch.par_iter().for_each(|eval_key| {
                            // Filter individual items if paths are provided
                            if let Some(filter_paths) = normalized_paths {
                                if !filter_paths.is_empty()
                                    && !filter_paths.iter().any(|p| {
                                        eval_key.starts_with(p.as_str())
                                            || p.starts_with(eval_key.as_str())
                                    })
                                {
                                    return;
                                }
                            }

                            let pointer_path = path_utils::normalize_to_json_pointer(eval_key).into_owned();

                            // Try cache first (thread-safe)
                            if let Some(_) = self.try_get_cached(eval_key, &eval_data_snapshot, &missed_keys) {
                                return;
                            }

                            // Cache miss - evaluate
                            let is_table = self.table_metadata.contains_key(eval_key);

                            if is_table {
                                // Record the miss so later stages don't treat the
                                // skipped/fresh table as a cache hit.
                                missed_keys.insert(pointer_path.clone());
                                // Evaluate table using sandboxed metadata (parallel-safe, immutable parent scope)
                                if let Ok(rows) = table_evaluate::evaluate_table(
                                    self,
                                    eval_key,
                                    &eval_data_snapshot,
                                    token
                                ) {
                                    let value = Value::Array(rows);
                                    // Cache result (thread-safe)
                                    self.cache_result(eval_key, Value::Null, &eval_data_snapshot);
                                    results.lock().unwrap().push((
                                        eval_key.clone(),
                                        pointer_path,
                                        value,
                                    ));
                                }
                            } else {
                                if let Some(logic_id) = self.evaluations.get(eval_key) {
                                    // Evaluate directly with snapshot
                                    if let Ok(val) =
                                        self.engine.run(logic_id, eval_data_snapshot.data())
                                    {
                                        let cleaned_val = clean_float_noise_scalar(val);
                                        // Cache result (thread-safe)
                                        self.cache_result(
                                            eval_key,
                                            Value::Null,
                                            &eval_data_snapshot,
                                        );
                                        results.lock().unwrap().push((
                                            eval_key.clone(),
                                            pointer_path,
                                            cleaned_val,
                                        ));
                                    }
                                }
                            }
                        });

                        // Write all results back sequentially (already cleaned in parallel execution)
                        for (_eval_key, path, value) in results.into_inner().unwrap() {
                            let cleaned_value = value;

                            self.eval_data.set(&path, cleaned_value.clone());
                            // Also write to evaluated_schema
                            if let Some(schema_value) = self.evaluated_schema.pointer_mut(&path) {
                                *schema_value = cleaned_value;
                            }
                        }
                        continue;
                    }

                    // Sequential execution (single item or parallel feature disabled)
                    #[cfg(not(feature = "parallel"))]
                    let batch_items: &[String] = batch;

                    #[cfg(feature = "parallel")]
                    let batch_items: &[String] = if batch.len() > 1000 {
                        &batch[0..0]
                    } else {
                        batch
                    }; // Empty slice if already processed in parallel

                    for eval_key in batch_items {
                        if let Some(t) = token {
                            if t.is_cancelled() {
                                return Err("Cancelled".to_string());
                            }
                        }
                        // Filter individual items if paths are provided
                        if let Some(filter_paths) = normalized_paths {
                            if !filter_paths.is_empty()
                                && !filter_paths.iter().any(|p| {
                                    eval_key.starts_with(p.as_str())
                                        || p.starts_with(eval_key.as_str())
                                })
                            {
                                continue;
                            }
                        }

                        let pointer_path = path_utils::normalize_to_json_pointer(eval_key).into_owned();

                        // Try cache first
                        if let Some(_) = self.try_get_cached(eval_key, &eval_data_snapshot, &missed_keys) {
                            continue;
                        }

                        // Cache miss - evaluate
                        let is_table = self.table_metadata.contains_key(eval_key);

                        if is_table {
                            missed_keys.insert(pointer_path.clone());
                            if let Ok(rows) =
                                table_evaluate::evaluate_table(self, eval_key, &eval_data_snapshot, token)
                            {
                                let value = Value::Array(rows);
                                self.cache_result(eval_key, Value::Null, &eval_data_snapshot);

                                self.eval_data.set(&pointer_path, value.clone());
                                if let Some(schema_value) =
                                    self.evaluated_schema.pointer_mut(&pointer_path)
                                {
                                    *schema_value = value;
                                }
                            }
                        } else {
                            if let Some(logic_id) = self.evaluations.get(eval_key) {
                                if let Ok(val) =
                                    self.engine.run(logic_id, eval_data_snapshot.data())
                                {
                                    let cleaned_val = clean_float_noise_scalar(val);
                                    // Cache result
                                    self.cache_result(eval_key, Value::Null, &eval_data_snapshot);

                                    self.eval_data.set(&pointer_path, cleaned_val.clone());
                                    if let Some(schema_value) =
                                        self.evaluated_schema.pointer_mut(&pointer_path)
                                    {
                                        *schema_value = cleaned_val;
                                    }
                                }
                            }
                        }
                    }
                }
            });

            // Drop lock before calling evaluate_others
            // (evaluate_others re-enters other self methods that may take it).
            drop(_lock);

            self.evaluate_others(paths, token, &missed_keys);

            Ok(())
        })
    }
444
    /// Evaluate the remaining categories after the dependency-sorted batches:
    /// 1. "rules" and "others" evaluations (with caching),
    /// 2. options URL templates (`{variable}` substitution),
    /// 3. layout resolution (metadata injection, hidden propagation).
    ///
    /// Best-effort: evaluation errors are skipped, cancellation returns early,
    /// and no error is reported to the caller.
    pub(crate) fn evaluate_others(&mut self, paths: Option<&[String]>, token: Option<&CancellationToken>, missed_keys: &dashmap::DashSet<String>) {
        if let Some(t) = token {
            if t.is_cancelled() {
                return;
            }
        }
        time_block!(" evaluate_others()", {
            // Step 1: Evaluate "rules" and "others" categories with caching
            // Rules are evaluated here so their values are available in evaluated_schema
            let combined_count = self.rules_evaluations.len() + self.others_evaluations.len();
            if combined_count > 0 {
                time_block!(" evaluate rules+others", {
                    let eval_data_snapshot = self.eval_data.clone();

                    // NOTE(review): this normalization differs from
                    // evaluate_internal's (it also adds a /properties/-prefixed
                    // variant) — presumably intentional for root-level matches.
                    let normalized_paths: Option<Vec<String>> = paths.map(|p_list| {
                        p_list
                            .iter()
                            .flat_map(|p| {
                                let ptr = path_utils::dot_notation_to_schema_pointer(p);
                                // Also support version with /properties/ prefix for root match
                                let with_props = if ptr.starts_with("#/") {
                                    format!("#/properties/{}", &ptr[2..])
                                } else {
                                    ptr.clone()
                                };
                                vec![ptr, with_props]
                            })
                            .collect()
                    });

                    #[cfg(feature = "parallel")]
                    {
                        let combined_results: Mutex<Vec<(String, Value)>> =
                            Mutex::new(Vec::with_capacity(combined_count));

                        self.rules_evaluations
                            .par_iter()
                            .chain(self.others_evaluations.par_iter())
                            .for_each(|eval_key| {
                                // Filter items if paths are provided
                                if let Some(filter_paths) = normalized_paths.as_ref() {
                                    if !filter_paths.is_empty()
                                        && !filter_paths.iter().any(|p| {
                                            eval_key.starts_with(p.as_str())
                                                || p.starts_with(eval_key.as_str())
                                        })
                                    {
                                        return;
                                    }
                                }

                                let pointer_path = path_utils::normalize_to_json_pointer(eval_key).into_owned();

                                // Try cache first (thread-safe)
                                if let Some(_) = self.try_get_cached(eval_key, &eval_data_snapshot, missed_keys)
                                {
                                    return;
                                }

                                // Cache miss - evaluate
                                if let Some(logic_id) = self.evaluations.get(eval_key) {
                                    if let Ok(val) =
                                        self.engine.run(logic_id, eval_data_snapshot.data())
                                    {
                                        let cleaned_val = clean_float_noise_scalar(val);
                                        // Cache result (thread-safe)
                                        self.cache_result(
                                            eval_key,
                                            Value::Null,
                                            &eval_data_snapshot,
                                        );
                                        combined_results
                                            .lock()
                                            .unwrap()
                                            .push((pointer_path, cleaned_val));
                                    }
                                }
                            });

                        // Write results to evaluated_schema
                        for (result_path, value) in combined_results.into_inner().unwrap() {
                            if let Some(pointer_value) =
                                self.evaluated_schema.pointer_mut(&result_path)
                            {
                                // Special handling for rules with $evaluation
                                // This includes both direct rules and array items: /rules/evaluation/0/$evaluation
                                if !result_path.starts_with("$")
                                    && result_path.contains("/rules/")
                                    && !result_path.ends_with("/value")
                                {
                                    // Replace the rule's $evaluation node with its
                                    // computed "value"; skip non-object nodes.
                                    match pointer_value.as_object_mut() {
                                        Some(pointer_obj) => {
                                            pointer_obj.remove("$evaluation");
                                            pointer_obj.insert("value".to_string(), value);
                                        }
                                        None => continue,
                                    }
                                } else {
                                    *pointer_value = value;
                                }
                            }
                        }
                    }

                    #[cfg(not(feature = "parallel"))]
                    {
                        // Sequential evaluation
                        let combined_evals: Vec<&String> = self
                            .rules_evaluations
                            .iter()
                            .chain(self.others_evaluations.iter())
                            .collect();

                        for eval_key in combined_evals {
                            if let Some(t) = token {
                                if t.is_cancelled() {
                                    return;
                                }
                            }
                            // Filter items if paths are provided
                            if let Some(filter_paths) = normalized_paths.as_ref() {
                                if !filter_paths.is_empty()
                                    && !filter_paths.iter().any(|p| {
                                        eval_key.starts_with(p.as_str())
                                            || p.starts_with(eval_key.as_str())
                                    })
                                {
                                    continue;
                                }
                            }

                            let pointer_path = path_utils::normalize_to_json_pointer(eval_key).into_owned();

                            // Try cache first
                            if let Some(_) = self.try_get_cached(eval_key, &eval_data_snapshot, missed_keys) {
                                continue;
                            }

                            // Cache miss - evaluate
                            if let Some(logic_id) = self.evaluations.get(eval_key) {
                                if let Ok(val) =
                                    self.engine.run(logic_id, eval_data_snapshot.data())
                                {
                                    let cleaned_val = clean_float_noise_scalar(val);
                                    // Cache result
                                    self.cache_result(eval_key, Value::Null, &eval_data_snapshot);

                                    if let Some(pointer_value) =
                                        self.evaluated_schema.pointer_mut(&pointer_path)
                                    {
                                        // Same $evaluation → value rewrite as the
                                        // parallel branch above.
                                        if !pointer_path.starts_with("$")
                                            && pointer_path.contains("/rules/")
                                            && !pointer_path.ends_with("/value")
                                        {
                                            match pointer_value.as_object_mut() {
                                                Some(pointer_obj) => {
                                                    pointer_obj.remove("$evaluation");
                                                    pointer_obj
                                                        .insert("value".to_string(), cleaned_val);
                                                }
                                                None => continue,
                                            }
                                        } else {
                                            *pointer_value = cleaned_val;
                                        }
                                    }
                                }
                            }
                        }
                    }
                });
            }
        });

        // Step 2: Evaluate options URL templates (handles {variable} patterns)
        time_block!(" evaluate_options_templates", {
            self.evaluate_options_templates(paths);
        });

        // Step 3: Resolve layout logic (metadata injection, hidden propagation)
        time_block!(" resolve_layout", {
            let _ = self.resolve_layout(false);
        });
    }
629
630 /// Evaluate options URL templates (handles {variable} patterns)
631 fn evaluate_options_templates(&mut self, paths: Option<&[String]>) {
632 // Use pre-collected options templates from parsing (Arc clone is cheap)
633 let templates_to_eval = self.options_templates.clone();
634
635 // Evaluate each template
636 for (path, template_str, params_path) in templates_to_eval.iter() {
637 // Filter items if paths are provided
638 // 'path' here is the schema path to the field (dot notation or similar, need to check)
639 // It seems to be schema pointer based on usage in other methods
640 if let Some(filter_paths) = paths {
641 if !filter_paths.is_empty()
642 && !filter_paths
643 .iter()
644 .any(|p| path.starts_with(p.as_str()) || p.starts_with(path.as_str()))
645 {
646 continue;
647 }
648 }
649
650 if let Some(params) = self.evaluated_schema.pointer(¶ms_path) {
651 if let Ok(evaluated) = self.evaluate_template(&template_str, params) {
652 if let Some(target) = self.evaluated_schema.pointer_mut(&path) {
653 *target = Value::String(evaluated);
654 }
655 }
656 }
657 }
658 }
659
660 /// Evaluate a template string like "api/users/{id}" with params
661 fn evaluate_template(&self, template: &str, params: &Value) -> Result<String, String> {
662 let mut result = template.to_string();
663
664 // Simple template evaluation: replace {key} with params.key
665 if let Value::Object(params_map) = params {
666 for (key, value) in params_map {
667 let placeholder = format!("{{{}}}", key);
668 if let Some(str_val) = value.as_str() {
669 result = result.replace(&placeholder, str_val);
670 } else {
671 // Convert non-string values to strings
672 result = result.replace(&placeholder, &value.to_string());
673 }
674 }
675 }
676
677 Ok(result)
678 }
679}