// json_eval_rs/jsoneval/evaluate.rs
1use super::JSONEval;
2use crate::jsoneval::json_parser;
3use crate::jsoneval::path_utils;
4use crate::jsoneval::table_evaluate;
5use crate::jsoneval::cancellation::CancellationToken;
6use crate::utils::clean_float_noise_scalar;
7use crate::time_block;
8
9use serde_json::Value;
10
11#[cfg(feature = "parallel")]
12use rayon::prelude::*;
13
14#[cfg(feature = "parallel")]
15use std::sync::Mutex;
16
17impl JSONEval {
18 /// Evaluate the schema with the given data and context.
19 ///
20 /// # Arguments
21 ///
22 /// * `data` - The data to evaluate.
23 /// * `context` - The context to evaluate.
24 ///
25 /// # Returns
26 ///
27 /// A `Result` indicating success or an error message.
28 pub fn evaluate(
29 &mut self,
30 data: &str,
31 context: Option<&str>,
32 paths: Option<&[String]>,
33 token: Option<&CancellationToken>,
34 ) -> Result<(), String> {
35 if let Some(t) = token {
36 if t.is_cancelled() {
37 return Err("Cancelled".to_string());
38 }
39 }
40 time_block!("evaluate() [total]", {
41 let context_provided = context.is_some();
42
43 // Use SIMD-accelerated JSON parsing
44 let data: Value = time_block!(" parse data", { json_parser::parse_json_str(data)? });
45 let context: Value = time_block!(" parse context", {
46 json_parser::parse_json_str(context.unwrap_or("{}"))?
47 });
48
49 // Collect top-level data keys to selectively purge cache (before move)
50 let changed_data_paths: Vec<String> = if let Some(obj) = data.as_object() {
51 obj.keys().map(|k| format!("/{}", k)).collect()
52 } else {
53 Vec::new()
54 };
55
56 // Store data and replace in eval_data (clone once instead of twice)
57 self.data = data.clone();
58 time_block!(" replace_data_and_context", {
59 self.eval_data.replace_data_and_context(data, context);
60 });
61
62 // Selectively purge cache entries that depend on changed top-level data keys
63 // This is more efficient than clearing entire cache
64 time_block!(" purge_cache", {
65 self.purge_cache_for_changed_data(&changed_data_paths);
66
67 // Also purge context-dependent cache if context was provided
68 if context_provided {
69 self.purge_cache_for_context_change();
70 }
71 });
72
73 // Call internal evaluate (uses existing data if not provided)
74 self.evaluate_internal(paths, token)
75 })
76 }
77
    /// Internal evaluate that can be called when data is already set.
    ///
    /// This avoids double-locking and unnecessary data cloning for re-evaluation
    /// from `evaluate_dependents`.
    ///
    /// # Arguments
    ///
    /// * `paths` - Optional filter; entries may be schema pointers (`#/a/b`),
    ///   JSON pointers (`/a/b`), or dot notation (`a.b`) — all are normalized to
    ///   `#/`-prefixed pointers before prefix matching in either direction.
    /// * `token` - Optional cancellation token, polled before each item/batch.
    ///
    /// # Returns
    ///
    /// `Ok(())` on success, or `Err("Cancelled")` if cancellation was requested.
    pub(crate) fn evaluate_internal(&mut self, paths: Option<&[String]>, token: Option<&CancellationToken>) -> Result<(), String> {
        if let Some(t) = token {
            if t.is_cancelled() {
                return Err("Cancelled".to_string());
            }
        }
        time_block!(" evaluate_internal() [total]", {
            // Acquire lock for synchronous execution.
            // NOTE(review): `.unwrap()` panics if the lock was poisoned by a
            // panic in a previous holder — confirm that is acceptable here.
            let _lock = self.eval_lock.lock().unwrap();

            // Normalize paths to schema pointers for correct filtering
            let normalized_paths_storage; // Keep alive: `normalized_paths` borrows its slice
            let normalized_paths = if let Some(p_list) = paths {
                normalized_paths_storage = p_list
                    .iter()
                    .flat_map(|p| {
                        let normalized = if p.starts_with("#/") {
                            // Case 1: JSON Schema path (e.g. #/properties/foo) - keep as is
                            p.to_string()
                        } else if p.starts_with('/') {
                            // Case 2: Rust Pointer path (e.g. /properties/foo) - ensure # prefix
                            format!("#{}", p)
                        } else {
                            // Case 3: Dot notation (e.g. properties.foo) - replace dots with slashes and add prefix
                            format!("#/{}", p.replace('.', "/"))
                        };

                        vec![normalized]
                    })
                    .collect::<Vec<_>>();
                Some(normalized_paths_storage.as_slice())
            } else {
                None
            };

            // Borrow sorted_evaluations via Arc (avoid deep-cloning Vec<Vec<String>>)
            let eval_batches = self.sorted_evaluations.clone();

            // Process each batch - parallelize evaluations within each batch.
            // Batches are processed sequentially to maintain dependency order.
            // Process value evaluations (simple computed fields) first.
            // These are independent of rule batches and should always run.
            let eval_data_values = self.eval_data.clone();
            time_block!(" evaluate values", {
                // Parallel fast path: only worth the rayon overhead above ~100 items.
                #[cfg(feature = "parallel")]
                if self.value_evaluations.len() > 100 {
                    let value_results: Mutex<Vec<(String, Value)>> =
                        Mutex::new(Vec::with_capacity(self.value_evaluations.len()));

                    self.value_evaluations.par_iter().for_each(|eval_key| {
                        // Skip if has dependencies (will be handled in sorted batches)
                        if let Some(deps) = self.dependencies.get(eval_key) {
                            if !deps.is_empty() {
                                return;
                            }
                        }

                        // Filter items if paths are provided (prefix match both ways)
                        if let Some(filter_paths) = normalized_paths {
                            if !filter_paths.is_empty()
                                && !filter_paths.iter().any(|p| {
                                    eval_key.starts_with(p.as_str())
                                        || p.starts_with(eval_key.as_str())
                                })
                            {
                                return;
                            }
                        }

                        // For value evaluations (e.g. /properties/foo/value), we want the value at that path
                        // The path in eval_key is like "#/properties/foo/value"
                        let pointer_path = path_utils::normalize_to_json_pointer(eval_key).into_owned();

                        // Try cache first (thread-safe); a hit means nothing to recompute.
                        if let Some(_) = self.try_get_cached(eval_key, &eval_data_values) {
                            return;
                        }

                        // Cache miss - evaluate
                        if let Some(logic_id) = self.evaluations.get(eval_key) {
                            if let Ok(val) = self.engine.run(logic_id, eval_data_values.data()) {
                                let cleaned_val = clean_float_noise_scalar(val);
                                // Cache result (thread-safe).
                                // NOTE(review): `Value::Null` is passed instead of the computed
                                // value — presumably the cache only records key freshness for
                                // this data snapshot; confirm against `cache_result`.
                                self.cache_result(eval_key, Value::Null, &eval_data_values);
                                value_results
                                    .lock()
                                    .unwrap()
                                    .push((pointer_path, cleaned_val));
                            }
                        }
                    });

                    // Write results to evaluated_schema (single-threaded, after the join)
                    for (result_path, value) in value_results.into_inner().unwrap() {
                        if let Some(pointer_value) = self.evaluated_schema.pointer_mut(&result_path)
                        {
                            *pointer_value = value;
                        }
                    }
                }

                // Sequential execution for values (if not parallel or small count).
                // When the parallel branch above ran, this degrades to an empty slice.
                #[cfg(feature = "parallel")]
                let value_eval_items = if self.value_evaluations.len() > 100 {
                    &self.value_evaluations[0..0]
                } else {
                    &self.value_evaluations
                };

                #[cfg(not(feature = "parallel"))]
                let value_eval_items = &self.value_evaluations;

                for eval_key in value_eval_items.iter() {
                    // Cooperative cancellation between items.
                    if let Some(t) = token {
                        if t.is_cancelled() {
                            return Err("Cancelled".to_string());
                        }
                    }
                    // Skip if has dependencies (will be handled in sorted batches)
                    if let Some(deps) = self.dependencies.get(eval_key) {
                        if !deps.is_empty() {
                            continue;
                        }
                    }

                    // Filter items if paths are provided
                    if let Some(filter_paths) = normalized_paths {
                        if !filter_paths.is_empty()
                            && !filter_paths.iter().any(|p| {
                                eval_key.starts_with(p.as_str()) || p.starts_with(eval_key.as_str())
                            })
                        {
                            continue;
                        }
                    }

                    let pointer_path = path_utils::normalize_to_json_pointer(eval_key).into_owned();

                    // Try cache first
                    if let Some(_) = self.try_get_cached(eval_key, &eval_data_values) {
                        continue;
                    }

                    // Cache miss - evaluate
                    if let Some(logic_id) = self.evaluations.get(eval_key) {
                        if let Ok(val) = self.engine.run(logic_id, eval_data_values.data()) {
                            let cleaned_val = clean_float_noise_scalar(val);
                            // Cache result (see NOTE above about the Null payload)
                            self.cache_result(eval_key, Value::Null, &eval_data_values);

                            if let Some(pointer_value) =
                                self.evaluated_schema.pointer_mut(&pointer_path)
                            {
                                *pointer_value = cleaned_val;
                            }
                        }
                    }
                }
            });

            time_block!(" process batches", {
                // Batches run in dependency order; items within a batch are independent.
                for batch in eval_batches.iter() {
                    if let Some(t) = token {
                        if t.is_cancelled() {
                            return Err("Cancelled".to_string());
                        }
                    }
                    // Skip empty batches
                    if batch.is_empty() {
                        continue;
                    }

                    // Check if we can skip this entire batch optimization
                    // If paths are provided, we can check if ANY item in batch matches ANY path
                    if let Some(filter_paths) = normalized_paths {
                        if !filter_paths.is_empty() {
                            let batch_has_match = batch.iter().any(|eval_key| {
                                filter_paths.iter().any(|p| {
                                    eval_key.starts_with(p.as_str())
                                        || p.starts_with(eval_key.as_str())
                                })
                            });
                            if !batch_has_match {
                                continue;
                            }
                        }
                    }

                    // No pre-checking cache - we'll check inside parallel execution
                    // This allows thread-safe cache access during parallel evaluation

                    // Parallel execution within batch (no dependencies between items)
                    // Use Mutex for thread-safe result collection
                    // Store both eval_key and result for cache storage
                    let eval_data_snapshot = self.eval_data.clone();

                    // Parallelize only if batch has multiple items (overhead not worth it for single item)

                    #[cfg(feature = "parallel")]
                    if batch.len() > 1000 {
                        // Results are (eval_key, json_pointer, value) triples.
                        let results: Mutex<Vec<(String, String, Value)>> =
                            Mutex::new(Vec::with_capacity(batch.len()));
                        batch.par_iter().for_each(|eval_key| {
                            // Filter individual items if paths are provided
                            if let Some(filter_paths) = normalized_paths {
                                if !filter_paths.is_empty()
                                    && !filter_paths.iter().any(|p| {
                                        eval_key.starts_with(p.as_str())
                                            || p.starts_with(eval_key.as_str())
                                    })
                                {
                                    return;
                                }
                            }

                            let pointer_path = path_utils::normalize_to_json_pointer(eval_key).into_owned();

                            // Try cache first (thread-safe)
                            if let Some(_) = self.try_get_cached(eval_key, &eval_data_snapshot) {
                                return;
                            }

                            // Cache miss - evaluate; tables take a different path than plain logic.
                            let is_table = self.table_metadata.contains_key(eval_key);

                            if is_table {
                                // Evaluate table using sandboxed metadata (parallel-safe, immutable parent scope)
                                if let Ok(rows) = table_evaluate::evaluate_table(
                                    self,
                                    eval_key,
                                    &eval_data_snapshot,
                                    token
                                ) {
                                    let value = Value::Array(rows);
                                    // Cache result (thread-safe; Null payload, see NOTE above)
                                    self.cache_result(eval_key, Value::Null, &eval_data_snapshot);
                                    results.lock().unwrap().push((
                                        eval_key.clone(),
                                        pointer_path,
                                        value,
                                    ));
                                }
                            } else {
                                if let Some(logic_id) = self.evaluations.get(eval_key) {
                                    // Evaluate directly with snapshot
                                    if let Ok(val) =
                                        self.engine.run(logic_id, eval_data_snapshot.data())
                                    {
                                        let cleaned_val = clean_float_noise_scalar(val);
                                        // Cache result (thread-safe)
                                        self.cache_result(
                                            eval_key,
                                            Value::Null,
                                            &eval_data_snapshot,
                                        );
                                        results.lock().unwrap().push((
                                            eval_key.clone(),
                                            pointer_path,
                                            cleaned_val,
                                        ));
                                    }
                                }
                            }
                        });

                        // Write all results back sequentially (already cleaned in parallel execution)
                        for (_eval_key, path, value) in results.into_inner().unwrap() {
                            let cleaned_value = value;

                            self.eval_data.set(&path, cleaned_value.clone());
                            // Also write to evaluated_schema
                            if let Some(schema_value) = self.evaluated_schema.pointer_mut(&path) {
                                *schema_value = cleaned_value;
                            }
                        }
                        continue;
                    }

                    // Sequential execution (single item or parallel feature disabled)
                    #[cfg(not(feature = "parallel"))]
                    let batch_items: &[String] = batch;

                    #[cfg(feature = "parallel")]
                    let batch_items: &[String] = if batch.len() > 1000 {
                        &batch[0..0]
                    } else {
                        batch
                    }; // Empty slice if already processed in parallel

                    for eval_key in batch_items {
                        if let Some(t) = token {
                            if t.is_cancelled() {
                                return Err("Cancelled".to_string());
                            }
                        }
                        // Filter individual items if paths are provided
                        if let Some(filter_paths) = normalized_paths {
                            if !filter_paths.is_empty()
                                && !filter_paths.iter().any(|p| {
                                    eval_key.starts_with(p.as_str())
                                        || p.starts_with(eval_key.as_str())
                                })
                            {
                                continue;
                            }
                        }

                        let pointer_path = path_utils::normalize_to_json_pointer(eval_key).into_owned();

                        // Try cache first
                        if let Some(_) = self.try_get_cached(eval_key, &eval_data_snapshot) {
                            continue;
                        }

                        // Cache miss - evaluate
                        let is_table = self.table_metadata.contains_key(eval_key);

                        if is_table {
                            if let Ok(rows) =
                                table_evaluate::evaluate_table(self, eval_key, &eval_data_snapshot, token)
                            {
                                let value = Value::Array(rows);
                                self.cache_result(eval_key, Value::Null, &eval_data_snapshot);

                                // Keep eval_data and the rendered schema in sync.
                                self.eval_data.set(&pointer_path, value.clone());
                                if let Some(schema_value) =
                                    self.evaluated_schema.pointer_mut(&pointer_path)
                                {
                                    *schema_value = value;
                                }
                            }
                        } else {
                            if let Some(logic_id) = self.evaluations.get(eval_key) {
                                if let Ok(val) =
                                    self.engine.run(logic_id, eval_data_snapshot.data())
                                {
                                    let cleaned_val = clean_float_noise_scalar(val);
                                    // Cache result
                                    self.cache_result(eval_key, Value::Null, &eval_data_snapshot);

                                    self.eval_data.set(&pointer_path, cleaned_val.clone());
                                    if let Some(schema_value) =
                                        self.evaluated_schema.pointer_mut(&pointer_path)
                                    {
                                        *schema_value = cleaned_val;
                                    }
                                }
                            }
                        }
                    }
                }
            });

            // Drop lock before calling evaluate_others (it re-enters self mutably).
            drop(_lock);

            self.evaluate_others(paths, token);

            Ok(())
        })
    }
441
    /// Evaluate the "rules" and "others" evaluation categories, then run the
    /// post-processing steps (options URL templates and layout resolution).
    ///
    /// # Arguments
    ///
    /// * `paths` - Optional path filter (dot notation accepted; converted to
    ///   schema pointers, with an extra `#/properties/...` variant per entry).
    /// * `token` - Optional cancellation token; on cancellation this returns
    ///   early and silently (no error — the function returns `()`).
    pub(crate) fn evaluate_others(&mut self, paths: Option<&[String]>, token: Option<&CancellationToken>) {
        if let Some(t) = token {
            if t.is_cancelled() {
                return;
            }
        }
        time_block!(" evaluate_others()", {
            // Step 1: Evaluate "rules" and "others" categories with caching
            // Rules are evaluated here so their values are available in evaluated_schema
            let combined_count = self.rules_evaluations.len() + self.others_evaluations.len();
            if combined_count > 0 {
                time_block!(" evaluate rules+others", {
                    let eval_data_snapshot = self.eval_data.clone();

                    // Each input path yields two match candidates: the pointer itself
                    // and a `#/properties/`-prefixed variant for root-level matches.
                    let normalized_paths: Option<Vec<String>> = paths.map(|p_list| {
                        p_list
                            .iter()
                            .flat_map(|p| {
                                let ptr = path_utils::dot_notation_to_schema_pointer(p);
                                // Also support version with /properties/ prefix for root match
                                let with_props = if ptr.starts_with("#/") {
                                    format!("#/properties/{}", &ptr[2..])
                                } else {
                                    ptr.clone()
                                };
                                vec![ptr, with_props]
                            })
                            .collect()
                    });

                    #[cfg(feature = "parallel")]
                    {
                        let combined_results: Mutex<Vec<(String, Value)>> =
                            Mutex::new(Vec::with_capacity(combined_count));

                        self.rules_evaluations
                            .par_iter()
                            .chain(self.others_evaluations.par_iter())
                            .for_each(|eval_key| {
                                // Filter items if paths are provided
                                if let Some(filter_paths) = normalized_paths.as_ref() {
                                    if !filter_paths.is_empty()
                                        && !filter_paths.iter().any(|p| {
                                            eval_key.starts_with(p.as_str())
                                                || p.starts_with(eval_key.as_str())
                                        })
                                    {
                                        return;
                                    }
                                }

                                let pointer_path = path_utils::normalize_to_json_pointer(eval_key).into_owned();

                                // Try cache first (thread-safe)
                                if let Some(_) = self.try_get_cached(eval_key, &eval_data_snapshot)
                                {
                                    return;
                                }

                                // Cache miss - evaluate
                                if let Some(logic_id) = self.evaluations.get(eval_key) {
                                    if let Ok(val) =
                                        self.engine.run(logic_id, eval_data_snapshot.data())
                                    {
                                        let cleaned_val = clean_float_noise_scalar(val);
                                        // Cache result (thread-safe).
                                        // NOTE(review): `Value::Null` payload — presumably the
                                        // cache records freshness only; confirm in `cache_result`.
                                        self.cache_result(
                                            eval_key,
                                            Value::Null,
                                            &eval_data_snapshot,
                                        );
                                        combined_results
                                            .lock()
                                            .unwrap()
                                            .push((pointer_path, cleaned_val));
                                    }
                                }
                            });

                        // Write results to evaluated_schema
                        for (result_path, value) in combined_results.into_inner().unwrap() {
                            if let Some(pointer_value) =
                                self.evaluated_schema.pointer_mut(&result_path)
                            {
                                // Special handling for rules with $evaluation
                                // This includes both direct rules and array items: /rules/evaluation/0/$evaluation
                                if !result_path.starts_with("$")
                                    && result_path.contains("/rules/")
                                    && !result_path.ends_with("/value")
                                {
                                    match pointer_value.as_object_mut() {
                                        Some(pointer_obj) => {
                                            // Replace the $evaluation node with its computed value.
                                            pointer_obj.remove("$evaluation");
                                            pointer_obj.insert("value".to_string(), value);
                                        }
                                        // Target is not an object: nothing sensible to write.
                                        None => continue,
                                    }
                                } else {
                                    *pointer_value = value;
                                }
                            }
                        }
                    }

                    #[cfg(not(feature = "parallel"))]
                    {
                        // Sequential evaluation
                        let combined_evals: Vec<&String> = self
                            .rules_evaluations
                            .iter()
                            .chain(self.others_evaluations.iter())
                            .collect();

                        for eval_key in combined_evals {
                            // Cooperative cancellation between items.
                            if let Some(t) = token {
                                if t.is_cancelled() {
                                    return;
                                }
                            }
                            // Filter items if paths are provided
                            if let Some(filter_paths) = normalized_paths.as_ref() {
                                if !filter_paths.is_empty()
                                    && !filter_paths.iter().any(|p| {
                                        eval_key.starts_with(p.as_str())
                                            || p.starts_with(eval_key.as_str())
                                    })
                                {
                                    continue;
                                }
                            }

                            let pointer_path = path_utils::normalize_to_json_pointer(eval_key).into_owned();

                            // Try cache first
                            if let Some(_) = self.try_get_cached(eval_key, &eval_data_snapshot) {
                                continue;
                            }

                            // Cache miss - evaluate
                            if let Some(logic_id) = self.evaluations.get(eval_key) {
                                if let Ok(val) =
                                    self.engine.run(logic_id, eval_data_snapshot.data())
                                {
                                    let cleaned_val = clean_float_noise_scalar(val);
                                    // Cache result (Null payload, see NOTE above)
                                    self.cache_result(eval_key, Value::Null, &eval_data_snapshot);

                                    if let Some(pointer_value) =
                                        self.evaluated_schema.pointer_mut(&pointer_path)
                                    {
                                        // Same $evaluation -> value rewrite as the parallel path.
                                        if !pointer_path.starts_with("$")
                                            && pointer_path.contains("/rules/")
                                            && !pointer_path.ends_with("/value")
                                        {
                                            match pointer_value.as_object_mut() {
                                                Some(pointer_obj) => {
                                                    pointer_obj.remove("$evaluation");
                                                    pointer_obj
                                                        .insert("value".to_string(), cleaned_val);
                                                }
                                                None => continue,
                                            }
                                        } else {
                                            *pointer_value = cleaned_val;
                                        }
                                    }
                                }
                            }
                        }
                    }
                });
            }
        });

        // Step 2: Evaluate options URL templates (handles {variable} patterns)
        time_block!(" evaluate_options_templates", {
            self.evaluate_options_templates(paths);
        });

        // Step 3: Resolve layout logic (metadata injection, hidden propagation)
        // Errors from layout resolution are deliberately ignored here.
        time_block!(" resolve_layout", {
            let _ = self.resolve_layout(false);
        });
    }
626
627 /// Evaluate options URL templates (handles {variable} patterns)
628 fn evaluate_options_templates(&mut self, paths: Option<&[String]>) {
629 // Use pre-collected options templates from parsing (Arc clone is cheap)
630 let templates_to_eval = self.options_templates.clone();
631
632 // Evaluate each template
633 for (path, template_str, params_path) in templates_to_eval.iter() {
634 // Filter items if paths are provided
635 // 'path' here is the schema path to the field (dot notation or similar, need to check)
636 // It seems to be schema pointer based on usage in other methods
637 if let Some(filter_paths) = paths {
638 if !filter_paths.is_empty()
639 && !filter_paths
640 .iter()
641 .any(|p| path.starts_with(p.as_str()) || p.starts_with(path.as_str()))
642 {
643 continue;
644 }
645 }
646
647 if let Some(params) = self.evaluated_schema.pointer(¶ms_path) {
648 if let Ok(evaluated) = self.evaluate_template(&template_str, params) {
649 if let Some(target) = self.evaluated_schema.pointer_mut(&path) {
650 *target = Value::String(evaluated);
651 }
652 }
653 }
654 }
655 }
656
657 /// Evaluate a template string like "api/users/{id}" with params
658 fn evaluate_template(&self, template: &str, params: &Value) -> Result<String, String> {
659 let mut result = template.to_string();
660
661 // Simple template evaluation: replace {key} with params.key
662 if let Value::Object(params_map) = params {
663 for (key, value) in params_map {
664 let placeholder = format!("{{{}}}", key);
665 if let Some(str_val) = value.as_str() {
666 result = result.replace(&placeholder, str_val);
667 } else {
668 // Convert non-string values to strings
669 result = result.replace(&placeholder, &value.to_string());
670 }
671 }
672 }
673
674 Ok(result)
675 }
676}