json_eval_rs/jsoneval/evaluate.rs
use super::JSONEval;
use crate::jsoneval::json_parser;
use crate::jsoneval::path_utils;
use crate::jsoneval::table_evaluate;
use crate::jsoneval::cancellation::CancellationToken;
use crate::utils::clean_float_noise;
use crate::time_block;

use serde_json::Value;

#[cfg(feature = "parallel")]
use rayon::prelude::*;

#[cfg(feature = "parallel")]
use std::sync::Mutex;

impl JSONEval {
    /// Evaluate the schema with the given data and context.
    ///
    /// # Arguments
    ///
    /// * `data` - The data to evaluate, as a JSON string.
    /// * `context` - Optional context JSON string; defaults to `{}` when not provided.
    /// * `paths` - Optional list of paths to restrict evaluation to. Accepts schema pointers
    ///   (`#/properties/foo`), JSON pointers (`/properties/foo`), or dot notation (`properties.foo`).
    /// * `token` - Optional cancellation token; evaluation aborts early when it is cancelled.
    ///
    /// # Returns
    ///
    /// A `Result` indicating success or an error message.
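    ///
    /// # Example
    ///
    /// A minimal usage sketch; the data/context payloads and the pre-built `eval` value are
    /// illustrative assumptions, not taken from this crate's tests.
    ///
    /// ```ignore
    /// // `eval` is a previously constructed JSONEval holding a parsed schema.
    /// let data = r#"{"age": 42}"#;
    /// let context = Some(r#"{"locale": "en"}"#);
    /// // Evaluate everything; pass Some(&paths) to restrict evaluation to specific fields.
    /// eval.evaluate(data, context, None, None)?;
    /// ```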
    pub fn evaluate(
        &mut self,
        data: &str,
        context: Option<&str>,
        paths: Option<&[String]>,
        token: Option<&CancellationToken>,
    ) -> Result<(), String> {
        if let Some(t) = token {
            if t.is_cancelled() {
                return Err("Cancelled".to_string());
            }
        }
        time_block!("evaluate() [total]", {
            let context_provided = context.is_some();

            // Use SIMD-accelerated JSON parsing
            let data: Value = time_block!(" parse data", { json_parser::parse_json_str(data)? });
            let context: Value = time_block!(" parse context", {
                json_parser::parse_json_str(context.unwrap_or("{}"))?
            });

            self.data = data.clone();

            // Collect top-level data keys to selectively purge cache
            let changed_data_paths: Vec<String> = if let Some(obj) = data.as_object() {
                obj.keys().map(|k| format!("/{}", k)).collect()
            } else {
                Vec::new()
            };

            // Replace data and context in existing eval_data
            time_block!(" replace_data_and_context", {
                self.eval_data.replace_data_and_context(data, context);
            });

            // Selectively purge cache entries that depend on changed top-level data keys.
            // This is more efficient than clearing the entire cache.
            time_block!(" purge_cache", {
                self.purge_cache_for_changed_data(&changed_data_paths);

                // Also purge context-dependent cache entries if a context was provided
                if context_provided {
                    self.purge_cache_for_context_change();
                }
            });

            // Call the internal evaluate (it uses the data set above)
            self.evaluate_internal(paths, token)
        })
    }

    /// Internal evaluate that can be called when the data is already set.
    /// This avoids double-locking and unnecessary data cloning when re-evaluating from
    /// `evaluate_dependents`.
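    ///
    /// `paths`, when provided, is normalized to schema pointers before filtering: for example,
    /// `"properties.foo"` and `"/properties/foo"` both become `"#/properties/foo"` (see the
    /// normalization at the top of the function).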
    pub(crate) fn evaluate_internal(
        &mut self,
        paths: Option<&[String]>,
        token: Option<&CancellationToken>,
    ) -> Result<(), String> {
        if let Some(t) = token {
            if t.is_cancelled() {
                return Err("Cancelled".to_string());
            }
        }
        time_block!(" evaluate_internal() [total]", {
            // Acquire the lock for synchronous execution
            let _lock = self.eval_lock.lock().unwrap();

            // Normalize paths to schema pointers for correct filtering
            let normalized_paths_storage; // Kept alive so the slice below can borrow from it
            let normalized_paths = if let Some(p_list) = paths {
                normalized_paths_storage = p_list
                    .iter()
                    .map(|p| {
                        if p.starts_with("#/") {
                            // Case 1: schema pointer (e.g. #/properties/foo) - keep as is
                            p.to_string()
                        } else if p.starts_with('/') {
                            // Case 2: JSON pointer (e.g. /properties/foo) - ensure the # prefix
                            format!("#{}", p)
                        } else {
                            // Case 3: dot notation (e.g. properties.foo) - replace dots with slashes and add the prefix
                            format!("#/{}", p.replace('.', "/"))
                        }
                    })
                    .collect::<Vec<_>>();
                Some(normalized_paths_storage.as_slice())
            } else {
                None
            };

            // Clone sorted_evaluations (the Arc clone is cheap; then clone the inner Vec)
            let eval_batches: Vec<Vec<String>> = (*self.sorted_evaluations).clone();

            // Batches are processed sequentially to maintain dependency order, while
            // evaluations within a batch may run in parallel.
            //
            // First process value evaluations (simple computed fields).
            // These are independent of the rule batches and should always run.
            let eval_data_values = self.eval_data.clone();
            time_block!(" evaluate values", {
                #[cfg(feature = "parallel")]
                if self.value_evaluations.len() > 100 {
                    let value_results: Mutex<Vec<(String, Value)>> =
                        Mutex::new(Vec::with_capacity(self.value_evaluations.len()));

                    self.value_evaluations.par_iter().for_each(|eval_key| {
                        // Skip if it has dependencies (those are handled in the sorted batches)
                        if let Some(deps) = self.dependencies.get(eval_key) {
                            if !deps.is_empty() {
                                return;
                            }
                        }

                        // Filter items if paths are provided
                        if let Some(filter_paths) = normalized_paths {
                            if !filter_paths.is_empty()
                                && !filter_paths.iter().any(|p| {
                                    eval_key.starts_with(p.as_str())
                                        || p.starts_with(eval_key.as_str())
                                })
                            {
                                return;
                            }
                        }

                        // For value evaluations (e.g. /properties/foo/value) we want the value at
                        // that path; the eval_key looks like "#/properties/foo/value".
                        let pointer_path = path_utils::normalize_to_json_pointer(eval_key);

                        // Try cache first (thread-safe)
                        if self.try_get_cached(eval_key, &eval_data_values).is_some() {
                            return;
                        }

                        // Cache miss - evaluate
                        if let Some(logic_id) = self.evaluations.get(eval_key) {
                            if let Ok(val) = self.engine.run(logic_id, eval_data_values.data()) {
                                let cleaned_val = clean_float_noise(val);
                                // Cache result (thread-safe)
                                self.cache_result(eval_key, Value::Null, &eval_data_values);
                                value_results
                                    .lock()
                                    .unwrap()
                                    .push((pointer_path, cleaned_val));
                            }
                        }
                    });

                    // Write results to evaluated_schema
                    for (result_path, value) in value_results.into_inner().unwrap() {
                        if let Some(pointer_value) = self.evaluated_schema.pointer_mut(&result_path)
                        {
                            *pointer_value = value;
                        }
                    }
                }

                // Sequential execution for values (parallel feature disabled or small count)
                #[cfg(feature = "parallel")]
                let value_eval_items = if self.value_evaluations.len() > 100 {
                    // Already handled in parallel above; use an empty slice
                    &self.value_evaluations[0..0]
                } else {
                    &self.value_evaluations
                };

                #[cfg(not(feature = "parallel"))]
                let value_eval_items = &self.value_evaluations;

                for eval_key in value_eval_items.iter() {
                    if let Some(t) = token {
                        if t.is_cancelled() {
                            return Err("Cancelled".to_string());
                        }
                    }
                    // Skip if it has dependencies (those are handled in the sorted batches)
                    if let Some(deps) = self.dependencies.get(eval_key) {
                        if !deps.is_empty() {
                            continue;
                        }
                    }

                    // Filter items if paths are provided
                    if let Some(filter_paths) = normalized_paths {
                        if !filter_paths.is_empty()
                            && !filter_paths.iter().any(|p| {
                                eval_key.starts_with(p.as_str()) || p.starts_with(eval_key.as_str())
                            })
                        {
                            continue;
                        }
                    }

                    let pointer_path = path_utils::normalize_to_json_pointer(eval_key);

                    // Try cache first
                    if self.try_get_cached(eval_key, &eval_data_values).is_some() {
                        continue;
                    }

                    // Cache miss - evaluate
                    if let Some(logic_id) = self.evaluations.get(eval_key) {
                        if let Ok(val) = self.engine.run(logic_id, eval_data_values.data()) {
                            let cleaned_val = clean_float_noise(val);
                            // Cache result
                            self.cache_result(eval_key, Value::Null, &eval_data_values);

                            if let Some(pointer_value) =
                                self.evaluated_schema.pointer_mut(&pointer_path)
                            {
                                *pointer_value = cleaned_val;
                            }
                        }
                    }
                }
            });

            time_block!(" process batches", {
                for batch in eval_batches {
                    if let Some(t) = token {
                        if t.is_cancelled() {
                            return Err("Cancelled".to_string());
                        }
                    }
                    // Skip empty batches
                    if batch.is_empty() {
                        continue;
                    }

                    // Batch-level skip optimization: if paths are provided, skip the whole batch
                    // unless at least one item in it matches at least one path.
                    if let Some(filter_paths) = normalized_paths {
                        if !filter_paths.is_empty() {
                            let batch_has_match = batch.iter().any(|eval_key| {
                                filter_paths.iter().any(|p| {
                                    eval_key.starts_with(p.as_str())
                                        || p.starts_with(eval_key.as_str())
                                })
                            });
                            if !batch_has_match {
                                continue;
                            }
                        }
                    }

                    // No cache pre-check here - the cache is checked inside the (possibly
                    // parallel) execution, where access is thread-safe.

                    // Parallel execution within a batch (no dependencies between items).
                    // A Mutex collects (eval_key, pointer_path, value) tuples so results can be
                    // written back after the parallel pass.
                    let eval_data_snapshot = self.eval_data.clone();

                    // Parallelize only for large batches (the parallel overhead is not worth it
                    // for small ones)
                    #[cfg(feature = "parallel")]
                    if batch.len() > 1000 {
                        let results: Mutex<Vec<(String, String, Value)>> =
                            Mutex::new(Vec::with_capacity(batch.len()));
                        batch.par_iter().for_each(|eval_key| {
                            // Filter individual items if paths are provided
                            if let Some(filter_paths) = normalized_paths {
                                if !filter_paths.is_empty()
                                    && !filter_paths.iter().any(|p| {
                                        eval_key.starts_with(p.as_str())
                                            || p.starts_with(eval_key.as_str())
                                    })
                                {
                                    return;
                                }
                            }

                            let pointer_path = path_utils::normalize_to_json_pointer(eval_key);

                            // Try cache first (thread-safe)
                            if self.try_get_cached(eval_key, &eval_data_snapshot).is_some() {
                                return;
                            }

                            // Cache miss - evaluate
                            let is_table = self.table_metadata.contains_key(eval_key);

                            if is_table {
                                // Evaluate the table using sandboxed metadata (parallel-safe,
                                // immutable parent scope)
                                if let Ok(rows) = table_evaluate::evaluate_table(
                                    self,
                                    eval_key,
                                    &eval_data_snapshot,
                                    token,
                                ) {
                                    let value = Value::Array(rows);
                                    // Cache result (thread-safe)
                                    self.cache_result(eval_key, Value::Null, &eval_data_snapshot);
                                    results.lock().unwrap().push((
                                        eval_key.clone(),
                                        pointer_path,
                                        value,
                                    ));
                                }
                            } else if let Some(logic_id) = self.evaluations.get(eval_key) {
                                // Evaluate directly against the snapshot
                                if let Ok(val) =
                                    self.engine.run(logic_id, eval_data_snapshot.data())
                                {
                                    let cleaned_val = clean_float_noise(val);
                                    // Cache result (thread-safe)
                                    self.cache_result(eval_key, Value::Null, &eval_data_snapshot);
                                    results.lock().unwrap().push((
                                        eval_key.clone(),
                                        pointer_path,
                                        cleaned_val,
                                    ));
                                }
                            }
                        });

                        // Write all results back sequentially (they were already cached during
                        // the parallel execution)
                        for (_eval_key, path, value) in results.into_inner().unwrap() {
                            let cleaned_value = clean_float_noise(value);

                            self.eval_data.set(&path, cleaned_value.clone());
                            // Also write to evaluated_schema
                            if let Some(schema_value) = self.evaluated_schema.pointer_mut(&path) {
                                *schema_value = cleaned_value;
                            }
                        }
                        continue;
                    }

                    // Sequential execution (small batch or parallel feature disabled)
                    #[cfg(not(feature = "parallel"))]
                    let batch_items = &batch;

                    // Empty slice if the batch was already processed in parallel above
                    #[cfg(feature = "parallel")]
                    let batch_items = if batch.len() > 1000 { &batch[0..0] } else { &batch };

                    for eval_key in batch_items {
                        if let Some(t) = token {
                            if t.is_cancelled() {
                                return Err("Cancelled".to_string());
                            }
                        }
                        // Filter individual items if paths are provided
                        if let Some(filter_paths) = normalized_paths {
                            if !filter_paths.is_empty()
                                && !filter_paths.iter().any(|p| {
                                    eval_key.starts_with(p.as_str())
                                        || p.starts_with(eval_key.as_str())
                                })
                            {
                                continue;
                            }
                        }

                        let pointer_path = path_utils::normalize_to_json_pointer(eval_key);

                        // Try cache first
                        if self.try_get_cached(eval_key, &eval_data_snapshot).is_some() {
                            continue;
                        }

                        // Cache miss - evaluate
                        let is_table = self.table_metadata.contains_key(eval_key);

                        if is_table {
                            if let Ok(rows) = table_evaluate::evaluate_table(
                                self,
                                eval_key,
                                &eval_data_snapshot,
                                token,
                            ) {
                                let value = Value::Array(rows);
                                // Cache result
                                self.cache_result(eval_key, Value::Null, &eval_data_snapshot);

                                let cleaned_value = clean_float_noise(value);
                                self.eval_data.set(&pointer_path, cleaned_value.clone());
                                if let Some(schema_value) =
                                    self.evaluated_schema.pointer_mut(&pointer_path)
                                {
                                    *schema_value = cleaned_value;
                                }
                            }
                        } else if let Some(logic_id) = self.evaluations.get(eval_key) {
                            if let Ok(val) = self.engine.run(logic_id, eval_data_snapshot.data()) {
                                let cleaned_val = clean_float_noise(val);
                                // Cache result
                                self.cache_result(eval_key, Value::Null, &eval_data_snapshot);

                                self.eval_data.set(&pointer_path, cleaned_val.clone());
                                if let Some(schema_value) =
                                    self.evaluated_schema.pointer_mut(&pointer_path)
                                {
                                    *schema_value = cleaned_val;
                                }
                            }
                        }
                    }
                }
            });

            // Drop the lock before calling evaluate_others
            drop(_lock);

            self.evaluate_others(paths, token);

            Ok(())
        })
    }

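    /// Evaluate everything that is not part of the dependency-sorted batches:
    /// rules and "others" evaluations (step 1), options URL templates (step 2), and
    /// layout resolution (step 3); see the step comments in the body below.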
    pub(crate) fn evaluate_others(
        &mut self,
        paths: Option<&[String]>,
        token: Option<&CancellationToken>,
    ) {
        if let Some(t) = token {
            if t.is_cancelled() {
                return;
            }
        }
        time_block!(" evaluate_others()", {
            // Step 1: Evaluate "rules" and "others" categories with caching.
            // Rules are evaluated here so their values are available in evaluated_schema.
            let combined_count = self.rules_evaluations.len() + self.others_evaluations.len();
            if combined_count > 0 {
                time_block!(" evaluate rules+others", {
                    let eval_data_snapshot = self.eval_data.clone();

                    let normalized_paths: Option<Vec<String>> = paths.map(|p_list| {
                        p_list
                            .iter()
                            .flat_map(|p| {
                                let ptr = path_utils::dot_notation_to_schema_pointer(p);
                                // Also support a variant with the /properties/ prefix for root matches
                                let with_props = if ptr.starts_with("#/") {
                                    format!("#/properties/{}", &ptr[2..])
                                } else {
                                    ptr.clone()
                                };
                                vec![ptr, with_props]
                            })
                            .collect()
                    });

                    #[cfg(feature = "parallel")]
                    {
                        let combined_results: Mutex<Vec<(String, Value)>> =
                            Mutex::new(Vec::with_capacity(combined_count));

                        self.rules_evaluations
                            .par_iter()
                            .chain(self.others_evaluations.par_iter())
                            .for_each(|eval_key| {
                                // Filter items if paths are provided
                                if let Some(filter_paths) = normalized_paths.as_ref() {
                                    if !filter_paths.is_empty()
                                        && !filter_paths.iter().any(|p| {
                                            eval_key.starts_with(p.as_str())
                                                || p.starts_with(eval_key.as_str())
                                        })
                                    {
                                        return;
                                    }
                                }

                                let pointer_path = path_utils::normalize_to_json_pointer(eval_key);

                                // Try cache first (thread-safe)
                                if self.try_get_cached(eval_key, &eval_data_snapshot).is_some() {
                                    return;
                                }

                                // Cache miss - evaluate
                                if let Some(logic_id) = self.evaluations.get(eval_key) {
                                    if let Ok(val) =
                                        self.engine.run(logic_id, eval_data_snapshot.data())
                                    {
                                        let cleaned_val = clean_float_noise(val);
                                        // Cache result (thread-safe)
                                        self.cache_result(
                                            eval_key,
                                            Value::Null,
                                            &eval_data_snapshot,
                                        );
                                        combined_results
                                            .lock()
                                            .unwrap()
                                            .push((pointer_path, cleaned_val));
                                    }
                                }
                            });

                        // Write results to evaluated_schema
                        for (result_path, value) in combined_results.into_inner().unwrap() {
                            if let Some(pointer_value) =
                                self.evaluated_schema.pointer_mut(&result_path)
                            {
                                // Special handling for rules with $evaluation.
                                // This covers both direct rules and array items such as
                                // /rules/evaluation/0/$evaluation.
                                if !result_path.starts_with("$")
                                    && result_path.contains("/rules/")
                                    && !result_path.ends_with("/value")
                                {
                                    match pointer_value.as_object_mut() {
                                        Some(pointer_obj) => {
                                            pointer_obj.remove("$evaluation");
                                            pointer_obj.insert("value".to_string(), value);
                                        }
                                        None => continue,
                                    }
                                } else {
                                    *pointer_value = value;
                                }
                            }
                        }
                    }

                    #[cfg(not(feature = "parallel"))]
                    {
                        // Sequential evaluation
                        let combined_evals: Vec<&String> = self
                            .rules_evaluations
                            .iter()
                            .chain(self.others_evaluations.iter())
                            .collect();

                        for eval_key in combined_evals {
                            if let Some(t) = token {
                                if t.is_cancelled() {
                                    return;
                                }
                            }
                            // Filter items if paths are provided
                            if let Some(filter_paths) = normalized_paths.as_ref() {
                                if !filter_paths.is_empty()
                                    && !filter_paths.iter().any(|p| {
                                        eval_key.starts_with(p.as_str())
                                            || p.starts_with(eval_key.as_str())
                                    })
                                {
                                    continue;
                                }
                            }

                            let pointer_path = path_utils::normalize_to_json_pointer(eval_key);

                            // Try cache first
                            if self.try_get_cached(eval_key, &eval_data_snapshot).is_some() {
                                continue;
                            }

                            // Cache miss - evaluate
                            if let Some(logic_id) = self.evaluations.get(eval_key) {
                                if let Ok(val) =
                                    self.engine.run(logic_id, eval_data_snapshot.data())
                                {
                                    let cleaned_val = clean_float_noise(val);
                                    // Cache result
                                    self.cache_result(eval_key, Value::Null, &eval_data_snapshot);

                                    if let Some(pointer_value) =
                                        self.evaluated_schema.pointer_mut(&pointer_path)
                                    {
                                        if !pointer_path.starts_with("$")
                                            && pointer_path.contains("/rules/")
                                            && !pointer_path.ends_with("/value")
                                        {
                                            match pointer_value.as_object_mut() {
                                                Some(pointer_obj) => {
                                                    pointer_obj.remove("$evaluation");
                                                    pointer_obj
                                                        .insert("value".to_string(), cleaned_val);
                                                }
                                                None => continue,
                                            }
                                        } else {
                                            *pointer_value = cleaned_val;
                                        }
                                    }
                                }
                            }
                        }
                    }
                });
            }
        });

        // Step 2: Evaluate options URL templates (handles {variable} patterns)
        time_block!(" evaluate_options_templates", {
            self.evaluate_options_templates(paths);
        });

        // Step 3: Resolve layout logic (metadata injection, hidden propagation)
        time_block!(" resolve_layout", {
            let _ = self.resolve_layout(false);
        });
    }

    /// Evaluate options URL templates (handles {variable} patterns)
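    ///
    /// Each pre-collected entry is a `(path, template_str, params_path)` tuple: the schema
    /// pointer to write the resolved string to, the template itself, and the pointer to the
    /// params object whose keys fill the `{variable}` placeholders.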
    fn evaluate_options_templates(&mut self, paths: Option<&[String]>) {
        // Use the options templates pre-collected during parsing (the Arc clone is cheap)
        let templates_to_eval = self.options_templates.clone();

        // Evaluate each template
        for (path, template_str, params_path) in templates_to_eval.iter() {
            // Filter items if paths are provided.
            // `path` is the schema pointer to the field, consistent with how it is used against
            // evaluated_schema below.
            if let Some(filter_paths) = paths {
                if !filter_paths.is_empty()
                    && !filter_paths
                        .iter()
                        .any(|p| path.starts_with(p.as_str()) || p.starts_with(path.as_str()))
                {
                    continue;
                }
            }

            if let Some(params) = self.evaluated_schema.pointer(&params_path) {
                if let Ok(evaluated) = self.evaluate_template(&template_str, params) {
                    if let Some(target) = self.evaluated_schema.pointer_mut(&path) {
                        *target = Value::String(evaluated);
                    }
                }
            }
        }
    }

    /// Evaluate a template string like "api/users/{id}" with params
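    ///
    /// A small illustration with hypothetical values (not from the test suite); non-string
    /// params are stringified via `to_string`:
    ///
    /// ```ignore
    /// let params = serde_json::json!({"id": 7, "detail": "address"});
    /// let url = eval.evaluate_template("api/users/{id}/{detail}", &params)?;
    /// assert_eq!(url, "api/users/7/address");
    /// ```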
    fn evaluate_template(&self, template: &str, params: &Value) -> Result<String, String> {
        let mut result = template.to_string();

        // Simple template evaluation: replace {key} with params.key
        if let Value::Object(params_map) = params {
            for (key, value) in params_map {
                let placeholder = format!("{{{}}}", key);
                if let Some(str_val) = value.as_str() {
                    result = result.replace(&placeholder, str_val);
                } else {
                    // Convert non-string values to strings
                    result = result.replace(&placeholder, &value.to_string());
                }
            }
        }

        Ok(result)
    }
}