1use std::collections::HashMap;
10
11use arrow_array::RecordBatch;
12use uni_common::Value;
13use uni_cypher::ast::{BinaryOp, CypherLiteral, Expr, UnaryOp};
14use uni_cypher::locy_ast::{LocyBinaryOp, LocyExpr};
15use uni_locy::{FactRow, LocyError};
16
17pub fn eval_locy_expr(
20 expr: &LocyExpr,
21 bindings: &FactRow,
22 prev_values: Option<&FactRow>,
23) -> Result<Value, LocyError> {
24 match expr {
25 LocyExpr::PrevRef(field) => Ok(prev_values
26 .and_then(|prev| prev.get(field).cloned())
27 .unwrap_or(Value::Null)),
28 LocyExpr::Cypher(cypher_expr) => eval_expr(cypher_expr, bindings),
29 LocyExpr::BinaryOp { left, op, right } => {
30 let l = eval_locy_expr(left, bindings, prev_values)?;
31 let r = eval_locy_expr(right, bindings, prev_values)?;
32 eval_locy_binary_op(&l, op, &r)
33 }
34 LocyExpr::UnaryOp(op, inner) => {
35 let v = eval_locy_expr(inner, bindings, prev_values)?;
36 eval_unary_op(op, &v)
37 }
38 }
39}
40
41pub fn eval_expr(expr: &Expr, bindings: &FactRow) -> Result<Value, LocyError> {
43 match expr {
44 Expr::Literal(lit) => Ok(literal_to_value(lit)),
45 Expr::Variable(name) => Ok(bindings.get(name).cloned().unwrap_or(Value::Null)),
46 Expr::Property(expr, property) => {
47 let base = eval_expr(expr, bindings)?;
48 Ok(get_property(&base, property))
49 }
50 Expr::BinaryOp { left, op, right } => {
51 let l = eval_expr(left, bindings)?;
52 let r = eval_expr(right, bindings)?;
53 eval_binary_op(&l, op, &r)
54 }
55 Expr::UnaryOp { op, expr } => {
56 let v = eval_expr(expr, bindings)?;
57 eval_unary_op(op, &v)
58 }
59 Expr::FunctionCall { name, args, .. } => {
60 let evaluated_args: Result<Vec<Value>, _> =
61 args.iter().map(|a| eval_expr(a, bindings)).collect();
62 eval_function(name, &evaluated_args?)
63 }
64 Expr::Parameter(name) => Ok(bindings.get(name).cloned().unwrap_or(Value::Null)),
65 Expr::IsNull(inner) => {
66 let v = eval_expr(inner, bindings)?;
67 Ok(Value::Bool(v.is_null()))
68 }
69 Expr::IsNotNull(inner) => {
70 let v = eval_expr(inner, bindings)?;
71 Ok(Value::Bool(!v.is_null()))
72 }
73 Expr::List(items) => {
74 let vals: Result<Vec<Value>, _> =
75 items.iter().map(|i| eval_expr(i, bindings)).collect();
76 Ok(Value::List(vals?))
77 }
78 Expr::Map(entries) => {
79 let mut map = HashMap::new();
80 for (k, v) in entries {
81 map.insert(k.clone(), eval_expr(v, bindings)?);
82 }
83 Ok(Value::Map(map))
84 }
85 _ => Err(LocyError::EvaluationError {
86 message: format!("unsupported expression in in-memory evaluation: {expr:?}"),
87 }),
88 }
89}
90
91pub fn eval_aggregate_over_group(
93 func_name: &str,
94 arg_expr: &Expr,
95 group: &[FactRow],
96 rule_name: &str,
97 fold_name: &str,
98) -> Result<Value, LocyError> {
99 let upper = func_name.to_uppercase();
100 match upper.as_str() {
101 "SUM" => {
102 let mut total = 0.0_f64;
103 for row in group {
104 let v = eval_expr(arg_expr, row)?;
105 if let Some(f) = v.as_f64() {
106 total += f;
107 }
108 }
109 if total == total.floor() && total.abs() < i64::MAX as f64 {
110 Ok(Value::Int(total as i64))
111 } else {
112 Ok(Value::Float(total))
113 }
114 }
115 "MSUM" => {
116 let mut total = 0.0_f64;
117 for row in group {
118 let v = eval_expr(arg_expr, row)?;
119 if let Some(f) = v.as_f64() {
120 if f < 0.0 {
121 return Err(LocyError::MsumNegativeValue {
122 rule: rule_name.to_string(),
123 fold: fold_name.to_string(),
124 value: f,
125 });
126 }
127 total += f;
128 }
129 }
130 if total == total.floor() && total.abs() < i64::MAX as f64 {
131 Ok(Value::Int(total as i64))
132 } else {
133 Ok(Value::Float(total))
134 }
135 }
136 "COUNT" | "MCOUNT" => {
137 let count = group
138 .iter()
139 .filter(|row| {
140 eval_expr(arg_expr, row)
141 .map(|v| !v.is_null())
142 .unwrap_or(false)
143 })
144 .count();
145 Ok(Value::Int(count as i64))
146 }
147 "MIN" | "MMIN" => {
148 let mut min_val: Option<Value> = None;
149 for row in group {
150 let v = eval_expr(arg_expr, row)?;
151 if v.is_null() {
152 continue;
153 }
154 min_val = Some(match min_val {
155 None => v,
156 Some(cur) => {
157 if value_less_than(&v, &cur) {
158 v
159 } else {
160 cur
161 }
162 }
163 });
164 }
165 Ok(min_val.unwrap_or(Value::Null))
166 }
167 "MAX" | "MMAX" => {
168 let mut max_val: Option<Value> = None;
169 for row in group {
170 let v = eval_expr(arg_expr, row)?;
171 if v.is_null() {
172 continue;
173 }
174 max_val = Some(match max_val {
175 None => v,
176 Some(cur) => {
177 if value_less_than(&cur, &v) {
178 v
179 } else {
180 cur
181 }
182 }
183 });
184 }
185 Ok(max_val.unwrap_or(Value::Null))
186 }
187 "AVG" => {
188 let mut total = 0.0_f64;
189 let mut count = 0;
190 for row in group {
191 let v = eval_expr(arg_expr, row)?;
192 if let Some(f) = v.as_f64() {
193 total += f;
194 count += 1;
195 }
196 }
197 if count == 0 {
198 Ok(Value::Null)
199 } else {
200 Ok(Value::Float(total / count as f64))
201 }
202 }
203 "COLLECT" => {
204 let mut vals = Vec::new();
205 for row in group {
206 let v = eval_expr(arg_expr, row)?;
207 if !v.is_null() {
208 vals.push(v);
209 }
210 }
211 Ok(Value::List(vals))
212 }
213 _ => Err(LocyError::EvaluationError {
214 message: format!("unknown aggregate function: {func_name}"),
215 }),
216 }
217}
218
219pub(crate) fn literal_to_value(lit: &CypherLiteral) -> Value {
220 match lit {
221 CypherLiteral::Null => Value::Null,
222 CypherLiteral::Bool(b) => Value::Bool(*b),
223 CypherLiteral::Integer(i) => Value::Int(*i),
224 CypherLiteral::Float(f) => Value::Float(*f),
225 CypherLiteral::String(s) => Value::String(s.clone()),
226 CypherLiteral::Bytes(b) => Value::Bytes(b.clone()),
227 }
228}
229
230fn get_property(value: &Value, property: &str) -> Value {
231 match value {
232 Value::Node(n) => n.properties.get(property).cloned().unwrap_or(Value::Null),
233 Value::Edge(e) => e.properties.get(property).cloned().unwrap_or(Value::Null),
234 Value::Map(m) => m.get(property).cloned().unwrap_or(Value::Null),
235 Value::Temporal(uni_common::TemporalValue::Duration { .. }) => {
242 crate::query::datetime::eval_duration_accessor(&value.to_string(), property)
243 .unwrap_or(Value::Null)
244 }
245 Value::Temporal(_) => crate::query::datetime::eval_temporal_accessor_value(value, property)
246 .unwrap_or(Value::Null),
247 _ => Value::Null,
248 }
249}
250
251fn eval_unary_op(op: &UnaryOp, v: &Value) -> Result<Value, LocyError> {
256 match op {
257 UnaryOp::Not => match v {
258 Value::Bool(b) => Ok(Value::Bool(!b)),
259 Value::Null => Ok(Value::Null),
260 _ => Err(LocyError::TypeError {
261 message: format!("NOT requires boolean, got {v:?}"),
262 }),
263 },
264 UnaryOp::Neg => match v {
265 Value::Int(i) => Ok(Value::Int(-i)),
266 Value::Float(f) => Ok(Value::Float(-f)),
267 Value::Null => Ok(Value::Null),
268 _ => Err(LocyError::TypeError {
269 message: format!("negation requires numeric, got {v:?}"),
270 }),
271 },
272 }
273}
274
275fn eval_locy_binary_op(left: &Value, op: &LocyBinaryOp, right: &Value) -> Result<Value, LocyError> {
276 if left.is_null() || right.is_null() {
277 return Ok(Value::Null);
278 }
279 match op {
280 LocyBinaryOp::Add => numeric_op(left, right, |a, b| a + b, |a, b| a + b),
281 LocyBinaryOp::Sub => numeric_op(left, right, |a, b| a - b, |a, b| a - b),
282 LocyBinaryOp::Mul => numeric_op(left, right, |a, b| a * b, |a, b| a * b),
283 LocyBinaryOp::Div => {
284 let r = right.as_f64().unwrap_or(0.0);
285 if r == 0.0 {
286 return Err(LocyError::EvaluationError {
287 message: "division by zero".to_string(),
288 });
289 }
290 numeric_op(left, right, |a, b| a / b, |a, b| a / b)
291 }
292 LocyBinaryOp::Mod => numeric_op(left, right, |a, b| a % b, |a, b| a % b),
293 LocyBinaryOp::Pow => {
294 let l = left.as_f64().ok_or_else(|| LocyError::TypeError {
295 message: format!("pow requires numeric, got {left:?}"),
296 })?;
297 let r = right.as_f64().ok_or_else(|| LocyError::TypeError {
298 message: format!("pow requires numeric, got {right:?}"),
299 })?;
300 Ok(Value::Float(l.powf(r)))
301 }
302 LocyBinaryOp::And => match (left.as_bool(), right.as_bool()) {
303 (Some(a), Some(b)) => Ok(Value::Bool(a && b)),
304 _ => Ok(Value::Null),
305 },
306 LocyBinaryOp::Or => match (left.as_bool(), right.as_bool()) {
307 (Some(a), Some(b)) => Ok(Value::Bool(a || b)),
308 _ => Ok(Value::Null),
309 },
310 LocyBinaryOp::Xor => match (left.as_bool(), right.as_bool()) {
311 (Some(a), Some(b)) => Ok(Value::Bool(a ^ b)),
312 _ => Ok(Value::Null),
313 },
314 }
315}
316
317fn eval_binary_op(left: &Value, op: &BinaryOp, right: &Value) -> Result<Value, LocyError> {
318 if left.is_null() || right.is_null() {
319 return match op {
320 BinaryOp::Eq => Ok(Value::Bool(left.is_null() && right.is_null())),
321 BinaryOp::NotEq => Ok(Value::Bool(!(left.is_null() && right.is_null()))),
322 _ => Ok(Value::Null),
323 };
324 }
325 match op {
326 BinaryOp::Add => numeric_op(left, right, |a, b| a + b, |a, b| a + b),
327 BinaryOp::Sub => numeric_op(left, right, |a, b| a - b, |a, b| a - b),
328 BinaryOp::Mul => numeric_op(left, right, |a, b| a * b, |a, b| a * b),
329 BinaryOp::Div => numeric_op(left, right, |a, b| a / b, |a, b| a / b),
330 BinaryOp::Mod => numeric_op(left, right, |a, b| a % b, |a, b| a % b),
331 BinaryOp::Pow => {
332 let l = left.as_f64().unwrap_or(0.0);
333 let r = right.as_f64().unwrap_or(0.0);
334 Ok(Value::Float(l.powf(r)))
335 }
336 BinaryOp::Eq => Ok(Value::Bool(values_equal(left, right))),
337 BinaryOp::NotEq => Ok(Value::Bool(!values_equal(left, right))),
338 BinaryOp::Lt => Ok(Value::Bool(value_less_than(left, right))),
339 BinaryOp::LtEq => Ok(Value::Bool(
340 value_less_than(left, right) || values_equal(left, right),
341 )),
342 BinaryOp::Gt => Ok(Value::Bool(value_less_than(right, left))),
343 BinaryOp::GtEq => Ok(Value::Bool(
344 value_less_than(right, left) || values_equal(left, right),
345 )),
346 BinaryOp::And => match (left.as_bool(), right.as_bool()) {
347 (Some(a), Some(b)) => Ok(Value::Bool(a && b)),
348 _ => Ok(Value::Null),
349 },
350 BinaryOp::Or => match (left.as_bool(), right.as_bool()) {
351 (Some(a), Some(b)) => Ok(Value::Bool(a || b)),
352 _ => Ok(Value::Null),
353 },
354 BinaryOp::Xor => match (left.as_bool(), right.as_bool()) {
355 (Some(a), Some(b)) => Ok(Value::Bool(a ^ b)),
356 _ => Ok(Value::Null),
357 },
358 BinaryOp::Contains => match (left.as_str(), right.as_str()) {
359 (Some(l), Some(r)) => Ok(Value::Bool(l.contains(r))),
360 _ => Ok(Value::Null),
361 },
362 BinaryOp::StartsWith => match (left.as_str(), right.as_str()) {
363 (Some(l), Some(r)) => Ok(Value::Bool(l.starts_with(r))),
364 _ => Ok(Value::Null),
365 },
366 BinaryOp::EndsWith => match (left.as_str(), right.as_str()) {
367 (Some(l), Some(r)) => Ok(Value::Bool(l.ends_with(r))),
368 _ => Ok(Value::Null),
369 },
370 _ => Err(LocyError::EvaluationError {
371 message: format!("unsupported binary op in in-memory evaluation: {op:?}"),
372 }),
373 }
374}
375
376fn numeric_op(
377 left: &Value,
378 right: &Value,
379 int_op: impl Fn(i64, i64) -> i64,
380 float_op: impl Fn(f64, f64) -> f64,
381) -> Result<Value, LocyError> {
382 match (left, right) {
383 (Value::Int(a), Value::Int(b)) => Ok(Value::Int(int_op(*a, *b))),
384 _ => {
385 let a = left.as_f64().ok_or_else(|| LocyError::TypeError {
386 message: format!("numeric op requires number, got {left:?}"),
387 })?;
388 let b = right.as_f64().ok_or_else(|| LocyError::TypeError {
389 message: format!("numeric op requires number, got {right:?}"),
390 })?;
391 Ok(Value::Float(float_op(a, b)))
392 }
393 }
394}
395
396fn eval_function(name: &str, args: &[Value]) -> Result<Value, LocyError> {
397 let upper = name.to_uppercase();
398 match upper.as_str() {
399 "TOINTEGER" | "TOINT" => {
400 let v = args.first().unwrap_or(&Value::Null);
401 match v {
402 Value::Int(i) => Ok(Value::Int(*i)),
403 Value::Float(f) => Ok(Value::Int(*f as i64)),
404 Value::String(s) => {
405 s.parse::<i64>()
406 .map(Value::Int)
407 .map_err(|_| LocyError::TypeError {
408 message: format!("cannot convert '{s}' to integer"),
409 })
410 }
411 _ => Ok(Value::Null),
412 }
413 }
414 "TOFLOAT" => {
415 let v = args.first().unwrap_or(&Value::Null);
416 match v {
417 Value::Float(f) => Ok(Value::Float(*f)),
418 Value::Int(i) => Ok(Value::Float(*i as f64)),
419 Value::String(s) => {
420 s.parse::<f64>()
421 .map(Value::Float)
422 .map_err(|_| LocyError::TypeError {
423 message: format!("cannot convert '{s}' to float"),
424 })
425 }
426 _ => Ok(Value::Null),
427 }
428 }
429 "TOSTRING" => {
430 let v = args.first().unwrap_or(&Value::Null);
431 match v {
432 Value::String(s) => Ok(Value::String(s.clone())),
433 Value::Int(i) => Ok(Value::String(i.to_string())),
434 Value::Float(f) => Ok(Value::String(f.to_string())),
435 Value::Bool(b) => Ok(Value::String(b.to_string())),
436 Value::Null => Ok(Value::Null),
437 _ => Ok(Value::String(format!("{v:?}"))),
438 }
439 }
440 "ABS" => {
441 let v = args.first().unwrap_or(&Value::Null);
442 match v {
443 Value::Int(i) => Ok(Value::Int(i.abs())),
444 Value::Float(f) => Ok(Value::Float(f.abs())),
445 _ => Ok(Value::Null),
446 }
447 }
448 "COALESCE" => {
449 for a in args {
450 if !a.is_null() {
451 return Ok(a.clone());
452 }
453 }
454 Ok(Value::Null)
455 }
456 "SIMILAR_TO" | "VECTOR_SIMILARITY" => {
457 if args.len() < 2 {
458 return Err(LocyError::EvaluationError {
459 message: format!("{name} requires at least 2 arguments"),
460 });
461 }
462 crate::query::similar_to::eval_similar_to_pure(&args[0], &args[1]).map_err(|e| {
466 LocyError::EvaluationError {
467 message: e.to_string(),
468 }
469 })
470 }
471 _ => crate::query::expr_eval::eval_scalar_function(name, args, None).map_err(|e| {
476 LocyError::EvaluationError {
477 message: e.to_string(),
478 }
479 }),
480 }
481}
482
483pub fn values_equal(a: &Value, b: &Value) -> bool {
485 match (a, b) {
486 (Value::Int(x), Value::Float(y)) => (*x as f64) == *y,
487 (Value::Float(x), Value::Int(y)) => *x == (*y as f64),
488 _ => a == b,
489 }
490}
491
492pub fn values_equal_for_join(a: &Value, b: &Value) -> bool {
500 match (a, b) {
501 (Value::Node(na), Value::Node(nb)) => na.vid == nb.vid,
502 (Value::Edge(ea), Value::Edge(eb)) => ea.eid == eb.eid,
503 _ => values_equal(a, b),
504 }
505}
506
507pub fn value_cmp(a: &Value, b: &Value) -> std::cmp::Ordering {
509 if value_less_than(a, b) {
510 std::cmp::Ordering::Less
511 } else if value_less_than(b, a) {
512 std::cmp::Ordering::Greater
513 } else {
514 std::cmp::Ordering::Equal
515 }
516}
517
518pub fn value_less_than(a: &Value, b: &Value) -> bool {
520 match (a, b) {
521 (Value::Int(x), Value::Int(y)) => x < y,
522 (Value::Float(x), Value::Float(y)) => x < y,
523 (Value::Int(x), Value::Float(y)) => (*x as f64) < *y,
524 (Value::Float(x), Value::Int(y)) => *x < (*y as f64),
525 (Value::String(x), Value::String(y)) => x < y,
526 _ => false,
527 }
528}
529
530pub fn value_compare(a: &Value, b: &Value, null_last: bool) -> std::cmp::Ordering {
532 use std::cmp::Ordering;
533 let null_order = if null_last {
534 Ordering::Greater
535 } else {
536 Ordering::Less
537 };
538 match (a.is_null(), b.is_null()) {
539 (true, true) => Ordering::Equal,
540 (true, false) => null_order,
541 (false, true) => null_order.reverse(),
542 (false, false) => value_cmp(a, b),
543 }
544}
545
546pub fn record_batches_to_locy_rows(batches: &[RecordBatch]) -> Vec<FactRow> {
555 let mut rows = Vec::new();
556 for batch in batches {
557 let schema = batch.schema();
558 for row_idx in 0..batch.num_rows() {
559 let mut row = HashMap::new();
560 for (col_idx, field) in schema.fields().iter().enumerate() {
561 if field.name().starts_with("__feat_") {
566 continue;
567 }
568 let column = batch.column(col_idx);
569 let data_type = if uni_common::core::schema::is_datetime_struct(field.data_type()) {
570 Some(&uni_common::DataType::DateTime)
571 } else if uni_common::core::schema::is_time_struct(field.data_type()) {
572 Some(&uni_common::DataType::Time)
573 } else {
574 None
575 };
576 let value = uni_store::storage::arrow_convert::arrow_to_value(
577 column.as_ref(),
578 row_idx,
579 data_type,
580 );
581 row.insert(field.name().clone(), value);
582 }
583 normalize_graph_row(&mut row);
584 rows.push(row);
585 }
586 }
587 rows
588}
589
590pub(crate) fn normalize_graph_row(row: &mut FactRow) {
599 let entity_vars: Vec<String> = row
602 .keys()
603 .filter(|k| {
604 !k.contains('.')
605 && match row.get(*k) {
606 Some(Value::Map(m)) => m.contains_key("_vid") || m.contains_key("_eid"),
607 _ => false,
608 }
609 })
610 .cloned()
611 .collect();
612
613 for var in &entity_vars {
614 let prefix = format!("{}.", var);
617 let helper_keys: Vec<String> = row
618 .keys()
619 .filter(|k| k.starts_with(&prefix))
620 .cloned()
621 .collect();
622 for key in &helper_keys {
623 let prop_name = &key[prefix.len()..];
624 if let Some(val) = row.get(key).cloned()
625 && let Some(Value::Map(m)) = row.get_mut(var)
626 {
627 m.entry(prop_name.to_string()).or_insert(val);
628 }
629 }
630 for key in helper_keys {
632 row.remove(&key);
633 }
634
635 if let Some(Value::Map(map)) = row.remove(var) {
637 row.insert(var.clone(), map_to_graph_entity(map));
638 }
639 }
640}
641
642fn map_to_graph_entity(map: HashMap<String, Value>) -> Value {
644 use uni_common::core::id::{Eid, Vid};
645 use uni_common::value::{Edge, Node};
646
647 if let Some(eid_val) = map.get("_eid") {
649 let eid = match eid_val {
650 Value::Int(i) => Eid::new(*i as u64),
651 _ => return Value::Map(map),
652 };
653 let edge_type = match map.get("_type") {
654 Some(Value::String(s)) => s.clone(),
655 _ => String::new(),
656 };
657 let src = match map.get("_src_vid") {
658 Some(Value::Int(i)) => Vid::new(*i as u64),
659 _ => Vid::new(0),
660 };
661 let dst = match map.get("_dst_vid") {
662 Some(Value::Int(i)) => Vid::new(*i as u64),
663 _ => Vid::new(0),
664 };
665 let properties = extract_properties_from_map(&map);
666 return Value::Edge(Edge {
667 eid,
668 edge_type,
669 src,
670 dst,
671 properties,
672 });
673 }
674
675 if let Some(vid_val) = map.get("_vid") {
677 let vid = match vid_val {
678 Value::Int(i) => Vid::new(*i as u64),
679 _ => return Value::Map(map),
680 };
681 let labels = match map.get("_labels") {
682 Some(Value::List(list)) => list
683 .iter()
684 .filter_map(|v| match v {
685 Value::String(s) => Some(s.clone()),
686 _ => None,
687 })
688 .collect(),
689 _ => Vec::new(),
690 };
691 let properties = extract_properties_from_map(&map);
692 return Value::Node(Node {
693 vid,
694 labels,
695 properties,
696 });
697 }
698
699 Value::Map(map)
700}
701
702fn extract_properties_from_map(map: &HashMap<String, Value>) -> HashMap<String, Value> {
708 let mut properties = HashMap::new();
709
710 if let Some(Value::Map(all_props)) = map.get("_all_props") {
712 for (k, v) in all_props {
713 properties.insert(k.clone(), v.clone());
714 }
715 }
716
717 for (k, v) in map {
719 if !k.starts_with('_') && k != "properties" {
720 properties.entry(k.clone()).or_insert_with(|| v.clone());
721 }
722 }
723
724 properties
725}