json_eval_rs/jsoneval/
table_evaluate.rs

1use crate::jsoneval::eval_data::EvalData;
2use crate::jsoneval::table_metadata::RowMetadata;
3use crate::jsoneval::path_utils;
4use crate::JSONEval;
5use serde_json::{Map, Value};
6use std::mem;
7
8/// Sandboxed table evaluation for safe parallel execution
9///
10/// All heavy operations (dependency analysis, forward reference checks) are done at parse time.
11/// This function creates an isolated scope to prevent interference between parallel table evaluations.
12///
13/// # Parallel Safety
14///
15/// This function is designed for safe parallel execution:
16/// - Takes `scope_data` as an immutable reference (read-only parent scope)
17/// - Creates an isolated sandbox (clone) for all table-specific mutations
18/// - All temporary variables (`$iteration`, `$threshold`, column vars) exist only in the sandbox
19/// - The parent `scope_data` remains unchanged, preventing race conditions
20/// - Multiple tables can be evaluated concurrently without interference
21///
22/// # Mutation Safety
23///
24/// **ALL data mutations go through EvalData methods:**
25/// - `sandbox.set()` - sets field values with version tracking
26/// - `sandbox.push_to_array()` - appends to arrays with version tracking
27/// - `sandbox.get_table_row_mut()` - gets mutable row references (followed by mark_modified)
28/// - `sandbox.mark_modified()` - explicitly marks paths as modified
29///
30/// This ensures proper version tracking and mutation safety throughout evaluation.
31///
32/// # Sandboxing Strategy
33///
34/// 1. Clone `scope_data` to create an isolated sandbox at the start
35/// 2. All evaluations and mutations happen within the sandbox via EvalData methods
36/// 3. Extract the final table array from the sandbox
37/// 4. Sandbox is dropped, discarding all temporary state
38/// 5. Parent scope remains pristine and can be safely shared across threads
39pub fn evaluate_table(
40    lib: &JSONEval, // Changed to immutable - parallel-safe, only reads metadata and calls engine
41    eval_key: &str,
42    scope_data: &EvalData, // Now immutable - we read from parent scope
43) -> Result<Vec<Value>, String> {
44    // Clone metadata (cheap since it uses Arc internally)
45    let metadata = lib
46        .table_metadata
47        .get(eval_key)
48        .ok_or_else(|| format!("Table metadata not found for {}", eval_key))?
49        .clone();
50
51    // Pre-compute table path once using JSON pointer format
52    let table_pointer_path = path_utils::normalize_to_json_pointer(eval_key);
53
54    // ==========================================
55    // CREATE SANDBOXED SCOPE (thread-safe isolation)
56    // ==========================================
57    // Clone scope_data to create an isolated sandbox for this table evaluation
58    // This prevents parallel table evaluations from interfering with each other
59    let mut sandbox = scope_data.clone();
60
61    // ==========================================
62    // PHASE 0: Evaluate $datas FIRST (before skip/clear)
63    // ==========================================
64    // Capture existing table value and track if dependencies change
65    let existing_table_value = sandbox.get(&table_pointer_path).cloned();
66
67    // Use empty internal context for $data evaluation
68    let empty_context = Value::Object(Map::new());
69    for (name, logic, literal) in metadata.data_plans.iter() {
70        let value = match logic {
71            Some(logic_id) => {
72                match lib
73                    .engine
74                    .run_with_context(logic_id, sandbox.data(), &empty_context)
75                {
76                    Ok(val) => val,
77                    Err(_) => literal
78                        .as_ref()
79                        .map(|arc_val| Value::clone(arc_val))
80                        .unwrap_or(Value::Null),
81                }
82            }
83            None => literal
84                .as_ref()
85                .map(|arc_val| Value::clone(arc_val))
86                .unwrap_or(Value::Null),
87        };
88
89        sandbox.set(name.as_ref(), value);
90    }
91
92    // ==========================================
93    // PHASE 1: Evaluate $skip - if true, return empty immediately
94    // ==========================================
95    let mut should_skip = metadata.skip_literal;
96    if !should_skip {
97        if let Some(logic_id) = metadata.skip_logic {
98            let val = lib
99                .engine
100                .run_with_context(&logic_id, sandbox.data(), &empty_context)?;
101            should_skip = val.as_bool().unwrap_or(false);
102        }
103    }
104
105    // ==========================================
106    // PHASE 2: Check dependencies before evaluation
107    // ==========================================
108    // Skip evaluation if required dependencies (non-$params, non-$ prefixed) are null/empty
109    let mut requirement_not_filled = false;
110    if let Some(deps) = lib.dependencies.get(eval_key) {
111        for dep in deps.iter() {
112            // Skip $params and any dependency starting with $
113            if dep.contains("$params")
114                || (!dep.contains("$context") && (dep.starts_with("/$") || dep.starts_with("$")))
115            {
116                continue;
117            }
118
119            // Check if this dependency is null or empty
120            if let Some(dep_value) = sandbox.get_without_properties(dep) {
121                let is_empty = match dep_value {
122                    Value::Null => true,
123                    Value::String(s) => s.is_empty(),
124                    Value::Array(arr) => arr.is_empty(),
125                    Value::Object(obj) => obj.is_empty(),
126                    _ => false,
127                };
128
129                if is_empty {
130                    // Check if the field is required in the schema before skipping
131                    let is_field_required = check_field_required(&lib.evaluated_schema, dep);
132
133                    if is_field_required {
134                        requirement_not_filled = true;
135                        break;
136                    }
137                    // If field is not required (optional), continue without skipping
138                }
139            } else {
140                // Dependency doesn't exist
141                // Check if the field is required in the schema before skipping
142                let is_field_required = check_field_required(&lib.evaluated_schema, dep);
143
144                if is_field_required {
145                    requirement_not_filled = true;
146                    break;
147                }
148            }
149        }
150    }
151    // println!("requirement_not_filled: {}", requirement_not_filled);
152
153    // ==========================================
154    // PHASE 3: Evaluate $clear - if true, ensure table is empty
155    // ==========================================
156    let mut should_clear = metadata.clear_literal;
157    if !should_clear {
158        if let Some(logic_id) = metadata.clear_logic {
159            let val = lib
160                .engine
161                .run_with_context(&logic_id, sandbox.data(), &empty_context)?;
162            should_clear = val.as_bool().unwrap_or(false);
163        }
164    }
165
166    // Initialize empty table array only when: existing table data is not an array
167    let table_is_not_array = !existing_table_value
168        .as_ref()
169        .map_or(false, |v| v.is_array());
170    if should_clear || should_skip || table_is_not_array || requirement_not_filled {
171        sandbox.set(&table_pointer_path, Value::Array(Vec::new()));
172    }
173
174    if should_clear || should_skip || requirement_not_filled {
175        return Ok(Vec::new());
176    }
177
178    let number_from_value = |value: &Value| -> i64 {
179        match value {
180            Value::Number(n) => n
181                .as_i64()
182                .unwrap_or_else(|| n.as_f64().map_or(0, |f| f as i64)),
183            Value::String(s) => s.parse::<f64>().map_or(0, |f| f as i64),
184            Value::Bool(true) => 1,
185            Value::Bool(false) => 0,
186            _ => 0,
187        }
188    };
189
190    for plan in metadata.row_plans.iter() {
191        match plan {
192            RowMetadata::Static { columns } => {
193                // CRITICAL: Preserve SCHEMA ORDER for static rows (match JavaScript behavior)
194                let mut evaluated_row = Map::with_capacity(columns.len());
195
196                // Create internal context for column variables
197                let mut internal_context = Map::new();
198
199                // Evaluate columns in schema order (sandboxed)
200                for column in columns.iter() {
201                    let value = if let Some(logic_id) = column.logic {
202                        lib.engine.run_with_context(
203                            &logic_id,
204                            sandbox.data(),
205                            &Value::Object(internal_context.clone()),
206                        )?
207                    } else {
208                        column
209                            .literal
210                            .as_ref()
211                            .map(|arc_val| Value::clone(arc_val))
212                            .unwrap_or(Value::Null)
213                    };
214                    // Pre-compute string key once from Arc<str>
215                    let col_name_str = column.name.as_ref().to_string();
216                    // Store in internal context (column vars start with $)
217                    internal_context.insert(column.var_path.as_ref().to_string(), value.clone());
218                    evaluated_row.insert(col_name_str, value);
219                }
220
221                sandbox.push_to_array(&table_pointer_path, Value::Object(evaluated_row));
222            }
223            RowMetadata::Repeat {
224                start,
225                end,
226                columns,
227                forward_cols,
228                normal_cols,
229            } => {
230                // Evaluate repeat bounds in sandbox
231                let start_val = if let Some(logic_id) = start.logic {
232                    lib.engine
233                        .run_with_context(&logic_id, sandbox.data(), &empty_context)?
234                } else {
235                    Value::clone(&start.literal)
236                };
237                let end_val = if let Some(logic_id) = end.logic {
238                    lib.engine
239                        .run_with_context(&logic_id, sandbox.data(), &empty_context)?
240                } else {
241                    Value::clone(&end.literal)
242                };
243
244                let start_idx = number_from_value(&start_val);
245                let end_idx = number_from_value(&end_val);
246
247                if start_idx > end_idx {
248                    continue;
249                }
250
251                // Count existing static rows in sandbox
252                let existing_row_count = sandbox
253                    .get(&table_pointer_path)
254                    .and_then(|v| v.as_array())
255                    .map(|arr| arr.len())
256                    .unwrap_or(0);
257
258                // Pre-allocate all rows in sandbox (zero-copy: pre-compute string keys)
259                let total_rows = (end_idx - start_idx + 1) as usize;
260                let col_count = columns.len();
261                // Pre-compute all column name strings once
262                let col_names: Vec<String> = columns
263                    .iter()
264                    .map(|col| col.name.as_ref().to_string())
265                    .collect();
266
267                if let Some(Value::Array(table_arr)) = sandbox.get_mut(&table_pointer_path) {
268                    table_arr.reserve(total_rows);
269                    for _ in 0..total_rows {
270                        let mut row = Map::with_capacity(col_count);
271                        for col_name in &col_names {
272                            row.insert(col_name.clone(), Value::Null);
273                        }
274                        table_arr.push(Value::Object(row));
275                    }
276                }
277
278                // ========================================
279                // PHASE 4: TOP TO BOTTOM (Forward Pass)
280                // ========================================
281                // Evaluate columns WITHOUT forward references in sandbox
282
283                // Create internal context with $threshold
284                let mut internal_context = Map::new();
285                internal_context.insert("$threshold".to_string(), Value::from(end_idx));
286
287                for iteration in start_idx..=end_idx {
288                    let row_idx = (iteration - start_idx) as usize;
289                    let target_idx = existing_row_count + row_idx;
290
291                    // Set $iteration in internal context
292                    internal_context.insert("$iteration".to_string(), Value::from(iteration));
293
294                    // Evaluate normal columns in sandbox
295                    for &col_idx in normal_cols.iter() {
296                        let column = &columns[col_idx];
297                        let value = match column.logic {
298                            Some(logic_id) => lib.engine.run_with_context(
299                                &logic_id,
300                                sandbox.data(),
301                                &Value::Object(internal_context.clone()),
302                            )?,
303                            None => column
304                                .literal
305                                .as_ref()
306                                .map(|arc_val| Value::clone(arc_val))
307                                .unwrap_or(Value::Null),
308                        };
309
310                        // Update table cell in sandbox
311                        if let Some(row_obj) =
312                            sandbox.get_table_row_mut(&table_pointer_path, target_idx)
313                        {
314                            if let Some(cell) = row_obj.get_mut(column.name.as_ref()) {
315                                *cell = value.clone();
316                            } else {
317                                row_obj.insert(col_names[col_idx].clone(), value.clone());
318                            }
319                        }
320                        // Store in internal context (column vars)
321                        internal_context.insert(column.var_path.as_ref().to_string(), value);
322                    }
323                }
324                // TODO: Implement mark_modified if needed for tracking
325                // sandbox.mark_modified(&table_pointer_path);
326
327                // ========================================
328                // PHASE 5 (BACKWARD PASS):
329                // Evaluate columns WITH forward references in sandbox
330                // ========================================
331                if !forward_cols.is_empty() {
332                    let max_sweeps = 100; // Safety limit to prevent infinite loops
333                    let mut scan_from_down = false;
334                    let iter_count = (end_idx - start_idx + 1) as usize;
335
336                    // Create internal context for backward pass
337                    let mut internal_context = Map::new();
338                    internal_context.insert("$threshold".to_string(), Value::from(end_idx));
339
340                    // Track which columns changed in previous sweep per row
341                    // This enables skipping re-evaluation of columns with unchanged dependencies
342                    let mut prev_changed: Vec<Vec<bool>> =
343                        vec![vec![true; forward_cols.len()]; iter_count];
344
345                    for _sweep_num in 1..=max_sweeps {
346                        let mut any_changed = false;
347                        let mut curr_changed: Vec<Vec<bool>> =
348                            vec![vec![false; forward_cols.len()]; iter_count];
349
350                        for iter_offset in 0..iter_count {
351                            let iteration = if scan_from_down {
352                                end_idx - iter_offset as i64
353                            } else {
354                                start_idx + iter_offset as i64
355                            };
356                            let row_offset = (iteration - start_idx) as usize;
357                            let target_idx = existing_row_count + row_offset;
358
359                            // Set $iteration in internal context
360                            internal_context
361                                .insert("$iteration".to_string(), Value::from(iteration));
362
363                            // Restore column values from sandbox to internal context
364                            if let Some(Value::Array(table_arr)) = sandbox.get(&table_pointer_path)
365                            {
366                                if let Some(Value::Object(row_obj)) = table_arr.get(target_idx) {
367                                    // Collect all column values into internal context
368                                    for &col_idx in normal_cols.iter().chain(forward_cols.iter()) {
369                                        let column = &columns[col_idx];
370                                        if let Some(value) = row_obj.get(column.name.as_ref()) {
371                                            internal_context.insert(
372                                                column.var_path.as_ref().to_string(),
373                                                value.clone(),
374                                            );
375                                        }
376                                    }
377                                }
378                            }
379
380                            // Evaluate forward columns in sandbox (with dependency-aware skipping)
381                            for (fwd_idx, &col_idx) in forward_cols.iter().enumerate() {
382                                let column = &columns[col_idx];
383
384                                // Determine if we should evaluate this column
385                                let mut should_evaluate = _sweep_num == 1; // Always evaluate first sweep
386
387                                // Skip if no dependencies changed (only for non-forward-ref columns)
388                                if !should_evaluate && !column.has_forward_ref {
389                                    // Check intra-row column dependencies
390                                    should_evaluate = column.dependencies.iter().any(|dep| {
391                                        if dep.starts_with('$') {
392                                            let dep_name = dep.trim_start_matches('$');
393                                            // Check if dependency is in forward_cols and changed in prev sweep
394                                            forward_cols.iter().enumerate().any(
395                                                |(dep_fwd_idx, &dep_col_idx)| {
396                                                    columns[dep_col_idx].name.as_ref() == dep_name
397                                                        && prev_changed[row_offset][dep_fwd_idx]
398                                                },
399                                            )
400                                        } else {
401                                            // Non-column dependency, always re-evaluate to be safe
402                                            true
403                                        }
404                                    });
405                                } else if !should_evaluate {
406                                    // For forward-ref columns, re-evaluate if anything changed
407                                    should_evaluate = true;
408                                }
409
410                                if should_evaluate {
411                                    let value = match column.logic {
412                                        Some(logic_id) => lib.engine.run_with_context(
413                                            &logic_id,
414                                            sandbox.data(),
415                                            &Value::Object(internal_context.clone()),
416                                        )?,
417                                        None => column
418                                            .literal
419                                            .as_ref()
420                                            .map(|arc_val| Value::clone(arc_val))
421                                            .unwrap_or(Value::Null),
422                                    };
423
424                                    // Write to sandbox table and update internal context
425                                    if let Some(row_obj) =
426                                        sandbox.get_table_row_mut(&table_pointer_path, target_idx)
427                                    {
428                                        if let Some(cell) = row_obj.get_mut(column.name.as_ref()) {
429                                            if *cell != value {
430                                                any_changed = true;
431                                                curr_changed[row_offset][fwd_idx] = true;
432                                                *cell = value.clone();
433                                            }
434                                        } else {
435                                            any_changed = true;
436                                            curr_changed[row_offset][fwd_idx] = true;
437                                            row_obj
438                                                .insert(col_names[col_idx].clone(), value.clone());
439                                        }
440                                    }
441                                    // Update internal context with new value
442                                    internal_context
443                                        .insert(column.var_path.as_ref().to_string(), value);
444                                }
445                            }
446                        }
447
448                        scan_from_down = !scan_from_down;
449                        prev_changed = curr_changed;
450
451                        // Exit early if converged (no changes in this sweep)
452                        if !any_changed {
453                            break;
454                        }
455                    }
456                }
457            }
458        }
459    }
460
461    // Extract result from sandbox (zero-copy: take from sandbox, no mutation of parent scope)
462    let final_rows = if let Some(table_value) = sandbox.get_mut(&table_pointer_path) {
463        if let Some(array) = table_value.as_array_mut() {
464            mem::take(array)
465        } else {
466            Vec::new()
467        }
468    } else {
469        Vec::new()
470    };
471
472    // Sandbox is dropped here, all temporary mutations are discarded
473    // Parent scope_data remains unchanged - safe for parallel execution
474    Ok(final_rows)
475}
476
477/// Check if a field is required based on the schema rules
478///
479/// This function looks up the field in the evaluated schema and checks if it has
480/// a "required" rule with value=true. If the field doesn't exist in the schema
481/// or doesn't have a required rule, it's considered optional.
482///
483/// # Arguments
484///
485/// * `schema` - The evaluated schema Value
486/// * `dep_path` - The dependency path (JSON pointer format, e.g., "/properties/field")
487///
488/// # Returns
489///
490/// * `true` if the field is required, `false` if optional or not found
491fn check_field_required(schema: &Value, dep_path: &str) -> bool {
492    // Convert the dependency path to schema path
493    // For fields like "/properties/field", we need to look at "/properties/field/rules/required"
494    let rules_path = format!(
495        "{}/rules/required",
496        path_utils::dot_notation_to_schema_pointer(dep_path)
497    );
498
499    // Try to get the required rule from the schema
500    if let Some(required_rule) = path_utils::get_value_by_pointer(schema, &rules_path) {
501        // Check if the required rule has value=true
502        if let Some(rule_obj) = required_rule.as_object() {
503            if let Some(Value::Bool(is_required)) = rule_obj.get("value") {
504                return *is_required;
505            }
506        }
507        // If it's a direct boolean value
508        if let Some(is_required) = required_rule.as_bool() {
509            return is_required;
510        }
511    }
512
513    // If no required rule found, field is optional
514    false
515}