Skip to main content

presentar_yaml/
executor.rs

1//! Expression executor for data transformations.
2
3use crate::expression::{AggregateOp, Expression, RankMethod, Transform};
4use std::collections::HashMap;
5
/// A generic, dynamically typed value that can hold any data type.
///
/// The variants cover null, booleans, numbers, strings, arrays and
/// string-keyed objects. `Default` yields [`Value::Null`].
#[derive(Debug, Clone, PartialEq, Default)]
pub enum Value {
    /// Null value (this is the `Default` variant)
    #[default]
    Null,
    /// Boolean value
    Bool(bool),
    /// Numeric value; every number is stored as an `f64`
    Number(f64),
    /// String value
    String(String),
    /// Array of values
    Array(Vec<Self>),
    /// Object (key-value map); backed by a `HashMap`, so key order is unspecified
    Object(HashMap<String, Self>),
}
23
24impl Value {
25    /// Create a new null value.
26    #[must_use]
27    pub const fn null() -> Self {
28        Self::Null
29    }
30
31    /// Create a new boolean value.
32    #[must_use]
33    pub const fn bool(v: bool) -> Self {
34        Self::Bool(v)
35    }
36
37    /// Create a new number value.
38    #[must_use]
39    pub const fn number(v: f64) -> Self {
40        Self::Number(v)
41    }
42
43    /// Create a new string value.
44    #[must_use]
45    pub fn string(v: impl Into<String>) -> Self {
46        Self::String(v.into())
47    }
48
49    /// Create a new array value.
50    #[must_use]
51    pub const fn array(v: Vec<Self>) -> Self {
52        Self::Array(v)
53    }
54
55    /// Create a new object value.
56    #[must_use]
57    pub const fn object(v: HashMap<String, Self>) -> Self {
58        Self::Object(v)
59    }
60
61    /// Check if value is null.
62    #[must_use]
63    pub const fn is_null(&self) -> bool {
64        matches!(self, Self::Null)
65    }
66
67    /// Check if value is a boolean.
68    #[must_use]
69    pub const fn is_bool(&self) -> bool {
70        matches!(self, Self::Bool(_))
71    }
72
73    /// Check if value is a number.
74    #[must_use]
75    pub const fn is_number(&self) -> bool {
76        matches!(self, Self::Number(_))
77    }
78
79    /// Check if value is a string.
80    #[must_use]
81    pub const fn is_string(&self) -> bool {
82        matches!(self, Self::String(_))
83    }
84
85    /// Check if value is an array.
86    #[must_use]
87    pub const fn is_array(&self) -> bool {
88        matches!(self, Self::Array(_))
89    }
90
91    /// Check if value is an object.
92    #[must_use]
93    pub const fn is_object(&self) -> bool {
94        matches!(self, Self::Object(_))
95    }
96
97    /// Get as boolean.
98    #[must_use]
99    pub const fn as_bool(&self) -> Option<bool> {
100        match self {
101            Self::Bool(v) => Some(*v),
102            _ => None,
103        }
104    }
105
106    /// Get as number.
107    #[must_use]
108    pub const fn as_number(&self) -> Option<f64> {
109        match self {
110            Self::Number(v) => Some(*v),
111            _ => None,
112        }
113    }
114
115    /// Get as string.
116    #[must_use]
117    pub fn as_str(&self) -> Option<&str> {
118        match self {
119            Self::String(v) => Some(v),
120            _ => None,
121        }
122    }
123
124    /// Get as array.
125    #[must_use]
126    pub const fn as_array(&self) -> Option<&Vec<Self>> {
127        match self {
128            Self::Array(v) => Some(v),
129            _ => None,
130        }
131    }
132
133    /// Get as mutable array.
134    #[must_use]
135    pub fn as_array_mut(&mut self) -> Option<&mut Vec<Self>> {
136        match self {
137            Self::Array(v) => Some(v),
138            _ => None,
139        }
140    }
141
142    /// Get as object.
143    #[must_use]
144    pub const fn as_object(&self) -> Option<&HashMap<String, Self>> {
145        match self {
146            Self::Object(v) => Some(v),
147            _ => None,
148        }
149    }
150
151    /// Get field from object.
152    #[must_use]
153    pub fn get(&self, key: &str) -> Option<&Self> {
154        match self {
155            Self::Object(map) => map.get(key),
156            _ => None,
157        }
158    }
159
160    /// Get as array or return `ExpectedArray` error (DRY helper for apply_ methods).
161    pub fn require_array(&self) -> Result<&Vec<Self>, ExecutionError> {
162        self.as_array().ok_or(ExecutionError::ExpectedArray)
163    }
164
165    /// Extract numeric values from array items by field name.
166    pub fn extract_numbers(&self, field: &str) -> Result<Vec<f64>, ExecutionError> {
167        Ok(self
168            .require_array()?
169            .iter()
170            .filter_map(|item| item.get(field)?.as_number())
171            .collect())
172    }
173
174    /// Get array length or object key count.
175    #[must_use]
176    pub fn len(&self) -> usize {
177        match self {
178            Self::Array(arr) => arr.len(),
179            Self::Object(obj) => obj.len(),
180            Self::String(s) => s.len(),
181            _ => 0,
182        }
183    }
184
185    /// Check if array or object is empty.
186    #[must_use]
187    pub fn is_empty(&self) -> bool {
188        self.len() == 0
189    }
190}
191
192impl From<bool> for Value {
193    fn from(v: bool) -> Self {
194        Self::Bool(v)
195    }
196}
197
198impl From<f64> for Value {
199    fn from(v: f64) -> Self {
200        Self::Number(v)
201    }
202}
203
204impl From<i32> for Value {
205    fn from(v: i32) -> Self {
206        Self::Number(f64::from(v))
207    }
208}
209
210impl From<i64> for Value {
211    fn from(v: i64) -> Self {
212        Self::Number(v as f64)
213    }
214}
215
216impl From<&str> for Value {
217    fn from(v: &str) -> Self {
218        Self::String(v.to_string())
219    }
220}
221
222impl From<String> for Value {
223    fn from(v: String) -> Self {
224        Self::String(v)
225    }
226}
227
228impl<T: Into<Self>> From<Vec<T>> for Value {
229    fn from(v: Vec<T>) -> Self {
230        Self::Array(v.into_iter().map(Into::into).collect())
231    }
232}
233
/// Errors that can occur while executing an expression.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ExecutionError {
    /// The named source does not exist in the data context
    SourceNotFound(String),
    /// A transform needed an array but received something else
    ExpectedArray,
    /// An object was required but something else was supplied
    ExpectedObject,
    /// The named field does not exist
    FieldNotFound(String),
    /// A value had the wrong type; the payload describes the mismatch
    TypeMismatch(String),
    /// A transform was malformed; the payload describes why
    InvalidTransform(String),
}

