Skip to main content

uni_query/query/df_graph/
locy_eval.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! In-memory expression evaluation for Locy commands.
5//!
6//! Ported from `uni-locy/src/orchestrator/eval.rs`. Used by SLG, QUERY, EXPLAIN,
7//! ASSUME, ABDUCE, and DERIVE in the native command dispatch path.
8
9use std::collections::HashMap;
10
11use arrow_array::RecordBatch;
12use uni_common::Value;
13use uni_cypher::ast::{BinaryOp, CypherLiteral, Expr, UnaryOp};
14use uni_cypher::locy_ast::{LocyBinaryOp, LocyExpr};
15use uni_locy::{FactRow, LocyError};
16
17/// Evaluate a Locy expression (which may contain prev references) given current bindings
18/// and optional previous-iteration values.
19pub fn eval_locy_expr(
20    expr: &LocyExpr,
21    bindings: &FactRow,
22    prev_values: Option<&FactRow>,
23) -> Result<Value, LocyError> {
24    match expr {
25        LocyExpr::PrevRef(field) => Ok(prev_values
26            .and_then(|prev| prev.get(field).cloned())
27            .unwrap_or(Value::Null)),
28        LocyExpr::Cypher(cypher_expr) => eval_expr(cypher_expr, bindings),
29        LocyExpr::BinaryOp { left, op, right } => {
30            let l = eval_locy_expr(left, bindings, prev_values)?;
31            let r = eval_locy_expr(right, bindings, prev_values)?;
32            eval_locy_binary_op(&l, op, &r)
33        }
34        LocyExpr::UnaryOp(op, inner) => {
35            let v = eval_locy_expr(inner, bindings, prev_values)?;
36            eval_unary_op(op, &v)
37        }
38    }
39}
40
41/// Evaluate a Cypher expression given variable bindings.
42pub fn eval_expr(expr: &Expr, bindings: &FactRow) -> Result<Value, LocyError> {
43    match expr {
44        Expr::Literal(lit) => Ok(literal_to_value(lit)),
45        Expr::Variable(name) => Ok(bindings.get(name).cloned().unwrap_or(Value::Null)),
46        Expr::Property(expr, property) => {
47            let base = eval_expr(expr, bindings)?;
48            Ok(get_property(&base, property))
49        }
50        Expr::BinaryOp { left, op, right } => {
51            let l = eval_expr(left, bindings)?;
52            let r = eval_expr(right, bindings)?;
53            eval_binary_op(&l, op, &r)
54        }
55        Expr::UnaryOp { op, expr } => {
56            let v = eval_expr(expr, bindings)?;
57            eval_unary_op(op, &v)
58        }
59        Expr::FunctionCall { name, args, .. } => {
60            let evaluated_args: Result<Vec<Value>, _> =
61                args.iter().map(|a| eval_expr(a, bindings)).collect();
62            eval_function(name, &evaluated_args?)
63        }
64        Expr::Parameter(name) => Ok(bindings.get(name).cloned().unwrap_or(Value::Null)),
65        Expr::IsNull(inner) => {
66            let v = eval_expr(inner, bindings)?;
67            Ok(Value::Bool(v.is_null()))
68        }
69        Expr::IsNotNull(inner) => {
70            let v = eval_expr(inner, bindings)?;
71            Ok(Value::Bool(!v.is_null()))
72        }
73        Expr::List(items) => {
74            let vals: Result<Vec<Value>, _> =
75                items.iter().map(|i| eval_expr(i, bindings)).collect();
76            Ok(Value::List(vals?))
77        }
78        Expr::Map(entries) => {
79            let mut map = HashMap::new();
80            for (k, v) in entries {
81                map.insert(k.clone(), eval_expr(v, bindings)?);
82            }
83            Ok(Value::Map(map))
84        }
85        _ => Err(LocyError::EvaluationError {
86            message: format!("unsupported expression in in-memory evaluation: {expr:?}"),
87        }),
88    }
89}
90
91/// Evaluate an aggregate function over a group of rows.
92pub fn eval_aggregate_over_group(
93    func_name: &str,
94    arg_expr: &Expr,
95    group: &[FactRow],
96    rule_name: &str,
97    fold_name: &str,
98) -> Result<Value, LocyError> {
99    let upper = func_name.to_uppercase();
100    match upper.as_str() {
101        "SUM" => {
102            let mut total = 0.0_f64;
103            for row in group {
104                let v = eval_expr(arg_expr, row)?;
105                if let Some(f) = v.as_f64() {
106                    total += f;
107                }
108            }
109            if total == total.floor() && total.abs() < i64::MAX as f64 {
110                Ok(Value::Int(total as i64))
111            } else {
112                Ok(Value::Float(total))
113            }
114        }
115        "MSUM" => {
116            let mut total = 0.0_f64;
117            for row in group {
118                let v = eval_expr(arg_expr, row)?;
119                if let Some(f) = v.as_f64() {
120                    if f < 0.0 {
121                        return Err(LocyError::MsumNegativeValue {
122                            rule: rule_name.to_string(),
123                            fold: fold_name.to_string(),
124                            value: f,
125                        });
126                    }
127                    total += f;
128                }
129            }
130            if total == total.floor() && total.abs() < i64::MAX as f64 {
131                Ok(Value::Int(total as i64))
132            } else {
133                Ok(Value::Float(total))
134            }
135        }
136        "COUNT" | "MCOUNT" => {
137            let count = group
138                .iter()
139                .filter(|row| {
140                    eval_expr(arg_expr, row)
141                        .map(|v| !v.is_null())
142                        .unwrap_or(false)
143                })
144                .count();
145            Ok(Value::Int(count as i64))
146        }
147        "MIN" | "MMIN" => {
148            let mut min_val: Option<Value> = None;
149            for row in group {
150                let v = eval_expr(arg_expr, row)?;
151                if v.is_null() {
152                    continue;
153                }
154                min_val = Some(match min_val {
155                    None => v,
156                    Some(cur) => {
157                        if value_less_than(&v, &cur) {
158                            v
159                        } else {
160                            cur
161                        }
162                    }
163                });
164            }
165            Ok(min_val.unwrap_or(Value::Null))
166        }
167        "MAX" | "MMAX" => {
168            let mut max_val: Option<Value> = None;
169            for row in group {
170                let v = eval_expr(arg_expr, row)?;
171                if v.is_null() {
172                    continue;
173                }
174                max_val = Some(match max_val {
175                    None => v,
176                    Some(cur) => {
177                        if value_less_than(&cur, &v) {
178                            v
179                        } else {
180                            cur
181                        }
182                    }
183                });
184            }
185            Ok(max_val.unwrap_or(Value::Null))
186        }
187        "AVG" => {
188            let mut total = 0.0_f64;
189            let mut count = 0;
190            for row in group {
191                let v = eval_expr(arg_expr, row)?;
192                if let Some(f) = v.as_f64() {
193                    total += f;
194                    count += 1;
195                }
196            }
197            if count == 0 {
198                Ok(Value::Null)
199            } else {
200                Ok(Value::Float(total / count as f64))
201            }
202        }
203        "COLLECT" => {
204            let mut vals = Vec::new();
205            for row in group {
206                let v = eval_expr(arg_expr, row)?;
207                if !v.is_null() {
208                    vals.push(v);
209                }
210            }
211            Ok(Value::List(vals))
212        }
213        _ => Err(LocyError::EvaluationError {
214            message: format!("unknown aggregate function: {func_name}"),
215        }),
216    }
217}
218
219pub(crate) fn literal_to_value(lit: &CypherLiteral) -> Value {
220    match lit {
221        CypherLiteral::Null => Value::Null,
222        CypherLiteral::Bool(b) => Value::Bool(*b),
223        CypherLiteral::Integer(i) => Value::Int(*i),
224        CypherLiteral::Float(f) => Value::Float(*f),
225        CypherLiteral::String(s) => Value::String(s.clone()),
226        CypherLiteral::Bytes(b) => Value::Bytes(b.clone()),
227    }
228}
229
230fn get_property(value: &Value, property: &str) -> Value {
231    match value {
232        Value::Node(n) => n.properties.get(property).cloned().unwrap_or(Value::Null),
233        Value::Edge(e) => e.properties.get(property).cloned().unwrap_or(Value::Null),
234        Value::Map(m) => m.get(property).cloned().unwrap_or(Value::Null),
235        // Temporal component accessors (`d.days`, `dt.year`, …) so that the
236        // in-memory evaluator matches Cypher. Without this, a Duration produced
237        // by `duration.inDays(...)` in a YIELD value column had `.days` resolve
238        // to Null in the SLG path (issue #111, decision C). Mirrors the
239        // `_duration_property` / `_temporal_property` UDFs used in the DataFusion
240        // path; an unknown component falls back to Null.
241        Value::Temporal(uni_common::TemporalValue::Duration { .. }) => {
242            crate::query::datetime::eval_duration_accessor(&value.to_string(), property)
243                .unwrap_or(Value::Null)
244        }
245        Value::Temporal(_) => crate::query::datetime::eval_temporal_accessor_value(value, property)
246            .unwrap_or(Value::Null),
247        _ => Value::Null,
248    }
249}
250
251/// Evaluate a unary operator on a value.
252///
253/// Shared by both `eval_locy_expr` and `eval_expr` to avoid duplicating
254/// NOT/negation logic.
255fn eval_unary_op(op: &UnaryOp, v: &Value) -> Result<Value, LocyError> {
256    match op {
257        UnaryOp::Not => match v {
258            Value::Bool(b) => Ok(Value::Bool(!b)),
259            Value::Null => Ok(Value::Null),
260            _ => Err(LocyError::TypeError {
261                message: format!("NOT requires boolean, got {v:?}"),
262            }),
263        },
264        UnaryOp::Neg => match v {
265            Value::Int(i) => Ok(Value::Int(-i)),
266            Value::Float(f) => Ok(Value::Float(-f)),
267            Value::Null => Ok(Value::Null),
268            _ => Err(LocyError::TypeError {
269                message: format!("negation requires numeric, got {v:?}"),
270            }),
271        },
272    }
273}
274
275fn eval_locy_binary_op(left: &Value, op: &LocyBinaryOp, right: &Value) -> Result<Value, LocyError> {
276    if left.is_null() || right.is_null() {
277        return Ok(Value::Null);
278    }
279    match op {
280        LocyBinaryOp::Add => numeric_op(left, right, |a, b| a + b, |a, b| a + b),
281        LocyBinaryOp::Sub => numeric_op(left, right, |a, b| a - b, |a, b| a - b),
282        LocyBinaryOp::Mul => numeric_op(left, right, |a, b| a * b, |a, b| a * b),
283        LocyBinaryOp::Div => {
284            let r = right.as_f64().unwrap_or(0.0);
285            if r == 0.0 {
286                return Err(LocyError::EvaluationError {
287                    message: "division by zero".to_string(),
288                });
289            }
290            numeric_op(left, right, |a, b| a / b, |a, b| a / b)
291        }
292        LocyBinaryOp::Mod => numeric_op(left, right, |a, b| a % b, |a, b| a % b),
293        LocyBinaryOp::Pow => {
294            let l = left.as_f64().ok_or_else(|| LocyError::TypeError {
295                message: format!("pow requires numeric, got {left:?}"),
296            })?;
297            let r = right.as_f64().ok_or_else(|| LocyError::TypeError {
298                message: format!("pow requires numeric, got {right:?}"),
299            })?;
300            Ok(Value::Float(l.powf(r)))
301        }
302        LocyBinaryOp::And => match (left.as_bool(), right.as_bool()) {
303            (Some(a), Some(b)) => Ok(Value::Bool(a && b)),
304            _ => Ok(Value::Null),
305        },
306        LocyBinaryOp::Or => match (left.as_bool(), right.as_bool()) {
307            (Some(a), Some(b)) => Ok(Value::Bool(a || b)),
308            _ => Ok(Value::Null),
309        },
310        LocyBinaryOp::Xor => match (left.as_bool(), right.as_bool()) {
311            (Some(a), Some(b)) => Ok(Value::Bool(a ^ b)),
312            _ => Ok(Value::Null),
313        },
314    }
315}
316
317fn eval_binary_op(left: &Value, op: &BinaryOp, right: &Value) -> Result<Value, LocyError> {
318    if left.is_null() || right.is_null() {
319        return match op {
320            BinaryOp::Eq => Ok(Value::Bool(left.is_null() && right.is_null())),
321            BinaryOp::NotEq => Ok(Value::Bool(!(left.is_null() && right.is_null()))),
322            _ => Ok(Value::Null),
323        };
324    }
325    match op {
326        BinaryOp::Add => numeric_op(left, right, |a, b| a + b, |a, b| a + b),
327        BinaryOp::Sub => numeric_op(left, right, |a, b| a - b, |a, b| a - b),
328        BinaryOp::Mul => numeric_op(left, right, |a, b| a * b, |a, b| a * b),
329        BinaryOp::Div => numeric_op(left, right, |a, b| a / b, |a, b| a / b),
330        BinaryOp::Mod => numeric_op(left, right, |a, b| a % b, |a, b| a % b),
331        BinaryOp::Pow => {
332            let l = left.as_f64().unwrap_or(0.0);
333            let r = right.as_f64().unwrap_or(0.0);
334            Ok(Value::Float(l.powf(r)))
335        }
336        BinaryOp::Eq => Ok(Value::Bool(values_equal(left, right))),
337        BinaryOp::NotEq => Ok(Value::Bool(!values_equal(left, right))),
338        BinaryOp::Lt => Ok(Value::Bool(value_less_than(left, right))),
339        BinaryOp::LtEq => Ok(Value::Bool(
340            value_less_than(left, right) || values_equal(left, right),
341        )),
342        BinaryOp::Gt => Ok(Value::Bool(value_less_than(right, left))),
343        BinaryOp::GtEq => Ok(Value::Bool(
344            value_less_than(right, left) || values_equal(left, right),
345        )),
346        BinaryOp::And => match (left.as_bool(), right.as_bool()) {
347            (Some(a), Some(b)) => Ok(Value::Bool(a && b)),
348            _ => Ok(Value::Null),
349        },
350        BinaryOp::Or => match (left.as_bool(), right.as_bool()) {
351            (Some(a), Some(b)) => Ok(Value::Bool(a || b)),
352            _ => Ok(Value::Null),
353        },
354        BinaryOp::Xor => match (left.as_bool(), right.as_bool()) {
355            (Some(a), Some(b)) => Ok(Value::Bool(a ^ b)),
356            _ => Ok(Value::Null),
357        },
358        BinaryOp::Contains => match (left.as_str(), right.as_str()) {
359            (Some(l), Some(r)) => Ok(Value::Bool(l.contains(r))),
360            _ => Ok(Value::Null),
361        },
362        BinaryOp::StartsWith => match (left.as_str(), right.as_str()) {
363            (Some(l), Some(r)) => Ok(Value::Bool(l.starts_with(r))),
364            _ => Ok(Value::Null),
365        },
366        BinaryOp::EndsWith => match (left.as_str(), right.as_str()) {
367            (Some(l), Some(r)) => Ok(Value::Bool(l.ends_with(r))),
368            _ => Ok(Value::Null),
369        },
370        _ => Err(LocyError::EvaluationError {
371            message: format!("unsupported binary op in in-memory evaluation: {op:?}"),
372        }),
373    }
374}
375
376fn numeric_op(
377    left: &Value,
378    right: &Value,
379    int_op: impl Fn(i64, i64) -> i64,
380    float_op: impl Fn(f64, f64) -> f64,
381) -> Result<Value, LocyError> {
382    match (left, right) {
383        (Value::Int(a), Value::Int(b)) => Ok(Value::Int(int_op(*a, *b))),
384        _ => {
385            let a = left.as_f64().ok_or_else(|| LocyError::TypeError {
386                message: format!("numeric op requires number, got {left:?}"),
387            })?;
388            let b = right.as_f64().ok_or_else(|| LocyError::TypeError {
389                message: format!("numeric op requires number, got {right:?}"),
390            })?;
391            Ok(Value::Float(float_op(a, b)))
392        }
393    }
394}
395
396fn eval_function(name: &str, args: &[Value]) -> Result<Value, LocyError> {
397    let upper = name.to_uppercase();
398    match upper.as_str() {
399        "TOINTEGER" | "TOINT" => {
400            let v = args.first().unwrap_or(&Value::Null);
401            match v {
402                Value::Int(i) => Ok(Value::Int(*i)),
403                Value::Float(f) => Ok(Value::Int(*f as i64)),
404                Value::String(s) => {
405                    s.parse::<i64>()
406                        .map(Value::Int)
407                        .map_err(|_| LocyError::TypeError {
408                            message: format!("cannot convert '{s}' to integer"),
409                        })
410                }
411                _ => Ok(Value::Null),
412            }
413        }
414        "TOFLOAT" => {
415            let v = args.first().unwrap_or(&Value::Null);
416            match v {
417                Value::Float(f) => Ok(Value::Float(*f)),
418                Value::Int(i) => Ok(Value::Float(*i as f64)),
419                Value::String(s) => {
420                    s.parse::<f64>()
421                        .map(Value::Float)
422                        .map_err(|_| LocyError::TypeError {
423                            message: format!("cannot convert '{s}' to float"),
424                        })
425                }
426                _ => Ok(Value::Null),
427            }
428        }
429        "TOSTRING" => {
430            let v = args.first().unwrap_or(&Value::Null);
431            match v {
432                Value::String(s) => Ok(Value::String(s.clone())),
433                Value::Int(i) => Ok(Value::String(i.to_string())),
434                Value::Float(f) => Ok(Value::String(f.to_string())),
435                Value::Bool(b) => Ok(Value::String(b.to_string())),
436                Value::Null => Ok(Value::Null),
437                _ => Ok(Value::String(format!("{v:?}"))),
438            }
439        }
440        "ABS" => {
441            let v = args.first().unwrap_or(&Value::Null);
442            match v {
443                Value::Int(i) => Ok(Value::Int(i.abs())),
444                Value::Float(f) => Ok(Value::Float(f.abs())),
445                _ => Ok(Value::Null),
446            }
447        }
448        "COALESCE" => {
449            for a in args {
450                if !a.is_null() {
451                    return Ok(a.clone());
452                }
453            }
454            Ok(Value::Null)
455        }
456        "SIMILAR_TO" | "VECTOR_SIMILARITY" => {
457            if args.len() < 2 {
458                return Err(LocyError::EvaluationError {
459                    message: format!("{name} requires at least 2 arguments"),
460                });
461            }
462            // In Locy context, handle pure vector-vector case directly.
463            // Storage-dependent cases (auto-embed, FTS) are not available
464            // in the Locy in-memory evaluator.
465            crate::query::similar_to::eval_similar_to_pure(&args[0], &args[1]).map_err(|e| {
466                LocyError::EvaluationError {
467                    message: e.to_string(),
468                }
469            })
470        }
471        // Delegate to the full Cypher scalar function evaluator so that every
472        // function available in Cypher (temporal, math, string, spatial, …) is
473        // automatically available in Locy. Both sides use uni_common::Value, so
474        // no type conversion is needed.
475        _ => crate::query::expr_eval::eval_scalar_function(name, args, None).map_err(|e| {
476            LocyError::EvaluationError {
477                message: e.to_string(),
478            }
479        }),
480    }
481}
482
483/// Compare two values for equality (Cypher semantics).
484pub fn values_equal(a: &Value, b: &Value) -> bool {
485    match (a, b) {
486        (Value::Int(x), Value::Float(y)) => (*x as f64) == *y,
487        (Value::Float(x), Value::Int(y)) => *x == (*y as f64),
488        _ => a == b,
489    }
490}
491
492/// Compare two values for join equality in IS-ref matching.
493///
494/// For graph entities (`Value::Node`, `Value::Edge`), compares by identity
495/// (VID/EID) rather than full structural equality. This is necessary because
496/// the same node may have different property sets across different query
497/// executions (e.g., schema mode adds `overflow_json: Null` in some paths
498/// but not others). For non-graph values, falls back to `values_equal`.
499pub fn values_equal_for_join(a: &Value, b: &Value) -> bool {
500    match (a, b) {
501        (Value::Node(na), Value::Node(nb)) => na.vid == nb.vid,
502        (Value::Edge(ea), Value::Edge(eb)) => ea.eid == eb.eid,
503        _ => values_equal(a, b),
504    }
505}
506
507/// Compare two values returning an Ordering.
508pub fn value_cmp(a: &Value, b: &Value) -> std::cmp::Ordering {
509    if value_less_than(a, b) {
510        std::cmp::Ordering::Less
511    } else if value_less_than(b, a) {
512        std::cmp::Ordering::Greater
513    } else {
514        std::cmp::Ordering::Equal
515    }
516}
517
518/// Compare two values for ordering (less than).
519pub fn value_less_than(a: &Value, b: &Value) -> bool {
520    match (a, b) {
521        (Value::Int(x), Value::Int(y)) => x < y,
522        (Value::Float(x), Value::Float(y)) => x < y,
523        (Value::Int(x), Value::Float(y)) => (*x as f64) < *y,
524        (Value::Float(x), Value::Int(y)) => *x < (*y as f64),
525        (Value::String(x), Value::String(y)) => x < y,
526        _ => false,
527    }
528}
529
530/// Compare two values with NULL handling (NULLS LAST, matching Cypher semantics).
531pub fn value_compare(a: &Value, b: &Value, null_last: bool) -> std::cmp::Ordering {
532    use std::cmp::Ordering;
533    let null_order = if null_last {
534        Ordering::Greater
535    } else {
536        Ordering::Less
537    };
538    match (a.is_null(), b.is_null()) {
539        (true, true) => Ordering::Equal,
540        (true, false) => null_order,
541        (false, true) => null_order.reverse(),
542        (false, false) => value_cmp(a, b),
543    }
544}
545
546/// Convert a slice of Arrow RecordBatches into a vector of Locy rows (HashMap<String, Value>).
547///
548/// Handles DateTime and Time struct types via `uni_common` schema helpers so that
549/// temporal values round-trip correctly through the Arrow → Value conversion.
550///
551/// Node/edge struct columns (`_vid`/`_labels`/`_all_props`) are normalized to
552/// `Value::Node` / `Value::Edge` and dotted helper columns (e.g. `a._vid`) are
553/// stripped, matching the behaviour of `Executor::record_batches_to_rows`.
554pub fn record_batches_to_locy_rows(batches: &[RecordBatch]) -> Vec<FactRow> {
555    let mut rows = Vec::new();
556    for batch in batches {
557        let schema = batch.schema();
558        for row_idx in 0..batch.num_rows() {
559            let mut row = HashMap::new();
560            for (col_idx, field) in schema.fields().iter().enumerate() {
561                // Phase B Slice 3 (post-Slice-3 follow-up): strip
562                // synthetic property-feature columns (`__feat_*`)
563                // emitted by `extract_model_invocations` so they
564                // don't leak into user-visible FactRows.
565                if field.name().starts_with("__feat_") {
566                    continue;
567                }
568                let column = batch.column(col_idx);
569                let data_type = if uni_common::core::schema::is_datetime_struct(field.data_type()) {
570                    Some(&uni_common::DataType::DateTime)
571                } else if uni_common::core::schema::is_time_struct(field.data_type()) {
572                    Some(&uni_common::DataType::Time)
573                } else {
574                    None
575                };
576                let value = uni_store::storage::arrow_convert::arrow_to_value(
577                    column.as_ref(),
578                    row_idx,
579                    data_type,
580                );
581                row.insert(field.name().clone(), value);
582            }
583            normalize_graph_row(&mut row);
584            rows.push(row);
585        }
586    }
587    rows
588}
589
590/// Post-process a raw Arrow-converted row so that graph entities are represented
591/// as `Value::Node` / `Value::Edge` and dotted helper columns are removed.
592///
593/// RecordBatches from graph scans emit both a bare struct column (e.g. `a`) and
594/// exploded helper columns (`a._vid`, `a._labels`, `a._all_props`). The bare
595/// column is `Value::Map({_vid, _labels, _all_props})` after `arrow_to_value`.
596/// This function detects these maps and converts them to proper `Value::Node` or
597/// `Value::Edge`, then strips the helpers.
598pub(crate) fn normalize_graph_row(row: &mut FactRow) {
599    // Detect bare graph-entity variables: keys without '.' that are Map values
600    // containing the internal `_vid` or `_eid` field.
601    let entity_vars: Vec<String> = row
602        .keys()
603        .filter(|k| {
604            !k.contains('.')
605                && match row.get(*k) {
606                    Some(Value::Map(m)) => m.contains_key("_vid") || m.contains_key("_eid"),
607                    _ => false,
608                }
609        })
610        .cloned()
611        .collect();
612
613    for var in &entity_vars {
614        // Merge any dotted helper columns into the bare map (they should already
615        // be present from the struct, but merge to be safe).
616        let prefix = format!("{}.", var);
617        let helper_keys: Vec<String> = row
618            .keys()
619            .filter(|k| k.starts_with(&prefix))
620            .cloned()
621            .collect();
622        for key in &helper_keys {
623            let prop_name = &key[prefix.len()..];
624            if let Some(val) = row.get(key).cloned()
625                && let Some(Value::Map(m)) = row.get_mut(var)
626            {
627                m.entry(prop_name.to_string()).or_insert(val);
628            }
629        }
630        // Remove dotted helpers
631        for key in helper_keys {
632            row.remove(&key);
633        }
634
635        // Convert map → Value::Node or Value::Edge
636        if let Some(Value::Map(map)) = row.remove(var) {
637            row.insert(var.clone(), map_to_graph_entity(map));
638        }
639    }
640}
641
642/// Convert a map with internal graph fields to `Value::Node` or `Value::Edge`.
643fn map_to_graph_entity(map: HashMap<String, Value>) -> Value {
644    use uni_common::core::id::{Eid, Vid};
645    use uni_common::value::{Edge, Node};
646
647    // Edge: has _eid
648    if let Some(eid_val) = map.get("_eid") {
649        let eid = match eid_val {
650            Value::Int(i) => Eid::new(*i as u64),
651            _ => return Value::Map(map),
652        };
653        let edge_type = match map.get("_type") {
654            Some(Value::String(s)) => s.clone(),
655            _ => String::new(),
656        };
657        let src = match map.get("_src_vid") {
658            Some(Value::Int(i)) => Vid::new(*i as u64),
659            _ => Vid::new(0),
660        };
661        let dst = match map.get("_dst_vid") {
662            Some(Value::Int(i)) => Vid::new(*i as u64),
663            _ => Vid::new(0),
664        };
665        let properties = extract_properties_from_map(&map);
666        return Value::Edge(Edge {
667            eid,
668            edge_type,
669            src,
670            dst,
671            properties,
672        });
673    }
674
675    // Node: has _vid
676    if let Some(vid_val) = map.get("_vid") {
677        let vid = match vid_val {
678            Value::Int(i) => Vid::new(*i as u64),
679            _ => return Value::Map(map),
680        };
681        let labels = match map.get("_labels") {
682            Some(Value::List(list)) => list
683                .iter()
684                .filter_map(|v| match v {
685                    Value::String(s) => Some(s.clone()),
686                    _ => None,
687                })
688                .collect(),
689            _ => Vec::new(),
690        };
691        let properties = extract_properties_from_map(&map);
692        return Value::Node(Node {
693            vid,
694            labels,
695            properties,
696        });
697    }
698
699    Value::Map(map)
700}
701
702/// Extract user-visible properties from a raw graph-entity map.
703///
704/// Properties are stored in `_all_props` (deserialized by `arrow_to_value` from
705/// the LargeBinary CypherValue codec). Any non-internal keys at the top level
706/// are also included as schema-defined column properties.
707fn extract_properties_from_map(map: &HashMap<String, Value>) -> HashMap<String, Value> {
708    let mut properties = HashMap::new();
709
710    // Primary source: _all_props contains all properties from storage
711    if let Some(Value::Map(all_props)) = map.get("_all_props") {
712        for (k, v) in all_props {
713            properties.insert(k.clone(), v.clone());
714        }
715    }
716
717    // Secondary: inline non-internal keys (schema-defined property columns)
718    for (k, v) in map {
719        if !k.starts_with('_') && k != "properties" {
720            properties.entry(k.clone()).or_insert_with(|| v.clone());
721        }
722    }
723
724    properties
725}