sea_core/policy/
quantifier.rs

1use super::expression::{AggregateFunction, BinaryOp, Expression, Quantifier};
2use crate::graph::Graph;
3use rust_decimal::prelude::ToPrimitive;
4use rust_decimal::Decimal;
5use std::convert::TryFrom;
6use std::str::FromStr;
7
8impl Expression {
9    pub fn expand(&self, graph: &Graph) -> Result<Expression, String> {
10        match self {
11            Expression::Quantifier {
12                quantifier,
13                variable,
14                collection,
15                condition,
16            } => {
17                let items = Self::get_collection(collection, graph)?;
18
19                match quantifier {
20                    Quantifier::ForAll => {
21                        if items.is_empty() {
22                            return Ok(Expression::literal(true));
23                        }
24
25                        let mut result = Expression::literal(true);
26                        for item in items {
27                            let substituted = condition.substitute(variable, &item)?;
28                            let expanded = substituted.expand(graph)?;
29                            result = Expression::binary(BinaryOp::And, result, expanded);
30                        }
31                        Ok(result)
32                    }
33                    Quantifier::Exists => {
34                        if items.is_empty() {
35                            return Ok(Expression::literal(false));
36                        }
37
38                        let mut result = Expression::literal(false);
39                        for item in items {
40                            let substituted = condition.substitute(variable, &item)?;
41                            let expanded = substituted.expand(graph)?;
42                            result = Expression::binary(BinaryOp::Or, result, expanded);
43                        }
44                        Ok(result)
45                    }
46                    Quantifier::ExistsUnique => {
47                        let mut count = 0;
48                        for item in items {
49                            let substituted = condition.substitute(variable, &item)?;
50                            let expanded = substituted.expand(graph)?;
51
52                            if Self::is_true_literal(&expanded) {
53                                count += 1;
54                            }
55                        }
56                        Ok(Expression::literal(count == 1))
57                    }
58                }
59            }
60            Expression::GroupBy {
61                variable,
62                collection,
63                filter,
64                key,
65                condition,
66            } => {
67                let items = Self::get_collection(collection, graph)?;
68
69                // Filter items if filter is present
70                let filtered_items = if let Some(filter_expr) = filter {
71                    let mut filtered = Vec::new();
72                    for item in items {
73                        let substituted = filter_expr.substitute(variable, &item)?;
74                        let expanded = substituted.expand(graph)?;
75                        if Self::is_true_literal(&expanded) {
76                            filtered.push(item);
77                        }
78                    }
79                    filtered
80                } else {
81                    items
82                };
83
84                // Group items
85                let mut groups: std::collections::HashMap<String, Vec<serde_json::Value>> =
86                    std::collections::HashMap::new();
87                for item in filtered_items {
88                    let substituted_key = key.substitute(variable, &item)?;
89                    let expanded_key = substituted_key.expand(graph)?;
90                    let key_str = match expanded_key {
91                        Expression::Literal(v) => match v {
92                            serde_json::Value::String(s) => s,
93                            serde_json::Value::Number(n) => n.to_string(),
94                            serde_json::Value::Bool(b) => b.to_string(),
95                            serde_json::Value::Null => "null".to_string(),
96                            other => {
97                                return Err(format!(
98                                "Group key must be a string, number, bool, or null literal, got {}",
99                                other
100                            ))
101                            }
102                        },
103                        _ => {
104                            return Err("Group key must evaluate to a literal".to_string());
105                        }
106                    };
107                    groups.entry(key_str).or_default().push(item);
108                }
109
110                // Evaluate condition for each group
111                for (_group_key, group_items) in groups {
112                    // Substitute the variable with the group collection (as a Literal Array)
113                    // This allows aggregations inside the condition to use the group items
114                    let substituted_condition =
115                        condition.substitute(variable, &serde_json::Value::Array(group_items))?;
116                    let expanded = substituted_condition.expand(graph)?;
117                    if !Self::is_true_literal(&expanded) {
118                        return Ok(Expression::literal(false));
119                    }
120                }
121                Ok(Expression::literal(true))
122            }
123            Expression::Binary { op, left, right } => {
124                let left_expanded = left.expand(graph)?;
125                let right_expanded = right.expand(graph)?;
126                Ok(Self::reduce_binary_expression(
127                    op,
128                    left_expanded,
129                    right_expanded,
130                )?)
131            }
132            Expression::Unary { op, operand } => {
133                Ok(Expression::unary(op.clone(), operand.expand(graph)?))
134            }
135            Expression::MemberAccess { .. } => Ok(self.clone()),
136            Expression::Aggregation {
137                function,
138                collection,
139                field,
140                filter,
141            } => {
142                // Evaluate the aggregation and return a literal
143                let result =
144                    Self::evaluate_aggregation(function, collection, field, filter, graph)?;
145                Ok(Expression::Literal(result))
146            }
147            Expression::AggregationComprehension {
148                function,
149                variable,
150                collection,
151                window,
152                predicate,
153                projection,
154                target_unit,
155            } => {
156                let result = Self::evaluate_aggregation_comprehension(
157                    function,
158                    variable,
159                    collection,
160                    window,
161                    predicate,
162                    projection,
163                    target_unit.as_deref(),
164                    graph,
165                )?;
166                Ok(Expression::Literal(result))
167            }
168            _ => Ok(self.clone()),
169        }
170    }
171
172    pub fn substitute(&self, var: &str, value: &serde_json::Value) -> Result<Expression, String> {
173        match self {
174            Expression::Variable(n) => {
175                if n == var {
176                    Ok(Expression::Literal(value.clone()))
177                } else if n.starts_with(&format!("{}.", var)) {
178                    let field = &n[var.len() + 1..];
179                    if let Some(field_value) = value.get(field) {
180                        Ok(Expression::Literal(field_value.clone()))
181                    } else {
182                        // For three-valued semantics, if a field is missing in a substituted
183                        // value, treat it as NULL/unknown rather than a fatal error. That way
184                        // quantifiers and nested expressions can evaluate to NULL when data
185                        // is optional.
186                        Ok(Expression::Literal(serde_json::Value::Null))
187                    }
188                } else {
189                    Ok(self.clone())
190                }
191            }
192            Expression::Binary { op, left, right } => Ok(Expression::binary(
193                op.clone(),
194                left.substitute(var, value)?,
195                right.substitute(var, value)?,
196            )),
197            Expression::Unary { op, operand } => Ok(Expression::unary(
198                op.clone(),
199                operand.substitute(var, value)?,
200            )),
201            Expression::Quantifier {
202                quantifier,
203                variable,
204                collection,
205                condition,
206            } => {
207                if var == variable {
208                    // Don't substitute into condition when var matches the bound variable
209                    // to avoid variable capture
210                    Ok(Expression::quantifier(
211                        quantifier.clone(),
212                        variable,
213                        collection.substitute(var, value)?,
214                        *condition.clone(),
215                    ))
216                } else {
217                    // Substitute into both collection and condition
218                    Ok(Expression::quantifier(
219                        quantifier.clone(),
220                        variable,
221                        collection.substitute(var, value)?,
222                        condition.substitute(var, value)?,
223                    ))
224                }
225            }
226            Expression::MemberAccess { object, member } => {
227                if object == var {
228                    if let Some(field_value) = value.get(member) {
229                        Ok(Expression::Literal(field_value.clone()))
230                    } else {
231                        Ok(Expression::Literal(serde_json::Value::Null))
232                    }
233                } else {
234                    Ok(self.clone())
235                }
236            }
237            Expression::Aggregation {
238                function,
239                collection,
240                field,
241                filter,
242            } => Ok(Expression::aggregation(
243                function.clone(),
244                collection.substitute(var, value)?,
245                field.clone(),
246                filter
247                    .as_ref()
248                    .map(|f| f.substitute(var, value))
249                    .transpose()?,
250            )),
251            Expression::AggregationComprehension {
252                function,
253                variable,
254                collection,
255                window,
256                predicate,
257                projection,
258                target_unit,
259            } => Ok(Expression::AggregationComprehension {
260                function: function.clone(),
261                variable: variable.clone(),
262                collection: Box::new(collection.substitute(var, value)?),
263                window: window.clone(),
264                predicate: Box::new(predicate.substitute(var, value)?),
265                projection: Box::new(projection.substitute(var, value)?),
266                target_unit: target_unit.clone(),
267            }),
268            Expression::GroupBy {
269                variable,
270                collection,
271                filter,
272                key,
273                condition,
274            } => {
275                // Similar to Quantifier, check if variable matches
276                if var == variable {
277                    Ok(Expression::GroupBy {
278                        variable: variable.clone(),
279                        collection: Box::new(collection.substitute(var, value)?),
280                        filter: filter.clone(),
281                        key: key.clone(),
282                        condition: condition.clone(),
283                    })
284                } else {
285                    Ok(Expression::GroupBy {
286                        variable: variable.clone(),
287                        collection: Box::new(collection.substitute(var, value)?),
288                        filter: filter
289                            .as_ref()
290                            .map(|f| f.substitute(var, value))
291                            .transpose()?
292                            .map(Box::new),
293                        key: Box::new(key.substitute(var, value)?),
294                        condition: Box::new(condition.substitute(var, value)?),
295                    })
296                }
297            }
298            _ => Ok(self.clone()),
299        }
300    }
301
302    pub(crate) fn get_collection(
303        expr: &Expression,
304        graph: &Graph,
305    ) -> Result<Vec<serde_json::Value>, String> {
306        match expr {
307            Expression::Variable(name) => match name.as_str() {
308                "flows" => {
309                    let flows: Result<Vec<serde_json::Value>, String> = graph
310                        .all_flows()
311                        .iter()
312                        .map(|f| {
313                            let quantity = f.quantity().to_f64().ok_or_else(|| {
314                                format!("Failed to convert flow quantity {} to f64", f.quantity())
315                            })?;
316
317                            let mut map = serde_json::Map::new();
318                            map.insert("id".to_string(), serde_json::json!(f.id().to_string()));
319                            map.insert(
320                                "from_entity".to_string(),
321                                serde_json::json!(f.from_id().to_string()),
322                            );
323                            map.insert(
324                                "to_entity".to_string(),
325                                serde_json::json!(f.to_id().to_string()),
326                            );
327                            map.insert(
328                                "resource".to_string(),
329                                serde_json::json!(f.resource_id().to_string()),
330                            );
331                            map.insert("quantity".to_string(), serde_json::json!(quantity));
332
333                            for (k, v) in f.attributes().iter() {
334                                if matches!(
335                                    k.as_str(),
336                                    "id" | "from_entity" | "to_entity" | "resource" | "quantity"
337                                ) || map.contains_key(k)
338                                {
339                                    continue;
340                                }
341                                map.insert(k.clone(), v.clone());
342                            }
343                            Ok(serde_json::Value::Object(map))
344                        })
345                        .collect();
346                    flows
347                }
348                "entities" => {
349                    let entities: Vec<serde_json::Value> = graph
350                        .all_entities()
351                        .iter()
352                        .map(|e| {
353                            let mut map = serde_json::Map::new();
354                            map.insert("id".to_string(), serde_json::json!(e.id().to_string()));
355                            map.insert("name".to_string(), serde_json::json!(e.name()));
356                            map.insert("namespace".to_string(), serde_json::json!(e.namespace()));
357
358                            let roles = graph.role_names_for_entity(e.id());
359                            if !roles.is_empty() {
360                                map.insert("roles".to_string(), serde_json::json!(roles));
361                            }
362
363                            for (k, v) in e.attributes().iter() {
364                                if matches!(k.as_str(), "id" | "name" | "namespace")
365                                    || map.contains_key(k)
366                                {
367                                    continue;
368                                }
369                                map.insert(k.clone(), v.clone());
370                            }
371
372                            serde_json::Value::Object(map)
373                        })
374                        .collect();
375                    Ok(entities)
376                }
377                "relations" => {
378                    let relations: Vec<serde_json::Value> = graph
379                        .all_relations()
380                        .iter()
381                        .map(|relation| {
382                            let mut map = serde_json::Map::new();
383                            map.insert(
384                                "id".to_string(),
385                                serde_json::json!(relation.id().to_string()),
386                            );
387                            map.insert("name".to_string(), serde_json::json!(relation.name()));
388                            map.insert(
389                                "predicate".to_string(),
390                                serde_json::json!(relation.predicate()),
391                            );
392
393                            if let Some(subject) = graph.get_role(relation.subject_role()) {
394                                map.insert(
395                                    "subject_role".to_string(),
396                                    serde_json::json!(subject.name()),
397                                );
398                            }
399
400                            if let Some(object) = graph.get_role(relation.object_role()) {
401                                map.insert(
402                                    "object_role".to_string(),
403                                    serde_json::json!(object.name()),
404                                );
405                            }
406
407                            if let Some(flow) = relation.via_flow() {
408                                map.insert("via".to_string(), serde_json::json!(flow.to_string()));
409                            }
410
411                            serde_json::Value::Object(map)
412                        })
413                        .collect();
414
415                    Ok(relations)
416                }
417                "resources" => {
418                    let resources: Vec<serde_json::Value> = graph
419                        .all_resources()
420                        .iter()
421                        .map(|r| {
422                            let mut map = serde_json::Map::new();
423                            map.insert("id".to_string(), serde_json::json!(r.id().to_string()));
424                            map.insert("name".to_string(), serde_json::json!(r.name()));
425                            map.insert("namespace".to_string(), serde_json::json!(r.namespace()));
426                            map.insert("unit".to_string(), serde_json::json!(r.unit()));
427                            for (k, v) in r.attributes().iter() {
428                                if matches!(k.as_str(), "id" | "name" | "namespace" | "unit")
429                                    || map.contains_key(k)
430                                {
431                                    continue;
432                                }
433                                map.insert(k.clone(), v.clone());
434                            }
435                            serde_json::Value::Object(map)
436                        })
437                        .collect();
438                    Ok(resources)
439                }
440                "instances" => {
441                    let instances: Vec<serde_json::Value> = graph
442                        .all_instances()
443                        .iter()
444                        .map(|i| {
445                            let mut map = serde_json::Map::new();
446                            map.insert("id".to_string(), serde_json::json!(i.id().to_string()));
447                            map.insert(
448                                "entity".to_string(),
449                                serde_json::json!(i.entity_id().to_string()),
450                            );
451                            map.insert(
452                                "resource".to_string(),
453                                serde_json::json!(i.resource_id().to_string()),
454                            );
455                            for (k, v) in i.attributes().iter() {
456                                if matches!(k.as_str(), "id" | "entity" | "resource")
457                                    || map.contains_key(k)
458                                {
459                                    continue;
460                                }
461                                map.insert(k.clone(), v.clone());
462                            }
463                            serde_json::Value::Object(map)
464                        })
465                        .collect();
466                    Ok(instances)
467                }
468                _ => Err(format!("Unknown collection: {}", name)),
469            },
470            Expression::Literal(serde_json::Value::Array(arr)) => Ok(arr.clone()),
471            _ => Err("Collection expression must be a variable or array literal".to_string()),
472        }
473    }
474
475    fn is_true_literal(expr: &Expression) -> bool {
476        matches!(expr, Expression::Literal(v) if v.as_bool() == Some(true))
477    }
478
479    pub(crate) fn evaluate_aggregation(
480        function: &AggregateFunction,
481        collection: &Expression,
482        field: &Option<String>,
483        filter: &Option<Box<Expression>>,
484        graph: &Graph,
485    ) -> Result<serde_json::Value, String> {
486        // Get the collection items
487        let items = Self::get_collection(collection, graph)?;
488
489        // Apply filter if present
490        let filtered_items = if let Some(filter_expr) = filter {
491            // Determine the variable name based on collection type
492            let variable_name = match collection {
493                Expression::Variable(name) => match name.as_str() {
494                    "flows" => "flow",
495                    "entities" => "entity",
496                    "resources" => "resource",
497                    "instances" => "instance",
498                    "relations" => "relation",
499                    _ => "item",
500                },
501                _ => "item",
502            };
503
504            items
505                .into_iter()
506                .filter(|item| {
507                    // Substitute collection-specific variables in the filter
508                    let substituted = filter_expr
509                        .substitute(variable_name, item)
510                        .unwrap_or_else(|_| filter_expr.as_ref().clone());
511                    // Expand and check if true
512                    if let Ok(expanded) = substituted.expand(graph) {
513                        Self::is_true_literal(&expanded)
514                    } else {
515                        false
516                    }
517                })
518                .collect::<Vec<_>>()
519        } else {
520            items
521        };
522
523        // Apply aggregation function
524        match function {
525            AggregateFunction::Count => Ok(serde_json::json!(filtered_items.len())),
526
527            AggregateFunction::Sum => {
528                let field_name = field.as_ref().ok_or("Sum requires a field specification")?;
529
530                let sum: Decimal = filtered_items
531                    .iter()
532                    .filter_map(|item| {
533                        item.get(field_name).and_then(|v| {
534                            if let Some(num) = v.as_f64() {
535                                Decimal::from_str(&num.to_string()).ok()
536                            } else if let Some(s) = v.as_str() {
537                                Decimal::from_str(s).ok()
538                            } else {
539                                None
540                            }
541                        })
542                    })
543                    .sum();
544
545                // Convert Decimal to f64 and propagate parsing errors
546                let sum_f64 = sum
547                    .to_f64()
548                    .ok_or_else(|| format!("Failed to convert sum {} to f64", sum))?;
549
550                Ok(serde_json::json!(sum_f64))
551            }
552
553            AggregateFunction::Avg => {
554                let field_name = field.as_ref().ok_or("Avg requires a field specification")?;
555
556                let values: Vec<Decimal> = filtered_items
557                    .iter()
558                    .filter_map(|item| {
559                        item.get(field_name).and_then(|v| {
560                            if let Some(num) = v.as_f64() {
561                                Decimal::from_str(&num.to_string()).ok()
562                            } else if let Some(s) = v.as_str() {
563                                Decimal::from_str(s).ok()
564                            } else {
565                                None
566                            }
567                        })
568                    })
569                    .collect();
570
571                if values.is_empty() {
572                    return Ok(serde_json::json!(null));
573                }
574
575                let sum: Decimal = values.iter().copied().sum();
576                let avg = sum / Decimal::from(values.len());
577
578                // Convert Decimal to f64 and propagate parsing errors
579                let avg_f64 = avg
580                    .to_f64()
581                    .ok_or_else(|| format!("Failed to convert average {} to f64", avg))?;
582
583                Ok(serde_json::json!(avg_f64))
584            }
585
586            AggregateFunction::Min => {
587                let field_name = field.as_ref().ok_or("Min requires a field specification")?;
588
589                let min = filtered_items
590                    .iter()
591                    .filter_map(|item| {
592                        item.get(field_name).and_then(|v| {
593                            if let Some(num) = v.as_f64() {
594                                Decimal::from_str(&num.to_string()).ok()
595                            } else if let Some(s) = v.as_str() {
596                                Decimal::from_str(s).ok()
597                            } else {
598                                None
599                            }
600                        })
601                    })
602                    .min();
603
604                // Convert Decimal to f64 and propagate parsing errors
605                if let Some(min_val) = min {
606                    let min_f64 = min_val
607                        .to_f64()
608                        .ok_or_else(|| format!("Failed to convert min {} to f64", min_val))?;
609                    Ok(serde_json::json!(min_f64))
610                } else {
611                    Ok(serde_json::json!(null))
612                }
613            }
614
615            AggregateFunction::Max => {
616                let field_name = field.as_ref().ok_or("Max requires a field specification")?;
617
618                let max = filtered_items
619                    .iter()
620                    .filter_map(|item| {
621                        item.get(field_name).and_then(|v| {
622                            if let Some(num) = v.as_f64() {
623                                Decimal::from_str(&num.to_string()).ok()
624                            } else if let Some(s) = v.as_str() {
625                                Decimal::from_str(s).ok()
626                            } else {
627                                None
628                            }
629                        })
630                    })
631                    .max();
632
633                // Convert Decimal to f64 and propagate parsing errors
634                if let Some(max_val) = max {
635                    let max_f64 = max_val
636                        .to_f64()
637                        .ok_or_else(|| format!("Failed to convert max {} to f64", max_val))?;
638                    Ok(serde_json::json!(max_f64))
639                } else {
640                    Ok(serde_json::json!(null))
641                }
642            }
643        }
644    }
645
646    #[allow(clippy::too_many_arguments)]
647    pub(crate) fn evaluate_aggregation_comprehension(
648        function: &AggregateFunction,
649        variable: &str,
650        collection: &Expression,
651        window: &Option<crate::policy::WindowSpec>,
652        predicate: &Expression,
653        projection: &Expression,
654        target_unit: Option<&str>,
655        graph: &Graph,
656    ) -> Result<serde_json::Value, String> {
657        let items = Self::get_collection(collection, graph)?;
658
659        // Apply window filtering if present
660        let items = if let Some(w) = window {
661            let now = chrono::Utc::now();
662            let duration_span = i64::try_from(w.duration)
663                .map_err(|_| format!("Window duration {} exceeds supported range", w.duration))?;
664            let unit_lower = w.unit.to_lowercase();
665            let duration = match unit_lower.as_str() {
666                "hour" | "hours" => chrono::Duration::hours(duration_span),
667                "minute" | "minutes" => chrono::Duration::minutes(duration_span),
668                "day" | "days" => chrono::Duration::days(duration_span),
669                "second" | "seconds" => chrono::Duration::seconds(duration_span),
670                _ => {
671                    return Err(format!(
672                        "Invalid window unit '{}' in aggregation window",
673                        w.unit
674                    ))
675                }
676            };
677
678            items
679                .into_iter()
680                .filter(|item| {
681                    let ts_str = item
682                        .get("timestamp")
683                        .or_else(|| item.get("created_at"))
684                        .and_then(|v| v.as_str());
685                    if let Some(s) = ts_str {
686                        if let Ok(ts) = chrono::DateTime::parse_from_rfc3339(s) {
687                            let ts_utc: chrono::DateTime<chrono::Utc> = ts.into();
688                            return ts_utc >= now - duration;
689                        }
690                    }
691                    // If no timestamp, we can't filter, so maybe exclude? Or include?
692                    // Safer to exclude if windowing is requested but data is missing.
693                    false
694                })
695                .collect()
696        } else {
697            items
698        };
699
700        let mut projected_values = Vec::new();
701        for item in items {
702            let substituted_predicate = predicate.substitute(variable, &item)?;
703            let predicate_result = substituted_predicate.expand(graph)?;
704            if !Self::is_true_literal(&predicate_result) {
705                continue;
706            }
707
708            let substituted_projection = projection.substitute(variable, &item)?;
709            let projection_result = substituted_projection.expand(graph)?;
710            match projection_result {
711                Expression::Literal(value) => projected_values.push(value),
712                Expression::QuantityLiteral { value, unit } => {
713                    projected_values.push(serde_json::json!({
714                        "__quantity_value": value.to_string(),
715                        "__quantity_unit": unit,
716                    }));
717                }
718                _ => {
719                    return Err(
720                        "Projection in aggregation comprehension must reduce to a literal"
721                            .to_string(),
722                    );
723                }
724            }
725        }
726
727        match function {
728            AggregateFunction::Count => Ok(serde_json::json!(projected_values.len())),
729            AggregateFunction::Sum
730            | AggregateFunction::Avg
731            | AggregateFunction::Min
732            | AggregateFunction::Max => {
733                Self::fold_numeric(function, &projected_values, target_unit)
734            }
735        }
736    }
737
738    pub(crate) fn fold_numeric(
739        function: &AggregateFunction,
740        values: &[serde_json::Value],
741        target_unit: Option<&str>,
742    ) -> Result<serde_json::Value, String> {
743        use crate::units::UnitRegistry;
744
745        let mut decimals: Vec<Decimal> = Vec::new();
746        let mut source_unit: Option<String> = None;
747        for value in values {
748            if let Some(num) = value.as_f64() {
749                decimals.push(Decimal::from_str(&num.to_string()).map_err(|e| e.to_string())?);
750            } else if let Some(s) = value.as_str() {
751                decimals.push(Decimal::from_str(s).map_err(|e| e.to_string())?);
752            } else if value.is_object() {
753                let map = value
754                    .as_object()
755                    .ok_or_else(|| "Invalid quantity object".to_string())?;
756                if let Some(val) = map.get("__quantity_value") {
757                    let s = val
758                        .as_str()
759                        .ok_or_else(|| "Quantity value must be a string".to_string())?;
760                    decimals.push(Decimal::from_str(s).map_err(|e| e.to_string())?);
761                }
762                if let Some(unit) = map.get("__quantity_unit") {
763                    if source_unit.is_none() {
764                        if let Some(unit_str) = unit.as_str() {
765                            source_unit = Some(unit_str.to_string());
766                        }
767                    }
768                }
769            }
770        }
771
772        if decimals.is_empty() {
773            return Ok(serde_json::json!(null));
774        }
775
776        if let Some(target_unit) = target_unit {
777            if let Some(unit) = source_unit.clone() {
778                let registry = UnitRegistry::global();
779                let registry = registry
780                    .read()
781                    .map_err(|e| format!("Failed to lock unit registry: {}", e))?;
782                let from = registry
783                    .get_unit(&unit)
784                    .map_err(|e| format!("{}", e))?
785                    .clone();
786                let to = registry
787                    .get_unit(target_unit)
788                    .map_err(|e| format!("{}", e))?
789                    .clone();
790                decimals = decimals
791                    .into_iter()
792                    .map(|value| registry.convert(value, &from, &to))
793                    .collect::<Result<Vec<_>, _>>()
794                    .map_err(|e| format!("{}", e))?;
795            }
796        }
797
798        let result = match function {
799            AggregateFunction::Sum => decimals.iter().copied().sum(),
800            AggregateFunction::Avg => {
801                let sum: Decimal = decimals.iter().copied().sum();
802                sum / Decimal::from(decimals.len())
803            }
804            AggregateFunction::Min => decimals
805                .into_iter()
806                .min()
807                .ok_or_else(|| "No values available for min".to_string())?,
808            AggregateFunction::Max => decimals
809                .into_iter()
810                .max()
811                .ok_or_else(|| "No values available for max".to_string())?,
812            AggregateFunction::Count => Decimal::from(values.len() as i64),
813        };
814
815        let as_f64 = result
816            .to_f64()
817            .ok_or_else(|| format!("Failed to convert aggregated value {} to f64", result))?;
818
819        Ok(serde_json::json!(as_f64))
820    }
821
822    fn reduce_binary_expression(
823        op: &BinaryOp,
824        left: Expression,
825        right: Expression,
826    ) -> Result<Expression, String> {
827        match (&left, &right) {
828            (Expression::Literal(left_value), Expression::Literal(right_value)) => {
829                // Preserve tri-state semantics: if either operand is NULL, yield NULL.
830                if left_value.is_null() || right_value.is_null() {
831                    return Ok(Expression::Literal(serde_json::Value::Null));
832                }
833
834                let numeric_cmp = left_value.is_number() && right_value.is_number();
835                let reduced = match op {
836                    BinaryOp::Equal => {
837                        if numeric_cmp {
838                            let left_num = Self::value_to_f64(left_value)
839                                .ok_or_else(|| "Left operand is not numeric".to_string())?;
840                            let right_num = Self::value_to_f64(right_value)
841                                .ok_or_else(|| "Right operand is not numeric".to_string())?;
842                            serde_json::json!(left_num == right_num)
843                        } else {
844                            serde_json::json!(left_value == right_value)
845                        }
846                    }
847                    BinaryOp::NotEqual => {
848                        if numeric_cmp {
849                            let left_num = Self::value_to_f64(left_value)
850                                .ok_or_else(|| "Left operand is not numeric".to_string())?;
851                            let right_num = Self::value_to_f64(right_value)
852                                .ok_or_else(|| "Right operand is not numeric".to_string())?;
853                            serde_json::json!(left_num != right_num)
854                        } else {
855                            serde_json::json!(left_value != right_value)
856                        }
857                    }
858                    BinaryOp::GreaterThan
859                    | BinaryOp::LessThan
860                    | BinaryOp::GreaterThanOrEqual
861                    | BinaryOp::LessThanOrEqual => {
862                        let left_num = Self::value_to_f64(left_value)
863                            .ok_or_else(|| "Left operand is not numeric".to_string())?;
864                        let right_num = Self::value_to_f64(right_value)
865                            .ok_or_else(|| "Right operand is not numeric".to_string())?;
866                        match op {
867                            BinaryOp::GreaterThan => serde_json::json!(left_num > right_num),
868                            BinaryOp::LessThan => serde_json::json!(left_num < right_num),
869                            BinaryOp::GreaterThanOrEqual => {
870                                serde_json::json!(left_num >= right_num)
871                            }
872                            BinaryOp::LessThanOrEqual => serde_json::json!(left_num <= right_num),
873                            _ => unreachable!(),
874                        }
875                    }
876                    BinaryOp::And | BinaryOp::Or => {
877                        let left_bool = match left_value {
878                            serde_json::Value::Bool(b) => Some(*b),
879                            serde_json::Value::Null => None,
880                            _ => return Err("Left operand is not boolean".to_string()),
881                        };
882                        let right_bool = match right_value {
883                            serde_json::Value::Bool(b) => Some(*b),
884                            serde_json::Value::Null => None,
885                            _ => return Err("Right operand is not boolean".to_string()),
886                        };
887
888                        match op {
889                            BinaryOp::And => {
890                                if left_bool == Some(false) || right_bool == Some(false) {
891                                    serde_json::json!(false)
892                                } else if left_bool.is_none() || right_bool.is_none() {
893                                    serde_json::Value::Null
894                                } else {
895                                    serde_json::json!(left_bool.unwrap() && right_bool.unwrap())
896                                }
897                            }
898                            BinaryOp::Or => {
899                                if left_bool == Some(true) || right_bool == Some(true) {
900                                    serde_json::json!(true)
901                                } else if left_bool.is_none() || right_bool.is_none() {
902                                    serde_json::Value::Null
903                                } else {
904                                    serde_json::json!(left_bool.unwrap() || right_bool.unwrap())
905                                }
906                            }
907                            _ => unreachable!(),
908                        }
909                    }
910                    BinaryOp::Contains | BinaryOp::StartsWith | BinaryOp::EndsWith => {
911                        let left_str = left_value
912                            .as_str()
913                            .ok_or_else(|| "Left operand is not string".to_string())?;
914                        let right_str = right_value
915                            .as_str()
916                            .ok_or_else(|| "Right operand is not string".to_string())?;
917                        match op {
918                            BinaryOp::Contains => serde_json::json!(left_str.contains(right_str)),
919                            BinaryOp::StartsWith => {
920                                serde_json::json!(left_str.starts_with(right_str))
921                            }
922                            BinaryOp::EndsWith => serde_json::json!(left_str.ends_with(right_str)),
923                            _ => unreachable!(),
924                        }
925                    }
926                    _ => return Ok(Expression::binary(op.clone(), left.clone(), right.clone())),
927                };
928                Ok(Expression::Literal(reduced))
929            }
930            _ => Ok(Expression::binary(op.clone(), left, right)),
931        }
932    }
933
934    fn value_to_f64(value: &serde_json::Value) -> Option<f64> {
935        value
936            .as_f64()
937            .or_else(|| value.as_i64().map(|v| v as f64))
938            .or_else(|| value.as_u64().map(|v| v as f64))
939    }
940}