/// Renders a short lowercase message; variants with a payload append it after
/// a colon.
impl std::fmt::Display for ExecutionError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            // Payload-free variants are fixed strings.
            Self::ExpectedArray => f.write_str("expected an array"),
            Self::ExpectedObject => f.write_str("expected an object"),
            // The remaining variants interpolate their payload.
            Self::SourceNotFound(name) => write!(f, "source not found: {name}"),
            Self::FieldNotFound(name) => write!(f, "field not found: {name}"),
            Self::TypeMismatch(msg) => write!(f, "type mismatch: {msg}"),
            Self::InvalidTransform(msg) => write!(f, "invalid transform: {msg}"),
        }
    }
}

/// Lets `ExecutionError` be used wherever a standard error is expected.
impl std::error::Error for ExecutionError {}
265
266/// Data context for expression execution.
267#[derive(Debug, Clone, Default)]
268pub struct DataContext {
269    /// Named data sources
270    sources: HashMap<String, Value>,
271}
272
273impl DataContext {
274    /// Create a new empty context.
275    #[must_use]
276    pub fn new() -> Self {
277        Self::default()
278    }
279
280    /// Add a data source.
281    pub fn insert(&mut self, name: impl Into<String>, value: Value) {
282        self.sources.insert(name.into(), value);
283    }
284
285    /// Get a data source.
286    #[must_use]
287    pub fn get(&self, name: &str) -> Option<&Value> {
288        // Support dotted paths like "data.transactions"
289        let parts: Vec<&str> = name.split('.').collect();
290        let mut current = self.sources.get(parts[0])?;
291
292        for part in &parts[1..] {
293            current = current.get(part)?;
294        }
295
296        Some(current)
297    }
298
299    /// Check if context has a source.
300    #[must_use]
301    pub fn contains(&self, name: &str) -> bool {
302        self.get(name).is_some()
303    }
304}
305
/// Expression executor.
///
/// This is a unit struct: it has no fields, so an instance carries no state
/// of its own.
#[derive(Debug, Default)]
pub struct ExpressionExecutor;
309
310impl ExpressionExecutor {
    /// Create a new executor.
    ///
    /// Being a `const fn`, this can also be called in constant contexts.
    #[must_use]
    pub const fn new() -> Self {
        Self
    }
316
317    /// Execute an expression against a data context.
318    ///
319    /// # Errors
320    ///
321    /// Returns an error if execution fails.
322    pub fn execute(&self, expr: &Expression, ctx: &DataContext) -> Result<Value, ExecutionError> {
323        // Resolve source
324        let mut value = ctx
325            .get(&expr.source)
326            .cloned()
327            .ok_or_else(|| ExecutionError::SourceNotFound(expr.source.clone()))?;
328
329        // Apply transforms
330        for transform in &expr.transforms {
331            value = self.apply_transform(&value, transform, ctx)?;
332        }
333
334        Ok(value)
335    }
336
    /// Dispatch one transform to the `apply_*` method that implements it.
    ///
    /// `value` is the current pipeline value and `transform` selects the
    /// operation and carries its parameters. `ctx` is forwarded only to the
    /// `Join` arm, which looks up a second source.
    ///
    /// # Errors
    ///
    /// Propagates the error of the selected `apply_*` method. `Count` is the
    /// only arm whose implementation returns a plain `Value`.
    fn apply_transform(
        &self,
        value: &Value,
        transform: &Transform,
        ctx: &DataContext,
    ) -> Result<Value, ExecutionError> {
        match transform {
            Transform::Filter {
                field,
                value: match_value,
            } => self.apply_filter(value, field, match_value),
            Transform::Select { fields } => self.apply_select(value, fields),
            Transform::Sort { field, desc } => self.apply_sort(value, field, *desc),
            Transform::Count => Ok(self.apply_count(value)),
            Transform::Sum { field } => self.apply_sum(value, field),
            Transform::Mean { field } => self.apply_mean(value, field),
            Transform::Sample { n } => self.apply_sample(value, *n),
            Transform::Percentage => self.apply_percentage(value),
            Transform::Rate { window } => self.apply_rate(value, window),
            // Join is the only transform that needs the data context.
            Transform::Join { other, on } => self.apply_join(value, other, on, ctx),
            Transform::GroupBy { field } => self.apply_group_by(value, field),
            Transform::Distinct { field } => self.apply_distinct(value, field.as_deref()),
            Transform::Where {
                field,
                op,
                value: match_value,
            } => self.apply_where(value, field, op, match_value),
            Transform::Offset { n } => self.apply_offset(value, *n),
            Transform::Min { field } => self.apply_min(value, field),
            Transform::Max { field } => self.apply_max(value, field),
            // `First` and `Limit` share one implementation.
            Transform::First { n } | Transform::Limit { n } => self.apply_limit(value, *n),
            Transform::Last { n } => self.apply_last(value, *n),
            Transform::Flatten => self.apply_flatten(value),
            Transform::Reverse => self.apply_reverse(value),
            // New transforms
            Transform::Map { expr } => self.apply_map(value, expr),
            Transform::Reduce { initial, expr } => self.apply_reduce(value, initial, expr),
            Transform::Aggregate { field, op } => self.apply_aggregate(value, field, *op),
            Transform::Pivot {
                row_field,
                col_field,
                value_field,
            } => self.apply_pivot(value, row_field, col_field, value_field),
            Transform::CumulativeSum { field } => self.apply_cumsum(value, field),
            Transform::Rank { field, method } => self.apply_rank(value, field, *method),
            Transform::MovingAverage { field, window } => {
                self.apply_moving_avg(value, field, *window)
            }
            Transform::PercentChange { field } => self.apply_pct_change(value, field),
            Transform::Suggest { prefix, count } => self.apply_suggest(value, prefix, *count),
        }
    }
389
390    fn apply_filter(
391        &self,
392        value: &Value,
393        field: &str,
394        match_value: &str,
395    ) -> Result<Value, ExecutionError> {
396        let arr = value.require_array()?;
397
398        let filtered: Vec<Value> = arr
399            .iter()
400            .filter(|item| {
401                if let Some(obj) = item.as_object() {
402                    if let Some(val) = obj.get(field) {
403                        return self.value_matches(val, match_value);
404                    }
405                }
406                false
407            })
408            .cloned()
409            .collect();
410
411        Ok(Value::Array(filtered))
412    }
413
414    fn value_matches(&self, value: &Value, target: &str) -> bool {
415        match value {
416            Value::String(s) => s == target,
417            Value::Number(n) => {
418                if let Ok(t) = target.parse::<f64>() {
419                    (*n - t).abs() < f64::EPSILON
420                } else {
421                    false
422                }
423            }
424            Value::Bool(b) => {
425                matches!((b, target), (true, "true") | (false, "false"))
426            }
427            _ => false,
428        }
429    }
430
431    fn apply_select(&self, value: &Value, fields: &[String]) -> Result<Value, ExecutionError> {
432        let arr = value.require_array()?;
433
434        let selected: Vec<Value> = arr
435            .iter()
436            .map(|item| {
437                if let Some(obj) = item.as_object() {
438                    let mut new_obj = HashMap::new();
439                    for field in fields {
440                        if let Some(val) = obj.get(field) {
441                            new_obj.insert(field.clone(), val.clone());
442                        }
443                    }
444                    Value::Object(new_obj)
445                } else {
446                    item.clone()
447                }
448            })
449            .collect();
450
451        Ok(Value::Array(selected))
452    }
453
454    fn apply_sort(&self, value: &Value, field: &str, desc: bool) -> Result<Value, ExecutionError> {
455        let arr = value.require_array()?;
456        let mut sorted = arr.clone();
457
458        sorted.sort_by(|a, b| {
459            let a_val = a.get(field);
460            let b_val = b.get(field);
461
462            let cmp = match (a_val, b_val) {
463                (Some(Value::Number(a)), Some(Value::Number(b))) => {
464                    a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal)
465                }
466                (Some(Value::String(a)), Some(Value::String(b))) => a.cmp(b),
467                _ => std::cmp::Ordering::Equal,
468            };
469
470            if desc {
471                cmp.reverse()
472            } else {
473                cmp
474            }
475        });
476
477        Ok(Value::Array(sorted))
478    }
479
480    fn apply_limit(&self, value: &Value, n: usize) -> Result<Value, ExecutionError> {
481        let arr = value.require_array()?;
482        Ok(Value::Array(arr.iter().take(n).cloned().collect()))
483    }
484
485    fn apply_count(&self, value: &Value) -> Value {
486        match value {
487            Value::Array(arr) => Value::Number(arr.len() as f64),
488            Value::Object(obj) => Value::Number(obj.len() as f64),
489            Value::String(s) => Value::Number(s.len() as f64),
490            _ => Value::Number(0.0),
491        }
492    }
493
494    fn apply_sum(&self, value: &Value, field: &str) -> Result<Value, ExecutionError> {
495        let nums = value.extract_numbers(field)?;
496        Ok(Value::Number(nums.iter().sum()))
497    }
498
499    fn apply_mean(&self, value: &Value, field: &str) -> Result<Value, ExecutionError> {
500        let nums = value.extract_numbers(field)?;
501        if nums.is_empty() {
502            return Ok(Value::Number(0.0));
503        }
504        Ok(Value::Number(nums.iter().sum::<f64>() / nums.len() as f64))
505    }
506
507    fn apply_sample(&self, value: &Value, n: usize) -> Result<Value, ExecutionError> {
508        let arr = value.require_array()?;
509
510        // Simple deterministic "sampling" - just take first n elements
511        // Real implementation would use random sampling
512        Ok(Value::Array(arr.iter().take(n).cloned().collect()))
513    }
514
515    fn apply_percentage(&self, value: &Value) -> Result<Value, ExecutionError> {
516        match value {
517            Value::Number(n) => Ok(Value::Number(n * 100.0)),
518            _ => Err(ExecutionError::TypeMismatch(
519                "percentage requires a number".to_string(),
520            )),
521        }
522    }
523
524    fn apply_rate(&self, value: &Value, window: &str) -> Result<Value, ExecutionError> {
525        let arr = value.require_array()?;
526
527        // Parse window (e.g., "1m", "5m", "1h")
528        let window_ms = self.parse_window(window)?;
529
530        // For rate calculation, we need timestamped data
531        // Look for a "timestamp" or "time" field
532        let mut values_with_time: Vec<(f64, f64)> = arr
533            .iter()
534            .filter_map(|item| {
535                let obj = item.as_object()?;
536                let time = obj
537                    .get("timestamp")
538                    .or_else(|| obj.get("time"))
539                    .and_then(Value::as_number)?;
540                let val = obj
541                    .get("value")
542                    .or_else(|| obj.get("count"))
543                    .and_then(Value::as_number)
544                    .unwrap_or(1.0);
545                Some((time, val))
546            })
547            .collect();
548
549        if values_with_time.len() < 2 {
550            return Ok(Value::Number(0.0));
551        }
552
553        // Sort by time
554        values_with_time.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(std::cmp::Ordering::Equal));
555
556        // Calculate rate over the window
557        let window_ms_f64 = window_ms as f64;
558        let last_time = values_with_time.last().map_or(0.0, |v| v.0);
559        let window_start = last_time - window_ms_f64;
560
561        let sum_in_window: f64 = values_with_time
562            .iter()
563            .filter(|(t, _)| *t >= window_start)
564            .map(|(_, v)| v)
565            .sum();
566
567        // Rate per second
568        let rate = sum_in_window / (window_ms_f64 / 1000.0);
569
570        Ok(Value::Number(rate))
571    }
572
573    fn parse_window(&self, window: &str) -> Result<u64, ExecutionError> {
574        let window = window.trim();
575        if window.is_empty() {
576            return Err(ExecutionError::InvalidTransform("empty window".to_string()));
577        }
578
579        let (num_str, unit) = if let Some(s) = window.strip_suffix("ms") {
580            (s, "ms")
581        } else if let Some(s) = window.strip_suffix('s') {
582            (s, "s")
583        } else if let Some(s) = window.strip_suffix('m') {
584            (s, "m")
585        } else if let Some(s) = window.strip_suffix('h') {
586            (s, "h")
587        } else if let Some(s) = window.strip_suffix('d') {
588            (s, "d")
589        } else {
590            // Assume milliseconds if no unit
591            (window, "ms")
592        };
593
594        let num: u64 = num_str
595            .parse()
596            .map_err(|_| ExecutionError::InvalidTransform(format!("invalid window: {window}")))?;
597
598        let ms = match unit {
599            "s" => num * 1000,
600            "m" => num * 60 * 1000,
601            "h" => num * 60 * 60 * 1000,
602            "d" => num * 24 * 60 * 60 * 1000,
603            // "ms" and any other unit default to num (milliseconds)
604            _ => num,
605        };
606
607        Ok(ms)
608    }
609
610    fn apply_join(
611        &self,
612        value: &Value,
613        other: &str,
614        on: &str,
615        ctx: &DataContext,
616    ) -> Result<Value, ExecutionError> {
617        let left_arr = value.require_array()?;
618
619        // Get the other dataset from context
620        let right_value = ctx
621            .get(other)
622            .ok_or_else(|| ExecutionError::SourceNotFound(other.to_string()))?;
623        let right_arr = right_value
624            .as_array()
625            .ok_or(ExecutionError::ExpectedArray)?;
626
627        // Build a lookup map for the right side (keyed by the join field)
628        let mut right_lookup: HashMap<String, Vec<&Value>> = HashMap::new();
629        for item in right_arr {
630            if let Some(obj) = item.as_object() {
631                if let Some(key_val) = obj.get(on) {
632                    let key = self.value_to_string(key_val);
633                    right_lookup.entry(key).or_default().push(item);
634                }
635            }
636        }
637
638        // Perform the join (left join - keeps all left items)
639        let mut result = Vec::new();
640        for left_item in left_arr {
641            if let Some(left_obj) = left_item.as_object() {
642                if let Some(key_val) = left_obj.get(on) {
643                    let key = self.value_to_string(key_val);
644                    if let Some(right_items) = right_lookup.get(&key) {
645                        // Join with each matching right item
646                        for right_item in right_items {
647                            if let Some(right_obj) = right_item.as_object() {
648                                // Merge left and right objects
649                                let mut merged = left_obj.clone();
650                                for (k, v) in right_obj {
651                                    // Don't overwrite left values, prefix with source name
652                                    if merged.contains_key(k) && k != on {
653                                        merged.insert(format!("{other}_{k}"), v.clone());
654                                    } else if k != on {
655                                        merged.insert(k.clone(), v.clone());
656                                    }
657                                }
658                                result.push(Value::Object(merged));
659                            }
660                        }
661                    } else {
662                        // No match, keep left item as-is (left join behavior)
663                        result.push(left_item.clone());
664                    }
665                } else {
666                    // No join key, keep as-is
667                    result.push(left_item.clone());
668                }
669            } else {
670                // Not an object, keep as-is
671                result.push(left_item.clone());
672            }
673        }
674
675        Ok(Value::Array(result))
676    }
677
678    fn apply_group_by(&self, value: &Value, field: &str) -> Result<Value, ExecutionError> {
679        let arr = value.require_array()?;
680
681        let mut groups: HashMap<String, Vec<Value>> = HashMap::new();
682
683        for item in arr {
684            let key = if let Some(obj) = item.as_object() {
685                if let Some(val) = obj.get(field) {
686                    self.value_to_string(val)
687                } else {
688                    "_null".to_string()
689                }
690            } else {
691                "_null".to_string()
692            };
693
694            groups.entry(key).or_default().push(item.clone());
695        }
696
697        // Convert to array of objects with key and items
698        let result: Vec<Value> = groups
699            .into_iter()
700            .map(|(key, items)| {
701                let mut obj = HashMap::new();
702                obj.insert("key".to_string(), Value::String(key));
703                obj.insert("items".to_string(), Value::Array(items.clone()));
704                obj.insert("count".to_string(), Value::Number(items.len() as f64));
705                Value::Object(obj)
706            })
707            .collect();
708
709        Ok(Value::Array(result))
710    }
711
712    fn value_to_string(&self, value: &Value) -> String {
713        match value {
714            Value::Null => "_null".to_string(),
715            Value::Bool(b) => b.to_string(),
716            Value::Number(n) => n.to_string(),
717            Value::String(s) => s.clone(),
718            Value::Array(_) => "_array".to_string(),
719            Value::Object(_) => "_object".to_string(),
720        }
721    }
722
723    fn apply_distinct(&self, value: &Value, field: Option<&str>) -> Result<Value, ExecutionError> {
724        let arr = value.require_array()?;
725
726        let mut seen: std::collections::HashSet<String> = std::collections::HashSet::new();
727        let mut result = Vec::new();
728
729        for item in arr {
730            let key = if let Some(f) = field {
731                if let Some(obj) = item.as_object() {
732                    obj.get(f)
733                        .map(|v| self.value_to_string(v))
734                        .unwrap_or_default()
735                } else {
736                    self.value_to_string(item)
737                }
738            } else {
739                self.value_to_string(item)
740            };
741
742            if seen.insert(key) {
743                result.push(item.clone());
744            }
745        }
746
747        Ok(Value::Array(result))
748    }
749
750    fn apply_where(
751        &self,
752        value: &Value,
753        field: &str,
754        op: &str,
755        match_value: &str,
756    ) -> Result<Value, ExecutionError> {
757        let arr = value.require_array()?;
758
759        let filtered: Vec<Value> = arr
760            .iter()
761            .filter(|item| {
762                if let Some(obj) = item.as_object() {
763                    if let Some(val) = obj.get(field) {
764                        return self.compare_values(val, op, match_value);
765                    }
766                }
767                false
768            })
769            .cloned()
770            .collect();
771
772        Ok(Value::Array(filtered))
773    }
774
775    fn compare_values(&self, value: &Value, op: &str, target: &str) -> bool {
776        match op {
777            "eq" | "==" | "=" => self.value_matches(value, target),
778            "ne" | "!=" | "<>" => !self.value_matches(value, target),
779            "gt" | ">" => {
780                if let (Some(v), Ok(t)) = (value.as_number(), target.parse::<f64>()) {
781                    v > t
782                } else {
783                    false
784                }
785            }
786            "lt" | "<" => {
787                if let (Some(v), Ok(t)) = (value.as_number(), target.parse::<f64>()) {
788                    v < t
789                } else {
790                    false
791                }
792            }
793            "gte" | ">=" => {
794                if let (Some(v), Ok(t)) = (value.as_number(), target.parse::<f64>()) {
795                    v >= t
796                } else {
797                    false
798                }
799            }
800            "lte" | "<=" => {
801                if let (Some(v), Ok(t)) = (value.as_number(), target.parse::<f64>()) {
802                    v <= t
803                } else {
804                    false
805                }
806            }
807            "contains" => {
808                if let Some(s) = value.as_str() {
809                    s.contains(target)
810                } else {
811                    false
812                }
813            }
814            "starts_with" => {
815                if let Some(s) = value.as_str() {
816                    s.starts_with(target)
817                } else {
818                    false
819                }
820            }
821            "ends_with" => {
822                if let Some(s) = value.as_str() {
823                    s.ends_with(target)
824                } else {
825                    false
826                }
827            }
828            _ => false,
829        }
830    }
831
832    fn apply_offset(&self, value: &Value, n: usize) -> Result<Value, ExecutionError> {
833        let arr = value.require_array()?;
834        Ok(Value::Array(arr.iter().skip(n).cloned().collect()))
835    }
836
837    fn apply_min(&self, value: &Value, field: &str) -> Result<Value, ExecutionError> {
838        let nums = value.extract_numbers(field)?;
839        let min = nums.iter().copied().fold(f64::INFINITY, f64::min);
840        Ok(if min.is_infinite() {
841            Value::Null
842        } else {
843            Value::Number(min)
844        })
845    }
846
847    fn apply_max(&self, value: &Value, field: &str) -> Result<Value, ExecutionError> {
848        let nums = value.extract_numbers(field)?;
849        let max = nums.iter().copied().fold(f64::NEG_INFINITY, f64::max);
850        Ok(if max.is_infinite() {
851            Value::Null
852        } else {
853            Value::Number(max)
854        })
855    }
856
857    fn apply_last(&self, value: &Value, n: usize) -> Result<Value, ExecutionError> {
858        let arr = value.require_array()?;
859        let len = arr.len();
860        let skip = len.saturating_sub(n);
861        Ok(Value::Array(arr.iter().skip(skip).cloned().collect()))
862    }
863
864    fn apply_flatten(&self, value: &Value) -> Result<Value, ExecutionError> {
865        let arr = value.require_array()?;
866
867        let mut result = Vec::new();
868        for item in arr {
869            if let Some(inner) = item.as_array() {
870                result.extend(inner.iter().cloned());
871            } else {
872                result.push(item.clone());
873            }
874        }
875
876        Ok(Value::Array(result))
877    }
878
879    fn apply_reverse(&self, value: &Value) -> Result<Value, ExecutionError> {
880        let arr = value.require_array()?;
881        let mut reversed = arr.clone();
882        reversed.reverse();
883        Ok(Value::Array(reversed))
884    }
885
886    // =========================================================================
887    // New Transform Implementations
888    // =========================================================================
889
890    fn apply_map(&self, value: &Value, expr: &str) -> Result<Value, ExecutionError> {
891        let arr = value.require_array()?;
892
893        // Simple expression evaluation: extract field if expr is "item.field"
894        // For complex expressions, this would need a proper expression evaluator
895        let mapped: Vec<Value> = arr
896            .iter()
897            .map(|item| {
898                // Handle simple field access like "item.field"
899                if let Some(field) = expr.strip_prefix("item.") {
900                    if let Some(obj) = item.as_object() {
901                        obj.get(field).cloned().unwrap_or(Value::Null)
902                    } else {
903                        item.clone()
904                    }
905                } else {
906                    // Return item unchanged if we can't parse the expression
907                    item.clone()
908                }
909            })
910            .collect();
911
912        Ok(Value::Array(mapped))
913    }
914
915    fn apply_reduce(
916        &self,
917        value: &Value,
918        initial: &str,
919        _expr: &str,
920    ) -> Result<Value, ExecutionError> {
921        let arr = value.require_array()?;
922
923        // Parse initial value
924        let mut acc: f64 = initial.parse().unwrap_or(0.0);
925
926        // Simple sum reduction (a proper implementation would evaluate the expr)
927        for item in arr {
928            if let Some(n) = item.as_number() {
929                acc += n;
930            }
931        }
932
933        Ok(Value::Number(acc))
934    }
935
936    fn apply_aggregate(
937        &self,
938        value: &Value,
939        field: &str,
940        op: AggregateOp,
941    ) -> Result<Value, ExecutionError> {
942        let arr = value.require_array()?;
943
944        // For grouped data, expect array of {key: ..., values: [...]}
945        // For ungrouped data, operate on the field directly
946
947        let values: Vec<f64> = arr
948            .iter()
949            .filter_map(|item| {
950                if let Some(obj) = item.as_object() {
951                    // If this is a group with "values" key
952                    if let Some(Value::Array(group_values)) = obj.get("values") {
953                        return Some(
954                            group_values
955                                .iter()
956                                .filter_map(|v| v.get(field)?.as_number())
957                                .collect::<Vec<_>>(),
958                        );
959                    }
960                    // Direct field access
961                    obj.get(field)?.as_number().map(|n| vec![n])
962                } else {
963                    None
964                }
965            })
966            .flatten()
967            .collect();
968
969        let result = match op {
970            AggregateOp::Sum => values.iter().sum(),
971            AggregateOp::Count => values.len() as f64,
972            AggregateOp::Mean => {
973                if values.is_empty() {
974                    0.0
975                } else {
976                    values.iter().sum::<f64>() / values.len() as f64
977                }
978            }
979            AggregateOp::Min => values.iter().cloned().fold(f64::INFINITY, f64::min),
980            AggregateOp::Max => values.iter().cloned().fold(f64::NEG_INFINITY, f64::max),
981            AggregateOp::First => values.first().copied().unwrap_or(0.0),
982            AggregateOp::Last => values.last().copied().unwrap_or(0.0),
983        };
984
985        Ok(Value::Number(result))
986    }
987
988    fn apply_pivot(
989        &self,
990        value: &Value,
991        row_field: &str,
992        col_field: &str,
993        value_field: &str,
994    ) -> Result<Value, ExecutionError> {
995        let arr = value.require_array()?;
996
997        // Build pivot table
998        let mut rows: HashMap<String, HashMap<String, f64>> = HashMap::new();
999
1000        for item in arr {
1001            if let Some(obj) = item.as_object() {
1002                let row_key = obj
1003                    .get(row_field)
1004                    .map(|v| self.value_to_string(v))
1005                    .unwrap_or_default();
1006                let col_key = obj
1007                    .get(col_field)
1008                    .map(|v| self.value_to_string(v))
1009                    .unwrap_or_default();
1010                let val = obj
1011                    .get(value_field)
1012                    .and_then(|v| v.as_number())
1013                    .unwrap_or(0.0);
1014
1015                rows.entry(row_key)
1016                    .or_default()
1017                    .entry(col_key)
1018                    .and_modify(|v| *v += val)
1019                    .or_insert(val);
1020            }
1021        }
1022
1023        // Convert to array of objects
1024        let result: Vec<Value> = rows
1025            .into_iter()
1026            .map(|(row_key, cols)| {
1027                let mut obj = HashMap::new();
1028                obj.insert(row_field.to_string(), Value::String(row_key));
1029                for (col_key, val) in cols {
1030                    obj.insert(col_key, Value::Number(val));
1031                }
1032                Value::Object(obj)
1033            })
1034            .collect();
1035
1036        Ok(Value::Array(result))
1037    }
1038
1039    fn apply_cumsum(&self, value: &Value, field: &str) -> Result<Value, ExecutionError> {
1040        let arr = value.require_array()?;
1041
1042        let mut running_sum = 0.0;
1043        let result: Vec<Value> = arr
1044            .iter()
1045            .map(|item| {
1046                if let Some(obj) = item.as_object() {
1047                    let val = obj.get(field).and_then(|v| v.as_number()).unwrap_or(0.0);
1048                    running_sum += val;
1049
1050                    let mut new_obj = obj.clone();
1051                    new_obj.insert(format!("{field}_cumsum"), Value::Number(running_sum));
1052                    Value::Object(new_obj)
1053                } else {
1054                    item.clone()
1055                }
1056            })
1057            .collect();
1058
1059        Ok(Value::Array(result))
1060    }
1061
1062    fn apply_rank(
1063        &self,
1064        value: &Value,
1065        field: &str,
1066        method: RankMethod,
1067    ) -> Result<Value, ExecutionError> {
1068        let arr = value.require_array()?;
1069
1070        // Extract values with indices
1071        let mut indexed: Vec<(usize, f64)> = arr
1072            .iter()
1073            .enumerate()
1074            .filter_map(|(i, item)| item.as_object()?.get(field)?.as_number().map(|n| (i, n)))
1075            .collect();
1076
1077        // Sort by value (descending for ranking)
1078        indexed.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
1079
1080        // Assign ranks based on method
1081        let mut ranks = vec![0.0; arr.len()];
1082        match method {
1083            RankMethod::Dense => {
1084                let mut rank = 0;
1085                let mut prev_val: Option<f64> = None;
1086                for (i, val) in indexed {
1087                    if prev_val != Some(val) {
1088                        rank += 1;
1089                    }
1090                    ranks[i] = rank as f64;
1091                    prev_val = Some(val);
1092                }
1093            }
1094            RankMethod::Ordinal => {
1095                for (rank, (i, _)) in indexed.iter().enumerate() {
1096                    ranks[*i] = (rank + 1) as f64;
1097                }
1098            }
1099            RankMethod::Average => {
1100                let mut i = 0;
1101                while i < indexed.len() {
1102                    let val = indexed[i].1;
1103                    let start = i;
1104                    while i < indexed.len() && (indexed[i].1 - val).abs() < f64::EPSILON {
1105                        i += 1;
1106                    }
1107                    let avg_rank =
1108                        (start + 1..=i).map(|r| r as f64).sum::<f64>() / (i - start) as f64;
1109                    for j in start..i {
1110                        ranks[indexed[j].0] = avg_rank;
1111                    }
1112                }
1113            }
1114        }
1115
1116        // Add rank to each object
1117        let result: Vec<Value> = arr
1118            .iter()
1119            .enumerate()
1120            .map(|(i, item)| {
1121                if let Some(obj) = item.as_object() {
1122                    let mut new_obj = obj.clone();
1123                    new_obj.insert(format!("{field}_rank"), Value::Number(ranks[i]));
1124                    Value::Object(new_obj)
1125                } else {
1126                    item.clone()
1127                }
1128            })
1129            .collect();
1130
1131        Ok(Value::Array(result))
1132    }
1133
1134    fn apply_moving_avg(
1135        &self,
1136        value: &Value,
1137        field: &str,
1138        window: usize,
1139    ) -> Result<Value, ExecutionError> {
1140        let arr = value.require_array()?;
1141
1142        let values: Vec<f64> = arr
1143            .iter()
1144            .filter_map(|item| item.as_object()?.get(field)?.as_number())
1145            .collect();
1146
1147        let result: Vec<Value> = arr
1148            .iter()
1149            .enumerate()
1150            .map(|(i, item)| {
1151                if let Some(obj) = item.as_object() {
1152                    let start = i.saturating_sub(window - 1);
1153                    let window_values = &values[start..=i.min(values.len() - 1)];
1154                    let ma = if window_values.is_empty() {
1155                        0.0
1156                    } else {
1157                        window_values.iter().sum::<f64>() / window_values.len() as f64
1158                    };
1159
1160                    let mut new_obj = obj.clone();
1161                    new_obj.insert(format!("{field}_ma{window}"), Value::Number(ma));
1162                    Value::Object(new_obj)
1163                } else {
1164                    item.clone()
1165                }
1166            })
1167            .collect();
1168
1169        Ok(Value::Array(result))
1170    }
1171
1172    fn apply_pct_change(&self, value: &Value, field: &str) -> Result<Value, ExecutionError> {
1173        let arr = value.require_array()?;
1174
1175        let values: Vec<f64> = arr
1176            .iter()
1177            .filter_map(|item| item.as_object()?.get(field)?.as_number())
1178            .collect();
1179
1180        let result: Vec<Value> = arr
1181            .iter()
1182            .enumerate()
1183            .map(|(i, item)| {
1184                if let Some(obj) = item.as_object() {
1185                    let pct = if i == 0 || values.get(i - 1).map_or(true, |&prev| prev == 0.0) {
1186                        0.0
1187                    } else {
1188                        let prev = values[i - 1];
1189                        let curr = values.get(i).copied().unwrap_or(prev);
1190                        (curr - prev) / prev * 100.0
1191                    };
1192
1193                    let mut new_obj = obj.clone();
1194                    new_obj.insert(format!("{field}_pct_change"), Value::Number(pct));
1195                    Value::Object(new_obj)
1196                } else {
1197                    item.clone()
1198                }
1199            })
1200            .collect();
1201
1202        Ok(Value::Array(result))
1203    }
1204
1205    /// Apply suggestion transform for N-gram/autocomplete models.
1206    ///
1207    /// The input `value` should be a model object with a `model_type` field.
1208    /// This is a stub implementation - actual model inference is handled by
1209    /// the runtime layer which injects results into the context.
1210    ///
1211    /// In production, the runtime loads the .apr model and provides suggestions
1212    /// through a callback or pre-computed context value.
1213    #[allow(clippy::unnecessary_wraps)] // Returns Result for API consistency with other transforms
1214    fn apply_suggest(
1215        &self,
1216        value: &Value,
1217        prefix: &str,
1218        count: usize,
1219    ) -> Result<Value, ExecutionError> {
1220        // Check if value is a model object with pre-computed suggestions
1221        if let Some(obj) = value.as_object() {
1222            // If the model has pre-computed suggestions for this prefix, use them
1223            if let Some(suggestions) = obj.get("_suggestions") {
1224                if let Some(arr) = suggestions.as_array() {
1225                    return Ok(Value::Array(arr.iter().take(count).cloned().collect()));
1226                }
1227            }
1228
1229            // If this is a model reference, return placeholder suggestions
1230            // The runtime layer should populate _suggestions before execution
1231            if obj.contains_key("model_type") || obj.contains_key("source") {
1232                // Return empty array - runtime should inject actual suggestions
1233                return Ok(Value::Array(vec![]));
1234            }
1235        }
1236
1237        // For testing/demo: if value is an array of suggestion objects, filter by prefix
1238        if let Some(arr) = value.as_array() {
1239            let filtered: Vec<Value> = arr
1240                .iter()
1241                .filter(|item| {
1242                    if let Some(obj) = item.as_object() {
1243                        if let Some(text) = obj.get("text").and_then(|v| v.as_str()) {
1244                            return text.starts_with(prefix);
1245                        }
1246                    }
1247                    false
1248                })
1249                .take(count)
1250                .cloned()
1251                .collect();
1252            return Ok(Value::Array(filtered));
1253        }
1254
1255        // Fallback: return empty suggestions
1256        Ok(Value::Array(vec![]))
1257    }
1258}