1use crate::expression::{AggregateOp, Expression, RankMethod, Transform};
4use std::collections::HashMap;
5
6#[derive(Debug, Clone, PartialEq, Default)]
8pub enum Value {
9 #[default]
11 Null,
12 Bool(bool),
14 Number(f64),
16 String(String),
18 Array(Vec<Self>),
20 Object(HashMap<String, Self>),
22}
23
24impl Value {
25 #[must_use]
27 pub const fn null() -> Self {
28 Self::Null
29 }
30
31 #[must_use]
33 pub const fn bool(v: bool) -> Self {
34 Self::Bool(v)
35 }
36
37 #[must_use]
39 pub const fn number(v: f64) -> Self {
40 Self::Number(v)
41 }
42
43 #[must_use]
45 pub fn string(v: impl Into<String>) -> Self {
46 Self::String(v.into())
47 }
48
49 #[must_use]
51 pub const fn array(v: Vec<Self>) -> Self {
52 Self::Array(v)
53 }
54
55 #[must_use]
57 pub const fn object(v: HashMap<String, Self>) -> Self {
58 Self::Object(v)
59 }
60
61 #[must_use]
63 pub const fn is_null(&self) -> bool {
64 matches!(self, Self::Null)
65 }
66
67 #[must_use]
69 pub const fn is_bool(&self) -> bool {
70 matches!(self, Self::Bool(_))
71 }
72
73 #[must_use]
75 pub const fn is_number(&self) -> bool {
76 matches!(self, Self::Number(_))
77 }
78
79 #[must_use]
81 pub const fn is_string(&self) -> bool {
82 matches!(self, Self::String(_))
83 }
84
85 #[must_use]
87 pub const fn is_array(&self) -> bool {
88 matches!(self, Self::Array(_))
89 }
90
91 #[must_use]
93 pub const fn is_object(&self) -> bool {
94 matches!(self, Self::Object(_))
95 }
96
97 #[must_use]
99 pub const fn as_bool(&self) -> Option<bool> {
100 match self {
101 Self::Bool(v) => Some(*v),
102 _ => None,
103 }
104 }
105
106 #[must_use]
108 pub const fn as_number(&self) -> Option<f64> {
109 match self {
110 Self::Number(v) => Some(*v),
111 _ => None,
112 }
113 }
114
115 #[must_use]
117 pub fn as_str(&self) -> Option<&str> {
118 match self {
119 Self::String(v) => Some(v),
120 _ => None,
121 }
122 }
123
124 #[must_use]
126 pub const fn as_array(&self) -> Option<&Vec<Self>> {
127 match self {
128 Self::Array(v) => Some(v),
129 _ => None,
130 }
131 }
132
133 #[must_use]
135 pub fn as_array_mut(&mut self) -> Option<&mut Vec<Self>> {
136 match self {
137 Self::Array(v) => Some(v),
138 _ => None,
139 }
140 }
141
142 #[must_use]
144 pub const fn as_object(&self) -> Option<&HashMap<String, Self>> {
145 match self {
146 Self::Object(v) => Some(v),
147 _ => None,
148 }
149 }
150
151 #[must_use]
153 pub fn get(&self, key: &str) -> Option<&Self> {
154 match self {
155 Self::Object(map) => map.get(key),
156 _ => None,
157 }
158 }
159
160 pub fn require_array(&self) -> Result<&Vec<Self>, ExecutionError> {
162 self.as_array().ok_or(ExecutionError::ExpectedArray)
163 }
164
165 pub fn extract_numbers(&self, field: &str) -> Result<Vec<f64>, ExecutionError> {
167 Ok(self
168 .require_array()?
169 .iter()
170 .filter_map(|item| item.get(field)?.as_number())
171 .collect())
172 }
173
174 #[must_use]
176 pub fn len(&self) -> usize {
177 match self {
178 Self::Array(arr) => arr.len(),
179 Self::Object(obj) => obj.len(),
180 Self::String(s) => s.len(),
181 _ => 0,
182 }
183 }
184
185 #[must_use]
187 pub fn is_empty(&self) -> bool {
188 self.len() == 0
189 }
190}
191
192impl From<bool> for Value {
193 fn from(v: bool) -> Self {
194 Self::Bool(v)
195 }
196}
197
198impl From<f64> for Value {
199 fn from(v: f64) -> Self {
200 Self::Number(v)
201 }
202}
203
204impl From<i32> for Value {
205 fn from(v: i32) -> Self {
206 Self::Number(f64::from(v))
207 }
208}
209
210impl From<i64> for Value {
211 fn from(v: i64) -> Self {
212 Self::Number(v as f64)
213 }
214}
215
216impl From<&str> for Value {
217 fn from(v: &str) -> Self {
218 Self::String(v.to_string())
219 }
220}
221
222impl From<String> for Value {
223 fn from(v: String) -> Self {
224 Self::String(v)
225 }
226}
227
228impl<T: Into<Self>> From<Vec<T>> for Value {
229 fn from(v: Vec<T>) -> Self {
230 Self::Array(v.into_iter().map(Into::into).collect())
231 }
232}
233
234#[derive(Debug, Clone, PartialEq, Eq)]
236pub enum ExecutionError {
237 SourceNotFound(String),
239 ExpectedArray,
241 ExpectedObject,
243 FieldNotFound(String),
245 TypeMismatch(String),
247 InvalidTransform(String),
249}
250
251impl std::fmt::Display for ExecutionError {
252 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
253 match self {
254 Self::SourceNotFound(name) => write!(f, "source not found: {name}"),
255 Self::ExpectedArray => write!(f, "expected an array"),
256 Self::ExpectedObject => write!(f, "expected an object"),
257 Self::FieldNotFound(name) => write!(f, "field not found: {name}"),
258 Self::TypeMismatch(msg) => write!(f, "type mismatch: {msg}"),
259 Self::InvalidTransform(msg) => write!(f, "invalid transform: {msg}"),
260 }
261 }
262}
263
264impl std::error::Error for ExecutionError {}
265
266#[derive(Debug, Clone, Default)]
268pub struct DataContext {
269 sources: HashMap<String, Value>,
271}
272
273impl DataContext {
274 #[must_use]
276 pub fn new() -> Self {
277 Self::default()
278 }
279
280 pub fn insert(&mut self, name: impl Into<String>, value: Value) {
282 self.sources.insert(name.into(), value);
283 }
284
285 #[must_use]
287 pub fn get(&self, name: &str) -> Option<&Value> {
288 let parts: Vec<&str> = name.split('.').collect();
290 let mut current = self.sources.get(parts[0])?;
291
292 for part in &parts[1..] {
293 current = current.get(part)?;
294 }
295
296 Some(current)
297 }
298
299 #[must_use]
301 pub fn contains(&self, name: &str) -> bool {
302 self.get(name).is_some()
303 }
304}
305
306#[derive(Debug, Default)]
308pub struct ExpressionExecutor;
309
310impl ExpressionExecutor {
311 #[must_use]
313 pub const fn new() -> Self {
314 Self
315 }
316
317 pub fn execute(&self, expr: &Expression, ctx: &DataContext) -> Result<Value, ExecutionError> {
323 let mut value = ctx
325 .get(&expr.source)
326 .cloned()
327 .ok_or_else(|| ExecutionError::SourceNotFound(expr.source.clone()))?;
328
329 for transform in &expr.transforms {
331 value = self.apply_transform(&value, transform, ctx)?;
332 }
333
334 Ok(value)
335 }
336
337 fn apply_transform(
338 &self,
339 value: &Value,
340 transform: &Transform,
341 ctx: &DataContext,
342 ) -> Result<Value, ExecutionError> {
343 match transform {
344 Transform::Filter {
345 field,
346 value: match_value,
347 } => self.apply_filter(value, field, match_value),
348 Transform::Select { fields } => self.apply_select(value, fields),
349 Transform::Sort { field, desc } => self.apply_sort(value, field, *desc),
350 Transform::Count => Ok(self.apply_count(value)),
351 Transform::Sum { field } => self.apply_sum(value, field),
352 Transform::Mean { field } => self.apply_mean(value, field),
353 Transform::Sample { n } => self.apply_sample(value, *n),
354 Transform::Percentage => self.apply_percentage(value),
355 Transform::Rate { window } => self.apply_rate(value, window),
356 Transform::Join { other, on } => self.apply_join(value, other, on, ctx),
357 Transform::GroupBy { field } => self.apply_group_by(value, field),
358 Transform::Distinct { field } => self.apply_distinct(value, field.as_deref()),
359 Transform::Where {
360 field,
361 op,
362 value: match_value,
363 } => self.apply_where(value, field, op, match_value),
364 Transform::Offset { n } => self.apply_offset(value, *n),
365 Transform::Min { field } => self.apply_min(value, field),
366 Transform::Max { field } => self.apply_max(value, field),
367 Transform::First { n } | Transform::Limit { n } => self.apply_limit(value, *n),
368 Transform::Last { n } => self.apply_last(value, *n),
369 Transform::Flatten => self.apply_flatten(value),
370 Transform::Reverse => self.apply_reverse(value),
371 Transform::Map { expr } => self.apply_map(value, expr),
373 Transform::Reduce { initial, expr } => self.apply_reduce(value, initial, expr),
374 Transform::Aggregate { field, op } => self.apply_aggregate(value, field, *op),
375 Transform::Pivot {
376 row_field,
377 col_field,
378 value_field,
379 } => self.apply_pivot(value, row_field, col_field, value_field),
380 Transform::CumulativeSum { field } => self.apply_cumsum(value, field),
381 Transform::Rank { field, method } => self.apply_rank(value, field, *method),
382 Transform::MovingAverage { field, window } => {
383 self.apply_moving_avg(value, field, *window)
384 }
385 Transform::PercentChange { field } => self.apply_pct_change(value, field),
386 Transform::Suggest { prefix, count } => self.apply_suggest(value, prefix, *count),
387 }
388 }
389
390 fn apply_filter(
391 &self,
392 value: &Value,
393 field: &str,
394 match_value: &str,
395 ) -> Result<Value, ExecutionError> {
396 let arr = value.require_array()?;
397
398 let filtered: Vec<Value> = arr
399 .iter()
400 .filter(|item| {
401 if let Some(obj) = item.as_object() {
402 if let Some(val) = obj.get(field) {
403 return self.value_matches(val, match_value);
404 }
405 }
406 false
407 })
408 .cloned()
409 .collect();
410
411 Ok(Value::Array(filtered))
412 }
413
414 fn value_matches(&self, value: &Value, target: &str) -> bool {
415 match value {
416 Value::String(s) => s == target,
417 Value::Number(n) => {
418 if let Ok(t) = target.parse::<f64>() {
419 (*n - t).abs() < f64::EPSILON
420 } else {
421 false
422 }
423 }
424 Value::Bool(b) => {
425 matches!((b, target), (true, "true") | (false, "false"))
426 }
427 _ => false,
428 }
429 }
430
431 fn apply_select(&self, value: &Value, fields: &[String]) -> Result<Value, ExecutionError> {
432 let arr = value.require_array()?;
433
434 let selected: Vec<Value> = arr
435 .iter()
436 .map(|item| {
437 if let Some(obj) = item.as_object() {
438 let mut new_obj = HashMap::new();
439 for field in fields {
440 if let Some(val) = obj.get(field) {
441 new_obj.insert(field.clone(), val.clone());
442 }
443 }
444 Value::Object(new_obj)
445 } else {
446 item.clone()
447 }
448 })
449 .collect();
450
451 Ok(Value::Array(selected))
452 }
453
454 fn apply_sort(&self, value: &Value, field: &str, desc: bool) -> Result<Value, ExecutionError> {
455 let arr = value.require_array()?;
456 let mut sorted = arr.clone();
457
458 sorted.sort_by(|a, b| {
459 let a_val = a.get(field);
460 let b_val = b.get(field);
461
462 let cmp = match (a_val, b_val) {
463 (Some(Value::Number(a)), Some(Value::Number(b))) => {
464 a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal)
465 }
466 (Some(Value::String(a)), Some(Value::String(b))) => a.cmp(b),
467 _ => std::cmp::Ordering::Equal,
468 };
469
470 if desc {
471 cmp.reverse()
472 } else {
473 cmp
474 }
475 });
476
477 Ok(Value::Array(sorted))
478 }
479
480 fn apply_limit(&self, value: &Value, n: usize) -> Result<Value, ExecutionError> {
481 let arr = value.require_array()?;
482 Ok(Value::Array(arr.iter().take(n).cloned().collect()))
483 }
484
485 fn apply_count(&self, value: &Value) -> Value {
486 match value {
487 Value::Array(arr) => Value::Number(arr.len() as f64),
488 Value::Object(obj) => Value::Number(obj.len() as f64),
489 Value::String(s) => Value::Number(s.len() as f64),
490 _ => Value::Number(0.0),
491 }
492 }
493
494 fn apply_sum(&self, value: &Value, field: &str) -> Result<Value, ExecutionError> {
495 let nums = value.extract_numbers(field)?;
496 Ok(Value::Number(nums.iter().sum()))
497 }
498
499 fn apply_mean(&self, value: &Value, field: &str) -> Result<Value, ExecutionError> {
500 let nums = value.extract_numbers(field)?;
501 if nums.is_empty() {
502 return Ok(Value::Number(0.0));
503 }
504 Ok(Value::Number(nums.iter().sum::<f64>() / nums.len() as f64))
505 }
506
507 fn apply_sample(&self, value: &Value, n: usize) -> Result<Value, ExecutionError> {
508 let arr = value.require_array()?;
509
510 Ok(Value::Array(arr.iter().take(n).cloned().collect()))
513 }
514
515 fn apply_percentage(&self, value: &Value) -> Result<Value, ExecutionError> {
516 match value {
517 Value::Number(n) => Ok(Value::Number(n * 100.0)),
518 _ => Err(ExecutionError::TypeMismatch(
519 "percentage requires a number".to_string(),
520 )),
521 }
522 }
523
524 fn apply_rate(&self, value: &Value, window: &str) -> Result<Value, ExecutionError> {
525 let arr = value.require_array()?;
526
527 let window_ms = self.parse_window(window)?;
529
530 let mut values_with_time: Vec<(f64, f64)> = arr
533 .iter()
534 .filter_map(|item| {
535 let obj = item.as_object()?;
536 let time = obj
537 .get("timestamp")
538 .or_else(|| obj.get("time"))
539 .and_then(Value::as_number)?;
540 let val = obj
541 .get("value")
542 .or_else(|| obj.get("count"))
543 .and_then(Value::as_number)
544 .unwrap_or(1.0);
545 Some((time, val))
546 })
547 .collect();
548
549 if values_with_time.len() < 2 {
550 return Ok(Value::Number(0.0));
551 }
552
553 values_with_time.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(std::cmp::Ordering::Equal));
555
556 let window_ms_f64 = window_ms as f64;
558 let last_time = values_with_time.last().map_or(0.0, |v| v.0);
559 let window_start = last_time - window_ms_f64;
560
561 let sum_in_window: f64 = values_with_time
562 .iter()
563 .filter(|(t, _)| *t >= window_start)
564 .map(|(_, v)| v)
565 .sum();
566
567 let rate = sum_in_window / (window_ms_f64 / 1000.0);
569
570 Ok(Value::Number(rate))
571 }
572
573 fn parse_window(&self, window: &str) -> Result<u64, ExecutionError> {
574 let window = window.trim();
575 if window.is_empty() {
576 return Err(ExecutionError::InvalidTransform("empty window".to_string()));
577 }
578
579 let (num_str, unit) = if let Some(s) = window.strip_suffix("ms") {
580 (s, "ms")
581 } else if let Some(s) = window.strip_suffix('s') {
582 (s, "s")
583 } else if let Some(s) = window.strip_suffix('m') {
584 (s, "m")
585 } else if let Some(s) = window.strip_suffix('h') {
586 (s, "h")
587 } else if let Some(s) = window.strip_suffix('d') {
588 (s, "d")
589 } else {
590 (window, "ms")
592 };
593
594 let num: u64 = num_str
595 .parse()
596 .map_err(|_| ExecutionError::InvalidTransform(format!("invalid window: {window}")))?;
597
598 let ms = match unit {
599 "s" => num * 1000,
600 "m" => num * 60 * 1000,
601 "h" => num * 60 * 60 * 1000,
602 "d" => num * 24 * 60 * 60 * 1000,
603 _ => num,
605 };
606
607 Ok(ms)
608 }
609
610 fn apply_join(
611 &self,
612 value: &Value,
613 other: &str,
614 on: &str,
615 ctx: &DataContext,
616 ) -> Result<Value, ExecutionError> {
617 let left_arr = value.require_array()?;
618
619 let right_value = ctx
621 .get(other)
622 .ok_or_else(|| ExecutionError::SourceNotFound(other.to_string()))?;
623 let right_arr = right_value
624 .as_array()
625 .ok_or(ExecutionError::ExpectedArray)?;
626
627 let mut right_lookup: HashMap<String, Vec<&Value>> = HashMap::new();
629 for item in right_arr {
630 if let Some(obj) = item.as_object() {
631 if let Some(key_val) = obj.get(on) {
632 let key = self.value_to_string(key_val);
633 right_lookup.entry(key).or_default().push(item);
634 }
635 }
636 }
637
638 let mut result = Vec::new();
640 for left_item in left_arr {
641 if let Some(left_obj) = left_item.as_object() {
642 if let Some(key_val) = left_obj.get(on) {
643 let key = self.value_to_string(key_val);
644 if let Some(right_items) = right_lookup.get(&key) {
645 for right_item in right_items {
647 if let Some(right_obj) = right_item.as_object() {
648 let mut merged = left_obj.clone();
650 for (k, v) in right_obj {
651 if merged.contains_key(k) && k != on {
653 merged.insert(format!("{other}_{k}"), v.clone());
654 } else if k != on {
655 merged.insert(k.clone(), v.clone());
656 }
657 }
658 result.push(Value::Object(merged));
659 }
660 }
661 } else {
662 result.push(left_item.clone());
664 }
665 } else {
666 result.push(left_item.clone());
668 }
669 } else {
670 result.push(left_item.clone());
672 }
673 }
674
675 Ok(Value::Array(result))
676 }
677
678 fn apply_group_by(&self, value: &Value, field: &str) -> Result<Value, ExecutionError> {
679 let arr = value.require_array()?;
680
681 let mut groups: HashMap<String, Vec<Value>> = HashMap::new();
682
683 for item in arr {
684 let key = if let Some(obj) = item.as_object() {
685 if let Some(val) = obj.get(field) {
686 self.value_to_string(val)
687 } else {
688 "_null".to_string()
689 }
690 } else {
691 "_null".to_string()
692 };
693
694 groups.entry(key).or_default().push(item.clone());
695 }
696
697 let result: Vec<Value> = groups
699 .into_iter()
700 .map(|(key, items)| {
701 let mut obj = HashMap::new();
702 obj.insert("key".to_string(), Value::String(key));
703 obj.insert("items".to_string(), Value::Array(items.clone()));
704 obj.insert("count".to_string(), Value::Number(items.len() as f64));
705 Value::Object(obj)
706 })
707 .collect();
708
709 Ok(Value::Array(result))
710 }
711
712 fn value_to_string(&self, value: &Value) -> String {
713 match value {
714 Value::Null => "_null".to_string(),
715 Value::Bool(b) => b.to_string(),
716 Value::Number(n) => n.to_string(),
717 Value::String(s) => s.clone(),
718 Value::Array(_) => "_array".to_string(),
719 Value::Object(_) => "_object".to_string(),
720 }
721 }
722
723 fn apply_distinct(&self, value: &Value, field: Option<&str>) -> Result<Value, ExecutionError> {
724 let arr = value.require_array()?;
725
726 let mut seen: std::collections::HashSet<String> = std::collections::HashSet::new();
727 let mut result = Vec::new();
728
729 for item in arr {
730 let key = if let Some(f) = field {
731 if let Some(obj) = item.as_object() {
732 obj.get(f)
733 .map(|v| self.value_to_string(v))
734 .unwrap_or_default()
735 } else {
736 self.value_to_string(item)
737 }
738 } else {
739 self.value_to_string(item)
740 };
741
742 if seen.insert(key) {
743 result.push(item.clone());
744 }
745 }
746
747 Ok(Value::Array(result))
748 }
749
750 fn apply_where(
751 &self,
752 value: &Value,
753 field: &str,
754 op: &str,
755 match_value: &str,
756 ) -> Result<Value, ExecutionError> {
757 let arr = value.require_array()?;
758
759 let filtered: Vec<Value> = arr
760 .iter()
761 .filter(|item| {
762 if let Some(obj) = item.as_object() {
763 if let Some(val) = obj.get(field) {
764 return self.compare_values(val, op, match_value);
765 }
766 }
767 false
768 })
769 .cloned()
770 .collect();
771
772 Ok(Value::Array(filtered))
773 }
774
775 fn compare_values(&self, value: &Value, op: &str, target: &str) -> bool {
776 match op {
777 "eq" | "==" | "=" => self.value_matches(value, target),
778 "ne" | "!=" | "<>" => !self.value_matches(value, target),
779 "gt" | ">" => {
780 if let (Some(v), Ok(t)) = (value.as_number(), target.parse::<f64>()) {
781 v > t
782 } else {
783 false
784 }
785 }
786 "lt" | "<" => {
787 if let (Some(v), Ok(t)) = (value.as_number(), target.parse::<f64>()) {
788 v < t
789 } else {
790 false
791 }
792 }
793 "gte" | ">=" => {
794 if let (Some(v), Ok(t)) = (value.as_number(), target.parse::<f64>()) {
795 v >= t
796 } else {
797 false
798 }
799 }
800 "lte" | "<=" => {
801 if let (Some(v), Ok(t)) = (value.as_number(), target.parse::<f64>()) {
802 v <= t
803 } else {
804 false
805 }
806 }
807 "contains" => {
808 if let Some(s) = value.as_str() {
809 s.contains(target)
810 } else {
811 false
812 }
813 }
814 "starts_with" => {
815 if let Some(s) = value.as_str() {
816 s.starts_with(target)
817 } else {
818 false
819 }
820 }
821 "ends_with" => {
822 if let Some(s) = value.as_str() {
823 s.ends_with(target)
824 } else {
825 false
826 }
827 }
828 _ => false,
829 }
830 }
831
832 fn apply_offset(&self, value: &Value, n: usize) -> Result<Value, ExecutionError> {
833 let arr = value.require_array()?;
834 Ok(Value::Array(arr.iter().skip(n).cloned().collect()))
835 }
836
837 fn apply_min(&self, value: &Value, field: &str) -> Result<Value, ExecutionError> {
838 let nums = value.extract_numbers(field)?;
839 let min = nums.iter().copied().fold(f64::INFINITY, f64::min);
840 Ok(if min.is_infinite() {
841 Value::Null
842 } else {
843 Value::Number(min)
844 })
845 }
846
847 fn apply_max(&self, value: &Value, field: &str) -> Result<Value, ExecutionError> {
848 let nums = value.extract_numbers(field)?;
849 let max = nums.iter().copied().fold(f64::NEG_INFINITY, f64::max);
850 Ok(if max.is_infinite() {
851 Value::Null
852 } else {
853 Value::Number(max)
854 })
855 }
856
857 fn apply_last(&self, value: &Value, n: usize) -> Result<Value, ExecutionError> {
858 let arr = value.require_array()?;
859 let len = arr.len();
860 let skip = len.saturating_sub(n);
861 Ok(Value::Array(arr.iter().skip(skip).cloned().collect()))
862 }
863
864 fn apply_flatten(&self, value: &Value) -> Result<Value, ExecutionError> {
865 let arr = value.require_array()?;
866
867 let mut result = Vec::new();
868 for item in arr {
869 if let Some(inner) = item.as_array() {
870 result.extend(inner.iter().cloned());
871 } else {
872 result.push(item.clone());
873 }
874 }
875
876 Ok(Value::Array(result))
877 }
878
879 fn apply_reverse(&self, value: &Value) -> Result<Value, ExecutionError> {
880 let arr = value.require_array()?;
881 let mut reversed = arr.clone();
882 reversed.reverse();
883 Ok(Value::Array(reversed))
884 }
885
886 fn apply_map(&self, value: &Value, expr: &str) -> Result<Value, ExecutionError> {
891 let arr = value.require_array()?;
892
893 let mapped: Vec<Value> = arr
896 .iter()
897 .map(|item| {
898 if let Some(field) = expr.strip_prefix("item.") {
900 if let Some(obj) = item.as_object() {
901 obj.get(field).cloned().unwrap_or(Value::Null)
902 } else {
903 item.clone()
904 }
905 } else {
906 item.clone()
908 }
909 })
910 .collect();
911
912 Ok(Value::Array(mapped))
913 }
914
915 fn apply_reduce(
916 &self,
917 value: &Value,
918 initial: &str,
919 _expr: &str,
920 ) -> Result<Value, ExecutionError> {
921 let arr = value.require_array()?;
922
923 let mut acc: f64 = initial.parse().unwrap_or(0.0);
925
926 for item in arr {
928 if let Some(n) = item.as_number() {
929 acc += n;
930 }
931 }
932
933 Ok(Value::Number(acc))
934 }
935
936 fn apply_aggregate(
937 &self,
938 value: &Value,
939 field: &str,
940 op: AggregateOp,
941 ) -> Result<Value, ExecutionError> {
942 let arr = value.require_array()?;
943
944 let values: Vec<f64> = arr
948 .iter()
949 .filter_map(|item| {
950 if let Some(obj) = item.as_object() {
951 if let Some(Value::Array(group_values)) = obj.get("values") {
953 return Some(
954 group_values
955 .iter()
956 .filter_map(|v| v.get(field)?.as_number())
957 .collect::<Vec<_>>(),
958 );
959 }
960 obj.get(field)?.as_number().map(|n| vec![n])
962 } else {
963 None
964 }
965 })
966 .flatten()
967 .collect();
968
969 let result = match op {
970 AggregateOp::Sum => values.iter().sum(),
971 AggregateOp::Count => values.len() as f64,
972 AggregateOp::Mean => {
973 if values.is_empty() {
974 0.0
975 } else {
976 values.iter().sum::<f64>() / values.len() as f64
977 }
978 }
979 AggregateOp::Min => values.iter().cloned().fold(f64::INFINITY, f64::min),
980 AggregateOp::Max => values.iter().cloned().fold(f64::NEG_INFINITY, f64::max),
981 AggregateOp::First => values.first().copied().unwrap_or(0.0),
982 AggregateOp::Last => values.last().copied().unwrap_or(0.0),
983 };
984
985 Ok(Value::Number(result))
986 }
987
988 fn apply_pivot(
989 &self,
990 value: &Value,
991 row_field: &str,
992 col_field: &str,
993 value_field: &str,
994 ) -> Result<Value, ExecutionError> {
995 let arr = value.require_array()?;
996
997 let mut rows: HashMap<String, HashMap<String, f64>> = HashMap::new();
999
1000 for item in arr {
1001 if let Some(obj) = item.as_object() {
1002 let row_key = obj
1003 .get(row_field)
1004 .map(|v| self.value_to_string(v))
1005 .unwrap_or_default();
1006 let col_key = obj
1007 .get(col_field)
1008 .map(|v| self.value_to_string(v))
1009 .unwrap_or_default();
1010 let val = obj
1011 .get(value_field)
1012 .and_then(|v| v.as_number())
1013 .unwrap_or(0.0);
1014
1015 rows.entry(row_key)
1016 .or_default()
1017 .entry(col_key)
1018 .and_modify(|v| *v += val)
1019 .or_insert(val);
1020 }
1021 }
1022
1023 let result: Vec<Value> = rows
1025 .into_iter()
1026 .map(|(row_key, cols)| {
1027 let mut obj = HashMap::new();
1028 obj.insert(row_field.to_string(), Value::String(row_key));
1029 for (col_key, val) in cols {
1030 obj.insert(col_key, Value::Number(val));
1031 }
1032 Value::Object(obj)
1033 })
1034 .collect();
1035
1036 Ok(Value::Array(result))
1037 }
1038
1039 fn apply_cumsum(&self, value: &Value, field: &str) -> Result<Value, ExecutionError> {
1040 let arr = value.require_array()?;
1041
1042 let mut running_sum = 0.0;
1043 let result: Vec<Value> = arr
1044 .iter()
1045 .map(|item| {
1046 if let Some(obj) = item.as_object() {
1047 let val = obj.get(field).and_then(|v| v.as_number()).unwrap_or(0.0);
1048 running_sum += val;
1049
1050 let mut new_obj = obj.clone();
1051 new_obj.insert(format!("{field}_cumsum"), Value::Number(running_sum));
1052 Value::Object(new_obj)
1053 } else {
1054 item.clone()
1055 }
1056 })
1057 .collect();
1058
1059 Ok(Value::Array(result))
1060 }
1061
1062 fn apply_rank(
1063 &self,
1064 value: &Value,
1065 field: &str,
1066 method: RankMethod,
1067 ) -> Result<Value, ExecutionError> {
1068 let arr = value.require_array()?;
1069
1070 let mut indexed: Vec<(usize, f64)> = arr
1072 .iter()
1073 .enumerate()
1074 .filter_map(|(i, item)| item.as_object()?.get(field)?.as_number().map(|n| (i, n)))
1075 .collect();
1076
1077 indexed.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
1079
1080 let mut ranks = vec![0.0; arr.len()];
1082 match method {
1083 RankMethod::Dense => {
1084 let mut rank = 0;
1085 let mut prev_val: Option<f64> = None;
1086 for (i, val) in indexed {
1087 if prev_val != Some(val) {
1088 rank += 1;
1089 }
1090 ranks[i] = rank as f64;
1091 prev_val = Some(val);
1092 }
1093 }
1094 RankMethod::Ordinal => {
1095 for (rank, (i, _)) in indexed.iter().enumerate() {
1096 ranks[*i] = (rank + 1) as f64;
1097 }
1098 }
1099 RankMethod::Average => {
1100 let mut i = 0;
1101 while i < indexed.len() {
1102 let val = indexed[i].1;
1103 let start = i;
1104 while i < indexed.len() && (indexed[i].1 - val).abs() < f64::EPSILON {
1105 i += 1;
1106 }
1107 let avg_rank =
1108 (start + 1..=i).map(|r| r as f64).sum::<f64>() / (i - start) as f64;
1109 for j in start..i {
1110 ranks[indexed[j].0] = avg_rank;
1111 }
1112 }
1113 }
1114 }
1115
1116 let result: Vec<Value> = arr
1118 .iter()
1119 .enumerate()
1120 .map(|(i, item)| {
1121 if let Some(obj) = item.as_object() {
1122 let mut new_obj = obj.clone();
1123 new_obj.insert(format!("{field}_rank"), Value::Number(ranks[i]));
1124 Value::Object(new_obj)
1125 } else {
1126 item.clone()
1127 }
1128 })
1129 .collect();
1130
1131 Ok(Value::Array(result))
1132 }
1133
1134 fn apply_moving_avg(
1135 &self,
1136 value: &Value,
1137 field: &str,
1138 window: usize,
1139 ) -> Result<Value, ExecutionError> {
1140 let arr = value.require_array()?;
1141
1142 let values: Vec<f64> = arr
1143 .iter()
1144 .filter_map(|item| item.as_object()?.get(field)?.as_number())
1145 .collect();
1146
1147 let result: Vec<Value> = arr
1148 .iter()
1149 .enumerate()
1150 .map(|(i, item)| {
1151 if let Some(obj) = item.as_object() {
1152 let start = i.saturating_sub(window - 1);
1153 let window_values = &values[start..=i.min(values.len() - 1)];
1154 let ma = if window_values.is_empty() {
1155 0.0
1156 } else {
1157 window_values.iter().sum::<f64>() / window_values.len() as f64
1158 };
1159
1160 let mut new_obj = obj.clone();
1161 new_obj.insert(format!("{field}_ma{window}"), Value::Number(ma));
1162 Value::Object(new_obj)
1163 } else {
1164 item.clone()
1165 }
1166 })
1167 .collect();
1168
1169 Ok(Value::Array(result))
1170 }
1171
1172 fn apply_pct_change(&self, value: &Value, field: &str) -> Result<Value, ExecutionError> {
1173 let arr = value.require_array()?;
1174
1175 let values: Vec<f64> = arr
1176 .iter()
1177 .filter_map(|item| item.as_object()?.get(field)?.as_number())
1178 .collect();
1179
1180 let result: Vec<Value> = arr
1181 .iter()
1182 .enumerate()
1183 .map(|(i, item)| {
1184 if let Some(obj) = item.as_object() {
1185 let pct = if i == 0 || values.get(i - 1).map_or(true, |&prev| prev == 0.0) {
1186 0.0
1187 } else {
1188 let prev = values[i - 1];
1189 let curr = values.get(i).copied().unwrap_or(prev);
1190 (curr - prev) / prev * 100.0
1191 };
1192
1193 let mut new_obj = obj.clone();
1194 new_obj.insert(format!("{field}_pct_change"), Value::Number(pct));
1195 Value::Object(new_obj)
1196 } else {
1197 item.clone()
1198 }
1199 })
1200 .collect();
1201
1202 Ok(Value::Array(result))
1203 }
1204
1205 #[allow(clippy::unnecessary_wraps)] fn apply_suggest(
1215 &self,
1216 value: &Value,
1217 prefix: &str,
1218 count: usize,
1219 ) -> Result<Value, ExecutionError> {
1220 if let Some(obj) = value.as_object() {
1222 if let Some(suggestions) = obj.get("_suggestions") {
1224 if let Some(arr) = suggestions.as_array() {
1225 return Ok(Value::Array(arr.iter().take(count).cloned().collect()));
1226 }
1227 }
1228
1229 if obj.contains_key("model_type") || obj.contains_key("source") {
1232 return Ok(Value::Array(vec![]));
1234 }
1235 }
1236
1237 if let Some(arr) = value.as_array() {
1239 let filtered: Vec<Value> = arr
1240 .iter()
1241 .filter(|item| {
1242 if let Some(obj) = item.as_object() {
1243 if let Some(text) = obj.get("text").and_then(|v| v.as_str()) {
1244 return text.starts_with(prefix);
1245 }
1246 }
1247 false
1248 })
1249 .take(count)
1250 .cloned()
1251 .collect();
1252 return Ok(Value::Array(filtered));
1253 }
1254
1255 Ok(Value::Array(vec![]))
1257 }
1258}