json_eval_rs/table_evaluate.rs
1use serde_json::{Map, Value};
2use std::mem;
3use crate::eval_data::EvalData;
4use crate::{JSONEval, path_utils};
5use crate::table_metadata::RowMetadata;
6
7/// Sandboxed table evaluation for safe parallel execution
8///
9/// All heavy operations (dependency analysis, forward reference checks) are done at parse time.
10/// This function creates an isolated scope to prevent interference between parallel table evaluations.
11///
12/// # Parallel Safety
13///
14/// This function is designed for safe parallel execution:
15/// - Takes `scope_data` as an immutable reference (read-only parent scope)
16/// - Creates an isolated sandbox (clone) for all table-specific mutations
17/// - All temporary variables (`$iteration`, `$threshold`, column vars) exist only in the sandbox
18/// - The parent `scope_data` remains unchanged, preventing race conditions
19/// - Multiple tables can be evaluated concurrently without interference
20///
21/// # Mutation Safety
22///
23/// **ALL data mutations go through EvalData methods:**
24/// - `sandbox.set()` - sets field values with version tracking
25/// - `sandbox.push_to_array()` - appends to arrays with version tracking
26/// - `sandbox.get_table_row_mut()` - gets mutable row references (followed by mark_modified)
27/// - `sandbox.mark_modified()` - explicitly marks paths as modified
28///
29/// This ensures proper version tracking and mutation safety throughout evaluation.
30///
31/// # Sandboxing Strategy
32///
33/// 1. Clone `scope_data` to create an isolated sandbox at the start
34/// 2. All evaluations and mutations happen within the sandbox via EvalData methods
35/// 3. Extract the final table array from the sandbox
36/// 4. Sandbox is dropped, discarding all temporary state
37/// 5. Parent scope remains pristine and can be safely shared across threads
38pub fn evaluate_table(
39 lib: &JSONEval, // Changed to immutable - parallel-safe, only reads metadata and calls engine
40 eval_key: &str,
41 scope_data: &EvalData, // Now immutable - we read from parent scope
42) -> Result<Vec<Value>, String> {
43 // Clone metadata (cheap since it uses Arc internally)
44 let metadata = lib.table_metadata.get(eval_key)
45 .ok_or_else(|| format!("Table metadata not found for {}", eval_key))?.clone();
46
47 // Pre-compute table path once using JSON pointer format
48 let table_pointer_path = path_utils::normalize_to_json_pointer(eval_key);
49
50 // ==========================================
51 // CREATE SANDBOXED SCOPE (thread-safe isolation)
52 // ==========================================
53 // Clone scope_data to create an isolated sandbox for this table evaluation
54 // This prevents parallel table evaluations from interfering with each other
55 let mut sandbox = scope_data.clone();
56
57 // ==========================================
58 // PHASE 0: Evaluate $datas FIRST (before skip/clear)
59 // ==========================================
60 // Capture existing table value and track if dependencies change
61 let existing_table_value = sandbox.get(&table_pointer_path).cloned();
62
63 // Use empty internal context for $data evaluation
64 let empty_context = Value::Object(Map::new());
65 for (name, logic, literal) in metadata.data_plans.iter() {
66 let value = match logic {
67 Some(logic_id) => match lib.engine.run_with_context(logic_id, sandbox.data(), &empty_context) {
68 Ok(val) => val,
69 Err(_) => literal.as_ref().map(|arc_val| Value::clone(arc_val)).unwrap_or(Value::Null),
70 },
71 None => literal.as_ref().map(|arc_val| Value::clone(arc_val)).unwrap_or(Value::Null),
72 };
73
74 sandbox.set(name.as_ref(), value);
75 }
76
77 // ==========================================
78 // PHASE 1: Evaluate $skip - if true, return empty immediately
79 // ==========================================
80 let mut should_skip = metadata.skip_literal;
81 if !should_skip {
82 if let Some(logic_id) = metadata.skip_logic {
83 let val = lib.engine.run_with_context(&logic_id, sandbox.data(), &empty_context)?;
84 should_skip = val.as_bool().unwrap_or(false);
85 }
86 }
87
88 // ==========================================
89 // PHASE 2: Check dependencies before evaluation
90 // ==========================================
91 // Skip evaluation if required dependencies (non-$params, non-$ prefixed) are null/empty
92 let mut requirement_not_filled = false;
93 if let Some(deps) = lib.dependencies.get(eval_key) {
94 for dep in deps.iter() {
95 // Skip $params and any dependency starting with $
96 if dep.contains("$params") || (!dep.contains("$context") && (dep.starts_with("/$") || dep.starts_with("$"))) {
97 continue;
98 }
99
100 // Check if this dependency is null or empty
101 if let Some(dep_value) = sandbox.get_without_properties(dep) {
102 let is_empty = match dep_value {
103 Value::Null => true,
104 Value::String(s) => s.is_empty(),
105 Value::Array(arr) => arr.is_empty(),
106 Value::Object(obj) => obj.is_empty(),
107 _ => false,
108 };
109
110 if is_empty {
111 // Check if the field is required in the schema before skipping
112 let is_field_required = check_field_required(&lib.evaluated_schema, dep);
113
114 if is_field_required {
115 requirement_not_filled = true;
116 break;
117 }
118 // If field is not required (optional), continue without skipping
119 }
120 } else {
121 // println!("dependency {} doesn't exist", dep);
122 // Dependency doesn't exist, treat as null
123 requirement_not_filled = true;
124 break;
125 }
126 }
127 }
128 // println!("requirement_not_filled: {}", requirement_not_filled);
129
130 // ==========================================
131 // PHASE 3: Evaluate $clear - if true, ensure table is empty
132 // ==========================================
133 let mut should_clear = metadata.clear_literal;
134 if !should_clear {
135 if let Some(logic_id) = metadata.clear_logic {
136 let val = lib.engine.run_with_context(&logic_id, sandbox.data(), &empty_context)?;
137 should_clear = val.as_bool().unwrap_or(false);
138 }
139 }
140
141 // Initialize empty table array only when: existing table data is not an array
142 let table_is_not_array = !existing_table_value.as_ref().map_or(false, |v| v.is_array());
143 if should_clear || should_skip || table_is_not_array || requirement_not_filled {
144 sandbox.set(&table_pointer_path, Value::Array(Vec::new()));
145 }
146
147 if should_clear || should_skip || requirement_not_filled {
148 return Ok(Vec::new());
149 }
150
151 let number_from_value = |value: &Value| -> i64 {
152 match value {
153 Value::Number(n) => n.as_i64().unwrap_or_else(|| n.as_f64().map_or(0, |f| f as i64)),
154 Value::String(s) => s.parse::<f64>().map_or(0, |f| f as i64),
155 Value::Bool(true) => 1,
156 Value::Bool(false) => 0,
157 _ => 0,
158 }
159 };
160
161 for plan in metadata.row_plans.iter() {
162 match plan {
163 RowMetadata::Static { columns } => {
164 // CRITICAL: Preserve SCHEMA ORDER for static rows (match JavaScript behavior)
165 let mut evaluated_row = Map::with_capacity(columns.len());
166
167 // Create internal context for column variables
168 let mut internal_context = Map::new();
169
170 // Evaluate columns in schema order (sandboxed)
171 for column in columns.iter() {
172 let value = if let Some(logic_id) = column.logic {
173 lib.engine.run_with_context(&logic_id, sandbox.data(), &Value::Object(internal_context.clone()))?
174 } else {
175 column.literal.as_ref().map(|arc_val| Value::clone(arc_val)).unwrap_or(Value::Null)
176 };
177 // Pre-compute string key once from Arc<str>
178 let col_name_str = column.name.as_ref().to_string();
179 // Store in internal context (column vars start with $)
180 internal_context.insert(column.var_path.as_ref().to_string(), value.clone());
181 evaluated_row.insert(col_name_str, value);
182 }
183
184 sandbox.push_to_array(&table_pointer_path, Value::Object(evaluated_row));
185 }
186 RowMetadata::Repeat {
187 start,
188 end,
189 columns,
190 forward_cols,
191 normal_cols,
192 } => {
193 // Evaluate repeat bounds in sandbox
194 let start_val = if let Some(logic_id) = start.logic {
195 lib.engine.run_with_context(&logic_id, sandbox.data(), &empty_context)?
196 } else {
197 Value::clone(&start.literal)
198 };
199 let end_val = if let Some(logic_id) = end.logic {
200 lib.engine.run_with_context(&logic_id, sandbox.data(), &empty_context)?
201 } else {
202 Value::clone(&end.literal)
203 };
204
205 let start_idx = number_from_value(&start_val);
206 let end_idx = number_from_value(&end_val);
207
208 if start_idx > end_idx {
209 continue;
210 }
211
212 // Count existing static rows in sandbox
213 let existing_row_count = sandbox
214 .get(&table_pointer_path)
215 .and_then(|v| v.as_array())
216 .map(|arr| arr.len())
217 .unwrap_or(0);
218
219 // Pre-allocate all rows in sandbox (zero-copy: pre-compute string keys)
220 let total_rows = (end_idx - start_idx + 1) as usize;
221 let col_count = columns.len();
222 // Pre-compute all column name strings once
223 let col_names: Vec<String> = columns.iter()
224 .map(|col| col.name.as_ref().to_string())
225 .collect();
226
227 if let Some(Value::Array(table_arr)) = sandbox.get_mut(&table_pointer_path) {
228 table_arr.reserve(total_rows);
229 for _ in 0..total_rows {
230 let mut row = Map::with_capacity(col_count);
231 for col_name in &col_names {
232 row.insert(col_name.clone(), Value::Null);
233 }
234 table_arr.push(Value::Object(row));
235 }
236 }
237
238 // ========================================
239 // PHASE 4: TOP TO BOTTOM (Forward Pass)
240 // ========================================
241 // Evaluate columns WITHOUT forward references in sandbox
242
243 // Create internal context with $threshold
244 let mut internal_context = Map::new();
245 internal_context.insert("$threshold".to_string(), Value::from(end_idx));
246
247 for iteration in start_idx..=end_idx {
248 let row_idx = (iteration - start_idx) as usize;
249 let target_idx = existing_row_count + row_idx;
250
251 // Set $iteration in internal context
252 internal_context.insert("$iteration".to_string(), Value::from(iteration));
253
254 // Evaluate normal columns in sandbox
255 for &col_idx in normal_cols.iter() {
256 let column = &columns[col_idx];
257 let value = match column.logic {
258 Some(logic_id) => lib.engine.run_with_context(&logic_id, sandbox.data(), &Value::Object(internal_context.clone()))?,
259 None => column.literal.as_ref().map(|arc_val| Value::clone(arc_val)).unwrap_or(Value::Null),
260 };
261
262 // Update table cell in sandbox
263 if let Some(row_obj) = sandbox.get_table_row_mut(&table_pointer_path, target_idx) {
264 if let Some(cell) = row_obj.get_mut(column.name.as_ref()) {
265 *cell = value.clone();
266 } else {
267 row_obj.insert(col_names[col_idx].clone(), value.clone());
268 }
269 }
270 // Store in internal context (column vars)
271 internal_context.insert(column.var_path.as_ref().to_string(), value);
272 }
273 }
274 // TODO: Implement mark_modified if needed for tracking
275 // sandbox.mark_modified(&table_pointer_path);
276
277 // ========================================
278 // PHASE 5 (BACKWARD PASS):
279 // Evaluate columns WITH forward references in sandbox
280 // ========================================
281 if !forward_cols.is_empty() {
282 let max_sweeps = 100; // Safety limit to prevent infinite loops
283 let mut scan_from_down = false;
284 let iter_count = (end_idx - start_idx + 1) as usize;
285
286 // Create internal context for backward pass
287 let mut internal_context = Map::new();
288 internal_context.insert("$threshold".to_string(), Value::from(end_idx));
289
290 // Track which columns changed in previous sweep per row
291 // This enables skipping re-evaluation of columns with unchanged dependencies
292 let mut prev_changed: Vec<Vec<bool>> = vec![vec![true; forward_cols.len()]; iter_count];
293
294 for _sweep_num in 1..=max_sweeps {
295 let mut any_changed = false;
296 let mut curr_changed: Vec<Vec<bool>> = vec![vec![false; forward_cols.len()]; iter_count];
297
298 for iter_offset in 0..iter_count {
299 let iteration = if scan_from_down {
300 end_idx - iter_offset as i64
301 } else {
302 start_idx + iter_offset as i64
303 };
304 let row_offset = (iteration - start_idx) as usize;
305 let target_idx = existing_row_count + row_offset;
306
307 // Set $iteration in internal context
308 internal_context.insert("$iteration".to_string(), Value::from(iteration));
309
310 // Restore column values from sandbox to internal context
311 if let Some(Value::Array(table_arr)) = sandbox.get(&table_pointer_path) {
312 if let Some(Value::Object(row_obj)) = table_arr.get(target_idx) {
313 // Collect all column values into internal context
314 for &col_idx in normal_cols.iter().chain(forward_cols.iter()) {
315 let column = &columns[col_idx];
316 if let Some(value) = row_obj.get(column.name.as_ref()) {
317 internal_context.insert(column.var_path.as_ref().to_string(), value.clone());
318 }
319 }
320 }
321 }
322
323 // Evaluate forward columns in sandbox (with dependency-aware skipping)
324 for (fwd_idx, &col_idx) in forward_cols.iter().enumerate() {
325 let column = &columns[col_idx];
326
327 // Determine if we should evaluate this column
328 let mut should_evaluate = _sweep_num == 1; // Always evaluate first sweep
329
330 // Skip if no dependencies changed (only for non-forward-ref columns)
331 if !should_evaluate && !column.has_forward_ref {
332 // Check intra-row column dependencies
333 should_evaluate = column.dependencies.iter().any(|dep| {
334 if dep.starts_with('$') {
335 let dep_name = dep.trim_start_matches('$');
336 // Check if dependency is in forward_cols and changed in prev sweep
337 forward_cols.iter().enumerate().any(|(dep_fwd_idx, &dep_col_idx)| {
338 columns[dep_col_idx].name.as_ref() == dep_name
339 && prev_changed[row_offset][dep_fwd_idx]
340 })
341 } else {
342 // Non-column dependency, always re-evaluate to be safe
343 true
344 }
345 });
346 } else if !should_evaluate {
347 // For forward-ref columns, re-evaluate if anything changed
348 should_evaluate = true;
349 }
350
351 if should_evaluate {
352 let value = match column.logic {
353 Some(logic_id) => lib.engine.run_with_context(&logic_id, sandbox.data(), &Value::Object(internal_context.clone()))?,
354 None => column.literal.as_ref().map(|arc_val| Value::clone(arc_val)).unwrap_or(Value::Null),
355 };
356
357 // Write to sandbox table and update internal context
358 if let Some(row_obj) = sandbox.get_table_row_mut(&table_pointer_path, target_idx) {
359 if let Some(cell) = row_obj.get_mut(column.name.as_ref()) {
360 if *cell != value {
361 any_changed = true;
362 curr_changed[row_offset][fwd_idx] = true;
363 *cell = value.clone();
364 }
365 } else {
366 any_changed = true;
367 curr_changed[row_offset][fwd_idx] = true;
368 row_obj.insert(col_names[col_idx].clone(), value.clone());
369 }
370 }
371 // Update internal context with new value
372 internal_context.insert(column.var_path.as_ref().to_string(), value);
373 }
374 }
375 }
376
377 scan_from_down = !scan_from_down;
378 prev_changed = curr_changed;
379
380 // Exit early if converged (no changes in this sweep)
381 if !any_changed {
382 break;
383 }
384 }
385 }
386 }
387 }
388 }
389
390 // Extract result from sandbox (zero-copy: take from sandbox, no mutation of parent scope)
391 let final_rows = if let Some(table_value) = sandbox.get_mut(&table_pointer_path) {
392 if let Some(array) = table_value.as_array_mut() {
393 mem::take(array)
394 } else {
395 Vec::new()
396 }
397 } else {
398 Vec::new()
399 };
400
401 // Sandbox is dropped here, all temporary mutations are discarded
402 // Parent scope_data remains unchanged - safe for parallel execution
403 Ok(final_rows)
404}
405
406/// Check if a field is required based on the schema rules
407///
408/// This function looks up the field in the evaluated schema and checks if it has
409/// a "required" rule with value=true. If the field doesn't exist in the schema
410/// or doesn't have a required rule, it's considered optional.
411///
412/// # Arguments
413///
414/// * `schema` - The evaluated schema Value
415/// * `dep_path` - The dependency path (JSON pointer format, e.g., "/properties/field")
416///
417/// # Returns
418///
419/// * `true` if the field is required, `false` if optional or not found
420fn check_field_required(schema: &Value, dep_path: &str) -> bool {
421 // Convert the dependency path to schema path
422 // For fields like "/properties/field", we need to look at "/properties/field/rules/required"
423 let rules_path = format!("{}/rules/required", path_utils::dot_notation_to_schema_pointer(dep_path));
424
425 // Try to get the required rule from the schema
426 if let Some(required_rule) = path_utils::get_value_by_pointer(schema, &rules_path) {
427 // Check if the required rule has value=true
428 if let Some(rule_obj) = required_rule.as_object() {
429 if let Some(Value::Bool(is_required)) = rule_obj.get("value") {
430 return *is_required;
431 }
432 }
433 // If it's a direct boolean value
434 if let Some(is_required) = required_rule.as_bool() {
435 return is_required;
436 }
437 }
438
439 // If no required rule found, field is optional
440 false
441}