// rulemorph/transform.rs

1use chrono::offset::TimeZone;
2use chrono::{DateTime, FixedOffset, NaiveDate, NaiveDateTime};
3use csv::ReaderBuilder;
4use regex::Regex;
5use serde_json::{Map, Value as JsonValue};
6use std::cmp::Ordering;
7use std::collections::{HashMap, HashSet};
8use std::path::{Path, PathBuf};
9use std::sync::{Mutex, OnceLock};
10
11use crate::cache::LruCache;
12use crate::error::{TransformError, TransformErrorKind, TransformWarning};
13use crate::model::{
14    Expr, ExprChain, ExprOp, ExprRef, FinalizeSpec, InputFormat, Mapping, RuleFile, V2RuleStep,
15};
16use crate::path::{PathToken, get_path, parse_path};
17use crate::v2_eval::{
18    EvalItem as V2EvalItem, EvalValue as V2EvalValue, V2EvalContext, eval_v2_condition,
19    eval_v2_expr, eval_v2_pipe,
20};
21use crate::v2_parser::{
22    is_literal_escape, is_pipe_value, is_v2_ref, parse_v2_condition, parse_v2_expr,
23    parse_v2_pipe_from_value,
24};
25
26const REGEX_CACHE_CAPACITY: usize = 128;
27
28fn regex_cache() -> &'static Mutex<LruCache<String, Regex>> {
29    static REGEX_CACHE: OnceLock<Mutex<LruCache<String, Regex>>> = OnceLock::new();
30    REGEX_CACHE.get_or_init(|| Mutex::new(LruCache::new(REGEX_CACHE_CAPACITY)))
31}
32
33fn cached_regex(pattern: &str, path: &str) -> Result<Regex, TransformError> {
34    let key = pattern.to_string();
35    if let Some(regex) = {
36        let mut cache = regex_cache().lock().unwrap_or_else(|err| err.into_inner());
37        cache.get_cloned(&key)
38    } {
39        return Ok(regex);
40    }
41
42    let regex = Regex::new(pattern).map_err(|_| {
43        TransformError::new(TransformErrorKind::ExprError, "regex pattern is invalid")
44            .with_path(path)
45    })?;
46    {
47        let mut cache = regex_cache().lock().unwrap_or_else(|err| err.into_inner());
48        cache.insert(key, regex.clone());
49    }
50    Ok(regex)
51}
52
53pub fn transform(
54    rule: &RuleFile,
55    input: &str,
56    context: Option<&JsonValue>,
57) -> Result<JsonValue, TransformError> {
58    transform_with_warnings(rule, input, context).map(|(output, _)| output)
59}
60
61pub fn transform_with_base_dir(
62    rule: &RuleFile,
63    input: &str,
64    context: Option<&JsonValue>,
65    base_dir: &Path,
66) -> Result<JsonValue, TransformError> {
67    transform_with_warnings_with_base_dir(rule, input, context, base_dir).map(|(output, _)| output)
68}
69
70pub fn preflight_validate(
71    rule: &RuleFile,
72    input: &str,
73    context: Option<&JsonValue>,
74) -> Result<(), TransformError> {
75    preflight_validate_with_warnings(rule, input, context).map(|_| ())
76}
77
78pub fn preflight_validate_with_base_dir(
79    rule: &RuleFile,
80    input: &str,
81    context: Option<&JsonValue>,
82    base_dir: &Path,
83) -> Result<(), TransformError> {
84    preflight_validate_with_warnings_with_base_dir(rule, input, context, base_dir).map(|_| ())
85}
86
87#[derive(Debug)]
88pub struct TransformStreamItem {
89    pub output: Option<JsonValue>,
90    pub warnings: Vec<TransformWarning>,
91}
92
93pub struct TransformStream<'a> {
94    rule: &'a RuleFile,
95    context: Option<&'a JsonValue>,
96    records: InputRecordsIter<'a>,
97    base_dir: Option<&'a Path>,
98    done: bool,
99}
100
101impl<'a> TransformStream<'a> {
102    fn new(
103        rule: &'a RuleFile,
104        input: &'a str,
105        context: Option<&'a JsonValue>,
106        base_dir: Option<&'a Path>,
107    ) -> Result<Self, TransformError> {
108        let records = input_records_iter(rule, input)?;
109        Ok(Self {
110            rule,
111            context,
112            records,
113            base_dir,
114            done: false,
115        })
116    }
117}
118
119impl<'a> Iterator for TransformStream<'a> {
120    type Item = Result<TransformStreamItem, TransformError>;
121
122    fn next(&mut self) -> Option<Self::Item> {
123        if self.done {
124            return None;
125        }
126
127        loop {
128            let record = match self.records.next() {
129                None => {
130                    self.done = true;
131                    return None;
132                }
133                Some(Ok(record)) => record,
134                Some(Err(err)) => {
135                    self.done = true;
136                    return Some(Err(err));
137                }
138            };
139
140            let mut warnings = Vec::new();
141            match apply_rule_to_record(
142                self.rule,
143                &record,
144                self.context,
145                &mut warnings,
146                self.base_dir,
147            ) {
148                Ok(output) => {
149                    if output.is_none() && warnings.is_empty() {
150                        continue;
151                    }
152                    return Some(Ok(TransformStreamItem { output, warnings }));
153                }
154                Err(err) => {
155                    self.done = true;
156                    return Some(Err(err));
157                }
158            }
159        }
160    }
161}
162
163pub fn transform_stream<'a>(
164    rule: &'a RuleFile,
165    input: &'a str,
166    context: Option<&'a JsonValue>,
167) -> Result<TransformStream<'a>, TransformError> {
168    if rule.finalize.is_some() {
169        return Err(TransformError::new(
170            TransformErrorKind::InvalidInput,
171            "finalize is not supported in stream mode",
172        ));
173    }
174    TransformStream::new(rule, input, context, None)
175}
176
177pub fn transform_stream_with_base_dir<'a>(
178    rule: &'a RuleFile,
179    input: &'a str,
180    context: Option<&'a JsonValue>,
181    base_dir: &'a Path,
182) -> Result<TransformStream<'a>, TransformError> {
183    if rule.finalize.is_some() {
184        return Err(TransformError::new(
185            TransformErrorKind::InvalidInput,
186            "finalize is not supported in stream mode",
187        ));
188    }
189    TransformStream::new(rule, input, context, Some(base_dir))
190}
191
192pub fn transform_with_warnings(
193    rule: &RuleFile,
194    input: &str,
195    context: Option<&JsonValue>,
196) -> Result<(JsonValue, Vec<TransformWarning>), TransformError> {
197    transform_with_warnings_inner(rule, input, context, None)
198}
199
200pub fn transform_with_warnings_with_base_dir(
201    rule: &RuleFile,
202    input: &str,
203    context: Option<&JsonValue>,
204    base_dir: &Path,
205) -> Result<(JsonValue, Vec<TransformWarning>), TransformError> {
206    transform_with_warnings_inner(rule, input, context, Some(base_dir))
207}
208
209fn transform_with_warnings_inner(
210    rule: &RuleFile,
211    input: &str,
212    context: Option<&JsonValue>,
213    base_dir: Option<&Path>,
214) -> Result<(JsonValue, Vec<TransformWarning>), TransformError> {
215    let mut warnings = Vec::new();
216    let mut output_records = Vec::new();
217    if rule.finalize.is_some() {
218        let mut records = input_records_iter(rule, input)?;
219        while let Some(record) = records.next() {
220            let record = record?;
221            let mut record_warnings = Vec::new();
222            if let Some(output) =
223                apply_rule_to_record(rule, &record, context, &mut record_warnings, base_dir)?
224            {
225                output_records.push(output);
226            }
227            warnings.extend(record_warnings);
228        }
229    } else {
230        let stream = match base_dir {
231            Some(base_dir) => transform_stream_with_base_dir(rule, input, context, base_dir)?,
232            None => transform_stream(rule, input, context)?,
233        };
234        for item in stream {
235            let item = item?;
236            warnings.extend(item.warnings);
237            if let Some(output) = item.output {
238                output_records.push(output);
239            }
240        }
241    }
242
243    let mut output = JsonValue::Array(output_records);
244    if let Some(finalize) = &rule.finalize {
245        output = apply_finalize(finalize, output, context)?;
246    }
247
248    Ok((output, warnings))
249}
250
251pub fn transform_record(
252    rule: &RuleFile,
253    record: &JsonValue,
254    context: Option<&JsonValue>,
255) -> Result<Option<JsonValue>, TransformError> {
256    let (output, _warnings) = transform_record_with_warnings(rule, record, context)?;
257    Ok(output)
258}
259
260pub fn transform_record_with_base_dir(
261    rule: &RuleFile,
262    record: &JsonValue,
263    context: Option<&JsonValue>,
264    base_dir: &Path,
265) -> Result<Option<JsonValue>, TransformError> {
266    let (output, _warnings) =
267        transform_record_with_warnings_with_base_dir(rule, record, context, base_dir)?;
268    Ok(output)
269}
270
271pub fn transform_record_with_warnings(
272    rule: &RuleFile,
273    record: &JsonValue,
274    context: Option<&JsonValue>,
275) -> Result<(Option<JsonValue>, Vec<TransformWarning>), TransformError> {
276    transform_record_with_warnings_inner(rule, record, context, None)
277}
278
279pub fn transform_record_with_warnings_with_base_dir(
280    rule: &RuleFile,
281    record: &JsonValue,
282    context: Option<&JsonValue>,
283    base_dir: &Path,
284) -> Result<(Option<JsonValue>, Vec<TransformWarning>), TransformError> {
285    transform_record_with_warnings_inner(rule, record, context, Some(base_dir))
286}
287
288fn transform_record_with_warnings_inner(
289    rule: &RuleFile,
290    record: &JsonValue,
291    context: Option<&JsonValue>,
292    base_dir: Option<&Path>,
293) -> Result<(Option<JsonValue>, Vec<TransformWarning>), TransformError> {
294    let mut warnings = Vec::new();
295    let output = apply_rule_to_record(rule, record, context, &mut warnings, base_dir)?;
296    if output.is_none() {
297        return Ok((None, warnings));
298    }
299    if let Some(finalize) = &rule.finalize {
300        let mut records = Vec::new();
301        if let Some(value) = output {
302            records.push(value);
303        }
304        let finalized = apply_finalize(finalize, JsonValue::Array(records), context)?;
305        return Ok((Some(finalized), warnings));
306    }
307    Ok((output, warnings))
308}
309
310pub fn preflight_validate_with_warnings(
311    rule: &RuleFile,
312    input: &str,
313    context: Option<&JsonValue>,
314) -> Result<Vec<TransformWarning>, TransformError> {
315    preflight_validate_with_warnings_inner(rule, input, context, None)
316}
317
318pub fn preflight_validate_with_warnings_with_base_dir(
319    rule: &RuleFile,
320    input: &str,
321    context: Option<&JsonValue>,
322    base_dir: &Path,
323) -> Result<Vec<TransformWarning>, TransformError> {
324    preflight_validate_with_warnings_inner(rule, input, context, Some(base_dir))
325}
326
327fn preflight_validate_with_warnings_inner(
328    rule: &RuleFile,
329    input: &str,
330    context: Option<&JsonValue>,
331    base_dir: Option<&Path>,
332) -> Result<Vec<TransformWarning>, TransformError> {
333    let mut warnings = Vec::new();
334    if rule.finalize.is_some() {
335        let mut output_records = Vec::new();
336        let mut records = input_records_iter(rule, input)?;
337        while let Some(record) = records.next() {
338            let record = record?;
339            let mut record_warnings = Vec::new();
340            if let Some(output) =
341                apply_rule_to_record(rule, &record, context, &mut record_warnings, base_dir)?
342            {
343                output_records.push(output);
344            }
345            warnings.extend(record_warnings);
346        }
347        if let Some(finalize) = &rule.finalize {
348            let _ = apply_finalize(finalize, JsonValue::Array(output_records), context)?;
349        }
350    } else {
351        let stream = match base_dir {
352            Some(base_dir) => transform_stream_with_base_dir(rule, input, context, base_dir)?,
353            None => transform_stream(rule, input, context)?,
354        };
355        for item in stream {
356            let item = item?;
357            warnings.extend(item.warnings);
358        }
359    }
360    Ok(warnings)
361}
362
363fn apply_mappings(
364    rule: &RuleFile,
365    record: &JsonValue,
366    context: Option<&JsonValue>,
367    warnings: &mut Vec<TransformWarning>,
368) -> Result<JsonValue, TransformError> {
369    let mut out = JsonValue::Object(Map::new());
370    apply_mappings_into(
371        &rule.mappings,
372        record,
373        context,
374        &mut out,
375        warnings,
376        rule.version,
377        "mappings",
378    )?;
379    Ok(out)
380}
381
382fn apply_mappings_into(
383    mappings: &[Mapping],
384    record: &JsonValue,
385    context: Option<&JsonValue>,
386    out: &mut JsonValue,
387    warnings: &mut Vec<TransformWarning>,
388    rule_version: u8,
389    base_path: &str,
390) -> Result<(), TransformError> {
391    for (index, mapping) in mappings.iter().enumerate() {
392        let mapping_path = format!("{}[{}]", base_path, index);
393        if !eval_when(
394            mapping,
395            record,
396            context,
397            out,
398            &mapping_path,
399            warnings,
400            rule_version,
401        ) {
402            continue;
403        }
404        let value = eval_mapping(mapping, record, context, out, &mapping_path, rule_version)?;
405        if let Some(value) = value {
406            set_path(out, &mapping.target, value, &mapping_path)?;
407        }
408    }
409    Ok(())
410}
411
412fn apply_rule_to_record(
413    rule: &RuleFile,
414    record: &JsonValue,
415    context: Option<&JsonValue>,
416    warnings: &mut Vec<TransformWarning>,
417    base_dir: Option<&Path>,
418) -> Result<Option<JsonValue>, TransformError> {
419    if let Some(steps) = &rule.steps {
420        return apply_steps(steps, record, context, warnings, rule.version, base_dir);
421    }
422
423    if !eval_record_when(rule, record, context, warnings) {
424        return Ok(None);
425    }
426
427    let output = apply_mappings(rule, record, context, warnings)?;
428    Ok(Some(output))
429}
430
431fn apply_steps(
432    steps: &[V2RuleStep],
433    record: &JsonValue,
434    context: Option<&JsonValue>,
435    warnings: &mut Vec<TransformWarning>,
436    rule_version: u8,
437    base_dir: Option<&Path>,
438) -> Result<Option<JsonValue>, TransformError> {
439    let mut out = JsonValue::Object(Map::new());
440
441    for (step_index, step) in steps.iter().enumerate() {
442        let base_path = format!("steps[{}]", step_index);
443
444        if let Some(mappings) = &step.mappings {
445            apply_mappings_into(
446                mappings,
447                record,
448                context,
449                &mut out,
450                warnings,
451                rule_version,
452                &format!("{}.mappings", base_path),
453            )?;
454            continue;
455        }
456
457        if let Some(expr) = &step.record_when {
458            let when_path = format!("{}.record_when", base_path);
459            let keep = eval_when_expr(expr, record, context, &out, &when_path, rule_version)?;
460            if !keep {
461                return Ok(None);
462            }
463            continue;
464        }
465
466        if let Some(asserts) = &step.asserts {
467            for (assert_index, assert) in asserts.iter().enumerate() {
468                let assert_path = format!("{}.asserts[{}]", base_path, assert_index);
469                let ok = eval_when_expr(
470                    &assert.when,
471                    record,
472                    context,
473                    &out,
474                    &format!("{}.when", assert_path),
475                    rule_version,
476                )?;
477                if !ok {
478                    return Err(TransformError::new(
479                        TransformErrorKind::AssertionFailed,
480                        format!(
481                            "assert failed: {}: {}",
482                            assert.error.code, assert.error.message
483                        ),
484                    )
485                    .with_path(assert_path));
486                }
487            }
488            continue;
489        }
490
491        if let Some(branch) = &step.branch {
492            let branch_path = format!("{}.branch", base_path);
493            let take = eval_when_expr(
494                &branch.when,
495                record,
496                context,
497                &out,
498                &format!("{}.when", branch_path),
499                rule_version,
500            )?;
501            let (target, target_field) = if take {
502                (Some(branch.then.as_str()), "then")
503            } else {
504                (branch.r#else.as_deref(), "else")
505            };
506            if let Some(target) = target {
507                let (branch_rule, branch_base_dir) = load_rule_from_path(base_dir, target)
508                    .map_err(|err| err.with_path(format!("{}.{}", branch_path, target_field)))?;
509                let branch_input = out.clone();
510                let (branch_output, branch_warnings) = transform_record_with_warnings_inner(
511                    &branch_rule,
512                    &branch_input,
513                    context,
514                    Some(&branch_base_dir),
515                )?;
516                warnings.extend(branch_warnings);
517                let Some(branch_output) = branch_output else {
518                    return Ok(None);
519                };
520
521                if branch.return_ {
522                    return Ok(Some(branch_output));
523                }
524                merge_branch_output(&mut out, &branch_output, &branch_path)?;
525            }
526            continue;
527        }
528    }
529
530    Ok(Some(out))
531}
532
533fn merge_branch_output(
534    out: &mut JsonValue,
535    other: &JsonValue,
536    path: &str,
537) -> Result<(), TransformError> {
538    let out_map = out.as_object_mut().ok_or_else(|| {
539        TransformError::new(TransformErrorKind::InvalidTarget, "output must be object")
540            .with_path(path)
541    })?;
542    let other_map = other.as_object().ok_or_else(|| {
543        TransformError::new(
544            TransformErrorKind::InvalidTarget,
545            "branch output must be object",
546        )
547        .with_path(path)
548    })?;
549    merge_object_maps(out_map, other_map);
550    Ok(())
551}
552
553fn merge_object_maps(out_map: &mut Map<String, JsonValue>, other_map: &Map<String, JsonValue>) {
554    for (key, other_value) in other_map {
555        match (out_map.get_mut(key), other_value) {
556            (Some(JsonValue::Object(out_obj)), JsonValue::Object(other_obj)) => {
557                merge_object_maps(out_obj, other_obj);
558            }
559            _ => {
560                out_map.insert(key.clone(), other_value.clone());
561            }
562        }
563    }
564}
565
566fn load_rule_from_path(
567    base_dir: Option<&Path>,
568    path: &str,
569) -> Result<(RuleFile, PathBuf), TransformError> {
570    let resolved = resolve_rule_path(base_dir, path);
571    let yaml = std::fs::read_to_string(&resolved).map_err(|err| {
572        TransformError::new(
573            TransformErrorKind::InvalidInput,
574            format!("failed to read rule: {}", err),
575        )
576        .with_path(path)
577    })?;
578    let rule = crate::parse_rule_file(&yaml).map_err(|err| {
579        TransformError::new(
580            TransformErrorKind::InvalidInput,
581            format!("failed to parse rule: {}", err),
582        )
583        .with_path(path)
584    })?;
585    let resolved_base = resolved
586        .parent()
587        .unwrap_or_else(|| Path::new("."))
588        .to_path_buf();
589    Ok((rule, resolved_base))
590}
591
/// Resolves `path` against `base_dir`.
///
/// A relative `path` is joined onto `base_dir` when one is given; an absolute
/// `path`, or any `path` without a `base_dir`, is returned unchanged.
fn resolve_rule_path(base_dir: Option<&Path>, path: &str) -> PathBuf {
    let candidate = PathBuf::from(path);
    match base_dir {
        Some(dir) if !candidate.is_absolute() => dir.join(candidate),
        _ => candidate,
    }
}
602
603fn apply_finalize(
604    finalize: &FinalizeSpec,
605    output: JsonValue,
606    context: Option<&JsonValue>,
607) -> Result<JsonValue, TransformError> {
608    let mut records = match output {
609        JsonValue::Array(records) => records,
610        _ => {
611            return Err(TransformError::new(
612                TransformErrorKind::InvalidInput,
613                "finalize expects array output",
614            )
615            .with_path("finalize"));
616        }
617    };
618
619    if let Some(filter) = &finalize.filter {
620        let raw = expr_to_json_for_v2_condition(filter).ok_or_else(|| {
621            TransformError::new(
622                TransformErrorKind::ExprError,
623                "finalize.filter must be a v2 condition",
624            )
625            .with_path("finalize.filter")
626        })?;
627        let cond = parse_v2_condition(&raw).map_err(|err| {
628            TransformError::new(
629                TransformErrorKind::ExprError,
630                format!("invalid v2 condition: {}", err),
631            )
632            .with_path("finalize.filter")
633        })?;
634        let base_out = JsonValue::Array(records.clone());
635        let mut filtered = Vec::new();
636        for (index, item) in records.iter().enumerate() {
637            let ctx = V2EvalContext::new().with_item(V2EvalItem { value: item, index });
638            let keep = eval_v2_condition(&cond, item, context, &base_out, "finalize.filter", &ctx)?;
639            if keep {
640                filtered.push(item.clone());
641            }
642        }
643        records = filtered;
644    }
645
646    if let Some(sort) = &finalize.sort {
647        let tokens = parse_path(&sort.by).map_err(|_| {
648            TransformError::new(
649                TransformErrorKind::InvalidRecordsPath,
650                "finalize.sort.by is invalid",
651            )
652            .with_path("finalize.sort.by")
653        })?;
654
655        struct SortItem {
656            key: SortKey,
657            index: usize,
658            value: JsonValue,
659        }
660
661        let mut items = Vec::with_capacity(records.len());
662        for (index, item) in records.iter().enumerate() {
663            let key_value = get_path(item, &tokens).ok_or_else(|| {
664                TransformError::new(
665                    TransformErrorKind::InvalidRef,
666                    "finalize.sort.by path not found",
667                )
668                .with_path("finalize.sort.by")
669            })?;
670            let key = sort_key_from_value(key_value, "finalize.sort.by")?;
671            items.push(SortItem {
672                key,
673                index,
674                value: item.clone(),
675            });
676        }
677
678        items.sort_by(|left, right| {
679            let mut ordering = compare_sort_keys(&left.key, &right.key);
680            if sort.order == "desc" {
681                ordering = ordering.reverse();
682            }
683            if ordering == Ordering::Equal {
684                left.index.cmp(&right.index)
685            } else {
686                ordering
687            }
688        });
689
690        records = items.into_iter().map(|item| item.value).collect();
691    }
692
693    if let Some(offset) = finalize.offset {
694        if offset > 0 && offset < records.len() {
695            records = records.split_off(offset);
696        } else if offset >= records.len() {
697            records = Vec::new();
698        }
699    }
700
701    if let Some(limit) = finalize.limit {
702        if limit < records.len() {
703            records.truncate(limit);
704        }
705    }
706
707    let output = JsonValue::Array(records);
708    if let Some(wrap) = &finalize.wrap {
709        let wrapped = eval_wrap_value(wrap, &output, context, "finalize.wrap")?;
710        return Ok(wrapped);
711    }
712
713    Ok(output)
714}
715
716fn eval_wrap_value(
717    value: &JsonValue,
718    out: &JsonValue,
719    context: Option<&JsonValue>,
720    path: &str,
721) -> Result<JsonValue, TransformError> {
722    match value {
723        JsonValue::Object(map) => {
724            let mut out_map = serde_json::Map::new();
725            for (key, value) in map {
726                let child_path = format!("{}.{}", path, key);
727                out_map.insert(
728                    key.clone(),
729                    eval_wrap_value(value, out, context, &child_path)?,
730                );
731            }
732            Ok(JsonValue::Object(out_map))
733        }
734        _ => {
735            let expr = parse_v2_expr(value).map_err(|err| {
736                TransformError::new(
737                    TransformErrorKind::ExprError,
738                    format!("invalid v2 expr: {}", err),
739                )
740                .with_path(path)
741            })?;
742            let ctx = V2EvalContext::new();
743            match eval_v2_expr(&expr, out, context, out, path, &ctx)? {
744                V2EvalValue::Missing => Ok(JsonValue::Null),
745                V2EvalValue::Value(value) => Ok(value),
746            }
747        }
748    }
749}
750
751fn sort_key_from_value(value: &JsonValue, path: &str) -> Result<SortKey, TransformError> {
752    match value {
753        JsonValue::Number(number) => number.as_f64().map(SortKey::Number).ok_or_else(|| {
754            TransformError::new(
755                TransformErrorKind::ExprError,
756                "sort key must be a finite number",
757            )
758            .with_path(path)
759        }),
760        JsonValue::String(value) => Ok(SortKey::String(value.clone())),
761        JsonValue::Bool(value) => Ok(SortKey::Bool(*value)),
762        _ => Err(TransformError::new(
763            TransformErrorKind::ExprError,
764            "sort key must be string/number/bool",
765        )
766        .with_path(path)),
767    }
768}
769
770fn input_records_iter<'a>(
771    rule: &RuleFile,
772    input: &'a str,
773) -> Result<InputRecordsIter<'a>, TransformError> {
774    match rule.input.format {
775        InputFormat::Csv => Ok(InputRecordsIter::Csv(CsvRecordIter::new(rule, input)?)),
776        InputFormat::Json => Ok(InputRecordsIter::Json(JsonRecordIter::new(parse_json(
777            rule, input,
778        )?))),
779    }
780}
781
782enum InputRecordsIter<'a> {
783    Csv(CsvRecordIter<'a>),
784    Json(JsonRecordIter),
785}
786
787impl<'a> Iterator for InputRecordsIter<'a> {
788    type Item = Result<JsonValue, TransformError>;
789
790    fn next(&mut self) -> Option<Self::Item> {
791        match self {
792            InputRecordsIter::Csv(iter) => iter.next(),
793            InputRecordsIter::Json(iter) => iter.next(),
794        }
795    }
796}
797
798struct CsvRecordIter<'a> {
799    reader: csv::Reader<&'a [u8]>,
800    headers: Vec<String>,
801    done: bool,
802}
803
804impl<'a> CsvRecordIter<'a> {
805    fn new(rule: &RuleFile, input: &'a str) -> Result<Self, TransformError> {
806        let csv_spec = rule.input.csv.as_ref().ok_or_else(|| {
807            TransformError::new(
808                TransformErrorKind::InvalidInput,
809                "input.csv is required when format=csv",
810            )
811        })?;
812
813        let delimiter_chars: Vec<char> = csv_spec.delimiter.chars().collect();
814        if delimiter_chars.len() != 1 {
815            return Err(TransformError::new(
816                TransformErrorKind::InvalidInput,
817                "csv.delimiter must be a single character",
818            ));
819        }
820        let delimiter = delimiter_chars[0] as u8;
821
822        let mut reader = ReaderBuilder::new()
823            .delimiter(delimiter)
824            .has_headers(csv_spec.has_header)
825            .from_reader(input.as_bytes());
826
827        let headers: Vec<String> = if csv_spec.has_header {
828            let header_record = reader.headers().map_err(|err| {
829                TransformError::new(
830                    TransformErrorKind::InvalidInput,
831                    format!("failed to read csv header: {}", err),
832                )
833            })?;
834            header_record.iter().map(|s| s.to_string()).collect()
835        } else {
836            let columns = csv_spec.columns.as_ref().ok_or_else(|| {
837                TransformError::new(
838                    TransformErrorKind::InvalidInput,
839                    "csv.columns is required when has_header=false",
840                )
841            })?;
842            columns.iter().map(|col| col.name.clone()).collect()
843        };
844
845        Ok(Self {
846            reader,
847            headers,
848            done: false,
849        })
850    }
851}
852
853impl<'a> Iterator for CsvRecordIter<'a> {
854    type Item = Result<JsonValue, TransformError>;
855
856    fn next(&mut self) -> Option<Self::Item> {
857        if self.done {
858            return None;
859        }
860
861        let mut record = csv::StringRecord::new();
862        match self.reader.read_record(&mut record) {
863            Ok(has_data) => {
864                if !has_data {
865                    self.done = true;
866                    return None;
867                }
868                let obj = record_to_object(&self.headers, &record);
869                Some(Ok(JsonValue::Object(obj)))
870            }
871            Err(err) => {
872                self.done = true;
873                Some(Err(TransformError::new(
874                    TransformErrorKind::InvalidInput,
875                    format!("failed to read csv record: {}", err),
876                )))
877            }
878        }
879    }
880}
881
882struct JsonRecordIter {
883    iter: std::vec::IntoIter<JsonValue>,
884}
885
886impl JsonRecordIter {
887    fn new(records: Vec<JsonValue>) -> Self {
888        Self {
889            iter: records.into_iter(),
890        }
891    }
892}
893
894impl Iterator for JsonRecordIter {
895    type Item = Result<JsonValue, TransformError>;
896
897    fn next(&mut self) -> Option<Self::Item> {
898        self.iter.next().map(Ok)
899    }
900}
901
902fn parse_json(rule: &RuleFile, input: &str) -> Result<Vec<JsonValue>, TransformError> {
903    let value: JsonValue = serde_json::from_str(input).map_err(|err| {
904        TransformError::new(
905            TransformErrorKind::InvalidInput,
906            format!("failed to parse JSON input: {}", err),
907        )
908    })?;
909
910    let records_value = match rule
911        .input
912        .json
913        .as_ref()
914        .and_then(|j| j.records_path.as_deref())
915    {
916        Some(path) => {
917            let tokens = parse_path(path).map_err(|err| {
918                TransformError::new(TransformErrorKind::InvalidRecordsPath, err.message())
919                    .with_path("input.json.records_path")
920            })?;
921            let found = get_path(&value, &tokens).ok_or_else(|| {
922                TransformError::new(
923                    TransformErrorKind::InvalidRecordsPath,
924                    "records_path does not exist",
925                )
926                .with_path("input.json.records_path")
927            })?;
928            found
929        }
930        None => &value,
931    };
932
933    match records_value {
934        JsonValue::Array(items) => Ok(items.clone()),
935        JsonValue::Object(_) => Ok(vec![records_value.clone()]),
936        _ => Err(TransformError::new(
937            TransformErrorKind::InvalidInput,
938            "records_path must point to an array or object",
939        )),
940    }
941}
942
943fn record_to_object(headers: &[String], record: &csv::StringRecord) -> Map<String, JsonValue> {
944    let mut obj = Map::new();
945    for (index, name) in headers.iter().enumerate() {
946        if let Some(value) = record.get(index) {
947            obj.insert(name.clone(), JsonValue::String(value.to_string()));
948        }
949    }
950    obj
951}
952
953fn eval_mapping(
954    mapping: &crate::model::Mapping,
955    record: &JsonValue,
956    context: Option<&JsonValue>,
957    out: &JsonValue,
958    mapping_path: &str,
959    version: u8,
960) -> Result<Option<JsonValue>, TransformError> {
961    let value = if let Some(source) = &mapping.source {
962        resolve_source(source, record, context, out, mapping_path)?
963    } else if let Some(literal) = &mapping.value {
964        EvalValue::Value(literal.clone())
965    } else if let Some(expr) = &mapping.expr {
966        // Check if this is a v2 expression (version 2)
967        if version >= 2 {
968            let expr_path = format!("{}.expr", mapping_path);
969            // Try to interpret as v2 pipe
970            let v2_json = expr_to_json_for_v2_pipe(expr);
971            if let Some(json_val) = v2_json {
972                let v2_pipe = parse_v2_pipe_from_value(&json_val).map_err(|e| {
973                    TransformError::new(TransformErrorKind::ExprError, e.to_string())
974                        .with_path(&expr_path)
975                })?;
976                let v2_ctx = V2EvalContext::new();
977                let v2_result = eval_v2_pipe(&v2_pipe, record, context, out, &expr_path, &v2_ctx)?;
978                // Convert v2 EvalValue to v1 EvalValue
979                match v2_result {
980                    V2EvalValue::Missing => EvalValue::Missing,
981                    V2EvalValue::Value(v) => EvalValue::Value(v),
982                }
983            } else {
984                // v2 but not a v2 pipe - use v1 eval
985                eval_expr(expr, record, context, out, &expr_path, None)?
986            }
987        } else {
988            // v1 rule - use v1 eval
989            eval_expr(
990                expr,
991                record,
992                context,
993                out,
994                &format!("{}.expr", mapping_path),
995                None,
996            )?
997        }
998    } else {
999        return Err(TransformError::new(
1000            TransformErrorKind::InvalidInput,
1001            "mapping must define source, value, or expr",
1002        )
1003        .with_path(mapping_path));
1004    };
1005
1006    let mut value = match value {
1007        EvalValue::Missing => {
1008            if let Some(default) = &mapping.default {
1009                default.clone()
1010            } else if mapping.required {
1011                return Err(TransformError::new(
1012                    TransformErrorKind::MissingRequired,
1013                    "required value is missing",
1014                )
1015                .with_path(mapping_path));
1016            } else {
1017                return Ok(None);
1018            }
1019        }
1020        EvalValue::Value(value) => value,
1021    };
1022
1023    if value.is_null() {
1024        if mapping.required {
1025            return Err(TransformError::new(
1026                TransformErrorKind::MissingRequired,
1027                "required value is null",
1028            )
1029            .with_path(mapping_path));
1030        }
1031        return Ok(Some(value));
1032    }
1033
1034    if let Some(type_name) = &mapping.value_type {
1035        value = cast_value(&value, type_name, &format!("{}.type", mapping_path))?;
1036    }
1037
1038    Ok(Some(value))
1039}
1040
1041fn eval_when(
1042    mapping: &crate::model::Mapping,
1043    record: &JsonValue,
1044    context: Option<&JsonValue>,
1045    out: &JsonValue,
1046    mapping_path: &str,
1047    warnings: &mut Vec<TransformWarning>,
1048    rule_version: u8,
1049) -> bool {
1050    let expr = match &mapping.when {
1051        Some(expr) => expr,
1052        None => return true,
1053    };
1054
1055    let when_path = format!("{}.when", mapping_path);
1056    match eval_when_expr(expr, record, context, out, &when_path, rule_version) {
1057        Ok(flag) => flag,
1058        Err(err) => {
1059            warnings.push(err.into());
1060            false
1061        }
1062    }
1063}
1064
1065fn eval_record_when(
1066    rule: &RuleFile,
1067    record: &JsonValue,
1068    context: Option<&JsonValue>,
1069    warnings: &mut Vec<TransformWarning>,
1070) -> bool {
1071    let expr = match &rule.record_when {
1072        Some(expr) => expr,
1073        None => return true,
1074    };
1075
1076    let empty_out = JsonValue::Object(Map::new());
1077    match eval_when_expr(
1078        expr,
1079        record,
1080        context,
1081        &empty_out,
1082        "record_when",
1083        rule.version,
1084    ) {
1085        Ok(flag) => flag,
1086        Err(err) => {
1087            warnings.push(err.into());
1088            false
1089        }
1090    }
1091}
1092
1093fn eval_bool_expr(
1094    expr: &Expr,
1095    record: &JsonValue,
1096    context: Option<&JsonValue>,
1097    out: &JsonValue,
1098    path: &str,
1099) -> Result<bool, TransformError> {
1100    let value = eval_expr(expr, record, context, out, path, None)?;
1101    let value = match value {
1102        EvalValue::Missing => JsonValue::Null,
1103        EvalValue::Value(value) => value,
1104    };
1105    match value {
1106        JsonValue::Bool(flag) => Ok(flag),
1107        _ => Err(when_type_error(path)),
1108    }
1109}
1110
1111fn eval_when_expr(
1112    expr: &Expr,
1113    record: &JsonValue,
1114    context: Option<&JsonValue>,
1115    out: &JsonValue,
1116    path: &str,
1117    rule_version: u8,
1118) -> Result<bool, TransformError> {
1119    if rule_version >= 2 {
1120        if let Some(raw_value) = expr_to_json_for_v2_condition(expr) {
1121            let condition = parse_v2_condition(&raw_value).map_err(|err| {
1122                TransformError::new(
1123                    TransformErrorKind::ExprError,
1124                    format!("invalid v2 condition: {}", err),
1125                )
1126                .with_path(path)
1127            })?;
1128            let ctx = V2EvalContext::new();
1129            return eval_v2_condition(&condition, record, context, out, path, &ctx);
1130        }
1131    }
1132
1133    eval_bool_expr(expr, record, context, out, path)
1134}
1135
1136fn when_type_error(path: &str) -> TransformError {
1137    TransformError::new(
1138        TransformErrorKind::ExprError,
1139        "when/record_when must evaluate to boolean",
1140    )
1141    .with_path(path)
1142}
1143
1144fn resolve_source(
1145    source: &str,
1146    record: &JsonValue,
1147    context: Option<&JsonValue>,
1148    out: &JsonValue,
1149    mapping_path: &str,
1150) -> Result<EvalValue, TransformError> {
1151    let (namespace, path) =
1152        parse_source(source).map_err(|err| err.with_path(format!("{}.source", mapping_path)))?;
1153    let tokens = parse_path_tokens(
1154        path,
1155        TransformErrorKind::InvalidRef,
1156        format!("{}.source", mapping_path),
1157    )?;
1158    let target = match namespace {
1159        Namespace::Input => Some(record),
1160        Namespace::Context => context,
1161        Namespace::Out => Some(out),
1162        Namespace::Item | Namespace::Acc | Namespace::Pipe | Namespace::Local => {
1163            return Err(TransformError::new(
1164                TransformErrorKind::InvalidRef,
1165                "ref namespace must be input|context|out",
1166            )
1167            .with_path(format!("{}.source", mapping_path)));
1168        }
1169    };
1170
1171    match target.and_then(|value| get_path(value, &tokens)) {
1172        Some(value) => Ok(EvalValue::Value(value.clone())),
1173        None => Ok(EvalValue::Missing),
1174    }
1175}
1176
1177fn eval_expr(
1178    expr: &Expr,
1179    record: &JsonValue,
1180    context: Option<&JsonValue>,
1181    out: &JsonValue,
1182    base_path: &str,
1183    locals: Option<&EvalLocals<'_>>,
1184) -> Result<EvalValue, TransformError> {
1185    match expr {
1186        Expr::Literal(value) => Ok(EvalValue::Value(value.clone())),
1187        Expr::Ref(expr_ref) => eval_ref(expr_ref, record, context, out, base_path, locals),
1188        Expr::Op(expr_op) => eval_op(expr_op, record, context, out, base_path, None, locals),
1189        Expr::Chain(expr_chain) => eval_chain(expr_chain, record, context, out, base_path, locals),
1190    }
1191}
1192
1193fn eval_chain(
1194    expr_chain: &ExprChain,
1195    record: &JsonValue,
1196    context: Option<&JsonValue>,
1197    out: &JsonValue,
1198    base_path: &str,
1199    locals: Option<&EvalLocals<'_>>,
1200) -> Result<EvalValue, TransformError> {
1201    if expr_chain.chain.is_empty() {
1202        return Err(TransformError::new(
1203            TransformErrorKind::ExprError,
1204            "expr.chain must be a non-empty array",
1205        )
1206        .with_path(format!("{}.chain", base_path)));
1207    }
1208
1209    let first_path = format!("{}.chain[0]", base_path);
1210    let mut current = eval_expr(
1211        &expr_chain.chain[0],
1212        record,
1213        context,
1214        out,
1215        &first_path,
1216        locals,
1217    )?;
1218
1219    for (index, step) in expr_chain.chain.iter().enumerate().skip(1) {
1220        let step_path = format!("{}.chain[{}]", base_path, index);
1221        let expr_op = match step {
1222            Expr::Op(expr_op) => expr_op,
1223            _ => {
1224                return Err(TransformError::new(
1225                    TransformErrorKind::ExprError,
1226                    "expr.chain items after first must be op",
1227                )
1228                .with_path(step_path));
1229            }
1230        };
1231
1232        let injected = current.clone();
1233        current = eval_op(
1234            expr_op,
1235            record,
1236            context,
1237            out,
1238            &step_path,
1239            Some(&injected),
1240            locals,
1241        )?;
1242    }
1243
1244    Ok(current)
1245}
1246
1247fn eval_ref(
1248    expr_ref: &ExprRef,
1249    record: &JsonValue,
1250    context: Option<&JsonValue>,
1251    out: &JsonValue,
1252    base_path: &str,
1253    locals: Option<&EvalLocals<'_>>,
1254) -> Result<EvalValue, TransformError> {
1255    let (namespace, path) =
1256        parse_ref(&expr_ref.ref_path).map_err(|err| err.with_path(base_path))?;
1257    let tokens = parse_path_tokens(path, TransformErrorKind::InvalidRef, base_path.to_string())?;
1258    let target = match namespace {
1259        Namespace::Input => Some(record),
1260        Namespace::Context => context,
1261        Namespace::Out => Some(out),
1262        Namespace::Item => {
1263            let item = locals.and_then(|locals| locals.item).ok_or_else(|| {
1264                TransformError::new(
1265                    TransformErrorKind::ExprError,
1266                    "item is only available within array ops",
1267                )
1268                .with_path(base_path)
1269            })?;
1270            let (root, rest) = match tokens.split_first() {
1271                Some((PathToken::Key(key), rest)) if key == "value" => (item.value, rest),
1272                Some((PathToken::Key(key), rest)) if key == "index" => {
1273                    if !rest.is_empty() {
1274                        return Ok(EvalValue::Missing);
1275                    }
1276                    let value = JsonValue::Number(serde_json::Number::from(item.index as u64));
1277                    return Ok(EvalValue::Value(value));
1278                }
1279                _ => {
1280                    return Err(TransformError::new(
1281                        TransformErrorKind::ExprError,
1282                        "item ref must start with value or index",
1283                    )
1284                    .with_path(base_path));
1285                }
1286            };
1287            return match get_path(root, rest) {
1288                Some(value) => Ok(EvalValue::Value(value.clone())),
1289                None => Ok(EvalValue::Missing),
1290            };
1291        }
1292        Namespace::Acc => {
1293            let acc = locals.and_then(|locals| locals.acc).ok_or_else(|| {
1294                TransformError::new(
1295                    TransformErrorKind::ExprError,
1296                    "acc is only available within reduce/fold ops",
1297                )
1298                .with_path(base_path)
1299            })?;
1300            let (root, rest) = match tokens.split_first() {
1301                Some((PathToken::Key(key), rest)) if key == "value" => (acc, rest),
1302                _ => {
1303                    return Err(TransformError::new(
1304                        TransformErrorKind::ExprError,
1305                        "acc ref must start with value",
1306                    )
1307                    .with_path(base_path));
1308                }
1309            };
1310            return match get_path(root, rest) {
1311                Some(value) => Ok(EvalValue::Value(value.clone())),
1312                None => Ok(EvalValue::Missing),
1313            };
1314        }
1315        Namespace::Pipe => {
1316            let pipe_value = locals.and_then(|locals| locals.pipe).ok_or_else(|| {
1317                TransformError::new(
1318                    TransformErrorKind::ExprError,
1319                    "pipe is only available within v2 pipes",
1320                )
1321                .with_path(base_path)
1322            })?;
1323            let (root, rest) = match tokens.split_first() {
1324                Some((PathToken::Key(key), rest)) if key == "value" => (pipe_value, rest),
1325                _ => {
1326                    return Err(TransformError::new(
1327                        TransformErrorKind::ExprError,
1328                        "pipe ref must start with value",
1329                    )
1330                    .with_path(base_path));
1331                }
1332            };
1333            let value = match root {
1334                EvalValue::Missing => return Ok(EvalValue::Missing),
1335                EvalValue::Value(value) => value,
1336            };
1337            return match get_path(value, rest) {
1338                Some(value) => Ok(EvalValue::Value(value.clone())),
1339                None => Ok(EvalValue::Missing),
1340            };
1341        }
1342        Namespace::Local => {
1343            let locals_map = locals.and_then(|locals| locals.locals).ok_or_else(|| {
1344                TransformError::new(
1345                    TransformErrorKind::ExprError,
1346                    "local is only available within v2 pipes",
1347                )
1348                .with_path(base_path)
1349            })?;
1350            let (first, rest) = match tokens.split_first() {
1351                Some((PathToken::Key(key), rest)) => (key, rest),
1352                _ => {
1353                    return Err(TransformError::new(
1354                        TransformErrorKind::ExprError,
1355                        "local ref must start with a key",
1356                    )
1357                    .with_path(base_path));
1358                }
1359            };
1360            let local_value = locals_map.get(first).ok_or_else(|| {
1361                TransformError::new(
1362                    TransformErrorKind::ExprError,
1363                    format!("undefined local: {}", first),
1364                )
1365                .with_path(base_path)
1366            })?;
1367            let value = match local_value {
1368                EvalValue::Missing => return Ok(EvalValue::Missing),
1369                EvalValue::Value(value) => value,
1370            };
1371            return match get_path(value, rest) {
1372                Some(value) => Ok(EvalValue::Value(value.clone())),
1373                None => Ok(EvalValue::Missing),
1374            };
1375        }
1376    };
1377
1378    match target.and_then(|value| get_path(value, &tokens)) {
1379        Some(value) => Ok(EvalValue::Value(value.clone())),
1380        None => Ok(EvalValue::Missing),
1381    }
1382}
1383
1384pub(crate) fn eval_op(
1385    expr_op: &ExprOp,
1386    record: &JsonValue,
1387    context: Option<&JsonValue>,
1388    out: &JsonValue,
1389    base_path: &str,
1390    injected: Option<&EvalValue>,
1391    locals: Option<&EvalLocals<'_>>,
1392) -> Result<EvalValue, TransformError> {
1393    let total_len = args_len(&expr_op.args, injected);
1394    if total_len == 0 {
1395        return Err(TransformError::new(
1396            TransformErrorKind::ExprError,
1397            "expr.args must be a non-empty array",
1398        )
1399        .with_path(format!("{}.args", base_path)));
1400    }
1401
1402    match expr_op.op.as_str() {
1403        "concat" => {
1404            let mut parts = Vec::new();
1405            for index in 0..total_len {
1406                let arg_path = format!("{}.args[{}]", base_path, index);
1407                let value = eval_expr_at_index(
1408                    index,
1409                    &expr_op.args,
1410                    injected,
1411                    record,
1412                    context,
1413                    out,
1414                    base_path,
1415                    locals,
1416                )?;
1417                match value {
1418                    EvalValue::Missing => return Ok(EvalValue::Missing),
1419                    EvalValue::Value(value) => {
1420                        if value.is_null() {
1421                            return Err(TransformError::new(
1422                                TransformErrorKind::ExprError,
1423                                "concat does not accept null",
1424                            )
1425                            .with_path(arg_path));
1426                        }
1427                        let part = value_to_string(&value, &arg_path)?;
1428                        parts.push(part);
1429                    }
1430                }
1431            }
1432            Ok(EvalValue::Value(JsonValue::String(parts.join(""))))
1433        }
1434        "coalesce" => {
1435            for index in 0..total_len {
1436                let value = eval_expr_at_index(
1437                    index,
1438                    &expr_op.args,
1439                    injected,
1440                    record,
1441                    context,
1442                    out,
1443                    base_path,
1444                    locals,
1445                )?;
1446                match value {
1447                    EvalValue::Missing => continue,
1448                    EvalValue::Value(value) => {
1449                        if value.is_null() {
1450                            continue;
1451                        }
1452                        return Ok(EvalValue::Value(value));
1453                    }
1454                }
1455            }
1456            Ok(EvalValue::Missing)
1457        }
1458        "to_string" => eval_unary_string_op(
1459            &expr_op.args,
1460            injected,
1461            record,
1462            context,
1463            out,
1464            base_path,
1465            locals,
1466            |value, path| value_to_string(value, path).map(JsonValue::String),
1467        ),
1468        "trim" => eval_unary_string_op(
1469            &expr_op.args,
1470            injected,
1471            record,
1472            context,
1473            out,
1474            base_path,
1475            locals,
1476            |value, path| {
1477                let s = value_as_string(value, path)?;
1478                Ok(JsonValue::String(s.trim().to_string()))
1479            },
1480        ),
1481        "lowercase" => eval_unary_string_op(
1482            &expr_op.args,
1483            injected,
1484            record,
1485            context,
1486            out,
1487            base_path,
1488            locals,
1489            |value, path| {
1490                let s = value_as_string(value, path)?;
1491                Ok(JsonValue::String(s.to_lowercase()))
1492            },
1493        ),
1494        "uppercase" => eval_unary_string_op(
1495            &expr_op.args,
1496            injected,
1497            record,
1498            context,
1499            out,
1500            base_path,
1501            locals,
1502            |value, path| {
1503                let s = value_as_string(value, path)?;
1504                Ok(JsonValue::String(s.to_uppercase()))
1505            },
1506        ),
1507        "replace" => eval_replace(
1508            &expr_op.args,
1509            injected,
1510            record,
1511            context,
1512            out,
1513            base_path,
1514            locals,
1515        ),
1516        "split" => eval_split(
1517            &expr_op.args,
1518            injected,
1519            record,
1520            context,
1521            out,
1522            base_path,
1523            locals,
1524        ),
1525        "pad_start" => eval_pad(
1526            &expr_op.args,
1527            injected,
1528            record,
1529            context,
1530            out,
1531            base_path,
1532            true,
1533            locals,
1534        ),
1535        "pad_end" => eval_pad(
1536            &expr_op.args,
1537            injected,
1538            record,
1539            context,
1540            out,
1541            base_path,
1542            false,
1543            locals,
1544        ),
1545        "lookup" => eval_lookup(
1546            &expr_op.args,
1547            injected,
1548            record,
1549            context,
1550            out,
1551            base_path,
1552            false,
1553            locals,
1554        ),
1555        "lookup_first" => eval_lookup(
1556            &expr_op.args,
1557            injected,
1558            record,
1559            context,
1560            out,
1561            base_path,
1562            true,
1563            locals,
1564        ),
1565        "merge" => eval_json_merge(
1566            &expr_op.args,
1567            injected,
1568            record,
1569            context,
1570            out,
1571            base_path,
1572            false,
1573            locals,
1574        ),
1575        "deep_merge" => eval_json_merge(
1576            &expr_op.args,
1577            injected,
1578            record,
1579            context,
1580            out,
1581            base_path,
1582            true,
1583            locals,
1584        ),
1585        "get" => eval_json_get(
1586            &expr_op.args,
1587            injected,
1588            record,
1589            context,
1590            out,
1591            base_path,
1592            locals,
1593        ),
1594        "pick" => eval_json_pick(
1595            &expr_op.args,
1596            injected,
1597            record,
1598            context,
1599            out,
1600            base_path,
1601            locals,
1602        ),
1603        "omit" => eval_json_omit(
1604            &expr_op.args,
1605            injected,
1606            record,
1607            context,
1608            out,
1609            base_path,
1610            locals,
1611        ),
1612        "keys" => eval_json_keys(
1613            &expr_op.args,
1614            injected,
1615            record,
1616            context,
1617            out,
1618            base_path,
1619            locals,
1620        ),
1621        "values" => eval_json_values(
1622            &expr_op.args,
1623            injected,
1624            record,
1625            context,
1626            out,
1627            base_path,
1628            locals,
1629        ),
1630        "entries" => eval_json_entries(
1631            &expr_op.args,
1632            injected,
1633            record,
1634            context,
1635            out,
1636            base_path,
1637            locals,
1638        ),
1639        "len" => eval_len(
1640            &expr_op.args,
1641            injected,
1642            record,
1643            context,
1644            out,
1645            base_path,
1646            locals,
1647        ),
1648        "from_entries" => eval_json_from_entries(
1649            &expr_op.args,
1650            injected,
1651            record,
1652            context,
1653            out,
1654            base_path,
1655            locals,
1656        ),
1657        "object_flatten" => eval_json_object_flatten(
1658            &expr_op.args,
1659            injected,
1660            record,
1661            context,
1662            out,
1663            base_path,
1664            locals,
1665        ),
1666        "object_unflatten" => eval_json_object_unflatten(
1667            &expr_op.args,
1668            injected,
1669            record,
1670            context,
1671            out,
1672            base_path,
1673            locals,
1674        ),
1675        "map" => eval_array_map(
1676            &expr_op.args,
1677            injected,
1678            record,
1679            context,
1680            out,
1681            base_path,
1682            locals,
1683        ),
1684        "filter" => eval_array_filter(
1685            &expr_op.args,
1686            injected,
1687            record,
1688            context,
1689            out,
1690            base_path,
1691            locals,
1692        ),
1693        "flat_map" => eval_array_flat_map(
1694            &expr_op.args,
1695            injected,
1696            record,
1697            context,
1698            out,
1699            base_path,
1700            locals,
1701        ),
1702        "flatten" => eval_array_flatten(
1703            &expr_op.args,
1704            injected,
1705            record,
1706            context,
1707            out,
1708            base_path,
1709            locals,
1710        ),
1711        "take" => eval_array_take(
1712            &expr_op.args,
1713            injected,
1714            record,
1715            context,
1716            out,
1717            base_path,
1718            locals,
1719        ),
1720        "drop" => eval_array_drop(
1721            &expr_op.args,
1722            injected,
1723            record,
1724            context,
1725            out,
1726            base_path,
1727            locals,
1728        ),
1729        "slice" => eval_array_slice(
1730            &expr_op.args,
1731            injected,
1732            record,
1733            context,
1734            out,
1735            base_path,
1736            locals,
1737        ),
1738        "chunk" => eval_array_chunk(
1739            &expr_op.args,
1740            injected,
1741            record,
1742            context,
1743            out,
1744            base_path,
1745            locals,
1746        ),
1747        "zip" => eval_array_zip(
1748            &expr_op.args,
1749            injected,
1750            record,
1751            context,
1752            out,
1753            base_path,
1754            locals,
1755        ),
1756        "zip_with" => eval_array_zip_with(
1757            &expr_op.args,
1758            injected,
1759            record,
1760            context,
1761            out,
1762            base_path,
1763            locals,
1764        ),
1765        "unzip" => eval_array_unzip(
1766            &expr_op.args,
1767            injected,
1768            record,
1769            context,
1770            out,
1771            base_path,
1772            locals,
1773        ),
1774        "group_by" => eval_array_group_by(
1775            &expr_op.args,
1776            injected,
1777            record,
1778            context,
1779            out,
1780            base_path,
1781            locals,
1782        ),
1783        "key_by" => eval_array_key_by(
1784            &expr_op.args,
1785            injected,
1786            record,
1787            context,
1788            out,
1789            base_path,
1790            locals,
1791        ),
1792        "partition" => eval_array_partition(
1793            &expr_op.args,
1794            injected,
1795            record,
1796            context,
1797            out,
1798            base_path,
1799            locals,
1800        ),
1801        "unique" => eval_array_unique(
1802            &expr_op.args,
1803            injected,
1804            record,
1805            context,
1806            out,
1807            base_path,
1808            locals,
1809        ),
1810        "distinct_by" => eval_array_distinct_by(
1811            &expr_op.args,
1812            injected,
1813            record,
1814            context,
1815            out,
1816            base_path,
1817            locals,
1818        ),
1819        "sort_by" => eval_array_sort_by(
1820            &expr_op.args,
1821            injected,
1822            record,
1823            context,
1824            out,
1825            base_path,
1826            locals,
1827        ),
1828        "find" => eval_array_find(
1829            &expr_op.args,
1830            injected,
1831            record,
1832            context,
1833            out,
1834            base_path,
1835            locals,
1836        ),
1837        "find_index" => eval_array_find_index(
1838            &expr_op.args,
1839            injected,
1840            record,
1841            context,
1842            out,
1843            base_path,
1844            locals,
1845        ),
1846        "index_of" => eval_array_index_of(
1847            &expr_op.args,
1848            injected,
1849            record,
1850            context,
1851            out,
1852            base_path,
1853            locals,
1854        ),
1855        "contains" => eval_array_contains(
1856            &expr_op.args,
1857            injected,
1858            record,
1859            context,
1860            out,
1861            base_path,
1862            locals,
1863        ),
1864        "sum" => eval_array_sum(
1865            &expr_op.args,
1866            injected,
1867            record,
1868            context,
1869            out,
1870            base_path,
1871            locals,
1872        ),
1873        "avg" => eval_array_avg(
1874            &expr_op.args,
1875            injected,
1876            record,
1877            context,
1878            out,
1879            base_path,
1880            locals,
1881        ),
1882        "min" => eval_array_min(
1883            &expr_op.args,
1884            injected,
1885            record,
1886            context,
1887            out,
1888            base_path,
1889            locals,
1890        ),
1891        "max" => eval_array_max(
1892            &expr_op.args,
1893            injected,
1894            record,
1895            context,
1896            out,
1897            base_path,
1898            locals,
1899        ),
1900        "reduce" => eval_array_reduce(
1901            &expr_op.args,
1902            injected,
1903            record,
1904            context,
1905            out,
1906            base_path,
1907            locals,
1908        ),
1909        "fold" => eval_array_fold(
1910            &expr_op.args,
1911            injected,
1912            record,
1913            context,
1914            out,
1915            base_path,
1916            locals,
1917        ),
1918        "+" | "-" | "*" | "/" => {
1919            eval_numeric_op(expr_op, injected, record, context, out, base_path, locals)
1920        }
1921        "round" => eval_round(
1922            &expr_op.args,
1923            injected,
1924            record,
1925            context,
1926            out,
1927            base_path,
1928            locals,
1929        ),
1930        "to_base" => eval_to_base(
1931            &expr_op.args,
1932            injected,
1933            record,
1934            context,
1935            out,
1936            base_path,
1937            locals,
1938        ),
1939        "date_format" => eval_date_format(
1940            &expr_op.args,
1941            injected,
1942            record,
1943            context,
1944            out,
1945            base_path,
1946            locals,
1947        ),
1948        "to_unixtime" => eval_to_unixtime(
1949            &expr_op.args,
1950            injected,
1951            record,
1952            context,
1953            out,
1954            base_path,
1955            locals,
1956        ),
1957        "and" => eval_bool_and_or(
1958            &expr_op.args,
1959            injected,
1960            record,
1961            context,
1962            out,
1963            base_path,
1964            true,
1965            locals,
1966        ),
1967        "or" => eval_bool_and_or(
1968            &expr_op.args,
1969            injected,
1970            record,
1971            context,
1972            out,
1973            base_path,
1974            false,
1975            locals,
1976        ),
1977        "not" => eval_bool_not(
1978            &expr_op.args,
1979            injected,
1980            record,
1981            context,
1982            out,
1983            base_path,
1984            locals,
1985        ),
1986        "==" | "!=" | "<" | "<=" | ">" | ">=" | "~=" => {
1987            eval_compare(expr_op, injected, record, context, out, base_path, locals)
1988        }
1989        _ => Err(
1990            TransformError::new(TransformErrorKind::ExprError, "expr.op is not supported")
1991                .with_path(format!("{}.op", base_path)),
1992        ),
1993    }
1994}
1995
1996fn eval_unary_string_op<F>(
1997    args: &[Expr],
1998    injected: Option<&EvalValue>,
1999    record: &JsonValue,
2000    context: Option<&JsonValue>,
2001    out: &JsonValue,
2002    base_path: &str,
2003    locals: Option<&EvalLocals<'_>>,
2004    op: F,
2005) -> Result<EvalValue, TransformError>
2006where
2007    F: FnOnce(&JsonValue, &str) -> Result<JsonValue, TransformError>,
2008{
2009    let total_len = args_len(args, injected);
2010    if total_len != 1 {
2011        return Err(TransformError::new(
2012            TransformErrorKind::ExprError,
2013            "expr.args must contain exactly one item",
2014        )
2015        .with_path(format!("{}.args", base_path)));
2016    }
2017
2018    let arg_path = format!("{}.args[0]", base_path);
2019    let value = eval_expr_at_index(0, args, injected, record, context, out, base_path, locals)?;
2020    match value {
2021        EvalValue::Missing => Ok(EvalValue::Missing),
2022        EvalValue::Value(value) => {
2023            if value.is_null() {
2024                return Err(TransformError::new(
2025                    TransformErrorKind::ExprError,
2026                    "expr arg must not be null",
2027                )
2028                .with_path(arg_path));
2029            }
2030            op(&value, &arg_path).map(EvalValue::Value)
2031        }
2032    }
2033}
2034
2035fn args_len(args: &[Expr], injected: Option<&EvalValue>) -> usize {
2036    args.len() + usize::from(injected.is_some())
2037}
2038
2039fn arg_expr_at<'a>(
2040    index: usize,
2041    args: &'a [Expr],
2042    injected: Option<&EvalValue>,
2043) -> Option<&'a Expr> {
2044    if injected.is_some() {
2045        if index == 0 {
2046            None
2047        } else {
2048            args.get(index - 1)
2049        }
2050    } else {
2051        args.get(index)
2052    }
2053}
2054
2055fn eval_expr_at_index(
2056    index: usize,
2057    args: &[Expr],
2058    injected: Option<&EvalValue>,
2059    record: &JsonValue,
2060    context: Option<&JsonValue>,
2061    out: &JsonValue,
2062    base_path: &str,
2063    locals: Option<&EvalLocals<'_>>,
2064) -> Result<EvalValue, TransformError> {
2065    if let Some(injected) = injected {
2066        if index == 0 {
2067            return Ok(injected.clone());
2068        }
2069        let arg = args.get(index - 1).ok_or_else(|| {
2070            TransformError::new(
2071                TransformErrorKind::ExprError,
2072                "expr.args index is out of bounds",
2073            )
2074            .with_path(format!("{}.args[{}]", base_path, index))
2075        })?;
2076        let arg_path = format!("{}.args[{}]", base_path, index);
2077        return eval_expr(arg, record, context, out, &arg_path, locals);
2078    }
2079
2080    let arg = args.get(index).ok_or_else(|| {
2081        TransformError::new(
2082            TransformErrorKind::ExprError,
2083            "expr.args index is out of bounds",
2084        )
2085        .with_path(format!("{}.args[{}]", base_path, index))
2086    })?;
2087    let arg_path = format!("{}.args[{}]", base_path, index);
2088    eval_expr(arg, record, context, out, &arg_path, locals)
2089}
2090
2091fn eval_arg_value_at(
2092    index: usize,
2093    args: &[Expr],
2094    injected: Option<&EvalValue>,
2095    record: &JsonValue,
2096    context: Option<&JsonValue>,
2097    out: &JsonValue,
2098    base_path: &str,
2099    locals: Option<&EvalLocals<'_>>,
2100) -> Result<Option<JsonValue>, TransformError> {
2101    match eval_expr_at_index(
2102        index, args, injected, record, context, out, base_path, locals,
2103    )? {
2104        EvalValue::Missing => Ok(None),
2105        EvalValue::Value(value) => Ok(Some(value)),
2106    }
2107}
2108
2109fn eval_arg_string_at(
2110    index: usize,
2111    args: &[Expr],
2112    injected: Option<&EvalValue>,
2113    record: &JsonValue,
2114    context: Option<&JsonValue>,
2115    out: &JsonValue,
2116    base_path: &str,
2117    locals: Option<&EvalLocals<'_>>,
2118) -> Result<Option<String>, TransformError> {
2119    let value = match eval_arg_value_at(
2120        index, args, injected, record, context, out, base_path, locals,
2121    )? {
2122        None => return Ok(None),
2123        Some(value) => value,
2124    };
2125    let arg_path = format!("{}.args[{}]", base_path, index);
2126    if value.is_null() {
2127        return Err(TransformError::new(
2128            TransformErrorKind::ExprError,
2129            "expr arg must not be null",
2130        )
2131        .with_path(arg_path));
2132    }
2133    value_as_string(&value, &arg_path).map(Some)
2134}
2135
2136fn eval_expr_value_or_null_at(
2137    index: usize,
2138    args: &[Expr],
2139    injected: Option<&EvalValue>,
2140    record: &JsonValue,
2141    context: Option<&JsonValue>,
2142    out: &JsonValue,
2143    base_path: &str,
2144    locals: Option<&EvalLocals<'_>>,
2145) -> Result<JsonValue, TransformError> {
2146    match eval_expr_at_index(
2147        index, args, injected, record, context, out, base_path, locals,
2148    )? {
2149        EvalValue::Missing => Ok(JsonValue::Null),
2150        EvalValue::Value(value) => Ok(value),
2151    }
2152}
2153
/// Strategy used by `eval_replace` to substitute matches of its pattern.
#[derive(Clone, Copy)]
enum ReplaceMode {
    /// Pattern is literal text; only the first occurrence is replaced.
    /// `eval_replace` uses this when no mode argument is supplied.
    LiteralFirst,
    /// Pattern is literal text; every occurrence is replaced (mode `"all"`).
    LiteralAll,
    /// Pattern is a regular expression; only the first match is replaced
    /// (mode `"regex"`).
    RegexFirst,
    /// Pattern is a regular expression; every match is replaced
    /// (mode `"regex_all"`).
    RegexAll,
}
2161
2162fn parse_replace_mode(value: &str, path: &str) -> Result<ReplaceMode, TransformError> {
2163    match value {
2164        "all" => Ok(ReplaceMode::LiteralAll),
2165        "regex" => Ok(ReplaceMode::RegexFirst),
2166        "regex_all" => Ok(ReplaceMode::RegexAll),
2167        _ => Err(TransformError::new(
2168            TransformErrorKind::ExprError,
2169            "replace mode must be all|regex|regex_all",
2170        )
2171        .with_path(path)),
2172    }
2173}
2174
2175fn eval_replace(
2176    args: &[Expr],
2177    injected: Option<&EvalValue>,
2178    record: &JsonValue,
2179    context: Option<&JsonValue>,
2180    out: &JsonValue,
2181    base_path: &str,
2182    locals: Option<&EvalLocals<'_>>,
2183) -> Result<EvalValue, TransformError> {
2184    let total_len = args_len(args, injected);
2185    if !(3..=4).contains(&total_len) {
2186        return Err(TransformError::new(
2187            TransformErrorKind::ExprError,
2188            "expr.args must contain three or four items",
2189        )
2190        .with_path(format!("{}.args", base_path)));
2191    }
2192
2193    let value =
2194        match eval_arg_string_at(0, args, injected, record, context, out, base_path, locals)? {
2195            None => return Ok(EvalValue::Missing),
2196            Some(value) => value,
2197        };
2198    let pattern =
2199        match eval_arg_string_at(1, args, injected, record, context, out, base_path, locals)? {
2200            None => return Ok(EvalValue::Missing),
2201            Some(value) => value,
2202        };
2203    let replacement =
2204        match eval_arg_string_at(2, args, injected, record, context, out, base_path, locals)? {
2205            None => return Ok(EvalValue::Missing),
2206            Some(value) => value,
2207        };
2208    let pattern_path = format!("{}.args[1]", base_path);
2209
2210    let mode = if total_len == 4 {
2211        let mode_path = format!("{}.args[3]", base_path);
2212        let mode_value =
2213            match eval_arg_string_at(3, args, injected, record, context, out, base_path, locals)? {
2214                None => return Ok(EvalValue::Missing),
2215                Some(value) => value,
2216            };
2217        parse_replace_mode(&mode_value, &mode_path)?
2218    } else {
2219        ReplaceMode::LiteralFirst
2220    };
2221
2222    let replaced = match mode {
2223        ReplaceMode::LiteralFirst => value.replacen(&pattern, &replacement, 1),
2224        ReplaceMode::LiteralAll => value.replace(&pattern, &replacement),
2225        ReplaceMode::RegexFirst => {
2226            let regex = cached_regex(&pattern, &pattern_path)?;
2227            regex.replace(&value, replacement.as_str()).to_string()
2228        }
2229        ReplaceMode::RegexAll => {
2230            let regex = cached_regex(&pattern, &pattern_path)?;
2231            regex.replace_all(&value, replacement.as_str()).to_string()
2232        }
2233    };
2234
2235    Ok(EvalValue::Value(JsonValue::String(replaced)))
2236}
2237
2238fn eval_split(
2239    args: &[Expr],
2240    injected: Option<&EvalValue>,
2241    record: &JsonValue,
2242    context: Option<&JsonValue>,
2243    out: &JsonValue,
2244    base_path: &str,
2245    locals: Option<&EvalLocals<'_>>,
2246) -> Result<EvalValue, TransformError> {
2247    let total_len = args_len(args, injected);
2248    if total_len != 2 {
2249        return Err(TransformError::new(
2250            TransformErrorKind::ExprError,
2251            "expr.args must contain exactly two items",
2252        )
2253        .with_path(format!("{}.args", base_path)));
2254    }
2255
2256    let value =
2257        match eval_arg_string_at(0, args, injected, record, context, out, base_path, locals)? {
2258            None => return Ok(EvalValue::Missing),
2259            Some(value) => value,
2260        };
2261    let delimiter =
2262        match eval_arg_string_at(1, args, injected, record, context, out, base_path, locals)? {
2263            None => return Ok(EvalValue::Missing),
2264            Some(value) => value,
2265        };
2266    let delimiter_path = format!("{}.args[1]", base_path);
2267
2268    if delimiter.is_empty() {
2269        return Err(TransformError::new(
2270            TransformErrorKind::ExprError,
2271            "split delimiter must not be empty",
2272        )
2273        .with_path(delimiter_path));
2274    }
2275
2276    let parts = value
2277        .split(&delimiter)
2278        .map(|part| JsonValue::String(part.to_string()))
2279        .collect::<Vec<_>>();
2280
2281    Ok(EvalValue::Value(JsonValue::Array(parts)))
2282}
2283
2284fn eval_pad(
2285    args: &[Expr],
2286    injected: Option<&EvalValue>,
2287    record: &JsonValue,
2288    context: Option<&JsonValue>,
2289    out: &JsonValue,
2290    base_path: &str,
2291    pad_start: bool,
2292    locals: Option<&EvalLocals<'_>>,
2293) -> Result<EvalValue, TransformError> {
2294    let total_len = args_len(args, injected);
2295    if !(2..=3).contains(&total_len) {
2296        return Err(TransformError::new(
2297            TransformErrorKind::ExprError,
2298            "expr.args must contain two or three items",
2299        )
2300        .with_path(format!("{}.args", base_path)));
2301    }
2302
2303    let value =
2304        match eval_arg_string_at(0, args, injected, record, context, out, base_path, locals)? {
2305            None => return Ok(EvalValue::Missing),
2306            Some(value) => value,
2307        };
2308
2309    let length_value =
2310        match eval_arg_value_at(1, args, injected, record, context, out, base_path, locals)? {
2311            None => return Ok(EvalValue::Missing),
2312            Some(value) => value,
2313        };
2314    let length_path = format!("{}.args[1]", base_path);
2315    if length_value.is_null() {
2316        return Err(TransformError::new(
2317            TransformErrorKind::ExprError,
2318            "expr arg must not be null",
2319        )
2320        .with_path(length_path));
2321    }
2322    let length = value_to_i64(
2323        &length_value,
2324        &length_path,
2325        "pad length must be a non-negative integer",
2326    )?;
2327    if length < 0 {
2328        return Err(TransformError::new(
2329            TransformErrorKind::ExprError,
2330            "pad length must be a non-negative integer",
2331        )
2332        .with_path(length_path));
2333    }
2334
2335    let pad_string = if total_len == 3 {
2336        match eval_arg_string_at(2, args, injected, record, context, out, base_path, locals)? {
2337            None => return Ok(EvalValue::Missing),
2338            Some(value) => value,
2339        }
2340    } else {
2341        " ".to_string()
2342    };
2343
2344    let target_len = usize::try_from(length).map_err(|_| {
2345        TransformError::new(
2346            TransformErrorKind::ExprError,
2347            "pad length must be a non-negative integer",
2348        )
2349        .with_path(length_path)
2350    })?;
2351
2352    let padded = pad_string_value(&value, target_len, &pad_string, pad_start);
2353    Ok(EvalValue::Value(JsonValue::String(padded)))
2354}
2355
/// Pads `value` with repetitions of `pad` until it is `target_len` characters
/// long.
///
/// Lengths are counted in `char`s, not bytes. The value is returned unchanged
/// when it is already at least `target_len` characters long or when `pad` is
/// empty. The final repetition of `pad` is truncated if it does not fit
/// exactly. Padding is placed before the value when `pad_start` is `true`,
/// after it otherwise.
fn pad_string_value(value: &str, target_len: usize, pad: &str, pad_start: bool) -> String {
    let current = value.chars().count();
    if pad.is_empty() || current >= target_len {
        return value.to_owned();
    }

    // Cycle through the pad characters until the gap is filled.
    let missing = target_len - current;
    let filler: String = pad.chars().cycle().take(missing).collect();

    let mut padded = String::with_capacity(value.len() + filler.len());
    if pad_start {
        padded.push_str(&filler);
        padded.push_str(value);
    } else {
        padded.push_str(value);
        padded.push_str(&filler);
    }
    padded
}
2374
2375fn eval_numeric_op(
2376    expr_op: &ExprOp,
2377    injected: Option<&EvalValue>,
2378    record: &JsonValue,
2379    context: Option<&JsonValue>,
2380    out: &JsonValue,
2381    base_path: &str,
2382    locals: Option<&EvalLocals<'_>>,
2383) -> Result<EvalValue, TransformError> {
2384    let op = expr_op.op.as_str();
2385    let args = &expr_op.args;
2386    let total_len = args_len(args, injected);
2387
2388    let requires_exact_two = matches!(op, "-" | "/");
2389    if requires_exact_two && total_len != 2 {
2390        return Err(TransformError::new(
2391            TransformErrorKind::ExprError,
2392            "expr.args must contain exactly two items",
2393        )
2394        .with_path(format!("{}.args", base_path)));
2395    }
2396    if !requires_exact_two && total_len < 2 {
2397        return Err(TransformError::new(
2398            TransformErrorKind::ExprError,
2399            "expr.args must contain at least two items",
2400        )
2401        .with_path(format!("{}.args", base_path)));
2402    }
2403
2404    let mut result: f64 = 0.0;
2405    for index in 0..total_len {
2406        let arg_path = format!("{}.args[{}]", base_path, index);
2407        let value = match eval_arg_value_at(
2408            index, args, injected, record, context, out, base_path, locals,
2409        )? {
2410            None => return Ok(EvalValue::Missing),
2411            Some(value) => value,
2412        };
2413        if value.is_null() {
2414            return Err(TransformError::new(
2415                TransformErrorKind::ExprError,
2416                "expr arg must not be null",
2417            )
2418            .with_path(arg_path));
2419        }
2420        let number = value_to_number(&value, &arg_path, "operand must be a number")?;
2421        if index == 0 {
2422            result = number;
2423        } else {
2424            result = match op {
2425                "+" => result + number,
2426                "-" => result - number,
2427                "*" => result * number,
2428                "/" => result / number,
2429                _ => result,
2430            };
2431        }
2432    }
2433
2434    Ok(EvalValue::Value(json_number_from_f64(result, base_path)?))
2435}
2436
2437fn eval_round(
2438    args: &[Expr],
2439    injected: Option<&EvalValue>,
2440    record: &JsonValue,
2441    context: Option<&JsonValue>,
2442    out: &JsonValue,
2443    base_path: &str,
2444    locals: Option<&EvalLocals<'_>>,
2445) -> Result<EvalValue, TransformError> {
2446    let total_len = args_len(args, injected);
2447    if !(1..=2).contains(&total_len) {
2448        return Err(TransformError::new(
2449            TransformErrorKind::ExprError,
2450            "expr.args must contain one or two items",
2451        )
2452        .with_path(format!("{}.args", base_path)));
2453    }
2454
2455    let value = match eval_arg_value_at(0, args, injected, record, context, out, base_path, locals)?
2456    {
2457        None => return Ok(EvalValue::Missing),
2458        Some(value) => value,
2459    };
2460    let value_path = format!("{}.args[0]", base_path);
2461    if value.is_null() {
2462        return Err(TransformError::new(
2463            TransformErrorKind::ExprError,
2464            "expr arg must not be null",
2465        )
2466        .with_path(value_path));
2467    }
2468    let number = value_to_number(&value, &value_path, "operand must be a number")?;
2469
2470    let scale = if total_len == 2 {
2471        let scale_path = format!("{}.args[1]", base_path);
2472        let scale_value =
2473            match eval_arg_value_at(1, args, injected, record, context, out, base_path, locals)? {
2474                None => return Ok(EvalValue::Missing),
2475                Some(value) => value,
2476            };
2477        if scale_value.is_null() {
2478            return Err(TransformError::new(
2479                TransformErrorKind::ExprError,
2480                "expr arg must not be null",
2481            )
2482            .with_path(scale_path));
2483        }
2484        let scale = value_to_i64(
2485            &scale_value,
2486            &scale_path,
2487            "scale must be a non-negative integer",
2488        )?;
2489        if scale < 0 {
2490            return Err(TransformError::new(
2491                TransformErrorKind::ExprError,
2492                "scale must be a non-negative integer",
2493            )
2494            .with_path(scale_path));
2495        }
2496        if scale > 308 {
2497            return Err(
2498                TransformError::new(TransformErrorKind::ExprError, "scale is too large")
2499                    .with_path(scale_path),
2500            );
2501        }
2502        scale as i32
2503    } else {
2504        0
2505    };
2506
2507    let rounded = if scale == 0 {
2508        number.round()
2509    } else {
2510        let factor = 10f64.powi(scale);
2511        (number * factor).round() / factor
2512    };
2513
2514    Ok(EvalValue::Value(json_number_from_f64(rounded, base_path)?))
2515}
2516
2517fn eval_to_base(
2518    args: &[Expr],
2519    injected: Option<&EvalValue>,
2520    record: &JsonValue,
2521    context: Option<&JsonValue>,
2522    out: &JsonValue,
2523    base_path: &str,
2524    locals: Option<&EvalLocals<'_>>,
2525) -> Result<EvalValue, TransformError> {
2526    let total_len = args_len(args, injected);
2527    if total_len != 2 {
2528        return Err(TransformError::new(
2529            TransformErrorKind::ExprError,
2530            "expr.args must contain exactly two items",
2531        )
2532        .with_path(format!("{}.args", base_path)));
2533    }
2534
2535    let value = match eval_arg_value_at(0, args, injected, record, context, out, base_path, locals)?
2536    {
2537        None => return Ok(EvalValue::Missing),
2538        Some(value) => value,
2539    };
2540    let base_value =
2541        match eval_arg_value_at(1, args, injected, record, context, out, base_path, locals)? {
2542            None => return Ok(EvalValue::Missing),
2543            Some(value) => value,
2544        };
2545    let value_path = format!("{}.args[0]", base_path);
2546    let base_path_arg = format!("{}.args[1]", base_path);
2547    if value.is_null() {
2548        return Err(TransformError::new(
2549            TransformErrorKind::ExprError,
2550            "expr arg must not be null",
2551        )
2552        .with_path(value_path));
2553    }
2554    if base_value.is_null() {
2555        return Err(TransformError::new(
2556            TransformErrorKind::ExprError,
2557            "expr arg must not be null",
2558        )
2559        .with_path(base_path_arg));
2560    }
2561
2562    let number = value_to_i64(&value, &value_path, "value must be an integer")?;
2563    let base = value_to_i64(&base_value, &base_path_arg, "base must be an integer")?;
2564    if !(2..=36).contains(&base) {
2565        return Err(TransformError::new(
2566            TransformErrorKind::ExprError,
2567            "base must be between 2 and 36",
2568        )
2569        .with_path(base_path_arg));
2570    }
2571
2572    let formatted = to_radix_string(number, base as u32, &value_path)?;
2573    Ok(EvalValue::Value(JsonValue::String(formatted)))
2574}
2575
2576fn eval_date_format(
2577    args: &[Expr],
2578    injected: Option<&EvalValue>,
2579    record: &JsonValue,
2580    context: Option<&JsonValue>,
2581    out: &JsonValue,
2582    base_path: &str,
2583    locals: Option<&EvalLocals<'_>>,
2584) -> Result<EvalValue, TransformError> {
2585    let total_len = args_len(args, injected);
2586    if !(2..=4).contains(&total_len) {
2587        return Err(TransformError::new(
2588            TransformErrorKind::ExprError,
2589            "expr.args must contain two to four items",
2590        )
2591        .with_path(format!("{}.args", base_path)));
2592    }
2593
2594    let value =
2595        match eval_arg_string_at(0, args, injected, record, context, out, base_path, locals)? {
2596            None => return Ok(EvalValue::Missing),
2597            Some(value) => value,
2598        };
2599    let output_format =
2600        match eval_arg_string_at(1, args, injected, record, context, out, base_path, locals)? {
2601            None => return Ok(EvalValue::Missing),
2602            Some(value) => value,
2603        };
2604    let value_path = format!("{}.args[0]", base_path);
2605    let mut input_formats: Option<Vec<String>> = None;
2606    let mut timezone: Option<FixedOffset> = None;
2607
2608    if total_len >= 3 {
2609        let input_path = format!("{}.args[2]", base_path);
2610        let input_value =
2611            match eval_arg_value_at(2, args, injected, record, context, out, base_path, locals)? {
2612                None => return Ok(EvalValue::Missing),
2613                Some(value) => value,
2614            };
2615        if input_value.is_null() {
2616            return Err(TransformError::new(
2617                TransformErrorKind::ExprError,
2618                "expr arg must not be null",
2619            )
2620            .with_path(input_path));
2621        }
2622
2623        if let Some(value) = input_value.as_str() {
2624            if looks_like_timezone(value) {
2625                timezone = Some(parse_timezone(value, &input_path)?);
2626            } else {
2627                input_formats = Some(parse_format_list(&input_value, &input_path)?);
2628            }
2629        } else {
2630            input_formats = Some(parse_format_list(&input_value, &input_path)?);
2631        }
2632    }
2633
2634    if total_len == 4 {
2635        let tz_path = format!("{}.args[3]", base_path);
2636        let tz_value =
2637            match eval_arg_string_at(3, args, injected, record, context, out, base_path, locals)? {
2638                None => return Ok(EvalValue::Missing),
2639                Some(value) => value,
2640            };
2641        timezone = Some(parse_timezone(&tz_value, &tz_path)?);
2642    }
2643
2644    let dt = parse_datetime(&value, input_formats.as_deref(), timezone, &value_path)?;
2645    let dt = match timezone {
2646        Some(offset) => dt.with_timezone(&offset),
2647        None => dt,
2648    };
2649    let formatted = dt.format(&output_format).to_string();
2650    Ok(EvalValue::Value(JsonValue::String(formatted)))
2651}
2652
2653fn eval_to_unixtime(
2654    args: &[Expr],
2655    injected: Option<&EvalValue>,
2656    record: &JsonValue,
2657    context: Option<&JsonValue>,
2658    out: &JsonValue,
2659    base_path: &str,
2660    locals: Option<&EvalLocals<'_>>,
2661) -> Result<EvalValue, TransformError> {
2662    let total_len = args_len(args, injected);
2663    if !(1..=3).contains(&total_len) {
2664        return Err(TransformError::new(
2665            TransformErrorKind::ExprError,
2666            "expr.args must contain one to three items",
2667        )
2668        .with_path(format!("{}.args", base_path)));
2669    }
2670
2671    let value =
2672        match eval_arg_string_at(0, args, injected, record, context, out, base_path, locals)? {
2673            None => return Ok(EvalValue::Missing),
2674            Some(value) => value,
2675        };
2676    let value_path = format!("{}.args[0]", base_path);
2677
2678    let mut unit = "s".to_string();
2679    let mut timezone: Option<FixedOffset> = None;
2680
2681    if total_len >= 2 {
2682        let arg_path = format!("{}.args[1]", base_path);
2683        let arg_value =
2684            match eval_arg_string_at(1, args, injected, record, context, out, base_path, locals)? {
2685                None => return Ok(EvalValue::Missing),
2686                Some(value) => value,
2687            };
2688        if total_len == 3 {
2689            if arg_value != "s" && arg_value != "ms" {
2690                return Err(TransformError::new(
2691                    TransformErrorKind::ExprError,
2692                    "unit must be s or ms",
2693                )
2694                .with_path(arg_path));
2695            }
2696            unit = arg_value;
2697        } else if arg_value == "s" || arg_value == "ms" {
2698            unit = arg_value;
2699        } else if looks_like_timezone(&arg_value) {
2700            timezone = Some(parse_timezone(&arg_value, &arg_path)?);
2701        } else {
2702            return Err(
2703                TransformError::new(TransformErrorKind::ExprError, "unit must be s or ms")
2704                    .with_path(arg_path),
2705            );
2706        }
2707    }
2708
2709    if total_len == 3 {
2710        let tz_path = format!("{}.args[2]", base_path);
2711        let tz_value =
2712            match eval_arg_string_at(2, args, injected, record, context, out, base_path, locals)? {
2713                None => return Ok(EvalValue::Missing),
2714                Some(value) => value,
2715            };
2716        timezone = Some(parse_timezone(&tz_value, &tz_path)?);
2717    }
2718
2719    let dt = parse_datetime(&value, None, timezone, &value_path)?;
2720    let dt = match timezone {
2721        Some(offset) => dt.with_timezone(&offset),
2722        None => dt,
2723    };
2724    let timestamp = if unit == "ms" {
2725        dt.timestamp_millis()
2726    } else {
2727        dt.timestamp()
2728    };
2729
2730    Ok(EvalValue::Value(JsonValue::Number(timestamp.into())))
2731}
2732
2733fn eval_lookup(
2734    args: &[Expr],
2735    injected: Option<&EvalValue>,
2736    record: &JsonValue,
2737    context: Option<&JsonValue>,
2738    out: &JsonValue,
2739    base_path: &str,
2740    first_only: bool,
2741    locals: Option<&EvalLocals<'_>>,
2742) -> Result<EvalValue, TransformError> {
2743    let total_len = args_len(args, injected);
2744    if !(3..=4).contains(&total_len) {
2745        return Err(TransformError::new(
2746            TransformErrorKind::ExprError,
2747            "lookup args must be [collection, key_path, match_value, output_path?]",
2748        )
2749        .with_path(format!("{}.args", base_path)));
2750    }
2751
2752    let collection_path = format!("{}.args[0]", base_path);
2753    let collection =
2754        match eval_expr_at_index(0, args, injected, record, context, out, base_path, locals)? {
2755            EvalValue::Missing => return Ok(EvalValue::Missing),
2756            EvalValue::Value(value) => value,
2757        };
2758    let collection_array = match collection {
2759        JsonValue::Array(items) => items,
2760        JsonValue::Null => {
2761            return Err(TransformError::new(
2762                TransformErrorKind::ExprError,
2763                "lookup collection must be an array",
2764            )
2765            .with_path(collection_path));
2766        }
2767        _ => {
2768            return Err(TransformError::new(
2769                TransformErrorKind::ExprError,
2770                "lookup collection must be an array",
2771            )
2772            .with_path(collection_path));
2773        }
2774    };
2775
2776    let key_expr = arg_expr_at(1, args, injected).ok_or_else(|| {
2777        TransformError::new(
2778            TransformErrorKind::ExprError,
2779            "lookup key_path must be a non-empty string literal",
2780        )
2781        .with_path(format!("{}.args[1]", base_path))
2782    })?;
2783    let key_path = literal_string(key_expr).ok_or_else(|| {
2784        TransformError::new(
2785            TransformErrorKind::ExprError,
2786            "lookup key_path must be a non-empty string literal",
2787        )
2788        .with_path(format!("{}.args[1]", base_path))
2789    })?;
2790    if key_path.is_empty() {
2791        return Err(TransformError::new(
2792            TransformErrorKind::ExprError,
2793            "lookup key_path must be a non-empty string literal",
2794        )
2795        .with_path(format!("{}.args[1]", base_path)));
2796    }
2797    let key_tokens = parse_path(key_path).map_err(|_| {
2798        TransformError::new(TransformErrorKind::ExprError, "lookup key_path is invalid")
2799            .with_path(format!("{}.args[1]", base_path))
2800    })?;
2801
2802    let output_tokens = if total_len == 4 {
2803        let output_expr = arg_expr_at(3, args, injected).ok_or_else(|| {
2804            TransformError::new(
2805                TransformErrorKind::ExprError,
2806                "lookup output_path must be a non-empty string literal",
2807            )
2808            .with_path(format!("{}.args[3]", base_path))
2809        })?;
2810        let value = literal_string(output_expr).ok_or_else(|| {
2811            TransformError::new(
2812                TransformErrorKind::ExprError,
2813                "lookup output_path must be a non-empty string literal",
2814            )
2815            .with_path(format!("{}.args[3]", base_path))
2816        })?;
2817        if value.is_empty() {
2818            return Err(TransformError::new(
2819                TransformErrorKind::ExprError,
2820                "lookup output_path must be a non-empty string literal",
2821            )
2822            .with_path(format!("{}.args[3]", base_path)));
2823        }
2824        let tokens = parse_path(value).map_err(|_| {
2825            TransformError::new(
2826                TransformErrorKind::ExprError,
2827                "lookup output_path is invalid",
2828            )
2829            .with_path(format!("{}.args[3]", base_path))
2830        })?;
2831        Some(tokens)
2832    } else {
2833        None
2834    };
2835
2836    let match_path = format!("{}.args[2]", base_path);
2837    let match_value =
2838        match eval_expr_at_index(2, args, injected, record, context, out, base_path, locals)? {
2839            EvalValue::Missing => return Ok(EvalValue::Missing),
2840            EvalValue::Value(value) => value,
2841        };
2842    if match_value.is_null() {
2843        return Err(TransformError::new(
2844            TransformErrorKind::ExprError,
2845            "lookup match_value must not be null",
2846        )
2847        .with_path(match_path));
2848    }
2849    let match_key = value_to_string(&match_value, &match_path)?;
2850
2851    let mut results = Vec::new();
2852    for item in &collection_array {
2853        let key_value = match get_path(item, &key_tokens) {
2854            Some(value) => value,
2855            None => continue,
2856        };
2857        let item_key = match value_to_string_optional(key_value) {
2858            Some(value) => value,
2859            None => continue,
2860        };
2861        if item_key != match_key {
2862            continue;
2863        }
2864
2865        let selected = match output_tokens.as_ref() {
2866            Some(tokens) => get_path(item, tokens),
2867            None => Some(item),
2868        };
2869
2870        if let Some(value) = selected {
2871            if first_only {
2872                return Ok(EvalValue::Value(value.clone()));
2873            }
2874            results.push(value.clone());
2875        }
2876    }
2877
2878    if results.is_empty() {
2879        Ok(EvalValue::Missing)
2880    } else {
2881        Ok(EvalValue::Value(JsonValue::Array(results)))
2882    }
2883}
2884
2885fn locals_with_item<'a>(locals: Option<&EvalLocals<'a>>, item: EvalItem<'a>) -> EvalLocals<'a> {
2886    EvalLocals {
2887        item: Some(item),
2888        acc: locals.and_then(|locals| locals.acc),
2889        pipe: locals.and_then(|locals| locals.pipe),
2890        locals: locals.and_then(|locals| locals.locals),
2891    }
2892}
2893
2894fn eval_array_arg(
2895    index: usize,
2896    args: &[Expr],
2897    injected: Option<&EvalValue>,
2898    record: &JsonValue,
2899    context: Option<&JsonValue>,
2900    out: &JsonValue,
2901    base_path: &str,
2902    locals: Option<&EvalLocals<'_>>,
2903) -> Result<Vec<JsonValue>, TransformError> {
2904    let arg_path = format!("{}.args[{}]", base_path, index);
2905    match eval_expr_at_index(
2906        index, args, injected, record, context, out, base_path, locals,
2907    )? {
2908        EvalValue::Missing => Ok(Vec::new()),
2909        EvalValue::Value(value) => {
2910            if value.is_null() {
2911                Ok(Vec::new())
2912            } else if let JsonValue::Array(items) = value {
2913                Ok(items)
2914            } else {
2915                Err(
2916                    TransformError::new(TransformErrorKind::ExprError, "expr arg must be an array")
2917                        .with_path(arg_path),
2918                )
2919            }
2920        }
2921    }
2922}
2923
2924fn eval_expr_or_null(
2925    expr: &Expr,
2926    record: &JsonValue,
2927    context: Option<&JsonValue>,
2928    out: &JsonValue,
2929    base_path: &str,
2930    locals: Option<&EvalLocals<'_>>,
2931) -> Result<JsonValue, TransformError> {
2932    match eval_expr(expr, record, context, out, base_path, locals)? {
2933        EvalValue::Missing => Ok(JsonValue::Null),
2934        EvalValue::Value(value) => Ok(value),
2935    }
2936}
2937
2938fn eval_predicate_expr(
2939    expr: &Expr,
2940    record: &JsonValue,
2941    context: Option<&JsonValue>,
2942    out: &JsonValue,
2943    base_path: &str,
2944    locals: Option<&EvalLocals<'_>>,
2945) -> Result<bool, TransformError> {
2946    match eval_expr(expr, record, context, out, base_path, locals)? {
2947        EvalValue::Missing => Ok(false),
2948        EvalValue::Value(value) => {
2949            if value.is_null() {
2950                return Ok(false);
2951            }
2952            let flag = value_as_bool(&value, base_path)?;
2953            Ok(flag)
2954        }
2955    }
2956}
2957
2958fn eval_key_expr_string(
2959    expr: &Expr,
2960    record: &JsonValue,
2961    context: Option<&JsonValue>,
2962    out: &JsonValue,
2963    base_path: &str,
2964    locals: Option<&EvalLocals<'_>>,
2965) -> Result<String, TransformError> {
2966    let value = match eval_expr(expr, record, context, out, base_path, locals)? {
2967        EvalValue::Missing => {
2968            return Err(TransformError::new(
2969                TransformErrorKind::ExprError,
2970                "expr arg must not be missing",
2971            )
2972            .with_path(base_path));
2973        }
2974        EvalValue::Value(value) => value,
2975    };
2976    if value.is_null() {
2977        return Err(TransformError::new(
2978            TransformErrorKind::ExprError,
2979            "expr arg must not be null",
2980        )
2981        .with_path(base_path));
2982    }
2983    value_to_string(&value, base_path)
2984}
2985
2986fn ensure_eq_compatible(value: &JsonValue, path: &str) -> Result<(), TransformError> {
2987    if value.is_null() {
2988        return Ok(());
2989    }
2990    if value_to_string_optional(value).is_some() {
2991        return Ok(());
2992    }
2993    Err(expr_type_error(
2994        "value must be string/number/bool or null",
2995        path,
2996    ))
2997}
2998
/// Payload-free tag naming which variant a [`SortKey`] holds.
#[derive(Copy, Clone, Eq, PartialEq)]
enum SortKeyKind {
    /// The key is numeric.
    Number,
    /// The key is textual.
    String,
    /// The key is a boolean.
    Bool,
}
3005
/// A sort key extracted from a JSON value, tagged with its primitive type.
#[derive(Clone)]
enum SortKey {
    /// Numeric key stored as a floating-point value.
    Number(f64),
    /// Textual key.
    String(String),
    /// Boolean key.
    Bool(bool),
}
3012
3013impl SortKey {
3014    fn kind(&self) -> SortKeyKind {
3015        match self {
3016            SortKey::Number(_) => SortKeyKind::Number,
3017            SortKey::String(_) => SortKeyKind::String,
3018            SortKey::Bool(_) => SortKeyKind::Bool,
3019        }
3020    }
3021}
3022
3023fn compare_sort_keys(left: &SortKey, right: &SortKey) -> Ordering {
3024    match (left, right) {
3025        (SortKey::Number(l), SortKey::Number(r)) => l.partial_cmp(r).unwrap_or(Ordering::Equal),
3026        (SortKey::String(l), SortKey::String(r)) => l.cmp(r),
3027        (SortKey::Bool(l), SortKey::Bool(r)) => l.cmp(r),
3028        _ => Ordering::Equal,
3029    }
3030}
3031
3032fn eval_sort_key(
3033    expr: &Expr,
3034    record: &JsonValue,
3035    context: Option<&JsonValue>,
3036    out: &JsonValue,
3037    base_path: &str,
3038    locals: Option<&EvalLocals<'_>>,
3039) -> Result<SortKey, TransformError> {
3040    let value = match eval_expr(expr, record, context, out, base_path, locals)? {
3041        EvalValue::Missing => {
3042            return Err(TransformError::new(
3043                TransformErrorKind::ExprError,
3044                "expr arg must not be missing",
3045            )
3046            .with_path(base_path));
3047        }
3048        EvalValue::Value(value) => value,
3049    };
3050    if value.is_null() {
3051        return Err(TransformError::new(
3052            TransformErrorKind::ExprError,
3053            "expr arg must not be null",
3054        )
3055        .with_path(base_path));
3056    }
3057
3058    match value {
3059        JsonValue::Number(number) => {
3060            let value = number
3061                .as_f64()
3062                .filter(|value| value.is_finite())
3063                .ok_or_else(|| expr_type_error("sort_by key must be a finite number", base_path))?;
3064            Ok(SortKey::Number(value))
3065        }
3066        JsonValue::String(value) => Ok(SortKey::String(value)),
3067        JsonValue::Bool(value) => Ok(SortKey::Bool(value)),
3068        _ => Err(expr_type_error(
3069            "sort_by key must be string/number/bool",
3070            base_path,
3071        )),
3072    }
3073}
3074
3075fn eval_array_map(
3076    args: &[Expr],
3077    injected: Option<&EvalValue>,
3078    record: &JsonValue,
3079    context: Option<&JsonValue>,
3080    out: &JsonValue,
3081    base_path: &str,
3082    locals: Option<&EvalLocals<'_>>,
3083) -> Result<EvalValue, TransformError> {
3084    let total_len = args_len(args, injected);
3085    if total_len != 2 {
3086        return Err(TransformError::new(
3087            TransformErrorKind::ExprError,
3088            "expr.args must contain exactly two items",
3089        )
3090        .with_path(format!("{}.args", base_path)));
3091    }
3092
3093    let array = eval_array_arg(0, args, injected, record, context, out, base_path, locals)?;
3094    let expr = arg_expr_at(1, args, injected).ok_or_else(|| {
3095        TransformError::new(
3096            TransformErrorKind::ExprError,
3097            "expr.args index is out of bounds",
3098        )
3099        .with_path(format!("{}.args[1]", base_path))
3100    })?;
3101    let expr_index = if injected.is_some() { 0 } else { 1 };
3102    let expr_path = format!("{}.args[{}]", base_path, expr_index);
3103
3104    let mut results = Vec::with_capacity(array.len());
3105    for (index, item) in array.iter().enumerate() {
3106        let item_locals = locals_with_item(locals, EvalItem { value: item, index });
3107        let value = eval_expr_or_null(expr, record, context, out, &expr_path, Some(&item_locals))?;
3108        results.push(value);
3109    }
3110
3111    Ok(EvalValue::Value(JsonValue::Array(results)))
3112}
3113
3114fn eval_array_filter(
3115    args: &[Expr],
3116    injected: Option<&EvalValue>,
3117    record: &JsonValue,
3118    context: Option<&JsonValue>,
3119    out: &JsonValue,
3120    base_path: &str,
3121    locals: Option<&EvalLocals<'_>>,
3122) -> Result<EvalValue, TransformError> {
3123    let total_len = args_len(args, injected);
3124    if total_len != 2 {
3125        return Err(TransformError::new(
3126            TransformErrorKind::ExprError,
3127            "expr.args must contain exactly two items",
3128        )
3129        .with_path(format!("{}.args", base_path)));
3130    }
3131
3132    let array = eval_array_arg(0, args, injected, record, context, out, base_path, locals)?;
3133    let expr = arg_expr_at(1, args, injected).ok_or_else(|| {
3134        TransformError::new(
3135            TransformErrorKind::ExprError,
3136            "expr.args index is out of bounds",
3137        )
3138        .with_path(format!("{}.args[1]", base_path))
3139    })?;
3140    let expr_index = if injected.is_some() { 0 } else { 1 };
3141    let expr_path = format!("{}.args[{}]", base_path, expr_index);
3142
3143    let mut results = Vec::new();
3144    for (index, item) in array.iter().enumerate() {
3145        let item_locals = locals_with_item(locals, EvalItem { value: item, index });
3146        if eval_predicate_expr(expr, record, context, out, &expr_path, Some(&item_locals))? {
3147            results.push(item.clone());
3148        }
3149    }
3150
3151    Ok(EvalValue::Value(JsonValue::Array(results)))
3152}
3153
3154fn eval_array_flat_map(
3155    args: &[Expr],
3156    injected: Option<&EvalValue>,
3157    record: &JsonValue,
3158    context: Option<&JsonValue>,
3159    out: &JsonValue,
3160    base_path: &str,
3161    locals: Option<&EvalLocals<'_>>,
3162) -> Result<EvalValue, TransformError> {
3163    let total_len = args_len(args, injected);
3164    if total_len != 2 {
3165        return Err(TransformError::new(
3166            TransformErrorKind::ExprError,
3167            "expr.args must contain exactly two items",
3168        )
3169        .with_path(format!("{}.args", base_path)));
3170    }
3171
3172    let array = eval_array_arg(0, args, injected, record, context, out, base_path, locals)?;
3173    let expr = arg_expr_at(1, args, injected).ok_or_else(|| {
3174        TransformError::new(
3175            TransformErrorKind::ExprError,
3176            "expr.args index is out of bounds",
3177        )
3178        .with_path(format!("{}.args[1]", base_path))
3179    })?;
3180    let expr_index = if injected.is_some() { 0 } else { 1 };
3181    let expr_path = format!("{}.args[{}]", base_path, expr_index);
3182
3183    let mut results = Vec::new();
3184    for (index, item) in array.iter().enumerate() {
3185        let item_locals = locals_with_item(locals, EvalItem { value: item, index });
3186        let value = eval_expr_or_null(expr, record, context, out, &expr_path, Some(&item_locals))?;
3187        match value {
3188            JsonValue::Array(items) => results.extend(items),
3189            value => results.push(value),
3190        }
3191    }
3192
3193    Ok(EvalValue::Value(JsonValue::Array(results)))
3194}
3195
3196fn flatten_value(value: &JsonValue, depth: usize, out: &mut Vec<JsonValue>) {
3197    if depth == 0 {
3198        out.push(value.clone());
3199        return;
3200    }
3201
3202    if let JsonValue::Array(items) = value {
3203        for item in items {
3204            flatten_value(item, depth - 1, out);
3205        }
3206    } else {
3207        out.push(value.clone());
3208    }
3209}
3210
3211fn eval_array_flatten(
3212    args: &[Expr],
3213    injected: Option<&EvalValue>,
3214    record: &JsonValue,
3215    context: Option<&JsonValue>,
3216    out: &JsonValue,
3217    base_path: &str,
3218    locals: Option<&EvalLocals<'_>>,
3219) -> Result<EvalValue, TransformError> {
3220    let total_len = args_len(args, injected);
3221    if !(1..=2).contains(&total_len) {
3222        return Err(TransformError::new(
3223            TransformErrorKind::ExprError,
3224            "expr.args must contain one or two items",
3225        )
3226        .with_path(format!("{}.args", base_path)));
3227    }
3228
3229    let array = eval_array_arg(0, args, injected, record, context, out, base_path, locals)?;
3230    let depth = if total_len == 2 {
3231        let depth_path = format!("{}.args[1]", base_path);
3232        let depth_value =
3233            match eval_arg_value_at(1, args, injected, record, context, out, base_path, locals)? {
3234                None => return Ok(EvalValue::Missing),
3235                Some(value) => value,
3236            };
3237        if depth_value.is_null() {
3238            return Err(TransformError::new(
3239                TransformErrorKind::ExprError,
3240                "expr arg must not be null",
3241            )
3242            .with_path(depth_path));
3243        }
3244        let depth = value_to_i64(
3245            &depth_value,
3246            &depth_path,
3247            "depth must be a non-negative integer",
3248        )?;
3249        if depth < 0 {
3250            return Err(TransformError::new(
3251                TransformErrorKind::ExprError,
3252                "depth must be a non-negative integer",
3253            )
3254            .with_path(depth_path));
3255        }
3256        usize::try_from(depth).map_err(|_| {
3257            TransformError::new(TransformErrorKind::ExprError, "depth is too large")
3258                .with_path(depth_path)
3259        })?
3260    } else {
3261        1
3262    };
3263
3264    let mut results = Vec::new();
3265    for item in &array {
3266        flatten_value(item, depth, &mut results);
3267    }
3268
3269    Ok(EvalValue::Value(JsonValue::Array(results)))
3270}
3271
3272fn eval_array_take(
3273    args: &[Expr],
3274    injected: Option<&EvalValue>,
3275    record: &JsonValue,
3276    context: Option<&JsonValue>,
3277    out: &JsonValue,
3278    base_path: &str,
3279    locals: Option<&EvalLocals<'_>>,
3280) -> Result<EvalValue, TransformError> {
3281    let total_len = args_len(args, injected);
3282    if total_len != 2 {
3283        return Err(TransformError::new(
3284            TransformErrorKind::ExprError,
3285            "expr.args must contain exactly two items",
3286        )
3287        .with_path(format!("{}.args", base_path)));
3288    }
3289
3290    let array = eval_array_arg(0, args, injected, record, context, out, base_path, locals)?;
3291    let count_path = format!("{}.args[1]", base_path);
3292    let count_value =
3293        match eval_arg_value_at(1, args, injected, record, context, out, base_path, locals)? {
3294            None => return Ok(EvalValue::Missing),
3295            Some(value) => value,
3296        };
3297    if count_value.is_null() {
3298        return Err(TransformError::new(
3299            TransformErrorKind::ExprError,
3300            "expr arg must not be null",
3301        )
3302        .with_path(count_path));
3303    }
3304    let count = value_to_i64(&count_value, &count_path, "count must be an integer")?;
3305
3306    let len = array.len() as i64;
3307    let results = if count >= 0 {
3308        let take_count = count.min(len).max(0) as usize;
3309        array[..take_count].to_vec()
3310    } else {
3311        let abs_count = if count == i64::MIN {
3312            (i64::MAX as u64) + 1
3313        } else {
3314            (-count) as u64
3315        };
3316        let take_count = abs_count.min(array.len() as u64) as usize;
3317        let start = array.len().saturating_sub(take_count);
3318        array[start..].to_vec()
3319    };
3320
3321    Ok(EvalValue::Value(JsonValue::Array(results)))
3322}
3323
3324fn eval_array_drop(
3325    args: &[Expr],
3326    injected: Option<&EvalValue>,
3327    record: &JsonValue,
3328    context: Option<&JsonValue>,
3329    out: &JsonValue,
3330    base_path: &str,
3331    locals: Option<&EvalLocals<'_>>,
3332) -> Result<EvalValue, TransformError> {
3333    let total_len = args_len(args, injected);
3334    if total_len != 2 {
3335        return Err(TransformError::new(
3336            TransformErrorKind::ExprError,
3337            "expr.args must contain exactly two items",
3338        )
3339        .with_path(format!("{}.args", base_path)));
3340    }
3341
3342    let array = eval_array_arg(0, args, injected, record, context, out, base_path, locals)?;
3343    let count_path = format!("{}.args[1]", base_path);
3344    let count_value =
3345        match eval_arg_value_at(1, args, injected, record, context, out, base_path, locals)? {
3346            None => return Ok(EvalValue::Missing),
3347            Some(value) => value,
3348        };
3349    if count_value.is_null() {
3350        return Err(TransformError::new(
3351            TransformErrorKind::ExprError,
3352            "expr arg must not be null",
3353        )
3354        .with_path(count_path));
3355    }
3356    let count = value_to_i64(&count_value, &count_path, "count must be an integer")?;
3357
3358    let len = array.len() as i64;
3359    let results = if count >= 0 {
3360        let drop_count = count.min(len).max(0) as usize;
3361        array[drop_count..].to_vec()
3362    } else {
3363        let abs_count = if count == i64::MIN {
3364            (i64::MAX as u64) + 1
3365        } else {
3366            (-count) as u64
3367        };
3368        let drop_count = abs_count.min(array.len() as u64) as usize;
3369        let end = array.len().saturating_sub(drop_count);
3370        array[..end].to_vec()
3371    };
3372
3373    Ok(EvalValue::Value(JsonValue::Array(results)))
3374}
3375
3376fn eval_array_slice(
3377    args: &[Expr],
3378    injected: Option<&EvalValue>,
3379    record: &JsonValue,
3380    context: Option<&JsonValue>,
3381    out: &JsonValue,
3382    base_path: &str,
3383    locals: Option<&EvalLocals<'_>>,
3384) -> Result<EvalValue, TransformError> {
3385    let total_len = args_len(args, injected);
3386    if !(2..=3).contains(&total_len) {
3387        return Err(TransformError::new(
3388            TransformErrorKind::ExprError,
3389            "expr.args must contain two or three items",
3390        )
3391        .with_path(format!("{}.args", base_path)));
3392    }
3393
3394    let array = eval_array_arg(0, args, injected, record, context, out, base_path, locals)?;
3395    let len = array.len() as i64;
3396
3397    let start_path = format!("{}.args[1]", base_path);
3398    let start_value =
3399        match eval_arg_value_at(1, args, injected, record, context, out, base_path, locals)? {
3400            None => return Ok(EvalValue::Missing),
3401            Some(value) => value,
3402        };
3403    if start_value.is_null() {
3404        return Err(TransformError::new(
3405            TransformErrorKind::ExprError,
3406            "expr arg must not be null",
3407        )
3408        .with_path(start_path));
3409    }
3410    let start = value_to_i64(&start_value, &start_path, "start must be an integer")?;
3411
3412    let end = if total_len == 3 {
3413        let end_path = format!("{}.args[2]", base_path);
3414        let end_value =
3415            match eval_arg_value_at(2, args, injected, record, context, out, base_path, locals)? {
3416                None => return Ok(EvalValue::Missing),
3417                Some(value) => value,
3418            };
3419        if end_value.is_null() {
3420            return Err(TransformError::new(
3421                TransformErrorKind::ExprError,
3422                "expr arg must not be null",
3423            )
3424            .with_path(end_path));
3425        }
3426        value_to_i64(&end_value, &end_path, "end must be an integer")?
3427    } else {
3428        len
3429    };
3430
3431    let mut start_index = if start < 0 { len + start } else { start };
3432    let mut end_index = if end < 0 { len + end } else { end };
3433    start_index = start_index.clamp(0, len);
3434    end_index = end_index.clamp(0, len);
3435
3436    let results = if end_index <= start_index {
3437        Vec::new()
3438    } else {
3439        array[start_index as usize..end_index as usize].to_vec()
3440    };
3441
3442    Ok(EvalValue::Value(JsonValue::Array(results)))
3443}
3444
3445fn eval_array_chunk(
3446    args: &[Expr],
3447    injected: Option<&EvalValue>,
3448    record: &JsonValue,
3449    context: Option<&JsonValue>,
3450    out: &JsonValue,
3451    base_path: &str,
3452    locals: Option<&EvalLocals<'_>>,
3453) -> Result<EvalValue, TransformError> {
3454    let total_len = args_len(args, injected);
3455    if total_len != 2 {
3456        return Err(TransformError::new(
3457            TransformErrorKind::ExprError,
3458            "expr.args must contain exactly two items",
3459        )
3460        .with_path(format!("{}.args", base_path)));
3461    }
3462
3463    let array = eval_array_arg(0, args, injected, record, context, out, base_path, locals)?;
3464    let size_path = format!("{}.args[1]", base_path);
3465    let size_value =
3466        match eval_arg_value_at(1, args, injected, record, context, out, base_path, locals)? {
3467            None => return Ok(EvalValue::Missing),
3468            Some(value) => value,
3469        };
3470    if size_value.is_null() {
3471        return Err(TransformError::new(
3472            TransformErrorKind::ExprError,
3473            "expr arg must not be null",
3474        )
3475        .with_path(size_path));
3476    }
3477    let size = value_to_i64(&size_value, &size_path, "size must be a positive integer")?;
3478    if size <= 0 {
3479        return Err(TransformError::new(
3480            TransformErrorKind::ExprError,
3481            "size must be a positive integer",
3482        )
3483        .with_path(size_path));
3484    }
3485    let size = usize::try_from(size).map_err(|_| {
3486        TransformError::new(TransformErrorKind::ExprError, "size is too large").with_path(size_path)
3487    })?;
3488
3489    let mut chunks = Vec::new();
3490    let mut index = 0;
3491    while index < array.len() {
3492        let end = (index + size).min(array.len());
3493        chunks.push(JsonValue::Array(array[index..end].to_vec()));
3494        index = end;
3495    }
3496
3497    Ok(EvalValue::Value(JsonValue::Array(chunks)))
3498}
3499
3500fn eval_array_zip(
3501    args: &[Expr],
3502    injected: Option<&EvalValue>,
3503    record: &JsonValue,
3504    context: Option<&JsonValue>,
3505    out: &JsonValue,
3506    base_path: &str,
3507    locals: Option<&EvalLocals<'_>>,
3508) -> Result<EvalValue, TransformError> {
3509    let total_len = args_len(args, injected);
3510    if total_len < 2 {
3511        return Err(TransformError::new(
3512            TransformErrorKind::ExprError,
3513            "expr.args must contain at least two items",
3514        )
3515        .with_path(format!("{}.args", base_path)));
3516    }
3517
3518    let mut arrays = Vec::new();
3519    for index in 0..total_len {
3520        arrays.push(eval_array_arg(
3521            index, args, injected, record, context, out, base_path, locals,
3522        )?);
3523    }
3524
3525    let min_len = arrays.iter().map(|items| items.len()).min().unwrap_or(0);
3526    let mut results = Vec::with_capacity(min_len);
3527    for idx in 0..min_len {
3528        let mut row = Vec::with_capacity(arrays.len());
3529        for array in &arrays {
3530            row.push(array[idx].clone());
3531        }
3532        results.push(JsonValue::Array(row));
3533    }
3534
3535    Ok(EvalValue::Value(JsonValue::Array(results)))
3536}
3537
3538fn eval_array_zip_with(
3539    args: &[Expr],
3540    injected: Option<&EvalValue>,
3541    record: &JsonValue,
3542    context: Option<&JsonValue>,
3543    out: &JsonValue,
3544    base_path: &str,
3545    locals: Option<&EvalLocals<'_>>,
3546) -> Result<EvalValue, TransformError> {
3547    let total_len = args_len(args, injected);
3548    if total_len < 3 {
3549        return Err(TransformError::new(
3550            TransformErrorKind::ExprError,
3551            "expr.args must contain at least three items",
3552        )
3553        .with_path(format!("{}.args", base_path)));
3554    }
3555
3556    let expr_index = total_len - 1;
3557    let expr = arg_expr_at(expr_index, args, injected).ok_or_else(|| {
3558        TransformError::new(
3559            TransformErrorKind::ExprError,
3560            "expr.args index is out of bounds",
3561        )
3562        .with_path(format!("{}.args[{}]", base_path, expr_index))
3563    })?;
3564    let expr_arg_index = if injected.is_some() {
3565        expr_index - 1
3566    } else {
3567        expr_index
3568    };
3569    let expr_path = format!("{}.args[{}]", base_path, expr_arg_index);
3570
3571    let mut arrays = Vec::new();
3572    for index in 0..expr_index {
3573        arrays.push(eval_array_arg(
3574            index, args, injected, record, context, out, base_path, locals,
3575        )?);
3576    }
3577
3578    let min_len = arrays.iter().map(|items| items.len()).min().unwrap_or(0);
3579    let mut results = Vec::with_capacity(min_len);
3580    for idx in 0..min_len {
3581        let mut row = Vec::with_capacity(arrays.len());
3582        for array in &arrays {
3583            row.push(array[idx].clone());
3584        }
3585        let row_value = JsonValue::Array(row);
3586        let item_locals = locals_with_item(
3587            locals,
3588            EvalItem {
3589                value: &row_value,
3590                index: idx,
3591            },
3592        );
3593        let value = eval_expr_or_null(expr, record, context, out, &expr_path, Some(&item_locals))?;
3594        results.push(value);
3595    }
3596
3597    Ok(EvalValue::Value(JsonValue::Array(results)))
3598}
3599
3600fn eval_array_unzip(
3601    args: &[Expr],
3602    injected: Option<&EvalValue>,
3603    record: &JsonValue,
3604    context: Option<&JsonValue>,
3605    out: &JsonValue,
3606    base_path: &str,
3607    locals: Option<&EvalLocals<'_>>,
3608) -> Result<EvalValue, TransformError> {
3609    let total_len = args_len(args, injected);
3610    if total_len != 1 {
3611        return Err(TransformError::new(
3612            TransformErrorKind::ExprError,
3613            "expr.args must contain exactly one item",
3614        )
3615        .with_path(format!("{}.args", base_path)));
3616    }
3617
3618    let array = eval_array_arg(0, args, injected, record, context, out, base_path, locals)?;
3619    if array.is_empty() {
3620        return Ok(EvalValue::Value(JsonValue::Array(Vec::new())));
3621    }
3622
3623    let mut columns: Vec<Vec<JsonValue>> = Vec::new();
3624    let mut expected_len: Option<usize> = None;
3625    for item in &array {
3626        let items = match item {
3627            JsonValue::Array(items) => items,
3628            _ => {
3629                return Err(TransformError::new(
3630                    TransformErrorKind::ExprError,
3631                    "unzip items must be arrays",
3632                )
3633                .with_path(format!("{}.args[0]", base_path)));
3634            }
3635        };
3636        if let Some(expected) = expected_len {
3637            if items.len() != expected {
3638                return Err(TransformError::new(
3639                    TransformErrorKind::ExprError,
3640                    "unzip items must have the same length",
3641                )
3642                .with_path(format!("{}.args[0]", base_path)));
3643            }
3644        } else {
3645            expected_len = Some(items.len());
3646            columns = vec![Vec::with_capacity(array.len()); items.len()];
3647        }
3648        for (index, value) in items.iter().enumerate() {
3649            if let Some(column) = columns.get_mut(index) {
3650                column.push(value.clone());
3651            }
3652        }
3653    }
3654
3655    let output = columns
3656        .into_iter()
3657        .map(JsonValue::Array)
3658        .collect::<Vec<_>>();
3659    Ok(EvalValue::Value(JsonValue::Array(output)))
3660}
3661
3662fn eval_array_group_by(
3663    args: &[Expr],
3664    injected: Option<&EvalValue>,
3665    record: &JsonValue,
3666    context: Option<&JsonValue>,
3667    out: &JsonValue,
3668    base_path: &str,
3669    locals: Option<&EvalLocals<'_>>,
3670) -> Result<EvalValue, TransformError> {
3671    let total_len = args_len(args, injected);
3672    if total_len != 2 {
3673        return Err(TransformError::new(
3674            TransformErrorKind::ExprError,
3675            "expr.args must contain exactly two items",
3676        )
3677        .with_path(format!("{}.args", base_path)));
3678    }
3679
3680    let array = eval_array_arg(0, args, injected, record, context, out, base_path, locals)?;
3681    let expr = arg_expr_at(1, args, injected).ok_or_else(|| {
3682        TransformError::new(
3683            TransformErrorKind::ExprError,
3684            "expr.args index is out of bounds",
3685        )
3686        .with_path(format!("{}.args[1]", base_path))
3687    })?;
3688    let expr_index = if injected.is_some() { 0 } else { 1 };
3689    let expr_path = format!("{}.args[{}]", base_path, expr_index);
3690
3691    let mut results = Map::new();
3692    for (index, item) in array.iter().enumerate() {
3693        let item_locals = locals_with_item(locals, EvalItem { value: item, index });
3694        let key = eval_key_expr_string(expr, record, context, out, &expr_path, Some(&item_locals))?;
3695        let entry = results
3696            .entry(key)
3697            .or_insert_with(|| JsonValue::Array(Vec::new()));
3698        if let JsonValue::Array(items) = entry {
3699            items.push(item.clone());
3700        }
3701    }
3702
3703    Ok(EvalValue::Value(JsonValue::Object(results)))
3704}
3705
3706fn eval_array_key_by(
3707    args: &[Expr],
3708    injected: Option<&EvalValue>,
3709    record: &JsonValue,
3710    context: Option<&JsonValue>,
3711    out: &JsonValue,
3712    base_path: &str,
3713    locals: Option<&EvalLocals<'_>>,
3714) -> Result<EvalValue, TransformError> {
3715    let total_len = args_len(args, injected);
3716    if total_len != 2 {
3717        return Err(TransformError::new(
3718            TransformErrorKind::ExprError,
3719            "expr.args must contain exactly two items",
3720        )
3721        .with_path(format!("{}.args", base_path)));
3722    }
3723
3724    let array = eval_array_arg(0, args, injected, record, context, out, base_path, locals)?;
3725    let expr = arg_expr_at(1, args, injected).ok_or_else(|| {
3726        TransformError::new(
3727            TransformErrorKind::ExprError,
3728            "expr.args index is out of bounds",
3729        )
3730        .with_path(format!("{}.args[1]", base_path))
3731    })?;
3732    let expr_index = if injected.is_some() { 0 } else { 1 };
3733    let expr_path = format!("{}.args[{}]", base_path, expr_index);
3734
3735    let mut results = Map::new();
3736    for (index, item) in array.iter().enumerate() {
3737        let item_locals = locals_with_item(locals, EvalItem { value: item, index });
3738        let key = eval_key_expr_string(expr, record, context, out, &expr_path, Some(&item_locals))?;
3739        results.insert(key, item.clone());
3740    }
3741
3742    Ok(EvalValue::Value(JsonValue::Object(results)))
3743}
3744
3745fn eval_array_partition(
3746    args: &[Expr],
3747    injected: Option<&EvalValue>,
3748    record: &JsonValue,
3749    context: Option<&JsonValue>,
3750    out: &JsonValue,
3751    base_path: &str,
3752    locals: Option<&EvalLocals<'_>>,
3753) -> Result<EvalValue, TransformError> {
3754    let total_len = args_len(args, injected);
3755    if total_len != 2 {
3756        return Err(TransformError::new(
3757            TransformErrorKind::ExprError,
3758            "expr.args must contain exactly two items",
3759        )
3760        .with_path(format!("{}.args", base_path)));
3761    }
3762
3763    let array = eval_array_arg(0, args, injected, record, context, out, base_path, locals)?;
3764    let expr = arg_expr_at(1, args, injected).ok_or_else(|| {
3765        TransformError::new(
3766            TransformErrorKind::ExprError,
3767            "expr.args index is out of bounds",
3768        )
3769        .with_path(format!("{}.args[1]", base_path))
3770    })?;
3771    let expr_index = if injected.is_some() { 0 } else { 1 };
3772    let expr_path = format!("{}.args[{}]", base_path, expr_index);
3773
3774    let mut matched = Vec::new();
3775    let mut unmatched = Vec::new();
3776    for (index, item) in array.iter().enumerate() {
3777        let item_locals = locals_with_item(locals, EvalItem { value: item, index });
3778        if eval_predicate_expr(expr, record, context, out, &expr_path, Some(&item_locals))? {
3779            matched.push(item.clone());
3780        } else {
3781            unmatched.push(item.clone());
3782        }
3783    }
3784
3785    Ok(EvalValue::Value(JsonValue::Array(vec![
3786        JsonValue::Array(matched),
3787        JsonValue::Array(unmatched),
3788    ])))
3789}
3790
3791fn eval_array_unique(
3792    args: &[Expr],
3793    injected: Option<&EvalValue>,
3794    record: &JsonValue,
3795    context: Option<&JsonValue>,
3796    out: &JsonValue,
3797    base_path: &str,
3798    locals: Option<&EvalLocals<'_>>,
3799) -> Result<EvalValue, TransformError> {
3800    let total_len = args_len(args, injected);
3801    if total_len != 1 {
3802        return Err(TransformError::new(
3803            TransformErrorKind::ExprError,
3804            "expr.args must contain exactly one item",
3805        )
3806        .with_path(format!("{}.args", base_path)));
3807    }
3808
3809    let array = eval_array_arg(0, args, injected, record, context, out, base_path, locals)?;
3810    let item_path = format!("{}.args[0]", base_path);
3811
3812    let mut results: Vec<JsonValue> = Vec::new();
3813    for item in array {
3814        ensure_eq_compatible(&item, &item_path)?;
3815        let mut exists = false;
3816        for existing in &results {
3817            if compare_eq(&item, existing, &item_path, &item_path)? {
3818                exists = true;
3819                break;
3820            }
3821        }
3822        if !exists {
3823            results.push(item);
3824        }
3825    }
3826
3827    Ok(EvalValue::Value(JsonValue::Array(results)))
3828}
3829
3830fn eval_array_distinct_by(
3831    args: &[Expr],
3832    injected: Option<&EvalValue>,
3833    record: &JsonValue,
3834    context: Option<&JsonValue>,
3835    out: &JsonValue,
3836    base_path: &str,
3837    locals: Option<&EvalLocals<'_>>,
3838) -> Result<EvalValue, TransformError> {
3839    let total_len = args_len(args, injected);
3840    if total_len != 2 {
3841        return Err(TransformError::new(
3842            TransformErrorKind::ExprError,
3843            "expr.args must contain exactly two items",
3844        )
3845        .with_path(format!("{}.args", base_path)));
3846    }
3847
3848    let array = eval_array_arg(0, args, injected, record, context, out, base_path, locals)?;
3849    let expr = arg_expr_at(1, args, injected).ok_or_else(|| {
3850        TransformError::new(
3851            TransformErrorKind::ExprError,
3852            "expr.args index is out of bounds",
3853        )
3854        .with_path(format!("{}.args[1]", base_path))
3855    })?;
3856    let expr_index = if injected.is_some() { 0 } else { 1 };
3857    let expr_path = format!("{}.args[{}]", base_path, expr_index);
3858
3859    let mut results = Vec::new();
3860    let mut seen = HashSet::new();
3861    for (index, item) in array.iter().enumerate() {
3862        let item_locals = locals_with_item(locals, EvalItem { value: item, index });
3863        let key = eval_key_expr_string(expr, record, context, out, &expr_path, Some(&item_locals))?;
3864        if seen.insert(key) {
3865            results.push(item.clone());
3866        }
3867    }
3868
3869    Ok(EvalValue::Value(JsonValue::Array(results)))
3870}
3871
3872fn eval_array_sort_by(
3873    args: &[Expr],
3874    injected: Option<&EvalValue>,
3875    record: &JsonValue,
3876    context: Option<&JsonValue>,
3877    out: &JsonValue,
3878    base_path: &str,
3879    locals: Option<&EvalLocals<'_>>,
3880) -> Result<EvalValue, TransformError> {
3881    let total_len = args_len(args, injected);
3882    if !(2..=3).contains(&total_len) {
3883        return Err(TransformError::new(
3884            TransformErrorKind::ExprError,
3885            "expr.args must contain two or three items",
3886        )
3887        .with_path(format!("{}.args", base_path)));
3888    }
3889
3890    let array = eval_array_arg(0, args, injected, record, context, out, base_path, locals)?;
3891    if array.is_empty() {
3892        return Ok(EvalValue::Value(JsonValue::Array(Vec::new())));
3893    }
3894
3895    let expr = arg_expr_at(1, args, injected).ok_or_else(|| {
3896        TransformError::new(
3897            TransformErrorKind::ExprError,
3898            "expr.args index is out of bounds",
3899        )
3900        .with_path(format!("{}.args[1]", base_path))
3901    })?;
3902    let expr_index = if injected.is_some() { 0 } else { 1 };
3903    let expr_path = format!("{}.args[{}]", base_path, expr_index);
3904
3905    let order = if total_len == 3 {
3906        let order_path = format!("{}.args[2]", base_path);
3907        let value =
3908            match eval_arg_string_at(2, args, injected, record, context, out, base_path, locals)? {
3909                None => return Ok(EvalValue::Missing),
3910                Some(value) => value,
3911            };
3912        if value != "asc" && value != "desc" {
3913            return Err(TransformError::new(
3914                TransformErrorKind::ExprError,
3915                "order must be asc or desc",
3916            )
3917            .with_path(order_path));
3918        }
3919        value
3920    } else {
3921        "asc".to_string()
3922    };
3923
3924    struct SortItem {
3925        key: SortKey,
3926        index: usize,
3927        value: JsonValue,
3928    }
3929
3930    let mut items = Vec::with_capacity(array.len());
3931    let mut key_kind: Option<SortKeyKind> = None;
3932    for (index, item) in array.iter().enumerate() {
3933        let item_locals = locals_with_item(locals, EvalItem { value: item, index });
3934        let key = eval_sort_key(expr, record, context, out, &expr_path, Some(&item_locals))?;
3935        let kind = key.kind();
3936        if let Some(existing) = key_kind {
3937            if existing != kind {
3938                return Err(expr_type_error(
3939                    "sort_by keys must be all the same type",
3940                    &expr_path,
3941                ));
3942            }
3943        } else {
3944            key_kind = Some(kind);
3945        }
3946        items.push(SortItem {
3947            key,
3948            index,
3949            value: item.clone(),
3950        });
3951    }
3952
3953    items.sort_by(|left, right| {
3954        let mut ordering = compare_sort_keys(&left.key, &right.key);
3955        if order == "desc" {
3956            ordering = ordering.reverse();
3957        }
3958        if ordering == Ordering::Equal {
3959            left.index.cmp(&right.index)
3960        } else {
3961            ordering
3962        }
3963    });
3964
3965    let results = items.into_iter().map(|item| item.value).collect::<Vec<_>>();
3966    Ok(EvalValue::Value(JsonValue::Array(results)))
3967}
3968
3969fn eval_array_find(
3970    args: &[Expr],
3971    injected: Option<&EvalValue>,
3972    record: &JsonValue,
3973    context: Option<&JsonValue>,
3974    out: &JsonValue,
3975    base_path: &str,
3976    locals: Option<&EvalLocals<'_>>,
3977) -> Result<EvalValue, TransformError> {
3978    let total_len = args_len(args, injected);
3979    if total_len != 2 {
3980        return Err(TransformError::new(
3981            TransformErrorKind::ExprError,
3982            "expr.args must contain exactly two items",
3983        )
3984        .with_path(format!("{}.args", base_path)));
3985    }
3986
3987    let array = eval_array_arg(0, args, injected, record, context, out, base_path, locals)?;
3988    let expr = arg_expr_at(1, args, injected).ok_or_else(|| {
3989        TransformError::new(
3990            TransformErrorKind::ExprError,
3991            "expr.args index is out of bounds",
3992        )
3993        .with_path(format!("{}.args[1]", base_path))
3994    })?;
3995    let expr_index = if injected.is_some() { 0 } else { 1 };
3996    let expr_path = format!("{}.args[{}]", base_path, expr_index);
3997
3998    for (index, item) in array.iter().enumerate() {
3999        let item_locals = locals_with_item(locals, EvalItem { value: item, index });
4000        if eval_predicate_expr(expr, record, context, out, &expr_path, Some(&item_locals))? {
4001            return Ok(EvalValue::Value(item.clone()));
4002        }
4003    }
4004
4005    Ok(EvalValue::Value(JsonValue::Null))
4006}
4007
4008fn eval_array_find_index(
4009    args: &[Expr],
4010    injected: Option<&EvalValue>,
4011    record: &JsonValue,
4012    context: Option<&JsonValue>,
4013    out: &JsonValue,
4014    base_path: &str,
4015    locals: Option<&EvalLocals<'_>>,
4016) -> Result<EvalValue, TransformError> {
4017    let total_len = args_len(args, injected);
4018    if total_len != 2 {
4019        return Err(TransformError::new(
4020            TransformErrorKind::ExprError,
4021            "expr.args must contain exactly two items",
4022        )
4023        .with_path(format!("{}.args", base_path)));
4024    }
4025
4026    let array = eval_array_arg(0, args, injected, record, context, out, base_path, locals)?;
4027    let expr = arg_expr_at(1, args, injected).ok_or_else(|| {
4028        TransformError::new(
4029            TransformErrorKind::ExprError,
4030            "expr.args index is out of bounds",
4031        )
4032        .with_path(format!("{}.args[1]", base_path))
4033    })?;
4034    let expr_index = if injected.is_some() { 0 } else { 1 };
4035    let expr_path = format!("{}.args[{}]", base_path, expr_index);
4036
4037    for (index, item) in array.iter().enumerate() {
4038        let item_locals = locals_with_item(locals, EvalItem { value: item, index });
4039        if eval_predicate_expr(expr, record, context, out, &expr_path, Some(&item_locals))? {
4040            return Ok(EvalValue::Value(JsonValue::Number((index as i64).into())));
4041        }
4042    }
4043
4044    Ok(EvalValue::Value(JsonValue::Number((-1).into())))
4045}
4046
4047fn eval_array_index_of(
4048    args: &[Expr],
4049    injected: Option<&EvalValue>,
4050    record: &JsonValue,
4051    context: Option<&JsonValue>,
4052    out: &JsonValue,
4053    base_path: &str,
4054    locals: Option<&EvalLocals<'_>>,
4055) -> Result<EvalValue, TransformError> {
4056    let total_len = args_len(args, injected);
4057    if total_len != 2 {
4058        return Err(TransformError::new(
4059            TransformErrorKind::ExprError,
4060            "expr.args must contain exactly two items",
4061        )
4062        .with_path(format!("{}.args", base_path)));
4063    }
4064
4065    let array = eval_array_arg(0, args, injected, record, context, out, base_path, locals)?;
4066    let value_path = format!("{}.args[1]", base_path);
4067    let value =
4068        eval_expr_value_or_null_at(1, args, injected, record, context, out, base_path, locals)?;
4069
4070    ensure_eq_compatible(&value, &value_path)?;
4071    let item_path = format!("{}.args[0]", base_path);
4072    for (index, item) in array.iter().enumerate() {
4073        ensure_eq_compatible(item, &item_path)?;
4074        if compare_eq(item, &value, &item_path, &value_path)? {
4075            return Ok(EvalValue::Value(JsonValue::Number((index as i64).into())));
4076        }
4077    }
4078
4079    Ok(EvalValue::Value(JsonValue::Number((-1).into())))
4080}
4081
4082fn eval_array_contains(
4083    args: &[Expr],
4084    injected: Option<&EvalValue>,
4085    record: &JsonValue,
4086    context: Option<&JsonValue>,
4087    out: &JsonValue,
4088    base_path: &str,
4089    locals: Option<&EvalLocals<'_>>,
4090) -> Result<EvalValue, TransformError> {
4091    let total_len = args_len(args, injected);
4092    if total_len != 2 {
4093        return Err(TransformError::new(
4094            TransformErrorKind::ExprError,
4095            "expr.args must contain exactly two items",
4096        )
4097        .with_path(format!("{}.args", base_path)));
4098    }
4099
4100    let array = eval_array_arg(0, args, injected, record, context, out, base_path, locals)?;
4101    let value_path = format!("{}.args[1]", base_path);
4102    let value =
4103        eval_expr_value_or_null_at(1, args, injected, record, context, out, base_path, locals)?;
4104
4105    ensure_eq_compatible(&value, &value_path)?;
4106    let item_path = format!("{}.args[0]", base_path);
4107    for item in &array {
4108        ensure_eq_compatible(item, &item_path)?;
4109        if compare_eq(item, &value, &item_path, &value_path)? {
4110            return Ok(EvalValue::Value(JsonValue::Bool(true)));
4111        }
4112    }
4113
4114    Ok(EvalValue::Value(JsonValue::Bool(false)))
4115}
4116
4117fn eval_array_sum(
4118    args: &[Expr],
4119    injected: Option<&EvalValue>,
4120    record: &JsonValue,
4121    context: Option<&JsonValue>,
4122    out: &JsonValue,
4123    base_path: &str,
4124    locals: Option<&EvalLocals<'_>>,
4125) -> Result<EvalValue, TransformError> {
4126    let total_len = args_len(args, injected);
4127    if total_len != 1 {
4128        return Err(TransformError::new(
4129            TransformErrorKind::ExprError,
4130            "expr.args must contain exactly one item",
4131        )
4132        .with_path(format!("{}.args", base_path)));
4133    }
4134
4135    let array = eval_array_arg(0, args, injected, record, context, out, base_path, locals)?;
4136    if array.is_empty() {
4137        return Ok(EvalValue::Value(JsonValue::Null));
4138    }
4139
4140    let item_path = format!("{}.args[0]", base_path);
4141    let mut sum = 0.0;
4142    for item in &array {
4143        let value = value_to_number(item, &item_path, "array item must be a number")?;
4144        sum += value;
4145    }
4146
4147    Ok(EvalValue::Value(json_number_from_f64(sum, base_path)?))
4148}
4149
4150fn eval_array_avg(
4151    args: &[Expr],
4152    injected: Option<&EvalValue>,
4153    record: &JsonValue,
4154    context: Option<&JsonValue>,
4155    out: &JsonValue,
4156    base_path: &str,
4157    locals: Option<&EvalLocals<'_>>,
4158) -> Result<EvalValue, TransformError> {
4159    let total_len = args_len(args, injected);
4160    if total_len != 1 {
4161        return Err(TransformError::new(
4162            TransformErrorKind::ExprError,
4163            "expr.args must contain exactly one item",
4164        )
4165        .with_path(format!("{}.args", base_path)));
4166    }
4167
4168    let array = eval_array_arg(0, args, injected, record, context, out, base_path, locals)?;
4169    if array.is_empty() {
4170        return Ok(EvalValue::Value(JsonValue::Null));
4171    }
4172
4173    let item_path = format!("{}.args[0]", base_path);
4174    let mut sum = 0.0;
4175    for item in &array {
4176        let value = value_to_number(item, &item_path, "array item must be a number")?;
4177        sum += value;
4178    }
4179    let avg = sum / array.len() as f64;
4180
4181    Ok(EvalValue::Value(json_number_from_f64(avg, base_path)?))
4182}
4183
4184fn eval_array_min(
4185    args: &[Expr],
4186    injected: Option<&EvalValue>,
4187    record: &JsonValue,
4188    context: Option<&JsonValue>,
4189    out: &JsonValue,
4190    base_path: &str,
4191    locals: Option<&EvalLocals<'_>>,
4192) -> Result<EvalValue, TransformError> {
4193    let total_len = args_len(args, injected);
4194    if total_len != 1 {
4195        return Err(TransformError::new(
4196            TransformErrorKind::ExprError,
4197            "expr.args must contain exactly one item",
4198        )
4199        .with_path(format!("{}.args", base_path)));
4200    }
4201
4202    let array = eval_array_arg(0, args, injected, record, context, out, base_path, locals)?;
4203    if array.is_empty() {
4204        return Ok(EvalValue::Value(JsonValue::Null));
4205    }
4206
4207    let item_path = format!("{}.args[0]", base_path);
4208    let mut min_value: Option<f64> = None;
4209    for item in &array {
4210        let value = value_to_number(item, &item_path, "array item must be a number")?;
4211        min_value = Some(match min_value {
4212            Some(current) => current.min(value),
4213            None => value,
4214        });
4215    }
4216
4217    Ok(EvalValue::Value(json_number_from_f64(
4218        min_value.unwrap_or(0.0),
4219        base_path,
4220    )?))
4221}
4222
4223fn eval_array_max(
4224    args: &[Expr],
4225    injected: Option<&EvalValue>,
4226    record: &JsonValue,
4227    context: Option<&JsonValue>,
4228    out: &JsonValue,
4229    base_path: &str,
4230    locals: Option<&EvalLocals<'_>>,
4231) -> Result<EvalValue, TransformError> {
4232    let total_len = args_len(args, injected);
4233    if total_len != 1 {
4234        return Err(TransformError::new(
4235            TransformErrorKind::ExprError,
4236            "expr.args must contain exactly one item",
4237        )
4238        .with_path(format!("{}.args", base_path)));
4239    }
4240
4241    let array = eval_array_arg(0, args, injected, record, context, out, base_path, locals)?;
4242    if array.is_empty() {
4243        return Ok(EvalValue::Value(JsonValue::Null));
4244    }
4245
4246    let item_path = format!("{}.args[0]", base_path);
4247    let mut max_value: Option<f64> = None;
4248    for item in &array {
4249        let value = value_to_number(item, &item_path, "array item must be a number")?;
4250        max_value = Some(match max_value {
4251            Some(current) => current.max(value),
4252            None => value,
4253        });
4254    }
4255
4256    Ok(EvalValue::Value(json_number_from_f64(
4257        max_value.unwrap_or(0.0),
4258        base_path,
4259    )?))
4260}
4261
4262fn eval_array_reduce(
4263    args: &[Expr],
4264    injected: Option<&EvalValue>,
4265    record: &JsonValue,
4266    context: Option<&JsonValue>,
4267    out: &JsonValue,
4268    base_path: &str,
4269    locals: Option<&EvalLocals<'_>>,
4270) -> Result<EvalValue, TransformError> {
4271    let total_len = args_len(args, injected);
4272    if total_len != 2 {
4273        return Err(TransformError::new(
4274            TransformErrorKind::ExprError,
4275            "expr.args must contain exactly two items",
4276        )
4277        .with_path(format!("{}.args", base_path)));
4278    }
4279
4280    let array = eval_array_arg(0, args, injected, record, context, out, base_path, locals)?;
4281    if array.is_empty() {
4282        return Ok(EvalValue::Value(JsonValue::Null));
4283    }
4284
4285    let expr = arg_expr_at(1, args, injected).ok_or_else(|| {
4286        TransformError::new(
4287            TransformErrorKind::ExprError,
4288            "expr.args index is out of bounds",
4289        )
4290        .with_path(format!("{}.args[1]", base_path))
4291    })?;
4292    let expr_index = if injected.is_some() { 0 } else { 1 };
4293    let expr_path = format!("{}.args[{}]", base_path, expr_index);
4294
4295    let mut acc = array[0].clone();
4296    for (index, item) in array.iter().enumerate().skip(1) {
4297        let item_locals = EvalLocals {
4298            item: Some(EvalItem { value: item, index }),
4299            acc: Some(&acc),
4300            pipe: locals.and_then(|locals| locals.pipe),
4301            locals: locals.and_then(|locals| locals.locals),
4302        };
4303        let value = eval_expr_or_null(expr, record, context, out, &expr_path, Some(&item_locals))?;
4304        acc = value;
4305    }
4306
4307    Ok(EvalValue::Value(acc))
4308}
4309
4310fn eval_array_fold(
4311    args: &[Expr],
4312    injected: Option<&EvalValue>,
4313    record: &JsonValue,
4314    context: Option<&JsonValue>,
4315    out: &JsonValue,
4316    base_path: &str,
4317    locals: Option<&EvalLocals<'_>>,
4318) -> Result<EvalValue, TransformError> {
4319    let total_len = args_len(args, injected);
4320    if total_len != 3 {
4321        return Err(TransformError::new(
4322            TransformErrorKind::ExprError,
4323            "expr.args must contain exactly three items",
4324        )
4325        .with_path(format!("{}.args", base_path)));
4326    }
4327
4328    let array = eval_array_arg(0, args, injected, record, context, out, base_path, locals)?;
4329    let initial =
4330        match eval_expr_at_index(1, args, injected, record, context, out, base_path, locals)? {
4331            EvalValue::Missing => return Ok(EvalValue::Missing),
4332            EvalValue::Value(value) => value,
4333        };
4334
4335    let expr = arg_expr_at(2, args, injected).ok_or_else(|| {
4336        TransformError::new(
4337            TransformErrorKind::ExprError,
4338            "expr.args index is out of bounds",
4339        )
4340        .with_path(format!("{}.args[2]", base_path))
4341    })?;
4342    let expr_index = if injected.is_some() { 1 } else { 2 };
4343    let expr_path = format!("{}.args[{}]", base_path, expr_index);
4344
4345    let mut acc = initial;
4346    for (index, item) in array.iter().enumerate() {
4347        let item_locals = EvalLocals {
4348            item: Some(EvalItem { value: item, index }),
4349            acc: Some(&acc),
4350            pipe: locals.and_then(|locals| locals.pipe),
4351            locals: locals.and_then(|locals| locals.locals),
4352        };
4353        let value = eval_expr_or_null(expr, record, context, out, &expr_path, Some(&item_locals))?;
4354        acc = value;
4355    }
4356
4357    Ok(EvalValue::Value(acc))
4358}
4359
4360fn eval_json_merge(
4361    args: &[Expr],
4362    injected: Option<&EvalValue>,
4363    record: &JsonValue,
4364    context: Option<&JsonValue>,
4365    out: &JsonValue,
4366    base_path: &str,
4367    deep: bool,
4368    locals: Option<&EvalLocals<'_>>,
4369) -> Result<EvalValue, TransformError> {
4370    let total_len = args_len(args, injected);
4371    if total_len < 2 {
4372        return Err(TransformError::new(
4373            TransformErrorKind::ExprError,
4374            "expr.args must contain at least two items",
4375        )
4376        .with_path(format!("{}.args", base_path)));
4377    }
4378
4379    let mut result: Option<Map<String, JsonValue>> = None;
4380    for index in 0..total_len {
4381        let arg_path = format!("{}.args[{}]", base_path, index);
4382        let value = eval_expr_at_index(
4383            index, args, injected, record, context, out, base_path, locals,
4384        )?;
4385        let value = match value {
4386            EvalValue::Missing => continue,
4387            EvalValue::Value(value) => value,
4388        };
4389        if value.is_null() {
4390            return Err(TransformError::new(
4391                TransformErrorKind::ExprError,
4392                "expr arg must not be null",
4393            )
4394            .with_path(arg_path));
4395        }
4396        let obj = match value {
4397            JsonValue::Object(map) => map,
4398            _ => {
4399                return Err(TransformError::new(
4400                    TransformErrorKind::ExprError,
4401                    "expr arg must be object",
4402                )
4403                .with_path(arg_path));
4404            }
4405        };
4406
4407        match result {
4408            Some(ref mut existing) => merge_object(existing, &obj, deep),
4409            None => result = Some(obj),
4410        }
4411    }
4412
4413    match result {
4414        Some(map) => Ok(EvalValue::Value(JsonValue::Object(map))),
4415        None => Ok(EvalValue::Missing),
4416    }
4417}
4418
4419fn eval_json_get(
4420    args: &[Expr],
4421    injected: Option<&EvalValue>,
4422    record: &JsonValue,
4423    context: Option<&JsonValue>,
4424    out: &JsonValue,
4425    base_path: &str,
4426    locals: Option<&EvalLocals<'_>>,
4427) -> Result<EvalValue, TransformError> {
4428    let total_len = args_len(args, injected);
4429    if total_len != 2 {
4430        return Err(TransformError::new(
4431            TransformErrorKind::ExprError,
4432            "expr.args must contain exactly two items",
4433        )
4434        .with_path(format!("{}.args", base_path)));
4435    }
4436
4437    let base_value =
4438        eval_expr_at_index(0, args, injected, record, context, out, base_path, locals)?;
4439    let base_value = match base_value {
4440        EvalValue::Missing => return Ok(EvalValue::Missing),
4441        EvalValue::Value(value) => value,
4442    };
4443    if base_value.is_null() {
4444        return Ok(EvalValue::Missing);
4445    }
4446
4447    let path_path = format!("{}.args[1]", base_path);
4448    let path_value =
4449        eval_expr_at_index(1, args, injected, record, context, out, base_path, locals)?;
4450    let path_value = match path_value {
4451        EvalValue::Missing => return Ok(EvalValue::Missing),
4452        EvalValue::Value(value) => value,
4453    };
4454    if path_value.is_null() {
4455        return Err(TransformError::new(
4456            TransformErrorKind::ExprError,
4457            "expr arg must not be null",
4458        )
4459        .with_path(path_path));
4460    }
4461    let path = value_as_string(&path_value, &path_path)?;
4462    if path.is_empty() {
4463        return Err(TransformError::new(
4464            TransformErrorKind::ExprError,
4465            "path must be a non-empty string",
4466        )
4467        .with_path(path_path));
4468    }
4469    let tokens = parse_path_tokens(&path, TransformErrorKind::ExprError, &path_path)?;
4470    match get_path(&base_value, &tokens) {
4471        Some(value) => Ok(EvalValue::Value(value.clone())),
4472        None => Ok(EvalValue::Missing),
4473    }
4474}
4475
4476fn eval_json_pick(
4477    args: &[Expr],
4478    injected: Option<&EvalValue>,
4479    record: &JsonValue,
4480    context: Option<&JsonValue>,
4481    out: &JsonValue,
4482    base_path: &str,
4483    locals: Option<&EvalLocals<'_>>,
4484) -> Result<EvalValue, TransformError> {
4485    let total_len = args_len(args, injected);
4486    if total_len != 2 {
4487        return Err(TransformError::new(
4488            TransformErrorKind::ExprError,
4489            "expr.args must contain exactly two items",
4490        )
4491        .with_path(format!("{}.args", base_path)));
4492    }
4493
4494    let base_path_arg = format!("{}.args[0]", base_path);
4495    let base_value =
4496        eval_expr_at_index(0, args, injected, record, context, out, base_path, locals)?;
4497    let base_value = match base_value {
4498        EvalValue::Missing => return Ok(EvalValue::Missing),
4499        EvalValue::Value(value) => value,
4500    };
4501    if base_value.is_null() {
4502        return Err(TransformError::new(
4503            TransformErrorKind::ExprError,
4504            "expr arg must not be null",
4505        )
4506        .with_path(base_path_arg));
4507    }
4508    let base_obj = match base_value {
4509        JsonValue::Object(map) => map,
4510        _ => {
4511            return Err(TransformError::new(
4512                TransformErrorKind::ExprError,
4513                "expr arg must be object",
4514            )
4515            .with_path(base_path_arg));
4516        }
4517    };
4518    let base_value = JsonValue::Object(base_obj);
4519
4520    let paths = eval_json_paths_arg(
4521        args, injected, record, context, out, base_path, locals, 1, true,
4522    )?;
4523    let Some(paths) = paths else {
4524        return Ok(EvalValue::Missing);
4525    };
4526
4527    let mut output = JsonValue::Object(Map::new());
4528    for tokens in paths {
4529        if let Some(value) = get_path(&base_value, &tokens) {
4530            set_path_with_indexes(&mut output, &tokens, value.clone(), base_path)?;
4531        }
4532    }
4533
4534    Ok(EvalValue::Value(output))
4535}
4536
4537fn eval_json_omit(
4538    args: &[Expr],
4539    injected: Option<&EvalValue>,
4540    record: &JsonValue,
4541    context: Option<&JsonValue>,
4542    out: &JsonValue,
4543    base_path: &str,
4544    locals: Option<&EvalLocals<'_>>,
4545) -> Result<EvalValue, TransformError> {
4546    let total_len = args_len(args, injected);
4547    if total_len != 2 {
4548        return Err(TransformError::new(
4549            TransformErrorKind::ExprError,
4550            "expr.args must contain exactly two items",
4551        )
4552        .with_path(format!("{}.args", base_path)));
4553    }
4554
4555    let base_path_arg = format!("{}.args[0]", base_path);
4556    let base_value =
4557        eval_expr_at_index(0, args, injected, record, context, out, base_path, locals)?;
4558    let mut base_value = match base_value {
4559        EvalValue::Missing => return Ok(EvalValue::Missing),
4560        EvalValue::Value(value) => value,
4561    };
4562    if base_value.is_null() {
4563        return Err(TransformError::new(
4564            TransformErrorKind::ExprError,
4565            "expr arg must not be null",
4566        )
4567        .with_path(base_path_arg));
4568    }
4569    let base_obj = match base_value {
4570        JsonValue::Object(map) => map,
4571        _ => {
4572            return Err(TransformError::new(
4573                TransformErrorKind::ExprError,
4574                "expr arg must be object",
4575            )
4576            .with_path(base_path_arg));
4577        }
4578    };
4579    base_value = JsonValue::Object(base_obj);
4580
4581    let paths = eval_json_paths_arg(
4582        args, injected, record, context, out, base_path, locals, 1, false,
4583    )?;
4584    let Some(paths) = paths else {
4585        return Ok(EvalValue::Missing);
4586    };
4587
4588    for tokens in paths {
4589        remove_path(&mut base_value, &tokens);
4590    }
4591
4592    Ok(EvalValue::Value(base_value))
4593}
4594
4595fn eval_json_keys(
4596    args: &[Expr],
4597    injected: Option<&EvalValue>,
4598    record: &JsonValue,
4599    context: Option<&JsonValue>,
4600    out: &JsonValue,
4601    base_path: &str,
4602    locals: Option<&EvalLocals<'_>>,
4603) -> Result<EvalValue, TransformError> {
4604    eval_json_object_unary(
4605        args,
4606        injected,
4607        record,
4608        context,
4609        out,
4610        base_path,
4611        locals,
4612        |map| {
4613            Ok(JsonValue::Array(
4614                map.keys().cloned().map(JsonValue::String).collect(),
4615            ))
4616        },
4617    )
4618}
4619
4620fn eval_json_values(
4621    args: &[Expr],
4622    injected: Option<&EvalValue>,
4623    record: &JsonValue,
4624    context: Option<&JsonValue>,
4625    out: &JsonValue,
4626    base_path: &str,
4627    locals: Option<&EvalLocals<'_>>,
4628) -> Result<EvalValue, TransformError> {
4629    eval_json_object_unary(
4630        args,
4631        injected,
4632        record,
4633        context,
4634        out,
4635        base_path,
4636        locals,
4637        |map| Ok(JsonValue::Array(map.values().cloned().collect())),
4638    )
4639}
4640
4641fn eval_json_entries(
4642    args: &[Expr],
4643    injected: Option<&EvalValue>,
4644    record: &JsonValue,
4645    context: Option<&JsonValue>,
4646    out: &JsonValue,
4647    base_path: &str,
4648    locals: Option<&EvalLocals<'_>>,
4649) -> Result<EvalValue, TransformError> {
4650    eval_json_object_unary(
4651        args,
4652        injected,
4653        record,
4654        context,
4655        out,
4656        base_path,
4657        locals,
4658        |map| {
4659            let mut entries = Vec::with_capacity(map.len());
4660            for (key, value) in map {
4661                let mut entry = Map::new();
4662                entry.insert("key".to_string(), JsonValue::String(key.clone()));
4663                entry.insert("value".to_string(), value.clone());
4664                entries.push(JsonValue::Object(entry));
4665            }
4666            Ok(JsonValue::Array(entries))
4667        },
4668    )
4669}
4670
4671fn eval_len(
4672    args: &[Expr],
4673    injected: Option<&EvalValue>,
4674    record: &JsonValue,
4675    context: Option<&JsonValue>,
4676    out: &JsonValue,
4677    base_path: &str,
4678    locals: Option<&EvalLocals<'_>>,
4679) -> Result<EvalValue, TransformError> {
4680    let total_len = args_len(args, injected);
4681    if total_len != 1 {
4682        return Err(TransformError::new(
4683            TransformErrorKind::ExprError,
4684            "expr.args must contain exactly one item",
4685        )
4686        .with_path(format!("{}.args", base_path)));
4687    }
4688
4689    let arg_path = format!("{}.args[0]", base_path);
4690    let value = eval_expr_at_index(0, args, injected, record, context, out, base_path, locals)?;
4691    let value = match value {
4692        EvalValue::Missing => return Ok(EvalValue::Missing),
4693        EvalValue::Value(value) => value,
4694    };
4695    if value.is_null() {
4696        return Err(TransformError::new(
4697            TransformErrorKind::ExprError,
4698            "expr arg must not be null",
4699        )
4700        .with_path(arg_path));
4701    }
4702
4703    let len = match value {
4704        JsonValue::String(value) => value.chars().count(),
4705        JsonValue::Array(items) => items.len(),
4706        JsonValue::Object(map) => map.len(),
4707        _ => {
4708            return Err(TransformError::new(
4709                TransformErrorKind::ExprError,
4710                "expr arg must be string, array, or object",
4711            )
4712            .with_path(arg_path));
4713        }
4714    };
4715
4716    Ok(EvalValue::Value(JsonValue::Number(
4717        serde_json::Number::from(len as u64),
4718    )))
4719}
4720
4721fn eval_json_from_entries(
4722    args: &[Expr],
4723    injected: Option<&EvalValue>,
4724    record: &JsonValue,
4725    context: Option<&JsonValue>,
4726    out: &JsonValue,
4727    base_path: &str,
4728    locals: Option<&EvalLocals<'_>>,
4729) -> Result<EvalValue, TransformError> {
4730    let total_len = args_len(args, injected);
4731    if !(1..=2).contains(&total_len) {
4732        return Err(TransformError::new(
4733            TransformErrorKind::ExprError,
4734            "expr.args must contain one or two items",
4735        )
4736        .with_path(format!("{}.args", base_path)));
4737    }
4738
4739    let arg_path = format!("{}.args[0]", base_path);
4740    let first_value =
4741        eval_expr_at_index(0, args, injected, record, context, out, base_path, locals)?;
4742    let first_value = match first_value {
4743        EvalValue::Missing => return Ok(EvalValue::Missing),
4744        EvalValue::Value(value) => value,
4745    };
4746    if first_value.is_null() {
4747        return Err(TransformError::new(
4748            TransformErrorKind::ExprError,
4749            "expr arg must not be null",
4750        )
4751        .with_path(&arg_path));
4752    }
4753
4754    if total_len == 1 {
4755        return match first_value {
4756            JsonValue::Object(map) => Ok(EvalValue::Value(JsonValue::Object(map))),
4757            JsonValue::Array(items) => {
4758                let mut output = Map::new();
4759                for (index, item) in items.iter().enumerate() {
4760                    let entry_path = format!("{}[{}]", arg_path, index);
4761                    match item {
4762                        JsonValue::Array(pair) => {
4763                            if pair.len() != 2 {
4764                                return Err(TransformError::new(
4765                                    TransformErrorKind::ExprError,
4766                                    "entries must have exactly two items",
4767                                )
4768                                .with_path(&entry_path));
4769                            }
4770                            let key_path = format!("{}[0]", entry_path);
4771                            let key = value_to_string(&pair[0], &key_path)?;
4772                            let value = pair[1].clone();
4773                            output.insert(key, value);
4774                        }
4775                        JsonValue::Object(map) => {
4776                            let key_path = format!("{}.key", entry_path);
4777                            let value_path = format!("{}.value", entry_path);
4778                            let key_value = map.get("key").ok_or_else(|| {
4779                                TransformError::new(
4780                                    TransformErrorKind::ExprError,
4781                                    "entry must contain key",
4782                                )
4783                                .with_path(&key_path)
4784                            })?;
4785                            if key_value.is_null() {
4786                                return Err(TransformError::new(
4787                                    TransformErrorKind::ExprError,
4788                                    "entry key must not be null",
4789                                )
4790                                .with_path(&key_path));
4791                            }
4792                            let value_value = map.get("value").ok_or_else(|| {
4793                                TransformError::new(
4794                                    TransformErrorKind::ExprError,
4795                                    "entry must contain value",
4796                                )
4797                                .with_path(&value_path)
4798                            })?;
4799                            let key = value_to_string(key_value, &key_path)?;
4800                            output.insert(key, value_value.clone());
4801                        }
4802                        _ => {
4803                            return Err(TransformError::new(
4804                                TransformErrorKind::ExprError,
4805                                "entries must be arrays or objects",
4806                            )
4807                            .with_path(&entry_path));
4808                        }
4809                    }
4810                }
4811                Ok(EvalValue::Value(JsonValue::Object(output)))
4812            }
4813            _ => Err(TransformError::new(
4814                TransformErrorKind::ExprError,
4815                "expr arg must be object or array",
4816            )
4817            .with_path(arg_path)),
4818        };
4819    }
4820
4821    let key = value_to_string(&first_value, &arg_path)?;
4822    let value =
4823        match eval_expr_at_index(1, args, injected, record, context, out, base_path, locals)? {
4824            EvalValue::Missing => return Ok(EvalValue::Missing),
4825            EvalValue::Value(value) => value,
4826        };
4827    let mut output = Map::new();
4828    output.insert(key, value);
4829    Ok(EvalValue::Value(JsonValue::Object(output)))
4830}
4831
4832fn eval_json_object_flatten(
4833    args: &[Expr],
4834    injected: Option<&EvalValue>,
4835    record: &JsonValue,
4836    context: Option<&JsonValue>,
4837    out: &JsonValue,
4838    base_path: &str,
4839    locals: Option<&EvalLocals<'_>>,
4840) -> Result<EvalValue, TransformError> {
4841    eval_json_object_unary(
4842        args,
4843        injected,
4844        record,
4845        context,
4846        out,
4847        base_path,
4848        locals,
4849        |map| {
4850            let mut output = Map::new();
4851            let mut tokens = Vec::new();
4852            flatten_object(map, &mut tokens, &mut output, base_path)?;
4853            Ok(JsonValue::Object(output))
4854        },
4855    )
4856}
4857
4858fn eval_json_object_unflatten(
4859    args: &[Expr],
4860    injected: Option<&EvalValue>,
4861    record: &JsonValue,
4862    context: Option<&JsonValue>,
4863    out: &JsonValue,
4864    base_path: &str,
4865    locals: Option<&EvalLocals<'_>>,
4866) -> Result<EvalValue, TransformError> {
4867    eval_json_object_unary(
4868        args,
4869        injected,
4870        record,
4871        context,
4872        out,
4873        base_path,
4874        locals,
4875        |map| {
4876            let mut paths = Vec::with_capacity(map.len());
4877            let mut values = Vec::with_capacity(map.len());
4878            for (key, value) in map {
4879                let tokens = parse_path_tokens(
4880                    key,
4881                    TransformErrorKind::ExprError,
4882                    format!("{}.args[0]", base_path),
4883                )?;
4884                if tokens
4885                    .iter()
4886                    .any(|token| matches!(token, PathToken::Index(_)))
4887                {
4888                    return Err(TransformError::new(
4889                        TransformErrorKind::ExprError,
4890                        "array indexes are not allowed in path",
4891                    )
4892                    .with_path(format!("{}.args[0]", base_path)));
4893                }
4894                if has_path_conflict(&paths, &tokens) {
4895                    return Err(TransformError::new(
4896                        TransformErrorKind::ExprError,
4897                        "path conflicts with another path",
4898                    )
4899                    .with_path(format!("{}.args[0]", base_path)));
4900                }
4901                paths.push(tokens);
4902                values.push(value.clone());
4903            }
4904
4905            let mut root = JsonValue::Object(Map::new());
4906            for (tokens, value) in paths.into_iter().zip(values) {
4907                set_path_object_only(&mut root, &tokens, value, base_path)?;
4908            }
4909
4910            Ok(root)
4911        },
4912    )
4913}
4914
4915fn eval_json_object_unary<F>(
4916    args: &[Expr],
4917    injected: Option<&EvalValue>,
4918    record: &JsonValue,
4919    context: Option<&JsonValue>,
4920    out: &JsonValue,
4921    base_path: &str,
4922    locals: Option<&EvalLocals<'_>>,
4923    op: F,
4924) -> Result<EvalValue, TransformError>
4925where
4926    F: FnOnce(&Map<String, JsonValue>) -> Result<JsonValue, TransformError>,
4927{
4928    let total_len = args_len(args, injected);
4929    if total_len != 1 {
4930        return Err(TransformError::new(
4931            TransformErrorKind::ExprError,
4932            "expr.args must contain exactly one item",
4933        )
4934        .with_path(format!("{}.args", base_path)));
4935    }
4936
4937    let arg_path = format!("{}.args[0]", base_path);
4938    let value = eval_expr_at_index(0, args, injected, record, context, out, base_path, locals)?;
4939    let value = match value {
4940        EvalValue::Missing => return Ok(EvalValue::Missing),
4941        EvalValue::Value(value) => value,
4942    };
4943    if value.is_null() {
4944        return Err(TransformError::new(
4945            TransformErrorKind::ExprError,
4946            "expr arg must not be null",
4947        )
4948        .with_path(arg_path));
4949    }
4950    let map = match value {
4951        JsonValue::Object(map) => map,
4952        _ => {
4953            return Err(TransformError::new(
4954                TransformErrorKind::ExprError,
4955                "expr arg must be object",
4956            )
4957            .with_path(arg_path));
4958        }
4959    };
4960
4961    op(&map).map(EvalValue::Value)
4962}
4963
4964fn eval_json_paths_arg(
4965    args: &[Expr],
4966    injected: Option<&EvalValue>,
4967    record: &JsonValue,
4968    context: Option<&JsonValue>,
4969    out: &JsonValue,
4970    base_path: &str,
4971    locals: Option<&EvalLocals<'_>>,
4972    index: usize,
4973    allow_terminal_index: bool,
4974) -> Result<Option<Vec<Vec<PathToken>>>, TransformError> {
4975    let arg_path = format!("{}.args[{}]", base_path, index);
4976    let value = eval_expr_at_index(
4977        index, args, injected, record, context, out, base_path, locals,
4978    )?;
4979    let value = match value {
4980        EvalValue::Missing => return Ok(None),
4981        EvalValue::Value(value) => value,
4982    };
4983    if value.is_null() {
4984        return Err(TransformError::new(
4985            TransformErrorKind::ExprError,
4986            "expr arg must not be null",
4987        )
4988        .with_path(arg_path));
4989    }
4990    let items: Vec<(String, String)> = match value {
4991        JsonValue::String(path) => vec![(arg_path.clone(), path)],
4992        JsonValue::Array(items) => items
4993            .iter()
4994            .enumerate()
4995            .map(|(path_index, item)| {
4996                let item_path = format!("{}.args[{}][{}]", base_path, index, path_index);
4997                let path = item.as_str().ok_or_else(|| {
4998                    TransformError::new(
4999                        TransformErrorKind::ExprError,
5000                        "paths must be a string or array of strings",
5001                    )
5002                    .with_path(&item_path)
5003                })?;
5004                Ok::<(String, String), TransformError>((item_path, path.to_string()))
5005            })
5006            .collect::<Result<Vec<_>, TransformError>>()?,
5007        _ => {
5008            return Err(TransformError::new(
5009                TransformErrorKind::ExprError,
5010                "paths must be a string or array of strings",
5011            )
5012            .with_path(arg_path));
5013        }
5014    };
5015
5016    let mut paths = Vec::new();
5017    for (item_path, path) in items {
5018        let tokens = parse_path_tokens(&path, TransformErrorKind::ExprError, &item_path)?;
5019        if !allow_terminal_index && matches!(tokens.last(), Some(PathToken::Index(_))) {
5020            return Err(TransformError::new(
5021                TransformErrorKind::ExprError,
5022                "path must not end with array index",
5023            )
5024            .with_path(item_path));
5025        }
5026        if has_duplicate_path(&paths, &tokens) {
5027            continue;
5028        }
5029        if has_path_conflict(&paths, &tokens) {
5030            return Err(TransformError::new(
5031                TransformErrorKind::ExprError,
5032                "path conflicts with another path",
5033            )
5034            .with_path(item_path));
5035        }
5036        paths.push(tokens);
5037    }
5038
5039    Ok(Some(paths))
5040}
5041
5042fn has_duplicate_path(paths: &[Vec<PathToken>], tokens: &[PathToken]) -> bool {
5043    paths.iter().any(|existing| existing == tokens)
5044}
5045
5046fn has_path_conflict(paths: &[Vec<PathToken>], tokens: &[PathToken]) -> bool {
5047    paths
5048        .iter()
5049        .any(|existing| is_path_prefix(existing, tokens) || is_path_prefix(tokens, existing))
5050}
5051
5052fn is_path_prefix(prefix: &[PathToken], tokens: &[PathToken]) -> bool {
5053    if prefix.len() > tokens.len() {
5054        return false;
5055    }
5056    prefix.iter().zip(tokens).all(|(left, right)| left == right)
5057}
5058
5059fn merge_object(
5060    target: &mut Map<String, JsonValue>,
5061    incoming: &Map<String, JsonValue>,
5062    deep: bool,
5063) {
5064    for (key, value) in incoming {
5065        if deep {
5066            if let (Some(JsonValue::Object(target_obj)), JsonValue::Object(incoming_obj)) =
5067                (target.get_mut(key), value)
5068            {
5069                merge_object(target_obj, incoming_obj, true);
5070                continue;
5071            }
5072        }
5073        target.insert(key.clone(), value.clone());
5074    }
5075}
5076
5077fn flatten_object(
5078    map: &Map<String, JsonValue>,
5079    tokens: &mut Vec<PathToken>,
5080    output: &mut Map<String, JsonValue>,
5081    base_path: &str,
5082) -> Result<(), TransformError> {
5083    for (key, value) in map {
5084        if key.is_empty() {
5085            return Err(TransformError::new(
5086                TransformErrorKind::ExprError,
5087                "object_flatten does not support empty keys",
5088            )
5089            .with_path(format!("{}.args[0]", base_path)));
5090        }
5091        if key.contains('[') || key.contains(']') {
5092            return Err(TransformError::new(
5093                TransformErrorKind::ExprError,
5094                "object_flatten does not support keys with '[' or ']'",
5095            )
5096            .with_path(format!("{}.args[0]", base_path)));
5097        }
5098        tokens.push(PathToken::Key(key.clone()));
5099        match value {
5100            JsonValue::Object(child) => {
5101                if child.is_empty() {
5102                    let path = format_path_tokens(tokens);
5103                    output.insert(path, JsonValue::Object(Map::new()));
5104                } else {
5105                    flatten_object(child, tokens, output, base_path)?;
5106                }
5107            }
5108            _ => {
5109                let path = format_path_tokens(tokens);
5110                output.insert(path, value.clone());
5111            }
5112        }
5113        tokens.pop();
5114    }
5115    Ok(())
5116}
5117
5118fn format_path_tokens(tokens: &[PathToken]) -> String {
5119    let mut path = String::new();
5120    for token in tokens {
5121        match token {
5122            PathToken::Key(key) => {
5123                if needs_bracket_quote(key) {
5124                    let escaped = key.replace('\\', "\\\\").replace('"', "\\\"");
5125                    path.push('[');
5126                    path.push('"');
5127                    path.push_str(&escaped);
5128                    path.push('"');
5129                    path.push(']');
5130                } else {
5131                    if !path.is_empty() {
5132                        path.push('.');
5133                    }
5134                    path.push_str(key);
5135                }
5136            }
5137            PathToken::Index(index) => {
5138                path.push('[');
5139                path.push_str(&index.to_string());
5140                path.push(']');
5141            }
5142        }
5143    }
5144    path
5145}
5146
/// Returns `true` when `key` contains a `.` character.
fn needs_bracket_quote(key: &str) -> bool {
    key.chars().any(|ch| ch == '.')
}
5150
5151fn set_path_object_only(
5152    root: &mut JsonValue,
5153    tokens: &[PathToken],
5154    value: JsonValue,
5155    base_path: &str,
5156) -> Result<(), TransformError> {
5157    if tokens.is_empty() {
5158        return Err(
5159            TransformError::new(TransformErrorKind::ExprError, "path is empty")
5160                .with_path(format!("{}.args[0]", base_path)),
5161        );
5162    }
5163
5164    let mut current = root;
5165    for (index, token) in tokens.iter().enumerate() {
5166        let key = match token {
5167            PathToken::Key(key) => key,
5168            PathToken::Index(_) => {
5169                return Err(TransformError::new(
5170                    TransformErrorKind::ExprError,
5171                    "array indexes are not allowed in path",
5172                )
5173                .with_path(format!("{}.args[0]", base_path)));
5174            }
5175        };
5176        let is_last = index == tokens.len() - 1;
5177
5178        match current {
5179            JsonValue::Object(map) => {
5180                if is_last {
5181                    if map.contains_key(key) {
5182                        return Err(TransformError::new(
5183                            TransformErrorKind::ExprError,
5184                            "path conflicts with existing value",
5185                        )
5186                        .with_path(format!("{}.args[0]", base_path)));
5187                    }
5188                    map.insert(key.clone(), value);
5189                    return Ok(());
5190                }
5191
5192                let entry = map
5193                    .entry(key.clone())
5194                    .or_insert_with(|| JsonValue::Object(Map::new()));
5195                if !entry.is_object() {
5196                    return Err(TransformError::new(
5197                        TransformErrorKind::ExprError,
5198                        "path conflicts with non-object value",
5199                    )
5200                    .with_path(format!("{}.args[0]", base_path)));
5201                }
5202                current = entry;
5203            }
5204            _ => {
5205                return Err(TransformError::new(
5206                    TransformErrorKind::ExprError,
5207                    "path conflicts with non-object value",
5208                )
5209                .with_path(format!("{}.args[0]", base_path)));
5210            }
5211        }
5212    }
5213
5214    Ok(())
5215}
5216
5217fn set_path_with_indexes(
5218    root: &mut JsonValue,
5219    tokens: &[PathToken],
5220    value: JsonValue,
5221    base_path: &str,
5222) -> Result<(), TransformError> {
5223    if tokens.is_empty() {
5224        return Err(
5225            TransformError::new(TransformErrorKind::ExprError, "path is empty")
5226                .with_path(format!("{}.args[1]", base_path)),
5227        );
5228    }
5229
5230    let mut current = root;
5231    for (index, token) in tokens.iter().enumerate() {
5232        let is_last = index == tokens.len() - 1;
5233        match token {
5234            PathToken::Key(key) => {
5235                let next_token = tokens.get(index + 1);
5236                match current {
5237                    JsonValue::Object(map) => {
5238                        if is_last {
5239                            map.insert(key.clone(), value);
5240                            return Ok(());
5241                        }
5242                        let entry = map.entry(key.clone()).or_insert_with(|| match next_token {
5243                            Some(PathToken::Index(_)) => JsonValue::Array(Vec::new()),
5244                            _ => JsonValue::Object(Map::new()),
5245                        });
5246                        let expect_index = matches!(next_token, Some(PathToken::Index(_)));
5247                        let entry_is_array = matches!(entry, JsonValue::Array(_));
5248                        let entry_is_object = matches!(entry, JsonValue::Object(_));
5249                        if !(expect_index && entry_is_array || !expect_index && entry_is_object) {
5250                            return Err(TransformError::new(
5251                                TransformErrorKind::ExprError,
5252                                "path conflicts with non-object value",
5253                            )
5254                            .with_path(format!("{}.args[1]", base_path)));
5255                        }
5256                        current = entry;
5257                    }
5258                    _ => {
5259                        return Err(TransformError::new(
5260                            TransformErrorKind::ExprError,
5261                            "path conflicts with non-object value",
5262                        )
5263                        .with_path(format!("{}.args[1]", base_path)));
5264                    }
5265                }
5266            }
5267            PathToken::Index(path_index) => {
5268                let next_token = tokens.get(index + 1);
5269                match current {
5270                    JsonValue::Array(items) => {
5271                        if items.len() <= *path_index {
5272                            items.resize_with(path_index + 1, || JsonValue::Null);
5273                        }
5274                        if is_last {
5275                            items[*path_index] = value;
5276                            return Ok(());
5277                        }
5278                        let entry = &mut items[*path_index];
5279                        if entry.is_null() {
5280                            *entry = match next_token {
5281                                Some(PathToken::Index(_)) => JsonValue::Array(Vec::new()),
5282                                _ => JsonValue::Object(Map::new()),
5283                            };
5284                        }
5285                        let expect_index = matches!(next_token, Some(PathToken::Index(_)));
5286                        let entry_is_array = matches!(entry, JsonValue::Array(_));
5287                        let entry_is_object = matches!(entry, JsonValue::Object(_));
5288                        if !(expect_index && entry_is_array || !expect_index && entry_is_object) {
5289                            return Err(TransformError::new(
5290                                TransformErrorKind::ExprError,
5291                                "path conflicts with non-object value",
5292                            )
5293                            .with_path(format!("{}.args[1]", base_path)));
5294                        }
5295                        current = entry;
5296                    }
5297                    _ => {
5298                        return Err(TransformError::new(
5299                            TransformErrorKind::ExprError,
5300                            "path conflicts with non-object value",
5301                        )
5302                        .with_path(format!("{}.args[1]", base_path)));
5303                    }
5304                }
5305            }
5306        }
5307    }
5308
5309    Ok(())
5310}
5311
5312fn remove_path(root: &mut JsonValue, tokens: &[PathToken]) {
5313    if tokens.is_empty() {
5314        return;
5315    }
5316
5317    let (first, rest) = tokens.split_first().unwrap();
5318    match first {
5319        PathToken::Key(key) => {
5320            if let JsonValue::Object(map) = root {
5321                if rest.is_empty() {
5322                    map.remove(key);
5323                    return;
5324                }
5325                if let Some(next) = map.get_mut(key) {
5326                    remove_path(next, rest);
5327                }
5328            }
5329        }
5330        PathToken::Index(index) => {
5331            if let JsonValue::Array(items) = root {
5332                if let Some(next) = items.get_mut(*index) {
5333                    remove_path(next, rest);
5334                }
5335            }
5336        }
5337    }
5338}
5339
5340fn eval_bool_and_or(
5341    args: &[Expr],
5342    injected: Option<&EvalValue>,
5343    record: &JsonValue,
5344    context: Option<&JsonValue>,
5345    out: &JsonValue,
5346    base_path: &str,
5347    is_and: bool,
5348    locals: Option<&EvalLocals<'_>>,
5349) -> Result<EvalValue, TransformError> {
5350    let total_len = args_len(args, injected);
5351    if total_len < 2 {
5352        return Err(TransformError::new(
5353            TransformErrorKind::ExprError,
5354            "expr.args must contain at least two items",
5355        )
5356        .with_path(format!("{}.args", base_path)));
5357    }
5358
5359    let mut saw_missing = false;
5360    for index in 0..total_len {
5361        let arg_path = format!("{}.args[{}]", base_path, index);
5362        let value = eval_expr_at_index(
5363            index, args, injected, record, context, out, base_path, locals,
5364        )?;
5365        match value {
5366            EvalValue::Missing => {
5367                saw_missing = true;
5368                continue;
5369            }
5370            EvalValue::Value(value) => {
5371                let flag = value_as_bool(&value, &arg_path)?;
5372                if is_and {
5373                    if !flag {
5374                        return Ok(EvalValue::Value(JsonValue::Bool(false)));
5375                    }
5376                } else if flag {
5377                    return Ok(EvalValue::Value(JsonValue::Bool(true)));
5378                }
5379            }
5380        }
5381    }
5382
5383    if saw_missing {
5384        Ok(EvalValue::Missing)
5385    } else {
5386        Ok(EvalValue::Value(JsonValue::Bool(is_and)))
5387    }
5388}
5389
5390fn eval_bool_not(
5391    args: &[Expr],
5392    injected: Option<&EvalValue>,
5393    record: &JsonValue,
5394    context: Option<&JsonValue>,
5395    out: &JsonValue,
5396    base_path: &str,
5397    locals: Option<&EvalLocals<'_>>,
5398) -> Result<EvalValue, TransformError> {
5399    let total_len = args_len(args, injected);
5400    if total_len != 1 {
5401        return Err(TransformError::new(
5402            TransformErrorKind::ExprError,
5403            "expr.args must contain exactly one item",
5404        )
5405        .with_path(format!("{}.args", base_path)));
5406    }
5407
5408    let arg_path = format!("{}.args[0]", base_path);
5409    let value = eval_expr_at_index(0, args, injected, record, context, out, base_path, locals)?;
5410    match value {
5411        EvalValue::Missing => Ok(EvalValue::Missing),
5412        EvalValue::Value(value) => {
5413            let flag = value_as_bool(&value, &arg_path)?;
5414            Ok(EvalValue::Value(JsonValue::Bool(!flag)))
5415        }
5416    }
5417}
5418
5419fn eval_compare(
5420    expr_op: &ExprOp,
5421    injected: Option<&EvalValue>,
5422    record: &JsonValue,
5423    context: Option<&JsonValue>,
5424    out: &JsonValue,
5425    base_path: &str,
5426    locals: Option<&EvalLocals<'_>>,
5427) -> Result<EvalValue, TransformError> {
5428    let total_len = args_len(&expr_op.args, injected);
5429    if total_len != 2 {
5430        return Err(TransformError::new(
5431            TransformErrorKind::ExprError,
5432            "expr.args must contain exactly two items",
5433        )
5434        .with_path(format!("{}.args", base_path)));
5435    }
5436
5437    let left_path = format!("{}.args[0]", base_path);
5438    let right_path = format!("{}.args[1]", base_path);
5439    let left = eval_expr_value_or_null_at(
5440        0,
5441        &expr_op.args,
5442        injected,
5443        record,
5444        context,
5445        out,
5446        base_path,
5447        locals,
5448    )?;
5449    let right = eval_expr_value_or_null_at(
5450        1,
5451        &expr_op.args,
5452        injected,
5453        record,
5454        context,
5455        out,
5456        base_path,
5457        locals,
5458    )?;
5459
5460    let result = match expr_op.op.as_str() {
5461        "==" => compare_eq(&left, &right, &left_path, &right_path)?,
5462        "!=" => !compare_eq(&left, &right, &left_path, &right_path)?,
5463        "<" => compare_numbers(&left, &right, &left_path, &right_path, |l, r| l < r)?,
5464        "<=" => compare_numbers(&left, &right, &left_path, &right_path, |l, r| l <= r)?,
5465        ">" => compare_numbers(&left, &right, &left_path, &right_path, |l, r| l > r)?,
5466        ">=" => compare_numbers(&left, &right, &left_path, &right_path, |l, r| l >= r)?,
5467        "~=" => match_regex(&left, &right, &left_path, &right_path)?,
5468        _ => {
5469            return Err(TransformError::new(
5470                TransformErrorKind::ExprError,
5471                "expr.op is not supported",
5472            )
5473            .with_path(format!("{}.op", base_path)));
5474        }
5475    };
5476
5477    Ok(EvalValue::Value(JsonValue::Bool(result)))
5478}
5479
5480fn compare_eq(
5481    left: &JsonValue,
5482    right: &JsonValue,
5483    left_path: &str,
5484    right_path: &str,
5485) -> Result<bool, TransformError> {
5486    if left.is_null() || right.is_null() {
5487        return Ok(left.is_null() && right.is_null());
5488    }
5489
5490    let left_value = value_to_string(left, left_path)?;
5491    let right_value = value_to_string(right, right_path)?;
5492    Ok(left_value == right_value)
5493}
5494
5495fn compare_numbers<F>(
5496    left: &JsonValue,
5497    right: &JsonValue,
5498    left_path: &str,
5499    right_path: &str,
5500    compare: F,
5501) -> Result<bool, TransformError>
5502where
5503    F: FnOnce(f64, f64) -> bool,
5504{
5505    let left_value = value_to_number(left, left_path, "comparison operand must be a number")?;
5506    let right_value = value_to_number(right, right_path, "comparison operand must be a number")?;
5507    Ok(compare(left_value, right_value))
5508}
5509
5510fn match_regex(
5511    left: &JsonValue,
5512    right: &JsonValue,
5513    left_path: &str,
5514    right_path: &str,
5515) -> Result<bool, TransformError> {
5516    let value = value_as_string(left, left_path)?;
5517    let pattern = value_as_string(right, right_path)?;
5518    let regex = cached_regex(&pattern, right_path)?;
5519    Ok(regex.is_match(&value))
5520}
5521
/// strftime-style patterns that carry a UTC offset.
///
/// `parse_datetime` tries these in the listed order when no explicit formats
/// are supplied and the RFC 3339 / RFC 2822 parsers did not accept the input.
/// The order decides which pattern wins, so keep it stable.
const DEFAULT_DATE_FORMATS_WITH_TZ: [&str; 8] = [
    // Dash-separated dates, offset written with a colon (`%:z`).
    "%Y-%m-%dT%H:%M:%S%:z",
    "%Y-%m-%d %H:%M:%S%:z",
    // Same, with fractional seconds.
    "%Y-%m-%dT%H:%M:%S%.f%:z",
    "%Y-%m-%d %H:%M:%S%.f%:z",
    // Dash-separated dates, offset written as `%z`.
    "%Y-%m-%dT%H:%M:%S%z",
    "%Y-%m-%d %H:%M:%S%z",
    // Slash-separated dates.
    "%Y/%m/%d %H:%M:%S%:z",
    "%Y/%m/%d %H:%M:%S%z",
];
5532
/// strftime-style patterns without a UTC offset.
///
/// `parse_datetime` falls back to this list (via
/// `parse_datetime_with_formats`) after every offset-aware attempt failed.
/// Patterns are tried in the listed order, so keep it stable.
const DEFAULT_DATE_FORMATS: [&str; 12] = [
    // Date only.
    "%Y-%m-%d",
    "%Y/%m/%d",
    "%Y%m%d",
    // Date and time separated by a space.
    "%Y-%m-%d %H:%M",
    "%Y/%m/%d %H:%M",
    "%Y-%m-%d %H:%M:%S",
    "%Y/%m/%d %H:%M:%S",
    // Date and time separated by `T`.
    "%Y-%m-%dT%H:%M",
    "%Y-%m-%dT%H:%M:%S",
    // Fractional seconds.
    "%Y-%m-%dT%H:%M:%S%.f",
    "%Y-%m-%d %H:%M:%S%.f",
    "%Y/%m/%d %H:%M:%S%.f",
];
5547
5548fn parse_format_list(value: &JsonValue, path: &str) -> Result<Vec<String>, TransformError> {
5549    match value {
5550        JsonValue::String(s) => {
5551            if s.is_empty() {
5552                Err(TransformError::new(
5553                    TransformErrorKind::ExprError,
5554                    "input_format must not be empty",
5555                )
5556                .with_path(path))
5557            } else {
5558                Ok(vec![s.clone()])
5559            }
5560        }
5561        JsonValue::Array(items) => {
5562            if items.is_empty() {
5563                return Err(TransformError::new(
5564                    TransformErrorKind::ExprError,
5565                    "input_format must not be empty",
5566                )
5567                .with_path(path));
5568            }
5569            let mut formats = Vec::with_capacity(items.len());
5570            for (index, item) in items.iter().enumerate() {
5571                let item_path = format!("{}[{}]", path, index);
5572                let value = match item.as_str() {
5573                    Some(value) => value,
5574                    None => {
5575                        return Err(TransformError::new(
5576                            TransformErrorKind::ExprError,
5577                            "input_format must be a string or array of strings",
5578                        )
5579                        .with_path(item_path));
5580                    }
5581                };
5582                if value.is_empty() {
5583                    return Err(TransformError::new(
5584                        TransformErrorKind::ExprError,
5585                        "input_format must not be empty",
5586                    )
5587                    .with_path(item_path));
5588                }
5589                formats.push(value.to_string());
5590            }
5591            Ok(formats)
5592        }
5593        _ => Err(TransformError::new(
5594            TransformErrorKind::ExprError,
5595            "input_format must be a string or array of strings",
5596        )
5597        .with_path(path)),
5598    }
5599}
5600
5601fn parse_datetime(
5602    value: &str,
5603    formats: Option<&[String]>,
5604    timezone: Option<FixedOffset>,
5605    path: &str,
5606) -> Result<DateTime<FixedOffset>, TransformError> {
5607    if let Some(formats) = formats {
5608        return parse_datetime_with_formats(value, formats, timezone, path);
5609    }
5610
5611    if let Ok(dt) = DateTime::parse_from_rfc3339(value) {
5612        return Ok(dt);
5613    }
5614    if let Ok(dt) = DateTime::parse_from_rfc2822(value) {
5615        return Ok(dt);
5616    }
5617
5618    for format in DEFAULT_DATE_FORMATS_WITH_TZ {
5619        if let Ok(dt) = DateTime::parse_from_str(value, format) {
5620            return Ok(dt);
5621        }
5622    }
5623
5624    parse_datetime_with_formats(
5625        value,
5626        &DEFAULT_DATE_FORMATS
5627            .iter()
5628            .map(|f| f.to_string())
5629            .collect::<Vec<_>>(),
5630        timezone,
5631        path,
5632    )
5633}
5634
5635fn parse_datetime_with_formats(
5636    value: &str,
5637    formats: &[String],
5638    timezone: Option<FixedOffset>,
5639    path: &str,
5640) -> Result<DateTime<FixedOffset>, TransformError> {
5641    for format in formats {
5642        if let Ok(dt) = DateTime::parse_from_str(value, format) {
5643            return Ok(dt);
5644        }
5645        if let Ok(naive) = NaiveDateTime::parse_from_str(value, format) {
5646            return apply_timezone(naive, timezone, path);
5647        }
5648        if let Ok(date) = NaiveDate::parse_from_str(value, format) {
5649            let naive = date
5650                .and_hms_opt(0, 0, 0)
5651                .ok_or_else(|| expr_type_error("date is invalid", path))?;
5652            return apply_timezone(naive, timezone, path);
5653        }
5654    }
5655
5656    Err(
5657        TransformError::new(TransformErrorKind::ExprError, "date format is invalid")
5658            .with_path(path),
5659    )
5660}
5661
5662fn apply_timezone(
5663    naive: NaiveDateTime,
5664    timezone: Option<FixedOffset>,
5665    path: &str,
5666) -> Result<DateTime<FixedOffset>, TransformError> {
5667    let offset = timezone.unwrap_or_else(|| FixedOffset::east_opt(0).unwrap());
5668    offset
5669        .from_local_datetime(&naive)
5670        .single()
5671        .ok_or_else(|| expr_type_error("date is invalid", path))
5672}
5673
/// Cheap syntactic check for a timezone specification: `UTC` in any ASCII
/// letter case, an exact `Z`, or any string starting with `+` or `-`.
///
/// This inspects only the shape of the string; it does not validate the
/// offset itself.
fn looks_like_timezone(value: &str) -> bool {
    let is_utc_alias = value == "Z" || value.eq_ignore_ascii_case("utc");
    is_utc_alias || value.starts_with(|first: char| first == '+' || first == '-')
}
5680
5681fn parse_timezone(value: &str, path: &str) -> Result<FixedOffset, TransformError> {
5682    if value.eq_ignore_ascii_case("utc") || value == "Z" {
5683        return FixedOffset::east_opt(0).ok_or_else(|| {
5684            TransformError::new(
5685                TransformErrorKind::ExprError,
5686                "timezone must be UTC or an offset like +09:00",
5687            )
5688            .with_path(path)
5689        });
5690    }
5691
5692    let (sign, rest) = match value.chars().next() {
5693        Some('+') => (1i32, &value[1..]),
5694        Some('-') => (-1i32, &value[1..]),
5695        _ => {
5696            return Err(TransformError::new(
5697                TransformErrorKind::ExprError,
5698                "timezone must be UTC or an offset like +09:00",
5699            )
5700            .with_path(path));
5701        }
5702    };
5703
5704    let (hours, minutes) = if let Some((h, m)) = rest.split_once(':') {
5705        let hours = h.parse::<i32>().ok();
5706        let minutes = m.parse::<i32>().ok();
5707        match (hours, minutes) {
5708            (Some(hours), Some(minutes)) => (hours, minutes),
5709            _ => {
5710                return Err(TransformError::new(
5711                    TransformErrorKind::ExprError,
5712                    "timezone must be UTC or an offset like +09:00",
5713                )
5714                .with_path(path));
5715            }
5716        }
5717    } else {
5718        match rest.len() {
5719            2 => {
5720                let hours = rest.parse::<i32>().ok();
5721                match hours {
5722                    Some(hours) => (hours, 0),
5723                    None => {
5724                        return Err(TransformError::new(
5725                            TransformErrorKind::ExprError,
5726                            "timezone must be UTC or an offset like +09:00",
5727                        )
5728                        .with_path(path));
5729                    }
5730                }
5731            }
5732            4 => {
5733                let hours = rest[..2].parse::<i32>().ok();
5734                let minutes = rest[2..].parse::<i32>().ok();
5735                match (hours, minutes) {
5736                    (Some(hours), Some(minutes)) => (hours, minutes),
5737                    _ => {
5738                        return Err(TransformError::new(
5739                            TransformErrorKind::ExprError,
5740                            "timezone must be UTC or an offset like +09:00",
5741                        )
5742                        .with_path(path));
5743                    }
5744                }
5745            }
5746            _ => {
5747                return Err(TransformError::new(
5748                    TransformErrorKind::ExprError,
5749                    "timezone must be UTC or an offset like +09:00",
5750                )
5751                .with_path(path));
5752            }
5753        }
5754    };
5755
5756    if !(0..=23).contains(&hours) || !(0..=59).contains(&minutes) {
5757        return Err(TransformError::new(
5758            TransformErrorKind::ExprError,
5759            "timezone must be UTC or an offset like +09:00",
5760        )
5761        .with_path(path));
5762    }
5763
5764    let offset_seconds = sign * (hours * 3600 + minutes * 60);
5765    FixedOffset::east_opt(offset_seconds).ok_or_else(|| {
5766        TransformError::new(
5767            TransformErrorKind::ExprError,
5768            "timezone must be UTC or an offset like +09:00",
5769        )
5770        .with_path(path)
5771    })
5772}
5773
5774fn value_to_string(value: &JsonValue, path: &str) -> Result<String, TransformError> {
5775    match value {
5776        JsonValue::String(s) => Ok(s.clone()),
5777        JsonValue::Number(n) => Ok(number_to_string(n)),
5778        JsonValue::Bool(b) => Ok(b.to_string()),
5779        _ => Err(TransformError::new(
5780            TransformErrorKind::ExprError,
5781            "value must be string/number/bool",
5782        )
5783        .with_path(path)),
5784    }
5785}
5786
5787fn value_as_string(value: &JsonValue, path: &str) -> Result<String, TransformError> {
5788    match value {
5789        JsonValue::String(s) => Ok(s.clone()),
5790        _ => Err(
5791            TransformError::new(TransformErrorKind::ExprError, "value must be a string")
5792                .with_path(path),
5793        ),
5794    }
5795}
5796
5797fn value_as_bool(value: &JsonValue, path: &str) -> Result<bool, TransformError> {
5798    match value {
5799        JsonValue::Bool(flag) => Ok(*flag),
5800        _ => Err(expr_type_error("value must be a boolean", path)),
5801    }
5802}
5803
5804fn value_to_number(value: &JsonValue, path: &str, message: &str) -> Result<f64, TransformError> {
5805    match value {
5806        JsonValue::Number(n) => n
5807            .as_f64()
5808            .filter(|f| f.is_finite())
5809            .ok_or_else(|| expr_type_error(message, path)),
5810        JsonValue::String(s) => s
5811            .parse::<f64>()
5812            .ok()
5813            .filter(|f| f.is_finite())
5814            .ok_or_else(|| expr_type_error(message, path)),
5815        _ => Err(expr_type_error(message, path)),
5816    }
5817}
5818
5819fn value_to_i64(value: &JsonValue, path: &str, message: &str) -> Result<i64, TransformError> {
5820    match value {
5821        JsonValue::Number(n) => {
5822            if let Some(i) = n.as_i64() {
5823                Ok(i)
5824            } else if let Some(u) = n.as_u64() {
5825                i64::try_from(u).map_err(|_| expr_type_error(message, path))
5826            } else if let Some(f) = n.as_f64() {
5827                if f.is_finite() && (f.fract()).abs() < f64::EPSILON {
5828                    let value = f as i64;
5829                    if (value as f64 - f).abs() < f64::EPSILON {
5830                        Ok(value)
5831                    } else {
5832                        Err(expr_type_error(message, path))
5833                    }
5834                } else {
5835                    Err(expr_type_error(message, path))
5836                }
5837            } else {
5838                Err(expr_type_error(message, path))
5839            }
5840        }
5841        JsonValue::String(s) => s.parse::<i64>().map_err(|_| expr_type_error(message, path)),
5842        _ => Err(expr_type_error(message, path)),
5843    }
5844}
5845
5846fn json_number_from_f64(value: f64, path: &str) -> Result<JsonValue, TransformError> {
5847    if !value.is_finite() {
5848        return Err(expr_type_error("number result is not finite", path));
5849    }
5850    if (value.fract()).abs() < f64::EPSILON {
5851        let as_i64 = value as i64;
5852        if (as_i64 as f64 - value).abs() < f64::EPSILON {
5853            return Ok(JsonValue::Number(as_i64.into()));
5854        }
5855    }
5856    serde_json::Number::from_f64(value)
5857        .map(JsonValue::Number)
5858        .ok_or_else(|| expr_type_error("number result is not finite", path))
5859}
5860
5861fn to_radix_string(value: i64, base: u32, path: &str) -> Result<String, TransformError> {
5862    let digits = b"0123456789abcdefghijklmnopqrstuvwxyz";
5863    if base < 2 || base > 36 {
5864        return Err(expr_type_error("base must be between 2 and 36", path));
5865    }
5866
5867    if value == 0 {
5868        return Ok("0".to_string());
5869    }
5870
5871    let is_negative = value < 0;
5872    let mut n = value
5873        .checked_abs()
5874        .ok_or_else(|| expr_type_error("value is out of range for base conversion", path))?
5875        as u64;
5876
5877    let mut buf = Vec::new();
5878    while n > 0 {
5879        let idx = (n % base as u64) as usize;
5880        buf.push(digits[idx] as char);
5881        n /= base as u64;
5882    }
5883    if is_negative {
5884        buf.push('-');
5885    }
5886    buf.reverse();
5887    Ok(buf.iter().collect())
5888}
5889
5890fn value_to_string_optional(value: &JsonValue) -> Option<String> {
5891    match value {
5892        JsonValue::String(s) => Some(s.clone()),
5893        JsonValue::Number(n) => Some(number_to_string(n)),
5894        JsonValue::Bool(b) => Some(b.to_string()),
5895        _ => None,
5896    }
5897}
5898
5899fn expr_type_error(message: &str, path: &str) -> TransformError {
5900    TransformError::new(TransformErrorKind::ExprError, message).with_path(path)
5901}
5902
5903fn number_to_string(number: &serde_json::Number) -> String {
5904    if let Some(i) = number.as_i64() {
5905        return i.to_string();
5906    }
5907    if let Some(u) = number.as_u64() {
5908        return u.to_string();
5909    }
5910    if let Some(f) = number.as_f64() {
5911        let mut s = format!("{}", f);
5912        if s.contains('.') {
5913            while s.ends_with('0') {
5914                s.pop();
5915            }
5916            if s.ends_with('.') {
5917                s.pop();
5918            }
5919        }
5920        return s;
5921    }
5922    number.to_string()
5923}
5924
5925fn cast_value(value: &JsonValue, type_name: &str, path: &str) -> Result<JsonValue, TransformError> {
5926    match type_name {
5927        "string" => Ok(JsonValue::String(value_to_string(value, path)?)),
5928        "int" => cast_to_int(value, path),
5929        "float" => cast_to_float(value, path),
5930        "bool" => cast_to_bool(value, path),
5931        _ => Err(TransformError::new(
5932            TransformErrorKind::TypeCastFailed,
5933            "type must be string|int|float|bool",
5934        )
5935        .with_path(path)),
5936    }
5937}
5938
5939fn cast_to_int(value: &JsonValue, path: &str) -> Result<JsonValue, TransformError> {
5940    match value {
5941        JsonValue::Number(n) => {
5942            if let Some(i) = n.as_i64() {
5943                Ok(JsonValue::Number(i.into()))
5944            } else if let Some(f) = n.as_f64() {
5945                if (f.fract()).abs() < f64::EPSILON {
5946                    Ok(JsonValue::Number((f as i64).into()))
5947                } else {
5948                    Err(type_cast_error("int", path))
5949                }
5950            } else {
5951                Err(type_cast_error("int", path))
5952            }
5953        }
5954        JsonValue::String(s) => s
5955            .parse::<i64>()
5956            .map(|i| JsonValue::Number(i.into()))
5957            .map_err(|_| type_cast_error("int", path)),
5958        _ => Err(type_cast_error("int", path)),
5959    }
5960}
5961
5962fn cast_to_float(value: &JsonValue, path: &str) -> Result<JsonValue, TransformError> {
5963    match value {
5964        JsonValue::Number(n) => n
5965            .as_f64()
5966            .ok_or_else(|| type_cast_error("float", path))
5967            .and_then(|f| {
5968                serde_json::Number::from_f64(f)
5969                    .map(JsonValue::Number)
5970                    .ok_or_else(|| type_cast_error("float", path))
5971            }),
5972        JsonValue::String(s) => s
5973            .parse::<f64>()
5974            .map_err(|_| type_cast_error("float", path))
5975            .and_then(|f| {
5976                serde_json::Number::from_f64(f)
5977                    .map(JsonValue::Number)
5978                    .ok_or_else(|| type_cast_error("float", path))
5979            }),
5980        _ => Err(type_cast_error("float", path)),
5981    }
5982}
5983
5984fn cast_to_bool(value: &JsonValue, path: &str) -> Result<JsonValue, TransformError> {
5985    match value {
5986        JsonValue::Bool(b) => Ok(JsonValue::Bool(*b)),
5987        JsonValue::String(s) => match s.to_lowercase().as_str() {
5988            "true" => Ok(JsonValue::Bool(true)),
5989            "false" => Ok(JsonValue::Bool(false)),
5990            _ => Err(type_cast_error("bool", path)),
5991        },
5992        _ => Err(type_cast_error("bool", path)),
5993    }
5994}
5995
5996fn type_cast_error(type_name: &str, path: &str) -> TransformError {
5997    TransformError::new(
5998        TransformErrorKind::TypeCastFailed,
5999        format!("failed to cast to {}", type_name),
6000    )
6001    .with_path(path)
6002}
6003
6004fn parse_source(source: &str) -> Result<(Namespace, &str), TransformError> {
6005    if let Some((prefix, path)) = source.split_once('.') {
6006        if path.is_empty() {
6007            return Err(TransformError::new(
6008                TransformErrorKind::InvalidRef,
6009                "reference path is empty",
6010            ));
6011        }
6012        let namespace = match prefix {
6013            "input" => Namespace::Input,
6014            "context" => Namespace::Context,
6015            "out" => Namespace::Out,
6016            _ => {
6017                return Err(TransformError::new(
6018                    TransformErrorKind::InvalidRef,
6019                    "ref namespace must be input|context|out",
6020                ));
6021            }
6022        };
6023        Ok((namespace, path))
6024    } else {
6025        if source.is_empty() {
6026            return Err(TransformError::new(
6027                TransformErrorKind::InvalidRef,
6028                "reference path is empty",
6029            ));
6030        }
6031        Ok((Namespace::Input, source))
6032    }
6033}
6034
6035fn parse_ref(value: &str) -> Result<(Namespace, &str), TransformError> {
6036    let (prefix, path) = value.split_once('.').ok_or_else(|| {
6037        TransformError::new(TransformErrorKind::InvalidRef, "ref must include namespace")
6038    })?;
6039
6040    if path.is_empty() {
6041        return Err(TransformError::new(
6042            TransformErrorKind::InvalidRef,
6043            "ref path is empty",
6044        ));
6045    }
6046
6047    let namespace = match prefix {
6048        "input" => Namespace::Input,
6049        "context" => Namespace::Context,
6050        "out" => Namespace::Out,
6051        "item" => Namespace::Item,
6052        "acc" => Namespace::Acc,
6053        "pipe" => Namespace::Pipe,
6054        "local" => Namespace::Local,
6055        _ => {
6056            return Err(TransformError::new(
6057                TransformErrorKind::InvalidRef,
6058                "ref namespace must be input|context|out|item|acc|pipe|local",
6059            ));
6060        }
6061    };
6062
6063    Ok((namespace, path))
6064}
6065
6066fn parse_path_tokens(
6067    path: &str,
6068    kind: TransformErrorKind,
6069    error_path: impl Into<String>,
6070) -> Result<Vec<PathToken>, TransformError> {
6071    parse_path(path)
6072        .map_err(|err| TransformError::new(kind, err.message()).with_path(error_path.into()))
6073}
6074
6075fn set_path(
6076    root: &mut JsonValue,
6077    path: &str,
6078    value: JsonValue,
6079    mapping_path: &str,
6080) -> Result<(), TransformError> {
6081    let tokens = parse_path_tokens(
6082        path,
6083        TransformErrorKind::InvalidTarget,
6084        format!("{}.target", mapping_path),
6085    )?;
6086    if tokens.is_empty() {
6087        return Err(TransformError::new(
6088            TransformErrorKind::InvalidTarget,
6089            "target path is invalid",
6090        )
6091        .with_path(format!("{}.target", mapping_path)));
6092    }
6093
6094    let mut current = root;
6095    for (index, token) in tokens.iter().enumerate() {
6096        let is_last = index == tokens.len() - 1;
6097        let key = match token {
6098            PathToken::Key(key) => key,
6099            PathToken::Index(_) => {
6100                return Err(TransformError::new(
6101                    TransformErrorKind::InvalidTarget,
6102                    "target path must not include indexes",
6103                )
6104                .with_path(format!("{}.target", mapping_path)));
6105            }
6106        };
6107
6108        match current {
6109            JsonValue::Object(map) => {
6110                if is_last {
6111                    map.insert(key.to_string(), value);
6112                    return Ok(());
6113                }
6114
6115                let entry = map
6116                    .entry(key.to_string())
6117                    .or_insert_with(|| JsonValue::Object(Map::new()));
6118                if !entry.is_object() {
6119                    return Err(TransformError::new(
6120                        TransformErrorKind::InvalidTarget,
6121                        "target path conflicts with non-object value",
6122                    )
6123                    .with_path(format!("{}.target", mapping_path)));
6124                }
6125                current = entry;
6126            }
6127            _ => {
6128                return Err(TransformError::new(
6129                    TransformErrorKind::InvalidTarget,
6130                    "target root must be an object",
6131                )
6132                .with_path(format!("{}.target", mapping_path)));
6133            }
6134        }
6135    }
6136
6137    Ok(())
6138}
6139
6140fn literal_string(expr: &Expr) -> Option<&str> {
6141    match expr {
6142        Expr::Literal(value) => value.as_str(),
6143        _ => None,
6144    }
6145}
6146
6147/// Convert an Expr to JSON value for v2 pipe parsing.
6148/// Returns Some if the expr looks like a v2 pipe expression:
6149/// - Literal(Array) -> direct array
6150/// - Ref where ref_path starts with @ -> single element array
6151/// - Chain where first element starts with @ -> convert to array
6152/// Returns None if it looks like v1 expression and should be handled by v1 eval.
6153fn expr_to_json_for_v2_pipe(expr: &Expr) -> Option<JsonValue> {
6154    match expr {
6155        Expr::Literal(JsonValue::Array(arr)) => {
6156            // Direct array - v2 pipe
6157            Some(JsonValue::Array(arr.clone()))
6158        }
6159        Expr::Literal(JsonValue::String(s)) => {
6160            if is_v2_ref(s) || is_pipe_value(s) || is_literal_escape(s) {
6161                Some(JsonValue::String(s.clone()))
6162            } else {
6163                None
6164            }
6165        }
6166        Expr::Ref(expr_ref)
6167            if expr_ref.ref_path.starts_with('@') || is_literal_escape(&expr_ref.ref_path) =>
6168        {
6169            // Single v2 reference or literal escape (serde collapsed 1-element array)
6170            // Wrap it as single-element array
6171            Some(JsonValue::Array(vec![JsonValue::String(
6172                expr_ref.ref_path.clone(),
6173            )]))
6174        }
6175        Expr::Chain(chain) => {
6176            // Check if first element is a v2 ref
6177            if let Some(first) = chain.chain.first() {
6178                if let Expr::Ref(r) = first {
6179                    if r.ref_path.starts_with('@') {
6180                        // Convert chain to array
6181                        let arr: Vec<JsonValue> =
6182                            chain.chain.iter().map(|e| expr_to_json_value(e)).collect();
6183                        return Some(JsonValue::Array(arr));
6184                    }
6185                }
6186            }
6187            None
6188        }
6189        _ => None,
6190    }
6191}
6192
6193/// Convert an Expr to JSON value for v2 condition parsing.
6194/// Accepts literal values and v2-looking refs/chains while avoiding v1-only forms.
6195fn expr_to_json_for_v2_condition(expr: &Expr) -> Option<JsonValue> {
6196    match expr {
6197        Expr::Literal(value) => Some(value.clone()),
6198        Expr::Ref(ref_expr)
6199            if ref_expr.ref_path.starts_with('@') || is_literal_escape(&ref_expr.ref_path) =>
6200        {
6201            Some(JsonValue::String(ref_expr.ref_path.clone()))
6202        }
6203        Expr::Chain(chain) => {
6204            if let Some(first) = chain.chain.first() {
6205                if let Expr::Ref(r) = first {
6206                    if r.ref_path.starts_with('@') {
6207                        let arr: Vec<JsonValue> =
6208                            chain.chain.iter().map(expr_to_json_value).collect();
6209                        return Some(JsonValue::Array(arr));
6210                    }
6211                }
6212            }
6213            None
6214        }
6215        _ => None,
6216    }
6217}
6218
6219/// Helper to convert Expr to JsonValue (for Chain conversion)
6220fn expr_to_json_value(expr: &Expr) -> JsonValue {
6221    match expr {
6222        Expr::Ref(r) => JsonValue::String(r.ref_path.clone()),
6223        Expr::Literal(v) => v.clone(),
6224        Expr::Op(op) => {
6225            let mut obj = serde_json::Map::new();
6226            let args: Vec<JsonValue> = op.args.iter().map(expr_to_json_value).collect();
6227            obj.insert(op.op.clone(), JsonValue::Array(args));
6228            JsonValue::Object(obj)
6229        }
6230        Expr::Chain(chain) => {
6231            let arr: Vec<JsonValue> = chain.chain.iter().map(expr_to_json_value).collect();
6232            JsonValue::Array(arr)
6233        }
6234    }
6235}
6236
/// Namespace prefix of a reference, as recognised by `parse_source` and
/// `parse_ref`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum Namespace {
    /// Selected by the `input` prefix; also the default for a source that
    /// has no prefix at all.
    Input,
    /// Selected by the `context` prefix.
    Context,
    /// Selected by the `out` prefix.
    Out,
    /// Selected by the `item` prefix (accepted by `parse_ref` only).
    Item,
    /// Selected by the `acc` prefix (accepted by `parse_ref` only).
    Acc,
    /// Selected by the `pipe` prefix (accepted by `parse_ref` only).
    Pipe,
    /// Selected by the `local` prefix (accepted by `parse_ref` only).
    Local,
}
6247
6248#[derive(Clone, Copy)]
6249pub(crate) struct EvalItem<'a> {
6250    pub(crate) value: &'a JsonValue,
6251    pub(crate) index: usize,
6252}
6253
6254#[derive(Clone, Copy)]
6255pub(crate) struct EvalLocals<'a> {
6256    pub(crate) item: Option<EvalItem<'a>>,
6257    pub(crate) acc: Option<&'a JsonValue>,
6258    pub(crate) pipe: Option<&'a EvalValue>,
6259    pub(crate) locals: Option<&'a HashMap<String, EvalValue>>,
6260}
6261
6262#[derive(Debug, Clone, PartialEq)]
6263pub(crate) enum EvalValue {
6264    Missing,
6265    Value(JsonValue),
6266}
6267
6268// =============================================================================
6269// T21: v2 transform integration tests
6270// =============================================================================
6271
6272#[cfg(test)]
6273mod v2_transform_tests {
6274    use super::*;
6275    use crate::parse_rule_file;
6276
6277    #[test]
6278    fn test_v2_simple_ref_transform() {
6279        let yaml = r#"
6280version: 2
6281input:
6282  format: json
6283mappings:
6284  - target: user_name
6285    expr:
6286      - "@input.name"
6287"#;
6288        let rule = parse_rule_file(yaml).unwrap();
6289        let input = r#"[{"name": "Alice"}]"#;
6290        let result = transform(&rule, input, None).unwrap();
6291        assert_eq!(result, serde_json::json!([{"user_name": "Alice"}]));
6292    }
6293
6294    #[test]
6295    fn test_v2_scalar_ref_transform() {
6296        let yaml = r#"
6297version: 2
6298input:
6299  format: json
6300mappings:
6301  - target: user_name
6302    expr: "@input.name"
6303"#;
6304        let rule = parse_rule_file(yaml).unwrap();
6305        let input = r#"[{"name": "Alice"}]"#;
6306        let result = transform(&rule, input, None).unwrap();
6307        assert_eq!(result, serde_json::json!([{"user_name": "Alice"}]));
6308    }
6309
6310    #[test]
6311    fn test_v2_literal_object_with_lookup_key_is_literal() {
6312        let yaml = r#"
6313version: 2
6314input:
6315  format: json
6316mappings:
6317  - target: payload
6318    expr:
6319      lookup: 1
6320"#;
6321        let rule = parse_rule_file(yaml).unwrap();
6322        let input = r#"[{"id": 1}]"#;
6323        let result = transform(&rule, input, None).unwrap();
6324        assert_eq!(result, serde_json::json!([{"payload": {"lookup": 1}}]));
6325    }
6326
6327    #[test]
6328    fn test_v2_pipe_with_ops_transform() {
6329        let yaml = r#"
6330version: 2
6331input:
6332  format: json
6333mappings:
6334  - target: name
6335    expr:
6336      - "@input.name"
6337      - trim
6338      - uppercase
6339"#;
6340        let rule = parse_rule_file(yaml).unwrap();
6341        let input = r#"[{"name": "  alice  "}]"#;
6342        let result = transform(&rule, input, None).unwrap();
6343        assert_eq!(result, serde_json::json!([{"name": "ALICE"}]));
6344    }
6345
6346    #[test]
6347    fn test_v2_context_ref_transform() {
6348        let yaml = r#"
6349version: 2
6350input:
6351  format: json
6352mappings:
6353  - target: rate
6354    expr:
6355      - "@context.rate"
6356"#;
6357        let rule = parse_rule_file(yaml).unwrap();
6358        let input = r#"[{"id": 1}]"#;
6359        let context = serde_json::json!({"rate": 1.5});
6360        let result = transform(&rule, input, Some(&context)).unwrap();
6361        assert_eq!(result, serde_json::json!([{"rate": 1.5}]));
6362    }
6363
6364    #[test]
6365    fn test_v2_out_ref_transform() {
6366        let yaml = r#"
6367version: 2
6368input:
6369  format: json
6370mappings:
6371  - target: first_name
6372    expr:
6373      - "@input.name"
6374  - target: greeting
6375    expr:
6376      - "Hello, "
6377      - concat: ["@out.first_name"]
6378"#;
6379        let rule = parse_rule_file(yaml).unwrap();
6380        let input = r#"[{"name": "Bob"}]"#;
6381        let result = transform(&rule, input, None).unwrap();
6382        assert_eq!(
6383            result,
6384            serde_json::json!([{"first_name": "Bob", "greeting": "Hello, Bob"}])
6385        );
6386    }
6387
6388    #[test]
6389    fn test_v2_with_let_step_transform() {
6390        let yaml = r#"
6391version: 2
6392input:
6393  format: json
6394mappings:
6395  - target: total
6396    expr:
6397      - "@input.price"
6398      - let: { base: "$" }
6399      - multiply: [1.1]
6400"#;
6401        let rule = parse_rule_file(yaml).unwrap();
6402        let input = r#"[{"price": 100}]"#;
6403        let result = transform(&rule, input, None).unwrap();
6404        let total = result[0]["total"].as_f64().unwrap();
6405        assert!((total - 110.0).abs() < 0.001);
6406    }
6407
6408    #[test]
6409    fn test_v2_with_if_step_transform() {
6410        let yaml = r#"
6411version: 2
6412input:
6413  format: json
6414mappings:
6415  - target: discount
6416    expr:
6417      - "@input.total"
6418      - if:
6419          cond:
6420            gt: ["$", 1000]
6421          then:
6422            - "$"
6423            - multiply: [0.9]
6424          else:
6425            - "$"
6426"#;
6427        let rule = parse_rule_file(yaml).unwrap();
6428        let input = r#"[{"total": 2000}, {"total": 500}]"#;
6429        let result = transform(&rule, input, None).unwrap();
6430        let first = result[0]["discount"].as_f64().unwrap();
6431        let second = result[1]["discount"].as_f64().unwrap();
6432        assert!((first - 1800.0).abs() < 0.001);
6433        assert!((second - 500.0).abs() < 0.001);
6434    }
6435
6436    #[test]
6437    fn test_v2_with_map_step_transform() {
6438        let yaml = r#"
6439version: 2
6440input:
6441  format: json
6442mappings:
6443  - target: items
6444    expr:
6445      - "@input.values"
6446      - map:
6447        - multiply: [2]
6448"#;
6449        let rule = parse_rule_file(yaml).unwrap();
6450        let input = r#"[{"values": [1, 2, 3]}]"#;
6451        let result = transform(&rule, input, None).unwrap();
6452        // multiply returns f64, so [2.0, 4.0, 6.0]
6453        assert_eq!(result, serde_json::json!([{"items": [2.0, 4.0, 6.0]}]));
6454    }
6455
6456    #[test]
6457    fn test_v2_v1_mixed_mappings() {
6458        // v1 style mapping (source) should still work in version 2
6459        let yaml = r#"
6460version: 2
6461input:
6462  format: json
6463mappings:
6464  - target: name
6465    source: name
6466  - target: upper_name
6467    expr:
6468      - "@input.name"
6469      - uppercase
6470"#;
6471        let rule = parse_rule_file(yaml).unwrap();
6472        let input = r#"[{"name": "alice"}]"#;
6473        let result = transform(&rule, input, None).unwrap();
6474        assert_eq!(
6475            result,
6476            serde_json::json!([{"name": "alice", "upper_name": "ALICE"}])
6477        );
6478    }
6479
6480    #[test]
6481    fn test_v2_lookup_first_transform() {
6482        let yaml = r#"
6483version: 2
6484input:
6485  format: json
6486mappings:
6487  - target: dept_name
6488    expr:
6489      - lookup_first:
6490        - "@context.departments"
6491        - id
6492        - "@input.dept_id"
6493        - name
6494"#;
6495        let rule = parse_rule_file(yaml).unwrap();
6496        let input = r#"[{"dept_id": 2}]"#;
6497        let context = serde_json::json!({
6498            "departments": [
6499                {"id": 1, "name": "Engineering"},
6500                {"id": 2, "name": "Marketing"},
6501                {"id": 3, "name": "Sales"}
6502            ]
6503        });
6504        let result = transform(&rule, input, Some(&context)).unwrap();
6505        assert_eq!(result, serde_json::json!([{"dept_name": "Marketing"}]));
6506    }
6507
6508    #[test]
6509    fn test_v2_lookup_first_with_pipe_value_transform() {
6510        let yaml = r#"
6511version: 2
6512input:
6513  format: json
6514mappings:
6515  - target: dept_name
6516    expr:
6517      - "@context.departments"
6518      - lookup_first:
6519        - id
6520        - "@input.dept_id"
6521        - name
6522"#;
6523        let rule = parse_rule_file(yaml).unwrap();
6524        let input = r#"[{"dept_id": 2}]"#;
6525        let context = serde_json::json!({
6526            "departments": [
6527                {"id": 1, "name": "Engineering"},
6528                {"id": 2, "name": "Marketing"},
6529                {"id": 3, "name": "Sales"}
6530            ]
6531        });
6532        let result = transform(&rule, input, Some(&context)).unwrap();
6533        assert_eq!(result, serde_json::json!([{"dept_name": "Marketing"}]));
6534    }
6535
6536    #[test]
6537    fn test_v1_rules_still_work() {
6538        // Ensure v1 rules are not affected
6539        let yaml = r#"
6540version: 1
6541input:
6542  format: json
6543mappings:
6544  - target: name
6545    source: name
6546  - target: upper
6547    expr:
6548      op: uppercase
6549      args:
6550        - { ref: input.name }
6551"#;
6552        let rule = parse_rule_file(yaml).unwrap();
6553        let input = r#"[{"name": "test"}]"#;
6554        let result = transform(&rule, input, None).unwrap();
6555        assert_eq!(
6556            result,
6557            serde_json::json!([{"name": "test", "upper": "TEST"}])
6558        );
6559    }
6560}