json_eval_rs/table_evaluate.rs
1use crate::eval_data::EvalData;
2use crate::table_metadata::RowMetadata;
3use crate::{path_utils, JSONEval};
4use serde_json::{Map, Value};
5use std::mem;
6
7/// Sandboxed table evaluation for safe parallel execution
8///
9/// All heavy operations (dependency analysis, forward reference checks) are done at parse time.
10/// This function creates an isolated scope to prevent interference between parallel table evaluations.
11///
12/// # Parallel Safety
13///
14/// This function is designed for safe parallel execution:
15/// - Takes `scope_data` as an immutable reference (read-only parent scope)
16/// - Creates an isolated sandbox (clone) for all table-specific mutations
17/// - All temporary variables (`$iteration`, `$threshold`, column vars) exist only in the sandbox
18/// - The parent `scope_data` remains unchanged, preventing race conditions
19/// - Multiple tables can be evaluated concurrently without interference
20///
21/// # Mutation Safety
22///
23/// **ALL data mutations go through EvalData methods:**
24/// - `sandbox.set()` - sets field values with version tracking
25/// - `sandbox.push_to_array()` - appends to arrays with version tracking
26/// - `sandbox.get_table_row_mut()` - gets mutable row references (followed by mark_modified)
27/// - `sandbox.mark_modified()` - explicitly marks paths as modified
28///
29/// This ensures proper version tracking and mutation safety throughout evaluation.
30///
31/// # Sandboxing Strategy
32///
33/// 1. Clone `scope_data` to create an isolated sandbox at the start
34/// 2. All evaluations and mutations happen within the sandbox via EvalData methods
35/// 3. Extract the final table array from the sandbox
36/// 4. Sandbox is dropped, discarding all temporary state
37/// 5. Parent scope remains pristine and can be safely shared across threads
38pub fn evaluate_table(
39 lib: &JSONEval, // Changed to immutable - parallel-safe, only reads metadata and calls engine
40 eval_key: &str,
41 scope_data: &EvalData, // Now immutable - we read from parent scope
42) -> Result<Vec<Value>, String> {
43 // Clone metadata (cheap since it uses Arc internally)
44 let metadata = lib
45 .table_metadata
46 .get(eval_key)
47 .ok_or_else(|| format!("Table metadata not found for {}", eval_key))?
48 .clone();
49
50 // Pre-compute table path once using JSON pointer format
51 let table_pointer_path = path_utils::normalize_to_json_pointer(eval_key);
52
53 // ==========================================
54 // CREATE SANDBOXED SCOPE (thread-safe isolation)
55 // ==========================================
56 // Clone scope_data to create an isolated sandbox for this table evaluation
57 // This prevents parallel table evaluations from interfering with each other
58 let mut sandbox = scope_data.clone();
59
60 // ==========================================
61 // PHASE 0: Evaluate $datas FIRST (before skip/clear)
62 // ==========================================
63 // Capture existing table value and track if dependencies change
64 let existing_table_value = sandbox.get(&table_pointer_path).cloned();
65
66 // Use empty internal context for $data evaluation
67 let empty_context = Value::Object(Map::new());
68 for (name, logic, literal) in metadata.data_plans.iter() {
69 let value = match logic {
70 Some(logic_id) => {
71 match lib
72 .engine
73 .run_with_context(logic_id, sandbox.data(), &empty_context)
74 {
75 Ok(val) => val,
76 Err(_) => literal
77 .as_ref()
78 .map(|arc_val| Value::clone(arc_val))
79 .unwrap_or(Value::Null),
80 }
81 }
82 None => literal
83 .as_ref()
84 .map(|arc_val| Value::clone(arc_val))
85 .unwrap_or(Value::Null),
86 };
87
88 sandbox.set(name.as_ref(), value);
89 }
90
91 // ==========================================
92 // PHASE 1: Evaluate $skip - if true, return empty immediately
93 // ==========================================
94 let mut should_skip = metadata.skip_literal;
95 if !should_skip {
96 if let Some(logic_id) = metadata.skip_logic {
97 let val = lib
98 .engine
99 .run_with_context(&logic_id, sandbox.data(), &empty_context)?;
100 should_skip = val.as_bool().unwrap_or(false);
101 }
102 }
103
104 // ==========================================
105 // PHASE 2: Check dependencies before evaluation
106 // ==========================================
107 // Skip evaluation if required dependencies (non-$params, non-$ prefixed) are null/empty
108 let mut requirement_not_filled = false;
109 if let Some(deps) = lib.dependencies.get(eval_key) {
110 for dep in deps.iter() {
111 // Skip $params and any dependency starting with $
112 if dep.contains("$params")
113 || (!dep.contains("$context") && (dep.starts_with("/$") || dep.starts_with("$")))
114 {
115 continue;
116 }
117
118 // Check if this dependency is null or empty
119 if let Some(dep_value) = sandbox.get_without_properties(dep) {
120 let is_empty = match dep_value {
121 Value::Null => true,
122 Value::String(s) => s.is_empty(),
123 Value::Array(arr) => arr.is_empty(),
124 Value::Object(obj) => obj.is_empty(),
125 _ => false,
126 };
127
128 if is_empty {
129 // Check if the field is required in the schema before skipping
130 let is_field_required = check_field_required(&lib.evaluated_schema, dep);
131
132 if is_field_required {
133 requirement_not_filled = true;
134 break;
135 }
136 // If field is not required (optional), continue without skipping
137 }
138 } else {
139 // Dependency doesn't exist
140 // Check if the field is required in the schema before skipping
141 let is_field_required = check_field_required(&lib.evaluated_schema, dep);
142
143 if is_field_required {
144 requirement_not_filled = true;
145 break;
146 }
147 }
148 }
149 }
150 // println!("requirement_not_filled: {}", requirement_not_filled);
151
152 // ==========================================
153 // PHASE 3: Evaluate $clear - if true, ensure table is empty
154 // ==========================================
155 let mut should_clear = metadata.clear_literal;
156 if !should_clear {
157 if let Some(logic_id) = metadata.clear_logic {
158 let val = lib
159 .engine
160 .run_with_context(&logic_id, sandbox.data(), &empty_context)?;
161 should_clear = val.as_bool().unwrap_or(false);
162 }
163 }
164
165 // Initialize empty table array only when: existing table data is not an array
166 let table_is_not_array = !existing_table_value
167 .as_ref()
168 .map_or(false, |v| v.is_array());
169 if should_clear || should_skip || table_is_not_array || requirement_not_filled {
170 sandbox.set(&table_pointer_path, Value::Array(Vec::new()));
171 }
172
173 if should_clear || should_skip || requirement_not_filled {
174 return Ok(Vec::new());
175 }
176
177 let number_from_value = |value: &Value| -> i64 {
178 match value {
179 Value::Number(n) => n
180 .as_i64()
181 .unwrap_or_else(|| n.as_f64().map_or(0, |f| f as i64)),
182 Value::String(s) => s.parse::<f64>().map_or(0, |f| f as i64),
183 Value::Bool(true) => 1,
184 Value::Bool(false) => 0,
185 _ => 0,
186 }
187 };
188
189 for plan in metadata.row_plans.iter() {
190 match plan {
191 RowMetadata::Static { columns } => {
192 // CRITICAL: Preserve SCHEMA ORDER for static rows (match JavaScript behavior)
193 let mut evaluated_row = Map::with_capacity(columns.len());
194
195 // Create internal context for column variables
196 let mut internal_context = Map::new();
197
198 // Evaluate columns in schema order (sandboxed)
199 for column in columns.iter() {
200 let value = if let Some(logic_id) = column.logic {
201 lib.engine.run_with_context(
202 &logic_id,
203 sandbox.data(),
204 &Value::Object(internal_context.clone()),
205 )?
206 } else {
207 column
208 .literal
209 .as_ref()
210 .map(|arc_val| Value::clone(arc_val))
211 .unwrap_or(Value::Null)
212 };
213 // Pre-compute string key once from Arc<str>
214 let col_name_str = column.name.as_ref().to_string();
215 // Store in internal context (column vars start with $)
216 internal_context.insert(column.var_path.as_ref().to_string(), value.clone());
217 evaluated_row.insert(col_name_str, value);
218 }
219
220 sandbox.push_to_array(&table_pointer_path, Value::Object(evaluated_row));
221 }
222 RowMetadata::Repeat {
223 start,
224 end,
225 columns,
226 forward_cols,
227 normal_cols,
228 } => {
229 // Evaluate repeat bounds in sandbox
230 let start_val = if let Some(logic_id) = start.logic {
231 lib.engine
232 .run_with_context(&logic_id, sandbox.data(), &empty_context)?
233 } else {
234 Value::clone(&start.literal)
235 };
236 let end_val = if let Some(logic_id) = end.logic {
237 lib.engine
238 .run_with_context(&logic_id, sandbox.data(), &empty_context)?
239 } else {
240 Value::clone(&end.literal)
241 };
242
243 let start_idx = number_from_value(&start_val);
244 let end_idx = number_from_value(&end_val);
245
246 if start_idx > end_idx {
247 continue;
248 }
249
250 // Count existing static rows in sandbox
251 let existing_row_count = sandbox
252 .get(&table_pointer_path)
253 .and_then(|v| v.as_array())
254 .map(|arr| arr.len())
255 .unwrap_or(0);
256
257 // Pre-allocate all rows in sandbox (zero-copy: pre-compute string keys)
258 let total_rows = (end_idx - start_idx + 1) as usize;
259 let col_count = columns.len();
260 // Pre-compute all column name strings once
261 let col_names: Vec<String> = columns
262 .iter()
263 .map(|col| col.name.as_ref().to_string())
264 .collect();
265
266 if let Some(Value::Array(table_arr)) = sandbox.get_mut(&table_pointer_path) {
267 table_arr.reserve(total_rows);
268 for _ in 0..total_rows {
269 let mut row = Map::with_capacity(col_count);
270 for col_name in &col_names {
271 row.insert(col_name.clone(), Value::Null);
272 }
273 table_arr.push(Value::Object(row));
274 }
275 }
276
277 // ========================================
278 // PHASE 4: TOP TO BOTTOM (Forward Pass)
279 // ========================================
280 // Evaluate columns WITHOUT forward references in sandbox
281
282 // Create internal context with $threshold
283 let mut internal_context = Map::new();
284 internal_context.insert("$threshold".to_string(), Value::from(end_idx));
285
286 for iteration in start_idx..=end_idx {
287 let row_idx = (iteration - start_idx) as usize;
288 let target_idx = existing_row_count + row_idx;
289
290 // Set $iteration in internal context
291 internal_context.insert("$iteration".to_string(), Value::from(iteration));
292
293 // Evaluate normal columns in sandbox
294 for &col_idx in normal_cols.iter() {
295 let column = &columns[col_idx];
296 let value = match column.logic {
297 Some(logic_id) => lib.engine.run_with_context(
298 &logic_id,
299 sandbox.data(),
300 &Value::Object(internal_context.clone()),
301 )?,
302 None => column
303 .literal
304 .as_ref()
305 .map(|arc_val| Value::clone(arc_val))
306 .unwrap_or(Value::Null),
307 };
308
309 // Update table cell in sandbox
310 if let Some(row_obj) =
311 sandbox.get_table_row_mut(&table_pointer_path, target_idx)
312 {
313 if let Some(cell) = row_obj.get_mut(column.name.as_ref()) {
314 *cell = value.clone();
315 } else {
316 row_obj.insert(col_names[col_idx].clone(), value.clone());
317 }
318 }
319 // Store in internal context (column vars)
320 internal_context.insert(column.var_path.as_ref().to_string(), value);
321 }
322 }
323 // TODO: Implement mark_modified if needed for tracking
324 // sandbox.mark_modified(&table_pointer_path);
325
326 // ========================================
327 // PHASE 5 (BACKWARD PASS):
328 // Evaluate columns WITH forward references in sandbox
329 // ========================================
330 if !forward_cols.is_empty() {
331 let max_sweeps = 100; // Safety limit to prevent infinite loops
332 let mut scan_from_down = false;
333 let iter_count = (end_idx - start_idx + 1) as usize;
334
335 // Create internal context for backward pass
336 let mut internal_context = Map::new();
337 internal_context.insert("$threshold".to_string(), Value::from(end_idx));
338
339 // Track which columns changed in previous sweep per row
340 // This enables skipping re-evaluation of columns with unchanged dependencies
341 let mut prev_changed: Vec<Vec<bool>> =
342 vec![vec![true; forward_cols.len()]; iter_count];
343
344 for _sweep_num in 1..=max_sweeps {
345 let mut any_changed = false;
346 let mut curr_changed: Vec<Vec<bool>> =
347 vec![vec![false; forward_cols.len()]; iter_count];
348
349 for iter_offset in 0..iter_count {
350 let iteration = if scan_from_down {
351 end_idx - iter_offset as i64
352 } else {
353 start_idx + iter_offset as i64
354 };
355 let row_offset = (iteration - start_idx) as usize;
356 let target_idx = existing_row_count + row_offset;
357
358 // Set $iteration in internal context
359 internal_context
360 .insert("$iteration".to_string(), Value::from(iteration));
361
362 // Restore column values from sandbox to internal context
363 if let Some(Value::Array(table_arr)) = sandbox.get(&table_pointer_path)
364 {
365 if let Some(Value::Object(row_obj)) = table_arr.get(target_idx) {
366 // Collect all column values into internal context
367 for &col_idx in normal_cols.iter().chain(forward_cols.iter()) {
368 let column = &columns[col_idx];
369 if let Some(value) = row_obj.get(column.name.as_ref()) {
370 internal_context.insert(
371 column.var_path.as_ref().to_string(),
372 value.clone(),
373 );
374 }
375 }
376 }
377 }
378
379 // Evaluate forward columns in sandbox (with dependency-aware skipping)
380 for (fwd_idx, &col_idx) in forward_cols.iter().enumerate() {
381 let column = &columns[col_idx];
382
383 // Determine if we should evaluate this column
384 let mut should_evaluate = _sweep_num == 1; // Always evaluate first sweep
385
386 // Skip if no dependencies changed (only for non-forward-ref columns)
387 if !should_evaluate && !column.has_forward_ref {
388 // Check intra-row column dependencies
389 should_evaluate = column.dependencies.iter().any(|dep| {
390 if dep.starts_with('$') {
391 let dep_name = dep.trim_start_matches('$');
392 // Check if dependency is in forward_cols and changed in prev sweep
393 forward_cols.iter().enumerate().any(
394 |(dep_fwd_idx, &dep_col_idx)| {
395 columns[dep_col_idx].name.as_ref() == dep_name
396 && prev_changed[row_offset][dep_fwd_idx]
397 },
398 )
399 } else {
400 // Non-column dependency, always re-evaluate to be safe
401 true
402 }
403 });
404 } else if !should_evaluate {
405 // For forward-ref columns, re-evaluate if anything changed
406 should_evaluate = true;
407 }
408
409 if should_evaluate {
410 let value = match column.logic {
411 Some(logic_id) => lib.engine.run_with_context(
412 &logic_id,
413 sandbox.data(),
414 &Value::Object(internal_context.clone()),
415 )?,
416 None => column
417 .literal
418 .as_ref()
419 .map(|arc_val| Value::clone(arc_val))
420 .unwrap_or(Value::Null),
421 };
422
423 // Write to sandbox table and update internal context
424 if let Some(row_obj) =
425 sandbox.get_table_row_mut(&table_pointer_path, target_idx)
426 {
427 if let Some(cell) = row_obj.get_mut(column.name.as_ref()) {
428 if *cell != value {
429 any_changed = true;
430 curr_changed[row_offset][fwd_idx] = true;
431 *cell = value.clone();
432 }
433 } else {
434 any_changed = true;
435 curr_changed[row_offset][fwd_idx] = true;
436 row_obj
437 .insert(col_names[col_idx].clone(), value.clone());
438 }
439 }
440 // Update internal context with new value
441 internal_context
442 .insert(column.var_path.as_ref().to_string(), value);
443 }
444 }
445 }
446
447 scan_from_down = !scan_from_down;
448 prev_changed = curr_changed;
449
450 // Exit early if converged (no changes in this sweep)
451 if !any_changed {
452 break;
453 }
454 }
455 }
456 }
457 }
458 }
459
460 // Extract result from sandbox (zero-copy: take from sandbox, no mutation of parent scope)
461 let final_rows = if let Some(table_value) = sandbox.get_mut(&table_pointer_path) {
462 if let Some(array) = table_value.as_array_mut() {
463 mem::take(array)
464 } else {
465 Vec::new()
466 }
467 } else {
468 Vec::new()
469 };
470
471 // Sandbox is dropped here, all temporary mutations are discarded
472 // Parent scope_data remains unchanged - safe for parallel execution
473 Ok(final_rows)
474}
475
476/// Check if a field is required based on the schema rules
477///
478/// This function looks up the field in the evaluated schema and checks if it has
479/// a "required" rule with value=true. If the field doesn't exist in the schema
480/// or doesn't have a required rule, it's considered optional.
481///
482/// # Arguments
483///
484/// * `schema` - The evaluated schema Value
485/// * `dep_path` - The dependency path (JSON pointer format, e.g., "/properties/field")
486///
487/// # Returns
488///
489/// * `true` if the field is required, `false` if optional or not found
490fn check_field_required(schema: &Value, dep_path: &str) -> bool {
491 // Convert the dependency path to schema path
492 // For fields like "/properties/field", we need to look at "/properties/field/rules/required"
493 let rules_path = format!(
494 "{}/rules/required",
495 path_utils::dot_notation_to_schema_pointer(dep_path)
496 );
497
498 // Try to get the required rule from the schema
499 if let Some(required_rule) = path_utils::get_value_by_pointer(schema, &rules_path) {
500 // Check if the required rule has value=true
501 if let Some(rule_obj) = required_rule.as_object() {
502 if let Some(Value::Bool(is_required)) = rule_obj.get("value") {
503 return *is_required;
504 }
505 }
506 // If it's a direct boolean value
507 if let Some(is_required) = required_rule.as_bool() {
508 return is_required;
509 }
510 }
511
512 // If no required rule found, field is optional
513 false
514}