json_eval_rs/table_evaluate.rs
1use serde_json::{Map, Value};
2use std::mem;
3use crate::eval_data::EvalData;
4use crate::{JSONEval, path_utils};
5use crate::table_metadata::RowMetadata;
6
7/// Sandboxed table evaluation for safe parallel execution
8///
9/// All heavy operations (dependency analysis, forward reference checks) are done at parse time.
10/// This function creates an isolated scope to prevent interference between parallel table evaluations.
11///
12/// # Parallel Safety
13///
14/// This function is designed for safe parallel execution:
15/// - Takes `scope_data` as an immutable reference (read-only parent scope)
16/// - Creates an isolated sandbox (clone) for all table-specific mutations
17/// - All temporary variables (`$iteration`, `$threshold`, column vars) exist only in the sandbox
18/// - The parent `scope_data` remains unchanged, preventing race conditions
19/// - Multiple tables can be evaluated concurrently without interference
20///
21/// # Mutation Safety
22///
23/// **ALL data mutations go through EvalData methods:**
24/// - `sandbox.set()` - sets field values with version tracking
25/// - `sandbox.push_to_array()` - appends to arrays with version tracking
26/// - `sandbox.get_table_row_mut()` - gets mutable row references (followed by mark_modified)
27/// - `sandbox.mark_modified()` - explicitly marks paths as modified
28///
29/// This ensures proper version tracking and mutation safety throughout evaluation.
30///
31/// # Sandboxing Strategy
32///
33/// 1. Clone `scope_data` to create an isolated sandbox at the start
34/// 2. All evaluations and mutations happen within the sandbox via EvalData methods
35/// 3. Extract the final table array from the sandbox
36/// 4. Sandbox is dropped, discarding all temporary state
37/// 5. Parent scope remains pristine and can be safely shared across threads
38pub fn evaluate_table(
39 lib: &JSONEval, // Changed to immutable - parallel-safe, only reads metadata and calls engine
40 eval_key: &str,
41 scope_data: &EvalData, // Now immutable - we read from parent scope
42) -> Result<Vec<Value>, String> {
43 // Clone metadata (cheap since it uses Arc internally)
44 let metadata = lib.table_metadata.get(eval_key)
45 .ok_or_else(|| format!("Table metadata not found for {}", eval_key))?.clone();
46
47 // Pre-compute table path once using JSON pointer format
48 let table_pointer_path = path_utils::normalize_to_json_pointer(eval_key);
49
50 // ==========================================
51 // CREATE SANDBOXED SCOPE (thread-safe isolation)
52 // ==========================================
53 // Clone scope_data to create an isolated sandbox for this table evaluation
54 // This prevents parallel table evaluations from interfering with each other
55 let mut sandbox = scope_data.clone();
56
57 // ==========================================
58 // PHASE 0: Evaluate $datas FIRST (before skip/clear)
59 // ==========================================
60 // Capture existing table value and track if dependencies change
61 let existing_table_value = sandbox.get(&table_pointer_path).cloned();
62
63 // Use empty internal context for $data evaluation
64 let empty_context = Value::Object(Map::new());
65 for (name, logic, literal) in metadata.data_plans.iter() {
66 let value = match logic {
67 Some(logic_id) => match lib.engine.run_with_context(logic_id, sandbox.data(), &empty_context) {
68 Ok(val) => val,
69 Err(_) => literal.as_ref().map(|arc_val| Value::clone(arc_val)).unwrap_or(Value::Null),
70 },
71 None => literal.as_ref().map(|arc_val| Value::clone(arc_val)).unwrap_or(Value::Null),
72 };
73
74 sandbox.set(name.as_ref(), value);
75 }
76
77 // ==========================================
78 // PHASE 1: Evaluate $skip - if true, return empty immediately
79 // ==========================================
80 let mut should_skip = metadata.skip_literal;
81 if !should_skip {
82 if let Some(logic_id) = metadata.skip_logic {
83 let val = lib.engine.run_with_context(&logic_id, sandbox.data(), &empty_context)?;
84 should_skip = val.as_bool().unwrap_or(false);
85 }
86 }
87
88 // ==========================================
89 // PHASE 2: Check dependencies before evaluation
90 // ==========================================
91 // Skip evaluation if required dependencies (non-$params, non-$ prefixed) are null/empty
92 let mut requirement_not_filled = false;
93 if let Some(deps) = lib.dependencies.get(eval_key) {
94 for dep in deps.iter() {
95 // Skip $params and any dependency starting with $
96 if dep.contains("$params") || (!dep.contains("$context") && (dep.starts_with("/$") || dep.starts_with("$"))) {
97 continue;
98 }
99
100 // Check if this dependency is null or empty
101 if let Some(dep_value) = sandbox.get_without_properties(dep) {
102 let is_empty = match dep_value {
103 Value::Null => true,
104 Value::String(s) => s.is_empty(),
105 Value::Array(arr) => arr.is_empty(),
106 Value::Object(obj) => obj.is_empty(),
107 _ => false,
108 };
109
110 if is_empty {
111 // Check if the field is required in the schema before skipping
112 let is_field_required = check_field_required(&lib.evaluated_schema, dep);
113
114 if is_field_required {
115 requirement_not_filled = true;
116 break;
117 }
118 // If field is not required (optional), continue without skipping
119 }
120 } else {
121 // Dependency doesn't exist
122 // Check if the field is required in the schema before skipping
123 let is_field_required = check_field_required(&lib.evaluated_schema, dep);
124
125 if is_field_required {
126 requirement_not_filled = true;
127 break;
128 }
129 }
130 }
131 }
132 // println!("requirement_not_filled: {}", requirement_not_filled);
133
134 // ==========================================
135 // PHASE 3: Evaluate $clear - if true, ensure table is empty
136 // ==========================================
137 let mut should_clear = metadata.clear_literal;
138 if !should_clear {
139 if let Some(logic_id) = metadata.clear_logic {
140 let val = lib.engine.run_with_context(&logic_id, sandbox.data(), &empty_context)?;
141 should_clear = val.as_bool().unwrap_or(false);
142 }
143 }
144
145 // Initialize empty table array only when: existing table data is not an array
146 let table_is_not_array = !existing_table_value.as_ref().map_or(false, |v| v.is_array());
147 if should_clear || should_skip || table_is_not_array || requirement_not_filled {
148 sandbox.set(&table_pointer_path, Value::Array(Vec::new()));
149 }
150
151 if should_clear || should_skip || requirement_not_filled {
152 return Ok(Vec::new());
153 }
154
155 let number_from_value = |value: &Value| -> i64 {
156 match value {
157 Value::Number(n) => n.as_i64().unwrap_or_else(|| n.as_f64().map_or(0, |f| f as i64)),
158 Value::String(s) => s.parse::<f64>().map_or(0, |f| f as i64),
159 Value::Bool(true) => 1,
160 Value::Bool(false) => 0,
161 _ => 0,
162 }
163 };
164
165 for plan in metadata.row_plans.iter() {
166 match plan {
167 RowMetadata::Static { columns } => {
168 // CRITICAL: Preserve SCHEMA ORDER for static rows (match JavaScript behavior)
169 let mut evaluated_row = Map::with_capacity(columns.len());
170
171 // Create internal context for column variables
172 let mut internal_context = Map::new();
173
174 // Evaluate columns in schema order (sandboxed)
175 for column in columns.iter() {
176 let value = if let Some(logic_id) = column.logic {
177 lib.engine.run_with_context(&logic_id, sandbox.data(), &Value::Object(internal_context.clone()))?
178 } else {
179 column.literal.as_ref().map(|arc_val| Value::clone(arc_val)).unwrap_or(Value::Null)
180 };
181 // Pre-compute string key once from Arc<str>
182 let col_name_str = column.name.as_ref().to_string();
183 // Store in internal context (column vars start with $)
184 internal_context.insert(column.var_path.as_ref().to_string(), value.clone());
185 evaluated_row.insert(col_name_str, value);
186 }
187
188 sandbox.push_to_array(&table_pointer_path, Value::Object(evaluated_row));
189 }
190 RowMetadata::Repeat {
191 start,
192 end,
193 columns,
194 forward_cols,
195 normal_cols,
196 } => {
197 // Evaluate repeat bounds in sandbox
198 let start_val = if let Some(logic_id) = start.logic {
199 lib.engine.run_with_context(&logic_id, sandbox.data(), &empty_context)?
200 } else {
201 Value::clone(&start.literal)
202 };
203 let end_val = if let Some(logic_id) = end.logic {
204 lib.engine.run_with_context(&logic_id, sandbox.data(), &empty_context)?
205 } else {
206 Value::clone(&end.literal)
207 };
208
209 let start_idx = number_from_value(&start_val);
210 let end_idx = number_from_value(&end_val);
211
212 if start_idx > end_idx {
213 continue;
214 }
215
216 // Count existing static rows in sandbox
217 let existing_row_count = sandbox
218 .get(&table_pointer_path)
219 .and_then(|v| v.as_array())
220 .map(|arr| arr.len())
221 .unwrap_or(0);
222
223 // Pre-allocate all rows in sandbox (zero-copy: pre-compute string keys)
224 let total_rows = (end_idx - start_idx + 1) as usize;
225 let col_count = columns.len();
226 // Pre-compute all column name strings once
227 let col_names: Vec<String> = columns.iter()
228 .map(|col| col.name.as_ref().to_string())
229 .collect();
230
231 if let Some(Value::Array(table_arr)) = sandbox.get_mut(&table_pointer_path) {
232 table_arr.reserve(total_rows);
233 for _ in 0..total_rows {
234 let mut row = Map::with_capacity(col_count);
235 for col_name in &col_names {
236 row.insert(col_name.clone(), Value::Null);
237 }
238 table_arr.push(Value::Object(row));
239 }
240 }
241
242 // ========================================
243 // PHASE 4: TOP TO BOTTOM (Forward Pass)
244 // ========================================
245 // Evaluate columns WITHOUT forward references in sandbox
246
247 // Create internal context with $threshold
248 let mut internal_context = Map::new();
249 internal_context.insert("$threshold".to_string(), Value::from(end_idx));
250
251 for iteration in start_idx..=end_idx {
252 let row_idx = (iteration - start_idx) as usize;
253 let target_idx = existing_row_count + row_idx;
254
255 // Set $iteration in internal context
256 internal_context.insert("$iteration".to_string(), Value::from(iteration));
257
258 // Evaluate normal columns in sandbox
259 for &col_idx in normal_cols.iter() {
260 let column = &columns[col_idx];
261 let value = match column.logic {
262 Some(logic_id) => lib.engine.run_with_context(&logic_id, sandbox.data(), &Value::Object(internal_context.clone()))?,
263 None => column.literal.as_ref().map(|arc_val| Value::clone(arc_val)).unwrap_or(Value::Null),
264 };
265
266 // Update table cell in sandbox
267 if let Some(row_obj) = sandbox.get_table_row_mut(&table_pointer_path, target_idx) {
268 if let Some(cell) = row_obj.get_mut(column.name.as_ref()) {
269 *cell = value.clone();
270 } else {
271 row_obj.insert(col_names[col_idx].clone(), value.clone());
272 }
273 }
274 // Store in internal context (column vars)
275 internal_context.insert(column.var_path.as_ref().to_string(), value);
276 }
277 }
278 // TODO: Implement mark_modified if needed for tracking
279 // sandbox.mark_modified(&table_pointer_path);
280
281 // ========================================
282 // PHASE 5 (BACKWARD PASS):
283 // Evaluate columns WITH forward references in sandbox
284 // ========================================
285 if !forward_cols.is_empty() {
286 let max_sweeps = 100; // Safety limit to prevent infinite loops
287 let mut scan_from_down = false;
288 let iter_count = (end_idx - start_idx + 1) as usize;
289
290 // Create internal context for backward pass
291 let mut internal_context = Map::new();
292 internal_context.insert("$threshold".to_string(), Value::from(end_idx));
293
294 // Track which columns changed in previous sweep per row
295 // This enables skipping re-evaluation of columns with unchanged dependencies
296 let mut prev_changed: Vec<Vec<bool>> = vec![vec![true; forward_cols.len()]; iter_count];
297
298 for _sweep_num in 1..=max_sweeps {
299 let mut any_changed = false;
300 let mut curr_changed: Vec<Vec<bool>> = vec![vec![false; forward_cols.len()]; iter_count];
301
302 for iter_offset in 0..iter_count {
303 let iteration = if scan_from_down {
304 end_idx - iter_offset as i64
305 } else {
306 start_idx + iter_offset as i64
307 };
308 let row_offset = (iteration - start_idx) as usize;
309 let target_idx = existing_row_count + row_offset;
310
311 // Set $iteration in internal context
312 internal_context.insert("$iteration".to_string(), Value::from(iteration));
313
314 // Restore column values from sandbox to internal context
315 if let Some(Value::Array(table_arr)) = sandbox.get(&table_pointer_path) {
316 if let Some(Value::Object(row_obj)) = table_arr.get(target_idx) {
317 // Collect all column values into internal context
318 for &col_idx in normal_cols.iter().chain(forward_cols.iter()) {
319 let column = &columns[col_idx];
320 if let Some(value) = row_obj.get(column.name.as_ref()) {
321 internal_context.insert(column.var_path.as_ref().to_string(), value.clone());
322 }
323 }
324 }
325 }
326
327 // Evaluate forward columns in sandbox (with dependency-aware skipping)
328 for (fwd_idx, &col_idx) in forward_cols.iter().enumerate() {
329 let column = &columns[col_idx];
330
331 // Determine if we should evaluate this column
332 let mut should_evaluate = _sweep_num == 1; // Always evaluate first sweep
333
334 // Skip if no dependencies changed (only for non-forward-ref columns)
335 if !should_evaluate && !column.has_forward_ref {
336 // Check intra-row column dependencies
337 should_evaluate = column.dependencies.iter().any(|dep| {
338 if dep.starts_with('$') {
339 let dep_name = dep.trim_start_matches('$');
340 // Check if dependency is in forward_cols and changed in prev sweep
341 forward_cols.iter().enumerate().any(|(dep_fwd_idx, &dep_col_idx)| {
342 columns[dep_col_idx].name.as_ref() == dep_name
343 && prev_changed[row_offset][dep_fwd_idx]
344 })
345 } else {
346 // Non-column dependency, always re-evaluate to be safe
347 true
348 }
349 });
350 } else if !should_evaluate {
351 // For forward-ref columns, re-evaluate if anything changed
352 should_evaluate = true;
353 }
354
355 if should_evaluate {
356 let value = match column.logic {
357 Some(logic_id) => lib.engine.run_with_context(&logic_id, sandbox.data(), &Value::Object(internal_context.clone()))?,
358 None => column.literal.as_ref().map(|arc_val| Value::clone(arc_val)).unwrap_or(Value::Null),
359 };
360
361 // Write to sandbox table and update internal context
362 if let Some(row_obj) = sandbox.get_table_row_mut(&table_pointer_path, target_idx) {
363 if let Some(cell) = row_obj.get_mut(column.name.as_ref()) {
364 if *cell != value {
365 any_changed = true;
366 curr_changed[row_offset][fwd_idx] = true;
367 *cell = value.clone();
368 }
369 } else {
370 any_changed = true;
371 curr_changed[row_offset][fwd_idx] = true;
372 row_obj.insert(col_names[col_idx].clone(), value.clone());
373 }
374 }
375 // Update internal context with new value
376 internal_context.insert(column.var_path.as_ref().to_string(), value);
377 }
378 }
379 }
380
381 scan_from_down = !scan_from_down;
382 prev_changed = curr_changed;
383
384 // Exit early if converged (no changes in this sweep)
385 if !any_changed {
386 break;
387 }
388 }
389 }
390 }
391 }
392 }
393
394 // Extract result from sandbox (zero-copy: take from sandbox, no mutation of parent scope)
395 let final_rows = if let Some(table_value) = sandbox.get_mut(&table_pointer_path) {
396 if let Some(array) = table_value.as_array_mut() {
397 mem::take(array)
398 } else {
399 Vec::new()
400 }
401 } else {
402 Vec::new()
403 };
404
405 // Sandbox is dropped here, all temporary mutations are discarded
406 // Parent scope_data remains unchanged - safe for parallel execution
407 Ok(final_rows)
408}
409
410/// Check if a field is required based on the schema rules
411///
412/// This function looks up the field in the evaluated schema and checks if it has
413/// a "required" rule with value=true. If the field doesn't exist in the schema
414/// or doesn't have a required rule, it's considered optional.
415///
416/// # Arguments
417///
418/// * `schema` - The evaluated schema Value
419/// * `dep_path` - The dependency path (JSON pointer format, e.g., "/properties/field")
420///
421/// # Returns
422///
423/// * `true` if the field is required, `false` if optional or not found
424fn check_field_required(schema: &Value, dep_path: &str) -> bool {
425 // Convert the dependency path to schema path
426 // For fields like "/properties/field", we need to look at "/properties/field/rules/required"
427 let rules_path = format!("{}/rules/required", path_utils::dot_notation_to_schema_pointer(dep_path));
428
429 // Try to get the required rule from the schema
430 if let Some(required_rule) = path_utils::get_value_by_pointer(schema, &rules_path) {
431 // Check if the required rule has value=true
432 if let Some(rule_obj) = required_rule.as_object() {
433 if let Some(Value::Bool(is_required)) = rule_obj.get("value") {
434 return *is_required;
435 }
436 }
437 // If it's a direct boolean value
438 if let Some(is_required) = required_rule.as_bool() {
439 return is_required;
440 }
441 }
442
443 // If no required rule found, field is optional
444 false
445}